diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/remote_connection.cpp | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_connection.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_connection.cpp | 1614 |
1 files changed, 807 insertions, 807 deletions
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp index ca4a66f68a..22932569db 100644 --- a/library/cpp/messagebus/remote_connection.cpp +++ b/library/cpp/messagebus/remote_connection.cpp @@ -20,955 +20,955 @@ using namespace NActor; using namespace NBus; using namespace NBus::NPrivate; -namespace NBus { - namespace NPrivate { - TRemoteConnection::TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr) - : TActor<TRemoteConnection, TWriterTag>(session->Queue->WorkQueue.Get()) - , TActor<TRemoteConnection, TReaderTag>(session->Queue->WorkQueue.Get()) - , TScheduleActor<TRemoteConnection, TWriterTag>(&session->Queue->Scheduler) - , Session(session) - , Proto(session->Proto) - , Config(session->Config) - , RemovedFromSession(false) - , ConnectionId(connectionId) - , PeerAddr(addr) - , PeerAddrSocketAddr(addr) - , CreatedTime(TInstant::Now()) - , ReturnConnectFailedImmediately(false) - , GranStatus(Config.Secret.StatusFlushPeriod) - , QuotaMsg(!Session->IsSource_, Config.PerConnectionMaxInFlight, 0) - , QuotaBytes(!Session->IsSource_, Config.PerConnectionMaxInFlightBySize, 0) - , MaxBufferSize(session->Config.MaxBufferSize) - , ShutdownReason(MESSAGE_OK) - { - WriterData.Status.ConnectionId = connectionId; - WriterData.Status.PeerAddr = PeerAddr; - ReaderData.Status.ConnectionId = connectionId; - - const TInstant now = TInstant::Now(); - - WriterFillStatus(); - - GranStatus.Writer.Update(WriterData.Status, now, true); - GranStatus.Reader.Update(ReaderData.Status, now, true); - } - - TRemoteConnection::~TRemoteConnection() { - Y_VERIFY(ReplyQueue.IsEmpty()); - } - - TRemoteConnection::TWriterData::TWriterData() - : Down(0) - , SocketVersion(0) - , InFlight(0) - , AwakeFlags(0) - , State(WRITER_FILLING) - { - } - - TRemoteConnection::TWriterData::~TWriterData() { +namespace NBus { + namespace NPrivate { + TRemoteConnection::TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr) + : TActor<TRemoteConnection, TWriterTag>(session->Queue->WorkQueue.Get()) + , TActor<TRemoteConnection, TReaderTag>(session->Queue->WorkQueue.Get()) + , TScheduleActor<TRemoteConnection, TWriterTag>(&session->Queue->Scheduler) + , Session(session) + , Proto(session->Proto) + , Config(session->Config) + , RemovedFromSession(false) + , ConnectionId(connectionId) + , PeerAddr(addr) + , PeerAddrSocketAddr(addr) + , CreatedTime(TInstant::Now()) + , ReturnConnectFailedImmediately(false) + , GranStatus(Config.Secret.StatusFlushPeriod) + , QuotaMsg(!Session->IsSource_, Config.PerConnectionMaxInFlight, 0) + , QuotaBytes(!Session->IsSource_, Config.PerConnectionMaxInFlightBySize, 0) + , MaxBufferSize(session->Config.MaxBufferSize) + , ShutdownReason(MESSAGE_OK) + { + WriterData.Status.ConnectionId = connectionId; + WriterData.Status.PeerAddr = PeerAddr; + ReaderData.Status.ConnectionId = connectionId; + + const TInstant now = TInstant::Now(); + + WriterFillStatus(); + + GranStatus.Writer.Update(WriterData.Status, now, true); + GranStatus.Reader.Update(ReaderData.Status, now, true); + } + + TRemoteConnection::~TRemoteConnection() { + Y_VERIFY(ReplyQueue.IsEmpty()); + } + + TRemoteConnection::TWriterData::TWriterData() + : Down(0) + , SocketVersion(0) + , InFlight(0) + , AwakeFlags(0) + , State(WRITER_FILLING) + { + } + + TRemoteConnection::TWriterData::~TWriterData() { Y_VERIFY(AtomicGet(Down)); - Y_VERIFY(SendQueue.Empty()); - } - - bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept { - size_t left = Buffer.Size() - Offset; - - return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0; - } - - void TRemoteConnection::TWriterData::SetChannel(NEventLoop::TChannelPtr channel) { - Y_VERIFY(!Channel, "must not have channel"); - Y_VERIFY(Buffer.GetBuffer().Empty() && Buffer.LeftSize() == 0, "buffer must be empty"); - Y_VERIFY(State == WRITER_FILLING, "state must be initial"); - Channel = channel; - } - - void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) { - Y_VERIFY(!Channel, "must not have channel"); - Y_VERIFY(Buffer.Empty(), "buffer must be empty"); - Channel = channel; - } - - void TRemoteConnection::TWriterData::DropChannel() { - if (!!Channel) { - Channel->Unregister(); - Channel.Drop(); - } - - Buffer.Reset(); - State = WRITER_FILLING; - } - - void TRemoteConnection::TReaderData::DropChannel() { - // TODO: make Drop call Unregister - if (!!Channel) { - Channel->Unregister(); - Channel.Drop(); - } - Buffer.Reset(); - Offset = 0; - } - - TRemoteConnection::TReaderData::TReaderData() - : Down(0) - , SocketVersion(0) - , Offset(0) - , MoreBytes(0) - { - } - - TRemoteConnection::TReaderData::~TReaderData() { + Y_VERIFY(SendQueue.Empty()); + } + + bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept { + size_t left = Buffer.Size() - Offset; + + return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0; + } + + void TRemoteConnection::TWriterData::SetChannel(NEventLoop::TChannelPtr channel) { + Y_VERIFY(!Channel, "must not have channel"); + Y_VERIFY(Buffer.GetBuffer().Empty() && Buffer.LeftSize() == 0, "buffer must be empty"); + Y_VERIFY(State == WRITER_FILLING, "state must be initial"); + Channel = channel; + } + + void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) { + Y_VERIFY(!Channel, "must not have channel"); + Y_VERIFY(Buffer.Empty(), "buffer must be empty"); + Channel = channel; + } + + void TRemoteConnection::TWriterData::DropChannel() { + if (!!Channel) { + Channel->Unregister(); + Channel.Drop(); + } + + Buffer.Reset(); + State = WRITER_FILLING; + } + + void TRemoteConnection::TReaderData::DropChannel() { + // TODO: make Drop call Unregister + if (!!Channel) { + Channel->Unregister(); + Channel.Drop(); + } + Buffer.Reset(); + Offset = 0; + } + + TRemoteConnection::TReaderData::TReaderData() + : Down(0) + , SocketVersion(0) + , Offset(0) + , MoreBytes(0) + { + } + + TRemoteConnection::TReaderData::~TReaderData() { Y_VERIFY(AtomicGet(Down)); - } - - void TRemoteConnection::Send(TNonDestroyingAutoPtr<TBusMessage> msg) { - BeforeSendQueue.Enqueue(msg.Release()); - AtomicIncrement(WriterData.InFlight); - ScheduleWrite(); - } - - void TRemoteConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) { - if (!reconnect) { - // Do not clear send queue if reconnecting - WriterData.SendQueue.Clear(&result); - } - } + } - void TRemoteConnection::Shutdown(EMessageStatus status) { - ScheduleShutdown(status); + void TRemoteConnection::Send(TNonDestroyingAutoPtr<TBusMessage> msg) { + BeforeSendQueue.Enqueue(msg.Release()); + AtomicIncrement(WriterData.InFlight); + ScheduleWrite(); + } - ReaderData.ShutdownComplete.WaitI(); - WriterData.ShutdownComplete.WaitI(); - } + void TRemoteConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) { + if (!reconnect) { + // Do not clear send queue if reconnecting + WriterData.SendQueue.Clear(&result); + } + } - void TRemoteConnection::TryConnect() { - Y_FAIL("TryConnect is client connection only operation"); - } + void TRemoteConnection::Shutdown(EMessageStatus status) { + ScheduleShutdown(status); - void TRemoteConnection::ScheduleRead() { - GetReaderActor()->Schedule(); - } + ReaderData.ShutdownComplete.WaitI(); + WriterData.ShutdownComplete.WaitI(); + } - void TRemoteConnection::ScheduleWrite() { - GetWriterActor()->Schedule(); - } + void TRemoteConnection::TryConnect() { + Y_FAIL("TryConnect is client connection only operation"); + } - void TRemoteConnection::WriterRotateCounters() { - if (!WriterData.TimeToRotateCounters.FetchTask()) { - return; - } + void TRemoteConnection::ScheduleRead() { + GetReaderActor()->Schedule(); + } - WriterData.Status.DurationCounterPrev = WriterData.Status.DurationCounter; - Reset(WriterData.Status.DurationCounter); - } + void TRemoteConnection::ScheduleWrite() { + GetWriterActor()->Schedule(); + } - void TRemoteConnection::WriterSendStatus(TInstant now, bool force) { - GranStatus.Writer.Update(std::bind(&TRemoteConnection::WriterGetStatus, this), now, force); - } + void TRemoteConnection::WriterRotateCounters() { + if (!WriterData.TimeToRotateCounters.FetchTask()) { + return; + } - void TRemoteConnection::ReaderSendStatus(TInstant now, bool force) { - GranStatus.Reader.Update(std::bind(&TRemoteConnection::ReaderFillStatus, this), now, force); - } + WriterData.Status.DurationCounterPrev = WriterData.Status.DurationCounter; + Reset(WriterData.Status.DurationCounter); + } - const TRemoteConnectionReaderStatus& TRemoteConnection::ReaderFillStatus() { - ReaderData.Status.BufferSize = ReaderData.Buffer.Capacity(); - ReaderData.Status.QuotaMsg = QuotaMsg.Tokens(); - ReaderData.Status.QuotaBytes = QuotaBytes.Tokens(); + void TRemoteConnection::WriterSendStatus(TInstant now, bool force) { + GranStatus.Writer.Update(std::bind(&TRemoteConnection::WriterGetStatus, this), now, force); + } - return ReaderData.Status; - } + void TRemoteConnection::ReaderSendStatus(TInstant now, bool force) { + GranStatus.Reader.Update(std::bind(&TRemoteConnection::ReaderFillStatus, this), now, force); + } + + const TRemoteConnectionReaderStatus& TRemoteConnection::ReaderFillStatus() { + ReaderData.Status.BufferSize = ReaderData.Buffer.Capacity(); + ReaderData.Status.QuotaMsg = QuotaMsg.Tokens(); + ReaderData.Status.QuotaBytes = QuotaBytes.Tokens(); + + return ReaderData.Status; + } - void TRemoteConnection::ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage readSocket) { - if (AtomicGet(ReaderData.Down)) { - ReaderData.Status.Fd = INVALID_SOCKET; - return; - } + void TRemoteConnection::ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage readSocket) { + if (AtomicGet(ReaderData.Down)) { + ReaderData.Status.Fd = INVALID_SOCKET; + return; + } - ReaderData.DropChannel(); + ReaderData.DropChannel(); - ReaderData.Status.Fd = readSocket.Socket; - ReaderData.SocketVersion = readSocket.SocketVersion; + ReaderData.Status.Fd = readSocket.Socket; + ReaderData.SocketVersion = readSocket.SocketVersion; - if (readSocket.Socket != INVALID_SOCKET) { - ReaderData.SetChannel(Session->ReadEventLoop.Register(readSocket.Socket, this, ReadCookie)); - ReaderData.Channel->EnableRead(); - } - } + if (readSocket.Socket != INVALID_SOCKET) { + ReaderData.SetChannel(Session->ReadEventLoop.Register(readSocket.Socket, this, ReadCookie)); + ReaderData.Channel->EnableRead(); + } + } - void TRemoteConnection::ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion) { - Y_VERIFY(socketVersion <= WriterData.SocketVersion, "something weird"); + void TRemoteConnection::ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion) { + Y_VERIFY(socketVersion <= WriterData.SocketVersion, "something weird"); - if (WriterData.SocketVersion != socketVersion) { - return; - } - Y_VERIFY(WriterData.Status.Connected, "must be connected at this point"); - Y_VERIFY(!!WriterData.Channel, "must have channel at this point"); + if (WriterData.SocketVersion != socketVersion) { + return; + } + Y_VERIFY(WriterData.Status.Connected, "must be connected at this point"); + Y_VERIFY(!!WriterData.Channel, "must have channel at this point"); - WriterData.Status.Connected = false; - WriterData.DropChannel(); - WriterData.Status.MyAddr = TNetAddr(); - ++WriterData.SocketVersion; - LastConnectAttempt = TInstant(); + WriterData.Status.Connected = false; + WriterData.DropChannel(); + WriterData.Status.MyAddr = TNetAddr(); + ++WriterData.SocketVersion; + LastConnectAttempt = TInstant(); - TMessagesPtrs cleared; - ClearOutgoingQueue(cleared, true); - WriterErrorMessages(cleared, MESSAGE_DELIVERY_FAILED); + TMessagesPtrs cleared; + ClearOutgoingQueue(cleared, true); + WriterErrorMessages(cleared, MESSAGE_DELIVERY_FAILED); - FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); + FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); - ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(INVALID_SOCKET, WriterData.SocketVersion)); - } + ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(INVALID_SOCKET, WriterData.SocketVersion)); + } - void TRemoteConnection::ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags) { - WriterData.AwakeFlags |= awakeFlags; + void TRemoteConnection::ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags) { + WriterData.AwakeFlags |= awakeFlags; - ReadQuotaWakeup(); - } + ReadQuotaWakeup(); + } - void TRemoteConnection::Act(TReaderTag) { - TInstant now = TInstant::Now(); + void TRemoteConnection::Act(TReaderTag) { + TInstant now = TInstant::Now(); - ReaderData.Status.Acts += 1; + ReaderData.Status.Acts += 1; - ReaderGetSocketQueue()->DequeueAllLikelyEmpty(); + ReaderGetSocketQueue()->DequeueAllLikelyEmpty(); - if (AtomicGet(ReaderData.Down)) { - ReaderData.DropChannel(); + if (AtomicGet(ReaderData.Down)) { + ReaderData.DropChannel(); - ReaderProcessStatusDown(); - ReaderData.ShutdownComplete.Signal(); + ReaderProcessStatusDown(); + ReaderData.ShutdownComplete.Signal(); - } else if (!!ReaderData.Channel) { - Y_ASSERT(ReaderData.ReadMessages.empty()); + } else if (!!ReaderData.Channel) { + Y_ASSERT(ReaderData.ReadMessages.empty()); - for (int i = 0;; ++i) { - if (i == 100) { - // perform other tasks - GetReaderActor()->AddTaskFromActorLoop(); - break; - } + for (int i = 0;; ++i) { + if (i == 100) { + // perform other tasks + GetReaderActor()->AddTaskFromActorLoop(); + break; + } - if (NeedInterruptRead()) { - ReaderData.Channel->EnableRead(); - break; - } + if (NeedInterruptRead()) { + ReaderData.Channel->EnableRead(); + break; + } - if (!ReaderFillBuffer()) - break; + if (!ReaderFillBuffer()) + break; - if (!ReaderProcessBuffer()) - break; - } + if (!ReaderProcessBuffer()) + break; + } - ReaderFlushMessages(); + ReaderFlushMessages(); } - ReaderSendStatus(now); + ReaderSendStatus(now); } - bool TRemoteConnection::QuotaAcquire(size_t msg, size_t bytes) { - ui32 wakeFlags = 0; + bool TRemoteConnection::QuotaAcquire(size_t msg, size_t bytes) { + ui32 wakeFlags = 0; - if (!QuotaMsg.Acquire(msg)) - wakeFlags |= WAKE_QUOTA_MSG; + if (!QuotaMsg.Acquire(msg)) + wakeFlags |= WAKE_QUOTA_MSG; - else if (!QuotaBytes.Acquire(bytes)) - wakeFlags |= WAKE_QUOTA_BYTES; + else if (!QuotaBytes.Acquire(bytes)) + wakeFlags |= WAKE_QUOTA_BYTES; - if (wakeFlags) { - ReaderData.Status.QuotaExhausted++; + if (wakeFlags) { + ReaderData.Status.QuotaExhausted++; - WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags); - } + WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags); + } - return wakeFlags == 0; - } + return wakeFlags == 0; + } - void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) { - QuotaMsg.Consume(msg); - QuotaBytes.Consume(bytes); - } + void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) { + QuotaMsg.Consume(msg); + QuotaBytes.Consume(bytes); + } - void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) { - if (QuotaReturnValues(items, bytes)) - ReadQuotaWakeup(); - } + void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) { + if (QuotaReturnValues(items, bytes)) + ReadQuotaWakeup(); + } - void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) { - if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down)) - WriterGetWakeQueue()->EnqueueAndSchedule(0x0); - } + void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) { + if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down)) + WriterGetWakeQueue()->EnqueueAndSchedule(0x0); + } - bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) { - bool rMsg = QuotaMsg.Return(items); - bool rBytes = QuotaBytes.Return(bytes); + bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) { + bool rMsg = QuotaMsg.Return(items); + bool rBytes = QuotaBytes.Return(bytes); - return rMsg || rBytes; - } + return rMsg || rBytes; + } - void TRemoteConnection::ReadQuotaWakeup() { - const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags(); + void TRemoteConnection::ReadQuotaWakeup() { + const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags(); - if (mask && mask == WriterData.AwakeFlags) { - WriterData.Status.ReaderWakeups++; - WriterData.AwakeFlags = 0; + if (mask && mask == WriterData.AwakeFlags) { + WriterData.Status.ReaderWakeups++; + WriterData.AwakeFlags = 0; - ScheduleRead(); - } - } + ScheduleRead(); + } + } - ui32 TRemoteConnection::WriteWakeFlags() const { - ui32 awakeFlags = 0; + ui32 TRemoteConnection::WriteWakeFlags() const { + ui32 awakeFlags = 0; - if (QuotaMsg.IsAboveWake()) - awakeFlags |= WAKE_QUOTA_MSG; + if (QuotaMsg.IsAboveWake()) + awakeFlags |= WAKE_QUOTA_MSG; - if (QuotaBytes.IsAboveWake()) - awakeFlags |= WAKE_QUOTA_BYTES; + if (QuotaBytes.IsAboveWake()) + awakeFlags |= WAKE_QUOTA_BYTES; - return awakeFlags; - } + return awakeFlags; + } - bool TRemoteConnection::ReaderProcessBuffer() { - TInstant now = TInstant::Now(); + bool TRemoteConnection::ReaderProcessBuffer() { + TInstant now = TInstant::Now(); + + for (;;) { + if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) { + break; + } + + TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset)); + + if (header.Size < sizeof(TBusHeader)) { + LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); + ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); + return false; + } + + if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) { + LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); + ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); + return false; + } + + if (header.Size > Config.MaxMessageSize) { + LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size)); + ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1; + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false); + return false; + } + + if (!ReaderData.HasBytesInBuf(header.Size)) { + if (ReaderData.Offset == 0) { + ReaderData.Buffer.Reserve(header.Size); + } + break; + } + + if (!QuotaAcquire(1, header.Size)) + return false; + + if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) { + return false; + } + + ReaderData.Offset += header.Size; + } - for (;;) { - if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) { - break; - } + ReaderData.Buffer.ChopHead(ReaderData.Offset); + ReaderData.Offset = 0; - TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset)); + if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) { + ReaderData.Status.Incremental.BufferDrops += 1; - if (header.Size < sizeof(TBusHeader)) { - LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); - ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; - ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); - return false; - } + TBuffer temp; + // probably should use another constant + temp.Reserve(Config.DefaultBufferSize); + temp.Append(ReaderData.Buffer.Data(), ReaderData.Buffer.Size()); - if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) { - LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); - ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; - ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); - return false; - } + ReaderData.Buffer.Swap(temp); + } - if (header.Size > Config.MaxMessageSize) { - LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size)); - ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1; - ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false); - return false; - } + return true; + } - if (!ReaderData.HasBytesInBuf(header.Size)) { - if (ReaderData.Offset == 0) { - ReaderData.Buffer.Reserve(header.Size); - } - break; - } + bool TRemoteConnection::ReaderFillBuffer() { + if (!ReaderData.BufferMore()) + return true; - if (!QuotaAcquire(1, header.Size)) - return false; + if (ReaderData.Buffer.Avail() == 0) { + if (ReaderData.Buffer.Size() == 0) { + ReaderData.Buffer.Reserve(Config.DefaultBufferSize); + } else { + ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2); + } + } - if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) { - return false; - } + Y_ASSERT(ReaderData.Buffer.Avail() > 0); - ReaderData.Offset += header.Size; - } + ssize_t bytes; + { + TWhatThreadDoesPushPop pp("recv syscall"); + bytes = SocketRecv(ReaderData.Channel->GetSocket(), TArrayRef<char>(ReaderData.Buffer.Pos(), ReaderData.Buffer.Avail())); + } - ReaderData.Buffer.ChopHead(ReaderData.Offset); - ReaderData.Offset = 0; + if (bytes < 0) { + if (WouldBlock()) { + ReaderData.Channel->EnableRead(); + return false; + } else { + ReaderData.Channel->DisableRead(); + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false); + return false; + } + } - if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) { - ReaderData.Status.Incremental.BufferDrops += 1; + if (bytes == 0) { + ReaderData.Channel->DisableRead(); + // TODO: incorrect: it is possible that only input is shutdown, and output is available + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false); + return false; + } - TBuffer temp; - // probably should use another constant - temp.Reserve(Config.DefaultBufferSize); - temp.Append(ReaderData.Buffer.Data(), ReaderData.Buffer.Size()); + ReaderData.Status.Incremental.NetworkOps += 1; - ReaderData.Buffer.Swap(temp); - } + ReaderData.Buffer.Advance(bytes); + ReaderData.MoreBytes = 0; + return true; + } - return true; + void TRemoteConnection::ClearBeforeSendQueue(EMessageStatus reason) { + BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::WriterBeforeWriteErrorMessage, this, std::placeholders::_1, reason)); } - bool TRemoteConnection::ReaderFillBuffer() { - if (!ReaderData.BufferMore()) - return true; + void TRemoteConnection::ClearReplyQueue(EMessageStatus reason) { + TVectorSwaps<TBusMessagePtrAndHeader> replyQueueTemp; + Y_ASSERT(replyQueueTemp.empty()); + ReplyQueue.DequeueAllSingleConsumer(&replyQueueTemp); - if (ReaderData.Buffer.Avail() == 0) { - if (ReaderData.Buffer.Size() == 0) { - ReaderData.Buffer.Reserve(Config.DefaultBufferSize); - } else { - ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2); - } + TVector<TBusMessage*> messages; + for (TVectorSwaps<TBusMessagePtrAndHeader>::reverse_iterator message = replyQueueTemp.rbegin(); + message != replyQueueTemp.rend(); ++message) { + messages.push_back(message->MessagePtr.Release()); } - Y_ASSERT(ReaderData.Buffer.Avail() > 0); + WriterErrorMessages(messages, reason); - ssize_t bytes; - { - TWhatThreadDoesPushPop pp("recv syscall"); - bytes = SocketRecv(ReaderData.Channel->GetSocket(), TArrayRef<char>(ReaderData.Buffer.Pos(), ReaderData.Buffer.Avail())); - } - - if (bytes < 0) { - if (WouldBlock()) { - ReaderData.Channel->EnableRead(); - return false; - } else { - ReaderData.Channel->DisableRead(); - ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false); - return false; - } - } - - if (bytes == 0) { - ReaderData.Channel->DisableRead(); - // TODO: incorrect: it is possible that only input is shutdown, and output is available - ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false); - return false; - } - - ReaderData.Status.Incremental.NetworkOps += 1; - - ReaderData.Buffer.Advance(bytes); - ReaderData.MoreBytes = 0; - return true; - } - - void TRemoteConnection::ClearBeforeSendQueue(EMessageStatus reason) { - BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::WriterBeforeWriteErrorMessage, this, std::placeholders::_1, reason)); - } - - void TRemoteConnection::ClearReplyQueue(EMessageStatus reason) { - TVectorSwaps<TBusMessagePtrAndHeader> replyQueueTemp; - Y_ASSERT(replyQueueTemp.empty()); - ReplyQueue.DequeueAllSingleConsumer(&replyQueueTemp); - - TVector<TBusMessage*> messages; - for (TVectorSwaps<TBusMessagePtrAndHeader>::reverse_iterator message = replyQueueTemp.rbegin(); - message != replyQueueTemp.rend(); ++message) { - messages.push_back(message->MessagePtr.Release()); - } - - WriterErrorMessages(messages, reason); - - replyQueueTemp.clear(); - } - - void TRemoteConnection::ProcessBeforeSendQueueMessage(TBusMessage* message, TInstant now) { - // legacy clients expect this field to be set - if (!Session->IsSource_) { - message->SendTime = now.MilliSeconds(); - } - - WriterData.SendQueue.PushBack(message); - } - - void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) { - BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now)); - } - - void TRemoteConnection::WriterFillInFlight() { - // this is hack for TLoadBalancedProtocol - WriterFillStatus(); - AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight()); - } - - const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() { - WriterRotateCounters(); - WriterFillStatus(); - - return WriterData.Status; - } - - void TRemoteConnection::WriterFillStatus() { - if (!!WriterData.Channel) { - WriterData.Status.Fd = WriterData.Channel->GetSocket(); - } else { - WriterData.Status.Fd = INVALID_SOCKET; - } - WriterData.Status.BufferSize = WriterData.Buffer.Capacity(); - WriterData.Status.SendQueueSize = WriterData.SendQueue.Size(); - WriterData.Status.State = WriterData.State; - } - - void TRemoteConnection::WriterProcessStatusDown() { - Session->GetDeadConnectionWriterStatusQueue()->EnqueueAndSchedule(WriterData.Status.Incremental); - Reset(WriterData.Status.Incremental); - } - - void TRemoteConnection::ReaderProcessStatusDown() { - Session->GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(ReaderData.Status.Incremental); - Reset(ReaderData.Status.Incremental); - } - - void TRemoteConnection::ProcessWriterDown() { - if (!RemovedFromSession) { - Session->GetRemoveConnectionQueue()->EnqueueAndSchedule(this); - - if (Session->IsSource_) { - if (WriterData.Status.Connected) { - FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); - } - } - - LWPROBE(Disconnected, ToString(PeerAddr)); - RemovedFromSession = true; - } - - WriterData.DropChannel(); + replyQueueTemp.clear(); + } - DropEnqueuedData(ShutdownReason, MESSAGE_SHUTDOWN); + void TRemoteConnection::ProcessBeforeSendQueueMessage(TBusMessage* message, TInstant now) { + // legacy clients expect this field to be set + if (!Session->IsSource_) { + message->SendTime = now.MilliSeconds(); + } - WriterProcessStatusDown(); + WriterData.SendQueue.PushBack(message); + } - WriterData.ShutdownComplete.Signal(); - } + void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) { + BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now)); + } - void TRemoteConnection::DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues) { - ClearReplyQueue(reasonForQueues); - ClearBeforeSendQueue(reasonForQueues); - WriterGetReconnectQueue()->Clear(); - WriterGetWakeQueue()->Clear(); + void TRemoteConnection::WriterFillInFlight() { + // this is hack for TLoadBalancedProtocol + WriterFillStatus(); + AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight()); + } - TMessagesPtrs cleared; - ClearOutgoingQueue(cleared, false); + const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() { + WriterRotateCounters(); + WriterFillStatus(); - if (!Session->IsSource_) { - for (auto& i : cleared) { - TBusMessagePtrAndHeader h(i); - CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1)); - // assignment back is weird - i = h.MessagePtr.Release(); - // and this part is not batch - } - } + return WriterData.Status; + } - WriterErrorMessages(cleared, reason); - } + void TRemoteConnection::WriterFillStatus() { + if (!!WriterData.Channel) { + WriterData.Status.Fd = WriterData.Channel->GetSocket(); + } else { + WriterData.Status.Fd = INVALID_SOCKET; + } + WriterData.Status.BufferSize = WriterData.Buffer.Capacity(); + WriterData.Status.SendQueueSize = WriterData.SendQueue.Size(); + WriterData.Status.State = WriterData.State; + } - void TRemoteConnection::BeforeTryWrite() { - } + void TRemoteConnection::WriterProcessStatusDown() { + Session->GetDeadConnectionWriterStatusQueue()->EnqueueAndSchedule(WriterData.Status.Incremental); + Reset(WriterData.Status.Incremental); + } - void TRemoteConnection::Act(TWriterTag) { - TInstant now = TInstant::Now(); + void TRemoteConnection::ReaderProcessStatusDown() { + Session->GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(ReaderData.Status.Incremental); + Reset(ReaderData.Status.Incremental); + } - WriterData.Status.Acts += 1; + void TRemoteConnection::ProcessWriterDown() { + if (!RemovedFromSession) { + Session->GetRemoveConnectionQueue()->EnqueueAndSchedule(this); - if (Y_UNLIKELY(AtomicGet(WriterData.Down))) { - // dump status must work even if WriterDown - WriterSendStatus(now, true); - ProcessWriterDown(); - return; + if (Session->IsSource_) { + if (WriterData.Status.Connected) { + FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); + } + } + + LWPROBE(Disconnected, ToString(PeerAddr)); + RemovedFromSession = true; } - ProcessBeforeSendQueue(now); + WriterData.DropChannel(); - BeforeTryWrite(); + DropEnqueuedData(ShutdownReason, MESSAGE_SHUTDOWN); - WriterFillInFlight(); + WriterProcessStatusDown(); - WriterGetReconnectQueue()->DequeueAllLikelyEmpty(); + WriterData.ShutdownComplete.Signal(); + } - if (!WriterData.Status.Connected) { - TryConnect(); - } else { - for (int i = 0;; ++i) { - if (i == 100) { - // perform other tasks - GetWriterActor()->AddTaskFromActorLoop(); - break; - } + void TRemoteConnection::DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues) { + ClearReplyQueue(reasonForQueues); + ClearBeforeSendQueue(reasonForQueues); + WriterGetReconnectQueue()->Clear(); + WriterGetWakeQueue()->Clear(); + + TMessagesPtrs cleared; + ClearOutgoingQueue(cleared, false); + + if (!Session->IsSource_) { + for (auto& i : cleared) { + TBusMessagePtrAndHeader h(i); + CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1)); + // assignment back is weird + i = h.MessagePtr.Release(); + // and this part is not batch + } + } - if (WriterData.State == WRITER_FILLING) { - WriterFillBuffer(); + WriterErrorMessages(cleared, reason); + } - if (WriterData.State == WRITER_FILLING) { - WriterData.Channel->DisableWrite(); - break; - } + void TRemoteConnection::BeforeTryWrite() { + } - Y_ASSERT(!WriterData.Buffer.Empty()); - } + void TRemoteConnection::Act(TWriterTag) { + TInstant now = TInstant::Now(); - if (WriterData.State == WRITER_FLUSHING) { - WriterFlushBuffer(); + WriterData.Status.Acts += 1; - if (WriterData.State == WRITER_FLUSHING) { - break; - } - } - } - } + if (Y_UNLIKELY(AtomicGet(WriterData.Down))) { + // dump status must work even if WriterDown + WriterSendStatus(now, true); + ProcessWriterDown(); + return; + } - WriterGetWakeQueue()->DequeueAllLikelyEmpty(); + ProcessBeforeSendQueue(now); - WriterSendStatus(now); - } + BeforeTryWrite(); - void TRemoteConnection::WriterFlushBuffer() { - Y_ASSERT(WriterData.State == WRITER_FLUSHING); - Y_ASSERT(!WriterData.Buffer.Empty()); + WriterFillInFlight(); - WriterData.CorkUntil = TInstant::Zero(); + WriterGetReconnectQueue()->DequeueAllLikelyEmpty(); - while (!WriterData.Buffer.Empty()) { - ssize_t bytes; - { - TWhatThreadDoesPushPop pp("send syscall"); - bytes = SocketSend(WriterData.Channel->GetSocket(), TArrayRef<const char>(WriterData.Buffer.LeftPos(), WriterData.Buffer.Size())); - } + if (!WriterData.Status.Connected) { + TryConnect(); + } else { + for (int i = 0;; ++i) { + if (i == 100) { + // perform other tasks + GetWriterActor()->AddTaskFromActorLoop(); + break; + } + + if (WriterData.State == WRITER_FILLING) { + WriterFillBuffer(); + + if (WriterData.State == WRITER_FILLING) { + WriterData.Channel->DisableWrite(); + break; + } + + Y_ASSERT(!WriterData.Buffer.Empty()); + } + + if (WriterData.State == WRITER_FLUSHING) { + WriterFlushBuffer(); + + if (WriterData.State == WRITER_FLUSHING) { + break; + } + } + } + } + + WriterGetWakeQueue()->DequeueAllLikelyEmpty(); - if (bytes < 0) { - if (WouldBlock()) { - WriterData.Channel->EnableWrite(); - return; - } else { - WriterData.Channel->DisableWrite(); - ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, true); - return; - } - } + WriterSendStatus(now); + } + + void TRemoteConnection::WriterFlushBuffer() { + Y_ASSERT(WriterData.State == WRITER_FLUSHING); + Y_ASSERT(!WriterData.Buffer.Empty()); - WriterData.Status.Incremental.NetworkOps += 1; + WriterData.CorkUntil = TInstant::Zero(); - WriterData.Buffer.LeftProceed(bytes); + while (!WriterData.Buffer.Empty()) { + ssize_t bytes; + { + TWhatThreadDoesPushPop pp("send syscall"); + bytes = SocketSend(WriterData.Channel->GetSocket(), TArrayRef<const char>(WriterData.Buffer.LeftPos(), WriterData.Buffer.Size())); + } + + if (bytes < 0) { + if (WouldBlock()) { + WriterData.Channel->EnableWrite(); + return; + } else { + WriterData.Channel->DisableWrite(); + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, true); + return; + } + } + + WriterData.Status.Incremental.NetworkOps += 1; + + WriterData.Buffer.LeftProceed(bytes); } - WriterData.Buffer.Clear(); - if (WriterData.Buffer.Capacity() > MaxBufferSize) { - WriterData.Status.Incremental.BufferDrops += 1; - WriterData.Buffer.Reset(); + WriterData.Buffer.Clear(); + if (WriterData.Buffer.Capacity() > MaxBufferSize) { + WriterData.Status.Incremental.BufferDrops += 1; + WriterData.Buffer.Reset(); } - WriterData.State = WRITER_FILLING; - } + WriterData.State = WRITER_FILLING; + } - void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) { - if (Session->IsSource_) { - WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion); - } else { - ScheduleShutdown(status); + void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) { + if (Session->IsSource_) { + WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion); + } else { + ScheduleShutdown(status); } } - void TRemoteConnection::ScheduleShutdown(EMessageStatus status) { - ShutdownReason = status; + void TRemoteConnection::ScheduleShutdown(EMessageStatus status) { + ShutdownReason = status; - AtomicSet(ReaderData.Down, 1); - ScheduleRead(); + AtomicSet(ReaderData.Down, 1); + ScheduleRead(); - AtomicSet(WriterData.Down, 1); - ScheduleWrite(); - } + AtomicSet(WriterData.Down, 1); + ScheduleWrite(); + } - void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const { - size_t posForAssertion = buffer.Size(); - Proto->Serialize(msg, buffer); - Y_VERIFY(buffer.Size() >= posForAssertion, - "incorrect Serialize implementation, pos before serialize: %d, pos after serialize: %d", - int(posForAssertion), int(buffer.Size())); + void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const { + size_t posForAssertion = buffer.Size(); + Proto->Serialize(msg, buffer); + Y_VERIFY(buffer.Size() >= posForAssertion, + "incorrect Serialize implementation, pos before serialize: %d, pos after serialize: %d", + int(posForAssertion), int(buffer.Size())); } - namespace { + namespace { inline void WriteHeader(const TBusHeader& header, TBuffer& data) { - data.Reserve(data.Size() + sizeof(TBusHeader)); - /// \todo hton instead of memcpy - memcpy(data.Data() + data.Size(), &header, sizeof(TBusHeader)); - data.Advance(sizeof(TBusHeader)); + data.Reserve(data.Size() + sizeof(TBusHeader)); + /// \todo hton instead of memcpy + memcpy(data.Data() + data.Size(), &header, sizeof(TBusHeader)); + data.Advance(sizeof(TBusHeader)); } inline void WriteDummyHeader(TBuffer& data) { - data.Resize(data.Size() + sizeof(TBusHeader)); - } + data.Resize(data.Size() + sizeof(TBusHeader)); + } - } + } - void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const { - size_t pos = data->Size(); + void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const { + size_t pos = data->Size(); - size_t dataSize; + size_t dataSize; - bool compressionRequested = msg->IsCompressed(); + bool compressionRequested = msg->IsCompressed(); - if (compressionRequested) { - TBuffer compdata; - TBuffer plaindata; - CallSerialize(msg, plaindata); + if (compressionRequested) { + TBuffer compdata; + TBuffer plaindata; + CallSerialize(msg, plaindata); - dataSize = sizeof(TBusHeader) + plaindata.Size(); + dataSize = sizeof(TBusHeader) + plaindata.Size(); - NCodecs::TCodecPtr c = Proto->GetTransportCodec(); + NCodecs::TCodecPtr c = Proto->GetTransportCodec(); c->Encode(TStringBuf{plaindata.data(), plaindata.size()}, compdata); - if (compdata.Size() < plaindata.Size()) { - plaindata.Clear(); - msg->GetHeader()->Size = sizeof(TBusHeader) + compdata.Size(); - WriteHeader(*msg->GetHeader(), *data); - data->Append(compdata.Data(), compdata.Size()); - } else { - compdata.Clear(); - msg->SetCompressed(false); - msg->GetHeader()->Size = sizeof(TBusHeader) + plaindata.Size(); - WriteHeader(*msg->GetHeader(), *data); - data->Append(plaindata.Data(), plaindata.Size()); - } - } else { - WriteDummyHeader(*data); - CallSerialize(msg, *data); - - dataSize = msg->GetHeader()->Size = data->Size() - pos; - - data->Proceed(pos); - WriteHeader(*msg->GetHeader(), *data); - data->Proceed(pos + msg->GetHeader()->Size); - } - - Y_ASSERT(msg->GetHeader()->Size == data->Size() - pos); - counter->AddMessage(dataSize, data->Size() - pos, msg->IsCompressed(), compressionRequested); - } - - TBusMessage* TRemoteConnection::DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const { - size_t dataSize; - - TBusMessage* message; - if (header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL) { - TBuffer msg; - { - TBuffer plaindata; - NCodecs::TCodecPtr c = Proto->GetTransportCodec(); - try { - TArrayRef<const char> payload = TBusMessage::GetPayload(dataRef); - c->Decode(TStringBuf{payload.data(), payload.size()}, plaindata); - } catch (...) { - // catch all, because - // http://nga.at.yandex-team.ru/replies.xml?item_no=3884 - *status = MESSAGE_DECOMPRESS_ERROR; - return nullptr; - } - - msg.Append(dataRef.data(), sizeof(TBusHeader)); - msg.Append(plaindata.Data(), plaindata.Size()); - } - TArrayRef<const char> msgRef(msg.Data(), msg.Size()); - dataSize = sizeof(TBusHeader) + msgRef.size(); - // TODO: track error types - message = Proto->Deserialize(header->Type, msgRef.Slice(sizeof(TBusHeader))).Release(); - if (!message) { - *status = MESSAGE_DESERIALIZE_ERROR; - return nullptr; - } - *message->GetHeader() = *header; - message->SetCompressed(true); - } else { - dataSize = dataRef.size(); - message = Proto->Deserialize(header->Type, dataRef.Slice(sizeof(TBusHeader))).Release(); - if (!message) { - *status = MESSAGE_DESERIALIZE_ERROR; - return nullptr; - } - *message->GetHeader() = *header; - } - - messageCounter->AddMessage(dataSize, dataRef.size(), header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL, false); - - return message; - } - - void TRemoteConnection::ResetOneWayFlag(TArrayRef<TBusMessage*> messages) { - for (auto message : messages) { - message->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; - } - } - - void TRemoteConnection::ReaderFlushMessages() { - if (!ReaderData.ReadMessages.empty()) { - Session->OnMessageReceived(this, ReaderData.ReadMessages); - ReaderData.ReadMessages.clear(); - } - } - - // @return false if actor should break - bool TRemoteConnection::MessageRead(TArrayRef<const char> readDataRef, TInstant now) { - TBusHeader header(readDataRef); - - Y_ASSERT(readDataRef.size() == header.Size); - - if (header.GetVersionInternal() != YBUS_VERSION) { - ReaderProcessMessageUnknownVersion(readDataRef); - return true; - } - - EMessageStatus deserializeFailureStatus = MESSAGE_OK; - TBusMessage* r = DeserializeMessage(readDataRef, &header, &ReaderData.Status.Incremental.MessageCounter, &deserializeFailureStatus); - - if (!r) { - Y_VERIFY(deserializeFailureStatus != MESSAGE_OK, "state check"); - LWPROBE(Error, ToString(deserializeFailureStatus), ToString(PeerAddr), ""); - ReaderData.Status.Incremental.StatusCounter[deserializeFailureStatus] += 1; - ScheduleShutdownOnServerOrReconnectOnClient(deserializeFailureStatus, false); - return false; - } - - LWPROBE(Read, r->GetHeader()->Size); - - r->ReplyTo = PeerAddrSocketAddr; - - TBusMessagePtrAndHeader h(r); - r->RecvTime = now; - - QuotaConsume(1, header.Size); - - ReaderData.ReadMessages.push_back(h); - if (ReaderData.ReadMessages.size() >= 100) { - ReaderFlushMessages(); - } - - return true; - } - - void TRemoteConnection::WriterFillBuffer() { - Y_ASSERT(WriterData.State == WRITER_FILLING); - - Y_ASSERT(WriterData.Buffer.LeftSize() == 0); - - if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) { - TVector<TBusHeader> headers; - WrongVersionRequests.DequeueAllSingleConsumer(&headers); - for (TVector<TBusHeader>::reverse_iterator header = headers.rbegin(); - header != headers.rend(); ++header) { - TBusHeader response = *header; - response.SendTime = NBus::Now(); - response.Size = sizeof(TBusHeader); - response.FlagsInternal = 0; - response.SetVersionInternal(YBUS_VERSION); - WriteHeader(response, WriterData.Buffer.GetBuffer()); - } - - Y_ASSERT(!WriterData.Buffer.Empty()); - WriterData.State = WRITER_FLUSHING; - return; - } - - TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages; - - for (;;) { - THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront()); - if (!writeMessage) { - break; - } - - if (Config.Cork != TDuration::Zero()) { - if (WriterData.CorkUntil == TInstant::Zero()) { - WriterData.CorkUntil = TInstant::Now() + Config.Cork; - } - } + if (compdata.Size() < plaindata.Size()) { + plaindata.Clear(); + msg->GetHeader()->Size = sizeof(TBusHeader) + compdata.Size(); + WriteHeader(*msg->GetHeader(), *data); + data->Append(compdata.Data(), compdata.Size()); + } else { + compdata.Clear(); + msg->SetCompressed(false); + msg->GetHeader()->Size = sizeof(TBusHeader) + plaindata.Size(); + WriteHeader(*msg->GetHeader(), *data); + data->Append(plaindata.Data(), plaindata.Size()); + } + } else { + WriteDummyHeader(*data); + CallSerialize(msg, *data); + + dataSize = msg->GetHeader()->Size = data->Size() - pos; + + data->Proceed(pos); + WriteHeader(*msg->GetHeader(), *data); + data->Proceed(pos + msg->GetHeader()->Size); + } - size_t sizeBeforeSerialize = WriterData.Buffer.Size(); + Y_ASSERT(msg->GetHeader()->Size == data->Size() - pos); + counter->AddMessage(dataSize, data->Size() - pos, msg->IsCompressed(), compressionRequested); + } - TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter; + TBusMessage* TRemoteConnection::DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const { + size_t dataSize; + + TBusMessage* message; + if (header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL) { + TBuffer msg; + { + TBuffer plaindata; + NCodecs::TCodecPtr c = Proto->GetTransportCodec(); + try { + TArrayRef<const char> payload = TBusMessage::GetPayload(dataRef); + c->Decode(TStringBuf{payload.data(), payload.size()}, plaindata); + } catch (...) { + // catch all, because + // http://nga.at.yandex-team.ru/replies.xml?item_no=3884 + *status = MESSAGE_DECOMPRESS_ERROR; + return nullptr; + } + + msg.Append(dataRef.data(), sizeof(TBusHeader)); + msg.Append(plaindata.Data(), plaindata.Size()); + } + TArrayRef<const char> msgRef(msg.Data(), msg.Size()); + dataSize = sizeof(TBusHeader) + msgRef.size(); + // TODO: track error types + message = Proto->Deserialize(header->Type, msgRef.Slice(sizeof(TBusHeader))).Release(); + if (!message) { + *status = MESSAGE_DESERIALIZE_ERROR; + return nullptr; + } + *message->GetHeader() = *header; + message->SetCompressed(true); + } else { + dataSize = dataRef.size(); + message = Proto->Deserialize(header->Type, dataRef.Slice(sizeof(TBusHeader))).Release(); + if (!message) { + *status = MESSAGE_DESERIALIZE_ERROR; + return nullptr; + } + *message->GetHeader() = *header; + } - SerializeMessage(writeMessage.Get(), &WriterData.Buffer.GetBuffer(), &messageCounter); + messageCounter->AddMessage(dataSize, dataRef.size(), header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL, false); - size_t written = WriterData.Buffer.Size() - sizeBeforeSerialize; - if (written > Config.MaxMessageSize) { - WriterData.Buffer.GetBuffer().EraseBack(written); - WriterBeforeWriteErrorMessage(writeMessage.Release(), MESSAGE_MESSAGE_TOO_LARGE); - continue; - } + return message; + } + + void TRemoteConnection::ResetOneWayFlag(TArrayRef<TBusMessage*> messages) { + for (auto message : messages) { + message->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; + } + } + + void TRemoteConnection::ReaderFlushMessages() { + if (!ReaderData.ReadMessages.empty()) { + Session->OnMessageReceived(this, ReaderData.ReadMessages); + ReaderData.ReadMessages.clear(); + } + } + + // @return false if actor should break + bool TRemoteConnection::MessageRead(TArrayRef<const char> readDataRef, TInstant now) { + TBusHeader header(readDataRef); - WriterData.Status.Incremental.MessageCounter = messageCounter; + Y_ASSERT(readDataRef.size() == header.Size); + + if (header.GetVersionInternal() != YBUS_VERSION) { + ReaderProcessMessageUnknownVersion(readDataRef); + return true; + } - TBusMessagePtrAndHeader h(writeMessage.Release()); - writeMessages.GetVector()->push_back(h); + EMessageStatus deserializeFailureStatus = MESSAGE_OK; + TBusMessage* r = DeserializeMessage(readDataRef, &header, &ReaderData.Status.Incremental.MessageCounter, &deserializeFailureStatus); - Y_ASSERT(!WriterData.Buffer.Empty()); - if (WriterData.Buffer.Size() >= Config.SendThreshold) { - break; - } + if (!r) { + Y_VERIFY(deserializeFailureStatus != MESSAGE_OK, "state check"); + LWPROBE(Error, ToString(deserializeFailureStatus), ToString(PeerAddr), ""); + ReaderData.Status.Incremental.StatusCounter[deserializeFailureStatus] += 1; + ScheduleShutdownOnServerOrReconnectOnClient(deserializeFailureStatus, false); + return false; } - - if (!WriterData.Buffer.Empty()) { - if (WriterData.Buffer.Size() >= Config.SendThreshold) { - WriterData.State = WRITER_FLUSHING; - } else if (WriterData.CorkUntil == TInstant::Zero()) { - WriterData.State = WRITER_FLUSHING; - } else if (TInstant::Now() >= WriterData.CorkUntil) { - WriterData.State = WRITER_FLUSHING; - } else { - // keep filling - Y_ASSERT(WriterData.State == WRITER_FILLING); - GetWriterSchedulerActor()->ScheduleAt(WriterData.CorkUntil); - } - } else { - // keep filling - Y_ASSERT(WriterData.State == WRITER_FILLING); - } - - size_t bytes = MessageSize(*writeMessages.GetVector()); - - QuotaReturnSelf(writeMessages.GetVector()->size(), bytes); - - // This is called before `send` syscall inducing latency - MessageSent(*writeMessages.GetVector()); - } - - size_t TRemoteConnection::MessageSize(TArrayRef<TBusMessagePtrAndHeader> messages) { - size_t size = 0; - for (const auto& message : messages) - size += message.MessagePtr->RequestSize; - - return size; - } - - size_t TRemoteConnection::GetInFlight() { - return AtomicGet(WriterData.InFlight); - } - - size_t TRemoteConnection::GetConnectSyscallsNumForTest() { - return WriterData.Status.ConnectSyscalls; - } - - void TRemoteConnection::WriterBeforeWriteErrorMessage(TBusMessage* message, EMessageStatus status) { - if (Session->IsSource_) { - CheckedCast<TRemoteClientSession*>(Session.Get())->ReleaseInFlight({message}); - WriterErrorMessage(message, status); - } else { - TBusMessagePtrAndHeader h(message); - CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1)); - WriterErrorMessage(h.MessagePtr.Release(), status); - } - } - - void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) { - TBusMessage* released = m.Release(); - WriterErrorMessages(MakeArrayRef(&released, 1), status); - } - - void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) { - ResetOneWayFlag(ms); - - WriterData.Status.Incremental.StatusCounter[status] += ms.size(); - for (auto m : ms) { - Session->InvokeOnError(m, status); - } - } - - void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) { - Y_VERIFY(Session->IsSource_, "state check"); - TClientConnectionEvent event(type, ConnectionId, PeerAddr); - TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get()); - session->ClientHandler->OnClientConnectionEvent(event); - } - - bool TRemoteConnection::IsAlive() const { - return !AtomicGet(WriterData.Down); - } + + LWPROBE(Read, r->GetHeader()->Size); + + r->ReplyTo = PeerAddrSocketAddr; + + TBusMessagePtrAndHeader h(r); + r->RecvTime = now; + + QuotaConsume(1, header.Size); + + ReaderData.ReadMessages.push_back(h); + if (ReaderData.ReadMessages.size() >= 100) { + ReaderFlushMessages(); + } + + return true; + } + + void TRemoteConnection::WriterFillBuffer() { + Y_ASSERT(WriterData.State == WRITER_FILLING); + + Y_ASSERT(WriterData.Buffer.LeftSize() == 0); + + if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) { + TVector<TBusHeader> headers; + WrongVersionRequests.DequeueAllSingleConsumer(&headers); + for (TVector<TBusHeader>::reverse_iterator header = headers.rbegin(); + header != headers.rend(); ++header) { + TBusHeader response = *header; + response.SendTime = NBus::Now(); + response.Size = sizeof(TBusHeader); + response.FlagsInternal = 0; + response.SetVersionInternal(YBUS_VERSION); + WriteHeader(response, WriterData.Buffer.GetBuffer()); + } + + Y_ASSERT(!WriterData.Buffer.Empty()); + WriterData.State = WRITER_FLUSHING; + return; + } + + TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages; + + for (;;) { + THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront()); + if (!writeMessage) { + break; + } + + if (Config.Cork != TDuration::Zero()) { + if (WriterData.CorkUntil == TInstant::Zero()) { + WriterData.CorkUntil = TInstant::Now() + Config.Cork; + } + } + + size_t sizeBeforeSerialize = WriterData.Buffer.Size(); + + TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter; + + SerializeMessage(writeMessage.Get(), &WriterData.Buffer.GetBuffer(), &messageCounter); + + size_t written = WriterData.Buffer.Size() - sizeBeforeSerialize; + if (written > Config.MaxMessageSize) { + WriterData.Buffer.GetBuffer().EraseBack(written); + WriterBeforeWriteErrorMessage(writeMessage.Release(), MESSAGE_MESSAGE_TOO_LARGE); + continue; + } + + WriterData.Status.Incremental.MessageCounter = messageCounter; + + TBusMessagePtrAndHeader h(writeMessage.Release()); + writeMessages.GetVector()->push_back(h); + + Y_ASSERT(!WriterData.Buffer.Empty()); + if (WriterData.Buffer.Size() >= Config.SendThreshold) { + break; + } + } + + if (!WriterData.Buffer.Empty()) { + if (WriterData.Buffer.Size() >= Config.SendThreshold) { + WriterData.State = WRITER_FLUSHING; + } else if (WriterData.CorkUntil == TInstant::Zero()) { + WriterData.State = WRITER_FLUSHING; + } else if (TInstant::Now() >= WriterData.CorkUntil) { + WriterData.State = WRITER_FLUSHING; + } else { + // keep filling + Y_ASSERT(WriterData.State == WRITER_FILLING); + GetWriterSchedulerActor()->ScheduleAt(WriterData.CorkUntil); + } + } else { + // keep filling + Y_ASSERT(WriterData.State == WRITER_FILLING); + } + + size_t bytes = MessageSize(*writeMessages.GetVector()); + + QuotaReturnSelf(writeMessages.GetVector()->size(), bytes); + + // This is called before `send` syscall inducing latency + MessageSent(*writeMessages.GetVector()); + } + + size_t TRemoteConnection::MessageSize(TArrayRef<TBusMessagePtrAndHeader> messages) { + size_t size = 0; + for (const auto& message : messages) + size += message.MessagePtr->RequestSize; + + return size; + } + + size_t TRemoteConnection::GetInFlight() { + return AtomicGet(WriterData.InFlight); + } + + size_t TRemoteConnection::GetConnectSyscallsNumForTest() { + return WriterData.Status.ConnectSyscalls; + } + + void TRemoteConnection::WriterBeforeWriteErrorMessage(TBusMessage* message, EMessageStatus status) { + if (Session->IsSource_) { + CheckedCast<TRemoteClientSession*>(Session.Get())->ReleaseInFlight({message}); + WriterErrorMessage(message, status); + } else { + TBusMessagePtrAndHeader h(message); + CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1)); + WriterErrorMessage(h.MessagePtr.Release(), status); + } + } + + void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) { + TBusMessage* released = m.Release(); + WriterErrorMessages(MakeArrayRef(&released, 1), status); + } + + void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) { + ResetOneWayFlag(ms); + + WriterData.Status.Incremental.StatusCounter[status] += ms.size(); + for (auto m : ms) { + Session->InvokeOnError(m, status); + } + } + + void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) { + Y_VERIFY(Session->IsSource_, "state check"); + TClientConnectionEvent event(type, ConnectionId, PeerAddr); + TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get()); + session->ClientHandler->OnClientConnectionEvent(event); + } + + bool TRemoteConnection::IsAlive() const { + return !AtomicGet(WriterData.Down); + } } } |