blob: 79532dd4d6c43396b696d85edea70c623feb8d9b (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
#include <Processors/Port.h>
#include <Processors/IProcessor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void connect(OutputPort & output, InputPort & input, bool reconnect)
{
if (!reconnect && input.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", input.header.dumpStructure());
if (!reconnect && output.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", output.header.dumpStructure());
auto out_name = output.processor ? output.getProcessor().getName() : "null";
auto in_name = input.processor ? input.getProcessor().getName() : "null";
assertCompatibleHeader(output.getHeader(), input.getHeader(), fmt::format("function connect between {} and {}", out_name, in_name));
input.output_port = &output;
output.input_port = &input;
input.state = std::make_shared<Port::State>();
output.state = input.state;
}
}
|