Skip to content

Commit 5e35974

Browse files
committed
Test system function service discovery
Signed-off-by: Marvin Froeder <marvin@datasqrl.com>
1 parent 67def7b commit 5e35974

File tree

7 files changed

+137
-22
lines changed

7 files changed

+137
-22
lines changed

pom.xml

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,19 @@
278278
</artifactSet>
279279
<filters>
280280
<filter>
281-
<!-- Do not copy the signatures in the META-INF folder.
282-
Otherwise, this might cause SecurityExceptions when using the JAR. -->
283281
<artifact>*:*</artifact>
284282
<excludes>
283+
<!--
284+
We exclude AutoRegisterSystemFunction from the shaded JAR because this class
285+
is loaded via ServiceLoader from the Flink classpath (/opt/flink/lib), not the job JAR.
286+
Including it here would result in a NoClassDefFoundError due to Flink's classloader isolation.
287+
288+
A separate 'service' JAR is built (see maven-jar-plugin) that contains only this interface,
289+
and is deployed to Flink's classpath to ensure it is available at runtime for discovery.
290+
-->
291+
<exclude>com/datasqrl/function/AutoRegisterSystemFunction.class</exclude>
292+
<!-- Do not copy the signatures in the META-INF folder.
293+
Otherwise, this might cause SecurityExceptions when using the JAR. -->
285294
<exclude>META-INF/*.SF</exclude>
286295
<exclude>META-INF/*.DSA</exclude>
287296
<exclude>META-INF/*.RSA</exclude>
@@ -299,6 +308,26 @@
299308
</executions>
300309
</plugin>
301310

311+
<plugin>
312+
<artifactId>maven-jar-plugin</artifactId>
313+
<version>3.3.0</version>
314+
<executions>
315+
<execution>
316+
<id>service-jar</id>
317+
<goals>
318+
<goal>jar</goal>
319+
</goals>
320+
<phase>package</phase>
321+
<configuration>
322+
<classifier>service</classifier>
323+
<includes>
324+
<include>com/datasqrl/function/AutoRegisterSystemFunction.class</include>
325+
</includes>
326+
</configuration>
327+
</execution>
328+
</executions>
329+
</plugin>
330+
302331
<!-- linter for java code, will be applied automatically when files are committed to git -->
303332
<!-- alternatively, can run with -Pdev to force code format on the whole project -->
304333
<plugin>
@@ -392,7 +421,7 @@
392421
<version>5.0.0</version>
393422
<configuration>
394423
<skip>${skipTests}</skip>
395-
<composeFile>${project.basedir}/target/test-classes/docker-compose.yml</composeFile>
424+
<composeFile>${project.basedir}/target/docker-compose.yml</composeFile>
396425
</configuration>
397426
<executions>
398427
<execution>

src/main/java/com/datasqrl/SqlExecutor.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,29 @@ public SqlExecutor(Configuration configuration, String udfPath) {
5757
}
5858

5959
public void setupSystemFunctions() {
60+
System.out.println("Setting up automatically registered system functions");
6061

61-
ServiceLoader<AutoRegisterSystemFunction> standardLibraryFunctions =
62-
ServiceLoader.load(AutoRegisterSystemFunction.class);
63-
64-
standardLibraryFunctions.forEach(
65-
function -> {
66-
String sql =
67-
String.format(
68-
"CREATE TEMPORARY FUNCTION IF NOT EXISTS `%s` AS '%s' LANGUAGE JAVA;",
69-
getFunctionNameFromClass(function.getClass()), function.getClass().getName());
62+
try {
63+
ServiceLoader<AutoRegisterSystemFunction> standardLibraryFunctions =
64+
ServiceLoader.load(AutoRegisterSystemFunction.class);
65+
66+
standardLibraryFunctions.forEach(
67+
function -> {
68+
String sql =
69+
String.format(
70+
"CREATE TEMPORARY FUNCTION IF NOT EXISTS `%s` AS '%s' LANGUAGE JAVA;",
71+
getFunctionNameFromClass(function.getClass()), function.getClass().getName());
72+
73+
System.out.println(sql);
74+
75+
tableEnv.executeSql(sql);
76+
});
77+
} catch (Throwable e) {
78+
e.printStackTrace(System.out);
79+
throw new RuntimeException(e);
80+
}
7081

71-
System.out.println(sql);
72-
tableEnv.executeSql(sql);
73-
});
82+
System.out.println("Completed auto function registered system functions");
7483
}
7584

7685
static String getFunctionNameFromClass(Class clazz) {

src/test/docker/docker-compose.yml

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ services:
3030
ports:
3131
- "5432:5432"
3232
volumes:
33-
- ./sqrl/postgres-schema.sql:/docker-entrypoint-initdb.d/database-schema.sql:ro
33+
- ./test-classes/sqrl/postgres-schema.sql:/docker-entrypoint-initdb.d/database-schema.sql:ro
3434
healthcheck:
3535
test: ["CMD-SHELL", "pg_isready -U postgres"]
3636
interval: 1s
@@ -61,9 +61,11 @@ services:
6161
condition: service_healthy
6262
volumes:
6363
- flink_conf:/opt/flink/conf
64-
- ./:/opt/flink/usrlib/
65-
- ./datasources/:/datasources/
64+
- ./test-classes/:/opt/flink/usrlib/
65+
- ./test-classes/datasources/:/datasources/
6666
- /tmp:/tmp
67+
- ./flink-sql-runner-1.0.0-SNAPSHOT-service.jar:/opt/flink/lib/flink-sql-runner-service.jar
68+
- ./test-classes/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar:/opt/flink/lib/systemfunction.jar
6769
healthcheck:
6870
test: ["CMD-SHELL", "curl -f http://localhost:8081/ || exit 1"]
6971
interval: 1s
@@ -95,9 +97,11 @@ services:
9597
condition: service_healthy
9698
volumes:
9799
- flink_conf:/opt/flink/conf
98-
- ./:/opt/flink/usrlib/
99-
- ./datasources/:/datasources/
100+
- ./test-classes/:/opt/flink/usrlib/
101+
- ./test-classes/datasources/:/datasources/
100102
- /tmp:/tmp
103+
- ./flink-sql-runner-1.0.0-SNAPSHOT-service.jar:/opt/flink/lib/flink-sql-runner-service.jar
104+
- ./test-classes/systemfunction/target/systemfunction-sample-1.0.0-SNAPSHOT.jar:/opt/flink/lib/systemfunction.jar
101105
restart: always
102106
deploy:
103107
resources:

src/test/resources/sql/flink.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ WHERE `_rownum` = 1;
114114
CREATE VIEW `table$2`
115115
AS
116116
SELECT *
117-
FROM (SELECT `customerId`, `cardNo`, `timestamp`, `cardType`, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum`
117+
FROM (SELECT `customerId`, `cardNo`, `timestamp`, upper(`cardType`) as cardType, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum`
118118
FROM `cardassignment_1`) AS `t1`
119119
WHERE `_rownum` = 1;
120120

@@ -142,7 +142,7 @@ WHERE `_rownum` = 1;
142142
CREATE VIEW `table$6`
143143
AS
144144
SELECT *
145-
FROM (SELECT `customerId`, `cardNo`, `timestamp`, `cardType`, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum`
145+
FROM (SELECT `customerId`, `cardNo`, `timestamp`, upper(`cardType`) as cardType, ROW_NUMBER() OVER (PARTITION BY `cardNo` ORDER BY `timestamp` DESC) AS `_rownum`
146146
FROM `cardassignment_1`) AS `t1`
147147
WHERE `_rownum` = 1;
148148

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Copyright © 2024 DataSQRL (contact@datasqrl.com)
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
22+
<groupId>com.datasqrl</groupId>
23+
<artifactId>systemfunction-sample</artifactId>
24+
<version>1.0.0-SNAPSHOT</version>
25+
26+
<dependencies>
27+
<dependency>
28+
<groupId>org.apache.flink</groupId>
29+
<artifactId>flink-table-common</artifactId>
30+
<version>1.19.2</version>
31+
<scope>provided</scope>
32+
</dependency>
33+
<dependency>
34+
<groupId>com.datasqrl</groupId>
35+
<artifactId>flink-sql-runner</artifactId>
36+
<version>1.0.0-SNAPSHOT</version>
37+
<classifier>service</classifier>
38+
<scope>provided</scope>
39+
</dependency>
40+
<dependency>
41+
<groupId>com.google.auto.service</groupId>
42+
<artifactId>auto-service</artifactId>
43+
<version>1.0.1</version>
44+
</dependency>
45+
</dependencies>
46+
47+
</project>
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package sample;
2+
3+
import org.apache.flink.table.functions.ScalarFunction;
4+
5+
import com.datasqrl.function.AutoRegisterSystemFunction;
6+
import com.google.auto.service.AutoService;
7+
8+
/**
9+
* Returns an array of substrings by splitting the input string based on the given delimiter.
10+
* If the delimiter is not found in the string, the original string is returned as the only element
11+
* in the array. If the delimiter is empty, every character in the string is split. If the string or
12+
* delimiter is null, a null value is returned. If the delimiter is found at the beginning or end of
13+
* the string, or there are contiguous delimiters, then an empty string is added to the array.
14+
*/
15+
@AutoService(AutoRegisterSystemFunction.class)
16+
public class Upper extends ScalarFunction implements AutoRegisterSystemFunction {
17+
18+
public String eval(String text) {
19+
if (text == null) {
20+
return null;
21+
}
22+
23+
return text.toUpperCase();
24+
}
25+
26+
}
Binary file not shown.

0 commit comments

Comments
 (0)