1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
|
#include <algorithm>
#include <memory>
#include <numeric>
#include <queue>
#include <unordered_map>
#include <vector>
#include <Core/Field.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/QueryPlan/PartsSplitter.h>
#include <Processors/Transforms/FilterSortedStreamByRange.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
using namespace DB;
namespace
{
using Values = std::vector<Field>;
std::string toString(const Values & value)
{
return fmt::format("({})", fmt::join(value, ", "));
}
/// Adaptor to access PK values from index.
class IndexAccess
{
public:
explicit IndexAccess(const RangesInDataParts & parts_) : parts(parts_) { }
Values getValue(size_t part_idx, size_t mark) const
{
const auto & index = parts[part_idx].data_part->index;
Values values(index.size());
for (size_t i = 0; i < values.size(); ++i)
index[i]->get(mark, values[i]);
return values;
}
size_t getMarkRows(size_t part_idx, size_t mark) const { return parts[part_idx].data_part->index_granularity.getMarkRows(mark); }
size_t getTotalRowCount() const
{
size_t total = 0;
for (const auto & part : parts)
total += part.getRowsCount();
return total;
}
private:
const RangesInDataParts & parts;
};
/// Splits parts into layers, each layer will contain parts subranges with PK values from its own range.
/// Will try to produce exactly max_layer layers but may return less if data is distributed in not a very parallelizable way.
std::pair<std::vector<Values>, std::vector<RangesInDataParts>> split(RangesInDataParts parts, size_t max_layers)
{
// We will advance the iterator pointing to the mark with the smallest PK value until there will be not less than rows_per_layer rows in the current layer (roughly speaking).
// Then we choose the last observed value as the new border, so the current layer will consists of granules with values greater than the previous mark and less or equal
// than the new border.
struct PartsRangesIterator
{
struct MarkRangeWithPartIdx : MarkRange
{
size_t part_idx;
};
enum class EventType
{
RangeStart,
RangeEnd,
};
[[ maybe_unused ]] bool operator<(const PartsRangesIterator & other) const { return std::tie(value, event) > std::tie(other.value, other.event); }
Values value;
MarkRangeWithPartIdx range;
EventType event;
};
const auto index_access = std::make_unique<IndexAccess>(parts);
std::priority_queue<PartsRangesIterator> parts_ranges_queue;
for (size_t part_idx = 0; part_idx < parts.size(); ++part_idx)
{
for (const auto & range : parts[part_idx].ranges)
{
parts_ranges_queue.push(
{index_access->getValue(part_idx, range.begin), {range, part_idx}, PartsRangesIterator::EventType::RangeStart});
const auto & index_granularity = parts[part_idx].data_part->index_granularity;
const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount();
if (value_is_defined_at_end_mark)
parts_ranges_queue.push(
{index_access->getValue(part_idx, range.end), {range, part_idx}, PartsRangesIterator::EventType::RangeEnd});
}
}
/// The beginning of currently started (but not yet finished) range of marks of a part in the current layer.
std::unordered_map<size_t, size_t> current_part_range_begin;
/// The current ending of a range of marks of a part in the current layer.
std::unordered_map<size_t, size_t> current_part_range_end;
/// Determine borders between layers.
std::vector<Values> borders;
std::vector<RangesInDataParts> result_layers;
const size_t rows_per_layer = std::max<size_t>(index_access->getTotalRowCount() / max_layers, 1);
while (!parts_ranges_queue.empty())
{
// New layer should include last granules of still open ranges from the previous layer,
// because they may already contain values greater than the last border.
size_t rows_in_current_layer = 0;
size_t marks_in_current_layer = 0;
// Intersection between the current and next layers is just the last observed marks of each still open part range. Ratio is empirical.
auto layers_intersection_is_too_big = [&]()
{
const auto intersected_parts = current_part_range_end.size();
return marks_in_current_layer < intersected_parts * 2;
};
auto & current_layer = result_layers.emplace_back();
/// Map part_idx into index inside layer, used to merge marks from the same part into one reader
std::unordered_map<size_t, size_t> part_idx_in_layer;
while (rows_in_current_layer < rows_per_layer || layers_intersection_is_too_big() || result_layers.size() == max_layers)
{
// We're advancing iterators until a new value showed up.
Values last_value;
while (!parts_ranges_queue.empty() && (last_value.empty() || last_value == parts_ranges_queue.top().value))
{
auto current = parts_ranges_queue.top();
parts_ranges_queue.pop();
const auto part_idx = current.range.part_idx;
if (current.event == PartsRangesIterator::EventType::RangeEnd)
{
const auto & mark = MarkRange{current_part_range_begin[part_idx], current.range.end};
auto it = part_idx_in_layer.emplace(std::make_pair(part_idx, current_layer.size()));
if (it.second)
current_layer.emplace_back(
parts[part_idx].data_part,
parts[part_idx].alter_conversions,
parts[part_idx].part_index_in_query,
MarkRanges{mark});
else
current_layer[it.first->second].ranges.push_back(mark);
current_part_range_begin.erase(part_idx);
current_part_range_end.erase(part_idx);
continue;
}
last_value = std::move(current.value);
rows_in_current_layer += index_access->getMarkRows(part_idx, current.range.begin);
marks_in_current_layer++;
current_part_range_begin.try_emplace(part_idx, current.range.begin);
current_part_range_end[part_idx] = current.range.begin;
if (current.range.begin + 1 < current.range.end)
{
current.range.begin++;
current.value = index_access->getValue(part_idx, current.range.begin);
parts_ranges_queue.push(std::move(current));
}
}
if (parts_ranges_queue.empty())
break;
if (rows_in_current_layer >= rows_per_layer && !layers_intersection_is_too_big() && result_layers.size() < max_layers)
borders.push_back(last_value);
}
for (const auto & [part_idx, last_mark] : current_part_range_end)
{
const auto & mark = MarkRange{current_part_range_begin[part_idx], last_mark + 1};
auto it = part_idx_in_layer.emplace(std::make_pair(part_idx, current_layer.size()));
if (it.second)
result_layers.back().emplace_back(
parts[part_idx].data_part,
parts[part_idx].alter_conversions,
parts[part_idx].part_index_in_query,
MarkRanges{mark});
else
current_layer[it.first->second].ranges.push_back(mark);
current_part_range_begin[part_idx] = current_part_range_end[part_idx];
}
}
for (auto & layer : result_layers)
{
std::stable_sort(
layer.begin(),
layer.end(),
[](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });
}
return {std::move(borders), std::move(result_layers)};
}
/// Will return borders.size()+1 filters in total, i-th filter will accept rows with PK values within the range [borders[i-1], borders[i]).
ASTs buildFilters(const KeyDescription & primary_key, const std::vector<Values> & borders)
{
auto add_and_condition = [&](ASTPtr & result, const ASTPtr & foo) { result = (!result) ? foo : makeASTFunction("and", result, foo); };
/// Produces ASTPtr to predicate (pk_col0, pk_col1, ... , pk_colN) > (value[0], value[1], ... , value[N]), possibly with conversions.
/// For example, if table PK is (a, toDate(d)), where `a` is UInt32 and `d` is DateTime, and PK columns values are (8192, 19160),
/// it will build the following predicate: greater(tuple(a, toDate(d)), tuple(8192, cast(19160, 'Date'))).
auto lexicographically_greater = [&](const Values & value)
{
// PK may contain functions of the table columns, so we need the actual PK AST with all expressions it contains.
ASTPtr pk_columns_as_tuple = makeASTFunction("tuple", primary_key.expression_list_ast->children);
ASTPtr value_ast = std::make_shared<ASTExpressionList>();
for (size_t i = 0; i < value.size(); ++i)
{
const auto & types = primary_key.data_types;
ASTPtr component_ast = std::make_shared<ASTLiteral>(value[i]);
// Values of some types (e.g. Date, DateTime) are stored in columns as numbers and we get them as just numbers from the index.
// So we need an explicit Cast for them.
if (isColumnedAsNumber(types.at(i)->getTypeId()) && !isNumber(types.at(i)->getTypeId()))
component_ast = makeASTFunction("cast", std::move(component_ast), std::make_shared<ASTLiteral>(types.at(i)->getName()));
value_ast->children.push_back(std::move(component_ast));
}
ASTPtr values_as_tuple = makeASTFunction("tuple", value_ast->children);
return makeASTFunction("greater", pk_columns_as_tuple, values_as_tuple);
};
ASTs filters(borders.size() + 1);
for (size_t layer = 0; layer <= borders.size(); ++layer)
{
if (layer > 0)
add_and_condition(filters[layer], lexicographically_greater(borders[layer - 1]));
if (layer < borders.size())
add_and_condition(filters[layer], makeASTFunction("not", lexicographically_greater(borders[layer])));
}
return filters;
}
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static void reorderColumns(ActionsDAG & dag, const Block & header, const std::string & filter_column)
{
std::unordered_map<std::string_view, const ActionsDAG::Node *> inputs_map;
for (const auto * input : dag.getInputs())
inputs_map[input->result_name] = input;
for (const auto & col : header)
{
auto & input = inputs_map[col.name];
if (!input)
input = &dag.addInput(col);
}
ActionsDAG::NodeRawConstPtrs new_outputs;
new_outputs.reserve(header.columns() + 1);
new_outputs.push_back(&dag.findInOutputs(filter_column));
for (const auto & col : header)
{
auto & input = inputs_map[col.name];
new_outputs.push_back(input);
}
dag.getOutputs() = std::move(new_outputs);
}
Pipes buildPipesForReadingByPKRanges(
const KeyDescription & primary_key,
ExpressionActionsPtr sorting_expr,
RangesInDataParts parts,
size_t max_layers,
ContextPtr context,
ReadingInOrderStepGetter && reading_step_getter)
{
if (max_layers <= 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layer should be greater than 1.");
auto && [borders, result_layers] = split(std::move(parts), max_layers);
auto filters = buildFilters(primary_key, borders);
Pipes pipes(result_layers.size());
for (size_t i = 0; i < result_layers.size(); ++i)
{
pipes[i] = reading_step_getter(std::move(result_layers[i]));
pipes[i].addSimpleTransform([sorting_expr](const Block & header)
{ return std::make_shared<ExpressionTransform>(header, sorting_expr); });
auto & filter_function = filters[i];
if (!filter_function)
continue;
auto syntax_result = TreeRewriter(context).analyze(filter_function, primary_key.expression->getRequiredColumnsWithTypes());
auto actions = ExpressionAnalyzer(filter_function, syntax_result, context).getActionsDAG(false);
reorderColumns(*actions, pipes[i].getHeader(), filter_function->getColumnName());
ExpressionActionsPtr expression_actions = std::make_shared<ExpressionActions>(std::move(actions));
auto description = fmt::format(
"filter values in [{}, {})", i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf");
pipes[i].addSimpleTransform(
[&](const Block & header)
{
auto step = std::make_shared<FilterSortedStreamByRange>(header, expression_actions, filter_function->getColumnName(), true);
step->setDescription(description);
return step;
});
}
return pipes;
}
}
|