aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_connection.cpp
diff options
context:
space:
mode:
authorsomov <somov@yandex-team.ru>2022-02-10 16:45:47 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:47 +0300
commita5950576e397b1909261050b8c7da16db58f10b1 (patch)
tree7ba7677f6a4c3e19e2cefab34d16df2c8963b4d4 /library/cpp/messagebus/remote_connection.cpp
parent81eddc8c0b55990194e112b02d127b87d54164a9 (diff)
downloadydb-a5950576e397b1909261050b8c7da16db58f10b1.tar.gz
Restoring authorship annotation for <somov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_connection.cpp')
-rw-r--r--library/cpp/messagebus/remote_connection.cpp58
1 files changed, 29 insertions, 29 deletions
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp
index 22932569db..59b58f7797 100644
--- a/library/cpp/messagebus/remote_connection.cpp
+++ b/library/cpp/messagebus/remote_connection.cpp
@@ -7,9 +7,9 @@
#include "remote_client_session.h"
#include "remote_server_session.h"
#include "session_impl.h"
-
+
#include <library/cpp/messagebus/actor/what_thread_does.h>
-
+
#include <util/generic/cast.h>
#include <util/network/init.h>
#include <util/system/atomic.h>
@@ -44,7 +44,7 @@ namespace NBus {
WriterData.Status.ConnectionId = connectionId;
WriterData.Status.PeerAddr = PeerAddr;
ReaderData.Status.ConnectionId = connectionId;
-
+
const TInstant now = TInstant::Now();
WriterFillStatus();
@@ -70,7 +70,7 @@ namespace NBus {
Y_VERIFY(AtomicGet(Down));
Y_VERIFY(SendQueue.Empty());
}
-
+
bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept {
size_t left = Buffer.Size() - Offset;
@@ -137,7 +137,7 @@ namespace NBus {
void TRemoteConnection::Shutdown(EMessageStatus status) {
ScheduleShutdown(status);
-
+
ReaderData.ShutdownComplete.WaitI();
WriterData.ShutdownComplete.WaitI();
}
@@ -145,15 +145,15 @@ namespace NBus {
void TRemoteConnection::TryConnect() {
Y_FAIL("TryConnect is client connection only operation");
}
-
+
void TRemoteConnection::ScheduleRead() {
GetReaderActor()->Schedule();
}
-
+
void TRemoteConnection::ScheduleWrite() {
GetWriterActor()->Schedule();
}
-
+
void TRemoteConnection::WriterRotateCounters() {
if (!WriterData.TimeToRotateCounters.FetchTask()) {
return;
@@ -383,7 +383,7 @@ namespace NBus {
if (ReaderData.Buffer.Capacity() > MaxBufferSize && ReaderData.Buffer.Size() <= MaxBufferSize) {
ReaderData.Status.Incremental.BufferDrops += 1;
-
+
TBuffer temp;
// probably should use another constant
temp.Reserve(Config.DefaultBufferSize);
@@ -391,7 +391,7 @@ namespace NBus {
ReaderData.Buffer.Swap(temp);
}
-
+
return true;
}
@@ -406,7 +406,7 @@ namespace NBus {
ReaderData.Buffer.Reserve(ReaderData.Buffer.Size() * 2);
}
}
-
+
Y_ASSERT(ReaderData.Buffer.Avail() > 0);
ssize_t bytes;
@@ -465,27 +465,27 @@ namespace NBus {
if (!Session->IsSource_) {
message->SendTime = now.MilliSeconds();
}
-
+
WriterData.SendQueue.PushBack(message);
}
-
+
void TRemoteConnection::ProcessBeforeSendQueue(TInstant now) {
BeforeSendQueue.DequeueAll(std::bind(&TRemoteConnection::ProcessBeforeSendQueueMessage, this, std::placeholders::_1, now));
- }
-
+ }
+
void TRemoteConnection::WriterFillInFlight() {
// this is hack for TLoadBalancedProtocol
WriterFillStatus();
AtomicSet(WriterData.InFlight, WriterData.Status.GetInFlight());
}
-
+
const TRemoteConnectionWriterStatus& TRemoteConnection::WriterGetStatus() {
WriterRotateCounters();
WriterFillStatus();
return WriterData.Status;
}
-
+
void TRemoteConnection::WriterFillStatus() {
if (!!WriterData.Channel) {
WriterData.Status.Fd = WriterData.Channel->GetSocket();
@@ -644,11 +644,11 @@ namespace NBus {
if (WriterData.Buffer.Capacity() > MaxBufferSize) {
WriterData.Status.Incremental.BufferDrops += 1;
WriterData.Buffer.Reset();
- }
+ }
WriterData.State = WRITER_FILLING;
}
-
+
void TRemoteConnection::ScheduleShutdownOnServerOrReconnectOnClient(EMessageStatus status, bool writer) {
if (Session->IsSource_) {
WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion);
@@ -662,11 +662,11 @@ namespace NBus {
AtomicSet(ReaderData.Down, 1);
ScheduleRead();
-
+
AtomicSet(WriterData.Down, 1);
ScheduleWrite();
}
-
+
void TRemoteConnection::CallSerialize(TBusMessage* msg, TBuffer& buffer) const {
size_t posForAssertion = buffer.Size();
Proto->Serialize(msg, buffer);
@@ -688,12 +688,12 @@ namespace NBus {
}
}
-
+
void TRemoteConnection::SerializeMessage(TBusMessage* msg, TBuffer* data, TMessageCounter* counter) const {
size_t pos = data->Size();
-
+
size_t dataSize;
-
+
bool compressionRequested = msg->IsCompressed();
if (compressionRequested) {
@@ -821,20 +821,20 @@ namespace NBus {
TBusMessagePtrAndHeader h(r);
r->RecvTime = now;
-
+
QuotaConsume(1, header.Size);
ReaderData.ReadMessages.push_back(h);
if (ReaderData.ReadMessages.size() >= 100) {
ReaderFlushMessages();
}
-
+
return true;
}
void TRemoteConnection::WriterFillBuffer() {
Y_ASSERT(WriterData.State == WRITER_FILLING);
-
+
Y_ASSERT(WriterData.Buffer.LeftSize() == 0);
if (Y_UNLIKELY(!WrongVersionRequests.IsEmpty())) {
@@ -868,7 +868,7 @@ namespace NBus {
WriterData.CorkUntil = TInstant::Now() + Config.Cork;
}
}
-
+
size_t sizeBeforeSerialize = WriterData.Buffer.Size();
TMessageCounter messageCounter = WriterData.Status.Incremental.MessageCounter;
@@ -952,7 +952,7 @@ namespace NBus {
void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) {
ResetOneWayFlag(ms);
-
+
WriterData.Status.Incremental.StatusCounter[status] += ms.size();
for (auto m : ms) {
Session->InvokeOnError(m, status);