Skip to content

Commit 112a459

Browse files
authored
Merge pull request #76 from DataSQRL/vector
Extracted vector-type from sqrl project
2 parents e566543 + 730df0a commit 112a459

21 files changed

+1532
-0
lines changed

connectors/pom.xml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!--

    Copyright © 2024 DataSQRL (contact@datasqrl.com)

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

-->
<!-- Aggregator POM for DataSQRL connector modules (packaging "pom": builds no
     artifact itself, only orchestrates the child modules listed below). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <!-- Inherits version/property management from the flink-sql-runner parent. -->
  <parent>
    <groupId>com.datasqrl.flinkrunner</groupId>
    <artifactId>flink-sql-runner-parent</artifactId>
    <version>1.0.0-SNAPSHOT</version>
  </parent>

  <artifactId>connectors</artifactId>
  <packaging>pom</packaging>

  <modules>
    <module>postgresql-connector</module>
  </modules>

</project>
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!--

    Copyright © 2024 DataSQRL (contact@datasqrl.com)

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

-->
<!-- PostgreSQL sink connector module. Builds on Flink's JDBC connector
     (pinned to the 1.19-compatible release) plus the PostgreSQL driver. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.datasqrl.flinkrunner</groupId>
    <artifactId>connectors</artifactId>
    <version>1.0.0-SNAPSHOT</version>
  </parent>

  <artifactId>postgresql-connector</artifactId>
  <description>Jdbc sink for flink 1.19</description>

  <dependencies>
    <!-- Core compile-time dependencies. NOTE(review): flink-connector-jdbc is
         version-locked to 3.2.0-1.19 rather than a property; confirm this is
         intentional and kept in sync with ${flink.version}. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>3.2.0-1.19</version>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>${postgres.version}</version>
    </dependency>
    <!-- Test-only: Testcontainers-backed PostgreSQL for integration tests. -->
    <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>postgresql</artifactId>
      <version>${testcontainers.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- Provided: supplied by the Flink runtime at deployment time. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.datasqrl.flinkrunner</groupId>
      <artifactId>flexible-json-format</artifactId>
      <version>1.0.0-SNAPSHOT</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Test-only: planner and test harness utilities. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl.connector.postgresql.jdbc;
17+
18+
import static com.datasqrl.connector.postgresql.type.FlinkArrayTypeUtil.getBaseFlinkArrayType;
19+
import static com.datasqrl.connector.postgresql.type.FlinkArrayTypeUtil.isScalarArray;
20+
import static com.datasqrl.connector.postgresql.type.PostgresArrayTypeConverter.getArrayScalarName;
21+
import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
22+
23+
import java.sql.Array;
24+
import java.sql.PreparedStatement;
25+
import java.sql.Timestamp;
26+
import java.sql.Types;
27+
import java.time.LocalDateTime;
28+
import lombok.SneakyThrows;
29+
import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
30+
import org.apache.flink.table.data.ArrayData;
31+
import org.apache.flink.table.data.GenericArrayData;
32+
import org.apache.flink.table.data.TimestampData;
33+
import org.apache.flink.table.data.binary.BinaryArrayData;
34+
import org.apache.flink.table.types.logical.ArrayType;
35+
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
36+
import org.apache.flink.table.types.logical.LogicalType;
37+
import org.apache.flink.table.types.logical.LogicalTypeRoot;
38+
import org.apache.flink.table.types.logical.RowType;
39+
40+
/**
 * SQRL-specific JDBC row converter. Extends Flink's {@link AbstractJdbcRowConverter} with
 * handling for {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE}, ARRAY, ROW, and MAP logical types that
 * the base converter does not cover. Dialect-specific array behavior is supplied by subclasses
 * via {@link #getArrayType()} and {@link #createArrayConverter(ArrayType)}.
 */
public abstract class SqrlBaseJdbcRowConverter extends AbstractJdbcRowConverter {

  /**
   * @param rowType the Flink row type describing the columns this converter serializes and
   *     deserializes; passed through to the base converter.
   */
  public SqrlBaseJdbcRowConverter(RowType rowType) {
    super(rowType);
  }

  /**
   * Wraps a serialization converter so that null values are written correctly. For
   * {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE} columns, nulls are set explicitly with the JDBC
   * {@link Types#TIMESTAMP_WITH_TIMEZONE} type code; all other types defer to the superclass.
   */
  @Override
  protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
      JdbcSerializationConverter jdbcSerializationConverter, LogicalType type) {
    if (type.getTypeRoot() == TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
      int timestampWithTimezone = Types.TIMESTAMP_WITH_TIMEZONE;
      return (val, index, statement) -> {
        // NOTE(review): the LogicalTypeRoot.NULL check below can never be true here — the
        // enclosing branch already established the root is TIMESTAMP_WITH_LOCAL_TIME_ZONE.
        // Harmless, but likely a leftover; confirm and consider removing.
        if (val == null || val.isNullAt(index) || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
          statement.setNull(index, timestampWithTimezone);
        } else {
          jdbcSerializationConverter.serialize(val, index, statement);
        }
      };
    }
    return super.wrapIntoNullableExternalConverter(jdbcSerializationConverter, type);
  }

  /**
   * Creates the JDBC-to-Flink-internal deserializer for the given logical type.
   *
   * <p>Local-zone timestamps accept either a {@link LocalDateTime} or a {@link Timestamp} from
   * the driver. ARRAY delegates to the subclass-provided {@link #createArrayConverter(ArrayType)}.
   * ROW and MAP values are passed through unchanged — this assumes the driver/upstream already
   * yields Flink's internal representation for them (TODO confirm against the dialect used).
   */
  @Override
  public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
    LogicalTypeRoot root = type.getTypeRoot();

    if (root == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
      return val ->
          val instanceof LocalDateTime
              ? TimestampData.fromLocalDateTime((LocalDateTime) val)
              : TimestampData.fromTimestamp((Timestamp) val);
    } else if (root == LogicalTypeRoot.ARRAY) {
      ArrayType arrayType = (ArrayType) type;
      return createArrayConverter(arrayType);
    } else if (root == LogicalTypeRoot.ROW) {
      return val -> val;
    } else if (root == LogicalTypeRoot.MAP) {
      return val -> val;
    } else {
      return super.createInternalConverter(type);
    }
  }

  /**
   * Creates the Flink-internal-to-JDBC serializer. Local-zone timestamps are written via
   * {@link PreparedStatement#setTimestamp} at the type's declared precision; MULTISET and RAW
   * (like everything else) fall through to the superclass default.
   */
  @Override
  protected JdbcSerializationConverter createExternalConverter(LogicalType type) {
    switch (type.getTypeRoot()) {
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        final int tsPrecision = ((LocalZonedTimestampType) type).getPrecision();
        return (val, index, statement) ->
            statement.setTimestamp(index, val.getTimestamp(index, tsPrecision).toTimestamp());
      case MULTISET:
      case RAW:
      default:
        return super.createExternalConverter(type);
    }
  }

  /**
   * Binds a Flink {@link ArrayData} value to a statement parameter as a SQL ARRAY.
   *
   * <p>Scalar arrays (of any nesting depth) are boxed to {@code Object[]} and created with the
   * dialect's scalar element type name; non-scalar arrays (e.g. arrays of rows) are written as
   * an empty array of the dialect's {@link #getArrayType()}.
   *
   * <p>NOTE(review): this method is private and never called anywhere in this class — it looks
   * like it was meant to back an ARRAY case in {@link #createExternalConverter(LogicalType)}.
   * Confirm whether array serialization is wired up elsewhere or was lost in extraction.
   * {@code @SneakyThrows} hides the checked {@link java.sql.SQLException} from the signature.
   */
  @SneakyThrows
  private void createSqlArrayObject(
      LogicalType type, ArrayData data, int idx, PreparedStatement statement) {
    // Scalar arrays of any dimension are one array call
    if (isScalarArray(type)) {
      Object[] boxed;
      if (data instanceof GenericArrayData) {
        boxed = ((GenericArrayData) data).toObjectArray();
      } else if (data instanceof BinaryArrayData) {
        boxed = ((BinaryArrayData) data).toObjectArray(getBaseFlinkArrayType(type));
      } else {
        throw new RuntimeException("Unsupported ArrayData type: " + data.getClass());
      }
      Array array = statement.getConnection().createArrayOf(getArrayScalarName(type), boxed);
      statement.setArray(idx, array);
    } else {
      // If it is not a scalar array (e.g. row type), use an empty byte array.
      Array array = statement.getConnection().createArrayOf(getArrayType(), new Byte[0]);
      statement.setArray(idx, array);
    }
  }

  /** Dialect-specific SQL type name used when creating non-scalar SQL arrays. */
  protected abstract String getArrayType();

  /** Factory for the deserializer that converts a JDBC array value of the given type. */
  public abstract JdbcDeserializationConverter createArrayConverter(ArrayType arrayType);
}

0 commit comments

Comments
 (0)