aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-01-23 13:35:08 +0300
committergvit <gvit@ydb.tech>2023-01-23 13:35:08 +0300
commitda72c4727923040ee9f7328caa9ee22e4bd4ab09 (patch)
tree724aaad14329bb2ae9821238b1d43b34e1af1a9d /library/cpp
parent006419e12383729f3d660ad51f5a84832d6e3780 (diff)
downloadydb-da72c4727923040ee9f7328caa9ee22e4bd4ab09.tar.gz
Revert "Support interface part for ExternalDataChannel feature"
This reverts commit 0a05f95e826bc5416a494ba7822d7d7eaa8ce52c, reversing changes made to 1c58d6c48d9dc449c72880696f23217d19595db1.
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/event.cpp7
-rw-r--r--library/cpp/actors/core/event.h4
-rw-r--r--library/cpp/actors/core/event_load.h40
-rw-r--r--library/cpp/actors/core/event_pb.h34
-rw-r--r--library/cpp/actors/core/event_pb_payload_ut.cpp8
-rw-r--r--library/cpp/actors/core/mon_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp8
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp5
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp8
-rw-r--r--library/cpp/actors/interconnect/ut/interconnect_ut.cpp4
12 files changed, 43 insertions, 81 deletions
diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp
index 6a5230f825..33f8ce2aaf 100644
--- a/library/cpp/actors/core/event.cpp
+++ b/library/cpp/actors/core/event.cpp
@@ -17,7 +17,7 @@ namespace NActors {
if (Event) {
TAllocChunkSerializer serializer;
Event->SerializeToArcadiaStream(&serializer);
- auto chainBuf = serializer.Release(Event->CreateSerializationInfo());
+ auto chainBuf = serializer.Release(Event->IsExtendedFormat());
Event.Reset();
return chainBuf;
}
@@ -25,13 +25,12 @@ namespace NActors {
}
TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() {
- if (Buffer) {
+ if (Buffer)
return Buffer;
- }
if (Event) {
TAllocChunkSerializer serializer;
Event->SerializeToArcadiaStream(&serializer);
- Buffer = serializer.Release(Event->CreateSerializationInfo());
+ Buffer = serializer.Release(Event->IsExtendedFormat());
return Buffer;
}
return new TEventSerializedData;
diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h
index 4eedeb0574..6b92edaf41 100644
--- a/library/cpp/actors/core/event.h
+++ b/library/cpp/actors/core/event.h
@@ -47,10 +47,12 @@ namespace NActors {
virtual ui32 Type() const = 0;
virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
virtual bool IsSerializable() const = 0;
+ virtual bool IsExtendedFormat() const {
+ return false;
+ }
virtual ui32 CalculateSerializedSizeCached() const {
return CalculateSerializedSize();
}
- virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; }
};
// fat handle
diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h
index 8d069c555d..c7cfbdd1b3 100644
--- a/library/cpp/actors/core/event_load.h
+++ b/library/cpp/actors/core/event_load.h
@@ -19,59 +19,39 @@ namespace NActors {
size_t Size;
};
- struct TEventSectionInfo {
- size_t Headroom = 0; // headroom to be created on the receiving side
- size_t Size = 0; // full size of serialized event section (a chunk in rope)
- size_t Tailroom = 0; // tailroom for the chunk
- size_t Alignment = 0; // required alignment
- };
-
- struct TEventSerializationInfo {
- bool IsExtendedFormat = {};
- std::vector<TEventSectionInfo> Sections;
- // total sum of Size for every section must match actual serialized size of the event
- };
-
class TEventSerializedData
: public TThrRefBase
{
TRope Rope;
- TEventSerializationInfo SerializationInfo;
+ bool ExtendedFormat = false;
public:
TEventSerializedData() = default;
- TEventSerializedData(TRope&& rope, TEventSerializationInfo&& serializationInfo)
+ TEventSerializedData(TRope&& rope, bool extendedFormat)
: Rope(std::move(rope))
- , SerializationInfo(std::move(serializationInfo))
+ , ExtendedFormat(extendedFormat)
{}
TEventSerializedData(const TEventSerializedData& original, TString extraBuffer)
: Rope(original.Rope)
- , SerializationInfo(original.SerializationInfo)
+ , ExtendedFormat(original.ExtendedFormat)
{
- if (!SerializationInfo.Sections.empty()) {
- SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0});
- }
Append(std::move(extraBuffer));
}
- TEventSerializedData(TString buffer, TEventSerializationInfo&& serializationInfo)
- : SerializationInfo(std::move(serializationInfo))
+ TEventSerializedData(TString buffer, bool extendedFormat)
+ : ExtendedFormat(extendedFormat)
{
Append(std::move(buffer));
}
- void SetSerializationInfo(TEventSerializationInfo&& serializationInfo) {
- SerializationInfo = std::move(serializationInfo);
- }
-
- const TEventSerializationInfo& GetSerializationInfo() const {
- return SerializationInfo;
+ void SetExtendedFormat() {
+ ExtendedFormat = true;
}
- TEventSerializationInfo ReleaseSerializationInfo() {
- return std::move(SerializationInfo);
+ bool IsExtendedFormat() const {
+ return ExtendedFormat;
}
TRope::TConstIterator GetBeginIter() const {
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index 594559cea7..2d388fceeb 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -56,8 +56,10 @@ namespace NActors {
bool WriteRope(const TRope *rope) override;
bool WriteString(const TString *s) override;
- inline TIntrusivePtr<TEventSerializedData> Release(TEventSerializationInfo&& serializationInfo) {
- Buffers->SetSerializationInfo(std::move(serializationInfo));
+ inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) {
+ if (extendedFormat) {
+ Buffers->SetExtendedFormat();
+ }
return std::move(Buffers);
}
@@ -158,6 +160,10 @@ namespace NActors {
return true;
}
+ bool IsExtendedFormat() const override {
+ return static_cast<bool>(Payload);
+ }
+
bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
// serialize payload first
if (Payload) {
@@ -230,7 +236,7 @@ namespace NActors {
TRope::TConstIterator iter = input->GetBeginIter();
ui64 size = input->GetSize();
- if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) {
+ if (input->IsExtendedFormat()) {
// check marker
if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) {
Y_FAIL("invalid event");
@@ -282,28 +288,6 @@ namespace NActors {
CachedByteSize = 0;
}
- TEventSerializationInfo CreateSerializationInfo() const override {
- TEventSerializationInfo info;
-
- if (Payload) {
- char temp[MaxNumberBytes];
- info.Sections.push_back(TEventSectionInfo{0, 1, 0, 0}); // payload marker
- info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(Payload.size(), temp), 0, 0});
- for (const TRope& payload : Payload) {
- info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(payload.GetSize(), temp), 0, 0}); // length
- info.Sections.push_back(TEventSectionInfo{0, payload.GetSize(), 0, 0}); // data
- }
- info.IsExtendedFormat = true;
- } else {
- info.IsExtendedFormat = false;
- }
-
- const int byteSize = Max(0, Record.ByteSize());
- info.Sections.push_back(TEventSectionInfo{0, static_cast<size_t>(byteSize), 0, 0});
-
- return info;
- }
-
public:
void ReservePayload(size_t size) {
Payload.reserve(size);
diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp
index fe47bf4de0..eab007bc15 100644
--- a/library/cpp/actors/core/event_pb_payload_ut.cpp
+++ b/library/cpp/actors/core/event_pb_payload_ut.cpp
@@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {
auto serializer = MakeHolder<TAllocChunkSerializer>();
msg.SerializeToArcadiaStream(serializer.Get());
- auto buffers = serializer->Release(msg.CreateSerializationInfo());
+ auto buffers = serializer->Release(msg.IsExtendedFormat());
UNIT_ASSERT_VALUES_EQUAL(buffers->GetSize(), msg.CalculateSerializedSize());
TString ser = buffers->GetString();
@@ -128,20 +128,20 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {
auto serializer1 = MakeHolder<TAllocChunkSerializer>();
e1.SerializeToArcadiaStream(serializer1.Get());
- auto buffers1 = serializer1->Release(e1.CreateSerializationInfo());
+ auto buffers1 = serializer1->Release(e1.IsExtendedFormat());
UNIT_ASSERT_VALUES_EQUAL(buffers1->GetSize(), e1.CalculateSerializedSize());
TString ser1 = buffers1->GetString();
TEvMessageWithPayload e2(msg);
auto serializer2 = MakeHolder<TAllocChunkSerializer>();
e2.SerializeToArcadiaStream(serializer2.Get());
- auto buffers2 = serializer2->Release(e2.CreateSerializationInfo());
+ auto buffers2 = serializer2->Release(e2.IsExtendedFormat());
UNIT_ASSERT_VALUES_EQUAL(buffers2->GetSize(), e2.CalculateSerializedSize());
TString ser2 = buffers2->GetString();
UNIT_ASSERT_VALUES_EQUAL(ser1, ser2);
// deserialize
- auto data = MakeIntrusive<TEventSerializedData>(ser1, TEventSerializationInfo{});
+ auto data = MakeIntrusive<TEventSerializedData>(ser1, false);
THolder<TEvMessageWithPayloadPreSerialized> parsedEvent(static_cast<TEvMessageWithPayloadPreSerialized*>(TEvMessageWithPayloadPreSerialized::Load(data)));
UNIT_ASSERT_VALUES_EQUAL(parsedEvent->PreSerializedData, ""); // this field is empty after deserialization
auto& record = parsedEvent->GetRecord();
diff --git a/library/cpp/actors/core/mon_ut.cpp b/library/cpp/actors/core/mon_ut.cpp
index a2991e8a10..d70b2c4a89 100644
--- a/library/cpp/actors/core/mon_ut.cpp
+++ b/library/cpp/actors/core/mon_ut.cpp
@@ -17,7 +17,7 @@ Y_UNIT_TEST_SUITE(ActorSystemMon) {
TAllocChunkSerializer ser;
const bool success = ev->SerializeToArcadiaStream(&ser);
Y_VERIFY(success);
- auto buffer = ser.Release(ev->CreateSerializationInfo());
+ auto buffer = ser.Release(false);
std::unique_ptr<TEvRemoteHttpInfo> restored(dynamic_cast<TEvRemoteHttpInfo*>(TEvRemoteHttpInfo::Load(buffer.Get())));
UNIT_ASSERT(restored->Query == ev->Query);
UNIT_ASSERT(restored->Query.size());
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index 7f3bc65497..fed1394686 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -25,7 +25,7 @@ namespace NActors {
task.Orbit.Take(event.Orbit);
event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) |
- (SerializationInfo.IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0);
+ (ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0);
TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea());
part->Channel = ChannelId | TChannelPart::LastPartFlag;
@@ -85,14 +85,14 @@ namespace NActors {
State = EState::CHUNKER;
IEventBase *base = event.Event.Get();
Chunker.SetSerializingEvent(base);
- SerializationInfo = base->CreateSerializationInfo();
+ ExtendedFormat = base->IsExtendedFormat();
} else if (event.Buffer) {
State = EState::BUFFER;
Iter = event.Buffer->GetBeginIter();
- SerializationInfo = event.Buffer->ReleaseSerializationInfo();
+ ExtendedFormat = event.Buffer->IsExtendedFormat();
} else {
State = EState::DESCRIPTOR;
- SerializationInfo = {};
+ ExtendedFormat = false;
}
break;
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 4a476abc37..58783d3c8d 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -114,7 +114,7 @@ namespace NActors {
std::list<TEventHolder> NotYetConfirmed;
TRope::TConstIterator Iter;
TCoroutineChunkSerializer Chunker;
- TEventSerializationInfo SerializationInfo;
+ bool ExtendedFormat = false;
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index b1212b8914..fdf035499f 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -327,15 +327,12 @@ namespace NActors {
return ReestablishConnection(TDisconnectReason::ChecksumError());
}
}
- TEventSerializationInfo serializationInfo{
- .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat),
- };
auto ev = std::make_unique<IEventHandle>(SessionId,
descr.Type,
descr.Flags & ~IEventHandle::FlagExtendedFormat,
descr.Recipient,
descr.Sender,
- MakeIntrusive<TEventSerializedData>(std::move(data), std::move(serializationInfo)),
+ MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)),
descr.Cookie,
Params.PeerScopeId,
std::move(descr.TraceId));
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 32c8237b59..565a511859 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto pushEvent = [&](size_t size, int channel) {
TString payload(size, 'X');
- auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0);
+ auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
ch.Push(*ev);
diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
index 7b45793e26..e6b2bd4e4c 100644
--- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
@@ -29,16 +29,16 @@ Y_UNIT_TEST_SUITE(EventHolderPool) {
std::list<TEventHolder> q;
auto& ev1 = pool.Allocate(q);
- ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
+ ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
auto& ev2 = pool.Allocate(q);
- ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
+ ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
auto& ev3 = pool.Allocate(q);
- ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
+ ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
auto& ev4 = pool.Allocate(q);
- ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
+ ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
pool.Release(q, q.begin());
pool.Release(q, q.begin());
diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
index 3596bffd5a..8ef0b1507c 100644
--- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
@@ -47,7 +47,7 @@ public:
const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie);
InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data)));
TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient,
- SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie));
+ SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), false), NextCookie));
// Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl);
++NextCookie;
}
@@ -126,7 +126,7 @@ public:
const TString& data = ev->GetChainBuffer()->GetString();
const TString& response = MD5::CalcRaw(data);
TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(),
- MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie));
+ MakeIntrusive<TEventSerializedData>(response, false), ev->Cookie));
}
STRICT_STFUNC(StateFunc,