diff --git a/flink-connector-jdbc-architecture/pom.xml b/flink-connector-jdbc-architecture/pom.xml index fe6fa34b2..a719c30b4 100644 --- a/flink-connector-jdbc-architecture/pom.xml +++ b/flink-connector-jdbc-architecture/pom.xml @@ -90,6 +90,12 @@ ${project.version} test + + org.apache.flink + flink-connector-jdbc-gaussdb + ${project.version} + test + diff --git a/flink-connector-jdbc-gaussdb/pom.xml b/flink-connector-jdbc-gaussdb/pom.xml new file mode 100644 index 000000000..62d2380ce --- /dev/null +++ b/flink-connector-jdbc-gaussdb/pom.xml @@ -0,0 +1,117 @@ + + + 4.0.0 + + org.apache.flink + flink-connector-jdbc-parent + 4.0-SNAPSHOT + + + flink-connector-jdbc-gaussdb + Flink : Connectors : JDBC : GaussDB + + jar + + + 506.0.0.b058-jdk7 + + + + + + org.apache.flink + flink-connector-jdbc-core + ${project.version} + + + + org.apache.flink + flink-connector-jdbc-core + ${project.version} + test-jar + test + + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + true + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + + com.huaweicloud.gaussdb + gaussdbjdbc + 506.0.0.b058-jdk7 + + + + + + org.assertj + assertj-core + ${assertj.version} + test + + + org.testcontainers + jdbc + test + + + com.fasterxml.jackson.core + jackson-core + test + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactory.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactory.java new file mode 100644 index 000000000..2bbd32afe --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.JdbcFactory; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.gaussdb.database.catalog.GaussdbCatalog; +import org.apache.flink.connector.jdbc.gaussdb.database.dialect.GaussdbDialect; + +/** + * Factory for {@link GaussdbDialect}. + * + *

Notes: The source code is based on PostgresFactory. + */ +@Internal +public class GaussdbFactory implements JdbcFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:gaussdb:"); + } + + @Override + public JdbcDialect createDialect() { + return new GaussdbDialect(); + } + + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl) { + return new GaussdbCatalog( + classLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java new file mode 100644 index 000000000..f04920a32 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.jdbc.core.database.catalog.AbstractJdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.compress.utils.Lists; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties; + +/** + * Catalog for GaussDB. + * + *

Notes: The source code is based on PostgresCatalog. + */ +@Internal +public class GaussdbCatalog extends AbstractJdbcCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GaussdbCatalog.class); + + public static final String DEFAULT_DATABASE = "postgres"; + + // ------ GaussDB default objects that shouldn't be exposed to users ------ + + private static final Set builtinDatabases = + new HashSet() { + { + add("template0"); + add("template1"); + } + }; + + private static final Set builtinSchemas = + new HashSet() { + { + add("pg_toast"); + add("pg_temp_1"); + add("pg_toast_temp_1"); + add("pg_catalog"); + add("information_schema"); + } + }; + + protected final JdbcCatalogTypeMapper dialectTypeMapper; + + @VisibleForTesting + public GaussdbCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl) { + this( + userClassLoader, + catalogName, + defaultDatabase, + baseUrl, + getBriefAuthProperties(username, pwd)); + } + + public GaussdbCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String baseUrl, + Properties connectProperties) { + this( + userClassLoader, + catalogName, + defaultDatabase, + baseUrl, + new GaussdbTypeMapper(), + connectProperties); + } + + @Deprecated + protected GaussdbCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl, + JdbcCatalogTypeMapper dialectTypeMapper) { + this( + userClassLoader, + catalogName, + defaultDatabase, + baseUrl, + dialectTypeMapper, + getBriefAuthProperties(username, pwd)); + } + + protected GaussdbCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String baseUrl, + JdbcCatalogTypeMapper dialectTypeMapper, + Properties connectProperties) { + super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectProperties); + this.dialectTypeMapper = dialectTypeMapper; + } + + // ------ databases ------ + + @Override + public List listDatabases() throws CatalogException { + + return extractColumnValuesBySQL( + defaultUrl, + "SELECT datname FROM pg_database;", + 1, + dbName -> !builtinDatabases.contains(dbName)); + } + + // ------ schemas ------ + + protected Set getBuiltinSchemas() { + return builtinSchemas; + } + + // ------ tables ------ + + protected List getPureTables(Connection conn, List schemas) + throws SQLException { + List tables = Lists.newArrayList(); + + // position 1 is database name, position 2 is schema name, position 3 is table name + try (PreparedStatement ps = + conn.prepareStatement( + "SELECT * FROM information_schema.tables " + + "WHERE table_type = 'BASE TABLE' " + + "AND table_schema = ? " + + "ORDER BY table_type, table_name;")) { + for (String schema : schemas) { + // Column index 1 is database name, 2 is schema name, 3 is table name + extractColumnValuesByStatement(ps, 3, null, schema).stream() + .map(pureTable -> schema + "." + pureTable) + .forEach(tables::add); + } + return tables; + } + } + + @Override + public List listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + + Preconditions.checkState( + StringUtils.isNotBlank(databaseName), "Database name must not be blank."); + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + final String url = getDatabaseUrl(databaseName); + try (Connection conn = DriverManager.getConnection(url, connectionProperties)) { + // get all schemas + List schemas; + try (PreparedStatement ps = + conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;")) { + schemas = + extractColumnValuesByStatement( + ps, 1, pgSchema -> !getBuiltinSchemas().contains(pgSchema)); + } + + // get all tables + return getPureTables(conn, schemas); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to list tables for database %s", databaseName), e); + } + } + + /** Converts Gaussdb type to Flink {@link DataType}. */ + @Override + protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) + throws SQLException { + return dialectTypeMapper.mapping(tablePath, metadata, colIndex); + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + + List tables = null; + try { + tables = listTables(tablePath.getDatabaseName()); + } catch (DatabaseNotExistException e) { + return false; + } + + return tables.contains(getSchemaTableName(tablePath)); + } + + @Override + protected String getTableName(ObjectPath tablePath) { + return GaussdbTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgTableName(); + } + + @Override + protected String getSchemaName(ObjectPath tablePath) { + return GaussdbTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName(); + } + + @Override + protected String getSchemaTableName(ObjectPath tablePath) { + return GaussdbTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath(); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePath.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePath.java new file mode 100644 index 000000000..da3975381 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePath.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.StringUtils; + +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Table path of GaussDB in Flink. Can be of formats "table_name" or "schema_name.table_name". When + * it's "table_name", the schema name defaults to "public". + * + *

Notes: The source code is based on PostgresTablePath. + */ +@Internal +public class GaussdbTablePath { + + private static final String DEFAULT_POSTGRES_SCHEMA_NAME = "public"; + + private final String pgSchemaName; + private final String pgTableName; + + public GaussdbTablePath(String pgSchemaName, String pgTableName) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(pgSchemaName), + "Schema name is not valid. Null or empty is not allowed"); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(pgTableName), + "Table name is not valid. Null or empty is not allowed"); + + this.pgSchemaName = pgSchemaName; + this.pgTableName = pgTableName; + } + + public static GaussdbTablePath fromFlinkTableName(String flinkTableName) { + if (flinkTableName.contains(".")) { + String[] path = flinkTableName.split("\\."); + + checkArgument( + path != null && path.length == 2, + String.format( + "Table name '%s' is not valid. The parsed length is %d", + flinkTableName, path.length)); + + return new GaussdbTablePath(path[0], path[1]); + } else { + return new GaussdbTablePath(getDefaultSchemaName(), flinkTableName); + } + } + + public static String toFlinkTableName(String schema, String table) { + return new GaussdbTablePath(schema, table).getFullPath(); + } + + public String getFullPath() { + return String.format("%s.%s", pgSchemaName, pgTableName); + } + + public String getPgTableName() { + return pgTableName; + } + + public String getPgSchemaName() { + return pgSchemaName; + } + + protected static String getDefaultSchemaName() { + return DEFAULT_POSTGRES_SCHEMA_NAME; + } + + @Override + public String toString() { + return getFullPath(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + GaussdbTablePath that = (GaussdbTablePath) o; + return Objects.equals(pgSchemaName, that.pgSchemaName) + && Objects.equals(pgTableName, that.pgTableName); + } + + @Override + public int hashCode() { + return Objects.hash(pgSchemaName, pgTableName); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java new file mode 100644 index 000000000..3c1267843 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +/** + * GaussdbTypeMapper util class. + * + *

Notes: The source code is based on PostgresTypeMapper. + */ +@Internal +public class GaussdbTypeMapper implements JdbcCatalogTypeMapper { + + private static final Logger LOG = LoggerFactory.getLogger(GaussdbTypeMapper.class); + + // GaussDB JDBC driver maps several aliases to real types. We use the real types rather than + // serial2 <=> int2 + // smallserial <=> int2 + // serial4 <=> serial + // serial8 <=> bigserial + // smallint <=> int2 + // integer <=> int4 + // int <=> int4 + // bigint <=> int8 + // float <=> float8 + // boolean <=> bool + // decimal <=> numeric + private static final String PG_SMALLSERIAL = "smallserial"; + protected static final String PG_SERIAL = "serial"; + protected static final String PG_BIGSERIAL = "bigserial"; + private static final String PG_BYTEA = "bytea"; + private static final String PG_BYTEA_ARRAY = "_bytea"; + private static final String PG_SMALLINT = "int2"; + private static final String PG_SMALLINT_ARRAY = "_int2"; + private static final String PG_INTEGER = "int4"; + private static final String PG_INTEGER_ARRAY = "_int4"; + private static final String PG_BIGINT = "int8"; + private static final String PG_BIGINT_ARRAY = "_int8"; + private static final String PG_REAL = "float4"; + private static final String PG_REAL_ARRAY = "_float4"; + private static final String PG_DOUBLE_PRECISION = "float8"; + private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8"; + private static final String PG_NUMERIC = "numeric"; + private static final String PG_NUMERIC_ARRAY = "_numeric"; + private static final String PG_BOOLEAN = "bool"; + private static final String PG_BOOLEAN_ARRAY = "_bool"; + private static final String PG_TIMESTAMP = "timestamp"; + private static final String PG_TIMESTAMP_ARRAY = "_timestamp"; + private static final String PG_TIMESTAMPTZ = "timestamptz"; + private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz"; + private static final String PG_DATE = "date"; + private static final String PG_DATE_ARRAY = "_date"; + private static final String PG_TIME = "time"; + private static final String PG_TIME_ARRAY = "_time"; + private static final String PG_TEXT = "text"; + private static final String PG_TEXT_ARRAY = "_text"; + private static final String PG_CHAR = "bpchar"; + private static final String PG_CHAR_ARRAY = "_bpchar"; + private static final String PG_CHARACTER = "character"; + private static final String PG_CHARACTER_ARRAY = "_character"; + private static final String PG_CHARACTER_VARYING = "varchar"; + private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar"; + + @Override + public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) + throws SQLException { + String pgType = metadata.getColumnTypeName(colIndex); + + int precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); + + DataType dataType = getMapping(pgType, precision, scale); + if (dataType == null) { + throw new UnsupportedOperationException( + String.format("Doesn't support %s type '%s' yet", getDBType(), pgType)); + } + return dataType; + } + + protected DataType getMapping(String pgType, int precision, int scale) { + switch (pgType) { + case PG_BOOLEAN: + return DataTypes.BOOLEAN(); + case PG_BOOLEAN_ARRAY: + return DataTypes.ARRAY(DataTypes.BOOLEAN()); + case PG_BYTEA: + return DataTypes.BYTES(); + case PG_BYTEA_ARRAY: + return DataTypes.ARRAY(DataTypes.BYTES()); + case PG_SMALLINT: + case PG_SMALLSERIAL: + return DataTypes.SMALLINT(); + case PG_SMALLINT_ARRAY: + return DataTypes.ARRAY(DataTypes.SMALLINT()); + case PG_INTEGER: + case PG_SERIAL: + return DataTypes.INT(); + case PG_INTEGER_ARRAY: + return DataTypes.ARRAY(DataTypes.INT()); + case PG_BIGINT: + case PG_BIGSERIAL: + return DataTypes.BIGINT(); + case PG_BIGINT_ARRAY: + return DataTypes.ARRAY(DataTypes.BIGINT()); + case PG_REAL: + return DataTypes.FLOAT(); + case PG_REAL_ARRAY: + return DataTypes.ARRAY(DataTypes.FLOAT()); + case PG_DOUBLE_PRECISION: + return DataTypes.DOUBLE(); + case PG_DOUBLE_PRECISION_ARRAY: + return DataTypes.ARRAY(DataTypes.DOUBLE()); + case PG_NUMERIC: + // handle numeric without explicit precision and scale. + if (precision > 0) { + return DataTypes.DECIMAL(precision, scale); + } + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18); + case PG_NUMERIC_ARRAY: + // handle numeric without explicit precision and scale. + if (precision > 0) { + return DataTypes.ARRAY(DataTypes.DECIMAL(precision, scale)); + } + return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)); + case PG_CHAR: + case PG_CHARACTER: + return DataTypes.CHAR(precision); + case PG_CHAR_ARRAY: + case PG_CHARACTER_ARRAY: + return DataTypes.ARRAY(DataTypes.CHAR(precision)); + case PG_CHARACTER_VARYING: + return DataTypes.VARCHAR(precision); + case PG_CHARACTER_VARYING_ARRAY: + return DataTypes.ARRAY(DataTypes.VARCHAR(precision)); + case PG_TEXT: + return DataTypes.STRING(); + case PG_TEXT_ARRAY: + return DataTypes.ARRAY(DataTypes.STRING()); + case PG_TIMESTAMP: + return DataTypes.TIMESTAMP(scale); + case PG_TIMESTAMP_ARRAY: + return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale)); + case PG_TIMESTAMPTZ: + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale); + case PG_TIMESTAMPTZ_ARRAY: + return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale)); + case PG_TIME: + return DataTypes.TIME(scale); + case PG_TIME_ARRAY: + return DataTypes.ARRAY(DataTypes.TIME(scale)); + case PG_DATE: + return DataTypes.DATE(); + case PG_DATE_ARRAY: + return DataTypes.ARRAY(DataTypes.DATE()); + default: + return null; + } + } + + protected String getDBType() { + return "Gaussdb"; + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java new file mode 100644 index 000000000..00a2330cf --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.dialect; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Optional; + +/** + * JDBC dialect for GaussDB compatible databases. + * + *

Notes: The source code is based on CompatiblePostgresDialect. + */ +@PublicEvolving +public abstract class CompatibleGaussdbDialect extends GaussdbDialect { + + private static final long serialVersionUID = 1L; + + protected abstract String compatibleDialectName(); + + protected abstract CompatibleGaussdbDialectConverter compatibleRowConverter(RowType rowType); + + protected abstract Optional compatibleDriverName(); + + @Override + public String dialectName() { + return compatibleDialectName(); + } + + @Override + public CompatibleGaussdbDialectConverter getRowConverter(RowType rowType) { + return compatibleRowConverter(rowType); + } + + @Override + public Optional defaultDriverName() { + return compatibleDriverName(); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialectConverter.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialectConverter.java new file mode 100644 index 000000000..45ffa1d75 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialectConverter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.types.logical.RowType; + +/** + * JDBC converter for Gaussdb compatible databases. + * + *

Notes: The source code is based on CompatiblePostgresDialectConverter. + */ +@Internal +public abstract class CompatibleGaussdbDialectConverter extends GaussdbDialectConverter { + + private static final long serialVersionUID = 1L; + + protected CompatibleGaussdbDialectConverter(RowType rowType) { + super(rowType); + } + + protected abstract String compatibleConverterName(); + + @Override + public String converterName() { + return compatibleConverterName(); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java new file mode 100644 index 000000000..3b4c35db5 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialect; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * JDBC dialect for Gaussdb. + * + *

Notes: The source code is based on PostgresDialect. + */ +@Internal +public class GaussdbDialect extends AbstractDialect { + + private static final long serialVersionUID = 1L; + + // Define MAX/MIN precision of TIMESTAMP type according to Gaussdb docs: + + private static final int MAX_TIMESTAMP_PRECISION = 6; + // TODO: see https://bbs.huaweicloud.com/forum/thread-0234179025026914116-1-1.html + private static final int MIN_TIMESTAMP_PRECISION = 0; + + // Define MAX/MIN precision of DECIMAL type according to Gaussdb docs: + + private static final int MAX_DECIMAL_PRECISION = 1000; + private static final int MIN_DECIMAL_PRECISION = 1; + + @Override + public GaussdbDialectConverter getRowConverter(RowType rowType) { + return new GaussdbDialectConverter(rowType); + } + + @Override + public Optional defaultDriverName() { + return Optional.of("com.huawei.gaussdb.jdbc.Driver"); + } + + @Override + public String dialectName() { + return "Gaussdb"; + } + + @Override + public String getLimitClause(long limit) { + return "LIMIT " + limit; + } + + @Override + public String quoteIdentifier(String identifier) { + return identifier; + } + + @Override + public Optional decimalPrecisionRange() { + return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION)); + } + + @Override + public Optional timestampPrecisionRange() { + return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION)); + } + + @Override + public Set supportedTypes() { + return EnumSet.of( + LogicalTypeRoot.CHAR, + LogicalTypeRoot.VARCHAR, + LogicalTypeRoot.BOOLEAN, + LogicalTypeRoot.VARBINARY, + LogicalTypeRoot.DECIMAL, + LogicalTypeRoot.TINYINT, + LogicalTypeRoot.SMALLINT, + LogicalTypeRoot.INTEGER, + LogicalTypeRoot.BIGINT, + LogicalTypeRoot.FLOAT, + LogicalTypeRoot.DOUBLE, + LogicalTypeRoot.DATE, + LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + LogicalTypeRoot.ARRAY); + } + + /** Gaussdb upsert query. It use ON DUPLICATE KEY UPDATE .. to replace into Gaussdb. */ + @Override + public Optional getUpsertStatement( + String tableName, String[] fieldNames, String[] uniqueKeyFields) { + String uniqueColumns = + Arrays.stream(uniqueKeyFields) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + final Set uniqueKeyFieldsSet = new HashSet<>(Arrays.asList(uniqueKeyFields)); + String updateClause = + Arrays.stream(fieldNames) + .filter(f -> !uniqueKeyFieldsSet.contains(f)) + .map(f -> quoteIdentifier(f) + "=values(" + quoteIdentifier(f) + ")") + .collect(Collectors.joining(", ")); + return Optional.of( + this.getInsertIntoStatement(tableName, fieldNames) + + " ON DUPLICATE KEY UPDATE " + + updateClause); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java new file mode 100644 index 000000000..c44b34d68 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; + +import com.huawei.gaussdb.jdbc.util.PGbytea; +import com.huawei.gaussdb.jdbc.util.PGobject; + +import java.lang.reflect.Array; +import java.nio.charset.StandardCharsets; +import java.sql.SQLException; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * GaussDB. + * + *

Notes: The source code is based on PostgresDialectConverter. + */ +@Internal +public class GaussdbDialectConverter extends AbstractDialectConverter { + + private static final long serialVersionUID = 1L; + + protected GaussdbDialectConverter(RowType rowType) { + super(rowType); + } + + @Override + public JdbcDeserializationConverter createInternalConverter(LogicalType type) { + LogicalTypeRoot root = type.getTypeRoot(); + + if (root == LogicalTypeRoot.VARBINARY) { + return val -> { + if (val instanceof PGobject) { + return pgObjectBytes((PGobject) val); + } + return val; + }; + } + if (root == LogicalTypeRoot.ARRAY) { + ArrayType arrayType = (ArrayType) type; + return createGaussDBArrayConverter(arrayType); + } + + return super.createInternalConverter(type); + } + + private Object pgObjectBytes(PGobject val) throws SQLException { + return PGbytea.toBytes(val.getValue().getBytes(StandardCharsets.US_ASCII)); + } + + @Override + protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) { + LogicalTypeRoot root = type.getTypeRoot(); + if (root == LogicalTypeRoot.ARRAY) { + // note: Writing ARRAY type is not yet supported by GaussdbQL dialect now. + return (val, index, statement) -> { + throw new IllegalStateException( + String.format( + "Writing ARRAY type is not yet supported in JDBC:%s.", + converterName())); + }; + } else { + return super.createNullableExternalConverter(type); + } + } + + private JdbcDeserializationConverter createGaussDBArrayConverter(ArrayType arrayType) { + final Class elementClass = + LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); + final JdbcDeserializationConverter elementConverter = + createNullableInternalConverter(arrayType.getElementType()); + return val -> { + java.sql.Array pgArray = (java.sql.Array) val; + Object[] in = (Object[]) pgArray.getArray(); + final Object[] array = (Object[]) Array.newInstance(elementClass, in.length); + for (int i = 0; i < in.length; i++) { + array[i] = elementConverter.deserialize(in[i]); + } + return new GenericArrayData(array); + }; + } + + @Override + public String converterName() { + return "Gaussdb"; + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory b/flink-connector-jdbc-gaussdb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory new file mode 100644 index 000000000..1e14edd1b --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.gaussdb.database.GaussdbFactory \ No newline at end of file diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java new file mode 100644 index 000000000..1355d3cec --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb; + +import org.apache.flink.connector.jdbc.gaussdb.testutils.GaussdbDatabase; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseTest; + +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Base class for Postgres testing. + * + *

Notes: The source code is based on PostgresTestBase. + */ +@ExtendWith(GaussdbDatabase.class) +public interface GaussdbTestBase extends DatabaseTest { + + @Override + default DatabaseMetadata getMetadata() { + return GaussdbDatabase.getMetadata(); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java new file mode 100644 index 000000000..eec9a75fd --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database; + +import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactory; +import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions; +import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase; +import org.apache.flink.connector.jdbc.gaussdb.database.catalog.GaussdbCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test for {@link JdbcCatalogFactory}. + * + *

Notes: The source code is based on PostgresFactoryTest. + */ +class GaussdbFactoryTest implements GaussdbTestBase { + + protected static String baseUrl; + protected static JdbcCatalog catalog; + + protected static final String TEST_CATALOG_NAME = "mygs"; + + @BeforeEach + void setup() { + String jdbcUrl = getMetadata().getJdbcUrl(); + baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); + + catalog = + JdbcFactoryLoader.loadCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + GaussdbCatalog.DEFAULT_DATABASE, + getMetadata().getUsername(), + getMetadata().getPassword(), + baseUrl, + null); + } + + @Test + void test() { + final Map options = new HashMap<>(); + options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER); + options.put( + JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), GaussdbCatalog.DEFAULT_DATABASE); + options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername()); + options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword()); + options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl); + + final Catalog actualCatalog = + FactoryUtil.createCatalog( + TEST_CATALOG_NAME, + options, + null, + Thread.currentThread().getContextClassLoader()); + + checkEquals(catalog, (JdbcCatalog) actualCatalog); + + assertThat((JdbcCatalog) actualCatalog).isInstanceOf(GaussdbCatalog.class); + } + + private static void checkEquals(JdbcCatalog c1, JdbcCatalog c2) { + assertThat(c2).isEqualTo(c1); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java new file mode 100644 index 000000000..b5a540b5c --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogITCase.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.apache.flink.connector.jdbc.gaussdb.database.catalog.GaussdbCatalog.DEFAULT_DATABASE; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * E2E test for {@link GaussdbCatalog}. + * + *

Notes: The source code is based on PostgresCatalogITCase. + */ +class GaussdbCatalogITCase extends GaussdbCatalogTestBase { + + private TableEnvironment tEnv; + + @BeforeEach + void setup() { + this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + + // use PG catalog + tEnv.registerCatalog(TEST_CATALOG_NAME, catalog); + tEnv.useCatalog(TEST_CATALOG_NAME); + } + + @Test + void testSelectField() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select id from %s", TABLE1)) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1]]"); + } + + @Test + void testWithoutSchema() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE1)) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1]]"); + } + + @Test + void testWithSchema() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from `%s`", + GaussdbTablePath.fromFlinkTableName(TABLE1))) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1]]"); + } + + @Test + void testFullPath() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from %s.%s.`%s`", + TEST_CATALOG_NAME, + DEFAULT_DATABASE, + GaussdbTablePath.fromFlinkTableName(TABLE1))) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1]]"); + } + + @Test + void testInsert() throws Exception { + tEnv.executeSql(String.format("insert into %s select * from `%s`", TABLE4, TABLE1)).await(); + + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE4)) + .execute() + .collect()); + assertThat(results).hasToString("[+I[1]]"); + } + + @Test + void testGroupByInsert() throws Exception { + tEnv.executeSql( + String.format( + "insert into `%s` " + + "select `int`, cast('41' as bytes), `short`, max(`long`), max(`real`), " + + "max(`double_precision`), max(`numeric`), max(`decimal`), max(`boolean`), " + + "max(`text`), 'B', 'C', max(`character_varying`), max(`timestamp`), " + + "max(`date`), max(`time`), max(`default_numeric`) " + + "from `%s` group by `int`, `short`", + TABLE_PRIMITIVE_TYPE2, TABLE_PRIMITIVE_TYPE)) + .await(); + + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from `%s`", TABLE_PRIMITIVE_TYPE2)) + .execute() + .collect()); + assertThat(results) + .hasToString( + "[+I[1, [52, 49], 3, 4, 5.5, 6.6, 7.70000, 8.8, true, a, B, C , d, 2016-06-22T19:10:25, 2015-01-01T00:00, 00:51:03, 500.000000000000000000]]"); + } + + @Test + void testPrimitiveTypes() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE)) + .execute() + .collect()); + + assertThat(results) + .hasToString( + "[+I[1, [50], 3, 4, 5.5, 6.6, 7.70000, 8.8, true, a, b, c , d, 2016-06-22T19:10:25, 2015-01-01T00:00, 00:51:03, 500.000000000000000000]]"); + } + + @Test + void testArrayTypes() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE_ARRAY_TYPE)) + .execute() + .collect()); + + assertThat(results) + .hasToString( + "[+I[" + + "[1, 2, 3], " + + "[[50], [51], [52]], " + + "[3, 4, 5], " + + "[4, 5, 6], " + + "[5.5, 6.6, 7.7], " + + "[6.6, 7.7, 8.8], " + + "[7.70000, 8.80000, 9.90000], " + + "[8.800000000000000000, 9.900000000000000000, 10.100000000000000000], " + + "[9.90, 10.10, 11.11], " + + "[true, false, true], " + + "[a, b, c], " + + "[b, c, d], " + + "[b , c , d ], " + + "[b, c, d], " + + "[2016-06-22T19:10:25, 2019-06-22T19:10:25], " + + "[2015-01-01T00:00, 2020-01-01T00:00], " + + "[00:51:03, 00:59:03], " + + "null, " + + "null]]"); + } + + @Test + void testSerialTypes() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery(String.format("select * from %s", TABLE_SERIAL_TYPE)) + .execute() + .collect()); + + assertThat(results) + .hasToString( + "[+I[" + + "32767, " + + "2147483647, " + + "32767, " + + "2147483647, " + + "9223372036854775807, " + + "9223372036854775807]]"); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java new file mode 100644 index 000000000..29a4fbdc8 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Test for {@link GaussdbCatalog}. + * + *

Notes: The source code is based on PostgresCatalogTest. + */ +class GaussdbCatalogTest extends GaussdbCatalogTestBase { + + // ------ databases ------ + + @Test + void testGetDb_DatabaseNotExistException() { + assertThatThrownBy(() -> catalog.getDatabase("nonexistent")) + .isInstanceOf(DatabaseNotExistException.class) + .hasMessageContaining("Database nonexistent does not exist in Catalog"); + } + + @Test + void testListDatabases() { + List actual = catalog.listDatabases(); + + assertThat(actual).containsAll(Arrays.asList("postgres", "test")); + } + + @Test + void testDbExists() { + assertThat(catalog.databaseExists("nonexistent")).isFalse(); + + assertThat(catalog.databaseExists(GaussdbCatalog.DEFAULT_DATABASE)).isTrue(); + } + + // ------ tables ------ + + @Test + void testListTables() throws DatabaseNotExistException { + List actual = catalog.listTables(GaussdbCatalog.DEFAULT_DATABASE); + + assertThat(actual) + .containsAll( + Arrays.asList( + "public.array_table", + "public.primitive_table", + "public.primitive_table2", + "public.serial_table", + "public.t1", + "public.t4", + "public.t5")); + + actual = catalog.listTables(TEST_DB); + + assertThat(actual).containsAll(Arrays.asList("public.t2", "test_schema.t3")); + } + + @Test + void testListTables_DatabaseNotExistException() { + assertThatThrownBy(() -> catalog.listTables("postgres/nonexistschema")) + .isInstanceOf(DatabaseNotExistException.class); + } + + @Test + void testTableExists() { + assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist"))).isFalse(); + + assertThat(catalog.tableExists(new ObjectPath(GaussdbCatalog.DEFAULT_DATABASE, TABLE1))) + .isTrue(); + assertThat(catalog.tableExists(new ObjectPath(TEST_DB, TABLE2))).isTrue(); + assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "test_schema.t3"))).isTrue(); + } + + @Test + void testGetTables_TableNotExistException() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + TEST_DB, + GaussdbTablePath.toFlinkTableName( + TEST_SCHEMA, "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTables_TableNotExistException_NoSchema() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + TEST_DB, + GaussdbTablePath.toFlinkTableName( + "nonexistschema", "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTables_TableNotExistException_NoDb() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + "nonexistdb", + GaussdbTablePath.toFlinkTableName( + TEST_SCHEMA, "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExistException { + // test postgres.public.user1 + Schema schema = getSimpleTable().schema; + + CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + table = catalog.getTable(new ObjectPath("postgres", "public.t1")); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + // test testdb.public.user2 + table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2")); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + // test testdb.testschema.user2 + table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3")); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + } + + @Test + void testPrimitiveDataTypes() throws TableNotExistException { + CatalogBaseTable table = + catalog.getTable( + new ObjectPath(GaussdbCatalog.DEFAULT_DATABASE, TABLE_PRIMITIVE_TYPE)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(getPrimitiveTable().schema); + } + + @Test + void testArrayDataTypes() throws TableNotExistException { + CatalogBaseTable table = + catalog.getTable(new ObjectPath(GaussdbCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(getArrayTable().schema); + } + + @Test + public void testSerialDataTypes() throws TableNotExistException { + CatalogBaseTable table = + catalog.getTable( + new ObjectPath(GaussdbCatalog.DEFAULT_DATABASE, TABLE_SERIAL_TYPE)); + + assertThat(table.getUnresolvedSchema()).isEqualTo(getSerialTable().schema); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java new file mode 100644 index 000000000..631222e60 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalogTestBase.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase; +import org.apache.flink.connector.jdbc.gaussdb.testutils.GaussdbDatabase; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.types.logical.DecimalType; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * Test base for {@link GaussdbCatalog}. + * + *

Notes: The source code is based on PostgresCatalogTestBase. + */ +class GaussdbCatalogTestBase implements JdbcITCaseBase, GaussdbTestBase { + + private static DatabaseMetadata getStaticMetadata() { + return GaussdbDatabase.getMetadata(); + } + + protected static final String TEST_CATALOG_NAME = "mypg"; + protected static final String TEST_USERNAME = getStaticMetadata().getUsername(); + protected static final String TEST_PWD = getStaticMetadata().getPassword(); + protected static final String TEST_DB = "test"; + protected static final String TEST_SCHEMA = "test_schema"; + protected static final String TABLE1 = "t1"; + protected static final String TABLE2 = "t2"; + protected static final String TABLE3 = "t3"; + protected static final String TABLE4 = "t4"; + protected static final String TABLE5 = "t5"; + protected static final String TABLE_PRIMITIVE_TYPE = "primitive_table"; + protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2"; + protected static final String TABLE_ARRAY_TYPE = "array_table"; + protected static final String TABLE_SERIAL_TYPE = "serial_table"; + + protected static String baseUrl; + protected static GaussdbCatalog catalog; + + @BeforeAll + static void init() throws SQLException { + + String jdbcUrl = getStaticMetadata().getJdbcUrl(); + baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); + + catalog = + new GaussdbCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + GaussdbCatalog.DEFAULT_DATABASE, + TEST_USERNAME, + TEST_PWD, + baseUrl); + + // create test database and schema + createDatabase(TEST_DB); + createSchema(TEST_DB, TEST_SCHEMA); + + // create test tables + // table: postgres.public.t1 + // table: postgres.public.t4 + // table: postgres.public.t5 + createTable(GaussdbTablePath.fromFlinkTableName(TABLE1), getSimpleTable().pgSchemaSql); + createTable(GaussdbTablePath.fromFlinkTableName(TABLE4), getSimpleTable().pgSchemaSql); + createTable(GaussdbTablePath.fromFlinkTableName(TABLE5), getSimpleTable().pgSchemaSql); + + // table: test.public.t2 + // table: test.test_schema.t3 + // table: postgres.public.dt + // table: postgres.public.dt2 + createTable( + TEST_DB, GaussdbTablePath.fromFlinkTableName(TABLE2), getSimpleTable().pgSchemaSql); + createTable( + TEST_DB, new GaussdbTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql); + createTable( + GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE), + getPrimitiveTable().pgSchemaSql); + createTable( + GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2), + getPrimitiveTable("test_pk2").pgSchemaSql); + createTable( + GaussdbTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE), getArrayTable().pgSchemaSql); + createTable( + GaussdbTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE), + getSerialTable().pgSchemaSql); + + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "insert into public.%s values (%s);", TABLE1, getSimpleTable().values)); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "insert into %s values (%s);", + TABLE_PRIMITIVE_TYPE, getPrimitiveTable().values)); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "insert into %s values (%s);", TABLE_ARRAY_TYPE, getArrayTable().values)); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "insert into %s values (%s);", TABLE_SERIAL_TYPE, getSerialTable().values)); + } + + @AfterAll + static void afterAll() throws SQLException { + executeSQL(TEST_DB, String.format("DROP SCHEMA %s CASCADE", TEST_SCHEMA)); + executeSQL( + TEST_DB, + String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE2))); + + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE1))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE4))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE5))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "DROP TABLE %s ", + GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "DROP TABLE %s ", + GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE))); + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format( + "DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE))); + + executeSQL(GaussdbCatalog.DEFAULT_DATABASE, "drop database " + TEST_DB + ";"); + } + + public static void createTable(GaussdbTablePath tablePath, String tableSchemaSql) + throws SQLException { + executeSQL( + GaussdbCatalog.DEFAULT_DATABASE, + String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); + } + + public static void createTable(String db, GaussdbTablePath tablePath, String tableSchemaSql) + throws SQLException { + executeSQL( + db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql)); + } + + public static void createSchema(String db, String schema) throws SQLException { + executeSQL(db, String.format("CREATE SCHEMA %s", schema)); + } + + public static void createDatabase(String database) throws SQLException { + executeSQL(GaussdbCatalog.DEFAULT_DATABASE, String.format("CREATE DATABASE %s;", database)); + } + + public static void executeSQL(String sql) throws SQLException { + executeSQL("", sql); + } + + public static void executeSQL(String db, String sql) throws SQLException { + try (Connection conn = + DriverManager.getConnection( + String.format("%s/%s", baseUrl, db), TEST_USERNAME, TEST_PWD); + Statement statement = conn.createStatement()) { + statement.executeUpdate(sql); + } catch (SQLException e) { + throw e; + } + } + + /** Object holding schema and corresponding sql. */ + public static class TestTable { + Schema schema; + String pgSchemaSql; + String values; + + public TestTable(Schema schema, String pgSchemaSql, String values) { + this.schema = schema; + this.pgSchemaSql = pgSchemaSql; + this.values = values; + } + } + + public static TestTable getSimpleTable() { + return new TestTable( + Schema.newBuilder().column("id", DataTypes.INT()).build(), "id integer", "1"); + } + + // posgres doesn't support to use the same primary key name across different tables, + // make the table parameterized to resolve this problem. + public static TestTable getPrimitiveTable() { + return getPrimitiveTable("test_pk"); + } + + // TODO: add back timestamptz and time types. + // Flink currently doesn't support converting time's precision, with the following error + // TableException: Unsupported conversion from data type 'TIME(6)' (conversion class: + // java.sql.Time) + // to type information. Only data types that originated from type information fully support a + // reverse conversion. + public static TestTable getPrimitiveTable(String primaryKeyName) { + return new TestTable( + Schema.newBuilder() + .column("int", DataTypes.INT().notNull()) + .column("bytea", DataTypes.BYTES()) + .column("short", DataTypes.SMALLINT().notNull()) + .column("long", DataTypes.BIGINT()) + .column("real", DataTypes.FLOAT()) + .column("double_precision", DataTypes.DOUBLE()) + .column("numeric", DataTypes.DECIMAL(10, 5)) + .column("decimal", DataTypes.DECIMAL(10, 1)) + .column("boolean", DataTypes.BOOLEAN()) + .column("text", DataTypes.STRING()) + .column("char", DataTypes.CHAR(1)) + .column("character", DataTypes.CHAR(3)) + .column("character_varying", DataTypes.VARCHAR(20)) + .column("timestamp", DataTypes.TIMESTAMP(5)) + // .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)) + .column("date", DataTypes.TIMESTAMP(0)) + .column("time", DataTypes.TIME(0)) + .column("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)) + .primaryKeyNamed(primaryKeyName, "short", "int") + .build(), + "int integer, " + + "bytea bytea, " + + "short smallint, " + + "long bigint, " + + "real real, " + + "double_precision double precision, " + + "numeric numeric(10, 5), " + + "decimal decimal(10, 1), " + + "boolean boolean, " + + "text text, " + + "char char, " + + "character character(3), " + + "character_varying character varying(20), " + + "timestamp timestamp(5), " + + + // "timestamptz timestamptz(4), " + + "date timestamp(0)," + + "time time(0), " + + "default_numeric numeric, " + + "CONSTRAINT " + + primaryKeyName + + " PRIMARY KEY (short, int)", + "1," + + "'2'," + + "3," + + "4," + + "5.5," + + "6.6," + + "7.7," + + "8.8," + + "true," + + "'a'," + + "'b'," + + "'c'," + + "'d'," + + "'2016-06-22 19:10:25'," + + + // "'2006-06-22 19:10:25'," + + "'2015-01-01'," + + "'00:51:02.746572', " + + "500"); + } + + // TODO: add back timestamptz once planner supports timestamp with timezone + public static TestTable getArrayTable() { + return new TestTable( + Schema.newBuilder() + .column("int_arr", DataTypes.ARRAY(DataTypes.INT())) + .column("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) + .column("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT())) + .column("long_arr", DataTypes.ARRAY(DataTypes.BIGINT())) + .column("real_arr", DataTypes.ARRAY(DataTypes.FLOAT())) + .column("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE())) + .column("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5))) + .column( + "numeric_arr_default", + DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))) + .column("decimal_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2))) + .column("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN())) + .column("text_arr", DataTypes.ARRAY(DataTypes.STRING())) + .column("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1))) + .column("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3))) + .column("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20))) + .column("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5))) + // .field("timestamptz_arr", + // DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))) + .column("date_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(0))) + .column("time_arr", DataTypes.ARRAY(DataTypes.TIME(0))) + .column("null_bytea_arr", DataTypes.ARRAY(DataTypes.BYTES())) + .column("null_text_arr", DataTypes.ARRAY(DataTypes.STRING())) + .build(), + "int_arr integer[], " + + "bytea_arr bytea[], " + + "short_arr smallint[], " + + "long_arr bigint[], " + + "real_arr real[], " + + "double_precision_arr double precision[], " + + "numeric_arr numeric(10, 5)[], " + + "numeric_arr_default numeric[], " + + "decimal_arr decimal(10,2)[], " + + "boolean_arr boolean[], " + + "text_arr text[], " + + "char_arr char[], " + + "character_arr character(3)[], " + + "character_varying_arr character varying(20)[], " + + "timestamp_arr timestamp(5)[], " + + + // "timestamptz_arr timestamptz(4)[], " + + "date_arr timestamp(0)[], " + + "time_arr time(0)[], " + + "null_bytea_arr bytea[], " + + "null_text_arr text[]", + String.format( + "'{1,2,3}'," + + "'{2,3,4}'," + + "'{3,4,5}'," + + "'{4,5,6}'," + + "'{5.5,6.6,7.7}'," + + "'{6.6,7.7,8.8}'," + + "'{7.7,8.8,9.9}'," + + "'{8.8,9.9,10.10}'," + + "'{9.9,10.10,11.11}'," + + "'{true,false,true}'," + + "'{a,b,c}'," + + "'{b,c,d}'," + + "'{b,c,d}'," + + "'{b,c,d}'," + + "'{\"2016-06-22 19:10:25\", \"2019-06-22 19:10:25\"}'," + + + // "'{\"2006-06-22 19:10:25\", \"2009-06-22 19:10:25\"}'," + + "'{\"2015-01-01\", \"2020-01-01\"}'," + + "'{\"00:51:02.746572\", \"00:59:02.746572\"}'," + + "NULL," + + "NULL")); + } + + public static TestTable getSerialTable() { + return new TestTable( + Schema.newBuilder() + // serial fields are returned as not null by ResultSetMetaData.columnNoNulls + .column("f0", DataTypes.SMALLINT().notNull()) + .column("f1", DataTypes.INT().notNull()) + .column("f2", DataTypes.SMALLINT().notNull()) + .column("f3", DataTypes.INT().notNull()) + .column("f4", DataTypes.BIGINT().notNull()) + .column("f5", DataTypes.BIGINT().notNull()) + .build(), + "f0 smallserial, " + + "f1 serial, " + + "f2 serial2, " + + "f3 serial4, " + + "f4 serial8, " + + "f5 bigserial", + "32767," + + "2147483647," + + "32767," + + "2147483647," + + "9223372036854775807," + + "9223372036854775807"); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java new file mode 100644 index 000000000..cbdcd360f --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Test for {@link GaussdbTablePath}. + * + *

Notes: The source code is based on PostgresTablePathTest. + */ +class GaussdbTablePathTest { + @Test + void testToFlinkTableName() { + assertThat(GaussdbTablePath.toFlinkTableName("my_schema", "my_table")) + .isEqualTo("my_schema.my_table"); + assertThat(GaussdbTablePath.toFlinkTableName("postgres.my_schema", "my_table")) + .isEqualTo("postgres.my_schema.my_table"); + assertThatThrownBy(() -> GaussdbTablePath.toFlinkTableName("", "my_table")) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage("Schema name is not valid. Null or empty is not allowed"); + } + + @Test + void testFromFlinkTableName() { + assertThat(GaussdbTablePath.fromFlinkTableName("my_schema.my_table")) + .isEqualTo(new GaussdbTablePath("my_schema", "my_table")); + assertThat(GaussdbTablePath.fromFlinkTableName("my_table")) + .isEqualTo(new GaussdbTablePath("public", "my_table")); + assertThatThrownBy(() -> GaussdbTablePath.fromFlinkTableName("postgres.public.my_table")) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Table name 'postgres.public.my_table' is not valid. The parsed length is 3"); + assertThatThrownBy(() -> GaussdbTablePath.fromFlinkTableName("")) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage("Table name is not valid. Null or empty is not allowed"); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java new file mode 100644 index 000000000..0bac34a5d --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.dialect; + +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectTest; +import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * The PostgresSql params for {@link JdbcDialectTest}. + * + *

Notes: The source code is based on PostgresDialectTest. + */ +class GaussdbDialectTest extends JdbcDialectTest implements GaussdbTestBase { + + @Override + protected List testData() { + return Arrays.asList( + createTestItem("CHAR"), + createTestItem("VARCHAR"), + createTestItem("BOOLEAN"), + createTestItem("TINYINT"), + createTestItem("SMALLINT"), + createTestItem("INTEGER"), + createTestItem("BIGINT"), + createTestItem("FLOAT"), + createTestItem("DOUBLE"), + createTestItem("DECIMAL(10, 4)"), + createTestItem("DECIMAL(38, 18)"), + createTestItem("DATE"), + createTestItem("TIME"), + createTestItem("TIMESTAMP(3)"), + createTestItem("TIMESTAMP WITHOUT TIME ZONE"), + createTestItem("VARBINARY"), + createTestItem("ARRAY"), + + // Not valid data + createTestItem("BINARY", "The Gaussdb dialect doesn't support type: BINARY(1)."), + createTestItem( + "VARBINARY(10)", + "The Gaussdb dialect doesn't support type: VARBINARY(10)."), + createTestItem( + "TIMESTAMP(9) WITHOUT TIME ZONE", + "The precision of field 'f0' is out of the TIMESTAMP precision range [0, 6] supported by Gaussdb dialect."), + createTestItem("TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)")); + } + + @Test + void testUpsertStatement() { + GaussdbDialect dialect = new GaussdbDialect(); + final String tableName = "tbl"; + final String[] fieldNames = { + "id", "name", "email", "ts", "field1", "field_2", "__field_3__" + }; + final String[] doUpdatekeyFields = {"id", "__field_3__"}; + final String[] doNothingkeyFields = { + "id", "name", "email", "ts", "field1", "field_2", "__field_3__" + }; + + assertThat(dialect.getUpsertStatement(tableName, fieldNames, doUpdatekeyFields).get()) + .isEqualTo( + "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__) ON DUPLICATE KEY UPDATE name=values(name), email=values(email), ts=values(ts), field1=values(field1), field_2=values(field_2)"); + assertThat(dialect.getUpsertStatement(tableName, fieldNames, doNothingkeyFields).get()) + .isEqualTo( + "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__) ON DUPLICATE KEY UPDATE "); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java new file mode 100644 index 000000000..a1ebf9ccb --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSinkITCase.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.table; + +import org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase; +import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase; +import org.apache.flink.connector.jdbc.gaussdb.database.dialect.GaussdbDialect; + +/** + * The Table Sink ITCase for {@link GaussdbDialect}. + * + *

Notes: The source code is based on PostgresDynamicTableSinkITCase. + */ +class GaussdbDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase implements GaussdbTestBase {} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java new file mode 100644 index 000000000..e14285b0f --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.table; + +import org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase; +import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase; +import org.apache.flink.connector.jdbc.gaussdb.database.dialect.GaussdbDialect; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow; + +/** + * The Table Source ITCase for {@link GaussdbDialect}. + * + *

Notes: The source code is based on PostgresDynamicTableSourceITCase. + */ +class GaussdbDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase + implements GaussdbTestBase { + + @Override + protected TableRow createInputTable() { + return tableRow( + "jdbDynamicTableSource", + field("id", DataTypes.BIGINT().notNull()), + field("decimal_col", DataTypes.DECIMAL(10, 4)), + field("timestamp6_col", DataTypes.TIMESTAMP(6)), + // other fields + field("real_col", dbType("REAL"), DataTypes.FLOAT()), + field("double_col", dbType("DOUBLE PRECISION"), DataTypes.DOUBLE()), + field("time_col", dbType("TIME"), DataTypes.TIME())); + } + + protected List getTestData() { + return Arrays.asList( + Row.of( + 1L, + BigDecimal.valueOf(100.1234), + LocalDateTime.parse("2020-01-01T15:35:00.123456"), + 1.175E-37F, + 1.79769E308D, + LocalTime.parse("15:35")), + Row.of( + 2L, + BigDecimal.valueOf(101.1234), + LocalDateTime.parse("2020-01-01T15:36:01.123456"), + -1.175E-37F, + -1.79769E308, + LocalTime.parse("15:36:01"))); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java new file mode 100644 index 000000000..94c877b7e --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussDBContainer.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.testutils; + +import org.jetbrains.annotations.NotNull; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitStrategy; +import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Testcontainers implementation for GaussDB. Supported images: {@code + * opengauss/opengauss:7.0.0-RC1.B023}, {@code pgvector/pgvector} Exposed ports: 8000 + * + *

Notes: The source code is based on PostgresContainer. + */ +public class GaussDBContainer> + extends JdbcDatabaseContainer { + + public static final String NAME = "gaussdb"; + + public static final String IMAGE = "opengauss/opengauss:7.0.0-RC1.B023"; + + public static final String DEFAULT_TAG = "7.0.0-RC1.B023"; + + private static final DockerImageName DEFAULT_IMAGE_NAME = + DockerImageName.parse("opengauss/opengauss:7.0.0-RC1.B023") + .asCompatibleSubstituteFor("gaussdb"); + + public static final Integer GAUSSDB_PORT = 8000; + + public static final String DEFAULT_USER_NAME = "flink_jdbc_test"; + + public static final String DEFAULT_PASSWORD = "Flink_jdbc_test@123"; + + private String databaseName = "postgres"; + + private String username = DEFAULT_USER_NAME; + + private String password = DEFAULT_PASSWORD; + + /** + * @deprecated use {@link #GaussDBContainer(DockerImageName)} or {@link + * #GaussDBContainer(String)} instead + */ + @Deprecated + public GaussDBContainer() { + this(DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG)); + } + + public GaussDBContainer(final String dockerImageName) { + this(DockerImageName.parse(dockerImageName)); + } + + public GaussDBContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + setWaitStrategy( + new WaitStrategy() { + @Override + public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) { + Wait.forListeningPort().waitUntilReady(waitStrategyTarget); + try { + // Open Gauss will set up users and password when ports are ready. + Wait.forLogMessage(".*gs_ctl stopped.*", 1) + .waitUntilReady(waitStrategyTarget); + // Not enough and no idea + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public WaitStrategy withStartupTimeout(Duration duration) { + return waitStrategy.withStartupTimeout(duration); + } + }); + addExposedPort(GAUSSDB_PORT); + } + + /** + * @return the ports on which to check if the container is ready + * @deprecated use {@link #getLivenessCheckPortNumbers()} instead + */ + @NotNull + @Override + @Deprecated + protected Set getLivenessCheckPorts() { + return super.getLivenessCheckPorts(); + } + + @Override + protected void configure() { + // Disable Postgres driver use of java.util.logging to reduce noise at startup time + withUrlParam("loggerLevel", "OFF"); + withDatabaseName(databaseName); + addEnv("GS_PORT", String.valueOf(GAUSSDB_PORT)); + addEnv("GS_USERNAME", username); + addEnv("GS_PASSWORD", password); + } + + @Override + public String getDriverClassName() { + return "com.huawei.gaussdb.jdbc.Driver"; + } + + @Override + public String getJdbcUrl() { + String additionalUrlParams = constructUrlParameters("?", "&"); + return ("jdbc:gaussdb://" + + getHost() + + ":" + + getMappedPort(GAUSSDB_PORT) + + "/" + + databaseName + + additionalUrlParams); + } + + @Override + public String getDatabaseName() { + return databaseName; + } + + @Override + public String getUsername() { + return username; + } + + @Override + public String getPassword() { + return password; + } + + @Override + public String getTestQueryString() { + return "SELECT 1"; + } + + @Override + public SELF withDatabaseName(final String databaseName) { + this.databaseName = databaseName; + return self(); + } + + @Override + public SELF withUsername(final String username) { + this.username = username; + return self(); + } + + @Override + public SELF withPassword(final String password) { + this.password = password; + return self(); + } + + @Override + protected void waitUntilContainerStarted() { + getWaitStrategy().waitUntilReady(this); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java new file mode 100644 index 000000000..dca1b677c --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbDatabase.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.testutils; + +import org.apache.flink.connector.jdbc.testutils.DatabaseExtension; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseResource; +import org.apache.flink.connector.jdbc.testutils.resources.DockerResource; +import org.apache.flink.util.FlinkRuntimeException; + +import org.testcontainers.utility.DockerImageName; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A Gaussdb database for testing. + * + *

Notes: The source code is based on PostgresDatabase. + */ +public class GaussdbDatabase extends DatabaseExtension implements GaussdbImages { + + private static final GaussDBContainer CONTAINER = + new GaussdbXaContainer(IMAGE).withMaxConnections(10).withMaxTransactions(50); + + private static final DockerResource RESOURCE = new DockerResource(CONTAINER); + + private static GaussdbMetadata metadata; + + public static GaussdbMetadata getMetadata() { + if (!CONTAINER.isRunning()) { + throw new FlinkRuntimeException("Container is stopped."); + } + if (metadata == null) { + metadata = new GaussdbMetadata(CONTAINER, true); + } + return metadata; + } + + protected DatabaseMetadata getMetadataDB() { + return getMetadata(); + } + + @Override + protected DatabaseResource getResource() { + return RESOURCE; + } + + /** {@link GaussDBContainer} with XA enabled (by setting max_prepared_transactions). */ + public static class GaussdbXaContainer extends GaussDBContainer { + private static final int SUPERUSER_RESERVED_CONNECTIONS = 1; + private volatile boolean started = false; + + public GaussdbXaContainer(String dockerImageName) { + super(DockerImageName.parse(dockerImageName)); + } + + public GaussdbXaContainer withMaxConnections(int maxConnections) { + checkArgument( + maxConnections > SUPERUSER_RESERVED_CONNECTIONS, + "maxConnections should be greater than superuser_reserved_connections"); + return this.self(); + } + + public GaussdbXaContainer withMaxTransactions(int maxTransactions) { + checkArgument(maxTransactions > 1, "maxTransactions should be greater 1"); + return this.self(); + } + + @Override + public void start() { + synchronized (this) { + if (!started) { + super.start(); + started = true; + } + } + } + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java new file mode 100644 index 000000000..e79eaa915 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbImages.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.testutils; + +/** + * Gaussdb docker images. + * + *

Notes: The source code is based on PostgresImages. + */ +public interface GaussdbImages { + String IMAGE = "opengauss/opengauss:7.0.0-RC1.B023"; +} diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java new file mode 100644 index 000000000..f7a1eec0e --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.testutils; + +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; + +import com.huawei.gaussdb.jdbc.util.PSQLException; +import com.huawei.gaussdb.jdbc.xa.PGXADataSource; +import org.testcontainers.containers.JdbcDatabaseContainer; + +import javax.sql.XADataSource; + +/** + * Gaussdb Metadata. + * + *

Notes: The source code is based on PostgresMetadata. + */ +public class GaussdbMetadata implements DatabaseMetadata { + + private final String username; + private final String password; + private final String url; + private final String driver; + private final String version; + private final boolean xaEnabled; + + public GaussdbMetadata(GaussDBContainer container) { + this(container, false); + } + + public GaussdbMetadata(JdbcDatabaseContainer container, boolean hasXaEnabled) { + this.username = container.getUsername(); + this.password = container.getPassword(); + this.url = container.getJdbcUrl(); + this.driver = container.getDriverClassName(); + this.version = container.getDockerImageName(); + this.xaEnabled = hasXaEnabled; + } + + @Override + public String getJdbcUrl() { + return this.url; + } + + @Override + public String getJdbcUrlWithCredentials() { + return String.format("%s&user=%s&password=%s", getJdbcUrl(), getUsername(), getPassword()); + } + + @Override + public String getUsername() { + return this.username; + } + + @Override + public String getPassword() { + return this.password; + } + + @Override + public XADataSource buildXaDataSource() { + if (!xaEnabled) { + throw new UnsupportedOperationException(); + } + + PGXADataSource xaDataSource = new PGXADataSource(); + try { + xaDataSource.setUrl(getJdbcUrl()); + } catch (PSQLException e) { + throw new RuntimeException(e); + } + xaDataSource.setUser(getUsername()); + xaDataSource.setPassword(getPassword()); + return xaDataSource; + } + + @Override + public String getDriverClass() { + return this.driver; + } + + @Override + public String getVersion() { + return version; + } +} diff --git a/flink-connector-jdbc-gaussdb/src/test/resources/log4j2-test.properties b/flink-connector-jdbc-gaussdb/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000..835c2ec9a --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/pom.xml b/pom.xml index fe2fc778b..a603a665c 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ under the License. flink-connector-jdbc-postgres flink-connector-jdbc-sqlserver flink-connector-jdbc-trino + flink-connector-jdbc-gaussdb