diff options
author | gvit <gvit@ydb.tech> | 2023-01-23 13:35:08 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-01-23 13:35:08 +0300 |
commit | da72c4727923040ee9f7328caa9ee22e4bd4ab09 (patch) | |
tree | 724aaad14329bb2ae9821238b1d43b34e1af1a9d /library/cpp | |
parent | 006419e12383729f3d660ad51f5a84832d6e3780 (diff) | |
download | ydb-da72c4727923040ee9f7328caa9ee22e4bd4ab09.tar.gz |
Revert "Support interface part for ExternalDataChannel feature"
This reverts commit 0a05f95e826bc5416a494ba7822d7d7eaa8ce52c, reversing
changes made to 1c58d6c48d9dc449c72880696f23217d19595db1.
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/core/event.cpp | 7 | ||||
-rw-r--r-- | library/cpp/actors/core/event.h | 4 | ||||
-rw-r--r-- | library/cpp/actors/core/event_load.h | 40 | ||||
-rw-r--r-- | library/cpp/actors/core/event_pb.h | 34 | ||||
-rw-r--r-- | library/cpp/actors/core/event_pb_payload_ut.cpp | 8 | ||||
-rw-r--r-- | library/cpp/actors/core/mon_ut.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.cpp | 8 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.h | 2 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp | 5 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp | 8 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/interconnect_ut.cpp | 4 |
12 files changed, 43 insertions, 81 deletions
diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp index 6a5230f825..33f8ce2aaf 100644 --- a/library/cpp/actors/core/event.cpp +++ b/library/cpp/actors/core/event.cpp @@ -17,7 +17,7 @@ namespace NActors { if (Event) { TAllocChunkSerializer serializer; Event->SerializeToArcadiaStream(&serializer); - auto chainBuf = serializer.Release(Event->CreateSerializationInfo()); + auto chainBuf = serializer.Release(Event->IsExtendedFormat()); Event.Reset(); return chainBuf; } @@ -25,13 +25,12 @@ namespace NActors { } TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() { - if (Buffer) { + if (Buffer) return Buffer; - } if (Event) { TAllocChunkSerializer serializer; Event->SerializeToArcadiaStream(&serializer); - Buffer = serializer.Release(Event->CreateSerializationInfo()); + Buffer = serializer.Release(Event->IsExtendedFormat()); return Buffer; } return new TEventSerializedData; diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 4eedeb0574..6b92edaf41 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -47,10 +47,12 @@ namespace NActors { virtual ui32 Type() const = 0; virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0; virtual bool IsSerializable() const = 0; + virtual bool IsExtendedFormat() const { + return false; + } virtual ui32 CalculateSerializedSizeCached() const { return CalculateSerializedSize(); } - virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; } }; // fat handle diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h index 8d069c555d..c7cfbdd1b3 100644 --- a/library/cpp/actors/core/event_load.h +++ b/library/cpp/actors/core/event_load.h @@ -19,59 +19,39 @@ namespace NActors { size_t Size; }; - struct TEventSectionInfo { - size_t Headroom = 0; // headroom to be created on the receiving side - size_t Size = 0; // full size of serialized event section (a chunk in rope) - size_t Tailroom = 0; // tailroom for the chunk - size_t Alignment = 0; // required alignment - }; - - struct TEventSerializationInfo { - bool IsExtendedFormat = {}; - std::vector<TEventSectionInfo> Sections; - // total sum of Size for every section must match actual serialized size of the event - }; - class TEventSerializedData : public TThrRefBase { TRope Rope; - TEventSerializationInfo SerializationInfo; + bool ExtendedFormat = false; public: TEventSerializedData() = default; - TEventSerializedData(TRope&& rope, TEventSerializationInfo&& serializationInfo) + TEventSerializedData(TRope&& rope, bool extendedFormat) : Rope(std::move(rope)) - , SerializationInfo(std::move(serializationInfo)) + , ExtendedFormat(extendedFormat) {} TEventSerializedData(const TEventSerializedData& original, TString extraBuffer) : Rope(original.Rope) - , SerializationInfo(original.SerializationInfo) + , ExtendedFormat(original.ExtendedFormat) { - if (!SerializationInfo.Sections.empty()) { - SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0}); - } Append(std::move(extraBuffer)); } - TEventSerializedData(TString buffer, TEventSerializationInfo&& serializationInfo) - : SerializationInfo(std::move(serializationInfo)) + TEventSerializedData(TString buffer, bool extendedFormat) + : ExtendedFormat(extendedFormat) { Append(std::move(buffer)); } - void SetSerializationInfo(TEventSerializationInfo&& serializationInfo) { - SerializationInfo = std::move(serializationInfo); - } - - const TEventSerializationInfo& GetSerializationInfo() const { - return SerializationInfo; + void SetExtendedFormat() { + ExtendedFormat = true; } - TEventSerializationInfo ReleaseSerializationInfo() { - return std::move(SerializationInfo); + bool IsExtendedFormat() const { + return ExtendedFormat; } TRope::TConstIterator GetBeginIter() const { diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index 594559cea7..2d388fceeb 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -56,8 +56,10 @@ namespace NActors { bool WriteRope(const TRope *rope) override; bool WriteString(const TString *s) override; - inline TIntrusivePtr<TEventSerializedData> Release(TEventSerializationInfo&& serializationInfo) { - Buffers->SetSerializationInfo(std::move(serializationInfo)); + inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) { + if (extendedFormat) { + Buffers->SetExtendedFormat(); + } return std::move(Buffers); } @@ -158,6 +160,10 @@ namespace NActors { return true; } + bool IsExtendedFormat() const override { + return static_cast<bool>(Payload); + } + bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override { // serialize payload first if (Payload) { @@ -230,7 +236,7 @@ namespace NActors { TRope::TConstIterator iter = input->GetBeginIter(); ui64 size = input->GetSize(); - if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) { + if (input->IsExtendedFormat()) { // check marker if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) { Y_FAIL("invalid event"); @@ -282,28 +288,6 @@ namespace NActors { CachedByteSize = 0; } - TEventSerializationInfo CreateSerializationInfo() const override { - TEventSerializationInfo info; - - if (Payload) { - char temp[MaxNumberBytes]; - info.Sections.push_back(TEventSectionInfo{0, 1, 0, 0}); // payload marker - info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(Payload.size(), temp), 0, 0}); - for (const TRope& payload : Payload) { - info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(payload.GetSize(), temp), 0, 0}); // length - info.Sections.push_back(TEventSectionInfo{0, payload.GetSize(), 0, 0}); // data - } - info.IsExtendedFormat = true; - } else { - info.IsExtendedFormat = false; - } - - const int byteSize = Max(0, Record.ByteSize()); - info.Sections.push_back(TEventSectionInfo{0, static_cast<size_t>(byteSize), 0, 0}); - - return info; - } - public: void ReservePayload(size_t size) { Payload.reserve(size); diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp index fe47bf4de0..eab007bc15 100644 --- a/library/cpp/actors/core/event_pb_payload_ut.cpp +++ b/library/cpp/actors/core/event_pb_payload_ut.cpp @@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) { auto serializer = MakeHolder<TAllocChunkSerializer>(); msg.SerializeToArcadiaStream(serializer.Get()); - auto buffers = serializer->Release(msg.CreateSerializationInfo()); + auto buffers = serializer->Release(msg.IsExtendedFormat()); UNIT_ASSERT_VALUES_EQUAL(buffers->GetSize(), msg.CalculateSerializedSize()); TString ser = buffers->GetString(); @@ -128,20 +128,20 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) { auto serializer1 = MakeHolder<TAllocChunkSerializer>(); e1.SerializeToArcadiaStream(serializer1.Get()); - auto buffers1 = serializer1->Release(e1.CreateSerializationInfo()); + auto buffers1 = serializer1->Release(e1.IsExtendedFormat()); UNIT_ASSERT_VALUES_EQUAL(buffers1->GetSize(), e1.CalculateSerializedSize()); TString ser1 = buffers1->GetString(); TEvMessageWithPayload e2(msg); auto serializer2 = MakeHolder<TAllocChunkSerializer>(); e2.SerializeToArcadiaStream(serializer2.Get()); - auto buffers2 = serializer2->Release(e2.CreateSerializationInfo()); + auto buffers2 = serializer2->Release(e2.IsExtendedFormat()); UNIT_ASSERT_VALUES_EQUAL(buffers2->GetSize(), e2.CalculateSerializedSize()); TString ser2 = buffers2->GetString(); UNIT_ASSERT_VALUES_EQUAL(ser1, ser2); // deserialize - auto data = MakeIntrusive<TEventSerializedData>(ser1, TEventSerializationInfo{}); + auto data = MakeIntrusive<TEventSerializedData>(ser1, false); THolder<TEvMessageWithPayloadPreSerialized> parsedEvent(static_cast<TEvMessageWithPayloadPreSerialized*>(TEvMessageWithPayloadPreSerialized::Load(data))); UNIT_ASSERT_VALUES_EQUAL(parsedEvent->PreSerializedData, ""); // this field is empty after deserialization auto& record = parsedEvent->GetRecord(); diff --git a/library/cpp/actors/core/mon_ut.cpp b/library/cpp/actors/core/mon_ut.cpp index a2991e8a10..d70b2c4a89 100644 --- a/library/cpp/actors/core/mon_ut.cpp +++ b/library/cpp/actors/core/mon_ut.cpp @@ -17,7 +17,7 @@ Y_UNIT_TEST_SUITE(ActorSystemMon) { TAllocChunkSerializer ser; const bool success = ev->SerializeToArcadiaStream(&ser); Y_VERIFY(success); - auto buffer = ser.Release(ev->CreateSerializationInfo()); + auto buffer = ser.Release(false); std::unique_ptr<TEvRemoteHttpInfo> restored(dynamic_cast<TEvRemoteHttpInfo*>(TEvRemoteHttpInfo::Load(buffer.Get()))); UNIT_ASSERT(restored->Query == ev->Query); UNIT_ASSERT(restored->Query.size()); diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index 7f3bc65497..fed1394686 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -25,7 +25,7 @@ namespace NActors { task.Orbit.Take(event.Orbit); event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) | - (SerializationInfo.IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); + (ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea()); part->Channel = ChannelId | TChannelPart::LastPartFlag; @@ -85,14 +85,14 @@ namespace NActors { State = EState::CHUNKER; IEventBase *base = event.Event.Get(); Chunker.SetSerializingEvent(base); - SerializationInfo = base->CreateSerializationInfo(); + ExtendedFormat = base->IsExtendedFormat(); } else if (event.Buffer) { State = EState::BUFFER; Iter = event.Buffer->GetBeginIter(); - SerializationInfo = event.Buffer->ReleaseSerializationInfo(); + ExtendedFormat = event.Buffer->IsExtendedFormat(); } else { State = EState::DESCRIPTOR; - SerializationInfo = {}; + ExtendedFormat = false; } break; diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 4a476abc37..58783d3c8d 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -114,7 +114,7 @@ namespace NActors { std::list<TEventHolder> NotYetConfirmed; TRope::TConstIterator Iter; TCoroutineChunkSerializer Chunker; - TEventSerializationInfo SerializationInfo; + bool ExtendedFormat = false; bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index b1212b8914..fdf035499f 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -327,15 +327,12 @@ namespace NActors { return ReestablishConnection(TDisconnectReason::ChecksumError()); } } - TEventSerializationInfo serializationInfo{ - .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat), - }; auto ev = std::make_unique<IEventHandle>(SessionId, descr.Type, descr.Flags & ~IEventHandle::FlagExtendedFormat, descr.Recipient, descr.Sender, - MakeIntrusive<TEventSerializedData>(std::move(data), std::move(serializationInfo)), + MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)), descr.Cookie, Params.PeerScopeId, std::move(descr.TraceId)); diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 32c8237b59..565a511859 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto pushEvent = [&](size_t size, int channel) { TString payload(size, 'X'); - auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0); + auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); ch.Push(*ev); diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp index 7b45793e26..e6b2bd4e4c 100644 --- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp +++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp @@ -29,16 +29,16 @@ Y_UNIT_TEST_SUITE(EventHolderPool) { std::list<TEventHolder> q; auto& ev1 = pool.Allocate(q); - ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); + ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); auto& ev2 = pool.Allocate(q); - ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); + ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); auto& ev3 = pool.Allocate(q); - ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); + ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); auto& ev4 = pool.Allocate(q); - ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); + ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); pool.Release(q, q.begin()); pool.Release(q, q.begin()); diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp index 3596bffd5a..8ef0b1507c 100644 --- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp +++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp @@ -47,7 +47,7 @@ public: const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie); InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data))); TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, - SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie)); + SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), false), NextCookie)); // Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl); ++NextCookie; } @@ -126,7 +126,7 @@ public: const TString& data = ev->GetChainBuffer()->GetString(); const TString& response = MD5::CalcRaw(data); TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(), - MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie)); + MakeIntrusive<TEventSerializedData>(response, false), ev->Cookie)); } STRICT_STFUNC(StateFunc, |