summaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core
diff options
context:
space:
mode:
authoralexvru <[email protected]>2023-01-23 17:26:05 +0300
committeralexvru <[email protected]>2023-01-23 17:26:05 +0300
commita65aef1c0441efcfc8721c8a67fd25195f34d402 (patch)
treeede200d3dbf5baebd15de4fdf792341a7c5b9618 /library/cpp/actors/core
parent4395852866f7a69b06e307b0f9fcd8ef950c200b (diff)
Fix serialization for external data channel,
Diffstat (limited to 'library/cpp/actors/core')
-rw-r--r--library/cpp/actors/core/event.cpp7
-rw-r--r--library/cpp/actors/core/event.h4
-rw-r--r--library/cpp/actors/core/event_load.h36
-rw-r--r--library/cpp/actors/core/event_pb.h34
-rw-r--r--library/cpp/actors/core/event_pb_payload_ut.cpp8
-rw-r--r--library/cpp/actors/core/mon_ut.cpp2
6 files changed, 61 insertions, 30 deletions
diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp
index 33f8ce2aaf3..6a5230f825e 100644
--- a/library/cpp/actors/core/event.cpp
+++ b/library/cpp/actors/core/event.cpp
@@ -17,7 +17,7 @@ namespace NActors {
if (Event) {
TAllocChunkSerializer serializer;
Event->SerializeToArcadiaStream(&serializer);
- auto chainBuf = serializer.Release(Event->IsExtendedFormat());
+ auto chainBuf = serializer.Release(Event->CreateSerializationInfo());
Event.Reset();
return chainBuf;
}
@@ -25,12 +25,13 @@ namespace NActors {
}
TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() {
- if (Buffer)
+ if (Buffer) {
return Buffer;
+ }
if (Event) {
TAllocChunkSerializer serializer;
Event->SerializeToArcadiaStream(&serializer);
- Buffer = serializer.Release(Event->IsExtendedFormat());
+ Buffer = serializer.Release(Event->CreateSerializationInfo());
return Buffer;
}
return new TEventSerializedData;
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 6b92edaf41b..4eedeb0574a 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -47,12 +47,10 @@ namespace NActors {
virtual ui32 Type() const = 0;
virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
virtual bool IsSerializable() const = 0;
- virtual bool IsExtendedFormat() const {
- return false;
- }
virtual ui32 CalculateSerializedSizeCached() const {
return CalculateSerializedSize();
}
+ virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; }
};
// fat handle
diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h
index c7cfbdd1b3b..30cc26aa463 100644
--- a/library/cpp/actors/core/event_load.h
+++ b/library/cpp/actors/core/event_load.h
@@ -19,39 +19,55 @@ namespace NActors {
size_t Size;
};
+ struct TEventSectionInfo {
+ size_t Headroom = 0; // headroom to be created on the receiving side
+ size_t Size = 0; // full size of serialized event section (a chunk in rope)
+ size_t Tailroom = 0; // tailroom for the chunk
+ size_t Alignment = 0; // required alignment
+ };
+
+ struct TEventSerializationInfo {
+ bool IsExtendedFormat = {};
+ std::vector<TEventSectionInfo> Sections;
+ // total sum of Size for every section must match actual serialized size of the event
+ };
+
class TEventSerializedData
: public TThrRefBase
{
TRope Rope;
- bool ExtendedFormat = false;
+ TEventSerializationInfo SerializationInfo;
public:
TEventSerializedData() = default;
- TEventSerializedData(TRope&& rope, bool extendedFormat)
+ TEventSerializedData(TRope&& rope, TEventSerializationInfo&& serializationInfo)
: Rope(std::move(rope))
- , ExtendedFormat(extendedFormat)
+ , SerializationInfo(std::move(serializationInfo))
{}
TEventSerializedData(const TEventSerializedData& original, TString extraBuffer)
: Rope(original.Rope)
- , ExtendedFormat(original.ExtendedFormat)
+ , SerializationInfo(original.SerializationInfo)
{
+ if (!SerializationInfo.Sections.empty()) {
+ SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0});
+ }
Append(std::move(extraBuffer));
}
- TEventSerializedData(TString buffer, bool extendedFormat)
- : ExtendedFormat(extendedFormat)
+ TEventSerializedData(TString buffer, TEventSerializationInfo&& serializationInfo)
+ : SerializationInfo(std::move(serializationInfo))
{
Append(std::move(buffer));
}
- void SetExtendedFormat() {
- ExtendedFormat = true;
+ void SetSerializationInfo(TEventSerializationInfo&& serializationInfo) {
+ SerializationInfo = std::move(serializationInfo);
}
- bool IsExtendedFormat() const {
- return ExtendedFormat;
+ const TEventSerializationInfo& GetSerializationInfo() const {
+ return SerializationInfo;
}
TRope::TConstIterator GetBeginIter() const {
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index 2d388fceeb0..594559cea78 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -56,10 +56,8 @@ namespace NActors {
bool WriteRope(const TRope *rope) override;
bool WriteString(const TString *s) override;
- inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) {
- if (extendedFormat) {
- Buffers->SetExtendedFormat();
- }
+ inline TIntrusivePtr<TEventSerializedData> Release(TEventSerializationInfo&& serializationInfo) {
+ Buffers->SetSerializationInfo(std::move(serializationInfo));
return std::move(Buffers);
}
@@ -160,10 +158,6 @@ namespace NActors {
return true;
}
- bool IsExtendedFormat() const override {
- return static_cast<bool>(Payload);
- }
-
bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
// serialize payload first
if (Payload) {
@@ -236,7 +230,7 @@ namespace NActors {
TRope::TConstIterator iter = input->GetBeginIter();
ui64 size = input->GetSize();
- if (input->IsExtendedFormat()) {
+ if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) {
// check marker
if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) {
Y_FAIL("invalid event");
@@ -288,6 +282,28 @@ namespace NActors {
CachedByteSize = 0;
}
+ TEventSerializationInfo CreateSerializationInfo() const override {
+ TEventSerializationInfo info;
+
+ if (Payload) {
+ char temp[MaxNumberBytes];
+ info.Sections.push_back(TEventSectionInfo{0, 1, 0, 0}); // payload marker
+ info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(Payload.size(), temp), 0, 0});
+ for (const TRope& payload : Payload) {
+ info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(payload.GetSize(), temp), 0, 0}); // length
+ info.Sections.push_back(TEventSectionInfo{0, payload.GetSize(), 0, 0}); // data
+ }
+ info.IsExtendedFormat = true;
+ } else {
+ info.IsExtendedFormat = false;
+ }
+
+ const int byteSize = Max(0, Record.ByteSize());
+ info.Sections.push_back(TEventSectionInfo{0, static_cast<size_t>(byteSize), 0, 0});
+
+ return info;
+ }
+
public:
void ReservePayload(size_t size) {
Payload.reserve(size);
diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp
index eab007bc15d..fe47bf4de0e 100644
--- a/library/cpp/actors/core/event_pb_payload_ut.cpp
+++ b/library/cpp/actors/core/event_pb_payload_ut.cpp
@@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {
auto serializer = MakeHolder<TAllocChunkSerializer>();
msg.SerializeToArcadiaStream(serializer.Get());
- auto buffers = serializer->Release(msg.IsExtendedFormat());
+ auto buffers = serializer->Release(msg.CreateSerializationInfo());
UNIT_ASSERT_VALUES_EQUAL(buffers->GetSize(), msg.CalculateSerializedSize());
TString ser = buffers->GetString();
@@ -128,20 +128,20 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {
auto serializer1 = MakeHolder<TAllocChunkSerializer>();
e1.SerializeToArcadiaStream(serializer1.Get());
- auto buffers1 = serializer1->Release(e1.IsExtendedFormat());
+ auto buffers1 = serializer1->Release(e1.CreateSerializationInfo());
UNIT_ASSERT_VALUES_EQUAL(buffers1->GetSize(), e1.CalculateSerializedSize());
TString ser1 = buffers1->GetString();
TEvMessageWithPayload e2(msg);
auto serializer2 = MakeHolder<TAllocChunkSerializer>();
e2.SerializeToArcadiaStream(serializer2.Get());
- auto buffers2 = serializer2->Release(e2.IsExtendedFormat());
+ auto buffers2 = serializer2->Release(e2.CreateSerializationInfo());
UNIT_ASSERT_VALUES_EQUAL(buffers2->GetSize(), e2.CalculateSerializedSize());
TString ser2 = buffers2->GetString();
UNIT_ASSERT_VALUES_EQUAL(ser1, ser2);
// deserialize
- auto data = MakeIntrusive<TEventSerializedData>(ser1, false);
+ auto data = MakeIntrusive<TEventSerializedData>(ser1, TEventSerializationInfo{});
THolder<TEvMessageWithPayloadPreSerialized> parsedEvent(static_cast<TEvMessageWithPayloadPreSerialized*>(TEvMessageWithPayloadPreSerialized::Load(data)));
UNIT_ASSERT_VALUES_EQUAL(parsedEvent->PreSerializedData, ""); // this field is empty after deserialization
auto& record = parsedEvent->GetRecord();
diff --git a/library/cpp/actors/core/mon_ut.cpp b/library/cpp/actors/core/mon_ut.cpp
index d70b2c4a896..a2991e8a10c 100644
--- a/library/cpp/actors/core/mon_ut.cpp
+++ b/library/cpp/actors/core/mon_ut.cpp
@@ -17,7 +17,7 @@ Y_UNIT_TEST_SUITE(ActorSystemMon) {
TAllocChunkSerializer ser;
const bool success = ev->SerializeToArcadiaStream(&ser);
Y_VERIFY(success);
- auto buffer = ser.Release(false);
+ auto buffer = ser.Release(ev->CreateSerializationInfo());
std::unique_ptr<TEvRemoteHttpInfo> restored(dynamic_cast<TEvRemoteHttpInfo*>(TEvRemoteHttpInfo::Load(buffer.Get())));
UNIT_ASSERT(restored->Query == ev->Query);
UNIT_ASSERT(restored->Query.size());