blob: f4648caf0f0553bfabc31dc84f018f626216d4b3 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
#include <Processors/ConcatProcessor.h>
namespace DB
{
ConcatProcessor::ConcatProcessor(const Block & header, size_t num_inputs)
: IProcessor(InputPorts(num_inputs, header), OutputPorts{header}), current_input(inputs.begin())
{
}
ConcatProcessor::Status ConcatProcessor::prepare()
{
auto & output = outputs.front();
/// Check can output.
if (output.isFinished())
{
for (; current_input != inputs.end(); ++current_input)
current_input->close();
return Status::Finished;
}
if (!output.isNeeded())
{
if (current_input != inputs.end())
current_input->setNotNeeded();
return Status::PortFull;
}
if (!output.canPush())
return Status::PortFull;
/// Check can input.
while (current_input != inputs.end() && current_input->isFinished())
++current_input;
if (current_input == inputs.end())
{
output.finish();
return Status::Finished;
}
auto & input = *current_input;
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
/// Move data.
output.push(input.pull());
/// Now, we pushed to output, and it must be full.
return Status::PortFull;
}
}
|