1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
#include <IO/MemoryReadWriteBuffer.h>
#include <boost/noncopyable.hpp>
namespace DB
{
class ReadBufferFromMemoryWriteBuffer : public ReadBuffer, boost::noncopyable, private Allocator<false>
{
public:
explicit ReadBufferFromMemoryWriteBuffer(MemoryWriteBuffer && origin)
: ReadBuffer(nullptr, 0),
chunk_list(std::move(origin.chunk_list)),
end_pos(origin.position())
{
chunk_head = chunk_list.begin();
setChunk();
}
bool nextImpl() override
{
if (chunk_head == chunk_list.end())
return false;
++chunk_head;
return setChunk();
}
~ReadBufferFromMemoryWriteBuffer() override
{
for (const auto & range : chunk_list)
free(range.begin(), range.size());
}
private:
/// update buffers and position according to chunk_head pointer
bool setChunk()
{
if (chunk_head != chunk_list.end())
{
internalBuffer() = *chunk_head;
/// It is last chunk, it should be truncated
if (std::next(chunk_head) != chunk_list.end())
buffer() = internalBuffer();
else
buffer() = Buffer(internalBuffer().begin(), end_pos);
position() = buffer().begin();
}
else
{
buffer() = internalBuffer() = Buffer(nullptr, nullptr);
position() = nullptr;
}
return !buffer().empty();
}
using Container = std::forward_list<BufferBase::Buffer>;
Container chunk_list;
Container::iterator chunk_head;
Position end_pos;
};
MemoryWriteBuffer::MemoryWriteBuffer(size_t max_total_size_, size_t initial_chunk_size_, double growth_rate_, size_t max_chunk_size_)
: WriteBuffer(nullptr, 0),
max_total_size(max_total_size_),
initial_chunk_size(initial_chunk_size_),
max_chunk_size(max_chunk_size_),
growth_rate(growth_rate_)
{
addChunk();
}
void MemoryWriteBuffer::nextImpl()
{
if (unlikely(hasPendingData()))
{
/// ignore flush
buffer() = Buffer(pos, buffer().end());
return;
}
addChunk();
}
void MemoryWriteBuffer::addChunk()
{
size_t next_chunk_size;
if (chunk_list.empty())
{
chunk_tail = chunk_list.before_begin();
next_chunk_size = initial_chunk_size;
}
else
{
next_chunk_size = std::max(1uz, static_cast<size_t>(chunk_tail->size() * growth_rate));
next_chunk_size = std::min(next_chunk_size, max_chunk_size);
}
if (max_total_size)
{
if (total_chunks_size + next_chunk_size > max_total_size)
next_chunk_size = max_total_size - total_chunks_size;
if (0 == next_chunk_size)
{
set(position(), 0);
throw MemoryWriteBuffer::CurrentBufferExhausted();
}
}
Position begin = reinterpret_cast<Position>(alloc(next_chunk_size));
chunk_tail = chunk_list.emplace_after(chunk_tail, begin, begin + next_chunk_size);
total_chunks_size += next_chunk_size;
set(chunk_tail->begin(), chunk_tail->size());
}
std::shared_ptr<ReadBuffer> MemoryWriteBuffer::getReadBufferImpl()
{
finalize();
auto res = std::make_shared<ReadBufferFromMemoryWriteBuffer>(std::move(*this));
/// invalidate members
chunk_list.clear();
chunk_tail = chunk_list.begin();
return res;
}
MemoryWriteBuffer::~MemoryWriteBuffer()
{
for (const auto & range : chunk_list)
free(range.begin(), range.size());
}
}
|