summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
diff options
context:
space:
mode:
authoralexvru <[email protected]>2023-04-26 10:57:10 +0300
committeralexvru <[email protected]>2023-04-26 10:57:10 +0300
commit9f32e2cd6cf2ae4d6651dfb413bc8401a86a597f (patch)
tree9bf57a3b9c7bdf6e001436c11499579062cdb1d6 /library/cpp/actors/interconnect/interconnect_tcp_session.cpp
parentc4f044f20585c080c4c1c6587148079ab7f59d0b (diff)
Improve IC and XDC
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_tcp_session.cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp70
1 files changed, 57 insertions, 13 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index e8fc9744335..b0b6dc0b66d 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -78,7 +78,7 @@ namespace NActors {
}
void TInterconnectSessionTCP::Terminate(TDisconnectReason reason) {
- LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64, (Socket ? i64(*Socket) : -1));
+ LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64 " reason# %s", (Socket ? i64(*Socket) : -1), reason.ToString().data());
IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::UnregisterSession, this);
ShutdownSocket(std::move(reason));
@@ -277,9 +277,12 @@ namespace NActors {
// also reset SendQueuePos
// drop confirmed packets first as we do not need unwanted retransmissions
+ WriteSinglePacketAndDropConfirmed = false;
OutgoingStream.RewindToEnd();
XdcStream.RewindToEnd();
- DropConfirmed(nextPacket, true);
+ SendQueuePos = SendQueue.size();
+ SendOffset = 0;
+ DropConfirmed(nextPacket);
OutgoingStream.Rewind();
XdcStream.Rewind();
SendQueuePos = 0;
@@ -577,9 +580,9 @@ namespace NActors {
ui64 written = 0;
auto process = [&](NInterconnect::TOutgoingStream& stream, const TIntrusivePtr<NInterconnect::TStreamSocket>& socket,
- const TPollerToken::TPtr& token, bool *writeBlocked, auto&& callback) {
+ const TPollerToken::TPtr& token, bool *writeBlocked, size_t maxBytes, auto&& callback) {
while (stream && socket) {
- ssize_t r = Write(stream, *socket);
+ ssize_t r = Write(stream, *socket, maxBytes);
if (r == -1) {
*writeBlocked = true;
if (token) {
@@ -603,7 +606,11 @@ namespace NActors {
Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
- process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, [&](size_t r) {
+ Y_VERIFY_DEBUG(!WriteSinglePacketAndDropConfirmed || SendQueuePos != SendQueue.size());
+ const size_t maxBytesFromMainStream = WriteSinglePacketAndDropConfirmed
+ ? SendQueue[SendQueuePos].PacketSize - SendOffset
+ : Max<size_t>();
+ process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesFromMainStream, [&](size_t r) {
Y_VERIFY(r <= BytesUnwritten);
BytesUnwritten -= r;
@@ -621,12 +628,16 @@ namespace NActors {
++PacketsWrittenToSocket;
++packets;
Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1);
+
+ if (std::exchange(WriteSinglePacketAndDropConfirmed, false)) { // drop remaining already confirmed packets
+ DropConfirmed(LastConfirmed);
+ }
}
Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
});
- process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, [&](size_t r) {
+ process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, Max<size_t>(), [&](size_t r) {
XdcBytesSent += r;
XdcStream.Advance(r);
});
@@ -645,7 +656,8 @@ namespace NActors {
WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer;
}
- ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket) {
+ ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket,
+ size_t maxBytes) {
LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
constexpr ui32 iovLimit = 256;
@@ -667,6 +679,21 @@ namespace NActors {
stream.ProduceIoVec(wbuffers, maxElementsInIOV);
Y_VERIFY(!wbuffers.empty());
+ if (maxBytes != Max<size_t>()) {
+ for (auto it = wbuffers.begin(); it != wbuffers.end(); ++it) {
+ if (maxBytes < it->Size) {
+ if (maxBytes) {
+ it->Size = maxBytes;
+ ++it;
+ }
+ wbuffers.erase(it, wbuffers.end());
+ break;
+ } else {
+ maxBytes -= it->Size;
+ }
+ }
+ }
+
TString err;
ssize_t r = 0;
{ // issue syscall with timing
@@ -828,7 +855,7 @@ namespace NActors {
return packetSize;
}
- bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm, bool ignoreSendQueuePos) {
+ bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) {
LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm);
Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter,
@@ -845,14 +872,13 @@ namespace NActors {
size_t unsentBytesInSendQueue = 0;
for (size_t i = 0; i < SendQueue.size(); ++i) {
totalBytesInSendQueue += SendQueue[i].PacketSize;
- if (i > SendQueuePos) {
+ if (SendQueuePos <= i) {
unsentBytesInSendQueue += SendQueue[i].PacketSize;
- } else if (i == SendQueuePos) {
- unsentBytesInSendQueue += SendQueue[i].PacketSize - SendOffset;
}
}
+ unsentBytesInSendQueue -= SendOffset;
Y_VERIFY(totalBytesInSendQueue == OutgoingStream.CalculateOutgoingSize());
- Y_VERIFY(ignoreSendQueuePos || unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize());
+ Y_VERIFY(unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize());
}
#endif
@@ -865,14 +891,32 @@ namespace NActors {
if (front.Data && confirm < front.Serial) {
break;
}
+ if (!SendQueuePos) {
+ // we are sending this packet right now; it might be a race -- we are resending packets after reconnection,
+ // while getting confirmation for higher packet number (as we have sent it before); here we have to send
+ // these packets (if necessary) and skip the rest of them
+ if (!SendOffset && Socket && !Socket->ExpectingCertainWrite()) {
+ // just advance to next packet -- we won't send current one
+ ++SendQueuePos;
+ } else {
+ // can't skip packets now, make a mark and exit
+ WriteSinglePacketAndDropConfirmed = true;
+ break;
+ }
+ }
if (front.Data) {
lastDroppedSerial.emplace(front.Serial);
}
bytesDropped += front.PacketSize;
bytesDroppedFromXdc += front.ExternalSize;
++numDropped;
- Y_VERIFY_DEBUG(ignoreSendQueuePos || SendQueuePos != 0);
+ Y_VERIFY_DEBUG(SendQueuePos != 0);
+ }
+
+ if (!numDropped) {
+ return false;
}
+
const ui64 droppedDataAmount = bytesDropped + bytesDroppedFromXdc - sizeof(TTcpPacketHeader_v2) * numDropped;
OutgoingStream.DropFront(bytesDropped);
XdcStream.DropFront(bytesDroppedFromXdc);