diff options
author | alexvru <alexvru@ydb.tech> | 2023-04-27 11:24:30 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-04-27 11:24:30 +0300 |
commit | b56866c69663e9a5106f99e79d30b7abd850f070 (patch) | |
tree | d183bbc784fa1666e3023f5d8e93929ae7a4d922 /library | |
parent | c7b3829d69c023694e8d20b7d91c02f47f1fb4e2 (diff) | |
download | ydb-b56866c69663e9a5106f99e79d30b7abd850f070.tar.gz |
Some IC fixes
Diffstat (limited to 'library')
4 files changed, 198 insertions, 224 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 37e1bb8b5f..edcffea380 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -6,10 +6,35 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); - void TReceiveContext::TPerChannelContext::CalculateBytesToCatch() { - XdcBytesToCatch = FetchOffset; + void TReceiveContext::TPerChannelContext::PrepareCatchBuffer() { + size_t bytesToCatch = FetchOffset; for (auto it = XdcBuffers.begin(), end = it + FetchIndex; it != end; ++it) { - XdcBytesToCatch += it->size(); + bytesToCatch += it->size(); + } + + XdcCatchBuffer = TRcBuf::Uninitialized(bytesToCatch); + XdcCatchBytesRead = 0; + } + + void TReceiveContext::TPerChannelContext::ApplyCatchBuffer() { + if (auto buffer = std::exchange(XdcCatchBuffer, {})) { + Y_VERIFY(XdcCatchBytesRead >= buffer.size()); + + const size_t offset = XdcCatchBytesRead % buffer.size(); + const char *begin = buffer.data(); + const char *mid = begin + offset; + const char *end = begin + buffer.size(); + Y_VERIFY_DEBUG(begin <= mid && mid < end); + + TRope rope; + rope.Insert(rope.End(), TRcBuf(TRcBuf::Piece, mid, end, buffer)); + if (begin != mid) { + rope.Insert(rope.End(), TRcBuf(TRcBuf::Piece, begin, mid, buffer)); + } + + DropFront(&rope, buffer.size()); + } else { + Y_VERIFY_DEBUG(!XdcCatchBytesRead); } } @@ -35,6 +60,8 @@ namespace NActors { } void TReceiveContext::TPerChannelContext::DropFront(TRope *from, size_t numBytes) { + Y_VERIFY_DEBUG(from || !XdcCatchBuffer); + size_t n = numBytes; for (auto& pendingEvent : PendingEvents) { const size_t numBytesInEvent = Min(n, pendingEvent.XdcSizeLeft); @@ -97,10 +124,10 @@ namespace NActors { // calculate number of bytes to catch for (auto& context : Context->ChannelArray) { - context.CalculateBytesToCatch(); + context.PrepareCatchBuffer(); } for (auto& [channel, context] : Context->ChannelMap) { - context.CalculateBytesToCatch(); + context.PrepareCatchBuffer(); } UsageHisto.fill(0); @@ -317,20 +344,21 @@ namespace NActors { } } if (PayloadSize) { - const ui64 expected = Context->LastProcessedSerial + 1; - if (serial == 0 || serial > expected) { - LOG_CRIT_IC_SESSION("ICIS06", "packet serial %" PRIu64 ", but %" PRIu64 " expected", serial, expected); + const ui64 expectedMin = Context->GetLastPacketSerialToConfirm() + 1; + const ui64 expectedMax = Context->LastProcessedSerial + 1; + Y_VERIFY_DEBUG(expectedMin <= expectedMax); + if (CurrentSerial ? serial != CurrentSerial + 1 : (serial == 0 || serial > expectedMin)) { + LOG_CRIT_IC_SESSION("ICIS06", "%s", TString(TStringBuilder() + << "packet serial number mismatch" + << " Serial# " << serial + << " ExpectedMin# " << expectedMin + << " ExpectedMax# " << expectedMax + << " CurrentSerial# " << CurrentSerial + ).data()); throw TExDestroySession{TDisconnectReason::FormatError()}; } - if (Context->LastProcessedSerial <= serial) { - XdcCatchStreamFinalPending = true; // we can't switch it right now, only after packet is fully processed - } - if (serial == expected) { - InboundPacketQ.push_back(TInboundPacket{serial, 0}); - IgnorePayload = false; - } else { - IgnorePayload = true; - } + IgnorePayload = serial != expectedMax; + CurrentSerial = serial; State = EState::PAYLOAD; Y_VERIFY_DEBUG(!Payload); } else if (serial & TTcpPacketBuf::PingRequestMask) { @@ -355,6 +383,7 @@ namespace NActors { if (PayloadSize) { return; // there is still some data to receive in the Payload rope } + InboundPacketQ.push_back(TInboundPacket{CurrentSerial, 0}); State = EState::HEADER; if (!Params.Encryption) { // see if we are checksumming packet body for (const auto&& [data, size] : Payload) { @@ -423,8 +452,14 @@ namespace NActors { } // mark packet as processed - XdcCatchStreamFinal = XdcCatchStreamFinalPending; - Context->LastProcessedSerial += !IgnorePayload; + if (IgnorePayload) { + Y_VERIFY_DEBUG(CurrentSerial <= Context->LastProcessedSerial); + } else { + ++Context->LastProcessedSerial; + Y_VERIFY_DEBUG(CurrentSerial == Context->LastProcessedSerial); + } + XdcCatchStream.Ready = Context->LastProcessedSerial == CurrentSerial; + ApplyXdcCatchStream(); ProcessInboundPacketQ(0); ++PacketsReadFromSocket; @@ -432,7 +467,7 @@ namespace NActors { IgnoredDataPacketsFromSocket += IgnorePayload; } - void TInputSessionTCP::ProcessInboundPacketQ(size_t numXdcBytesRead) { + void TInputSessionTCP::ProcessInboundPacketQ(ui64 numXdcBytesRead) { for (; !InboundPacketQ.empty(); InboundPacketQ.pop_front()) { auto& front = InboundPacketQ.front(); @@ -445,9 +480,11 @@ namespace NActors { break; } - const bool success = Context->AdvanceLastPacketSerialToConfirm(front.Serial); - Y_VERIFY_DEBUG(Context->GetLastPacketSerialToConfirm() <= Context->LastProcessedSerial); - if (!success) { + Y_VERIFY_DEBUG(front.Serial + InboundPacketQ.size() - 1 <= Context->LastProcessedSerial, + "front.Serial# %" PRIu64 " LastProcessedSerial# %" PRIu64 " InboundPacketQ.size# %zu", + front.Serial, Context->LastProcessedSerial, InboundPacketQ.size()); + + if (Context->GetLastPacketSerialToConfirm() < front.Serial && !Context->AdvanceLastPacketSerialToConfirm(front.Serial)) { throw TExReestablishConnection{TDisconnectReason::NewSession()}; } } @@ -503,16 +540,18 @@ namespace NActors { XdcChecksumQ.emplace_back(size, checksumExpected); } + // account channel and number of bytes in XDC for this packet + auto& packet = InboundPacketQ.back(); + packet.XdcUnreadBytes += size; + if (IgnorePayload) { // this packet was already marked as 'processed', all commands had been executed, but we must - // parse XDC stream correctly - XdcCatchStreamBytesPending += size; - XdcCatchStreamMarkup.emplace_back(channel, size); + // parse XDC stream from this packet correctly + const bool apply = Context->GetLastPacketSerialToConfirm() < CurrentSerial && + GetPerChannelContext(channel).XdcCatchBuffer; + XdcCatchStream.BytesPending += size; + XdcCatchStream.Markup.emplace_back(channel, apply, size); } else { - // account channel and number of bytes in XDC for this packet - auto& packet = InboundPacketQ.back(); - packet.XdcUnreadBytes += size; - // find buffers and acquire data buffer pointers context.FetchBuffers(channel, size, XdcInputQ); } @@ -538,7 +577,7 @@ namespace NActors { auto& descr = *pendingEvent.EventData; #if IC_FORCE_HARDENED_PACKET_CHECKS if (descr.Len != pendingEvent.Payload.GetSize()) { - LOG_CRIT_IC_SESSION("ICISxx", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32, + LOG_CRIT_IC_SESSION("ICIS17", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32, descr.Type, pendingEvent.Payload.GetSize(), descr.Len); throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; } @@ -693,83 +732,89 @@ namespace NActors { bool TInputSessionTCP::ReadXdcCatchStream(ui64 *numDataBytes) { bool progress = false; - while (XdcCatchStreamBytesPending) { // read data into catch stream if we still have to - if (!XdcCatchStreamBuffer) { - XdcCatchStreamBuffer = TRcBuf::Uninitialized(64 * 1024); + while (XdcCatchStream.BytesPending) { + if (!XdcCatchStream.Buffer) { + XdcCatchStream.Buffer = TRcBuf::Uninitialized(64 * 1024); } - const size_t numBytesToRead = Min<size_t>(XdcCatchStreamBytesPending, XdcCatchStreamBuffer.size() - XdcCatchStreamBufferOffset); + const size_t numBytesToRead = Min<size_t>(XdcCatchStream.BytesPending, XdcCatchStream.Buffer.size()); - TIoVec iov{XdcCatchStreamBuffer.GetDataMut() + XdcCatchStreamBufferOffset, numBytesToRead}; + TIoVec iov{XdcCatchStream.Buffer.GetDataMut(), numBytesToRead}; ssize_t recvres = Read(*XdcSocket, XdcPollerToken, &Context->XdcReadPending, &iov, 1); if (recvres == -1) { return progress; } - HandleXdcChecksum({XdcCatchStreamBuffer.data() + XdcCatchStreamBufferOffset, static_cast<size_t>(recvres)}); + HandleXdcChecksum({XdcCatchStream.Buffer.data(), static_cast<size_t>(recvres)}); - XdcCatchStreamBufferOffset += recvres; - XdcCatchStreamBytesPending -= recvres; + XdcCatchStream.BytesPending -= recvres; + XdcCatchStream.BytesProcessed += recvres; *numDataBytes += recvres; BytesReadFromXdcSocket += recvres; - if (XdcCatchStreamBufferOffset == XdcCatchStreamBuffer.size() || XdcCatchStreamBytesPending == 0) { - TRope(std::exchange(XdcCatchStreamBuffer, {})).ExtractFront(XdcCatchStreamBufferOffset, &XdcCatchStream); - XdcCatchStreamBufferOffset = 0; + // scatter read data + const char *in = XdcCatchStream.Buffer.data(); + while (recvres) { + Y_VERIFY_DEBUG(!XdcCatchStream.Markup.empty()); + auto& [channel, apply, bytes] = XdcCatchStream.Markup.front(); + size_t bytesInChannel = Min<size_t>(recvres, bytes); + bytes -= bytesInChannel; + recvres -= bytesInChannel; + + if (apply) { + auto& context = GetPerChannelContext(channel); + while (bytesInChannel) { + const size_t offset = context.XdcCatchBytesRead % context.XdcCatchBuffer.size(); + TMutableContiguousSpan out = context.XdcCatchBuffer.GetContiguousSpanMut().SubSpan(offset, bytesInChannel); + memcpy(out.data(), in, out.size()); + context.XdcCatchBytesRead += out.size(); + in += out.size(); + bytesInChannel -= out.size(); + } + } else { + in += bytesInChannel; + } + + if (!bytes) { + XdcCatchStream.Markup.pop_front(); + } } progress = true; } - if (XdcCatchStreamFinal && XdcCatchStream) { - // calculate total number of bytes to catch - size_t totalBytesToCatch = 0; + ApplyXdcCatchStream(); + + return progress; + } + + void TInputSessionTCP::ApplyXdcCatchStream() { + if (!XdcCatchStream.Applied && XdcCatchStream.Ready && !XdcCatchStream.BytesPending) { + Y_VERIFY_DEBUG(XdcCatchStream.Markup.empty()); + + auto process = [&](auto& context) { + context.ApplyCatchBuffer(); + ProcessEvents(context); + }; for (auto& context : Context->ChannelArray) { - totalBytesToCatch += context.XdcBytesToCatch; + process(context); } for (auto& [channel, context] : Context->ChannelMap) { - totalBytesToCatch += context.XdcBytesToCatch; + process(context); } - // calculate ignored offset - Y_VERIFY(totalBytesToCatch <= XdcCatchStream.GetSize()); - size_t bytesToIgnore = XdcCatchStream.GetSize() - totalBytesToCatch; + ProcessInboundPacketQ(XdcCatchStream.BytesProcessed); - // process catch stream markup - THashSet<ui16> channels; - for (auto [channel, size] : XdcCatchStreamMarkup) { - if (const size_t n = Min<size_t>(bytesToIgnore, size)) { - XdcCatchStream.EraseFront(n); - bytesToIgnore -= n; - size -= n; - } - if (const size_t n = Min<size_t>(totalBytesToCatch, size)) { - GetPerChannelContext(channel).DropFront(&XdcCatchStream, n); - channels.insert(channel); - totalBytesToCatch -= n; - size -= n; - } - Y_VERIFY(!size); - } - for (ui16 channel : channels) { - ProcessEvents(GetPerChannelContext(channel)); - } - - // ensure everything was processed - Y_VERIFY(!XdcCatchStream); - Y_VERIFY(!bytesToIgnore); - Y_VERIFY(!totalBytesToCatch); - XdcCatchStreamMarkup = {}; + XdcCatchStream.Buffer = {}; + XdcCatchStream.Applied = true; } - - return progress; } bool TInputSessionTCP::ReadXdc(ui64 *numDataBytes) { bool progress = ReadXdcCatchStream(numDataBytes); // exit if we have no work to do - if (XdcInputQ.empty() || XdcCatchStreamBytesPending) { + if (XdcInputQ.empty() || !XdcCatchStream.Applied) { return progress; } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index b0b6dc0b66..487a598ebc 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -223,7 +223,7 @@ namespace NActors { i64(*ev->Get()->Socket)); NewConnectionSet = TActivationContext::Now(); - PacketsWrittenToSocket = 0; + BytesWrittenToSocket = 0; SendBufferSize = ev->Get()->Socket->GetSendBufferSize(); Socket = std::move(ev->Get()->Socket); @@ -248,9 +248,9 @@ namespace NActors { ReceiveContext->XdcReadPending = false; // create input session actor + ReceiveContext->UnlockLastPacketSerialToConfirm(); auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, XdcSocket, ReceiveContext, Proxy->Common, Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); - ReceiveContext->ResetLastPacketSerialToConfirm(); ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled); // register our socket in poller actor @@ -274,37 +274,29 @@ namespace NActors { // // scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other // auxiliary packets; also reset packet metrics to zero to start sending from the beginning - // also reset SendQueuePos + // also reset send queue // drop confirmed packets first as we do not need unwanted retransmissions - WriteSinglePacketAndDropConfirmed = false; OutgoingStream.RewindToEnd(); XdcStream.RewindToEnd(); - SendQueuePos = SendQueue.size(); - SendOffset = 0; + OutgoingOffset = XdcOffset = Max<size_t>(); DropConfirmed(nextPacket); OutgoingStream.Rewind(); XdcStream.Rewind(); - SendQueuePos = 0; - SendOffset = 0; + OutgoingOffset = XdcOffset = 0; ui64 serial = Max<ui64>(); - BytesUnwritten = 0; for (const auto& packet : SendQueue) { - if (packet.Data && packet.Serial < serial) { + if (packet.Data) { serial = packet.Serial; + break; } - BytesUnwritten += packet.PacketSize; } Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed); - LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n", + LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " NextSerial# %" PRIu64, SendQueue.size(), LastConfirmed, serial); - Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize(), "%s", TString(TStringBuilder() - << "BytesUnwritten# " << BytesUnwritten - << " CalculateUnsentSize# " << OutgoingStream.CalculateUnsentSize()).data()); - SwitchStuckPeriod(); LastHandshakeDone = TActivationContext::Now(); @@ -576,13 +568,14 @@ namespace NActors { void TInterconnectSessionTCP::WriteData() { Y_VERIFY(Socket); // ensure that socket wasn't closed - // total bytes written during this call - ui64 written = 0; - auto process = [&](NInterconnect::TOutgoingStream& stream, const TIntrusivePtr<NInterconnect::TStreamSocket>& socket, - const TPollerToken::TPtr& token, bool *writeBlocked, size_t maxBytes, auto&& callback) { + const TPollerToken::TPtr& token, bool *writeBlocked) { + size_t totalWritten = 0; + + *writeBlocked = false; + while (stream && socket) { - ssize_t r = Write(stream, *socket, maxBytes); + ssize_t r = Write(stream, *socket); if (r == -1) { *writeBlocked = true; if (token) { @@ -593,59 +586,37 @@ namespace NActors { break; // error condition } - *writeBlocked = false; - written += r; - callback(r); - } - if (!socket) { - *writeBlocked = true; - } else if (!stream) { - *writeBlocked = false; + stream.Advance(r); + totalWritten += r; } - }; - Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); - - Y_VERIFY_DEBUG(!WriteSinglePacketAndDropConfirmed || SendQueuePos != SendQueue.size()); - const size_t maxBytesFromMainStream = WriteSinglePacketAndDropConfirmed - ? SendQueue[SendQueuePos].PacketSize - SendOffset - : Max<size_t>(); - process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesFromMainStream, [&](size_t r) { - Y_VERIFY(r <= BytesUnwritten); - BytesUnwritten -= r; - - OutgoingStream.Advance(r); - - ui64 packets = 0; - Y_VERIFY_DEBUG(SendQueuePos != SendQueue.size()); - SendOffset += r; - for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->PacketSize <= SendOffset; ++SendQueuePos, ++it) { - SendOffset -= it->PacketSize; - Y_VERIFY_DEBUG(!it->Data || it->Serial <= OutputCounter); - if (it->Data && LastSentSerial < it->Serial) { - LastSentSerial = it->Serial; - } - ++PacketsWrittenToSocket; - ++packets; - Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1); + return totalWritten; + }; - if (std::exchange(WriteSinglePacketAndDropConfirmed, false)) { // drop remaining already confirmed packets - DropConfirmed(LastConfirmed); - } - } + // total bytes written during this call + ui64 written = 0; - Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); - }); + if (const size_t w = process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked)) { + written += w; + BytesWrittenToSocket += w; + OutgoingOffset += w; + } - process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, Max<size_t>(), [&](size_t r) { - XdcBytesSent += r; - XdcStream.Advance(r); - }); + if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked)) { + written += w; + XdcBytesSent += w; + XdcOffset += w; + } if (written) { Proxy->Metrics->AddTotalBytesWritten(written); } + if (DropConfirmed(LastConfirmed) && !RamInQueue) { // issue GenerateTraffic a bit later + RamInQueue = new TEvRam(false); + Send(SelfId(), RamInQueue); + } + const bool writeBlockedByFullSendBuffer = ReceiveContext->MainWriteBlocked || ReceiveContext->XdcWriteBlocked; if (WriteBlockedByFullSendBuffer < writeBlockedByFullSendBuffer) { // became blocked WriteBlockedCycles = GetCycleCountFast(); @@ -656,8 +627,7 @@ namespace NActors { WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer; } - ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, - size_t maxBytes) { + ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket) { LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { constexpr ui32 iovLimit = 256; @@ -679,21 +649,6 @@ namespace NActors { stream.ProduceIoVec(wbuffers, maxElementsInIOV); Y_VERIFY(!wbuffers.empty()); - if (maxBytes != Max<size_t>()) { - for (auto it = wbuffers.begin(); it != wbuffers.end(); ++it) { - if (maxBytes < it->Size) { - if (maxBytes) { - it->Size = maxBytes; - ++it; - } - wbuffers.erase(it, wbuffers.end()); - break; - } else { - maxBytes -= it->Size; - } - } - } - TString err; ssize_t r = 0; { // issue syscall with timing @@ -818,10 +773,6 @@ namespace NActors { // count number of bytes pending for write const size_t packetSize = packet.GetPacketSize(); - BytesUnwritten += packetSize; - Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize(), "%s", TString(TStringBuilder() - << "BytesUnwritten# " << BytesUnwritten << " packetSize# " << packetSize - << " CalculateUnsentSize# " << OutgoingStream.CalculateUnsentSize()).data()); #ifndef NDEBUG const size_t outgoingStreamSizeAfter = OutgoingStream.CalculateOutgoingSize(); @@ -844,8 +795,7 @@ namespace NActors { }); LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu" - " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(), - InflightDataAmount, BytesUnwritten); + " InflightDataAmount# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(), InflightDataAmount); // reset forced packet sending timestamp as we have confirmed all received data ResetFlushLogic(); @@ -858,59 +808,34 @@ namespace NActors { bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) { LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm); - Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter, - "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64, - LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial); + Y_VERIFY(LastConfirmed <= confirm && confirm <= OutputCounter, + "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64, + LogPrefix.data(), confirm, LastConfirmed, OutputCounter); LastConfirmed = confirm; std::optional<ui64> lastDroppedSerial = 0; ui32 numDropped = 0; -#ifndef NDEBUG - { - size_t totalBytesInSendQueue = 0; - size_t unsentBytesInSendQueue = 0; - for (size_t i = 0; i < SendQueue.size(); ++i) { - totalBytesInSendQueue += SendQueue[i].PacketSize; - if (SendQueuePos <= i) { - unsentBytesInSendQueue += SendQueue[i].PacketSize; - } - } - unsentBytesInSendQueue -= SendOffset; - Y_VERIFY(totalBytesInSendQueue == OutgoingStream.CalculateOutgoingSize()); - Y_VERIFY(unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize()); - } -#endif - // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively // making Serial <= confirm true size_t bytesDropped = 0; size_t bytesDroppedFromXdc = 0; - for (; !SendQueue.empty(); SendQueue.pop_front(), --SendQueuePos) { + for (; !SendQueue.empty(); SendQueue.pop_front()) { auto& front = SendQueue.front(); if (front.Data && confirm < front.Serial) { break; } - if (!SendQueuePos) { - // we are sending this packet right now; it might be a race -- we are resending packets after reconnection, - // while getting confirmation for higher packet number (as we have sent it before); here we have to send - // these packets (if necessary) and skip the rest of them - if (!SendOffset && Socket && !Socket->ExpectingCertainWrite()) { - // just advance to next packet -- we won't send current one - ++SendQueuePos; - } else { - // can't skip packets now, make a mark and exit - WriteSinglePacketAndDropConfirmed = true; - break; - } + if (OutgoingOffset < front.PacketSize || XdcOffset < front.ExternalSize) { + break; // packet wasn't actually sent to receiver, can't drop it now } if (front.Data) { lastDroppedSerial.emplace(front.Serial); } + OutgoingOffset -= front.PacketSize; + XdcOffset -= front.ExternalSize; bytesDropped += front.PacketSize; bytesDroppedFromXdc += front.ExternalSize; ++numDropped; - Y_VERIFY_DEBUG(SendQueuePos != 0); } if (!numDropped) { @@ -1238,7 +1163,6 @@ namespace NActors { MON_VAR(MessagesGot) MON_VAR(MessagesWrittenToBuffer) MON_VAR(PacketsGenerated) - MON_VAR(PacketsWrittenToSocket) MON_VAR(PacketsConfirmed) MON_VAR(ConfirmPacketsForcedBySize) MON_VAR(ConfirmPacketsForcedByTimeout) @@ -1284,7 +1208,6 @@ namespace NActors { MON_VAR(SendQueue.size()) MON_VAR(NumEventsInReadyChannels) MON_VAR(TotalOutputQueueSize) - MON_VAR(BytesUnwritten) MON_VAR(InflightDataAmount) MON_VAR(unsentQueueSize) MON_VAR(SendBufferSize) @@ -1292,7 +1215,6 @@ namespace NActors { MON_VAR(now - LastPayloadActivityTimestamp) MON_VAR(LastHandshakeDone) MON_VAR(OutputCounter) - MON_VAR(LastSentSerial) MON_VAR(LastConfirmed) MON_VAR(FlushSchedule.size()) MON_VAR(MaxFlushSchedule) @@ -1301,15 +1223,18 @@ namespace NActors { MON_VAR(GetWriteBlockedTotal()) + MON_VAR(BytesWrittenToSocket) MON_VAR(XdcBytesSent) MON_VAR(OutgoingStream.CalculateOutgoingSize()) MON_VAR(OutgoingStream.CalculateUnsentSize()) MON_VAR(OutgoingStream.GetSendQueueSize()) + MON_VAR(OutgoingOffset) MON_VAR(XdcStream.CalculateOutgoingSize()) MON_VAR(XdcStream.CalculateUnsentSize()) MON_VAR(XdcStream.GetSendQueueSize()) + MON_VAR(XdcOffset) TString clockSkew; i64 x = GetClockSkew(); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index af6c14d824..f07cbfa4eb 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -133,9 +133,12 @@ namespace NActors { std::deque<TMutableContiguousSpan> XdcBuffers; // receive queue for current channel size_t FetchIndex = 0; size_t FetchOffset = 0; - size_t XdcBytesToCatch = 0; // number of bytes to catch after reconnect - void CalculateBytesToCatch(); + ui64 XdcCatchBytesRead = 0; // number of bytes actually read into cyclic buffer + TRcBuf XdcCatchBuffer; + + void PrepareCatchBuffer(); + void ApplyCatchBuffer(); void FetchBuffers(ui16 channel, size_t numBytes, std::deque<std::tuple<ui16, TMutableContiguousSpan>>& outQ); void DropFront(TRope *from, size_t numBytes); }; @@ -174,8 +177,8 @@ namespace NActors { } } - void ResetLastPacketSerialToConfirm() { - LastPacketSerialToConfirm = LastProcessedSerial; + void UnlockLastPacketSerialToConfirm() { + LastPacketSerialToConfirm &= ~LastPacketSerialToConfirmLockBit; } ui64 GetLastPacketSerialToConfirm() { @@ -258,6 +261,7 @@ namespace NActors { PAYLOAD, }; EState State = EState::HEADER; + ui64 CurrentSerial = 0; std::vector<char> XdcCommands; @@ -267,17 +271,19 @@ namespace NActors { }; std::deque<TInboundPacket> InboundPacketQ; std::deque<std::tuple<ui16, TMutableContiguousSpan>> XdcInputQ; // target buffers for the XDC stream with channel reference - - // catch stream -- used after TCP reconnect to match unread XDC stream with main packet stream - TRope XdcCatchStream; // temporary data buffer to process XDC stream retransmission upon reconnect - TRcBuf XdcCatchStreamBuffer; - size_t XdcCatchStreamBufferOffset = 0; - ui64 XdcCatchStreamBytesPending = 0; - std::deque<std::tuple<ui16, ui16>> XdcCatchStreamMarkup; // a queue of pairs (channel, bytes) std::deque<std::tuple<ui16, ui32>> XdcChecksumQ; // (size, expectedChecksum) ui32 XdcCurrentChecksum = 0; - bool XdcCatchStreamFinal = false; - bool XdcCatchStreamFinalPending = false; + + // catch stream -- used after TCP reconnect to match XDC stream with main packet stream + struct TXdcCatchStream { + TRcBuf Buffer; + ui64 BytesPending = 0; + ui64 BytesProcessed = 0; + std::deque<std::tuple<ui16, bool, size_t>> Markup; // a queue of tuples (channel, apply, bytes) + bool Ready = false; + bool Applied = false; + }; + TXdcCatchStream XdcCatchStream; THolder<TEvUpdateFromInputSession> UpdateFromInputSession; @@ -295,13 +301,14 @@ namespace NActors { void ReceiveData(); void ProcessHeader(); void ProcessPayload(ui64 *numDataBytes); - void ProcessInboundPacketQ(size_t numXdcBytesRead); + void ProcessInboundPacketQ(ui64 numXdcBytesRead); void ProcessXdcCommand(ui16 channel, TReceiveContext::TPerChannelContext& context); void ProcessEvents(TReceiveContext::TPerChannelContext& context); ssize_t Read(NInterconnect::TStreamSocket& socket, const TPollerToken::TPtr& token, bool *readPending, const TIoVec *iov, size_t num); bool ReadMore(); bool ReadXdcCatchStream(ui64 *numDataBytes); + void ApplyXdcCatchStream(); bool ReadXdc(ui64 *numDataBytes); void HandleXdcChecksum(TContiguousSpan span); @@ -384,7 +391,7 @@ namespace NActors { ui64 MessagesGot = 0; ui64 MessagesWrittenToBuffer = 0; ui64 PacketsGenerated = 0; - ui64 PacketsWrittenToSocket = 0; + ui64 BytesWrittenToSocket = 0; ui64 PacketsConfirmed = 0; public: @@ -459,7 +466,7 @@ namespace NActors { void Handle(TEvPollerReady::TPtr& ev); void Handle(TEvPollerRegisterResult::TPtr ev); void WriteData(); - ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes); + ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket); ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {}); void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial); @@ -530,9 +537,8 @@ namespace NActors { bool Data; }; std::deque<TOutgoingPacket> SendQueue; // packet boundaries - size_t SendQueuePos = 0; // packet being sent now - size_t SendOffset = 0; - bool WriteSinglePacketAndDropConfirmed = false; + size_t OutgoingOffset = 0; + size_t XdcOffset = 0; ui64 XdcBytesSent = 0; @@ -540,8 +546,6 @@ namespace NActors { TDuration WriteBlockedTotal; // total incremental duration that session has been blocked bool WriteBlockedByFullSendBuffer = false; - ui64 BytesUnwritten = 0; // number of bytes in outgoing main queue - TDuration GetWriteBlockedTotal() const { return WriteBlockedTotal + (WriteBlockedByFullSendBuffer ? TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles)) @@ -549,7 +553,6 @@ namespace NActors { } ui64 OutputCounter; - ui64 LastSentSerial = 0; TInstant LastHandshakeDone; diff --git a/library/cpp/actors/interconnect/outgoing_stream.h b/library/cpp/actors/interconnect/outgoing_stream.h index deffd1d93b..7c32911807 100644 --- a/library/cpp/actors/interconnect/outgoing_stream.h +++ b/library/cpp/actors/interconnect/outgoing_stream.h @@ -163,7 +163,8 @@ namespace NInterconnect { front.Span = front.Span.SubSpan(numBytes, Max<size_t>()); if (SendQueuePos == 0) { Y_VERIFY_DEBUG(numBytes <= SendOffset, "numBytes# %zu SendOffset# %zu SendQueuePos# %zu" - " SendQueue.size# %zu", numBytes, SendOffset, SendQueuePos, SendQueue.size()); + " SendQueue.size# %zu CalculateUnsentSize# %zu", numBytes, SendOffset, SendQueuePos, + SendQueue.size(), CalculateUnsentSize()); SendOffset -= numBytes; } break; |