Skip to content

Commit 84bbd55

Browse files
committed
Turn all functions into discoverable services
Signed-off-by: Marvin Froeder <marvin@datasqrl.com>
1 parent 60c8f34 commit 84bbd55

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+230
-37
lines changed

flink-jar-runner/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,11 @@
186186
<artifactId>sqrl-text</artifactId>
187187
<version>${project.version}</version>
188188
</dependency>
189+
<dependency>
190+
<groupId>${project.groupId}</groupId>
191+
<artifactId>sqrl-vector</artifactId>
192+
<version>${project.version}</version>
193+
</dependency>
189194
<dependency>
190195
<groupId>${project.groupId}</groupId>
191196
<artifactId>sqrl-flexible-json</artifactId>
@@ -278,6 +283,16 @@
278283
<composeFile>${project.basedir}/target/test-classes/docker-compose.yml</composeFile>
279284
</configuration>
280285
<executions>
286+
<execution>
287+
<id>down-before-execution</id>
288+
<goals>
289+
<goal>down</goal>
290+
</goals>
291+
<phase>pre-integration-test</phase>
292+
<configuration>
293+
<removeVolumes>true</removeVolumes>
294+
</configuration>
295+
</execution>
281296
<execution>
282297
<id>up</id>
283298
<goals>

flink-jar-runner/src/main/java/com/datasqrl/FlinkMain.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ public int run() throws Exception {
132132

133133
planJson = replaceScriptWithEnv(planJson);
134134

135+
sqlExecutor.setupSystemFunctions();
135136
tableResult = sqlExecutor.executeCompiledPlan(planJson);
136137
} else {
137138
System.err.println("Invalid input. Please provide one of the following combinations:");

flink-jar-runner/src/main/java/com/datasqrl/SqlExecutor.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
*/
1616
package com.datasqrl;
1717

18+
import com.datasqrl.function.StandardLibraryFunction;
1819
import java.io.File;
1920
import java.util.List;
2021
import java.util.Map;
22+
import java.util.ServiceLoader;
2123
import java.util.regex.Matcher;
2224
import java.util.regex.Pattern;
2325
import org.apache.flink.configuration.Configuration;
@@ -54,6 +56,30 @@ public SqlExecutor(Configuration configuration, String udfPath) {
5456
}
5557
}
5658

59+
public void setupSystemFunctions() {
60+
61+
ServiceLoader<StandardLibraryFunction> standardLibraryFunctions =
62+
ServiceLoader.load(StandardLibraryFunction.class);
63+
64+
standardLibraryFunctions.forEach(
65+
function -> {
66+
String sql =
67+
String.format(
68+
"CREATE TEMPORARY FUNCTION IF NOT EXISTS `%s` AS '%s' LANGUAGE JAVA;",
69+
getFunctionNameFromClass(function.getClass()), function.getClass().getName());
70+
71+
System.out.println(sql);
72+
tableEnv.executeSql(sql);
73+
});
74+
}
75+
76+
static String getFunctionNameFromClass(Class clazz) {
77+
// String fctName = clazz.getSimpleName();
78+
// fctName = Character.toLowerCase(fctName.charAt(0)) + fctName.substring(1);
79+
// return fctName;
80+
return clazz.getSimpleName().toLowerCase();
81+
}
82+
5783
/**
5884
* Executes a single SQL script.
5985
*

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@
196196
<excludes>
197197
<exclude>src/test/resources/**</exclude>
198198
<exclude>m2e-target/**</exclude>
199+
<exclude>bin/**</exclude>
199200
</excludes>
200201
</licenseSet>
201202
</licenseSets>

sqrl-json/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
<artifactId>sqrl-lib-common</artifactId>
8181
<version>${project.version}</version>
8282
</dependency>
83-
8483
</dependencies>
8584

8685
</project>

sqrl-json/src/main/java/com/datasqrl/json/JsonArray.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import static com.datasqrl.json.JsonFunctions.createJsonArgumentTypeStrategy;
1919
import static com.datasqrl.json.JsonFunctions.createJsonType;
2020

21+
import com.datasqrl.function.StandardLibraryFunction;
22+
import com.google.auto.service.AutoService;
2123
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
2224
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
2325
import org.apache.flink.table.catalog.DataTypeFactory;
@@ -29,7 +31,8 @@
2931
import org.apache.flink.util.jackson.JacksonMapperFactory;
3032

3133
/** Creates a JSON array from the list of JSON objects and scalar values. */
32-
public class JsonArray extends ScalarFunction {
34+
@AutoService(StandardLibraryFunction.class)
35+
public class JsonArray extends ScalarFunction implements StandardLibraryFunction {
3336
private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
3437

3538
public FlinkJsonType eval(Object... objects) {

sqrl-json/src/main/java/com/datasqrl/json/JsonArrayAgg.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.datasqrl.json;
1717

18+
import com.datasqrl.function.StandardLibraryFunction;
19+
import com.google.auto.service.AutoService;
1820
import java.util.ArrayList;
1921
import lombok.SneakyThrows;
2022
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -23,7 +25,9 @@
2325
import org.apache.flink.util.jackson.JacksonMapperFactory;
2426

2527
/** Aggregation function that aggregates JSON objects into a JSON array. */
26-
public class JsonArrayAgg extends AggregateFunction<FlinkJsonType, ArrayAgg> {
28+
@AutoService(StandardLibraryFunction.class)
29+
public class JsonArrayAgg extends AggregateFunction<FlinkJsonType, ArrayAgg>
30+
implements StandardLibraryFunction {
2731

2832
private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
2933

sqrl-json/src/main/java/com/datasqrl/json/JsonConcat.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,17 @@
1515
*/
1616
package com.datasqrl.json;
1717

18+
import com.datasqrl.function.StandardLibraryFunction;
19+
import com.google.auto.service.AutoService;
1820
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
1921
import org.apache.flink.table.functions.ScalarFunction;
2022

2123
/**
2224
* Merges two JSON objects into one. If two objects share the same key, the value from the later
2325
* object is used.
2426
*/
25-
public class JsonConcat extends ScalarFunction {
27+
@AutoService(StandardLibraryFunction.class)
28+
public class JsonConcat extends ScalarFunction implements StandardLibraryFunction {
2629

2730
public FlinkJsonType eval(FlinkJsonType json1, FlinkJsonType json2) {
2831
if (json1 == null || json2 == null) {

sqrl-json/src/main/java/com/datasqrl/json/JsonExists.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
*/
1616
package com.datasqrl.json;
1717

18+
import com.datasqrl.function.StandardLibraryFunction;
19+
import com.google.auto.service.AutoService;
1820
import org.apache.flink.table.functions.ScalarFunction;
1921
import org.apache.flink.table.runtime.functions.SqlJsonUtils;
2022

2123
/** For a given JSON object, checks whether the provided JSON path exists */
22-
public class JsonExists extends ScalarFunction {
24+
@AutoService(StandardLibraryFunction.class)
25+
public class JsonExists extends ScalarFunction implements StandardLibraryFunction {
2326

2427
public Boolean eval(FlinkJsonType json, String path) {
2528
if (json == null) {

sqrl-json/src/main/java/com/datasqrl/json/JsonExtract.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.datasqrl.json;
1717

18+
import com.datasqrl.function.StandardLibraryFunction;
19+
import com.google.auto.service.AutoService;
1820
import com.jayway.jsonpath.JsonPath;
1921
import com.jayway.jsonpath.ReadContext;
2022
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -25,7 +27,8 @@
2527
* can be provided to specify a default value when the given JSON path does not yield a value for
2628
* the JSON object.
2729
*/
28-
public class JsonExtract extends ScalarFunction {
30+
@AutoService(StandardLibraryFunction.class)
31+
public class JsonExtract extends ScalarFunction implements StandardLibraryFunction {
2932

3033
public String eval(FlinkJsonType input, String pathSpec) {
3134
if (input == null) {

sqrl-json/src/main/java/com/datasqrl/json/JsonObject.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import static com.datasqrl.json.JsonFunctions.createJsonArgumentTypeStrategy;
1919

20+
import com.datasqrl.function.StandardLibraryFunction;
21+
import com.google.auto.service.AutoService;
2022
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
2123
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
2224
import org.apache.flink.table.api.DataTypes;
@@ -34,7 +36,8 @@
3436
* of each pair being the key and the second being the value. If multiple key-value pairs have the
3537
* same key, the last pair is added to the JSON object.
3638
*/
37-
public class JsonObject extends ScalarFunction {
39+
@AutoService(StandardLibraryFunction.class)
40+
public class JsonObject extends ScalarFunction implements StandardLibraryFunction {
3841
static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
3942

4043
public FlinkJsonType eval(Object... objects) {

sqrl-json/src/main/java/com/datasqrl/json/JsonObjectAgg.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.datasqrl.json;
1717

18+
import com.datasqrl.function.StandardLibraryFunction;
19+
import com.google.auto.service.AutoService;
1820
import java.util.LinkedHashMap;
1921
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
2022
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -34,7 +36,9 @@
3436
value = "RAW",
3537
bridgedTo = FlinkJsonType.class,
3638
rawSerializer = FlinkJsonTypeSerializer.class))
37-
public class JsonObjectAgg extends AggregateFunction<Object, ObjectAgg> {
39+
@AutoService(StandardLibraryFunction.class)
40+
public class JsonObjectAgg extends AggregateFunction<Object, ObjectAgg>
41+
implements StandardLibraryFunction {
3842

3943
private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
4044

sqrl-json/src/main/java/com/datasqrl/json/JsonQuery.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.datasqrl.json;
1717

18+
import com.datasqrl.function.StandardLibraryFunction;
19+
import com.google.auto.service.AutoService;
1820
import com.jayway.jsonpath.JsonPath;
1921
import com.jayway.jsonpath.ReadContext;
2022
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -26,7 +28,8 @@
2628
* For a given JSON object, executes a JSON path query against the object and returns the result as
2729
* string.
2830
*/
29-
public class JsonQuery extends ScalarFunction {
31+
@AutoService(StandardLibraryFunction.class)
32+
public class JsonQuery extends ScalarFunction implements StandardLibraryFunction {
3033
static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
3134

3235
public String eval(FlinkJsonType input, String pathSpec) {

sqrl-json/src/main/java/com/datasqrl/json/JsonToString.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616
package com.datasqrl.json;
1717

1818
import com.datasqrl.function.SqrlCastFunction;
19+
import com.datasqrl.function.StandardLibraryFunction;
20+
import com.google.auto.service.AutoService;
1921
import org.apache.flink.table.functions.ScalarFunction;
2022

21-
public class JsonToString extends ScalarFunction implements SqrlCastFunction {
23+
@AutoService(StandardLibraryFunction.class)
24+
public class JsonToString extends ScalarFunction
25+
implements StandardLibraryFunction, SqrlCastFunction {
2226

2327
public String eval(FlinkJsonType json) {
2428
if (json == null) {

sqrl-json/src/main/java/com/datasqrl/json/ToJson.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package com.datasqrl.json;
1717

1818
import com.datasqrl.function.SqrlCastFunction;
19+
import com.datasqrl.function.StandardLibraryFunction;
20+
import com.google.auto.service.AutoService;
1921
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
2022
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
2123
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -28,7 +30,8 @@
2830
import org.apache.flink.util.jackson.JacksonMapperFactory;
2931

3032
/** Parses a JSON object from string */
31-
public class ToJson extends ScalarFunction implements SqrlCastFunction {
33+
@AutoService(StandardLibraryFunction.class)
34+
public class ToJson extends ScalarFunction implements StandardLibraryFunction, SqrlCastFunction {
3235

3336
public static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
3437

sqrl-lib-common/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@
5858
<version>1.19.0</version>
5959
<scope>provided</scope>
6060
</dependency>
61+
<dependency>
62+
<groupId>com.google.auto.service</groupId>
63+
<artifactId>auto-service</artifactId>
64+
<version>1.1.1</version>
65+
</dependency>
6166
</dependencies>
6267

6368
</project>

sqrl-lib-common/src/main/java/com/datasqrl/datatype/HashColumns.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.datasqrl.datatype;
1717

18+
import com.datasqrl.function.StandardLibraryFunction;
19+
import com.google.auto.service.AutoService;
1820
import java.nio.charset.StandardCharsets;
1921
import java.security.MessageDigest;
2022
import java.security.NoSuchAlgorithmException;
@@ -27,7 +29,8 @@
2729
import org.apache.flink.table.types.inference.TypeInference;
2830
import org.apache.flink.table.types.inference.TypeStrategies;
2931

30-
public class HashColumns extends ScalarFunction {
32+
@AutoService(StandardLibraryFunction.class)
33+
public class HashColumns extends ScalarFunction implements StandardLibraryFunction {
3134

3235
public String eval(Object... objects) {
3336
if (objects.length == 0) return "";

sqrl-lib-common/src/main/java/com/datasqrl/datatype/Noop.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.datasqrl.datatype;
1717

18+
import com.datasqrl.function.StandardLibraryFunction;
19+
import com.google.auto.service.AutoService;
1820
import org.apache.flink.table.api.DataTypes;
1921
import org.apache.flink.table.catalog.DataTypeFactory;
2022
import org.apache.flink.table.functions.ScalarFunction;
@@ -23,7 +25,8 @@
2325
import org.apache.flink.table.types.inference.TypeInference;
2426
import org.apache.flink.table.types.inference.TypeStrategies;
2527

26-
public class Noop extends ScalarFunction {
28+
@AutoService(StandardLibraryFunction.class)
29+
public class Noop extends ScalarFunction implements StandardLibraryFunction {
2730

2831
public boolean eval(Object... objects) {
2932
return true;

sqrl-lib-common/src/main/java/com/datasqrl/datatype/SerializeToBytes.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package com.datasqrl.datatype;
1717

1818
import com.datasqrl.function.SqrlCastFunction;
19+
import com.datasqrl.function.StandardLibraryFunction;
20+
import com.google.auto.service.AutoService;
1921
import lombok.SneakyThrows;
2022
import org.apache.flink.api.common.typeutils.TypeSerializer;
2123
import org.apache.flink.core.memory.DataOutputSerializer;
@@ -24,7 +26,9 @@
2426
import org.apache.flink.table.functions.ScalarFunction;
2527

2628
/** Converts an annotated data type to */
27-
public class SerializeToBytes extends ScalarFunction implements SqrlCastFunction {
29+
@AutoService(StandardLibraryFunction.class)
30+
public class SerializeToBytes extends ScalarFunction
31+
implements StandardLibraryFunction, SqrlCastFunction {
2832

2933
@SneakyThrows
3034
public byte[] eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object object) {
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl.function;
17+
18+
/** Marker interface for functions that are used by DataSQRL track internal functions */
19+
public interface StandardLibraryFunction {}

sqrl-secure/src/main/java/com/datasqrl/secure/RandomID.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package com.datasqrl.secure;
1717

1818
import com.datasqrl.function.FlinkTypeUtil;
19+
import com.datasqrl.function.StandardLibraryFunction;
20+
import com.google.auto.service.AutoService;
1921
import java.security.SecureRandom;
2022
import java.util.Base64;
2123
import java.util.List;
@@ -28,7 +30,8 @@
2830
* Generates a random ID string with the given number of secure random bytes. The bytes are base64
2931
* encoded so the string length will be longer than the number of bytes
3032
*/
31-
public class RandomID extends ScalarFunction {
33+
@AutoService(StandardLibraryFunction.class)
34+
public class RandomID extends ScalarFunction implements StandardLibraryFunction {
3235

3336
private static final SecureRandom random = new SecureRandom();
3437
private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();

0 commit comments

Comments
 (0)