Skip to content

Commit cc67ce0

Browse files
authored
support monotonic function and support _tp_sn, _tp_shard in random s… (#359)
1 parent 8ef6c22 commit cc67ce0

File tree

4 files changed

+287
-22
lines changed

4 files changed

+287
-22
lines changed

src/Functions/FunctionMonotonic.cpp

+113
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#include <Columns/ColumnConst.h>
2+
#include <Core/Field.h>
3+
#include <DataTypes/DataTypeFactory.h>
4+
#include <DataTypes/DataTypesNumber.h>
5+
#include <Formats/FormatSettings.h>
6+
#include <Functions/FunctionFactory.h>
7+
#include <Functions/IFunction.h>
8+
#include <IO/ReadHelpers.h>
9+
#include <IO/WriteHelpers.h>
10+
#include <base/map.h>
11+
12+
#include <numeric>
13+
14+
15+
namespace DB
16+
{
17+
namespace ErrorCodes
18+
{
19+
extern const int BAD_ARGUMENTS;
20+
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
21+
}
22+
23+
24+
/**
25+
* Function monotonic() returns a monotonically increasing sequence of numbers.
26+
* Now the argument range is min(Int64) ~ max(Int64).
27+
* So it supports passing uint8, uint16, uint32, int8, int16, int32, int64 as argument.
28+
*/
29+
class FuntionMonotonic : public IFunction
30+
{
31+
public:
32+
explicit FuntionMonotonic(Int64 start_num_) : start_num(start_num_) { }
33+
34+
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
35+
36+
size_t getNumberOfArguments() const override { return 1; }
37+
38+
bool isStateful() const override { return true; }
39+
40+
bool isDeterministic() const override { return false; }
41+
42+
bool isDeterministicInScopeOfQuery() const override { return false; }
43+
44+
bool isSuitableForConstantFolding() const override { return false; }
45+
46+
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr & result_type, size_t input_rows_count) const override
47+
{
48+
auto column = ColumnInt64::create(input_rows_count);
49+
std::iota(column->getData().begin(), column->getData().end(), start_num);
50+
start_num += input_rows_count;
51+
return column;
52+
}
53+
54+
void serialize(WriteBuffer & wb) const override { writeIntBinary<Int64>(start_num, wb); }
55+
56+
void deserialize(ReadBuffer & rb) const override { readIntBinary<Int64>(start_num, rb); }
57+
58+
String getName() const override { return "monotonic"; }
59+
60+
private:
61+
mutable Int64 start_num;
62+
};
63+
64+
class MonotonicOverloadResolver : public IFunctionOverloadResolver
65+
{
66+
public:
67+
static constexpr auto name = "monotonic";
68+
69+
String getName() const override { return name; }
70+
71+
static FunctionOverloadResolverPtr create(ContextPtr) { return std::make_unique<MonotonicOverloadResolver>(); }
72+
73+
size_t getNumberOfArguments() const override { return 1; }
74+
75+
bool isStateful() const override { return true; }
76+
77+
bool isVariadic() const override { return false; }
78+
79+
bool isDeterministic() const override { return false; }
80+
81+
bool isDeterministicInScopeOfQuery() const override { return false; }
82+
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
83+
{
84+
if (arguments.size() != 1)
85+
throw Exception("Function " + getName() + " requires 1 arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
86+
87+
if (!isInteger(arguments[0].type))
88+
throw Exception(
89+
fmt::format("Function {} requires integer argumengt but given {}", getName(), arguments[0].type->getFamilyName()),
90+
ErrorCodes::BAD_ARGUMENTS);
91+
92+
return std::make_unique<DataTypeInt64>();
93+
}
94+
95+
FunctionBasePtr buildImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & return_type) const override
96+
{
97+
if (!isColumnConst(*arguments[0].column))
98+
throw Exception("Function " + getName() + " requires constant argument", ErrorCodes::BAD_ARGUMENTS);
99+
100+
auto start_num = assert_cast<const ColumnConst &>(*arguments[0].column).getInt(0);
101+
return std::make_unique<FunctionToFunctionBaseAdaptor>(
102+
std::make_unique<FuntionMonotonic>(start_num),
103+
collections::map<DataTypes>(arguments, [](const auto & elem) { return elem.type; }),
104+
return_type);
105+
}
106+
};
107+
108+
109+
/// Registers the monotonic resolver with the function factory; the name is matched case-insensitively.
REGISTER_FUNCTION(Monotonic)
{
    factory.registerFunction<MonotonicOverloadResolver>(
        {},
        FunctionFactory::CaseInsensitive);
}
113+
}

src/Storages/Streaming/StorageRandom.cpp

+57-19
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
#include <Common/logger_useful.h>
3737
#include <Common/randomSeed.h>
3838

39-
4039
namespace DB
4140
{
4241

@@ -66,7 +65,6 @@ void fillBufferWithRandomData(char * __restrict data, size_t size, pcg64 & rng)
6665
}
6766
}
6867

69-
7068
ColumnPtr fillColumnWithRandomData(const DataTypePtr type, UInt64 limit, pcg64 & rng, ContextPtr context)
7169
{
7270
TypeIndex idx = type->getTypeId();
@@ -353,6 +351,28 @@ ColumnPtr fillColumnWithRandomData(const DataTypePtr type, UInt64 limit, pcg64 &
353351
}
354352
}
355353

354+
/**
 * Produces one column of `limit` rows for the column named `col_name`.
 *  - ProtonConsts::RESERVED_SHARD: every row holds the shard number (tuple element 0).
 *  - ProtonConsts::RESERVED_EVENT_SEQUENCE_ID: every row holds the current sequence number
 *    (tuple element 1); the counter is then advanced by one, i.e. once per call, not once per row.
 *  - any other name: random values from fillColumnWithRandomData, driven by the generator in tuple element 2.
 *
 * `type` and `col_name` are taken by const reference so that calling this per column
 * does not copy a string or bump a shared_ptr reference count each time.
 *
 * NOTE(review): the sequence counter is a 32-bit signed integer while the shard number is 64-bit;
 * the virtual columns declared in this file use Int64 for the sequence id and Int32 for the shard,
 * so the tuple element types look swapped — confirm.
 */
ColumnPtr fillColumnWithData(
    const DataTypePtr & type, UInt64 limit, std::tuple<Int64, Int32, pcg64> & data, ContextPtr context, const String & col_name)
{
    auto & [shard_num, sn, rng] = data;

    if (col_name == ProtonConsts::RESERVED_SHARD)
        return type->createColumnConst(limit, shard_num)->convertToFullColumnIfConst();

    if (col_name == ProtonConsts::RESERVED_EVENT_SEQUENCE_ID)
    {
        auto column = type->createColumnConst(limit, sn)->convertToFullColumnIfConst();
        /// Advance only after the column has been materialized with the current value.
        ++sn;
        return column;
    }

    return fillColumnWithRandomData(type, limit, rng, context);
}
356376

357377
class GenerateRandomSource final : public ISource
358378
{
@@ -366,12 +386,12 @@ class GenerateRandomSource final : public ISource
366386
UInt64 events_per_second_,
367387
UInt64 interval_time_,
368388
bool is_streaming_,
369-
UInt64 total_events_)
389+
UInt64 total_events_,
390+
size_t shard_num_)
370391
: ISource(Nested::flatten(prepareBlockToFill(block_header_)), true, ProcessorID::GenerateRandomSourceID)
371392
, block_size(block_size_)
372393
, block_full(std::move(block_header_))
373394
, our_columns(our_columns_)
374-
, rng(random_seed_)
375395
, context(context_)
376396
, events_per_second(events_per_second_)
377397
, header_chunk(Nested::flatten(block_full.cloneEmpty()).getColumns(), 0)
@@ -380,7 +400,7 @@ class GenerateRandomSource final : public ISource
380400
, log(&Poco::Logger::get("GenerateRandSource"))
381401
{
382402
is_streaming = is_streaming_;
383-
403+
data_generate_helper = std::make_tuple(shard_num_, 1, pcg64(random_seed_));
384404
if (total_events == 0 && !is_streaming)
385405
total_events = events_per_second ? events_per_second : block_size;
386406

@@ -414,8 +434,17 @@ class GenerateRandomSource final : public ISource
414434
!= ProtonConsts::RESERVED_COLUMN_NAMES.end();
415435
if (is_reserved_column || our_columns.hasDefault(elem.name))
416436
continue;
437+
417438
block_to_fill.insert(elem);
418439
}
440+
441+
auto dag
442+
= evaluateMissingDefaults(block_to_fill, block_full.getNamesAndTypesList(), our_columns, context, true, false, true);
443+
if (dag)
444+
{
445+
default_actions = std::make_shared<ExpressionActions>(
446+
std::move(dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
447+
}
419448
}
420449

421450
String getName() const override { return "Random"; }
@@ -493,7 +522,7 @@ class GenerateRandomSource final : public ISource
493522
Block block_to_fill_as_result(block_to_fill.cloneEmpty());
494523

495524
for (const auto & elem : block_to_fill_as_result)
496-
columns.emplace_back(fillColumnWithRandomData(elem.type, block_size_, rng, context));
525+
columns.emplace_back(fillColumnWithData(elem.type, block_size_, data_generate_helper, context, elem.name));
497526

498527
block_to_fill_as_result.setColumns(columns);
499528

@@ -504,13 +533,8 @@ class GenerateRandomSource final : public ISource
504533
block_to_fill_as_result.insert(
505534
{ColumnConst::create(ColumnUInt8::create(1, 0), block_size_), std::make_shared<DataTypeUInt8>(), "_dummy"});
506535

507-
auto dag = evaluateMissingDefaults(block_to_fill_as_result, block_full.getNamesAndTypesList(), our_columns, context);
508-
if (dag)
509-
{
510-
auto actions = std::make_shared<ExpressionActions>(
511-
std::move(dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
512-
actions->execute(block_to_fill_as_result);
513-
}
536+
if (default_actions)
537+
default_actions->execute(block_to_fill_as_result);
514538

515539
if (block_to_fill_as_result.has(ProtonConsts::RESERVED_COLUMN_NAMES[0])
516540
&& block_to_fill_as_result.has(ProtonConsts::RESERVED_COLUMN_NAMES[1]))
@@ -528,7 +552,6 @@ class GenerateRandomSource final : public ISource
528552
Block block_full;
529553
Block block_to_fill;
530554
const ColumnsDescription our_columns;
531-
pcg64 rng;
532555
ContextPtr context;
533556
Int64 boundary_time;
534557
UInt64 block_idx_in_window;
@@ -546,6 +569,9 @@ class GenerateRandomSource final : public ISource
546569
UInt64 total_events;
547570
UInt64 generated_events = 0;
548571
Poco::Logger * log;
572+
std::shared_ptr<ExpressionActions> default_actions = nullptr;
573+
// <shard_num, sequence_num, rng>
574+
std::tuple<Int64, Int32, pcg64> data_generate_helper;
549575

550576
static Block & prepareBlockToFill(Block & block)
551577
{
@@ -717,7 +743,8 @@ Pipe StorageRandom::read(
717743
0,
718744
1000,
719745
query_info.syntax_analyzer_result->streaming,
720-
events_share));
746+
events_share,
747+
i));
721748
}
722749

723750
pipes.emplace_back(std::make_shared<GenerateRandomSource>(
@@ -729,7 +756,8 @@ Pipe StorageRandom::read(
729756
0,
730757
1000,
731758
query_info.syntax_analyzer_result->streaming,
732-
events_share + events_remainder));
759+
events_share + events_remainder,
760+
shards - 1));
733761
}
734762
else
735763
{
@@ -743,7 +771,8 @@ Pipe StorageRandom::read(
743771
eps,
744772
1000,
745773
query_info.syntax_analyzer_result->streaming,
746-
max_events));
774+
max_events,
775+
0));
747776
}
748777
}
749778
else
@@ -762,7 +791,8 @@ Pipe StorageRandom::read(
762791
eps_thread,
763792
interval_time,
764793
query_info.syntax_analyzer_result->streaming,
765-
events_share));
794+
events_share,
795+
i));
766796
}
767797

768798
/// The last thread will do the remaining work
@@ -775,9 +805,17 @@ Pipe StorageRandom::read(
775805
eps_thread + remainder,
776806
interval_time,
777807
query_info.syntax_analyzer_result->streaming,
778-
events_share + events_remainder));
808+
events_share + events_remainder,
809+
shards - 1));
779810
}
780811
return Pipe::unitePipes(std::move(pipes));
781812
}
782813

814+
/// Lists the virtual columns of the random storage:
/// the event sequence id as Int64 and the shard as Int32.
NamesAndTypesList StorageRandom::getVirtuals() const
{
    NamesAndTypesList virtuals;
    virtuals.emplace_back(ProtonConsts::RESERVED_EVENT_SEQUENCE_ID, std::make_shared<DataTypeInt64>());
    virtuals.emplace_back(ProtonConsts::RESERVED_SHARD, std::make_shared<DataTypeInt32>());
    return virtuals;
}
783821
}

src/Storages/Streaming/StorageRandom.h

+2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ class StorageRandom final : public shared_ptr_helper<StorageRandom>, public ISto
5959
size_t max_block_size,
6060
size_t num_streams) override;
6161

62+
NamesAndTypesList getVirtuals() const override;
63+
6264
bool supportsStreamingQuery() const override { return true; }
6365
bool hasEvenlyDistributedRead() const override { return true; }
6466

0 commit comments

Comments
 (0)