diff options
author | Alexander Rutkovsky <alexvru@mail.ru> | 2022-02-10 16:47:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:39 +0300 |
commit | f3646f91e0de459836a7800b9ce3e8dc57a2ab3a (patch) | |
tree | 25c1423200152570c1f8307e5b8304b9bc3840c5 /library/cpp/actors/core/event_pb.h | |
parent | fccc62e9bfdce9be2fe7e0f23479da3a5512211a (diff) | |
download | ydb-f3646f91e0de459836a7800b9ce3e8dc57a2ab3a.tar.gz |
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/core/event_pb.h')
-rw-r--r-- | library/cpp/actors/core/event_pb.h | 512 |
1 files changed, 256 insertions, 256 deletions
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h index d7546b901a..f88519e108 100644 --- a/library/cpp/actors/core/event_pb.h +++ b/library/cpp/actors/core/event_pb.h @@ -12,81 +12,81 @@ #include <array> namespace NActors { - - class TRopeStream : public NProtoBuf::io::ZeroCopyInputStream { - TRope::TConstIterator Iter; - const size_t Size; - + + class TRopeStream : public NProtoBuf::io::ZeroCopyInputStream { + TRope::TConstIterator Iter; + const size_t Size; + public: - TRopeStream(TRope::TConstIterator iter, size_t size) - : Iter(iter) - , Size(size) - {} - - bool Next(const void** data, int* size) override; - void BackUp(int count) override; - bool Skip(int count) override; + TRopeStream(TRope::TConstIterator iter, size_t size) + : Iter(iter) + , Size(size) + {} + + bool Next(const void** data, int* size) override; + void BackUp(int count) override; + bool Skip(int count) override; int64_t ByteCount() const override { return TotalByteCount; } - private: + private: int64_t TotalByteCount = 0; }; - class TChunkSerializer : public NProtoBuf::io::ZeroCopyOutputStream { + class TChunkSerializer : public NProtoBuf::io::ZeroCopyOutputStream { public: - TChunkSerializer() = default; - virtual ~TChunkSerializer() = default; - - virtual bool WriteRope(const TRope *rope) = 0; - virtual bool WriteString(const TString *s) = 0; + TChunkSerializer() = default; + virtual ~TChunkSerializer() = default; + + virtual bool WriteRope(const TRope *rope) = 0; + virtual bool WriteString(const TString *s) = 0; }; - class TAllocChunkSerializer final : public TChunkSerializer { + class TAllocChunkSerializer final : public TChunkSerializer { public: - bool Next(void** data, int* size) override; - void BackUp(int count) override; + bool Next(void** data, int* size) override; + void BackUp(int count) override; int64_t ByteCount() const override { return Buffers->GetSize(); } - bool WriteAliasedRaw(const void* data, int size) override; + bool WriteAliasedRaw(const void* data, int size) override; - // WARNING: these methods require owner to retain ownership and immutability of passed objects - bool WriteRope(const TRope *rope) override; - bool WriteString(const TString *s) override; - - inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) { - if (extendedFormat) { - Buffers->SetExtendedFormat(); - } - return std::move(Buffers); + // WARNING: these methods require owner to retain ownership and immutability of passed objects + bool WriteRope(const TRope *rope) override; + bool WriteString(const TString *s) override; + + inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) { + if (extendedFormat) { + Buffers->SetExtendedFormat(); + } + return std::move(Buffers); } protected: - TIntrusivePtr<TEventSerializedData> Buffers = new TEventSerializedData; - TRope Backup; + TIntrusivePtr<TEventSerializedData> Buffers = new TEventSerializedData; + TRope Backup; }; - class TCoroutineChunkSerializer final : public TChunkSerializer, protected ITrampoLine { + class TCoroutineChunkSerializer final : public TChunkSerializer, protected ITrampoLine { public: - using TChunk = std::pair<const char*, size_t>; - + using TChunk = std::pair<const char*, size_t>; + TCoroutineChunkSerializer(); ~TCoroutineChunkSerializer(); - void SetSerializingEvent(const IEventBase *event); - void Abort(); - std::pair<TChunk*, TChunk*> FeedBuf(void* data, size_t size); + void SetSerializingEvent(const IEventBase *event); + void Abort(); + std::pair<TChunk*, TChunk*> FeedBuf(void* data, size_t size); bool IsComplete() const { - return !Event; + return !Event; } bool IsSuccessfull() const { return SerializationSuccess; } - const IEventBase *GetCurrentEvent() const { - return Event; - } + const IEventBase *GetCurrentEvent() const { + return Event; + } bool Next(void** data, int* size) override; void BackUp(int count) override; @@ -96,42 +96,42 @@ namespace NActors { bool WriteAliasedRaw(const void* data, int size) override; bool AllowsAliasing() const override; - bool WriteRope(const TRope *rope) override; - bool WriteString(const TString *s) override; - + bool WriteRope(const TRope *rope) override; + bool WriteString(const TString *s) override; + protected: void DoRun() override; void Resume(); - bool Produce(const void *data, size_t size); + bool Produce(const void *data, size_t size); i64 TotalSerializedDataSize; TMappedAllocation Stack; TContClosure SelfClosure; TContMachineContext InnerContext; - TContMachineContext *BufFeedContext = nullptr; - char *BufferPtr; - size_t SizeRemain; - static constexpr size_t MaxChunks = 16; - TChunk Chunks[MaxChunks]; - size_t NumChunks = 0; - const IEventBase *Event = nullptr; - bool CancelFlag = false; - bool AbortFlag; - bool SerializationSuccess; - bool Finished = false; + TContMachineContext *BufFeedContext = nullptr; + char *BufferPtr; + size_t SizeRemain; + static constexpr size_t MaxChunks = 16; + TChunk Chunks[MaxChunks]; + size_t NumChunks = 0; + const IEventBase *Event = nullptr; + bool CancelFlag = false; + bool AbortFlag; + bool SerializationSuccess; + bool Finished = false; }; -#ifdef ACTORLIB_HUGE_PB_SIZE - static const size_t EventMaxByteSize = 140 << 20; // (140MB) -#else - static const size_t EventMaxByteSize = 67108000; -#endif - +#ifdef ACTORLIB_HUGE_PB_SIZE + static const size_t EventMaxByteSize = 140 << 20; // (140MB) +#else + static const size_t EventMaxByteSize = 67108000; +#endif + template <typename TEv, typename TRecord /*protobuf record*/, ui32 TEventType, typename TRecHolder> class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder { - // a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies - TVector<TRope> Payload; - + // a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies + TVector<TRope> Payload; + public: using TRecHolder::Record; @@ -155,218 +155,218 @@ namespace NActors { } TString ToString() const override { - return Record.ShortDebugString(); - } - - bool IsSerializable() const override { - return true; - } - - bool IsExtendedFormat() const override { - return static_cast<bool>(Payload); + return Record.ShortDebugString(); } + bool IsSerializable() const override { + return true; + } + + bool IsExtendedFormat() const override { + return static_cast<bool>(Payload); + } + bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override { - // serialize payload first - if (Payload) { - void *data; - int size = 0; - auto append = [&](const char *p, size_t len) { - while (len) { - if (size) { - const size_t numBytesToCopy = std::min<size_t>(size, len); - memcpy(data, p, numBytesToCopy); - data = static_cast<char*>(data) + numBytesToCopy; - size -= numBytesToCopy; - p += numBytesToCopy; - len -= numBytesToCopy; - } else if (!chunker->Next(&data, &size)) { - return false; - } - } - return true; - }; - auto appendNumber = [&](size_t number) { - char buf[MaxNumberBytes]; - return append(buf, SerializeNumber(number, buf)); - }; - char marker = PayloadMarker; - append(&marker, 1); - if (!appendNumber(Payload.size())) { - return false; - } - for (const TRope& rope : Payload) { - if (!appendNumber(rope.GetSize())) { - return false; - } - if (rope) { - if (size) { - chunker->BackUp(std::exchange(size, 0)); - } - if (!chunker->WriteRope(&rope)) { - return false; - } - } - } - if (size) { - chunker->BackUp(size); - } - } - + // serialize payload first + if (Payload) { + void *data; + int size = 0; + auto append = [&](const char *p, size_t len) { + while (len) { + if (size) { + const size_t numBytesToCopy = std::min<size_t>(size, len); + memcpy(data, p, numBytesToCopy); + data = static_cast<char*>(data) + numBytesToCopy; + size -= numBytesToCopy; + p += numBytesToCopy; + len -= numBytesToCopy; + } else if (!chunker->Next(&data, &size)) { + return false; + } + } + return true; + }; + auto appendNumber = [&](size_t number) { + char buf[MaxNumberBytes]; + return append(buf, SerializeNumber(number, buf)); + }; + char marker = PayloadMarker; + append(&marker, 1); + if (!appendNumber(Payload.size())) { + return false; + } + for (const TRope& rope : Payload) { + if (!appendNumber(rope.GetSize())) { + return false; + } + if (rope) { + if (size) { + chunker->BackUp(std::exchange(size, 0)); + } + if (!chunker->WriteRope(&rope)) { + return false; + } + } + } + if (size) { + chunker->BackUp(size); + } + } + return Record.SerializeToZeroCopyStream(chunker); } ui32 CalculateSerializedSize() const override { - ssize_t result = Record.ByteSize(); - if (result >= 0 && Payload) { - ++result; // marker - char buf[MaxNumberBytes]; - result += SerializeNumber(Payload.size(), buf); - for (const TRope& rope : Payload) { - result += SerializeNumber(rope.GetSize(), buf); - result += rope.GetSize(); - } - } + ssize_t result = Record.ByteSize(); + if (result >= 0 && Payload) { + ++result; // marker + char buf[MaxNumberBytes]; + result += SerializeNumber(Payload.size(), buf); + for (const TRope& rope : Payload) { + result += SerializeNumber(rope.GetSize(), buf); + result += rope.GetSize(); + } + } return result; } static IEventBase* Load(TIntrusivePtr<TEventSerializedData> input) { THolder<TEventPBBase> ev(new TEv()); - if (!input->GetSize()) { + if (!input->GetSize()) { Y_PROTOBUF_SUPPRESS_NODISCARD ev->Record.ParseFromString(TString()); - } else { - TRope::TConstIterator iter = input->GetBeginIter(); - ui64 size = input->GetSize(); - - if (input->IsExtendedFormat()) { - // check marker - if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) { - Y_FAIL("invalid event"); - } - // skip marker - iter += 1; - --size; - // parse number of payload ropes - size_t numRopes = DeserializeNumber(iter, size); - if (numRopes == Max<size_t>()) { - Y_FAIL("invalid event"); - } - while (numRopes--) { - // parse length of the rope - const size_t len = DeserializeNumber(iter, size); - if (len == Max<size_t>() || size < len) { - Y_FAIL("invalid event len# %zu size# %" PRIu64, len, size); - } - // extract the rope - TRope::TConstIterator begin = iter; - iter += len; - size -= len; - ev->Payload.emplace_back(begin, iter); - } - } - - // parse the protobuf - TRopeStream stream(iter, size); - if (!ev->Record.ParseFromZeroCopyStream(&stream)) { + } else { + TRope::TConstIterator iter = input->GetBeginIter(); + ui64 size = input->GetSize(); + + if (input->IsExtendedFormat()) { + // check marker + if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) { + Y_FAIL("invalid event"); + } + // skip marker + iter += 1; + --size; + // parse number of payload ropes + size_t numRopes = DeserializeNumber(iter, size); + if (numRopes == Max<size_t>()) { + Y_FAIL("invalid event"); + } + while (numRopes--) { + // parse length of the rope + const size_t len = DeserializeNumber(iter, size); + if (len == Max<size_t>() || size < len) { + Y_FAIL("invalid event len# %zu size# %" PRIu64, len, size); + } + // extract the rope + TRope::TConstIterator begin = iter; + iter += len; + size -= len; + ev->Payload.emplace_back(begin, iter); + } + } + + // parse the protobuf + TRopeStream stream(iter, size); + if (!ev->Record.ParseFromZeroCopyStream(&stream)) { Y_FAIL("Failed to parse protobuf event type %" PRIu32 " class %s", TEventType, TypeName(ev->Record).data()); - } + } } - ev->CachedByteSize = input->GetSize(); - return ev.Release(); + ev->CachedByteSize = input->GetSize(); + return ev.Release(); } - size_t GetCachedByteSize() const { - if (CachedByteSize == 0) { - CachedByteSize = CalculateSerializedSize(); + size_t GetCachedByteSize() const { + if (CachedByteSize == 0) { + CachedByteSize = CalculateSerializedSize(); } return CachedByteSize; } - ui32 CalculateSerializedSizeCached() const override { - return GetCachedByteSize(); - } - + ui32 CalculateSerializedSizeCached() const override { + return GetCachedByteSize(); + } + void InvalidateCachedByteSize() { CachedByteSize = 0; } - public: + public: void ReservePayload(size_t size) { Payload.reserve(size); } - ui32 AddPayload(TRope&& rope) { - const ui32 id = Payload.size(); - Payload.push_back(std::move(rope)); - InvalidateCachedByteSize(); - return id; - } - - const TRope& GetPayload(ui32 id) const { - Y_VERIFY(id < Payload.size()); - return Payload[id]; - } - - ui32 GetPayloadCount() const { - return Payload.size(); - } - - void StripPayload() { - Payload.clear(); - } - - protected: + ui32 AddPayload(TRope&& rope) { + const ui32 id = Payload.size(); + Payload.push_back(std::move(rope)); + InvalidateCachedByteSize(); + return id; + } + + const TRope& GetPayload(ui32 id) const { + Y_VERIFY(id < Payload.size()); + return Payload[id]; + } + + ui32 GetPayloadCount() const { + return Payload.size(); + } + + void StripPayload() { + Payload.clear(); + } + + protected: mutable size_t CachedByteSize = 0; - - static constexpr char PayloadMarker = 0x07; - static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7; - - static size_t SerializeNumber(size_t num, char *buffer) { - char *begin = buffer; - do { - *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00); - num >>= 7; - } while (num); - return buffer - begin; - } - - static size_t DeserializeNumber(const char **ptr, const char *end) { - const char *p = *ptr; - size_t res = 0; - size_t offset = 0; - for (;;) { - if (p == end) { - return Max<size_t>(); - } - const char byte = *p++; - res |= (static_cast<size_t>(byte) & 0x7F) << offset; - offset += 7; - if (!(byte & 0x80)) { - break; - } - } - *ptr = p; - return res; - } - - static size_t DeserializeNumber(TRope::TConstIterator& iter, ui64& size) { - size_t res = 0; - size_t offset = 0; - for (;;) { - if (!iter.Valid()) { - return Max<size_t>(); - } - const char byte = *iter.ContiguousData(); - iter += 1; - --size; - res |= (static_cast<size_t>(byte) & 0x7F) << offset; - offset += 7; - if (!(byte & 0x80)) { - break; - } - } - return res; - } + + static constexpr char PayloadMarker = 0x07; + static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7; + + static size_t SerializeNumber(size_t num, char *buffer) { + char *begin = buffer; + do { + *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00); + num >>= 7; + } while (num); + return buffer - begin; + } + + static size_t DeserializeNumber(const char **ptr, const char *end) { + const char *p = *ptr; + size_t res = 0; + size_t offset = 0; + for (;;) { + if (p == end) { + return Max<size_t>(); + } + const char byte = *p++; + res |= (static_cast<size_t>(byte) & 0x7F) << offset; + offset += 7; + if (!(byte & 0x80)) { + break; + } + } + *ptr = p; + return res; + } + + static size_t DeserializeNumber(TRope::TConstIterator& iter, ui64& size) { + size_t res = 0; + size_t offset = 0; + for (;;) { + if (!iter.Valid()) { + return Max<size_t>(); + } + const char byte = *iter.ContiguousData(); + iter += 1; + --size; + res |= (static_cast<size_t>(byte) & 0x7F) << offset; + offset += 7; + if (!(byte & 0x80)) { + break; + } + } + return res; + } }; // Protobuf record not using arena @@ -468,7 +468,7 @@ namespace NActors { } TString ToString() const override { - return GetRecord().ShortDebugString(); + return GetRecord().ShortDebugString(); } bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override { @@ -482,10 +482,10 @@ namespace NActors { size_t GetCachedByteSize() const { return PreSerializedData.size() + TBase::GetCachedByteSize(); } - - ui32 CalculateSerializedSizeCached() const override { - return GetCachedByteSize(); - } + + ui32 CalculateSerializedSizeCached() const override { + return GetCachedByteSize(); + } }; inline TActorId ActorIdFromProto(const NActorsProto::TActorId& actorId) { |