diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java index d91261e39..6c079c2a7 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java @@ -7,6 +7,7 @@ import com.altinity.clickhouse.sink.connector.converters.DebeziumConverter; import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAlterTable; import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAutoCreateTable; +import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAlterTable.ALTER_TABLE_OPERATION; import com.altinity.clickhouse.sink.connector.metadata.TableMetaDataWriter; import com.altinity.clickhouse.sink.connector.model.BlockMetaData; import com.altinity.clickhouse.sink.connector.model.CdcRecordState; @@ -14,6 +15,7 @@ import com.altinity.clickhouse.sink.connector.model.KafkaMetaData; import com.clickhouse.client.ClickHouseCredentials; import com.clickhouse.client.ClickHouseNode; +import com.google.common.collect.Sets; import com.google.common.io.BaseEncoding; import io.debezium.data.Json; import io.debezium.time.Date; @@ -321,25 +323,45 @@ public boolean updateQueryToRecordsMap(ClickHouseStruct record, List modi * m modifiedFields */ public void alterTable(List modifiedFields) { - List missingFieldsInCH = new ArrayList(); - // Identify the columns that need to be added/removed in ClickHouse. - for(Field f: modifiedFields) { - String colName = f.name(); + List addColumns = new ArrayList(); + List dropColumns = new ArrayList(); + + //getAlterColumns(modifiedFields, addColumns, dropColumns, this.columnNameToDataTypeMap); + } + - if(this.columnNameToDataTypeMap.containsKey(colName) == false) { - missingFieldsInCH.add(f); + public void getAlterColumns(List sourceFields, + Set addColumns, Set dropColumns, Map columnNameToDataTypeMap) { + + Set sourceColumnNames = new HashSet<>(); + // Identify the columns that need to be added/removed in ClickHouse. + for(Field f: sourceFields) { + java.lang.String colName = f.name(); + sourceColumnNames.add(colName); + columnNameToDataTypeMap.keySet(); + // If the columns are missing in ClickHouse + if(columnNameToDataTypeMap.containsKey(colName) == false) { + addColumns.add(f); } } - if(!missingFieldsInCH.isEmpty()) { + Sets.SetView difference = Sets.difference(sourceColumnNames, columnNameToDataTypeMap.keySet()); + dropColumns.addAll(difference); + System.out.println(); + } + + + public void handleAlterColumns(ArrayList columns, ALTER_TABLE_OPERATION operation) { + if(!columns.isEmpty()) { log.info("***** ALTER TABLE ****"); ClickHouseAlterTable cat = new ClickHouseAlterTable(); - Field[] missingFieldsArray = new Field[missingFieldsInCH.size()]; - missingFieldsInCH.toArray(missingFieldsArray); + Field[] missingFieldsArray = new Field[columns.size()]; + columns.toArray(missingFieldsArray); Map colNameToDataTypeMap = cat.getColumnNameToCHDataTypeMapping(missingFieldsArray); if(!colNameToDataTypeMap.isEmpty()) { - String alterTableQuery = cat.createAlterTableSyntax(this.tableName, colNameToDataTypeMap, ClickHouseAlterTable.ALTER_TABLE_OPERATION.ADD); + String alterTableQuery = cat.createAlterTableSyntaxAddColumn(this.tableName, colNameToDataTypeMap, + operation); log.info(" ***** ALTER TABLE QUERY **** " + alterTableQuery); try { diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java index acd8b7716..4e11be2cd 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTable.java @@ -1,6 +1,7 @@ package com.altinity.clickhouse.sink.connector.db.operations; import java.util.Map; +import java.util.Set; /** * Class that handles logic related to alter table @@ -17,7 +18,7 @@ public enum ALTER_TABLE_OPERATION { this.op = op; } } - public String createAlterTableSyntax(String tableName, Map colNameToDataTypesMap, ALTER_TABLE_OPERATION operation) { + public String createAlterTableSyntaxAddColumn(String tableName, Map colNameToDataTypesMap, ALTER_TABLE_OPERATION operation) { // alter table // add column `col_name_1` data_type_1, // add column `col_name_2` data_type_2 @@ -29,8 +30,6 @@ public String createAlterTableSyntax(String tableName, Map colNa for(Map.Entry entry: colNameToDataTypesMap.entrySet()) { if(operation.name().equalsIgnoreCase(ALTER_TABLE_OPERATION.ADD.op)) { alterTableSyntax.append("add column "); - } else { - alterTableSyntax.append("delete column "); } alterTableSyntax.append("`").append(entry.getKey()).append("`").append(" ").append(entry.getValue()).append(","); } @@ -39,4 +38,24 @@ public String createAlterTableSyntax(String tableName, Map colNa return alterTableSyntax.toString(); } + + public String createAlterTableSyntaxDropColumn(String tableName, Set columnNames) { + // alter table + // add column `col_name_1` data_type_1, + // add column `col_name_2` data_type_2 + + StringBuilder alterTableSyntax = new StringBuilder(); + + alterTableSyntax.append("ALTER TABLE").append(" ").append(tableName).append(" "); + + for(String colName: columnNames) { + alterTableSyntax.append("drop column").append(colName).append(","); + } + //.append("drop column "); + + //alterTableSyntax.append(String.join(",", columnNames)); + + return alterTableSyntax.toString(); + } + } diff --git a/src/test/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java b/src/test/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java index 43bd7347f..895ae60d5 100644 --- a/src/test/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java +++ b/src/test/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java @@ -8,6 +8,7 @@ import com.clickhouse.jdbc.ClickHouseDataSource; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -17,7 +18,6 @@ import org.junit.jupiter.api.Tag; import java.sql.PreparedStatement; -import java.sql.SQLException; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; @@ -26,7 +26,7 @@ public class DbWriterTest { DbWriter writer; @Before - public void init() throws SQLException { + public void init() { String hostName = "remoteClickHouse"; Integer port = 8000; @@ -76,6 +76,35 @@ public void testGetConnectionUrl() { public void testIsColumnTypeDate64() { boolean result = DbWriter.isColumnDateTime64("Nullable(DateTime64(3))"); } + + @Test + public void testGetAlterColumns() { + List fields = new ArrayList<>(); + + // Columns to be dropped. + Map columnNameToDataTypesMap = new HashMap(); + + columnNameToDataTypesMap.put("customerName", "String"); + columnNameToDataTypesMap.put("occupation", "String"); + columnNameToDataTypesMap.put("quantity", "UInt32"); + columnNameToDataTypesMap.put("_topic", "String"); + + fields.add(new Field("customerName", 0, Schema.STRING_SCHEMA)); + fields.add(new Field("occupation", 1, Schema.STRING_SCHEMA)); + fields.add(new Field("quantity", 2, Schema.INT32_SCHEMA)); + fields.add(new Field("amount", 3, Schema.FLOAT64_SCHEMA)); + //fields.add(new Field("employed", 4, Schema.BOOLEAN_SCHEMA)); + + List addColumns = new ArrayList(); + List dropColumns = new ArrayList(); + + //writer.getAlterColumns(fields, addColumns, dropColumns, columnNameToDataTypesMap ); + + Assert.assertFalse(addColumns.isEmpty()); + Assert.assertFalse(dropColumns.isEmpty()); + + } + @Test @Tag("IntegrationTest") public void testGetColumnsDataTypesForTable() { diff --git a/src/test/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTableTest.java b/src/test/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTableTest.java index 04e10bd99..2935e9c3f 100644 --- a/src/test/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTableTest.java +++ b/src/test/com/altinity/clickhouse/sink/connector/db/operations/ClickHouseAlterTableTest.java @@ -3,13 +3,16 @@ import org.junit.Assert; import org.junit.Test; +import java.util.HashSet; +import java.util.Set; + public class ClickHouseAlterTableTest extends ClickHouseAutoCreateTableTest { @Test - public void createAlterTableSyntaxTest() { + public void createAlterTableSyntaxAddColumnTest() { ClickHouseAlterTable cat = new ClickHouseAlterTable(); - String alterTableQuery = cat.createAlterTableSyntax("employees", + String alterTableQuery = cat.createAlterTableSyntaxAddColumn("employees", this.getExpectedColumnToDataTypesMap(), ClickHouseAlterTable.ALTER_TABLE_OPERATION.ADD); String expectedQuery = "ALTER TABLE employees add column `amount` Float64,add column `occupation` String,add column `quantity` Int32,add column `amount_1` Float32,add column `customerName` String,add column `blob_storage` String,add column `employed` Bool"; @@ -17,4 +20,21 @@ public void createAlterTableSyntaxTest() { Assert.assertTrue(alterTableQuery.equalsIgnoreCase(expectedQuery)); System.out.println("Alter table query" + alterTableQuery); } + + @Test + public void createAlterTableSyntaxDropColumnTest() { + + Set droppedColumns = new HashSet(); + droppedColumns.add("customerName"); + droppedColumns.add("address"); + droppedColumns.add("phoneNumber"); + + ClickHouseAlterTable cat = new ClickHouseAlterTable(); + String alterTableQuery = cat.createAlterTableSyntaxDropColumn("employees", + droppedColumns); + + String expectedQuery = "ALTER TABLE employees drop column address,phoneNumber,customerName"; + + Assert.assertTrue(alterTableQuery.equalsIgnoreCase(expectedQuery)); + } }