1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
|
#include <stdexcept>
#include <IO/Operators.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/Transforms/FinishSortingTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/JSONBuilder.h>
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForSort;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
SortingStep::Settings::Settings(const Context & context)
{
const auto & settings = context.getSettingsRef();
max_block_size = settings.max_block_size;
size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
max_bytes_before_remerge = settings.max_bytes_before_remerge_sort;
remerge_lowered_memory_bytes_ratio = settings.remerge_sort_lowered_memory_bytes_ratio;
max_bytes_before_external_sort = settings.max_bytes_before_external_sort;
tmp_data = context.getTempDataOnDisk();
min_free_disk_space = settings.min_free_disk_space_for_temporary_data;
}
SortingStep::Settings::Settings(size_t max_block_size_)
{
max_block_size = max_block_size_;
}
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
{
{
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = limit == 0,
}
};
}
SortingStep::SortingStep(
const DataStream & input_stream,
SortDescription description_,
UInt64 limit_,
const Settings & settings_,
bool optimize_sorting_by_input_stream_properties_)
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, type(Type::Full)
, result_description(std::move(description_))
, limit(limit_)
, sort_settings(settings_)
, optimize_sorting_by_input_stream_properties(optimize_sorting_by_input_stream_properties_)
{
if (sort_settings.max_bytes_before_external_sort && sort_settings.tmp_data == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary data storage for external sorting is not provided");
/// TODO: check input_stream is partially sorted by the same description.
output_stream->sort_description = result_description;
output_stream->sort_scope = DataStream::SortScope::Global;
}
SortingStep::SortingStep(
const DataStream & input_stream_,
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size_,
UInt64 limit_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits(limit_))
, type(Type::FinishSorting)
, prefix_description(std::move(prefix_description_))
, result_description(std::move(result_description_))
, limit(limit_)
, sort_settings(max_block_size_)
{
/// TODO: check input_stream is sorted by prefix_description.
output_stream->sort_description = result_description;
output_stream->sort_scope = DataStream::SortScope::Global;
}
SortingStep::SortingStep(
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_,
bool always_read_till_end_)
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, type(Type::MergingSorted)
, result_description(std::move(sort_description_))
, limit(limit_)
, always_read_till_end(always_read_till_end_)
, sort_settings(max_block_size_)
{
sort_settings.max_block_size = max_block_size_;
/// TODO: check input_stream is partially sorted (each port) by the same description.
output_stream->sort_description = result_description;
output_stream->sort_scope = DataStream::SortScope::Global;
}
void SortingStep::updateOutputStream()
{
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
output_stream->sort_description = result_description;
output_stream->sort_scope = DataStream::SortScope::Global;
}
void SortingStep::updateLimit(size_t limit_)
{
if (limit_ && (limit == 0 || limit_ < limit))
{
limit = limit_;
transform_traits.preserves_number_of_rows = false;
}
}
void SortingStep::convertToFinishSorting(SortDescription prefix_description_)
{
type = Type::FinishSorting;
prefix_description = std::move(prefix_description_);
}
void SortingStep::finishSorting(
QueryPipelineBuilder & pipeline, const SortDescription & input_sort_desc, const SortDescription & result_sort_desc, const UInt64 limit_)
{
pipeline.addSimpleTransform(
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, result_sort_desc, limit_);
});
bool increase_sort_description_compile_attempts = true;
/// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
pipeline.addSimpleTransform(
[&, increase_sort_description_compile_attempts](const Block & header) mutable -> ProcessorPtr
{
/** For multiple FinishSortingTransform we need to count identical comparators only once per QueryPlan
* To property support min_count_to_compile_sort_description.
*/
bool increase_sort_description_compile_attempts_current = increase_sort_description_compile_attempts;
if (increase_sort_description_compile_attempts)
increase_sort_description_compile_attempts = false;
return std::make_shared<FinishSortingTransform>(
header, input_sort_desc, result_sort_desc, sort_settings.max_block_size, limit_, increase_sort_description_compile_attempts_current);
});
}
void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescription & result_sort_desc, const UInt64 limit_)
{
/// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
result_sort_desc,
sort_settings.max_block_size,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Batch,
limit_,
always_read_till_end);
pipeline.addTransform(std::move(transform));
}
}
void SortingStep::mergeSorting(
QueryPipelineBuilder & pipeline, const Settings & sort_settings, const SortDescription & result_sort_desc, UInt64 limit_)
{
bool increase_sort_description_compile_attempts = true;
pipeline.addSimpleTransform(
[&, increase_sort_description_compile_attempts](
const Block & header, QueryPipelineBuilder::StreamType stream_type) mutable -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return nullptr;
// For multiple FinishSortingTransform we need to count identical comparators only once per QueryPlan.
// To property support min_count_to_compile_sort_description.
bool increase_sort_description_compile_attempts_current = increase_sort_description_compile_attempts;
if (increase_sort_description_compile_attempts)
increase_sort_description_compile_attempts = false;
auto tmp_data_on_disk = sort_settings.tmp_data
? std::make_unique<TemporaryDataOnDisk>(sort_settings.tmp_data, CurrentMetrics::TemporaryFilesForSort)
: std::unique_ptr<TemporaryDataOnDisk>();
return std::make_shared<MergeSortingTransform>(
header,
result_sort_desc,
sort_settings.max_block_size,
limit_,
increase_sort_description_compile_attempts_current,
sort_settings.max_bytes_before_remerge / pipeline.getNumStreams(),
sort_settings.remerge_lowered_memory_bytes_ratio,
sort_settings.max_bytes_before_external_sort,
std::move(tmp_data_on_disk),
sort_settings.min_free_disk_space);
});
}
void SortingStep::fullSortStreams(
QueryPipelineBuilder & pipeline,
const Settings & sort_settings,
const SortDescription & result_sort_desc,
const UInt64 limit_,
const bool skip_partial_sort)
{
if (!skip_partial_sort || limit_)
{
pipeline.addSimpleTransform(
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, result_sort_desc, limit_);
});
StreamLocalLimits limits;
limits.mode = LimitsMode::LIMITS_CURRENT;
limits.size_limits = sort_settings.size_limits;
pipeline.addSimpleTransform(
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<LimitsCheckingTransform>(header, limits);
});
}
mergeSorting(pipeline, sort_settings, result_sort_desc, limit_);
}
void SortingStep::fullSort(
QueryPipelineBuilder & pipeline, const SortDescription & result_sort_desc, const UInt64 limit_, const bool skip_partial_sort)
{
fullSortStreams(pipeline, sort_settings, result_sort_desc, limit_, skip_partial_sort);
/// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
result_sort_desc,
sort_settings.max_block_size,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Batch,
limit_,
always_read_till_end);
pipeline.addTransform(std::move(transform));
}
}
void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
/// We consider that a caller has more information what type of sorting to apply.
/// The type depends on constructor used to create sorting step.
/// So we'll try to infer sorting to use only in case of Full sorting
if (type == Type::MergingSorted)
{
mergingSorted(pipeline, result_description, limit);
return;
}
if (type == Type::FinishSorting)
{
bool need_finish_sorting = (prefix_description.size() < result_description.size());
mergingSorted(pipeline, prefix_description, (need_finish_sorting ? 0 : limit));
if (need_finish_sorting)
{
finishSorting(pipeline, prefix_description, result_description, limit);
}
return;
}
const auto input_sort_mode = input_streams.front().sort_scope;
const SortDescription & input_sort_desc = input_streams.front().sort_description;
if (optimize_sorting_by_input_stream_properties)
{
/// skip sorting if stream is already sorted
if (input_sort_mode == DataStream::SortScope::Global && input_sort_desc.hasPrefix(result_description))
{
if (pipeline.getNumStreams() != 1)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"If input stream is globally sorted then there should be only 1 input stream at this stage. Number of input streams: "
"{}",
pipeline.getNumStreams());
return;
}
/// merge sorted
if (input_sort_mode == DataStream::SortScope::Stream && input_sort_desc.hasPrefix(result_description))
{
mergingSorted(pipeline, result_description, limit);
return;
}
/// if chunks already sorted according to result_sort_desc, then we can skip chunk sorting
if (input_sort_mode == DataStream::SortScope::Chunk && input_sort_desc.hasPrefix(result_description))
{
const bool skip_partial_sort = true;
fullSort(pipeline, result_description, limit, skip_partial_sort);
return;
}
}
fullSort(pipeline, result_description, limit);
}
void SortingStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
if (!prefix_description.empty())
{
settings.out << prefix << "Prefix sort description: ";
dumpSortDescription(prefix_description, settings.out);
settings.out << '\n';
settings.out << prefix << "Result sort description: ";
dumpSortDescription(result_description, settings.out);
settings.out << '\n';
}
else
{
settings.out << prefix << "Sort description: ";
dumpSortDescription(result_description, settings.out);
settings.out << '\n';
}
if (limit)
settings.out << prefix << "Limit " << limit << '\n';
}
void SortingStep::describeActions(JSONBuilder::JSONMap & map) const
{
if (!prefix_description.empty())
{
map.add("Prefix Sort Description", explainSortDescription(prefix_description));
map.add("Result Sort Description", explainSortDescription(result_description));
}
else
map.add("Sort Description", explainSortDescription(result_description));
if (limit)
map.add("Limit", limit);
}
}
|