aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_server_connection.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_server_connection.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_server_connection.cpp')
-rw-r--r--library/cpp/messagebus/remote_server_connection.cpp73
1 files changed, 73 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_server_connection.cpp b/library/cpp/messagebus/remote_server_connection.cpp
new file mode 100644
index 0000000000..74be34ded9
--- /dev/null
+++ b/library/cpp/messagebus/remote_server_connection.cpp
@@ -0,0 +1,73 @@
+#include "remote_server_connection.h"
+
+#include "mb_lwtrace.h"
+#include "remote_server_session.h"
+
+#include <util/generic/cast.h>
+
+LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
+
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+TRemoteServerConnection::TRemoteServerConnection(TRemoteServerSessionPtr session, ui64 id, TNetAddr addr)
+ : TRemoteConnection(session.Get(), id, addr)
+{
+}
+
+void TRemoteServerConnection::Init(SOCKET socket, TInstant now) {
+ WriterData.Status.ConnectTime = now;
+ WriterData.Status.Connected = true;
+
+ Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket");
+
+ TSocket readSocket(socket);
+ TSocket writeSocket = readSocket;
+
+ // this must not be done in constructor, because if event loop is stopped,
+ // this is deleted
+ WriterData.SetChannel(Session->WriteEventLoop.Register(writeSocket, this, WriteCookie));
+ WriterData.SocketVersion = 1;
+
+ ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
+}
+
+TRemoteServerSession* TRemoteServerConnection::GetSession() {
+ return CheckedCast<TRemoteServerSession*>(Session.Get());
+}
+
+void TRemoteServerConnection::HandleEvent(SOCKET socket, void* cookie) {
+ Y_UNUSED(socket);
+ Y_ASSERT(cookie == ReadCookie || cookie == WriteCookie);
+ if (cookie == ReadCookie) {
+ GetSession()->ServerOwnedMessages.Wait();
+ ScheduleRead();
+ } else {
+ ScheduleWrite();
+ }
+}
+
+bool TRemoteServerConnection::NeedInterruptRead() {
+ return !GetSession()->ServerOwnedMessages.TryWait();
+}
+
+void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
+ TInstant now = TInstant::Now();
+
+ GetSession()->ReleaseInWorkResponses(messages);
+ for (auto& message : messages) {
+ TInstant recvTime = message.MessagePtr->RecvTime;
+ GetSession()->ServerHandler->OnSent(message.MessagePtr.Release());
+ TDuration d = now - recvTime;
+ WriterData.Status.DurationCounter.AddDuration(d);
+ WriterData.Status.Incremental.ProcessDurationHistogram.AddTime(d);
+ }
+}
+
+void TRemoteServerConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) {
+ TBusHeader header(dataRef);
+ // TODO: full version hex
+ LWPROBE(ServerUnknownVersion, ToString(PeerAddr), header.GetVersionInternal());
+ WrongVersionRequests.Enqueue(header);
+ GetWriterActor()->Schedule();
+}