ClickHouseSinkTask.java
@@ -6,6 +6,7 @@
import com.altinity.clickhouse.sink.connector.deduplicator.DeDuplicator;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable;
import com.altinity.clickhouse.sink.connector.executor.WriteConfirmationCallback;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -21,16 +22,18 @@
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* <p>Creates the sink service instance, takes records loaded from the assigned
* Kafka partitions, and ingests them into ClickHouse via the sink service.</p>
* <p>This class extends {@link SinkTask} and is managed by Kafka Connect.</p>
*/
public class ClickHouseSinkTask extends SinkTask {
public class ClickHouseSinkTask extends SinkTask implements WriteConfirmationCallback {

/**
* The logger for this class.
@@ -74,6 +77,12 @@ public class ClickHouseSinkTask extends SinkTask {
*/
private long totalRecords;

/**
* Tracks the highest offset successfully written to ClickHouse per partition,
* used for at-least-once delivery.
* Key: TopicPartition, Value: highest successfully written offset
*/
private final Map<TopicPartition, Long> successfullyCommittedOffsets = new ConcurrentHashMap<>();

/**
* Default constructor.
*/
@@ -115,7 +124,7 @@ public void start(Map<String, String> config) {
this.records = new LinkedBlockingQueue<>(maxQueueSize);

ClickHouseBatchRunnable runnable = new ClickHouseBatchRunnable(
this.records, this.config, topic2TableMap);
this.records, this.config, topic2TableMap, this::onWriteSuccess);

ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("Sink Connector thread-pool-%d")
@@ -184,7 +193,6 @@ public void close(final Collection<TopicPartition> partitions) {
@Override
public void put(Collection<SinkRecord> records) {
totalRecords += records.size();

long taskId = this.config.getLong(
ClickHouseSinkConnectorConfigVariables.TASK_ID.toString());

@@ -204,10 +212,12 @@ public void put(Collection<SinkRecord> records) {
}
}

try {
this.records.put(batch);
} catch (InterruptedException e) {
throw new RetriableException(e);
if (!batch.isEmpty()) {
try {
this.records.put(batch);
} catch (InterruptedException e) {
throw new RetriableException(e);
}
}
}

@@ -243,6 +253,8 @@ public void put(Collection<SinkRecord> records) {
* system and doesn't need Kafka Connect to record anything, then this
* method should be overridden (instead of flush) and return an empty
* set of offsets.</p>
* <p>For at-least-once delivery, this method only returns
* offsets for records that have been successfully written to ClickHouse.</p>
*
* @param currentOffsets A map of topic partition to current offset.
* @return A map of committed offsets that Kafka Connect should record.
@@ -255,14 +267,27 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(

log.info("preCommit({}) {}", this.id, currentOffsets.size());

Map<TopicPartition, OffsetAndMetadata> committedOffsets =
new HashMap<>();
Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();

try {
// For at-least-once delivery, only commit offsets for successfully written records
currentOffsets.forEach((topicPartition, offsetAndMetadata) -> {
committedOffsets.put(topicPartition,
new OffsetAndMetadata(offsetAndMetadata.offset()));
Long lastSuccessfulOffset = successfullyCommittedOffsets.get(topicPartition);

if (lastSuccessfulOffset != null) {
Long offsetToCommit = lastSuccessfulOffset + 1;
committedOffsets.put(topicPartition, new OffsetAndMetadata(offsetToCommit));

// Log only when we're committing a different offset than requested
if (!offsetToCommit.equals(offsetAndMetadata.offset())) {
log.debug("preCommit: Holding back offset for {}: requested={}, committing={}",
topicPartition, offsetAndMetadata.offset(), offsetToCommit);
}
}
});

log.debug("preCommit({}) returning {} committed offsets out of {} requested", this.id, committedOffsets.size(), currentOffsets.size());

} catch (Exception e) {
log.error("preCommit({}):{}", this.id, e.getMessage());
return new HashMap<>();
@@ -279,6 +304,32 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
// }
// }

/**
* Callback method called when records have been successfully written to ClickHouse.
* Used for at-least-once delivery.
*
* @param successfulRecords List of ClickHouseStruct records that were successfully written
*/
@Override
public void onWriteSuccess(List<ClickHouseStruct> successfulRecords) {
// Group records by partition and find max offset per partition
Map<TopicPartition, Long> partitionMaxOffsets = successfulRecords.stream()
.collect(Collectors.toMap(
record -> new TopicPartition(record.getTopic(), record.getKafkaPartition()),
ClickHouseStruct::getKafkaOffset,
Math::max // merge function for duplicate keys
));

// Update successfully committed offsets for each partition
partitionMaxOffsets.forEach((topicPartition, maxOffset) -> {
successfullyCommittedOffsets.merge(topicPartition, maxOffset, Math::max);
log.debug("Updated max successful offset for {}: {}", topicPartition, maxOffset);
});

log.debug("onWriteSuccess: Processed {} successful records across {} partitions",
successfulRecords.size(), partitionMaxOffsets.size());
}

/**
* Returns the version of this task, typically the same as the connector
* version.
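To make the new offset bookkeeping concrete, here is a minimal, self-contained sketch (class and variable names are illustrative, not part of this PR) of the arithmetic that `onWriteSuccess` and `preCommit` implement above. It relies on the Kafka convention that a committed offset names the *next* record to consume, which is why the code commits `lastSuccessfulOffset + 1`:

```java
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OffsetHoldBackSketch {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("SERVER5432.test.customers", 0);

        // What the framework asks to commit: the consumer has read through offset 99.
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        currentOffsets.put(tp, new OffsetAndMetadata(100L));

        // What onWriteSuccess has confirmed as written to ClickHouse so far.
        Map<TopicPartition, Long> successfullyWritten = new ConcurrentHashMap<>();
        successfullyWritten.merge(tp, 41L, Math::max);
        // merge(..., Math::max) keeps the map monotonic: a late confirmation
        // for a lower offset does not move the high-water mark backwards.
        successfullyWritten.merge(tp, 35L, Math::max); // still 41

        // preCommit: commit highestWritten + 1 (the next offset to consume),
        // holding back 42..99 so they are redelivered after a crash or restart.
        Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
        currentOffsets.forEach((partition, requested) -> {
            Long highestWritten = successfullyWritten.get(partition);
            if (highestWritten != null) {
                committed.put(partition, new OffsetAndMetadata(highestWritten + 1));
            }
        });

        System.out.println(committed.get(tp).offset()); // 42, not the requested 100
    }
}
```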
ClickHouseBatchRunnable.java
@@ -82,6 +82,11 @@ public class ClickHouseBatchRunnable implements Runnable {
*/
private Map<String, String> databaseOverrideMap = new HashMap<>();

/**
* Callback for notifying about successful writes to ClickHouse.
*/
private final WriteConfirmationCallback writeCallback;

/**
* Sleep time in milliseconds after an exception occurs.
*/
@@ -93,13 +98,16 @@
* @param records the queue of record batches
* @param config the connector configuration
* @param topic2TableMap a map of topic names to table names
* @param writeCallback the callback for write confirmations
*/
public ClickHouseBatchRunnable(
LinkedBlockingQueue<List<ClickHouseStruct>> records,
ClickHouseSinkConnectorConfig config,
Map<String, String> topic2TableMap) {
Map<String, String> topic2TableMap,
WriteConfirmationCallback writeCallback) {
this.records = records;
this.config = config;
this.writeCallback = writeCallback;
if (topic2TableMap == null) {
this.topic2TableMap = new HashMap();
} else {
@@ -275,6 +283,11 @@ public void run() {
}
}
if (result) {
// Notify about successfully written records for at-least-once delivery
if (writeCallback != null) {
writeCallback.onWriteSuccess(currentBatch);
}

// Step 2: Check if the batch can be committed.
if(DebeziumOffsetManagement.checkIfBatchCanBeCommitted(currentBatch)) {
currentBatch = null;
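The new constructor parameter is effectively optional: `run()` guards the callback with a null check before invoking it, so callers that do not need write confirmations (such as the test change below) can pass `null`. A minimal wiring sketch (class name hypothetical; the config class's package is assumed from the other imports in this PR):

```java
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;

import java.util.HashMap;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class RunnableWiringSketch {
    public static void main(String[] args) {
        LinkedBlockingQueue<List<ClickHouseStruct>> records = new LinkedBlockingQueue<>();
        ClickHouseSinkConnectorConfig config =
                new ClickHouseSinkConnectorConfig(new HashMap<>());

        // No confirmations wanted: pass null; run() skips the callback.
        ClickHouseBatchRunnable quiet =
                new ClickHouseBatchRunnable(records, config, new HashMap<>(), null);

        // Track confirmations: any lambda matching onWriteSuccess(List<ClickHouseStruct>).
        ClickHouseBatchRunnable tracked = new ClickHouseBatchRunnable(
                records, config, new HashMap<>(),
                batch -> System.out.println("flushed " + batch.size() + " records"));
    }
}
```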
WriteConfirmationCallback.java
@@ -0,0 +1,18 @@
package com.altinity.clickhouse.sink.connector.executor;

import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import java.util.List;

/**
* Callback interface for notifying about successful writes to ClickHouse.
* Used for at-least-once delivery.
*/
public interface WriteConfirmationCallback {

/**
* Called when records have been successfully written to ClickHouse.
*
* @param successfulRecords List of records that were successfully written
*/
void onWriteSuccess(List<ClickHouseStruct> successfulRecords);
}
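Because the interface declares a single abstract method, it is a functional interface in practice: `ClickHouseSinkTask` satisfies it with the `this::onWriteSuccess` method reference, and a lambda works just as well. A minimal sketch (class name hypothetical):

```java
import com.altinity.clickhouse.sink.connector.executor.WriteConfirmationCallback;

import java.util.List;

public class CallbackLambdaSketch {
    public static void main(String[] args) {
        // Any lambda matching onWriteSuccess(List<ClickHouseStruct>) will do.
        WriteConfirmationCallback callback = successfulRecords ->
                System.out.println("confirmed " + successfulRecords.size() + " records");

        callback.onWriteSuccess(List.of()); // prints "confirmed 0 records"
    }
}
```

Annotating the interface with `@FunctionalInterface` would document this single-method contract at compile time.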
ClickHouseBatchRunnableTest.java
@@ -85,7 +85,7 @@ public Struct getKafkaStruct() {
@Test
public void testGetTableNameFromTopic() {
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(new HashMap<String, String>());
ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap);
ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap, null);

String tableName = run.getTableFromTopic("SERVER5432.test.customers");
