diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 10:54:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 12:28:07 +0300 |
commit | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch) | |
tree | c15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/actors/interconnect | |
parent | 8a749596d40e91c896a1907afcd108d9221fbde1 (diff) | |
download | ydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz |
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/actors/interconnect')
9 files changed, 85 insertions, 194 deletions
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 43f376038b..b1b8ae0c75 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -52,9 +52,6 @@ namespace NActors { EvProcessPingRequest, EvGetSecureSocket, EvSecureSocket, - HandshakeBrokerTake, - HandshakeBrokerFree, - HandshakeBrokerPermit, //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update @@ -101,18 +98,6 @@ namespace NActors { } }; - struct TEvHandshakeBrokerTake: public TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake") - }; - - struct TEvHandshakeBrokerFree: public TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree") - }; - - struct TEvHandshakeBrokerPermit: public TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit") - }; - struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk") TEvHandshakeAsk(const TActorId& self, diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h deleted file mode 100644 index 70a7cb91dc..0000000000 --- a/library/cpp/actors/interconnect/handshake_broker.h +++ /dev/null @@ -1,78 +0,0 @@ -#pragma once - -#include <library/cpp/actors/core/actor.h> - -#include <deque> - -namespace NActors { - static constexpr ui32 DEFAULT_INFLIGHT = 100; - - class THandshakeBroker : public TActor<THandshakeBroker> { - private: - std::deque<TActorId> Waiting; - ui32 Capacity; - - void Handle(TEvHandshakeBrokerTake::TPtr &ev) { - if (Capacity > 0) { - Capacity -= 1; - Send(ev->Sender, new TEvHandshakeBrokerPermit()); - } else { - Waiting.push_back(ev->Sender); - } - } - - void Handle(TEvHandshakeBrokerFree::TPtr& ev) { - Y_UNUSED(ev); - if (Capacity == 0 && !Waiting.empty()) { - Send(Waiting.front(), new TEvHandshakeBrokerPermit()); - Waiting.pop_front(); - } else { - Capacity += 1; - } - } - - void PassAway() override { - while (!Waiting.empty()) { - Send(Waiting.front(), new TEvHandshakeBrokerPermit()); - Waiting.pop_front(); - } - TActor::PassAway(); - } - - public: - THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT) - : TActor(&TThis::StateFunc) - , Capacity(inflightLimit) - { - } - - static constexpr char ActorName[] = "HANDSHAKE_BROKER_ACTOR"; - - STFUNC(StateFunc) { - Y_UNUSED(ctx); - switch(ev->GetTypeRewrite()) { - hFunc(TEvHandshakeBrokerTake, Handle); - hFunc(TEvHandshakeBrokerFree, Handle); - cFunc(TEvents::TSystem::Poison, PassAway); - } - } - - void Bootstrap() { - Become(&TThis::StateFunc); - }; - }; - - inline IActor* CreateHandshakeBroker() { - return new THandshakeBroker(); - } - - inline TActorId MakeHandshakeBrokerOutId() { - char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'}; - return TActorId(0, TStringBuf(std::begin(x), std::end(x))); - } - - inline TActorId MakeHandshakeBrokerInId() { - char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'}; - return TActorId(0, TStringBuf(std::begin(x), std::end(x))); - } -}; diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index a9c6b1dd11..dc651f3762 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -1,5 +1,4 @@ #include "interconnect_handshake.h" -#include "handshake_broker.h" #include "interconnect_tcp_proxy.h" #include <library/cpp/actors/core/actor_coroutine.h> @@ -97,13 +96,8 @@ namespace NActors { THashMap<ui32, TInstant> LastLogNotice; const TDuration MuteDuration = TDuration::Seconds(15); TInstant Deadline; - TActorId HandshakeBroker; public: - static constexpr IActor::EActivityType ActorActivityType() { - return IActor::INTERCONNECT_HANDSHAKE; - } - THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors @@ -119,7 +113,6 @@ namespace NActors { Y_VERIFY(SelfVirtualId); Y_VERIFY(SelfVirtualId.NodeId()); Y_VERIFY(PeerNodeId); - HandshakeBroker = MakeHandshakeBrokerOutId(); } THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) @@ -135,7 +128,6 @@ namespace NActors { } else { PeerAddr.clear(); } - HandshakeBroker = MakeHandshakeBrokerInId(); } void UpdatePrefix() { @@ -145,64 +137,45 @@ namespace NActors { void Run() override { UpdatePrefix(); - bool isBrokerActive = false; - - if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { - isBrokerActive = true; - WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + // set up overall handshake process timer + TDuration timeout = Common->Settings.Handshake; + if (timeout == TDuration::Zero()) { + timeout = DEFAULT_HANDSHAKE_TIMEOUT; } + timeout += ResolveTimeout * 2; + Deadline = Now() + timeout; + Schedule(Deadline, new TEvents::TEvWakeup); try { - // set up overall handshake process timer - TDuration timeout = Common->Settings.Handshake; - if (timeout == TDuration::Zero()) { - timeout = DEFAULT_HANDSHAKE_TIMEOUT; - } - timeout += ResolveTimeout * 2; - Deadline = Now() + timeout; - Schedule(Deadline, new TEvents::TEvWakeup); - - try { - if (Socket) { - PerformIncomingHandshake(); - } else { - PerformOutgoingHandshake(); - } - - // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings - if (ProgramInfo) { - if (Params.Encryption) { - EstablishSecureConnection(); - } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); - } - } - } catch (const TExHandshakeFailed&) { - ProgramInfo.Clear(); + if (Socket) { + PerformIncomingHandshake(); + } else { + PerformOutgoingHandshake(); } + // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings if (ProgramInfo) { - LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); - Y_VERIFY(NextPacketFromPeer); - if (PollerToken) { - Y_VERIFY(PollerToken->RefCount() == 1); - PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + if (Params.Encryption) { + EstablishSecureConnection(); + } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); } - SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, - *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); - } - } catch (const TDtorException&) { - throw; // we can't use actor system when handling this exception - } catch (...) { - if (isBrokerActive) { - Send(HandshakeBroker, new TEvHandshakeBrokerFree()); } - throw; + } catch (const TExHandshakeFailed&) { + ProgramInfo.Clear(); } - if (isBrokerActive) { - Send(HandshakeBroker, new TEvHandshakeBrokerFree()); + if (ProgramInfo) { + LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); + Y_VERIFY(NextPacketFromPeer); + if (PollerToken) { + Y_VERIFY(PollerToken->RefCount() == 1); + PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + } + SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, + *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); } + Socket.Reset(); } @@ -1022,11 +995,12 @@ namespace NActors { const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) { return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), self, peer, nodeId, nextPacket, - std::move(peerHostName), std::move(params))); + std::move(peerHostName), std::move(params)), IActor::INTERCONNECT_HANDSHAKE); } IActor* CreateIncomingHandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) { - return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), std::move(socket))); + return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), std::move(socket)), + IActor::INTERCONNECT_HANDSHAKE); } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index a8c505d94d..fdf035499f 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -39,7 +39,7 @@ namespace NActors { SetPrefix(Sprintf("InputSession %s [node %" PRIu32 "]", SelfId().ToString().data(), NodeId)); Become(&TThis::WorkingState, DeadPeerTimeout, new TEvCheckDeadPeer); LOG_DEBUG_IC_SESSION("ICIS01", "InputSession created"); - LastReceiveTimestamp = TActivationContext::Now(); + LastReceiveTimestamp = TActivationContext::Monotonic(); ReceiveData(); } @@ -437,7 +437,7 @@ namespace NActors { } } - LastReceiveTimestamp = TActivationContext::Now(); + LastReceiveTimestamp = TActivationContext::Monotonic(); return true; } @@ -473,7 +473,7 @@ namespace NActors { } void TInputSessionTCP::HandleCheckDeadPeer() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); if (now >= LastReceiveTimestamp + DeadPeerTimeout) { ReceiveData(); if (Socket && now >= LastReceiveTimestamp + DeadPeerTimeout) { @@ -481,7 +481,7 @@ namespace NActors { DestroySession(TDisconnectReason::DeadPeer()); } } - Schedule(LastReceiveTimestamp + DeadPeerTimeout - now, new TEvCheckDeadPeer); + Schedule(LastReceiveTimestamp + DeadPeerTimeout, new TEvCheckDeadPeer); } void TInputSessionTCP::HandlePingResponse(TDuration passed) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 7e2d8ccb94..b4cc263a4c 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -40,7 +40,6 @@ namespace NActors { SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId)); SwitchToInitialState(); - PassAwayTimestamp = TActivationContext::Now() + TDuration::Seconds(15); LOG_INFO_IC("ICP01", "ready to work"); } @@ -563,7 +562,7 @@ namespace NActors { ValidateEvent(ev, "EnqueueSessionEvent"); const ui32 size = ev->GetSize(); PendingSessionEventsSize += size; - PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev); + PendingSessionEvents.emplace_back(TActivationContext::Monotonic() + Common->Settings.MessagePendingTimeout, size, ev); ScheduleCleanupEventQueue(); CleanupEventQueue(); } @@ -810,7 +809,7 @@ namespace NActors { if (!CleanupEventQueueScheduled && PendingSessionEvents) { // apply batching at 50 ms granularity - Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue); + Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Monotonic()), new TEvCleanupEventQueue); CleanupEventQueueScheduled = true; } } @@ -827,7 +826,7 @@ namespace NActors { void TInterconnectProxyTCP::CleanupEventQueue() { ICPROXY_PROFILED; - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); while (PendingSessionEvents) { TPendingSessionEvent& ev = PendingSessionEvents.front(); if (now >= ev.Deadline || PendingSessionEventsSize > Common->Settings.MessagePendingSize) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index 023e5bd1ee..b750e278e1 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -175,11 +175,18 @@ namespace NActors { Become(std::forward<TArgs>(args)...); Y_VERIFY(!Terminated || CurrentStateFunc() == &TThis::HoldByError); // ensure we never escape this state if (CurrentStateFunc() != &TThis::PendingActivation) { - PassAwayTimestamp = TInstant::Max(); + PassAwayTimestamp = TMonotonic::Max(); + } else if (DynamicPtr) { + PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15); + if (!PassAwayScheduled) { + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), + {}, nullptr, 0)); + PassAwayScheduled = true; + } } } - TInstant PassAwayTimestamp; + TMonotonic PassAwayTimestamp; bool PassAwayScheduled = false; void SwitchToInitialState() { @@ -189,17 +196,18 @@ namespace NActors { " PendingIncomingHandshakeEvents# %zu State# %s", LogPrefix.data(), PendingSessionEvents.size(), PendingIncomingHandshakeEvents.size(), State); SwitchToState(__LINE__, "PendingActivation", &TThis::PendingActivation); - if (DynamicPtr && !PassAwayScheduled && PassAwayTimestamp != TInstant::Max()) { - TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), - {}, nullptr, 0)); - PassAwayScheduled = true; - } } void HandlePassAwayIfNeeded() { Y_VERIFY(PassAwayScheduled); - if (PassAwayTimestamp != TInstant::Max()) { + const TMonotonic now = TActivationContext::Monotonic(); + if (now >= PassAwayTimestamp) { PassAway(); + } else if (PassAwayTimestamp != TMonotonic::Max()) { + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), + {}, nullptr, 0)); + } else { + PassAwayScheduled = false; } } @@ -387,11 +395,11 @@ namespace NActors { // hold all events before connection is established struct TPendingSessionEvent { - TInstant Deadline; + TMonotonic Deadline; ui32 Size; THolder<IEventHandle> Event; - TPendingSessionEvent(TInstant deadline, ui32 size, TAutoPtr<IEventHandle> event) + TPendingSessionEvent(TMonotonic deadline, ui32 size, TAutoPtr<IEventHandle> event) : Deadline(deadline) , Size(size) , Event(event) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index feb55a16ad..18df8e42ff 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -232,7 +232,7 @@ namespace NActors { CloseOnIdleWatchdog.Arm(SelfId()); // reset activity timestamps - LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now(); + LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic(); LOG_INFO_IC_SESSION("ICS10", "traffic start"); @@ -315,7 +315,7 @@ namespace NActors { bool needConfirm = false; // update activity timer for dead peer checker - LastInputActivityTimestamp = TActivationContext::Now(); + LastInputActivityTimestamp = TActivationContext::Monotonic(); if (msg.NumDataBytes) { UnconfirmedBytes += msg.NumDataBytes; @@ -326,7 +326,7 @@ namespace NActors { } // reset payload watchdog that controls close-on-idle behaviour - LastPayloadActivityTimestamp = TActivationContext::Now(); + LastPayloadActivityTimestamp = TActivationContext::Monotonic(); CloseOnIdleWatchdog.Reset(); } @@ -654,7 +654,7 @@ namespace NActors { void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) { if (period != TDuration::Max()) { - const TInstant when = TActivationContext::Now() + period; + const TMonotonic when = TActivationContext::Monotonic() + period; if (when < ForcePacketTimestamp) { ForcePacketTimestamp = when; ScheduleFlush(); @@ -664,7 +664,7 @@ namespace NActors { void TInterconnectSessionTCP::ScheduleFlush() { if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) { - Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush); + Schedule(ForcePacketTimestamp, new TEvFlush); FlushSchedule.push(ForcePacketTimestamp); MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size()); ++FlushEventsScheduled; @@ -672,7 +672,7 @@ namespace NActors { } void TInterconnectSessionTCP::HandleFlush() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); while (FlushSchedule && now >= FlushSchedule.top()) { FlushSchedule.pop(); } @@ -682,14 +682,14 @@ namespace NActors { ++ConfirmPacketsForcedByTimeout; ++FlushEventsProcessed; MakePacket(false); // just generate confirmation packet if we have preconditions for this - } else if (ForcePacketTimestamp != TInstant::Max()) { + } else if (ForcePacketTimestamp != TMonotonic::Max()) { ScheduleFlush(); } } } void TInterconnectSessionTCP::ResetFlushLogic() { - ForcePacketTimestamp = TInstant::Max(); + ForcePacketTimestamp = TMonotonic::Max(); UnconfirmedBytes = 0; const TDuration ping = Proxy->Common->Settings.PingPeriod; if (ping != TDuration::Zero() && !NumEventsInReadyChannels) { @@ -761,7 +761,7 @@ namespace NActors { } // update payload activity timer - LastPayloadActivityTimestamp = TActivationContext::Now(); + LastPayloadActivityTimestamp = TActivationContext::Monotonic(); } else if (pingMask) { serial = *pingMask; @@ -923,7 +923,7 @@ namespace NActors { flagState = EFlag::GREEN; do { - auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp; + auto lastInputDelay = TActivationContext::Monotonic() - LastInputActivityTimestamp; if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) { flagState = EFlag::ORANGE; break; @@ -1006,7 +1006,7 @@ namespace NActors { } void TInterconnectSessionTCP::IssuePingRequest() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); if (now >= LastPingTimestamp + PingPeriodicity) { LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request"); if (Socket) { @@ -1175,6 +1175,8 @@ namespace NActors { ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0; + const TMonotonic now = TActivationContext::Monotonic(); + MON_VAR(OutputStuckFlag) MON_VAR(SendQueue.size()) MON_VAR(SendQueueCache.size()) @@ -1184,8 +1186,8 @@ namespace NActors { MON_VAR(InflightDataAmount) MON_VAR(unsentQueueSize) MON_VAR(SendBufferSize) - MON_VAR(LastInputActivityTimestamp) - MON_VAR(LastPayloadActivityTimestamp) + MON_VAR(now - LastInputActivityTimestamp) + MON_VAR(now - LastPayloadActivityTimestamp) MON_VAR(LastHandshakeDone) MON_VAR(OutputCounter) MON_VAR(LastSentSerial) @@ -1204,7 +1206,7 @@ namespace NActors { clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data()); } - MON_VAR(LastPingTimestamp) + MON_VAR(now - LastPingTimestamp) MON_VAR(GetPingRTT()) MON_VAR(clockSkew) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 51c5bfa453..598a5c9220 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -266,7 +266,7 @@ namespace NActors { } const TDuration DeadPeerTimeout; - TInstant LastReceiveTimestamp; + TMonotonic LastReceiveTimestamp; void HandleCheckDeadPeer(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -413,15 +413,15 @@ namespace NActors { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // pinger - TInstant LastPingTimestamp; + TMonotonic LastPingTimestamp; static constexpr TDuration PingPeriodicity = TDuration::Seconds(1); void IssuePingRequest(); void Handle(TEvProcessPingRequest::TPtr ev); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - TInstant LastInputActivityTimestamp; - TInstant LastPayloadActivityTimestamp; + TMonotonic LastInputActivityTimestamp; + TMonotonic LastPayloadActivityTimestamp; TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog; TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog; @@ -481,8 +481,8 @@ namespace NActors { // time at which we want to send confirmation packet even if there was no outgoing data ui64 UnconfirmedBytes = 0; - TInstant ForcePacketTimestamp = TInstant::Max(); - TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule; + TMonotonic ForcePacketTimestamp = TMonotonic::Max(); + TPriorityQueue<TMonotonic, TVector<TMonotonic>, std::greater<TMonotonic>> FlushSchedule; size_t MaxFlushSchedule = 0; ui64 FlushEventsScheduled = 0; ui64 FlushEventsProcessed = 0; diff --git a/library/cpp/actors/interconnect/watchdog_timer.h b/library/cpp/actors/interconnect/watchdog_timer.h index c190105a59..fe62006e3b 100644 --- a/library/cpp/actors/interconnect/watchdog_timer.h +++ b/library/cpp/actors/interconnect/watchdog_timer.h @@ -8,7 +8,7 @@ namespace NActors { const TDuration Timeout; const TCallback Callback; - TInstant LastResetTimestamp; + TMonotonic LastResetTimestamp; TEvent* ExpectedEvent = nullptr; ui32 Iteration = 0; @@ -29,7 +29,7 @@ namespace NActors { } void Reset() { - LastResetTimestamp = TActivationContext::Now(); + LastResetTimestamp = TActivationContext::Monotonic(); } void Disarm() { @@ -38,11 +38,11 @@ namespace NActors { void operator()(typename TEvent::TPtr& ev) { if (ev->Get() == ExpectedEvent) { - const TInstant now = TActivationContext::Now(); - const TInstant barrier = LastResetTimestamp + Timeout; + const TMonotonic now = TActivationContext::Monotonic(); + const TMonotonic barrier = LastResetTimestamp + Timeout; if (now < barrier) { // the time hasn't come yet - Schedule(barrier - now, TActorIdentity(ev->Recipient)); + Schedule(barrier, TActorIdentity(ev->Recipient)); } else if (Iteration < NumIterationsBeforeFiring) { // time has come, but we will still give actor a chance to process some messages and rearm timer ++Iteration; @@ -57,7 +57,8 @@ namespace NActors { } private: - void Schedule(TDuration timeout, const TActorIdentity& actor) { + template<typename T> + void Schedule(T&& timeout, const TActorIdentity& actor) { auto ev = MakeHolder<TEvent>(); ExpectedEvent = ev.Get(); Iteration = 0; |