aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-07-24 20:13:33 +0300
committeralexvru <alexvru@ydb.tech>2023-07-24 20:13:33 +0300
commitb11444b99ab01c24be7bb66d63eef719491c1d7f (patch)
treef606ec2605ca3b36c6c698b2b79bcc5eee7002cb
parenta1262352a731dcdfc23df0bb13e6f04c464c4ea7 (diff)
downloadydb-b11444b99ab01c24be7bb66d63eef719491c1d7f.tar.gz
Improve payload wire encoding and XDC selection KIKIMR-18394
-rw-r--r--library/cpp/actors/core/event_load.h3
-rw-r--r--library/cpp/actors/core/event_pb.h118
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp67
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h8
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp79
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h3
-rw-r--r--library/cpp/actors/interconnect/types.h1
-rw-r--r--library/cpp/actors/protos/interconnect.proto2
9 files changed, 221 insertions, 64 deletions
diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h
index 30cc26aa46..0062ee40db 100644
--- a/library/cpp/actors/core/event_load.h
+++ b/library/cpp/actors/core/event_load.h
@@ -24,6 +24,7 @@ namespace NActors {
size_t Size = 0; // full size of serialized event section (a chunk in rope)
size_t Tailroom = 0; // tailroom for the chunk
size_t Alignment = 0; // required alignment
+ bool IsInline = false; // if true, goes through ordinary channel
};
struct TEventSerializationInfo {
@@ -51,7 +52,7 @@ namespace NActors {
, SerializationInfo(original.SerializationInfo)
{
if (!SerializationInfo.Sections.empty()) {
- SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0});
+ SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0, true});
}
Append(std::move(extraBuffer));
}
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index 4c4f49b1cc..a4fdb33fb4 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -6,6 +6,7 @@
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/arena.h>
#include <library/cpp/actors/protos/actors.pb.h>
+#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <util/generic/deque.h>
#include <util/system/context.h>
#include <util/system/filemap.h>
@@ -14,6 +15,9 @@
#include <array>
#include <span>
+// enable only when patch with this macro was successfully deployed
+#define USE_EXTENDED_PAYLOAD_FORMAT 0
+
namespace NActors {
class TRopeStream : public NProtoBuf::io::ZeroCopyInputStream {
@@ -144,6 +148,7 @@ namespace NActors {
class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder {
// a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies
TVector<TRope> Payload;
+ size_t TotalPayloadSize = 0;
public:
using TRecHolder::Record;
@@ -191,8 +196,8 @@ namespace NActors {
result += SerializeNumber(Payload.size(), buf);
for (const TRope& rope : Payload) {
result += SerializeNumber(rope.GetSize(), buf);
- result += rope.GetSize();
}
+ result += TotalPayloadSize;
}
return result;
}
@@ -207,9 +212,20 @@ namespace NActors {
if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) {
// check marker
- if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) {
+ if (!iter.Valid() || (*iter.ContiguousData() != PayloadMarker && *iter.ContiguousData() != ExtendedPayloadMarker)) {
Y_FAIL("invalid event");
}
+
+ const bool dataIsSeparate = *iter.ContiguousData() == ExtendedPayloadMarker; // ropes go after sizes
+
+ auto fetchRope = [&](size_t len) {
+ TRope::TConstIterator begin = iter;
+ iter += len;
+ size -= len;
+ ev->Payload.emplace_back(begin, iter);
+ ev->TotalPayloadSize += len;
+ };
+
// skip marker
iter += 1;
--size;
@@ -218,6 +234,10 @@ namespace NActors {
if (numRopes == Max<size_t>()) {
Y_FAIL("invalid event");
}
+ TStackVec<size_t, 16> ropeLens;
+ if (dataIsSeparate) {
+ ropeLens.reserve(numRopes);
+ }
while (numRopes--) {
// parse length of the rope
const size_t len = DeserializeNumber(iter, size);
@@ -225,10 +245,14 @@ namespace NActors {
Y_FAIL("invalid event len# %zu size# %" PRIu64, len, size);
}
// extract the rope
- TRope::TConstIterator begin = iter;
- iter += len;
- size -= len;
- ev->Payload.emplace_back(begin, iter);
+ if (dataIsSeparate) {
+ ropeLens.push_back(len);
+ } else {
+ fetchRope(len);
+ }
+ }
+ for (size_t len : ropeLens) {
+ fetchRope(len);
}
}
@@ -261,6 +285,10 @@ namespace NActors {
return CreateSerializationInfoImpl(0);
}
+ bool AllowExternalDataChannel() const {
+ return TotalPayloadSize >= 4096;
+ }
+
public:
void ReservePayload(size_t size) {
Payload.reserve(size);
@@ -268,6 +296,7 @@ namespace NActors {
ui32 AddPayload(TRope&& rope) {
const ui32 id = Payload.size();
+ TotalPayloadSize += rope.size();
Payload.push_back(std::move(rope));
InvalidateCachedByteSize();
return id;
@@ -284,37 +313,49 @@ namespace NActors {
void StripPayload() {
Payload.clear();
+ TotalPayloadSize = 0;
}
protected:
TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize) const {
TEventSerializationInfo info;
-
- if (Payload) {
- char temp[MaxNumberBytes];
- info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0}); // payload marker and rope count
- for (const TRope& rope : Payload) {
- const size_t ropeSize = rope.GetSize();
- info.Sections.back().Size += SerializeNumber(ropeSize, temp);
- info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0}); // data as a separate section
+ info.IsExtendedFormat = static_cast<bool>(Payload);
+
+ if (static_cast<const TEv&>(*this).AllowExternalDataChannel()) {
+ if (Payload) {
+ char temp[MaxNumberBytes];
+#if USE_EXTENDED_PAYLOAD_FORMAT
+ size_t headerLen = 1 + SerializeNumber(Payload.size(), temp);
+ for (const TRope& rope : Payload) {
+ headerLen += SerializeNumber(rope.size(), temp);
+ }
+ info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true});
+ for (const TRope& rope : Payload) {
+ info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false});
+ }
+#else
+ info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0, true}); // payload marker and rope count
+ for (const TRope& rope : Payload) {
+ const size_t ropeSize = rope.GetSize();
+ info.Sections.back().Size += SerializeNumber(ropeSize, temp);
+ info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0, false}); // data as a separate section
+ }
+#endif
}
- info.IsExtendedFormat = true;
- } else {
- info.IsExtendedFormat = false;
- }
- const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize;
- info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0}); // protobuf itself
+ const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize;
+ info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true}); // protobuf itself
#ifndef NDEBUG
- size_t total = 0;
- for (const auto& section : info.Sections) {
- total += section.Size;
- }
- size_t serialized = CalculateSerializedSize();
- Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total,
- serialized, byteSize, Payload.size());
+ size_t total = 0;
+ for (const auto& section : info.Sections) {
+ total += section.Size;
+ }
+ size_t serialized = CalculateSerializedSize();
+ Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total,
+ serialized, byteSize, Payload.size());
#endif
+ }
return info;
}
@@ -343,6 +384,27 @@ namespace NActors {
char buf[MaxNumberBytes];
return append(buf, SerializeNumber(number, buf));
};
+
+#if USE_EXTENDED_PAYLOAD_FORMAT
+ char marker = ExtendedPayloadMarker;
+ append(&marker, 1);
+ if (!appendNumber(Payload.size())) {
+ return false;
+ }
+ for (const TRope& rope : Payload) {
+ if (!appendNumber(rope.GetSize())) {
+ return false;
+ }
+ }
+ if (size) {
+ chunker->BackUp(std::exchange(size, 0));
+ }
+ for (const TRope& rope : Payload) {
+ if (!chunker->WriteRope(&rope)) {
+ return false;
+ }
+ }
+#else
char marker = PayloadMarker;
append(&marker, 1);
if (!appendNumber(Payload.size())) {
@@ -364,6 +426,7 @@ namespace NActors {
if (size) {
chunker->BackUp(size);
}
+#endif
}
if (preserialized && !chunker->WriteString(&preserialized)) {
@@ -376,6 +439,7 @@ namespace NActors {
protected:
mutable size_t CachedByteSize = 0;
+ static constexpr char ExtendedPayloadMarker = 0x06;
static constexpr char PayloadMarker = 0x07;
static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7;
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index e69483a8e9..6987c344de 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -75,6 +75,8 @@ namespace NActors {
State = EState::BODY;
Iter = event.Buffer->GetBeginIter();
SerializationInfo = &event.Buffer->GetSerializationInfo();
+ SectionIndex = 0;
+ PartLenRemain = 0;
} else if (event.Event) {
State = EState::BODY;
IEventBase *base = event.Event.Get();
@@ -83,15 +85,16 @@ namespace NActors {
}
SerializationInfoContainer = base->CreateSerializationInfo();
SerializationInfo = &SerializationInfoContainer;
+ SectionIndex = 0;
+ PartLenRemain = 0;
} else { // event without buffer and IEventBase instance
State = EState::DESCRIPTOR;
SerializationInfoContainer = {};
SerializationInfo = &SerializationInfoContainer;
}
- EventInExternalDataChannel = Params.UseExternalDataChannel && !SerializationInfo->Sections.empty();
if (!event.EventSerializedSize) {
State = EState::DESCRIPTOR;
- } else if (EventInExternalDataChannel) {
+ } else if (Params.UseExternalDataChannel && !SerializationInfo->Sections.empty()) {
State = EState::SECTIONS;
SectionIndex = 0;
}
@@ -130,11 +133,15 @@ namespace NActors {
char *p = sectionInfo;
const auto& section = SerializationInfo->Sections[SectionIndex];
- *p++ = static_cast<ui8>(EXdcCommand::DECLARE_SECTION);
+ char& type = *p++;
+ type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION);
p += NInterconnect::NDetail::SerializeNumber(section.Headroom, p);
p += NInterconnect::NDetail::SerializeNumber(section.Size, p);
p += NInterconnect::NDetail::SerializeNumber(section.Tailroom, p);
p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p);
+ if (section.IsInline && Params.UseXdcShuffle) {
+ type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_INLINE);
+ }
Y_VERIFY(p <= std::end(sectionInfo));
const size_t declareLen = p - sectionInfo;
@@ -161,6 +168,8 @@ namespace NActors {
if (SectionIndex == SerializationInfo->Sections.size()) {
State = EState::BODY;
+ SectionIndex = 0;
+ PartLenRemain = 0;
}
break;
@@ -179,6 +188,8 @@ namespace NActors {
task.Append<External>(data, len);
}
*bytesSerialized += len;
+ Y_VERIFY_DEBUG(len <= PartLenRemain);
+ PartLenRemain -= len;
event.EventActuallySerialized += len;
if (event.EventActuallySerialized > MaxSerializedEventSize) {
@@ -189,15 +200,12 @@ namespace NActors {
bool complete = false;
if (event.Event) {
while (!complete) {
- TMutableContiguousSpan out = task.AcquireSpanForWriting<External>();
+ TMutableContiguousSpan out = task.AcquireSpanForWriting<External>().SubSpan(0, PartLenRemain);
if (!out.size()) {
break;
}
- ui32 bytesFed = 0;
for (const auto& [buffer, size] : Chunker.FeedBuf(out.data(), out.size())) {
addChunk(buffer, size, false);
- bytesFed += size;
- Y_VERIFY(bytesFed <= out.size());
}
complete = Chunker.IsComplete();
if (complete) {
@@ -206,7 +214,7 @@ namespace NActors {
}
} else if (event.Buffer) {
while (const size_t numb = Min<size_t>(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(),
- Iter.ContiguousSize())) {
+ Iter.ContiguousSize(), PartLenRemain)) {
const char *obuf = Iter.ContiguousData();
addChunk(obuf, numb, true);
Iter += numb;
@@ -223,14 +231,43 @@ namespace NActors {
}
bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
- return EventInExternalDataChannel
- ? FeedExternalPayload(task, event, weightConsumed)
- : FeedInlinePayload(task, event, weightConsumed);
+ for (;;) {
+ // calculate inline or external part size (it may cover a few sections, not just single one)
+ while (!PartLenRemain) {
+ const auto& sections = SerializationInfo->Sections;
+ if (!Params.UseExternalDataChannel || sections.empty()) {
+ // all data goes inline
+ IsPartInline = true;
+ PartLenRemain = Max<size_t>();
+ } else if (!Params.UseXdcShuffle) {
+ // when UseXdcShuffle feature is not supported by the remote side, we transfer whole event over XDC
+ IsPartInline = false;
+ PartLenRemain = Max<size_t>();
+ } else {
+ Y_VERIFY(SectionIndex < sections.size());
+ IsPartInline = sections[SectionIndex].IsInline;
+ while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline) {
+ PartLenRemain += sections[SectionIndex].Size;
+ ++SectionIndex;
+ }
+ }
+ }
+
+ // serialize bytes
+ const auto complete = IsPartInline
+ ? FeedInlinePayload(task, event, weightConsumed)
+ : FeedExternalPayload(task, event, weightConsumed);
+ if (!complete) { // no space to serialize
+ return false;
+ } else if (*complete) { // event serialized
+ return true;
+ }
+ }
}
- bool TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
+ std::optional<bool> TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
if (task.GetInternalFreeAmount() <= sizeof(TChannelPart)) {
- return false;
+ return std::nullopt;
}
auto partBookmark = task.Bookmark(sizeof(TChannelPart));
@@ -253,10 +290,10 @@ namespace NActors {
return complete;
}
- bool TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
+ std::optional<bool> TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
const size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32));
if (task.GetInternalFreeAmount() < partSize || task.GetExternalFreeAmount() == 0) {
- return false;
+ return std::nullopt;
}
// clear external checksum for this chunk
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 2c5a286fe8..ef2da2fda7 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -46,6 +46,7 @@ namespace NActors {
enum class EXdcCommand : ui8 {
DECLARE_SECTION = 1,
PUSH_DATA,
+ DECLARE_SECTION_INLINE,
};
struct TExSerializedEventTooLarge : std::exception {
@@ -133,7 +134,8 @@ namespace NActors {
TCoroutineChunkSerializer Chunker;
TEventSerializationInfo SerializationInfoContainer;
const TEventSerializationInfo *SerializationInfo = nullptr;
- bool EventInExternalDataChannel;
+ bool IsPartInline = false;
+ size_t PartLenRemain = 0;
size_t SectionIndex = 0;
std::vector<char> XdcData;
@@ -141,8 +143,8 @@ namespace NActors {
bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized);
bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
- bool FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
- bool FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
+ std::optional<bool> FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
+ std::optional<bool> FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 3e3e57948c..cb4788a33c 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -755,6 +755,7 @@ namespace NActors {
request.SetRequestExtendedTraceFmt(true);
request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel);
request.SetRequestXxhash(true);
+ request.SetRequestXdcShuffle(true);
request.SetHandshakeId(*HandshakeId);
SendExBlock(MainChannel, request, "ExRequest");
@@ -793,6 +794,7 @@ namespace NActors {
Params.AuthOnly = Params.Encryption && success.GetAuthOnly();
Params.UseExternalDataChannel = success.GetUseExternalDataChannel();
Params.UseXxhash = success.GetUseXxhash();
+ Params.UseXdcShuffle = success.GetUseXdcShuffle();
if (success.HasServerScopeId()) {
ParsePeerScopeId(success.GetServerScopeId());
}
@@ -1044,6 +1046,7 @@ namespace NActors {
Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly;
Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel;
Params.UseXxhash = request.GetRequestXxhash();
+ Params.UseXdcShuffle = request.GetRequestXdcShuffle();
if (Params.UseExternalDataChannel) {
if (request.HasHandshakeId()) {
@@ -1083,6 +1086,7 @@ namespace NActors {
success.SetUseExtendedTraceFmt(true);
success.SetUseExternalDataChannel(Params.UseExternalDataChannel);
success.SetUseXxhash(Params.UseXxhash);
+ success.SetUseXdcShuffle(Params.UseXdcShuffle);
SendExBlock(MainChannel, record, "ExReply");
// extract sender actor id (self virtual id)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index bb7e069883..3b42884355 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -449,7 +449,7 @@ namespace NActors {
} else if (IgnorePayload) { // throw payload out
Payload.EraseFront(part.Size);
} else if (!part.IsLastPart()) { // just ordinary inline event data
- Payload.ExtractFront(part.Size, &pendingEvent.Payload);
+ Payload.ExtractFront(part.Size, &pendingEvent.InternalPayload);
} else { // event final block
TEventDescr2 v2;
@@ -529,8 +529,9 @@ namespace NActors {
const char *ptr = XdcCommands.data();
const char *end = ptr + XdcCommands.size();
while (ptr != end) {
- switch (static_cast<EXdcCommand>(*ptr++)) {
- case EXdcCommand::DECLARE_SECTION: {
+ switch (const auto cmd = static_cast<EXdcCommand>(*ptr++)) {
+ case EXdcCommand::DECLARE_SECTION:
+ case EXdcCommand::DECLARE_SECTION_INLINE: {
// extract and validate command parameters
const ui64 headroom = NInterconnect::NDetail::DeserializeNumber(&ptr, end);
const ui64 size = NInterconnect::NDetail::DeserializeNumber(&ptr, end);
@@ -542,17 +543,22 @@ namespace NActors {
}
if (!IgnorePayload) { // process command if packet is being applied
- // allocate buffer and push it into the payload
auto& pendingEvent = context.PendingEvents.back();
- pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom, alignment});
- auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom);
- if (size) {
- context.XdcBuffers.push_back(buffer.GetContiguousSpanMut());
+ const bool isInline = cmd == EXdcCommand::DECLARE_SECTION_INLINE;
+ pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom,
+ alignment, isInline});
+
+ Y_VERIFY(!isInline || Params.UseXdcShuffle);
+ if (!isInline) {
+ // allocate buffer and push it into the payload
+ auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom);
+ if (size) {
+ context.XdcBuffers.push_back(buffer.GetContiguousSpanMut());
+ }
+ pendingEvent.ExternalPayload.Insert(pendingEvent.ExternalPayload.End(), TRope(std::move(buffer)));
+ pendingEvent.XdcSizeLeft += size;
+ ++XdcSections;
}
- pendingEvent.Payload.Insert(pendingEvent.Payload.End(), TRope(std::move(buffer)));
- pendingEvent.XdcSizeLeft += size;
-
- ++XdcSections;
}
continue;
}
@@ -608,18 +614,57 @@ namespace NActors {
if (!pendingEvent.EventData || pendingEvent.XdcSizeLeft) {
break; // event is not ready yet
}
-
auto& descr = *pendingEvent.EventData;
+
+ // create aggregated payload
+ TRope payload;
+ if (!pendingEvent.SerializationInfo.Sections.empty()) {
+ // unshuffle inline and external payloads into single event content
+ TRope *prev = nullptr;
+ size_t accumSize = 0;
+ for (const auto& s : pendingEvent.SerializationInfo.Sections) {
+ TRope *rope = s.IsInline
+ ? &pendingEvent.InternalPayload
+ : &pendingEvent.ExternalPayload;
+ if (rope != prev) {
+ if (accumSize) {
+ prev->ExtractFront(accumSize, &payload);
+ }
+ prev = rope;
+ accumSize = 0;
+ }
+ accumSize += s.Size;
+ }
+ if (accumSize) {
+ prev->ExtractFront(accumSize, &payload);
+ }
+
+ if (pendingEvent.InternalPayload || pendingEvent.ExternalPayload) {
+ LOG_CRIT_IC_SESSION("ICIS19", "unprocessed payload remains after shuffling"
+ " Type# 0x%08" PRIx32 " InternalPayload.size# %zu ExternalPayload.size# %zu",
+ descr.Type, pendingEvent.InternalPayload.size(), pendingEvent.ExternalPayload.size());
+ Y_VERIFY_DEBUG(false);
+ throw TExReestablishConnection{TDisconnectReason::FormatError()};
+ }
+ }
+
+ // we add any remains of internal payload to the end
+ if (auto& rope = pendingEvent.InternalPayload) {
+ rope.ExtractFront(rope.size(), &payload);
+ }
+ // and ensure there is no unprocessed external payload
+ Y_VERIFY(!pendingEvent.ExternalPayload);
+
#if IC_FORCE_HARDENED_PACKET_CHECKS
- if (descr.Len != pendingEvent.Payload.GetSize()) {
+ if (descr.Len != payload.GetSize()) {
LOG_CRIT_IC_SESSION("ICIS17", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32,
- descr.Type, pendingEvent.Payload.GetSize(), descr.Len);
+ descr.Type, payload.GetSize(), descr.Len);
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
}
#endif
if (descr.Checksum) {
ui32 checksum = 0;
- for (const auto&& [data, size] : pendingEvent.Payload) {
+ for (const auto&& [data, size] : payload) {
checksum = Crc32cExtendMSanCompatible(checksum, data, size);
}
if (checksum != descr.Checksum) {
@@ -633,7 +678,7 @@ namespace NActors {
descr.Flags & ~IEventHandle::FlagExtendedFormat,
descr.Recipient,
descr.Sender,
- MakeIntrusive<TEventSerializedData>(std::move(pendingEvent.Payload), std::move(pendingEvent.SerializationInfo)),
+ MakeIntrusive<TEventSerializedData>(std::move(payload), std::move(pendingEvent.SerializationInfo)),
descr.Cookie,
Params.PeerScopeId,
std::move(descr.TraceId));
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index adc9fd3710..e1c555e629 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -126,7 +126,8 @@ namespace NActors {
struct TPerChannelContext {
struct TPendingEvent {
TEventSerializationInfo SerializationInfo;
- TRope Payload;
+ TRope InternalPayload;
+ TRope ExternalPayload;
std::optional<TEventData> EventData;
// number of bytes remaining through XDC channel
diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h
index 7628fbd968..14b1a1c7a6 100644
--- a/library/cpp/actors/interconnect/types.h
+++ b/library/cpp/actors/interconnect/types.h
@@ -57,6 +57,7 @@ namespace NActors {
bool AuthOnly = {};
bool UseExternalDataChannel = {};
bool UseXxhash = {};
+ bool UseXdcShuffle = {};
TString AuthCN;
NActors::TScopeId PeerScopeId;
};
diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto
index 538ede33d1..0e88f3bce5 100644
--- a/library/cpp/actors/protos/interconnect.proto
+++ b/library/cpp/actors/protos/interconnect.proto
@@ -74,6 +74,7 @@ message THandshakeRequest {
optional bool RequestExtendedTraceFmt = 20;
optional bool RequestExternalDataChannel = 21;
optional bool RequestXxhash = 24;
+ optional bool RequestXdcShuffle = 25;
optional bytes CompatibilityInfo = 22;
@@ -102,6 +103,7 @@ message THandshakeSuccess {
optional bool UseExtendedTraceFmt = 13;
optional bool UseExternalDataChannel = 14;
optional bool UseXxhash = 16;
+ optional bool UseXdcShuffle = 17;
optional bytes CompatibilityInfo = 15;
}