aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/DelayedPortsProcessor.h
blob: 667667bbb916ca91d090ceb415a4fa21fd82bf7c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#pragma once
#include <Processors/IProcessor.h>

namespace DB
{

/// Processor with N inputs and N outputs. Only moves data from i-th input to i-th output as is.
/// Some ports are delayed. Delayed ports are processed after other outputs are all finished.
/// Data between ports is not mixed. It is important because this processor can be used before MergingSortedTransform.
/// Delayed ports are appeared after joins, when some non-matched data need to be processed at the end.
class DelayedPortsProcessor final : public IProcessor
{
public:
    DelayedPortsProcessor(const Block & header, size_t num_ports, const PortNumbers & delayed_ports, bool assert_main_ports_empty = false);

    String getName() const override { return "DelayedPorts"; }

    Status prepare(const PortNumbers &, const PortNumbers &) override;

private:

    struct PortsPair
    {
        InputPort * input_port = nullptr;
        OutputPort * output_port = nullptr;
        bool is_delayed = false;
        bool is_finished = false;
    };

    std::vector<PortsPair> port_pairs;
    const size_t num_delayed_ports;
    size_t num_finished_inputs = 0;
    size_t num_finished_outputs = 0;
    size_t num_finished_main_inputs = 0;

    std::vector<size_t> output_to_pair;
    bool are_inputs_initialized = false;

    bool processPair(PortsPair & pair);
    void finishPair(PortsPair & pair);
    bool shouldSkipDelayed() const;
};

}