aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-01-18 20:52:56 +0300
committeralexvru <alexvru@ydb.tech>2023-01-18 20:52:56 +0300
commit165a931108b2403c481db9430596b2ebef8c5391 (patch)
tree61c85dafdfa5a4bca0222a627d153d9c5c76c118 /library/cpp/actors/interconnect
parent0d3b35526e4872d4609ba51cb38b46cc588aac71 (diff)
downloadydb-165a931108b2403c481db9430596b2ebef8c5391.tar.gz
Support interface part for ExternalDataChannel feature
Diffstat (limited to 'library/cpp/actors/interconnect')
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp8
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp5
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp8
-rw-r--r--library/cpp/actors/interconnect/ut/interconnect_ut.cpp4
6 files changed, 16 insertions, 13 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index fed1394686..7f3bc65497 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -25,7 +25,7 @@ namespace NActors {
task.Orbit.Take(event.Orbit);
event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) |
- (ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0);
+ (SerializationInfo.IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0);
TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea());
part->Channel = ChannelId | TChannelPart::LastPartFlag;
@@ -85,14 +85,14 @@ namespace NActors {
State = EState::CHUNKER;
IEventBase *base = event.Event.Get();
Chunker.SetSerializingEvent(base);
- ExtendedFormat = base->IsExtendedFormat();
+ SerializationInfo = base->CreateSerializationInfo();
} else if (event.Buffer) {
State = EState::BUFFER;
Iter = event.Buffer->GetBeginIter();
- ExtendedFormat = event.Buffer->IsExtendedFormat();
+ SerializationInfo = event.Buffer->ReleaseSerializationInfo();
} else {
State = EState::DESCRIPTOR;
- ExtendedFormat = false;
+ SerializationInfo = {};
}
break;
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 58783d3c8d..4a476abc37 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -114,7 +114,7 @@ namespace NActors {
std::list<TEventHolder> NotYetConfirmed;
TRope::TConstIterator Iter;
TCoroutineChunkSerializer Chunker;
- bool ExtendedFormat = false;
+ TEventSerializationInfo SerializationInfo;
bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index a8c505d94d..c18e2a5137 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -327,12 +327,15 @@ namespace NActors {
return ReestablishConnection(TDisconnectReason::ChecksumError());
}
}
+ TEventSerializationInfo serializationInfo{
+ .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat),
+ };
auto ev = std::make_unique<IEventHandle>(SessionId,
descr.Type,
descr.Flags & ~IEventHandle::FlagExtendedFormat,
descr.Recipient,
descr.Sender,
- MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)),
+ MakeIntrusive<TEventSerializedData>(std::move(data), std::move(serializationInfo)),
descr.Cookie,
Params.PeerScopeId,
std::move(descr.TraceId));
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 565a511859..32c8237b59 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto pushEvent = [&](size_t size, int channel) {
TString payload(size, 'X');
- auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
+ auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
ch.Push(*ev);
diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
index e6b2bd4e4c..7b45793e26 100644
--- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp
@@ -29,16 +29,16 @@ Y_UNIT_TEST_SUITE(EventHolderPool) {
std::list<TEventHolder> q;
auto& ev1 = pool.Allocate(q);
- ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+ ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
auto& ev2 = pool.Allocate(q);
- ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+ ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
auto& ev3 = pool.Allocate(q);
- ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+ ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
auto& ev4 = pool.Allocate(q);
- ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true);
+ ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});
pool.Release(q, q.begin());
pool.Release(q, q.begin());
diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
index 8ef0b1507c..3596bffd5a 100644
--- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp
@@ -47,7 +47,7 @@ public:
const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie);
InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data)));
TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient,
- SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), false), NextCookie));
+ SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie));
// Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl);
++NextCookie;
}
@@ -126,7 +126,7 @@ public:
const TString& data = ev->GetChainBuffer()->GetString();
const TString& response = MD5::CalcRaw(data);
TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(),
- MakeIntrusive<TEventSerializedData>(response, false), ev->Cookie));
+ MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie));
}
STRICT_STFUNC(StateFunc,