diff options
author | alexvru <alexvru@ydb.tech> | 2023-04-19 13:31:50 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-04-19 13:31:50 +0300 |
commit | 7653175c3b4bd262510d7dbfdf5dc30ceb777d75 (patch) | |
tree | d33ef2f5490f7f8dbab2e0e9ac8c3b13f26e8111 /library/cpp/actors | |
parent | 9143f30e39d2d0d9d81048632bce5f25de587b3f (diff) | |
download | ydb-7653175c3b4bd262510d7dbfdf5dc30ceb777d75.tar.gz |
Rework interconnect sending stream machinery
Diffstat (limited to 'library/cpp/actors')
14 files changed, 612 insertions, 306 deletions
diff --git a/library/cpp/actors/interconnect/event_holder_pool.h b/library/cpp/actors/interconnect/event_holder_pool.h index b6090a3bc8..59df9bd4c8 100644 --- a/library/cpp/actors/interconnect/event_holder_pool.h +++ b/library/cpp/actors/interconnect/event_holder_pool.h @@ -8,7 +8,6 @@ namespace NActors { struct TEvFreeItems : TEventLocal<TEvFreeItems, EventSpaceBegin(TEvents::ES_PRIVATE)> { static constexpr size_t MaxEvents = 256; - TList<TTcpPacketOutTask> Items; std::list<TEventHolder> FreeQueue; TStackVec<THolder<IEventBase>, MaxEvents> Events; TStackVec<THolder<TEventSerializedData>, MaxEvents> Buffers; diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index 316d57551d..64a6bbbd09 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -12,8 +12,7 @@ LWTRACE_USING(ACTORLIB_PROVIDER); namespace NActors { bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { - const size_t descrSize = sizeof(TEventDescr2); - const size_t amount = sizeof(TChannelPart) + descrSize; + const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr2); if (task.GetVirtualFreeAmount() < amount) { return false; } @@ -25,28 +24,33 @@ namespace NActors { task.Orbit.Take(event.Orbit); Y_VERIFY(SerializationInfo); - event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) | + const ui32 flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) | (SerializationInfo->IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); - TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea()); - part->Channel = ChannelId | TChannelPart::LastPartFlag; - part->Size = descrSize; - - auto *p = reinterpret_cast<TEventDescr2*>(part + 1); - *p = { + // prepare descriptor record + TEventDescr2 descr{ event.Descr.Type, - event.Descr.Flags, + flags, event.Descr.Recipient, event.Descr.Sender, event.Descr.Cookie, {}, event.Descr.Checksum }; - traceId.Serialize(&p->TraceId); + traceId.Serialize(&descr.TraceId); + + // and channel header before the descriptor + TChannelPart part{ + .Channel = static_cast<ui16>(ChannelId | TChannelPart::LastPartFlag), + .Size = sizeof(descr), + }; + + // append them to the packet + task.Write(&part, sizeof(part)); + task.Write(&descr, sizeof(descr)); - task.AppendBuf(part, amount); *weightConsumed += amount; - OutputQueueSize -= part->Size; + OutputQueueSize -= part.Size; Metrics->UpdateOutputChannelEvents(ChannelId); return true; @@ -75,7 +79,9 @@ namespace NActors { } else if (event.Event) { State = EState::CHUNKER; IEventBase *base = event.Event.Get(); - Chunker.SetSerializingEvent(base); + if (event.EventSerializedSize) { + Chunker.SetSerializingEvent(base); + } SerializationInfoContainer = base->CreateSerializationInfo(); SerializationInfo = &SerializationInfoContainer; } else { // event without buffer and IEventBase instance @@ -83,28 +89,28 @@ namespace NActors { SerializationInfoContainer = {}; SerializationInfo = &SerializationInfoContainer; } + if (!event.EventSerializedSize) { + State = EState::DESCRIPTOR; + } break; case EState::CHUNKER: case EState::BUFFER: { - size_t maxBytes = task.GetVirtualFreeAmount(); - if (maxBytes <= sizeof(TChannelPart)) { + if (task.GetVirtualFreeAmount() <= sizeof(TChannelPart)) { return false; } - TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea()); - part->Channel = ChannelId; - part->Size = 0; - task.AppendBuf(part, sizeof(TChannelPart)); - maxBytes -= sizeof(TChannelPart); - Y_VERIFY(maxBytes); + TChannelPart part{ + .Channel = ChannelId, + .Size = 0, + }; + + auto partBookmark = task.Bookmark(sizeof(part)); auto addChunk = [&](const void *data, size_t len) { event.UpdateChecksum(data, len); - task.AppendBuf(data, len); - part->Size += len; - Y_VERIFY_DEBUG(maxBytes >= len); - maxBytes -= len; + task.Append(data, len); + part.Size += len; event.EventActuallySerialized += len; if (event.EventActuallySerialized > MaxSerializedEventSize) { @@ -114,18 +120,18 @@ namespace NActors { bool complete = false; if (State == EState::CHUNKER) { - Y_VERIFY_DEBUG(task.GetFreeArea() == part + 1); - while (!complete && maxBytes) { - const auto [first, last] = Chunker.FeedBuf(task.GetFreeArea(), maxBytes); + while (!complete && !task.IsFull()) { + TMutableContiguousSpan out = task.AcquireSpanForWriting(); + const auto [first, last] = Chunker.FeedBuf(out.data(), out.size()); for (auto p = first; p != last; ++p) { addChunk(p->first, p->second); } complete = Chunker.IsComplete(); } Y_VERIFY(!complete || Chunker.IsSuccessfull()); - Y_VERIFY_DEBUG(complete || !maxBytes); + Y_VERIFY_DEBUG(complete || task.IsFull()); } else { // BUFFER - while (const size_t numb = Min(maxBytes, Iter.ContiguousSize())) { + while (const size_t numb = Min(task.GetVirtualFreeAmount(), Iter.ContiguousSize())) { const char *obuf = Iter.ContiguousData(); addChunk(obuf, numb); Iter += numb; @@ -138,12 +144,10 @@ namespace NActors { event.EventActuallySerialized, event.EventSerializedSize, event.Descr.Type); } - if (!part->Size) { - task.Undo(sizeof(TChannelPart)); - } else { - *weightConsumed += sizeof(TChannelPart) + part->Size; - OutputQueueSize -= part->Size; - } + Y_VERIFY_DEBUG(part.Size); + task.WriteBookmark(std::exchange(partBookmark, {}), &part, sizeof(part)); + *weightConsumed += sizeof(TChannelPart) + part.Size; + OutputQueueSize -= part.Size; if (complete) { State = EState::DESCRIPTOR; } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index ad2da30ec3..34770133ae 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -93,7 +93,7 @@ namespace NActors { switch (State) { case EState::HEADER: - if (IncomingData.GetSize() < sizeof(Header)) { + if (IncomingData.GetSize() < sizeof(TTcpPacketHeader_v2)) { break; } else { ProcessHeader(); @@ -172,14 +172,15 @@ namespace NActors { } void TInputSessionTCP::ProcessHeader() { - const bool success = IncomingData.ExtractFrontPlain(&Header, sizeof(Header)); + TTcpPacketHeader_v2 header; + const bool success = IncomingData.ExtractFrontPlain(&header, sizeof(header)); Y_VERIFY(success); - PayloadSize = Header.PayloadLength; - HeaderSerial = Header.Serial; - HeaderConfirm = Header.Confirm; + PayloadSize = header.PayloadLength; + const ui64 serial = header.Serial; + const ui64 confirm = header.Confirm; if (!Params.Encryption) { - ChecksumExpected = std::exchange(Header.Checksum, 0); - Checksum = Crc32cExtendMSanCompatible(0, &Header, sizeof(Header)); // start calculating checksum now + ChecksumExpected = std::exchange(header.Checksum, 0); + Checksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header)); // start calculating checksum now if (!PayloadSize && Checksum != ChecksumExpected) { LOG_ERROR_IC_SESSION("ICIS10", "payload checksum error"); return ReestablishConnection(TDisconnectReason::ChecksumError()); @@ -189,9 +190,9 @@ namespace NActors { LOG_CRIT_IC_SESSION("ICIS07", "payload is way too big"); return DestroySession(TDisconnectReason::FormatError()); } - if (ConfirmedByInput < HeaderConfirm) { - ConfirmedByInput = HeaderConfirm; - if (AtomicGet(Context->ControlPacketId) <= HeaderConfirm && !NewPingProtocol) { + if (ConfirmedByInput < confirm) { + ConfirmedByInput = confirm; + if (AtomicGet(Context->ControlPacketId) <= confirm && !NewPingProtocol) { ui64 sendTime = AtomicGet(Context->ControlPacketSendTimer); TDuration duration = CyclesToDuration(GetCycleCountFast() - sendTime); const auto durationUs = duration.MicroSeconds(); @@ -205,20 +206,20 @@ namespace NActors { } if (PayloadSize) { const ui64 expected = Context->GetLastProcessedPacketSerial() + 1; - if (HeaderSerial == 0 || HeaderSerial > expected) { - LOG_CRIT_IC_SESSION("ICIS06", "packet serial %" PRIu64 ", but %" PRIu64 " expected", HeaderSerial, expected); + if (serial == 0 || serial > expected) { + LOG_CRIT_IC_SESSION("ICIS06", "packet serial %" PRIu64 ", but %" PRIu64 " expected", serial, expected); return DestroySession(TDisconnectReason::FormatError()); } - IgnorePayload = HeaderSerial != expected; + IgnorePayload = serial != expected; State = EState::PAYLOAD; - } else if (HeaderSerial & TTcpPacketBuf::PingRequestMask) { - Send(SessionId, new TEvProcessPingRequest(HeaderSerial & ~TTcpPacketBuf::PingRequestMask)); - } else if (HeaderSerial & TTcpPacketBuf::PingResponseMask) { - const ui64 sent = HeaderSerial & ~TTcpPacketBuf::PingResponseMask; + } else if (serial & TTcpPacketBuf::PingRequestMask) { + Send(SessionId, new TEvProcessPingRequest(serial & ~TTcpPacketBuf::PingRequestMask)); + } else if (serial & TTcpPacketBuf::PingResponseMask) { + const ui64 sent = serial & ~TTcpPacketBuf::PingResponseMask; const ui64 received = GetCycleCountFast(); HandlePingResponse(CyclesToDuration(received - sent)); - } else if (HeaderSerial & TTcpPacketBuf::ClockMask) { - HandleClock(TInstant::MicroSeconds(HeaderSerial & ~TTcpPacketBuf::ClockMask)); + } else if (serial & TTcpPacketBuf::ClockMask) { + HandleClock(TInstant::MicroSeconds(serial & ~TTcpPacketBuf::ClockMask)); } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 72982c1458..16cadc9e92 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -271,37 +271,27 @@ namespace NActors { // also reset SendQueuePos // drop confirmed packets first as we do not need unwanted retransmissions - SendQueuePos = SendQueue.end(); - DropConfirmed(nextPacket); + DropConfirmed(nextPacket, true); + OutgoingStream.Rewind(); + SendQueuePos = 0; + SendOffset = 0; - for (TSendQueue::iterator it = SendQueue.begin(); it != SendQueue.end(); ) { - const TSendQueue::iterator next = std::next(it); - if (it->IsEmpty()) { - SendQueueCache.splice(SendQueueCache.begin(), SendQueue, it); - } else { - it->ResetBufs(); - } - it = next; - } - TrimSendQueueCache(); - SendQueuePos = SendQueue.begin(); - - TMaybe<ui64> s; - for (auto it = SendQueuePos; it != SendQueue.end(); ++it) { - if (!it->IsEmpty()) { - s = it->GetSerial(); + ui64 serial = Max<ui64>(); + BytesUnwritten = 0; + for (const auto& packet : SendQueue) { + if (packet.Data && packet.Serial < serial) { + serial = packet.Serial; } + BytesUnwritten += packet.PacketSize; } - const ui64 serial = s.GetOrElse(Max<ui64>()); Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed); LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " SendQueuePos.Serial# %" PRIu64 "\n", SendQueue.size(), LastConfirmed, serial); - BytesUnwritten = 0; - for (const auto& packet : SendQueue) { - BytesUnwritten += sizeof(TTcpPacketHeader_v2) + packet.GetDataSize(); - } + Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize(), "%s", TString(TStringBuilder() + << "BytesUnwritten# " << BytesUnwritten + << " CalculateUnsentSize# " << OutgoingStream.CalculateUnsentSize()).data()); SwitchStuckPeriod(); @@ -581,9 +571,21 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu", ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size()); - while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) { - for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) { - it->AppendToIoVector(wbuffers, maxElementsInIOV); + auto calculateUnsentQueueSize = [&] { + size_t res = -SendOffset; + for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end(); ++it) { + res += it->PacketSize; + } + return res; + }; + + while (!ReceiveContext->WriteBlockedByFullSendBuffer) { + Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); + Y_VERIFY_DEBUG(BytesUnwritten == calculateUnsentQueueSize()); + + OutgoingStream.ProduceIoVec(wbuffers, maxElementsInIOV); + if (!wbuffers) { // done sending + break; } const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data()); @@ -610,21 +612,31 @@ namespace NActors { wbuffers.clear(); if (r > 0) { + written += r; + Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten); BytesUnwritten -= r; - written += r; - ui64 packets = 0; - // advance SendQueuePos to eat all processed items - for (size_t amount = r; amount && SendQueuePos->DropBufs(amount); ++SendQueuePos) { - if (!SendQueuePos->IsEmpty()) { - LastSentSerial = Max(LastSentSerial, SendQueuePos->GetSerial()); + OutgoingStream.Advance(r); + + ui64 packets = 0; + Y_VERIFY_DEBUG(SendQueuePos != SendQueue.size()); + SendOffset += r; + for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->PacketSize <= SendOffset; ++SendQueuePos, ++it) { + SendOffset -= it->PacketSize; + Y_VERIFY_DEBUG(!it->Data || it->Serial <= OutputCounter); + if (it->Data && LastSentSerial < it->Serial) { + LastSentSerial = it->Serial; } ++PacketsWrittenToSocket; ++packets; - LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, SendQueuePos->TriedWriting, SendQueuePos->GetDataSize(), BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); + Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1); +// LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, false, SendQueuePos->PacketSize, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); } + Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); + Y_VERIFY_DEBUG(BytesUnwritten == calculateUnsentQueueSize()); + LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); } else if (-r != EAGAIN && -r != EWOULDBLOCK) { const TString message = r == 0 ? "connection closed by peer" @@ -636,12 +648,6 @@ namespace NActors { } return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r)); } else { - // we have to do some hack for secure socket -- mark the packet as 'tried writing' - if (Params.Encryption) { - Y_VERIFY(SendQueuePos != SendQueue.end()); - SendQueuePos->MarkTriedWriting(); // do not try to replace buffer under SSL - } - // we have received EAGAIN error code, this means that we can't issue more data until we have received // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer); @@ -711,45 +717,10 @@ namespace NActors { } } - void TInterconnectSessionTCP::TrimSendQueueCache() { - static constexpr size_t maxItems = 32; - static constexpr size_t trimThreshold = maxItems * 2; - if (SendQueueCache.size() >= trimThreshold) { - auto it = SendQueueCache.end(); - for (size_t n = SendQueueCache.size() - maxItems; n; --n) { - --it; - } - - auto ev = std::make_unique<TEvFreeItems>(); - ev->Items.splice(ev->Items.end(), SendQueueCache, it, SendQueueCache.end()); - ev->NumBytes = ev->Items.size() * sizeof(TTcpPacketOutTask); - if (ev->GetInLineForDestruction(Proxy->Common)) { - Send(Proxy->Common->DestructorId, ev.release()); - } - } - } - ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { Y_VERIFY(Socket); - TSendQueue::iterator packet; - if (SendQueueCache) { - // we have entries in cache, take one and move it to the end of SendQueue - packet = SendQueueCache.begin(); - SendQueue.splice(SendQueue.end(), SendQueueCache, packet); - packet->Reuse(); // reset packet to initial state - } else { - // we have to allocate new packet, so just do it - LWPROBE_IF_TOO_LONG(SlowICAllocPacketBuffer, Proxy->PeerNodeId, ms) { - packet = SendQueue.emplace(SendQueue.end(), Params); - } - } - - // update send queue position - if (SendQueuePos == SendQueue.end()) { - SendQueuePos = packet; // start sending this packet if we are not sending anything for now - } - + TTcpPacketOutTask packet(Params, OutgoingStream); ui64 serial = 0; if (data) { @@ -759,12 +730,12 @@ namespace NActors { // fill the data packet Y_VERIFY(NumEventsInReadyChannels); LWPROBE_IF_TOO_LONG(SlowICFillSendingBuffer, Proxy->PeerNodeId, ms) { - FillSendingBuffer(*packet, serial); + FillSendingBuffer(packet, serial); } - Y_VERIFY(!packet->IsEmpty()); + Y_VERIFY(!packet.IsEmpty()); - InflightDataAmount += packet->GetDataSize(); - Proxy->Metrics->AddInflightDataAmount(packet->GetDataSize()); + InflightDataAmount += packet.GetDataSize(); + Proxy->Metrics->AddInflightDataAmount(packet.GetDataSize()); if (InflightDataAmount > GetTotalInflightAmountOfData()) { Proxy->Metrics->IncInflyLimitReach(); } @@ -778,39 +749,31 @@ namespace NActors { LastPayloadActivityTimestamp = TActivationContext::Monotonic(); } else if (pingMask) { serial = *pingMask; - - // make this packet a priority one - if (SendQueuePos != packet) { - Y_VERIFY(SendQueuePos != SendQueue.end()); - if (SendQueuePos->IsAtBegin()) { - // insert this packet just before the next being sent and step back - SendQueue.splice(SendQueuePos, SendQueue, packet); - --SendQueuePos; - Y_VERIFY(SendQueuePos == packet); - } else { - // current packet is already being sent, so move new packet just after it - SendQueue.splice(std::next(SendQueuePos), SendQueue, packet); - } - } } const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial(); - packet->SetMetadata(serial, lastInputSerial); - packet->Sign(); + + packet.Finish(serial, lastInputSerial); // count number of bytes pending for write - ui64 packetSize = sizeof(TTcpPacketHeader_v2) + packet->GetDataSize(); + const size_t packetSize = packet.GetFullSize(); BytesUnwritten += packetSize; + Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize(), "%s", TString(TStringBuilder() + << "BytesUnwritten# " << BytesUnwritten << " packetSize# " << packetSize + << " CalculateUnsentSize# " << OutgoingStream.CalculateUnsentSize()).data()); + + // put outgoing packet metadata here + SendQueue.push_back(TOutgoingPacket{packetSize, serial, data}); LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu" - " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet->GetDataSize(), + " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(), InflightDataAmount, BytesUnwritten); // reset forced packet sending timestamp as we have confirmed all received data ResetFlushLogic(); ++PacketsGenerated; - LWTRACK(PacketGenerated, packet->Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize); + LWTRACK(PacketGenerated, packet.Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize); if (!data) { WriteData(); @@ -819,7 +782,7 @@ namespace NActors { return packetSize; } - bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) { + bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm, bool ignoreSendQueuePos) { LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm); Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter, @@ -828,24 +791,48 @@ namespace NActors { LastConfirmed = confirm; ui64 droppedDataAmount = 0; + std::optional<ui64> lastDroppedSerial = 0; ui32 numDropped = 0; +#ifndef NDEBUG + { + size_t totalBytesInSendQueue = 0; + size_t unsentBytesInSendQueue = 0; + for (size_t i = 0; i < SendQueue.size(); ++i) { + totalBytesInSendQueue += SendQueue[i].PacketSize; + if (i > SendQueuePos) { + unsentBytesInSendQueue += SendQueue[i].PacketSize; + } else if (i == SendQueuePos) { + unsentBytesInSendQueue += SendQueue[i].PacketSize - SendOffset; + } + } + Y_VERIFY(totalBytesInSendQueue == OutgoingStream.CalculateOutgoingSize()); + Y_VERIFY(ignoreSendQueuePos || unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize()); + } +#endif + // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively // making Serial <= confirm true - TSendQueue::iterator it; - ui64 lastDroppedSerial = 0; - for (it = SendQueue.begin(); it != SendQueuePos && it->Confirmed(confirm); ++it) { - if (!it->IsEmpty()) { - lastDroppedSerial = it->GetSerial(); + size_t bytesDropped = 0; + for (; !SendQueue.empty(); SendQueue.pop_front(), --SendQueuePos) { + auto& front = SendQueue.front(); + if (front.Data && confirm < front.Serial) { + break; + } + if (front.Data) { + lastDroppedSerial.emplace(front.Serial); } - droppedDataAmount += it->GetDataSize(); + droppedDataAmount += front.PacketSize - sizeof(TTcpPacketHeader_v2); + bytesDropped += front.PacketSize; ++numDropped; + Y_VERIFY_DEBUG(ignoreSendQueuePos || SendQueuePos != 0); + } + OutgoingStream.DropFront(bytesDropped); + if (lastDroppedSerial) { + ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { + channel.DropConfirmed(*lastDroppedSerial); + }); } - SendQueueCache.splice(SendQueueCache.begin(), SendQueue, SendQueue.begin(), it); - TrimSendQueueCache(); - ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { - channel.DropConfirmed(lastDroppedSerial); - }); const ui64 current = InflightDataAmount; const ui64 limit = GetTotalInflightAmountOfData(); @@ -1202,7 +1189,6 @@ namespace NActors { MON_VAR(OutputStuckFlag) MON_VAR(SendQueue.size()) - MON_VAR(SendQueueCache.size()) MON_VAR(NumEventsInReadyChannels) MON_VAR(TotalOutputQueueSize) MON_VAR(BytesUnwritten) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 7d0e003a34..48fa39b273 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -24,6 +24,7 @@ #include "watchdog_timer.h" #include "event_holder_pool.h" #include "channel_scheduler.h" +#include "outgoing_stream.h" #include <unordered_set> #include <unordered_map> @@ -217,10 +218,6 @@ namespace NActors { const ui32 NodeId; const TSessionParams Params; - // header we are currently processing (parsed from the stream) - TTcpPacketHeader_v2 Header; - ui64 HeaderConfirm, HeaderSerial; - size_t PayloadSize; ui32 ChecksumExpected, Checksum; bool IgnorePayload; @@ -233,6 +230,7 @@ namespace NActors { THolder<TEvUpdateFromInputSession> UpdateFromInputSession; + std::optional<ui64> LastReceivedSerial; ui64 ConfirmedByInput; std::shared_ptr<IInterconnectMetrics> Metrics; @@ -390,7 +388,7 @@ namespace NActors { ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {}); void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial); - bool DropConfirmed(ui64 confirm); + bool DropConfirmed(ui64 confirm, bool ignoreSendQueuePos = false); void ShutdownSocket(TDisconnectReason reason); void StartHandshake(); @@ -447,16 +445,21 @@ namespace NActors { void SetOutputStuckFlag(bool state); void SwitchStuckPeriod(); - using TSendQueue = TList<TTcpPacketOutTask>; - TSendQueue SendQueue; - TSendQueue SendQueueCache; - TSendQueue::iterator SendQueuePos; + NInterconnect::TOutgoingStream OutgoingStream; + + struct TOutgoingPacket { + size_t PacketSize; + ui64 Serial; + bool Data; + }; + std::deque<TOutgoingPacket> SendQueue; // packet boundaries + size_t SendQueuePos = 0; // packet being sent now + size_t SendOffset = 0; + ui64 WriteBlockedCycles = 0; // start of current block period TDuration WriteBlockedTotal; // total incremental duration that session has been blocked ui64 BytesUnwritten = 0; - void TrimSendQueueCache(); - TDuration GetWriteBlockedTotal() const { if (ReceiveContext->WriteBlockedByFullSendBuffer) { double blockedUs = NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles) * 1000000.0; diff --git a/library/cpp/actors/interconnect/outgoing_stream.h b/library/cpp/actors/interconnect/outgoing_stream.h new file mode 100644 index 0000000000..0306295b52 --- /dev/null +++ b/library/cpp/actors/interconnect/outgoing_stream.h @@ -0,0 +1,224 @@ +#pragma once + +#include <library/cpp/actors/core/event_load.h> +#include <library/cpp/actors/util/rc_buf.h> +#include <library/cpp/containers/stack_vector/stack_vec.h> +#include <deque> + +namespace NInterconnect { + + template<size_t TotalSize> + class TOutgoingStreamT { + static constexpr size_t BufferSize = TotalSize - sizeof(ui32) * 2; + + struct TBuffer { + char Data[BufferSize]; + ui32 RefCount; + ui32 Index; + + struct TDeleter { + void operator ()(TBuffer *buffer) const { + free(buffer); + } + }; + }; + + static_assert(sizeof(TBuffer) == TotalSize); + + struct TSendChunk { + TContiguousSpan Span; + TBuffer *Buffer; + }; + + std::vector<std::unique_ptr<TBuffer, typename TBuffer::TDeleter>> Buffers; + TBuffer *AppendBuffer = nullptr; + size_t AppendOffset = BufferSize; // into the last buffer + std::deque<TSendChunk> SendQueue; + size_t SendQueuePos = 0; + size_t SendOffset = 0; + + public: + size_t CalculateOutgoingSize() const { + size_t res = 0; + for (const TSendChunk& chunk : SendQueue) { + res += chunk.Span.size(); + } + return res; + } + + size_t CalculateUnsentSize() const { + size_t res = 0; + for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end(); ++it) { + res += it->Span.size(); + } + return res - SendOffset; + } + + TMutableContiguousSpan AcquireSpanForWriting(size_t maxLen) { + if (AppendOffset == BufferSize) { // we have no free buffer, allocate one + Buffers.emplace_back(static_cast<TBuffer*>(malloc(sizeof(TBuffer)))); + AppendBuffer = Buffers.back().get(); + Y_VERIFY(AppendBuffer); + AppendBuffer->RefCount = 1; // through AppendBuffer pointer + AppendBuffer->Index = Buffers.size() - 1; + AppendOffset = 0; + } + return {AppendBuffer->Data + AppendOffset, Min(maxLen, BufferSize - AppendOffset)}; + } + + void Append(TContiguousSpan span) { + TBuffer *buffer = nullptr; + if (AppendBuffer && span.data() == AppendBuffer->Data + AppendOffset) { // the only valid case to use previously acquired span + buffer = AppendBuffer; + AppendOffset += span.size(); + Y_VERIFY_DEBUG(AppendOffset <= BufferSize); + if (AppendOffset != BufferSize) { + ++buffer->RefCount; + } + } else { +#ifndef NDEBUG + // ensure this span does not point into any existing buffer part + const char *begin = span.data(); + const char *end = span.data() + span.size(); + for (const auto& buffer : Buffers) { + const char *bufferBegin = buffer->Data; + const char *bufferEnd = bufferBegin + BufferSize; + if (bufferBegin < end && begin < bufferEnd) { + Y_FAIL(); + } + } +#endif + } + + if (!SendQueue.empty()) { + auto& back = SendQueue.back(); + if (back.Span.data() + back.Span.size() == span.data()) { // check if it is possible just to extend the last span + if (SendQueuePos == SendQueue.size()) { + --SendQueuePos; + SendOffset = back.Span.size(); + } + back.Span = {back.Span.data(), back.Span.size() + span.size()}; + DropBufferReference(buffer); + return; + } + } + + if (buffer) { + ++buffer->RefCount; + } + SendQueue.push_back(TSendChunk{span, buffer}); + DropBufferReference(buffer); + } + + void Write(TContiguousSpan in) { + while (in.size()) { + auto outChunk = AcquireSpanForWriting(in.size()); + Append(outChunk); + memcpy(outChunk.data(), in.data(), outChunk.size()); + in = in.SubSpan(outChunk.size(), Max<size_t>()); + } + } + + using TBookmark = TStackVec<TMutableContiguousSpan, 2>; + + TBookmark Bookmark(size_t len) { + TBookmark bookmark; + + while (len) { + const auto span = AcquireSpanForWriting(len); + Append(span); + bookmark.push_back(span); + len -= span.size(); + } + + return bookmark; + } + + void WriteBookmark(TBookmark&& bookmark, TContiguousSpan in) { + for (auto& outChunk : bookmark) { + Y_VERIFY(outChunk.size() <= in.size()); + memcpy(outChunk.data(), in.data(), outChunk.size()); + in = in.SubSpan(outChunk.size(), Max<size_t>()); + } + } + + void Rewind() { + SendQueuePos = 0; + SendOffset = 0; + } + + template<typename T> + void ProduceIoVec(T& container, size_t maxItems) { + size_t offset = SendOffset; + for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end() && std::size(container) < maxItems; ++it) { + const TContiguousSpan span = it->Span.SubSpan(offset, Max<size_t>()); + container.push_back(NActors::TConstIoVec{span.data(), span.size()}); + offset = 0; + } + } + + void Advance(size_t numBytes) { // called when numBytes portion of data has been sent + Y_VERIFY_DEBUG(numBytes == 0 || SendQueuePos != SendQueue.size()); + SendOffset += numBytes; + for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->Span.size() <= SendOffset; ++SendQueuePos, ++it) { + SendOffset -= it->Span.size(); + Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1); + } + } + + void DropFront(size_t numBytes) { // drops first numBytes from the queue, freeing buffers when necessary + while (numBytes) { + Y_VERIFY_DEBUG(!SendQueue.empty()); + auto& front = SendQueue.front(); + if (numBytes < front.Span.size()) { + front.Span = front.Span.SubSpan(numBytes, Max<size_t>()); + if (SendQueuePos == 0) { + Y_VERIFY_DEBUG(numBytes <= SendOffset); + SendOffset -= numBytes; + } + break; + } else { + numBytes -= front.Span.size(); + } + Y_VERIFY_DEBUG(!front.Buffer || (front.Span.data() >= front.Buffer->Data && + front.Span.data() + front.Span.size() <= front.Buffer->Data + BufferSize)); + DropBufferReference(front.Buffer); + SendQueue.pop_front(); + if (SendQueuePos) { + --SendQueuePos; + } else { + SendOffset = 0; + } + } + } + + template<typename T> + void ScanLastBytes(size_t numBytes, T&& callback) const { + auto it = SendQueue.end(); + ssize_t offset = -numBytes; + while (offset < 0) { + Y_VERIFY_DEBUG(it != SendQueue.begin()); + const TSendChunk& chunk = *--it; + offset += chunk.Span.size(); + } + for (; it != SendQueue.end(); ++it, offset = 0) { + callback(it->Span.SubSpan(offset, Max<size_t>())); + } + } + + private: + void DropBufferReference(TBuffer *buffer) { + if (buffer && !--buffer->RefCount) { + const size_t index = buffer->Index; + auto& cell = Buffers[index]; + Y_VERIFY_DEBUG(cell.get() == buffer); + std::swap(cell, Buffers.back()); + cell->Index = index; + Buffers.pop_back(); + } + } + }; + + using TOutgoingStream = TOutgoingStreamT<262144>; + +} // NInterconnect diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 501fb08ab1..c1d63fa3b8 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -14,6 +14,7 @@ #include <util/generic/list.h> #include "types.h" +#include "outgoing_stream.h" #ifndef FORCE_EVENT_CHECKSUM #define FORCE_EVENT_CHECKSUM 0 @@ -45,9 +46,6 @@ struct TTcpPacketBuf { static constexpr ui64 ClockMask = 0x2000000000000000ULL; static constexpr size_t PacketDataLen = 4096 * 2 - 96 - sizeof(TTcpPacketHeader_v2); - - TTcpPacketHeader_v2 Header; - char Data[PacketDataLen]; }; struct TEventData { @@ -121,144 +119,83 @@ namespace NActors { struct TTcpPacketOutTask : TNonCopyable { const TSessionParams& Params; - TTcpPacketBuf Packet; - size_t DataSize; - TStackVec<TConstIoVec, 32> Bufs; - size_t BufferIndex; - size_t FirstBufferOffset; - bool TriedWriting; - char *FreeArea; - char *End; + NInterconnect::TOutgoingStream& OutgoingStream; + NInterconnect::TOutgoingStream::TBookmark HeaderBookmark; + size_t DataSize = 0; mutable NLWTrace::TOrbit Orbit; -public: - TTcpPacketOutTask(const TSessionParams& params) + TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream) : Params(params) - { - Reuse(); - } - - bool IsAtBegin() const { - return !BufferIndex && !FirstBufferOffset && !TriedWriting; - } - - void MarkTriedWriting() { - TriedWriting = true; - } + , OutgoingStream(outgoingStream) + , HeaderBookmark(OutgoingStream.Bookmark(sizeof(TTcpPacketHeader_v2))) + {} - void Reuse() { - DataSize = 0; - Bufs.assign(1, {&Packet.Header, sizeof(Packet.Header)}); - BufferIndex = 0; - FirstBufferOffset = 0; - TriedWriting = false; - FreeArea = Packet.Data; - End = FreeArea + TTcpPacketBuf::PacketDataLen; - Orbit.Reset(); - } - - bool IsEmpty() const { - return !DataSize; + // Preallocate some space to fill it later. + NInterconnect::TOutgoingStream::TBookmark Bookmark(size_t len) { + Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount()); + DataSize += len; + return OutgoingStream.Bookmark(len); } - void SetMetadata(ui64 serial, ui64 confirm) { - Packet.Header.Serial = serial; - Packet.Header.Confirm = confirm; - } - - size_t GetDataSize() const { return DataSize; } - - ui64 GetSerial() const { - return Packet.Header.Serial; + // Write previously bookmarked space. + void WriteBookmark(NInterconnect::TOutgoingStream::TBookmark&& bookmark, const void *buffer, size_t len) { + OutgoingStream.WriteBookmark(std::move(bookmark), {static_cast<const char*>(buffer), len}); } - bool Confirmed(ui64 confirm) const { - return IsEmpty() || Packet.Header.Serial <= confirm; + // Acquire raw pointer to write some data. + TMutableContiguousSpan AcquireSpanForWriting() { + return OutgoingStream.AcquireSpanForWriting(GetVirtualFreeAmount()); } - void *GetFreeArea() { - return FreeArea; + // Append reference to some data (acquired previously or external pointer). + void Append(const void *buffer, size_t len) { + Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount()); + DataSize += len; + OutgoingStream.Append({static_cast<const char*>(buffer), len}); } - size_t GetVirtualFreeAmount() const { - return TTcpPacketBuf::PacketDataLen - DataSize; + // Write some data with copying. + void Write(const void *buffer, size_t len) { + Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount()); + DataSize += len; + OutgoingStream.Write({static_cast<const char*>(buffer), len}); } - void AppendBuf(const void *buf, size_t size) { - DataSize += size; - Y_VERIFY_DEBUG(DataSize <= TTcpPacketBuf::PacketDataLen, "DataSize# %zu AppendBuf buf# %p size# %zu" - " FreeArea# %p End# %p", DataSize, buf, size, FreeArea, End); - - if (Bufs && static_cast<const char*>(Bufs.back().Data) + Bufs.back().Size == buf) { - Bufs.back().Size += size; - } else { - Bufs.push_back({buf, size}); - } + void Finish(ui64 serial, ui64 confirm) { + Y_VERIFY(DataSize <= Max<ui16>()); - if (buf >= FreeArea && buf < End) { - Y_VERIFY_DEBUG(buf == FreeArea); - FreeArea = const_cast<char*>(static_cast<const char*>(buf)) + size; - Y_VERIFY_DEBUG(FreeArea <= End); - } - } + TTcpPacketHeader_v2 header{ + confirm, + serial, + 0, + static_cast<ui16>(DataSize) + }; - void Undo(size_t size) { - Y_VERIFY(Bufs); - auto& buf = Bufs.back(); - Y_VERIFY(buf.Data == FreeArea - buf.Size); - buf.Size -= size; - if (!buf.Size) { - Bufs.pop_back(); - } - FreeArea -= size; - DataSize -= size; - } + if (Checksumming()) { + // pre-write header without checksum for correct checksum calculation + WriteBookmark(NInterconnect::TOutgoingStream::TBookmark(HeaderBookmark), &header, sizeof(header)); - bool DropBufs(size_t& amount) { - while (BufferIndex != Bufs.size()) { - TConstIoVec& item = Bufs[BufferIndex]; - // calculate number of bytes to the end in current buffer - const size_t remain = item.Size - FirstBufferOffset; - if (amount >= remain) { - // vector item completely fits into the received amount, drop it out and switch to next buffer - amount -= remain; - ++BufferIndex; - FirstBufferOffset = 0; - } else { - // adjust first buffer by "amount" bytes forward and reset amount to zero - FirstBufferOffset += amount; - amount = 0; - // return false meaning that we have some more data to send - return false; - } + size_t total = 0; + ui32 checksum = 0; + OutgoingStream.ScanLastBytes(GetFullSize(), [&](TContiguousSpan span) { + checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size()); + total += span.size(); + }); + header.Checksum = checksum; + Y_VERIFY(total == sizeof(header) + DataSize, "total# %zu DataSize# %zu GetFullSize# %zu", total, DataSize, + GetFullSize()); } - return true; - } - void ResetBufs() { - BufferIndex = FirstBufferOffset = 0; - TriedWriting = false; + WriteBookmark(std::exchange(HeaderBookmark, {}), &header, sizeof(header)); } - template <typename TVectorType> - void AppendToIoVector(TVectorType& vector, size_t max) { - for (size_t k = BufferIndex, offset = FirstBufferOffset; k != Bufs.size() && vector.size() < max; ++k, offset = 0) { - TConstIoVec v = Bufs[k]; - v.Data = static_cast<const char*>(v.Data) + offset; - v.Size -= offset; - vector.push_back(v); - } + bool Checksumming() const { + return !Params.Encryption; } - void Sign() { - Packet.Header.Checksum = 0; - Packet.Header.PayloadLength = DataSize; - if (!Params.Encryption) { - ui32 sum = 0; - for (const auto& item : Bufs) { - sum = Crc32cExtendMSanCompatible(sum, item.Data, item.Size); - } - Packet.Header.Checksum = sum; - } - } + bool IsFull() const { return GetVirtualFreeAmount() == 0; } + bool IsEmpty() const { return GetDataSize() == 0; } + size_t GetDataSize() const { return DataSize; } + size_t GetFullSize() const { return sizeof(TTcpPacketHeader_v2) + GetDataSize(); } + size_t GetVirtualFreeAmount() const { return TTcpPacketBuf::PacketDataLen - DataSize; } }; diff --git a/library/cpp/actors/interconnect/ut/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/interconnect/ut/CMakeLists.darwin-x86_64.txt index e1e00ed6ad..1b744e906c 100644 --- a/library/cpp/actors/interconnect/ut/CMakeLists.darwin-x86_64.txt +++ b/library/cpp/actors/interconnect/ut/CMakeLists.darwin-x86_64.txt @@ -35,6 +35,7 @@ target_sources(library-cpp-actors-interconnect-ut PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/interconnect_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/large.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp ) diff --git a/library/cpp/actors/interconnect/ut/CMakeLists.linux-aarch64.txt b/library/cpp/actors/interconnect/ut/CMakeLists.linux-aarch64.txt index a869318b87..b77f4f4e6c 100644 --- a/library/cpp/actors/interconnect/ut/CMakeLists.linux-aarch64.txt +++ b/library/cpp/actors/interconnect/ut/CMakeLists.linux-aarch64.txt @@ -38,6 +38,7 @@ target_sources(library-cpp-actors-interconnect-ut PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/interconnect_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/large.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp ) diff --git a/library/cpp/actors/interconnect/ut/CMakeLists.linux-x86_64.txt b/library/cpp/actors/interconnect/ut/CMakeLists.linux-x86_64.txt index 479e70158a..96489d5e6b 100644 --- a/library/cpp/actors/interconnect/ut/CMakeLists.linux-x86_64.txt +++ b/library/cpp/actors/interconnect/ut/CMakeLists.linux-x86_64.txt @@ -39,6 +39,7 @@ target_sources(library-cpp-actors-interconnect-ut PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/interconnect_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/large.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp ) diff --git a/library/cpp/actors/interconnect/ut/CMakeLists.windows-x86_64.txt b/library/cpp/actors/interconnect/ut/CMakeLists.windows-x86_64.txt index 61244b1f9d..ddae9008cf 100644 --- a/library/cpp/actors/interconnect/ut/CMakeLists.windows-x86_64.txt +++ b/library/cpp/actors/interconnect/ut/CMakeLists.windows-x86_64.txt @@ -28,6 +28,7 @@ target_sources(library-cpp-actors-interconnect-ut PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/interconnect_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/large.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/poller_actor_ut.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/interconnect/ut/dynamic_proxy_ut.cpp ) diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 32c8237b59..2de8dce457 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -43,8 +43,10 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { std::deque<std::map<ui16, ui32>> window; + NInterconnect::TOutgoingStream stream; + for (; numEvents; ++step) { - TTcpPacketOutTask task(p); + TTcpPacketOutTask task(p, stream); if (step == 100) { for (ui32 i = 0; i < 200; ++i) { diff --git a/library/cpp/actors/interconnect/ut/lib/node.h b/library/cpp/actors/interconnect/ut/lib/node.h index 0b538cdb1c..e15baf639e 100644 --- a/library/cpp/actors/interconnect/ut/lib/node.h +++ b/library/cpp/actors/interconnect/ut/lib/node.h @@ -2,6 +2,7 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/executor_pool_io.h> #include <library/cpp/actors/core/scheduler_basic.h> #include <library/cpp/actors/core/mailbox.h> #include <library/cpp/actors/dnsresolver/dnsresolver.h> @@ -24,11 +25,10 @@ public: TIntrusivePtr<NLog::TSettings> loggerSettings = nullptr) { TActorSystemSetup setup; setup.NodeId = nodeId; - setup.ExecutorsCount = 1; + setup.ExecutorsCount = 2; setup.Executors.Reset(new TAutoPtr<IExecutorPool>[setup.ExecutorsCount]); - for (ui32 i = 0; i < setup.ExecutorsCount; ++i) { - setup.Executors[i].Reset(new TBasicExecutorPool(i, numThreads, 20 /* magic number */)); - } + setup.Executors[0].Reset(new TBasicExecutorPool(0, numThreads, 20 /* magic number */)); + setup.Executors[1].Reset(new TIOExecutorPool(1, 1)); setup.Scheduler.Reset(new TBasicSchedulerThread()); const ui32 interconnectPoolId = 0; @@ -108,8 +108,7 @@ public: // register logger setup.LocalServices.emplace_back(loggerActorId, TActorSetupCmd(new TLoggerActor(loggerSettings, CreateStderrBackend(), counters->GetSubgroup("subsystem", "logger")), - TMailboxType::ReadAsFilled, interconnectPoolId)); - + TMailboxType::ReadAsFilled, 1)); if (common->OutgoingHandshakeInflightLimit) { // create handshake broker actor diff --git a/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp b/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp new file mode 100644 index 0000000000..14fcf25c2c --- /dev/null +++ b/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp @@ -0,0 +1,147 @@ +#include <library/cpp/actors/interconnect/outgoing_stream.h> +#include <library/cpp/testing/unittest/registar.h> +#include <util/random/entropy.h> +#include <util/stream/null.h> + +#define Ctest Cnull + +Y_UNIT_TEST_SUITE(OutgoingStream) { + Y_UNIT_TEST(Basic) { + std::vector<char> buffer; + buffer.resize(4 << 20); + + TReallyFastRng32 rng(EntropyPool()); + for (char *p = buffer.data(); p != buffer.data() + buffer.size(); p += sizeof(ui32)) { + *reinterpret_cast<ui32*>(p) = rng(); + } + + for (ui32 nIter = 0; nIter < 10; ++nIter) { + Cerr << "nIter# " << nIter << Endl; + + size_t base = 0; // number of dropped bytes + size_t sendOffset = 0; // offset to base + size_t pending = 0; // number of bytes in queue + + NInterconnect::TOutgoingStreamT<4096> stream; + + size_t numRewindsRemain = 10; + + while (base != buffer.size()) { + const size_t bytesToEnd = buffer.size() - (base + sendOffset); + + Ctest << "base# " << base << " sendOffset# " << sendOffset << " pending# " << pending + << " bytesToEnd# " << bytesToEnd; + + UNIT_ASSERT_VALUES_EQUAL(stream.CalculateOutgoingSize(), pending + sendOffset); + UNIT_ASSERT_VALUES_EQUAL(stream.CalculateUnsentSize(), pending); + + const size_t maxBuffers = 128; + std::vector<NActors::TConstIoVec> iov; + stream.ProduceIoVec(iov, maxBuffers); + size_t offset = base + sendOffset; + for (const auto& [ptr, len] : iov) { + UNIT_ASSERT(memcmp(buffer.data() + offset, ptr, len) == 0); + offset += len; + } + UNIT_ASSERT(iov.size() == maxBuffers || offset == base + sendOffset + pending); + + const char *nextData = buffer.data() + base + sendOffset + pending; + const size_t nextDataMaxLen = bytesToEnd - pending; + const size_t nextDataLen = nextDataMaxLen ? rng() % Min<size_t>(16384, nextDataMaxLen) + 1 : 0; + + if (size_t bytesToScan = sendOffset + pending) { + bytesToScan = rng() % bytesToScan + 1; + size_t offset = base + sendOffset + pending - bytesToScan; + stream.ScanLastBytes(bytesToScan, [&](TContiguousSpan span) { + UNIT_ASSERT(offset + span.size() <= base + sendOffset + pending); + UNIT_ASSERT(memcmp(buffer.data() + offset, span.data(), span.size()) == 0); + offset += span.size(); + }); + UNIT_ASSERT_VALUES_EQUAL(offset, base + sendOffset + pending); + } + + enum class EAction { + COPY_APPEND, + WRITE, + REF_APPEND, + ADVANCE, + REWIND, + DROP, + BOOKMARK + }; + + std::vector<EAction> actions; + if (nextDataLen) { + actions.push_back(EAction::COPY_APPEND); + actions.push_back(EAction::WRITE); + actions.push_back(EAction::REF_APPEND); + actions.push_back(EAction::BOOKMARK); + } + if (numRewindsRemain && sendOffset > 65536) { + actions.push_back(EAction::REWIND); + } + actions.push_back(EAction::ADVANCE); + actions.push_back(EAction::DROP); + + switch (actions[rng() % actions.size()]) { + case EAction::COPY_APPEND: { + Ctest << " COPY_APPEND nextDataLen# " << nextDataLen; + auto span = stream.AcquireSpanForWriting(nextDataLen); + UNIT_ASSERT(span.size() != 0); + memcpy(span.data(), nextData, span.size()); + stream.Append(span); + pending += span.size(); + break; + } + + case EAction::WRITE: + Ctest << " WRITE nextDataLen# " << nextDataLen; + stream.Write({nextData, nextDataLen}); + pending += nextDataLen; + break; + + case EAction::REF_APPEND: + Ctest << " REF_APPEND nextDataLen# " << nextDataLen; + stream.Append({nextData, nextDataLen}); + pending += nextDataLen; + break; + + case EAction::ADVANCE: { + const size_t advance = rng() % Min<size_t>(4096, pending + 1); + Ctest << " ADVANCE advance# " << advance; + stream.Advance(advance); + sendOffset += advance; + pending -= advance; + break; + } + + case EAction::REWIND: + Ctest << " REWIND"; + stream.Rewind(); + pending += sendOffset; + sendOffset = 0; + --numRewindsRemain; + break; + + case EAction::DROP: { + const size_t drop = rng() % Min<size_t>(65536, sendOffset + 1); + Ctest << " DROP drop# " << drop; + stream.DropFront(drop); + base += drop; + sendOffset -= drop; + break; + } + + case EAction::BOOKMARK: + Ctest << " BOOKMARK nextDataLen# " << nextDataLen; + auto bookmark = stream.Bookmark(nextDataLen); + stream.WriteBookmark(std::move(bookmark), {nextData, nextDataLen}); + pending += nextDataLen; + break; + } + + Ctest << Endl; + } + } + } +} |