From 03e5fa95c861ecfd4cf7b552172fef684d2c772a Mon Sep 17 00:00:00 2001 From: david_1366677 Date: Thu, 17 Apr 2025 18:01:34 +0800 Subject: [PATCH 1/9] Add support for GaussDB database type --- .idea/vcs.xml | 20 +- flink-connector-jdbc-gaussdb/pom.xml | 148 +++++++ .../jdbc/gaussdb/database/GaussdbFactory.java | 52 +++ .../database/catalog/GaussdbCatalog.java | 243 +++++++++++ .../database/catalog/GaussdbTablePath.java | 112 +++++ .../database/catalog/GaussdbTypeMapper.java | 187 +++++++++ .../dialect/CompatibleGaussdbDialect.java | 52 +++ .../CompatibleGaussdbDialectConverter.java | 40 ++ .../database/dialect/GaussdbDialect.java | 124 ++++++ .../dialect/GaussdbDialectConverter.java | 100 +++++ ...k.connector.jdbc.core.database.JdbcFactory | 16 + .../jdbc/gaussdb/GaussdbTestBase.java | 35 ++ .../gaussdb/database/GaussdbFactoryTest.java | 88 ++++ .../catalog/GaussdbCatalogITCase.java | 196 +++++++++ .../database/catalog/GaussdbCatalogTest.java | 188 +++++++++ .../catalog/GaussdbCatalogTestBase.java | 392 ++++++++++++++++++ .../catalog/GaussdbTablePathTest.java | 53 +++ .../database/dialect/GaussdbDialectTest.java | 85 ++++ .../table/GaussdbDynamicTableSinkITCase.java | 26 ++ .../GaussdbDynamicTableSourceITCase.java | 72 ++++ .../gaussdb/testutils/GaussDBContainer.java | 162 ++++++++ .../gaussdb/testutils/GaussdbDatabase.java | 86 ++++ .../jdbc/gaussdb/testutils/GaussdbImages.java | 24 ++ .../gaussdb/testutils/GaussdbMetadata.java | 92 ++++ .../src/test/resources/log4j2-test.properties | 28 ++ 25 files changed, 2602 insertions(+), 19 deletions(-) create mode 100644 flink-connector-jdbc-gaussdb/pom.xml create mode 100644 flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactory.java create mode 100644 flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java create mode 100644 flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePath.java create mode 100644 flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java create mode 100644 flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java create mode 100644 flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialectConverter.java create mode 100644 flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java create mode 100644 flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java create mode 100644 flink-connector-jdbc-gaussdb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java create mode 100644 flink-connector-jdbc-gaussdb/src/test/resources/log4j2-test.properties diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 0dd9dbf02..35eb1ddfb 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,24 +1,6 @@ - - - - + \ No newline at end of file diff --git a/flink-connector-jdbc-gaussdb/pom.xml b/flink-connector-jdbc-gaussdb/pom.xml new file mode 100644 index 000000000..241c3c348 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/pom.xml @@ -0,0 +1,148 @@ + + + 4.0.0 + + org.apache.flink + flink-connector-jdbc-parent + 4.0-SNAPSHOT + + + flink-connector-jdbc-gaussdb + Flink : Connectors : JDBC : GaussDB + + jar + + + 506.0.0.b058 + + + + + + org.apache.flink + flink-connector-jdbc-core + ${project.version} + + + + org.apache.flink + flink-connector-jdbc-core + ${project.version} + test-jar + test + + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + true + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + + com.huaweicloud.gaussdb + gaussdbjdbc + 506.0.0.b058 + provided + + + + com.huaweicloud.gaussdb + gaussdbjdbc + 506.0.0.b058 + test + + + + + org.assertj + assertj-core + ${assertj.version} + test + + + org.testcontainers + jdbc + test + + + org.testcontainers + postgresql + test + + + org.postgresql + postgresql + 42.7.3 + test + + + com.fasterxml.jackson.core + jackson-core + test + + + + + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + \ No newline at end of file diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactory.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactory.java new file mode 100644 index 000000000..d1ce55849 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.JdbcFactory; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.gaussdb.database.catalog.GaussdbCatalog; +import org.apache.flink.connector.jdbc.gaussdb.database.dialect.GaussdbDialect; + +/** Factory for {@link GaussdbDialect}. */ +@Internal +public class GaussdbFactory implements JdbcFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:gaussdb:"); + } + + @Override + public JdbcDialect createDialect() { + return new GaussdbDialect(); + } + + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl) { + return new GaussdbCatalog( + classLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java new file mode 100644 index 000000000..6a09e143f --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.jdbc.core.database.catalog.AbstractJdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.compress.utils.Lists; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties; + +/** Catalog for GaussDB. */ +@Internal +public class GaussdbCatalog extends AbstractJdbcCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GaussdbCatalog.class); + + public static final String DEFAULT_DATABASE = "postgres"; + + // ------ GaussDB default objects that shouldn't be exposed to users ------ + + private static final Set builtinDatabases = + new HashSet() { + { + add("template0"); + add("template1"); + } + }; + + private static final Set builtinSchemas = + new HashSet() { + { + add("pg_toast"); + add("pg_temp_1"); + add("pg_toast_temp_1"); + add("pg_catalog"); + add("information_schema"); + } + }; + + protected final JdbcCatalogTypeMapper dialectTypeMapper; + + @VisibleForTesting + public GaussdbCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl) { + this( + userClassLoader, + catalogName, + defaultDatabase, + baseUrl, + getBriefAuthProperties(username, pwd)); + } + + public GaussdbCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String baseUrl, + Properties connectProperties) { + this( + userClassLoader, + catalogName, + defaultDatabase, + baseUrl, + new GaussdbTypeMapper(), + connectProperties); + } + + @Deprecated + protected GaussdbCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl, + JdbcCatalogTypeMapper dialectTypeMapper) { + this( + userClassLoader, + catalogName, + defaultDatabase, + baseUrl, + dialectTypeMapper, + getBriefAuthProperties(username, pwd)); + } + + protected GaussdbCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String baseUrl, + JdbcCatalogTypeMapper dialectTypeMapper, + Properties connectProperties) { + super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectProperties); + this.dialectTypeMapper = dialectTypeMapper; + } + + // ------ databases ------ + + @Override + public List listDatabases() throws CatalogException { + + return extractColumnValuesBySQL( + defaultUrl, + "SELECT datname FROM pg_database;", + 1, + dbName -> !builtinDatabases.contains(dbName)); + } + + // ------ schemas ------ + + protected Set getBuiltinSchemas() { + return builtinSchemas; + } + + // ------ tables ------ + + protected List getPureTables(Connection conn, List schemas) + throws SQLException { + List tables = Lists.newArrayList(); + + // position 1 is database name, position 2 is schema name, position 3 is table name + try (PreparedStatement ps = + conn.prepareStatement( + "SELECT * FROM information_schema.tables " + + "WHERE table_type = 'BASE TABLE' " + + "AND table_schema = ? " + + "ORDER BY table_type, table_name;")) { + for (String schema : schemas) { + // Column index 1 is database name, 2 is schema name, 3 is table name + extractColumnValuesByStatement(ps, 3, null, schema).stream() + .map(pureTable -> schema + "." + pureTable) + .forEach(tables::add); + } + return tables; + } + } + + @Override + public List listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + + Preconditions.checkState( + StringUtils.isNotBlank(databaseName), "Database name must not be blank."); + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + final String url = getDatabaseUrl(databaseName); + try (Connection conn = DriverManager.getConnection(url, connectionProperties)) { + // get all schemas + List schemas; + try (PreparedStatement ps = + conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;")) { + schemas = + extractColumnValuesByStatement( + ps, 1, pgSchema -> !getBuiltinSchemas().contains(pgSchema)); + } + + // get all tables + return getPureTables(conn, schemas); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to list tables for database %s", databaseName), e); + } + } + + /** Converts Gaussdb type to Flink {@link DataType}. */ + @Override + protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) + throws SQLException { + return dialectTypeMapper.mapping(tablePath, metadata, colIndex); + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + + List tables = null; + try { + tables = listTables(tablePath.getDatabaseName()); + } catch (DatabaseNotExistException e) { + return false; + } + + return tables.contains(getSchemaTableName(tablePath)); + } + + @Override + protected String getTableName(ObjectPath tablePath) { + return GaussdbTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgTableName(); + } + + @Override + protected String getSchemaName(ObjectPath tablePath) { + return GaussdbTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName(); + } + + @Override + protected String getSchemaTableName(ObjectPath tablePath) { + return GaussdbTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath(); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePath.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePath.java new file mode 100644 index 000000000..f6dac0548 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePath.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.StringUtils; + +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Table path of GaussDB in Flink. Can be of formats "table_name" or "schema_name.table_name". When + * it's "table_name", the schema name defaults to "public". + */ +@Internal +public class GaussdbTablePath { + + private static final String DEFAULT_POSTGRES_SCHEMA_NAME = "public"; + + private final String pgSchemaName; + private final String pgTableName; + + public GaussdbTablePath(String pgSchemaName, String pgTableName) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(pgSchemaName), + "Schema name is not valid. Null or empty is not allowed"); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(pgTableName), + "Table name is not valid. Null or empty is not allowed"); + + this.pgSchemaName = pgSchemaName; + this.pgTableName = pgTableName; + } + + public static GaussdbTablePath fromFlinkTableName(String flinkTableName) { + if (flinkTableName.contains(".")) { + String[] path = flinkTableName.split("\\."); + + checkArgument( + path != null && path.length == 2, + String.format( + "Table name '%s' is not valid. The parsed length is %d", + flinkTableName, path.length)); + + return new GaussdbTablePath(path[0], path[1]); + } else { + return new GaussdbTablePath(getDefaultSchemaName(), flinkTableName); + } + } + + public static String toFlinkTableName(String schema, String table) { + return new GaussdbTablePath(schema, table).getFullPath(); + } + + public String getFullPath() { + return String.format("%s.%s", pgSchemaName, pgTableName); + } + + public String getPgTableName() { + return pgTableName; + } + + public String getPgSchemaName() { + return pgSchemaName; + } + + protected static String getDefaultSchemaName() { + return DEFAULT_POSTGRES_SCHEMA_NAME; + } + + @Override + public String toString() { + return getFullPath(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + GaussdbTablePath that = (GaussdbTablePath) o; + return Objects.equals(pgSchemaName, that.pgSchemaName) + && Objects.equals(pgTableName, that.pgTableName); + } + + @Override + public int hashCode() { + return Objects.hash(pgSchemaName, pgTableName); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java new file mode 100644 index 000000000..6456e4fbd --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +/** GaussdbTypeMapper util class. */ +@Internal +public class GaussdbTypeMapper implements JdbcCatalogTypeMapper { + + private static final Logger LOG = LoggerFactory.getLogger(GaussdbTypeMapper.class); + + // GaussDB JDBC driver maps several aliases to real types. We use the real types rather than + // serial2 <=> int2 + // smallserial <=> int2 + // serial4 <=> serial + // serial8 <=> bigserial + // smallint <=> int2 + // integer <=> int4 + // int <=> int4 + // bigint <=> int8 + // float <=> float8 + // boolean <=> bool + // decimal <=> numeric + private static final String PG_SMALLSERIAL = "smallserial"; + protected static final String PG_SERIAL = "serial"; + protected static final String PG_BIGSERIAL = "bigserial"; + private static final String PG_BYTEA = "bytea"; + private static final String PG_BYTEA_ARRAY = "_bytea"; + private static final String PG_SMALLINT = "int2"; + private static final String PG_SMALLINT_ARRAY = "_int2"; + private static final String PG_INTEGER = "int4"; + private static final String PG_INTEGER_ARRAY = "_int4"; + private static final String PG_BIGINT = "int8"; + private static final String PG_BIGINT_ARRAY = "_int8"; + private static final String PG_REAL = "float4"; + private static final String PG_REAL_ARRAY = "_float4"; + private static final String PG_DOUBLE_PRECISION = "float8"; + private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8"; + private static final String PG_NUMERIC = "numeric"; + private static final String PG_NUMERIC_ARRAY = "_numeric"; + private static final String PG_BOOLEAN = "bool"; + private static final String PG_BOOLEAN_ARRAY = "_bool"; + private static final String PG_TIMESTAMP = "timestamp"; + private static final String PG_TIMESTAMP_ARRAY = "_timestamp"; + private static final String PG_TIMESTAMPTZ = "timestamptz"; + private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz"; + private static final String PG_DATE = "date"; + private static final String PG_DATE_ARRAY = "_date"; + private static final String PG_TIME = "time"; + private static final String PG_TIME_ARRAY = "_time"; + private static final String PG_TEXT = "text"; + private static final String PG_TEXT_ARRAY = "_text"; + private static final String PG_CHAR = "bpchar"; + private static final String PG_CHAR_ARRAY = "_bpchar"; + private static final String PG_CHARACTER = "character"; + private static final String PG_CHARACTER_ARRAY = "_character"; + private static final String PG_CHARACTER_VARYING = "varchar"; + private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar"; + + @Override + public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) + throws SQLException { + String pgType = metadata.getColumnTypeName(colIndex); + + int precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); + + DataType dataType = getMapping(pgType, precision, scale); + if (dataType == null) { + throw new UnsupportedOperationException( + String.format("Doesn't support %s type '%s' yet", getDBType(), pgType)); + } + return dataType; + } + + protected DataType getMapping(String pgType, int precision, int scale) { + switch (pgType) { + case PG_BOOLEAN: + return DataTypes.BOOLEAN(); + case PG_BOOLEAN_ARRAY: + return DataTypes.ARRAY(DataTypes.BOOLEAN()); + case PG_BYTEA: + return DataTypes.BYTES(); + case PG_BYTEA_ARRAY: + return DataTypes.ARRAY(DataTypes.BYTES()); + case PG_SMALLINT: + case PG_SMALLSERIAL: + return DataTypes.SMALLINT(); + case PG_SMALLINT_ARRAY: + return DataTypes.ARRAY(DataTypes.SMALLINT()); + case PG_INTEGER: + case PG_SERIAL: + return DataTypes.INT(); + case PG_INTEGER_ARRAY: + return DataTypes.ARRAY(DataTypes.INT()); + case PG_BIGINT: + case PG_BIGSERIAL: + return DataTypes.BIGINT(); + case PG_BIGINT_ARRAY: + return DataTypes.ARRAY(DataTypes.BIGINT()); + case PG_REAL: + return DataTypes.FLOAT(); + case PG_REAL_ARRAY: + return DataTypes.ARRAY(DataTypes.FLOAT()); + case PG_DOUBLE_PRECISION: + return DataTypes.DOUBLE(); + case PG_DOUBLE_PRECISION_ARRAY: + return DataTypes.ARRAY(DataTypes.DOUBLE()); + case PG_NUMERIC: + // handle numeric without explicit precision and scale. + if (precision > 0) { + return DataTypes.DECIMAL(precision, scale); + } + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18); + case PG_NUMERIC_ARRAY: + // handle numeric without explicit precision and scale. + if (precision > 0) { + return DataTypes.ARRAY(DataTypes.DECIMAL(precision, scale)); + } + return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)); + case PG_CHAR: + case PG_CHARACTER: + return DataTypes.CHAR(precision); + case PG_CHAR_ARRAY: + case PG_CHARACTER_ARRAY: + return DataTypes.ARRAY(DataTypes.CHAR(precision)); + case PG_CHARACTER_VARYING: + return DataTypes.VARCHAR(precision); + case PG_CHARACTER_VARYING_ARRAY: + return DataTypes.ARRAY(DataTypes.VARCHAR(precision)); + case PG_TEXT: + return DataTypes.STRING(); + case PG_TEXT_ARRAY: + return DataTypes.ARRAY(DataTypes.STRING()); + case PG_TIMESTAMP: + return DataTypes.TIMESTAMP(scale); + case PG_TIMESTAMP_ARRAY: + return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale)); + case PG_TIMESTAMPTZ: + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale); + case PG_TIMESTAMPTZ_ARRAY: + return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale)); + case PG_TIME: + return DataTypes.TIME(scale); + case PG_TIME_ARRAY: + return DataTypes.ARRAY(DataTypes.TIME(scale)); + case PG_DATE: + return DataTypes.DATE(); + case PG_DATE_ARRAY: + return DataTypes.ARRAY(DataTypes.DATE()); + default: + return null; + } + } + + protected String getDBType() { + return "Gaussdb"; + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java new file mode 100644 index 000000000..aae02d2a9 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.dialect; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Optional; + +/** JDBC dialect for GaussDB compatible databases. */ +@PublicEvolving +public abstract class CompatibleGaussdbDialect extends GaussdbDialect { + + private static final long serialVersionUID = 1L; + + protected abstract String compatibleDialectName(); + + protected abstract CompatibleGaussdbDialectConverter compatibleRowConverter(RowType rowType); + + protected abstract Optional compatibleDriverName(); + + @Override + public String dialectName() { + return compatibleDialectName(); + } + + @Override + public CompatibleGaussdbDialectConverter getRowConverter(RowType rowType) { + return compatibleRowConverter(rowType); + } + + @Override + public Optional defaultDriverName() { + return compatibleDriverName(); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialectConverter.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialectConverter.java new file mode 100644 index 000000000..10fb97184 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialectConverter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.types.logical.RowType; + +/** JDBC converter for Gaussdb compatible databases. */ +@Internal +public abstract class CompatibleGaussdbDialectConverter extends GaussdbDialectConverter { + + private static final long serialVersionUID = 1L; + + protected CompatibleGaussdbDialectConverter(RowType rowType) { + super(rowType); + } + + protected abstract String compatibleConverterName(); + + @Override + public String converterName() { + return compatibleConverterName(); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java new file mode 100644 index 000000000..fe2706b15 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialect; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** JDBC dialect for Gaussdb. */ +@Internal +public class GaussdbDialect extends AbstractDialect { + + private static final long serialVersionUID = 1L; + + // Define MAX/MIN precision of TIMESTAMP type according to Gaussdb docs: + + private static final int MAX_TIMESTAMP_PRECISION = 6; + private static final int MIN_TIMESTAMP_PRECISION = 1; + + // Define MAX/MIN precision of DECIMAL type according to Gaussdb docs: + + private static final int MAX_DECIMAL_PRECISION = 1000; + private static final int MIN_DECIMAL_PRECISION = 1; + + @Override + public GaussdbDialectConverter getRowConverter(RowType rowType) { + return new GaussdbDialectConverter(rowType); + } + + @Override + public Optional defaultDriverName() { + return Optional.of("com.huawei.gaussdb.jdbc.Driver"); + } + + @Override + public String dialectName() { + return "Gaussdb"; + } + + @Override + public String getLimitClause(long limit) { + return "LIMIT " + limit; + } + + @Override + public String quoteIdentifier(String identifier) { + return identifier; + } + + @Override + public Optional decimalPrecisionRange() { + return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION)); + } + + @Override + public Optional timestampPrecisionRange() { + return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION)); + } + + @Override + public Set supportedTypes() { + return EnumSet.of( + LogicalTypeRoot.CHAR, + LogicalTypeRoot.VARCHAR, + LogicalTypeRoot.BOOLEAN, + LogicalTypeRoot.VARBINARY, + LogicalTypeRoot.DECIMAL, + LogicalTypeRoot.TINYINT, + LogicalTypeRoot.SMALLINT, + LogicalTypeRoot.INTEGER, + LogicalTypeRoot.BIGINT, + LogicalTypeRoot.FLOAT, + LogicalTypeRoot.DOUBLE, + LogicalTypeRoot.DATE, + LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + LogicalTypeRoot.ARRAY); + } + + /** Gaussdb upsert query. It use ON DUPLICATE KEY UPDATE .. to replace into Gaussdb. */ + @Override + public Optional getUpsertStatement( + String tableName, String[] fieldNames, String[] uniqueKeyFields) { + String uniqueColumns = + Arrays.stream(uniqueKeyFields) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + final Set uniqueKeyFieldsSet = new HashSet<>(Arrays.asList(uniqueKeyFields)); + String updateClause = + Arrays.stream(fieldNames) + .filter(f -> !uniqueKeyFieldsSet.contains(f)) + .map(f -> quoteIdentifier(f) + "=values(" + quoteIdentifier(f) + ")") + .collect(Collectors.joining(", ")); + return Optional.of( + this.getInsertIntoStatement(tableName, fieldNames) + + " ON DUPLICATE KEY UPDATE " + + updateClause); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java new file mode 100644 index 000000000..e3c6f5687 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; + +import java.lang.reflect.Array; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * GaussDB. + */ +@Internal +public class GaussdbDialectConverter extends AbstractDialectConverter { + + private static final long serialVersionUID = 1L; + + protected GaussdbDialectConverter(RowType rowType) { + super(rowType); + } + + @Override + public JdbcDeserializationConverter createInternalConverter(LogicalType type) { + LogicalTypeRoot root = type.getTypeRoot(); + + if (root == LogicalTypeRoot.ARRAY) { + ArrayType arrayType = (ArrayType) type; + return createGaussdbArrayConverter(arrayType); + } else { + return createPrimitiveConverter(type); + } + } + + @Override + protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) { + LogicalTypeRoot root = type.getTypeRoot(); + if (root == LogicalTypeRoot.ARRAY) { + // note: Writing ARRAY type is not yet supported by GaussdbQL dialect now. + return (val, index, statement) -> { + throw new IllegalStateException( + String.format( + "Writing ARRAY type is not yet supported in JDBC:%s.", + converterName())); + }; + } else { + return super.createNullableExternalConverter(type); + } + } + + private JdbcDeserializationConverter createGaussdbArrayConverter(ArrayType arrayType) { + + final Class elementClass = + LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); + final JdbcDeserializationConverter elementConverter = + createNullableInternalConverter(arrayType.getElementType()); + return val -> { + java.sql.Array pgArray = (java.sql.Array) val; + Object[] in = (Object[]) pgArray.getArray(); + final Object[] array = (Object[]) Array.newInstance(elementClass, in.length); + for (int i = 0; i < in.length; i++) { + array[i] = elementConverter.deserialize(in[i]); + } + return new GenericArrayData(array); + }; + } + + // Have its own method so that Gaussdb can support primitives that super class doesn't support + // in the future + private JdbcDeserializationConverter createPrimitiveConverter(LogicalType type) { + return super.createInternalConverter(type); + } + + @Override + public String converterName() { + return "Gaussdb"; + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory b/flink-connector-jdbc-gaussdb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory new file mode 100644 index 000000000..1e14edd1b --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.gaussdb.database.GaussdbFactory \ No newline at end of file diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java new file mode 100644 index 000000000..a243deab2 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb; + +import org.apache.flink.connector.jdbc.gaussdb.testutils.GaussdbDatabase; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseTest; + +import org.junit.jupiter.api.extension.ExtendWith; + +/** Base class for Postgres testing. */ +@ExtendWith(GaussdbDatabase.class) +public interface GaussdbTestBase extends DatabaseTest { + + @Override + default DatabaseMetadata getMetadata() { + return GaussdbDatabase.getMetadata(); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java new file mode 100644 index 000000000..24c9e76cf --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactory; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase; +import org.apache.flink.connector.jdbc.gaussdb.database.catalog.GaussdbCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link JdbcCatalogFactory}. */ +class GaussdbFactoryTest implements GaussdbTestBase { + + protected static String baseUrl; + protected static JdbcCatalog catalog; + + protected static final String TEST_CATALOG_NAME = "mygs"; + + @BeforeEach + void setup() { + String jdbcUrl = getMetadata().getJdbcUrl(); + baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + GaussdbCatalog.DEFAULT_DATABASE, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl, + null); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put( + JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), GaussdbCatalog.DEFAULT_DATABASE); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(GaussdbCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java new file mode 100644 index 000000000..0d8b6e6f4 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.apache.flink.connector.jdbc.gaussdb.database.catalog.GaussdbCatalog.DEFAULT_DATABASE; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM; +import static org.assertj.core.api.Assertions.assertThat; + +/** E2E test for {@link GaussdbCatalog}. */ +class GaussdbCatalogITCase extends GaussdbCatalogTestBase { + + private TableEnvironment tEnv; + + @BeforeEach + void setup() { + this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + + // use PG catalog + tEnv.registerCatalog(TEST_CATALOG_NAME, catalog); + tEnv.useCatalog(TEST_CATALOG_NAME); + } + + @Test + void testSelectField() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select id from %s", TABLE1)) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1]]"); + } + + @Test + void testWithoutSchema() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE1)) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1]]"); + } + + @Test + void testWithSchema() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from `%s`", + GaussdbTablePath.fromFlinkTableName(TABLE1))) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1]]"); + } + + @Test + void testFullPath() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from %s.%s.`%s`", + TEST_CATALOG_NAME, + DEFAULT_DATABASE, + GaussdbTablePath.fromFlinkTableName(TABLE1))) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1]]"); + } + + @Test + void testInsert() throws Exception { + tEnv.executeSql(String.format("insert into %s select * from `%s`", TABLE4, TABLE1)).await(); + + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE4)) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1]]"); + } + + @Test + void testGroupByInsert() throws Exception { + tEnv.executeSql( + String.format( + "insert into `%s` " + + "select `int`, cast('41' as bytes), `short`, max(`long`), max(`real`), " + + "max(`double_precision`), max(`numeric`), max(`decimal`), max(`boolean`), " + + "max(`text`), 'B', 'C', max(`character_varying`), max(`timestamp`), " + + "max(`date`), max(`time`), max(`default_numeric`) " + + "from `%s` group by `int`, `short`", + TABLE_PRIMITIVE_TYPE2, TABLE_PRIMITIVE_TYPE)) + .await(); + + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from `%s`", TABLE_PRIMITIVE_TYPE2)) + .execute() + .collect()); + assertThat(results) + .hasToString( + "[+I[1, [52, 49], 3, 4, 5.5, 6.6, 7.70000, 8.8, true, a, B, C , d, 2016-06-22T19:10:25, 2015-01-01, 00:51:03, 500.000000000000000000]]"); + } + + @Test + void testPrimitiveTypes() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE)) + .execute() + .collect()); + + assertThat(results) + .hasToString( + "[+I[1, [50], 3, 4, 5.5, 6.6, 7.70000, 8.8, true, a, b, c , d, 2016-06-22T19:10:25, 2015-01-01, 00:51:03, 500.000000000000000000]]"); + } + + @Test + void testArrayTypes() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE_ARRAY_TYPE)) + .execute() + .collect()); + + assertThat(results) + .hasToString( + "[+I[" + + "[1, 2, 3], " + + "[[50], [51], [52]], " + + "[3, 4, 5], " + + "[4, 5, 6], " + + "[5.5, 6.6, 7.7], " + + "[6.6, 7.7, 8.8], " + + "[7.70000, 8.80000, 9.90000], " + + "[8.800000000000000000, 9.900000000000000000, 10.100000000000000000], " + + "[9.90, 10.10, 11.11], " + + "[true, false, true], " + + "[a, b, c], " + + "[b, c, d], " + + "[b , c , d ], " + + "[b, c, d], " + + "[2016-06-22T19:10:25, 2019-06-22T19:10:25], " + + "[2015-01-01, 2020-01-01], " + + "[00:51:03, 00:59:03], " + + "null, " + + "null]]"); + } + + @Test + void testSerialTypes() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE_SERIAL_TYPE)) + .execute() + .collect()); + + assertThat(results) + .hasToString( + "[+I[" + + "32767, " + + "2147483647, " + + "32767, " + + "2147483647, " + + "9223372036854775807, " + + "9223372036854775807]]"); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java new file mode 100644 index 000000000..b09bcb43a --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link GaussdbCatalog}. */ +class GaussdbCatalogTest extends GaussdbCatalogTestBase { + + // ------ databases ------ + + @Test + void testGetDb_DatabaseNotExistException() { + assertThatThrownBy(() -> catalog.getDatabase("nonexistent")) + .isInstanceOf(DatabaseNotExistException.class) + .hasMessageContaining("Database nonexistent does not exist in Catalog"); + } + + @Test + void testListDatabases() { + List actual = catalog.listDatabases(); + + assertThat(actual).isEqualTo(Arrays.asList("postgres", "test")); + } + + @Test + void testDbExists() { + assertThat(catalog.databaseExists("nonexistent")).isFalse(); + + assertThat(catalog.databaseExists(GaussdbCatalog.DEFAULT_DATABASE)).isTrue(); + } + + // ------ tables ------ + + @Test + void testListTables() throws DatabaseNotExistException { + List actual = catalog.listTables(GaussdbCatalog.DEFAULT_DATABASE); + + assertThat(actual) + .isEqualTo( + Arrays.asList( + "public.array_table", + "public.primitive_table", + "public.primitive_table2", + "public.serial_table", + "public.t1", + "public.t4", + "public.t5")); + + actual = catalog.listTables(TEST_DB); + + assertThat(actual).isEqualTo(Arrays.asList("public.t2", "test_schema.t3")); + } + + @Test + void testListTables_DatabaseNotExistException() { + assertThatThrownBy(() -> catalog.listTables("postgres/nonexistschema")) + .isInstanceOf(DatabaseNotExistException.class); + } + + @Test + void testTableExists() { + assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist"))).isFalse(); + + assertThat(catalog.tableExists(new ObjectPath(GaussdbCatalog.DEFAULT_DATABASE, TABLE1))) + .isTrue(); + assertThat(catalog.tableExists(new ObjectPath(TEST_DB, TABLE2))).isTrue(); + assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "test_schema.t3"))).isTrue(); + } + + @Test + void testGetTables_TableNotExistException() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + TEST_DB, + GaussdbTablePath.toFlinkTableName( + TEST_SCHEMA, "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTables_TableNotExistException_NoSchema() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + TEST_DB, + GaussdbTablePath.toFlinkTableName( + "nonexistschema", "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTables_TableNotExistException_NoDb() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + "nonexistdb", + GaussdbTablePath.toFlinkTableName( + TEST_SCHEMA, "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExistException { + // test postgres.public.user1 + Schema schema = getSimpleTable().schema; + + CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + table = catalog.getTable(new ObjectPath("postgres", "public.t1")); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + // test testdb.public.user2 + table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2")); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + // test testdb.testschema.user2 + table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3")); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + } + + @Test + void testPrimitiveDataTypes() throws TableNotExistException { + CatalogBaseTable table = + catalog.getTable( + new ObjectPath(GaussdbCatalog.DEFAULT_DATABASE, TABLE_PRIMITIVE_TYPE)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(getPrimitiveTable().schema); + } + + @Test + void testArrayDataTypes() throws TableNotExistException { + CatalogBaseTable table = + catalog.getTable(new ObjectPath(GaussdbCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(getArrayTable().schema); + } + + @Test + public void testSerialDataTypes() throws TableNotExistException { + CatalogBaseTable table = + catalog.getTable( + new ObjectPath(GaussdbCatalog.DEFAULT_DATABASE, TABLE_SERIAL_TYPE)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(getSerialTable().schema); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java new file mode 100644 index 000000000..f0611cb8b --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase; +import org.apache.flink.connector.jdbc.gaussdb.testutils.GaussdbDatabase; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.types.logical.DecimalType; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +/** Test base for {@link GaussdbCatalog}. */ +class GaussdbCatalogTestBase implements JdbcITCaseBase, GaussdbTestBase { + + private static DatabaseMetadata getStaticMetadata() { + return GaussdbDatabase.getMetadata(); + } + + protected static final String TEST_CATALOG_NAME = "mypg"; + protected static final String TEST_USERNAME = getStaticMetadata().getUsername(); + protected static final String TEST_PWD = getStaticMetadata().getPassword(); + protected static final String TEST_DB = "test"; + protected static final String TEST_SCHEMA = "test_schema"; + protected static final String TABLE1 = "t1"; + protected static final String TABLE2 = "t2"; + protected static final String TABLE3 = "t3"; + protected static final String TABLE4 = "t4"; + protected static final String TABLE5 = "t5"; + protected static final String TABLE_PRIMITIVE_TYPE = "primitive_table"; + protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2"; + protected static final String TABLE_ARRAY_TYPE = "array_table"; + protected static final String TABLE_SERIAL_TYPE = "serial_table"; + + protected static String baseUrl; + protected static GaussdbCatalog catalog; + + @BeforeAll + static void init() throws SQLException { + + String jdbcUrl = getStaticMetadata().getJdbcUrl(); + baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); + + catalog = + new GaussdbCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + GaussdbCatalog.DEFAULT_DATABASE, + TEST_USERNAME, + TEST_PWD, + baseUrl); + + // create test database and schema + createSchema(TEST_DB, TEST_SCHEMA); + + // create test tables + // table: postgres.public.t1 + // table: postgres.public.t4 + // table: postgres.public.t5 + createTable(GaussdbTablePath.fromFlinkTableName(TABLE1), getSimpleTable().pgSchemaSql); + createTable(GaussdbTablePath.fromFlinkTableName(TABLE4), getSimpleTable().pgSchemaSql); + createTable(GaussdbTablePath.fromFlinkTableName(TABLE5), getSimpleTable().pgSchemaSql); + + // table: test.public.t2 + // table: test.test_schema.t3 + // table: postgres.public.dt + // table: postgres.public.dt2 + createTable( + TEST_DB, GaussdbTablePath.fromFlinkTableName(TABLE2), getSimpleTable().pgSchemaSql); + createTable( + TEST_DB, new GaussdbTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql); + createTable( + GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE), + getPrimitiveTable().pgSchemaSql); + createTable( + GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2), + getPrimitiveTable("test_pk2").pgSchemaSql); + createTable( + GaussdbTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE), getArrayTable().pgSchemaSql); + createTable( + GaussdbTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE), + getSerialTable().pgSchemaSql); + + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "insert into public.%s values (%s);", TABLE1, getSimpleTable().values)); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "insert into %s values (%s);", + TABLE_PRIMITIVE_TYPE, getPrimitiveTable().values)); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "insert into %s values (%s);", TABLE_ARRAY_TYPE, getArrayTable().values)); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "insert into %s values (%s);", TABLE_SERIAL_TYPE, getSerialTable().values)); + } + + @AfterAll + static void afterAll() throws SQLException { + executeSQL(TEST_DB, String.format("DROP SCHEMA %s CASCADE", TEST_SCHEMA)); + executeSQL( + TEST_DB, + String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE2))); + + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE1))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE4))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE5))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "DROP TABLE %s ", + GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "DROP TABLE %s ", + GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE))); + } + + public static void createTable(GaussdbTablePath tablePath, String tableSchemaSql) + throws SQLException { + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); + } + + public static void createTable(String db, GaussdbTablePath tablePath, String tableSchemaSql) + throws SQLException { + executeSQL( + db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); + } + + public static void createSchema(String db, String schema) throws SQLException { + executeSQL(db, String.format("CREATE SCHEMA %s", schema)); + } + + public static void createDatabase(String database) throws SQLException { + executeSQL(String.format("CREATE DATABASE %s;", database)); + } + + public static void executeSQL(String sql) throws SQLException { + executeSQL("", sql); + } + + public static void executeSQL(String db, String sql) throws SQLException { + try (Connection conn = + DriverManager.getConnection( + String.format("%s/%s", baseUrl, db), TEST_USERNAME, TEST_PWD); + Statement statement = conn.createStatement()) { + statement.executeUpdate(sql); + } catch (SQLException e) { + throw e; + } + } + + /** Object holding schema and corresponding sql. */ + public static class TestTable { + Schema schema; + String pgSchemaSql; + String values; + + public TestTable(Schema schema, String pgSchemaSql, String values) { + this.schema = schema; + this.pgSchemaSql = pgSchemaSql; + this.values = values; + } + } + + public static TestTable getSimpleTable() { + return new TestTable( + Schema.newBuilder().column("id", DataTypes.INT()).build(), "id integer", "1"); + } + + // posgres doesn't support to use the same primary key name across different tables, + // make the table parameterized to resolve this problem. + public static TestTable getPrimitiveTable() { + return getPrimitiveTable("test_pk"); + } + + // TODO: add back timestamptz and time types. + // Flink currently doesn't support converting time's precision, with the following error + // TableException: Unsupported conversion from data type 'TIME(6)' (conversion class: + // java.sql.Time) + // to type information. Only data types that originated from type information fully support a + // reverse conversion. + public static TestTable getPrimitiveTable(String primaryKeyName) { + return new TestTable( + Schema.newBuilder() + .column("int", DataTypes.INT().notNull()) + .column("bytea", DataTypes.BYTES()) + .column("short", DataTypes.SMALLINT().notNull()) + .column("long", DataTypes.BIGINT()) + .column("real", DataTypes.FLOAT()) + .column("double_precision", DataTypes.DOUBLE()) + .column("numeric", DataTypes.DECIMAL(10, 5)) + .column("decimal", DataTypes.DECIMAL(10, 1)) + .column("boolean", DataTypes.BOOLEAN()) + .column("text", DataTypes.STRING()) + .column("char", DataTypes.CHAR(1)) + .column("character", DataTypes.CHAR(3)) + .column("character_varying", DataTypes.VARCHAR(20)) + .column("timestamp", DataTypes.TIMESTAMP(5)) + // .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)) + .column("date", DataTypes.DATE()) + .column("time", DataTypes.TIME(0)) + .column("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)) + .primaryKeyNamed(primaryKeyName, "short", "int") + .build(), + "int integer, " + + "bytea bytea, " + + "short smallint, " + + "long bigint, " + + "real real, " + + "double_precision double precision, " + + "numeric numeric(10, 5), " + + "decimal decimal(10, 1), " + + "boolean boolean, " + + "text text, " + + "char char, " + + "character character(3), " + + "character_varying character varying(20), " + + "timestamp timestamp(5), " + + + // "timestamptz timestamptz(4), " + + "date date," + + "time time(0), " + + "default_numeric numeric, " + + "CONSTRAINT " + + primaryKeyName + + " PRIMARY KEY (short, int)", + "1," + + "'2'," + + "3," + + "4," + + "5.5," + + "6.6," + + "7.7," + + "8.8," + + "true," + + "'a'," + + "'b'," + + "'c'," + + "'d'," + + "'2016-06-22 19:10:25'," + + + // "'2006-06-22 19:10:25'," + + "'2015-01-01'," + + "'00:51:02.746572', " + + "500"); + } + + // TODO: add back timestamptz once planner supports timestamp with timezone + public static TestTable getArrayTable() { + return new TestTable( + Schema.newBuilder() + .column("int_arr", DataTypes.ARRAY(DataTypes.INT())) + .column("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) + .column("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT())) + .column("long_arr", DataTypes.ARRAY(DataTypes.BIGINT())) + .column("real_arr", DataTypes.ARRAY(DataTypes.FLOAT())) + .column("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE())) + .column("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5))) + .column( + "numeric_arr_default", + DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))) + .column("decimal_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2))) + .column("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN())) + .column("text_arr", DataTypes.ARRAY(DataTypes.STRING())) + .column("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1))) + .column("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3))) + .column("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20))) + .column("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5))) + // .field("timestamptz_arr", + // DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))) + .column("date_arr", DataTypes.ARRAY(DataTypes.DATE())) + .column("time_arr", DataTypes.ARRAY(DataTypes.TIME(0))) + .column("null_bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) + .column("null_text_arr", DataTypes.ARRAY(DataTypes.STRING())) + .build(), + "int_arr integer[], " + + "bytea_arr bytea[], " + + "short_arr smallint[], " + + "long_arr bigint[], " + + "real_arr real[], " + + "double_precision_arr double precision[], " + + "numeric_arr numeric(10, 5)[], " + + "numeric_arr_default numeric[], " + + "decimal_arr decimal(10,2)[], " + + "boolean_arr boolean[], " + + "text_arr text[], " + + "char_arr char[], " + + "character_arr character(3)[], " + + "character_varying_arr character varying(20)[], " + + "timestamp_arr timestamp(5)[], " + + + // "timestamptz_arr timestamptz(4)[], " + + "date_arr date[], " + + "time_arr time(0)[], " + + "null_bytea_arr bytea[], " + + "null_text_arr text[]", + String.format( + "'{1,2,3}'," + + "'{2,3,4}'," + + "'{3,4,5}'," + + "'{4,5,6}'," + + "'{5.5,6.6,7.7}'," + + "'{6.6,7.7,8.8}'," + + "'{7.7,8.8,9.9}'," + + "'{8.8,9.9,10.10}'," + + "'{9.9,10.10,11.11}'," + + "'{true,false,true}'," + + "'{a,b,c}'," + + "'{b,c,d}'," + + "'{b,c,d}'," + + "'{b,c,d}'," + + "'{\"2016-06-22 19:10:25\", \"2019-06-22 19:10:25\"}'," + + + // "'{\"2006-06-22 19:10:25\", \"2009-06-22 19:10:25\"}'," + + "'{\"2015-01-01\", \"2020-01-01\"}'," + + "'{\"00:51:02.746572\", \"00:59:02.746572\"}'," + + "NULL," + + "NULL")); + } + + public static TestTable getSerialTable() { + return new TestTable( + Schema.newBuilder() + // serial fields are returned as not null by ResultSetMetaData.columnNoNulls + .column("f0", DataTypes.SMALLINT().notNull()) + .column("f1", DataTypes.INT().notNull()) + .column("f2", DataTypes.SMALLINT().notNull()) + .column("f3", DataTypes.INT().notNull()) + .column("f4", DataTypes.BIGINT().notNull()) + .column("f5", DataTypes.BIGINT().notNull()) + .build(), + "f0 smallserial, " + + "f1 serial, " + + "f2 serial2, " + + "f3 serial4, " + + "f4 serial8, " + + "f5 bigserial", + "32767," + + "2147483647," + + "32767," + + "2147483647," + + "9223372036854775807," + + "9223372036854775807"); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java new file mode 100644 index 000000000..02663a929 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link GaussdbTablePath}. */ +class GaussdbTablePathTest { + @Test + void testToFlinkTableName() { + assertThat(GaussdbTablePath.toFlinkTableName("my_schema", "my_table")) + .isEqualTo("my_schema.my_table"); + assertThat(GaussdbTablePath.toFlinkTableName("postgres.my_schema", "my_table")) + .isEqualTo("postgres.my_schema.my_table"); + assertThatThrownBy(() -> GaussdbTablePath.toFlinkTableName("", "my_table")) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage("Schema name is not valid. Null or empty is not allowed"); + } + + @Test + void testFromFlinkTableName() { + assertThat(GaussdbTablePath.fromFlinkTableName("my_schema.my_table")) + .isEqualTo(new GaussdbTablePath("my_schema", "my_table")); + assertThat(GaussdbTablePath.fromFlinkTableName("my_table")) + .isEqualTo(new GaussdbTablePath("public", "my_table")); + assertThatThrownBy(() -> GaussdbTablePath.fromFlinkTableName("postgres.public.my_table")) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Table name 'postgres.public.my_table' is not valid. The parsed length is 3"); + assertThatThrownBy(() -> GaussdbTablePath.fromFlinkTableName("")) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage("Table name is not valid. Null or empty is not allowed"); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java new file mode 100644 index 000000000..4f63f66b7 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.dialect; + +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectTest; +import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** The PostgresSql params for {@link JdbcDialectTest}. */ +class GaussdbDialectTest extends JdbcDialectTest implements GaussdbTestBase { + + @Override + protected List testData() { + return Arrays.asList( + createTestItem("CHAR"), + createTestItem("VARCHAR"), + createTestItem("BOOLEAN"), + createTestItem("TINYINT"), + createTestItem("SMALLINT"), + createTestItem("INTEGER"), + createTestItem("BIGINT"), + createTestItem("FLOAT"), + createTestItem("DOUBLE"), + createTestItem("DECIMAL(10, 4)"), + createTestItem("DECIMAL(38, 18)"), + createTestItem("DATE"), + createTestItem("TIME"), + createTestItem("TIMESTAMP(3)"), + createTestItem("TIMESTAMP WITHOUT TIME ZONE"), + createTestItem("VARBINARY"), + createTestItem("ARRAY"), + + // Not valid data + createTestItem("BINARY", "The PostgreSQL dialect doesn't support type: BINARY(1)."), + createTestItem( + "VARBINARY(10)", + "The PostgreSQL dialect doesn't support type: VARBINARY(10)."), + createTestItem( + "TIMESTAMP(9) WITHOUT TIME ZONE", + "The precision of field 'f0' is out of the TIMESTAMP precision range [1, 6] supported by PostgreSQL dialect."), + createTestItem("TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)")); + } + + @Test + void testUpsertStatement() { + GaussdbDialect dialect = new GaussdbDialect(); + final String tableName = "tbl"; + final String[] fieldNames = { + "id", "name", "email", "ts", "field1", "field_2", "__field_3__" + }; + final String[] doUpdatekeyFields = {"id", "__field_3__"}; + final String[] doNothingkeyFields = { + "id", "name", "email", "ts", "field1", "field_2", "__field_3__" + }; + + assertThat(dialect.getUpsertStatement(tableName, fieldNames, doUpdatekeyFields).get()) + .isEqualTo( + "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__) ON DUPLICATE KEY UPDATE name=values(name), email=values(email), ts=values(ts), field1=values(field1), field_2=values(field_2)"); + assertThat(dialect.getUpsertStatement(tableName, fieldNames, doNothingkeyFields).get()) + .isEqualTo( + "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__) ON DUPLICATE KEY UPDATE "); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java new file mode 100644 index 000000000..e0946a96b --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.table; + +import org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase; +import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase; +import org.apache.flink.connector.jdbc.gaussdb.database.dialect.GaussdbDialect; + +/** The Table Sink ITCase for {@link GaussdbDialect}. */ +class GaussdbDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase implements GaussdbTestBase {} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java new file mode 100644 index 000000000..656fc878e --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.table; + +import org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase; +import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase; +import org.apache.flink.connector.jdbc.gaussdb.database.dialect.GaussdbDialect; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow; + +/** The Table Source ITCase for {@link GaussdbDialect}. */ +class GaussdbDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase + implements GaussdbTestBase { + + @Override + protected TableRow createInputTable() { + return tableRow( + "jdbDynamicTableSource", + field("id", DataTypes.BIGINT().notNull()), + field("decimal_col", DataTypes.DECIMAL(10, 4)), + field("timestamp6_col", DataTypes.TIMESTAMP(6)), + // other fields + field("real_col", dbType("REAL"), DataTypes.FLOAT()), + field("double_col", dbType("DOUBLE PRECISION"), DataTypes.DOUBLE()), + field("time_col", dbType("TIME"), DataTypes.TIME())); + } + + protected List getTestData() { + return Arrays.asList( + Row.of( + 1L, + BigDecimal.valueOf(100.1234), + LocalDateTime.parse("2020-01-01T15:35:00.123456"), + 1.175E-37F, + 1.79769E308D, + LocalTime.parse("15:35")), + Row.of( + 2L, + BigDecimal.valueOf(101.1234), + LocalDateTime.parse("2020-01-01T15:36:01.123456"), + -1.175E-37F, + -1.79769E308, + LocalTime.parse("15:36:01"))); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java new file mode 100644 index 000000000..25771fef2 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java @@ -0,0 +1,162 @@ +package org.apache.flink.connector.jdbc.gaussdb.testutils; + +import org.jetbrains.annotations.NotNull; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Testcontainers implementation for GaussDB. Supported images: {@code opengauss/opengauss}, {@code + * pgvector/pgvector} Exposed ports: 8000 + */ +public class GaussDBContainer> + extends JdbcDatabaseContainer { + + public static final String NAME = "gaussdb"; + + public static final String IMAGE = "opengauss/opengauss"; + + public static final String DEFAULT_TAG = "latest"; + + private static final DockerImageName DEFAULT_IMAGE_NAME = + DockerImageName.parse("opengauss/opengauss").asCompatibleSubstituteFor("gaussdb"); + + public static final Integer GAUSSDB_PORT = 8000; + + public static final String DEFAULT_USER_NAME = "flink_jdbc_test"; + + public static final String DEFAULT_PASSWORD = "Flink_jdbc_test@123"; + + private String databaseName = "postgres"; + + private String username = DEFAULT_USER_NAME; + + private String password = DEFAULT_PASSWORD; + + /** + * @deprecated use {@link #GaussDBContainer(DockerImageName)} or {@link + * #GaussDBContainer(String)} instead + */ + @Deprecated + public GaussDBContainer() { + this(DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG)); + } + + public GaussDBContainer(final String dockerImageName) { + this(DockerImageName.parse(dockerImageName)); + } + + public GaussDBContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + setWaitStrategy( + new WaitStrategy() { + @Override + public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) { + Wait.forListeningPort().waitUntilReady(waitStrategyTarget); + try { + // Open Gauss will set up users and password when ports are ready. + Wait.forLogMessage(".*gs_ctl stopped.*", 1) + .waitUntilReady(waitStrategyTarget); + // Not enough and no idea + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public WaitStrategy withStartupTimeout(Duration duration) { + return waitStrategy.withStartupTimeout(duration); + } + }); + } + + /** + * @return the ports on which to check if the container is ready + * @deprecated use {@link #getLivenessCheckPortNumbers()} instead + */ + @NotNull + @Override + @Deprecated + protected Set getLivenessCheckPorts() { + return super.getLivenessCheckPorts(); + } + + @Override + protected void configure() { + // Disable Postgres driver use of java.util.logging to reduce noise at startup time + withUrlParam("loggerLevel", "OFF"); + withDatabaseName(databaseName); + addExposedPorts(GAUSSDB_PORT); + addFixedExposedPort(GAUSSDB_PORT, GAUSSDB_PORT); + addEnv("GS_PORT", String.valueOf(GAUSSDB_PORT)); + addEnv("GS_USERNAME", username); + addEnv("GS_PASSWORD", password); + } + + @Override + public String getDriverClassName() { + return "com.huawei.gaussdb.jdbc.Driver"; + } + + @Override + public String getJdbcUrl() { + String additionalUrlParams = constructUrlParameters("?", "&"); + return ("jdbc:gaussdb://" + + getHost() + + ":" + + getMappedPort(GAUSSDB_PORT) + + "/" + + databaseName + + additionalUrlParams); + } + + @Override + public String getDatabaseName() { + return databaseName; + } + + @Override + public String getUsername() { + return username; + } + + @Override + public String getPassword() { + return password; + } + + @Override + public String getTestQueryString() { + return "SELECT 1"; + } + + @Override + public SELF withDatabaseName(final String databaseName) { + this.databaseName = databaseName; + return self(); + } + + @Override + public SELF withUsername(final String username) { + this.username = username; + return self(); + } + + @Override + public SELF withPassword(final String password) { + this.password = password; + return self(); + } + + @Override + protected void waitUntilContainerStarted() { + getWaitStrategy().waitUntilReady(this); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java new file mode 100644 index 000000000..833b136c8 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.testutils; + +import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.DockerResource; +import org.apache.flink.util.FlinkRuntimeException; + +import org.testcontainers.utility.DockerImageName; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** A Gaussdb database for testing. */ +public class GaussdbDatabase extends DatabaseExtension implements GaussdbImages { + + private static final GaussDBContainer CONTAINER = + new GaussdbXaContainer(IMAGE).withMaxConnections(10).withMaxTransactions(50); + + private static GaussdbMetadata metadata; + + public static GaussdbMetadata getMetadata() { + if (!CONTAINER.isRunning()) { + throw new FlinkRuntimeException("Container is stopped."); + } + if (metadata == null) { + metadata = new GaussdbMetadata(CONTAINER, true); + } + return metadata; + } + + protected DatabaseMetadata getMetadataDB() { + return getMetadata(); + } + + @Override + protected DatabaseResource getResource() { + return new DockerResource(CONTAINER); + } + + /** {@link GaussDBContainer} with XA enabled (by setting max_prepared_transactions). */ + public static class GaussdbXaContainer extends GaussDBContainer { + private static final int SUPERUSER_RESERVED_CONNECTIONS = 1; + private int maxConnections = SUPERUSER_RESERVED_CONNECTIONS + 1; + private int maxTransactions = 1; + + public GaussdbXaContainer(String dockerImageName) { + super(DockerImageName.parse(dockerImageName)); + } + + public GaussdbXaContainer withMaxConnections(int maxConnections) { + checkArgument( + maxConnections > SUPERUSER_RESERVED_CONNECTIONS, + "maxConnections should be greater than superuser_reserved_connections"); + this.maxConnections = maxConnections; + return this.self(); + } + + public GaussdbXaContainer withMaxTransactions(int maxTransactions) { + checkArgument(maxTransactions > 1, "maxTransactions should be greater 1"); + this.maxTransactions = maxTransactions; + return this.self(); + } + + @Override + public void start() { + super.start(); + } + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java new file mode 100644 index 000000000..91ccbfbd6 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.testutils; + +// import org.testcontainers.utility.DockerImageName; +/** Gaussdb docker images. */ +public interface GaussdbImages { + String IMAGE = "opengauss/opengauss"; +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java new file mode 100644 index 000000000..a5f2cab21 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.testutils; + +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; + +import org.postgresql.xa.PGXADataSource; +import org.testcontainers.containers.JdbcDatabaseContainer; + +import javax.sql.XADataSource; + +/** Gaussdb Metadata. */ +public class GaussdbMetadata implements DatabaseMetadata { + + private final String username; + private final String password; + private final String url; + private final String driver; + private final String version; + private final boolean xaEnabled; + + public GaussdbMetadata(GaussDBContainer container) { + this(container, false); + } + + public GaussdbMetadata(JdbcDatabaseContainer container, boolean hasXaEnabled) { + this.username = container.getUsername(); + this.password = container.getPassword(); + this.url = container.getJdbcUrl(); + this.driver = container.getDriverClassName(); + this.version = container.getDockerImageName(); + this.xaEnabled = hasXaEnabled; + } + + @Override + public String getJdbcUrl() { + return this.url; + } + + @Override + public String getJdbcUrlWithCredentials() { + return String.format("%s&user=%s&password=%s", getJdbcUrl(), getUsername(), getPassword()); + } + + @Override + public String getUsername() { + return this.username; + } + + @Override + public String getPassword() { + return this.password; + } + + @Override + public XADataSource buildXaDataSource() { + if (!xaEnabled) { + throw new UnsupportedOperationException(); + } + + PGXADataSource xaDataSource = new PGXADataSource(); + xaDataSource.setUrl(getJdbcUrl()); + xaDataSource.setUser(getUsername()); + xaDataSource.setPassword(getPassword()); + return xaDataSource; + } + + @Override + public String getDriverClass() { + return this.driver; + } + + @Override + public String getVersion() { + return version; + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/resources/log4j2-test.properties b/flink-connector-jdbc-gaussdb/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000..835c2ec9a --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n From e6d195704a97acdda2a5ced5db8102e233031cff Mon Sep 17 00:00:00 2001 From: david_1366677 Date: Fri, 18 Apr 2025 11:14:44 +0800 Subject: [PATCH 2/9] Add gaussdb module --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index fe2fc778b..a603a665c 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ under the License. flink-connector-jdbc-postgres flink-connector-jdbc-sqlserver flink-connector-jdbc-trino + flink-connector-jdbc-gaussdb From e31b47527f3ccc5fb3801f1c8e4970bac42e034f Mon Sep 17 00:00:00 2001 From: david_1366677 Date: Mon, 21 Apr 2025 14:31:08 +0800 Subject: [PATCH 3/9] Adjust the dependencies of the gaussdb pom file. --- flink-connector-jdbc-gaussdb/pom.xml | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/flink-connector-jdbc-gaussdb/pom.xml b/flink-connector-jdbc-gaussdb/pom.xml index 241c3c348..880e86ced 100644 --- a/flink-connector-jdbc-gaussdb/pom.xml +++ b/flink-connector-jdbc-gaussdb/pom.xml @@ -114,19 +114,6 @@ jackson-core test - - - - From 153fb0261cef5bc1ef55899b644837f4b5372cc1 Mon Sep 17 00:00:00 2001 From: david_1366677 Date: Tue, 22 Apr 2025 17:31:54 +0800 Subject: [PATCH 4/9] Adjust the test image of the gaussdb and add license header. --- .../gaussdb/testutils/GaussDBContainer.java | 28 +++++++++++++++---- .../jdbc/gaussdb/testutils/GaussdbImages.java | 2 +- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java index 25771fef2..c9780047e 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.connector.jdbc.gaussdb.testutils; import org.jetbrains.annotations.NotNull; @@ -12,20 +29,21 @@ import java.util.concurrent.TimeUnit; /** - * Testcontainers implementation for GaussDB. Supported images: {@code opengauss/opengauss}, {@code - * pgvector/pgvector} Exposed ports: 8000 + * Testcontainers implementation for GaussDB. Supported images: {@code + * opengauss/opengauss:7.0.0-RC1.B023}, {@code pgvector/pgvector} Exposed ports: 8000 */ public class GaussDBContainer> extends JdbcDatabaseContainer { public static final String NAME = "gaussdb"; - public static final String IMAGE = "opengauss/opengauss"; + public static final String IMAGE = "opengauss/opengauss:7.0.0-RC1.B023"; - public static final String DEFAULT_TAG = "latest"; + public static final String DEFAULT_TAG = "7.0.0-RC1.B023"; private static final DockerImageName DEFAULT_IMAGE_NAME = - DockerImageName.parse("opengauss/opengauss").asCompatibleSubstituteFor("gaussdb"); + DockerImageName.parse("opengauss/opengauss:7.0.0-RC1.B023") + .asCompatibleSubstituteFor("gaussdb"); public static final Integer GAUSSDB_PORT = 8000; diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java index 91ccbfbd6..343ac3f71 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java @@ -20,5 +20,5 @@ // import org.testcontainers.utility.DockerImageName; /** Gaussdb docker images. */ public interface GaussdbImages { - String IMAGE = "opengauss/opengauss"; + String IMAGE = "opengauss/opengauss:7.0.0-RC1.B023"; } From 9b93d7c0068202f4744e31796995b702db4feefe Mon Sep 17 00:00:00 2001 From: david_1366677 Date: Thu, 24 Apr 2025 20:17:39 +0800 Subject: [PATCH 5/9] sync vcs.xml --- .idea/vcs.xml | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 35eb1ddfb..0dd9dbf02 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,6 +1,24 @@ + + + - + \ No newline at end of file From c62d0c6415ac5484072ac436ef694c8b21c101c7 Mon Sep 17 00:00:00 2001 From: david_1366677 Date: Mon, 28 Apr 2025 10:46:59 +0800 Subject: [PATCH 6/9] Explain the integration of Gaussian DB source code --- .../connector/jdbc/gaussdb/database/GaussdbFactory.java | 6 +++++- .../jdbc/gaussdb/database/catalog/GaussdbCatalog.java | 6 +++++- .../jdbc/gaussdb/database/catalog/GaussdbTablePath.java | 2 ++ .../jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java | 6 +++++- .../gaussdb/database/dialect/CompatibleGaussdbDialect.java | 6 +++++- .../dialect/CompatibleGaussdbDialectConverter.java | 6 +++++- .../jdbc/gaussdb/database/dialect/GaussdbDialect.java | 6 +++++- .../gaussdb/database/dialect/GaussdbDialectConverter.java | 2 ++ .../flink/connector/jdbc/gaussdb/GaussdbTestBase.java | 6 +++++- .../jdbc/gaussdb/database/GaussdbFactoryTest.java | 6 +++++- .../gaussdb/database/catalog/GaussdbCatalogITCase.java | 6 +++++- .../jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java | 6 +++++- .../gaussdb/database/catalog/GaussdbCatalogTestBase.java | 6 +++++- .../gaussdb/database/catalog/GaussdbTablePathTest.java | 6 +++++- .../jdbc/gaussdb/database/dialect/GaussdbDialectTest.java | 6 +++++- .../jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java | 6 +++++- .../gaussdb/table/GaussdbDynamicTableSourceITCase.java | 6 +++++- .../connector/jdbc/gaussdb/testutils/GaussDBContainer.java | 2 ++ .../connector/jdbc/gaussdb/testutils/GaussdbDatabase.java | 6 +++++- .../connector/jdbc/gaussdb/testutils/GaussdbImages.java | 7 +++++-- .../connector/jdbc/gaussdb/testutils/GaussdbMetadata.java | 6 +++++- 21 files changed, 96 insertions(+), 19 deletions(-) diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactory.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactory.java index d1ce55849..2bbd32afe 100644 --- a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactory.java +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactory.java @@ -25,7 +25,11 @@ import org.apache.flink.connector.jdbc.gaussdb.database.catalog.GaussdbCatalog; import org.apache.flink.connector.jdbc.gaussdb.database.dialect.GaussdbDialect; -/** Factory for {@link GaussdbDialect}. */ +/** + * Factory for {@link GaussdbDialect}. + * + *

Notes: The source code is based on PostgresFactory. + */ @Internal public class GaussdbFactory implements JdbcFactory { @Override diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java index 6a09e143f..f04920a32 100644 --- a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java @@ -45,7 +45,11 @@ import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties; -/** Catalog for GaussDB. */ +/** + * Catalog for GaussDB. + * + *

Notes: The source code is based on PostgresCatalog. + */ @Internal public class GaussdbCatalog extends AbstractJdbcCatalog { diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePath.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePath.java index f6dac0548..da3975381 100644 --- a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePath.java +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePath.java @@ -28,6 +28,8 @@ /** * Table path of GaussDB in Flink. Can be of formats "table_name" or "schema_name.table_name". When * it's "table_name", the schema name defaults to "public". + * + *

Notes: The source code is based on PostgresTablePath. */ @Internal public class GaussdbTablePath { diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java index 6456e4fbd..3c1267843 100644 --- a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java @@ -31,7 +31,11 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; -/** GaussdbTypeMapper util class. */ +/** + * GaussdbTypeMapper util class. + * + *

Notes: The source code is based on PostgresTypeMapper. + */ @Internal public class GaussdbTypeMapper implements JdbcCatalogTypeMapper { diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java index aae02d2a9..00a2330cf 100644 --- a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java @@ -23,7 +23,11 @@ import java.util.Optional; -/** JDBC dialect for GaussDB compatible databases. */ +/** + * JDBC dialect for GaussDB compatible databases. + * + *

Notes: The source code is based on CompatiblePostgresDialect. + */ @PublicEvolving public abstract class CompatibleGaussdbDialect extends GaussdbDialect { diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialectConverter.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialectConverter.java index 10fb97184..45ffa1d75 100644 --- a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialectConverter.java +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialectConverter.java @@ -21,7 +21,11 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.types.logical.RowType; -/** JDBC converter for Gaussdb compatible databases. */ +/** + * JDBC converter for Gaussdb compatible databases. + * + *

Notes: The source code is based on CompatiblePostgresDialectConverter. + */ @Internal public abstract class CompatibleGaussdbDialectConverter extends GaussdbDialectConverter { diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java index fe2706b15..d13fd2d97 100644 --- a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java @@ -30,7 +30,11 @@ import java.util.Set; import java.util.stream.Collectors; -/** JDBC dialect for Gaussdb. */ +/** + * JDBC dialect for Gaussdb. + * + *

Notes: The source code is based on PostgresDialect. + */ @Internal public class GaussdbDialect extends AbstractDialect { diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java index e3c6f5687..a37df8eb6 100644 --- a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java @@ -32,6 +32,8 @@ /** * Runtime converter that responsible to convert between JDBC object and Flink internal object for * GaussDB. + * + *

Notes: The source code is based on PostgresDialectConverter. */ @Internal public class GaussdbDialectConverter extends AbstractDialectConverter { diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java index a243deab2..1355d3cec 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java @@ -24,7 +24,11 @@ import org.junit.jupiter.api.extension.ExtendWith; -/** Base class for Postgres testing. */ +/** + * Base class for Postgres testing. + * + *

Notes: The source code is based on PostgresTestBase. + */ @ExtendWith(GaussdbDatabase.class) public interface GaussdbTestBase extends DatabaseTest { diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java index 24c9e76cf..eec9a75fd 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java @@ -36,7 +36,11 @@ import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link JdbcCatalogFactory}. */ +/** + * Test for {@link JdbcCatalogFactory}. + * + *

Notes: The source code is based on PostgresFactoryTest. + */ class GaussdbFactoryTest implements GaussdbTestBase { protected static String baseUrl; diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java index 0d8b6e6f4..773960edf 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java @@ -32,7 +32,11 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM; import static org.assertj.core.api.Assertions.assertThat; -/** E2E test for {@link GaussdbCatalog}. */ +/** + * E2E test for {@link GaussdbCatalog}. + * + *

Notes: The source code is based on PostgresCatalogITCase. + */ class GaussdbCatalogITCase extends GaussdbCatalogTestBase { private TableEnvironment tEnv; diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java index b09bcb43a..66f5fbd7f 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java @@ -32,7 +32,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link GaussdbCatalog}. */ +/** + * Test for {@link GaussdbCatalog}. + * + *

Notes: The source code is based on PostgresCatalogTest. + */ class GaussdbCatalogTest extends GaussdbCatalogTestBase { // ------ databases ------ diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java index f0611cb8b..315d4ca0b 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java @@ -34,7 +34,11 @@ import java.sql.SQLException; import java.sql.Statement; -/** Test base for {@link GaussdbCatalog}. */ +/** + * Test base for {@link GaussdbCatalog}. + * + *

Notes: The source code is based on PostgresCatalogTestBase. + */ class GaussdbCatalogTestBase implements JdbcITCaseBase, GaussdbTestBase { private static DatabaseMetadata getStaticMetadata() { diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java index 02663a929..cbdcd360f 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java @@ -23,7 +23,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link GaussdbTablePath}. */ +/** + * Test for {@link GaussdbTablePath}. + * + *

Notes: The source code is based on PostgresTablePathTest. + */ class GaussdbTablePathTest { @Test void testToFlinkTableName() { diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java index 4f63f66b7..884f395df 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java @@ -28,7 +28,11 @@ import static org.assertj.core.api.Assertions.assertThat; -/** The PostgresSql params for {@link JdbcDialectTest}. */ +/** + * The PostgresSql params for {@link JdbcDialectTest}. + * + *

Notes: The source code is based on PostgresDialectTest. + */ class GaussdbDialectTest extends JdbcDialectTest implements GaussdbTestBase { @Override diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java index e0946a96b..a1ebf9ccb 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java @@ -22,5 +22,9 @@ import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase; import org.apache.flink.connector.jdbc.gaussdb.database.dialect.GaussdbDialect; -/** The Table Sink ITCase for {@link GaussdbDialect}. */ +/** + * The Table Sink ITCase for {@link GaussdbDialect}. + * + *

Notes: The source code is based on PostgresDynamicTableSinkITCase. + */ class GaussdbDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase implements GaussdbTestBase {} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java index 656fc878e..e14285b0f 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java @@ -35,7 +35,11 @@ import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow; -/** The Table Source ITCase for {@link GaussdbDialect}. */ +/** + * The Table Source ITCase for {@link GaussdbDialect}. + * + *

Notes: The source code is based on PostgresDynamicTableSourceITCase. + */ class GaussdbDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase implements GaussdbTestBase { diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java index c9780047e..6cde85a76 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java @@ -31,6 +31,8 @@ /** * Testcontainers implementation for GaussDB. Supported images: {@code * opengauss/opengauss:7.0.0-RC1.B023}, {@code pgvector/pgvector} Exposed ports: 8000 + * + *

Notes: The source code is based on PostgresContainer. */ public class GaussDBContainer> extends JdbcDatabaseContainer { diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java index 833b136c8..f64ba67a2 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java @@ -27,7 +27,11 @@ import static org.apache.flink.util.Preconditions.checkArgument; -/** A Gaussdb database for testing. */ +/** + * A Gaussdb database for testing. + * + *

Notes: The source code is based on PostgresDatabase. + */ public class GaussdbDatabase extends DatabaseExtension implements GaussdbImages { private static final GaussDBContainer CONTAINER = diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java index 343ac3f71..e79eaa915 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java @@ -17,8 +17,11 @@ package org.apache.flink.connector.jdbc.gaussdb.testutils; -// import org.testcontainers.utility.DockerImageName; -/** Gaussdb docker images. */ +/** + * Gaussdb docker images. + * + *

Notes: The source code is based on PostgresImages. + */ public interface GaussdbImages { String IMAGE = "opengauss/opengauss:7.0.0-RC1.B023"; } diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java index a5f2cab21..6a701b018 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java @@ -24,7 +24,11 @@ import javax.sql.XADataSource; -/** Gaussdb Metadata. */ +/** + * Gaussdb Metadata. + * + *

Notes: The source code is based on PostgresMetadata. + */ public class GaussdbMetadata implements DatabaseMetadata { private final String username; From d3fa96133dc9d695a79e12a8546a749034023f4f Mon Sep 17 00:00:00 2001 From: david_1366677 Date: Mon, 28 Apr 2025 10:49:50 +0800 Subject: [PATCH 7/9] Architecture module integrates Gaussian database --- flink-connector-jdbc-architecture/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flink-connector-jdbc-architecture/pom.xml b/flink-connector-jdbc-architecture/pom.xml index fe6fa34b2..a719c30b4 100644 --- a/flink-connector-jdbc-architecture/pom.xml +++ b/flink-connector-jdbc-architecture/pom.xml @@ -90,6 +90,12 @@ ${project.version} test + + org.apache.flink + flink-connector-jdbc-gaussdb + ${project.version} + test + From 872c3ce7c5dccba8e107847ebdc0504d7bb802d8 Mon Sep 17 00:00:00 2001 From: david_1366677 Date: Mon, 28 Apr 2025 10:49:50 +0800 Subject: [PATCH 8/9] Optimize the features of GaussDB database --- flink-connector-jdbc-architecture/pom.xml | 6 ++++ flink-connector-jdbc-gaussdb/pom.xml | 26 +++------------ .../database/dialect/GaussdbDialect.java | 3 +- .../dialect/GaussdbDialectConverter.java | 32 ++++++++++++------- .../catalog/GaussdbCatalogITCase.java | 6 ++-- .../database/catalog/GaussdbCatalogTest.java | 6 ++-- .../catalog/GaussdbCatalogTestBase.java | 13 +++++--- .../database/dialect/GaussdbDialectTest.java | 6 ++-- .../gaussdb/testutils/GaussDBContainer.java | 3 +- .../gaussdb/testutils/GaussdbDatabase.java | 16 ++++++---- .../gaussdb/testutils/GaussdbMetadata.java | 9 ++++-- 11 files changed, 68 insertions(+), 58 deletions(-) diff --git a/flink-connector-jdbc-architecture/pom.xml b/flink-connector-jdbc-architecture/pom.xml index fe6fa34b2..a719c30b4 100644 --- a/flink-connector-jdbc-architecture/pom.xml +++ b/flink-connector-jdbc-architecture/pom.xml @@ -90,6 +90,12 @@ ${project.version} test + + org.apache.flink + flink-connector-jdbc-gaussdb + ${project.version} + test + diff --git a/flink-connector-jdbc-gaussdb/pom.xml b/flink-connector-jdbc-gaussdb/pom.xml index 880e86ced..62d2380ce 100644 --- a/flink-connector-jdbc-gaussdb/pom.xml +++ b/flink-connector-jdbc-gaussdb/pom.xml @@ -15,7 +15,7 @@ jar - 506.0.0.b058 + 506.0.0.b058-jdk7 @@ -75,15 +75,8 @@ com.huaweicloud.gaussdb gaussdbjdbc - 506.0.0.b058 - provided - - - - com.huaweicloud.gaussdb - gaussdbjdbc - 506.0.0.b058 - test + 506.0.0.b058-jdk7 + @@ -98,17 +91,6 @@ jdbc test - - org.testcontainers - postgresql - test - - - org.postgresql - postgresql - 42.7.3 - test - com.fasterxml.jackson.core jackson-core @@ -132,4 +114,4 @@ - \ No newline at end of file + diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java index d13fd2d97..3b4c35db5 100644 --- a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java @@ -43,7 +43,8 @@ public class GaussdbDialect extends AbstractDialect { // Define MAX/MIN precision of TIMESTAMP type according to Gaussdb docs: private static final int MAX_TIMESTAMP_PRECISION = 6; - private static final int MIN_TIMESTAMP_PRECISION = 1; + // TODO: see https://bbs.huaweicloud.com/forum/thread-0234179025026914116-1-1.html + private static final int MIN_TIMESTAMP_PRECISION = 0; // Define MAX/MIN precision of DECIMAL type according to Gaussdb docs: diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java index a37df8eb6..c44b34d68 100644 --- a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java @@ -27,7 +27,12 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; +import com.huawei.gaussdb.jdbc.util.PGbytea; +import com.huawei.gaussdb.jdbc.util.PGobject; + import java.lang.reflect.Array; +import java.nio.charset.StandardCharsets; +import java.sql.SQLException; /** * Runtime converter that responsible to convert between JDBC object and Flink internal object for @@ -48,12 +53,24 @@ protected GaussdbDialectConverter(RowType rowType) { public JdbcDeserializationConverter createInternalConverter(LogicalType type) { LogicalTypeRoot root = type.getTypeRoot(); + if (root == LogicalTypeRoot.VARBINARY) { + return val -> { + if (val instanceof PGobject) { + return pgObjectBytes((PGobject) val); + } + return val; + }; + } if (root == LogicalTypeRoot.ARRAY) { ArrayType arrayType = (ArrayType) type; - return createGaussdbArrayConverter(arrayType); - } else { - return createPrimitiveConverter(type); + return createGaussDBArrayConverter(arrayType); } + + return super.createInternalConverter(type); + } + + private Object pgObjectBytes(PGobject val) throws SQLException { + return PGbytea.toBytes(val.getValue().getBytes(StandardCharsets.US_ASCII)); } @Override @@ -72,8 +89,7 @@ protected JdbcSerializationConverter createNullableExternalConverter(LogicalType } } - private JdbcDeserializationConverter createGaussdbArrayConverter(ArrayType arrayType) { - + private JdbcDeserializationConverter createGaussDBArrayConverter(ArrayType arrayType) { final Class elementClass = LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); final JdbcDeserializationConverter elementConverter = @@ -89,12 +105,6 @@ private JdbcDeserializationConverter createGaussdbArrayConverter(ArrayType array }; } - // Have its own method so that Gaussdb can support primitives that super class doesn't support - // in the future - private JdbcDeserializationConverter createPrimitiveConverter(LogicalType type) { - return super.createInternalConverter(type); - } - @Override public String converterName() { return "Gaussdb"; diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java index 773960edf..b5a540b5c 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java @@ -131,7 +131,7 @@ void testGroupByInsert() throws Exception { .collect()); assertThat(results) .hasToString( - "[+I[1, [52, 49], 3, 4, 5.5, 6.6, 7.70000, 8.8, true, a, B, C , d, 2016-06-22T19:10:25, 2015-01-01, 00:51:03, 500.000000000000000000]]"); + "[+I[1, [52, 49], 3, 4, 5.5, 6.6, 7.70000, 8.8, true, a, B, C , d, 2016-06-22T19:10:25, 2015-01-01T00:00, 00:51:03, 500.000000000000000000]]"); } @Test @@ -144,7 +144,7 @@ void testPrimitiveTypes() { assertThat(results) .hasToString( - "[+I[1, [50], 3, 4, 5.5, 6.6, 7.70000, 8.8, true, a, b, c , d, 2016-06-22T19:10:25, 2015-01-01, 00:51:03, 500.000000000000000000]]"); + "[+I[1, [50], 3, 4, 5.5, 6.6, 7.70000, 8.8, true, a, b, c , d, 2016-06-22T19:10:25, 2015-01-01T00:00, 00:51:03, 500.000000000000000000]]"); } @Test @@ -173,7 +173,7 @@ void testArrayTypes() { + "[b , c , d ], " + "[b, c, d], " + "[2016-06-22T19:10:25, 2019-06-22T19:10:25], " - + "[2015-01-01, 2020-01-01], " + + "[2015-01-01T00:00, 2020-01-01T00:00], " + "[00:51:03, 00:59:03], " + "null, " + "null]]"); diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java index 66f5fbd7f..29a4fbdc8 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java @@ -52,7 +52,7 @@ void testGetDb_DatabaseNotExistException() { void testListDatabases() { List actual = catalog.listDatabases(); - assertThat(actual).isEqualTo(Arrays.asList("postgres", "test")); + assertThat(actual).containsAll(Arrays.asList("postgres", "test")); } @Test @@ -69,7 +69,7 @@ void testListTables() throws DatabaseNotExistException { List actual = catalog.listTables(GaussdbCatalog.DEFAULT_DATABASE); assertThat(actual) - .isEqualTo( + .containsAll( Arrays.asList( "public.array_table", "public.primitive_table", @@ -81,7 +81,7 @@ void testListTables() throws DatabaseNotExistException { actual = catalog.listTables(TEST_DB); - assertThat(actual).isEqualTo(Arrays.asList("public.t2", "test_schema.t3")); + assertThat(actual).containsAll(Arrays.asList("public.t2", "test_schema.t3")); } @Test diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java index 315d4ca0b..631222e60 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java @@ -79,6 +79,7 @@ static void init() throws SQLException { baseUrl); // create test database and schema + createDatabase(TEST_DB); createSchema(TEST_DB, TEST_SCHEMA); // create test tables @@ -162,6 +163,8 @@ static void afterAll() throws SQLException { GaussdbCatalog.DEFAULT_DATABASE, String.format( "DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE))); + + executeSQL(GaussdbCatalog.DEFAULT_DATABASE, "drop database " + TEST_DB + ";"); } public static void createTable(GaussdbTablePath tablePath, String tableSchemaSql) @@ -182,7 +185,7 @@ public static void createSchema(String db, String schema) throws SQLException { } public static void createDatabase(String database) throws SQLException { - executeSQL(String.format("CREATE DATABASE %s;", database)); + executeSQL(GaussdbCatalog.DEFAULT_DATABASE, String.format("CREATE DATABASE %s;", database)); } public static void executeSQL(String sql) throws SQLException { @@ -248,7 +251,7 @@ public static TestTable getPrimitiveTable(String primaryKeyName) { .column("character_varying", DataTypes.VARCHAR(20)) .column("timestamp", DataTypes.TIMESTAMP(5)) // .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)) - .column("date", DataTypes.DATE()) + .column("date", DataTypes.TIMESTAMP(0)) .column("time", DataTypes.TIME(0)) .column("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)) .primaryKeyNamed(primaryKeyName, "short", "int") @@ -269,7 +272,7 @@ public static TestTable getPrimitiveTable(String primaryKeyName) { + "timestamp timestamp(5), " + // "timestamptz timestamptz(4), " + - "date date," + "date timestamp(0)," + "time time(0), " + "default_numeric numeric, " + "CONSTRAINT " @@ -319,7 +322,7 @@ public static TestTable getArrayTable() { .column("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5))) // .field("timestamptz_arr", // DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))) - .column("date_arr", DataTypes.ARRAY(DataTypes.DATE())) + .column("date_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(0))) .column("time_arr", DataTypes.ARRAY(DataTypes.TIME(0))) .column("null_bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) .column("null_text_arr", DataTypes.ARRAY(DataTypes.STRING())) @@ -341,7 +344,7 @@ public static TestTable getArrayTable() { + "timestamp_arr timestamp(5)[], " + // "timestamptz_arr timestamptz(4)[], " + - "date_arr date[], " + "date_arr timestamp(0)[], " + "time_arr time(0)[], " + "null_bytea_arr bytea[], " + "null_text_arr text[]", diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java index 884f395df..0bac34a5d 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java @@ -57,13 +57,13 @@ protected List testData() { createTestItem("ARRAY"), // Not valid data - createTestItem("BINARY", "The PostgreSQL dialect doesn't support type: BINARY(1)."), + createTestItem("BINARY", "The Gaussdb dialect doesn't support type: BINARY(1)."), createTestItem( "VARBINARY(10)", - "The PostgreSQL dialect doesn't support type: VARBINARY(10)."), + "The Gaussdb dialect doesn't support type: VARBINARY(10)."), createTestItem( "TIMESTAMP(9) WITHOUT TIME ZONE", - "The precision of field 'f0' is out of the TIMESTAMP precision range [1, 6] supported by PostgreSQL dialect."), + "The precision of field 'f0' is out of the TIMESTAMP precision range [0, 6] supported by Gaussdb dialect."), createTestItem("TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)")); } diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java index 6cde85a76..94c877b7e 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java @@ -95,6 +95,7 @@ public WaitStrategy withStartupTimeout(Duration duration) { return waitStrategy.withStartupTimeout(duration); } }); + addExposedPort(GAUSSDB_PORT); } /** @@ -113,8 +114,6 @@ protected void configure() { // Disable Postgres driver use of java.util.logging to reduce noise at startup time withUrlParam("loggerLevel", "OFF"); withDatabaseName(databaseName); - addExposedPorts(GAUSSDB_PORT); - addFixedExposedPort(GAUSSDB_PORT, GAUSSDB_PORT); addEnv("GS_PORT", String.valueOf(GAUSSDB_PORT)); addEnv("GS_USERNAME", username); addEnv("GS_PASSWORD", password); diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java index f64ba67a2..dca1b677c 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java @@ -37,6 +37,8 @@ public class GaussdbDatabase extends DatabaseExtension implements GaussdbImages private static final GaussDBContainer CONTAINER = new GaussdbXaContainer(IMAGE).withMaxConnections(10).withMaxTransactions(50); + private static final DockerResource RESOURCE = new DockerResource(CONTAINER); + private static GaussdbMetadata metadata; public static GaussdbMetadata getMetadata() { @@ -55,14 +57,13 @@ protected DatabaseMetadata getMetadataDB() { @Override protected DatabaseResource getResource() { - return new DockerResource(CONTAINER); + return RESOURCE; } /** {@link GaussDBContainer} with XA enabled (by setting max_prepared_transactions). */ public static class GaussdbXaContainer extends GaussDBContainer { private static final int SUPERUSER_RESERVED_CONNECTIONS = 1; - private int maxConnections = SUPERUSER_RESERVED_CONNECTIONS + 1; - private int maxTransactions = 1; + private volatile boolean started = false; public GaussdbXaContainer(String dockerImageName) { super(DockerImageName.parse(dockerImageName)); @@ -72,19 +73,22 @@ public GaussdbXaContainer withMaxConnections(int maxConnections) { checkArgument( maxConnections > SUPERUSER_RESERVED_CONNECTIONS, "maxConnections should be greater than superuser_reserved_connections"); - this.maxConnections = maxConnections; return this.self(); } public GaussdbXaContainer withMaxTransactions(int maxTransactions) { checkArgument(maxTransactions > 1, "maxTransactions should be greater 1"); - this.maxTransactions = maxTransactions; return this.self(); } @Override public void start() { - super.start(); + synchronized (this) { + if (!started) { + super.start(); + started = true; + } + } } } } diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java index 6a701b018..f7a1eec0e 100644 --- a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java @@ -19,7 +19,8 @@ import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; -import org.postgresql.xa.PGXADataSource; +import com.huawei.gaussdb.jdbc.util.PSQLException; +import com.huawei.gaussdb.jdbc.xa.PGXADataSource; import org.testcontainers.containers.JdbcDatabaseContainer; import javax.sql.XADataSource; @@ -78,7 +79,11 @@ public XADataSource buildXaDataSource() { } PGXADataSource xaDataSource = new PGXADataSource(); - xaDataSource.setUrl(getJdbcUrl()); + try { + xaDataSource.setUrl(getJdbcUrl()); + } catch (PSQLException e) { + throw new RuntimeException(e); + } xaDataSource.setUser(getUsername()); xaDataSource.setPassword(getPassword()); return xaDataSource; From e8510efef5143b642c94668617ce6b0e26c5fa93 Mon Sep 17 00:00:00 2001 From: david_1366677 Date: Tue, 17 Jun 2025 11:11:07 +0800 Subject: [PATCH 9/9] Solved some problems during the review. --- flink-connector-jdbc-gaussdb/pom.xml | 1 - pom.xml | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-connector-jdbc-gaussdb/pom.xml b/flink-connector-jdbc-gaussdb/pom.xml index 62d2380ce..dda3efb10 100644 --- a/flink-connector-jdbc-gaussdb/pom.xml +++ b/flink-connector-jdbc-gaussdb/pom.xml @@ -76,7 +76,6 @@ com.huaweicloud.gaussdb gaussdbjdbc 506.0.0.b058-jdk7 - diff --git a/pom.xml b/pom.xml index a603a665c..e4ebbaa00 100644 --- a/pom.xml +++ b/pom.xml @@ -47,13 +47,13 @@ under the License. flink-connector-jdbc-core flink-connector-jdbc-cratedb flink-connector-jdbc-db2 + flink-connector-jdbc-gaussdb flink-connector-jdbc-mysql flink-connector-jdbc-oceanbase flink-connector-jdbc-oracle flink-connector-jdbc-postgres flink-connector-jdbc-sqlserver flink-connector-jdbc-trino - flink-connector-jdbc-gaussdb