diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 104c392..d5f2ccf 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -50,7 +50,7 @@ jobs: - name: Download dependencies run: | mvn -B org.apache.maven.plugins:maven-dependency-plugin:3.8.1:go-offline de.qaware.maven:go-offline-maven-plugin:1.2.8:resolve-dependencies - mvn -B org.apache.maven.plugins:maven-resources-plugin:3.3.1:testResources com.marvinformatics:docker-compose-maven-plugin:5.0.0:pull + mvn -B org.apache.maven.plugins:maven-resources-plugin:3.3.1:resources com.marvinformatics:docker-compose-maven-plugin:5.0.0:pull - name: Update version if: github.event_name == 'release' && github.event.action == 'created' diff --git a/.github/workflows/uber-jar.yml b/.github/workflows/uber-jar.yml index 242ac41..9c8426e 100644 --- a/.github/workflows/uber-jar.yml +++ b/.github/workflows/uber-jar.yml @@ -54,7 +54,7 @@ jobs: - name: Download dependencies run: | mvn -B org.apache.maven.plugins:maven-dependency-plugin:3.8.1:go-offline de.qaware.maven:go-offline-maven-plugin:1.2.8:resolve-dependencies -P${{ matrix.FLINK_PROFILE }} - mvn -B org.apache.maven.plugins:maven-resources-plugin:3.3.1:testResources com.marvinformatics:docker-compose-maven-plugin:5.0.0:pull -P${{ matrix.FLINK_PROFILE }} + mvn -B org.apache.maven.plugins:maven-resources-plugin:3.3.1:resources com.marvinformatics:docker-compose-maven-plugin:5.0.0:pull -P${{ matrix.FLINK_PROFILE }} - name: Update version if: github.event_name == 'release' && github.event.action == 'created' diff --git a/pom.xml b/pom.xml index 264cc62..0e15421 100644 --- a/pom.xml +++ b/pom.xml @@ -243,15 +243,6 @@ src/main/resources - - - true - src/test/docker - - - src/test/resources - - org.apache.maven.plugins @@ -278,10 +269,19 @@ - *:* + + com/datasqrl/function/AutoRegisterSystemFunction.class + META-INF/*.SF META-INF/*.DSA META-INF/*.RSA @@ -299,6 +299,26 @@ + + maven-jar-plugin + 3.3.0 + + + service-jar + + jar + + package + + service + + com/datasqrl/function/AutoRegisterSystemFunction.class + + + + + + @@ -392,7 +412,7 @@ 5.0.0 ${skipTests} - ${project.basedir}/target/test-classes/docker-compose.yml + ${project.basedir}/target/docker-compose.yml diff --git a/src/main/java/com/datasqrl/SqlExecutor.java b/src/main/java/com/datasqrl/SqlExecutor.java index 90fbaec..4d1e60b 100644 --- a/src/main/java/com/datasqrl/SqlExecutor.java +++ b/src/main/java/com/datasqrl/SqlExecutor.java @@ -57,20 +57,29 @@ public SqlExecutor(Configuration configuration, String udfPath) { } public void setupSystemFunctions() { + System.out.println("Setting up automatically registered system functions"); - ServiceLoader standardLibraryFunctions = - ServiceLoader.load(AutoRegisterSystemFunction.class); - - standardLibraryFunctions.forEach( - function -> { - String sql = - String.format( - "CREATE TEMPORARY FUNCTION IF NOT EXISTS `%s` AS '%s' LANGUAGE JAVA;", - getFunctionNameFromClass(function.getClass()), function.getClass().getName()); + try { + ServiceLoader standardLibraryFunctions = + ServiceLoader.load(AutoRegisterSystemFunction.class); + + standardLibraryFunctions.forEach( + function -> { + String sql = + String.format( + "CREATE TEMPORARY FUNCTION IF NOT EXISTS `%s` AS '%s' LANGUAGE JAVA;", + getFunctionNameFromClass(function.getClass()), function.getClass().getName()); + + System.out.println(sql); + + tableEnv.executeSql(sql); + }); + } catch (Throwable e) { + e.printStackTrace(System.out); + throw new RuntimeException(e); + } - System.out.println(sql); - tableEnv.executeSql(sql); - }); + System.out.println("Completed auto function registered system functions"); } static String getFunctionNameFromClass(Class clazz) { diff --git a/src/test/docker/docker-compose.yml b/src/test/docker/docker-compose.yml index b1ac9c0..68639ba 100644 --- a/src/test/docker/docker-compose.yml +++ b/src/test/docker/docker-compose.yml @@ -30,7 +30,7 @@ services: ports: - "5432:5432" volumes: - - ./sqrl/postgres-schema.sql:/docker-entrypoint-initdb.d/database-schema.sql:ro + - ./test-classes/sqrl/postgres-schema.sql:/docker-entrypoint-initdb.d/database-schema.sql:ro healthcheck: test: ["CMD-SHELL", "pg_isready -U postgres"] interval: 1s @@ -61,9 +61,11 @@ services: condition: service_healthy volumes: - flink_conf:/opt/flink/conf - - ./:/opt/flink/usrlib/ - - ./datasources/:/datasources/ + - ./test-classes/:/opt/flink/usrlib/ + - ./test-classes/datasources/:/datasources/ - /tmp:/tmp + - ./flink-sql-runner-1.0.0-SNAPSHOT-service.jar:/opt/flink/lib/flink-sql-runner-service.jar + - ./test-classes/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar:/opt/flink/lib/systemfunction.jar healthcheck: test: ["CMD-SHELL", "curl -f http://localhost:8081/ || exit 1"] interval: 1s @@ -95,9 +97,11 @@ services: condition: service_healthy volumes: - flink_conf:/opt/flink/conf - - ./:/opt/flink/usrlib/ - - ./datasources/:/datasources/ + - ./test-classes/:/opt/flink/usrlib/ + - ./test-classes/datasources/:/datasources/ - /tmp:/tmp + - ./flink-sql-runner-1.0.0-SNAPSHOT-service.jar:/opt/flink/lib/flink-sql-runner-service.jar + - ./test-classes/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar:/opt/flink/lib/systemfunction.jar restart: always deploy: resources: diff --git a/src/test/resources/sql/flink.sql b/src/test/resources/sql/flink.sql index 9040222..ae94a7f 100644 --- a/src/test/resources/sql/flink.sql +++ b/src/test/resources/sql/flink.sql @@ -114,7 +114,7 @@ WHERE `_rownum` = 1; CREATE VIEW `table$2` AS SELECT * -FROM (SELECT `customerId`, `cardNo`, `timestamp`, `cardType`, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum` +FROM (SELECT `customerId`, `cardNo`, `timestamp`, upper(`cardType`) as cardType, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum` FROM `cardassignment_1`) AS `t1` WHERE `_rownum` = 1; @@ -142,7 +142,7 @@ WHERE `_rownum` = 1; CREATE VIEW `table$6` AS SELECT * -FROM (SELECT `customerId`, `cardNo`, `timestamp`, `cardType`, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum` +FROM (SELECT `customerId`, `cardNo`, `timestamp`, upper(`cardType`) as cardType, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum` FROM `cardassignment_1`) AS `t1` WHERE `_rownum` = 1; diff --git a/src/test/resources/systemfunction/pom.xml b/src/test/resources/systemfunction/pom.xml new file mode 100644 index 0000000..078c0a4 --- /dev/null +++ b/src/test/resources/systemfunction/pom.xml @@ -0,0 +1,47 @@ + + + + 4.0.0 + + com.datasqrl + systemfunction-sample + 1.0.0-SNAPSHOT + + + + org.apache.flink + flink-table-common + 1.19.2 + provided + + + com.datasqrl + flink-sql-runner + 1.0.0-SNAPSHOT + service + provided + + + com.google.auto.service + auto-service + 1.0.1 + + + + diff --git a/src/test/resources/systemfunction/src/main/java/sample/Upper.java b/src/test/resources/systemfunction/src/main/java/sample/Upper.java new file mode 100644 index 0000000..5d65a06 --- /dev/null +++ b/src/test/resources/systemfunction/src/main/java/sample/Upper.java @@ -0,0 +1,23 @@ +package sample; + +import org.apache.flink.table.functions.ScalarFunction; + +import com.datasqrl.function.AutoRegisterSystemFunction; +import com.google.auto.service.AutoService; + +/** + * Converts the input string to uppercase. + * If the input string is null, a null value is returned. + */ +@AutoService(AutoRegisterSystemFunction.class) +public class Upper extends ScalarFunction implements AutoRegisterSystemFunction { + + public String eval(String text) { + if (text == null) { + return null; + } + + return text.toUpperCase(); + } + +} \ No newline at end of file diff --git a/src/test/resources/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar b/src/test/resources/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar new file mode 100644 index 0000000..b4ab079 Binary files /dev/null and b/src/test/resources/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar differ