diff options
author | alexvru <alexvru@ydb.tech> | 2023-04-25 13:50:27 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-04-25 13:50:27 +0300 |
commit | 962dae07e71621bd25c81feee4c6f3355c94a73c (patch) | |
tree | a083a671f2eb0d1ded6208ae8037dc16e395c513 | |
parent | e33f5f1a073db44a35c1616b2f391751779a866f (diff) | |
download | ydb-962dae07e71621bd25c81feee4c6f3355c94a73c.tar.gz |
Support XDC streams
14 files changed, 1436 insertions, 551 deletions
diff --git a/library/cpp/actors/core/event_pb.cpp b/library/cpp/actors/core/event_pb.cpp index 018ff9ac34..4e37f36f70 100644 --- a/library/cpp/actors/core/event_pb.cpp +++ b/library/cpp/actors/core/event_pb.cpp @@ -77,11 +77,23 @@ namespace NActors { if (CancelFlag || AbortFlag) { return false; } else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) { - if (!Produce(data, bytesToAppend)) { - return false; + if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 2 * 64) { + memcpy(BufferPtr, data, bytesToAppend); + + if (!Produce(BufferPtr, bytesToAppend)) { + return false; + } + + BufferPtr += bytesToAppend; + data = static_cast<const char*>(data) + bytesToAppend; + size -= bytesToAppend; + } else { + if (!Produce(data, bytesToAppend)) { + return false; + } + data = static_cast<const char*>(data) + bytesToAppend; + size -= bytesToAppend; } - data = static_cast<const char*>(data) + bytesToAppend; - size -= bytesToAppend; } else { InnerContext.SwitchTo(BufFeedContext); } @@ -148,6 +160,7 @@ namespace NActors { // fill in base params BufferPtr = static_cast<char*>(data); SizeRemain = size; + Y_VERIFY_DEBUG(size); // transfer control to the coroutine Y_VERIFY(Event); diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index 8fd43aa34d..87e924b1e5 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -161,53 +161,7 @@ namespace NActors { } bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override { - // serialize payload first - if (Payload) { - void *data; - int size = 0; - auto append = [&](const char *p, size_t len) { - while (len) { - if (size) { - const size_t numBytesToCopy = std::min<size_t>(size, len); - memcpy(data, p, numBytesToCopy); - data = static_cast<char*>(data) + numBytesToCopy; - size -= numBytesToCopy; - p += numBytesToCopy; - len -= numBytesToCopy; - } else if (!chunker->Next(&data, &size)) { - return false; - } - } - return true; - }; - auto appendNumber = [&](size_t number) { - char buf[MaxNumberBytes]; - return append(buf, SerializeNumber(number, buf)); - }; - char marker = PayloadMarker; - append(&marker, 1); - if (!appendNumber(Payload.size())) { - return false; - } - for (const TRope& rope : Payload) { - if (!appendNumber(rope.GetSize())) { - return false; - } - if (rope) { - if (size) { - chunker->BackUp(std::exchange(size, 0)); - } - if (!chunker->WriteRope(&rope)) { - return false; - } - } - } - if (size) { - chunker->BackUp(size); - } - } - - return Record.SerializeToZeroCopyStream(chunker); + return SerializeToArcadiaStreamImpl(chunker, TString()); } ui32 CalculateSerializedSize() const override { @@ -285,25 +239,7 @@ namespace NActors { } TEventSerializationInfo CreateSerializationInfo() const override { - TEventSerializationInfo info; - - if (Payload) { - char temp[MaxNumberBytes]; - info.Sections.push_back(TEventSectionInfo{0, 1, 0, 0}); // payload marker - info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(Payload.size(), temp), 0, 0}); - for (const TRope& payload : Payload) { - info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(payload.GetSize(), temp), 0, 0}); // length - info.Sections.push_back(TEventSectionInfo{0, payload.GetSize(), 0, 0}); // data - } - info.IsExtendedFormat = true; - } else { - info.IsExtendedFormat = false; - } - - const int byteSize = Max(0, Record.ByteSize()); - info.Sections.push_back(TEventSectionInfo{0, static_cast<size_t>(byteSize), 0, 0}); - - return info; + return CreateSerializationInfoImpl(0); } public: @@ -332,6 +268,93 @@ namespace NActors { } protected: + TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize) const { + TEventSerializationInfo info; + + if (Payload) { + char temp[MaxNumberBytes]; + info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0}); // payload marker and rope count + for (const TRope& rope : Payload) { + const size_t ropeSize = rope.GetSize(); + info.Sections.back().Size += SerializeNumber(ropeSize, temp); + info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0}); // data as a separate section + } + info.IsExtendedFormat = true; + } else { + info.IsExtendedFormat = false; + } + + const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize; + info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0}); // protobuf itself + +#ifndef NDEBUG + size_t total = 0; + for (const auto& section : info.Sections) { + total += section.Size; + } + size_t serialized = CalculateSerializedSize(); + Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total, + serialized, byteSize, Payload.size()); +#endif + + return info; + } + + bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TString& preserialized) const { + // serialize payload first + if (Payload) { + void *data; + int size = 0; + auto append = [&](const char *p, size_t len) { + while (len) { + if (size) { + const size_t numBytesToCopy = std::min<size_t>(size, len); + memcpy(data, p, numBytesToCopy); + data = static_cast<char*>(data) + numBytesToCopy; + size -= numBytesToCopy; + p += numBytesToCopy; + len -= numBytesToCopy; + } else if (!chunker->Next(&data, &size)) { + return false; + } + } + return true; + }; + auto appendNumber = [&](size_t number) { + char buf[MaxNumberBytes]; + return append(buf, SerializeNumber(number, buf)); + }; + char marker = PayloadMarker; + append(&marker, 1); + if (!appendNumber(Payload.size())) { + return false; + } + for (const TRope& rope : Payload) { + if (!appendNumber(rope.GetSize())) { + return false; + } + if (rope) { + if (size) { + chunker->BackUp(std::exchange(size, 0)); + } + if (!chunker->WriteRope(&rope)) { + return false; + } + } + } + if (size) { + chunker->BackUp(size); + } + } + + if (preserialized && !chunker->WriteString(&preserialized)) { + return false; + } + + return Record.SerializeToZeroCopyStream(chunker); + } + + protected: mutable size_t CachedByteSize = 0; static constexpr char PayloadMarker = 0x07; @@ -488,7 +511,7 @@ namespace NActors { } bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override { - return chunker->WriteString(&PreSerializedData) && TBase::SerializeToArcadiaStream(chunker); + return TBase::SerializeToArcadiaStreamImpl(chunker, PreSerializedData); } ui32 CalculateSerializedSize() const override { @@ -502,6 +525,10 @@ namespace NActors { ui32 CalculateSerializedSizeCached() const override { return GetCachedByteSize(); } + + TEventSerializationInfo CreateSerializationInfo() const override { + return TBase::CreateSerializationInfoImpl(PreSerializedData.size()); + } }; inline TActorId ActorIdFromProto(const NActorsProto::TActorId& actorId) { diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index 64a6bbbd09..371f9b7653 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -13,16 +13,13 @@ LWTRACE_USING(ACTORLIB_PROVIDER); namespace NActors { bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr2); - if (task.GetVirtualFreeAmount() < amount) { + if (task.GetInternalFreeAmount() < amount) { return false; } auto traceId = event.Span.GetTraceId(); event.Span.EndOk(); - LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize()); - task.Orbit.Take(event.Orbit); - Y_VERIFY(SerializationInfo); const ui32 flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) | (SerializationInfo->IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); @@ -41,16 +38,16 @@ namespace NActors { // and channel header before the descriptor TChannelPart part{ - .Channel = static_cast<ui16>(ChannelId | TChannelPart::LastPartFlag), - .Size = sizeof(descr), + .ChannelFlags = static_cast<ui16>(ChannelId | TChannelPart::LastPartFlag), + .Size = sizeof(descr) }; // append them to the packet - task.Write(&part, sizeof(part)); - task.Write(&descr, sizeof(descr)); + task.Write(false, &part, sizeof(part)); + task.Write(false, &descr, sizeof(descr)); *weightConsumed += amount; - OutputQueueSize -= part.Size; + OutputQueueSize -= sizeof(TEventDescr2); Metrics->UpdateOutputChannelEvents(ChannelId); return true; @@ -71,106 +68,231 @@ namespace NActors { switch (State) { case EState::INITIAL: event.InitChecksum(); - LWTRACK(SerializeToPacketBegin, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize); if (event.Buffer) { - State = EState::BUFFER; + State = EState::BODY; Iter = event.Buffer->GetBeginIter(); SerializationInfo = &event.Buffer->GetSerializationInfo(); + EventInExternalDataChannel = !SerializationInfo->Sections.empty() && Params.UseExternalDataChannel; } else if (event.Event) { - State = EState::CHUNKER; + State = EState::BODY; IEventBase *base = event.Event.Get(); if (event.EventSerializedSize) { Chunker.SetSerializingEvent(base); } SerializationInfoContainer = base->CreateSerializationInfo(); SerializationInfo = &SerializationInfoContainer; + EventInExternalDataChannel = !SerializationInfo->Sections.empty() && Params.UseExternalDataChannel; } else { // event without buffer and IEventBase instance State = EState::DESCRIPTOR; SerializationInfoContainer = {}; SerializationInfo = &SerializationInfoContainer; + EventInExternalDataChannel = false; } if (!event.EventSerializedSize) { State = EState::DESCRIPTOR; + } else if (EventInExternalDataChannel) { + State = EState::SECTIONS; + SectionIndex = 0; } break; - case EState::CHUNKER: - case EState::BUFFER: { - if (task.GetVirtualFreeAmount() <= sizeof(TChannelPart)) { + case EState::BODY: + if (FeedPayload(task, event, weightConsumed)) { + State = EState::DESCRIPTOR; + } else { return false; } + break; - TChannelPart part{ - .Channel = ChannelId, - .Size = 0, - }; + case EState::DESCRIPTOR: + if (!FeedDescriptor(task, event, weightConsumed)) { + return false; + } + event.Serial = serial; + NotYetConfirmed.splice(NotYetConfirmed.end(), Queue, Queue.begin()); // move event to not-yet-confirmed queue + SerializationInfoContainer = {}; + SerializationInfo = nullptr; + State = EState::INITIAL; + return true; // we have processed whole event, signal to the caller - auto partBookmark = task.Bookmark(sizeof(part)); + case EState::SECTIONS: { + if (SectionIndex == 0) { + size_t totalSectionSize = 0; + for (const auto& section : SerializationInfo->Sections) { + totalSectionSize += section.Size; + } + Y_VERIFY(totalSectionSize == event.EventSerializedSize); + } - auto addChunk = [&](const void *data, size_t len) { - event.UpdateChecksum(data, len); - task.Append(data, len); - part.Size += len; + while (SectionIndex != SerializationInfo->Sections.size()) { + char sectionInfo[1 + NInterconnect::NDetail::MaxNumberBytes * 4]; + char *p = sectionInfo; - event.EventActuallySerialized += len; - if (event.EventActuallySerialized > MaxSerializedEventSize) { - throw TExSerializedEventTooLarge(event.Descr.Type); - } - }; + const auto& section = SerializationInfo->Sections[SectionIndex]; + *p++ = static_cast<ui8>(EXdcCommand::DECLARE_SECTION); + p += NInterconnect::NDetail::SerializeNumber(section.Headroom, p); + p += NInterconnect::NDetail::SerializeNumber(section.Size, p); + p += NInterconnect::NDetail::SerializeNumber(section.Tailroom, p); + p += NInterconnect::NDetail::SerializeNumber(section.Alignment, p); + Y_VERIFY(p <= std::end(sectionInfo)); - bool complete = false; - if (State == EState::CHUNKER) { - while (!complete && !task.IsFull()) { - TMutableContiguousSpan out = task.AcquireSpanForWriting(); - const auto [first, last] = Chunker.FeedBuf(out.data(), out.size()); - for (auto p = first; p != last; ++p) { - addChunk(p->first, p->second); - } - complete = Chunker.IsComplete(); - } - Y_VERIFY(!complete || Chunker.IsSuccessfull()); - Y_VERIFY_DEBUG(complete || task.IsFull()); - } else { // BUFFER - while (const size_t numb = Min(task.GetVirtualFreeAmount(), Iter.ContiguousSize())) { - const char *obuf = Iter.ContiguousData(); - addChunk(obuf, numb); - Iter += numb; + const size_t declareLen = p - sectionInfo; + if (sizeof(TChannelPart) + XdcData.size() + declareLen <= task.GetInternalFreeAmount() && + XdcData.size() + declareLen <= Max<ui16>()) { + XdcData.insert(XdcData.end(), sectionInfo, p); + ++SectionIndex; + } else { + break; } - complete = !Iter.Valid(); } - if (complete) { - Y_VERIFY(event.EventActuallySerialized == event.EventSerializedSize, - "EventActuallySerialized# %" PRIu32 " EventSerializedSize# %" PRIu32 " Type# 0x%08" PRIx32, - event.EventActuallySerialized, event.EventSerializedSize, event.Descr.Type); + + if (XdcData.empty()) { + return false; } - Y_VERIFY_DEBUG(part.Size); - task.WriteBookmark(std::exchange(partBookmark, {}), &part, sizeof(part)); - *weightConsumed += sizeof(TChannelPart) + part.Size; - OutputQueueSize -= part.Size; - if (complete) { - State = EState::DESCRIPTOR; + TChannelPart part{ + .ChannelFlags = static_cast<ui16>(ChannelId | TChannelPart::XdcFlag), + .Size = static_cast<ui16>(XdcData.size()) + }; + task.Write(false, &part, sizeof(part)); + task.Write(false, XdcData.data(), XdcData.size()); + XdcData.clear(); + + if (SectionIndex == SerializationInfo->Sections.size()) { + State = EState::BODY; } + break; } + } + } + } - case EState::DESCRIPTOR: - if (!FeedDescriptor(task, event, weightConsumed)) { - return false; - } - event.Serial = serial; - NotYetConfirmed.splice(NotYetConfirmed.end(), Queue, Queue.begin()); // move event to not-yet-confirmed queue - SerializationInfoContainer = {}; - SerializationInfo = nullptr; - State = EState::INITIAL; - return true; // we have processed whole event, signal to the caller + bool TEventOutputChannel::SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, bool external, size_t *bytesSerialized) { + auto addChunk = [&](const void *data, size_t len, bool allowCopy) { + event.UpdateChecksum(data, len); + if (allowCopy && (reinterpret_cast<uintptr_t>(data) & 63) + len <= 2 * 64) { + task.Write(external, data, len); + } else { + task.Append(external, data, len); } + *bytesSerialized += len; + + event.EventActuallySerialized += len; + if (event.EventActuallySerialized > MaxSerializedEventSize) { + throw TExSerializedEventTooLarge(event.Descr.Type); + } + }; + + bool complete = false; + if (event.Event) { + while (!complete) { + TMutableContiguousSpan out = task.AcquireSpanForWriting(external); + if (!out.size()) { + break; + } + const auto [first, last] = Chunker.FeedBuf(out.data(), out.size()); + for (auto p = first; p != last; ++p) { + addChunk(p->first, p->second, false); + } + complete = Chunker.IsComplete(); + if (complete) { + Y_VERIFY(Chunker.IsSuccessfull()); + } + } + } else if (event.Buffer) { + while (const size_t numb = Min(external ? task.GetExternalFreeAmount() : task.GetInternalFreeAmount(), + Iter.ContiguousSize())) { + const char *obuf = Iter.ContiguousData(); + addChunk(obuf, numb, true); + Iter += numb; + } + complete = !Iter.Valid(); + } else { + Y_FAIL(); + } + if (complete) { + Y_VERIFY(event.EventActuallySerialized == event.EventSerializedSize, + "EventActuallySerialized# %" PRIu32 " EventSerializedSize# %" PRIu32 " Type# 0x%08" PRIx32, + event.EventActuallySerialized, event.EventSerializedSize, event.Descr.Type); + } + + return complete; + } + + bool TEventOutputChannel::FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + return EventInExternalDataChannel + ? FeedExternalPayload(task, event, weightConsumed) + : FeedInlinePayload(task, event, weightConsumed); + } + + bool TEventOutputChannel::FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + if (task.GetInternalFreeAmount() <= sizeof(TChannelPart)) { + return false; + } + + auto partBookmark = task.Bookmark(sizeof(TChannelPart)); + + size_t bytesSerialized = 0; + const bool complete = SerializeEvent(task, event, false, &bytesSerialized); + + Y_VERIFY_DEBUG(bytesSerialized); + Y_VERIFY(bytesSerialized <= Max<ui16>()); + + TChannelPart part{ + .ChannelFlags = ChannelId, + .Size = static_cast<ui16>(bytesSerialized) + }; + + task.WriteBookmark(std::exchange(partBookmark, {}), &part, sizeof(part)); + *weightConsumed += sizeof(TChannelPart) + part.Size; + OutputQueueSize -= part.Size; + + return complete; + } + + bool TEventOutputChannel::FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + const size_t partSize = sizeof(TChannelPart) + sizeof(ui8) + sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32)); + if (task.GetInternalFreeAmount() < partSize || task.GetExternalFreeAmount() == 0) { + return false; + } + + auto partBookmark = task.Bookmark(partSize); + + size_t bytesSerialized = 0; + const bool complete = SerializeEvent(task, event, true, &bytesSerialized); + + Y_VERIFY(0 < bytesSerialized && bytesSerialized <= Max<ui16>()); + + char buffer[partSize]; + TChannelPart *part = reinterpret_cast<TChannelPart*>(buffer); + *part = { + .ChannelFlags = static_cast<ui16>(ChannelId | TChannelPart::XdcFlag), + .Size = static_cast<ui16>(partSize - sizeof(TChannelPart)) + }; + char *ptr = reinterpret_cast<char*>(part + 1); + *ptr++ = static_cast<ui8>(EXdcCommand::PUSH_DATA); + *reinterpret_cast<ui16*>(ptr) = bytesSerialized; + ptr += sizeof(ui16); + if (!Params.Encryption) { + ui32 checksum = 0; + task.XdcStream.ScanLastBytes(bytesSerialized, [&](TContiguousSpan span) { + checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size()); + }); + *reinterpret_cast<ui32*>(ptr) = checksum; } + + task.WriteBookmark(std::move(partBookmark), buffer, partSize); + + *weightConsumed += partSize + bytesSerialized; + OutputQueueSize -= bytesSerialized; + + return complete; } void TEventOutputChannel::NotifyUndelivered() { LOG_DEBUG_IC_SESSION("ICOCH89", "Notyfying about Undelivered messages! NotYetConfirmed size: %zu, Queue size: %zu", NotYetConfirmed.size(), Queue.size()); - if (State == EState::CHUNKER) { + if (State == EState::BODY && Queue.front().Event) { Y_VERIFY(!Chunker.IsComplete()); // chunk must have an event being serialized Y_VERIFY(!Queue.empty()); // this event must be the first event in queue TEventHolder& event = Queue.front(); diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 7612c31c76..48074b05b9 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -17,20 +17,37 @@ namespace NActors { #pragma pack(push, 1) + struct TChannelPart { - ui16 Channel; + ui16 ChannelFlags; ui16 Size; - static constexpr ui16 LastPartFlag = ui16(1) << 15; + static constexpr ui16 LastPartFlag = 0x8000; + static constexpr ui16 XdcFlag = 0x4000; + static constexpr ui16 ChannelMask = (1 << IEventHandle::ChannelBits) - 1; + + static_assert((LastPartFlag & ChannelMask) == 0); + static_assert((XdcFlag & ChannelMask) == 0); + + ui16 GetChannel() const { return ChannelFlags & ChannelMask; } + bool IsLastPart() const { return ChannelFlags & LastPartFlag; } + bool IsXdc() const { return ChannelFlags & XdcFlag; } TString ToString() const { - return TStringBuilder() << "{Channel# " << (Channel & ~LastPartFlag) - << " LastPartFlag# " << ((Channel & LastPartFlag) ? "true" : "false") + return TStringBuilder() << "{Channel# " << GetChannel() + << " IsLastPart# " << IsLastPart() + << " IsXdc# " << IsXdc() << " Size# " << Size << "}"; } }; + #pragma pack(pop) + enum class EXdcCommand : ui8 { + DECLARE_SECTION = 1, + PUSH_DATA, + }; + struct TExSerializedEventTooLarge : std::exception { const ui32 Type; @@ -101,9 +118,9 @@ namespace NActors { enum class EState { INITIAL, - CHUNKER, - BUFFER, + BODY, DESCRIPTOR, + SECTIONS, }; EState State = EState::INITIAL; @@ -116,6 +133,15 @@ namespace NActors { TCoroutineChunkSerializer Chunker; TEventSerializationInfo SerializationInfoContainer; const TEventSerializationInfo *SerializationInfo = nullptr; + bool EventInExternalDataChannel; + size_t SectionIndex = 0; + std::vector<char> XdcData; + + bool SerializeEvent(TTcpPacketOutTask& task, TEventHolder& event, bool external, size_t *bytesSerialized); + + bool FeedPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); + bool FeedInlinePayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); + bool FeedExternalPayload(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp index ad46453acb..96ee13a3f5 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.cpp +++ b/library/cpp/actors/interconnect/interconnect_stream.cpp @@ -1,5 +1,6 @@ #include "interconnect_stream.h" #include "logging.h" +#include "poller_actor.h" #include <library/cpp/openssl/init/init.h> #include <util/network/socket.h> #include <openssl/ssl.h> @@ -209,6 +210,10 @@ namespace NInterconnect { return res; } + void TStreamSocket::Request(NActors::TPollerToken& token, bool read, bool write) { + token.Request(read, write); + } + ////////////////////////////////////////////////////// TDatagramSocket::TPtr TDatagramSocket::Make(int domain) { @@ -478,6 +483,9 @@ namespace NInterconnect { std::optional<std::pair<const void*, size_t>> BlockedSend; ssize_t Send(const void* msg, size_t len, TString *err) { + if (BlockedSend && BlockedSend->first == msg && BlockedSend->second < len) { + len = BlockedSend->second; + } Y_VERIFY(!BlockedSend || *BlockedSend == std::make_pair(msg, len)); const ssize_t res = Operate(msg, len, &SSL_write_ex, err); if (res == -EAGAIN) { @@ -491,6 +499,9 @@ namespace NInterconnect { std::optional<std::pair<void*, size_t>> BlockedReceive; ssize_t Recv(void* msg, size_t len, TString *err) { + if (BlockedReceive && BlockedReceive->first == msg && BlockedReceive->second < len) { + len = BlockedReceive->second; + } Y_VERIFY(!BlockedReceive || *BlockedReceive == std::make_pair(msg, len)); const ssize_t res = Operate(msg, len, &SSL_read_ex, err); if (res == -EAGAIN) { @@ -628,4 +639,7 @@ namespace NInterconnect { return Impl->WantWrite(); } + void TSecureSocket::Request(NActors::TPollerToken& token, bool /*read*/, bool /*write*/) { + token.Request(WantRead(), WantWrite()); + } } diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h index 3ba7914f77..55438fef10 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.h +++ b/library/cpp/actors/interconnect/interconnect_stream.h @@ -14,6 +14,10 @@ #include <sys/uio.h> +namespace NActors { + class TPollerToken; +} + namespace NInterconnect { class TSocket: public NActors::TSharedDescriptor, public TNonCopyable { protected: @@ -63,6 +67,8 @@ namespace NInterconnect { void SetSendBufferSize(i32 len) const; ui32 GetSendBufferSize() const; + + virtual void Request(NActors::TPollerToken& token, bool read, bool write); }; class TSecureSocketContext { @@ -114,6 +120,7 @@ namespace NInterconnect { bool WantRead() const; bool WantWrite() const; + virtual void Request(NActors::TPollerToken& token, bool read, bool write) override; }; class TDatagramSocket: public TSocket { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 34770133ae..925dbef823 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -6,6 +6,67 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); + void TReceiveContext::TPerChannelContext::CalculateBytesToCatch() { + XdcBytesToCatch = FetchOffset; + for (auto it = XdcBuffers.begin(), end = it + FetchIndex; it != end; ++it) { + XdcBytesToCatch += it->size(); + } + } + + void TReceiveContext::TPerChannelContext::FetchBuffers(ui16 channel, size_t numBytes, + std::deque<std::tuple<ui16, TMutableContiguousSpan>>& outQ) { + Y_VERIFY_DEBUG(numBytes); + auto it = XdcBuffers.begin() + FetchIndex; + for (;;) { + Y_VERIFY_DEBUG(it != XdcBuffers.end()); + const TMutableContiguousSpan span = it->SubSpan(FetchOffset, numBytes); + outQ.emplace_back(channel, span); + numBytes -= span.size(); + FetchOffset += span.size(); + if (FetchOffset == it->size()) { + ++FetchIndex; + ++it; + FetchOffset = 0; + } + if (!numBytes) { + break; + } + } + } + + void TReceiveContext::TPerChannelContext::DropFront(TRope *from, size_t numBytes) { + size_t n = numBytes; + for (auto& pendingEvent : PendingEvents) { + const size_t numBytesInEvent = Min(n, pendingEvent.XdcSizeLeft); + pendingEvent.XdcSizeLeft -= numBytesInEvent; + n -= numBytesInEvent; + if (!n) { + break; + } + } + + while (numBytes) { + Y_VERIFY_DEBUG(!XdcBuffers.empty()); + auto& front = XdcBuffers.front(); + if (from) { + from->ExtractFrontPlain(front.data(), Min(numBytes, front.size())); + } + if (numBytes < front.size()) { + front = front.SubSpan(numBytes, Max<size_t>()); + if (!FetchIndex) { // we are sending this very buffer, adjust sending offset + Y_VERIFY_DEBUG(numBytes <= FetchOffset); + FetchOffset -= numBytes; + } + break; + } else { + numBytes -= front.size(); + Y_VERIFY_DEBUG(FetchIndex); + --FetchIndex; + XdcBuffers.pop_front(); + } + } + } + TInputSessionTCP::TInputSessionTCP(const TActorId& sessionId, TIntrusivePtr<NInterconnect::TStreamSocket> socket, TIntrusivePtr<NInterconnect::TStreamSocket> xdcSocket, TIntrusivePtr<TReceiveContext> context, TInterconnectProxyCommon::TPtr common, std::shared_ptr<IInterconnectMetrics> metrics, ui32 nodeId, @@ -24,8 +85,7 @@ namespace NActors { Y_VERIFY(Context); Y_VERIFY(Socket); Y_VERIFY(SessionId); - - AtomicSet(Context->PacketsReadFromSocket, 0); + Y_VERIFY(!Params.UseExternalDataChannel == !XdcSocket); Metrics->SetClockSkewMicrosec(0); @@ -34,6 +94,16 @@ namespace NActors { // ensure that we do not spawn new session while the previous one is still alive TAtomicBase sessions = AtomicIncrement(Context->NumInputSessions); Y_VERIFY(sessions == 1, "sessions# %" PRIu64, ui64(sessions)); + + // calculate number of bytes to catch + for (auto& context : Context->ChannelArray) { + context.CalculateBytesToCatch(); + } + for (auto& [channel, context] : Context->ChannelMap) { + context.CalculateBytesToCatch(); + } + + UsageHisto.fill(0); } void TInputSessionTCP::Bootstrap() { @@ -41,7 +111,28 @@ namespace NActors { Become(&TThis::WorkingState, DeadPeerTimeout, new TEvCheckDeadPeer); LOG_DEBUG_IC_SESSION("ICIS01", "InputSession created"); LastReceiveTimestamp = TActivationContext::Monotonic(); - ReceiveData(); + TActivationContext::Send(new IEventHandle(EvResumeReceiveData, 0, SelfId(), {}, nullptr, 0)); + } + + STATEFN(TInputSessionTCP::WorkingState) { + std::unique_ptr<IEventBase> termEv; + + try { + WorkingStateImpl(ev); + } catch (const TExReestablishConnection& ex) { + LOG_DEBUG_IC_SESSION("ICIS09", "ReestablishConnection, reason# %s", ex.Reason.ToString().data()); + termEv = std::make_unique<TEvSocketDisconnect>(std::move(ex.Reason)); + } catch (const TExDestroySession& ex) { + LOG_DEBUG_IC_SESSION("ICIS13", "DestroySession, reason# %s", ex.Reason.ToString().data()); + termEv.reset(TInterconnectSessionTCP::NewEvTerminate(std::move(ex.Reason))); + } + + if (termEv) { + AtomicDecrement(Context->NumInputSessions); + Send(SessionId, termEv.release()); + PassAway(); + Socket.Reset(); + } } void TInputSessionTCP::CloseInputSession() { @@ -50,32 +141,42 @@ namespace NActors { } void TInputSessionTCP::Handle(TEvPollerReady::TPtr ev) { - if (Context->ReadPending) { + auto *msg = ev->Get(); + + bool useful = false; + bool writeBlocked = false; + + if (msg->Socket == Socket) { + useful = std::exchange(Context->MainReadPending, false); + writeBlocked = Context->MainWriteBlocked; + } else if (msg->Socket == XdcSocket) { + useful = std::exchange(Context->XdcReadPending, false); + writeBlocked = Context->XdcWriteBlocked; + } + + if (useful) { Metrics->IncUsefulReadWakeups(); } else if (!ev->Cookie) { Metrics->IncSpuriousReadWakeups(); } - Context->ReadPending = false; + ReceiveData(); - if (Params.Encryption && Context->WriteBlockedByFullSendBuffer && !ev->Cookie) { - Send(SessionId, ev->Release().Release(), 0, 1); + + if (Params.Encryption && writeBlocked && ev->Sender != SessionId) { + Send(SessionId, ev->Release().Release()); } } void TInputSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) { - const auto& sk = ev->Get()->Socket; - if (auto *token = sk == Socket ? &PollerToken : sk == XdcSocket ? &XdcPollerToken : nullptr) { - *token = std::move(ev->Get()->PollerToken); - } else { - return; + auto *msg = ev->Get(); + if (msg->Socket == Socket) { + PollerToken = std::move(msg->PollerToken); + } else if (msg->Socket == XdcSocket) { + XdcPollerToken = std::move(msg->PollerToken); } ReceiveData(); } - void TInputSessionTCP::HandleResumeReceiveData() { - ReceiveData(); - } - void TInputSessionTCP::ReceiveData() { TTimeLimit limit(GetMaxCyclesPerEvent()); ui64 numDataBytes = 0; @@ -83,35 +184,46 @@ namespace NActors { LOG_DEBUG_IC_SESSION("ICIS02", "ReceiveData called"); bool enoughCpu = true; - for (int iteration = 0; Socket; ++iteration) { - if (iteration && limit.CheckExceeded()) { + bool progress = false; + + for (;;) { + if (progress && limit.CheckExceeded()) { // we have hit processing time limit for this message, send notification to resume processing a bit later - Send(SelfId(), new TEvResumeReceiveData); + TActivationContext::Send(new IEventHandle(EvResumeReceiveData, 0, SelfId(), {}, nullptr, 0)); enoughCpu = false; break; } + // clear iteration progress + progress = false; + + // try to process already fetched part from IncomingData switch (State) { case EState::HEADER: if (IncomingData.GetSize() < sizeof(TTcpPacketHeader_v2)) { break; } else { ProcessHeader(); + progress = true; + continue; } - continue; case EState::PAYLOAD: + Y_VERIFY_DEBUG(PayloadSize); if (!IncomingData) { break; } else { - ProcessPayload(numDataBytes); + ProcessPayload(&numDataBytes); + progress = true; + continue; } - continue; } - // if we have reached this point, it means that we do not have enough data in read buffer; try to obtain some - if (!ReadMore()) { - // we have no data from socket, so we have some free time to spend -- preallocate buffers using this time + // try to read more data into buffers + progress |= ReadMore(); + progress |= ReadXdc(&numDataBytes); + + if (!progress) { // no progress was made during this iteration PreallocateBuffers(); break; } @@ -183,12 +295,12 @@ namespace NActors { Checksum = Crc32cExtendMSanCompatible(0, &header, sizeof(header)); // start calculating checksum now if (!PayloadSize && Checksum != ChecksumExpected) { LOG_ERROR_IC_SESSION("ICIS10", "payload checksum error"); - return ReestablishConnection(TDisconnectReason::ChecksumError()); + throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; } } if (PayloadSize >= 65536) { LOG_CRIT_IC_SESSION("ICIS07", "payload is way too big"); - return DestroySession(TDisconnectReason::FormatError()); + throw TExDestroySession{TDisconnectReason::FormatError()}; } if (ConfirmedByInput < confirm) { ConfirmedByInput = confirm; @@ -205,13 +317,22 @@ namespace NActors { } } if (PayloadSize) { - const ui64 expected = Context->GetLastProcessedPacketSerial() + 1; + const ui64 expected = Context->LastProcessedSerial + 1; if (serial == 0 || serial > expected) { LOG_CRIT_IC_SESSION("ICIS06", "packet serial %" PRIu64 ", but %" PRIu64 " expected", serial, expected); - return DestroySession(TDisconnectReason::FormatError()); + throw TExDestroySession{TDisconnectReason::FormatError()}; + } + if (Context->LastProcessedSerial <= serial) { + XdcCatchStreamFinalPending = true; // we can't switch it right now, only after packet is fully processed + } + if (serial == expected) { + InboundPacketQ.push_back(TInboundPacket{serial, 0}); + IgnorePayload = false; + } else { + IgnorePayload = true; } - IgnorePayload = serial != expected; State = EState::PAYLOAD; + Y_VERIFY_DEBUG(!Payload); } else if (serial & TTcpPacketBuf::PingRequestMask) { Send(SessionId, new TEvProcessPingRequest(serial & ~TTcpPacketBuf::PingRequestMask)); } else if (serial & TTcpPacketBuf::PingResponseMask) { @@ -221,62 +342,67 @@ namespace NActors { } else if (serial & TTcpPacketBuf::ClockMask) { HandleClock(TInstant::MicroSeconds(serial & ~TTcpPacketBuf::ClockMask)); } + if (!PayloadSize) { + ++PacketsReadFromSocket; + } } - void TInputSessionTCP::ProcessPayload(ui64& numDataBytes) { + void TInputSessionTCP::ProcessPayload(ui64 *numDataBytes) { const size_t numBytes = Min(PayloadSize, IncomingData.GetSize()); IncomingData.ExtractFront(numBytes, &Payload); - numDataBytes += numBytes; + *numDataBytes += numBytes; PayloadSize -= numBytes; if (PayloadSize) { return; // there is still some data to receive in the Payload rope } - State = EState::HEADER; // we'll continue with header next time + State = EState::HEADER; if (!Params.Encryption) { // see if we are checksumming packet body for (const auto&& [data, size] : Payload) { Checksum = Crc32cExtendMSanCompatible(Checksum, data, size); } if (Checksum != ChecksumExpected) { // validate payload checksum LOG_ERROR_IC_SESSION("ICIS04", "payload checksum error"); - return ReestablishConnection(TDisconnectReason::ChecksumError()); + throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; } } - if (Y_UNLIKELY(IgnorePayload)) { - return; - } - if (!Context->AdvanceLastProcessedPacketSerial()) { - return DestroySession(TDisconnectReason::NewSession()); - } - - while (Payload && Socket) { + while (Payload) { // extract channel part header from the payload stream TChannelPart part; if (!Payload.ExtractFrontPlain(&part, sizeof(part))) { LOG_CRIT_IC_SESSION("ICIS14", "missing TChannelPart header in payload"); - return DestroySession(TDisconnectReason::FormatError()); - } - if (!part.Size) { // bogus frame - continue; + throw TExDestroySession{TDisconnectReason::FormatError()}; } else if (Payload.GetSize() < part.Size) { LOG_CRIT_IC_SESSION("ICIS08", "payload format error ChannelPart# %s", part.ToString().data()); - return DestroySession(TDisconnectReason::FormatError()); + throw TExDestroySession{TDisconnectReason::FormatError()}; } - const ui16 channel = part.Channel & ~TChannelPart::LastPartFlag; - TRope *eventData = channel < Context->ChannelArray.size() - ? &Context->ChannelArray[channel] - : &Context->ChannelMap[channel]; + const ui16 channel = part.GetChannel(); + auto& context = GetPerChannelContext(channel); + auto& pendingEvent = context.PendingEvents.empty() || context.PendingEvents.back().EventData + ? context.PendingEvents.emplace_back() + : context.PendingEvents.back(); + + if (part.IsXdc()) { // external data channel command packet + XdcCommands.resize(part.Size); + const bool success = Payload.ExtractFrontPlain(XdcCommands.data(), XdcCommands.size()); + Y_VERIFY(success); + ProcessXdcCommand(channel, context); + } else if (IgnorePayload) { // throw payload out + Payload.EraseFront(part.Size); + } else if (!part.IsLastPart()) { // just ordinary inline event data + Payload.ExtractFront(part.Size, &pendingEvent.Payload); + } else { // event final block + TEventDescr2 v2; + + if (part.Size != sizeof(v2)) { + LOG_CRIT_IC_SESSION("ICIS11", "incorrect last part of an event"); + throw TExDestroySession{TDisconnectReason::FormatError()}; + } - Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size); + const bool success = Payload.ExtractFrontPlain(&v2, sizeof(v2)); + Y_VERIFY(success); - TEventDescr2 v2; - if (~part.Channel & TChannelPart::LastPartFlag) { - Payload.ExtractFront(part.Size, eventData); - } else if (part.Size != sizeof(v2)) { - LOG_CRIT_IC_SESSION("ICIS11", "incorrect last part of an event"); - return DestroySession(TDisconnectReason::FormatError()); - } else if (Payload.ExtractFrontPlain(&v2, part.Size)) { - TEventData descr = { + pendingEvent.EventData = TEventData{ v2.Type, v2.Flags, v2.Recipient, @@ -287,44 +413,152 @@ namespace NActors { }; Metrics->IncInputChannelsIncomingEvents(channel); - ProcessEvent(*eventData, descr); - *eventData = TRope(); - } else { - Y_FAIL(); + ProcessEvents(context); } + + Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size); } + + // mark packet as processed + ProcessInboundPacketQ(0); + XdcCatchStreamFinal = XdcCatchStreamFinalPending; + Context->LastProcessedSerial += !IgnorePayload; + + ++PacketsReadFromSocket; + ++DataPacketsReadFromSocket; + IgnoredDataPacketsFromSocket += IgnorePayload; } - void TInputSessionTCP::ProcessEvent(TRope& data, TEventData& descr) { - if (descr.Checksum) { - ui32 checksum = 0; - for (const auto&& [data, size] : data) { - checksum = Crc32cExtendMSanCompatible(checksum, data, size); - } - if (checksum != descr.Checksum) { - LOG_CRIT_IC_SESSION("ICIS05", "event checksum error"); - return ReestablishConnection(TDisconnectReason::ChecksumError()); - } - } - TEventSerializationInfo serializationInfo{ - .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat), - }; - auto ev = std::make_unique<IEventHandle>(SessionId, - descr.Type, - descr.Flags & ~IEventHandle::FlagExtendedFormat, - descr.Recipient, - descr.Sender, - MakeIntrusive<TEventSerializedData>(std::move(data), std::move(serializationInfo)), - descr.Cookie, - Params.PeerScopeId, - std::move(descr.TraceId)); - if (Common->EventFilter && !Common->EventFilter->CheckIncomingEvent(*ev, Common->LocalScopeId)) { - LOG_CRIT_IC_SESSION("ICIC03", "Event dropped due to scope error LocalScopeId# %s PeerScopeId# %s Type# 0x%08" PRIx32, - ScopeIdToString(Common->LocalScopeId).data(), ScopeIdToString(Params.PeerScopeId).data(), descr.Type); - ev.reset(); - } - if (ev) { - TActivationContext::Send(ev.release()); + void TInputSessionTCP::ProcessInboundPacketQ(size_t numXdcBytesRead) { + for (; !InboundPacketQ.empty(); InboundPacketQ.pop_front()) { + auto& front = InboundPacketQ.front(); + + const size_t n = Min(numXdcBytesRead, front.XdcUnreadBytes); + front.XdcUnreadBytes -= n; + numXdcBytesRead -= n; + + if (front.XdcUnreadBytes) { // we haven't finished this packet yet + Y_VERIFY(!numXdcBytesRead); + break; + } + + if (!Context->AdvanceLastPacketSerialToConfirm(front.Serial)) { + throw TExDestroySession{TDisconnectReason::NewSession()}; + } + } + } + + void TInputSessionTCP::ProcessXdcCommand(ui16 channel, TReceiveContext::TPerChannelContext& context) { + const char *ptr = XdcCommands.data(); + const char *end = ptr + XdcCommands.size(); + while (ptr != end) { + switch (static_cast<EXdcCommand>(*ptr++)) { + case EXdcCommand::DECLARE_SECTION: { + // extract and validate command parameters + const ui64 headroom = NInterconnect::NDetail::DeserializeNumber(&ptr, end); + const ui64 size = NInterconnect::NDetail::DeserializeNumber(&ptr, end); + const ui64 tailroom = NInterconnect::NDetail::DeserializeNumber(&ptr, end); + const ui64 alignment = NInterconnect::NDetail::DeserializeNumber(&ptr, end); + if (headroom == Max<ui64>() || size == Max<ui64>() || tailroom == Max<ui64>() || alignment == Max<ui64>()) { + LOG_CRIT_IC_SESSION("ICIS00", "XDC command format error"); + throw TExDestroySession{TDisconnectReason::FormatError()}; + } + + if (!IgnorePayload) { // process command if packet is being applied + // allocate buffer and push it into the payload + auto& pendingEvent = context.PendingEvents.back(); + pendingEvent.SerializationInfo.Sections.push_back(TEventSectionInfo{headroom, size, tailroom, alignment}); + auto buffer = TRcBuf::Uninitialized(size, headroom, tailroom); + if (size) { + context.XdcBuffers.push_back(buffer.GetContiguousSpanMut()); + } + pendingEvent.Payload.Insert(pendingEvent.Payload.End(), TRope(std::move(buffer))); + pendingEvent.XdcSizeLeft += size; + + ++XdcSections; + } + continue; + } + + case EXdcCommand::PUSH_DATA: { + const size_t cmdLen = sizeof(ui16) + (Params.Encryption ? 0 : sizeof(ui32)); + if (static_cast<size_t>(end - ptr) < cmdLen) { + LOG_CRIT_IC_SESSION("ICIS18", "XDC command format error"); + throw TExDestroySession{TDisconnectReason::FormatError()}; + } + + auto size = *reinterpret_cast<const ui16*>(ptr); + if (!size) { + LOG_CRIT_IC_SESSION("ICIS03", "XDC empty payload"); + throw TExDestroySession{TDisconnectReason::FormatError()}; + } + + if (!Params.Encryption) { + const ui32 checksumExpected = *reinterpret_cast<const ui32*>(ptr + sizeof(ui16)); + XdcChecksumQ.emplace_back(size, checksumExpected); + } + + if (IgnorePayload) { + // this packet was already marked as 'processed', all commands had been executed, but we must + // parse XDC stream correctly + XdcCatchStreamBytesPending += size; + XdcCatchStreamMarkup.emplace_back(channel, size); + } else { + // account channel and number of bytes in XDC for this packet + auto& packet = InboundPacketQ.back(); + packet.XdcUnreadBytes += size; + + // find buffers and acquire data buffer pointers + context.FetchBuffers(channel, size, XdcInputQ); + } + + ptr += cmdLen; + ++XdcRefs; + continue; + } + } + + LOG_CRIT_IC_SESSION("ICIS15", "unexpected XDC command"); + throw TExDestroySession{TDisconnectReason::FormatError()}; + } + } + + void TInputSessionTCP::ProcessEvents(TReceiveContext::TPerChannelContext& context) { + for (; !context.PendingEvents.empty(); context.PendingEvents.pop_front()) { + auto& pendingEvent = context.PendingEvents.front(); + if (!pendingEvent.EventData || pendingEvent.XdcSizeLeft) { + break; // event is not ready yet + } + + auto& descr = *pendingEvent.EventData; + if (descr.Checksum) { + ui32 checksum = 0; + for (const auto&& [data, size] : pendingEvent.Payload) { + checksum = Crc32cExtendMSanCompatible(checksum, data, size); + } + if (checksum != descr.Checksum) { + LOG_CRIT_IC_SESSION("ICIS05", "event checksum error Type# 0x%08" PRIx32, descr.Type); + throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; + } + } + pendingEvent.SerializationInfo.IsExtendedFormat = descr.Flags & IEventHandle::FlagExtendedFormat; + auto ev = std::make_unique<IEventHandle>(SessionId, + descr.Type, + descr.Flags & ~IEventHandle::FlagExtendedFormat, + descr.Recipient, + descr.Sender, + MakeIntrusive<TEventSerializedData>(std::move(pendingEvent.Payload), std::move(pendingEvent.SerializationInfo)), + descr.Cookie, + Params.PeerScopeId, + std::move(descr.TraceId)); + if (Common->EventFilter && !Common->EventFilter->CheckIncomingEvent(*ev, Common->LocalScopeId)) { + LOG_CRIT_IC_SESSION("ICIC03", "Event dropped due to scope error LocalScopeId# %s PeerScopeId# %s Type# 0x%08" PRIx32, + ScopeIdToString(Common->LocalScopeId).data(), ScopeIdToString(Params.PeerScopeId).data(), descr.Type); + ev.reset(); + } + if (ev) { + TActivationContext::Send(ev.release()); + } } } @@ -347,39 +581,28 @@ namespace NActors { } } - bool TInputSessionTCP::ReadMore() { - PreallocateBuffers(); - - TStackVec<TIoVec, 16> buffs; - size_t offset = FirstBufferOffset; - for (const auto& item : Buffers) { - TIoVec iov{item->GetBuffer() + offset, item->GetCapacity() - offset}; - buffs.push_back(iov); - if (Params.Encryption) { - break; // do not put more than one buffer in queue to prevent using ReadV - } - offset = 0; - } - - const struct iovec* iovec = reinterpret_cast<const struct iovec*>(buffs.data()); - int iovcnt = buffs.size(); - + ssize_t TInputSessionTCP::Read(NInterconnect::TStreamSocket& socket, const TPollerToken::TPtr& token, + bool *readPending, const TIoVec *iov, size_t num) { ssize_t recvres = 0; TString err; LWPROBE_IF_TOO_LONG(SlowICReadFromSocket, ms) { do { const ui64 begin = GetCycleCountFast(); #ifndef _win_ - recvres = iovcnt == 1 ? Socket->Recv(iovec->iov_base, iovec->iov_len, &err) : Socket->ReadV(iovec, iovcnt); + if (num == 1) { + recvres = socket.Recv(iov->Data, iov->Size, &err); + } else { + recvres = socket.ReadV(reinterpret_cast<const iovec*>(iov), num); + } #else - recvres = Socket->Recv(iovec[0].iov_base, iovec[0].iov_len, &err); + recvres = socket.Recv(iov->Data, iov->Size, &err); #endif const ui64 end = GetCycleCountFast(); Metrics->IncRecvSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond()); } while (recvres == -EINTR); } - LOG_DEBUG_IC_SESSION("ICIS12", "ReadMore recvres# %zd iovcnt# %d err# %s", recvres, iovcnt, err.data()); + LOG_DEBUG_IC_SESSION("ICIS12", "Read recvres# %zd num# %zu err# %s", recvres, num, err.data()); if (recvres <= 0 || CloseInputSessionRequested) { if ((-recvres != EAGAIN && -recvres != EWOULDBLOCK) || CloseInputSessionRequested) { @@ -388,23 +611,41 @@ namespace NActors { : err ? err : Sprintf("socket: %s", strerror(-recvres)); LOG_NOTICE_NET(NodeId, "%s", message.data()); - ReestablishConnection(CloseInputSessionRequested ? TDisconnectReason::Debug() : - recvres == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-recvres)); - } else if (PollerToken && !std::exchange(Context->ReadPending, true)) { - if (Params.Encryption) { - auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get()); - const bool wantRead = secure->WantRead(), wantWrite = secure->WantWrite(); - Y_VERIFY_DEBUG(wantRead || wantWrite); - PollerToken->Request(wantRead, wantWrite); - } else { - PollerToken->Request(true, false); - } + throw TExReestablishConnection{CloseInputSessionRequested ? TDisconnectReason::Debug() : + recvres == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-recvres)}; + } else if (token && !std::exchange(*readPending, true)) { + socket.Request(*token, true, false); + } + return -1; + } + + return recvres; + } + + bool TInputSessionTCP::ReadMore() { + PreallocateBuffers(); + + TStackVec<TIoVec, 16> buffs; + size_t offset = FirstBufferOffset; + for (const auto& item : Buffers) { + TIoVec iov{item->GetBuffer() + offset, item->GetCapacity() - offset}; + buffs.push_back(iov); + if (Params.Encryption) { + break; // do not put more than one buffer in queue to prevent using ReadV } + offset = 0; + } + + ssize_t recvres = Read(*Socket, PollerToken, &Context->MainReadPending, buffs.data(), buffs.size()); + if (recvres == -1) { return false; } Y_VERIFY(recvres > 0); Metrics->AddTotalBytesRead(recvres); + BytesReadFromSocket += recvres; + + size_t numBuffersCovered = 0; while (recvres) { Y_VERIFY(!Buffers.empty()); @@ -417,6 +658,19 @@ namespace NActors { Buffers.pop_front(); FirstBufferOffset = 0; } + ++numBuffersCovered; + } + + if (Buffers.empty()) { // we have read all the data, increase number of buffers + CurrentBuffers = Min(CurrentBuffers * 2, MaxBuffers); + } else if (++UsageHisto[numBuffersCovered - 1] == 64) { // time to shift + for (auto& value : UsageHisto) { + value /= 2; + } + while (CurrentBuffers > 1 && !UsageHisto[CurrentBuffers - 1]) { + --CurrentBuffers; + } + Y_VERIFY_DEBUG(UsageHisto[CurrentBuffers - 1]); } LastReceiveTimestamp = TActivationContext::Monotonic(); @@ -424,29 +678,178 @@ namespace NActors { return true; } - void TInputSessionTCP::PreallocateBuffers() { - // ensure that we have exactly "numBuffers" in queue - LWPROBE_IF_TOO_LONG(SlowICReadLoopAdjustSize, ms) { - while (Buffers.size() < Common->Settings.NumPreallocatedBuffers) { - Buffers.emplace_back(TRopeAlignedBuffer::Allocate(Common->Settings.PreallocatedBufferSize)); + bool TInputSessionTCP::ReadXdcCatchStream(ui64 *numDataBytes) { + bool progress = false; + + while (XdcCatchStreamBytesPending) { // read data into catch stream if we still have to + if (!XdcCatchStreamBuffer) { + XdcCatchStreamBuffer = TRcBuf::Uninitialized(64 * 1024); + } + + const size_t numBytesToRead = Min<size_t>(XdcCatchStreamBytesPending, XdcCatchStreamBuffer.size() - XdcCatchStreamBufferOffset); + + TIoVec iov{XdcCatchStreamBuffer.GetDataMut() + XdcCatchStreamBufferOffset, numBytesToRead}; + ssize_t recvres = Read(*XdcSocket, XdcPollerToken, &Context->XdcReadPending, &iov, 1); + if (recvres == -1) { + return progress; + } + + HandleXdcChecksum({XdcCatchStreamBuffer.data() + XdcCatchStreamBufferOffset, static_cast<size_t>(recvres)}); + + XdcCatchStreamBufferOffset += recvres; + XdcCatchStreamBytesPending -= recvres; + *numDataBytes += recvres; + BytesReadFromXdcSocket += recvres; + + if (XdcCatchStreamBufferOffset == XdcCatchStreamBuffer.size() || XdcCatchStreamBytesPending == 0) { + TRope(std::exchange(XdcCatchStreamBuffer, {})).ExtractFront(XdcCatchStreamBufferOffset, &XdcCatchStream); + XdcCatchStreamBufferOffset = 0; + } + + progress = true; + } + + if (XdcCatchStreamFinal && XdcCatchStream) { + // calculate total number of bytes to catch + size_t totalBytesToCatch = 0; + for (auto& context : Context->ChannelArray) { + totalBytesToCatch += context.XdcBytesToCatch; + } + for (auto& [channel, context] : Context->ChannelMap) { + totalBytesToCatch += context.XdcBytesToCatch; + } + + // calculate ignored offset + Y_VERIFY(totalBytesToCatch <= XdcCatchStream.GetSize()); + size_t bytesToIgnore = XdcCatchStream.GetSize() - totalBytesToCatch; + + // process catch stream markup + THashSet<ui16> channels; + for (auto [channel, size] : XdcCatchStreamMarkup) { + if (const size_t n = Min<size_t>(bytesToIgnore, size)) { + XdcCatchStream.EraseFront(n); + bytesToIgnore -= n; + size -= n; + } + if (const size_t n = Min<size_t>(totalBytesToCatch, size)) { + GetPerChannelContext(channel).DropFront(&XdcCatchStream, n); + channels.insert(channel); + totalBytesToCatch -= n; + size -= n; + } + Y_VERIFY(!size); } + for (ui16 channel : channels) { + ProcessEvents(GetPerChannelContext(channel)); + } + + // ensure everything was processed + Y_VERIFY(!XdcCatchStream); + Y_VERIFY(!bytesToIgnore); + Y_VERIFY(!totalBytesToCatch); + XdcCatchStreamMarkup = {}; } + + return progress; } - void TInputSessionTCP::ReestablishConnection(TDisconnectReason reason) { - LOG_DEBUG_IC_SESSION("ICIS09", "ReestablishConnection, reason# %s", reason.ToString().data()); - AtomicDecrement(Context->NumInputSessions); - Send(SessionId, new TEvSocketDisconnect(std::move(reason))); - PassAway(); - Socket.Reset(); + bool TInputSessionTCP::ReadXdc(ui64 *numDataBytes) { + bool progress = ReadXdcCatchStream(numDataBytes); + + // exit if we have no work to do + if (XdcInputQ.empty() || XdcCatchStreamBytesPending) { + return progress; + } + + TStackVec<TIoVec, 64> buffs; + size_t size = 0; + for (auto& [channel, span] : XdcInputQ) { + buffs.push_back(TIoVec{span.data(), span.size()}); + size += span.size(); + if (buffs.size() == 64 || size >= 1024 * 1024 || Params.Encryption) { + break; + } + } + + ssize_t recvres = Read(*XdcSocket, XdcPollerToken, &Context->XdcReadPending, buffs.data(), buffs.size()); + if (recvres == -1) { + return progress; + } + + // calculate stream checksums + { + size_t bytesToChecksum = recvres; + for (const auto& iov : buffs) { + const size_t n = Min(bytesToChecksum, iov.Size); + HandleXdcChecksum({static_cast<const char*>(iov.Data), n}); + bytesToChecksum -= n; + if (!bytesToChecksum) { + break; + } + } + } + + Y_VERIFY(recvres > 0); + Metrics->AddTotalBytesRead(recvres); + *numDataBytes += recvres; + BytesReadFromXdcSocket += recvres; + + // cut the XdcInputQ deque + for (size_t bytesToCut = recvres; bytesToCut; ) { + Y_VERIFY(!XdcInputQ.empty()); + auto& [channel, span] = XdcInputQ.front(); + size_t n = Min(bytesToCut, span.size()); + bytesToCut -= n; + if (n == span.size()) { + XdcInputQ.pop_front(); + } else { + span = span.SubSpan(n, Max<size_t>()); + Y_VERIFY(!bytesToCut); + } + + Y_VERIFY_DEBUG(n); + auto& context = GetPerChannelContext(channel); + context.DropFront(nullptr, n); + ProcessEvents(context); + } + + // drop fully processed inbound packets + ProcessInboundPacketQ(recvres); + + LastReceiveTimestamp = TActivationContext::Monotonic(); + + return true; } - void TInputSessionTCP::DestroySession(TDisconnectReason reason) { - LOG_DEBUG_IC_SESSION("ICIS13", "DestroySession, reason# %s", reason.ToString().data()); - AtomicDecrement(Context->NumInputSessions); - Send(SessionId, TInterconnectSessionTCP::NewEvTerminate(std::move(reason))); - PassAway(); - Socket.Reset(); + void TInputSessionTCP::HandleXdcChecksum(TContiguousSpan span) { + if (Params.Encryption) { + return; + } + while (span.size()) { + Y_VERIFY_DEBUG(!XdcChecksumQ.empty()); + auto& [size, expected] = XdcChecksumQ.front(); + const size_t n = Min<size_t>(size, span.size()); + XdcCurrentChecksum = Crc32cExtendMSanCompatible(XdcCurrentChecksum, span.data(), n); + span = span.SubSpan(n, Max<size_t>()); + size -= n; + if (!size) { + if (XdcCurrentChecksum != expected) { + LOG_ERROR_IC_SESSION("ICIS16", "payload checksum error"); + throw TExReestablishConnection{TDisconnectReason::ChecksumError()}; + } + XdcChecksumQ.pop_front(); + XdcCurrentChecksum = 0; + } + } + } + + void TInputSessionTCP::PreallocateBuffers() { + // ensure that we have exactly "numBuffers" in queue + LWPROBE_IF_TOO_LONG(SlowICReadLoopAdjustSize, ms) { + while (Buffers.size() < CurrentBuffers) { + Buffers.emplace_back(TRopeAlignedBuffer::Allocate(Common->Settings.PreallocatedBufferSize)); + } + } } void TInputSessionTCP::PassAway() { @@ -460,7 +863,7 @@ namespace NActors { ReceiveData(); if (Socket && now >= LastReceiveTimestamp + DeadPeerTimeout) { // nothing has changed, terminate session - DestroySession(TDisconnectReason::DeadPeer()); + throw TExDestroySession{TDisconnectReason::DeadPeer()}; } } Schedule(LastReceiveTimestamp + DeadPeerTimeout, new TEvCheckDeadPeer); @@ -496,5 +899,66 @@ namespace NActors { Metrics->SetClockSkewMicrosec(clockSkew); } + TReceiveContext::TPerChannelContext& TInputSessionTCP::GetPerChannelContext(ui16 channel) const { + return channel < std::size(Context->ChannelArray) + ? Context->ChannelArray[channel] + : Context->ChannelMap[channel]; + } + + void TInputSessionTCP::GenerateHttpInfo(NMon::TEvHttpInfoRes::TPtr ev) { + TStringStream str; + ev->Get()->Output(str); + + HTML(str) { + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + str << "Input Session"; + } + DIV_CLASS("panel-body") { + TABLE_CLASS("table") { + TABLEHEAD() { + TABLER() { + TABLEH() { + str << "Sensor"; + } + TABLEH() { + str << "Value"; + } + } + } +#define MON_VAR(KEY) \ + TABLER() { \ + TABLED() { str << #KEY; } \ + TABLED() { str << (KEY); } \ + } + + TABLEBODY() { + MON_VAR(BytesReadFromSocket) + MON_VAR(PacketsReadFromSocket) + MON_VAR(DataPacketsReadFromSocket) + MON_VAR(IgnoredDataPacketsFromSocket) + + MON_VAR(BytesReadFromXdcSocket) + MON_VAR(XdcSections) + MON_VAR(XdcRefs) + + MON_VAR(PayloadSize) + MON_VAR(InboundPacketQ.size()) + MON_VAR(XdcInputQ.size()) + MON_VAR(Buffers.size()) + MON_VAR(IncomingData.GetSize()) + MON_VAR(Payload.GetSize()) + MON_VAR(CurrentBuffers) + + MON_VAR(Context->LastProcessedSerial) + MON_VAR(ConfirmedByInput) + } + } + } + } + } + + TActivationContext::Send(new IEventHandle(ev->Recipient, ev->Sender, new NMon::TEvHttpInfoRes(str.Str()))); + } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 7ca2453f6c..13f2f2dd83 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -294,7 +294,7 @@ namespace NActors { Send(ev->Sender, new TEvHandshakeReplyError("duplicate serial")); return; } else if (serial == *LastSerialFromIncomingHandshake) { - LOG_NOTICE_IC("ICP15", "Handshake# %s is obsolete, serial# %" PRIu64 + LOG_NOTICE_IC("ICP00", "Handshake# %s is obsolete, serial# %" PRIu64 " LastSerialFromIncomingHandshake# %" PRIu64, ev->Sender.ToString().data(), serial, *LastSerialFromIncomingHandshake); Send(ev->Sender, new TEvents::TEvPoisonPill); @@ -731,11 +731,16 @@ namespace NActors { } } - if (Session != nullptr) { - Session->GenerateHttpInfo(str); + TAutoPtr<IEventHandle> h(new IEventHandle(ev->Sender, ev->Recipient, new NMon::TEvHttpInfoRes(str.Str()))); + if (Session) { + switch (auto& ev = h; ev->GetTypeRewrite()) { + hFunc(NMon::TEvHttpInfoRes, Session->GenerateHttpInfo); + default: + Y_FAIL(); + } + } else { + TActivationContext::Send(h.Release()); } - - Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str())); } void TInterconnectProxyTCP::TransitToErrorState(TString explanation, bool updateErrorLog) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index 352310e37c..9ea7fa0c31 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -456,7 +456,7 @@ namespace NActors { ICPROXY_PROFILED; if (const TActorId& actorId = std::exchange(OutgoingHandshakeActor, TActorId())) { - LOG_DEBUG_IC("ICP112", "dropped outgoing handshake: %s poison: %s", actorId.ToString().data(), + LOG_DEBUG_IC("ICP052", "dropped outgoing handshake: %s poison: %s", actorId.ToString().data(), poison ? "true" : "false"); if (poison) { Send(actorId, new TEvents::TEvPoisonPill); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 16cadc9e92..e8fc974433 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -162,7 +162,7 @@ namespace NActors { if (RamInQueue && !RamInQueue->Batching) { // we have pending TEvRam, so GenerateTraffic will be called no matter what - } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket || ReceiveContext->WriteBlockedByFullSendBuffer) { + } else if (InflightDataAmount >= GetTotalInflightAmountOfData() || !Socket) { // we can't issue more traffic now; GenerateTraffic will be called upon unblocking } else if (TotalOutputQueueSize >= 64 * 1024) { // output queue size is quite big to issue some traffic @@ -203,7 +203,7 @@ namespace NActors { // close existing input session, if any, and do nothing upon its destruction ReestablishConnection({}, false, TDisconnectReason::NewSession()); - const ui64 lastInputSerial = ReceiveContext->LockLastProcessedPacketSerial(); + const ui64 lastInputSerial = ReceiveContext->LockLastPacketSerialToConfirm(); LOG_INFO_IC_SESSION("ICS08", "incoming handshake Self# %s Peer# %s Counter# %" PRIu64 " LastInputSerial# %" PRIu64, msg->Self.ToString().data(), msg->Peer.ToString().data(), msg->Counter, lastInputSerial); @@ -240,10 +240,17 @@ namespace NActors { LOG_INFO_IC_SESSION("ICS10", "traffic start"); + // reset parameters to initial values + WriteBlockedByFullSendBuffer = false; + ReceiveContext->MainWriteBlocked = false; + ReceiveContext->XdcWriteBlocked = false; + ReceiveContext->MainReadPending = false; + ReceiveContext->XdcReadPending = false; + // create input session actor auto actor = MakeHolder<TInputSessionTCP>(SelfId(), Socket, XdcSocket, ReceiveContext, Proxy->Common, Proxy->Metrics, Proxy->PeerNodeId, nextPacket, GetDeadPeerTimeout(), Params); - ReceiveContext->UnlockLastProcessedPacketSerial(); + ReceiveContext->ResetLastPacketSerialToConfirm(); ReceiverId = Params.Encryption ? RegisterWithSameMailbox(actor.Release()) : Register(actor.Release(), TMailboxType::ReadAsFilled); // register our socket in poller actor @@ -254,7 +261,6 @@ namespace NActors { const bool success = Send(MakePollerActorId(), new TEvPollerRegister(XdcSocket, ReceiverId, SelfId())); Y_VERIFY(success); } - ReceiveContext->WriteBlockedByFullSendBuffer = false; LostConnectionWatchdog.Disarm(); Proxy->Metrics->SetConnected(1); @@ -271,8 +277,11 @@ namespace NActors { // also reset SendQueuePos // drop confirmed packets first as we do not need unwanted retransmissions + OutgoingStream.RewindToEnd(); + XdcStream.RewindToEnd(); DropConfirmed(nextPacket, true); OutgoingStream.Rewind(); + XdcStream.Rewind(); SendQueuePos = 0; SendOffset = 0; @@ -342,6 +351,7 @@ namespace NActors { if (needConfirm && Socket) { ++ConfirmPacketsForcedBySize; MakePacket(false); + WriteData(); } for (;;) { @@ -409,24 +419,26 @@ namespace NActors { // first, we create as many data packets as we can generate under certain conditions; they include presence // of events in channels queues and in flight fitting into requested limit; after we hit one of these conditions // we exit cycle - while (Socket && NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData() && !ReceiveContext->WriteBlockedByFullSendBuffer) { - if (generatedBytes >= generateLimit) { - // resume later but ensure that we have issued at least one packet - RamInQueue = new TEvRam(false); - Send(SelfId(), RamInQueue); - RamStartedCycles = GetCycleCountFast(); - LWPROBE(StartRam, Proxy->PeerNodeId); - break; - } + if (Socket) { + while (NumEventsInReadyChannels && InflightDataAmount < GetTotalInflightAmountOfData()) { + if (generatedBytes >= generateLimit) { + // resume later but ensure that we have issued at least one packet + RamInQueue = new TEvRam(false); + Send(SelfId(), RamInQueue); + RamStartedCycles = GetCycleCountFast(); + LWPROBE(StartRam, Proxy->PeerNodeId); + break; + } - try { - generatedBytes += MakePacket(true); - ++generatedPackets; - } catch (const TExSerializedEventTooLarge& ex) { - // terminate session if the event can't be serialized properly - accountTraffic(); - LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type); - return Terminate(TDisconnectReason::EventTooLarge()); + try { + generatedBytes += MakePacket(true); + ++generatedPackets; + } catch (const TExSerializedEventTooLarge& ex) { + // terminate session if the event can't be serialized properly + accountTraffic(); + LOG_CRIT_IC("ICS31", "serialized event Type# 0x%08" PRIx32 " is too large", ex.Type); + return Terminate(TDisconnectReason::EventTooLarge()); + } } } @@ -444,7 +456,7 @@ namespace NActors { void TInterconnectSessionTCP::StartHandshake() { LOG_INFO_IC_SESSION("ICS15", "start handshake"); - IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastProcessedPacketSerial()); + IActor::InvokeOtherActor(*Proxy, &TInterconnectProxyTCP::StartResumeHandshake, ReceiveContext->LockLastPacketSerialToConfirm()); } void TInterconnectSessionTCP::ReestablishConnectionWithHandshake(TDisconnectReason reason) { @@ -515,161 +527,179 @@ namespace NActors { void TInterconnectSessionTCP::Handle(TEvPollerReady::TPtr& ev) { LOG_DEBUG_IC_SESSION("ICS29", "HandleReadyWrite WriteBlockedByFullSendBuffer# %s", - ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false"); - if (std::exchange(ReceiveContext->WriteBlockedByFullSendBuffer, false)) { + WriteBlockedByFullSendBuffer ? "true" : "false"); + + auto *msg = ev->Get(); + bool useful = false; + bool readPending = false; + + if (msg->Socket == Socket) { + useful = ReceiveContext->MainWriteBlocked; + readPending = ReceiveContext->MainReadPending; + } else if (msg->Socket == XdcSocket) { + useful = ReceiveContext->XdcWriteBlocked; + readPending = ReceiveContext->XdcReadPending; + } + + if (useful) { Proxy->Metrics->IncUsefulWriteWakeups(); - ui64 nowCycles = GetCycleCountFast(); - double blockedUs = NHPTimer::GetSeconds(nowCycles - WriteBlockedCycles) * 1000000.0; - LWPROBE(ReadyWrite, Proxy->PeerNodeId, NHPTimer::GetSeconds(nowCycles - ev->SendTime) * 1000.0, blockedUs / 1000.0); - WriteBlockedTotal += TDuration::MicroSeconds(blockedUs); - GenerateTraffic(); } else if (!ev->Cookie) { Proxy->Metrics->IncSpuriousWriteWakeups(); } - if (Params.Encryption && ReceiveContext->ReadPending && !ev->Cookie) { - Send(ReceiverId, ev->Release().Release(), 0, 1); + + GenerateTraffic(); + + if (Params.Encryption && readPending && ev->Sender != ReceiverId) { + Send(ReceiverId, ev->Release().Release()); } } void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) { - const auto& sk = ev->Get()->Socket; - if (auto *token = sk == Socket ? &PollerToken : sk == XdcSocket ? &XdcPollerToken : nullptr) { - *token = std::move(ev->Get()->PollerToken); - } else { - return; - } + auto *msg = ev->Get(); - if (ReceiveContext->WriteBlockedByFullSendBuffer) { - if (Params.Encryption) { - auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get()); - PollerToken->Request(secure->WantRead(), secure->WantWrite()); - } else { - PollerToken->Request(false, true); + if (msg->Socket == Socket) { + PollerToken = std::move(msg->PollerToken); + if (ReceiveContext->MainWriteBlocked) { + Socket->Request(*PollerToken, false, true); + } + } else if (msg->Socket == XdcSocket) { + XdcPollerToken = std::move(msg->PollerToken); + if (ReceiveContext->XdcWriteBlocked) { + XdcSocket->Request(*XdcPollerToken, false, true); } } } void TInterconnectSessionTCP::WriteData() { + Y_VERIFY(Socket); // ensure that socket wasn't closed + + // total bytes written during this call ui64 written = 0; - Y_VERIFY(Socket); // ensure that socket wasn't closed + auto process = [&](NInterconnect::TOutgoingStream& stream, const TIntrusivePtr<NInterconnect::TStreamSocket>& socket, + const TPollerToken::TPtr& token, bool *writeBlocked, auto&& callback) { + while (stream && socket) { + ssize_t r = Write(stream, *socket); + if (r == -1) { + *writeBlocked = true; + if (token) { + socket->Request(*token, false, true); + } + break; + } else if (r == 0) { + break; // error condition + } - LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { - constexpr ui32 iovLimit = 256; -#ifdef _linux_ - ui32 maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX)); -#else - ui32 maxElementsInIOV = 64; -#endif - if (Params.Encryption) { - maxElementsInIOV = 1; + *writeBlocked = false; + written += r; + callback(r); + } + if (!socket) { + *writeBlocked = true; + } else if (!stream) { + *writeBlocked = false; } + }; - // vector of write buffers with preallocated stack space - TStackVec<TConstIoVec, iovLimit> wbuffers; + Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); + + process(OutgoingStream, Socket, PollerToken, &ReceiveContext->MainWriteBlocked, [&](size_t r) { + Y_VERIFY(r <= BytesUnwritten); + BytesUnwritten -= r; - LOG_DEBUG_IC_SESSION("ICS30", "WriteData WriteBlockedByFullSendBuffer# %s SendQueue.size# %zu", - ReceiveContext->WriteBlockedByFullSendBuffer ? "true" : "false", SendQueue.size()); + OutgoingStream.Advance(r); - auto calculateUnsentQueueSize = [&] { - size_t res = -SendOffset; - for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end(); ++it) { - res += it->PacketSize; + ui64 packets = 0; + Y_VERIFY_DEBUG(SendQueuePos != SendQueue.size()); + SendOffset += r; + for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->PacketSize <= SendOffset; ++SendQueuePos, ++it) { + SendOffset -= it->PacketSize; + Y_VERIFY_DEBUG(!it->Data || it->Serial <= OutputCounter); + if (it->Data && LastSentSerial < it->Serial) { + LastSentSerial = it->Serial; } - return res; - }; + ++PacketsWrittenToSocket; + ++packets; + Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1); + } - while (!ReceiveContext->WriteBlockedByFullSendBuffer) { - Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); - Y_VERIFY_DEBUG(BytesUnwritten == calculateUnsentQueueSize()); + Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); + }); - OutgoingStream.ProduceIoVec(wbuffers, maxElementsInIOV); - if (!wbuffers) { // done sending - break; - } + process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, [&](size_t r) { + XdcBytesSent += r; + XdcStream.Advance(r); + }); - const struct iovec* iovec = reinterpret_cast<const struct iovec*>(wbuffers.data()); - int iovcnt = wbuffers.size(); + if (written) { + Proxy->Metrics->AddTotalBytesWritten(written); + } - Y_VERIFY(iovcnt > 0); - Y_VERIFY(iovec->iov_len > 0); + const bool writeBlockedByFullSendBuffer = ReceiveContext->MainWriteBlocked || ReceiveContext->XdcWriteBlocked; + if (WriteBlockedByFullSendBuffer < writeBlockedByFullSendBuffer) { // became blocked + WriteBlockedCycles = GetCycleCountFast(); + LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit"); + } else if (writeBlockedByFullSendBuffer < WriteBlockedByFullSendBuffer) { // became unblocked + WriteBlockedTotal += TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles)); + } + WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer; + } - TString err; - ssize_t r = 0; - do { - const ui64 begin = GetCycleCountFast(); -#ifndef _win_ - r = iovcnt == 1 ? Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err) : Socket->WriteV(iovec, iovcnt); + ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket) { + LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { + constexpr ui32 iovLimit = 256; + + ui32 maxElementsInIOV; + if (Params.Encryption) { + maxElementsInIOV = 1; + } else { +#if defined(_win_) + maxElementsInIOV = 1; +#elif defined(_linux_) + maxElementsInIOV = Min<ui32>(iovLimit, sysconf(_SC_IOV_MAX)); #else - r = Socket->Send(iovec[0].iov_base, iovec[0].iov_len, &err); + maxElementsInIOV = 64; #endif - const ui64 end = GetCycleCountFast(); - Proxy->Metrics->IncSendSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond()); - } while (r == -EINTR); - - LOG_DEBUG_IC_SESSION("ICS16", "written# %zd iovcnt# %d err# %s", r, iovcnt, err.data()); - - wbuffers.clear(); + } - if (r > 0) { - written += r; + TStackVec<TConstIoVec, iovLimit> wbuffers; - Y_VERIFY(static_cast<size_t>(r) <= BytesUnwritten); - BytesUnwritten -= r; + stream.ProduceIoVec(wbuffers, maxElementsInIOV); + Y_VERIFY(!wbuffers.empty()); - OutgoingStream.Advance(r); + TString err; + ssize_t r = 0; + { // issue syscall with timing + const ui64 begin = GetCycleCountFast(); - ui64 packets = 0; - Y_VERIFY_DEBUG(SendQueuePos != SendQueue.size()); - SendOffset += r; - for (auto it = SendQueue.begin() + SendQueuePos; SendOffset && it->PacketSize <= SendOffset; ++SendQueuePos, ++it) { - SendOffset -= it->PacketSize; - Y_VERIFY_DEBUG(!it->Data || it->Serial <= OutputCounter); - if (it->Data && LastSentSerial < it->Serial) { - LastSentSerial = it->Serial; - } - ++PacketsWrittenToSocket; - ++packets; - Y_VERIFY_DEBUG(SendOffset == 0 || SendQueuePos != SendQueue.size() - 1); -// LWTRACK(PacketWrittenToSocket, SendQueuePos->Orbit, Proxy->PeerNodeId, PacketsWrittenToSocket, false, SendQueuePos->PacketSize, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); + do { + if (wbuffers.size() == 1) { + auto& front = wbuffers.front(); + r = socket.Send(front.Data, front.Size, &err); + } else { + r = socket.WriteV(reinterpret_cast<const iovec*>(wbuffers.data()), wbuffers.size()); } + } while (r == -EINTR); - Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize()); - Y_VERIFY_DEBUG(BytesUnwritten == calculateUnsentQueueSize()); - - LWPROBE(WriteToSocket, Proxy->PeerNodeId, r, packets, PacketsWrittenToSocket, BytesUnwritten, GetWriteBlockedTotal(), (SOCKET)*Socket); - } else if (-r != EAGAIN && -r != EWOULDBLOCK) { - const TString message = r == 0 ? "connection closed by peer" - : err ? err - : Sprintf("socket: %s", strerror(-r)); - LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data()); - if (written) { - Proxy->Metrics->AddTotalBytesWritten(written); - } - return ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r)); - } else { - // we have received EAGAIN error code, this means that we can't issue more data until we have received - // TEvPollerReadyWrite event from poller; set up flag meaning this and wait for that event - Y_VERIFY(!ReceiveContext->WriteBlockedByFullSendBuffer); - ReceiveContext->WriteBlockedByFullSendBuffer = true; - WriteBlockedCycles = GetCycleCountFast(); - LWPROBE(BlockedWrite, Proxy->PeerNodeId, SendQueue.size(), written); - LOG_DEBUG_IC_SESSION("ICS18", "hit send buffer limit"); - - if (PollerToken) { - if (Params.Encryption) { - auto *secure = static_cast<NInterconnect::TSecureSocket*>(Socket.Get()); - PollerToken->Request(secure->WantRead(), secure->WantWrite()); - } else { - PollerToken->Request(false, true); - } - } - } + const ui64 end = GetCycleCountFast(); + Proxy->Metrics->IncSendSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond()); + } + + if (r > 0) { + return r; + } else if (-r != EAGAIN && -r != EWOULDBLOCK) { + const TString message = r == 0 ? "connection closed by peer" + : err ? err + : Sprintf("socket: %s", strerror(-r)); + LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data()); + ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r)); + return 0; // error indicator + } else { + return -1; // temporary error } } - if (written) { - Proxy->Metrics->AddTotalBytesWritten(written); - } + + Y_UNREACHABLE(); } void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) { @@ -702,6 +732,7 @@ namespace NActors { ++ConfirmPacketsForcedByTimeout; ++FlushEventsProcessed; MakePacket(false); // just generate confirmation packet if we have preconditions for this + WriteData(); } else if (ForcePacketTimestamp != TMonotonic::Max()) { ScheduleFlush(); } @@ -718,9 +749,12 @@ namespace NActors { } ui64 TInterconnectSessionTCP::MakePacket(bool data, TMaybe<ui64> pingMask) { - Y_VERIFY(Socket); +#ifndef NDEBUG + const size_t outgoingStreamSizeBefore = OutgoingStream.CalculateOutgoingSize(); + const size_t xdcStreamSizeBefore = XdcStream.CalculateOutgoingSize(); +#endif - TTcpPacketOutTask packet(Params, OutgoingStream); + TTcpPacketOutTask packet(Params, OutgoingStream, XdcStream); ui64 serial = 0; if (data) { @@ -751,19 +785,36 @@ namespace NActors { serial = *pingMask; } - const ui64 lastInputSerial = ReceiveContext->GetLastProcessedPacketSerial(); + const ui64 lastInputSerial = ReceiveContext->GetLastPacketSerialToConfirm(); packet.Finish(serial, lastInputSerial); // count number of bytes pending for write - const size_t packetSize = packet.GetFullSize(); + const size_t packetSize = packet.GetPacketSize(); BytesUnwritten += packetSize; Y_VERIFY_DEBUG(BytesUnwritten == OutgoingStream.CalculateUnsentSize(), "%s", TString(TStringBuilder() << "BytesUnwritten# " << BytesUnwritten << " packetSize# " << packetSize << " CalculateUnsentSize# " << OutgoingStream.CalculateUnsentSize()).data()); +#ifndef NDEBUG + const size_t outgoingStreamSizeAfter = OutgoingStream.CalculateOutgoingSize(); + const size_t xdcStreamSizeAfter = XdcStream.CalculateOutgoingSize(); + + Y_VERIFY(outgoingStreamSizeAfter == outgoingStreamSizeBefore + packetSize && + xdcStreamSizeAfter == xdcStreamSizeBefore + packet.GetExternalSize(), + "outgoingStreamSizeBefore# %zu outgoingStreamSizeAfter# %zu packetSize# %zu" + " xdcStreamSizeBefore# %zu xdcStreamSizeAfter# %zu externalSize# %zu", + outgoingStreamSizeBefore, outgoingStreamSizeAfter, packetSize, + xdcStreamSizeBefore, xdcStreamSizeAfter, packet.GetExternalSize()); +#endif + // put outgoing packet metadata here - SendQueue.push_back(TOutgoingPacket{packetSize, serial, data}); + SendQueue.push_back(TOutgoingPacket{ + static_cast<ui32>(packetSize), + static_cast<ui32>(packet.GetExternalSize()), + serial, + data + }); LOG_DEBUG_IC_SESSION("ICS22", "outgoing packet Serial# %" PRIu64 " Confirm# %" PRIu64 " DataSize# %zu" " InflightDataAmount# %" PRIu64 " BytesUnwritten# %" PRIu64, serial, lastInputSerial, packet.GetDataSize(), @@ -773,11 +824,6 @@ namespace NActors { ResetFlushLogic(); ++PacketsGenerated; - LWTRACK(PacketGenerated, packet.Orbit, Proxy->PeerNodeId, BytesUnwritten, InflightDataAmount, PacketsGenerated, packetSize); - - if (!data) { - WriteData(); - } return packetSize; } @@ -790,7 +836,6 @@ namespace NActors { LogPrefix.data(), confirm, LastConfirmed, OutputCounter, LastSentSerial); LastConfirmed = confirm; - ui64 droppedDataAmount = 0; std::optional<ui64> lastDroppedSerial = 0; ui32 numDropped = 0; @@ -814,6 +859,7 @@ namespace NActors { // drop confirmed packets; this also includes any auxiliary packets as their serial is set to zero, effectively // making Serial <= confirm true size_t bytesDropped = 0; + size_t bytesDroppedFromXdc = 0; for (; !SendQueue.empty(); SendQueue.pop_front(), --SendQueuePos) { auto& front = SendQueue.front(); if (front.Data && confirm < front.Serial) { @@ -822,12 +868,14 @@ namespace NActors { if (front.Data) { lastDroppedSerial.emplace(front.Serial); } - droppedDataAmount += front.PacketSize - sizeof(TTcpPacketHeader_v2); bytesDropped += front.PacketSize; + bytesDroppedFromXdc += front.ExternalSize; ++numDropped; Y_VERIFY_DEBUG(ignoreSendQueuePos || SendQueuePos != 0); } + const ui64 droppedDataAmount = bytesDropped + bytesDroppedFromXdc - sizeof(TTcpPacketHeader_v2) * numDropped; OutgoingStream.DropFront(bytesDropped); + XdcStream.DropFront(bytesDroppedFromXdc); if (lastDroppedSerial) { ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { channel.DropConfirmed(*lastDroppedSerial); @@ -894,7 +942,6 @@ namespace NActors { } } - LWTRACK(FillSendingBuffer, task.Orbit, Proxy->PeerNodeId, bytesGenerated, NumEventsInReadyChannels, WriteBlockedTotal); Y_VERIFY(bytesGenerated); // ensure we are not stalled in serialization } @@ -1009,12 +1056,11 @@ namespace NActors { void TInterconnectSessionTCP::IssuePingRequest() { const TMonotonic now = TActivationContext::Monotonic(); if (now >= LastPingTimestamp + PingPeriodicity) { - LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request"); + LOG_DEBUG_IC_SESSION("ICS00", "Issuing ping request"); if (Socket) { MakePacket(false, GetCycleCountFast() | TTcpPacketBuf::PingRequestMask); - } - if (Socket) { MakePacket(false, TInstant::Now().MicroSeconds() | TTcpPacketBuf::ClockMask); + WriteData(); } LastPingTimestamp = now; } @@ -1023,10 +1069,14 @@ namespace NActors { void TInterconnectSessionTCP::Handle(TEvProcessPingRequest::TPtr ev) { if (Socket) { MakePacket(false, ev->Get()->Payload | TTcpPacketBuf::PingResponseMask); + WriteData(); } } - void TInterconnectSessionTCP::GenerateHttpInfo(TStringStream& str) { + void TInterconnectSessionTCP::GenerateHttpInfo(NMon::TEvHttpInfoRes::TPtr& ev) { + TStringStream str; + ev->Get()->Output(str); + HTML(str) { DIV_CLASS("panel panel-info") { DIV_CLASS("panel-heading") { @@ -1146,7 +1196,6 @@ namespace NActors { MON_VAR(PacketsGenerated) MON_VAR(PacketsWrittenToSocket) MON_VAR(PacketsConfirmed) - MON_VAR(AtomicGet(ReceiveContext->PacketsReadFromSocket)) MON_VAR(ConfirmPacketsForcedBySize) MON_VAR(ConfirmPacketsForcedByTimeout) @@ -1200,13 +1249,24 @@ namespace NActors { MON_VAR(LastHandshakeDone) MON_VAR(OutputCounter) MON_VAR(LastSentSerial) - MON_VAR(ReceiveContext->GetLastProcessedPacketSerial()) MON_VAR(LastConfirmed) MON_VAR(FlushSchedule.size()) MON_VAR(MaxFlushSchedule) MON_VAR(FlushEventsScheduled) MON_VAR(FlushEventsProcessed) + MON_VAR(GetWriteBlockedTotal()) + + MON_VAR(XdcBytesSent) + + MON_VAR(OutgoingStream.CalculateOutgoingSize()) + MON_VAR(OutgoingStream.CalculateUnsentSize()) + MON_VAR(OutgoingStream.GetSendQueueSize()) + + MON_VAR(XdcStream.CalculateOutgoingSize()) + MON_VAR(XdcStream.CalculateUnsentSize()) + MON_VAR(XdcStream.GetSendQueueSize()) + TString clockSkew; i64 x = GetClockSkew(); if (x < 0) { @@ -1228,6 +1288,12 @@ namespace NActors { } } } + + auto h = std::make_unique<IEventHandle>(ev->Recipient, ev->Sender, new NMon::TEvHttpInfoRes(str.Str())); + if (ReceiverId) { + h->Rewrite(h->Type, ReceiverId); + } + TActivationContext::Send(h.release()); } void CreateSessionKillingActor(TInterconnectProxyCommon::TPtr common) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 48fa39b273..1437f7df2c 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -99,13 +99,9 @@ namespace NActors { ui64 ControlPacketSendTimer = 0; ui64 ControlPacketId = 0; - // number of packets received by input session - TAtomic PacketsReadFromSocket = 0; - TAtomic DataPacketsReadFromSocket = 0; - // last processed packet by input session - std::atomic_uint64_t LastProcessedPacketSerial = 0; - static constexpr uint64_t LastProcessedPacketSerialLockBit = uint64_t(1) << 63; + std::atomic_uint64_t LastPacketSerialToConfirm = 0; + static constexpr uint64_t LastPacketSerialToConfirmLockBit = uint64_t(1) << 63; // for hardened checks TAtomic NumInputSessions = 0; @@ -118,47 +114,72 @@ namespace NActors { std::atomic<EUpdateState> UpdateState; static_assert(std::atomic<EUpdateState>::is_always_lock_free); - bool WriteBlockedByFullSendBuffer = false; - bool ReadPending = false; + bool MainWriteBlocked = false; + bool XdcWriteBlocked = false; + bool MainReadPending = false; + bool XdcReadPending = false; + + struct TPerChannelContext { + struct TPendingEvent { + TEventSerializationInfo SerializationInfo; + TRope Payload; + std::optional<TEventData> EventData; + + // number of bytes remaining through XDC channel + size_t XdcSizeLeft = 0; + }; + + std::deque<TPendingEvent> PendingEvents; + std::deque<TMutableContiguousSpan> XdcBuffers; // receive queue for current channel + size_t FetchIndex = 0; + size_t FetchOffset = 0; + size_t XdcBytesToCatch = 0; // number of bytes to catch after reconnect + + void CalculateBytesToCatch(); + void FetchBuffers(ui16 channel, size_t numBytes, std::deque<std::tuple<ui16, TMutableContiguousSpan>>& outQ); + void DropFront(TRope *from, size_t numBytes); + }; - std::array<TRope, 16> ChannelArray; - std::unordered_map<ui16, TRope> ChannelMap; + std::array<TPerChannelContext, 16> ChannelArray; + std::unordered_map<ui16, TPerChannelContext> ChannelMap; + ui64 LastProcessedSerial = 0; TReceiveContext() { GetTimeFast(&StartTime); } - // returns false if sessions needs to be terminated and packet not to be processed - bool AdvanceLastProcessedPacketSerial() { + // returns false if sessions needs to be terminated + bool AdvanceLastPacketSerialToConfirm(ui64 nextValue) { for (;;) { - uint64_t value = LastProcessedPacketSerial.load(); - if (value & LastProcessedPacketSerialLockBit) { + uint64_t value = LastPacketSerialToConfirm.load(); + if (value & LastPacketSerialToConfirmLockBit) { return false; } - if (LastProcessedPacketSerial.compare_exchange_weak(value, value + 1)) { + Y_VERIFY_DEBUG(value + 1 == nextValue); + if (LastPacketSerialToConfirm.compare_exchange_weak(value, nextValue)) { return true; } } } - ui64 LockLastProcessedPacketSerial() { + ui64 LockLastPacketSerialToConfirm() { for (;;) { - uint64_t value = LastProcessedPacketSerial.load(); - if (value & LastProcessedPacketSerialLockBit) { - return value & ~LastProcessedPacketSerialLockBit; + uint64_t value = LastPacketSerialToConfirm.load(); + if (value & LastPacketSerialToConfirmLockBit) { + return value & ~LastPacketSerialToConfirmLockBit; } - if (LastProcessedPacketSerial.compare_exchange_strong(value, value | LastProcessedPacketSerialLockBit)) { + if (LastPacketSerialToConfirm.compare_exchange_strong(value, value | LastPacketSerialToConfirmLockBit)) { return value; } } } - void UnlockLastProcessedPacketSerial() { - LastProcessedPacketSerial = LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit; + void ResetLastPacketSerialToConfirm() { + LastPacketSerialToConfirm = LastProcessedSerial; } - ui64 GetLastProcessedPacketSerial() { - return LastProcessedPacketSerial.load() & ~LastProcessedPacketSerialLockBit; + ui64 GetLastPacketSerialToConfirm() { + return LastPacketSerialToConfirm.load() & ~LastPacketSerialToConfirmLockBit; } }; @@ -172,7 +193,6 @@ namespace NActors { }; struct TEvCheckDeadPeer : TEventLocal<TEvCheckDeadPeer, EvCheckDeadPeer> {}; - struct TEvResumeReceiveData : TEventLocal<TEvResumeReceiveData, EvResumeReceiveData> {}; public: static constexpr EActivityType ActorActivityType() { @@ -195,14 +215,25 @@ namespace NActors { void Bootstrap(); - STRICT_STFUNC(WorkingState, + struct TExReestablishConnection { + TDisconnectReason Reason; + }; + + struct TExDestroySession { + TDisconnectReason Reason; + }; + + STATEFN(WorkingState); + + STRICT_STFUNC(WorkingStateImpl, cFunc(TEvents::TSystem::PoisonPill, PassAway) hFunc(TEvPollerReady, Handle) hFunc(TEvPollerRegisterResult, Handle) - cFunc(EvResumeReceiveData, HandleResumeReceiveData) + cFunc(EvResumeReceiveData, ReceiveData) cFunc(TEvInterconnect::TEvCloseInputSession::EventType, CloseInputSession) cFunc(EvCheckDeadPeer, HandleCheckDeadPeer) cFunc(TEvConfirmUpdate::EventType, HandleConfirmUpdate) + hFunc(NMon::TEvHttpInfoRes, GenerateHttpInfo) ) private: @@ -228,9 +259,28 @@ namespace NActors { }; EState State = EState::HEADER; + std::vector<char> XdcCommands; + + struct TInboundPacket { + ui64 Serial; + size_t XdcUnreadBytes; // number of unread bytes from XDC stream for this exact unprocessed packet + }; + std::deque<TInboundPacket> InboundPacketQ; + std::deque<std::tuple<ui16, TMutableContiguousSpan>> XdcInputQ; // target buffers for the XDC stream with channel reference + + // catch stream -- used after TCP reconnect to match unread XDC stream with main packet stream + TRope XdcCatchStream; // temporary data buffer to process XDC stream retransmission upon reconnect + TRcBuf XdcCatchStreamBuffer; + size_t XdcCatchStreamBufferOffset = 0; + ui64 XdcCatchStreamBytesPending = 0; + std::deque<std::tuple<ui16, ui16>> XdcCatchStreamMarkup; // a queue of pairs (channel, bytes) + std::deque<std::tuple<ui16, ui32>> XdcChecksumQ; // (size, expectedChecksum) + ui32 XdcCurrentChecksum = 0; + bool XdcCatchStreamFinal = false; + bool XdcCatchStreamFinalPending = false; + THolder<TEvUpdateFromInputSession> UpdateFromInputSession; - std::optional<ui64> LastReceivedSerial; ui64 ConfirmedByInput; std::shared_ptr<IInterconnectMetrics> Metrics; @@ -241,21 +291,31 @@ namespace NActors { void Handle(TEvPollerReady::TPtr ev); void Handle(TEvPollerRegisterResult::TPtr ev); - void HandleResumeReceiveData(); void HandleConfirmUpdate(); void ReceiveData(); void ProcessHeader(); - void ProcessPayload(ui64& numDataBytes); - void ProcessEvent(TRope& data, TEventData& descr); + void ProcessPayload(ui64 *numDataBytes); + void ProcessInboundPacketQ(size_t numXdcBytesRead); + void ProcessXdcCommand(ui16 channel, TReceiveContext::TPerChannelContext& context); + void ProcessEvents(TReceiveContext::TPerChannelContext& context); + ssize_t Read(NInterconnect::TStreamSocket& socket, const TPollerToken::TPtr& token, bool *readPending, + const TIoVec *iov, size_t num); bool ReadMore(); + bool ReadXdcCatchStream(ui64 *numDataBytes); + bool ReadXdc(ui64 *numDataBytes); + void HandleXdcChecksum(TContiguousSpan span); + + TReceiveContext::TPerChannelContext& GetPerChannelContext(ui16 channel) const; - void ReestablishConnection(TDisconnectReason reason); - void DestroySession(TDisconnectReason reason); void PassAway() override; TDeque<TIntrusivePtr<TRopeAlignedBuffer>> Buffers; size_t FirstBufferOffset = 0; + size_t CurrentBuffers = 1; // number of buffers currently required to allocate + static constexpr size_t MaxBuffers = 64; // maximum buffers possible + std::array<ui8, MaxBuffers> UsageHisto; // read count histogram + void PreallocateBuffers(); inline ui64 GetMaxCyclesPerEvent() const { @@ -275,6 +335,20 @@ namespace NActors { void HandlePingResponse(TDuration passed); void HandleClock(TInstant clock); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Stats + + ui64 BytesReadFromSocket = 0; + ui64 PacketsReadFromSocket = 0; + ui64 DataPacketsReadFromSocket = 0; + ui64 IgnoredDataPacketsFromSocket = 0; + + ui64 BytesReadFromXdcSocket = 0; + ui64 XdcSections = 0; + ui64 XdcRefs = 0; + + void GenerateHttpInfo(NMon::TEvHttpInfoRes::TPtr ev); }; class TInterconnectSessionTCP @@ -385,6 +459,7 @@ namespace NActors { void Handle(TEvPollerReady::TPtr& ev); void Handle(TEvPollerRegisterResult::TPtr ev); void WriteData(); + ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket); ui64 MakePacket(bool data, TMaybe<ui64> pingMask = {}); void FillSendingBuffer(TTcpPacketOutTask& packet, ui64 serial); @@ -446,9 +521,11 @@ namespace NActors { void SwitchStuckPeriod(); NInterconnect::TOutgoingStream OutgoingStream; + NInterconnect::TOutgoingStream XdcStream; struct TOutgoingPacket { - size_t PacketSize; + ui32 PacketSize; // including header + ui32 ExternalSize; ui64 Serial; bool Data; }; @@ -456,17 +533,18 @@ namespace NActors { size_t SendQueuePos = 0; // packet being sent now size_t SendOffset = 0; + ui64 XdcBytesSent = 0; + ui64 WriteBlockedCycles = 0; // start of current block period TDuration WriteBlockedTotal; // total incremental duration that session has been blocked - ui64 BytesUnwritten = 0; + bool WriteBlockedByFullSendBuffer = false; + + ui64 BytesUnwritten = 0; // number of bytes in outgoing main queue TDuration GetWriteBlockedTotal() const { - if (ReceiveContext->WriteBlockedByFullSendBuffer) { - double blockedUs = NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles) * 1000000.0; - return WriteBlockedTotal + TDuration::MicroSeconds(blockedUs); // append current blocking period if any - } else { - return WriteBlockedTotal; - } + return WriteBlockedTotal + (WriteBlockedByFullSendBuffer + ? TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - WriteBlockedCycles)) + : TDuration::Zero()); } ui64 OutputCounter; @@ -496,7 +574,7 @@ namespace NActors { void HandleFlush(); void ResetFlushLogic(); - void GenerateHttpInfo(TStringStream& str); + void GenerateHttpInfo(NMon::TEvHttpInfoRes::TPtr& ev); TIntrusivePtr<TReceiveContext> ReceiveContext; TActorId ReceiverId; diff --git a/library/cpp/actors/interconnect/outgoing_stream.h b/library/cpp/actors/interconnect/outgoing_stream.h index 0306295b52..deffd1d93b 100644 --- a/library/cpp/actors/interconnect/outgoing_stream.h +++ b/library/cpp/actors/interconnect/outgoing_stream.h @@ -38,6 +38,10 @@ namespace NInterconnect { size_t SendOffset = 0; public: + operator bool() const { + return SendQueuePos != SendQueue.size(); + } + size_t CalculateOutgoingSize() const { size_t res = 0; for (const TSendChunk& chunk : SendQueue) { @@ -54,8 +58,12 @@ namespace NInterconnect { return res - SendOffset; } + size_t GetSendQueueSize() const { + return SendQueue.size(); + } + TMutableContiguousSpan AcquireSpanForWriting(size_t maxLen) { - if (AppendOffset == BufferSize) { // we have no free buffer, allocate one + if (maxLen && AppendOffset == BufferSize) { // we have no free buffer, allocate one Buffers.emplace_back(static_cast<TBuffer*>(malloc(sizeof(TBuffer)))); AppendBuffer = Buffers.back().get(); Y_VERIFY(AppendBuffer); @@ -67,14 +75,8 @@ namespace NInterconnect { } void Append(TContiguousSpan span) { - TBuffer *buffer = nullptr; if (AppendBuffer && span.data() == AppendBuffer->Data + AppendOffset) { // the only valid case to use previously acquired span - buffer = AppendBuffer; - AppendOffset += span.size(); - Y_VERIFY_DEBUG(AppendOffset <= BufferSize); - if (AppendOffset != BufferSize) { - ++buffer->RefCount; - } + AppendAcquiredSpan(span); } else { #ifndef NDEBUG // ensure this span does not point into any existing buffer part @@ -88,33 +90,15 @@ namespace NInterconnect { } } #endif + AppendSpanWithGlueing(span, nullptr); } - - if (!SendQueue.empty()) { - auto& back = SendQueue.back(); - if (back.Span.data() + back.Span.size() == span.data()) { // check if it is possible just to extend the last span - if (SendQueuePos == SendQueue.size()) { - --SendQueuePos; - SendOffset = back.Span.size(); - } - back.Span = {back.Span.data(), back.Span.size() + span.size()}; - DropBufferReference(buffer); - return; - } - } - - if (buffer) { - ++buffer->RefCount; - } - SendQueue.push_back(TSendChunk{span, buffer}); - DropBufferReference(buffer); } void Write(TContiguousSpan in) { while (in.size()) { auto outChunk = AcquireSpanForWriting(in.size()); - Append(outChunk); memcpy(outChunk.data(), in.data(), outChunk.size()); + AppendAcquiredSpan(outChunk); in = in.SubSpan(outChunk.size(), Max<size_t>()); } } @@ -126,7 +110,7 @@ namespace NInterconnect { while (len) { const auto span = AcquireSpanForWriting(len); - Append(span); + AppendAcquiredSpan(span); bookmark.push_back(span); len -= span.size(); } @@ -136,7 +120,7 @@ namespace NInterconnect { void WriteBookmark(TBookmark&& bookmark, TContiguousSpan in) { for (auto& outChunk : bookmark) { - Y_VERIFY(outChunk.size() <= in.size()); + Y_VERIFY_DEBUG(outChunk.size() <= in.size()); memcpy(outChunk.data(), in.data(), outChunk.size()); in = in.SubSpan(outChunk.size(), Max<size_t>()); } @@ -147,6 +131,11 @@ namespace NInterconnect { SendOffset = 0; } + void RewindToEnd() { + SendQueuePos = SendQueue.size(); + SendOffset = 0; + } + template<typename T> void ProduceIoVec(T& container, size_t maxItems) { size_t offset = SendOffset; @@ -173,7 +162,8 @@ namespace NInterconnect { if (numBytes < front.Span.size()) { front.Span = front.Span.SubSpan(numBytes, Max<size_t>()); if (SendQueuePos == 0) { - Y_VERIFY_DEBUG(numBytes <= SendOffset); + Y_VERIFY_DEBUG(numBytes <= SendOffset, "numBytes# %zu SendOffset# %zu SendQueuePos# %zu" + " SendQueue.size# %zu", numBytes, SendOffset, SendQueuePos, SendQueue.size()); SendOffset -= numBytes; } break; @@ -207,6 +197,37 @@ namespace NInterconnect { } private: + void AppendAcquiredSpan(TContiguousSpan span) { + TBuffer *buffer = AppendBuffer; + Y_VERIFY_DEBUG(buffer); + Y_VERIFY_DEBUG(span.data() == AppendBuffer->Data + AppendOffset); + AppendOffset += span.size(); + Y_VERIFY_DEBUG(AppendOffset <= BufferSize); + if (AppendOffset == BufferSize) { + AppendBuffer = nullptr; + } else { + ++buffer->RefCount; + } + AppendSpanWithGlueing(span, buffer); + } + + void AppendSpanWithGlueing(TContiguousSpan span, TBuffer *buffer) { + if (!SendQueue.empty()) { + auto& back = SendQueue.back(); + if (back.Span.data() + back.Span.size() == span.data()) { // check if it is possible just to extend the last span + Y_VERIFY_DEBUG(buffer == back.Buffer); + if (SendQueuePos == SendQueue.size()) { + --SendQueuePos; + SendOffset = back.Span.size(); + } + back.Span = {back.Span.data(), back.Span.size() + span.size()}; + DropBufferReference(buffer); + return; + } + } + SendQueue.push_back(TSendChunk{span, buffer}); + } + void DropBufferReference(TBuffer *buffer) { if (buffer && !--buffer->RefCount) { const size_t index = buffer->Index; diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index c1d63fa3b8..3f7eda1e9c 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -59,6 +59,7 @@ struct TEventData { }; #pragma pack(push, 1) + struct TEventDescr2 { ui32 Type; ui32 Flags; @@ -68,6 +69,7 @@ struct TEventDescr2 { NWilson::TTraceId::TSerializedTraceId TraceId; ui32 Checksum; }; + #pragma pack(pop) struct TEventHolder : TNonCopyable { @@ -120,20 +122,23 @@ namespace NActors { struct TTcpPacketOutTask : TNonCopyable { const TSessionParams& Params; NInterconnect::TOutgoingStream& OutgoingStream; + NInterconnect::TOutgoingStream& XdcStream; NInterconnect::TOutgoingStream::TBookmark HeaderBookmark; - size_t DataSize = 0; - mutable NLWTrace::TOrbit Orbit; + size_t InternalSize = 0; + size_t ExternalSize = 0; - TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream) + TTcpPacketOutTask(const TSessionParams& params, NInterconnect::TOutgoingStream& outgoingStream, + NInterconnect::TOutgoingStream& xdcStream) : Params(params) , OutgoingStream(outgoingStream) + , XdcStream(xdcStream) , HeaderBookmark(OutgoingStream.Bookmark(sizeof(TTcpPacketHeader_v2))) {} // Preallocate some space to fill it later. NInterconnect::TOutgoingStream::TBookmark Bookmark(size_t len) { - Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount()); - DataSize += len; + Y_VERIFY_DEBUG(len <= GetInternalFreeAmount()); + InternalSize += len; return OutgoingStream.Bookmark(len); } @@ -143,32 +148,36 @@ struct TTcpPacketOutTask : TNonCopyable { } // Acquire raw pointer to write some data. - TMutableContiguousSpan AcquireSpanForWriting() { - return OutgoingStream.AcquireSpanForWriting(GetVirtualFreeAmount()); + TMutableContiguousSpan AcquireSpanForWriting(bool external) { + if (external) { + return XdcStream.AcquireSpanForWriting(GetExternalFreeAmount()); + } else { + return OutgoingStream.AcquireSpanForWriting(GetInternalFreeAmount()); + } } // Append reference to some data (acquired previously or external pointer). - void Append(const void *buffer, size_t len) { - Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount()); - DataSize += len; - OutgoingStream.Append({static_cast<const char*>(buffer), len}); + void Append(bool external, const void *buffer, size_t len) { + Y_VERIFY_DEBUG(len <= (external ? GetExternalFreeAmount() : GetInternalFreeAmount())); + (external ? ExternalSize : InternalSize) += len; + (external ? XdcStream : OutgoingStream).Append({static_cast<const char*>(buffer), len}); } // Write some data with copying. - void Write(const void *buffer, size_t len) { - Y_VERIFY_DEBUG(len <= GetVirtualFreeAmount()); - DataSize += len; - OutgoingStream.Write({static_cast<const char*>(buffer), len}); + void Write(bool external, const void *buffer, size_t len) { + Y_VERIFY_DEBUG(len <= (external ? GetExternalFreeAmount() : GetInternalFreeAmount())); + (external ? ExternalSize : InternalSize) += len; + (external ? XdcStream : OutgoingStream).Write({static_cast<const char*>(buffer), len}); } void Finish(ui64 serial, ui64 confirm) { - Y_VERIFY(DataSize <= Max<ui16>()); + Y_VERIFY(InternalSize <= Max<ui16>()); TTcpPacketHeader_v2 header{ confirm, serial, 0, - static_cast<ui16>(DataSize) + static_cast<ui16>(InternalSize) }; if (Checksumming()) { @@ -177,13 +186,13 @@ struct TTcpPacketOutTask : TNonCopyable { size_t total = 0; ui32 checksum = 0; - OutgoingStream.ScanLastBytes(GetFullSize(), [&](TContiguousSpan span) { + OutgoingStream.ScanLastBytes(GetPacketSize(), [&](TContiguousSpan span) { checksum = Crc32cExtendMSanCompatible(checksum, span.data(), span.size()); total += span.size(); }); header.Checksum = checksum; - Y_VERIFY(total == sizeof(header) + DataSize, "total# %zu DataSize# %zu GetFullSize# %zu", total, DataSize, - GetFullSize()); + Y_VERIFY(total == GetPacketSize(), "total# %zu InternalSize# %zu GetPacketSize# %zu", total, InternalSize, + GetPacketSize()); } WriteBookmark(std::exchange(HeaderBookmark, {}), &header, sizeof(header)); @@ -193,9 +202,42 @@ struct TTcpPacketOutTask : TNonCopyable { return !Params.Encryption; } - bool IsFull() const { return GetVirtualFreeAmount() == 0; } bool IsEmpty() const { return GetDataSize() == 0; } - size_t GetDataSize() const { return DataSize; } - size_t GetFullSize() const { return sizeof(TTcpPacketHeader_v2) + GetDataSize(); } - size_t GetVirtualFreeAmount() const { return TTcpPacketBuf::PacketDataLen - DataSize; } + size_t GetDataSize() const { return InternalSize + ExternalSize; } + size_t GetPacketSize() const { return sizeof(TTcpPacketHeader_v2) + InternalSize; } + size_t GetInternalFreeAmount() const { return TTcpPacketBuf::PacketDataLen - InternalSize; } + size_t GetExternalFreeAmount() const { return 16384 - ExternalSize; } + size_t GetExternalSize() const { return ExternalSize; } }; + +namespace NInterconnect::NDetail { + static constexpr size_t MaxNumberBytes = (sizeof(ui64) * CHAR_BIT + 6) / 7; + + inline size_t SerializeNumber(ui64 num, char *buffer) { + char *begin = buffer; + do { + *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00); + num >>= 7; + } while (num); + return buffer - begin; + } + + inline ui64 DeserializeNumber(const char **ptr, const char *end) { + const char *p = *ptr; + size_t res = 0; + size_t offset = 0; + for (;;) { + if (p == end) { + return Max<ui64>(); + } + const char byte = *p++; + res |= (static_cast<size_t>(byte) & 0x7F) << offset; + offset += 7; + if (!(byte & 0x80)) { + break; + } + } + *ptr = p; + return res; + } +} diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 2de8dce457..2f8e575cb0 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -46,7 +46,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { NInterconnect::TOutgoingStream stream; for (; numEvents; ++step) { - TTcpPacketOutTask task(p, stream); + TTcpPacketOutTask task(p, stream, stream); if (step == 100) { for (ui32 i = 0; i < 200; ++i) { |