Skip to content

Commit e9f7d02

Browse files
authored
Make the runner more flexible for easier SQRL integration (#160)
1 parent 70aae76 commit e9f7d02

File tree

15 files changed

+658
-376
lines changed

15 files changed

+658
-376
lines changed

flink-sql-runner/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@
304304
<finalName>flink-sql-runner.uber</finalName>
305305
<shadedArtifactAttached>true</shadedArtifactAttached>
306306
<shadedClassifierName>flink${flink.version}</shadedClassifierName>
307+
<createDependencyReducedPom>false</createDependencyReducedPom>
307308
<artifactSet>
308309
<excludes>
309310
<!--
@@ -345,7 +346,7 @@
345346
<transformers>
346347
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
347348
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
348-
<mainClass>com.datasqrl.flinkrunner.FlinkMain</mainClass>
349+
<mainClass>com.datasqrl.flinkrunner.CliRunner</mainClass>
349350
</transformer>
350351
</transformers>
351352
</configuration>
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright © 2025 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl.flinkrunner;
17+
18+
import java.io.File;
19+
import java.io.IOException;
20+
import java.util.function.Supplier;
21+
import javax.annotation.Nullable;
22+
import lombok.RequiredArgsConstructor;
23+
import org.apache.commons.lang3.StringUtils;
24+
import org.apache.flink.annotation.VisibleForTesting;
25+
import org.apache.flink.api.common.RuntimeExecutionMode;
26+
import org.apache.flink.configuration.Configuration;
27+
import org.apache.flink.configuration.ExecutionOptions;
28+
import org.apache.flink.configuration.GlobalConfiguration;
29+
import org.apache.flink.table.api.TableResult;
30+
import org.apache.flink.util.FileUtils;
31+
32+
@RequiredArgsConstructor
33+
abstract class BaseRunner {
34+
35+
final RuntimeExecutionMode mode;
36+
final EnvVarResolver resolver;
37+
@Nullable final String sqlFile;
38+
@Nullable final String planFile;
39+
@Nullable final String configDir;
40+
@Nullable final String udfPath;
41+
@Nullable final Configuration config;
42+
43+
public TableResult run() throws Exception {
44+
var execConfig = config != null ? config : initConfiguration();
45+
46+
return run(() -> new SqlExecutor(execConfig, udfPath));
47+
}
48+
49+
@VisibleForTesting
50+
TableResult run(Supplier<SqlExecutor> sqlExecutorSupplier) throws Exception {
51+
var sqlExecutor = sqlExecutorSupplier.get();
52+
53+
if (StringUtils.isNoneBlank(sqlFile, planFile)) {
54+
throw new IllegalArgumentException(
55+
"Provide either a SQL file or a compiled plan - not both.");
56+
}
57+
58+
if (StringUtils.isAllBlank(sqlFile, planFile)) {
59+
throw new IllegalArgumentException(
60+
"Invalid input. Please provide one of the following combinations: 1. A single SQL file (--sqlfile) 2. A plan JSON file (--planfile)");
61+
}
62+
63+
if (StringUtils.isNotBlank(sqlFile)) {
64+
// Single SQL file mode
65+
var script = readTextFile(sqlFile);
66+
script = resolver.resolve(script);
67+
68+
sqlExecutor.setupSystemFunctions();
69+
return sqlExecutor.executeScript(script);
70+
}
71+
72+
// Compiled plan JSON file
73+
var planJson = readTextFile(planFile);
74+
planJson = resolver.resolveInJson(planJson);
75+
76+
sqlExecutor.setupSystemFunctions();
77+
return sqlExecutor.executeCompiledPlan(planJson);
78+
}
79+
80+
/**
81+
* Initializes Flink {@link Configuration}. Loads configuration from YAML file, if given. Sets
82+
* {@link RuntimeExecutionMode} based on the given CLI option, if it was not part of the YAML
83+
* config.
84+
*
85+
* @return the initialized Flink {@link Configuration} object
86+
*/
87+
@VisibleForTesting
88+
Configuration initConfiguration() {
89+
var conf = new Configuration();
90+
if (StringUtils.isNotBlank(configDir)) {
91+
System.out.printf("Loading configuration from %s\n", configDir);
92+
conf = GlobalConfiguration.loadConfiguration(configDir);
93+
}
94+
95+
// Do not overwrite runtime given in YAML
96+
if (!conf.contains(ExecutionOptions.RUNTIME_MODE)) {
97+
conf.set(ExecutionOptions.RUNTIME_MODE, mode);
98+
}
99+
100+
return conf;
101+
}
102+
103+
static String readTextFile(String path) throws IOException {
104+
var f = new File(path);
105+
if (!f.exists()) {
106+
throw new IllegalArgumentException(String.format("Given file '%s' does not exist", path));
107+
}
108+
109+
if (!f.isFile()) {
110+
throw new IllegalArgumentException(
111+
String.format("Given file '%s' is not a regular file", path));
112+
}
113+
114+
return FileUtils.readFileUtf8(f);
115+
}
116+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright © 2025 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl.flinkrunner;
17+
18+
import java.util.Arrays;
19+
import java.util.concurrent.Callable;
20+
import javax.annotation.Nullable;
21+
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.api.common.RuntimeExecutionMode;
23+
import picocli.CommandLine;
24+
import picocli.CommandLine.Command;
25+
import picocli.CommandLine.Option;
26+
27+
public class CliRunner extends BaseRunner {
28+
29+
@SuppressWarnings("unused")
30+
@Command(
31+
name = "SqlRunner",
32+
mixinStandardHelpOptions = true,
33+
version = "0.6",
34+
description = "Runs SQL scripts using Flink TableEnvironment.")
35+
public static class SqlRunner implements Callable<Void> {
36+
37+
@Option(
38+
names = {"-m", "--mode"},
39+
defaultValue = "STREAMING",
40+
description =
41+
"Flink runtime execution mode to apply to the given SQL program. Valid values: ${COMPLETION-CANDIDATES}.")
42+
private RuntimeExecutionMode mode;
43+
44+
@Option(
45+
names = {"-s", "--sqlfile"},
46+
description = "SQL file to execute.")
47+
private String sqlFile;
48+
49+
@Option(
50+
names = {"-p", "--planfile"},
51+
description = "Compiled plan JSON file.")
52+
private String planFile;
53+
54+
@Option(
55+
names = {"-c", "--config-dir"},
56+
description = "Directory containing configuration YAML file.")
57+
private String configDir;
58+
59+
@Option(
60+
names = {"-u", "--udfpath"},
61+
description = "Path to UDFs.")
62+
private String udfPath;
63+
64+
@Override
65+
public Void call() {
66+
return null;
67+
}
68+
}
69+
70+
public CliRunner(
71+
RuntimeExecutionMode mode,
72+
@Nullable String sqlFile,
73+
@Nullable String planFile,
74+
@Nullable String configDir,
75+
@Nullable String udfPath) {
76+
this(mode, new EnvVarResolver(), sqlFile, planFile, configDir, udfPath);
77+
}
78+
79+
@VisibleForTesting
80+
CliRunner(
81+
RuntimeExecutionMode mode,
82+
EnvVarResolver resolver,
83+
@Nullable String sqlFile,
84+
@Nullable String planFile,
85+
@Nullable String configDir,
86+
@Nullable String udfPath) {
87+
super(mode, resolver, sqlFile, planFile, configDir, udfPath, null);
88+
}
89+
90+
public static void main(String[] args) throws Exception {
91+
System.out.printf("\n\nExecuting flink-sql-runner: %s\n\n", Arrays.toString(args));
92+
93+
var cl = new CommandLine(new SqlRunner());
94+
var resCode = cl.execute(args);
95+
if (resCode != 0) {
96+
System.exit(resCode);
97+
}
98+
99+
if (cl.isUsageHelpRequested()) {
100+
return;
101+
}
102+
103+
SqlRunner runner = cl.getCommand();
104+
105+
// Determine UDF path
106+
if (runner.udfPath == null) {
107+
runner.udfPath = System.getenv("UDF_PATH");
108+
}
109+
110+
new CliRunner(runner.mode, runner.sqlFile, runner.planFile, runner.configDir, runner.udfPath)
111+
.run();
112+
113+
System.out.println("Finished flink-sql-runner");
114+
}
115+
}

flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/EnvVarUtils.java renamed to flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/EnvVarResolver.java

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,32 +22,40 @@
2222
import java.util.regex.Pattern;
2323
import lombok.extern.slf4j.Slf4j;
2424
import org.apache.commons.lang3.StringUtils;
25+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
26+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
27+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
2528
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
2629
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
2730

28-
/** Environment variable handling utilities. */
31+
/** Environment variable resolving functionality. */
2932
@Slf4j
30-
class EnvVarUtils {
33+
public class EnvVarResolver {
3134

3235
private static final Pattern ENVIRONMENT_VARIABLE_PATTERN = Pattern.compile("\\$\\{(.*?)\\}");
33-
private static final ObjectMapper OBJECT_MAPPER = initObjectMapper();
3436

35-
/** See {@link EnvVarUtils#resolveEnvVars(String, Map)}. */
36-
static String resolveEnvVars(String src) {
37-
return resolveEnvVars(src, System.getenv());
37+
private final Map<String, String> envVars;
38+
private final ObjectMapper objectMapper;
39+
40+
public EnvVarResolver() {
41+
this(System.getenv());
42+
}
43+
44+
public EnvVarResolver(Map<String, String> envVars) {
45+
this.envVars = envVars;
46+
objectMapper = initObjectMapper();
3847
}
3948

4049
/**
4150
* Resolves environment variables referenced in a given source string. Searches for environment
42-
* variable references based on {@link EnvVarUtils#ENVIRONMENT_VARIABLE_PATTERN}. If a blank
51+
* variable references based on {@link EnvVarResolver#ENVIRONMENT_VARIABLE_PATTERN}. If a blank
4352
* source string is passed, it will be returned as is.
4453
*
4554
* @param src given source string that may contain environment variable references
46-
* @param envVars available environment variables
4755
* @return a new string with the resolved environment variables
4856
* @throws IllegalStateException if any referenced environment variable are not available
4957
*/
50-
static String resolveEnvVars(String src, Map<String, String> envVars) {
58+
public String resolve(String src) {
5159
if (StringUtils.isBlank(src)) {
5260
return src;
5361
}
@@ -57,10 +65,24 @@ static String resolveEnvVars(String src, Map<String, String> envVars) {
5765
var matcher = ENVIRONMENT_VARIABLE_PATTERN.matcher(src);
5866
var missingEnvVars = new HashSet<String>();
5967
while (matcher.find()) {
60-
var key = matcher.group(1);
68+
var rawKey = matcher.group(1);
69+
String key;
70+
String defaultValue = null;
71+
72+
// Split at first ':' to support default values
73+
int colonIdx = rawKey.indexOf(':');
74+
if (colonIdx >= 0) {
75+
key = rawKey.substring(0, colonIdx);
76+
defaultValue = rawKey.substring(colonIdx + 1);
77+
} else {
78+
key = rawKey;
79+
}
80+
6181
if (envVars.containsKey(key)) {
6282
var envValue = envVars.get(key);
6383
matcher.appendReplacement(res, Matcher.quoteReplacement(envValue));
84+
} else if (defaultValue != null) {
85+
matcher.appendReplacement(res, Matcher.quoteReplacement(defaultValue));
6486
} else {
6587
missingEnvVars.add(key);
6688
}
@@ -80,19 +102,19 @@ static String resolveEnvVars(String src, Map<String, String> envVars) {
80102
/**
81103
* Resolves environment variables referenced in a given JSON source string.Searches for
82104
* environment variable references in any string leaf nodes based on {@link
83-
* EnvVarUtils#ENVIRONMENT_VARIABLE_PATTERN}.
105+
* EnvVarResolver#ENVIRONMENT_VARIABLE_PATTERN}.
84106
*
85107
* @param jsonSrc given JSON source string that may contain environment variable references
86108
* @return JSON string with the resolved environment variables
87109
* @throws IOException if the JSON processing fails in any way
88110
*/
89-
static String resolveEnvVarsInJson(String jsonSrc) throws IOException {
90-
var res = OBJECT_MAPPER.readValue(jsonSrc, Map.class);
111+
public String resolveInJson(String jsonSrc) throws IOException {
112+
var res = objectMapper.readValue(jsonSrc, Map.class);
91113

92-
return OBJECT_MAPPER.writeValueAsString(res);
114+
return objectMapper.writeValueAsString(res);
93115
}
94116

95-
private static ObjectMapper initObjectMapper() {
117+
private ObjectMapper initObjectMapper() {
96118
var objectMapper = new ObjectMapper();
97119

98120
var module = new SimpleModule();
@@ -102,7 +124,12 @@ private static ObjectMapper initObjectMapper() {
102124
return objectMapper;
103125
}
104126

105-
private EnvVarUtils() {
106-
throw new UnsupportedOperationException();
127+
private class JsonEnvVarDeserializer extends JsonDeserializer<String> {
128+
129+
@Override
130+
public String deserialize(JsonParser parser, DeserializationContext ctx) throws IOException {
131+
var value = parser.getText();
132+
return resolve(value);
133+
}
107134
}
108135
}

0 commit comments

Comments
 (0)