+ public Set<ConfigOption<?>> forwardOptions() {
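+ // Options whose values may be forwarded directly to the runtime implementation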
+ return Stream.of(
+ URL,
+ TABLE_NAME,
+ USERNAME,
+ PASSWORD,
+ DRIVER,
+ SINK_BUFFER_FLUSH_MAX_ROWS,
+ SINK_BUFFER_FLUSH_INTERVAL,
+ SINK_MAX_RETRIES,
+ MAX_RETRY_TIMEOUT,
+ SCAN_FETCH_SIZE,
+ SCAN_AUTO_COMMIT)
+ .collect(Collectors.toSet());
+ }
+
+ private void validateConfigOptions(ReadableConfig config, ClassLoader classLoader) {
+ String jdbcUrl = config.get(URL);
+ // JdbcDialectLoader.load(jdbcUrl, classLoader);
+
+ checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD});
+
+ checkAllOrNone(
+ config,
+ new ConfigOption[] {
+ SCAN_PARTITION_COLUMN,
+ SCAN_PARTITION_NUM,
+ SCAN_PARTITION_LOWER_BOUND,
+ SCAN_PARTITION_UPPER_BOUND
+ });
+
+ if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent()
+ && config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) {
+ long lowerBound = config.get(SCAN_PARTITION_LOWER_BOUND);
+ long upperBound = config.get(SCAN_PARTITION_UPPER_BOUND);
+ if (lowerBound > upperBound) {
+ throw new IllegalArgumentException(
+ String.format(
+ "'%s'='%s' must not be larger than '%s'='%s'.",
+ SCAN_PARTITION_LOWER_BOUND.key(),
+ lowerBound,
+ SCAN_PARTITION_UPPER_BOUND.key(),
+ upperBound));
+ }
+ }
+
+ checkAllOrNone(config, new ConfigOption[] {LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});
+
+ if (config.get(LOOKUP_MAX_RETRIES) < 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option shouldn't be negative, but is %s.",
+ LOOKUP_MAX_RETRIES.key(), config.get(LOOKUP_MAX_RETRIES)));
+ }
+
+ if (config.get(SINK_MAX_RETRIES) < 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option shouldn't be negative, but is %s.",
+ SINK_MAX_RETRIES.key(), config.get(SINK_MAX_RETRIES)));
+ }
+
+ if (config.get(MAX_RETRY_TIMEOUT).getSeconds() <= 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option must be in second granularity and shouldn't be smaller than 1 second, but is %s.",
+ MAX_RETRY_TIMEOUT.key(),
+ config.get(
+ ConfigOptions.key(MAX_RETRY_TIMEOUT.key()).stringType().noDefaultValue())));
+ }
+ }
+
+ private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
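+ // e.g. 'username' and 'password' must be provided together or not at all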
+ int presentCount = 0;
+ for (ConfigOption configOption : configOptions) {
+ if (config.getOptional(configOption).isPresent()) {
+ presentCount++;
+ }
+ }
+ String[] propertyNames =
+ Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
+ Preconditions.checkArgument(
+ configOptions.length == presentCount || presentCount == 0,
+ "Either all or none of the following options should be provided:\n"
+ + String.join("\n", propertyNames));
+ }
+}
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresDialect.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresDialect.java
new file mode 100644
index 00000000..4285bfbc
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresDialect.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.jdbc;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+
+/**
+ * JDBC dialect for PostgreSQL.
+ *
+ * SQRL: Add quoting to identifiers
+ */
+public class SqrlPostgresDialect extends AbstractDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ // Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs:
+ // https://www.postgresql.org/docs/12/datatype-datetime.html
+ private static final int MAX_TIMESTAMP_PRECISION = 6;
+ private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+ // Define MAX/MIN precision of DECIMAL type according to PostgreSQL docs:
+ // https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-NUMERIC-DECIMAL
+ private static final int MAX_DECIMAL_PRECISION = 1000;
+ private static final int MIN_DECIMAL_PRECISION = 1;
+
+ @Override
+ public JdbcRowConverter getRowConverter(RowType rowType) {
+ return new SqrlPostgresRowConverter(rowType);
+ }
+
+ @Override
+ public String getLimitClause(long limit) {
+ return "LIMIT " + limit;
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("org.postgresql.Driver");
+ }
+
+ /** Postgres upsert query. It uses ON CONFLICT ... DO UPDATE SET ... to upsert into Postgres. */
+ @Override
+ public Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
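+ // For fieldNames {"id", "name"} and uniqueKeyFields {"id"} this yields roughly:
+ //   INSERT INTO "t"("id", "name") VALUES (:id, :name)
+ //   ON CONFLICT ("id") DO UPDATE SET "id"=EXCLUDED."id", "name"=EXCLUDED."name"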
+ String uniqueColumns =
+ Arrays.stream(uniqueKeyFields).map(this::quoteIdentifier).collect(Collectors.joining(", "));
+ String updateClause =
+ Arrays.stream(fieldNames)
+ .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
+ .collect(Collectors.joining(", "));
+ return Optional.of(
+ getInsertIntoStatement(tableName, fieldNames)
+ + " ON CONFLICT ("
+ + uniqueColumns
+ + ")"
+ + " DO UPDATE SET "
+ + updateClause);
+ }
+
+ @Override
+ public void validate(RowType rowType) throws ValidationException {
+ List<LogicalType> unsupportedTypes =
+ rowType.getFields().stream()
+ .map(RowField::getType)
+ .filter(type -> LogicalTypeRoot.RAW.equals(type.getTypeRoot()))
+ .filter(type -> !isSupportedType(type))
+ .collect(Collectors.toList());
+
+ if (!unsupportedTypes.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "The %s dialect doesn't support type: %s.", this.dialectName(), unsupportedTypes));
+ }
+
+ super.validate(rowType);
+ }
+
+ private boolean isSupportedType(LogicalType type) {
+ return SqrlPostgresRowConverter.sqrlSerializers.containsKey(type.getDefaultConversion());
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return "\"" + identifier + "\"";
+ }
+
+ @Override
+ public String dialectName() {
+ return "PostgreSQL";
+ }
+
+ @Override
+ public Optional<Range> decimalPrecisionRange() {
+ return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
+ }
+
+ @Override
+ public Optional<Range> timestampPrecisionRange() {
+ return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
+
+ @Override
+ public Set<LogicalTypeRoot> supportedTypes() {
+ // The data types used in PostgreSQL are listed at:
+ // https://www.postgresql.org/docs/12/datatype.html
+
+ // TODO: We can't convert BINARY data type to
+ // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
+ // LegacyTypeInfoDataTypeConverter.
+
+ return EnumSet.of(
+ LogicalTypeRoot.CHAR,
+ LogicalTypeRoot.VARCHAR,
+ LogicalTypeRoot.BOOLEAN,
+ LogicalTypeRoot.VARBINARY,
+ LogicalTypeRoot.DECIMAL,
+ LogicalTypeRoot.TINYINT,
+ LogicalTypeRoot.SMALLINT,
+ LogicalTypeRoot.INTEGER,
+ LogicalTypeRoot.BIGINT,
+ LogicalTypeRoot.FLOAT,
+ LogicalTypeRoot.DOUBLE,
+ LogicalTypeRoot.DATE,
+ LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+ LogicalTypeRoot.ARRAY,
+ LogicalTypeRoot.MAP,
+ LogicalTypeRoot.RAW // see validate() for supported structured types
+ );
+ }
+}
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresRowConverter.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresRowConverter.java
new file mode 100644
index 00000000..8538fcb2
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresRowConverter.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.jdbc;
+
+import com.datasqrl.connector.postgresql.type.JdbcTypeSerializer;
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.postgresql.jdbc.PgArray;
+
+/**
+ * Runtime converter that is responsible for converting between JDBC objects and Flink internal
+ * objects for PostgreSQL.
+ *
+ * SQRL: Add array support
+ */
+public class SqrlPostgresRowConverter extends SqrlBaseJdbcRowConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final Map<
+     Type, JdbcTypeSerializer<JdbcDeserializationConverter, JdbcSerializationConverter>>
+     sqrlSerializers = discoverSerializers();
+
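+ // Serializers are discovered via ServiceLoader, i.e. the implementations listed in
+ // META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer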
+ private static Map<
+     Type, JdbcTypeSerializer<JdbcDeserializationConverter, JdbcSerializationConverter>>
+     discoverSerializers() {
+ return ServiceLoader.load(JdbcTypeSerializer.class).stream()
+ .map(f -> f.get())
+ .filter(f -> f.getDialectId().equalsIgnoreCase("postgres"))
+ .collect(Collectors.toMap(JdbcTypeSerializer::getConversionClass, t -> t));
+ }
+
+ @Override
+ public String converterName() {
+ return "PostgreSQL";
+ }
+
+ public SqrlPostgresRowConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ if (sqrlSerializers.containsKey(type.getDefaultConversion())) {
+ return sqrlSerializers.get(type.getDefaultConversion()).getDeserializerConverter().create();
+ } else {
+ return super.createInternalConverter(type);
+ }
+ }
+
+ @Override
+ protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
+ JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
+ if (sqrlSerializers.containsKey(type.getDefaultConversion())) {
+ return jdbcSerializationConverter::serialize;
+ } else {
+ return super.wrapIntoNullableExternalConverter(jdbcSerializationConverter, type);
+ }
+ }
+
+ @Override
+ protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
+ if (sqrlSerializers.containsKey(type.getDefaultConversion())) {
+ return sqrlSerializers.get(type.getDefaultConversion()).getSerializerConverter(type).create();
+ } else {
+ return super.createExternalConverter(type);
+ }
+ }
+
+ @Override
+ protected String getArrayType() {
+ return "bytea";
+ }
+
+ @Override
+ public JdbcDeserializationConverter createArrayConverter(ArrayType arrayType) {
+ // Since PGJDBC 42.2.15 (https://github.com/pgjdbc/pgjdbc/pull/1194) bytea[] is wrapped in
+ // primitive byte arrays
+ final Class<?> elementClass =
+ LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+ final JdbcDeserializationConverter elementConverter =
+ createNullableInternalConverter(arrayType.getElementType());
+ return val -> {
+ // sqrl: check if scalar array
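+ // Values read through JDBC arrive as PgArray; other call sites may already supply a
+ // materialized Object[], so both shapes are handled below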
+
+ Object[] in;
+ if (val instanceof PgArray) {
+ PgArray pgArray = (PgArray) val;
+ in = (Object[]) pgArray.getArray();
+ } else {
+ in = (Object[]) val;
+ }
+ final Object[] array =
+ (Object[]) java.lang.reflect.Array.newInstance(elementClass, in.length);
+ for (int i = 0; i < in.length; i++) {
+ array[i] = elementConverter.deserialize(in[i]);
+ }
+ return new GenericArrayData(array);
+ };
+ }
+}
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/FlinkArrayTypeUtil.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/FlinkArrayTypeUtil.java
new file mode 100644
index 00000000..ed606c23
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/FlinkArrayTypeUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.type;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+public class FlinkArrayTypeUtil {
+
+ public static LogicalType getBaseFlinkArrayType(LogicalType type) {
+ if (type instanceof ArrayType) {
+ return getBaseFlinkArrayType(((ArrayType) type).getElementType());
+ }
+ return type;
+ }
+
+ public static boolean isScalarArray(LogicalType type) {
+ if (type instanceof ArrayType) {
+ LogicalType elementType = ((ArrayType) type).getElementType();
+ return isScalar(elementType) || isScalarArray(elementType);
+ }
+ return false;
+ }
+
+ public static boolean isScalar(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ case DECIMAL:
+ return true;
+ default:
+ return false;
+ }
+ }
+}
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/JdbcTypeSerializer.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/JdbcTypeSerializer.java
new file mode 100644
index 00000000..95904e59
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/JdbcTypeSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.type;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+public interface JdbcTypeSerializer<D, S> {
+
+ String getDialectId();
+
+ Class getConversionClass();
+
+ String dialectTypeName();
+
+ GenericDeserializationConverter<D> getDeserializerConverter();
+
+ GenericSerializationConverter<S> getSerializerConverter(LogicalType type);
+
+ interface GenericSerializationConverter<T> {
+ T create();
+ }
+
+ interface GenericDeserializationConverter<T> {
+ T create();
+ }
+}
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresArrayTypeConverter.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresArrayTypeConverter.java
new file mode 100644
index 00000000..a4c9b835
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresArrayTypeConverter.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.type;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+public class PostgresArrayTypeConverter {
+
+ /** Return the PostgreSQL element type name for the given Flink (array element) type. */
+ public static String getArrayScalarName(LogicalType type) {
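+ // e.g. a Flink ARRAY<INTEGER> column uses the element type "integer", so the corresponding
+ // PostgreSQL array type would be integer[]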
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return "text";
+ case BOOLEAN:
+ return "boolean";
+ case BINARY:
+ case VARBINARY:
+ return "bytea";
+ case DECIMAL:
+ return "decimal";
+ case TINYINT:
+ return "smallint";
+ case SMALLINT:
+ return "smallint";
+ case INTEGER:
+ return "integer";
+ case BIGINT:
+ return "bigint";
+ case FLOAT:
+ return "real"; // PostgreSQL uses REAL for float
+ case DOUBLE:
+ return "double";
+ case DATE:
+ return "date";
+ case TIME_WITHOUT_TIME_ZONE:
+ return "time without time zone";
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return "timestamp without time zone";
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return "timestamptz";
+ case INTERVAL_YEAR_MONTH:
+ return "interval year to month";
+ case INTERVAL_DAY_TIME:
+ return "interval day to second";
+ case NULL:
+ return "void";
+ case ARRAY:
+ return getArrayScalarName(((ArrayType) type).getElementType());
+ case MULTISET:
+ case MAP:
+ case ROW:
+ case DISTINCT_TYPE:
+ case STRUCTURED_TYPE:
+ case RAW:
+ case SYMBOL:
+ case UNRESOLVED:
+ default:
+ throw new RuntimeException("Cannot convert type to a PostgreSQL array element type: " + type);
+ }
+ }
+}
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresJsonTypeSerializer.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresJsonTypeSerializer.java
new file mode 100644
index 00000000..c3b0458b
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresJsonTypeSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.type;
+
+import com.datasqrl.types.json.FlinkJsonType;
+import com.datasqrl.types.json.FlinkJsonTypeSerializer;
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcDeserializationConverter;
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcSerializationConverter;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.postgresql.util.PGobject;
+
+public class PostgresJsonTypeSerializer
+    implements JdbcTypeSerializer<JdbcDeserializationConverter, JdbcSerializationConverter> {
+
+ @Override
+ public String getDialectId() {
+ return "postgres";
+ }
+
+ @Override
+ public Class getConversionClass() {
+ return FlinkJsonType.class;
+ }
+
+ @Override
+ public String dialectTypeName() {
+ return "jsonb";
+ }
+
+ @Override
+ public GenericDeserializationConverter<JdbcDeserializationConverter> getDeserializerConverter() {
+ return () ->
+ (val) -> {
+ FlinkJsonType t = (FlinkJsonType) val;
+ return t.getJson();
+ };
+ }
+
+ @Override
+ public GenericSerializationConverter<JdbcSerializationConverter> getSerializerConverter(
+     LogicalType type) {
+ FlinkJsonTypeSerializer typeSerializer = new FlinkJsonTypeSerializer();
+
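+ // Unwraps the RAW FlinkJsonType value and binds it as a PGobject so the driver can write it
+ // into a json/jsonb column; null values are written as SQL NULL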
+ return () ->
+ (val, index, statement) -> {
+ if (val != null && !val.isNullAt(index)) {
+ PGobject pgObject = new PGobject();
+ pgObject.setType("json");
+ RawValueData<FlinkJsonType> object = val.getRawValue(index);
+ FlinkJsonType vec = object.toObject(typeSerializer);
+ if (vec == null) {
+ statement.setObject(index, null);
+ } else {
+ pgObject.setValue(vec.getJson().toString());
+ statement.setObject(index, pgObject);
+ }
+ } else {
+ statement.setObject(index, null);
+ }
+ };
+ }
+}
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresRowTypeSerializer.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresRowTypeSerializer.java
new file mode 100644
index 00000000..c7b2792e
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresRowTypeSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.type;
+
+import com.datasqrl.flink.format.json.SqrlRowDataToJsonConverters;
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcDeserializationConverter;
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcSerializationConverter;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions.MapNullKeyMode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+import org.postgresql.util.PGobject;
+
+public class PostgresRowTypeSerializer
+    implements JdbcTypeSerializer<JdbcDeserializationConverter, JdbcSerializationConverter> {
+
+ @Override
+ public String getDialectId() {
+ return "postgres";
+ }
+
+ @Override
+ public Class getConversionClass() {
+ return Row[].class;
+ }
+
+ @Override
+ public String dialectTypeName() {
+ return "jsonb";
+ }
+
+ @Override
+ public GenericDeserializationConverter<JdbcDeserializationConverter> getDeserializerConverter() {
+ return () -> {
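+ // Reading jsonb values back into Row[] is not implemented here; deserialization yields null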
+ return (val) -> null;
+ };
+ }
+
+ @Override
+ public GenericSerializationConverter<JdbcSerializationConverter> getSerializerConverter(
+     LogicalType type) {
+ ObjectMapper mapper = new ObjectMapper();
+ return () ->
+ (val, index, statement) -> {
+ if (val != null && !val.isNullAt(index)) {
+ SqrlRowDataToJsonConverters rowDataToJsonConverter =
+ new SqrlRowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, "null");
+
+ ArrayType arrayType = (ArrayType) type;
+ ObjectNode objectNode = mapper.createObjectNode();
+ JsonNode convert =
+ rowDataToJsonConverter
+ .createConverter(arrayType.getElementType())
+ .convert(mapper, objectNode, val);
+
+ PGobject pgObject = new PGobject();
+ pgObject.setType("json");
+ pgObject.setValue(convert.toString());
+ statement.setObject(index, pgObject);
+ } else {
+ statement.setObject(index, null);
+ }
+ };
+ }
+}
diff --git a/connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer b/connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer
new file mode 100644
index 00000000..0d0b9e67
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer
@@ -0,0 +1,2 @@
+com.datasqrl.connector.postgresql.type.PostgresRowTypeSerializer
+com.datasqrl.connector.postgresql.type.PostgresJsonTypeSerializer
\ No newline at end of file
diff --git a/connectors/postgresql-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/connectors/postgresql-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000..eeae9c1d
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1 @@
+com.datasqrl.connector.postgresql.jdbc.SqrlJdbcDynamicTableFactory
\ No newline at end of file
diff --git a/connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java b/connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java
new file mode 100644
index 00000000..1f87d778
--- /dev/null
+++ b/connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.jdbc;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+@ExtendWith(MiniClusterExtension.class)
+public class FlinkJdbcTest {
+
+ @Test
+ public void testWriteAndReadToPostgres() throws Exception {
+ try (PostgreSQLContainer<?> postgresContainer = new PostgreSQLContainer<>("postgres:14")) {
+ postgresContainer.start();
+ try (Connection conn =
+ DriverManager.getConnection(
+ postgresContainer.getJdbcUrl(),
+ postgresContainer.getUsername(),
+ postgresContainer.getPassword());
+ Statement stmt = conn.createStatement()) {
+ String createTableSQL = "CREATE TABLE test_table (" + " id BIGINT, name VARCHAR " + ")";
+ stmt.executeUpdate(createTableSQL);
+ }
+
+ // Set up Flink mini cluster environment
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
+
+ // Create a PostgreSQL table using the Table API
+ tEnv.executeSql(
+ "CREATE TABLE test_table ("
+ + "id BIGINT,"
+ + "name STRING"
+ + ") WITH ("
+ + "'connector' = 'jdbc',"
+ + "'url' = '"
+ + postgresContainer.getJdbcUrl()
+ + "',"
+ + "'table-name' = 'test_table',"
+ + "'username' = '"
+ + postgresContainer.getUsername()
+ + "',"
+ + "'password' = '"
+ + postgresContainer.getPassword()
+ + "'"
+ + ")");
+
+ // Create a DataGen source to generate 10 rows of data
+ tEnv.executeSql(
+ "CREATE TABLE datagen_source ("
+ + "id BIGINT,"
+ + "name STRING"
+ + ") WITH ("
+ + "'connector' = 'datagen',"
+ + "'rows-per-second' = '1',"
+ + "'fields.id.kind' = 'sequence',"
+ + "'fields.id.start' = '1',"
+ + "'fields.id.end' = '10',"
+ + "'fields.name.length' = '10'"
+ + ")");
+
+ // Insert data from the DataGen source into the PostgreSQL table
+ tEnv.executeSql("INSERT INTO test_table SELECT * FROM datagen_source").await();
+
+ // Verify the data has been inserted by querying the PostgreSQL database directly
+ Connection connection = postgresContainer.createConnection("");
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) FROM test_table");
+
+ int count = 0;
+ if (resultSet.next()) {
+ count = resultSet.getInt(1);
+ }
+
+ // Validate that 10 rows were inserted
+ assertEquals(10, count);
+
+ connection.close();
+ }
+ }
+}
diff --git a/flink-sql-runner/pom.xml b/flink-sql-runner/pom.xml
index b13747de..4fcb9afe 100644
--- a/flink-sql-runner/pom.xml
+++ b/flink-sql-runner/pom.xml
@@ -202,6 +202,38 @@
runtime
test
+
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>flexible-csv-format</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>flexible-json-format</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>system-functions-discovery</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>vector-type</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>postgresql-connector</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
diff --git a/pom.xml b/pom.xml
index 792f14c4..e8054099 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,6 +53,7 @@
     <module>formats</module>
     <module>functions</module>
     <module>types</module>
+    <module>connectors</module>
     <module>testing</module>
     <module>flink-sql-runner</module>
diff --git a/types/pom.xml b/types/pom.xml
index 37740453..b2b72bc4 100644
--- a/types/pom.xml
+++ b/types/pom.xml
@@ -31,6 +31,7 @@
     <module>json-type</module>
+    <module>vector-type</module>
diff --git a/types/vector-type/pom.xml b/types/vector-type/pom.xml
new file mode 100644
index 00000000..f54137a3
--- /dev/null
+++ b/types/vector-type/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>com.datasqrl.flinkrunner</groupId>
+    <artifactId>types</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>vector-type</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-common</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorType.java b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorType.java
new file mode 100644
index 00000000..87dad7d2
--- /dev/null
+++ b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorType.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.vector;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+
+@DataTypeHint(
+ value = "RAW",
+ bridgedTo = FlinkVectorType.class,
+ rawSerializer = FlinkVectorTypeSerializer.class)
+public class FlinkVectorType {
+ public double[] value;
+
+ public FlinkVectorType(double[] value) {
+ this.value = value;
+ }
+
+ public double[] getValue() {
+ return value;
+ }
+}
diff --git a/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializer.java b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializer.java
new file mode 100644
index 00000000..cbac8467
--- /dev/null
+++ b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializer.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.vector;
+
+import java.io.IOException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class FlinkVectorTypeSerializer extends TypeSerializer<FlinkVectorType> {
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public FlinkVectorType createInstance() {
+ return new FlinkVectorType(null);
+ }
+
+ @Override
+ public FlinkVectorType copy(FlinkVectorType from) {
+ return new FlinkVectorType(from.getValue());
+ }
+
+ @Override
+ public FlinkVectorType copy(FlinkVectorType from, FlinkVectorType reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1; // indicates that this serializer does not have a fixed length
+ }
+
+ @Override
+ public void serialize(FlinkVectorType record, DataOutputView target) throws IOException {
+ target.writeInt(record.getValue().length); // First write the length of the array
+ for (double v : record.getValue()) {
+ target.writeDouble(v); // Write each double value
+ }
+ }
+
+ @Override
+ public FlinkVectorType deserialize(DataInputView source) throws IOException {
+ int length = source.readInt();
+ double[] array = new double[length];
+ for (int i = 0; i < length; i++) {
+ array[i] = source.readDouble();
+ }
+ return new FlinkVectorType(array);
+ }
+
+ @Override
+ public FlinkVectorType deserialize(FlinkVectorType reuse, DataInputView source)
+ throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+    int length = source.readInt(); // mirror serialize(): an int length followed by doubles
+    target.writeInt(length);
+    for (int i = 0; i < length; i++) {
+      target.writeDouble(source.readDouble());
+    }
+ }
+
+ @Override
+ public TypeSerializer<FlinkVectorType> duplicate() {
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof FlinkVectorTypeSerializer;
+ }
+
+ @Override
+ public int hashCode() {
+ return FlinkVectorTypeSerializer.class.hashCode();
+ }
+
+ @Override
+ public TypeSerializerSnapshot<FlinkVectorType> snapshotConfiguration() {
+ return new FlinkVectorTypeSerializerSnapshot();
+ }
+}
diff --git a/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializerSnapshot.java b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializerSnapshot.java
new file mode 100644
index 00000000..c90bfa4c
--- /dev/null
+++ b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializerSnapshot.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.vector;
+
+import java.io.IOException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class FlinkVectorTypeSerializerSnapshot
+    implements TypeSerializerSnapshot<FlinkVectorType> {
+
+ private Class<? extends TypeSerializer<FlinkVectorType>> serializerClass;
+
+ public FlinkVectorTypeSerializerSnapshot() {
+ this.serializerClass = FlinkVectorTypeSerializer.class;
+ }
+
+ @Override
+ public int getCurrentVersion() {
+ return 1;
+ }
+
+ @Override
+ public void writeSnapshot(DataOutputView out) throws IOException {
+ out.writeUTF(FlinkVectorTypeSerializer.class.getName());
+ }
+
+ @Override
+ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
+ throws IOException {
+ String className = in.readUTF();
+ try {
+      this.serializerClass =
+          (Class<? extends TypeSerializer<FlinkVectorType>>)
+              Class.forName(className, true, userCodeClassLoader);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Failed to find serializer class: " + className, e);
+ }
+ }
+
+ @Override
+ public TypeSerializer<FlinkVectorType> restoreSerializer() {
+ try {
+ return serializerClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(
+ "Failed to instantiate serializer class: " + serializerClass.getName(), e);
+ }
+ }
+
+ @Override
+  public TypeSerializerSchemaCompatibility<FlinkVectorType> resolveSchemaCompatibility(
+      TypeSerializer<FlinkVectorType> newSerializer) {
+ if (newSerializer.getClass() == this.serializerClass) {
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ } else {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+ }
+}