about summary refs log tree commit diff stats
path: root/library
diff options
context:
space:
mode:
author alexvru <alexvru@ydb.tech> 2023-04-27 11:24:30 +0300
committer alexvru <alexvru@ydb.tech> 2023-04-27 11:24:30 +0300
commit b56866c69663e9a5106f99e79d30b7abd850f070 (patch)
tree d183bbc784fa1666e3023f5d8e93929ae7a4d922 /library
parent c7b3829d69c023694e8d20b7d91c02f47f1fb4e2 (diff)
download ydb-b56866c69663e9a5106f99e79d30b7abd850f070.tar.gz
Some IC fixes
Diffstat (limited to 'library')
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp 201
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_session.cpp 171
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_session.h 47
-rw-r--r-- library/cpp/actors/interconnect/outgoing_stream.h 3
4 files changed, 198 insertions, 224 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 37e1bb8b5f..edcffea380 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -6,10 +6,35 @@
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
- void TReceiveContext::TPerChannelContext::CalculateBytesToCatch() {
- XdcBytesToCatch = FetchOffset;
+ void TReceiveContext::TPerChannelContext::PrepareCatchBuffer() {
+ size_t bytesToCatch = FetchOffset;
for (auto it = XdcBuffers.begin(), end = it + FetchIndex; it != end; ++it) {
- XdcBytesToCatch += it->size();
+ bytesToCatch += it->size();
+ }
+
+ XdcCatchBuffer = TRcBuf::Uninitialized(bytesToCatch);
+ XdcCatchBytesRead = 0;
+ }
+
+ void TReceiveContext::TPerChannelContext::ApplyCatchBuffer() {
+ if (auto buffer = std::exchange(XdcCatchBuffer, {})) {
+ Y_VERIFY(XdcCatchBytesRead >= buffer.size());
+
+ const size_t offset = XdcCatchBytesRead % buffer.size();
+ const char *begin = buffer.data();
+ const char *mid = begin + offset;
+ const char *end = begin + buffer.size();
+ Y_VERIFY_DEBUG(begin <= mid && mid < end);
+
+ TRope rope;
+ rope.Insert(rope.End(), TRcBuf(TRcBuf::Piece, mid, end, buffer));
+ if (begin != mid) {
+ rope.Insert(rope.End(), TRcBuf(TRcBuf::Piece, begin, mid, buffer));
+ }
+
+ DropFront(&rope, buffer.size());
+ } else {
+ Y_VERIFY_DEBUG(!XdcCatchBytesRead);
}
}
@@ -35,6 +60,8 @@ namespace NActors {
}
void TReceiveContext::TPerChannelContext::DropFront(TRope *from, size_t numBytes) {
+ Y_VERIFY_DEBUG(from || !XdcCatchBuffer);
+
size_t n = numBytes;
for (auto& pendingEvent : PendingEvents) {
const size_t numBytesInEvent = Min(n, pendingEvent.XdcSizeLeft);
@@ -97,10 +124,10 @@ namespace NActors {
// calculate number of bytes to catch
for (auto& context : Context->ChannelArray) {
- context.CalculateBytesToCatch();
+ context.PrepareCatchBuffer();
}
for (auto& [channel, context] : Context->ChannelMap) {
- context.CalculateBytesToCatch();
+ context.PrepareCatchBuffer();
}
UsageHisto.fill(0);
@@ -317,20 +344,21 @@ namespace NActors {
}
}
if (PayloadSize) {
- const ui64 expected = Context->LastProcessedSerial + 1;
- if (serial == 0 || serial > expected) {
- LOG_CRIT_IC_SESSION("ICIS06", "packet serial %" PRIu64 ", but %" PRIu64 " expected", serial, expected);
+ const ui64 expectedMin = Context->GetLastPacketSerialToConfirm() + 1;
+ const ui64 expectedMax = Context->LastProcessedSerial + 1;
+ Y_VERIFY_DEBUG(expectedMin <= expectedMax);
+ if (CurrentSerial ? serial != CurrentSerial + 1 : (serial == 0 || serial > expectedMin)) {
+ LOG_CRIT_IC_SESSION("ICIS06", "%s", TString(TStringBuilder()
+ << "packet serial number mismatch"
+ << " Serial# " << serial
+ << " ExpectedMin# " << expectedMin
+ << " ExpectedMax# " << expectedMax
+ << " CurrentSerial# " << CurrentSerial
+ ).data());
throw TExDestroySession{TDisconnectReason::FormatError()};
}
- if (Context->LastProcessedSerial <= serial) {
- XdcCatchStreamFinalPending = true; // we can't switch it right now, only after packet is fully processed
- }
- if (serial == expected) {
- InboundPacketQ.push_back(TInboundPacket{serial, 0});
- IgnorePayload = false;
- } else {
- IgnorePayload = true;
- }
+ IgnorePayload = serial != expectedMax;
+ CurrentSerial = serial;
State = EState::PAYLOAD;
Y_VERIFY_DEBUG(!Payload);
} else if (serial & TTcpPacketBuf::PingRequestMask) {
@@ -355,6 +383,7 @@ namespace NActors {
if (PayloadSize) {
return; // there is still some data to receive in the Payload rope
}
+ InboundPacketQ.push_back(TInboundPacket{CurrentSerial, 0});
State = EState::HEADER;
if (!Params.Encryption) { // see if we are checksumming packet body
for (const auto&& [data, size] : Payload) {
@@ -423,8 +452,14 @@ namespace NActors {
}
// mark packet as processed
- XdcCatchStreamFinal = XdcCatchStreamFinalPending;
- Context->LastProcessedSerial += !IgnorePayload;
+ if (IgnorePayload) {
+ Y_VERIFY_DEBUG(CurrentSerial <= Context->LastProcessedSerial);
+ } else {
+ ++Context->LastProcessedSerial;
+ Y_VERIFY_DEBUG(CurrentSerial == Context->LastProcessedSerial);
+ }
+ XdcCatchStream.Ready = Context->LastProcessedSerial == CurrentSerial;
+ ApplyXdcCatchStream();
ProcessInboundPacketQ(0);
++PacketsReadFromSocket;
@@ -432,7 +467,7 @@ namespace NActors {
IgnoredDataPacketsFromSocket += IgnorePayload;
}
- void TInputSessionTCP::ProcessInboundPacketQ(size_t numXdcBytesRead) {
+ void TInputSessionTCP::ProcessInboundPacketQ(ui64 numXdcBytesRead) {
for (; !InboundPacketQ.empty(); InboundPacketQ.pop_front()) {
auto& front = InboundPacketQ.front();
@@ -445,9 +480,11 @@ namespace NActors {
break;
}
- const bool success = Context->AdvanceLastPacketSerialToConfirm(front.Serial);
- Y_VERIFY_DEBUG(Context->GetLastPacketSerialToConfirm() <= Context->LastProcessedSerial);
- if (!success) {
+ Y_VERIFY_DEBUG(front.Serial + InboundPacketQ.size() - 1 <= Context->LastProcessedSerial,
+ "front.Serial# %" PRIu64 " LastProcessedSerial# %" PRIu64 " InboundPacketQ.size# %zu",
+ front.Serial, Context->LastProcessedSerial, InboundPacketQ.size());
+
+ if (Context->GetLastPacketSerialToConfirm() < front.Serial && !Context->AdvanceLastPacketSerialToConfirm(front.Serial)) {
throw TExReestablishConnection{TDisconnectReason::NewSession()};
}
}
@@ -503,16 +540,18 @@ namespace NActors {
XdcChecksumQ.emplace_back(size, checksumExpected);
}
+ // account channel and number of bytes in XDC for this packet
+ auto& packet = InboundPacketQ.back();
+ packet.XdcUnreadBytes += size;
+
if (IgnorePayload) {
// this packet was already marked as 'processed', all commands had been executed, but we must
- // parse XDC stream correctly
- XdcCatchStreamBytesPending += size;
- XdcCatchStreamMarkup.emplace_back(channel, size);
+ // parse XDC stream from this packet correctly
+ const bool apply = Context->GetLastPacketSerialToConfirm() < CurrentSerial &&
+ GetPerChannelContext(channel).XdcCatchBuffer;
+ XdcCatchStream.BytesPending += size;
+ XdcCatchStream.Markup.emplace_back(channel, apply, size);
} else {
- // account channel and number of bytes in XDC for this packet
- auto& packet = InboundPacketQ.back();
- packet.XdcUnreadBytes += size;
-
// find buffers and acquire data buffer pointers
context.FetchBuffers(channel, size, XdcInputQ);
}
@@ -538,7 +577,7 @@ namespace NActors {
auto& descr = *pendingEvent.EventData;
#if IC_FORCE_HARDENED_PACKET_CHECKS
if (descr.Len != pendingEvent.Payload.GetSize()) {
- LOG_CRIT_IC_SESSION("ICISxx", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32,
+ LOG_CRIT_IC_SESSION("ICIS17", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32,
descr.Type, pendingEvent.Payload.GetSize(), descr.Len);
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
}
@@ -693,83 +732,89 @@ namespace NActors {
bool TInputSessionTCP::ReadXdcCatchStream(ui64 *numDataBytes) {
bool progress = false;
- while (XdcCatchStreamBytesPending) { // read data into catch stream if we still have to
- if (!XdcCatchStreamBuffer) {
- XdcCatchStreamBuffer = TRcBuf::Uninitialized(64 * 1024);
+ while (XdcCatchStream.BytesPending) {
+ if (!XdcCatchStream.Buffer) {
+ XdcCatchStream.Buffer = TRcBuf::Uninitialized(64 * 1024);
}
- const size_t numBytesToRead = Min<size_t>(XdcCatchStreamBytesPending, XdcCatchStreamBuffer.size() - XdcCatchStreamBufferOffset);
+ const size_t numBytesToRead = Min<size_t>(XdcCatchStream.BytesPending, XdcCatchStream.Buffer.size());
- TIoVec iov{XdcCatchStreamBuffer.GetDataMut() + XdcCatchStreamBufferOffset, numBytesToRead};
+ TIoVec iov{XdcCatchStream.Buffer.GetDataMut(), numBytesToRead};
ssize_t recvres = Read(*XdcSocket, XdcPollerToken, &Context->XdcReadPending, &iov, 1);
if (recvres == -1) {
return progress;
}
- HandleXdcChecksum({XdcCatchStreamBuffer.data() + XdcCatchStreamBufferOffset, static_cast<size_t>(recvres)});
+ HandleXdcChecksum({XdcCatchStream.Buffer.data(), static_cast<size_t>(recvres)});
- XdcCatchStreamBufferOffset += recvres;
- XdcCatchStreamBytesPending -= recvres;
+ XdcCatchStream.BytesPending -= recvres;
+ XdcCatchStream.BytesProcessed += recvres;
*numDataBytes += recvres;
BytesReadFromXdcSocket += recvres;
- if (XdcCatchStreamBufferOffset == XdcCatchStreamBuffer.size() || XdcCatchStreamBytesPending == 0) {
- TRope(std::exchange(XdcCatchStreamBuffer, {})).ExtractFront(XdcCatchStreamBufferOffset, &XdcCatchStream);
- XdcCatchStreamBufferOffset = 0;
+ // scatter read data
+ const char *in = XdcCatchStream.Buffer.data();
+ while (recvres) {
+ Y_VERIFY_DEBUG(!XdcCatchStream.Markup.empty());
+ auto& [channel, apply, bytes] = XdcCatchStream.Markup.front();
+ size_t bytesInChannel = Min<size_t>(recvres, bytes);
+ bytes -= bytesInChannel;
+ recvres -= bytesInChannel;
+
+ if (apply) {
+ auto& context = GetPerChannelContext(channel);
+ while (bytesInChannel) {
+ const size_t offset = context.XdcCatchBytesRead % context.XdcCatchBuffer.size();
+ TMutableContiguousSpan out = context.XdcCatchBuffer.GetContiguousSpanMut().SubSpan(offset, bytesInChannel);
+ memcpy(out.data(), in, out.size());
+ context.XdcCatchBytesRead += out.size();
+ in += out.size();
+ bytesInChannel -= out.size();
+ }
+ } else {
+ in += bytesInChannel;
+ }
+
+ if (!bytes) {
+ XdcCatchStream.Markup.pop_front();
+ }
}
progress = true;
}
- if (XdcCatchStreamFinal && XdcCatchStream) {
- // calculate total number of bytes to catch
- size_t totalBytesToCatch = 0;
+ ApplyXdcCatchStream();
+
+ return progress;
+ }
+
+ void TInputSessionTCP::ApplyXdcCatchStream() {
+ if (!XdcCatchStream.Applied && XdcCatchStream.Ready && !XdcCatchStream.BytesPending) {
+ Y_VERIFY_DEBUG(XdcCatchStream.Markup.empty());
+
+ auto process = [&](auto& context) {
+ context.ApplyCatchBuffer();
+ ProcessEvents(context);
+ };
for (auto& context : Context->ChannelArray) {
- totalBytesToCatch += context.XdcBytesToCatch;
+ process(context);
}
for (auto& [channel, context] : Context->ChannelMap) {
- totalBytesToCatch += context.XdcBytesToCatch;
+ process(context);
}
- // calculate ignored offset
- Y_VERIFY(totalBytesToCatch <= XdcCatchStream.GetSize());
- size_t bytesToIgnore = XdcCatchStream.GetSize() - totalBytesToCatch;
+ ProcessInboundPacketQ(XdcCatchStream.BytesProcessed);
- // process catch stream markup
- THashSet<ui16> channels;
- for (auto [channel, size] : XdcCatchStreamMarkup) {
- if (const size_t n = Min<size_t>(bytesToIgnore, size)) {
- XdcCatchStream.EraseFront(n);
- bytesToIgnore -= n;
- size -= n;
- }
- if (const size_t n = Min<size_t>(totalBytesToCatch, size)) {
- GetPerChannelContext(channel).DropFront(&XdcCatchStream, n);
- channels.insert(channel);
- totalBytesToCatch -= n;
- size -= n;
- }
- Y_VERIFY(!size);
- }
- for (ui16 channel : channels) {
- ProcessEvents(GetPerChannelContext(channel));
- }
-
- // ensure everything was processed
- Y_VERIFY(!XdcCatchStream);
- Y_VERIFY(!bytesToIgnore);
- Y_VERIFY(!totalBytesToCatch);
- XdcCatchStreamMarkup = {};
+ XdcCatchStream.Buffer = {};
+ XdcCatchStream.Applied = true;
}
-
- return progress;
}
bool TInputSessionTCP::ReadXdc(ui64 *numDataBytes) {
bool progress = ReadXdcCatchStream(numDataBytes);
// exit if we have no work to do
- if (XdcInputQ.empty() || XdcCatchStreamBytesPending) {
+ if (XdcInputQ.empty() || !XdcCatchStream.Applied) {
return progress;
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index b0b6dc0b66..487a598ebc 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -223,7 +223,7 @@ namespace NActors {
i64(*ev->Get()->Socket));
NewConnectionSet = TActivationContext::Now();
- PacketsWrittenToSocket = 0;
+ BytesWrittenToSocket = 0;
SendBufferSize = ev->Get()->Socket->GetSendBufferSize();
Socket = std::move(ev->Get()->Socket);
@@ -248,9 +248,9 @@ namespace NActors {
ReceiveContext->XdcReadPending = false;
// create input session actor
+ ReceiveContext->UnlockLastPacketSerialToConfirm();
auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, XdcSocket, ReceiveContext, Proxy->Common,
Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
- ReceiveContext->ResetLastPacketSerialToConfirm();
ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
// register our socket in poller actor
@@ -274,37 +274,29 @@ namespace NActors {
//
// scan through send queue and leave only those packets who have data -- we will simply resend them; drop all other
// auxiliary packets; also reset packet metrics to zero to start sending from the beginning
- // also reset SendQueuePos
+ // also reset send queue
// drop confirmed packets first as we do not need unwanted retransmissions
- WriteSinglePacketAndDropConfirmed = false;
OutgoingStream.RewindToEnd();
XdcStream.RewindToEnd();
- SendQueuePos = SendQueue.size();
- SendOffset = 0;
+ OutgoingOffset = XdcOffset = Max<size_t>();
DropConfirmed(nextPacket);
OutgoingStream.Rewind();
XdcStream.Rewind();
- SendQueuePos = 0;
- SendOffset = 0;
+ OutgoingOffset = XdcOffset = 0;
ui64 serial = Max<ui64>();
- BytesUnwritten = 0;
for (const auto& packet : SendQueue) {
- if (packet.Data && packet.Serial < serial) {
+ if (packet.Data) {
serial = packet.Serial;
+ break;
}
- BytesUnwritten += packet.PacketSize;
}
Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed);
- LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n",
+ LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " NextSerial# %" PRIu64,
SendQueue.size(), LastConfirmed, serial);
- Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize(), "%s", TString(TStringBuilder()
- << "BytesUnwritten# " << BytesUnwritten
- << " CalculateUnsentSize# " << OutgoingStream.CalculateUnsentSize()).data());
-
SwitchStuckPeriod();
LastHandshakeDone = TActivationContext::Now();
@@ -576,13 +568,14 @@ namespace NActors {
void TInterconnectSessionTCP::WriteData() {
Y_VERIFY(Socket); // ensure that socket wasn't closed
- // total bytes written during this call
- ui64 written = 0;
-
auto process = [&](NInterconnect::TOutgoingStream& stream, const TIntrusivePtr<NInterconnect::TStreamSocket>& socket,
- const TPollerToken::TPtr& token, bool *writeBlocked, size_t maxBytes, auto&& callback) {
+ const TPollerToken::TPtr& token, bool *writeBlocked) {
+ size_t totalWritten = 0;
+
+ *writeBlocked = false;
+
while (stream && socket) {
- ssize_t r = Write(stream, *socket, maxBytes);
+ ssize_t r = Write(stream, *socket);
if (r == -1) {
*writeBlocked = true;
if (token) {
@@ -593,59 +586,37 @@ namespace NActors {
break; // error condition
}
- *writeBlocked = false;
- written += r;
- callback(r);
- }
- if (!socket) {
- *writeBlocked = true;
- } else if (!stream) {
- *writeBlocked = false;
+ stream.Advance(r);
+ totalWritten += r;
}
- };
- Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
-
- Y_VERIFY_DEBUG(!WriteSinglePacketAndDropConfirmed || SendQueuePos != SendQueue.size());
- const size_t maxBytesFromMainStream = WriteSinglePacketAndDropConfirmed
- ? SendQueue[SendQueuePos].PacketSize - SendOffset
- : Max<size_t>();
- process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesFromMainStream, [&](size_t r) {
- Y_VERIFY(r <= BytesUnwritten);
- BytesUnwritten -= r;
-
- OutgoingStream.Advance(r);
-
- ui64 packets = 0;
- Y_VERIFY_DEBUG(SendQueuePos != SendQueue.size());
- SendOffset += r;
- for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->PacketSize <= SendOffset; ++SendQueuePos, ++it) {
- SendOffset -= it->PacketSize;
- Y_VERIFY_DEBUG(!it->Data || it->Serial <= OutputCounter);
- if (it->Data && LastSentSerial < it->Serial) {
- LastSentSerial = it->Serial;
- }
- ++PacketsWrittenToSocket;
- ++packets;
- Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1);
+ return totalWritten;
+ };
- if (std::exchange(WriteSinglePacketAndDropConfirmed, false)) { // drop remaining already confirmed packets
- DropConfirmed(LastConfirmed);
- }
- }
+ // total bytes written during this call
+ ui64 written = 0;
- Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
- });
+ if (const size_t w = process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked)) {
+ written += w;
+ BytesWrittenToSocket += w;
+ OutgoingOffset += w;
+ }
- process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, Max<size_t>(), [&](size_t r) {
- XdcBytesSent += r;
- XdcStream.Advance(r);
- });
+ if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked)) {
+ written += w;
+ XdcBytesSent += w;
+ XdcOffset += w;
+ }
if (written) {
Proxy->Metrics->AddTotalBytesWritten(written);
}
+ if (DropConfirmed(LastConfirmed) && !RamInQueue) { // issue GenerateTraffic a bit later
+ RamInQueue = new TEvRam(false);
+ Send(SelfId(), RamInQueue);
+ }
+
const bool writeBlockedByFullSendBuffer = ReceiveContext->MainWriteBlocked || ReceiveContext->XdcWriteBlocked;
if (WriteBlockedByFullSendBuffer < writeBlockedByFullSendBuffer) { // became blocked
WriteBlockedCycles = GetCycleCountFast();
@@ -656,8 +627,7 @@ namespace NActors {
WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer;
}
- ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket,
- size_t maxBytes) {
+ ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket) {
LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
constexpr ui32 iovLimit = 256;
@@ -679,21 +649,6 @@ namespace NActors {
stream.ProduceIoVec(wbuffers, maxElementsInIOV);
Y_VERIFY(!wbuffers.empty());
- if (maxBytes != Max<size_t>()) {
- for (auto it = wbuffers.begin(); it != wbuffers.end(); ++it) {
- if (maxBytes < it->Size) {
- if (maxBytes) {
- it->Size = maxBytes;
- ++it;
- }
- wbuffers.erase(it, wbuffers.end());
- break;
- } else {
- maxBytes -= it->Size;
- }
- }
- }
-
TString err;
ssize_t r = 0;
{ // issue syscall with timing
@@ -818,10 +773,6 @@ namespace NActors {
// count number of bytes pending for write
const size_t packetSize = packet.GetPacketSize();
- BytesUnwritten += packetSize;
- Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize(), "%s", TString(TStringBuilder()
- << "BytesUnwritten# " << BytesUnwritten << " packetSize# " << packetSize
- << " CalculateUnsentSize# " << OutgoingStream.CalculateUnsentSize()).data());
#ifndef NDEBUG
const size_t outgoingStreamSizeAfter = OutgoingStream.CalculateOutgoingSize();
@@ -844,8 +795,7 @@ namespace NActors {
});
LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu"
- " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(),
- InflightDataAmount, BytesUnwritten);
+ " InflightDataAmount# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(), InflightDataAmount);
// reset forced packet sending timestamp as we have confirmed all received data
ResetFlushLogic();
@@ -858,59 +808,34 @@ namespace NActors {
bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm);
- Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter,
- "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64 " LastSentSerial# %" PRIu64,
- LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial);
+ Y_VERIFY(LastConfirmed <= confirm && confirm <= OutputCounter,
+ "%s confirm# %" PRIu64 " LastConfirmed# %" PRIu64 " OutputCounter# %" PRIu64,
+ LogPrefix.data(), confirm, LastConfirmed, OutputCounter);
LastConfirmed = confirm;
std::optional<ui64> lastDroppedSerial = 0;
ui32 numDropped = 0;
-#ifndef NDEBUG
- {
- size_t totalBytesInSendQueue = 0;
- size_t unsentBytesInSendQueue = 0;
- for (size_t i = 0; i < SendQueue.size(); ++i) {
- totalBytesInSendQueue += SendQueue[i].PacketSize;
- if (SendQueuePos <= i) {
- unsentBytesInSendQueue += SendQueue[i].PacketSize;
- }
- }
- unsentBytesInSendQueue -= SendOffset;
- Y_VERIFY(totalBytesInSendQueue == OutgoingStream.CalculateOutgoingSize());
- Y_VERIFY(unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize());
- }
-#endif
-
// drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively
// making Serial <= confirm true
size_t bytesDropped = 0;
size_t bytesDroppedFromXdc = 0;
- for (; !SendQueue.empty(); SendQueue.pop_front(), --SendQueuePos) {
+ for (; !SendQueue.empty(); SendQueue.pop_front()) {
auto& front = SendQueue.front();
if (front.Data && confirm < front.Serial) {
break;
}
- if (!SendQueuePos) {
- // we are sending this packet right now; it might be a race -- we are resending packets after reconnection,
- // while getting confirmation for higher packet number (as we have sent it before); here we have to send
- // these packets (if necessary) and skip the rest of them
- if (!SendOffset && Socket && !Socket->ExpectingCertainWrite()) {
- // just advance to next packet -- we won't send current one
- ++SendQueuePos;
- } else {
- // can't skip packets now, make a mark and exit
- WriteSinglePacketAndDropConfirmed = true;
- break;
- }
+ if (OutgoingOffset < front.PacketSize || XdcOffset < front.ExternalSize) {
+ break; // packet wasn't actually sent to receiver, can't drop it now
}
if (front.Data) {
lastDroppedSerial.emplace(front.Serial);
}
+ OutgoingOffset -= front.PacketSize;
+ XdcOffset -= front.ExternalSize;
bytesDropped += front.PacketSize;
bytesDroppedFromXdc += front.ExternalSize;
++numDropped;
- Y_VERIFY_DEBUG(SendQueuePos != 0);
}
if (!numDropped) {
@@ -1238,7 +1163,6 @@ namespace NActors {
MON_VAR(MessagesGot)
MON_VAR(MessagesWrittenToBuffer)
MON_VAR(PacketsGenerated)
- MON_VAR(PacketsWrittenToSocket)
MON_VAR(PacketsConfirmed)
MON_VAR(ConfirmPacketsForcedBySize)
MON_VAR(ConfirmPacketsForcedByTimeout)
@@ -1284,7 +1208,6 @@ namespace NActors {
MON_VAR(SendQueue.size())
MON_VAR(NumEventsInReadyChannels)
MON_VAR(TotalOutputQueueSize)
- MON_VAR(BytesUnwritten)
MON_VAR(InflightDataAmount)
MON_VAR(unsentQueueSize)
MON_VAR(SendBufferSize)
@@ -1292,7 +1215,6 @@ namespace NActors {
MON_VAR(now - LastPayloadActivityTimestamp)
MON_VAR(LastHandshakeDone)
MON_VAR(OutputCounter)
- MON_VAR(LastSentSerial)
MON_VAR(LastConfirmed)
MON_VAR(FlushSchedule.size())
MON_VAR(MaxFlushSchedule)
@@ -1301,15 +1223,18 @@ namespace NActors {
MON_VAR(GetWriteBlockedTotal())
+ MON_VAR(BytesWrittenToSocket)
MON_VAR(XdcBytesSent)
MON_VAR(OutgoingStream.CalculateOutgoingSize())
MON_VAR(OutgoingStream.CalculateUnsentSize())
MON_VAR(OutgoingStream.GetSendQueueSize())
+ MON_VAR(OutgoingOffset)
MON_VAR(XdcStream.CalculateOutgoingSize())
MON_VAR(XdcStream.CalculateUnsentSize())
MON_VAR(XdcStream.GetSendQueueSize())
+ MON_VAR(XdcOffset)
TString clockSkew;
i64 x = GetClockSkew();
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index af6c14d824..f07cbfa4eb 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -133,9 +133,12 @@ namespace NActors {
std::deque<TMutableContiguousSpan> XdcBuffers; // receive queue for current channel
size_t FetchIndex = 0;
size_t FetchOffset = 0;
- size_t XdcBytesToCatch = 0; // number of bytes to catch after reconnect
- void CalculateBytesToCatch();
+ ui64 XdcCatchBytesRead = 0; // number of bytes actually read into cyclic buffer
+ TRcBuf XdcCatchBuffer;
+
+ void PrepareCatchBuffer();
+ void ApplyCatchBuffer();
void FetchBuffers(ui16 channel, size_t numBytes, std::deque<std::tuple<ui16, TMutableContiguousSpan>>& outQ);
void DropFront(TRope *from, size_t numBytes);
};
@@ -174,8 +177,8 @@ namespace NActors {
}
}
- void ResetLastPacketSerialToConfirm() {
- LastPacketSerialToConfirm = LastProcessedSerial;
+ void UnlockLastPacketSerialToConfirm() {
+ LastPacketSerialToConfirm &= ~LastPacketSerialToConfirmLockBit;
}
ui64 GetLastPacketSerialToConfirm() {
@@ -258,6 +261,7 @@ namespace NActors {
PAYLOAD,
};
EState State = EState::HEADER;
+ ui64 CurrentSerial = 0;
std::vector<char> XdcCommands;
@@ -267,17 +271,19 @@ namespace NActors {
};
std::deque<TInboundPacket> InboundPacketQ;
std::deque<std::tuple<ui16, TMutableContiguousSpan>> XdcInputQ; // target buffers for the XDC stream with channel reference
-
- // catch stream -- used after TCP reconnect to match unread XDC stream with main packet stream
- TRope XdcCatchStream; // temporary data buffer to process XDC stream retransmission upon reconnect
- TRcBuf XdcCatchStreamBuffer;
- size_t XdcCatchStreamBufferOffset = 0;
- ui64 XdcCatchStreamBytesPending = 0;
- std::deque<std::tuple<ui16, ui16>> XdcCatchStreamMarkup; // a queue of pairs (channel, bytes)
std::deque<std::tuple<ui16, ui32>> XdcChecksumQ; // (size, expectedChecksum)
ui32 XdcCurrentChecksum = 0;
- bool XdcCatchStreamFinal = false;
- bool XdcCatchStreamFinalPending = false;
+
+ // catch stream -- used after TCP reconnect to match XDC stream with main packet stream
+ struct TXdcCatchStream {
+ TRcBuf Buffer;
+ ui64 BytesPending = 0;
+ ui64 BytesProcessed = 0;
+ std::deque<std::tuple<ui16, bool, size_t>> Markup; // a queue of tuples (channel, apply, bytes)
+ bool Ready = false;
+ bool Applied = false;
+ };
+ TXdcCatchStream XdcCatchStream;
THolder<TEvUpdateFromInputSession> UpdateFromInputSession;
@@ -295,13 +301,14 @@ namespace NActors {
void ReceiveData();
void ProcessHeader();
void ProcessPayload(ui64 *numDataBytes);
- void ProcessInboundPacketQ(size_t numXdcBytesRead);
+ void ProcessInboundPacketQ(ui64 numXdcBytesRead);
void ProcessXdcCommand(ui16 channel, TReceiveContext::TPerChannelContext& context);
void ProcessEvents(TReceiveContext::TPerChannelContext& context);
ssize_t Read(NInterconnect::TStreamSocket& socket, const TPollerToken::TPtr& token, bool *readPending,
const TIoVec *iov, size_t num);
bool ReadMore();
bool ReadXdcCatchStream(ui64 *numDataBytes);
+ void ApplyXdcCatchStream();
bool ReadXdc(ui64 *numDataBytes);
void HandleXdcChecksum(TContiguousSpan span);
@@ -384,7 +391,7 @@ namespace NActors {
ui64 MessagesGot = 0;
ui64 MessagesWrittenToBuffer = 0;
ui64 PacketsGenerated = 0;
- ui64 PacketsWrittenToSocket = 0;
+ ui64 BytesWrittenToSocket = 0;
ui64 PacketsConfirmed = 0;
public:
@@ -459,7 +466,7 @@ namespace NActors {
void Handle(TEvPollerReady::TPtr& ev);
void Handle(TEvPollerRegisterResult::TPtr ev);
void WriteData();
- ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes);
+ ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket);
ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {});
void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial);
@@ -530,9 +537,8 @@ namespace NActors {
bool Data;
};
std::deque<TOutgoingPacket> SendQueue; // packet boundaries
- size_t SendQueuePos = 0; // packet being sent now
- size_t SendOffset = 0;
- bool WriteSinglePacketAndDropConfirmed = false;
+ size_t OutgoingOffset = 0;
+ size_t XdcOffset = 0;
ui64 XdcBytesSent = 0;
@@ -540,8 +546,6 @@ namespace NActors {
TDuration WriteBlockedTotal; // total incremental duration that session has been blocked
bool WriteBlockedByFullSendBuffer = false;
- ui64 BytesUnwritten = 0; // number of bytes in outgoing main queue
-
TDuration GetWriteBlockedTotal() const {
return WriteBlockedTotal + (WriteBlockedByFullSendBuffer
? TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles))
@@ -549,7 +553,6 @@ namespace NActors {
}
ui64 OutputCounter;
- ui64 LastSentSerial = 0;
TInstant LastHandshakeDone;
diff --git a/library/cpp/actors/interconnect/outgoing_stream.h b/library/cpp/actors/interconnect/outgoing_stream.h
index deffd1d93b..7c32911807 100644
--- a/library/cpp/actors/interconnect/outgoing_stream.h
+++ b/library/cpp/actors/interconnect/outgoing_stream.h
@@ -163,7 +163,8 @@ namespace NInterconnect {
front.Span = front.Span.SubSpan(numBytes, Max<size_t>());
if (SendQueuePos == 0) {
Y_VERIFY_DEBUG(numBytes <= SendOffset, "numBytes# %zu SendOffset# %zu SendQueuePos# %zu"
- " SendQueue.size# %zu", numBytes, SendOffset, SendQueuePos, SendQueue.size());
+ " SendQueue.size# %zu CalculateUnsentSize# %zu", numBytes, SendOffset, SendQueuePos,
+ SendQueue.size(), CalculateUnsentSize());
SendOffset -= numBytes;
}
break;