aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/SynchronousReader.cpp
blob: e1c654e48a3a6c1179b12d6221cf76dc65d1eeaa (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#include <IO/SynchronousReader.h>
#include <Common/assert_cast.h>
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <base/errnoToString.h>
#include <unordered_map>
#include <mutex>
#include <unistd.h>
#include <fcntl.h>


/// Profile event counters incremented by SynchronousReader::submit.
/// They are only declared here (extern); the definitions live elsewhere.
namespace ProfileEvents
{
    /// Incremented once each time the deferred read task runs.
    extern const Event ReadBufferFromFileDescriptorRead;
    /// Incremented when pread fails with an error other than EINTR.
    extern const Event ReadBufferFromFileDescriptorReadFailed;
    /// Incremented by the number of bytes the read returned.
    extern const Event ReadBufferFromFileDescriptorReadBytes;
    /// Incremented by the elapsed time of the read task in microseconds, as measured by a Stopwatch.
    extern const Event DiskReadElapsedMicroseconds;
}

/// Metric used by SynchronousReader::submit.
/// It is only declared here (extern); the definition lives elsewhere.
namespace CurrentMetrics
{
    /// Held by a CurrentMetrics::Increment object scoped around each pread call.
    extern const Metric Read;
}

namespace DB
{

/// Error codes used by SynchronousReader::submit.
/// They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    /// Passed to throwFromErrno when pread fails with an error other than EINTR.
    extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
    /// Passed to throwFromErrno when posix_fadvise returns a non-zero value.
    extern const int CANNOT_ADVISE;
}


/// Prepares a read of up to `request.size` bytes at `request.offset` from the local file
/// descriptor referenced by `request.descriptor` into `request.buf`.
///
/// When POSIX_FADV_WILLNEED is available, the kernel is advised about the upcoming read
/// immediately, inside this call. The read itself is wrapped in a deferred task
/// (std::launch::deferred), so it is executed by whoever first waits on the returned future.
///
/// The task performs pread until it returns data, reaches EOF (returns 0) or fails;
/// calls interrupted by EINTR are retried. A short read is not retried: the result may
/// contain fewer bytes than requested.
///
/// Returns a future whose Result has `size` set to the number of bytes read (0 on EOF)
/// and `offset` set to `request.ignore`.
///
/// Errors are raised through throwFromErrno:
///  - CANNOT_ADVISE, directly from this call, if posix_fadvise fails;
///  - CANNOT_READ_FROM_FILE_DESCRIPTOR, from the deferred task, if pread fails with
///    an error other than EINTR.
std::future<IAsynchronousReader::Result> SynchronousReader::submit(Request request)
{
    /// If size is zero, then read() cannot be distinguished from EOF
    assert(request.size);

    int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;

#if defined(POSIX_FADV_WILLNEED)
    /// posix_fadvise reports failure through its return value and does not set errno.
    /// throwFromErrno is called without an explicit error number (presumably it takes it
    /// from errno), so store the returned code there first. Otherwise the exception would
    /// describe a stale, unrelated error.
    if (const int advise_error = posix_fadvise(fd, request.offset, request.size, POSIX_FADV_WILLNEED); advise_error != 0)
    {
        errno = advise_error;
        throwFromErrno("Cannot posix_fadvise", ErrorCodes::CANNOT_ADVISE);
    }
#endif

    return std::async(std::launch::deferred, [fd, request]
    {
        ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
        Stopwatch watch(CLOCK_MONOTONIC);

        size_t bytes_read = 0;
        while (!bytes_read)
        {
            ssize_t res = 0;

            {
                /// The metric is held only for the duration of the system call.
                CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
                res = ::pread(fd, request.buf, request.size, request.offset);
            }

            /// Zero means EOF: finish with whatever has been read (nothing).
            if (!res)
                break;

            /// EINTR is not an error: fall through and retry the call on the next iteration.
            if (-1 == res && errno != EINTR)
            {
                ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
                throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
            }

            if (res > 0)
                bytes_read += res;
        }

        ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);

        /// It reports real time spent including the time spent while thread was preempted doing nothing.
        /// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
        /// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it
        /// (NetlinkMetricsProvider has about 500K RPS).
        watch.stop();
        ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());

        return Result{ .size = bytes_read, .offset = request.ignore };
    });
}

}