aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/QueryPlan/CubeStep.cpp
blob: 0c632c346c7e9d036b19a7542b92e31cc289ef60 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/Transforms/CubeTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>

namespace DB
{

/// Traits reported by the CUBE step: it returns a single stream, does not preserve
/// the number of streams or the sorting, and does not preserve the number of rows.
static ITransformingStep::Traits getTraits()
{
    ITransformingStep::Traits cube_traits
    {
        {
            .returns_single_stream = true,
            .preserves_number_of_streams = false,
            .preserves_sorting = false,
        },
        {
            .preserves_number_of_rows = false,
        }
    };

    return cube_traits;
}

/// Constructs the CUBE step.
///
/// The output header handed to ITransformingStep is produced by generateOutputHeader()
/// from the header that `params_` yields for the input stream header (taking `final_`
/// into account), the aggregation key names and `use_nulls_`.
///
/// `params_` is taken by value and moved into the `params` member.
/// NOTE(review): `keys_size` is read from `params_` in the same initializer list that moves
/// from it; presumably `keys_size` is declared before `params` in the class, so the read
/// happens first — confirm against the header.
CubeStep::CubeStep(const DataStream & input_stream_, Aggregator::Params params_, bool final_, bool use_nulls_)
    : ITransformingStep(input_stream_, generateOutputHeader(params_.getHeader(input_stream_.header, final_), params_.keys, use_nulls_), getTraits())
    , keys_size(params_.keys_size)
    , params(std::move(params_))
    , final(final_)
    , use_nulls(use_nulls_)
{
}

/// Builds the ExpressionTransform that is applied to a stream with the given `header`
/// (used by CubeStep for the totals stream).
///
/// The transform:
///  - when `use_nulls` is set, replaces every key column whose type can be inside Nullable
///    with the result of `toNullable` applied to it, keeping the column name;
///  - prepends a materialized constant UInt64 column `__grouping_set` with the value
///    `grouping_set_number` as the first output.
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number)
{
    auto actions_dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
    auto & dag_outputs = actions_dag->getOutputs();

    if (use_nulls)
    {
        auto nullable_wrapper = FunctionFactory::instance().get("toNullable", nullptr);
        for (const auto & key_name : keys)
        {
            const auto * key_node = dag_outputs[header.getPositionByName(key_name)];

            /// Keys whose type cannot be wrapped into Nullable are left as they are.
            if (!key_node->result_type->canBeInsideNullable())
                continue;

            const auto & nullable_key = actions_dag->addFunction(nullable_wrapper, { key_node }, key_node->result_name);
            actions_dag->addOrReplaceInOutputs(nullable_key);
        }
    }

    /// Single-row column with the grouping set number; it is materialized below
    /// before being exposed as an output.
    auto grouping_set_column = ColumnUInt64::create(1, grouping_set_number);
    const auto & constant_node = actions_dag->addColumn(
        {ColumnPtr(std::move(grouping_set_column)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
    const auto * grouping_set_node = &actions_dag->materializeNode(constant_node);

    /// `__grouping_set` goes first in the output.
    dag_outputs.insert(dag_outputs.begin(), grouping_set_node);

    auto actions = std::make_shared<ExpressionActions>(actions_dag, settings.getActionsSettings());
    return std::make_shared<ExpressionTransform>(header, actions);
}

/// Resizes the pipeline to a single stream and attaches the CUBE processing to it.
///
/// The callback passed to addSimpleTransform() receives the stream header and the stream type:
///  - for the totals stream it returns the transform built by addGroupingSetForTotals(),
///    with the grouping set number `(1 << keys_size) - 1`;
///  - for any other stream it returns a CubeTransform built from the aggregation `params`.
void CubeStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
    pipeline.resize(1);

    pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
    {
        if (stream_type == QueryPipelineBuilder::StreamType::Totals)
            return addGroupingSetForTotals(header, params.keys, use_nulls, settings, (UInt64(1) << keys_size) - 1);

        /// `params` is a data member captured by reference, so it must not be moved from here:
        /// the totals branch above reads `params.keys` when this callback runs for the totals stream,
        /// and getParams() / updateOutputStream() read the member as well. Pass it as an lvalue
        /// so that the step keeps a valid copy.
        auto transform_params = std::make_shared<AggregatingTransformParams>(header, params, final);
        return std::make_shared<CubeTransform>(header, std::move(transform_params), use_nulls);
    });
}

/// Returns a reference to the aggregation parameters stored in this step.
/// The reference points at the `params` member, so it is only valid while the step is alive.
const Aggregator::Params & CubeStep::getParams() const
{
    return params;
}

void CubeStep::updateOutputStream()
{
    output_stream = createOutputStream(
        input_streams.front(), generateOutputHeader(params.getHeader(input_streams.front().header, final), params.keys, use_nulls), getDataStreamTraits());
}
}