aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Transforms/ExtremesTransform.cpp
blob: 526bbc63374b3144627a2bbbcabb9bb7e17437e1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include <Processors/Transforms/ExtremesTransform.h>

#include <Core/Field.h>

namespace DB
{

/// Builds the transform with identical input and output headers and appends
/// one more output port, used to deliver the extremes.
ExtremesTransform::ExtremesTransform(const Block & header)
    : ISimpleTransform(header, header, true)
{
    /// The extremes port reuses the header of the main (first) output port.
    const auto & main_output_header = outputs.front().getHeader();
    outputs.emplace_back(main_output_header, this);
}

/// Scheduling step of the processor.
///
/// While the regular transform is still running, the decision is delegated to
/// ISimpleTransform::prepare(). Once that reports Finished, the accumulated
/// extremes are delivered through the extremes port:
///  - Finished  if the extremes port is already finished, or after the
///              extremes (if any) were pushed and the port was closed;
///  - PortFull  if the extremes port cannot accept data yet;
///  - Ready     if extremes columns were collected but are not yet packed
///              into the `extremes` chunk (work() does the packing).
IProcessor::Status ExtremesTransform::prepare()
{
    if (!finished_transform)
    {
        const auto base_status = ISimpleTransform::prepare();

        /// Keep following the base transform until it is completely done.
        if (base_status != Status::Finished)
            return base_status;

        finished_transform = true;
    }

    auto & extremes_port = getExtremesPort();

    /// Make sure the extremes port is still open and able to take a chunk.
    if (extremes_port.isFinished())
        return Status::Finished;

    if (!extremes_port.canPush())
        return Status::PortFull;

    if (extremes)
        extremes_port.push(std::move(extremes));
    else if (!extremes_columns.empty())
        return Status::Ready; /// Columns are collected but the chunk is not built yet.

    extremes_port.finish();
    return Status::Finished;
}

/// Execution step of the processor.
///
/// Before the regular transform has finished, this simply runs
/// ISimpleTransform::work(). Afterwards it packs the collected extremes
/// columns into the `extremes` chunk, unless the chunk is already filled or
/// nothing was collected.
void ExtremesTransform::work()
{
    if (!finished_transform)
    {
        ISimpleTransform::work();
        return;
    }

    /// Nothing to pack: the chunk is already built, or no columns were collected.
    if (extremes || extremes_columns.empty())
        return;

    /// Every collected column holds two values (min, max); the 2 is presumably the row count.
    extremes.setColumns(std::move(extremes_columns), 2);
}

/// Folds the per-column minimum and maximum of `chunk` into `extremes_columns`.
///
/// Every entry of `extremes_columns` holds two values: index 0 is the running
/// minimum and index 1 the running maximum of the corresponding column.
/// The chunk is only read here. Chunks without rows are ignored.
void ExtremesTransform::transform(DB::Chunk & chunk)
{
    /// A chunk without rows cannot change the extremes.
    if (chunk.getNumRows() == 0)
        return;

    const size_t column_count = chunk.getNumColumns();
    const auto & chunk_columns = chunk.getColumns();

    if (!extremes_columns.empty())
    {
        /// Extremes already exist: merge this chunk's extremes into them.
        for (size_t pos = 0; pos < column_count; ++pos)
        {
            auto & slot = extremes_columns[pos];

            /// A constant column was stored as-is when the extremes were seeded; leave it untouched.
            if (isColumnConst(*slot))
                continue;

            Field running_min = (*slot)[0];
            Field running_max = (*slot)[1];

            Field chunk_min;
            Field chunk_max;

            chunk_columns[pos]->getExtremes(chunk_min, chunk_max);

            if (chunk_min < running_min)
                running_min = chunk_min;
            if (chunk_max > running_max)
                running_max = chunk_max;

            /// Rebuild the two-value column from scratch with the merged values.
            MutableColumnPtr refreshed = slot->cloneEmpty();

            refreshed->insert(running_min);
            refreshed->insert(running_max);

            slot = std::move(refreshed);
        }

        return;
    }

    /// First chunk with rows: seed the extremes from its columns.
    extremes_columns.resize(column_count);

    for (size_t pos = 0; pos < column_count; ++pos)
    {
        const ColumnPtr & source = chunk_columns[pos];
        auto & slot = extremes_columns[pos];

        if (isColumnConst(*source))
        {
            /// For a constant column min and max are equal: keep two copies of the value.
            slot = source->cloneResized(2);
            continue;
        }

        Field lowest;
        Field highest;

        source->getExtremes(lowest, highest);

        slot = source->cloneEmpty();

        slot->insert(lowest);
        slot->insert(highest);
    }
}

}