1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
#pragma once
#include <DataTypes/DataTypeAggregateFunction.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
namespace DB
{
/// Forward declaration only; nothing in the visible part of this header needs the full definition.
struct Settings;
/// Error codes are declared `extern` here and defined in another translation unit.
namespace ErrorCodes
{
/// Thrown by the AggregateFunctionMerge constructor when the argument is not a compatible aggregate-function state type.
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
/** An adapter (combinator) over an aggregate function rather than an aggregate function in its own right.
  * Aggregate functions with the `Merge` suffix take a `DataTypeAggregateFunction` argument
  * (a state of the aggregate function obtained earlier via the function with the `State` suffix)
  * and combine those states during aggregation.
  * The result type is the result type of the nested function.
  */
class AggregateFunctionMerge final : public IAggregateFunctionHelper<AggregateFunctionMerge>
{
private:
    /// The wrapped aggregate function; every state operation below is forwarded to it.
    AggregateFunctionPtr nested_func;

public:
    /** Wraps `nested_` so that it consumes previously computed states.
      * `argument` is the type of the single argument and must be a `DataTypeAggregateFunction`
      * whose function has the same state representation as `nested_`.
      * Throws ILLEGAL_TYPE_OF_ARGUMENT otherwise.
      */
    AggregateFunctionMerge(const AggregateFunctionPtr & nested_, const DataTypePtr & argument, const Array & params_)
        : IAggregateFunctionHelper<AggregateFunctionMerge>({argument}, params_, createResultType(nested_))
        , nested_func(nested_)
    {
        const auto * state_type = typeid_cast<const DataTypeAggregateFunction *>(argument.get());

        /// Short-circuit: the representation check runs only when the argument really is an aggregate-function type.
        const bool is_compatible_state = state_type && nested_func->haveSameStateRepresentation(*state_type->getFunction());

        if (!is_compatible_state)
            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}, "
                "expected {} or equivalent type", argument->getName(), getName(), getStateType()->getName());
    }

    /// Name of the nested function with the "Merge" suffix appended.
    String getName() const override
    {
        String full_name = nested_func->getName();
        full_name += "Merge";
        return full_name;
    }

    /// The combinator yields whatever the nested function yields.
    static DataTypePtr createResultType(const AggregateFunctionPtr & nested_)
    {
        return nested_->getResultType();
    }

    /// --- State representation and versioning: forwarded to the nested function. ---

    const IAggregateFunction & getBaseAggregateFunctionWithSameStateRepresentation() const override
    {
        return nested_func->getBaseAggregateFunctionWithSameStateRepresentation();
    }

    bool isVersioned() const override { return nested_func->isVersioned(); }

    size_t getDefaultVersion() const override { return nested_func->getDefaultVersion(); }

    DataTypePtr getStateType() const override { return nested_func->getStateType(); }

    /// --- State lifetime and layout: the state is exactly the nested function's state. ---

    void create(AggregateDataPtr __restrict place) const override
    {
        nested_func->create(place);
    }

    void destroy(AggregateDataPtr __restrict place) const noexcept override
    {
        nested_func->destroy(place);
    }

    void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
    {
        nested_func->destroyUpToState(place);
    }

    bool hasTrivialDestructor() const override { return nested_func->hasTrivialDestructor(); }

    size_t sizeOfData() const override { return nested_func->sizeOfData(); }

    size_t alignOfData() const override { return nested_func->alignOfData(); }

    /// Adding a row means merging the state stored at `row_num` of the first (and only) argument column into `place`.
    void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
    {
        const auto & states_column = assert_cast<const ColumnAggregateFunction &>(*columns[0]);
        nested_func->merge(place, states_column.getData()[row_num], arena);
    }

    /// --- Merging of states: forwarded to the nested function. ---

    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
    {
        nested_func->merge(place, rhs, arena);
    }

    bool isAbleToParallelizeMerge() const override
    {
        return nested_func->isAbleToParallelizeMerge();
    }

    /// Overload that additionally passes a thread pool through to the nested function.
    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
    {
        nested_func->merge(place, rhs, thread_pool, arena);
    }

    /// --- (De)serialization and result extraction: forwarded to the nested function. ---

    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override
    {
        nested_func->serialize(place, buf, version);
    }

    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> version, Arena * arena) const override
    {
        nested_func->deserialize(place, buf, version, arena);
    }

    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
    {
        nested_func->insertResultInto(place, to, arena);
    }

    bool allocatesMemoryInArena() const override { return nested_func->allocatesMemoryInArena(); }

    /// Exposes the wrapped function.
    AggregateFunctionPtr getNestedFunction() const override
    {
        return nested_func;
    }

    bool isState() const override { return nested_func->isState(); }
};
}
|