aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/QueryPlan/Optimizations/enableMemoryBoundMerging.cpp
blob: 729efe2ba4edc3c1757a0529927653192a7104eb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ReadFromRemote.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPlan/UnionStep.h>

namespace DB::QueryPlanOptimizations
{

/// Propagates memory bound merging from the local part of a distributed aggregation to its remote parts.
///
/// We are trying to find a part of plan like
///
///                              - ReadFromRemote (x N)
///  - MergingAggregated - Union - ReadFromParallelRemoteReplicasStep (x M)
///                              - Aggregating/MergingAggregated (the single local plan)
///
/// and enable memory bound merging for remote steps if it was enabled for local aggregation.
///
/// Concretely, when the local step reports memoryBoundMergingWillBeUsed(), its output sort description is
/// enforced on every remote reading step (and, if the local step is an AggregatingStep, aggregation in order
/// is enforced on them as well). The root MergingAggregated step is then told that each input stream is
/// sorted by that description.
///
/// The plan is left unchanged when the pattern does not match: the root is not MergingAggregated, it does not
/// have exactly one child, that child is not a Union, the Union has no local plan or more than one, the Union
/// has no remote reading steps, or the local step does not produce a non-empty sort description.
void enableMemoryBoundMerging(QueryPlan::Node & node, QueryPlan::Nodes &)
{
    auto * root_merging_aggregated = typeid_cast<MergingAggregatedStep *>(node.step.get());
    if (!root_merging_aggregated)
        return;

    /// A MergingAggregated step is expected to have a single input. Do not call front() on an
    /// empty children vector (undefined behaviour); just skip the optimization for unexpected shapes.
    if (node.children.size() != 1)
        return;

    const auto & union_node = *node.children.front();
    if (!typeid_cast<UnionStep *>(union_node.step.get()))
        return;

    std::vector<ReadFromRemote *> reading_steps;
    std::vector<ReadFromParallelRemoteReplicasStep *> async_reading_steps;
    IQueryPlanStep * local_plan = nullptr;

    reading_steps.reserve(union_node.children.size());
    async_reading_steps.reserve(union_node.children.size());

    /// Classify every input of the Union: remote reads, parallel-replica reads, or the local plan.
    for (const auto & child : union_node.children)
    {
        auto * child_step = child->step.get();
        if (auto * reading_step = typeid_cast<ReadFromRemote *>(child_step))
            reading_steps.push_back(reading_step);
        else if (auto * async_reading_step = typeid_cast<ReadFromParallelRemoteReplicasStep *>(child_step))
            async_reading_steps.push_back(async_reading_step);
        else if (local_plan)
            /// Usually there is a single local plan.
            /// TODO: we can support many local plans and calculate common sort description prefix. Do we need it?
            return;
        else
            local_plan = child_step;
    }

    /// We determine output stream sort properties by a local plan (local because otherwise table could be unknown).
    /// If no local shard exists for this cluster, no sort properties will be provided, c'est la vie.
    if (local_plan == nullptr || (reading_steps.empty() && async_reading_steps.empty()))
        return;

    SortDescription sort_description;
    /// Only a local AggregatingStep implies that remote servers should aggregate in order as well;
    /// a local MergingAggregatedStep only contributes its sort description.
    bool enforce_aggregation_in_order = false;

    if (auto * aggregating_step = typeid_cast<AggregatingStep *>(local_plan))
    {
        if (aggregating_step->memoryBoundMergingWillBeUsed())
        {
            sort_description = aggregating_step->getOutputStream().sort_description;
            enforce_aggregation_in_order = true;
        }
    }
    else if (auto * merging_aggregated = typeid_cast<MergingAggregatedStep *>(local_plan))
    {
        if (merging_aggregated->memoryBoundMergingWillBeUsed())
            sort_description = merging_aggregated->getOutputStream().sort_description;
    }

    /// Either the local step is of another type or memory bound merging is not used for it.
    if (sort_description.empty())
        return;

    /// Both remote reading step types expose the same enforcement interface.
    auto enforce_on_remote_steps = [&](const auto & steps)
    {
        for (auto * reading : steps)
        {
            reading->enforceSorting(sort_description);
            if (enforce_aggregation_in_order)
                reading->enforceAggregationInOrder();
        }
    };

    enforce_on_remote_steps(reading_steps);
    enforce_on_remote_steps(async_reading_steps);

    root_merging_aggregated->applyOrder(sort_description, DataStream::SortScope::Stream);
}

}