aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_connection.cpp
diff options
context:
space:
mode:
authorsingle <single@yandex-team.ru>2022-02-10 16:50:30 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:30 +0300
commitf7835298a8840c8e5d98715bf23efa9c7e03b9c4 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/messagebus/remote_connection.cpp
parent8ae96df130bbede609c3504aa9af1bc6ff5361b3 (diff)
downloadydb-f7835298a8840c8e5d98715bf23efa9c7e03b9c4.tar.gz
Restoring authorship annotation for <single@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_connection.cpp')
-rw-r--r--library/cpp/messagebus/remote_connection.cpp100
1 files changed, 50 insertions, 50 deletions
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp
index 730fc0f554..22932569db 100644
--- a/library/cpp/messagebus/remote_connection.cpp
+++ b/library/cpp/messagebus/remote_connection.cpp
@@ -48,11 +48,11 @@ namespace NBus {
const TInstant now = TInstant::Now();
WriterFillStatus();
-
+
GranStatus.Writer.Update(WriterData.Status, now, true);
GranStatus.Reader.Update(ReaderData.Status, now, true);
}
-
+
TRemoteConnection::~TRemoteConnection() {
Y_VERIFY(ReplyQueue.IsEmpty());
}
@@ -73,7 +73,7 @@ namespace NBus {
bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept {
size_t left = Buffer.Size() - Offset;
-
+
return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0;
}
@@ -83,13 +83,13 @@ namespace NBus {
Y_VERIFY(State == WRITER_FILLING, "state must be initial");
Channel = channel;
}
-
+
void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) {
Y_VERIFY(!Channel, "must not have channel");
Y_VERIFY(Buffer.Empty(), "buffer must be empty");
Channel = channel;
}
-
+
void TRemoteConnection::TWriterData::DropChannel() {
if (!!Channel) {
Channel->Unregister();
@@ -184,7 +184,7 @@ namespace NBus {
ReaderData.Status.Fd = INVALID_SOCKET;
return;
}
-
+
ReaderData.DropChannel();
ReaderData.Status.Fd = readSocket.Socket;
@@ -232,10 +232,10 @@ namespace NBus {
ReaderData.Status.Acts += 1;
ReaderGetSocketQueue()->DequeueAllLikelyEmpty();
-
+
if (AtomicGet(ReaderData.Down)) {
ReaderData.DropChannel();
-
+
ReaderProcessStatusDown();
ReaderData.ShutdownComplete.Signal();
@@ -262,7 +262,7 @@ namespace NBus {
}
ReaderFlushMessages();
- }
+ }
ReaderSendStatus(now);
}
@@ -275,109 +275,109 @@ namespace NBus {
else if (!QuotaBytes.Acquire(bytes))
wakeFlags |= WAKE_QUOTA_BYTES;
-
+
if (wakeFlags) {
ReaderData.Status.QuotaExhausted++;
-
+
WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags);
}
-
+
return wakeFlags == 0;
}
-
+
void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) {
QuotaMsg.Consume(msg);
QuotaBytes.Consume(bytes);
}
-
+
void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) {
if (QuotaReturnValues(items, bytes))
ReadQuotaWakeup();
}
-
+
void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) {
if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down))
WriterGetWakeQueue()->EnqueueAndSchedule(0x0);
}
-
+
bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) {
bool rMsg = QuotaMsg.Return(items);
bool rBytes = QuotaBytes.Return(bytes);
-
+
return rMsg || rBytes;
}
-
+
void TRemoteConnection::ReadQuotaWakeup() {
const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags();
-
+
if (mask && mask == WriterData.AwakeFlags) {
WriterData.Status.ReaderWakeups++;
WriterData.AwakeFlags = 0;
-
+
ScheduleRead();
}
}
-
+
ui32 TRemoteConnection::WriteWakeFlags() const {
ui32 awakeFlags = 0;
-
+
if (QuotaMsg.IsAboveWake())
awakeFlags |= WAKE_QUOTA_MSG;
-
+
if (QuotaBytes.IsAboveWake())
awakeFlags |= WAKE_QUOTA_BYTES;
-
+
return awakeFlags;
}
-
+
bool TRemoteConnection::ReaderProcessBuffer() {
TInstant now = TInstant::Now();
-
+
for (;;) {
if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) {
break;
}
-
+
TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset));
-
+
if (header.Size < sizeof(TBusHeader)) {
LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
return false;
}
-
+
if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) {
LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size));
ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1;
ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false);
return false;
}
-
+
if (header.Size > Config.MaxMessageSize) {
LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size));
ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1;
ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false);
return false;
}
-
+
if (!ReaderData.HasBytesInBuf(header.Size)) {
if (ReaderData.Offset == 0) {
ReaderData.Buffer.Reserve(header.Size);
}
break;
}
-
+
if (!QuotaAcquire(1, header.Size))
return false;
-
+
if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) {
return false;
}
-
+
ReaderData.Offset += header.Size;
}
-
+
ReaderData.Buffer.ChopHead(ReaderData.Offset);
ReaderData.Offset = 0;
@@ -408,7 +408,7 @@ namespace NBus {
}
Y_ASSERT(ReaderData.Buffer.Avail() > 0);
-
+
ssize_t bytes;
{
TWhatThreadDoesPushPop pp("recv syscall");
@@ -454,7 +454,7 @@ namespace NBus {
message != replyQueueTemp.rend(); ++message) {
messages.push_back(message->MessagePtr.Release());
}
-
+
WriterErrorMessages(messages, reason);
replyQueueTemp.clear();
@@ -535,10 +535,10 @@ namespace NBus {
ClearBeforeSendQueue(reasonForQueues);
WriterGetReconnectQueue()->Clear();
WriterGetWakeQueue()->Clear();
-
+
TMessagesPtrs cleared;
ClearOutgoingQueue(cleared, false);
-
+
if (!Session->IsSource_) {
for (auto& i : cleared) {
TBusMessagePtrAndHeader h(i);
@@ -548,10 +548,10 @@ namespace NBus {
// and this part is not batch
}
}
-
+
WriterErrorMessages(cleared, reason);
}
-
+
void TRemoteConnection::BeforeTryWrite() {
}
@@ -638,7 +638,7 @@ namespace NBus {
WriterData.Status.Incremental.NetworkOps += 1;
WriterData.Buffer.LeftProceed(bytes);
- }
+ }
WriterData.Buffer.Clear();
if (WriterData.Buffer.Capacity() > MaxBufferSize) {
@@ -654,12 +654,12 @@ namespace NBus {
WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion);
} else {
ScheduleShutdown(status);
- }
+ }
}
void TRemoteConnection::ScheduleShutdown(EMessageStatus status) {
ShutdownReason = status;
-
+
AtomicSet(ReaderData.Down, 1);
ScheduleRead();
@@ -856,7 +856,7 @@ namespace NBus {
}
TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages;
-
+
for (;;) {
THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront());
if (!writeMessage) {
@@ -944,12 +944,12 @@ namespace NBus {
WriterErrorMessage(h.MessagePtr.Release(), status);
}
}
-
+
void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) {
TBusMessage* released = m.Release();
WriterErrorMessages(MakeArrayRef(&released, 1), status);
}
-
+
void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) {
ResetOneWayFlag(ms);
@@ -958,17 +958,17 @@ namespace NBus {
Session->InvokeOnError(m, status);
}
}
-
+
void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) {
Y_VERIFY(Session->IsSource_, "state check");
TClientConnectionEvent event(type, ConnectionId, PeerAddr);
TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get());
session->ClientHandler->OnClientConnectionEvent(event);
}
-
+
bool TRemoteConnection::IsAlive() const {
return !AtomicGet(WriterData.Down);
}
-
+
}
}