aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-04-14 11:55:52 +0300
committeralexvru <alexvru@ydb.tech>2023-04-14 11:55:52 +0300
commitb079747b8a665c7e1e9573163b558ffa7bb296f7 (patch)
treec965626c09a51c2afb967158c252bf380fff0725 /library/cpp
parent4ca3ea35a05a878e6f223274b9fc093146d7b478 (diff)
downloadydb-b079747b8a665c7e1e9573163b558ffa7bb296f7.tar.gz
Remove TTcpPacketHeader_v1 support from IC
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp12
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp42
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp12
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h8
-rw-r--r--library/cpp/actors/interconnect/packet.h101
-rw-r--r--library/cpp/actors/interconnect/types.h1
7 files changed, 50 insertions, 128 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index 41709d6fba..69cc5e946a 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -114,7 +114,7 @@ namespace NActors {
Y_VERIFY(maxBytes);
auto addChunk = [&](const void *data, size_t len) {
- event.UpdateChecksum(Params, data, len);
+ event.UpdateChecksum(data, len);
task.AppendBuf(data, len);
part->Size += len;
Y_VERIFY_DEBUG(maxBytes >= len);
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index c75e27ff5e..7baf6a3186 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -808,9 +808,12 @@ namespace NActors {
const auto& s = success.GetSenderActorId();
PeerVirtualId.Parse(s.data(), s.size());
+ if (!success.GetUseModernFrame()) {
+ generateError("UseModernFrame not set, obsolete peer version");
+ }
+
// recover flags
Params.Encryption = success.GetStartEncryption();
- Params.UseModernFrame = success.GetUseModernFrame();
Params.AuthOnly = Params.Encryption && success.GetAuthOnly();
Params.UseExtendedTraceFmt = success.GetUseExtendedTraceFmt();
Params.UseExternalDataChannel = success.GetUseExternalDataChannel();
@@ -993,7 +996,10 @@ namespace NActors {
break;
}
- Params.UseModernFrame = request.GetRequestModernFrame();
+ if (!request.GetRequestModernFrame()) {
+ generateError("RequestModernFrame not set, obsolete peer version");
+ }
+
Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly;
Params.UseExtendedTraceFmt = request.GetRequestExtendedTraceFmt();
Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel;
@@ -1031,7 +1037,7 @@ namespace NActors {
if (Common->LocalScopeId != TScopeId()) {
FillInScopeId(*success.MutableServerScopeId());
}
- success.SetUseModernFrame(Params.UseModernFrame);
+ success.SetUseModernFrame(true);
success.SetAuthOnly(Params.AuthOnly);
success.SetUseExtendedTraceFmt(Params.UseExtendedTraceFmt);
success.SetUseExternalDataChannel(Params.UseExternalDataChannel);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index ee85f72dd0..ea239780a7 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -79,7 +79,6 @@ namespace NActors {
void TInputSessionTCP::ReceiveData() {
TTimeLimit limit(GetMaxCyclesPerEvent());
ui64 numDataBytes = 0;
- const size_t headerLen = Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1);
LOG_DEBUG_IC_SESSION("ICIS02", "ReceiveData called");
@@ -94,10 +93,10 @@ namespace NActors {
switch (State) {
case EState::HEADER:
- if (IncomingData.GetSize() < headerLen) {
+ if (IncomingData.GetSize() < sizeof(Header)) {
break;
} else {
- ProcessHeader(headerLen);
+ ProcessHeader();
}
continue;
@@ -172,30 +171,19 @@ namespace NActors {
}
}
- void TInputSessionTCP::ProcessHeader(size_t headerLen) {
- const bool success = IncomingData.ExtractFrontPlain(Header.Data, headerLen);
+ void TInputSessionTCP::ProcessHeader() {
+ const bool success = IncomingData.ExtractFrontPlain(&Header, sizeof(Header));
Y_VERIFY(success);
- if (Params.UseModernFrame) {
- PayloadSize = Header.v2.PayloadLength;
- HeaderSerial = Header.v2.Serial;
- HeaderConfirm = Header.v2.Confirm;
- if (!Params.Encryption) {
- ChecksumExpected = std::exchange(Header.v2.Checksum, 0);
- Checksum = Crc32cExtendMSanCompatible(0, &Header.v2, sizeof(Header.v2)); // start calculating checksum now
- if (!PayloadSize && Checksum != ChecksumExpected) {
- LOG_ERROR_IC_SESSION("ICIS10", "payload checksum error");
- return ReestablishConnection(TDisconnectReason::ChecksumError());
- }
+ PayloadSize = Header.PayloadLength;
+ HeaderSerial = Header.Serial;
+ HeaderConfirm = Header.Confirm;
+ if (!Params.Encryption) {
+ ChecksumExpected = std::exchange(Header.Checksum, 0);
+ Checksum = Crc32cExtendMSanCompatible(0, &Header, sizeof(Header)); // start calculating checksum now
+ if (!PayloadSize && Checksum != ChecksumExpected) {
+ LOG_ERROR_IC_SESSION("ICIS10", "payload checksum error");
+ return ReestablishConnection(TDisconnectReason::ChecksumError());
}
- } else if (!Header.v1.Check()) {
- LOG_ERROR_IC_SESSION("ICIS03", "header checksum error");
- return ReestablishConnection(TDisconnectReason::ChecksumError());
- } else {
- PayloadSize = Header.v1.DataSize;
- HeaderSerial = Header.v1.Serial;
- HeaderConfirm = Header.v1.Confirm;
- ChecksumExpected = Header.v1.PayloadCRC32;
- Checksum = 0;
}
if (PayloadSize >= 65536) {
LOG_CRIT_IC_SESSION("ICIS07", "payload is way too big");
@@ -243,7 +231,7 @@ namespace NActors {
return; // there is still some data to receive in the Payload rope
}
State = EState::HEADER; // we'll continue with header next time
- if (!Params.UseModernFrame || !Params.Encryption) { // see if we are checksumming packet body
+ if (!Params.Encryption) { // see if we are checksumming packet body
for (const auto&& [data, size] : Payload) {
Checksum = Crc32cExtendMSanCompatible(Checksum, data, size);
}
@@ -327,7 +315,7 @@ namespace NActors {
}
void TInputSessionTCP::ProcessEvent(TRope& data, TEventData& descr) {
- if (!Params.UseModernFrame || descr.Checksum) {
+ if (descr.Checksum) {
ui32 checksum = 0;
for (const auto&& [data, size] : data) {
checksum = Crc32cExtendMSanCompatible(checksum, data, size);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 9d6a8ba9c1..72982c1458 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -300,8 +300,7 @@ namespace NActors {
BytesUnwritten = 0;
for (const auto& packet : SendQueue) {
- BytesUnwritten += (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) +
- packet.GetDataSize();
+ BytesUnwritten += sizeof(TTcpPacketHeader_v2) + packet.GetDataSize();
}
SwitchStuckPeriod();
@@ -582,11 +581,6 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu",
ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size());
- // update last confirmed packet number if it has changed
- if (SendQueuePos != SendQueue.end()) {
- SendQueuePos->UpdateConfirmIfPossible(ReceiveContext->GetLastProcessedPacketSerial());
- }
-
while (SendQueuePos != SendQueue.end() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
for (auto it = SendQueuePos; it != SendQueue.end() && wbuffers.size() < maxElementsInIOV; ++it) {
it->AppendToIoVector(wbuffers, maxElementsInIOV);
@@ -805,7 +799,7 @@ namespace NActors {
packet->Sign();
// count number of bytes pending for write
- ui64 packetSize = (Params.UseModernFrame ? sizeof(TTcpPacketHeader_v2) : sizeof(TTcpPacketHeader_v1)) + packet->GetDataSize();
+ ui64 packetSize = sizeof(TTcpPacketHeader_v2) + packet->GetDataSize();
BytesUnwritten += packetSize;
LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu"
@@ -1144,7 +1138,7 @@ namespace NActors {
}
TABLER() {
TABLED() { str << "Frame version/Checksum"; }
- TABLED() { str << (!Params.UseModernFrame ? "v1/crc32c" : Params.Encryption ? "v2/none" : "v2/crc32c"); }
+ TABLED() { str << (Params.Encryption ? "v2/none" : "v2/crc32c"); }
}
#define MON_VAR(NAME) \
TABLER() { \
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index f836bdd1af..7d0e003a34 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -218,11 +218,7 @@ namespace NActors {
const TSessionParams Params;
// header we are currently processing (parsed from the stream)
- union {
- TTcpPacketHeader_v1 v1;
- TTcpPacketHeader_v2 v2;
- char Data[1];
- } Header;
+ TTcpPacketHeader_v2 Header;
ui64 HeaderConfirm, HeaderSerial;
size_t PayloadSize;
@@ -250,7 +246,7 @@ namespace NActors {
void HandleResumeReceiveData();
void HandleConfirmUpdate();
void ReceiveData();
- void ProcessHeader(size_t headerLen);
+ void ProcessHeader();
void ProcessPayload(ui64& numDataBytes);
void ProcessEvent(TRope& data, TEventData& descr);
bool ReadMore();
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index f3c506a663..d29d35a207 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -30,27 +30,6 @@ Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data,
return Crc32cExtend(checksum, data, len);
}
-struct TTcpPacketHeader_v1 {
- ui32 HeaderCRC32;
- ui32 PayloadCRC32;
- ui64 Confirm;
- ui64 Serial;
- ui64 DataSize;
-
- inline bool Check() const {
- ui32 actual = Crc32cExtendMSanCompatible(0, &PayloadCRC32, sizeof(TTcpPacketHeader_v1) - sizeof(HeaderCRC32));
- return actual == HeaderCRC32;
- }
-
- inline void Sign() {
- HeaderCRC32 = Crc32cExtendMSanCompatible(0, &PayloadCRC32, sizeof(TTcpPacketHeader_v1) - sizeof(HeaderCRC32));
- }
-
- TString ToString() const {
- return Sprintf("{Confirm# %" PRIu64 " Serial# %" PRIu64 " DataSize# %" PRIu64 "}", Confirm, Serial, DataSize);
- }
-};
-
#pragma pack(push, 1)
struct TTcpPacketHeader_v2 {
ui64 Confirm;
@@ -60,20 +39,15 @@ struct TTcpPacketHeader_v2 {
};
#pragma pack(pop)
-union TTcpPacketBuf {
+struct TTcpPacketBuf {
static constexpr ui64 PingRequestMask = 0x8000000000000000ULL;
static constexpr ui64 PingResponseMask = 0x4000000000000000ULL;
static constexpr ui64 ClockMask = 0x2000000000000000ULL;
- static constexpr size_t PacketDataLen = 4096 * 2 - 96 - Max(sizeof(TTcpPacketHeader_v1), sizeof(TTcpPacketHeader_v2));
- struct {
- TTcpPacketHeader_v1 Header;
- char Data[PacketDataLen];
- } v1;
- struct {
- TTcpPacketHeader_v2 Header;
- char Data[PacketDataLen];
- } v2;
+ static constexpr size_t PacketDataLen = 4096 * 2 - 96 - sizeof(TTcpPacketHeader_v2);
+
+ TTcpPacketHeader_v2 Header;
+ char Data[PacketDataLen];
};
struct TEventData {
@@ -125,8 +99,8 @@ struct TEventHolder : TNonCopyable {
Descr.Checksum = 0;
}
- void UpdateChecksum(const TSessionParams& params, const void *buffer, size_t len) {
- if (FORCE_EVENT_CHECKSUM || !params.UseModernFrame) {
+ void UpdateChecksum(const void *buffer, size_t len) {
+ if (FORCE_EVENT_CHECKSUM) {
Descr.Checksum = Crc32cExtendMSanCompatible(Descr.Checksum, buffer, len);
}
}
@@ -174,16 +148,6 @@ public:
Reuse();
}
- template<typename T>
- auto ApplyToHeader(T&& callback) {
- return Params.UseModernFrame ? callback(Packet.v2.Header) : callback(Packet.v1.Header);
- }
-
- template<typename T>
- auto ApplyToHeader(T&& callback) const {
- return Params.UseModernFrame ? callback(Packet.v2.Header) : callback(Packet.v1.Header);
- }
-
bool IsAtBegin() const {
return !BufferIndex && !FirstBufferOffset && !TriedWriting;
}
@@ -194,11 +158,11 @@ public:
void Reuse() {
DataSize = 0;
- ApplyToHeader([this](auto& header) { Bufs.assign(1, {&header, sizeof(header)}); });
+ Bufs.assign(1, {&Packet.Header, sizeof(Packet.Header)});
BufferIndex = 0;
FirstBufferOffset = 0;
TriedWriting = false;
- FreeArea = Params.UseModernFrame ? Packet.v2.Data : Packet.v1.Data;
+ FreeArea = Packet.Data;
End = FreeArea + TTcpPacketBuf::PacketDataLen;
Orbit.Reset();
}
@@ -208,28 +172,18 @@ public:
}
void SetMetadata(ui64 serial, ui64 confirm) {
- ApplyToHeader([&](auto& header) {
- header.Serial = serial;
- header.Confirm = confirm;
- });
- }
-
- void UpdateConfirmIfPossible(ui64 confirm) {
- // we don't want to recalculate whole packet checksum for single confirmation update on v2
- if (!Params.UseModernFrame && IsAtBegin() && confirm != Packet.v1.Header.Confirm) {
- Packet.v1.Header.Confirm = confirm;
- Packet.v1.Header.Sign();
- }
+ Packet.Header.Serial = serial;
+ Packet.Header.Confirm = confirm;
}
size_t GetDataSize() const { return DataSize; }
ui64 GetSerial() const {
- return ApplyToHeader([](auto& header) { return header.Serial; });
+ return Packet.Header.Serial;
}
bool Confirmed(ui64 confirm) const {
- return ApplyToHeader([&](auto& header) { return IsEmpty() || header.Serial <= confirm; });
+ return IsEmpty() || Packet.Header.Serial <= confirm;
}
void *GetFreeArea() {
@@ -307,29 +261,14 @@ public:
}
void Sign() {
- if (Params.UseModernFrame) {
- Packet.v2.Header.Checksum = 0;
- Packet.v2.Header.PayloadLength = DataSize;
- if (!Params.Encryption) {
- ui32 sum = 0;
- for (const auto& item : Bufs) {
- sum = Crc32cExtendMSanCompatible(sum, item.Data, item.Size);
- }
- Packet.v2.Header.Checksum = sum;
+ Packet.Header.Checksum = 0;
+ Packet.Header.PayloadLength = DataSize;
+ if (!Params.Encryption) {
+ ui32 sum = 0;
+ for (const auto& item : Bufs) {
+ sum = Crc32cExtendMSanCompatible(sum, item.Data, item.Size);
}
- } else {
- Y_VERIFY(!Bufs.empty());
- auto it = Bufs.begin();
- static constexpr size_t headerLen = sizeof(TTcpPacketHeader_v1);
- Y_VERIFY(it->Data == &Packet.v1.Header && it->Size >= headerLen);
- ui32 sum = Crc32cExtendMSanCompatible(0, Packet.v1.Data, it->Size - headerLen);
- while (++it != Bufs.end()) {
- sum = Crc32cExtendMSanCompatible(sum, it->Data, it->Size);
- }
-
- Packet.v1.Header.PayloadCRC32 = sum;
- Packet.v1.Header.DataSize = DataSize;
- Packet.v1.Header.Sign();
+ Packet.Header.Checksum = sum;
}
}
};
diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h
index b1d2e02f49..f2feda03f2 100644
--- a/library/cpp/actors/interconnect/types.h
+++ b/library/cpp/actors/interconnect/types.h
@@ -52,7 +52,6 @@ namespace NActors {
struct TSessionParams {
bool Encryption = {};
- bool UseModernFrame = {};
bool AuthOnly = {};
bool UseExtendedTraceFmt = {};
bool UseExternalDataChannel = {};