1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
#pragma once
#include <condition_variable>
#include <memory>
#include <optional>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/IJoin.h>
#include <base/defines.h>
#include <base/types.h>
#include <Common/Stopwatch.h>
namespace DB
{
/**
* Can run addBlockToJoin() in parallel to speed up the join process. In tests, the speedup is almost linear in
* the degree of parallelism.
*
* The default HashJoin is not thread-safe for inserting the right table's rows, so it runs in a single thread. When
* the right table is large, the join process is too slow.
*
* We create multiple HashJoin instances here. In addBlockToJoin(), one input block is split into multiple blocks
* corresponding to the HashJoin instances by hashing every row on the join keys. We guarantee that every HashJoin
* instance is written by only one thread.
*
* When it comes to matching the left table, the blocks from the left table are also split across the different HashJoin instances.
*
*/
class ConcurrentHashJoin : public IJoin
{
public:
/// Only declared here; the definition is not part of this header.
/// The `_`-suffixed parameters presumably initialise the same-named members below
/// (context, table_join, slots) — confirm in the .cpp.
/// `any_take_last_row_` defaults to false.
/// NOTE(review): right_sample_block and any_take_last_row_ are presumably forwarded to the
/// internal HashJoin instances — confirm.
explicit ConcurrentHashJoin(
ContextPtr context_,
std::shared_ptr<TableJoin> table_join_,
size_t slots_,
const Block & right_sample_block,
bool any_take_last_row_ = false);
~ConcurrentHashJoin() override = default;
/// Returns the fixed name "ConcurrentHashJoin".
std::string getName() const override { return "ConcurrentHashJoin"; }
/// Dereferences `table_join`, so that pointer must be non-null when this is called.
const TableJoin & getTableJoin() const override { return *table_join; }
/// Per the class-level comment, splits `block` by hashing each row on the join keys and feeds
/// the parts to the corresponding HashJoin instances.
/// The meaning of `check_limits` and of the returned bool is not visible in this header.
bool addBlockToJoin(const Block & block, bool check_limits) override;
void checkTypesOfKeys(const Block & block) const override;
/// `block` and `not_processed` are taken by non-const reference, so both can be modified by the call.
/// NOTE(review): presumably `block` is replaced by the joined result — confirm in the .cpp.
void joinBlock(Block & block, std::shared_ptr<ExtraBlock> & not_processed) override;
/// NOTE(review): setTotals/getTotals presumably write/read the `totals` member, with `totals_mutex`
/// guarding it — confirm in the .cpp.
void setTotals(const Block & block) override;
const Block & getTotals() const override;
/// NOTE(review): presumably aggregated over all internal HashJoin instances — confirm.
size_t getTotalRowCount() const override;
size_t getTotalByteCount() const override;
bool alwaysReturnsEmptySet() const override;
/// Always reports that this join supports parallel execution.
bool supportParallelJoin() const override { return true; }
IBlocksStreamPtr
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
private:
/// One HashJoin instance bundled with its own mutex.
/// NOTE(review): the mutex presumably serialises access to `data` — confirm in the .cpp.
struct InternalHashJoin
{
std::mutex mutex;
std::unique_ptr<HashJoin> data;
};
ContextPtr context;
/// Shared join description; exposed through getTableJoin().
std::shared_ptr<TableJoin> table_join;
/// NOTE(review): presumably the number of entries in `hash_joins` — confirm.
size_t slots;
/// The internal HashJoin instances that blocks are dispatched to.
std::vector<std::shared_ptr<InternalHashJoin>> hash_joins;
std::mutex totals_mutex;
Block totals;
/// NOTE(review): presumably builds a selector that maps each row of `from_block` to a HashJoin
/// instance, based on the columns named in `key_columns_names` — confirm in the .cpp.
IColumn::Selector selectDispatchBlock(const Strings & key_columns_names, const Block & from_block);
/// NOTE(review): presumably splits `from_block` into one block per HashJoin instance — confirm in the .cpp.
Blocks dispatchBlock(const Strings & key_columns_names, const Block & from_block);
};
}
|