aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 10:54:08 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 12:28:07 +0300
commitfc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch)
treec15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp
parent8a749596d40e91c896a1907afcd108d9221fbde1 (diff)
downloadydb-e9cbe5c5cf67db853d223fd365c9f05b695f7b96.tar.gz
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp30
1 files changed, 16 insertions, 14 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index feb55a16ad..18df8e42ff 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -232,7 +232,7 @@ namespace NActors {
CloseOnIdleWatchdog.Arm(SelfId());
// reset activity timestamps
- LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic();
LOG_INFO_IC_SESSION("ICS10", "traffic start");
@@ -315,7 +315,7 @@ namespace NActors {
bool needConfirm = false;
// update activity timer for dead peer checker
- LastInputActivityTimestamp = TActivationContext::Now();
+ LastInputActivityTimestamp = TActivationContext::Monotonic();
if (msg.NumDataBytes) {
UnconfirmedBytes += msg.NumDataBytes;
@@ -326,7 +326,7 @@ namespace NActors {
}
// reset payload watchdog that controls close-on-idle behaviour
- LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastPayloadActivityTimestamp = TActivationContext::Monotonic();
CloseOnIdleWatchdog.Reset();
}
@@ -654,7 +654,7 @@ namespace NActors {
void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) {
if (period != TDuration::Max()) {
- const TInstant when = TActivationContext::Now() + period;
+ const TMonotonic when = TActivationContext::Monotonic() + period;
if (when < ForcePacketTimestamp) {
ForcePacketTimestamp = when;
ScheduleFlush();
@@ -664,7 +664,7 @@ namespace NActors {
void TInterconnectSessionTCP::ScheduleFlush() {
if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) {
- Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush);
+ Schedule(ForcePacketTimestamp, new TEvFlush);
FlushSchedule.push(ForcePacketTimestamp);
MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size());
++FlushEventsScheduled;
@@ -672,7 +672,7 @@ namespace NActors {
}
void TInterconnectSessionTCP::HandleFlush() {
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
while (FlushSchedule && now >= FlushSchedule.top()) {
FlushSchedule.pop();
}
@@ -682,14 +682,14 @@ namespace NActors {
++ConfirmPacketsForcedByTimeout;
++FlushEventsProcessed;
MakePacket(false); // just generate confirmation packet if we have preconditions for this
- } else if (ForcePacketTimestamp != TInstant::Max()) {
+ } else if (ForcePacketTimestamp != TMonotonic::Max()) {
ScheduleFlush();
}
}
}
void TInterconnectSessionTCP::ResetFlushLogic() {
- ForcePacketTimestamp = TInstant::Max();
+ ForcePacketTimestamp = TMonotonic::Max();
UnconfirmedBytes = 0;
const TDuration ping = Proxy->Common->Settings.PingPeriod;
if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
@@ -761,7 +761,7 @@ namespace NActors {
}
// update payload activity timer
- LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastPayloadActivityTimestamp = TActivationContext::Monotonic();
} else if (pingMask) {
serial = *pingMask;
@@ -923,7 +923,7 @@ namespace NActors {
flagState = EFlag::GREEN;
do {
- auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp;
+ auto lastInputDelay = TActivationContext::Monotonic() - LastInputActivityTimestamp;
if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) {
flagState = EFlag::ORANGE;
break;
@@ -1006,7 +1006,7 @@ namespace NActors {
}
void TInterconnectSessionTCP::IssuePingRequest() {
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
if (now >= LastPingTimestamp + PingPeriodicity) {
LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request");
if (Socket) {
@@ -1175,6 +1175,8 @@ namespace NActors {
ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;
+ const TMonotonic now = TActivationContext::Monotonic();
+
MON_VAR(OutputStuckFlag)
MON_VAR(SendQueue.size())
MON_VAR(SendQueueCache.size())
@@ -1184,8 +1186,8 @@ namespace NActors {
MON_VAR(InflightDataAmount)
MON_VAR(unsentQueueSize)
MON_VAR(SendBufferSize)
- MON_VAR(LastInputActivityTimestamp)
- MON_VAR(LastPayloadActivityTimestamp)
+ MON_VAR(now - LastInputActivityTimestamp)
+ MON_VAR(now - LastPayloadActivityTimestamp)
MON_VAR(LastHandshakeDone)
MON_VAR(OutputCounter)
MON_VAR(LastSentSerial)
@@ -1204,7 +1206,7 @@ namespace NActors {
clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data());
}
- MON_VAR(LastPingTimestamp)
+ MON_VAR(now - LastPingTimestamp)
MON_VAR(GetPingRTT())
MON_VAR(clockSkew)