diff --git a/connectors/pom.xml b/connectors/pom.xml
new file mode 100644
index 00000000..28ac2f09
--- /dev/null
+++ b/connectors/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>com.datasqrl.flinkrunner</groupId>
+    <artifactId>flink-sql-runner-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>connectors</artifactId>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>postgresql-connector</module>
+  </modules>
+</project>
diff --git a/connectors/postgresql-connector/pom.xml b/connectors/postgresql-connector/pom.xml
new file mode 100644
index 00000000..605a046b
--- /dev/null
+++ b/connectors/postgresql-connector/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>com.datasqrl.flinkrunner</groupId>
+    <artifactId>connectors</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>postgresql-connector</artifactId>
+  <description>JDBC sink for Flink 1.19</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-jdbc</artifactId>
+      <version>3.2.0-1.19</version>
+    </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>${postgres.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-common</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.datasqrl.flinkrunner</groupId>
+      <artifactId>flexible-json-format</artifactId>
+      <version>1.0.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-csv</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-json</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_2.12</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlBaseJdbcRowConverter.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlBaseJdbcRowConverter.java
new file mode 100644
index 00000000..c1a2baf2
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlBaseJdbcRowConverter.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.jdbc;
+
+import static com.datasqrl.connector.postgresql.type.FlinkArrayTypeUtil.getBaseFlinkArrayType;
+import static com.datasqrl.connector.postgresql.type.FlinkArrayTypeUtil.isScalarArray;
+import static com.datasqrl.connector.postgresql.type.PostgresArrayTypeConverter.getArrayScalarName;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+
+import java.sql.Array;
+import java.sql.PreparedStatement;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.LocalDateTime;
+import lombok.SneakyThrows;
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+/** A SQRL base converter that handles arrays and additional data types. */
+public abstract class SqrlBaseJdbcRowConverter extends AbstractJdbcRowConverter {
+
+  public SqrlBaseJdbcRowConverter(RowType rowType) {
+    super(rowType);
+  }
+
+  @Override
+  protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
+      JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
+    if (type.getTypeRoot() == TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+      int timestampWithTimezone = Types.TIMESTAMP_WITH_TIMEZONE;
+      return (val, index, statement) -> {
+        if (val == null || val.isNullAt(index) || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
+          statement.setNull(index, timestampWithTimezone);
+        } else {
+          jdbcSerializationConverter.serialize(val, index, statement);
+        }
+      };
+    }
+    return super.wrapIntoNullableExternalConverter(jdbcSerializationConverter, type);
+  }
+
+  @Override
+  public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+    LogicalTypeRoot root = type.getTypeRoot();
+
+    if (root == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+      return val ->
+          val instanceof LocalDateTime
+              ?
TimestampData.fromLocalDateTime((LocalDateTime) val) + : TimestampData.fromTimestamp((Timestamp) val); + } else if (root == LogicalTypeRoot.ARRAY) { + ArrayType arrayType = (ArrayType) type; + return createArrayConverter(arrayType); + } else if (root == LogicalTypeRoot.ROW) { + return val -> val; + } else if (root == LogicalTypeRoot.MAP) { + return val -> val; + } else { + return super.createInternalConverter(type); + } + } + + @Override + protected JdbcSerializationConverter createExternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int tsPrecision = ((LocalZonedTimestampType) type).getPrecision(); + return (val, index, statement) -> + statement.setTimestamp(index, val.getTimestamp(index, tsPrecision).toTimestamp()); + case MULTISET: + case RAW: + default: + return super.createExternalConverter(type); + } + } + + @SneakyThrows + private void createSqlArrayObject( + LogicalType type, ArrayData data, int idx, PreparedStatement statement) { + // Scalar arrays of any dimension are one array call + if (isScalarArray(type)) { + Object[] boxed; + if (data instanceof GenericArrayData) { + boxed = ((GenericArrayData) data).toObjectArray(); + } else if (data instanceof BinaryArrayData) { + boxed = ((BinaryArrayData) data).toObjectArray(getBaseFlinkArrayType(type)); + } else { + throw new RuntimeException("Unsupported ArrayData type: " + data.getClass()); + } + Array array = statement.getConnection().createArrayOf(getArrayScalarName(type), boxed); + statement.setArray(idx, array); + } else { + // If it is not a scalar array (e.g. row type), use an empty byte array. + Array array = statement.getConnection().createArrayOf(getArrayType(), new Byte[0]); + statement.setArray(idx, array); + } + } + + protected abstract String getArrayType(); + + public abstract JdbcDeserializationConverter createArrayConverter(ArrayType arrayType); +} diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlJdbcDynamicTableFactory.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlJdbcDynamicTableFactory.java new file mode 100644 index 00000000..002f35a3 --- /dev/null +++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlJdbcDynamicTableFactory.java @@ -0,0 +1,281 @@ +/* + * Copyright © 2024 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datasqrl.connector.postgresql.jdbc; + +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.DRIVER; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MISSING_KEY; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_TTL; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_MAX_RETRIES; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.MAX_RETRY_TIMEOUT; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_AUTO_COMMIT; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_FETCH_SIZE; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_COLUMN; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_LOWER_BOUND; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_NUM; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SCAN_PARTITION_UPPER_BOUND; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_MAX_RETRIES; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.SINK_PARALLELISM; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; +import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; +import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink; +import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.lookup.LookupOptions; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +/** + * Factory for creating configured instances of {@link JdbcDynamicTableSource} and {@link + * JdbcDynamicTableSink}. 
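+ *
+ * <p>SQRL: only the sink side is exposed here ({@link DynamicTableSinkFactory}); the factory
+ * registers under the identifier {@code jdbc-sqrl} and, for PostgreSQL URLs, swaps the standard
+ * dialect for {@link SqrlPostgresDialect}.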
+ */ +@Internal +public class SqrlJdbcDynamicTableFactory implements DynamicTableSinkFactory { + + public static final String IDENTIFIER = "jdbc-sqrl"; + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + final ReadableConfig config = helper.getOptions(); + + helper.validate(); + validateConfigOptions(config, context.getClassLoader()); + validateDataTypeWithJdbcDialect( + context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader()); + InternalJdbcConnectionOptions jdbcOptions = getJdbcOptions(config, context.getClassLoader()); + + return new JdbcDynamicTableSink( + jdbcOptions, + getJdbcExecutionOptions(config), + getJdbcDmlOptions( + jdbcOptions, context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes()), + context.getPhysicalRowDataType()); + } + + private static void validateDataTypeWithJdbcDialect( + DataType dataType, String url, ClassLoader classLoader) { + JdbcDialect dialect = loadDialect(url, classLoader); + + dialect.validate((RowType) dataType.getLogicalType()); + } + + private InternalJdbcConnectionOptions getJdbcOptions( + ReadableConfig readableConfig, ClassLoader classLoader) { + final String url = readableConfig.get(URL); + final InternalJdbcConnectionOptions.Builder builder = + InternalJdbcConnectionOptions.builder() + .setClassLoader(classLoader) + .setDBUrl(url) + .setTableName(readableConfig.get(TABLE_NAME)) + .setDialect(loadDialect(url, classLoader)) + .setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null)) + .setConnectionCheckTimeoutSeconds( + (int) readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds()); + + readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName); + readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername); + readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword); + return builder.build(); + } + + private static JdbcDialect loadDialect(String url, ClassLoader classLoader) { + JdbcDialect dialect = JdbcDialectLoader.load(url, classLoader); + // sqrl: standard postgres dialect with extended dialect + if (dialect.dialectName().equalsIgnoreCase("PostgreSQL")) { + return new SqrlPostgresDialect(); + } + return dialect; + } + + private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) { + final JdbcExecutionOptions.Builder builder = new JdbcExecutionOptions.Builder(); + builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS)); + builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); + builder.withMaxRetries(config.get(SINK_MAX_RETRIES)); + return builder.build(); + } + + private JdbcDmlOptions getJdbcDmlOptions( + InternalJdbcConnectionOptions jdbcOptions, DataType dataType, int[] primaryKeyIndexes) { + + String[] keyFields = + Arrays.stream(primaryKeyIndexes) + .mapToObj(i -> DataType.getFieldNames(dataType).get(i)) + .toArray(String[]::new); + + return JdbcDmlOptions.builder() + .withTableName(jdbcOptions.getTableName()) + .withDialect(jdbcOptions.getDialect()) + .withFieldNames(DataType.getFieldNames(dataType).toArray(new String[0])) + .withKeyFields(keyFields.length > 0 ? 
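+            /* sqrl: with no primary key, null key fields make JdbcDmlOptions fall back to
+            plain INSERT statements instead of the dialect's upsert */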
keyFields : null) + .build(); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> requiredOptions = new HashSet<>(); + requiredOptions.add(URL); + requiredOptions.add(TABLE_NAME); + return requiredOptions; + } + + @Override + public Set> optionalOptions() { + Set> optionalOptions = new HashSet<>(); + optionalOptions.add(DRIVER); + optionalOptions.add(USERNAME); + optionalOptions.add(PASSWORD); + optionalOptions.add(SCAN_PARTITION_COLUMN); + optionalOptions.add(SCAN_PARTITION_LOWER_BOUND); + optionalOptions.add(SCAN_PARTITION_UPPER_BOUND); + optionalOptions.add(SCAN_PARTITION_NUM); + optionalOptions.add(SCAN_FETCH_SIZE); + optionalOptions.add(SCAN_AUTO_COMMIT); + optionalOptions.add(LOOKUP_CACHE_MAX_ROWS); + optionalOptions.add(LOOKUP_CACHE_TTL); + optionalOptions.add(LOOKUP_MAX_RETRIES); + optionalOptions.add(LOOKUP_CACHE_MISSING_KEY); + optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS); + optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL); + optionalOptions.add(SINK_MAX_RETRIES); + optionalOptions.add(SINK_PARALLELISM); + optionalOptions.add(MAX_RETRY_TIMEOUT); + optionalOptions.add(LookupOptions.CACHE_TYPE); + optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS); + optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE); + optionalOptions.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS); + optionalOptions.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY); + optionalOptions.add(LookupOptions.MAX_RETRIES); + return optionalOptions; + } + + @Override + public Set> forwardOptions() { + return Stream.of( + URL, + TABLE_NAME, + USERNAME, + PASSWORD, + DRIVER, + SINK_BUFFER_FLUSH_MAX_ROWS, + SINK_BUFFER_FLUSH_INTERVAL, + SINK_MAX_RETRIES, + MAX_RETRY_TIMEOUT, + SCAN_FETCH_SIZE, + SCAN_AUTO_COMMIT) + .collect(Collectors.toSet()); + } + + private void validateConfigOptions(ReadableConfig config, ClassLoader classLoader) { + String jdbcUrl = config.get(URL); + // JdbcDialectLoader.load(jdbcUrl, classLoader); + + checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD}); + + checkAllOrNone( + config, + new ConfigOption[] { + SCAN_PARTITION_COLUMN, + SCAN_PARTITION_NUM, + SCAN_PARTITION_LOWER_BOUND, + SCAN_PARTITION_UPPER_BOUND + }); + + if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent() + && config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) { + long lowerBound = config.get(SCAN_PARTITION_LOWER_BOUND); + long upperBound = config.get(SCAN_PARTITION_UPPER_BOUND); + if (lowerBound > upperBound) { + throw new IllegalArgumentException( + String.format( + "'%s'='%s' must not be larger than '%s'='%s'.", + SCAN_PARTITION_LOWER_BOUND.key(), + lowerBound, + SCAN_PARTITION_UPPER_BOUND.key(), + upperBound)); + } + } + + checkAllOrNone(config, new ConfigOption[] {LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL}); + + if (config.get(LOOKUP_MAX_RETRIES) < 0) { + throw new IllegalArgumentException( + String.format( + "The value of '%s' option shouldn't be negative, but is %s.", + LOOKUP_MAX_RETRIES.key(), config.get(LOOKUP_MAX_RETRIES))); + } + + if (config.get(SINK_MAX_RETRIES) < 0) { + throw new IllegalArgumentException( + String.format( + "The value of '%s' option shouldn't be negative, but is %s.", + SINK_MAX_RETRIES.key(), config.get(SINK_MAX_RETRIES))); + } + + if (config.get(MAX_RETRY_TIMEOUT).getSeconds() <= 0) { + throw new IllegalArgumentException( + String.format( + "The value of '%s' option must be in second granularity and shouldn't be smaller than 1 second, but is %s.", + 
MAX_RETRY_TIMEOUT.key(), + config.get( + ConfigOptions.key(MAX_RETRY_TIMEOUT.key()).stringType().noDefaultValue()))); + } + } + + private void checkAllOrNone(ReadableConfig config, ConfigOption[] configOptions) { + int presentCount = 0; + for (ConfigOption configOption : configOptions) { + if (config.getOptional(configOption).isPresent()) { + presentCount++; + } + } + String[] propertyNames = + Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new); + Preconditions.checkArgument( + configOptions.length == presentCount || presentCount == 0, + "Either all or none of the following options should be provided:\n" + + String.join("\n", propertyNames)); + } +} diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresDialect.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresDialect.java new file mode 100644 index 00000000..4285bfbc --- /dev/null +++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresDialect.java @@ -0,0 +1,157 @@ +/* + * Copyright © 2024 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.connector.postgresql.jdbc; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.dialect.AbstractDialect; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; + +/** + * JDBC dialect for PostgreSQL. + * + *

<p>SQRL: Add quoting to identifiers
+ */
+public class SqrlPostgresDialect extends AbstractDialect {
+
+  private static final long serialVersionUID = 1L;
+
+  // Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs:
+  // https://www.postgresql.org/docs/12/datatype-datetime.html
+  private static final int MAX_TIMESTAMP_PRECISION = 6;
+  private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+  // Define MAX/MIN precision of DECIMAL type according to PostgreSQL docs:
+  // https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-NUMERIC-DECIMAL
+  private static final int MAX_DECIMAL_PRECISION = 1000;
+  private static final int MIN_DECIMAL_PRECISION = 1;
+
+  @Override
+  public JdbcRowConverter getRowConverter(RowType rowType) {
+    return new SqrlPostgresRowConverter(rowType);
+  }
+
+  @Override
+  public String getLimitClause(long limit) {
+    return "LIMIT " + limit;
+  }
+
+  @Override
+  public Optional<String> defaultDriverName() {
+    return Optional.of("org.postgresql.Driver");
+  }
+
+  /** Postgres upsert query. It uses ON CONFLICT ... DO UPDATE SET ... to upsert into Postgres. */
+  @Override
+  public Optional<String> getUpsertStatement(
+      String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+    String uniqueColumns =
+        Arrays.stream(uniqueKeyFields).map(this::quoteIdentifier).collect(Collectors.joining(", "));
+    String updateClause =
+        Arrays.stream(fieldNames)
+            .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
+            .collect(Collectors.joining(", "));
+    return Optional.of(
+        getInsertIntoStatement(tableName, fieldNames)
+            + " ON CONFLICT ("
+            + uniqueColumns
+            + ")"
+            + " DO UPDATE SET "
+            + updateClause);
+  }
+
+  @Override
+  public void validate(RowType rowType) throws ValidationException {
+    List<LogicalType> unsupportedTypes =
+        rowType.getFields().stream()
+            .map(RowField::getType)
+            .filter(type -> LogicalTypeRoot.RAW.equals(type.getTypeRoot()))
+            .filter(type -> !isSupportedType(type))
+            .collect(Collectors.toList());
+
+    if (!unsupportedTypes.isEmpty()) {
+      throw new ValidationException(
+          String.format(
+              "The %s dialect doesn't support type: %s.", this.dialectName(), unsupportedTypes));
+    }
+
+    super.validate(rowType);
+  }
+
+  private boolean isSupportedType(LogicalType type) {
+    return SqrlPostgresRowConverter.sqrlSerializers.containsKey(type.getDefaultConversion());
+  }
+
+  @Override
+  public String quoteIdentifier(String identifier) {
+    return "\"" + identifier + "\"";
+  }
+
+  @Override
+  public String dialectName() {
+    return "PostgreSQL";
+  }
+
+  @Override
+  public Optional<Range> decimalPrecisionRange() {
+    return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
+  }
+
+  @Override
+  public Optional<Range> timestampPrecisionRange() {
+    return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+  }
+
+  @Override
+  public Set<LogicalTypeRoot> supportedTypes() {
+    // The data types used in PostgreSQL are listed at:
+    // https://www.postgresql.org/docs/12/datatype.html
+
+    // TODO: We can't convert BINARY data type to
+    // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
+    // LegacyTypeInfoDataTypeConverter.
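+    // BINARY is therefore omitted from the set below; binary columns map to VARBINARY (bytea).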
+
+    return EnumSet.of(
+        LogicalTypeRoot.CHAR,
+        LogicalTypeRoot.VARCHAR,
+        LogicalTypeRoot.BOOLEAN,
+        LogicalTypeRoot.VARBINARY,
+        LogicalTypeRoot.DECIMAL,
+        LogicalTypeRoot.TINYINT,
+        LogicalTypeRoot.SMALLINT,
+        LogicalTypeRoot.INTEGER,
+        LogicalTypeRoot.BIGINT,
+        LogicalTypeRoot.FLOAT,
+        LogicalTypeRoot.DOUBLE,
+        LogicalTypeRoot.DATE,
+        LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
+        LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+        LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+        LogicalTypeRoot.ARRAY,
+        LogicalTypeRoot.MAP,
+        LogicalTypeRoot.RAW // see validate() for supported structured types
+        );
+  }
+}
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresRowConverter.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresRowConverter.java
new file mode 100644
index 00000000..8538fcb2
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/jdbc/SqrlPostgresRowConverter.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.jdbc;
+
+import com.datasqrl.connector.postgresql.type.JdbcTypeSerializer;
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.postgresql.jdbc.PgArray;
+
+/**
+ * Runtime converter responsible for converting between JDBC objects and Flink internal objects
+ * for PostgreSQL.
+ *
+ * <p>SQRL: Add array support
+ */
+public class SqrlPostgresRowConverter extends SqrlBaseJdbcRowConverter {
+
+  private static final long serialVersionUID = 1L;
+
+  public static final Map<
+          Type, JdbcTypeSerializer<JdbcDeserializationConverter, JdbcSerializationConverter>>
+      sqrlSerializers = discoverSerializers();
+
+  private static Map<
+          Type, JdbcTypeSerializer<JdbcDeserializationConverter, JdbcSerializationConverter>>
+      discoverSerializers() {
+    return ServiceLoader.load(JdbcTypeSerializer.class).stream()
+        .map(f -> f.get())
+        .filter(f -> f.getDialectId().equalsIgnoreCase("postgres"))
+        .collect(Collectors.toMap(JdbcTypeSerializer::getConversionClass, t -> t));
+  }
+
+  @Override
+  public String converterName() {
+    return "PostgreSQL";
+  }
+
+  public SqrlPostgresRowConverter(RowType rowType) {
+    super(rowType);
+  }
+
+  @Override
+  public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+    if (sqrlSerializers.containsKey(type.getDefaultConversion())) {
+      return sqrlSerializers.get(type.getDefaultConversion()).getDeserializerConverter().create();
+    } else {
+      return super.createInternalConverter(type);
+    }
+  }
+
+  @Override
+  protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
+      JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
+    if (sqrlSerializers.containsKey(type.getDefaultConversion())) {
+      return jdbcSerializationConverter::serialize;
+    } else {
+      return super.wrapIntoNullableExternalConverter(jdbcSerializationConverter, type);
+    }
+  }
+
+  @Override
+  protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
+    if (sqrlSerializers.containsKey(type.getDefaultConversion())) {
+      return sqrlSerializers.get(type.getDefaultConversion()).getSerializerConverter(type).create();
+    } else {
+      return super.createExternalConverter(type);
+    }
+  }
+
+  @Override
+  protected String getArrayType() {
+    return "bytea";
+  }
+
+  @Override
+  public JdbcDeserializationConverter createArrayConverter(ArrayType arrayType) {
+    // Since PGJDBC 42.2.15 (https://github.com/pgjdbc/pgjdbc/pull/1194) bytea[] is wrapped in
+    // primitive byte arrays
+    final Class<?> elementClass =
+        LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+    final JdbcDeserializationConverter elementConverter =
+        createNullableInternalConverter(arrayType.getElementType());
+    return val -> {
+      // sqrl: check if scalar array
+
+      Object[] in;
+      if (val instanceof PgArray) {
+        PgArray pgArray = (PgArray) val;
+        in = (Object[]) pgArray.getArray();
+      } else {
+        in = (Object[]) val;
+      }
+      final Object[] array =
+          (Object[]) java.lang.reflect.Array.newInstance(elementClass, in.length);
+      for (int i = 0; i < in.length; i++) {
+        array[i] = elementConverter.deserialize(in[i]);
+      }
+      return new GenericArrayData(array);
+    };
+  }
+}
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/FlinkArrayTypeUtil.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/FlinkArrayTypeUtil.java
new file mode 100644
index 00000000..ed606c23
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/FlinkArrayTypeUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.connector.postgresql.type; + +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; + +public class FlinkArrayTypeUtil { + + public static LogicalType getBaseFlinkArrayType(LogicalType type) { + if (type instanceof ArrayType) { + return getBaseFlinkArrayType(((ArrayType) type).getElementType()); + } + return type; + } + + public static boolean isScalarArray(LogicalType type) { + if (type instanceof ArrayType) { + LogicalType elementType = ((ArrayType) type).getElementType(); + return isScalar(elementType) || isScalarArray(elementType); + } + return false; + } + + public static boolean isScalar(LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case FLOAT: + case DOUBLE: + case CHAR: + case VARCHAR: + case BINARY: + case VARBINARY: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case DECIMAL: + return true; + default: + return false; + } + } +} diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/JdbcTypeSerializer.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/JdbcTypeSerializer.java new file mode 100644 index 00000000..95904e59 --- /dev/null +++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/JdbcTypeSerializer.java @@ -0,0 +1,39 @@ +/* + * Copyright © 2024 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.datasqrl.connector.postgresql.type;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+public interface JdbcTypeSerializer<D, S> {
+
+  String getDialectId();
+
+  Class<?> getConversionClass();
+
+  String dialectTypeName();
+
+  GenericDeserializationConverter<D> getDeserializerConverter();
+
+  GenericSerializationConverter<S> getSerializerConverter(LogicalType type);
+
+  interface GenericSerializationConverter<T> {
+    T create();
+  }
+
+  interface GenericDeserializationConverter<T> {
+    T create();
+  }
+}
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresArrayTypeConverter.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresArrayTypeConverter.java
new file mode 100644
index 00000000..a4c9b835
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresArrayTypeConverter.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.type;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+public class PostgresArrayTypeConverter {
+
+  /** Returns the PostgreSQL element type name for the given Flink type. */
+  public static String getArrayScalarName(LogicalType type) {
+    switch (type.getTypeRoot()) {
+      case CHAR:
+      case VARCHAR:
+        return "text";
+      case BOOLEAN:
+        return "boolean";
+      case BINARY:
+      case VARBINARY:
+        return "bytea";
+      case DECIMAL:
+        return "decimal";
+      case TINYINT:
+        return "smallint";
+      case SMALLINT:
+        return "smallint";
+      case INTEGER:
+        return "integer";
+      case BIGINT:
+        return "bigint";
+      case FLOAT:
+        return "real"; // PostgreSQL uses REAL for float
+      case DOUBLE:
+        return "double precision"; // PostgreSQL has no bare "double" type
+      case DATE:
+        return "date";
+      case TIME_WITHOUT_TIME_ZONE:
+        return "time without time zone";
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        return "timestamp without time zone";
+      case TIMESTAMP_WITH_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return "timestamptz";
+      case INTERVAL_YEAR_MONTH:
+        return "interval year to month";
+      case INTERVAL_DAY_TIME:
+        return "interval day to second";
+      case NULL:
+        return "void";
+      case ARRAY:
+        return getArrayScalarName(((ArrayType) type).getElementType());
+      case MULTISET:
+      case MAP:
+      case ROW:
+      case DISTINCT_TYPE:
+      case STRUCTURED_TYPE:
+      case RAW:
+      case SYMBOL:
+      case UNRESOLVED:
+      default:
+        throw new RuntimeException("Cannot convert type to array type");
+    }
+  }
+}
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresJsonTypeSerializer.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresJsonTypeSerializer.java
new file mode 100644
index 00000000..c3b0458b
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresJsonTypeSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright
© 2024 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.connector.postgresql.type; + +import com.datasqrl.types.json.FlinkJsonType; +import com.datasqrl.types.json.FlinkJsonTypeSerializer; +import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcDeserializationConverter; +import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcSerializationConverter; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.types.logical.LogicalType; +import org.postgresql.util.PGobject; + +public class PostgresJsonTypeSerializer + implements JdbcTypeSerializer { + + @Override + public String getDialectId() { + return "postgres"; + } + + @Override + public Class getConversionClass() { + return FlinkJsonType.class; + } + + @Override + public String dialectTypeName() { + return "jsonb"; + } + + @Override + public GenericDeserializationConverter getDeserializerConverter() { + return () -> + (val) -> { + FlinkJsonType t = (FlinkJsonType) val; + return t.getJson(); + }; + } + + @Override + public GenericSerializationConverter getSerializerConverter( + LogicalType type) { + FlinkJsonTypeSerializer typeSerializer = new FlinkJsonTypeSerializer(); + + return () -> + (val, index, statement) -> { + if (val != null && !val.isNullAt(index)) { + PGobject pgObject = new PGobject(); + pgObject.setType("json"); + RawValueData object = val.getRawValue(index); + FlinkJsonType vec = object.toObject(typeSerializer); + if (vec == null) { + statement.setObject(index, null); + } else { + pgObject.setValue(vec.getJson().toString()); + statement.setObject(index, pgObject); + } + } else { + statement.setObject(index, null); + } + }; + } +} diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresRowTypeSerializer.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresRowTypeSerializer.java new file mode 100644 index 00000000..c7b2792e --- /dev/null +++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresRowTypeSerializer.java @@ -0,0 +1,82 @@ +/* + * Copyright © 2024 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datasqrl.connector.postgresql.type; + +import com.datasqrl.flink.format.json.SqrlRowDataToJsonConverters; +import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcDeserializationConverter; +import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcSerializationConverter; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonFormatOptions.MapNullKeyMode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.Row; +import org.postgresql.util.PGobject; + +public class PostgresRowTypeSerializer + implements JdbcTypeSerializer { + + @Override + public String getDialectId() { + return "postgres"; + } + + @Override + public Class getConversionClass() { + return Row[].class; + } + + @Override + public String dialectTypeName() { + return "jsonb"; + } + + @Override + public GenericDeserializationConverter getDeserializerConverter() { + return () -> { + return (val) -> null; + }; + } + + @Override + public GenericSerializationConverter getSerializerConverter( + LogicalType type) { + ObjectMapper mapper = new ObjectMapper(); + return () -> + (val, index, statement) -> { + if (val != null && !val.isNullAt(index)) { + SqrlRowDataToJsonConverters rowDataToJsonConverter = + new SqrlRowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, "null"); + + ArrayType arrayType = (ArrayType) type; + ObjectNode objectNode = mapper.createObjectNode(); + JsonNode convert = + rowDataToJsonConverter + .createConverter(arrayType.getElementType()) + .convert(mapper, objectNode, val); + + PGobject pgObject = new PGobject(); + pgObject.setType("json"); + pgObject.setValue(convert.toString()); + statement.setObject(index, pgObject); + } else { + statement.setObject(index, null); + } + }; + } +} diff --git a/connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer b/connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer new file mode 100644 index 00000000..0d0b9e67 --- /dev/null +++ b/connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer @@ -0,0 +1,2 @@ +com.datasqrl.connector.postgresql.type.PostgresRowTypeSerializer +com.datasqrl.connector.postgresql.type.PostgresJsonTypeSerializer \ No newline at end of file diff --git a/connectors/postgresql-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/connectors/postgresql-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 00000000..eeae9c1d --- /dev/null +++ b/connectors/postgresql-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1 @@ +com.datasqrl.connector.postgresql.jdbc.SqrlJdbcDynamicTableFactory \ No newline at end of file diff --git a/connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java b/connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java new file mode 100644 
index 00000000..1f87d778 --- /dev/null +++ b/connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java @@ -0,0 +1,106 @@ +/* + * Copyright © 2024 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.connector.postgresql.jdbc; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.containers.PostgreSQLContainer; + +@ExtendWith(MiniClusterExtension.class) +public class FlinkJdbcTest { + + @Test + public void testWriteAndReadToPostgres() throws Exception { + try (PostgreSQLContainer postgresContainer = new PostgreSQLContainer<>("postgres:14")) { + postgresContainer.start(); + try (Connection conn = + DriverManager.getConnection( + postgresContainer.getJdbcUrl(), + postgresContainer.getUsername(), + postgresContainer.getPassword()); + Statement stmt = conn.createStatement()) { + String createTableSQL = "CREATE TABLE test_table (" + " id BIGINT, name VARCHAR " + ")"; + stmt.executeUpdate(createTableSQL); + } + + // Set up Flink mini cluster environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); + + // Create a PostgreSQL table using the Table API + tEnv.executeSql( + "CREATE TABLE test_table (" + + "id BIGINT," + + "name STRING" + + ") WITH (" + + "'connector' = 'jdbc'," + + "'url' = '" + + postgresContainer.getJdbcUrl() + + "'," + + "'table-name' = 'test_table'," + + "'username' = '" + + postgresContainer.getUsername() + + "'," + + "'password' = '" + + postgresContainer.getPassword() + + "'" + + ")"); + + // Create a DataGen source to generate 10 rows of data + tEnv.executeSql( + "CREATE TABLE datagen_source (" + + "id BIGINT," + + "name STRING" + + ") WITH (" + + "'connector' = 'datagen'," + + "'rows-per-second' = '1'," + + "'fields.id.kind' = 'sequence'," + + "'fields.id.start' = '1'," + + "'fields.id.end' = '10'," + + "'fields.name.length' = '10'" + + ")"); + + // Insert data from the DataGen source into the PostgreSQL table + tEnv.executeSql("INSERT INTO test_table SELECT * FROM datagen_source").await(); + + // Verify the data has been inserted by querying the PostgreSQL database directly + Connection connection = postgresContainer.createConnection(""); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) 
FROM test_table"); + + int count = 0; + if (resultSet.next()) { + count = resultSet.getInt(1); + } + + // Validate that 10 rows were inserted + assertEquals(10, count); + + connection.close(); + } + } +} diff --git a/flink-sql-runner/pom.xml b/flink-sql-runner/pom.xml index b13747de..4fcb9afe 100644 --- a/flink-sql-runner/pom.xml +++ b/flink-sql-runner/pom.xml @@ -202,6 +202,38 @@ runtime test + + + + ${project.groupId} + flexible-csv-format + ${project.version} + runtime + + + ${project.groupId} + flexible-json-format + ${project.version} + runtime + + + ${project.groupId} + system-functions-discovery + ${project.version} + runtime + + + ${project.groupId} + vector-type + ${project.version} + runtime + + + ${project.groupId} + postgresql-connector + ${project.version} + runtime + diff --git a/pom.xml b/pom.xml index 792f14c4..e8054099 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ formats functions types + connectors testing flink-sql-runner diff --git a/types/pom.xml b/types/pom.xml index 37740453..b2b72bc4 100644 --- a/types/pom.xml +++ b/types/pom.xml @@ -31,6 +31,7 @@ json-type + vector-type diff --git a/types/vector-type/pom.xml b/types/vector-type/pom.xml new file mode 100644 index 00000000..f54137a3 --- /dev/null +++ b/types/vector-type/pom.xml @@ -0,0 +1,38 @@ + + + + 4.0.0 + + + com.datasqrl.flinkrunner + types + 1.0.0-SNAPSHOT + + + vector-type + + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + diff --git a/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorType.java b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorType.java new file mode 100644 index 00000000..87dad7d2 --- /dev/null +++ b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorType.java @@ -0,0 +1,34 @@ +/* + * Copyright © 2024 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.types.vector; + +import org.apache.flink.table.annotation.DataTypeHint; + +@DataTypeHint( + value = "RAW", + bridgedTo = FlinkVectorType.class, + rawSerializer = FlinkVectorTypeSerializer.class) +public class FlinkVectorType { + public double[] value; + + public FlinkVectorType(double[] value) { + this.value = value; + } + + public double[] getValue() { + return value; + } +} diff --git a/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializer.java b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializer.java new file mode 100644 index 00000000..cbac8467 --- /dev/null +++ b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializer.java @@ -0,0 +1,99 @@ +/* + * Copyright © 2024 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.vector;
+
+import java.io.IOException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class FlinkVectorTypeSerializer extends TypeSerializer<FlinkVectorType> {
+
+  @Override
+  public boolean isImmutableType() {
+    return true;
+  }
+
+  @Override
+  public FlinkVectorType createInstance() {
+    return new FlinkVectorType(null);
+  }
+
+  @Override
+  public FlinkVectorType copy(FlinkVectorType from) {
+    return new FlinkVectorType(from.getValue());
+  }
+
+  @Override
+  public FlinkVectorType copy(FlinkVectorType from, FlinkVectorType reuse) {
+    return copy(from);
+  }
+
+  @Override
+  public int getLength() {
+    return -1; // indicates that this serializer does not have a fixed length
+  }
+
+  @Override
+  public void serialize(FlinkVectorType record, DataOutputView target) throws IOException {
+    target.writeInt(record.getValue().length); // First write the length of the array
+    for (double v : record.getValue()) {
+      target.writeDouble(v); // Write each double value
+    }
+  }
+
+  @Override
+  public FlinkVectorType deserialize(DataInputView source) throws IOException {
+    int length = source.readInt();
+    double[] array = new double[length];
+    for (int i = 0; i < length; i++) {
+      array[i] = source.readDouble();
+    }
+    return new FlinkVectorType(array);
+  }
+
+  @Override
+  public FlinkVectorType deserialize(FlinkVectorType reuse, DataInputView source)
+      throws IOException {
+    return deserialize(source);
+  }
+
+  @Override
+  public void copy(DataInputView source, DataOutputView target) throws IOException {
+    // Mirror the int-length + doubles wire format used by serialize/deserialize;
+    // readUTF/writeUTF would not match the data actually written.
+    int length = source.readInt();
+    target.writeInt(length);
+    for (int i = 0; i < length; i++) {
+      target.writeDouble(source.readDouble());
+    }
+  }
+
+  @Override
+  public TypeSerializer<FlinkVectorType> duplicate() {
+    return this;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof FlinkVectorTypeSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return FlinkVectorTypeSerializer.class.hashCode();
+  }
+
+  @Override
+  public TypeSerializerSnapshot<FlinkVectorType> snapshotConfiguration() {
+    return new FlinkVectorTypeSerializerSnapshot();
+  }
+}
diff --git a/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializerSnapshot.java b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializerSnapshot.java
new file mode 100644
index 00000000..c90bfa4c
--- /dev/null
+++ b/types/vector-type/src/main/java/com/datasqrl/types/vector/FlinkVectorTypeSerializerSnapshot.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.vector;
+
+import java.io.IOException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class FlinkVectorTypeSerializerSnapshot implements TypeSerializerSnapshot<FlinkVectorType> {
+
+  private Class<? extends TypeSerializer<FlinkVectorType>> serializerClass;
+
+  public FlinkVectorTypeSerializerSnapshot() {
+    this.serializerClass = FlinkVectorTypeSerializer.class;
+  }
+
+  @Override
+  public int getCurrentVersion() {
+    return 1;
+  }
+
+  @Override
+  public void writeSnapshot(DataOutputView out) throws IOException {
+    out.writeUTF(FlinkVectorTypeSerializer.class.getName());
+  }
+
+  @Override
+  public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
+      throws IOException {
+    String className = in.readUTF();
+    try {
+      this.serializerClass =
+          (Class<? extends TypeSerializer<FlinkVectorType>>)
+              Class.forName(className, true, userCodeClassLoader);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Failed to find serializer class: " + className, e);
+    }
+  }
+
+  @Override
+  public TypeSerializer<FlinkVectorType> restoreSerializer() {
+    try {
+      return serializerClass.newInstance();
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException(
+          "Failed to instantiate serializer class: " + serializerClass.getName(), e);
+    }
+  }
+
+  @Override
+  public TypeSerializerSchemaCompatibility<FlinkVectorType> resolveSchemaCompatibility(
+      TypeSerializer<FlinkVectorType> newSerializer) {
+    if (newSerializer.getClass() == this.serializerClass) {
+      return TypeSerializerSchemaCompatibility.compatibleAsIs();
+    } else {
+      return TypeSerializerSchemaCompatibility.incompatible();
+    }
+  }
+}
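For reference, a sink table backed by this connector is declared the same way as the plain JDBC table in FlinkJdbcTest, only with the 'jdbc-sqrl' identifier registered by SqrlJdbcDynamicTableFactory. A minimal sketch follows; the URL, credentials, and table name are placeholder values, not part of this change:

    // Hypothetical usage: declaring a PRIMARY KEY routes writes through
    // SqrlPostgresDialect's ON CONFLICT ... DO UPDATE SET upsert statement.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    tEnv.executeSql(
        "CREATE TABLE my_sink ("
            + " id BIGINT,"
            + " name STRING,"
            + " PRIMARY KEY (id) NOT ENFORCED"
            + ") WITH ("
            + " 'connector' = 'jdbc-sqrl',"
            + " 'url' = 'jdbc:postgresql://localhost:5432/mydb',"
            + " 'table-name' = 'my_sink',"
            + " 'username' = 'postgres',"
            + " 'password' = 'postgres'"
            + ")");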