diff options
author | alexvru <alexvru@ydb.tech> | 2023-05-05 21:11:19 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-05-05 21:11:19 +0300 |
commit | 29c8867977327360db88d8937052786b57435501 (patch) | |
tree | 48f63cd99e63c6f47df6285805e0b426b10cb7f1 | |
parent | de77cf349f6cbd1e0b7c70fa28234999233112fa (diff) | |
download | ydb-29c8867977327360db88d8937052786b57435501.tar.gz |
Fix encryption problem
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.cpp | 170 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.h | 4 |
2 files changed, 86 insertions, 88 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 3274a6367c..a73418bf15 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -260,12 +260,16 @@ namespace NActors { // drop confirmed packets first as we do not need unwanted retransmissions OutgoingStream.RewindToEnd(); XdcStream.RewindToEnd(); - OutgoingOffset = XdcOffset = Max<size_t>(); + XdcOffset = Max<size_t>(); + OutgoingOffset = 0; + OutgoingIndex = SendQueue.size(); DropConfirmed(nextPacket); OutgoingStream.Rewind(); OutOfBandStream = {}; XdcStream.Rewind(); OutgoingOffset = XdcOffset = 0; + OutgoingIndex = 0; + ForceCurrentPacket = false; const ui64 serial = OutputCounter - SendQueue.size() + 1; Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed); @@ -345,7 +349,7 @@ namespace NActors { void TInterconnectSessionTCP::IssueRam(bool batching) { const auto& batchPeriod = Proxy->Common->Settings.BatchPeriod; - if (!RamInQueue || (!batching && (RamInQueue->Batching && batchPeriod != TDuration()))) { + if (!RamInQueue || (!batching && RamInQueue->Batching && batchPeriod != TDuration())) { auto ev = std::make_unique<TEvRam>(batching); RamInQueue = ev.get(); auto handle = std::make_unique<IEventHandle>(SelfId(), SelfId(), ev.release()); @@ -368,23 +372,26 @@ namespace NActors { } void TInterconnectSessionTCP::GenerateTraffic() { - bool canProducePackets = false; - bool canWriteData = false; - if (!TimeLimit) { TimeLimit.emplace(GetMaxCyclesPerEvent()); } - for (;;) { + // generate ping request, if needed + IssuePingRequest(); + + while (Socket) { + ProducePackets(); if (!Socket) { return; } - ProducePackets(); + WriteData(); if (!Socket) { return; } - WriteData(); + + bool canProducePackets; + bool canWriteData; canProducePackets = NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit(); @@ -392,45 +399,43 @@ namespace NActors { canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) || (XdcStream && !ReceiveContext->XdcWriteBlocked); - if ((!canProducePackets && !canWriteData) || TimeLimit->CheckExceeded()) { + if (!canProducePackets && !canWriteData) { + SetEnoughCpu(true); // we do not starve + break; + } else if (TimeLimit->CheckExceeded()) { + SetEnoughCpu(false); + IssueRam(false); break; } } - if (canProducePackets || canWriteData) { - SetEnoughCpu(false); - IssueRam(false); - } else { - SetEnoughCpu(true); - } + // account traffic changes + ChannelScheduler->ForEach([](TEventOutputChannel& channel) { + channel.AccountTraffic(); + }); + + // equalize channel weights + EqualizeCounter += ChannelScheduler->Equalize(); } void TInterconnectSessionTCP::ProducePackets() { - // generate ping request, if needed - IssuePingRequest(); - - // apply traffic changes - auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); }; - // first, we create as many data packets as we can generate under certain conditions; they include presence // of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions // we exit cycle + static constexpr ui32 maxBytesToProduce = 64 * 1024; + ui32 bytesProduced = 0; while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) { + if ((bytesProduced && TimeLimit->CheckExceeded()) || bytesProduced >= maxBytesToProduce) { + break; + } try { - MakePacket(true); + bytesProduced += MakePacket(true); } catch (const TExSerializedEventTooLarge& ex) { // terminate session if the event can't be serialized properly - accountTraffic(); LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type); return Terminate(TDisconnectReason::EventTooLarge()); } - if (TimeLimit->CheckExceeded()) { - break; - } } - - accountTraffic(); - EqualizeCounter += ChannelScheduler->Equalize(); } void TInterconnectSessionTCP::StartHandshake() { @@ -550,10 +555,6 @@ namespace NActors { } void TInterconnectSessionTCP::WriteData() { - if (!TimeLimit) { - TimeLimit.emplace(GetMaxCyclesPerEvent()); - } - // total bytes written during this call ui64 written = 0; @@ -561,7 +562,7 @@ namespace NActors { const TPollerToken::TPtr& token, bool *writeBlocked, size_t maxBytes) { size_t totalWritten = 0; - if (stream && socket && !*writeBlocked && maxBytes) { + if (stream && socket && !*writeBlocked) { if (const ssize_t r = Write(stream, *socket, maxBytes); r > 0) { stream.Advance(r); totalWritten += r; @@ -581,55 +582,49 @@ namespace NActors { return totalWritten; }; - TTimeLimit limit(GetMaxCyclesPerEvent()); + auto sendQueueIt = SendQueue.begin() + OutgoingIndex; + static constexpr size_t maxBytesAtOnce = 256 * 1024; + size_t bytesToSendInMain = maxBytesAtOnce; - for (;;) { - bool progress = false; - static constexpr size_t maxBytesAtOnce = 256 * 1024; - size_t bytesToSendInMain = maxBytesAtOnce; + Y_VERIFY_DEBUG(OutgoingIndex < SendQueue.size() || (OutgoingIndex == SendQueue.size() && !OutgoingOffset && !OutgoingStream)); - if (OutOfBandStream) { - bytesToSendInMain = 0; - size_t offset = OutgoingOffset; - for (const TOutgoingPacket& packet : SendQueue) { - if (!offset) { - break; - } else if (offset < packet.PacketSize) { - bytesToSendInMain = packet.PacketSize - offset; - break; - } else { - offset -= packet.PacketSize; - } - } + if (OutOfBandStream) { + // we need to align main stream to single packet boundary, so calculate the boundary + bytesToSendInMain = sendQueueIt == SendQueue.end() ? 0 : // we have no packet to send + ForceCurrentPacket || OutgoingOffset ? sendQueueIt->PacketSize - OutgoingOffset : // we have to finish current packet + 0; // we haven't sent any of current packet content + Y_VERIFY_DEBUG(bytesToSendInMain || !ForceCurrentPacket); + } + + if (bytesToSendInMain) { + const size_t w = process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, bytesToSendInMain); + + // adjust sending queue iterator + for (OutgoingOffset += w; OutgoingOffset && sendQueueIt->PacketSize <= OutgoingOffset; ++sendQueueIt, ++OutgoingIndex) { + OutgoingOffset -= sendQueueIt->PacketSize; } - if (const size_t w = process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, bytesToSendInMain)) { - BytesWrittenToSocket += w; - OutgoingOffset += w; - progress = true; + BytesWrittenToSocket += w; + + if (OutOfBandStream) { + BytesAlignedForOutOfBand += w; bytesToSendInMain -= w; - if (OutOfBandStream) { - BytesAlignedForOutOfBand += w; - } } - if (!bytesToSendInMain) { - if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesAtOnce)) { - BytesWrittenToSocket += w; - OutOfBandBytesSent += w; - progress = true; - } - } + ForceCurrentPacket = Socket ? Socket->ExpectingCertainWrite() : false; + } - if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, maxBytesAtOnce)) { - XdcBytesSent += w; - XdcOffset += w; - progress = true; + if (!bytesToSendInMain && !ForceCurrentPacket) { + if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesAtOnce)) { + OutOfBandStream.DropFront(w); + BytesWrittenToSocket += w; + OutOfBandBytesSent += w; } + } - if (!progress || TimeLimit->CheckExceeded()) { - break; - } + if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, maxBytesAtOnce)) { + XdcBytesSent += w; + XdcOffset += w; } if (written) { @@ -730,16 +725,15 @@ namespace NActors { while (FlushSchedule && now >= FlushSchedule.top()) { FlushSchedule.pop(); } - IssuePingRequest(); if (Socket) { if (now >= ForcePacketTimestamp) { ++ConfirmPacketsForcedByTimeout; ++FlushEventsProcessed; MakePacket(false); // just generate confirmation packet if we have preconditions for this - WriteData(); } else if (ForcePacketTimestamp != TMonotonic::Max()) { ScheduleFlush(); } + GenerateTraffic(); } } @@ -752,7 +746,7 @@ namespace NActors { } } - void TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { + ui32 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { NInterconnect::TOutgoingStream& stream = data ? OutgoingStream : OutOfBandStream; #ifndef NDEBUG @@ -828,6 +822,8 @@ namespace NActors { ResetFlushLogic(); ++PacketsGenerated; + + return packetSize; } void TInterconnectSessionTCP::DropConfirmed(ui64 confirm) { @@ -846,21 +842,21 @@ namespace NActors { size_t bytesDropped = 0; size_t bytesDroppedFromXdc = 0; ui64 frontPacketSerial = OutputCounter - SendQueue.size() + 1; - for (; !SendQueue.empty(); SendQueue.pop_front(), ++frontPacketSerial) { - if (confirm < frontPacketSerial) { - break; - } - + Y_VERIFY_DEBUG(OutgoingIndex < SendQueue.size() || (OutgoingIndex == SendQueue.size() && !OutgoingOffset && !OutgoingStream), + "OutgoingIndex# %zu SendQueue.size# %zu OutgoingOffset# %zu Unsent# %zu Total# %zu", + OutgoingIndex, SendQueue.size(), OutgoingOffset, OutgoingStream.CalculateUnsentSize(), + OutgoingStream.CalculateOutgoingSize()); + while (OutgoingIndex && frontPacketSerial <= confirm && SendQueue.front().ExternalSize <= XdcOffset) { auto& front = SendQueue.front(); - if (OutgoingOffset < front.PacketSize || XdcOffset < front.ExternalSize) { - break; // packet wasn't actually sent to receiver, can't drop it now - } lastDroppedSerial.emplace(frontPacketSerial); - OutgoingOffset -= front.PacketSize; XdcOffset -= front.ExternalSize; bytesDropped += front.PacketSize; bytesDroppedFromXdc += front.ExternalSize; ++numDropped; + + ++frontPacketSerial; + SendQueue.pop_front(); + --OutgoingIndex; } if (!numDropped) { @@ -1048,7 +1044,6 @@ namespace NActors { if (Socket) { MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask); MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask); - WriteData(); } LastPingTimestamp = now; } @@ -1057,7 +1052,7 @@ namespace NActors { void TInterconnectSessionTCP::Handle(TEvProcessPingRequest::TPtr ev) { if (Socket) { MakePacket(false, ev->Get()->Payload | TTcpPacketBuf::PingResponseMask); - WriteData(); + GenerateTraffic(); } } @@ -1249,6 +1244,7 @@ namespace NActors { MON_VAR(OutgoingStream.CalculateUnsentSize()) MON_VAR(OutgoingStream.GetSendQueueSize()) MON_VAR(OutgoingOffset) + MON_VAR(OutgoingIndex) MON_VAR(OutOfBandStream.CalculateOutgoingSize()) MON_VAR(OutOfBandStream.CalculateUnsentSize()) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 7841f8a848..e8d201c84c 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -492,7 +492,7 @@ namespace NActors { void WriteData(); ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes); - void MakePacket(bool data, TMaybe<ui64> pingMask = {}); + ui32 MakePacket(bool data, TMaybe<ui64> pingMask = {}); void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial); void DropConfirmed(ui64 confirm); void ShutdownSocket(TDisconnectReason reason); @@ -562,6 +562,8 @@ namespace NActors { std::deque<TOutgoingPacket> SendQueue; // packet boundaries size_t OutgoingOffset = 0; size_t XdcOffset = 0; + size_t OutgoingIndex = 0; // index into current packet in SendQueue + bool ForceCurrentPacket = false; ui64 XdcBytesSent = 0; |