From c2a1af049e9deca890e9923abe64fe6c59060348 Mon Sep 17 00:00:00 2001
From: nga <nga@yandex-team.ru>
Date: Thu, 10 Feb 2022 16:48:09 +0300
Subject: Restoring authorship annotation for <nga@yandex-team.ru>. Commit 2 of
 2.

---
 library/cpp/messagebus/remote_connection.cpp | 312 +++++++++++++--------------
 1 file changed, 156 insertions(+), 156 deletions(-)

(limited to 'library/cpp/messagebus/remote_connection.cpp')

diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp
index 2e14d78eb4..22932569db 100644
--- a/library/cpp/messagebus/remote_connection.cpp
+++ b/library/cpp/messagebus/remote_connection.cpp
@@ -1,11 +1,11 @@
 #include "remote_connection.h"
- 
-#include "key_value_printer.h" 
+
+#include "key_value_printer.h"
 #include "mb_lwtrace.h"
 #include "network.h"
-#include "remote_client_connection.h" 
-#include "remote_client_session.h" 
-#include "remote_server_session.h" 
+#include "remote_client_connection.h"
+#include "remote_client_session.h"
+#include "remote_server_session.h"
 #include "session_impl.h"
 
 #include <library/cpp/messagebus/actor/what_thread_does.h>
@@ -14,12 +14,12 @@
 #include <util/network/init.h>
 #include <util/system/atomic.h>
 
-LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) 
- 
-using namespace NActor; 
-using namespace NBus; 
-using namespace NBus::NPrivate; 
- 
+LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
+
+using namespace NActor;
+using namespace NBus;
+using namespace NBus::NPrivate;
+
 namespace NBus {
     namespace NPrivate {
         TRemoteConnection::TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr)
@@ -46,7 +46,7 @@ namespace NBus {
             ReaderData.Status.ConnectionId = connectionId;
 
             const TInstant now = TInstant::Now();
- 
+
             WriterFillStatus();
 
             GranStatus.Writer.Update(WriterData.Status, now, true);
@@ -56,7 +56,7 @@ namespace NBus {
         TRemoteConnection::~TRemoteConnection() {
             Y_VERIFY(ReplyQueue.IsEmpty());
         }
- 
+
         TRemoteConnection::TWriterData::TWriterData()
             : Down(0)
             , SocketVersion(0)
@@ -65,7 +65,7 @@ namespace NBus {
             , State(WRITER_FILLING)
         {
         }
- 
+
         TRemoteConnection::TWriterData::~TWriterData() {
             Y_VERIFY(AtomicGet(Down));
             Y_VERIFY(SendQueue.Empty());
@@ -76,7 +76,7 @@ namespace NBus {
 
             return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0;
         }
- 
+
         void TRemoteConnection::TWriterData::SetChannel(NEventLoop::TChannelPtr channel) {
             Y_VERIFY(!Channel, "must not have channel");
             Y_VERIFY(Buffer.GetBuffer().Empty() && Buffer.LeftSize() == 0, "buffer must be empty");
@@ -95,11 +95,11 @@ namespace NBus {
                 Channel->Unregister();
                 Channel.Drop();
             }
- 
+
             Buffer.Reset();
             State = WRITER_FILLING;
         }
- 
+
         void TRemoteConnection::TReaderData::DropChannel() {
             // TODO: make Drop call Unregister
             if (!!Channel) {
@@ -109,7 +109,7 @@ namespace NBus {
             Buffer.Reset();
             Offset = 0;
         }
- 
+
         TRemoteConnection::TReaderData::TReaderData()
             : Down(0)
             , SocketVersion(0)
@@ -117,31 +117,31 @@ namespace NBus {
             , MoreBytes(0)
         {
         }
- 
+
         TRemoteConnection::TReaderData::~TReaderData() {
             Y_VERIFY(AtomicGet(Down));
         }
- 
+
         void TRemoteConnection::Send(TNonDestroyingAutoPtr<TBusMessage> msg) {
             BeforeSendQueue.Enqueue(msg.Release());
             AtomicIncrement(WriterData.InFlight);
             ScheduleWrite();
         }
- 
+
         void TRemoteConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {
             if (!reconnect) {
                 // Do not clear send queue if reconnecting
                 WriterData.SendQueue.Clear(&result);
             }
         }
- 
+
         void TRemoteConnection::Shutdown(EMessageStatus status) {
             ScheduleShutdown(status);
 
             ReaderData.ShutdownComplete.WaitI();
             WriterData.ShutdownComplete.WaitI();
         }
- 
+
         void TRemoteConnection::TryConnect() {
             Y_FAIL("TryConnect is client connection only operation");
         }
@@ -158,27 +158,27 @@ namespace NBus {
             if (!WriterData.TimeToRotateCounters.FetchTask()) {
                 return;
             }
- 
+
             WriterData.Status.DurationCounterPrev = WriterData.Status.DurationCounter;
             Reset(WriterData.Status.DurationCounter);
         }
- 
+
         void TRemoteConnection::WriterSendStatus(TInstant now, bool force) {
             GranStatus.Writer.Update(std::bind(&TRemoteConnection::WriterGetStatus, this), now, force);
         }
- 
+
         void TRemoteConnection::ReaderSendStatus(TInstant now, bool force) {
             GranStatus.Reader.Update(std::bind(&TRemoteConnection::ReaderFillStatus, this), now, force);
         }
- 
+
         const TRemoteConnectionReaderStatus& TRemoteConnection::ReaderFillStatus() {
             ReaderData.Status.BufferSize = ReaderData.Buffer.Capacity();
             ReaderData.Status.QuotaMsg = QuotaMsg.Tokens();
             ReaderData.Status.QuotaBytes = QuotaBytes.Tokens();
- 
+
             return ReaderData.Status;
         }
- 
+
         void TRemoteConnection::ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage readSocket) {
             if (AtomicGet(ReaderData.Down)) {
                 ReaderData.Status.Fd = INVALID_SOCKET;
@@ -189,48 +189,48 @@ namespace NBus {
 
             ReaderData.Status.Fd = readSocket.Socket;
             ReaderData.SocketVersion = readSocket.SocketVersion;
- 
+
             if (readSocket.Socket != INVALID_SOCKET) {
                 ReaderData.SetChannel(Session->ReadEventLoop.Register(readSocket.Socket, this, ReadCookie));
                 ReaderData.Channel->EnableRead();
             }
         }
- 
+
         void TRemoteConnection::ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion) {
             Y_VERIFY(socketVersion <= WriterData.SocketVersion, "something weird");
- 
+
             if (WriterData.SocketVersion != socketVersion) {
                 return;
             }
             Y_VERIFY(WriterData.Status.Connected, "must be connected at this point");
             Y_VERIFY(!!WriterData.Channel, "must have channel at this point");
- 
+
             WriterData.Status.Connected = false;
             WriterData.DropChannel();
             WriterData.Status.MyAddr = TNetAddr();
             ++WriterData.SocketVersion;
             LastConnectAttempt = TInstant();
- 
+
             TMessagesPtrs cleared;
             ClearOutgoingQueue(cleared, true);
             WriterErrorMessages(cleared, MESSAGE_DELIVERY_FAILED);
- 
+
             FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED);
- 
+
             ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(INVALID_SOCKET, WriterData.SocketVersion));
         }
- 
+
         void TRemoteConnection::ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags) {
             WriterData.AwakeFlags |= awakeFlags;
- 
+
             ReadQuotaWakeup();
         }
- 
+
         void TRemoteConnection::Act(TReaderTag) {
             TInstant now = TInstant::Now();
- 
+
             ReaderData.Status.Acts += 1;
- 
+
             ReaderGetSocketQueue()->DequeueAllLikelyEmpty();
 
             if (AtomicGet(ReaderData.Down)) {
@@ -238,41 +238,41 @@ namespace NBus {
 
                 ReaderProcessStatusDown();
                 ReaderData.ShutdownComplete.Signal();
- 
+
             } else if (!!ReaderData.Channel) {
                 Y_ASSERT(ReaderData.ReadMessages.empty());
- 
+
                 for (int i = 0;; ++i) {
                     if (i == 100) {
                         // perform other tasks
                         GetReaderActor()->AddTaskFromActorLoop();
                         break;
                     }
- 
+
                     if (NeedInterruptRead()) {
                         ReaderData.Channel->EnableRead();
                         break;
                     }
- 
+
                     if (!ReaderFillBuffer())
                         break;
- 
+
                     if (!ReaderProcessBuffer())
                         break;
                 }
- 
+
                 ReaderFlushMessages();
             }
- 
+
             ReaderSendStatus(now);
-        } 
- 
+        }
+
         bool TRemoteConnection::QuotaAcquire(size_t msg, size_t bytes) {
             ui32 wakeFlags = 0;
- 
+
             if (!QuotaMsg.Acquire(msg))
                 wakeFlags |= WAKE_QUOTA_MSG;
- 
+
             else if (!QuotaBytes.Acquire(bytes))
                 wakeFlags |= WAKE_QUOTA_BYTES;
 
@@ -380,7 +380,7 @@ namespace NBus {
 
             ReaderData.Buffer.ChopHead(ReaderData.Offset);
             ReaderData.Offset = 0;
- 
+
             if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) {
                 ReaderData.Status.Incremental.BufferDrops += 1;
 
@@ -388,7 +388,7 @@ namespace NBus {
                 // probably should use another constant
                 temp.Reserve(Config.DefaultBufferSize);
                 temp.Append(ReaderData.Buffer.Data(), ReaderData.Buffer.Size());
- 
+
                 ReaderData.Buffer.Swap(temp);
             }
 
@@ -398,14 +398,14 @@ namespace NBus {
         bool TRemoteConnection::ReaderFillBuffer() {
             if (!ReaderData.BufferMore())
                 return true;
- 
+
             if (ReaderData.Buffer.Avail() == 0) {
                 if (ReaderData.Buffer.Size() == 0) {
                     ReaderData.Buffer.Reserve(Config.DefaultBufferSize);
                 } else {
                     ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2);
                 }
-            } 
+            }
 
             Y_ASSERT(ReaderData.Buffer.Avail() > 0);
 
@@ -414,7 +414,7 @@ namespace NBus {
                 TWhatThreadDoesPushPop pp("recv syscall");
                 bytes = SocketRecv(ReaderData.Channel->GetSocket(), TArrayRef<char>(ReaderData.Buffer.Pos(), ReaderData.Buffer.Avail()));
             }
- 
+
             if (bytes < 0) {
                 if (WouldBlock()) {
                     ReaderData.Channel->EnableRead();
@@ -425,30 +425,30 @@ namespace NBus {
                     return false;
                 }
             }
- 
+
             if (bytes == 0) {
                 ReaderData.Channel->DisableRead();
                 // TODO: incorrect: it is possible that only input is shutdown, and output is available
                 ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false);
                 return false;
             }
- 
+
             ReaderData.Status.Incremental.NetworkOps += 1;
- 
+
             ReaderData.Buffer.Advance(bytes);
             ReaderData.MoreBytes = 0;
             return true;
         }
- 
+
         void TRemoteConnection::ClearBeforeSendQueue(EMessageStatus reason) {
             BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::WriterBeforeWriteErrorMessage, this, std::placeholders::_1, reason));
         }
- 
+
         void TRemoteConnection::ClearReplyQueue(EMessageStatus reason) {
             TVectorSwaps<TBusMessagePtrAndHeader> replyQueueTemp;
             Y_ASSERT(replyQueueTemp.empty());
             ReplyQueue.DequeueAllSingleConsumer(&replyQueueTemp);
- 
+
             TVector<TBusMessage*> messages;
             for (TVectorSwaps<TBusMessagePtrAndHeader>::reverse_iterator message = replyQueueTemp.rbegin();
                  message != replyQueueTemp.rend(); ++message) {
@@ -458,8 +458,8 @@ namespace NBus {
             WriterErrorMessages(messages, reason);
 
             replyQueueTemp.clear();
-        } 
- 
+        }
+
         void TRemoteConnection::ProcessBeforeSendQueueMessage(TBusMessage* message, TInstant now) {
             // legacy clients expect this field to be set
             if (!Session->IsSource_) {
@@ -482,7 +482,7 @@ namespace NBus {
         const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() {
             WriterRotateCounters();
             WriterFillStatus();
- 
+
             return WriterData.Status;
         }
 
@@ -496,40 +496,40 @@ namespace NBus {
             WriterData.Status.SendQueueSize = WriterData.SendQueue.Size();
             WriterData.Status.State = WriterData.State;
         }
- 
+
         void TRemoteConnection::WriterProcessStatusDown() {
             Session->GetDeadConnectionWriterStatusQueue()->EnqueueAndSchedule(WriterData.Status.Incremental);
             Reset(WriterData.Status.Incremental);
         }
- 
+
         void TRemoteConnection::ReaderProcessStatusDown() {
             Session->GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(ReaderData.Status.Incremental);
             Reset(ReaderData.Status.Incremental);
         }
- 
+
         void TRemoteConnection::ProcessWriterDown() {
             if (!RemovedFromSession) {
                 Session->GetRemoveConnectionQueue()->EnqueueAndSchedule(this);
- 
+
                 if (Session->IsSource_) {
                     if (WriterData.Status.Connected) {
                         FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED);
                     }
                 }
- 
+
                 LWPROBE(Disconnected, ToString(PeerAddr));
                 RemovedFromSession = true;
             }
- 
+
             WriterData.DropChannel();
- 
+
             DropEnqueuedData(ShutdownReason, MESSAGE_SHUTDOWN);
- 
+
             WriterProcessStatusDown();
- 
+
             WriterData.ShutdownComplete.Signal();
         }
- 
+
         void TRemoteConnection::DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues) {
             ClearReplyQueue(reasonForQueues);
             ClearBeforeSendQueue(reasonForQueues);
@@ -554,23 +554,23 @@ namespace NBus {
 
         void TRemoteConnection::BeforeTryWrite() {
         }
- 
+
         void TRemoteConnection::Act(TWriterTag) {
             TInstant now = TInstant::Now();
- 
+
             WriterData.Status.Acts += 1;
- 
+
             if (Y_UNLIKELY(AtomicGet(WriterData.Down))) {
                 // dump status must work even if WriterDown
                 WriterSendStatus(now, true);
                 ProcessWriterDown();
                 return;
-            } 
- 
+            }
+
             ProcessBeforeSendQueue(now);
- 
+
             BeforeTryWrite();
- 
+
             WriterFillInFlight();
 
             WriterGetReconnectQueue()->DequeueAllLikelyEmpty();
@@ -587,43 +587,43 @@ namespace NBus {
 
                     if (WriterData.State == WRITER_FILLING) {
                         WriterFillBuffer();
- 
+
                         if (WriterData.State == WRITER_FILLING) {
                             WriterData.Channel->DisableWrite();
                             break;
                         }
- 
+
                         Y_ASSERT(!WriterData.Buffer.Empty());
                     }
- 
+
                     if (WriterData.State == WRITER_FLUSHING) {
                         WriterFlushBuffer();
- 
+
                         if (WriterData.State == WRITER_FLUSHING) {
                             break;
                         }
                     }
                 }
             }
- 
+
             WriterGetWakeQueue()->DequeueAllLikelyEmpty();
- 
+
             WriterSendStatus(now);
         }
- 
+
         void TRemoteConnection::WriterFlushBuffer() {
             Y_ASSERT(WriterData.State == WRITER_FLUSHING);
             Y_ASSERT(!WriterData.Buffer.Empty());
- 
+
             WriterData.CorkUntil = TInstant::Zero();
- 
+
             while (!WriterData.Buffer.Empty()) {
                 ssize_t bytes;
                 {
                     TWhatThreadDoesPushPop pp("send syscall");
                     bytes = SocketSend(WriterData.Channel->GetSocket(), TArrayRef<const char>(WriterData.Buffer.LeftPos(), WriterData.Buffer.Size()));
                 }
- 
+
                 if (bytes < 0) {
                     if (WouldBlock()) {
                         WriterData.Channel->EnableWrite();
@@ -634,18 +634,18 @@ namespace NBus {
                         return;
                     }
                 }
- 
+
                 WriterData.Status.Incremental.NetworkOps += 1;
- 
+
                 WriterData.Buffer.LeftProceed(bytes);
             }
- 
+
             WriterData.Buffer.Clear();
             if (WriterData.Buffer.Capacity() > MaxBufferSize) {
                 WriterData.Status.Incremental.BufferDrops += 1;
                 WriterData.Buffer.Reset();
             }
- 
+
             WriterData.State = WRITER_FILLING;
         }
 
@@ -655,8 +655,8 @@ namespace NBus {
             } else {
                 ScheduleShutdown(status);
             }
-        } 
- 
+        }
+
         void TRemoteConnection::ScheduleShutdown(EMessageStatus status) {
             ShutdownReason = status;
 
@@ -673,20 +673,20 @@ namespace NBus {
             Y_VERIFY(buffer.Size() >= posForAssertion,
                      "incorrect Serialize implementation, pos before serialize: %d, pos after serialize: %d",
                      int(posForAssertion), int(buffer.Size()));
-        } 
- 
+        }
+
         namespace {
             inline void WriteHeader(const TBusHeader& header, TBuffer& data) {
                 data.Reserve(data.Size() + sizeof(TBusHeader));
                 /// \todo hton instead of memcpy
                 memcpy(data.Data() + data.Size(), &header, sizeof(TBusHeader));
                 data.Advance(sizeof(TBusHeader));
-            } 
- 
+            }
+
             inline void WriteDummyHeader(TBuffer& data) {
                 data.Resize(data.Size() + sizeof(TBusHeader));
             }
- 
+
         }
 
         void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const {
@@ -695,17 +695,17 @@ namespace NBus {
             size_t dataSize;
 
             bool compressionRequested = msg->IsCompressed();
- 
+
             if (compressionRequested) {
                 TBuffer compdata;
                 TBuffer plaindata;
                 CallSerialize(msg, plaindata);
- 
+
                 dataSize = sizeof(TBusHeader) + plaindata.Size();
- 
+
                 NCodecs::TCodecPtr c = Proto->GetTransportCodec();
                 c->Encode(TStringBuf{plaindata.data(), plaindata.size()}, compdata);
- 
+
                 if (compdata.Size() < plaindata.Size()) {
                     plaindata.Clear();
                     msg->GetHeader()->Size = sizeof(TBusHeader) + compdata.Size();
@@ -721,21 +721,21 @@ namespace NBus {
             } else {
                 WriteDummyHeader(*data);
                 CallSerialize(msg, *data);
- 
+
                 dataSize = msg->GetHeader()->Size = data->Size() - pos;
- 
+
                 data->Proceed(pos);
                 WriteHeader(*msg->GetHeader(), *data);
                 data->Proceed(pos + msg->GetHeader()->Size);
             }
- 
+
             Y_ASSERT(msg->GetHeader()->Size == data->Size() - pos);
             counter->AddMessage(dataSize, data->Size() - pos, msg->IsCompressed(), compressionRequested);
         }
- 
+
         TBusMessage* TRemoteConnection::DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const {
             size_t dataSize;
- 
+
             TBusMessage* message;
             if (header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL) {
                 TBuffer msg;
@@ -751,7 +751,7 @@ namespace NBus {
                         *status = MESSAGE_DECOMPRESS_ERROR;
                         return nullptr;
                     }
- 
+
                     msg.Append(dataRef.data(), sizeof(TBusHeader));
                     msg.Append(plaindata.Data(), plaindata.Size());
                 }
@@ -774,39 +774,39 @@ namespace NBus {
                 }
                 *message->GetHeader() = *header;
             }
- 
+
             messageCounter->AddMessage(dataSize, dataRef.size(), header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL, false);
- 
+
             return message;
         }
- 
+
         void TRemoteConnection::ResetOneWayFlag(TArrayRef<TBusMessage*> messages) {
             for (auto message : messages) {
                 message->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
             }
-        } 
- 
+        }
+
         void TRemoteConnection::ReaderFlushMessages() {
             if (!ReaderData.ReadMessages.empty()) {
                 Session->OnMessageReceived(this, ReaderData.ReadMessages);
                 ReaderData.ReadMessages.clear();
-            } 
-        } 
- 
+            }
+        }
+
         // @return false if actor should break
         bool TRemoteConnection::MessageRead(TArrayRef<const char> readDataRef, TInstant now) {
             TBusHeader header(readDataRef);
- 
+
             Y_ASSERT(readDataRef.size() == header.Size);
- 
+
             if (header.GetVersionInternal() != YBUS_VERSION) {
                 ReaderProcessMessageUnknownVersion(readDataRef);
                 return true;
             }
- 
+
             EMessageStatus deserializeFailureStatus = MESSAGE_OK;
             TBusMessage* r = DeserializeMessage(readDataRef, &header, &ReaderData.Status.Incremental.MessageCounter, &deserializeFailureStatus);
- 
+
             if (!r) {
                 Y_VERIFY(deserializeFailureStatus != MESSAGE_OK, "state check");
                 LWPROBE(Error, ToString(deserializeFailureStatus), ToString(PeerAddr), "");
@@ -814,16 +814,16 @@ namespace NBus {
                 ScheduleShutdownOnServerOrReconnectOnClient(deserializeFailureStatus, false);
                 return false;
             }
- 
+
             LWPROBE(Read, r->GetHeader()->Size);
- 
+
             r->ReplyTo = PeerAddrSocketAddr;
- 
+
             TBusMessagePtrAndHeader h(r);
             r->RecvTime = now;
 
             QuotaConsume(1, header.Size);
- 
+
             ReaderData.ReadMessages.push_back(h);
             if (ReaderData.ReadMessages.size() >= 100) {
                 ReaderFlushMessages();
@@ -831,12 +831,12 @@ namespace NBus {
 
             return true;
         }
- 
+
         void TRemoteConnection::WriterFillBuffer() {
             Y_ASSERT(WriterData.State == WRITER_FILLING);
 
             Y_ASSERT(WriterData.Buffer.LeftSize() == 0);
- 
+
             if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) {
                 TVector<TBusHeader> headers;
                 WrongVersionRequests.DequeueAllSingleConsumer(&headers);
@@ -849,12 +849,12 @@ namespace NBus {
                     response.SetVersionInternal(YBUS_VERSION);
                     WriteHeader(response, WriterData.Buffer.GetBuffer());
                 }
- 
+
                 Y_ASSERT(!WriterData.Buffer.Empty());
                 WriterData.State = WRITER_FLUSHING;
                 return;
             }
- 
+
             TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages;
 
             for (;;) {
@@ -862,7 +862,7 @@ namespace NBus {
                 if (!writeMessage) {
                     break;
                 }
- 
+
                 if (Config.Cork != TDuration::Zero()) {
                     if (WriterData.CorkUntil == TInstant::Zero()) {
                         WriterData.CorkUntil = TInstant::Now() + Config.Cork;
@@ -870,29 +870,29 @@ namespace NBus {
                 }
 
                 size_t sizeBeforeSerialize = WriterData.Buffer.Size();
- 
+
                 TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter;
- 
+
                 SerializeMessage(writeMessage.Get(), &WriterData.Buffer.GetBuffer(), &messageCounter);
- 
+
                 size_t written = WriterData.Buffer.Size() - sizeBeforeSerialize;
                 if (written > Config.MaxMessageSize) {
                     WriterData.Buffer.GetBuffer().EraseBack(written);
                     WriterBeforeWriteErrorMessage(writeMessage.Release(), MESSAGE_MESSAGE_TOO_LARGE);
                     continue;
                 }
- 
+
                 WriterData.Status.Incremental.MessageCounter = messageCounter;
- 
+
                 TBusMessagePtrAndHeader h(writeMessage.Release());
                 writeMessages.GetVector()->push_back(h);
- 
+
                 Y_ASSERT(!WriterData.Buffer.Empty());
                 if (WriterData.Buffer.Size() >= Config.SendThreshold) {
                     break;
                 }
-            } 
- 
+            }
+
             if (!WriterData.Buffer.Empty()) {
                 if (WriterData.Buffer.Size() >= Config.SendThreshold) {
                     WriterData.State = WRITER_FLUSHING;
@@ -909,31 +909,31 @@ namespace NBus {
                 // keep filling
                 Y_ASSERT(WriterData.State == WRITER_FILLING);
             }
- 
+
             size_t bytes = MessageSize(*writeMessages.GetVector());
- 
+
             QuotaReturnSelf(writeMessages.GetVector()->size(), bytes);
- 
+
             // This is called before `send` syscall inducing latency
             MessageSent(*writeMessages.GetVector());
-        } 
- 
+        }
+
         size_t TRemoteConnection::MessageSize(TArrayRef<TBusMessagePtrAndHeader> messages) {
             size_t size = 0;
             for (const auto& message : messages)
                 size += message.MessagePtr->RequestSize;
- 
+
             return size;
         }
- 
+
         size_t TRemoteConnection::GetInFlight() {
             return AtomicGet(WriterData.InFlight);
-        } 
- 
+        }
+
         size_t TRemoteConnection::GetConnectSyscallsNumForTest() {
             return WriterData.Status.ConnectSyscalls;
-        } 
- 
+        }
+
         void TRemoteConnection::WriterBeforeWriteErrorMessage(TBusMessage* message, EMessageStatus status) {
             if (Session->IsSource_) {
                 CheckedCast<TRemoteClientSession*>(Session.Get())->ReleaseInFlight({message});
@@ -970,5 +970,5 @@ namespace NBus {
             return !AtomicGet(WriterData.Down);
         }
 
-    } 
-} 
+    }
+}
-- 
cgit v1.2.3