diff options
| author | nga <[email protected]> | 2022-02-10 16:48:09 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:48:09 +0300 | 
| commit | c2a1af049e9deca890e9923abe64fe6c59060348 (patch) | |
| tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/messagebus/remote_client_connection.cpp | |
| parent | 1f553f46fb4f3c5eec631352cdd900a0709016af (diff) | |
Restoring authorship annotation for <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/remote_client_connection.cpp')
| -rw-r--r-- | library/cpp/messagebus/remote_client_connection.cpp | 556 | 
1 files changed, 278 insertions, 278 deletions
| diff --git a/library/cpp/messagebus/remote_client_connection.cpp b/library/cpp/messagebus/remote_client_connection.cpp index b7b05e7bed4..8c7a6db3a8c 100644 --- a/library/cpp/messagebus/remote_client_connection.cpp +++ b/library/cpp/messagebus/remote_client_connection.cpp @@ -1,143 +1,143 @@ -#include "remote_client_connection.h"  -  +#include "remote_client_connection.h" +  #include "mb_lwtrace.h"  #include "network.h" -#include "remote_client_session.h"  -  +#include "remote_client_session.h" +  #include <library/cpp/messagebus/actor/executor.h>  #include <library/cpp/messagebus/actor/temp_tls_vector.h> -  +  #include <util/generic/cast.h>  #include <util/thread/singleton.h> -  -LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER)  -  -using namespace NActor;  -using namespace NBus;  -using namespace NBus::NPrivate;  -  -TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session, ui64 id, TNetAddr addr)  -    : TRemoteConnection(session.Get(), id, addr)  -    , ClientHandler(GetSession()->ClientHandler)  -{  + +LWTRACE_USING(LWTRACE_MESSAGEBUS_PROVIDER) + +using namespace NActor; +using namespace NBus; +using namespace NBus::NPrivate; + +TRemoteClientConnection::TRemoteClientConnection(TRemoteClientSessionPtr session, ui64 id, TNetAddr addr) +    : TRemoteConnection(session.Get(), id, addr) +    , ClientHandler(GetSession()->ClientHandler) +{      Y_VERIFY(addr.GetPort() > 0, "must connect to non-zero port"); -  -    ScheduleWrite();  -}  -  -TRemoteClientSession* TRemoteClientConnection::GetSession() {  -    return CheckedCast<TRemoteClientSession*>(Session.Get());  -}  -  -TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) {  -    return AckMessages.Pop(id);  -}  -  + +    ScheduleWrite(); +} + +TRemoteClientSession* TRemoteClientConnection::GetSession() { +    return CheckedCast<TRemoteClientSession*>(Session.Get()); +} + +TBusMessage* TRemoteClientConnection::PopAck(TBusKey id) { +    return AckMessages.Pop(id); +} +  SOCKET TRemoteClientConnection::CreateSocket(const TNetAddr& addr) { -    SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0);  +    SOCKET handle = socket(addr.Addr()->sa_family, SOCK_STREAM, 0);      Y_VERIFY(handle != INVALID_SOCKET, "failed to create socket: %s", LastSystemErrorText()); -  -    TSocketHolder s(handle);  -  -    SetNonBlock(s, true);  -    SetNoDelay(s, Config.TcpNoDelay);  -    SetSockOptTcpCork(s, Config.TcpCork);  -    SetCloseOnExec(s, true);  -    SetKeepAlive(s, true);  -    if (Config.SocketRecvBufferSize != 0) {  -        SetInputBuffer(s, Config.SocketRecvBufferSize);  -    }  -    if (Config.SocketSendBufferSize != 0) {  -        SetOutputBuffer(s, Config.SocketSendBufferSize);  -    }  -    if (Config.SocketToS >= 0) {  -        SetSocketToS(s, &addr, Config.SocketToS);  -    }  -  -    return s.Release();  -}  -  -void TRemoteClientConnection::TryConnect() {  -    if (AtomicGet(WriterData.Down)) {  -        return;  -    }  + +    TSocketHolder s(handle); + +    SetNonBlock(s, true); +    SetNoDelay(s, Config.TcpNoDelay); +    SetSockOptTcpCork(s, Config.TcpCork); +    SetCloseOnExec(s, true); +    SetKeepAlive(s, true); +    if (Config.SocketRecvBufferSize != 0) { +        SetInputBuffer(s, Config.SocketRecvBufferSize); +    } +    if (Config.SocketSendBufferSize != 0) { +        SetOutputBuffer(s, Config.SocketSendBufferSize); +    } +    if (Config.SocketToS >= 0) { +        SetSocketToS(s, &addr, Config.SocketToS); +    } + +    return s.Release(); +} + +void TRemoteClientConnection::TryConnect() { +    if (AtomicGet(WriterData.Down)) { +        return; +    }      Y_VERIFY(!WriterData.Status.Connected); -  -    TInstant now = TInstant::Now();  -  -    if (!WriterData.Channel) {  -        if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) {  + +    TInstant now = TInstant::Now(); + +    if (!WriterData.Channel) { +        if ((now - LastConnectAttempt) < TDuration::MilliSeconds(Config.RetryInterval)) {              DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); -            return;  -        }  -        LastConnectAttempt = now;  -  -        TSocket connectSocket(CreateSocket(PeerAddr));  -        WriterData.SetChannel(Session->WriteEventLoop.Register(connectSocket, this, WriteCookie));  -    }  -  +            return; +        } +        LastConnectAttempt = now; + +        TSocket connectSocket(CreateSocket(PeerAddr)); +        WriterData.SetChannel(Session->WriteEventLoop.Register(connectSocket, this, WriteCookie)); +    } +      if (BeforeSendQueue.IsEmpty() && WriterData.SendQueue.Empty() && !Config.ReconnectWhenIdle) {          // TryConnect is called from Writer::Act, which is called in cycle          // from session's ScheduleTimeoutMessages via Cron. This prevent these excessive connects.          return;      } -    ++WriterData.Status.ConnectSyscalls;  -  -    int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len());  -    int err = ret ? LastSystemError() : 0;  -  -    if (!ret || (ret && err == EISCONN)) {  -        WriterData.Status.ConnectTime = now;  -        ++WriterData.SocketVersion;  -  -        WriterData.Channel->DisableWrite();  -        WriterData.Status.Connected = true;  -        AtomicSet(ReturnConnectFailedImmediately, false);  -  -        WriterData.Status.MyAddr = TNetAddr(GetSockAddr(WriterData.Channel->GetSocket()));  -  -        TSocket readSocket = WriterData.Channel->GetSocketPtr();  -  -        ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion));  -  -        FireClientConnectionEvent(TClientConnectionEvent::CONNECTED);  -  -        ScheduleWrite();  -    } else {  -        if (WouldBlock() || err == EALREADY) {  -            WriterData.Channel->EnableWrite();  -        } else {  -            WriterData.DropChannel();  -            WriterData.Status.MyAddr = TNetAddr();  -            WriterData.Status.Connected = false;  -            WriterData.Status.ConnectError = err;  -  +    ++WriterData.Status.ConnectSyscalls; + +    int ret = connect(WriterData.Channel->GetSocket(), PeerAddr.Addr(), PeerAddr.Len()); +    int err = ret ? LastSystemError() : 0; + +    if (!ret || (ret && err == EISCONN)) { +        WriterData.Status.ConnectTime = now; +        ++WriterData.SocketVersion; + +        WriterData.Channel->DisableWrite(); +        WriterData.Status.Connected = true; +        AtomicSet(ReturnConnectFailedImmediately, false); + +        WriterData.Status.MyAddr = TNetAddr(GetSockAddr(WriterData.Channel->GetSocket())); + +        TSocket readSocket = WriterData.Channel->GetSocketPtr(); + +        ReaderGetSocketQueue()->EnqueueAndSchedule(TWriterToReaderSocketMessage(readSocket, WriterData.SocketVersion)); + +        FireClientConnectionEvent(TClientConnectionEvent::CONNECTED); + +        ScheduleWrite(); +    } else { +        if (WouldBlock() || err == EALREADY) { +            WriterData.Channel->EnableWrite(); +        } else { +            WriterData.DropChannel(); +            WriterData.Status.MyAddr = TNetAddr(); +            WriterData.Status.Connected = false; +            WriterData.Status.ConnectError = err; +              DropEnqueuedData(MESSAGE_CONNECT_FAILED, MESSAGE_CONNECT_FAILED); -        }  -    }  -}  -  -void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) {  +        } +    } +} + +void TRemoteClientConnection::HandleEvent(SOCKET socket, void* cookie) {      Y_UNUSED(socket);      Y_ASSERT(cookie == WriteCookie || cookie == ReadCookie); -    if (cookie == ReadCookie) {  -        ScheduleRead();  -    } else {  -        ScheduleWrite();  -    }  -}  -  -void TRemoteClientConnection::WriterFillStatus() {  -    TRemoteConnection::WriterFillStatus();  -    WriterData.Status.AckMessagesSize = AckMessages.Size();  -}  -  -void TRemoteClientConnection::BeforeTryWrite() {  -    ProcessReplyQueue();  -    TimeoutMessages();  -}  -  +    if (cookie == ReadCookie) { +        ScheduleRead(); +    } else { +        ScheduleWrite(); +    } +} + +void TRemoteClientConnection::WriterFillStatus() { +    TRemoteConnection::WriterFillStatus(); +    WriterData.Status.AckMessagesSize = AckMessages.Size(); +} + +void TRemoteClientConnection::BeforeTryWrite() { +    ProcessReplyQueue(); +    TimeoutMessages(); +} +  namespace NBus {      namespace NPrivate {          class TInvokeOnReply: public IWorkItem { @@ -145,7 +145,7 @@ namespace NBus {              TRemoteClientSession* RemoteClientSession;              TNonDestroyingHolder<TBusMessage> Request;              TBusMessagePtrAndHeader Response; -  +          public:              TInvokeOnReply(TRemoteClientSession* session,                             TNonDestroyingAutoPtr<TBusMessage> request, TBusMessagePtrAndHeader& response) @@ -154,7 +154,7 @@ namespace NBus {              {                  Response.Swap(response);              } -  +              void DoWork() override {                  THolder<TInvokeOnReply> holder(this);                  RemoteClientSession->ReleaseInFlightAndCallOnReply(Request.Release(), Response); @@ -162,182 +162,182 @@ namespace NBus {                  RemoteClientSession->JobCount.Decrement();              }          }; -  +      }  } -  -void TRemoteClientConnection::ProcessReplyQueue() {  -    if (AtomicGet(WriterData.Down)) {  -        return;  -    }  -  -    bool executeInWorkerPool = Session->Config.ExecuteOnReplyInWorkerPool;  -  -    TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> replyQueueTemp;  -    TTempTlsVector< ::NActor::IWorkItem*> workQueueTemp;  -  -    ReplyQueue.DequeueAllSingleConsumer(replyQueueTemp.GetVector());  -    if (executeInWorkerPool) {  -        workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size());  -    }  -  + +void TRemoteClientConnection::ProcessReplyQueue() { +    if (AtomicGet(WriterData.Down)) { +        return; +    } + +    bool executeInWorkerPool = Session->Config.ExecuteOnReplyInWorkerPool; + +    TTempTlsVector<TBusMessagePtrAndHeader, void, TVectorSwaps> replyQueueTemp; +    TTempTlsVector< ::NActor::IWorkItem*> workQueueTemp; + +    ReplyQueue.DequeueAllSingleConsumer(replyQueueTemp.GetVector()); +    if (executeInWorkerPool) { +        workQueueTemp.GetVector()->reserve(replyQueueTemp.GetVector()->size()); +    } +      for (auto& resp : *replyQueueTemp.GetVector()) {          TBusMessage* req = PopAck(resp.Header.Id); -  -        if (!req) {  + +        if (!req) {              WriterErrorMessage(resp.MessagePtr.Release(), MESSAGE_UNKNOWN); -            continue;  -        }  -  -        if (executeInWorkerPool) {  +            continue; +        } + +        if (executeInWorkerPool) {              workQueueTemp.GetVector()->push_back(new TInvokeOnReply(GetSession(), req, resp)); -        } else {  +        } else {              GetSession()->ReleaseInFlightAndCallOnReply(req, resp); -        }  -    }  -  -    if (executeInWorkerPool) {  -        Session->JobCount.Add(workQueueTemp.GetVector()->size());  -        Session->Queue->EnqueueWork(*workQueueTemp.GetVector());  -    }  -}  -  -void TRemoteClientConnection::TimeoutMessages() {  -    if (!TimeToTimeoutMessages.FetchTask()) {  -        return;  -    }  -  -    TMessagesPtrs timedOutMessages;  -  -    TInstant sendDeadline;  -    TInstant ackDeadline;  -    if (IsReturnConnectFailedImmediately()) {  -        sendDeadline = TInstant::Max();  -        ackDeadline = TInstant::Max();  -    } else {  -        TInstant now = TInstant::Now();  -        sendDeadline = now - TDuration::MilliSeconds(Session->Config.SendTimeout);  -        ackDeadline = now - TDuration::MilliSeconds(Session->Config.TotalTimeout);  -    }  -  -    {  -        TMessagesPtrs temp;  -        WriterData.SendQueue.Timeout(sendDeadline, &temp);  -        timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());  -    }  -  -    // Ignores message that is being written currently (that is stored  -    // in WriteMessage). It is not a big problem, because after written  -    // to the network, message will be placed to the AckMessages queue,  -    // and timed out on the next iteration of this procedure.  -  -    {  -        TMessagesPtrs temp;  -        AckMessages.Timeout(ackDeadline, &temp);  -        timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end());  -    }  -  -    ResetOneWayFlag(timedOutMessages);  -  -    GetSession()->ReleaseInFlight(timedOutMessages);  -    WriterErrorMessages(timedOutMessages, MESSAGE_TIMEOUT);  -}  -  -void TRemoteClientConnection::ScheduleTimeoutMessages() {  -    TimeToTimeoutMessages.AddTask();  -    ScheduleWrite();  -}  -  +        } +    } + +    if (executeInWorkerPool) { +        Session->JobCount.Add(workQueueTemp.GetVector()->size()); +        Session->Queue->EnqueueWork(*workQueueTemp.GetVector()); +    } +} + +void TRemoteClientConnection::TimeoutMessages() { +    if (!TimeToTimeoutMessages.FetchTask()) { +        return; +    } + +    TMessagesPtrs timedOutMessages; + +    TInstant sendDeadline; +    TInstant ackDeadline; +    if (IsReturnConnectFailedImmediately()) { +        sendDeadline = TInstant::Max(); +        ackDeadline = TInstant::Max(); +    } else { +        TInstant now = TInstant::Now(); +        sendDeadline = now - TDuration::MilliSeconds(Session->Config.SendTimeout); +        ackDeadline = now - TDuration::MilliSeconds(Session->Config.TotalTimeout); +    } + +    { +        TMessagesPtrs temp; +        WriterData.SendQueue.Timeout(sendDeadline, &temp); +        timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end()); +    } + +    // Ignores message that is being written currently (that is stored +    // in WriteMessage). It is not a big problem, because after written +    // to the network, message will be placed to the AckMessages queue, +    // and timed out on the next iteration of this procedure. + +    { +        TMessagesPtrs temp; +        AckMessages.Timeout(ackDeadline, &temp); +        timedOutMessages.insert(timedOutMessages.end(), temp.begin(), temp.end()); +    } + +    ResetOneWayFlag(timedOutMessages); + +    GetSession()->ReleaseInFlight(timedOutMessages); +    WriterErrorMessages(timedOutMessages, MESSAGE_TIMEOUT); +} + +void TRemoteClientConnection::ScheduleTimeoutMessages() { +    TimeToTimeoutMessages.AddTask(); +    ScheduleWrite(); +} +  void TRemoteClientConnection::ReaderProcessMessageUnknownVersion(TArrayRef<const char>) { -    LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), "");  -    ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1;  -    // TODO: close connection  +    LWPROBE(Error, ToString(MESSAGE_INVALID_VERSION), ToString(PeerAddr), ""); +    ReaderData.Status.Incremental.StatusCounter[MESSAGE_INVALID_VERSION] += 1; +    // TODO: close connection      Y_FAIL("unknown message"); -}  -  -void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {  +} + +void TRemoteClientConnection::ClearOutgoingQueue(TMessagesPtrs& result, bool reconnect) {      Y_ASSERT(result.empty()); -  -    TRemoteConnection::ClearOutgoingQueue(result, reconnect);  -    AckMessages.Clear(&result);  -  -    ResetOneWayFlag(result);  -    GetSession()->ReleaseInFlight(result);  -}  -  + +    TRemoteConnection::ClearOutgoingQueue(result, reconnect); +    AckMessages.Clear(&result); + +    ResetOneWayFlag(result); +    GetSession()->ReleaseInFlight(result); +} +  void TRemoteClientConnection::MessageSent(TArrayRef<TBusMessagePtrAndHeader> messages) {      for (auto& message : messages) {          bool oneWay = message.LocalFlags & MESSAGE_ONE_WAY_INTERNAL; -  -        if (oneWay) {  + +        if (oneWay) {              message.MessagePtr->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL; -  +              TBusMessage* ackMsg = this->PopAck(message.Header.Id); -            if (!ackMsg) {  -                // TODO: expired?  -            }  -  +            if (!ackMsg) { +                // TODO: expired? +            } +              if (ackMsg != message.MessagePtr.Get()) { -                // TODO: non-unique id?  -            }  -  +                // TODO: non-unique id? +            } +              GetSession()->ReleaseInFlight({message.MessagePtr.Get()});              ClientHandler->OnMessageSentOneWay(message.MessagePtr.Release()); -        } else {  +        } else {              ClientHandler->OnMessageSent(message.MessagePtr.Get());              AckMessages.Push(message); -        }  -    }  -}  -  -EMessageStatus TRemoteClientConnection::SendMessage(TBusMessage* req, bool wait) {  -    return SendMessageImpl(req, wait, false);  -}  -  -EMessageStatus TRemoteClientConnection::SendMessageOneWay(TBusMessage* req, bool wait) {  -    return SendMessageImpl(req, wait, true);  -}  -  -EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool wait, bool oneWay) {  -    msg->CheckClean();  -  -    if (Session->IsDown()) {  -        return MESSAGE_SHUTDOWN;  -    }  -  -    if (wait) {  +        } +    } +} + +EMessageStatus TRemoteClientConnection::SendMessage(TBusMessage* req, bool wait) { +    return SendMessageImpl(req, wait, false); +} + +EMessageStatus TRemoteClientConnection::SendMessageOneWay(TBusMessage* req, bool wait) { +    return SendMessageImpl(req, wait, true); +} + +EMessageStatus TRemoteClientConnection::SendMessageImpl(TBusMessage* msg, bool wait, bool oneWay) { +    msg->CheckClean(); + +    if (Session->IsDown()) { +        return MESSAGE_SHUTDOWN; +    } + +    if (wait) {          Y_VERIFY(!Session->Queue->GetExecutor()->IsInExecutorThread()); -        GetSession()->ClientRemoteInFlight.Wait();  -    } else {  -        if (!GetSession()->ClientRemoteInFlight.TryWait()) {  -            return MESSAGE_BUSY;  -        }  -    }  -  +        GetSession()->ClientRemoteInFlight.Wait(); +    } else { +        if (!GetSession()->ClientRemoteInFlight.TryWait()) { +            return MESSAGE_BUSY; +        } +    } +      GetSession()->AcquireInFlight({msg}); -  -    EMessageStatus ret = MESSAGE_OK;  -  -    if (oneWay) {  -        msg->LocalFlags |= MESSAGE_ONE_WAY_INTERNAL;  -    }  -  -    msg->GetHeader()->SendTime = Now();  -  -    if (IsReturnConnectFailedImmediately()) {  -        ret = MESSAGE_CONNECT_FAILED;  -        goto clean;  -    }  -  -    Send(msg);  -  -    return MESSAGE_OK;  -clean:  -    msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;  + +    EMessageStatus ret = MESSAGE_OK; + +    if (oneWay) { +        msg->LocalFlags |= MESSAGE_ONE_WAY_INTERNAL; +    } + +    msg->GetHeader()->SendTime = Now(); + +    if (IsReturnConnectFailedImmediately()) { +        ret = MESSAGE_CONNECT_FAILED; +        goto clean; +    } + +    Send(msg); + +    return MESSAGE_OK; +clean: +    msg->LocalFlags &= ~MESSAGE_ONE_WAY_INTERNAL;      GetSession()->ReleaseInFlight({msg}); -    return ret;  -}  -  -void TRemoteClientConnection::OpenConnection() {  -    // TODO  -}  +    return ret; +} + +void TRemoteClientConnection::OpenConnection() { +    // TODO +} | 
