diff options
author | alexvru <[email protected]> | 2023-04-26 10:57:10 +0300 |
---|---|---|
committer | alexvru <[email protected]> | 2023-04-26 10:57:10 +0300 |
commit | 9f32e2cd6cf2ae4d6651dfb413bc8401a86a597f (patch) | |
tree | 9bf57a3b9c7bdf6e001436c11499579062cdb1d6 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp | |
parent | c4f044f20585c080c4c1c6587148079ab7f59d0b (diff) |
Improve IC and XDC
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.cpp | 70 |
1 files changed, 57 insertions, 13 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index e8fc9744335..b0b6dc0b66d 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -78,7 +78,7 @@ namespace NActors { } void TInterconnectSessionTCP::Terminate(TDisconnectReason reason) { - LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64, (Socket ? i64(*Socket) : -1)); + LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64 " reason# %s", (Socket ? i64(*Socket) : -1), reason.ToString().data()); IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::UnregisterSession, this); ShutdownSocket(std::move(reason)); @@ -277,9 +277,12 @@ namespace NActors { // also reset SendQueuePos // drop confirmed packets first as we do not need unwanted retransmissions + WriteSinglePacketAndDropConfirmed = false; OutgoingStream.RewindToEnd(); XdcStream.RewindToEnd(); - DropConfirmed(nextPacket, true); + SendQueuePos = SendQueue.size(); + SendOffset = 0; + DropConfirmed(nextPacket); OutgoingStream.Rewind(); XdcStream.Rewind(); SendQueuePos = 0; @@ -577,9 +580,9 @@ namespace NActors { ui64 written = 0; auto process = [&](NInterconnect::TOutgoingStream& stream, const TIntrusivePtr<NInterconnect::TStreamSocket>& socket, - const TPollerToken::TPtr& token, bool *writeBlocked, auto&& callback) { + const TPollerToken::TPtr& token, bool *writeBlocked, size_t maxBytes, auto&& callback) { while (stream && socket) { - ssize_t r = Write(stream, *socket); + ssize_t r = Write(stream, *socket, maxBytes); if (r == -1) { *writeBlocked = true; if (token) { @@ -603,7 +606,11 @@ namespace NActors { Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); - process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, [&](size_t r) { + Y_VERIFY_DEBUG(!WriteSinglePacketAndDropConfirmed || SendQueuePos != SendQueue.size()); + const size_t maxBytesFromMainStream = WriteSinglePacketAndDropConfirmed + ? SendQueue[SendQueuePos].PacketSize - SendOffset + : Max<size_t>(); + process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesFromMainStream, [&](size_t r) { Y_VERIFY(r <= BytesUnwritten); BytesUnwritten -= r; @@ -621,12 +628,16 @@ namespace NActors { ++PacketsWrittenToSocket; ++packets; Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1); + + if (std::exchange(WriteSinglePacketAndDropConfirmed, false)) { // drop remaining already confirmed packets + DropConfirmed(LastConfirmed); + } } Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); }); - process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, [&](size_t r) { + process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, Max<size_t>(), [&](size_t r) { XdcBytesSent += r; XdcStream.Advance(r); }); @@ -645,7 +656,8 @@ namespace NActors { WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer; } - ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket) { + ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, + size_t maxBytes) { LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { constexpr ui32 iovLimit = 256; @@ -667,6 +679,21 @@ namespace NActors { stream.ProduceIoVec(wbuffers, maxElementsInIOV); Y_VERIFY(!wbuffers.empty()); + if (maxBytes != Max<size_t>()) { + for (auto it = wbuffers.begin(); it != wbuffers.end(); ++it) { + if (maxBytes < it->Size) { + if (maxBytes) { + it->Size = maxBytes; + ++it; + } + wbuffers.erase(it, wbuffers.end()); + break; + } else { + maxBytes -= it->Size; + } + } + } + TString err; ssize_t r = 0; { // issue syscall with timing @@ -828,7 +855,7 @@ namespace NActors { return packetSize; } - bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm, bool ignoreSendQueuePos) { + bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) { LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm); Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter, @@ -845,14 +872,13 @@ namespace NActors { size_t unsentBytesInSendQueue = 0; for (size_t i = 0; i < SendQueue.size(); ++i) { totalBytesInSendQueue += SendQueue[i].PacketSize; - if (i > SendQueuePos) { + if (SendQueuePos <= i) { unsentBytesInSendQueue += SendQueue[i].PacketSize; - } else if (i == SendQueuePos) { - unsentBytesInSendQueue += SendQueue[i].PacketSize - SendOffset; } } + unsentBytesInSendQueue -= SendOffset; Y_VERIFY(totalBytesInSendQueue == OutgoingStream.CalculateOutgoingSize()); - Y_VERIFY(ignoreSendQueuePos || unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize()); + Y_VERIFY(unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize()); } #endif @@ -865,14 +891,32 @@ namespace NActors { if (front.Data && confirm < front.Serial) { break; } + if (!SendQueuePos) { + // we are sending this packet right now; it might be a race -- we are resending packets after reconnection, + // while getting confirmation for higher packet number (as we have sent it before); here we have to send + // these packets (if necessary) and skip the rest of them + if (!SendOffset && Socket && !Socket->ExpectingCertainWrite()) { + // just advance to next packet -- we won't send current one + ++SendQueuePos; + } else { + // can't skip packets now, make a mark and exit + WriteSinglePacketAndDropConfirmed = true; + break; + } + } if (front.Data) { lastDroppedSerial.emplace(front.Serial); } bytesDropped += front.PacketSize; bytesDroppedFromXdc += front.ExternalSize; ++numDropped; - Y_VERIFY_DEBUG(ignoreSendQueuePos || SendQueuePos != 0); + Y_VERIFY_DEBUG(SendQueuePos != 0); + } + + if (!numDropped) { + return false; } + const ui64 droppedDataAmount = bytesDropped + bytesDroppedFromXdc - sizeof(TTcpPacketHeader_v2) * numDropped; OutgoingStream.DropFront(bytesDropped); XdcStream.DropFront(bytesDroppedFromXdc); |