blob: c7cfbdd1b3be96b148da34ea18f1b6235a7421fe (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
#pragma once
#include <util/stream/walk.h>
#include <util/system/types.h>
#include <util/generic/string.h>
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/wilson/wilson_trace.h>
namespace NActors {
// Forward declaration only; the definition of IEventHandle is not part of the code shown in this header.
class IEventHandle;
// Read-only (pointer, length) pair.
// NOTE(review): presumably describes a contiguous memory region of Size bytes
// starting at Data; nothing in this header establishes who owns that memory.
struct TConstIoVec {
    const void* Data; // start of the region; the pointee cannot be modified through this pointer
    size_t Size;      // length associated with Data (presumably in bytes — confirm with callers)
};
// Writable (pointer, length) pair; the mutable counterpart of TConstIoVec.
// NOTE(review): presumably describes a contiguous memory region of Size bytes
// starting at Data; nothing in this header establishes who owns that memory.
struct TIoVec {
    void* Data;  // start of the region; the pointee may be modified through this pointer
    size_t Size; // length associated with Data (presumably in bytes — confirm with callers)
};
/**
 * Serialized event payload stored as a TRope, together with an "extended format" flag.
 *
 * The class only stores and reports the flag; what the extended format means
 * is decided by code outside this header.
 */
class TEventSerializedData
    : public TThrRefBase
{
    TRope Rope;                  // payload bytes
    bool ExtendedFormat = false; // stays false unless set by a constructor or SetExtendedFormat()

public:
    /// Creates an empty payload with the extended-format flag cleared.
    TEventSerializedData() = default;

    /// Takes over `rope` as the payload and records the given flag.
    TEventSerializedData(TRope&& rope, bool extendedFormat)
        : Rope(std::move(rope))
        , ExtendedFormat(extendedFormat)
    {
    }

    /// Copies the payload and flag of `original`, then appends `extraBuffer`
    /// (the append is skipped when the buffer tests false — presumably when it is empty).
    TEventSerializedData(const TEventSerializedData& original, TString extraBuffer)
        : Rope(original.Rope)
        , ExtendedFormat(original.ExtendedFormat)
    {
        Append(std::move(extraBuffer));
    }

    /// Builds the payload from a single string buffer and records the given flag.
    TEventSerializedData(TString buffer, bool extendedFormat)
        : ExtendedFormat(extendedFormat)
    {
        Append(std::move(buffer));
    }

    /// Turns the extended-format flag on; this class offers no way to turn it off again.
    void SetExtendedFormat() {
        ExtendedFormat = true;
    }

    /// Reports the current value of the extended-format flag.
    bool IsExtendedFormat() const {
        return ExtendedFormat;
    }

    /// Returns a const iterator positioned at the beginning of the payload rope.
    TRope::TConstIterator GetBeginIter() const {
        return Rope.Begin();
    }

    /// Returns the size reported by the underlying rope.
    size_t GetSize() const {
        return Rope.GetSize();
    }

    /// Flattens the payload into one TString by concatenating the rope's
    /// contiguous blocks in iteration order.
    TString GetString() const {
        TString flat;
        flat.reserve(GetSize());
        auto block = Rope.Begin();
        while (block.Valid()) {
            flat.append(block.ContiguousData(), block.ContiguousSize());
            block.AdvanceToNextContiguousBlock();
        }
        return flat;
    }

    /// Returns a copy of the payload rope; the stored rope is left untouched.
    TRope GetRope() const {
        TRope snapshot(Rope);
        return snapshot;
    }

    /// Extracts the last `count` units of the payload from the stored rope and
    /// returns them. Asking for more than the rope holds trips Y_VERIFY.
    TRope EraseBack(size_t count) {
        Y_VERIFY(count <= Rope.GetSize());
        TRope::TIterator cutPoint = Rope.End();
        cutPoint -= count; // step back from the end to where the tail begins
        return Rope.Extract(cutPoint, Rope.End());
    }

    /// Moves the contents of `from` onto the end of the payload.
    void Append(TRope&& from) {
        Rope.Insert(Rope.End(), std::move(from));
    }

    /// Wraps `buffer` into a rope and appends it; a buffer that tests false
    /// (presumably: empty) is ignored and nothing is inserted.
    void Append(TString buffer) {
        if (!buffer) {
            return;
        }
        Append(TRope(std::move(buffer)));
    }
};
}
/**
 * IWalkInput implementation that hands out the payload of a
 * NActors::TEventSerializedData one contiguous block per call.
 *
 * The walker keeps its own TIntrusivePtr to the buffer (presumably keeping the
 * iterated data referenced while the walker exists — confirm against TIntrusivePtr).
 */
class TChainBufWalk : public IWalkInput {
    TIntrusivePtr<NActors::TEventSerializedData> Buffer;
    TRope::TConstIterator Iter; // initialised from Buffer, so it has to be declared after it

public:
    /// `buffer` is dereferenced immediately to obtain the begin iterator,
    /// so it must point to an object.
    TChainBufWalk(TIntrusivePtr<NActors::TEventSerializedData> buffer)
        : Buffer(std::move(buffer))
        , Iter(Buffer->GetBeginIter())
    {
    }

private:
    /// Stores the start of the current contiguous block in `*ptr`, returns that
    /// block's size, and moves on to the next block if the iterator is still valid.
    ///
    /// The block accessors are queried before Valid() is checked, so once the
    /// payload is exhausted the result is whatever the iterator reports at the
    /// end position (presumably size 0 — confirm in rope.h).
    size_t DoUnboundedNext(const void** ptr) override {
        const size_t blockLen = Iter.ContiguousSize();
        *ptr = Iter.ContiguousData();
        if (!Iter.Valid()) {
            return blockLen;
        }
        Iter.AdvanceToNextContiguousBlock();
        return blockLen;
    }
};
|