blob: c093fc57ad38707c967dd3116f75f938e2132e8d (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Processors/IProcessor.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>
namespace DB
{
/// A sequence of processors that exposes exactly one unconnected input port
/// and exactly one unconnected output port.
/// The processors may have further ports, but every one of those must be connected.
/// The unconnected input has to belong to the first processor in the list,
/// the unconnected output - to the last one.
/// The processors are not required to form a literal chain.
class Chain
{
public:
    /// --- Construction. The type is movable but not copyable. ---

    Chain() = default;
    explicit Chain(ProcessorPtr processor);
    explicit Chain(std::list<ProcessorPtr> processors);

    Chain(const Chain &) = delete;
    Chain & operator=(const Chain &) = delete;

    Chain(Chain &&) = default;
    Chain & operator=(Chain &&) = default;

    /// True when the chain holds no processors.
    bool empty() const
    {
        return processors.empty();
    }

    /// --- Execution settings stored alongside the processors. ---

    size_t getNumThreads() const
    {
        return num_threads;
    }

    void setNumThreads(size_t num_threads_)
    {
        num_threads = num_threads_;
    }

    bool getConcurrencyControl() const
    {
        return concurrency_control;
    }

    void setConcurrencyControl(bool concurrency_control_)
    {
        concurrency_control = concurrency_control_;
    }

    /// --- Modification. Defined out of line. ---
    /// NOTE(review): presumably addSource() attaches a processor at the input side
    /// and addSink() at the output side (see the diagram near `processors`) - confirm
    /// against the implementation.

    void addSource(ProcessorPtr processor);
    void addSink(ProcessorPtr processor);
    void appendChain(Chain chain);

    /// --- Access to the end points. Defined out of line. ---

    IProcessor & getSource();
    IProcessor & getSink();

    InputPort & getInputPort() const;
    OutputPort & getOutputPort() const;

    /// Header of the chain's input port.
    const Block & getInputHeader() const
    {
        InputPort & input = getInputPort();
        return input.getHeader();
    }

    /// Header of the chain's output port.
    const Block & getOutputHeader() const
    {
        OutputPort & output = getOutputPort();
        return output.getHeader();
    }

    /// Read-only view of the processor list.
    const std::list<ProcessorPtr> & getProcessors() const
    {
        return processors;
    }

    /// Consumes a chain (taken by value) and hands its processor list to the caller.
    static std::list<ProcessorPtr> getProcessors(Chain chain)
    {
        /// `processors` is a member of the parameter, not a local, so the explicit move is needed.
        return std::move(chain.processors);
    }

    /// --- Resources kept together with the chain. ---

    void addTableLock(TableLockHolder lock)
    {
        holder.table_locks.emplace_back(std::move(lock));
    }

    void addStorageHolder(StoragePtr storage)
    {
        holder.storage_holders.emplace_back(std::move(storage));
    }

    void addInterpreterContext(ContextPtr context)
    {
        holder.interpreter_context.emplace_back(std::move(context));
    }

    /// Adds the resources from holder_ to the ones already held.
    void attachResources(QueryPlanResourceHolder holder_)
    {
        /// This operator "=" actually merges holder_ into holder, doesn't replace.
        holder = std::move(holder_);
    }

    /// Moves the held resources out to the caller.
    QueryPlanResourceHolder detachResources()
    {
        return std::move(holder);
    }

    /// Defined out of line.
    void reset();

private:
    /// Declared before `processors`, hence destroyed after them.
    /// NOTE(review): presumably intentional, so that resources outlive the processors - confirm.
    QueryPlanResourceHolder holder;

    /// -> source -> transform -> ... -> transform -> sink ->
    /// ^ -> -> -> -> ^
    /// input port output port
    std::list<ProcessorPtr> processors;

    size_t num_threads = 0;
    bool concurrency_control = false;
};
}
|