diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/actors/interconnect | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect')
43 files changed, 2154 insertions, 2154 deletions
diff --git a/library/cpp/actors/interconnect/event_holder_pool.h b/library/cpp/actors/interconnect/event_holder_pool.h index 9fe5e03be6..b6090a3bc8 100644 --- a/library/cpp/actors/interconnect/event_holder_pool.h +++ b/library/cpp/actors/interconnect/event_holder_pool.h @@ -117,7 +117,7 @@ namespace NActors { } private: - TEvFreeItems* GetPendingEvent() { + TEvFreeItems* GetPendingEvent() { if (!PendingFreeEvent) { PendingFreeEvent.Reset(new TEvFreeItems); } @@ -125,4 +125,4 @@ namespace NActors { } }; -} +} diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 7880ca283f..8a46ffd535 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -11,324 +11,324 @@ #include "types.h" namespace NActors { - struct TProgramInfo { - ui64 PID = 0; - ui64 StartTime = 0; - ui64 Serial = 0; - }; - - enum class ENetwork : ui32 { - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // local messages - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - Start = EventSpaceBegin(TEvents::ES_INTERCONNECT_TCP), - - SocketReadyRead = Start, - SocketReadyWrite, - SocketError, - Connect, - Disconnect, - IncomingConnection, - HandshakeAsk, - HandshakeAck, + struct TProgramInfo { + ui64 PID = 0; + ui64 StartTime = 0; + ui64 Serial = 0; + }; + + enum class ENetwork : ui32 { + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // local messages + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + Start = EventSpaceBegin(TEvents::ES_INTERCONNECT_TCP), + + SocketReadyRead = Start, + SocketReadyWrite, + SocketError, + Connect, + Disconnect, + IncomingConnection, + HandshakeAsk, + HandshakeAck, HandshakeNak, - HandshakeDone, - HandshakeFail, - Kick, - Flush, - NodeInfo, - BunchOfEventsToDestroy, - HandshakeRequest, - HandshakeReplyOK, - HandshakeReplyError, - ResolveAddress, - AddressInfo, - ResolveError, - HTTPStreamStatus, - HTTPSendContent, - ConnectProtocolWakeup, - HTTPProtocolRetry, - EvPollerRegister, + HandshakeDone, + HandshakeFail, + Kick, + Flush, + NodeInfo, + BunchOfEventsToDestroy, + HandshakeRequest, + HandshakeReplyOK, + HandshakeReplyError, + ResolveAddress, + AddressInfo, + ResolveError, + HTTPStreamStatus, + HTTPSendContent, + ConnectProtocolWakeup, + HTTPProtocolRetry, + EvPollerRegister, EvPollerRegisterResult, EvPollerReady, - EvUpdateFromInputSession, + EvUpdateFromInputSession, EvConfirmUpdate, - EvSessionBufferSizeRequest, - EvSessionBufferSizeResponse, + EvSessionBufferSizeRequest, + EvSessionBufferSizeResponse, EvProcessPingRequest, EvGetSecureSocket, EvSecureSocket, - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - // interconnect load test message - EvLoadMessage = Start + 256, - }; - - struct TEvSocketReadyRead: public TEventLocal<TEvSocketReadyRead, ui32(ENetwork::SocketReadyRead)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvSocketReadyRead") - }; - - struct TEvSocketReadyWrite: public TEventLocal<TEvSocketReadyWrite, ui32(ENetwork::SocketReadyWrite)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyWrite, "Network: TEvSocketReadyWrite") - }; - - struct TEvSocketError: public TEventLocal<TEvSocketError, ui32(ENetwork::SocketError)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketError, ::strerror(Error)) - TString GetReason() const { - return ::strerror(Error); - } - const int Error; - TIntrusivePtr<NInterconnect::TStreamSocket> Socket; - - TEvSocketError(int error, TIntrusivePtr<NInterconnect::TStreamSocket> sock) - : Error(error) - , Socket(std::move(sock)) - { - } - }; - - struct TEvSocketConnect: public TEventLocal<TEvSocketConnect, ui32(ENetwork::Connect)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketConnect, "Network: TEvSocketConnect") - }; - - struct TEvSocketDisconnect: public TEventLocal<TEvSocketDisconnect, ui32(ENetwork::Disconnect)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketDisconnect, "Network: TEvSocketDisconnect") + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + // interconnect load test message + EvLoadMessage = Start + 256, + }; + + struct TEvSocketReadyRead: public TEventLocal<TEvSocketReadyRead, ui32(ENetwork::SocketReadyRead)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvSocketReadyRead") + }; + + struct TEvSocketReadyWrite: public TEventLocal<TEvSocketReadyWrite, ui32(ENetwork::SocketReadyWrite)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyWrite, "Network: TEvSocketReadyWrite") + }; + + struct TEvSocketError: public TEventLocal<TEvSocketError, ui32(ENetwork::SocketError)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketError, ::strerror(Error)) + TString GetReason() const { + return ::strerror(Error); + } + const int Error; + TIntrusivePtr<NInterconnect::TStreamSocket> Socket; + + TEvSocketError(int error, TIntrusivePtr<NInterconnect::TStreamSocket> sock) + : Error(error) + , Socket(std::move(sock)) + { + } + }; + + struct TEvSocketConnect: public TEventLocal<TEvSocketConnect, ui32(ENetwork::Connect)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketConnect, "Network: TEvSocketConnect") + }; + + struct TEvSocketDisconnect: public TEventLocal<TEvSocketDisconnect, ui32(ENetwork::Disconnect)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketDisconnect, "Network: TEvSocketDisconnect") TDisconnectReason Reason; TEvSocketDisconnect(TDisconnectReason reason) : Reason(std::move(reason)) - { - } - }; + { + } + }; - struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk") + struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk") TEvHandshakeAsk(const TActorId& self, const TActorId& peer, - ui64 counter) - : Self(self) - , Peer(peer) - , Counter(counter) - { - } + ui64 counter) + : Self(self) + , Peer(peer) + , Counter(counter) + { + } const TActorId Self; const TActorId Peer; - const ui64 Counter; - }; + const ui64 Counter; + }; - struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck") + struct TEvHandshakeAck: public TEventLocal<TEvHandshakeAck, ui32(ENetwork::HandshakeAck)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAck, "Network: TEvHandshakeAck") TEvHandshakeAck(const TActorId& self, ui64 nextPacket, TSessionParams params) - : Self(self) - , NextPacket(nextPacket) + : Self(self) + , NextPacket(nextPacket) , Params(std::move(params)) {} const TActorId Self; - const ui64 NextPacket; + const ui64 NextPacket; const TSessionParams Params; - }; + }; struct TEvHandshakeNak : TEventLocal<TEvHandshakeNak, ui32(ENetwork::HandshakeNak)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvSocketReadyRead, "Network: TEvHandshakeNak") }; - struct TEvHandshakeRequest - : public TEventLocal<TEvHandshakeRequest, - ui32(ENetwork::HandshakeRequest)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeRequest, - "Network: TEvHandshakeRequest") + struct TEvHandshakeRequest + : public TEventLocal<TEvHandshakeRequest, + ui32(ENetwork::HandshakeRequest)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeRequest, + "Network: TEvHandshakeRequest") - NActorsInterconnect::THandshakeRequest Record; - }; + NActorsInterconnect::THandshakeRequest Record; + }; - struct TEvHandshakeReplyOK - : public TEventLocal<TEvHandshakeReplyOK, - ui32(ENetwork::HandshakeReplyOK)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyOK, - "Network: TEvHandshakeReplyOK") + struct TEvHandshakeReplyOK + : public TEventLocal<TEvHandshakeReplyOK, + ui32(ENetwork::HandshakeReplyOK)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyOK, + "Network: TEvHandshakeReplyOK") - NActorsInterconnect::THandshakeReply Record; - }; + NActorsInterconnect::THandshakeReply Record; + }; - struct TEvHandshakeReplyError - : public TEventLocal<TEvHandshakeReplyError, - ui32(ENetwork::HandshakeReplyError)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyError, - "Network: TEvHandshakeReplyError") + struct TEvHandshakeReplyError + : public TEventLocal<TEvHandshakeReplyError, + ui32(ENetwork::HandshakeReplyError)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeReplyError, + "Network: TEvHandshakeReplyError") - TEvHandshakeReplyError(TString error) { - Record.SetErrorExplaination(error); - } + TEvHandshakeReplyError(TString error) { + Record.SetErrorExplaination(error); + } - NActorsInterconnect::THandshakeReply Record; - }; + NActorsInterconnect::THandshakeReply Record; + }; - struct TEvIncomingConnection: public TEventLocal<TEvIncomingConnection, ui32(ENetwork::IncomingConnection)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvIncomingConnection, "Network: TEvIncomingConnection") - TIntrusivePtr<NInterconnect::TStreamSocket> Socket; - NInterconnect::TAddress Address; + struct TEvIncomingConnection: public TEventLocal<TEvIncomingConnection, ui32(ENetwork::IncomingConnection)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvIncomingConnection, "Network: TEvIncomingConnection") + TIntrusivePtr<NInterconnect::TStreamSocket> Socket; + NInterconnect::TAddress Address; TEvIncomingConnection(TIntrusivePtr<NInterconnect::TStreamSocket> socket, NInterconnect::TAddress address) : Socket(std::move(socket)) , Address(std::move(address)) {} - }; + }; - struct TEvHandshakeDone: public TEventLocal<TEvHandshakeDone, ui32(ENetwork::HandshakeDone)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeDone, "Network: TEvHandshakeDone") + struct TEvHandshakeDone: public TEventLocal<TEvHandshakeDone, ui32(ENetwork::HandshakeDone)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeDone, "Network: TEvHandshakeDone") - TEvHandshakeDone( + TEvHandshakeDone( TIntrusivePtr<NInterconnect::TStreamSocket> socket, const TActorId& peer, const TActorId& self, ui64 nextPacket, TAutoPtr<TProgramInfo>&& programInfo, TSessionParams params) - : Socket(std::move(socket)) - , Peer(peer) - , Self(self) - , NextPacket(nextPacket) - , ProgramInfo(std::move(programInfo)) + : Socket(std::move(socket)) + , Peer(peer) + , Self(self) + , NextPacket(nextPacket) + , ProgramInfo(std::move(programInfo)) , Params(std::move(params)) - { - } + { + } - TIntrusivePtr<NInterconnect::TStreamSocket> Socket; + TIntrusivePtr<NInterconnect::TStreamSocket> Socket; const TActorId Peer; const TActorId Self; - const ui64 NextPacket; - TAutoPtr<TProgramInfo> ProgramInfo; + const ui64 NextPacket; + TAutoPtr<TProgramInfo> ProgramInfo; const TSessionParams Params; - }; + }; + + struct TEvHandshakeFail: public TEventLocal<TEvHandshakeFail, ui32(ENetwork::HandshakeFail)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeFail, "Network: TEvHandshakeFail") - struct TEvHandshakeFail: public TEventLocal<TEvHandshakeFail, ui32(ENetwork::HandshakeFail)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeFail, "Network: TEvHandshakeFail") + enum EnumHandshakeFail { + HANDSHAKE_FAIL_TRANSIENT, + HANDSHAKE_FAIL_PERMANENT, + HANDSHAKE_FAIL_SESSION_MISMATCH, + }; - enum EnumHandshakeFail { - HANDSHAKE_FAIL_TRANSIENT, - HANDSHAKE_FAIL_PERMANENT, - HANDSHAKE_FAIL_SESSION_MISMATCH, - }; - TEvHandshakeFail(EnumHandshakeFail temporary, TString explanation) - : Temporary(temporary) - , Explanation(std::move(explanation)) - { - } - - const EnumHandshakeFail Temporary; - const TString Explanation; - }; - - struct TEvKick: public TEventLocal<TEvKick, ui32(ENetwork::Kick)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvKick, "Network: TEvKick") - }; - - struct TEvFlush: public TEventLocal<TEvFlush, ui32(ENetwork::Flush)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvFlush, "Network: TEvFlush") - }; - - struct TEvLocalNodeInfo - : public TEventLocal<TEvLocalNodeInfo, ui32(ENetwork::NodeInfo)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvLocalNodeInfo, "Network: TEvLocalNodeInfo") - - ui32 NodeId; - NAddr::IRemoteAddrPtr Address; - }; - - struct TEvBunchOfEventsToDestroy : TEventLocal<TEvBunchOfEventsToDestroy, ui32(ENetwork::BunchOfEventsToDestroy)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvBunchOfEventsToDestroy, - "Network: TEvBunchOfEventsToDestroy") - - TEvBunchOfEventsToDestroy(TDeque<TAutoPtr<IEventBase>> events) - : Events(std::move(events)) - { - } - - TDeque<TAutoPtr<IEventBase>> Events; - }; - - struct TEvResolveAddress - : public TEventLocal<TEvResolveAddress, ui32(ENetwork::ResolveAddress)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveAddress, "Network: TEvResolveAddress") - - TString Address; - ui16 Port; - }; - - struct TEvAddressInfo - : public TEventLocal<TEvAddressInfo, ui32(ENetwork::AddressInfo)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvAddressInfo, "Network: TEvAddressInfo") - - NAddr::IRemoteAddrPtr Address; - }; - - struct TEvResolveError - : public TEventLocal<TEvResolveError, ui32(ENetwork::ResolveError)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveError, "Network: TEvResolveError") - - TString Explain; - }; - - struct TEvHTTPStreamStatus - : public TEventLocal<TEvHTTPStreamStatus, ui32(ENetwork::HTTPStreamStatus)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPStreamStatus, - "Network: TEvHTTPStreamStatus") - enum EStatus { - READY, - COMPLETE, - ERROR, - }; - - EStatus Status; - TString Error; - TString HttpHeaders; - }; - - struct TEvHTTPSendContent - : public TEventLocal<TEvHTTPSendContent, ui32(ENetwork::HTTPSendContent)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPSendContent, "Network: TEvHTTPSendContent") - - const char* Data; - size_t Len; - bool Last; - }; - - struct TEvConnectWakeup - : public TEventLocal<TEvConnectWakeup, - ui32(ENetwork::ConnectProtocolWakeup)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvConnectWakeup, "Protocols: TEvConnectWakeup") - }; - - struct TEvHTTPProtocolRetry - : public TEventLocal<TEvHTTPProtocolRetry, - ui32(ENetwork::HTTPProtocolRetry)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPProtocolRetry, - "Protocols: TEvHTTPProtocolRetry") - }; - - struct TEvLoadMessage - : TEventPB<TEvLoadMessage, NActorsInterconnect::TEvLoadMessage, static_cast<ui32>(ENetwork::EvLoadMessage)> { - TEvLoadMessage() = default; - - template <typename TContainer> + : Temporary(temporary) + , Explanation(std::move(explanation)) + { + } + + const EnumHandshakeFail Temporary; + const TString Explanation; + }; + + struct TEvKick: public TEventLocal<TEvKick, ui32(ENetwork::Kick)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvKick, "Network: TEvKick") + }; + + struct TEvFlush: public TEventLocal<TEvFlush, ui32(ENetwork::Flush)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvFlush, "Network: TEvFlush") + }; + + struct TEvLocalNodeInfo + : public TEventLocal<TEvLocalNodeInfo, ui32(ENetwork::NodeInfo)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvLocalNodeInfo, "Network: TEvLocalNodeInfo") + + ui32 NodeId; + NAddr::IRemoteAddrPtr Address; + }; + + struct TEvBunchOfEventsToDestroy : TEventLocal<TEvBunchOfEventsToDestroy, ui32(ENetwork::BunchOfEventsToDestroy)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvBunchOfEventsToDestroy, + "Network: TEvBunchOfEventsToDestroy") + + TEvBunchOfEventsToDestroy(TDeque<TAutoPtr<IEventBase>> events) + : Events(std::move(events)) + { + } + + TDeque<TAutoPtr<IEventBase>> Events; + }; + + struct TEvResolveAddress + : public TEventLocal<TEvResolveAddress, ui32(ENetwork::ResolveAddress)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveAddress, "Network: TEvResolveAddress") + + TString Address; + ui16 Port; + }; + + struct TEvAddressInfo + : public TEventLocal<TEvAddressInfo, ui32(ENetwork::AddressInfo)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvAddressInfo, "Network: TEvAddressInfo") + + NAddr::IRemoteAddrPtr Address; + }; + + struct TEvResolveError + : public TEventLocal<TEvResolveError, ui32(ENetwork::ResolveError)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvResolveError, "Network: TEvResolveError") + + TString Explain; + }; + + struct TEvHTTPStreamStatus + : public TEventLocal<TEvHTTPStreamStatus, ui32(ENetwork::HTTPStreamStatus)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPStreamStatus, + "Network: TEvHTTPStreamStatus") + enum EStatus { + READY, + COMPLETE, + ERROR, + }; + + EStatus Status; + TString Error; + TString HttpHeaders; + }; + + struct TEvHTTPSendContent + : public TEventLocal<TEvHTTPSendContent, ui32(ENetwork::HTTPSendContent)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPSendContent, "Network: TEvHTTPSendContent") + + const char* Data; + size_t Len; + bool Last; + }; + + struct TEvConnectWakeup + : public TEventLocal<TEvConnectWakeup, + ui32(ENetwork::ConnectProtocolWakeup)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvConnectWakeup, "Protocols: TEvConnectWakeup") + }; + + struct TEvHTTPProtocolRetry + : public TEventLocal<TEvHTTPProtocolRetry, + ui32(ENetwork::HTTPProtocolRetry)> { + DEFINE_SIMPLE_LOCAL_EVENT(TEvHTTPProtocolRetry, + "Protocols: TEvHTTPProtocolRetry") + }; + + struct TEvLoadMessage + : TEventPB<TEvLoadMessage, NActorsInterconnect::TEvLoadMessage, static_cast<ui32>(ENetwork::EvLoadMessage)> { + TEvLoadMessage() = default; + + template <typename TContainer> TEvLoadMessage(const TContainer& route, const TString& id, const TString* payload) { for (const TActorId& actorId : route) { - auto* hop = Record.AddHops(); + auto* hop = Record.AddHops(); if (actorId) { ActorIdToProto(actorId, hop->MutableNextHop()); } - } - Record.SetId(id); - if (payload) { + } + Record.SetId(id); + if (payload) { Record.SetPayload(*payload); - } + } } template <typename TContainer> @@ -342,39 +342,39 @@ namespace NActors { Record.SetId(id); AddPayload(std::move(payload)); } - }; + }; - struct TEvUpdateFromInputSession : TEventLocal<TEvUpdateFromInputSession, static_cast<ui32>(ENetwork::EvUpdateFromInputSession)> { - ui64 ConfirmedByInput; // latest Confirm value from processed input packet - ui64 NumDataBytes; + struct TEvUpdateFromInputSession : TEventLocal<TEvUpdateFromInputSession, static_cast<ui32>(ENetwork::EvUpdateFromInputSession)> { + ui64 ConfirmedByInput; // latest Confirm value from processed input packet + ui64 NumDataBytes; TDuration Ping; TEvUpdateFromInputSession(ui64 confirmedByInput, ui64 numDataBytes, TDuration ping) - : ConfirmedByInput(confirmedByInput) - , NumDataBytes(numDataBytes) + : ConfirmedByInput(confirmedByInput) + , NumDataBytes(numDataBytes) , Ping(ping) - { - } - }; + { + } + }; struct TEvConfirmUpdate : TEventLocal<TEvConfirmUpdate, static_cast<ui32>(ENetwork::EvConfirmUpdate)> {}; - struct TEvSessionBufferSizeRequest : TEventLocal<TEvSessionBufferSizeRequest, static_cast<ui32>(ENetwork::EvSessionBufferSizeRequest)> { - //DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Session: TEvSessionBufferSizeRequest") - DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Network: TEvSessionBufferSizeRequest"); - }; + struct TEvSessionBufferSizeRequest : TEventLocal<TEvSessionBufferSizeRequest, static_cast<ui32>(ENetwork::EvSessionBufferSizeRequest)> { + //DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Session: TEvSessionBufferSizeRequest") + DEFINE_SIMPLE_LOCAL_EVENT(TEvSessionBufferSizeRequest, "Network: TEvSessionBufferSizeRequest"); + }; - struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> { + struct TEvSessionBufferSizeResponse : TEventLocal<TEvSessionBufferSizeResponse, static_cast<ui32>(ENetwork::EvSessionBufferSizeResponse)> { TEvSessionBufferSizeResponse(const TActorId& sessionId, ui64 outputBufferSize) - : SessionID(sessionId) - , BufferSize(outputBufferSize) - { - } + : SessionID(sessionId) + , BufferSize(outputBufferSize) + { + } TActorId SessionID; - ui64 BufferSize; - }; + ui64 BufferSize; + }; struct TEvProcessPingRequest : TEventLocal<TEvProcessPingRequest, static_cast<ui32>(ENetwork::EvProcessPingRequest)> { const ui64 Payload; @@ -400,4 +400,4 @@ namespace NActors { {} }; -} +} diff --git a/library/cpp/actors/interconnect/interconnect.h b/library/cpp/actors/interconnect/interconnect.h index 79f1fd1184..225a5243fd 100644 --- a/library/cpp/actors/interconnect/interconnect.h +++ b/library/cpp/actors/interconnect/interconnect.h @@ -6,122 +6,122 @@ #include <util/network/address.h> namespace NActors { - struct TInterconnectGlobalState: public TThrRefBase { - TString SelfAddress; - ui32 SelfPort; + struct TInterconnectGlobalState: public TThrRefBase { + TString SelfAddress; + ui32 SelfPort; TVector<TActorId> GlobalNameservers; // todo: add some info about (like expected reply time) - }; + }; - struct TInterconnectProxySetup: public TThrRefBase { - // synchronous (session -> proxy) - struct IProxy : TNonCopyable { - virtual ~IProxy() { - } + struct TInterconnectProxySetup: public TThrRefBase { + // synchronous (session -> proxy) + struct IProxy : TNonCopyable { + virtual ~IProxy() { + } - virtual void ActivateSession(const TActorContext& ctx) = 0; // session activated - virtual void DetachSession(const TActorContext& ctx) = 0; // session is dead - }; + virtual void ActivateSession(const TActorContext& ctx) = 0; // session activated + virtual void DetachSession(const TActorContext& ctx) = 0; // session is dead + }; - // synchronous (proxy -> session) - struct ISession : TNonCopyable { - virtual ~ISession() { - } + // synchronous (proxy -> session) + struct ISession : TNonCopyable { + virtual ~ISession() { + } - virtual void DetachSession(const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // kill yourself - virtual void ForwardPacket(TAutoPtr<IEventHandle>& ev, const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // receive packet for forward - virtual void Connect(const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // begin connection - virtual bool ReceiveIncomingSession(TAutoPtr<IEventHandle>& ev, const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // handle incoming session, if returns true - then session is dead and must be recreated with new one - }; + virtual void DetachSession(const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // kill yourself + virtual void ForwardPacket(TAutoPtr<IEventHandle>& ev, const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // receive packet for forward + virtual void Connect(const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // begin connection + virtual bool ReceiveIncomingSession(TAutoPtr<IEventHandle>& ev, const TActorContext& ownerCtx, const TActorContext& sessionCtx) = 0; // handle incoming session, if returns true - then session is dead and must be recreated with new one + }; - ui32 DestinationNode; + ui32 DestinationNode; - TString StaticAddress; // if set - would be used as main destination address - int StaticPort; + TString StaticAddress; // if set - would be used as main destination address + int StaticPort; - TIntrusivePtr<TInterconnectGlobalState> GlobalState; + TIntrusivePtr<TInterconnectGlobalState> GlobalState; virtual IActor* CreateSession(const TActorId& ownerId, IProxy* owner) = 0; // returned actor is session and would be attached to same mailbox as proxy to allow sync calls - virtual TActorSetupCmd CreateAcceptor() = 0; - }; + virtual TActorSetupCmd CreateAcceptor() = 0; + }; - struct TNameserverSetup { + struct TNameserverSetup { TActorId ServiceID; - TIntrusivePtr<TInterconnectGlobalState> GlobalState; - }; + TIntrusivePtr<TInterconnectGlobalState> GlobalState; + }; - struct TTableNameserverSetup: public TThrRefBase { - struct TNodeInfo { - TString Address; - TString Host; - TString ResolveHost; - ui16 Port; + struct TTableNameserverSetup: public TThrRefBase { + struct TNodeInfo { + TString Address; + TString Host; + TString ResolveHost; + ui16 Port; TNodeLocation Location; - TString& first; - ui16& second; + TString& first; + ui16& second; - TNodeInfo() - : first(Address) - , second(Port) - { - } + TNodeInfo() + : first(Address) + , second(Port) + { + } TNodeInfo(const TNodeInfo&) = default; - // for testing purposes only - TNodeInfo(const TString& address, const TString& host, ui16 port) - : TNodeInfo() - { - Address = address; - Host = host; - ResolveHost = host; - Port = port; - } - - TNodeInfo(const TString& address, - const TString& host, - const TString& resolveHost, - ui16 port, + // for testing purposes only + TNodeInfo(const TString& address, const TString& host, ui16 port) + : TNodeInfo() + { + Address = address; + Host = host; + ResolveHost = host; + Port = port; + } + + TNodeInfo(const TString& address, + const TString& host, + const TString& resolveHost, + ui16 port, const TNodeLocation& location) - : TNodeInfo() - { - Address = address; - Host = host; - ResolveHost = resolveHost; - Port = port; - Location = location; - } - - // for testing purposes only - TNodeInfo& operator=(const std::pair<TString, ui32>& pr) { - Address = pr.first; - Host = pr.first; - ResolveHost = pr.first; - Port = pr.second; - return *this; - } - - TNodeInfo& operator=(const TNodeInfo& ni) { - Address = ni.Address; - Host = ni.Host; - ResolveHost = ni.ResolveHost; - Port = ni.Port; - Location = ni.Location; - return *this; - } - }; - - TMap<ui32, TNodeInfo> StaticNodeTable; - - bool IsEntriesUnique() const; + : TNodeInfo() + { + Address = address; + Host = host; + ResolveHost = resolveHost; + Port = port; + Location = location; + } + + // for testing purposes only + TNodeInfo& operator=(const std::pair<TString, ui32>& pr) { + Address = pr.first; + Host = pr.first; + ResolveHost = pr.first; + Port = pr.second; + return *this; + } + + TNodeInfo& operator=(const TNodeInfo& ni) { + Address = ni.Address; + Host = ni.Host; + ResolveHost = ni.ResolveHost; + Port = ni.Port; + Location = ni.Location; + return *this; + } + }; + + TMap<ui32, TNodeInfo> StaticNodeTable; + + bool IsEntriesUnique() const; }; - struct TNodeRegistrarSetup { + struct TNodeRegistrarSetup { TActorId ServiceID; - TIntrusivePtr<TInterconnectGlobalState> GlobalState; - }; + TIntrusivePtr<TInterconnectGlobalState> GlobalState; + }; TActorId GetNameserviceActorId(); @@ -129,9 +129,9 @@ namespace NActors { * Const table-lookup based name service */ - IActor* CreateNameserverTable( - const TIntrusivePtr<TTableNameserverSetup>& setup, - ui32 poolId = 0); + IActor* CreateNameserverTable( + const TIntrusivePtr<TTableNameserverSetup>& setup, + ui32 poolId = 0); /** * Name service which can be paired with external discovery service. diff --git a/library/cpp/actors/interconnect/interconnect_address.cpp b/library/cpp/actors/interconnect/interconnect_address.cpp index 8dff6ecb42..8f474f5a39 100644 --- a/library/cpp/actors/interconnect/interconnect_address.cpp +++ b/library/cpp/actors/interconnect/interconnect_address.cpp @@ -9,9 +9,9 @@ #endif namespace NInterconnect { - TAddress::TAddress() { + TAddress::TAddress() { memset(&Addr, 0, sizeof(Addr)); - } + } TAddress::TAddress(NAddr::IRemoteAddr& addr) { socklen_t len = addr.Len(); @@ -19,42 +19,42 @@ namespace NInterconnect { memcpy(&Addr.Generic, addr.Addr(), len); } - int TAddress::GetFamily() const { + int TAddress::GetFamily() const { return Addr.Generic.sa_family; - } + } - socklen_t TAddress::Size() const { + socklen_t TAddress::Size() const { switch (Addr.Generic.sa_family) { - case AF_INET6: + case AF_INET6: return sizeof(sockaddr_in6); - case AF_INET: + case AF_INET: return sizeof(sockaddr_in); - default: - return 0; - } - } + default: + return 0; + } + } - sockaddr* TAddress::SockAddr() { + sockaddr* TAddress::SockAddr() { return &Addr.Generic; } - const sockaddr* TAddress::SockAddr() const { + const sockaddr* TAddress::SockAddr() const { return &Addr.Generic; - } + } - ui16 TAddress::GetPort() const { + ui16 TAddress::GetPort() const { switch (Addr.Generic.sa_family) { - case AF_INET6: + case AF_INET6: return ntohs(Addr.Ipv6.sin6_port); - case AF_INET: + case AF_INET: return ntohs(Addr.Ipv4.sin_port); - default: - return 0; - } - } + default: + return 0; + } + } - TString TAddress::ToString() const { - return GetAddress() + ":" + ::ToString(GetPort()); + TString TAddress::ToString() const { + return GetAddress() + ":" + ::ToString(GetPort()); } TAddress::TAddress(const char* addr, ui16 port) { @@ -63,8 +63,8 @@ namespace NInterconnect { Addr.Ipv6.sin6_port = htons(port); } else if (inet_pton(Addr.Ipv4.sin_family = AF_INET, addr, &Addr.Ipv4.sin_addr)) { Addr.Ipv4.sin_port = htons(port); - } - } + } + } TAddress::TAddress(const TString& addr, ui16 port) : TAddress(addr.data(), port) @@ -75,17 +75,17 @@ namespace NInterconnect { socklen_t size; switch (Addr.Generic.sa_family) { - case AF_INET6: + case AF_INET6: std::tie(src, size) = std::make_tuple(&Addr.Ipv6.sin6_addr, INET6_ADDRSTRLEN); break; - case AF_INET: + case AF_INET: std::tie(src, size) = std::make_tuple(&Addr.Ipv4.sin_addr, INET_ADDRSTRLEN); break; - default: - return TString(); - } + default: + return TString(); + } char *buffer = static_cast<char*>(alloca(size)); const char *p = inet_ntop(Addr.Generic.sa_family, const_cast<void*>(src), buffer, size); diff --git a/library/cpp/actors/interconnect/interconnect_address.h b/library/cpp/actors/interconnect/interconnect_address.h index 9c01381ce3..e9e0faec81 100644 --- a/library/cpp/actors/interconnect/interconnect_address.h +++ b/library/cpp/actors/interconnect/interconnect_address.h @@ -6,24 +6,24 @@ #include <util/generic/string.h> namespace NInterconnect { - class TAddress { + class TAddress { union { sockaddr Generic; sockaddr_in Ipv4; sockaddr_in6 Ipv6; } Addr; - public: - TAddress(); - TAddress(const char* addr, ui16 port); + public: + TAddress(); + TAddress(const char* addr, ui16 port); TAddress(const TString& addr, ui16 port); TAddress(NAddr::IRemoteAddr& addr); - int GetFamily() const; - socklen_t Size() const; - ::sockaddr* SockAddr(); - const ::sockaddr* SockAddr() const; - ui16 GetPort() const; - TString GetAddress() const; - TString ToString() const; + int GetFamily() const; + socklen_t Size() const; + ::sockaddr* SockAddr(); + const ::sockaddr* SockAddr() const; + ui16 GetPort() const; + TString GetAddress() const; + TString ToString() const; }; } diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index bb54667af6..a66ba2a154 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -11,11 +11,11 @@ LWTRACE_USING(ACTORLIB_PROVIDER); namespace NActors { - DECLARE_WILSON_EVENT(EventSentToSocket); - DECLARE_WILSON_EVENT(EventReceivedFromSocket); + DECLARE_WILSON_EVENT(EventSentToSocket); + DECLARE_WILSON_EVENT(EventReceivedFromSocket); bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { - const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr); + const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr); if (task.GetVirtualFreeAmount() < amount) { return false; } @@ -32,8 +32,8 @@ namespace NActors { (ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea()); - part->Channel = ChannelId | TChannelPart::LastPartFlag; - part->Size = sizeof(TEventDescr); + part->Channel = ChannelId | TChannelPart::LastPartFlag; + part->Size = sizeof(TEventDescr); memcpy(part + 1, &event.Descr, sizeof(TEventDescr)); task.AppendBuf(part, amount); *weightConsumed += amount; @@ -41,13 +41,13 @@ namespace NActors { Metrics->UpdateOutputChannelEvents(ChannelId); return true; - } + } void TEventOutputChannel::DropConfirmed(ui64 confirm) { LOG_DEBUG_IC_SESSION("ICOCH98", "Dropping confirmed messages"); for (auto it = NotYetConfirmed.begin(); it != NotYetConfirmed.end() && it->Serial <= confirm; ) { Pool.Release(NotYetConfirmed, it++); - } + } } bool TEventOutputChannel::FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed) { @@ -147,7 +147,7 @@ namespace NActors { NotYetConfirmed.splice(NotYetConfirmed.end(), Queue, Queue.begin()); // move event to not-yet-confirmed queue State = EState::INITIAL; return true; // we have processed whole event, signal to the caller - } + } } } @@ -169,7 +169,7 @@ namespace NActors { Pool.Release(NotYetConfirmed); for (auto& item : Queue) { item.ForwardOnNondelivery(false); - } + } Pool.Release(Queue); } diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index e4a4c89ba5..e4a0ae3cda 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -18,18 +18,18 @@ namespace NActors { #pragma pack(push, 1) - struct TChannelPart { - ui16 Channel; - ui16 Size; + struct TChannelPart { + ui16 Channel; + ui16 Size; - static constexpr ui16 LastPartFlag = ui16(1) << 15; + static constexpr ui16 LastPartFlag = ui16(1) << 15; TString ToString() const { return TStringBuilder() << "{Channel# " << (Channel & ~LastPartFlag) << " LastPartFlag# " << ((Channel & LastPartFlag) ? "true" : "false") << " Size# " << Size << "}"; } - }; + }; #pragma pack(pop) struct TExSerializedEventTooLarge : std::exception { @@ -41,13 +41,13 @@ namespace NActors { }; class TEventOutputChannel : public TInterconnectLoggingBase { - public: + public: TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize, std::shared_ptr<IInterconnectMetrics> metrics, TSessionParams params) : TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId)) - , Pool(pool) + , Pool(pool) , PeerNodeId(peerNodeId) - , ChannelId(id) + , ChannelId(id) , Metrics(std::move(metrics)) , Params(std::move(params)) , MaxSerializedEventSize(maxSerializedEventSize) @@ -61,33 +61,33 @@ namespace NActors { const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr); OutputQueueSize += bytes; return std::make_pair(bytes, &event); - } + } void DropConfirmed(ui64 confirm); bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed); - bool IsEmpty() const { + bool IsEmpty() const { return Queue.empty(); - } + } bool IsWorking() const { return !IsEmpty(); } - ui32 GetQueueSize() const { - return (ui32)Queue.size(); - } + ui32 GetQueueSize() const { + return (ui32)Queue.size(); + } ui64 GetBufferedAmountOfData() const { - return OutputQueueSize; - } + return OutputQueueSize; + } void NotifyUndelivered(); - TEventHolderPool& Pool; + TEventHolderPool& Pool; const ui32 PeerNodeId; - const ui16 ChannelId; + const ui16 ChannelId; std::shared_ptr<IInterconnectMetrics> Metrics; const TSessionParams Params; const ui32 MaxSerializedEventSize; @@ -105,7 +105,7 @@ namespace NActors { static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr); - protected: + protected: ui64 OutputQueueSize = 0; std::list<TEventHolder> Queue; @@ -120,8 +120,8 @@ namespace NActors { if (const ui64 amount = std::exchange(UnaccountedTraffic, 0)) { Metrics->UpdateOutputChannelTraffic(ChannelId, amount); } - } + } - friend class TInterconnectSessionTCP; - }; + friend class TInterconnectSessionTCP; + }; } diff --git a/library/cpp/actors/interconnect/interconnect_common.h b/library/cpp/actors/interconnect/interconnect_common.h index 28c40f46ff..285709a00c 100644 --- a/library/cpp/actors/interconnect/interconnect_common.h +++ b/library/cpp/actors/interconnect/interconnect_common.h @@ -23,17 +23,17 @@ namespace NActors { }; struct TInterconnectSettings { - TDuration Handshake; - TDuration DeadPeer; - TDuration CloseOnIdle; - ui32 SendBufferDieLimitInMB = 0; - ui64 OutputBuffersTotalSizeLimitInMB = 0; - ui32 TotalInflightAmountOfData = 0; - bool MergePerPeerCounters = false; + TDuration Handshake; + TDuration DeadPeer; + TDuration CloseOnIdle; + ui32 SendBufferDieLimitInMB = 0; + ui64 OutputBuffersTotalSizeLimitInMB = 0; + ui32 TotalInflightAmountOfData = 0; + bool MergePerPeerCounters = false; bool MergePerDataCenterCounters = false; - ui32 TCPSocketBufferSize = 0; - TDuration PingPeriod = TDuration::Seconds(3); - TDuration ForceConfirmPeriod = TDuration::Seconds(1); + ui32 TCPSocketBufferSize = 0; + TDuration PingPeriod = TDuration::Seconds(3); + TDuration ForceConfirmPeriod = TDuration::Seconds(1); TDuration LostConnection; TDuration BatchPeriod; bool BindOnAllAddresses = true; @@ -54,40 +54,40 @@ namespace NActors { } return res; } - }; + }; - struct TChannelSettings { - ui16 Weight; - }; + struct TChannelSettings { + ui16 Weight; + }; - typedef TMap<ui16, TChannelSettings> TChannelsConfig; + typedef TMap<ui16, TChannelSettings> TChannelsConfig; - using TRegisterMonPageCallback = std::function<void(const TString& path, const TString& title, + using TRegisterMonPageCallback = std::function<void(const TString& path, const TString& title, TActorSystem* actorSystem, const TActorId& actorId)>; using TInitWhiteboardCallback = std::function<void(ui16 icPort, TActorSystem* actorSystem)>; using TUpdateWhiteboardCallback = std::function<void(const TString& peer, bool connected, bool green, bool yellow, - bool orange, bool red, TActorSystem* actorSystem)>; + bool orange, bool red, TActorSystem* actorSystem)>; - struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> { + struct TInterconnectProxyCommon : TAtomicRefCount<TInterconnectProxyCommon> { TActorId NameserviceId; - NMonitoring::TDynamicCounterPtr MonCounters; + NMonitoring::TDynamicCounterPtr MonCounters; std::shared_ptr<NMonitoring::IMetricRegistry> Metrics; TChannelsConfig ChannelsConfig; TInterconnectSettings Settings; - TRegisterMonPageCallback RegisterMonPage; + TRegisterMonPageCallback RegisterMonPage; TActorId DestructorId; std::shared_ptr<std::atomic<TAtomicBase>> DestructorQueueSize; TAtomicBase MaxDestructorQueueSize = 1024 * 1024 * 1024; - TString ClusterUUID; - TVector<TString> AcceptUUID; + TString ClusterUUID; + TVector<TString> AcceptUUID; ui64 StartTime = GetCycleCountFast(); - TString TechnicalSelfHostName; + TString TechnicalSelfHostName; TInitWhiteboardCallback InitWhiteboard; - TUpdateWhiteboardCallback UpdateWhiteboard; - ui32 HandshakeBallastSize = 0; - TAtomic StartedSessionKiller = 0; + TUpdateWhiteboardCallback UpdateWhiteboard; + ui32 HandshakeBallastSize = 0; + TAtomic StartedSessionKiller = 0; TScopeId LocalScopeId; std::shared_ptr<TEventFilter> EventFilter; TString Cookie; // unique random identifier of a node instance (generated randomly at every start) @@ -101,6 +101,6 @@ namespace NActors { TMaybe<TVersionInfo> VersionInfo; using TPtr = TIntrusivePtr<TInterconnectProxyCommon>; - }; + }; -} +} diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 705d6f90af..9ede998d8e 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -62,15 +62,15 @@ namespace NActors { ui32 Checksum; ui32 Size; - ui32 CalculateChecksum(const void* data, size_t len) const { + ui32 CalculateChecksum(const void* data, size_t len) const { return Crc32cExtendMSanCompatible(Crc32cExtendMSanCompatible(0, &Size, sizeof(Size)), data, len); } - void Sign(const void* data, size_t len) { + void Sign(const void* data, size_t len) { Checksum = CalculateChecksum(data, len); } - bool Check(const void* data, size_t len) const { + bool Check(const void* data, size_t len) const { return Checksum == CalculateChecksum(data, len); } }; @@ -474,7 +474,7 @@ namespace NActors { if (const ui32 size = Common->HandshakeBallastSize) { TString ballast(size, 0); - char* data = ballast.Detach(); + char* data = ballast.Detach(); for (ui32 i = 0; i < size; ++i) { data[i] = i; } @@ -721,7 +721,7 @@ namespace NActors { } } - template <typename T> + template <typename T> void SendExBlock(const T& proto, const char* what) { TString data; Y_PROTOBUF_SUPPRESS_NODISCARD proto.SerializeToString(&data); @@ -760,7 +760,7 @@ namespace NActors { Send(GetActorSystem()->InterconnectProxy(PeerNodeId), ev.Release()); } - template <typename TEvent> + template <typename TEvent> THolder<typename TEvent::THandle> WaitForSpecificEvent(TString state, TInstant deadline = TInstant::Max()) { State = std::move(state); return TActorCoroImpl::WaitForSpecificEvent<TEvent>(deadline); @@ -778,7 +778,7 @@ namespace NActors { return WaitForSpecificEvent<TEvent>(std::move(state)); } - template <typename T1, typename T2, typename... TOther> + template <typename T1, typename T2, typename... TOther> THolder<IEventHandle> AskProxy(THolder<IEventBase> ev, TString state) { SendToProxy(std::move(ev)); return WaitForSpecificEvent<T1, T2, TOther...>(std::move(state)); @@ -930,7 +930,7 @@ namespace NActors { template <typename TDataPtr, typename TSendRecvFunc> void Process(TDataPtr buffer, size_t len, TSendRecvFunc&& sendRecv, bool read, bool write, TString state) { Y_VERIFY(Socket); - NInterconnect::TStreamSocket* sock = Socket.Get(); + NInterconnect::TStreamSocket* sock = Socket.Get(); ssize_t (NInterconnect::TStreamSocket::*pfn)(TDataPtr, size_t, TString*) const = sendRecv; size_t processed = 0; @@ -971,7 +971,7 @@ namespace NActors { return std::move(response->Get()->Node); } - template <typename T> + template <typename T> static THolder<TProgramInfo> GetProgramInfo(const T& proto) { auto programInfo = MakeHolder<TProgramInfo>(); programInfo->PID = proto.GetProgramPID(); diff --git a/library/cpp/actors/interconnect/interconnect_handshake.h b/library/cpp/actors/interconnect/interconnect_handshake.h index 1b957570ad..b3c0db6c5d 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.h +++ b/library/cpp/actors/interconnect/interconnect_handshake.h @@ -10,10 +10,10 @@ #include "events_local.h" namespace NActors { - static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(1); - static constexpr ui64 INTERCONNECT_PROTOCOL_VERSION = 2; + static constexpr TDuration DEFAULT_HANDSHAKE_TIMEOUT = TDuration::Seconds(1); + static constexpr ui64 INTERCONNECT_PROTOCOL_VERSION = 2; - using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>; + using TSocketPtr = TIntrusivePtr<NInterconnect::TStreamSocket>; IActor* CreateOutgoingHandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, diff --git a/library/cpp/actors/interconnect/interconnect_impl.h b/library/cpp/actors/interconnect/interconnect_impl.h index 716bef412e..ee29e4d397 100644 --- a/library/cpp/actors/interconnect/interconnect_impl.h +++ b/library/cpp/actors/interconnect/interconnect_impl.h @@ -1,5 +1,5 @@ #pragma once - + #include "interconnect.h" #include <library/cpp/actors/protos/interconnect.pb.h> #include <library/cpp/actors/core/event_pb.h> @@ -7,39 +7,39 @@ #include <library/cpp/monlib/dynamic_counters/counters.h> namespace NActors { - // resolve node info - struct TEvInterconnect::TEvResolveNode: public TEventPB<TEvInterconnect::TEvResolveNode, NActorsInterconnect::TEvResolveNode, TEvInterconnect::EvResolveNode> { - TEvResolveNode() { - } + // resolve node info + struct TEvInterconnect::TEvResolveNode: public TEventPB<TEvInterconnect::TEvResolveNode, NActorsInterconnect::TEvResolveNode, TEvInterconnect::EvResolveNode> { + TEvResolveNode() { + } TEvResolveNode(ui32 nodeId, TInstant deadline = TInstant::Max()) { - Record.SetNodeId(nodeId); + Record.SetNodeId(nodeId); if (deadline != TInstant::Max()) { Record.SetDeadline(deadline.GetValue()); } - } - }; - - // node info - struct TEvInterconnect::TEvNodeAddress: public TEventPB<TEvInterconnect::TEvNodeAddress, NActorsInterconnect::TEvNodeInfo, TEvInterconnect::EvNodeAddress> { - TEvNodeAddress() { - } - - TEvNodeAddress(ui32 nodeId) { - Record.SetNodeId(nodeId); - } - }; - - // register node - struct TEvInterconnect::TEvRegisterNode: public TEventBase<TEvInterconnect::TEvRegisterNode, TEvInterconnect::EvRegisterNode> { - }; - - // reply on register node - struct TEvInterconnect::TEvRegisterNodeResult: public TEventBase<TEvInterconnect::TEvRegisterNodeResult, TEvInterconnect::EvRegisterNodeResult> { - }; - - // disconnect - struct TEvInterconnect::TEvDisconnect: public TEventLocal<TEvInterconnect::TEvDisconnect, TEvInterconnect::EvDisconnect> { - }; + } + }; + + // node info + struct TEvInterconnect::TEvNodeAddress: public TEventPB<TEvInterconnect::TEvNodeAddress, NActorsInterconnect::TEvNodeInfo, TEvInterconnect::EvNodeAddress> { + TEvNodeAddress() { + } + + TEvNodeAddress(ui32 nodeId) { + Record.SetNodeId(nodeId); + } + }; + + // register node + struct TEvInterconnect::TEvRegisterNode: public TEventBase<TEvInterconnect::TEvRegisterNode, TEvInterconnect::EvRegisterNode> { + }; + + // reply on register node + struct TEvInterconnect::TEvRegisterNodeResult: public TEventBase<TEvInterconnect::TEvRegisterNodeResult, TEvInterconnect::EvRegisterNodeResult> { + }; + + // disconnect + struct TEvInterconnect::TEvDisconnect: public TEventLocal<TEvInterconnect::TEvDisconnect, TEvInterconnect::EvDisconnect> { + }; } diff --git a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp index a2071bbc80..43419bf70d 100644 --- a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp +++ b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp @@ -10,57 +10,57 @@ namespace NActors { class TInterconnectNameserverTable: public TInterconnectNameserverBase<TInterconnectNameserverTable> { - TIntrusivePtr<TTableNameserverSetup> Config; + TIntrusivePtr<TTableNameserverSetup> Config; - public: - static constexpr EActivityType ActorActivityType() { + public: + static constexpr EActivityType ActorActivityType() { return NAMESERVICE; - } + } TInterconnectNameserverTable(const TIntrusivePtr<TTableNameserverSetup>& setup, ui32 /*resolvePoolId*/) : TInterconnectNameserverBase<TInterconnectNameserverTable>(&TInterconnectNameserverTable::StateFunc, setup->StaticNodeTable) - , Config(setup) - { - Y_VERIFY(Config->IsEntriesUnique()); - } + , Config(setup) + { + Y_VERIFY(Config->IsEntriesUnique()); + } - STFUNC(StateFunc) { - try { + STFUNC(StateFunc) { + try { switch (ev->GetTypeRewrite()) { HFunc(TEvInterconnect::TEvResolveNode, Handle); HFunc(TEvResolveAddress, Handle); HFunc(TEvInterconnect::TEvListNodes, Handle); HFunc(TEvInterconnect::TEvGetNode, Handle); - } - } catch (...) { - // on error - do nothing + } + } catch (...) { + // on error - do nothing } } - }; + }; IActor* CreateNameserverTable(const TIntrusivePtr<TTableNameserverSetup>& setup, ui32 poolId) { - return new TInterconnectNameserverTable(setup, poolId); + return new TInterconnectNameserverTable(setup, poolId); } - bool TTableNameserverSetup::IsEntriesUnique() const { - TVector<const TNodeInfo*> infos; - infos.reserve(StaticNodeTable.size()); - for (const auto& x : StaticNodeTable) - infos.push_back(&x.second); + bool TTableNameserverSetup::IsEntriesUnique() const { + TVector<const TNodeInfo*> infos; + infos.reserve(StaticNodeTable.size()); + for (const auto& x : StaticNodeTable) + infos.push_back(&x.second); auto CompareAddressLambda = - [](const TNodeInfo* left, const TNodeInfo* right) { - return left->Port == right->Port ? left->Address < right->Address : left->Port < right->Port; - }; + [](const TNodeInfo* left, const TNodeInfo* right) { + return left->Port == right->Port ? left->Address < right->Address : left->Port < right->Port; + }; Sort(infos, CompareAddressLambda); - for (ui32 idx = 1, end = StaticNodeTable.size(); idx < end; ++idx) { - const TNodeInfo* left = infos[idx - 1]; - const TNodeInfo* right = infos[idx]; + for (ui32 idx = 1, end = StaticNodeTable.size(); idx < end; ++idx) { + const TNodeInfo* left = infos[idx - 1]; + const TNodeInfo* right = infos[idx]; if (left->Address && left->Address == right->Address && left->Port == right->Port) - return false; - } + return false; + } auto CompareHostLambda = [](const TNodeInfo* left, const TNodeInfo* right) { @@ -76,8 +76,8 @@ namespace NActors { return false; } - return true; - } + return true; + } TActorId GetNameserviceActorId() { return TActorId(0, "namesvc"); diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp index 52e4491cec..158ebc9e1d 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.cpp +++ b/library/cpp/actors/interconnect/interconnect_stream.cpp @@ -23,74 +23,74 @@ #include <cerrno> namespace NInterconnect { - namespace { - inline int - LastSocketError() { + namespace { + inline int + LastSocketError() { #if defined(_win_) - return WSAGetLastError(); + return WSAGetLastError(); #else - return errno; + return errno; #endif - } - } - - TSocket::TSocket(SOCKET fd) - : Descriptor(fd) - { - } - - TSocket::~TSocket() { - if (Descriptor == INVALID_SOCKET) { - return; - } - - auto const result = ::closesocket(Descriptor); - if (result == 0) - return; - switch (LastSocketError()) { - case EBADF: - Y_FAIL("Close bad descriptor"); - case EINTR: - break; - case EIO: - Y_FAIL("EIO"); - default: - Y_FAIL("It's something unexpected"); - } - } - - int TSocket::GetDescriptor() { - return Descriptor; - } - - int - TSocket::Bind(const TAddress& addr) const { - const auto ret = ::bind(Descriptor, addr.SockAddr(), addr.Size()); - if (ret < 0) - return -LastSocketError(); - - return 0; - } - - int - TSocket::Shutdown(int how) const { - const auto ret = ::shutdown(Descriptor, how); - if (ret < 0) - return -LastSocketError(); - - return 0; - } - - int TSocket::GetConnectStatus() const { - int err = 0; - socklen_t len = sizeof(err); - if (getsockopt(Descriptor, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&err), &len) == -1) { - err = LastSocketError(); - } - return err; - } - - ///////////////////////////////////////////////////////////////// + } + } + + TSocket::TSocket(SOCKET fd) + : Descriptor(fd) + { + } + + TSocket::~TSocket() { + if (Descriptor == INVALID_SOCKET) { + return; + } + + auto const result = ::closesocket(Descriptor); + if (result == 0) + return; + switch (LastSocketError()) { + case EBADF: + Y_FAIL("Close bad descriptor"); + case EINTR: + break; + case EIO: + Y_FAIL("EIO"); + default: + Y_FAIL("It's something unexpected"); + } + } + + int TSocket::GetDescriptor() { + return Descriptor; + } + + int + TSocket::Bind(const TAddress& addr) const { + const auto ret = ::bind(Descriptor, addr.SockAddr(), addr.Size()); + if (ret < 0) + return -LastSocketError(); + + return 0; + } + + int + TSocket::Shutdown(int how) const { + const auto ret = ::shutdown(Descriptor, how); + if (ret < 0) + return -LastSocketError(); + + return 0; + } + + int TSocket::GetConnectStatus() const { + int err = 0; + socklen_t len = sizeof(err); + if (getsockopt(Descriptor, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&err), &len) == -1) { + err = LastSocketError(); + } + return err; + } + + ///////////////////////////////////////////////////////////////// TIntrusivePtr<TStreamSocket> TStreamSocket::Make(int domain) { const SOCKET res = ::socket(domain, SOCK_STREAM | SOCK_NONBLOCK, 0); @@ -99,106 +99,106 @@ namespace NInterconnect { Y_VERIFY(err != EMFILE && err != ENFILE); } return MakeIntrusive<TStreamSocket>(res); - } + } - TStreamSocket::TStreamSocket(SOCKET fd) - : TSocket(fd) - { - } + TStreamSocket::TStreamSocket(SOCKET fd) + : TSocket(fd) + { + } - ssize_t + ssize_t TStreamSocket::Send(const void* msg, size_t len, TString* /*err*/) const { const auto ret = ::send(Descriptor, static_cast<const char*>(msg), int(len), 0); - if (ret < 0) - return -LastSocketError(); + if (ret < 0) + return -LastSocketError(); - return ret; - } + return ret; + } - ssize_t + ssize_t TStreamSocket::Recv(void* buf, size_t len, TString* /*err*/) const { const auto ret = ::recv(Descriptor, static_cast<char*>(buf), int(len), 0); - if (ret < 0) - return -LastSocketError(); + if (ret < 0) + return -LastSocketError(); - return ret; - } + return ret; + } - ssize_t - TStreamSocket::WriteV(const struct iovec* iov, int iovcnt) const { + ssize_t + TStreamSocket::WriteV(const struct iovec* iov, int iovcnt) const { #ifndef _win_ - const auto ret = ::writev(Descriptor, iov, iovcnt); - if (ret < 0) - return -LastSocketError(); - return ret; + const auto ret = ::writev(Descriptor, iov, iovcnt); + if (ret < 0) + return -LastSocketError(); + return ret; #else Y_FAIL("WriteV() unsupported on Windows"); #endif - } + } - ssize_t - TStreamSocket::ReadV(const struct iovec* iov, int iovcnt) const { + ssize_t + TStreamSocket::ReadV(const struct iovec* iov, int iovcnt) const { #ifndef _win_ - const auto ret = ::readv(Descriptor, iov, iovcnt); - if (ret < 0) - return -LastSocketError(); - return ret; + const auto ret = ::readv(Descriptor, iov, iovcnt); + if (ret < 0) + return -LastSocketError(); + return ret; #else Y_FAIL("ReadV() unsupported on Windows"); #endif - } + } - ssize_t TStreamSocket::GetUnsentQueueSize() const { - int num = -1; + ssize_t TStreamSocket::GetUnsentQueueSize() const { + int num = -1; #ifndef _win_ // we have no means to determine output queue size on Windows - if (ioctl(Descriptor, TIOCOUTQ, &num) == -1) { - num = -1; - } -#endif - return num; - } - - int - TStreamSocket::Connect(const TAddress& addr) const { - const auto ret = ::connect(Descriptor, addr.SockAddr(), addr.Size()); - if (ret < 0) - return -LastSocketError(); - - return ret; - } - - int - TStreamSocket::Connect(const NAddr::IRemoteAddr* addr) const { - const auto ret = ::connect(Descriptor, addr->Addr(), addr->Len()); - if (ret < 0) - return -LastSocketError(); - - return ret; - } - - int - TStreamSocket::Listen(int backlog) const { - const auto ret = ::listen(Descriptor, backlog); - if (ret < 0) - return -LastSocketError(); - - return ret; - } - - int - TStreamSocket::Accept(TAddress& acceptedAddr) const { - socklen_t acceptedSize = sizeof(::sockaddr_in6); - const auto ret = ::accept(Descriptor, acceptedAddr.SockAddr(), &acceptedSize); - if (ret == INVALID_SOCKET) - return -LastSocketError(); - - return ret; - } - - void - TStreamSocket::SetSendBufferSize(i32 len) const { - (void)SetSockOpt(Descriptor, SOL_SOCKET, SO_SNDBUF, len); - } + if (ioctl(Descriptor, TIOCOUTQ, &num) == -1) { + num = -1; + } +#endif + return num; + } + + int + TStreamSocket::Connect(const TAddress& addr) const { + const auto ret = ::connect(Descriptor, addr.SockAddr(), addr.Size()); + if (ret < 0) + return -LastSocketError(); + + return ret; + } + + int + TStreamSocket::Connect(const NAddr::IRemoteAddr* addr) const { + const auto ret = ::connect(Descriptor, addr->Addr(), addr->Len()); + if (ret < 0) + return -LastSocketError(); + + return ret; + } + + int + TStreamSocket::Listen(int backlog) const { + const auto ret = ::listen(Descriptor, backlog); + if (ret < 0) + return -LastSocketError(); + + return ret; + } + + int + TStreamSocket::Accept(TAddress& acceptedAddr) const { + socklen_t acceptedSize = sizeof(::sockaddr_in6); + const auto ret = ::accept(Descriptor, acceptedAddr.SockAddr(), &acceptedSize); + if (ret == INVALID_SOCKET) + return -LastSocketError(); + + return ret; + } + + void + TStreamSocket::SetSendBufferSize(i32 len) const { + (void)SetSockOpt(Descriptor, SOL_SOCKET, SO_SNDBUF, len); + } ui32 TStreamSocket::GetSendBufferSize() const { ui32 res = 0; @@ -206,7 +206,7 @@ namespace NInterconnect { return res; } - ////////////////////////////////////////////////////// + ////////////////////////////////////////////////////// TDatagramSocket::TPtr TDatagramSocket::Make(int domain) { const SOCKET res = ::socket(domain, SOCK_DGRAM, 0); @@ -215,31 +215,31 @@ namespace NInterconnect { Y_VERIFY(err != EMFILE && err != ENFILE); } return std::make_shared<TDatagramSocket>(res); - } - - TDatagramSocket::TDatagramSocket(SOCKET fd) - : TSocket(fd) - { - } - - ssize_t - TDatagramSocket::SendTo(const void* msg, size_t len, const TAddress& toAddr) const { - const auto ret = ::sendto(Descriptor, static_cast<const char*>(msg), int(len), 0, toAddr.SockAddr(), toAddr.Size()); - if (ret < 0) - return -LastSocketError(); - - return ret; - } - - ssize_t - TDatagramSocket::RecvFrom(void* buf, size_t len, TAddress& fromAddr) const { - socklen_t fromSize = sizeof(::sockaddr_in6); - const auto ret = ::recvfrom(Descriptor, static_cast<char*>(buf), int(len), 0, fromAddr.SockAddr(), &fromSize); - if (ret < 0) - return -LastSocketError(); - - return ret; - } + } + + TDatagramSocket::TDatagramSocket(SOCKET fd) + : TSocket(fd) + { + } + + ssize_t + TDatagramSocket::SendTo(const void* msg, size_t len, const TAddress& toAddr) const { + const auto ret = ::sendto(Descriptor, static_cast<const char*>(msg), int(len), 0, toAddr.SockAddr(), toAddr.Size()); + if (ret < 0) + return -LastSocketError(); + + return ret; + } + + ssize_t + TDatagramSocket::RecvFrom(void* buf, size_t len, TAddress& fromAddr) const { + socklen_t fromSize = sizeof(::sockaddr_in6); + const auto ret = ::recvfrom(Descriptor, static_cast<char*>(buf), int(len), 0, fromAddr.SockAddr(), &fromSize); + if (ret < 0) + return -LastSocketError(); + + return ret; + } // deleter for SSL objects diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h index af7176b5f0..074adc6e74 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.h +++ b/library/cpp/actors/interconnect/interconnect_stream.h @@ -15,15 +15,15 @@ #include <sys/uio.h> namespace NInterconnect { - class TSocket: public NActors::TSharedDescriptor, public TNonCopyable { - protected: - TSocket(SOCKET fd); + class TSocket: public NActors::TSharedDescriptor, public TNonCopyable { + protected: + TSocket(SOCKET fd); - virtual ~TSocket() override; + virtual ~TSocket() override; SOCKET Descriptor; - virtual int GetDescriptor() override; + virtual int GetDescriptor() override; private: friend class TSecureSocket; @@ -32,21 +32,21 @@ namespace NInterconnect { return std::exchange(Descriptor, INVALID_SOCKET); } - public: - operator SOCKET() const { - return Descriptor; - } + public: + operator SOCKET() const { + return Descriptor; + } - int Bind(const TAddress& addr) const; - int Shutdown(int how) const; - int GetConnectStatus() const; - }; + int Bind(const TAddress& addr) const; + int Shutdown(int how) const; + int GetConnectStatus() const; + }; - class TStreamSocket: public TSocket { - public: - TStreamSocket(SOCKET fd); + class TStreamSocket: public TSocket { + public: + TStreamSocket(SOCKET fd); - static TIntrusivePtr<TStreamSocket> Make(int domain); + static TIntrusivePtr<TStreamSocket> Make(int domain); virtual ssize_t Send(const void* msg, size_t len, TString *err = nullptr) const; virtual ssize_t Recv(void* buf, size_t len, TString *err = nullptr) const; @@ -54,16 +54,16 @@ namespace NInterconnect { virtual ssize_t WriteV(const struct iovec* iov, int iovcnt) const; virtual ssize_t ReadV(const struct iovec* iov, int iovcnt) const; - int Connect(const TAddress& addr) const; - int Connect(const NAddr::IRemoteAddr* addr) const; - int Listen(int backlog) const; - int Accept(TAddress& acceptedAddr) const; + int Connect(const TAddress& addr) const; + int Connect(const NAddr::IRemoteAddr* addr) const; + int Listen(int backlog) const; + int Accept(TAddress& acceptedAddr) const; - ssize_t GetUnsentQueueSize() const; + ssize_t GetUnsentQueueSize() const; - void SetSendBufferSize(i32 len) const; + void SetSendBufferSize(i32 len) const; ui32 GetSendBufferSize() const; - }; + }; class TSecureSocketContext { class TImpl; @@ -116,16 +116,16 @@ namespace NInterconnect { bool WantWrite() const; }; - class TDatagramSocket: public TSocket { - public: - typedef std::shared_ptr<TDatagramSocket> TPtr; + class TDatagramSocket: public TSocket { + public: + typedef std::shared_ptr<TDatagramSocket> TPtr; - TDatagramSocket(SOCKET fd); + TDatagramSocket(SOCKET fd); - static TPtr Make(int domain); + static TPtr Make(int domain); - ssize_t SendTo(const void* msg, size_t len, const TAddress& toAddr) const; - ssize_t RecvFrom(void* buf, size_t len, TAddress& fromAddr) const; - }; + ssize_t SendTo(const void* msg, size_t len, const TAddress& toAddr) const; + ssize_t RecvFrom(void* buf, size_t len, TAddress& fromAddr) const; + }; } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 19a1ed3f57..0abe9fe659 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -339,7 +339,7 @@ namespace NActors { PreallocateBuffers(); TStackVec<TIoVec, NumPreallocatedBuffers> buffs; - for (const auto& item : Buffers) { + for (const auto& item : Buffers) { TIoVec iov{item->GetBuffer(), item->GetCapacity()}; buffs.push_back(iov); if (Params.Encryption) { @@ -347,7 +347,7 @@ namespace NActors { } } - const struct iovec* iovec = reinterpret_cast<const struct iovec*>(buffs.data()); + const struct iovec* iovec = reinterpret_cast<const struct iovec*>(buffs.data()); int iovcnt = buffs.size(); ssize_t recvres = 0; @@ -473,4 +473,4 @@ namespace NActors { } -} +} diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 65c8ae6fa5..7e2d8ccb94 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -7,17 +7,17 @@ #include <util/system/getpid.h> namespace NActors { - static constexpr TDuration GetNodeRequestTimeout = TDuration::Seconds(5); + static constexpr TDuration GetNodeRequestTimeout = TDuration::Seconds(5); - static constexpr TDuration FirstErrorSleep = TDuration::MilliSeconds(10); - static constexpr TDuration MaxErrorSleep = TDuration::Seconds(10); - static constexpr ui32 SleepRetryMultiplier = 4; + static constexpr TDuration FirstErrorSleep = TDuration::MilliSeconds(10); + static constexpr TDuration MaxErrorSleep = TDuration::Seconds(10); + static constexpr ui32 SleepRetryMultiplier = 4; - static TString PeerNameForHuman(ui32 nodeNum, const TString& longName, ui16 port) { - TStringBuf token; - TStringBuf(longName).NextTok('.', token); - return ToString<ui32>(nodeNum) + ":" + (token.size() > 0 ? TString(token) : longName) + ":" + ToString<ui16>(port); - } + static TString PeerNameForHuman(ui32 nodeNum, const TString& longName, ui16 port) { + TStringBuf token; + TStringBuf(longName).NextTok('.', token); + return ToString<ui32>(nodeNum) + ":" + (token.size() > 0 ? TString(token) : longName) + ":" + ToString<ui16>(port); + } TInterconnectProxyTCP::TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common, IActor **dynamicPtr) @@ -27,14 +27,14 @@ namespace NActors { , Common(std::move(common)) , SecureContext(new NInterconnect::TSecureSocketContext(Common->Settings.Certificate, Common->Settings.PrivateKey, Common->Settings.CaFilePath, Common->Settings.CipherList)) - { + { Y_VERIFY(Common); Y_VERIFY(Common->NameserviceId); if (DynamicPtr) { Y_VERIFY(!*DynamicPtr); *DynamicPtr = this; } - } + } void TInterconnectProxyTCP::Bootstrap() { SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId)); @@ -54,20 +54,20 @@ namespace NActors { TString path = Sprintf("peer%04" PRIu32, PeerNodeId); TString title = Sprintf("Peer #%04" PRIu32, PeerNodeId); mon(path, title, sys, SelfId()); - } + } } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // PendingActivation - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // PendingActivation + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void TInterconnectProxyTCP::RequestNodeInfo(STATEFN_SIG) { ICPROXY_PROFILED; - Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !PendingIncomingHandshakeEvents && !PendingSessionEvents); + Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !PendingIncomingHandshakeEvents && !PendingSessionEvents); EnqueueSessionEvent(ev); StartConfiguring(); - } + } void TInterconnectProxyTCP::RequestNodeInfoForIncomingHandshake(STATEFN_SIG) { ICPROXY_PROFILED; @@ -77,37 +77,37 @@ namespace NActors { EnqueueIncomingHandshakeEvent(ev); StartConfiguring(); } - } + } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // PendingNodeInfo - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // PendingNodeInfo + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void TInterconnectProxyTCP::StartConfiguring() { ICPROXY_PROFILED; - Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor); + Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor); - // issue node info request + // issue node info request Send(Common->NameserviceId, new TEvInterconnect::TEvGetNode(PeerNodeId)); - // arm configure timer; store pointer to event to ensure that we will handle correct one if there were any other - // wakeup events in flight + // arm configure timer; store pointer to event to ensure that we will handle correct one if there were any other + // wakeup events in flight SwitchToState(__LINE__, "PendingNodeInfo", &TThis::PendingNodeInfo, GetNodeRequestTimeout, - ConfigureTimeoutCookie = new TEvents::TEvWakeup); - } + ConfigureTimeoutCookie = new TEvents::TEvWakeup); + } void TInterconnectProxyTCP::Configure(TEvInterconnect::TEvNodeInfo::TPtr& ev) { ICPROXY_PROFILED; Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor && !Session); - if (!ev->Get()->Node) { + if (!ev->Get()->Node) { TransitToErrorState("cannot get node info"); - } else { - auto& info = *ev->Get()->Node; + } else { + auto& info = *ev->Get()->Node; TString name = PeerNameForHuman(PeerNodeId, info.Host, info.Port); - TechnicalPeerHostName = info.Host; + TechnicalPeerHostName = info.Host; if (!Metrics) { Metrics = Common->Metrics ? CreateInterconnectMetrics(Common) : CreateInterconnectCounters(Common); } @@ -122,9 +122,9 @@ namespace NActors { void TInterconnectProxyTCP::ConfigureTimeout(TEvents::TEvWakeup::TPtr& ev) { ICPROXY_PROFILED; - if (ev->Get() == ConfigureTimeoutCookie) { + if (ev->Get() == ConfigureTimeoutCookie) { TransitToErrorState("timed out while waiting for node info"); - } + } } void TInterconnectProxyTCP::ProcessConfigured() { @@ -152,72 +152,72 @@ namespace NActors { void TInterconnectProxyTCP::StartInitialHandshake() { ICPROXY_PROFILED; - // since we are starting initial handshake for some reason, we'll drop any existing handshakes, if any + // since we are starting initial handshake for some reason, we'll drop any existing handshakes, if any DropHandshakes(); - // create and register handshake actor + // create and register handshake actor OutgoingHandshakeActor = Register(CreateOutgoingHandshakeActor(Common, GenerateSessionVirtualId(), TActorId(), PeerNodeId, 0, TechnicalPeerHostName, TSessionParams()), TMailboxType::ReadAsFilled); OutgoingHandshakeActorCreated = TActivationContext::Now(); // prepare for new handshake PrepareNewSessionHandshake(); - } + } void TInterconnectProxyTCP::StartResumeHandshake(ui64 inputCounter) { ICPROXY_PROFILED; - // drop outgoing handshake if we have one; keep incoming handshakes as they may be useful + // drop outgoing handshake if we have one; keep incoming handshakes as they may be useful DropOutgoingHandshake(); - // ensure that we have session - Y_VERIFY(Session); + // ensure that we have session + Y_VERIFY(Session); - // ensure that we have both virtual ids - Y_VERIFY(SessionVirtualId); - Y_VERIFY(RemoteSessionVirtualId); + // ensure that we have both virtual ids + Y_VERIFY(SessionVirtualId); + Y_VERIFY(RemoteSessionVirtualId); - // create and register handshake actor + // create and register handshake actor OutgoingHandshakeActor = Register(CreateOutgoingHandshakeActor(Common, SessionVirtualId, RemoteSessionVirtualId, PeerNodeId, inputCounter, TechnicalPeerHostName, Session->Params), TMailboxType::ReadAsFilled); OutgoingHandshakeActorCreated = TActivationContext::Now(); - } + } void TInterconnectProxyTCP::IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId, THolder<IEventBase> event) { ICPROXY_PROFILED; - Y_VERIFY(!IncomingHandshakeActor); - IncomingHandshakeActor = handshakeId; + Y_VERIFY(!IncomingHandshakeActor); + IncomingHandshakeActor = handshakeId; IncomingHandshakeActorFilledIn = TActivationContext::Now(); Y_VERIFY(!LastSerialFromIncomingHandshake || *LastSerialFromIncomingHandshake <= peerLocalId); LastSerialFromIncomingHandshake = peerLocalId; if (OutgoingHandshakeActor && SelfId().NodeId() < PeerNodeId) { - // Both outgoing and incoming handshake are in progress. To prevent race condition during semultanous handshake - // incoming handshake must be held till outgoing handshake is complete or failed + // Both outgoing and incoming handshake are in progress. To prevent race condition during semultanous handshake + // incoming handshake must be held till outgoing handshake is complete or failed LOG_DEBUG_IC("ICP06", "reply for incoming handshake (actor %s) is held", IncomingHandshakeActor.ToString().data()); - HeldHandshakeReply = std::move(event); + HeldHandshakeReply = std::move(event); - // Check that we are in one of acceptable states that would properly handle handshake statuses. - const auto state = CurrentStateFunc(); - Y_VERIFY(state == &TThis::PendingConnection || state == &TThis::StateWork, "invalid handshake request in state# %s", State); - } else { - LOG_DEBUG_IC("ICP07", "issued incoming handshake reply"); + // Check that we are in one of acceptable states that would properly handle handshake statuses. + const auto state = CurrentStateFunc(); + Y_VERIFY(state == &TThis::PendingConnection || state == &TThis::StateWork, "invalid handshake request in state# %s", State); + } else { + LOG_DEBUG_IC("ICP07", "issued incoming handshake reply"); - // No race, so we can send reply immediately. - Y_VERIFY(!HeldHandshakeReply); + // No race, so we can send reply immediately. + Y_VERIFY(!HeldHandshakeReply); Send(IncomingHandshakeActor, event.Release()); - // Start waiting for handshake reply, if not yet started; also, if session is already created, then we don't - // switch from working state. - if (!Session) { + // Start waiting for handshake reply, if not yet started; also, if session is already created, then we don't + // switch from working state. + if (!Session) { LOG_INFO_IC("ICP08", "No active sessions, becoming PendingConnection"); SwitchToState(__LINE__, "PendingConnection", &TThis::PendingConnection); - } else { - Y_VERIFY(CurrentStateFunc() == &TThis::StateWork); - } + } else { + Y_VERIFY(CurrentStateFunc() == &TThis::StateWork); + } } } @@ -243,8 +243,8 @@ namespace NActors { } else { // if we already have incoming handshake, then terminate existing one DropIncomingHandshake(); - - // issue reply to the sender, possibly holding it while outgoing handshake is at race + + // issue reply to the sender, possibly holding it while outgoing handshake is at race THolder<IEventBase> reply = IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::ProcessHandshakeRequest, ev); return IssueIncomingHandshakeReply(ev->Sender, RemoteSessionVirtualId.LocalId(), std::move(reply)); } @@ -263,27 +263,27 @@ namespace NActors { ui64 remoteStartTime = record.GetProgramStartTime(); ui64 remoteSerial = record.GetSerial(); - if (RemoteProgramInfo && remotePID == RemoteProgramInfo->PID && remoteStartTime == RemoteProgramInfo->StartTime) { - if (remoteSerial < RemoteProgramInfo->Serial) { + if (RemoteProgramInfo && remotePID == RemoteProgramInfo->PID && remoteStartTime == RemoteProgramInfo->StartTime) { + if (remoteSerial < RemoteProgramInfo->Serial) { LOG_INFO_IC("ICP18", "handshake (actor %s) is too old", ev->Sender.ToString().data()); Send(ev->Sender, new TEvents::TEvPoisonPill); - return; - } else { - RemoteProgramInfo->Serial = remoteSerial; - } + return; + } else { + RemoteProgramInfo->Serial = remoteSerial; + } } else { - const auto ptr = new TProgramInfo; - ptr->PID = remotePID; - ptr->StartTime = remoteStartTime; - ptr->Serial = remoteSerial; - RemoteProgramInfo.Reset(ptr); + const auto ptr = new TProgramInfo; + ptr->PID = remotePID; + ptr->StartTime = remoteStartTime; + ptr->Serial = remoteSerial; + RemoteProgramInfo.Reset(ptr); } - /* Let's check peer technical hostname */ + /* Let's check peer technical hostname */ if (record.HasSenderHostName() && TechnicalPeerHostName != record.GetSenderHostName()) { Send(ev->Sender, new TEvHandshakeReplyError("host name mismatch")); - return; - } + return; + } // check sender actor id and check if it is not very old if (LastSerialFromIncomingHandshake) { @@ -303,60 +303,60 @@ namespace NActors { } } - // drop incoming handshake as this is definitely more recent + // drop incoming handshake as this is definitely more recent DropIncomingHandshake(); - // prepare for new session + // prepare for new session PrepareNewSessionHandshake(); - auto event = MakeHolder<TEvHandshakeReplyOK>(); - auto* pb = event->Record.MutableSuccess(); + auto event = MakeHolder<TEvHandshakeReplyOK>(); + auto* pb = event->Record.MutableSuccess(); const TActorId virtualId = GenerateSessionVirtualId(); - pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION); + pb->SetProtocol(INTERCONNECT_PROTOCOL_VERSION); pb->SetSenderActorId(virtualId.ToString()); - pb->SetProgramPID(GetPID()); + pb->SetProgramPID(GetPID()); pb->SetProgramStartTime(Common->StartTime); - pb->SetSerial(virtualId.LocalId()); + pb->SetSerial(virtualId.LocalId()); IssueIncomingHandshakeReply(ev->Sender, 0, std::move(event)); - } + } void TInterconnectProxyTCP::HandleHandshakeStatus(TEvHandshakeDone::TPtr& ev) { ICPROXY_PROFILED; TEvHandshakeDone *msg = ev->Get(); - // Terminate handshake actor working in opposite direction, if set up. - if (ev->Sender == IncomingHandshakeActor) { - LOG_INFO_IC("ICP19", "incoming handshake succeeded"); + // Terminate handshake actor working in opposite direction, if set up. + if (ev->Sender == IncomingHandshakeActor) { + LOG_INFO_IC("ICP19", "incoming handshake succeeded"); DropIncomingHandshake(false); DropOutgoingHandshake(); - } else if (ev->Sender == OutgoingHandshakeActor) { - LOG_INFO_IC("ICP20", "outgoing handshake succeeded"); + } else if (ev->Sender == OutgoingHandshakeActor) { + LOG_INFO_IC("ICP20", "outgoing handshake succeeded"); DropIncomingHandshake(); DropOutgoingHandshake(false); - } else { + } else { /* It seems to be an old handshake. */ - return; + return; } - Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor); + Y_VERIFY(!IncomingHandshakeActor && !OutgoingHandshakeActor); SwitchToState(__LINE__, "StateWork", &TThis::StateWork); - if (Session) { + if (Session) { // this is continuation request, check that virtual ids match Y_VERIFY(SessionVirtualId == msg->Self && RemoteSessionVirtualId == msg->Peer); - } else { + } else { // this is initial request, check that we have virtual ids not filled in Y_VERIFY(!SessionVirtualId && !RemoteSessionVirtualId); - } + } auto error = [&](const char* description) { TransitToErrorState(description); }; - // If session is not created, then create new one. - if (!Session) { + // If session is not created, then create new one. + if (!Session) { RemoteProgramInfo = std::move(msg->ProgramInfo); if (!RemoteProgramInfo) { // we have received resume handshake, but session was closed concurrently while handshaking @@ -374,15 +374,15 @@ namespace NActors { // ensure that we have session local/peer virtual ids Y_VERIFY(Session && SessionVirtualId && RemoteSessionVirtualId); - // Set up new connection for the session. + // Set up new connection for the session. IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::SetNewConnection, ev); - // Reset retry timer - HoldByErrorWakeupDuration = TDuration::Zero(); + // Reset retry timer + HoldByErrorWakeupDuration = TDuration::Zero(); - /* Forward all held events */ + /* Forward all held events */ ProcessPendingSessionEvents(); - } + } void TInterconnectProxyTCP::HandleHandshakeStatus(TEvHandshakeFail::TPtr& ev) { ICPROXY_PROFILED; @@ -392,42 +392,42 @@ namespace NActors { (IncomingHandshakeActor && OutgoingHandshakeActor); LogHandshakeFail(ev, inconclusive); - if (ev->Sender == IncomingHandshakeActor) { - LOG_NOTICE_IC("ICP24", "incoming handshake failed, temporary: %" PRIu32 " explanation: %s outgoing: %s", + if (ev->Sender == IncomingHandshakeActor) { + LOG_NOTICE_IC("ICP24", "incoming handshake failed, temporary: %" PRIu32 " explanation: %s outgoing: %s", ui32(ev->Get()->Temporary), ev->Get()->Explanation.data(), OutgoingHandshakeActor.ToString().data()); DropIncomingHandshake(false); - } else if (ev->Sender == OutgoingHandshakeActor) { - LOG_NOTICE_IC("ICP25", "outgoing handshake failed, temporary: %" PRIu32 " explanation: %s incoming: %s held: %s", + } else if (ev->Sender == OutgoingHandshakeActor) { + LOG_NOTICE_IC("ICP25", "outgoing handshake failed, temporary: %" PRIu32 " explanation: %s incoming: %s held: %s", ui32(ev->Get()->Temporary), ev->Get()->Explanation.data(), IncomingHandshakeActor.ToString().data(), - HeldHandshakeReply ? "yes" : "no"); + HeldHandshakeReply ? "yes" : "no"); DropOutgoingHandshake(false); - if (IEventBase* reply = HeldHandshakeReply.Release()) { - Y_VERIFY(IncomingHandshakeActor); + if (IEventBase* reply = HeldHandshakeReply.Release()) { + Y_VERIFY(IncomingHandshakeActor); LOG_DEBUG_IC("ICP26", "sent held handshake reply to %s", IncomingHandshakeActor.ToString().data()); Send(IncomingHandshakeActor, reply); - } + } // if we have no current session, then we have to drop all pending events as the outgoing handshake has failed ProcessPendingSessionEvents(); - } else { - /* It seems to be an old fail, just ignore it */ + } else { + /* It seems to be an old fail, just ignore it */ LOG_NOTICE_IC("ICP27", "obsolete handshake fail ignored"); - return; + return; } if (Metrics) { Metrics->IncHandshakeFails(); - } + } - if (IncomingHandshakeActor || OutgoingHandshakeActor) { - // one of handshakes is still going on - LOG_DEBUG_IC("ICP28", "other handshake is still going on"); - return; - } + if (IncomingHandshakeActor || OutgoingHandshakeActor) { + // one of handshakes is still going on + LOG_DEBUG_IC("ICP28", "other handshake is still going on"); + return; + } - switch (ev->Get()->Temporary) { - case TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT: + switch (ev->Get()->Temporary) { + case TEvHandshakeFail::HANDSHAKE_FAIL_TRANSIENT: if (!Session) { if (PendingSessionEvents) { // try to start outgoing handshake as we have some events enqueued @@ -444,22 +444,22 @@ namespace NActors { // we have no active connection in that session, so just restart handshake from last known position IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::StartHandshake); } - break; - - case TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH: + break; + + case TEvHandshakeFail::HANDSHAKE_FAIL_SESSION_MISMATCH: StartInitialHandshake(); - break; - - case TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT: + break; + + case TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT: TString timeExplanation = " LastSessionDieTime# " + LastSessionDieTime.ToString(); - if (Session) { + if (Session) { InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason::HandshakeFailPermanent()); } TransitToErrorState(ev->Get()->Explanation + timeExplanation, false); - break; - } - } + break; + } + } void TInterconnectProxyTCP::LogHandshakeFail(TEvHandshakeFail::TPtr& ev, bool inconclusive) { ICPROXY_PROFILED; @@ -487,69 +487,69 @@ namespace NActors { void TInterconnectProxyTCP::ProcessPendingSessionEvents() { ICPROXY_PROFILED; - while (PendingSessionEvents) { + while (PendingSessionEvents) { TPendingSessionEvent ev = std::move(PendingSessionEvents.front()); PendingSessionEventsSize -= ev.Size; TAutoPtr<IEventHandle> event(ev.Event.Release()); - PendingSessionEvents.pop_front(); + PendingSessionEvents.pop_front(); if (Session) { ForwardSessionEventToSession(event); - } else { + } else { DropSessionEvent(event); } - } + } } void TInterconnectProxyTCP::DropSessionEvent(STATEFN_SIG) { ICPROXY_PROFILED; ValidateEvent(ev, "DropSessionEvent"); - switch (ev->GetTypeRewrite()) { - case TEvInterconnect::EvForward: - if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { + switch (ev->GetTypeRewrite()) { + case TEvInterconnect::EvForward: + if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie); - } + } TActivationContext::Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::Disconnected)); - break; + break; - case TEvInterconnect::TEvConnectNode::EventType: - case TEvents::TEvSubscribe::EventType: + case TEvInterconnect::TEvConnectNode::EventType: + case TEvents::TEvSubscribe::EventType: Send(ev->Sender, new TEvInterconnect::TEvNodeDisconnected(PeerNodeId), 0, ev->Cookie); - break; + break; - case TEvents::TEvUnsubscribe::EventType: - /* Do nothing */ - break; + case TEvents::TEvUnsubscribe::EventType: + /* Do nothing */ + break; - default: - Y_FAIL("Unexpected type of event in held event queue"); - } + default: + Y_FAIL("Unexpected type of event in held event queue"); + } } void TInterconnectProxyTCP::UnregisterSession(TInterconnectSessionTCP* session) { ICPROXY_PROFILED; - Y_VERIFY(Session && Session == session && SessionID); + Y_VERIFY(Session && Session == session && SessionID); LOG_INFO_IC("ICP30", "unregister session Session# %s VirtualId# %s", SessionID.ToString().data(), SessionVirtualId.ToString().data()); - Session = nullptr; + Session = nullptr; SessionID = TActorId(); - // drop all pending events as we are closed + // drop all pending events as we are closed ProcessPendingSessionEvents(); - // reset virtual ids as this session is terminated + // reset virtual ids as this session is terminated SessionVirtualId = TActorId(); RemoteSessionVirtualId = TActorId(); if (Metrics) { Metrics->IncSessionDeaths(); - } + } LastSessionDieTime = TActivationContext::Now(); - + if (IncomingHandshakeActor || OutgoingHandshakeActor) { PrepareNewSessionHandshake(); } else { @@ -566,22 +566,22 @@ namespace NActors { PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev); ScheduleCleanupEventQueue(); CleanupEventQueue(); - } + } void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(STATEFN_SIG) { ICPROXY_PROFILED; // enqueue handshake request Y_UNUSED(); - PendingIncomingHandshakeEvents.emplace_back(ev); - } + PendingIncomingHandshakeEvents.emplace_back(ev); + } void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(TEvHandshakeDone::TPtr& /*ev*/) { ICPROXY_PROFILED; // TEvHandshakeDone can't get into the queue, because we have to process handshake request first; this may be the // race with the previous handshakes, so simply ignore it - } + } void TInterconnectProxyTCP::EnqueueIncomingHandshakeEvent(TEvHandshakeFail::TPtr& ev) { ICPROXY_PROFILED; @@ -599,140 +599,140 @@ namespace NActors { break; } } - } + } void TInterconnectProxyTCP::ForwardSessionEventToSession(STATEFN_SIG) { ICPROXY_PROFILED; - Y_VERIFY(Session && SessionID); + Y_VERIFY(Session && SessionID); ValidateEvent(ev, "ForwardSessionEventToSession"); InvokeOtherActor(*Session, &TInterconnectSessionTCP::Receive, ev, TActivationContext::ActorContextFor(SessionID)); - } + } void TInterconnectProxyTCP::GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev) { ICPROXY_PROFILED; LOG_INFO_IC("ICP31", "proxy http called"); - TStringStream str; - - HTML(str) { - DIV_CLASS("panel panel-info") { - DIV_CLASS("panel-heading") { - str << "Proxy"; - } - DIV_CLASS("panel-body") { - TABLE_CLASS("table") { - TABLEHEAD() { - TABLER() { - TABLEH() { - str << "Sensor"; - } - TABLEH() { - str << "Value"; - } - } - } -#define MON_VAR(NAME) \ - TABLER() { \ - TABLED() { \ - str << #NAME; \ - } \ - TABLED() { \ - str << NAME; \ - } \ - } - - TABLEBODY() { + TStringStream str; + + HTML(str) { + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + str << "Proxy"; + } + DIV_CLASS("panel-body") { + TABLE_CLASS("table") { + TABLEHEAD() { + TABLER() { + TABLEH() { + str << "Sensor"; + } + TABLEH() { + str << "Value"; + } + } + } +#define MON_VAR(NAME) \ + TABLER() { \ + TABLED() { \ + str << #NAME; \ + } \ + TABLED() { \ + str << NAME; \ + } \ + } + + TABLEBODY() { MON_VAR(TActivationContext::Now()) - MON_VAR(SessionID) - MON_VAR(LastSessionDieTime) - MON_VAR(IncomingHandshakeActor) - MON_VAR(IncomingHandshakeActorFilledIn) - MON_VAR(IncomingHandshakeActorReset) - MON_VAR(OutgoingHandshakeActor) - MON_VAR(OutgoingHandshakeActorCreated) - MON_VAR(OutgoingHandshakeActorReset) - MON_VAR(State) - MON_VAR(StateSwitchTime) + MON_VAR(SessionID) + MON_VAR(LastSessionDieTime) + MON_VAR(IncomingHandshakeActor) + MON_VAR(IncomingHandshakeActorFilledIn) + MON_VAR(IncomingHandshakeActorReset) + MON_VAR(OutgoingHandshakeActor) + MON_VAR(OutgoingHandshakeActorCreated) + MON_VAR(OutgoingHandshakeActorReset) + MON_VAR(State) + MON_VAR(StateSwitchTime) } } } } - DIV_CLASS("panel panel-info") { - DIV_CLASS("panel-heading") { - str << "Error Log"; - } - DIV_CLASS("panel-body") { - TABLE_CLASS("table") { - TABLEHEAD() { + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + str << "Error Log"; + } + DIV_CLASS("panel-body") { + TABLE_CLASS("table") { + TABLEHEAD() { TABLER() { - TABLEH() { - str << "Timestamp"; + TABLEH() { + str << "Timestamp"; } - TABLEH() { - str << "Elapsed"; + TABLEH() { + str << "Elapsed"; } - TABLEH() { - str << "Kind"; + TABLEH() { + str << "Kind"; } - TABLEH() { - str << "Explanation"; + TABLEH() { + str << "Explanation"; } } } - TABLEBODY() { + TABLEBODY() { const TInstant now = TActivationContext::Now(); - const TInstant barrier = now - TDuration::Minutes(1); - for (auto it = ErrorStateLog.rbegin(); it != ErrorStateLog.rend(); ++it) { - auto wrapper = [&](const auto& lambda) { - if (std::get<0>(*it) > barrier) { - str << "<strong>"; - lambda(); - str << "</strong>"; - } else { - lambda(); - } - }; - TABLER() { - TABLED() { - wrapper([&] { - str << std::get<0>(*it); - }); - } - TABLED() { - wrapper([&] { - str << now - std::get<0>(*it); - }); - } - TABLED() { - wrapper([&] { - str << std::get<1>(*it); - }); - } - TABLED() { - wrapper([&] { - str << std::get<2>(*it); - }); + const TInstant barrier = now - TDuration::Minutes(1); + for (auto it = ErrorStateLog.rbegin(); it != ErrorStateLog.rend(); ++it) { + auto wrapper = [&](const auto& lambda) { + if (std::get<0>(*it) > barrier) { + str << "<strong>"; + lambda(); + str << "</strong>"; + } else { + lambda(); + } + }; + TABLER() { + TABLED() { + wrapper([&] { + str << std::get<0>(*it); + }); + } + TABLED() { + wrapper([&] { + str << now - std::get<0>(*it); + }); + } + TABLED() { + wrapper([&] { + str << std::get<1>(*it); + }); + } + TABLED() { + wrapper([&] { + str << std::get<2>(*it); + }); ui32 rep = std::get<3>(*it); if (rep != 1) { str << " <strong>x" << rep << "</strong>"; } - } - } - } - } + } + } + } + } } } } } - if (Session != nullptr) { + if (Session != nullptr) { Session->GenerateHttpInfo(str); - } - + } + Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str())); } @@ -746,16 +746,16 @@ namespace NActors { UpdateErrorStateLog(TActivationContext::Now(), "permanent conclusive", explanation); } - Y_VERIFY(Session == nullptr); - Y_VERIFY(!SessionID); + Y_VERIFY(Session == nullptr); + Y_VERIFY(!SessionID); - // recalculate wakeup timeout -- if this is the first failure, then we sleep for default timeout; otherwise we - // sleep N times longer than the previous try, but not longer than desired number of seconds - HoldByErrorWakeupDuration = HoldByErrorWakeupDuration != TDuration::Zero() - ? Min(HoldByErrorWakeupDuration * SleepRetryMultiplier, MaxErrorSleep) - : FirstErrorSleep; + // recalculate wakeup timeout -- if this is the first failure, then we sleep for default timeout; otherwise we + // sleep N times longer than the previous try, but not longer than desired number of seconds + HoldByErrorWakeupDuration = HoldByErrorWakeupDuration != TDuration::Zero() + ? Min(HoldByErrorWakeupDuration * SleepRetryMultiplier, MaxErrorSleep) + : FirstErrorSleep; - // transit to required state and arm wakeup timer + // transit to required state and arm wakeup timer if (Terminated) { // switch to this state permanently SwitchToState(__LINE__, "HoldByError", &TThis::HoldByError); @@ -765,21 +765,21 @@ namespace NActors { HoldByErrorWakeupCookie = new TEvents::TEvWakeup); } - /* Process all pending events. */ + /* Process all pending events. */ ProcessPendingSessionEvents(); - /* Terminate handshakes */ + /* Terminate handshakes */ DropHandshakes(); - /* Terminate pending incoming handshake requests. */ - for (auto& ev : PendingIncomingHandshakeEvents) { + /* Terminate pending incoming handshake requests. */ + for (auto& ev : PendingIncomingHandshakeEvents) { Send(ev->Sender, new TEvents::TEvPoisonPill); if (ev->GetTypeRewrite() == TEvHandshakeFail::EventType) { TEvHandshakeFail::TPtr tmp(static_cast<TEventHandle<TEvHandshakeFail>*>(ev.Release())); LogHandshakeFail(tmp, true); } - } - PendingIncomingHandshakeEvents.clear(); + } + PendingIncomingHandshakeEvents.clear(); } void TInterconnectProxyTCP::WakeupFromErrorState(TEvents::TEvWakeup::TPtr& ev) { @@ -787,39 +787,39 @@ namespace NActors { LOG_INFO_IC("ICP33", "wake up from error state"); - if (ev->Get() == HoldByErrorWakeupCookie) { + if (ev->Get() == HoldByErrorWakeupCookie) { SwitchToInitialState(); - } + } } void TInterconnectProxyTCP::Disconnect() { ICPROXY_PROFILED; - // terminate handshakes (if any) + // terminate handshakes (if any) DropHandshakes(); - if (Session) { + if (Session) { IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason::UserRequest()); - } else { + } else { TransitToErrorState("forced disconnect"); - } + } } void TInterconnectProxyTCP::ScheduleCleanupEventQueue() { ICPROXY_PROFILED; - if (!CleanupEventQueueScheduled && PendingSessionEvents) { + if (!CleanupEventQueueScheduled && PendingSessionEvents) { // apply batching at 50 ms granularity Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue); - CleanupEventQueueScheduled = true; - } + CleanupEventQueueScheduled = true; + } } void TInterconnectProxyTCP::HandleCleanupEventQueue() { ICPROXY_PROFILED; - Y_VERIFY(CleanupEventQueueScheduled); - CleanupEventQueueScheduled = false; + Y_VERIFY(CleanupEventQueueScheduled); + CleanupEventQueueScheduled = false; CleanupEventQueue(); ScheduleCleanupEventQueue(); } @@ -838,24 +838,24 @@ namespace NActors { } else { break; } - } + } } void TInterconnectProxyTCP::HandleClosePeerSocket() { ICPROXY_PROFILED; - if (Session && Session->Socket) { + if (Session && Session->Socket) { LOG_INFO_IC("ICP34", "closed connection by debug command"); - Session->Socket->Shutdown(SHUT_RDWR); - } + Session->Socket->Shutdown(SHUT_RDWR); + } } void TInterconnectProxyTCP::HandleCloseInputSession() { ICPROXY_PROFILED; - if (Session) { + if (Session) { IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::CloseInputSession); - } + } } void TInterconnectProxyTCP::HandlePoisonSession() { @@ -869,11 +869,11 @@ namespace NActors { void TInterconnectProxyTCP::HandleSessionBufferSizeRequest(TEvSessionBufferSizeRequest::TPtr& ev) { ICPROXY_PROFILED; - ui64 bufSize = 0; - if (Session) { - bufSize = Session->TotalOutputQueueSize; - } - + ui64 bufSize = 0; + if (Session) { + bufSize = Session->TotalOutputQueueSize; + } + Send(ev->Sender, new TEvSessionBufferSizeResponse(SessionID, bufSize)); } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index 1f9ce6841c..023e5bd1ee 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -16,22 +16,22 @@ namespace NActors { - /* WARNING: all proxy actors should be alive during actorsystem activity */ + /* WARNING: all proxy actors should be alive during actorsystem activity */ class TInterconnectProxyTCP : public TActor<TInterconnectProxyTCP> , public TInterconnectLoggingBase , public TProfiled { - enum { - EvCleanupEventQueue = EventSpaceBegin(TEvents::ES_PRIVATE), + enum { + EvCleanupEventQueue = EventSpaceBegin(TEvents::ES_PRIVATE), EvQueryStats, EvStats, EvPassAwayIfNeeded, - }; + }; - struct TEvCleanupEventQueue : TEventLocal<TEvCleanupEventQueue, EvCleanupEventQueue> {}; + struct TEvCleanupEventQueue : TEventLocal<TEvCleanupEventQueue, EvCleanupEventQueue> {}; - public: + public: struct TEvQueryStats : TEventLocal<TEvQueryStats, EvQueryStats> {}; struct TProxyStats { @@ -56,9 +56,9 @@ namespace NActors { TProxyStats ProxyStats; }; - static constexpr EActivityType ActorActivityType() { - return INTERCONNECT_PROXY_TCP; - } + static constexpr EActivityType ActorActivityType() { + return INTERCONNECT_PROXY_TCP; + } TInterconnectProxyTCP(const ui32 node, TInterconnectProxyCommon::TPtr common, IActor **dynamicPtr = nullptr); @@ -72,11 +72,11 @@ namespace NActors { void Bootstrap(); void Registered(TActorSystem* sys, const TActorId& owner) override; - private: - friend class TInterconnectSessionTCP; - friend class TInterconnectSessionTCPv0; - friend class THandshake; - friend class TInputSessionTCP; + private: + friend class TInterconnectSessionTCP; + friend class TInterconnectSessionTCPv0; + friend class THandshake; + friend class TInputSessionTCP; void UnregisterSession(TInterconnectSessionTCP* session); @@ -138,46 +138,46 @@ namespace NActors { } \ } - template <typename T> + template <typename T> void Ignore(T& /*ev*/) { ICPROXY_PROFILED; - } + } void Ignore() { ICPROXY_PROFILED; - } + } void Ignore(TEvHandshakeDone::TPtr& ev) { ICPROXY_PROFILED; - Y_VERIFY(ev->Sender != IncomingHandshakeActor); - Y_VERIFY(ev->Sender != OutgoingHandshakeActor); - } + Y_VERIFY(ev->Sender != IncomingHandshakeActor); + Y_VERIFY(ev->Sender != OutgoingHandshakeActor); + } void Ignore(TEvHandshakeFail::TPtr& ev) { ICPROXY_PROFILED; - Y_VERIFY(ev->Sender != IncomingHandshakeActor); - Y_VERIFY(ev->Sender != OutgoingHandshakeActor); + Y_VERIFY(ev->Sender != IncomingHandshakeActor); + Y_VERIFY(ev->Sender != OutgoingHandshakeActor); LogHandshakeFail(ev, true); - } + } - const char* State = nullptr; - TInstant StateSwitchTime; + const char* State = nullptr; + TInstant StateSwitchTime; - template <typename... TArgs> + template <typename... TArgs> void SwitchToState(int line, const char* name, TArgs&&... args) { ICPROXY_PROFILED; LOG_DEBUG_IC("ICP77", "@%d %s -> %s", line, State, name); - State = name; + State = name; StateSwitchTime = TActivationContext::Now(); - Become(std::forward<TArgs>(args)...); + Become(std::forward<TArgs>(args)...); Y_VERIFY(!Terminated || CurrentStateFunc() == &TThis::HoldByError); // ensure we never escape this state if (CurrentStateFunc() != &TThis::PendingActivation) { PassAwayTimestamp = TInstant::Max(); } - } + } TInstant PassAwayTimestamp; bool PassAwayScheduled = false; @@ -194,7 +194,7 @@ namespace NActors { {}, nullptr, 0)); PassAwayScheduled = true; } - } + } void HandlePassAwayIfNeeded() { Y_VERIFY(PassAwayScheduled); @@ -203,92 +203,92 @@ namespace NActors { } } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // PendingActivation - // - // In this state we are just waiting for some activities, which may include: - // * an external Session event - // * incoming handshake request - // - // Upon receiving such event, we put it to corresponding queue and initiate start up by calling IssueGetNodeRequest, - // which, as the name says, issued TEvGetNode to the nameservice and arms timer to handle timeout (which should not - // occur, but we want to be sure we don't hang on this), and then switches to PendingNodeInfo state. - - PROXY_STFUNC(PendingActivation, - RequestNodeInfo, // Session events - RequestNodeInfoForIncomingHandshake, // Incoming handshake requests - Ignore, // Handshake status - Ignore, // Disconnect request - Ignore, // Wakeup - Ignore // Node info + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // PendingActivation + // + // In this state we are just waiting for some activities, which may include: + // * an external Session event + // * incoming handshake request + // + // Upon receiving such event, we put it to corresponding queue and initiate start up by calling IssueGetNodeRequest, + // which, as the name says, issued TEvGetNode to the nameservice and arms timer to handle timeout (which should not + // occur, but we want to be sure we don't hang on this), and then switches to PendingNodeInfo state. + + PROXY_STFUNC(PendingActivation, + RequestNodeInfo, // Session events + RequestNodeInfoForIncomingHandshake, // Incoming handshake requests + Ignore, // Handshake status + Ignore, // Disconnect request + Ignore, // Wakeup + Ignore // Node info ) - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // PendingNodeInfo - // - // This state is entered when we asked nameserver to provide description for peer node we are working with. All - // external Session events and incoming handshake requests are enqueued into their respective queues, TEvNodeInfo - // is main event that triggers processing. On success, we try to initiate outgoing handshake if needed, or process - // incoming handshakes. On error, we enter HoldByError state. - // - // NOTE: handshake status events are also enqueued as the handshake actor may have generated failure event due to - // timeout or some other reason without waiting for acknowledge, and it must be processed correctly to prevent - // session hang - - PROXY_STFUNC(PendingNodeInfo, - EnqueueSessionEvent, // Session events - EnqueueIncomingHandshakeEvent, // Incoming handshake requests - EnqueueIncomingHandshakeEvent, // Handshake status - Disconnect, // Disconnect request - ConfigureTimeout, // Wakeup - Configure // Node info + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // PendingNodeInfo + // + // This state is entered when we asked nameserver to provide description for peer node we are working with. All + // external Session events and incoming handshake requests are enqueued into their respective queues, TEvNodeInfo + // is main event that triggers processing. On success, we try to initiate outgoing handshake if needed, or process + // incoming handshakes. On error, we enter HoldByError state. + // + // NOTE: handshake status events are also enqueued as the handshake actor may have generated failure event due to + // timeout or some other reason without waiting for acknowledge, and it must be processed correctly to prevent + // session hang + + PROXY_STFUNC(PendingNodeInfo, + EnqueueSessionEvent, // Session events + EnqueueIncomingHandshakeEvent, // Incoming handshake requests + EnqueueIncomingHandshakeEvent, // Handshake status + Disconnect, // Disconnect request + ConfigureTimeout, // Wakeup + Configure // Node info ) - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // PendingConnection - // - // Here we have issued outgoing handshake or have accepted (or may be both) incoming handshake and we are waiting for - // the status of the handshake. When one if handshakes finishes, we use this status to establish connection (or to - // go to error state). When one handshake terminates with error while other is running, we will still wait for the - // second one to finish. - - PROXY_STFUNC(PendingConnection, - EnqueueSessionEvent, // Session events - IncomingHandshake, // Incoming handshake requests - HandleHandshakeStatus, // Handshake status - Disconnect, // Disconnect request - Ignore, // Wakeup - Ignore // Node info + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // PendingConnection + // + // Here we have issued outgoing handshake or have accepted (or may be both) incoming handshake and we are waiting for + // the status of the handshake. When one if handshakes finishes, we use this status to establish connection (or to + // go to error state). When one handshake terminates with error while other is running, we will still wait for the + // second one to finish. + + PROXY_STFUNC(PendingConnection, + EnqueueSessionEvent, // Session events + IncomingHandshake, // Incoming handshake requests + HandleHandshakeStatus, // Handshake status + Disconnect, // Disconnect request + Ignore, // Wakeup + Ignore // Node info ) - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // StateWork - // - // We have accepted session and process any incoming messages with the session. Incoming handshakes are accepted - // concurrently and applied when finished. - - PROXY_STFUNC(StateWork, - ForwardSessionEventToSession, // Session events - IncomingHandshake, // Incoming handshake requests - HandleHandshakeStatus, // Handshake status - Disconnect, // Disconnect request - Ignore, // Wakeup - Ignore // Node info + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // StateWork + // + // We have accepted session and process any incoming messages with the session. Incoming handshakes are accepted + // concurrently and applied when finished. + + PROXY_STFUNC(StateWork, + ForwardSessionEventToSession, // Session events + IncomingHandshake, // Incoming handshake requests + HandleHandshakeStatus, // Handshake status + Disconnect, // Disconnect request + Ignore, // Wakeup + Ignore // Node info ) - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // HoldByError - // - // When something bad happens with the connection, we sleep in this state. After wake up we go back to - // PendingActivation. - - PROXY_STFUNC(HoldByError, - DropSessionEvent, // Session events - RequestNodeInfoForIncomingHandshake, // Incoming handshake requests - Ignore, // Handshake status - Ignore, // Disconnect request - WakeupFromErrorState, // Wakeup - Ignore // Node info + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // HoldByError + // + // When something bad happens with the connection, we sleep in this state. After wake up we go back to + // PendingActivation. + + PROXY_STFUNC(HoldByError, + DropSessionEvent, // Session events + RequestNodeInfoForIncomingHandshake, // Incoming handshake requests + Ignore, // Handshake status + Ignore, // Disconnect request + WakeupFromErrorState, // Wakeup + Ignore // Node info ) #undef SESSION_EVENTS @@ -317,19 +317,19 @@ namespace NActors { void StartInitialHandshake(); void StartResumeHandshake(ui64 inputCounter); - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Incoming handshake event queue processing - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Incoming handshake event queue processing + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void EnqueueIncomingHandshakeEvent(STATEFN_SIG); void EnqueueIncomingHandshakeEvent(TEvHandshakeDone::TPtr& ev); void EnqueueIncomingHandshakeEvent(TEvHandshakeFail::TPtr& ev); - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // PendingNodeInfo - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // PendingNodeInfo + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - IEventBase* ConfigureTimeoutCookie; // pointer to the scheduled event used to match sent and received events + IEventBase* ConfigureTimeoutCookie; // pointer to the scheduled event used to match sent and received events void StartConfiguring(); void Configure(TEvInterconnect::TEvNodeInfo::TPtr& ev); @@ -343,7 +343,7 @@ namespace NActors { void WakeupFromErrorState(TEvents::TEvWakeup::TPtr& ev); void Disconnect(); - const ui32 PeerNodeId; + const ui32 PeerNodeId; IActor **DynamicPtr; void ValidateEvent(TAutoPtr<IEventHandle>& ev, const char* func) { @@ -362,15 +362,15 @@ namespace NActors { } // Common with helpers - // All proxy actors share the same information in the object - // read only + // All proxy actors share the same information in the object + // read only TInterconnectProxyCommon::TPtr const Common; const TActorId& GetNameserviceId() const { return Common->NameserviceId; - } + } - TString TechnicalPeerHostName; + TString TechnicalPeerHostName; std::shared_ptr<IInterconnectMetrics> Metrics; @@ -380,12 +380,12 @@ namespace NActors { void HandleSessionBufferSizeRequest(TEvSessionBufferSizeRequest::TPtr& ev); - bool CleanupEventQueueScheduled = false; + bool CleanupEventQueueScheduled = false; void ScheduleCleanupEventQueue(); void HandleCleanupEventQueue(); void CleanupEventQueue(); - // hold all events before connection is established + // hold all events before connection is established struct TPendingSessionEvent { TInstant Deadline; ui32 Size; @@ -402,12 +402,12 @@ namespace NActors { void ProcessPendingSessionEvents(); void DropSessionEvent(STATEFN_SIG); - TInterconnectSessionTCP* Session = nullptr; + TInterconnectSessionTCP* Session = nullptr; TActorId SessionID; - // virtual ids used during handshake to check if it is the connection - // for the same session or to find out the latest shandshake - // it's virtual because session actor apears after successfull handshake + // virtual ids used during handshake to check if it is the connection + // for the same session or to find out the latest shandshake + // it's virtual because session actor apears after successfull handshake TActorId SessionVirtualId; TActorId RemoteSessionVirtualId; @@ -416,27 +416,27 @@ namespace NActors { const ui64 localId = TlsActivationContext->ExecutorThread.ActorSystem->AllocateIDSpace(1); return NActors::TActorId(SelfId().NodeId(), 0, localId, 0); - } + } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// TActorId IncomingHandshakeActor; - TInstant IncomingHandshakeActorFilledIn; - TInstant IncomingHandshakeActorReset; + TInstant IncomingHandshakeActorFilledIn; + TInstant IncomingHandshakeActorReset; TMaybe<ui64> LastSerialFromIncomingHandshake; - THolder<IEventBase> HeldHandshakeReply; + THolder<IEventBase> HeldHandshakeReply; void DropIncomingHandshake(bool poison = true) { ICPROXY_PROFILED; if (const TActorId& actorId = std::exchange(IncomingHandshakeActor, TActorId())) { LOG_DEBUG_IC("ICP111", "dropped incoming handshake: %s poison: %s", actorId.ToString().data(), - poison ? "true" : "false"); - if (poison) { + poison ? "true" : "false"); + if (poison) { Send(actorId, new TEvents::TEvPoisonPill); - } + } LastSerialFromIncomingHandshake.Clear(); - HeldHandshakeReply.Reset(); + HeldHandshakeReply.Reset(); IncomingHandshakeActorReset = TActivationContext::Now(); } } @@ -446,10 +446,10 @@ namespace NActors { if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) { LOG_DEBUG_IC("ICP112", "dropped outgoing handshake: %s poison: %s", actorId.ToString().data(), - poison ? "true" : "false"); - if (poison) { + poison ? "true" : "false"); + if (poison) { Send(actorId, new TEvents::TEvPoisonPill); - } + } OutgoingHandshakeActorReset = TActivationContext::Now(); } } @@ -464,38 +464,38 @@ namespace NActors { void PrepareNewSessionHandshake() { ICPROXY_PROFILED; - // drop existing session if we have one - if (Session) { - LOG_INFO_IC("ICP04", "terminating current session as we are negotiating a new one"); + // drop existing session if we have one + if (Session) { + LOG_INFO_IC("ICP04", "terminating current session as we are negotiating a new one"); IActor::InvokeOtherActor(*Session, &TInterconnectSessionTCP::Terminate, TDisconnectReason::NewSession()); - } + } - // ensure we have no current session - Y_VERIFY(!Session); + // ensure we have no current session + Y_VERIFY(!Session); - // switch to pending connection state -- we wait for handshakes, we want more handshakes! + // switch to pending connection state -- we wait for handshakes, we want more handshakes! SwitchToState(__LINE__, "PendingConnection", &TThis::PendingConnection); - } + } void IssueIncomingHandshakeReply(const TActorId& handshakeId, ui64 peerLocalId, - THolder<IEventBase> event); + THolder<IEventBase> event); - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// TActorId OutgoingHandshakeActor; - TInstant OutgoingHandshakeActorCreated; - TInstant OutgoingHandshakeActorReset; + TInstant OutgoingHandshakeActorCreated; + TInstant OutgoingHandshakeActorReset; - TInstant LastSessionDieTime; + TInstant LastSessionDieTime; void GenerateHttpInfo(NMon::TEvHttpInfo::TPtr& ev); void Handle(TEvQueryStats::TPtr& ev); - TDuration HoldByErrorWakeupDuration = TDuration::Zero(); - TEvents::TEvWakeup* HoldByErrorWakeupCookie; + TDuration HoldByErrorWakeupDuration = TDuration::Zero(); + TEvents::TEvWakeup* HoldByErrorWakeupCookie; - THolder<TProgramInfo> RemoteProgramInfo; + THolder<TProgramInfo> RemoteProgramInfo; NInterconnect::TSecureSocketContext::TPtr SecureContext; void Handle(TEvGetSecureSocket::TPtr ev) { @@ -503,11 +503,11 @@ namespace NActors { Send(ev->Sender, new TEvSecureSocket(std::move(socket))); } - TDeque<THolder<IEventHandle>> PendingIncomingHandshakeEvents; + TDeque<THolder<IEventHandle>> PendingIncomingHandshakeEvents; TDeque<std::tuple<TInstant, TString, TString, ui32>> ErrorStateLog; - - void UpdateErrorStateLog(TInstant now, TString kind, TString explanation) { + + void UpdateErrorStateLog(TInstant now, TString kind, TString explanation) { ICPROXY_PROFILED; if (ErrorStateLog) { @@ -521,9 +521,9 @@ namespace NActors { } ErrorStateLog.emplace_back(now, std::move(kind), std::move(explanation), 1); - if (ErrorStateLog.size() > 20) { - ErrorStateLog.pop_front(); - } + if (ErrorStateLog.size() > 20) { + ErrorStateLog.pop_front(); + } } void LogHandshakeFail(TEvHandshakeFail::TPtr& ev, bool inconclusive); @@ -532,6 +532,6 @@ namespace NActors { void HandleTerminate(); void PassAway() override; - }; + }; } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index 31bac08b29..b95c994598 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -8,15 +8,15 @@ namespace NActors { TInterconnectListenerTCP::TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket) - : TActor(&TThis::Initial) + : TActor(&TThis::Initial) , TInterconnectLoggingBase(Sprintf("ICListener: %s", SelfId().ToString().data())) - , Address(address.c_str(), port) + , Address(address.c_str(), port) , Listener( socket ? new NInterconnect::TStreamSocket(*socket) : nullptr) , ExternalSocket(!!Listener) - , ProxyCommonCtx(std::move(common)) + , ProxyCommonCtx(std::move(common)) { if (ExternalSocket) { SetNonBlock(*Listener); @@ -24,13 +24,13 @@ namespace NActors { } TAutoPtr<IEventHandle> TInterconnectListenerTCP::AfterRegister(const TActorId& self, const TActorId& parentId) { - return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0); - } + return new IEventHandle(self, parentId, new TEvents::TEvBootstrap, 0); + } - void TInterconnectListenerTCP::Die(const TActorContext& ctx) { + void TInterconnectListenerTCP::Die(const TActorContext& ctx) { LOG_DEBUG_IC("ICL08", "Dying"); - TActor::Die(ctx); - } + TActor::Die(ctx); + } int TInterconnectListenerTCP::Bind() { NInterconnect::TAddress addr = Address; @@ -57,7 +57,7 @@ namespace NActors { Listener = NInterconnect::TStreamSocket::Make(addr.GetFamily()); if (*Listener == -1) { return errno; - } + } SetNonBlock(*Listener); Listener->SetSendBufferSize(ProxyCommonCtx->Settings.GetSendBufferSize()); // TODO(alexvru): WTF? SetSockOpt(*Listener, SOL_SOCKET, SO_REUSEADDR, 1); @@ -67,7 +67,7 @@ namespace NActors { return e; } else { return 0; - } + } } void TInterconnectListenerTCP::Bootstrap(const TActorContext& ctx) { @@ -78,7 +78,7 @@ namespace NActors { Become(&TThis::Initial, TDuration::Seconds(1), new TEvents::TEvBootstrap); return; } - } + } if (const auto& callback = ProxyCommonCtx->InitWhiteboard) { callback(Address.GetPort(), TlsActivationContext->ExecutorThread.ActorSystem); } @@ -112,6 +112,6 @@ namespace NActors { } break; } - } + } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.h b/library/cpp/actors/interconnect/interconnect_tcp_server.h index 49e9f43dd6..fc71073c2d 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.h @@ -9,46 +9,46 @@ #include "events_local.h" namespace NActors { - class TInterconnectListenerTCP: public TActor<TInterconnectListenerTCP>, public TInterconnectLoggingBase { - public: - static constexpr EActivityType ActorActivityType() { + class TInterconnectListenerTCP: public TActor<TInterconnectListenerTCP>, public TInterconnectLoggingBase { + public: + static constexpr EActivityType ActorActivityType() { return INTERCONNECT_COMMON; - } + } TInterconnectListenerTCP(const TString& address, ui16 port, TInterconnectProxyCommon::TPtr common, const TMaybe<SOCKET>& socket = Nothing()); int Bind(); - private: - STFUNC(Initial) { - switch (ev->GetTypeRewrite()) { - CFunc(TEvents::TEvBootstrap::EventType, Bootstrap); - CFunc(TEvents::TEvPoisonPill::EventType, Die); - } + private: + STFUNC(Initial) { + switch (ev->GetTypeRewrite()) { + CFunc(TEvents::TEvBootstrap::EventType, Bootstrap); + CFunc(TEvents::TEvPoisonPill::EventType, Die); + } } - STFUNC(Listen) { - switch (ev->GetTypeRewrite()) { - CFunc(TEvents::TEvPoisonPill::EventType, Die); + STFUNC(Listen) { + switch (ev->GetTypeRewrite()) { + CFunc(TEvents::TEvPoisonPill::EventType, Die); HFunc(TEvPollerRegisterResult, Handle); CFunc(TEvPollerReady::EventType, Process); - } + } } TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override; - void Die(const TActorContext& ctx) override; + void Die(const TActorContext& ctx) override; - void Bootstrap(const TActorContext& ctx); + void Bootstrap(const TActorContext& ctx); void Handle(TEvPollerRegisterResult::TPtr ev, const TActorContext& ctx); void Process(const TActorContext& ctx); - const NInterconnect::TAddress Address; - TIntrusivePtr<NInterconnect::TStreamSocket> Listener; + const NInterconnect::TAddress Address; + TIntrusivePtr<NInterconnect::TStreamSocket> Listener; const bool ExternalSocket; TPollerToken::TPtr PollerToken; TInterconnectProxyCommon::TPtr const ProxyCommonCtx; - }; + }; static inline TActorId MakeInterconnectListenerActorId(bool dynamic) { char x[12] = {'I', 'C', 'L', 'i', 's', 't', 'e', 'n', 'e', 'r', '/', dynamic ? 'D' : 'S'}; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 0e18a20072..2ded7f9f53 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -10,14 +10,14 @@ #include <library/cpp/monlib/service/pages/templates.h> namespace NActors { - LWTRACE_USING(ACTORLIB_PROVIDER); + LWTRACE_USING(ACTORLIB_PROVIDER); - DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes)); + DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes)); template<typename T> T Coalesce(T&& x) { return x; - } + } template<typename T, typename T2, typename... TRest> typename std::common_type<T, T2, TRest...>::type Coalesce(T&& first, T2&& mid, TRest&&... rest) { @@ -26,22 +26,22 @@ namespace NActors { } else { return Coalesce(std::forward<T2>(mid), std::forward<TRest>(rest)...); } - } + } TInterconnectSessionTCP::TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params) - : TActor(&TInterconnectSessionTCP::StateFunc) + : TActor(&TInterconnectSessionTCP::StateFunc) , Created(TInstant::Now()) - , Proxy(proxy) + , Proxy(proxy) , CloseOnIdleWatchdog(GetCloseOnIdleTimeout(), std::bind(&TThis::OnCloseOnIdleTimerHit, this)) , LostConnectionWatchdog(GetLostConnectionTimeout(), std::bind(&TThis::OnLostConnectionTimerHit, this)) , Params(std::move(params)) - , TotalOutputQueueSize(0) - , OutputStuckFlag(false) - , OutputQueueUtilization(16) - , OutputCounter(0ULL) - { + , TotalOutputQueueSize(0) + , OutputStuckFlag(false) + , OutputQueueUtilization(16) + , OutputCounter(0ULL) + { Proxy->Metrics->SetConnected(0); - ReceiveContext.Reset(new TReceiveContext); + ReceiveContext.Reset(new TReceiveContext); } TInterconnectSessionTCP::~TInterconnectSessionTCP() { @@ -62,7 +62,7 @@ namespace NActors { LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId); SetPrefix(Sprintf("Session %s [node %" PRIu32 "]", SelfId().ToString().data(), Proxy->PeerNodeId)); SendUpdateToWhiteboard(); - } + } void TInterconnectSessionTCP::CloseInputSession() { Send(ReceiverId, new TEvInterconnect::TEvCloseInputSession); @@ -84,7 +84,7 @@ namespace NActors { for (const auto& kv : Subscribers) { Send(kv.first, new TEvInterconnect::TEvNodeDisconnected(Proxy->PeerNodeId), 0, kv.second); - } + } Proxy->Metrics->SubSubscribersCount(Subscribers.size()); Subscribers.clear(); @@ -108,7 +108,7 @@ namespace NActors { } TActor::PassAway(); - } + } void TInterconnectSessionTCP::PassAway() { Y_FAIL("TInterconnectSessionTCP::PassAway() can't be called directly"); @@ -118,13 +118,13 @@ namespace NActors { Proxy->ValidateEvent(ev, "Forward"); LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data()); - ++MessagesGot; + ++MessagesGot; - if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { + if (ev->Flags & IEventHandle::FlagSubscribeOnSession) { Subscribe(ev); - } + } - ui16 evChannel = ev->GetChannel(); + ui16 evChannel = ev->GetChannel(); auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); @@ -136,31 +136,31 @@ namespace NActors { if (!wasWorking) { // this channel has returned to work -- it was empty and this we have just put first event in the queue ChannelScheduler->AddToHeap(oChannel, EqualizeCounter); - } + } - SetOutputStuckFlag(true); + SetOutputStuckFlag(true); ++NumEventsInReadyChannels; LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData()); WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush, - QueueSizeInEvents = oChannel.GetQueueSize(), + QueueSizeInEvents = oChannel.GetQueueSize(), QueueSizeInBytes = oChannel.GetBufferedAmountOfData()); - // check for overloaded queues + // check for overloaded queues ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20); - if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) { + if (sendBufferDieLimit != 0 && TotalOutputQueueSize > sendBufferDieLimit) { LOG_ERROR_IC_SESSION("ICS03", "socket: %" PRIi64 " output queue is overloaded, actual %" PRIu64 " bytes, limit is %" PRIu64, - Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit); + Socket ? i64(*Socket) : -1, TotalOutputQueueSize, sendBufferDieLimit); return Terminate(TDisconnectReason::QueueOverload()); - } + } ui64 outputBuffersTotalSizeLimit = Proxy->Common->Settings.OutputBuffersTotalSizeLimitInMB * ui64(1 << 20); if (outputBuffersTotalSizeLimit != 0 && static_cast<ui64>(Proxy->Metrics->GetOutputBuffersTotalSize()) > outputBuffersTotalSizeLimit) { LOG_ERROR_IC_SESSION("ICS77", "Exceeded total limit on output buffers size"); if (AtomicTryLock(&Proxy->Common->StartedSessionKiller)) { CreateSessionKillingActor(Proxy->Common); - } - } + } + } if (RamInQueue && !RamInQueue->Batching) { // we have pending TEvRam, so GenerateTraffic will be called no matter what @@ -198,12 +198,12 @@ namespace NActors { void TInterconnectSessionTCP::Unsubscribe(STATEFN_SIG) { LOG_DEBUG_IC_SESSION("ICS05", "unsubscribe for session state for %s", ev->Sender.ToString().data()); Proxy->Metrics->SubSubscribersCount( Subscribers.erase(ev->Sender)); - } + } THolder<TEvHandshakeAck> TInterconnectSessionTCP::ProcessHandshakeRequest(TEvHandshakeAsk::TPtr& ev) { TEvHandshakeAsk *msg = ev->Get(); - // close existing input session, if any, and do nothing upon its destruction + // close existing input session, if any, and do nothing upon its destruction ReestablishConnection({}, false, TDisconnectReason::NewSession()); const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial(); @@ -211,21 +211,21 @@ namespace NActors { msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial); return MakeHolder<TEvHandshakeAck>(msg->Peer, lastInputSerial, Params); - } + } void TInterconnectSessionTCP::SetNewConnection(TEvHandshakeDone::TPtr& ev) { - if (ReceiverId) { - // upon destruction of input session actor invoke this callback again + if (ReceiverId) { + // upon destruction of input session actor invoke this callback again ReestablishConnection(std::move(ev), false, TDisconnectReason::NewSession()); - return; - } + return; + } LOG_INFO_IC_SESSION("ICS09", "handshake done sender: %s self: %s peer: %s socket: %" PRIi64, ev->Sender.ToString().data(), ev->Get()->Self.ToString().data(), ev->Get()->Peer.ToString().data(), i64(*ev->Get()->Socket)); NewConnectionSet = TActivationContext::Now(); - PacketsWrittenToSocket = 0; + PacketsWrittenToSocket = 0; SendBufferSize = ev->Get()->Socket->GetSendBufferSize(); Socket = std::move(ev->Get()->Socket); @@ -233,21 +233,21 @@ namespace NActors { // there may be a race const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket); - // arm watchdogs + // arm watchdogs CloseOnIdleWatchdog.Arm(SelfId()); - // reset activity timestamps + // reset activity timestamps LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now(); LOG_INFO_IC_SESSION("ICS10", "traffic start"); - // create input session actor + // create input session actor auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, ReceiveContext, Proxy->Common, Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); ReceiveContext->UnlockLastProcessedPacketSerial(); ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled); - // register our socket in poller actor + // register our socket in poller actor LOG_DEBUG_IC_SESSION("ICS11", "registering socket in PollerActor"); const bool success = Send(MakePollerActorId(), new TEvPollerRegister(Socket, ReceiverId, SelfId())); Y_VERIFY(success); @@ -257,31 +257,31 @@ namespace NActors { Proxy->Metrics->SetConnected(1); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] connected", Proxy->PeerNodeId); - // arm pinger timer + // arm pinger timer ResetFlushLogic(); - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // REINITIALIZE SEND QUEUE - // - // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // REINITIALIZE SEND QUEUE + // + // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other // auxiliary packets; also reset packet metrics to zero to start sending from the beginning - // also reset SendQueuePos + // also reset SendQueuePos - // drop confirmed packets first as we do not need unwanted retransmissions - SendQueuePos = SendQueue.end(); + // drop confirmed packets first as we do not need unwanted retransmissions + SendQueuePos = SendQueue.end(); DropConfirmed(nextPacket); for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) { - const TSendQueue::iterator next = std::next(it); - if (it->IsEmpty()) { + const TSendQueue::iterator next = std::next(it); + if (it->IsEmpty()) { SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it); - } else { - it->ResetBufs(); - } - it = next; - } + } else { + it->ResetBufs(); + } + it = next; + } TrimSendQueueCache(); - SendQueuePos = SendQueue.begin(); + SendQueuePos = SendQueue.begin(); TMaybe<ui64> s; for (auto it = SendQueuePos; it != SendQueue.end(); ++it) { @@ -295,13 +295,13 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n", SendQueue.size(), LastConfirmed, serial); - BytesUnwritten = 0; - for (const auto& packet : SendQueue) { + BytesUnwritten = 0; + for (const auto& packet : SendQueue) { BytesUnwritten += (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet.GetDataSize(); - } + } - SwitchStuckPeriod(); + SwitchStuckPeriod(); LastHandshakeDone = TActivationContext::Now(); @@ -310,45 +310,45 @@ namespace NActors { } void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) { - if (ev->Sender == ReceiverId) { - TEvUpdateFromInputSession& msg = *ev->Get(); + if (ev->Sender == ReceiverId) { + TEvUpdateFromInputSession& msg = *ev->Get(); // update ping time Ping = msg.Ping; LWPROBE(UpdateFromInputSession, Proxy->PeerNodeId, Ping.MillisecondsFloat()); - bool needConfirm = false; + bool needConfirm = false; - // update activity timer for dead peer checker + // update activity timer for dead peer checker LastInputActivityTimestamp = TActivationContext::Now(); if (msg.NumDataBytes) { - UnconfirmedBytes += msg.NumDataBytes; + UnconfirmedBytes += msg.NumDataBytes; if (UnconfirmedBytes >= GetTotalInflightAmountOfData() / 4) { - needConfirm = true; - } else { + needConfirm = true; + } else { SetForcePacketTimestamp(Proxy->Common->Settings.ForceConfirmPeriod); - } + } - // reset payload watchdog that controls close-on-idle behaviour + // reset payload watchdog that controls close-on-idle behaviour LastPayloadActivityTimestamp = TActivationContext::Now(); CloseOnIdleWatchdog.Reset(); - } + } bool unblockedSomething = false; LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) { unblockedSomething = DropConfirmed(msg.ConfirmedByInput); - } + } // generate more traffic if we have unblocked state now if (unblockedSomething) { LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0); GenerateTraffic(); - } + } - // if we haven't generated any packets, then make a lone Flush packet without any data - if (needConfirm && Socket) { - ++ConfirmPacketsForcedBySize; + // if we haven't generated any packets, then make a lone Flush packet without any data + if (needConfirm && Socket) { + ++ConfirmPacketsForcedBySize; MakePacket(false); } @@ -376,7 +376,7 @@ namespace NActors { } } } - } + } void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) { if (ev->Get() == RamInQueue) { @@ -451,57 +451,57 @@ namespace NActors { void TInterconnectSessionTCP::StartHandshake() { LOG_INFO_IC_SESSION("ICS15", "start handshake"); IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial()); - } + } void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) { ReestablishConnection({}, true, std::move(reason)); - } + } void TInterconnectSessionTCP::ReestablishConnection(TEvHandshakeDone::TPtr&& ev, bool startHandshakeOnSessionClose, TDisconnectReason reason) { - if (Socket) { + if (Socket) { LOG_INFO_IC_SESSION("ICS13", "reestablish connection"); ShutdownSocket(std::move(reason)); // stop sending/receiving on socket PendingHandshakeDoneEvent = std::move(ev); StartHandshakeOnSessionClose = startHandshakeOnSessionClose; - if (!ReceiverId) { + if (!ReceiverId) { ReestablishConnectionExecute(); - } + } } } void TInterconnectSessionTCP::OnDisconnect(TEvSocketDisconnect::TPtr& ev) { - if (ev->Sender == ReceiverId) { - const bool wasConnected(Socket); + if (ev->Sender == ReceiverId) { + const bool wasConnected(Socket); LOG_INFO_IC_SESSION("ICS07", "socket disconnect %" PRIi64 " reason# %s", Socket ? i64(*Socket) : -1, ev->Get()->Reason.ToString().data()); ReceiverId = TActorId(); // reset receiver actor id as we have no more receiver yet - if (wasConnected) { - // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should - // restart handshake process, closing our part of socket first + if (wasConnected) { + // we were sucessfully connected and did not expect failure, so it arrived from the input side; we should + // restart handshake process, closing our part of socket first ShutdownSocket(ev->Get()->Reason); StartHandshake(); - } else { + } else { ReestablishConnectionExecute(); - } + } } } void TInterconnectSessionTCP::ShutdownSocket(TDisconnectReason reason) { - if (Socket) { + if (Socket) { if (const TString& s = reason.ToString()) { Proxy->Metrics->IncDisconnectByReason(s); } LOG_INFO_IC_SESSION("ICS25", "shutdown socket, reason# %s", reason.ToString().data()); Proxy->UpdateErrorStateLog(TActivationContext::Now(), "close_socket", reason.ToString().data()); - Socket->Shutdown(SHUT_RDWR); - Socket.Reset(); + Socket->Shutdown(SHUT_RDWR); + Socket.Reset(); Proxy->Metrics->IncDisconnections(); - CloseOnIdleWatchdog.Disarm(); + CloseOnIdleWatchdog.Disarm(); LostConnectionWatchdog.Arm(SelfId()); Proxy->Metrics->SetConnected(0); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId); - } + } } void TInterconnectSessionTCP::ReestablishConnectionExecute() { @@ -512,7 +512,7 @@ namespace NActors { StartHandshake(); } else if (ev) { SetNewConnection(ev); - } + } } void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) { @@ -527,7 +527,7 @@ namespace NActors { GenerateTraffic(); } else if (!ev->Cookie) { Proxy->Metrics->IncSpuriousWriteWakeups(); - } + } if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) { Send(ReceiverId, ev->Release().Release(), 0, 1); } @@ -548,10 +548,10 @@ namespace NActors { void TInterconnectSessionTCP::WriteData() { ui64 written = 0; - Y_VERIFY(Socket); // ensure that socket wasn't closed + Y_VERIFY(Socket); // ensure that socket wasn't closed LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { - constexpr ui32 iovLimit = 256; + constexpr ui32 iovLimit = 256; #ifdef _linux_ ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX)); #else @@ -561,8 +561,8 @@ namespace NActors { maxElementsInIOV = 1; } - // vector of write buffers with preallocated stack space - TStackVec<TConstIoVec, iovLimit> wbuffers; + // vector of write buffers with preallocated stack space + TStackVec<TConstIoVec, iovLimit> wbuffers; LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu", ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size()); @@ -574,14 +574,14 @@ namespace NActors { while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) { for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) { - it->AppendToIoVector(wbuffers, maxElementsInIOV); + it->AppendToIoVector(wbuffers, maxElementsInIOV); } - const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data()); - int iovcnt = wbuffers.size(); + const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data()); + int iovcnt = wbuffers.size(); - Y_VERIFY(iovcnt > 0); - Y_VERIFY(iovec->iov_len > 0); + Y_VERIFY(iovcnt > 0); + Y_VERIFY(iovec->iov_len > 0); TString err; ssize_t r = 0; @@ -596,23 +596,23 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data()); - wbuffers.clear(); + wbuffers.clear(); - if (r > 0) { - Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten); - BytesUnwritten -= r; + if (r > 0) { + Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten); + BytesUnwritten -= r; written += r; ui64 packets = 0; - // advance SendQueuePos to eat all processed items - for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) { + // advance SendQueuePos to eat all processed items + for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) { if (!SendQueuePos->IsEmpty()) { LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial()); } - ++PacketsWrittenToSocket; + ++PacketsWrittenToSocket; ++packets; LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); - } + } LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); } else if (-r != EAGAIN && -r != EWOULDBLOCK) { @@ -624,15 +624,15 @@ namespace NActors { Proxy->Metrics->AddTotalBytesWritten(written); } return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r)); - } else { + } else { // we have to do some hack for secure socket -- mark the packet as 'tried writing' if (Params.Encryption) { Y_VERIFY(SendQueuePos != SendQueue.end()); SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL } - // we have received EAGAIN error code, this means that we can't issue more data until we have received - // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event + // we have received EAGAIN error code, this means that we can't issue more data until we have received + // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer); ReceiveContext->WriteBlockedByFullSendBuffer = true; WriteBlockedCycles = GetCycleCountFast(); @@ -646,7 +646,7 @@ namespace NActors { } else { PollerToken->Request(false, true); } - } + } } } } @@ -656,12 +656,12 @@ namespace NActors { } void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) { - if (period != TDuration::Max()) { + if (period != TDuration::Max()) { const TInstant when = TActivationContext::Now() + period; - if (when < ForcePacketTimestamp) { - ForcePacketTimestamp = when; + if (when < ForcePacketTimestamp) { + ForcePacketTimestamp = when; ScheduleFlush(); - } + } } } @@ -671,7 +671,7 @@ namespace NActors { FlushSchedule.push(ForcePacketTimestamp); MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size()); ++FlushEventsScheduled; - } + } } void TInterconnectSessionTCP::HandleFlush() { @@ -692,12 +692,12 @@ namespace NActors { } void TInterconnectSessionTCP::ResetFlushLogic() { - ForcePacketTimestamp = TInstant::Max(); - UnconfirmedBytes = 0; + ForcePacketTimestamp = TInstant::Max(); + UnconfirmedBytes = 0; const TDuration ping = Proxy->Common->Settings.PingPeriod; if (ping != TDuration::Zero() && !NumEventsInReadyChannels) { SetForcePacketTimestamp(ping); - } + } } void TInterconnectSessionTCP::TrimSendQueueCache() { @@ -719,19 +719,19 @@ namespace NActors { } ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { - Y_VERIFY(Socket); - - TSendQueue::iterator packet; - if (SendQueueCache) { - // we have entries in cache, take one and move it to the end of SendQueue - packet = SendQueueCache.begin(); - SendQueue.splice(SendQueue.end(), SendQueueCache, packet); - packet->Reuse(); // reset packet to initial state - } else { - // we have to allocate new packet, so just do it + Y_VERIFY(Socket); + + TSendQueue::iterator packet; + if (SendQueueCache) { + // we have entries in cache, take one and move it to the end of SendQueue + packet = SendQueueCache.begin(); + SendQueue.splice(SendQueue.end(), SendQueueCache, packet); + packet->Reuse(); // reset packet to initial state + } else { + // we have to allocate new packet, so just do it LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) { packet = SendQueue.emplace(SendQueue.end(), Params); - } + } } // update send queue position @@ -741,7 +741,7 @@ namespace NActors { ui64 serial = 0; - if (data) { + if (data) { // generate serial for this data packet serial = ++OutputCounter; @@ -749,21 +749,21 @@ namespace NActors { Y_VERIFY(NumEventsInReadyChannels); LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) { FillSendingBuffer(*packet, serial); - } + } Y_VERIFY(!packet->IsEmpty()); InflightDataAmount += packet->GetDataSize(); Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize()); if (InflightDataAmount > GetTotalInflightAmountOfData()) { Proxy->Metrics->IncInflyLimitReach(); - } + } - if (AtomicGet(ReceiveContext->ControlPacketId) == 0) { + if (AtomicGet(ReceiveContext->ControlPacketId) == 0) { AtomicSet(ReceiveContext->ControlPacketSendTimer, GetCycleCountFast()); - AtomicSet(ReceiveContext->ControlPacketId, OutputCounter); - } - - // update payload activity timer + AtomicSet(ReceiveContext->ControlPacketId, OutputCounter); + } + + // update payload activity timer LastPayloadActivityTimestamp = TActivationContext::Now(); } else if (pingMask) { serial = *pingMask; @@ -781,13 +781,13 @@ namespace NActors { SendQueue.splice(std::next(SendQueuePos), SendQueue, packet); } } - } + } const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial(); packet->SetMetadata(serial, lastInputSerial); - packet->Sign(); + packet->Sign(); - // count number of bytes pending for write + // count number of bytes pending for write ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize(); BytesUnwritten += packetSize; @@ -795,15 +795,15 @@ namespace NActors { " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(), InflightDataAmount, BytesUnwritten); - // reset forced packet sending timestamp as we have confirmed all received data + // reset forced packet sending timestamp as we have confirmed all received data ResetFlushLogic(); - ++PacketsGenerated; + ++PacketsGenerated; LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize); - if (!data) { + if (!data) { WriteData(); - } + } return packetSize; } @@ -814,21 +814,21 @@ namespace NActors { Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter, "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64, LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial); - LastConfirmed = confirm; + LastConfirmed = confirm; - ui64 droppedDataAmount = 0; - ui32 numDropped = 0; + ui64 droppedDataAmount = 0; + ui32 numDropped = 0; - // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively - // making Serial <= confirm true - TSendQueue::iterator it; + // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively + // making Serial <= confirm true + TSendQueue::iterator it; ui64 lastDroppedSerial = 0; for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) { if (!it->IsEmpty()) { lastDroppedSerial = it->GetSerial(); } droppedDataAmount += it->GetDataSize(); - ++numDropped; + ++numDropped; } SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it); TrimSendQueueCache(); @@ -840,7 +840,7 @@ namespace NActors { const ui64 limit = GetTotalInflightAmountOfData(); const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount; - PacketsConfirmed += numDropped; + PacketsConfirmed += numDropped; InflightDataAmount -= droppedDataAmount; Proxy->Metrics->SubInflightDataAmount(droppedDataAmount); LWPROBE(DropConfirmed, Proxy->PeerNodeId, droppedDataAmount, InflightDataAmount); @@ -851,7 +851,7 @@ namespace NActors { Pool->Trim(); // send any unsent free requests return unblockedSomething; - } + } void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) { ui32 bytesGenerated = 0; @@ -914,42 +914,42 @@ namespace NActors { const ui32 utilization = Socket ? CalculateQueueUtilization() : 0; if (const auto& callback = Proxy->Common->UpdateWhiteboard) { - enum class EFlag { - GREEN, - YELLOW, - ORANGE, - RED, - }; - EFlag flagState = EFlag::RED; - - if (Socket) { - flagState = EFlag::GREEN; - - do { + enum class EFlag { + GREEN, + YELLOW, + ORANGE, + RED, + }; + EFlag flagState = EFlag::RED; + + if (Socket) { + flagState = EFlag::GREEN; + + do { auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp; - if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) { - flagState = EFlag::ORANGE; - break; - } else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) { - flagState = EFlag::YELLOW; - } - - // check utilization + if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) { + flagState = EFlag::ORANGE; + break; + } else if (lastInputDelay * 2 >= GetDeadPeerTimeout()) { + flagState = EFlag::YELLOW; + } + + // check utilization if (utilization > 875000) { // 7/8 - flagState = EFlag::ORANGE; - break; + flagState = EFlag::ORANGE; + break; } else if (utilization > 500000) { // 1/2 - flagState = EFlag::YELLOW; - } - } while (false); - } + flagState = EFlag::YELLOW; + } + } while (false); + } callback(Proxy->Metrics->GetHumanFriendlyPeerHostName(), - connected, - flagState == EFlag::GREEN, - flagState == EFlag::YELLOW, - flagState == EFlag::ORANGE, - flagState == EFlag::RED, + connected, + flagState == EFlag::GREEN, + flagState == EFlag::YELLOW, + flagState == EFlag::ORANGE, + flagState == EFlag::RED, TlsActivationContext->ExecutorThread.ActorSystem); } @@ -958,34 +958,34 @@ namespace NActors { } } - void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) { - if (OutputStuckFlag == state) - return; + void TInterconnectSessionTCP::SetOutputStuckFlag(bool state) { + if (OutputStuckFlag == state) + return; - if (OutputQueueUtilization.Size() == 0) - return; + if (OutputQueueUtilization.Size() == 0) + return; - auto& lastpair = OutputQueueUtilization.Last(); - if (state) + auto& lastpair = OutputQueueUtilization.Last(); + if (state) lastpair.first -= GetCycleCountFast(); - else + else lastpair.first += GetCycleCountFast(); - OutputStuckFlag = state; - } + OutputStuckFlag = state; + } - void TInterconnectSessionTCP::SwitchStuckPeriod() { + void TInterconnectSessionTCP::SwitchStuckPeriod() { auto now = GetCycleCountFast(); - if (OutputQueueUtilization.Size() != 0) { - auto& lastpair = OutputQueueUtilization.Last(); - lastpair.second = now - lastpair.second; - if (OutputStuckFlag) - lastpair.first += now; - } - - OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now)); + if (OutputQueueUtilization.Size() != 0) { + auto& lastpair = OutputQueueUtilization.Last(); + lastpair.second = now - lastpair.second; + if (OutputStuckFlag) + lastpair.first += now; + } + + OutputQueueUtilization.Push(std::pair<ui64, ui64>(0, now)); if (OutputStuckFlag) - OutputQueueUtilization.Last().first -= now; + OutputQueueUtilization.Last().first -= now; } TDuration TInterconnectSessionTCP::GetDeadPeerTimeout() const { @@ -1029,26 +1029,26 @@ namespace NActors { } void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) { - HTML(str) { - DIV_CLASS("panel panel-info") { - DIV_CLASS("panel-heading") { - str << "Session"; - } - DIV_CLASS("panel-body") { - TABLE_CLASS("table") { - TABLEHEAD() { - TABLER() { - TABLEH() { - str << "Sensor"; - } - TABLEH() { - str << "Value"; - } - } + HTML(str) { + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + str << "Session"; + } + DIV_CLASS("panel-body") { + TABLE_CLASS("table") { + TABLEHEAD() { + TABLER() { + TABLEH() { + str << "Sensor"; + } + TABLEH() { + str << "Value"; + } + } } - TABLEBODY() { - TABLER() { - TABLED() { + TABLEBODY() { + TABLER() { + TABLED() { str << "Encryption"; } TABLED() { @@ -1111,89 +1111,89 @@ namespace NActors { } TABLER() { TABLED() { - str << "This page generated at"; - } - TABLED() { + str << "This page generated at"; + } + TABLED() { str << TActivationContext::Now() << " / " << Now(); - } - } - TABLER() { - TABLED() { - str << "SelfID"; - } - TABLED() { + } + } + TABLER() { + TABLED() { + str << "SelfID"; + } + TABLED() { str << SelfId().ToString(); - } - } + } + } TABLER() { TABLED() { str << "Frame version/Checksum"; } TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); } } -#define MON_VAR(NAME) \ - TABLER() { \ - TABLED() { \ - str << #NAME; \ - } \ - TABLED() { \ - str << NAME; \ - } \ - } - - MON_VAR(Created) - MON_VAR(NewConnectionSet) - MON_VAR(ReceiverId) - MON_VAR(MessagesGot) - MON_VAR(MessagesWrittenToBuffer) - MON_VAR(PacketsGenerated) - MON_VAR(PacketsWrittenToSocket) - MON_VAR(PacketsConfirmed) - MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket)) - MON_VAR(ConfirmPacketsForcedBySize) - MON_VAR(ConfirmPacketsForcedByTimeout) - - TABLER() { - TABLED() { - str << "Virtual self ID"; - } - TABLED() { - str << Proxy->SessionVirtualId.ToString(); - } +#define MON_VAR(NAME) \ + TABLER() { \ + TABLED() { \ + str << #NAME; \ + } \ + TABLED() { \ + str << NAME; \ + } \ + } + + MON_VAR(Created) + MON_VAR(NewConnectionSet) + MON_VAR(ReceiverId) + MON_VAR(MessagesGot) + MON_VAR(MessagesWrittenToBuffer) + MON_VAR(PacketsGenerated) + MON_VAR(PacketsWrittenToSocket) + MON_VAR(PacketsConfirmed) + MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket)) + MON_VAR(ConfirmPacketsForcedBySize) + MON_VAR(ConfirmPacketsForcedByTimeout) + + TABLER() { + TABLED() { + str << "Virtual self ID"; + } + TABLED() { + str << Proxy->SessionVirtualId.ToString(); + } + } + TABLER() { + TABLED() { + str << "Virtual peer ID"; + } + TABLED() { + str << Proxy->RemoteSessionVirtualId.ToString(); + } + } + TABLER() { + TABLED() { + str << "Socket"; + } + TABLED() { + str << (Socket ? i64(*Socket) : -1); + } } - TABLER() { - TABLED() { - str << "Virtual peer ID"; - } - TABLED() { - str << Proxy->RemoteSessionVirtualId.ToString(); - } - } - TABLER() { - TABLED() { - str << "Socket"; - } - TABLED() { - str << (Socket ? i64(*Socket) : -1); - } - } ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0; - MON_VAR(OutputStuckFlag) - MON_VAR(SendQueue.size()) - MON_VAR(SendQueueCache.size()) + MON_VAR(OutputStuckFlag) + MON_VAR(SendQueue.size()) + MON_VAR(SendQueueCache.size()) MON_VAR(NumEventsInReadyChannels) - MON_VAR(TotalOutputQueueSize) - MON_VAR(BytesUnwritten) + MON_VAR(TotalOutputQueueSize) + MON_VAR(BytesUnwritten) MON_VAR(InflightDataAmount) MON_VAR(unsentQueueSize) MON_VAR(SendBufferSize) - MON_VAR(LastInputActivityTimestamp) - MON_VAR(LastPayloadActivityTimestamp) - MON_VAR(LastHandshakeDone) - MON_VAR(OutputCounter) + MON_VAR(LastInputActivityTimestamp) + MON_VAR(LastPayloadActivityTimestamp) + MON_VAR(LastHandshakeDone) + MON_VAR(OutputCounter) MON_VAR(LastSentSerial) MON_VAR(ReceiveContext->GetLastProcessedPacketSerial()) - MON_VAR(LastConfirmed) + MON_VAR(LastConfirmed) MON_VAR(FlushSchedule.size()) MON_VAR(MaxFlushSchedule) MON_VAR(FlushEventsScheduled) @@ -1211,11 +1211,11 @@ namespace NActors { MON_VAR(GetPingRTT()) MON_VAR(clockSkew) - MON_VAR(GetDeadPeerTimeout()) + MON_VAR(GetDeadPeerTimeout()) MON_VAR(GetTotalInflightAmountOfData()) MON_VAR(GetCloseOnIdleTimeout()) MON_VAR(Subscribers.size()) - } + } } } } @@ -1224,5 +1224,5 @@ namespace NActors { void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) { TlsActivationContext->ExecutorThread.ActorSystem->Register(new TInterconnectSessionKiller(common)); - } + } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 1ecd829132..7fc00dbcc5 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -30,61 +30,61 @@ #include <unordered_map> namespace NActors { - class TSlowPathChecker { - using TTraceCallback = std::function<void(double)>; - TTraceCallback Callback; - const NHPTimer::STime Start; - - public: - TSlowPathChecker(TTraceCallback&& callback) - : Callback(std::move(callback)) + class TSlowPathChecker { + using TTraceCallback = std::function<void(double)>; + TTraceCallback Callback; + const NHPTimer::STime Start; + + public: + TSlowPathChecker(TTraceCallback&& callback) + : Callback(std::move(callback)) , Start(GetCycleCountFast()) - { - } + { + } - ~TSlowPathChecker() { + ~TSlowPathChecker() { const NHPTimer::STime end = GetCycleCountFast(); - const NHPTimer::STime elapsed = end - Start; - if (elapsed > 1000000) { - Callback(NHPTimer::GetSeconds(elapsed) * 1000); - } - } - - operator bool() const { - return false; + const NHPTimer::STime elapsed = end - Start; + if (elapsed > 1000000) { + Callback(NHPTimer::GetSeconds(elapsed) * 1000); + } } - }; -#define LWPROBE_IF_TOO_LONG(...) \ - if (auto __x = TSlowPathChecker{[&](double ms) { LWPROBE(__VA_ARGS__); }}) \ - ; \ - else + operator bool() const { + return false; + } + }; - class TTimeLimit { - public: - TTimeLimit(ui64 limitInCycles) +#define LWPROBE_IF_TOO_LONG(...) \ + if (auto __x = TSlowPathChecker{[&](double ms) { LWPROBE(__VA_ARGS__); }}) \ + ; \ + else + + class TTimeLimit { + public: + TTimeLimit(ui64 limitInCycles) : UpperLimit(limitInCycles == 0 ? 0 : GetCycleCountFast() + limitInCycles) - { - } + { + } - TTimeLimit(ui64 startTS, ui64 limitInCycles) - : UpperLimit(limitInCycles == 0 ? 0 : startTS + limitInCycles) - { - } + TTimeLimit(ui64 startTS, ui64 limitInCycles) + : UpperLimit(limitInCycles == 0 ? 0 : startTS + limitInCycles) + { + } - bool CheckExceeded() { + bool CheckExceeded() { return UpperLimit != 0 && GetCycleCountFast() > UpperLimit; - } + } - const ui64 UpperLimit; - }; + const ui64 UpperLimit; + }; - static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10); + static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10); static constexpr TDuration DEFAULT_LOST_CONNECTION_TIMEOUT = TDuration::Seconds(10); - static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024; - static constexpr ui32 DEFAULT_TOTAL_INFLIGHT_DATA = 4 * 10240 * 1024; + static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024; + static constexpr ui32 DEFAULT_TOTAL_INFLIGHT_DATA = 4 * 10240 * 1024; - class TInterconnectProxyTCP; + class TInterconnectProxyTCP; enum class EUpdateState : ui8 { NONE, // no updates generated by input session yet @@ -93,22 +93,22 @@ namespace NActors { CONFIRMING, // confirmation inflight }; - struct TReceiveContext: public TAtomicRefCount<TReceiveContext> { - /* All invokations to these fields should be thread-safe */ + struct TReceiveContext: public TAtomicRefCount<TReceiveContext> { + /* All invokations to these fields should be thread-safe */ - ui64 ControlPacketSendTimer = 0; - ui64 ControlPacketId = 0; + ui64 ControlPacketSendTimer = 0; + ui64 ControlPacketId = 0; - // number of packets received by input session - TAtomic PacketsReadFromSocket = 0; - TAtomic DataPacketsReadFromSocket = 0; + // number of packets received by input session + TAtomic PacketsReadFromSocket = 0; + TAtomic DataPacketsReadFromSocket = 0; // last processed packet by input session std::atomic_uint64_t LastProcessedPacketSerial = 0; static constexpr uint64_t LastProcessedPacketSerialLockBit = uint64_t(1) << 63; - // for hardened checks - TAtomic NumInputSessions = 0; + // for hardened checks + TAtomic NumInputSessions = 0; NHPTimer::STime StartTime; @@ -160,9 +160,9 @@ namespace NActors { ui64 GetLastProcessedPacketSerial() { return LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit; } - }; + }; - class TInputSessionTCP + class TInputSessionTCP : public TActorBootstrapped<TInputSessionTCP> , public TInterconnectLoggingBase { @@ -174,14 +174,14 @@ namespace NActors { struct TEvCheckDeadPeer : TEventLocal<TEvCheckDeadPeer, EvCheckDeadPeer> {}; struct TEvResumeReceiveData : TEventLocal<TEvResumeReceiveData, EvResumeReceiveData> {}; - public: - static constexpr EActivityType ActorActivityType() { + public: + static constexpr EActivityType ActorActivityType() { return INTERCONNECT_SESSION_TCP; - } + } TInputSessionTCP(const TActorId& sessionId, - TIntrusivePtr<NInterconnect::TStreamSocket> socket, - TIntrusivePtr<TReceiveContext> context, + TIntrusivePtr<NInterconnect::TStreamSocket> socket, + TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common, std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, @@ -189,8 +189,8 @@ namespace NActors { TDuration deadPeerTimeout, TSessionParams params); - private: - friend class TActorBootstrapped<TInputSessionTCP>; + private: + friend class TActorBootstrapped<TInputSessionTCP>; void Bootstrap(); @@ -204,13 +204,13 @@ namespace NActors { cFunc(TEvConfirmUpdate::EventType, HandleConfirmUpdate) ) - private: + private: TRope IncomingData; const TActorId SessionId; - TIntrusivePtr<NInterconnect::TStreamSocket> Socket; + TIntrusivePtr<NInterconnect::TStreamSocket> Socket; TPollerToken::TPtr PollerToken; - TIntrusivePtr<TReceiveContext> Context; + TIntrusivePtr<TReceiveContext> Context; TInterconnectProxyCommon::TPtr Common; const ui32 NodeId; const TSessionParams Params; @@ -235,11 +235,11 @@ namespace NActors { THolder<TEvUpdateFromInputSession> UpdateFromInputSession; - ui64 ConfirmedByInput; + ui64 ConfirmedByInput; std::shared_ptr<IInterconnectMetrics> Metrics; - bool CloseInputSessionRequested = false; + bool CloseInputSessionRequested = false; void CloseInputSession(); @@ -258,12 +258,12 @@ namespace NActors { TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers; - static constexpr size_t NumPreallocatedBuffers = 16; - void PreallocateBuffers(); + static constexpr size_t NumPreallocatedBuffers = 16; + void PreallocateBuffers(); - inline ui64 GetMaxCyclesPerEvent() const { + inline ui64 GetMaxCyclesPerEvent() const { return DurationToCycles(TDuration::MicroSeconds(500)); - } + } const TDuration DeadPeerTimeout; TInstant LastReceiveTimestamp; @@ -278,21 +278,21 @@ namespace NActors { void HandlePingResponse(TDuration passed); void HandleClock(TInstant clock); - }; + }; - class TInterconnectSessionTCP + class TInterconnectSessionTCP : public TActor<TInterconnectSessionTCP> , public TInterconnectLoggingBase { - enum { + enum { EvCheckCloseOnIdle = EventSpaceBegin(TEvents::ES_PRIVATE), EvCheckLostConnection, EvRam, EvTerminate, EvFreeItems, - }; + }; - struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {}; + struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {}; struct TEvCheckLostConnection : TEventLocal<TEvCheckLostConnection, EvCheckLostConnection> {}; struct TEvRam : TEventLocal<TEvRam, EvRam> { @@ -308,18 +308,18 @@ namespace NActors { {} }; - const TInstant Created; - TInstant NewConnectionSet; - ui64 MessagesGot = 0; - ui64 MessagesWrittenToBuffer = 0; - ui64 PacketsGenerated = 0; - ui64 PacketsWrittenToSocket = 0; - ui64 PacketsConfirmed = 0; + const TInstant Created; + TInstant NewConnectionSet; + ui64 MessagesGot = 0; + ui64 MessagesWrittenToBuffer = 0; + ui64 PacketsGenerated = 0; + ui64 PacketsWrittenToSocket = 0; + ui64 PacketsConfirmed = 0; - public: - static constexpr EActivityType ActorActivityType() { - return INTERCONNECT_SESSION_TCP; - } + public: + static constexpr EActivityType ActorActivityType() { + return INTERCONNECT_SESSION_TCP; + } TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params); ~TInterconnectSessionTCP(); @@ -339,8 +339,8 @@ namespace NActors { return ReceiveContext->ClockSkew_us; } - private: - friend class TInterconnectProxyTCP; + private: + friend class TInterconnectProxyTCP; void Handle(TEvTerminate::TPtr& ev); void HandlePoison(); @@ -400,7 +400,7 @@ namespace NActors { void ReestablishConnectionWithHandshake(TDisconnectReason reason); void ReestablishConnectionExecute(); - TInterconnectProxyTCP* const Proxy; + TInterconnectProxyTCP* const Proxy; // various connection settings access TDuration GetDeadPeerTimeout() const; @@ -418,11 +418,11 @@ namespace NActors { void IssuePingRequest(); void Handle(TEvProcessPingRequest::TPtr ev); - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - TInstant LastInputActivityTimestamp; - TInstant LastPayloadActivityTimestamp; - TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog; + TInstant LastInputActivityTimestamp; + TInstant LastPayloadActivityTimestamp; + TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog; TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog; void OnCloseOnIdleTimerHit() { @@ -435,26 +435,26 @@ namespace NActors { Terminate(TDisconnectReason::LostConnection()); } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// const TSessionParams Params; TMaybe<TEventHolderPool> Pool; TMaybe<TChannelScheduler> ChannelScheduler; - ui64 TotalOutputQueueSize; - bool OutputStuckFlag; - TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization; + ui64 TotalOutputQueueSize; + bool OutputStuckFlag; + TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization; size_t NumEventsInReadyChannels = 0; - void SetOutputStuckFlag(bool state); - void SwitchStuckPeriod(); + void SetOutputStuckFlag(bool state); + void SwitchStuckPeriod(); - using TSendQueue = TList<TTcpPacketOutTask>; - TSendQueue SendQueue; - TSendQueue SendQueueCache; - TSendQueue::iterator SendQueuePos; + using TSendQueue = TList<TTcpPacketOutTask>; + TSendQueue SendQueue; + TSendQueue SendQueueCache; + TSendQueue::iterator SendQueuePos; ui64 WriteBlockedCycles = 0; // start of current block period TDuration WriteBlockedTotal; // total incremental duration that session has been blocked - ui64 BytesUnwritten = 0; + ui64 BytesUnwritten = 0; void TrimSendQueueCache(); @@ -467,21 +467,21 @@ namespace NActors { } } - ui64 OutputCounter; + ui64 OutputCounter; ui64 LastSentSerial = 0; - TInstant LastHandshakeDone; + TInstant LastHandshakeDone; - TIntrusivePtr<NInterconnect::TStreamSocket> Socket; + TIntrusivePtr<NInterconnect::TStreamSocket> Socket; TPollerToken::TPtr PollerToken; ui32 SendBufferSize; ui64 InflightDataAmount = 0; std::unordered_map<TActorId, ui64, TActorId::THash> Subscribers; - // time at which we want to send confirmation packet even if there was no outgoing data - ui64 UnconfirmedBytes = 0; - TInstant ForcePacketTimestamp = TInstant::Max(); + // time at which we want to send confirmation packet even if there was no outgoing data + ui64 UnconfirmedBytes = 0; + TInstant ForcePacketTimestamp = TInstant::Max(); TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule; size_t MaxFlushSchedule = 0; ui64 FlushEventsScheduled = 0; @@ -494,71 +494,71 @@ namespace NActors { void GenerateHttpInfo(TStringStream& str); - TIntrusivePtr<TReceiveContext> ReceiveContext; + TIntrusivePtr<TReceiveContext> ReceiveContext; TActorId ReceiverId; TDuration Ping; - ui64 ConfirmPacketsForcedBySize = 0; - ui64 ConfirmPacketsForcedByTimeout = 0; + ui64 ConfirmPacketsForcedBySize = 0; + ui64 ConfirmPacketsForcedByTimeout = 0; - ui64 LastConfirmed = 0; + ui64 LastConfirmed = 0; TEvHandshakeDone::TPtr PendingHandshakeDoneEvent; bool StartHandshakeOnSessionClose = false; ui64 EqualizeCounter = 0; - }; + }; - class TInterconnectSessionKiller - : public TActorBootstrapped<TInterconnectSessionKiller> { - ui32 RepliesReceived = 0; - ui32 RepliesNumber = 0; + class TInterconnectSessionKiller + : public TActorBootstrapped<TInterconnectSessionKiller> { + ui32 RepliesReceived = 0; + ui32 RepliesNumber = 0; TActorId LargestSession = TActorId(); - ui64 MaxBufferSize = 0; + ui64 MaxBufferSize = 0; TInterconnectProxyCommon::TPtr Common; - public: + public: static constexpr EActivityType ActorActivityType() { return INTERCONNECT_SESSION_KILLER; } TInterconnectSessionKiller(TInterconnectProxyCommon::TPtr common) - : Common(common) - { - } + : Common(common) + { + } void Bootstrap() { - auto sender = SelfId(); + auto sender = SelfId(); const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* { - auto ev = new TEvSessionBufferSizeRequest(); - return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery); - }; + auto ev = new TEvSessionBufferSizeRequest(); + return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery); + }; RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric); - Become(&TInterconnectSessionKiller::StateFunc); - } + Become(&TInterconnectSessionKiller::StateFunc); + } STRICT_STFUNC(StateFunc, hFunc(TEvSessionBufferSizeResponse, ProcessResponse) cFunc(TEvents::TEvUndelivered::EventType, ProcessUndelivered) ) - + void ProcessResponse(TEvSessionBufferSizeResponse::TPtr& ev) { - RepliesReceived++; - if (MaxBufferSize < ev->Get()->BufferSize) { - MaxBufferSize = ev->Get()->BufferSize; - LargestSession = ev->Get()->SessionID; - } - if (RepliesReceived == RepliesNumber) { + RepliesReceived++; + if (MaxBufferSize < ev->Get()->BufferSize) { + MaxBufferSize = ev->Get()->BufferSize; + LargestSession = ev->Get()->SessionID; + } + if (RepliesReceived == RepliesNumber) { Send(LargestSession, new TEvents::TEvPoisonPill); - AtomicUnlock(&Common->StartedSessionKiller); + AtomicUnlock(&Common->StartedSessionKiller); PassAway(); - } + } } - + void ProcessUndelivered() { - RepliesReceived++; + RepliesReceived++; } - }; + }; void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common); diff --git a/library/cpp/actors/interconnect/load.cpp b/library/cpp/actors/interconnect/load.cpp index f166ca0a99..2a8443da71 100644 --- a/library/cpp/actors/interconnect/load.cpp +++ b/library/cpp/actors/interconnect/load.cpp @@ -34,9 +34,9 @@ namespace NInterconnect { CFunc(TEvents::TSystem::PoisonPill, Die); ) - void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { ui64 bytes = ev->Get()->CalculateSerializedSizeCached(); - auto& record = ev->Get()->Record; + auto& record = ev->Get()->Record; auto *hops = record.MutableHops(); while (!hops->empty() && !hops->begin()->HasNextHop()) { record.ClearPayload(); @@ -81,7 +81,7 @@ namespace NInterconnect { CFunc(TEvents::TSystem::PoisonPill, Die); ) - void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { ctx.ExecutorThread.ActorSystem->Send(ev->Forward(Slaves[SlaveIndex])); if (++SlaveIndex == Slaves.size()) { SlaveIndex = 0; @@ -107,7 +107,7 @@ namespace NInterconnect { TLoadResponderMasterActor() {} - void Bootstrap(const TActorContext& ctx) { + void Bootstrap(const TActorContext& ctx) { Become(&TLoadResponderMasterActor::StateFunc); while (Slaves.size() < 10) { Slaves.push_back(ctx.Register(new TLoadResponderActor(Traffic))); @@ -118,7 +118,7 @@ namespace NInterconnect { std::shared_ptr<std::atomic_uint64_t> Traffic = std::make_shared<std::atomic_uint64_t>(); }; - IActor* CreateLoadResponderActor() { + IActor* CreateLoadResponderActor() { return new TLoadResponderMasterActor(); } @@ -127,17 +127,17 @@ namespace NInterconnect { return TActorId(nodeId, TStringBuf(x, 12)); } - class TLoadActor: public TActorBootstrapped<TLoadActor> { + class TLoadActor: public TActorBootstrapped<TLoadActor> { struct TEvGenerateMessages : TEventLocal<TEvGenerateMessages, EvGenerateMessages> {}; struct TEvPublishResults : TEventLocal<TEvPublishResults, EvPublishResults> {}; struct TMessageInfo { TInstant SendTimestamp; - TMessageInfo(const TInstant& sendTimestamp) + TMessageInfo(const TInstant& sendTimestamp) : SendTimestamp(sendTimestamp) - { - } + { + } }; const TLoadParams Params; @@ -154,14 +154,14 @@ namespace NInterconnect { return IActor::INTERCONNECT_LOAD_ACTOR; } - TLoadActor(const TLoadParams& params) + TLoadActor(const TLoadParams& params) : Params(params) {} void Bootstrap(const TActorContext& ctx) { Become(&TLoadActor::QueryTrafficCounter); ctx.Send(MakeLoadResponderActorId(SelfId().NodeId()), new TEvQueryTrafficCounter); - } + } void Handle(TEvTrafficCounter::TPtr ev, const TActorContext& ctx) { Traffic = std::move(ev->Get()->Traffic); @@ -185,7 +185,7 @@ namespace NInterconnect { SchedulePublishResults(ctx); } - void GenerateMessages(const TActorContext& ctx) { + void GenerateMessages(const TActorContext& ctx) { while (InFly.size() < Params.InFlyMax && ctx.Now() >= NextMessageTimestamp) { // generate payload const ui32 size = Params.SizeMin + RandomNumber(Params.SizeMax - Params.SizeMin + 1); @@ -232,8 +232,8 @@ namespace NInterconnect { } } - void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; + void Handle(TEvLoadMessage::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; auto it = InFly.find(record.GetId()); if (it != InFly.end()) { // record message rtt @@ -294,18 +294,18 @@ namespace NInterconnect { TQueue<std::pair<TInstant, TString>> TimeoutQueue; - void PutTimeoutQueueItem(const TActorContext& ctx, TString id) { + void PutTimeoutQueueItem(const TActorContext& ctx, TString id) { TimeoutQueue.emplace(ctx.Now() + TDuration::Minutes(1), std::move(id)); if (TimeoutQueue.size() == 1) { ScheduleWakeup(ctx); } } - void ScheduleWakeup(const TActorContext& ctx) { + void ScheduleWakeup(const TActorContext& ctx) { ctx.Schedule(TimeoutQueue.front().first - ctx.Now(), new TEvents::TEvWakeup); } - void HandleWakeup(const TActorContext& ctx) { + void HandleWakeup(const TActorContext& ctx) { ui32 numDropped = 0; while (TimeoutQueue && TimeoutQueue.front().first <= ctx.Now()) { @@ -326,11 +326,11 @@ namespace NInterconnect { const TDuration ResultPublishPeriod = TDuration::Seconds(15); - void SchedulePublishResults(const TActorContext& ctx) { + void SchedulePublishResults(const TActorContext& ctx) { ctx.Schedule(ResultPublishPeriod, new TEvPublishResults); } - void PublishResults(const TActorContext& ctx, bool schedule = true) { + void PublishResults(const TActorContext& ctx, bool schedule = true) { const TInstant now = ctx.Now(); TStringStream msg; @@ -354,7 +354,7 @@ namespace NInterconnect { msg << "{window# " << duration << " samples# " << Histogram.size(); TVector<TDuration> v; v.reserve(Histogram.size()); - for (const auto& item : Histogram) { + for (const auto& item : Histogram) { v.push_back(item.second); } std::sort(v.begin(), v.end()); @@ -398,8 +398,8 @@ namespace NInterconnect { } }; - IActor* CreateLoadActor(const TLoadParams& params) { + IActor* CreateLoadActor(const TLoadParams& params) { return new TLoadActor(params); } -} +} diff --git a/library/cpp/actors/interconnect/load.h b/library/cpp/actors/interconnect/load.h index 78c80c8e17..0a01a0dc04 100644 --- a/library/cpp/actors/interconnect/load.h +++ b/library/cpp/actors/interconnect/load.h @@ -4,21 +4,21 @@ namespace NInterconnect { // load responder -- lives on every node as a service actor - NActors::IActor* CreateLoadResponderActor(); + NActors::IActor* CreateLoadResponderActor(); NActors::TActorId MakeLoadResponderActorId(ui32 node); // load actor -- generates load with specific parameters struct TLoadParams { TString Name; ui32 Channel; - TVector<ui32> NodeHops; // node ids for the message route - ui32 SizeMin, SizeMax; // min and max size for payloads - ui32 InFlyMax; // maximum number of in fly messages + TVector<ui32> NodeHops; // node ids for the message route + ui32 SizeMin, SizeMax; // min and max size for payloads + ui32 InFlyMax; // maximum number of in fly messages TDuration IntervalMin, IntervalMax; // min and max intervals between sending messages - bool SoftLoad; // is the load soft? - TDuration Duration; // test duration + bool SoftLoad; // is the load soft? + TDuration Duration; // test duration bool UseProtobufWithPayload; // store payload separately }; - NActors::IActor* CreateLoadActor(const TLoadParams& params); + NActors::IActor* CreateLoadActor(const TLoadParams& params); -} +} diff --git a/library/cpp/actors/interconnect/logging.h b/library/cpp/actors/interconnect/logging.h index 4d628ba859..c429d1cade 100644 --- a/library/cpp/actors/interconnect/logging.h +++ b/library/cpp/actors/interconnect/logging.h @@ -18,7 +18,7 @@ #define LOG_LOG_IC(component, marker, priority, ...) \ do { \ LOG_LOG(::NActors::TActivationContext::AsActorContext(), (priority), (component), "%s " marker " %s", LogPrefix.data(), Sprintf(__VA_ARGS__).data()); \ - } while (false) + } while (false) #define LOG_LOG_NET(priority, NODE_ID, FMT, ...) \ do { \ @@ -58,8 +58,8 @@ namespace NActors { TInterconnectLoggingBase(const TString& prefix) : LogPrefix(prefix) - { - } + { + } void SetPrefix(TString logPrefix) const { logPrefix.swap(const_cast<TString&>(LogPrefix)); diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index e7cbb453f6..4ba50a2b5f 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -16,10 +16,10 @@ #define FORCE_EVENT_CHECKSUM 0 #endif -using NActors::IEventBase; -using NActors::IEventHandle; +using NActors::IEventBase; +using NActors::IEventHandle; using NActors::TActorId; -using NActors::TConstIoVec; +using NActors::TConstIoVec; using NActors::TEventSerializedData; Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, size_t len) { @@ -142,7 +142,7 @@ struct TEventHolder : TNonCopyable { namespace NActors { class TEventOutputChannel; -} +} struct TTcpPacketOutTask : TNonCopyable { const TSessionParams& Params; @@ -192,9 +192,9 @@ public: Orbit.Reset(); } - bool IsEmpty() const { + bool IsEmpty() const { return !DataSize; - } + } void SetMetadata(ui64 serial, ui64 confirm) { ApplyToHeader([&](auto& header) { @@ -261,7 +261,7 @@ public: bool DropBufs(size_t& amount) { while (BufferIndex != Bufs.size()) { - TConstIoVec& item = Bufs[BufferIndex]; + TConstIoVec& item = Bufs[BufferIndex]; // calculate number of bytes to the end in current buffer const size_t remain = item.Size - FirstBufferOffset; if (amount >= remain) { @@ -285,11 +285,11 @@ public: TriedWriting = false; } - template <typename TVectorType> - void AppendToIoVector(TVectorType& vector, size_t max) { + template <typename TVectorType> + void AppendToIoVector(TVectorType& vector, size_t max) { for (size_t k = BufferIndex, offset = FirstBufferOffset; k != Bufs.size() && vector.size() < max; ++k, offset = 0) { TConstIoVec v = Bufs[k]; - v.Data = static_cast<const char*>(v.Data) + offset; + v.Data = static_cast<const char*>(v.Data) + offset; v.Size -= offset; vector.push_back(v); } diff --git a/library/cpp/actors/interconnect/poller.h b/library/cpp/actors/interconnect/poller.h index 476a7a5e92..ff7979369f 100644 --- a/library/cpp/actors/interconnect/poller.h +++ b/library/cpp/actors/interconnect/poller.h @@ -4,20 +4,20 @@ #include <library/cpp/actors/core/events.h> namespace NActors { - class TSharedDescriptor: public TThrRefBase { - public: - virtual int GetDescriptor() = 0; - }; + class TSharedDescriptor: public TThrRefBase { + public: + virtual int GetDescriptor() = 0; + }; - using TDelegate = std::function<void()>; - using TFDDelegate = std::function<TDelegate(const TIntrusivePtr<TSharedDescriptor>&)>; + using TDelegate = std::function<void()>; + using TFDDelegate = std::function<TDelegate(const TIntrusivePtr<TSharedDescriptor>&)>; - class IPoller: public TThrRefBase { - public: - virtual ~IPoller() = default; + class IPoller: public TThrRefBase { + public: + virtual ~IPoller() = default; - virtual void StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) = 0; - virtual void StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) = 0; - }; + virtual void StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) = 0; + virtual void StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) = 0; + }; } diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index 930d334821..e75cbcaef4 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -36,7 +36,7 @@ namespace NActors { const TActorId WriteActorId; std::atomic_uint32_t Flags = 0; - TSocketRecord(TEvPollerRegister& ev) + TSocketRecord(TEvPollerRegister& ev) : Socket(std::move(ev.Socket)) , ReadActorId(ev.ReadActorId) , WriteActorId(ev.WriteActorId) @@ -287,8 +287,8 @@ namespace NActors { Impl->Request(read, write); } - IActor* CreatePollerActor() { + IActor* CreatePollerActor() { return new TPollerActor; } -} +} diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h index 899dbb73e1..f927b82089 100644 --- a/library/cpp/actors/interconnect/poller_actor.h +++ b/library/cpp/actors/interconnect/poller_actor.h @@ -53,11 +53,11 @@ namespace NActors { {} }; - IActor* CreatePollerActor(); + IActor* CreatePollerActor(); inline TActorId MakePollerActorId() { char x[12] = {'I', 'C', 'P', 'o', 'l', 'l', 'e', 'r', '\xDE', '\xAD', '\xBE', '\xEF'}; return TActorId(0, TStringBuf(std::begin(x), std::end(x))); } -} +} diff --git a/library/cpp/actors/interconnect/poller_tcp.cpp b/library/cpp/actors/interconnect/poller_tcp.cpp index 456bca8b98..8267df31ea 100644 --- a/library/cpp/actors/interconnect/poller_tcp.cpp +++ b/library/cpp/actors/interconnect/poller_tcp.cpp @@ -1,35 +1,35 @@ #include "poller_tcp.h" namespace NInterconnect { - TPollerThreads::TPollerThreads(size_t units, bool useSelect) - : Units(units) - { - Y_VERIFY_DEBUG(!Units.empty()); - for (auto& unit : Units) - unit = TPollerUnit::Make(useSelect); - } + TPollerThreads::TPollerThreads(size_t units, bool useSelect) + : Units(units) + { + Y_VERIFY_DEBUG(!Units.empty()); + for (auto& unit : Units) + unit = TPollerUnit::Make(useSelect); + } - TPollerThreads::~TPollerThreads() { - } + TPollerThreads::~TPollerThreads() { + } - void TPollerThreads::Start() { - for (const auto& unit : Units) - unit->Start(); - } + void TPollerThreads::Start() { + for (const auto& unit : Units) + unit->Start(); + } - void TPollerThreads::Stop() { - for (const auto& unit : Units) - unit->Stop(); - } + void TPollerThreads::Stop() { + for (const auto& unit : Units) + unit->Stop(); + } - void TPollerThreads::StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) { - auto& unit = Units[THash<SOCKET>()(s->GetDescriptor()) % Units.size()]; - unit->StartReadOperation(s, std::move(operation)); - } + void TPollerThreads::StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) { + auto& unit = Units[THash<SOCKET>()(s->GetDescriptor()) % Units.size()]; + unit->StartReadOperation(s, std::move(operation)); + } - void TPollerThreads::StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) { - auto& unit = Units[THash<SOCKET>()(s->GetDescriptor()) % Units.size()]; - unit->StartWriteOperation(s, std::move(operation)); - } + void TPollerThreads::StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) { + auto& unit = Units[THash<SOCKET>()(s->GetDescriptor()) % Units.size()]; + unit->StartWriteOperation(s, std::move(operation)); + } } diff --git a/library/cpp/actors/interconnect/poller_tcp.h b/library/cpp/actors/interconnect/poller_tcp.h index 2ae8631237..310265eccd 100644 --- a/library/cpp/actors/interconnect/poller_tcp.h +++ b/library/cpp/actors/interconnect/poller_tcp.h @@ -7,19 +7,19 @@ #include <util/generic/hash.h> namespace NInterconnect { - class TPollerThreads: public NActors::IPoller { - public: - TPollerThreads(size_t units = 1U, bool useSelect = false); - ~TPollerThreads(); + class TPollerThreads: public NActors::IPoller { + public: + TPollerThreads(size_t units = 1U, bool useSelect = false); + ~TPollerThreads(); - void Start(); - void Stop(); + void Start(); + void Stop(); - void StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) override; - void StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) override; + void StartRead(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) override; + void StartWrite(const TIntrusivePtr<TSharedDescriptor>& s, TFDDelegate&& operation) override; - private: - TVector<TPollerUnit::TPtr> Units; - }; + private: + TVector<TPollerUnit::TPtr> Units; + }; } diff --git a/library/cpp/actors/interconnect/poller_tcp_unit.cpp b/library/cpp/actors/interconnect/poller_tcp_unit.cpp index 36180353b6..59e7dda810 100644 --- a/library/cpp/actors/interconnect/poller_tcp_unit.cpp +++ b/library/cpp/actors/interconnect/poller_tcp_unit.cpp @@ -1,7 +1,7 @@ #include "poller_tcp_unit.h" #if !defined(_win_) && !defined(_darwin_) -#include "poller_tcp_unit_epoll.h" +#include "poller_tcp_unit_epoll.h" #endif #include "poller_tcp_unit_select.h" @@ -11,116 +11,116 @@ #include <library/cpp/actors/util/intrinsics.h> #if defined _linux_ -#include <pthread.h> +#include <pthread.h> #endif namespace NInterconnect { - TPollerUnit::TPtr - TPollerUnit::Make(bool useSelect) { + TPollerUnit::TPtr + TPollerUnit::Make(bool useSelect) { #if defined(_win_) || defined(_darwin_) - Y_UNUSED(useSelect); - return TPtr(new TPollerUnitSelect); + Y_UNUSED(useSelect); + return TPtr(new TPollerUnitSelect); #else - return useSelect ? TPtr(new TPollerUnitSelect) : TPtr(new TPollerUnitEpoll); + return useSelect ? TPtr(new TPollerUnitSelect) : TPtr(new TPollerUnitEpoll); #endif - } - - TPollerUnit::TPollerUnit() - : StopFlag(true) - , ReadLoop(TThread::TParams(IdleThread<false>, this).SetName("network read")) - , WriteLoop(TThread::TParams(IdleThread<true>, this).SetName("network write")) - { - } - - TPollerUnit::~TPollerUnit() { - if (!AtomicLoad(&StopFlag)) - Stop(); - } - - void - TPollerUnit::Start() { - AtomicStore(&StopFlag, false); - ReadLoop.Start(); - WriteLoop.Start(); - } - - void - TPollerUnit::Stop() { - AtomicStore(&StopFlag, true); - ReadLoop.Join(); - WriteLoop.Join(); - } - - template <> - TPollerUnit::TSide& - TPollerUnit::GetSide<false>() { - return Read; - } - - template <> - TPollerUnit::TSide& - TPollerUnit::GetSide<true>() { - return Write; - } - - void - TPollerUnit::StartReadOperation( + } + + TPollerUnit::TPollerUnit() + : StopFlag(true) + , ReadLoop(TThread::TParams(IdleThread<false>, this).SetName("network read")) + , WriteLoop(TThread::TParams(IdleThread<true>, this).SetName("network write")) + { + } + + TPollerUnit::~TPollerUnit() { + if (!AtomicLoad(&StopFlag)) + Stop(); + } + + void + TPollerUnit::Start() { + AtomicStore(&StopFlag, false); + ReadLoop.Start(); + WriteLoop.Start(); + } + + void + TPollerUnit::Stop() { + AtomicStore(&StopFlag, true); + ReadLoop.Join(); + WriteLoop.Join(); + } + + template <> + TPollerUnit::TSide& + TPollerUnit::GetSide<false>() { + return Read; + } + + template <> + TPollerUnit::TSide& + TPollerUnit::GetSide<true>() { + return Write; + } + + void + TPollerUnit::StartReadOperation( const TIntrusivePtr<TSharedDescriptor>& stream, - TFDDelegate&& operation) { - Y_VERIFY_DEBUG(stream); - if (AtomicLoad(&StopFlag)) - return; - GetSide<false>().InputQueue.Push(TSide::TItem(stream, std::move(operation))); - } - - void - TPollerUnit::StartWriteOperation( + TFDDelegate&& operation) { + Y_VERIFY_DEBUG(stream); + if (AtomicLoad(&StopFlag)) + return; + GetSide<false>().InputQueue.Push(TSide::TItem(stream, std::move(operation))); + } + + void + TPollerUnit::StartWriteOperation( const TIntrusivePtr<TSharedDescriptor>& stream, - TFDDelegate&& operation) { - Y_VERIFY_DEBUG(stream); - if (AtomicLoad(&StopFlag)) - return; - GetSide<true>().InputQueue.Push(TSide::TItem(stream, std::move(operation))); - } - - template <bool IsWrite> - void* - TPollerUnit::IdleThread(void* param) { + TFDDelegate&& operation) { + Y_VERIFY_DEBUG(stream); + if (AtomicLoad(&StopFlag)) + return; + GetSide<true>().InputQueue.Push(TSide::TItem(stream, std::move(operation))); + } + + template <bool IsWrite> + void* + TPollerUnit::IdleThread(void* param) { // TODO: musl-libc version of `sched_param` struct is for some reason different from pthread // version in Ubuntu 12.04 #if defined(_linux_) && !defined(_musl_) - pthread_t threadSelf = pthread_self(); - sched_param sparam = {20}; - pthread_setschedparam(threadSelf, SCHED_FIFO, &sparam); + pthread_t threadSelf = pthread_self(); + sched_param sparam = {20}; + pthread_setschedparam(threadSelf, SCHED_FIFO, &sparam); #endif - static_cast<TPollerUnit*>(param)->RunLoop<IsWrite>(); - return nullptr; - } + static_cast<TPollerUnit*>(param)->RunLoop<IsWrite>(); + return nullptr; + } - template <> - void - TPollerUnit::RunLoop<false>() { + template <> + void + TPollerUnit::RunLoop<false>() { NProfiling::TMemoryTagScope tag("INTERCONNECT_RECEIVED_DATA"); - while (!AtomicLoad(&StopFlag)) - ProcessRead(); - } + while (!AtomicLoad(&StopFlag)) + ProcessRead(); + } - template <> - void - TPollerUnit::RunLoop<true>() { + template <> + void + TPollerUnit::RunLoop<true>() { NProfiling::TMemoryTagScope tag("INTERCONNECT_SEND_DATA"); - while (!AtomicLoad(&StopFlag)) - ProcessWrite(); - } - - void - TPollerUnit::TSide::ProcessInput() { - if (!InputQueue.IsEmpty()) - do { - auto sock = InputQueue.Top().first->GetDescriptor(); - if (!Operations.emplace(sock, std::move(InputQueue.Top())).second) - Y_FAIL("Descriptor is already in pooler."); - } while (InputQueue.Pop()); - } + while (!AtomicLoad(&StopFlag)) + ProcessWrite(); + } + + void + TPollerUnit::TSide::ProcessInput() { + if (!InputQueue.IsEmpty()) + do { + auto sock = InputQueue.Top().first->GetDescriptor(); + if (!Operations.emplace(sock, std::move(InputQueue.Top())).second) + Y_FAIL("Descriptor is already in pooler."); + } while (InputQueue.Pop()); + } } diff --git a/library/cpp/actors/interconnect/poller_tcp_unit.h b/library/cpp/actors/interconnect/poller_tcp_unit.h index d3f0d0fd0a..692168b968 100644 --- a/library/cpp/actors/interconnect/poller_tcp_unit.h +++ b/library/cpp/actors/interconnect/poller_tcp_unit.h @@ -10,58 +10,58 @@ #include <unordered_map> namespace NInterconnect { - using NActors::TFDDelegate; - using NActors::TSharedDescriptor; + using NActors::TFDDelegate; + using NActors::TSharedDescriptor; - class TPollerUnit { - public: - typedef std::unique_ptr<TPollerUnit> TPtr; + class TPollerUnit { + public: + typedef std::unique_ptr<TPollerUnit> TPtr; - static TPtr Make(bool useSelect); + static TPtr Make(bool useSelect); - void Start(); - void Stop(); + void Start(); + void Stop(); - virtual void StartReadOperation( - const TIntrusivePtr<TSharedDescriptor>& stream, - TFDDelegate&& operation); + virtual void StartReadOperation( + const TIntrusivePtr<TSharedDescriptor>& stream, + TFDDelegate&& operation); - virtual void StartWriteOperation( - const TIntrusivePtr<TSharedDescriptor>& stream, - TFDDelegate&& operation); + virtual void StartWriteOperation( + const TIntrusivePtr<TSharedDescriptor>& stream, + TFDDelegate&& operation); - virtual ~TPollerUnit(); + virtual ~TPollerUnit(); - private: - virtual void ProcessRead() = 0; - virtual void ProcessWrite() = 0; + private: + virtual void ProcessRead() = 0; + virtual void ProcessWrite() = 0; - template <bool IsWrite> - static void* IdleThread(void* param); + template <bool IsWrite> + static void* IdleThread(void* param); - template <bool IsWrite> - void RunLoop(); + template <bool IsWrite> + void RunLoop(); - volatile bool StopFlag; - TThread ReadLoop, WriteLoop; + volatile bool StopFlag; + TThread ReadLoop, WriteLoop; - protected: - TPollerUnit(); + protected: + TPollerUnit(); - struct TSide { - using TOperations = - std::unordered_map<SOCKET, - std::pair<TIntrusivePtr<TSharedDescriptor>, TFDDelegate>>; + struct TSide { + using TOperations = + std::unordered_map<SOCKET, + std::pair<TIntrusivePtr<TSharedDescriptor>, TFDDelegate>>; - TOperations Operations; - using TItem = TOperations::mapped_type; - TFunnelQueue<TItem> InputQueue; + TOperations Operations; + using TItem = TOperations::mapped_type; + TFunnelQueue<TItem> InputQueue; - void ProcessInput(); - } Read, Write; + void ProcessInput(); + } Read, Write; - template <bool IsWrite> - TSide& GetSide(); - }; + template <bool IsWrite> + TSide& GetSide(); + }; } diff --git a/library/cpp/actors/interconnect/poller_tcp_unit_epoll.cpp b/library/cpp/actors/interconnect/poller_tcp_unit_epoll.cpp index c0c4524f1e..c78538b95b 100644 --- a/library/cpp/actors/interconnect/poller_tcp_unit_epoll.cpp +++ b/library/cpp/actors/interconnect/poller_tcp_unit_epoll.cpp @@ -8,117 +8,117 @@ #include <cstring> namespace NInterconnect { - namespace { - void - DeleteEpoll(int epoll, SOCKET stream) { - ::epoll_event event = {0, {.fd = stream}}; - if (::epoll_ctl(epoll, EPOLL_CTL_DEL, stream, &event)) { - Cerr << "epoll_ctl errno: " << errno << Endl; - Y_FAIL("epoll delete error!"); - } - } - - template <ui32 Events> - void - AddEpoll(int epoll, SOCKET stream) { + namespace { + void + DeleteEpoll(int epoll, SOCKET stream) { + ::epoll_event event = {0, {.fd = stream}}; + if (::epoll_ctl(epoll, EPOLL_CTL_DEL, stream, &event)) { + Cerr << "epoll_ctl errno: " << errno << Endl; + Y_FAIL("epoll delete error!"); + } + } + + template <ui32 Events> + void + AddEpoll(int epoll, SOCKET stream) { ::epoll_event event = {.events = Events}; - event.data.fd = stream; - if (::epoll_ctl(epoll, EPOLL_CTL_ADD, stream, &event)) { - Cerr << "epoll_ctl errno: " << errno << Endl; - Y_FAIL("epoll add error!"); - } - } - - int - Initialize() { - const auto epoll = ::epoll_create(10000); - Y_VERIFY_DEBUG(epoll > 0); - return epoll; - } - + event.data.fd = stream; + if (::epoll_ctl(epoll, EPOLL_CTL_ADD, stream, &event)) { + Cerr << "epoll_ctl errno: " << errno << Endl; + Y_FAIL("epoll add error!"); + } + } + + int + Initialize() { + const auto epoll = ::epoll_create(10000); + Y_VERIFY_DEBUG(epoll > 0); + return epoll; + } + } - TPollerUnitEpoll::TPollerUnitEpoll() - : ReadDescriptor(Initialize()) - , WriteDescriptor(Initialize()) - { - // Block on the epoll descriptor. - ::sigemptyset(&sigmask); - ::sigaddset(&sigmask, SIGPIPE); - ::sigaddset(&sigmask, SIGTERM); + TPollerUnitEpoll::TPollerUnitEpoll() + : ReadDescriptor(Initialize()) + , WriteDescriptor(Initialize()) + { + // Block on the epoll descriptor. + ::sigemptyset(&sigmask); + ::sigaddset(&sigmask, SIGPIPE); + ::sigaddset(&sigmask, SIGTERM); } - TPollerUnitEpoll::~TPollerUnitEpoll() { - ::close(ReadDescriptor); - ::close(WriteDescriptor); - } + TPollerUnitEpoll::~TPollerUnitEpoll() { + ::close(ReadDescriptor); + ::close(WriteDescriptor); + } - template <> - int TPollerUnitEpoll::GetDescriptor<false>() const { - return ReadDescriptor; - } + template <> + int TPollerUnitEpoll::GetDescriptor<false>() const { + return ReadDescriptor; + } - template <> - int TPollerUnitEpoll::GetDescriptor<true>() const { - return WriteDescriptor; - } + template <> + int TPollerUnitEpoll::GetDescriptor<true>() const { + return WriteDescriptor; + } - void - TPollerUnitEpoll::StartReadOperation( + void + TPollerUnitEpoll::StartReadOperation( const TIntrusivePtr<TSharedDescriptor>& s, - TFDDelegate&& operation) { - TPollerUnit::StartReadOperation(s, std::move(operation)); - AddEpoll<EPOLLRDHUP | EPOLLIN>(ReadDescriptor, s->GetDescriptor()); - } + TFDDelegate&& operation) { + TPollerUnit::StartReadOperation(s, std::move(operation)); + AddEpoll<EPOLLRDHUP | EPOLLIN>(ReadDescriptor, s->GetDescriptor()); + } - void - TPollerUnitEpoll::StartWriteOperation( + void + TPollerUnitEpoll::StartWriteOperation( const TIntrusivePtr<TSharedDescriptor>& s, - TFDDelegate&& operation) { - TPollerUnit::StartWriteOperation(s, std::move(operation)); - AddEpoll<EPOLLRDHUP | EPOLLOUT>(WriteDescriptor, s->GetDescriptor()); - } - - constexpr int EVENTS_BUF_SIZE = 128; - - template <bool WriteOp> - void - TPollerUnitEpoll::Process() { - ::epoll_event events[EVENTS_BUF_SIZE]; - - const int epoll = GetDescriptor<WriteOp>(); - - /* Timeout just to check StopFlag sometimes */ - const int result = - ::epoll_pwait(epoll, events, EVENTS_BUF_SIZE, 200, &sigmask); - - if (result == -1 && errno != EINTR) - Y_FAIL("epoll wait error!"); - - auto& side = GetSide<WriteOp>(); - side.ProcessInput(); - - for (int i = 0; i < result; ++i) { - const auto it = side.Operations.find(events[i].data.fd); - if (side.Operations.end() == it) - continue; - if (const auto& finalizer = it->second.second(it->second.first)) { - DeleteEpoll(epoll, it->first); - side.Operations.erase(it); - finalizer(); - } + TFDDelegate&& operation) { + TPollerUnit::StartWriteOperation(s, std::move(operation)); + AddEpoll<EPOLLRDHUP | EPOLLOUT>(WriteDescriptor, s->GetDescriptor()); + } + + constexpr int EVENTS_BUF_SIZE = 128; + + template <bool WriteOp> + void + TPollerUnitEpoll::Process() { + ::epoll_event events[EVENTS_BUF_SIZE]; + + const int epoll = GetDescriptor<WriteOp>(); + + /* Timeout just to check StopFlag sometimes */ + const int result = + ::epoll_pwait(epoll, events, EVENTS_BUF_SIZE, 200, &sigmask); + + if (result == -1 && errno != EINTR) + Y_FAIL("epoll wait error!"); + + auto& side = GetSide<WriteOp>(); + side.ProcessInput(); + + for (int i = 0; i < result; ++i) { + const auto it = side.Operations.find(events[i].data.fd); + if (side.Operations.end() == it) + continue; + if (const auto& finalizer = it->second.second(it->second.first)) { + DeleteEpoll(epoll, it->first); + side.Operations.erase(it); + finalizer(); + } } } - void - TPollerUnitEpoll::ProcessRead() { - Process<false>(); - } + void + TPollerUnitEpoll::ProcessRead() { + Process<false>(); + } - void - TPollerUnitEpoll::ProcessWrite() { - Process<true>(); - } + void + TPollerUnitEpoll::ProcessWrite() { + Process<true>(); + } } diff --git a/library/cpp/actors/interconnect/poller_tcp_unit_epoll.h b/library/cpp/actors/interconnect/poller_tcp_unit_epoll.h index 70aee1c66b..ff7893eba2 100644 --- a/library/cpp/actors/interconnect/poller_tcp_unit_epoll.h +++ b/library/cpp/actors/interconnect/poller_tcp_unit_epoll.h @@ -3,31 +3,31 @@ #include "poller_tcp_unit.h" namespace NInterconnect { - class TPollerUnitEpoll: public TPollerUnit { - public: - TPollerUnitEpoll(); - virtual ~TPollerUnitEpoll(); + class TPollerUnitEpoll: public TPollerUnit { + public: + TPollerUnitEpoll(); + virtual ~TPollerUnitEpoll(); - private: - virtual void StartReadOperation( - const TIntrusivePtr<TSharedDescriptor>& s, - TFDDelegate&& operation) override; + private: + virtual void StartReadOperation( + const TIntrusivePtr<TSharedDescriptor>& s, + TFDDelegate&& operation) override; - virtual void StartWriteOperation( - const TIntrusivePtr<TSharedDescriptor>& s, - TFDDelegate&& operation) override; + virtual void StartWriteOperation( + const TIntrusivePtr<TSharedDescriptor>& s, + TFDDelegate&& operation) override; - virtual void ProcessRead() override; - virtual void ProcessWrite() override; + virtual void ProcessRead() override; + virtual void ProcessWrite() override; - template <bool Write> - void Process(); + template <bool Write> + void Process(); - template <bool Write> - int GetDescriptor() const; + template <bool Write> + int GetDescriptor() const; - const int ReadDescriptor, WriteDescriptor; - ::sigset_t sigmask; - }; + const int ReadDescriptor, WriteDescriptor; + ::sigset_t sigmask; + }; } diff --git a/library/cpp/actors/interconnect/poller_tcp_unit_select.cpp b/library/cpp/actors/interconnect/poller_tcp_unit_select.cpp index f0acc6d338..ae7aaad566 100644 --- a/library/cpp/actors/interconnect/poller_tcp_unit_select.cpp +++ b/library/cpp/actors/interconnect/poller_tcp_unit_select.cpp @@ -15,72 +15,72 @@ typedef timeval TIMEVAL; #endif namespace NInterconnect { - TPollerUnitSelect::TPollerUnitSelect() { - } + TPollerUnitSelect::TPollerUnitSelect() { + } - TPollerUnitSelect::~TPollerUnitSelect() { - } + TPollerUnitSelect::~TPollerUnitSelect() { + } - template <bool IsWrite> - void - TPollerUnitSelect::Process() { - auto& side = GetSide<IsWrite>(); - side.ProcessInput(); + template <bool IsWrite> + void + TPollerUnitSelect::Process() { + auto& side = GetSide<IsWrite>(); + side.ProcessInput(); - enum : size_t { R, - W, - E }; - static const auto O = IsWrite ? W : R; + enum : size_t { R, + W, + E }; + static const auto O = IsWrite ? W : R; - ::fd_set sets[3]; + ::fd_set sets[3]; - FD_ZERO(&sets[R]); - FD_ZERO(&sets[W]); - FD_ZERO(&sets[E]); + FD_ZERO(&sets[R]); + FD_ZERO(&sets[W]); + FD_ZERO(&sets[E]); - for (const auto& operation : side.Operations) { - FD_SET(operation.first, &sets[O]); - FD_SET(operation.first, &sets[E]); - } + for (const auto& operation : side.Operations) { + FD_SET(operation.first, &sets[O]); + FD_SET(operation.first, &sets[E]); + } #if defined(_win_) - ::TIMEVAL timeout = {0L, 99991L}; - const auto numberEvents = !side.Operations.empty() ? ::select(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout) - : (::Sleep(100), 0); + ::TIMEVAL timeout = {0L, 99991L}; + const auto numberEvents = !side.Operations.empty() ? ::select(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout) + : (::Sleep(100), 0); #elif defined(_darwin_) - ::TIMEVAL timeout = {0L, 99991L}; - const auto numberEvents = ::select(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout); + ::TIMEVAL timeout = {0L, 99991L}; + const auto numberEvents = ::select(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout); #else - ::sigset_t sigmask; - ::sigemptyset(&sigmask); - ::sigaddset(&sigmask, SIGPIPE); - ::sigaddset(&sigmask, SIGTERM); + ::sigset_t sigmask; + ::sigemptyset(&sigmask); + ::sigaddset(&sigmask, SIGPIPE); + ::sigaddset(&sigmask, SIGTERM); - struct ::timespec timeout = {0L, 99999989L}; - const auto numberEvents = ::pselect(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout, &sigmask); + struct ::timespec timeout = {0L, 99999989L}; + const auto numberEvents = ::pselect(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout, &sigmask); #endif - Y_VERIFY_DEBUG(numberEvents >= 0); - - for (auto it = side.Operations.cbegin(); side.Operations.cend() != it;) { - if (FD_ISSET(it->first, &sets[O]) || FD_ISSET(it->first, &sets[E])) - if (const auto& finalizer = it->second.second(it->second.first)) { - side.Operations.erase(it++); - finalizer(); - continue; - } - ++it; - } + Y_VERIFY_DEBUG(numberEvents >= 0); + + for (auto it = side.Operations.cbegin(); side.Operations.cend() != it;) { + if (FD_ISSET(it->first, &sets[O]) || FD_ISSET(it->first, &sets[E])) + if (const auto& finalizer = it->second.second(it->second.first)) { + side.Operations.erase(it++); + finalizer(); + continue; + } + ++it; + } } - void - TPollerUnitSelect::ProcessRead() { - Process<false>(); - } + void + TPollerUnitSelect::ProcessRead() { + Process<false>(); + } - void - TPollerUnitSelect::ProcessWrite() { - Process<true>(); - } + void + TPollerUnitSelect::ProcessWrite() { + Process<true>(); + } } diff --git a/library/cpp/actors/interconnect/poller_tcp_unit_select.h b/library/cpp/actors/interconnect/poller_tcp_unit_select.h index 7b6d5b3996..0c15217796 100644 --- a/library/cpp/actors/interconnect/poller_tcp_unit_select.h +++ b/library/cpp/actors/interconnect/poller_tcp_unit_select.h @@ -3,17 +3,17 @@ #include "poller_tcp_unit.h" namespace NInterconnect { - class TPollerUnitSelect: public TPollerUnit { - public: - TPollerUnitSelect(); - virtual ~TPollerUnitSelect(); + class TPollerUnitSelect: public TPollerUnit { + public: + TPollerUnitSelect(); + virtual ~TPollerUnitSelect(); - private: - virtual void ProcessRead() override; - virtual void ProcessWrite() override; + private: + virtual void ProcessRead() override; + virtual void ProcessWrite() override; - template <bool IsWrite> - void Process(); - }; + template <bool IsWrite> + void Process(); + }; } diff --git a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h index d17755cad1..2b6d27cd3f 100644 --- a/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h +++ b/library/cpp/actors/interconnect/ut/lib/ic_test_cluster.h @@ -9,14 +9,14 @@ #include <util/generic/noncopyable.h> -class TTestICCluster: public TNonCopyable { +class TTestICCluster: public TNonCopyable { public: struct TTrafficInterrupterSettings { TDuration RejectingTrafficTimeout; double BandWidth; bool Disconnect; }; - + private: const ui32 NumNodes; const TString Address = "::1"; @@ -29,7 +29,7 @@ private: public: TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(), - TTrafficInterrupterSettings* tiSettings = nullptr) + TTrafficInterrupterSettings* tiSettings = nullptr) : NumNodes(numNodes) , Counters(new NMonitoring::TDynamicCounters) , ChannelsConfig(channelsConfig) @@ -67,7 +67,7 @@ public: return Nodes[id].Get(); } - ~TTestICCluster() { + ~TTestICCluster() { } TActorId RegisterActor(NActors::IActor* actor, ui32 nodeId) { diff --git a/library/cpp/actors/interconnect/ut/lib/interrupter.h b/library/cpp/actors/interconnect/ut/lib/interrupter.h index a4a4373f3e..48851de2c5 100644 --- a/library/cpp/actors/interconnect/ut/lib/interrupter.h +++ b/library/cpp/actors/interconnect/ut/lib/interrupter.h @@ -26,7 +26,7 @@ class TTrafficInterrupter TVector<char> Data; }; struct TCompare { - bool operator()(const std::pair<TInstant, TDelayedPacket>& x, const std::pair<TInstant, TDelayedPacket>& y) const { + bool operator()(const std::pair<TInstant, TDelayedPacket>& x, const std::pair<TInstant, TDelayedPacket>& y) const { return x.first > y.first; }; }; @@ -160,7 +160,7 @@ private: timeout = Min(timeout, conn.DelayedQueue.top().first - TInstant::Now()); } }; - for (auto& it : Connections) { + for (auto& it : Connections) { updateTimout(it.ForwardConnection); updateTimout(it.BackwardConnection); } @@ -177,20 +177,20 @@ private: } if (DelayTraffic) { // process packets from DelayQueues auto processDelayedPackages = [](TDirectedConnection& conn) { - while (!conn.DelayedQueue.empty()) { - auto& frontPackage = conn.DelayedQueue.top(); - if (TInstant::Now() >= frontPackage.first) { - TInet6StreamSocket* sock = frontPackage.second.ForwardSocket; - if (sock) { + while (!conn.DelayedQueue.empty()) { + auto& frontPackage = conn.DelayedQueue.top(); + if (TInstant::Now() >= frontPackage.first) { + TInet6StreamSocket* sock = frontPackage.second.ForwardSocket; + if (sock) { sock->Send(frontPackage.second.Data.data(), frontPackage.second.Data.size()); } - conn.DelayedQueue.pop(); - } else { - break; + conn.DelayedQueue.pop(); + } else { + break; } - } - }; - for (auto& it : Connections) { + } + }; + for (auto& it : Connections) { processDelayedPackages(it.ForwardConnection); processDelayedPackages(it.BackwardConnection); } @@ -228,7 +228,7 @@ private: if (recvSize > 0) { if (DelayTraffic) { // put packet into DelayQueue - const TDuration baseDelay = TDuration::MicroSeconds(recvSize * 1e6 / Bandwidth); + const TDuration baseDelay = TDuration::MicroSeconds(recvSize * 1e6 / Bandwidth); const TInstant now = TInstant::Now(); directedConnection->Timestamp = Max(now, directedConnection->Timestamp) + baseDelay; TDelayedPacket pkt; diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index cd3a39a6c0..ff30b1445e 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -63,7 +63,7 @@ public: TMailboxType::ReadAsFilled, 0)); const TActorId loggerActorId(0, "logger"); - constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER + constexpr ui32 LoggerComponentId = 410; // NKikimrServices::LOGGER auto loggerSettings = MakeIntrusive<NLog::TSettings>( loggerActorId, @@ -78,7 +78,7 @@ public: NActorsServices::EServiceCommon_Name ); - constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON + constexpr ui32 WilsonComponentId = 430; // NKikimrServices::WILSON static const TString WilsonComponentName = "WILSON"; loggerSettings->Append( diff --git a/library/cpp/actors/interconnect/ut/lib/test_actors.h b/library/cpp/actors/interconnect/ut/lib/test_actors.h index bcb3504ebe..7591200471 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_actors.h +++ b/library/cpp/actors/interconnect/ut/lib/test_actors.h @@ -1,56 +1,56 @@ #pragma once namespace NActors { - class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> { - protected: + class TSenderBaseActor: public TActorBootstrapped<TSenderBaseActor> { + protected: const TActorId RecipientActorId; - const ui32 Preload; - ui64 SequenceNumber = 0; - ui32 InFlySize = 0; + const ui32 Preload; + ui64 SequenceNumber = 0; + ui32 InFlySize = 0; - public: + public: TSenderBaseActor(const TActorId& recipientActorId, ui32 preload = 1) - : RecipientActorId(recipientActorId) - , Preload(preload) - { - } + : RecipientActorId(recipientActorId) + , Preload(preload) + { + } - virtual ~TSenderBaseActor() { - } + virtual ~TSenderBaseActor() { + } - virtual void Bootstrap(const TActorContext& ctx) { - Become(&TSenderBaseActor::StateFunc); + virtual void Bootstrap(const TActorContext& ctx) { + Become(&TSenderBaseActor::StateFunc); ctx.Send(ctx.ExecutorThread.ActorSystem->InterconnectProxy(RecipientActorId.NodeId()), new TEvInterconnect::TEvConnectNode); - } + } - virtual void SendMessagesIfPossible(const TActorContext& ctx) { - while (InFlySize < Preload) { - SendMessage(ctx); - } + virtual void SendMessagesIfPossible(const TActorContext& ctx) { + while (InFlySize < Preload) { + SendMessage(ctx); + } } - virtual void SendMessage(const TActorContext& /*ctx*/) { - ++SequenceNumber; - } + virtual void SendMessage(const TActorContext& /*ctx*/) { + ++SequenceNumber; + } - virtual void Handle(TEvents::TEvUndelivered::TPtr& /*ev*/, const TActorContext& ctx) { - SendMessage(ctx); - } + virtual void Handle(TEvents::TEvUndelivered::TPtr& /*ev*/, const TActorContext& ctx) { + SendMessage(ctx); + } - virtual void Handle(TEvTestResponse::TPtr& /*ev*/, const TActorContext& ctx) { - SendMessagesIfPossible(ctx); - } + virtual void Handle(TEvTestResponse::TPtr& /*ev*/, const TActorContext& ctx) { + SendMessagesIfPossible(ctx); + } void Handle(TEvInterconnect::TEvNodeConnected::TPtr& /*ev*/, const TActorContext& ctx) { SendMessagesIfPossible(ctx); - } + } - void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& /*ev*/, const TActorContext& /*ctx*/) { - } + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& /*ev*/, const TActorContext& /*ctx*/) { + } - virtual void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { - Die(ctx); - } + virtual void Handle(TEvents::TEvPoisonPill::TPtr& /*ev*/, const TActorContext& ctx) { + Die(ctx); + } virtual STRICT_STFUNC(StateFunc, HFunc(TEvTestResponse, Handle) @@ -59,25 +59,25 @@ namespace NActors { HFunc(TEvInterconnect::TEvNodeConnected, Handle) HFunc(TEvInterconnect::TEvNodeDisconnected, Handle) ) - }; + }; - class TReceiverBaseActor: public TActor<TReceiverBaseActor> { - protected: - ui64 ReceivedCount = 0; + class TReceiverBaseActor: public TActor<TReceiverBaseActor> { + protected: + ui64 ReceivedCount = 0; - public: - TReceiverBaseActor() - : TActor(&TReceiverBaseActor::StateFunc) - { - } + public: + TReceiverBaseActor() + : TActor(&TReceiverBaseActor::StateFunc) + { + } - virtual ~TReceiverBaseActor() { - } + virtual ~TReceiverBaseActor() { + } virtual STRICT_STFUNC(StateFunc, HFunc(TEvTest, Handle) ) virtual void Handle(TEvTest::TPtr& /*ev*/, const TActorContext& /*ctx*/) {} - }; + }; } diff --git a/library/cpp/actors/interconnect/ut/lib/test_events.h b/library/cpp/actors/interconnect/ut/lib/test_events.h index 29d52b20af..cd0d9e0152 100644 --- a/library/cpp/actors/interconnect/ut/lib/test_events.h +++ b/library/cpp/actors/interconnect/ut/lib/test_events.h @@ -2,7 +2,7 @@ #include <library/cpp/actors/interconnect/ut/protos/interconnect_test.pb.h> -namespace NActors { +namespace NActors { enum { EvTest = EventSpaceBegin(TEvents::ES_PRIVATE), EvTestChan, diff --git a/library/cpp/actors/interconnect/ut_fat/main.cpp b/library/cpp/actors/interconnect/ut_fat/main.cpp index c83f7c2ddf..5d19bc3003 100644 --- a/library/cpp/actors/interconnect/ut_fat/main.cpp +++ b/library/cpp/actors/interconnect/ut_fat/main.cpp @@ -61,13 +61,13 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { const NInterconnectTest::TEvTestResponse& record = ev->Get()->Record; Y_VERIFY(record.HasConfirmedSequenceNumber()); if (!(SendFlags & IEventHandle::FlagGenerateUnsureUndelivered)) { - while (record.GetConfirmedSequenceNumber() != InFly.front()) { + while (record.GetConfirmedSequenceNumber() != InFly.front()) { InFly.pop_front(); --InFlySize; } } Y_VERIFY(record.GetConfirmedSequenceNumber() == InFly.front(), "got# %" PRIu64 " expected# %" PRIu64, - record.GetConfirmedSequenceNumber(), InFly.front()); + record.GetConfirmedSequenceNumber(), InFly.front()); InFly.pop_front(); --InFlySize; SendMessagesIfPossible(ctx); diff --git a/library/cpp/actors/interconnect/watchdog_timer.h b/library/cpp/actors/interconnect/watchdog_timer.h index 52376aa3fb..c190105a59 100644 --- a/library/cpp/actors/interconnect/watchdog_timer.h +++ b/library/cpp/actors/interconnect/watchdog_timer.h @@ -1,7 +1,7 @@ #pragma once namespace NActors { - template <typename TEvent> + template <typename TEvent> class TWatchdogTimer { using TCallback = std::function<void()>; @@ -9,7 +9,7 @@ namespace NActors { const TCallback Callback; TInstant LastResetTimestamp; - TEvent* ExpectedEvent = nullptr; + TEvent* ExpectedEvent = nullptr; ui32 Iteration = 0; static constexpr ui32 NumIterationsBeforeFiring = 2; @@ -18,8 +18,8 @@ namespace NActors { TWatchdogTimer(TDuration timeout, TCallback callback) : Timeout(timeout) , Callback(std::move(callback)) - { - } + { + } void Arm(const TActorIdentity& actor) { if (Timeout != TDuration::Zero() && Timeout != TDuration::Max()) { @@ -65,4 +65,4 @@ namespace NActors { } }; -} +} |