aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-05-05 21:11:19 +0300
committeralexvru <alexvru@ydb.tech>2023-05-05 21:11:19 +0300
commit29c8867977327360db88d8937052786b57435501 (patch)
tree48f63cd99e63c6f47df6285805e0b426b10cb7f1
parentde77cf349f6cbd1e0b7c70fa28234999233112fa (diff)
downloadydb-29c8867977327360db88d8937052786b57435501.tar.gz
Fix encryption problem
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp170
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h4
2 files changed, 86 insertions, 88 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 3274a6367c..a73418bf15 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -260,12 +260,16 @@ namespace NActors {
// drop confirmed packets first as we do not need unwanted retransmissions
OutgoingStream.RewindToEnd();
XdcStream.RewindToEnd();
- OutgoingOffset = XdcOffset = Max<size_t>();
+ XdcOffset = Max<size_t>();
+ OutgoingOffset = 0;
+ OutgoingIndex = SendQueue.size();
DropConfirmed(nextPacket);
OutgoingStream.Rewind();
OutOfBandStream = {};
XdcStream.Rewind();
OutgoingOffset = XdcOffset = 0;
+ OutgoingIndex = 0;
+ ForceCurrentPacket = false;
const ui64 serial = OutputCounter - SendQueue.size() + 1;
Y_VERIFY(serial > LastConfirmed, "%s serial# %" PRIu64 " LastConfirmed# %" PRIu64, LogPrefix.data(), serial, LastConfirmed);
@@ -345,7 +349,7 @@ namespace NActors {
void TInterconnectSessionTCP::IssueRam(bool batching) {
const auto& batchPeriod = Proxy->Common->Settings.BatchPeriod;
- if (!RamInQueue || (!batching && (RamInQueue->Batching && batchPeriod != TDuration()))) {
+ if (!RamInQueue || (!batching && RamInQueue->Batching && batchPeriod != TDuration())) {
auto ev = std::make_unique<TEvRam>(batching);
RamInQueue = ev.get();
auto handle = std::make_unique<IEventHandle>(SelfId(), SelfId(), ev.release());
@@ -368,23 +372,26 @@ namespace NActors {
}
void TInterconnectSessionTCP::GenerateTraffic() {
- bool canProducePackets = false;
- bool canWriteData = false;
-
if (!TimeLimit) {
TimeLimit.emplace(GetMaxCyclesPerEvent());
}
- for (;;) {
+ // generate ping request, if needed
+ IssuePingRequest();
+
+ while (Socket) {
+ ProducePackets();
if (!Socket) {
return;
}
- ProducePackets();
+ WriteData();
if (!Socket) {
return;
}
- WriteData();
+
+ bool canProducePackets;
+ bool canWriteData;
canProducePackets = NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() &&
GetUnsentSize() < GetUnsentLimit();
@@ -392,45 +399,43 @@ namespace NActors {
canWriteData = ((OutgoingStream || OutOfBandStream) && !ReceiveContext->MainWriteBlocked) ||
(XdcStream && !ReceiveContext->XdcWriteBlocked);
- if ((!canProducePackets && !canWriteData) || TimeLimit->CheckExceeded()) {
+ if (!canProducePackets && !canWriteData) {
+ SetEnoughCpu(true); // we do not starve
+ break;
+ } else if (TimeLimit->CheckExceeded()) {
+ SetEnoughCpu(false);
+ IssueRam(false);
break;
}
}
- if (canProducePackets || canWriteData) {
- SetEnoughCpu(false);
- IssueRam(false);
- } else {
- SetEnoughCpu(true);
- }
+ // account traffic changes
+ ChannelScheduler->ForEach([](TEventOutputChannel& channel) {
+ channel.AccountTraffic();
+ });
+
+ // equalize channel weights
+ EqualizeCounter += ChannelScheduler->Equalize();
}
void TInterconnectSessionTCP::ProducePackets() {
- // generate ping request, if needed
- IssuePingRequest();
-
- // apply traffic changes
- auto accountTraffic = [&] { ChannelScheduler->ForEach([](TEventOutputChannel& channel) { channel.AccountTraffic(); }); };
-
// first, we create as many data packets as we can generate under certain conditions; they include presence
// of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions
// we exit cycle
+ static constexpr ui32 maxBytesToProduce = 64 * 1024;
+ ui32 bytesProduced = 0;
while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && GetUnsentSize() < GetUnsentLimit()) {
+ if ((bytesProduced && TimeLimit->CheckExceeded()) || bytesProduced >= maxBytesToProduce) {
+ break;
+ }
try {
- MakePacket(true);
+ bytesProduced += MakePacket(true);
} catch (const TExSerializedEventTooLarge& ex) {
// terminate session if the event can't be serialized properly
- accountTraffic();
LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type);
return Terminate(TDisconnectReason::EventTooLarge());
}
- if (TimeLimit->CheckExceeded()) {
- break;
- }
}
-
- accountTraffic();
- EqualizeCounter += ChannelScheduler->Equalize();
}
void TInterconnectSessionTCP::StartHandshake() {
@@ -550,10 +555,6 @@ namespace NActors {
}
void TInterconnectSessionTCP::WriteData() {
- if (!TimeLimit) {
- TimeLimit.emplace(GetMaxCyclesPerEvent());
- }
-
// total bytes written during this call
ui64 written = 0;
@@ -561,7 +562,7 @@ namespace NActors {
const TPollerToken::TPtr& token, bool *writeBlocked, size_t maxBytes) {
size_t totalWritten = 0;
- if (stream && socket && !*writeBlocked && maxBytes) {
+ if (stream && socket && !*writeBlocked) {
if (const ssize_t r = Write(stream, *socket, maxBytes); r > 0) {
stream.Advance(r);
totalWritten += r;
@@ -581,55 +582,49 @@ namespace NActors {
return totalWritten;
};
- TTimeLimit limit(GetMaxCyclesPerEvent());
+ auto sendQueueIt = SendQueue.begin() + OutgoingIndex;
+ static constexpr size_t maxBytesAtOnce = 256 * 1024;
+ size_t bytesToSendInMain = maxBytesAtOnce;
- for (;;) {
- bool progress = false;
- static constexpr size_t maxBytesAtOnce = 256 * 1024;
- size_t bytesToSendInMain = maxBytesAtOnce;
+ Y_VERIFY_DEBUG(OutgoingIndex < SendQueue.size() || (OutgoingIndex == SendQueue.size() && !OutgoingOffset && !OutgoingStream));
- if (OutOfBandStream) {
- bytesToSendInMain = 0;
- size_t offset = OutgoingOffset;
- for (const TOutgoingPacket& packet : SendQueue) {
- if (!offset) {
- break;
- } else if (offset < packet.PacketSize) {
- bytesToSendInMain = packet.PacketSize - offset;
- break;
- } else {
- offset -= packet.PacketSize;
- }
- }
+ if (OutOfBandStream) {
+ // we need to align main stream to single packet boundary, so calculate the boundary
+ bytesToSendInMain = sendQueueIt == SendQueue.end() ? 0 : // we have no packet to send
+ ForceCurrentPacket || OutgoingOffset ? sendQueueIt->PacketSize - OutgoingOffset : // we have to finish current packet
+ 0; // we haven't sent any of current packet content
+ Y_VERIFY_DEBUG(bytesToSendInMain || !ForceCurrentPacket);
+ }
+
+ if (bytesToSendInMain) {
+ const size_t w = process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, bytesToSendInMain);
+
+ // adjust sending queue iterator
+ for (OutgoingOffset += w; OutgoingOffset && sendQueueIt->PacketSize <= OutgoingOffset; ++sendQueueIt, ++OutgoingIndex) {
+ OutgoingOffset -= sendQueueIt->PacketSize;
}
- if (const size_t w = process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, bytesToSendInMain)) {
- BytesWrittenToSocket += w;
- OutgoingOffset += w;
- progress = true;
+ BytesWrittenToSocket += w;
+
+ if (OutOfBandStream) {
+ BytesAlignedForOutOfBand += w;
bytesToSendInMain -= w;
- if (OutOfBandStream) {
- BytesAlignedForOutOfBand += w;
- }
}
- if (!bytesToSendInMain) {
- if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesAtOnce)) {
- BytesWrittenToSocket += w;
- OutOfBandBytesSent += w;
- progress = true;
- }
- }
+ ForceCurrentPacket = Socket ? Socket->ExpectingCertainWrite() : false;
+ }
- if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, maxBytesAtOnce)) {
- XdcBytesSent += w;
- XdcOffset += w;
- progress = true;
+ if (!bytesToSendInMain && !ForceCurrentPacket) {
+ if (const size_t w = process(OutOfBandStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesAtOnce)) {
+ OutOfBandStream.DropFront(w);
+ BytesWrittenToSocket += w;
+ OutOfBandBytesSent += w;
}
+ }
- if (!progress || TimeLimit->CheckExceeded()) {
- break;
- }
+ if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, maxBytesAtOnce)) {
+ XdcBytesSent += w;
+ XdcOffset += w;
}
if (written) {
@@ -730,16 +725,15 @@ namespace NActors {
while (FlushSchedule && now >= FlushSchedule.top()) {
FlushSchedule.pop();
}
- IssuePingRequest();
if (Socket) {
if (now >= ForcePacketTimestamp) {
++ConfirmPacketsForcedByTimeout;
++FlushEventsProcessed;
MakePacket(false); // just generate confirmation packet if we have preconditions for this
- WriteData();
} else if (ForcePacketTimestamp != TMonotonic::Max()) {
ScheduleFlush();
}
+ GenerateTraffic();
}
}
@@ -752,7 +746,7 @@ namespace NActors {
}
}
- void TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
+ ui32 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
NInterconnect::TOutgoingStream& stream = data ? OutgoingStream : OutOfBandStream;
#ifndef NDEBUG
@@ -828,6 +822,8 @@ namespace NActors {
ResetFlushLogic();
++PacketsGenerated;
+
+ return packetSize;
}
void TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
@@ -846,21 +842,21 @@ namespace NActors {
size_t bytesDropped = 0;
size_t bytesDroppedFromXdc = 0;
ui64 frontPacketSerial = OutputCounter - SendQueue.size() + 1;
- for (; !SendQueue.empty(); SendQueue.pop_front(), ++frontPacketSerial) {
- if (confirm < frontPacketSerial) {
- break;
- }
-
+ Y_VERIFY_DEBUG(OutgoingIndex < SendQueue.size() || (OutgoingIndex == SendQueue.size() && !OutgoingOffset && !OutgoingStream),
+ "OutgoingIndex# %zu SendQueue.size# %zu OutgoingOffset# %zu Unsent# %zu Total# %zu",
+ OutgoingIndex, SendQueue.size(), OutgoingOffset, OutgoingStream.CalculateUnsentSize(),
+ OutgoingStream.CalculateOutgoingSize());
+ while (OutgoingIndex && frontPacketSerial <= confirm && SendQueue.front().ExternalSize <= XdcOffset) {
auto& front = SendQueue.front();
- if (OutgoingOffset < front.PacketSize || XdcOffset < front.ExternalSize) {
- break; // packet wasn't actually sent to receiver, can't drop it now
- }
lastDroppedSerial.emplace(frontPacketSerial);
- OutgoingOffset -= front.PacketSize;
XdcOffset -= front.ExternalSize;
bytesDropped += front.PacketSize;
bytesDroppedFromXdc += front.ExternalSize;
++numDropped;
+
+ ++frontPacketSerial;
+ SendQueue.pop_front();
+ --OutgoingIndex;
}
if (!numDropped) {
@@ -1048,7 +1044,6 @@ namespace NActors {
if (Socket) {
MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask);
MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask);
- WriteData();
}
LastPingTimestamp = now;
}
@@ -1057,7 +1052,7 @@ namespace NActors {
void TInterconnectSessionTCP::Handle(TEvProcessPingRequest::TPtr ev) {
if (Socket) {
MakePacket(false, ev->Get()->Payload | TTcpPacketBuf::PingResponseMask);
- WriteData();
+ GenerateTraffic();
}
}
@@ -1249,6 +1244,7 @@ namespace NActors {
MON_VAR(OutgoingStream.CalculateUnsentSize())
MON_VAR(OutgoingStream.GetSendQueueSize())
MON_VAR(OutgoingOffset)
+ MON_VAR(OutgoingIndex)
MON_VAR(OutOfBandStream.CalculateOutgoingSize())
MON_VAR(OutOfBandStream.CalculateUnsentSize())
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 7841f8a848..e8d201c84c 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -492,7 +492,7 @@ namespace NActors {
void WriteData();
ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes);
- void MakePacket(bool data, TMaybe<ui64> pingMask = {});
+ ui32 MakePacket(bool data, TMaybe<ui64> pingMask = {});
void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial);
void DropConfirmed(ui64 confirm);
void ShutdownSocket(TDisconnectReason reason);
@@ -562,6 +562,8 @@ namespace NActors {
std::deque<TOutgoingPacket> SendQueue; // packet boundaries
size_t OutgoingOffset = 0;
size_t XdcOffset = 0;
+ size_t OutgoingIndex = 0; // index into current packet in SendQueue
+ bool ForceCurrentPacket = false;
ui64 XdcBytesSent = 0;