aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Formats/Impl/ParallelParsingInputFormat.h
blob: f61dc3fbc78052652b593e54a2e71fd03aa01937 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
#pragma once

#include <Processors/Formats/IInputFormat.h>
#include <Formats/FormatFactory.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Interpreters/Context.h>
#include <Poco/Event.h>


/// Metrics declared here as extern (defined elsewhere). In this header they are
/// passed to the constructor of the parser thread pool of ParallelParsingInputFormat.
namespace CurrentMetrics
{
    extern const Metric ParallelParsingInputFormatThreads;
    extern const Metric ParallelParsingInputFormatThreadsActive;
}

namespace DB
{

/// Error codes referenced by this header (declared extern, defined elsewhere).
/// LOGICAL_ERROR is thrown by resetParser() and by InternalParser::getChunk().
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/// Forward declaration of Context.
/// NOTE(review): <Interpreters/Context.h> is already included at the top of this
/// header, so this declaration looks redundant -- confirm before removing.
class Context;

/**
 * ORDER-PRESERVING parallel parsing of data formats.
 * It splits original data into chunks. Then each chunk is parsed by different thread.
 * The number of chunks equals the number of parser threads.
 * The size of chunk is equal to min_chunk_bytes_for_parallel_parsing setting.
 *
 *                    Parsers
 *      |   |   |   |   |   |   |   |   |   |
 *      v   v   v   v   v   v   v   v   v   v
 *    |---|---|---|---|---|---|---|---|---|---|
 *    | 1 | 2 | 3 | 4 | 5 | . | . | . | . | N | <-- Processing units
 *    |---|---|---|---|---|---|---|---|---|---|
 *      ^               ^
 *      |               |
 *   readImpl        Segmentator
 *
 * This stream has three kinds of threads: one segmentator, multiple parsers,
 * and one reader thread -- that is, the one from which readImpl() is called.
 * They operate one after another on parts of data called "processing units".
 * One unit consists of buffer with raw data from file, filled by segmentator
 * thread. This raw data is then parsed by a parser thread to form a number of
 * Blocks. These Blocks are returned to the parent stream from readImpl().
 * After being read out, a processing unit is reused, to save on allocating
 * memory for the raw buffer. The processing units are organized into a circular
 * array to facilitate reuse and to apply backpressure on the segmentator thread
 * -- after it runs out of processing units, it has to wait for the reader to
 * read out the previous blocks.
 * The outline of what the threads do is as follows:
 * segmentator thread:
 *  1) wait for the next processing unit to become empty
 *  2) fill it with a part of input file
 *  3) start a parser thread
 *  4) repeat until eof
 * parser thread:
 *  1) parse the given raw buffer without any synchronization
 *  2) signal that the given unit is ready to read
 *  3) finish
 * readImpl():
 *  1) wait for the next processing unit to become ready to read
 *  2) take the blocks from the processing unit to return them to the caller
 *  3) signal that the processing unit is empty
 *  4) repeat until it encounters unit that is marked as "past_the_end"
 * All threads must also check for cancel/eof/exception flags.
 */
class ParallelParsingInputFormat : public IInputFormat
{
public:
    /// Factory used to create a fresh parser for every new piece of data.
    using InternalParserCreator = std::function<InputFormatPtr(ReadBuffer & buf)>;

    /// Everything the constructor needs, bundled into a single argument.
    struct Params
    {
        ReadBuffer & in;
        Block header;
        InternalParserCreator internal_parser_creator;
        FormatFactory::FileSegmentationEngine file_segmentation_engine;
        String format_name;
        size_t max_threads;
        size_t min_chunk_bytes;
        size_t max_block_size;
        bool is_server;
    };

    /// Takes `params` by value, so its heavy members (header, callbacks, format
    /// name) are moved into the object instead of being copied a second time.
    /// The parser pool is created with `params.max_threads` as its size argument.
    explicit ParallelParsingInputFormat(Params params)
        : IInputFormat(std::move(params.header), &params.in)
        , internal_parser_creator(std::move(params.internal_parser_creator))
        , file_segmentation_engine(std::move(params.file_segmentation_engine))
        , format_name(std::move(params.format_name))
        , min_chunk_bytes(params.min_chunk_bytes)
        , max_block_size(params.max_block_size)
        , is_server(params.is_server)
        , pool(CurrentMetrics::ParallelParsingInputFormatThreads, CurrentMetrics::ParallelParsingInputFormatThreadsActive, params.max_threads)
    {
        // One unit for each thread, including segmentator and reader, plus a
        // couple more units so that the segmentation thread doesn't spuriously
        // bump into reader thread on wraparound.
        processing_units.resize(params.max_threads + 2);

        LOG_TRACE(&Poco::Logger::get("ParallelParsingInputFormat"), "Parallel parsing is used");
    }

    /// Stops and joins all background threads before the members are destroyed.
    ~ParallelParsingInputFormat() override
    {
        finishAndWait();
    }

    /// Resetting is not supported for the parallel parser: always throws LOGICAL_ERROR.
    void resetParser() override final
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "resetParser() is not allowed for {}", getName());
    }

    /// Returns the stored `last_block_missing_values`.
    const BlockMissingValues & getMissingValues() const override final
    {
        return last_block_missing_values;
    }

    /// Returns the stored `last_approx_bytes_read_for_chunk`.
    size_t getApproxBytesReadForChunk() const override { return last_approx_bytes_read_for_chunk; }

    String getName() const override final { return "ParallelParsingBlockInputFormat"; }

private:

    /// Defined out of line (not visible in this header).
    Chunk generate() override final;

    void onCancel() override final
    {
        /*
         * The format parsers themselves are not being cancelled here, so we'll
         * have to wait until they process the current block. Given that the
         * chunk size is on the order of megabytes, this shouldn't be too long.
         * We can't call IInputFormat->cancel here, because the parser object is
         * local to the parser thread, and we don't want to introduce any
         * synchronization between parser threads and the other threads to get
         * better performance. An ideal solution would be to add a callback to
         * IInputFormat that checks whether it was cancelled.
         */

        finishAndWait();
    }

    /// Drives a single input format by hand: connects an input port to the
    /// format's output port and pulls chunks by calling prepare()/work() in a loop.
    class InternalParser
    {
    public:
        explicit InternalParser(const InputFormatPtr & input_format_)
        : input_format(input_format_)
        , port(input_format->getPort().getHeader(), input_format.get())
        {
            connect(input_format->getPort(), port);
            port.setNeeded();
        }

        /// Returns the next chunk produced by the format, or an empty Chunk once
        /// the format reports Finished.
        /// Throws LOGICAL_ERROR if the format asks to expand the pipeline.
        Chunk getChunk()
        {
            while (true)
            {
                IProcessor::Status status = input_format->prepare();
                switch (status)
                {
                    case IProcessor::Status::Ready:
                        input_format->work();
                        break;

                    case IProcessor::Status::Finished:
                        return {};

                    case IProcessor::Status::PortFull:
                        return port.pull();

                    /// For these two statuses nothing is done here: the loop
                    /// simply calls prepare() again.
                    case IProcessor::Status::NeedData: break;
                    case IProcessor::Status::Async: break;
                    case IProcessor::Status::ExpandPipeline:
                        throw Exception(ErrorCodes::LOGICAL_ERROR, "One of the parsers returned status {} during parallel parsing",
                                             IProcessor::statusToName(status));
                }
            }
        }

        const BlockMissingValues & getMissingValues() const { return input_format->getMissingValues(); }

    private:
        /// Held by value (shared ownership) rather than as a reference to the
        /// caller's pointer object, so the parser stays valid even if it is
        /// constructed from a temporary or outlives the caller's variable.
        /// Must be declared before `port`: the port is initialized from it.
        InputFormatPtr input_format;
        InputPort port;
    };

    const InternalParserCreator internal_parser_creator;
    /// Function to segment the file. Then "parsers" will parse those segments.
    FormatFactory::FileSegmentationEngine file_segmentation_engine;
    const String format_name;
    const size_t min_chunk_bytes;
    const size_t max_block_size;

    BlockMissingValues last_block_missing_values;
    size_t last_approx_bytes_read_for_chunk = 0;

    /// Non-atomic because it is used in one thread.
    std::optional<size_t> next_block_in_current_unit;
    size_t segmentator_ticket_number{0};
    size_t reader_ticket_number{0};

    /// Mutex for internal synchronization between threads
    std::mutex mutex;

    /// finishAndWait can be called concurrently from
    /// multiple threads. Atomic flag is not enough
    /// because if finishAndWait called before destructor it can check the flag
    /// and destroy object immediately.
    std::mutex finish_and_wait_mutex;
    /// We don't use the parsing_finished flag because it can be set from multiple
    /// places in the code, for example in case of bad data. That doesn't mean
    /// that we don't need to finishAndWait our class.
    bool finish_and_wait_called = false;

    std::condition_variable reader_condvar;
    std::condition_variable segmentator_condvar;

    Poco::Event first_parser_finished;

    std::atomic<bool> parsing_started{false};
    std::atomic<bool> parsing_finished{false};

    const bool is_server;

    /// There are multiple "parsers", that's why we use thread pool.
    ThreadPool pool;
    /// Reading and segmenting the file
    ThreadFromGlobalPool segmentator_thread;

    /// Per-unit state. Judging by the names and the class description, a unit
    /// cycles INSERT -> PARSE -> READ -> INSERT; the transitions themselves are
    /// implemented outside this header.
    enum ProcessingUnitStatus
    {
        READY_TO_INSERT,
        READY_TO_PARSE,
        READY_TO_READ
    };

    /// Parsed output of one unit: three parallel vectors (chunks, their missing
    /// values and their approximate sizes).
    struct ChunkExt
    {
        std::vector<Chunk> chunk;
        std::vector<BlockMissingValues> block_missing_values;
        std::vector<size_t> approx_chunk_sizes;
    };

    struct ProcessingUnit
    {
        ProcessingUnit()
            : status(ProcessingUnitStatus::READY_TO_INSERT)
        {
        }

        ChunkExt chunk_ext;
        Memory<> segment;
        /// Zero-initialized so a freshly created unit never exposes an
        /// indeterminate value.
        size_t original_segment_size = 0;
        std::atomic<ProcessingUnitStatus> status;
        /// Needed for better exception message.
        size_t offset = 0;
        bool is_last{false};
    };

    std::exception_ptr background_exception = nullptr;

    /// We use deque instead of vector, because it does not require a move
    /// constructor, which is absent for atomics that are inside ProcessingUnit.
    std::deque<ProcessingUnit> processing_units;

    /// Compute it to have a more understandable error message.
    size_t successfully_read_rows_count{0};


    /// Schedules parsing of the unit with the given ticket number on the pool.
    /// For the very first unit (ticket 0) it additionally blocks until
    /// `first_parser_finished` is signalled.
    void scheduleParserThreadForUnitWithNumber(size_t ticket_number)
    {
        pool.scheduleOrThrowOnError([this, ticket_number, group = CurrentThread::getGroup()]()
        {
            parserThreadFunction(group, ticket_number);
        });
        /// We have to wait here to possibly extract ColumnMappingPtr from the first parser.
        if (ticket_number == 0)
            first_parser_finished.wait();
    }

    /// Signals all background threads to stop and waits for them.
    /// Safe to call more than once and from several threads: calls are
    /// serialized by `finish_and_wait_mutex` and only the first one does the work.
    /// Exceptions from pool.wait() are logged, not propagated (this runs from
    /// the destructor).
    void finishAndWait()
    {
        /// Defending concurrent segmentator thread join
        std::lock_guard finish_and_wait_lock(finish_and_wait_mutex);

        /// We shouldn't execute this logic twice
        if (finish_and_wait_called)
            return;

        finish_and_wait_called = true;

        /// Signal background threads to finish
        parsing_finished = true;

        {
            /// Additionally notify condvars
            std::lock_guard lock(mutex);
            segmentator_condvar.notify_all();
            reader_condvar.notify_all();
        }

        if (segmentator_thread.joinable())
            segmentator_thread.join();

        try
        {
            pool.wait();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    /// Thread entry points; defined out of line (not visible in this header).
    void segmentatorThreadFunction(ThreadGroupPtr thread_group);
    void parserThreadFunction(ThreadGroupPtr thread_group, size_t current_ticket_number);

    /// Save/log a background exception, set termination flag, wake up all
    /// threads. This function is used by segmentator and parser threads.
    /// readImpl() is called from the main thread, so the exception handling
    /// is different.
    /// NOTE(review): no readImpl() is declared in this class; the comment
    /// presumably refers to generate() -- confirm.
    void onBackgroundException(size_t offset);
};

}