Skip to content

Extracted vector-type from sqrl project #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions connectors/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright © 2024 DataSQRL (contact@datasqrl.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.datasqrl.flinkrunner</groupId>
<artifactId>flink-sql-runner-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>

<artifactId>connectors</artifactId>
<packaging>pom</packaging>

<modules>
<module>postgresql-connector</module>
</modules>

</project>
90 changes: 90 additions & 0 deletions connectors/postgresql-connector/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright © 2024 DataSQRL (contact@datasqrl.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.datasqrl.flinkrunner</groupId>
<artifactId>connectors</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>

<artifactId>postgresql-connector</artifactId>
<description>Jdbc sink for flink 1.19</description>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.2.0-1.19</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgres.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.datasqrl.flinkrunner</groupId>
<artifactId>flexible-json-format</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datasqrl.connector.postgresql.jdbc;

import static com.datasqrl.connector.postgresql.type.FlinkArrayTypeUtil.getBaseFlinkArrayType;
import static com.datasqrl.connector.postgresql.type.FlinkArrayTypeUtil.isScalarArray;
import static com.datasqrl.connector.postgresql.type.PostgresArrayTypeConverter.getArrayScalarName;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;

import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDateTime;
import lombok.SneakyThrows;
import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryArrayData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

/** A sqrl class to handle arrays and extra data types */
public abstract class SqrlBaseJdbcRowConverter extends AbstractJdbcRowConverter {

public SqrlBaseJdbcRowConverter(RowType rowType) {
super(rowType);
}

@Override
protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
if (type.getTypeRoot() == TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
int timestampWithTimezone = Types.TIMESTAMP_WITH_TIMEZONE;
return (val, index, statement) -> {
if (val == null || val.isNullAt(index) || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
statement.setNull(index, timestampWithTimezone);
} else {
jdbcSerializationConverter.serialize(val, index, statement);
}
};
}
return super.wrapIntoNullableExternalConverter(jdbcSerializationConverter, type);
}

@Override
public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
LogicalTypeRoot root = type.getTypeRoot();

if (root == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
return val ->
val instanceof LocalDateTime
? TimestampData.fromLocalDateTime((LocalDateTime) val)
: TimestampData.fromTimestamp((Timestamp) val);
} else if (root == LogicalTypeRoot.ARRAY) {
ArrayType arrayType = (ArrayType) type;
return createArrayConverter(arrayType);
} else if (root == LogicalTypeRoot.ROW) {
return val -> val;
} else if (root == LogicalTypeRoot.MAP) {
return val -> val;
} else {
return super.createInternalConverter(type);
}
}

@Override
protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final int tsPrecision = ((LocalZonedTimestampType) type).getPrecision();
return (val, index, statement) ->
statement.setTimestamp(index, val.getTimestamp(index, tsPrecision).toTimestamp());
case MULTISET:
case RAW:
default:
return super.createExternalConverter(type);
}
}

@SneakyThrows
private void createSqlArrayObject(
LogicalType type, ArrayData data, int idx, PreparedStatement statement) {
// Scalar arrays of any dimension are one array call
if (isScalarArray(type)) {
Object[] boxed;
if (data instanceof GenericArrayData) {
boxed = ((GenericArrayData) data).toObjectArray();
} else if (data instanceof BinaryArrayData) {
boxed = ((BinaryArrayData) data).toObjectArray(getBaseFlinkArrayType(type));
} else {
throw new RuntimeException("Unsupported ArrayData type: " + data.getClass());
}
Array array = statement.getConnection().createArrayOf(getArrayScalarName(type), boxed);
statement.setArray(idx, array);
} else {
// If it is not a scalar array (e.g. row type), use an empty byte array.
Array array = statement.getConnection().createArrayOf(getArrayType(), new Byte[0]);
statement.setArray(idx, array);
}
}

protected abstract String getArrayType();

public abstract JdbcDeserializationConverter createArrayConverter(ArrayType arrayType);
}
Loading