diff --git a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBTypeMapper.java b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBTypeMapper.java index 43a6e43f1..4bc601c1f 100644 --- a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBTypeMapper.java +++ b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBTypeMapper.java @@ -47,6 +47,10 @@ protected DataType getMapping(String pgType, int precision, int scale) { return DataTypes.STRING(); case PG_STRING_ARRAY: return DataTypes.ARRAY(DataTypes.STRING()); + case PG_JSON: // TODO: Add support for JSON type. + return null; + case PG_JSONB: // CrateDB supports JSON type, but not JSONB. + return null; default: return super.getMapping(pgType, precision, scale); } diff --git a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/dialect/CrateDBDialectConverter.java b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/dialect/CrateDBDialectConverter.java index a4c4c4309..e63c8f778 100644 --- a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/dialect/CrateDBDialectConverter.java +++ b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/dialect/CrateDBDialectConverter.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.postgres.database.dialect.CompatiblePostgresDialectConverter; +import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.RowType; /** @@ -39,4 +40,9 @@ public CrateDBDialectConverter(RowType rowType) { public String compatibleConverterName() { return "CrateDB"; } + + @Override + protected JdbcDeserializationConverter createVarcharConverter() { + return val -> StringData.fromString((String) val); + } } diff --git a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java index 5fc19ea36..f7c1cdecb 100644 --- a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java +++ b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java @@ -84,6 +84,8 @@ public class PostgresTypeMapper implements JdbcCatalogTypeMapper { private static final String PG_CHARACTER_ARRAY = "_character"; private static final String PG_CHARACTER_VARYING = "varchar"; private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar"; + protected static final String PG_JSON = "json"; + protected static final String PG_JSONB = "jsonb"; @Override public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) @@ -157,6 +159,8 @@ protected DataType getMapping(String pgType, int precision, int scale) { case PG_CHARACTER_VARYING_ARRAY: return DataTypes.ARRAY(DataTypes.VARCHAR(precision)); case PG_TEXT: + case PG_JSON: + case PG_JSONB: return DataTypes.STRING(); case PG_TEXT_ARRAY: return DataTypes.ARRAY(DataTypes.STRING()); diff --git a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectConverter.java b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectConverter.java index c0cf6cfe5..68a50324b 100644 --- a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectConverter.java +++ b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectConverter.java @@ -21,12 +21,15 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; +import org.postgresql.util.PGobject; + import java.lang.reflect.Array; /** @@ -91,7 +94,23 @@ private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arra // Have its own method so that Postgres can support primitives that super class doesn't support // in the future private JdbcDeserializationConverter createPrimitiveConverter(LogicalType type) { - return super.createInternalConverter(type); + switch (type.getTypeRoot()) { + case VARCHAR: + return createVarcharConverter(); + default: + return super.createInternalConverter(type); + } + } + + protected JdbcDeserializationConverter createVarcharConverter() { + return val -> { + if (val instanceof PGobject) { + PGobject obj = (PGobject) val; + return StringData.fromString(obj.getValue()); + } else { + return StringData.fromString((String) val); + } + }; } @Override