1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
|
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/IColumn.h>
#include <Common/PODArray.h>
#include <Core/Field.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Functions/FunctionHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
class Arena;
using ArenaPtr = std::shared_ptr<Arena>;
using ConstArenaPtr = std::shared_ptr<const Arena>;
using ConstArenas = std::vector<ConstArenaPtr>;
/** Column of states of aggregate functions.
* Presented as an array of pointers to the states of aggregate functions (data).
* The states themselves are stored in one of the pools (arenas).
*
* It can be in two variants:
*
* 1. Own its values - that is, be responsible for destroying them.
* The column consists of the values "assigned to it" after the aggregation is performed (see Aggregator, convertToBlocks function),
* or from values created by itself (see `insert` method).
* In this case, `src` will be `nullptr`, and the column itself will be destroyed (call `IAggregateFunction::destroy`)
* states of aggregate functions in the destructor.
*
* 2. Do not own its values, but use values taken from another ColumnAggregateFunction column.
* For example, this is a column obtained by permutation/filtering or other transformations from another column.
* In this case, `src` will be `shared ptr` to the source column. Destruction of values will be handled by this source column.
*
* This solution is somewhat limited:
* - the variant in which the column contains a part of "it's own" and a part of "another's" values is not supported;
* - the option of having multiple source columns is not supported, which may be necessary for a more optimal merge of the two columns.
*
* These restrictions can be removed if you add an array of flags or even refcount,
* specifying which individual values should be destroyed and which ones should not.
* Clearly, this method would have a substantially non-zero price.
*/
class ColumnAggregateFunction final : public COWHelper<IColumn, ColumnAggregateFunction>
{
public:
using Container = PaddedPODArray<AggregateDataPtr>;
private:
friend class COWHelper<IColumn, ColumnAggregateFunction>;
/// Arenas used by function states that are created elsewhere. We own these
/// arenas in the sense of extending their lifetime, but do not modify them.
/// Even reading these arenas is unsafe, because they may be shared with
/// other data blocks and modified by other threads concurrently.
ConstArenas foreign_arenas;
/// Arena for allocating the internals of function states created by current
/// column (e.g., when inserting new states).
ArenaPtr my_arena;
/// Used for destroying states and for finalization of values.
AggregateFunctionPtr func;
/// Source column. Used (holds source from destruction),
/// if this column has been constructed from another and uses all or part of its values.
ColumnPtr src;
/// Do not share the source column (`src`) after further modifications (i.e. insertRangeFrom()).
/// This may be useful for proper memory tracking, since source column may contain aggregate states.
bool force_data_ownership = false;
/// Array of pointers to aggregation states, that are placed in arenas.
Container data;
/// Name of the type to distinguish different aggregation states.
String type_string;
std::optional<size_t> version;
ColumnAggregateFunction() = default;
/// Create a new column that has another column as a source.
MutablePtr createView() const;
explicit ColumnAggregateFunction(const AggregateFunctionPtr & func_, std::optional<size_t> version_ = std::nullopt);
ColumnAggregateFunction(const AggregateFunctionPtr & func_, const ConstArenas & arenas_);
ColumnAggregateFunction(const ColumnAggregateFunction & src_);
void insertFromWithOwnership(const IColumn & from, size_t n);
public:
~ColumnAggregateFunction() override;
void set(const AggregateFunctionPtr & func_, std::optional<size_t> version_ = std::nullopt);
AggregateFunctionPtr getAggregateFunction() { return func; }
AggregateFunctionPtr getAggregateFunction() const { return func; }
/// If we have another column as a source (owner of data), copy all data to ourself and reset source.
/// This is needed before inserting new elements, because we must own these elements (to destroy them in destructor),
/// but ownership of different elements cannot be mixed by different columns.
void ensureOwnership() override;
/// Take shared ownership of Arena, that holds memory for states of aggregate functions.
void addArena(ConstArenaPtr arena_);
/// Transform column with states of aggregate functions to column with final result values.
/// It expects ColumnAggregateFunction as an argument, this column will be destroyed.
/// This method is made static and receive MutableColumnPtr object to explicitly destroy it.
static MutableColumnPtr convertToValues(MutableColumnPtr column);
std::string getName() const override { return "AggregateFunction(" + func->getName() + ")"; }
const char * getFamilyName() const override { return "AggregateFunction"; }
TypeIndex getDataType() const override { return TypeIndex::AggregateFunction; }
MutableColumnPtr predictValues(const ColumnsWithTypeAndName & arguments, ContextPtr context) const;
size_t size() const override
{
return getData().size();
}
MutableColumnPtr cloneEmpty() const override;
Field operator[](size_t n) const override;
void get(size_t n, Field & res) const override;
bool isDefaultAt(size_t) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method isDefaultAt is not supported for ColumnAggregateFunction");
}
StringRef getDataAt(size_t n) const override;
void insertData(const char * pos, size_t length) override;
void insertFrom(const IColumn & from, size_t n) override;
void insertFrom(ConstAggregateDataPtr place);
/// Merge state at last row with specified state in another column.
void insertMergeFrom(ConstAggregateDataPtr place);
void insertMergeFrom(const IColumn & from, size_t n);
Arena & createOrGetArena();
void insert(const Field & x) override;
void insertDefault() override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const UInt8 *) const override;
const char * deserializeAndInsertFromArena(const char * src_arena) override;
const char * skipSerializedInArena(const char *) const override;
void updateHashWithValue(size_t n, SipHash & hash) const override;
void updateWeakHash32(WeakHash32 & hash) const override;
void updateHashFast(SipHash & hash) const override;
size_t byteSize() const override;
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
void insertRangeFrom(const IColumn & from, size_t start, size_t length) override;
void popBack(size_t n) override;
ColumnPtr filter(const Filter & filter, ssize_t result_size_hint) const override;
void expand(const Filter & mask, bool inverted) override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
template <typename Type>
ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
ColumnPtr replicate(const Offsets & offsets) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override;
int compareAt(size_t, size_t, const IColumn &, int) const override
{
return 0;
}
void compareColumn(const IColumn &, size_t, PaddedPODArray<UInt64> *, PaddedPODArray<Int8> &, int, int) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method compareColumn is not supported for ColumnAggregateFunction");
}
bool hasEqualValues() const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method hasEqualValues is not supported for ColumnAggregateFunction");
}
double getRatioOfDefaultRows(double) const override
{
return 0.0;
}
UInt64 getNumberOfDefaultRows() const override
{
return 0;
}
void getIndicesOfNonDefaultRows(Offsets &, size_t, size_t) const override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getIndicesOfNonDefaultRows is not supported for ColumnAggregateFunction");
}
void getPermutation(PermutationSortDirection direction, PermutationSortStability stability,
size_t limit, int nan_direction_hint, Permutation & res) const override;
void updatePermutation(PermutationSortDirection direction, PermutationSortStability stability,
size_t limit, int, Permutation & res, EqualRanges & equal_ranges) const override;
/** More efficient manipulation methods */
Container & getData()
{
return data;
}
const Container & getData() const
{
return data;
}
void getExtremes(Field & min, Field & max) const override;
bool structureEquals(const IColumn &) const override;
MutableColumnPtr cloneResized(size_t size) const override;
};
}
|