diff options
| author | alexvru <[email protected]> | 2023-01-18 20:52:56 +0300 | 
|---|---|---|
| committer | alexvru <[email protected]> | 2023-01-18 20:52:56 +0300 | 
| commit | 165a931108b2403c481db9430596b2ebef8c5391 (patch) | |
| tree | 61c85dafdfa5a4bca0222a627d153d9c5c76c118 /library/cpp | |
| parent | 0d3b35526e4872d4609ba51cb38b46cc588aac71 (diff) | |
Support interface part for ExternalDataChannel feature
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/actors/core/event.cpp | 7 | ||||
| -rw-r--r-- | library/cpp/actors/core/event.h | 4 | ||||
| -rw-r--r-- | library/cpp/actors/core/event_load.h | 40 | ||||
| -rw-r--r-- | library/cpp/actors/core/event_pb.h | 34 | ||||
| -rw-r--r-- | library/cpp/actors/core/event_pb_payload_ut.cpp | 8 | ||||
| -rw-r--r-- | library/cpp/actors/core/mon_ut.cpp | 2 | ||||
| -rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.cpp | 8 | ||||
| -rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.h | 2 | ||||
| -rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp | 5 | ||||
| -rw-r--r-- | library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp | 2 | ||||
| -rw-r--r-- | library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp | 8 | ||||
| -rw-r--r-- | library/cpp/actors/interconnect/ut/interconnect_ut.cpp | 4 | 
12 files changed, 81 insertions, 43 deletions
diff --git a/library/cpp/actors/core/event.cpp b/library/cpp/actors/core/event.cpp index 33f8ce2aaf3..6a5230f825e 100644 --- a/library/cpp/actors/core/event.cpp +++ b/library/cpp/actors/core/event.cpp @@ -17,7 +17,7 @@ namespace NActors {          if (Event) {              TAllocChunkSerializer serializer;              Event->SerializeToArcadiaStream(&serializer); -            auto chainBuf = serializer.Release(Event->IsExtendedFormat()); +            auto chainBuf = serializer.Release(Event->CreateSerializationInfo());              Event.Reset();              return chainBuf;          } @@ -25,12 +25,13 @@ namespace NActors {      }      TIntrusivePtr<TEventSerializedData> IEventHandle::GetChainBuffer() { -        if (Buffer) +        if (Buffer) {              return Buffer; +        }          if (Event) {              TAllocChunkSerializer serializer;              Event->SerializeToArcadiaStream(&serializer); -            Buffer = serializer.Release(Event->IsExtendedFormat()); +            Buffer = serializer.Release(Event->CreateSerializationInfo());              return Buffer;          }          return new TEventSerializedData; diff --git a/library/cpp/actors/core/event.h b/library/cpp/actors/core/event.h index 6b92edaf41b..4eedeb0574a 100644 --- a/library/cpp/actors/core/event.h +++ b/library/cpp/actors/core/event.h @@ -47,12 +47,10 @@ namespace NActors {          virtual ui32 Type() const = 0;          virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;          virtual bool IsSerializable() const = 0; -        virtual bool IsExtendedFormat() const { -            return false; -        }          virtual ui32 CalculateSerializedSizeCached() const {              return CalculateSerializedSize();          } +        virtual TEventSerializationInfo CreateSerializationInfo() const { return {}; }      };      // fat handle diff --git a/library/cpp/actors/core/event_load.h b/library/cpp/actors/core/event_load.h index c7cfbdd1b3b..8d069c555d5 100644 --- a/library/cpp/actors/core/event_load.h +++ b/library/cpp/actors/core/event_load.h @@ -19,39 +19,59 @@ namespace NActors {          size_t Size;      }; +    struct TEventSectionInfo { +        size_t Headroom = 0; // headroom to be created on the receiving side +        size_t Size = 0; // full size of serialized event section (a chunk in rope) +        size_t Tailroom = 0; // tailroom for the chunk +        size_t Alignment = 0; // required alignment +    }; + +    struct TEventSerializationInfo { +        bool IsExtendedFormat = {}; +        std::vector<TEventSectionInfo> Sections; +        // total sum of Size for every section must match actual serialized size of the event +    }; +      class TEventSerializedData         : public TThrRefBase      {          TRope Rope; -        bool ExtendedFormat = false; +        TEventSerializationInfo SerializationInfo;      public:          TEventSerializedData() = default; -        TEventSerializedData(TRope&& rope, bool extendedFormat) +        TEventSerializedData(TRope&& rope, TEventSerializationInfo&& serializationInfo)              : Rope(std::move(rope)) -            , ExtendedFormat(extendedFormat) +            , SerializationInfo(std::move(serializationInfo))          {}          TEventSerializedData(const TEventSerializedData& original, TString extraBuffer)              : Rope(original.Rope) -            , ExtendedFormat(original.ExtendedFormat) +            , SerializationInfo(original.SerializationInfo)          { +            if (!SerializationInfo.Sections.empty()) { +                SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0}); +            }              Append(std::move(extraBuffer));          } -        TEventSerializedData(TString buffer, bool extendedFormat) -            : ExtendedFormat(extendedFormat) +        TEventSerializedData(TString buffer, TEventSerializationInfo&& serializationInfo) +            : SerializationInfo(std::move(serializationInfo))          {              Append(std::move(buffer));          } -        void SetExtendedFormat() { -            ExtendedFormat = true; +        void SetSerializationInfo(TEventSerializationInfo&& serializationInfo) { +            SerializationInfo = std::move(serializationInfo); +        } + +        const TEventSerializationInfo& GetSerializationInfo() const { +            return SerializationInfo;          } -        bool IsExtendedFormat() const { -            return ExtendedFormat; +        TEventSerializationInfo ReleaseSerializationInfo() { +            return std::move(SerializationInfo);          }          TRope::TConstIterator GetBeginIter() const { diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index 2d388fceeb0..594559cea78 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -56,10 +56,8 @@ namespace NActors {          bool WriteRope(const TRope *rope) override;          bool WriteString(const TString *s) override; -        inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) { -            if (extendedFormat) { -                Buffers->SetExtendedFormat(); -            } +        inline TIntrusivePtr<TEventSerializedData> Release(TEventSerializationInfo&& serializationInfo) { +            Buffers->SetSerializationInfo(std::move(serializationInfo));              return std::move(Buffers);          } @@ -160,10 +158,6 @@ namespace NActors {              return true;          } -        bool IsExtendedFormat() const override { -            return static_cast<bool>(Payload); -        } -          bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {              // serialize payload first              if (Payload) { @@ -236,7 +230,7 @@ namespace NActors {                  TRope::TConstIterator iter = input->GetBeginIter();                  ui64 size = input->GetSize(); -                if (input->IsExtendedFormat()) { +                if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) {                      // check marker                      if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) {                          Y_FAIL("invalid event"); @@ -288,6 +282,28 @@ namespace NActors {              CachedByteSize = 0;          } +        TEventSerializationInfo CreateSerializationInfo() const override { +            TEventSerializationInfo info; + +            if (Payload) { +                char temp[MaxNumberBytes]; +                info.Sections.push_back(TEventSectionInfo{0, 1, 0, 0}); // payload marker +                info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(Payload.size(), temp), 0, 0}); +                for (const TRope& payload : Payload) { +                    info.Sections.push_back(TEventSectionInfo{0, SerializeNumber(payload.GetSize(), temp), 0, 0}); // length +                    info.Sections.push_back(TEventSectionInfo{0, payload.GetSize(), 0, 0}); // data +                } +                info.IsExtendedFormat = true; +            } else { +                info.IsExtendedFormat = false; +            } + +            const int byteSize = Max(0, Record.ByteSize()); +            info.Sections.push_back(TEventSectionInfo{0, static_cast<size_t>(byteSize), 0, 0}); + +            return info; +        } +      public:          void ReservePayload(size_t size) {              Payload.reserve(size); diff --git a/library/cpp/actors/core/event_pb_payload_ut.cpp b/library/cpp/actors/core/event_pb_payload_ut.cpp index eab007bc15d..fe47bf4de0e 100644 --- a/library/cpp/actors/core/event_pb_payload_ut.cpp +++ b/library/cpp/actors/core/event_pb_payload_ut.cpp @@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {          auto serializer = MakeHolder<TAllocChunkSerializer>();          msg.SerializeToArcadiaStream(serializer.Get()); -        auto buffers = serializer->Release(msg.IsExtendedFormat()); +        auto buffers = serializer->Release(msg.CreateSerializationInfo());          UNIT_ASSERT_VALUES_EQUAL(buffers->GetSize(), msg.CalculateSerializedSize());          TString ser = buffers->GetString(); @@ -128,20 +128,20 @@ Y_UNIT_TEST_SUITE(TEventProtoWithPayload) {          auto serializer1 = MakeHolder<TAllocChunkSerializer>();          e1.SerializeToArcadiaStream(serializer1.Get()); -        auto buffers1 = serializer1->Release(e1.IsExtendedFormat()); +        auto buffers1 = serializer1->Release(e1.CreateSerializationInfo());          UNIT_ASSERT_VALUES_EQUAL(buffers1->GetSize(), e1.CalculateSerializedSize());          TString ser1 = buffers1->GetString();          TEvMessageWithPayload e2(msg);          auto serializer2 = MakeHolder<TAllocChunkSerializer>();          e2.SerializeToArcadiaStream(serializer2.Get()); -        auto buffers2 = serializer2->Release(e2.IsExtendedFormat()); +        auto buffers2 = serializer2->Release(e2.CreateSerializationInfo());          UNIT_ASSERT_VALUES_EQUAL(buffers2->GetSize(), e2.CalculateSerializedSize());          TString ser2 = buffers2->GetString();          UNIT_ASSERT_VALUES_EQUAL(ser1, ser2);          // deserialize -        auto data = MakeIntrusive<TEventSerializedData>(ser1, false); +        auto data = MakeIntrusive<TEventSerializedData>(ser1, TEventSerializationInfo{});          THolder<TEvMessageWithPayloadPreSerialized> parsedEvent(static_cast<TEvMessageWithPayloadPreSerialized*>(TEvMessageWithPayloadPreSerialized::Load(data)));          UNIT_ASSERT_VALUES_EQUAL(parsedEvent->PreSerializedData, ""); // this field is empty after deserialization          auto& record = parsedEvent->GetRecord(); diff --git a/library/cpp/actors/core/mon_ut.cpp b/library/cpp/actors/core/mon_ut.cpp index d70b2c4a896..a2991e8a10c 100644 --- a/library/cpp/actors/core/mon_ut.cpp +++ b/library/cpp/actors/core/mon_ut.cpp @@ -17,7 +17,7 @@ Y_UNIT_TEST_SUITE(ActorSystemMon) {          TAllocChunkSerializer ser;          const bool success = ev->SerializeToArcadiaStream(&ser);          Y_VERIFY(success); -        auto buffer = ser.Release(false); +        auto buffer = ser.Release(ev->CreateSerializationInfo());          std::unique_ptr<TEvRemoteHttpInfo> restored(dynamic_cast<TEvRemoteHttpInfo*>(TEvRemoteHttpInfo::Load(buffer.Get())));          UNIT_ASSERT(restored->Query == ev->Query);          UNIT_ASSERT(restored->Query.size()); diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index fed13946866..7f3bc65497e 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -25,7 +25,7 @@ namespace NActors {          task.Orbit.Take(event.Orbit);          event.Descr.Flags = (event.Descr.Flags & ~IEventHandle::FlagForwardOnNondelivery) | -            (ExtendedFormat ? IEventHandle::FlagExtendedFormat : 0); +            (SerializationInfo.IsExtendedFormat ? IEventHandle::FlagExtendedFormat : 0);          TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea());          part->Channel = ChannelId | TChannelPart::LastPartFlag; @@ -85,14 +85,14 @@ namespace NActors {                          State = EState::CHUNKER;                          IEventBase *base = event.Event.Get();                          Chunker.SetSerializingEvent(base); -                        ExtendedFormat = base->IsExtendedFormat(); +                        SerializationInfo = base->CreateSerializationInfo();                      } else if (event.Buffer) {                          State = EState::BUFFER;                          Iter = event.Buffer->GetBeginIter(); -                        ExtendedFormat = event.Buffer->IsExtendedFormat(); +                        SerializationInfo = event.Buffer->ReleaseSerializationInfo();                      } else {                          State = EState::DESCRIPTOR; -                        ExtendedFormat = false; +                        SerializationInfo = {};                      }                      break; diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 58783d3c8d1..4a476abc375 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -114,7 +114,7 @@ namespace NActors {          std::list<TEventHolder> NotYetConfirmed;          TRope::TConstIterator Iter;          TCoroutineChunkSerializer Chunker; -        bool ExtendedFormat = false; +        TEventSerializationInfo SerializationInfo;          bool FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index a8c505d94de..c18e2a51373 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -327,12 +327,15 @@ namespace NActors {                  return ReestablishConnection(TDisconnectReason::ChecksumError());              }          } +        TEventSerializationInfo serializationInfo{ +            .IsExtendedFormat = bool(descr.Flags & IEventHandle::FlagExtendedFormat), +        };          auto ev = std::make_unique<IEventHandle>(SessionId,              descr.Type,              descr.Flags & ~IEventHandle::FlagExtendedFormat,              descr.Recipient,              descr.Sender, -            MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)), +            MakeIntrusive<TEventSerializedData>(std::move(data), std::move(serializationInfo)),              descr.Cookie,              Params.PeerScopeId,              std::move(descr.TraceId)); diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 565a511859e..32c8237b599 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {          auto pushEvent = [&](size_t size, int channel) {              TString payload(size, 'X'); -            auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); +            auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, TEventSerializationInfo{}), 0);              auto& ch = scheduler.GetOutputChannel(channel);              const bool wasWorking = ch.IsWorking();              ch.Push(*ev); diff --git a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp index e6b2bd4e4c4..7b45793e26c 100644 --- a/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp +++ b/library/cpp/actors/interconnect/ut/event_holder_pool_ut.cpp @@ -29,16 +29,16 @@ Y_UNIT_TEST_SUITE(EventHolderPool) {          std::list<TEventHolder> q;          auto& ev1 = pool.Allocate(q); -        ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); +        ev1.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});          auto& ev2 = pool.Allocate(q); -        ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); +        ev2.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});          auto& ev3 = pool.Allocate(q); -        ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); +        ev3.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});          auto& ev4 = pool.Allocate(q); -        ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), true); +        ev4.Buffer = MakeIntrusive<TEventSerializedData>(TString::Uninitialized(512 * 1024), TEventSerializationInfo{});          pool.Release(q, q.begin());          pool.Release(q, q.begin()); diff --git a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp index 8ef0b1507c9..3596bffd5ac 100644 --- a/library/cpp/actors/interconnect/ut/interconnect_ut.cpp +++ b/library/cpp/actors/interconnect/ut/interconnect_ut.cpp @@ -47,7 +47,7 @@ public:              const TSessionToCookie::iterator s2cIt = SessionToCookie.emplace(SessionId, NextCookie);              InFlight.emplace(NextCookie, std::make_tuple(s2cIt, MD5::CalcRaw(data)));              TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Ping, IEventHandle::FlagTrackDelivery, Recipient, -                SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), false), NextCookie)); +                SelfId(), MakeIntrusive<TEventSerializedData>(std::move(data), TEventSerializationInfo{}), NextCookie));  //            Cerr << (TStringBuilder() << "Send# " << NextCookie << Endl);              ++NextCookie;          } @@ -126,7 +126,7 @@ public:          const TString& data = ev->GetChainBuffer()->GetString();          const TString& response = MD5::CalcRaw(data);          TActivationContext::Send(new IEventHandle(TEvents::THelloWorld::Pong, 0, ev->Sender, SelfId(), -            MakeIntrusive<TEventSerializedData>(response, false), ev->Cookie)); +            MakeIntrusive<TEventSerializedData>(response, TEventSerializationInfo{}), ev->Cookie));      }      STRICT_STFUNC(StateFunc,  | 
