aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-01-23 17:26:05 +0300
committeralexvru <alexvru@ydb.tech>2023-01-23 17:26:05 +0300
commita65aef1c0441efcfc8721c8a67fd25195f34d402 (patch)
treeede200d3dbf5baebd15de4fdf792341a7c5b9618
parent4395852866f7a69b06e307b0f9fcd8ef950c200b (diff)
downloadydb-a65aef1c0441efcfc8721c8a67fd25195f34d402.tar.gz
Fix serialization for external data channel,
-rw-r--r--library/cpp/actors/core/event.cpp7
-rw-r--r--library/cpp/actors/core/event.h4
-rw-r--r--library/cpp/actors/core/event_load.h36
-rw-r--r--library/cpp/actors/core/event_pb.h34
-rw-r--r--library/cpp/actors/core/event_pb_payload_ut.cpp8
-rw-r--r--library/cpp/actors/core/mon_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp23
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h3
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp5
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp8
-rw-r--r--library/cpp/actors/interconnect/ut/interconnect_ut.cpp4
-rw-r--r--ydb/core/actorlib_impl/test_interconnect_ut.cpp2
-rw-r--r--ydb/core/base/tablet_pipe.h4
-rw-r--r--ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp2
-rw-r--r--ydb/core/kqp/common/kqp.h6
-rw-r--r--ydb/core/tablet/tablet_pipe_client.cpp4
-rw-r--r--ydb/core/tablet/tablet_pipe_server.cpp7
-rw-r--r--ydb/core/test_tablet/state_server_interface.cpp5
-rw-r--r--ydb/core/testlib/fake_coordinator.h2
-rw-r--r--ydb/core/tx/mediator/tablet_queue.cpp2
-rw-r--r--ydb/core/tx/scheme_board/helpers.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp2
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/test_env.cpp2
24 files changed, 109 insertions, 67 deletions
diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp
index 33f8ce2aaf..6a5230f825 100644
--- a/library/cpp/actors/core/event.cpp
+++ b/library/cpp/actors/core/event.cpp
@@ -17,7 +17,7 @@ namespace NActors {
if (Event) {
TAllocChunkSerializer serializer;
Event->SerializeToArcadiaStream(&serializer);
- auto chainBuf = serializer.Release(Event->IsExtendedFormat());
+ auto chainBuf = serializer.Release(Event->CreateSerializationInfo());
Event.Reset();
return chainBuf;
}
@@ -25,12 +25,13 @@ namespace NActors {
}
TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() {
- if (Buffer)
+ if (Buffer) {
return Buffer;
+ }
if (Event) {
TAllocChunkSerializer serializer;
Event->SerializeToArcadiaStream(&serializer);
- Buffer = serializer.Release(Event->IsExtendedFormat());
+ Buffer = serializer.Release(Event->CreateSerializationInfo());
return Buffer;
}
return new TEventSerializedData;
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 6b92edaf41..4eedeb0574 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -47,12 +47,10 @@ namespace NActors {
virtual ui32 Type() const = 0;
virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
virtual bool IsSerializable() const = 0;
- virtual bool IsExtendedFormat() const {
- return false;
- }
virtual ui32 CalculateSerializedSizeCached() const {
return CalculateSerializedSize();
}
+ virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; }
};
// fat handle
diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h
index c7cfbdd1b3..30cc26aa46 100644
--- a/library/cpp/actors/core/event_load.h
+++ b/library/cpp/actors/core/event_load.h
@@ -19,39 +19,55 @@ namespace NActors {
size_t Size;
};
+ struct TEventSectionInfo {
+ size_t Headroom = 0; // headroom to be created on the receiving side
+ size_t Size = 0; // full size of serialized event section (a chunk in rope)
+ size_t Tailroom = 0; // tailroom for the chunk
+ size_t Alignment = 0; // required alignment
+ };
+
+ struct TEventSerializationInfo {
+ bool IsExtendedFormat = {};
+ std::vector<TEventSectionInfo> Sections;
+ // total sum of Size for every section must match actual serialized size of the event
+ };
+
class TEventSerializedData
: public TThrRefBase
{
TRope Rope;
- bool ExtendedFormat = false;
+ TEventSerializationInfo SerializationInfo;
public:
TEventSerializedData() = default;
- TEventSerializedData(TRope&& rope, bool extendedFormat)
+ TEventSerializedData(TRope&& rope, TEventSerializationInfo&& serializationInfo)
: Rope(std::move(rope))
- , ExtendedFormat(extendedFormat)
+ , SerializationInfo(std::move(serializationInfo))
{}
TEventSerializedData(const TEventSerializedData& original, TString extraBuffer)
: Rope(original.Rope)
- , ExtendedFormat(original.ExtendedFormat)
+ , SerializationInfo(original.SerializationInfo)
{
+ if (!SerializationInfo.Sections.empty()) {
+ SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0});
+ }
Append(std::move(extraBuffer));
}
- TEventSerializedData(TString buffer, bool extendedFormat)
- : ExtendedFormat(extendedFormat)
+ TEventSerializedData(TString buffer, TEventSerializationInfo&& serializationInfo)
+ : SerializationInfo(std::move(serializationInfo))
{
Append(std::move(buffer));
}
- void SetExtendedFormat() {
- ExtendedFormat = true;
+ void SetSerializationInfo(TEventSerializationInfo&& serializationInfo) {
+ SerializationInfo = std::move(serializationInfo);
}
- bool IsExtendedFormat() const {
- return ExtendedFormat;
+ const TEventSerializationInfo& GetSerializationInfo() const {
+ return SerializationInfo;
}
TRope::TConstIterator GetBeginIter() const {
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index 2d388fceeb..594559cea7 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -56,10 +56,8 @@ namespace NActors {
bool WriteRope(const TRope *rope) override;
bool WriteString(const TString *s) override;
- inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) {
- if (extendedFormat) {
- Buffers->SetExtendedFormat();
- }
+ inline TIntrusivePtr<TEventSerializedData> Release(TEventSerializationInfo&& serializationInfo) {
+ Buffers->SetSerializationInfo(std::move(serializationInfo));
return std::move(Buffers);
}
@@ -160,10 +158,6 @@ namespace NActors {
return true;
}
- bool IsExtendedFormat() const override {
- return static_cast<bool>(Payload);
- }
-
bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
// serialize payload first
if (Payload) {
@@ -236,7 +230,7 @@ namespace NActors {
TRope::TConstIterator iter = input->GetBeginIter();
ui64 size = input->GetSize();
- if (input->IsExtendedFormat()) {
+ if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) {
// check marker
if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) {
Y_FAIL("invalid event");
@@ -288,6 +282,28 @@ namespace NActors {
CachedByteSize = 0;
}
+ TEventSerializationInfo CreateSerializationInfo() const override {
+ TEventSerializationInfo info;
+
+ if (Payload) {
+ char temp[MaxNumberBytes];
+ info.Sections.push_back(TEventSectionInfo{0, 1, 0, 0}); // payload marker
+ info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(Payload.size(), temp), 0, 0});
+ for (const TRope& payload : Payload) {
+ info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(payload.GetSize(), temp), 0, 0}); // length
+ info.Sections.push_back(TEventSectionInfo{0, payload.GetSize(), 0, 0}); // data
+ }
+ info.IsExtendedFormat = true;
+ } else {
+ info.IsExtendedFormat = false;
+ }
+
+ const int byteSize = Max(0, Record.ByteSize());
+ info.Sections.push_back(TEventSectionInfo{0, static_cast<size_t>(byteSize), 0, 0});
+
+ return info;
+ }
+
public:
void ReservePayload(size_t size) {
Payload.reserve(size);
diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp
index eab007bc15..fe47bf4de0 100644
--- a/library/cpp/actors/core/event_pb_payload_ut.cpp
+++ b/library/cpp/actors/core/event_pb_payload_ut.cpp
@@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {
auto serializer = MakeHolder<TAllocChunkSerializer>();
msg.SerializeToArcadiaStream(serializer.Get());
- auto buffers = serializer->Release(msg.IsExtendedFormat());
+ auto buffers = serializer->Release(msg.CreateSerializationInfo());
UNIT_ASSERT_VALUES_EQUAL(buffers->GetSize(), msg.CalculateSerializedSize());
TString ser = buffers->GetString();
@@ -128,20 +128,20 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {
auto serializer1 = MakeHolder<TAllocChunkSerializer>();
e1.SerializeToArcadiaStream(serializer1.Get());
- auto buffers1 = serializer1->Release(e1.IsExtendedFormat());
+ auto buffers1 = serializer1->Release(e1.CreateSerializationInfo());
UNIT_ASSERT_VALUES_EQUAL(buffers1->GetSize(), e1.CalculateSerializedSize());
TString ser1 = buffers1->GetString();
TEvMessageWithPayload e2(msg);
auto serializer2 = MakeHolder<TAllocChunkSerializer>();
e2.SerializeToArcadiaStream(serializer2.Get());
- auto buffers2 = serializer2->Release(e2.IsExtendedFormat());
+ auto buffers2 = serializer2->Release(e2.CreateSerializationInfo());
UNIT_ASSERT_VALUES_EQUAL(buffers2->GetSize(), e2.CalculateSerializedSize());
TString ser2 = buffers2->GetString();
UNIT_ASSERT_VALUES_EQUAL(ser1, ser2);
// deserialize
- auto data = MakeIntrusive<TEventSerializedData>(ser1, false);
+ auto data = MakeIntrusive<TEventSerializedData>(ser1, TEventSerializationInfo{});
THolder<TEvMessageWithPayloadPreSerialized> parsedEvent(static_cast<TEvMessageWithPayloadPreSerialized*>(TEvMessageWithPayloadPreSerialized::Load(data)));
UNIT_ASSERT_VALUES_EQUAL(parsedEvent->PreSerializedData, ""); // this field is empty after deserialization
auto& record = parsedEvent->GetRecord();
diff --git a/library/cpp/actors/core/mon_ut.cpp b/library/cpp/actors/core/mon_ut.cpp
index d70b2c4a89..a2991e8a10 100644
--- a/library/cpp/actors/core/mon_ut.cpp
+++ b/library/cpp/actors/core/mon_ut.cpp
@@ -17,7 +17,7 @@ Y_UNIT_TEST_SUITE(ActorSystemMon) {
TAllocChunkSerializer ser;
const bool success = ev->SerializeToArcadiaStream(&ser);
Y_VERIFY(success);
- auto buffer = ser.Release(false);
+ auto buffer = ser.Release(ev->CreateSerializationInfo());
std::unique_ptr<TEvRemoteHttpInfo> restored(dynamic_cast<TEvRemoteHttpInfo*>(TEvRemoteHttpInfo::Load(buffer.Get())));
UNIT_ASSERT(restored->Query == ev->Query);
UNIT_ASSERT(restored->Query.size());
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index fed1394686..41709d6fba 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -24,8 +24,9 @@ namespace NActors {
LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize());
task.Orbit.Take(event.Orbit);
+ Y_VERIFY(SerializationInfo);
event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) |
- (ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0);
+ (SerializationInfo->IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0);
TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea());
part->Channel = ChannelId | TChannelPart::LastPartFlag;
@@ -81,18 +82,20 @@ namespace NActors {
case EState::INITIAL:
event.InitChecksum();
LWTRACK(SerializeToPacketBegin, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize);
- if (event.Event) {
+ if (event.Buffer) {
+ State = EState::BUFFER;
+ Iter = event.Buffer->GetBeginIter();
+ SerializationInfo = &event.Buffer->GetSerializationInfo();
+ } else if (event.Event) {
State = EState::CHUNKER;
IEventBase *base = event.Event.Get();
Chunker.SetSerializingEvent(base);
- ExtendedFormat = base->IsExtendedFormat();
- } else if (event.Buffer) {
- State = EState::BUFFER;
- Iter = event.Buffer->GetBeginIter();
- ExtendedFormat = event.Buffer->IsExtendedFormat();
- } else {
+ SerializationInfoContainer = base->CreateSerializationInfo();
+ SerializationInfo = &SerializationInfoContainer;
+ } else { // event without buffer and IEventBase instance
State = EState::DESCRIPTOR;
- ExtendedFormat = false;
+ SerializationInfoContainer = {};
+ SerializationInfo = &SerializationInfoContainer;
}
break;
@@ -167,6 +170,8 @@ namespace NActors {
}
event.Serial = serial;
NotYetConfirmed.splice(NotYetConfirmed.end(), Queue, Queue.begin()); // move event to not-yet-confirmed queue
+ SerializationInfoContainer = {};
+ SerializationInfo = nullptr;
State = EState::INITIAL;
return true; // we have processed whole event, signal to the caller
}
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 58783d3c8d..312eff2666 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -114,7 +114,8 @@ namespace NActors {
std::list<TEventHolder> NotYetConfirmed;
TRope::TConstIterator Iter;
TCoroutineChunkSerializer Chunker;
- bool ExtendedFormat = false;
+ TEventSerializationInfo SerializationInfoContainer;
+ const TEventSerializationInfo *SerializationInfo = nullptr;
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index fdf035499f..b1212b8914 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -327,12 +327,15 @@ namespace NActors {
return ReestablishConnection(TDisconnectReason::ChecksumError());
}
}
+ TEventSerializationInfo serializationInfo{
+ .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat),
+ };
auto ev = std::make_unique<IEventHandle>(SessionId,
descr.Type,
descr.Flags & ~IEventHandle::FlagExtendedFormat,
descr.Recipient,
descr.Sender,
- MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)),
+ MakeIntrusive<TEventSerializedData>(std::move(data), std::move(serializationInfo)),
descr.Cookie,
Params.PeerScopeId,
std::move(descr.TraceId));
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 565a511859..32c8237b59 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto pushEvent = [&](size_t size, int channel) {
TString payload(size, 'X');
- auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
+ auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
ch.Push(*ev);
diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
index e6b2bd4e4c..7b45793e26 100644
--- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
@@ -29,16 +29,16 @@ Y_UNIT_TEST_SUITE(EventHolderPool) {
std::list<TEventHolder> q;
auto& ev1 = pool.Allocate(q);
- ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+ ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
auto& ev2 = pool.Allocate(q);
- ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+ ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
auto& ev3 = pool.Allocate(q);
- ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+ ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
auto& ev4 = pool.Allocate(q);
- ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+ ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
pool.Release(q, q.begin());
pool.Release(q, q.begin());
diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
index 8ef0b1507c..3596bffd5a 100644
--- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
@@ -47,7 +47,7 @@ public:
const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie);
InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data)));
TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient,
- SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), false), NextCookie));
+ SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie));
// Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl);
++NextCookie;
}
@@ -126,7 +126,7 @@ public:
const TString& data = ev->GetChainBuffer()->GetString();
const TString& response = MD5::CalcRaw(data);
TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(),
- MakeIntrusive<TEventSerializedData>(response, false), ev->Cookie));
+ MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie));
}
STRICT_STFUNC(StateFunc,
diff --git a/ydb/core/actorlib_impl/test_interconnect_ut.cpp b/ydb/core/actorlib_impl/test_interconnect_ut.cpp
index 75bc226027..e906aaba34 100644
--- a/ydb/core/actorlib_impl/test_interconnect_ut.cpp
+++ b/ydb/core/actorlib_impl/test_interconnect_ut.cpp
@@ -109,7 +109,7 @@ Y_UNIT_TEST_SUITE(TInterconnectTest) {
TAutoPtr<IEventHandle> GetSerialized(const TAutoPtr<IEventHandle>& ev) {
NActors::TAllocChunkSerializer chunker;
ev->GetBase()->SerializeToArcadiaStream(&chunker);
- auto Data = chunker.Release(ev->GetBase()->IsExtendedFormat());
+ auto Data = chunker.Release(ev->GetBase()->CreateSerializationInfo());
TAutoPtr<IEventHandle> serev =
new IEventHandle(ev->GetBase()->Type(), ev->Flags,
ev->Recipient, ev->Sender,
diff --git a/ydb/core/base/tablet_pipe.h b/ydb/core/base/tablet_pipe.h
index 76c0b30e69..8eb63331a4 100644
--- a/ydb/core/base/tablet_pipe.h
+++ b/ydb/core/base/tablet_pipe.h
@@ -69,7 +69,7 @@ namespace NKikimr {
TEvPush() {}
TEvPush(ui64 tabletId, ui32 type, const TActorId& sender, const TIntrusivePtr<TEventSerializedData>& buffer,
- ui64 cookie, bool extendedFormat, bool supportsDataInPayload)
+ ui64 cookie, const TEventSerializationInfo& serializationInfo, bool supportsDataInPayload)
{
Record.SetTabletId(tabletId);
Record.SetType(type);
@@ -80,7 +80,7 @@ namespace NKikimr {
Record.SetBuffer(buffer->GetString());
}
Record.SetCookie(cookie);
- Record.SetExtendedFormat(extendedFormat);
+ Record.SetExtendedFormat(serializationInfo.IsExtendedFormat);
}
void SetSeqNo(ui64 seqNo) {
diff --git a/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp b/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp
index 04a821a497..b3dde08554 100644
--- a/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp
+++ b/ydb/core/blobstorage/ut_vdisk/lib/test_synclog.cpp
@@ -65,7 +65,7 @@ class TDataWriterActor : public TActorBootstrapped<TDataWriterActor> {
TEvBlobStorage::TEvVBlock logCmd(tabletId, Generation, TestCtx->SelfVDiskId, TInstant::Max());
TAllocChunkSerializer serializer;
logCmd.SerializeToArcadiaStream(&serializer);
- TIntrusivePtr<TEventSerializedData> buffers = serializer.Release(logCmd.IsExtendedFormat());
+ TIntrusivePtr<TEventSerializedData> buffers = serializer.Release(logCmd.CreateSerializationInfo());
ctx.Send(TestCtx->LoggerId,
new NPDisk::TEvLog(TestCtx->PDiskCtx->Dsk->Owner, TestCtx->PDiskCtx->Dsk->OwnerRound,
TLogSignature::SignatureBlock, TRcBuf(buffers->GetString()), seg, nullptr));
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index c5f58bd925..e4161d225a 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -269,10 +269,8 @@ struct TEvKqp {
return true;
}
- // Same as TEventPBBase but without Rope
- bool IsExtendedFormat() const override {
- return false;
- }
+ // Same as TEventPBBase but without Rope (but can contain Payload and will lose some data after all)
+ TEventSerializationInfo CreateSerializationInfo() const override { return {}; }
ui64 GetRequestSize() const {
return Record.GetRequest().ByteSizeLong();
diff --git a/ydb/core/tablet/tablet_pipe_client.cpp b/ydb/core/tablet/tablet_pipe_client.cpp
index 55df667343..225abef785 100644
--- a/ydb/core/tablet/tablet_pipe_client.cpp
+++ b/ydb/core/tablet/tablet_pipe_client.cpp
@@ -578,11 +578,11 @@ namespace NTabletPipe {
Y_VERIFY(Event, "Sending an empty event without a buffer");
TAllocChunkSerializer serializer;
Event->SerializeToArcadiaStream(&serializer);
- Buffer = serializer.Release(Event->IsExtendedFormat());
+ Buffer = serializer.Release(Event->CreateSerializationInfo());
}
auto msg = MakeHolder<TEvTabletPipe::TEvPush>(tabletId, Type, Sender, Buffer, cookie,
- Buffer->IsExtendedFormat(), supportsDataInPayload);
+ Buffer->GetSerializationInfo(), supportsDataInPayload);
if (SeqNo) {
msg->SetSeqNo(SeqNo);
diff --git a/ydb/core/tablet/tablet_pipe_server.cpp b/ydb/core/tablet/tablet_pipe_server.cpp
index 2a4b14a900..ca6a67bc8f 100644
--- a/ydb/core/tablet/tablet_pipe_server.cpp
+++ b/ydb/core/tablet/tablet_pipe_server.cpp
@@ -74,9 +74,12 @@ namespace NTabletPipe {
}
Y_VERIFY(!msg.GetPayloadCount() || (msg.GetPayloadCount() == 1 && !record.HasBuffer()));
+ TEventSerializationInfo serializationInfo{
+ .IsExtendedFormat = record.GetExtendedFormat(),
+ };
auto buffer = msg.GetPayloadCount()
- ? MakeIntrusive<TEventSerializedData>(TRope(msg.GetPayload(0)), record.GetExtendedFormat())
- : MakeIntrusive<TEventSerializedData>(record.GetBuffer(), record.GetExtendedFormat());
+ ? MakeIntrusive<TEventSerializedData>(TRope(msg.GetPayload(0)), std::move(serializationInfo))
+ : MakeIntrusive<TEventSerializedData>(record.GetBuffer(), std::move(serializationInfo));
auto result = std::make_unique<IEventHandle>(
ev->InterconnectSession,
diff --git a/ydb/core/test_tablet/state_server_interface.cpp b/ydb/core/test_tablet/state_server_interface.cpp
index 79ff3d223f..8839763725 100644
--- a/ydb/core/test_tablet/state_server_interface.cpp
+++ b/ydb/core/test_tablet/state_server_interface.cpp
@@ -153,7 +153,7 @@ namespace NKikimr::NTestShard {
Y_VERIFY(type);
ResponseQ.push_back(TResponseInfo{ev->Sender, ev->Cookie, type});
auto buffers = ev->ReleaseChainBuffer();
- Y_VERIFY(!buffers->IsExtendedFormat());
+ Y_VERIFY(!buffers->GetSerializationInfo().IsExtendedFormat);
const ui32 len = buffers->GetSize();
Y_VERIFY(len <= 64 * 1024 * 1024);
TString w = TString::Uninitialized(sizeof(ui32) + len);
@@ -170,7 +170,8 @@ namespace NKikimr::NTestShard {
Y_VERIFY(!ResponseQ.empty());
TResponseInfo& response = ResponseQ.front();
TActivationContext::Send(new IEventHandle(response.Type, 0, response.Sender, self->SelfId(),
- MakeIntrusive<TEventSerializedData>(std::move(ReadBuffer), false), response.Cookie));
+ MakeIntrusive<TEventSerializedData>(std::move(ReadBuffer), TEventSerializationInfo{}),
+ response.Cookie));
ResponseQ.pop_front();
}
diff --git a/ydb/core/testlib/fake_coordinator.h b/ydb/core/testlib/fake_coordinator.h
index b3098d60a5..b9aa1ca813 100644
--- a/ydb/core/testlib/fake_coordinator.h
+++ b/ydb/core/testlib/fake_coordinator.h
@@ -212,7 +212,7 @@ namespace NKikimr {
ev->SerializeToArcadiaStream(&serializer);
Cerr << "FAKE_COORDINATOR: Send Plan to tablet " << tabletId << " for txId: " << ev->Record.GetTransactions(0).GetTxId() << " at step: " << step << "\n";
- Pipes->Send(ctx, tabletId, ev->EventType, serializer.Release(ev->IsExtendedFormat()));
+ Pipes->Send(ctx, tabletId, ev->EventType, serializer.Release(ev->CreateSerializationInfo()));
}
}
}
diff --git a/ydb/core/tx/mediator/tablet_queue.cpp b/ydb/core/tx/mediator/tablet_queue.cpp
index 3878119294..fa05f7acd2 100644
--- a/ydb/core/tx/mediator/tablet_queue.cpp
+++ b/ydb/core/tx/mediator/tablet_queue.cpp
@@ -121,7 +121,7 @@ class TTxMediatorTabletQueue : public TActor<TTxMediatorTabletQueue> {
TAllocChunkSerializer serializer;
const bool success = evx.SerializeToArcadiaStream(&serializer);
Y_VERIFY(success);
- TIntrusivePtr<TEventSerializedData> data = serializer.Release(evx.IsExtendedFormat());
+ TIntrusivePtr<TEventSerializedData> data = serializer.Release(evx.CreateSerializationInfo());
// todo: we must throttle delivery
const ui32 sendFlags = IEventHandle::FlagTrackDelivery;
diff --git a/ydb/core/tx/scheme_board/helpers.cpp b/ydb/core/tx/scheme_board/helpers.cpp
index 3b53323ad3..add1b5a460 100644
--- a/ydb/core/tx/scheme_board/helpers.cpp
+++ b/ydb/core/tx/scheme_board/helpers.cpp
@@ -84,7 +84,7 @@ TSet<ui64> GetAbandonedSchemeShardIds(const NKikimrScheme::TEvDescribeSchemeResu
TIntrusivePtr<TEventSerializedData> SerializeEvent(IEventBase* ev) {
TAllocChunkSerializer serializer;
Y_VERIFY(ev->SerializeToArcadiaStream(&serializer));
- return serializer.Release(ev->IsExtendedFormat());
+ return serializer.Release(ev->CreateSerializationInfo());
}
void MultiSend(const TVector<const TActorId*>& recipients, const TActorId& sender, TAutoPtr<IEventBase> ev, ui32 flags, ui64 cookie) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
index 2cde5a9ce7..e0ab1cecde 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
@@ -617,7 +617,7 @@ void TSideEffects::DoBindMsg(TSchemeShard *ss, const TActorContext &ctx) {
TAllocChunkSerializer serializer;
const bool success = message->SerializeToArcadiaStream(&serializer);
Y_VERIFY(success);
- TIntrusivePtr<TEventSerializedData> data = serializer.Release(message->IsExtendedFormat());
+ TIntrusivePtr<TEventSerializedData> data = serializer.Release(message->CreateSerializationInfo());
operation->PipeBindedMessages[tablet][cookie] = TOperation::TPreSerializedMessage(msgType, data, opId);
ss->PipeClientCache->Send(ctx, ui64(tablet), msgType, data, cookie.second);
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
index 16918116d7..28e5a8285e 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
@@ -475,7 +475,7 @@ private:
TAllocChunkSerializer serializer;
const bool success = message->SerializeToArcadiaStream(&serializer);
Y_VERIFY(success);
- TIntrusivePtr<TEventSerializedData> data = serializer.Release(message->IsExtendedFormat());
+ TIntrusivePtr<TEventSerializedData> data = serializer.Release(message->CreateSerializationInfo());
return TPreSerializedMessage(message->Type(), data);
}