1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
|
#pragma once
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>
#include <boost/core/noncopyable.hpp>
#include <Common/PODArray.h>
#include <Core/SortCursor.h>
#include <Core/SortDescription.h>
#include <IO/ReadBuffer.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Processors/Chunk.h>
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/Merges/IMergingTransform.h>
namespace Poco { class Logger; }
namespace DB
{
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class FullMergeJoinCursor;
using FullMergeJoinCursorPtr = std::unique_ptr<FullMergeJoinCursor>;
/// Sort-key values of a single row, copied out of a cursor.
/// Used instead of holding on to the whole previous block, so that the key can
/// later be compared with rows of another block.
struct JoinKeyRow
{
    /// One single-row column per sort column; empty when no key is stored.
    std::vector<ColumnPtr> row;

    JoinKeyRow() = default;

    /// Copies the value of every sort column of `impl_` at row `pos` into a
    /// freshly created (cloneEmpty) column of the same type holding just that value.
    explicit JoinKeyRow(const SortCursorImpl & impl_, size_t pos)
    {
        const auto & key_columns = impl_.sort_columns;
        row.reserve(key_columns.size());
        for (size_t col_idx = 0; col_idx < key_columns.size(); ++col_idx)
        {
            const auto & source = key_columns[col_idx];
            auto single_value = source->cloneEmpty();
            single_value->insertFrom(*source, pos);
            row.push_back(std::move(single_value));
        }
    }

    /// Drops the stored key, leaving the row empty.
    void reset()
    {
        row.clear();
    }

    /// True iff a key is stored and every stored value compares equal to the value
    /// of the corresponding sort column at the current row of `impl`.
    /// An empty row never compares equal.
    bool equals(const SortCursorImpl & impl) const
    {
        if (row.empty())
            return false;

        assert(row.size() == impl.sort_columns_size);

        const auto current_row = impl.getRow();
        for (size_t col_idx = 0; col_idx < impl.sort_columns_size; ++col_idx)
        {
            const auto nulls_direction = impl.desc[col_idx].nulls_direction;
            const int cmp = row[col_idx]->compareAt(0, current_row, *impl.sort_columns[col_idx], nulls_direction);
            if (cmp != 0)
                return false;
        }
        return true;
    }
};
/// Remembers previous key if it was joined in previous block
class AnyJoinState : boost::noncopyable
{
public:
AnyJoinState() = default;
void set(size_t source_num, const SortCursorImpl & cursor)
{
assert(cursor.rows);
keys[source_num] = JoinKeyRow(cursor, cursor.rows - 1);
}
void setValue(Chunk value_) { value = std::move(value_); }
bool empty() const { return keys[0].row.empty() && keys[1].row.empty(); }
/// current keys
JoinKeyRow keys[2];
/// for LEFT/RIGHT join use previously joined row from other table.
Chunk value;
};
/// Accumulates ranges of rows with the same key from both inputs and cross-joins them.
class AllJoinState : boost::noncopyable
{
public:
    /// Rows [begin, begin + length) of `chunk`.
    struct Range
    {
        /// Members carry default initializers, so a default-constructed Range
        /// no longer holds indeterminate indices.
        Range() = default;

        explicit Range(Chunk chunk_, size_t begin_, size_t length_)
            : begin(begin_)
            , length(length_)
            , current(begin_)
            , chunk(std::move(chunk_))
        {
            assert(length > 0 && begin + length <= chunk.getNumRows());
        }

        /// First row of the range inside `chunk`.
        size_t begin = 0;
        /// Number of rows in the range.
        size_t length = 0;
        /// Row currently being emitted; starts at `begin`, cycled by nextRight() for right ranges.
        size_t current = 0;
        Chunk chunk;
    };

    /// Stores the key at row `lpos` of the left cursor and at row `rpos` of the right cursor.
    AllJoinState(const SortCursorImpl & lcursor, size_t lpos,
                 const SortCursorImpl & rcursor, size_t rpos)
        : keys{JoinKeyRow(lcursor, lpos), JoinKeyRow(rcursor, rpos)}
    {
    }

    /// Appends a range: `source_num == 0` goes to the left side, any other value to the right side.
    void addRange(size_t source_num, Chunk chunk, size_t begin, size_t length)
    {
        if (source_num == 0)
            left.emplace_back(std::move(chunk), begin, length);
        else
            right.emplace_back(std::move(chunk), begin, length);
    }

    /// Advances to the next (left range, right row) pair.
    /// The right side moves one row at a time across all right ranges; when it wraps
    /// around, the left side moves to its next range. Returns false once all left
    /// ranges are exhausted. Both sides must be non-empty.
    bool next()
    {
        assert(!left.empty() && !right.empty());

        if (finished())
            return false;

        bool has_next_right = nextRight();
        if (has_next_right)
            return true;

        return nextLeft();
    }

    bool finished() const { return lidx >= left.size(); }

    size_t blocksStored() const { return left.size() + right.size(); }

    /// Current left range; must not be called after finished() became true.
    const Range & getLeft() const
    {
        assert(lidx < left.size());
        return left[lidx];
    }

    /// Right range that contains the current right row (its `current` field).
    const Range & getRight() const
    {
        assert(ridx < right.size());
        return right[ridx];
    }

    /// Left and right types can be different because of nullable
    JoinKeyRow keys[2];

private:
    /// Moves to the next left range; returns false if there is none.
    bool nextLeft()
    {
        lidx += 1;
        return lidx < left.size();
    }

    /// Moves to the next right row, wrapping to the first right range after the last one.
    /// Returns false exactly when it wraps around.
    bool nextRight()
    {
        /// cycle through right rows
        right[ridx].current += 1;
        if (right[ridx].current >= right[ridx].begin + right[ridx].length)
        {
            /// reset current row index to the beginning, because range will be accessed again
            right[ridx].current = right[ridx].begin;
            ridx += 1;
            if (ridx >= right.size())
            {
                ridx = 0;
                return false;
            }
        }
        return true;
    }

    std::vector<Range> left;
    std::vector<Range> right;

    size_t lidx = 0;
    size_t ridx = 0;
};
/*
 * Wrapper for SortCursorImpl that also keeps the current chunk and a
 * header-only (row-less) copy of the input's sample block.
 */
class FullMergeJoinCursor : boost::noncopyable
{
public:
    /// Only the structure of `sample_block_` is retained (cloneEmpty), not its rows.
    explicit FullMergeJoinCursor(const Block & sample_block_, const SortDescription & description_)
        : sample_block(sample_block_.cloneEmpty()), desc(description_)
    {
    }

    /// Direct access to the wrapped cursor.
    SortCursorImpl * operator-> () { return &cursor; }
    const SortCursorImpl * operator-> () const { return &cursor; }

    /// Header of this input, without rows.
    const Block & sampleBlock() const { return sample_block; }
    Columns sampleColumns() const { return sample_block.getColumns(); }

    void setChunk(Chunk && chunk);
    const Chunk & getCurrent() const;
    Chunk detach();
    bool fullyCompleted() const;

    SortCursorImpl cursor;

private:
    Block sample_block;
    SortDescription desc;
    Chunk current_chunk;
    bool recieved_all_blocks = false;
};
/*
* This class is used to join chunks from two sorted streams.
* It is used in MergeJoinTransform.
*/
class MergeJoinAlgorithm final : public IMergingAlgorithm
{
public:
explicit MergeJoinAlgorithm(JoinPtr table_join, const Blocks & input_headers, size_t max_block_size_);
virtual void initialize(Inputs inputs) override;
virtual void consume(Input & input, size_t source_num) override;
virtual Status merge() override;
void logElapsed(double seconds);
private:
std::optional<Status> handleAnyJoinState();
Status anyJoin(JoinKind kind);
std::optional<Status> handleAllJoinState();
Status allJoin(JoinKind kind);
Chunk createBlockWithDefaults(size_t source_num);
Chunk createBlockWithDefaults(size_t source_num, size_t start, size_t num_rows) const;
/// For `USING` join key columns should have values from right side instead of defaults
std::unordered_map<size_t, size_t> left_to_right_key_remap;
std::vector<FullMergeJoinCursorPtr> cursors;
/// Keep some state to make connection between data in different blocks
AnyJoinState any_join_state;
std::unique_ptr<AllJoinState> all_join_state;
JoinPtr table_join;
size_t max_block_size;
struct Statistic
{
size_t num_blocks[2] = {0, 0};
size_t num_rows[2] = {0, 0};
size_t max_blocks_loaded = 0;
};
Statistic stat;
Poco::Logger * log;
};
/// Processor that runs MergeJoinAlgorithm over its inputs via IMergingTransform.
class MergeJoinTransform final : public IMergingTransform<MergeJoinAlgorithm>
{
    using Base = IMergingTransform<MergeJoinAlgorithm>;

public:
    MergeJoinTransform(
        JoinPtr table_join,
        const Blocks & input_headers,
        const Block & output_header,
        size_t max_block_size,
        UInt64 limit_hint = 0);

    String getName() const override { return "MergeJoinTransform"; }

protected:
    void onFinish() override;

    /// Initialized to nullptr so the pointer is never indeterminate before the constructor sets it.
    Poco::Logger * log = nullptr;
};
}
|