path: root/contrib/clickhouse/src/Processors/Transforms/MergingAggregatedTransform.cpp
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Common/logger_useful.h>

namespace DB
{
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

MergingAggregatedTransform::MergingAggregatedTransform(
    Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_)
    : IAccumulatingTransform(std::move(header_), params_->getHeader())
    , params(std::move(params_)), max_threads(max_threads_)
{
}

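/// Accumulation phase: each input chunk of partially aggregated data is turned
/// back into a Block and grouped by its bucket number in bucket_to_blocks.
/// Nothing is merged until all input has been consumed.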
void MergingAggregatedTransform::consume(Chunk chunk)
{
    if (!consume_started)
    {
        consume_started = true;
        LOG_TRACE(log, "Reading blocks of partially aggregated data.");
    }

    size_t input_rows = chunk.getNumRows();
    if (!input_rows)
        return;

    total_input_rows += input_rows;
    ++total_input_blocks;

    const auto & info = chunk.getChunkInfo();
    if (!info)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk info was not set for chunk in MergingAggregatedTransform.");

    if (const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get()))
    {
        /** If the remote servers used a two-level aggregation method,
          * the blocks will carry the number of the bucket they belong to.
          * The merge can then be parallelized by bucket:
          * we decompose the blocks into bucket_to_blocks by the bucket numbers indicated in them.
          */
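        /// For example (illustrative numbers): chunks tagged bucket_num = 3 that
        /// arrive from different remote replicas all land in bucket_to_blocks[3]
        /// and can later be merged independently of the other buckets.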

        auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
        block.info.is_overflows = agg_info->is_overflows;
        block.info.bucket_num = agg_info->bucket_num;

        bucket_to_blocks[agg_info->bucket_num].emplace_back(std::move(block));
    }
    else if (typeid_cast<const ChunkInfoWithAllocatedBytes *>(info.get()))
    {
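        /// Single-level data without a bucket number (ChunkInfoWithAllocatedBytes
        /// is set by AggregatingInOrderTransform); it is filed under the
        /// pseudo-bucket -1 so that all of it is merged together.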
        auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
        block.info.is_overflows = false;
        block.info.bucket_num = -1;

        bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
    }
    else
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform.");
}

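/// Generation phase: the first call performs the whole merge of the accumulated
/// buckets; subsequent calls hand out the merged blocks one chunk at a time.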
Chunk MergingAggregatedTransform::generate()
{
    if (!generate_started)
    {
        generate_started = true;
        LOG_DEBUG(log, "Read {} blocks of partially aggregated data, total {} rows.", total_input_blocks, total_input_rows);

        /// Exception safety. Make iterator valid in case any method below throws.
        next_block = blocks.begin();

        /// TODO: this operation can be made async. Add async for IAccumulatingTransform.
        params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads);
        blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
        next_block = blocks.begin();
    }

    if (next_block == blocks.end())
        return {};

    auto block = std::move(*next_block);
    ++next_block;

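    /// Re-attach an AggregatedChunkInfo so downstream processors still see the
    /// bucket number and overflows flag of the merged block.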
    auto info = std::make_shared<AggregatedChunkInfo>();
    info->bucket_num = block.info.bucket_num;
    info->is_overflows = block.info.is_overflows;

    UInt64 num_rows = block.rows();
    Chunk chunk(block.getColumns(), num_rows);
    chunk.setChunkInfo(std::move(info));

    return chunk;
}
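
/// Typical wiring (a minimal sketch, not taken from this file: the pipeline
/// builder, `transform_params` and `max_threads` are assumed to exist in the
/// caller). Since the transform accumulates all input before producing output,
/// the stream is usually resized to a single output first:
///
///     pipeline.resize(1);
///     pipeline.addTransform(std::make_shared<MergingAggregatedTransform>(
///         pipeline.getHeader(), transform_params, max_threads));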

}