1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
#include <IO/SynchronousReader.h>
#include <Common/assert_cast.h>
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <base/errnoToString.h>
#include <unordered_map>
#include <mutex>
#include <unistd.h>
#include <fcntl.h>
namespace ProfileEvents
{
extern const Event ReadBufferFromFileDescriptorRead;
extern const Event ReadBufferFromFileDescriptorReadFailed;
extern const Event ReadBufferFromFileDescriptorReadBytes;
extern const Event DiskReadElapsedMicroseconds;
}
namespace CurrentMetrics
{
extern const Metric Read;
}
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int CANNOT_ADVISE;
}
std::future<IAsynchronousReader::Result> SynchronousReader::submit(Request request)
{
/// If size is zero, then read() cannot be distinguished from EOF
assert(request.size);
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
#if defined(POSIX_FADV_WILLNEED)
if (0 != posix_fadvise(fd, request.offset, request.size, POSIX_FADV_WILLNEED))
throwFromErrno("Cannot posix_fadvise", ErrorCodes::CANNOT_ADVISE);
#endif
return std::async(std::launch::deferred, [fd, request]
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
Stopwatch watch(CLOCK_MONOTONIC);
size_t bytes_read = 0;
while (!bytes_read)
{
ssize_t res = 0;
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
res = ::pread(fd, request.buf, request.size, request.offset);
}
if (!res)
break;
if (-1 == res && errno != EINTR)
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
if (res > 0)
bytes_read += res;
}
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
/// It reports real time spent including the time spent while thread was preempted doing nothing.
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
/// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it
/// (NetlinkMetricsProvider has about 500K RPS).
watch.stop();
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
return Result{ .size = bytes_read, .offset = request.ignore };
});
}
}
|