aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-06-16 12:12:27 +0300
committeralexvru <alexvru@ydb.tech>2023-06-16 12:12:27 +0300
commit4f88348938057f18fd4aed84a4e42202cf16ecfd (patch)
treebd23fae7b94975d74d4872f73e2edbbc84a124cc
parentb19fb21198fe8919ac76d17c392bf4a2ed7db7f2 (diff)
downloadydb-4f88348938057f18fd4aed84a4e42202cf16ecfd.tar.gz
Fix CloseOnIdle logic
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp35
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h11
-rw-r--r--library/cpp/actors/interconnect/watchdog_timer.h70
3 files changed, 64 insertions, 52 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index a5d0b3db9d..8767161e08 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -140,9 +140,10 @@ namespace NActors {
}
SetOutputStuckFlag(true);
- ++NumEventsInReadyChannels;
+ ++NumEventsInQueue;
+ RearmCloseOnIdle();
- LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
+ LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInQueue, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
// check for overloaded queues
ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20);
@@ -214,7 +215,7 @@ namespace NActors {
const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket);
// arm watchdogs
- CloseOnIdleWatchdog.Arm(SelfId());
+ RearmCloseOnIdle();
// reset activity timestamps
LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic();
@@ -306,7 +307,7 @@ namespace NActors {
// reset payload watchdog that controls close-on-idle behaviour
LastPayloadActivityTimestamp = TActivationContext::Monotonic();
- CloseOnIdleWatchdog.Reset();
+ RearmCloseOnIdle();
}
LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) {
@@ -393,7 +394,7 @@ namespace NActors {
bool canProducePackets;
bool canWriteData;
- canProducePackets = NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() &&
+ canProducePackets = NumEventsInQueue && InflightDataAmount < GetTotalInflightAmountOfData() &&
GetUnsentSize() < GetUnsentLimit();
canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) ||
@@ -424,7 +425,7 @@ namespace NActors {
// we exit cycle
static constexpr ui32 maxBytesToProduce = 64 * 1024;
ui32 bytesProduced = 0;
- while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) {
+ while (NumEventsInQueue && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) {
if ((bytesProduced && TimeLimit->CheckExceeded()) || bytesProduced >= maxBytesToProduce) {
break;
}
@@ -488,7 +489,7 @@ namespace NActors {
Socket.Reset();
Proxy->Metrics->IncDisconnections();
CloseOnIdleWatchdog.Disarm();
- LostConnectionWatchdog.Arm(SelfId());
+ LostConnectionWatchdog.Rearm(SelfId());
Proxy->Metrics->SetConnected(0);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
}
@@ -749,7 +750,7 @@ namespace NActors {
ForcePacketTimestamp = TMonotonic::Max();
UnconfirmedBytes = 0;
const TDuration ping = Proxy->Common->Settings.PingPeriod;
- if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
+ if (ping != TDuration::Zero() && !NumEventsInQueue) {
SetForcePacketTimestamp(ping);
}
}
@@ -773,7 +774,7 @@ namespace NActors {
serial = ++OutputCounter;
// fill the data packet
- Y_VERIFY(NumEventsInReadyChannels);
+ Y_VERIFY(NumEventsInQueue);
LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) {
FillSendingBuffer(packet, serial);
}
@@ -889,13 +890,15 @@ namespace NActors {
" dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped);
Pool->Trim(); // send any unsent free requests
+
+ RearmCloseOnIdle();
}
void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) {
ui32 bytesGenerated = 0;
- Y_VERIFY(NumEventsInReadyChannels);
- while (NumEventsInReadyChannels) {
+ Y_VERIFY(NumEventsInQueue);
+ while (NumEventsInQueue) {
TEventOutputChannel *channel = ChannelScheduler->PickChannelWithLeastConsumedWeight();
Y_VERIFY_DEBUG(!channel->IsEmpty());
@@ -921,10 +924,10 @@ namespace NActors {
if (eventDone) {
++MessagesWrittenToBuffer;
- Y_VERIFY(NumEventsInReadyChannels);
- --NumEventsInReadyChannels;
+ Y_VERIFY(NumEventsInQueue);
+ --NumEventsInQueue;
- if (!NumEventsInReadyChannels) {
+ if (!NumEventsInQueue) {
SetOutputStuckFlag(false);
}
}
@@ -1030,7 +1033,7 @@ namespace NActors {
}
TDuration TInterconnectSessionTCP::GetCloseOnIdleTimeout() const {
- return Proxy->Common->Settings.CloseOnIdle;
+ return Coalesce(Proxy->Common->Settings.CloseOnIdle, DEFAULT_CLOSE_ON_IDLE_TIMEOUT);
}
TDuration TInterconnectSessionTCP::GetLostConnectionTimeout() const {
@@ -1228,7 +1231,7 @@ namespace NActors {
MON_VAR(OutputStuckFlag)
MON_VAR(SendQueue.size())
- MON_VAR(NumEventsInReadyChannels)
+ MON_VAR(NumEventsInQueue)
MON_VAR(TotalOutputQueueSize)
MON_VAR(InflightDataAmount)
MON_VAR(unsentQueueSize)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index f11ae505f8..1200057006 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -82,6 +82,7 @@ namespace NActors {
const ui64 UpperLimit;
};
+ static constexpr TDuration DEFAULT_CLOSE_ON_IDLE_TIMEOUT = TDuration::Seconds(90);
static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10);
static constexpr TDuration DEFAULT_LOST_CONNECTION_TIMEOUT = TDuration::Seconds(10);
static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024;
@@ -545,6 +546,14 @@ namespace NActors {
Terminate(TDisconnectReason::LostConnection());
}
+ void RearmCloseOnIdle() {
+ if (!NumEventsInQueue && OutputCounter == LastConfirmed) {
+ CloseOnIdleWatchdog.Rearm(SelfId());
+ } else {
+ CloseOnIdleWatchdog.Disarm();
+ }
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
const TSessionParams Params;
@@ -553,7 +562,7 @@ namespace NActors {
ui64 TotalOutputQueueSize;
bool OutputStuckFlag;
TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization;
- size_t NumEventsInReadyChannels = 0;
+ size_t NumEventsInQueue = 0;
void SetOutputStuckFlag(bool state);
void SwitchStuckPeriod();
diff --git a/library/cpp/actors/interconnect/watchdog_timer.h b/library/cpp/actors/interconnect/watchdog_timer.h
index fe62006e3b..66cf19dc4d 100644
--- a/library/cpp/actors/interconnect/watchdog_timer.h
+++ b/library/cpp/actors/interconnect/watchdog_timer.h
@@ -1,16 +1,16 @@
#pragma once
namespace NActors {
- template <typename TEvent>
+ template<typename TEvent>
class TWatchdogTimer {
using TCallback = std::function<void()>;
const TDuration Timeout;
const TCallback Callback;
- TMonotonic LastResetTimestamp;
- TEvent* ExpectedEvent = nullptr;
- ui32 Iteration = 0;
+ TMonotonic TriggerTimestamp = TMonotonic::Max();
+ bool EventScheduled = false;
+ ui32 Iteration;
static constexpr ui32 NumIterationsBeforeFiring = 2;
@@ -18,51 +18,51 @@ namespace NActors {
TWatchdogTimer(TDuration timeout, TCallback callback)
: Timeout(timeout)
, Callback(std::move(callback))
- {
- }
+ {}
- void Arm(const TActorIdentity& actor) {
+ void Rearm(const TActorIdentity& actor) {
if (Timeout != TDuration::Zero() && Timeout != TDuration::Max()) {
- Schedule(Timeout, actor);
- Reset();
+ TriggerTimestamp = TActivationContext::Monotonic() + Timeout;
+ Iteration = 0;
+ Schedule(actor);
}
}
- void Reset() {
- LastResetTimestamp = TActivationContext::Monotonic();
+ void Disarm() {
+ TriggerTimestamp = TMonotonic::Max();
}
- void Disarm() {
- ExpectedEvent = nullptr;
+ bool Armed() const {
+ return TriggerTimestamp != TMonotonic::Max();
}
void operator()(typename TEvent::TPtr& ev) {
- if (ev->Get() == ExpectedEvent) {
- const TMonotonic now = TActivationContext::Monotonic();
- const TMonotonic barrier = LastResetTimestamp + Timeout;
- if (now < barrier) {
- // the time hasn't come yet
- Schedule(barrier, TActorIdentity(ev->Recipient));
- } else if (Iteration < NumIterationsBeforeFiring) {
- // time has come, but we will still give actor a chance to process some messages and rearm timer
- ++Iteration;
- TActivationContext::Send(ev.Release()); // send this event into queue once more
- } else {
- // no chance to disarm, fire callback
- Callback();
- ExpectedEvent = nullptr;
- Iteration = 0;
- }
+ Y_VERIFY_DEBUG(EventScheduled);
+ EventScheduled = false;
+ if (!Armed()) {
+ // just do nothing
+ } else if (TActivationContext::Monotonic() < TriggerTimestamp) {
+ // the time hasn't come yet
+ Schedule(TActorIdentity(ev->Recipient));
+ } else if (Iteration < NumIterationsBeforeFiring) {
+ // time has come, but we will still give actor a chance to process some messages and rearm timer
+ ++Iteration;
+ TActivationContext::Send(ev.Release()); // send this event into queue once more
+ EventScheduled = true;
+ } else {
+ // no chance to disarm, fire callback
+ Disarm();
+ Callback();
}
}
private:
- template<typename T>
- void Schedule(T&& timeout, const TActorIdentity& actor) {
- auto ev = MakeHolder<TEvent>();
- ExpectedEvent = ev.Get();
- Iteration = 0;
- actor.Schedule(timeout, ev.Release());
+ void Schedule(const TActorIdentity& actor) {
+ Y_VERIFY_DEBUG(Armed());
+ if (!EventScheduled) {
+ actor.Schedule(TriggerTimestamp, new TEvent);
+ EventScheduled = true;
+ }
}
};