aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_connection.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/remote_connection.cpp
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_connection.cpp')
-rw-r--r--library/cpp/messagebus/remote_connection.cpp1614
1 files changed, 807 insertions, 807 deletions
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp
index ca4a66f68a..22932569db 100644
--- a/library/cpp/messagebus/remote_connection.cpp
+++ b/library/cpp/messagebus/remote_connection.cpp
@@ -20,955 +20,955 @@ using namespace NActor;
using namespace NBus;
using namespace NBus::NPrivate;
-namespace NBus {
- namespace NPrivate {
- TRemoteConnection::TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr)
- : TActor<TRemoteConnection, TWriterTag>(session->Queue->WorkQueue.Get())
- , TActor<TRemoteConnection, TReaderTag>(session->Queue->WorkQueue.Get())
- , TScheduleActor<TRemoteConnection, TWriterTag>(&session->Queue->Scheduler)
- , Session(session)
- , Proto(session->Proto)
- , Config(session->Config)
- , RemovedFromSession(false)
- , ConnectionId(connectionId)
- , PeerAddr(addr)
- , PeerAddrSocketAddr(addr)
- , CreatedTime(TInstant::Now())
- , ReturnConnectFailedImmediately(false)
- , GranStatus(Config.Secret.StatusFlushPeriod)
- , QuotaMsg(!Session->IsSource_, Config.PerConnectionMaxInFlight, 0)
- , QuotaBytes(!Session->IsSource_, Config.PerConnectionMaxInFlightBySize, 0)
- , MaxBufferSize(session->Config.MaxBufferSize)
- , ShutdownReason(MESSAGE_OK)
- {
- WriterData.Status.ConnectionId = connectionId;
- WriterData.Status.PeerAddr = PeerAddr;
- ReaderData.Status.ConnectionId = connectionId;
-
- const TInstant now = TInstant::Now();
-
- WriterFillStatus();
-
- GranStatus.Writer.Update(WriterData.Status, now, true);
- GranStatus.Reader.Update(ReaderData.Status, now, true);
- }
-
- TRemoteConnection::~TRemoteConnection() {
- Y_VERIFY(ReplyQueue.IsEmpty());
- }
-
- TRemoteConnection::TWriterData::TWriterData()
- : Down(0)
- , SocketVersion(0)
- , InFlight(0)
- , AwakeFlags(0)
- , State(WRITER_FILLING)
- {
- }
-
- TRemoteConnection::TWriterData::~TWriterData() {
+namespace NBus {
+ namespace NPrivate {
+ TRemoteConnection::TRemoteConnection(TRemoteSessionPtr session, ui64 connectionId, TNetAddr addr)
+ : TActor<TRemoteConnection, TWriterTag>(session->Queue->WorkQueue.Get())
+ , TActor<TRemoteConnection, TReaderTag>(session->Queue->WorkQueue.Get())
+ , TScheduleActor<TRemoteConnection, TWriterTag>(&session->Queue->Scheduler)
+ , Session(session)
+ , Proto(session->Proto)
+ , Config(session->Config)
+ , RemovedFromSession(false)
+ , ConnectionId(connectionId)
+ , PeerAddr(addr)
+ , PeerAddrSocketAddr(addr)
+ , CreatedTime(TInstant::Now())
+ , ReturnConnectFailedImmediately(false)
+ , GranStatus(Config.Secret.StatusFlushPeriod)
+ , QuotaMsg(!Session->IsSource_, Config.PerConnectionMaxInFlight, 0)
+ , QuotaBytes(!Session->IsSource_, Config.PerConnectionMaxInFlightBySize, 0)
+ , MaxBufferSize(session->Config.MaxBufferSize)
+ , ShutdownReason(MESSAGE_OK)
+ {
+ WriterData.Status.ConnectionId = connectionId;
+ WriterData.Status.PeerAddr = PeerAddr;
+ ReaderData.Status.ConnectionId = connectionId;
+
+ const TInstant now = TInstant::Now();
+
+ WriterFillStatus();
+
+ GranStatus.Writer.Update(WriterData.Status, now, true);
+ GranStatus.Reader.Update(ReaderData.Status, now, true);
+ }
+
+ TRemoteConnection::~TRemoteConnection() {
+ Y_VERIFY(ReplyQueue.IsEmpty());
+ }
+
+ TRemoteConnection::TWriterData::TWriterData()
+ : Down(0)
+ , SocketVersion(0)
+ , InFlight(0)
+ , AwakeFlags(0)
+ , State(WRITER_FILLING)
+ {
+ }
+
+ TRemoteConnection::TWriterData::~TWriterData() {
Y_VERIFY(AtomicGet(Down));
- Y_VERIFY(SendQueue.Empty());
- }
-
- bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept {
- size_t left = Buffer.Size() - Offset;
-
- return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0;
- }
-
- void TRemoteConnection::TWriterData::SetChannel(NEventLoop::TChannelPtr channel) {
- Y_VERIFY(!Channel, "must not have channel");
- Y_VERIFY(Buffer.GetBuffer().Empty() && Buffer.LeftSize() == 0, "buffer must be empty");
- Y_VERIFY(State == WRITER_FILLING, "state must be initial");
- Channel = channel;
- }
-
- void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) {
- Y_VERIFY(!Channel, "must not have channel");
- Y_VERIFY(Buffer.Empty(), "buffer must be empty");
- Channel = channel;
- }
-
- void TRemoteConnection::TWriterData::DropChannel() {
- if (!!Channel) {
- Channel->Unregister();
- Channel.Drop();
- }
-
- Buffer.Reset();
- State = WRITER_FILLING;
- }
-
- void TRemoteConnection::TReaderData::DropChannel() {
- // TODO: make Drop call Unregister
- if (!!Channel) {
- Channel->Unregister();
- Channel.Drop();
- }
- Buffer.Reset();
- Offset = 0;
- }
-
- TRemoteConnection::TReaderData::TReaderData()
- : Down(0)
- , SocketVersion(0)
- , Offset(0)
- , MoreBytes(0)
- {
- }
-
- TRemoteConnection::TReaderData::~TReaderData() {
+ Y_VERIFY(SendQueue.Empty());
+ }
+
+ bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept {
+ size_t left = Buffer.Size() - Offset;
+
+ return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0;
+ }
+
+ void TRemoteConnection::TWriterData::SetChannel(NEventLoop::TChannelPtr channel) {
+ Y_VERIFY(!Channel, "must not have channel");
+ Y_VERIFY(Buffer.GetBuffer().Empty() && Buffer.LeftSize() == 0, "buffer must be empty");
+ Y_VERIFY(State == WRITER_FILLING, "state must be initial");
+ Channel = channel;
+ }
+
+ void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) {
+ Y_VERIFY(!Channel, "must not have channel");
+ Y_VERIFY(Buffer.Empty(), "buffer must be empty");
+ Channel = channel;
+ }
+
+ void TRemoteConnection::TWriterData::DropChannel() {
+ if (!!Channel) {
+ Channel->Unregister();
+ Channel.Drop();
+ }
+
+ Buffer.Reset();
+ State = WRITER_FILLING;
+ }
+
+ void TRemoteConnection::TReaderData::DropChannel() {
+ // TODO: make Drop call Unregister
+ if (!!Channel) {
+ Channel->Unregister();
+ Channel.Drop();
+ }
+ Buffer.Reset();
+ Offset = 0;
+ }
+
+ TRemoteConnection::TReaderData::TReaderData()
+ : Down(0)
+ , SocketVersion(0)
+ , Offset(0)
+ , MoreBytes(0)
+ {
+ }
+
+ TRemoteConnection::TReaderData::~TReaderData() {
Y_VERIFY(AtomicGet(Down));
- }
-
- void TRemoteConnection::Send(TNonDestroyingAutoPtr<TBusMessage> msg) {
- BeforeSendQueue.Enqueue(msg.Release());
- AtomicIncrement(WriterData.InFlight);
- ScheduleWrite();
- }
-
- void TRemoteConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {
- if (!reconnect) {
- // Do not clear send queue if reconnecting
- WriterData.SendQueue.Clear(&result);
- }
- }
+ }
- void TRemoteConnection::Shutdown(EMessageStatus status) {
- ScheduleShutdown(status);
+ void TRemoteConnection::Send(TNonDestroyingAutoPtr<TBusMessage> msg) {
+ BeforeSendQueue.Enqueue(msg.Release());
+ AtomicIncrement(WriterData.InFlight);
+ ScheduleWrite();
+ }
- ReaderData.ShutdownComplete.WaitI();
- WriterData.ShutdownComplete.WaitI();
- }
+ void TRemoteConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {
+ if (!reconnect) {
+ // Do not clear send queue if reconnecting
+ WriterData.SendQueue.Clear(&result);
+ }
+ }
- void TRemoteConnection::TryConnect() {
- Y_FAIL("TryConnect is client connection only operation");
- }
+ void TRemoteConnection::Shutdown(EMessageStatus status) {
+ ScheduleShutdown(status);
- void TRemoteConnection::ScheduleRead() {
- GetReaderActor()->Schedule();
- }
+ ReaderData.ShutdownComplete.WaitI();
+ WriterData.ShutdownComplete.WaitI();
+ }
- void TRemoteConnection::ScheduleWrite() {
- GetWriterActor()->Schedule();
- }
+ void TRemoteConnection::TryConnect() {
+ Y_FAIL("TryConnect is client connection only operation");
+ }
- void TRemoteConnection::WriterRotateCounters() {
- if (!WriterData.TimeToRotateCounters.FetchTask()) {
- return;
- }
+ void TRemoteConnection::ScheduleRead() {
+ GetReaderActor()->Schedule();
+ }
- WriterData.Status.DurationCounterPrev = WriterData.Status.DurationCounter;
- Reset(WriterData.Status.DurationCounter);
- }
+ void TRemoteConnection::ScheduleWrite() {
+ GetWriterActor()->Schedule();
+ }
- void TRemoteConnection::WriterSendStatus(TInstant now, bool force) {
- GranStatus.Writer.Update(std::bind(&TRemoteConnection::WriterGetStatus, this), now, force);
- }
+ void TRemoteConnection::WriterRotateCounters() {
+ if (!WriterData.TimeToRotateCounters.FetchTask()) {
+ return;
+ }
- void TRemoteConnection::ReaderSendStatus(TInstant now, bool force) {
- GranStatus.Reader.Update(std::bind(&TRemoteConnection::ReaderFillStatus, this), now, force);
- }
+ WriterData.Status.DurationCounterPrev = WriterData.Status.DurationCounter;
+ Reset(WriterData.Status.DurationCounter);
+ }
- const TRemoteConnectionReaderStatus& TRemoteConnection::ReaderFillStatus() {
- ReaderData.Status.BufferSize = ReaderData.Buffer.Capacity();
- ReaderData.Status.QuotaMsg = QuotaMsg.Tokens();
- ReaderData.Status.QuotaBytes = QuotaBytes.Tokens();
+ void TRemoteConnection::WriterSendStatus(TInstant now, bool force) {
+ GranStatus.Writer.Update(std::bind(&TRemoteConnection::WriterGetStatus, this), now, force);
+ }
- return ReaderData.Status;
- }
+ void TRemoteConnection::ReaderSendStatus(TInstant now, bool force) {
+ GranStatus.Reader.Update(std::bind(&TRemoteConnection::ReaderFillStatus, this), now, force);
+ }
+
+ const TRemoteConnectionReaderStatus& TRemoteConnection::ReaderFillStatus() {
+ ReaderData.Status.BufferSize = ReaderData.Buffer.Capacity();
+ ReaderData.Status.QuotaMsg = QuotaMsg.Tokens();
+ ReaderData.Status.QuotaBytes = QuotaBytes.Tokens();
+
+ return ReaderData.Status;
+ }
- void TRemoteConnection::ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage readSocket) {
- if (AtomicGet(ReaderData.Down)) {
- ReaderData.Status.Fd = INVALID_SOCKET;
- return;
- }
+ void TRemoteConnection::ProcessItem(TReaderTag, ::NActor::TDefaultTag, TWriterToReaderSocketMessage readSocket) {
+ if (AtomicGet(ReaderData.Down)) {
+ ReaderData.Status.Fd = INVALID_SOCKET;
+ return;
+ }
- ReaderData.DropChannel();
+ ReaderData.DropChannel();
- ReaderData.Status.Fd = readSocket.Socket;
- ReaderData.SocketVersion = readSocket.SocketVersion;
+ ReaderData.Status.Fd = readSocket.Socket;
+ ReaderData.SocketVersion = readSocket.SocketVersion;
- if (readSocket.Socket != INVALID_SOCKET) {
- ReaderData.SetChannel(Session->ReadEventLoop.Register(readSocket.Socket, this, ReadCookie));
- ReaderData.Channel->EnableRead();
- }
- }
+ if (readSocket.Socket != INVALID_SOCKET) {
+ ReaderData.SetChannel(Session->ReadEventLoop.Register(readSocket.Socket, this, ReadCookie));
+ ReaderData.Channel->EnableRead();
+ }
+ }
- void TRemoteConnection::ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion) {
- Y_VERIFY(socketVersion <= WriterData.SocketVersion, "something weird");
+ void TRemoteConnection::ProcessItem(TWriterTag, TReconnectTag, ui32 socketVersion) {
+ Y_VERIFY(socketVersion <= WriterData.SocketVersion, "something weird");
- if (WriterData.SocketVersion != socketVersion) {
- return;
- }
- Y_VERIFY(WriterData.Status.Connected, "must be connected at this point");
- Y_VERIFY(!!WriterData.Channel, "must have channel at this point");
+ if (WriterData.SocketVersion != socketVersion) {
+ return;
+ }
+ Y_VERIFY(WriterData.Status.Connected, "must be connected at this point");
+ Y_VERIFY(!!WriterData.Channel, "must have channel at this point");
- WriterData.Status.Connected = false;
- WriterData.DropChannel();
- WriterData.Status.MyAddr = TNetAddr();
- ++WriterData.SocketVersion;
- LastConnectAttempt = TInstant();
+ WriterData.Status.Connected = false;
+ WriterData.DropChannel();
+ WriterData.Status.MyAddr = TNetAddr();
+ ++WriterData.SocketVersion;
+ LastConnectAttempt = TInstant();
- TMessagesPtrs cleared;
- ClearOutgoingQueue(cleared, true);
- WriterErrorMessages(cleared, MESSAGE_DELIVERY_FAILED);
+ TMessagesPtrs cleared;
+ ClearOutgoingQueue(cleared, true);
+ WriterErrorMessages(cleared, MESSAGE_DELIVERY_FAILED);
- FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED);
+ FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED);
- ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(INVALID_SOCKET, WriterData.SocketVersion));
- }
+ ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(INVALID_SOCKET, WriterData.SocketVersion));
+ }
- void TRemoteConnection::ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags) {
- WriterData.AwakeFlags |= awakeFlags;
+ void TRemoteConnection::ProcessItem(TWriterTag, TWakeReaderTag, ui32 awakeFlags) {
+ WriterData.AwakeFlags |= awakeFlags;
- ReadQuotaWakeup();
- }
+ ReadQuotaWakeup();
+ }
- void TRemoteConnection::Act(TReaderTag) {
- TInstant now = TInstant::Now();
+ void TRemoteConnection::Act(TReaderTag) {
+ TInstant now = TInstant::Now();
- ReaderData.Status.Acts += 1;
+ ReaderData.Status.Acts += 1;
- ReaderGetSocketQueue()->DequeueAllLikelyEmpty();
+ ReaderGetSocketQueue()->DequeueAllLikelyEmpty();
- if (AtomicGet(ReaderData.Down)) {
- ReaderData.DropChannel();
+ if (AtomicGet(ReaderData.Down)) {
+ ReaderData.DropChannel();
- ReaderProcessStatusDown();
- ReaderData.ShutdownComplete.Signal();
+ ReaderProcessStatusDown();
+ ReaderData.ShutdownComplete.Signal();
- } else if (!!ReaderData.Channel) {
- Y_ASSERT(ReaderData.ReadMessages.empty());
+ } else if (!!ReaderData.Channel) {
+ Y_ASSERT(ReaderData.ReadMessages.empty());
- for (int i = 0;; ++i) {
- if (i == 100) {
- // perform other tasks
- GetReaderActor()->AddTaskFromActorLoop();
- break;
- }
+ for (int i = 0;; ++i) {
+ if (i == 100) {
+ // perform other tasks
+ GetReaderActor()->AddTaskFromActorLoop();
+ break;
+ }
- if (NeedInterruptRead()) {
- ReaderData.Channel->EnableRead();
- break;
- }
+ if (NeedInterruptRead()) {
+ ReaderData.Channel->EnableRead();
+ break;
+ }
- if (!ReaderFillBuffer())
- break;
+ if (!ReaderFillBuffer())
+ break;
- if (!ReaderProcessBuffer())
- break;
- }
+ if (!ReaderProcessBuffer())
+ break;
+ }
- ReaderFlushMessages();
+ ReaderFlushMessages();
}
- ReaderSendStatus(now);
+ ReaderSendStatus(now);
}
- bool TRemoteConnection::QuotaAcquire(size_t msg, size_t bytes) {
- ui32 wakeFlags = 0;
+ bool TRemoteConnection::QuotaAcquire(size_t msg, size_t bytes) {
+ ui32 wakeFlags = 0;
- if (!QuotaMsg.Acquire(msg))
- wakeFlags |= WAKE_QUOTA_MSG;
+ if (!QuotaMsg.Acquire(msg))
+ wakeFlags |= WAKE_QUOTA_MSG;
- else if (!QuotaBytes.Acquire(bytes))
- wakeFlags |= WAKE_QUOTA_BYTES;
+ else if (!QuotaBytes.Acquire(bytes))
+ wakeFlags |= WAKE_QUOTA_BYTES;
- if (wakeFlags) {
- ReaderData.Status.QuotaExhausted++;
+ if (wakeFlags) {
+ ReaderData.Status.QuotaExhausted++;
- WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags);
- }
+ WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags);
+ }
- return wakeFlags == 0;
- }
+ return wakeFlags == 0;
+ }
- void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) {
- QuotaMsg.Consume(msg);
- QuotaBytes.Consume(bytes);
- }
+ void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) {
+ QuotaMsg.Consume(msg);
+ QuotaBytes.Consume(bytes);
+ }
- void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) {
- if (QuotaReturnValues(items, bytes))
- ReadQuotaWakeup();
- }
+ void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) {
+ if (QuotaReturnValues(items, bytes))
+ ReadQuotaWakeup();
+ }
- void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) {
- if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down))
- WriterGetWakeQueue()->EnqueueAndSchedule(0x0);
- }
+ void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) {
+ if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down))
+ WriterGetWakeQueue()->EnqueueAndSchedule(0x0);
+ }
- bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) {
- bool rMsg = QuotaMsg.Return(items);
- bool rBytes = QuotaBytes.Return(bytes);
+ bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) {
+ bool rMsg = QuotaMsg.Return(items);
+ bool rBytes = QuotaBytes.Return(bytes);
- return rMsg || rBytes;
- }
+ return rMsg || rBytes;
+ }
- void TRemoteConnection::ReadQuotaWakeup() {
- const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags();
+ void TRemoteConnection::ReadQuotaWakeup() {
+ const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags();
- if (mask && mask == WriterData.AwakeFlags) {
- WriterData.Status.ReaderWakeups++;
- WriterData.AwakeFlags = 0;
+ if (mask && mask == WriterData.AwakeFlags) {
+ WriterData.Status.ReaderWakeups++;
+ WriterData.AwakeFlags = 0;
- ScheduleRead();
- }
- }
+ ScheduleRead();
+ }
+ }
- ui32 TRemoteConnection::WriteWakeFlags() const {
- ui32 awakeFlags = 0;
+ ui32 TRemoteConnection::WriteWakeFlags() const {
+ ui32 awakeFlags = 0;
- if (QuotaMsg.IsAboveWake())
- awakeFlags |= WAKE_QUOTA_MSG;
+ if (QuotaMsg.IsAboveWake())
+ awakeFlags |= WAKE_QUOTA_MSG;
- if (QuotaBytes.IsAboveWake())
- awakeFlags |= WAKE_QUOTA_BYTES;
+ if (QuotaBytes.IsAboveWake())
+ awakeFlags |= WAKE_QUOTA_BYTES;
- return awakeFlags;
- }
+ return awakeFlags;
+ }
- bool TRemoteConnection::ReaderProcessBuffer() {
- TInstant now = TInstant::Now();
+ bool TRemoteConnection::ReaderProcessBuffer() {
+ TInstant now = TInstant::Now();
+
+ for (;;) {
+ if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) {
+ break;
+ }
+
+ TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset));
+
+ if (header.Size < sizeof(TBusHeader)) {
+ LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
+ ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
+ ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
+ return false;
+ }
+
+ if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) {
+ LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
+ ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
+ ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
+ return false;
+ }
+
+ if (header.Size > Config.MaxMessageSize) {
+ LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size));
+ ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1;
+ ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false);
+ return false;
+ }
+
+ if (!ReaderData.HasBytesInBuf(header.Size)) {
+ if (ReaderData.Offset == 0) {
+ ReaderData.Buffer.Reserve(header.Size);
+ }
+ break;
+ }
+
+ if (!QuotaAcquire(1, header.Size))
+ return false;
+
+ if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) {
+ return false;
+ }
+
+ ReaderData.Offset += header.Size;
+ }
- for (;;) {
- if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) {
- break;
- }
+ ReaderData.Buffer.ChopHead(ReaderData.Offset);
+ ReaderData.Offset = 0;
- TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset));
+ if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) {
+ ReaderData.Status.Incremental.BufferDrops += 1;
- if (header.Size < sizeof(TBusHeader)) {
- LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
- ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
- ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
- return false;
- }
+ TBuffer temp;
+ // probably should use another constant
+ temp.Reserve(Config.DefaultBufferSize);
+ temp.Append(ReaderData.Buffer.Data(), ReaderData.Buffer.Size());
- if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) {
- LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
- ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
- ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
- return false;
- }
+ ReaderData.Buffer.Swap(temp);
+ }
- if (header.Size > Config.MaxMessageSize) {
- LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size));
- ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1;
- ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false);
- return false;
- }
+ return true;
+ }
- if (!ReaderData.HasBytesInBuf(header.Size)) {
- if (ReaderData.Offset == 0) {
- ReaderData.Buffer.Reserve(header.Size);
- }
- break;
- }
+ bool TRemoteConnection::ReaderFillBuffer() {
+ if (!ReaderData.BufferMore())
+ return true;
- if (!QuotaAcquire(1, header.Size))
- return false;
+ if (ReaderData.Buffer.Avail() == 0) {
+ if (ReaderData.Buffer.Size() == 0) {
+ ReaderData.Buffer.Reserve(Config.DefaultBufferSize);
+ } else {
+ ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2);
+ }
+ }
- if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) {
- return false;
- }
+ Y_ASSERT(ReaderData.Buffer.Avail() > 0);
- ReaderData.Offset += header.Size;
- }
+ ssize_t bytes;
+ {
+ TWhatThreadDoesPushPop pp("recv syscall");
+ bytes = SocketRecv(ReaderData.Channel->GetSocket(), TArrayRef<char>(ReaderData.Buffer.Pos(), ReaderData.Buffer.Avail()));
+ }
- ReaderData.Buffer.ChopHead(ReaderData.Offset);
- ReaderData.Offset = 0;
+ if (bytes < 0) {
+ if (WouldBlock()) {
+ ReaderData.Channel->EnableRead();
+ return false;
+ } else {
+ ReaderData.Channel->DisableRead();
+ ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false);
+ return false;
+ }
+ }
- if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) {
- ReaderData.Status.Incremental.BufferDrops += 1;
+ if (bytes == 0) {
+ ReaderData.Channel->DisableRead();
+ // TODO: incorrect: it is possible that only input is shutdown, and output is available
+ ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false);
+ return false;
+ }
- TBuffer temp;
- // probably should use another constant
- temp.Reserve(Config.DefaultBufferSize);
- temp.Append(ReaderData.Buffer.Data(), ReaderData.Buffer.Size());
+ ReaderData.Status.Incremental.NetworkOps += 1;
- ReaderData.Buffer.Swap(temp);
- }
+ ReaderData.Buffer.Advance(bytes);
+ ReaderData.MoreBytes = 0;
+ return true;
+ }
- return true;
+ void TRemoteConnection::ClearBeforeSendQueue(EMessageStatus reason) {
+ BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::WriterBeforeWriteErrorMessage, this, std::placeholders::_1, reason));
}
- bool TRemoteConnection::ReaderFillBuffer() {
- if (!ReaderData.BufferMore())
- return true;
+ void TRemoteConnection::ClearReplyQueue(EMessageStatus reason) {
+ TVectorSwaps<TBusMessagePtrAndHeader> replyQueueTemp;
+ Y_ASSERT(replyQueueTemp.empty());
+ ReplyQueue.DequeueAllSingleConsumer(&replyQueueTemp);
- if (ReaderData.Buffer.Avail() == 0) {
- if (ReaderData.Buffer.Size() == 0) {
- ReaderData.Buffer.Reserve(Config.DefaultBufferSize);
- } else {
- ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2);
- }
+ TVector<TBusMessage*> messages;
+ for (TVectorSwaps<TBusMessagePtrAndHeader>::reverse_iterator message = replyQueueTemp.rbegin();
+ message != replyQueueTemp.rend(); ++message) {
+ messages.push_back(message->MessagePtr.Release());
}
- Y_ASSERT(ReaderData.Buffer.Avail() > 0);
+ WriterErrorMessages(messages, reason);
- ssize_t bytes;
- {
- TWhatThreadDoesPushPop pp("recv syscall");
- bytes = SocketRecv(ReaderData.Channel->GetSocket(), TArrayRef<char>(ReaderData.Buffer.Pos(), ReaderData.Buffer.Avail()));
- }
-
- if (bytes < 0) {
- if (WouldBlock()) {
- ReaderData.Channel->EnableRead();
- return false;
- } else {
- ReaderData.Channel->DisableRead();
- ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false);
- return false;
- }
- }
-
- if (bytes == 0) {
- ReaderData.Channel->DisableRead();
- // TODO: incorrect: it is possible that only input is shutdown, and output is available
- ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, false);
- return false;
- }
-
- ReaderData.Status.Incremental.NetworkOps += 1;
-
- ReaderData.Buffer.Advance(bytes);
- ReaderData.MoreBytes = 0;
- return true;
- }
-
- void TRemoteConnection::ClearBeforeSendQueue(EMessageStatus reason) {
- BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::WriterBeforeWriteErrorMessage, this, std::placeholders::_1, reason));
- }
-
- void TRemoteConnection::ClearReplyQueue(EMessageStatus reason) {
- TVectorSwaps<TBusMessagePtrAndHeader> replyQueueTemp;
- Y_ASSERT(replyQueueTemp.empty());
- ReplyQueue.DequeueAllSingleConsumer(&replyQueueTemp);
-
- TVector<TBusMessage*> messages;
- for (TVectorSwaps<TBusMessagePtrAndHeader>::reverse_iterator message = replyQueueTemp.rbegin();
- message != replyQueueTemp.rend(); ++message) {
- messages.push_back(message->MessagePtr.Release());
- }
-
- WriterErrorMessages(messages, reason);
-
- replyQueueTemp.clear();
- }
-
- void TRemoteConnection::ProcessBeforeSendQueueMessage(TBusMessage* message, TInstant now) {
- // legacy clients expect this field to be set
- if (!Session->IsSource_) {
- message->SendTime = now.MilliSeconds();
- }
-
- WriterData.SendQueue.PushBack(message);
- }
-
- void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) {
- BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now));
- }
-
- void TRemoteConnection::WriterFillInFlight() {
- // this is hack for TLoadBalancedProtocol
- WriterFillStatus();
- AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight());
- }
-
- const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() {
- WriterRotateCounters();
- WriterFillStatus();
-
- return WriterData.Status;
- }
-
- void TRemoteConnection::WriterFillStatus() {
- if (!!WriterData.Channel) {
- WriterData.Status.Fd = WriterData.Channel->GetSocket();
- } else {
- WriterData.Status.Fd = INVALID_SOCKET;
- }
- WriterData.Status.BufferSize = WriterData.Buffer.Capacity();
- WriterData.Status.SendQueueSize = WriterData.SendQueue.Size();
- WriterData.Status.State = WriterData.State;
- }
-
- void TRemoteConnection::WriterProcessStatusDown() {
- Session->GetDeadConnectionWriterStatusQueue()->EnqueueAndSchedule(WriterData.Status.Incremental);
- Reset(WriterData.Status.Incremental);
- }
-
- void TRemoteConnection::ReaderProcessStatusDown() {
- Session->GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(ReaderData.Status.Incremental);
- Reset(ReaderData.Status.Incremental);
- }
-
- void TRemoteConnection::ProcessWriterDown() {
- if (!RemovedFromSession) {
- Session->GetRemoveConnectionQueue()->EnqueueAndSchedule(this);
-
- if (Session->IsSource_) {
- if (WriterData.Status.Connected) {
- FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED);
- }
- }
-
- LWPROBE(Disconnected, ToString(PeerAddr));
- RemovedFromSession = true;
- }
-
- WriterData.DropChannel();
+ replyQueueTemp.clear();
+ }
- DropEnqueuedData(ShutdownReason, MESSAGE_SHUTDOWN);
+ void TRemoteConnection::ProcessBeforeSendQueueMessage(TBusMessage* message, TInstant now) {
+ // legacy clients expect this field to be set
+ if (!Session->IsSource_) {
+ message->SendTime = now.MilliSeconds();
+ }
- WriterProcessStatusDown();
+ WriterData.SendQueue.PushBack(message);
+ }
- WriterData.ShutdownComplete.Signal();
- }
+ void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) {
+ BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now));
+ }
- void TRemoteConnection::DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues) {
- ClearReplyQueue(reasonForQueues);
- ClearBeforeSendQueue(reasonForQueues);
- WriterGetReconnectQueue()->Clear();
- WriterGetWakeQueue()->Clear();
+ void TRemoteConnection::WriterFillInFlight() {
+ // this is hack for TLoadBalancedProtocol
+ WriterFillStatus();
+ AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight());
+ }
- TMessagesPtrs cleared;
- ClearOutgoingQueue(cleared, false);
+ const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() {
+ WriterRotateCounters();
+ WriterFillStatus();
- if (!Session->IsSource_) {
- for (auto& i : cleared) {
- TBusMessagePtrAndHeader h(i);
- CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1));
- // assignment back is weird
- i = h.MessagePtr.Release();
- // and this part is not batch
- }
- }
+ return WriterData.Status;
+ }
- WriterErrorMessages(cleared, reason);
- }
+ void TRemoteConnection::WriterFillStatus() {
+ if (!!WriterData.Channel) {
+ WriterData.Status.Fd = WriterData.Channel->GetSocket();
+ } else {
+ WriterData.Status.Fd = INVALID_SOCKET;
+ }
+ WriterData.Status.BufferSize = WriterData.Buffer.Capacity();
+ WriterData.Status.SendQueueSize = WriterData.SendQueue.Size();
+ WriterData.Status.State = WriterData.State;
+ }
- void TRemoteConnection::BeforeTryWrite() {
- }
+ void TRemoteConnection::WriterProcessStatusDown() {
+ Session->GetDeadConnectionWriterStatusQueue()->EnqueueAndSchedule(WriterData.Status.Incremental);
+ Reset(WriterData.Status.Incremental);
+ }
- void TRemoteConnection::Act(TWriterTag) {
- TInstant now = TInstant::Now();
+ void TRemoteConnection::ReaderProcessStatusDown() {
+ Session->GetDeadConnectionReaderStatusQueue()->EnqueueAndSchedule(ReaderData.Status.Incremental);
+ Reset(ReaderData.Status.Incremental);
+ }
- WriterData.Status.Acts += 1;
+ void TRemoteConnection::ProcessWriterDown() {
+ if (!RemovedFromSession) {
+ Session->GetRemoveConnectionQueue()->EnqueueAndSchedule(this);
- if (Y_UNLIKELY(AtomicGet(WriterData.Down))) {
- // dump status must work even if WriterDown
- WriterSendStatus(now, true);
- ProcessWriterDown();
- return;
+ if (Session->IsSource_) {
+ if (WriterData.Status.Connected) {
+ FireClientConnectionEvent(TClientConnectionEvent::DISCONNECTED);
+ }
+ }
+
+ LWPROBE(Disconnected, ToString(PeerAddr));
+ RemovedFromSession = true;
}
- ProcessBeforeSendQueue(now);
+ WriterData.DropChannel();
- BeforeTryWrite();
+ DropEnqueuedData(ShutdownReason, MESSAGE_SHUTDOWN);
- WriterFillInFlight();
+ WriterProcessStatusDown();
- WriterGetReconnectQueue()->DequeueAllLikelyEmpty();
+ WriterData.ShutdownComplete.Signal();
+ }
- if (!WriterData.Status.Connected) {
- TryConnect();
- } else {
- for (int i = 0;; ++i) {
- if (i == 100) {
- // perform other tasks
- GetWriterActor()->AddTaskFromActorLoop();
- break;
- }
+ void TRemoteConnection::DropEnqueuedData(EMessageStatus reason, EMessageStatus reasonForQueues) {
+ ClearReplyQueue(reasonForQueues);
+ ClearBeforeSendQueue(reasonForQueues);
+ WriterGetReconnectQueue()->Clear();
+ WriterGetWakeQueue()->Clear();
+
+ TMessagesPtrs cleared;
+ ClearOutgoingQueue(cleared, false);
+
+ if (!Session->IsSource_) {
+ for (auto& i : cleared) {
+ TBusMessagePtrAndHeader h(i);
+ CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1));
+ // assignment back is weird
+ i = h.MessagePtr.Release();
+ // and this part is not batch
+ }
+ }
- if (WriterData.State == WRITER_FILLING) {
- WriterFillBuffer();
+ WriterErrorMessages(cleared, reason);
+ }
- if (WriterData.State == WRITER_FILLING) {
- WriterData.Channel->DisableWrite();
- break;
- }
+ void TRemoteConnection::BeforeTryWrite() {
+ }
- Y_ASSERT(!WriterData.Buffer.Empty());
- }
+ void TRemoteConnection::Act(TWriterTag) {
+ TInstant now = TInstant::Now();
- if (WriterData.State == WRITER_FLUSHING) {
- WriterFlushBuffer();
+ WriterData.Status.Acts += 1;
- if (WriterData.State == WRITER_FLUSHING) {
- break;
- }
- }
- }
- }
+ if (Y_UNLIKELY(AtomicGet(WriterData.Down))) {
+ // dump status must work even if WriterDown
+ WriterSendStatus(now, true);
+ ProcessWriterDown();
+ return;
+ }
- WriterGetWakeQueue()->DequeueAllLikelyEmpty();
+ ProcessBeforeSendQueue(now);
- WriterSendStatus(now);
- }
+ BeforeTryWrite();
- void TRemoteConnection::WriterFlushBuffer() {
- Y_ASSERT(WriterData.State == WRITER_FLUSHING);
- Y_ASSERT(!WriterData.Buffer.Empty());
+ WriterFillInFlight();
- WriterData.CorkUntil = TInstant::Zero();
+ WriterGetReconnectQueue()->DequeueAllLikelyEmpty();
- while (!WriterData.Buffer.Empty()) {
- ssize_t bytes;
- {
- TWhatThreadDoesPushPop pp("send syscall");
- bytes = SocketSend(WriterData.Channel->GetSocket(), TArrayRef<const char>(WriterData.Buffer.LeftPos(), WriterData.Buffer.Size()));
- }
+ if (!WriterData.Status.Connected) {
+ TryConnect();
+ } else {
+ for (int i = 0;; ++i) {
+ if (i == 100) {
+ // perform other tasks
+ GetWriterActor()->AddTaskFromActorLoop();
+ break;
+ }
+
+ if (WriterData.State == WRITER_FILLING) {
+ WriterFillBuffer();
+
+ if (WriterData.State == WRITER_FILLING) {
+ WriterData.Channel->DisableWrite();
+ break;
+ }
+
+ Y_ASSERT(!WriterData.Buffer.Empty());
+ }
+
+ if (WriterData.State == WRITER_FLUSHING) {
+ WriterFlushBuffer();
+
+ if (WriterData.State == WRITER_FLUSHING) {
+ break;
+ }
+ }
+ }
+ }
+
+ WriterGetWakeQueue()->DequeueAllLikelyEmpty();
- if (bytes < 0) {
- if (WouldBlock()) {
- WriterData.Channel->EnableWrite();
- return;
- } else {
- WriterData.Channel->DisableWrite();
- ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, true);
- return;
- }
- }
+ WriterSendStatus(now);
+ }
+
+ void TRemoteConnection::WriterFlushBuffer() {
+ Y_ASSERT(WriterData.State == WRITER_FLUSHING);
+ Y_ASSERT(!WriterData.Buffer.Empty());
- WriterData.Status.Incremental.NetworkOps += 1;
+ WriterData.CorkUntil = TInstant::Zero();
- WriterData.Buffer.LeftProceed(bytes);
+ while (!WriterData.Buffer.Empty()) {
+ ssize_t bytes;
+ {
+ TWhatThreadDoesPushPop pp("send syscall");
+ bytes = SocketSend(WriterData.Channel->GetSocket(), TArrayRef<const char>(WriterData.Buffer.LeftPos(), WriterData.Buffer.Size()));
+ }
+
+ if (bytes < 0) {
+ if (WouldBlock()) {
+ WriterData.Channel->EnableWrite();
+ return;
+ } else {
+ WriterData.Channel->DisableWrite();
+ ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_DELIVERY_FAILED, true);
+ return;
+ }
+ }
+
+ WriterData.Status.Incremental.NetworkOps += 1;
+
+ WriterData.Buffer.LeftProceed(bytes);
}
- WriterData.Buffer.Clear();
- if (WriterData.Buffer.Capacity() > MaxBufferSize) {
- WriterData.Status.Incremental.BufferDrops += 1;
- WriterData.Buffer.Reset();
+ WriterData.Buffer.Clear();
+ if (WriterData.Buffer.Capacity() > MaxBufferSize) {
+ WriterData.Status.Incremental.BufferDrops += 1;
+ WriterData.Buffer.Reset();
}
- WriterData.State = WRITER_FILLING;
- }
+ WriterData.State = WRITER_FILLING;
+ }
- void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) {
- if (Session->IsSource_) {
- WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion);
- } else {
- ScheduleShutdown(status);
+ void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) {
+ if (Session->IsSource_) {
+ WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion);
+ } else {
+ ScheduleShutdown(status);
}
}
- void TRemoteConnection::ScheduleShutdown(EMessageStatus status) {
- ShutdownReason = status;
+ void TRemoteConnection::ScheduleShutdown(EMessageStatus status) {
+ ShutdownReason = status;
- AtomicSet(ReaderData.Down, 1);
- ScheduleRead();
+ AtomicSet(ReaderData.Down, 1);
+ ScheduleRead();
- AtomicSet(WriterData.Down, 1);
- ScheduleWrite();
- }
+ AtomicSet(WriterData.Down, 1);
+ ScheduleWrite();
+ }
- void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const {
- size_t posForAssertion = buffer.Size();
- Proto->Serialize(msg, buffer);
- Y_VERIFY(buffer.Size() >= posForAssertion,
- "incorrect Serialize implementation, pos before serialize: %d, pos after serialize: %d",
- int(posForAssertion), int(buffer.Size()));
+ void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const {
+ size_t posForAssertion = buffer.Size();
+ Proto->Serialize(msg, buffer);
+ Y_VERIFY(buffer.Size() >= posForAssertion,
+ "incorrect Serialize implementation, pos before serialize: %d, pos after serialize: %d",
+ int(posForAssertion), int(buffer.Size()));
}
- namespace {
+ namespace {
inline void WriteHeader(const TBusHeader& header, TBuffer& data) {
- data.Reserve(data.Size() + sizeof(TBusHeader));
- /// \todo hton instead of memcpy
- memcpy(data.Data() + data.Size(), &header, sizeof(TBusHeader));
- data.Advance(sizeof(TBusHeader));
+ data.Reserve(data.Size() + sizeof(TBusHeader));
+ /// \todo hton instead of memcpy
+ memcpy(data.Data() + data.Size(), &header, sizeof(TBusHeader));
+ data.Advance(sizeof(TBusHeader));
}
inline void WriteDummyHeader(TBuffer& data) {
- data.Resize(data.Size() + sizeof(TBusHeader));
- }
+ data.Resize(data.Size() + sizeof(TBusHeader));
+ }
- }
+ }
- void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const {
- size_t pos = data->Size();
+ void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const {
+ size_t pos = data->Size();
- size_t dataSize;
+ size_t dataSize;
- bool compressionRequested = msg->IsCompressed();
+ bool compressionRequested = msg->IsCompressed();
- if (compressionRequested) {
- TBuffer compdata;
- TBuffer plaindata;
- CallSerialize(msg, plaindata);
+ if (compressionRequested) {
+ TBuffer compdata;
+ TBuffer plaindata;
+ CallSerialize(msg, plaindata);
- dataSize = sizeof(TBusHeader) + plaindata.Size();
+ dataSize = sizeof(TBusHeader) + plaindata.Size();
- NCodecs::TCodecPtr c = Proto->GetTransportCodec();
+ NCodecs::TCodecPtr c = Proto->GetTransportCodec();
c->Encode(TStringBuf{plaindata.data(), plaindata.size()}, compdata);
- if (compdata.Size() < plaindata.Size()) {
- plaindata.Clear();
- msg->GetHeader()->Size = sizeof(TBusHeader) + compdata.Size();
- WriteHeader(*msg->GetHeader(), *data);
- data->Append(compdata.Data(), compdata.Size());
- } else {
- compdata.Clear();
- msg->SetCompressed(false);
- msg->GetHeader()->Size = sizeof(TBusHeader) + plaindata.Size();
- WriteHeader(*msg->GetHeader(), *data);
- data->Append(plaindata.Data(), plaindata.Size());
- }
- } else {
- WriteDummyHeader(*data);
- CallSerialize(msg, *data);
-
- dataSize = msg->GetHeader()->Size = data->Size() - pos;
-
- data->Proceed(pos);
- WriteHeader(*msg->GetHeader(), *data);
- data->Proceed(pos + msg->GetHeader()->Size);
- }
-
- Y_ASSERT(msg->GetHeader()->Size == data->Size() - pos);
- counter->AddMessage(dataSize, data->Size() - pos, msg->IsCompressed(), compressionRequested);
- }
-
- TBusMessage* TRemoteConnection::DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const {
- size_t dataSize;
-
- TBusMessage* message;
- if (header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL) {
- TBuffer msg;
- {
- TBuffer plaindata;
- NCodecs::TCodecPtr c = Proto->GetTransportCodec();
- try {
- TArrayRef<const char> payload = TBusMessage::GetPayload(dataRef);
- c->Decode(TStringBuf{payload.data(), payload.size()}, plaindata);
- } catch (...) {
- // catch all, because
- // http://nga.at.yandex-team.ru/replies.xml?item_no=3884
- *status = MESSAGE_DECOMPRESS_ERROR;
- return nullptr;
- }
-
- msg.Append(dataRef.data(), sizeof(TBusHeader));
- msg.Append(plaindata.Data(), plaindata.Size());
- }
- TArrayRef<const char> msgRef(msg.Data(), msg.Size());
- dataSize = sizeof(TBusHeader) + msgRef.size();
- // TODO: track error types
- message = Proto->Deserialize(header->Type, msgRef.Slice(sizeof(TBusHeader))).Release();
- if (!message) {
- *status = MESSAGE_DESERIALIZE_ERROR;
- return nullptr;
- }
- *message->GetHeader() = *header;
- message->SetCompressed(true);
- } else {
- dataSize = dataRef.size();
- message = Proto->Deserialize(header->Type, dataRef.Slice(sizeof(TBusHeader))).Release();
- if (!message) {
- *status = MESSAGE_DESERIALIZE_ERROR;
- return nullptr;
- }
- *message->GetHeader() = *header;
- }
-
- messageCounter->AddMessage(dataSize, dataRef.size(), header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL, false);
-
- return message;
- }
-
- void TRemoteConnection::ResetOneWayFlag(TArrayRef<TBusMessage*> messages) {
- for (auto message : messages) {
- message->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
- }
- }
-
- void TRemoteConnection::ReaderFlushMessages() {
- if (!ReaderData.ReadMessages.empty()) {
- Session->OnMessageReceived(this, ReaderData.ReadMessages);
- ReaderData.ReadMessages.clear();
- }
- }
-
- // @return false if actor should break
- bool TRemoteConnection::MessageRead(TArrayRef<const char> readDataRef, TInstant now) {
- TBusHeader header(readDataRef);
-
- Y_ASSERT(readDataRef.size() == header.Size);
-
- if (header.GetVersionInternal() != YBUS_VERSION) {
- ReaderProcessMessageUnknownVersion(readDataRef);
- return true;
- }
-
- EMessageStatus deserializeFailureStatus = MESSAGE_OK;
- TBusMessage* r = DeserializeMessage(readDataRef, &header, &ReaderData.Status.Incremental.MessageCounter, &deserializeFailureStatus);
-
- if (!r) {
- Y_VERIFY(deserializeFailureStatus != MESSAGE_OK, "state check");
- LWPROBE(Error, ToString(deserializeFailureStatus), ToString(PeerAddr), "");
- ReaderData.Status.Incremental.StatusCounter[deserializeFailureStatus] += 1;
- ScheduleShutdownOnServerOrReconnectOnClient(deserializeFailureStatus, false);
- return false;
- }
-
- LWPROBE(Read, r->GetHeader()->Size);
-
- r->ReplyTo = PeerAddrSocketAddr;
-
- TBusMessagePtrAndHeader h(r);
- r->RecvTime = now;
-
- QuotaConsume(1, header.Size);
-
- ReaderData.ReadMessages.push_back(h);
- if (ReaderData.ReadMessages.size() >= 100) {
- ReaderFlushMessages();
- }
-
- return true;
- }
-
- void TRemoteConnection::WriterFillBuffer() {
- Y_ASSERT(WriterData.State == WRITER_FILLING);
-
- Y_ASSERT(WriterData.Buffer.LeftSize() == 0);
-
- if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) {
- TVector<TBusHeader> headers;
- WrongVersionRequests.DequeueAllSingleConsumer(&headers);
- for (TVector<TBusHeader>::reverse_iterator header = headers.rbegin();
- header != headers.rend(); ++header) {
- TBusHeader response = *header;
- response.SendTime = NBus::Now();
- response.Size = sizeof(TBusHeader);
- response.FlagsInternal = 0;
- response.SetVersionInternal(YBUS_VERSION);
- WriteHeader(response, WriterData.Buffer.GetBuffer());
- }
-
- Y_ASSERT(!WriterData.Buffer.Empty());
- WriterData.State = WRITER_FLUSHING;
- return;
- }
-
- TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages;
-
- for (;;) {
- THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront());
- if (!writeMessage) {
- break;
- }
-
- if (Config.Cork != TDuration::Zero()) {
- if (WriterData.CorkUntil == TInstant::Zero()) {
- WriterData.CorkUntil = TInstant::Now() + Config.Cork;
- }
- }
+ if (compdata.Size() < plaindata.Size()) {
+ plaindata.Clear();
+ msg->GetHeader()->Size = sizeof(TBusHeader) + compdata.Size();
+ WriteHeader(*msg->GetHeader(), *data);
+ data->Append(compdata.Data(), compdata.Size());
+ } else {
+ compdata.Clear();
+ msg->SetCompressed(false);
+ msg->GetHeader()->Size = sizeof(TBusHeader) + plaindata.Size();
+ WriteHeader(*msg->GetHeader(), *data);
+ data->Append(plaindata.Data(), plaindata.Size());
+ }
+ } else {
+ WriteDummyHeader(*data);
+ CallSerialize(msg, *data);
+
+ dataSize = msg->GetHeader()->Size = data->Size() - pos;
+
+ data->Proceed(pos);
+ WriteHeader(*msg->GetHeader(), *data);
+ data->Proceed(pos + msg->GetHeader()->Size);
+ }
- size_t sizeBeforeSerialize = WriterData.Buffer.Size();
+ Y_ASSERT(msg->GetHeader()->Size == data->Size() - pos);
+ counter->AddMessage(dataSize, data->Size() - pos, msg->IsCompressed(), compressionRequested);
+ }
- TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter;
+ TBusMessage* TRemoteConnection::DeserializeMessage(TArrayRef<const char> dataRef, const TBusHeader* header, TMessageCounter* messageCounter, EMessageStatus* status) const {
+ size_t dataSize;
+
+ TBusMessage* message;
+ if (header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL) {
+ TBuffer msg;
+ {
+ TBuffer plaindata;
+ NCodecs::TCodecPtr c = Proto->GetTransportCodec();
+ try {
+ TArrayRef<const char> payload = TBusMessage::GetPayload(dataRef);
+ c->Decode(TStringBuf{payload.data(), payload.size()}, plaindata);
+ } catch (...) {
+ // catch all, because
+ // http://nga.at.yandex-team.ru/replies.xml?item_no=3884
+ *status = MESSAGE_DECOMPRESS_ERROR;
+ return nullptr;
+ }
+
+ msg.Append(dataRef.data(), sizeof(TBusHeader));
+ msg.Append(plaindata.Data(), plaindata.Size());
+ }
+ TArrayRef<const char> msgRef(msg.Data(), msg.Size());
+ dataSize = sizeof(TBusHeader) + msgRef.size();
+ // TODO: track error types
+ message = Proto->Deserialize(header->Type, msgRef.Slice(sizeof(TBusHeader))).Release();
+ if (!message) {
+ *status = MESSAGE_DESERIALIZE_ERROR;
+ return nullptr;
+ }
+ *message->GetHeader() = *header;
+ message->SetCompressed(true);
+ } else {
+ dataSize = dataRef.size();
+ message = Proto->Deserialize(header->Type, dataRef.Slice(sizeof(TBusHeader))).Release();
+ if (!message) {
+ *status = MESSAGE_DESERIALIZE_ERROR;
+ return nullptr;
+ }
+ *message->GetHeader() = *header;
+ }
- SerializeMessage(writeMessage.Get(), &WriterData.Buffer.GetBuffer(), &messageCounter);
+ messageCounter->AddMessage(dataSize, dataRef.size(), header->FlagsInternal & MESSAGE_COMPRESS_INTERNAL, false);
- size_t written = WriterData.Buffer.Size() - sizeBeforeSerialize;
- if (written > Config.MaxMessageSize) {
- WriterData.Buffer.GetBuffer().EraseBack(written);
- WriterBeforeWriteErrorMessage(writeMessage.Release(), MESSAGE_MESSAGE_TOO_LARGE);
- continue;
- }
+ return message;
+ }
+
+ void TRemoteConnection::ResetOneWayFlag(TArrayRef<TBusMessage*> messages) {
+ for (auto message : messages) {
+ message->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
+ }
+ }
+
+ void TRemoteConnection::ReaderFlushMessages() {
+ if (!ReaderData.ReadMessages.empty()) {
+ Session->OnMessageReceived(this, ReaderData.ReadMessages);
+ ReaderData.ReadMessages.clear();
+ }
+ }
+
+ // @return false if actor should break
+ bool TRemoteConnection::MessageRead(TArrayRef<const char> readDataRef, TInstant now) {
+ TBusHeader header(readDataRef);
- WriterData.Status.Incremental.MessageCounter = messageCounter;
+ Y_ASSERT(readDataRef.size() == header.Size);
+
+ if (header.GetVersionInternal() != YBUS_VERSION) {
+ ReaderProcessMessageUnknownVersion(readDataRef);
+ return true;
+ }
- TBusMessagePtrAndHeader h(writeMessage.Release());
- writeMessages.GetVector()->push_back(h);
+ EMessageStatus deserializeFailureStatus = MESSAGE_OK;
+ TBusMessage* r = DeserializeMessage(readDataRef, &header, &ReaderData.Status.Incremental.MessageCounter, &deserializeFailureStatus);
- Y_ASSERT(!WriterData.Buffer.Empty());
- if (WriterData.Buffer.Size() >= Config.SendThreshold) {
- break;
- }
+ if (!r) {
+ Y_VERIFY(deserializeFailureStatus != MESSAGE_OK, "state check");
+ LWPROBE(Error, ToString(deserializeFailureStatus), ToString(PeerAddr), "");
+ ReaderData.Status.Incremental.StatusCounter[deserializeFailureStatus] += 1;
+ ScheduleShutdownOnServerOrReconnectOnClient(deserializeFailureStatus, false);
+ return false;
}
-
- if (!WriterData.Buffer.Empty()) {
- if (WriterData.Buffer.Size() >= Config.SendThreshold) {
- WriterData.State = WRITER_FLUSHING;
- } else if (WriterData.CorkUntil == TInstant::Zero()) {
- WriterData.State = WRITER_FLUSHING;
- } else if (TInstant::Now() >= WriterData.CorkUntil) {
- WriterData.State = WRITER_FLUSHING;
- } else {
- // keep filling
- Y_ASSERT(WriterData.State == WRITER_FILLING);
- GetWriterSchedulerActor()->ScheduleAt(WriterData.CorkUntil);
- }
- } else {
- // keep filling
- Y_ASSERT(WriterData.State == WRITER_FILLING);
- }
-
- size_t bytes = MessageSize(*writeMessages.GetVector());
-
- QuotaReturnSelf(writeMessages.GetVector()->size(), bytes);
-
- // This is called before `send` syscall inducing latency
- MessageSent(*writeMessages.GetVector());
- }
-
- size_t TRemoteConnection::MessageSize(TArrayRef<TBusMessagePtrAndHeader> messages) {
- size_t size = 0;
- for (const auto& message : messages)
- size += message.MessagePtr->RequestSize;
-
- return size;
- }
-
- size_t TRemoteConnection::GetInFlight() {
- return AtomicGet(WriterData.InFlight);
- }
-
- size_t TRemoteConnection::GetConnectSyscallsNumForTest() {
- return WriterData.Status.ConnectSyscalls;
- }
-
- void TRemoteConnection::WriterBeforeWriteErrorMessage(TBusMessage* message, EMessageStatus status) {
- if (Session->IsSource_) {
- CheckedCast<TRemoteClientSession*>(Session.Get())->ReleaseInFlight({message});
- WriterErrorMessage(message, status);
- } else {
- TBusMessagePtrAndHeader h(message);
- CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1));
- WriterErrorMessage(h.MessagePtr.Release(), status);
- }
- }
-
- void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) {
- TBusMessage* released = m.Release();
- WriterErrorMessages(MakeArrayRef(&released, 1), status);
- }
-
- void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) {
- ResetOneWayFlag(ms);
-
- WriterData.Status.Incremental.StatusCounter[status] += ms.size();
- for (auto m : ms) {
- Session->InvokeOnError(m, status);
- }
- }
-
- void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) {
- Y_VERIFY(Session->IsSource_, "state check");
- TClientConnectionEvent event(type, ConnectionId, PeerAddr);
- TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get());
- session->ClientHandler->OnClientConnectionEvent(event);
- }
-
- bool TRemoteConnection::IsAlive() const {
- return !AtomicGet(WriterData.Down);
- }
+
+ LWPROBE(Read, r->GetHeader()->Size);
+
+ r->ReplyTo = PeerAddrSocketAddr;
+
+ TBusMessagePtrAndHeader h(r);
+ r->RecvTime = now;
+
+ QuotaConsume(1, header.Size);
+
+ ReaderData.ReadMessages.push_back(h);
+ if (ReaderData.ReadMessages.size() >= 100) {
+ ReaderFlushMessages();
+ }
+
+ return true;
+ }
+
+ void TRemoteConnection::WriterFillBuffer() {
+ Y_ASSERT(WriterData.State == WRITER_FILLING);
+
+ Y_ASSERT(WriterData.Buffer.LeftSize() == 0);
+
+ if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) {
+ TVector<TBusHeader> headers;
+ WrongVersionRequests.DequeueAllSingleConsumer(&headers);
+ for (TVector<TBusHeader>::reverse_iterator header = headers.rbegin();
+ header != headers.rend(); ++header) {
+ TBusHeader response = *header;
+ response.SendTime = NBus::Now();
+ response.Size = sizeof(TBusHeader);
+ response.FlagsInternal = 0;
+ response.SetVersionInternal(YBUS_VERSION);
+ WriteHeader(response, WriterData.Buffer.GetBuffer());
+ }
+
+ Y_ASSERT(!WriterData.Buffer.Empty());
+ WriterData.State = WRITER_FLUSHING;
+ return;
+ }
+
+ TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages;
+
+ for (;;) {
+ THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront());
+ if (!writeMessage) {
+ break;
+ }
+
+ if (Config.Cork != TDuration::Zero()) {
+ if (WriterData.CorkUntil == TInstant::Zero()) {
+ WriterData.CorkUntil = TInstant::Now() + Config.Cork;
+ }
+ }
+
+ size_t sizeBeforeSerialize = WriterData.Buffer.Size();
+
+ TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter;
+
+ SerializeMessage(writeMessage.Get(), &WriterData.Buffer.GetBuffer(), &messageCounter);
+
+ size_t written = WriterData.Buffer.Size() - sizeBeforeSerialize;
+ if (written > Config.MaxMessageSize) {
+ WriterData.Buffer.GetBuffer().EraseBack(written);
+ WriterBeforeWriteErrorMessage(writeMessage.Release(), MESSAGE_MESSAGE_TOO_LARGE);
+ continue;
+ }
+
+ WriterData.Status.Incremental.MessageCounter = messageCounter;
+
+ TBusMessagePtrAndHeader h(writeMessage.Release());
+ writeMessages.GetVector()->push_back(h);
+
+ Y_ASSERT(!WriterData.Buffer.Empty());
+ if (WriterData.Buffer.Size() >= Config.SendThreshold) {
+ break;
+ }
+ }
+
+ if (!WriterData.Buffer.Empty()) {
+ if (WriterData.Buffer.Size() >= Config.SendThreshold) {
+ WriterData.State = WRITER_FLUSHING;
+ } else if (WriterData.CorkUntil == TInstant::Zero()) {
+ WriterData.State = WRITER_FLUSHING;
+ } else if (TInstant::Now() >= WriterData.CorkUntil) {
+ WriterData.State = WRITER_FLUSHING;
+ } else {
+ // keep filling
+ Y_ASSERT(WriterData.State == WRITER_FILLING);
+ GetWriterSchedulerActor()->ScheduleAt(WriterData.CorkUntil);
+ }
+ } else {
+ // keep filling
+ Y_ASSERT(WriterData.State == WRITER_FILLING);
+ }
+
+ size_t bytes = MessageSize(*writeMessages.GetVector());
+
+ QuotaReturnSelf(writeMessages.GetVector()->size(), bytes);
+
+ // This is called before `send` syscall inducing latency
+ MessageSent(*writeMessages.GetVector());
+ }
+
+ size_t TRemoteConnection::MessageSize(TArrayRef<TBusMessagePtrAndHeader> messages) {
+ size_t size = 0;
+ for (const auto& message : messages)
+ size += message.MessagePtr->RequestSize;
+
+ return size;
+ }
+
+ size_t TRemoteConnection::GetInFlight() {
+ return AtomicGet(WriterData.InFlight);
+ }
+
+ size_t TRemoteConnection::GetConnectSyscallsNumForTest() {
+ return WriterData.Status.ConnectSyscalls;
+ }
+
+ void TRemoteConnection::WriterBeforeWriteErrorMessage(TBusMessage* message, EMessageStatus status) {
+ if (Session->IsSource_) {
+ CheckedCast<TRemoteClientSession*>(Session.Get())->ReleaseInFlight({message});
+ WriterErrorMessage(message, status);
+ } else {
+ TBusMessagePtrAndHeader h(message);
+ CheckedCast<TRemoteServerSession*>(Session.Get())->ReleaseInWorkResponses(MakeArrayRef(&h, 1));
+ WriterErrorMessage(h.MessagePtr.Release(), status);
+ }
+ }
+
+ void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) {
+ TBusMessage* released = m.Release();
+ WriterErrorMessages(MakeArrayRef(&released, 1), status);
+ }
+
+ void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) {
+ ResetOneWayFlag(ms);
+
+ WriterData.Status.Incremental.StatusCounter[status] += ms.size();
+ for (auto m : ms) {
+ Session->InvokeOnError(m, status);
+ }
+ }
+
+ void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) {
+ Y_VERIFY(Session->IsSource_, "state check");
+ TClientConnectionEvent event(type, ConnectionId, PeerAddr);
+ TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get());
+ session->ClientHandler->OnClientConnectionEvent(event);
+ }
+
+ bool TRemoteConnection::IsAlive() const {
+ return !AtomicGet(WriterData.Down);
+ }
}
}