1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
|
#pragma once
#include <algorithm>
#include <cassert>
#include <cstring>
#include <limits>
#include <memory>
#include <Common/Exception.h>
#include <Common/Priority.h>
#include <IO/BufferBase.h>
#include <IO/AsynchronousReader.h>
namespace DB
{
namespace ErrorCodes
{
    /// Error codes thrown by ReadBuffer below. These are `extern` declarations only;
    /// the definitions are provided elsewhere in the project.
    extern const int ATTEMPT_TO_READ_AFTER_EOF; /// thrown by ReadBuffer::throwReadAfterEOF()
    extern const int CANNOT_READ_ALL_DATA;      /// thrown by ReadBuffer::readStrict(char *, size_t)
    extern const int NOT_IMPLEMENTED;           /// thrown by the default ReadBuffer::readInto()
}
/// Priority value 0, presumably the default to pass to ReadBuffer::prefetch() (per that method's
/// documentation, lower values mean higher priority). Declared `inline constexpr` so all translation
/// units including this header share one definition instead of each getting its own
/// internal-linkage copy, which is what a namespace-scope `static constexpr` in a header produces.
inline constexpr auto DEFAULT_PREFETCH_PRIORITY = Priority{0};
/** A simple abstract class for buffered data reading (char sequences) from somewhere.
* Unlike std::istream, it provides access to the internal buffer,
* and also allows you to manually manage the position inside the buffer.
*
* Note! `char *`, not `const char *` is used
* (so that you can take out the common code into BufferBase, and also so that you can fill the buffer in with new data).
* This causes inconveniences - for example, when using ReadBuffer to read from a chunk of memory const char *,
* you have to use const_cast.
*
* Derived classes must implement the nextImpl() method.
*/
class ReadBuffer : public BufferBase
{
public:
    /** Creates a buffer over the memory [ptr, ptr + size) with an empty working buffer,
      * so that the first read attempt calls next() to load the first portion of data.
      */
    ReadBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) { working_buffer.resize(0); }

    /** Used when the buffer already contains data that can be read.
      * `offset` is forwarded to BufferBase as-is (normally 0, i.e. start reading from the beginning).
      */
    ReadBuffer(Position ptr, size_t size, size_t offset) : BufferBase(ptr, size, offset) {}

    // Copying read buffers can be dangerous because they can hold a lot of
    // memory or open files, so the copy constructor is disabled to prevent
    // accidental copying.
    ReadBuffer(const ReadBuffer &) = delete;

    // FIXME: behavior differs greatly from `BufferBase::set()` and it's very confusing:
    // unlike BufferBase::set(), this also empties the working buffer, so the next read calls next().
    void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); working_buffer.resize(0); }

    /** Reads the next portion of data into the buffer and sets the position to the beginning
      * of the new data (which is not necessarily working_buffer.begin(): nextImpl() may request
      * skipping bytes via `nextimpl_working_buffer_offset`).
      * Returns `false` at end of data, `true` otherwise; throws if something goes wrong.
      *
      * Must only be called when there is no pending data (asserted).
      *
      * If an exception was thrown, whether the ReadBuffer is left in a usable state varies across
      * implementations; retrying next() or calling other methods after an exception is not recommended.
      */
    bool next()
    {
        assert(!hasPendingData());
        assert(position() <= working_buffer.end());

        /// Accumulate the offset within the current working buffer into the running `bytes`
        /// counter before nextImpl() replaces the working buffer.
        bytes += offset();
        bool res = nextImpl();
        if (!res)
        {
            /// End of data: collapse the working buffer to an empty range at the current position.
            working_buffer = Buffer(pos, pos);
        }
        else
        {
            pos = working_buffer.begin() + nextimpl_working_buffer_offset;
            assert(position() != working_buffer.end());
        }
        /// The offset is a one-shot out-parameter of nextImpl(); reset it for the next call.
        nextimpl_working_buffer_offset = 0;

        assert(position() <= working_buffer.end());
        return res;
    }

    /// Calls next() only if the current working buffer has been fully consumed.
    inline void nextIfAtEnd()
    {
        if (!hasPendingData())
            next();
    }

    virtual ~ReadBuffer() = default;

    /** Unlike std::istream, returns true once all data has been read
      * (not only after an attempt to read past the end).
      * If the position is at the end of the working buffer, this calls next(), i.e. it has a
      * side effect: when the buffer is exhausted it is refilled and the position is reset.
      *
      * An attempt to read after the end should throw an exception.
      */
    bool ALWAYS_INLINE eof()
    {
        return !hasPendingData() && !next();
    }

    /// Skips exactly one byte; throws ATTEMPT_TO_READ_AFTER_EOF if there is none.
    void ignore()
    {
        if (!eof())
            ++pos;
        else
            throwReadAfterEOF();
    }

    /// Skips exactly `n` bytes; throws ATTEMPT_TO_READ_AFTER_EOF if fewer are available.
    void ignore(size_t n)
    {
        if (tryIgnore(n) != n)
            throwReadAfterEOF();
    }

    /// Skips up to `n` bytes and returns how many were actually skipped (less than `n` only at end of data).
    /// You could call this method `ignore`, and `ignore` call `ignoreStrict`.
    size_t tryIgnore(size_t n)
    {
        size_t bytes_ignored = 0;
        while (bytes_ignored < n && !eof())
        {
            size_t bytes_to_ignore = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_ignored);
            pos += bytes_to_ignore;
            bytes_ignored += bytes_to_ignore;
        }
        return bytes_ignored;
    }

    /// Skips everything up to the end of the data.
    void ignoreAll()
    {
        tryIgnore(std::numeric_limits<size_t>::max());
    }

    /// Peeks a single byte without consuming it. Returns false at end of data.
    bool ALWAYS_INLINE peek(char & c)
    {
        if (eof())
            return false;
        c = *pos;
        return true;
    }

    /// Reads a single byte. Returns false at end of data.
    [[nodiscard]] bool ALWAYS_INLINE read(char & c)
    {
        if (peek(c))
        {
            ++pos;
            return true;
        }
        return false;
    }

    /// Reads a single byte; throws ATTEMPT_TO_READ_AFTER_EOF at end of data.
    void ALWAYS_INLINE readStrict(char & c)
    {
        if (read(c))
            return;
        throwReadAfterEOF();
    }

    /** Reads as many bytes as are available, but no more than `n`, into `to`.
      * Returns the number of bytes copied.
      */
    [[nodiscard]] size_t read(char * to, size_t n)
    {
        size_t bytes_copied = 0;
        while (bytes_copied < n && !eof())
        {
            size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
            ::memcpy(to + bytes_copied, pos, bytes_to_copy);
            pos += bytes_to_copy;
            bytes_copied += bytes_to_copy;
        }
        return bytes_copied;
    }

    /** Reads exactly `n` bytes into `to`; throws CANNOT_READ_ALL_DATA if fewer are available. */
    void readStrict(char * to, size_t n)
    {
        auto read_bytes = read(to, n);
        if (n != read_bytes)
            throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
                "Cannot read all data. Bytes read: {}. Bytes expected: {}.", read_bytes, n);
    }

    /** A method that derived classes can implement more efficiently when reading large enough blocks.
      * An implementation may read data directly into `to`, avoiding an extra copy, if `to` has enough space.
      * For example, a CompressedReadBuffer can decompress data directly into `to` if the whole
      * decompressed block fits there.
      * By default it is the same as read().
      * Don't use it for small reads.
      */
    [[nodiscard]] virtual size_t readBig(char * to, size_t n) { return read(to, n); }

    /** Does something to make a subsequent call to nextImpl() faster, if possible.
      * It's used by asynchronous readers with double buffering.
      * `priority` is the `ThreadPool` priority with which the prefetch task will be scheduled.
      * Lower value means higher priority.
      * The default implementation does nothing.
      */
    virtual void prefetch(Priority) {}

    /**
      * Set upper bound for read range [..., position).
      * Useful for reading from remote filesystem, when it matters how much we read.
      * Doesn't affect getFileSize().
      * See also: SeekableReadBuffer::supportsRightBoundedReads().
      *
      * Behavior in weird cases is currently implementation-defined:
      *  - setReadUntilPosition() below current position,
      *  - setReadUntilPosition() above the end of the file,
      *  - seek() to a position above the until position (even if you setReadUntilPosition() to a
      *    higher value right after the seek!),
      *
      * Typical implementations discard any current buffers and connections, even if the position is
      * adjusted only a little.
      *
      * Typical usage is to call it right after creating the ReadBuffer, before it started doing any
      * work.
      *
      * The default implementation does nothing.
      */
    virtual void setReadUntilPosition(size_t /* position */) {}

    /// Removes the upper bound set by setReadUntilPosition(). The default implementation does nothing.
    virtual void setReadUntilEnd() {}

    /// Read at most `size` bytes into `data` at offset `offset`. First skip `ignore` bytes if `ignore` > 0.
    /// Note: this function only needs to be implemented by synchronous read buffers that are wrapped for
    /// asynchronous reading, such as ReadBufferFromRemoteFSGather and AsynchronousReadIndirectBufferFromRemoteFS.
    /// The default implementation throws NOT_IMPLEMENTED.
    virtual IAsynchronousReader::Result readInto(char * /*data*/, size_t /*size*/, size_t /*offset*/, size_t /*ignore*/)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "readInto not implemented");
    }

protected:
    /// Number of bytes to skip from the beginning of `working_buffer` after nextImpl() returns true.
    /// This is effectively an additional out-parameter of nextImpl(), not real state:
    /// next() moves `pos` to working_buffer.begin() + this value and then resets it to 0.
    size_t nextimpl_working_buffer_offset = 0;

private:
    /** Reads the next data and fills the buffer with it.
      * Returns `false` at the end of data, `true` otherwise.
      * Throws an exception if something is wrong.
      * The default implementation reports end of data immediately.
      */
    virtual bool nextImpl() { return false; }

    [[noreturn]] static void throwReadAfterEOF()
    {
        throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after eof");
    }
};
/// Shared-ownership handle to a ReadBuffer.
using ReadBufferPtr = std::shared_ptr<ReadBuffer>;

/// The ReadBuffer family is inconsistent about ownership of the buffer it wraps:
///   - some readers must fully wrap the underlying buffer and own it,
///   - others merely keep a reference to it without owning it.
/// The helpers below wrap a buffer (given by reference or by shared pointer) into a movable,
/// transparent proxy ReadBuffer returned as an owning std::unique_ptr.
/// Making sure each underlying buffer is wrapped only once is the caller's responsibility.
std::unique_ptr<ReadBuffer> wrapReadBufferReference(ReadBuffer & ref);
std::unique_ptr<ReadBuffer> wrapReadBufferPointer(ReadBufferPtr ptr);
}
|