aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/IAccumulatingTransform.cpp
blob: 4136fc5a5f2580ca378eede931cb2948730eec6b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include <Processors/IAccumulatingTransform.h>

namespace DB
{
/// Error codes used by this translation unit. Only declared here (extern);
/// the definitions live outside this file.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/// Builds the processor with one input header and one output header, both
/// forwarded to the IProcessor base, then initializes `input` and `output`
/// from the first entries of the base's `inputs` and `outputs`.
IAccumulatingTransform::IAccumulatingTransform(Block input_header, Block output_header)
    : IProcessor({std::move(input_header)}, {std::move(output_header)})
    , input(inputs.front())
    , output(outputs.front())
{
}

/// Inspects the port state and decides what has to happen next.
///
/// Returns:
///  - Finished  when the output is finished, or generation has completed;
///  - PortFull  when the output cannot accept a chunk right now;
///  - NeedData  when a chunk is required from the main input (or from the
///              last input port, read into `totals`) but is not there yet;
///  - Ready     when work() should run: either a chunk was pulled for
///              consumption, or input is over and output must be generated.
IAccumulatingTransform::Status IAccumulatingTransform::prepare()
{
    /// Output side is finished: close every input port and stop.
    if (output.isFinished())
    {
        for (auto & port : inputs)
            port.close();

        return Status::Finished;
    }

    /// Output cannot take a chunk yet; mark the main input as not needed for now.
    if (!output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    /// Hand over a pending chunk, if there is one.
    if (current_output_chunk)
        output.push(std::move(current_output_chunk));

    /// Generation has already reported the end: finish the output.
    if (finished_generate)
    {
        output.finish();
        return Status::Finished;
    }

    /// The flag may also have been raised before this call; never lower it here.
    finished_input = input.isFinished() || finished_input;

    if (!finished_input)
    {
        /// Still accumulating. A chunk pulled earlier is waiting for work().
        if (has_input)
            return Status::Ready;

        input.setNeeded();
        if (!input.hasData())
            return Status::NeedData;

        current_input_chunk = input.pull();
        has_input = true;
        return Status::Ready;
    }

    /// Input is over. Close the port explicitly, in case the flag was set manually.
    input.close();

    /// With more than one input port, the last one is read into `totals`
    /// before the output is generated.
    const bool has_extra_port = inputs.size() > 1;
    if (has_extra_port)
    {
        auto & totals_port = inputs.back();
        if (!totals_port.isFinished())
        {
            totals_port.setNeeded();
            if (!totals_port.hasData())
                return Status::NeedData;

            totals = totals_port.pull();
            totals_port.close();
        }
    }

    /// Let work() generate the output chunk.
    return Status::Ready;
}

/// Performs one step of processing.
///
/// Once input is finished, asks generate() for the next output chunk; an empty
/// chunk marks generation as finished. Before that, hands the chunk pulled in
/// prepare() to consume() and clears the "has input" flag.
void IAccumulatingTransform::work()
{
    if (finished_input)
    {
        current_output_chunk = generate();

        /// An empty chunk means there is nothing more to generate.
        if (!current_output_chunk)
            finished_generate = true;

        return;
    }

    consume(std::move(current_input_chunk));
    has_input = false;
}

/// Stores `chunk` as the pending output chunk, which prepare() pushes to the
/// output port when it sees a non-empty `current_output_chunk`.
///
/// Only one pending output chunk can be held at a time.
///
/// Throws Exception with LOGICAL_ERROR if an output chunk is already pending.
void IAccumulatingTransform::setReadyChunk(Chunk chunk)
{
    /// The guard is about the pending *output* chunk, so the message says so
    /// (it previously claimed the transform "already has input").
    if (current_output_chunk)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "IAccumulatingTransform already has a ready output chunk. "
                        "Cannot set another chunk. Probably, setReadyChunk method was called twice per consume().");

    current_output_chunk = std::move(chunk);
}

}