diff --git a/connectors/postgresql-connector/pom.xml b/connectors/postgresql-connector/pom.xml
index 605a046..84c493b 100644
--- a/connectors/postgresql-connector/pom.xml
+++ b/connectors/postgresql-connector/pom.xml
@@ -57,9 +57,25 @@
provided
- com.datasqrl.flinkrunner
+ ${project.groupId}
+ json-type
+ ${project.version}
+
+
+ ${project.groupId}
+ vector-type
+ ${project.version}
+
+
+ ${project.groupId}
flexible-json-format
- 1.0.0-SNAPSHOT
+ ${project.version}
+
+
+ ${project.groupId}
+ system-functions-discovery
+ ${project.version}
+ provided
org.apache.flink
diff --git a/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresVectorTypeSerializer.java b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresVectorTypeSerializer.java
new file mode 100644
index 0000000..e4cf05c
--- /dev/null
+++ b/connectors/postgresql-connector/src/main/java/com/datasqrl/connector/postgresql/type/PostgresVectorTypeSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.connector.postgresql.type;
+
+import com.datasqrl.connector.postgresql.type.JdbcTypeSerializer.GenericDeserializationConverter;
+import com.datasqrl.connector.postgresql.type.JdbcTypeSerializer.GenericSerializationConverter;
+import com.datasqrl.types.vector.FlinkVectorType;
+import com.datasqrl.types.vector.FlinkVectorTypeSerializer;
+import java.util.Arrays;
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcDeserializationConverter;
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.JdbcSerializationConverter;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.postgresql.util.PGobject;
+
+public class PostgresVectorTypeSerializer
+ implements JdbcTypeSerializer {
+
+ @Override
+ public String getDialectId() {
+ return "postgres";
+ }
+
+ @Override
+ public Class getConversionClass() {
+ return FlinkVectorType.class;
+ }
+
+ @Override
+ public String dialectTypeName() {
+ return "vector";
+ }
+
+ @Override
+ public GenericDeserializationConverter getDeserializerConverter() {
+ return () ->
+ (val) -> {
+ FlinkVectorType t = (FlinkVectorType) val;
+ return t.getValue();
+ };
+ }
+
+ @Override
+ public GenericSerializationConverter getSerializerConverter(
+ LogicalType type) {
+ FlinkVectorTypeSerializer flinkVectorTypeSerializer = new FlinkVectorTypeSerializer();
+ return () ->
+ (val, index, statement) -> {
+ if (val != null && !val.isNullAt(index)) {
+ RawValueData object = val.getRawValue(index);
+ FlinkVectorType vec = object.toObject(flinkVectorTypeSerializer);
+
+ if (vec != null) {
+ PGobject pgObject = new PGobject();
+ pgObject.setType("vector");
+ pgObject.setValue(Arrays.toString(vec.getValue()));
+ statement.setObject(index, pgObject);
+ return;
+ }
+ }
+ statement.setObject(index, null);
+ };
+ }
+}
diff --git a/connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer b/connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer
index 0d0b9e6..fe5d8c8 100644
--- a/connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer
+++ b/connectors/postgresql-connector/src/main/resources/META-INF/services/com.datasqrl.connector.postgresql.type.JdbcTypeSerializer
@@ -1,2 +1,3 @@
com.datasqrl.connector.postgresql.type.PostgresRowTypeSerializer
-com.datasqrl.connector.postgresql.type.PostgresJsonTypeSerializer
\ No newline at end of file
+com.datasqrl.connector.postgresql.type.PostgresJsonTypeSerializer
+com.datasqrl.connector.postgresql.type.PostgresVectorTypeSerializer
\ No newline at end of file
diff --git a/connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java b/connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java
index 1f87d77..1b901f9 100644
--- a/connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java
+++ b/connectors/postgresql-connector/src/test/java/com/datasqrl/connector/postgresql/jdbc/FlinkJdbcTest.java
@@ -17,13 +17,21 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
+import com.datasqrl.types.json.FlinkJsonTypeSerializer;
+import com.datasqrl.types.json.FlinkJsonTypeSerializerSnapshot;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -32,6 +40,82 @@
@ExtendWith(MiniClusterExtension.class)
public class FlinkJdbcTest {
+ public static void main(String[] args) throws IOException {
+ var input =
+ new DataInputDeserializer(
+ EncodingUtils.decodeBase64ToBytes(
+ "ADFjb20uZGF0YXNxcmwuanNvbi5GbGlua0pzb25UeXBlU2VyaWFsaXplclNuYXBzaG90AAAAAQApY29tLmRhdGFzcXJsLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXI="));
+ System.out.println(input.readUTF());
+ System.out.println(input.readInt());
+ System.out.println(input.readUTF());
+
+ var output = new DataOutputSerializer(53);
+ output.writeUTF(FlinkJsonTypeSerializerSnapshot.class.getName());
+ output.writeInt(1);
+ output.writeUTF(FlinkJsonTypeSerializer.class.getName());
+ System.out.println(EncodingUtils.encodeBytesToBase64(output.getSharedBuffer()));
+ }
+
+ @Test
+ public void testFlinkWithPostgres() throws Exception {
+ // Start PostgreSQL container
+ try (PostgreSQLContainer> postgres = new PostgreSQLContainer<>("postgres:14")) {
+ postgres.start();
+ // Establish a connection and create the PostgreSQL table
+ try (Connection conn =
+ DriverManager.getConnection(
+ postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword());
+ Statement stmt = conn.createStatement()) {
+ String createTableSQL = "CREATE TABLE test_table (" + " \"arrayOfRows\" JSONB " + ")";
+ stmt.executeUpdate(createTableSQL);
+ }
+
+ // Set up Flink environment
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+ // Define the schema
+ String createSourceTable =
+ "CREATE TABLE datagen_source ("
+ + " arrayOfRows ARRAY> "
+ + ") WITH ("
+ + " 'connector' = 'datagen',"
+ + " 'number-of-rows' = '10'"
+ + ")";
+
+ String createSinkTable =
+ "CREATE TABLE jdbc_sink ("
+ + " arrayOfRows RAW('com.datasqrl.types.json.FlinkJsonType', 'ADdjb20uZGF0YXNxcmwudHlwZXMuanNvbi5GbGlua0pzb25UeXBlU2VyaWFsaXplclNuYXBzaG90AAAAAQAvY29tLmRhdGFzcXJsLnR5cGVzLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXI=') "
+ + ") WITH ("
+ + " 'connector' = 'jdbc-sqrl', "
+ + " 'url' = '"
+ + postgres.getJdbcUrl()
+ + "', "
+ + " 'table-name' = 'test_table', "
+ + " 'username' = '"
+ + postgres.getUsername()
+ + "', "
+ + " 'password' = '"
+ + postgres.getPassword()
+ + "'"
+ + ")";
+
+ // Register tables in the environment
+ tableEnv.executeSql(
+ "CREATE TEMPORARY FUNCTION IF NOT EXISTS `tojson` AS 'com.datasqrl.types.json.functions.ToJson' LANGUAGE JAVA");
+ tableEnv.executeSql(createSourceTable);
+ tableEnv.executeSql(createSinkTable);
+
+ // Set up a simple Flink job
+ TableResult tableResult =
+ tableEnv.executeSql(
+ "INSERT INTO jdbc_sink SELECT tojson(arrayOfRows) AS arrayOfRows FROM datagen_source");
+ tableResult.print();
+
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind());
+ }
+ }
+
@Test
public void testWriteAndReadToPostgres() throws Exception {
try (PostgreSQLContainer> postgresContainer = new PostgreSQLContainer<>("postgres:14")) {
diff --git a/flink-sql-runner/pom.xml b/flink-sql-runner/pom.xml
index 4fcb9af..ab11a2c 100644
--- a/flink-sql-runner/pom.xml
+++ b/flink-sql-runner/pom.xml
@@ -202,38 +202,38 @@
runtime
test
-
+
- ${project.groupId}
- flexible-csv-format
- ${project.version}
- runtime
-
-
- ${project.groupId}
- flexible-json-format
- ${project.version}
- runtime
-
-
- ${project.groupId}
- system-functions-discovery
- ${project.version}
- runtime
-
-
- ${project.groupId}
- vector-type
- ${project.version}
- runtime
-
-
- ${project.groupId}
- postgresql-connector
- ${project.version}
- runtime
-
+ ${project.groupId}
+ flexible-csv-format
+ ${project.version}
+ runtime
+
+
+ ${project.groupId}
+ flexible-json-format
+ ${project.version}
+ runtime
+
+
+ ${project.groupId}
+ system-functions-discovery
+ ${project.version}
+ runtime
+
+
+ ${project.groupId}
+ vector-type
+ ${project.version}
+ runtime
+
+
+ ${project.groupId}
+ postgresql-connector
+ ${project.version}
+ runtime
+
diff --git a/pom.xml b/pom.xml
index e805409..8a4bb60 100644
--- a/pom.xml
+++ b/pom.xml
@@ -450,6 +450,57 @@
${project.basedir}/m2e-target
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 3.4.0
+
+
+ add-source
+
+ add-source
+
+ generate-sources
+
+
+ target/generated-sources/annotations
+ target/generated-sources/java
+
+
+
+
+ add-google-auto
+
+ add-resource
+
+ generate-sources
+
+
+
+ target/classes
+
+ **/*.class
+
+
+
+
+
+
+ add-test-source
+
+ add-test-source
+
+ generate-test-sources
+
+
+ target/generated-test-sources/test-annotations
+
+
+
+
+
+
diff --git a/testing/system-functions-sample/pom.xml b/testing/system-functions-sample/pom.xml
index d0b74c1..24817a5 100644
--- a/testing/system-functions-sample/pom.xml
+++ b/testing/system-functions-sample/pom.xml
@@ -35,9 +35,9 @@
provided
- com.datasqrl.flinkrunner
+ ${project.groupId}
system-functions-discovery
- 1.0.0-SNAPSHOT
+ ${project.version}
provided
diff --git a/types/json-type/pom.xml b/types/json-type/pom.xml
index 82182ff..12a24fe 100644
--- a/types/json-type/pom.xml
+++ b/types/json-type/pom.xml
@@ -34,5 +34,27 @@
${flink.version}
provided
+
+ org.apache.flink
+ flink-table-runtime
+ ${flink.version}
+ provided
+
+
+ ${project.groupId}
+ system-functions-discovery
+ ${project.version}
+ provided
+
+
+ com.jayway.jsonpath
+ json-path
+ 2.8.0
+
+
+ com.google.auto.service
+ auto-service
+ 1.1.1
+
diff --git a/types/json-type/src/main/java/com/datasqrl/types/json/functions/ArrayAgg.java b/types/json-type/src/main/java/com/datasqrl/types/json/functions/ArrayAgg.java
new file mode 100644
index 0000000..336086a
--- /dev/null
+++ b/types/json-type/src/main/java/com/datasqrl/types/json/functions/ArrayAgg.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.json.functions;
+
+import java.util.List;
+import lombok.Value;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.annotation.DataTypeHint;
+
+@Value
+public class ArrayAgg {
+
+ @DataTypeHint(value = "RAW")
+ private List objects;
+
+ public void add(JsonNode value) {
+ objects.add(value);
+ }
+
+ public void remove(JsonNode value) {
+ objects.remove(value);
+ }
+}
diff --git a/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonArray.java b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonArray.java
new file mode 100644
index 0000000..6c1426f
--- /dev/null
+++ b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonArray.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.json.functions;
+
+import static com.datasqrl.types.json.functions.JsonFunctions.createJsonArgumentTypeStrategy;
+import static com.datasqrl.types.json.functions.JsonFunctions.createJsonType;
+
+import com.datasqrl.function.AutoRegisterSystemFunction;
+import com.datasqrl.types.json.FlinkJsonType;
+import com.google.auto.service.AutoService;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.inference.InputTypeStrategies;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+/** Creates a JSON array from the list of JSON objects and scalar values. */
+@AutoService(AutoRegisterSystemFunction.class)
+public class JsonArray extends ScalarFunction implements AutoRegisterSystemFunction {
+ private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
+
+ public FlinkJsonType eval(Object... objects) {
+ ArrayNode arrayNode = mapper.createArrayNode();
+
+ for (Object value : objects) {
+ if (value instanceof FlinkJsonType) {
+ FlinkJsonType type = (FlinkJsonType) value;
+ arrayNode.add(type.json);
+ } else {
+ arrayNode.addPOJO(value);
+ }
+ }
+
+ return new FlinkJsonType(arrayNode);
+ }
+
+ @Override
+ public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+ InputTypeStrategy inputTypeStrategy =
+ InputTypeStrategies.varyingSequence(createJsonArgumentTypeStrategy(typeFactory));
+
+ return TypeInference.newBuilder()
+ .inputTypeStrategy(inputTypeStrategy)
+ .outputTypeStrategy(TypeStrategies.explicit(createJsonType(typeFactory)))
+ .build();
+ }
+}
diff --git a/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonArrayAgg.java b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonArrayAgg.java
new file mode 100644
index 0000000..1042544
--- /dev/null
+++ b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonArrayAgg.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.json.functions;
+
+import com.datasqrl.types.json.FlinkJsonType;
+import java.util.ArrayList;
+import lombok.SneakyThrows;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+/** Aggregation function that aggregates JSON objects into a JSON array. */
+public class JsonArrayAgg extends AggregateFunction {
+
+ private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
+
+ @Override
+ public ArrayAgg createAccumulator() {
+ return new ArrayAgg(new ArrayList<>());
+ }
+
+ public void accumulate(ArrayAgg accumulator, String value) {
+ accumulator.add(mapper.getNodeFactory().textNode(value));
+ }
+
+ @SneakyThrows
+ public void accumulate(ArrayAgg accumulator, FlinkJsonType value) {
+ if (value != null) {
+ accumulator.add(value.json);
+ } else {
+ accumulator.add(null);
+ }
+ }
+
+ public void accumulate(ArrayAgg accumulator, Double value) {
+ accumulator.add(mapper.getNodeFactory().numberNode(value));
+ }
+
+ public void accumulate(ArrayAgg accumulator, Long value) {
+ accumulator.add(mapper.getNodeFactory().numberNode(value));
+ }
+
+ public void accumulate(ArrayAgg accumulator, Integer value) {
+ accumulator.add(mapper.getNodeFactory().numberNode(value));
+ }
+
+ public void retract(ArrayAgg accumulator, String value) {
+ accumulator.remove(mapper.getNodeFactory().textNode(value));
+ }
+
+ @SneakyThrows
+ public void retract(ArrayAgg accumulator, FlinkJsonType value) {
+ if (value != null) {
+ accumulator.remove(value.json);
+ } else {
+ accumulator.remove(null);
+ }
+ }
+
+ public void retract(ArrayAgg accumulator, Double value) {
+ accumulator.remove(mapper.getNodeFactory().numberNode(value));
+ }
+
+ public void retract(ArrayAgg accumulator, Long value) {
+ accumulator.remove(mapper.getNodeFactory().numberNode(value));
+ }
+
+ public void retract(ArrayAgg accumulator, Integer value) {
+ accumulator.remove(mapper.getNodeFactory().numberNode(value));
+ }
+
+ public void merge(ArrayAgg accumulator, java.lang.Iterable iterable) {
+ iterable.forEach(o -> accumulator.getObjects().addAll(o.getObjects()));
+ }
+
+ @Override
+ public FlinkJsonType getValue(ArrayAgg accumulator) {
+ ArrayNode arrayNode = mapper.createArrayNode();
+ for (Object o : accumulator.getObjects()) {
+ if (o instanceof FlinkJsonType) {
+ arrayNode.add(((FlinkJsonType) o).json);
+ } else {
+ arrayNode.addPOJO(o);
+ }
+ }
+ return new FlinkJsonType(arrayNode);
+ }
+}
diff --git a/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonConcat.java b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonConcat.java
new file mode 100644
index 0000000..27e2800
--- /dev/null
+++ b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonConcat.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.json.functions;
+
+import com.datasqrl.function.AutoRegisterSystemFunction;
+import com.datasqrl.types.json.FlinkJsonType;
+import com.google.auto.service.AutoService;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.table.functions.ScalarFunction;
+
+/**
+ * Merges two JSON objects into one. If two objects share the same key, the value from the later
+ * object is used.
+ */
+@AutoService(AutoRegisterSystemFunction.class)
+public class JsonConcat extends ScalarFunction implements AutoRegisterSystemFunction {
+
+ public FlinkJsonType eval(FlinkJsonType json1, FlinkJsonType json2) {
+ if (json1 == null || json2 == null) {
+ return null;
+ }
+ try {
+ ObjectNode node1 = (ObjectNode) json1.getJson();
+ ObjectNode node2 = (ObjectNode) json2.getJson();
+
+ node1.setAll(node2);
+ return new FlinkJsonType(node1);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+}
diff --git a/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonExists.java b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonExists.java
new file mode 100644
index 0000000..7602c30
--- /dev/null
+++ b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonExists.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.json.functions;
+
+import com.datasqrl.function.AutoRegisterSystemFunction;
+import com.datasqrl.types.json.FlinkJsonType;
+import com.google.auto.service.AutoService;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.runtime.functions.SqlJsonUtils;
+
+/** For a given JSON object, checks whether the provided JSON path exists */
+@AutoService(AutoRegisterSystemFunction.class)
+public class JsonExists extends ScalarFunction implements AutoRegisterSystemFunction {
+
+ public Boolean eval(FlinkJsonType json, String path) {
+ if (json == null) {
+ return null;
+ }
+ try {
+ return SqlJsonUtils.jsonExists(json.json.toString(), path);
+ } catch (Exception e) {
+ return false;
+ }
+ }
+}
diff --git a/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonExtract.java b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonExtract.java
new file mode 100644
index 0000000..98feedc
--- /dev/null
+++ b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonExtract.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.json.functions;
+
+import com.datasqrl.function.AutoRegisterSystemFunction;
+import com.datasqrl.types.json.FlinkJsonType;
+import com.google.auto.service.AutoService;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.ReadContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.functions.ScalarFunction;
+
+/**
+ * Extracts a value from the JSON object based on the provided JSON path. An optional third argument
+ * can be provided to specify a default value when the given JSON path does not yield a value for
+ * the JSON object.
+ */
+@AutoService(AutoRegisterSystemFunction.class)
+public class JsonExtract extends ScalarFunction implements AutoRegisterSystemFunction {
+
+ public String eval(FlinkJsonType input, String pathSpec) {
+ if (input == null) {
+ return null;
+ }
+ try {
+ JsonNode jsonNode = input.getJson();
+ ReadContext ctx = JsonPath.parse(jsonNode.toString());
+ Object value = ctx.read(pathSpec);
+ if (value == null) {
+ return null;
+ }
+ return value.toString();
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ public String eval(FlinkJsonType input, String pathSpec, String defaultValue) {
+ if (input == null) {
+ return null;
+ }
+ try {
+ ReadContext ctx = JsonPath.parse(input.getJson().toString());
+ JsonPath parse = JsonPath.compile(pathSpec);
+ return ctx.read(parse, String.class);
+ } catch (Exception e) {
+ return defaultValue;
+ }
+ }
+
+ public Boolean eval(FlinkJsonType input, String pathSpec, Boolean defaultValue) {
+ if (input == null) {
+ return null;
+ }
+ try {
+ ReadContext ctx = JsonPath.parse(input.getJson().toString());
+ JsonPath parse = JsonPath.compile(pathSpec);
+ return ctx.read(parse, Boolean.class);
+ } catch (Exception e) {
+ return defaultValue;
+ }
+ }
+
+ public Double eval(FlinkJsonType input, String pathSpec, Double defaultValue) {
+ if (input == null) {
+ return null;
+ }
+ try {
+ ReadContext ctx = JsonPath.parse(input.getJson().toString());
+ JsonPath parse = JsonPath.compile(pathSpec);
+ return ctx.read(parse, Double.class);
+ } catch (Exception e) {
+ return defaultValue;
+ }
+ }
+
+ public Integer eval(FlinkJsonType input, String pathSpec, Integer defaultValue) {
+ if (input == null) {
+ return null;
+ }
+ try {
+ ReadContext ctx = JsonPath.parse(input.getJson().toString());
+ JsonPath parse = JsonPath.compile(pathSpec);
+ return ctx.read(parse, Integer.class);
+ } catch (Exception e) {
+ return defaultValue;
+ }
+ }
+}
diff --git a/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonFunctions.java b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonFunctions.java
new file mode 100644
index 0000000..09a7a85
--- /dev/null
+++ b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonFunctions.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.json.functions;
+
+import com.datasqrl.types.json.FlinkJsonType;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.InputTypeStrategies;
+import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
+
+public class JsonFunctions {
+
+ public static final ToJson TO_JSON = new ToJson();
+ public static final JsonToString JSON_TO_STRING = new JsonToString();
+ public static final JsonObject JSON_OBJECT = new JsonObject();
+ public static final JsonArray JSON_ARRAY = new JsonArray();
+ public static final JsonExtract JSON_EXTRACT = new JsonExtract();
+ public static final JsonQuery JSON_QUERY = new JsonQuery();
+ public static final JsonExists JSON_EXISTS = new JsonExists();
+ public static final JsonArrayAgg JSON_ARRAYAGG = new JsonArrayAgg();
+ public static final JsonObjectAgg JSON_OBJECTAGG = new JsonObjectAgg();
+ public static final JsonConcat JSON_CONCAT = new JsonConcat();
+
+ public static ArgumentTypeStrategy createJsonArgumentTypeStrategy(DataTypeFactory typeFactory) {
+ return InputTypeStrategies.or(
+ SpecificInputTypeStrategies.JSON_ARGUMENT,
+ InputTypeStrategies.explicit(createJsonType(typeFactory)));
+ }
+
+ public static DataType createJsonType(DataTypeFactory typeFactory) {
+ DataType dataType = DataTypes.of(FlinkJsonType.class).toDataType(typeFactory);
+ return dataType;
+ }
+}
diff --git a/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonObject.java b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonObject.java
new file mode 100644
index 0000000..593b8ef
--- /dev/null
+++ b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonObject.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.json.functions;
+
+import static com.datasqrl.types.json.functions.JsonFunctions.createJsonArgumentTypeStrategy;
+
+import com.datasqrl.function.AutoRegisterSystemFunction;
+import com.datasqrl.types.json.FlinkJsonType;
+import com.google.auto.service.AutoService;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.inference.InputTypeStrategies;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+/**
+ * Creates a JSON object from key-value pairs, where the key is mapped to a field with the
+ * associated value. Key-value pairs are provided as a list of even length, with the first element
+ * of each pair being the key and the second being the value. If multiple key-value pairs have the
+ * same key, the last pair is added to the JSON object.
+ */
+@AutoService(AutoRegisterSystemFunction.class)
+public class JsonObject extends ScalarFunction implements AutoRegisterSystemFunction {
+ static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
+
+ public FlinkJsonType eval(Object... objects) {
+ if (objects.length % 2 != 0) {
+ throw new IllegalArgumentException("Arguments should be in key-value pairs");
+ }
+
+ ObjectNode objectNode = mapper.createObjectNode();
+
+ for (int i = 0; i < objects.length; i += 2) {
+ if (!(objects[i] instanceof String)) {
+ throw new IllegalArgumentException("Key must be a string");
+ }
+ String key = (String) objects[i];
+ Object value = objects[i + 1];
+ if (value instanceof FlinkJsonType) {
+ FlinkJsonType type = (FlinkJsonType) value;
+ objectNode.put(key, type.json);
+ } else {
+ objectNode.putPOJO(key, value);
+ }
+ }
+
+ return new FlinkJsonType(objectNode);
+ }
+
+ @Override
+ public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+ InputTypeStrategy anyJsonCompatibleArg =
+ InputTypeStrategies.repeatingSequence(createJsonArgumentTypeStrategy(typeFactory));
+
+ InputTypeStrategy inputTypeStrategy =
+ InputTypeStrategies.compositeSequence().finishWithVarying(anyJsonCompatibleArg);
+
+ return TypeInference.newBuilder()
+ .inputTypeStrategy(inputTypeStrategy)
+ .outputTypeStrategy(
+ TypeStrategies.explicit(DataTypes.of(FlinkJsonType.class).toDataType(typeFactory)))
+ .build();
+ }
+}
diff --git a/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonObjectAgg.java b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonObjectAgg.java
new file mode 100644
index 0000000..b9a817a
--- /dev/null
+++ b/types/json-type/src/main/java/com/datasqrl/types/json/functions/JsonObjectAgg.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright © 2024 DataSQRL (contact@datasqrl.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datasqrl.types.json.functions;
+
+import com.datasqrl.types.json.FlinkJsonType;
+import com.datasqrl.types.json.FlinkJsonTypeSerializer;
+import java.util.LinkedHashMap;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+/**
+ * Aggregation function that merges JSON objects into a single JSON object. If two JSON objects
+ * share the same field name, the value of the later one is used in the aggregated result.
+ */
+@FunctionHint(
+ output =
+ @DataTypeHint(
+ value = "RAW",
+ bridgedTo = FlinkJsonType.class,
+ rawSerializer = FlinkJsonTypeSerializer.class))
+public class JsonObjectAgg extends AggregateFunction