aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Disks/IO/IOUringReader.h
blob: 6f6e8d6bd42d07e0491fa817083d9a64109b32b1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#pragma once

#include "clickhouse_config.h"

#if USE_LIBURING

#include <Common/Exception.h>
#include <Common/ThreadPool_fwd.h>
#include <IO/AsynchronousReader.h>
#include <deque>
#include <unordered_map>
#error #include <liburing.h>

/// Forward declaration: this header only stores a pointer to Poco::Logger, so the full definition is not needed here.
namespace Poco
{
class Logger;
}

namespace DB
{

/// NOTE(review): presumably redundant with <Common/Exception.h> included above; kept as harmless.
class Exception;

/** Perform reads using the io_uring Linux subsystem.
  *
  * The class sets up a single io_uring that clients submit read requests to. Submitted requests are
  * placed in a map using the read buffer address as the key and the original request with a promise
  * as the value. A monitor thread continuously polls the completion queue, looks up the completed
  * request and completes the matching promise.
  *
  * Results are delivered only through the futures returned by submit(); wait() is a no-op.
  *
  * NOTE(review): `pending_requests` presumably holds requests that could not be placed on the ring
  * yet (e.g. when the submission queue is full) — confirm against IOUringReader.cpp.
  * NOTE(review): callers should check isSupported() after construction; the flag is presumably
  * cleared when the ring cannot be initialised on the running kernel — TODO confirm.
  */
class IOUringReader final : public IAsynchronousReader
{
private:
    /// Value reported by isSupported(); false until the constructor sets it.
    bool is_supported = false;

    /// NOTE(review): presumably guards the ring and the request containers below — confirm.
    std::mutex mutex;
    struct io_uring ring;
    uint32_t cq_entries = 0;

    /// NOTE(review): presumably raised on shutdown so the monitor loop exits — confirm.
    std::atomic<bool> cancelled{false};
    /// The monitor thread described above (presumably running monitorRing()).
    std::unique_ptr<ThreadFromGlobalPool> ring_completion_monitor;

    /// A client request together with the promise that will be fulfilled when it completes.
    struct EnqueuedRequest
    {
        std::promise<IAsynchronousReader::Result> promise;
        Request request;
        bool resubmitting = false; // resubmits can happen due to short reads or when io_uring returns -EAGAIN
        size_t bytes_read = 0; // keep track of bytes already read in case short reads happen
    };

    std::deque<EnqueuedRequest> pending_requests;
    /// Requests currently on the ring, keyed by the read buffer address.
    std::unordered_map<UInt64, EnqueuedRequest> in_flight_requests;

    /// NOTE(review): the int result is presumably 0 or a negative errno (cf. -EAGAIN above) — confirm.
    int submitToRing(EnqueuedRequest & enqueued);

    using EnqueuedIterator = std::unordered_map<UInt64, EnqueuedRequest>::iterator;

    void failRequest(const EnqueuedIterator & requestIt, const Exception & ex);
    void finalizeRequest(const EnqueuedIterator & requestIt);

    void monitorRing();

    /// Stores a copy of `ex` in `promise`. std::make_exception_ptr deduces its argument type from the
    /// static type, so the stored exception is a DB::Exception even if `ex` is a derived type (sliced).
    template <typename T>
    static void failPromise(std::promise<T> & promise, const Exception & ex)
    {
        promise.set_exception(std::make_exception_ptr(ex));
    }

    /// Returns a future that is already failed with a copy of `ex`.
    static std::future<Result> makeFailedResult(const Exception & ex)
    {
        auto promise = std::promise<Result>{};
        failPromise(promise, ex);
        return promise.get_future();
    }

    const Poco::Logger * log = nullptr;

public:
    /// `entries_` is presumably the requested io_uring queue depth — TODO confirm in IOUringReader.cpp.
    explicit IOUringReader(uint32_t entries_);

    bool isSupported() const { return is_supported; }
    std::future<Result> submit(Request request) override;

    /// No-op: completion is observed through the futures returned by submit().
    void wait() override {}

    ~IOUringReader() override;
};

}
#endif