diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/remote_client_connection.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/remote_client_connection.cpp')
-rw-r--r-- | library/cpp/messagebus/remote_client_connection.cpp | 343 |
1 files changed, 343 insertions, 0 deletions
diff --git a/library/cpp/messagebus/remote_client_connection.cpp b/library/cpp/messagebus/remote_client_connection.cpp new file mode 100644 index 0000000000..8c7a6db3a8 --- /dev/null +++ b/library/cpp/messagebus/remote_client_connection.cpp @@ -0,0 +1,343 @@ +#include "remote_client_connection.h" + +#include "mb_lwtrace.h" +#include "network.h" +#include "remote_client_session.h" + +#include <library/cpp/messagebus/actor/executor.h> +#include <library/cpp/messagebus/actor/temp_tls_vector.h> + +#include <util/generic/cast.h> +#include <util/thread/singleton.h> + +LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) + +using namespace NActor; +using namespace NBus; +using namespace NBus::NPrivate; + +TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session, ui64 id, TNetAddr addr) + : TRemoteConnection(session.Get(), id, addr) + , ClientHandler(GetSession()->ClientHandler) +{ + Y_VERIFY(addr.GetPort() > 0, "must connect to non-zero port"); + + ScheduleWrite(); +} + +TRemoteClientSession* TRemoteClientConnection::GetSession() { + return CheckedCast<TRemoteClientSession*>(Session.Get()); +} + +TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) { + return AckMessages.Pop(id); +} + +SOCKET TRemoteClientConnection::CreateSocket(const TNetAddr& addr) { + SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0); + Y_VERIFY(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText()); + + TSocketHolder s(handle); + + SetNonBlock(s, true); + SetNoDelay(s, Config.TcpNoDelay); + SetSockOptTcpCork(s, Config.TcpCork); + SetCloseOnExec(s, true); + SetKeepAlive(s, true); + if (Config.SocketRecvBufferSize != 0) { + SetInputBuffer(s, Config.SocketRecvBufferSize); + } + if (Config.SocketSendBufferSize != 0) { + SetOutputBuffer(s, Config.SocketSendBufferSize); + } + if (Config.SocketToS >= 0) { + SetSocketToS(s, &addr, Config.SocketToS); + } + + return s.Release(); +} + +void TRemoteClientConnection::TryConnect() { + if (AtomicGet(WriterData.Down)) { + return; + } + Y_VERIFY(!WriterData.Status.Connected); + + TInstant now = TInstant::Now(); + + if (!WriterData.Channel) { + if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) { + DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); + return; + } + LastConnectAttempt = now; + + TSocket connectSocket(CreateSocket(PeerAddr)); + WriterData.SetChannel(Session->WriteEventLoop.Register(connectSocket, this, WriteCookie)); + } + + if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) { + // TryConnect is called from Writer::Act, which is called in cycle + // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects. + return; + } + + ++WriterData.Status.ConnectSyscalls; + + int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len()); + int err = ret ? LastSystemError() : 0; + + if (!ret || (ret && err == EISCONN)) { + WriterData.Status.ConnectTime = now; + ++WriterData.SocketVersion; + + WriterData.Channel->DisableWrite(); + WriterData.Status.Connected = true; + AtomicSet(ReturnConnectFailedImmediately, false); + + WriterData.Status.MyAddr = TNetAddr(GetSockAddr(WriterData.Channel->GetSocket())); + + TSocket readSocket = WriterData.Channel->GetSocketPtr(); + + ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion)); + + FireClientConnectionEvent(TClientConnectionEvent::CONNECTED); + + ScheduleWrite(); + } else { + if (WouldBlock() || err == EALREADY) { + WriterData.Channel->EnableWrite(); + } else { + WriterData.DropChannel(); + WriterData.Status.MyAddr = TNetAddr(); + WriterData.Status.Connected = false; + WriterData.Status.ConnectError = err; + + DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); + } + } +} + +void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) { + Y_UNUSED(socket); + Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie); + if (cookie == ReadCookie) { + ScheduleRead(); + } else { + ScheduleWrite(); + } +} + +void TRemoteClientConnection::WriterFillStatus() { + TRemoteConnection::WriterFillStatus(); + WriterData.Status.AckMessagesSize = AckMessages.Size(); +} + +void TRemoteClientConnection::BeforeTryWrite() { + ProcessReplyQueue(); + TimeoutMessages(); +} + +namespace NBus { + namespace NPrivate { + class TInvokeOnReply: public IWorkItem { + private: + TRemoteClientSession* RemoteClientSession; + TNonDestroyingHolder<TBusMessage> Request; + TBusMessagePtrAndHeader Response; + + public: + TInvokeOnReply(TRemoteClientSession* session, + TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response) + : RemoteClientSession(session) + , Request(request) + { + Response.Swap(response); + } + + void DoWork() override { + THolder<TInvokeOnReply> holder(this); + RemoteClientSession->ReleaseInFlightAndCallOnReply(Request.Release(), Response); + // TODO: TRemoteClientSessionSemaphore should be enough + RemoteClientSession->JobCount.Decrement(); + } + }; + + } +} + +void TRemoteClientConnection::ProcessReplyQueue() { + if (AtomicGet(WriterData.Down)) { + return; + } + + bool executeInWorkerPool = Session->Config.ExecuteOnReplyInWorkerPool; + + TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> replyQueueTemp; + TTempTlsVector< ::NActor::IWorkItem*> workQueueTemp; + + ReplyQueue.DequeueAllSingleConsumer(replyQueueTemp.GetVector()); + if (executeInWorkerPool) { + workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size()); + } + + for (auto& resp : *replyQueueTemp.GetVector()) { + TBusMessage* req = PopAck(resp.Header.Id); + + if (!req) { + WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN); + continue; + } + + if (executeInWorkerPool) { + workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp)); + } else { + GetSession()->ReleaseInFlightAndCallOnReply(req, resp); + } + } + + if (executeInWorkerPool) { + Session->JobCount.Add(workQueueTemp.GetVector()->size()); + Session->Queue->EnqueueWork(*workQueueTemp.GetVector()); + } +} + +void TRemoteClientConnection::TimeoutMessages() { + if (!TimeToTimeoutMessages.FetchTask()) { + return; + } + + TMessagesPtrs timedOutMessages; + + TInstant sendDeadline; + TInstant ackDeadline; + if (IsReturnConnectFailedImmediately()) { + sendDeadline = TInstant::Max(); + ackDeadline = TInstant::Max(); + } else { + TInstant now = TInstant::Now(); + sendDeadline = now - TDuration::MilliSeconds(Session->Config.SendTimeout); + ackDeadline = now - TDuration::MilliSeconds(Session->Config.TotalTimeout); + } + + { + TMessagesPtrs temp; + WriterData.SendQueue.Timeout(sendDeadline, &temp); + timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end()); + } + + // Ignores message that is being written currently (that is stored + // in WriteMessage). It is not a big problem, because after written + // to the network, message will be placed to the AckMessages queue, + // and timed out on the next iteration of this procedure. + + { + TMessagesPtrs temp; + AckMessages.Timeout(ackDeadline, &temp); + timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end()); + } + + ResetOneWayFlag(timedOutMessages); + + GetSession()->ReleaseInFlight(timedOutMessages); + WriterErrorMessages(timedOutMessages, MESSAGE_TIMEOUT); +} + +void TRemoteClientConnection::ScheduleTimeoutMessages() { + TimeToTimeoutMessages.AddTask(); + ScheduleWrite(); +} + +void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char>) { + LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), ""); + ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1; + // TODO: close connection + Y_FAIL("unknown message"); +} + +void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) { + Y_ASSERT(result.empty()); + + TRemoteConnection::ClearOutgoingQueue(result, reconnect); + AckMessages.Clear(&result); + + ResetOneWayFlag(result); + GetSession()->ReleaseInFlight(result); +} + +void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) { + for (auto& message : messages) { + bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL; + + if (oneWay) { + message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; + + TBusMessage* ackMsg = this->PopAck(message.Header.Id); + if (!ackMsg) { + // TODO: expired? + } + + if (ackMsg != message.MessagePtr.Get()) { + // TODO: non-unique id? + } + + GetSession()->ReleaseInFlight({message.MessagePtr.Get()}); + ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release()); + } else { + ClientHandler->OnMessageSent(message.MessagePtr.Get()); + AckMessages.Push(message); + } + } +} + +EMessageStatus TRemoteClientConnection::SendMessage(TBusMessage* req, bool wait) { + return SendMessageImpl(req, wait, false); +} + +EMessageStatus TRemoteClientConnection::SendMessageOneWay(TBusMessage* req, bool wait) { + return SendMessageImpl(req, wait, true); +} + +EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool wait, bool oneWay) { + msg->CheckClean(); + + if (Session->IsDown()) { + return MESSAGE_SHUTDOWN; + } + + if (wait) { + Y_VERIFY(!Session->Queue->GetExecutor()->IsInExecutorThread()); + GetSession()->ClientRemoteInFlight.Wait(); + } else { + if (!GetSession()->ClientRemoteInFlight.TryWait()) { + return MESSAGE_BUSY; + } + } + + GetSession()->AcquireInFlight({msg}); + + EMessageStatus ret = MESSAGE_OK; + + if (oneWay) { + msg->LocalFlags |= MESSAGE_ONE_WAY_INTERNAL; + } + + msg->GetHeader()->SendTime = Now(); + + if (IsReturnConnectFailedImmediately()) { + ret = MESSAGE_CONNECT_FAILED; + goto clean; + } + + Send(msg); + + return MESSAGE_OK; +clean: + msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; + GetSession()->ReleaseInFlight({msg}); + return ret; +} + +void TRemoteClientConnection::OpenConnection() { + // TODO +} |