aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_server_connection.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/remote_server_connection.cpp
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_server_connection.cpp')
-rw-r--r--library/cpp/messagebus/remote_server_connection.cpp124
1 files changed, 62 insertions, 62 deletions
diff --git a/library/cpp/messagebus/remote_server_connection.cpp b/library/cpp/messagebus/remote_server_connection.cpp
index 74be34ded9..9af7fb2185 100644
--- a/library/cpp/messagebus/remote_server_connection.cpp
+++ b/library/cpp/messagebus/remote_server_connection.cpp
@@ -1,73 +1,73 @@
-#include "remote_server_connection.h"
-
+#include "remote_server_connection.h"
+
#include "mb_lwtrace.h"
-#include "remote_server_session.h"
-
+#include "remote_server_session.h"
+
#include <util/generic/cast.h>
-
-LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
-
-using namespace NBus;
-using namespace NBus::NPrivate;
-
-TRemoteServerConnection::TRemoteServerConnection(TRemoteServerSessionPtr session, ui64 id, TNetAddr addr)
- : TRemoteConnection(session.Get(), id, addr)
-{
-}
-
+
+LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
+
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+TRemoteServerConnection::TRemoteServerConnection(TRemoteServerSessionPtr session, ui64 id, TNetAddr addr)
+ : TRemoteConnection(session.Get(), id, addr)
+{
+}
+
void TRemoteServerConnection::Init(SOCKET socket, TInstant now) {
- WriterData.Status.ConnectTime = now;
- WriterData.Status.Connected = true;
-
+ WriterData.Status.ConnectTime = now;
+ WriterData.Status.Connected = true;
+
Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket");
-
- TSocket readSocket(socket);
- TSocket writeSocket = readSocket;
-
- // this must not be done in constructor, because if event loop is stopped,
- // this is deleted
- WriterData.SetChannel(Session->WriteEventLoop.Register(writeSocket, this, WriteCookie));
- WriterData.SocketVersion = 1;
-
- ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
-}
-
-TRemoteServerSession* TRemoteServerConnection::GetSession() {
- return CheckedCast<TRemoteServerSession*>(Session.Get());
-}
-
-void TRemoteServerConnection::HandleEvent(SOCKET socket, void* cookie) {
+
+ TSocket readSocket(socket);
+ TSocket writeSocket = readSocket;
+
+ // this must not be done in constructor, because if event loop is stopped,
+ // this is deleted
+ WriterData.SetChannel(Session->WriteEventLoop.Register(writeSocket, this, WriteCookie));
+ WriterData.SocketVersion = 1;
+
+ ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
+}
+
+TRemoteServerSession* TRemoteServerConnection::GetSession() {
+ return CheckedCast<TRemoteServerSession*>(Session.Get());
+}
+
+void TRemoteServerConnection::HandleEvent(SOCKET socket, void* cookie) {
Y_UNUSED(socket);
Y_ASSERT(cookie == ReadCookie || cookie == WriteCookie);
- if (cookie == ReadCookie) {
- GetSession()->ServerOwnedMessages.Wait();
- ScheduleRead();
- } else {
- ScheduleWrite();
- }
-}
-
-bool TRemoteServerConnection::NeedInterruptRead() {
- return !GetSession()->ServerOwnedMessages.TryWait();
-}
-
+ if (cookie == ReadCookie) {
+ GetSession()->ServerOwnedMessages.Wait();
+ ScheduleRead();
+ } else {
+ ScheduleWrite();
+ }
+}
+
+bool TRemoteServerConnection::NeedInterruptRead() {
+ return !GetSession()->ServerOwnedMessages.TryWait();
+}
+
void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
- TInstant now = TInstant::Now();
-
- GetSession()->ReleaseInWorkResponses(messages);
+ TInstant now = TInstant::Now();
+
+ GetSession()->ReleaseInWorkResponses(messages);
for (auto& message : messages) {
TInstant recvTime = message.MessagePtr->RecvTime;
GetSession()->ServerHandler->OnSent(message.MessagePtr.Release());
- TDuration d = now - recvTime;
- WriterData.Status.DurationCounter.AddDuration(d);
- WriterData.Status.Incremental.ProcessDurationHistogram.AddTime(d);
- }
-}
-
+ TDuration d = now - recvTime;
+ WriterData.Status.DurationCounter.AddDuration(d);
+ WriterData.Status.Incremental.ProcessDurationHistogram.AddTime(d);
+ }
+}
+
void TRemoteServerConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) {
- TBusHeader header(dataRef);
- // TODO: full version hex
- LWPROBE(ServerUnknownVersion, ToString(PeerAddr), header.GetVersionInternal());
- WrongVersionRequests.Enqueue(header);
- GetWriterActor()->Schedule();
-}
+ TBusHeader header(dataRef);
+ // TODO: full version hex
+ LWPROBE(ServerUnknownVersion, ToString(PeerAddr), header.GetVersionInternal());
+ WrongVersionRequests.Enqueue(header);
+ GetWriterActor()->Schedule();
+}