Skip to content

Commit 5ed28cb

Browse files
authored
Feature/issue 67 uda support changelog (#137)
* UDA support 'retract' on changelog stream. close #67
1 parent cc67ce0 commit 5ed28cb

17 files changed

+764
-107
lines changed

src/AggregateFunctions/AggregateFunctionFactory.cpp

+24-9
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,14 @@ static DataTypes convertLowCardinalityTypesToNested(const DataTypes & types)
7070
return res_types;
7171
}
7272

73+
/// proton: starts
7374
AggregateFunctionPtr AggregateFunctionFactory::get(
74-
const String & name, const DataTypes & argument_types, const Array & parameters, AggregateFunctionProperties & out_properties) const
75+
const String & name,
76+
const DataTypes & argument_types,
77+
const Array & parameters,
78+
AggregateFunctionProperties & out_properties,
79+
bool is_changelog_input) const
80+
/// proton: ends
7581
{
7682
auto types_without_low_cardinality = convertLowCardinalityTypesToNested(argument_types);
7783

@@ -92,7 +98,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
9298
[](const auto & type) { return type->onlyNull(); });
9399

94100
AggregateFunctionPtr nested_function = getImpl(
95-
name, nested_types, nested_parameters, out_properties, has_null_arguments);
101+
name, nested_types, nested_parameters, out_properties, has_null_arguments, is_changelog_input);
96102

97103
// Pure window functions are not real aggregate functions. Applying
98104
// combinators doesn't make sense for them, they must handle the
@@ -103,20 +109,22 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
103109
return combinator->transformAggregateFunction(nested_function, out_properties, types_without_low_cardinality, parameters);
104110
}
105111

106-
auto with_original_arguments = getImpl(name, types_without_low_cardinality, parameters, out_properties, false);
112+
auto with_original_arguments = getImpl(name, types_without_low_cardinality, parameters, out_properties, false, is_changelog_input);
107113

108114
if (!with_original_arguments)
109115
throw Exception("Logical error: AggregateFunctionFactory returned nullptr", ErrorCodes::LOGICAL_ERROR);
110116
return with_original_arguments;
111117
}
112118

113-
119+
/// proton: starts
114120
AggregateFunctionPtr AggregateFunctionFactory::getImpl(
115121
const String & name_param,
116122
const DataTypes & argument_types,
117123
const Array & parameters,
118124
AggregateFunctionProperties & out_properties,
119-
bool has_null_arguments) const
125+
bool has_null_arguments,
126+
bool is_changelog_input) const
127+
/// proton: ends
120128
{
121129
String name = getAliasToOrName(name_param);
122130
bool is_case_insensitive = false;
@@ -194,7 +202,7 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
194202
}
195203

196204
/// proton: starts. Check user defined aggr function
197-
auto aggr = UserDefinedFunctionFactory::getAggregateFunction(name, argument_types, parameters, out_properties);
205+
auto aggr = UserDefinedFunctionFactory::getAggregateFunction(name, argument_types, parameters, out_properties, is_changelog_input);
198206
if (aggr)
199207
return aggr;
200208
/// proton: ends
@@ -211,12 +219,19 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
211219
throw Exception(ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION, "Unknown aggregate function {}{}", name, extra_info);
212220
}
213221

214-
222+
/// proton: starts
215223
AggregateFunctionPtr AggregateFunctionFactory::tryGet(
216-
const String & name, const DataTypes & argument_types, const Array & parameters, AggregateFunctionProperties & out_properties) const
224+
const String & name,
225+
const DataTypes & argument_types,
226+
const Array & parameters,
227+
AggregateFunctionProperties & out_properties,
228+
bool is_changelog_input) const
229+
/// proton: ends
217230
{
218231
return isAggregateFunctionName(name)
219-
? get(name, argument_types, parameters, out_properties)
232+
/// proton: starts
233+
? get(name, argument_types, parameters, out_properties, is_changelog_input)
234+
/// proton: ends
220235
: nullptr;
221236
}
222237

src/AggregateFunctions/AggregateFunctionFactory.h

+10-3
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,22 @@ class AggregateFunctionFactory final : private boost::noncopyable, public IFacto
6363
CaseSensitiveness case_sensitiveness = CaseSensitive);
6464

6565
/// Throws an exception if not found.
66+
/// proton: starts. Add 'is_changelog_input' param to allow aggregate function being aware whether the input stream is a changelog
6667
AggregateFunctionPtr
6768
get(const String & name,
6869
const DataTypes & argument_types,
6970
const Array & parameters,
70-
AggregateFunctionProperties & out_properties) const;
71+
AggregateFunctionProperties & out_properties,
72+
bool is_changelog_input = false) const;
7173

7274
/// Returns nullptr if not found.
7375
AggregateFunctionPtr tryGet(
7476
const String & name,
7577
const DataTypes & argument_types,
7678
const Array & parameters,
77-
AggregateFunctionProperties & out_properties) const;
79+
AggregateFunctionProperties & out_properties,
80+
bool is_changelog_input = false) const;
81+
/// proton: ends
7882

7983
/// Get properties if the aggregate function exists.
8084
std::optional<AggregateFunctionProperties> tryGetProperties(const String & name) const;
@@ -86,12 +90,15 @@ class AggregateFunctionFactory final : private boost::noncopyable, public IFacto
8690
/// proton: ends
8791

8892
private:
93+
/// proton: starts
8994
AggregateFunctionPtr getImpl(
9095
const String & name,
9196
const DataTypes & argument_types,
9297
const Array & parameters,
9398
AggregateFunctionProperties & out_properties,
94-
bool has_null_arguments) const;
99+
bool has_null_arguments,
100+
bool is_changelog_input = false) const;
101+
/// proton: ends
95102

96103
std::optional<AggregateFunctionProperties> tryGetPropertiesImpl(const String & name) const;
97104

src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.cpp

+51-12
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <V8/ConvertDataTypes.h>
77
#include <V8/Utils.h>
88
#include <Common/logger_useful.h>
9+
#include <span>
910

1011
namespace DB
1112
{
@@ -86,10 +87,12 @@ JavaScriptBlueprint::JavaScriptBlueprint(const String & name, const String & sou
8687

8788
{
8889
v8::Local<v8::Value> val;
89-
if (!obj->Get(local_ctx, V8::to_v8(isolate_, "has_customized_emit")).ToLocal(&val) || !val->IsUndefined())
90+
if (obj->Get(local_ctx, V8::to_v8(isolate_, "has_customized_emit")).ToLocal(&val) && !val->IsUndefined())
9091
{
91-
LOG_INFO(&Poco::Logger::get("JavaScriptAggregateFunction"), "JavaScript UDA '{}' has defined its own emit strategy", name);
92-
has_user_defined_emit_strategy = true;
92+
has_user_defined_emit_strategy = V8::from_v8<bool>(isolate_, val);
93+
if (has_user_defined_emit_strategy)
94+
LOG_INFO(
95+
&Poco::Logger::get("JavaScriptAggregateFunction"), "JavaScript UDA '{}' has defined its own emit strategy", name);
9396
}
9497
}
9598

@@ -110,9 +113,19 @@ JavaScriptBlueprint::~JavaScriptBlueprint() noexcept
110113
}
111114

112115
JavaScriptAggrFunctionState::JavaScriptAggrFunctionState(
113-
const JavaScriptBlueprint & blueprint, const std::vector<UserDefinedFunctionConfiguration::Argument> & arguments)
116+
const JavaScriptBlueprint & blueprint,
117+
const std::vector<UserDefinedFunctionConfiguration::Argument> & arguments,
118+
const bool is_changelog_input_)
119+
: is_changelog_input(is_changelog_input_)
114120
{
115121
columns.reserve(arguments.size());
122+
123+
/// check _tp_delta column
124+
if (unlikely(arguments.back().type->getTypeId() != TypeIndex::Int8))
125+
throw Exception(
126+
ErrorCodes::NOT_IMPLEMENTED,
127+
"Tha last argument of JavaScript UDA is '_tp_delta' column, which should be 'int8'. Invalid type.");
128+
116129
for (const auto & arg : arguments)
117130
{
118131
auto col = arg.type->createColumn();
@@ -206,11 +219,28 @@ JavaScriptAggrFunctionState::~JavaScriptAggrFunctionState()
206219

207220
void JavaScriptAggrFunctionState::add(const IColumn ** src_columns, size_t row_num)
208221
{
209-
for (size_t i = 0; auto & col : columns)
210-
{
211-
col->insertFrom(*src_columns[i], row_num);
212-
i++;
213-
}
222+
assert(columns.size() >= 1);
223+
size_t num_of_input_columns = columns.size() - 1;
224+
225+
for (size_t i = 0; i < num_of_input_columns; i++)
226+
columns[i]->insertFrom(*src_columns[i], row_num);
227+
228+
/// _tp_delta column
229+
if (is_changelog_input)
230+
columns.back()->insert(1);
231+
}
232+
233+
void JavaScriptAggrFunctionState::negate(const IColumn ** src_columns, size_t row_num)
234+
{
235+
assert(columns.size() >= 1);
236+
size_t num_of_input_columns = columns.size() - 1;
237+
238+
for (size_t i = 0; i < num_of_input_columns; i++)
239+
columns[i]->insertFrom(*src_columns[i], row_num);
240+
241+
/// _tp_delta column
242+
if (is_changelog_input)
243+
columns.back()->insert(-1);
214244
}
215245

216246
void JavaScriptAggrFunctionState::reinitCache()
@@ -237,10 +267,12 @@ AggregateFunctionJavaScriptAdapter::AggregateFunctionJavaScriptAdapter(
237267
JavaScriptUserDefinedFunctionConfigurationPtr config_,
238268
const DataTypes & types,
239269
const Array & params_,
270+
bool is_changelog_input_,
240271
size_t max_v8_heap_size_in_bytes_)
241272
: IAggregateFunctionHelper<AggregateFunctionJavaScriptAdapter>(types, params_)
242273
, config(config_)
243274
, num_arguments(types.size())
275+
, is_changelog_input(is_changelog_input_)
244276
, max_v8_heap_size_in_bytes(max_v8_heap_size_in_bytes_)
245277
, blueprint(config->name, config->source)
246278
{
@@ -260,7 +292,7 @@ DataTypePtr AggregateFunctionJavaScriptAdapter::getReturnType() const
260292
void AggregateFunctionJavaScriptAdapter::create(AggregateDataPtr __restrict place) const
261293
{
262294
V8::checkHeapLimit(blueprint.isolate.get(), max_v8_heap_size_in_bytes);
263-
new (place) Data(blueprint, config->arguments);
295+
new (place) Data(blueprint, config->arguments, is_changelog_input);
264296
}
265297

266298
/// destroy instance of UDF
@@ -307,6 +339,11 @@ void AggregateFunctionJavaScriptAdapter::add(AggregateDataPtr __restrict place,
307339
this->data(place).add(columns, row_num);
308340
}
309341

342+
void AggregateFunctionJavaScriptAdapter::negate(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const
343+
{
344+
this->data(place).negate(columns, row_num);
345+
}
346+
310347
void AggregateFunctionJavaScriptAdapter::merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const
311348
{
312349
auto & data = this->data(place);
@@ -376,11 +413,13 @@ size_t AggregateFunctionJavaScriptAdapter::flush(AggregateDataPtr __restrict pla
376413
v8::Local<v8::Function> local_func = v8::Local<v8::Function>::New(isolate_, data.process_func);
377414

378415
/// Second, convert the input column into the corresponding object used by UDF
379-
auto argv = V8::prepareArguments(isolate_, config->arguments, data.columns);
416+
/// remove the _tp_delta column if the input stream is not changelog
417+
auto column_size = is_changelog_input ? config->arguments.size() : config->arguments.size() - 1;
418+
auto argv = V8::prepareArguments(isolate_, std::span(config->arguments.begin(), column_size), data.columns);
380419

381420
/// Third, execute the UDF and get aggregate state (only support the final state now, intermediate state is not supported
382421
v8::Local<v8::Value> res;
383-
if (!local_func->Call(ctx, local_obj, static_cast<int>(config->arguments.size()), argv.data()).ToLocal(&res))
422+
if (!local_func->Call(ctx, local_obj, static_cast<int>(column_size), argv.data()).ToLocal(&res))
384423
V8::throwException(
385424
isolate_,
386425
try_catch,

src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.h

+13-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ struct JavaScriptAggrFunctionState
4242

4343
/// the number of emits, 0 means no emit, >1 means it has some aggregate results to emit
4444
size_t emit_times = 0;
45+
/// whether or not it is used in changelog input stream
46+
bool is_changelog_input = false;
4547

4648
/// JavaScript UDA code looks like below. For far, it can only contain member functions and the has_customized_emit bool data member or function
4749
/// {
@@ -55,12 +57,16 @@ struct JavaScriptAggrFunctionState
5557
/// has_customized_emit : false /// Define if the the aggregation has user defined emit strategy
5658
/// }
5759
JavaScriptAggrFunctionState(
58-
const JavaScriptBlueprint & blueprint, const std::vector<UserDefinedFunctionConfiguration::Argument> & arguments);
60+
const JavaScriptBlueprint & blueprint,
61+
const std::vector<UserDefinedFunctionConfiguration::Argument> & arguments,
62+
const bool is_changelog_input_);
5963

6064
~JavaScriptAggrFunctionState();
6165

6266
void add(const IColumn ** src_columns, size_t row_num);
6367

68+
void negate(const IColumn ** src_columns, size_t row_num);
69+
6470
void reinitCache();
6571
};
6672

@@ -82,6 +88,7 @@ class AggregateFunctionJavaScriptAdapter final : public IAggregateFunctionHelper
8288

8389
const JavaScriptUserDefinedFunctionConfigurationPtr config;
8490
size_t num_arguments;
91+
bool is_changelog_input = false;
8592
size_t max_v8_heap_size_in_bytes;
8693
JavaScriptBlueprint blueprint;
8794

@@ -90,6 +97,8 @@ class AggregateFunctionJavaScriptAdapter final : public IAggregateFunctionHelper
9097
JavaScriptUserDefinedFunctionConfigurationPtr config_,
9198
const DataTypes & types,
9299
const Array & params_,
100+
/// If the input stream is changelog, aggregate function will pass _tp_delta column to JavaScript function
101+
bool is_changelog_input_,
93102
size_t max_v8_heap_size_in_bytes_);
94103

95104
String getName() const override;
@@ -113,6 +122,9 @@ class AggregateFunctionJavaScriptAdapter final : public IAggregateFunctionHelper
113122
/// get instance of UDF from AggregateData and execute UDF
114123
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override;
115124

125+
/// for changelog processing, delete existing row from current aggregation result
126+
void negate(AggregateDataPtr __restrict place, const IColumn ** columns, size_t /*row_num*/, Arena * /*arena*/) const override;
127+
116128
/// Merge with other Aggregate Data, maybe used before finalize result
117129
void merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, Arena *) const override;
118130

src/AggregateFunctions/tests/gtest_uda.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ using namespace DB;
1919

2020
std::unique_ptr<v8::Platform> platform;
2121

22-
String ARGS_CEP1 = R"###([{ "name": "value","type": "int64"}])###";
22+
String ARGS_CEP1 = R"###([{ "name": "value","type": "int64"}, {"name": "_tp_delta", "type": "int8"}])###";
2323
String RETURN_CEP1 = "uint32";
2424
String UDA_CEP1 = R"###(
2525
{
@@ -49,7 +49,7 @@ String UDA_CEP1 = R"###(
4949
has_customized_emit : true
5050
})###";
5151

52-
String ARGS_UDA1 = R"###([{ "name": "value","type": "int64"}])###";
52+
String ARGS_UDA1 = R"###([{ "name": "value","type": "int64"}, {"name": "_tp_delta", "type": "int8"}])###";
5353
String RETURN_UDA1 = "float32";
5454
String UDA1 = R"###(
5555
{
@@ -202,7 +202,7 @@ TEST_F(UDATestCase, add)
202202
DataTypes types = getDataTypes(ARGS_CEP1);
203203
Array params;
204204
size_t max_heap_size = 100 * 1024 * 1024;
205-
auto aggr_function = AggregateFunctionJavaScriptAdapter(config, types, params, max_heap_size);
205+
auto aggr_function = AggregateFunctionJavaScriptAdapter(config, types, params, false, max_heap_size);
206206

207207
ASSERT_TRUE(aggr_function.hasUserDefinedEmit());
208208

@@ -242,7 +242,7 @@ TEST_F(UDATestCase, CheckPoint)
242242
DataTypes types = getDataTypes(ARGS_UDA1);
243243
Array params;
244244
size_t max_heap_size = 100 * 1024 * 1024;
245-
auto aggr_function = AggregateFunctionJavaScriptAdapter(config, types, params, max_heap_size);
245+
auto aggr_function = AggregateFunctionJavaScriptAdapter(config, types, params, false, max_heap_size);
246246

247247
std::unique_ptr<AggregateFunctionJavaScriptAdapter::Data[], AggregateFunctionJavaScriptAdapter::DataDeleter> places{
248248
static_cast<AggregateFunctionJavaScriptAdapter::Data *>(malloc(aggr_function.sizeOfData())),
@@ -284,7 +284,7 @@ TEST_F(UDATestCase, Merge)
284284
DataTypes types = getDataTypes(ARGS_UDA1);
285285
Array params;
286286
size_t max_heap_size = 100 * 1024 * 1024;
287-
auto aggr_function = AggregateFunctionJavaScriptAdapter(config, types, params, max_heap_size);
287+
auto aggr_function = AggregateFunctionJavaScriptAdapter(config, types, params, false, max_heap_size);
288288

289289
std::unique_ptr<AggregateFunctionJavaScriptAdapter::Data[], AggregateFunctionJavaScriptAdapter::DataDeleter> places{
290290
static_cast<AggregateFunctionJavaScriptAdapter::Data *>(malloc(2 * aggr_function.sizeOfData())),

src/Functions/UserDefined/JavaScriptUserDefinedFunction.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <Functions/UserDefined/JavaScriptUserDefinedFunction.h>
22
#include <V8/ConvertDataTypes.h>
33
#include <V8/Utils.h>
4+
#include <span>
45

56
namespace DB
67
{
@@ -64,7 +65,7 @@ ColumnPtr JavaScriptUserDefinedFunction::userDefinedExecuteImpl(
6465
v8::Local<v8::Function> local_func = v8::Local<v8::Function>::New(isolate_, js_ctx->func);
6566

6667
/// Second, convert the input column into the corresponding object used by UDF
67-
auto argv = V8::prepareArguments(isolate_, config->arguments, columns);
68+
auto argv = V8::prepareArguments(isolate_, std::span(config->arguments), columns);
6869

6970
/// Third, execute the UDF and get result
7071
v8::Local<v8::Value> res;

0 commit comments

Comments
 (0)