diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | c2a1af049e9deca890e9923abe64fe6c59060348 (patch) | |
tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/remote_connection.cpp | |
parent | 1f553f46fb4f3c5eec631352cdd900a0709016af (diff) | |
download | ydb-c2a1af049e9deca890e9923abe64fe6c59060348.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_connection.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_connection.cpp | 312 |
1 files changed, 156 insertions, 156 deletions
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp index 2e14d78eb4..22932569db 100644 --- a/library/cpp/messagebus/remote_connection.cpp +++ b/library/cpp/messagebus/remote_connection.cpp @@ -1,11 +1,11 @@ #include "remote_connection.h" - -#include "key_value_printer.h" + +#include "key_value_printer.h" #include "mb_lwtrace.h" #include "network.h" -#include "remote_client_connection.h" -#include "remote_client_session.h" -#include "remote_server_session.h" +#include "remote_client_connection.h" +#include "remote_client_session.h" +#include "remote_server_session.h" #include "session_impl.h" #include <library/cpp/messagebus/actor/what_thread_does.h> @@ -14,12 +14,12 @@ #include <util/network/init.h> #include <util/system/atomic.h> -LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) - -using namespace NActor; -using namespace NBus; -using namespace NBus::NPrivate; - +LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) + +using namespace NActor; +using namespace NBus; +using namespace NBus::NPrivate; + namespace NBus { namespace NPrivate { TRemoteConnection::TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr) @@ -46,7 +46,7 @@ namespace NBus { ReaderData.Status.ConnectionId = connectionId; const TInstant now = TInstant::Now(); - + WriterFillStatus(); GranStatus.Writer.Update(WriterData.Status, now, true); @@ -56,7 +56,7 @@ namespace NBus { TRemoteConnection::~TRemoteConnection() { Y_VERIFY(ReplyQueue.IsEmpty()); } - + TRemoteConnection::TWriterData::TWriterData() : Down(0) , SocketVersion(0) @@ -65,7 +65,7 @@ namespace NBus { , State(WRITER_FILLING) { } - + TRemoteConnection::TWriterData::~TWriterData() { Y_VERIFY(AtomicGet(Down)); Y_VERIFY(SendQueue.Empty()); @@ -76,7 +76,7 @@ namespace NBus { return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0; } - + void TRemoteConnection::TWriterData::SetChannel(NEventLoop::TChannelPtr channel) { Y_VERIFY(!Channel, "must not have channel"); Y_VERIFY(Buffer.GetBuffer().Empty() && Buffer.LeftSize() == 0, "buffer must be empty"); @@ -95,11 +95,11 @@ namespace NBus { Channel->Unregister(); Channel.Drop(); } - + Buffer.Reset(); State = WRITER_FILLING; } - + void TRemoteConnection::TReaderData::DropChannel() { // TODO: make Drop call Unregister if (!!Channel) { @@ -109,7 +109,7 @@ namespace NBus { Buffer.Reset(); Offset = 0; } - + TRemoteConnection::TReaderData::TReaderData() : Down(0) , SocketVersion(0) @@ -117,31 +117,31 @@ namespace NBus { , MoreBytes(0) { } - + TRemoteConnection::TReaderData::~TReaderData() { Y_VERIFY(AtomicGet(Down)); } - + void TRemoteConnection::Send(TNonDestroyingAutoPtr<TBusMessage> msg) { BeforeSendQueue.Enqueue(msg.Release()); AtomicIncrement(WriterData.InFlight); ScheduleWrite(); } - + void TRemoteConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) { if (!reconnect) { // Do not clear send queue if reconnecting WriterData.SendQueue.Clear(&result); } } - + void TRemoteConnection::Shutdown(EMessageStatus status) { ScheduleShutdown(status); ReaderData.ShutdownComplete.WaitI(); WriterData.ShutdownComplete.WaitI(); } - + void TRemoteConnection::TryConnect() { Y_FAIL("TryConnect is client connection only operation"); } @@ -158,27 +158,27 @@ namespace NBus { if (!WriterData.TimeToRotateCounters.FetchTask()) { return; } - + WriterData.Status.DurationCounterPrev = WriterData.Status.DurationCounter; Reset(WriterData.Status.DurationCounter); } - + void TRemoteConnection::WriterSendStatus(TInstant now, bool force) { GranStatus.Writer.Update(std::bind(&TRemoteConnection::WriterGetStatus, this), now, force); } - + void TRemoteConnection::ReaderSendStatus(TInstant now, bool force) { GranStatus.Reader.Update(std::bind(&TRemoteConnection::ReaderFillStatus, this), now, force); } - + const TRemoteConnectionReaderStatus& TRemoteConnection::ReaderFillStatus() { ReaderData.Status.BufferSize = ReaderData.Buffer.Capacity(); ReaderData.Status.QuotaMsg = QuotaMsg.Tokens(); ReaderData.Status.QuotaBytes = QuotaBytes.Tokens(); - + return ReaderData.Status; } - + void TRemoteConnection::ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage readSocket) { if (AtomicGet(ReaderData.Down)) { ReaderData.Status.Fd = INVALID_SOCKET; @@ -189,48 +189,48 @@ namespace NBus { ReaderData.Status.Fd = readSocket.Socket; ReaderData.SocketVersion = readSocket.SocketVersion; - + if (readSocket.Socket != INVALID_SOCKET) { ReaderData.SetChannel(Session->ReadEventLoop.Register(readSocket.Socket, this, ReadCookie)); ReaderData.Channel->EnableRead(); } } - + void TRemoteConnection::ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion) { Y_VERIFY(socketVersion <= WriterData.SocketVersion, "something weird"); - + if (WriterData.SocketVersion != socketVersion) { return; } Y_VERIFY(WriterData.Status.Connected, "must be connected at this point"); Y_VERIFY(!!WriterData.Channel, "must have channel at this point"); - + WriterData.Status.Connected = false; WriterData.DropChannel(); WriterData.Status.MyAddr = TNetAddr(); ++WriterData.SocketVersion; LastConnectAttempt = TInstant(); - + TMessagesPtrs cleared; ClearOutgoingQueue(cleared, true); WriterErrorMessages(cleared, MESSAGE_DELIVERY_FAILED); - + FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); - + ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(INVALID_SOCKET, WriterData.SocketVersion)); } - + void TRemoteConnection::ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags) { WriterData.AwakeFlags |= awakeFlags; - + ReadQuotaWakeup(); } - + void TRemoteConnection::Act(TReaderTag) { TInstant now = TInstant::Now(); - + ReaderData.Status.Acts += 1; - + ReaderGetSocketQueue()->DequeueAllLikelyEmpty(); if (AtomicGet(ReaderData.Down)) { @@ -238,41 +238,41 @@ namespace NBus { ReaderProcessStatusDown(); ReaderData.ShutdownComplete.Signal(); - + } else if (!!ReaderData.Channel) { Y_ASSERT(ReaderData.ReadMessages.empty()); - + for (int i = 0;; ++i) { if (i == 100) { // perform other tasks GetReaderActor()->AddTaskFromActorLoop(); break; } - + if (NeedInterruptRead()) { ReaderData.Channel->EnableRead(); break; } - + if (!ReaderFillBuffer()) break; - + if (!ReaderProcessBuffer()) break; } - + ReaderFlushMessages(); } - + ReaderSendStatus(now); - } - + } + bool TRemoteConnection::QuotaAcquire(size_t msg, size_t bytes) { ui32 wakeFlags = 0; - + if (!QuotaMsg.Acquire(msg)) wakeFlags |= WAKE_QUOTA_MSG; - + else if (!QuotaBytes.Acquire(bytes)) wakeFlags |= WAKE_QUOTA_BYTES; @@ -380,7 +380,7 @@ namespace NBus { ReaderData.Buffer.ChopHead(ReaderData.Offset); ReaderData.Offset = 0; - + if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) { ReaderData.Status.Incremental.BufferDrops += 1; @@ -388,7 +388,7 @@ namespace NBus { // probably should use another constant temp.Reserve(Config.DefaultBufferSize); temp.Append(ReaderData.Buffer.Data(), ReaderData.Buffer.Size()); - + ReaderData.Buffer.Swap(temp); } @@ -398,14 +398,14 @@ namespace NBus { bool TRemoteConnection::ReaderFillBuffer() { if (!ReaderData.BufferMore()) return true; - + if (ReaderData.Buffer.Avail() == 0) { if (ReaderData.Buffer.Size() == 0) { ReaderData.Buffer.Reserve(Config.DefaultBufferSize); } else { ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2); } - } + } Y_ASSERT(ReaderData.Buffer.Avail() > 0); @@ -414,7 +414,7 @@ namespace NBus { TWhatThreadDoesPushPop pp("recv syscall"); bytes = SocketRecv(ReaderData.Channel->GetSocket(), TArrayRef<char>(ReaderData.Buffer.Pos(), ReaderData.Buffer.Avail())); } - + if (bytes < 0) { if (WouldBlock()) { ReaderData.Channel->EnableRead(); @@ -425,30 +425,30 @@ namespace NBus { return false; } } - + if (bytes == 0) { ReaderData.Channel->DisableRead(); // TODO: incorrect: it is possible that only input is shutdown, and output is available ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false); return false; } - + ReaderData.Status.Incremental.NetworkOps += 1; - + ReaderData.Buffer.Advance(bytes); ReaderData.MoreBytes = 0; return true; } - + void TRemoteConnection::ClearBeforeSendQueue(EMessageStatus reason) { BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::WriterBeforeWriteErrorMessage, this, std::placeholders::_1, reason)); } - + void TRemoteConnection::ClearReplyQueue(EMessageStatus reason) { TVectorSwaps<TBusMessagePtrAndHeader> replyQueueTemp; Y_ASSERT(replyQueueTemp.empty()); ReplyQueue.DequeueAllSingleConsumer(&replyQueueTemp); - + TVector<TBusMessage*> messages; for (TVectorSwaps<TBusMessagePtrAndHeader>::reverse_iterator message = replyQueueTemp.rbegin(); message != replyQueueTemp.rend(); ++message) { @@ -458,8 +458,8 @@ namespace NBus { WriterErrorMessages(messages, reason); replyQueueTemp.clear(); - } - + } + void TRemoteConnection::ProcessBeforeSendQueueMessage(TBusMessage* message, TInstant now) { // legacy clients expect this field to be set if (!Session->IsSource_) { @@ -482,7 +482,7 @@ namespace NBus { const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() { WriterRotateCounters(); WriterFillStatus(); - + return WriterData.Status; } @@ -496,40 +496,40 @@ namespace NBus { WriterData.Status.SendQueueSize = WriterData.SendQueue.Size(); WriterData.Status.State = WriterData.State; } - + void TRemoteConnection::WriterProcessStatusDown() { Session->GetDeadConnectionWriterStatusQueue()->EnqueueAndSchedule(WriterData.Status.Incremental); Reset(WriterData.Status.Incremental); } - + void TRemoteConnection::ReaderProcessStatusDown() { Session->GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(ReaderData.Status.Incremental); Reset(ReaderData.Status.Incremental); } - + void TRemoteConnection::ProcessWriterDown() { if (!RemovedFromSession) { Session->GetRemoveConnectionQueue()->EnqueueAndSchedule(this); - + if (Session->IsSource_) { if (WriterData.Status.Connected) { FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); } } - + LWPROBE(Disconnected, ToString(PeerAddr)); RemovedFromSession = true; } - + WriterData.DropChannel(); - + DropEnqueuedData(ShutdownReason, MESSAGE_SHUTDOWN); - + WriterProcessStatusDown(); - + WriterData.ShutdownComplete.Signal(); } - + void TRemoteConnection::DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues) { ClearReplyQueue(reasonForQueues); ClearBeforeSendQueue(reasonForQueues); @@ -554,23 +554,23 @@ namespace NBus { void TRemoteConnection::BeforeTryWrite() { } - + void TRemoteConnection::Act(TWriterTag) { TInstant now = TInstant::Now(); - + WriterData.Status.Acts += 1; - + if (Y_UNLIKELY(AtomicGet(WriterData.Down))) { // dump status must work even if WriterDown WriterSendStatus(now, true); ProcessWriterDown(); return; - } - + } + ProcessBeforeSendQueue(now); - + BeforeTryWrite(); - + WriterFillInFlight(); WriterGetReconnectQueue()->DequeueAllLikelyEmpty(); @@ -587,43 +587,43 @@ namespace NBus { if (WriterData.State == WRITER_FILLING) { WriterFillBuffer(); - + if (WriterData.State == WRITER_FILLING) { WriterData.Channel->DisableWrite(); break; } - + Y_ASSERT(!WriterData.Buffer.Empty()); } - + if (WriterData.State == WRITER_FLUSHING) { WriterFlushBuffer(); - + if (WriterData.State == WRITER_FLUSHING) { break; } } } } - + WriterGetWakeQueue()->DequeueAllLikelyEmpty(); - + WriterSendStatus(now); } - + void TRemoteConnection::WriterFlushBuffer() { Y_ASSERT(WriterData.State == WRITER_FLUSHING); Y_ASSERT(!WriterData.Buffer.Empty()); - + WriterData.CorkUntil = TInstant::Zero(); - + while (!WriterData.Buffer.Empty()) { ssize_t bytes; { TWhatThreadDoesPushPop pp("send syscall"); bytes = SocketSend(WriterData.Channel->GetSocket(), TArrayRef<const char>(WriterData.Buffer.LeftPos(), WriterData.Buffer.Size())); } - + if (bytes < 0) { if (WouldBlock()) { WriterData.Channel->EnableWrite(); @@ -634,18 +634,18 @@ namespace NBus { return; } } - + WriterData.Status.Incremental.NetworkOps += 1; - + WriterData.Buffer.LeftProceed(bytes); } - + WriterData.Buffer.Clear(); if (WriterData.Buffer.Capacity() > MaxBufferSize) { WriterData.Status.Incremental.BufferDrops += 1; WriterData.Buffer.Reset(); } - + WriterData.State = WRITER_FILLING; } @@ -655,8 +655,8 @@ namespace NBus { } else { ScheduleShutdown(status); } - } - + } + void TRemoteConnection::ScheduleShutdown(EMessageStatus status) { ShutdownReason = status; @@ -673,20 +673,20 @@ namespace NBus { Y_VERIFY(buffer.Size() >= posForAssertion, "incorrect Serialize implementation, pos before serialize: %d, pos after serialize: %d", int(posForAssertion), int(buffer.Size())); - } - + } + namespace { inline void WriteHeader(const TBusHeader& header, TBuffer& data) { data.Reserve(data.Size() + sizeof(TBusHeader)); /// \todo hton instead of memcpy memcpy(data.Data() + data.Size(), &header, sizeof(TBusHeader)); data.Advance(sizeof(TBusHeader)); - } - + } + inline void WriteDummyHeader(TBuffer& data) { data.Resize(data.Size() + sizeof(TBusHeader)); } - + } void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const { @@ -695,17 +695,17 @@ namespace NBus { size_t dataSize; bool compressionRequested = msg->IsCompressed(); - + if (compressionRequested) { TBuffer compdata; TBuffer plaindata; CallSerialize(msg, plaindata); - + dataSize = sizeof(TBusHeader) + plaindata.Size(); - + NCodecs::TCodecPtr c = Proto->GetTransportCodec(); c->Encode(TStringBuf{plaindata.data(), plaindata.size()}, compdata); - + if (compdata.Size() < plaindata.Size()) { plaindata.Clear(); msg->GetHeader()->Size = sizeof(TBusHeader) + compdata.Size(); @@ -721,21 +721,21 @@ namespace NBus { } else { WriteDummyHeader(*data); CallSerialize(msg, *data); - + dataSize = msg->GetHeader()->Size = data->Size() - pos; - + data->Proceed(pos); WriteHeader(*msg->GetHeader(), *data); data->Proceed(pos + msg->GetHeader()->Size); } - + Y_ASSERT(msg->GetHeader()->Size == data->Size() - pos); counter->AddMessage(dataSize, data->Size() - pos, msg->IsCompressed(), compressionRequested); } - + TBusMessage* TRemoteConnection::DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const { size_t dataSize; - + TBusMessage* message; if (header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL) { TBuffer msg; @@ -751,7 +751,7 @@ namespace NBus { *status = MESSAGE_DECOMPRESS_ERROR; return nullptr; } - + msg.Append(dataRef.data(), sizeof(TBusHeader)); msg.Append(plaindata.Data(), plaindata.Size()); } @@ -774,39 +774,39 @@ namespace NBus { } *message->GetHeader() = *header; } - + messageCounter->AddMessage(dataSize, dataRef.size(), header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL, false); - + return message; } - + void TRemoteConnection::ResetOneWayFlag(TArrayRef<TBusMessage*> messages) { for (auto message : messages) { message->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; } - } - + } + void TRemoteConnection::ReaderFlushMessages() { if (!ReaderData.ReadMessages.empty()) { Session->OnMessageReceived(this, ReaderData.ReadMessages); ReaderData.ReadMessages.clear(); - } - } - + } + } + // @return false if actor should break bool TRemoteConnection::MessageRead(TArrayRef<const char> readDataRef, TInstant now) { TBusHeader header(readDataRef); - + Y_ASSERT(readDataRef.size() == header.Size); - + if (header.GetVersionInternal() != YBUS_VERSION) { ReaderProcessMessageUnknownVersion(readDataRef); return true; } - + EMessageStatus deserializeFailureStatus = MESSAGE_OK; TBusMessage* r = DeserializeMessage(readDataRef, &header, &ReaderData.Status.Incremental.MessageCounter, &deserializeFailureStatus); - + if (!r) { Y_VERIFY(deserializeFailureStatus != MESSAGE_OK, "state check"); LWPROBE(Error, ToString(deserializeFailureStatus), ToString(PeerAddr), ""); @@ -814,16 +814,16 @@ namespace NBus { ScheduleShutdownOnServerOrReconnectOnClient(deserializeFailureStatus, false); return false; } - + LWPROBE(Read, r->GetHeader()->Size); - + r->ReplyTo = PeerAddrSocketAddr; - + TBusMessagePtrAndHeader h(r); r->RecvTime = now; QuotaConsume(1, header.Size); - + ReaderData.ReadMessages.push_back(h); if (ReaderData.ReadMessages.size() >= 100) { ReaderFlushMessages(); @@ -831,12 +831,12 @@ namespace NBus { return true; } - + void TRemoteConnection::WriterFillBuffer() { Y_ASSERT(WriterData.State == WRITER_FILLING); Y_ASSERT(WriterData.Buffer.LeftSize() == 0); - + if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) { TVector<TBusHeader> headers; WrongVersionRequests.DequeueAllSingleConsumer(&headers); @@ -849,12 +849,12 @@ namespace NBus { response.SetVersionInternal(YBUS_VERSION); WriteHeader(response, WriterData.Buffer.GetBuffer()); } - + Y_ASSERT(!WriterData.Buffer.Empty()); WriterData.State = WRITER_FLUSHING; return; } - + TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages; for (;;) { @@ -862,7 +862,7 @@ namespace NBus { if (!writeMessage) { break; } - + if (Config.Cork != TDuration::Zero()) { if (WriterData.CorkUntil == TInstant::Zero()) { WriterData.CorkUntil = TInstant::Now() + Config.Cork; @@ -870,29 +870,29 @@ namespace NBus { } size_t sizeBeforeSerialize = WriterData.Buffer.Size(); - + TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter; - + SerializeMessage(writeMessage.Get(), &WriterData.Buffer.GetBuffer(), &messageCounter); - + size_t written = WriterData.Buffer.Size() - sizeBeforeSerialize; if (written > Config.MaxMessageSize) { WriterData.Buffer.GetBuffer().EraseBack(written); WriterBeforeWriteErrorMessage(writeMessage.Release(), MESSAGE_MESSAGE_TOO_LARGE); continue; } - + WriterData.Status.Incremental.MessageCounter = messageCounter; - + TBusMessagePtrAndHeader h(writeMessage.Release()); writeMessages.GetVector()->push_back(h); - + Y_ASSERT(!WriterData.Buffer.Empty()); if (WriterData.Buffer.Size() >= Config.SendThreshold) { break; } - } - + } + if (!WriterData.Buffer.Empty()) { if (WriterData.Buffer.Size() >= Config.SendThreshold) { WriterData.State = WRITER_FLUSHING; @@ -909,31 +909,31 @@ namespace NBus { // keep filling Y_ASSERT(WriterData.State == WRITER_FILLING); } - + size_t bytes = MessageSize(*writeMessages.GetVector()); - + QuotaReturnSelf(writeMessages.GetVector()->size(), bytes); - + // This is called before `send` syscall inducing latency MessageSent(*writeMessages.GetVector()); - } - + } + size_t TRemoteConnection::MessageSize(TArrayRef<TBusMessagePtrAndHeader> messages) { size_t size = 0; for (const auto& message : messages) size += message.MessagePtr->RequestSize; - + return size; } - + size_t TRemoteConnection::GetInFlight() { return AtomicGet(WriterData.InFlight); - } - + } + size_t TRemoteConnection::GetConnectSyscallsNumForTest() { return WriterData.Status.ConnectSyscalls; - } - + } + void TRemoteConnection::WriterBeforeWriteErrorMessage(TBusMessage* message, EMessageStatus status) { if (Session->IsSource_) { CheckedCast<TRemoteClientSession*>(Session.Get())->ReleaseInFlight({message}); @@ -970,5 +970,5 @@ namespace NBus { return !AtomicGet(WriterData.Down); } - } -} + } +} |