diff options
author | alexvru <alexvru@ydb.tech> | 2023-01-23 17:26:05 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-01-23 17:26:05 +0300 |
commit | a65aef1c0441efcfc8721c8a67fd25195f34d402 (patch) | |
tree | ede200d3dbf5baebd15de4fdf792341a7c5b9618 /library | |
parent | 4395852866f7a69b06e307b0f9fcd8ef950c200b (diff) | |
download | ydb-a65aef1c0441efcfc8721c8a67fd25195f34d402.tar.gz |
Fix serialization for external data channel,
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/actors/core/event.cpp | 7 | ||||
-rw-r--r-- | library/cpp/actors/core/event.h | 4 | ||||
-rw-r--r-- | library/cpp/actors/core/event_load.h | 36 | ||||
-rw-r--r-- | library/cpp/actors/core/event_pb.h | 34 | ||||
-rw-r--r-- | library/cpp/actors/core/event_pb_payload_ut.cpp | 8 | ||||
-rw-r--r-- | library/cpp/actors/core/mon_ut.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.cpp | 23 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.h | 3 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp | 5 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp | 8 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/interconnect_ut.cpp | 4 |
12 files changed, 88 insertions, 48 deletions
diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp index 33f8ce2aaf..6a5230f825 100644 --- a/library/cpp/actors/core/event.cpp +++ b/library/cpp/actors/core/event.cpp @@ -17,7 +17,7 @@ namespace NActors { if (Event) { TAllocChunkSerializer serializer; Event->SerializeToArcadiaStream(&serializer); - auto chainBuf = serializer.Release(Event->IsExtendedFormat()); + auto chainBuf = serializer.Release(Event->CreateSerializationInfo()); Event.Reset(); return chainBuf; } @@ -25,12 +25,13 @@ namespace NActors { } TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() { - if (Buffer) + if (Buffer) { return Buffer; + } if (Event) { TAllocChunkSerializer serializer; Event->SerializeToArcadiaStream(&serializer); - Buffer = serializer.Release(Event->IsExtendedFormat()); + Buffer = serializer.Release(Event->CreateSerializationInfo()); return Buffer; } return new TEventSerializedData; diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 6b92edaf41..4eedeb0574 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -47,12 +47,10 @@ namespace NActors { virtual ui32 Type() const = 0; virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0; virtual bool IsSerializable() const = 0; - virtual bool IsExtendedFormat() const { - return false; - } virtual ui32 CalculateSerializedSizeCached() const { return CalculateSerializedSize(); } + virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; } }; // fat handle diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h index c7cfbdd1b3..30cc26aa46 100644 --- a/library/cpp/actors/core/event_load.h +++ b/library/cpp/actors/core/event_load.h @@ -19,39 +19,55 @@ namespace NActors { size_t Size; }; + struct TEventSectionInfo { + size_t Headroom = 0; // headroom to be created on the receiving side + size_t Size = 0; // full size of serialized event section (a chunk in rope) + size_t Tailroom = 0; // tailroom for the chunk + size_t Alignment = 0; // required alignment + }; + + struct TEventSerializationInfo { + bool IsExtendedFormat = {}; + std::vector<TEventSectionInfo> Sections; + // total sum of Size for every section must match actual serialized size of the event + }; + class TEventSerializedData : public TThrRefBase { TRope Rope; - bool ExtendedFormat = false; + TEventSerializationInfo SerializationInfo; public: TEventSerializedData() = default; - TEventSerializedData(TRope&& rope, bool extendedFormat) + TEventSerializedData(TRope&& rope, TEventSerializationInfo&& serializationInfo) : Rope(std::move(rope)) - , ExtendedFormat(extendedFormat) + , SerializationInfo(std::move(serializationInfo)) {} TEventSerializedData(const TEventSerializedData& original, TString extraBuffer) : Rope(original.Rope) - , ExtendedFormat(original.ExtendedFormat) + , SerializationInfo(original.SerializationInfo) { + if (!SerializationInfo.Sections.empty()) { + SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0}); + } Append(std::move(extraBuffer)); } - TEventSerializedData(TString buffer, bool extendedFormat) - : ExtendedFormat(extendedFormat) + TEventSerializedData(TString buffer, TEventSerializationInfo&& serializationInfo) + : SerializationInfo(std::move(serializationInfo)) { Append(std::move(buffer)); } - void SetExtendedFormat() { - ExtendedFormat = true; + void SetSerializationInfo(TEventSerializationInfo&& serializationInfo) { + SerializationInfo = std::move(serializationInfo); } - bool IsExtendedFormat() const { - return ExtendedFormat; + const TEventSerializationInfo& GetSerializationInfo() const { + return SerializationInfo; } TRope::TConstIterator GetBeginIter() const { diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index 2d388fceeb..594559cea7 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -56,10 +56,8 @@ namespace NActors { bool WriteRope(const TRope *rope) override; bool WriteString(const TString *s) override; - inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) { - if (extendedFormat) { - Buffers->SetExtendedFormat(); - } + inline TIntrusivePtr<TEventSerializedData> Release(TEventSerializationInfo&& serializationInfo) { + Buffers->SetSerializationInfo(std::move(serializationInfo)); return std::move(Buffers); } @@ -160,10 +158,6 @@ namespace NActors { return true; } - bool IsExtendedFormat() const override { - return static_cast<bool>(Payload); - } - bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override { // serialize payload first if (Payload) { @@ -236,7 +230,7 @@ namespace NActors { TRope::TConstIterator iter = input->GetBeginIter(); ui64 size = input->GetSize(); - if (input->IsExtendedFormat()) { + if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) { // check marker if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) { Y_FAIL("invalid event"); @@ -288,6 +282,28 @@ namespace NActors { CachedByteSize = 0; } + TEventSerializationInfo CreateSerializationInfo() const override { + TEventSerializationInfo info; + + if (Payload) { + char temp[MaxNumberBytes]; + info.Sections.push_back(TEventSectionInfo{0, 1, 0, 0}); // payload marker + info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(Payload.size(), temp), 0, 0}); + for (const TRope& payload : Payload) { + info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(payload.GetSize(), temp), 0, 0}); // length + info.Sections.push_back(TEventSectionInfo{0, payload.GetSize(), 0, 0}); // data + } + info.IsExtendedFormat = true; + } else { + info.IsExtendedFormat = false; + } + + const int byteSize = Max(0, Record.ByteSize()); + info.Sections.push_back(TEventSectionInfo{0, static_cast<size_t>(byteSize), 0, 0}); + + return info; + } + public: void ReservePayload(size_t size) { Payload.reserve(size); diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp index eab007bc15..fe47bf4de0 100644 --- a/library/cpp/actors/core/event_pb_payload_ut.cpp +++ b/library/cpp/actors/core/event_pb_payload_ut.cpp @@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) { auto serializer = MakeHolder<TAllocChunkSerializer>(); msg.SerializeToArcadiaStream(serializer.Get()); - auto buffers = serializer->Release(msg.IsExtendedFormat()); + auto buffers = serializer->Release(msg.CreateSerializationInfo()); UNIT_ASSERT_VALUES_EQUAL(buffers->GetSize(), msg.CalculateSerializedSize()); TString ser = buffers->GetString(); @@ -128,20 +128,20 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) { auto serializer1 = MakeHolder<TAllocChunkSerializer>(); e1.SerializeToArcadiaStream(serializer1.Get()); - auto buffers1 = serializer1->Release(e1.IsExtendedFormat()); + auto buffers1 = serializer1->Release(e1.CreateSerializationInfo()); UNIT_ASSERT_VALUES_EQUAL(buffers1->GetSize(), e1.CalculateSerializedSize()); TString ser1 = buffers1->GetString(); TEvMessageWithPayload e2(msg); auto serializer2 = MakeHolder<TAllocChunkSerializer>(); e2.SerializeToArcadiaStream(serializer2.Get()); - auto buffers2 = serializer2->Release(e2.IsExtendedFormat()); + auto buffers2 = serializer2->Release(e2.CreateSerializationInfo()); UNIT_ASSERT_VALUES_EQUAL(buffers2->GetSize(), e2.CalculateSerializedSize()); TString ser2 = buffers2->GetString(); UNIT_ASSERT_VALUES_EQUAL(ser1, ser2); // deserialize - auto data = MakeIntrusive<TEventSerializedData>(ser1, false); + auto data = MakeIntrusive<TEventSerializedData>(ser1, TEventSerializationInfo{}); THolder<TEvMessageWithPayloadPreSerialized> parsedEvent(static_cast<TEvMessageWithPayloadPreSerialized*>(TEvMessageWithPayloadPreSerialized::Load(data))); UNIT_ASSERT_VALUES_EQUAL(parsedEvent->PreSerializedData, ""); // this field is empty after deserialization auto& record = parsedEvent->GetRecord(); diff --git a/library/cpp/actors/core/mon_ut.cpp b/library/cpp/actors/core/mon_ut.cpp index d70b2c4a89..a2991e8a10 100644 --- a/library/cpp/actors/core/mon_ut.cpp +++ b/library/cpp/actors/core/mon_ut.cpp @@ -17,7 +17,7 @@ Y_UNIT_TEST_SUITE(ActorSystemMon) { TAllocChunkSerializer ser; const bool success = ev->SerializeToArcadiaStream(&ser); Y_VERIFY(success); - auto buffer = ser.Release(false); + auto buffer = ser.Release(ev->CreateSerializationInfo()); std::unique_ptr<TEvRemoteHttpInfo> restored(dynamic_cast<TEvRemoteHttpInfo*>(TEvRemoteHttpInfo::Load(buffer.Get()))); UNIT_ASSERT(restored->Query == ev->Query); UNIT_ASSERT(restored->Query.size()); diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index fed1394686..41709d6fba 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -24,8 +24,9 @@ namespace NActors { LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize()); task.Orbit.Take(event.Orbit); + Y_VERIFY(SerializationInfo); event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) | - (ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); + (SerializationInfo->IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea()); part->Channel = ChannelId | TChannelPart::LastPartFlag; @@ -81,18 +82,20 @@ namespace NActors { case EState::INITIAL: event.InitChecksum(); LWTRACK(SerializeToPacketBegin, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize); - if (event.Event) { + if (event.Buffer) { + State = EState::BUFFER; + Iter = event.Buffer->GetBeginIter(); + SerializationInfo = &event.Buffer->GetSerializationInfo(); + } else if (event.Event) { State = EState::CHUNKER; IEventBase *base = event.Event.Get(); Chunker.SetSerializingEvent(base); - ExtendedFormat = base->IsExtendedFormat(); - } else if (event.Buffer) { - State = EState::BUFFER; - Iter = event.Buffer->GetBeginIter(); - ExtendedFormat = event.Buffer->IsExtendedFormat(); - } else { + SerializationInfoContainer = base->CreateSerializationInfo(); + SerializationInfo = &SerializationInfoContainer; + } else { // event without buffer and IEventBase instance State = EState::DESCRIPTOR; - ExtendedFormat = false; + SerializationInfoContainer = {}; + SerializationInfo = &SerializationInfoContainer; } break; @@ -167,6 +170,8 @@ namespace NActors { } event.Serial = serial; NotYetConfirmed.splice(NotYetConfirmed.end(), Queue, Queue.begin()); // move event to not-yet-confirmed queue + SerializationInfoContainer = {}; + SerializationInfo = nullptr; State = EState::INITIAL; return true; // we have processed whole event, signal to the caller } diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 58783d3c8d..312eff2666 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -114,7 +114,8 @@ namespace NActors { std::list<TEventHolder> NotYetConfirmed; TRope::TConstIterator Iter; TCoroutineChunkSerializer Chunker; - bool ExtendedFormat = false; + TEventSerializationInfo SerializationInfoContainer; + const TEventSerializationInfo *SerializationInfo = nullptr; bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index fdf035499f..b1212b8914 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -327,12 +327,15 @@ namespace NActors { return ReestablishConnection(TDisconnectReason::ChecksumError()); } } + TEventSerializationInfo serializationInfo{ + .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat), + }; auto ev = std::make_unique<IEventHandle>(SessionId, descr.Type, descr.Flags & ~IEventHandle::FlagExtendedFormat, descr.Recipient, descr.Sender, - MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)), + MakeIntrusive<TEventSerializedData>(std::move(data), std::move(serializationInfo)), descr.Cookie, Params.PeerScopeId, std::move(descr.TraceId)); diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 565a511859..32c8237b59 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto pushEvent = [&](size_t size, int channel) { TString payload(size, 'X'); - auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); + auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); ch.Push(*ev); diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp index e6b2bd4e4c..7b45793e26 100644 --- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp +++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp @@ -29,16 +29,16 @@ Y_UNIT_TEST_SUITE(EventHolderPool) { std::list<TEventHolder> q; auto& ev1 = pool.Allocate(q); - ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); auto& ev2 = pool.Allocate(q); - ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); auto& ev3 = pool.Allocate(q); - ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); auto& ev4 = pool.Allocate(q); - ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); pool.Release(q, q.begin()); pool.Release(q, q.begin()); diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp index 8ef0b1507c..3596bffd5a 100644 --- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp +++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp @@ -47,7 +47,7 @@ public: const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie); InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data))); TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, - SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), false), NextCookie)); + SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie)); // Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl); ++NextCookie; } @@ -126,7 +126,7 @@ public: const TString& data = ev->GetChainBuffer()->GetString(); const TString& response = MD5::CalcRaw(data); TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(), - MakeIntrusive<TEventSerializedData>(response, false), ev->Cookie)); + MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie)); } STRICT_STFUNC(StateFunc, |