path: root/contrib/clickhouse/src/Storages/MergeTree/MergeTreeRangeReader.h
#pragma once
#include <Core/Block.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/FilterDescription.h>
#include <Storages/MergeTree/MarkRange.h>

namespace DB
{

template <typename T>
class ColumnVector;
using ColumnUInt8 = ColumnVector<UInt8>;

class IMergeTreeReader;
class MergeTreeIndexGranularity;
struct PrewhereInfo;
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;

class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

struct PrewhereExprStep
{
    enum Type
    {
        Filter,
        Expression,
    };

    Type type = Type::Filter;
    ExpressionActionsPtr actions;
    String filter_column_name;

    bool remove_filter_column = false;
    bool need_filter = false;

    /// Some PREWHERE steps should be executed without conversions.
    /// A step without alter conversions cannot be executed after a step with alter conversions.
    bool perform_alter_conversions = false;
};

using PrewhereExprStepPtr = std::shared_ptr<PrewhereExprStep>;
using PrewhereExprSteps = std::vector<PrewhereExprStepPtr>;

/// The same as PrewhereInfo, but with ExpressionActions instead of ActionsDAG
struct PrewhereExprInfo
{
    PrewhereExprSteps steps;

    std::string dump() const;
    std::string dumpConditions() const;
};
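
/// Illustrative sketch (not part of this header): building a single-step
/// PREWHERE chain from the structs above. `filter_actions` and the filter
/// column name are hypothetical and assumed to be produced elsewhere.
///
///     auto step = std::make_shared<PrewhereExprStep>();
///     step->type = PrewhereExprStep::Filter;
///     step->actions = filter_actions;          /// assumed: built from the query's ActionsDAG
///     step->filter_column_name = "_filter";    /// hypothetical column produced by `actions`
///     step->remove_filter_column = true;
///
///     PrewhereExprInfo prewhere;
///     prewhere.steps.push_back(step);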

class FilterWithCachedCount
{
    ConstantFilterDescription const_description;  /// TODO: ConstantFilterDescription only checks always true/false for const columns
                                                  /// think about how to handle the case when the column is not const but contains all 0s or all 1s
    ColumnPtr column = nullptr;
    const IColumn::Filter * data = nullptr;
    mutable size_t cached_count_bytes = -1;  /// Lazily cached popcount of the filter; size_t(-1) means not yet computed.

public:
    explicit FilterWithCachedCount() = default;

    explicit FilterWithCachedCount(const ColumnPtr & column_)
        : const_description(*column_)
    {
        ColumnPtr col = column_->convertToFullIfNeeded();
        FilterDescription desc(*col);
        column = desc.data_holder ? desc.data_holder : col;
        data = desc.data;
    }

    bool present() const { return !!column; }

    bool alwaysTrue() const { return const_description.always_true; }
    bool alwaysFalse() const { return const_description.always_false; }

    ColumnPtr getColumn() const { return column; }

    const IColumn::Filter & getData() const { return *data; }

    size_t size() const { return column->size(); }

    size_t countBytesInFilter() const
    {
        if (cached_count_bytes == size_t(-1))
            cached_count_bytes = DB::countBytesInFilter(*data);
        return cached_count_bytes;
    }
};
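
/// Illustrative usage sketch (the `filter_column` is assumed to be the result
/// of a previously executed filter expression):
///
///     FilterWithCachedCount filter(filter_column);
///     if (filter.present() && !filter.alwaysTrue())
///     {
///         /// The first call computes the popcount; later calls reuse the cached value.
///         size_t rows_that_pass = filter.countBytesInFilter();
///     }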

/// MergeTreeReader iterator that allows sequential reading of an arbitrary number of rows between pairs of marks in the same part.
/// Stores the reading state, which can be inside a granule. Can skip rows in the current granule and start reading from the next mark.
/// Generally used for reading a number of rows smaller than the index granularity to decrease cache misses for fat blocks.
class MergeTreeRangeReader
{
public:
    MergeTreeRangeReader(
        IMergeTreeReader * merge_tree_reader_,
        MergeTreeRangeReader * prev_reader_,
        const PrewhereExprStep * prewhere_info_,
        bool last_reader_in_chain_,
        const Names & non_const_virtual_column_names);

    MergeTreeRangeReader() = default;
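
    /// Illustrative chain construction (a sketch; `reader`, `steps`, `virtual_names`
    /// and the helper are hypothetical): each PREWHERE step gets its own range reader
    /// that reads from the previous one, and the last reader in the chain reads the
    /// remaining columns.
    ///
    ///     MergeTreeRangeReader * prev = nullptr;
    ///     for (const auto & step : steps)
    ///         prev = makePrewhereRangeReader(*step, prev);   /// hypothetical helper
    ///     MergeTreeRangeReader main_reader(reader, prev, nullptr, /* last_reader_in_chain */ true, virtual_names);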

    bool isReadingFinished() const;

    size_t numReadRowsInCurrentGranule() const;
    size_t numPendingRowsInCurrentGranule() const;
    size_t numRowsInCurrentGranule() const;
    size_t currentMark() const;

    bool isCurrentRangeFinished() const;
    bool isInitialized() const { return is_initialized; }

private:
    /// Accumulates sequential read() requests to perform a large read instead of multiple small reads
    class DelayedStream
    {
    public:
        DelayedStream() = default;
        DelayedStream(size_t from_mark, size_t current_task_last_mark_, IMergeTreeReader * merge_tree_reader);

        /// Read @num_rows rows from @from_mark starting from @offset row.
        /// Returns the number of rows added to the block.
        /// NOTE: we have to return the number of rows because the block may have a broken invariant:
        ///       some columns may have different sizes (for example, default columns may be of zero size).
        size_t read(Columns & columns, size_t from_mark, size_t offset, size_t num_rows);

        /// Skip extra rows up to current_offset and perform the actual read.
        size_t finalize(Columns & columns);

        bool isFinished() const { return is_finished; }

    private:
        size_t current_mark = 0;
        /// Offset from current mark in rows
        size_t current_offset = 0;
        /// Num of rows we have to read
        size_t num_delayed_rows = 0;
        /// Last mark from all ranges of current task.
        size_t current_task_last_mark = 0;

        /// Actual reader of data from disk
        IMergeTreeReader * merge_tree_reader = nullptr;
        const MergeTreeIndexGranularity * index_granularity = nullptr;
        bool continue_reading = false;
        bool is_finished = true;

        /// Current position, in rows, from the beginning of the file.
        size_t position() const;
        size_t readRows(Columns & columns, size_t num_rows);
    };
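
    /// Illustrative call pattern (mark numbers and row counts are made up):
    /// sequential read() requests are only recorded; the actual disk read is
    /// deferred until the positions stop being contiguous or finalize() is called.
    ///
    ///     DelayedStream delayed(from_mark, current_task_last_mark, reader);
    ///     size_t added = 0;
    ///     added += delayed.read(columns, mark, /* offset */ 0, /* num_rows */ 100);
    ///     added += delayed.read(columns, mark, /* offset */ 100, /* num_rows */ 50);  /// merged with the previous request
    ///     added += delayed.finalize(columns);  /// performs the accumulated read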

    /// A very thin wrapper around DelayedStream.
    /// Checks the bounds of read ranges and steps between marks.
    class Stream
    {
    public:
        Stream() = default;
        Stream(size_t from_mark, size_t to_mark,
               size_t current_task_last_mark, IMergeTreeReader * merge_tree_reader);

        /// Returns the number of rows added to block.
        size_t read(Columns & columns, size_t num_rows, bool skip_remaining_rows_in_current_granule);
        size_t finalize(Columns & columns);
        void skip(size_t num_rows);

        void finish() { current_mark = last_mark; }
        bool isFinished() const { return current_mark >= last_mark; }

        size_t numReadRowsInCurrentGranule() const { return offset_after_current_mark; }
        size_t numPendingRowsInCurrentGranule() const
        {
            return current_mark_index_granularity - numReadRowsInCurrentGranule();
        }
        size_t numPendingGranules() const { return last_mark - current_mark; }
        size_t numPendingRows() const;
        size_t currentMark() const { return current_mark; }
        UInt64 currentPartOffset() const;
        UInt64 lastPartOffset() const;

        size_t current_mark = 0;
        /// Invariant: offset_after_current_mark + skipped_rows_after_offset < index_granularity
        size_t offset_after_current_mark = 0;

        /// Last mark in current range.
        size_t last_mark = 0;

        IMergeTreeReader * merge_tree_reader = nullptr;
        const MergeTreeIndexGranularity * index_granularity = nullptr;

        size_t current_mark_index_granularity = 0;

        DelayedStream stream;

        void checkNotFinished() const;
        void checkEnoughSpaceInCurrentGranule(size_t num_rows) const;
        size_t readRows(Columns & columns, size_t num_rows);
        void toNextMark();
        size_t ceilRowsToCompleteGranules(size_t rows_num) const;
    };
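
    /// Illustrative stepping sketch (a simplified loop, assuming read()
    /// advances to the next mark once the current granule is exhausted):
    ///
    ///     Stream s(from_mark, to_mark, current_task_last_mark, reader);
    ///     while (!s.isFinished())
    ///         s.read(columns, s.numPendingRowsInCurrentGranule(),
    ///                /* skip_remaining_rows_in_current_granule */ false);
    ///     s.finalize(columns);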

public:
    /// The result of one reading step: the columns read plus statistics about the read.
    class ReadResult
    {
    public:
        Columns columns;
        size_t num_rows = 0;

        /// The number of rows that were added to the block as a result of the reading chain.
        size_t numReadRows() const { return num_read_rows; }
        /// The number of bytes read from disk.
        size_t numBytesRead() const { return num_bytes_read; }

    private:
        /// Only MergeTreeRangeReader is supposed to access ReadResult internals.
        friend class MergeTreeRangeReader;

        using NumRows = std::vector<size_t>;

        struct RangeInfo
        {
            size_t num_granules_read_before_start;
            MarkRange range;
        };

        using RangesInfo = std::vector<RangeInfo>;

        explicit ReadResult(Poco::Logger * log_) : log(log_) {}

        static size_t getLastMark(const MergeTreeRangeReader::ReadResult::RangesInfo & ranges);

        void addGranule(size_t num_rows_);
        void adjustLastGranule();
        void addRows(size_t rows) { num_read_rows += rows; }
        void addRange(const MarkRange & range) { started_ranges.push_back({rows_per_granule.size(), range}); }

        /// Add the current step's filter to the result, then for each granule count the filtered-out rows at its tail.
        /// Remove those rows and update the filter.
        /// Apply the filter to the columns and update num_rows if required.
        void optimize(const FilterWithCachedCount & current_filter, bool can_read_incomplete_granules);
        /// Remove all rows from granules.
        void clear();

        void setFilterConstTrue();

        void addNumBytesRead(size_t count) { num_bytes_read += count; }

        /// Shrinks columns according to the diff between current and previous rows_per_granule.
        void shrink(Columns & old_columns, const NumRows & rows_per_granule_previous) const;

        /// Applies the filter to the columns and updates num_rows.
        void applyFilter(const FilterWithCachedCount & filter);

        /// Verifies that column and filter sizes match.
        /// The checks might be non-trivial, so it makes sense to run them only in debug builds.
        void checkInternalConsistency() const;

        std::string dumpInfo() const;

        /// Contains columns that are not included in the result but might be needed for calculating default values.
        Block additional_columns;

        RangesInfo started_ranges;
        /// The number of rows read from each granule.
        /// A granule here is not the number of rows between two marks;
        /// it's the number of rows per single reading act.
        NumRows rows_per_granule;
        /// Sum(rows_per_granule)
        size_t total_rows_per_granule = 0;
        /// The number of rows read at the first step. May be zero if no columns to read are present in the part.
        size_t num_read_rows = 0;
        /// The number of rows removed from the last granule after clear() or optimize().
        size_t num_rows_to_skip_in_last_granule = 0;
        /// Without any filtration.
        size_t num_bytes_read = 0;

        /// This filter has the size of total_rows_per_granule. This means that it can be applied to newly read columns.
        /// The result of applying this filter is that only rows that pass all previous filtering steps will remain.
        FilterWithCachedCount final_filter;

        /// This flag is true when the prewhere column can be returned without filtering.
        /// That is the case when it contains 0s from all filtering steps (not just the step at which it was calculated).
        /// NOTE: if we accumulate the final_filter over several steps without applying it, the prewhere column
        /// calculated at the last step will not contain 0s from all previous steps.
        bool can_return_prewhere_column_without_filtering = true;

        /// Checks if result columns have current final_filter applied.
        bool filterWasApplied() const { return !final_filter.present() || final_filter.countBytesInFilter() == num_rows; }

        /// Builds an updated filter by cutting off zeros at granule tails.
        void collapseZeroTails(const IColumn::Filter & filter, const NumRows & rows_per_granule_previous, IColumn::Filter & new_filter) const;
        size_t countZeroTails(const IColumn::Filter & filter, NumRows & zero_tails, bool can_read_incomplete_granules) const;
        static size_t numZerosInTail(const UInt8 * begin, const UInt8 * end);

        Poco::Logger * log;
    };

    ReadResult read(size_t max_rows, MarkRanges & ranges);
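
    /// Illustrative read loop (a sketch of how a caller might drive the
    /// reader; `max_rows` and `ranges` are assumed to come from the read task):
    ///
    ///     while (!ranges.empty() || !range_reader.isCurrentRangeFinished())
    ///     {
    ///         auto result = range_reader.read(max_rows, ranges);
    ///         if (result.num_rows == 0)
    ///             break;
    ///         /// result.columns match getSampleBlock() and contain result.num_rows rows.
    ///     }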

    const Block & getSampleBlock() const { return result_sample_block; }

private:
    ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
    Columns continueReadingChain(const ReadResult & result, size_t & num_rows);
    void executePrewhereActionsAndFilterColumns(ReadResult & result) const;
    void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset);

    IMergeTreeReader * merge_tree_reader = nullptr;
    const MergeTreeIndexGranularity * index_granularity = nullptr;
    MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader first.
    const PrewhereExprStep * prewhere_info;

    Stream stream;

    Block read_sample_block;    /// Block with columns that are actually read from disk + non-const virtual columns that are filled at this step.
    Block result_sample_block;  /// Block with columns that are returned by this step.

    bool last_reader_in_chain = false;
    bool is_initialized = false;
    Names non_const_virtual_column_names;

    Poco::Logger * log = &Poco::Logger::get("MergeTreeRangeReader");
};

}