diff options
author | single <single@yandex-team.ru> | 2022-02-10 16:50:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:30 +0300 |
commit | f7835298a8840c8e5d98715bf23efa9c7e03b9c4 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/messagebus/remote_connection.cpp | |
parent | 8ae96df130bbede609c3504aa9af1bc6ff5361b3 (diff) | |
download | ydb-f7835298a8840c8e5d98715bf23efa9c7e03b9c4.tar.gz |
Restoring authorship annotation for <single@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_connection.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_connection.cpp | 100 |
1 files changed, 50 insertions, 50 deletions
diff --git a/library/cpp/messagebus/remote_connection.cpp b/library/cpp/messagebus/remote_connection.cpp index 730fc0f554..22932569db 100644 --- a/library/cpp/messagebus/remote_connection.cpp +++ b/library/cpp/messagebus/remote_connection.cpp @@ -48,11 +48,11 @@ namespace NBus { const TInstant now = TInstant::Now(); WriterFillStatus(); - + GranStatus.Writer.Update(WriterData.Status, now, true); GranStatus.Reader.Update(ReaderData.Status, now, true); } - + TRemoteConnection::~TRemoteConnection() { Y_VERIFY(ReplyQueue.IsEmpty()); } @@ -73,7 +73,7 @@ namespace NBus { bool TRemoteConnection::TReaderData::HasBytesInBuf(size_t bytes) noexcept { size_t left = Buffer.Size() - Offset; - + return (MoreBytes = left >= bytes ? 0 : bytes - left) == 0; } @@ -83,13 +83,13 @@ namespace NBus { Y_VERIFY(State == WRITER_FILLING, "state must be initial"); Channel = channel; } - + void TRemoteConnection::TReaderData::SetChannel(NEventLoop::TChannelPtr channel) { Y_VERIFY(!Channel, "must not have channel"); Y_VERIFY(Buffer.Empty(), "buffer must be empty"); Channel = channel; } - + void TRemoteConnection::TWriterData::DropChannel() { if (!!Channel) { Channel->Unregister(); @@ -184,7 +184,7 @@ namespace NBus { ReaderData.Status.Fd = INVALID_SOCKET; return; } - + ReaderData.DropChannel(); ReaderData.Status.Fd = readSocket.Socket; @@ -232,10 +232,10 @@ namespace NBus { ReaderData.Status.Acts += 1; ReaderGetSocketQueue()->DequeueAllLikelyEmpty(); - + if (AtomicGet(ReaderData.Down)) { ReaderData.DropChannel(); - + ReaderProcessStatusDown(); ReaderData.ShutdownComplete.Signal(); @@ -262,7 +262,7 @@ namespace NBus { } ReaderFlushMessages(); - } + } ReaderSendStatus(now); } @@ -275,109 +275,109 @@ namespace NBus { else if (!QuotaBytes.Acquire(bytes)) wakeFlags |= WAKE_QUOTA_BYTES; - + if (wakeFlags) { ReaderData.Status.QuotaExhausted++; - + WriterGetWakeQueue()->EnqueueAndSchedule(wakeFlags); } - + return wakeFlags == 0; } - + void TRemoteConnection::QuotaConsume(size_t msg, size_t bytes) { QuotaMsg.Consume(msg); QuotaBytes.Consume(bytes); } - + void TRemoteConnection::QuotaReturnSelf(size_t items, size_t bytes) { if (QuotaReturnValues(items, bytes)) ReadQuotaWakeup(); } - + void TRemoteConnection::QuotaReturnAside(size_t items, size_t bytes) { if (QuotaReturnValues(items, bytes) && !AtomicGet(WriterData.Down)) WriterGetWakeQueue()->EnqueueAndSchedule(0x0); } - + bool TRemoteConnection::QuotaReturnValues(size_t items, size_t bytes) { bool rMsg = QuotaMsg.Return(items); bool rBytes = QuotaBytes.Return(bytes); - + return rMsg || rBytes; } - + void TRemoteConnection::ReadQuotaWakeup() { const ui32 mask = WriterData.AwakeFlags & WriteWakeFlags(); - + if (mask && mask == WriterData.AwakeFlags) { WriterData.Status.ReaderWakeups++; WriterData.AwakeFlags = 0; - + ScheduleRead(); } } - + ui32 TRemoteConnection::WriteWakeFlags() const { ui32 awakeFlags = 0; - + if (QuotaMsg.IsAboveWake()) awakeFlags |= WAKE_QUOTA_MSG; - + if (QuotaBytes.IsAboveWake()) awakeFlags |= WAKE_QUOTA_BYTES; - + return awakeFlags; } - + bool TRemoteConnection::ReaderProcessBuffer() { TInstant now = TInstant::Now(); - + for (;;) { if (!ReaderData.HasBytesInBuf(sizeof(TBusHeader))) { break; } - + TBusHeader header(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, ReaderData.Buffer.Size() - ReaderData.Offset)); - + if (header.Size < sizeof(TBusHeader)) { LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); return false; } - + if (!IsVersionNegotiation(header) && !IsBusKeyValid(header.Id)) { LWPROBE(Error, ToString(MESSAGE_HEADER_CORRUPTED), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_HEADER_CORRUPTED] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_HEADER_CORRUPTED, false); return false; } - + if (header.Size > Config.MaxMessageSize) { LWPROBE(Error, ToString(MESSAGE_MESSAGE_TOO_LARGE), ToString(PeerAddr), ToString(header.Size)); ReaderData.Status.Incremental.StatusCounter[MESSAGE_MESSAGE_TOO_LARGE] += 1; ScheduleShutdownOnServerOrReconnectOnClient(MESSAGE_MESSAGE_TOO_LARGE, false); return false; } - + if (!ReaderData.HasBytesInBuf(header.Size)) { if (ReaderData.Offset == 0) { ReaderData.Buffer.Reserve(header.Size); } break; } - + if (!QuotaAcquire(1, header.Size)) return false; - + if (!MessageRead(MakeArrayRef(ReaderData.Buffer.Data() + ReaderData.Offset, header.Size), now)) { return false; } - + ReaderData.Offset += header.Size; } - + ReaderData.Buffer.ChopHead(ReaderData.Offset); ReaderData.Offset = 0; @@ -408,7 +408,7 @@ namespace NBus { } Y_ASSERT(ReaderData.Buffer.Avail() > 0); - + ssize_t bytes; { TWhatThreadDoesPushPop pp("recv syscall"); @@ -454,7 +454,7 @@ namespace NBus { message != replyQueueTemp.rend(); ++message) { messages.push_back(message->MessagePtr.Release()); } - + WriterErrorMessages(messages, reason); replyQueueTemp.clear(); @@ -535,10 +535,10 @@ namespace NBus { ClearBeforeSendQueue(reasonForQueues); WriterGetReconnectQueue()->Clear(); WriterGetWakeQueue()->Clear(); - + TMessagesPtrs cleared; ClearOutgoingQueue(cleared, false); - + if (!Session->IsSource_) { for (auto& i : cleared) { TBusMessagePtrAndHeader h(i); @@ -548,10 +548,10 @@ namespace NBus { // and this part is not batch } } - + WriterErrorMessages(cleared, reason); } - + void TRemoteConnection::BeforeTryWrite() { } @@ -638,7 +638,7 @@ namespace NBus { WriterData.Status.Incremental.NetworkOps += 1; WriterData.Buffer.LeftProceed(bytes); - } + } WriterData.Buffer.Clear(); if (WriterData.Buffer.Capacity() > MaxBufferSize) { @@ -654,12 +654,12 @@ namespace NBus { WriterGetReconnectQueue()->EnqueueAndSchedule(writer ? WriterData.SocketVersion : ReaderData.SocketVersion); } else { ScheduleShutdown(status); - } + } } void TRemoteConnection::ScheduleShutdown(EMessageStatus status) { ShutdownReason = status; - + AtomicSet(ReaderData.Down, 1); ScheduleRead(); @@ -856,7 +856,7 @@ namespace NBus { } TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> writeMessages; - + for (;;) { THolder<TBusMessage> writeMessage(WriterData.SendQueue.PopFront()); if (!writeMessage) { @@ -944,12 +944,12 @@ namespace NBus { WriterErrorMessage(h.MessagePtr.Release(), status); } } - + void TRemoteConnection::WriterErrorMessage(TNonDestroyingAutoPtr<TBusMessage> m, EMessageStatus status) { TBusMessage* released = m.Release(); WriterErrorMessages(MakeArrayRef(&released, 1), status); } - + void TRemoteConnection::WriterErrorMessages(const TArrayRef<TBusMessage*> ms, EMessageStatus status) { ResetOneWayFlag(ms); @@ -958,17 +958,17 @@ namespace NBus { Session->InvokeOnError(m, status); } } - + void TRemoteConnection::FireClientConnectionEvent(TClientConnectionEvent::EType type) { Y_VERIFY(Session->IsSource_, "state check"); TClientConnectionEvent event(type, ConnectionId, PeerAddr); TRemoteClientSession* session = CheckedCast<TRemoteClientSession*>(Session.Get()); session->ClientHandler->OnClientConnectionEvent(event); } - + bool TRemoteConnection::IsAlive() const { return !AtomicGet(WriterData.Down); } - + } } |