aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2023-08-25 09:14:00 +0000
committerDaniil Cherednik <dcherednik@ydb.tech>2023-08-25 09:14:00 +0000
commit1aea989538126dcf9bb99aa87313ba942e679e7b (patch)
tree5f89fae597bbf8cfaf58c56fd2313d1896a956bb /library/cpp/actors/interconnect
parent41effae1b14cbd91927d4d7746c935f773ee87ef (diff)
downloadydb-1aea989538126dcf9bb99aa87313ba942e679e7b.tar.gz
Create stable-23-3 branch
x-stable-origin-commit: 3224c68a1e19d5457dc64c1c4f3260f7cd718558
Diffstat (limited to 'library/cpp/actors/interconnect')
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp67
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h8
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp79
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h3
-rw-r--r--library/cpp/actors/interconnect/types.h1
6 files changed, 126 insertions, 36 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index e69483a8e9..6987c344de 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -75,6 +75,8 @@ namespace NActors {
State = EState::BODY;
Iter = event.Buffer->GetBeginIter();
SerializationInfo = &event.Buffer->GetSerializationInfo();
+ SectionIndex = 0;
+ PartLenRemain = 0;
} else if (event.Event) {
State = EState::BODY;
IEventBase *base = event.Event.Get();
@@ -83,15 +85,16 @@ namespace NActors {
}
SerializationInfoContainer = base->CreateSerializationInfo();
SerializationInfo = &SerializationInfoContainer;
+ SectionIndex = 0;
+ PartLenRemain = 0;
} else { // event without buffer and IEventBase instance
State = EState::DESCRIPTOR;
SerializationInfoContainer = {};
SerializationInfo = &SerializationInfoContainer;
}
- EventInExternalDataChannel = Params.UseExternalDataChannel && !SerializationInfo->Sections.empty();
if (!event.EventSerializedSize) {
State = EState::DESCRIPTOR;
- } else if (EventInExternalDataChannel) {
+ } else if (Params.UseExternalDataChannel && !SerializationInfo->Sections.empty()) {
State = EState::SECTIONS;
SectionIndex = 0;
}
@@ -130,11 +133,15 @@ namespace NActors {
char *p = sectionInfo;
const auto& section = SerializationInfo->Sections[SectionIndex];
- *p++ = static_cast<ui8>(EXdcCommand::DECLARE_SECTION);
+ char& type = *p++;
+ type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION);
p += NInterconnect::NDetail::SerializeNumber(section.Headroom, p);
p += NInterconnect::NDetail::SerializeNumber(section.Size, p);
p += NInterconnect::NDetail::SerializeNumber(section.Tailroom, p);
p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p);
+ if (section.IsInline && Params.UseXdcShuffle) {
+ type = static_cast<ui8>(EXdcCommand::DECLARE_SECTION_INLINE);
+ }
Y_VERIFY(p <= std::end(sectionInfo));
const size_t declareLen = p - sectionInfo;
@@ -161,6 +168,8 @@ namespace NActors {
if (SectionIndex == SerializationInfo->Sections.size()) {
State = EState::BODY;
+ SectionIndex = 0;
+ PartLenRemain = 0;
}
break;
@@ -179,6 +188,8 @@ namespace NActors {
task.Append<External>(data, len);
}
*bytesSerialized += len;
+ Y_VERIFY_DEBUG(len <= PartLenRemain);
+ PartLenRemain -= len;
event.EventActuallySerialized += len;
if (event.EventActuallySerialized > MaxSerializedEventSize) {
@@ -189,15 +200,12 @@ namespace NActors {
bool complete = false;
if (event.Event) {
while (!complete) {
- TMutableContiguousSpan out = task.AcquireSpanForWriting<External>();
+ TMutableContiguousSpan out = task.AcquireSpanForWriting<External>().SubSpan(0, PartLenRemain);
if (!out.size()) {
break;
}
- ui32 bytesFed = 0;
for (const auto& [buffer, size] : Chunker.FeedBuf(out.data(), out.size())) {
addChunk(buffer, size, false);
- bytesFed += size;
- Y_VERIFY(bytesFed <= out.size());
}
complete = Chunker.IsComplete();
if (complete) {
@@ -206,7 +214,7 @@ namespace NActors {
}
} else if (event.Buffer) {
while (const size_t numb = Min<size_t>(External ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(),
- Iter.ContiguousSize())) {
+ Iter.ContiguousSize(), PartLenRemain)) {
const char *obuf = Iter.ContiguousData();
addChunk(obuf, numb, true);
Iter += numb;
@@ -223,14 +231,43 @@ namespace NActors {
}
bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
- return EventInExternalDataChannel
- ? FeedExternalPayload(task, event, weightConsumed)
- : FeedInlinePayload(task, event, weightConsumed);
+ for (;;) {
+ // calculate inline or external part size (it may cover a few sections, not just single one)
+ while (!PartLenRemain) {
+ const auto& sections = SerializationInfo->Sections;
+ if (!Params.UseExternalDataChannel || sections.empty()) {
+ // all data goes inline
+ IsPartInline = true;
+ PartLenRemain = Max<size_t>();
+ } else if (!Params.UseXdcShuffle) {
+ // when UseXdcShuffle feature is not supported by the remote side, we transfer whole event over XDC
+ IsPartInline = false;
+ PartLenRemain = Max<size_t>();
+ } else {
+ Y_VERIFY(SectionIndex < sections.size());
+ IsPartInline = sections[SectionIndex].IsInline;
+ while (SectionIndex < sections.size() && IsPartInline == sections[SectionIndex].IsInline) {
+ PartLenRemain += sections[SectionIndex].Size;
+ ++SectionIndex;
+ }
+ }
+ }
+
+ // serialize bytes
+ const auto complete = IsPartInline
+ ? FeedInlinePayload(task, event, weightConsumed)
+ : FeedExternalPayload(task, event, weightConsumed);
+ if (!complete) { // no space to serialize
+ return false;
+ } else if (*complete) { // event serialized
+ return true;
+ }
+ }
}
- bool TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
+ std::optional<bool> TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
if (task.GetInternalFreeAmount() <= sizeof(TChannelPart)) {
- return false;
+ return std::nullopt;
}
auto partBookmark = task.Bookmark(sizeof(TChannelPart));
@@ -253,10 +290,10 @@ namespace NActors {
return complete;
}
- bool TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
+ std::optional<bool> TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
const size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32));
if (task.GetInternalFreeAmount() < partSize || task.GetExternalFreeAmount() == 0) {
- return false;
+ return std::nullopt;
}
// clear external checksum for this chunk
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 2c5a286fe8..ef2da2fda7 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -46,6 +46,7 @@ namespace NActors {
enum class EXdcCommand : ui8 {
DECLARE_SECTION = 1,
PUSH_DATA,
+ DECLARE_SECTION_INLINE,
};
struct TExSerializedEventTooLarge : std::exception {
@@ -133,7 +134,8 @@ namespace NActors {
TCoroutineChunkSerializer Chunker;
TEventSerializationInfo SerializationInfoContainer;
const TEventSerializationInfo *SerializationInfo = nullptr;
- bool EventInExternalDataChannel;
+ bool IsPartInline = false;
+ size_t PartLenRemain = 0;
size_t SectionIndex = 0;
std::vector<char> XdcData;
@@ -141,8 +143,8 @@ namespace NActors {
bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, size_t *bytesSerialized);
bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
- bool FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
- bool FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
+ std::optional<bool> FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
+ std::optional<bool> FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 00ebaa5217..4a57fc226c 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -752,6 +752,7 @@ namespace NActors {
request.SetRequestExtendedTraceFmt(true);
request.SetRequestExternalDataChannel(Common->Settings.EnableExternalDataChannel);
request.SetRequestXxhash(true);
+ request.SetRequestXdcShuffle(true);
request.SetHandshakeId(*HandshakeId);
SendExBlock(MainChannel, request, "ExRequest");
@@ -790,6 +791,7 @@ namespace NActors {
Params.AuthOnly = Params.Encryption && success.GetAuthOnly();
Params.UseExternalDataChannel = success.GetUseExternalDataChannel();
Params.UseXxhash = success.GetUseXxhash();
+ Params.UseXdcShuffle = success.GetUseXdcShuffle();
if (success.HasServerScopeId()) {
ParsePeerScopeId(success.GetServerScopeId());
}
@@ -1041,6 +1043,7 @@ namespace NActors {
Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly;
Params.UseExternalDataChannel = request.GetRequestExternalDataChannel() && Common->Settings.EnableExternalDataChannel;
Params.UseXxhash = request.GetRequestXxhash();
+ Params.UseXdcShuffle = request.GetRequestXdcShuffle();
if (Params.UseExternalDataChannel) {
if (request.HasHandshakeId()) {
@@ -1080,6 +1083,7 @@ namespace NActors {
success.SetUseExtendedTraceFmt(true);
success.SetUseExternalDataChannel(Params.UseExternalDataChannel);
success.SetUseXxhash(Params.UseXxhash);
+ success.SetUseXdcShuffle(Params.UseXdcShuffle);
SendExBlock(MainChannel, record, "ExReply");
// extract sender actor id (self virtual id)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index bb7e069883..3b42884355 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -449,7 +449,7 @@ namespace NActors {
} else if (IgnorePayload) { // throw payload out
Payload.EraseFront(part.Size);
} else if (!part.IsLastPart()) { // just ordinary inline event data
- Payload.ExtractFront(part.Size, &pendingEvent.Payload);
+ Payload.ExtractFront(part.Size, &pendingEvent.InternalPayload);
} else { // event final block
TEventDescr2 v2;
@@ -529,8 +529,9 @@ namespace NActors {
const char *ptr = XdcCommands.data();
const char *end = ptr + XdcCommands.size();
while (ptr != end) {
- switch (static_cast<EXdcCommand>(*ptr++)) {
- case EXdcCommand::DECLARE_SECTION: {
+ switch (const auto cmd = static_cast<EXdcCommand>(*ptr++)) {
+ case EXdcCommand::DECLARE_SECTION:
+ case EXdcCommand::DECLARE_SECTION_INLINE: {
// extract and validate command parameters
const ui64 headroom = NInterconnect::NDetail::DeserializeNumber(&ptr, end);
const ui64 size = NInterconnect::NDetail::DeserializeNumber(&ptr, end);
@@ -542,17 +543,22 @@ namespace NActors {
}
if (!IgnorePayload) { // process command if packet is being applied
- // allocate buffer and push it into the payload
auto& pendingEvent = context.PendingEvents.back();
- pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom, alignment});
- auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom);
- if (size) {
- context.XdcBuffers.push_back(buffer.GetContiguousSpanMut());
+ const bool isInline = cmd == EXdcCommand::DECLARE_SECTION_INLINE;
+ pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom,
+ alignment, isInline});
+
+ Y_VERIFY(!isInline || Params.UseXdcShuffle);
+ if (!isInline) {
+ // allocate buffer and push it into the payload
+ auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom);
+ if (size) {
+ context.XdcBuffers.push_back(buffer.GetContiguousSpanMut());
+ }
+ pendingEvent.ExternalPayload.Insert(pendingEvent.ExternalPayload.End(), TRope(std::move(buffer)));
+ pendingEvent.XdcSizeLeft += size;
+ ++XdcSections;
}
- pendingEvent.Payload.Insert(pendingEvent.Payload.End(), TRope(std::move(buffer)));
- pendingEvent.XdcSizeLeft += size;
-
- ++XdcSections;
}
continue;
}
@@ -608,18 +614,57 @@ namespace NActors {
if (!pendingEvent.EventData || pendingEvent.XdcSizeLeft) {
break; // event is not ready yet
}
-
auto& descr = *pendingEvent.EventData;
+
+ // create aggregated payload
+ TRope payload;
+ if (!pendingEvent.SerializationInfo.Sections.empty()) {
+ // unshuffle inline and external payloads into single event content
+ TRope *prev = nullptr;
+ size_t accumSize = 0;
+ for (const auto& s : pendingEvent.SerializationInfo.Sections) {
+ TRope *rope = s.IsInline
+ ? &pendingEvent.InternalPayload
+ : &pendingEvent.ExternalPayload;
+ if (rope != prev) {
+ if (accumSize) {
+ prev->ExtractFront(accumSize, &payload);
+ }
+ prev = rope;
+ accumSize = 0;
+ }
+ accumSize += s.Size;
+ }
+ if (accumSize) {
+ prev->ExtractFront(accumSize, &payload);
+ }
+
+ if (pendingEvent.InternalPayload || pendingEvent.ExternalPayload) {
+ LOG_CRIT_IC_SESSION("ICIS19", "unprocessed payload remains after shuffling"
+ " Type# 0x%08" PRIx32 " InternalPayload.size# %zu ExternalPayload.size# %zu",
+ descr.Type, pendingEvent.InternalPayload.size(), pendingEvent.ExternalPayload.size());
+ Y_VERIFY_DEBUG(false);
+ throw TExReestablishConnection{TDisconnectReason::FormatError()};
+ }
+ }
+
+ // we add any remains of internal payload to the end
+ if (auto& rope = pendingEvent.InternalPayload) {
+ rope.ExtractFront(rope.size(), &payload);
+ }
+ // and ensure there is no unprocessed external payload
+ Y_VERIFY(!pendingEvent.ExternalPayload);
+
#if IC_FORCE_HARDENED_PACKET_CHECKS
- if (descr.Len != pendingEvent.Payload.GetSize()) {
+ if (descr.Len != payload.GetSize()) {
LOG_CRIT_IC_SESSION("ICIS17", "event length mismatch Type# 0x%08" PRIx32 " received# %zu expected# %" PRIu32,
- descr.Type, pendingEvent.Payload.GetSize(), descr.Len);
+ descr.Type, payload.GetSize(), descr.Len);
throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
}
#endif
if (descr.Checksum) {
ui32 checksum = 0;
- for (const auto&& [data, size] : pendingEvent.Payload) {
+ for (const auto&& [data, size] : payload) {
checksum = Crc32cExtendMSanCompatible(checksum, data, size);
}
if (checksum != descr.Checksum) {
@@ -633,7 +678,7 @@ namespace NActors {
descr.Flags & ~IEventHandle::FlagExtendedFormat,
descr.Recipient,
descr.Sender,
- MakeIntrusive<TEventSerializedData>(std::move(pendingEvent.Payload), std::move(pendingEvent.SerializationInfo)),
+ MakeIntrusive<TEventSerializedData>(std::move(payload), std::move(pendingEvent.SerializationInfo)),
descr.Cookie,
Params.PeerScopeId,
std::move(descr.TraceId));
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index adc9fd3710..e1c555e629 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -126,7 +126,8 @@ namespace NActors {
struct TPerChannelContext {
struct TPendingEvent {
TEventSerializationInfo SerializationInfo;
- TRope Payload;
+ TRope InternalPayload;
+ TRope ExternalPayload;
std::optional<TEventData> EventData;
// number of bytes remaining through XDC channel
diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h
index 7628fbd968..14b1a1c7a6 100644
--- a/library/cpp/actors/interconnect/types.h
+++ b/library/cpp/actors/interconnect/types.h
@@ -57,6 +57,7 @@ namespace NActors {
bool AuthOnly = {};
bool UseExternalDataChannel = {};
bool UseXxhash = {};
+ bool UseXdcShuffle = {};
TString AuthCN;
NActors::TScopeId PeerScopeId;
};