aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-05-10 21:41:42 +0300
committeralexvru <alexvru@ydb.tech>2023-05-10 21:41:42 +0300
commit1c7bc09bc80a6e280d3b113f5e58363ab63d528f (patch)
tree16b3d9a7b3b285407ef8bcec596153039dfd6d2a
parent6ff45f26cc9fc9f10ec4653a793a396d7556439e (diff)
downloadydb-1c7bc09bc80a6e280d3b113f5e58363ab63d528f.tar.gz
Support xxHash in IC
-rw-r--r--library/cpp/actors/interconnect/CMakeLists.darwin-x86_64.txt1
-rw-r--r--library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/actors/interconnect/CMakeLists.linux-x86_64.txt1
-rw-r--r--library/cpp/actors/interconnect/CMakeLists.windows-x86_64.txt1
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp9
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp31
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h5
-rw-r--r--library/cpp/actors/interconnect/packet.h31
-rw-r--r--library/cpp/actors/interconnect/types.h1
-rw-r--r--library/cpp/actors/protos/interconnect.proto2
12 files changed, 78 insertions, 11 deletions
diff --git a/library/cpp/actors/interconnect/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/interconnect/CMakeLists.darwin-x86_64.txt
index 5240e1579b..b0843f7a1f 100644
--- a/library/cpp/actors/interconnect/CMakeLists.darwin-x86_64.txt
+++ b/library/cpp/actors/interconnect/CMakeLists.darwin-x86_64.txt
@@ -18,6 +18,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC
yutil
contrib-libs-libc_compat
OpenSSL::OpenSSL
+ contrib-libs-xxhash
cpp-actors-core
cpp-actors-dnscachelib
cpp-actors-dnsresolver
diff --git a/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt b/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt
index b89c5d909e..88d247f735 100644
--- a/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/actors/interconnect/CMakeLists.linux-aarch64.txt
@@ -19,6 +19,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC
yutil
contrib-libs-libc_compat
OpenSSL::OpenSSL
+ contrib-libs-xxhash
cpp-actors-core
cpp-actors-dnscachelib
cpp-actors-dnsresolver
diff --git a/library/cpp/actors/interconnect/CMakeLists.linux-x86_64.txt b/library/cpp/actors/interconnect/CMakeLists.linux-x86_64.txt
index b89c5d909e..88d247f735 100644
--- a/library/cpp/actors/interconnect/CMakeLists.linux-x86_64.txt
+++ b/library/cpp/actors/interconnect/CMakeLists.linux-x86_64.txt
@@ -19,6 +19,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC
yutil
contrib-libs-libc_compat
OpenSSL::OpenSSL
+ contrib-libs-xxhash
cpp-actors-core
cpp-actors-dnscachelib
cpp-actors-dnsresolver
diff --git a/library/cpp/actors/interconnect/CMakeLists.windows-x86_64.txt b/library/cpp/actors/interconnect/CMakeLists.windows-x86_64.txt
index 5240e1579b..b0843f7a1f 100644
--- a/library/cpp/actors/interconnect/CMakeLists.windows-x86_64.txt
+++ b/library/cpp/actors/interconnect/CMakeLists.windows-x86_64.txt
@@ -18,6 +18,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC
yutil
contrib-libs-libc_compat
OpenSSL::OpenSSL
+ contrib-libs-xxhash
cpp-actors-core
cpp-actors-dnscachelib
cpp-actors-dnsresolver
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index 3b3672b48e..a5b02201a0 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -277,7 +277,14 @@ namespace NActors {
*ptr++ = static_cast<ui8>(EXdcCommand::PUSH_DATA);
*reinterpret_cast<ui16*>(ptr) = bytesSerialized;
ptr += sizeof(ui16);
- if (task.Checksumming()) {
+ if (task.ChecksummingXxhash()) {
+ XXH3_state_t state;
+ XXH3_64bits_reset(&state);
+ task.XdcStream.ScanLastBytes(bytesSerialized, [&state](TContiguousSpan span) {
+ XXH3_64bits_update(&state, span.data(), span.size());
+ });
+ *reinterpret_cast<ui32*>(ptr) = XXH3_64bits_digest(&state);
+ } else if (task.ChecksummingCrc32c()) {
*reinterpret_cast<ui32*>(ptr) = task.ExternalChecksum;
}
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index a8aeca7866..835b110a6d 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -781,6 +781,7 @@ namespace NActors {
request.SetRequestAuthOnly(Common->Settings.TlsAuthOnly);
request.SetRequestExtendedTraceFmt(true);
request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel);
+ request.SetRequestXxhash(true);
request.SetHandshakeId(*HandshakeId);
SendExBlock(MainChannel, request, "ExRequest");
@@ -818,6 +819,7 @@ namespace NActors {
Params.Encryption = success.GetStartEncryption();
Params.AuthOnly = Params.Encryption && success.GetAuthOnly();
Params.UseExternalDataChannel = success.GetUseExternalDataChannel();
+ Params.UseXxhash = success.GetUseXxhash();
if (success.HasServerScopeId()) {
ParsePeerScopeId(success.GetServerScopeId());
}
@@ -1005,6 +1007,7 @@ namespace NActors {
Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly;
Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel;
+ Params.UseXxhash = request.GetRequestXxhash();
if (Params.UseExternalDataChannel) {
if (request.HasHandshakeId()) {
@@ -1043,6 +1046,7 @@ namespace NActors {
success.SetAuthOnly(Params.AuthOnly);
success.SetUseExtendedTraceFmt(true);
success.SetUseExternalDataChannel(Params.UseExternalDataChannel);
+ success.SetUseXxhash(Params.UseXxhash);
SendExBlock(MainChannel, record, "ExReply");
// extract sender actor id (self virtual id)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 961bf7caff..03377a5ea6 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -131,6 +131,8 @@ namespace NActors {
}
UsageHisto.fill(0);
+
+ XXH3_64bits_reset(&XxhashXdcState);
}
void TInputSessionTCP::Bootstrap() {
@@ -320,7 +322,15 @@ namespace NActors {
const ui64 confirm = header.Confirm;
if (!Params.Encryption) {
ChecksumExpected = std::exchange(header.Checksum, 0);
- Checksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header)); // start calculating checksum now
+ if (Params.UseXxhash) {
+ XXH3_64bits_reset(&XxhashState);
+ XXH3_64bits_update(&XxhashState, &header, sizeof(header));
+ if (!PayloadSize) {
+ Checksum = XXH3_64bits_digest(&XxhashState);
+ }
+ } else {
+ Checksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header)); // start calculating checksum now
+ }
if (!PayloadSize && Checksum != ChecksumExpected) {
LOG_ERROR_IC_SESSION("ICIS10", "payload checksum error");
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
@@ -388,7 +398,14 @@ namespace NActors {
State = EState::HEADER;
if (!Params.Encryption) { // see if we are checksumming packet body
for (const auto&& [data, size] : Payload) {
- Checksum = Crc32cExtendMSanCompatible(Checksum, data, size);
+ if (Params.UseXxhash) {
+ XXH3_64bits_update(&XxhashState, data, size);
+ } else {
+ Checksum = Crc32cExtendMSanCompatible(Checksum, data, size);
+ }
+ }
+ if (Params.UseXxhash) {
+ Checksum = XXH3_64bits_digest(&XxhashState);
}
if (Checksum != ChecksumExpected) { // validate payload checksum
LOG_ERROR_IC_SESSION("ICIS04", "payload checksum error");
@@ -910,10 +927,18 @@ namespace NActors {
Y_VERIFY_DEBUG(!XdcChecksumQ.empty());
auto& [size, expected] = XdcChecksumQ.front();
const size_t n = Min<size_t>(size, span.size());
- XdcCurrentChecksum = Crc32cExtendMSanCompatible(XdcCurrentChecksum, span.data(), n);
+ if (Params.UseXxhash) {
+ XXH3_64bits_update(&XxhashXdcState, span.data(), n);
+ } else {
+ XdcCurrentChecksum = Crc32cExtendMSanCompatible(XdcCurrentChecksum, span.data(), n);
+ }
span = span.SubSpan(n, Max<size_t>());
size -= n;
if (!size) {
+ if (Params.UseXxhash) {
+ XdcCurrentChecksum = XXH3_64bits_digest(&XxhashXdcState);
+ XXH3_64bits_reset(&XxhashXdcState);
+ }
if (XdcCurrentChecksum != expected) {
LOG_ERROR_IC_SESSION("ICIS16", "payload checksum error");
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index a73418bf15..2f2b175530 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -1158,7 +1158,7 @@ namespace NActors {
}
TABLER() {
TABLED() { str << "Frame version/Checksum"; }
- TABLED() { str << (Params.Encryption ? "v2/none" : "v2/crc32c"); }
+ TABLED() { str << (Params.Encryption ? "v2/none" : Params.UseXxhash ? "v2/xxhash" : "v2/crc32c"); }
}
#define MON_VAR(NAME) \
TABLER() { \
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index e8d201c84c..5722ddb627 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -12,6 +12,9 @@
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#define XXH_INLINE_ALL
+#include <contrib/libs/xxhash/xxhash.h>
+
#include <util/generic/queue.h>
#include <util/generic/deque.h>
#include <util/datetime/cputimer.h>
@@ -251,6 +254,8 @@ namespace NActors {
TInterconnectProxyCommon::TPtr Common;
const ui32 NodeId;
const TSessionParams Params;
+ XXH3_state_t XxhashState;
+ XXH3_state_t XxhashXdcState;
size_t PayloadSize;
ui32 ChecksumExpected, Checksum;
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 4e4cfb4b1d..df0bf0a8c7 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -13,6 +13,9 @@
#include <util/generic/string.h>
#include <util/generic/list.h>
+#define XXH_INLINE_ALL
+#include <contrib/libs/xxhash/xxhash.h>
+
#include "types.h"
#include "outgoing_stream.h"
@@ -158,7 +161,7 @@ struct TTcpPacketOutTask : TNonCopyable {
// Preallocate some space to fill it later.
NInterconnect::TOutgoingStream::TBookmark Bookmark(size_t len) {
- if (Checksumming()) {
+ if (ChecksummingCrc32c()) {
Y_VERIFY_DEBUG(!InsideBookmark);
InsideBookmark = true;
PreBookmarkChecksum = std::exchange(InternalChecksum, 0);
@@ -171,7 +174,7 @@ struct TTcpPacketOutTask : TNonCopyable {
// Write previously bookmarked space.
void WriteBookmark(NInterconnect::TOutgoingStream::TBookmark&& bookmark, const void *buffer, size_t len) {
- if (Checksumming()) {
+ if (ChecksummingCrc32c()) {
Y_VERIFY_DEBUG(InsideBookmark);
InsideBookmark = false;
const ui32 bookmarkChecksum = Crc32cExtendMSanCompatible(PreBookmarkChecksum, buffer, len);
@@ -210,7 +213,7 @@ struct TTcpPacketOutTask : TNonCopyable {
template<bool External>
void ProcessChecksum(const void *buffer, size_t len) {
- if (Checksumming()) {
+ if (ChecksummingCrc32c()) {
if (External) {
ExternalChecksum = Crc32cExtendMSanCompatible(ExternalChecksum, buffer, len);
} else {
@@ -230,7 +233,19 @@ struct TTcpPacketOutTask : TNonCopyable {
static_cast<ui16>(InternalSize)
};
- if (Checksumming()) {
+ if (ChecksummingXxhash()) {
+ // write header with zero checksum to calculate whole packet checksum correctly
+ OutgoingStream.WriteBookmark(NInterconnect::TOutgoingStream::TBookmark(HeaderBookmark),
+ {reinterpret_cast<const char*>(&header), sizeof(header)});
+
+ // calculate packet checksum
+ XXH3_state_t state;
+ XXH3_64bits_reset(&state);
+ OutgoingStream.ScanLastBytes(GetPacketSize(), [&state](TContiguousSpan span) {
+ XXH3_64bits_update(&state, span.data(), span.size());
+ });
+ header.Checksum = XXH3_64bits_digest(&state);
+ } else if (ChecksummingCrc32c()) {
Y_VERIFY_DEBUG(!InsideBookmark);
const ui32 headerChecksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header));
header.Checksum = Crc32cCombine(headerChecksum, InternalChecksum, InternalSize);
@@ -240,8 +255,12 @@ struct TTcpPacketOutTask : TNonCopyable {
sizeof(header)});
}
- bool Checksumming() const {
- return !Params.Encryption;
+ bool ChecksummingCrc32c() const {
+ return !Params.Encryption && !Params.UseXxhash;
+ }
+
+ bool ChecksummingXxhash() const {
+ return !Params.Encryption && Params.UseXxhash;
}
bool IsEmpty() const { return GetDataSize() == 0; }
diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h
index 3963f3107d..0c36564efd 100644
--- a/library/cpp/actors/interconnect/types.h
+++ b/library/cpp/actors/interconnect/types.h
@@ -54,6 +54,7 @@ namespace NActors {
bool Encryption = {};
bool AuthOnly = {};
bool UseExternalDataChannel = {};
+ bool UseXxhash = {};
TString AuthCN;
NActors::TScopeId PeerScopeId;
};
diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto
index 3108b4f24c..538ede33d1 100644
--- a/library/cpp/actors/protos/interconnect.proto
+++ b/library/cpp/actors/protos/interconnect.proto
@@ -73,6 +73,7 @@ message THandshakeRequest {
optional bool RequestAuthOnly = 19;
optional bool RequestExtendedTraceFmt = 20;
optional bool RequestExternalDataChannel = 21;
+ optional bool RequestXxhash = 24;
optional bytes CompatibilityInfo = 22;
@@ -100,6 +101,7 @@ message THandshakeSuccess {
optional bool AuthOnly = 12;
optional bool UseExtendedTraceFmt = 13;
optional bool UseExternalDataChannel = 14;
+ optional bool UseXxhash = 16;
optional bytes CompatibilityInfo = 15;
}