aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_client_connection.cpp
diff options
context:
space:
mode:
authoryazevnul <yazevnul@yandex-team.ru>2022-02-10 16:46:46 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:46 +0300
commit8cbc307de0221f84c80c42dcbe07d40727537e2c (patch)
tree625d5a673015d1df891e051033e9fcde5c7be4e5 /library/cpp/messagebus/remote_client_connection.cpp
parent30d1ef3941e0dc835be7609de5ebee66958f215a (diff)
downloadydb-8cbc307de0221f84c80c42dcbe07d40727537e2c.tar.gz
Restoring authorship annotation for <yazevnul@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_client_connection.cpp')
-rw-r--r--library/cpp/messagebus/remote_client_connection.cpp42
1 files changed, 21 insertions, 21 deletions
diff --git a/library/cpp/messagebus/remote_client_connection.cpp b/library/cpp/messagebus/remote_client_connection.cpp
index 8c7a6db3a8..9e173706f5 100644
--- a/library/cpp/messagebus/remote_client_connection.cpp
+++ b/library/cpp/messagebus/remote_client_connection.cpp
@@ -20,7 +20,7 @@ TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session
: TRemoteConnection(session.Get(), id, addr)
, ClientHandler(GetSession()->ClientHandler)
{
- Y_VERIFY(addr.GetPort() > 0, "must connect to non-zero port");
+ Y_VERIFY(addr.GetPort() > 0, "must connect to non-zero port");
ScheduleWrite();
}
@@ -35,7 +35,7 @@ TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) {
SOCKET TRemoteClientConnection::CreateSocket(const TNetAddr& addr) {
SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0);
- Y_VERIFY(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText());
+ Y_VERIFY(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText());
TSocketHolder s(handle);
@@ -61,7 +61,7 @@ void TRemoteClientConnection::TryConnect() {
if (AtomicGet(WriterData.Down)) {
return;
}
- Y_VERIFY(!WriterData.Status.Connected);
+ Y_VERIFY(!WriterData.Status.Connected);
TInstant now = TInstant::Now();
@@ -119,8 +119,8 @@ void TRemoteClientConnection::TryConnect() {
}
void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) {
- Y_UNUSED(socket);
- Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie);
+ Y_UNUSED(socket);
+ Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie);
if (cookie == ReadCookie) {
ScheduleRead();
} else {
@@ -181,18 +181,18 @@ void TRemoteClientConnection::ProcessReplyQueue() {
workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size());
}
- for (auto& resp : *replyQueueTemp.GetVector()) {
- TBusMessage* req = PopAck(resp.Header.Id);
+ for (auto& resp : *replyQueueTemp.GetVector()) {
+ TBusMessage* req = PopAck(resp.Header.Id);
if (!req) {
- WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN);
+ WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN);
continue;
}
if (executeInWorkerPool) {
- workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp));
+ workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp));
} else {
- GetSession()->ReleaseInFlightAndCallOnReply(req, resp);
+ GetSession()->ReleaseInFlightAndCallOnReply(req, resp);
}
}
@@ -252,11 +252,11 @@ void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const
LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), "");
ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1;
// TODO: close connection
- Y_FAIL("unknown message");
+ Y_FAIL("unknown message");
}
void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {
- Y_ASSERT(result.empty());
+ Y_ASSERT(result.empty());
TRemoteConnection::ClearOutgoingQueue(result, reconnect);
AckMessages.Clear(&result);
@@ -266,26 +266,26 @@ void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool rec
}
void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
- for (auto& message : messages) {
- bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL;
+ for (auto& message : messages) {
+ bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL;
if (oneWay) {
- message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
+ message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
- TBusMessage* ackMsg = this->PopAck(message.Header.Id);
+ TBusMessage* ackMsg = this->PopAck(message.Header.Id);
if (!ackMsg) {
// TODO: expired?
}
- if (ackMsg != message.MessagePtr.Get()) {
+ if (ackMsg != message.MessagePtr.Get()) {
// TODO: non-unique id?
}
GetSession()->ReleaseInFlight({message.MessagePtr.Get()});
- ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release());
+ ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release());
} else {
- ClientHandler->OnMessageSent(message.MessagePtr.Get());
- AckMessages.Push(message);
+ ClientHandler->OnMessageSent(message.MessagePtr.Get());
+ AckMessages.Push(message);
}
}
}
@@ -306,7 +306,7 @@ EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool w
}
if (wait) {
- Y_VERIFY(!Session->Queue->GetExecutor()->IsInExecutorThread());
+ Y_VERIFY(!Session->Queue->GetExecutor()->IsInExecutorThread());
GetSession()->ClientRemoteInFlight.Wait();
} else {
if (!GetSession()->ClientRemoteInFlight.TryWait()) {