aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-01-18 22:06:52 +0300
committeralexvru <alexvru@ydb.tech>2023-01-18 22:06:52 +0300
commitbc05be2ea40570222a091cb5e3b896532e3c3ce8 (patch)
treec498df1462880df835b8132b0777cc685340ce53 /library/cpp
parent3fe90528505b2aa0a4b2dfe7938816aef82649a4 (diff)
downloadydb-bc05be2ea40570222a091cb5e3b896532e3c3ce8.tar.gz
Use monotonic clock for time interval measurements in Interconnect
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp8
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp7
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h28
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp30
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h12
-rw-r--r--library/cpp/actors/interconnect/watchdog_timer.h13
6 files changed, 54 insertions, 44 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index c18e2a51373..b1212b89140 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -39,7 +39,7 @@ namespace NActors {
SetPrefix(Sprintf("InputSession %s [node %" PRIu32 "]", SelfId().ToString().data(), NodeId));
Become(&TThis::WorkingState, DeadPeerTimeout, new TEvCheckDeadPeer);
LOG_DEBUG_IC_SESSION("ICIS01", "InputSession created");
- LastReceiveTimestamp = TActivationContext::Now();
+ LastReceiveTimestamp = TActivationContext::Monotonic();
ReceiveData();
}
@@ -440,7 +440,7 @@ namespace NActors {
}
}
- LastReceiveTimestamp = TActivationContext::Now();
+ LastReceiveTimestamp = TActivationContext::Monotonic();
return true;
}
@@ -476,7 +476,7 @@ namespace NActors {
}
void TInputSessionTCP::HandleCheckDeadPeer() {
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
if (now >= LastReceiveTimestamp + DeadPeerTimeout) {
ReceiveData();
if (Socket && now >= LastReceiveTimestamp + DeadPeerTimeout) {
@@ -484,7 +484,7 @@ namespace NActors {
DestroySession(TDisconnectReason::DeadPeer());
}
}
- Schedule(LastReceiveTimestamp + DeadPeerTimeout - now, new TEvCheckDeadPeer);
+ Schedule(LastReceiveTimestamp + DeadPeerTimeout, new TEvCheckDeadPeer);
}
void TInputSessionTCP::HandlePingResponse(TDuration passed) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 938c3480a8e..3edded47d6a 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -40,7 +40,6 @@ namespace NActors {
SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId));
SwitchToInitialState();
- PassAwayTimestamp = TActivationContext::Now() + TDuration::Seconds(15);
LOG_INFO_IC("ICP01", "ready to work");
}
@@ -563,7 +562,7 @@ namespace NActors {
ValidateEvent(ev, "EnqueueSessionEvent");
const ui32 size = ev->GetSize();
PendingSessionEventsSize += size;
- PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev);
+ PendingSessionEvents.emplace_back(TActivationContext::Monotonic() + Common->Settings.MessagePendingTimeout, size, ev);
ScheduleCleanupEventQueue();
CleanupEventQueue();
}
@@ -810,7 +809,7 @@ namespace NActors {
if (!CleanupEventQueueScheduled && PendingSessionEvents) {
// apply batching at 50 ms granularity
- Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue);
+ Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Monotonic()), new TEvCleanupEventQueue);
CleanupEventQueueScheduled = true;
}
}
@@ -827,7 +826,7 @@ namespace NActors {
void TInterconnectProxyTCP::CleanupEventQueue() {
ICPROXY_PROFILED;
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
while (PendingSessionEvents) {
TPendingSessionEvent& ev = PendingSessionEvents.front();
if (now >= ev.Deadline || PendingSessionEventsSize > Common->Settings.MessagePendingSize) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index b7c9c3cc8a0..71edfccbe21 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -176,11 +176,18 @@ namespace NActors {
Become(std::forward<TArgs>(args)...);
Y_VERIFY(!Terminated || CurrentStateFunc() == &TThis::HoldByError); // ensure we never escape this state
if (CurrentStateFunc() != &TThis::PendingActivation) {
- PassAwayTimestamp = TInstant::Max();
+ PassAwayTimestamp = TMonotonic::Max();
+ } else if (DynamicPtr) {
+ PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15);
+ if (!PassAwayScheduled) {
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
+ {}, nullptr, 0));
+ PassAwayScheduled = true;
+ }
}
}
- TInstant PassAwayTimestamp;
+ TMonotonic PassAwayTimestamp;
bool PassAwayScheduled = false;
void SwitchToInitialState() {
@@ -190,17 +197,18 @@ namespace NActors {
" PendingIncomingHandshakeEvents# %zu State# %s", LogPrefix.data(), PendingSessionEvents.size(),
PendingIncomingHandshakeEvents.size(), State);
SwitchToState(__LINE__, "PendingActivation", &TThis::PendingActivation);
- if (DynamicPtr && !PassAwayScheduled && PassAwayTimestamp != TInstant::Max()) {
- TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
- {}, nullptr, 0));
- PassAwayScheduled = true;
- }
}
void HandlePassAwayIfNeeded() {
Y_VERIFY(PassAwayScheduled);
- if (PassAwayTimestamp != TInstant::Max()) {
+ const TMonotonic now = TActivationContext::Monotonic();
+ if (now >= PassAwayTimestamp) {
PassAway();
+ } else if (PassAwayTimestamp != TMonotonic::Max()) {
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
+ {}, nullptr, 0));
+ } else {
+ PassAwayScheduled = false;
}
}
@@ -388,11 +396,11 @@ namespace NActors {
// hold all events before connection is established
struct TPendingSessionEvent {
- TInstant Deadline;
+ TMonotonic Deadline;
ui32 Size;
THolder<IEventHandle> Event;
- TPendingSessionEvent(TInstant deadline, ui32 size, TAutoPtr<IEventHandle> event)
+ TPendingSessionEvent(TMonotonic deadline, ui32 size, TAutoPtr<IEventHandle> event)
: Deadline(deadline)
, Size(size)
, Event(event)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 7fae0db16a5..dfc4d411d3b 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -232,7 +232,7 @@ namespace NActors {
CloseOnIdleWatchdog.Arm(SelfId());
// reset activity timestamps
- LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic();
LOG_INFO_IC_SESSION("ICS10", "traffic start");
@@ -315,7 +315,7 @@ namespace NActors {
bool needConfirm = false;
// update activity timer for dead peer checker
- LastInputActivityTimestamp = TActivationContext::Now();
+ LastInputActivityTimestamp = TActivationContext::Monotonic();
if (msg.NumDataBytes) {
UnconfirmedBytes += msg.NumDataBytes;
@@ -326,7 +326,7 @@ namespace NActors {
}
// reset payload watchdog that controls close-on-idle behaviour
- LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastPayloadActivityTimestamp = TActivationContext::Monotonic();
CloseOnIdleWatchdog.Reset();
}
@@ -654,7 +654,7 @@ namespace NActors {
void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) {
if (period != TDuration::Max()) {
- const TInstant when = TActivationContext::Now() + period;
+ const TMonotonic when = TActivationContext::Monotonic() + period;
if (when < ForcePacketTimestamp) {
ForcePacketTimestamp = when;
ScheduleFlush();
@@ -664,7 +664,7 @@ namespace NActors {
void TInterconnectSessionTCP::ScheduleFlush() {
if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) {
- Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush);
+ Schedule(ForcePacketTimestamp, new TEvFlush);
FlushSchedule.push(ForcePacketTimestamp);
MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size());
++FlushEventsScheduled;
@@ -672,7 +672,7 @@ namespace NActors {
}
void TInterconnectSessionTCP::HandleFlush() {
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
while (FlushSchedule && now >= FlushSchedule.top()) {
FlushSchedule.pop();
}
@@ -682,14 +682,14 @@ namespace NActors {
++ConfirmPacketsForcedByTimeout;
++FlushEventsProcessed;
MakePacket(false); // just generate confirmation packet if we have preconditions for this
- } else if (ForcePacketTimestamp != TInstant::Max()) {
+ } else if (ForcePacketTimestamp != TMonotonic::Max()) {
ScheduleFlush();
}
}
}
void TInterconnectSessionTCP::ResetFlushLogic() {
- ForcePacketTimestamp = TInstant::Max();
+ ForcePacketTimestamp = TMonotonic::Max();
UnconfirmedBytes = 0;
const TDuration ping = Proxy->Common->Settings.PingPeriod;
if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
@@ -761,7 +761,7 @@ namespace NActors {
}
// update payload activity timer
- LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastPayloadActivityTimestamp = TActivationContext::Monotonic();
} else if (pingMask) {
serial = *pingMask;
@@ -923,7 +923,7 @@ namespace NActors {
flagState = EFlag::GREEN;
do {
- auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp;
+ auto lastInputDelay = TActivationContext::Monotonic() - LastInputActivityTimestamp;
if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) {
flagState = EFlag::ORANGE;
break;
@@ -1006,7 +1006,7 @@ namespace NActors {
}
void TInterconnectSessionTCP::IssuePingRequest() {
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
if (now >= LastPingTimestamp + PingPeriodicity) {
LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request");
if (Socket) {
@@ -1176,6 +1176,8 @@ namespace NActors {
ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;
+ const TMonotonic now = TActivationContext::Monotonic();
+
MON_VAR(OutputStuckFlag)
MON_VAR(SendQueue.size())
MON_VAR(SendQueueCache.size())
@@ -1185,8 +1187,8 @@ namespace NActors {
MON_VAR(InflightDataAmount)
MON_VAR(unsentQueueSize)
MON_VAR(SendBufferSize)
- MON_VAR(LastInputActivityTimestamp)
- MON_VAR(LastPayloadActivityTimestamp)
+ MON_VAR(now - LastInputActivityTimestamp)
+ MON_VAR(now - LastPayloadActivityTimestamp)
MON_VAR(LastHandshakeDone)
MON_VAR(OutputCounter)
MON_VAR(LastSentSerial)
@@ -1205,7 +1207,7 @@ namespace NActors {
clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data());
}
- MON_VAR(LastPingTimestamp)
+ MON_VAR(now - LastPingTimestamp)
MON_VAR(GetPingRTT())
MON_VAR(clockSkew)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 51c5bfa453f..598a5c9220c 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -266,7 +266,7 @@ namespace NActors {
}
const TDuration DeadPeerTimeout;
- TInstant LastReceiveTimestamp;
+ TMonotonic LastReceiveTimestamp;
void HandleCheckDeadPeer();
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -413,15 +413,15 @@ namespace NActors {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// pinger
- TInstant LastPingTimestamp;
+ TMonotonic LastPingTimestamp;
static constexpr TDuration PingPeriodicity = TDuration::Seconds(1);
void IssuePingRequest();
void Handle(TEvProcessPingRequest::TPtr ev);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- TInstant LastInputActivityTimestamp;
- TInstant LastPayloadActivityTimestamp;
+ TMonotonic LastInputActivityTimestamp;
+ TMonotonic LastPayloadActivityTimestamp;
TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog;
TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog;
@@ -481,8 +481,8 @@ namespace NActors {
// time at which we want to send confirmation packet even if there was no outgoing data
ui64 UnconfirmedBytes = 0;
- TInstant ForcePacketTimestamp = TInstant::Max();
- TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule;
+ TMonotonic ForcePacketTimestamp = TMonotonic::Max();
+ TPriorityQueue<TMonotonic, TVector<TMonotonic>, std::greater<TMonotonic>> FlushSchedule;
size_t MaxFlushSchedule = 0;
ui64 FlushEventsScheduled = 0;
ui64 FlushEventsProcessed = 0;
diff --git a/library/cpp/actors/interconnect/watchdog_timer.h b/library/cpp/actors/interconnect/watchdog_timer.h
index c190105a592..fe62006e3b9 100644
--- a/library/cpp/actors/interconnect/watchdog_timer.h
+++ b/library/cpp/actors/interconnect/watchdog_timer.h
@@ -8,7 +8,7 @@ namespace NActors {
const TDuration Timeout;
const TCallback Callback;
- TInstant LastResetTimestamp;
+ TMonotonic LastResetTimestamp;
TEvent* ExpectedEvent = nullptr;
ui32 Iteration = 0;
@@ -29,7 +29,7 @@ namespace NActors {
}
void Reset() {
- LastResetTimestamp = TActivationContext::Now();
+ LastResetTimestamp = TActivationContext::Monotonic();
}
void Disarm() {
@@ -38,11 +38,11 @@ namespace NActors {
void operator()(typename TEvent::TPtr& ev) {
if (ev->Get() == ExpectedEvent) {
- const TInstant now = TActivationContext::Now();
- const TInstant barrier = LastResetTimestamp + Timeout;
+ const TMonotonic now = TActivationContext::Monotonic();
+ const TMonotonic barrier = LastResetTimestamp + Timeout;
if (now < barrier) {
// the time hasn't come yet
- Schedule(barrier - now, TActorIdentity(ev->Recipient));
+ Schedule(barrier, TActorIdentity(ev->Recipient));
} else if (Iteration < NumIterationsBeforeFiring) {
// time has come, but we will still give actor a chance to process some messages and rearm timer
++Iteration;
@@ -57,7 +57,8 @@ namespace NActors {
}
private:
- void Schedule(TDuration timeout, const TActorIdentity& actor) {
+ template<typename T>
+ void Schedule(T&& timeout, const TActorIdentity& actor) {
auto ev = MakeHolder<TEvent>();
ExpectedEvent = ev.Get();
Iteration = 0;