aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-06-16 12:12:27 +0300
committeralexvru <alexvru@ydb.tech>2023-06-16 12:12:27 +0300
commit4f88348938057f18fd4aed84a4e42202cf16ecfd (patch)
treebd23fae7b94975d74d4872f73e2edbbc84a124cc /library/cpp/actors/interconnect/interconnect_tcp_session.cpp
parentb19fb21198fe8919ac76d17c392bf4a2ed7db7f2 (diff)
downloadydb-4f88348938057f18fd4aed84a4e42202cf16ecfd.tar.gz
Fix CloseOnIdle logic
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp35
1 files changed, 19 insertions, 16 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index a5d0b3db9d..8767161e08 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -140,9 +140,10 @@ namespace NActors {
}
SetOutputStuckFlag(true);
- ++NumEventsInReadyChannels;
+ ++NumEventsInQueue;
+ RearmCloseOnIdle();
- LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
+ LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInQueue, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
// check for overloaded queues
ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20);
@@ -214,7 +215,7 @@ namespace NActors {
const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket);
// arm watchdogs
- CloseOnIdleWatchdog.Arm(SelfId());
+ RearmCloseOnIdle();
// reset activity timestamps
LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic();
@@ -306,7 +307,7 @@ namespace NActors {
// reset payload watchdog that controls close-on-idle behaviour
LastPayloadActivityTimestamp = TActivationContext::Monotonic();
- CloseOnIdleWatchdog.Reset();
+ RearmCloseOnIdle();
}
LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) {
@@ -393,7 +394,7 @@ namespace NActors {
bool canProducePackets;
bool canWriteData;
- canProducePackets = NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() &&
+ canProducePackets = NumEventsInQueue && InflightDataAmount < GetTotalInflightAmountOfData() &&
GetUnsentSize() < GetUnsentLimit();
canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) ||
@@ -424,7 +425,7 @@ namespace NActors {
// we exit cycle
static constexpr ui32 maxBytesToProduce = 64 * 1024;
ui32 bytesProduced = 0;
- while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) {
+ while (NumEventsInQueue && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) {
if ((bytesProduced && TimeLimit->CheckExceeded()) || bytesProduced >= maxBytesToProduce) {
break;
}
@@ -488,7 +489,7 @@ namespace NActors {
Socket.Reset();
Proxy->Metrics->IncDisconnections();
CloseOnIdleWatchdog.Disarm();
- LostConnectionWatchdog.Arm(SelfId());
+ LostConnectionWatchdog.Rearm(SelfId());
Proxy->Metrics->SetConnected(0);
LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId);
}
@@ -749,7 +750,7 @@ namespace NActors {
ForcePacketTimestamp = TMonotonic::Max();
UnconfirmedBytes = 0;
const TDuration ping = Proxy->Common->Settings.PingPeriod;
- if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
+ if (ping != TDuration::Zero() && !NumEventsInQueue) {
SetForcePacketTimestamp(ping);
}
}
@@ -773,7 +774,7 @@ namespace NActors {
serial = ++OutputCounter;
// fill the data packet
- Y_VERIFY(NumEventsInReadyChannels);
+ Y_VERIFY(NumEventsInQueue);
LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) {
FillSendingBuffer(packet, serial);
}
@@ -889,13 +890,15 @@ namespace NActors {
" dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped);
Pool->Trim(); // send any unsent free requests
+
+ RearmCloseOnIdle();
}
void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) {
ui32 bytesGenerated = 0;
- Y_VERIFY(NumEventsInReadyChannels);
- while (NumEventsInReadyChannels) {
+ Y_VERIFY(NumEventsInQueue);
+ while (NumEventsInQueue) {
TEventOutputChannel *channel = ChannelScheduler->PickChannelWithLeastConsumedWeight();
Y_VERIFY_DEBUG(!channel->IsEmpty());
@@ -921,10 +924,10 @@ namespace NActors {
if (eventDone) {
++MessagesWrittenToBuffer;
- Y_VERIFY(NumEventsInReadyChannels);
- --NumEventsInReadyChannels;
+ Y_VERIFY(NumEventsInQueue);
+ --NumEventsInQueue;
- if (!NumEventsInReadyChannels) {
+ if (!NumEventsInQueue) {
SetOutputStuckFlag(false);
}
}
@@ -1030,7 +1033,7 @@ namespace NActors {
}
TDuration TInterconnectSessionTCP::GetCloseOnIdleTimeout() const {
- return Proxy->Common->Settings.CloseOnIdle;
+ return Coalesce(Proxy->Common->Settings.CloseOnIdle, DEFAULT_CLOSE_ON_IDLE_TIMEOUT);
}
TDuration TInterconnectSessionTCP::GetLostConnectionTimeout() const {
@@ -1228,7 +1231,7 @@ namespace NActors {
MON_VAR(OutputStuckFlag)
MON_VAR(SendQueue.size())
- MON_VAR(NumEventsInReadyChannels)
+ MON_VAR(NumEventsInQueue)
MON_VAR(TotalOutputQueueSize)
MON_VAR(InflightDataAmount)
MON_VAR(unsentQueueSize)