diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/interconnect/interconnect_channel.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_channel.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.cpp | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp new file mode 100644 index 0000000000..a66ba2a154 --- /dev/null +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -0,0 +1,176 @@ +#include "interconnect_channel.h" + +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/executor_thread.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/core/probes.h> +#include <library/cpp/actors/protos/services_common.pb.h> +#include <library/cpp/actors/prof/tag.h> +#include <library/cpp/digest/crc32c/crc32c.h> + +LWTRACE_USING(ACTORLIB_PROVIDER); + +namespace NActors { + DECLARE_WILSON_EVENT(EventSentToSocket); + DECLARE_WILSON_EVENT(EventReceivedFromSocket); + + bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { + const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr); + if (task.GetVirtualFreeAmount() < amount) { + return false; + } + + NWilson::TTraceId traceId(event.Descr.TraceId); +// if (ctx) { +// WILSON_TRACE(*ctx, &traceId, EventSentToSocket); +// } + traceId.Serialize(&event.Descr.TraceId); + LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize()); + task.Orbit.Take(event.Orbit); + + event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) | + (ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); + + TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea()); + part->Channel = ChannelId | TChannelPart::LastPartFlag; + part->Size = sizeof(TEventDescr); + memcpy(part + 1, &event.Descr, sizeof(TEventDescr)); + task.AppendBuf(part, amount); + *weightConsumed += amount; + OutputQueueSize -= part->Size; + Metrics->UpdateOutputChannelEvents(ChannelId); + + return true; + } + + void TEventOutputChannel::DropConfirmed(ui64 confirm) { + LOG_DEBUG_IC_SESSION("ICOCH98", "Dropping confirmed messages"); + for (auto it = NotYetConfirmed.begin(); it != NotYetConfirmed.end() && it->Serial <= confirm; ) { + Pool.Release(NotYetConfirmed, it++); + } + } + + bool TEventOutputChannel::FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed) { + for (;;) { + Y_VERIFY(!Queue.empty()); + TEventHolder& event = Queue.front(); + + switch (State) { + case EState::INITIAL: + event.InitChecksum(); + LWTRACK(SerializeToPacketBegin, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize); + if (event.Event) { + State = EState::CHUNKER; + IEventBase *base = event.Event.Get(); + Chunker.SetSerializingEvent(base); + ExtendedFormat = base->IsExtendedFormat(); + } else if (event.Buffer) { + State = EState::BUFFER; + Iter = event.Buffer->GetBeginIter(); + ExtendedFormat = event.Buffer->IsExtendedFormat(); + } else { + State = EState::DESCRIPTOR; + ExtendedFormat = false; + } + break; + + case EState::CHUNKER: + case EState::BUFFER: { + size_t maxBytes = task.GetVirtualFreeAmount(); + if (maxBytes <= sizeof(TChannelPart)) { + return false; + } + + TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea()); + part->Channel = ChannelId; + part->Size = 0; + task.AppendBuf(part, sizeof(TChannelPart)); + maxBytes -= sizeof(TChannelPart); + Y_VERIFY(maxBytes); + + auto addChunk = [&](const void *data, size_t len) { + event.UpdateChecksum(Params, data, len); + task.AppendBuf(data, len); + part->Size += len; + Y_VERIFY_DEBUG(maxBytes >= len); + maxBytes -= len; + + event.EventActuallySerialized += len; + if (event.EventActuallySerialized > MaxSerializedEventSize) { + throw TExSerializedEventTooLarge(event.Descr.Type); + } + }; + + bool complete = false; + if (State == EState::CHUNKER) { + Y_VERIFY_DEBUG(task.GetFreeArea() == part + 1); + while (!complete && maxBytes) { + const auto [first, last] = Chunker.FeedBuf(task.GetFreeArea(), maxBytes); + for (auto p = first; p != last; ++p) { + addChunk(p->first, p->second); + } + complete = Chunker.IsComplete(); + } + Y_VERIFY(!complete || Chunker.IsSuccessfull()); + Y_VERIFY_DEBUG(complete || !maxBytes); + } else { // BUFFER + while (const size_t numb = Min(maxBytes, Iter.ContiguousSize())) { + const char *obuf = Iter.ContiguousData(); + addChunk(obuf, numb); + Iter += numb; + } + complete = !Iter.Valid(); + } + if (complete) { + Y_VERIFY(event.EventActuallySerialized == event.EventSerializedSize, + "EventActuallySerialized# %" PRIu32 " EventSerializedSize# %" PRIu32 " Type# 0x%08" PRIx32, + event.EventActuallySerialized, event.EventSerializedSize, event.Descr.Type); + } + + if (!part->Size) { + task.Undo(sizeof(TChannelPart)); + } else { + *weightConsumed += sizeof(TChannelPart) + part->Size; + OutputQueueSize -= part->Size; + } + if (complete) { + State = EState::DESCRIPTOR; + } + break; + } + + case EState::DESCRIPTOR: + if (!FeedDescriptor(task, event, weightConsumed)) { + return false; + } + event.Serial = serial; + NotYetConfirmed.splice(NotYetConfirmed.end(), Queue, Queue.begin()); // move event to not-yet-confirmed queue + State = EState::INITIAL; + return true; // we have processed whole event, signal to the caller + } + } + } + + void TEventOutputChannel::NotifyUndelivered() { + LOG_DEBUG_IC_SESSION("ICOCH89", "Notyfying about Undelivered messages! NotYetConfirmed size: %zu, Queue size: %zu", NotYetConfirmed.size(), Queue.size()); + if (State == EState::CHUNKER) { + Y_VERIFY(!Chunker.IsComplete()); // chunk must have an event being serialized + Y_VERIFY(!Queue.empty()); // this event must be the first event in queue + TEventHolder& event = Queue.front(); + Y_VERIFY(Chunker.GetCurrentEvent() == event.Event.Get()); // ensure the event is valid + Chunker.Abort(); // stop serializing current event + Y_VERIFY(Chunker.IsComplete()); + } + for (auto& item : NotYetConfirmed) { + if (item.Descr.Flags & IEventHandle::FlagGenerateUnsureUndelivered) { // notify only when unsure flag is set + item.ForwardOnNondelivery(true); + } + } + Pool.Release(NotYetConfirmed); + for (auto& item : Queue) { + item.ForwardOnNondelivery(false); + } + Pool.Release(Queue); + } + +} |