Commit b74a0e5
enhancement: properly produce raw format data to Kafka external streams (#436)
1 parent eb17e17 commit b74a0e5
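Before this change, an empty `data_format` was resolved inconsistently: `Kafka::calculateDataFormat` left it empty for a single String/FixedString column (and threw for most other schemas), KafkaSource then read such streams through a hand-rolled `parseRaw` path, and KafkaSink fell back to JSONEachRow, so raw streams were consumed as raw bytes but produced as JSON. After this change, `calculateDataFormat` always assigns a concrete format (`RawBLOB` for a single String/FixedString column, `JSONEachRow` otherwise), the sink asserts the format is non-empty, and the source routes every message through the format executor.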

4 files changed: +23 −84 lines changed

src/Storages/ExternalStream/Kafka/Kafka.cpp

Lines changed: 10 additions & 15 deletions
@@ -183,24 +183,19 @@ void Kafka::calculateDataFormat(const IStorage * storage)
     if (!data_format.empty())
         return;
 
+    /// If there is only one column and its type is a string type, use RawBLOB. Use JSONEachRow otherwise.
     auto column_names_and_types{storage->getInMemoryMetadata().getColumns().getOrdinary()};
-    if (column_names_and_types.size() != 1)
-        throw Exception(
-            ErrorCodes::INVALID_SETTING_VALUE,
-            "`data_format` settings is empty but the Kafka external stream definition has multiple columns. Proton doesn't know how to "
-            "parse Kafka messages without a data format.");
-
-    auto type = column_names_and_types.begin()->type;
-    if (type->getTypeId() == TypeIndex::String || type->getTypeId() == TypeIndex::FixedString)
+    if (column_names_and_types.size() == 1)
     {
-        /// no-op
+        auto type = column_names_and_types.begin()->type->getTypeId();
+        if (type == TypeIndex::String || type == TypeIndex::FixedString)
+        {
+            data_format = "RawBLOB";
+            return;
+        }
     }
-    /// FIXME: JSON logic
-    else if (type->getTypeId() == TypeIndex::Object)
-        data_format = "JSONEachRow";
-    else
-        throw Exception(
-            ErrorCodes::NOT_IMPLEMENTED, "Automatically converting Kafka message to {} type is not supported yet", type->getName());
+
+    data_format = "JSONEachRow";
 }
 
 /// FIXME, refactor out as util and unit test it
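The new selection rule reduces to a small pure function. Below is a minimal standalone sketch of the post-patch logic; the `chooseDataFormat` helper and the trimmed-down `TypeIndex` enum are illustrative stand-ins, not Proton's actual API:

#include <iostream>
#include <string>
#include <vector>

/// Hypothetical stand-in for Proton's TypeIndex; only the cases the rule inspects.
enum class TypeIndex { String, FixedString, UInt64, Object };

/// Mirrors the post-patch Kafka::calculateDataFormat(): a single String/FixedString
/// column means raw payloads (RawBLOB); every other schema falls back to JSONEachRow.
std::string chooseDataFormat(const std::vector<TypeIndex> & column_types)
{
    if (column_types.size() == 1)
    {
        auto type = column_types.front();
        if (type == TypeIndex::String || type == TypeIndex::FixedString)
            return "RawBLOB";
    }
    return "JSONEachRow";
}

int main()
{
    std::cout << chooseDataFormat({TypeIndex::String}) << '\n';                    /// RawBLOB
    std::cout << chooseDataFormat({TypeIndex::String, TypeIndex::UInt64}) << '\n'; /// JSONEachRow
    std::cout << chooseDataFormat({TypeIndex::Object}) << '\n';                    /// JSONEachRow
}

Note the behavioral change: schemas that previously threw INVALID_SETTING_VALUE or NOT_IMPLEMENTED now quietly get JSONEachRow.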

src/Storages/ExternalStream/Kafka/KafkaSink.cpp

Lines changed: 2 additions & 3 deletions
@@ -221,9 +221,8 @@ KafkaSink::KafkaSink(const Kafka * kafka, const Block & header, ContextPtr conte
     topic = klog::KTopicPtr(rd_kafka_topic_new(producer.get(), kafka->topic().c_str(), nullptr), rd_kafka_topic_destroy);
     wb = std::make_unique<WriteBufferFromKafka>(topic.get());
 
-    String data_format = kafka->dataFormat();
-    if (data_format.empty())
-        data_format = "JSONEachRow";
+    const auto & data_format = kafka->dataFormat();
+    assert(!data_format.empty());
 
     /// The callback allows `IRowOutputFormat` based formats produce one Kafka message per row.
     if (kafka->produceOneMessagePerRow())
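The sink no longer needs its own JSONEachRow fallback: `calculateDataFormat` now guarantees a non-empty format before the sink is constructed, so the `assert` documents that invariant rather than silently rewriting a raw stream's output to JSON.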

src/Storages/ExternalStream/Kafka/KafkaSource.cpp

Lines changed: 11 additions & 65 deletions
@@ -130,62 +130,10 @@ void KafkaSource::parseMessage(void * kmessage, size_t total_count, void * data)
 
 void KafkaSource::doParseMessage(const rd_kafka_message_t * kmessage, size_t /*total_count*/)
 {
-    if (format_executor)
-        parseFormat(kmessage);
-    else
-        parseRaw(kmessage);
-
+    parseFormat(kmessage);
     ckpt_data.last_sn = kmessage->offset;
 }
 
-void KafkaSource::parseRaw(const rd_kafka_message_t * kmessage)
-{
-    if (!request_virtual_columns)
-    {
-        /// fast path
-        assert(physical_header.columns() == 1);
-
-        if (current_batch.empty())
-            current_batch.push_back(physical_header.getByPosition(0).type->createColumn());
-
-        current_batch.back()->insertData(static_cast<const char *>(kmessage->payload), kmessage->len);
-        external_stream_counter->addToReadBytes(kmessage->len);
-        external_stream_counter->addToReadCounts(1);
-    }
-    else
-    {
-        /// slower path, request virtual columns
-        if (!current_batch.empty())
-        {
-            assert(current_batch.size() == virtual_col_value_functions.size());
-            for (size_t i = 0, n = virtual_col_value_functions.size(); i < n; ++i)
-            {
-                if (!virtual_col_value_functions[i])
-                    current_batch[i]->insertData(static_cast<const char *>(kmessage->payload), kmessage->len);
-                else
-                    current_batch[i]->insertMany(virtual_col_value_functions[i](kmessage), 1);
-            }
-        }
-        else
-        {
-            for (size_t i = 0, n = virtual_col_value_functions.size(); i < n; ++i)
-            {
-                if (!virtual_col_value_functions[i])
-                {
-                    current_batch.push_back(physical_header.getByPosition(0).type->createColumn());
-                    current_batch.back()->insertData(static_cast<const char *>(kmessage->payload), kmessage->len);
-                }
-                else
-                {
-                    auto column = virtual_col_types[i]->createColumn();
-                    column->insertMany(virtual_col_value_functions[i](kmessage), 1);
-                    current_batch.push_back(std::move(column));
-                }
-            }
-        }
-    }
-}
-
 void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage)
 {
     assert(format_executor);
@@ -285,21 +233,19 @@ void KafkaSource::initConsumer(const Kafka * kafka)
 void KafkaSource::initFormatExecutor(const Kafka * kafka)
 {
     const auto & data_format = kafka->dataFormat();
-    if (!data_format.empty())
-    {
-        auto input_format
-            = FormatFactory::instance().getInputFormat(data_format, read_buffer, non_virtual_header, query_context, max_block_size, kafka->getFormatSettings(query_context));
 
-        format_executor = std::make_unique<StreamingFormatExecutor>(
-            non_virtual_header, std::move(input_format), [](const MutableColumns &, Exception &) -> size_t { return 0; });
+    auto input_format
+        = FormatFactory::instance().getInputFormat(data_format, read_buffer, non_virtual_header, query_context, max_block_size);
 
-        auto converting_dag = ActionsDAG::makeConvertingActions(
-            non_virtual_header.cloneEmpty().getColumnsWithTypeAndName(),
-            physical_header.cloneEmpty().getColumnsWithTypeAndName(),
-            ActionsDAG::MatchColumnsMode::Name);
+    format_executor = std::make_unique<StreamingFormatExecutor>(
+        non_virtual_header, std::move(input_format), [](const MutableColumns &, Exception &) -> size_t { return 0; });
 
-        convert_non_virtual_to_physical_action = std::make_shared<ExpressionActions>(std::move(converting_dag));
-    }
+    auto converting_dag = ActionsDAG::makeConvertingActions(
+        non_virtual_header.cloneEmpty().getColumnsWithTypeAndName(),
+        physical_header.cloneEmpty().getColumnsWithTypeAndName(),
+        ActionsDAG::MatchColumnsMode::Name);
+
+    convert_non_virtual_to_physical_action = std::make_shared<ExpressionActions>(std::move(converting_dag));
 }
 
 void KafkaSource::calculateColumnPositions()
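The deleted `parseRaw` fast path amounted to appending each message payload verbatim to the stream's single String column via `IColumn::insertData`; with `data_format` always set, the `RawBLOB` input format now produces the same rows through the shared `format_executor`. A minimal sketch of that old fast-path behavior, using a plain struct instead of Proton's real `IColumn` (names are illustrative):

#include <cstring>
#include <iostream>
#include <string>
#include <vector>

/// Illustrative stand-in for the single String column the removed fast path filled.
struct StringColumn
{
    std::vector<std::string> rows;
    void insertData(const char * data, size_t len) { rows.emplace_back(data, len); }
};

int main()
{
    StringColumn column;
    const char * payload = "raw kafka message bytes";
    /// parseRaw() did exactly this once per consumed message: one row per payload.
    column.insertData(payload, std::strlen(payload));

    /// Post-commit, the same rows come out of the format executor with
    /// data_format = "RawBLOB", so the hand-rolled path could be deleted.
    std::cout << column.rows.size() << " row(s)\n";
}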

src/Storages/ExternalStream/Kafka/KafkaSource.h

Lines changed: 0 additions & 1 deletion
@@ -53,7 +53,6 @@ class KafkaSource final : public ISource
     static void parseMessage(void * kmessage, size_t total_count, void * data);
     void doParseMessage(const rd_kafka_message_s * kmessage, size_t total_count);
     void parseFormat(const rd_kafka_message_s * kmessage);
-    void parseRaw(const rd_kafka_message_s * kmessage);
 
     inline void readAndProcess();
