
Commit a13ade9

add join alignment smoke test and fix more (#374)
1 parent 6a4df0f commit a13ade9

7 files changed: +758 -86 lines changed

src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.cpp

Lines changed: 3 additions & 2 deletions
@@ -292,10 +292,11 @@ void AggregateFunctionJavaScriptAdapter::addBatchLookupTable8(
     std::function<void(AggregateDataPtr &)> init,
     const UInt8 * key,
     const IColumn ** columns,
-    Arena * arena) const
+    Arena * arena,
+    const IColumn * delta_col) const
 {
     IAggregateFunctionHelper<AggregateFunctionJavaScriptAdapter>::addBatchLookupTable8(
-        row_begin, row_end, map, place_offset, init, key, columns, arena);
+        row_begin, row_end, map, place_offset, init, key, columns, arena, delta_col);
 
     for (size_t cur = row_begin; cur < row_end; ++cur)
         flush(map[key[cur]] + place_offset);
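
The adapter change is mechanical: the new optional `delta_col` argument is threaded through to the base helper, which is where rows flagged with a negative delta get retracted via `negate` instead of accumulated via `add`. A minimal standalone sketch of that add/negate pairing, using a hypothetical sum-like state rather than this codebase's aggregate interface:

// Minimal sketch (hypothetical, not this codebase's API): an aggregate state
// with an inverse operation, so a -1 changelog row can undo a prior +1 row.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct SumState
{
    int64_t sum = 0;
    void add(int64_t v) { sum += v; }      /// apply an insert (+1) row
    void negate(int64_t v) { sum -= v; }   /// apply a retract (-1) row
};

int main()
{
    std::vector<int64_t> values = {10, 20, 10};
    std::vector<int8_t> deltas = {1, 1, -1};   /// changelog: insert, insert, retract

    SumState state;
    for (std::size_t i = 0; i < values.size(); ++i)
    {
        if (deltas[i] >= 0)
            state.add(values[i]);
        else
            state.negate(values[i]);
    }
    std::cout << state.sum << '\n';   /// prints 20: the retract cancelled the first 10
}

Having an inverse lets a retraction cancel a matching earlier insert without recomputing the whole group, which is the point of the changelog path added below.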

src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.h

Lines changed: 2 additions & 1 deletion
@@ -143,6 +143,7 @@ class AggregateFunctionJavaScriptAdapter final : public IAggregateFunctionHelper
         std::function<void(AggregateDataPtr &)> init,
         const UInt8 * key,
         const IColumn ** columns,
-        Arena * arena) const override;
+        Arena * arena,
+        const IColumn * delta_col = nullptr) const override;
 };
 }
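
Defaulting `delta_col` to `nullptr` keeps every existing call site compiling unchanged. One C++ subtlety worth remembering with this pattern: a default argument is bound from the static type of the call expression, not the dynamic one, so an override should repeat the base declaration's default exactly as this diff does. A tiny hypothetical illustration:

// Hypothetical illustration: default arguments on virtual functions come from
// the static type at the call site, while dispatch stays dynamic.
#include <iostream>

struct Base
{
    virtual void f(int delta = 0) const { std::cout << "Base " << delta << '\n'; }
    virtual ~Base() = default;
};

struct Derived : Base
{
    /// Repeating the same default keeps behaviour identical whether the call
    /// goes through a Base reference or a Derived object.
    void f(int delta = 0) const override { std::cout << "Derived " << delta << '\n'; }
};

int main()
{
    Derived d;
    const Base & b = d;
    b.f();    /// dynamic dispatch to Derived::f; the default 0 is taken from Base
    d.f(5);   /// prints "Derived 5"
}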

src/AggregateFunctions/IAggregateFunction.h

Lines changed: 157 additions & 50 deletions
@@ -342,7 +342,8 @@ class IAggregateFunction : public std::enable_shared_from_this<IAggregateFunctio
         std::function<void(AggregateDataPtr &)> init,
         const UInt8 * key,
         const IColumn ** columns,
-        Arena * arena) const = 0;
+        Arena * arena,
+        const IColumn * delta_col = nullptr) const = 0;
 
     /** Insert result of aggregate function into result column with batch size.
       * The implementation of this method will destroy aggregate place up to -State if insert state into result column was successful.
@@ -787,35 +788,81 @@ class IAggregateFunctionHelper : public IAggregateFunction
         std::function<void(AggregateDataPtr &)> init,
         const UInt8 * key,
         const IColumn ** columns,
-        Arena * arena) const override
+        Arena * arena,
+        const IColumn * delta_col = nullptr) const override
     {
         static constexpr size_t UNROLL_COUNT = 8;
 
-        size_t i = row_begin;
-
-        size_t size_unrolled = (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;
-        for (; i < size_unrolled; i += UNROLL_COUNT)
+        if (delta_col == nullptr)
         {
-            AggregateDataPtr places[UNROLL_COUNT];
-            for (size_t j = 0; j < UNROLL_COUNT; ++j)
+            /// Fast path. non-changelog
+            size_t i = row_begin;
+
+            size_t size_unrolled = (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;
+            for (; i < size_unrolled; i += UNROLL_COUNT)
             {
-                AggregateDataPtr & place = map[key[i + j]];
-                if (unlikely(!place))
-                    init(place);
+                AggregateDataPtr places[UNROLL_COUNT];
+                for (size_t j = 0; j < UNROLL_COUNT; ++j)
+                {
+                    AggregateDataPtr & place = map[key[i + j]];
+                    if (unlikely(!place))
+                        init(place);
+
+                    places[j] = place;
+                }
 
-                places[j] = place;
+                for (size_t j = 0; j < UNROLL_COUNT; ++j)
+                    static_cast<const Derived *>(this)->add(places[j] + place_offset, columns, i + j, arena);
             }
 
-            for (size_t j = 0; j < UNROLL_COUNT; ++j)
-                static_cast<const Derived *>(this)->add(places[j] + place_offset, columns, i + j, arena);
+            for (; i < row_end; ++i)
+            {
+                AggregateDataPtr & place = map[key[i]];
+                if (unlikely(!place))
+                    init(place);
+                static_cast<const Derived *>(this)->add(place + place_offset, columns, i, arena);
+            }
         }
-
-        for (; i < row_end; ++i)
+        else
         {
-            AggregateDataPtr & place = map[key[i]];
-            if (unlikely(!place))
-                init(place);
-            static_cast<const Derived *>(this)->add(place + place_offset, columns, i, arena);
+            /// changelog
+            const auto & delta_flags = assert_cast<const ColumnInt8 &>(*delta_col).getData();
+
+            size_t i = row_begin;
+
+            size_t size_unrolled = (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;
+            for (; i < size_unrolled; i += UNROLL_COUNT)
+            {
+                AggregateDataPtr places[UNROLL_COUNT];
+                for (size_t j = 0; j < UNROLL_COUNT; ++j)
+                {
+                    AggregateDataPtr & place = map[key[i + j]];
+                    if (unlikely(!place))
+                        init(place);
+
+                    places[j] = place;
+                }
+
+                for (size_t j = 0; j < UNROLL_COUNT; ++j)
+                {
+                    if (delta_flags[i + j] >= 0)
+                        static_cast<const Derived *>(this)->add(places[j] + place_offset, columns, i + j, arena);
+                    else
+                        static_cast<const Derived *>(this)->negate(places[j] + place_offset, columns, i + j, arena);
+                }
+            }
+
+            for (; i < row_end; ++i)
+            {
+                AggregateDataPtr & place = map[key[i]];
+                if (unlikely(!place))
+                    init(place);
+
+                if (delta_flags[i] >= 0)
+                    static_cast<const Derived *>(this)->add(place + place_offset, columns, i, arena);
+                else
+                    static_cast<const Derived *>(this)->negate(place + place_offset, columns, i, arena);
+            }
         }
     }
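The unrolled fast path works in two phases per 8-row block: first resolve (and lazily initialize) all eight aggregation places, then apply the per-row updates, so the map lookups are not interleaved with the `add`/`negate` calls. A simplified sketch of the same shape, with a plain array of sums standing in for the real map and aggregate states (hypothetical, not this codebase's types):

// Sketch of the 8-way unrolling idea: resolve all places for a block of rows
// first, then apply the updates, mirroring the two inner loops above.
#include <array>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

constexpr std::size_t UNROLL_COUNT = 8;

int main()
{
    std::vector<std::uint8_t> key = {1, 2, 1, 3, 2, 1, 3, 3, 1, 2};   /// per-row bucket
    std::vector<std::int64_t> value = {5, 7, 1, 2, 3, 4, 6, 8, 9, 10};

    std::array<std::int64_t, 256> sums{};   /// one accumulator per possible UInt8 key

    std::size_t i = 0;
    std::size_t size_unrolled = key.size() / UNROLL_COUNT * UNROLL_COUNT;
    for (; i < size_unrolled; i += UNROLL_COUNT)
    {
        std::int64_t * places[UNROLL_COUNT];
        for (std::size_t j = 0; j < UNROLL_COUNT; ++j)   /// phase 1: resolve places
            places[j] = &sums[key[i + j]];

        for (std::size_t j = 0; j < UNROLL_COUNT; ++j)   /// phase 2: apply updates
            *places[j] += value[i + j];
    }

    for (; i < key.size(); ++i)                          /// tail rows, one by one
        sums[key[i]] += value[i];

    std::cout << sums[1] << ' ' << sums[2] << ' ' << sums[3] << '\n';   /// prints "19 20 16"
}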

@@ -940,15 +987,16 @@ class IAggregateFunctionDataHelper : public IAggregateFunctionHelper<Derived>
         std::function<void(AggregateDataPtr &)> init,
         const UInt8 * key,
         const IColumn ** columns,
-        Arena * arena) const override
+        Arena * arena,
+        const IColumn * delta_col = nullptr) const override
     {
         const Derived & func = *static_cast<const Derived *>(this);
 
         /// If the function is complex or too large, use more generic algorithm.
 
         if (func.allocatesMemoryInArena() || sizeof(Data) > 16 || func.sizeOfData() != sizeof(Data))
         {
-            IAggregateFunctionHelper<Derived>::addBatchLookupTable8(row_begin, row_end, map, place_offset, init, key, columns, arena);
+            IAggregateFunctionHelper<Derived>::addBatchLookupTable8(row_begin, row_end, map, place_offset, init, key, columns, arena, delta_col);
             return;
         }
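The guard above keeps the specialized path honest: the lookup-table variant below placement-news aggregate states into a stack array, which only pays off (and is only safe) for small, fixed-size state that never allocates from the arena; everything else falls back to the generic per-row implementation. A reduced sketch of that dispatch idea (hypothetical types):

// Sketch of the size-based dispatch: small POD state takes the lookup-table
// path, anything bigger goes through the generic path.
#include <cstddef>
#include <cstdint>
#include <iostream>

template <typename Data>
void addBatch(std::size_t rows)
{
    if constexpr (sizeof(Data) > 16)
        std::cout << "generic path for " << rows << " rows\n";       /// state too large to stage on the stack
    else
        std::cout << "lookup-table path for " << rows << " rows\n";  /// small state, stage and merge
}

struct SmallState { std::int64_t sum; };        /// 8 bytes: qualifies for the fast path
struct BigState { std::int64_t values[8]; };    /// 64 bytes: generic path

int main()
{
    addBatch<SmallState>(1000);   /// lookup-table path for 1000 rows
    addBatch<BigState>(1000);     /// generic path for 1000 rows
}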

@@ -962,50 +1010,109 @@ class IAggregateFunctionDataHelper : public IAggregateFunctionHelper<Derived>
         size_t i = row_begin;
 
         /// Aggregate data into different lookup tables.
-
-        size_t size_unrolled = (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;
-        for (; i < size_unrolled; i += UNROLL_COUNT)
+        if (delta_col == nullptr)
         {
-            for (size_t j = 0; j < UNROLL_COUNT; ++j)
+            /// Fast path. non-changelog
+            size_t size_unrolled = (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;
+            for (; i < size_unrolled; i += UNROLL_COUNT)
             {
-                size_t idx = j * 256 + key[i + j];
-                if (unlikely(!has_data[idx]))
+                for (size_t j = 0; j < UNROLL_COUNT; ++j)
                 {
-                    new (&places[idx]) Data;
-                    has_data[idx] = true;
+                    size_t idx = j * 256 + key[i + j];
+                    if (unlikely(!has_data[idx]))
+                    {
+                        new (&places[idx]) Data;
+                        has_data[idx] = true;
+                    }
+                    func.add(reinterpret_cast<char *>(&places[idx]), columns, i + j, nullptr);
                 }
-                func.add(reinterpret_cast<char *>(&places[idx]), columns, i + j, nullptr);
             }
-        }
 
-        /// Merge data from every lookup table to the final destination.
+            /// Merge data from every lookup table to the final destination.
 
-        for (size_t k = 0; k < 256; ++k)
+            for (size_t k = 0; k < 256; ++k)
+            {
+                for (size_t j = 0; j < UNROLL_COUNT; ++j)
+                {
+                    size_t idx = j * 256 + k;
+                    if (has_data[idx])
+                    {
+                        AggregateDataPtr & place = map[k];
+                        if (unlikely(!place))
+                            init(place);
+
+                        func.merge(place + place_offset, reinterpret_cast<const char *>(&places[idx]), nullptr);
+                    }
+                }
+            }
+
+            /// Process tails and add directly to the final destination.
+
+            for (; i < row_end; ++i)
+            {
+                size_t k = key[i];
+                AggregateDataPtr & place = map[k];
+                if (unlikely(!place))
+                    init(place);
+
+                func.add(place + place_offset, columns, i, nullptr);
+            }
+        }
+        else
         {
-            for (size_t j = 0; j < UNROLL_COUNT; ++j)
+            /// changelog
+            const auto & delta_flags = assert_cast<const ColumnInt8 &>(*delta_col).getData();
+            size_t size_unrolled = (row_end - row_begin) / UNROLL_COUNT * UNROLL_COUNT;
+            for (; i < size_unrolled; i += UNROLL_COUNT)
             {
-                size_t idx = j * 256 + k;
-                if (has_data[idx])
+                for (size_t j = 0; j < UNROLL_COUNT; ++j)
                 {
-                    AggregateDataPtr & place = map[k];
-                    if (unlikely(!place))
-                        init(place);
+                    size_t idx = j * 256 + key[i + j];
+                    if (unlikely(!has_data[idx]))
+                    {
+                        new (&places[idx]) Data;
+                        has_data[idx] = true;
+                    }
 
-                    func.merge(place + place_offset, reinterpret_cast<const char *>(&places[idx]), nullptr);
+                    if (delta_flags[i + j] >= 0)
+                        func.add(reinterpret_cast<char *>(&places[idx]), columns, i + j, nullptr);
+                    else
+                        func.negate(reinterpret_cast<char *>(&places[idx]), columns, i + j, nullptr);
                 }
             }
-        }
 
-        /// Process tails and add directly to the final destination.
+            /// Merge data from every lookup table to the final destination.
 
-        for (; i < row_end; ++i)
-        {
-            size_t k = key[i];
-            AggregateDataPtr & place = map[k];
-            if (unlikely(!place))
-                init(place);
+            for (size_t k = 0; k < 256; ++k)
+            {
+                for (size_t j = 0; j < UNROLL_COUNT; ++j)
+                {
+                    size_t idx = j * 256 + k;
+                    if (has_data[idx])
+                    {
+                        AggregateDataPtr & place = map[k];
+                        if (unlikely(!place))
+                            init(place);
+
+                        func.merge(place + place_offset, reinterpret_cast<const char *>(&places[idx]), nullptr);
+                    }
+                }
+            }
+
+            /// Process tails and add directly to the final destination.
+
+            for (; i < row_end; ++i)
+            {
+                size_t k = key[i];
+                AggregateDataPtr & place = map[k];
+                if (unlikely(!place))
+                    init(place);
 
-        func.add(place + place_offset, columns, i, nullptr);
+                if (delta_flags[i] >= 0)
+                    func.add(place + place_offset, columns, i, nullptr);
+                else
+                    func.negate(place + place_offset, columns, i, nullptr);
+            }
         }
     }
 };
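
The IAggregateFunctionDataHelper specialization goes one step further than the generic helper: each unroll lane j owns a private 256-slot table (index `j * 256 + key`), so two rows of the same block that share a key never touch the same slot, and the per-lane tables are merged into the destination map once at the end. A simplified standalone sketch of that layout (hypothetical types, a plain sum standing in for `Data`):

// Sketch of the per-lane lookup tables: lane j only writes slots
// [j * 256, j * 256 + 255], and all lanes are merged afterwards.
#include <array>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

constexpr std::size_t UNROLL_COUNT = 8;

int main()
{
    std::vector<std::uint8_t> key = {7, 7, 7, 7, 7, 7, 7, 7, 9, 9};
    std::vector<std::int64_t> value = {1, 1, 1, 1, 1, 1, 1, 1, 2, 2};

    /// UNROLL_COUNT private tables, flattened into one array.
    std::array<std::int64_t, UNROLL_COUNT * 256> lanes{};

    std::size_t i = 0;
    std::size_t size_unrolled = key.size() / UNROLL_COUNT * UNROLL_COUNT;
    for (; i < size_unrolled; i += UNROLL_COUNT)
        for (std::size_t j = 0; j < UNROLL_COUNT; ++j)
            lanes[j * 256 + key[i + j]] += value[i + j];   /// no intra-block slot conflicts

    /// Merge every lane into the final per-key destination.
    std::array<std::int64_t, 256> sums{};
    for (std::size_t k = 0; k < 256; ++k)
        for (std::size_t j = 0; j < UNROLL_COUNT; ++j)
            sums[k] += lanes[j * 256 + k];

    /// Tail rows bypass the lanes and go straight to the destination.
    for (; i < key.size(); ++i)
        sums[key[i]] += value[i];

    std::cout << sums[7] << ' ' << sums[9] << '\n';   /// prints "8 4"
}

Spreading one hot key across eight independent accumulators removes the read-modify-write dependency between consecutive rows, at the cost of a fixed merge pass over 256 × UNROLL_COUNT slots.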

src/Interpreters/Streaming/Aggregator.cpp

Lines changed: 5 additions & 1 deletion
@@ -792,7 +792,8 @@ template <bool no_more_keys, bool use_compiled_functions, typename Method>
         },
         state.getKeyData(),
         inst->batch_arguments,
-        aggregates_pool);
+        aggregates_pool,
+        inst->delta_column);
 
     /// Calculate if we need finalization
     if (!need_finalization && inst->batch_that->isUserDefined())
@@ -3924,6 +3925,9 @@ bool Aggregator::executeAndRetractImpl(
             emplace_result.setMapped(aggregate_data);
 
             /// Save new group without retracted state (used for emit new key group)
+            /// FIXME: There is a bug when using a key8 or key16 hash table: it uses an optimized FixedImplicitZeroHashMap, in which an empty mapped value directly means zero (i.e. an invalid insertion).
+            /// But in the retract group scenario, we need an empty mapped value to represent "no retracted value" for a new group.
+            /// Use a non-optimized FixedHashMap? Or revisit the retract implementation?
             retracted_state.emplaceKey(retracted_method.data, i, *retracted_pool).setMapped(nullptr);
         }
         else
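
The FIXME added here is about representation rather than logic: FixedImplicitZeroHashMap saves memory by treating a zero (null) mapped value as "cell not occupied", so the `setMapped(nullptr)` above, meant as a deliberate "new group, nothing retracted yet" marker, is indistinguishable from a cell that was never inserted when the key type is key8 or key16. A toy illustration of the ambiguity (hypothetical map, not this codebase's implementation):

// Toy model of an implicit-zero fixed map: occupancy is inferred from the
// mapped value itself, so storing nullptr on purpose "disappears".
#include <array>
#include <cstdint>
#include <iostream>

struct ImplicitZeroFixedMap
{
    std::array<const char *, 256> mapped{};   /// zero-initialized: every cell reads as absent

    bool has(std::uint8_t key) const { return mapped[key] != nullptr; }   /// occupied == non-null
    void set(std::uint8_t key, const char * value) { mapped[key] = value; }
};

int main()
{
    ImplicitZeroFixedMap retracted;
    retracted.set(42, nullptr);   /// intent: key 42 exists, but has no retracted state yet

    /// The map cannot distinguish this from a key that was never inserted,
    /// which is exactly the ambiguity the FIXME describes.
    std::cout << std::boolalpha << retracted.has(42) << '\n';   /// prints "false"
}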
