Skip to content

Commit 81ee54d

Browse files
authored
feat: support message key for Kafka external stream (#434)
1 parent b74a0e5 commit 81ee54d

11 files changed

+420
-276
lines changed

src/KafkaLog/KafkaWALCommon.cpp

+25-22
Original file line numberDiff line numberDiff line change
@@ -194,41 +194,44 @@ nlog::RecordPtr kafkaMsgToRecord(rd_kafka_message_t * msg, const nlog::SchemaCon
194194
return record;
195195
}
196196

197-
/// Ask the broker for the metadata of the topic behind `rkt` and report its partition count.
///
/// @param rkt  topic handle to describe; handed to rd_kafka_metadata as the single topic of interest
/// @param rk   client handle the metadata request is issued on
/// @param log  logger receiving the failure diagnostics
/// @return     `.err = OK` plus `.partitions` when the topic reports at least one partition;
///             the mapped librdkafka error when the metadata request itself fails;
///             RESOURCE_NOT_FOUND when the response has no topic entry or the topic has no partitions.
DescribeResult describeTopic(rd_kafka_topic_t * rkt, struct rd_kafka_s * rk, Poco::Logger * log)
{
    const struct rd_kafka_metadata * metadata = nullptr;

    /// Second argument 0: do not request all topics, only `rkt`. Last argument is the timeout in ms.
    const auto request_err = rd_kafka_metadata(rk, 0, rkt, &metadata, 5000);
    if (request_err != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        LOG_ERROR(log, "Failed to describe topic, error={}", rd_kafka_err2str(request_err));
        return {.err = mapErrorCode(request_err)};
    }

    /// From here on `metadata` is owned by us and must be destroyed on every path.
    if (metadata->topic_cnt < 1)
    {
        rd_kafka_metadata_destroy(metadata);
        return {.err = DB::ErrorCodes::RESOURCE_NOT_FOUND};
    }

    /// Exactly one topic was asked for, so exactly one entry is expected back.
    assert(metadata->topic_cnt == 1);

    /// Copy the count out before the metadata is released.
    const auto partitions_found = metadata->topics[0].partition_cnt;
    rd_kafka_metadata_destroy(metadata);

    if (partitions_found <= 0)
        return {.err = DB::ErrorCodes::RESOURCE_NOT_FOUND};

    return {.err = DB::ErrorCodes::OK, .partitions = partitions_found};
}
223+
224+
/// Describe a topic identified by name.
///
/// Creates a temporary topic handle for `name` on `rk`, delegates to the handle-based
/// overload of describeTopic, and releases the handle before returning.
///
/// @param name  topic name
/// @param rk    client handle used both to create the topic handle and to fetch metadata
/// @param log   logger receiving the failure diagnostics
/// @return      `.err = UNKNOWN_EXCEPTION` when the topic handle cannot be created,
///              otherwise whatever the handle-based overload returns.
DescribeResult describeTopic(const String & name, struct rd_kafka_s * rk, Poco::Logger * log)
{
    /// unique_ptr rather than shared_ptr: a shared_ptr constructed from a null pointer together
    /// with a custom deleter still invokes that deleter on destruction, which would pass nullptr
    /// to rd_kafka_topic_destroy on the failure path below. unique_ptr never calls its deleter
    /// when it holds null.
    std::unique_ptr<rd_kafka_topic_t, decltype(&rd_kafka_topic_destroy)> topic_handle{
        rd_kafka_topic_new(rk, name.c_str(), nullptr), &rd_kafka_topic_destroy};

    if (!topic_handle)
    {
        LOG_ERROR(log, "Failed to describe topic, can't create topic handle");
        return {.err = DB::ErrorCodes::UNKNOWN_EXCEPTION};
    }

    return describeTopic(topic_handle.get(), rk, log);
}
233236

234237
std::vector<int64_t> getOffsetsForTimestamps(

src/KafkaLog/KafkaWALCommon.h

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ initRdKafkaTopicHandle(const std::string & topic, KConfParams & params, rd_kafka
4747
nlog::RecordPtr kafkaMsgToRecord(rd_kafka_message_t * msg, const nlog::SchemaContext & schema_ctx, bool copy_topic = false);
4848

4949
DescribeResult describeTopic(const String & name, struct rd_kafka_s * rk, Poco::Logger * log);
50+
DescribeResult describeTopic(rd_kafka_topic_t * rkt, struct rd_kafka_s * rk, Poco::Logger * log);
5051

5152
std::vector<int64_t> getOffsetsForTimestamps(
5253
struct rd_kafka_s * rd_handle,

src/Storages/ExternalStream/ExternalStreamSettings.h

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class ASTStorage;
1919
M(String, ssl_ca_cert_file, "", "The path of ssl ca cert file", 0) \
2020
M(String, properties, "", "A semi-colon-separated key-value pairs for configuring the kafka client used by the external stream. A key-value pair is separated by a equal sign. Example: 'client.id=my-client-id;group.id=my-group-id'. Note, not all properties are supported, please check the document for supported properties.", 0) \
2121
M(String, sharding_expr, "", "An expression which will be evaluated on each row of data returned by the query to calculate the an integer which will be used to determine the ID of the partition to which the row of data will be sent. If not set, data are sent to any partition randomly.", 0) \
22+
M(String, message_key, "", "An expression which will be evaluated on each row of data returned by the query to compute a string which will be used as the message key.", 0) \
2223
M(Bool, one_message_per_row, false, "If set to true, when send data to the Kafka external stream with row-based data format like `JSONEachRow`, it will produce one message per row.", 0) \
2324
/* those are log related settings */ \
2425
M(String, log_files, "", "A comma-separated list of log files", 0) \

src/Storages/ExternalStream/Kafka/Kafka.cpp

+59-12
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
1-
#include "Kafka.h"
2-
#include "KafkaSink.h"
3-
#include "KafkaSource.h"
4-
51
#include <DataTypes/DataTypeDateTime64.h>
62
#include <DataTypes/DataTypesNumber.h>
73
#include <Interpreters/Context.h>
4+
#include <Interpreters/ExpressionAnalyzer.h>
5+
#include <Interpreters/TreeRewriter.h>
86
#include <KafkaLog/KafkaWALPool.h>
7+
#include <Parsers/ExpressionListParsers.h>
98
#include <Storages/ExternalStream/ExternalStreamTypes.h>
9+
#include <Storages/ExternalStream/Kafka/Kafka.h>
10+
#include <Storages/ExternalStream/Kafka/KafkaSink.h>
11+
#include <Storages/ExternalStream/Kafka/KafkaSource.h>
1012
#include <Storages/IStorage.h>
1113
#include <Storages/SelectQueryInfo.h>
1214
#include <Common/ProtonCommon.h>
1315
#include <Common/logger_useful.h>
16+
#include <Parsers/ASTFunction.h>
1417

1518
#include <boost/algorithm/string/classification.hpp>
19+
#include <boost/algorithm/string/predicate.hpp>
1620
#include <boost/algorithm/string/split.hpp>
1721

1822
#include <ranges>
@@ -26,19 +30,20 @@ extern const int INVALID_SETTING_VALUE;
2630
extern const int RESOURCE_NOT_FOUND;
2731
}
2832

29-
Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_)
33+
Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_, ContextPtr context)
3034
: StorageExternalStreamImpl(std::move(settings_))
3135
, storage_id(storage->getStorageID())
32-
, data_format(StorageExternalStreamImpl::dataFormat())
33-
, log(&Poco::Logger::get("External-" + settings->topic.value))
3436
, engine_args(engine_args_)
37+
, kafka_properties(klog::parseProperties(settings->properties.value))
38+
, data_format(StorageExternalStreamImpl::dataFormat())
3539
, auth_info(std::make_unique<klog::KafkaWALAuth>(
3640
settings->security_protocol.value,
3741
settings->username.value,
3842
settings->password.value,
3943
settings->sasl_mechanism.value,
4044
settings->ssl_ca_cert_file.value))
4145
, external_stream_counter(external_stream_counter_)
46+
, logger(&Poco::Logger::get("External-" + settings->topic.value))
4247
{
4348
assert(settings->type.value == StreamTypes::KAFKA || settings->type.value == StreamTypes::REDPANDA);
4449
assert(external_stream_counter);
@@ -49,7 +54,15 @@ Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> setting
4954
if (settings->topic.value.empty())
5055
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Empty `topic` setting for {} external stream", settings->type.value);
5156

52-
kafka_properties = klog::parseProperties(settings->properties.value);
57+
if (!settings->message_key.value.empty())
58+
{
59+
validateMessageKey(settings->message_key.value, storage, context);
60+
61+
/// When message_key is set, each row should be sent as one message, it doesn't make any sense otherwise.
62+
if (settings->isChanged("one_message_per_row") && !settings->one_message_per_row)
63+
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "`one_message_per_row` cannot be set to `false` when `message_key` is set");
64+
settings->set("one_message_per_row", true);
65+
}
5366

5467
calculateDataFormat(storage);
5568

@@ -60,6 +73,16 @@ Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> setting
6073
validate();
6174
}
6275

76+
bool Kafka::hasCustomShardingExpr() const {
77+
if (engine_args.empty())
78+
return false;
79+
80+
if (auto * shard_func = shardingExprAst()->as<ASTFunction>())
81+
return !boost::iequals(shard_func->name, "rand");
82+
83+
return true;
84+
}
85+
6386
Pipe Kafka::read(
6487
const Names & column_names,
6588
const StorageSnapshotPtr & storage_snapshot,
@@ -76,7 +99,7 @@ Pipe Kafka::read(
7699
{
77100
shards_to_query = parseShards(context->getSettingsRef().shards.value);
78101
validate(shards_to_query);
79-
LOG_INFO(log, "reading from [{}] partitions for topic={}", fmt::join(shards_to_query, ","), settings->topic.value);
102+
LOG_INFO(logger, "reading from [{}] partitions for topic={}", fmt::join(shards_to_query, ","), settings->topic.value);
80103
}
81104
else
82105
{
@@ -120,11 +143,11 @@ Pipe Kafka::read(
120143
assert(offsets.size() == shards_to_query.size());
121144
for (auto [shard, offset] : std::ranges::views::zip(shards_to_query, offsets))
122145
pipes.emplace_back(
123-
std::make_shared<KafkaSource>(this, header, storage_snapshot, context, shard, offset, max_block_size, log, external_stream_counter));
146+
std::make_shared<KafkaSource>(this, header, storage_snapshot, context, shard, offset, max_block_size, logger, external_stream_counter));
124147
}
125148

126149
LOG_INFO(
127-
log,
150+
logger,
128151
"Starting reading {} streams by seeking to {} in dedicated resource group",
129152
pipes.size(),
130153
query_info.seek_to_info->getSeekTo());
@@ -228,6 +251,30 @@ std::vector<int32_t> Kafka::parseShards(const std::string & shards_setting)
228251
return specified_shards;
229252
}
230253

254+
void Kafka::validateMessageKey(const String & message_key_, IStorage * storage, const ContextPtr & context)
255+
{
256+
const auto & key = message_key_.c_str();
257+
Tokens tokens(key, key + message_key_.size(), 0);
258+
IParser::Pos pos(tokens, 0);
259+
Expected expected;
260+
ParserExpression p_id;
261+
if (!p_id.parse(pos, message_key_ast, expected))
262+
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key was not a valid expression, parse failed at {}, expected {}", expected.max_parsed_pos, fmt::join(expected.variants, ", "));
263+
264+
if (!pos->isEnd())
265+
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key must be a single expression, got extra characters: {}", expected.max_parsed_pos);
266+
267+
auto syntax_result = TreeRewriter(context).analyze(message_key_ast, storage->getInMemoryMetadata().getColumns().getAllPhysical());
268+
auto analyzer = ExpressionAnalyzer(message_key_ast, syntax_result, context).getActions(true);
269+
const auto & block = analyzer->getSampleBlock();
270+
if (block.columns() != 1)
271+
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key expression must return exactly one column");
272+
273+
auto type_id = block.getByPosition(0).type->getTypeId();
274+
if (type_id != TypeIndex::String && type_id != TypeIndex::FixedString)
275+
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "message_key must have type of string");
276+
}
277+
231278
/// Validate the topic still exists, specified partitions are still valid etc
232279
void Kafka::validate(const std::vector<int32_t> & shards_to_query)
233280
{
@@ -267,6 +314,6 @@ SinkToStoragePtr Kafka::write(const ASTPtr & /*query*/, const StorageMetadataPtr
267314
{
268315
/// always validate before actual use
269316
validate();
270-
return std::make_shared<KafkaSink>(this, metadata_snapshot->getSampleBlock(), context, shards, log);
317+
return std::make_shared<KafkaSink>(this, metadata_snapshot->getSampleBlock(), shards, message_key_ast, context, logger);
271318
}
272319
}

src/Storages/ExternalStream/Kafka/Kafka.h

+10-10
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class IStorage;
1515
class Kafka final : public StorageExternalStreamImpl
1616
{
1717
public:
18-
Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_);
18+
Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> settings_, const ASTs & engine_args_, bool attach, ExternalStreamCounterPtr external_stream_counter_, ContextPtr context);
1919
~Kafka() override = default;
2020

2121
void startup() override { }
@@ -41,32 +41,32 @@ class Kafka final : public StorageExternalStreamImpl
4141
const String & topic() const { return settings->topic.value; }
4242
const klog::KConfParams & properties() const { return kafka_properties; }
4343
const klog::KafkaWALAuth & auth() const noexcept { return *auth_info; }
44-
bool hasCustomShardingExpr() const { return !engine_args.empty(); }
4544
const ASTPtr & shardingExprAst() const { assert(!engine_args.empty()); return engine_args[0]; }
45+
bool hasCustomShardingExpr() const;
4646
klog::KafkaWALSimpleConsumerPtr getConsumer(int32_t fetch_wait_max_ms = 200) const;
4747

4848
private:
4949
void calculateDataFormat(const IStorage * storage);
5050
void cacheVirtualColumnNamesAndTypes();
5151
std::vector<Int64> getOffsets(const SeekToInfoPtr & seek_to_info, const std::vector<int32_t> & shards_to_query) const;
52+
void validateMessageKey(const String & message_key, IStorage * storage, const ContextPtr & context);
5253
void validate(const std::vector<int32_t> & shards_to_query = {});
53-
5454
static std::vector<int32_t> parseShards(const std::string & shards_setting);
5555

56-
private:
5756
StorageID storage_id;
57+
ASTs engine_args;
58+
klog::KConfParams kafka_properties;
5859
String data_format;
59-
60-
Poco::Logger * log;
60+
const std::unique_ptr<klog::KafkaWALAuth> auth_info;
61+
ExternalStreamCounterPtr external_stream_counter;
6162

6263
NamesAndTypesList virtual_column_names_and_types;
63-
klog::KConfParams kafka_properties;
64-
const ASTs engine_args;
65-
const std::unique_ptr<klog::KafkaWALAuth> auth_info;
6664

6765
std::mutex shards_mutex;
6866
int32_t shards = 0;
6967

70-
ExternalStreamCounterPtr external_stream_counter;
68+
ASTPtr message_key_ast;
69+
70+
Poco::Logger * logger;
7171
};
7272
}

0 commit comments

Comments
 (0)