Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import com.altinity.clickhouse.sink.connector.converters.DebeziumConverter;
import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAlterTable;
import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAutoCreateTable;
import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAlterTable.ALTER_TABLE_OPERATION;
import com.altinity.clickhouse.sink.connector.metadata.TableMetaDataWriter;
import com.altinity.clickhouse.sink.connector.model.BlockMetaData;
import com.altinity.clickhouse.sink.connector.model.CdcRecordState;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import com.altinity.clickhouse.sink.connector.model.KafkaMetaData;
import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseNode;
import com.google.common.collect.Sets;
import com.google.common.io.BaseEncoding;
import io.debezium.data.Json;
import io.debezium.time.Date;
Expand Down Expand Up @@ -321,25 +323,45 @@ public boolean updateQueryToRecordsMap(ClickHouseStruct record, List<Field> modi
* m modifiedFields
*/
public void alterTable(List<Field> modifiedFields) {
List<Field> missingFieldsInCH = new ArrayList<Field>();
// Identify the columns that need to be added/removed in ClickHouse.
for(Field f: modifiedFields) {
String colName = f.name();
List<Field> addColumns = new ArrayList<Field>();
List<Field> dropColumns = new ArrayList<Field>();

//getAlterColumns(modifiedFields, addColumns, dropColumns, this.columnNameToDataTypeMap);
}


if(this.columnNameToDataTypeMap.containsKey(colName) == false) {
missingFieldsInCH.add(f);
public void getAlterColumns(List<Field> sourceFields,
Set<Field> addColumns, Set<String> dropColumns, Map<String, String> columnNameToDataTypeMap) {

Set<String> sourceColumnNames = new HashSet<>();
// Identify the columns that need to be added/removed in ClickHouse.
for(Field f: sourceFields) {
java.lang.String colName = f.name();
sourceColumnNames.add(colName);
columnNameToDataTypeMap.keySet();
// If the columns are missing in ClickHouse
if(columnNameToDataTypeMap.containsKey(colName) == false) {
addColumns.add(f);
}
}

if(!missingFieldsInCH.isEmpty()) {
Sets.SetView<String> difference = Sets.difference(sourceColumnNames, columnNameToDataTypeMap.keySet());
dropColumns.addAll(difference);
System.out.println();
}


public void handleAlterColumns(ArrayList<Field> columns, ALTER_TABLE_OPERATION operation) {
if(!columns.isEmpty()) {
log.info("***** ALTER TABLE ****");
ClickHouseAlterTable cat = new ClickHouseAlterTable();
Field[] missingFieldsArray = new Field[missingFieldsInCH.size()];
missingFieldsInCH.toArray(missingFieldsArray);
Field[] missingFieldsArray = new Field[columns.size()];
columns.toArray(missingFieldsArray);
Map<String, String> colNameToDataTypeMap = cat.getColumnNameToCHDataTypeMapping(missingFieldsArray);

if(!colNameToDataTypeMap.isEmpty()) {
String alterTableQuery = cat.createAlterTableSyntax(this.tableName, colNameToDataTypeMap, ClickHouseAlterTable.ALTER_TABLE_OPERATION.ADD);
String alterTableQuery = cat.createAlterTableSyntaxAddColumn(this.tableName, colNameToDataTypeMap,
operation);
log.info(" ***** ALTER TABLE QUERY **** " + alterTableQuery);

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.altinity.clickhouse.sink.connector.db.operations;

import java.util.Map;
import java.util.Set;

/**
* Class that handles logic related to alter table
Expand All @@ -17,7 +18,7 @@ public enum ALTER_TABLE_OPERATION {
this.op = op;
}
}
public String createAlterTableSyntax(String tableName, Map<String, String> colNameToDataTypesMap, ALTER_TABLE_OPERATION operation) {
public String createAlterTableSyntaxAddColumn(String tableName, Map<String, String> colNameToDataTypesMap, ALTER_TABLE_OPERATION operation) {
// alter table <table_name>
// add column `col_name_1` data_type_1,
// add column `col_name_2` data_type_2
Expand All @@ -29,8 +30,6 @@ public String createAlterTableSyntax(String tableName, Map<String, String> colNa
for(Map.Entry<String, String> entry: colNameToDataTypesMap.entrySet()) {
if(operation.name().equalsIgnoreCase(ALTER_TABLE_OPERATION.ADD.op)) {
alterTableSyntax.append("add column ");
} else {
alterTableSyntax.append("delete column ");
}
alterTableSyntax.append("`").append(entry.getKey()).append("`").append(" ").append(entry.getValue()).append(",");
}
Expand All @@ -39,4 +38,24 @@ public String createAlterTableSyntax(String tableName, Map<String, String> colNa

return alterTableSyntax.toString();
}

public String createAlterTableSyntaxDropColumn(String tableName, Set<String> columnNames) {
// alter table <table_name>
// add column `col_name_1` data_type_1,
// add column `col_name_2` data_type_2

StringBuilder alterTableSyntax = new StringBuilder();

alterTableSyntax.append("ALTER TABLE").append(" ").append(tableName).append(" ");

for(String colName: columnNames) {
alterTableSyntax.append("drop column").append(colName).append(",");
}
//.append("drop column ");

//alterTableSyntax.append(String.join(",", columnNames));

return alterTableSyntax.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.clickhouse.jdbc.ClickHouseDataSource;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -17,7 +18,6 @@
import org.junit.jupiter.api.Tag;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

Expand All @@ -26,7 +26,7 @@ public class DbWriterTest {
DbWriter writer;

@Before
public void init() throws SQLException {
public void init() {

String hostName = "remoteClickHouse";
Integer port = 8000;
Expand Down Expand Up @@ -76,6 +76,35 @@ public void testGetConnectionUrl() {
public void testIsColumnTypeDate64() {
boolean result = DbWriter.isColumnDateTime64("Nullable(DateTime64(3))");
}

@Test
public void testGetAlterColumns() {
List<Field> fields = new ArrayList<>();

// Columns to be dropped.
Map<String, String> columnNameToDataTypesMap = new HashMap<String, String>();

columnNameToDataTypesMap.put("customerName", "String");
columnNameToDataTypesMap.put("occupation", "String");
columnNameToDataTypesMap.put("quantity", "UInt32");
columnNameToDataTypesMap.put("_topic", "String");

fields.add(new Field("customerName", 0, Schema.STRING_SCHEMA));
fields.add(new Field("occupation", 1, Schema.STRING_SCHEMA));
fields.add(new Field("quantity", 2, Schema.INT32_SCHEMA));
fields.add(new Field("amount", 3, Schema.FLOAT64_SCHEMA));
//fields.add(new Field("employed", 4, Schema.BOOLEAN_SCHEMA));

List<Field> addColumns = new ArrayList<Field>();
List<Field> dropColumns = new ArrayList<Field>();

//writer.getAlterColumns(fields, addColumns, dropColumns, columnNameToDataTypesMap );

Assert.assertFalse(addColumns.isEmpty());
Assert.assertFalse(dropColumns.isEmpty());

}

@Test
@Tag("IntegrationTest")
public void testGetColumnsDataTypesForTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,38 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.HashSet;
import java.util.Set;

public class ClickHouseAlterTableTest extends ClickHouseAutoCreateTableTest {

@Test
public void createAlterTableSyntaxTest() {
public void createAlterTableSyntaxAddColumnTest() {

ClickHouseAlterTable cat = new ClickHouseAlterTable();
String alterTableQuery = cat.createAlterTableSyntax("employees",
String alterTableQuery = cat.createAlterTableSyntaxAddColumn("employees",
this.getExpectedColumnToDataTypesMap(), ClickHouseAlterTable.ALTER_TABLE_OPERATION.ADD);

String expectedQuery = "ALTER TABLE employees add column `amount` Float64,add column `occupation` String,add column `quantity` Int32,add column `amount_1` Float32,add column `customerName` String,add column `blob_storage` String,add column `employed` Bool";

Assert.assertTrue(alterTableQuery.equalsIgnoreCase(expectedQuery));
System.out.println("Alter table query" + alterTableQuery);
}

@Test
public void createAlterTableSyntaxDropColumnTest() {

Set<String> droppedColumns = new HashSet<String>();
droppedColumns.add("customerName");
droppedColumns.add("address");
droppedColumns.add("phoneNumber");

ClickHouseAlterTable cat = new ClickHouseAlterTable();
String alterTableQuery = cat.createAlterTableSyntaxDropColumn("employees",
droppedColumns);

String expectedQuery = "ALTER TABLE employees drop column address,phoneNumber,customerName";

Assert.assertTrue(alterTableQuery.equalsIgnoreCase(expectedQuery));
}
}