about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: alexvru <alexvru@ydb.tech> 2023-04-25 13:50:27 +0300
committer: alexvru <alexvru@ydb.tech> 2023-04-25 13:50:27 +0300
commit: 962dae07e71621bd25c81feee4c6f3355c94a73c (patch)
tree: a083a671f2eb0d1ded6208ae8037dc16e395c513
parent: e33f5f1a073db44a35c1616b2f391751779a866f (diff)
download: ydb-962dae07e71621bd25c81feee4c6f3355c94a73c.tar.gz
Support XDC streams
-rw-r--r-- library/cpp/actors/core/event_pb.cpp 21
-rw-r--r-- library/cpp/actors/core/event_pb.h 161
-rw-r--r-- library/cpp/actors/interconnect/interconnect_channel.cpp 258
-rw-r--r-- library/cpp/actors/interconnect/interconnect_channel.h 38
-rw-r--r-- library/cpp/actors/interconnect/interconnect_stream.cpp 14
-rw-r--r-- library/cpp/actors/interconnect/interconnect_stream.h 7
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp 750
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp 15
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_proxy.h 2
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_session.cpp 384
-rw-r--r-- library/cpp/actors/interconnect/interconnect_tcp_session.h 162
-rw-r--r-- library/cpp/actors/interconnect/outgoing_stream.h 83
-rw-r--r-- library/cpp/actors/interconnect/packet.h 90
-rw-r--r-- library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp 2
14 files changed, 1436 insertions, 551 deletions
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp
index 018ff9ac34..4e37f36f70 100644
--- a/library/cpp/actors/core/event_pb.cpp
+++ b/library/cpp/actors/core/event_pb.cpp
@@ -77,11 +77,23 @@ namespace NActors {
if (CancelFlag || AbortFlag) {
return false;
} else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) {
- if (!Produce(data, bytesToAppend)) {
- return false;
+ if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 2 * 64) {
+ memcpy(BufferPtr, data, bytesToAppend);
+
+ if (!Produce(BufferPtr, bytesToAppend)) {
+ return false;
+ }
+
+ BufferPtr += bytesToAppend;
+ data = static_cast<const char*>(data) + bytesToAppend;
+ size -= bytesToAppend;
+ } else {
+ if (!Produce(data, bytesToAppend)) {
+ return false;
+ }
+ data = static_cast<const char*>(data) + bytesToAppend;
+ size -= bytesToAppend;
}
- data = static_cast<const char*>(data) + bytesToAppend;
- size -= bytesToAppend;
} else {
InnerContext.SwitchTo(BufFeedContext);
}
@@ -148,6 +160,7 @@ namespace NActors {
// fill in base params
BufferPtr = static_cast<char*>(data);
SizeRemain = size;
+ Y_VERIFY_DEBUG(size);
// transfer control to the coroutine
Y_VERIFY(Event);
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index 8fd43aa34d..87e924b1e5 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -161,53 +161,7 @@ namespace NActors {
}
bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
- // serialize payload first
- if (Payload) {
- void *data;
- int size = 0;
- auto append = [&](const char *p, size_t len) {
- while (len) {
- if (size) {
- const size_t numBytesToCopy = std::min<size_t>(size, len);
- memcpy(data, p, numBytesToCopy);
- data = static_cast<char*>(data) + numBytesToCopy;
- size -= numBytesToCopy;
- p += numBytesToCopy;
- len -= numBytesToCopy;
- } else if (!chunker->Next(&data, &size)) {
- return false;
- }
- }
- return true;
- };
- auto appendNumber = [&](size_t number) {
- char buf[MaxNumberBytes];
- return append(buf, SerializeNumber(number, buf));
- };
- char marker = PayloadMarker;
- append(&marker, 1);
- if (!appendNumber(Payload.size())) {
- return false;
- }
- for (const TRope& rope : Payload) {
- if (!appendNumber(rope.GetSize())) {
- return false;
- }
- if (rope) {
- if (size) {
- chunker->BackUp(std::exchange(size, 0));
- }
- if (!chunker->WriteRope(&rope)) {
- return false;
- }
- }
- }
- if (size) {
- chunker->BackUp(size);
- }
- }
-
- return Record.SerializeToZeroCopyStream(chunker);
+ return SerializeToArcadiaStreamImpl(chunker, TString());
}
ui32 CalculateSerializedSize() const override {
@@ -285,25 +239,7 @@ namespace NActors {
}
TEventSerializationInfo CreateSerializationInfo() const override {
- TEventSerializationInfo info;
-
- if (Payload) {
- char temp[MaxNumberBytes];
- info.Sections.push_back(TEventSectionInfo{0, 1, 0, 0}); // payload marker
- info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(Payload.size(), temp), 0, 0});
- for (const TRope& payload : Payload) {
- info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(payload.GetSize(), temp), 0, 0}); // length
- info.Sections.push_back(TEventSectionInfo{0, payload.GetSize(), 0, 0}); // data
- }
- info.IsExtendedFormat = true;
- } else {
- info.IsExtendedFormat = false;
- }
-
- const int byteSize = Max(0, Record.ByteSize());
- info.Sections.push_back(TEventSectionInfo{0, static_cast<size_t>(byteSize), 0, 0});
-
- return info;
+ return CreateSerializationInfoImpl(0);
}
public:
@@ -332,6 +268,93 @@ namespace NActors {
}
protected:
+ TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize) const {
+ TEventSerializationInfo info;
+
+ if (Payload) {
+ char temp[MaxNumberBytes];
+ info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0}); // payload marker and rope count
+ for (const TRope& rope : Payload) {
+ const size_t ropeSize = rope.GetSize();
+ info.Sections.back().Size += SerializeNumber(ropeSize, temp);
+ info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0}); // data as a separate section
+ }
+ info.IsExtendedFormat = true;
+ } else {
+ info.IsExtendedFormat = false;
+ }
+
+ const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize;
+ info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0}); // protobuf itself
+
+#ifndef NDEBUG
+ size_t total = 0;
+ for (const auto& section : info.Sections) {
+ total += section.Size;
+ }
+ size_t serialized = CalculateSerializedSize();
+ Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total,
+ serialized, byteSize, Payload.size());
+#endif
+
+ return info;
+ }
+
+ bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TString& preserialized) const {
+ // serialize payload first
+ if (Payload) {
+ void *data;
+ int size = 0;
+ auto append = [&](const char *p, size_t len) {
+ while (len) {
+ if (size) {
+ const size_t numBytesToCopy = std::min<size_t>(size, len);
+ memcpy(data, p, numBytesToCopy);
+ data = static_cast<char*>(data) + numBytesToCopy;
+ size -= numBytesToCopy;
+ p += numBytesToCopy;
+ len -= numBytesToCopy;
+ } else if (!chunker->Next(&data, &size)) {
+ return false;
+ }
+ }
+ return true;
+ };
+ auto appendNumber = [&](size_t number) {
+ char buf[MaxNumberBytes];
+ return append(buf, SerializeNumber(number, buf));
+ };
+ char marker = PayloadMarker;
+ append(&marker, 1);
+ if (!appendNumber(Payload.size())) {
+ return false;
+ }
+ for (const TRope& rope : Payload) {
+ if (!appendNumber(rope.GetSize())) {
+ return false;
+ }
+ if (rope) {
+ if (size) {
+ chunker->BackUp(std::exchange(size, 0));
+ }
+ if (!chunker->WriteRope(&rope)) {
+ return false;
+ }
+ }
+ }
+ if (size) {
+ chunker->BackUp(size);
+ }
+ }
+
+ if (preserialized && !chunker->WriteString(&preserialized)) {
+ return false;
+ }
+
+ return Record.SerializeToZeroCopyStream(chunker);
+ }
+
+ protected:
mutable size_t CachedByteSize = 0;
static constexpr char PayloadMarker = 0x07;
@@ -488,7 +511,7 @@ namespace NActors {
}
bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
- return chunker->WriteString(&PreSerializedData) && TBase::SerializeToArcadiaStream(chunker);
+ return TBase::SerializeToArcadiaStreamImpl(chunker, PreSerializedData);
}
ui32 CalculateSerializedSize() const override {
@@ -502,6 +525,10 @@ namespace NActors {
ui32 CalculateSerializedSizeCached() const override {
return GetCachedByteSize();
}
+
+ TEventSerializationInfo CreateSerializationInfo() const override {
+ return TBase::CreateSerializationInfoImpl(PreSerializedData.size());
+ }
};
inline TActorId ActorIdFromProto(const NActorsProto::TActorId& actorId) {
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index 64a6bbbd09..371f9b7653 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -13,16 +13,13 @@ LWTRACE_USING(ACTORLIB_PROVIDER);
namespace NActors {
bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr2);
- if (task.GetVirtualFreeAmount() < amount) {
+ if (task.GetInternalFreeAmount() < amount) {
return false;
}
auto traceId = event.Span.GetTraceId();
event.Span.EndOk();
- LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize());
- task.Orbit.Take(event.Orbit);
-
Y_VERIFY(SerializationInfo);
const ui32 flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) |
(SerializationInfo->IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0);
@@ -41,16 +38,16 @@ namespace NActors {
// and channel header before the descriptor
TChannelPart part{
- .Channel = static_cast<ui16>(ChannelId | TChannelPart::LastPartFlag),
- .Size = sizeof(descr),
+ .ChannelFlags = static_cast<ui16>(ChannelId | TChannelPart::LastPartFlag),
+ .Size = sizeof(descr)
};
// append them to the packet
- task.Write(&part, sizeof(part));
- task.Write(&descr, sizeof(descr));
+ task.Write(false, &part, sizeof(part));
+ task.Write(false, &descr, sizeof(descr));
*weightConsumed += amount;
- OutputQueueSize -= part.Size;
+ OutputQueueSize -= sizeof(TEventDescr2);
Metrics->UpdateOutputChannelEvents(ChannelId);
return true;
@@ -71,106 +68,231 @@ namespace NActors {
switch (State) {
case EState::INITIAL:
event.InitChecksum();
- LWTRACK(SerializeToPacketBegin, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize);
if (event.Buffer) {
- State = EState::BUFFER;
+ State = EState::BODY;
Iter = event.Buffer->GetBeginIter();
SerializationInfo = &event.Buffer->GetSerializationInfo();
+ EventInExternalDataChannel = !SerializationInfo->Sections.empty() && Params.UseExternalDataChannel;
} else if (event.Event) {
- State = EState::CHUNKER;
+ State = EState::BODY;
IEventBase *base = event.Event.Get();
if (event.EventSerializedSize) {
Chunker.SetSerializingEvent(base);
}
SerializationInfoContainer = base->CreateSerializationInfo();
SerializationInfo = &SerializationInfoContainer;
+ EventInExternalDataChannel = !SerializationInfo->Sections.empty() && Params.UseExternalDataChannel;
} else { // event without buffer and IEventBase instance
State = EState::DESCRIPTOR;
SerializationInfoContainer = {};
SerializationInfo = &SerializationInfoContainer;
+ EventInExternalDataChannel = false;
}
if (!event.EventSerializedSize) {
State = EState::DESCRIPTOR;
+ } else if (EventInExternalDataChannel) {
+ State = EState::SECTIONS;
+ SectionIndex = 0;
}
break;
- case EState::CHUNKER:
- case EState::BUFFER: {
- if (task.GetVirtualFreeAmount() <= sizeof(TChannelPart)) {
+ case EState::BODY:
+ if (FeedPayload(task, event, weightConsumed)) {
+ State = EState::DESCRIPTOR;
+ } else {
return false;
}
+ break;
- TChannelPart part{
- .Channel = ChannelId,
- .Size = 0,
- };
+ case EState::DESCRIPTOR:
+ if (!FeedDescriptor(task, event, weightConsumed)) {
+ return false;
+ }
+ event.Serial = serial;
+ NotYetConfirmed.splice(NotYetConfirmed.end(), Queue, Queue.begin()); // move event to not-yet-confirmed queue
+ SerializationInfoContainer = {};
+ SerializationInfo = nullptr;
+ State = EState::INITIAL;
+ return true; // we have processed whole event, signal to the caller
- auto partBookmark = task.Bookmark(sizeof(part));
+ case EState::SECTIONS: {
+ if (SectionIndex == 0) {
+ size_t totalSectionSize = 0;
+ for (const auto& section : SerializationInfo->Sections) {
+ totalSectionSize += section.Size;
+ }
+ Y_VERIFY(totalSectionSize == event.EventSerializedSize);
+ }
- auto addChunk = [&](const void *data, size_t len) {
- event.UpdateChecksum(data, len);
- task.Append(data, len);
- part.Size += len;
+ while (SectionIndex != SerializationInfo->Sections.size()) {
+ char sectionInfo[1 + NInterconnect::NDetail::MaxNumberBytes * 4];
+ char *p = sectionInfo;
- event.EventActuallySerialized += len;
- if (event.EventActuallySerialized > MaxSerializedEventSize) {
- throw TExSerializedEventTooLarge(event.Descr.Type);
- }
- };
+ const auto& section = SerializationInfo->Sections[SectionIndex];
+ *p++ = static_cast<ui8>(EXdcCommand::DECLARE_SECTION);
+ p += NInterconnect::NDetail::SerializeNumber(section.Headroom, p);
+ p += NInterconnect::NDetail::SerializeNumber(section.Size, p);
+ p += NInterconnect::NDetail::SerializeNumber(section.Tailroom, p);
+ p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p);
+ Y_VERIFY(p <= std::end(sectionInfo));
- bool complete = false;
- if (State == EState::CHUNKER) {
- while (!complete && !task.IsFull()) {
- TMutableContiguousSpan out = task.AcquireSpanForWriting();
- const auto [first, last] = Chunker.FeedBuf(out.data(), out.size());
- for (auto p = first; p != last; ++p) {
- addChunk(p->first, p->second);
- }
- complete = Chunker.IsComplete();
- }
- Y_VERIFY(!complete || Chunker.IsSuccessfull());
- Y_VERIFY_DEBUG(complete || task.IsFull());
- } else { // BUFFER
- while (const size_t numb = Min(task.GetVirtualFreeAmount(), Iter.ContiguousSize())) {
- const char *obuf = Iter.ContiguousData();
- addChunk(obuf, numb);
- Iter += numb;
+ const size_t declareLen = p - sectionInfo;
+ if (sizeof(TChannelPart) + XdcData.size() + declareLen <= task.GetInternalFreeAmount() &&
+ XdcData.size() + declareLen <= Max<ui16>()) {
+ XdcData.insert(XdcData.end(), sectionInfo, p);
+ ++SectionIndex;
+ } else {
+ break;
}
- complete = !Iter.Valid();
}
- if (complete) {
- Y_VERIFY(event.EventActuallySerialized == event.EventSerializedSize,
- "EventActuallySerialized# %" PRIu32 " EventSerializedSize# %" PRIu32 " Type# 0x%08" PRIx32,
- event.EventActuallySerialized, event.EventSerializedSize, event.Descr.Type);
+
+ if (XdcData.empty()) {
+ return false;
}
- Y_VERIFY_DEBUG(part.Size);
- task.WriteBookmark(std::exchange(partBookmark, {}), &part, sizeof(part));
- *weightConsumed += sizeof(TChannelPart) + part.Size;
- OutputQueueSize -= part.Size;
- if (complete) {
- State = EState::DESCRIPTOR;
+ TChannelPart part{
+ .ChannelFlags = static_cast<ui16>(ChannelId | TChannelPart::XdcFlag),
+ .Size = static_cast<ui16>(XdcData.size())
+ };
+ task.Write(false, &part, sizeof(part));
+ task.Write(false, XdcData.data(), XdcData.size());
+ XdcData.clear();
+
+ if (SectionIndex == SerializationInfo->Sections.size()) {
+ State = EState::BODY;
}
+
break;
}
+ }
+ }
+ }
- case EState::DESCRIPTOR:
- if (!FeedDescriptor(task, event, weightConsumed)) {
- return false;
- }
- event.Serial = serial;
- NotYetConfirmed.splice(NotYetConfirmed.end(), Queue, Queue.begin()); // move event to not-yet-confirmed queue
- SerializationInfoContainer = {};
- SerializationInfo = nullptr;
- State = EState::INITIAL;
- return true; // we have processed whole event, signal to the caller
+ bool TEventOutputChannel::SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, bool external, size_t *bytesSerialized) {
+ auto addChunk = [&](const void *data, size_t len, bool allowCopy) {
+ event.UpdateChecksum(data, len);
+ if (allowCopy && (reinterpret_cast<uintptr_t>(data) & 63) + len <= 2 * 64) {
+ task.Write(external, data, len);
+ } else {
+ task.Append(external, data, len);
}
+ *bytesSerialized += len;
+
+ event.EventActuallySerialized += len;
+ if (event.EventActuallySerialized > MaxSerializedEventSize) {
+ throw TExSerializedEventTooLarge(event.Descr.Type);
+ }
+ };
+
+ bool complete = false;
+ if (event.Event) {
+ while (!complete) {
+ TMutableContiguousSpan out = task.AcquireSpanForWriting(external);
+ if (!out.size()) {
+ break;
+ }
+ const auto [first, last] = Chunker.FeedBuf(out.data(), out.size());
+ for (auto p = first; p != last; ++p) {
+ addChunk(p->first, p->second, false);
+ }
+ complete = Chunker.IsComplete();
+ if (complete) {
+ Y_VERIFY(Chunker.IsSuccessfull());
+ }
+ }
+ } else if (event.Buffer) {
+ while (const size_t numb = Min(external ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(),
+ Iter.ContiguousSize())) {
+ const char *obuf = Iter.ContiguousData();
+ addChunk(obuf, numb, true);
+ Iter += numb;
+ }
+ complete = !Iter.Valid();
+ } else {
+ Y_FAIL();
+ }
+ if (complete) {
+ Y_VERIFY(event.EventActuallySerialized == event.EventSerializedSize,
+ "EventActuallySerialized# %" PRIu32 " EventSerializedSize# %" PRIu32 " Type# 0x%08" PRIx32,
+ event.EventActuallySerialized, event.EventSerializedSize, event.Descr.Type);
+ }
+
+ return complete;
+ }
+
+ bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
+ return EventInExternalDataChannel
+ ? FeedExternalPayload(task, event, weightConsumed)
+ : FeedInlinePayload(task, event, weightConsumed);
+ }
+
+ bool TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
+ if (task.GetInternalFreeAmount() <= sizeof(TChannelPart)) {
+ return false;
+ }
+
+ auto partBookmark = task.Bookmark(sizeof(TChannelPart));
+
+ size_t bytesSerialized = 0;
+ const bool complete = SerializeEvent(task, event, false, &bytesSerialized);
+
+ Y_VERIFY_DEBUG(bytesSerialized);
+ Y_VERIFY(bytesSerialized <= Max<ui16>());
+
+ TChannelPart part{
+ .ChannelFlags = ChannelId,
+ .Size = static_cast<ui16>(bytesSerialized)
+ };
+
+ task.WriteBookmark(std::exchange(partBookmark, {}), &part, sizeof(part));
+ *weightConsumed += sizeof(TChannelPart) + part.Size;
+ OutputQueueSize -= part.Size;
+
+ return complete;
+ }
+
+ bool TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
+ const size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32));
+ if (task.GetInternalFreeAmount() < partSize || task.GetExternalFreeAmount() == 0) {
+ return false;
+ }
+
+ auto partBookmark = task.Bookmark(partSize);
+
+ size_t bytesSerialized = 0;
+ const bool complete = SerializeEvent(task, event, true, &bytesSerialized);
+
+ Y_VERIFY(0 < bytesSerialized && bytesSerialized <= Max<ui16>());
+
+ char buffer[partSize];
+ TChannelPart *part = reinterpret_cast<TChannelPart*>(buffer);
+ *part = {
+ .ChannelFlags = static_cast<ui16>(ChannelId | TChannelPart::XdcFlag),
+ .Size = static_cast<ui16>(partSize - sizeof(TChannelPart))
+ };
+ char *ptr = reinterpret_cast<char*>(part + 1);
+ *ptr++ = static_cast<ui8>(EXdcCommand::PUSH_DATA);
+ *reinterpret_cast<ui16*>(ptr) = bytesSerialized;
+ ptr += sizeof(ui16);
+ if (!Params.Encryption) {
+ ui32 checksum = 0;
+ task.XdcStream.ScanLastBytes(bytesSerialized, [&](TContiguousSpan span) {
+ checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size());
+ });
+ *reinterpret_cast<ui32*>(ptr) = checksum;
}
+
+ task.WriteBookmark(std::move(partBookmark), buffer, partSize);
+
+ *weightConsumed += partSize + bytesSerialized;
+ OutputQueueSize -= bytesSerialized;
+
+ return complete;
}
void TEventOutputChannel::NotifyUndelivered() {
LOG_DEBUG_IC_SESSION("ICOCH89", "Notyfying about Undelivered messages! NotYetConfirmed size: %zu, Queue size: %zu", NotYetConfirmed.size(), Queue.size());
- if (State == EState::CHUNKER) {
+ if (State == EState::BODY && Queue.front().Event) {
Y_VERIFY(!Chunker.IsComplete()); // chunk must have an event being serialized
Y_VERIFY(!Queue.empty()); // this event must be the first event in queue
TEventHolder& event = Queue.front();
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 7612c31c76..48074b05b9 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -17,20 +17,37 @@
namespace NActors {
#pragma pack(push, 1)
+
struct TChannelPart {
- ui16 Channel;
+ ui16 ChannelFlags;
ui16 Size;
- static constexpr ui16 LastPartFlag = ui16(1) << 15;
+ static constexpr ui16 LastPartFlag = 0x8000;
+ static constexpr ui16 XdcFlag = 0x4000;
+ static constexpr ui16 ChannelMask = (1 << IEventHandle::ChannelBits) - 1;
+
+ static_assert((LastPartFlag & ChannelMask) == 0);
+ static_assert((XdcFlag & ChannelMask) == 0);
+
+ ui16 GetChannel() const { return ChannelFlags & ChannelMask; }
+ bool IsLastPart() const { return ChannelFlags & LastPartFlag; }
+ bool IsXdc() const { return ChannelFlags & XdcFlag; }
TString ToString() const {
- return TStringBuilder() << "{Channel# " << (Channel & ~LastPartFlag)
- << " LastPartFlag# " << ((Channel & LastPartFlag) ? "true" : "false")
+ return TStringBuilder() << "{Channel# " << GetChannel()
+ << " IsLastPart# " << IsLastPart()
+ << " IsXdc# " << IsXdc()
<< " Size# " << Size << "}";
}
};
+
#pragma pack(pop)
+ enum class EXdcCommand : ui8 {
+ DECLARE_SECTION = 1,
+ PUSH_DATA,
+ };
+
struct TExSerializedEventTooLarge : std::exception {
const ui32 Type;
@@ -101,9 +118,9 @@ namespace NActors {
enum class EState {
INITIAL,
- CHUNKER,
- BUFFER,
+ BODY,
DESCRIPTOR,
+ SECTIONS,
};
EState State = EState::INITIAL;
@@ -116,6 +133,15 @@ namespace NActors {
TCoroutineChunkSerializer Chunker;
TEventSerializationInfo SerializationInfoContainer;
const TEventSerializationInfo *SerializationInfo = nullptr;
+ bool EventInExternalDataChannel;
+ size_t SectionIndex = 0;
+ std::vector<char> XdcData;
+
+ bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, bool external, size_t *bytesSerialized);
+
+ bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
+ bool FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
+ bool FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp
index ad46453acb..96ee13a3f5 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.cpp
+++ b/library/cpp/actors/interconnect/interconnect_stream.cpp
@@ -1,5 +1,6 @@
#include "interconnect_stream.h"
#include "logging.h"
+#include "poller_actor.h"
#include <library/cpp/openssl/init/init.h>
#include <util/network/socket.h>
#include <openssl/ssl.h>
@@ -209,6 +210,10 @@ namespace NInterconnect {
return res;
}
+ void TStreamSocket::Request(NActors::TPollerToken& token, bool read, bool write) {
+ token.Request(read, write);
+ }
+
//////////////////////////////////////////////////////
TDatagramSocket::TPtr TDatagramSocket::Make(int domain) {
@@ -478,6 +483,9 @@ namespace NInterconnect {
std::optional<std::pair<const void*, size_t>> BlockedSend;
ssize_t Send(const void* msg, size_t len, TString *err) {
+ if (BlockedSend && BlockedSend->first == msg && BlockedSend->second < len) {
+ len = BlockedSend->second;
+ }
Y_VERIFY(!BlockedSend || *BlockedSend == std::make_pair(msg, len));
const ssize_t res = Operate(msg, len, &SSL_write_ex, err);
if (res == -EAGAIN) {
@@ -491,6 +499,9 @@ namespace NInterconnect {
std::optional<std::pair<void*, size_t>> BlockedReceive;
ssize_t Recv(void* msg, size_t len, TString *err) {
+ if (BlockedReceive && BlockedReceive->first == msg && BlockedReceive->second < len) {
+ len = BlockedReceive->second;
+ }
Y_VERIFY(!BlockedReceive || *BlockedReceive == std::make_pair(msg, len));
const ssize_t res = Operate(msg, len, &SSL_read_ex, err);
if (res == -EAGAIN) {
@@ -628,4 +639,7 @@ namespace NInterconnect {
return Impl->WantWrite();
}
+ void TSecureSocket::Request(NActors::TPollerToken& token, bool /*read*/, bool /*write*/) {
+ token.Request(WantRead(), WantWrite());
+ }
}
diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h
index 3ba7914f77..55438fef10 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.h
+++ b/library/cpp/actors/interconnect/interconnect_stream.h
@@ -14,6 +14,10 @@
#include <sys/uio.h>
+namespace NActors {
+ class TPollerToken;
+}
+
namespace NInterconnect {
class TSocket: public NActors::TSharedDescriptor, public TNonCopyable {
protected:
@@ -63,6 +67,8 @@ namespace NInterconnect {
void SetSendBufferSize(i32 len) const;
ui32 GetSendBufferSize() const;
+
+ virtual void Request(NActors::TPollerToken& token, bool read, bool write);
};
class TSecureSocketContext {
@@ -114,6 +120,7 @@ namespace NInterconnect {
bool WantRead() const;
bool WantWrite() const;
+ virtual void Request(NActors::TPollerToken& token, bool read, bool write) override;
};
class TDatagramSocket: public TSocket {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 34770133ae..925dbef823 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -6,6 +6,67 @@
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
+ void TReceiveContext::TPerChannelContext::CalculateBytesToCatch() {
+ XdcBytesToCatch = FetchOffset;
+ for (auto it = XdcBuffers.begin(), end = it + FetchIndex; it != end; ++it) {
+ XdcBytesToCatch += it->size();
+ }
+ }
+
+ void TReceiveContext::TPerChannelContext::FetchBuffers(ui16 channel, size_t numBytes,
+ std::deque<std::tuple<ui16, TMutableContiguousSpan>>& outQ) {
+ Y_VERIFY_DEBUG(numBytes);
+ auto it = XdcBuffers.begin() + FetchIndex;
+ for (;;) {
+ Y_VERIFY_DEBUG(it != XdcBuffers.end());
+ const TMutableContiguousSpan span = it->SubSpan(FetchOffset, numBytes);
+ outQ.emplace_back(channel, span);
+ numBytes -= span.size();
+ FetchOffset += span.size();
+ if (FetchOffset == it->size()) {
+ ++FetchIndex;
+ ++it;
+ FetchOffset = 0;
+ }
+ if (!numBytes) {
+ break;
+ }
+ }
+ }
+
+ void TReceiveContext::TPerChannelContext::DropFront(TRope *from, size_t numBytes) {
+ size_t n = numBytes;
+ for (auto& pendingEvent : PendingEvents) {
+ const size_t numBytesInEvent = Min(n, pendingEvent.XdcSizeLeft);
+ pendingEvent.XdcSizeLeft -= numBytesInEvent;
+ n -= numBytesInEvent;
+ if (!n) {
+ break;
+ }
+ }
+
+ while (numBytes) {
+ Y_VERIFY_DEBUG(!XdcBuffers.empty());
+ auto& front = XdcBuffers.front();
+ if (from) {
+ from->ExtractFrontPlain(front.data(), Min(numBytes, front.size()));
+ }
+ if (numBytes < front.size()) {
+ front = front.SubSpan(numBytes, Max<size_t>());
+ if (!FetchIndex) { // we are sending this very buffer, adjust sending offset
+ Y_VERIFY_DEBUG(numBytes <= FetchOffset);
+ FetchOffset -= numBytes;
+ }
+ break;
+ } else {
+ numBytes -= front.size();
+ Y_VERIFY_DEBUG(FetchIndex);
+ --FetchIndex;
+ XdcBuffers.pop_front();
+ }
+ }
+ }
+
TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket,
TIntrusivePtr<NInterconnect::TStreamSocket> xdcSocket, TIntrusivePtr<TReceiveContext> context,
TInterconnectProxyCommon::TPtr common, std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId,
@@ -24,8 +85,7 @@ namespace NActors {
Y_VERIFY(Context);
Y_VERIFY(Socket);
Y_VERIFY(SessionId);
-
- AtomicSet(Context->PacketsReadFromSocket, 0);
+ Y_VERIFY(!Params.UseExternalDataChannel == !XdcSocket);
Metrics->SetClockSkewMicrosec(0);
@@ -34,6 +94,16 @@ namespace NActors {
// ensure that we do not spawn new session while the previous one is still alive
TAtomicBase sessions = AtomicIncrement(Context->NumInputSessions);
Y_VERIFY(sessions == 1, "sessions# %" PRIu64, ui64(sessions));
+
+ // calculate number of bytes to catch
+ for (auto& context : Context->ChannelArray) {
+ context.CalculateBytesToCatch();
+ }
+ for (auto& [channel, context] : Context->ChannelMap) {
+ context.CalculateBytesToCatch();
+ }
+
+ UsageHisto.fill(0);
}
void TInputSessionTCP::Bootstrap() {
@@ -41,7 +111,28 @@ namespace NActors {
Become(&TThis::WorkingState, DeadPeerTimeout, new TEvCheckDeadPeer);
LOG_DEBUG_IC_SESSION("ICIS01", "InputSession created");
LastReceiveTimestamp = TActivationContext::Monotonic();
- ReceiveData();
+ TActivationContext::Send(new IEventHandle(EvResumeReceiveData, 0, SelfId(), {}, nullptr, 0));
+ }
+
+ STATEFN(TInputSessionTCP::WorkingState) {
+ std::unique_ptr<IEventBase> termEv;
+
+ try {
+ WorkingStateImpl(ev);
+ } catch (const TExReestablishConnection& ex) {
+ LOG_DEBUG_IC_SESSION("ICIS09", "ReestablishConnection, reason# %s", ex.Reason.ToString().data());
+ termEv = std::make_unique<TEvSocketDisconnect>(std::move(ex.Reason));
+ } catch (const TExDestroySession& ex) {
+ LOG_DEBUG_IC_SESSION("ICIS13", "DestroySession, reason# %s", ex.Reason.ToString().data());
+ termEv.reset(TInterconnectSessionTCP::NewEvTerminate(std::move(ex.Reason)));
+ }
+
+ if (termEv) {
+ AtomicDecrement(Context->NumInputSessions);
+ Send(SessionId, termEv.release());
+ PassAway();
+ Socket.Reset();
+ }
}
void TInputSessionTCP::CloseInputSession() {
@@ -50,32 +141,42 @@ namespace NActors {
}
void TInputSessionTCP::Handle(TEvPollerReady::TPtr ev) {
- if (Context->ReadPending) {
+ auto *msg = ev->Get();
+
+ bool useful = false;
+ bool writeBlocked = false;
+
+ if (msg->Socket == Socket) {
+ useful = std::exchange(Context->MainReadPending, false);
+ writeBlocked = Context->MainWriteBlocked;
+ } else if (msg->Socket == XdcSocket) {
+ useful = std::exchange(Context->XdcReadPending, false);
+ writeBlocked = Context->XdcWriteBlocked;
+ }
+
+ if (useful) {
Metrics->IncUsefulReadWakeups();
} else if (!ev->Cookie) {
Metrics->IncSpuriousReadWakeups();
}
- Context->ReadPending = false;
+
ReceiveData();
- if (Params.Encryption && Context->WriteBlockedByFullSendBuffer && !ev->Cookie) {
- Send(SessionId, ev->Release().Release(), 0, 1);
+
+ if (Params.Encryption && writeBlocked && ev->Sender != SessionId) {
+ Send(SessionId, ev->Release().Release());
}
}
void TInputSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
- const auto& sk = ev->Get()->Socket;
- if (auto *token = sk == Socket ? &PollerToken : sk == XdcSocket ? &XdcPollerToken : nullptr) {
- *token = std::move(ev->Get()->PollerToken);
- } else {
- return;
+ auto *msg = ev->Get();
+ if (msg->Socket == Socket) {
+ PollerToken = std::move(msg->PollerToken);
+ } else if (msg->Socket == XdcSocket) {
+ XdcPollerToken = std::move(msg->PollerToken);
}
ReceiveData();
}
- void TInputSessionTCP::HandleResumeReceiveData() {
- ReceiveData();
- }
-
void TInputSessionTCP::ReceiveData() {
TTimeLimit limit(GetMaxCyclesPerEvent());
ui64 numDataBytes = 0;
@@ -83,35 +184,46 @@ namespace NActors {
LOG_DEBUG_IC_SESSION("ICIS02", "ReceiveData called");
bool enoughCpu = true;
- for (int iteration = 0; Socket; ++iteration) {
- if (iteration && limit.CheckExceeded()) {
+ bool progress = false;
+
+ for (;;) {
+ if (progress && limit.CheckExceeded()) {
// we have hit processing time limit for this message, send notification to resume processing a bit later
- Send(SelfId(), new TEvResumeReceiveData);
+ TActivationContext::Send(new IEventHandle(EvResumeReceiveData, 0, SelfId(), {}, nullptr, 0));
enoughCpu = false;
break;
}
+ // clear iteration progress
+ progress = false;
+
+ // try to process already fetched part from IncomingData
switch (State) {
case EState::HEADER:
if (IncomingData.GetSize() < sizeof(TTcpPacketHeader_v2)) {
break;
} else {
ProcessHeader();
+ progress = true;
+ continue;
}
- continue;
case EState::PAYLOAD:
+ Y_VERIFY_DEBUG(PayloadSize);
if (!IncomingData) {
break;
} else {
- ProcessPayload(numDataBytes);
+ ProcessPayload(&numDataBytes);
+ progress = true;
+ continue;
}
- continue;
}
- // if we have reached this point, it means that we do not have enough data in read buffer; try to obtain some
- if (!ReadMore()) {
- // we have no data from socket, so we have some free time to spend -- preallocate buffers using this time
+ // try to read more data into buffers
+ progress |= ReadMore();
+ progress |= ReadXdc(&numDataBytes);
+
+ if (!progress) { // no progress was made during this iteration
PreallocateBuffers();
break;
}
@@ -183,12 +295,12 @@ namespace NActors {
Checksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header)); // start calculating checksum now
if (!PayloadSize && Checksum != ChecksumExpected) {
LOG_ERROR_IC_SESSION("ICIS10", "payload checksum error");
- return ReestablishConnection(TDisconnectReason::ChecksumError());
+ throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
}
}
if (PayloadSize >= 65536) {
LOG_CRIT_IC_SESSION("ICIS07", "payload is way too big");
- return DestroySession(TDisconnectReason::FormatError());
+ throw TExDestroySession{TDisconnectReason::FormatError()};
}
if (ConfirmedByInput < confirm) {
ConfirmedByInput = confirm;
@@ -205,13 +317,22 @@ namespace NActors {
}
}
if (PayloadSize) {
- const ui64 expected = Context->GetLastProcessedPacketSerial() + 1;
+ const ui64 expected = Context->LastProcessedSerial + 1;
if (serial == 0 || serial > expected) {
LOG_CRIT_IC_SESSION("ICIS06", "packet serial %" PRIu64 ", but %" PRIu64 " expected", serial, expected);
- return DestroySession(TDisconnectReason::FormatError());
+ throw TExDestroySession{TDisconnectReason::FormatError()};
+ }
+ if (Context->LastProcessedSerial <= serial) {
+ XdcCatchStreamFinalPending = true; // we can't switch it right now, only after packet is fully processed
+ }
+ if (serial == expected) {
+ InboundPacketQ.push_back(TInboundPacket{serial, 0});
+ IgnorePayload = false;
+ } else {
+ IgnorePayload = true;
}
- IgnorePayload = serial != expected;
State = EState::PAYLOAD;
+ Y_VERIFY_DEBUG(!Payload);
} else if (serial & TTcpPacketBuf::PingRequestMask) {
Send(SessionId, new TEvProcessPingRequest(serial & ~TTcpPacketBuf::PingRequestMask));
} else if (serial & TTcpPacketBuf::PingResponseMask) {
@@ -221,62 +342,67 @@ namespace NActors {
} else if (serial & TTcpPacketBuf::ClockMask) {
HandleClock(TInstant::MicroSeconds(serial & ~TTcpPacketBuf::ClockMask));
}
+ if (!PayloadSize) {
+ ++PacketsReadFromSocket;
+ }
}
- void TInputSessionTCP::ProcessPayload(ui64& numDataBytes) {
+ void TInputSessionTCP::ProcessPayload(ui64 *numDataBytes) {
const size_t numBytes = Min(PayloadSize, IncomingData.GetSize());
IncomingData.ExtractFront(numBytes, &Payload);
- numDataBytes += numBytes;
+ *numDataBytes += numBytes;
PayloadSize -= numBytes;
if (PayloadSize) {
return; // there is still some data to receive in the Payload rope
}
- State = EState::HEADER; // we'll continue with header next time
+ State = EState::HEADER;
if (!Params.Encryption) { // see if we are checksumming packet body
for (const auto&& [data, size] : Payload) {
Checksum = Crc32cExtendMSanCompatible(Checksum, data, size);
}
if (Checksum != ChecksumExpected) { // validate payload checksum
LOG_ERROR_IC_SESSION("ICIS04", "payload checksum error");
- return ReestablishConnection(TDisconnectReason::ChecksumError());
+ throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
}
}
- if (Y_UNLIKELY(IgnorePayload)) {
- return;
- }
- if (!Context->AdvanceLastProcessedPacketSerial()) {
- return DestroySession(TDisconnectReason::NewSession());
- }
-
- while (Payload && Socket) {
+ while (Payload) {
// extract channel part header from the payload stream
TChannelPart part;
if (!Payload.ExtractFrontPlain(&part, sizeof(part))) {
LOG_CRIT_IC_SESSION("ICIS14", "missing TChannelPart header in payload");
- return DestroySession(TDisconnectReason::FormatError());
- }
- if (!part.Size) { // bogus frame
- continue;
+ throw TExDestroySession{TDisconnectReason::FormatError()};
} else if (Payload.GetSize() < part.Size) {
LOG_CRIT_IC_SESSION("ICIS08", "payload format error ChannelPart# %s", part.ToString().data());
- return DestroySession(TDisconnectReason::FormatError());
+ throw TExDestroySession{TDisconnectReason::FormatError()};
}
- const ui16 channel = part.Channel & ~TChannelPart::LastPartFlag;
- TRope *eventData = channel < Context->ChannelArray.size()
- ? &Context->ChannelArray[channel]
- : &Context->ChannelMap[channel];
+ const ui16 channel = part.GetChannel();
+ auto& context = GetPerChannelContext(channel);
+ auto& pendingEvent = context.PendingEvents.empty() || context.PendingEvents.back().EventData
+ ? context.PendingEvents.emplace_back()
+ : context.PendingEvents.back();
+
+ if (part.IsXdc()) { // external data channel command packet
+ XdcCommands.resize(part.Size);
+ const bool success = Payload.ExtractFrontPlain(XdcCommands.data(), XdcCommands.size());
+ Y_VERIFY(success);
+ ProcessXdcCommand(channel, context);
+ } else if (IgnorePayload) { // throw payload out
+ Payload.EraseFront(part.Size);
+ } else if (!part.IsLastPart()) { // just ordinary inline event data
+ Payload.ExtractFront(part.Size, &pendingEvent.Payload);
+ } else { // event final block
+ TEventDescr2 v2;
+
+ if (part.Size != sizeof(v2)) {
+ LOG_CRIT_IC_SESSION("ICIS11", "incorrect last part of an event");
+ throw TExDestroySession{TDisconnectReason::FormatError()};
+ }
- Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size);
+ const bool success = Payload.ExtractFrontPlain(&v2, sizeof(v2));
+ Y_VERIFY(success);
- TEventDescr2 v2;
- if (~part.Channel & TChannelPart::LastPartFlag) {
- Payload.ExtractFront(part.Size, eventData);
- } else if (part.Size != sizeof(v2)) {
- LOG_CRIT_IC_SESSION("ICIS11", "incorrect last part of an event");
- return DestroySession(TDisconnectReason::FormatError());
- } else if (Payload.ExtractFrontPlain(&v2, part.Size)) {
- TEventData descr = {
+ pendingEvent.EventData = TEventData{
v2.Type,
v2.Flags,
v2.Recipient,
@@ -287,44 +413,152 @@ namespace NActors {
};
Metrics->IncInputChannelsIncomingEvents(channel);
- ProcessEvent(*eventData, descr);
- *eventData = TRope();
- } else {
- Y_FAIL();
+ ProcessEvents(context);
}
+
+ Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size);
}
+
+ // mark packet as processed
+ ProcessInboundPacketQ(0);
+ XdcCatchStreamFinal = XdcCatchStreamFinalPending;
+ Context->LastProcessedSerial += !IgnorePayload;
+
+ ++PacketsReadFromSocket;
+ ++DataPacketsReadFromSocket;
+ IgnoredDataPacketsFromSocket += IgnorePayload;
}
- void TInputSessionTCP::ProcessEvent(TRope& data, TEventData& descr) {
- if (descr.Checksum) {
- ui32 checksum = 0;
- for (const auto&& [data, size] : data) {
- checksum = Crc32cExtendMSanCompatible(checksum, data, size);
- }
- if (checksum != descr.Checksum) {
- LOG_CRIT_IC_SESSION("ICIS05", "event checksum error");
- return ReestablishConnection(TDisconnectReason::ChecksumError());
- }
- }
- TEventSerializationInfo serializationInfo{
- .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat),
- };
- auto ev = std::make_unique<IEventHandle>(SessionId,
- descr.Type,
- descr.Flags & ~IEventHandle::FlagExtendedFormat,
- descr.Recipient,
- descr.Sender,
- MakeIntrusive<TEventSerializedData>(std::move(data), std::move(serializationInfo)),
- descr.Cookie,
- Params.PeerScopeId,
- std::move(descr.TraceId));
- if (Common->EventFilter && !Common->EventFilter->CheckIncomingEvent(*ev, Common->LocalScopeId)) {
- LOG_CRIT_IC_SESSION("ICIC03", "Event dropped due to scope error LocalScopeId# %s PeerScopeId# %s Type# 0x%08" PRIx32,
- ScopeIdToString(Common->LocalScopeId).data(), ScopeIdToString(Params.PeerScopeId).data(), descr.Type);
- ev.reset();
- }
- if (ev) {
- TActivationContext::Send(ev.release());
+ void TInputSessionTCP::ProcessInboundPacketQ(size_t numXdcBytesRead) { // distribute freshly read XDC bytes over queued packets, oldest first
+ for (; !InboundPacketQ.empty(); InboundPacketQ.pop_front()) { // front entry is popped only when an iteration runs to completion
+ auto& front = InboundPacketQ.front();
+ // this packet takes as many of the supplied bytes as it is still waiting for, but never more than were supplied
+ const size_t n = Min(numXdcBytesRead, front.XdcUnreadBytes);
+ front.XdcUnreadBytes -= n;
+ numXdcBytesRead -= n;
+ // a packet with outstanding XDC bytes stops the scan, so later packets are not confirmed ahead of it
+ if (front.XdcUnreadBytes) { // we haven't finished this packet yet
+ Y_VERIFY(!numXdcBytesRead); // the packet still needs bytes, so it must have consumed everything that was supplied
+ break;
+ }
+ // NOTE(review): a false return is turned into session destruction with reason NewSession; what 'false' means is defined by the receive context -- confirm there
+ if (!Context->AdvanceLastPacketSerialToConfirm(front.Serial)) {
+ throw TExDestroySession{TDisconnectReason::NewSession()};
+ }
+ }
+ }
+
+ void TInputSessionTCP::ProcessXdcCommand(ui16 channel, TReceiveContext::TPerChannelContext& context) { // interpret the command bytes currently held in XdcCommands
+ const char *ptr = XdcCommands.data();
+ const char *end = ptr + XdcCommands.size();
+ while (ptr != end) { // one command per iteration; every handled command ends with 'continue'
+ switch (static_cast<EXdcCommand>(*ptr++)) { // first byte of a command is its opcode
+ case EXdcCommand::DECLARE_SECTION: {
+ // extract and validate command parameters
+ const ui64 headroom = NInterconnect::NDetail::DeserializeNumber(&ptr, end);
+ const ui64 size = NInterconnect::NDetail::DeserializeNumber(&ptr, end);
+ const ui64 tailroom = NInterconnect::NDetail::DeserializeNumber(&ptr, end);
+ const ui64 alignment = NInterconnect::NDetail::DeserializeNumber(&ptr, end);
+ if (headroom == Max<ui64>() || size == Max<ui64>() || tailroom == Max<ui64>() || alignment == Max<ui64>()) { // Max<ui64>() is treated here as the parse-failure marker
+ LOG_CRIT_IC_SESSION("ICIS00", "XDC command format error");
+ throw TExDestroySession{TDisconnectReason::FormatError()};
+ }
+ // the parameters are always parsed (to advance ptr), but they only take effect for a packet that is being applied
+ if (!IgnorePayload) { // process command if packet is being applied
+ // allocate buffer and push it into the payload
+ auto& pendingEvent = context.PendingEvents.back();
+ pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom, alignment});
+ auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom);
+ if (size) { // zero-sized sections get no entry in XdcBuffers
+ context.XdcBuffers.push_back(buffer.GetContiguousSpanMut());
+ }
+ pendingEvent.Payload.Insert(pendingEvent.Payload.End(), TRope(std::move(buffer)));
+ pendingEvent.XdcSizeLeft += size; // ProcessEvents does not deliver the event while XdcSizeLeft is nonzero
+ // statistics counter shown by GenerateHttpInfo
+ ++XdcSections;
+ }
+ continue;
+ }
+ // PUSH_DATA layout as read below: ui16 size, followed by ui32 checksum unless encryption is enabled
+ case EXdcCommand::PUSH_DATA: {
+ const size_t cmdLen = sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32));
+ if (static_cast<size_t>(end - ptr) < cmdLen) { // command body is truncated
+ LOG_CRIT_IC_SESSION("ICIS18", "XDC command format error");
+ throw TExDestroySession{TDisconnectReason::FormatError()};
+ }
+ // NOTE(review): ptr has no alignment guarantee visible here; this and the ui32 read below are unaligned loads through reinterpret_cast -- confirm this is acceptable on all targets
+ auto size = *reinterpret_cast<const ui16*>(ptr);
+ if (!size) {
+ LOG_CRIT_IC_SESSION("ICIS03", "XDC empty payload");
+ throw TExDestroySession{TDisconnectReason::FormatError()};
+ }
+ // the expected checksum is queued even for ignored packets; HandleXdcChecksum consumes the queue in order
+ if (!Params.Encryption) {
+ const ui32 checksumExpected = *reinterpret_cast<const ui32*>(ptr + sizeof(ui16));
+ XdcChecksumQ.emplace_back(size, checksumExpected);
+ }
+ // decide where the announced bytes will be read to
+ if (IgnorePayload) {
+ // this packet was already marked as 'processed', all commands had been executed, but we must
+ // parse XDC stream correctly
+ XdcCatchStreamBytesPending += size;
+ XdcCatchStreamMarkup.emplace_back(channel, size);
+ } else {
+ // account channel and number of bytes in XDC for this packet
+ auto& packet = InboundPacketQ.back();
+ packet.XdcUnreadBytes += size;
+ // the resulting read targets are appended to XdcInputQ (passed by name to FetchBuffers)
+ // find buffers and acquire data buffer pointers
+ context.FetchBuffers(channel, size, XdcInputQ);
+ }
+ // step over the fixed-size command body parsed above
+ ptr += cmdLen;
+ ++XdcRefs;
+ continue;
+ }
+ }
+ // reached only when the switch did not 'continue', i.e. the opcode matched no case above
+ LOG_CRIT_IC_SESSION("ICIS15", "unexpected XDC command");
+ throw TExDestroySession{TDisconnectReason::FormatError()};
+ }
+ }
+
+ void TInputSessionTCP::ProcessEvents(TReceiveContext::TPerChannelContext& context) { // deliver completed events from the front of the channel's pending queue
+ for (; !context.PendingEvents.empty(); context.PendingEvents.pop_front()) { // stops at the first incomplete event, so queue order is preserved
+ auto& pendingEvent = context.PendingEvents.front();
+ if (!pendingEvent.EventData || pendingEvent.XdcSizeLeft) { // no final descriptor yet, or XDC bytes still outstanding
+ break; // event is not ready yet
+ }
+ // optional whole-event checksum: a zero Checksum field skips verification
+ auto& descr = *pendingEvent.EventData;
+ if (descr.Checksum) {
+ ui32 checksum = 0;
+ for (const auto&& [data, size] : pendingEvent.Payload) {
+ checksum = Crc32cExtendMSanCompatible(checksum, data, size);
+ }
+ if (checksum != descr.Checksum) {
+ LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type);
+ throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
+ }
+ }
+ pendingEvent.SerializationInfo.IsExtendedFormat = descr.Flags & IEventHandle::FlagExtendedFormat; // the flag is carried in SerializationInfo and masked out of the handle flags below
+ auto ev = std::make_unique<IEventHandle>(SessionId,
+ descr.Type,
+ descr.Flags & ~IEventHandle::FlagExtendedFormat,
+ descr.Recipient,
+ descr.Sender,
+ MakeIntrusive<TEventSerializedData>(std::move(pendingEvent.Payload), std::move(pendingEvent.SerializationInfo)),
+ descr.Cookie,
+ Params.PeerScopeId,
+ std::move(descr.TraceId));
+ if (Common->EventFilter && !Common->EventFilter->CheckIncomingEvent(*ev, Common->LocalScopeId)) { // a rejected event is logged and dropped; the session continues
+ LOG_CRIT_IC_SESSION("ICIC03", "Event dropped due to scope error LocalScopeId# %s PeerScopeId# %s Type# 0x%08" PRIx32,
+ ScopeIdToString(Common->LocalScopeId).data(), ScopeIdToString(Params.PeerScopeId).data(), descr.Type);
+ ev.reset();
+ }
+ if (ev) {
+ TActivationContext::Send(ev.release()); // raw pointer is handed to Send
+ }
+ }
}
@@ -347,39 +581,28 @@ namespace NActors {
}
}
- bool TInputSessionTCP::ReadMore() {
- PreallocateBuffers();
-
- TStackVec<TIoVec, 16> buffs;
- size_t offset = FirstBufferOffset;
- for (const auto& item : Buffers) {
- TIoVec iov{item->GetBuffer() + offset, item->GetCapacity() - offset};
- buffs.push_back(iov);
- if (Params.Encryption) {
- break; // do not put more than one buffer in queue to prevent using ReadV
- }
- offset = 0;
- }
-
- const struct iovec* iovec = reinterpret_cast<const struct iovec*>(buffs.data());
- int iovcnt = buffs.size();
-
+ ssize_t TInputSessionTCP::Read(NInterconnect::TStreamSocket& socket, const TPollerToken::TPtr& token,
+ bool *readPending, const TIoVec *iov, size_t num) {
ssize_t recvres = 0;
TString err;
LWPROBE_IF_TOO_LONG(SlowICReadFromSocket, ms) {
do {
const ui64 begin = GetCycleCountFast();
#ifndef _win_
- recvres = iovcnt == 1 ? Socket->Recv(iovec->iov_base, iovec->iov_len, &err) : Socket->ReadV(iovec, iovcnt);
+ if (num == 1) {
+ recvres = socket.Recv(iov->Data, iov->Size, &err);
+ } else {
+ recvres = socket.ReadV(reinterpret_cast<const iovec*>(iov), num);
+ }
#else
- recvres = Socket->Recv(iovec[0].iov_base, iovec[0].iov_len, &err);
+ recvres = socket.Recv(iov->Data, iov->Size, &err);
#endif
const ui64 end = GetCycleCountFast();
Metrics->IncRecvSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond());
} while (recvres == -EINTR);
}
- LOG_DEBUG_IC_SESSION("ICIS12", "ReadMore recvres# %zd iovcnt# %d err# %s", recvres, iovcnt, err.data());
+ LOG_DEBUG_IC_SESSION("ICIS12", "Read recvres# %zd num# %zu err# %s", recvres, num, err.data());
if (recvres <= 0 || CloseInputSessionRequested) {
if ((-recvres != EAGAIN && -recvres != EWOULDBLOCK) || CloseInputSessionRequested) {
@@ -388,23 +611,41 @@ namespace NActors {
: err ? err
: Sprintf("socket: %s", strerror(-recvres));
LOG_NOTICE_NET(NodeId, "%s", message.data());
- ReestablishConnection(CloseInputSessionRequested ? TDisconnectReason::Debug() :
- recvres == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-recvres));
- } else if (PollerToken && !std::exchange(Context->ReadPending, true)) {
- if (Params.Encryption) {
- auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
- const bool wantRead = secure->WantRead(), wantWrite = secure->WantWrite();
- Y_VERIFY_DEBUG(wantRead || wantWrite);
- PollerToken->Request(wantRead, wantWrite);
- } else {
- PollerToken->Request(true, false);
- }
+ throw TExReestablishConnection{CloseInputSessionRequested ? TDisconnectReason::Debug() :
+ recvres == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-recvres)};
+ } else if (token && !std::exchange(*readPending, true)) {
+ socket.Request(*token, true, false);
+ }
+ return -1;
+ }
+
+ return recvres;
+ }
+
+ bool TInputSessionTCP::ReadMore() {
+ PreallocateBuffers();
+
+ TStackVec<TIoVec, 16> buffs;
+ size_t offset = FirstBufferOffset;
+ for (const auto& item : Buffers) {
+ TIoVec iov{item->GetBuffer() + offset, item->GetCapacity() - offset};
+ buffs.push_back(iov);
+ if (Params.Encryption) {
+ break; // do not put more than one buffer in queue to prevent using ReadV
}
+ offset = 0;
+ }
+
+ ssize_t recvres = Read(*Socket, PollerToken, &Context->MainReadPending, buffs.data(), buffs.size());
+ if (recvres == -1) {
return false;
}
Y_VERIFY(recvres > 0);
Metrics->AddTotalBytesRead(recvres);
+ BytesReadFromSocket += recvres;
+
+ size_t numBuffersCovered = 0;
while (recvres) {
Y_VERIFY(!Buffers.empty());
@@ -417,6 +658,19 @@ namespace NActors {
Buffers.pop_front();
FirstBufferOffset = 0;
}
+ ++numBuffersCovered;
+ }
+
+ if (Buffers.empty()) { // we have read all the data, increase number of buffers
+ CurrentBuffers = Min(CurrentBuffers * 2, MaxBuffers);
+ } else if (++UsageHisto[numBuffersCovered - 1] == 64) { // time to shift
+ for (auto& value : UsageHisto) {
+ value /= 2;
+ }
+ while (CurrentBuffers > 1 && !UsageHisto[CurrentBuffers - 1]) {
+ --CurrentBuffers;
+ }
+ Y_VERIFY_DEBUG(UsageHisto[CurrentBuffers - 1]);
}
LastReceiveTimestamp = TActivationContext::Monotonic();
@@ -424,29 +678,178 @@ namespace NActors {
return true;
}
- void TInputSessionTCP::PreallocateBuffers() {
- // ensure that we have exactly "numBuffers" in queue
- LWPROBE_IF_TOO_LONG(SlowICReadLoopAdjustSize, ms) {
- while (Buffers.size() < Common->Settings.NumPreallocatedBuffers) {
- Buffers.emplace_back(TRopeAlignedBuffer::Allocate(Common->Settings.PreallocatedBufferSize));
+ bool TInputSessionTCP::ReadXdcCatchStream(ui64 *numDataBytes) { // read XDC bytes announced by ignored packets into a side stream; returns true if any bytes were read
+ bool progress = false;
+ // phase 1: pull pending bytes from the XDC socket through a 64 KiB staging buffer
+ while (XdcCatchStreamBytesPending) { // read data into catch stream if we still have to
+ if (!XdcCatchStreamBuffer) {
+ XdcCatchStreamBuffer = TRcBuf::Uninitialized(64 * 1024);
+ }
+ // never read past the pending amount nor past the free tail of the staging buffer
+ const size_t numBytesToRead = Min<size_t>(XdcCatchStreamBytesPending, XdcCatchStreamBuffer.size() - XdcCatchStreamBufferOffset);
+ // Read() yields -1 when nothing could be read right now; fatal conditions are thrown by Read() itself
+ TIoVec iov{XdcCatchStreamBuffer.GetDataMut() + XdcCatchStreamBufferOffset, numBytesToRead};
+ ssize_t recvres = Read(*XdcSocket, XdcPollerToken, &Context->XdcReadPending, &iov, 1);
+ if (recvres == -1) {
+ return progress;
+ }
+ // checksum exactly the bytes just received, before the offset is advanced
+ HandleXdcChecksum({XdcCatchStreamBuffer.data() + XdcCatchStreamBufferOffset, static_cast<size_t>(recvres)});
+ // account the received bytes
+ XdcCatchStreamBufferOffset += recvres;
+ XdcCatchStreamBytesPending -= recvres;
+ *numDataBytes += recvres;
+ BytesReadFromXdcSocket += recvres;
+ // staging buffer full, or nothing more pending: move the filled prefix into XdcCatchStream and release the buffer
+ if (XdcCatchStreamBufferOffset == XdcCatchStreamBuffer.size() || XdcCatchStreamBytesPending == 0) {
+ TRope(std::exchange(XdcCatchStreamBuffer, {})).ExtractFront(XdcCatchStreamBufferOffset, &XdcCatchStream);
+ XdcCatchStreamBufferOffset = 0;
+ }
+ // at least one successful read happened in this call
+ progress = true;
+ }
+ // phase 2: once the catch stream is marked final, split the accumulated bytes between "ignore" and per-channel consumers
+ if (XdcCatchStreamFinal && XdcCatchStream) {
+ // calculate total number of bytes to catch
+ size_t totalBytesToCatch = 0;
+ for (auto& context : Context->ChannelArray) {
+ totalBytesToCatch += context.XdcBytesToCatch;
+ }
+ for (auto& [channel, context] : Context->ChannelMap) {
+ totalBytesToCatch += context.XdcBytesToCatch;
+ }
+ // the bytes to catch are taken to be the tail of the stream; everything in front of them is discarded
+ // calculate ignored offset
+ Y_VERIFY(totalBytesToCatch <= XdcCatchStream.GetSize());
+ size_t bytesToIgnore = XdcCatchStream.GetSize() - totalBytesToCatch;
+ // markup entries are (channel, size) pairs recorded by ProcessXdcCommand for ignored packets
+ // process catch stream markup
+ THashSet<ui16> channels;
+ for (auto [channel, size] : XdcCatchStreamMarkup) { // by-value binding: 'size' is a local copy that is decremented below
+ if (const size_t n = Min<size_t>(bytesToIgnore, size)) { // part (or all) of this entry falls into the ignored prefix
+ XdcCatchStream.EraseFront(n);
+ bytesToIgnore -= n;
+ size -= n;
+ }
+ if (const size_t n = Min<size_t>(totalBytesToCatch, size)) { // the remainder is handed to the channel context
+ GetPerChannelContext(channel).DropFront(&XdcCatchStream, n);
+ channels.insert(channel);
+ totalBytesToCatch -= n;
+ size -= n;
+ }
+ Y_VERIFY(!size); // every markup entry must be fully accounted for
}
+ for (ui16 channel : channels) { // events of the touched channels may have become complete
+ ProcessEvents(GetPerChannelContext(channel));
+ }
+ // the stream, the ignore budget and the catch budget must all be exhausted together
+ // ensure everything was processed
+ Y_VERIFY(!XdcCatchStream);
+ Y_VERIFY(!bytesToIgnore);
+ Y_VERIFY(!totalBytesToCatch);
+ XdcCatchStreamMarkup = {};
}
+ // reports only whether phase 1 read something
+ return progress;
}
- void TInputSessionTCP::ReestablishConnection(TDisconnectReason reason) {
- LOG_DEBUG_IC_SESSION("ICIS09", "ReestablishConnection, reason# %s", reason.ToString().data());
- AtomicDecrement(Context->NumInputSessions);
- Send(SessionId, new TEvSocketDisconnect(std::move(reason)));
- PassAway();
- Socket.Reset();
+ bool TInputSessionTCP::ReadXdc(ui64 *numDataBytes) { // read XDC payload bytes straight into the buffers queued in XdcInputQ; returns true if any bytes were read
+ bool progress = ReadXdcCatchStream(numDataBytes);
+ // catch-stream bytes come first in the XDC stream, so regular reads wait until none are pending
+ // exit if we have no work to do
+ if (XdcInputQ.empty() || XdcCatchStreamBytesPending) {
+ return progress;
+ }
+ // gather read targets: at most 64 spans, stop after reaching 1 MiB, and only a single span when encryption is on
+ TStackVec<TIoVec, 64> buffs;
+ size_t size = 0;
+ for (auto& [channel, span] : XdcInputQ) {
+ buffs.push_back(TIoVec{span.data(), span.size()});
+ size += span.size();
+ if (buffs.size() == 64 || size >= 1024 * 1024 || Params.Encryption) {
+ break;
+ }
+ }
+ // Read() yields -1 when nothing could be read right now; fatal conditions are thrown by Read() itself
+ ssize_t recvres = Read(*XdcSocket, XdcPollerToken, &Context->XdcReadPending, buffs.data(), buffs.size());
+ if (recvres == -1) {
+ return progress;
+ }
+
+ // calculate stream checksums
+ {
+ size_t bytesToChecksum = recvres;
+ for (const auto& iov : buffs) { // walk the spans in order, covering exactly the received byte count
+ const size_t n = Min(bytesToChecksum, iov.Size);
+ HandleXdcChecksum({static_cast<const char*>(iov.Data), n});
+ bytesToChecksum -= n;
+ if (!bytesToChecksum) {
+ break;
+ }
+ }
+ }
+ // account the received bytes
+ Y_VERIFY(recvres > 0);
+ Metrics->AddTotalBytesRead(recvres);
+ *numDataBytes += recvres;
+ BytesReadFromXdcSocket += recvres;
+
+ // cut the XdcInputQ deque
+ for (size_t bytesToCut = recvres; bytesToCut; ) {
+ Y_VERIFY(!XdcInputQ.empty());
+ auto& [channel, span] = XdcInputQ.front();
+ auto& context = GetPerChannelContext(channel); // resolved before pop_front(): 'channel' is a reference into the front element
+ size_t n = Min(bytesToCut, span.size());
+ bytesToCut -= n;
+ if (n == span.size()) {
+ XdcInputQ.pop_front(); // destroys the element that 'channel' and 'span' refer to; neither may be used below
+ } else {
+ span = span.SubSpan(n, Max<size_t>()); // partially filled span: keep the unread tail at the queue front
+ Y_VERIFY(!bytesToCut);
+ }
+
+ Y_VERIFY_DEBUG(n);
+ context.DropFront(nullptr, n);
+ ProcessEvents(context);
+ }
+
+ // drop fully processed inbound packets
+ ProcessInboundPacketQ(recvres);
+
+ LastReceiveTimestamp = TActivationContext::Monotonic();
+
+ return true;
}
- void TInputSessionTCP::DestroySession(TDisconnectReason reason) {
- LOG_DEBUG_IC_SESSION("ICIS13", "DestroySession, reason# %s", reason.ToString().data());
- AtomicDecrement(Context->NumInputSessions);
- Send(SessionId, TInterconnectSessionTCP::NewEvTerminate(std::move(reason)));
- PassAway();
- Socket.Reset();
+ void TInputSessionTCP::HandleXdcChecksum(TContiguousSpan span) { // fold received XDC bytes into the running CRC and verify each completed chunk
+ if (Params.Encryption) { // nothing to verify: ProcessXdcCommand queues expected checksums only when encryption is off
+ return;
+ }
+ while (span.size()) { // a span may cover several queued chunks, and a chunk may arrive across several spans
+ Y_VERIFY_DEBUG(!XdcChecksumQ.empty()); // NOTE(review): checked in debug builds only; front() on an empty queue is not guarded otherwise
+ auto& [size, expected] = XdcChecksumQ.front(); // 'size' is the number of bytes of the front chunk not yet checksummed
+ const size_t n = Min<size_t>(size, span.size());
+ XdcCurrentChecksum = Crc32cExtendMSanCompatible(XdcCurrentChecksum, span.data(), n);
+ span = span.SubSpan(n, Max<size_t>());
+ size -= n; // updates the queue entry in place
+ if (!size) { // chunk complete: compare and restart the running checksum for the next chunk
+ if (XdcCurrentChecksum != expected) {
+ LOG_ERROR_IC_SESSION("ICIS16", "payload checksum error");
+ throw TExReestablishConnection{TDisconnectReason::ChecksumError()};
+ }
+ XdcChecksumQ.pop_front();
+ XdcCurrentChecksum = 0;
+ }
+ }
+ }
+
+ void TInputSessionTCP::PreallocateBuffers() { // top up the Buffers queue with freshly allocated receive buffers
+ // ensure that we have at least CurrentBuffers buffers in queue (surplus buffers are not released here)
+ LWPROBE_IF_TOO_LONG(SlowICReadLoopAdjustSize, ms) {
+ while (Buffers.size() < CurrentBuffers) {
+ Buffers.emplace_back(TRopeAlignedBuffer::Allocate(Common->Settings.PreallocatedBufferSize)); // each buffer has the configured PreallocatedBufferSize
+ }
+ }
}
void TInputSessionTCP::PassAway() {
@@ -460,7 +863,7 @@ namespace NActors {
ReceiveData();
if (Socket && now >= LastReceiveTimestamp + DeadPeerTimeout) {
// nothing has changed, terminate session
- DestroySession(TDisconnectReason::DeadPeer());
+ throw TExDestroySession{TDisconnectReason::DeadPeer()};
}
}
Schedule(LastReceiveTimestamp + DeadPeerTimeout, new TEvCheckDeadPeer);
@@ -496,5 +899,66 @@ namespace NActors {
Metrics->SetClockSkewMicrosec(clockSkew);
}
+ TReceiveContext::TPerChannelContext& TInputSessionTCP::GetPerChannelContext(ui16 channel) const { // look up the per-channel receive state for a channel number
+ return channel < std::size(Context->ChannelArray)
+ ? Context->ChannelArray[channel] // channel numbers below the array size index ChannelArray directly
+ : Context->ChannelMap[channel]; // all others go through ChannelMap; NOTE(review): if this is a standard map type, operator[] inserts a default entry on first use -- confirm
+ }
+
+ void TInputSessionTCP::GenerateHttpInfo(NMon::TEvHttpInfoRes::TPtr ev) { // append an "Input Session" counters panel to an HTTP info response and send the result on
+ TStringStream str;
+ ev->Get()->Output(str); // start from the content carried by the incoming response (presumably the page built so far -- confirm against the sender)
+ // render the panel with a two-column Sensor/Value table
+ HTML(str) {
+ DIV_CLASS("panel panel-info") {
+ DIV_CLASS("panel-heading") {
+ str << "Input Session";
+ }
+ DIV_CLASS("panel-body") {
+ TABLE_CLASS("table") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() {
+ str << "Sensor";
+ }
+ TABLEH() {
+ str << "Value";
+ }
+ }
+ }
+#define MON_VAR(KEY) \
+ TABLER() { \
+ TABLED() { str << #KEY; } \
+ TABLED() { str << (KEY); } \
+ }
+ // MON_VAR emits one row: the stringified expression and its current value; NOTE(review): no matching #undef is visible in this function
+ TABLEBODY() {
+ MON_VAR(BytesReadFromSocket)
+ MON_VAR(PacketsReadFromSocket)
+ MON_VAR(DataPacketsReadFromSocket)
+ MON_VAR(IgnoredDataPacketsFromSocket)
+ // XDC socket counters
+ MON_VAR(BytesReadFromXdcSocket)
+ MON_VAR(XdcSections)
+ MON_VAR(XdcRefs)
+ // current parser state and queue sizes
+ MON_VAR(PayloadSize)
+ MON_VAR(InboundPacketQ.size())
+ MON_VAR(XdcInputQ.size())
+ MON_VAR(Buffers.size())
+ MON_VAR(IncomingData.GetSize())
+ MON_VAR(Payload.GetSize())
+ MON_VAR(CurrentBuffers)
+ // serial numbers
+ MON_VAR(Context->LastProcessedSerial)
+ MON_VAR(ConfirmedByInput)
+ }
+ }
+ }
+ }
+ }
+ // send a new TEvHttpInfoRes with the extended page, reusing the Recipient and Sender of the incoming handle
+ TActivationContext::Send(new IEventHandle(ev->Recipient, ev->Sender, new NMon::TEvHttpInfoRes(str.Str())));
+ }
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 7ca2453f6c..13f2f2dd83 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -294,7 +294,7 @@ namespace NActors {
Send(ev->Sender, new TEvHandshakeReplyError("duplicate serial"));
return;
} else if (serial == *LastSerialFromIncomingHandshake) {
- LOG_NOTICE_IC("ICP15", "Handshake# %s is obsolete, serial# %" PRIu64
+ LOG_NOTICE_IC("ICP00", "Handshake# %s is obsolete, serial# %" PRIu64
" LastSerialFromIncomingHandshake# %" PRIu64, ev->Sender.ToString().data(),
serial, *LastSerialFromIncomingHandshake);
Send(ev->Sender, new TEvents::TEvPoisonPill);
@@ -731,11 +731,16 @@ namespace NActors {
}
}
- if (Session != nullptr) {
- Session->GenerateHttpInfo(str);
+ TAutoPtr<IEventHandle> h(new IEventHandle(ev->Sender, ev->Recipient, new NMon::TEvHttpInfoRes(str.Str())));
+ if (Session) {
+ switch (auto& ev = h; ev->GetTypeRewrite()) {
+ hFunc(NMon::TEvHttpInfoRes, Session->GenerateHttpInfo);
+ default:
+ Y_FAIL();
+ }
+ } else {
+ TActivationContext::Send(h.Release());
}
-
- Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str()));
}
void TInterconnectProxyTCP::TransitToErrorState(TString explanation, bool updateErrorLog) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index 352310e37c..9ea7fa0c31 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -456,7 +456,7 @@ namespace NActors {
ICPROXY_PROFILED;
if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) {
- LOG_DEBUG_IC("ICP112", "dropped outgoing handshake: %s poison: %s", actorId.ToString().data(),
+ LOG_DEBUG_IC("ICP052", "dropped outgoing handshake: %s poison: %s", actorId.ToString().data(),
poison ? "true" : "false");
if (poison) {
Send(actorId, new TEvents::TEvPoisonPill);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 16cadc9e92..e8fc974433 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -162,7 +162,7 @@ namespace NActors {
if (RamInQueue && !RamInQueue->Batching) {
// we have pending TEvRam, so GenerateTraffic will be called no matter what
- } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket || ReceiveContext->WriteBlockedByFullSendBuffer) {
+ } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket) {
// we can't issue more traffic now; GenerateTraffic will be called upon unblocking
} else if (TotalOutputQueueSize >= 64 * 1024) {
// output queue size is quite big to issue some traffic
@@ -203,7 +203,7 @@ namespace NActors {
// close existing input session, if any, and do nothing upon its destruction
ReestablishConnection({}, false, TDisconnectReason::NewSession());
- const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial();
+ const ui64 lastInputSerial = ReceiveContext->LockLastPacketSerialToConfirm();
LOG_INFO_IC_SESSION("ICS08", "incoming handshake Self# %s Peer# %s Counter# %" PRIu64 " LastInputSerial# %" PRIu64,
msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial);
@@ -240,10 +240,17 @@ namespace NActors {
LOG_INFO_IC_SESSION("ICS10", "traffic start");
+ // reset parameters to initial values
+ WriteBlockedByFullSendBuffer = false;
+ ReceiveContext->MainWriteBlocked = false;
+ ReceiveContext->XdcWriteBlocked = false;
+ ReceiveContext->MainReadPending = false;
+ ReceiveContext->XdcReadPending = false;
+
// create input session actor
auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, XdcSocket, ReceiveContext, Proxy->Common,
Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params);
- ReceiveContext->UnlockLastProcessedPacketSerial();
+ ReceiveContext->ResetLastPacketSerialToConfirm();
ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled);
// register our socket in poller actor
@@ -254,7 +261,6 @@ namespace NActors {
const bool success = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId()));
Y_VERIFY(success);
}
- ReceiveContext->WriteBlockedByFullSendBuffer = false;
LostConnectionWatchdog.Disarm();
Proxy->Metrics->SetConnected(1);
@@ -271,8 +277,11 @@ namespace NActors {
// also reset SendQueuePos
// drop confirmed packets first as we do not need unwanted retransmissions
+ OutgoingStream.RewindToEnd();
+ XdcStream.RewindToEnd();
DropConfirmed(nextPacket, true);
OutgoingStream.Rewind();
+ XdcStream.Rewind();
SendQueuePos = 0;
SendOffset = 0;
@@ -342,6 +351,7 @@ namespace NActors {
if (needConfirm && Socket) {
++ConfirmPacketsForcedBySize;
MakePacket(false);
+ WriteData();
}
for (;;) {
@@ -409,24 +419,26 @@ namespace NActors {
// first, we create as many data packets as we can generate under certain conditions; they include presence
// of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions
// we exit cycle
- while (Socket && NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && !ReceiveContext->WriteBlockedByFullSendBuffer) {
- if (generatedBytes >= generateLimit) {
- // resume later but ensure that we have issued at least one packet
- RamInQueue = new TEvRam(false);
- Send(SelfId(), RamInQueue);
- RamStartedCycles = GetCycleCountFast();
- LWPROBE(StartRam, Proxy->PeerNodeId);
- break;
- }
+ if (Socket) {
+ while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData()) {
+ if (generatedBytes >= generateLimit) {
+ // resume later but ensure that we have issued at least one packet
+ RamInQueue = new TEvRam(false);
+ Send(SelfId(), RamInQueue);
+ RamStartedCycles = GetCycleCountFast();
+ LWPROBE(StartRam, Proxy->PeerNodeId);
+ break;
+ }
- try {
- generatedBytes += MakePacket(true);
- ++generatedPackets;
- } catch (const TExSerializedEventTooLarge& ex) {
- // terminate session if the event can't be serialized properly
- accountTraffic();
- LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type);
- return Terminate(TDisconnectReason::EventTooLarge());
+ try {
+ generatedBytes += MakePacket(true);
+ ++generatedPackets;
+ } catch (const TExSerializedEventTooLarge& ex) {
+ // terminate session if the event can't be serialized properly
+ accountTraffic();
+ LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type);
+ return Terminate(TDisconnectReason::EventTooLarge());
+ }
}
}
@@ -444,7 +456,7 @@ namespace NActors {
void TInterconnectSessionTCP::StartHandshake() {
LOG_INFO_IC_SESSION("ICS15", "start handshake");
- IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial());
+ IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastPacketSerialToConfirm());
}
void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) {
@@ -515,161 +527,179 @@ namespace NActors {
void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) {
LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s",
- ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false");
- if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) {
+ WriteBlockedByFullSendBuffer ? "true" : "false");
+
+ auto *msg = ev->Get();
+ bool useful = false;
+ bool readPending = false;
+
+ if (msg->Socket == Socket) {
+ useful = ReceiveContext->MainWriteBlocked;
+ readPending = ReceiveContext->MainReadPending;
+ } else if (msg->Socket == XdcSocket) {
+ useful = ReceiveContext->XdcWriteBlocked;
+ readPending = ReceiveContext->XdcReadPending;
+ }
+
+ if (useful) {
Proxy->Metrics->IncUsefulWriteWakeups();
- ui64 nowCycles = GetCycleCountFast();
- double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0;
- LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0);
- WriteBlockedTotal += TDuration::MicroSeconds(blockedUs);
- GenerateTraffic();
} else if (!ev->Cookie) {
Proxy->Metrics->IncSpuriousWriteWakeups();
}
- if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) {
- Send(ReceiverId, ev->Release().Release(), 0, 1);
+
+ GenerateTraffic();
+
+ if (Params.Encryption && readPending && ev->Sender != ReceiverId) {
+ Send(ReceiverId, ev->Release().Release());
}
}
void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
- const auto& sk = ev->Get()->Socket;
- if (auto *token = sk == Socket ? &PollerToken : sk == XdcSocket ? &XdcPollerToken : nullptr) {
- *token = std::move(ev->Get()->PollerToken);
- } else {
- return;
- }
+ auto *msg = ev->Get();
- if (ReceiveContext->WriteBlockedByFullSendBuffer) {
- if (Params.Encryption) {
- auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
- PollerToken->Request(secure->WantRead(), secure->WantWrite());
- } else {
- PollerToken->Request(false, true);
+ if (msg->Socket == Socket) {
+ PollerToken = std::move(msg->PollerToken);
+ if (ReceiveContext->MainWriteBlocked) {
+ Socket->Request(*PollerToken, false, true);
+ }
+ } else if (msg->Socket == XdcSocket) {
+ XdcPollerToken = std::move(msg->PollerToken);
+ if (ReceiveContext->XdcWriteBlocked) {
+ XdcSocket->Request(*XdcPollerToken, false, true);
}
}
}
void TInterconnectSessionTCP::WriteData() {
+ Y_VERIFY(Socket); // ensure that socket wasn't closed
+
+ // total bytes written during this call
ui64 written = 0;
- Y_VERIFY(Socket); // ensure that socket wasn't closed
+ auto process = [&](NInterconnect::TOutgoingStream& stream, const TIntrusivePtr<NInterconnect::TStreamSocket>& socket,
+ const TPollerToken::TPtr& token, bool *writeBlocked, auto&& callback) {
+ while (stream && socket) {
+ ssize_t r = Write(stream, *socket);
+ if (r == -1) {
+ *writeBlocked = true;
+ if (token) {
+ socket->Request(*token, false, true);
+ }
+ break;
+ } else if (r == 0) {
+ break; // error condition
+ }
- LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
- constexpr ui32 iovLimit = 256;
-#ifdef _linux_
- ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX));
-#else
- ui32 maxElementsInIOV = 64;
-#endif
- if (Params.Encryption) {
- maxElementsInIOV = 1;
+ *writeBlocked = false;
+ written += r;
+ callback(r);
+ }
+ if (!socket) {
+ *writeBlocked = true;
+ } else if (!stream) {
+ *writeBlocked = false;
}
+ };
- // vector of write buffers with preallocated stack space
- TStackVec<TConstIoVec, iovLimit> wbuffers;
+ Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
+
+ process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, [&](size_t r) {
+ Y_VERIFY(r <= BytesUnwritten);
+ BytesUnwritten -= r;
- LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu",
- ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size());
+ OutgoingStream.Advance(r);
- auto calculateUnsentQueueSize = [&] {
- size_t res = -SendOffset;
- for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end(); ++it) {
- res += it->PacketSize;
+ ui64 packets = 0;
+ Y_VERIFY_DEBUG(SendQueuePos != SendQueue.size());
+ SendOffset += r;
+ for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->PacketSize <= SendOffset; ++SendQueuePos, ++it) {
+ SendOffset -= it->PacketSize;
+ Y_VERIFY_DEBUG(!it->Data || it->Serial <= OutputCounter);
+ if (it->Data && LastSentSerial < it->Serial) {
+ LastSentSerial = it->Serial;
}
- return res;
- };
+ ++PacketsWrittenToSocket;
+ ++packets;
+ Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1);
+ }
- while (!ReceiveContext->WriteBlockedByFullSendBuffer) {
- Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
- Y_VERIFY_DEBUG(BytesUnwritten == calculateUnsentQueueSize());
+ Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
+ });
- OutgoingStream.ProduceIoVec(wbuffers, maxElementsInIOV);
- if (!wbuffers) { // done sending
- break;
- }
+ process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, [&](size_t r) {
+ XdcBytesSent += r;
+ XdcStream.Advance(r);
+ });
- const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data());
- int iovcnt = wbuffers.size();
+ if (written) {
+ Proxy->Metrics->AddTotalBytesWritten(written);
+ }
- Y_VERIFY(iovcnt > 0);
- Y_VERIFY(iovec->iov_len > 0);
+ const bool writeBlockedByFullSendBuffer = ReceiveContext->MainWriteBlocked || ReceiveContext->XdcWriteBlocked;
+ if (WriteBlockedByFullSendBuffer < writeBlockedByFullSendBuffer) { // became blocked
+ WriteBlockedCycles = GetCycleCountFast();
+ LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit");
+ } else if (writeBlockedByFullSendBuffer < WriteBlockedByFullSendBuffer) { // became unblocked
+ WriteBlockedTotal += TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles));
+ }
+ WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer;
+ }
- TString err;
- ssize_t r = 0;
- do {
- const ui64 begin = GetCycleCountFast();
-#ifndef _win_
- r = iovcnt == 1 ? Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err) : Socket->WriteV(iovec, iovcnt);
+ ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket) {
+ LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) {
+ constexpr ui32 iovLimit = 256;
+
+ ui32 maxElementsInIOV;
+ if (Params.Encryption) {
+ maxElementsInIOV = 1;
+ } else {
+#if defined(_win_)
+ maxElementsInIOV = 1;
+#elif defined(_linux_)
+ maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX));
#else
- r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err);
+ maxElementsInIOV = 64;
#endif
- const ui64 end = GetCycleCountFast();
- Proxy->Metrics->IncSendSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond());
- } while (r == -EINTR);
-
- LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data());
-
- wbuffers.clear();
+ }
- if (r > 0) {
- written += r;
+ TStackVec<TConstIoVec, iovLimit> wbuffers;
- Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten);
- BytesUnwritten -= r;
+ stream.ProduceIoVec(wbuffers, maxElementsInIOV);
+ Y_VERIFY(!wbuffers.empty());
- OutgoingStream.Advance(r);
+ TString err;
+ ssize_t r = 0;
+ { // issue syscall with timing
+ const ui64 begin = GetCycleCountFast();
- ui64 packets = 0;
- Y_VERIFY_DEBUG(SendQueuePos != SendQueue.size());
- SendOffset += r;
- for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->PacketSize <= SendOffset; ++SendQueuePos, ++it) {
- SendOffset -= it->PacketSize;
- Y_VERIFY_DEBUG(!it->Data || it->Serial <= OutputCounter);
- if (it->Data && LastSentSerial < it->Serial) {
- LastSentSerial = it->Serial;
- }
- ++PacketsWrittenToSocket;
- ++packets;
- Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1);
-// LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, false, SendQueuePos->PacketSize, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
+ do {
+ if (wbuffers.size() == 1) {
+ auto& front = wbuffers.front();
+ r = socket.Send(front.Data, front.Size, &err);
+ } else {
+ r = socket.WriteV(reinterpret_cast<const iovec*>(wbuffers.data()), wbuffers.size());
}
+ } while (r == -EINTR);
- Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize());
- Y_VERIFY_DEBUG(BytesUnwritten == calculateUnsentQueueSize());
-
- LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket);
- } else if (-r != EAGAIN && -r != EWOULDBLOCK) {
- const TString message = r == 0 ? "connection closed by peer"
- : err ? err
- : Sprintf("socket: %s", strerror(-r));
- LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data());
- if (written) {
- Proxy->Metrics->AddTotalBytesWritten(written);
- }
- return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r));
- } else {
- // we have received EAGAIN error code, this means that we can't issue more data until we have received
- // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event
- Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer);
- ReceiveContext->WriteBlockedByFullSendBuffer = true;
- WriteBlockedCycles = GetCycleCountFast();
- LWPROBE(BlockedWrite, Proxy->PeerNodeId, SendQueue.size(), written);
- LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit");
-
- if (PollerToken) {
- if (Params.Encryption) {
- auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get());
- PollerToken->Request(secure->WantRead(), secure->WantWrite());
- } else {
- PollerToken->Request(false, true);
- }
- }
- }
+ const ui64 end = GetCycleCountFast();
+ Proxy->Metrics->IncSendSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond());
+ }
+
+ if (r > 0) {
+ return r;
+ } else if (-r != EAGAIN && -r != EWOULDBLOCK) {
+ const TString message = r == 0 ? "connection closed by peer"
+ : err ? err
+ : Sprintf("socket: %s", strerror(-r));
+ LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data());
+ ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r));
+ return 0; // error indicator
+ } else {
+ return -1; // temporary error
}
}
- if (written) {
- Proxy->Metrics->AddTotalBytesWritten(written);
- }
+
+ Y_UNREACHABLE();
}
void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) {
@@ -702,6 +732,7 @@ namespace NActors {
++ConfirmPacketsForcedByTimeout;
++FlushEventsProcessed;
MakePacket(false); // just generate confirmation packet if we have preconditions for this
+ WriteData();
} else if (ForcePacketTimestamp != TMonotonic::Max()) {
ScheduleFlush();
}
@@ -718,9 +749,12 @@ namespace NActors {
}
ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) {
- Y_VERIFY(Socket);
+#ifndef NDEBUG
+ const size_t outgoingStreamSizeBefore = OutgoingStream.CalculateOutgoingSize();
+ const size_t xdcStreamSizeBefore = XdcStream.CalculateOutgoingSize();
+#endif
- TTcpPacketOutTask packet(Params, OutgoingStream);
+ TTcpPacketOutTask packet(Params, OutgoingStream, XdcStream);
ui64 serial = 0;
if (data) {
@@ -751,19 +785,36 @@ namespace NActors {
serial = *pingMask;
}
- const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial();
+ const ui64 lastInputSerial = ReceiveContext->GetLastPacketSerialToConfirm();
packet.Finish(serial, lastInputSerial);
// count number of bytes pending for write
- const size_t packetSize = packet.GetFullSize();
+ const size_t packetSize = packet.GetPacketSize();
BytesUnwritten += packetSize;
Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize(), "%s", TString(TStringBuilder()
<< "BytesUnwritten# " << BytesUnwritten << " packetSize# " << packetSize
<< " CalculateUnsentSize# " << OutgoingStream.CalculateUnsentSize()).data());
+#ifndef NDEBUG
+ const size_t outgoingStreamSizeAfter = OutgoingStream.CalculateOutgoingSize();
+ const size_t xdcStreamSizeAfter = XdcStream.CalculateOutgoingSize();
+
+ Y_VERIFY(outgoingStreamSizeAfter == outgoingStreamSizeBefore + packetSize &&
+ xdcStreamSizeAfter == xdcStreamSizeBefore + packet.GetExternalSize(),
+ "outgoingStreamSizeBefore# %zu outgoingStreamSizeAfter# %zu packetSize# %zu"
+ " xdcStreamSizeBefore# %zu xdcStreamSizeAfter# %zu externalSize# %zu",
+ outgoingStreamSizeBefore, outgoingStreamSizeAfter, packetSize,
+ xdcStreamSizeBefore, xdcStreamSizeAfter, packet.GetExternalSize());
+#endif
+
// put outgoing packet metadata here
- SendQueue.push_back(TOutgoingPacket{packetSize, serial, data});
+ SendQueue.push_back(TOutgoingPacket{
+ static_cast<ui32>(packetSize),
+ static_cast<ui32>(packet.GetExternalSize()),
+ serial,
+ data
+ });
LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu"
" InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(),
@@ -773,11 +824,6 @@ namespace NActors {
ResetFlushLogic();
++PacketsGenerated;
- LWTRACK(PacketGenerated, packet.Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize);
-
- if (!data) {
- WriteData();
- }
return packetSize;
}
@@ -790,7 +836,6 @@ namespace NActors {
LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial);
LastConfirmed = confirm;
- ui64 droppedDataAmount = 0;
std::optional<ui64> lastDroppedSerial = 0;
ui32 numDropped = 0;
@@ -814,6 +859,7 @@ namespace NActors {
// drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively
// making Serial <= confirm true
size_t bytesDropped = 0;
+ size_t bytesDroppedFromXdc = 0;
for (; !SendQueue.empty(); SendQueue.pop_front(), --SendQueuePos) {
auto& front = SendQueue.front();
if (front.Data && confirm < front.Serial) {
@@ -822,12 +868,14 @@ namespace NActors {
if (front.Data) {
lastDroppedSerial.emplace(front.Serial);
}
- droppedDataAmount += front.PacketSize - sizeof(TTcpPacketHeader_v2);
bytesDropped += front.PacketSize;
+ bytesDroppedFromXdc += front.ExternalSize;
++numDropped;
Y_VERIFY_DEBUG(ignoreSendQueuePos || SendQueuePos != 0);
}
+ const ui64 droppedDataAmount = bytesDropped + bytesDroppedFromXdc - sizeof(TTcpPacketHeader_v2) * numDropped;
OutgoingStream.DropFront(bytesDropped);
+ XdcStream.DropFront(bytesDroppedFromXdc);
if (lastDroppedSerial) {
ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
channel.DropConfirmed(*lastDroppedSerial);
@@ -894,7 +942,6 @@ namespace NActors {
}
}
- LWTRACK(FillSendingBuffer, task.Orbit, Proxy->PeerNodeId, bytesGenerated, NumEventsInReadyChannels, WriteBlockedTotal);
Y_VERIFY(bytesGenerated); // ensure we are not stalled in serialization
}
@@ -1009,12 +1056,11 @@ namespace NActors {
void TInterconnectSessionTCP::IssuePingRequest() {
const TMonotonic now = TActivationContext::Monotonic();
if (now >= LastPingTimestamp + PingPeriodicity) {
- LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request");
+ LOG_DEBUG_IC_SESSION("ICS00", "Issuing ping request");
if (Socket) {
MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask);
- }
- if (Socket) {
MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask);
+ WriteData();
}
LastPingTimestamp = now;
}
@@ -1023,10 +1069,14 @@ namespace NActors {
void TInterconnectSessionTCP::Handle(TEvProcessPingRequest::TPtr ev) {
if (Socket) {
MakePacket(false, ev->Get()->Payload | TTcpPacketBuf::PingResponseMask);
+ WriteData();
}
}
- void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) {
+ void TInterconnectSessionTCP::GenerateHttpInfo(NMon::TEvHttpInfoRes::TPtr& ev) {
+ TStringStream str;
+ ev->Get()->Output(str);
+
HTML(str) {
DIV_CLASS("panel panel-info") {
DIV_CLASS("panel-heading") {
@@ -1146,7 +1196,6 @@ namespace NActors {
MON_VAR(PacketsGenerated)
MON_VAR(PacketsWrittenToSocket)
MON_VAR(PacketsConfirmed)
- MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket))
MON_VAR(ConfirmPacketsForcedBySize)
MON_VAR(ConfirmPacketsForcedByTimeout)
@@ -1200,13 +1249,24 @@ namespace NActors {
MON_VAR(LastHandshakeDone)
MON_VAR(OutputCounter)
MON_VAR(LastSentSerial)
- MON_VAR(ReceiveContext->GetLastProcessedPacketSerial())
MON_VAR(LastConfirmed)
MON_VAR(FlushSchedule.size())
MON_VAR(MaxFlushSchedule)
MON_VAR(FlushEventsScheduled)
MON_VAR(FlushEventsProcessed)
+ MON_VAR(GetWriteBlockedTotal())
+
+ MON_VAR(XdcBytesSent)
+
+ MON_VAR(OutgoingStream.CalculateOutgoingSize())
+ MON_VAR(OutgoingStream.CalculateUnsentSize())
+ MON_VAR(OutgoingStream.GetSendQueueSize())
+
+ MON_VAR(XdcStream.CalculateOutgoingSize())
+ MON_VAR(XdcStream.CalculateUnsentSize())
+ MON_VAR(XdcStream.GetSendQueueSize())
+
TString clockSkew;
i64 x = GetClockSkew();
if (x < 0) {
@@ -1228,6 +1288,12 @@ namespace NActors {
}
}
}
+
+ auto h = std::make_unique<IEventHandle>(ev->Recipient, ev->Sender, new NMon::TEvHttpInfoRes(str.Str()));
+ if (ReceiverId) {
+ h->Rewrite(h->Type, ReceiverId);
+ }
+ TActivationContext::Send(h.release());
}
void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 48fa39b273..1437f7df2c 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -99,13 +99,9 @@ namespace NActors {
ui64 ControlPacketSendTimer = 0;
ui64 ControlPacketId = 0;
- // number of packets received by input session
- TAtomic PacketsReadFromSocket = 0;
- TAtomic DataPacketsReadFromSocket = 0;
-
// last processed packet by input session
- std::atomic_uint64_t LastProcessedPacketSerial = 0;
- static constexpr uint64_t LastProcessedPacketSerialLockBit = uint64_t(1) << 63;
+ std::atomic_uint64_t LastPacketSerialToConfirm = 0;
+ static constexpr uint64_t LastPacketSerialToConfirmLockBit = uint64_t(1) << 63;
// for hardened checks
TAtomic NumInputSessions = 0;
@@ -118,47 +114,72 @@ namespace NActors {
std::atomic<EUpdateState> UpdateState;
static_assert(std::atomic<EUpdateState>::is_always_lock_free);
- bool WriteBlockedByFullSendBuffer = false;
- bool ReadPending = false;
+ bool MainWriteBlocked = false;
+ bool XdcWriteBlocked = false;
+ bool MainReadPending = false;
+ bool XdcReadPending = false;
+
+ struct TPerChannelContext {
+ struct TPendingEvent {
+ TEventSerializationInfo SerializationInfo;
+ TRope Payload;
+ std::optional<TEventData> EventData;
+
+ // number of bytes remaining through XDC channel
+ size_t XdcSizeLeft = 0;
+ };
+
+ std::deque<TPendingEvent> PendingEvents;
+ std::deque<TMutableContiguousSpan> XdcBuffers; // receive queue for current channel
+ size_t FetchIndex = 0;
+ size_t FetchOffset = 0;
+ size_t XdcBytesToCatch = 0; // number of bytes to catch after reconnect
+
+ void CalculateBytesToCatch();
+ void FetchBuffers(ui16 channel, size_t numBytes, std::deque<std::tuple<ui16, TMutableContiguousSpan>>& outQ);
+ void DropFront(TRope *from, size_t numBytes);
+ };
- std::array<TRope, 16> ChannelArray;
- std::unordered_map<ui16, TRope> ChannelMap;
+ std::array<TPerChannelContext, 16> ChannelArray;
+ std::unordered_map<ui16, TPerChannelContext> ChannelMap;
+ ui64 LastProcessedSerial = 0;
TReceiveContext() {
GetTimeFast(&StartTime);
}
- // returns false if sessions needs to be terminated and packet not to be processed
- bool AdvanceLastProcessedPacketSerial() {
+ // returns false if sessions needs to be terminated
+ bool AdvanceLastPacketSerialToConfirm(ui64 nextValue) {
for (;;) {
- uint64_t value = LastProcessedPacketSerial.load();
- if (value & LastProcessedPacketSerialLockBit) {
+ uint64_t value = LastPacketSerialToConfirm.load();
+ if (value & LastPacketSerialToConfirmLockBit) {
return false;
}
- if (LastProcessedPacketSerial.compare_exchange_weak(value, value + 1)) {
+ Y_VERIFY_DEBUG(value + 1 == nextValue);
+ if (LastPacketSerialToConfirm.compare_exchange_weak(value, nextValue)) {
return true;
}
}
}
- ui64 LockLastProcessedPacketSerial() {
+ ui64 LockLastPacketSerialToConfirm() {
for (;;) {
- uint64_t value = LastProcessedPacketSerial.load();
- if (value & LastProcessedPacketSerialLockBit) {
- return value & ~LastProcessedPacketSerialLockBit;
+ uint64_t value = LastPacketSerialToConfirm.load();
+ if (value & LastPacketSerialToConfirmLockBit) {
+ return value & ~LastPacketSerialToConfirmLockBit;
}
- if (LastProcessedPacketSerial.compare_exchange_strong(value, value | LastProcessedPacketSerialLockBit)) {
+ if (LastPacketSerialToConfirm.compare_exchange_strong(value, value | LastPacketSerialToConfirmLockBit)) {
return value;
}
}
}
- void UnlockLastProcessedPacketSerial() {
- LastProcessedPacketSerial = LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit;
+ void ResetLastPacketSerialToConfirm() {
+ LastPacketSerialToConfirm = LastProcessedSerial;
}
- ui64 GetLastProcessedPacketSerial() {
- return LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit;
+ ui64 GetLastPacketSerialToConfirm() {
+ return LastPacketSerialToConfirm.load() & ~LastPacketSerialToConfirmLockBit;
}
};
@@ -172,7 +193,6 @@ namespace NActors {
};
struct TEvCheckDeadPeer : TEventLocal<TEvCheckDeadPeer, EvCheckDeadPeer> {};
- struct TEvResumeReceiveData : TEventLocal<TEvResumeReceiveData, EvResumeReceiveData> {};
public:
static constexpr EActivityType ActorActivityType() {
@@ -195,14 +215,25 @@ namespace NActors {
void Bootstrap();
- STRICT_STFUNC(WorkingState,
+ struct TExReestablishConnection {
+ TDisconnectReason Reason;
+ };
+
+ struct TExDestroySession {
+ TDisconnectReason Reason;
+ };
+
+ STATEFN(WorkingState);
+
+ STRICT_STFUNC(WorkingStateImpl,
cFunc(TEvents::TSystem::PoisonPill, PassAway)
hFunc(TEvPollerReady, Handle)
hFunc(TEvPollerRegisterResult, Handle)
- cFunc(EvResumeReceiveData, HandleResumeReceiveData)
+ cFunc(EvResumeReceiveData, ReceiveData)
cFunc(TEvInterconnect::TEvCloseInputSession::EventType, CloseInputSession)
cFunc(EvCheckDeadPeer, HandleCheckDeadPeer)
cFunc(TEvConfirmUpdate::EventType, HandleConfirmUpdate)
+ hFunc(NMon::TEvHttpInfoRes, GenerateHttpInfo)
)
private:
@@ -228,9 +259,28 @@ namespace NActors {
};
EState State = EState::HEADER;
+ std::vector<char> XdcCommands;
+
+ struct TInboundPacket {
+ ui64 Serial;
+ size_t XdcUnreadBytes; // number of unread bytes from XDC stream for this exact unprocessed packet
+ };
+ std::deque<TInboundPacket> InboundPacketQ;
+ std::deque<std::tuple<ui16, TMutableContiguousSpan>> XdcInputQ; // target buffers for the XDC stream with channel reference
+
+ // catch stream -- used after TCP reconnect to match unread XDC stream with main packet stream
+ TRope XdcCatchStream; // temporary data buffer to process XDC stream retransmission upon reconnect
+ TRcBuf XdcCatchStreamBuffer;
+ size_t XdcCatchStreamBufferOffset = 0;
+ ui64 XdcCatchStreamBytesPending = 0;
+ std::deque<std::tuple<ui16, ui16>> XdcCatchStreamMarkup; // a queue of pairs (channel, bytes)
+ std::deque<std::tuple<ui16, ui32>> XdcChecksumQ; // (size, expectedChecksum)
+ ui32 XdcCurrentChecksum = 0;
+ bool XdcCatchStreamFinal = false;
+ bool XdcCatchStreamFinalPending = false;
+
THolder<TEvUpdateFromInputSession> UpdateFromInputSession;
- std::optional<ui64> LastReceivedSerial;
ui64 ConfirmedByInput;
std::shared_ptr<IInterconnectMetrics> Metrics;
@@ -241,21 +291,31 @@ namespace NActors {
void Handle(TEvPollerReady::TPtr ev);
void Handle(TEvPollerRegisterResult::TPtr ev);
- void HandleResumeReceiveData();
void HandleConfirmUpdate();
void ReceiveData();
void ProcessHeader();
- void ProcessPayload(ui64& numDataBytes);
- void ProcessEvent(TRope& data, TEventData& descr);
+ void ProcessPayload(ui64 *numDataBytes);
+ void ProcessInboundPacketQ(size_t numXdcBytesRead);
+ void ProcessXdcCommand(ui16 channel, TReceiveContext::TPerChannelContext& context);
+ void ProcessEvents(TReceiveContext::TPerChannelContext& context);
+ ssize_t Read(NInterconnect::TStreamSocket& socket, const TPollerToken::TPtr& token, bool *readPending,
+ const TIoVec *iov, size_t num);
bool ReadMore();
+ bool ReadXdcCatchStream(ui64 *numDataBytes);
+ bool ReadXdc(ui64 *numDataBytes);
+ void HandleXdcChecksum(TContiguousSpan span);
+
+ TReceiveContext::TPerChannelContext& GetPerChannelContext(ui16 channel) const;
- void ReestablishConnection(TDisconnectReason reason);
- void DestroySession(TDisconnectReason reason);
void PassAway() override;
TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers;
size_t FirstBufferOffset = 0;
+ size_t CurrentBuffers = 1; // number of buffers currently required to allocate
+ static constexpr size_t MaxBuffers = 64; // maximum buffers possible
+ std::array<ui8, MaxBuffers> UsageHisto; // read count histogram
+
void PreallocateBuffers();
inline ui64 GetMaxCyclesPerEvent() const {
@@ -275,6 +335,20 @@ namespace NActors {
void HandlePingResponse(TDuration passed);
void HandleClock(TInstant clock);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Stats
+
+ ui64 BytesReadFromSocket = 0;
+ ui64 PacketsReadFromSocket = 0;
+ ui64 DataPacketsReadFromSocket = 0;
+ ui64 IgnoredDataPacketsFromSocket = 0;
+
+ ui64 BytesReadFromXdcSocket = 0;
+ ui64 XdcSections = 0;
+ ui64 XdcRefs = 0;
+
+ void GenerateHttpInfo(NMon::TEvHttpInfoRes::TPtr ev);
};
class TInterconnectSessionTCP
@@ -385,6 +459,7 @@ namespace NActors {
void Handle(TEvPollerReady::TPtr& ev);
void Handle(TEvPollerRegisterResult::TPtr ev);
void WriteData();
+ ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket);
ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {});
void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial);
@@ -446,9 +521,11 @@ namespace NActors {
void SwitchStuckPeriod();
NInterconnect::TOutgoingStream OutgoingStream;
+ NInterconnect::TOutgoingStream XdcStream;
struct TOutgoingPacket {
- size_t PacketSize;
+ ui32 PacketSize; // including header
+ ui32 ExternalSize;
ui64 Serial;
bool Data;
};
@@ -456,17 +533,18 @@ namespace NActors {
size_t SendQueuePos = 0; // packet being sent now
size_t SendOffset = 0;
+ ui64 XdcBytesSent = 0;
+
ui64 WriteBlockedCycles = 0; // start of current block period
TDuration WriteBlockedTotal; // total incremental duration that session has been blocked
- ui64 BytesUnwritten = 0;
+ bool WriteBlockedByFullSendBuffer = false;
+
+ ui64 BytesUnwritten = 0; // number of bytes in outgoing main queue
TDuration GetWriteBlockedTotal() const {
- if (ReceiveContext->WriteBlockedByFullSendBuffer) {
- double blockedUs = NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles) * 1000000.0;
- return WriteBlockedTotal + TDuration::MicroSeconds(blockedUs); // append current blocking period if any
- } else {
- return WriteBlockedTotal;
- }
+ return WriteBlockedTotal + (WriteBlockedByFullSendBuffer
+ ? TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles))
+ : TDuration::Zero());
}
ui64 OutputCounter;
@@ -496,7 +574,7 @@ namespace NActors {
void HandleFlush();
void ResetFlushLogic();
- void GenerateHttpInfo(TStringStream& str);
+ void GenerateHttpInfo(NMon::TEvHttpInfoRes::TPtr& ev);
TIntrusivePtr<TReceiveContext> ReceiveContext;
TActorId ReceiverId;
diff --git a/library/cpp/actors/interconnect/outgoing_stream.h b/library/cpp/actors/interconnect/outgoing_stream.h
index 0306295b52..deffd1d93b 100644
--- a/library/cpp/actors/interconnect/outgoing_stream.h
+++ b/library/cpp/actors/interconnect/outgoing_stream.h
@@ -38,6 +38,10 @@ namespace NInterconnect {
size_t SendOffset = 0;
public:
+ operator bool() const {
+ return SendQueuePos != SendQueue.size();
+ }
+
size_t CalculateOutgoingSize() const {
size_t res = 0;
for (const TSendChunk& chunk : SendQueue) {
@@ -54,8 +58,12 @@ namespace NInterconnect {
return res - SendOffset;
}
+ size_t GetSendQueueSize() const {
+ return SendQueue.size();
+ }
+
TMutableContiguousSpan AcquireSpanForWriting(size_t maxLen) {
- if (AppendOffset == BufferSize) { // we have no free buffer, allocate one
+ if (maxLen && AppendOffset == BufferSize) { // we have no free buffer, allocate one
Buffers.emplace_back(static_cast<TBuffer*>(malloc(sizeof(TBuffer))));
AppendBuffer = Buffers.back().get();
Y_VERIFY(AppendBuffer);
@@ -67,14 +75,8 @@ namespace NInterconnect {
}
void Append(TContiguousSpan span) {
- TBuffer *buffer = nullptr;
if (AppendBuffer && span.data() == AppendBuffer->Data + AppendOffset) { // the only valid case to use previously acquired span
- buffer = AppendBuffer;
- AppendOffset += span.size();
- Y_VERIFY_DEBUG(AppendOffset <= BufferSize);
- if (AppendOffset != BufferSize) {
- ++buffer->RefCount;
- }
+ AppendAcquiredSpan(span);
} else {
#ifndef NDEBUG
// ensure this span does not point into any existing buffer part
@@ -88,33 +90,15 @@ namespace NInterconnect {
}
}
#endif
+ AppendSpanWithGlueing(span, nullptr);
}
-
- if (!SendQueue.empty()) {
- auto& back = SendQueue.back();
- if (back.Span.data() + back.Span.size() == span.data()) { // check if it is possible just to extend the last span
- if (SendQueuePos == SendQueue.size()) {
- --SendQueuePos;
- SendOffset = back.Span.size();
- }
- back.Span = {back.Span.data(), back.Span.size() + span.size()};
- DropBufferReference(buffer);
- return;
- }
- }
-
- if (buffer) {
- ++buffer->RefCount;
- }
- SendQueue.push_back(TSendChunk{span, buffer});
- DropBufferReference(buffer);
}
void Write(TContiguousSpan in) {
while (in.size()) {
auto outChunk = AcquireSpanForWriting(in.size());
- Append(outChunk);
memcpy(outChunk.data(), in.data(), outChunk.size());
+ AppendAcquiredSpan(outChunk);
in = in.SubSpan(outChunk.size(), Max<size_t>());
}
}
@@ -126,7 +110,7 @@ namespace NInterconnect {
while (len) {
const auto span = AcquireSpanForWriting(len);
- Append(span);
+ AppendAcquiredSpan(span);
bookmark.push_back(span);
len -= span.size();
}
@@ -136,7 +120,7 @@ namespace NInterconnect {
void WriteBookmark(TBookmark&& bookmark, TContiguousSpan in) {
for (auto& outChunk : bookmark) {
- Y_VERIFY(outChunk.size() <= in.size());
+ Y_VERIFY_DEBUG(outChunk.size() <= in.size());
memcpy(outChunk.data(), in.data(), outChunk.size());
in = in.SubSpan(outChunk.size(), Max<size_t>());
}
@@ -147,6 +131,11 @@ namespace NInterconnect {
SendOffset = 0;
}
+ void RewindToEnd() {
+ SendQueuePos = SendQueue.size();
+ SendOffset = 0;
+ }
+
template<typename T>
void ProduceIoVec(T& container, size_t maxItems) {
size_t offset = SendOffset;
@@ -173,7 +162,8 @@ namespace NInterconnect {
if (numBytes < front.Span.size()) {
front.Span = front.Span.SubSpan(numBytes, Max<size_t>());
if (SendQueuePos == 0) {
- Y_VERIFY_DEBUG(numBytes <= SendOffset);
+ Y_VERIFY_DEBUG(numBytes <= SendOffset, "numBytes# %zu SendOffset# %zu SendQueuePos# %zu"
+ " SendQueue.size# %zu", numBytes, SendOffset, SendQueuePos, SendQueue.size());
SendOffset -= numBytes;
}
break;
@@ -207,6 +197,37 @@ namespace NInterconnect {
}
private:
+ void AppendAcquiredSpan(TContiguousSpan span) {
+ TBuffer *buffer = AppendBuffer;
+ Y_VERIFY_DEBUG(buffer);
+ Y_VERIFY_DEBUG(span.data() == AppendBuffer->Data + AppendOffset);
+ AppendOffset += span.size();
+ Y_VERIFY_DEBUG(AppendOffset <= BufferSize);
+ if (AppendOffset == BufferSize) {
+ AppendBuffer = nullptr;
+ } else {
+ ++buffer->RefCount;
+ }
+ AppendSpanWithGlueing(span, buffer);
+ }
+
+ void AppendSpanWithGlueing(TContiguousSpan span, TBuffer *buffer) {
+ if (!SendQueue.empty()) {
+ auto& back = SendQueue.back();
+ if (back.Span.data() + back.Span.size() == span.data()) { // check if it is possible just to extend the last span
+ Y_VERIFY_DEBUG(buffer == back.Buffer);
+ if (SendQueuePos == SendQueue.size()) {
+ --SendQueuePos;
+ SendOffset = back.Span.size();
+ }
+ back.Span = {back.Span.data(), back.Span.size() + span.size()};
+ DropBufferReference(buffer);
+ return;
+ }
+ }
+ SendQueue.push_back(TSendChunk{span, buffer});
+ }
+
void DropBufferReference(TBuffer *buffer) {
if (buffer && !--buffer->RefCount) {
const size_t index = buffer->Index;
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index c1d63fa3b8..3f7eda1e9c 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -59,6 +59,7 @@ struct TEventData {
};
#pragma pack(push, 1)
+
struct TEventDescr2 {
ui32 Type;
ui32 Flags;
@@ -68,6 +69,7 @@ struct TEventDescr2 {
NWilson::TTraceId::TSerializedTraceId TraceId;
ui32 Checksum;
};
+
#pragma pack(pop)
struct TEventHolder : TNonCopyable {
@@ -120,20 +122,23 @@ namespace NActors {
struct TTcpPacketOutTask : TNonCopyable {
const TSessionParams& Params;
NInterconnect::TOutgoingStream& OutgoingStream;
+ NInterconnect::TOutgoingStream& XdcStream;
NInterconnect::TOutgoingStream::TBookmark HeaderBookmark;
- size_t DataSize = 0;
- mutable NLWTrace::TOrbit Orbit;
+ size_t InternalSize = 0;
+ size_t ExternalSize = 0;
- TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream)
+ TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream,
+ NInterconnect::TOutgoingStream& xdcStream)
: Params(params)
, OutgoingStream(outgoingStream)
+ , XdcStream(xdcStream)
, HeaderBookmark(OutgoingStream.Bookmark(sizeof(TTcpPacketHeader_v2)))
{}
// Preallocate some space to fill it later.
NInterconnect::TOutgoingStream::TBookmark Bookmark(size_t len) {
- Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount());
- DataSize += len;
+ Y_VERIFY_DEBUG(len <= GetInternalFreeAmount());
+ InternalSize += len;
return OutgoingStream.Bookmark(len);
}
@@ -143,32 +148,36 @@ struct TTcpPacketOutTask : TNonCopyable {
}
// Acquire raw pointer to write some data.
- TMutableContiguousSpan AcquireSpanForWriting() {
- return OutgoingStream.AcquireSpanForWriting(GetVirtualFreeAmount());
+ TMutableContiguousSpan AcquireSpanForWriting(bool external) {
+ if (external) {
+ return XdcStream.AcquireSpanForWriting(GetExternalFreeAmount());
+ } else {
+ return OutgoingStream.AcquireSpanForWriting(GetInternalFreeAmount());
+ }
}
// Append reference to some data (acquired previously or external pointer).
- void Append(const void *buffer, size_t len) {
- Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount());
- DataSize += len;
- OutgoingStream.Append({static_cast<const char*>(buffer), len});
+ void Append(bool external, const void *buffer, size_t len) {
+ Y_VERIFY_DEBUG(len <= (external ? GetExternalFreeAmount() : GetInternalFreeAmount()));
+ (external ? ExternalSize : InternalSize) += len;
+ (external ? XdcStream : OutgoingStream).Append({static_cast<const char*>(buffer), len});
}
// Write some data with copying.
- void Write(const void *buffer, size_t len) {
- Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount());
- DataSize += len;
- OutgoingStream.Write({static_cast<const char*>(buffer), len});
+ void Write(bool external, const void *buffer, size_t len) {
+ Y_VERIFY_DEBUG(len <= (external ? GetExternalFreeAmount() : GetInternalFreeAmount()));
+ (external ? ExternalSize : InternalSize) += len;
+ (external ? XdcStream : OutgoingStream).Write({static_cast<const char*>(buffer), len});
}
void Finish(ui64 serial, ui64 confirm) {
- Y_VERIFY(DataSize <= Max<ui16>());
+ Y_VERIFY(InternalSize <= Max<ui16>());
TTcpPacketHeader_v2 header{
confirm,
serial,
0,
- static_cast<ui16>(DataSize)
+ static_cast<ui16>(InternalSize)
};
if (Checksumming()) {
@@ -177,13 +186,13 @@ struct TTcpPacketOutTask : TNonCopyable {
size_t total = 0;
ui32 checksum = 0;
- OutgoingStream.ScanLastBytes(GetFullSize(), [&](TContiguousSpan span) {
+ OutgoingStream.ScanLastBytes(GetPacketSize(), [&](TContiguousSpan span) {
checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size());
total += span.size();
});
header.Checksum = checksum;
- Y_VERIFY(total == sizeof(header) + DataSize, "total# %zu DataSize# %zu GetFullSize# %zu", total, DataSize,
- GetFullSize());
+ Y_VERIFY(total == GetPacketSize(), "total# %zu InternalSize# %zu GetPacketSize# %zu", total, InternalSize,
+ GetPacketSize());
}
WriteBookmark(std::exchange(HeaderBookmark, {}), &header, sizeof(header));
@@ -193,9 +202,42 @@ struct TTcpPacketOutTask : TNonCopyable {
return !Params.Encryption;
}
- bool IsFull() const { return GetVirtualFreeAmount() == 0; }
bool IsEmpty() const { return GetDataSize() == 0; }
- size_t GetDataSize() const { return DataSize; }
- size_t GetFullSize() const { return sizeof(TTcpPacketHeader_v2) + GetDataSize(); }
- size_t GetVirtualFreeAmount() const { return TTcpPacketBuf::PacketDataLen - DataSize; }
+ size_t GetDataSize() const { return InternalSize + ExternalSize; }
+ size_t GetPacketSize() const { return sizeof(TTcpPacketHeader_v2) + InternalSize; }
+ size_t GetInternalFreeAmount() const { return TTcpPacketBuf::PacketDataLen - InternalSize; }
+ size_t GetExternalFreeAmount() const { return 16384 - ExternalSize; }
+ size_t GetExternalSize() const { return ExternalSize; }
};
+
+namespace NInterconnect::NDetail {
+    // Upper bound on the encoded length of a ui64: 7 payload bits per byte.
+    static constexpr size_t MaxNumberBytes = (sizeof(ui64) * CHAR_BIT + 6) / 7;
+
+    // Encodes 'num' as a little-endian base-128 sequence: each byte carries 7 value bits,
+    // and bit 0x80 is set on every byte except the last one. 'buffer' must have room for
+    // up to MaxNumberBytes bytes. Returns the number of bytes written (at least one).
+    inline size_t SerializeNumber(ui64 num, char *buffer) {
+        char *begin = buffer;
+        do {
+            *buffer++ = static_cast<char>((num & 0x7F) | (num >= 128 ? 0x80 : 0x00));
+            num >>= 7;
+        } while (num);
+        return buffer - begin;
+    }
+
+    // Decodes a number produced by SerializeNumber from the range [*ptr, end).
+    // On success returns the value and advances *ptr past the consumed bytes.
+    // On failure returns Max<ui64>() and leaves *ptr untouched; failure means either the
+    // input ended before the terminating byte, or the encoding is longer than
+    // MaxNumberBytes bytes (which no valid ui64 encoding can be).
+    // NOTE(review): callers are presumed to treat Max<ui64>() as a format error -- confirm.
+    inline ui64 DeserializeNumber(const char **ptr, const char *end) {
+        const char *p = *ptr;
+        ui64 res = 0; // accumulate in ui64 so the value is not truncated where size_t is 32-bit
+        for (size_t offset = 0; ; offset += 7) {
+            // shifting by >= 64 bits is undefined behaviour, so over-long input is rejected
+            if (p == end || offset >= sizeof(ui64) * CHAR_BIT) {
+                return Max<ui64>();
+            }
+            const unsigned char byte = static_cast<unsigned char>(*p++);
+            res |= static_cast<ui64>(byte & 0x7F) << offset;
+            if (!(byte & 0x80)) {
+                break;
+            }
+        }
+        *ptr = p;
+        return res;
+    }
+}
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 2de8dce457..2f8e575cb0 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -46,7 +46,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
NInterconnect::TOutgoingStream stream;
for (; numEvents; ++step) {
- TTcpPacketOutTask task(p, stream);
+ TTcpPacketOutTask task(p, stream, stream);
if (step == 100) {
for (ui32 i = 0; i < 200; ++i) {