1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
#include <Processors/Transforms/RollupTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Columns/ColumnNullable.h>
namespace DB
{
/// Sets up the transform state from the aggregation parameters.
///
/// The output header is obtained from generateOutputHeader() using the aggregation header,
/// the key names and `use_nulls_`. The constructor then:
///  - resolves each key name to its position in the input header (stored in `keys`);
///  - derives `intermediate_header` as the output header without its first column;
///  - creates `output_aggregator` over `intermediate_header`, but only when `use_nulls_` is true.
GroupByModifierTransform::GroupByModifierTransform(Block header, AggregatingTransformParamsPtr params_, bool use_nulls_)
    : IAccumulatingTransform(std::move(header), generateOutputHeader(params_->getHeader(), params_->params.keys, use_nulls_))
    , params(std::move(params_))
    , use_nulls(use_nulls_)
{
    /// Key names -> column positions in the input header.
    keys.reserve(params->params.keys_size);
    for (const auto & key_name : params->params.keys)
        keys.emplace_back(input.getHeader().getPositionByName(key_name));

    /// Same as the output header, minus the column at position 0.
    intermediate_header = getOutputPort().getHeader();
    intermediate_header.erase(0);

    /// Without use_nulls no separate aggregator is created.
    if (!use_nulls)
        return;

    auto aggregator_params_copy = params->params;
    output_aggregator = std::make_unique<Aggregator>(intermediate_header, aggregator_params_copy);
}
/// Accumulates an incoming chunk. No merging happens here: the chunk is only
/// appended to `consumed_chunks`.
void GroupByModifierTransform::consume(Chunk chunk)
{
    consumed_chunks.push_back(std::move(chunk));
}
/// Collapses everything gathered in `consumed_chunks` into `current_chunk`.
///
/// - More than one chunk: they are combined via merge() (input mode, non-final).
/// - Exactly one chunk: it is taken as is.
/// - When `use_nulls` is set, every key column is passed through makeNullableSafe().
/// `consumed_chunks` is emptied afterwards.
///
/// Precondition: `consumed_chunks` must not be empty, because front() is called on it.
/// RollupTransform::generate() checks this before calling.
void GroupByModifierTransform::mergeConsumed()
{
    if (consumed_chunks.size() > 1)
        current_chunk = merge(std::move(consumed_chunks), true, false);
    else
        current_chunk = std::move(consumed_chunks.front());

    /// Read the row count first: the columns are taken out of the chunk right below.
    const size_t rows = current_chunk.getNumRows();

    /// The chunk is rebuilt from these columns, so detach and move them instead of
    /// copying the column vector twice.
    auto columns = current_chunk.detachColumns();
    if (use_nulls)
    {
        for (auto key : keys)
            columns[key] = makeNullableSafe(columns[key]);
    }
    current_chunk = Chunk{std::move(columns), rows};

    consumed_chunks.clear();
}
/// Merges `chunks` through an aggregator and returns the result as a single chunk.
///
/// `is_input` selects both the header used to turn chunks back into blocks and the aggregator:
///   true  -> input port header and `params->aggregator`;
///   false -> `intermediate_header` and `output_aggregator`
///            (which the constructor creates only when `use_nulls` is set).
/// `final` is forwarded to mergeBlocks().
/// The columns of every chunk in `chunks` are detached by this call.
Chunk GroupByModifierTransform::merge(Chunks && chunks, bool is_input, bool final)
{
    /// Bind by reference: the header is only read, so copying the whole Block per call is wasted work.
    const auto & header = is_input ? getInputPort().getHeader() : intermediate_header;

    BlocksList blocks;
    for (auto & chunk : chunks)
        blocks.emplace_back(header.cloneWithColumns(chunk.detachColumns()));

    auto current_block = is_input ? params->aggregator.mergeBlocks(blocks, final) : output_aggregator->mergeBlocks(blocks, final);
    auto num_rows = current_block.rows();
    return Chunk(current_block.getColumns(), num_rows);
}
/// Returns a fresh column built from the column at position `key` of `intermediate_header`:
/// an empty clone of it, filled with `n` default values of that column's type.
MutableColumnPtr GroupByModifierTransform::getColumnWithDefaults(size_t key, size_t n) const
{
    const auto & source = intermediate_header.getByPosition(key);

    auto defaults = source.column->cloneEmpty();
    source.type->insertManyDefaultsInto(*defaults, n);

    return defaults;
}
/// Constructs the transform on top of GroupByModifierTransform and precomputes
/// `aggregates_mask` via getAggregatesMask() from the aggregation header and aggregate descriptions.
///
/// `params_` is a sink argument: it is moved into the base class rather than copied.
/// The mask initializer reads the base member `params`, which is already initialized
/// at that point, so it does not touch the moved-from `params_`.
RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr params_, bool use_nulls_)
    : GroupByModifierTransform(std::move(header), std::move(params_), use_nulls_)
    , aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates))
{
}
/// Free-function counterpart working on an arbitrary header: takes the column at position `key`
/// of `header`, clones it empty and inserts `n` default values of that column's type.
MutableColumnPtr getColumnWithDefaults(Block const & header, size_t key, size_t n)
{
    const auto & source = header.getByPosition(key);

    auto defaults = source.column->cloneEmpty();
    source.type->insertManyDefaultsInto(*defaults, n);

    return defaults;
}
/// Produces the next output chunk.
///
/// 1. If new chunks were consumed, they are merged into `current_chunk` and the
///    key countdown (`last_removed_key`) restarts from keys.size().
/// 2. `current_chunk` becomes the result of this call.
/// 3. While keys remain, the next `current_chunk` is prepared: the countdown is decremented,
///    the corresponding key column is replaced by defaults, and the rows are merged again
///    (keys are therefore processed from the last one to the first).
/// 4. The result is finalized with `aggregates_mask`; if it is not empty, a UInt64 column
///    filled with the current `set_counter` value is inserted at position 0 and the
///    counter is incremented.
Chunk RollupTransform::generate()
{
    const bool has_new_input = !consumed_chunks.empty();
    if (has_new_input)
    {
        mergeConsumed();
        last_removed_key = keys.size();
    }

    auto result = std::move(current_chunk);

    if (last_removed_key != 0)
    {
        --last_removed_key;
        const auto key_position = keys[last_removed_key];
        const auto row_count = result.getNumRows();

        /// Work on a copy of the column list: `result` itself is still returned below.
        auto next_columns = result.getColumns();
        next_columns[key_position] = getColumnWithDefaults(key_position, row_count);

        Chunks to_merge;
        to_merge.emplace_back(std::move(next_columns), row_count);

        /// With use_nulls the output-side aggregator/header is used, otherwise the input-side one.
        current_chunk = merge(std::move(to_merge), !use_nulls, false);
    }

    finalizeChunk(result, aggregates_mask);

    if (!result.empty())
    {
        result.addColumn(0, ColumnUInt64::create(result.getNumRows(), set_counter++));
    }

    return result;
}
}
|