Skip to content

Commit e931bb5

Browse files
authored
fix: cannot load kafka external streams created before sharding_expr feature (#351)
1 parent f45ce2b commit e931bb5

File tree

7 files changed

+72
-58
lines changed

7 files changed

+72
-58
lines changed

src/Interpreters/InterpreterCreateQuery.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -903,20 +903,20 @@ void InterpreterCreateQuery::handleExternalStreamCreation(ASTCreateQuery & creat
903903

904904
sharding_expr_field.tryGet<String>(sharding_expr);
905905

906-
ASTPtr sharding_expr_ast;
907906
if (sharding_expr.empty())
908907
{
909-
sharding_expr_ast = makeASTFunction("rand");
908+
create.storage->set(create.storage->engine, makeASTFunction("ExternalStream"));
910909
}
911910
else
912911
{
913912
ParserFunction parser;
914913
const char * begin{sharding_expr.data()};
915914
const char * end{begin + sharding_expr.size()};
916-
sharding_expr_ast = parseQuery(parser, begin, end, "", 0, 0);
915+
auto sharding_expr_ast = parseQuery(parser, begin, end, "", 0, 0);
916+
917+
create.storage->set(create.storage->engine, makeASTFunction("ExternalStream", sharding_expr_ast));
917918
}
918919

919-
create.storage->set(create.storage->engine, makeASTFunction("ExternalStream", sharding_expr_ast));
920920

921921
if (create.storage->engine->name != "ExternalStream")
922922
throw Exception(ErrorCodes::INCORRECT_QUERY, "External stream requires ExternalStream engine");

src/Storages/ExternalStream/Kafka/Kafka.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ extern const int INVALID_SETTING_VALUE;
2525
extern const int RESOURCE_NOT_FOUND;
2626
}
2727

28-
Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, ASTPtr partitioning_expr_, bool attach)
28+
Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach)
2929
: storage_id(storage->getStorageID())
3030
, settings(std::move(settings_))
3131
, data_format(settings->data_format.value)
3232
, log(&Poco::Logger::get("External-" + settings->topic.value))
33-
, partitioning_expr(std::move(partitioning_expr_))
33+
, engine_args(engine_args_)
3434
{
3535
assert(settings->type.value == StreamTypes::KAFKA || settings->type.value == StreamTypes::REDPANDA);
3636

src/Storages/ExternalStream/Kafka/Kafka.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class IStorage;
1313
class Kafka final : public StorageExternalStreamImpl
1414
{
1515
public:
16-
Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, ASTPtr partitioning_expr, bool attach);
16+
Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach);
1717
~Kafka() override = default;
1818

1919
void startup() override { }
@@ -42,7 +42,8 @@ class Kafka final : public StorageExternalStreamImpl
4242
const String & password() const { return settings->password.value; }
4343
const String & sslCaCertFile() const { return settings->ssl_ca_cert_file.value; }
4444
const klog::KConfParams & properties() const { return kafka_properties; }
45-
const ASTPtr & partitioning_expr_ast() const { return partitioning_expr; }
45+
bool hasCustomShardingExpr() const { return !engine_args.empty(); }
46+
const ASTPtr & shardingExprAst() const { assert(!engine_args.empty()); return engine_args[0]; }
4647

4748
private:
4849
void calculateDataFormat(const IStorage * storage);
@@ -61,7 +62,7 @@ class Kafka final : public StorageExternalStreamImpl
6162

6263
NamesAndTypesList virtual_column_names_and_types;
6364
klog::KConfParams kafka_properties;
64-
ASTPtr partitioning_expr;
65+
const ASTs engine_args;
6566

6667
std::mutex shards_mutex;
6768
int32_t shards = 0;

src/Storages/ExternalStream/Kafka/KafkaSink.cpp

+40-30
Original file line numberDiff line numberDiff line change
@@ -24,55 +24,62 @@ extern const int TYPE_MISMATCH;
2424

2525
namespace KafkaStream
2626
{
27-
ChunkPartitioner::ChunkPartitioner(ContextPtr context, const Block & header, const ASTPtr & partitioning_expr_ast)
27+
void ChunkSharder::useRandomSharding()
2828
{
29-
/// `InterpreterCreateQuery::handleExternalStreamCreation` ensures this
30-
assert(partitioning_expr_ast);
29+
random_sharding = true;
30+
std::random_device r;
31+
rand = std::minstd_rand(r());
32+
}
33+
34+
ChunkSharder::ChunkSharder(ContextPtr context, const Block & header, const ASTPtr & sharding_expr_ast)
35+
{
36+
assert(sharding_expr_ast);
3137

32-
ASTPtr query = partitioning_expr_ast;
38+
ASTPtr query = sharding_expr_ast;
3339
auto syntax_result = TreeRewriter(context).analyze(query, header.getNamesAndTypesList());
34-
partitioning_expr = ExpressionAnalyzer(query, syntax_result, context).getActions(false);
40+
sharding_expr = ExpressionAnalyzer(query, syntax_result, context).getActions(false);
3541

36-
partitioning_key_column_name = partitioning_expr_ast->getColumnName();
42+
sharding_key_column_name = sharding_expr_ast->getColumnName();
3743

38-
if (auto * shard_func = partitioning_expr_ast->as<ASTFunction>())
44+
if (auto * shard_func = sharding_expr_ast->as<ASTFunction>())
3945
{
4046
if (shard_func->name == "rand" || shard_func->name == "RAND")
41-
{
42-
random_partitioning = true;
43-
std::random_device r;
44-
rand = std::minstd_rand(r());
45-
}
47+
this->useRandomSharding();
4648
}
4749
}
4850

49-
BlocksWithShard ChunkPartitioner::partition(Block block, Int32 partition_cnt) const
51+
ChunkSharder::ChunkSharder()
52+
{
53+
this->useRandomSharding();
54+
}
55+
56+
BlocksWithShard ChunkSharder::shard(Block block, Int32 shard_cnt) const
5057
{
5158
/// no topics have zero partitions
52-
assert(partition_cnt > 0);
59+
assert(shard_cnt > 0);
5360

54-
if (partition_cnt == 1)
61+
if (shard_cnt == 1)
5562
return {BlockWithShard{Block(std::move(block)), 0}};
5663

57-
if (random_partitioning)
58-
return {BlockWithShard{Block(std::move(block)), getNextShardIndex(partition_cnt)}};
64+
if (random_sharding)
65+
return {BlockWithShard{Block(std::move(block)), getNextShardIndex(shard_cnt)}};
5966

60-
return doParition(std::move(block), partition_cnt);
67+
return doSharding(std::move(block), shard_cnt);
6168
}
6269

63-
BlocksWithShard ChunkPartitioner::doParition(Block block, Int32 partition_cnt) const
70+
BlocksWithShard ChunkSharder::doSharding(Block block, Int32 shard_cnt) const
6471
{
65-
auto selector = createSelector(block, partition_cnt);
72+
auto selector = createSelector(block, shard_cnt);
6673

67-
Blocks partitioned_blocks{static_cast<size_t>(partition_cnt)};
74+
Blocks partitioned_blocks{static_cast<size_t>(shard_cnt)};
6875

69-
for (Int32 i = 0; i < partition_cnt; ++i)
76+
for (Int32 i = 0; i < shard_cnt; ++i)
7077
partitioned_blocks[i] = block.cloneEmpty();
7178

7279
for (size_t pos = 0; pos < block.columns(); ++pos)
7380
{
74-
MutableColumns partitioned_columns = block.getByPosition(pos).column->scatter(partition_cnt, selector);
75-
for (Int32 i = 0; i < partition_cnt; ++i)
81+
MutableColumns partitioned_columns = block.getByPosition(pos).column->scatter(shard_cnt, selector);
82+
for (Int32 i = 0; i < shard_cnt; ++i)
7683
partitioned_blocks[i].getByPosition(pos).column = std::move(partitioned_columns[i]);
7784
}
7885

@@ -89,14 +96,14 @@ BlocksWithShard ChunkPartitioner::doParition(Block block, Int32 partition_cnt) c
8996
return blocks_with_shard;
9097
}
9198

92-
IColumn::Selector ChunkPartitioner::createSelector(Block block, Int32 partition_cnt) const
99+
IColumn::Selector ChunkSharder::createSelector(Block block, Int32 shard_cnt) const
93100
{
94-
std::vector<UInt64> slot_to_shard(partition_cnt);
101+
std::vector<UInt64> slot_to_shard(shard_cnt);
95102
std::iota(slot_to_shard.begin(), slot_to_shard.end(), 0);
96103

97-
partitioning_expr->execute(block);
104+
sharding_expr->execute(block);
98105

99-
const auto & key_column = block.getByName(partitioning_key_column_name);
106+
const auto & key_column = block.getByName(sharding_key_column_name);
100107

101108
/// If key_column.type is DataTypeLowCardinality, do shard according to its dictionaryType
102109
#define CREATE_FOR_TYPE(TYPE) \
@@ -236,7 +243,10 @@ KafkaSink::KafkaSink(const Kafka * kafka, const Block & header, ContextPtr conte
236243
writer = FormatFactory::instance().getOutputFormat(data_format, *wb, header, context);
237244
writer->setAutoFlush();
238245

239-
partitioner = std::make_unique<KafkaStream::ChunkPartitioner>(context, header, kafka->partitioning_expr_ast());
246+
if (kafka->hasCustomShardingExpr())
247+
partitioner = std::make_unique<KafkaStream::ChunkSharder>(context, header, kafka->shardingExprAst());
248+
else
249+
partitioner = std::make_unique<KafkaStream::ChunkSharder>();
240250

241251
polling_threads.scheduleOrThrowOnError([this]() {
242252
while (!is_finished.test())
@@ -251,7 +261,7 @@ void KafkaSink::consume(Chunk chunk)
251261
return;
252262

253263
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
254-
auto blocks = partitioner->partition(std::move(block), partition_cnt);
264+
auto blocks = partitioner->shard(std::move(block), partition_cnt);
255265

256266
for (auto & blockWithShard : blocks)
257267
{

src/Storages/ExternalStream/Kafka/KafkaSink.h

+14-11
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,26 @@ namespace DB
1919

2020
namespace KafkaStream
2121
{
22-
/// Partition Chunk's to paritions (i.e. shards) by the partitioning expression.
23-
class ChunkPartitioner
22+
/// Shard Chunk's to shards (or partitions in Kafka's term) by the sharding expression.
23+
class ChunkSharder
2424
{
2525
public:
26-
ChunkPartitioner(ContextPtr context, const Block & header, const ASTPtr & partitioning_expr_ast);
27-
BlocksWithShard partition(Block block, Int32 partition_cnt) const;
26+
ChunkSharder(ContextPtr context, const Block & header, const ASTPtr & sharding_expr_ast);
27+
ChunkSharder();
28+
29+
BlocksWithShard shard(Block block, Int32 shard_cnt) const;
2830

2931
private:
30-
Int32 getNextShardIndex(Int32 partition_cnt) const noexcept { return static_cast<Int32>(rand()) % partition_cnt; }
32+
void useRandomSharding();
33+
Int32 getNextShardIndex(Int32 shard_cnt) const noexcept { return static_cast<Int32>(rand()) % shard_cnt; }
3134

32-
BlocksWithShard doParition(Block block, Int32 partition_cnt) const;
35+
BlocksWithShard doSharding(Block block, Int32 shard_cnt) const;
3336

34-
IColumn::Selector createSelector(Block block, Int32 partition_cnt) const;
37+
IColumn::Selector createSelector(Block block, Int32 shard_cnt) const;
3538

36-
ExpressionActionsPtr partitioning_expr;
37-
String partitioning_key_column_name;
38-
bool random_partitioning = false;
39+
ExpressionActionsPtr sharding_expr;
40+
String sharding_key_column_name;
41+
bool random_sharding = false;
3942
mutable std::minstd_rand rand;
4043
};
4144
}
@@ -87,7 +90,7 @@ class KafkaSink final : public SinkToStorage
8790
ThreadPool polling_threads;
8891
std::atomic_flag is_finished;
8992
Int32 partition_cnt;
90-
std::unique_ptr<KafkaStream::ChunkPartitioner> partitioner;
93+
std::unique_ptr<KafkaStream::ChunkSharder> partitioner;
9194

9295
Poco::Logger * log;
9396
};

src/Storages/ExternalStream/StorageExternalStream.cpp

+7-7
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ buildShardingKeyExpression(ASTPtr sharding_key, ContextPtr context, const NamesA
3939
}
4040

4141
void validateEngineArgs(ContextPtr context, ASTs & engine_args, const ColumnsDescription & columns) {
42-
if (engine_args.size() == 0)
43-
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ExternalStream requires sharding key expression");
42+
if (engine_args.empty())
43+
return;
4444

4545
auto sharding_expr = buildShardingKeyExpression(engine_args[0], context, columns.getAllPhysical());
4646
const auto & block = sharding_expr->getSampleBlock();
@@ -55,13 +55,13 @@ void validateEngineArgs(ContextPtr context, ASTs & engine_args, const ColumnsDes
5555
}
5656

5757
std::unique_ptr<StorageExternalStreamImpl> createExternalStream(
58-
IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings, ContextPtr & context [[maybe_unused]], const ASTPtr & sharding_expr, bool attach)
58+
IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings, ContextPtr & context [[maybe_unused]], const ASTs & engine_args, bool attach)
5959
{
6060
if (settings->type.value.empty())
6161
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External stream type is required in settings");
6262

6363
if (settings->type.value == StreamTypes::KAFKA || settings->type.value == StreamTypes::REDPANDA)
64-
return std::make_unique<Kafka>(storage, std::move(settings), sharding_expr, attach);
64+
return std::make_unique<Kafka>(storage, std::move(settings), engine_args, attach);
6565
#ifdef OS_LINUX
6666
else if (settings->type.value == StreamTypes::LOG && context->getSettingsRef()._tp_enable_log_stream_expr.value)
6767
return std::make_unique<FileLog>(storage, std::move(settings));
@@ -130,7 +130,7 @@ SinkToStoragePtr StorageExternalStream::write(const ASTPtr & query, const Storag
130130
}
131131

132132
StorageExternalStream::StorageExternalStream(
133-
const ASTPtr & sharding_key_,
133+
const ASTs & engine_args,
134134
const StorageID & table_id_,
135135
ContextPtr context_,
136136
const ColumnsDescription & columns_,
@@ -143,7 +143,7 @@ StorageExternalStream::StorageExternalStream(
143143
setInMemoryMetadata(storage_metadata);
144144

145145

146-
auto stream = createExternalStream(this, std::move(external_stream_settings_), context_, sharding_key_, attach);
146+
auto stream = createExternalStream(this, std::move(external_stream_settings_), context_, engine_args, attach);
147147
external_stream.swap(stream);
148148
}
149149

@@ -161,7 +161,7 @@ void registerStorageExternalStream(StorageFactory & factory)
161161
external_stream_settings->loadFromQuery(*args.storage_def);
162162

163163
return StorageExternalStream::create(
164-
args.engine_args[0], args.table_id, args.getContext(), args.columns, std::move(external_stream_settings), args.attach);
164+
args.engine_args, args.table_id, args.getContext(), args.columns, std::move(external_stream_settings), args.attach);
165165
}
166166
else
167167
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External stream requires correct settings setup");

src/Storages/ExternalStream/StorageExternalStream.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class StorageExternalStream final : public shared_ptr_helper<StorageExternalStre
5252

5353
protected:
5454
StorageExternalStream(
55-
const ASTPtr & sharding_key_,
55+
const ASTs & engine_args,
5656
const StorageID & table_id_,
5757
ContextPtr context_,
5858
const ColumnsDescription & columns_,

0 commit comments

Comments
 (0)