1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
#include <Processors/QueryPlan/IntersectOrExceptStep.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/IntersectOrExceptTransform.h>
#include <Processors/ResizeProcessor.h>
namespace DB
{
/// Error codes used by this file. They are only declared here (extern);
/// the definitions live in another translation unit.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Checks the headers of all input streams against the header of the first one
/// (via assertBlocksHaveEqualStructure) and returns a copy of that first header.
/// Throws LOGICAL_ERROR if there are no input streams at all.
static Block checkHeaders(const DataStreams & input_streams_)
{
    if (input_streams_.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot perform intersect/except on empty set of query plan steps");

    /// The first stream's header is the one every stream (including itself) is compared to.
    const auto & reference_header = input_streams_.front().header;

    for (const auto & input : input_streams_)
    {
        assertBlocksHaveEqualStructure(input.header, reference_header, "IntersectOrExceptStep");
    }

    return reference_header;
}
/// Creates the step from its input streams.
///
/// input_streams_ - streams to combine; validated by checkHeaders(), which also
///                  provides the header stored in this step.
/// operator_      - stored as current_operator and later handed to the transform.
/// max_threads_   - stored and later passed to QueryPipelineBuilder::unitePipelines().
IntersectOrExceptStep::IntersectOrExceptStep(
    DataStreams input_streams_, Operator operator_, size_t max_threads_)
    : header(checkHeaders(input_streams_))
    , current_operator(operator_)
    , max_threads(max_threads_)
{
    /// The output stream carries the header validated above.
    output_stream = DataStream{.header = header};

    /// The inputs were only read during validation, so they can be moved into the step now.
    input_streams = std::move(input_streams_);
}
/// Builds the pipeline of this step from the pipelines of its inputs.
///
/// With no input pipelines, the result is initialized from a single NullSource
/// created with the output header. Otherwise every input is (if needed) converted
/// to the output header, resized to one stream, and then all inputs are united and
/// followed by an IntersectOrExceptTransform.
///
/// Processors created here are remembered in `processors` so that
/// describePipeline() can report them.
QueryPipelineBuilderPtr IntersectOrExceptStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
{
    auto result = std::make_unique<QueryPipelineBuilder>();

    if (pipelines.empty())
    {
        /// The collector must be alive while the pipeline is initialized
        /// so that the created source is recorded.
        QueryPipelineProcessorsCollector collector(*result, this);
        result->init(Pipe(std::make_shared<NullSource>(output_stream->header)));
        processors = collector.detachProcessors();
        return result;
    }

    const auto & target_header = getOutputStream().header;

    for (auto & input_pipeline : pipelines)
    {
        /// Just in case.
        const bool needs_conversion = !isCompatibleHeader(input_pipeline->getHeader(), target_header);
        if (needs_conversion)
        {
            QueryPipelineProcessorsCollector collector(*input_pipeline, this);

            /// Columns are matched by name when converting to the output header.
            auto conversion_dag = ActionsDAG::makeConvertingActions(
                input_pipeline->getHeader().getColumnsWithTypeAndName(),
                target_header.getColumnsWithTypeAndName(),
                ActionsDAG::MatchColumnsMode::Name);
            auto conversion = std::make_shared<ExpressionActions>(std::move(conversion_dag));

            input_pipeline->addSimpleTransform([&conversion](const Block & cur_header)
            {
                return std::make_shared<ExpressionTransform>(cur_header, conversion);
            });

            /// Append (do not overwrite): earlier inputs may already have contributed processors.
            auto collected = collector.detachProcessors();
            processors.insert(processors.end(), collected.begin(), collected.end());
        }

        /// For the case of union.
        const auto num_streams = input_pipeline->getNumStreams();
        input_pipeline->addTransform(std::make_shared<ResizeProcessor>(header, num_streams, 1));
    }

    *result = QueryPipelineBuilder::unitePipelines(std::move(pipelines), max_threads, &processors);

    /// Record the transform before ownership of the local pointer is given away.
    auto set_operation = std::make_shared<IntersectOrExceptTransform>(header, current_operator);
    processors.push_back(set_operation);
    result->addTransform(std::move(set_operation));

    return result;
}
/// Describes the pipeline of this step by delegating to
/// IQueryPlanStep::describePipeline with the processors stored in `processors`
/// (filled in by updatePipeline()).
void IntersectOrExceptStep::describePipeline(FormatSettings & settings) const
{
IQueryPlanStep::describePipeline(processors, settings);
}
}
|