aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/QueryPlan/MergingAggregatedStep.cpp
blob: c724de02de8a25a09fc6a9dd86a64089ff3cc409 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#include <Interpreters/Context.h>
#include <Processors/Merges/FinishAggregatingInOrderTransform.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/MemoryBoundMerging.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

namespace DB
{

/// Decides whether aggregated data can be merged in the memory-bound (in-order) way.
/// All of the following must hold:
///  - the memory_bound_merging_of_aggregation_results setting is enabled;
///  - there is a non-empty GROUP BY sort description;
///  - the input stream's sort scope is at least DataStream::SortScope::Stream;
///  - the input stream's sort description starts with the GROUP BY sort description.
static bool memoryBoundMergingWillBeUsed(
    const DataStream & input_stream,
    bool memory_bound_merging_of_aggregation_results_enabled,
    const SortDescription & group_by_sort_description)
{
    if (!memory_bound_merging_of_aggregation_results_enabled)
        return false;

    if (group_by_sort_description.empty())
        return false;

    if (!(input_stream.sort_scope >= DataStream::SortScope::Stream))
        return false;

    return input_stream.sort_description.hasPrefix(group_by_sort_description);
}

/// Traits of MergingAggregatedStep.
/// The step returns a single stream only when results must be produced in bucket-number order.
/// It never preserves the number of streams, the sorting, or the number of rows.
static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number)
{
    const bool single_output_stream = should_produce_results_in_order_of_bucket_number;

    ITransformingStep::Traits step_traits{
        {.returns_single_stream = single_output_stream, .preserves_number_of_streams = false, .preserves_sorting = false},
        {.preserves_number_of_rows = false}};

    return step_traits;
}

/// Builds the step from the settings collected by the planner.
/// The output header is computed by params_.getHeader(input header, final_). This call happens in
/// the base-class initializer, before params_ is moved into the member.
/// When memory-bound merging will be used and results must come out in bucket-number order, the output
/// stream is advertised as globally sorted by the GROUP BY sort description.
MergingAggregatedStep::MergingAggregatedStep(
    const DataStream & input_stream_,
    Aggregator::Params params_,
    bool final_,
    bool memory_efficient_aggregation_,
    size_t max_threads_,
    size_t memory_efficient_merge_threads_,
    bool should_produce_results_in_order_of_bucket_number_,
    size_t max_block_size_,
    size_t memory_bound_merging_max_block_bytes_,
    SortDescription group_by_sort_description_,
    bool memory_bound_merging_of_aggregation_results_enabled_)
    : ITransformingStep(
        input_stream_,
        params_.getHeader(input_stream_.header, final_),
        getTraits(should_produce_results_in_order_of_bucket_number_))
    , params(std::move(params_))
    , final(final_)
    , memory_efficient_aggregation(memory_efficient_aggregation_)
    , max_threads(max_threads_)
    , memory_efficient_merge_threads(memory_efficient_merge_threads_)
    , max_block_size(max_block_size_)
    , memory_bound_merging_max_block_bytes(memory_bound_merging_max_block_bytes_)
    , group_by_sort_description(std::move(group_by_sort_description_))
    , should_produce_results_in_order_of_bucket_number(should_produce_results_in_order_of_bucket_number_)
    , memory_bound_merging_of_aggregation_results_enabled(memory_bound_merging_of_aggregation_results_enabled_)
{
    /// Only the in-order merge that also restores bucket order yields a globally sorted output.
    const bool output_is_globally_sorted = memoryBoundMergingWillBeUsed() && should_produce_results_in_order_of_bucket_number;
    if (!output_is_globally_sorted)
        return;

    output_stream->sort_description = group_by_sort_description;
    output_stream->sort_scope = DataStream::SortScope::Global;
}

/// Overrides the sort order of the input stream and records the override.
/// updateOutputStream() replays the override after it recreates the output stream.
/// When memory-bound merging applies and results are produced in bucket-number order, the output
/// stream is re-advertised as globally sorted by the (possibly reordered) GROUP BY description.
void MergingAggregatedStep::applyOrder(SortDescription sort_description, DataStream::SortScope sort_scope)
{
    /// Copy (not move) into the input stream: the parameter is moved into the member further down.
    auto & source_stream = input_streams.front();
    source_stream.sort_description = sort_description;
    source_stream.sort_scope = sort_scope;

    /// Remember the override so it can be re-applied by updateOutputStream().
    overwritten_sort_scope = sort_scope;
    is_order_overwritten = true;

    /// Columns might be reordered during optimisation, so keep the GROUP BY sort description in sync.
    group_by_sort_description = std::move(sort_description);

    if (!memoryBoundMergingWillBeUsed() || !should_produce_results_in_order_of_bucket_number)
        return;

    output_stream->sort_description = group_by_sort_description;
    output_stream->sort_scope = DataStream::SortScope::Global;
}

/// Appends the transforms that merge partially aggregated blocks to `pipeline`.
///
/// The step picks one of three strategies:
///  - memory-bound merging (memoryBoundMergingWillBeUsed()): FinishAggregatingInOrderTransform over all
///    current streams, then a resize to max_threads and a MergingAggregatedBucketTransform per stream.
///    When should_produce_results_in_order_of_bucket_number is set, it is followed by
///    SortingAggregatedForMemoryBoundMergingTransform. This branch returns early.
///  - plain merging (!memory_efficient_aggregation): resize to one stream, then MergingAggregatedTransform.
///  - memory-efficient merging: addMergingAggregatedMemoryEfficientTransform with
///    memory_efficient_merge_threads threads, or max_threads when that setting is 0.
/// The last two strategies end with a resize to 1 stream when results must be in bucket-number order,
/// and to max_threads otherwise.
///
/// NOTE(review): `params` is moved into transform_params below, which leaves the member in a moved-from
/// state after this call. describeActions() and updateOutputStream() also read `params`; presumably they
/// are not invoked after the pipeline is built. TODO confirm.
void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);

    if (memoryBoundMergingWillBeUsed())
    {
        /// Consumes all current streams, using group_by_sort_description as the merge order.
        auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
            pipeline.getHeader(),
            pipeline.getNumStreams(),
            transform_params,
            group_by_sort_description,
            max_block_size,
            memory_bound_merging_max_block_bytes);

        pipeline.addTransform(std::move(transform));

        /// Do merge of aggregated data in parallel.
        pipeline.resize(max_threads);

        /// The conditional mixes an lvalue with a prvalue, so its result is a prvalue. This reference
        /// therefore binds to a lifetime-extended copy, not to the member itself.
        /// An empty description is passed when bucket order does not have to be produced.
        const auto & required_sort_description
            = should_produce_results_in_order_of_bucket_number ? group_by_sort_description : SortDescription{};

        /// The factory captures locals by reference. This assumes addSimpleTransform invokes it before
        /// returning; TODO confirm.
        pipeline.addSimpleTransform(
            [&](const Block &) { return std::make_shared<MergingAggregatedBucketTransform>(transform_params, required_sort_description); });

        if (should_produce_results_in_order_of_bucket_number)
        {
            /// Consumes all parallel streams. getTraits() advertises a single output stream in this mode,
            /// so this transform presumably merges them into one; TODO confirm.
            pipeline.addTransform(
                std::make_shared<SortingAggregatedForMemoryBoundMergingTransform>(pipeline.getHeader(), pipeline.getNumStreams()));
        }

        return;
    }

    if (!memory_efficient_aggregation)
    {
        /// Union all input streams into a single one; MergingAggregatedTransform is given max_threads.
        pipeline.resize(1);

        /// Now merge the aggregated blocks.
        pipeline.addSimpleTransform([&](const Block & header)
                                    { return std::make_shared<MergingAggregatedTransform>(header, transform_params, max_threads); });
    }
    else
    {
        /// memory_efficient_merge_threads == 0 means "fall back to max_threads".
        auto num_merge_threads = memory_efficient_merge_threads
                                 ? static_cast<size_t>(memory_efficient_merge_threads)
                                 : static_cast<size_t>(max_threads);

        pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads);
    }

    /// One output stream when bucket order must be preserved (matches getTraits()); otherwise fan out.
    pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : max_threads);
}

/// Writes a description of the aggregation parameters to settings.out, starting at settings.offset.
/// Delegates to Aggregator::Params::explain.
void MergingAggregatedStep::describeActions(FormatSettings & settings) const
{
    /// Plain call: the original `return <void expression>;` in a void function was legal but misleading,
    /// and inconsistent with the JSONMap overload.
    params.explain(settings.out, settings.offset);
}

/// Structured counterpart of the text overload: fills `map` via Aggregator::Params::explain.
void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const
{
    const auto & aggregation_params = params;
    aggregation_params.explain(map);
}

/// Recreates the output stream from the current input stream and step traits.
/// If applyOrder() was called earlier, its order override is re-applied, because the fresh
/// output stream would not otherwise carry it.
void MergingAggregatedStep::updateOutputStream()
{
    const auto & source_stream = input_streams.front();
    output_stream = createOutputStream(source_stream, params.getHeader(source_stream.header, final), getDataStreamTraits());

    if (!is_order_overwritten)
        return;

    /// Overwrite the order again.
    applyOrder(group_by_sort_description, overwritten_sort_scope);
}

/// Member convenience wrapper: evaluates the file-level predicate against this step's first input
/// stream and its own settings. The explicit DB:: qualification is needed because an unqualified
/// name would resolve to this member function.
bool MergingAggregatedStep::memoryBoundMergingWillBeUsed() const
{
    const auto & source_stream = input_streams.front();
    const bool setting_enabled = memory_bound_merging_of_aggregation_results_enabled;
    return DB::memoryBoundMergingWillBeUsed(source_stream, setting_enabled, group_by_sort_description);
}
}