diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
index 8d4553454..7dcd38fa5 100644
--- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
+++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java
@@ -38,8 +38,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -59,7 +60,7 @@ public class DebeziumChangeEventCapture {
     private ClickHouseBatchRunnable runnable;

     // Records grouped by Topic Name
-    private ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records;
+    private final ConcurrentHashMap<String, Queue<ClickHouseStruct>> records = new ConcurrentHashMap<>();

     private BaseDbWriter writer = null;
@@ -198,14 +199,8 @@ private void processEveryChangeRecord(Properties props, ChangeEvent<SourceRecord, SourceRecord> record,
-            ConcurrentLinkedQueue<ClickHouseStruct> queue = new ConcurrentLinkedQueue();
             if (chStruct != null) {
-                queue.add(chStruct);
-            }
-            synchronized (this.records) {
-                if (chStruct != null) {
-                    addRecordsToSharedBuffer(chStruct.getTopic(), chStruct);
-                }
+                addRecordsToSharedBuffer(chStruct.getTopic(), chStruct);
             }
         }
@@ -562,17 +557,22 @@ private void setupProcessingThread(ClickHouseSinkConnectorConfig config, DDLParserService ddlParserService) {
      * @param topicName
      * @param chs
      */
-    private void addRecordsToSharedBuffer(String topicName, ClickHouseStruct chs) {
-        ConcurrentLinkedQueue<ClickHouseStruct> structs;
+    private void addRecordsToSharedBuffer(String topicName, ClickHouseStruct chs) throws InterruptedException {
+        Queue<ClickHouseStruct> structs;

         if (this.records.containsKey(topicName)) {
             structs = this.records.get(topicName);
         } else {
-            structs = new ConcurrentLinkedQueue<>();
+            structs = new LinkedBlockingQueue<>(200_000);
+            this.records.putIfAbsent(topicName, structs);
         }
-        structs.add(chs);
-        synchronized (this.records) {
-            this.records.put(topicName, structs);
+        while (true) {
+            try {
+                structs.add(chs);
+                break;
+            } catch (IllegalStateException e) {
+                Thread.sleep(50);
+            }
         }
     }
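The change above is the heart of this patch: the per-topic buffer goes from an unbounded ConcurrentLinkedQueue to a LinkedBlockingQueue capped at 200,000 records, and the producer retries add() every 50 ms while the buffer is full. The same loop is duplicated in ClickHouseSinkTask.appendToRecords below. A minimal standalone sketch of the backpressure loop (String stands in for ClickHouseStruct; only the capacity and the sleep interval come from the patch, the rest is illustrative):

    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;

    class BoundedTopicBuffer {
        // Bounded buffer: Queue.add() throws IllegalStateException once the
        // 200_000-record capacity is reached, signalling the consumer is behind.
        private final Queue<String> buffer = new LinkedBlockingQueue<>(200_000);

        void append(String record) throws InterruptedException {
            while (true) {
                try {
                    buffer.add(record);
                    break;
                } catch (IllegalStateException full) {
                    Thread.sleep(50); // back off until the batch thread drains records
                }
            }
        }
    }

Because the field is declared as plain Queue, the blocking put() and offer(e, timeout, unit) methods of BlockingQueue are not visible at the call site, which is presumably why the patch polls with sleep() instead of blocking on the queue itself.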
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java
index 2ca580a54..f1ea340dd 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java
@@ -18,8 +18,9 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;

 /**
@@ -39,7 +40,7 @@ public ClickHouseSinkTask() {
     private ClickHouseBatchExecutor executor;

     // Records grouped by Topic Name
-    private ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records;
+    private ConcurrentHashMap<String, Queue<ClickHouseStruct>> records;

     private DeDuplicator deduplicator;
@@ -115,16 +116,25 @@ public void put(Collection<SinkRecord> records) {
     }

     private void appendToRecords(String topicName, ClickHouseStruct chs) {
-        ConcurrentLinkedQueue<ClickHouseStruct> structs;
+        Queue<ClickHouseStruct> structs;

-        if(this.records.containsKey(topicName)) {
+        if (this.records.containsKey(topicName)) {
             structs = this.records.get(topicName);
         } else {
-            structs = new ConcurrentLinkedQueue<>();
+            structs = new LinkedBlockingQueue<>(200_000);
+            this.records.putIfAbsent(topicName, structs);
         }
-        structs.add(chs);
-        synchronized (this.records) {
-            this.records.put(topicName, structs);
+        while (true) {
+            try {
+                structs.add(chs);
+                break;
+            } catch (IllegalStateException e) {
+                try {
+                    Thread.sleep(50);
+                } catch (InterruptedException ignored) {
+
+                }
+            }
         }
     }
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
index ca9dfb912..12e7db4c0 100644
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
@@ -27,12 +27,7 @@
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;

 /**
@@ -237,8 +232,8 @@ private void updatePartitionOffsetMap(Map<TopicPartition, Long> offsetToPartitionMap,
      * @param records
      * @return
      */
-    public Map<TopicPartition, Long> groupQueryWithRecords(ConcurrentLinkedQueue<ClickHouseStruct> records,
-                                                           Map<MutablePair<String, Map<String, Integer>>,
+    public Map<TopicPartition, Long> groupQueryWithRecords(Collection<ClickHouseStruct> records,
+                                                           Map<MutablePair<String, Map<String, Integer>>,
                                                                    List<ClickHouseStruct>> queryToRecordsMap) {
@@ -468,7 +463,9 @@ public BlockMetaData addToPreparedStatementBatch(String topicName, Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap,
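Aside from the wildcard import, the DbWriter change is a signature widening: groupQueryWithRecords() now accepts any Collection of ClickHouseStruct, so the same method serves both the sink task's live queue and the ArrayDeque batch drained by ClickHouseBatchRunnable below. A compilable sketch of why the widening is needed (String stands in for ClickHouseStruct; names are illustrative, not the library's API):

    import java.util.ArrayDeque;
    import java.util.Collection;
    import java.util.concurrent.ConcurrentLinkedQueue;

    class WidenedSignature {
        // The real method groups records by their insert query; size() is a stub.
        static int groupQueryWithRecords(Collection<String> records) {
            return records.size();
        }

        public static void main(String[] args) {
            Collection<String> liveQueue = new ConcurrentLinkedQueue<>(); // old call path
            Collection<String> drainedBatch = new ArrayDeque<>();         // new call path
            liveQueue.add("r1");
            drainedBatch.add("r2");
            // Both call paths now compile against the one widened signature.
            System.out.println(groupQueryWithRecords(liveQueue)
                    + groupQueryWithRecords(drainedBatch)); // prints 2
        }
    }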
diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
--- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
+++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
@@ ... @@ public class ClickHouseBatchRunnable implements Runnable {
-    private ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records;
+    private final ConcurrentHashMap<String, Queue<ClickHouseStruct>> records;

     private final ClickHouseSinkConnectorConfig config;
@@ -44,11 +41,9 @@ public class ClickHouseBatchRunnable implements Runnable {
     // Map of topic name to buffered records.
-    Map<String, Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>>> topicToRecordsMap;
-
     private DBCredentials dbCredentials;

-    public ClickHouseBatchRunnable(ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records,
+    public ClickHouseBatchRunnable(ConcurrentHashMap<String, Queue<ClickHouseStruct>> records,
                                    ClickHouseSinkConnectorConfig config,
                                    Map<String, String> topic2TableMap) {
         this.records = records;
@@ -61,7 +56,6 @@ public ClickHouseBatchRunnable(ConcurrentHashMap<String, Queue<ClickHouseStruct>> records,
         this.topicToDbWriterMap = new HashMap<>();
-        this.topicToRecordsMap = new HashMap<>();

         this.dbCredentials = parseDBConfiguration();
@@ -96,9 +90,11 @@ public void run() {
             }

             // Topic Name -> List of records
-            for (Map.Entry<String, ConcurrentLinkedQueue<ClickHouseStruct>> entry : this.records.entrySet()) {
-                if (entry.getValue().size() > 0) {
-                    processRecordsByTopic(entry.getKey(), entry.getValue());
+            for (Map.Entry<String, Queue<ClickHouseStruct>> entry : this.records.entrySet()) {
+                Queue<ClickHouseStruct> queue = entry.getValue();
+                while (!queue.isEmpty()) {
+                    Queue<ClickHouseStruct> buffer = this.moveRecordsToSeparateBuffer(entry.getValue());
+                    processRecordsByTopic(entry.getKey(), buffer);
                 }
             }
         } catch(Exception e) {
@@ -106,6 +102,34 @@
+    private Queue<ClickHouseStruct> moveRecordsToSeparateBuffer(Queue<ClickHouseStruct> from) throws InterruptedException {
+        long timeMillis = System.currentTimeMillis();
+        Iterator<ClickHouseStruct> iterator = from.iterator();
+        int bufferSize = 100_000;
+        ArrayDeque<ClickHouseStruct> buffer = new ArrayDeque<>(bufferSize);
+        while (System.currentTimeMillis() - timeMillis < 5000) {
+            if (!iterator.hasNext()) {
+                break;
+            }
+            int counter = 0;
+            while (iterator.hasNext() && buffer.size() < bufferSize) {
+                buffer.add(iterator.next());
+                iterator.remove();
+                ++counter;
+            }
+            if (buffer.size() == bufferSize) {
+                break;
+            }
+            if (counter < 1000) { //probably fetching data from binlog by now (or small or really wide table)
+                break;
+            }
+            Thread.sleep(50);
+        }
+        log.info(String.format("Built new batch for processing in %d msec", System.currentTimeMillis() - timeMillis));
+
+        return buffer;
+    }
+
     /**
      * Function to retrieve table name from topic name
      *
@@ -141,7 +165,7 @@ public DbWriter getDbWriterForTable(String topicName, String tableName, ClickHouseStruct record) {
      * @param topicName
      * @param records
      */
-    private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue<ClickHouseStruct> records) throws SQLException {
+    private void processRecordsByTopic(String topicName, Queue<ClickHouseStruct> records) throws SQLException {

         //The user parameter will override the topic mapping to table.
         String tableName = getTableFromTopic(topicName);
@@ -154,22 +178,12 @@ private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue<ClickHouseStruct> records) throws SQLException {
         // Map of Query -> Records(List of ClickHouseStruct)
-        Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap;
-
-        if(topicToRecordsMap.containsKey(topicName)) {
-            queryToRecordsMap = topicToRecordsMap.get(topicName);
-        } else {
-            queryToRecordsMap = new HashMap<>();
-            topicToRecordsMap.put(topicName, queryToRecordsMap);
-        }
+        Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap = new HashMap<>();

         Map<TopicPartition, Long> partitionToOffsetMap = writer.groupQueryWithRecords(records, queryToRecordsMap);

         BlockMetaData bmd = new BlockMetaData();

-        if(flushRecordsToClickHouse(topicName, writer, queryToRecordsMap, bmd)) {
-            // Remove the entry.
-            queryToRecordsMap.remove(topicName);
-        }
+        flushRecordsToClickHouse(topicName, writer, queryToRecordsMap, bmd);

         if (this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.ENABLE_KAFKA_OFFSET.toString())) {
             log.info("***** KAFKA OFFSET MANAGEMENT ENABLED *****");
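The new moveRecordsToSeparateBuffer() is the consumer half of the backpressure scheme: it drains up to 100,000 records into a private ArrayDeque, waiting at most 5 seconds, and ships a partial batch early when a 50 ms poll yields fewer than 1,000 records (the in-code comment's hint that the connector has caught up with the binlog, or the table is small or very wide). A runnable sketch of that drain loop with the patch's thresholds (String stands in for ClickHouseStruct):

    import java.util.ArrayDeque;
    import java.util.Iterator;
    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;

    class BatchDrainer {
        static Queue<String> drain(Queue<String> from) throws InterruptedException {
            long start = System.currentTimeMillis();
            Iterator<String> it = from.iterator(); // weakly consistent: can see concurrent appends
            int bufferSize = 100_000;
            ArrayDeque<String> buffer = new ArrayDeque<>(bufferSize);
            while (System.currentTimeMillis() - start < 5000 && it.hasNext()) {
                int polled = 0;
                while (it.hasNext() && buffer.size() < bufferSize) {
                    buffer.add(it.next());
                    it.remove(); // frees capacity, unblocking stalled producers
                    ++polled;
                }
                if (buffer.size() == bufferSize || polled < 1000) {
                    break; // batch full, or producer has slowed: ship what we have
                }
                Thread.sleep(50); // give producers time to append more
            }
            return buffer;
        }

        public static void main(String[] args) throws InterruptedException {
            Queue<String> q = new LinkedBlockingQueue<>(200_000);
            for (int i = 0; i < 2_500; i++) q.add("row-" + i);
            System.out.println(drain(q).size()); // 2500: one extra 50 ms poll, then the queue is empty
        }
    }

Note that run() calls moveRecordsToSeparateBuffer() inside a while (!queue.isEmpty()) loop, so a topic that keeps producing is flushed batch by batch within a single run() invocation rather than once per scheduled tick.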
diff --git a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java
index 3c2da1261..83733dc87 100644
--- a/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java
+++ b/sink-connector/src/test/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnableTest.java
@@ -13,13 +13,14 @@
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;

 public class ClickHouseBatchRunnableTest {

-    ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>>
+    ConcurrentHashMap<String, Queue<ClickHouseStruct>>
             records = new ConcurrentHashMap<>();

     Map<String, String> topic2TableMap = new HashMap<>();