1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
|
#pragma once
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <Processors/Chunk.h>
namespace DB
{
/// Forward declarations.
/// IMergeTreeReader and PrewhereExprInfo are named directly by the declarations below.
/// UncompressedCache and MarkCache presumably back the UncompressedCachePtr / MarkCachePtr
/// aliases used by the class members — confirm.
/// NOTE(review): IMergeTreeReader.h is already included above, so the first declaration looks redundant — confirm.
class IMergeTreeReader;
class UncompressedCache;
class MarkCache;
struct PrewhereExprInfo;
/// A chunk together with the read-progress counters that accompany it.
/// This is the return type of IMergeTreeSelectAlgorithm::read().
struct ChunkAndProgress
{
    Chunk chunk;
    size_t num_read_rows{0};
    size_t num_read_bytes{0};

    /// Set explicitly once all data has been read.
    /// The flag is needed because an empty chunk is occasionally returned only to report progress
    /// while rows are being filtered out in PREWHERE.
    bool is_finished{false};
};
/// Two callbacks plus replica bookkeeping: how many replicas participate and which number this one has.
/// NOTE(review): presumably used for parallel reading from replicas — confirm against the callers.
struct ParallelReadingExtension
{
    MergeTreeAllRangesCallback all_callback;
    MergeTreeReadTaskCallback callback;
    size_t count_participating_replicas = 0;
    size_t number_of_current_replica = 0;

    /// Needed to estimate the number of bytes between a pair of marks,
    /// so that one request over the network is performed per 1 GB of data.
    Names columns_to_read;
};
/// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm.
///
/// Exposes the public reading interface (read(), cancel(), getHeader(), getSettings()) and leaves
/// task acquisition and cleanup to subclasses through the pure virtual hooks
/// getNewTaskImpl(), finalizeNewTask(), finish() and getName().
/// Member functions without a body here are only declared; their definitions are not in this header.
class IMergeTreeSelectAlgorithm
{
public:
/// Most arguments with a trailing underscore have a same-named member below
/// (presumably they are stored there; the definition is not visible in this header).
/// `virt_column_names_` defaults to an empty list.
IMergeTreeSelectAlgorithm(
Block header,
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const ExpressionActionsSettings & actions_settings,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
const MergeTreeReaderSettings & reader_settings_,
bool use_uncompressed_cache_,
const Names & virt_column_names_ = {});
/// Virtual because the class is used polymorphically (see MergeTreeSelectAlgorithmPtr).
virtual ~IMergeTreeSelectAlgorithm();
/// Derives a header block from `block`; takes the block by value and returns a new one.
/// NOTE(review): presumably applies PREWHERE and adds the virtual columns — confirm in the definition.
static Block transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const DataTypePtr & partition_value_type, const Names & virtual_columns);
/// Returns an owning pointer to a MergeTreeBlockSizePredictor for a data part.
/// Whether the result can be null is not visible from this header.
static std::unique_ptr<MergeTreeBlockSizePredictor> getSizePredictor(
const MergeTreeData::DataPartPtr & data_part,
const MergeTreeReadTaskColumns & task_columns,
const Block & sample_block);
/// Returns a copy of result_header (the return type is Block by value).
Block getHeader() const { return result_header; }
/// Returns a chunk together with progress counters; see ChunkAndProgress.
ChunkAndProgress read();
/// Sets the atomic cancellation flag. Where the flag is checked is defined out of line.
void cancel() { is_cancelled = true; }
/// Returns a reference to the reader settings held by this object.
const MergeTreeReaderSettings & getSettings() const { return reader_settings; }
/// Name of the concrete algorithm; must be provided by subclasses.
virtual std::string getName() const = 0;
/// Returns the PrewhereExprInfo for the given PREWHERE description and settings; defined out of line.
static PrewhereExprInfo getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings, bool enable_multiple_prewhere_read_steps);
protected:
/// This struct allows returning a block with no columns but with a non-zero number of rows, similar to Chunk.
struct BlockAndProgress
{
Block block;
/// Number of rows, kept separately because `block` may have no columns.
size_t row_count = 0;
size_t num_read_rows = 0;
size_t num_read_bytes = 0;
};
/// Creates a new this->task and returns a flag indicating whether it was successful.
virtual bool getNewTaskImpl() = 0;
/// Creates new readers for a task if needed. These methods are separate because,
/// in case of parallel reading from replicas, the whole task could be denied by a coordinator
/// or it could be modified somehow.
virtual void finalizeNewTask() = 0;
/// Defined out of line.
/// NOTE(review): presumably bounds the batch size when reading huge mark ranges — confirm.
size_t estimateMaxBatchSizeForHugeRanges();
/// Closes readers and unlocks part locks.
virtual void finish() = 0;
/// Virtual, so subclasses may override it. readFromPartImpl() is the non-virtual counterpart
/// (presumably the shared implementation — confirm in the definition).
virtual BlockAndProgress readFromPart();
BlockAndProgress readFromPartImpl();
/// Used for filling a header with no rows as well as a block with data.
static void
injectVirtualColumns(Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
/// NOTE(review): this access specifier repeats the `protected:` above and is redundant.
protected:
/// Static helper: receives the readers, PREWHERE actions and settings explicitly as arguments.
static void initializeRangeReadersImpl(
MergeTreeRangeReader & range_reader,
std::deque<MergeTreeRangeReader> & pre_range_readers,
const PrewhereExprInfo & prewhere_actions,
IMergeTreeReader * reader,
bool has_lightweight_delete,
const MergeTreeReaderSettings & reader_settings,
const std::vector<std::unique_ptr<IMergeTreeReader>> & pre_reader_for_step,
const PrewhereExprStep & lightweight_delete_filter_step,
const Names & non_const_virtual_column_names);
/// Sets up data readers for each step of PREWHERE and WHERE.
void initializeMergeTreeReadersForCurrentTask(
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
/// Same kind of setup, but with the part, columns and mark ranges passed explicitly as arguments.
void initializeMergeTreeReadersForPart(
const MergeTreeData::DataPartPtr & data_part,
const AlterConversionsPtr & alter_conversions,
const MergeTreeReadTaskColumns & task_columns,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
/// Sets up range readers corresponding to the data readers.
void initializeRangeReaders(MergeTreeReadTask & task);
/// Held by reference: the storage object must outlive this algorithm.
const MergeTreeData & storage;
StorageSnapshotPtr storage_snapshot;
/// This step is added when the part has a lightweight delete mask.
/// It carries no actions of its own (`actions = nullptr`) and names
/// LightweightDeleteDescription::FILTER_COLUMN as its filter column.
const PrewhereExprStep lightweight_delete_filter_step
{
.type = PrewhereExprStep::Filter,
.actions = nullptr,
.filter_column_name = LightweightDeleteDescription::FILTER_COLUMN.name,
.remove_filter_column = true,
.need_filter = true,
.perform_alter_conversions = true,
};
PrewhereInfoPtr prewhere_info;
ExpressionActionsSettings actions_settings;
PrewhereExprInfo prewhere_actions;
UInt64 max_block_size_rows;
UInt64 preferred_block_size_bytes;
UInt64 preferred_max_column_in_block_size_bytes;
MergeTreeReaderSettings reader_settings;
bool use_uncompressed_cache;
Names virt_column_names;
/// These columns will be filled by the merge tree range reader.
Names non_const_virtual_column_names;
DataTypePtr partition_value_type;
/// This header is used for chunks from readFromPart().
Block header_without_const_virtual_columns;
/// The result of getHeader(). Chunks with this header are returned from read().
Block result_header;
UncompressedCachePtr owned_uncompressed_cache;
MarkCachePtr owned_mark_cache;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
/// Owned reader, plus one owned reader per entry of pre_reader_for_step.
MergeTreeReaderPtr reader;
std::vector<MergeTreeReaderPtr> pre_reader_for_step;
/// The current task; created by getNewTaskImpl().
MergeTreeReadTaskPtr task;
/// This setting is used in the base algorithm only to additionally limit the number of granules to read.
/// It is changed in the ctor of MergeTreeThreadSelectAlgorithm.
///
/// The reason why we have it here is that MergeTreeReadPool takes the full task,
/// ignoring the min_marks_to_read setting in case of a remote disk (see MergeTreeReadPool::getTask).
/// In this case, we won't limit the number of rows to read based on adaptive granularity settings.
///
/// Big reading tasks are better for remote disks and prefetches.
/// So, for now it's easier to limit max_rows_to_read.
/// Somebody needs to refactor this later.
size_t min_marks_to_read = 0;
private:
/// NOTE(review): the logger name differs from the class name — presumably kept from an earlier name.
/// It is a runtime string that appears in log output, so confirm before changing it.
Poco::Logger * log = &Poco::Logger::get("MergeTreeBaseSelectProcessor");
/// Set to true by cancel().
std::atomic<bool> is_cancelled{false};
/// Non-virtual counterpart of getNewTaskImpl() (presumably wraps it — confirm in the definition).
bool getNewTask();
/// Initialize pre readers.
void initializeMergeTreePreReadersForPart(
const MergeTreeData::DataPartPtr & data_part,
const AlterConversionsPtr & alter_conversions,
const MergeTreeReadTaskColumns & task_columns,
const MarkRanges & mark_ranges,
const IMergeTreeReader::ValueSizeMap & value_size_map,
const ReadBufferFromFileBase::ProfileCallback & profile_callback);
/// Takes the block by value and returns a new block; defined out of line.
static Block applyPrewhereActions(Block block, const PrewhereInfoPtr & prewhere_info);
};
/// Owning (unique) pointer to a select algorithm, held through the base class.
using MergeTreeSelectAlgorithmPtr = std::unique_ptr<IMergeTreeSelectAlgorithm>;
}
|