diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/actors/interconnect/interconnect_tcp_session.h | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.h')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.h | 280 |
1 files changed, 140 insertions, 140 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 7fc00dbcc5..1ecd829132 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -30,61 +30,61 @@ #include <unordered_map> namespace NActors { - class TSlowPathChecker { - using TTraceCallback = std::function<void(double)>; - TTraceCallback Callback; - const NHPTimer::STime Start; - - public: - TSlowPathChecker(TTraceCallback&& callback) - : Callback(std::move(callback)) + class TSlowPathChecker { + using TTraceCallback = std::function<void(double)>; + TTraceCallback Callback; + const NHPTimer::STime Start; + + public: + TSlowPathChecker(TTraceCallback&& callback) + : Callback(std::move(callback)) , Start(GetCycleCountFast()) - { - } + { + } - ~TSlowPathChecker() { + ~TSlowPathChecker() { const NHPTimer::STime end = GetCycleCountFast(); - const NHPTimer::STime elapsed = end - Start; - if (elapsed > 1000000) { - Callback(NHPTimer::GetSeconds(elapsed) * 1000); - } + const NHPTimer::STime elapsed = end - Start; + if (elapsed > 1000000) { + Callback(NHPTimer::GetSeconds(elapsed) * 1000); + } + } + + operator bool() const { + return false; } + }; - operator bool() const { - return false; - } - }; +#define LWPROBE_IF_TOO_LONG(...) \ + if (auto __x = TSlowPathChecker{[&](double ms) { LWPROBE(__VA_ARGS__); }}) \ + ; \ + else -#define LWPROBE_IF_TOO_LONG(...) \ - if (auto __x = TSlowPathChecker{[&](double ms) { LWPROBE(__VA_ARGS__); }}) \ - ; \ - else - - class TTimeLimit { - public: - TTimeLimit(ui64 limitInCycles) + class TTimeLimit { + public: + TTimeLimit(ui64 limitInCycles) : UpperLimit(limitInCycles == 0 ? 0 : GetCycleCountFast() + limitInCycles) - { - } + { + } - TTimeLimit(ui64 startTS, ui64 limitInCycles) - : UpperLimit(limitInCycles == 0 ? 0 : startTS + limitInCycles) - { - } + TTimeLimit(ui64 startTS, ui64 limitInCycles) + : UpperLimit(limitInCycles == 0 ? 0 : startTS + limitInCycles) + { + } - bool CheckExceeded() { + bool CheckExceeded() { return UpperLimit != 0 && GetCycleCountFast() > UpperLimit; - } + } - const ui64 UpperLimit; - }; + const ui64 UpperLimit; + }; - static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10); + static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10); static constexpr TDuration DEFAULT_LOST_CONNECTION_TIMEOUT = TDuration::Seconds(10); - static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024; - static constexpr ui32 DEFAULT_TOTAL_INFLIGHT_DATA = 4 * 10240 * 1024; + static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024; + static constexpr ui32 DEFAULT_TOTAL_INFLIGHT_DATA = 4 * 10240 * 1024; - class TInterconnectProxyTCP; + class TInterconnectProxyTCP; enum class EUpdateState : ui8 { NONE, // no updates generated by input session yet @@ -93,22 +93,22 @@ namespace NActors { CONFIRMING, // confirmation inflight }; - struct TReceiveContext: public TAtomicRefCount<TReceiveContext> { - /* All invokations to these fields should be thread-safe */ + struct TReceiveContext: public TAtomicRefCount<TReceiveContext> { + /* All invokations to these fields should be thread-safe */ - ui64 ControlPacketSendTimer = 0; - ui64 ControlPacketId = 0; + ui64 ControlPacketSendTimer = 0; + ui64 ControlPacketId = 0; - // number of packets received by input session - TAtomic PacketsReadFromSocket = 0; - TAtomic DataPacketsReadFromSocket = 0; + // number of packets received by input session + TAtomic PacketsReadFromSocket = 0; + TAtomic DataPacketsReadFromSocket = 0; // last processed packet by input session std::atomic_uint64_t LastProcessedPacketSerial = 0; static constexpr uint64_t LastProcessedPacketSerialLockBit = uint64_t(1) << 63; - // for hardened checks - TAtomic NumInputSessions = 0; + // for hardened checks + TAtomic NumInputSessions = 0; NHPTimer::STime StartTime; @@ -160,9 +160,9 @@ namespace NActors { ui64 GetLastProcessedPacketSerial() { return LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit; } - }; + }; - class TInputSessionTCP + class TInputSessionTCP : public TActorBootstrapped<TInputSessionTCP> , public TInterconnectLoggingBase { @@ -174,14 +174,14 @@ namespace NActors { struct TEvCheckDeadPeer : TEventLocal<TEvCheckDeadPeer, EvCheckDeadPeer> {}; struct TEvResumeReceiveData : TEventLocal<TEvResumeReceiveData, EvResumeReceiveData> {}; - public: - static constexpr EActivityType ActorActivityType() { + public: + static constexpr EActivityType ActorActivityType() { return INTERCONNECT_SESSION_TCP; - } + } TInputSessionTCP(const TActorId& sessionId, - TIntrusivePtr<NInterconnect::TStreamSocket> socket, - TIntrusivePtr<TReceiveContext> context, + TIntrusivePtr<NInterconnect::TStreamSocket> socket, + TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common, std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, @@ -189,8 +189,8 @@ namespace NActors { TDuration deadPeerTimeout, TSessionParams params); - private: - friend class TActorBootstrapped<TInputSessionTCP>; + private: + friend class TActorBootstrapped<TInputSessionTCP>; void Bootstrap(); @@ -204,13 +204,13 @@ namespace NActors { cFunc(TEvConfirmUpdate::EventType, HandleConfirmUpdate) ) - private: + private: TRope IncomingData; const TActorId SessionId; - TIntrusivePtr<NInterconnect::TStreamSocket> Socket; + TIntrusivePtr<NInterconnect::TStreamSocket> Socket; TPollerToken::TPtr PollerToken; - TIntrusivePtr<TReceiveContext> Context; + TIntrusivePtr<TReceiveContext> Context; TInterconnectProxyCommon::TPtr Common; const ui32 NodeId; const TSessionParams Params; @@ -235,11 +235,11 @@ namespace NActors { THolder<TEvUpdateFromInputSession> UpdateFromInputSession; - ui64 ConfirmedByInput; + ui64 ConfirmedByInput; std::shared_ptr<IInterconnectMetrics> Metrics; - bool CloseInputSessionRequested = false; + bool CloseInputSessionRequested = false; void CloseInputSession(); @@ -258,12 +258,12 @@ namespace NActors { TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers; - static constexpr size_t NumPreallocatedBuffers = 16; - void PreallocateBuffers(); + static constexpr size_t NumPreallocatedBuffers = 16; + void PreallocateBuffers(); - inline ui64 GetMaxCyclesPerEvent() const { + inline ui64 GetMaxCyclesPerEvent() const { return DurationToCycles(TDuration::MicroSeconds(500)); - } + } const TDuration DeadPeerTimeout; TInstant LastReceiveTimestamp; @@ -278,21 +278,21 @@ namespace NActors { void HandlePingResponse(TDuration passed); void HandleClock(TInstant clock); - }; + }; - class TInterconnectSessionTCP + class TInterconnectSessionTCP : public TActor<TInterconnectSessionTCP> , public TInterconnectLoggingBase { - enum { + enum { EvCheckCloseOnIdle = EventSpaceBegin(TEvents::ES_PRIVATE), EvCheckLostConnection, EvRam, EvTerminate, EvFreeItems, - }; + }; - struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {}; + struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {}; struct TEvCheckLostConnection : TEventLocal<TEvCheckLostConnection, EvCheckLostConnection> {}; struct TEvRam : TEventLocal<TEvRam, EvRam> { @@ -308,18 +308,18 @@ namespace NActors { {} }; - const TInstant Created; - TInstant NewConnectionSet; - ui64 MessagesGot = 0; - ui64 MessagesWrittenToBuffer = 0; - ui64 PacketsGenerated = 0; - ui64 PacketsWrittenToSocket = 0; - ui64 PacketsConfirmed = 0; + const TInstant Created; + TInstant NewConnectionSet; + ui64 MessagesGot = 0; + ui64 MessagesWrittenToBuffer = 0; + ui64 PacketsGenerated = 0; + ui64 PacketsWrittenToSocket = 0; + ui64 PacketsConfirmed = 0; - public: - static constexpr EActivityType ActorActivityType() { - return INTERCONNECT_SESSION_TCP; - } + public: + static constexpr EActivityType ActorActivityType() { + return INTERCONNECT_SESSION_TCP; + } TInterconnectSessionTCP(TInterconnectProxyTCP* const proxy, TSessionParams params); ~TInterconnectSessionTCP(); @@ -339,8 +339,8 @@ namespace NActors { return ReceiveContext->ClockSkew_us; } - private: - friend class TInterconnectProxyTCP; + private: + friend class TInterconnectProxyTCP; void Handle(TEvTerminate::TPtr& ev); void HandlePoison(); @@ -400,7 +400,7 @@ namespace NActors { void ReestablishConnectionWithHandshake(TDisconnectReason reason); void ReestablishConnectionExecute(); - TInterconnectProxyTCP* const Proxy; + TInterconnectProxyTCP* const Proxy; // various connection settings access TDuration GetDeadPeerTimeout() const; @@ -418,11 +418,11 @@ namespace NActors { void IssuePingRequest(); void Handle(TEvProcessPingRequest::TPtr ev); - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - TInstant LastInputActivityTimestamp; - TInstant LastPayloadActivityTimestamp; - TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog; + TInstant LastInputActivityTimestamp; + TInstant LastPayloadActivityTimestamp; + TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog; TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog; void OnCloseOnIdleTimerHit() { @@ -435,26 +435,26 @@ namespace NActors { Terminate(TDisconnectReason::LostConnection()); } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// const TSessionParams Params; TMaybe<TEventHolderPool> Pool; TMaybe<TChannelScheduler> ChannelScheduler; - ui64 TotalOutputQueueSize; - bool OutputStuckFlag; - TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization; + ui64 TotalOutputQueueSize; + bool OutputStuckFlag; + TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization; size_t NumEventsInReadyChannels = 0; - void SetOutputStuckFlag(bool state); - void SwitchStuckPeriod(); + void SetOutputStuckFlag(bool state); + void SwitchStuckPeriod(); - using TSendQueue = TList<TTcpPacketOutTask>; - TSendQueue SendQueue; - TSendQueue SendQueueCache; - TSendQueue::iterator SendQueuePos; + using TSendQueue = TList<TTcpPacketOutTask>; + TSendQueue SendQueue; + TSendQueue SendQueueCache; + TSendQueue::iterator SendQueuePos; ui64 WriteBlockedCycles = 0; // start of current block period TDuration WriteBlockedTotal; // total incremental duration that session has been blocked - ui64 BytesUnwritten = 0; + ui64 BytesUnwritten = 0; void TrimSendQueueCache(); @@ -467,21 +467,21 @@ namespace NActors { } } - ui64 OutputCounter; + ui64 OutputCounter; ui64 LastSentSerial = 0; - TInstant LastHandshakeDone; + TInstant LastHandshakeDone; - TIntrusivePtr<NInterconnect::TStreamSocket> Socket; + TIntrusivePtr<NInterconnect::TStreamSocket> Socket; TPollerToken::TPtr PollerToken; ui32 SendBufferSize; ui64 InflightDataAmount = 0; std::unordered_map<TActorId, ui64, TActorId::THash> Subscribers; - // time at which we want to send confirmation packet even if there was no outgoing data - ui64 UnconfirmedBytes = 0; - TInstant ForcePacketTimestamp = TInstant::Max(); + // time at which we want to send confirmation packet even if there was no outgoing data + ui64 UnconfirmedBytes = 0; + TInstant ForcePacketTimestamp = TInstant::Max(); TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule; size_t MaxFlushSchedule = 0; ui64 FlushEventsScheduled = 0; @@ -494,71 +494,71 @@ namespace NActors { void GenerateHttpInfo(TStringStream& str); - TIntrusivePtr<TReceiveContext> ReceiveContext; + TIntrusivePtr<TReceiveContext> ReceiveContext; TActorId ReceiverId; TDuration Ping; - ui64 ConfirmPacketsForcedBySize = 0; - ui64 ConfirmPacketsForcedByTimeout = 0; + ui64 ConfirmPacketsForcedBySize = 0; + ui64 ConfirmPacketsForcedByTimeout = 0; - ui64 LastConfirmed = 0; + ui64 LastConfirmed = 0; TEvHandshakeDone::TPtr PendingHandshakeDoneEvent; bool StartHandshakeOnSessionClose = false; ui64 EqualizeCounter = 0; - }; + }; - class TInterconnectSessionKiller - : public TActorBootstrapped<TInterconnectSessionKiller> { - ui32 RepliesReceived = 0; - ui32 RepliesNumber = 0; + class TInterconnectSessionKiller + : public TActorBootstrapped<TInterconnectSessionKiller> { + ui32 RepliesReceived = 0; + ui32 RepliesNumber = 0; TActorId LargestSession = TActorId(); - ui64 MaxBufferSize = 0; + ui64 MaxBufferSize = 0; TInterconnectProxyCommon::TPtr Common; - public: + public: static constexpr EActivityType ActorActivityType() { return INTERCONNECT_SESSION_KILLER; } TInterconnectSessionKiller(TInterconnectProxyCommon::TPtr common) - : Common(common) - { - } + : Common(common) + { + } void Bootstrap() { - auto sender = SelfId(); + auto sender = SelfId(); const auto eventFabric = [&sender](const TActorId& recp) -> IEventHandle* { - auto ev = new TEvSessionBufferSizeRequest(); - return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery); - }; + auto ev = new TEvSessionBufferSizeRequest(); + return new IEventHandle(recp, sender, ev, IEventHandle::FlagTrackDelivery); + }; RepliesNumber = TlsActivationContext->ExecutorThread.ActorSystem->BroadcastToProxies(eventFabric); - Become(&TInterconnectSessionKiller::StateFunc); - } + Become(&TInterconnectSessionKiller::StateFunc); + } STRICT_STFUNC(StateFunc, hFunc(TEvSessionBufferSizeResponse, ProcessResponse) cFunc(TEvents::TEvUndelivered::EventType, ProcessUndelivered) ) - + void ProcessResponse(TEvSessionBufferSizeResponse::TPtr& ev) { - RepliesReceived++; - if (MaxBufferSize < ev->Get()->BufferSize) { - MaxBufferSize = ev->Get()->BufferSize; - LargestSession = ev->Get()->SessionID; - } - if (RepliesReceived == RepliesNumber) { + RepliesReceived++; + if (MaxBufferSize < ev->Get()->BufferSize) { + MaxBufferSize = ev->Get()->BufferSize; + LargestSession = ev->Get()->SessionID; + } + if (RepliesReceived == RepliesNumber) { Send(LargestSession, new TEvents::TEvPoisonPill); - AtomicUnlock(&Common->StartedSessionKiller); + AtomicUnlock(&Common->StartedSessionKiller); PassAway(); - } + } } - + void ProcessUndelivered() { - RepliesReceived++; + RepliesReceived++; } - }; + }; void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common); |