about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: alexvru <alexvru@ydb.tech> 2023-05-03 16:24:46 +0300
committer: alexvru <alexvru@ydb.tech> 2023-05-03 16:24:46 +0300
commit: f323f855449f04b5a243bf5eeb48320e834a4009 (patch)
tree: a2cd846c0be022fb9b1530613f77681d545eceb0
parent: 200c355ded38bd5a84be4da890b492ac56602273 (diff)
download: ydb-f323f855449f04b5a243bf5eeb48320e834a4009.tar.gz
Further IC fixes
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_session.cpp 189
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_session.h 63
-rw-r--r-- library/cpp/actors/interconnect/outgoing_stream.h 24
3 files changed, 140 insertions, 136 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 0c592d236b..0ca9f2f136 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -160,26 +160,7 @@ namespace NActors {
}
}
- if (RamInQueue && !RamInQueue->Batching) {
- // we have pending TEvRam, so GenerateTraffic will be called no matter what
- } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket) {
- // we can't issue more traffic now; GenerateTraffic will be called upon unblocking
- } else if (TotalOutputQueueSize >= 64 * 1024) {
- // output queue size is quite big to issue some traffic
- IssueRam();
- } else if (!RamInQueue) {
- Y_VERIFY_DEBUG(NumEventsInReadyChannels == 1);
- RamInQueue = new TEvRam(true);
- auto *ev = new IEventHandle(SelfId(), {}, RamInQueue);
- const TDuration batchPeriod = Proxy->Common->Settings.BatchPeriod;
- if (batchPeriod != TDuration()) {
- TActivationContext::Schedule(batchPeriod, ev);
- } else {
- TActivationContext::Send(ev);
- }
- LWPROBE(StartBatching, Proxy->PeerNodeId, batchPeriod.MillisecondsFloat());
- LOG_DEBUG_IC_SESSION("ICS17", "batching started");
- }
+ IssueRam(true);
}
void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) {
@@ -295,8 +276,7 @@ namespace NActors {
LastHandshakeDone = TActivationContext::Now();
- RamInQueue = nullptr;
- IssueRam();
+ IssueRam(false);
}
void TInterconnectSessionTCP::Handle(TEvUpdateFromInputSession::TPtr& ev) {
@@ -325,24 +305,18 @@ namespace NActors {
CloseOnIdleWatchdog.Reset();
}
- bool unblockedSomething = false;
LWPROBE_IF_TOO_LONG(SlowICDropConfirmed, Proxy->PeerNodeId, ms) {
- unblockedSomething = DropConfirmed(msg.ConfirmedByInput);
- }
-
- // generate more traffic if we have unblocked state now
- if (unblockedSomething) {
- LWPROBE(UnblockByDropConfirmed, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - ev->SendTime) * 1000.0);
- IssueRam();
+ DropConfirmed(msg.ConfirmedByInput);
}
// if we haven't generated any packets, then make a lone Flush packet without any data
if (needConfirm && Socket) {
++ConfirmPacketsForcedBySize;
MakePacket(false);
- WriteData();
}
+ GenerateTraffic();
+
for (;;) {
switch (EUpdateState state = ReceiveContext->UpdateState) {
case EUpdateState::NONE:
@@ -369,10 +343,17 @@ namespace NActors {
}
}
- void TInterconnectSessionTCP::IssueRam() {
- if (!RamInQueue || (RamInQueue->Batching && Proxy->Common->Settings.BatchPeriod != TDuration::Zero())) {
- RamInQueue = new TEvRam(false);
- Send(SelfId(), RamInQueue);
+ void TInterconnectSessionTCP::IssueRam(bool batching) {
+ const auto& batchPeriod = Proxy->Common->Settings.BatchPeriod;
+ if (!RamInQueue || (!batching && (RamInQueue->Batching && batchPeriod != TDuration()))) {
+ auto ev = std::make_unique<TEvRam>(batching);
+ RamInQueue = ev.get();
+ auto handle = std::make_unique<IEventHandle>(SelfId(), SelfId(), ev.release());
+ if (batching && batchPeriod != TDuration()) {
+ TActivationContext::Schedule(batchPeriod, handle.release());
+ } else {
+ TActivationContext::Send(handle.release());
+ }
LWPROBE(StartRam, Proxy->PeerNodeId);
RamStartedCycles = GetCycleCountFast();
}
@@ -387,16 +368,44 @@ namespace NActors {
}
void TInterconnectSessionTCP::GenerateTraffic() {
- // generate ping request, if needed
- IssuePingRequest();
+ bool canProducePackets = false;
+ bool canWriteData = false;
+
+ if (!TimeLimit) {
+ TimeLimit.emplace(GetMaxCyclesPerEvent());
+ }
- LOG_DEBUG_IC_SESSION("ICS19", "GenerateTraffic");
+ for (;;) {
+ if (!Socket) {
+ return;
+ }
+ ProducePackets();
+ canProducePackets = NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() &&
+ GetUnsentSize() < GetUnsentLimit();
+
+ if (!Socket) {
+ return;
+ }
+ WriteData();
+ canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) ||
+ (XdcStream && !ReceiveContext->XdcWriteBlocked);
+
+ if ((!canProducePackets && !canWriteData) || TimeLimit->CheckExceeded()) {
+ break;
+ }
+ }
- const ui64 sizeBefore = TotalOutputQueueSize;
- ui32 generatedPackets = 0;
- ui64 generatedBytes = 0;
- ui64 generateStarted = GetCycleCountFast();
- bool enoughCpu = true;
+ if (canProducePackets || canWriteData) {
+ SetEnoughCpu(false);
+ IssueRam(false);
+ } else {
+ SetEnoughCpu(true);
+ }
+ }
+
+ void TInterconnectSessionTCP::ProducePackets() {
+ // generate ping request, if needed
+ IssuePingRequest();
// apply traffic changes
auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); };
@@ -404,38 +413,20 @@ namespace NActors {
// first, we create as many data packets as we can generate under certain conditions; they include presence
// of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions
// we exit cycle
- if (Socket) {
- TTimeLimit limit(GetMaxCyclesPerEvent());
-
- while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData()) {
- if (generatedPackets && limit.CheckExceeded()) {
- // resume later but ensure that we have issued at least one packet
- IssueRam();
- enoughCpu = false;
- ++CpuStarvationEvents;
- break;
- }
-
- try {
- generatedBytes += MakePacket(true);
- ++generatedPackets;
- } catch (const TExSerializedEventTooLarge& ex) {
- // terminate session if the event can't be serialized properly
- accountTraffic();
- LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type);
- return Terminate(TDisconnectReason::EventTooLarge());
- }
+ while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) {
+ try {
+ MakePacket(true);
+ } catch (const TExSerializedEventTooLarge& ex) {
+ // terminate session if the event can't be serialized properly
+ accountTraffic();
+ LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type);
+ return Terminate(TDisconnectReason::EventTooLarge());
+ }
+ if (TimeLimit->CheckExceeded()) {
+ break;
}
}
- SetEnoughCpu(enoughCpu);
-
- if (Socket) {
- WriteData();
- }
-
- LWPROBE(GenerateTraffic, Proxy->PeerNodeId, NHPTimer::GetSeconds(GetCycleCountFast() - generateStarted) * 1000.0, sizeBefore - TotalOutputQueueSize, generatedPackets, generatedBytes);
-
accountTraffic();
EqualizeCounter += ChannelScheduler->Equalize();
}
@@ -537,7 +528,7 @@ namespace NActors {
Send(ReceiverId, ev->Release().Release());
}
- IssueRam();
+ GenerateTraffic();
}
void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
@@ -556,11 +547,9 @@ namespace NActors {
}
}
- void TInterconnectSessionTCP::HandleWriteData() {
- Y_VERIFY(WriteDataInFlight);
- WriteDataInFlight = false;
- if (!Socket) {
- return;
+ void TInterconnectSessionTCP::WriteData() {
+ if (!TimeLimit) {
+ TimeLimit.emplace(GetMaxCyclesPerEvent());
}
// total bytes written during this call
@@ -594,8 +583,8 @@ namespace NActors {
for (;;) {
bool progress = false;
-
- size_t bytesToSendInMain = Max<size_t>();
+ static constexpr size_t maxBytesAtOnce = 256 * 1024;
+ size_t bytesToSendInMain = maxBytesAtOnce;
if (OutOfBandStream) {
bytesToSendInMain = 0;
@@ -623,24 +612,20 @@ namespace NActors {
}
if (!bytesToSendInMain) {
- if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, Max<size_t>())) {
+ if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesAtOnce)) {
BytesWrittenToSocket += w;
OutOfBandBytesSent += w;
progress = true;
}
}
- if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, Max<size_t>())) {
+ if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, maxBytesAtOnce)) {
XdcBytesSent += w;
XdcOffset += w;
progress = true;
}
- if (!progress) {
- break;
- } else if (limit.CheckExceeded()) {
- WriteData();
- ++CpuStarvationEventsOnWriteData;
+ if (!progress || TimeLimit->CheckExceeded()) {
break;
}
}
@@ -649,9 +634,7 @@ namespace NActors {
Proxy->Metrics->AddTotalBytesWritten(written);
}
- if (DropConfirmed(LastConfirmed)) { // issue GenerateTraffic a bit later
- IssueRam();
- }
+ DropConfirmed(LastConfirmed);
const bool writeBlockedByFullSendBuffer = ReceiveContext->MainWriteBlocked || ReceiveContext->XdcWriteBlocked;
if (WriteBlockedByFullSendBuffer < writeBlockedByFullSendBuffer) { // became blocked
@@ -663,13 +646,6 @@ namespace NActors {
WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer;
}
- void TInterconnectSessionTCP::WriteData() {
- if (!WriteDataInFlight) {
- WriteDataInFlight = true;
- TActivationContext::Send(new IEventHandle(EvWriteData, 0, SelfId(), {}, nullptr, 0));
- }
- }
-
ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket,
size_t maxBytes) {
LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
@@ -774,7 +750,7 @@ namespace NActors {
}
}
- ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
+ void TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
NInterconnect::TOutgoingStream& stream = data ? OutgoingStream : OutOfBandStream;
#ifndef NDEBUG
@@ -782,6 +758,9 @@ namespace NActors {
const size_t xdcStreamSizeBefore = XdcStream.CalculateOutgoingSize();
#endif
+ stream.Align();
+ XdcStream.Align();
+
TTcpPacketOutTask packet(Params, stream, XdcStream);
ui64 serial = 0;
@@ -847,11 +826,9 @@ namespace NActors {
ResetFlushLogic();
++PacketsGenerated;
-
- return packetSize;
}
- bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
+ void TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm);
Y_VERIFY(LastConfirmed <= confirm && confirm <= OutputCounter,
@@ -859,7 +836,7 @@ namespace NActors {
LogPrefix.data(), confirm, LastConfirmed, OutputCounter);
LastConfirmed = confirm;
- std::optional<ui64> lastDroppedSerial = 0;
+ std::optional<ui64> lastDroppedSerial;
ui32 numDropped = 0;
// drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively
@@ -885,7 +862,7 @@ namespace NActors {
}
if (!numDropped) {
- return false;
+ return;
}
const ui64 droppedDataAmount = bytesDropped + bytesDroppedFromXdc - sizeof(TTcpPacketHeader_v2) * numDropped;
@@ -897,10 +874,6 @@ namespace NActors {
});
}
- const ui64 current = InflightDataAmount;
- const ui64 limit = GetTotalInflightAmountOfData();
- const bool unblockedSomething = current >= limit && current < limit + droppedDataAmount;
-
PacketsConfirmed += numDropped;
InflightDataAmount -= droppedDataAmount;
Proxy->Metrics->SubInflightDataAmount(droppedDataAmount);
@@ -910,8 +883,6 @@ namespace NActors {
" dropped %" PRIu32 " packets", InflightDataAmount, droppedDataAmount, numDropped);
Pool->Trim(); // send any unsent free requests
-
- return unblockedSomething;
}
void TInterconnectSessionTCP::FillSendingBuffer(TTcpPacketOutTask& task, ui64 serial) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 6c76fa8e3c..92146f6591 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -370,7 +370,6 @@ namespace NActors {
EvRam,
EvTerminate,
EvFreeItems,
- EvWriteData,
};
struct TEvCheckCloseOnIdle : TEventLocal<TEvCheckCloseOnIdle, EvCheckCloseOnIdle> {};
@@ -436,25 +435,30 @@ namespace NActors {
void Subscribe(STATEFN_SIG);
void Unsubscribe(STATEFN_SIG);
- STRICT_STFUNC(StateFunc,
- fFunc(TEvInterconnect::EvForward, Forward)
- cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison)
- fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe)
- fFunc(TEvents::TEvSubscribe::EventType, Subscribe)
- fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe)
- cFunc(TEvFlush::EventType, HandleFlush)
- hFunc(TEvPollerReady, Handle)
- hFunc(TEvPollerRegisterResult, Handle)
- hFunc(TEvUpdateFromInputSession, Handle)
- hFunc(TEvRam, HandleRam)
- hFunc(TEvCheckCloseOnIdle, CloseOnIdleWatchdog)
- hFunc(TEvCheckLostConnection, LostConnectionWatchdog)
- cFunc(TEvents::TSystem::Wakeup, SendUpdateToWhiteboard)
- hFunc(TEvSocketDisconnect, OnDisconnect)
- hFunc(TEvTerminate, Handle)
- hFunc(TEvProcessPingRequest, Handle)
- cFunc(EvWriteData, HandleWriteData)
- )
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ std::optional<TTimeLimit> TimeLimit;
+
+ STATEFN(StateFunc) {
+ TimeLimit.emplace(GetMaxCyclesPerEvent());
+ STRICT_STFUNC_BODY(
+ fFunc(TEvInterconnect::EvForward, Forward)
+ cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison)
+ fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe)
+ fFunc(TEvents::TEvSubscribe::EventType, Subscribe)
+ fFunc(TEvents::TEvUnsubscribe::EventType, Unsubscribe)
+ cFunc(TEvFlush::EventType, HandleFlush)
+ hFunc(TEvPollerReady, Handle)
+ hFunc(TEvPollerRegisterResult, Handle)
+ hFunc(TEvUpdateFromInputSession, Handle)
+ hFunc(TEvRam, HandleRam)
+ hFunc(TEvCheckCloseOnIdle, CloseOnIdleWatchdog)
+ hFunc(TEvCheckLostConnection, LostConnectionWatchdog)
+ cFunc(TEvents::TSystem::Wakeup, SendUpdateToWhiteboard)
+ hFunc(TEvSocketDisconnect, OnDisconnect)
+ hFunc(TEvTerminate, Handle)
+ hFunc(TEvProcessPingRequest, Handle)
+ )
+ }
void Handle(TEvUpdateFromInputSession::TPtr& ev);
@@ -465,24 +469,31 @@ namespace NActors {
TEvRam* RamInQueue = nullptr;
ui64 RamStartedCycles = 0;
- void IssueRam();
+ void IssueRam(bool batching);
void HandleRam(TEvRam::TPtr& ev);
void GenerateTraffic();
+ void ProducePackets();
+
+ size_t GetUnsentSize() const {
+ return OutgoingStream.CalculateUnsentSize() + OutOfBandStream.CalculateUnsentSize() +
+ XdcStream.CalculateUnsentSize();
+ }
+
+ size_t GetUnsentLimit() const {
+ return 128 * 1024;
+ }
void SendUpdateToWhiteboard(bool connected = true);
ui32 CalculateQueueUtilization();
- bool WriteDataInFlight = false;
-
void Handle(TEvPollerReady::TPtr& ev);
void Handle(TEvPollerRegisterResult::TPtr ev);
- void HandleWriteData();
void WriteData();
ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes);
- ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {});
+ void MakePacket(bool data, TMaybe<ui64> pingMask = {});
void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial);
- bool DropConfirmed(ui64 confirm);
+ void DropConfirmed(ui64 confirm);
void ShutdownSocket(TDisconnectReason reason);
void StartHandshake();
diff --git a/library/cpp/actors/interconnect/outgoing_stream.h b/library/cpp/actors/interconnect/outgoing_stream.h
index 011ecebb9a..197b9219c2 100644
--- a/library/cpp/actors/interconnect/outgoing_stream.h
+++ b/library/cpp/actors/interconnect/outgoing_stream.h
@@ -36,6 +36,7 @@ namespace NInterconnect {
std::deque<TSendChunk> SendQueue;
size_t SendQueuePos = 0;
size_t SendOffset = 0;
+ size_t UnsentBytes = 0;
public:
operator bool() const {
@@ -51,11 +52,14 @@ namespace NInterconnect {
}
size_t CalculateUnsentSize() const {
+#ifndef NDEBUG
size_t res = 0;
for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end(); ++it) {
res += it->Span.size();
}
- return res - SendOffset;
+ Y_VERIFY(UnsentBytes == res - SendOffset);
+#endif
+ return UnsentBytes;
}
size_t GetSendQueueSize() const {
@@ -74,6 +78,16 @@ namespace NInterconnect {
return {AppendBuffer->Data + AppendOffset, Min(maxLen, BufferSize - AppendOffset)};
}
+ void Align() {
+ if (AppendOffset != BufferSize) {
+ AppendOffset += -(reinterpret_cast<uintptr_t>(AppendBuffer->Data) + AppendOffset) & 63;
+ if (AppendOffset > BufferSize) {
+ AppendOffset = BufferSize;
+ DropBufferReference(std::exchange(AppendBuffer, nullptr));
+ }
+ }
+ }
+
void Append(TContiguousSpan span) {
if (AppendBuffer && span.data() == AppendBuffer->Data + AppendOffset) { // the only valid case to use previously acquired span
AppendAcquiredSpan(span);
@@ -129,11 +143,16 @@ namespace NInterconnect {
void Rewind() {
SendQueuePos = 0;
SendOffset = 0;
+ UnsentBytes = 0;
+ for (const auto& item : SendQueue) {
+ UnsentBytes += item.Span.size();
+ }
}
void RewindToEnd() {
SendQueuePos = SendQueue.size();
SendOffset = 0;
+ UnsentBytes = 0;
}
template<typename T>
@@ -149,7 +168,9 @@ namespace NInterconnect {
void Advance(size_t numBytes) { // called when numBytes portion of data has been sent
Y_VERIFY_DEBUG(numBytes == 0 || SendQueuePos != SendQueue.size());
+ Y_VERIFY_DEBUG(numBytes <= UnsentBytes);
SendOffset += numBytes;
+ UnsentBytes -= numBytes;
for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->Span.size() <= SendOffset; ++SendQueuePos, ++it) {
SendOffset -= it->Span.size();
Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1);
@@ -214,6 +235,7 @@ namespace NInterconnect {
}
void AppendSpanWithGlueing(TContiguousSpan span, TBuffer *buffer) {
+ UnsentBytes += span.size();
if (!SendQueue.empty()) {
auto& back = SendQueue.back();
if (back.Span.data() + back.Span.size() == span.data()) { // check if it is possible just to extend the last span