Skip to content

Commit 792679a

Browse files
committed
Add fixes for CrateDB
1 parent 00256d4 commit 792679a

File tree

4 files changed

+24
-10
lines changed

4 files changed

+24
-10
lines changed

flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/catalog/CrateDBTypeMapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ protected DataType getMapping(String pgType, int precision, int scale) {
4747
return DataTypes.STRING();
4848
case PG_STRING_ARRAY:
4949
return DataTypes.ARRAY(DataTypes.STRING());
50+
case PG_JSON: // CrateDB supports but not supported in the dialect yet.
51+
return null;
52+
case PG_JSONB: // CrateDB supports JSON type, but not JSONB.
53+
return null;
5054
default:
5155
return super.getMapping(pgType, precision, scale);
5256
}

flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/dialect/CrateDBDialectConverter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.connector.jdbc.postgres.database.dialect.CompatiblePostgresDialectConverter;
23+
import org.apache.flink.table.data.StringData;
2324
import org.apache.flink.table.types.logical.RowType;
2425

2526
/**
@@ -39,4 +40,9 @@ public CrateDBDialectConverter(RowType rowType) {
3940
public String compatibleConverterName() {
4041
return "CrateDB";
4142
}
43+
44+
@Override
45+
protected JdbcDeserializationConverter createVarcharConverter() {
46+
return val -> StringData.fromString((String) val);
47+
}
4248
}

flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/catalog/PostgresTypeMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ public class PostgresTypeMapper implements JdbcCatalogTypeMapper {
8484
private static final String PG_CHARACTER_ARRAY = "_character";
8585
private static final String PG_CHARACTER_VARYING = "varchar";
8686
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
87-
private static final String PG_JSON = "json";
88-
private static final String PG_JSONB = "jsonb";
87+
protected static final String PG_JSON = "json";
88+
protected static final String PG_JSONB = "jsonb";
8989

9090
@Override
9191
public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)

flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/dialect/PostgresDialectConverter.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,19 +96,23 @@ private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arra
9696
private JdbcDeserializationConverter createPrimitiveConverter(LogicalType type) {
9797
switch (type.getTypeRoot()) {
9898
case VARCHAR:
99-
return val -> {
100-
if (val instanceof PGobject) {
101-
PGobject obj = (PGobject) val;
102-
return StringData.fromString(obj.getValue());
103-
} else {
104-
return StringData.fromString((String) val);
105-
}
106-
};
99+
return createVarcharConverter();
107100
default:
108101
return super.createInternalConverter(type);
109102
}
110103
}
111104

105+
protected JdbcDeserializationConverter createVarcharConverter() {
106+
return val -> {
107+
if (val instanceof PGobject) {
108+
PGobject obj = (PGobject) val;
109+
return StringData.fromString(obj.getValue());
110+
} else {
111+
return StringData.fromString((String) val);
112+
}
113+
};
114+
}
115+
112116
@Override
113117
public String converterName() {
114118
return "PostgreSQL";

0 commit comments

Comments
 (0)