From 72cb13b4aff9bc9cf22e49251bc8fd143f82538f Mon Sep 17 00:00:00 2001
From: Anton Samokhvalov <pg83@yandex.ru>
Date: Thu, 10 Feb 2022 16:45:15 +0300
Subject: Restoring authorship annotation for Anton Samokhvalov
 <pg83@yandex.ru>. Commit 1 of 2.

---
 library/cpp/messagebus/remote_connection.cpp | 1614 +++++++++++++-------------
 1 file changed, 807 insertions(+), 807 deletions(-)

(limited to 'library/cpp/messagebus/remote_connection.cpp')

diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp
index 22932569db..ca4a66f68a 100644
--- a/library/cpp/messagebus/remote_connection.cpp
+++ b/library/cpp/messagebus/remote_connection.cpp
@@ -20,955 +20,955 @@ using namespace NActor;
 using namespace NBus;
 using namespace NBus::NPrivate;
 
-namespace NBus {
-    namespace NPrivate {
-        TRemoteConnection::TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr)
-            : TActor<TRemoteConnection, TWriterTag>(session->Queue->WorkQueue.Get())
-            , TActor<TRemoteConnection, TReaderTag>(session->Queue->WorkQueue.Get())
-            , TScheduleActor<TRemoteConnection, TWriterTag>(&session->Queue->Scheduler)
-            , Session(session)
-            , Proto(session->Proto)
-            , Config(session->Config)
-            , RemovedFromSession(false)
-            , ConnectionId(connectionId)
-            , PeerAddr(addr)
-            , PeerAddrSocketAddr(addr)
-            , CreatedTime(TInstant::Now())
-            , ReturnConnectFailedImmediately(false)
-            , GranStatus(Config.Secret.StatusFlushPeriod)
-            , QuotaMsg(!Session->IsSource_, Config.PerConnectionMaxInFlight, 0)
-            , QuotaBytes(!Session->IsSource_, Config.PerConnectionMaxInFlightBySize, 0)
-            , MaxBufferSize(session->Config.MaxBufferSize)
-            , ShutdownReason(MESSAGE_OK)
-        {
-            WriterData.Status.ConnectionId = connectionId;
-            WriterData.Status.PeerAddr = PeerAddr;
-            ReaderData.Status.ConnectionId = connectionId;
-
-            const TInstant now = TInstant::Now();
-
-            WriterFillStatus();
-
-            GranStatus.Writer.Update(WriterData.Status, now, true);
-            GranStatus.Reader.Update(ReaderData.Status, now, true);
-        }
-
-        TRemoteConnection::~TRemoteConnection() {
-            Y_VERIFY(ReplyQueue.IsEmpty());
-        }
-
-        TRemoteConnection::TWriterData::TWriterData()
-            : Down(0)
-            , SocketVersion(0)
-            , InFlight(0)
-            , AwakeFlags(0)
-            , State(WRITER_FILLING)
-        {
-        }
-
-        TRemoteConnection::TWriterData::~TWriterData() {
+namespace NBus { 
+    namespace NPrivate { 
+        TRemoteConnection::TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr) 
+            : TActor<TRemoteConnection, TWriterTag>(session->Queue->WorkQueue.Get()) 
+            , TActor<TRemoteConnection, TReaderTag>(session->Queue->WorkQueue.Get()) 
+            , TScheduleActor<TRemoteConnection, TWriterTag>(&session->Queue->Scheduler) 
+            , Session(session) 
+            , Proto(session->Proto) 
+            , Config(session->Config) 
+            , RemovedFromSession(false) 
+            , ConnectionId(connectionId) 
+            , PeerAddr(addr) 
+            , PeerAddrSocketAddr(addr) 
+            , CreatedTime(TInstant::Now()) 
+            , ReturnConnectFailedImmediately(false) 
+            , GranStatus(Config.Secret.StatusFlushPeriod) 
+            , QuotaMsg(!Session->IsSource_, Config.PerConnectionMaxInFlight, 0) 
+            , QuotaBytes(!Session->IsSource_, Config.PerConnectionMaxInFlightBySize, 0) 
+            , MaxBufferSize(session->Config.MaxBufferSize) 
+            , ShutdownReason(MESSAGE_OK) 
+        { 
+            WriterData.Status.ConnectionId = connectionId; 
+            WriterData.Status.PeerAddr = PeerAddr; 
+            ReaderData.Status.ConnectionId = connectionId; 
+
+            const TInstant now = TInstant::Now(); 
+
+            WriterFillStatus(); 
+
+            GranStatus.Writer.Update(WriterData.Status, now, true); 
+            GranStatus.Reader.Update(ReaderData.Status, now, true); 
+        } 
+
+        TRemoteConnection::~TRemoteConnection() { 
+            Y_VERIFY(ReplyQueue.IsEmpty()); 
+        } 
+
+        TRemoteConnection::TWriterData::TWriterData() 
+            : Down(0) 
+            , SocketVersion(0) 
+            , InFlight(0) 
+            , AwakeFlags(0) 
+            , State(WRITER_FILLING) 
+        { 
+        } 
+
+        TRemoteConnection::TWriterData::~TWriterData() { 
             Y_VERIFY(AtomicGet(Down));
-            Y_VERIFY(SendQueue.Empty());
-        }
-
-        bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept {
-            size_t left = Buffer.Size() - Offset;
-
-            return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0;
-        }
-
-        void TRemoteConnection::TWriterData::SetChannel(NEventLoop::TChannelPtr channel) {
-            Y_VERIFY(!Channel, "must not have channel");
-            Y_VERIFY(Buffer.GetBuffer().Empty() && Buffer.LeftSize() == 0, "buffer must be empty");
-            Y_VERIFY(State == WRITER_FILLING, "state must be initial");
-            Channel = channel;
-        }
-
-        void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) {
-            Y_VERIFY(!Channel, "must not have channel");
-            Y_VERIFY(Buffer.Empty(), "buffer must be empty");
-            Channel = channel;
-        }
-
-        void TRemoteConnection::TWriterData::DropChannel() {
-            if (!!Channel) {
-                Channel->Unregister();
-                Channel.Drop();
-            }
-
-            Buffer.Reset();
-            State = WRITER_FILLING;
-        }
-
-        void TRemoteConnection::TReaderData::DropChannel() {
-            // TODO: make Drop call Unregister
-            if (!!Channel) {
-                Channel->Unregister();
-                Channel.Drop();
-            }
-            Buffer.Reset();
-            Offset = 0;
-        }
-
-        TRemoteConnection::TReaderData::TReaderData()
-            : Down(0)
-            , SocketVersion(0)
-            , Offset(0)
-            , MoreBytes(0)
-        {
-        }
-
-        TRemoteConnection::TReaderData::~TReaderData() {
+            Y_VERIFY(SendQueue.Empty()); 
+        } 
+
+        bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept { 
+            size_t left = Buffer.Size() - Offset; 
+
+            return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0; 
+        } 
+
+        void TRemoteConnection::TWriterData::SetChannel(NEventLoop::TChannelPtr channel) { 
+            Y_VERIFY(!Channel, "must not have channel"); 
+            Y_VERIFY(Buffer.GetBuffer().Empty() && Buffer.LeftSize() == 0, "buffer must be empty"); 
+            Y_VERIFY(State == WRITER_FILLING, "state must be initial"); 
+            Channel = channel; 
+        } 
+
+        void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) { 
+            Y_VERIFY(!Channel, "must not have channel"); 
+            Y_VERIFY(Buffer.Empty(), "buffer must be empty"); 
+            Channel = channel; 
+        } 
+
+        void TRemoteConnection::TWriterData::DropChannel() { 
+            if (!!Channel) { 
+                Channel->Unregister(); 
+                Channel.Drop(); 
+            } 
+
+            Buffer.Reset(); 
+            State = WRITER_FILLING; 
+        } 
+
+        void TRemoteConnection::TReaderData::DropChannel() { 
+            // TODO: make Drop call Unregister 
+            if (!!Channel) { 
+                Channel->Unregister(); 
+                Channel.Drop(); 
+            } 
+            Buffer.Reset(); 
+            Offset = 0; 
+        } 
+
+        TRemoteConnection::TReaderData::TReaderData() 
+            : Down(0) 
+            , SocketVersion(0) 
+            , Offset(0) 
+            , MoreBytes(0) 
+        { 
+        } 
+
+        TRemoteConnection::TReaderData::~TReaderData() { 
             Y_VERIFY(AtomicGet(Down));
-        }
+        } 
+
+        void TRemoteConnection::Send(TNonDestroyingAutoPtr<TBusMessage> msg) { 
+            BeforeSendQueue.Enqueue(msg.Release()); 
+            AtomicIncrement(WriterData.InFlight); 
+            ScheduleWrite(); 
+        } 
+
+        void TRemoteConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) { 
+            if (!reconnect) { 
+                // Do not clear send queue if reconnecting 
+                WriterData.SendQueue.Clear(&result); 
+            } 
+        } 
 
-        void TRemoteConnection::Send(TNonDestroyingAutoPtr<TBusMessage> msg) {
-            BeforeSendQueue.Enqueue(msg.Release());
-            AtomicIncrement(WriterData.InFlight);
-            ScheduleWrite();
-        }
+        void TRemoteConnection::Shutdown(EMessageStatus status) { 
+            ScheduleShutdown(status); 
 
-        void TRemoteConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {
-            if (!reconnect) {
-                // Do not clear send queue if reconnecting
-                WriterData.SendQueue.Clear(&result);
-            }
-        }
+            ReaderData.ShutdownComplete.WaitI(); 
+            WriterData.ShutdownComplete.WaitI(); 
+        } 
 
-        void TRemoteConnection::Shutdown(EMessageStatus status) {
-            ScheduleShutdown(status);
+        void TRemoteConnection::TryConnect() { 
+            Y_FAIL("TryConnect is client connection only operation"); 
+        } 
 
-            ReaderData.ShutdownComplete.WaitI();
-            WriterData.ShutdownComplete.WaitI();
-        }
+        void TRemoteConnection::ScheduleRead() { 
+            GetReaderActor()->Schedule(); 
+        } 
 
-        void TRemoteConnection::TryConnect() {
-            Y_FAIL("TryConnect is client connection only operation");
-        }
+        void TRemoteConnection::ScheduleWrite() { 
+            GetWriterActor()->Schedule(); 
+        } 
 
-        void TRemoteConnection::ScheduleRead() {
-            GetReaderActor()->Schedule();
-        }
+        void TRemoteConnection::WriterRotateCounters() { 
+            if (!WriterData.TimeToRotateCounters.FetchTask()) { 
+                return; 
+            } 
 
-        void TRemoteConnection::ScheduleWrite() {
-            GetWriterActor()->Schedule();
-        }
+            WriterData.Status.DurationCounterPrev = WriterData.Status.DurationCounter; 
+            Reset(WriterData.Status.DurationCounter); 
+        } 
 
-        void TRemoteConnection::WriterRotateCounters() {
-            if (!WriterData.TimeToRotateCounters.FetchTask()) {
-                return;
-            }
+        void TRemoteConnection::WriterSendStatus(TInstant now, bool force) { 
+            GranStatus.Writer.Update(std::bind(&TRemoteConnection::WriterGetStatus, this), now, force); 
+        } 
 
-            WriterData.Status.DurationCounterPrev = WriterData.Status.DurationCounter;
-            Reset(WriterData.Status.DurationCounter);
-        }
+        void TRemoteConnection::ReaderSendStatus(TInstant now, bool force) { 
+            GranStatus.Reader.Update(std::bind(&TRemoteConnection::ReaderFillStatus, this), now, force); 
+        } 
 
-        void TRemoteConnection::WriterSendStatus(TInstant now, bool force) {
-            GranStatus.Writer.Update(std::bind(&TRemoteConnection::WriterGetStatus, this), now, force);
-        }
+        const TRemoteConnectionReaderStatus& TRemoteConnection::ReaderFillStatus() { 
+            ReaderData.Status.BufferSize = ReaderData.Buffer.Capacity(); 
+            ReaderData.Status.QuotaMsg = QuotaMsg.Tokens(); 
+            ReaderData.Status.QuotaBytes = QuotaBytes.Tokens(); 
 
-        void TRemoteConnection::ReaderSendStatus(TInstant now, bool force) {
-            GranStatus.Reader.Update(std::bind(&TRemoteConnection::ReaderFillStatus, this), now, force);
-        }
-
-        const TRemoteConnectionReaderStatus& TRemoteConnection::ReaderFillStatus() {
-            ReaderData.Status.BufferSize = ReaderData.Buffer.Capacity();
-            ReaderData.Status.QuotaMsg = QuotaMsg.Tokens();
-            ReaderData.Status.QuotaBytes = QuotaBytes.Tokens();
-
-            return ReaderData.Status;
-        }
+            return ReaderData.Status; 
+        } 
 
-        void TRemoteConnection::ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage readSocket) {
-            if (AtomicGet(ReaderData.Down)) {
-                ReaderData.Status.Fd = INVALID_SOCKET;
-                return;
-            }
+        void TRemoteConnection::ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage readSocket) { 
+            if (AtomicGet(ReaderData.Down)) { 
+                ReaderData.Status.Fd = INVALID_SOCKET; 
+                return; 
+            } 
 
-            ReaderData.DropChannel();
+            ReaderData.DropChannel(); 
 
-            ReaderData.Status.Fd = readSocket.Socket;
-            ReaderData.SocketVersion = readSocket.SocketVersion;
+            ReaderData.Status.Fd = readSocket.Socket; 
+            ReaderData.SocketVersion = readSocket.SocketVersion; 
 
-            if (readSocket.Socket != INVALID_SOCKET) {
-                ReaderData.SetChannel(Session->ReadEventLoop.Register(readSocket.Socket, this, ReadCookie));
-                ReaderData.Channel->EnableRead();
-            }
-        }
+            if (readSocket.Socket != INVALID_SOCKET) { 
+                ReaderData.SetChannel(Session->ReadEventLoop.Register(readSocket.Socket, this, ReadCookie)); 
+                ReaderData.Channel->EnableRead(); 
+            } 
+        } 
 
-        void TRemoteConnection::ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion) {
-            Y_VERIFY(socketVersion <= WriterData.SocketVersion, "something weird");
+        void TRemoteConnection::ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion) { 
+            Y_VERIFY(socketVersion <= WriterData.SocketVersion, "something weird"); 
 
-            if (WriterData.SocketVersion != socketVersion) {
-                return;
-            }
-            Y_VERIFY(WriterData.Status.Connected, "must be connected at this point");
-            Y_VERIFY(!!WriterData.Channel, "must have channel at this point");
+            if (WriterData.SocketVersion != socketVersion) { 
+                return; 
+            } 
+            Y_VERIFY(WriterData.Status.Connected, "must be connected at this point"); 
+            Y_VERIFY(!!WriterData.Channel, "must have channel at this point"); 
 
-            WriterData.Status.Connected = false;
-            WriterData.DropChannel();
-            WriterData.Status.MyAddr = TNetAddr();
-            ++WriterData.SocketVersion;
-            LastConnectAttempt = TInstant();
+            WriterData.Status.Connected = false; 
+            WriterData.DropChannel(); 
+            WriterData.Status.MyAddr = TNetAddr(); 
+            ++WriterData.SocketVersion; 
+            LastConnectAttempt = TInstant(); 
 
-            TMessagesPtrs cleared;
-            ClearOutgoingQueue(cleared, true);
-            WriterErrorMessages(cleared, MESSAGE_DELIVERY_FAILED);
+            TMessagesPtrs cleared; 
+            ClearOutgoingQueue(cleared, true); 
+            WriterErrorMessages(cleared, MESSAGE_DELIVERY_FAILED); 
 
-            FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED);
+            FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); 
 
-            ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(INVALID_SOCKET, WriterData.SocketVersion));
-        }
+            ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(INVALID_SOCKET, WriterData.SocketVersion)); 
+        } 
 
-        void TRemoteConnection::ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags) {
-            WriterData.AwakeFlags |= awakeFlags;
+        void TRemoteConnection::ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags) { 
+            WriterData.AwakeFlags |= awakeFlags; 
 
-            ReadQuotaWakeup();
-        }
+            ReadQuotaWakeup(); 
+        } 
 
-        void TRemoteConnection::Act(TReaderTag) {
-            TInstant now = TInstant::Now();
+        void TRemoteConnection::Act(TReaderTag) { 
+            TInstant now = TInstant::Now(); 
 
-            ReaderData.Status.Acts += 1;
+            ReaderData.Status.Acts += 1; 
 
-            ReaderGetSocketQueue()->DequeueAllLikelyEmpty();
+            ReaderGetSocketQueue()->DequeueAllLikelyEmpty(); 
 
-            if (AtomicGet(ReaderData.Down)) {
-                ReaderData.DropChannel();
+            if (AtomicGet(ReaderData.Down)) { 
+                ReaderData.DropChannel(); 
 
-                ReaderProcessStatusDown();
-                ReaderData.ShutdownComplete.Signal();
+                ReaderProcessStatusDown(); 
+                ReaderData.ShutdownComplete.Signal(); 
 
-            } else if (!!ReaderData.Channel) {
-                Y_ASSERT(ReaderData.ReadMessages.empty());
+            } else if (!!ReaderData.Channel) { 
+                Y_ASSERT(ReaderData.ReadMessages.empty()); 
 
-                for (int i = 0;; ++i) {
-                    if (i == 100) {
-                        // perform other tasks
-                        GetReaderActor()->AddTaskFromActorLoop();
-                        break;
-                    }
+                for (int i = 0;; ++i) { 
+                    if (i == 100) { 
+                        // perform other tasks 
+                        GetReaderActor()->AddTaskFromActorLoop(); 
+                        break; 
+                    } 
 
-                    if (NeedInterruptRead()) {
-                        ReaderData.Channel->EnableRead();
-                        break;
-                    }
+                    if (NeedInterruptRead()) { 
+                        ReaderData.Channel->EnableRead(); 
+                        break; 
+                    } 
 
-                    if (!ReaderFillBuffer())
-                        break;
+                    if (!ReaderFillBuffer()) 
+                        break; 
 
-                    if (!ReaderProcessBuffer())
-                        break;
-                }
+                    if (!ReaderProcessBuffer()) 
+                        break; 
+                } 
 
-                ReaderFlushMessages();
+                ReaderFlushMessages(); 
             }
 
-            ReaderSendStatus(now);
+            ReaderSendStatus(now); 
         }
 
-        bool TRemoteConnection::QuotaAcquire(size_t msg, size_t bytes) {
-            ui32 wakeFlags = 0;
+        bool TRemoteConnection::QuotaAcquire(size_t msg, size_t bytes) { 
+            ui32 wakeFlags = 0; 
 
-            if (!QuotaMsg.Acquire(msg))
-                wakeFlags |= WAKE_QUOTA_MSG;
+            if (!QuotaMsg.Acquire(msg)) 
+                wakeFlags |= WAKE_QUOTA_MSG; 
 
-            else if (!QuotaBytes.Acquire(bytes))
-                wakeFlags |= WAKE_QUOTA_BYTES;
+            else if (!QuotaBytes.Acquire(bytes)) 
+                wakeFlags |= WAKE_QUOTA_BYTES; 
 
-            if (wakeFlags) {
-                ReaderData.Status.QuotaExhausted++;
+            if (wakeFlags) { 
+                ReaderData.Status.QuotaExhausted++; 
 
-                WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags);
-            }
+                WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags); 
+            } 
 
-            return wakeFlags == 0;
-        }
+            return wakeFlags == 0; 
+        } 
 
-        void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) {
-            QuotaMsg.Consume(msg);
-            QuotaBytes.Consume(bytes);
-        }
+        void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) { 
+            QuotaMsg.Consume(msg); 
+            QuotaBytes.Consume(bytes); 
+        } 
 
-        void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) {
-            if (QuotaReturnValues(items, bytes))
-                ReadQuotaWakeup();
-        }
+        void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) { 
+            if (QuotaReturnValues(items, bytes)) 
+                ReadQuotaWakeup(); 
+        } 
 
-        void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) {
-            if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down))
-                WriterGetWakeQueue()->EnqueueAndSchedule(0x0);
-        }
+        void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) { 
+            if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down)) 
+                WriterGetWakeQueue()->EnqueueAndSchedule(0x0); 
+        } 
 
-        bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) {
-            bool rMsg = QuotaMsg.Return(items);
-            bool rBytes = QuotaBytes.Return(bytes);
+        bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) { 
+            bool rMsg = QuotaMsg.Return(items); 
+            bool rBytes = QuotaBytes.Return(bytes); 
 
-            return rMsg || rBytes;
-        }
+            return rMsg || rBytes; 
+        } 
 
-        void TRemoteConnection::ReadQuotaWakeup() {
-            const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags();
+        void TRemoteConnection::ReadQuotaWakeup() { 
+            const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags(); 
 
-            if (mask && mask == WriterData.AwakeFlags) {
-                WriterData.Status.ReaderWakeups++;
-                WriterData.AwakeFlags = 0;
+            if (mask && mask == WriterData.AwakeFlags) { 
+                WriterData.Status.ReaderWakeups++; 
+                WriterData.AwakeFlags = 0; 
 
-                ScheduleRead();
-            }
-        }
+                ScheduleRead(); 
+            } 
+        } 
 
-        ui32 TRemoteConnection::WriteWakeFlags() const {
-            ui32 awakeFlags = 0;
+        ui32 TRemoteConnection::WriteWakeFlags() const { 
+            ui32 awakeFlags = 0; 
 
-            if (QuotaMsg.IsAboveWake())
-                awakeFlags |= WAKE_QUOTA_MSG;
+            if (QuotaMsg.IsAboveWake()) 
+                awakeFlags |= WAKE_QUOTA_MSG; 
 
-            if (QuotaBytes.IsAboveWake())
-                awakeFlags |= WAKE_QUOTA_BYTES;
+            if (QuotaBytes.IsAboveWake()) 
+                awakeFlags |= WAKE_QUOTA_BYTES; 
 
-            return awakeFlags;
-        }
+            return awakeFlags; 
+        } 
 
-        bool TRemoteConnection::ReaderProcessBuffer() {
-            TInstant now = TInstant::Now();
-
-            for (;;) {
-                if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) {
-                    break;
-                }
-
-                TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset));
-
-                if (header.Size < sizeof(TBusHeader)) {
-                    LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
-                    ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
-                    ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
-                    return false;
-                }
-
-                if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) {
-                    LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
-                    ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
-                    ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
-                    return false;
-                }
-
-                if (header.Size > Config.MaxMessageSize) {
-                    LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size));
-                    ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1;
-                    ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false);
-                    return false;
-                }
-
-                if (!ReaderData.HasBytesInBuf(header.Size)) {
-                    if (ReaderData.Offset == 0) {
-                        ReaderData.Buffer.Reserve(header.Size);
-                    }
-                    break;
-                }
-
-                if (!QuotaAcquire(1, header.Size))
-                    return false;
-
-                if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) {
-                    return false;
-                }
-
-                ReaderData.Offset += header.Size;
-            }
+        bool TRemoteConnection::ReaderProcessBuffer() { 
+            TInstant now = TInstant::Now(); 
 
-            ReaderData.Buffer.ChopHead(ReaderData.Offset);
-            ReaderData.Offset = 0;
+            for (;;) { 
+                if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) { 
+                    break; 
+                } 
 
-            if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) {
-                ReaderData.Status.Incremental.BufferDrops += 1;
+                TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset)); 
 
-                TBuffer temp;
-                // probably should use another constant
-                temp.Reserve(Config.DefaultBufferSize);
-                temp.Append(ReaderData.Buffer.Data(), ReaderData.Buffer.Size());
+                if (header.Size < sizeof(TBusHeader)) { 
+                    LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); 
+                    ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; 
+                    ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); 
+                    return false; 
+                } 
 
-                ReaderData.Buffer.Swap(temp);
-            }
+                if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) { 
+                    LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); 
+                    ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; 
+                    ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); 
+                    return false; 
+                } 
 
-            return true;
-        }
+                if (header.Size > Config.MaxMessageSize) { 
+                    LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size)); 
+                    ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1; 
+                    ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false); 
+                    return false; 
+                } 
 
-        bool TRemoteConnection::ReaderFillBuffer() {
-            if (!ReaderData.BufferMore())
-                return true;
+                if (!ReaderData.HasBytesInBuf(header.Size)) { 
+                    if (ReaderData.Offset == 0) { 
+                        ReaderData.Buffer.Reserve(header.Size); 
+                    } 
+                    break; 
+                } 
 
-            if (ReaderData.Buffer.Avail() == 0) {
-                if (ReaderData.Buffer.Size() == 0) {
-                    ReaderData.Buffer.Reserve(Config.DefaultBufferSize);
-                } else {
-                    ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2);
-                }
-            }
+                if (!QuotaAcquire(1, header.Size)) 
+                    return false; 
 
-            Y_ASSERT(ReaderData.Buffer.Avail() > 0);
+                if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) { 
+                    return false; 
+                } 
 
-            ssize_t bytes;
-            {
-                TWhatThreadDoesPushPop pp("recv syscall");
-                bytes = SocketRecv(ReaderData.Channel->GetSocket(), TArrayRef<char>(ReaderData.Buffer.Pos(), ReaderData.Buffer.Avail()));
-            }
+                ReaderData.Offset += header.Size; 
+            } 
 
-            if (bytes < 0) {
-                if (WouldBlock()) {
-                    ReaderData.Channel->EnableRead();
-                    return false;
-                } else {
-                    ReaderData.Channel->DisableRead();
-                    ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false);
-                    return false;
-                }
-            }
+            ReaderData.Buffer.ChopHead(ReaderData.Offset); 
+            ReaderData.Offset = 0; 
 
-            if (bytes == 0) {
-                ReaderData.Channel->DisableRead();
-                // TODO: incorrect: it is possible that only input is shutdown, and output is available
-                ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false);
-                return false;
-            }
+            if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) { 
+                ReaderData.Status.Incremental.BufferDrops += 1; 
 
-            ReaderData.Status.Incremental.NetworkOps += 1;
+                TBuffer temp; 
+                // probably should use another constant 
+                temp.Reserve(Config.DefaultBufferSize); 
+                temp.Append(ReaderData.Buffer.Data(), ReaderData.Buffer.Size()); 
 
-            ReaderData.Buffer.Advance(bytes);
-            ReaderData.MoreBytes = 0;
-            return true;
-        }
+                ReaderData.Buffer.Swap(temp); 
+            } 
 
-        void TRemoteConnection::ClearBeforeSendQueue(EMessageStatus reason) {
-            BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::WriterBeforeWriteErrorMessage, this, std::placeholders::_1, reason));
+            return true; 
         }
 
-        void TRemoteConnection::ClearReplyQueue(EMessageStatus reason) {
-            TVectorSwaps<TBusMessagePtrAndHeader> replyQueueTemp;
-            Y_ASSERT(replyQueueTemp.empty());
-            ReplyQueue.DequeueAllSingleConsumer(&replyQueueTemp);
+        bool TRemoteConnection::ReaderFillBuffer() { 
+            if (!ReaderData.BufferMore()) 
+                return true; 
 
-            TVector<TBusMessage*> messages;
-            for (TVectorSwaps<TBusMessagePtrAndHeader>::reverse_iterator message = replyQueueTemp.rbegin();
-                 message != replyQueueTemp.rend(); ++message) {
-                messages.push_back(message->MessagePtr.Release());
+            if (ReaderData.Buffer.Avail() == 0) { 
+                if (ReaderData.Buffer.Size() == 0) { 
+                    ReaderData.Buffer.Reserve(Config.DefaultBufferSize); 
+                } else { 
+                    ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2); 
+                } 
             }
 
-            WriterErrorMessages(messages, reason);
+            Y_ASSERT(ReaderData.Buffer.Avail() > 0); 
 
-            replyQueueTemp.clear();
-        }
+            ssize_t bytes; 
+            { 
+                TWhatThreadDoesPushPop pp("recv syscall"); 
+                bytes = SocketRecv(ReaderData.Channel->GetSocket(), TArrayRef<char>(ReaderData.Buffer.Pos(), ReaderData.Buffer.Avail()));
+            } 
+
+            if (bytes < 0) { 
+                if (WouldBlock()) { 
+                    ReaderData.Channel->EnableRead(); 
+                    return false; 
+                } else { 
+                    ReaderData.Channel->DisableRead(); 
+                    ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false); 
+                    return false; 
+                } 
+            } 
+
+            if (bytes == 0) { 
+                ReaderData.Channel->DisableRead(); 
+                // TODO: incorrect: it is possible that only input is shutdown, and output is available 
+                ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false); 
+                return false; 
+            } 
+
+            ReaderData.Status.Incremental.NetworkOps += 1; 
+
+            ReaderData.Buffer.Advance(bytes); 
+            ReaderData.MoreBytes = 0; 
+            return true; 
+        } 
+
+        void TRemoteConnection::ClearBeforeSendQueue(EMessageStatus reason) { 
+            BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::WriterBeforeWriteErrorMessage, this, std::placeholders::_1, reason)); 
+        } 
+
+        void TRemoteConnection::ClearReplyQueue(EMessageStatus reason) { 
+            TVectorSwaps<TBusMessagePtrAndHeader> replyQueueTemp; 
+            Y_ASSERT(replyQueueTemp.empty()); 
+            ReplyQueue.DequeueAllSingleConsumer(&replyQueueTemp); 
+
+            TVector<TBusMessage*> messages; 
+            for (TVectorSwaps<TBusMessagePtrAndHeader>::reverse_iterator message = replyQueueTemp.rbegin(); 
+                 message != replyQueueTemp.rend(); ++message) { 
+                messages.push_back(message->MessagePtr.Release()); 
+            } 
+
+            WriterErrorMessages(messages, reason); 
+ 
+            replyQueueTemp.clear(); 
+        }
+
+        void TRemoteConnection::ProcessBeforeSendQueueMessage(TBusMessage* message, TInstant now) { 
+            // legacy clients expect this field to be set 
+            if (!Session->IsSource_) { 
+                message->SendTime = now.MilliSeconds(); 
+            } 
+
+            WriterData.SendQueue.PushBack(message); 
+        } 
+
+        void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) { 
+            BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now)); 
+        }
+
+        void TRemoteConnection::WriterFillInFlight() { 
+            // this is hack for TLoadBalancedProtocol 
+            WriterFillStatus(); 
+            AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight()); 
+        } 
+
+        const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() { 
+            WriterRotateCounters(); 
+            WriterFillStatus(); 
+
+            return WriterData.Status; 
+        } 
+
+        void TRemoteConnection::WriterFillStatus() { 
+            if (!!WriterData.Channel) { 
+                WriterData.Status.Fd = WriterData.Channel->GetSocket(); 
+            } else { 
+                WriterData.Status.Fd = INVALID_SOCKET; 
+            } 
+            WriterData.Status.BufferSize = WriterData.Buffer.Capacity(); 
+            WriterData.Status.SendQueueSize = WriterData.SendQueue.Size(); 
+            WriterData.Status.State = WriterData.State; 
+        } 
+
+        void TRemoteConnection::WriterProcessStatusDown() { 
+            Session->GetDeadConnectionWriterStatusQueue()->EnqueueAndSchedule(WriterData.Status.Incremental); 
+            Reset(WriterData.Status.Incremental); 
+        } 
+
+        void TRemoteConnection::ReaderProcessStatusDown() { 
+            Session->GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(ReaderData.Status.Incremental); 
+            Reset(ReaderData.Status.Incremental); 
+        } 
+
+        void TRemoteConnection::ProcessWriterDown() { 
+            if (!RemovedFromSession) { 
+                Session->GetRemoveConnectionQueue()->EnqueueAndSchedule(this); 
+
+                if (Session->IsSource_) { 
+                    if (WriterData.Status.Connected) { 
+                        FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); 
+                    } 
+                } 
+
+                LWPROBE(Disconnected, ToString(PeerAddr)); 
+                RemovedFromSession = true; 
+            } 
+
+            WriterData.DropChannel(); 
 
-        void TRemoteConnection::ProcessBeforeSendQueueMessage(TBusMessage* message, TInstant now) {
-            // legacy clients expect this field to be set
-            if (!Session->IsSource_) {
-                message->SendTime = now.MilliSeconds();
-            }
+            DropEnqueuedData(ShutdownReason, MESSAGE_SHUTDOWN); 
 
-            WriterData.SendQueue.PushBack(message);
-        }
+            WriterProcessStatusDown(); 
 
-        void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) {
-            BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now));
-        }
+            WriterData.ShutdownComplete.Signal(); 
+        } 
 
-        void TRemoteConnection::WriterFillInFlight() {
-            // this is hack for TLoadBalancedProtocol
-            WriterFillStatus();
-            AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight());
-        }
+        void TRemoteConnection::DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues) { 
+            ClearReplyQueue(reasonForQueues); 
+            ClearBeforeSendQueue(reasonForQueues); 
+            WriterGetReconnectQueue()->Clear(); 
+            WriterGetWakeQueue()->Clear(); 
 
-        const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() {
-            WriterRotateCounters();
-            WriterFillStatus();
+            TMessagesPtrs cleared; 
+            ClearOutgoingQueue(cleared, false); 
 
-            return WriterData.Status;
-        }
+            if (!Session->IsSource_) { 
+                for (auto& i : cleared) { 
+                    TBusMessagePtrAndHeader h(i); 
+                    CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1)); 
+                    // assignment back is weird 
+                    i = h.MessagePtr.Release(); 
+                    // and this part is not batch 
+                } 
+            } 
 
-        void TRemoteConnection::WriterFillStatus() {
-            if (!!WriterData.Channel) {
-                WriterData.Status.Fd = WriterData.Channel->GetSocket();
-            } else {
-                WriterData.Status.Fd = INVALID_SOCKET;
-            }
-            WriterData.Status.BufferSize = WriterData.Buffer.Capacity();
-            WriterData.Status.SendQueueSize = WriterData.SendQueue.Size();
-            WriterData.Status.State = WriterData.State;
-        }
+            WriterErrorMessages(cleared, reason); 
+        } 
 
-        void TRemoteConnection::WriterProcessStatusDown() {
-            Session->GetDeadConnectionWriterStatusQueue()->EnqueueAndSchedule(WriterData.Status.Incremental);
-            Reset(WriterData.Status.Incremental);
-        }
+        void TRemoteConnection::BeforeTryWrite() { 
+        } 
 
-        void TRemoteConnection::ReaderProcessStatusDown() {
-            Session->GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(ReaderData.Status.Incremental);
-            Reset(ReaderData.Status.Incremental);
-        }
+        void TRemoteConnection::Act(TWriterTag) { 
+            TInstant now = TInstant::Now(); 
 
-        void TRemoteConnection::ProcessWriterDown() {
-            if (!RemovedFromSession) {
-                Session->GetRemoveConnectionQueue()->EnqueueAndSchedule(this);
+            WriterData.Status.Acts += 1; 
 
-                if (Session->IsSource_) {
-                    if (WriterData.Status.Connected) {
-                        FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED);
-                    }
-                }
-
-                LWPROBE(Disconnected, ToString(PeerAddr));
-                RemovedFromSession = true;
+            if (Y_UNLIKELY(AtomicGet(WriterData.Down))) { 
+                // dump status must work even if WriterDown 
+                WriterSendStatus(now, true); 
+                ProcessWriterDown(); 
+                return; 
             }
 
-            WriterData.DropChannel();
+            ProcessBeforeSendQueue(now); 
 
-            DropEnqueuedData(ShutdownReason, MESSAGE_SHUTDOWN);
+            BeforeTryWrite(); 
 
-            WriterProcessStatusDown();
+            WriterFillInFlight(); 
 
-            WriterData.ShutdownComplete.Signal();
-        }
+            WriterGetReconnectQueue()->DequeueAllLikelyEmpty(); 
 
-        void TRemoteConnection::DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues) {
-            ClearReplyQueue(reasonForQueues);
-            ClearBeforeSendQueue(reasonForQueues);
-            WriterGetReconnectQueue()->Clear();
-            WriterGetWakeQueue()->Clear();
-
-            TMessagesPtrs cleared;
-            ClearOutgoingQueue(cleared, false);
-
-            if (!Session->IsSource_) {
-                for (auto& i : cleared) {
-                    TBusMessagePtrAndHeader h(i);
-                    CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1));
-                    // assignment back is weird
-                    i = h.MessagePtr.Release();
-                    // and this part is not batch
-                }
-            }
+            if (!WriterData.Status.Connected) { 
+                TryConnect(); 
+            } else { 
+                for (int i = 0;; ++i) { 
+                    if (i == 100) { 
+                        // perform other tasks 
+                        GetWriterActor()->AddTaskFromActorLoop(); 
+                        break; 
+                    } 
 
-            WriterErrorMessages(cleared, reason);
-        }
+                    if (WriterData.State == WRITER_FILLING) { 
+                        WriterFillBuffer(); 
 
-        void TRemoteConnection::BeforeTryWrite() {
-        }
-
-        void TRemoteConnection::Act(TWriterTag) {
-            TInstant now = TInstant::Now();
-
-            WriterData.Status.Acts += 1;
-
-            if (Y_UNLIKELY(AtomicGet(WriterData.Down))) {
-                // dump status must work even if WriterDown
-                WriterSendStatus(now, true);
-                ProcessWriterDown();
-                return;
-            }
+                        if (WriterData.State == WRITER_FILLING) { 
+                            WriterData.Channel->DisableWrite(); 
+                            break; 
+                        } 
 
-            ProcessBeforeSendQueue(now);
+                        Y_ASSERT(!WriterData.Buffer.Empty()); 
+                    } 
 
-            BeforeTryWrite();
+                    if (WriterData.State == WRITER_FLUSHING) { 
+                        WriterFlushBuffer(); 
 
-            WriterFillInFlight();
+                        if (WriterData.State == WRITER_FLUSHING) { 
+                            break; 
+                        } 
+                    } 
+                } 
+            } 
 
-            WriterGetReconnectQueue()->DequeueAllLikelyEmpty();
+            WriterGetWakeQueue()->DequeueAllLikelyEmpty(); 
 
-            if (!WriterData.Status.Connected) {
-                TryConnect();
-            } else {
-                for (int i = 0;; ++i) {
-                    if (i == 100) {
-                        // perform other tasks
-                        GetWriterActor()->AddTaskFromActorLoop();
-                        break;
-                    }
+            WriterSendStatus(now); 
+        } 
 
-                    if (WriterData.State == WRITER_FILLING) {
-                        WriterFillBuffer();
-
-                        if (WriterData.State == WRITER_FILLING) {
-                            WriterData.Channel->DisableWrite();
-                            break;
-                        }
-
-                        Y_ASSERT(!WriterData.Buffer.Empty());
-                    }
-
-                    if (WriterData.State == WRITER_FLUSHING) {
-                        WriterFlushBuffer();
-
-                        if (WriterData.State == WRITER_FLUSHING) {
-                            break;
-                        }
-                    }
-                }
-            }
+        void TRemoteConnection::WriterFlushBuffer() { 
+            Y_ASSERT(WriterData.State == WRITER_FLUSHING); 
+            Y_ASSERT(!WriterData.Buffer.Empty()); 
 
-            WriterGetWakeQueue()->DequeueAllLikelyEmpty();
+            WriterData.CorkUntil = TInstant::Zero(); 
 
-            WriterSendStatus(now);
-        }
+            while (!WriterData.Buffer.Empty()) { 
+                ssize_t bytes; 
+                { 
+                    TWhatThreadDoesPushPop pp("send syscall"); 
+                    bytes = SocketSend(WriterData.Channel->GetSocket(), TArrayRef<const char>(WriterData.Buffer.LeftPos(), WriterData.Buffer.Size()));
+                } 
 
-        void TRemoteConnection::WriterFlushBuffer() {
-            Y_ASSERT(WriterData.State == WRITER_FLUSHING);
-            Y_ASSERT(!WriterData.Buffer.Empty());
+                if (bytes < 0) { 
+                    if (WouldBlock()) { 
+                        WriterData.Channel->EnableWrite(); 
+                        return; 
+                    } else { 
+                        WriterData.Channel->DisableWrite(); 
+                        ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, true); 
+                        return; 
+                    } 
+                } 
 
-            WriterData.CorkUntil = TInstant::Zero();
+                WriterData.Status.Incremental.NetworkOps += 1; 
 
-            while (!WriterData.Buffer.Empty()) {
-                ssize_t bytes;
-                {
-                    TWhatThreadDoesPushPop pp("send syscall");
-                    bytes = SocketSend(WriterData.Channel->GetSocket(), TArrayRef<const char>(WriterData.Buffer.LeftPos(), WriterData.Buffer.Size()));
-                }
-
-                if (bytes < 0) {
-                    if (WouldBlock()) {
-                        WriterData.Channel->EnableWrite();
-                        return;
-                    } else {
-                        WriterData.Channel->DisableWrite();
-                        ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, true);
-                        return;
-                    }
-                }
-
-                WriterData.Status.Incremental.NetworkOps += 1;
-
-                WriterData.Buffer.LeftProceed(bytes);
+                WriterData.Buffer.LeftProceed(bytes); 
             }
 
-            WriterData.Buffer.Clear();
-            if (WriterData.Buffer.Capacity() > MaxBufferSize) {
-                WriterData.Status.Incremental.BufferDrops += 1;
-                WriterData.Buffer.Reset();
+            WriterData.Buffer.Clear(); 
+            if (WriterData.Buffer.Capacity() > MaxBufferSize) { 
+                WriterData.Status.Incremental.BufferDrops += 1; 
+                WriterData.Buffer.Reset(); 
             }
 
-            WriterData.State = WRITER_FILLING;
-        }
+            WriterData.State = WRITER_FILLING; 
+        } 
 
-        void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) {
-            if (Session->IsSource_) {
-                WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion);
-            } else {
-                ScheduleShutdown(status);
+        void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) { 
+            if (Session->IsSource_) { 
+                WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion); 
+            } else { 
+                ScheduleShutdown(status); 
             }
         }
 
-        void TRemoteConnection::ScheduleShutdown(EMessageStatus status) {
-            ShutdownReason = status;
+        void TRemoteConnection::ScheduleShutdown(EMessageStatus status) { 
+            ShutdownReason = status; 
 
-            AtomicSet(ReaderData.Down, 1);
-            ScheduleRead();
+            AtomicSet(ReaderData.Down, 1); 
+            ScheduleRead(); 
 
-            AtomicSet(WriterData.Down, 1);
-            ScheduleWrite();
-        }
+            AtomicSet(WriterData.Down, 1); 
+            ScheduleWrite(); 
+        } 
 
-        void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const {
-            size_t posForAssertion = buffer.Size();
-            Proto->Serialize(msg, buffer);
-            Y_VERIFY(buffer.Size() >= posForAssertion,
-                     "incorrect Serialize implementation, pos before serialize: %d, pos after serialize: %d",
-                     int(posForAssertion), int(buffer.Size()));
+        void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const { 
+            size_t posForAssertion = buffer.Size(); 
+            Proto->Serialize(msg, buffer); 
+            Y_VERIFY(buffer.Size() >= posForAssertion, 
+                     "incorrect Serialize implementation, pos before serialize: %d, pos after serialize: %d", 
+                     int(posForAssertion), int(buffer.Size())); 
         }
 
-        namespace {
+        namespace { 
             inline void WriteHeader(const TBusHeader& header, TBuffer& data) {
-                data.Reserve(data.Size() + sizeof(TBusHeader));
-                /// \todo hton instead of memcpy
-                memcpy(data.Data() + data.Size(), &header, sizeof(TBusHeader));
-                data.Advance(sizeof(TBusHeader));
+                data.Reserve(data.Size() + sizeof(TBusHeader)); 
+                /// \todo hton instead of memcpy 
+                memcpy(data.Data() + data.Size(), &header, sizeof(TBusHeader)); 
+                data.Advance(sizeof(TBusHeader)); 
             }
 
             inline void WriteDummyHeader(TBuffer& data) {
-                data.Resize(data.Size() + sizeof(TBusHeader));
-            }
+                data.Resize(data.Size() + sizeof(TBusHeader)); 
+            } 
 
-        }
+        } 
 
-        void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const {
-            size_t pos = data->Size();
+        void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const { 
+            size_t pos = data->Size(); 
 
-            size_t dataSize;
+            size_t dataSize; 
 
-            bool compressionRequested = msg->IsCompressed();
+            bool compressionRequested = msg->IsCompressed(); 
 
-            if (compressionRequested) {
-                TBuffer compdata;
-                TBuffer plaindata;
-                CallSerialize(msg, plaindata);
+            if (compressionRequested) { 
+                TBuffer compdata; 
+                TBuffer plaindata; 
+                CallSerialize(msg, plaindata); 
 
-                dataSize = sizeof(TBusHeader) + plaindata.Size();
+                dataSize = sizeof(TBusHeader) + plaindata.Size(); 
 
-                NCodecs::TCodecPtr c = Proto->GetTransportCodec();
+                NCodecs::TCodecPtr c = Proto->GetTransportCodec(); 
                 c->Encode(TStringBuf{plaindata.data(), plaindata.size()}, compdata);
 
-                if (compdata.Size() < plaindata.Size()) {
-                    plaindata.Clear();
-                    msg->GetHeader()->Size = sizeof(TBusHeader) + compdata.Size();
-                    WriteHeader(*msg->GetHeader(), *data);
-                    data->Append(compdata.Data(), compdata.Size());
-                } else {
-                    compdata.Clear();
-                    msg->SetCompressed(false);
-                    msg->GetHeader()->Size = sizeof(TBusHeader) + plaindata.Size();
-                    WriteHeader(*msg->GetHeader(), *data);
-                    data->Append(plaindata.Data(), plaindata.Size());
-                }
-            } else {
-                WriteDummyHeader(*data);
-                CallSerialize(msg, *data);
-
-                dataSize = msg->GetHeader()->Size = data->Size() - pos;
-
-                data->Proceed(pos);
-                WriteHeader(*msg->GetHeader(), *data);
-                data->Proceed(pos + msg->GetHeader()->Size);
-            }
+                if (compdata.Size() < plaindata.Size()) { 
+                    plaindata.Clear(); 
+                    msg->GetHeader()->Size = sizeof(TBusHeader) + compdata.Size(); 
+                    WriteHeader(*msg->GetHeader(), *data); 
+                    data->Append(compdata.Data(), compdata.Size()); 
+                } else { 
+                    compdata.Clear(); 
+                    msg->SetCompressed(false); 
+                    msg->GetHeader()->Size = sizeof(TBusHeader) + plaindata.Size(); 
+                    WriteHeader(*msg->GetHeader(), *data); 
+                    data->Append(plaindata.Data(), plaindata.Size()); 
+                } 
+            } else { 
+                WriteDummyHeader(*data); 
+                CallSerialize(msg, *data); 
+
+                dataSize = msg->GetHeader()->Size = data->Size() - pos; 
+
+                data->Proceed(pos); 
+                WriteHeader(*msg->GetHeader(), *data); 
+                data->Proceed(pos + msg->GetHeader()->Size); 
+            } 
+
+            Y_ASSERT(msg->GetHeader()->Size == data->Size() - pos); 
+            counter->AddMessage(dataSize, data->Size() - pos, msg->IsCompressed(), compressionRequested); 
+        } 
+
+        TBusMessage* TRemoteConnection::DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const { 
+            size_t dataSize; 
+
+            TBusMessage* message; 
+            if (header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL) { 
+                TBuffer msg; 
+                { 
+                    TBuffer plaindata; 
+                    NCodecs::TCodecPtr c = Proto->GetTransportCodec(); 
+                    try { 
+                        TArrayRef<const char> payload = TBusMessage::GetPayload(dataRef); 
+                        c->Decode(TStringBuf{payload.data(), payload.size()}, plaindata); 
+                    } catch (...) { 
+                        // catch all, because 
+                        // http://nga.at.yandex-team.ru/replies.xml?item_no=3884 
+                        *status = MESSAGE_DECOMPRESS_ERROR; 
+                        return nullptr; 
+                    } 
+
+                    msg.Append(dataRef.data(), sizeof(TBusHeader)); 
+                    msg.Append(plaindata.Data(), plaindata.Size()); 
+                } 
+                TArrayRef<const char> msgRef(msg.Data(), msg.Size()); 
+                dataSize = sizeof(TBusHeader) + msgRef.size(); 
+                // TODO: track error types 
+                message = Proto->Deserialize(header->Type, msgRef.Slice(sizeof(TBusHeader))).Release(); 
+                if (!message) { 
+                    *status = MESSAGE_DESERIALIZE_ERROR; 
+                    return nullptr; 
+                } 
+                *message->GetHeader() = *header; 
+                message->SetCompressed(true); 
+            } else { 
+                dataSize = dataRef.size(); 
+                message = Proto->Deserialize(header->Type, dataRef.Slice(sizeof(TBusHeader))).Release(); 
+                if (!message) { 
+                    *status = MESSAGE_DESERIALIZE_ERROR; 
+                    return nullptr; 
+                } 
+                *message->GetHeader() = *header; 
+            } 
+
+            messageCounter->AddMessage(dataSize, dataRef.size(), header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL, false); 
+
+            return message; 
+        } 
+
+        void TRemoteConnection::ResetOneWayFlag(TArrayRef<TBusMessage*> messages) { 
+            for (auto message : messages) { 
+                message->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; 
+            } 
+        }
+
+        void TRemoteConnection::ReaderFlushMessages() { 
+            if (!ReaderData.ReadMessages.empty()) { 
+                Session->OnMessageReceived(this, ReaderData.ReadMessages); 
+                ReaderData.ReadMessages.clear(); 
+            }
+        }
+
+        // @return false if actor should break 
+        bool TRemoteConnection::MessageRead(TArrayRef<const char> readDataRef, TInstant now) { 
+            TBusHeader header(readDataRef); 
+
+            Y_ASSERT(readDataRef.size() == header.Size); 
+
+            if (header.GetVersionInternal() != YBUS_VERSION) { 
+                ReaderProcessMessageUnknownVersion(readDataRef); 
+                return true; 
+            } 
+
+            EMessageStatus deserializeFailureStatus = MESSAGE_OK; 
+            TBusMessage* r = DeserializeMessage(readDataRef, &header, &ReaderData.Status.Incremental.MessageCounter, &deserializeFailureStatus); 
+
+            if (!r) { 
+                Y_VERIFY(deserializeFailureStatus != MESSAGE_OK, "state check"); 
+                LWPROBE(Error, ToString(deserializeFailureStatus), ToString(PeerAddr), ""); 
+                ReaderData.Status.Incremental.StatusCounter[deserializeFailureStatus] += 1; 
+                ScheduleShutdownOnServerOrReconnectOnClient(deserializeFailureStatus, false); 
+                return false; 
+            } 
+
+            LWPROBE(Read, r->GetHeader()->Size); 
+
+            r->ReplyTo = PeerAddrSocketAddr; 
+
+            TBusMessagePtrAndHeader h(r); 
+            r->RecvTime = now; 
+
+            QuotaConsume(1, header.Size); 
+
+            ReaderData.ReadMessages.push_back(h); 
+            if (ReaderData.ReadMessages.size() >= 100) { 
+                ReaderFlushMessages(); 
+            } 
+
+            return true; 
+        } 
+
+        void TRemoteConnection::WriterFillBuffer() { 
+            Y_ASSERT(WriterData.State == WRITER_FILLING); 
+
+            Y_ASSERT(WriterData.Buffer.LeftSize() == 0); 
+
+            if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) { 
+                TVector<TBusHeader> headers; 
+                WrongVersionRequests.DequeueAllSingleConsumer(&headers); 
+                for (TVector<TBusHeader>::reverse_iterator header = headers.rbegin(); 
+                     header != headers.rend(); ++header) { 
+                    TBusHeader response = *header; 
+                    response.SendTime = NBus::Now(); 
+                    response.Size = sizeof(TBusHeader); 
+                    response.FlagsInternal = 0; 
+                    response.SetVersionInternal(YBUS_VERSION); 
+                    WriteHeader(response, WriterData.Buffer.GetBuffer()); 
+                } 
+
+                Y_ASSERT(!WriterData.Buffer.Empty()); 
+                WriterData.State = WRITER_FLUSHING; 
+                return; 
+            } 
+
+            TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages; 
+
+            for (;;) { 
+                THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront()); 
+                if (!writeMessage) { 
+                    break; 
+                } 
+
+                if (Config.Cork != TDuration::Zero()) { 
+                    if (WriterData.CorkUntil == TInstant::Zero()) { 
+                        WriterData.CorkUntil = TInstant::Now() + Config.Cork; 
+                    } 
+                } 
 
-            Y_ASSERT(msg->GetHeader()->Size == data->Size() - pos);
-            counter->AddMessage(dataSize, data->Size() - pos, msg->IsCompressed(), compressionRequested);
-        }
+                size_t sizeBeforeSerialize = WriterData.Buffer.Size(); 
 
-        TBusMessage* TRemoteConnection::DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const {
-            size_t dataSize;
-
-            TBusMessage* message;
-            if (header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL) {
-                TBuffer msg;
-                {
-                    TBuffer plaindata;
-                    NCodecs::TCodecPtr c = Proto->GetTransportCodec();
-                    try {
-                        TArrayRef<const char> payload = TBusMessage::GetPayload(dataRef);
-                        c->Decode(TStringBuf{payload.data(), payload.size()}, plaindata);
-                    } catch (...) {
-                        // catch all, because
-                        // http://nga.at.yandex-team.ru/replies.xml?item_no=3884
-                        *status = MESSAGE_DECOMPRESS_ERROR;
-                        return nullptr;
-                    }
-
-                    msg.Append(dataRef.data(), sizeof(TBusHeader));
-                    msg.Append(plaindata.Data(), plaindata.Size());
-                }
-                TArrayRef<const char> msgRef(msg.Data(), msg.Size());
-                dataSize = sizeof(TBusHeader) + msgRef.size();
-                // TODO: track error types
-                message = Proto->Deserialize(header->Type, msgRef.Slice(sizeof(TBusHeader))).Release();
-                if (!message) {
-                    *status = MESSAGE_DESERIALIZE_ERROR;
-                    return nullptr;
-                }
-                *message->GetHeader() = *header;
-                message->SetCompressed(true);
-            } else {
-                dataSize = dataRef.size();
-                message = Proto->Deserialize(header->Type, dataRef.Slice(sizeof(TBusHeader))).Release();
-                if (!message) {
-                    *status = MESSAGE_DESERIALIZE_ERROR;
-                    return nullptr;
-                }
-                *message->GetHeader() = *header;
-            }
+                TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter; 
 
-            messageCounter->AddMessage(dataSize, dataRef.size(), header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL, false);
+                SerializeMessage(writeMessage.Get(), &WriterData.Buffer.GetBuffer(), &messageCounter); 
 
-            return message;
-        }
-
-        void TRemoteConnection::ResetOneWayFlag(TArrayRef<TBusMessage*> messages) {
-            for (auto message : messages) {
-                message->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
-            }
-        }
-
-        void TRemoteConnection::ReaderFlushMessages() {
-            if (!ReaderData.ReadMessages.empty()) {
-                Session->OnMessageReceived(this, ReaderData.ReadMessages);
-                ReaderData.ReadMessages.clear();
-            }
-        }
-
-        // @return false if actor should break
-        bool TRemoteConnection::MessageRead(TArrayRef<const char> readDataRef, TInstant now) {
-            TBusHeader header(readDataRef);
+                size_t written = WriterData.Buffer.Size() - sizeBeforeSerialize; 
+                if (written > Config.MaxMessageSize) { 
+                    WriterData.Buffer.GetBuffer().EraseBack(written); 
+                    WriterBeforeWriteErrorMessage(writeMessage.Release(), MESSAGE_MESSAGE_TOO_LARGE); 
+                    continue; 
+                } 
 
-            Y_ASSERT(readDataRef.size() == header.Size);
-
-            if (header.GetVersionInternal() != YBUS_VERSION) {
-                ReaderProcessMessageUnknownVersion(readDataRef);
-                return true;
-            }
+                WriterData.Status.Incremental.MessageCounter = messageCounter; 
 
-            EMessageStatus deserializeFailureStatus = MESSAGE_OK;
-            TBusMessage* r = DeserializeMessage(readDataRef, &header, &ReaderData.Status.Incremental.MessageCounter, &deserializeFailureStatus);
+                TBusMessagePtrAndHeader h(writeMessage.Release()); 
+                writeMessages.GetVector()->push_back(h); 
 
-            if (!r) {
-                Y_VERIFY(deserializeFailureStatus != MESSAGE_OK, "state check");
-                LWPROBE(Error, ToString(deserializeFailureStatus), ToString(PeerAddr), "");
-                ReaderData.Status.Incremental.StatusCounter[deserializeFailureStatus] += 1;
-                ScheduleShutdownOnServerOrReconnectOnClient(deserializeFailureStatus, false);
-                return false;
+                Y_ASSERT(!WriterData.Buffer.Empty()); 
+                if (WriterData.Buffer.Size() >= Config.SendThreshold) { 
+                    break; 
+                } 
             }
-
-            LWPROBE(Read, r->GetHeader()->Size);
-
-            r->ReplyTo = PeerAddrSocketAddr;
-
-            TBusMessagePtrAndHeader h(r);
-            r->RecvTime = now;
-
-            QuotaConsume(1, header.Size);
-
-            ReaderData.ReadMessages.push_back(h);
-            if (ReaderData.ReadMessages.size() >= 100) {
-                ReaderFlushMessages();
-            }
-
-            return true;
-        }
-
-        void TRemoteConnection::WriterFillBuffer() {
-            Y_ASSERT(WriterData.State == WRITER_FILLING);
-
-            Y_ASSERT(WriterData.Buffer.LeftSize() == 0);
-
-            if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) {
-                TVector<TBusHeader> headers;
-                WrongVersionRequests.DequeueAllSingleConsumer(&headers);
-                for (TVector<TBusHeader>::reverse_iterator header = headers.rbegin();
-                     header != headers.rend(); ++header) {
-                    TBusHeader response = *header;
-                    response.SendTime = NBus::Now();
-                    response.Size = sizeof(TBusHeader);
-                    response.FlagsInternal = 0;
-                    response.SetVersionInternal(YBUS_VERSION);
-                    WriteHeader(response, WriterData.Buffer.GetBuffer());
-                }
-
-                Y_ASSERT(!WriterData.Buffer.Empty());
-                WriterData.State = WRITER_FLUSHING;
-                return;
-            }
-
-            TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages;
-
-            for (;;) {
-                THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront());
-                if (!writeMessage) {
-                    break;
-                }
-
-                if (Config.Cork != TDuration::Zero()) {
-                    if (WriterData.CorkUntil == TInstant::Zero()) {
-                        WriterData.CorkUntil = TInstant::Now() + Config.Cork;
-                    }
-                }
-
-                size_t sizeBeforeSerialize = WriterData.Buffer.Size();
-
-                TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter;
-
-                SerializeMessage(writeMessage.Get(), &WriterData.Buffer.GetBuffer(), &messageCounter);
-
-                size_t written = WriterData.Buffer.Size() - sizeBeforeSerialize;
-                if (written > Config.MaxMessageSize) {
-                    WriterData.Buffer.GetBuffer().EraseBack(written);
-                    WriterBeforeWriteErrorMessage(writeMessage.Release(), MESSAGE_MESSAGE_TOO_LARGE);
-                    continue;
-                }
-
-                WriterData.Status.Incremental.MessageCounter = messageCounter;
-
-                TBusMessagePtrAndHeader h(writeMessage.Release());
-                writeMessages.GetVector()->push_back(h);
-
-                Y_ASSERT(!WriterData.Buffer.Empty());
-                if (WriterData.Buffer.Size() >= Config.SendThreshold) {
-                    break;
-                }
-            }
-
-            if (!WriterData.Buffer.Empty()) {
-                if (WriterData.Buffer.Size() >= Config.SendThreshold) {
-                    WriterData.State = WRITER_FLUSHING;
-                } else if (WriterData.CorkUntil == TInstant::Zero()) {
-                    WriterData.State = WRITER_FLUSHING;
-                } else if (TInstant::Now() >= WriterData.CorkUntil) {
-                    WriterData.State = WRITER_FLUSHING;
-                } else {
-                    // keep filling
-                    Y_ASSERT(WriterData.State == WRITER_FILLING);
-                    GetWriterSchedulerActor()->ScheduleAt(WriterData.CorkUntil);
-                }
-            } else {
-                // keep filling
-                Y_ASSERT(WriterData.State == WRITER_FILLING);
-            }
-
-            size_t bytes = MessageSize(*writeMessages.GetVector());
-
-            QuotaReturnSelf(writeMessages.GetVector()->size(), bytes);
-
-            // This is called before `send` syscall inducing latency
-            MessageSent(*writeMessages.GetVector());
-        }
-
-        size_t TRemoteConnection::MessageSize(TArrayRef<TBusMessagePtrAndHeader> messages) {
-            size_t size = 0;
-            for (const auto& message : messages)
-                size += message.MessagePtr->RequestSize;
-
-            return size;
-        }
-
-        size_t TRemoteConnection::GetInFlight() {
-            return AtomicGet(WriterData.InFlight);
-        }
-
-        size_t TRemoteConnection::GetConnectSyscallsNumForTest() {
-            return WriterData.Status.ConnectSyscalls;
-        }
-
-        void TRemoteConnection::WriterBeforeWriteErrorMessage(TBusMessage* message, EMessageStatus status) {
-            if (Session->IsSource_) {
-                CheckedCast<TRemoteClientSession*>(Session.Get())->ReleaseInFlight({message});
-                WriterErrorMessage(message, status);
-            } else {
-                TBusMessagePtrAndHeader h(message);
-                CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1));
-                WriterErrorMessage(h.MessagePtr.Release(), status);
-            }
-        }
-
-        void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) {
-            TBusMessage* released = m.Release();
-            WriterErrorMessages(MakeArrayRef(&released, 1), status);
-        }
-
-        void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) {
-            ResetOneWayFlag(ms);
-
-            WriterData.Status.Incremental.StatusCounter[status] += ms.size();
-            for (auto m : ms) {
-                Session->InvokeOnError(m, status);
-            }
-        }
-
-        void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) {
-            Y_VERIFY(Session->IsSource_, "state check");
-            TClientConnectionEvent event(type, ConnectionId, PeerAddr);
-            TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get());
-            session->ClientHandler->OnClientConnectionEvent(event);
-        }
-
-        bool TRemoteConnection::IsAlive() const {
-            return !AtomicGet(WriterData.Down);
-        }
+
+            if (!WriterData.Buffer.Empty()) { 
+                if (WriterData.Buffer.Size() >= Config.SendThreshold) { 
+                    WriterData.State = WRITER_FLUSHING; 
+                } else if (WriterData.CorkUntil == TInstant::Zero()) { 
+                    WriterData.State = WRITER_FLUSHING; 
+                } else if (TInstant::Now() >= WriterData.CorkUntil) { 
+                    WriterData.State = WRITER_FLUSHING; 
+                } else { 
+                    // keep filling 
+                    Y_ASSERT(WriterData.State == WRITER_FILLING); 
+                    GetWriterSchedulerActor()->ScheduleAt(WriterData.CorkUntil); 
+                } 
+            } else { 
+                // keep filling 
+                Y_ASSERT(WriterData.State == WRITER_FILLING); 
+            } 
+
+            size_t bytes = MessageSize(*writeMessages.GetVector()); 
+
+            QuotaReturnSelf(writeMessages.GetVector()->size(), bytes); 
+
+            // This is called before `send` syscall inducing latency 
+            MessageSent(*writeMessages.GetVector()); 
+        }
+
+        size_t TRemoteConnection::MessageSize(TArrayRef<TBusMessagePtrAndHeader> messages) { 
+            size_t size = 0; 
+            for (const auto& message : messages) 
+                size += message.MessagePtr->RequestSize; 
+
+            return size; 
+        } 
+
+        size_t TRemoteConnection::GetInFlight() { 
+            return AtomicGet(WriterData.InFlight); 
+        }
+
+        size_t TRemoteConnection::GetConnectSyscallsNumForTest() { 
+            return WriterData.Status.ConnectSyscalls; 
+        }
+
+        void TRemoteConnection::WriterBeforeWriteErrorMessage(TBusMessage* message, EMessageStatus status) { 
+            if (Session->IsSource_) { 
+                CheckedCast<TRemoteClientSession*>(Session.Get())->ReleaseInFlight({message}); 
+                WriterErrorMessage(message, status); 
+            } else { 
+                TBusMessagePtrAndHeader h(message); 
+                CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1)); 
+                WriterErrorMessage(h.MessagePtr.Release(), status); 
+            } 
+        } 
+
+        void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) { 
+            TBusMessage* released = m.Release(); 
+            WriterErrorMessages(MakeArrayRef(&released, 1), status); 
+        } 
+
+        void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) { 
+            ResetOneWayFlag(ms); 
+
+            WriterData.Status.Incremental.StatusCounter[status] += ms.size(); 
+            for (auto m : ms) { 
+                Session->InvokeOnError(m, status); 
+            } 
+        } 
+
+        void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) { 
+            Y_VERIFY(Session->IsSource_, "state check"); 
+            TClientConnectionEvent event(type, ConnectionId, PeerAddr); 
+            TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get()); 
+            session->ClientHandler->OnClientConnectionEvent(event); 
+        } 
+
+        bool TRemoteConnection::IsAlive() const { 
+            return !AtomicGet(WriterData.Down); 
+        } 
 
     }
 }
-- 
cgit v1.2.3