diff options
author | alexvru <alexvru@ydb.tech> | 2023-01-23 17:26:05 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-01-23 17:26:05 +0300 |
commit | a65aef1c0441efcfc8721c8a67fd25195f34d402 (patch) | |
tree | ede200d3dbf5baebd15de4fdf792341a7c5b9618 | |
parent | 4395852866f7a69b06e307b0f9fcd8ef950c200b (diff) | |
download | ydb-a65aef1c0441efcfc8721c8a67fd25195f34d402.tar.gz |
Fix serialization for external data channel,
24 files changed, 109 insertions, 67 deletions
diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp index 33f8ce2aaf..6a5230f825 100644 --- a/library/cpp/actors/core/event.cpp +++ b/library/cpp/actors/core/event.cpp @@ -17,7 +17,7 @@ namespace NActors { if (Event) { TAllocChunkSerializer serializer; Event->SerializeToArcadiaStream(&serializer); - auto chainBuf = serializer.Release(Event->IsExtendedFormat()); + auto chainBuf = serializer.Release(Event->CreateSerializationInfo()); Event.Reset(); return chainBuf; } @@ -25,12 +25,13 @@ namespace NActors { } TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() { - if (Buffer) + if (Buffer) { return Buffer; + } if (Event) { TAllocChunkSerializer serializer; Event->SerializeToArcadiaStream(&serializer); - Buffer = serializer.Release(Event->IsExtendedFormat()); + Buffer = serializer.Release(Event->CreateSerializationInfo()); return Buffer; } return new TEventSerializedData; diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 6b92edaf41..4eedeb0574 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -47,12 +47,10 @@ namespace NActors { virtual ui32 Type() const = 0; virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0; virtual bool IsSerializable() const = 0; - virtual bool IsExtendedFormat() const { - return false; - } virtual ui32 CalculateSerializedSizeCached() const { return CalculateSerializedSize(); } + virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; } }; // fat handle diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h index c7cfbdd1b3..30cc26aa46 100644 --- a/library/cpp/actors/core/event_load.h +++ b/library/cpp/actors/core/event_load.h @@ -19,39 +19,55 @@ namespace NActors { size_t Size; }; + struct TEventSectionInfo { + size_t Headroom = 0; // headroom to be created on the receiving side + size_t Size = 0; // full size of serialized event section (a chunk in rope) + size_t Tailroom = 0; // tailroom for the chunk + size_t Alignment = 0; // required alignment + }; + + struct TEventSerializationInfo { + bool IsExtendedFormat = {}; + std::vector<TEventSectionInfo> Sections; + // total sum of Size for every section must match actual serialized size of the event + }; + class TEventSerializedData : public TThrRefBase { TRope Rope; - bool ExtendedFormat = false; + TEventSerializationInfo SerializationInfo; public: TEventSerializedData() = default; - TEventSerializedData(TRope&& rope, bool extendedFormat) + TEventSerializedData(TRope&& rope, TEventSerializationInfo&& serializationInfo) : Rope(std::move(rope)) - , ExtendedFormat(extendedFormat) + , SerializationInfo(std::move(serializationInfo)) {} TEventSerializedData(const TEventSerializedData& original, TString extraBuffer) : Rope(original.Rope) - , ExtendedFormat(original.ExtendedFormat) + , SerializationInfo(original.SerializationInfo) { + if (!SerializationInfo.Sections.empty()) { + SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0}); + } Append(std::move(extraBuffer)); } - TEventSerializedData(TString buffer, bool extendedFormat) - : ExtendedFormat(extendedFormat) + TEventSerializedData(TString buffer, TEventSerializationInfo&& serializationInfo) + : SerializationInfo(std::move(serializationInfo)) { Append(std::move(buffer)); } - void SetExtendedFormat() { - ExtendedFormat = true; + void SetSerializationInfo(TEventSerializationInfo&& serializationInfo) { + SerializationInfo = std::move(serializationInfo); } - bool IsExtendedFormat() const { - return ExtendedFormat; + const TEventSerializationInfo& GetSerializationInfo() const { + return SerializationInfo; } TRope::TConstIterator GetBeginIter() const { diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index 2d388fceeb..594559cea7 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -56,10 +56,8 @@ namespace NActors { bool WriteRope(const TRope *rope) override; bool WriteString(const TString *s) override; - inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) { - if (extendedFormat) { - Buffers->SetExtendedFormat(); - } + inline TIntrusivePtr<TEventSerializedData> Release(TEventSerializationInfo&& serializationInfo) { + Buffers->SetSerializationInfo(std::move(serializationInfo)); return std::move(Buffers); } @@ -160,10 +158,6 @@ namespace NActors { return true; } - bool IsExtendedFormat() const override { - return static_cast<bool>(Payload); - } - bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override { // serialize payload first if (Payload) { @@ -236,7 +230,7 @@ namespace NActors { TRope::TConstIterator iter = input->GetBeginIter(); ui64 size = input->GetSize(); - if (input->IsExtendedFormat()) { + if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) { // check marker if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) { Y_FAIL("invalid event"); @@ -288,6 +282,28 @@ namespace NActors { CachedByteSize = 0; } + TEventSerializationInfo CreateSerializationInfo() const override { + TEventSerializationInfo info; + + if (Payload) { + char temp[MaxNumberBytes]; + info.Sections.push_back(TEventSectionInfo{0, 1, 0, 0}); // payload marker + info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(Payload.size(), temp), 0, 0}); + for (const TRope& payload : Payload) { + info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(payload.GetSize(), temp), 0, 0}); // length + info.Sections.push_back(TEventSectionInfo{0, payload.GetSize(), 0, 0}); // data + } + info.IsExtendedFormat = true; + } else { + info.IsExtendedFormat = false; + } + + const int byteSize = Max(0, Record.ByteSize()); + info.Sections.push_back(TEventSectionInfo{0, static_cast<size_t>(byteSize), 0, 0}); + + return info; + } + public: void ReservePayload(size_t size) { Payload.reserve(size); diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp index eab007bc15..fe47bf4de0 100644 --- a/library/cpp/actors/core/event_pb_payload_ut.cpp +++ b/library/cpp/actors/core/event_pb_payload_ut.cpp @@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) { auto serializer = MakeHolder<TAllocChunkSerializer>(); msg.SerializeToArcadiaStream(serializer.Get()); - auto buffers = serializer->Release(msg.IsExtendedFormat()); + auto buffers = serializer->Release(msg.CreateSerializationInfo()); UNIT_ASSERT_VALUES_EQUAL(buffers->GetSize(), msg.CalculateSerializedSize()); TString ser = buffers->GetString(); @@ -128,20 +128,20 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) { auto serializer1 = MakeHolder<TAllocChunkSerializer>(); e1.SerializeToArcadiaStream(serializer1.Get()); - auto buffers1 = serializer1->Release(e1.IsExtendedFormat()); + auto buffers1 = serializer1->Release(e1.CreateSerializationInfo()); UNIT_ASSERT_VALUES_EQUAL(buffers1->GetSize(), e1.CalculateSerializedSize()); TString ser1 = buffers1->GetString(); TEvMessageWithPayload e2(msg); auto serializer2 = MakeHolder<TAllocChunkSerializer>(); e2.SerializeToArcadiaStream(serializer2.Get()); - auto buffers2 = serializer2->Release(e2.IsExtendedFormat()); + auto buffers2 = serializer2->Release(e2.CreateSerializationInfo()); UNIT_ASSERT_VALUES_EQUAL(buffers2->GetSize(), e2.CalculateSerializedSize()); TString ser2 = buffers2->GetString(); UNIT_ASSERT_VALUES_EQUAL(ser1, ser2); // deserialize - auto data = MakeIntrusive<TEventSerializedData>(ser1, false); + auto data = MakeIntrusive<TEventSerializedData>(ser1, TEventSerializationInfo{}); THolder<TEvMessageWithPayloadPreSerialized> parsedEvent(static_cast<TEvMessageWithPayloadPreSerialized*>(TEvMessageWithPayloadPreSerialized::Load(data))); UNIT_ASSERT_VALUES_EQUAL(parsedEvent->PreSerializedData, ""); // this field is empty after deserialization auto& record = parsedEvent->GetRecord(); diff --git a/library/cpp/actors/core/mon_ut.cpp b/library/cpp/actors/core/mon_ut.cpp index d70b2c4a89..a2991e8a10 100644 --- a/library/cpp/actors/core/mon_ut.cpp +++ b/library/cpp/actors/core/mon_ut.cpp @@ -17,7 +17,7 @@ Y_UNIT_TEST_SUITE(ActorSystemMon) { TAllocChunkSerializer ser; const bool success = ev->SerializeToArcadiaStream(&ser); Y_VERIFY(success); - auto buffer = ser.Release(false); + auto buffer = ser.Release(ev->CreateSerializationInfo()); std::unique_ptr<TEvRemoteHttpInfo> restored(dynamic_cast<TEvRemoteHttpInfo*>(TEvRemoteHttpInfo::Load(buffer.Get()))); UNIT_ASSERT(restored->Query == ev->Query); UNIT_ASSERT(restored->Query.size()); diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index fed1394686..41709d6fba 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -24,8 +24,9 @@ namespace NActors { LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize()); task.Orbit.Take(event.Orbit); + Y_VERIFY(SerializationInfo); event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) | - (ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); + (SerializationInfo->IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea()); part->Channel = ChannelId | TChannelPart::LastPartFlag; @@ -81,18 +82,20 @@ namespace NActors { case EState::INITIAL: event.InitChecksum(); LWTRACK(SerializeToPacketBegin, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize); - if (event.Event) { + if (event.Buffer) { + State = EState::BUFFER; + Iter = event.Buffer->GetBeginIter(); + SerializationInfo = &event.Buffer->GetSerializationInfo(); + } else if (event.Event) { State = EState::CHUNKER; IEventBase *base = event.Event.Get(); Chunker.SetSerializingEvent(base); - ExtendedFormat = base->IsExtendedFormat(); - } else if (event.Buffer) { - State = EState::BUFFER; - Iter = event.Buffer->GetBeginIter(); - ExtendedFormat = event.Buffer->IsExtendedFormat(); - } else { + SerializationInfoContainer = base->CreateSerializationInfo(); + SerializationInfo = &SerializationInfoContainer; + } else { // event without buffer and IEventBase instance State = EState::DESCRIPTOR; - ExtendedFormat = false; + SerializationInfoContainer = {}; + SerializationInfo = &SerializationInfoContainer; } break; @@ -167,6 +170,8 @@ namespace NActors { } event.Serial = serial; NotYetConfirmed.splice(NotYetConfirmed.end(), Queue, Queue.begin()); // move event to not-yet-confirmed queue + SerializationInfoContainer = {}; + SerializationInfo = nullptr; State = EState::INITIAL; return true; // we have processed whole event, signal to the caller } diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 58783d3c8d..312eff2666 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -114,7 +114,8 @@ namespace NActors { std::list<TEventHolder> NotYetConfirmed; TRope::TConstIterator Iter; TCoroutineChunkSerializer Chunker; - bool ExtendedFormat = false; + TEventSerializationInfo SerializationInfoContainer; + const TEventSerializationInfo *SerializationInfo = nullptr; bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index fdf035499f..b1212b8914 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -327,12 +327,15 @@ namespace NActors { return ReestablishConnection(TDisconnectReason::ChecksumError()); } } + TEventSerializationInfo serializationInfo{ + .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat), + }; auto ev = std::make_unique<IEventHandle>(SessionId, descr.Type, descr.Flags & ~IEventHandle::FlagExtendedFormat, descr.Recipient, descr.Sender, - MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)), + MakeIntrusive<TEventSerializedData>(std::move(data), std::move(serializationInfo)), descr.Cookie, Params.PeerScopeId, std::move(descr.TraceId)); diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 565a511859..32c8237b59 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto pushEvent = [&](size_t size, int channel) { TString payload(size, 'X'); - auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); + auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); ch.Push(*ev); diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp index e6b2bd4e4c..7b45793e26 100644 --- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp +++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp @@ -29,16 +29,16 @@ Y_UNIT_TEST_SUITE(EventHolderPool) { std::list<TEventHolder> q; auto& ev1 = pool.Allocate(q); - ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); auto& ev2 = pool.Allocate(q); - ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); auto& ev3 = pool.Allocate(q); - ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); auto& ev4 = pool.Allocate(q); - ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); pool.Release(q, q.begin()); pool.Release(q, q.begin()); diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp index 8ef0b1507c..3596bffd5a 100644 --- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp +++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp @@ -47,7 +47,7 @@ public: const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie); InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data))); TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, - SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), false), NextCookie)); + SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie)); // Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl); ++NextCookie; } @@ -126,7 +126,7 @@ public: const TString& data = ev->GetChainBuffer()->GetString(); const TString& response = MD5::CalcRaw(data); TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(), - MakeIntrusive<TEventSerializedData>(response, false), ev->Cookie)); + MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie)); } STRICT_STFUNC(StateFunc, diff --git a/ydb/core/actorlib_impl/test_interconnect_ut.cpp b/ydb/core/actorlib_impl/test_interconnect_ut.cpp index 75bc226027..e906aaba34 100644 --- a/ydb/core/actorlib_impl/test_interconnect_ut.cpp +++ b/ydb/core/actorlib_impl/test_interconnect_ut.cpp @@ -109,7 +109,7 @@ Y_UNIT_TEST_SUITE(TInterconnectTest) { TAutoPtr<IEventHandle> GetSerialized(const TAutoPtr<IEventHandle>& ev) { NActors::TAllocChunkSerializer chunker; ev->GetBase()->SerializeToArcadiaStream(&chunker); - auto Data = chunker.Release(ev->GetBase()->IsExtendedFormat()); + auto Data = chunker.Release(ev->GetBase()->CreateSerializationInfo()); TAutoPtr<IEventHandle> serev = new IEventHandle(ev->GetBase()->Type(), ev->Flags, ev->Recipient, ev->Sender, diff --git a/ydb/core/base/tablet_pipe.h b/ydb/core/base/tablet_pipe.h index 76c0b30e69..8eb63331a4 100644 --- a/ydb/core/base/tablet_pipe.h +++ b/ydb/core/base/tablet_pipe.h @@ -69,7 +69,7 @@ namespace NKikimr { TEvPush() {} TEvPush(ui64 tabletId, ui32 type, const TActorId& sender, const TIntrusivePtr<TEventSerializedData>& buffer, - ui64 cookie, bool extendedFormat, bool supportsDataInPayload) + ui64 cookie, const TEventSerializationInfo& serializationInfo, bool supportsDataInPayload) { Record.SetTabletId(tabletId); Record.SetType(type); @@ -80,7 +80,7 @@ namespace NKikimr { Record.SetBuffer(buffer->GetString()); } Record.SetCookie(cookie); - Record.SetExtendedFormat(extendedFormat); + Record.SetExtendedFormat(serializationInfo.IsExtendedFormat); } void SetSeqNo(ui64 seqNo) { diff --git a/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp b/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp index 04a821a497..b3dde08554 100644 --- a/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp +++ b/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp @@ -65,7 +65,7 @@ class TDataWriterActor : public TActorBootstrapped<TDataWriterActor> { TEvBlobStorage::TEvVBlock logCmd(tabletId, Generation, TestCtx->SelfVDiskId, TInstant::Max()); TAllocChunkSerializer serializer; logCmd.SerializeToArcadiaStream(&serializer); - TIntrusivePtr<TEventSerializedData> buffers = serializer.Release(logCmd.IsExtendedFormat()); + TIntrusivePtr<TEventSerializedData> buffers = serializer.Release(logCmd.CreateSerializationInfo()); ctx.Send(TestCtx->LoggerId, new NPDisk::TEvLog(TestCtx->PDiskCtx->Dsk->Owner, TestCtx->PDiskCtx->Dsk->OwnerRound, TLogSignature::SignatureBlock, TRcBuf(buffers->GetString()), seg, nullptr)); diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index c5f58bd925..e4161d225a 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -269,10 +269,8 @@ struct TEvKqp { return true; } - // Same as TEventPBBase but without Rope - bool IsExtendedFormat() const override { - return false; - } + // Same as TEventPBBase but without Rope (but can contain Payload and will lose some data after all) + TEventSerializationInfo CreateSerializationInfo() const override { return {}; } ui64 GetRequestSize() const { return Record.GetRequest().ByteSizeLong(); diff --git a/ydb/core/tablet/tablet_pipe_client.cpp b/ydb/core/tablet/tablet_pipe_client.cpp index 55df667343..225abef785 100644 --- a/ydb/core/tablet/tablet_pipe_client.cpp +++ b/ydb/core/tablet/tablet_pipe_client.cpp @@ -578,11 +578,11 @@ namespace NTabletPipe { Y_VERIFY(Event, "Sending an empty event without a buffer"); TAllocChunkSerializer serializer; Event->SerializeToArcadiaStream(&serializer); - Buffer = serializer.Release(Event->IsExtendedFormat()); + Buffer = serializer.Release(Event->CreateSerializationInfo()); } auto msg = MakeHolder<TEvTabletPipe::TEvPush>(tabletId, Type, Sender, Buffer, cookie, - Buffer->IsExtendedFormat(), supportsDataInPayload); + Buffer->GetSerializationInfo(), supportsDataInPayload); if (SeqNo) { msg->SetSeqNo(SeqNo); diff --git a/ydb/core/tablet/tablet_pipe_server.cpp b/ydb/core/tablet/tablet_pipe_server.cpp index 2a4b14a900..ca6a67bc8f 100644 --- a/ydb/core/tablet/tablet_pipe_server.cpp +++ b/ydb/core/tablet/tablet_pipe_server.cpp @@ -74,9 +74,12 @@ namespace NTabletPipe { } Y_VERIFY(!msg.GetPayloadCount() || (msg.GetPayloadCount() == 1 && !record.HasBuffer())); + TEventSerializationInfo serializationInfo{ + .IsExtendedFormat = record.GetExtendedFormat(), + }; auto buffer = msg.GetPayloadCount() - ? MakeIntrusive<TEventSerializedData>(TRope(msg.GetPayload(0)), record.GetExtendedFormat()) - : MakeIntrusive<TEventSerializedData>(record.GetBuffer(), record.GetExtendedFormat()); + ? MakeIntrusive<TEventSerializedData>(TRope(msg.GetPayload(0)), std::move(serializationInfo)) + : MakeIntrusive<TEventSerializedData>(record.GetBuffer(), std::move(serializationInfo)); auto result = std::make_unique<IEventHandle>( ev->InterconnectSession, diff --git a/ydb/core/test_tablet/state_server_interface.cpp b/ydb/core/test_tablet/state_server_interface.cpp index 79ff3d223f..8839763725 100644 --- a/ydb/core/test_tablet/state_server_interface.cpp +++ b/ydb/core/test_tablet/state_server_interface.cpp @@ -153,7 +153,7 @@ namespace NKikimr::NTestShard { Y_VERIFY(type); ResponseQ.push_back(TResponseInfo{ev->Sender, ev->Cookie, type}); auto buffers = ev->ReleaseChainBuffer(); - Y_VERIFY(!buffers->IsExtendedFormat()); + Y_VERIFY(!buffers->GetSerializationInfo().IsExtendedFormat); const ui32 len = buffers->GetSize(); Y_VERIFY(len <= 64 * 1024 * 1024); TString w = TString::Uninitialized(sizeof(ui32) + len); @@ -170,7 +170,8 @@ namespace NKikimr::NTestShard { Y_VERIFY(!ResponseQ.empty()); TResponseInfo& response = ResponseQ.front(); TActivationContext::Send(new IEventHandle(response.Type, 0, response.Sender, self->SelfId(), - MakeIntrusive<TEventSerializedData>(std::move(ReadBuffer), false), response.Cookie)); + MakeIntrusive<TEventSerializedData>(std::move(ReadBuffer), TEventSerializationInfo{}), + response.Cookie)); ResponseQ.pop_front(); } diff --git a/ydb/core/testlib/fake_coordinator.h b/ydb/core/testlib/fake_coordinator.h index b3098d60a5..b9aa1ca813 100644 --- a/ydb/core/testlib/fake_coordinator.h +++ b/ydb/core/testlib/fake_coordinator.h @@ -212,7 +212,7 @@ namespace NKikimr { ev->SerializeToArcadiaStream(&serializer); Cerr << "FAKE_COORDINATOR: Send Plan to tablet " << tabletId << " for txId: " << ev->Record.GetTransactions(0).GetTxId() << " at step: " << step << "\n"; - Pipes->Send(ctx, tabletId, ev->EventType, serializer.Release(ev->IsExtendedFormat())); + Pipes->Send(ctx, tabletId, ev->EventType, serializer.Release(ev->CreateSerializationInfo())); } } } diff --git a/ydb/core/tx/mediator/tablet_queue.cpp b/ydb/core/tx/mediator/tablet_queue.cpp index 3878119294..fa05f7acd2 100644 --- a/ydb/core/tx/mediator/tablet_queue.cpp +++ b/ydb/core/tx/mediator/tablet_queue.cpp @@ -121,7 +121,7 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> { TAllocChunkSerializer serializer; const bool success = evx.SerializeToArcadiaStream(&serializer); Y_VERIFY(success); - TIntrusivePtr<TEventSerializedData> data = serializer.Release(evx.IsExtendedFormat()); + TIntrusivePtr<TEventSerializedData> data = serializer.Release(evx.CreateSerializationInfo()); // todo: we must throttle delivery const ui32 sendFlags = IEventHandle::FlagTrackDelivery; diff --git a/ydb/core/tx/scheme_board/helpers.cpp b/ydb/core/tx/scheme_board/helpers.cpp index 3b53323ad3..add1b5a460 100644 --- a/ydb/core/tx/scheme_board/helpers.cpp +++ b/ydb/core/tx/scheme_board/helpers.cpp @@ -84,7 +84,7 @@ TSet<ui64> GetAbandonedSchemeShardIds(const NKikimrScheme::TEvDescribeSchemeResu TIntrusivePtr<TEventSerializedData> SerializeEvent(IEventBase* ev) { TAllocChunkSerializer serializer; Y_VERIFY(ev->SerializeToArcadiaStream(&serializer)); - return serializer.Release(ev->IsExtendedFormat()); + return serializer.Release(ev->CreateSerializationInfo()); } void MultiSend(const TVector<const TActorId*>& recipients, const TActorId& sender, TAutoPtr<IEventBase> ev, ui32 flags, ui64 cookie) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp index 2cde5a9ce7..e0ab1cecde 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp @@ -617,7 +617,7 @@ void TSideEffects::DoBindMsg(TSchemeShard *ss, const TActorContext &ctx) { TAllocChunkSerializer serializer; const bool success = message->SerializeToArcadiaStream(&serializer); Y_VERIFY(success); - TIntrusivePtr<TEventSerializedData> data = serializer.Release(message->IsExtendedFormat()); + TIntrusivePtr<TEventSerializedData> data = serializer.Release(message->CreateSerializationInfo()); operation->PipeBindedMessages[tablet][cookie] = TOperation::TPreSerializedMessage(msgType, data, opId); ss->PipeClientCache->Send(ctx, ui64(tablet), msgType, data, cookie.second); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index 16918116d7..28e5a8285e 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -475,7 +475,7 @@ private: TAllocChunkSerializer serializer; const bool success = message->SerializeToArcadiaStream(&serializer); Y_VERIFY(success); - TIntrusivePtr<TEventSerializedData> data = serializer.Release(message->IsExtendedFormat()); + TIntrusivePtr<TEventSerializedData> data = serializer.Release(message->CreateSerializationInfo()); return TPreSerializedMessage(message->Type(), data); } |