diff options
author | alexvru <alexvru@ydb.tech> | 2023-05-10 21:41:42 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-05-10 21:41:42 +0300 |
commit | 1c7bc09bc80a6e280d3b113f5e58363ab63d528f (patch) | |
tree | 16b3d9a7b3b285407ef8bcec596153039dfd6d2a | |
parent | 6ff45f26cc9fc9f10ec4653a793a396d7556439e (diff) | |
download | ydb-1c7bc09bc80a6e280d3b113f5e58363ab63d528f.tar.gz |
Support xxHash in IC
12 files changed, 78 insertions, 11 deletions
diff --git a/library/cpp/actors/interconnect/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/interconnect/CMakeLists.darwin-x86_64.txt index 5240e1579b..b0843f7a1f 100644 --- a/library/cpp/actors/interconnect/CMakeLists.darwin-x86_64.txt +++ b/library/cpp/actors/interconnect/CMakeLists.darwin-x86_64.txt @@ -18,6 +18,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC yutil contrib-libs-libc_compat OpenSSL::OpenSSL + contrib-libs-xxhash cpp-actors-core cpp-actors-dnscachelib cpp-actors-dnsresolver diff --git a/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt b/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt index b89c5d909e..88d247f735 100644 --- a/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt +++ b/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt @@ -19,6 +19,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC yutil contrib-libs-libc_compat OpenSSL::OpenSSL + contrib-libs-xxhash cpp-actors-core cpp-actors-dnscachelib cpp-actors-dnsresolver diff --git a/library/cpp/actors/interconnect/CMakeLists.linux-x86_64.txt b/library/cpp/actors/interconnect/CMakeLists.linux-x86_64.txt index b89c5d909e..88d247f735 100644 --- a/library/cpp/actors/interconnect/CMakeLists.linux-x86_64.txt +++ b/library/cpp/actors/interconnect/CMakeLists.linux-x86_64.txt @@ -19,6 +19,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC yutil contrib-libs-libc_compat OpenSSL::OpenSSL + contrib-libs-xxhash cpp-actors-core cpp-actors-dnscachelib cpp-actors-dnsresolver diff --git a/library/cpp/actors/interconnect/CMakeLists.windows-x86_64.txt b/library/cpp/actors/interconnect/CMakeLists.windows-x86_64.txt index 5240e1579b..b0843f7a1f 100644 --- a/library/cpp/actors/interconnect/CMakeLists.windows-x86_64.txt +++ b/library/cpp/actors/interconnect/CMakeLists.windows-x86_64.txt @@ -18,6 +18,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC yutil contrib-libs-libc_compat OpenSSL::OpenSSL + contrib-libs-xxhash cpp-actors-core cpp-actors-dnscachelib cpp-actors-dnsresolver diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index 3b3672b48e..a5b02201a0 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -277,7 +277,14 @@ namespace NActors { *ptr++ = static_cast<ui8>(EXdcCommand::PUSH_DATA); *reinterpret_cast<ui16*>(ptr) = bytesSerialized; ptr += sizeof(ui16); - if (task.Checksumming()) { + if (task.ChecksummingXxhash()) { + XXH3_state_t state; + XXH3_64bits_reset(&state); + task.XdcStream.ScanLastBytes(bytesSerialized, [&state](TContiguousSpan span) { + XXH3_64bits_update(&state, span.data(), span.size()); + }); + *reinterpret_cast<ui32*>(ptr) = XXH3_64bits_digest(&state); + } else if (task.ChecksummingCrc32c()) { *reinterpret_cast<ui32*>(ptr) = task.ExternalChecksum; } diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index a8aeca7866..835b110a6d 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -781,6 +781,7 @@ namespace NActors { request.SetRequestAuthOnly(Common->Settings.TlsAuthOnly); request.SetRequestExtendedTraceFmt(true); request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel); + request.SetRequestXxhash(true); request.SetHandshakeId(*HandshakeId); SendExBlock(MainChannel, request, "ExRequest"); @@ -818,6 +819,7 @@ namespace NActors { Params.Encryption = success.GetStartEncryption(); Params.AuthOnly = Params.Encryption && success.GetAuthOnly(); Params.UseExternalDataChannel = success.GetUseExternalDataChannel(); + Params.UseXxhash = success.GetUseXxhash(); if (success.HasServerScopeId()) { ParsePeerScopeId(success.GetServerScopeId()); } @@ -1005,6 +1007,7 @@ namespace NActors { Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly; Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel; + Params.UseXxhash = request.GetRequestXxhash(); if (Params.UseExternalDataChannel) { if (request.HasHandshakeId()) { @@ -1043,6 +1046,7 @@ namespace NActors { success.SetAuthOnly(Params.AuthOnly); success.SetUseExtendedTraceFmt(true); success.SetUseExternalDataChannel(Params.UseExternalDataChannel); + success.SetUseXxhash(Params.UseXxhash); SendExBlock(MainChannel, record, "ExReply"); // extract sender actor id (self virtual id) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 961bf7caff..03377a5ea6 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -131,6 +131,8 @@ namespace NActors { } UsageHisto.fill(0); + + XXH3_64bits_reset(&XxhashXdcState); } void TInputSessionTCP::Bootstrap() { @@ -320,7 +322,15 @@ namespace NActors { const ui64 confirm = header.Confirm; if (!Params.Encryption) { ChecksumExpected = std::exchange(header.Checksum, 0); - Checksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header)); // start calculating checksum now + if (Params.UseXxhash) { + XXH3_64bits_reset(&XxhashState); + XXH3_64bits_update(&XxhashState, &header, sizeof(header)); + if (!PayloadSize) { + Checksum = XXH3_64bits_digest(&XxhashState); + } + } else { + Checksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header)); // start calculating checksum now + } if (!PayloadSize && Checksum != ChecksumExpected) { LOG_ERROR_IC_SESSION("ICIS10", "payload checksum error"); throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; @@ -388,7 +398,14 @@ namespace NActors { State = EState::HEADER; if (!Params.Encryption) { // see if we are checksumming packet body for (const auto&& [data, size] : Payload) { - Checksum = Crc32cExtendMSanCompatible(Checksum, data, size); + if (Params.UseXxhash) { + XXH3_64bits_update(&XxhashState, data, size); + } else { + Checksum = Crc32cExtendMSanCompatible(Checksum, data, size); + } + } + if (Params.UseXxhash) { + Checksum = XXH3_64bits_digest(&XxhashState); } if (Checksum != ChecksumExpected) { // validate payload checksum LOG_ERROR_IC_SESSION("ICIS04", "payload checksum error"); @@ -910,10 +927,18 @@ namespace NActors { Y_VERIFY_DEBUG(!XdcChecksumQ.empty()); auto& [size, expected] = XdcChecksumQ.front(); const size_t n = Min<size_t>(size, span.size()); - XdcCurrentChecksum = Crc32cExtendMSanCompatible(XdcCurrentChecksum, span.data(), n); + if (Params.UseXxhash) { + XXH3_64bits_update(&XxhashXdcState, span.data(), n); + } else { + XdcCurrentChecksum = Crc32cExtendMSanCompatible(XdcCurrentChecksum, span.data(), n); + } span = span.SubSpan(n, Max<size_t>()); size -= n; if (!size) { + if (Params.UseXxhash) { + XdcCurrentChecksum = XXH3_64bits_digest(&XxhashXdcState); + XXH3_64bits_reset(&XxhashXdcState); + } if (XdcCurrentChecksum != expected) { LOG_ERROR_IC_SESSION("ICIS16", "payload checksum error"); throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index a73418bf15..2f2b175530 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -1158,7 +1158,7 @@ namespace NActors { } TABLER() { TABLED() { str << "Frame version/Checksum"; } - TABLED() { str << (Params.Encryption ? "v2/none" : "v2/crc32c"); } + TABLED() { str << (Params.Encryption ? "v2/none" : Params.UseXxhash ? "v2/xxhash" : "v2/crc32c"); } } #define MON_VAR(NAME) \ TABLER() { \ diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index e8d201c84c..5722ddb627 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -12,6 +12,9 @@ #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#define XXH_INLINE_ALL +#include <contrib/libs/xxhash/xxhash.h> + #include <util/generic/queue.h> #include <util/generic/deque.h> #include <util/datetime/cputimer.h> @@ -251,6 +254,8 @@ namespace NActors { TInterconnectProxyCommon::TPtr Common; const ui32 NodeId; const TSessionParams Params; + XXH3_state_t XxhashState; + XXH3_state_t XxhashXdcState; size_t PayloadSize; ui32 ChecksumExpected, Checksum; diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 4e4cfb4b1d..df0bf0a8c7 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -13,6 +13,9 @@ #include <util/generic/string.h> #include <util/generic/list.h> +#define XXH_INLINE_ALL +#include <contrib/libs/xxhash/xxhash.h> + #include "types.h" #include "outgoing_stream.h" @@ -158,7 +161,7 @@ struct TTcpPacketOutTask : TNonCopyable { // Preallocate some space to fill it later. NInterconnect::TOutgoingStream::TBookmark Bookmark(size_t len) { - if (Checksumming()) { + if (ChecksummingCrc32c()) { Y_VERIFY_DEBUG(!InsideBookmark); InsideBookmark = true; PreBookmarkChecksum = std::exchange(InternalChecksum, 0); @@ -171,7 +174,7 @@ struct TTcpPacketOutTask : TNonCopyable { // Write previously bookmarked space. void WriteBookmark(NInterconnect::TOutgoingStream::TBookmark&& bookmark, const void *buffer, size_t len) { - if (Checksumming()) { + if (ChecksummingCrc32c()) { Y_VERIFY_DEBUG(InsideBookmark); InsideBookmark = false; const ui32 bookmarkChecksum = Crc32cExtendMSanCompatible(PreBookmarkChecksum, buffer, len); @@ -210,7 +213,7 @@ struct TTcpPacketOutTask : TNonCopyable { template<bool External> void ProcessChecksum(const void *buffer, size_t len) { - if (Checksumming()) { + if (ChecksummingCrc32c()) { if (External) { ExternalChecksum = Crc32cExtendMSanCompatible(ExternalChecksum, buffer, len); } else { @@ -230,7 +233,19 @@ struct TTcpPacketOutTask : TNonCopyable { static_cast<ui16>(InternalSize) }; - if (Checksumming()) { + if (ChecksummingXxhash()) { + // write header with zero checksum to calculate whole packet checksum correctly + OutgoingStream.WriteBookmark(NInterconnect::TOutgoingStream::TBookmark(HeaderBookmark), + {reinterpret_cast<const char*>(&header), sizeof(header)}); + + // calculate packet checksum + XXH3_state_t state; + XXH3_64bits_reset(&state); + OutgoingStream.ScanLastBytes(GetPacketSize(), [&state](TContiguousSpan span) { + XXH3_64bits_update(&state, span.data(), span.size()); + }); + header.Checksum = XXH3_64bits_digest(&state); + } else if (ChecksummingCrc32c()) { Y_VERIFY_DEBUG(!InsideBookmark); const ui32 headerChecksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header)); header.Checksum = Crc32cCombine(headerChecksum, InternalChecksum, InternalSize); @@ -240,8 +255,12 @@ struct TTcpPacketOutTask : TNonCopyable { sizeof(header)}); } - bool Checksumming() const { - return !Params.Encryption; + bool ChecksummingCrc32c() const { + return !Params.Encryption && !Params.UseXxhash; + } + + bool ChecksummingXxhash() const { + return !Params.Encryption && Params.UseXxhash; } bool IsEmpty() const { return GetDataSize() == 0; } diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h index 3963f3107d..0c36564efd 100644 --- a/library/cpp/actors/interconnect/types.h +++ b/library/cpp/actors/interconnect/types.h @@ -54,6 +54,7 @@ namespace NActors { bool Encryption = {}; bool AuthOnly = {}; bool UseExternalDataChannel = {}; + bool UseXxhash = {}; TString AuthCN; NActors::TScopeId PeerScopeId; }; diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto index 3108b4f24c..538ede33d1 100644 --- a/library/cpp/actors/protos/interconnect.proto +++ b/library/cpp/actors/protos/interconnect.proto @@ -73,6 +73,7 @@ message THandshakeRequest { optional bool RequestAuthOnly = 19; optional bool RequestExtendedTraceFmt = 20; optional bool RequestExternalDataChannel = 21; + optional bool RequestXxhash = 24; optional bytes CompatibilityInfo = 22; @@ -100,6 +101,7 @@ message THandshakeSuccess { optional bool AuthOnly = 12; optional bool UseExtendedTraceFmt = 13; optional bool UseExternalDataChannel = 14; + optional bool UseXxhash = 16; optional bytes CompatibilityInfo = 15; } |