diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 10:54:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 12:28:07 +0300 |
commit | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch) | |
tree | c15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp | |
parent | 8a749596d40e91c896a1907afcd108d9221fbde1 (diff) | |
download | ydb-e9cbe5c5cf67db853d223fd365c9f05b695f7b96.tar.gz |
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.cpp | 30 |
1 files changed, 16 insertions, 14 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index feb55a16ad..18df8e42ff 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -232,7 +232,7 @@ namespace NActors { CloseOnIdleWatchdog.Arm(SelfId()); // reset activity timestamps - LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now(); + LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic(); LOG_INFO_IC_SESSION("ICS10", "traffic start"); @@ -315,7 +315,7 @@ namespace NActors { bool needConfirm = false; // update activity timer for dead peer checker - LastInputActivityTimestamp = TActivationContext::Now(); + LastInputActivityTimestamp = TActivationContext::Monotonic(); if (msg.NumDataBytes) { UnconfirmedBytes += msg.NumDataBytes; @@ -326,7 +326,7 @@ namespace NActors { } // reset payload watchdog that controls close-on-idle behaviour - LastPayloadActivityTimestamp = TActivationContext::Now(); + LastPayloadActivityTimestamp = TActivationContext::Monotonic(); CloseOnIdleWatchdog.Reset(); } @@ -654,7 +654,7 @@ namespace NActors { void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) { if (period != TDuration::Max()) { - const TInstant when = TActivationContext::Now() + period; + const TMonotonic when = TActivationContext::Monotonic() + period; if (when < ForcePacketTimestamp) { ForcePacketTimestamp = when; ScheduleFlush(); @@ -664,7 +664,7 @@ namespace NActors { void TInterconnectSessionTCP::ScheduleFlush() { if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) { - Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush); + Schedule(ForcePacketTimestamp, new TEvFlush); FlushSchedule.push(ForcePacketTimestamp); MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size()); ++FlushEventsScheduled; @@ -672,7 +672,7 @@ namespace NActors { } void TInterconnectSessionTCP::HandleFlush() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); while (FlushSchedule && now >= FlushSchedule.top()) { FlushSchedule.pop(); } @@ -682,14 +682,14 @@ namespace NActors { ++ConfirmPacketsForcedByTimeout; ++FlushEventsProcessed; MakePacket(false); // just generate confirmation packet if we have preconditions for this - } else if (ForcePacketTimestamp != TInstant::Max()) { + } else if (ForcePacketTimestamp != TMonotonic::Max()) { ScheduleFlush(); } } } void TInterconnectSessionTCP::ResetFlushLogic() { - ForcePacketTimestamp = TInstant::Max(); + ForcePacketTimestamp = TMonotonic::Max(); UnconfirmedBytes = 0; const TDuration ping = Proxy->Common->Settings.PingPeriod; if (ping != TDuration::Zero() && !NumEventsInReadyChannels) { @@ -761,7 +761,7 @@ namespace NActors { } // update payload activity timer - LastPayloadActivityTimestamp = TActivationContext::Now(); + LastPayloadActivityTimestamp = TActivationContext::Monotonic(); } else if (pingMask) { serial = *pingMask; @@ -923,7 +923,7 @@ namespace NActors { flagState = EFlag::GREEN; do { - auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp; + auto lastInputDelay = TActivationContext::Monotonic() - LastInputActivityTimestamp; if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) { flagState = EFlag::ORANGE; break; @@ -1006,7 +1006,7 @@ namespace NActors { } void TInterconnectSessionTCP::IssuePingRequest() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); if (now >= LastPingTimestamp + PingPeriodicity) { LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request"); if (Socket) { @@ -1175,6 +1175,8 @@ namespace NActors { ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0; + const TMonotonic now = TActivationContext::Monotonic(); + MON_VAR(OutputStuckFlag) MON_VAR(SendQueue.size()) MON_VAR(SendQueueCache.size()) @@ -1184,8 +1186,8 @@ namespace NActors { MON_VAR(InflightDataAmount) MON_VAR(unsentQueueSize) MON_VAR(SendBufferSize) - MON_VAR(LastInputActivityTimestamp) - MON_VAR(LastPayloadActivityTimestamp) + MON_VAR(now - LastInputActivityTimestamp) + MON_VAR(now - LastPayloadActivityTimestamp) MON_VAR(LastHandshakeDone) MON_VAR(OutputCounter) MON_VAR(LastSentSerial) @@ -1204,7 +1206,7 @@ namespace NActors { clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data()); } - MON_VAR(LastPingTimestamp) + MON_VAR(now - LastPingTimestamp) MON_VAR(GetPingRTT()) MON_VAR(clockSkew) |