aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/event_pb.h
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@mail.ru>2022-02-10 16:47:39 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:39 +0300
commitf3646f91e0de459836a7800b9ce3e8dc57a2ab3a (patch)
tree25c1423200152570c1f8307e5b8304b9bc3840c5 /library/cpp/actors/core/event_pb.h
parentfccc62e9bfdce9be2fe7e0f23479da3a5512211a (diff)
downloadydb-f3646f91e0de459836a7800b9ce3e8dc57a2ab3a.tar.gz
Restoring authorship annotation for Alexander Rutkovsky <alexvru@mail.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/actors/core/event_pb.h')
-rw-r--r--library/cpp/actors/core/event_pb.h512
1 files changed, 256 insertions, 256 deletions
diff --git a/library/cpp/actors/core/event_pb.h b/library/cpp/actors/core/event_pb.h
index d7546b901a..f88519e108 100644
--- a/library/cpp/actors/core/event_pb.h
+++ b/library/cpp/actors/core/event_pb.h
@@ -12,81 +12,81 @@
#include <array>
namespace NActors {
-
- class TRopeStream : public NProtoBuf::io::ZeroCopyInputStream {
- TRope::TConstIterator Iter;
- const size_t Size;
-
+
+ class TRopeStream : public NProtoBuf::io::ZeroCopyInputStream {
+ TRope::TConstIterator Iter;
+ const size_t Size;
+
public:
- TRopeStream(TRope::TConstIterator iter, size_t size)
- : Iter(iter)
- , Size(size)
- {}
-
- bool Next(const void** data, int* size) override;
- void BackUp(int count) override;
- bool Skip(int count) override;
+ TRopeStream(TRope::TConstIterator iter, size_t size)
+ : Iter(iter)
+ , Size(size)
+ {}
+
+ bool Next(const void** data, int* size) override;
+ void BackUp(int count) override;
+ bool Skip(int count) override;
int64_t ByteCount() const override {
return TotalByteCount;
}
- private:
+ private:
int64_t TotalByteCount = 0;
};
- class TChunkSerializer : public NProtoBuf::io::ZeroCopyOutputStream {
+ class TChunkSerializer : public NProtoBuf::io::ZeroCopyOutputStream {
public:
- TChunkSerializer() = default;
- virtual ~TChunkSerializer() = default;
-
- virtual bool WriteRope(const TRope *rope) = 0;
- virtual bool WriteString(const TString *s) = 0;
+ TChunkSerializer() = default;
+ virtual ~TChunkSerializer() = default;
+
+ virtual bool WriteRope(const TRope *rope) = 0;
+ virtual bool WriteString(const TString *s) = 0;
};
- class TAllocChunkSerializer final : public TChunkSerializer {
+ class TAllocChunkSerializer final : public TChunkSerializer {
public:
- bool Next(void** data, int* size) override;
- void BackUp(int count) override;
+ bool Next(void** data, int* size) override;
+ void BackUp(int count) override;
int64_t ByteCount() const override {
return Buffers->GetSize();
}
- bool WriteAliasedRaw(const void* data, int size) override;
+ bool WriteAliasedRaw(const void* data, int size) override;
- // WARNING: these methods require owner to retain ownership and immutability of passed objects
- bool WriteRope(const TRope *rope) override;
- bool WriteString(const TString *s) override;
-
- inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) {
- if (extendedFormat) {
- Buffers->SetExtendedFormat();
- }
- return std::move(Buffers);
+ // WARNING: these methods require owner to retain ownership and immutability of passed objects
+ bool WriteRope(const TRope *rope) override;
+ bool WriteString(const TString *s) override;
+
+ inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) {
+ if (extendedFormat) {
+ Buffers->SetExtendedFormat();
+ }
+ return std::move(Buffers);
}
protected:
- TIntrusivePtr<TEventSerializedData> Buffers = new TEventSerializedData;
- TRope Backup;
+ TIntrusivePtr<TEventSerializedData> Buffers = new TEventSerializedData;
+ TRope Backup;
};
- class TCoroutineChunkSerializer final : public TChunkSerializer, protected ITrampoLine {
+ class TCoroutineChunkSerializer final : public TChunkSerializer, protected ITrampoLine {
public:
- using TChunk = std::pair<const char*, size_t>;
-
+ using TChunk = std::pair<const char*, size_t>;
+
TCoroutineChunkSerializer();
~TCoroutineChunkSerializer();
- void SetSerializingEvent(const IEventBase *event);
- void Abort();
- std::pair<TChunk*, TChunk*> FeedBuf(void* data, size_t size);
+ void SetSerializingEvent(const IEventBase *event);
+ void Abort();
+ std::pair<TChunk*, TChunk*> FeedBuf(void* data, size_t size);
bool IsComplete() const {
- return !Event;
+ return !Event;
}
bool IsSuccessfull() const {
return SerializationSuccess;
}
- const IEventBase *GetCurrentEvent() const {
- return Event;
- }
+ const IEventBase *GetCurrentEvent() const {
+ return Event;
+ }
bool Next(void** data, int* size) override;
void BackUp(int count) override;
@@ -96,42 +96,42 @@ namespace NActors {
bool WriteAliasedRaw(const void* data, int size) override;
bool AllowsAliasing() const override;
- bool WriteRope(const TRope *rope) override;
- bool WriteString(const TString *s) override;
-
+ bool WriteRope(const TRope *rope) override;
+ bool WriteString(const TString *s) override;
+
protected:
void DoRun() override;
void Resume();
- bool Produce(const void *data, size_t size);
+ bool Produce(const void *data, size_t size);
i64 TotalSerializedDataSize;
TMappedAllocation Stack;
TContClosure SelfClosure;
TContMachineContext InnerContext;
- TContMachineContext *BufFeedContext = nullptr;
- char *BufferPtr;
- size_t SizeRemain;
- static constexpr size_t MaxChunks = 16;
- TChunk Chunks[MaxChunks];
- size_t NumChunks = 0;
- const IEventBase *Event = nullptr;
- bool CancelFlag = false;
- bool AbortFlag;
- bool SerializationSuccess;
- bool Finished = false;
+ TContMachineContext *BufFeedContext = nullptr;
+ char *BufferPtr;
+ size_t SizeRemain;
+ static constexpr size_t MaxChunks = 16;
+ TChunk Chunks[MaxChunks];
+ size_t NumChunks = 0;
+ const IEventBase *Event = nullptr;
+ bool CancelFlag = false;
+ bool AbortFlag;
+ bool SerializationSuccess;
+ bool Finished = false;
};
-#ifdef ACTORLIB_HUGE_PB_SIZE
- static const size_t EventMaxByteSize = 140 << 20; // (140MB)
-#else
- static const size_t EventMaxByteSize = 67108000;
-#endif
-
+#ifdef ACTORLIB_HUGE_PB_SIZE
+ static const size_t EventMaxByteSize = 140 << 20; // (140MB)
+#else
+ static const size_t EventMaxByteSize = 67108000;
+#endif
+
template <typename TEv, typename TRecord /*protobuf record*/, ui32 TEventType, typename TRecHolder>
class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder {
- // a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies
- TVector<TRope> Payload;
-
+ // a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies
+ TVector<TRope> Payload;
+
public:
using TRecHolder::Record;
@@ -155,218 +155,218 @@ namespace NActors {
}
TString ToString() const override {
- return Record.ShortDebugString();
- }
-
- bool IsSerializable() const override {
- return true;
- }
-
- bool IsExtendedFormat() const override {
- return static_cast<bool>(Payload);
+ return Record.ShortDebugString();
}
+ bool IsSerializable() const override {
+ return true;
+ }
+
+ bool IsExtendedFormat() const override {
+ return static_cast<bool>(Payload);
+ }
+
bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
- // serialize payload first
- if (Payload) {
- void *data;
- int size = 0;
- auto append = [&](const char *p, size_t len) {
- while (len) {
- if (size) {
- const size_t numBytesToCopy = std::min<size_t>(size, len);
- memcpy(data, p, numBytesToCopy);
- data = static_cast<char*>(data) + numBytesToCopy;
- size -= numBytesToCopy;
- p += numBytesToCopy;
- len -= numBytesToCopy;
- } else if (!chunker->Next(&data, &size)) {
- return false;
- }
- }
- return true;
- };
- auto appendNumber = [&](size_t number) {
- char buf[MaxNumberBytes];
- return append(buf, SerializeNumber(number, buf));
- };
- char marker = PayloadMarker;
- append(&marker, 1);
- if (!appendNumber(Payload.size())) {
- return false;
- }
- for (const TRope& rope : Payload) {
- if (!appendNumber(rope.GetSize())) {
- return false;
- }
- if (rope) {
- if (size) {
- chunker->BackUp(std::exchange(size, 0));
- }
- if (!chunker->WriteRope(&rope)) {
- return false;
- }
- }
- }
- if (size) {
- chunker->BackUp(size);
- }
- }
-
+ // serialize payload first
+ if (Payload) {
+ void *data;
+ int size = 0;
+ auto append = [&](const char *p, size_t len) {
+ while (len) {
+ if (size) {
+ const size_t numBytesToCopy = std::min<size_t>(size, len);
+ memcpy(data, p, numBytesToCopy);
+ data = static_cast<char*>(data) + numBytesToCopy;
+ size -= numBytesToCopy;
+ p += numBytesToCopy;
+ len -= numBytesToCopy;
+ } else if (!chunker->Next(&data, &size)) {
+ return false;
+ }
+ }
+ return true;
+ };
+ auto appendNumber = [&](size_t number) {
+ char buf[MaxNumberBytes];
+ return append(buf, SerializeNumber(number, buf));
+ };
+ char marker = PayloadMarker;
+ append(&marker, 1);
+ if (!appendNumber(Payload.size())) {
+ return false;
+ }
+ for (const TRope& rope : Payload) {
+ if (!appendNumber(rope.GetSize())) {
+ return false;
+ }
+ if (rope) {
+ if (size) {
+ chunker->BackUp(std::exchange(size, 0));
+ }
+ if (!chunker->WriteRope(&rope)) {
+ return false;
+ }
+ }
+ }
+ if (size) {
+ chunker->BackUp(size);
+ }
+ }
+
return Record.SerializeToZeroCopyStream(chunker);
}
ui32 CalculateSerializedSize() const override {
- ssize_t result = Record.ByteSize();
- if (result >= 0 && Payload) {
- ++result; // marker
- char buf[MaxNumberBytes];
- result += SerializeNumber(Payload.size(), buf);
- for (const TRope& rope : Payload) {
- result += SerializeNumber(rope.GetSize(), buf);
- result += rope.GetSize();
- }
- }
+ ssize_t result = Record.ByteSize();
+ if (result >= 0 && Payload) {
+ ++result; // marker
+ char buf[MaxNumberBytes];
+ result += SerializeNumber(Payload.size(), buf);
+ for (const TRope& rope : Payload) {
+ result += SerializeNumber(rope.GetSize(), buf);
+ result += rope.GetSize();
+ }
+ }
return result;
}
static IEventBase* Load(TIntrusivePtr<TEventSerializedData> input) {
THolder<TEventPBBase> ev(new TEv());
- if (!input->GetSize()) {
+ if (!input->GetSize()) {
Y_PROTOBUF_SUPPRESS_NODISCARD ev->Record.ParseFromString(TString());
- } else {
- TRope::TConstIterator iter = input->GetBeginIter();
- ui64 size = input->GetSize();
-
- if (input->IsExtendedFormat()) {
- // check marker
- if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) {
- Y_FAIL("invalid event");
- }
- // skip marker
- iter += 1;
- --size;
- // parse number of payload ropes
- size_t numRopes = DeserializeNumber(iter, size);
- if (numRopes == Max<size_t>()) {
- Y_FAIL("invalid event");
- }
- while (numRopes--) {
- // parse length of the rope
- const size_t len = DeserializeNumber(iter, size);
- if (len == Max<size_t>() || size < len) {
- Y_FAIL("invalid event len# %zu size# %" PRIu64, len, size);
- }
- // extract the rope
- TRope::TConstIterator begin = iter;
- iter += len;
- size -= len;
- ev->Payload.emplace_back(begin, iter);
- }
- }
-
- // parse the protobuf
- TRopeStream stream(iter, size);
- if (!ev->Record.ParseFromZeroCopyStream(&stream)) {
+ } else {
+ TRope::TConstIterator iter = input->GetBeginIter();
+ ui64 size = input->GetSize();
+
+ if (input->IsExtendedFormat()) {
+ // check marker
+ if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) {
+ Y_FAIL("invalid event");
+ }
+ // skip marker
+ iter += 1;
+ --size;
+ // parse number of payload ropes
+ size_t numRopes = DeserializeNumber(iter, size);
+ if (numRopes == Max<size_t>()) {
+ Y_FAIL("invalid event");
+ }
+ while (numRopes--) {
+ // parse length of the rope
+ const size_t len = DeserializeNumber(iter, size);
+ if (len == Max<size_t>() || size < len) {
+ Y_FAIL("invalid event len# %zu size# %" PRIu64, len, size);
+ }
+ // extract the rope
+ TRope::TConstIterator begin = iter;
+ iter += len;
+ size -= len;
+ ev->Payload.emplace_back(begin, iter);
+ }
+ }
+
+ // parse the protobuf
+ TRopeStream stream(iter, size);
+ if (!ev->Record.ParseFromZeroCopyStream(&stream)) {
Y_FAIL("Failed to parse protobuf event type %" PRIu32 " class %s", TEventType, TypeName(ev->Record).data());
- }
+ }
}
- ev->CachedByteSize = input->GetSize();
- return ev.Release();
+ ev->CachedByteSize = input->GetSize();
+ return ev.Release();
}
- size_t GetCachedByteSize() const {
- if (CachedByteSize == 0) {
- CachedByteSize = CalculateSerializedSize();
+ size_t GetCachedByteSize() const {
+ if (CachedByteSize == 0) {
+ CachedByteSize = CalculateSerializedSize();
}
return CachedByteSize;
}
- ui32 CalculateSerializedSizeCached() const override {
- return GetCachedByteSize();
- }
-
+ ui32 CalculateSerializedSizeCached() const override {
+ return GetCachedByteSize();
+ }
+
void InvalidateCachedByteSize() {
CachedByteSize = 0;
}
- public:
+ public:
void ReservePayload(size_t size) {
Payload.reserve(size);
}
- ui32 AddPayload(TRope&& rope) {
- const ui32 id = Payload.size();
- Payload.push_back(std::move(rope));
- InvalidateCachedByteSize();
- return id;
- }
-
- const TRope& GetPayload(ui32 id) const {
- Y_VERIFY(id < Payload.size());
- return Payload[id];
- }
-
- ui32 GetPayloadCount() const {
- return Payload.size();
- }
-
- void StripPayload() {
- Payload.clear();
- }
-
- protected:
+ ui32 AddPayload(TRope&& rope) {
+ const ui32 id = Payload.size();
+ Payload.push_back(std::move(rope));
+ InvalidateCachedByteSize();
+ return id;
+ }
+
+ const TRope& GetPayload(ui32 id) const {
+ Y_VERIFY(id < Payload.size());
+ return Payload[id];
+ }
+
+ ui32 GetPayloadCount() const {
+ return Payload.size();
+ }
+
+ void StripPayload() {
+ Payload.clear();
+ }
+
+ protected:
mutable size_t CachedByteSize = 0;
-
- static constexpr char PayloadMarker = 0x07;
- static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7;
-
- static size_t SerializeNumber(size_t num, char *buffer) {
- char *begin = buffer;
- do {
- *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00);
- num >>= 7;
- } while (num);
- return buffer - begin;
- }
-
- static size_t DeserializeNumber(const char **ptr, const char *end) {
- const char *p = *ptr;
- size_t res = 0;
- size_t offset = 0;
- for (;;) {
- if (p == end) {
- return Max<size_t>();
- }
- const char byte = *p++;
- res |= (static_cast<size_t>(byte) & 0x7F) << offset;
- offset += 7;
- if (!(byte & 0x80)) {
- break;
- }
- }
- *ptr = p;
- return res;
- }
-
- static size_t DeserializeNumber(TRope::TConstIterator& iter, ui64& size) {
- size_t res = 0;
- size_t offset = 0;
- for (;;) {
- if (!iter.Valid()) {
- return Max<size_t>();
- }
- const char byte = *iter.ContiguousData();
- iter += 1;
- --size;
- res |= (static_cast<size_t>(byte) & 0x7F) << offset;
- offset += 7;
- if (!(byte & 0x80)) {
- break;
- }
- }
- return res;
- }
+
+ static constexpr char PayloadMarker = 0x07;
+ static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7;
+
+ static size_t SerializeNumber(size_t num, char *buffer) {
+ char *begin = buffer;
+ do {
+ *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00);
+ num >>= 7;
+ } while (num);
+ return buffer - begin;
+ }
+
+ static size_t DeserializeNumber(const char **ptr, const char *end) {
+ const char *p = *ptr;
+ size_t res = 0;
+ size_t offset = 0;
+ for (;;) {
+ if (p == end) {
+ return Max<size_t>();
+ }
+ const char byte = *p++;
+ res |= (static_cast<size_t>(byte) & 0x7F) << offset;
+ offset += 7;
+ if (!(byte & 0x80)) {
+ break;
+ }
+ }
+ *ptr = p;
+ return res;
+ }
+
+ static size_t DeserializeNumber(TRope::TConstIterator& iter, ui64& size) {
+ size_t res = 0;
+ size_t offset = 0;
+ for (;;) {
+ if (!iter.Valid()) {
+ return Max<size_t>();
+ }
+ const char byte = *iter.ContiguousData();
+ iter += 1;
+ --size;
+ res |= (static_cast<size_t>(byte) & 0x7F) << offset;
+ offset += 7;
+ if (!(byte & 0x80)) {
+ break;
+ }
+ }
+ return res;
+ }
};
// Protobuf record not using arena
@@ -468,7 +468,7 @@ namespace NActors {
}
TString ToString() const override {
- return GetRecord().ShortDebugString();
+ return GetRecord().ShortDebugString();
}
bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
@@ -482,10 +482,10 @@ namespace NActors {
size_t GetCachedByteSize() const {
return PreSerializedData.size() + TBase::GetCachedByteSize();
}
-
- ui32 CalculateSerializedSizeCached() const override {
- return GetCachedByteSize();
- }
+
+ ui32 CalculateSerializedSizeCached() const override {
+ return GetCachedByteSize();
+ }
};
inline TActorId ActorIdFromProto(const NActorsProto::TActorId& actorId) {