Skip to content

Commit 9a852d7

Browse files
Revert "Send one message to Kafka per row (#284)" (#321)
This reverts commit 7f5f8c2.
1 parent aa9f764 commit 9a852d7

File tree

2 files changed

+1
-4
lines changed

2 files changed

+1
-4
lines changed

src/Storages/ExternalStream/Kafka/KafkaSink.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,7 @@ KafkaSink::KafkaSink(const Kafka * kafka, const Block & header, ContextPtr conte
223223
if (data_format.empty())
224224
data_format = "JSONEachRow";
225225

226-
/// The callback allows `IRowOutputFormat` based formats produce one Kafka message per row.
227-
writer = FormatFactory::instance().getOutputFormat(
228-
data_format, *wb, header, context, [this](auto & /*column*/, auto /*row*/) { wb->next(); });
226+
writer = FormatFactory::instance().getOutputFormat(data_format, *wb, header, context);
229227
writer->setAutoFlush();
230228

231229
partitioner = std::make_unique<KafkaStream::ChunkPartitioner>(context, header, kafka->partitioning_expr_ast());

src/Storages/ExternalStream/Kafka/WriteBufferFromKafka.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ void WriteBufferFromKafka::nextImpl()
2020
if (!offset())
2121
return;
2222

23-
/// rd_kafka_produce only enqueue messages, it does not send messages out right away.
2423
auto err = rd_kafka_produce(
2524
topic,
2625
/// we want to trigger the partitioner function, check KafkaSink.cpp

0 commit comments

Comments
 (0)