diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_server_connection.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_server_connection.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_server_connection.cpp | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_server_connection.cpp b/library/cpp/messagebus/remote_server_connection.cpp new file mode 100644 index 0000000000..74be34ded9 --- /dev/null +++ b/library/cpp/messagebus/remote_server_connection.cpp @@ -0,0 +1,73 @@ +#include "remote_server_connection.h" + +#include "mb_lwtrace.h" +#include "remote_server_session.h" + +#include <util/generic/cast.h> + +LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) + +using namespace NBus; +using namespace NBus::NPrivate; + +TRemoteServerConnection::TRemoteServerConnection(TRemoteServerSessionPtr session, ui64 id, TNetAddr addr) + : TRemoteConnection(session.Get(), id, addr) +{ +} + +void TRemoteServerConnection::Init(SOCKET socket, TInstant now) { + WriterData.Status.ConnectTime = now; + WriterData.Status.Connected = true; + + Y_VERIFY(socket != INVALID_SOCKET, "must be a valid socket"); + + TSocket readSocket(socket); + TSocket writeSocket = readSocket; + + // this must not be done in constructor, because if event loop is stopped, + // this is deleted + WriterData.SetChannel(Session->WriteEventLoop.Register(writeSocket, this, WriteCookie)); + WriterData.SocketVersion = 1; + + ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion)); +} + +TRemoteServerSession* TRemoteServerConnection::GetSession() { + return CheckedCast<TRemoteServerSession*>(Session.Get()); +} + +void TRemoteServerConnection::HandleEvent(SOCKET socket, void* cookie) { + Y_UNUSED(socket); + Y_ASSERT(cookie == ReadCookie || cookie == WriteCookie); + if (cookie == ReadCookie) { + GetSession()->ServerOwnedMessages.Wait(); + ScheduleRead(); + } else { + ScheduleWrite(); + } +} + +bool TRemoteServerConnection::NeedInterruptRead() { + return !GetSession()->ServerOwnedMessages.TryWait(); +} + +void TRemoteServerConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) { + TInstant now = TInstant::Now(); + + GetSession()->ReleaseInWorkResponses(messages); + for (auto& message : messages) { + TInstant recvTime = message.MessagePtr->RecvTime; + GetSession()->ServerHandler->OnSent(message.MessagePtr.Release()); + TDuration d = now - recvTime; + WriterData.Status.DurationCounter.AddDuration(d); + WriterData.Status.Incremental.ProcessDurationHistogram.AddTime(d); + } +} + +void TRemoteServerConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char> dataRef) { + TBusHeader header(dataRef); + // TODO: full version hex + LWPROBE(ServerUnknownVersion, ToString(PeerAddr), header.GetVersionInternal()); + WrongVersionRequests.Enqueue(header); + GetWriterActor()->Schedule(); +} |