aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Sources/DelayedSource.h
blob: 0b2751e18a6024f37256839ab0ebfc0bf9b13986 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#pragma once

#include <Processors/IProcessor.h>
#include <QueryPipeline/Pipe.h>

namespace DB
{

/// DelayedSource delays pipeline calculation until it starts execution.
/// It accepts callback which creates a new pipe.
///
/// First time when DelayedSource's main output port needs data, callback is called.
/// Then, DelayedSource expands pipeline: adds new inputs and connects pipe with it.
/// Then, DelayedSource just moves data from inputs to outputs until finished.
///
/// If the main output port of DelayedSource is never needed, the callback won't be called.
class DelayedSource : public IProcessor
{
public:
    /// Callback type stored by the source; invoking it yields a QueryPipelineBuilder.
    using Creator = std::function<QueryPipelineBuilder()>;

    /// Defined out of line.
    /// NOTE(review): the two flags presumably control whether the totals / extremes
    /// output ports are created -- confirm against the definition.
    DelayedSource(
        const Block & header,
        Creator processors_creator,
        bool add_totals_port,
        bool add_extremes_port);

    /// Name reported for this processor.
    String getName() const override
    {
        return "Delayed";
    }

    /// Overrides of the base processor interface; bodies are defined out of line.
    Status prepare() override;
    void work() override;
    Processors expandPipeline() override;

    /// Main output port. Dereferences `main` without a null check.
    OutputPort & getPort()
    {
        return *main;
    }

    /// Totals output port; returned as a raw pointer, which is nullptr unless it has been assigned.
    OutputPort * getTotalsPort()
    {
        return totals;
    }

    /// Extremes output port; returned as a raw pointer, which is nullptr unless it has been assigned.
    OutputPort * getExtremesPort()
    {
        return extremes;
    }

    /// Swaps the stored counter with the by-value argument, so the previously
    /// stored value leaves scope together with the parameter.
    void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override
    {
        rows_before_limit.swap(counter);
    }

private:
    /// NOTE: member order is kept as-is; it determines initialisation order.
    QueryPlanResourceHolder resources;
    Creator creator;
    Processors processors;
    RowsBeforeLimitCounterPtr rows_before_limit;

    /// Output ports exposed by DelayedSource itself (see the getters above).
    OutputPort * main{nullptr};
    OutputPort * totals{nullptr};
    OutputPort * extremes{nullptr};

    /// Output ports of the pipe produced by the creator callback.
    OutputPort * main_output{nullptr};
    OutputPort * totals_output{nullptr};
    OutputPort * extremes_output{nullptr};
};

/// Builds a Pipe out of a DelayedSource.
/// The parameters have the same names and types as those of the DelayedSource constructor
/// (presumably they are forwarded to it -- the definition is not in this header).
Pipe createDelayedPipe(
    const Block & header,
    DelayedSource::Creator processors_creator,
    bool add_totals_port,
    bool add_extremes_port);

}