aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/QueryPipeline/Chain.h
blob: c093fc57ad38707c967dd3116f75f938e2132e8d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#pragma once

#include <Interpreters/Context_fwd.h>
#include <Processors/IProcessor.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>

namespace DB
{

/// A group of processors that exposes exactly one free (unconnected) input port
/// and exactly one free output port.
/// Processors may own further ports, as long as every one of those is already connected.
/// The free input belongs to the first processor, the free output to the last one.
/// Despite the name, the processors need not be wired as a strictly linear chain.
class Chain
{
public:
    /// Move-only: copying is disabled, default construction and moves are compiler-generated.
    Chain() = default;
    Chain(const Chain &) = delete;
    Chain(Chain &&) = default;

    Chain & operator=(const Chain &) = delete;
    Chain & operator=(Chain &&) = default;

    /// Build a chain from a single processor or from a ready list of processors.
    /// Both are defined out of line; any validation they perform is not visible in this header.
    explicit Chain(ProcessorPtr processor);
    explicit Chain(std::list<ProcessorPtr> processors);

    /// True when the chain holds no processors.
    bool empty() const
    {
        return processors.empty();
    }

    /// Plain accessors for the stored thread count; the value is neither validated nor interpreted here.
    size_t getNumThreads() const
    {
        return num_threads;
    }
    void setNumThreads(size_t num_threads_)
    {
        num_threads = num_threads_;
    }

    /// Plain accessors for the stored concurrency-control flag.
    bool getConcurrencyControl() const
    {
        return concurrency_control;
    }
    void setConcurrencyControl(bool concurrency_control_)
    {
        concurrency_control = concurrency_control_;
    }

    /// Extend the chain. Defined out of line.
    /// NOTE(review): judging by the names and the layout sketch near `processors`, addSource
    /// presumably attaches at the input end and addSink / appendChain at the output end - confirm in Chain.cpp.
    void addSource(ProcessorPtr processor);
    void addSink(ProcessorPtr processor);
    void appendChain(Chain chain);

    /// Processors at the two ends of the chain. Defined out of line.
    /// NOTE(review): behaviour on an empty chain is not visible from this header - confirm before relying on it.
    IProcessor & getSource();
    IProcessor & getSink();

    /// The chain's input / output port. Defined out of line.
    /// Note that these are const members yet hand out mutable port references.
    InputPort & getInputPort() const;
    OutputPort & getOutputPort() const;

    /// Header reported by the chain's input port.
    const Block & getInputHeader() const
    {
        InputPort & input = getInputPort();
        return input.getHeader();
    }

    /// Header reported by the chain's output port.
    const Block & getOutputHeader() const
    {
        OutputPort & output = getOutputPort();
        return output.getHeader();
    }

    /// Read-only view of the processor list.
    const std::list<ProcessorPtr> & getProcessors() const
    {
        return processors;
    }

    /// Consumes a chain (taken by value) and yields its processor list.
    /// Only `processors` is moved out; everything else in `chain`, including its resource
    /// holder, is destroyed together with the parameter.
    static std::list<ProcessorPtr> getProcessors(Chain chain)
    {
        return std::move(chain.processors);
    }

    /// Each of the following appends one item to the corresponding list of the resource holder.
    void addTableLock(TableLockHolder lock)
    {
        holder.table_locks.emplace_back(std::move(lock));
    }
    void addStorageHolder(StoragePtr storage)
    {
        holder.storage_holders.emplace_back(std::move(storage));
    }
    void addInterpreterContext(ContextPtr context)
    {
        holder.interpreter_context.emplace_back(std::move(context));
    }

    /// Hands the given resources over to this chain.
    void attachResources(QueryPlanResourceHolder holder_)
    {
        /// Not a plain overwrite: move-assignment of QueryPlanResourceHolder
        /// merges holder_ into the existing holder instead of replacing it.
        holder = std::move(holder_);
    }

    /// Moves the resource holder out to the caller; `holder` is left in a moved-from state.
    QueryPlanResourceHolder detachResources()
    {
        return std::move(holder);
    }

    /// Defined out of line.
    /// NOTE(review): presumably returns the chain to an empty state - confirm in Chain.cpp.
    void reset();

private:
    /// Keep the declaration order of the data members: members are destroyed in reverse
    /// order of declaration, so `processors` is destroyed before `holder`.
    /// NOTE(review): presumably intentional, so that held resources outlive the processors - confirm.
    QueryPlanResourceHolder holder;

    /// Layout of the list:
    ///
    /// -> source -> transform -> ... -> transform -> sink ->
    ///  ^        ->           ->     ->           ->       ^
    ///  input port                               output port
    std::list<ProcessorPtr> processors;
    size_t num_threads = 0;
    bool concurrency_control = false;
};

}