aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-04-19 13:31:50 +0300
committeralexvru <alexvru@ydb.tech>2023-04-19 13:31:50 +0300
commit7653175c3b4bd262510d7dbfdf5dc30ceb777d75 (patch)
treed33ef2f5490f7f8dbab2e0e9ac8c3b13f26e8111 /library/cpp/actors/interconnect
parent9143f30e39d2d0d9d81048632bce5f25de587b3f (diff)
downloadydb-7653175c3b4bd262510d7dbfdf5dc30ceb777d75.tar.gz
Rework interconnect sending stream machinery
Diffstat (limited to 'library/cpp/actors/interconnect')
-rw-r--r--library/cpp/actors/interconnect/event_holder_pool.h1
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp78
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp39
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp208
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h25
-rw-r--r--library/cpp/actors/interconnect/outgoing_stream.h224
-rw-r--r--library/cpp/actors/interconnect/packet.h177
-rw-r--r--library/cpp/actors/interconnect/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--library/cpp/actors/interconnect/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/actors/interconnect/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--library/cpp/actors/interconnect/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp4
-rw-r--r--library/cpp/actors/interconnect/ut/lib/node.h11
-rw-r--r--library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp147
14 files changed, 612 insertions, 306 deletions
diff --git a/library/cpp/actors/interconnect/event_holder_pool.h b/library/cpp/actors/interconnect/event_holder_pool.h
index b6090a3bc8..59df9bd4c8 100644
--- a/library/cpp/actors/interconnect/event_holder_pool.h
+++ b/library/cpp/actors/interconnect/event_holder_pool.h
@@ -8,7 +8,6 @@ namespace NActors {
struct TEvFreeItems : TEventLocal<TEvFreeItems, EventSpaceBegin(TEvents::ES_PRIVATE)> {
static constexpr size_t MaxEvents = 256;
- TList<TTcpPacketOutTask> Items;
std::list<TEventHolder> FreeQueue;
TStackVec<THolder<IEventBase>, MaxEvents> Events;
TStackVec<THolder<TEventSerializedData>, MaxEvents> Buffers;
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index 316d57551d..64a6bbbd09 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -12,8 +12,7 @@ LWTRACE_USING(ACTORLIB_PROVIDER);
namespace NActors {
bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
- const size_t descrSize = sizeof(TEventDescr2);
- const size_t amount = sizeof(TChannelPart) + descrSize;
+ const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr2);
if (task.GetVirtualFreeAmount() < amount) {
return false;
}
@@ -25,28 +24,33 @@ namespace NActors {
task.Orbit.Take(event.Orbit);
Y_VERIFY(SerializationInfo);
- event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) |
+ const ui32 flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) |
(SerializationInfo->IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0);
- TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea());
- part->Channel = ChannelId | TChannelPart::LastPartFlag;
- part->Size = descrSize;
-
- auto *p = reinterpret_cast<TEventDescr2*>(part + 1);
- *p = {
+ // prepare descriptor record
+ TEventDescr2 descr{
event.Descr.Type,
- event.Descr.Flags,
+ flags,
event.Descr.Recipient,
event.Descr.Sender,
event.Descr.Cookie,
{},
event.Descr.Checksum
};
- traceId.Serialize(&p->TraceId);
+ traceId.Serialize(&descr.TraceId);
+
+ // and channel header before the descriptor
+ TChannelPart part{
+ .Channel = static_cast<ui16>(ChannelId | TChannelPart::LastPartFlag),
+ .Size = sizeof(descr),
+ };
+
+ // append them to the packet
+ task.Write(&part, sizeof(part));
+ task.Write(&descr, sizeof(descr));
- task.AppendBuf(part, amount);
*weightConsumed += amount;
- OutputQueueSize -= part->Size;
+ OutputQueueSize -= part.Size;
Metrics->UpdateOutputChannelEvents(ChannelId);
return true;
@@ -75,7 +79,9 @@ namespace NActors {
} else if (event.Event) {
State = EState::CHUNKER;
IEventBase *base = event.Event.Get();
- Chunker.SetSerializingEvent(base);
+ if (event.EventSerializedSize) {
+ Chunker.SetSerializingEvent(base);
+ }
SerializationInfoContainer = base->CreateSerializationInfo();
SerializationInfo = &SerializationInfoContainer;
} else { // event without buffer and IEventBase instance
@@ -83,28 +89,28 @@ namespace NActors {
SerializationInfoContainer = {};
SerializationInfo = &SerializationInfoContainer;
}
+ if (!event.EventSerializedSize) {
+ State = EState::DESCRIPTOR;
+ }
break;
case EState::CHUNKER:
case EState::BUFFER: {
- size_t maxBytes = task.GetVirtualFreeAmount();
- if (maxBytes <= sizeof(TChannelPart)) {
+ if (task.GetVirtualFreeAmount() <= sizeof(TChannelPart)) {
return false;
}
- TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea());
- part->Channel = ChannelId;
- part->Size = 0;
- task.AppendBuf(part, sizeof(TChannelPart));
- maxBytes -= sizeof(TChannelPart);
- Y_VERIFY(maxBytes);
+ TChannelPart part{
+ .Channel = ChannelId,
+ .Size = 0,
+ };
+
+ auto partBookmark = task.Bookmark(sizeof(part));
auto addChunk = [&](const void *data, size_t len) {
event.UpdateChecksum(data, len);
- task.AppendBuf(data, len);
- part->Size += len;
- Y_VERIFY_DEBUG(maxBytes >= len);
- maxBytes -= len;
+ task.Append(data, len);
+ part.Size += len;
event.EventActuallySerialized += len;
if (event.EventActuallySerialized > MaxSerializedEventSize) {
@@ -114,18 +120,18 @@ namespace NActors {
bool complete = false;
if (State == EState::CHUNKER) {
- Y_VERIFY_DEBUG(task.GetFreeArea() == part + 1);
- while (!complete && maxBytes) {
- const auto [first, last] = Chunker.FeedBuf(task.GetFreeArea(), maxBytes);
+ while (!complete && !task.IsFull()) {
+ TMutableContiguousSpan out = task.AcquireSpanForWriting();
+ const auto [first, last] = Chunker.FeedBuf(out.data(), out.size());
for (auto p = first; p != last; ++p) {
addChunk(p->first, p->second);
}
complete = Chunker.IsComplete();
}
Y_VERIFY(!complete || Chunker.IsSuccessfull());
- Y_VERIFY_DEBUG(complete || !maxBytes);
+ Y_VERIFY_DEBUG(complete || task.IsFull());
} else { // BUFFER
- while (const size_t numb = Min(maxBytes, Iter.ContiguousSize())) {
+ while (const size_t numb = Min(task.GetVirtualFreeAmount(), Iter.ContiguousSize())) {
const char *obuf = Iter.ContiguousData();
addChunk(obuf, numb);
Iter += numb;
@@ -138,12 +144,10 @@ namespace NActors {
event.EventActuallySerialized, event.EventSerializedSize, event.Descr.Type);
}
- if (!part->Size) {
- task.Undo(sizeof(TChannelPart));
- } else {
- *weightConsumed += sizeof(TChannelPart) + part->Size;
- OutputQueueSize -= part->Size;
- }
+ Y_VERIFY_DEBUG(part.Size);
+ task.WriteBookmark(std::exchange(partBookmark, {}), &part, sizeof(part));
+ *weightConsumed += sizeof(TChannelPart) + part.Size;
+ OutputQueueSize -= part.Size;
if (complete) {
State = EState::DESCRIPTOR;
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index ad2da30ec3..34770133ae 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -93,7 +93,7 @@ namespace NActors {
switch (State) {
case EState::HEADER:
- if (IncomingData.GetSize() < sizeof(Header)) {
+ if (IncomingData.GetSize() < sizeof(TTcpPacketHeader_v2)) {
break;
} else {
ProcessHeader();
@@ -172,14 +172,15 @@ namespace NActors {
}
void TInputSessionTCP::ProcessHeader() {
- const bool success = IncomingData.ExtractFrontPlain(&Header, sizeof(Header));
+ TTcpPacketHeader_v2 header;
+ const bool success = IncomingData.ExtractFrontPlain(&header, sizeof(header));
Y_VERIFY(success);
- PayloadSize = Header.PayloadLength;
- HeaderSerial = Header.Serial;
- HeaderConfirm = Header.Confirm;
+ PayloadSize = header.PayloadLength;
+ const ui64 serial = header.Serial;
+ const ui64 confirm = header.Confirm;
if (!Params.Encryption) {
- ChecksumExpected = std::exchange(Header.Checksum, 0);
- Checksum = Crc32cExtendMSanCompatible(0, &Header, sizeof(Header)); // start calculating checksum now
+ ChecksumExpected = std::exchange(header.Checksum, 0);
+ Checksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header)); // start calculating checksum now
if (!PayloadSize && Checksum != ChecksumExpected) {
LOG_ERROR_IC_SESSION("ICIS10", "payload checksum error");
return ReestablishConnection(TDisconnectReason::ChecksumError());
@@ -189,9 +190,9 @@ namespace NActors {
LOG_CRIT_IC_SESSION("ICIS07", "payload is way too big");
return DestroySession(TDisconnectReason::FormatError());
}
- if (ConfirmedByInput < HeaderConfirm) {
- ConfirmedByInput = HeaderConfirm;
- if (AtomicGet(Context->ControlPacketId) <= HeaderConfirm && !NewPingProtocol) {
+ if (ConfirmedByInput < confirm) {
+ ConfirmedByInput = confirm;
+ if (AtomicGet(Context->ControlPacketId) <= confirm && !NewPingProtocol) {
ui64 sendTime = AtomicGet(Context->ControlPacketSendTimer);
TDuration duration = CyclesToDuration(GetCycleCountFast() - sendTime);
const auto durationUs = duration.MicroSeconds();
@@ -205,20 +206,20 @@ namespace NActors {
}
if (PayloadSize) {
const ui64 expected = Context->GetLastProcessedPacketSerial() + 1;
- if (HeaderSerial == 0 || HeaderSerial > expected) {
- LOG_CRIT_IC_SESSION("ICIS06", "packet serial %" PRIu64 ", but %" PRIu64 " expected", HeaderSerial, expected);
+ if (serial == 0 || serial > expected) {
+ LOG_CRIT_IC_SESSION("ICIS06", "packet serial %" PRIu64 ", but %" PRIu64 " expected", serial, expected);
return DestroySession(TDisconnectReason::FormatError());
}
- IgnorePayload = HeaderSerial != expected;
+ IgnorePayload = serial != expected;
State = EState::PAYLOAD;
- } else if (HeaderSerial & TTcpPacketBuf::PingRequestMask) {
- Send(SessionId, new TEvProcessPingRequest(HeaderSerial & ~TTcpPacketBuf::PingRequestMask));
- } else if (HeaderSerial & TTcpPacketBuf::PingResponseMask) {
- const ui64 sent = HeaderSerial & ~TTcpPacketBuf::PingResponseMask;
+ } else if (serial & TTcpPacketBuf::PingRequestMask) {
+ Send(SessionId, new TEvProcessPingRequest(serial & ~TTcpPacketBuf::PingRequestMask));
+ } else if (serial & TTcpPacketBuf::PingResponseMask) {
+ const ui64 sent = serial & ~TTcpPacketBuf::PingResponseMask;
const ui64 received = GetCycleCountFast();
HandlePingResponse(CyclesToDuration(received - sent));
- } else if (HeaderSerial & TTcpPacketBuf::ClockMask) {
- HandleClock(TInstant::MicroSeconds(HeaderSerial & ~TTcpPacketBuf::ClockMask));
+ } else if (serial & TTcpPacketBuf::ClockMask) {
+ HandleClock(TInstant::MicroSeconds(serial & ~TTcpPacketBuf::ClockMask));
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 72982c1458..16cadc9e92 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -271,37 +271,27 @@ namespace NActors {
// also reset SendQueuePos
// drop confirmed packets first as we do not need unwanted retransmissions
- SendQueuePos = SendQueue.end();
- DropConfirmed(nextPacket);
+ DropConfirmed(nextPacket, true);
+ OutgoingStream.Rewind();
+ SendQueuePos = 0;
+ SendOffset = 0;
- for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) {
- const TSendQueue::iterator next = std::next(it);
- if (it->IsEmpty()) {
- SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it);
- } else {
- it->ResetBufs();
- }
- it = next;
- }
- TrimSendQueueCache();
- SendQueuePos = SendQueue.begin();
-
- TMaybe<ui64> s;
- for (auto it = SendQueuePos; it != SendQueue.end(); ++it) {
- if (!it->IsEmpty()) {
- s = it->GetSerial();
+ ui64 serial = Max<ui64>();
+ BytesUnwritten = 0;
+ for (const auto& packet : SendQueue) {
+ if (packet.Data && packet.Serial < serial) {
+ serial = packet.Serial;
}
+ BytesUnwritten += packet.PacketSize;
}
- const ui64 serial = s.GetOrElse(Max<ui64>());
Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed);
LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n",
SendQueue.size(), LastConfirmed, serial);
- BytesUnwritten = 0;
- for (const auto& packet : SendQueue) {
- BytesUnwritten += sizeof(TTcpPacketHeader_v2) + packet.GetDataSize();
- }
+ Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize(), "%s", TString(TStringBuilder()
+ << "BytesUnwritten# " << BytesUnwritten
+ << " CalculateUnsentSize# " << OutgoingStream.CalculateUnsentSize()).data());
SwitchStuckPeriod();
@@ -581,9 +571,21 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu",
ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size());
- while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
- for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) {
- it->AppendToIoVector(wbuffers, maxElementsInIOV);
+ auto calculateUnsentQueueSize = [&] {
+ size_t res = -SendOffset;
+ for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end(); ++it) {
+ res += it->PacketSize;
+ }
+ return res;
+ };
+
+ while (!ReceiveContext->WriteBlockedByFullSendBuffer) {
+ Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
+ Y_VERIFY_DEBUG(BytesUnwritten == calculateUnsentQueueSize());
+
+ OutgoingStream.ProduceIoVec(wbuffers, maxElementsInIOV);
+ if (!wbuffers) { // done sending
+ break;
}
const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data());
@@ -610,21 +612,31 @@ namespace NActors {
wbuffers.clear();
if (r > 0) {
+ written += r;
+
Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten);
BytesUnwritten -= r;
- written += r;
- ui64 packets = 0;
- // advance SendQueuePos to eat all processed items
- for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) {
- if (!SendQueuePos->IsEmpty()) {
- LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial());
+ OutgoingStream.Advance(r);
+
+ ui64 packets = 0;
+ Y_VERIFY_DEBUG(SendQueuePos != SendQueue.size());
+ SendOffset += r;
+ for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->PacketSize <= SendOffset; ++SendQueuePos, ++it) {
+ SendOffset -= it->PacketSize;
+ Y_VERIFY_DEBUG(!it->Data || it->Serial <= OutputCounter);
+ if (it->Data && LastSentSerial < it->Serial) {
+ LastSentSerial = it->Serial;
}
++PacketsWrittenToSocket;
++packets;
- LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
+ Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1);
+// LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, false, SendQueuePos->PacketSize, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
}
+ Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
+ Y_VERIFY_DEBUG(BytesUnwritten == calculateUnsentQueueSize());
+
LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
} else if (-r != EAGAIN && -r != EWOULDBLOCK) {
const TString message = r == 0 ? "connection closed by peer"
@@ -636,12 +648,6 @@ namespace NActors {
}
return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r));
} else {
- // we have to do some hack for secure socket -- mark the packet as 'tried writing'
- if (Params.Encryption) {
- Y_VERIFY(SendQueuePos != SendQueue.end());
- SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL
- }
-
// we have received EAGAIN error code, this means that we can't issue more data until we have received
// TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event
Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer);
@@ -711,45 +717,10 @@ namespace NActors {
}
}
- void TInterconnectSessionTCP::TrimSendQueueCache() {
- static constexpr size_t maxItems = 32;
- static constexpr size_t trimThreshold = maxItems * 2;
- if (SendQueueCache.size() >= trimThreshold) {
- auto it = SendQueueCache.end();
- for (size_t n = SendQueueCache.size() - maxItems; n; --n) {
- --it;
- }
-
- auto ev = std::make_unique<TEvFreeItems>();
- ev->Items.splice(ev->Items.end(), SendQueueCache, it, SendQueueCache.end());
- ev->NumBytes = ev->Items.size() * sizeof(TTcpPacketOutTask);
- if (ev->GetInLineForDestruction(Proxy->Common)) {
- Send(Proxy->Common->DestructorId, ev.release());
- }
- }
- }
-
ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
Y_VERIFY(Socket);
- TSendQueue::iterator packet;
- if (SendQueueCache) {
- // we have entries in cache, take one and move it to the end of SendQueue
- packet = SendQueueCache.begin();
- SendQueue.splice(SendQueue.end(), SendQueueCache, packet);
- packet->Reuse(); // reset packet to initial state
- } else {
- // we have to allocate new packet, so just do it
- LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) {
- packet = SendQueue.emplace(SendQueue.end(), Params);
- }
- }
-
- // update send queue position
- if (SendQueuePos == SendQueue.end()) {
- SendQueuePos = packet; // start sending this packet if we are not sending anything for now
- }
-
+ TTcpPacketOutTask packet(Params, OutgoingStream);
ui64 serial = 0;
if (data) {
@@ -759,12 +730,12 @@ namespace NActors {
// fill the data packet
Y_VERIFY(NumEventsInReadyChannels);
LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) {
- FillSendingBuffer(*packet, serial);
+ FillSendingBuffer(packet, serial);
}
- Y_VERIFY(!packet->IsEmpty());
+ Y_VERIFY(!packet.IsEmpty());
- InflightDataAmount += packet->GetDataSize();
- Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize());
+ InflightDataAmount += packet.GetDataSize();
+ Proxy->Metrics->AddInflightDataAmount(packet.GetDataSize());
if (InflightDataAmount > GetTotalInflightAmountOfData()) {
Proxy->Metrics->IncInflyLimitReach();
}
@@ -778,39 +749,31 @@ namespace NActors {
LastPayloadActivityTimestamp = TActivationContext::Monotonic();
} else if (pingMask) {
serial = *pingMask;
-
- // make this packet a priority one
- if (SendQueuePos != packet) {
- Y_VERIFY(SendQueuePos != SendQueue.end());
- if (SendQueuePos->IsAtBegin()) {
- // insert this packet just before the next being sent and step back
- SendQueue.splice(SendQueuePos, SendQueue, packet);
- --SendQueuePos;
- Y_VERIFY(SendQueuePos == packet);
- } else {
- // current packet is already being sent, so move new packet just after it
- SendQueue.splice(std::next(SendQueuePos), SendQueue, packet);
- }
- }
}
const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial();
- packet->SetMetadata(serial, lastInputSerial);
- packet->Sign();
+
+ packet.Finish(serial, lastInputSerial);
// count number of bytes pending for write
- ui64 packetSize = sizeof(TTcpPacketHeader_v2) + packet->GetDataSize();
+ const size_t packetSize = packet.GetFullSize();
BytesUnwritten += packetSize;
+ Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize(), "%s", TString(TStringBuilder()
+ << "BytesUnwritten# " << BytesUnwritten << " packetSize# " << packetSize
+ << " CalculateUnsentSize# " << OutgoingStream.CalculateUnsentSize()).data());
+
+ // put outgoing packet metadata here
+ SendQueue.push_back(TOutgoingPacket{packetSize, serial, data});
LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu"
- " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(),
+ " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(),
InflightDataAmount, BytesUnwritten);
// reset forced packet sending timestamp as we have confirmed all received data
ResetFlushLogic();
++PacketsGenerated;
- LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize);
+ LWTRACK(PacketGenerated, packet.Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize);
if (!data) {
WriteData();
@@ -819,7 +782,7 @@ namespace NActors {
return packetSize;
}
- bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
+ bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm, bool ignoreSendQueuePos) {
LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm);
Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter,
@@ -828,24 +791,48 @@ namespace NActors {
LastConfirmed = confirm;
ui64 droppedDataAmount = 0;
+ std::optional<ui64> lastDroppedSerial = 0;
ui32 numDropped = 0;
+#ifndef NDEBUG
+ {
+ size_t totalBytesInSendQueue = 0;
+ size_t unsentBytesInSendQueue = 0;
+ for (size_t i = 0; i < SendQueue.size(); ++i) {
+ totalBytesInSendQueue += SendQueue[i].PacketSize;
+ if (i > SendQueuePos) {
+ unsentBytesInSendQueue += SendQueue[i].PacketSize;
+ } else if (i == SendQueuePos) {
+ unsentBytesInSendQueue += SendQueue[i].PacketSize - SendOffset;
+ }
+ }
+ Y_VERIFY(totalBytesInSendQueue == OutgoingStream.CalculateOutgoingSize());
+ Y_VERIFY(ignoreSendQueuePos || unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize());
+ }
+#endif
+
// drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively
// making Serial <= confirm true
- TSendQueue::iterator it;
- ui64 lastDroppedSerial = 0;
- for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) {
- if (!it->IsEmpty()) {
- lastDroppedSerial = it->GetSerial();
+ size_t bytesDropped = 0;
+ for (; !SendQueue.empty(); SendQueue.pop_front(), --SendQueuePos) {
+ auto& front = SendQueue.front();
+ if (front.Data && confirm < front.Serial) {
+ break;
+ }
+ if (front.Data) {
+ lastDroppedSerial.emplace(front.Serial);
}
- droppedDataAmount += it->GetDataSize();
+ droppedDataAmount += front.PacketSize - sizeof(TTcpPacketHeader_v2);
+ bytesDropped += front.PacketSize;
++numDropped;
+ Y_VERIFY_DEBUG(ignoreSendQueuePos || SendQueuePos != 0);
+ }
+ OutgoingStream.DropFront(bytesDropped);
+ if (lastDroppedSerial) {
+ ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
+ channel.DropConfirmed(*lastDroppedSerial);
+ });
}
- SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it);
- TrimSendQueueCache();
- ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
- channel.DropConfirmed(lastDroppedSerial);
- });
const ui64 current = InflightDataAmount;
const ui64 limit = GetTotalInflightAmountOfData();
@@ -1202,7 +1189,6 @@ namespace NActors {
MON_VAR(OutputStuckFlag)
MON_VAR(SendQueue.size())
- MON_VAR(SendQueueCache.size())
MON_VAR(NumEventsInReadyChannels)
MON_VAR(TotalOutputQueueSize)
MON_VAR(BytesUnwritten)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 7d0e003a34..48fa39b273 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -24,6 +24,7 @@
#include "watchdog_timer.h"
#include "event_holder_pool.h"
#include "channel_scheduler.h"
+#include "outgoing_stream.h"
#include <unordered_set>
#include <unordered_map>
@@ -217,10 +218,6 @@ namespace NActors {
const ui32 NodeId;
const TSessionParams Params;
- // header we are currently processing (parsed from the stream)
- TTcpPacketHeader_v2 Header;
- ui64 HeaderConfirm, HeaderSerial;
-
size_t PayloadSize;
ui32 ChecksumExpected, Checksum;
bool IgnorePayload;
@@ -233,6 +230,7 @@ namespace NActors {
THolder<TEvUpdateFromInputSession> UpdateFromInputSession;
+ std::optional<ui64> LastReceivedSerial;
ui64 ConfirmedByInput;
std::shared_ptr<IInterconnectMetrics> Metrics;
@@ -390,7 +388,7 @@ namespace NActors {
ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {});
void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial);
- bool DropConfirmed(ui64 confirm);
+ bool DropConfirmed(ui64 confirm, bool ignoreSendQueuePos = false);
void ShutdownSocket(TDisconnectReason reason);
void StartHandshake();
@@ -447,16 +445,21 @@ namespace NActors {
void SetOutputStuckFlag(bool state);
void SwitchStuckPeriod();
- using TSendQueue = TList<TTcpPacketOutTask>;
- TSendQueue SendQueue;
- TSendQueue SendQueueCache;
- TSendQueue::iterator SendQueuePos;
+ NInterconnect::TOutgoingStream OutgoingStream;
+
+ struct TOutgoingPacket {
+ size_t PacketSize;
+ ui64 Serial;
+ bool Data;
+ };
+ std::deque<TOutgoingPacket> SendQueue; // packet boundaries
+ size_t SendQueuePos = 0; // packet being sent now
+ size_t SendOffset = 0;
+
ui64 WriteBlockedCycles = 0; // start of current block period
TDuration WriteBlockedTotal; // total incremental duration that session has been blocked
ui64 BytesUnwritten = 0;
- void TrimSendQueueCache();
-
TDuration GetWriteBlockedTotal() const {
if (ReceiveContext->WriteBlockedByFullSendBuffer) {
double blockedUs = NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles) * 1000000.0;
diff --git a/library/cpp/actors/interconnect/outgoing_stream.h b/library/cpp/actors/interconnect/outgoing_stream.h
new file mode 100644
index 0000000000..0306295b52
--- /dev/null
+++ b/library/cpp/actors/interconnect/outgoing_stream.h
@@ -0,0 +1,224 @@
+#pragma once
+
+#include <library/cpp/actors/core/event_load.h>
+#include <library/cpp/actors/util/rc_buf.h>
+#include <library/cpp/containers/stack_vector/stack_vec.h>
+#include <deque>
+
+namespace NInterconnect {
+
+ template<size_t TotalSize>
+ class TOutgoingStreamT {
+ static constexpr size_t BufferSize = TotalSize - sizeof(ui32) * 2;
+
+ struct TBuffer {
+ char Data[BufferSize];
+ ui32 RefCount;
+ ui32 Index;
+
+ struct TDeleter {
+ void operator ()(TBuffer *buffer) const {
+ free(buffer);
+ }
+ };
+ };
+
+ static_assert(sizeof(TBuffer) == TotalSize);
+
+ struct TSendChunk {
+ TContiguousSpan Span;
+ TBuffer *Buffer;
+ };
+
+ std::vector<std::unique_ptr<TBuffer, typename TBuffer::TDeleter>> Buffers;
+ TBuffer *AppendBuffer = nullptr;
+ size_t AppendOffset = BufferSize; // into the last buffer
+ std::deque<TSendChunk> SendQueue;
+ size_t SendQueuePos = 0;
+ size_t SendOffset = 0;
+
+ public:
+ size_t CalculateOutgoingSize() const {
+ size_t res = 0;
+ for (const TSendChunk& chunk : SendQueue) {
+ res += chunk.Span.size();
+ }
+ return res;
+ }
+
+ size_t CalculateUnsentSize() const {
+ size_t res = 0;
+ for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end(); ++it) {
+ res += it->Span.size();
+ }
+ return res - SendOffset;
+ }
+
+ TMutableContiguousSpan AcquireSpanForWriting(size_t maxLen) {
+ if (AppendOffset == BufferSize) { // we have no free buffer, allocate one
+ Buffers.emplace_back(static_cast<TBuffer*>(malloc(sizeof(TBuffer))));
+ AppendBuffer = Buffers.back().get();
+ Y_VERIFY(AppendBuffer);
+ AppendBuffer->RefCount = 1; // through AppendBuffer pointer
+ AppendBuffer->Index = Buffers.size() - 1;
+ AppendOffset = 0;
+ }
+ return {AppendBuffer->Data + AppendOffset, Min(maxLen, BufferSize - AppendOffset)};
+ }
+
+ void Append(TContiguousSpan span) {
+ TBuffer *buffer = nullptr;
+ if (AppendBuffer && span.data() == AppendBuffer->Data + AppendOffset) { // the only valid case to use previously acquired span
+ buffer = AppendBuffer;
+ AppendOffset += span.size();
+ Y_VERIFY_DEBUG(AppendOffset <= BufferSize);
+ if (AppendOffset != BufferSize) {
+ ++buffer->RefCount;
+ }
+ } else {
+#ifndef NDEBUG
+ // ensure this span does not point into any existing buffer part
+ const char *begin = span.data();
+ const char *end = span.data() + span.size();
+ for (const auto& buffer : Buffers) {
+ const char *bufferBegin = buffer->Data;
+ const char *bufferEnd = bufferBegin + BufferSize;
+ if (bufferBegin < end && begin < bufferEnd) {
+ Y_FAIL();
+ }
+ }
+#endif
+ }
+
+ if (!SendQueue.empty()) {
+ auto& back = SendQueue.back();
+ if (back.Span.data() + back.Span.size() == span.data()) { // check if it is possible just to extend the last span
+ if (SendQueuePos == SendQueue.size()) {
+ --SendQueuePos;
+ SendOffset = back.Span.size();
+ }
+ back.Span = {back.Span.data(), back.Span.size() + span.size()};
+ DropBufferReference(buffer);
+ return;
+ }
+ }
+
+ if (buffer) {
+ ++buffer->RefCount;
+ }
+ SendQueue.push_back(TSendChunk{span, buffer});
+ DropBufferReference(buffer);
+ }
+
+ void Write(TContiguousSpan in) {
+ while (in.size()) {
+ auto outChunk = AcquireSpanForWriting(in.size());
+ Append(outChunk);
+ memcpy(outChunk.data(), in.data(), outChunk.size());
+ in = in.SubSpan(outChunk.size(), Max<size_t>());
+ }
+ }
+
+ using TBookmark = TStackVec<TMutableContiguousSpan, 2>;
+
+ TBookmark Bookmark(size_t len) {
+ TBookmark bookmark;
+
+ while (len) {
+ const auto span = AcquireSpanForWriting(len);
+ Append(span);
+ bookmark.push_back(span);
+ len -= span.size();
+ }
+
+ return bookmark;
+ }
+
+ void WriteBookmark(TBookmark&& bookmark, TContiguousSpan in) {
+ for (auto& outChunk : bookmark) {
+ Y_VERIFY(outChunk.size() <= in.size());
+ memcpy(outChunk.data(), in.data(), outChunk.size());
+ in = in.SubSpan(outChunk.size(), Max<size_t>());
+ }
+ }
+
+ void Rewind() {
+ SendQueuePos = 0;
+ SendOffset = 0;
+ }
+
+ template<typename T>
+ void ProduceIoVec(T& container, size_t maxItems) {
+ size_t offset = SendOffset;
+ for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end() && std::size(container) < maxItems; ++it) {
+ const TContiguousSpan span = it->Span.SubSpan(offset, Max<size_t>());
+ container.push_back(NActors::TConstIoVec{span.data(), span.size()});
+ offset = 0;
+ }
+ }
+
+ void Advance(size_t numBytes) { // called when numBytes portion of data has been sent
+ Y_VERIFY_DEBUG(numBytes == 0 || SendQueuePos != SendQueue.size());
+ SendOffset += numBytes;
+ for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->Span.size() <= SendOffset; ++SendQueuePos, ++it) {
+ SendOffset -= it->Span.size();
+ Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1);
+ }
+ }
+
+ void DropFront(size_t numBytes) { // drops first numBytes from the queue, freeing buffers when necessary
+ while (numBytes) {
+ Y_VERIFY_DEBUG(!SendQueue.empty());
+ auto& front = SendQueue.front();
+ if (numBytes < front.Span.size()) {
+ front.Span = front.Span.SubSpan(numBytes, Max<size_t>());
+ if (SendQueuePos == 0) {
+ Y_VERIFY_DEBUG(numBytes <= SendOffset);
+ SendOffset -= numBytes;
+ }
+ break;
+ } else {
+ numBytes -= front.Span.size();
+ }
+ Y_VERIFY_DEBUG(!front.Buffer || (front.Span.data() >= front.Buffer->Data &&
+ front.Span.data() + front.Span.size() <= front.Buffer->Data + BufferSize));
+ DropBufferReference(front.Buffer);
+ SendQueue.pop_front();
+ if (SendQueuePos) {
+ --SendQueuePos;
+ } else {
+ SendOffset = 0;
+ }
+ }
+ }
+
+ template<typename T>
+ void ScanLastBytes(size_t numBytes, T&& callback) const {
+ auto it = SendQueue.end();
+ ssize_t offset = -numBytes;
+ while (offset < 0) {
+ Y_VERIFY_DEBUG(it != SendQueue.begin());
+ const TSendChunk& chunk = *--it;
+ offset += chunk.Span.size();
+ }
+ for (; it != SendQueue.end(); ++it, offset = 0) {
+ callback(it->Span.SubSpan(offset, Max<size_t>()));
+ }
+ }
+
+ private:
+ void DropBufferReference(TBuffer *buffer) {
+ if (buffer && !--buffer->RefCount) {
+ const size_t index = buffer->Index;
+ auto& cell = Buffers[index];
+ Y_VERIFY_DEBUG(cell.get() == buffer);
+ std::swap(cell, Buffers.back());
+ cell->Index = index;
+ Buffers.pop_back();
+ }
+ }
+ };
+
+ using TOutgoingStream = TOutgoingStreamT<262144>;
+
+} // NInterconnect
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 501fb08ab1..c1d63fa3b8 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -14,6 +14,7 @@
#include <util/generic/list.h>
#include "types.h"
+#include "outgoing_stream.h"
#ifndef FORCE_EVENT_CHECKSUM
#define FORCE_EVENT_CHECKSUM 0
@@ -45,9 +46,6 @@ struct TTcpPacketBuf {
static constexpr ui64 ClockMask = 0x2000000000000000ULL;
static constexpr size_t PacketDataLen = 4096 * 2 - 96 - sizeof(TTcpPacketHeader_v2);
-
- TTcpPacketHeader_v2 Header;
- char Data[PacketDataLen];
};
struct TEventData {
@@ -121,144 +119,83 @@ namespace NActors {
struct TTcpPacketOutTask : TNonCopyable {
const TSessionParams& Params;
- TTcpPacketBuf Packet;
- size_t DataSize;
- TStackVec<TConstIoVec, 32> Bufs;
- size_t BufferIndex;
- size_t FirstBufferOffset;
- bool TriedWriting;
- char *FreeArea;
- char *End;
+ NInterconnect::TOutgoingStream& OutgoingStream;
+ NInterconnect::TOutgoingStream::TBookmark HeaderBookmark;
+ size_t DataSize = 0;
mutable NLWTrace::TOrbit Orbit;
-public:
- TTcpPacketOutTask(const TSessionParams& params)
+ TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream)
: Params(params)
- {
- Reuse();
- }
-
- bool IsAtBegin() const {
- return !BufferIndex && !FirstBufferOffset && !TriedWriting;
- }
-
- void MarkTriedWriting() {
- TriedWriting = true;
- }
+ , OutgoingStream(outgoingStream)
+ , HeaderBookmark(OutgoingStream.Bookmark(sizeof(TTcpPacketHeader_v2)))
+ {}
- void Reuse() {
- DataSize = 0;
- Bufs.assign(1, {&Packet.Header, sizeof(Packet.Header)});
- BufferIndex = 0;
- FirstBufferOffset = 0;
- TriedWriting = false;
- FreeArea = Packet.Data;
- End = FreeArea + TTcpPacketBuf::PacketDataLen;
- Orbit.Reset();
- }
-
- bool IsEmpty() const {
- return !DataSize;
+ // Preallocate some space to fill it later.
+ NInterconnect::TOutgoingStream::TBookmark Bookmark(size_t len) {
+ Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount());
+ DataSize += len;
+ return OutgoingStream.Bookmark(len);
}
- void SetMetadata(ui64 serial, ui64 confirm) {
- Packet.Header.Serial = serial;
- Packet.Header.Confirm = confirm;
- }
-
- size_t GetDataSize() const { return DataSize; }
-
- ui64 GetSerial() const {
- return Packet.Header.Serial;
+ // Write previously bookmarked space.
+ void WriteBookmark(NInterconnect::TOutgoingStream::TBookmark&& bookmark, const void *buffer, size_t len) {
+ OutgoingStream.WriteBookmark(std::move(bookmark), {static_cast<const char*>(buffer), len});
}
- bool Confirmed(ui64 confirm) const {
- return IsEmpty() || Packet.Header.Serial <= confirm;
+ // Acquire raw pointer to write some data.
+ TMutableContiguousSpan AcquireSpanForWriting() {
+ return OutgoingStream.AcquireSpanForWriting(GetVirtualFreeAmount());
}
- void *GetFreeArea() {
- return FreeArea;
+ // Append reference to some data (acquired previously or external pointer).
+ void Append(const void *buffer, size_t len) {
+ Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount());
+ DataSize += len;
+ OutgoingStream.Append({static_cast<const char*>(buffer), len});
}
- size_t GetVirtualFreeAmount() const {
- return TTcpPacketBuf::PacketDataLen - DataSize;
+ // Write some data with copying.
+ void Write(const void *buffer, size_t len) {
+ Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount());
+ DataSize += len;
+ OutgoingStream.Write({static_cast<const char*>(buffer), len});
}
- void AppendBuf(const void *buf, size_t size) {
- DataSize += size;
- Y_VERIFY_DEBUG(DataSize <= TTcpPacketBuf::PacketDataLen, "DataSize# %zu AppendBuf buf# %p size# %zu"
- " FreeArea# %p End# %p", DataSize, buf, size, FreeArea, End);
-
- if (Bufs && static_cast<const char*>(Bufs.back().Data) + Bufs.back().Size == buf) {
- Bufs.back().Size += size;
- } else {
- Bufs.push_back({buf, size});
- }
+ void Finish(ui64 serial, ui64 confirm) {
+ Y_VERIFY(DataSize <= Max<ui16>());
- if (buf >= FreeArea && buf < End) {
- Y_VERIFY_DEBUG(buf == FreeArea);
- FreeArea = const_cast<char*>(static_cast<const char*>(buf)) + size;
- Y_VERIFY_DEBUG(FreeArea <= End);
- }
- }
+ TTcpPacketHeader_v2 header{
+ confirm,
+ serial,
+ 0,
+ static_cast<ui16>(DataSize)
+ };
- void Undo(size_t size) {
- Y_VERIFY(Bufs);
- auto& buf = Bufs.back();
- Y_VERIFY(buf.Data == FreeArea - buf.Size);
- buf.Size -= size;
- if (!buf.Size) {
- Bufs.pop_back();
- }
- FreeArea -= size;
- DataSize -= size;
- }
+ if (Checksumming()) {
+ // pre-write header without checksum for correct checksum calculation
+ WriteBookmark(NInterconnect::TOutgoingStream::TBookmark(HeaderBookmark), &header, sizeof(header));
- bool DropBufs(size_t& amount) {
- while (BufferIndex != Bufs.size()) {
- TConstIoVec& item = Bufs[BufferIndex];
- // calculate number of bytes to the end in current buffer
- const size_t remain = item.Size - FirstBufferOffset;
- if (amount >= remain) {
- // vector item completely fits into the received amount, drop it out and switch to next buffer
- amount -= remain;
- ++BufferIndex;
- FirstBufferOffset = 0;
- } else {
- // adjust first buffer by "amount" bytes forward and reset amount to zero
- FirstBufferOffset += amount;
- amount = 0;
- // return false meaning that we have some more data to send
- return false;
- }
+ size_t total = 0;
+ ui32 checksum = 0;
+ OutgoingStream.ScanLastBytes(GetFullSize(), [&](TContiguousSpan span) {
+ checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size());
+ total += span.size();
+ });
+ header.Checksum = checksum;
+ Y_VERIFY(total == sizeof(header) + DataSize, "total# %zu DataSize# %zu GetFullSize# %zu", total, DataSize,
+ GetFullSize());
}
- return true;
- }
- void ResetBufs() {
- BufferIndex = FirstBufferOffset = 0;
- TriedWriting = false;
+ WriteBookmark(std::exchange(HeaderBookmark, {}), &header, sizeof(header));
}
- template <typename TVectorType>
- void AppendToIoVector(TVectorType& vector, size_t max) {
- for (size_t k = BufferIndex, offset = FirstBufferOffset; k != Bufs.size() && vector.size() < max; ++k, offset = 0) {
- TConstIoVec v = Bufs[k];
- v.Data = static_cast<const char*>(v.Data) + offset;
- v.Size -= offset;
- vector.push_back(v);
- }
+ bool Checksumming() const {
+ return !Params.Encryption;
}
- void Sign() {
- Packet.Header.Checksum = 0;
- Packet.Header.PayloadLength = DataSize;
- if (!Params.Encryption) {
- ui32 sum = 0;
- for (const auto& item : Bufs) {
- sum = Crc32cExtendMSanCompatible(sum, item.Data, item.Size);
- }
- Packet.Header.Checksum = sum;
- }
- }
+ bool IsFull() const { return GetVirtualFreeAmount() == 0; }
+ bool IsEmpty() const { return GetDataSize() == 0; }
+ size_t GetDataSize() const { return DataSize; }
+ size_t GetFullSize() const { return sizeof(TTcpPacketHeader_v2) + GetDataSize(); }
+ size_t GetVirtualFreeAmount() const { return TTcpPacketBuf::PacketDataLen - DataSize; }
};
diff --git a/library/cpp/actors/interconnect/ut/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/interconnect/ut/CMakeLists.darwin-x86_64.txt
index e1e00ed6ad..1b744e906c 100644
--- a/library/cpp/actors/interconnect/ut/CMakeLists.darwin-x86_64.txt
+++ b/library/cpp/actors/interconnect/ut/CMakeLists.darwin-x86_64.txt
@@ -35,6 +35,7 @@ target_sources(library-cpp-actors-interconnect-ut PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/large.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
)
diff --git a/library/cpp/actors/interconnect/ut/CMakeLists.linux-aarch64.txt b/library/cpp/actors/interconnect/ut/CMakeLists.linux-aarch64.txt
index a869318b87..b77f4f4e6c 100644
--- a/library/cpp/actors/interconnect/ut/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/actors/interconnect/ut/CMakeLists.linux-aarch64.txt
@@ -38,6 +38,7 @@ target_sources(library-cpp-actors-interconnect-ut PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/large.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
)
diff --git a/library/cpp/actors/interconnect/ut/CMakeLists.linux-x86_64.txt b/library/cpp/actors/interconnect/ut/CMakeLists.linux-x86_64.txt
index 479e70158a..96489d5e6b 100644
--- a/library/cpp/actors/interconnect/ut/CMakeLists.linux-x86_64.txt
+++ b/library/cpp/actors/interconnect/ut/CMakeLists.linux-x86_64.txt
@@ -39,6 +39,7 @@ target_sources(library-cpp-actors-interconnect-ut PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/large.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
)
diff --git a/library/cpp/actors/interconnect/ut/CMakeLists.windows-x86_64.txt b/library/cpp/actors/interconnect/ut/CMakeLists.windows-x86_64.txt
index 61244b1f9d..ddae9008cf 100644
--- a/library/cpp/actors/interconnect/ut/CMakeLists.windows-x86_64.txt
+++ b/library/cpp/actors/interconnect/ut/CMakeLists.windows-x86_64.txt
@@ -28,6 +28,7 @@ target_sources(library-cpp-actors-interconnect-ut PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/large.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp
)
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 32c8237b59..2de8dce457 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -43,8 +43,10 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
std::deque<std::map<ui16, ui32>> window;
+ NInterconnect::TOutgoingStream stream;
+
for (; numEvents; ++step) {
- TTcpPacketOutTask task(p);
+ TTcpPacketOutTask task(p, stream);
if (step == 100) {
for (ui32 i = 0; i < 200; ++i) {
diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h
index 0b538cdb1c..e15baf639e 100644
--- a/library/cpp/actors/interconnect/ut/lib/node.h
+++ b/library/cpp/actors/interconnect/ut/lib/node.h
@@ -2,6 +2,7 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/executor_pool_io.h>
#include <library/cpp/actors/core/scheduler_basic.h>
#include <library/cpp/actors/core/mailbox.h>
#include <library/cpp/actors/dnsresolver/dnsresolver.h>
@@ -24,11 +25,10 @@ public:
TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) {
TActorSystemSetup setup;
setup.NodeId = nodeId;
- setup.ExecutorsCount = 1;
+ setup.ExecutorsCount = 2;
setup.Executors.Reset(new TAutoPtr<IExecutorPool>[setup.ExecutorsCount]);
- for (ui32 i = 0; i < setup.ExecutorsCount; ++i) {
- setup.Executors[i].Reset(new TBasicExecutorPool(i, numThreads, 20 /* magic number */));
- }
+ setup.Executors[0].Reset(new TBasicExecutorPool(0, numThreads, 20 /* magic number */));
+ setup.Executors[1].Reset(new TIOExecutorPool(1, 1));
setup.Scheduler.Reset(new TBasicSchedulerThread());
const ui32 interconnectPoolId = 0;
@@ -108,8 +108,7 @@ public:
// register logger
setup.LocalServices.emplace_back(loggerActorId, TActorSetupCmd(new TLoggerActor(loggerSettings,
CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")),
- TMailboxType::ReadAsFilled, interconnectPoolId));
-
+ TMailboxType::ReadAsFilled, 1));
if (common->OutgoingHandshakeInflightLimit) {
// create handshake broker actor
diff --git a/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp b/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp
new file mode 100644
index 0000000000..14fcf25c2c
--- /dev/null
+++ b/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp
@@ -0,0 +1,147 @@
+#include <library/cpp/actors/interconnect/outgoing_stream.h>
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/random/entropy.h>
+#include <util/stream/null.h>
+
+#define Ctest Cnull
+
+Y_UNIT_TEST_SUITE(OutgoingStream) {
+ Y_UNIT_TEST(Basic) {
+ std::vector<char> buffer;
+ buffer.resize(4 << 20);
+
+ TReallyFastRng32 rng(EntropyPool());
+ for (char *p = buffer.data(); p != buffer.data() + buffer.size(); p += sizeof(ui32)) {
+ *reinterpret_cast<ui32*>(p) = rng();
+ }
+
+ for (ui32 nIter = 0; nIter < 10; ++nIter) {
+ Cerr << "nIter# " << nIter << Endl;
+
+ size_t base = 0; // number of dropped bytes
+ size_t sendOffset = 0; // offset to base
+ size_t pending = 0; // number of bytes in queue
+
+ NInterconnect::TOutgoingStreamT<4096> stream;
+
+ size_t numRewindsRemain = 10;
+
+ while (base != buffer.size()) {
+ const size_t bytesToEnd = buffer.size() - (base + sendOffset);
+
+ Ctest << "base# " << base << " sendOffset# " << sendOffset << " pending# " << pending
+ << " bytesToEnd# " << bytesToEnd;
+
+ UNIT_ASSERT_VALUES_EQUAL(stream.CalculateOutgoingSize(), pending + sendOffset);
+ UNIT_ASSERT_VALUES_EQUAL(stream.CalculateUnsentSize(), pending);
+
+ const size_t maxBuffers = 128;
+ std::vector<NActors::TConstIoVec> iov;
+ stream.ProduceIoVec(iov, maxBuffers);
+ size_t offset = base + sendOffset;
+ for (const auto& [ptr, len] : iov) {
+ UNIT_ASSERT(memcmp(buffer.data() + offset, ptr, len) == 0);
+ offset += len;
+ }
+ UNIT_ASSERT(iov.size() == maxBuffers || offset == base + sendOffset + pending);
+
+ const char *nextData = buffer.data() + base + sendOffset + pending;
+ const size_t nextDataMaxLen = bytesToEnd - pending;
+ const size_t nextDataLen = nextDataMaxLen ? rng() % Min<size_t>(16384, nextDataMaxLen) + 1 : 0;
+
+ if (size_t bytesToScan = sendOffset + pending) {
+ bytesToScan = rng() % bytesToScan + 1;
+ size_t offset = base + sendOffset + pending - bytesToScan;
+ stream.ScanLastBytes(bytesToScan, [&](TContiguousSpan span) {
+ UNIT_ASSERT(offset + span.size() <= base + sendOffset + pending);
+ UNIT_ASSERT(memcmp(buffer.data() + offset, span.data(), span.size()) == 0);
+ offset += span.size();
+ });
+ UNIT_ASSERT_VALUES_EQUAL(offset, base + sendOffset + pending);
+ }
+
+ enum class EAction {
+ COPY_APPEND,
+ WRITE,
+ REF_APPEND,
+ ADVANCE,
+ REWIND,
+ DROP,
+ BOOKMARK
+ };
+
+ std::vector<EAction> actions;
+ if (nextDataLen) {
+ actions.push_back(EAction::COPY_APPEND);
+ actions.push_back(EAction::WRITE);
+ actions.push_back(EAction::REF_APPEND);
+ actions.push_back(EAction::BOOKMARK);
+ }
+ if (numRewindsRemain && sendOffset > 65536) {
+ actions.push_back(EAction::REWIND);
+ }
+ actions.push_back(EAction::ADVANCE);
+ actions.push_back(EAction::DROP);
+
+ switch (actions[rng() % actions.size()]) {
+ case EAction::COPY_APPEND: {
+ Ctest << " COPY_APPEND nextDataLen# " << nextDataLen;
+ auto span = stream.AcquireSpanForWriting(nextDataLen);
+ UNIT_ASSERT(span.size() != 0);
+ memcpy(span.data(), nextData, span.size());
+ stream.Append(span);
+ pending += span.size();
+ break;
+ }
+
+ case EAction::WRITE:
+ Ctest << " WRITE nextDataLen# " << nextDataLen;
+ stream.Write({nextData, nextDataLen});
+ pending += nextDataLen;
+ break;
+
+ case EAction::REF_APPEND:
+ Ctest << " REF_APPEND nextDataLen# " << nextDataLen;
+ stream.Append({nextData, nextDataLen});
+ pending += nextDataLen;
+ break;
+
+ case EAction::ADVANCE: {
+ const size_t advance = rng() % Min<size_t>(4096, pending + 1);
+ Ctest << " ADVANCE advance# " << advance;
+ stream.Advance(advance);
+ sendOffset += advance;
+ pending -= advance;
+ break;
+ }
+
+ case EAction::REWIND:
+ Ctest << " REWIND";
+ stream.Rewind();
+ pending += sendOffset;
+ sendOffset = 0;
+ --numRewindsRemain;
+ break;
+
+ case EAction::DROP: {
+ const size_t drop = rng() % Min<size_t>(65536, sendOffset + 1);
+ Ctest << " DROP drop# " << drop;
+ stream.DropFront(drop);
+ base += drop;
+ sendOffset -= drop;
+ break;
+ }
+
+ case EAction::BOOKMARK:
+ Ctest << " BOOKMARK nextDataLen# " << nextDataLen;
+ auto bookmark = stream.Bookmark(nextDataLen);
+ stream.WriteBookmark(std::move(bookmark), {nextData, nextDataLen});
+ pending += nextDataLen;
+ break;
+ }
+
+ Ctest << Endl;
+ }
+ }
+ }
+}