#pragma once

#include <algorithm>
#include <cassert>
#include <list>
#include <mutex>
#include <optional>
#include <variant>

#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnVector.h>
#include <Columns/IColumn.h>
#include <Core/Joins.h>
#include <base/sort.h>
#include <Common/Arena.h>

namespace DB
{

class Block;

/// Reference to the row in block.
struct RowRef
{
    using SizeT = uint32_t; /// Do not use size_t because of memory economy
    const Block * block = nullptr;
    SizeT row_num = 0;

    RowRef() = default;
    RowRef(const Block * block_, size_t row_num_)
        : block(block_)
        , row_num(static_cast<SizeT>(row_num_))
    {}
};

/// Singly linked list of references to rows. Used for ALL JOINs (non-unique JOINs).
struct RowRefList : RowRef
{
    /// A portion of RowRefs, 16 * (MAX_SIZE + 1) bytes in size.
    struct Batch
    {
        static constexpr size_t MAX_SIZE = 7; /// Adequate values are 3, 7, 15, 31.

        SizeT size = 0; /// It's smaller than size_t but keeps alignment in Arena.
        Batch * next;
        RowRef row_refs[MAX_SIZE];

        explicit Batch(Batch * parent)
            : next(parent)
        {}

        bool full() const { return size == MAX_SIZE; }

        Batch * insert(RowRef && row_ref, Arena & pool)
        {
            if (full())
            {
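                /// The current batch is full: allocate a fresh Batch from the arena,
                /// link it back to this one, and make it the new head of the chain.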
                auto * batch = pool.alloc<Batch>();
                *batch = Batch(this);
                batch->insert(std::move(row_ref), pool);
                return batch;
            }

            row_refs[size] = std::move(row_ref);
            ++size;
            return this;
        }
    };
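
    /// Iterates over the head RowRef first and then over the batches in the chain.
    /// Since a full batch is replaced by a freshly allocated one that links back to it,
    /// batches are visited from the most recently allocated to the oldest,
    /// so rows are not necessarily returned in insertion order.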
    class ForwardIterator
    {
    public:
        explicit ForwardIterator(const RowRefList * begin)
            : root(begin)
            , first(true)
            , batch(root->next)
            , position(0)
        {}

        const RowRef * operator -> () const
        {
            if (first)
                return root;
            return &batch->row_refs[position];
        }

        const RowRef * operator * () const
        {
            if (first)
                return root;
            return &batch->row_refs[position];
        }

        void operator ++ ()
        {
            if (first)
            {
                first = false;
                return;
            }

            if (batch)
            {
                ++position;
                if (position >= batch->size)
                {
                    batch = batch->next;
                    position = 0;
                }
            }
        }

        bool ok() const { return first || batch; }

    private:
        const RowRefList * root;
        bool first;
        Batch * batch;
        size_t position;
    };

    RowRefList() {} /// NOLINT
    RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}

    ForwardIterator begin() const { return ForwardIterator(this); }

    /// Insert an element after the current one. Rows after the head are stored in Batch chunks allocated from the arena.
    void insert(RowRef && row_ref, Arena & pool)
    {
        if (!next)
        {
            next = pool.alloc<Batch>();
            *next = Batch(nullptr);
        }

        next = next->insert(std::move(row_ref), pool);
    }

private:
    Batch * next = nullptr;
};
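
/** Illustrative usage sketch for RowRefList (not part of the interface).
  * `block` stands in for a Block owned by the caller and `process` for a hypothetical callback:
  *
  *     Arena pool;
  *     RowRefList refs(&block, 0);               /// the head row is stored inline
  *     refs.insert(RowRef(&block, 1), pool);     /// further rows go into Batch chunks
  *     for (auto it = refs.begin(); it.ok(); ++it)
  *         process(it->block, it->row_num);
  */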

/** A container to push sortable data into.
  * When looking up values, the container ensures that it is sorted, so lookups take O(log(N)).
  * After calling any of the lookup methods it is no longer allowed to insert more data,
  * as this would invalidate the references that the lookup methods can return.
  */
struct SortedLookupVectorBase
{
    SortedLookupVectorBase() = default;
    virtual ~SortedLookupVectorBase() = default;

    static std::optional<TypeIndex> getTypeSize(const IColumn & asof_column, size_t & type_size);

    // This will be synchronized by the rwlock mutex in Join.h
    virtual void insert(const IColumn &, const Block *, size_t) = 0;

    // This needs to be synchronized internally
    virtual RowRef findAsof(const IColumn &, size_t) = 0;
};

// It only contains a std::unique_ptr which is memmovable.
// Source: https://github.com/ClickHouse/ClickHouse/issues/4906
using AsofRowRefs = std::unique_ptr<SortedLookupVectorBase>;

AsofRowRefs createAsofRowRef(TypeIndex type, ASOFJoinInequality inequality);
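
/** Illustrative sketch of the expected call sequence (not part of the interface).
  * `asof_type`, `inequality`, `asof_column`, `block`, `row_num` and `probe_row` stand in
  * for values supplied by the join; per the comment above, inserts must stop before the first lookup:
  *
  *     AsofRowRefs lookup = createAsofRowRef(asof_type, inequality);
  *     lookup->insert(asof_column, &block, row_num);             /// fill phase
  *     RowRef found = lookup->findAsof(asof_column, probe_row);  /// lookup phase
  */
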
}