diff --git a/flink-connector-jdbc-architecture/pom.xml b/flink-connector-jdbc-architecture/pom.xml index fe6fa34b2..eb04c73d2 100644 --- a/flink-connector-jdbc-architecture/pom.xml +++ b/flink-connector-jdbc-architecture/pom.xml @@ -42,6 +42,12 @@ ${project.version} test + + org.apache.flink + flink-connector-jdbc-clickhouse + ${project.version} + test + org.apache.flink flink-connector-jdbc-cratedb diff --git a/flink-connector-jdbc-clickhouse/pom.xml b/flink-connector-jdbc-clickhouse/pom.xml new file mode 100644 index 000000000..377b9bbd8 --- /dev/null +++ b/flink-connector-jdbc-clickhouse/pom.xml @@ -0,0 +1,94 @@ + + + 4.0.0 + + + org.apache.flink + flink-connector-jdbc-parent + 4.0-SNAPSHOT + + + flink-connector-jdbc-clickhouse + Flink : Connectors : JDBC : ClickHouse + + jar + + + 0.6.2 + + + + + + org.apache.flink + flink-connector-jdbc-core + ${project.version} + + + + org.apache.flink + flink-connector-jdbc-core + ${project.version} + test-jar + test + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + true + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + + + com.clickhouse + clickhouse-jdbc + ${clickhouse.version} + provided + + + + + + org.assertj + assertj-core + ${assertj.version} + test + + + + + org.testcontainers + clickhouse + test + + + + + diff --git a/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/ClickHouseFactory.java b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/ClickHouseFactory.java new file mode 100644 index 000000000..47b710411 --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/ClickHouseFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.clickhouse.database.dialect.ClickHouseDialect; +import org.apache.flink.connector.jdbc.core.database.JdbcFactory; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect; + +/** Factory for {@link ClickHouseDialect}. */ +@Internal +public class ClickHouseFactory implements JdbcFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:clickhouse:"); + } + + @Override + public JdbcDialect createDialect() { + return new ClickHouseDialect(); + } + + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl) { + throw new UnsupportedOperationException("Catalog for ClickHouse is not supported yet."); + } +} diff --git a/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialect.java b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialect.java new file mode 100644 index 000000000..593d8109e --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialect.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialect; +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +/** JDBC dialect for ClickHouse. */ +@Internal +public class ClickHouseDialect extends AbstractDialect { + + private static final long serialVersionUID = 1L; + + // Define MAX/MIN precision of TIMESTAMP type according to ClickHouse docs: + // https://clickhouse.com/docs/sql-reference/data-types/datetime64 + private static final int MAX_TIMESTAMP_PRECISION = 9; + private static final int MIN_TIMESTAMP_PRECISION = 0; + + // Define MAX/MIN precision of DECIMAL type according to ClickHouse docs: + // https://clickhouse.com/docs/sql-reference/data-types/decimal + private static final int MAX_DECIMAL_PRECISION = 76; + private static final int MIN_DECIMAL_PRECISION = 1; + + @Override + public JdbcDialectConverter getRowConverter(RowType rowType) { + return new ClickHouseDialectConverter(rowType); + } + + @Override + public String getLimitClause(long limit) { + return "LIMIT " + limit; + } + + @Override + public Optional defaultDriverName() { + return Optional.of("com.clickhouse.jdbc.ClickHouseDriver"); + } + + @Override + public String dialectName() { + return "ClickHouse"; + } + + @Override + public String quoteIdentifier(String identifier) { + return identifier; + } + + // ClickHouse does not support Upsert statements + // Instead you can create a table with ReplacingMergeTree engine; + // https://clickhouse.com/docs/engines/table-engines/mergetree-family/replacingmergetree + @Override + public Optional getUpsertStatement( + String tableName, String[] fieldNames, String[] uniqueKeyFields) { + return Optional.empty(); + } + + // ClickHouse pkField cannot be updated via sql + @Override + public String getUpdateStatement( + String tableName, String[] fieldNames, String[] conditionFields) { + String setClause = + Arrays.stream(fieldNames) + .filter(item -> !Arrays.asList(conditionFields).contains(item)) + .map(f -> format("%s = :%s", quoteIdentifier(f), f)) + .collect(Collectors.joining(", ")); + String conditionClause = + Arrays.stream(conditionFields) + .map(f -> format("%s = :%s", quoteIdentifier(f), f)) + .collect(Collectors.joining(" AND ")); + return "ALTER TABLE " + + quoteIdentifier(tableName) + + " UPDATE " + + setClause + + " WHERE " + + conditionClause; + } + + @Override + public Optional decimalPrecisionRange() { + return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION)); + } + + @Override + public Optional timestampPrecisionRange() { + return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION)); + } + + @Override + public Set supportedTypes() { + // The data types used in ClickHouse are list at: + // https://clickhouse.com/docs/sql-reference/data-types + + return EnumSet.of( + LogicalTypeRoot.CHAR, + LogicalTypeRoot.VARCHAR, + LogicalTypeRoot.BOOLEAN, + LogicalTypeRoot.DECIMAL, + LogicalTypeRoot.TINYINT, + LogicalTypeRoot.SMALLINT, + LogicalTypeRoot.INTEGER, + LogicalTypeRoot.BIGINT, + LogicalTypeRoot.FLOAT, + LogicalTypeRoot.DOUBLE, + LogicalTypeRoot.DATE, + LogicalTypeRoot.MAP, + LogicalTypeRoot.ARRAY, + LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE); + } +} diff --git a/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java new file mode 100644 index 000000000..597a24109 --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/main/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectConverter.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * ClickHouse. + */ +@Internal +public class ClickHouseDialectConverter extends AbstractDialectConverter { + + private static final long serialVersionUID = 1L; + + public ClickHouseDialectConverter(RowType rowType) { + super(rowType); + } + + @Override + public JdbcDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return null; + case BOOLEAN: + case TINYINT: + return val -> ((Integer) val).byteValue(); + case SMALLINT: + return val -> val instanceof Integer ? ((Integer) val).shortValue() : val; + case INTEGER: + case INTERVAL_YEAR_MONTH: + case BIGINT: + case INTERVAL_DAY_TIME: + case FLOAT: + case DOUBLE: + case BINARY: + case VARBINARY: + return val -> val; + case CHAR: + case VARCHAR: + return val -> StringData.fromString((String) val); + case DATE: + return val -> (Date) val; + case TIME_WITHOUT_TIME_ZONE: + return val -> (Time) val; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return val -> + val instanceof LocalDateTime + ? TimestampData.fromLocalDateTime((LocalDateTime) val) + : TimestampData.fromTimestamp((Timestamp) val); + case DECIMAL: + final int precision = ((DecimalType) type).getPrecision(); + final int scale = ((DecimalType) type).getScale(); + return val -> + val instanceof BigInteger + ? DecimalData.fromBigDecimal( + new BigDecimal((BigInteger) val, 0), precision, scale) + : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale); + case MAP: + return val -> (MapData) val; + case ARRAY: + return val -> (ArrayData) val; + default: + return super.createInternalConverter(type); + } + } + + @Override + public JdbcSerializationConverter createExternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case MAP: + return (val, index, statement) -> + statement.setObject(index, toExternalSerializer(val.getMap(index), type)); + case ARRAY: + return (val, index, statement) -> + statement.setObject(index, toExternalSerializer(val.getArray(index), type)); + default: + return super.createExternalConverter(type); + } + } + + // adding support to MAP and ARRAY types + public static Object toExternalSerializer(Object value, LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + case TINYINT: + return ((Integer) value).byteValue(); + case SMALLINT: + return value instanceof Integer ? ((Integer) value).shortValue() : value; + case INTEGER: + case INTERVAL_YEAR_MONTH: + case BIGINT: + case INTERVAL_DAY_TIME: + case FLOAT: + case DOUBLE: + case BINARY: + case VARBINARY: + return value; + case CHAR: + case VARCHAR: + return value.toString(); + case DATE: + return (Date) value; + case TIME_WITHOUT_TIME_ZONE: + return (Time) value; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return value instanceof LocalDateTime + ? TimestampData.fromLocalDateTime((LocalDateTime) value) + : TimestampData.fromTimestamp((Timestamp) value); + case DECIMAL: + final int precision = ((DecimalType) type).getPrecision(); + final int scale = ((DecimalType) type).getScale(); + return value instanceof BigInteger + ? DecimalData.fromBigDecimal( + new BigDecimal((BigInteger) value, 0), precision, scale) + : DecimalData.fromBigDecimal((BigDecimal) value, precision, scale); + case ARRAY: + LogicalType elementType = + ((ArrayType) type) + .getChildren().stream() + .findFirst() + .orElseThrow( + () -> + new RuntimeException( + "Unknown array element type")); + ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType); + ArrayData arrayData = ((ArrayData) value); + Object[] objectArray = new Object[arrayData.size()]; + for (int i = 0; i < arrayData.size(); i++) { + objectArray[i] = + toExternalSerializer( + elementGetter.getElementOrNull(arrayData, i), elementType); + } + return objectArray; + case MAP: + LogicalType keyType = ((MapType) type).getKeyType(); + LogicalType valueType = ((MapType) type).getValueType(); + ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType); + ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType); + MapData mapData = (MapData) value; + ArrayData keyArrayData = mapData.keyArray(); + ArrayData valueArrayData = mapData.valueArray(); + Map objectMap = new HashMap<>(keyArrayData.size()); + for (int i = 0; i < keyArrayData.size(); i++) { + objectMap.put( + toExternalSerializer( + keyGetter.getElementOrNull(keyArrayData, i), keyType), + toExternalSerializer( + valueGetter.getElementOrNull(valueArrayData, i), valueType)); + } + return objectMap; + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + @Override + public String converterName() { + return "ClickHouse"; + } +} diff --git a/flink-connector-jdbc-clickhouse/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory b/flink-connector-jdbc-clickhouse/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory new file mode 100644 index 000000000..d2c8fdae7 --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.clickhouse.database.ClickHouseFactory \ No newline at end of file diff --git a/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/ClickHouseTestBase.java b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/ClickHouseTestBase.java new file mode 100644 index 000000000..facba6472 --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/ClickHouseTestBase.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse; + +import org.apache.flink.connector.jdbc.clickhouse.testutils.ClickHouseDatabase; +import org.apache.flink.connector.jdbc.clickhouse.testutils.ClickHouseTableRow; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseTest; +import org.apache.flink.connector.jdbc.testutils.tables.TableField; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; + +import org.junit.jupiter.api.extension.ExtendWith; + +/** Base class for ClickHouse testing. */ +@ExtendWith(ClickHouseDatabase.class) +public interface ClickHouseTestBase extends DatabaseTest { + + @Override + default DatabaseMetadata getMetadata() { + return ClickHouseDatabase.getMetadata(); + } + + static TableRow tableRow(String name, TableField... fields) { + return new ClickHouseTableRow(name, fields); + } +} diff --git a/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectTest.java b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectTest.java new file mode 100644 index 000000000..381e4c8db --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHouseDialectTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database.dialect; + +import org.apache.flink.connector.jdbc.clickhouse.ClickHouseTestBase; +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectTest; + +import java.util.Arrays; +import java.util.List; + +/** The ClickHouse params for {@link JdbcDialectTest}. */ +class ClickHouseDialectTest extends JdbcDialectTest implements ClickHouseTestBase { + + @Override + protected List testData() { + return Arrays.asList( + createTestItem("CHAR"), + createTestItem("VARCHAR"), + createTestItem("BOOLEAN"), + createTestItem("DECIMAL(10, 4)"), + createTestItem("DECIMAL(38, 18)"), + createTestItem("TINYINT"), + createTestItem("SMALLINT"), + createTestItem("INTEGER"), + createTestItem("BIGINT"), + createTestItem("FLOAT"), + createTestItem("DOUBLE"), + createTestItem("DATE"), + createTestItem("TIMESTAMP"), + createTestItem("TIMESTAMP WITHOUT TIME ZONE"), + createTestItem("ARRAY"), + + // Not valid data + createTestItem("BINARY", "The ClickHouse dialect doesn't support type: BINARY(1)."), + createTestItem( + "VARBINARY(10)", + "The ClickHouse dialect doesn't support type: VARBINARY(10).")); + } +} diff --git a/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHousePreparedStatementTest.java b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHousePreparedStatementTest.java new file mode 100644 index 000000000..72624785c --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/database/dialect/ClickHousePreparedStatementTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.database.dialect; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ClickHousePreparedStatementTest}. */ +class ClickHousePreparedStatementTest { + + private final JdbcDialect dialect = + JdbcFactoryLoader.loadDialect( + "jdbc:clickhouse://localhost:8123/default", getClass().getClassLoader()); + private final String[] fieldNames = + new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"}; + private final String[] keyFields = new String[] {"id", "__field_3__"}; + private final String tableName = "tbl"; + + @Test + void testInsertStatement() { + String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames); + assertThat(insertStmt) + .isEqualTo( + "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) " + + "VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__)"); + NamedStatementMatcher.parsedSql( + "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) " + + "VALUES (?, ?, ?, ?, ?, ?, ?)") + .parameter("id", singletonList(1)) + .parameter("name", singletonList(2)) + .parameter("email", singletonList(3)) + .parameter("ts", singletonList(4)) + .parameter("field1", singletonList(5)) + .parameter("field_2", singletonList(6)) + .parameter("__field_3__", singletonList(7)) + .matches(insertStmt); + } + + @Test + void testDeleteStatement() { + String deleteStmt = dialect.getDeleteStatement(tableName, keyFields); + assertThat(deleteStmt) + .isEqualTo("DELETE FROM tbl WHERE id = :id AND __field_3__ = :__field_3__"); + NamedStatementMatcher.parsedSql("DELETE FROM tbl WHERE id = ? AND __field_3__ = ?") + .parameter("id", singletonList(1)) + .parameter("__field_3__", singletonList(2)) + .matches(deleteStmt); + } + + @Test + void testRowExistsStatement() { + String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields); + assertThat(rowExistStmt) + .isEqualTo("SELECT 1 FROM tbl WHERE id = :id AND __field_3__ = :__field_3__"); + NamedStatementMatcher.parsedSql("SELECT 1 FROM tbl WHERE id = ? AND __field_3__ = ?") + .parameter("id", singletonList(1)) + .parameter("__field_3__", singletonList(2)) + .matches(rowExistStmt); + } + + @Test + void testUpdateStatement() { + String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields); + assertThat(updateStmt) + .isEqualTo( + "ALTER TABLE tbl UPDATE name = :name, email = :email, ts = :ts, " + + "field1 = :field1, field_2 = :field_2 " + + "WHERE id = :id AND __field_3__ = :__field_3__"); + NamedStatementMatcher.parsedSql( + "ALTER TABLE tbl UPDATE name = ?, email = ?, ts = ?, field1 = ?, " + + "field_2 = ? WHERE id = ? AND __field_3__ = ?") + .parameter("name", singletonList(1)) + .parameter("email", singletonList(2)) + .parameter("ts", singletonList(3)) + .parameter("field1", singletonList(4)) + .parameter("field_2", singletonList(5)) + .parameter("id", singletonList(6)) + .parameter("__field_3__", singletonList(7)) + .matches(updateStmt); + } + + @Test + void testUpsertStatement() { + Optional upsertStmt = dialect.getUpsertStatement(tableName, fieldNames, keyFields); + assertThat(upsertStmt).isEqualTo(Optional.empty()); + } + + @Test + void testSelectStatement() { + String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields); + assertThat(selectStmt) + .isEqualTo( + "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl " + + "WHERE id = :id AND __field_3__ = :__field_3__"); + NamedStatementMatcher.parsedSql( + "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl " + + "WHERE id = ? AND __field_3__ = ?") + .parameter("id", singletonList(1)) + .parameter("__field_3__", singletonList(2)) + .matches(selectStmt); + } + + private static class NamedStatementMatcher { + private String parsedSql; + private Map> parameterMap = new HashMap<>(); + + public static NamedStatementMatcher parsedSql(String parsedSql) { + NamedStatementMatcher spec = new NamedStatementMatcher(); + spec.parsedSql = parsedSql; + return spec; + } + + public NamedStatementMatcher parameter(String name, List index) { + this.parameterMap.put(name, index); + return this; + } + + public void matches(String statement) { + Map> actualParams = new HashMap<>(); + String actualParsedStmt = + FieldNamedPreparedStatementImpl.parseNamedStatement(statement, actualParams); + assertThat(actualParsedStmt).isEqualTo(parsedSql); + assertThat(actualParams).isEqualTo(parameterMap); + } + } +} diff --git a/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/table/ClickHouseDynamicTableSinkITCase.java b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/table/ClickHouseDynamicTableSinkITCase.java new file mode 100644 index 000000000..a66c5a787 --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/table/ClickHouseDynamicTableSinkITCase.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.table; + +import org.apache.flink.connector.jdbc.clickhouse.ClickHouseTestBase; +import org.apache.flink.connector.jdbc.clickhouse.database.dialect.ClickHouseDialect; +import org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.connector.jdbc.clickhouse.ClickHouseTestBase.tableRow; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField; + +/** The Table Sink ITCase for {@link ClickHouseDialect}. */ +class ClickHouseDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase + implements ClickHouseTestBase { + + @Override + protected TableRow createUpsertOutputTable() { + return tableRow( + "dynamicSinkForUpsert", + pkField("cnt", DataTypes.BIGINT().notNull()), + pkField("lencnt", DataTypes.BIGINT().notNull()), + field("cTag", DataTypes.INT().notNull()), + field("ts", dbType("DateTime64(6)"), DataTypes.TIMESTAMP())); + } + + @Override + protected TableRow createAppendOutputTable() { + return tableRow( + "dynamicSinkForAppend", + pkField("id", DataTypes.INT().notNull()), + field("num", dbType("Int64"), DataTypes.BIGINT().notNull()), + field("ts", dbType("DateTime64(6)"), DataTypes.TIMESTAMP())); + } + + @Override + protected TableRow createBatchOutputTable() { + return tableRow( + "dynamicSinkForBatch", + field("NAME", DataTypes.VARCHAR(20).notNull()), + field("SCORE", dbType("Int64"), DataTypes.BIGINT().notNull())); + } + + @Override + protected TableRow createRealOutputTable() { + return tableRow("REAL_TABLE", field("real_data", dbType("Float32"), DataTypes.FLOAT())); + } + + @Override + protected TableRow createUserOutputTable() { + return tableRow( + "USER_TABLE", + pkField("user_id", DataTypes.VARCHAR(20).notNull()), + pkField("user_name", DataTypes.VARCHAR(20).notNull()), + field("email", DataTypes.VARCHAR(255)), + field("balance", DataTypes.DECIMAL(18, 2)), + field("balance2", DataTypes.DECIMAL(18, 2))); + } + + @Override + protected TableRow createCheckpointOutputTable() { + return tableRow("checkpointTable", field("id", DataTypes.BIGINT().notNull())); + } + + @Override + protected List testUserData() { + return Arrays.asList( + Row.of( + "user1", + "Tom", + "tom123@gmail.com", + new BigDecimal("8.10"), + new BigDecimal("16.20")), + Row.of( + "user3", + "Bailey", + "bailey@qq.com", + new BigDecimal("9.99"), + new BigDecimal("19.98")), + Row.of( + "user4", + "Tina", + "tina@gmail.com", + new BigDecimal("11.30"), + new BigDecimal("22.60"))); + } +} diff --git a/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/table/ClickHouseDynamicTableSourceITCase.java b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/table/ClickHouseDynamicTableSourceITCase.java new file mode 100644 index 000000000..c80b98773 --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/table/ClickHouseDynamicTableSourceITCase.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.table; + +import org.apache.flink.connector.jdbc.clickhouse.ClickHouseTestBase; +import org.apache.flink.connector.jdbc.clickhouse.database.dialect.ClickHouseDialect; +import org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.connector.jdbc.clickhouse.ClickHouseTestBase.tableRow; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField; + +/** The Table Source ITCase for {@link ClickHouseDialect}. */ +class ClickHouseDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase + implements ClickHouseTestBase { + + @Override + protected TableRow createInputTable() { + return tableRow( + "jdbcDynamicTableSource", + pkField("id", dbType("Int64"), DataTypes.BIGINT().notNull()), + field("decimal_col", dbType("Decimal64(4)"), DataTypes.DECIMAL(10, 4)), + field("timestamp6_col", dbType("DateTime64(6)"), DataTypes.TIMESTAMP(6)) + // other fields + // field("real_col", dbType("Float64"), DataTypes.DOUBLE()), + // field("double_col", dbType("Float64"), DataTypes.DOUBLE()), + // field("time_col", dbType("Time"), DataTypes.TIME()), + // field("timestamp9_col", dbType("DateTime(6)"), + // DataTypes.TIMESTAMP(6)) + // field("array_col", dbType("Array(String)"), + // DataTypes.ARRAY(DataTypes.STRING())) + ); + } + + @Override + protected List getTestData() { + String[] testArray = {"red", "green", "blue"}; + return Arrays.asList( + Row.of( + 1L, + BigDecimal.valueOf(100.1234), + LocalDateTime.parse("2020-01-01T15:35:00.123456") + // 1.175E-37D, + // 1.79769E308D, + // LocalTime.parse("15:35"), + // LocalDateTime.parse("2020-01-01T15:35:00.123456") + // testArray + ), + Row.of( + 2L, + BigDecimal.valueOf(101.1234), + LocalDateTime.parse("2020-01-01T15:36:01.123456") + // -1.175E-37D, + // -1.79769E308, + // LocalTime.parse("15:36:01"), + // LocalDateTime.parse("2020-01-01T15:36:01.123456") + // testArray + )); + } +} diff --git a/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseDatabase.java b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseDatabase.java new file mode 100644 index 000000000..3b73cdbe7 --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseDatabase.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.testutils; + +import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.DockerResource; +import org.apache.flink.util.FlinkRuntimeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +/** ClickHouse database for testing. */ +public class ClickHouseDatabase extends DatabaseExtension implements ClickHouseImages { + + private static final Logger LOG = LoggerFactory.getLogger(ClickHouseDatabase.class); + + private static final ClickHouseContainer CONTAINER = + new ClickHouseContainer(CLICKHOUSE_LATEST) + .withUsername("flink_test_user") + .withPassword("flink_test_password") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + private static ClickHouseMetadata metadata; + + public static ClickHouseMetadata getMetadata() { + if (!CONTAINER.isRunning()) { + throw new FlinkRuntimeException("Container is stopped."); + } + if (metadata == null) { + metadata = new ClickHouseMetadata(CONTAINER); + } + return metadata; + } + + @Override + protected DatabaseMetadata getMetadataDB() { + return getMetadata(); + } + + @Override + protected DatabaseResource getResource() { + return new DockerResource(CONTAINER); + } +} diff --git a/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseImages.java b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseImages.java new file mode 100644 index 000000000..fcc10880d --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseImages.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.testutils; + +/** ClickHouse docker images. */ +public interface ClickHouseImages { + String CLICKHOUSE_LATEST = "clickhouse/clickhouse-server"; +} diff --git a/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseMetadata.java b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseMetadata.java new file mode 100644 index 000000000..f60b79d01 --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseMetadata.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.testutils; + +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; + +import org.testcontainers.clickhouse.ClickHouseContainer; + +import javax.sql.XADataSource; + +/** ClickHouse Metadata. */ +public class ClickHouseMetadata implements DatabaseMetadata { + + private final String username; + private final String password; + private final String url; + private final String driver; + private final String version; + + public ClickHouseMetadata(ClickHouseContainer container) { + this.username = container.getUsername(); + this.password = container.getPassword(); + this.url = container.getJdbcUrl(); + this.driver = container.getDriverClassName(); + this.version = container.getDockerImageName(); + } + + @Override + public String getJdbcUrl() { + return this.url; + } + + @Override + public String getJdbcUrlWithCredentials() { + return String.format("%s?user=%s&password=%s", getJdbcUrl(), getUsername(), getPassword()); + } + + @Override + public String getUsername() { + return this.username; + } + + @Override + public String getPassword() { + return this.password; + } + + @Override + public XADataSource buildXaDataSource() { + throw new UnsupportedOperationException(); + } + + @Override + public String getDriverClass() { + return this.driver; + } + + @Override + public String getVersion() { + return version; + } +} diff --git a/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseTableRow.java b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseTableRow.java new file mode 100644 index 000000000..449d19a09 --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/test/java/org/apache/flink/connector/jdbc/clickhouse/testutils/ClickHouseTableRow.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.clickhouse.testutils; + +import org.apache.flink.connector.jdbc.testutils.tables.TableField; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; + +import java.util.Arrays; +import java.util.stream.Collectors; + +/** ClickHouseTableRow. */ +public class ClickHouseTableRow extends TableRow { + public ClickHouseTableRow(String name, TableField[] fields) { + super(name, fields); + } + + @Override + public String getCreateQuery() { + String pkFields = + getStreamFields() + .filter(TableField::isPkField) + .map(TableField::getName) + .collect(Collectors.joining(", ")); + return String.format( + "CREATE TABLE %s (%s) %s PRIMARY KEY (%s);", + getTableName(), + getStreamFields().map(TableField::asString).collect(Collectors.joining(", ")), + "ENGINE = MergeTree", + pkFields); + } + + @Override + protected String getDeleteFromQuery() { + return String.format("TRUNCATE TABLE %s;", getTableName()); + } + + @Override + protected String getInsertIntoQuery(String... values) { + return String.format( + "INSERT INTO %s (%s) VALUES %s;", + getTableName(), + getStreamFieldNames().collect(Collectors.joining(", ")), + Arrays.stream(values) + .map(v -> String.format("(%s)", v)) + .collect(Collectors.joining(","))); + } + + @Override + public String getSelectAllQuery() { + return String.format( + "SELECT %s FROM %s;", + getStreamFieldNames().collect(Collectors.joining(", ")), getTableName()); + } + + @Override + public String getDropTableQuery() { + return String.format("DROP TABLE IF EXISTS %s;", getTableName()); + } +} diff --git a/flink-connector-jdbc-clickhouse/src/test/resources/log4j2-test.properties b/flink-connector-jdbc-clickhouse/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000..835c2ec9a --- /dev/null +++ b/flink-connector-jdbc-clickhouse/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java index d8cbd7937..e4b9360aa 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java @@ -69,11 +69,11 @@ public String getTableName() { return name; } - private Stream getStreamFields() { + protected Stream getStreamFields() { return Arrays.stream(this.fields); } - private Stream getStreamFieldNames() { + protected Stream getStreamFieldNames() { return getStreamFields().map(TableField::getName); } @@ -202,7 +202,7 @@ public String getCreateQueryForFlink( newName, fields, primaryKey, String.join(", ", params)); } - private String getInsertIntoQuery(String... values) { + protected String getInsertIntoQuery(String... values) { return String.format( "INSERT INTO %s (%s) VALUES %s", name, diff --git a/pom.xml b/pom.xml index fe2fc778b..5f99b1751 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,7 @@ under the License. flink-connector-jdbc-architecture flink-connector-jdbc-core + flink-connector-jdbc-clickhouse flink-connector-jdbc-cratedb flink-connector-jdbc-db2 flink-connector-jdbc-mysql