Notes: The source code is based on PostgresFactory. + */ +@Internal +public class GaussdbFactory implements JdbcFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:gaussdb:"); + } + + @Override + public JdbcDialect createDialect() { + return new GaussdbDialect(); + } + + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl) { + return new GaussdbCatalog( + classLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + } +} diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java new file mode 100644 index 000000000..f04920a32 --- /dev/null +++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbCatalog.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.gaussdb.database.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.jdbc.core.database.catalog.AbstractJdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.compress.utils.Lists; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties; + +/** + * Catalog for GaussDB. + * + *
Notes: The source code is based on PostgresCatalog.
+ */
+@Internal
+public class GaussdbCatalog extends AbstractJdbcCatalog {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GaussdbCatalog.class);
+
+ public static final String DEFAULT_DATABASE = "postgres";
+
+ // ------ GaussDB default objects that shouldn't be exposed to users ------
+
+ private static final Set Notes: The source code is based on PostgresTablePath.
+ */
+@Internal
+public class GaussdbTablePath {
+
+ private static final String DEFAULT_POSTGRES_SCHEMA_NAME = "public";
+
+ private final String pgSchemaName;
+ private final String pgTableName;
+
+ public GaussdbTablePath(String pgSchemaName, String pgTableName) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(pgSchemaName),
+ "Schema name is not valid. Null or empty is not allowed");
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(pgTableName),
+ "Table name is not valid. Null or empty is not allowed");
+
+ this.pgSchemaName = pgSchemaName;
+ this.pgTableName = pgTableName;
+ }
+
+ public static GaussdbTablePath fromFlinkTableName(String flinkTableName) {
+ if (flinkTableName.contains(".")) {
+ String[] path = flinkTableName.split("\\.");
+
+ checkArgument(
+ path != null && path.length == 2,
+ String.format(
+ "Table name '%s' is not valid. The parsed length is %d",
+ flinkTableName, path.length));
+
+ return new GaussdbTablePath(path[0], path[1]);
+ } else {
+ return new GaussdbTablePath(getDefaultSchemaName(), flinkTableName);
+ }
+ }
+
+ public static String toFlinkTableName(String schema, String table) {
+ return new GaussdbTablePath(schema, table).getFullPath();
+ }
+
+ public String getFullPath() {
+ return String.format("%s.%s", pgSchemaName, pgTableName);
+ }
+
+ public String getPgTableName() {
+ return pgTableName;
+ }
+
+ public String getPgSchemaName() {
+ return pgSchemaName;
+ }
+
+ protected static String getDefaultSchemaName() {
+ return DEFAULT_POSTGRES_SCHEMA_NAME;
+ }
+
+ @Override
+ public String toString() {
+ return getFullPath();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ GaussdbTablePath that = (GaussdbTablePath) o;
+ return Objects.equals(pgSchemaName, that.pgSchemaName)
+ && Objects.equals(pgTableName, that.pgTableName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pgSchemaName, pgTableName);
+ }
+}
diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java
new file mode 100644
index 000000000..3c1267843
--- /dev/null
+++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTypeMapper.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.gaussdb.database.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+/**
+ * GaussdbTypeMapper util class.
+ *
+ * Notes: The source code is based on PostgresTypeMapper.
+ */
+@Internal
+public class GaussdbTypeMapper implements JdbcCatalogTypeMapper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GaussdbTypeMapper.class);
+
+ // GaussDB JDBC driver maps several aliases to real types. We use the real types rather than
+ // serial2 <=> int2
+ // smallserial <=> int2
+ // serial4 <=> serial
+ // serial8 <=> bigserial
+ // smallint <=> int2
+ // integer <=> int4
+ // int <=> int4
+ // bigint <=> int8
+ // float <=> float8
+ // boolean <=> bool
+ // decimal <=> numeric
+ private static final String PG_SMALLSERIAL = "smallserial";
+ protected static final String PG_SERIAL = "serial";
+ protected static final String PG_BIGSERIAL = "bigserial";
+ private static final String PG_BYTEA = "bytea";
+ private static final String PG_BYTEA_ARRAY = "_bytea";
+ private static final String PG_SMALLINT = "int2";
+ private static final String PG_SMALLINT_ARRAY = "_int2";
+ private static final String PG_INTEGER = "int4";
+ private static final String PG_INTEGER_ARRAY = "_int4";
+ private static final String PG_BIGINT = "int8";
+ private static final String PG_BIGINT_ARRAY = "_int8";
+ private static final String PG_REAL = "float4";
+ private static final String PG_REAL_ARRAY = "_float4";
+ private static final String PG_DOUBLE_PRECISION = "float8";
+ private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
+ private static final String PG_NUMERIC = "numeric";
+ private static final String PG_NUMERIC_ARRAY = "_numeric";
+ private static final String PG_BOOLEAN = "bool";
+ private static final String PG_BOOLEAN_ARRAY = "_bool";
+ private static final String PG_TIMESTAMP = "timestamp";
+ private static final String PG_TIMESTAMP_ARRAY = "_timestamp";
+ private static final String PG_TIMESTAMPTZ = "timestamptz";
+ private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
+ private static final String PG_DATE = "date";
+ private static final String PG_DATE_ARRAY = "_date";
+ private static final String PG_TIME = "time";
+ private static final String PG_TIME_ARRAY = "_time";
+ private static final String PG_TEXT = "text";
+ private static final String PG_TEXT_ARRAY = "_text";
+ private static final String PG_CHAR = "bpchar";
+ private static final String PG_CHAR_ARRAY = "_bpchar";
+ private static final String PG_CHARACTER = "character";
+ private static final String PG_CHARACTER_ARRAY = "_character";
+ private static final String PG_CHARACTER_VARYING = "varchar";
+ private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+
+ @Override
+ public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
+ throws SQLException {
+ String pgType = metadata.getColumnTypeName(colIndex);
+
+ int precision = metadata.getPrecision(colIndex);
+ int scale = metadata.getScale(colIndex);
+
+ DataType dataType = getMapping(pgType, precision, scale);
+ if (dataType == null) {
+ throw new UnsupportedOperationException(
+ String.format("Doesn't support %s type '%s' yet", getDBType(), pgType));
+ }
+ return dataType;
+ }
+
+ protected DataType getMapping(String pgType, int precision, int scale) {
+ switch (pgType) {
+ case PG_BOOLEAN:
+ return DataTypes.BOOLEAN();
+ case PG_BOOLEAN_ARRAY:
+ return DataTypes.ARRAY(DataTypes.BOOLEAN());
+ case PG_BYTEA:
+ return DataTypes.BYTES();
+ case PG_BYTEA_ARRAY:
+ return DataTypes.ARRAY(DataTypes.BYTES());
+ case PG_SMALLINT:
+ case PG_SMALLSERIAL:
+ return DataTypes.SMALLINT();
+ case PG_SMALLINT_ARRAY:
+ return DataTypes.ARRAY(DataTypes.SMALLINT());
+ case PG_INTEGER:
+ case PG_SERIAL:
+ return DataTypes.INT();
+ case PG_INTEGER_ARRAY:
+ return DataTypes.ARRAY(DataTypes.INT());
+ case PG_BIGINT:
+ case PG_BIGSERIAL:
+ return DataTypes.BIGINT();
+ case PG_BIGINT_ARRAY:
+ return DataTypes.ARRAY(DataTypes.BIGINT());
+ case PG_REAL:
+ return DataTypes.FLOAT();
+ case PG_REAL_ARRAY:
+ return DataTypes.ARRAY(DataTypes.FLOAT());
+ case PG_DOUBLE_PRECISION:
+ return DataTypes.DOUBLE();
+ case PG_DOUBLE_PRECISION_ARRAY:
+ return DataTypes.ARRAY(DataTypes.DOUBLE());
+ case PG_NUMERIC:
+ // handle numeric without explicit precision and scale.
+ if (precision > 0) {
+ return DataTypes.DECIMAL(precision, scale);
+ }
+ return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
+ case PG_NUMERIC_ARRAY:
+ // handle numeric without explicit precision and scale.
+ if (precision > 0) {
+ return DataTypes.ARRAY(DataTypes.DECIMAL(precision, scale));
+ }
+ return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18));
+ case PG_CHAR:
+ case PG_CHARACTER:
+ return DataTypes.CHAR(precision);
+ case PG_CHAR_ARRAY:
+ case PG_CHARACTER_ARRAY:
+ return DataTypes.ARRAY(DataTypes.CHAR(precision));
+ case PG_CHARACTER_VARYING:
+ return DataTypes.VARCHAR(precision);
+ case PG_CHARACTER_VARYING_ARRAY:
+ return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
+ case PG_TEXT:
+ return DataTypes.STRING();
+ case PG_TEXT_ARRAY:
+ return DataTypes.ARRAY(DataTypes.STRING());
+ case PG_TIMESTAMP:
+ return DataTypes.TIMESTAMP(scale);
+ case PG_TIMESTAMP_ARRAY:
+ return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale));
+ case PG_TIMESTAMPTZ:
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale);
+ case PG_TIMESTAMPTZ_ARRAY:
+ return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(scale));
+ case PG_TIME:
+ return DataTypes.TIME(scale);
+ case PG_TIME_ARRAY:
+ return DataTypes.ARRAY(DataTypes.TIME(scale));
+ case PG_DATE:
+ return DataTypes.DATE();
+ case PG_DATE_ARRAY:
+ return DataTypes.ARRAY(DataTypes.DATE());
+ default:
+ return null;
+ }
+ }
+
+ protected String getDBType() {
+ return "Gaussdb";
+ }
+}
diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java
new file mode 100644
index 000000000..00a2330cf
--- /dev/null
+++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/CompatibleGaussdbDialect.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.gaussdb.database.dialect;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Optional;
+
+/**
+ * JDBC dialect for GaussDB compatible databases.
+ *
+ * Notes: The source code is based on CompatiblePostgresDialect.
+ */
+@PublicEvolving
+public abstract class CompatibleGaussdbDialect extends GaussdbDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ protected abstract String compatibleDialectName();
+
+ protected abstract CompatibleGaussdbDialectConverter compatibleRowConverter(RowType rowType);
+
+ protected abstract Optional Notes: The source code is based on CompatiblePostgresDialectConverter.
+ */
+@Internal
+public abstract class CompatibleGaussdbDialectConverter extends GaussdbDialectConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ protected CompatibleGaussdbDialectConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ protected abstract String compatibleConverterName();
+
+ @Override
+ public String converterName() {
+ return compatibleConverterName();
+ }
+}
diff --git a/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java
new file mode 100644
index 000000000..3b4c35db5
--- /dev/null
+++ b/flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialect.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.gaussdb.database.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialect;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * JDBC dialect for Gaussdb.
+ *
+ * Notes: The source code is based on PostgresDialect.
+ */
+@Internal
+public class GaussdbDialect extends AbstractDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ // Define MAX/MIN precision of TIMESTAMP type according to Gaussdb docs:
+
+ private static final int MAX_TIMESTAMP_PRECISION = 6;
+ // TODO: see https://bbs.huaweicloud.com/forum/thread-0234179025026914116-1-1.html
+ private static final int MIN_TIMESTAMP_PRECISION = 0;
+
+ // Define MAX/MIN precision of DECIMAL type according to Gaussdb docs:
+
+ private static final int MAX_DECIMAL_PRECISION = 1000;
+ private static final int MIN_DECIMAL_PRECISION = 1;
+
+ @Override
+ public GaussdbDialectConverter getRowConverter(RowType rowType) {
+ return new GaussdbDialectConverter(rowType);
+ }
+
+ @Override
+ public Optional Notes: The source code is based on PostgresDialectConverter.
+ */
+@Internal
+public class GaussdbDialectConverter extends AbstractDialectConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ protected GaussdbDialectConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ LogicalTypeRoot root = type.getTypeRoot();
+
+ if (root == LogicalTypeRoot.VARBINARY) {
+ return val -> {
+ if (val instanceof PGobject) {
+ return pgObjectBytes((PGobject) val);
+ }
+ return val;
+ };
+ }
+ if (root == LogicalTypeRoot.ARRAY) {
+ ArrayType arrayType = (ArrayType) type;
+ return createGaussDBArrayConverter(arrayType);
+ }
+
+ return super.createInternalConverter(type);
+ }
+
+ private Object pgObjectBytes(PGobject val) throws SQLException {
+ return PGbytea.toBytes(val.getValue().getBytes(StandardCharsets.US_ASCII));
+ }
+
+ @Override
+ protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) {
+ LogicalTypeRoot root = type.getTypeRoot();
+ if (root == LogicalTypeRoot.ARRAY) {
+ // note: Writing ARRAY type is not yet supported by GaussdbQL dialect now.
+ return (val, index, statement) -> {
+ throw new IllegalStateException(
+ String.format(
+ "Writing ARRAY type is not yet supported in JDBC:%s.",
+ converterName()));
+ };
+ } else {
+ return super.createNullableExternalConverter(type);
+ }
+ }
+
+ private JdbcDeserializationConverter createGaussDBArrayConverter(ArrayType arrayType) {
+ final Class> elementClass =
+ LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+ final JdbcDeserializationConverter elementConverter =
+ createNullableInternalConverter(arrayType.getElementType());
+ return val -> {
+ java.sql.Array pgArray = (java.sql.Array) val;
+ Object[] in = (Object[]) pgArray.getArray();
+ final Object[] array = (Object[]) Array.newInstance(elementClass, in.length);
+ for (int i = 0; i < in.length; i++) {
+ array[i] = elementConverter.deserialize(in[i]);
+ }
+ return new GenericArrayData(array);
+ };
+ }
+
+ @Override
+ public String converterName() {
+ return "Gaussdb";
+ }
+}
diff --git a/flink-connector-jdbc-gaussdb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory b/flink-connector-jdbc-gaussdb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory
new file mode 100644
index 000000000..1e14edd1b
--- /dev/null
+++ b/flink-connector-jdbc-gaussdb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.gaussdb.database.GaussdbFactory
\ No newline at end of file
diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java
new file mode 100644
index 000000000..1355d3cec
--- /dev/null
+++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/GaussdbTestBase.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.gaussdb;
+
+import org.apache.flink.connector.jdbc.gaussdb.testutils.GaussdbDatabase;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Base class for Postgres testing.
+ *
+ * Notes: The source code is based on PostgresTestBase.
+ */
+@ExtendWith(GaussdbDatabase.class)
+public interface GaussdbTestBase extends DatabaseTest {
+
+ @Override
+ default DatabaseMetadata getMetadata() {
+ return GaussdbDatabase.getMetadata();
+ }
+}
diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java
new file mode 100644
index 000000000..eec9a75fd
--- /dev/null
+++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/GaussdbFactoryTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.gaussdb.database;
+
+import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactory;
+import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
+import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase;
+import org.apache.flink.connector.jdbc.gaussdb.database.catalog.GaussdbCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link JdbcCatalogFactory}.
+ *
+ * Notes: The source code is based on PostgresFactoryTest.
+ */
+class GaussdbFactoryTest implements GaussdbTestBase {
+
+ protected static String baseUrl;
+ protected static JdbcCatalog catalog;
+
+ protected static final String TEST_CATALOG_NAME = "mygs";
+
+ @BeforeEach
+ void setup() {
+ String jdbcUrl = getMetadata().getJdbcUrl();
+ baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+ catalog =
+ JdbcFactoryLoader.loadCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ GaussdbCatalog.DEFAULT_DATABASE,
+ getMetadata().getUsername(),
+ getMetadata().getPassword(),
+ baseUrl,
+ null);
+ }
+
+ @Test
+ void test() {
+ final Map Notes: The source code is based on PostgresCatalogITCase.
+ */
+class GaussdbCatalogITCase extends GaussdbCatalogTestBase {
+
+ private TableEnvironment tEnv;
+
+ @BeforeEach
+ void setup() {
+ this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+ tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+ // use PG catalog
+ tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
+ tEnv.useCatalog(TEST_CATALOG_NAME);
+ }
+
+ @Test
+ void testSelectField() {
+ List Notes: The source code is based on PostgresCatalogTest.
+ */
+class GaussdbCatalogTest extends GaussdbCatalogTestBase {
+
+ // ------ databases ------
+
+ @Test
+ void testGetDb_DatabaseNotExistException() {
+ assertThatThrownBy(() -> catalog.getDatabase("nonexistent"))
+ .isInstanceOf(DatabaseNotExistException.class)
+ .hasMessageContaining("Database nonexistent does not exist in Catalog");
+ }
+
+ @Test
+ void testListDatabases() {
+ List Notes: The source code is based on PostgresCatalogTestBase.
+ */
+class GaussdbCatalogTestBase implements JdbcITCaseBase, GaussdbTestBase {
+
+ private static DatabaseMetadata getStaticMetadata() {
+ return GaussdbDatabase.getMetadata();
+ }
+
+ protected static final String TEST_CATALOG_NAME = "mypg";
+ protected static final String TEST_USERNAME = getStaticMetadata().getUsername();
+ protected static final String TEST_PWD = getStaticMetadata().getPassword();
+ protected static final String TEST_DB = "test";
+ protected static final String TEST_SCHEMA = "test_schema";
+ protected static final String TABLE1 = "t1";
+ protected static final String TABLE2 = "t2";
+ protected static final String TABLE3 = "t3";
+ protected static final String TABLE4 = "t4";
+ protected static final String TABLE5 = "t5";
+ protected static final String TABLE_PRIMITIVE_TYPE = "primitive_table";
+ protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2";
+ protected static final String TABLE_ARRAY_TYPE = "array_table";
+ protected static final String TABLE_SERIAL_TYPE = "serial_table";
+
+ protected static String baseUrl;
+ protected static GaussdbCatalog catalog;
+
+ @BeforeAll
+ static void init() throws SQLException {
+
+ String jdbcUrl = getStaticMetadata().getJdbcUrl();
+ baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+ catalog =
+ new GaussdbCatalog(
+ Thread.currentThread().getContextClassLoader(),
+ TEST_CATALOG_NAME,
+ GaussdbCatalog.DEFAULT_DATABASE,
+ TEST_USERNAME,
+ TEST_PWD,
+ baseUrl);
+
+ // create test database and schema
+ createDatabase(TEST_DB);
+ createSchema(TEST_DB, TEST_SCHEMA);
+
+ // create test tables
+ // table: postgres.public.t1
+ // table: postgres.public.t4
+ // table: postgres.public.t5
+ createTable(GaussdbTablePath.fromFlinkTableName(TABLE1), getSimpleTable().pgSchemaSql);
+ createTable(GaussdbTablePath.fromFlinkTableName(TABLE4), getSimpleTable().pgSchemaSql);
+ createTable(GaussdbTablePath.fromFlinkTableName(TABLE5), getSimpleTable().pgSchemaSql);
+
+ // table: test.public.t2
+ // table: test.test_schema.t3
+ // table: postgres.public.dt
+ // table: postgres.public.dt2
+ createTable(
+ TEST_DB, GaussdbTablePath.fromFlinkTableName(TABLE2), getSimpleTable().pgSchemaSql);
+ createTable(
+ TEST_DB, new GaussdbTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql);
+ createTable(
+ GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE),
+ getPrimitiveTable().pgSchemaSql);
+ createTable(
+ GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2),
+ getPrimitiveTable("test_pk2").pgSchemaSql);
+ createTable(
+ GaussdbTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE), getArrayTable().pgSchemaSql);
+ createTable(
+ GaussdbTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE),
+ getSerialTable().pgSchemaSql);
+
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format(
+ "insert into public.%s values (%s);", TABLE1, getSimpleTable().values));
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format(
+ "insert into %s values (%s);",
+ TABLE_PRIMITIVE_TYPE, getPrimitiveTable().values));
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format(
+ "insert into %s values (%s);", TABLE_ARRAY_TYPE, getArrayTable().values));
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format(
+ "insert into %s values (%s);", TABLE_SERIAL_TYPE, getSerialTable().values));
+ }
+
+ @AfterAll
+ static void afterAll() throws SQLException {
+ executeSQL(TEST_DB, String.format("DROP SCHEMA %s CASCADE", TEST_SCHEMA));
+ executeSQL(
+ TEST_DB,
+ String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE2)));
+
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE1)));
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE4)));
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format("DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE5)));
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format(
+ "DROP TABLE %s ",
+ GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE)));
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format(
+ "DROP TABLE %s ",
+ GaussdbTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2)));
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format(
+ "DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE)));
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format(
+ "DROP TABLE %s ", GaussdbTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE)));
+
+ executeSQL(GaussdbCatalog.DEFAULT_DATABASE, "drop database " + TEST_DB + ";");
+ }
+
+ public static void createTable(GaussdbTablePath tablePath, String tableSchemaSql)
+ throws SQLException {
+ executeSQL(
+ GaussdbCatalog.DEFAULT_DATABASE,
+ String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+ }
+
+ public static void createTable(String db, GaussdbTablePath tablePath, String tableSchemaSql)
+ throws SQLException {
+ executeSQL(
+ db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+ }
+
+ public static void createSchema(String db, String schema) throws SQLException {
+ executeSQL(db, String.format("CREATE SCHEMA %s", schema));
+ }
+
+ public static void createDatabase(String database) throws SQLException {
+ executeSQL(GaussdbCatalog.DEFAULT_DATABASE, String.format("CREATE DATABASE %s;", database));
+ }
+
+ public static void executeSQL(String sql) throws SQLException {
+ executeSQL("", sql);
+ }
+
+ public static void executeSQL(String db, String sql) throws SQLException {
+ try (Connection conn =
+ DriverManager.getConnection(
+ String.format("%s/%s", baseUrl, db), TEST_USERNAME, TEST_PWD);
+ Statement statement = conn.createStatement()) {
+ statement.executeUpdate(sql);
+ } catch (SQLException e) {
+ throw e;
+ }
+ }
+
+ /** Object holding schema and corresponding sql. */
+ public static class TestTable {
+ Schema schema;
+ String pgSchemaSql;
+ String values;
+
+ public TestTable(Schema schema, String pgSchemaSql, String values) {
+ this.schema = schema;
+ this.pgSchemaSql = pgSchemaSql;
+ this.values = values;
+ }
+ }
+
+ public static TestTable getSimpleTable() {
+ return new TestTable(
+ Schema.newBuilder().column("id", DataTypes.INT()).build(), "id integer", "1");
+ }
+
+ // posgres doesn't support to use the same primary key name across different tables,
+ // make the table parameterized to resolve this problem.
+ public static TestTable getPrimitiveTable() {
+ return getPrimitiveTable("test_pk");
+ }
+
+ // TODO: add back timestamptz and time types.
+ // Flink currently doesn't support converting time's precision, with the following error
+ // TableException: Unsupported conversion from data type 'TIME(6)' (conversion class:
+ // java.sql.Time)
+ // to type information. Only data types that originated from type information fully support a
+ // reverse conversion.
+ public static TestTable getPrimitiveTable(String primaryKeyName) {
+ return new TestTable(
+ Schema.newBuilder()
+ .column("int", DataTypes.INT().notNull())
+ .column("bytea", DataTypes.BYTES())
+ .column("short", DataTypes.SMALLINT().notNull())
+ .column("long", DataTypes.BIGINT())
+ .column("real", DataTypes.FLOAT())
+ .column("double_precision", DataTypes.DOUBLE())
+ .column("numeric", DataTypes.DECIMAL(10, 5))
+ .column("decimal", DataTypes.DECIMAL(10, 1))
+ .column("boolean", DataTypes.BOOLEAN())
+ .column("text", DataTypes.STRING())
+ .column("char", DataTypes.CHAR(1))
+ .column("character", DataTypes.CHAR(3))
+ .column("character_varying", DataTypes.VARCHAR(20))
+ .column("timestamp", DataTypes.TIMESTAMP(5))
+ // .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))
+ .column("date", DataTypes.TIMESTAMP(0))
+ .column("time", DataTypes.TIME(0))
+ .column("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))
+ .primaryKeyNamed(primaryKeyName, "short", "int")
+ .build(),
+ "int integer, "
+ + "bytea bytea, "
+ + "short smallint, "
+ + "long bigint, "
+ + "real real, "
+ + "double_precision double precision, "
+ + "numeric numeric(10, 5), "
+ + "decimal decimal(10, 1), "
+ + "boolean boolean, "
+ + "text text, "
+ + "char char, "
+ + "character character(3), "
+ + "character_varying character varying(20), "
+ + "timestamp timestamp(5), "
+ +
+ // "timestamptz timestamptz(4), " +
+ "date timestamp(0),"
+ + "time time(0), "
+ + "default_numeric numeric, "
+ + "CONSTRAINT "
+ + primaryKeyName
+ + " PRIMARY KEY (short, int)",
+ "1,"
+ + "'2',"
+ + "3,"
+ + "4,"
+ + "5.5,"
+ + "6.6,"
+ + "7.7,"
+ + "8.8,"
+ + "true,"
+ + "'a',"
+ + "'b',"
+ + "'c',"
+ + "'d',"
+ + "'2016-06-22 19:10:25',"
+ +
+ // "'2006-06-22 19:10:25'," +
+ "'2015-01-01',"
+ + "'00:51:02.746572', "
+ + "500");
+ }
+
+ // TODO: add back timestamptz once planner supports timestamp with timezone
+ public static TestTable getArrayTable() {
+ return new TestTable(
+ Schema.newBuilder()
+ .column("int_arr", DataTypes.ARRAY(DataTypes.INT()))
+ .column("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
+ .column("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
+ .column("long_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
+ .column("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
+ .column("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
+ .column("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
+ .column(
+ "numeric_arr_default",
+ DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)))
+ .column("decimal_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)))
+ .column("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+ .column("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+ .column("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
+ .column("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
+ .column("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
+ .column("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5)))
+ // .field("timestamptz_arr",
+ // DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)))
+ .column("date_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(0)))
+ .column("time_arr", DataTypes.ARRAY(DataTypes.TIME(0)))
+ .column("null_bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
+ .column("null_text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+ .build(),
+ "int_arr integer[], "
+ + "bytea_arr bytea[], "
+ + "short_arr smallint[], "
+ + "long_arr bigint[], "
+ + "real_arr real[], "
+ + "double_precision_arr double precision[], "
+ + "numeric_arr numeric(10, 5)[], "
+ + "numeric_arr_default numeric[], "
+ + "decimal_arr decimal(10,2)[], "
+ + "boolean_arr boolean[], "
+ + "text_arr text[], "
+ + "char_arr char[], "
+ + "character_arr character(3)[], "
+ + "character_varying_arr character varying(20)[], "
+ + "timestamp_arr timestamp(5)[], "
+ +
+ // "timestamptz_arr timestamptz(4)[], " +
+ "date_arr timestamp(0)[], "
+ + "time_arr time(0)[], "
+ + "null_bytea_arr bytea[], "
+ + "null_text_arr text[]",
+ String.format(
+ "'{1,2,3}',"
+ + "'{2,3,4}',"
+ + "'{3,4,5}',"
+ + "'{4,5,6}',"
+ + "'{5.5,6.6,7.7}',"
+ + "'{6.6,7.7,8.8}',"
+ + "'{7.7,8.8,9.9}',"
+ + "'{8.8,9.9,10.10}',"
+ + "'{9.9,10.10,11.11}',"
+ + "'{true,false,true}',"
+ + "'{a,b,c}',"
+ + "'{b,c,d}',"
+ + "'{b,c,d}',"
+ + "'{b,c,d}',"
+ + "'{\"2016-06-22 19:10:25\", \"2019-06-22 19:10:25\"}',"
+ +
+ // "'{\"2006-06-22 19:10:25\", \"2009-06-22 19:10:25\"}'," +
+ "'{\"2015-01-01\", \"2020-01-01\"}',"
+ + "'{\"00:51:02.746572\", \"00:59:02.746572\"}',"
+ + "NULL,"
+ + "NULL"));
+ }
+
+ public static TestTable getSerialTable() {
+ return new TestTable(
+ Schema.newBuilder()
+ // serial fields are returned as not null by ResultSetMetaData.columnNoNulls
+ .column("f0", DataTypes.SMALLINT().notNull())
+ .column("f1", DataTypes.INT().notNull())
+ .column("f2", DataTypes.SMALLINT().notNull())
+ .column("f3", DataTypes.INT().notNull())
+ .column("f4", DataTypes.BIGINT().notNull())
+ .column("f5", DataTypes.BIGINT().notNull())
+ .build(),
+ "f0 smallserial, "
+ + "f1 serial, "
+ + "f2 serial2, "
+ + "f3 serial4, "
+ + "f4 serial8, "
+ + "f5 bigserial",
+ "32767,"
+ + "2147483647,"
+ + "32767,"
+ + "2147483647,"
+ + "9223372036854775807,"
+ + "9223372036854775807");
+ }
+}
diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java
new file mode 100644
index 000000000..cbdcd360f
--- /dev/null
+++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/catalog/GaussdbTablePathTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.gaussdb.database.catalog;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Test for {@link GaussdbTablePath}.
+ *
+ * Notes: The source code is based on PostgresTablePathTest.
+ */
+class GaussdbTablePathTest {
+ @Test
+ void testToFlinkTableName() {
+ assertThat(GaussdbTablePath.toFlinkTableName("my_schema", "my_table"))
+ .isEqualTo("my_schema.my_table");
+ assertThat(GaussdbTablePath.toFlinkTableName("postgres.my_schema", "my_table"))
+ .isEqualTo("postgres.my_schema.my_table");
+ assertThatThrownBy(() -> GaussdbTablePath.toFlinkTableName("", "my_table"))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Schema name is not valid. Null or empty is not allowed");
+ }
+
+ @Test
+ void testFromFlinkTableName() {
+ assertThat(GaussdbTablePath.fromFlinkTableName("my_schema.my_table"))
+ .isEqualTo(new GaussdbTablePath("my_schema", "my_table"));
+ assertThat(GaussdbTablePath.fromFlinkTableName("my_table"))
+ .isEqualTo(new GaussdbTablePath("public", "my_table"));
+ assertThatThrownBy(() -> GaussdbTablePath.fromFlinkTableName("postgres.public.my_table"))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Table name 'postgres.public.my_table' is not valid. The parsed length is 3");
+ assertThatThrownBy(() -> GaussdbTablePath.fromFlinkTableName(""))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Table name is not valid. Null or empty is not allowed");
+ }
+}
diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java
new file mode 100644
index 000000000..0bac34a5d
--- /dev/null
+++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.gaussdb.database.dialect;
+
+import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectTest;
+import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * The PostgresSql params for {@link JdbcDialectTest}.
+ *
+ * Notes: The source code is based on PostgresDialectTest.
+ */
+class GaussdbDialectTest extends JdbcDialectTest implements GaussdbTestBase {
+
+ @Override
+ protected List Notes: The source code is based on PostgresDynamicTableSinkITCase.
+ */
+class GaussdbDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase implements GaussdbTestBase {}
diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java
new file mode 100644
index 000000000..e14285b0f
--- /dev/null
+++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/table/GaussdbDynamicTableSourceITCase.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.gaussdb.table;
+
+import org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase;
+import org.apache.flink.connector.jdbc.gaussdb.GaussdbTestBase;
+import org.apache.flink.connector.jdbc.gaussdb.database.dialect.GaussdbDialect;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;
+
+/**
+ * The Table Source ITCase for {@link GaussdbDialect}.
+ *
+ * Notes: The source code is based on PostgresDynamicTableSourceITCase.
+ */
+class GaussdbDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase
+ implements GaussdbTestBase {
+
+ @Override
+ protected TableRow createInputTable() {
+ return tableRow(
+ "jdbDynamicTableSource",
+ field("id", DataTypes.BIGINT().notNull()),
+ field("decimal_col", DataTypes.DECIMAL(10, 4)),
+ field("timestamp6_col", DataTypes.TIMESTAMP(6)),
+ // other fields
+ field("real_col", dbType("REAL"), DataTypes.FLOAT()),
+ field("double_col", dbType("DOUBLE PRECISION"), DataTypes.DOUBLE()),
+ field("time_col", dbType("TIME"), DataTypes.TIME()));
+ }
+
+ protected List Notes: The source code is based on PostgresContainer.
+ */
+public class GaussDBContainer Notes: The source code is based on PostgresDatabase.
+ */
+public class GaussdbDatabase extends DatabaseExtension implements GaussdbImages {
+
+ private static final GaussDBContainer> CONTAINER =
+ new GaussdbXaContainer(IMAGE).withMaxConnections(10).withMaxTransactions(50);
+
+ private static final DockerResource RESOURCE = new DockerResource(CONTAINER);
+
+ private static GaussdbMetadata metadata;
+
+ public static GaussdbMetadata getMetadata() {
+ if (!CONTAINER.isRunning()) {
+ throw new FlinkRuntimeException("Container is stopped.");
+ }
+ if (metadata == null) {
+ metadata = new GaussdbMetadata(CONTAINER, true);
+ }
+ return metadata;
+ }
+
+ protected DatabaseMetadata getMetadataDB() {
+ return getMetadata();
+ }
+
+ @Override
+ protected DatabaseResource getResource() {
+ return RESOURCE;
+ }
+
+ /** {@link GaussDBContainer} with XA enabled (by setting max_prepared_transactions). */
+ public static class GaussdbXaContainer extends GaussDBContainer Notes: The source code is based on PostgresImages.
+ */
+public interface GaussdbImages {
+ String IMAGE = "opengauss/opengauss:7.0.0-RC1.B023";
+}
diff --git a/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java
new file mode 100644
index 000000000..f7a1eec0e
--- /dev/null
+++ b/flink-connector-jdbc-gaussdb/src/test/java/org/apache/flink/connector/jdbc/gaussdb/testutils/GaussdbMetadata.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.gaussdb.testutils;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+
+import com.huawei.gaussdb.jdbc.util.PSQLException;
+import com.huawei.gaussdb.jdbc.xa.PGXADataSource;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+
+import javax.sql.XADataSource;
+
+/**
+ * Gaussdb Metadata.
+ *
+ * Notes: The source code is based on PostgresMetadata.
+ */
+public class GaussdbMetadata implements DatabaseMetadata {
+
+ private final String username;
+ private final String password;
+ private final String url;
+ private final String driver;
+ private final String version;
+ private final boolean xaEnabled;
+
+ public GaussdbMetadata(GaussDBContainer> container) {
+ this(container, false);
+ }
+
+ public GaussdbMetadata(JdbcDatabaseContainer> container, boolean hasXaEnabled) {
+ this.username = container.getUsername();
+ this.password = container.getPassword();
+ this.url = container.getJdbcUrl();
+ this.driver = container.getDriverClassName();
+ this.version = container.getDockerImageName();
+ this.xaEnabled = hasXaEnabled;
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return this.url;
+ }
+
+ @Override
+ public String getJdbcUrlWithCredentials() {
+ return String.format("%s&user=%s&password=%s", getJdbcUrl(), getUsername(), getPassword());
+ }
+
+ @Override
+ public String getUsername() {
+ return this.username;
+ }
+
+ @Override
+ public String getPassword() {
+ return this.password;
+ }
+
+ @Override
+ public XADataSource buildXaDataSource() {
+ if (!xaEnabled) {
+ throw new UnsupportedOperationException();
+ }
+
+ PGXADataSource xaDataSource = new PGXADataSource();
+ try {
+ xaDataSource.setUrl(getJdbcUrl());
+ } catch (PSQLException e) {
+ throw new RuntimeException(e);
+ }
+ xaDataSource.setUser(getUsername());
+ xaDataSource.setPassword(getPassword());
+ return xaDataSource;
+ }
+
+ @Override
+ public String getDriverClass() {
+ return this.driver;
+ }
+
+ @Override
+ public String getVersion() {
+ return version;
+ }
+}
diff --git a/flink-connector-jdbc-gaussdb/src/test/resources/log4j2-test.properties b/flink-connector-jdbc-gaussdb/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..835c2ec9a
--- /dev/null
+++ b/flink-connector-jdbc-gaussdb/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/pom.xml b/pom.xml
index fe2fc778b..a603a665c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,6 +53,7 @@ under the License.