diff --git a/flink-connector-jdbc-architecture/pom.xml b/flink-connector-jdbc-architecture/pom.xml
index fe6fa34b2..eb04c73d2 100644
--- a/flink-connector-jdbc-architecture/pom.xml
+++ b/flink-connector-jdbc-architecture/pom.xml
@@ -42,6 +42,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc-clickhouse</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-jdbc-cratedb</artifactId>
diff --git a/flink-connector-jdbc-clickhouse/pom.xml b/flink-connector-jdbc-clickhouse/pom.xml
new file mode 100644
index 000000000..377b9bbd8
--- /dev/null
+++ b/flink-connector-jdbc-clickhouse/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-jdbc-parent</artifactId>
+        <version>4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-connector-jdbc-clickhouse</artifactId>
+    <name>Flink : Connectors : JDBC : ClickHouse</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <clickhouse.version>0.6.2</clickhouse.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- ClickHouse JDBC driver -->
+        <dependency>
+            <groupId>com.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>${clickhouse.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>clickhouse</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/ClickHouseFactory.java b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/ClickHouseFactory.java
new file mode 100644
index 000000000..47b710411
--- /dev/null
+++ b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/ClickHouseFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.clickhouse.database;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.clickhouse.database.dialect.ClickHouseDialect;
+import org.apache.flink.connector.jdbc.core.database.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
+
+/** Factory for {@link ClickHouseDialect}. */
+@Internal
+public class ClickHouseFactory implements JdbcFactory {
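+    // JdbcFactory implementations are discovered through Java SPI (a META-INF/services
+    // registration, assumed to accompany this patch) and selected by URL prefix.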
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:clickhouse:");
+ }
+
+ @Override
+ public JdbcDialect createDialect() {
+ return new ClickHouseDialect();
+ }
+
+ @Override
+ public JdbcCatalog createCatalog(
+ ClassLoader classLoader,
+ String catalogName,
+ String defaultDatabase,
+ String username,
+ String pwd,
+ String baseUrl) {
+ throw new UnsupportedOperationException("Catalog for ClickHouse is not supported yet.");
+ }
+}
diff --git a/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialect.java b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialect.java
new file mode 100644
index 000000000..593d8109e
--- /dev/null
+++ b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialect.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.clickhouse.database.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/** JDBC dialect for ClickHouse. */
+@Internal
+public class ClickHouseDialect extends AbstractDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ // Define MAX/MIN precision of TIMESTAMP type according to ClickHouse docs:
+ // https://clickhouse.com/docs/sql-reference/data-types/datetime64
+ private static final int MAX_TIMESTAMP_PRECISION = 9;
+ private static final int MIN_TIMESTAMP_PRECISION = 0;
+
+ // Define MAX/MIN precision of DECIMAL type according to ClickHouse docs:
+ // https://clickhouse.com/docs/sql-reference/data-types/decimal
+ private static final int MAX_DECIMAL_PRECISION = 76;
+ private static final int MIN_DECIMAL_PRECISION = 1;
+
+ @Override
+ public JdbcDialectConverter getRowConverter(RowType rowType) {
+ return new ClickHouseDialectConverter(rowType);
+ }
+
+ @Override
+ public String getLimitClause(long limit) {
+ return "LIMIT " + limit;
+ }
+
+ @Override
+    public Optional<String> defaultDriverName() {
+ return Optional.of("com.clickhouse.jdbc.ClickHouseDriver");
+ }
+
+ @Override
+ public String dialectName() {
+ return "ClickHouse";
+ }
+
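+    // Identifiers are passed through unquoted. ClickHouse also accepts backticks or
+    // double quotes when a table or column name needs explicit quoting.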
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return identifier;
+ }
+
+    // ClickHouse does not support upsert statements.
+    // Instead, you can create a table with the ReplacingMergeTree engine:
+    // https://clickhouse.com/docs/engines/table-engines/mergetree-family/replacingmergetree
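+    // For illustration, a hypothetical table with de-duplication semantics:
+    //   CREATE TABLE t (id Int32, v String, ver DateTime)
+    //   ENGINE = ReplacingMergeTree(ver) ORDER BY id;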
+ @Override
+    public Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ return Optional.empty();
+ }
+
+    // ClickHouse primary key fields cannot be updated via SQL; condition fields are
+    // therefore excluded from the SET clause, and updates run as ALTER TABLE mutations.
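+    // For example, fieldNames = {"id", "name"} and conditionFields = {"id"} yield:
+    //   ALTER TABLE t UPDATE name = :name WHERE id = :id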
+ @Override
+ public String getUpdateStatement(
+ String tableName, String[] fieldNames, String[] conditionFields) {
+ String setClause =
+ Arrays.stream(fieldNames)
+ .filter(item -> !Arrays.asList(conditionFields).contains(item))
+ .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+ .collect(Collectors.joining(", "));
+ String conditionClause =
+ Arrays.stream(conditionFields)
+ .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+ .collect(Collectors.joining(" AND "));
+ return "ALTER TABLE "
+ + quoteIdentifier(tableName)
+ + " UPDATE "
+ + setClause
+ + " WHERE "
+ + conditionClause;
+ }
+
+ @Override
+    public Optional<Range> decimalPrecisionRange() {
+ return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
+ }
+
+ @Override
+    public Optional<Range> timestampPrecisionRange() {
+ return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
+
+ @Override
+    public Set<LogicalTypeRoot> supportedTypes() {
+        // The data types supported by ClickHouse are listed at:
+        // https://clickhouse.com/docs/sql-reference/data-types
+
+ return EnumSet.of(
+ LogicalTypeRoot.CHAR,
+ LogicalTypeRoot.VARCHAR,
+ LogicalTypeRoot.BOOLEAN,
+ LogicalTypeRoot.DECIMAL,
+ LogicalTypeRoot.TINYINT,
+ LogicalTypeRoot.SMALLINT,
+ LogicalTypeRoot.INTEGER,
+ LogicalTypeRoot.BIGINT,
+ LogicalTypeRoot.FLOAT,
+ LogicalTypeRoot.DOUBLE,
+ LogicalTypeRoot.DATE,
+ LogicalTypeRoot.MAP,
+ LogicalTypeRoot.ARRAY,
+ LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+ }
+}
diff --git a/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java
new file mode 100644
index 000000000..597a24109
--- /dev/null
+++ b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.clickhouse.database.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Runtime converter responsible for converting between JDBC objects and Flink internal objects
+ * for ClickHouse.
+ */
+@Internal
+public class ClickHouseDialectConverter extends AbstractDialectConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ public ClickHouseDialectConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return null;
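+            // The ClickHouse JDBC driver may return widened Integer values for
+            // Bool/Int8/Int16 columns, hence the narrowing casts below.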
+ case BOOLEAN:
+ case TINYINT:
+ return val -> ((Integer) val).byteValue();
+ case SMALLINT:
+ return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ case FLOAT:
+ case DOUBLE:
+ case BINARY:
+ case VARBINARY:
+ return val -> val;
+ case CHAR:
+ case VARCHAR:
+ return val -> StringData.fromString((String) val);
+ case DATE:
+ return val -> (Date) val;
+ case TIME_WITHOUT_TIME_ZONE:
+ return val -> (Time) val;
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return val ->
+ val instanceof LocalDateTime
+ ? TimestampData.fromLocalDateTime((LocalDateTime) val)
+ : TimestampData.fromTimestamp((Timestamp) val);
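+            // Unsigned 64-bit ClickHouse values can surface as BigInteger, so both
+            // BigInteger and BigDecimal inputs are normalized to DecimalData.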
+ case DECIMAL:
+ final int precision = ((DecimalType) type).getPrecision();
+ final int scale = ((DecimalType) type).getScale();
+ return val ->
+ val instanceof BigInteger
+ ? DecimalData.fromBigDecimal(
+ new BigDecimal((BigInteger) val, 0), precision, scale)
+ : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
+ case MAP:
+ return val -> (MapData) val;
+ case ARRAY:
+ return val -> (ArrayData) val;
+ default:
+ return super.createInternalConverter(type);
+ }
+ }
+
+ @Override
+ public JdbcSerializationConverter createExternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
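+            // MAP and ARRAY are materialized into plain Java objects and bound via
+            // setObject; all other types fall back to the base converter.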
+ case MAP:
+ return (val, index, statement) ->
+ statement.setObject(index, toExternalSerializer(val.getMap(index), type));
+ case ARRAY:
+ return (val, index, statement) ->
+ statement.setObject(index, toExternalSerializer(val.getArray(index), type));
+ default:
+ return super.createExternalConverter(type);
+ }
+ }
+
+    // Converts Flink internal MAP and ARRAY values into plain Java maps/arrays that the
+    // JDBC driver can bind, recursing into nested element types.
+ public static Object toExternalSerializer(Object value, LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ return ((Integer) value).byteValue();
+ case SMALLINT:
+ return value instanceof Integer ? ((Integer) value).shortValue() : value;
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ case FLOAT:
+ case DOUBLE:
+ case BINARY:
+ case VARBINARY:
+ return value;
+ case CHAR:
+ case VARCHAR:
+ return value.toString();
+ case DATE:
+ return (Date) value;
+ case TIME_WITHOUT_TIME_ZONE:
+ return (Time) value;
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return value instanceof LocalDateTime
+ ? TimestampData.fromLocalDateTime((LocalDateTime) value)
+ : TimestampData.fromTimestamp((Timestamp) value);
+ case DECIMAL:
+ final int precision = ((DecimalType) type).getPrecision();
+ final int scale = ((DecimalType) type).getScale();
+ return value instanceof BigInteger
+ ? DecimalData.fromBigDecimal(
+ new BigDecimal((BigInteger) value, 0), precision, scale)
+ : DecimalData.fromBigDecimal((BigDecimal) value, precision, scale);
+ case ARRAY:
+ LogicalType elementType =
+ ((ArrayType) type)
+ .getChildren().stream()
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Unknown array element type"));
+ ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
+ ArrayData arrayData = ((ArrayData) value);
+ Object[] objectArray = new Object[arrayData.size()];
+ for (int i = 0; i < arrayData.size(); i++) {
+ objectArray[i] =
+ toExternalSerializer(
+ elementGetter.getElementOrNull(arrayData, i), elementType);
+ }
+ return objectArray;
+ case MAP:
+ LogicalType keyType = ((MapType) type).getKeyType();
+ LogicalType valueType = ((MapType) type).getValueType();
+ ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType);
+ ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
+ MapData mapData = (MapData) value;
+ ArrayData keyArrayData = mapData.keyArray();
+ ArrayData valueArrayData = mapData.valueArray();
+                Map<Object, Object> map = new HashMap<>();
+                for (int i = 0; i < mapData.size(); i++) {
+                    map.put(
+                            toExternalSerializer(
+                                    keyGetter.getElementOrNull(keyArrayData, i), keyType),
+                            toExternalSerializer(
+                                    valueGetter.getElementOrNull(valueArrayData, i), valueType));
+                }
+                return map;
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+}