diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/remote_server_connection.cpp | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_server_connection.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_server_connection.cpp | 124 |
1 files changed, 62 insertions, 62 deletions
diff --git a/library/cpp/messagebus/remote_server_connection.cpp b/library/cpp/messagebus/remote_server_connection.cpp index 74be34ded9..9af7fb2185 100644 --- a/library/cpp/messagebus/remote_server_connection.cpp +++ b/library/cpp/messagebus/remote_server_connection.cpp @@ -1,73 +1,73 @@ -#include "remote_server_connection.h" - +#include "remote_server_connection.h" + #include "mb_lwtrace.h" -#include "remote_server_session.h" - +#include "remote_server_session.h" + #include <util/generic/cast.h> - -LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) - -using namespace NBus; -using namespace NBus::NPrivate; - -TRemoteServerConnection::TRemoteServerConnection(TRemoteServerSessionPtr session, ui64 id, TNetAddr addr) - : TRemoteConnection(session.Get(), id, addr) -{ -} - + +LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) + +using namespace NBus; +using namespace NBus::NPrivate; + +TRemoteServerConnection::TRemoteServerConnection(TRemoteServerSessionPtr session, ui64 id, TNetAddr addr) + : TRemoteConnection(session.Get(), id, addr) +{ +} + void TRemoteServerConnection::Init(SOCKET socket, TInstant now) { - WriterData.Status.ConnectTime = now; - WriterData.Status.Connected = true; - + WriterData.Status.ConnectTime = now; + WriterData.Status.Connected = true; + Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket"); - - TSocket readSocket(socket); - TSocket writeSocket = readSocket; - - // this must not be done in constructor, because if event loop is stopped, - // this is deleted - WriterData.SetChannel(Session->WriteEventLoop.Register(writeSocket, this, WriteCookie)); - WriterData.SocketVersion = 1; - - ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion)); -} - -TRemoteServerSession* TRemoteServerConnection::GetSession() { - return CheckedCast<TRemoteServerSession*>(Session.Get()); -} - -void TRemoteServerConnection::HandleEvent(SOCKET socket, void* cookie) { + + TSocket readSocket(socket); + TSocket writeSocket = readSocket; + + // this must not be done in constructor, because if event loop is stopped, + // this is deleted + WriterData.SetChannel(Session->WriteEventLoop.Register(writeSocket, this, WriteCookie)); + WriterData.SocketVersion = 1; + + ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion)); +} + +TRemoteServerSession* TRemoteServerConnection::GetSession() { + return CheckedCast<TRemoteServerSession*>(Session.Get()); +} + +void TRemoteServerConnection::HandleEvent(SOCKET socket, void* cookie) { Y_UNUSED(socket); Y_ASSERT(cookie == ReadCookie || cookie == WriteCookie); - if (cookie == ReadCookie) { - GetSession()->ServerOwnedMessages.Wait(); - ScheduleRead(); - } else { - ScheduleWrite(); - } -} - -bool TRemoteServerConnection::NeedInterruptRead() { - return !GetSession()->ServerOwnedMessages.TryWait(); -} - + if (cookie == ReadCookie) { + GetSession()->ServerOwnedMessages.Wait(); + ScheduleRead(); + } else { + ScheduleWrite(); + } +} + +bool TRemoteServerConnection::NeedInterruptRead() { + return !GetSession()->ServerOwnedMessages.TryWait(); +} + void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) { - TInstant now = TInstant::Now(); - - GetSession()->ReleaseInWorkResponses(messages); + TInstant now = TInstant::Now(); + + GetSession()->ReleaseInWorkResponses(messages); for (auto& message : messages) { TInstant recvTime = message.MessagePtr->RecvTime; GetSession()->ServerHandler->OnSent(message.MessagePtr.Release()); - TDuration d = now - recvTime; - WriterData.Status.DurationCounter.AddDuration(d); - WriterData.Status.Incremental.ProcessDurationHistogram.AddTime(d); - } -} - + TDuration d = now - recvTime; + WriterData.Status.DurationCounter.AddDuration(d); + WriterData.Status.Incremental.ProcessDurationHistogram.AddTime(d); + } +} + void TRemoteServerConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) { - TBusHeader header(dataRef); - // TODO: full version hex - LWPROBE(ServerUnknownVersion, ToString(PeerAddr), header.GetVersionInternal()); - WrongVersionRequests.Enqueue(header); - GetWriterActor()->Schedule(); -} + TBusHeader header(dataRef); + // TODO: full version hex + LWPROBE(ServerUnknownVersion, ToString(PeerAddr), header.GetVersionInternal()); + WrongVersionRequests.Enqueue(header); + GetWriterActor()->Schedule(); +} |