diff options
author | somov <somov@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:47 +0300 |
commit | a5950576e397b1909261050b8c7da16db58f10b1 (patch) | |
tree | 7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library/cpp/messagebus/remote_connection.cpp | |
parent | 81eddc8c0b55990194e112b02d127b87d54164a9 (diff) | |
download | ydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz |
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_connection.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_connection.cpp | 58 |
1 files changed, 29 insertions, 29 deletions
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp index 22932569db..59b58f7797 100644 --- a/library/cpp/messagebus/remote_connection.cpp +++ b/library/cpp/messagebus/remote_connection.cpp @@ -7,9 +7,9 @@ #include "remote_client_session.h" #include "remote_server_session.h" #include "session_impl.h" - + #include <library/cpp/messagebus/actor/what_thread_does.h> - + #include <util/generic/cast.h> #include <util/network/init.h> #include <util/system/atomic.h> @@ -44,7 +44,7 @@ namespace NBus { WriterData.Status.ConnectionId = connectionId; WriterData.Status.PeerAddr = PeerAddr; ReaderData.Status.ConnectionId = connectionId; - + const TInstant now = TInstant::Now(); WriterFillStatus(); @@ -70,7 +70,7 @@ namespace NBus { Y_VERIFY(AtomicGet(Down)); Y_VERIFY(SendQueue.Empty()); } - + bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept { size_t left = Buffer.Size() - Offset; @@ -137,7 +137,7 @@ namespace NBus { void TRemoteConnection::Shutdown(EMessageStatus status) { ScheduleShutdown(status); - + ReaderData.ShutdownComplete.WaitI(); WriterData.ShutdownComplete.WaitI(); } @@ -145,15 +145,15 @@ namespace NBus { void TRemoteConnection::TryConnect() { Y_FAIL("TryConnect is client connection only operation"); } - + void TRemoteConnection::ScheduleRead() { GetReaderActor()->Schedule(); } - + void TRemoteConnection::ScheduleWrite() { GetWriterActor()->Schedule(); } - + void TRemoteConnection::WriterRotateCounters() { if (!WriterData.TimeToRotateCounters.FetchTask()) { return; @@ -383,7 +383,7 @@ namespace NBus { if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) { ReaderData.Status.Incremental.BufferDrops += 1; - + TBuffer temp; // probably should use another constant temp.Reserve(Config.DefaultBufferSize); @@ -391,7 +391,7 @@ namespace NBus { ReaderData.Buffer.Swap(temp); } - + return true; } @@ -406,7 +406,7 @@ namespace NBus { ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2); } } - + Y_ASSERT(ReaderData.Buffer.Avail() > 0); ssize_t bytes; @@ -465,27 +465,27 @@ namespace NBus { if (!Session->IsSource_) { message->SendTime = now.MilliSeconds(); } - + WriterData.SendQueue.PushBack(message); } - + void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) { BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now)); - } - + } + void TRemoteConnection::WriterFillInFlight() { // this is hack for TLoadBalancedProtocol WriterFillStatus(); AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight()); } - + const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() { WriterRotateCounters(); WriterFillStatus(); return WriterData.Status; } - + void TRemoteConnection::WriterFillStatus() { if (!!WriterData.Channel) { WriterData.Status.Fd = WriterData.Channel->GetSocket(); @@ -644,11 +644,11 @@ namespace NBus { if (WriterData.Buffer.Capacity() > MaxBufferSize) { WriterData.Status.Incremental.BufferDrops += 1; WriterData.Buffer.Reset(); - } + } WriterData.State = WRITER_FILLING; } - + void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) { if (Session->IsSource_) { WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion); @@ -662,11 +662,11 @@ namespace NBus { AtomicSet(ReaderData.Down, 1); ScheduleRead(); - + AtomicSet(WriterData.Down, 1); ScheduleWrite(); } - + void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const { size_t posForAssertion = buffer.Size(); Proto->Serialize(msg, buffer); @@ -688,12 +688,12 @@ namespace NBus { } } - + void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const { size_t pos = data->Size(); - + size_t dataSize; - + bool compressionRequested = msg->IsCompressed(); if (compressionRequested) { @@ -821,20 +821,20 @@ namespace NBus { TBusMessagePtrAndHeader h(r); r->RecvTime = now; - + QuotaConsume(1, header.Size); ReaderData.ReadMessages.push_back(h); if (ReaderData.ReadMessages.size() >= 100) { ReaderFlushMessages(); } - + return true; } void TRemoteConnection::WriterFillBuffer() { Y_ASSERT(WriterData.State == WRITER_FILLING); - + Y_ASSERT(WriterData.Buffer.LeftSize() == 0); if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) { @@ -868,7 +868,7 @@ namespace NBus { WriterData.CorkUntil = TInstant::Now() + Config.Cork; } } - + size_t sizeBeforeSerialize = WriterData.Buffer.Size(); TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter; @@ -952,7 +952,7 @@ namespace NBus { void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) { ResetOneWayFlag(ms); - + WriterData.Status.Incremental.StatusCounter[status] += ms.size(); for (auto m : ms) { Session->InvokeOnError(m, status); |