1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionUniq.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeIPv4andIPv6.h>
#include <Core/Settings.h>
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
/** `DataForVariadic` is a data structure that will be used for `uniq` aggregate function of multiple arguments.
* It differs, for example, in that it uses a trivial hash function, since `uniq` of many arguments first hashes them out itself.
*/
template <typename Data, template <bool, bool> typename DataForVariadic>
AggregateFunctionPtr
createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
{
assertNoParameters(name, params);
if (argument_types.empty())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Incorrect number of arguments for aggregate function {}", name);
bool use_exact_hash_function = !isAllArgumentsContiguousInMemory(argument_types);
if (argument_types.size() == 1)
{
const IDataType & argument_type = *argument_types[0];
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionUniq, Data>(*argument_types[0], argument_types));
WhichDataType which(argument_type);
if (res)
return res;
else if (which.isDate())
return std::make_shared<AggregateFunctionUniq<DataTypeDate::FieldType, Data>>(argument_types);
else if (which.isDate32())
return std::make_shared<AggregateFunctionUniq<DataTypeDate32::FieldType, Data>>(argument_types);
else if (which.isDateTime())
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data>>(argument_types);
else if (which.isStringOrFixedString())
return std::make_shared<AggregateFunctionUniq<String, Data>>(argument_types);
else if (which.isUUID())
return std::make_shared<AggregateFunctionUniq<DataTypeUUID::FieldType, Data>>(argument_types);
else if (which.isIPv4())
return std::make_shared<AggregateFunctionUniq<DataTypeIPv4::FieldType, Data>>(argument_types);
else if (which.isIPv6())
return std::make_shared<AggregateFunctionUniq<DataTypeIPv6::FieldType, Data>>(argument_types);
else if (which.isTuple())
{
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, true>>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, true>>>(argument_types);
}
}
/// "Variadic" method also works as a fallback generic case for single argument.
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, false>>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, false>>>(argument_types);
}
template <bool is_exact, template <typename, bool> typename Data, template <bool, bool, bool> typename DataForVariadic, bool is_able_to_parallelize_merge>
AggregateFunctionPtr
createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
{
assertNoParameters(name, params);
if (argument_types.empty())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Incorrect number of arguments for aggregate function {}", name);
/// We use exact hash function if the user wants it;
/// or if the arguments are not contiguous in memory, because only exact hash function have support for this case.
bool use_exact_hash_function = is_exact || !isAllArgumentsContiguousInMemory(argument_types);
if (argument_types.size() == 1)
{
const IDataType & argument_type = *argument_types[0];
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionUniq, Data, is_able_to_parallelize_merge>(*argument_types[0], argument_types));
WhichDataType which(argument_type);
if (res)
return res;
else if (which.isDate())
return std::make_shared<AggregateFunctionUniq<DataTypeDate::FieldType, Data<DataTypeDate::FieldType, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isDate32())
return std::make_shared<AggregateFunctionUniq<DataTypeDate32::FieldType, Data<DataTypeDate32::FieldType, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isDateTime())
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data<DataTypeDateTime::FieldType, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isStringOrFixedString())
return std::make_shared<AggregateFunctionUniq<String, Data<String, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isUUID())
return std::make_shared<AggregateFunctionUniq<DataTypeUUID::FieldType, Data<DataTypeUUID::FieldType, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isIPv4())
return std::make_shared<AggregateFunctionUniq<DataTypeIPv4::FieldType, Data<DataTypeIPv4::FieldType, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isIPv6())
return std::make_shared<AggregateFunctionUniq<DataTypeIPv6::FieldType, Data<DataTypeIPv6::FieldType, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isTuple())
{
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, true, is_able_to_parallelize_merge>>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, true, is_able_to_parallelize_merge>>>(argument_types);
}
}
/// "Variadic" method also works as a fallback generic case for single argument.
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, false, is_able_to_parallelize_merge>>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, false, is_able_to_parallelize_merge>>>(argument_types);
}
}
void registerAggregateFunctionsUniq(AggregateFunctionFactory & factory)
{
AggregateFunctionProperties properties = { .returns_default_when_only_null = true, .is_order_dependent = false };
factory.registerFunction("uniq",
{createAggregateFunctionUniq<AggregateFunctionUniqUniquesHashSetData, AggregateFunctionUniqUniquesHashSetDataForVariadic>, properties});
factory.registerFunction("uniqHLL12",
{createAggregateFunctionUniq<false, AggregateFunctionUniqHLL12Data, AggregateFunctionUniqHLL12DataForVariadic, false /* is_able_to_parallelize_merge */>, properties});
auto assign_bool_param = [](const std::string & name, const DataTypes & argument_types, const Array & params, const Settings * settings)
{
/// Using two level hash set if we wouldn't be able to merge in parallel can cause ~10% slowdown.
if (settings && settings->max_threads > 1)
return createAggregateFunctionUniq<
true, AggregateFunctionUniqExactData, AggregateFunctionUniqExactDataForVariadic, true /* is_able_to_parallelize_merge */>(name, argument_types, params, settings);
else
return createAggregateFunctionUniq<
true, AggregateFunctionUniqExactData, AggregateFunctionUniqExactDataForVariadic, false /* is_able_to_parallelize_merge */>(name, argument_types, params, settings);
};
factory.registerFunction("uniqExact", {assign_bool_param, properties});
#if USE_DATASKETCHES
factory.registerFunction("uniqTheta",
{createAggregateFunctionUniq<AggregateFunctionUniqThetaData, AggregateFunctionUniqThetaDataForVariadic>, properties});
#endif
}
}
|