aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/remote_client_connection.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_client_connection.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_client_connection.cpp')
-rw-r--r--library/cpp/messagebus/remote_client_connection.cpp343
1 files changed, 343 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_client_connection.cpp b/library/cpp/messagebus/remote_client_connection.cpp
new file mode 100644
index 0000000000..8c7a6db3a8
--- /dev/null
+++ b/library/cpp/messagebus/remote_client_connection.cpp
@@ -0,0 +1,343 @@
+#include "remote_client_connection.h"
+
+#include "mb_lwtrace.h"
+#include "network.h"
+#include "remote_client_session.h"
+
+#include <library/cpp/messagebus/actor/executor.h>
+#include <library/cpp/messagebus/actor/temp_tls_vector.h>
+
+#include <util/generic/cast.h>
+#include <util/thread/singleton.h>
+
+LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)
+
+using namespace NActor;
+using namespace NBus;
+using namespace NBus::NPrivate;
+
+TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session, ui64 id, TNetAddr addr)
+ : TRemoteConnection(session.Get(), id, addr)
+ , ClientHandler(GetSession()->ClientHandler)
+{
+ Y_VERIFY(addr.GetPort() > 0, "must connect to non-zero port");
+
+ ScheduleWrite();
+}
+
+TRemoteClientSession* TRemoteClientConnection::GetSession() {
+ return CheckedCast<TRemoteClientSession*>(Session.Get());
+}
+
+TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) {
+ return AckMessages.Pop(id);
+}
+
+SOCKET TRemoteClientConnection::CreateSocket(const TNetAddr& addr) {
+ SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0);
+ Y_VERIFY(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText());
+
+ TSocketHolder s(handle);
+
+ SetNonBlock(s, true);
+ SetNoDelay(s, Config.TcpNoDelay);
+ SetSockOptTcpCork(s, Config.TcpCork);
+ SetCloseOnExec(s, true);
+ SetKeepAlive(s, true);
+ if (Config.SocketRecvBufferSize != 0) {
+ SetInputBuffer(s, Config.SocketRecvBufferSize);
+ }
+ if (Config.SocketSendBufferSize != 0) {
+ SetOutputBuffer(s, Config.SocketSendBufferSize);
+ }
+ if (Config.SocketToS >= 0) {
+ SetSocketToS(s, &addr, Config.SocketToS);
+ }
+
+ return s.Release();
+}
+
+void TRemoteClientConnection::TryConnect() {
+ if (AtomicGet(WriterData.Down)) {
+ return;
+ }
+ Y_VERIFY(!WriterData.Status.Connected);
+
+ TInstant now = TInstant::Now();
+
+ if (!WriterData.Channel) {
+ if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) {
+ DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
+ return;
+ }
+ LastConnectAttempt = now;
+
+ TSocket connectSocket(CreateSocket(PeerAddr));
+ WriterData.SetChannel(Session->WriteEventLoop.Register(connectSocket, this, WriteCookie));
+ }
+
+ if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) {
+ // TryConnect is called from Writer::Act, which is called in cycle
+ // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects.
+ return;
+ }
+
+ ++WriterData.Status.ConnectSyscalls;
+
+ int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len());
+ int err = ret ? LastSystemError() : 0;
+
+ if (!ret || (ret && err == EISCONN)) {
+ WriterData.Status.ConnectTime = now;
+ ++WriterData.SocketVersion;
+
+ WriterData.Channel->DisableWrite();
+ WriterData.Status.Connected = true;
+ AtomicSet(ReturnConnectFailedImmediately, false);
+
+ WriterData.Status.MyAddr = TNetAddr(GetSockAddr(WriterData.Channel->GetSocket()));
+
+ TSocket readSocket = WriterData.Channel->GetSocketPtr();
+
+ ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));
+
+ FireClientConnectionEvent(TClientConnectionEvent::CONNECTED);
+
+ ScheduleWrite();
+ } else {
+ if (WouldBlock() || err == EALREADY) {
+ WriterData.Channel->EnableWrite();
+ } else {
+ WriterData.DropChannel();
+ WriterData.Status.MyAddr = TNetAddr();
+ WriterData.Status.Connected = false;
+ WriterData.Status.ConnectError = err;
+
+ DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED);
+ }
+ }
+}
+
+void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) {
+ Y_UNUSED(socket);
+ Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie);
+ if (cookie == ReadCookie) {
+ ScheduleRead();
+ } else {
+ ScheduleWrite();
+ }
+}
+
+void TRemoteClientConnection::WriterFillStatus() {
+ TRemoteConnection::WriterFillStatus();
+ WriterData.Status.AckMessagesSize = AckMessages.Size();
+}
+
+void TRemoteClientConnection::BeforeTryWrite() {
+ ProcessReplyQueue();
+ TimeoutMessages();
+}
+
+namespace NBus {
+ namespace NPrivate {
+ class TInvokeOnReply: public IWorkItem {
+ private:
+ TRemoteClientSession* RemoteClientSession;
+ TNonDestroyingHolder<TBusMessage> Request;
+ TBusMessagePtrAndHeader Response;
+
+ public:
+ TInvokeOnReply(TRemoteClientSession* session,
+ TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response)
+ : RemoteClientSession(session)
+ , Request(request)
+ {
+ Response.Swap(response);
+ }
+
+ void DoWork() override {
+ THolder<TInvokeOnReply> holder(this);
+ RemoteClientSession->ReleaseInFlightAndCallOnReply(Request.Release(), Response);
+ // TODO: TRemoteClientSessionSemaphore should be enough
+ RemoteClientSession->JobCount.Decrement();
+ }
+ };
+
+ }
+}
+
+void TRemoteClientConnection::ProcessReplyQueue() {
+ if (AtomicGet(WriterData.Down)) {
+ return;
+ }
+
+ bool executeInWorkerPool = Session->Config.ExecuteOnReplyInWorkerPool;
+
+ TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> replyQueueTemp;
+ TTempTlsVector< ::NActor::IWorkItem*> workQueueTemp;
+
+ ReplyQueue.DequeueAllSingleConsumer(replyQueueTemp.GetVector());
+ if (executeInWorkerPool) {
+ workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size());
+ }
+
+ for (auto& resp : *replyQueueTemp.GetVector()) {
+ TBusMessage* req = PopAck(resp.Header.Id);
+
+ if (!req) {
+ WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN);
+ continue;
+ }
+
+ if (executeInWorkerPool) {
+ workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp));
+ } else {
+ GetSession()->ReleaseInFlightAndCallOnReply(req, resp);
+ }
+ }
+
+ if (executeInWorkerPool) {
+ Session->JobCount.Add(workQueueTemp.GetVector()->size());
+ Session->Queue->EnqueueWork(*workQueueTemp.GetVector());
+ }
+}
+
+void TRemoteClientConnection::TimeoutMessages() {
+ if (!TimeToTimeoutMessages.FetchTask()) {
+ return;
+ }
+
+ TMessagesPtrs timedOutMessages;
+
+ TInstant sendDeadline;
+ TInstant ackDeadline;
+ if (IsReturnConnectFailedImmediately()) {
+ sendDeadline = TInstant::Max();
+ ackDeadline = TInstant::Max();
+ } else {
+ TInstant now = TInstant::Now();
+ sendDeadline = now - TDuration::MilliSeconds(Session->Config.SendTimeout);
+ ackDeadline = now - TDuration::MilliSeconds(Session->Config.TotalTimeout);
+ }
+
+ {
+ TMessagesPtrs temp;
+ WriterData.SendQueue.Timeout(sendDeadline, &temp);
+ timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());
+ }
+
+ // Ignores message that is being written currently (that is stored
+ // in WriteMessage). It is not a big problem, because after written
+ // to the network, message will be placed to the AckMessages queue,
+ // and timed out on the next iteration of this procedure.
+
+ {
+ TMessagesPtrs temp;
+ AckMessages.Timeout(ackDeadline, &temp);
+ timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());
+ }
+
+ ResetOneWayFlag(timedOutMessages);
+
+ GetSession()->ReleaseInFlight(timedOutMessages);
+ WriterErrorMessages(timedOutMessages, MESSAGE_TIMEOUT);
+}
+
+void TRemoteClientConnection::ScheduleTimeoutMessages() {
+ TimeToTimeoutMessages.AddTask();
+ ScheduleWrite();
+}
+
+void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char>) {
+ LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), "");
+ ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1;
+ // TODO: close connection
+ Y_FAIL("unknown message");
+}
+
+void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {
+ Y_ASSERT(result.empty());
+
+ TRemoteConnection::ClearOutgoingQueue(result, reconnect);
+ AckMessages.Clear(&result);
+
+ ResetOneWayFlag(result);
+ GetSession()->ReleaseInFlight(result);
+}
+
+void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {
+ for (auto& message : messages) {
+ bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL;
+
+ if (oneWay) {
+ message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
+
+ TBusMessage* ackMsg = this->PopAck(message.Header.Id);
+ if (!ackMsg) {
+ // TODO: expired?
+ }
+
+ if (ackMsg != message.MessagePtr.Get()) {
+ // TODO: non-unique id?
+ }
+
+ GetSession()->ReleaseInFlight({message.MessagePtr.Get()});
+ ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release());
+ } else {
+ ClientHandler->OnMessageSent(message.MessagePtr.Get());
+ AckMessages.Push(message);
+ }
+ }
+}
+
+EMessageStatus TRemoteClientConnection::SendMessage(TBusMessage* req, bool wait) {
+ return SendMessageImpl(req, wait, false);
+}
+
+EMessageStatus TRemoteClientConnection::SendMessageOneWay(TBusMessage* req, bool wait) {
+ return SendMessageImpl(req, wait, true);
+}
+
+EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool wait, bool oneWay) {
+ msg->CheckClean();
+
+ if (Session->IsDown()) {
+ return MESSAGE_SHUTDOWN;
+ }
+
+ if (wait) {
+ Y_VERIFY(!Session->Queue->GetExecutor()->IsInExecutorThread());
+ GetSession()->ClientRemoteInFlight.Wait();
+ } else {
+ if (!GetSession()->ClientRemoteInFlight.TryWait()) {
+ return MESSAGE_BUSY;
+ }
+ }
+
+ GetSession()->AcquireInFlight({msg});
+
+ EMessageStatus ret = MESSAGE_OK;
+
+ if (oneWay) {
+ msg->LocalFlags |= MESSAGE_ONE_WAY_INTERNAL;
+ }
+
+ msg->GetHeader()->SendTime = Now();
+
+ if (IsReturnConnectFailedImmediately()) {
+ ret = MESSAGE_CONNECT_FAILED;
+ goto clean;
+ }
+
+ Send(msg);
+
+ return MESSAGE_OK;
+clean:
+ msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;
+ GetSession()->ReleaseInFlight({msg});
+ return ret;
+}
+
+void TRemoteClientConnection::OpenConnection() {
+ // TODO
+}