diff options
author | yazevnul <yazevnul@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
commit | 8cbc307de0221f84c80c42dcbe07d40727537e2c (patch) | |
tree | 625d5a673015d1df891e051033e9fcde5c7be4e5 /library/cpp/messagebus/remote_client_connection.cpp | |
parent | 30d1ef3941e0dc835be7609de5ebee66958f215a (diff) | |
download | ydb-8cbc307de0221f84c80c42dcbe07d40727537e2c.tar.gz |
Restoring authorship annotation for <yazevnul@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_client_connection.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_client_connection.cpp | 42 |
1 files changed, 21 insertions, 21 deletions
diff --git a/library/cpp/messagebus/remote_client_connection.cpp b/library/cpp/messagebus/remote_client_connection.cpp index 8c7a6db3a8..9e173706f5 100644 --- a/library/cpp/messagebus/remote_client_connection.cpp +++ b/library/cpp/messagebus/remote_client_connection.cpp @@ -20,7 +20,7 @@ TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session : TRemoteConnection(session.Get(), id, addr) , ClientHandler(GetSession()->ClientHandler) { - Y_VERIFY(addr.GetPort() > 0, "must connect to non-zero port"); + Y_VERIFY(addr.GetPort() > 0, "must connect to non-zero port"); ScheduleWrite(); } @@ -35,7 +35,7 @@ TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) { SOCKET TRemoteClientConnection::CreateSocket(const TNetAddr& addr) { SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0); - Y_VERIFY(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText()); + Y_VERIFY(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText()); TSocketHolder s(handle); @@ -61,7 +61,7 @@ void TRemoteClientConnection::TryConnect() { if (AtomicGet(WriterData.Down)) { return; } - Y_VERIFY(!WriterData.Status.Connected); + Y_VERIFY(!WriterData.Status.Connected); TInstant now = TInstant::Now(); @@ -119,8 +119,8 @@ void TRemoteClientConnection::TryConnect() { } void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) { - Y_UNUSED(socket); - Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie); + Y_UNUSED(socket); + Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie); if (cookie == ReadCookie) { ScheduleRead(); } else { @@ -181,18 +181,18 @@ void TRemoteClientConnection::ProcessReplyQueue() { workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size()); } - for (auto& resp : *replyQueueTemp.GetVector()) { - TBusMessage* req = PopAck(resp.Header.Id); + for (auto& resp : *replyQueueTemp.GetVector()) { + TBusMessage* req = PopAck(resp.Header.Id); if (!req) { - WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN); + WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN); continue; } if (executeInWorkerPool) { - workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp)); + workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp)); } else { - GetSession()->ReleaseInFlightAndCallOnReply(req, resp); + GetSession()->ReleaseInFlightAndCallOnReply(req, resp); } } @@ -252,11 +252,11 @@ void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), ""); ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1; // TODO: close connection - Y_FAIL("unknown message"); + Y_FAIL("unknown message"); } void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) { - Y_ASSERT(result.empty()); + Y_ASSERT(result.empty()); TRemoteConnection::ClearOutgoingQueue(result, reconnect); AckMessages.Clear(&result); @@ -266,26 +266,26 @@ void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool rec } void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) { - for (auto& message : messages) { - bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL; + for (auto& message : messages) { + bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL; if (oneWay) { - message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; + message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; - TBusMessage* ackMsg = this->PopAck(message.Header.Id); + TBusMessage* ackMsg = this->PopAck(message.Header.Id); if (!ackMsg) { // TODO: expired? } - if (ackMsg != message.MessagePtr.Get()) { + if (ackMsg != message.MessagePtr.Get()) { // TODO: non-unique id? } GetSession()->ReleaseInFlight({message.MessagePtr.Get()}); - ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release()); + ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release()); } else { - ClientHandler->OnMessageSent(message.MessagePtr.Get()); - AckMessages.Push(message); + ClientHandler->OnMessageSent(message.MessagePtr.Get()); + AckMessages.Push(message); } } } @@ -306,7 +306,7 @@ EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool w } if (wait) { - Y_VERIFY(!Session->Queue->GetExecutor()->IsInExecutorThread()); + Y_VERIFY(!Session->Queue->GetExecutor()->IsInExecutorThread()); GetSession()->ClientRemoteInFlight.Wait(); } else { if (!GetSession()->ClientRemoteInFlight.TryWait()) { |