aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_client_connection.cpp
diff options
context:
space:
mode:
authornga <nga@yandex-team.ru>2022-02-10 16:48:09 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:09 +0300
commit1f553f46fb4f3c5eec631352cdd900a0709016af (patch)
treea231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/remote_client_connection.cpp
parentc4de7efdedc25b49cbea74bd589eecb61b55b60a (diff)
downloadydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_client_connection.cpp')
-rw-r--r--library/cpp/messagebus/remote_client_connection.cpp556
1 files changed, 278 insertions, 278 deletions
diff --git a/library/cpp/messagebus/remote_client_connection.cpp b/library/cpp/messagebus/remote_client_connection.cpp
index 8c7a6db3a8..b7b05e7bed 100644
--- a/library/cpp/messagebus/remote_client_connection.cpp
+++ b/library/cpp/messagebus/remote_client_connection.cpp
@@ -1,143 +1,143 @@
-#include "remote_client_connection.h"
-
+#include "remote_client_connection.h"
+
#include "mb_lwtrace.h"
#include "network.h"
-#include "remote_client_session.h"
-
+#include "remote_client_session.h"
+
#include <library/cpp/messagebus/actor/executor.h>
#include <library/cpp/messagebus/actor/temp_tls_vector.h>
-
+
#include <util/generic/cast.h>
#include <util/thread/singleton.h>
-
-LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
-
-using namespace NActor;
-using namespace NBus;
-using namespace NBus::NPrivate;
-
-TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session, ui64 id, TNetAddr addr)
- : TRemoteConnection(session.Get(), id, addr)
- , ClientHandler(GetSession()->ClientHandler)
-{
+
+LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
+
+using namespace NActor;
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session, ui64 id, TNetAddr addr)
+ : TRemoteConnection(session.Get(), id, addr)
+ , ClientHandler(GetSession()->ClientHandler)
+{
Y_VERIFY(addr.GetPort() > 0, "must connect to non-zero port");
-
- ScheduleWrite();
-}
-
-TRemoteClientSession* TRemoteClientConnection::GetSession() {
- return CheckedCast<TRemoteClientSession*>(Session.Get());
-}
-
-TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) {
- return AckMessages.Pop(id);
-}
-
+
+ ScheduleWrite();
+}
+
+TRemoteClientSession* TRemoteClientConnection::GetSession() {
+ return CheckedCast<TRemoteClientSession*>(Session.Get());
+}
+
+TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) {
+ return AckMessages.Pop(id);
+}
+
SOCKET TRemoteClientConnection::CreateSocket(const TNetAddr& addr) {
- SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0);
+ SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0);
Y_VERIFY(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText());
-
- TSocketHolder s(handle);
-
- SetNonBlock(s, true);
- SetNoDelay(s, Config.TcpNoDelay);
- SetSockOptTcpCork(s, Config.TcpCork);
- SetCloseOnExec(s, true);
- SetKeepAlive(s, true);
- if (Config.SocketRecvBufferSize != 0) {
- SetInputBuffer(s, Config.SocketRecvBufferSize);
- }
- if (Config.SocketSendBufferSize != 0) {
- SetOutputBuffer(s, Config.SocketSendBufferSize);
- }
- if (Config.SocketToS >= 0) {
- SetSocketToS(s, &addr, Config.SocketToS);
- }
-
- return s.Release();
-}
-
-void TRemoteClientConnection::TryConnect() {
- if (AtomicGet(WriterData.Down)) {
- return;
- }
+
+ TSocketHolder s(handle);
+
+ SetNonBlock(s, true);
+ SetNoDelay(s, Config.TcpNoDelay);
+ SetSockOptTcpCork(s, Config.TcpCork);
+ SetCloseOnExec(s, true);
+ SetKeepAlive(s, true);
+ if (Config.SocketRecvBufferSize != 0) {
+ SetInputBuffer(s, Config.SocketRecvBufferSize);
+ }
+ if (Config.SocketSendBufferSize != 0) {
+ SetOutputBuffer(s, Config.SocketSendBufferSize);
+ }
+ if (Config.SocketToS >= 0) {
+ SetSocketToS(s, &addr, Config.SocketToS);
+ }
+
+ return s.Release();
+}
+
+void TRemoteClientConnection::TryConnect() {
+ if (AtomicGet(WriterData.Down)) {
+ return;
+ }
Y_VERIFY(!WriterData.Status.Connected);
-
- TInstant now = TInstant::Now();
-
- if (!WriterData.Channel) {
- if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) {
+
+ TInstant now = TInstant::Now();
+
+ if (!WriterData.Channel) {
+ if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) {
DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
- return;
- }
- LastConnectAttempt = now;
-
- TSocket connectSocket(CreateSocket(PeerAddr));
- WriterData.SetChannel(Session->WriteEventLoop.Register(connectSocket, this, WriteCookie));
- }
-
+ return;
+ }
+ LastConnectAttempt = now;
+
+ TSocket connectSocket(CreateSocket(PeerAddr));
+ WriterData.SetChannel(Session->WriteEventLoop.Register(connectSocket, this, WriteCookie));
+ }
+
if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) {
// TryConnect is called from Writer::Act, which is called in cycle
// from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects.
return;
}
- ++WriterData.Status.ConnectSyscalls;
-
- int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len());
- int err = ret ? LastSystemError() : 0;
-
- if (!ret || (ret && err == EISCONN)) {
- WriterData.Status.ConnectTime = now;
- ++WriterData.SocketVersion;
-
- WriterData.Channel->DisableWrite();
- WriterData.Status.Connected = true;
- AtomicSet(ReturnConnectFailedImmediately, false);
-
- WriterData.Status.MyAddr = TNetAddr(GetSockAddr(WriterData.Channel->GetSocket()));
-
- TSocket readSocket = WriterData.Channel->GetSocketPtr();
-
- ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
-
- FireClientConnectionEvent(TClientConnectionEvent::CONNECTED);
-
- ScheduleWrite();
- } else {
- if (WouldBlock() || err == EALREADY) {
- WriterData.Channel->EnableWrite();
- } else {
- WriterData.DropChannel();
- WriterData.Status.MyAddr = TNetAddr();
- WriterData.Status.Connected = false;
- WriterData.Status.ConnectError = err;
-
+ ++WriterData.Status.ConnectSyscalls;
+
+ int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len());
+ int err = ret ? LastSystemError() : 0;
+
+ if (!ret || (ret && err == EISCONN)) {
+ WriterData.Status.ConnectTime = now;
+ ++WriterData.SocketVersion;
+
+ WriterData.Channel->DisableWrite();
+ WriterData.Status.Connected = true;
+ AtomicSet(ReturnConnectFailedImmediately, false);
+
+ WriterData.Status.MyAddr = TNetAddr(GetSockAddr(WriterData.Channel->GetSocket()));
+
+ TSocket readSocket = WriterData.Channel->GetSocketPtr();
+
+ ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
+
+ FireClientConnectionEvent(TClientConnectionEvent::CONNECTED);
+
+ ScheduleWrite();
+ } else {
+ if (WouldBlock() || err == EALREADY) {
+ WriterData.Channel->EnableWrite();
+ } else {
+ WriterData.DropChannel();
+ WriterData.Status.MyAddr = TNetAddr();
+ WriterData.Status.Connected = false;
+ WriterData.Status.ConnectError = err;
+
DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
- }
- }
-}
-
-void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) {
+ }
+ }
+}
+
+void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) {
Y_UNUSED(socket);
Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie);
- if (cookie == ReadCookie) {
- ScheduleRead();
- } else {
- ScheduleWrite();
- }
-}
-
-void TRemoteClientConnection::WriterFillStatus() {
- TRemoteConnection::WriterFillStatus();
- WriterData.Status.AckMessagesSize = AckMessages.Size();
-}
-
-void TRemoteClientConnection::BeforeTryWrite() {
- ProcessReplyQueue();
- TimeoutMessages();
-}
-
+ if (cookie == ReadCookie) {
+ ScheduleRead();
+ } else {
+ ScheduleWrite();
+ }
+}
+
+void TRemoteClientConnection::WriterFillStatus() {
+ TRemoteConnection::WriterFillStatus();
+ WriterData.Status.AckMessagesSize = AckMessages.Size();
+}
+
+void TRemoteClientConnection::BeforeTryWrite() {
+ ProcessReplyQueue();
+ TimeoutMessages();
+}
+
namespace NBus {
namespace NPrivate {
class TInvokeOnReply: public IWorkItem {
@@ -145,7 +145,7 @@ namespace NBus {
TRemoteClientSession* RemoteClientSession;
TNonDestroyingHolder<TBusMessage> Request;
TBusMessagePtrAndHeader Response;
-
+
public:
TInvokeOnReply(TRemoteClientSession* session,
TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response)
@@ -154,7 +154,7 @@ namespace NBus {
{
Response.Swap(response);
}
-
+
void DoWork() override {
THolder<TInvokeOnReply> holder(this);
RemoteClientSession->ReleaseInFlightAndCallOnReply(Request.Release(), Response);
@@ -162,182 +162,182 @@ namespace NBus {
RemoteClientSession->JobCount.Decrement();
}
};
-
+
}
}
-
-void TRemoteClientConnection::ProcessReplyQueue() {
- if (AtomicGet(WriterData.Down)) {
- return;
- }
-
- bool executeInWorkerPool = Session->Config.ExecuteOnReplyInWorkerPool;
-
- TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> replyQueueTemp;
- TTempTlsVector< ::NActor::IWorkItem*> workQueueTemp;
-
- ReplyQueue.DequeueAllSingleConsumer(replyQueueTemp.GetVector());
- if (executeInWorkerPool) {
- workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size());
- }
-
+
+void TRemoteClientConnection::ProcessReplyQueue() {
+ if (AtomicGet(WriterData.Down)) {
+ return;
+ }
+
+ bool executeInWorkerPool = Session->Config.ExecuteOnReplyInWorkerPool;
+
+ TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> replyQueueTemp;
+ TTempTlsVector< ::NActor::IWorkItem*> workQueueTemp;
+
+ ReplyQueue.DequeueAllSingleConsumer(replyQueueTemp.GetVector());
+ if (executeInWorkerPool) {
+ workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size());
+ }
+
for (auto& resp : *replyQueueTemp.GetVector()) {
TBusMessage* req = PopAck(resp.Header.Id);
-
- if (!req) {
+
+ if (!req) {
WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN);
- continue;
- }
-
- if (executeInWorkerPool) {
+ continue;
+ }
+
+ if (executeInWorkerPool) {
workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp));
- } else {
+ } else {
GetSession()->ReleaseInFlightAndCallOnReply(req, resp);
- }
- }
-
- if (executeInWorkerPool) {
- Session->JobCount.Add(workQueueTemp.GetVector()->size());
- Session->Queue->EnqueueWork(*workQueueTemp.GetVector());
- }
-}
-
-void TRemoteClientConnection::TimeoutMessages() {
- if (!TimeToTimeoutMessages.FetchTask()) {
- return;
- }
-
- TMessagesPtrs timedOutMessages;
-
- TInstant sendDeadline;
- TInstant ackDeadline;
- if (IsReturnConnectFailedImmediately()) {
- sendDeadline = TInstant::Max();
- ackDeadline = TInstant::Max();
- } else {
- TInstant now = TInstant::Now();
- sendDeadline = now - TDuration::MilliSeconds(Session->Config.SendTimeout);
- ackDeadline = now - TDuration::MilliSeconds(Session->Config.TotalTimeout);
- }
-
- {
- TMessagesPtrs temp;
- WriterData.SendQueue.Timeout(sendDeadline, &temp);
- timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());
- }
-
- // Ignores message that is being written currently (that is stored
- // in WriteMessage). It is not a big problem, because after written
- // to the network, message will be placed to the AckMessages queue,
- // and timed out on the next iteration of this procedure.
-
- {
- TMessagesPtrs temp;
- AckMessages.Timeout(ackDeadline, &temp);
- timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());
- }
-
- ResetOneWayFlag(timedOutMessages);
-
- GetSession()->ReleaseInFlight(timedOutMessages);
- WriterErrorMessages(timedOutMessages, MESSAGE_TIMEOUT);
-}
-
-void TRemoteClientConnection::ScheduleTimeoutMessages() {
- TimeToTimeoutMessages.AddTask();
- ScheduleWrite();
-}
-
+ }
+ }
+
+ if (executeInWorkerPool) {
+ Session->JobCount.Add(workQueueTemp.GetVector()->size());
+ Session->Queue->EnqueueWork(*workQueueTemp.GetVector());
+ }
+}
+
+void TRemoteClientConnection::TimeoutMessages() {
+ if (!TimeToTimeoutMessages.FetchTask()) {
+ return;
+ }
+
+ TMessagesPtrs timedOutMessages;
+
+ TInstant sendDeadline;
+ TInstant ackDeadline;
+ if (IsReturnConnectFailedImmediately()) {
+ sendDeadline = TInstant::Max();
+ ackDeadline = TInstant::Max();
+ } else {
+ TInstant now = TInstant::Now();
+ sendDeadline = now - TDuration::MilliSeconds(Session->Config.SendTimeout);
+ ackDeadline = now - TDuration::MilliSeconds(Session->Config.TotalTimeout);
+ }
+
+ {
+ TMessagesPtrs temp;
+ WriterData.SendQueue.Timeout(sendDeadline, &temp);
+ timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());
+ }
+
+ // Ignores message that is being written currently (that is stored
+ // in WriteMessage). It is not a big problem, because after written
+ // to the network, message will be placed to the AckMessages queue,
+ // and timed out on the next iteration of this procedure.
+
+ {
+ TMessagesPtrs temp;
+ AckMessages.Timeout(ackDeadline, &temp);
+ timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());
+ }
+
+ ResetOneWayFlag(timedOutMessages);
+
+ GetSession()->ReleaseInFlight(timedOutMessages);
+ WriterErrorMessages(timedOutMessages, MESSAGE_TIMEOUT);
+}
+
+void TRemoteClientConnection::ScheduleTimeoutMessages() {
+ TimeToTimeoutMessages.AddTask();
+ ScheduleWrite();
+}
+
void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char>) {
- LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), "");
- ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1;
- // TODO: close connection
+ LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), "");
+ ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1;
+ // TODO: close connection
Y_FAIL("unknown message");
-}
-
-void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {
+}
+
+void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {
Y_ASSERT(result.empty());
-
- TRemoteConnection::ClearOutgoingQueue(result, reconnect);
- AckMessages.Clear(&result);
-
- ResetOneWayFlag(result);
- GetSession()->ReleaseInFlight(result);
-}
-
+
+ TRemoteConnection::ClearOutgoingQueue(result, reconnect);
+ AckMessages.Clear(&result);
+
+ ResetOneWayFlag(result);
+ GetSession()->ReleaseInFlight(result);
+}
+
void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
for (auto& message : messages) {
bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL;
-
- if (oneWay) {
+
+ if (oneWay) {
message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
-
+
TBusMessage* ackMsg = this->PopAck(message.Header.Id);
- if (!ackMsg) {
- // TODO: expired?
- }
-
+ if (!ackMsg) {
+ // TODO: expired?
+ }
+
if (ackMsg != message.MessagePtr.Get()) {
- // TODO: non-unique id?
- }
-
+ // TODO: non-unique id?
+ }
+
GetSession()->ReleaseInFlight({message.MessagePtr.Get()});
ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release());
- } else {
+ } else {
ClientHandler->OnMessageSent(message.MessagePtr.Get());
AckMessages.Push(message);
- }
- }
-}
-
-EMessageStatus TRemoteClientConnection::SendMessage(TBusMessage* req, bool wait) {
- return SendMessageImpl(req, wait, false);
-}
-
-EMessageStatus TRemoteClientConnection::SendMessageOneWay(TBusMessage* req, bool wait) {
- return SendMessageImpl(req, wait, true);
-}
-
-EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool wait, bool oneWay) {
- msg->CheckClean();
-
- if (Session->IsDown()) {
- return MESSAGE_SHUTDOWN;
- }
-
- if (wait) {
+ }
+ }
+}
+
+EMessageStatus TRemoteClientConnection::SendMessage(TBusMessage* req, bool wait) {
+ return SendMessageImpl(req, wait, false);
+}
+
+EMessageStatus TRemoteClientConnection::SendMessageOneWay(TBusMessage* req, bool wait) {
+ return SendMessageImpl(req, wait, true);
+}
+
+EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool wait, bool oneWay) {
+ msg->CheckClean();
+
+ if (Session->IsDown()) {
+ return MESSAGE_SHUTDOWN;
+ }
+
+ if (wait) {
Y_VERIFY(!Session->Queue->GetExecutor()->IsInExecutorThread());
- GetSession()->ClientRemoteInFlight.Wait();
- } else {
- if (!GetSession()->ClientRemoteInFlight.TryWait()) {
- return MESSAGE_BUSY;
- }
- }
-
+ GetSession()->ClientRemoteInFlight.Wait();
+ } else {
+ if (!GetSession()->ClientRemoteInFlight.TryWait()) {
+ return MESSAGE_BUSY;
+ }
+ }
+
GetSession()->AcquireInFlight({msg});
-
- EMessageStatus ret = MESSAGE_OK;
-
- if (oneWay) {
- msg->LocalFlags |= MESSAGE_ONE_WAY_INTERNAL;
- }
-
- msg->GetHeader()->SendTime = Now();
-
- if (IsReturnConnectFailedImmediately()) {
- ret = MESSAGE_CONNECT_FAILED;
- goto clean;
- }
-
- Send(msg);
-
- return MESSAGE_OK;
-clean:
- msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
+
+ EMessageStatus ret = MESSAGE_OK;
+
+ if (oneWay) {
+ msg->LocalFlags |= MESSAGE_ONE_WAY_INTERNAL;
+ }
+
+ msg->GetHeader()->SendTime = Now();
+
+ if (IsReturnConnectFailedImmediately()) {
+ ret = MESSAGE_CONNECT_FAILED;
+ goto clean;
+ }
+
+ Send(msg);
+
+ return MESSAGE_OK;
+clean:
+ msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
GetSession()->ReleaseInFlight({msg});
- return ret;
-}
-
-void TRemoteClientConnection::OpenConnection() {
- // TODO
-}
+ return ret;
+}
+
+void TRemoteClientConnection::OpenConnection() {
+ // TODO
+}