diff options
author | alexvru <alexvru@ydb.tech> | 2023-05-03 13:36:18 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-05-03 13:36:18 +0300 |
commit | 2f1ea1c6577a0a24496ef74f28a9175e50eff1d0 (patch) | |
tree | 253245039ff4b30c0449de81a51340c1acb6b353 | |
parent | 6d2b034c88f51bebae7f6dff359e503174d39ffd (diff) | |
download | ydb-2f1ea1c6577a0a24496ef74f28a9175e50eff1d0.tar.gz |
Some IC fixes
9 files changed, 195 insertions, 134 deletions
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp index 7b35f07e01..f5a50cba83 100644 --- a/library/cpp/actors/core/event_pb.cpp +++ b/library/cpp/actors/core/event_pb.cpp @@ -77,24 +77,20 @@ namespace NActors { if (CancelFlag || AbortFlag) { return false; } else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) { - if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 2 * 64 && + if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 64 && (NumChunks == 0 || data != Chunks[NumChunks - 1].first + Chunks[NumChunks - 1].second)) { memcpy(BufferPtr, data, bytesToAppend); - if (!Produce(BufferPtr, bytesToAppend)) { return false; } - BufferPtr += bytesToAppend; - data = static_cast<const char*>(data) + bytesToAppend; - size -= bytesToAppend; } else { if (!Produce(data, bytesToAppend)) { return false; } - data = static_cast<const char*>(data) + bytesToAppend; - size -= bytesToAppend; } + data = static_cast<const char*>(data) + bytesToAppend; + size -= bytesToAppend; } else { InnerContext.SwitchTo(BufFeedContext); } diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index 232f440cad..374d11935e 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -46,8 +46,8 @@ namespace NActors { }; // append them to the packet - task.Write(false, &part, sizeof(part)); - task.Write(false, &descr, sizeof(descr)); + task.Write<false>(&part, sizeof(part)); + task.Write<false>(&descr, sizeof(descr)); *weightConsumed += amount; OutputQueueSize -= sizeof(TEventDescr2); @@ -75,7 +75,6 @@ namespace NActors { State = EState::BODY; Iter = event.Buffer->GetBeginIter(); SerializationInfo = &event.Buffer->GetSerializationInfo(); - EventInExternalDataChannel = !SerializationInfo->Sections.empty() && Params.UseExternalDataChannel; } else if (event.Event) { State = EState::BODY; IEventBase *base = event.Event.Get(); @@ -84,13 +83,12 @@ namespace NActors { } SerializationInfoContainer = base->CreateSerializationInfo(); SerializationInfo = &SerializationInfoContainer; - EventInExternalDataChannel = !SerializationInfo->Sections.empty() && Params.UseExternalDataChannel; } else { // event without buffer and IEventBase instance State = EState::DESCRIPTOR; SerializationInfoContainer = {}; SerializationInfo = &SerializationInfoContainer; - EventInExternalDataChannel = false; } + EventInExternalDataChannel = Params.UseExternalDataChannel && !SerializationInfo->Sections.empty(); if (!event.EventSerializedSize) { State = EState::DESCRIPTOR; } else if (EventInExternalDataChannel) { @@ -157,8 +155,8 @@ namespace NActors { .ChannelFlags = static_cast<ui16>(ChannelId | TChannelPart::XdcFlag), .Size = static_cast<ui16>(XdcData.size()) }; - task.Write(false, &part, sizeof(part)); - task.Write(false, XdcData.data(), XdcData.size()); + task.Write<false>(&part, sizeof(part)); + task.Write<false>(XdcData.data(), XdcData.size()); XdcData.clear(); if (SectionIndex == SerializationInfo->Sections.size()) { @@ -171,13 +169,14 @@ namespace NActors { } } - bool TEventOutputChannel::SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, bool external, size_t *bytesSerialized) { + template<bool External> + bool TEventOutputChannel::SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized) { auto addChunk = [&](const void *data, size_t len, bool allowCopy) { event.UpdateChecksum(data, len); - if (allowCopy && (reinterpret_cast<uintptr_t>(data) & 63) + len <= 2 * 64) { - task.Write(external, data, len); + if (allowCopy && (reinterpret_cast<uintptr_t>(data) & 63) + len <= 64) { + task.Write<External>(data, len); } else { - task.Append(external, data, len); + task.Append<External>(data, len); } *bytesSerialized += len; @@ -190,7 +189,7 @@ namespace NActors { bool complete = false; if (event.Event) { while (!complete) { - TMutableContiguousSpan out = task.AcquireSpanForWriting(external); + TMutableContiguousSpan out = task.AcquireSpanForWriting<External>(); if (!out.size()) { break; } @@ -204,7 +203,7 @@ namespace NActors { } } } else if (event.Buffer) { - while (const size_t numb = Min(external ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(), + while (const size_t numb = Min(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(), Iter.ContiguousSize())) { const char *obuf = Iter.ContiguousData(); addChunk(obuf, numb, true); @@ -214,11 +213,9 @@ namespace NActors { } else { Y_FAIL(); } - if (complete) { - Y_VERIFY(event.EventActuallySerialized == event.EventSerializedSize, - "EventActuallySerialized# %" PRIu32 " EventSerializedSize# %" PRIu32 " Type# 0x%08" PRIx32, - event.EventActuallySerialized, event.EventSerializedSize, event.Descr.Type); - } + Y_VERIFY(!complete || event.EventActuallySerialized == event.EventSerializedSize, + "EventActuallySerialized# %" PRIu32 " EventSerializedSize# %" PRIu32 " Type# 0x%08" PRIx32, + event.EventActuallySerialized, event.EventSerializedSize, event.Descr.Type); return complete; } @@ -237,7 +234,7 @@ namespace NActors { auto partBookmark = task.Bookmark(sizeof(TChannelPart)); size_t bytesSerialized = 0; - const bool complete = SerializeEvent(task, event, false, &bytesSerialized); + const bool complete = SerializeEvent<false>(task, event, &bytesSerialized); Y_VERIFY_DEBUG(bytesSerialized); Y_VERIFY(bytesSerialized <= Max<ui16>()); @@ -247,7 +244,7 @@ namespace NActors { .Size = static_cast<ui16>(bytesSerialized) }; - task.WriteBookmark(std::exchange(partBookmark, {}), &part, sizeof(part)); + task.WriteBookmark(std::move(partBookmark), &part, sizeof(part)); *weightConsumed += sizeof(TChannelPart) + part.Size; OutputQueueSize -= part.Size; @@ -263,7 +260,7 @@ namespace NActors { auto partBookmark = task.Bookmark(partSize); size_t bytesSerialized = 0; - const bool complete = SerializeEvent(task, event, true, &bytesSerialized); + const bool complete = SerializeEvent<true>(task, event, &bytesSerialized); Y_VERIFY(0 < bytesSerialized && bytesSerialized <= Max<ui16>()); diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 48074b05b9..2c5a286fe8 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -137,7 +137,8 @@ namespace NActors { size_t SectionIndex = 0; std::vector<char> XdcData; - bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, bool external, size_t *bytesSerialized); + template<bool External> + bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized); bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); bool FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index edcffea380..43253a2c5f 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -218,6 +218,7 @@ namespace NActors { // we have hit processing time limit for this message, send notification to resume processing a bit later TActivationContext::Send(new IEventHandle(EvResumeReceiveData, 0, SelfId(), {}, nullptr, 0)); enoughCpu = false; + ++CpuStarvationEvents; break; } @@ -639,15 +640,11 @@ namespace NActors { LWPROBE_IF_TOO_LONG(SlowICReadFromSocket, ms) { do { const ui64 begin = GetCycleCountFast(); -#ifndef _win_ if (num == 1) { recvres = socket.Recv(iov->Data, iov->Size, &err); } else { recvres = socket.ReadV(reinterpret_cast<const iovec*>(iov), num); } -#else - recvres = socket.Recv(iov->Data, iov->Size, &err); -#endif const ui64 end = GetCycleCountFast(); Metrics->IncRecvSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond()); } while (recvres == -EINTR); @@ -676,7 +673,7 @@ namespace NActors { bool TInputSessionTCP::ReadMore() { PreallocateBuffers(); - TStackVec<TIoVec, 16> buffs; + TStackVec<TIoVec, MaxBuffers> buffs; size_t offset = FirstBufferOffset; for (const auto& item : Buffers) { TIoVec iov{item->GetBuffer() + offset, item->GetCapacity() - offset}; @@ -684,6 +681,9 @@ namespace NActors { if (Params.Encryption) { break; // do not put more than one buffer in queue to prevent using ReadV } +#ifdef _win_ + break; // do the same thing for Windows build +#endif offset = 0; } @@ -998,6 +998,7 @@ namespace NActors { MON_VAR(BytesReadFromXdcSocket) MON_VAR(XdcSections) MON_VAR(XdcRefs) + MON_VAR(CpuStarvationEvents) MON_VAR(PayloadSize) MON_VAR(InboundPacketQ.size()) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 487a598ebc..0c592d236b 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -166,7 +166,7 @@ namespace NActors { // we can't issue more traffic now; GenerateTraffic will be called upon unblocking } else if (TotalOutputQueueSize >= 64 * 1024) { // output queue size is quite big to issue some traffic - GenerateTraffic(); + IssueRam(); } else if (!RamInQueue) { Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1); RamInQueue = new TEvRam(true); @@ -282,17 +282,11 @@ namespace NActors { OutgoingOffset = XdcOffset = Max<size_t>(); DropConfirmed(nextPacket); OutgoingStream.Rewind(); + OutOfBandStream = {}; XdcStream.Rewind(); OutgoingOffset = XdcOffset = 0; - ui64 serial = Max<ui64>(); - for (const auto& packet : SendQueue) { - if (packet.Data) { - serial = packet.Serial; - break; - } - } - + const ui64 serial = OutputCounter - SendQueue.size() + 1; Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed); LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " NextSerial# %" PRIu64, SendQueue.size(), LastConfirmed, serial); @@ -302,7 +296,7 @@ namespace NActors { LastHandshakeDone = TActivationContext::Now(); RamInQueue = nullptr; - GenerateTraffic(); + IssueRam(); } void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) { @@ -339,7 +333,7 @@ namespace NActors { // generate more traffic if we have unblocked state now if (unblockedSomething) { LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0); - GenerateTraffic(); + IssueRam(); } // if we haven't generated any packets, then make a lone Flush packet without any data @@ -375,6 +369,15 @@ namespace NActors { } } + void TInterconnectSessionTCP::IssueRam() { + if (!RamInQueue || (RamInQueue->Batching && Proxy->Common->Settings.BatchPeriod != TDuration::Zero())) { + RamInQueue = new TEvRam(false); + Send(SelfId(), RamInQueue); + LWPROBE(StartRam, Proxy->PeerNodeId); + RamStartedCycles = GetCycleCountFast(); + } + } + void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) { if (ev->Get() == RamInQueue) { LWPROBE(FinishRam, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0); @@ -387,26 +390,13 @@ namespace NActors { // generate ping request, if needed IssuePingRequest(); - if (RamInQueue && !RamInQueue->Batching) { - LWPROBE(SkipGenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - RamStartedCycles) * 1000.0); - return; // we'll do it a bit later - } else { - RamInQueue = nullptr; - } - LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic"); - // There is a tradeoff between fairness and efficiency. - // The less traffic is generated here, the less buffering is after fair scheduler, - // the more fair system is, the less latency is present. - // The more traffic is generated here, the less syscalls and actor-system overhead occurs, - // the less cpu is consumed. - static const ui64 generateLimit = 64 * 1024; - const ui64 sizeBefore = TotalOutputQueueSize; ui32 generatedPackets = 0; ui64 generatedBytes = 0; ui64 generateStarted = GetCycleCountFast(); + bool enoughCpu = true; // apply traffic changes auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); }; @@ -415,13 +405,14 @@ namespace NActors { // of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions // we exit cycle if (Socket) { + TTimeLimit limit(GetMaxCyclesPerEvent()); + while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData()) { - if (generatedBytes >= generateLimit) { + if (generatedPackets && limit.CheckExceeded()) { // resume later but ensure that we have issued at least one packet - RamInQueue = new TEvRam(false); - Send(SelfId(), RamInQueue); - RamStartedCycles = GetCycleCountFast(); - LWPROBE(StartRam, Proxy->PeerNodeId); + IssueRam(); + enoughCpu = false; + ++CpuStarvationEvents; break; } @@ -437,7 +428,7 @@ namespace NActors { } } - SetEnoughCpu(generatedBytes < generateLimit); + SetEnoughCpu(enoughCpu); if (Socket) { WriteData(); @@ -529,10 +520,10 @@ namespace NActors { bool readPending = false; if (msg->Socket == Socket) { - useful = ReceiveContext->MainWriteBlocked; + useful = std::exchange(ReceiveContext->MainWriteBlocked, false); readPending = ReceiveContext->MainReadPending; } else if (msg->Socket == XdcSocket) { - useful = ReceiveContext->XdcWriteBlocked; + useful = std::exchange(ReceiveContext->XdcWriteBlocked, false); readPending = ReceiveContext->XdcReadPending; } @@ -542,11 +533,11 @@ namespace NActors { Proxy->Metrics->IncSpuriousWriteWakeups(); } - GenerateTraffic(); - if (Params.Encryption && readPending && ev->Sender != ReceiverId) { Send(ReceiverId, ev->Release().Release()); } + + IssueRam(); } void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) { @@ -565,56 +556,101 @@ namespace NActors { } } - void TInterconnectSessionTCP::WriteData() { - Y_VERIFY(Socket); // ensure that socket wasn't closed + void TInterconnectSessionTCP::HandleWriteData() { + Y_VERIFY(WriteDataInFlight); + WriteDataInFlight = false; + if (!Socket) { + return; + } + + // total bytes written during this call + ui64 written = 0; auto process = [&](NInterconnect::TOutgoingStream& stream, const TIntrusivePtr<NInterconnect::TStreamSocket>& socket, - const TPollerToken::TPtr& token, bool *writeBlocked) { + const TPollerToken::TPtr& token, bool *writeBlocked, size_t maxBytes) { size_t totalWritten = 0; - *writeBlocked = false; - - while (stream && socket) { - ssize_t r = Write(stream, *socket); - if (r == -1) { + if (stream && socket && !*writeBlocked && maxBytes) { + if (const ssize_t r = Write(stream, *socket, maxBytes); r > 0) { + stream.Advance(r); + totalWritten += r; + } else if (r == -1) { *writeBlocked = true; if (token) { socket->Request(*token, false, true); } - break; } else if (r == 0) { - break; // error condition + // error condition + } else { + Y_UNREACHABLE(); } - - stream.Advance(r); - totalWritten += r; } + written += totalWritten; return totalWritten; }; - // total bytes written during this call - ui64 written = 0; + TTimeLimit limit(GetMaxCyclesPerEvent()); - if (const size_t w = process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked)) { - written += w; - BytesWrittenToSocket += w; - OutgoingOffset += w; - } + for (;;) { + bool progress = false; + + size_t bytesToSendInMain = Max<size_t>(); - if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked)) { - written += w; - XdcBytesSent += w; - XdcOffset += w; + if (OutOfBandStream) { + bytesToSendInMain = 0; + size_t offset = OutgoingOffset; + for (const TOutgoingPacket& packet : SendQueue) { + if (!offset) { + break; + } else if (offset < packet.PacketSize) { + bytesToSendInMain = packet.PacketSize - offset; + break; + } else { + offset -= packet.PacketSize; + } + } + } + + if (const size_t w = process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, bytesToSendInMain)) { + BytesWrittenToSocket += w; + OutgoingOffset += w; + progress = true; + bytesToSendInMain -= w; + if (OutOfBandStream) { + BytesAlignedForOutOfBand += w; + } + } + + if (!bytesToSendInMain) { + if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, Max<size_t>())) { + BytesWrittenToSocket += w; + OutOfBandBytesSent += w; + progress = true; + } + } + + if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, Max<size_t>())) { + XdcBytesSent += w; + XdcOffset += w; + progress = true; + } + + if (!progress) { + break; + } else if (limit.CheckExceeded()) { + WriteData(); + ++CpuStarvationEventsOnWriteData; + break; + } } if (written) { Proxy->Metrics->AddTotalBytesWritten(written); } - if (DropConfirmed(LastConfirmed) && !RamInQueue) { // issue GenerateTraffic a bit later - RamInQueue = new TEvRam(false); - Send(SelfId(), RamInQueue); + if (DropConfirmed(LastConfirmed)) { // issue GenerateTraffic a bit later + IssueRam(); } const bool writeBlockedByFullSendBuffer = ReceiveContext->MainWriteBlocked || ReceiveContext->XdcWriteBlocked; @@ -627,7 +663,15 @@ namespace NActors { WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer; } - ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket) { + void TInterconnectSessionTCP::WriteData() { + if (!WriteDataInFlight) { + WriteDataInFlight = true; + TActivationContext::Send(new IEventHandle(EvWriteData, 0, SelfId(), {}, nullptr, 0)); + } + } + + ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, + size_t maxBytes) { LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { constexpr ui32 iovLimit = 256; @@ -646,7 +690,7 @@ namespace NActors { TStackVec<TConstIoVec, iovLimit> wbuffers; - stream.ProduceIoVec(wbuffers, maxElementsInIOV); + stream.ProduceIoVec(wbuffers, maxElementsInIOV, maxBytes); Y_VERIFY(!wbuffers.empty()); TString err; @@ -731,12 +775,14 @@ namespace NActors { } ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { + NInterconnect::TOutgoingStream& stream = data ? OutgoingStream : OutOfBandStream; + #ifndef NDEBUG - const size_t outgoingStreamSizeBefore = OutgoingStream.CalculateOutgoingSize(); + const size_t outgoingStreamSizeBefore = stream.CalculateOutgoingSize(); const size_t xdcStreamSizeBefore = XdcStream.CalculateOutgoingSize(); #endif - TTcpPacketOutTask packet(Params, OutgoingStream, XdcStream); + TTcpPacketOutTask packet(Params, stream, XdcStream); ui64 serial = 0; if (data) { @@ -775,7 +821,7 @@ namespace NActors { const size_t packetSize = packet.GetPacketSize(); #ifndef NDEBUG - const size_t outgoingStreamSizeAfter = OutgoingStream.CalculateOutgoingSize(); + const size_t outgoingStreamSizeAfter = stream.CalculateOutgoingSize(); const size_t xdcStreamSizeAfter = XdcStream.CalculateOutgoingSize(); Y_VERIFY(outgoingStreamSizeAfter == outgoingStreamSizeBefore + packetSize && @@ -787,12 +833,12 @@ namespace NActors { #endif // put outgoing packet metadata here - SendQueue.push_back(TOutgoingPacket{ - static_cast<ui32>(packetSize), - static_cast<ui32>(packet.GetExternalSize()), - serial, - data - }); + if (data) { + SendQueue.push_back(TOutgoingPacket{ + static_cast<ui32>(packetSize), + static_cast<ui32>(packet.GetExternalSize()) + }); + } LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu" " InflightDataAmount# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(), InflightDataAmount); @@ -820,17 +866,17 @@ namespace NActors { // making Serial <= confirm true size_t bytesDropped = 0; size_t bytesDroppedFromXdc = 0; - for (; !SendQueue.empty(); SendQueue.pop_front()) { - auto& front = SendQueue.front(); - if (front.Data && confirm < front.Serial) { + ui64 frontPacketSerial = OutputCounter - SendQueue.size() + 1; + for (; !SendQueue.empty(); SendQueue.pop_front(), ++frontPacketSerial) { + if (confirm < frontPacketSerial) { break; } + + auto& front = SendQueue.front(); if (OutgoingOffset < front.PacketSize || XdcOffset < front.ExternalSize) { break; // packet wasn't actually sent to receiver, can't drop it now } - if (front.Data) { - lastDroppedSerial.emplace(front.Serial); - } + lastDroppedSerial.emplace(frontPacketSerial); OutgoingOffset -= front.PacketSize; XdcOffset -= front.ExternalSize; bytesDropped += front.PacketSize; @@ -1231,11 +1277,20 @@ namespace NActors { MON_VAR(OutgoingStream.GetSendQueueSize()) MON_VAR(OutgoingOffset) + MON_VAR(OutOfBandStream.CalculateOutgoingSize()) + MON_VAR(OutOfBandStream.CalculateUnsentSize()) + MON_VAR(OutOfBandStream.GetSendQueueSize()) + MON_VAR(BytesAlignedForOutOfBand) + MON_VAR(OutOfBandBytesSent) + MON_VAR(XdcStream.CalculateOutgoingSize()) MON_VAR(XdcStream.CalculateUnsentSize()) MON_VAR(XdcStream.GetSendQueueSize()) MON_VAR(XdcOffset) + MON_VAR(CpuStarvationEvents) + MON_VAR(CpuStarvationEventsOnWriteData) + TString clockSkew; i64 x = GetClockSkew(); if (x < 0) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index f07cbfa4eb..6c76fa8e3c 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -355,6 +355,8 @@ namespace NActors { ui64 XdcSections = 0; ui64 XdcRefs = 0; + ui64 CpuStarvationEvents = 0; + void GenerateHttpInfo(NMon::TEvHttpInfoRes::TPtr ev); }; @@ -368,6 +370,7 @@ namespace NActors { EvRam, EvTerminate, EvFreeItems, + EvWriteData, }; struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {}; @@ -393,6 +396,10 @@ namespace NActors { ui64 PacketsGenerated = 0; ui64 BytesWrittenToSocket = 0; ui64 PacketsConfirmed = 0; + ui64 BytesAlignedForOutOfBand = 0; + ui64 OutOfBandBytesSent = 0; + ui64 CpuStarvationEvents = 0; + ui64 CpuStarvationEventsOnWriteData = 0; public: static constexpr EActivityType ActorActivityType() { @@ -446,6 +453,7 @@ namespace NActors { hFunc(TEvSocketDisconnect, OnDisconnect) hFunc(TEvTerminate, Handle) hFunc(TEvProcessPingRequest, Handle) + cFunc(EvWriteData, HandleWriteData) ) void Handle(TEvUpdateFromInputSession::TPtr& ev); @@ -457,16 +465,20 @@ namespace NActors { TEvRam* RamInQueue = nullptr; ui64 RamStartedCycles = 0; + void IssueRam(); void HandleRam(TEvRam::TPtr& ev); void GenerateTraffic(); void SendUpdateToWhiteboard(bool connected = true); ui32 CalculateQueueUtilization(); + bool WriteDataInFlight = false; + void Handle(TEvPollerReady::TPtr& ev); void Handle(TEvPollerRegisterResult::TPtr ev); + void HandleWriteData(); void WriteData(); - ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket); + ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes); ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {}); void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial); @@ -528,13 +540,12 @@ namespace NActors { void SwitchStuckPeriod(); NInterconnect::TOutgoingStream OutgoingStream; + NInterconnect::TOutgoingStream OutOfBandStream; NInterconnect::TOutgoingStream XdcStream; struct TOutgoingPacket { ui32 PacketSize; // including header ui32 ExternalSize; - ui64 Serial; - bool Data; }; std::deque<TOutgoingPacket> SendQueue; // packet boundaries size_t OutgoingOffset = 0; diff --git a/library/cpp/actors/interconnect/outgoing_stream.h b/library/cpp/actors/interconnect/outgoing_stream.h index 7c32911807..011ecebb9a 100644 --- a/library/cpp/actors/interconnect/outgoing_stream.h +++ b/library/cpp/actors/interconnect/outgoing_stream.h @@ -137,12 +137,13 @@ namespace NInterconnect { } template<typename T> - void ProduceIoVec(T& container, size_t maxItems) { + void ProduceIoVec(T& container, size_t maxItems, size_t maxBytes) { size_t offset = SendOffset; - for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end() && std::size(container) < maxItems; ++it) { - const TContiguousSpan span = it->Span.SubSpan(offset, Max<size_t>()); + for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end() && std::size(container) < maxItems && maxBytes; ++it) { + const TContiguousSpan span = it->Span.SubSpan(offset, maxBytes); container.push_back(NActors::TConstIoVec{span.data(), span.size()}); offset = 0; + maxBytes -= span.size(); } } diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index b4d672ee31..2ef72b1c21 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -162,8 +162,9 @@ struct TTcpPacketOutTask : TNonCopyable { } // Acquire raw pointer to write some data. - TMutableContiguousSpan AcquireSpanForWriting(bool external) { - if (external) { + template<bool External> + TMutableContiguousSpan AcquireSpanForWriting() { + if (External) { return XdcStream.AcquireSpanForWriting(GetExternalFreeAmount()); } else { return OutgoingStream.AcquireSpanForWriting(GetInternalFreeAmount()); @@ -171,17 +172,19 @@ struct TTcpPacketOutTask : TNonCopyable { } // Append reference to some data (acquired previously or external pointer). - void Append(bool external, const void *buffer, size_t len) { - Y_VERIFY_DEBUG(len <= (external ? GetExternalFreeAmount() : GetInternalFreeAmount())); - (external ? ExternalSize : InternalSize) += len; - (external ? XdcStream : OutgoingStream).Append({static_cast<const char*>(buffer), len}); + template<bool External> + void Append(const void *buffer, size_t len) { + Y_VERIFY_DEBUG(len <= (External ? GetExternalFreeAmount() : GetInternalFreeAmount())); + (External ? ExternalSize : InternalSize) += len; + (External ? XdcStream : OutgoingStream).Append({static_cast<const char*>(buffer), len}); } // Write some data with copying. - void Write(bool external, const void *buffer, size_t len) { - Y_VERIFY_DEBUG(len <= (external ? GetExternalFreeAmount() : GetInternalFreeAmount())); - (external ? ExternalSize : InternalSize) += len; - (external ? XdcStream : OutgoingStream).Write({static_cast<const char*>(buffer), len}); + template<bool External> + void Write(const void *buffer, size_t len) { + Y_VERIFY_DEBUG(len <= (External ? GetExternalFreeAmount() : GetInternalFreeAmount())); + (External ? ExternalSize : InternalSize) += len; + (External ? XdcStream : OutgoingStream).Write({static_cast<const char*>(buffer), len}); } void Finish(ui64 serial, ui64 confirm) { @@ -198,15 +201,11 @@ struct TTcpPacketOutTask : TNonCopyable { // pre-write header without checksum for correct checksum calculation WriteBookmark(NInterconnect::TOutgoingStream::TBookmark(HeaderBookmark), &header, sizeof(header)); - size_t total = 0; ui32 checksum = 0; OutgoingStream.ScanLastBytes(GetPacketSize(), [&](TContiguousSpan span) { checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size()); - total += span.size(); }); header.Checksum = checksum; - Y_VERIFY(total == GetPacketSize(), "total# %zu InternalSize# %zu GetPacketSize# %zu", total, InternalSize, - GetPacketSize()); } WriteBookmark(std::exchange(HeaderBookmark, {}), &header, sizeof(header)); diff --git a/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp b/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp index 14fcf25c2c..4834e48765 100644 --- a/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp +++ b/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp @@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(OutgoingStream) { const size_t maxBuffers = 128; std::vector<NActors::TConstIoVec> iov; - stream.ProduceIoVec(iov, maxBuffers); + stream.ProduceIoVec(iov, maxBuffers, Max<size_t>()); size_t offset = base + sendOffset; for (const auto& [ptr, len] : iov) { UNIT_ASSERT(memcmp(buffer.data() + offset, ptr, len) == 0); |