diff options
author | alexvru <alexvru@ydb.tech> | 2023-06-16 12:12:27 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-06-16 12:12:27 +0300 |
commit | 4f88348938057f18fd4aed84a4e42202cf16ecfd (patch) | |
tree | bd23fae7b94975d74d4872f73e2edbbc84a124cc /library/cpp/actors/interconnect/interconnect_tcp_session.cpp | |
parent | b19fb21198fe8919ac76d17c392bf4a2ed7db7f2 (diff) | |
download | ydb-4f88348938057f18fd4aed84a4e42202cf16ecfd.tar.gz |
Fix CloseOnIdle logic
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.cpp | 35 |
1 files changed, 19 insertions, 16 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index a5d0b3db9d..8767161e08 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -140,9 +140,10 @@ namespace NActors { } SetOutputStuckFlag(true); - ++NumEventsInReadyChannels; + ++NumEventsInQueue; + RearmCloseOnIdle(); - LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData()); + LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInQueue, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData()); // check for overloaded queues ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20); @@ -214,7 +215,7 @@ namespace NActors { const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket); // arm watchdogs - CloseOnIdleWatchdog.Arm(SelfId()); + RearmCloseOnIdle(); // reset activity timestamps LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic(); @@ -306,7 +307,7 @@ namespace NActors { // reset payload watchdog that controls close-on-idle behaviour LastPayloadActivityTimestamp = TActivationContext::Monotonic(); - CloseOnIdleWatchdog.Reset(); + RearmCloseOnIdle(); } LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) { @@ -393,7 +394,7 @@ namespace NActors { bool canProducePackets; bool canWriteData; - canProducePackets = NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && + canProducePackets = NumEventsInQueue && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit(); canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) || @@ -424,7 +425,7 @@ namespace NActors { // we exit cycle static constexpr ui32 maxBytesToProduce = 64 * 1024; ui32 bytesProduced = 0; - while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) { + while (NumEventsInQueue && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) { if ((bytesProduced && TimeLimit->CheckExceeded()) || bytesProduced >= maxBytesToProduce) { break; } @@ -488,7 +489,7 @@ namespace NActors { Socket.Reset(); Proxy->Metrics->IncDisconnections(); CloseOnIdleWatchdog.Disarm(); - LostConnectionWatchdog.Arm(SelfId()); + LostConnectionWatchdog.Rearm(SelfId()); Proxy->Metrics->SetConnected(0); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId); } @@ -749,7 +750,7 @@ namespace NActors { ForcePacketTimestamp = TMonotonic::Max(); UnconfirmedBytes = 0; const TDuration ping = Proxy->Common->Settings.PingPeriod; - if (ping != TDuration::Zero() && !NumEventsInReadyChannels) { + if (ping != TDuration::Zero() && !NumEventsInQueue) { SetForcePacketTimestamp(ping); } } @@ -773,7 +774,7 @@ namespace NActors { serial = ++OutputCounter; // fill the data packet - Y_VERIFY(NumEventsInReadyChannels); + Y_VERIFY(NumEventsInQueue); LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) { FillSendingBuffer(packet, serial); } @@ -889,13 +890,15 @@ namespace NActors { " dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped); Pool->Trim(); // send any unsent free requests + + RearmCloseOnIdle(); } void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) { ui32 bytesGenerated = 0; - Y_VERIFY(NumEventsInReadyChannels); - while (NumEventsInReadyChannels) { + Y_VERIFY(NumEventsInQueue); + while (NumEventsInQueue) { TEventOutputChannel *channel = ChannelScheduler->PickChannelWithLeastConsumedWeight(); Y_VERIFY_DEBUG(!channel->IsEmpty()); @@ -921,10 +924,10 @@ namespace NActors { if (eventDone) { ++MessagesWrittenToBuffer; - Y_VERIFY(NumEventsInReadyChannels); - --NumEventsInReadyChannels; + Y_VERIFY(NumEventsInQueue); + --NumEventsInQueue; - if (!NumEventsInReadyChannels) { + if (!NumEventsInQueue) { SetOutputStuckFlag(false); } } @@ -1030,7 +1033,7 @@ namespace NActors { } TDuration TInterconnectSessionTCP::GetCloseOnIdleTimeout() const { - return Proxy->Common->Settings.CloseOnIdle; + return Coalesce(Proxy->Common->Settings.CloseOnIdle, DEFAULT_CLOSE_ON_IDLE_TIMEOUT); } TDuration TInterconnectSessionTCP::GetLostConnectionTimeout() const { @@ -1228,7 +1231,7 @@ namespace NActors { MON_VAR(OutputStuckFlag) MON_VAR(SendQueue.size()) - MON_VAR(NumEventsInReadyChannels) + MON_VAR(NumEventsInQueue) MON_VAR(TotalOutputQueueSize) MON_VAR(InflightDataAmount) MON_VAR(unsentQueueSize) |