diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/remote_client_connection.cpp | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_client_connection.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_client_connection.cpp | 556 |
1 files changed, 278 insertions, 278 deletions
diff --git a/library/cpp/messagebus/remote_client_connection.cpp b/library/cpp/messagebus/remote_client_connection.cpp index 8c7a6db3a8..b7b05e7bed 100644 --- a/library/cpp/messagebus/remote_client_connection.cpp +++ b/library/cpp/messagebus/remote_client_connection.cpp @@ -1,143 +1,143 @@ -#include "remote_client_connection.h" - +#include "remote_client_connection.h" + #include "mb_lwtrace.h" #include "network.h" -#include "remote_client_session.h" - +#include "remote_client_session.h" + #include <library/cpp/messagebus/actor/executor.h> #include <library/cpp/messagebus/actor/temp_tls_vector.h> - + #include <util/generic/cast.h> #include <util/thread/singleton.h> - -LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) - -using namespace NActor; -using namespace NBus; -using namespace NBus::NPrivate; - -TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session, ui64 id, TNetAddr addr) - : TRemoteConnection(session.Get(), id, addr) - , ClientHandler(GetSession()->ClientHandler) -{ + +LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) + +using namespace NActor; +using namespace NBus; +using namespace NBus::NPrivate; + +TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session, ui64 id, TNetAddr addr) + : TRemoteConnection(session.Get(), id, addr) + , ClientHandler(GetSession()->ClientHandler) +{ Y_VERIFY(addr.GetPort() > 0, "must connect to non-zero port"); - - ScheduleWrite(); -} - -TRemoteClientSession* TRemoteClientConnection::GetSession() { - return CheckedCast<TRemoteClientSession*>(Session.Get()); -} - -TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) { - return AckMessages.Pop(id); -} - + + ScheduleWrite(); +} + +TRemoteClientSession* TRemoteClientConnection::GetSession() { + return CheckedCast<TRemoteClientSession*>(Session.Get()); +} + +TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) { + return AckMessages.Pop(id); +} + SOCKET TRemoteClientConnection::CreateSocket(const TNetAddr& addr) { - SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0); + SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0); Y_VERIFY(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText()); - - TSocketHolder s(handle); - - SetNonBlock(s, true); - SetNoDelay(s, Config.TcpNoDelay); - SetSockOptTcpCork(s, Config.TcpCork); - SetCloseOnExec(s, true); - SetKeepAlive(s, true); - if (Config.SocketRecvBufferSize != 0) { - SetInputBuffer(s, Config.SocketRecvBufferSize); - } - if (Config.SocketSendBufferSize != 0) { - SetOutputBuffer(s, Config.SocketSendBufferSize); - } - if (Config.SocketToS >= 0) { - SetSocketToS(s, &addr, Config.SocketToS); - } - - return s.Release(); -} - -void TRemoteClientConnection::TryConnect() { - if (AtomicGet(WriterData.Down)) { - return; - } + + TSocketHolder s(handle); + + SetNonBlock(s, true); + SetNoDelay(s, Config.TcpNoDelay); + SetSockOptTcpCork(s, Config.TcpCork); + SetCloseOnExec(s, true); + SetKeepAlive(s, true); + if (Config.SocketRecvBufferSize != 0) { + SetInputBuffer(s, Config.SocketRecvBufferSize); + } + if (Config.SocketSendBufferSize != 0) { + SetOutputBuffer(s, Config.SocketSendBufferSize); + } + if (Config.SocketToS >= 0) { + SetSocketToS(s, &addr, Config.SocketToS); + } + + return s.Release(); +} + +void TRemoteClientConnection::TryConnect() { + if (AtomicGet(WriterData.Down)) { + return; + } Y_VERIFY(!WriterData.Status.Connected); - - TInstant now = TInstant::Now(); - - if (!WriterData.Channel) { - if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) { + + TInstant now = TInstant::Now(); + + if (!WriterData.Channel) { + if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) { DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); - return; - } - LastConnectAttempt = now; - - TSocket connectSocket(CreateSocket(PeerAddr)); - WriterData.SetChannel(Session->WriteEventLoop.Register(connectSocket, this, WriteCookie)); - } - + return; + } + LastConnectAttempt = now; + + TSocket connectSocket(CreateSocket(PeerAddr)); + WriterData.SetChannel(Session->WriteEventLoop.Register(connectSocket, this, WriteCookie)); + } + if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) { // TryConnect is called from Writer::Act, which is called in cycle // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects. return; } - ++WriterData.Status.ConnectSyscalls; - - int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len()); - int err = ret ? LastSystemError() : 0; - - if (!ret || (ret && err == EISCONN)) { - WriterData.Status.ConnectTime = now; - ++WriterData.SocketVersion; - - WriterData.Channel->DisableWrite(); - WriterData.Status.Connected = true; - AtomicSet(ReturnConnectFailedImmediately, false); - - WriterData.Status.MyAddr = TNetAddr(GetSockAddr(WriterData.Channel->GetSocket())); - - TSocket readSocket = WriterData.Channel->GetSocketPtr(); - - ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion)); - - FireClientConnectionEvent(TClientConnectionEvent::CONNECTED); - - ScheduleWrite(); - } else { - if (WouldBlock() || err == EALREADY) { - WriterData.Channel->EnableWrite(); - } else { - WriterData.DropChannel(); - WriterData.Status.MyAddr = TNetAddr(); - WriterData.Status.Connected = false; - WriterData.Status.ConnectError = err; - + ++WriterData.Status.ConnectSyscalls; + + int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len()); + int err = ret ? LastSystemError() : 0; + + if (!ret || (ret && err == EISCONN)) { + WriterData.Status.ConnectTime = now; + ++WriterData.SocketVersion; + + WriterData.Channel->DisableWrite(); + WriterData.Status.Connected = true; + AtomicSet(ReturnConnectFailedImmediately, false); + + WriterData.Status.MyAddr = TNetAddr(GetSockAddr(WriterData.Channel->GetSocket())); + + TSocket readSocket = WriterData.Channel->GetSocketPtr(); + + ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion)); + + FireClientConnectionEvent(TClientConnectionEvent::CONNECTED); + + ScheduleWrite(); + } else { + if (WouldBlock() || err == EALREADY) { + WriterData.Channel->EnableWrite(); + } else { + WriterData.DropChannel(); + WriterData.Status.MyAddr = TNetAddr(); + WriterData.Status.Connected = false; + WriterData.Status.ConnectError = err; + DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); - } - } -} - -void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) { + } + } +} + +void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) { Y_UNUSED(socket); Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie); - if (cookie == ReadCookie) { - ScheduleRead(); - } else { - ScheduleWrite(); - } -} - -void TRemoteClientConnection::WriterFillStatus() { - TRemoteConnection::WriterFillStatus(); - WriterData.Status.AckMessagesSize = AckMessages.Size(); -} - -void TRemoteClientConnection::BeforeTryWrite() { - ProcessReplyQueue(); - TimeoutMessages(); -} - + if (cookie == ReadCookie) { + ScheduleRead(); + } else { + ScheduleWrite(); + } +} + +void TRemoteClientConnection::WriterFillStatus() { + TRemoteConnection::WriterFillStatus(); + WriterData.Status.AckMessagesSize = AckMessages.Size(); +} + +void TRemoteClientConnection::BeforeTryWrite() { + ProcessReplyQueue(); + TimeoutMessages(); +} + namespace NBus { namespace NPrivate { class TInvokeOnReply: public IWorkItem { @@ -145,7 +145,7 @@ namespace NBus { TRemoteClientSession* RemoteClientSession; TNonDestroyingHolder<TBusMessage> Request; TBusMessagePtrAndHeader Response; - + public: TInvokeOnReply(TRemoteClientSession* session, TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response) @@ -154,7 +154,7 @@ namespace NBus { { Response.Swap(response); } - + void DoWork() override { THolder<TInvokeOnReply> holder(this); RemoteClientSession->ReleaseInFlightAndCallOnReply(Request.Release(), Response); @@ -162,182 +162,182 @@ namespace NBus { RemoteClientSession->JobCount.Decrement(); } }; - + } } - -void TRemoteClientConnection::ProcessReplyQueue() { - if (AtomicGet(WriterData.Down)) { - return; - } - - bool executeInWorkerPool = Session->Config.ExecuteOnReplyInWorkerPool; - - TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> replyQueueTemp; - TTempTlsVector< ::NActor::IWorkItem*> workQueueTemp; - - ReplyQueue.DequeueAllSingleConsumer(replyQueueTemp.GetVector()); - if (executeInWorkerPool) { - workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size()); - } - + +void TRemoteClientConnection::ProcessReplyQueue() { + if (AtomicGet(WriterData.Down)) { + return; + } + + bool executeInWorkerPool = Session->Config.ExecuteOnReplyInWorkerPool; + + TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> replyQueueTemp; + TTempTlsVector< ::NActor::IWorkItem*> workQueueTemp; + + ReplyQueue.DequeueAllSingleConsumer(replyQueueTemp.GetVector()); + if (executeInWorkerPool) { + workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size()); + } + for (auto& resp : *replyQueueTemp.GetVector()) { TBusMessage* req = PopAck(resp.Header.Id); - - if (!req) { + + if (!req) { WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN); - continue; - } - - if (executeInWorkerPool) { + continue; + } + + if (executeInWorkerPool) { workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp)); - } else { + } else { GetSession()->ReleaseInFlightAndCallOnReply(req, resp); - } - } - - if (executeInWorkerPool) { - Session->JobCount.Add(workQueueTemp.GetVector()->size()); - Session->Queue->EnqueueWork(*workQueueTemp.GetVector()); - } -} - -void TRemoteClientConnection::TimeoutMessages() { - if (!TimeToTimeoutMessages.FetchTask()) { - return; - } - - TMessagesPtrs timedOutMessages; - - TInstant sendDeadline; - TInstant ackDeadline; - if (IsReturnConnectFailedImmediately()) { - sendDeadline = TInstant::Max(); - ackDeadline = TInstant::Max(); - } else { - TInstant now = TInstant::Now(); - sendDeadline = now - TDuration::MilliSeconds(Session->Config.SendTimeout); - ackDeadline = now - TDuration::MilliSeconds(Session->Config.TotalTimeout); - } - - { - TMessagesPtrs temp; - WriterData.SendQueue.Timeout(sendDeadline, &temp); - timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end()); - } - - // Ignores message that is being written currently (that is stored - // in WriteMessage). It is not a big problem, because after written - // to the network, message will be placed to the AckMessages queue, - // and timed out on the next iteration of this procedure. - - { - TMessagesPtrs temp; - AckMessages.Timeout(ackDeadline, &temp); - timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end()); - } - - ResetOneWayFlag(timedOutMessages); - - GetSession()->ReleaseInFlight(timedOutMessages); - WriterErrorMessages(timedOutMessages, MESSAGE_TIMEOUT); -} - -void TRemoteClientConnection::ScheduleTimeoutMessages() { - TimeToTimeoutMessages.AddTask(); - ScheduleWrite(); -} - + } + } + + if (executeInWorkerPool) { + Session->JobCount.Add(workQueueTemp.GetVector()->size()); + Session->Queue->EnqueueWork(*workQueueTemp.GetVector()); + } +} + +void TRemoteClientConnection::TimeoutMessages() { + if (!TimeToTimeoutMessages.FetchTask()) { + return; + } + + TMessagesPtrs timedOutMessages; + + TInstant sendDeadline; + TInstant ackDeadline; + if (IsReturnConnectFailedImmediately()) { + sendDeadline = TInstant::Max(); + ackDeadline = TInstant::Max(); + } else { + TInstant now = TInstant::Now(); + sendDeadline = now - TDuration::MilliSeconds(Session->Config.SendTimeout); + ackDeadline = now - TDuration::MilliSeconds(Session->Config.TotalTimeout); + } + + { + TMessagesPtrs temp; + WriterData.SendQueue.Timeout(sendDeadline, &temp); + timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end()); + } + + // Ignores message that is being written currently (that is stored + // in WriteMessage). It is not a big problem, because after written + // to the network, message will be placed to the AckMessages queue, + // and timed out on the next iteration of this procedure. + + { + TMessagesPtrs temp; + AckMessages.Timeout(ackDeadline, &temp); + timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end()); + } + + ResetOneWayFlag(timedOutMessages); + + GetSession()->ReleaseInFlight(timedOutMessages); + WriterErrorMessages(timedOutMessages, MESSAGE_TIMEOUT); +} + +void TRemoteClientConnection::ScheduleTimeoutMessages() { + TimeToTimeoutMessages.AddTask(); + ScheduleWrite(); +} + void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char>) { - LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), ""); - ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1; - // TODO: close connection + LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), ""); + ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1; + // TODO: close connection Y_FAIL("unknown message"); -} - -void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) { +} + +void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) { Y_ASSERT(result.empty()); - - TRemoteConnection::ClearOutgoingQueue(result, reconnect); - AckMessages.Clear(&result); - - ResetOneWayFlag(result); - GetSession()->ReleaseInFlight(result); -} - + + TRemoteConnection::ClearOutgoingQueue(result, reconnect); + AckMessages.Clear(&result); + + ResetOneWayFlag(result); + GetSession()->ReleaseInFlight(result); +} + void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) { for (auto& message : messages) { bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL; - - if (oneWay) { + + if (oneWay) { message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; - + TBusMessage* ackMsg = this->PopAck(message.Header.Id); - if (!ackMsg) { - // TODO: expired? - } - + if (!ackMsg) { + // TODO: expired? + } + if (ackMsg != message.MessagePtr.Get()) { - // TODO: non-unique id? - } - + // TODO: non-unique id? + } + GetSession()->ReleaseInFlight({message.MessagePtr.Get()}); ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release()); - } else { + } else { ClientHandler->OnMessageSent(message.MessagePtr.Get()); AckMessages.Push(message); - } - } -} - -EMessageStatus TRemoteClientConnection::SendMessage(TBusMessage* req, bool wait) { - return SendMessageImpl(req, wait, false); -} - -EMessageStatus TRemoteClientConnection::SendMessageOneWay(TBusMessage* req, bool wait) { - return SendMessageImpl(req, wait, true); -} - -EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool wait, bool oneWay) { - msg->CheckClean(); - - if (Session->IsDown()) { - return MESSAGE_SHUTDOWN; - } - - if (wait) { + } + } +} + +EMessageStatus TRemoteClientConnection::SendMessage(TBusMessage* req, bool wait) { + return SendMessageImpl(req, wait, false); +} + +EMessageStatus TRemoteClientConnection::SendMessageOneWay(TBusMessage* req, bool wait) { + return SendMessageImpl(req, wait, true); +} + +EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool wait, bool oneWay) { + msg->CheckClean(); + + if (Session->IsDown()) { + return MESSAGE_SHUTDOWN; + } + + if (wait) { Y_VERIFY(!Session->Queue->GetExecutor()->IsInExecutorThread()); - GetSession()->ClientRemoteInFlight.Wait(); - } else { - if (!GetSession()->ClientRemoteInFlight.TryWait()) { - return MESSAGE_BUSY; - } - } - + GetSession()->ClientRemoteInFlight.Wait(); + } else { + if (!GetSession()->ClientRemoteInFlight.TryWait()) { + return MESSAGE_BUSY; + } + } + GetSession()->AcquireInFlight({msg}); - - EMessageStatus ret = MESSAGE_OK; - - if (oneWay) { - msg->LocalFlags |= MESSAGE_ONE_WAY_INTERNAL; - } - - msg->GetHeader()->SendTime = Now(); - - if (IsReturnConnectFailedImmediately()) { - ret = MESSAGE_CONNECT_FAILED; - goto clean; - } - - Send(msg); - - return MESSAGE_OK; -clean: - msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; + + EMessageStatus ret = MESSAGE_OK; + + if (oneWay) { + msg->LocalFlags |= MESSAGE_ONE_WAY_INTERNAL; + } + + msg->GetHeader()->SendTime = Now(); + + if (IsReturnConnectFailedImmediately()) { + ret = MESSAGE_CONNECT_FAILED; + goto clean; + } + + Send(msg); + + return MESSAGE_OK; +clean: + msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; GetSession()->ReleaseInFlight({msg}); - return ret; -} - -void TRemoteClientConnection::OpenConnection() { - // TODO -} + return ret; +} + +void TRemoteClientConnection::OpenConnection() { + // TODO +} |