path: root/contrib/clickhouse/src/Storages/MergeTree/MergeTreeBlockReadUtils.h
#pragma once

#include <optional>
#include <Core/NamesAndTypes.h>
#include <Storages/StorageSnapshot.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/AlterConversions.h>


namespace DB
{

class MergeTreeData;
struct MergeTreeReadTask;
struct MergeTreeReaderSettings;
struct MergeTreeBlockSizePredictor;
class IMergeTreeDataPartInfoForReader;

using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
using MergeTreeBlockSizePredictorPtr = std::shared_ptr<MergeTreeBlockSizePredictor>;

class IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;


/** If some of the requested columns are not in the part,
  * determine which additional columns may need to be read,
  * so that the DEFAULT expressions for the missing columns can be calculated.
  * Adds them to `columns`.
  */
NameSet injectRequiredColumns(
    const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
    const StorageSnapshotPtr & storage_snapshot,
    bool with_subcolumns,
    Names & columns);
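
/// Illustrative sketch of a hypothetical caller (names `part_info`, `snapshot` and the
/// requested columns are placeholders, not taken from this header):
///
///     Names requested = {"value", "value_with_default"};
///     NameSet injected = injectRequiredColumns(part_info, snapshot, /*with_subcolumns=*/ false, requested);
///     /// `requested` is extended with the columns needed to evaluate DEFAULT expressions
///     /// for requested columns that are missing from the part; the returned set presumably
///     /// contains the names of the columns that were added.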

struct MergeTreeReadTaskColumns
{
    /// column names to read during WHERE
    NamesAndTypesList columns;
    /// column names to read during each PREWHERE step
    std::vector<NamesAndTypesList> pre_columns;

    std::string dump() const;
};
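
/// Conceptual example (an assumption for illustration only): for a query such as
///     SELECT key, value FROM t PREWHERE filter_col > 0 WHERE value != ''
/// the split could look roughly like
///     pre_columns = { {filter_col} }   -- one entry per PREWHERE step
///     columns     = { key, value }     -- read for the remaining WHERE/SELECT stage
/// dump() renders this split as a string, e.g. for debug logging.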

/// A batch of work for MergeTreeThreadSelectProcessor
struct MergeTreeReadTask
{
    /// Data part which should be read while performing this task
    DataPartPtr data_part;
    /// Alter conversions that should be applied on the fly for this part.
    AlterConversionsPtr alter_conversions;
    /// Ranges to read from `data_part`.
    MarkRanges mark_ranges;
    /// For the `part_index` virtual column.
    size_t part_index_in_query;
    /// Used to determine whether a column should be filtered during PREWHERE or WHERE.
    const NameSet & column_name_set;
    /// column names to read during PREWHERE and WHERE
    const MergeTreeReadTaskColumns & task_columns;
    /// Used to satisfy the preferred_block_size_bytes limitation.
    MergeTreeBlockSizePredictorPtr size_predictor;
    /// Used to save current range processing status
    MergeTreeRangeReader range_reader;
    /// Range readers for multiple filtering steps: row level security, PREWHERE etc.
    /// NOTE: we keep references to existing elements while push_back()-ing new ones;
    /// a deque (unlike a vector) does not invalidate those references on push_back, hence a deque and not a vector.
    std::deque<MergeTreeRangeReader> pre_range_readers;

    using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
    std::future<MergeTreeReaderPtr> reader;
    std::vector<std::future<MergeTreeReaderPtr>> pre_reader_for_step;

    Priority priority;

    bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); }

    MergeTreeReadTask(
        const DataPartPtr & data_part_,
        const AlterConversionsPtr & alter_conversions_,
        const MarkRanges & mark_ranges_,
        size_t part_index_in_query_,
        const NameSet & column_name_set_,
        const MergeTreeReadTaskColumns & task_columns_,
        MergeTreeBlockSizePredictorPtr size_predictor_,
        Priority priority_ = {},
        std::future<MergeTreeReaderPtr> reader_ = {},
        std::vector<std::future<MergeTreeReaderPtr>> && pre_reader_for_step_ = {});

    ~MergeTreeReadTask();
};
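
/// Minimal sketch of how a reading thread might drive a task, based only on the members
/// declared above (the real MergeTreeThreadSelectProcessor logic is more involved):
///
///     MergeTreeReadTaskPtr task = /* obtained from the shared task pool */;
///     while (!task->isFinished())
///     {
///         /// pre_range_readers apply the filtering steps (row-level security, PREWHERE),
///         /// then range_reader consumes marks from task->mark_ranges and produces blocks;
///         /// size_predictor, when present, suggests how many rows to request so that the
///         /// resulting block stays close to preferred_block_size_bytes.
///     }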


MergeTreeReadTaskColumns getReadTaskColumns(
    const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
    const StorageSnapshotPtr & storage_snapshot,
    const Names & required_columns,
    const Names & system_columns,
    const PrewhereInfoPtr & prewhere_info,
    const ExpressionActionsSettings & actions_settings,
    const MergeTreeReaderSettings & reader_settings,
    bool with_subcolumns);

struct MergeTreeBlockSizePredictor
{
    MergeTreeBlockSizePredictor(const DataPartPtr & data_part_, const Names & columns, const Block & sample_block);

    /// Reset some values for correct statistics calculation.
    void startBlock();

    /// Updates statistics for a more accurate prediction.
    void update(const Block & sample_block, const Columns & columns, size_t num_rows, double decay = calculateDecay());

    /// Return current block size (after update())
    inline size_t getBlockSize() const
    {
        return block_size_bytes;
    }


    /// Predicts what number of rows should be read to exhaust byte quota per column
    inline size_t estimateNumRowsForMaxSizeColumn(size_t bytes_quota) const
    {
        double max_size_per_row = std::max<double>(std::max<size_t>(max_size_per_row_fixed, 1), max_size_per_row_dynamic);
        return (bytes_quota > block_size_rows * max_size_per_row)
            ? static_cast<size_t>(bytes_quota / max_size_per_row) - block_size_rows
            : 0;
    }
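
    /// Worked example (illustrative numbers): with max_size_per_row = 100 bytes/row,
    /// block_size_rows = 1000 rows already accumulated and bytes_quota = 200000,
    /// the quota fits 200000 / 100 = 2000 rows in total, so the estimate is
    /// 2000 - 1000 = 1000 additional rows; if the quota is already exhausted, 0 is returned.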

    /// Predicts what number of rows should be read to exhaust byte quota per block
    inline size_t estimateNumRows(size_t bytes_quota) const
    {
        return (bytes_quota > block_size_bytes)
            ? static_cast<size_t>((bytes_quota - block_size_bytes) / std::max<size_t>(1, static_cast<size_t>(bytes_per_row_current)))
            : 0;
    }
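
    /// Worked example (illustrative numbers): with block_size_bytes = 300000 already read,
    /// bytes_quota = 1000000 and bytes_per_row_current = 70, the remaining budget is
    /// 700000 bytes, which suggests 700000 / 70 = 10000 more rows; once block_size_bytes
    /// reaches the quota, 0 is returned.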

    inline void updateFilteredRowsRation(size_t rows_was_read, size_t rows_was_filtered, double decay = calculateDecay())
    {
        double alpha = std::pow(1. - decay, rows_was_read);
        double current_ration = rows_was_filtered / std::max(1.0, static_cast<double>(rows_was_read));
        filtered_rows_ratio = current_ration < filtered_rows_ratio
            ? current_ration
            : alpha * filtered_rows_ratio + (1.0 - alpha) * current_ration;
    }

    /// Aggressiveness of bytes_per_row updates. See update() implementation.
    /// After n=NUM_UPDATES_TO_TARGET_WEIGHT updates v_{n} = (1 - TARGET_WEIGHT) * v_{0} + TARGET_WEIGHT * v_{target}
    static constexpr double TARGET_WEIGHT = 0.5;
    static constexpr size_t NUM_UPDATES_TO_TARGET_WEIGHT = 8192;
    static double calculateDecay() { return 1. - std::pow(TARGET_WEIGHT, 1. / NUM_UPDATES_TO_TARGET_WEIGHT); }
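
    /// Why this formula works: each averaging step multiplies the weight of the old value
    /// by (1 - decay). Choosing decay = 1 - TARGET_WEIGHT^(1/N) with
    /// N = NUM_UPDATES_TO_TARGET_WEIGHT gives (1 - decay)^N = TARGET_WEIGHT, so after N steps
    /// the old value retains a weight of TARGET_WEIGHT = 0.5 and the new target contributes
    /// the remaining 0.5 (with TARGET_WEIGHT = 0.5 both weights in the comment above coincide).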

protected:

    DataPartPtr data_part;

    struct ColumnInfo
    {
        String name;
        double bytes_per_row_global = 0;
        double bytes_per_row = 0;
        size_t size_bytes = 0;
    };

    std::vector<ColumnInfo> dynamic_columns_infos;
    size_t fixed_columns_bytes_per_row = 0;

    size_t max_size_per_row_fixed = 0;
    double max_size_per_row_dynamic = 0;

    size_t number_of_rows_in_part;

    bool is_initialized_in_update = false;

    void initialize(const Block & sample_block, const Columns & columns, const Names & names, bool from_update = false);

public:

    size_t block_size_bytes = 0;
    size_t block_size_rows = 0;

    /// Total statistics
    double bytes_per_row_current = 0;
    double bytes_per_row_global = 0;

    double filtered_rows_ratio = 0;
};
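
/// Minimal usage sketch (illustration only; `part`, `column_names`, `sample`, `columns`,
/// `rows_read` and `preferred_block_size_bytes` are placeholders supplied by the caller):
///
///     MergeTreeBlockSizePredictor predictor(part, column_names, sample);
///     predictor.startBlock();
///     size_t rows_to_read = predictor.estimateNumRows(preferred_block_size_bytes);
///     /// ... read up to `rows_to_read` rows into `columns` ...
///     predictor.update(sample, columns, rows_read);
///     /// subsequent estimates reflect the refined bytes_per_row statistics.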

}