diff options
author | alexvru <alexvru@ydb.tech> | 2023-01-18 22:06:52 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-01-18 22:06:52 +0300 |
commit | bc05be2ea40570222a091cb5e3b896532e3c3ce8 (patch) | |
tree | c498df1462880df835b8132b0777cc685340ce53 /library/cpp | |
parent | 3fe90528505b2aa0a4b2dfe7938816aef82649a4 (diff) | |
download | ydb-bc05be2ea40570222a091cb5e3b896532e3c3ce8.tar.gz |
Use monotonic clock for time interval measurements in Interconnect
Diffstat (limited to 'library/cpp')
6 files changed, 54 insertions, 44 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index c18e2a51373..b1212b89140 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -39,7 +39,7 @@ namespace NActors { SetPrefix(Sprintf("InputSession %s [node %" PRIu32 "]", SelfId().ToString().data(), NodeId)); Become(&TThis::WorkingState, DeadPeerTimeout, new TEvCheckDeadPeer); LOG_DEBUG_IC_SESSION("ICIS01", "InputSession created"); - LastReceiveTimestamp = TActivationContext::Now(); + LastReceiveTimestamp = TActivationContext::Monotonic(); ReceiveData(); } @@ -440,7 +440,7 @@ namespace NActors { } } - LastReceiveTimestamp = TActivationContext::Now(); + LastReceiveTimestamp = TActivationContext::Monotonic(); return true; } @@ -476,7 +476,7 @@ namespace NActors { } void TInputSessionTCP::HandleCheckDeadPeer() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); if (now >= LastReceiveTimestamp + DeadPeerTimeout) { ReceiveData(); if (Socket && now >= LastReceiveTimestamp + DeadPeerTimeout) { @@ -484,7 +484,7 @@ namespace NActors { DestroySession(TDisconnectReason::DeadPeer()); } } - Schedule(LastReceiveTimestamp + DeadPeerTimeout - now, new TEvCheckDeadPeer); + Schedule(LastReceiveTimestamp + DeadPeerTimeout, new TEvCheckDeadPeer); } void TInputSessionTCP::HandlePingResponse(TDuration passed) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 938c3480a8e..3edded47d6a 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -40,7 +40,6 @@ namespace NActors { SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId)); SwitchToInitialState(); - PassAwayTimestamp = TActivationContext::Now() + TDuration::Seconds(15); LOG_INFO_IC("ICP01", "ready to work"); } @@ -563,7 +562,7 @@ namespace NActors { ValidateEvent(ev, "EnqueueSessionEvent"); const ui32 size = ev->GetSize(); PendingSessionEventsSize += size; - PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev); + PendingSessionEvents.emplace_back(TActivationContext::Monotonic() + Common->Settings.MessagePendingTimeout, size, ev); ScheduleCleanupEventQueue(); CleanupEventQueue(); } @@ -810,7 +809,7 @@ namespace NActors { if (!CleanupEventQueueScheduled && PendingSessionEvents) { // apply batching at 50 ms granularity - Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue); + Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Monotonic()), new TEvCleanupEventQueue); CleanupEventQueueScheduled = true; } } @@ -827,7 +826,7 @@ namespace NActors { void TInterconnectProxyTCP::CleanupEventQueue() { ICPROXY_PROFILED; - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); while (PendingSessionEvents) { TPendingSessionEvent& ev = PendingSessionEvents.front(); if (now >= ev.Deadline || PendingSessionEventsSize > Common->Settings.MessagePendingSize) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index b7c9c3cc8a0..71edfccbe21 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -176,11 +176,18 @@ namespace NActors { Become(std::forward<TArgs>(args)...); Y_VERIFY(!Terminated || CurrentStateFunc() == &TThis::HoldByError); // ensure we never escape this state if (CurrentStateFunc() != &TThis::PendingActivation) { - PassAwayTimestamp = TInstant::Max(); + PassAwayTimestamp = TMonotonic::Max(); + } else if (DynamicPtr) { + PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15); + if (!PassAwayScheduled) { + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), + {}, nullptr, 0)); + PassAwayScheduled = true; + } } } - TInstant PassAwayTimestamp; + TMonotonic PassAwayTimestamp; bool PassAwayScheduled = false; void SwitchToInitialState() { @@ -190,17 +197,18 @@ namespace NActors { " PendingIncomingHandshakeEvents# %zu State# %s", LogPrefix.data(), PendingSessionEvents.size(), PendingIncomingHandshakeEvents.size(), State); SwitchToState(__LINE__, "PendingActivation", &TThis::PendingActivation); - if (DynamicPtr && !PassAwayScheduled && PassAwayTimestamp != TInstant::Max()) { - TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), - {}, nullptr, 0)); - PassAwayScheduled = true; - } } void HandlePassAwayIfNeeded() { Y_VERIFY(PassAwayScheduled); - if (PassAwayTimestamp != TInstant::Max()) { + const TMonotonic now = TActivationContext::Monotonic(); + if (now >= PassAwayTimestamp) { PassAway(); + } else if (PassAwayTimestamp != TMonotonic::Max()) { + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), + {}, nullptr, 0)); + } else { + PassAwayScheduled = false; } } @@ -388,11 +396,11 @@ namespace NActors { // hold all events before connection is established struct TPendingSessionEvent { - TInstant Deadline; + TMonotonic Deadline; ui32 Size; THolder<IEventHandle> Event; - TPendingSessionEvent(TInstant deadline, ui32 size, TAutoPtr<IEventHandle> event) + TPendingSessionEvent(TMonotonic deadline, ui32 size, TAutoPtr<IEventHandle> event) : Deadline(deadline) , Size(size) , Event(event) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 7fae0db16a5..dfc4d411d3b 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -232,7 +232,7 @@ namespace NActors { CloseOnIdleWatchdog.Arm(SelfId()); // reset activity timestamps - LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now(); + LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic(); LOG_INFO_IC_SESSION("ICS10", "traffic start"); @@ -315,7 +315,7 @@ namespace NActors { bool needConfirm = false; // update activity timer for dead peer checker - LastInputActivityTimestamp = TActivationContext::Now(); + LastInputActivityTimestamp = TActivationContext::Monotonic(); if (msg.NumDataBytes) { UnconfirmedBytes += msg.NumDataBytes; @@ -326,7 +326,7 @@ namespace NActors { } // reset payload watchdog that controls close-on-idle behaviour - LastPayloadActivityTimestamp = TActivationContext::Now(); + LastPayloadActivityTimestamp = TActivationContext::Monotonic(); CloseOnIdleWatchdog.Reset(); } @@ -654,7 +654,7 @@ namespace NActors { void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) { if (period != TDuration::Max()) { - const TInstant when = TActivationContext::Now() + period; + const TMonotonic when = TActivationContext::Monotonic() + period; if (when < ForcePacketTimestamp) { ForcePacketTimestamp = when; ScheduleFlush(); @@ -664,7 +664,7 @@ namespace NActors { void TInterconnectSessionTCP::ScheduleFlush() { if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) { - Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush); + Schedule(ForcePacketTimestamp, new TEvFlush); FlushSchedule.push(ForcePacketTimestamp); MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size()); ++FlushEventsScheduled; @@ -672,7 +672,7 @@ namespace NActors { } void TInterconnectSessionTCP::HandleFlush() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); while (FlushSchedule && now >= FlushSchedule.top()) { FlushSchedule.pop(); } @@ -682,14 +682,14 @@ namespace NActors { ++ConfirmPacketsForcedByTimeout; ++FlushEventsProcessed; MakePacket(false); // just generate confirmation packet if we have preconditions for this - } else if (ForcePacketTimestamp != TInstant::Max()) { + } else if (ForcePacketTimestamp != TMonotonic::Max()) { ScheduleFlush(); } } } void TInterconnectSessionTCP::ResetFlushLogic() { - ForcePacketTimestamp = TInstant::Max(); + ForcePacketTimestamp = TMonotonic::Max(); UnconfirmedBytes = 0; const TDuration ping = Proxy->Common->Settings.PingPeriod; if (ping != TDuration::Zero() && !NumEventsInReadyChannels) { @@ -761,7 +761,7 @@ namespace NActors { } // update payload activity timer - LastPayloadActivityTimestamp = TActivationContext::Now(); + LastPayloadActivityTimestamp = TActivationContext::Monotonic(); } else if (pingMask) { serial = *pingMask; @@ -923,7 +923,7 @@ namespace NActors { flagState = EFlag::GREEN; do { - auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp; + auto lastInputDelay = TActivationContext::Monotonic() - LastInputActivityTimestamp; if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) { flagState = EFlag::ORANGE; break; @@ -1006,7 +1006,7 @@ namespace NActors { } void TInterconnectSessionTCP::IssuePingRequest() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); if (now >= LastPingTimestamp + PingPeriodicity) { LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request"); if (Socket) { @@ -1176,6 +1176,8 @@ namespace NActors { ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0; + const TMonotonic now = TActivationContext::Monotonic(); + MON_VAR(OutputStuckFlag) MON_VAR(SendQueue.size()) MON_VAR(SendQueueCache.size()) @@ -1185,8 +1187,8 @@ namespace NActors { MON_VAR(InflightDataAmount) MON_VAR(unsentQueueSize) MON_VAR(SendBufferSize) - MON_VAR(LastInputActivityTimestamp) - MON_VAR(LastPayloadActivityTimestamp) + MON_VAR(now - LastInputActivityTimestamp) + MON_VAR(now - LastPayloadActivityTimestamp) MON_VAR(LastHandshakeDone) MON_VAR(OutputCounter) MON_VAR(LastSentSerial) @@ -1205,7 +1207,7 @@ namespace NActors { clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data()); } - MON_VAR(LastPingTimestamp) + MON_VAR(now - LastPingTimestamp) MON_VAR(GetPingRTT()) MON_VAR(clockSkew) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 51c5bfa453f..598a5c9220c 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -266,7 +266,7 @@ namespace NActors { } const TDuration DeadPeerTimeout; - TInstant LastReceiveTimestamp; + TMonotonic LastReceiveTimestamp; void HandleCheckDeadPeer(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -413,15 +413,15 @@ namespace NActors { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // pinger - TInstant LastPingTimestamp; + TMonotonic LastPingTimestamp; static constexpr TDuration PingPeriodicity = TDuration::Seconds(1); void IssuePingRequest(); void Handle(TEvProcessPingRequest::TPtr ev); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - TInstant LastInputActivityTimestamp; - TInstant LastPayloadActivityTimestamp; + TMonotonic LastInputActivityTimestamp; + TMonotonic LastPayloadActivityTimestamp; TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog; TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog; @@ -481,8 +481,8 @@ namespace NActors { // time at which we want to send confirmation packet even if there was no outgoing data ui64 UnconfirmedBytes = 0; - TInstant ForcePacketTimestamp = TInstant::Max(); - TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule; + TMonotonic ForcePacketTimestamp = TMonotonic::Max(); + TPriorityQueue<TMonotonic, TVector<TMonotonic>, std::greater<TMonotonic>> FlushSchedule; size_t MaxFlushSchedule = 0; ui64 FlushEventsScheduled = 0; ui64 FlushEventsProcessed = 0; diff --git a/library/cpp/actors/interconnect/watchdog_timer.h b/library/cpp/actors/interconnect/watchdog_timer.h index c190105a592..fe62006e3b9 100644 --- a/library/cpp/actors/interconnect/watchdog_timer.h +++ b/library/cpp/actors/interconnect/watchdog_timer.h @@ -8,7 +8,7 @@ namespace NActors { const TDuration Timeout; const TCallback Callback; - TInstant LastResetTimestamp; + TMonotonic LastResetTimestamp; TEvent* ExpectedEvent = nullptr; ui32 Iteration = 0; @@ -29,7 +29,7 @@ namespace NActors { } void Reset() { - LastResetTimestamp = TActivationContext::Now(); + LastResetTimestamp = TActivationContext::Monotonic(); } void Disarm() { @@ -38,11 +38,11 @@ namespace NActors { void operator()(typename TEvent::TPtr& ev) { if (ev->Get() == ExpectedEvent) { - const TInstant now = TActivationContext::Now(); - const TInstant barrier = LastResetTimestamp + Timeout; + const TMonotonic now = TActivationContext::Monotonic(); + const TMonotonic barrier = LastResetTimestamp + Timeout; if (now < barrier) { // the time hasn't come yet - Schedule(barrier - now, TActorIdentity(ev->Recipient)); + Schedule(barrier, TActorIdentity(ev->Recipient)); } else if (Iteration < NumIterationsBeforeFiring) { // time has come, but we will still give actor a chance to process some messages and rearm timer ++Iteration; @@ -57,7 +57,8 @@ namespace NActors { } private: - void Schedule(TDuration timeout, const TActorIdentity& actor) { + template<typename T> + void Schedule(T&& timeout, const TActorIdentity& actor) { auto ev = MakeHolder<TEvent>(); ExpectedEvent = ev.Get(); Iteration = 0; |