diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
index 104c392..d5f2ccf 100644
--- a/.github/workflows/deploy.yml
+++ b/.github/workflows/deploy.yml
@@ -50,7 +50,7 @@ jobs:
- name: Download dependencies
run: |
mvn -B org.apache.maven.plugins:maven-dependency-plugin:3.8.1:go-offline de.qaware.maven:go-offline-maven-plugin:1.2.8:resolve-dependencies
- mvn -B org.apache.maven.plugins:maven-resources-plugin:3.3.1:testResources com.marvinformatics:docker-compose-maven-plugin:5.0.0:pull
+ mvn -B org.apache.maven.plugins:maven-resources-plugin:3.3.1:resources com.marvinformatics:docker-compose-maven-plugin:5.0.0:pull
- name: Update version
if: github.event_name == 'release' && github.event.action == 'created'
diff --git a/.github/workflows/uber-jar.yml b/.github/workflows/uber-jar.yml
index 242ac41..9c8426e 100644
--- a/.github/workflows/uber-jar.yml
+++ b/.github/workflows/uber-jar.yml
@@ -54,7 +54,7 @@ jobs:
- name: Download dependencies
run: |
mvn -B org.apache.maven.plugins:maven-dependency-plugin:3.8.1:go-offline de.qaware.maven:go-offline-maven-plugin:1.2.8:resolve-dependencies -P${{ matrix.FLINK_PROFILE }}
- mvn -B org.apache.maven.plugins:maven-resources-plugin:3.3.1:testResources com.marvinformatics:docker-compose-maven-plugin:5.0.0:pull -P${{ matrix.FLINK_PROFILE }}
+ mvn -B org.apache.maven.plugins:maven-resources-plugin:3.3.1:resources com.marvinformatics:docker-compose-maven-plugin:5.0.0:pull -P${{ matrix.FLINK_PROFILE }}
- name: Update version
if: github.event_name == 'release' && github.event.action == 'created'
diff --git a/pom.xml b/pom.xml
index 264cc62..0e15421 100644
--- a/pom.xml
+++ b/pom.xml
@@ -243,15 +243,6 @@
src/main/resources
-
-
- true
- src/test/docker
-
-
- src/test/resources
-
-
org.apache.maven.plugins
@@ -278,10 +269,19 @@
-
*:*
+
+ com/datasqrl/function/AutoRegisterSystemFunction.class
+
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
@@ -299,6 +299,26 @@
+
+ maven-jar-plugin
+ 3.3.0
+
+
+ service-jar
+
+ jar
+
+ package
+
+ service
+
+ com/datasqrl/function/AutoRegisterSystemFunction.class
+
+
+
+
+
+
@@ -392,7 +412,7 @@
5.0.0
${skipTests}
- ${project.basedir}/target/test-classes/docker-compose.yml
+ ${project.basedir}/target/docker-compose.yml
diff --git a/src/main/java/com/datasqrl/SqlExecutor.java b/src/main/java/com/datasqrl/SqlExecutor.java
index 90fbaec..4d1e60b 100644
--- a/src/main/java/com/datasqrl/SqlExecutor.java
+++ b/src/main/java/com/datasqrl/SqlExecutor.java
@@ -57,20 +57,29 @@ public SqlExecutor(Configuration configuration, String udfPath) {
}
public void setupSystemFunctions() {
+ System.out.println("Setting up automatically registered system functions");
- ServiceLoader standardLibraryFunctions =
- ServiceLoader.load(AutoRegisterSystemFunction.class);
-
- standardLibraryFunctions.forEach(
- function -> {
- String sql =
- String.format(
- "CREATE TEMPORARY FUNCTION IF NOT EXISTS `%s` AS '%s' LANGUAGE JAVA;",
- getFunctionNameFromClass(function.getClass()), function.getClass().getName());
+ try {
+ ServiceLoader standardLibraryFunctions =
+ ServiceLoader.load(AutoRegisterSystemFunction.class);
+
+ standardLibraryFunctions.forEach(
+ function -> {
+ String sql =
+ String.format(
+ "CREATE TEMPORARY FUNCTION IF NOT EXISTS `%s` AS '%s' LANGUAGE JAVA;",
+ getFunctionNameFromClass(function.getClass()), function.getClass().getName());
+
+ System.out.println(sql);
+
+ tableEnv.executeSql(sql);
+ });
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ throw new RuntimeException(e);
+ }
- System.out.println(sql);
- tableEnv.executeSql(sql);
- });
+ System.out.println("Completed auto function registered system functions");
}
static String getFunctionNameFromClass(Class clazz) {
diff --git a/src/test/docker/docker-compose.yml b/src/test/docker/docker-compose.yml
index b1ac9c0..68639ba 100644
--- a/src/test/docker/docker-compose.yml
+++ b/src/test/docker/docker-compose.yml
@@ -30,7 +30,7 @@ services:
ports:
- "5432:5432"
volumes:
- - ./sqrl/postgres-schema.sql:/docker-entrypoint-initdb.d/database-schema.sql:ro
+ - ./test-classes/sqrl/postgres-schema.sql:/docker-entrypoint-initdb.d/database-schema.sql:ro
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 1s
@@ -61,9 +61,11 @@ services:
condition: service_healthy
volumes:
- flink_conf:/opt/flink/conf
- - ./:/opt/flink/usrlib/
- - ./datasources/:/datasources/
+ - ./test-classes/:/opt/flink/usrlib/
+ - ./test-classes/datasources/:/datasources/
- /tmp:/tmp
+ - ./flink-sql-runner-1.0.0-SNAPSHOT-service.jar:/opt/flink/lib/flink-sql-runner-service.jar
+ - ./test-classes/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar:/opt/flink/lib/systemfunction.jar
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:8081/ || exit 1"]
interval: 1s
@@ -95,9 +97,11 @@ services:
condition: service_healthy
volumes:
- flink_conf:/opt/flink/conf
- - ./:/opt/flink/usrlib/
- - ./datasources/:/datasources/
+ - ./test-classes/:/opt/flink/usrlib/
+ - ./test-classes/datasources/:/datasources/
- /tmp:/tmp
+ - ./flink-sql-runner-1.0.0-SNAPSHOT-service.jar:/opt/flink/lib/flink-sql-runner-service.jar
+ - ./test-classes/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar:/opt/flink/lib/systemfunction.jar
restart: always
deploy:
resources:
diff --git a/src/test/resources/sql/flink.sql b/src/test/resources/sql/flink.sql
index 9040222..ae94a7f 100644
--- a/src/test/resources/sql/flink.sql
+++ b/src/test/resources/sql/flink.sql
@@ -114,7 +114,7 @@ WHERE `_rownum` = 1;
CREATE VIEW `table$2`
AS
SELECT *
-FROM (SELECT `customerId`, `cardNo`, `timestamp`, `cardType`, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum`
+FROM (SELECT `customerId`, `cardNo`, `timestamp`, upper(`cardType`) as cardType, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum`
FROM `cardassignment_1`) AS `t1`
WHERE `_rownum` = 1;
@@ -142,7 +142,7 @@ WHERE `_rownum` = 1;
CREATE VIEW `table$6`
AS
SELECT *
-FROM (SELECT `customerId`, `cardNo`, `timestamp`, `cardType`, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum`
+FROM (SELECT `customerId`, `cardNo`, `timestamp`, upper(`cardType`) as cardType, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum`
FROM `cardassignment_1`) AS `t1`
WHERE `_rownum` = 1;
diff --git a/src/test/resources/systemfunction/pom.xml b/src/test/resources/systemfunction/pom.xml
new file mode 100644
index 0000000..078c0a4
--- /dev/null
+++ b/src/test/resources/systemfunction/pom.xml
@@ -0,0 +1,47 @@
+
+
+
+ 4.0.0
+
+ com.datasqrl
+ systemfunction-sample
+ 1.0.0-SNAPSHOT
+
+
+
+ org.apache.flink
+ flink-table-common
+ 1.19.2
+ provided
+
+
+ com.datasqrl
+ flink-sql-runner
+ 1.0.0-SNAPSHOT
+ service
+ provided
+
+
+ com.google.auto.service
+ auto-service
+ 1.0.1
+
+
+
+
diff --git a/src/test/resources/systemfunction/src/main/java/sample/Upper.java b/src/test/resources/systemfunction/src/main/java/sample/Upper.java
new file mode 100644
index 0000000..5d65a06
--- /dev/null
+++ b/src/test/resources/systemfunction/src/main/java/sample/Upper.java
@@ -0,0 +1,23 @@
+package sample;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+import com.datasqrl.function.AutoRegisterSystemFunction;
+import com.google.auto.service.AutoService;
+
+/**
+ * Converts the input string to uppercase.
+ * If the input string is null, a null value is returned.
+ */
+@AutoService(AutoRegisterSystemFunction.class)
+public class Upper extends ScalarFunction implements AutoRegisterSystemFunction {
+
+ public String eval(String text) {
+ if (text == null) {
+ return null;
+ }
+
+ return text.toUpperCase();
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/resources/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar b/src/test/resources/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar
new file mode 100644
index 0000000..b4ab079
Binary files /dev/null and b/src/test/resources/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar differ