diff options
author | alexvru <alexvru@ydb.tech> | 2023-06-16 12:12:27 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-06-16 12:12:27 +0300 |
commit | 4f88348938057f18fd4aed84a4e42202cf16ecfd (patch) | |
tree | bd23fae7b94975d74d4872f73e2edbbc84a124cc | |
parent | b19fb21198fe8919ac76d17c392bf4a2ed7db7f2 (diff) | |
download | ydb-4f88348938057f18fd4aed84a4e42202cf16ecfd.tar.gz |
Fix CloseOnIdle logic
3 files changed, 64 insertions, 52 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index a5d0b3db9d..8767161e08 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -140,9 +140,10 @@ namespace NActors { } SetOutputStuckFlag(true); - ++NumEventsInReadyChannels; + ++NumEventsInQueue; + RearmCloseOnIdle(); - LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData()); + LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInQueue, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData()); // check for overloaded queues ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20); @@ -214,7 +215,7 @@ namespace NActors { const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket); // arm watchdogs - CloseOnIdleWatchdog.Arm(SelfId()); + RearmCloseOnIdle(); // reset activity timestamps LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic(); @@ -306,7 +307,7 @@ namespace NActors { // reset payload watchdog that controls close-on-idle behaviour LastPayloadActivityTimestamp = TActivationContext::Monotonic(); - CloseOnIdleWatchdog.Reset(); + RearmCloseOnIdle(); } LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) { @@ -393,7 +394,7 @@ namespace NActors { bool canProducePackets; bool canWriteData; - canProducePackets = NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && + canProducePackets = NumEventsInQueue && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit(); canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) || @@ -424,7 +425,7 @@ namespace NActors { // we exit cycle static constexpr ui32 maxBytesToProduce = 64 * 1024; ui32 bytesProduced = 0; - while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) { + while (NumEventsInQueue && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) { if ((bytesProduced && TimeLimit->CheckExceeded()) || bytesProduced >= maxBytesToProduce) { break; } @@ -488,7 +489,7 @@ namespace NActors { Socket.Reset(); Proxy->Metrics->IncDisconnections(); CloseOnIdleWatchdog.Disarm(); - LostConnectionWatchdog.Arm(SelfId()); + LostConnectionWatchdog.Rearm(SelfId()); Proxy->Metrics->SetConnected(0); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId); } @@ -749,7 +750,7 @@ namespace NActors { ForcePacketTimestamp = TMonotonic::Max(); UnconfirmedBytes = 0; const TDuration ping = Proxy->Common->Settings.PingPeriod; - if (ping != TDuration::Zero() && !NumEventsInReadyChannels) { + if (ping != TDuration::Zero() && !NumEventsInQueue) { SetForcePacketTimestamp(ping); } } @@ -773,7 +774,7 @@ namespace NActors { serial = ++OutputCounter; // fill the data packet - Y_VERIFY(NumEventsInReadyChannels); + Y_VERIFY(NumEventsInQueue); LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) { FillSendingBuffer(packet, serial); } @@ -889,13 +890,15 @@ namespace NActors { " dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped); Pool->Trim(); // send any unsent free requests + + RearmCloseOnIdle(); } void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) { ui32 bytesGenerated = 0; - Y_VERIFY(NumEventsInReadyChannels); - while (NumEventsInReadyChannels) { + Y_VERIFY(NumEventsInQueue); + while (NumEventsInQueue) { TEventOutputChannel *channel = ChannelScheduler->PickChannelWithLeastConsumedWeight(); Y_VERIFY_DEBUG(!channel->IsEmpty()); @@ -921,10 +924,10 @@ namespace NActors { if (eventDone) { ++MessagesWrittenToBuffer; - Y_VERIFY(NumEventsInReadyChannels); - --NumEventsInReadyChannels; + Y_VERIFY(NumEventsInQueue); + --NumEventsInQueue; - if (!NumEventsInReadyChannels) { + if (!NumEventsInQueue) { SetOutputStuckFlag(false); } } @@ -1030,7 +1033,7 @@ namespace NActors { } TDuration TInterconnectSessionTCP::GetCloseOnIdleTimeout() const { - return Proxy->Common->Settings.CloseOnIdle; + return Coalesce(Proxy->Common->Settings.CloseOnIdle, DEFAULT_CLOSE_ON_IDLE_TIMEOUT); } TDuration TInterconnectSessionTCP::GetLostConnectionTimeout() const { @@ -1228,7 +1231,7 @@ namespace NActors { MON_VAR(OutputStuckFlag) MON_VAR(SendQueue.size()) - MON_VAR(NumEventsInReadyChannels) + MON_VAR(NumEventsInQueue) MON_VAR(TotalOutputQueueSize) MON_VAR(InflightDataAmount) MON_VAR(unsentQueueSize) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index f11ae505f8..1200057006 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -82,6 +82,7 @@ namespace NActors { const ui64 UpperLimit; }; + static constexpr TDuration DEFAULT_CLOSE_ON_IDLE_TIMEOUT = TDuration::Seconds(90); static constexpr TDuration DEFAULT_DEADPEER_TIMEOUT = TDuration::Seconds(10); static constexpr TDuration DEFAULT_LOST_CONNECTION_TIMEOUT = TDuration::Seconds(10); static constexpr ui32 DEFAULT_MAX_INFLIGHT_DATA = 10240 * 1024; @@ -545,6 +546,14 @@ namespace NActors { Terminate(TDisconnectReason::LostConnection()); } + void RearmCloseOnIdle() { + if (!NumEventsInQueue && OutputCounter == LastConfirmed) { + CloseOnIdleWatchdog.Rearm(SelfId()); + } else { + CloseOnIdleWatchdog.Disarm(); + } + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// const TSessionParams Params; @@ -553,7 +562,7 @@ namespace NActors { ui64 TotalOutputQueueSize; bool OutputStuckFlag; TRecentWnd<std::pair<ui64, ui64>> OutputQueueUtilization; - size_t NumEventsInReadyChannels = 0; + size_t NumEventsInQueue = 0; void SetOutputStuckFlag(bool state); void SwitchStuckPeriod(); diff --git a/library/cpp/actors/interconnect/watchdog_timer.h b/library/cpp/actors/interconnect/watchdog_timer.h index fe62006e3b..66cf19dc4d 100644 --- a/library/cpp/actors/interconnect/watchdog_timer.h +++ b/library/cpp/actors/interconnect/watchdog_timer.h @@ -1,16 +1,16 @@ #pragma once namespace NActors { - template <typename TEvent> + template<typename TEvent> class TWatchdogTimer { using TCallback = std::function<void()>; const TDuration Timeout; const TCallback Callback; - TMonotonic LastResetTimestamp; - TEvent* ExpectedEvent = nullptr; - ui32 Iteration = 0; + TMonotonic TriggerTimestamp = TMonotonic::Max(); + bool EventScheduled = false; + ui32 Iteration; static constexpr ui32 NumIterationsBeforeFiring = 2; @@ -18,51 +18,51 @@ namespace NActors { TWatchdogTimer(TDuration timeout, TCallback callback) : Timeout(timeout) , Callback(std::move(callback)) - { - } + {} - void Arm(const TActorIdentity& actor) { + void Rearm(const TActorIdentity& actor) { if (Timeout != TDuration::Zero() && Timeout != TDuration::Max()) { - Schedule(Timeout, actor); - Reset(); + TriggerTimestamp = TActivationContext::Monotonic() + Timeout; + Iteration = 0; + Schedule(actor); } } - void Reset() { - LastResetTimestamp = TActivationContext::Monotonic(); + void Disarm() { + TriggerTimestamp = TMonotonic::Max(); } - void Disarm() { - ExpectedEvent = nullptr; + bool Armed() const { + return TriggerTimestamp != TMonotonic::Max(); } void operator()(typename TEvent::TPtr& ev) { - if (ev->Get() == ExpectedEvent) { - const TMonotonic now = TActivationContext::Monotonic(); - const TMonotonic barrier = LastResetTimestamp + Timeout; - if (now < barrier) { - // the time hasn't come yet - Schedule(barrier, TActorIdentity(ev->Recipient)); - } else if (Iteration < NumIterationsBeforeFiring) { - // time has come, but we will still give actor a chance to process some messages and rearm timer - ++Iteration; - TActivationContext::Send(ev.Release()); // send this event into queue once more - } else { - // no chance to disarm, fire callback - Callback(); - ExpectedEvent = nullptr; - Iteration = 0; - } + Y_VERIFY_DEBUG(EventScheduled); + EventScheduled = false; + if (!Armed()) { + // just do nothing + } else if (TActivationContext::Monotonic() < TriggerTimestamp) { + // the time hasn't come yet + Schedule(TActorIdentity(ev->Recipient)); + } else if (Iteration < NumIterationsBeforeFiring) { + // time has come, but we will still give actor a chance to process some messages and rearm timer + ++Iteration; + TActivationContext::Send(ev.Release()); // send this event into queue once more + EventScheduled = true; + } else { + // no chance to disarm, fire callback + Disarm(); + Callback(); } } private: - template<typename T> - void Schedule(T&& timeout, const TActorIdentity& actor) { - auto ev = MakeHolder<TEvent>(); - ExpectedEvent = ev.Get(); - Iteration = 0; - actor.Schedule(timeout, ev.Release()); + void Schedule(const TActorIdentity& actor) { + Y_VERIFY_DEBUG(Armed()); + if (!EventScheduled) { + actor.Schedule(TriggerTimestamp, new TEvent); + EventScheduled = true; + } } }; |