// contrib/clickhouse/src/IO/ParallelReadBuffer.cpp

#include <IO/ParallelReadBuffer.h>
#include <IO/SharedThreadPools.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int UNEXPECTED_END_OF_FILE;
    extern const int CANNOT_SEEK_THROUGH_FILE;
    extern const int SEEK_POSITION_OUT_OF_BOUND;
}
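
/// Overview: the input is split into consecutive ranges of `range_step` bytes. Up to
/// `max_working_readers` background tasks fill their ranges concurrently through
/// SeekableReadBuffer::readBigAt(), while nextImpl() hands finished segments to the
/// consumer strictly in file order.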

// A subrange of the input, read by one thread.
struct ParallelReadBuffer::ReadWorker
{
    ReadWorker(SeekableReadBuffer & input_, size_t offset, size_t size)
        : input(input_), start_offset(offset), segment(size)
    {
        chassert(size);
        chassert(segment.size() == size);
    }

    bool hasBytesToConsume() const { return bytes_produced > bytes_consumed; }
    bool hasBytesToProduce() const { return bytes_produced < segment.size(); }

    SeekableReadBuffer & input;
    const size_t start_offset; // start of the segment

    Memory<> segment;
    /// Reader thread produces data, nextImpl() consumes it.
    /// segment[bytes_consumed..bytes_produced-1] is data waiting to be picked up by nextImpl()
    /// segment[bytes_produced..] needs to be read from the input ReadBuffer
    size_t bytes_produced = 0;
    size_t bytes_consumed = 0;

    std::atomic_bool cancel{false};
    std::mutex worker_mutex;
};

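/// Kicks off up to `max_working_readers` background range reads right away; if scheduling
/// fails, tears down whatever was already started before rethrowing.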
ParallelReadBuffer::ParallelReadBuffer(
    SeekableReadBuffer & input_, ThreadPoolCallbackRunner<void> schedule_, size_t max_working_readers_, size_t range_step_, size_t file_size_)
    : SeekableReadBuffer(nullptr, 0)
    , max_working_readers(max_working_readers_)
    , schedule(std::move(schedule_))
    , input(input_)
    , file_size(file_size_)
    , range_step(std::max(1ul, range_step_))
{
    LOG_TRACE(&Poco::Logger::get("ParallelReadBuffer"), "Parallel reading is used");

    try
    {
        addReaders();
    }
    catch (const Exception &)
    {
        finishAndWait();
        throw;
    }
}

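/// Creates a worker for the next unassigned range (of `range_step` bytes, or whatever
/// remains of the file) and schedules it on the pool. Returns false once the whole file
/// has been handed out.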
bool ParallelReadBuffer::addReaderToPool()
{
    if (next_range_start >= file_size)
        return false;
    size_t range_start = next_range_start;
    size_t size = std::min(range_step, file_size - range_start);
    next_range_start += size;

    auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(input, range_start, size));

    ++active_working_readers;
    schedule([this, my_worker = std::move(worker)]() mutable { readerThreadFunction(std::move(my_worker)); }, Priority{});

    return true;
}

void ParallelReadBuffer::addReaders()
{
    while (read_workers.size() < max_working_readers && addReaderToPool())
        ;
}

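/// Seek strategy: serve the offset from the current working buffer when possible;
/// otherwise drop the workers whose ranges lie entirely before the offset and wait until
/// the worker covering it has produced enough data. If no scheduled range covers the
/// offset, cancel everything and restart the pipeline from `offset`.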
off_t ParallelReadBuffer::seek(off_t offset, int whence)
{
    if (whence != SEEK_SET)
        throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed.");

    if (offset < 0)
        throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", offset);

    if (!working_buffer.empty() && static_cast<size_t>(offset) >= current_position - working_buffer.size() && offset < current_position)
    {
        pos = working_buffer.end() - (current_position - offset);
        assert(pos >= working_buffer.begin());
        assert(pos <= working_buffer.end());

        return offset;
    }

    const auto offset_is_in_range = [&](const auto & worker)
    {
        return static_cast<size_t>(offset) >= worker->start_offset
            && static_cast<size_t>(offset) < worker->start_offset + worker->segment.size();
    };

    while (!read_workers.empty() && !offset_is_in_range(read_workers.front()))
    {
        read_workers.front()->cancel = true;
        read_workers.pop_front();
    }

    if (!read_workers.empty())
    {
        auto & w = read_workers.front();
        size_t diff = static_cast<size_t>(offset) - w->start_offset;
        while (true)
        {
            std::unique_lock lock{w->worker_mutex};

            if (emergency_stop)
                handleEmergencyStop();

            if (w->bytes_produced > diff)
            {
                working_buffer = internal_buffer = Buffer(
                    w->segment.data(), w->segment.data() + w->bytes_produced);
                pos = working_buffer.begin() + diff;
                w->bytes_consumed = w->bytes_produced;
                current_position = w->start_offset + w->bytes_consumed;
                addReaders();
                return offset;
            }

            next_condvar.wait_for(lock, std::chrono::seconds(10));
        }
    }

    finishAndWait();

    read_workers.clear();

    next_range_start = offset;
    current_position = offset;
    resetWorkingBuffer();

    emergency_stop = false;

    addReaders();
    return offset;
}

size_t ParallelReadBuffer::getFileSize()
{
    return file_size;
}

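/// `current_position` corresponds to the end of the working buffer, so the externally
/// visible position is that minus the bytes the consumer has not read yet.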
off_t ParallelReadBuffer::getPosition()
{
    return current_position - available();
}

void ParallelReadBuffer::handleEmergencyStop()
{
    // this can only be called from the main thread when there is an exception
    assert(background_exception);
    std::rethrow_exception(background_exception);
}

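/// Serves the next produced-but-unconsumed chunk of the front worker's segment, retiring
/// workers that are fully produced and consumed, and blocking while the front worker has
/// nothing new.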
bool ParallelReadBuffer::nextImpl()
{
    while (true)
    {
        /// All readers processed, stop
        if (read_workers.empty())
        {
            chassert(next_range_start >= file_size);
            return false;
        }

        auto * w = read_workers.front().get();

        std::unique_lock lock{w->worker_mutex};

        if (emergency_stop)
            handleEmergencyStop(); // throws

        /// Read data from front reader
        if (w->bytes_produced > w->bytes_consumed)
        {
            chassert(w->start_offset + w->bytes_consumed == static_cast<size_t>(current_position));

            working_buffer = internal_buffer = Buffer(
                w->segment.data() + w->bytes_consumed, w->segment.data() + w->bytes_produced);
            current_position += working_buffer.size();
            w->bytes_consumed = w->bytes_produced;

            return true;
        }

        /// Front reader is done, remove it and add another
        if (!w->hasBytesToProduce())
        {
            lock.unlock();
            read_workers.pop_front();
            addReaders();

            continue;
        }

        /// Nothing to do right now, wait for something to change.
        ///
        /// The timeout is a workaround for a race condition.
        /// emergency_stop is assigned while holding a *different* mutex from the one we're holding
        /// (exception_mutex vs worker_mutex). So it's possible that our emergency_stop check (above)
        /// happens before a onBackgroundException() call, but our wait(lock) happens after it.
        /// Then the wait may get stuck forever.
        ///
        /// Note that using wait(lock, [&]{ return emergency_stop || ...; }) wouldn't help because
        /// it does effectively the same "check, then wait" sequence.
        ///
        /// One possible proper fix would be to make onBackgroundException() lock all read_workers
        /// mutexes too (not necessarily simultaneously - just locking+unlocking them one by one
        /// between the emergency_stop change and the notify_all() would be enough), but then we
        /// need another mutex to protect read_workers itself...
        next_condvar.wait_for(lock, std::chrono::seconds(10));
    }
    chassert(false);
    return false;
}

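/// Body of one background reader task: fills the worker's segment with a single
/// readBigAt() call, publishing progress under worker_mutex so that nextImpl() can
/// consume the finished prefix while the tail is still being read. The SCOPE_EXIT
/// notification lets finishAndWait() observe when the last task has exited.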
void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
{
    SCOPE_EXIT({
        if (active_working_readers.fetch_sub(1) == 1)
            active_working_readers.notify_all();
    });

    try
    {
        auto on_progress = [&](size_t bytes_read) -> bool
        {
            if (emergency_stop || read_worker->cancel)
                return true;

            std::lock_guard lock(read_worker->worker_mutex);
            if (bytes_read <= read_worker->bytes_produced)
                return false;

            bool need_notify = read_worker->bytes_produced == read_worker->bytes_consumed;
            read_worker->bytes_produced = bytes_read;
            if (need_notify)
                next_condvar.notify_all();

            return false;
        };

        size_t r = input.readBigAt(read_worker->segment.data(), read_worker->segment.size(), read_worker->start_offset, on_progress);

        if (!on_progress(r) && r < read_worker->segment.size())
            throw Exception(
                ErrorCodes::UNEXPECTED_END_OF_FILE,
                "Failed to read all the data from the reader at offset {}, got {}/{} bytes",
                read_worker->start_offset, r, read_worker->segment.size());
    }
    catch (...)
    {
        onBackgroundException();
    }
}

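/// Called from a reader thread on failure: remembers the first exception and raises
/// emergency_stop so the main thread rethrows it via handleEmergencyStop().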
void ParallelReadBuffer::onBackgroundException()
{
    std::lock_guard lock{exception_mutex};
    if (!background_exception)
        background_exception = std::current_exception();

    emergency_stop = true;
    next_condvar.notify_all();
}

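/// Signals all reader tasks to stop and blocks until every scheduled task has exited,
/// using C++20 atomic wait/notify on active_working_readers.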
void ParallelReadBuffer::finishAndWait()
{
    emergency_stop = true;

    size_t active_readers = active_working_readers.load();
    while (active_readers != 0)
    {
        active_working_readers.wait(active_readers);
        active_readers = active_working_readers.load();
    }
}

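/// Returns nullptr if `buf` is not seekable or does not support thread-safe positional
/// reads (readBigAt); the caller should then keep using `buf` directly.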
std::unique_ptr<ParallelReadBuffer> wrapInParallelReadBufferIfSupported(
    ReadBuffer & buf, ThreadPoolCallbackRunner<void> schedule, size_t max_working_readers,
    size_t range_step, size_t file_size)
{
    auto * seekable = dynamic_cast<SeekableReadBuffer*>(&buf);
    if (!seekable || !seekable->supportsReadAt())
        return nullptr;

    return std::make_unique<ParallelReadBuffer>(
        *seekable, std::move(schedule), max_working_readers, range_step, file_size);
}

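/// Illustrative usage sketch, assuming `threadPoolCallbackRunner` and `getIOThreadPool()`
/// (from Common/threadPoolCallbackRunner.h and IO/SharedThreadPools.h; exact header
/// locations and names vary across ClickHouse versions), and a hypothetical source buffer:
///
///     SeekableReadBuffer & source = ...;  // e.g. a remote buffer implementing readBigAt()
///     size_t file_size = ...;             // known up front, e.g. from object metadata
///     auto runner = threadPoolCallbackRunner<void>(getIOThreadPool().get(), "ParallelRead");
///     auto parallel = wrapInParallelReadBufferIfSupported(
///         source, runner, /*max_working_readers=*/ 4,
///         /*range_step=*/ 4 * 1024 * 1024, file_size);
///     if (!parallel)
///         ; // `source` does not support readBigAt(); read it sequentially instead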
}