about | summary | refs | log | tree | commit | diff | stats
path: root/library/cpp/actors
diff options
context:
space:
mode:
author Daniil Cherednik <dcherednik@ydb.tech> 2023-08-25 09:14:00 +0000
committer Daniil Cherednik <dcherednik@ydb.tech> 2023-08-25 09:14:00 +0000
commit 1aea989538126dcf9bb99aa87313ba942e679e7b (patch)
tree 5f89fae597bbf8cfaf58c56fd2313d1896a956bb /library/cpp/actors
parent 41effae1b14cbd91927d4d7746c935f773ee87ef (diff)
download ydb-1aea989538126dcf9bb99aa87313ba942e679e7b.tar.gz
Create stable-23-3 branch
x-stable-origin-commit: 3224c68a1e19d5457dc64c1c4f3260f7cd718558
Diffstat (limited to 'library/cpp/actors')
-rw-r--r-- library/cpp/actors/core/event_load.h 3
-rw-r--r-- library/cpp/actors/core/event_pb.h 118
-rw-r--r-- library/cpp/actors/interconnect/interconnect_channel.cpp 67
-rw-r--r-- library/cpp/actors/interconnect/interconnect_channel.h 8
-rw-r--r-- library/cpp/actors/interconnect/interconnect_handshake.cpp 4
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp 79
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_session.h 3
-rw-r--r-- library/cpp/actors/interconnect/types.h 1
-rw-r--r-- library/cpp/actors/protos/interconnect.proto 2
-rw-r--r-- library/cpp/actors/util/CMakeLists.darwin-x86_64.txt 2
-rw-r--r-- library/cpp/actors/util/CMakeLists.linux-aarch64.txt 2
-rw-r--r-- library/cpp/actors/util/CMakeLists.linux-x86_64.txt 2
-rw-r--r-- library/cpp/actors/util/CMakeLists.windows-x86_64.txt 2
-rw-r--r-- library/cpp/actors/util/rc_buf.h 129
-rw-r--r-- library/cpp/actors/util/rope.cpp 13
-rw-r--r-- library/cpp/actors/util/rope.h 21
-rw-r--r-- library/cpp/actors/util/shared_data_rope_backend.h 4
-rw-r--r-- library/cpp/actors/util/ut/CMakeLists.darwin-x86_64.txt 2
-rw-r--r-- library/cpp/actors/util/ya.make 2
19 files changed, 330 insertions, 134 deletions
diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h
index 30cc26aa46..0062ee40db 100644
--- a/library/cpp/actors/core/event_load.h
+++ b/library/cpp/actors/core/event_load.h
@@ -24,6 +24,7 @@ namespace NActors {
size_t Size = 0; // full size of serialized event section (a chunk in rope)
size_t Tailroom = 0; // tailroom for the chunk
size_t Alignment = 0; // required alignment
+ bool IsInline = false; // if true, goes through ordinary channel
};
struct TEventSerializationInfo {
@@ -51,7 +52,7 @@ namespace NActors {
, SerializationInfo(original.SerializationInfo)
{
if (!SerializationInfo.Sections.empty()) {
- SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0});
+ SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0, true});
}
Append(std::move(extraBuffer));
}
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index 4c4f49b1cc..a4fdb33fb4 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -6,6 +6,7 @@
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/arena.h>
#include <library/cpp/actors/protos/actors.pb.h>
+#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <util/generic/deque.h>
#include <util/system/context.h>
#include <util/system/filemap.h>
@@ -14,6 +15,9 @@
#include <array>
#include <span>
+// enable only when patch with this macro was successfully deployed
+#define USE_EXTENDED_PAYLOAD_FORMAT 0
+
namespace NActors {
class TRopeStream : public NProtoBuf::io::ZeroCopyInputStream {
@@ -144,6 +148,7 @@ namespace NActors {
class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder {
// a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies
TVector<TRope> Payload;
+ size_t TotalPayloadSize = 0;
public:
using TRecHolder::Record;
@@ -191,8 +196,8 @@ namespace NActors {
result += SerializeNumber(Payload.size(), buf);
for (const TRope& rope : Payload) {
result += SerializeNumber(rope.GetSize(), buf);
- result += rope.GetSize();
}
+ result += TotalPayloadSize;
}
return result;
}
@@ -207,9 +212,20 @@ namespace NActors {
if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) {
// check marker
- if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) {
+ if (!iter.Valid() || (*iter.ContiguousData() != PayloadMarker && *iter.ContiguousData() != ExtendedPayloadMarker)) {
Y_FAIL("invalid event");
}
+
+ const bool dataIsSeparate = *iter.ContiguousData() == ExtendedPayloadMarker; // ropes go after sizes
+
+ auto fetchRope = [&](size_t len) {
+ TRope::TConstIterator begin = iter;
+ iter += len;
+ size -= len;
+ ev->Payload.emplace_back(begin, iter);
+ ev->TotalPayloadSize += len;
+ };
+
// skip marker
iter += 1;
--size;
@@ -218,6 +234,10 @@ namespace NActors {
if (numRopes == Max<size_t>()) {
Y_FAIL("invalid event");
}
+ TStackVec<size_t, 16> ropeLens;
+ if (dataIsSeparate) {
+ ropeLens.reserve(numRopes);
+ }
while (numRopes--) {
// parse length of the rope
const size_t len = DeserializeNumber(iter, size);
@@ -225,10 +245,14 @@ namespace NActors {
Y_FAIL("invalid event len# %zu size# %" PRIu64, len, size);
}
// extract the rope
- TRope::TConstIterator begin = iter;
- iter += len;
- size -= len;
- ev->Payload.emplace_back(begin, iter);
+ if (dataIsSeparate) {
+ ropeLens.push_back(len);
+ } else {
+ fetchRope(len);
+ }
+ }
+ for (size_t len : ropeLens) {
+ fetchRope(len);
}
}
@@ -261,6 +285,10 @@ namespace NActors {
return CreateSerializationInfoImpl(0);
}
+ bool AllowExternalDataChannel() const {
+ return TotalPayloadSize >= 4096;
+ }
+
public:
void ReservePayload(size_t size) {
Payload.reserve(size);
@@ -268,6 +296,7 @@ namespace NActors {
ui32 AddPayload(TRope&& rope) {
const ui32 id = Payload.size();
+ TotalPayloadSize += rope.size();
Payload.push_back(std::move(rope));
InvalidateCachedByteSize();
return id;
@@ -284,37 +313,49 @@ namespace NActors {
void StripPayload() {
Payload.clear();
+ TotalPayloadSize = 0;
}
protected:
TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize) const {
TEventSerializationInfo info;
-
- if (Payload) {
- char temp[MaxNumberBytes];
- info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0}); // payload marker and rope count
- for (const TRope& rope : Payload) {
- const size_t ropeSize = rope.GetSize();
- info.Sections.back().Size += SerializeNumber(ropeSize, temp);
- info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0}); // data as a separate section
+ info.IsExtendedFormat = static_cast<bool>(Payload);
+
+ if (static_cast<const TEv&>(*this).AllowExternalDataChannel()) {
+ if (Payload) {
+ char temp[MaxNumberBytes];
+#if USE_EXTENDED_PAYLOAD_FORMAT
+ size_t headerLen = 1 + SerializeNumber(Payload.size(), temp);
+ for (const TRope& rope : Payload) {
+ headerLen += SerializeNumber(rope.size(), temp);
+ }
+ info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true});
+ for (const TRope& rope : Payload) {
+ info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false});
+ }
+#else
+ info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0, true}); // payload marker and rope count
+ for (const TRope& rope : Payload) {
+ const size_t ropeSize = rope.GetSize();
+ info.Sections.back().Size += SerializeNumber(ropeSize, temp);
+ info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0, false}); // data as a separate section
+ }
+#endif
}
- info.IsExtendedFormat = true;
- } else {
- info.IsExtendedFormat = false;
- }
- const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize;
- info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0}); // protobuf itself
+ const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize;
+ info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true}); // protobuf itself
#ifndef NDEBUG
- size_t total = 0;
- for (const auto& section : info.Sections) {
- total += section.Size;
- }
- size_t serialized = CalculateSerializedSize();
- Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total,
- serialized, byteSize, Payload.size());
+ size_t total = 0;
+ for (const auto& section : info.Sections) {
+ total += section.Size;
+ }
+ size_t serialized = CalculateSerializedSize();
+ Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total,
+ serialized, byteSize, Payload.size());
#endif
+ }
return info;
}
@@ -343,6 +384,27 @@ namespace NActors {
char buf[MaxNumberBytes];
return append(buf, SerializeNumber(number, buf));
};
+
+#if USE_EXTENDED_PAYLOAD_FORMAT
+ char marker = ExtendedPayloadMarker;
+ append(&marker, 1);
+ if (!appendNumber(Payload.size())) {
+ return false;
+ }
+ for (const TRope& rope : Payload) {
+ if (!appendNumber(rope.GetSize())) {
+ return false;
+ }
+ }
+ if (size) {
+ chunker->BackUp(std::exchange(size, 0));
+ }
+ for (const TRope& rope : Payload) {
+ if (!chunker->WriteRope(&rope)) {
+ return false;
+ }
+ }
+#else
char marker = PayloadMarker;
append(&marker, 1);
if (!appendNumber(Payload.size())) {
@@ -364,6 +426,7 @@ namespace NActors {
if (size) {
chunker->BackUp(size);
}
+#endif
}
if (preserialized && !chunker->WriteString(&preserialized)) {
@@ -376,6 +439,7 @@ namespace NActors {
protected:
mutable size_t CachedByteSize = 0;
+ static constexpr char ExtendedPayloadMarker = 0x06;
static constexpr char PayloadMarker = 0x07;
static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7;
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index e69483a8e9..6987c344de 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -75,6 +75,8 @@ namespace NActors {
State = EState::BODY;
Iter = event.Buffer->GetBeginIter();
SerializationInfo = &event.Buffer->GetSerializationInfo();
+ SectionIndex = 0;
+ PartLenRemain = 0;
} else if (event.Event) {
State = EState::BODY;
IEventBase *base = event.Event.Get();
@@ -83,15 +85,16 @@ namespace NActors {
}
SerializationInfoContainer = base->CreateSerializationInfo();
SerializationInfo = &SerializationInfoContainer;
+ SectionIndex = 0;
+ PartLenRemain = 0;
} else { // event without buffer and IEventBase instance
State = EState::DESCRIPTOR;
SerializationInfoContainer = {};
SerializationInfo = &SerializationInfoContainer;
}
- EventInExternalDataChannel = Params.UseExternalDataChannel && !SerializationInfo->Sections.empty();
if (!event.EventSerializedSize) {
State = EState::DESCRIPTOR;
- } else if (EventInExternalDataChannel) {
+ } else if (Params.UseExternalDataChannel && !SerializationInfo->Sections.empty()) {
State = EState::SECTIONS;
SectionIndex = 0;
}
@@ -130,11 +133,15 @@ namespace NActors {
char *p = sectionInfo;
const auto& section = SerializationInfo->Sections[SectionIndex];
- *p++ = static_cast<ui8>(EXdcCommand::DECLARE_SECTION);
+ char& type = *p++;
+ type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION);
p += NInterconnect::NDetail::SerializeNumber(section.Headroom, p);
p += NInterconnect::NDetail::SerializeNumber(section.Size, p);
p += NInterconnect::NDetail::SerializeNumber(section.Tailroom, p);
p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p);
+ if (section.IsInline && Params.UseXdcShuffle) {
+ type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_INLINE);
+ }
Y_VERIFY(p <= std::end(sectionInfo));
const size_t declareLen = p - sectionInfo;
@@ -161,6 +168,8 @@ namespace NActors {
if (SectionIndex == SerializationInfo->Sections.size()) {
State = EState::BODY;
+ SectionIndex = 0;
+ PartLenRemain = 0;
}
break;
@@ -179,6 +188,8 @@ namespace NActors {
task.Append<External>(data, len);
}
*bytesSerialized += len;
+ Y_VERIFY_DEBUG(len <= PartLenRemain);
+ PartLenRemain -= len;
event.EventActuallySerialized += len;
if (event.EventActuallySerialized > MaxSerializedEventSize) {
@@ -189,15 +200,12 @@ namespace NActors {
bool complete = false;
if (event.Event) {
while (!complete) {
- TMutableContiguousSpan out = task.AcquireSpanForWriting<External>();
+ TMutableContiguousSpan out = task.AcquireSpanForWriting<External>().SubSpan(0, PartLenRemain);
if (!out.size()) {
break;
}
- ui32 bytesFed = 0;
for (const auto& [buffer, size] : Chunker.FeedBuf(out.data(), out.size())) {
addChunk(buffer, size, false);
- bytesFed += size;
- Y_VERIFY(bytesFed <= out.size());
}
complete = Chunker.IsComplete();
if (complete) {
@@ -206,7 +214,7 @@ namespace NActors {
}
} else if (event.Buffer) {
while (const size_t numb = Min<size_t>(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(),
- Iter.ContiguousSize())) {
+ Iter.ContiguousSize(), PartLenRemain)) {
const char *obuf = Iter.ContiguousData();
addChunk(obuf, numb, true);
Iter += numb;
@@ -223,14 +231,43 @@ namespace NActors {
}
bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
- return EventInExternalDataChannel
- ? FeedExternalPayload(task, event, weightConsumed)
- : FeedInlinePayload(task, event, weightConsumed);
+ for (;;) {
+ // calculate inline or external part size (it may cover a few sections, not just single one)
+ while (!PartLenRemain) {
+ const auto& sections = SerializationInfo->Sections;
+ if (!Params.UseExternalDataChannel || sections.empty()) {
+ // all data goes inline
+ IsPartInline = true;
+ PartLenRemain = Max<size_t>();
+ } else if (!Params.UseXdcShuffle) {
+ // when UseXdcShuffle feature is not supported by the remote side, we transfer whole event over XDC
+ IsPartInline = false;
+ PartLenRemain = Max<size_t>();
+ } else {
+ Y_VERIFY(SectionIndex < sections.size());
+ IsPartInline = sections[SectionIndex].IsInline;
+ while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline) {
+ PartLenRemain += sections[SectionIndex].Size;
+ ++SectionIndex;
+ }
+ }
+ }
+
+ // serialize bytes
+ const auto complete = IsPartInline
+ ? FeedInlinePayload(task, event, weightConsumed)
+ : FeedExternalPayload(task, event, weightConsumed);
+ if (!complete) { // no space to serialize
+ return false;
+ } else if (*complete) { // event serialized
+ return true;
+ }
+ }
}
- bool TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
+ std::optional<bool> TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
if (task.GetInternalFreeAmount() <= sizeof(TChannelPart)) {
- return false;
+ return std::nullopt;
}
auto partBookmark = task.Bookmark(sizeof(TChannelPart));
@@ -253,10 +290,10 @@ namespace NActors {
return complete;
}
- bool TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
+ std::optional<bool> TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
const size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32));
if (task.GetInternalFreeAmount() < partSize || task.GetExternalFreeAmount() == 0) {
- return false;
+ return std::nullopt;
}
// clear external checksum for this chunk
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 2c5a286fe8..ef2da2fda7 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -46,6 +46,7 @@ namespace NActors {
enum class EXdcCommand : ui8 {
DECLARE_SECTION = 1,
PUSH_DATA,
+ DECLARE_SECTION_INLINE,
};
struct TExSerializedEventTooLarge : std::exception {
@@ -133,7 +134,8 @@ namespace NActors {
TCoroutineChunkSerializer Chunker;
TEventSerializationInfo SerializationInfoContainer;
const TEventSerializationInfo *SerializationInfo = nullptr;
- bool EventInExternalDataChannel;
+ bool IsPartInline = false;
+ size_t PartLenRemain = 0;
size_t SectionIndex = 0;
std::vector<char> XdcData;
@@ -141,8 +143,8 @@ namespace NActors {
bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized);
bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
- bool FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
- bool FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
+ std::optional<bool> FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
+ std::optional<bool> FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 00ebaa5217..4a57fc226c 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -752,6 +752,7 @@ namespace NActors {
request.SetRequestExtendedTraceFmt(true);
request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel);
request.SetRequestXxhash(true);
+ request.SetRequestXdcShuffle(true);
request.SetHandshakeId(*HandshakeId);
SendExBlock(MainChannel, request, "ExRequest");
@@ -790,6 +791,7 @@ namespace NActors {
Params.AuthOnly = Params.Encryption && success.GetAuthOnly();
Params.UseExternalDataChannel = success.GetUseExternalDataChannel();
Params.UseXxhash = success.GetUseXxhash();
+ Params.UseXdcShuffle = success.GetUseXdcShuffle();
if (success.HasServerScopeId()) {
ParsePeerScopeId(success.GetServerScopeId());
}
@@ -1041,6 +1043,7 @@ namespace NActors {
Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly;
Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel;
Params.UseXxhash = request.GetRequestXxhash();
+ Params.UseXdcShuffle = request.GetRequestXdcShuffle();
if (Params.UseExternalDataChannel) {
if (request.HasHandshakeId()) {
@@ -1080,6 +1083,7 @@ namespace NActors {
success.SetUseExtendedTraceFmt(true);
success.SetUseExternalDataChannel(Params.UseExternalDataChannel);
success.SetUseXxhash(Params.UseXxhash);
+ success.SetUseXdcShuffle(Params.UseXdcShuffle);
SendExBlock(MainChannel, record, "ExReply");
// extract sender actor id (self virtual id)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index bb7e069883..3b42884355 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -449,7 +449,7 @@ namespace NActors {
} else if (IgnorePayload) { // throw payload out
Payload.EraseFront(part.Size);
} else if (!part.IsLastPart()) { // just ordinary inline event data
- Payload.ExtractFront(part.Size, &pendingEvent.Payload);
+ Payload.ExtractFront(part.Size, &pendingEvent.InternalPayload);
} else { // event final block
TEventDescr2 v2;
@@ -529,8 +529,9 @@ namespace NActors {
const char *ptr = XdcCommands.data();
const char *end = ptr + XdcCommands.size();
while (ptr != end) {
- switch (static_cast<EXdcCommand>(*ptr++)) {
- case EXdcCommand::DECLARE_SECTION: {
+ switch (const auto cmd = static_cast<EXdcCommand>(*ptr++)) {
+ case EXdcCommand::DECLARE_SECTION:
+ case EXdcCommand::DECLARE_SECTION_INLINE: {
// extract and validate command parameters
const ui64 headroom = NInterconnect::NDetail::DeserializeNumber(&ptr, end);
const ui64 size = NInterconnect::NDetail::DeserializeNumber(&ptr, end);
@@ -542,17 +543,22 @@ namespace NActors {
}
if (!IgnorePayload) { // process command if packet is being applied
- // allocate buffer and push it into the payload
auto& pendingEvent = context.PendingEvents.back();
- pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom, alignment});
- auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom);
- if (size) {
- context.XdcBuffers.push_back(buffer.GetContiguousSpanMut());
+ const bool isInline = cmd == EXdcCommand::DECLARE_SECTION_INLINE;
+ pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom,
+ alignment, isInline});
+
+ Y_VERIFY(!isInline || Params.UseXdcShuffle);
+ if (!isInline) {
+ // allocate buffer and push it into the payload
+ auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom);
+ if (size) {
+ context.XdcBuffers.push_back(buffer.GetContiguousSpanMut());
+ }
+ pendingEvent.ExternalPayload.Insert(pendingEvent.ExternalPayload.End(), TRope(std::move(buffer)));
+ pendingEvent.XdcSizeLeft += size;
+ ++XdcSections;
}
- pendingEvent.Payload.Insert(pendingEvent.Payload.End(), TRope(std::move(buffer)));
- pendingEvent.XdcSizeLeft += size;
-
- ++XdcSections;
}
continue;
}
@@ -608,18 +614,57 @@ namespace NActors {
if (!pendingEvent.EventData || pendingEvent.XdcSizeLeft) {
break; // event is not ready yet
}
-
auto& descr = *pendingEvent.EventData;
+
+ // create aggregated payload
+ TRope payload;
+ if (!pendingEvent.SerializationInfo.Sections.empty()) {
+ // unshuffle inline and external payloads into single event content
+ TRope *prev = nullptr;
+ size_t accumSize = 0;
+ for (const auto& s : pendingEvent.SerializationInfo.Sections) {
+ TRope *rope = s.IsInline
+ ? &pendingEvent.InternalPayload
+ : &pendingEvent.ExternalPayload;
+ if (rope != prev) {
+ if (accumSize) {
+ prev->ExtractFront(accumSize, &payload);
+ }
+ prev = rope;
+ accumSize = 0;
+ }
+ accumSize += s.Size;
+ }
+ if (accumSize) {
+ prev->ExtractFront(accumSize, &payload);
+ }
+
+ if (pendingEvent.InternalPayload || pendingEvent.ExternalPayload) {
+ LOG_CRIT_IC_SESSION("ICIS19", "unprocessed payload remains after shuffling"
+ " Type# 0x%08" PRIx32 " InternalPayload.size# %zu ExternalPayload.size# %zu",
+ descr.Type, pendingEvent.InternalPayload.size(), pendingEvent.ExternalPayload.size());
+ Y_VERIFY_DEBUG(false);
+ throw TExReestablishConnection{TDisconnectReason::FormatError()};
+ }
+ }
+
+ // we add any remains of internal payload to the end
+ if (auto& rope = pendingEvent.InternalPayload) {
+ rope.ExtractFront(rope.size(), &payload);
+ }
+ // and ensure there is no unprocessed external payload
+ Y_VERIFY(!pendingEvent.ExternalPayload);
+
#if IC_FORCE_HARDENED_PACKET_CHECKS
- if (descr.Len != pendingEvent.Payload.GetSize()) {
+ if (descr.Len != payload.GetSize()) {
LOG_CRIT_IC_SESSION("ICIS17", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32,
- descr.Type, pendingEvent.Payload.GetSize(), descr.Len);
+ descr.Type, payload.GetSize(), descr.Len);
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
}
#endif
if (descr.Checksum) {
ui32 checksum = 0;
- for (const auto&& [data, size] : pendingEvent.Payload) {
+ for (const auto&& [data, size] : payload) {
checksum = Crc32cExtendMSanCompatible(checksum, data, size);
}
if (checksum != descr.Checksum) {
@@ -633,7 +678,7 @@ namespace NActors {
descr.Flags & ~IEventHandle::FlagExtendedFormat,
descr.Recipient,
descr.Sender,
- MakeIntrusive<TEventSerializedData>(std::move(pendingEvent.Payload), std::move(pendingEvent.SerializationInfo)),
+ MakeIntrusive<TEventSerializedData>(std::move(payload), std::move(pendingEvent.SerializationInfo)),
descr.Cookie,
Params.PeerScopeId,
std::move(descr.TraceId));
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index adc9fd3710..e1c555e629 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -126,7 +126,8 @@ namespace NActors {
struct TPerChannelContext {
struct TPendingEvent {
TEventSerializationInfo SerializationInfo;
- TRope Payload;
+ TRope InternalPayload;
+ TRope ExternalPayload;
std::optional<TEventData> EventData;
// number of bytes remaining through XDC channel
diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h
index 7628fbd968..14b1a1c7a6 100644
--- a/library/cpp/actors/interconnect/types.h
+++ b/library/cpp/actors/interconnect/types.h
@@ -57,6 +57,7 @@ namespace NActors {
bool AuthOnly = {};
bool UseExternalDataChannel = {};
bool UseXxhash = {};
+ bool UseXdcShuffle = {};
TString AuthCN;
NActors::TScopeId PeerScopeId;
};
diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto
index 538ede33d1..0e88f3bce5 100644
--- a/library/cpp/actors/protos/interconnect.proto
+++ b/library/cpp/actors/protos/interconnect.proto
@@ -74,6 +74,7 @@ message THandshakeRequest {
optional bool RequestExtendedTraceFmt = 20;
optional bool RequestExternalDataChannel = 21;
optional bool RequestXxhash = 24;
+ optional bool RequestXdcShuffle = 25;
optional bytes CompatibilityInfo = 22;
@@ -102,6 +103,7 @@ message THandshakeSuccess {
optional bool UseExtendedTraceFmt = 13;
optional bool UseExternalDataChannel = 14;
optional bool UseXxhash = 16;
+ optional bool UseXdcShuffle = 17;
optional bytes CompatibilityInfo = 15;
}
diff --git a/library/cpp/actors/util/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/util/CMakeLists.darwin-x86_64.txt
index 6ab1bb43c3..be68d418f7 100644
--- a/library/cpp/actors/util/CMakeLists.darwin-x86_64.txt
+++ b/library/cpp/actors/util/CMakeLists.darwin-x86_64.txt
@@ -12,6 +12,7 @@ add_library(cpp-actors-util)
target_link_libraries(cpp-actors-util PUBLIC
contrib-libs-cxxsupp
yutil
+ cpp-containers-absl_flat_hash
cpp-deprecated-atomic
library-cpp-pop_count
)
@@ -19,6 +20,7 @@ target_sources(cpp-actors-util PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_track.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_tracker.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rope.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rc_buf.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/shared_data.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/should_continue.cpp
diff --git a/library/cpp/actors/util/CMakeLists.linux-aarch64.txt b/library/cpp/actors/util/CMakeLists.linux-aarch64.txt
index 4582852947..9c5183c2bd 100644
--- a/library/cpp/actors/util/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/actors/util/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ target_link_libraries(cpp-actors-util PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ cpp-containers-absl_flat_hash
cpp-deprecated-atomic
library-cpp-pop_count
)
@@ -20,6 +21,7 @@ target_sources(cpp-actors-util PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_track.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_tracker.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rope.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rc_buf.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/shared_data.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/should_continue.cpp
diff --git a/library/cpp/actors/util/CMakeLists.linux-x86_64.txt b/library/cpp/actors/util/CMakeLists.linux-x86_64.txt
index 4582852947..9c5183c2bd 100644
--- a/library/cpp/actors/util/CMakeLists.linux-x86_64.txt
+++ b/library/cpp/actors/util/CMakeLists.linux-x86_64.txt
@@ -13,6 +13,7 @@ target_link_libraries(cpp-actors-util PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ cpp-containers-absl_flat_hash
cpp-deprecated-atomic
library-cpp-pop_count
)
@@ -20,6 +21,7 @@ target_sources(cpp-actors-util PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_track.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_tracker.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rope.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rc_buf.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/shared_data.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/should_continue.cpp
diff --git a/library/cpp/actors/util/CMakeLists.windows-x86_64.txt b/library/cpp/actors/util/CMakeLists.windows-x86_64.txt
index 6ab1bb43c3..be68d418f7 100644
--- a/library/cpp/actors/util/CMakeLists.windows-x86_64.txt
+++ b/library/cpp/actors/util/CMakeLists.windows-x86_64.txt
@@ -12,6 +12,7 @@ add_library(cpp-actors-util)
target_link_libraries(cpp-actors-util PUBLIC
contrib-libs-cxxsupp
yutil
+ cpp-containers-absl_flat_hash
cpp-deprecated-atomic
library-cpp-pop_count
)
@@ -19,6 +20,7 @@ target_sources(cpp-actors-util PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_track.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_tracker.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rope.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rc_buf.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/shared_data.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/should_continue.cpp
diff --git a/library/cpp/actors/util/rc_buf.h b/library/cpp/actors/util/rc_buf.h
index 60d0ef904a..7b6c68d269 100644
--- a/library/cpp/actors/util/rc_buf.h
+++ b/library/cpp/actors/util/rc_buf.h
@@ -294,6 +294,13 @@ struct IContiguousChunk : TThrRefBase {
return GetDataMut();
}
+ /**
+ * Should return true if GetDataMut() would not copy contents when called.
+ */
+ virtual bool IsPrivate() const {
+ return true;
+ }
+
virtual size_t GetOccupiedMemorySize() const = 0;
};
@@ -434,8 +441,12 @@ class TRcBuf {
using T = std::decay_t<decltype(value)>;
if constexpr (std::is_same_v<T, NActors::TSharedData> || std::is_same_v<T, TInternalBackend>) {
return value.IsPrivate();
+ } else if constexpr (std::is_same_v<T, TString>) {
+ return value.IsDetached();
+ } else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) {
+ return value.RefCount() == 1 && value->IsPrivate();
} else {
- return false;
+ static_assert(TDependentFalse<T>);
}
});
}
@@ -446,14 +457,12 @@ class TRcBuf {
}
return Visit(Owner, [](EType, auto& value) -> TContiguousSpan {
using T = std::decay_t<decltype(value)>;
- if constexpr (std::is_same_v<T, TString>) {
- return {&(*value.cbegin()), value.size()};
- } else if constexpr (std::is_same_v<T, NActors::TSharedData> || std::is_same_v<T, TInternalBackend>) {
+ if constexpr (std::is_same_v<T, TString> || std::is_same_v<T, NActors::TSharedData> || std::is_same_v<T, TInternalBackend>) {
return {value.data(), value.size()};
} else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) {
return value->GetData();
} else {
- return {};
+ static_assert(TDependentFalse<T>, "unexpected type");
}
});
}
@@ -479,7 +488,7 @@ class TRcBuf {
} else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) {
return value->GetDataMut();
} else {
- return {};
+ static_assert(TDependentFalse<T>, "unexpected type");
}
});
}
@@ -497,7 +506,7 @@ class TRcBuf {
} else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) {
return value->UnsafeGetDataMut();
} else {
- return {};
+ static_assert(TDependentFalse<T>, "unexpected type");
}
});
}
@@ -515,7 +524,7 @@ class TRcBuf {
} else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) {
return value->GetOccupiedMemorySize();
} else {
- Y_FAIL();
+ static_assert(TDependentFalse<T>, "unexpected type");
}
});
}
@@ -582,28 +591,26 @@ class TRcBuf {
return false;
}
- template <class TResult>
- TResult GetRaw() const {
+ template <typename TResult, typename TCallback>
+ std::invoke_result_t<TCallback, const TResult*> ApplySpecificValue(TCallback&& callback) const {
+ static_assert(std::is_same_v<TResult, TString> ||
+ std::is_same_v<TResult, NActors::TSharedData> ||
+ std::is_same_v<TResult, TInternalBackend> ||
+ std::is_same_v<TResult, IContiguousChunk::TPtr>);
+
if (!Owner) {
- return TResult{};
+ return callback(nullptr);
}
- return Visit(Owner, [](EType, auto& value) {
+ return Visit(Owner, [&](EType, auto& value) {
using T = std::decay_t<decltype(value)>;
if constexpr (std::is_same_v<T, TResult>) {
- return value;
+ return callback(&value);
} else {
- Y_FAIL();
- return TResult{}; // unreachable
+ return callback(nullptr);
}
});
}
- NActors::TSharedData GetRawTrimmed(size_t size) const {
- NActors::TSharedData result = GetRaw<NActors::TSharedData>();
- result.TrimBack(size);
- return result;
- }
-
explicit operator bool() const {
return static_cast<bool>(Owner);
}
@@ -845,28 +852,6 @@ public:
return Backend.ContainsNativeType<TType>();
}
- template <class TResult>
- TResult GetRaw() const {
- return Backend.GetRaw<TResult>();
- }
-
- NActors::TSharedData GetRawTrimmed(size_t size) const {
- return Backend.GetRawTrimmed(size);
- }
-
- bool ReferencesWholeContainer() const {
- return Backend.GetData().size() == GetSize();
- }
-
-
- bool ReferencesTrimableToWholeContainer() const {
- if (ContainsNativeType<NActors::TSharedData>()) {
- return Backend.GetData().size() == (GetSize() + UnsafeTailroom());
- } else {
- return ReferencesWholeContainer();
- }
- }
-
bool CanGrowFront() const noexcept {
return Backend.CanGrowFront(Begin);
}
@@ -891,14 +876,20 @@ public:
return Begin;
}
- char* GetDataMut() {
- const char* oldBegin = Backend.GetData().data();
- ptrdiff_t offset = Begin - oldBegin;
- size_t size = GetSize();
- char* newBegin = Backend.GetDataMut().data();
- Begin = newBegin + offset;
- End = Begin + size;
- return newBegin + offset;
+ char* GetDataMut(size_t headroom = 0, size_t tailroom = 0) {
+ const TContiguousSpan backendData = Backend.GetData();
+ if (IsPrivate() || (backendData.data() == GetData() && backendData.size() == GetSize())) { // if we own container or reference it whole
+ const char* oldBegin = backendData.data();
+ ptrdiff_t offset = Begin - oldBegin;
+ size_t size = GetSize();
+ char* newBegin = Backend.GetDataMut().data();
+ Begin = newBegin + offset;
+ End = Begin + size;
+ return newBegin + offset;
+ } else { // make a copy of referenced data
+ *this = Copy(GetContiguousSpan(), headroom, tailroom);
+ return Backend.GetDataMut().data();
+ }
}
char* UnsafeGetDataMut() {
@@ -913,18 +904,30 @@ public:
template <class TResult>
TResult ExtractUnderlyingContainerOrCopy() const {
- if (ContainsNativeType<TResult>() && (ReferencesWholeContainer() || ReferencesTrimableToWholeContainer())) {
- using T = std::decay_t<TResult>;
- if constexpr (std::is_same_v<T, NActors::TSharedData>) {
- return GetRawTrimmed(GetSize());
- } else {
- return GetRaw<TResult>();
+ static_assert(std::is_same_v<TResult, TString> ||
+ std::is_same_v<TResult, NActors::TSharedData> ||
+ std::is_same_v<TResult, TInternalBackend>);
+
+ constexpr bool isSharedData = std::is_same_v<TResult, NActors::TSharedData>;
+ TResult res;
+
+ const bool found = Backend.ApplySpecificValue<TResult>([&](const TResult *raw) {
+ if (raw && raw->data() == Begin && (isSharedData ? End <= Begin + raw->size() : End == Begin + raw->size())) {
+ if constexpr (isSharedData) {
+ raw->TrimBack(size());
+ }
+ res = TResult(*raw);
+ return true;
}
+ return false;
+ });
+
+ if (!found) {
+ res = TResult::Uninitialized(GetSize());
+ char* data = NContiguousDataDetails::TContainerTraits<TResult>::UnsafeGetDataMut(res);
+ std::memcpy(data, GetData(), GetSize());
}
- TResult res = TResult::Uninitialized(GetSize());
- char* data = NContiguousDataDetails::TContainerTraits<TResult>::UnsafeGetDataMut(res);
- std::memcpy(data, Begin, End - Begin);
return res;
}
@@ -932,7 +935,7 @@ public:
return {GetData(), GetSize()};
}
- TStringBuf Slice(size_t pos = 0, size_t len = -1) const noexcept {
+ TStringBuf Slice(size_t pos = 0, size_t len = Max<size_t>()) const noexcept {
pos = Min(pos, size());
len = Min(len, size() - pos);
return {const_cast<TRcBuf*>(this)->UnsafeGetDataMut() + pos, len};
@@ -1078,6 +1081,10 @@ public:
return GetDataMut();
}
+ bool IsPrivate() const {
+ return Backend.IsPrivate();
+ }
+
size_t UnsafeHeadroom() const {
return Begin - Backend.GetData().data();
}
diff --git a/library/cpp/actors/util/rope.cpp b/library/cpp/actors/util/rope.cpp
new file mode 100644
index 0000000000..0927774099
--- /dev/null
+++ b/library/cpp/actors/util/rope.cpp
@@ -0,0 +1,13 @@
+#include "rope.h"
+#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h>
+
+size_t TRope::GetOccupiedMemorySize() const {
+ size_t res = 0;
+ absl::flat_hash_set<const void*> chunks;
+ for (const auto& chunk : Chain) {
+ if (const auto [it, inserted] = chunks.insert(chunk.Backend.UniqueId()); inserted) {
+ res += chunk.Backend.GetOccupiedMemorySize();
+ }
+ }
+ return res;
+}
diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h
index 3c65588181..201ce06f0d 100644
--- a/library/cpp/actors/util/rope.h
+++ b/library/cpp/actors/util/rope.h
@@ -455,6 +455,10 @@ public:
return !Size;
}
+ bool empty() const {
+ return IsEmpty();
+ }
+
operator bool() const {
return Chain;
}
@@ -699,10 +703,7 @@ public:
// Use this method carefully -- it may significantly reduce performance when misused.
TString ConvertToString() const {
- // TODO(innokentii): could be microoptimized for single TString case
- TString res = TString::Uninitialized(GetSize());
- Begin().ExtractPlainDataAndAdvance(res.Detach(), res.size());
- return res;
+ return ExtractUnderlyingContainerOrCopy<TString>();
}
/**
@@ -710,14 +711,14 @@ public:
*/
template <class TResult>
TResult ExtractUnderlyingContainerOrCopy() const {
- if (IsContiguous() && GetSize() != 0) {
- const auto& chunk = Begin().GetChunk();
- return chunk.ExtractUnderlyingContainerOrCopy<TResult>();
+ if (Chain.begin() != Chain.end() && ++Chain.begin() == Chain.end()) {
+ return Chain.GetFirstChunk().ExtractUnderlyingContainerOrCopy<TResult>();
}
- TResult res = TResult::Uninitialized(GetSize());
+ const size_t size = GetSize();
+ TResult res = TResult::Uninitialized(size);
char* data = NContiguousDataDetails::TContainerTraits<TResult>::UnsafeGetDataMut(res);
- Begin().ExtractPlainDataAndAdvance(data, res.size());
+ Begin().ExtractPlainDataAndAdvance(data, size);
return res;
}
@@ -805,6 +806,8 @@ public:
return TRcBuf(Begin().GetChunk());
}
+ size_t GetOccupiedMemorySize() const;
+
friend bool operator==(const TRope& x, const TRope& y) { return Compare(x, y) == 0; }
friend bool operator!=(const TRope& x, const TRope& y) { return Compare(x, y) != 0; }
friend bool operator< (const TRope& x, const TRope& y) { return Compare(x, y) < 0; }
diff --git a/library/cpp/actors/util/shared_data_rope_backend.h b/library/cpp/actors/util/shared_data_rope_backend.h
index 2abfcf5584..a221ae668b 100644
--- a/library/cpp/actors/util/shared_data_rope_backend.h
+++ b/library/cpp/actors/util/shared_data_rope_backend.h
@@ -29,6 +29,10 @@ public:
return {const_cast<char *>(Buffer.data()), Buffer.size()};
}
+ bool IsPrivate() const override {
+ return Buffer.IsPrivate();
+ }
+
size_t GetOccupiedMemorySize() const override {
return Buffer.size();
}
diff --git a/library/cpp/actors/util/ut/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/util/ut/CMakeLists.darwin-x86_64.txt
index cb32e0652f..f02b2d926c 100644
--- a/library/cpp/actors/util/ut/CMakeLists.darwin-x86_64.txt
+++ b/library/cpp/actors/util/ut/CMakeLists.darwin-x86_64.txt
@@ -22,6 +22,8 @@ target_link_options(library-cpp-actors-util-ut PRIVATE
-Wl,-platform_version,macos,11.0,11.0
-fPIC
-fPIC
+ -framework
+ CoreFoundation
)
target_sources(library-cpp-actors-util-ut PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/util/cpu_load_log_ut.cpp
diff --git a/library/cpp/actors/util/ya.make b/library/cpp/actors/util/ya.make
index 912b55868d..48d595c156 100644
--- a/library/cpp/actors/util/ya.make
+++ b/library/cpp/actors/util/ya.make
@@ -19,6 +19,7 @@ SRCS(
memory_tracker.cpp
memory_tracker.h
recentwnd.h
+ rope.cpp
rope.h
rc_buf.cpp
rc_buf.h
@@ -37,6 +38,7 @@ SRCS(
)
PEERDIR(
+ library/cpp/containers/absl_flat_hash
library/cpp/deprecated/atomic
library/cpp/pop_count
)