diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2023-08-25 09:14:00 +0000 |
---|---|---|
committer | Daniil Cherednik <dcherednik@ydb.tech> | 2023-08-25 09:14:00 +0000 |
commit | 1aea989538126dcf9bb99aa87313ba942e679e7b (patch) | |
tree | 5f89fae597bbf8cfaf58c56fd2313d1896a956bb /library/cpp/actors | |
parent | 41effae1b14cbd91927d4d7746c935f773ee87ef (diff) | |
download | ydb-1aea989538126dcf9bb99aa87313ba942e679e7b.tar.gz |
Create stable-23-3 branch
x-stable-origin-commit: 3224c68a1e19d5457dc64c1c4f3260f7cd718558
Diffstat (limited to 'library/cpp/actors')
19 files changed, 330 insertions, 134 deletions
diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h index 30cc26aa46..0062ee40db 100644 --- a/library/cpp/actors/core/event_load.h +++ b/library/cpp/actors/core/event_load.h @@ -24,6 +24,7 @@ namespace NActors { size_t Size = 0; // full size of serialized event section (a chunk in rope) size_t Tailroom = 0; // tailroom for the chunk size_t Alignment = 0; // required alignment + bool IsInline = false; // if true, goes through ordinary channel }; struct TEventSerializationInfo { @@ -51,7 +52,7 @@ namespace NActors { , SerializationInfo(original.SerializationInfo) { if (!SerializationInfo.Sections.empty()) { - SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0}); + SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0, true}); } Append(std::move(extraBuffer)); } diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index 4c4f49b1cc..a4fdb33fb4 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -6,6 +6,7 @@ #include <google/protobuf/io/zero_copy_stream.h> #include <google/protobuf/arena.h> #include <library/cpp/actors/protos/actors.pb.h> +#include <library/cpp/containers/stack_vector/stack_vec.h> #include <util/generic/deque.h> #include <util/system/context.h> #include <util/system/filemap.h> @@ -14,6 +15,9 @@ #include <array> #include <span> +// enable only when patch with this macro was successfully deployed +#define USE_EXTENDED_PAYLOAD_FORMAT 0 + namespace NActors { class TRopeStream : public NProtoBuf::io::ZeroCopyInputStream { @@ -144,6 +148,7 @@ namespace NActors { class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder { // a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies TVector<TRope> Payload; + size_t TotalPayloadSize = 0; public: using TRecHolder::Record; @@ -191,8 +196,8 @@ namespace NActors { result += SerializeNumber(Payload.size(), buf); for (const TRope& rope : Payload) { result += SerializeNumber(rope.GetSize(), buf); - result += rope.GetSize(); } + result += TotalPayloadSize; } return result; } @@ -207,9 +212,20 @@ namespace NActors { if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) { // check marker - if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) { + if (!iter.Valid() || (*iter.ContiguousData() != PayloadMarker && *iter.ContiguousData() != ExtendedPayloadMarker)) { Y_FAIL("invalid event"); } + + const bool dataIsSeparate = *iter.ContiguousData() == ExtendedPayloadMarker; // ropes go after sizes + + auto fetchRope = [&](size_t len) { + TRope::TConstIterator begin = iter; + iter += len; + size -= len; + ev->Payload.emplace_back(begin, iter); + ev->TotalPayloadSize += len; + }; + // skip marker iter += 1; --size; @@ -218,6 +234,10 @@ namespace NActors { if (numRopes == Max<size_t>()) { Y_FAIL("invalid event"); } + TStackVec<size_t, 16> ropeLens; + if (dataIsSeparate) { + ropeLens.reserve(numRopes); + } while (numRopes--) { // parse length of the rope const size_t len = DeserializeNumber(iter, size); @@ -225,10 +245,14 @@ namespace NActors { Y_FAIL("invalid event len# %zu size# %" PRIu64, len, size); } // extract the rope - TRope::TConstIterator begin = iter; - iter += len; - size -= len; - ev->Payload.emplace_back(begin, iter); + if (dataIsSeparate) { + ropeLens.push_back(len); + } else { + fetchRope(len); + } + } + for (size_t len : ropeLens) { + fetchRope(len); } } @@ -261,6 +285,10 @@ namespace NActors { return CreateSerializationInfoImpl(0); } + bool AllowExternalDataChannel() const { + return TotalPayloadSize >= 4096; + } + public: void ReservePayload(size_t size) { Payload.reserve(size); @@ -268,6 +296,7 @@ namespace NActors { ui32 AddPayload(TRope&& rope) { const ui32 id = Payload.size(); + TotalPayloadSize += rope.size(); Payload.push_back(std::move(rope)); InvalidateCachedByteSize(); return id; @@ -284,37 +313,49 @@ namespace NActors { void StripPayload() { Payload.clear(); + TotalPayloadSize = 0; } protected: TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize) const { TEventSerializationInfo info; - - if (Payload) { - char temp[MaxNumberBytes]; - info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0}); // payload marker and rope count - for (const TRope& rope : Payload) { - const size_t ropeSize = rope.GetSize(); - info.Sections.back().Size += SerializeNumber(ropeSize, temp); - info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0}); // data as a separate section + info.IsExtendedFormat = static_cast<bool>(Payload); + + if (static_cast<const TEv&>(*this).AllowExternalDataChannel()) { + if (Payload) { + char temp[MaxNumberBytes]; +#if USE_EXTENDED_PAYLOAD_FORMAT + size_t headerLen = 1 + SerializeNumber(Payload.size(), temp); + for (const TRope& rope : Payload) { + headerLen += SerializeNumber(rope.size(), temp); + } + info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true}); + for (const TRope& rope : Payload) { + info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false}); + } +#else + info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0, true}); // payload marker and rope count + for (const TRope& rope : Payload) { + const size_t ropeSize = rope.GetSize(); + info.Sections.back().Size += SerializeNumber(ropeSize, temp); + info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0, false}); // data as a separate section + } +#endif } - info.IsExtendedFormat = true; - } else { - info.IsExtendedFormat = false; - } - const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize; - info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0}); // protobuf itself + const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize; + info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true}); // protobuf itself #ifndef NDEBUG - size_t total = 0; - for (const auto& section : info.Sections) { - total += section.Size; - } - size_t serialized = CalculateSerializedSize(); - Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total, - serialized, byteSize, Payload.size()); + size_t total = 0; + for (const auto& section : info.Sections) { + total += section.Size; + } + size_t serialized = CalculateSerializedSize(); + Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total, + serialized, byteSize, Payload.size()); #endif + } return info; } @@ -343,6 +384,27 @@ namespace NActors { char buf[MaxNumberBytes]; return append(buf, SerializeNumber(number, buf)); }; + +#if USE_EXTENDED_PAYLOAD_FORMAT + char marker = ExtendedPayloadMarker; + append(&marker, 1); + if (!appendNumber(Payload.size())) { + return false; + } + for (const TRope& rope : Payload) { + if (!appendNumber(rope.GetSize())) { + return false; + } + } + if (size) { + chunker->BackUp(std::exchange(size, 0)); + } + for (const TRope& rope : Payload) { + if (!chunker->WriteRope(&rope)) { + return false; + } + } +#else char marker = PayloadMarker; append(&marker, 1); if (!appendNumber(Payload.size())) { @@ -364,6 +426,7 @@ namespace NActors { if (size) { chunker->BackUp(size); } +#endif } if (preserialized && !chunker->WriteString(&preserialized)) { @@ -376,6 +439,7 @@ namespace NActors { protected: mutable size_t CachedByteSize = 0; + static constexpr char ExtendedPayloadMarker = 0x06; static constexpr char PayloadMarker = 0x07; static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7; diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index e69483a8e9..6987c344de 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -75,6 +75,8 @@ namespace NActors { State = EState::BODY; Iter = event.Buffer->GetBeginIter(); SerializationInfo = &event.Buffer->GetSerializationInfo(); + SectionIndex = 0; + PartLenRemain = 0; } else if (event.Event) { State = EState::BODY; IEventBase *base = event.Event.Get(); @@ -83,15 +85,16 @@ namespace NActors { } SerializationInfoContainer = base->CreateSerializationInfo(); SerializationInfo = &SerializationInfoContainer; + SectionIndex = 0; + PartLenRemain = 0; } else { // event without buffer and IEventBase instance State = EState::DESCRIPTOR; SerializationInfoContainer = {}; SerializationInfo = &SerializationInfoContainer; } - EventInExternalDataChannel = Params.UseExternalDataChannel && !SerializationInfo->Sections.empty(); if (!event.EventSerializedSize) { State = EState::DESCRIPTOR; - } else if (EventInExternalDataChannel) { + } else if (Params.UseExternalDataChannel && !SerializationInfo->Sections.empty()) { State = EState::SECTIONS; SectionIndex = 0; } @@ -130,11 +133,15 @@ namespace NActors { char *p = sectionInfo; const auto& section = SerializationInfo->Sections[SectionIndex]; - *p++ = static_cast<ui8>(EXdcCommand::DECLARE_SECTION); + char& type = *p++; + type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION); p += NInterconnect::NDetail::SerializeNumber(section.Headroom, p); p += NInterconnect::NDetail::SerializeNumber(section.Size, p); p += NInterconnect::NDetail::SerializeNumber(section.Tailroom, p); p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p); + if (section.IsInline && Params.UseXdcShuffle) { + type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_INLINE); + } Y_VERIFY(p <= std::end(sectionInfo)); const size_t declareLen = p - sectionInfo; @@ -161,6 +168,8 @@ namespace NActors { if (SectionIndex == SerializationInfo->Sections.size()) { State = EState::BODY; + SectionIndex = 0; + PartLenRemain = 0; } break; @@ -179,6 +188,8 @@ namespace NActors { task.Append<External>(data, len); } *bytesSerialized += len; + Y_VERIFY_DEBUG(len <= PartLenRemain); + PartLenRemain -= len; event.EventActuallySerialized += len; if (event.EventActuallySerialized > MaxSerializedEventSize) { @@ -189,15 +200,12 @@ namespace NActors { bool complete = false; if (event.Event) { while (!complete) { - TMutableContiguousSpan out = task.AcquireSpanForWriting<External>(); + TMutableContiguousSpan out = task.AcquireSpanForWriting<External>().SubSpan(0, PartLenRemain); if (!out.size()) { break; } - ui32 bytesFed = 0; for (const auto& [buffer, size] : Chunker.FeedBuf(out.data(), out.size())) { addChunk(buffer, size, false); - bytesFed += size; - Y_VERIFY(bytesFed <= out.size()); } complete = Chunker.IsComplete(); if (complete) { @@ -206,7 +214,7 @@ namespace NActors { } } else if (event.Buffer) { while (const size_t numb = Min<size_t>(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(), - Iter.ContiguousSize())) { + Iter.ContiguousSize(), PartLenRemain)) { const char *obuf = Iter.ContiguousData(); addChunk(obuf, numb, true); Iter += numb; @@ -223,14 +231,43 @@ namespace NActors { } bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { - return EventInExternalDataChannel - ? FeedExternalPayload(task, event, weightConsumed) - : FeedInlinePayload(task, event, weightConsumed); + for (;;) { + // calculate inline or external part size (it may cover a few sections, not just single one) + while (!PartLenRemain) { + const auto& sections = SerializationInfo->Sections; + if (!Params.UseExternalDataChannel || sections.empty()) { + // all data goes inline + IsPartInline = true; + PartLenRemain = Max<size_t>(); + } else if (!Params.UseXdcShuffle) { + // when UseXdcShuffle feature is not supported by the remote side, we transfer whole event over XDC + IsPartInline = false; + PartLenRemain = Max<size_t>(); + } else { + Y_VERIFY(SectionIndex < sections.size()); + IsPartInline = sections[SectionIndex].IsInline; + while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline) { + PartLenRemain += sections[SectionIndex].Size; + ++SectionIndex; + } + } + } + + // serialize bytes + const auto complete = IsPartInline + ? FeedInlinePayload(task, event, weightConsumed) + : FeedExternalPayload(task, event, weightConsumed); + if (!complete) { // no space to serialize + return false; + } else if (*complete) { // event serialized + return true; + } + } } - bool TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + std::optional<bool> TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { if (task.GetInternalFreeAmount() <= sizeof(TChannelPart)) { - return false; + return std::nullopt; } auto partBookmark = task.Bookmark(sizeof(TChannelPart)); @@ -253,10 +290,10 @@ namespace NActors { return complete; } - bool TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + std::optional<bool> TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { const size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32)); if (task.GetInternalFreeAmount() < partSize || task.GetExternalFreeAmount() == 0) { - return false; + return std::nullopt; } // clear external checksum for this chunk diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 2c5a286fe8..ef2da2fda7 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -46,6 +46,7 @@ namespace NActors { enum class EXdcCommand : ui8 { DECLARE_SECTION = 1, PUSH_DATA, + DECLARE_SECTION_INLINE, }; struct TExSerializedEventTooLarge : std::exception { @@ -133,7 +134,8 @@ namespace NActors { TCoroutineChunkSerializer Chunker; TEventSerializationInfo SerializationInfoContainer; const TEventSerializationInfo *SerializationInfo = nullptr; - bool EventInExternalDataChannel; + bool IsPartInline = false; + size_t PartLenRemain = 0; size_t SectionIndex = 0; std::vector<char> XdcData; @@ -141,8 +143,8 @@ namespace NActors { bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized); bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); - bool FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); - bool FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); + std::optional<bool> FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); + std::optional<bool> FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 00ebaa5217..4a57fc226c 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -752,6 +752,7 @@ namespace NActors { request.SetRequestExtendedTraceFmt(true); request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel); request.SetRequestXxhash(true); + request.SetRequestXdcShuffle(true); request.SetHandshakeId(*HandshakeId); SendExBlock(MainChannel, request, "ExRequest"); @@ -790,6 +791,7 @@ namespace NActors { Params.AuthOnly = Params.Encryption && success.GetAuthOnly(); Params.UseExternalDataChannel = success.GetUseExternalDataChannel(); Params.UseXxhash = success.GetUseXxhash(); + Params.UseXdcShuffle = success.GetUseXdcShuffle(); if (success.HasServerScopeId()) { ParsePeerScopeId(success.GetServerScopeId()); } @@ -1041,6 +1043,7 @@ namespace NActors { Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly; Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel; Params.UseXxhash = request.GetRequestXxhash(); + Params.UseXdcShuffle = request.GetRequestXdcShuffle(); if (Params.UseExternalDataChannel) { if (request.HasHandshakeId()) { @@ -1080,6 +1083,7 @@ namespace NActors { success.SetUseExtendedTraceFmt(true); success.SetUseExternalDataChannel(Params.UseExternalDataChannel); success.SetUseXxhash(Params.UseXxhash); + success.SetUseXdcShuffle(Params.UseXdcShuffle); SendExBlock(MainChannel, record, "ExReply"); // extract sender actor id (self virtual id) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index bb7e069883..3b42884355 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -449,7 +449,7 @@ namespace NActors { } else if (IgnorePayload) { // throw payload out Payload.EraseFront(part.Size); } else if (!part.IsLastPart()) { // just ordinary inline event data - Payload.ExtractFront(part.Size, &pendingEvent.Payload); + Payload.ExtractFront(part.Size, &pendingEvent.InternalPayload); } else { // event final block TEventDescr2 v2; @@ -529,8 +529,9 @@ namespace NActors { const char *ptr = XdcCommands.data(); const char *end = ptr + XdcCommands.size(); while (ptr != end) { - switch (static_cast<EXdcCommand>(*ptr++)) { - case EXdcCommand::DECLARE_SECTION: { + switch (const auto cmd = static_cast<EXdcCommand>(*ptr++)) { + case EXdcCommand::DECLARE_SECTION: + case EXdcCommand::DECLARE_SECTION_INLINE: { // extract and validate command parameters const ui64 headroom = NInterconnect::NDetail::DeserializeNumber(&ptr, end); const ui64 size = NInterconnect::NDetail::DeserializeNumber(&ptr, end); @@ -542,17 +543,22 @@ namespace NActors { } if (!IgnorePayload) { // process command if packet is being applied - // allocate buffer and push it into the payload auto& pendingEvent = context.PendingEvents.back(); - pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom, alignment}); - auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom); - if (size) { - context.XdcBuffers.push_back(buffer.GetContiguousSpanMut()); + const bool isInline = cmd == EXdcCommand::DECLARE_SECTION_INLINE; + pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom, + alignment, isInline}); + + Y_VERIFY(!isInline || Params.UseXdcShuffle); + if (!isInline) { + // allocate buffer and push it into the payload + auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom); + if (size) { + context.XdcBuffers.push_back(buffer.GetContiguousSpanMut()); + } + pendingEvent.ExternalPayload.Insert(pendingEvent.ExternalPayload.End(), TRope(std::move(buffer))); + pendingEvent.XdcSizeLeft += size; + ++XdcSections; } - pendingEvent.Payload.Insert(pendingEvent.Payload.End(), TRope(std::move(buffer))); - pendingEvent.XdcSizeLeft += size; - - ++XdcSections; } continue; } @@ -608,18 +614,57 @@ namespace NActors { if (!pendingEvent.EventData || pendingEvent.XdcSizeLeft) { break; // event is not ready yet } - auto& descr = *pendingEvent.EventData; + + // create aggregated payload + TRope payload; + if (!pendingEvent.SerializationInfo.Sections.empty()) { + // unshuffle inline and external payloads into single event content + TRope *prev = nullptr; + size_t accumSize = 0; + for (const auto& s : pendingEvent.SerializationInfo.Sections) { + TRope *rope = s.IsInline + ? &pendingEvent.InternalPayload + : &pendingEvent.ExternalPayload; + if (rope != prev) { + if (accumSize) { + prev->ExtractFront(accumSize, &payload); + } + prev = rope; + accumSize = 0; + } + accumSize += s.Size; + } + if (accumSize) { + prev->ExtractFront(accumSize, &payload); + } + + if (pendingEvent.InternalPayload || pendingEvent.ExternalPayload) { + LOG_CRIT_IC_SESSION("ICIS19", "unprocessed payload remains after shuffling" + " Type# 0x%08" PRIx32 " InternalPayload.size# %zu ExternalPayload.size# %zu", + descr.Type, pendingEvent.InternalPayload.size(), pendingEvent.ExternalPayload.size()); + Y_VERIFY_DEBUG(false); + throw TExReestablishConnection{TDisconnectReason::FormatError()}; + } + } + + // we add any remains of internal payload to the end + if (auto& rope = pendingEvent.InternalPayload) { + rope.ExtractFront(rope.size(), &payload); + } + // and ensure there is no unprocessed external payload + Y_VERIFY(!pendingEvent.ExternalPayload); + #if IC_FORCE_HARDENED_PACKET_CHECKS - if (descr.Len != pendingEvent.Payload.GetSize()) { + if (descr.Len != payload.GetSize()) { LOG_CRIT_IC_SESSION("ICIS17", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32, - descr.Type, pendingEvent.Payload.GetSize(), descr.Len); + descr.Type, payload.GetSize(), descr.Len); throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; } #endif if (descr.Checksum) { ui32 checksum = 0; - for (const auto&& [data, size] : pendingEvent.Payload) { + for (const auto&& [data, size] : payload) { checksum = Crc32cExtendMSanCompatible(checksum, data, size); } if (checksum != descr.Checksum) { @@ -633,7 +678,7 @@ namespace NActors { descr.Flags & ~IEventHandle::FlagExtendedFormat, descr.Recipient, descr.Sender, - MakeIntrusive<TEventSerializedData>(std::move(pendingEvent.Payload), std::move(pendingEvent.SerializationInfo)), + MakeIntrusive<TEventSerializedData>(std::move(payload), std::move(pendingEvent.SerializationInfo)), descr.Cookie, Params.PeerScopeId, std::move(descr.TraceId)); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index adc9fd3710..e1c555e629 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -126,7 +126,8 @@ namespace NActors { struct TPerChannelContext { struct TPendingEvent { TEventSerializationInfo SerializationInfo; - TRope Payload; + TRope InternalPayload; + TRope ExternalPayload; std::optional<TEventData> EventData; // number of bytes remaining through XDC channel diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h index 7628fbd968..14b1a1c7a6 100644 --- a/library/cpp/actors/interconnect/types.h +++ b/library/cpp/actors/interconnect/types.h @@ -57,6 +57,7 @@ namespace NActors { bool AuthOnly = {}; bool UseExternalDataChannel = {}; bool UseXxhash = {}; + bool UseXdcShuffle = {}; TString AuthCN; NActors::TScopeId PeerScopeId; }; diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto index 538ede33d1..0e88f3bce5 100644 --- a/library/cpp/actors/protos/interconnect.proto +++ b/library/cpp/actors/protos/interconnect.proto @@ -74,6 +74,7 @@ message THandshakeRequest { optional bool RequestExtendedTraceFmt = 20; optional bool RequestExternalDataChannel = 21; optional bool RequestXxhash = 24; + optional bool RequestXdcShuffle = 25; optional bytes CompatibilityInfo = 22; @@ -102,6 +103,7 @@ message THandshakeSuccess { optional bool UseExtendedTraceFmt = 13; optional bool UseExternalDataChannel = 14; optional bool UseXxhash = 16; + optional bool UseXdcShuffle = 17; optional bytes CompatibilityInfo = 15; } diff --git a/library/cpp/actors/util/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/util/CMakeLists.darwin-x86_64.txt index 6ab1bb43c3..be68d418f7 100644 --- a/library/cpp/actors/util/CMakeLists.darwin-x86_64.txt +++ b/library/cpp/actors/util/CMakeLists.darwin-x86_64.txt @@ -12,6 +12,7 @@ add_library(cpp-actors-util) target_link_libraries(cpp-actors-util PUBLIC contrib-libs-cxxsupp yutil + cpp-containers-absl_flat_hash cpp-deprecated-atomic library-cpp-pop_count ) @@ -19,6 +20,7 @@ target_sources(cpp-actors-util PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_track.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_tracker.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rope.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rc_buf.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/shared_data.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/should_continue.cpp diff --git a/library/cpp/actors/util/CMakeLists.linux-aarch64.txt b/library/cpp/actors/util/CMakeLists.linux-aarch64.txt index 4582852947..9c5183c2bd 100644 --- a/library/cpp/actors/util/CMakeLists.linux-aarch64.txt +++ b/library/cpp/actors/util/CMakeLists.linux-aarch64.txt @@ -13,6 +13,7 @@ target_link_libraries(cpp-actors-util PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + cpp-containers-absl_flat_hash cpp-deprecated-atomic library-cpp-pop_count ) @@ -20,6 +21,7 @@ target_sources(cpp-actors-util PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_track.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_tracker.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rope.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rc_buf.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/shared_data.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/should_continue.cpp diff --git a/library/cpp/actors/util/CMakeLists.linux-x86_64.txt b/library/cpp/actors/util/CMakeLists.linux-x86_64.txt index 4582852947..9c5183c2bd 100644 --- a/library/cpp/actors/util/CMakeLists.linux-x86_64.txt +++ b/library/cpp/actors/util/CMakeLists.linux-x86_64.txt @@ -13,6 +13,7 @@ target_link_libraries(cpp-actors-util PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + cpp-containers-absl_flat_hash cpp-deprecated-atomic library-cpp-pop_count ) @@ -20,6 +21,7 @@ target_sources(cpp-actors-util PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_track.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_tracker.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rope.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rc_buf.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/shared_data.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/should_continue.cpp diff --git a/library/cpp/actors/util/CMakeLists.windows-x86_64.txt b/library/cpp/actors/util/CMakeLists.windows-x86_64.txt index 6ab1bb43c3..be68d418f7 100644 --- a/library/cpp/actors/util/CMakeLists.windows-x86_64.txt +++ b/library/cpp/actors/util/CMakeLists.windows-x86_64.txt @@ -12,6 +12,7 @@ add_library(cpp-actors-util) target_link_libraries(cpp-actors-util PUBLIC contrib-libs-cxxsupp yutil + cpp-containers-absl_flat_hash cpp-deprecated-atomic library-cpp-pop_count ) @@ -19,6 +20,7 @@ target_sources(cpp-actors-util PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/affinity.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_track.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/memory_tracker.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rope.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/rc_buf.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/shared_data.cpp ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/should_continue.cpp diff --git a/library/cpp/actors/util/rc_buf.h b/library/cpp/actors/util/rc_buf.h index 60d0ef904a..7b6c68d269 100644 --- a/library/cpp/actors/util/rc_buf.h +++ b/library/cpp/actors/util/rc_buf.h @@ -294,6 +294,13 @@ struct IContiguousChunk : TThrRefBase { return GetDataMut(); } + /** + * Should return true if GetDataMut() would not copy contents when called. + */ + virtual bool IsPrivate() const { + return true; + } + virtual size_t GetOccupiedMemorySize() const = 0; }; @@ -434,8 +441,12 @@ class TRcBuf { using T = std::decay_t<decltype(value)>; if constexpr (std::is_same_v<T, NActors::TSharedData> || std::is_same_v<T, TInternalBackend>) { return value.IsPrivate(); + } else if constexpr (std::is_same_v<T, TString>) { + return value.IsDetached(); + } else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) { + return value.RefCount() == 1 && value->IsPrivate(); } else { - return false; + static_assert(TDependentFalse<T>); } }); } @@ -446,14 +457,12 @@ class TRcBuf { } return Visit(Owner, [](EType, auto& value) -> TContiguousSpan { using T = std::decay_t<decltype(value)>; - if constexpr (std::is_same_v<T, TString>) { - return {&(*value.cbegin()), value.size()}; - } else if constexpr (std::is_same_v<T, NActors::TSharedData> || std::is_same_v<T, TInternalBackend>) { + if constexpr (std::is_same_v<T, TString> || std::is_same_v<T, NActors::TSharedData> || std::is_same_v<T, TInternalBackend>) { return {value.data(), value.size()}; } else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) { return value->GetData(); } else { - return {}; + static_assert(TDependentFalse<T>, "unexpected type"); } }); } @@ -479,7 +488,7 @@ class TRcBuf { } else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) { return value->GetDataMut(); } else { - return {}; + static_assert(TDependentFalse<T>, "unexpected type"); } }); } @@ -497,7 +506,7 @@ class TRcBuf { } else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) { return value->UnsafeGetDataMut(); } else { - return {}; + static_assert(TDependentFalse<T>, "unexpected type"); } }); } @@ -515,7 +524,7 @@ class TRcBuf { } else if constexpr (std::is_same_v<T, IContiguousChunk::TPtr>) { return value->GetOccupiedMemorySize(); } else { - Y_FAIL(); + static_assert(TDependentFalse<T>, "unexpected type"); } }); } @@ -582,28 +591,26 @@ class TRcBuf { return false; } - template <class TResult> - TResult GetRaw() const { + template <typename TResult, typename TCallback> + std::invoke_result_t<TCallback, const TResult*> ApplySpecificValue(TCallback&& callback) const { + static_assert(std::is_same_v<TResult, TString> || + std::is_same_v<TResult, NActors::TSharedData> || + std::is_same_v<TResult, TInternalBackend> || + std::is_same_v<TResult, IContiguousChunk::TPtr>); + if (!Owner) { - return TResult{}; + return callback(nullptr); } - return Visit(Owner, [](EType, auto& value) { + return Visit(Owner, [&](EType, auto& value) { using T = std::decay_t<decltype(value)>; if constexpr (std::is_same_v<T, TResult>) { - return value; + return callback(&value); } else { - Y_FAIL(); - return TResult{}; // unreachable + return callback(nullptr); } }); } - NActors::TSharedData GetRawTrimmed(size_t size) const { - NActors::TSharedData result = GetRaw<NActors::TSharedData>(); - result.TrimBack(size); - return result; - } - explicit operator bool() const { return static_cast<bool>(Owner); } @@ -845,28 +852,6 @@ public: return Backend.ContainsNativeType<TType>(); } - template <class TResult> - TResult GetRaw() const { - return Backend.GetRaw<TResult>(); - } - - NActors::TSharedData GetRawTrimmed(size_t size) const { - return Backend.GetRawTrimmed(size); - } - - bool ReferencesWholeContainer() const { - return Backend.GetData().size() == GetSize(); - } - - - bool ReferencesTrimableToWholeContainer() const { - if (ContainsNativeType<NActors::TSharedData>()) { - return Backend.GetData().size() == (GetSize() + UnsafeTailroom()); - } else { - return ReferencesWholeContainer(); - } - } - bool CanGrowFront() const noexcept { return Backend.CanGrowFront(Begin); } @@ -891,14 +876,20 @@ public: return Begin; } - char* GetDataMut() { - const char* oldBegin = Backend.GetData().data(); - ptrdiff_t offset = Begin - oldBegin; - size_t size = GetSize(); - char* newBegin = Backend.GetDataMut().data(); - Begin = newBegin + offset; - End = Begin + size; - return newBegin + offset; + char* GetDataMut(size_t headroom = 0, size_t tailroom = 0) { + const TContiguousSpan backendData = Backend.GetData(); + if (IsPrivate() || (backendData.data() == GetData() && backendData.size() == GetSize())) { // if we own container or reference it whole + const char* oldBegin = backendData.data(); + ptrdiff_t offset = Begin - oldBegin; + size_t size = GetSize(); + char* newBegin = Backend.GetDataMut().data(); + Begin = newBegin + offset; + End = Begin + size; + return newBegin + offset; + } else { // make a copy of referenced data + *this = Copy(GetContiguousSpan(), headroom, tailroom); + return Backend.GetDataMut().data(); + } } char* UnsafeGetDataMut() { @@ -913,18 +904,30 @@ public: template <class TResult> TResult ExtractUnderlyingContainerOrCopy() const { - if (ContainsNativeType<TResult>() && (ReferencesWholeContainer() || ReferencesTrimableToWholeContainer())) { - using T = std::decay_t<TResult>; - if constexpr (std::is_same_v<T, NActors::TSharedData>) { - return GetRawTrimmed(GetSize()); - } else { - return GetRaw<TResult>(); + static_assert(std::is_same_v<TResult, TString> || + std::is_same_v<TResult, NActors::TSharedData> || + std::is_same_v<TResult, TInternalBackend>); + + constexpr bool isSharedData = std::is_same_v<TResult, NActors::TSharedData>; + TResult res; + + const bool found = Backend.ApplySpecificValue<TResult>([&](const TResult *raw) { + if (raw && raw->data() == Begin && (isSharedData ? End <= Begin + raw->size() : End == Begin + raw->size())) { + if constexpr (isSharedData) { + raw->TrimBack(size()); + } + res = TResult(*raw); + return true; } + return false; + }); + + if (!found) { + res = TResult::Uninitialized(GetSize()); + char* data = NContiguousDataDetails::TContainerTraits<TResult>::UnsafeGetDataMut(res); + std::memcpy(data, GetData(), GetSize()); } - TResult res = TResult::Uninitialized(GetSize()); - char* data = NContiguousDataDetails::TContainerTraits<TResult>::UnsafeGetDataMut(res); - std::memcpy(data, Begin, End - Begin); return res; } @@ -932,7 +935,7 @@ public: return {GetData(), GetSize()}; } - TStringBuf Slice(size_t pos = 0, size_t len = -1) const noexcept { + TStringBuf Slice(size_t pos = 0, size_t len = Max<size_t>()) const noexcept { pos = Min(pos, size()); len = Min(len, size() - pos); return {const_cast<TRcBuf*>(this)->UnsafeGetDataMut() + pos, len}; @@ -1078,6 +1081,10 @@ public: return GetDataMut(); } + bool IsPrivate() const { + return Backend.IsPrivate(); + } + size_t UnsafeHeadroom() const { return Begin - Backend.GetData().data(); } diff --git a/library/cpp/actors/util/rope.cpp b/library/cpp/actors/util/rope.cpp new file mode 100644 index 0000000000..0927774099 --- /dev/null +++ b/library/cpp/actors/util/rope.cpp @@ -0,0 +1,13 @@ +#include "rope.h" +#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h> + +size_t TRope::GetOccupiedMemorySize() const { + size_t res = 0; + absl::flat_hash_set<const void*> chunks; + for (const auto& chunk : Chain) { + if (const auto [it, inserted] = chunks.insert(chunk.Backend.UniqueId()); inserted) { + res += chunk.Backend.GetOccupiedMemorySize(); + } + } + return res; +} diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h index 3c65588181..201ce06f0d 100644 --- a/library/cpp/actors/util/rope.h +++ b/library/cpp/actors/util/rope.h @@ -455,6 +455,10 @@ public: return !Size; } + bool empty() const { + return IsEmpty(); + } + operator bool() const { return Chain; } @@ -699,10 +703,7 @@ public: // Use this method carefully -- it may significantly reduce performance when misused. TString ConvertToString() const { - // TODO(innokentii): could be microoptimized for single TString case - TString res = TString::Uninitialized(GetSize()); - Begin().ExtractPlainDataAndAdvance(res.Detach(), res.size()); - return res; + return ExtractUnderlyingContainerOrCopy<TString>(); } /** @@ -710,14 +711,14 @@ public: */ template <class TResult> TResult ExtractUnderlyingContainerOrCopy() const { - if (IsContiguous() && GetSize() != 0) { - const auto& chunk = Begin().GetChunk(); - return chunk.ExtractUnderlyingContainerOrCopy<TResult>(); + if (Chain.begin() != Chain.end() && ++Chain.begin() == Chain.end()) { + return Chain.GetFirstChunk().ExtractUnderlyingContainerOrCopy<TResult>(); } - TResult res = TResult::Uninitialized(GetSize()); + const size_t size = GetSize(); + TResult res = TResult::Uninitialized(size); char* data = NContiguousDataDetails::TContainerTraits<TResult>::UnsafeGetDataMut(res); - Begin().ExtractPlainDataAndAdvance(data, res.size()); + Begin().ExtractPlainDataAndAdvance(data, size); return res; } @@ -805,6 +806,8 @@ public: return TRcBuf(Begin().GetChunk()); } + size_t GetOccupiedMemorySize() const; + friend bool operator==(const TRope& x, const TRope& y) { return Compare(x, y) == 0; } friend bool operator!=(const TRope& x, const TRope& y) { return Compare(x, y) != 0; } friend bool operator< (const TRope& x, const TRope& y) { return Compare(x, y) < 0; } diff --git a/library/cpp/actors/util/shared_data_rope_backend.h b/library/cpp/actors/util/shared_data_rope_backend.h index 2abfcf5584..a221ae668b 100644 --- a/library/cpp/actors/util/shared_data_rope_backend.h +++ b/library/cpp/actors/util/shared_data_rope_backend.h @@ -29,6 +29,10 @@ public: return {const_cast<char *>(Buffer.data()), Buffer.size()}; } + bool IsPrivate() const override { + return Buffer.IsPrivate(); + } + size_t GetOccupiedMemorySize() const override { return Buffer.size(); } diff --git a/library/cpp/actors/util/ut/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/util/ut/CMakeLists.darwin-x86_64.txt index cb32e0652f..f02b2d926c 100644 --- a/library/cpp/actors/util/ut/CMakeLists.darwin-x86_64.txt +++ b/library/cpp/actors/util/ut/CMakeLists.darwin-x86_64.txt @@ -22,6 +22,8 @@ target_link_options(library-cpp-actors-util-ut PRIVATE -Wl,-platform_version,macos,11.0,11.0 -fPIC -fPIC + -framework + CoreFoundation ) target_sources(library-cpp-actors-util-ut PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/actors/util/cpu_load_log_ut.cpp diff --git a/library/cpp/actors/util/ya.make b/library/cpp/actors/util/ya.make index 912b55868d..48d595c156 100644 --- a/library/cpp/actors/util/ya.make +++ b/library/cpp/actors/util/ya.make @@ -19,6 +19,7 @@ SRCS( memory_tracker.cpp memory_tracker.h recentwnd.h + rope.cpp rope.h rc_buf.cpp rc_buf.h @@ -37,6 +38,7 @@ SRCS( ) PEERDIR( + library/cpp/containers/absl_flat_hash library/cpp/deprecated/atomic library/cpp/pop_count ) |