blob: 6f6e8d6bd42d07e0491fa817083d9a64109b32b1 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
#pragma once
#include "clickhouse_config.h"
#if USE_LIBURING
#include <Common/Exception.h>
#include <Common/ThreadPool_fwd.h>
#include <IO/AsynchronousReader.h>
#include <atomic>
#include <deque>
#include <future>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <liburing.h>
namespace Poco { class Logger; }
namespace DB
{
class Exception;
/** Perform reads using the io_uring Linux subsystem.
*
* The class sets up a single io_uring to which clients submit read requests. Each request
* is placed in a map, using the read buffer address as the key and the original request
* together with a promise as the value. A monitor thread continuously polls the completion
* queue, looks up the completed request and completes the matching promise.
*/
class IOUringReader final : public IAsynchronousReader
{
private:
    /// Value reported by isSupported(). Defaults to false.
    /// NOTE(review): presumably set by the constructor once the ring is usable - confirm in the .cpp.
    bool is_supported = false;

    /// NOTE(review): presumably serializes access to `ring` and the request containers below -
    /// confirm against the implementation.
    std::mutex mutex;
    struct io_uring ring;
    /// NOTE(review): looks like the completion queue size of `ring` - confirm in the .cpp.
    uint32_t cq_entries = 0;

    /// NOTE(review): presumably tells the monitor thread to stop - confirm in the .cpp.
    std::atomic<bool> cancelled{false};
    std::unique_ptr<ThreadFromGlobalPool> ring_completion_monitor;

    /// A read request bundled with the promise whose future is handed back to the submitter.
    struct EnqueuedRequest
    {
        std::promise<IAsynchronousReader::Result> promise;
        Request request;
        bool resubmitting = false; // resubmits can happen due to short reads or when io_uring returns -EAGAIN
        size_t bytes_read = 0; // keep track of bytes already read in case short reads happen
    };

    std::deque<EnqueuedRequest> pending_requests;
    std::unordered_map<UInt64, EnqueuedRequest> in_flight_requests;

    int submitToRing(EnqueuedRequest & enqueued);

    using EnqueuedIterator = std::unordered_map<UInt64, EnqueuedRequest>::iterator;

    void failRequest(const EnqueuedIterator & requestIt, const Exception & ex);
    void finalizeRequest(const EnqueuedIterator & requestIt);

    void monitorRing();

    /// Stores a copy of `ex` in `promise`, so that the matching future rethrows it on get().
    /// Uses no instance state, hence static.
    template <typename T>
    static void failPromise(std::promise<T> & promise, const Exception & ex)
    {
        promise.set_exception(std::make_exception_ptr(ex));
    }

    /// Returns a future that is already completed with the exception `ex`.
    /// The shared state outlives the local promise, so returning its future is safe.
    static std::future<Result> makeFailedResult(const Exception & ex)
    {
        auto promise = std::promise<Result>{};
        failPromise(promise, ex);
        return promise.get_future();
    }

    const Poco::Logger * log = nullptr;

public:
    /// `explicit` prevents an accidental implicit conversion from an integer to a reader.
    /// NOTE(review): `entries_` is presumably the requested ring size - confirm in the .cpp.
    explicit IOUringReader(uint32_t entries_);

    /// Returns the `is_supported` flag; does not modify the reader.
    bool isSupported() const { return is_supported; }

    std::future<Result> submit(Request request) override;

    /// Intentionally a no-op in this reader.
    void wait() override {}

    ~IOUringReader() override;
};
}
#endif
|