diff options
author | alexvru <alexvru@ydb.tech> | 2023-01-18 20:52:56 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-01-18 20:52:56 +0300 |
commit | 165a931108b2403c481db9430596b2ebef8c5391 (patch) | |
tree | 61c85dafdfa5a4bca0222a627d153d9c5c76c118 /library/cpp/actors/interconnect | |
parent | 0d3b35526e4872d4609ba51cb38b46cc588aac71 (diff) | |
download | ydb-165a931108b2403c481db9430596b2ebef8c5391.tar.gz |
Support interface part for ExternalDataChannel feature
Diffstat (limited to 'library/cpp/actors/interconnect')
6 files changed, 16 insertions, 13 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index fed1394686..7f3bc65497 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -25,7 +25,7 @@ namespace NActors { task.Orbit.Take(event.Orbit); event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) | - (ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); + (SerializationInfo.IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea()); part->Channel = ChannelId | TChannelPart::LastPartFlag; @@ -85,14 +85,14 @@ namespace NActors { State = EState::CHUNKER; IEventBase *base = event.Event.Get(); Chunker.SetSerializingEvent(base); - ExtendedFormat = base->IsExtendedFormat(); + SerializationInfo = base->CreateSerializationInfo(); } else if (event.Buffer) { State = EState::BUFFER; Iter = event.Buffer->GetBeginIter(); - ExtendedFormat = event.Buffer->IsExtendedFormat(); + SerializationInfo = event.Buffer->ReleaseSerializationInfo(); } else { State = EState::DESCRIPTOR; - ExtendedFormat = false; + SerializationInfo = {}; } break; diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 58783d3c8d..4a476abc37 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -114,7 +114,7 @@ namespace NActors { std::list<TEventHolder> NotYetConfirmed; TRope::TConstIterator Iter; TCoroutineChunkSerializer Chunker; - bool ExtendedFormat = false; + TEventSerializationInfo SerializationInfo; bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index a8c505d94d..c18e2a5137 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -327,12 +327,15 @@ namespace NActors { return ReestablishConnection(TDisconnectReason::ChecksumError()); } } + TEventSerializationInfo serializationInfo{ + .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat), + }; auto ev = std::make_unique<IEventHandle>(SessionId, descr.Type, descr.Flags & ~IEventHandle::FlagExtendedFormat, descr.Recipient, descr.Sender, - MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)), + MakeIntrusive<TEventSerializedData>(std::move(data), std::move(serializationInfo)), descr.Cookie, Params.PeerScopeId, std::move(descr.TraceId)); diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 565a511859..32c8237b59 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto pushEvent = [&](size_t size, int channel) { TString payload(size, 'X'); - auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); + auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); ch.Push(*ev); diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp index e6b2bd4e4c..7b45793e26 100644 --- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp +++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp @@ -29,16 +29,16 @@ Y_UNIT_TEST_SUITE(EventHolderPool) { std::list<TEventHolder> q; auto& ev1 = pool.Allocate(q); - ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); auto& ev2 = pool.Allocate(q); - ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); auto& ev3 = pool.Allocate(q); - ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); auto& ev4 = pool.Allocate(q); - ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); + ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{}); pool.Release(q, q.begin()); pool.Release(q, q.begin()); diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp index 8ef0b1507c..3596bffd5a 100644 --- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp +++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp @@ -47,7 +47,7 @@ public: const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie); InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data))); TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, - SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), false), NextCookie)); + SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie)); // Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl); ++NextCookie; } @@ -126,7 +126,7 @@ public: const TString& data = ev->GetChainBuffer()->GetString(); const TString& response = MD5::CalcRaw(data); TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(), - MakeIntrusive<TEventSerializedData>(response, false), ev->Cookie)); + MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie)); } STRICT_STFUNC(StateFunc, |