aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/core/event_load.h
blob: 267345f0c92525404c91cee7edd32006ccae8383 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#pragma once

#include <util/stream/walk.h> 
#include <util/system/types.h>
#include <util/generic/string.h>
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/wilson/wilson_trace.h>

namespace NActors {
    class IEventHandle;

    struct TConstIoVec {
        const void* Data;
        size_t Size;
    };

    struct TIoVec {
        void* Data;
        size_t Size;
    };

    class TEventSerializedData 
       : public TThrRefBase 
    { 
        TRope Rope; 
        bool ExtendedFormat = false; 
 
    public: 
        TEventSerializedData() = default; 
 
        TEventSerializedData(TRope&& rope, bool extendedFormat) 
            : Rope(std::move(rope)) 
            , ExtendedFormat(extendedFormat) 
        {} 
 
        TEventSerializedData(const TEventSerializedData& original, TString extraBuffer) 
            : Rope(original.Rope) 
            , ExtendedFormat(original.ExtendedFormat) 
        {
            Append(std::move(extraBuffer)); 
        }
 
        TEventSerializedData(TString buffer, bool extendedFormat) 
            : ExtendedFormat(extendedFormat) 
        { 
            Append(std::move(buffer)); 
        } 
 
        void SetExtendedFormat() { 
            ExtendedFormat = true; 
        } 
 
        bool IsExtendedFormat() const { 
            return ExtendedFormat; 
        } 
 
        TRope::TConstIterator GetBeginIter() const { 
            return Rope.Begin(); 
        } 
 
        size_t GetSize() const { 
            return Rope.GetSize(); 
        } 
 
        TString GetString() const { 
            TString result; 
            result.reserve(GetSize()); 
            for (auto it = Rope.Begin(); it.Valid(); it.AdvanceToNextContiguousBlock()) { 
                result.append(it.ContiguousData(), it.ContiguousSize()); 
            } 
            return result; 
        } 

        TRope EraseBack(size_t count) { 
            Y_VERIFY(count <= Rope.GetSize()); 
            TRope::TIterator iter = Rope.End(); 
            iter -= count; 
            return Rope.Extract(iter, Rope.End()); 
        } 
 
        void Append(TRope&& from) { 
            Rope.Insert(Rope.End(), std::move(from)); 
        }

        void Append(TString buffer) { 
            if (buffer) { 
                Rope.Insert(Rope.End(), TRope(std::move(buffer))); 
            } 
        }
    };
}

class TChainBufWalk : public IWalkInput { 
    TIntrusivePtr<NActors::TEventSerializedData> Buffer; 
    TRope::TConstIterator Iter; 

public: 
    TChainBufWalk(TIntrusivePtr<NActors::TEventSerializedData> buffer) 
        : Buffer(std::move(buffer)) 
        , Iter(Buffer->GetBeginIter()) 
    {} 

private: 
    size_t DoUnboundedNext(const void **ptr) override { 
        const size_t size = Iter.ContiguousSize(); 
        *ptr = Iter.ContiguousData(); 
        if (Iter.Valid()) { 
            Iter.AdvanceToNextContiguousBlock(); 
        } 
        return size; 
    }
};