diff options
author | alexvru <alexvru@ydb.tech> | 2023-07-24 20:13:33 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-07-24 20:13:33 +0300 |
commit | b11444b99ab01c24be7bb66d63eef719491c1d7f (patch) | |
tree | f606ec2605ca3b36c6c698b2b79bcc5eee7002cb | |
parent | a1262352a731dcdfc23df0bb13e6f04c464c4ea7 (diff) | |
download | ydb-b11444b99ab01c24be7bb66d63eef719491c1d7f.tar.gz |
Improve payload wire encoding and XDC selection KIKIMR-18394
9 files changed, 221 insertions, 64 deletions
diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h index 30cc26aa46..0062ee40db 100644 --- a/library/cpp/actors/core/event_load.h +++ b/library/cpp/actors/core/event_load.h @@ -24,6 +24,7 @@ namespace NActors { size_t Size = 0; // full size of serialized event section (a chunk in rope) size_t Tailroom = 0; // tailroom for the chunk size_t Alignment = 0; // required alignment + bool IsInline = false; // if true, goes through ordinary channel }; struct TEventSerializationInfo { @@ -51,7 +52,7 @@ namespace NActors { , SerializationInfo(original.SerializationInfo) { if (!SerializationInfo.Sections.empty()) { - SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0}); + SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0, true}); } Append(std::move(extraBuffer)); } diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index 4c4f49b1cc..a4fdb33fb4 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -6,6 +6,7 @@ #include <google/protobuf/io/zero_copy_stream.h> #include <google/protobuf/arena.h> #include <library/cpp/actors/protos/actors.pb.h> +#include <library/cpp/containers/stack_vector/stack_vec.h> #include <util/generic/deque.h> #include <util/system/context.h> #include <util/system/filemap.h> @@ -14,6 +15,9 @@ #include <array> #include <span> +// enable only when patch with this macro was successfully deployed +#define USE_EXTENDED_PAYLOAD_FORMAT 0 + namespace NActors { class TRopeStream : public NProtoBuf::io::ZeroCopyInputStream { @@ -144,6 +148,7 @@ namespace NActors { class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder { // a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies TVector<TRope> Payload; + size_t TotalPayloadSize = 0; public: using TRecHolder::Record; @@ -191,8 +196,8 @@ namespace NActors { result += SerializeNumber(Payload.size(), buf); for (const TRope& rope : Payload) { result += SerializeNumber(rope.GetSize(), buf); - result += rope.GetSize(); } + result += TotalPayloadSize; } return result; } @@ -207,9 +212,20 @@ namespace NActors { if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) { // check marker - if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) { + if (!iter.Valid() || (*iter.ContiguousData() != PayloadMarker && *iter.ContiguousData() != ExtendedPayloadMarker)) { Y_FAIL("invalid event"); } + + const bool dataIsSeparate = *iter.ContiguousData() == ExtendedPayloadMarker; // ropes go after sizes + + auto fetchRope = [&](size_t len) { + TRope::TConstIterator begin = iter; + iter += len; + size -= len; + ev->Payload.emplace_back(begin, iter); + ev->TotalPayloadSize += len; + }; + // skip marker iter += 1; --size; @@ -218,6 +234,10 @@ namespace NActors { if (numRopes == Max<size_t>()) { Y_FAIL("invalid event"); } + TStackVec<size_t, 16> ropeLens; + if (dataIsSeparate) { + ropeLens.reserve(numRopes); + } while (numRopes--) { // parse length of the rope const size_t len = DeserializeNumber(iter, size); @@ -225,10 +245,14 @@ namespace NActors { Y_FAIL("invalid event len# %zu size# %" PRIu64, len, size); } // extract the rope - TRope::TConstIterator begin = iter; - iter += len; - size -= len; - ev->Payload.emplace_back(begin, iter); + if (dataIsSeparate) { + ropeLens.push_back(len); + } else { + fetchRope(len); + } + } + for (size_t len : ropeLens) { + fetchRope(len); } } @@ -261,6 +285,10 @@ namespace NActors { return CreateSerializationInfoImpl(0); } + bool AllowExternalDataChannel() const { + return TotalPayloadSize >= 4096; + } + public: void ReservePayload(size_t size) { Payload.reserve(size); @@ -268,6 +296,7 @@ namespace NActors { ui32 AddPayload(TRope&& rope) { const ui32 id = Payload.size(); + TotalPayloadSize += rope.size(); Payload.push_back(std::move(rope)); InvalidateCachedByteSize(); return id; @@ -284,37 +313,49 @@ namespace NActors { void StripPayload() { Payload.clear(); + TotalPayloadSize = 0; } protected: TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize) const { TEventSerializationInfo info; - - if (Payload) { - char temp[MaxNumberBytes]; - info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0}); // payload marker and rope count - for (const TRope& rope : Payload) { - const size_t ropeSize = rope.GetSize(); - info.Sections.back().Size += SerializeNumber(ropeSize, temp); - info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0}); // data as a separate section + info.IsExtendedFormat = static_cast<bool>(Payload); + + if (static_cast<const TEv&>(*this).AllowExternalDataChannel()) { + if (Payload) { + char temp[MaxNumberBytes]; +#if USE_EXTENDED_PAYLOAD_FORMAT + size_t headerLen = 1 + SerializeNumber(Payload.size(), temp); + for (const TRope& rope : Payload) { + headerLen += SerializeNumber(rope.size(), temp); + } + info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true}); + for (const TRope& rope : Payload) { + info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false}); + } +#else + info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0, true}); // payload marker and rope count + for (const TRope& rope : Payload) { + const size_t ropeSize = rope.GetSize(); + info.Sections.back().Size += SerializeNumber(ropeSize, temp); + info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0, false}); // data as a separate section + } +#endif } - info.IsExtendedFormat = true; - } else { - info.IsExtendedFormat = false; - } - const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize; - info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0}); // protobuf itself + const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize; + info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true}); // protobuf itself #ifndef NDEBUG - size_t total = 0; - for (const auto& section : info.Sections) { - total += section.Size; - } - size_t serialized = CalculateSerializedSize(); - Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total, - serialized, byteSize, Payload.size()); + size_t total = 0; + for (const auto& section : info.Sections) { + total += section.Size; + } + size_t serialized = CalculateSerializedSize(); + Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total, + serialized, byteSize, Payload.size()); #endif + } return info; } @@ -343,6 +384,27 @@ namespace NActors { char buf[MaxNumberBytes]; return append(buf, SerializeNumber(number, buf)); }; + +#if USE_EXTENDED_PAYLOAD_FORMAT + char marker = ExtendedPayloadMarker; + append(&marker, 1); + if (!appendNumber(Payload.size())) { + return false; + } + for (const TRope& rope : Payload) { + if (!appendNumber(rope.GetSize())) { + return false; + } + } + if (size) { + chunker->BackUp(std::exchange(size, 0)); + } + for (const TRope& rope : Payload) { + if (!chunker->WriteRope(&rope)) { + return false; + } + } +#else char marker = PayloadMarker; append(&marker, 1); if (!appendNumber(Payload.size())) { @@ -364,6 +426,7 @@ namespace NActors { if (size) { chunker->BackUp(size); } +#endif } if (preserialized && !chunker->WriteString(&preserialized)) { @@ -376,6 +439,7 @@ namespace NActors { protected: mutable size_t CachedByteSize = 0; + static constexpr char ExtendedPayloadMarker = 0x06; static constexpr char PayloadMarker = 0x07; static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7; diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index e69483a8e9..6987c344de 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -75,6 +75,8 @@ namespace NActors { State = EState::BODY; Iter = event.Buffer->GetBeginIter(); SerializationInfo = &event.Buffer->GetSerializationInfo(); + SectionIndex = 0; + PartLenRemain = 0; } else if (event.Event) { State = EState::BODY; IEventBase *base = event.Event.Get(); @@ -83,15 +85,16 @@ namespace NActors { } SerializationInfoContainer = base->CreateSerializationInfo(); SerializationInfo = &SerializationInfoContainer; + SectionIndex = 0; + PartLenRemain = 0; } else { // event without buffer and IEventBase instance State = EState::DESCRIPTOR; SerializationInfoContainer = {}; SerializationInfo = &SerializationInfoContainer; } - EventInExternalDataChannel = Params.UseExternalDataChannel && !SerializationInfo->Sections.empty(); if (!event.EventSerializedSize) { State = EState::DESCRIPTOR; - } else if (EventInExternalDataChannel) { + } else if (Params.UseExternalDataChannel && !SerializationInfo->Sections.empty()) { State = EState::SECTIONS; SectionIndex = 0; } @@ -130,11 +133,15 @@ namespace NActors { char *p = sectionInfo; const auto& section = SerializationInfo->Sections[SectionIndex]; - *p++ = static_cast<ui8>(EXdcCommand::DECLARE_SECTION); + char& type = *p++; + type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION); p += NInterconnect::NDetail::SerializeNumber(section.Headroom, p); p += NInterconnect::NDetail::SerializeNumber(section.Size, p); p += NInterconnect::NDetail::SerializeNumber(section.Tailroom, p); p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p); + if (section.IsInline && Params.UseXdcShuffle) { + type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_INLINE); + } Y_VERIFY(p <= std::end(sectionInfo)); const size_t declareLen = p - sectionInfo; @@ -161,6 +168,8 @@ namespace NActors { if (SectionIndex == SerializationInfo->Sections.size()) { State = EState::BODY; + SectionIndex = 0; + PartLenRemain = 0; } break; @@ -179,6 +188,8 @@ namespace NActors { task.Append<External>(data, len); } *bytesSerialized += len; + Y_VERIFY_DEBUG(len <= PartLenRemain); + PartLenRemain -= len; event.EventActuallySerialized += len; if (event.EventActuallySerialized > MaxSerializedEventSize) { @@ -189,15 +200,12 @@ namespace NActors { bool complete = false; if (event.Event) { while (!complete) { - TMutableContiguousSpan out = task.AcquireSpanForWriting<External>(); + TMutableContiguousSpan out = task.AcquireSpanForWriting<External>().SubSpan(0, PartLenRemain); if (!out.size()) { break; } - ui32 bytesFed = 0; for (const auto& [buffer, size] : Chunker.FeedBuf(out.data(), out.size())) { addChunk(buffer, size, false); - bytesFed += size; - Y_VERIFY(bytesFed <= out.size()); } complete = Chunker.IsComplete(); if (complete) { @@ -206,7 +214,7 @@ namespace NActors { } } else if (event.Buffer) { while (const size_t numb = Min<size_t>(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(), - Iter.ContiguousSize())) { + Iter.ContiguousSize(), PartLenRemain)) { const char *obuf = Iter.ContiguousData(); addChunk(obuf, numb, true); Iter += numb; @@ -223,14 +231,43 @@ namespace NActors { } bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { - return EventInExternalDataChannel - ? FeedExternalPayload(task, event, weightConsumed) - : FeedInlinePayload(task, event, weightConsumed); + for (;;) { + // calculate inline or external part size (it may cover a few sections, not just single one) + while (!PartLenRemain) { + const auto& sections = SerializationInfo->Sections; + if (!Params.UseExternalDataChannel || sections.empty()) { + // all data goes inline + IsPartInline = true; + PartLenRemain = Max<size_t>(); + } else if (!Params.UseXdcShuffle) { + // when UseXdcShuffle feature is not supported by the remote side, we transfer whole event over XDC + IsPartInline = false; + PartLenRemain = Max<size_t>(); + } else { + Y_VERIFY(SectionIndex < sections.size()); + IsPartInline = sections[SectionIndex].IsInline; + while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline) { + PartLenRemain += sections[SectionIndex].Size; + ++SectionIndex; + } + } + } + + // serialize bytes + const auto complete = IsPartInline + ? FeedInlinePayload(task, event, weightConsumed) + : FeedExternalPayload(task, event, weightConsumed); + if (!complete) { // no space to serialize + return false; + } else if (*complete) { // event serialized + return true; + } + } } - bool TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + std::optional<bool> TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { if (task.GetInternalFreeAmount() <= sizeof(TChannelPart)) { - return false; + return std::nullopt; } auto partBookmark = task.Bookmark(sizeof(TChannelPart)); @@ -253,10 +290,10 @@ namespace NActors { return complete; } - bool TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + std::optional<bool> TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { const size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32)); if (task.GetInternalFreeAmount() < partSize || task.GetExternalFreeAmount() == 0) { - return false; + return std::nullopt; } // clear external checksum for this chunk diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 2c5a286fe8..ef2da2fda7 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -46,6 +46,7 @@ namespace NActors { enum class EXdcCommand : ui8 { DECLARE_SECTION = 1, PUSH_DATA, + DECLARE_SECTION_INLINE, }; struct TExSerializedEventTooLarge : std::exception { @@ -133,7 +134,8 @@ namespace NActors { TCoroutineChunkSerializer Chunker; TEventSerializationInfo SerializationInfoContainer; const TEventSerializationInfo *SerializationInfo = nullptr; - bool EventInExternalDataChannel; + bool IsPartInline = false; + size_t PartLenRemain = 0; size_t SectionIndex = 0; std::vector<char> XdcData; @@ -141,8 +143,8 @@ namespace NActors { bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized); bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); - bool FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); - bool FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); + std::optional<bool> FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); + std::optional<bool> FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 3e3e57948c..cb4788a33c 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -755,6 +755,7 @@ namespace NActors { request.SetRequestExtendedTraceFmt(true); request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel); request.SetRequestXxhash(true); + request.SetRequestXdcShuffle(true); request.SetHandshakeId(*HandshakeId); SendExBlock(MainChannel, request, "ExRequest"); @@ -793,6 +794,7 @@ namespace NActors { Params.AuthOnly = Params.Encryption && success.GetAuthOnly(); Params.UseExternalDataChannel = success.GetUseExternalDataChannel(); Params.UseXxhash = success.GetUseXxhash(); + Params.UseXdcShuffle = success.GetUseXdcShuffle(); if (success.HasServerScopeId()) { ParsePeerScopeId(success.GetServerScopeId()); } @@ -1044,6 +1046,7 @@ namespace NActors { Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly; Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel; Params.UseXxhash = request.GetRequestXxhash(); + Params.UseXdcShuffle = request.GetRequestXdcShuffle(); if (Params.UseExternalDataChannel) { if (request.HasHandshakeId()) { @@ -1083,6 +1086,7 @@ namespace NActors { success.SetUseExtendedTraceFmt(true); success.SetUseExternalDataChannel(Params.UseExternalDataChannel); success.SetUseXxhash(Params.UseXxhash); + success.SetUseXdcShuffle(Params.UseXdcShuffle); SendExBlock(MainChannel, record, "ExReply"); // extract sender actor id (self virtual id) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index bb7e069883..3b42884355 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -449,7 +449,7 @@ namespace NActors { } else if (IgnorePayload) { // throw payload out Payload.EraseFront(part.Size); } else if (!part.IsLastPart()) { // just ordinary inline event data - Payload.ExtractFront(part.Size, &pendingEvent.Payload); + Payload.ExtractFront(part.Size, &pendingEvent.InternalPayload); } else { // event final block TEventDescr2 v2; @@ -529,8 +529,9 @@ namespace NActors { const char *ptr = XdcCommands.data(); const char *end = ptr + XdcCommands.size(); while (ptr != end) { - switch (static_cast<EXdcCommand>(*ptr++)) { - case EXdcCommand::DECLARE_SECTION: { + switch (const auto cmd = static_cast<EXdcCommand>(*ptr++)) { + case EXdcCommand::DECLARE_SECTION: + case EXdcCommand::DECLARE_SECTION_INLINE: { // extract and validate command parameters const ui64 headroom = NInterconnect::NDetail::DeserializeNumber(&ptr, end); const ui64 size = NInterconnect::NDetail::DeserializeNumber(&ptr, end); @@ -542,17 +543,22 @@ namespace NActors { } if (!IgnorePayload) { // process command if packet is being applied - // allocate buffer and push it into the payload auto& pendingEvent = context.PendingEvents.back(); - pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom, alignment}); - auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom); - if (size) { - context.XdcBuffers.push_back(buffer.GetContiguousSpanMut()); + const bool isInline = cmd == EXdcCommand::DECLARE_SECTION_INLINE; + pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom, + alignment, isInline}); + + Y_VERIFY(!isInline || Params.UseXdcShuffle); + if (!isInline) { + // allocate buffer and push it into the payload + auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom); + if (size) { + context.XdcBuffers.push_back(buffer.GetContiguousSpanMut()); + } + pendingEvent.ExternalPayload.Insert(pendingEvent.ExternalPayload.End(), TRope(std::move(buffer))); + pendingEvent.XdcSizeLeft += size; + ++XdcSections; } - pendingEvent.Payload.Insert(pendingEvent.Payload.End(), TRope(std::move(buffer))); - pendingEvent.XdcSizeLeft += size; - - ++XdcSections; } continue; } @@ -608,18 +614,57 @@ namespace NActors { if (!pendingEvent.EventData || pendingEvent.XdcSizeLeft) { break; // event is not ready yet } - auto& descr = *pendingEvent.EventData; + + // create aggregated payload + TRope payload; + if (!pendingEvent.SerializationInfo.Sections.empty()) { + // unshuffle inline and external payloads into single event content + TRope *prev = nullptr; + size_t accumSize = 0; + for (const auto& s : pendingEvent.SerializationInfo.Sections) { + TRope *rope = s.IsInline + ? &pendingEvent.InternalPayload + : &pendingEvent.ExternalPayload; + if (rope != prev) { + if (accumSize) { + prev->ExtractFront(accumSize, &payload); + } + prev = rope; + accumSize = 0; + } + accumSize += s.Size; + } + if (accumSize) { + prev->ExtractFront(accumSize, &payload); + } + + if (pendingEvent.InternalPayload || pendingEvent.ExternalPayload) { + LOG_CRIT_IC_SESSION("ICIS19", "unprocessed payload remains after shuffling" + " Type# 0x%08" PRIx32 " InternalPayload.size# %zu ExternalPayload.size# %zu", + descr.Type, pendingEvent.InternalPayload.size(), pendingEvent.ExternalPayload.size()); + Y_VERIFY_DEBUG(false); + throw TExReestablishConnection{TDisconnectReason::FormatError()}; + } + } + + // we add any remains of internal payload to the end + if (auto& rope = pendingEvent.InternalPayload) { + rope.ExtractFront(rope.size(), &payload); + } + // and ensure there is no unprocessed external payload + Y_VERIFY(!pendingEvent.ExternalPayload); + #if IC_FORCE_HARDENED_PACKET_CHECKS - if (descr.Len != pendingEvent.Payload.GetSize()) { + if (descr.Len != payload.GetSize()) { LOG_CRIT_IC_SESSION("ICIS17", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32, - descr.Type, pendingEvent.Payload.GetSize(), descr.Len); + descr.Type, payload.GetSize(), descr.Len); throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; } #endif if (descr.Checksum) { ui32 checksum = 0; - for (const auto&& [data, size] : pendingEvent.Payload) { + for (const auto&& [data, size] : payload) { checksum = Crc32cExtendMSanCompatible(checksum, data, size); } if (checksum != descr.Checksum) { @@ -633,7 +678,7 @@ namespace NActors { descr.Flags & ~IEventHandle::FlagExtendedFormat, descr.Recipient, descr.Sender, - MakeIntrusive<TEventSerializedData>(std::move(pendingEvent.Payload), std::move(pendingEvent.SerializationInfo)), + MakeIntrusive<TEventSerializedData>(std::move(payload), std::move(pendingEvent.SerializationInfo)), descr.Cookie, Params.PeerScopeId, std::move(descr.TraceId)); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index adc9fd3710..e1c555e629 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -126,7 +126,8 @@ namespace NActors { struct TPerChannelContext { struct TPendingEvent { TEventSerializationInfo SerializationInfo; - TRope Payload; + TRope InternalPayload; + TRope ExternalPayload; std::optional<TEventData> EventData; // number of bytes remaining through XDC channel diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h index 7628fbd968..14b1a1c7a6 100644 --- a/library/cpp/actors/interconnect/types.h +++ b/library/cpp/actors/interconnect/types.h @@ -57,6 +57,7 @@ namespace NActors { bool AuthOnly = {}; bool UseExternalDataChannel = {}; bool UseXxhash = {}; + bool UseXdcShuffle = {}; TString AuthCN; NActors::TScopeId PeerScopeId; }; diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto index 538ede33d1..0e88f3bce5 100644 --- a/library/cpp/actors/protos/interconnect.proto +++ b/library/cpp/actors/protos/interconnect.proto @@ -74,6 +74,7 @@ message THandshakeRequest { optional bool RequestExtendedTraceFmt = 20; optional bool RequestExternalDataChannel = 21; optional bool RequestXxhash = 24; + optional bool RequestXdcShuffle = 25; optional bytes CompatibilityInfo = 22; @@ -102,6 +103,7 @@ message THandshakeSuccess { optional bool UseExtendedTraceFmt = 13; optional bool UseExternalDataChannel = 14; optional bool UseXxhash = 16; + optional bool UseXdcShuffle = 17; optional bytes CompatibilityInfo = 15; } |