aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/event_load.h
blob: c7cfbdd1b3be96b148da34ea18f1b6235a7421fe (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#pragma once

#include <util/stream/walk.h>
#include <util/system/types.h>
#include <util/generic/string.h>
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/wilson/wilson_trace.h>

namespace NActors {
    class IEventHandle;

    struct TConstIoVec {
        const void* Data;
        size_t Size;
    };

    struct TIoVec {
        void* Data;
        size_t Size;
    };

    class TEventSerializedData
       : public TThrRefBase
    {
        TRope Rope;
        bool ExtendedFormat = false;

    public:
        TEventSerializedData() = default;

        TEventSerializedData(TRope&& rope, bool extendedFormat)
            : Rope(std::move(rope))
            , ExtendedFormat(extendedFormat)
        {}

        TEventSerializedData(const TEventSerializedData& original, TString extraBuffer)
            : Rope(original.Rope)
            , ExtendedFormat(original.ExtendedFormat)
        {
            Append(std::move(extraBuffer));
        }

        TEventSerializedData(TString buffer, bool extendedFormat)
            : ExtendedFormat(extendedFormat)
        {
            Append(std::move(buffer));
        }

        void SetExtendedFormat() {
            ExtendedFormat = true;
        }

        bool IsExtendedFormat() const {
            return ExtendedFormat;
        }

        TRope::TConstIterator GetBeginIter() const {
            return Rope.Begin();
        }

        size_t GetSize() const {
            return Rope.GetSize();
        }

        TString GetString() const {
            TString result;
            result.reserve(GetSize());
            for (auto it = Rope.Begin(); it.Valid(); it.AdvanceToNextContiguousBlock()) {
                result.append(it.ContiguousData(), it.ContiguousSize());
            }
            return result;
        }

        TRope GetRope() const {
            return TRope(Rope);
        }

        TRope EraseBack(size_t count) {
            Y_VERIFY(count <= Rope.GetSize());
            TRope::TIterator iter = Rope.End();
            iter -= count;
            return Rope.Extract(iter, Rope.End());
        }

        void Append(TRope&& from) {
            Rope.Insert(Rope.End(), std::move(from));
        }

        void Append(TString buffer) {
            if (buffer) {
                Rope.Insert(Rope.End(), TRope(std::move(buffer)));
            }
        }
    };
}

class TChainBufWalk : public IWalkInput {
    TIntrusivePtr<NActors::TEventSerializedData> Buffer;
    TRope::TConstIterator Iter;

public:
    TChainBufWalk(TIntrusivePtr<NActors::TEventSerializedData> buffer)
        : Buffer(std::move(buffer))
        , Iter(Buffer->GetBeginIter())
    {}

private:
    size_t DoUnboundedNext(const void **ptr) override {
        const size_t size = Iter.ContiguousSize();
        *ptr = Iter.ContiguousData();
        if (Iter.Valid()) {
            Iter.AdvanceToNextContiguousBlock();
        }
        return size;
    }
};