Commit 84c4aed

feat: Kafka external streams support SCRAM-SHA-256/512 (#454)
Parent: 8b78038

10 files changed, +58 −92 lines
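
In effect, a Kafka external stream can now authenticate with SCRAM-SHA-256 or SCRAM-SHA-512 (in addition to PLAIN) by setting the new sasl_mechanism setting; the chosen mechanism is passed straight through to librdkafka. Below is a minimal, standalone sketch (not part of this commit) of the underlying librdkafka properties the change wires through; the broker address, credentials and CA path are placeholders:

    #include <librdkafka/rdkafka.h>
    #include <cstdio>

    int main()
    {
        char errstr[512];
        rd_kafka_conf_t * conf = rd_kafka_conf_new();

        /// Placeholder values; in proton these come from the external stream settings.
        const char * props[][2] = {
            {"bootstrap.servers", "localhost:9092"},
            {"security.protocol", "SASL_SSL"},
            {"sasl.mechanism", "SCRAM-SHA-256"}, /// or "SCRAM-SHA-512" / "PLAIN"
            {"sasl.username", "demo-user"},
            {"sasl.password", "demo-pass"},
            {"ssl.ca.location", "/etc/ssl/certs/ca.pem"},
        };

        for (const auto & prop : props)
            if (rd_kafka_conf_set(conf, prop[0], prop[1], errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                std::fprintf(stderr, "failed to set %s: %s\n", prop[0], errstr);

        rd_kafka_conf_destroy(conf);
        return 0;
    }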

src/KafkaLog/KafkaWAL.cpp

+1 −11

@@ -365,24 +365,14 @@ void KafkaWAL::initProducerHandle()
         {"message.max.bytes", std::to_string(settings->message_max_bytes)},
         {"message.timeout.ms", std::to_string(settings->message_timeout_ms)},
         /// Protocol used to communicate with brokers.
-        {"security.protocol", settings->auth.security_protocol.c_str()},
         {"topic.metadata.refresh.interval.ms", std::to_string(settings->topic_metadata_refresh_interval_ms)},
         {"compression.codec", settings->compression_codec.c_str()},
     };

     if (!settings->debug.empty())
         producer_params.emplace_back("debug", settings->debug);

-    if (boost::iequals(settings->auth.security_protocol, "SASL_PLAINTEXT")
-        || boost::iequals(settings->auth.security_protocol, "SASL_SSL"))
-    {
-        producer_params.emplace_back("sasl.mechanisms", "PLAIN");
-        producer_params.emplace_back("sasl.username", settings->auth.username.c_str());
-        producer_params.emplace_back("sasl.password", settings->auth.password.c_str());
-    }
-
-    if (boost::iequals(settings->auth.security_protocol, "SASL_SSL") && !settings->auth.ssl_ca_cert_file.empty())
-        producer_params.emplace_back("ssl.ca.location", settings->auth.ssl_ca_cert_file.c_str());
+    settings->auth.populateConfigs(producer_params);

     auto cb_setup = [](rd_kafka_conf_t * kconf) {
         rd_kafka_conf_set_stats_cb(kconf, &KafkaWALStats::logStats);

src/KafkaLog/KafkaWALConsumer.cpp

+6 −16

@@ -98,19 +98,9 @@ void KafkaWALConsumer::initHandle()
         /// ensuring no on-the-wire or on-disk corruption to the messages occurred
         std::make_pair("check.crcs", std::to_string(settings->check_crcs)),
         std::make_pair("statistics.interval.ms", std::to_string(settings->statistic_internal_ms)),
-        std::make_pair("security.protocol", settings->auth.security_protocol.c_str()),
     };

-    if (boost::iequals(settings->auth.security_protocol, "SASL_PLAINTEXT")
-        || boost::iequals(settings->auth.security_protocol, "SASL_SSL"))
-    {
-        consumer_params.emplace_back("sasl.mechanisms", "PLAIN");
-        consumer_params.emplace_back("sasl.username", settings->auth.username.c_str());
-        consumer_params.emplace_back("sasl.password", settings->auth.password.c_str());
-    }
-
-    if (boost::iequals(settings->auth.security_protocol, "SASL_SSL") && !settings->auth.ssl_ca_cert_file.empty())
-        consumer_params.emplace_back("ssl.ca.location", settings->auth.ssl_ca_cert_file.c_str());
+    settings->auth.populateConfigs(consumer_params);

     if (!settings->debug.empty())
         consumer_params.emplace_back("debug", settings->debug);

@@ -141,11 +131,11 @@ int32_t KafkaWALConsumer::addSubscriptions(const TopicPartitionOffsets & partiti

     for (const auto & partition : partitions_)
     {
-        auto new_partition = rd_kafka_topic_partition_list_add(topic_partitions.get(), partition.topic.c_str(), partition.partition);
+        auto * new_partition = rd_kafka_topic_partition_list_add(topic_partitions.get(), partition.topic.c_str(), partition.partition);
         new_partition->offset = partition.offset;
     }

-    auto err = rd_kafka_incremental_assign(consumer_handle.get(), topic_partitions.get());
+    auto * err = rd_kafka_incremental_assign(consumer_handle.get(), topic_partitions.get());
     if (err)
     {
         LOG_ERROR(log, "Failed to assign partitions incrementally, error={}", rd_kafka_error_string(err));

@@ -168,7 +158,7 @@ int32_t KafkaWALConsumer::removeSubscriptions(const TopicPartitionOffsets & part
         rd_kafka_topic_partition_list_add(topic_partitions.get(), partition.topic.c_str(), partition.partition);
     }

-    auto err = rd_kafka_incremental_unassign(consumer_handle.get(), topic_partitions.get());
+    auto * err = rd_kafka_incremental_unassign(consumer_handle.get(), topic_partitions.get());
     if (err)
     {
         LOG_ERROR(log, "Failed to unassign partitions incrementally, error={}", rd_kafka_error_string(err));

@@ -193,7 +183,7 @@ ConsumeResult KafkaWALConsumer::consume(uint32_t count, int32_t timeout_ms, std:

     for (uint32_t i = 0; i < count; ++i)
     {
-        auto rkmessage = rd_kafka_consumer_poll(consumer_handle.get(), timeout_ms);
+        auto * rkmessage = rd_kafka_consumer_poll(consumer_handle.get(), timeout_ms);
         if (likely(rkmessage))
         {
             if (likely(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR))

@@ -254,7 +244,7 @@ int32_t KafkaWALConsumer::commit(const TopicPartitionOffsets & tpos)

     for (const auto & tpo : tpos)
     {
-        auto partition_offset = rd_kafka_topic_partition_list_add(topic_partition_list.get(), tpo.topic.c_str(), tpo.partition);
+        auto * partition_offset = rd_kafka_topic_partition_list_add(topic_partition_list.get(), tpo.topic.c_str(), tpo.partition);

         /// rd_kafka_offsets_store commits `offset` as it is. We add 1 to the offset
         /// to keep the same semantic as rd_kafka_offset_store

src/KafkaLog/KafkaWALPool.h

+1 −1

@@ -31,7 +31,7 @@ class KafkaWALPool : private boost::noncopyable

     KafkaWALSimpleConsumerPtr getOrCreateStreaming(const String & cluster_id);

-    KafkaWALSimpleConsumerPtr getOrCreateStreamingExternal(const String & brokers, const KafkaWALAuth & auth, int32_t fetch_max_wait_ms = 200);
+    KafkaWALSimpleConsumerPtr getOrCreateStreamingExternal(const String & brokers, const KafkaWALAuth & auth, int32_t fetch_wait_max_ms = 200);

     std::vector<KafkaWALClusterPtr> clusters(const KafkaWALContext & ctx) const;

src/KafkaLog/KafkaWALSettings.h

+29 −11

@@ -2,9 +2,7 @@

 #include <boost/algorithm/string/join.hpp>

-#include <string>
-#include <vector>
-
+#include <boost/algorithm/string/predicate.hpp>
 #include <fmt/format.h>

 namespace klog

@@ -14,6 +12,7 @@ struct KafkaWALAuth
     std::string security_protocol;
     std::string username;
     std::string password;
+    std::string sasl_mechanism;
     std::string ssl_ca_cert_file;

     bool operator==(const KafkaWALAuth & o) const

@@ -23,6 +22,31 @@ struct KafkaWALAuth
             && password == o.password
             && ssl_ca_cert_file == o.ssl_ca_cert_file;
     }
+
+    bool usesSASL() const
+    {
+        return boost::istarts_with(security_protocol, "SASL_");
+    }
+
+    /// "SASL_SSL" or "SSL"
+    bool usesSecureConnection() const
+    {
+        return boost::iends_with(security_protocol, "SSL");
+    }
+
+    void populateConfigs(std::vector<std::pair<std::string, std::string>> & params) const
+    {
+        params.emplace_back("security.protocol", security_protocol);
+        if (usesSASL())
+        {
+            params.emplace_back("sasl.mechanism", sasl_mechanism);
+            params.emplace_back("sasl.username", username);
+            params.emplace_back("sasl.password", password);
+        }
+
+        if (usesSecureConnection() && !ssl_ca_cert_file.empty())
+            params.emplace_back("ssl.ca.location", ssl_ca_cert_file);
+    }
 };

 struct KafkaWALSettings

@@ -34,18 +58,12 @@ struct KafkaWALSettings
     /// Global settings for both producer and consumer /// global metrics
     /// comma separated host/port: host1:port,host2:port,...
     std::string brokers;
-    KafkaWALAuth auth = {
-        .security_protocol = "plaintext",
-        .username = "",
-        .password = "",
-        .ssl_ca_cert_file = ""
-    };
-    /// FIXME, SASL, SSL etc support
+    KafkaWALAuth auth = { .security_protocol = "plaintext" };

     int32_t message_max_bytes = 1000000;
     int32_t topic_metadata_refresh_interval_ms = 300000;
     int32_t statistic_internal_ms = 30000;
-    std::string debug = "";
+    std::string debug;

     /////////////////////////////////////////////////////
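
For reference, here is a self-contained sketch of the new helper above (struct abridged, the main() and credentials made up for illustration) showing what populateConfigs() emits for a SCRAM-protected SASL_SSL endpoint:

    #include <boost/algorithm/string/predicate.hpp>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    struct KafkaWALAuth
    {
        std::string security_protocol;
        std::string username;
        std::string password;
        std::string sasl_mechanism;
        std::string ssl_ca_cert_file;

        bool usesSASL() const { return boost::istarts_with(security_protocol, "SASL_"); }

        /// "SASL_SSL" or "SSL"
        bool usesSecureConnection() const { return boost::iends_with(security_protocol, "SSL"); }

        void populateConfigs(std::vector<std::pair<std::string, std::string>> & params) const
        {
            params.emplace_back("security.protocol", security_protocol);
            if (usesSASL())
            {
                params.emplace_back("sasl.mechanism", sasl_mechanism);
                params.emplace_back("sasl.username", username);
                params.emplace_back("sasl.password", password);
            }

            if (usesSecureConnection() && !ssl_ca_cert_file.empty())
                params.emplace_back("ssl.ca.location", ssl_ca_cert_file);
        }
    };

    int main()
    {
        /// Placeholder values; in proton they come from the external stream / WAL settings.
        KafkaWALAuth auth{"SASL_SSL", "demo-user", "demo-pass", "SCRAM-SHA-512", "/etc/ssl/certs/ca.pem"};

        std::vector<std::pair<std::string, std::string>> params;
        auth.populateConfigs(params);

        for (const auto & [key, value] : params)
            std::cout << key << " = " << value << '\n';
    }

With a plaintext security protocol only security.protocol is emitted; with SASL_PLAINTEXT the SASL entries are added but no CA location, matching the branches above.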

src/KafkaLog/KafkaWALSimpleConsumer.cpp

+1 −11

@@ -87,24 +87,14 @@ void KafkaWALSimpleConsumer::initHandle()
         {"enable.partition.eof", "false"},
         {"queued.min.messages", std::to_string(settings->queued_min_messages)},
         {"queued.max.messages.kbytes", std::to_string(settings->queued_max_messages_kbytes)},
-        {"security.protocol", settings->auth.security_protocol.c_str()},
     };

     if (!settings->debug.empty())
     {
         consumer_params.emplace_back("debug", settings->debug);
     }

-    if (boost::iequals(settings->auth.security_protocol, "SASL_PLAINTEXT")
-        || boost::iequals(settings->auth.security_protocol, "SASL_SSL"))
-    {
-        consumer_params.emplace_back("sasl.mechanisms", "PLAIN");
-        consumer_params.emplace_back("sasl.username", settings->auth.username.c_str());
-        consumer_params.emplace_back("sasl.password", settings->auth.password.c_str());
-    }
-
-    if (boost::iequals(settings->auth.security_protocol, "SASL_SSL") && !settings->auth.ssl_ca_cert_file.empty())
-        consumer_params.emplace_back("ssl.ca.location", settings->auth.ssl_ca_cert_file.c_str());
+    settings->auth.populateConfigs(consumer_params);

     auto cb_setup = [](rd_kafka_conf_t * kconf) { /// STYLE_CHECK_ALLOW_BRACE_SAME_LINE_LAMBDA
         rd_kafka_conf_set_stats_cb(kconf, &KafkaWALStats::logStats);

src/Storages/ExternalStream/ExternalStreamSettings.h

+1

@@ -15,6 +15,7 @@ class ASTStorage;
     M(String, security_protocol, "plaintext", "The protocol to connection external logstore", 0) \
     M(String, username, "", "The username of external logstore", 0) \
     M(String, password, "", "The password of external logstore", 0) \
+    M(String, sasl_mechanism, "PLAIN", "SASL mechanism to use for authentication. Supported: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. Default to PLAIN when SASL is enabled.", 0) \
     M(String, ssl_ca_cert_file, "", "The path of ssl ca cert file", 0) \
     M(String, properties, "", "A semi-colon-separated key-value pairs for configuring the kafka client used by the external stream. A key-value pair is separated by a equal sign. Example: 'client.id=my-client-id;group.id=my-group-id'. Note, not all properties are supported, please check the document for supported properties.", 0) \
     M(String, sharding_expr, "", "An expression which will be evaluated on each row of data returned by the query to calculate the an integer which will be used to determine the ID of the partition to which the row of data will be sent. If not set, data are sent to any partition randomly.", 0) \

src/Storages/ExternalStream/Kafka/Kafka.cpp

+13 −20

@@ -32,6 +32,12 @@ Kafka::Kafka(IStorage * storage, std::unique_ptr<ExternalStreamSettings> setting
     , data_format(StorageExternalStreamImpl::dataFormat())
     , log(&Poco::Logger::get("External-" + settings->topic.value))
     , engine_args(engine_args_)
+    , auth_info(std::make_unique<klog::KafkaWALAuth>(
+          settings->security_protocol.value,
+          settings->username.value,
+          settings->password.value,
+          settings->sasl_mechanism.value,
+          settings->ssl_ca_cert_file.value))
     , external_stream_counter(external_stream_counter_)
 {
     assert(settings->type.value == StreamTypes::KAFKA || settings->type.value == StreamTypes::REDPANDA);

@@ -145,6 +151,11 @@ void Kafka::cacheVirtualColumnNamesAndTypes()
     virtual_column_names_and_types.push_back(NameAndTypePair(ProtonConsts::RESERVED_EVENT_SEQUENCE_ID, std::make_shared<DataTypeInt64>()));
 }

+klog::KafkaWALSimpleConsumerPtr Kafka::getConsumer(int32_t fetch_wait_max_ms) const
+{
+    return klog::KafkaWALPool::instance(nullptr).getOrCreateStreamingExternal(settings->brokers.value, *auth_info, fetch_wait_max_ms);
+}
+
 std::vector<Int64> Kafka::getOffsets(const SeekToInfoPtr & seek_to_info, const std::vector<int32_t> & shards_to_query) const
 {
     assert(seek_to_info);

@@ -155,15 +166,6 @@ std::vector<Int64> Kafka::getOffsets(const SeekToInfoPtr & seek_to_info, const s
     }
     else
     {
-        klog::KafkaWALAuth auth{
-            .security_protocol = securityProtocol(),
-            .username = username(),
-            .password = password(),
-            .ssl_ca_cert_file = sslCaCertFile(),
-        };
-
-        auto consumer = klog::KafkaWALPool::instance(nullptr).getOrCreateStreamingExternal(settings->brokers.value, auth);
-
         std::vector<klog::PartitionTimestamp> partition_timestamps;
         partition_timestamps.reserve(shards_to_query.size());
         auto seek_timestamps{seek_to_info->getSeekPoints()};

@@ -172,7 +174,7 @@ std::vector<Int64> Kafka::getOffsets(const SeekToInfoPtr & seek_to_info, const s
         for (auto [shard, timestamp] : std::ranges::views::zip(shards_to_query, seek_timestamps))
             partition_timestamps.emplace_back(shard, timestamp);

-        return consumer->offsetsForTimestamps(settings->topic.value, partition_timestamps);
+        return getConsumer()->offsetsForTimestamps(settings->topic.value, partition_timestamps);
     }
 }

@@ -241,16 +243,7 @@ void Kafka::validate(const std::vector<int32_t> & shards_to_query)
     if (shards == 0)
     {
         /// We haven't describe the topic yet
-        klog::KafkaWALAuth auth = {
-            .security_protocol = settings->security_protocol.value,
-            .username = settings->username.value,
-            .password = settings->password.value,
-            .ssl_ca_cert_file = settings->ssl_ca_cert_file.value,
-        };
-
-        auto consumer = klog::KafkaWALPool::instance(nullptr).getOrCreateStreamingExternal(settings->brokers.value, auth);
-
-        auto result = consumer->describe(settings->topic.value);
+        auto result = getConsumer()->describe(settings->topic.value);
         if (result.err != ErrorCodes::OK)
             throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "{} topic doesn't exist", settings->topic.value);

src/Storages/ExternalStream/Kafka/Kafka.h

+4 −4

@@ -1,6 +1,7 @@
 #pragma once

 #include <KafkaLog/KafkaWALCommon.h>
+#include <KafkaLog/KafkaWALSimpleConsumer.h>
 #include <Storages/ExternalStream/ExternalStreamSettings.h>
 #include <Storages/ExternalStream/StorageExternalStreamImpl.h>
 #include <Storages/ExternalStream/ExternalStreamCounter.h>

@@ -38,13 +39,11 @@ class Kafka final : public StorageExternalStreamImpl
     const String & brokers() const { return settings->brokers.value; }
     const String & dataFormat() const override { return data_format; }
     const String & topic() const { return settings->topic.value; }
-    const String & securityProtocol() const { return settings->security_protocol.value; }
-    const String & username() const { return settings->username.value; }
-    const String & password() const { return settings->password.value; }
-    const String & sslCaCertFile() const { return settings->ssl_ca_cert_file.value; }
     const klog::KConfParams & properties() const { return kafka_properties; }
+    const klog::KafkaWALAuth & auth() const noexcept { return *auth_info; }
     bool hasCustomShardingExpr() const { return !engine_args.empty(); }
     const ASTPtr & shardingExprAst() const { assert(!engine_args.empty()); return engine_args[0]; }
+    klog::KafkaWALSimpleConsumerPtr getConsumer(int32_t fetch_wait_max_ms = 200) const;

 private:
     void calculateDataFormat(const IStorage * storage);

@@ -63,6 +62,7 @@ class Kafka final : public StorageExternalStreamImpl
     NamesAndTypesList virtual_column_names_and_types;
     klog::KConfParams kafka_properties;
     const ASTs engine_args;
+    const std::unique_ptr<klog::KafkaWALAuth> auth_info;

     std::mutex shards_mutex;
     int32_t shards = 0;
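
For illustration, a hypothetical call-site sketch (it assumes the repository's Kafka.h and KafkaLog headers and is not verbatim from this commit) of how producer and consumer paths now pick up the authentication settings through the two new accessors instead of assembling a klog::KafkaWALAuth by hand:

    /// Illustrative only; producer_params is the usual vector of key/value config pairs.
    void setupProducerParams(const Kafka * kafka, std::vector<std::pair<std::string, std::string>> & producer_params)
    {
        producer_params.emplace_back("bootstrap.servers", kafka->brokers());
        /// security.protocol, sasl.mechanism / username / password and ssl.ca.location
        /// (when applicable) are all filled in by the shared helper.
        kafka->auth().populateConfigs(producer_params);
    }

    void describeTopic(const Kafka * kafka)
    {
        /// The pooled consumer is created (or reused) with the same auth settings.
        auto consumer = kafka->getConsumer(/*fetch_wait_max_ms=*/200);
        auto result = consumer->describe(kafka->topic());
        (void)result; /// check result.err against ErrorCodes::OK, as Kafka::validate() does
    }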

src/Storages/ExternalStream/Kafka/KafkaSink.cpp

+1 −11

@@ -174,17 +174,7 @@ KafkaSink::KafkaSink(const Kafka * kafka, const Block & header, ContextPtr conte

     /// properies from settings have higher priority
     producer_params.emplace_back("bootstrap.servers", kafka->brokers());
-    producer_params.emplace_back("security.protocol", kafka->securityProtocol());
-    if (boost::iequals(kafka->securityProtocol(), "SASL_PLAINTEXT")
-        || boost::iequals(kafka->securityProtocol(), "SASL_SSL"))
-    {
-        producer_params.emplace_back("sasl.mechanisms", "PLAIN");
-        producer_params.emplace_back("sasl.username", kafka->username());
-        producer_params.emplace_back("sasl.password", kafka->password());
-    }
-
-    if (boost::iequals(kafka->securityProtocol(), "SASL_SSL") && !kafka->sslCaCertFile().empty())
-        producer_params.emplace_back("ssl.ca.location", kafka->sslCaCertFile());
+    kafka->auth().populateConfigs(producer_params);

     auto * conf = rd_kafka_conf_new();
     char errstr[512]{'\0'};

src/Storages/ExternalStream/Kafka/KafkaSource.cpp

+1 −7

@@ -278,13 +278,7 @@ void KafkaSource::initConsumer(const Kafka * kafka)
     consume_ctx.auto_offset_reset = "earliest";

     consume_ctx.enforce_offset = true;
-    klog::KafkaWALAuth auth = {
-        .security_protocol = kafka->securityProtocol(),
-        .username = kafka->username(),
-        .password = kafka->password(),
-        .ssl_ca_cert_file = kafka->sslCaCertFile()
-    };
-    consumer = klog::KafkaWALPool::instance(nullptr).getOrCreateStreamingExternal(kafka->brokers(), auth, record_consume_timeout_ms);
+    consumer = kafka->getConsumer(record_consume_timeout_ms);
     consumer->initTopicHandle(consume_ctx);
 }
