diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_connection.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_connection.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_connection.cpp | 974 |
1 files changed, 974 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp new file mode 100644 index 0000000000..22932569db --- /dev/null +++ b/library/cpp/messagebus/remote_connection.cpp @@ -0,0 +1,974 @@ +#include "remote_connection.h" + +#include "key_value_printer.h" +#include "mb_lwtrace.h" +#include "network.h" +#include "remote_client_connection.h" +#include "remote_client_session.h" +#include "remote_server_session.h" +#include "session_impl.h" + +#include <library/cpp/messagebus/actor/what_thread_does.h> + +#include <util/generic/cast.h> +#include <util/network/init.h> +#include <util/system/atomic.h> + +LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) + +using namespace NActor; +using namespace NBus; +using namespace NBus::NPrivate; + +namespace NBus { + namespace NPrivate { + TRemoteConnection::TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr) + : TActor<TRemoteConnection, TWriterTag>(session->Queue->WorkQueue.Get()) + , TActor<TRemoteConnection, TReaderTag>(session->Queue->WorkQueue.Get()) + , TScheduleActor<TRemoteConnection, TWriterTag>(&session->Queue->Scheduler) + , Session(session) + , Proto(session->Proto) + , Config(session->Config) + , RemovedFromSession(false) + , ConnectionId(connectionId) + , PeerAddr(addr) + , PeerAddrSocketAddr(addr) + , CreatedTime(TInstant::Now()) + , ReturnConnectFailedImmediately(false) + , GranStatus(Config.Secret.StatusFlushPeriod) + , QuotaMsg(!Session->IsSource_, Config.PerConnectionMaxInFlight, 0) + , QuotaBytes(!Session->IsSource_, Config.PerConnectionMaxInFlightBySize, 0) + , MaxBufferSize(session->Config.MaxBufferSize) + , ShutdownReason(MESSAGE_OK) + { + WriterData.Status.ConnectionId = connectionId; + WriterData.Status.PeerAddr = PeerAddr; + ReaderData.Status.ConnectionId = connectionId; + + const TInstant now = TInstant::Now(); + + WriterFillStatus(); + + GranStatus.Writer.Update(WriterData.Status, now, true); + GranStatus.Reader.Update(ReaderData.Status, now, true); + } + + TRemoteConnection::~TRemoteConnection() { + Y_VERIFY(ReplyQueue.IsEmpty()); + } + + TRemoteConnection::TWriterData::TWriterData() + : Down(0) + , SocketVersion(0) + , InFlight(0) + , AwakeFlags(0) + , State(WRITER_FILLING) + { + } + + TRemoteConnection::TWriterData::~TWriterData() { + Y_VERIFY(AtomicGet(Down)); + Y_VERIFY(SendQueue.Empty()); + } + + bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept { + size_t left = Buffer.Size() - Offset; + + return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0; + } + + void TRemoteConnection::TWriterData::SetChannel(NEventLoop::TChannelPtr channel) { + Y_VERIFY(!Channel, "must not have channel"); + Y_VERIFY(Buffer.GetBuffer().Empty() && Buffer.LeftSize() == 0, "buffer must be empty"); + Y_VERIFY(State == WRITER_FILLING, "state must be initial"); + Channel = channel; + } + + void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) { + Y_VERIFY(!Channel, "must not have channel"); + Y_VERIFY(Buffer.Empty(), "buffer must be empty"); + Channel = channel; + } + + void TRemoteConnection::TWriterData::DropChannel() { + if (!!Channel) { + Channel->Unregister(); + Channel.Drop(); + } + + Buffer.Reset(); + State = WRITER_FILLING; + } + + void TRemoteConnection::TReaderData::DropChannel() { + // TODO: make Drop call Unregister + if (!!Channel) { + Channel->Unregister(); + Channel.Drop(); + } + Buffer.Reset(); + Offset = 0; + } + + TRemoteConnection::TReaderData::TReaderData() + : Down(0) + , SocketVersion(0) + , Offset(0) + , MoreBytes(0) + { + } + + TRemoteConnection::TReaderData::~TReaderData() { + Y_VERIFY(AtomicGet(Down)); + } + + void TRemoteConnection::Send(TNonDestroyingAutoPtr<TBusMessage> msg) { + BeforeSendQueue.Enqueue(msg.Release()); + AtomicIncrement(WriterData.InFlight); + ScheduleWrite(); + } + + void TRemoteConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) { + if (!reconnect) { + // Do not clear send queue if reconnecting + WriterData.SendQueue.Clear(&result); + } + } + + void TRemoteConnection::Shutdown(EMessageStatus status) { + ScheduleShutdown(status); + + ReaderData.ShutdownComplete.WaitI(); + WriterData.ShutdownComplete.WaitI(); + } + + void TRemoteConnection::TryConnect() { + Y_FAIL("TryConnect is client connection only operation"); + } + + void TRemoteConnection::ScheduleRead() { + GetReaderActor()->Schedule(); + } + + void TRemoteConnection::ScheduleWrite() { + GetWriterActor()->Schedule(); + } + + void TRemoteConnection::WriterRotateCounters() { + if (!WriterData.TimeToRotateCounters.FetchTask()) { + return; + } + + WriterData.Status.DurationCounterPrev = WriterData.Status.DurationCounter; + Reset(WriterData.Status.DurationCounter); + } + + void TRemoteConnection::WriterSendStatus(TInstant now, bool force) { + GranStatus.Writer.Update(std::bind(&TRemoteConnection::WriterGetStatus, this), now, force); + } + + void TRemoteConnection::ReaderSendStatus(TInstant now, bool force) { + GranStatus.Reader.Update(std::bind(&TRemoteConnection::ReaderFillStatus, this), now, force); + } + + const TRemoteConnectionReaderStatus& TRemoteConnection::ReaderFillStatus() { + ReaderData.Status.BufferSize = ReaderData.Buffer.Capacity(); + ReaderData.Status.QuotaMsg = QuotaMsg.Tokens(); + ReaderData.Status.QuotaBytes = QuotaBytes.Tokens(); + + return ReaderData.Status; + } + + void TRemoteConnection::ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage readSocket) { + if (AtomicGet(ReaderData.Down)) { + ReaderData.Status.Fd = INVALID_SOCKET; + return; + } + + ReaderData.DropChannel(); + + ReaderData.Status.Fd = readSocket.Socket; + ReaderData.SocketVersion = readSocket.SocketVersion; + + if (readSocket.Socket != INVALID_SOCKET) { + ReaderData.SetChannel(Session->ReadEventLoop.Register(readSocket.Socket, this, ReadCookie)); + ReaderData.Channel->EnableRead(); + } + } + + void TRemoteConnection::ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion) { + Y_VERIFY(socketVersion <= WriterData.SocketVersion, "something weird"); + + if (WriterData.SocketVersion != socketVersion) { + return; + } + Y_VERIFY(WriterData.Status.Connected, "must be connected at this point"); + Y_VERIFY(!!WriterData.Channel, "must have channel at this point"); + + WriterData.Status.Connected = false; + WriterData.DropChannel(); + WriterData.Status.MyAddr = TNetAddr(); + ++WriterData.SocketVersion; + LastConnectAttempt = TInstant(); + + TMessagesPtrs cleared; + ClearOutgoingQueue(cleared, true); + WriterErrorMessages(cleared, MESSAGE_DELIVERY_FAILED); + + FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); + + ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(INVALID_SOCKET, WriterData.SocketVersion)); + } + + void TRemoteConnection::ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags) { + WriterData.AwakeFlags |= awakeFlags; + + ReadQuotaWakeup(); + } + + void TRemoteConnection::Act(TReaderTag) { + TInstant now = TInstant::Now(); + + ReaderData.Status.Acts += 1; + + ReaderGetSocketQueue()->DequeueAllLikelyEmpty(); + + if (AtomicGet(ReaderData.Down)) { + ReaderData.DropChannel(); + + ReaderProcessStatusDown(); + ReaderData.ShutdownComplete.Signal(); + + } else if (!!ReaderData.Channel) { + Y_ASSERT(ReaderData.ReadMessages.empty()); + + for (int i = 0;; ++i) { + if (i == 100) { + // perform other tasks + GetReaderActor()->AddTaskFromActorLoop(); + break; + } + + if (NeedInterruptRead()) { + ReaderData.Channel->EnableRead(); + break; + } + + if (!ReaderFillBuffer()) + break; + + if (!ReaderProcessBuffer()) + break; + } + + ReaderFlushMessages(); + } + + ReaderSendStatus(now); + } + + bool TRemoteConnection::QuotaAcquire(size_t msg, size_t bytes) { + ui32 wakeFlags = 0; + + if (!QuotaMsg.Acquire(msg)) + wakeFlags |= WAKE_QUOTA_MSG; + + else if (!QuotaBytes.Acquire(bytes)) + wakeFlags |= WAKE_QUOTA_BYTES; + + if (wakeFlags) { + ReaderData.Status.QuotaExhausted++; + + WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags); + } + + return wakeFlags == 0; + } + + void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) { + QuotaMsg.Consume(msg); + QuotaBytes.Consume(bytes); + } + + void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) { + if (QuotaReturnValues(items, bytes)) + ReadQuotaWakeup(); + } + + void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) { + if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down)) + WriterGetWakeQueue()->EnqueueAndSchedule(0x0); + } + + bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) { + bool rMsg = QuotaMsg.Return(items); + bool rBytes = QuotaBytes.Return(bytes); + + return rMsg || rBytes; + } + + void TRemoteConnection::ReadQuotaWakeup() { + const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags(); + + if (mask && mask == WriterData.AwakeFlags) { + WriterData.Status.ReaderWakeups++; + WriterData.AwakeFlags = 0; + + ScheduleRead(); + } + } + + ui32 TRemoteConnection::WriteWakeFlags() const { + ui32 awakeFlags = 0; + + if (QuotaMsg.IsAboveWake()) + awakeFlags |= WAKE_QUOTA_MSG; + + if (QuotaBytes.IsAboveWake()) + awakeFlags |= WAKE_QUOTA_BYTES; + + return awakeFlags; + } + + bool TRemoteConnection::ReaderProcessBuffer() { + TInstant now = TInstant::Now(); + + for (;;) { + if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) { + break; + } + + TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset)); + + if (header.Size < sizeof(TBusHeader)) { + LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); + ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); + return false; + } + + if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) { + LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); + ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); + return false; + } + + if (header.Size > Config.MaxMessageSize) { + LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size)); + ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1; + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false); + return false; + } + + if (!ReaderData.HasBytesInBuf(header.Size)) { + if (ReaderData.Offset == 0) { + ReaderData.Buffer.Reserve(header.Size); + } + break; + } + + if (!QuotaAcquire(1, header.Size)) + return false; + + if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) { + return false; + } + + ReaderData.Offset += header.Size; + } + + ReaderData.Buffer.ChopHead(ReaderData.Offset); + ReaderData.Offset = 0; + + if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) { + ReaderData.Status.Incremental.BufferDrops += 1; + + TBuffer temp; + // probably should use another constant + temp.Reserve(Config.DefaultBufferSize); + temp.Append(ReaderData.Buffer.Data(), ReaderData.Buffer.Size()); + + ReaderData.Buffer.Swap(temp); + } + + return true; + } + + bool TRemoteConnection::ReaderFillBuffer() { + if (!ReaderData.BufferMore()) + return true; + + if (ReaderData.Buffer.Avail() == 0) { + if (ReaderData.Buffer.Size() == 0) { + ReaderData.Buffer.Reserve(Config.DefaultBufferSize); + } else { + ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2); + } + } + + Y_ASSERT(ReaderData.Buffer.Avail() > 0); + + ssize_t bytes; + { + TWhatThreadDoesPushPop pp("recv syscall"); + bytes = SocketRecv(ReaderData.Channel->GetSocket(), TArrayRef<char>(ReaderData.Buffer.Pos(), ReaderData.Buffer.Avail())); + } + + if (bytes < 0) { + if (WouldBlock()) { + ReaderData.Channel->EnableRead(); + return false; + } else { + ReaderData.Channel->DisableRead(); + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false); + return false; + } + } + + if (bytes == 0) { + ReaderData.Channel->DisableRead(); + // TODO: incorrect: it is possible that only input is shutdown, and output is available + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false); + return false; + } + + ReaderData.Status.Incremental.NetworkOps += 1; + + ReaderData.Buffer.Advance(bytes); + ReaderData.MoreBytes = 0; + return true; + } + + void TRemoteConnection::ClearBeforeSendQueue(EMessageStatus reason) { + BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::WriterBeforeWriteErrorMessage, this, std::placeholders::_1, reason)); + } + + void TRemoteConnection::ClearReplyQueue(EMessageStatus reason) { + TVectorSwaps<TBusMessagePtrAndHeader> replyQueueTemp; + Y_ASSERT(replyQueueTemp.empty()); + ReplyQueue.DequeueAllSingleConsumer(&replyQueueTemp); + + TVector<TBusMessage*> messages; + for (TVectorSwaps<TBusMessagePtrAndHeader>::reverse_iterator message = replyQueueTemp.rbegin(); + message != replyQueueTemp.rend(); ++message) { + messages.push_back(message->MessagePtr.Release()); + } + + WriterErrorMessages(messages, reason); + + replyQueueTemp.clear(); + } + + void TRemoteConnection::ProcessBeforeSendQueueMessage(TBusMessage* message, TInstant now) { + // legacy clients expect this field to be set + if (!Session->IsSource_) { + message->SendTime = now.MilliSeconds(); + } + + WriterData.SendQueue.PushBack(message); + } + + void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) { + BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now)); + } + + void TRemoteConnection::WriterFillInFlight() { + // this is hack for TLoadBalancedProtocol + WriterFillStatus(); + AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight()); + } + + const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() { + WriterRotateCounters(); + WriterFillStatus(); + + return WriterData.Status; + } + + void TRemoteConnection::WriterFillStatus() { + if (!!WriterData.Channel) { + WriterData.Status.Fd = WriterData.Channel->GetSocket(); + } else { + WriterData.Status.Fd = INVALID_SOCKET; + } + WriterData.Status.BufferSize = WriterData.Buffer.Capacity(); + WriterData.Status.SendQueueSize = WriterData.SendQueue.Size(); + WriterData.Status.State = WriterData.State; + } + + void TRemoteConnection::WriterProcessStatusDown() { + Session->GetDeadConnectionWriterStatusQueue()->EnqueueAndSchedule(WriterData.Status.Incremental); + Reset(WriterData.Status.Incremental); + } + + void TRemoteConnection::ReaderProcessStatusDown() { + Session->GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(ReaderData.Status.Incremental); + Reset(ReaderData.Status.Incremental); + } + + void TRemoteConnection::ProcessWriterDown() { + if (!RemovedFromSession) { + Session->GetRemoveConnectionQueue()->EnqueueAndSchedule(this); + + if (Session->IsSource_) { + if (WriterData.Status.Connected) { + FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED); + } + } + + LWPROBE(Disconnected, ToString(PeerAddr)); + RemovedFromSession = true; + } + + WriterData.DropChannel(); + + DropEnqueuedData(ShutdownReason, MESSAGE_SHUTDOWN); + + WriterProcessStatusDown(); + + WriterData.ShutdownComplete.Signal(); + } + + void TRemoteConnection::DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues) { + ClearReplyQueue(reasonForQueues); + ClearBeforeSendQueue(reasonForQueues); + WriterGetReconnectQueue()->Clear(); + WriterGetWakeQueue()->Clear(); + + TMessagesPtrs cleared; + ClearOutgoingQueue(cleared, false); + + if (!Session->IsSource_) { + for (auto& i : cleared) { + TBusMessagePtrAndHeader h(i); + CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1)); + // assignment back is weird + i = h.MessagePtr.Release(); + // and this part is not batch + } + } + + WriterErrorMessages(cleared, reason); + } + + void TRemoteConnection::BeforeTryWrite() { + } + + void TRemoteConnection::Act(TWriterTag) { + TInstant now = TInstant::Now(); + + WriterData.Status.Acts += 1; + + if (Y_UNLIKELY(AtomicGet(WriterData.Down))) { + // dump status must work even if WriterDown + WriterSendStatus(now, true); + ProcessWriterDown(); + return; + } + + ProcessBeforeSendQueue(now); + + BeforeTryWrite(); + + WriterFillInFlight(); + + WriterGetReconnectQueue()->DequeueAllLikelyEmpty(); + + if (!WriterData.Status.Connected) { + TryConnect(); + } else { + for (int i = 0;; ++i) { + if (i == 100) { + // perform other tasks + GetWriterActor()->AddTaskFromActorLoop(); + break; + } + + if (WriterData.State == WRITER_FILLING) { + WriterFillBuffer(); + + if (WriterData.State == WRITER_FILLING) { + WriterData.Channel->DisableWrite(); + break; + } + + Y_ASSERT(!WriterData.Buffer.Empty()); + } + + if (WriterData.State == WRITER_FLUSHING) { + WriterFlushBuffer(); + + if (WriterData.State == WRITER_FLUSHING) { + break; + } + } + } + } + + WriterGetWakeQueue()->DequeueAllLikelyEmpty(); + + WriterSendStatus(now); + } + + void TRemoteConnection::WriterFlushBuffer() { + Y_ASSERT(WriterData.State == WRITER_FLUSHING); + Y_ASSERT(!WriterData.Buffer.Empty()); + + WriterData.CorkUntil = TInstant::Zero(); + + while (!WriterData.Buffer.Empty()) { + ssize_t bytes; + { + TWhatThreadDoesPushPop pp("send syscall"); + bytes = SocketSend(WriterData.Channel->GetSocket(), TArrayRef<const char>(WriterData.Buffer.LeftPos(), WriterData.Buffer.Size())); + } + + if (bytes < 0) { + if (WouldBlock()) { + WriterData.Channel->EnableWrite(); + return; + } else { + WriterData.Channel->DisableWrite(); + ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, true); + return; + } + } + + WriterData.Status.Incremental.NetworkOps += 1; + + WriterData.Buffer.LeftProceed(bytes); + } + + WriterData.Buffer.Clear(); + if (WriterData.Buffer.Capacity() > MaxBufferSize) { + WriterData.Status.Incremental.BufferDrops += 1; + WriterData.Buffer.Reset(); + } + + WriterData.State = WRITER_FILLING; + } + + void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) { + if (Session->IsSource_) { + WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion); + } else { + ScheduleShutdown(status); + } + } + + void TRemoteConnection::ScheduleShutdown(EMessageStatus status) { + ShutdownReason = status; + + AtomicSet(ReaderData.Down, 1); + ScheduleRead(); + + AtomicSet(WriterData.Down, 1); + ScheduleWrite(); + } + + void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const { + size_t posForAssertion = buffer.Size(); + Proto->Serialize(msg, buffer); + Y_VERIFY(buffer.Size() >= posForAssertion, + "incorrect Serialize implementation, pos before serialize: %d, pos after serialize: %d", + int(posForAssertion), int(buffer.Size())); + } + + namespace { + inline void WriteHeader(const TBusHeader& header, TBuffer& data) { + data.Reserve(data.Size() + sizeof(TBusHeader)); + /// \todo hton instead of memcpy + memcpy(data.Data() + data.Size(), &header, sizeof(TBusHeader)); + data.Advance(sizeof(TBusHeader)); + } + + inline void WriteDummyHeader(TBuffer& data) { + data.Resize(data.Size() + sizeof(TBusHeader)); + } + + } + + void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const { + size_t pos = data->Size(); + + size_t dataSize; + + bool compressionRequested = msg->IsCompressed(); + + if (compressionRequested) { + TBuffer compdata; + TBuffer plaindata; + CallSerialize(msg, plaindata); + + dataSize = sizeof(TBusHeader) + plaindata.Size(); + + NCodecs::TCodecPtr c = Proto->GetTransportCodec(); + c->Encode(TStringBuf{plaindata.data(), plaindata.size()}, compdata); + + if (compdata.Size() < plaindata.Size()) { + plaindata.Clear(); + msg->GetHeader()->Size = sizeof(TBusHeader) + compdata.Size(); + WriteHeader(*msg->GetHeader(), *data); + data->Append(compdata.Data(), compdata.Size()); + } else { + compdata.Clear(); + msg->SetCompressed(false); + msg->GetHeader()->Size = sizeof(TBusHeader) + plaindata.Size(); + WriteHeader(*msg->GetHeader(), *data); + data->Append(plaindata.Data(), plaindata.Size()); + } + } else { + WriteDummyHeader(*data); + CallSerialize(msg, *data); + + dataSize = msg->GetHeader()->Size = data->Size() - pos; + + data->Proceed(pos); + WriteHeader(*msg->GetHeader(), *data); + data->Proceed(pos + msg->GetHeader()->Size); + } + + Y_ASSERT(msg->GetHeader()->Size == data->Size() - pos); + counter->AddMessage(dataSize, data->Size() - pos, msg->IsCompressed(), compressionRequested); + } + + TBusMessage* TRemoteConnection::DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const { + size_t dataSize; + + TBusMessage* message; + if (header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL) { + TBuffer msg; + { + TBuffer plaindata; + NCodecs::TCodecPtr c = Proto->GetTransportCodec(); + try { + TArrayRef<const char> payload = TBusMessage::GetPayload(dataRef); + c->Decode(TStringBuf{payload.data(), payload.size()}, plaindata); + } catch (...) { + // catch all, because + // http://nga.at.yandex-team.ru/replies.xml?item_no=3884 + *status = MESSAGE_DECOMPRESS_ERROR; + return nullptr; + } + + msg.Append(dataRef.data(), sizeof(TBusHeader)); + msg.Append(plaindata.Data(), plaindata.Size()); + } + TArrayRef<const char> msgRef(msg.Data(), msg.Size()); + dataSize = sizeof(TBusHeader) + msgRef.size(); + // TODO: track error types + message = Proto->Deserialize(header->Type, msgRef.Slice(sizeof(TBusHeader))).Release(); + if (!message) { + *status = MESSAGE_DESERIALIZE_ERROR; + return nullptr; + } + *message->GetHeader() = *header; + message->SetCompressed(true); + } else { + dataSize = dataRef.size(); + message = Proto->Deserialize(header->Type, dataRef.Slice(sizeof(TBusHeader))).Release(); + if (!message) { + *status = MESSAGE_DESERIALIZE_ERROR; + return nullptr; + } + *message->GetHeader() = *header; + } + + messageCounter->AddMessage(dataSize, dataRef.size(), header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL, false); + + return message; + } + + void TRemoteConnection::ResetOneWayFlag(TArrayRef<TBusMessage*> messages) { + for (auto message : messages) { + message->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; + } + } + + void TRemoteConnection::ReaderFlushMessages() { + if (!ReaderData.ReadMessages.empty()) { + Session->OnMessageReceived(this, ReaderData.ReadMessages); + ReaderData.ReadMessages.clear(); + } + } + + // @return false if actor should break + bool TRemoteConnection::MessageRead(TArrayRef<const char> readDataRef, TInstant now) { + TBusHeader header(readDataRef); + + Y_ASSERT(readDataRef.size() == header.Size); + + if (header.GetVersionInternal() != YBUS_VERSION) { + ReaderProcessMessageUnknownVersion(readDataRef); + return true; + } + + EMessageStatus deserializeFailureStatus = MESSAGE_OK; + TBusMessage* r = DeserializeMessage(readDataRef, &header, &ReaderData.Status.Incremental.MessageCounter, &deserializeFailureStatus); + + if (!r) { + Y_VERIFY(deserializeFailureStatus != MESSAGE_OK, "state check"); + LWPROBE(Error, ToString(deserializeFailureStatus), ToString(PeerAddr), ""); + ReaderData.Status.Incremental.StatusCounter[deserializeFailureStatus] += 1; + ScheduleShutdownOnServerOrReconnectOnClient(deserializeFailureStatus, false); + return false; + } + + LWPROBE(Read, r->GetHeader()->Size); + + r->ReplyTo = PeerAddrSocketAddr; + + TBusMessagePtrAndHeader h(r); + r->RecvTime = now; + + QuotaConsume(1, header.Size); + + ReaderData.ReadMessages.push_back(h); + if (ReaderData.ReadMessages.size() >= 100) { + ReaderFlushMessages(); + } + + return true; + } + + void TRemoteConnection::WriterFillBuffer() { + Y_ASSERT(WriterData.State == WRITER_FILLING); + + Y_ASSERT(WriterData.Buffer.LeftSize() == 0); + + if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) { + TVector<TBusHeader> headers; + WrongVersionRequests.DequeueAllSingleConsumer(&headers); + for (TVector<TBusHeader>::reverse_iterator header = headers.rbegin(); + header != headers.rend(); ++header) { + TBusHeader response = *header; + response.SendTime = NBus::Now(); + response.Size = sizeof(TBusHeader); + response.FlagsInternal = 0; + response.SetVersionInternal(YBUS_VERSION); + WriteHeader(response, WriterData.Buffer.GetBuffer()); + } + + Y_ASSERT(!WriterData.Buffer.Empty()); + WriterData.State = WRITER_FLUSHING; + return; + } + + TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages; + + for (;;) { + THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront()); + if (!writeMessage) { + break; + } + + if (Config.Cork != TDuration::Zero()) { + if (WriterData.CorkUntil == TInstant::Zero()) { + WriterData.CorkUntil = TInstant::Now() + Config.Cork; + } + } + + size_t sizeBeforeSerialize = WriterData.Buffer.Size(); + + TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter; + + SerializeMessage(writeMessage.Get(), &WriterData.Buffer.GetBuffer(), &messageCounter); + + size_t written = WriterData.Buffer.Size() - sizeBeforeSerialize; + if (written > Config.MaxMessageSize) { + WriterData.Buffer.GetBuffer().EraseBack(written); + WriterBeforeWriteErrorMessage(writeMessage.Release(), MESSAGE_MESSAGE_TOO_LARGE); + continue; + } + + WriterData.Status.Incremental.MessageCounter = messageCounter; + + TBusMessagePtrAndHeader h(writeMessage.Release()); + writeMessages.GetVector()->push_back(h); + + Y_ASSERT(!WriterData.Buffer.Empty()); + if (WriterData.Buffer.Size() >= Config.SendThreshold) { + break; + } + } + + if (!WriterData.Buffer.Empty()) { + if (WriterData.Buffer.Size() >= Config.SendThreshold) { + WriterData.State = WRITER_FLUSHING; + } else if (WriterData.CorkUntil == TInstant::Zero()) { + WriterData.State = WRITER_FLUSHING; + } else if (TInstant::Now() >= WriterData.CorkUntil) { + WriterData.State = WRITER_FLUSHING; + } else { + // keep filling + Y_ASSERT(WriterData.State == WRITER_FILLING); + GetWriterSchedulerActor()->ScheduleAt(WriterData.CorkUntil); + } + } else { + // keep filling + Y_ASSERT(WriterData.State == WRITER_FILLING); + } + + size_t bytes = MessageSize(*writeMessages.GetVector()); + + QuotaReturnSelf(writeMessages.GetVector()->size(), bytes); + + // This is called before `send` syscall inducing latency + MessageSent(*writeMessages.GetVector()); + } + + size_t TRemoteConnection::MessageSize(TArrayRef<TBusMessagePtrAndHeader> messages) { + size_t size = 0; + for (const auto& message : messages) + size += message.MessagePtr->RequestSize; + + return size; + } + + size_t TRemoteConnection::GetInFlight() { + return AtomicGet(WriterData.InFlight); + } + + size_t TRemoteConnection::GetConnectSyscallsNumForTest() { + return WriterData.Status.ConnectSyscalls; + } + + void TRemoteConnection::WriterBeforeWriteErrorMessage(TBusMessage* message, EMessageStatus status) { + if (Session->IsSource_) { + CheckedCast<TRemoteClientSession*>(Session.Get())->ReleaseInFlight({message}); + WriterErrorMessage(message, status); + } else { + TBusMessagePtrAndHeader h(message); + CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1)); + WriterErrorMessage(h.MessagePtr.Release(), status); + } + } + + void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) { + TBusMessage* released = m.Release(); + WriterErrorMessages(MakeArrayRef(&released, 1), status); + } + + void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) { + ResetOneWayFlag(ms); + + WriterData.Status.Incremental.StatusCounter[status] += ms.size(); + for (auto m : ms) { + Session->InvokeOnError(m, status); + } + } + + void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) { + Y_VERIFY(Session->IsSource_, "state check"); + TClientConnectionEvent event(type, ConnectionId, PeerAddr); + TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get()); + session->ClientHandler->OnClientConnectionEvent(event); + } + + bool TRemoteConnection::IsAlive() const { + return !AtomicGet(WriterData.Down); + } + + } +} |