1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
|
#include <cerrno>
#include <ctime>
#include <optional>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Common/Throttler.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <Common/filesystemHelpers.h>
#include <sys/stat.h>
#include <Interpreters/Context.h>
#pragma clang diagnostic ignored "-Wreserved-identifier"
/// Profile event counters updated by this file. They are only declared here (extern).
namespace ProfileEvents
{
/// Incremented once per read()/pread() attempt in readImpl.
extern const Event ReadBufferFromFileDescriptorRead;
/// Incremented when read()/pread() fails with an error other than EINTR.
extern const Event ReadBufferFromFileDescriptorReadFailed;
/// Incremented by the number of bytes returned by a readImpl call (when non-zero).
extern const Event ReadBufferFromFileDescriptorReadBytes;
/// Incremented by the elapsed microseconds measured around read attempts in readImpl and around lseek in seek.
extern const Event DiskReadElapsedMicroseconds;
/// Incremented by seek() when the target is outside the buffer, and by rewind() when pread is not used.
extern const Event Seek;
/// The two events below are passed to throttler->add() in readImpl;
/// presumably they account throttled bytes and throttler sleep time (defined elsewhere).
extern const Event LocalReadThrottlerBytes;
extern const Event LocalReadThrottlerSleepMicroseconds;
}
/// Metrics referenced by this file. They are only declared here (extern).
namespace CurrentMetrics
{
/// Wrapped in a CurrentMetrics::Increment scoped around each read()/pread() call in readImpl.
extern const Metric Read;
}
namespace DB
{
/// Error codes used by the exceptions thrown in this file. They are only declared here (extern).
namespace ErrorCodes
{
/// Thrown by readImpl when read()/pread() fails with an error other than EINTR.
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
/// Thrown by seek when 'whence' is neither SEEK_SET nor SEEK_CUR.
extern const int ARGUMENT_OUT_OF_BOUND;
/// Thrown by seek and rewind when lseek fails or returns an unexpected position.
extern const int CANNOT_SEEK_THROUGH_FILE;
/// Thrown by poll when select fails.
extern const int CANNOT_SELECT;
/// Thrown by prefetch when posix_fadvise fails.
extern const int CANNOT_ADVISE;
}
/// Returns a printable description of the source in the form "(fd = N)",
/// where N is the descriptor number formatted by toString.
std::string ReadBufferFromFileDescriptor::getFileName() const
{
    std::string description = "(fd = ";
    description += toString(fd);
    description += ")";
    return description;
}
/// Reads from the descriptor into `to` until at least `min_bytes` have been read or a read returns 0 (EOF).
/// Each attempt requests everything that is still missing up to `max_bytes`.
///
/// When `use_pread` is set, every attempt uses pread() at `offset + bytes already read`;
/// otherwise read() is used and `offset` only takes part in the EOF check at the top.
/// A call interrupted by a signal (EINTR) is retried.
///
/// Returns the number of bytes stored into `to`; 0 means nothing could be read.
/// Throws CANNOT_READ_FROM_FILE_DESCRIPTOR if the syscall fails with an error other than EINTR.
size_t ReadBufferFromFileDescriptor::readImpl(char * to, size_t min_bytes, size_t max_bytes, size_t offset)
{
    chassert(min_bytes <= max_bytes);

    /// This is a workaround of a read past EOF bug in linux kernel with pread()
    if (file_size.has_value() && offset >= *file_size)
        return 0;

    size_t bytes_read = 0;
    while (bytes_read < min_bytes)
    {
        ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);

        Stopwatch watch(profile_callback ? clock_type : CLOCK_MONOTONIC);

        ssize_t res = 0;
        size_t to_read = max_bytes - bytes_read;
        {
            /// The metric is held only for the duration of the syscall itself.
            CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};

            if (use_pread)
                res = ::pread(fd, to + bytes_read, to_read, offset + bytes_read);
            else
                res = ::read(fd, to + bytes_read, to_read);
        }

        /// Zero bytes means end of file: return what has been collected so far.
        if (!res)
            break;

        if (-1 == res && errno != EINTR)
        {
            ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
            throwFromErrnoWithPath("Cannot read from file: " + getFileName(), getFileName(), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
        }

        if (res > 0)
        {
            bytes_read += res;
            if (throttler)
                throttler->add(res, ProfileEvents::LocalReadThrottlerBytes, ProfileEvents::LocalReadThrottlerSleepMicroseconds);
        }

        /// It reports real time spent including the time spent while thread was preempted doing nothing.
        /// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
        /// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it
        /// (NetlinkMetricsProvider has about 500K RPS).
        watch.stop();
        ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());

        if (profile_callback)
        {
            ProfileInfo info;
            info.bytes_requested = to_read;
            /// At this point res is either positive or -1 (interrupted by EINTR).
            /// An interrupted attempt transferred nothing, so report 0 rather than leaking -1 into the counter.
            info.bytes_read = res > 0 ? res : 0;
            info.nanoseconds = watch.elapsed();
            profile_callback(info);
        }
    }

    if (bytes_read)
        ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);

    return bytes_read;
}
/// Refills the working buffer with the next portion of data from the descriptor.
/// Asks readImpl for at least one byte and at most a full internal buffer, starting at the
/// current end-of-buffer file offset. Returns false when nothing was read, true otherwise.
bool ReadBufferFromFileDescriptor::nextImpl()
{
    /// If internal_buffer size is empty, then read() cannot be distinguished from EOF
    assert(!internal_buffer.empty());

    const size_t received = readImpl(internal_buffer.begin(), 1, internal_buffer.size(), file_offset_of_buffer_end);
    if (!received)
        return false;

    /// Advance the file offset and expose exactly the bytes that were read.
    file_offset_of_buffer_end += received;
    working_buffer = internal_buffer;
    working_buffer.resize(received);
    return true;
}
/// Hints the OS to load `internal_buffer.size()` bytes, starting at the current end-of-buffer file offset,
/// into the page cache. The priority argument is ignored.
/// Does nothing when POSIX_FADV_WILLNEED is not defined or when `required_alignment` is non-zero.
/// Throws CANNOT_ADVISE if posix_fadvise reports an error.
void ReadBufferFromFileDescriptor::prefetch(Priority)
{
#if defined(POSIX_FADV_WILLNEED)
    /// For direct IO, loading data into page cache is pointless.
    if (required_alignment)
        return;

    /// Ask OS to prefetch data into page cache.
    const int advise_error = posix_fadvise(fd, file_offset_of_buffer_end, internal_buffer.size(), POSIX_FADV_WILLNEED);
    if (0 != advise_error)
    {
        /// posix_fadvise returns the error number directly and does not set errno.
        /// Publish it in errno so that throwFromErrno (which is assumed to read errno) reports the real cause.
        errno = advise_error;
        throwFromErrno("Cannot posix_fadvise", ErrorCodes::CANNOT_ADVISE);
    }
#endif
}
/// Moves the read position to `offset` (SEEK_SET) or by `offset` from the current position (SEEK_CUR).
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
/// Throws ARGUMENT_OUT_OF_BOUND for any other `whence`, and CANNOT_SEEK_THROUGH_FILE if lseek fails
/// or lands on an unexpected position.
off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
{
    size_t target_pos;
    switch (whence)
    {
        case SEEK_SET:
            assert(offset >= 0);
            target_pos = offset;
            break;
        case SEEK_CUR:
            /// Current logical position is the end-of-buffer file offset minus the unread tail of the buffer.
            target_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset;
            break;
        default:
            throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence");
    }

    /// Position is unchanged.
    const bool is_same_position = target_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end;
    if (is_same_position)
        return target_pos;

    const bool is_inside_buffer = file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(target_pos)
        && target_pos <= file_offset_of_buffer_end;
    if (is_inside_buffer)
    {
        /// Position is still inside the buffer.
        /// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
        pos = working_buffer.end() - file_offset_of_buffer_end + target_pos;
        assert(pos >= working_buffer.begin());
        assert(pos <= working_buffer.end());
        return target_pos;
    }

    /// Position is out of the buffer, we need to do real seek.
    /// With an alignment requirement the seek goes to the aligned position at or below the target,
    /// and the remainder is skipped by reading.
    off_t aligned_pos = target_pos;
    if (required_alignment > 1)
        aligned_pos = target_pos / required_alignment * required_alignment;
    off_t bytes_to_skip = target_pos - aligned_pos;

    /// First reset the buffer so the next read will fetch new data to the buffer.
    resetWorkingBuffer();

    /// In case of using 'pread' we just update the info about the next position in file.
    /// In case of using 'read' we call 'lseek'.
    /// We account both cases as seek event as it leads to non-contiguous reads from file.
    ProfileEvents::increment(ProfileEvents::Seek);

    if (!use_pread)
    {
        Stopwatch watch(profile_callback ? clock_type : CLOCK_MONOTONIC);

        off_t lseek_result = ::lseek(fd, aligned_pos, SEEK_SET);
        if (-1 == lseek_result)
            throwFromErrnoWithPath(fmt::format("Cannot seek through file {} at offset {}", getFileName(), aligned_pos), getFileName(),
                ErrorCodes::CANNOT_SEEK_THROUGH_FILE);

        /// Also note that seeking past the file size is not allowed.
        if (lseek_result != aligned_pos)
            throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE,
                "The 'lseek' syscall returned value ({}) that is not expected ({})", lseek_result, aligned_pos);

        watch.stop();
        ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
    }

    file_offset_of_buffer_end = aligned_pos;

    if (bytes_to_skip > 0)
        ignore(bytes_to_skip);

    /// NOTE(review): this branch returns the aligned position, while the in-buffer branches return
    /// the requested one - confirm callers do not depend on the difference.
    return aligned_pos;
}
/// Returns the buffer to the beginning of the file: seeks the descriptor to offset 0 (only when
/// plain read() is used), drops the buffered data and resets the tracked file offset to 0.
/// Throws CANNOT_SEEK_THROUGH_FILE if lseek fails.
void ReadBufferFromFileDescriptor::rewind()
{
    const bool descriptor_position_matters = !use_pread;
    if (descriptor_position_matters)
    {
        ProfileEvents::increment(ProfileEvents::Seek);

        if (-1 == ::lseek(fd, 0, SEEK_SET))
            throwFromErrnoWithPath("Cannot seek through file " + getFileName(), getFileName(),
                ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
    }
    /// In case of pread, the ProfileEvents::Seek is not accounted, but it's Ok.

    /// Clearing the buffer with existing data. New data will be read on subsequent call to 'next'.
    working_buffer.resize(0);
    pos = working_buffer.begin();
    file_offset_of_buffer_end = 0;
}
/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
bool ReadBufferFromFileDescriptor::poll(size_t timeout_microseconds) const
{
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
timeval timeout = { time_t(timeout_microseconds / 1000000), suseconds_t(timeout_microseconds % 1000000) };
int res = select(1, &fds, nullptr, nullptr, &timeout);
if (-1 == res)
throwFromErrno("Cannot select", ErrorCodes::CANNOT_SELECT);
return res > 0;
}
/// Returns the size reported by getSizeFromFileDescriptor for this descriptor.
/// The printable descriptor name from getFileName() is passed along as the second argument.
size_t ReadBufferFromFileDescriptor::getFileSize()
{
    const std::string description = getFileName();
    return getSizeFromFileDescriptor(fd, description);
}
/// Returns true only if fstat succeeds on the descriptor and reports a regular file.
bool ReadBufferFromFileDescriptor::checkIfActuallySeekable()
{
    struct stat file_stat;
    if (::fstat(fd, &file_stat) != 0)
        return false;

    return S_ISREG(file_stat.st_mode);
}
/// Reads `n` bytes located at `offset` into `to` by calling readImpl with both the minimum and the
/// maximum set to `n`. Expects `use_pread` to be set. The callback argument is not used.
/// Returns the number of bytes readImpl managed to read.
size_t ReadBufferFromFileDescriptor::readBigAt(char * to, size_t n, size_t offset, const std::function<bool(size_t)> &)
{
    chassert(use_pread);

    const size_t bytes_read = readImpl(to, /* min_bytes = */ n, /* max_bytes = */ n, offset);
    return bytes_read;
}
}
|