diff options
author | alexvru <alexvru@ydb.tech> | 2023-05-03 16:24:46 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-05-03 16:24:46 +0300 |
commit | f323f855449f04b5a243bf5eeb48320e834a4009 (patch) | |
tree | a2cd846c0be022fb9b1530613f77681d545eceb0 | |
parent | 200c355ded38bd5a84be4da890b492ac56602273 (diff) | |
download | ydb-f323f855449f04b5a243bf5eeb48320e834a4009.tar.gz |
Further IC fixes
3 files changed, 140 insertions, 136 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 0c592d236b..0ca9f2f136 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -160,26 +160,7 @@ namespace NActors { } } - if (RamInQueue && !RamInQueue->Batching) { - // we have pending TEvRam, so GenerateTraffic will be called no matter what - } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket) { - // we can't issue more traffic now; GenerateTraffic will be called upon unblocking - } else if (TotalOutputQueueSize >= 64 * 1024) { - // output queue size is quite big to issue some traffic - IssueRam(); - } else if (!RamInQueue) { - Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1); - RamInQueue = new TEvRam(true); - auto *ev = new IEventHandle(SelfId(), {}, RamInQueue); - const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod; - if (batchPeriod != TDuration()) { - TActivationContext::Schedule(batchPeriod, ev); - } else { - TActivationContext::Send(ev); - } - LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat()); - LOG_DEBUG_IC_SESSION("ICS17", "batching started"); - } + IssueRam(true); } void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) { @@ -295,8 +276,7 @@ namespace NActors { LastHandshakeDone = TActivationContext::Now(); - RamInQueue = nullptr; - IssueRam(); + IssueRam(false); } void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) { @@ -325,24 +305,18 @@ namespace NActors { CloseOnIdleWatchdog.Reset(); } - bool unblockedSomething = false; LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) { - unblockedSomething = DropConfirmed(msg.ConfirmedByInput); - } - - // generate more traffic if we have unblocked state now - if (unblockedSomething) { - LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0); - IssueRam(); + DropConfirmed(msg.ConfirmedByInput); } // if we haven't generated any packets, then make a lone Flush packet without any data if (needConfirm && Socket) { ++ConfirmPacketsForcedBySize; MakePacket(false); - WriteData(); } + GenerateTraffic(); + for (;;) { switch (EUpdateState state = ReceiveContext->UpdateState) { case EUpdateState::NONE: @@ -369,10 +343,17 @@ namespace NActors { } } - void TInterconnectSessionTCP::IssueRam() { - if (!RamInQueue || (RamInQueue->Batching && Proxy->Common->Settings.BatchPeriod != TDuration::Zero())) { - RamInQueue = new TEvRam(false); - Send(SelfId(), RamInQueue); + void TInterconnectSessionTCP::IssueRam(bool batching) { + const auto& batchPeriod = Proxy->Common->Settings.BatchPeriod; + if (!RamInQueue || (!batching && (RamInQueue->Batching && batchPeriod != TDuration()))) { + auto ev = std::make_unique<TEvRam>(batching); + RamInQueue = ev.get(); + auto handle = std::make_unique<IEventHandle>(SelfId(), SelfId(), ev.release()); + if (batching && batchPeriod != TDuration()) { + TActivationContext::Schedule(batchPeriod, handle.release()); + } else { + TActivationContext::Send(handle.release()); + } LWPROBE(StartRam, Proxy->PeerNodeId); RamStartedCycles = GetCycleCountFast(); } @@ -387,16 +368,44 @@ namespace NActors { } void TInterconnectSessionTCP::GenerateTraffic() { - // generate ping request, if needed - IssuePingRequest(); + bool canProducePackets = false; + bool canWriteData = false; + + if (!TimeLimit) { + TimeLimit.emplace(GetMaxCyclesPerEvent()); + } - LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic"); + for (;;) { + if (!Socket) { + return; + } + ProducePackets(); + canProducePackets = NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && + GetUnsentSize() < GetUnsentLimit(); + + if (!Socket) { + return; + } + WriteData(); + canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) || + (XdcStream && !ReceiveContext->XdcWriteBlocked); + + if ((!canProducePackets && !canWriteData) || TimeLimit->CheckExceeded()) { + break; + } + } - const ui64 sizeBefore = TotalOutputQueueSize; - ui32 generatedPackets = 0; - ui64 generatedBytes = 0; - ui64 generateStarted = GetCycleCountFast(); - bool enoughCpu = true; + if (canProducePackets || canWriteData) { + SetEnoughCpu(false); + IssueRam(false); + } else { + SetEnoughCpu(true); + } + } + + void TInterconnectSessionTCP::ProducePackets() { + // generate ping request, if needed + IssuePingRequest(); // apply traffic changes auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); }; @@ -404,38 +413,20 @@ namespace NActors { // first, we create as many data packets as we can generate under certain conditions; they include presence // of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions // we exit cycle - if (Socket) { - TTimeLimit limit(GetMaxCyclesPerEvent()); - - while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData()) { - if (generatedPackets && limit.CheckExceeded()) { - // resume later but ensure that we have issued at least one packet - IssueRam(); - enoughCpu = false; - ++CpuStarvationEvents; - break; - } - - try { - generatedBytes += MakePacket(true); - ++generatedPackets; - } catch (const TExSerializedEventTooLarge& ex) { - // terminate session if the event can't be serialized properly - accountTraffic(); - LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type); - return Terminate(TDisconnectReason::EventTooLarge()); - } + while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) { + try { + MakePacket(true); + } catch (const TExSerializedEventTooLarge& ex) { + // terminate session if the event can't be serialized properly + accountTraffic(); + LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type); + return Terminate(TDisconnectReason::EventTooLarge()); + } + if (TimeLimit->CheckExceeded()) { + break; } } - SetEnoughCpu(enoughCpu); - - if (Socket) { - WriteData(); - } - - LWPROBE(GenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - generateStarted) * 1000.0, sizeBefore - TotalOutputQueueSize, generatedPackets, generatedBytes); - accountTraffic(); EqualizeCounter += ChannelScheduler->Equalize(); } @@ -537,7 +528,7 @@ namespace NActors { Send(ReceiverId, ev->Release().Release()); } - IssueRam(); + GenerateTraffic(); } void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) { @@ -556,11 +547,9 @@ namespace NActors { } } - void TInterconnectSessionTCP::HandleWriteData() { - Y_VERIFY(WriteDataInFlight); - WriteDataInFlight = false; - if (!Socket) { - return; + void TInterconnectSessionTCP::WriteData() { + if (!TimeLimit) { + TimeLimit.emplace(GetMaxCyclesPerEvent()); } // total bytes written during this call @@ -594,8 +583,8 @@ namespace NActors { for (;;) { bool progress = false; - - size_t bytesToSendInMain = Max<size_t>(); + static constexpr size_t maxBytesAtOnce = 256 * 1024; + size_t bytesToSendInMain = maxBytesAtOnce; if (OutOfBandStream) { bytesToSendInMain = 0; @@ -623,24 +612,20 @@ namespace NActors { } if (!bytesToSendInMain) { - if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, Max<size_t>())) { + if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesAtOnce)) { BytesWrittenToSocket += w; OutOfBandBytesSent += w; progress = true; } } - if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, Max<size_t>())) { + if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, maxBytesAtOnce)) { XdcBytesSent += w; XdcOffset += w; progress = true; } - if (!progress) { - break; - } else if (limit.CheckExceeded()) { - WriteData(); - ++CpuStarvationEventsOnWriteData; + if (!progress || TimeLimit->CheckExceeded()) { break; } } @@ -649,9 +634,7 @@ namespace NActors { Proxy->Metrics->AddTotalBytesWritten(written); } - if (DropConfirmed(LastConfirmed)) { // issue GenerateTraffic a bit later - IssueRam(); - } + DropConfirmed(LastConfirmed); const bool writeBlockedByFullSendBuffer = ReceiveContext->MainWriteBlocked || ReceiveContext->XdcWriteBlocked; if (WriteBlockedByFullSendBuffer < writeBlockedByFullSendBuffer) { // became blocked @@ -663,13 +646,6 @@ namespace NActors { WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer; } - void TInterconnectSessionTCP::WriteData() { - if (!WriteDataInFlight) { - WriteDataInFlight = true; - TActivationContext::Send(new IEventHandle(EvWriteData, 0, SelfId(), {}, nullptr, 0)); - } - } - ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes) { LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { @@ -774,7 +750,7 @@ namespace NActors { } } - ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { + void TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { NInterconnect::TOutgoingStream& stream = data ? OutgoingStream : OutOfBandStream; #ifndef NDEBUG @@ -782,6 +758,9 @@ namespace NActors { const size_t xdcStreamSizeBefore = XdcStream.CalculateOutgoingSize(); #endif + stream.Align(); + XdcStream.Align(); + TTcpPacketOutTask packet(Params, stream, XdcStream); ui64 serial = 0; @@ -847,11 +826,9 @@ namespace NActors { ResetFlushLogic(); ++PacketsGenerated; - - return packetSize; } - bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) { + void TInterconnectSessionTCP::DropConfirmed(ui64 confirm) { LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm); Y_VERIFY(LastConfirmed <= confirm && confirm <= OutputCounter, @@ -859,7 +836,7 @@ namespace NActors { LogPrefix.data(), confirm, LastConfirmed, OutputCounter); LastConfirmed = confirm; - std::optional<ui64> lastDroppedSerial = 0; + std::optional<ui64> lastDroppedSerial; ui32 numDropped = 0; // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively @@ -885,7 +862,7 @@ namespace NActors { } if (!numDropped) { - return false; + return; } const ui64 droppedDataAmount = bytesDropped + bytesDroppedFromXdc - sizeof(TTcpPacketHeader_v2) * numDropped; @@ -897,10 +874,6 @@ namespace NActors { }); } - const ui64 current = InflightDataAmount; - const ui64 limit = GetTotalInflightAmountOfData(); - const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount; - PacketsConfirmed += numDropped; InflightDataAmount -= droppedDataAmount; Proxy->Metrics->SubInflightDataAmount(droppedDataAmount); @@ -910,8 +883,6 @@ namespace NActors { " dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped); Pool->Trim(); // send any unsent free requests - - return unblockedSomething; } void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 6c76fa8e3c..92146f6591 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -370,7 +370,6 @@ namespace NActors { EvRam, EvTerminate, EvFreeItems, - EvWriteData, }; struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {}; @@ -436,25 +435,30 @@ namespace NActors { void Subscribe(STATEFN_SIG); void Unsubscribe(STATEFN_SIG); - STRICT_STFUNC(StateFunc, - fFunc(TEvInterconnect::EvForward, Forward) - cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison) - fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe) - fFunc(TEvents::TEvSubscribe::EventType, Subscribe) - fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe) - cFunc(TEvFlush::EventType, HandleFlush) - hFunc(TEvPollerReady, Handle) - hFunc(TEvPollerRegisterResult, Handle) - hFunc(TEvUpdateFromInputSession, Handle) - hFunc(TEvRam, HandleRam) - hFunc(TEvCheckCloseOnIdle, CloseOnIdleWatchdog) - hFunc(TEvCheckLostConnection, LostConnectionWatchdog) - cFunc(TEvents::TSystem::Wakeup, SendUpdateToWhiteboard) - hFunc(TEvSocketDisconnect, OnDisconnect) - hFunc(TEvTerminate, Handle) - hFunc(TEvProcessPingRequest, Handle) - cFunc(EvWriteData, HandleWriteData) - ) + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + std::optional<TTimeLimit> TimeLimit; + + STATEFN(StateFunc) { + TimeLimit.emplace(GetMaxCyclesPerEvent()); + STRICT_STFUNC_BODY( + fFunc(TEvInterconnect::EvForward, Forward) + cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison) + fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe) + fFunc(TEvents::TEvSubscribe::EventType, Subscribe) + fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe) + cFunc(TEvFlush::EventType, HandleFlush) + hFunc(TEvPollerReady, Handle) + hFunc(TEvPollerRegisterResult, Handle) + hFunc(TEvUpdateFromInputSession, Handle) + hFunc(TEvRam, HandleRam) + hFunc(TEvCheckCloseOnIdle, CloseOnIdleWatchdog) + hFunc(TEvCheckLostConnection, LostConnectionWatchdog) + cFunc(TEvents::TSystem::Wakeup, SendUpdateToWhiteboard) + hFunc(TEvSocketDisconnect, OnDisconnect) + hFunc(TEvTerminate, Handle) + hFunc(TEvProcessPingRequest, Handle) + ) + } void Handle(TEvUpdateFromInputSession::TPtr& ev); @@ -465,24 +469,31 @@ namespace NActors { TEvRam* RamInQueue = nullptr; ui64 RamStartedCycles = 0; - void IssueRam(); + void IssueRam(bool batching); void HandleRam(TEvRam::TPtr& ev); void GenerateTraffic(); + void ProducePackets(); + + size_t GetUnsentSize() const { + return OutgoingStream.CalculateUnsentSize() + OutOfBandStream.CalculateUnsentSize() + + XdcStream.CalculateUnsentSize(); + } + + size_t GetUnsentLimit() const { + return 128 * 1024; + } void SendUpdateToWhiteboard(bool connected = true); ui32 CalculateQueueUtilization(); - bool WriteDataInFlight = false; - void Handle(TEvPollerReady::TPtr& ev); void Handle(TEvPollerRegisterResult::TPtr ev); - void HandleWriteData(); void WriteData(); ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes); - ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {}); + void MakePacket(bool data, TMaybe<ui64> pingMask = {}); void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial); - bool DropConfirmed(ui64 confirm); + void DropConfirmed(ui64 confirm); void ShutdownSocket(TDisconnectReason reason); void StartHandshake(); diff --git a/library/cpp/actors/interconnect/outgoing_stream.h b/library/cpp/actors/interconnect/outgoing_stream.h index 011ecebb9a..197b9219c2 100644 --- a/library/cpp/actors/interconnect/outgoing_stream.h +++ b/library/cpp/actors/interconnect/outgoing_stream.h @@ -36,6 +36,7 @@ namespace NInterconnect { std::deque<TSendChunk> SendQueue; size_t SendQueuePos = 0; size_t SendOffset = 0; + size_t UnsentBytes = 0; public: operator bool() const { @@ -51,11 +52,14 @@ namespace NInterconnect { } size_t CalculateUnsentSize() const { +#ifndef NDEBUG size_t res = 0; for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end(); ++it) { res += it->Span.size(); } - return res - SendOffset; + Y_VERIFY(UnsentBytes == res - SendOffset); +#endif + return UnsentBytes; } size_t GetSendQueueSize() const { @@ -74,6 +78,16 @@ namespace NInterconnect { return {AppendBuffer->Data + AppendOffset, Min(maxLen, BufferSize - AppendOffset)}; } + void Align() { + if (AppendOffset != BufferSize) { + AppendOffset += -(reinterpret_cast<uintptr_t>(AppendBuffer->Data) + AppendOffset) & 63; + if (AppendOffset > BufferSize) { + AppendOffset = BufferSize; + DropBufferReference(std::exchange(AppendBuffer, nullptr)); + } + } + } + void Append(TContiguousSpan span) { if (AppendBuffer && span.data() == AppendBuffer->Data + AppendOffset) { // the only valid case to use previously acquired span AppendAcquiredSpan(span); @@ -129,11 +143,16 @@ namespace NInterconnect { void Rewind() { SendQueuePos = 0; SendOffset = 0; + UnsentBytes = 0; + for (const auto& item : SendQueue) { + UnsentBytes += item.Span.size(); + } } void RewindToEnd() { SendQueuePos = SendQueue.size(); SendOffset = 0; + UnsentBytes = 0; } template<typename T> @@ -149,7 +168,9 @@ namespace NInterconnect { void Advance(size_t numBytes) { // called when numBytes portion of data has been sent Y_VERIFY_DEBUG(numBytes == 0 || SendQueuePos != SendQueue.size()); + Y_VERIFY_DEBUG(numBytes <= UnsentBytes); SendOffset += numBytes; + UnsentBytes -= numBytes; for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->Span.size() <= SendOffset; ++SendQueuePos, ++it) { SendOffset -= it->Span.size(); Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1); @@ -214,6 +235,7 @@ namespace NInterconnect { } void AppendSpanWithGlueing(TContiguousSpan span, TBuffer *buffer) { + UnsentBytes += span.size(); if (!SendQueue.empty()) { auto& back = SendQueue.back(); if (back.Span.data() + back.Span.size() == span.data()) { // check if it is possible just to extend the last span |