Skip to content

Commit b5e4740

Browse files
authored
Merge pull request #77 from DataSQRL/vector
Bring json and vector functions from sqrl project
2 parents 112a459 + 620761e commit b5e4740

File tree

34 files changed

+2545
-35
lines changed

34 files changed

+2545
-35
lines changed

connectors/postgresql-connector/pom.xml

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,25 @@
5757
<scope>provided</scope>
5858
</dependency>
5959
<dependency>
60-
<groupId>com.datasqrl.flinkrunner</groupId>
60+
<groupId>${project.groupId}</groupId>
61+
<artifactId>json-type</artifactId>
62+
<version>${project.version}</version>
63+
</dependency>
64+
<dependency>
65+
<groupId>${project.groupId}</groupId>
66+
<artifactId>vector-type</artifactId>
67+
<version>${project.version}</version>
68+
</dependency>
69+
<dependency>
70+
<groupId>${project.groupId}</groupId>
6171
<artifactId>flexible-json-format</artifactId>
62-
<version>1.0.0-SNAPSHOT</version>
72+
<version>${project.version}</version>
73+
</dependency>
74+
<dependency>
75+
<groupId>${project.groupId}</groupId>
76+
<artifactId>system-functions-discovery</artifactId>
77+
<version>${project.version}</version>
78+
<scope>provided</scope>
6379
</dependency>
6480
<dependency>
6581
<groupId>org.apache.flink</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl.connector.postgresql.type;
17+
18+
import com.datasqrl.connector.postgresql.type.JdbcTypeSerializer.GenericDeserializationConverter;
19+
import com.datasqrl.connector.postgresql.type.JdbcTypeSerializer.GenericSerializationConverter;
20+
import com.datasqrl.types.vector.FlinkVectorType;
21+
import com.datasqrl.types.vector.FlinkVectorTypeSerializer;
22+
import java.util.Arrays;
23+
import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcDeserializationConverter;
24+
import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcSerializationConverter;
25+
import org.apache.flink.table.data.RawValueData;
26+
import org.apache.flink.table.types.logical.LogicalType;
27+
import org.postgresql.util.PGobject;
28+
29+
public class PostgresVectorTypeSerializer
30+
implements JdbcTypeSerializer<JdbcDeserializationConverter, JdbcSerializationConverter> {
31+
32+
@Override
33+
public String getDialectId() {
34+
return "postgres";
35+
}
36+
37+
@Override
38+
public Class getConversionClass() {
39+
return FlinkVectorType.class;
40+
}
41+
42+
@Override
43+
public String dialectTypeName() {
44+
return "vector";
45+
}
46+
47+
@Override
48+
public GenericDeserializationConverter<JdbcDeserializationConverter> getDeserializerConverter() {
49+
return () ->
50+
(val) -> {
51+
FlinkVectorType t = (FlinkVectorType) val;
52+
return t.getValue();
53+
};
54+
}
55+
56+
@Override
57+
public GenericSerializationConverter<JdbcSerializationConverter> getSerializerConverter(
58+
LogicalType type) {
59+
FlinkVectorTypeSerializer flinkVectorTypeSerializer = new FlinkVectorTypeSerializer();
60+
return () ->
61+
(val, index, statement) -> {
62+
if (val != null && !val.isNullAt(index)) {
63+
RawValueData<FlinkVectorType> object = val.getRawValue(index);
64+
FlinkVectorType vec = object.toObject(flinkVectorTypeSerializer);
65+
66+
if (vec != null) {
67+
PGobject pgObject = new PGobject();
68+
pgObject.setType("vector");
69+
pgObject.setValue(Arrays.toString(vec.getValue()));
70+
statement.setObject(index, pgObject);
71+
return;
72+
}
73+
}
74+
statement.setObject(index, null);
75+
};
76+
}
77+
}
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
com.datasqrl.connector.postgresql.type.PostgresRowTypeSerializer
2-
com.datasqrl.connector.postgresql.type.PostgresJsonTypeSerializer
2+
com.datasqrl.connector.postgresql.type.PostgresJsonTypeSerializer
3+
com.datasqrl.connector.postgresql.type.PostgresVectorTypeSerializer

connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,21 @@
1717

1818
import static org.junit.jupiter.api.Assertions.assertEquals;
1919

20+
import com.datasqrl.types.json.FlinkJsonTypeSerializer;
21+
import com.datasqrl.types.json.FlinkJsonTypeSerializerSnapshot;
22+
import java.io.IOException;
2023
import java.sql.Connection;
2124
import java.sql.DriverManager;
2225
import java.sql.ResultSet;
2326
import java.sql.Statement;
27+
import org.apache.flink.core.memory.DataInputDeserializer;
28+
import org.apache.flink.core.memory.DataOutputSerializer;
2429
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2530
import org.apache.flink.table.api.EnvironmentSettings;
31+
import org.apache.flink.table.api.ResultKind;
32+
import org.apache.flink.table.api.TableResult;
2633
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
34+
import org.apache.flink.table.utils.EncodingUtils;
2735
import org.apache.flink.test.junit5.MiniClusterExtension;
2836
import org.junit.jupiter.api.Test;
2937
import org.junit.jupiter.api.extension.ExtendWith;
@@ -32,6 +40,82 @@
3240
@ExtendWith(MiniClusterExtension.class)
3341
public class FlinkJdbcTest {
3442

43+
public static void main(String[] args) throws IOException {
44+
var input =
45+
new DataInputDeserializer(
46+
EncodingUtils.decodeBase64ToBytes(
47+
"ADFjb20uZGF0YXNxcmwuanNvbi5GbGlua0pzb25UeXBlU2VyaWFsaXplclNuYXBzaG90AAAAAQApY29tLmRhdGFzcXJsLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXI="));
48+
System.out.println(input.readUTF());
49+
System.out.println(input.readInt());
50+
System.out.println(input.readUTF());
51+
52+
var output = new DataOutputSerializer(53);
53+
output.writeUTF(FlinkJsonTypeSerializerSnapshot.class.getName());
54+
output.writeInt(1);
55+
output.writeUTF(FlinkJsonTypeSerializer.class.getName());
56+
System.out.println(EncodingUtils.encodeBytesToBase64(output.getSharedBuffer()));
57+
}
58+
59+
@Test
60+
public void testFlinkWithPostgres() throws Exception {
61+
// Start PostgreSQL container
62+
try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:14")) {
63+
postgres.start();
64+
// Establish a connection and create the PostgreSQL table
65+
try (Connection conn =
66+
DriverManager.getConnection(
67+
postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword());
68+
Statement stmt = conn.createStatement()) {
69+
String createTableSQL = "CREATE TABLE test_table (" + " \"arrayOfRows\" JSONB " + ")";
70+
stmt.executeUpdate(createTableSQL);
71+
}
72+
73+
// Set up Flink environment
74+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
75+
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
76+
77+
// Define the schema
78+
String createSourceTable =
79+
"CREATE TABLE datagen_source ("
80+
+ " arrayOfRows ARRAY<ROW<field1 INT, field2 STRING>> "
81+
+ ") WITH ("
82+
+ " 'connector' = 'datagen',"
83+
+ " 'number-of-rows' = '10'"
84+
+ ")";
85+
86+
String createSinkTable =
87+
"CREATE TABLE jdbc_sink ("
88+
+ " arrayOfRows RAW('com.datasqrl.types.json.FlinkJsonType', 'ADdjb20uZGF0YXNxcmwudHlwZXMuanNvbi5GbGlua0pzb25UeXBlU2VyaWFsaXplclNuYXBzaG90AAAAAQAvY29tLmRhdGFzcXJsLnR5cGVzLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXI=') "
89+
+ ") WITH ("
90+
+ " 'connector' = 'jdbc-sqrl', "
91+
+ " 'url' = '"
92+
+ postgres.getJdbcUrl()
93+
+ "', "
94+
+ " 'table-name' = 'test_table', "
95+
+ " 'username' = '"
96+
+ postgres.getUsername()
97+
+ "', "
98+
+ " 'password' = '"
99+
+ postgres.getPassword()
100+
+ "'"
101+
+ ")";
102+
103+
// Register tables in the environment
104+
tableEnv.executeSql(
105+
"CREATE TEMPORARY FUNCTION IF NOT EXISTS `tojson` AS 'com.datasqrl.types.json.functions.ToJson' LANGUAGE JAVA");
106+
tableEnv.executeSql(createSourceTable);
107+
tableEnv.executeSql(createSinkTable);
108+
109+
// Set up a simple Flink job
110+
TableResult tableResult =
111+
tableEnv.executeSql(
112+
"INSERT INTO jdbc_sink SELECT tojson(arrayOfRows) AS arrayOfRows FROM datagen_source");
113+
tableResult.print();
114+
115+
assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind());
116+
}
117+
}
118+
35119
@Test
36120
public void testWriteAndReadToPostgres() throws Exception {
37121
try (PostgreSQLContainer<?> postgresContainer = new PostgreSQLContainer<>("postgres:14")) {

flink-sql-runner/pom.xml

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -202,38 +202,38 @@
202202
<classifier>runtime</classifier>
203203
<scope>test</scope>
204204
</dependency>
205-
205+
206206
<!-- project dependencies-->
207207
<dependency>
208-
<groupId>${project.groupId}</groupId>
209-
<artifactId>flexible-csv-format</artifactId>
210-
<version>${project.version}</version>
211-
<scope>runtime</scope>
212-
</dependency>
213-
<dependency>
214-
<groupId>${project.groupId}</groupId>
215-
<artifactId>flexible-json-format</artifactId>
216-
<version>${project.version}</version>
217-
<scope>runtime</scope>
218-
</dependency>
219-
<dependency>
220-
<groupId>${project.groupId}</groupId>
221-
<artifactId>system-functions-discovery</artifactId>
222-
<version>${project.version}</version>
223-
<scope>runtime</scope>
224-
</dependency>
225-
<dependency>
226-
<groupId>${project.groupId}</groupId>
227-
<artifactId>vector-type</artifactId>
228-
<version>${project.version}</version>
229-
<scope>runtime</scope>
230-
</dependency>
231-
<dependency>
232-
<groupId>${project.groupId}</groupId>
233-
<artifactId>postgresql-connector</artifactId>
234-
<version>${project.version}</version>
235-
<scope>runtime</scope>
236-
</dependency>
208+
<groupId>${project.groupId}</groupId>
209+
<artifactId>flexible-csv-format</artifactId>
210+
<version>${project.version}</version>
211+
<scope>runtime</scope>
212+
</dependency>
213+
<dependency>
214+
<groupId>${project.groupId}</groupId>
215+
<artifactId>flexible-json-format</artifactId>
216+
<version>${project.version}</version>
217+
<scope>runtime</scope>
218+
</dependency>
219+
<dependency>
220+
<groupId>${project.groupId}</groupId>
221+
<artifactId>system-functions-discovery</artifactId>
222+
<version>${project.version}</version>
223+
<scope>runtime</scope>
224+
</dependency>
225+
<dependency>
226+
<groupId>${project.groupId}</groupId>
227+
<artifactId>vector-type</artifactId>
228+
<version>${project.version}</version>
229+
<scope>runtime</scope>
230+
</dependency>
231+
<dependency>
232+
<groupId>${project.groupId}</groupId>
233+
<artifactId>postgresql-connector</artifactId>
234+
<version>${project.version}</version>
235+
<scope>runtime</scope>
236+
</dependency>
237237
</dependencies>
238238

239239
<build>

pom.xml

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,57 @@
450450
</activation>
451451
<build>
452452
<directory>${project.basedir}/m2e-target</directory>
453+
<plugins>
454+
<plugin>
455+
<groupId>org.codehaus.mojo</groupId>
456+
<artifactId>build-helper-maven-plugin</artifactId>
457+
<version>3.4.0</version>
458+
<executions>
459+
<execution>
460+
<id>add-source</id>
461+
<goals>
462+
<goal>add-source</goal>
463+
</goals>
464+
<phase>generate-sources</phase>
465+
<configuration>
466+
<sources>
467+
<source>target/generated-sources/annotations</source>
468+
<source>target/generated-sources/java</source>
469+
</sources>
470+
</configuration>
471+
</execution>
472+
<execution>
473+
<id>add-google-auto</id>
474+
<goals>
475+
<goal>add-resource</goal>
476+
</goals>
477+
<phase>generate-sources</phase>
478+
<configuration>
479+
<resources>
480+
<resource>
481+
<directory>target/classes</directory>
482+
<excludes>
483+
<exclude>**/*.class</exclude>
484+
</excludes>
485+
</resource>
486+
</resources>
487+
</configuration>
488+
</execution>
489+
<execution>
490+
<id>add-test-source</id>
491+
<goals>
492+
<goal>add-test-source</goal>
493+
</goals>
494+
<phase>generate-test-sources</phase>
495+
<configuration>
496+
<sources>
497+
<source>target/generated-test-sources/test-annotations</source>
498+
</sources>
499+
</configuration>
500+
</execution>
501+
</executions>
502+
</plugin>
503+
</plugins>
453504
</build>
454505
</profile>
455506
<profile>

testing/system-functions-sample/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@
3535
<scope>provided</scope>
3636
</dependency>
3737
<dependency>
38-
<groupId>com.datasqrl.flinkrunner</groupId>
38+
<groupId>${project.groupId}</groupId>
3939
<artifactId>system-functions-discovery</artifactId>
40-
<version>1.0.0-SNAPSHOT</version>
40+
<version>${project.version}</version>
4141
<scope>provided</scope>
4242
</dependency>
4343
<dependency>

types/json-type/pom.xml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,27 @@
3434
<version>${flink.version}</version>
3535
<scope>provided</scope>
3636
</dependency>
37+
<dependency>
38+
<groupId>org.apache.flink</groupId>
39+
<artifactId>flink-table-runtime</artifactId>
40+
<version>${flink.version}</version>
41+
<scope>provided</scope>
42+
</dependency>
43+
<dependency>
44+
<groupId>${project.groupId}</groupId>
45+
<artifactId>system-functions-discovery</artifactId>
46+
<version>${project.version}</version>
47+
<scope>provided</scope>
48+
</dependency>
49+
<dependency>
50+
<groupId>com.jayway.jsonpath</groupId>
51+
<artifactId>json-path</artifactId>
52+
<version>2.8.0</version>
53+
</dependency>
54+
<dependency>
55+
<groupId>com.google.auto.service</groupId>
56+
<artifactId>auto-service</artifactId>
57+
<version>1.1.1</version>
58+
</dependency>
3759
</dependencies>
3860
</project>

0 commit comments

Comments
 (0)