diff options
author | alexvru <alexvru@ydb.tech> | 2023-04-14 11:55:52 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-04-14 11:55:52 +0300 |
commit | b079747b8a665c7e1e9573163b558ffa7bb296f7 (patch) | |
tree | c965626c09a51c2afb967158c252bf380fff0725 /library/cpp | |
parent | 4ca3ea35a05a878e6f223274b9fc093146d7b478 (diff) | |
download | ydb-b079747b8a665c7e1e9573163b558ffa7bb296f7.tar.gz |
Remove TTcpPacketHeader_v1 support from IC
Diffstat (limited to 'library/cpp')
7 files changed, 50 insertions, 128 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index 41709d6fba..69cc5e946a 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -114,7 +114,7 @@ namespace NActors { Y_VERIFY(maxBytes); auto addChunk = [&](const void *data, size_t len) { - event.UpdateChecksum(Params, data, len); + event.UpdateChecksum(data, len); task.AppendBuf(data, len); part->Size += len; Y_VERIFY_DEBUG(maxBytes >= len); diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index c75e27ff5e..7baf6a3186 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -808,9 +808,12 @@ namespace NActors { const auto& s = success.GetSenderActorId(); PeerVirtualId.Parse(s.data(), s.size()); + if (!success.GetUseModernFrame()) { + generateError("UseModernFrame not set, obsolete peer version"); + } + // recover flags Params.Encryption = success.GetStartEncryption(); - Params.UseModernFrame = success.GetUseModernFrame(); Params.AuthOnly = Params.Encryption && success.GetAuthOnly(); Params.UseExtendedTraceFmt = success.GetUseExtendedTraceFmt(); Params.UseExternalDataChannel = success.GetUseExternalDataChannel(); @@ -993,7 +996,10 @@ namespace NActors { break; } - Params.UseModernFrame = request.GetRequestModernFrame(); + if (!request.GetRequestModernFrame()) { + generateError("RequestModernFrame not set, obsolete peer version"); + } + Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly; Params.UseExtendedTraceFmt = request.GetRequestExtendedTraceFmt(); Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel; @@ -1031,7 +1037,7 @@ namespace NActors { if (Common->LocalScopeId != TScopeId()) { FillInScopeId(*success.MutableServerScopeId()); } - success.SetUseModernFrame(Params.UseModernFrame); + success.SetUseModernFrame(true); success.SetAuthOnly(Params.AuthOnly); success.SetUseExtendedTraceFmt(Params.UseExtendedTraceFmt); success.SetUseExternalDataChannel(Params.UseExternalDataChannel); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index ee85f72dd0..ea239780a7 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -79,7 +79,6 @@ namespace NActors { void TInputSessionTCP::ReceiveData() { TTimeLimit limit(GetMaxCyclesPerEvent()); ui64 numDataBytes = 0; - const size_t headerLen = Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1); LOG_DEBUG_IC_SESSION("ICIS02", "ReceiveData called"); @@ -94,10 +93,10 @@ namespace NActors { switch (State) { case EState::HEADER: - if (IncomingData.GetSize() < headerLen) { + if (IncomingData.GetSize() < sizeof(Header)) { break; } else { - ProcessHeader(headerLen); + ProcessHeader(); } continue; @@ -172,30 +171,19 @@ namespace NActors { } } - void TInputSessionTCP::ProcessHeader(size_t headerLen) { - const bool success = IncomingData.ExtractFrontPlain(Header.Data, headerLen); + void TInputSessionTCP::ProcessHeader() { + const bool success = IncomingData.ExtractFrontPlain(&Header, sizeof(Header)); Y_VERIFY(success); - if (Params.UseModernFrame) { - PayloadSize = Header.v2.PayloadLength; - HeaderSerial = Header.v2.Serial; - HeaderConfirm = Header.v2.Confirm; - if (!Params.Encryption) { - ChecksumExpected = std::exchange(Header.v2.Checksum, 0); - Checksum = Crc32cExtendMSanCompatible(0, &Header.v2, sizeof(Header.v2)); // start calculating checksum now - if (!PayloadSize && Checksum != ChecksumExpected) { - LOG_ERROR_IC_SESSION("ICIS10", "payload checksum error"); - return ReestablishConnection(TDisconnectReason::ChecksumError()); - } + PayloadSize = Header.PayloadLength; + HeaderSerial = Header.Serial; + HeaderConfirm = Header.Confirm; + if (!Params.Encryption) { + ChecksumExpected = std::exchange(Header.Checksum, 0); + Checksum = Crc32cExtendMSanCompatible(0, &Header, sizeof(Header)); // start calculating checksum now + if (!PayloadSize && Checksum != ChecksumExpected) { + LOG_ERROR_IC_SESSION("ICIS10", "payload checksum error"); + return ReestablishConnection(TDisconnectReason::ChecksumError()); } - } else if (!Header.v1.Check()) { - LOG_ERROR_IC_SESSION("ICIS03", "header checksum error"); - return ReestablishConnection(TDisconnectReason::ChecksumError()); - } else { - PayloadSize = Header.v1.DataSize; - HeaderSerial = Header.v1.Serial; - HeaderConfirm = Header.v1.Confirm; - ChecksumExpected = Header.v1.PayloadCRC32; - Checksum = 0; } if (PayloadSize >= 65536) { LOG_CRIT_IC_SESSION("ICIS07", "payload is way too big"); @@ -243,7 +231,7 @@ namespace NActors { return; // there is still some data to receive in the Payload rope } State = EState::HEADER; // we'll continue with header next time - if (!Params.UseModernFrame || !Params.Encryption) { // see if we are checksumming packet body + if (!Params.Encryption) { // see if we are checksumming packet body for (const auto&& [data, size] : Payload) { Checksum = Crc32cExtendMSanCompatible(Checksum, data, size); } @@ -327,7 +315,7 @@ namespace NActors { } void TInputSessionTCP::ProcessEvent(TRope& data, TEventData& descr) { - if (!Params.UseModernFrame || descr.Checksum) { + if (descr.Checksum) { ui32 checksum = 0; for (const auto&& [data, size] : data) { checksum = Crc32cExtendMSanCompatible(checksum, data, size); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 9d6a8ba9c1..72982c1458 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -300,8 +300,7 @@ namespace NActors { BytesUnwritten = 0; for (const auto& packet : SendQueue) { - BytesUnwritten += (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + - packet.GetDataSize(); + BytesUnwritten += sizeof(TTcpPacketHeader_v2) + packet.GetDataSize(); } SwitchStuckPeriod(); @@ -582,11 +581,6 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu", ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size()); - // update last confirmed packet number if it has changed - if (SendQueuePos != SendQueue.end()) { - SendQueuePos->UpdateConfirmIfPossible(ReceiveContext->GetLastProcessedPacketSerial()); - } - while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) { for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) { it->AppendToIoVector(wbuffers, maxElementsInIOV); @@ -805,7 +799,7 @@ namespace NActors { packet->Sign(); // count number of bytes pending for write - ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize(); + ui64 packetSize = sizeof(TTcpPacketHeader_v2) + packet->GetDataSize(); BytesUnwritten += packetSize; LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu" @@ -1144,7 +1138,7 @@ namespace NActors { } TABLER() { TABLED() { str << "Frame version/Checksum"; } - TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); } + TABLED() { str << (Params.Encryption ? "v2/none" : "v2/crc32c"); } } #define MON_VAR(NAME) \ TABLER() { \ diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index f836bdd1af..7d0e003a34 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -218,11 +218,7 @@ namespace NActors { const TSessionParams Params; // header we are currently processing (parsed from the stream) - union { - TTcpPacketHeader_v1 v1; - TTcpPacketHeader_v2 v2; - char Data[1]; - } Header; + TTcpPacketHeader_v2 Header; ui64 HeaderConfirm, HeaderSerial; size_t PayloadSize; @@ -250,7 +246,7 @@ namespace NActors { void HandleResumeReceiveData(); void HandleConfirmUpdate(); void ReceiveData(); - void ProcessHeader(size_t headerLen); + void ProcessHeader(); void ProcessPayload(ui64& numDataBytes); void ProcessEvent(TRope& data, TEventData& descr); bool ReadMore(); diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index f3c506a663..d29d35a207 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -30,27 +30,6 @@ Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, return Crc32cExtend(checksum, data, len); } -struct TTcpPacketHeader_v1 { - ui32 HeaderCRC32; - ui32 PayloadCRC32; - ui64 Confirm; - ui64 Serial; - ui64 DataSize; - - inline bool Check() const { - ui32 actual = Crc32cExtendMSanCompatible(0, &PayloadCRC32, sizeof(TTcpPacketHeader_v1) - sizeof(HeaderCRC32)); - return actual == HeaderCRC32; - } - - inline void Sign() { - HeaderCRC32 = Crc32cExtendMSanCompatible(0, &PayloadCRC32, sizeof(TTcpPacketHeader_v1) - sizeof(HeaderCRC32)); - } - - TString ToString() const { - return Sprintf("{Confirm# %" PRIu64 " Serial# %" PRIu64 " DataSize# %" PRIu64 "}", Confirm, Serial, DataSize); - } -}; - #pragma pack(push, 1) struct TTcpPacketHeader_v2 { ui64 Confirm; @@ -60,20 +39,15 @@ struct TTcpPacketHeader_v2 { }; #pragma pack(pop) -union TTcpPacketBuf { +struct TTcpPacketBuf { static constexpr ui64 PingRequestMask = 0x8000000000000000ULL; static constexpr ui64 PingResponseMask = 0x4000000000000000ULL; static constexpr ui64 ClockMask = 0x2000000000000000ULL; - static constexpr size_t PacketDataLen = 4096 * 2 - 96 - Max(sizeof(TTcpPacketHeader_v1), sizeof(TTcpPacketHeader_v2)); - struct { - TTcpPacketHeader_v1 Header; - char Data[PacketDataLen]; - } v1; - struct { - TTcpPacketHeader_v2 Header; - char Data[PacketDataLen]; - } v2; + static constexpr size_t PacketDataLen = 4096 * 2 - 96 - sizeof(TTcpPacketHeader_v2); + + TTcpPacketHeader_v2 Header; + char Data[PacketDataLen]; }; struct TEventData { @@ -125,8 +99,8 @@ struct TEventHolder : TNonCopyable { Descr.Checksum = 0; } - void UpdateChecksum(const TSessionParams& params, const void *buffer, size_t len) { - if (FORCE_EVENT_CHECKSUM || !params.UseModernFrame) { + void UpdateChecksum(const void *buffer, size_t len) { + if (FORCE_EVENT_CHECKSUM) { Descr.Checksum = Crc32cExtendMSanCompatible(Descr.Checksum, buffer, len); } } @@ -174,16 +148,6 @@ public: Reuse(); } - template<typename T> - auto ApplyToHeader(T&& callback) { - return Params.UseModernFrame ? callback(Packet.v2.Header) : callback(Packet.v1.Header); - } - - template<typename T> - auto ApplyToHeader(T&& callback) const { - return Params.UseModernFrame ? callback(Packet.v2.Header) : callback(Packet.v1.Header); - } - bool IsAtBegin() const { return !BufferIndex && !FirstBufferOffset && !TriedWriting; } @@ -194,11 +158,11 @@ public: void Reuse() { DataSize = 0; - ApplyToHeader([this](auto& header) { Bufs.assign(1, {&header, sizeof(header)}); }); + Bufs.assign(1, {&Packet.Header, sizeof(Packet.Header)}); BufferIndex = 0; FirstBufferOffset = 0; TriedWriting = false; - FreeArea = Params.UseModernFrame ? Packet.v2.Data : Packet.v1.Data; + FreeArea = Packet.Data; End = FreeArea + TTcpPacketBuf::PacketDataLen; Orbit.Reset(); } @@ -208,28 +172,18 @@ public: } void SetMetadata(ui64 serial, ui64 confirm) { - ApplyToHeader([&](auto& header) { - header.Serial = serial; - header.Confirm = confirm; - }); - } - - void UpdateConfirmIfPossible(ui64 confirm) { - // we don't want to recalculate whole packet checksum for single confirmation update on v2 - if (!Params.UseModernFrame && IsAtBegin() && confirm != Packet.v1.Header.Confirm) { - Packet.v1.Header.Confirm = confirm; - Packet.v1.Header.Sign(); - } + Packet.Header.Serial = serial; + Packet.Header.Confirm = confirm; } size_t GetDataSize() const { return DataSize; } ui64 GetSerial() const { - return ApplyToHeader([](auto& header) { return header.Serial; }); + return Packet.Header.Serial; } bool Confirmed(ui64 confirm) const { - return ApplyToHeader([&](auto& header) { return IsEmpty() || header.Serial <= confirm; }); + return IsEmpty() || Packet.Header.Serial <= confirm; } void *GetFreeArea() { @@ -307,29 +261,14 @@ public: } void Sign() { - if (Params.UseModernFrame) { - Packet.v2.Header.Checksum = 0; - Packet.v2.Header.PayloadLength = DataSize; - if (!Params.Encryption) { - ui32 sum = 0; - for (const auto& item : Bufs) { - sum = Crc32cExtendMSanCompatible(sum, item.Data, item.Size); - } - Packet.v2.Header.Checksum = sum; + Packet.Header.Checksum = 0; + Packet.Header.PayloadLength = DataSize; + if (!Params.Encryption) { + ui32 sum = 0; + for (const auto& item : Bufs) { + sum = Crc32cExtendMSanCompatible(sum, item.Data, item.Size); } - } else { - Y_VERIFY(!Bufs.empty()); - auto it = Bufs.begin(); - static constexpr size_t headerLen = sizeof(TTcpPacketHeader_v1); - Y_VERIFY(it->Data == &Packet.v1.Header && it->Size >= headerLen); - ui32 sum = Crc32cExtendMSanCompatible(0, Packet.v1.Data, it->Size - headerLen); - while (++it != Bufs.end()) { - sum = Crc32cExtendMSanCompatible(sum, it->Data, it->Size); - } - - Packet.v1.Header.PayloadCRC32 = sum; - Packet.v1.Header.DataSize = DataSize; - Packet.v1.Header.Sign(); + Packet.Header.Checksum = sum; } } }; diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h index b1d2e02f49..f2feda03f2 100644 --- a/library/cpp/actors/interconnect/types.h +++ b/library/cpp/actors/interconnect/types.h @@ -52,7 +52,6 @@ namespace NActors { struct TSessionParams { bool Encryption = {}; - bool UseModernFrame = {}; bool AuthOnly = {}; bool UseExtendedTraceFmt = {}; bool UseExternalDataChannel = {}; |