1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
#pragma once
#include <Processors/IProcessor.h>
namespace DB
{
/// Forward declarations: this header refers to IJoin and IBlocksStream only through the
/// shared_ptr aliases below, so their full definitions are not needed here.
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
/// NOTE(review): NotJoinedBlocks is not referenced in the visible part of this header;
/// check whether includers rely on this declaration before removing it.
class NotJoinedBlocks;
class IBlocksStream;
using IBlocksStreamPtr = std::shared_ptr<IBlocksStream>;
/// Joins chunks coming from the left table with the data held by a Join.
/// The transform usually has two input ports and one output port:
///  - the first input carries the data of the left table;
///  - the second input has an empty header and is connected to FillingRightJoinSide.
/// The left table can be processed only after the Join has been filled, so the second input
/// exists purely as a signal that FillingRightJoinSide has finished.
class JoiningTransform : public IProcessor
{
public:
    /// Counts the streams that have finished and tells which one finished last.
    /// The last stream is the one that should process the non-joined rows.
    class FinishCounter
    {
    public:
        explicit FinishCounter(size_t total_) : total(total_) {}

        /// Registers one more finished stream and returns true once at least `total`
        /// streams have been registered.
        /// Every call increments the counter, so this is not a side-effect-free query.
        bool isLast()
        {
            /// Atomic pre-increment yields the value after the increment.
            const size_t finished_now = ++finished;
            return finished_now >= total;
        }

    private:
        /// Number of streams expected to call isLast().
        const size_t total;
        /// Number of isLast() calls made so far.
        std::atomic<size_t> finished{0};
    };

    using FinishCounterPtr = std::shared_ptr<FinishCounter>;

    JoiningTransform(
        const Block & input_header,
        const Block & output_header,
        JoinPtr join_,
        size_t max_block_size_,
        bool on_totals_ = false,
        bool default_totals_ = false,
        FinishCounterPtr finish_counter_ = nullptr);

    ~JoiningTransform() override;

    String getName() const override
    {
        return "JoiningTransform";
    }

    static Block transformHeader(Block header, const JoinPtr & join);

    OutputPort & getFinishedSignal();

    Status prepare() override;
    void work() override;

protected:
    void transform(Chunk & chunk);

private:
    /// NOTE(review): the fields below are used by the out-of-line prepare()/work()/transform();
    /// their exact roles are defined in the .cpp and are not visible from this header.
    Chunk input_chunk;
    Chunk output_chunk;
    bool has_input{false};
    bool has_output{false};
    bool stop_reading{false};
    bool process_non_joined{true};

    JoinPtr join;
    bool on_totals;

    /// Set when totals were added to the pipeline manually.
    /// This can happen when the joined subquery has totals but our stream doesn't.
    /// In that case default values are joined with the subquery totals if they exist;
    /// if they don't, an empty chunk is returned.
    bool default_totals;
    bool initialized{false};

    ExtraBlockPtr not_processed;

    FinishCounterPtr finish_counter;
    IBlocksStreamPtr non_joined_blocks;
    size_t max_block_size;

    Block readExecute(Chunk & chunk);
};
/// Fills the Join with blocks read from the right table.
/// The processor has exactly one input port and one output port.
/// The output port has an empty header; it is closed once all data has been inserted into the join.
class FillingRightJoinSideTransform : public IProcessor
{
public:
    FillingRightJoinSideTransform(Block input_header, JoinPtr join_);

    String getName() const override
    {
        return "FillingRightJoinSide";
    }

    /// NOTE(review): judging by the name, this adds an extra input port for totals and
    /// returns a pointer to it — confirm against the definition in the .cpp.
    InputPort * addTotalsPort();

    Status prepare() override;
    void work() override;

private:
    /// The join being filled.
    JoinPtr join;
    Chunk chunk;
    bool stop_reading{false};
    bool for_totals{false};
    bool set_totals{false};
};
/// Chunk info that carries a stream of delayed blocks together with the finish counter
/// of the left delayed streams.
class DelayedBlocksTask : public ChunkInfo
{
public:
    DelayedBlocksTask() = default;

    /// Both arguments are taken by value and moved into the members, so a caller that passes
    /// an rvalue pays for no shared_ptr copy (no extra atomic reference-count update).
    explicit DelayedBlocksTask(IBlocksStreamPtr delayed_blocks_, JoiningTransform::FinishCounterPtr left_delayed_stream_finish_counter_)
        : delayed_blocks(std::move(delayed_blocks_))
        , left_delayed_stream_finish_counter(std::move(left_delayed_stream_finish_counter_))
    {
    }

    /// Both members are null in a default-constructed task.
    IBlocksStreamPtr delayed_blocks = nullptr;
    JoiningTransform::FinishCounterPtr left_delayed_stream_finish_counter = nullptr;
};

/// Tasks are shared as pointers to const, i.e. they are not modified through this alias.
using DelayedBlocksTaskPtr = std::shared_ptr<const DelayedBlocksTask>;
/// Reads the delayed joined blocks from a Join.
class DelayedJoinedBlocksTransform : public IProcessor
{
public:
    explicit DelayedJoinedBlocksTransform(size_t num_streams, JoinPtr join_);

    String getName() const override
    {
        return "DelayedJoinedBlocksTransform";
    }

    Status prepare() override;
    void work() override;

private:
    /// The join the delayed blocks are read from.
    JoinPtr join;

    /// Null until assigned by the out-of-line code.
    IBlocksStreamPtr delayed_blocks{nullptr};
    bool finished{false};
};
/// Processor that keeps a current DelayedBlocksTask and can build a stream over the
/// non-joined rows through a user-supplied builder.
/// NOTE(review): presumably it consumes the tasks produced by DelayedJoinedBlocksTransform —
/// confirm against the .cpp and the pipeline construction code.
class DelayedJoinedBlocksWorkerTransform : public IProcessor
{
public:
    /// Callable that creates the block stream used to access the non-joined rows.
    using NonJoinedStreamBuilder = std::function<IBlocksStreamPtr()>;

    explicit DelayedJoinedBlocksWorkerTransform(
        Block output_header_,
        NonJoinedStreamBuilder non_joined_stream_builder_);

    String getName() const override
    {
        return "DelayedJoinedBlocksWorkerTransform";
    }

    Status prepare() override;
    void work() override;

private:
    DelayedBlocksTaskPtr task;
    Chunk output_chunk;

    /// Builds a block stream that gives access to the non-joined rows.
    NonJoinedStreamBuilder non_joined_stream_builder;

    /// Null until assigned by the out-of-line code.
    IBlocksStreamPtr non_joined_delayed_stream{nullptr};

    void resetTask();
    Block nextNonJoinedBlock();
};
}
|