diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2023-08-25 09:14:00 +0000 |
---|---|---|
committer | Daniil Cherednik <dcherednik@ydb.tech> | 2023-08-25 09:14:00 +0000 |
commit | 1aea989538126dcf9bb99aa87313ba942e679e7b (patch) | |
tree | 5f89fae597bbf8cfaf58c56fd2313d1896a956bb /library/cpp/actors/interconnect | |
parent | 41effae1b14cbd91927d4d7746c935f773ee87ef (diff) | |
download | ydb-1aea989538126dcf9bb99aa87313ba942e679e7b.tar.gz |
Create stable-23-3 branch
x-stable-origin-commit: 3224c68a1e19d5457dc64c1c4f3260f7cd718558
Diffstat (limited to 'library/cpp/actors/interconnect')
6 files changed, 126 insertions, 36 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index e69483a8e9..6987c344de 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -75,6 +75,8 @@ namespace NActors { State = EState::BODY; Iter = event.Buffer->GetBeginIter(); SerializationInfo = &event.Buffer->GetSerializationInfo(); + SectionIndex = 0; + PartLenRemain = 0; } else if (event.Event) { State = EState::BODY; IEventBase *base = event.Event.Get(); @@ -83,15 +85,16 @@ namespace NActors { } SerializationInfoContainer = base->CreateSerializationInfo(); SerializationInfo = &SerializationInfoContainer; + SectionIndex = 0; + PartLenRemain = 0; } else { // event without buffer and IEventBase instance State = EState::DESCRIPTOR; SerializationInfoContainer = {}; SerializationInfo = &SerializationInfoContainer; } - EventInExternalDataChannel = Params.UseExternalDataChannel && !SerializationInfo->Sections.empty(); if (!event.EventSerializedSize) { State = EState::DESCRIPTOR; - } else if (EventInExternalDataChannel) { + } else if (Params.UseExternalDataChannel && !SerializationInfo->Sections.empty()) { State = EState::SECTIONS; SectionIndex = 0; } @@ -130,11 +133,15 @@ namespace NActors { char *p = sectionInfo; const auto& section = SerializationInfo->Sections[SectionIndex]; - *p++ = static_cast<ui8>(EXdcCommand::DECLARE_SECTION); + char& type = *p++; + type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION); p += NInterconnect::NDetail::SerializeNumber(section.Headroom, p); p += NInterconnect::NDetail::SerializeNumber(section.Size, p); p += NInterconnect::NDetail::SerializeNumber(section.Tailroom, p); p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p); + if (section.IsInline && Params.UseXdcShuffle) { + type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_INLINE); + } Y_VERIFY(p <= std::end(sectionInfo)); const size_t declareLen = p - sectionInfo; @@ -161,6 +168,8 @@ namespace NActors { if (SectionIndex == SerializationInfo->Sections.size()) { State = EState::BODY; + SectionIndex = 0; + PartLenRemain = 0; } break; @@ -179,6 +188,8 @@ namespace NActors { task.Append<External>(data, len); } *bytesSerialized += len; + Y_VERIFY_DEBUG(len <= PartLenRemain); + PartLenRemain -= len; event.EventActuallySerialized += len; if (event.EventActuallySerialized > MaxSerializedEventSize) { @@ -189,15 +200,12 @@ namespace NActors { bool complete = false; if (event.Event) { while (!complete) { - TMutableContiguousSpan out = task.AcquireSpanForWriting<External>(); + TMutableContiguousSpan out = task.AcquireSpanForWriting<External>().SubSpan(0, PartLenRemain); if (!out.size()) { break; } - ui32 bytesFed = 0; for (const auto& [buffer, size] : Chunker.FeedBuf(out.data(), out.size())) { addChunk(buffer, size, false); - bytesFed += size; - Y_VERIFY(bytesFed <= out.size()); } complete = Chunker.IsComplete(); if (complete) { @@ -206,7 +214,7 @@ namespace NActors { } } else if (event.Buffer) { while (const size_t numb = Min<size_t>(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(), - Iter.ContiguousSize())) { + Iter.ContiguousSize(), PartLenRemain)) { const char *obuf = Iter.ContiguousData(); addChunk(obuf, numb, true); Iter += numb; @@ -223,14 +231,43 @@ namespace NActors { } bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { - return EventInExternalDataChannel - ? FeedExternalPayload(task, event, weightConsumed) - : FeedInlinePayload(task, event, weightConsumed); + for (;;) { + // calculate inline or external part size (it may cover a few sections, not just single one) + while (!PartLenRemain) { + const auto& sections = SerializationInfo->Sections; + if (!Params.UseExternalDataChannel || sections.empty()) { + // all data goes inline + IsPartInline = true; + PartLenRemain = Max<size_t>(); + } else if (!Params.UseXdcShuffle) { + // when UseXdcShuffle feature is not supported by the remote side, we transfer whole event over XDC + IsPartInline = false; + PartLenRemain = Max<size_t>(); + } else { + Y_VERIFY(SectionIndex < sections.size()); + IsPartInline = sections[SectionIndex].IsInline; + while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline) { + PartLenRemain += sections[SectionIndex].Size; + ++SectionIndex; + } + } + } + + // serialize bytes + const auto complete = IsPartInline + ? FeedInlinePayload(task, event, weightConsumed) + : FeedExternalPayload(task, event, weightConsumed); + if (!complete) { // no space to serialize + return false; + } else if (*complete) { // event serialized + return true; + } + } } - bool TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + std::optional<bool> TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { if (task.GetInternalFreeAmount() <= sizeof(TChannelPart)) { - return false; + return std::nullopt; } auto partBookmark = task.Bookmark(sizeof(TChannelPart)); @@ -253,10 +290,10 @@ namespace NActors { return complete; } - bool TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + std::optional<bool> TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { const size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32)); if (task.GetInternalFreeAmount() < partSize || task.GetExternalFreeAmount() == 0) { - return false; + return std::nullopt; } // clear external checksum for this chunk diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 2c5a286fe8..ef2da2fda7 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -46,6 +46,7 @@ namespace NActors { enum class EXdcCommand : ui8 { DECLARE_SECTION = 1, PUSH_DATA, + DECLARE_SECTION_INLINE, }; struct TExSerializedEventTooLarge : std::exception { @@ -133,7 +134,8 @@ namespace NActors { TCoroutineChunkSerializer Chunker; TEventSerializationInfo SerializationInfoContainer; const TEventSerializationInfo *SerializationInfo = nullptr; - bool EventInExternalDataChannel; + bool IsPartInline = false; + size_t PartLenRemain = 0; size_t SectionIndex = 0; std::vector<char> XdcData; @@ -141,8 +143,8 @@ namespace NActors { bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized); bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); - bool FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); - bool FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); + std::optional<bool> FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); + std::optional<bool> FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 00ebaa5217..4a57fc226c 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -752,6 +752,7 @@ namespace NActors { request.SetRequestExtendedTraceFmt(true); request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel); request.SetRequestXxhash(true); + request.SetRequestXdcShuffle(true); request.SetHandshakeId(*HandshakeId); SendExBlock(MainChannel, request, "ExRequest"); @@ -790,6 +791,7 @@ namespace NActors { Params.AuthOnly = Params.Encryption && success.GetAuthOnly(); Params.UseExternalDataChannel = success.GetUseExternalDataChannel(); Params.UseXxhash = success.GetUseXxhash(); + Params.UseXdcShuffle = success.GetUseXdcShuffle(); if (success.HasServerScopeId()) { ParsePeerScopeId(success.GetServerScopeId()); } @@ -1041,6 +1043,7 @@ namespace NActors { Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly; Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel; Params.UseXxhash = request.GetRequestXxhash(); + Params.UseXdcShuffle = request.GetRequestXdcShuffle(); if (Params.UseExternalDataChannel) { if (request.HasHandshakeId()) { @@ -1080,6 +1083,7 @@ namespace NActors { success.SetUseExtendedTraceFmt(true); success.SetUseExternalDataChannel(Params.UseExternalDataChannel); success.SetUseXxhash(Params.UseXxhash); + success.SetUseXdcShuffle(Params.UseXdcShuffle); SendExBlock(MainChannel, record, "ExReply"); // extract sender actor id (self virtual id) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index bb7e069883..3b42884355 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -449,7 +449,7 @@ namespace NActors { } else if (IgnorePayload) { // throw payload out Payload.EraseFront(part.Size); } else if (!part.IsLastPart()) { // just ordinary inline event data - Payload.ExtractFront(part.Size, &pendingEvent.Payload); + Payload.ExtractFront(part.Size, &pendingEvent.InternalPayload); } else { // event final block TEventDescr2 v2; @@ -529,8 +529,9 @@ namespace NActors { const char *ptr = XdcCommands.data(); const char *end = ptr + XdcCommands.size(); while (ptr != end) { - switch (static_cast<EXdcCommand>(*ptr++)) { - case EXdcCommand::DECLARE_SECTION: { + switch (const auto cmd = static_cast<EXdcCommand>(*ptr++)) { + case EXdcCommand::DECLARE_SECTION: + case EXdcCommand::DECLARE_SECTION_INLINE: { // extract and validate command parameters const ui64 headroom = NInterconnect::NDetail::DeserializeNumber(&ptr, end); const ui64 size = NInterconnect::NDetail::DeserializeNumber(&ptr, end); @@ -542,17 +543,22 @@ namespace NActors { } if (!IgnorePayload) { // process command if packet is being applied - // allocate buffer and push it into the payload auto& pendingEvent = context.PendingEvents.back(); - pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom, alignment}); - auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom); - if (size) { - context.XdcBuffers.push_back(buffer.GetContiguousSpanMut()); + const bool isInline = cmd == EXdcCommand::DECLARE_SECTION_INLINE; + pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom, + alignment, isInline}); + + Y_VERIFY(!isInline || Params.UseXdcShuffle); + if (!isInline) { + // allocate buffer and push it into the payload + auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom); + if (size) { + context.XdcBuffers.push_back(buffer.GetContiguousSpanMut()); + } + pendingEvent.ExternalPayload.Insert(pendingEvent.ExternalPayload.End(), TRope(std::move(buffer))); + pendingEvent.XdcSizeLeft += size; + ++XdcSections; } - pendingEvent.Payload.Insert(pendingEvent.Payload.End(), TRope(std::move(buffer))); - pendingEvent.XdcSizeLeft += size; - - ++XdcSections; } continue; } @@ -608,18 +614,57 @@ namespace NActors { if (!pendingEvent.EventData || pendingEvent.XdcSizeLeft) { break; // event is not ready yet } - auto& descr = *pendingEvent.EventData; + + // create aggregated payload + TRope payload; + if (!pendingEvent.SerializationInfo.Sections.empty()) { + // unshuffle inline and external payloads into single event content + TRope *prev = nullptr; + size_t accumSize = 0; + for (const auto& s : pendingEvent.SerializationInfo.Sections) { + TRope *rope = s.IsInline + ? &pendingEvent.InternalPayload + : &pendingEvent.ExternalPayload; + if (rope != prev) { + if (accumSize) { + prev->ExtractFront(accumSize, &payload); + } + prev = rope; + accumSize = 0; + } + accumSize += s.Size; + } + if (accumSize) { + prev->ExtractFront(accumSize, &payload); + } + + if (pendingEvent.InternalPayload || pendingEvent.ExternalPayload) { + LOG_CRIT_IC_SESSION("ICIS19", "unprocessed payload remains after shuffling" + " Type# 0x%08" PRIx32 " InternalPayload.size# %zu ExternalPayload.size# %zu", + descr.Type, pendingEvent.InternalPayload.size(), pendingEvent.ExternalPayload.size()); + Y_VERIFY_DEBUG(false); + throw TExReestablishConnection{TDisconnectReason::FormatError()}; + } + } + + // we add any remains of internal payload to the end + if (auto& rope = pendingEvent.InternalPayload) { + rope.ExtractFront(rope.size(), &payload); + } + // and ensure there is no unprocessed external payload + Y_VERIFY(!pendingEvent.ExternalPayload); + #if IC_FORCE_HARDENED_PACKET_CHECKS - if (descr.Len != pendingEvent.Payload.GetSize()) { + if (descr.Len != payload.GetSize()) { LOG_CRIT_IC_SESSION("ICIS17", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32, - descr.Type, pendingEvent.Payload.GetSize(), descr.Len); + descr.Type, payload.GetSize(), descr.Len); throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; } #endif if (descr.Checksum) { ui32 checksum = 0; - for (const auto&& [data, size] : pendingEvent.Payload) { + for (const auto&& [data, size] : payload) { checksum = Crc32cExtendMSanCompatible(checksum, data, size); } if (checksum != descr.Checksum) { @@ -633,7 +678,7 @@ namespace NActors { descr.Flags & ~IEventHandle::FlagExtendedFormat, descr.Recipient, descr.Sender, - MakeIntrusive<TEventSerializedData>(std::move(pendingEvent.Payload), std::move(pendingEvent.SerializationInfo)), + MakeIntrusive<TEventSerializedData>(std::move(payload), std::move(pendingEvent.SerializationInfo)), descr.Cookie, Params.PeerScopeId, std::move(descr.TraceId)); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index adc9fd3710..e1c555e629 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -126,7 +126,8 @@ namespace NActors { struct TPerChannelContext { struct TPendingEvent { TEventSerializationInfo SerializationInfo; - TRope Payload; + TRope InternalPayload; + TRope ExternalPayload; std::optional<TEventData> EventData; // number of bytes remaining through XDC channel diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h index 7628fbd968..14b1a1c7a6 100644 --- a/library/cpp/actors/interconnect/types.h +++ b/library/cpp/actors/interconnect/types.h @@ -57,6 +57,7 @@ namespace NActors { bool AuthOnly = {}; bool UseExternalDataChannel = {}; bool UseXxhash = {}; + bool UseXdcShuffle = {}; TString AuthCN; NActors::TScopeId PeerScopeId; }; |