aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-05-03 13:36:18 +0300
committeralexvru <alexvru@ydb.tech>2023-05-03 13:36:18 +0300
commit2f1ea1c6577a0a24496ef74f28a9175e50eff1d0 (patch)
tree253245039ff4b30c0449de81a51340c1acb6b353
parent6d2b034c88f51bebae7f6dff359e503174d39ffd (diff)
downloadydb-2f1ea1c6577a0a24496ef74f28a9175e50eff1d0.tar.gz
Some IC fixes
-rw-r--r--library/cpp/actors/core/event_pb.cpp10
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp39
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h3
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp11
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp213
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h17
-rw-r--r--library/cpp/actors/interconnect/outgoing_stream.h7
-rw-r--r--library/cpp/actors/interconnect/packet.h27
-rw-r--r--library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp2
9 files changed, 195 insertions, 134 deletions
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp
index 7b35f07e01..f5a50cba83 100644
--- a/library/cpp/actors/core/event_pb.cpp
+++ b/library/cpp/actors/core/event_pb.cpp
@@ -77,24 +77,20 @@ namespace NActors {
if (CancelFlag || AbortFlag) {
return false;
} else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) {
- if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 2 * 64 &&
+ if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 64 &&
(NumChunks == 0 || data != Chunks[NumChunks - 1].first + Chunks[NumChunks - 1].second)) {
memcpy(BufferPtr, data, bytesToAppend);
-
if (!Produce(BufferPtr, bytesToAppend)) {
return false;
}
-
BufferPtr += bytesToAppend;
- data = static_cast<const char*>(data) + bytesToAppend;
- size -= bytesToAppend;
} else {
if (!Produce(data, bytesToAppend)) {
return false;
}
- data = static_cast<const char*>(data) + bytesToAppend;
- size -= bytesToAppend;
}
+ data = static_cast<const char*>(data) + bytesToAppend;
+ size -= bytesToAppend;
} else {
InnerContext.SwitchTo(BufFeedContext);
}
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index 232f440cad..374d11935e 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -46,8 +46,8 @@ namespace NActors {
};
// append them to the packet
- task.Write(false, &part, sizeof(part));
- task.Write(false, &descr, sizeof(descr));
+ task.Write<false>(&part, sizeof(part));
+ task.Write<false>(&descr, sizeof(descr));
*weightConsumed += amount;
OutputQueueSize -= sizeof(TEventDescr2);
@@ -75,7 +75,6 @@ namespace NActors {
State = EState::BODY;
Iter = event.Buffer->GetBeginIter();
SerializationInfo = &event.Buffer->GetSerializationInfo();
- EventInExternalDataChannel = !SerializationInfo->Sections.empty() && Params.UseExternalDataChannel;
} else if (event.Event) {
State = EState::BODY;
IEventBase *base = event.Event.Get();
@@ -84,13 +83,12 @@ namespace NActors {
}
SerializationInfoContainer = base->CreateSerializationInfo();
SerializationInfo = &SerializationInfoContainer;
- EventInExternalDataChannel = !SerializationInfo->Sections.empty() && Params.UseExternalDataChannel;
} else { // event without buffer and IEventBase instance
State = EState::DESCRIPTOR;
SerializationInfoContainer = {};
SerializationInfo = &SerializationInfoContainer;
- EventInExternalDataChannel = false;
}
+ EventInExternalDataChannel = Params.UseExternalDataChannel && !SerializationInfo->Sections.empty();
if (!event.EventSerializedSize) {
State = EState::DESCRIPTOR;
} else if (EventInExternalDataChannel) {
@@ -157,8 +155,8 @@ namespace NActors {
.ChannelFlags = static_cast<ui16>(ChannelId | TChannelPart::XdcFlag),
.Size = static_cast<ui16>(XdcData.size())
};
- task.Write(false, &part, sizeof(part));
- task.Write(false, XdcData.data(), XdcData.size());
+ task.Write<false>(&part, sizeof(part));
+ task.Write<false>(XdcData.data(), XdcData.size());
XdcData.clear();
if (SectionIndex == SerializationInfo->Sections.size()) {
@@ -171,13 +169,14 @@ namespace NActors {
}
}
- bool TEventOutputChannel::SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, bool external, size_t *bytesSerialized) {
+ template<bool External>
+ bool TEventOutputChannel::SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized) {
auto addChunk = [&](const void *data, size_t len, bool allowCopy) {
event.UpdateChecksum(data, len);
- if (allowCopy && (reinterpret_cast<uintptr_t>(data) & 63) + len <= 2 * 64) {
- task.Write(external, data, len);
+ if (allowCopy && (reinterpret_cast<uintptr_t>(data) & 63) + len <= 64) {
+ task.Write<External>(data, len);
} else {
- task.Append(external, data, len);
+ task.Append<External>(data, len);
}
*bytesSerialized += len;
@@ -190,7 +189,7 @@ namespace NActors {
bool complete = false;
if (event.Event) {
while (!complete) {
- TMutableContiguousSpan out = task.AcquireSpanForWriting(external);
+ TMutableContiguousSpan out = task.AcquireSpanForWriting<External>();
if (!out.size()) {
break;
}
@@ -204,7 +203,7 @@ namespace NActors {
}
}
} else if (event.Buffer) {
- while (const size_t numb = Min(external ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(),
+ while (const size_t numb = Min(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(),
Iter.ContiguousSize())) {
const char *obuf = Iter.ContiguousData();
addChunk(obuf, numb, true);
@@ -214,11 +213,9 @@ namespace NActors {
} else {
Y_FAIL();
}
- if (complete) {
- Y_VERIFY(event.EventActuallySerialized == event.EventSerializedSize,
- "EventActuallySerialized# %" PRIu32 " EventSerializedSize# %" PRIu32 " Type# 0x%08" PRIx32,
- event.EventActuallySerialized, event.EventSerializedSize, event.Descr.Type);
- }
+ Y_VERIFY(!complete || event.EventActuallySerialized == event.EventSerializedSize,
+ "EventActuallySerialized# %" PRIu32 " EventSerializedSize# %" PRIu32 " Type# 0x%08" PRIx32,
+ event.EventActuallySerialized, event.EventSerializedSize, event.Descr.Type);
return complete;
}
@@ -237,7 +234,7 @@ namespace NActors {
auto partBookmark = task.Bookmark(sizeof(TChannelPart));
size_t bytesSerialized = 0;
- const bool complete = SerializeEvent(task, event, false, &bytesSerialized);
+ const bool complete = SerializeEvent<false>(task, event, &bytesSerialized);
Y_VERIFY_DEBUG(bytesSerialized);
Y_VERIFY(bytesSerialized <= Max<ui16>());
@@ -247,7 +244,7 @@ namespace NActors {
.Size = static_cast<ui16>(bytesSerialized)
};
- task.WriteBookmark(std::exchange(partBookmark, {}), &part, sizeof(part));
+ task.WriteBookmark(std::move(partBookmark), &part, sizeof(part));
*weightConsumed += sizeof(TChannelPart) + part.Size;
OutputQueueSize -= part.Size;
@@ -263,7 +260,7 @@ namespace NActors {
auto partBookmark = task.Bookmark(partSize);
size_t bytesSerialized = 0;
- const bool complete = SerializeEvent(task, event, true, &bytesSerialized);
+ const bool complete = SerializeEvent<true>(task, event, &bytesSerialized);
Y_VERIFY(0 < bytesSerialized && bytesSerialized <= Max<ui16>());
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 48074b05b9..2c5a286fe8 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -137,7 +137,8 @@ namespace NActors {
size_t SectionIndex = 0;
std::vector<char> XdcData;
- bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, bool external, size_t *bytesSerialized);
+ template<bool External>
+ bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized);
bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
bool FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index edcffea380..43253a2c5f 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -218,6 +218,7 @@ namespace NActors {
// we have hit processing time limit for this message, send notification to resume processing a bit later
TActivationContext::Send(new IEventHandle(EvResumeReceiveData, 0, SelfId(), {}, nullptr, 0));
enoughCpu = false;
+ ++CpuStarvationEvents;
break;
}
@@ -639,15 +640,11 @@ namespace NActors {
LWPROBE_IF_TOO_LONG(SlowICReadFromSocket, ms) {
do {
const ui64 begin = GetCycleCountFast();
-#ifndef _win_
if (num == 1) {
recvres = socket.Recv(iov->Data, iov->Size, &err);
} else {
recvres = socket.ReadV(reinterpret_cast<const iovec*>(iov), num);
}
-#else
- recvres = socket.Recv(iov->Data, iov->Size, &err);
-#endif
const ui64 end = GetCycleCountFast();
Metrics->IncRecvSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond());
} while (recvres == -EINTR);
@@ -676,7 +673,7 @@ namespace NActors {
bool TInputSessionTCP::ReadMore() {
PreallocateBuffers();
- TStackVec<TIoVec, 16> buffs;
+ TStackVec<TIoVec, MaxBuffers> buffs;
size_t offset = FirstBufferOffset;
for (const auto& item : Buffers) {
TIoVec iov{item->GetBuffer() + offset, item->GetCapacity() - offset};
@@ -684,6 +681,9 @@ namespace NActors {
if (Params.Encryption) {
break; // do not put more than one buffer in queue to prevent using ReadV
}
+#ifdef _win_
+ break; // do the same thing for Windows build
+#endif
offset = 0;
}
@@ -998,6 +998,7 @@ namespace NActors {
MON_VAR(BytesReadFromXdcSocket)
MON_VAR(XdcSections)
MON_VAR(XdcRefs)
+ MON_VAR(CpuStarvationEvents)
MON_VAR(PayloadSize)
MON_VAR(InboundPacketQ.size())
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 487a598ebc..0c592d236b 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -166,7 +166,7 @@ namespace NActors {
// we can't issue more traffic now; GenerateTraffic will be called upon unblocking
} else if (TotalOutputQueueSize >= 64 * 1024) {
// output queue size is quite big to issue some traffic
- GenerateTraffic();
+ IssueRam();
} else if (!RamInQueue) {
Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1);
RamInQueue = new TEvRam(true);
@@ -282,17 +282,11 @@ namespace NActors {
OutgoingOffset = XdcOffset = Max<size_t>();
DropConfirmed(nextPacket);
OutgoingStream.Rewind();
+ OutOfBandStream = {};
XdcStream.Rewind();
OutgoingOffset = XdcOffset = 0;
- ui64 serial = Max<ui64>();
- for (const auto& packet : SendQueue) {
- if (packet.Data) {
- serial = packet.Serial;
- break;
- }
- }
-
+ const ui64 serial = OutputCounter - SendQueue.size() + 1;
Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed);
LOG_DEBUG_IC_SESSION("ICS06", "rewind SendQueue size# %zu LastConfirmed# %" PRIu64 " NextSerial# %" PRIu64,
SendQueue.size(), LastConfirmed, serial);
@@ -302,7 +296,7 @@ namespace NActors {
LastHandshakeDone = TActivationContext::Now();
RamInQueue = nullptr;
- GenerateTraffic();
+ IssueRam();
}
void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) {
@@ -339,7 +333,7 @@ namespace NActors {
// generate more traffic if we have unblocked state now
if (unblockedSomething) {
LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
- GenerateTraffic();
+ IssueRam();
}
// if we haven't generated any packets, then make a lone Flush packet without any data
@@ -375,6 +369,15 @@ namespace NActors {
}
}
+ void TInterconnectSessionTCP::IssueRam() {
+ if (!RamInQueue || (RamInQueue->Batching && Proxy->Common->Settings.BatchPeriod != TDuration::Zero())) {
+ RamInQueue = new TEvRam(false);
+ Send(SelfId(), RamInQueue);
+ LWPROBE(StartRam, Proxy->PeerNodeId);
+ RamStartedCycles = GetCycleCountFast();
+ }
+ }
+
void TInterconnectSessionTCP::HandleRam(TEvRam::TPtr& ev) {
if (ev->Get() == RamInQueue) {
LWPROBE(FinishRam, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
@@ -387,26 +390,13 @@ namespace NActors {
// generate ping request, if needed
IssuePingRequest();
- if (RamInQueue && !RamInQueue->Batching) {
- LWPROBE(SkipGenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - RamStartedCycles) * 1000.0);
- return; // we'll do it a bit later
- } else {
- RamInQueue = nullptr;
- }
-
LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic");
- // There is a tradeoff between fairness and efficiency.
- // The less traffic is generated here, the less buffering is after fair scheduler,
- // the more fair system is, the less latency is present.
- // The more traffic is generated here, the less syscalls and actor-system overhead occurs,
- // the less cpu is consumed.
- static const ui64 generateLimit = 64 * 1024;
-
const ui64 sizeBefore = TotalOutputQueueSize;
ui32 generatedPackets = 0;
ui64 generatedBytes = 0;
ui64 generateStarted = GetCycleCountFast();
+ bool enoughCpu = true;
// apply traffic changes
auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); };
@@ -415,13 +405,14 @@ namespace NActors {
// of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions
// we exit cycle
if (Socket) {
+ TTimeLimit limit(GetMaxCyclesPerEvent());
+
while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData()) {
- if (generatedBytes >= generateLimit) {
+ if (generatedPackets && limit.CheckExceeded()) {
// resume later but ensure that we have issued at least one packet
- RamInQueue = new TEvRam(false);
- Send(SelfId(), RamInQueue);
- RamStartedCycles = GetCycleCountFast();
- LWPROBE(StartRam, Proxy->PeerNodeId);
+ IssueRam();
+ enoughCpu = false;
+ ++CpuStarvationEvents;
break;
}
@@ -437,7 +428,7 @@ namespace NActors {
}
}
- SetEnoughCpu(generatedBytes < generateLimit);
+ SetEnoughCpu(enoughCpu);
if (Socket) {
WriteData();
@@ -529,10 +520,10 @@ namespace NActors {
bool readPending = false;
if (msg->Socket == Socket) {
- useful = ReceiveContext->MainWriteBlocked;
+ useful = std::exchange(ReceiveContext->MainWriteBlocked, false);
readPending = ReceiveContext->MainReadPending;
} else if (msg->Socket == XdcSocket) {
- useful = ReceiveContext->XdcWriteBlocked;
+ useful = std::exchange(ReceiveContext->XdcWriteBlocked, false);
readPending = ReceiveContext->XdcReadPending;
}
@@ -542,11 +533,11 @@ namespace NActors {
Proxy->Metrics->IncSpuriousWriteWakeups();
}
- GenerateTraffic();
-
if (Params.Encryption && readPending && ev->Sender != ReceiverId) {
Send(ReceiverId, ev->Release().Release());
}
+
+ IssueRam();
}
void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
@@ -565,56 +556,101 @@ namespace NActors {
}
}
- void TInterconnectSessionTCP::WriteData() {
- Y_VERIFY(Socket); // ensure that socket wasn't closed
+ void TInterconnectSessionTCP::HandleWriteData() {
+ Y_VERIFY(WriteDataInFlight);
+ WriteDataInFlight = false;
+ if (!Socket) {
+ return;
+ }
+
+ // total bytes written during this call
+ ui64 written = 0;
auto process = [&](NInterconnect::TOutgoingStream& stream, const TIntrusivePtr<NInterconnect::TStreamSocket>& socket,
- const TPollerToken::TPtr& token, bool *writeBlocked) {
+ const TPollerToken::TPtr& token, bool *writeBlocked, size_t maxBytes) {
size_t totalWritten = 0;
- *writeBlocked = false;
-
- while (stream && socket) {
- ssize_t r = Write(stream, *socket);
- if (r == -1) {
+ if (stream && socket && !*writeBlocked && maxBytes) {
+ if (const ssize_t r = Write(stream, *socket, maxBytes); r > 0) {
+ stream.Advance(r);
+ totalWritten += r;
+ } else if (r == -1) {
*writeBlocked = true;
if (token) {
socket->Request(*token, false, true);
}
- break;
} else if (r == 0) {
- break; // error condition
+ // error condition
+ } else {
+ Y_UNREACHABLE();
}
-
- stream.Advance(r);
- totalWritten += r;
}
+ written += totalWritten;
return totalWritten;
};
- // total bytes written during this call
- ui64 written = 0;
+ TTimeLimit limit(GetMaxCyclesPerEvent());
- if (const size_t w = process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked)) {
- written += w;
- BytesWrittenToSocket += w;
- OutgoingOffset += w;
- }
+ for (;;) {
+ bool progress = false;
+
+ size_t bytesToSendInMain = Max<size_t>();
- if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked)) {
- written += w;
- XdcBytesSent += w;
- XdcOffset += w;
+ if (OutOfBandStream) {
+ bytesToSendInMain = 0;
+ size_t offset = OutgoingOffset;
+ for (const TOutgoingPacket& packet : SendQueue) {
+ if (!offset) {
+ break;
+ } else if (offset < packet.PacketSize) {
+ bytesToSendInMain = packet.PacketSize - offset;
+ break;
+ } else {
+ offset -= packet.PacketSize;
+ }
+ }
+ }
+
+ if (const size_t w = process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, bytesToSendInMain)) {
+ BytesWrittenToSocket += w;
+ OutgoingOffset += w;
+ progress = true;
+ bytesToSendInMain -= w;
+ if (OutOfBandStream) {
+ BytesAlignedForOutOfBand += w;
+ }
+ }
+
+ if (!bytesToSendInMain) {
+ if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, Max<size_t>())) {
+ BytesWrittenToSocket += w;
+ OutOfBandBytesSent += w;
+ progress = true;
+ }
+ }
+
+ if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, Max<size_t>())) {
+ XdcBytesSent += w;
+ XdcOffset += w;
+ progress = true;
+ }
+
+ if (!progress) {
+ break;
+ } else if (limit.CheckExceeded()) {
+ WriteData();
+ ++CpuStarvationEventsOnWriteData;
+ break;
+ }
}
if (written) {
Proxy->Metrics->AddTotalBytesWritten(written);
}
- if (DropConfirmed(LastConfirmed) && !RamInQueue) { // issue GenerateTraffic a bit later
- RamInQueue = new TEvRam(false);
- Send(SelfId(), RamInQueue);
+ if (DropConfirmed(LastConfirmed)) { // issue GenerateTraffic a bit later
+ IssueRam();
}
const bool writeBlockedByFullSendBuffer = ReceiveContext->MainWriteBlocked || ReceiveContext->XdcWriteBlocked;
@@ -627,7 +663,15 @@ namespace NActors {
WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer;
}
- ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket) {
+ void TInterconnectSessionTCP::WriteData() {
+ if (!WriteDataInFlight) {
+ WriteDataInFlight = true;
+ TActivationContext::Send(new IEventHandle(EvWriteData, 0, SelfId(), {}, nullptr, 0));
+ }
+ }
+
+ ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket,
+ size_t maxBytes) {
LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
constexpr ui32 iovLimit = 256;
@@ -646,7 +690,7 @@ namespace NActors {
TStackVec<TConstIoVec, iovLimit> wbuffers;
- stream.ProduceIoVec(wbuffers, maxElementsInIOV);
+ stream.ProduceIoVec(wbuffers, maxElementsInIOV, maxBytes);
Y_VERIFY(!wbuffers.empty());
TString err;
@@ -731,12 +775,14 @@ namespace NActors {
}
ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
+ NInterconnect::TOutgoingStream& stream = data ? OutgoingStream : OutOfBandStream;
+
#ifndef NDEBUG
- const size_t outgoingStreamSizeBefore = OutgoingStream.CalculateOutgoingSize();
+ const size_t outgoingStreamSizeBefore = stream.CalculateOutgoingSize();
const size_t xdcStreamSizeBefore = XdcStream.CalculateOutgoingSize();
#endif
- TTcpPacketOutTask packet(Params, OutgoingStream, XdcStream);
+ TTcpPacketOutTask packet(Params, stream, XdcStream);
ui64 serial = 0;
if (data) {
@@ -775,7 +821,7 @@ namespace NActors {
const size_t packetSize = packet.GetPacketSize();
#ifndef NDEBUG
- const size_t outgoingStreamSizeAfter = OutgoingStream.CalculateOutgoingSize();
+ const size_t outgoingStreamSizeAfter = stream.CalculateOutgoingSize();
const size_t xdcStreamSizeAfter = XdcStream.CalculateOutgoingSize();
Y_VERIFY(outgoingStreamSizeAfter == outgoingStreamSizeBefore + packetSize &&
@@ -787,12 +833,12 @@ namespace NActors {
#endif
// put outgoing packet metadata here
- SendQueue.push_back(TOutgoingPacket{
- static_cast<ui32>(packetSize),
- static_cast<ui32>(packet.GetExternalSize()),
- serial,
- data
- });
+ if (data) {
+ SendQueue.push_back(TOutgoingPacket{
+ static_cast<ui32>(packetSize),
+ static_cast<ui32>(packet.GetExternalSize())
+ });
+ }
LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu"
" InflightDataAmount# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(), InflightDataAmount);
@@ -820,17 +866,17 @@ namespace NActors {
// making Serial <= confirm true
size_t bytesDropped = 0;
size_t bytesDroppedFromXdc = 0;
- for (; !SendQueue.empty(); SendQueue.pop_front()) {
- auto& front = SendQueue.front();
- if (front.Data && confirm < front.Serial) {
+ ui64 frontPacketSerial = OutputCounter - SendQueue.size() + 1;
+ for (; !SendQueue.empty(); SendQueue.pop_front(), ++frontPacketSerial) {
+ if (confirm < frontPacketSerial) {
break;
}
+
+ auto& front = SendQueue.front();
if (OutgoingOffset < front.PacketSize || XdcOffset < front.ExternalSize) {
break; // packet wasn't actually sent to receiver, can't drop it now
}
- if (front.Data) {
- lastDroppedSerial.emplace(front.Serial);
- }
+ lastDroppedSerial.emplace(frontPacketSerial);
OutgoingOffset -= front.PacketSize;
XdcOffset -= front.ExternalSize;
bytesDropped += front.PacketSize;
@@ -1231,11 +1277,20 @@ namespace NActors {
MON_VAR(OutgoingStream.GetSendQueueSize())
MON_VAR(OutgoingOffset)
+ MON_VAR(OutOfBandStream.CalculateOutgoingSize())
+ MON_VAR(OutOfBandStream.CalculateUnsentSize())
+ MON_VAR(OutOfBandStream.GetSendQueueSize())
+ MON_VAR(BytesAlignedForOutOfBand)
+ MON_VAR(OutOfBandBytesSent)
+
MON_VAR(XdcStream.CalculateOutgoingSize())
MON_VAR(XdcStream.CalculateUnsentSize())
MON_VAR(XdcStream.GetSendQueueSize())
MON_VAR(XdcOffset)
+ MON_VAR(CpuStarvationEvents)
+ MON_VAR(CpuStarvationEventsOnWriteData)
+
TString clockSkew;
i64 x = GetClockSkew();
if (x < 0) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index f07cbfa4eb..6c76fa8e3c 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -355,6 +355,8 @@ namespace NActors {
ui64 XdcSections = 0;
ui64 XdcRefs = 0;
+ ui64 CpuStarvationEvents = 0;
+
void GenerateHttpInfo(NMon::TEvHttpInfoRes::TPtr ev);
};
@@ -368,6 +370,7 @@ namespace NActors {
EvRam,
EvTerminate,
EvFreeItems,
+ EvWriteData,
};
struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {};
@@ -393,6 +396,10 @@ namespace NActors {
ui64 PacketsGenerated = 0;
ui64 BytesWrittenToSocket = 0;
ui64 PacketsConfirmed = 0;
+ ui64 BytesAlignedForOutOfBand = 0;
+ ui64 OutOfBandBytesSent = 0;
+ ui64 CpuStarvationEvents = 0;
+ ui64 CpuStarvationEventsOnWriteData = 0;
public:
static constexpr EActivityType ActorActivityType() {
@@ -446,6 +453,7 @@ namespace NActors {
hFunc(TEvSocketDisconnect, OnDisconnect)
hFunc(TEvTerminate, Handle)
hFunc(TEvProcessPingRequest, Handle)
+ cFunc(EvWriteData, HandleWriteData)
)
void Handle(TEvUpdateFromInputSession::TPtr& ev);
@@ -457,16 +465,20 @@ namespace NActors {
TEvRam* RamInQueue = nullptr;
ui64 RamStartedCycles = 0;
+ void IssueRam();
void HandleRam(TEvRam::TPtr& ev);
void GenerateTraffic();
void SendUpdateToWhiteboard(bool connected = true);
ui32 CalculateQueueUtilization();
+ bool WriteDataInFlight = false;
+
void Handle(TEvPollerReady::TPtr& ev);
void Handle(TEvPollerRegisterResult::TPtr ev);
+ void HandleWriteData();
void WriteData();
- ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket);
+ ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes);
ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {});
void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial);
@@ -528,13 +540,12 @@ namespace NActors {
void SwitchStuckPeriod();
NInterconnect::TOutgoingStream OutgoingStream;
+ NInterconnect::TOutgoingStream OutOfBandStream;
NInterconnect::TOutgoingStream XdcStream;
struct TOutgoingPacket {
ui32 PacketSize; // including header
ui32 ExternalSize;
- ui64 Serial;
- bool Data;
};
std::deque<TOutgoingPacket> SendQueue; // packet boundaries
size_t OutgoingOffset = 0;
diff --git a/library/cpp/actors/interconnect/outgoing_stream.h b/library/cpp/actors/interconnect/outgoing_stream.h
index 7c32911807..011ecebb9a 100644
--- a/library/cpp/actors/interconnect/outgoing_stream.h
+++ b/library/cpp/actors/interconnect/outgoing_stream.h
@@ -137,12 +137,13 @@ namespace NInterconnect {
}
template<typename T>
- void ProduceIoVec(T& container, size_t maxItems) {
+ void ProduceIoVec(T& container, size_t maxItems, size_t maxBytes) {
size_t offset = SendOffset;
- for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end() && std::size(container) < maxItems; ++it) {
- const TContiguousSpan span = it->Span.SubSpan(offset, Max<size_t>());
+ for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end() && std::size(container) < maxItems && maxBytes; ++it) {
+ const TContiguousSpan span = it->Span.SubSpan(offset, maxBytes);
container.push_back(NActors::TConstIoVec{span.data(), span.size()});
offset = 0;
+ maxBytes -= span.size();
}
}
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index b4d672ee31..2ef72b1c21 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -162,8 +162,9 @@ struct TTcpPacketOutTask : TNonCopyable {
}
// Acquire raw pointer to write some data.
- TMutableContiguousSpan AcquireSpanForWriting(bool external) {
- if (external) {
+ template<bool External>
+ TMutableContiguousSpan AcquireSpanForWriting() {
+ if (External) {
return XdcStream.AcquireSpanForWriting(GetExternalFreeAmount());
} else {
return OutgoingStream.AcquireSpanForWriting(GetInternalFreeAmount());
@@ -171,17 +172,19 @@ struct TTcpPacketOutTask : TNonCopyable {
}
// Append reference to some data (acquired previously or external pointer).
- void Append(bool external, const void *buffer, size_t len) {
- Y_VERIFY_DEBUG(len <= (external ? GetExternalFreeAmount() : GetInternalFreeAmount()));
- (external ? ExternalSize : InternalSize) += len;
- (external ? XdcStream : OutgoingStream).Append({static_cast<const char*>(buffer), len});
+ template<bool External>
+ void Append(const void *buffer, size_t len) {
+ Y_VERIFY_DEBUG(len <= (External ? GetExternalFreeAmount() : GetInternalFreeAmount()));
+ (External ? ExternalSize : InternalSize) += len;
+ (External ? XdcStream : OutgoingStream).Append({static_cast<const char*>(buffer), len});
}
// Write some data with copying.
- void Write(bool external, const void *buffer, size_t len) {
- Y_VERIFY_DEBUG(len <= (external ? GetExternalFreeAmount() : GetInternalFreeAmount()));
- (external ? ExternalSize : InternalSize) += len;
- (external ? XdcStream : OutgoingStream).Write({static_cast<const char*>(buffer), len});
+ template<bool External>
+ void Write(const void *buffer, size_t len) {
+ Y_VERIFY_DEBUG(len <= (External ? GetExternalFreeAmount() : GetInternalFreeAmount()));
+ (External ? ExternalSize : InternalSize) += len;
+ (External ? XdcStream : OutgoingStream).Write({static_cast<const char*>(buffer), len});
}
void Finish(ui64 serial, ui64 confirm) {
@@ -198,15 +201,11 @@ struct TTcpPacketOutTask : TNonCopyable {
// pre-write header without checksum for correct checksum calculation
WriteBookmark(NInterconnect::TOutgoingStream::TBookmark(HeaderBookmark), &header, sizeof(header));
- size_t total = 0;
ui32 checksum = 0;
OutgoingStream.ScanLastBytes(GetPacketSize(), [&](TContiguousSpan span) {
checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size());
- total += span.size();
});
header.Checksum = checksum;
- Y_VERIFY(total == GetPacketSize(), "total# %zu InternalSize# %zu GetPacketSize# %zu", total, InternalSize,
- GetPacketSize());
}
WriteBookmark(std::exchange(HeaderBookmark, {}), &header, sizeof(header));
diff --git a/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp b/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp
index 14fcf25c2c..4834e48765 100644
--- a/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/outgoing_stream_ut.cpp
@@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(OutgoingStream) {
const size_t maxBuffers = 128;
std::vector<NActors::TConstIoVec> iov;
- stream.ProduceIoVec(iov, maxBuffers);
+ stream.ProduceIoVec(iov, maxBuffers, Max<size_t>());
size_t offset = base + sendOffset;
for (const auto& [ptr, len] : iov) {
UNIT_ASSERT(memcmp(buffer.data() + offset, ptr, len) == 0);