diff options
author | alexvru <alexvru@ydb.tech> | 2023-04-26 10:57:10 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-04-26 10:57:10 +0300 |
commit | 9f32e2cd6cf2ae4d6651dfb413bc8401a86a597f (patch) | |
tree | 9bf57a3b9c7bdf6e001436c11499579062cdb1d6 | |
parent | c4f044f20585c080c4c1c6587148079ab7f59d0b (diff) | |
download | ydb-9f32e2cd6cf2ae4d6651dfb413bc8401a86a597f.tar.gz |
Improve IC and XDC
8 files changed, 112 insertions, 22 deletions
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp index 4e37f36f70..7b35f07e01 100644 --- a/library/cpp/actors/core/event_pb.cpp +++ b/library/cpp/actors/core/event_pb.cpp @@ -77,7 +77,8 @@ namespace NActors { if (CancelFlag || AbortFlag) { return false; } else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) { - if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 2 * 64) { + if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 2 * 64 && + (NumChunks == 0 || data != Chunks[NumChunks - 1].first + Chunks[NumChunks - 1].second)) { memcpy(BufferPtr, data, bytesToAppend); if (!Produce(BufferPtr, bytesToAppend)) { diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index 371f9b7653..232f440cad 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -32,7 +32,10 @@ namespace NActors { event.Descr.Sender, event.Descr.Cookie, {}, - event.Descr.Checksum + event.Descr.Checksum, +#if IC_FORCE_HARDENED_PACKET_CHECKS + event.EventSerializedSize +#endif }; traceId.Serialize(&descr.TraceId); diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp index 96ee13a3f5..af63e72fb4 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.cpp +++ b/library/cpp/actors/interconnect/interconnect_stream.cpp @@ -214,6 +214,10 @@ namespace NInterconnect { token.Request(read, write); } + bool TStreamSocket::ExpectingCertainWrite() const { + return false; + } + ////////////////////////////////////////////////////// TDatagramSocket::TPtr TDatagramSocket::Make(int domain) { @@ -496,6 +500,10 @@ namespace NInterconnect { return res; } + bool ExpectingCertainWrite() const { + return BlockedSend.has_value(); + } + std::optional<std::pair<void*, size_t>> BlockedReceive; ssize_t Recv(void* msg, size_t len, TString *err) { @@ -642,4 +650,8 @@ namespace NInterconnect { void TSecureSocket::Request(NActors::TPollerToken& token, bool /*read*/, bool /*write*/) { token.Request(WantRead(), WantWrite()); } + + bool TSecureSocket::ExpectingCertainWrite() const { + return Impl->ExpectingCertainWrite(); + } } diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h index 55438fef10..a25185fc50 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.h +++ b/library/cpp/actors/interconnect/interconnect_stream.h @@ -69,6 +69,8 @@ namespace NInterconnect { ui32 GetSendBufferSize() const; virtual void Request(NActors::TPollerToken& token, bool read, bool write); + + virtual bool ExpectingCertainWrite() const; }; class TSecureSocketContext { @@ -120,7 +122,8 @@ namespace NInterconnect { bool WantRead() const; bool WantWrite() const; - virtual void Request(NActors::TPollerToken& token, bool read, bool write) override; + void Request(NActors::TPollerToken& token, bool read, bool write) override; + bool ExpectingCertainWrite() const override; }; class TDatagramSocket: public TSocket { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 925dbef823..37e1bb8b5f 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -409,7 +409,10 @@ namespace NActors { v2.Sender, v2.Cookie, NWilson::TTraceId(v2.TraceId), - v2.Checksum + v2.Checksum, +#if IC_FORCE_HARDENED_PACKET_CHECKS + v2.Len +#endif }; Metrics->IncInputChannelsIncomingEvents(channel); @@ -420,9 +423,9 @@ namespace NActors { } // mark packet as processed - ProcessInboundPacketQ(0); XdcCatchStreamFinal = XdcCatchStreamFinalPending; Context->LastProcessedSerial += !IgnorePayload; + ProcessInboundPacketQ(0); ++PacketsReadFromSocket; ++DataPacketsReadFromSocket; @@ -442,8 +445,10 @@ namespace NActors { break; } - if (!Context->AdvanceLastPacketSerialToConfirm(front.Serial)) { - throw TExDestroySession{TDisconnectReason::NewSession()}; + const bool success = Context->AdvanceLastPacketSerialToConfirm(front.Serial); + Y_VERIFY_DEBUG(Context->GetLastPacketSerialToConfirm() <= Context->LastProcessedSerial); + if (!success) { + throw TExReestablishConnection{TDisconnectReason::NewSession()}; } } } @@ -531,6 +536,13 @@ namespace NActors { } auto& descr = *pendingEvent.EventData; +#if IC_FORCE_HARDENED_PACKET_CHECKS + if (descr.Len != pendingEvent.Payload.GetSize()) { + LOG_CRIT_IC_SESSION("ICISxx", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32, + descr.Type, pendingEvent.Payload.GetSize(), descr.Len); + throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; + } +#endif if (descr.Checksum) { ui32 checksum = 0; for (const auto&& [data, size] : pendingEvent.Payload) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index e8fc974433..b0b6dc0b66 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -78,7 +78,7 @@ namespace NActors { } void TInterconnectSessionTCP::Terminate(TDisconnectReason reason) { - LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64, (Socket ? i64(*Socket) : -1)); + LOG_INFO_IC_SESSION("ICS01", "socket: %" PRIi64 " reason# %s", (Socket ? i64(*Socket) : -1), reason.ToString().data()); IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::UnregisterSession, this); ShutdownSocket(std::move(reason)); @@ -277,9 +277,12 @@ namespace NActors { // also reset SendQueuePos // drop confirmed packets first as we do not need unwanted retransmissions + WriteSinglePacketAndDropConfirmed = false; OutgoingStream.RewindToEnd(); XdcStream.RewindToEnd(); - DropConfirmed(nextPacket, true); + SendQueuePos = SendQueue.size(); + SendOffset = 0; + DropConfirmed(nextPacket); OutgoingStream.Rewind(); XdcStream.Rewind(); SendQueuePos = 0; @@ -577,9 +580,9 @@ namespace NActors { ui64 written = 0; auto process = [&](NInterconnect::TOutgoingStream& stream, const TIntrusivePtr<NInterconnect::TStreamSocket>& socket, - const TPollerToken::TPtr& token, bool *writeBlocked, auto&& callback) { + const TPollerToken::TPtr& token, bool *writeBlocked, size_t maxBytes, auto&& callback) { while (stream && socket) { - ssize_t r = Write(stream, *socket); + ssize_t r = Write(stream, *socket, maxBytes); if (r == -1) { *writeBlocked = true; if (token) { @@ -603,7 +606,11 @@ namespace NActors { Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); - process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, [&](size_t r) { + Y_VERIFY_DEBUG(!WriteSinglePacketAndDropConfirmed || SendQueuePos != SendQueue.size()); + const size_t maxBytesFromMainStream = WriteSinglePacketAndDropConfirmed + ? SendQueue[SendQueuePos].PacketSize - SendOffset + : Max<size_t>(); + process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, maxBytesFromMainStream, [&](size_t r) { Y_VERIFY(r <= BytesUnwritten); BytesUnwritten -= r; @@ -621,12 +628,16 @@ namespace NActors { ++PacketsWrittenToSocket; ++packets; Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1); + + if (std::exchange(WriteSinglePacketAndDropConfirmed, false)) { // drop remaining already confirmed packets + DropConfirmed(LastConfirmed); + } } Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); }); - process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, [&](size_t r) { + process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, Max<size_t>(), [&](size_t r) { XdcBytesSent += r; XdcStream.Advance(r); }); @@ -645,7 +656,8 @@ namespace NActors { WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer; } - ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket) { + ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, + size_t maxBytes) { LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { constexpr ui32 iovLimit = 256; @@ -667,6 +679,21 @@ namespace NActors { stream.ProduceIoVec(wbuffers, maxElementsInIOV); Y_VERIFY(!wbuffers.empty()); + if (maxBytes != Max<size_t>()) { + for (auto it = wbuffers.begin(); it != wbuffers.end(); ++it) { + if (maxBytes < it->Size) { + if (maxBytes) { + it->Size = maxBytes; + ++it; + } + wbuffers.erase(it, wbuffers.end()); + break; + } else { + maxBytes -= it->Size; + } + } + } + TString err; ssize_t r = 0; { // issue syscall with timing @@ -828,7 +855,7 @@ namespace NActors { return packetSize; } - bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm, bool ignoreSendQueuePos) { + bool TInterconnectSessionTCP::DropConfirmed(ui64 confirm) { LOG_DEBUG_IC_SESSION("ICS23", "confirm count: %" PRIu64, confirm); Y_VERIFY(LastConfirmed <= confirm && confirm <= LastSentSerial && LastSentSerial <= OutputCounter, @@ -845,14 +872,13 @@ namespace NActors { size_t unsentBytesInSendQueue = 0; for (size_t i = 0; i < SendQueue.size(); ++i) { totalBytesInSendQueue += SendQueue[i].PacketSize; - if (i > SendQueuePos) { + if (SendQueuePos <= i) { unsentBytesInSendQueue += SendQueue[i].PacketSize; - } else if (i == SendQueuePos) { - unsentBytesInSendQueue += SendQueue[i].PacketSize - SendOffset; } } + unsentBytesInSendQueue -= SendOffset; Y_VERIFY(totalBytesInSendQueue == OutgoingStream.CalculateOutgoingSize()); - Y_VERIFY(ignoreSendQueuePos || unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize()); + Y_VERIFY(unsentBytesInSendQueue == OutgoingStream.CalculateUnsentSize()); } #endif @@ -865,14 +891,32 @@ namespace NActors { if (front.Data && confirm < front.Serial) { break; } + if (!SendQueuePos) { + // we are sending this packet right now; it might be a race -- we are resending packets after reconnection, + // while getting confirmation for higher packet number (as we have sent it before); here we have to send + // these packets (if necessary) and skip the rest of them + if (!SendOffset && Socket && !Socket->ExpectingCertainWrite()) { + // just advance to next packet -- we won't send current one + ++SendQueuePos; + } else { + // can't skip packets now, make a mark and exit + WriteSinglePacketAndDropConfirmed = true; + break; + } + } if (front.Data) { lastDroppedSerial.emplace(front.Serial); } bytesDropped += front.PacketSize; bytesDroppedFromXdc += front.ExternalSize; ++numDropped; - Y_VERIFY_DEBUG(ignoreSendQueuePos || SendQueuePos != 0); + Y_VERIFY_DEBUG(SendQueuePos != 0); + } + + if (!numDropped) { + return false; } + const ui64 droppedDataAmount = bytesDropped + bytesDroppedFromXdc - sizeof(TTcpPacketHeader_v2) * numDropped; OutgoingStream.DropFront(bytesDropped); XdcStream.DropFront(bytesDroppedFromXdc); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 1437f7df2c..af6c14d824 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -459,11 +459,11 @@ namespace NActors { void Handle(TEvPollerReady::TPtr& ev); void Handle(TEvPollerRegisterResult::TPtr ev); void WriteData(); - ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket); + ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes); ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {}); void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial); - bool DropConfirmed(ui64 confirm, bool ignoreSendQueuePos = false); + bool DropConfirmed(ui64 confirm); void ShutdownSocket(TDisconnectReason reason); void StartHandshake(); @@ -532,6 +532,7 @@ namespace NActors { std::deque<TOutgoingPacket> SendQueue; // packet boundaries size_t SendQueuePos = 0; // packet being sent now size_t SendOffset = 0; + bool WriteSinglePacketAndDropConfirmed = false; ui64 XdcBytesSent = 0; diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 3f7eda1e9c..b4d672ee31 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -20,6 +20,14 @@ #define FORCE_EVENT_CHECKSUM 0 #endif +// WARNING: turning this feature on will make protocol incompatible with ordinary Interconnect, use with caution +#define IC_FORCE_HARDENED_PACKET_CHECKS 0 + +#if IC_FORCE_HARDENED_PACKET_CHECKS +#undef FORCE_EVENT_CHECKSUM +#define FORCE_EVENT_CHECKSUM 1 +#endif + Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, size_t len) { if constexpr (NSan::MSanIsOn()) { const char *begin = static_cast<const char*>(data); @@ -56,6 +64,9 @@ struct TEventData { ui64 Cookie; NWilson::TTraceId TraceId; ui32 Checksum; +#if IC_FORCE_HARDENED_PACKET_CHECKS + ui32 Len; +#endif }; #pragma pack(push, 1) @@ -68,6 +79,9 @@ struct TEventDescr2 { ui64 Cookie; NWilson::TTraceId::TSerializedTraceId TraceId; ui32 Checksum; +#if IC_FORCE_HARDENED_PACKET_CHECKS + ui32 Len; +#endif }; #pragma pack(pop) |