Skip to content

Commit 82ff10c

Browse files
authored
add unique function support (#858)
* add unique function support * change hash map logic * code structure revise * protect function
1 parent 905a743 commit 82ff10c

File tree

6 files changed

+960
-148
lines changed

6 files changed

+960
-148
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
#include "AggregateFunctionUniq.h"
2+
3+
#include <AggregateFunctions/AggregateFunctionFactory.h>
4+
#include <AggregateFunctions/FactoryHelpers.h>
5+
#include <AggregateFunctions/Helpers.h>
6+
7+
#include <DataTypes/DataTypeDate.h>
8+
#include <DataTypes/DataTypeDate32.h>
9+
#include <DataTypes/DataTypeDateTime.h>
10+
#include <DataTypes/DataTypeUUID.h>
11+
12+
namespace DB
13+
{
14+
struct Settings;
15+
16+
namespace ErrorCodes
17+
{
18+
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
19+
}
20+
21+
22+
namespace Streaming
23+
{
24+
25+
26+
/** `DataForVariadic` is a data structure that will be used for `uniq` aggregate function of multiple arguments.
27+
* It differs, for example, in that it uses a trivial hash function, since `uniq` of many arguments first hashes them out itself.
28+
*/
29+
template <typename Data, template <bool, bool> typename DataForVariadic>
30+
AggregateFunctionPtr
31+
createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
32+
{
33+
assertNoParameters(name, params);
34+
35+
if (argument_types.size() < 2)
36+
throw Exception("Incorrect number of arguments for aggregate function " + name,
37+
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
38+
39+
bool use_exact_hash_function = !isAllArgumentsContiguousInMemory(argument_types);
40+
41+
if (argument_types.size() == 2)
42+
{
43+
const IDataType & argument_type = *argument_types[0];
44+
45+
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionUniq, Data>(*argument_types[0], argument_types));
46+
47+
WhichDataType which(argument_type);
48+
if (res)
49+
return res;
50+
else if (which.isDate())
51+
return std::make_shared<AggregateFunctionUniq<DataTypeDate::FieldType, Data>>(argument_types);
52+
else if (which.isDate32())
53+
return std::make_shared<AggregateFunctionUniq<DataTypeDate32::FieldType, Data>>(argument_types);
54+
else if (which.isDateTime())
55+
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data>>(argument_types);
56+
else if (which.isStringOrFixedString())
57+
return std::make_shared<AggregateFunctionUniq<String, Data>>(argument_types);
58+
else if (which.isUUID())
59+
return std::make_shared<AggregateFunctionUniq<DataTypeUUID::FieldType, Data>>(argument_types);
60+
else if (which.isTuple())
61+
{
62+
if (use_exact_hash_function)
63+
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, true>>>(argument_types);
64+
else
65+
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, true>>>(argument_types);
66+
}
67+
}
68+
69+
/// "Variadic" method also works as a fallback generic case for single argument.
70+
if (use_exact_hash_function)
71+
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, false>>>(argument_types);
72+
else
73+
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, false>>>(argument_types);
74+
}
75+
76+
template <bool is_exact, template <typename, bool> typename Data, template <bool, bool, bool> typename DataForVariadic, bool is_able_to_parallelize_merge>
77+
AggregateFunctionPtr
78+
createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
79+
{
80+
assertNoParameters(name, params);
81+
82+
if (argument_types.size() < 2)
83+
throw Exception("Incorrect number of arguments for aggregate function " + name,
84+
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
85+
86+
/// We use exact hash function if the user wants it;
87+
/// or if the arguments are not contiguous in memory, because only exact hash function have support for this case.
88+
bool use_exact_hash_function = is_exact || !isAllArgumentsContiguousInMemory(argument_types);
89+
90+
if (argument_types.size() == 2)
91+
{
92+
const IDataType & argument_type = *argument_types[0];
93+
94+
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionUniq, Data, is_able_to_parallelize_merge>(*argument_types[0], argument_types));
95+
96+
WhichDataType which(argument_type);
97+
if (res)
98+
return res;
99+
else if (which.isDate())
100+
return std::make_shared<AggregateFunctionUniq<DataTypeDate::FieldType, Data<DataTypeDate::FieldType, is_able_to_parallelize_merge>>>(argument_types);
101+
else if (which.isDate32())
102+
return std::make_shared<AggregateFunctionUniq<DataTypeDate32::FieldType, Data<DataTypeDate32::FieldType, is_able_to_parallelize_merge>>>(argument_types);
103+
else if (which.isDateTime())
104+
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data<DataTypeDateTime::FieldType, is_able_to_parallelize_merge>>>(argument_types);
105+
else if (which.isStringOrFixedString())
106+
return std::make_shared<AggregateFunctionUniq<String, Data<String, is_able_to_parallelize_merge>>>(argument_types);
107+
else if (which.isUUID())
108+
return std::make_shared<AggregateFunctionUniq<DataTypeUUID::FieldType, Data<DataTypeUUID::FieldType, is_able_to_parallelize_merge>>>(argument_types);
109+
else if (which.isTuple())
110+
{
111+
if (use_exact_hash_function)
112+
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, true, is_able_to_parallelize_merge>>>(argument_types);
113+
else
114+
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, true, is_able_to_parallelize_merge>>>(argument_types);
115+
}
116+
}
117+
118+
/// "Variadic" method also works as a fallback generic case for single argument.
119+
if (use_exact_hash_function)
120+
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, false, is_able_to_parallelize_merge>>>(argument_types);
121+
else
122+
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, false, is_able_to_parallelize_merge>>>(argument_types);
123+
}
124+
125+
void registerAggregateFunctionsUniqRetract(AggregateFunctionFactory & factory)
126+
{
127+
AggregateFunctionProperties properties = { .returns_default_when_only_null = true, .is_order_dependent = false };
128+
129+
factory.registerFunction("__unique_retract",
130+
{createAggregateFunctionUniq<AggregateFunctionUniqUniquesHashSetData, AggregateFunctionUniqUniquesHashSetDataForVariadic>, properties});
131+
132+
auto assign_bool_param = [](const std::string & name, const DataTypes & argument_types, const Array & params, const Settings * settings)
133+
{
134+
return createAggregateFunctionUniq<
135+
true, AggregateFunctionUniqExactData, AggregateFunctionUniqExactDataForVariadic, false /* is_able_to_parallelize_merge */>(name, argument_types, params, settings);
136+
};
137+
factory.registerFunction("__unique_exact_retract", {assign_bool_param, properties});
138+
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)