Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions flink-jar-runner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@
<artifactId>sqrl-text</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>sqrl-vector</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>sqrl-flexible-json</artifactId>
Expand Down Expand Up @@ -278,6 +283,16 @@
<composeFile>${project.basedir}/target/test-classes/docker-compose.yml</composeFile>
</configuration>
<executions>
<execution>
<id>down-before-execution</id>
<goals>
<goal>down</goal>
</goals>
<phase>pre-integration-test</phase>
<configuration>
<removeVolumes>true</removeVolumes>
</configuration>
</execution>
<execution>
<id>up</id>
<goals>
Expand Down
1 change: 1 addition & 0 deletions flink-jar-runner/src/main/java/com/datasqrl/FlinkMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public int run() throws Exception {

planJson = replaceScriptWithEnv(planJson);

sqlExecutor.setupSystemFunctions();
tableResult = sqlExecutor.executeCompiledPlan(planJson);
} else {
System.err.println("Invalid input. Please provide one of the following combinations:");
Expand Down
26 changes: 26 additions & 0 deletions flink-jar-runner/src/main/java/com/datasqrl/SqlExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package com.datasqrl;

import com.datasqrl.function.StandardLibraryFunction;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -54,6 +56,30 @@ public SqlExecutor(Configuration configuration, String udfPath) {
}
}

public void setupSystemFunctions() {

ServiceLoader<StandardLibraryFunction> standardLibraryFunctions =
ServiceLoader.load(StandardLibraryFunction.class);

standardLibraryFunctions.forEach(
function -> {
String sql =
String.format(
"CREATE TEMPORARY FUNCTION IF NOT EXISTS `%s` AS '%s' LANGUAGE JAVA;",
getFunctionNameFromClass(function.getClass()), function.getClass().getName());

System.out.println(sql);
tableEnv.executeSql(sql);
});
}

static String getFunctionNameFromClass(Class clazz) {
// String fctName = clazz.getSimpleName();
// fctName = Character.toLowerCase(fctName.charAt(0)) + fctName.substring(1);
// return fctName;
return clazz.getSimpleName().toLowerCase();
}

/**
* Executes a single SQL script.
*
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@
<excludes>
<exclude>src/test/resources/**</exclude>
<exclude>m2e-target/**</exclude>
<exclude>bin/**</exclude>
</excludes>
</licenseSet>
</licenseSets>
Expand Down
1 change: 0 additions & 1 deletion sqrl-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
<artifactId>sqrl-lib-common</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

</project>
5 changes: 4 additions & 1 deletion sqrl-json/src/main/java/com/datasqrl/json/JsonArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import static com.datasqrl.json.JsonFunctions.createJsonArgumentTypeStrategy;
import static com.datasqrl.json.JsonFunctions.createJsonType;

import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.table.catalog.DataTypeFactory;
Expand All @@ -29,7 +31,8 @@
import org.apache.flink.util.jackson.JacksonMapperFactory;

/** Creates a JSON array from the list of JSON objects and scalar values. */
public class JsonArray extends ScalarFunction {
@AutoService(StandardLibraryFunction.class)
public class JsonArray extends ScalarFunction implements StandardLibraryFunction {
private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();

public FlinkJsonType eval(Object... objects) {
Expand Down
6 changes: 5 additions & 1 deletion sqrl-json/src/main/java/com/datasqrl/json/JsonArrayAgg.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.datasqrl.json;

import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import java.util.ArrayList;
import lombok.SneakyThrows;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -23,7 +25,9 @@
import org.apache.flink.util.jackson.JacksonMapperFactory;

/** Aggregation function that aggregates JSON objects into a JSON array. */
public class JsonArrayAgg extends AggregateFunction<FlinkJsonType, ArrayAgg> {
@AutoService(StandardLibraryFunction.class)
public class JsonArrayAgg extends AggregateFunction<FlinkJsonType, ArrayAgg>
implements StandardLibraryFunction {

private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();

Expand Down
5 changes: 4 additions & 1 deletion sqrl-json/src/main/java/com/datasqrl/json/JsonConcat.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
*/
package com.datasqrl.json;

import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.functions.ScalarFunction;

/**
* Merges two JSON objects into one. If two objects share the same key, the value from the later
* object is used.
*/
public class JsonConcat extends ScalarFunction {
@AutoService(StandardLibraryFunction.class)
public class JsonConcat extends ScalarFunction implements StandardLibraryFunction {

public FlinkJsonType eval(FlinkJsonType json1, FlinkJsonType json2) {
if (json1 == null || json2 == null) {
Expand Down
5 changes: 4 additions & 1 deletion sqrl-json/src/main/java/com/datasqrl/json/JsonExists.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/
package com.datasqrl.json;

import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.runtime.functions.SqlJsonUtils;

/** For a given JSON object, checks whether the provided JSON path exists */
public class JsonExists extends ScalarFunction {
@AutoService(StandardLibraryFunction.class)
public class JsonExists extends ScalarFunction implements StandardLibraryFunction {

public Boolean eval(FlinkJsonType json, String path) {
if (json == null) {
Expand Down
5 changes: 4 additions & 1 deletion sqrl-json/src/main/java/com/datasqrl/json/JsonExtract.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.datasqrl.json;

import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.ReadContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -25,7 +27,8 @@
* can be provided to specify a default value when the given JSON path does not yield a value for
* the JSON object.
*/
public class JsonExtract extends ScalarFunction {
@AutoService(StandardLibraryFunction.class)
public class JsonExtract extends ScalarFunction implements StandardLibraryFunction {

public String eval(FlinkJsonType input, String pathSpec) {
if (input == null) {
Expand Down
5 changes: 4 additions & 1 deletion sqrl-json/src/main/java/com/datasqrl/json/JsonObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static com.datasqrl.json.JsonFunctions.createJsonArgumentTypeStrategy;

import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.api.DataTypes;
Expand All @@ -34,7 +36,8 @@
* of each pair being the key and the second being the value. If multiple key-value pairs have the
* same key, the last pair is added to the JSON object.
*/
public class JsonObject extends ScalarFunction {
@AutoService(StandardLibraryFunction.class)
public class JsonObject extends ScalarFunction implements StandardLibraryFunction {
static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();

public FlinkJsonType eval(Object... objects) {
Expand Down
6 changes: 5 additions & 1 deletion sqrl-json/src/main/java/com/datasqrl/json/JsonObjectAgg.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.datasqrl.json;

import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import java.util.LinkedHashMap;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -34,7 +36,9 @@
value = "RAW",
bridgedTo = FlinkJsonType.class,
rawSerializer = FlinkJsonTypeSerializer.class))
public class JsonObjectAgg extends AggregateFunction<Object, ObjectAgg> {
@AutoService(StandardLibraryFunction.class)
public class JsonObjectAgg extends AggregateFunction<Object, ObjectAgg>
implements StandardLibraryFunction {

private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();

Expand Down
5 changes: 4 additions & 1 deletion sqrl-json/src/main/java/com/datasqrl/json/JsonQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.datasqrl.json;

import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.ReadContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -26,7 +28,8 @@
* For a given JSON object, executes a JSON path query against the object and returns the result as
* string.
*/
public class JsonQuery extends ScalarFunction {
@AutoService(StandardLibraryFunction.class)
public class JsonQuery extends ScalarFunction implements StandardLibraryFunction {
static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();

public String eval(FlinkJsonType input, String pathSpec) {
Expand Down
6 changes: 5 additions & 1 deletion sqrl-json/src/main/java/com/datasqrl/json/JsonToString.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
package com.datasqrl.json;

import com.datasqrl.function.SqrlCastFunction;
import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import org.apache.flink.table.functions.ScalarFunction;

public class JsonToString extends ScalarFunction implements SqrlCastFunction {
@AutoService(StandardLibraryFunction.class)
public class JsonToString extends ScalarFunction
implements StandardLibraryFunction, SqrlCastFunction {

public String eval(FlinkJsonType json) {
if (json == null) {
Expand Down
5 changes: 4 additions & 1 deletion sqrl-json/src/main/java/com/datasqrl/json/ToJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.datasqrl.json;

import com.datasqrl.function.SqrlCastFunction;
import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -28,7 +30,8 @@
import org.apache.flink.util.jackson.JacksonMapperFactory;

/** Parses a JSON object from string */
public class ToJson extends ScalarFunction implements SqrlCastFunction {
@AutoService(StandardLibraryFunction.class)
public class ToJson extends ScalarFunction implements StandardLibraryFunction, SqrlCastFunction {

public static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();

Expand Down
5 changes: 5 additions & 0 deletions sqrl-lib-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
<version>1.19.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.datasqrl.datatype;

import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
Expand All @@ -27,7 +29,8 @@
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;

public class HashColumns extends ScalarFunction {
@AutoService(StandardLibraryFunction.class)
public class HashColumns extends ScalarFunction implements StandardLibraryFunction {

public String eval(Object... objects) {
if (objects.length == 0) return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.datasqrl.datatype;

import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
Expand All @@ -23,7 +25,8 @@
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;

public class Noop extends ScalarFunction {
@AutoService(StandardLibraryFunction.class)
public class Noop extends ScalarFunction implements StandardLibraryFunction {

public boolean eval(Object... objects) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.datasqrl.datatype;

import com.datasqrl.function.SqrlCastFunction;
import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import lombok.SneakyThrows;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
Expand All @@ -24,7 +26,9 @@
import org.apache.flink.table.functions.ScalarFunction;

/** Converts an annotated data type to */
public class SerializeToBytes extends ScalarFunction implements SqrlCastFunction {
@AutoService(StandardLibraryFunction.class)
public class SerializeToBytes extends ScalarFunction
implements StandardLibraryFunction, SqrlCastFunction {

@SneakyThrows
public byte[] eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object object) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datasqrl.function;

/** Marker interface for functions that are used by DataSQRL track internal functions */
public interface StandardLibraryFunction {}
5 changes: 4 additions & 1 deletion sqrl-secure/src/main/java/com/datasqrl/secure/RandomID.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.datasqrl.secure;

import com.datasqrl.function.FlinkTypeUtil;
import com.datasqrl.function.StandardLibraryFunction;
import com.google.auto.service.AutoService;
import java.security.SecureRandom;
import java.util.Base64;
import java.util.List;
Expand All @@ -28,7 +30,8 @@
* Generates a random ID string with the given number of secure random bytes. The bytes are base64
* encoded so the string length will be longer than the number of bytes
*/
public class RandomID extends ScalarFunction {
@AutoService(StandardLibraryFunction.class)
public class RandomID extends ScalarFunction implements StandardLibraryFunction {

private static final SecureRandom random = new SecureRandom();
private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
Expand Down
Loading