Skip to content

Commit 89666b3

Browse files
committed
Validate if all environment variables are present
Signed-off-by: Marvin Froeder <velo.br@gmail.com>
1 parent 8dc2b76 commit 89666b3

File tree

5 files changed

+74
-14
lines changed

5 files changed

+74
-14
lines changed

fink-dockerfile-example/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ ext {
1212
flinkVersion = "1.19.1"
1313
jdbcVersion = "3.2.0-1.19"
1414
kafkaVersion = "3.2.0-1.19"
15-
sqrlVersion = "0.5.6"
15+
sqrlVersion = "0.5.7"
1616
icebergVersion = "1.6.0"
1717
}
1818

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.datasqrl;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
import java.util.TreeMap;
6+
import java.util.TreeSet;
7+
import java.util.regex.Matcher;
8+
import java.util.regex.Pattern;
9+
import lombok.experimental.UtilityClass;
10+
11+
@UtilityClass
12+
public class EnvironmentVariablesUtils {
13+
14+
private static final Pattern ENVIRONMENT_VARIABLE_PATTERN = Pattern.compile("\\$\\{(.*?)\\}");
15+
16+
public static String replaceWithEnv(String command, Map<String, String> envVariables) {
17+
String substitutedStr = command;
18+
StringBuffer result = new StringBuffer();
19+
// First pass to replace environment variables
20+
Matcher matcher = ENVIRONMENT_VARIABLE_PATTERN.matcher(substitutedStr);
21+
while (matcher.find()) {
22+
String key = matcher.group(1);
23+
String envValue = envVariables.getOrDefault(key, "");
24+
matcher.appendReplacement(result, Matcher.quoteReplacement(envValue));
25+
}
26+
matcher.appendTail(result);
27+
28+
return result.toString();
29+
}
30+
31+
public void validateEnvironmentVariables(TreeMap<String, String> envVariables, String script) {
32+
Matcher matcher = ENVIRONMENT_VARIABLE_PATTERN.matcher(script);
33+
34+
Set<String> scriptEnvironmentVariables = new TreeSet<>();
35+
while (matcher.find()) {
36+
scriptEnvironmentVariables.add(matcher.group(1));
37+
}
38+
39+
scriptEnvironmentVariables.removeAll(envVariables.keySet());
40+
41+
if (!scriptEnvironmentVariables.isEmpty()) {
42+
throw new IllegalStateException(
43+
String.format(
44+
"Could not find the following environment variables: %s",
45+
scriptEnvironmentVariables));
46+
}
47+
}
48+
}

src/main/java/com/datasqrl/SqlExecutor.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ class SqlExecutor {
2424
Pattern.compile("SET\\s+'(\\S+)'\\s*=\\s*'(.+)';?", Pattern.CASE_INSENSITIVE);
2525

2626
private final TableEnvironment tableEnv;
27+
private final Map<String, String> envVariables;
2728

28-
public SqlExecutor(Configuration configuration, String udfPath) {
29+
public SqlExecutor(
30+
Configuration configuration, String udfPath, Map<String, String> envVariables) {
2931
StreamExecutionEnvironment sEnv;
3032
try {
3133
sEnv = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
@@ -44,6 +46,7 @@ public SqlExecutor(Configuration configuration, String udfPath) {
4446
if (udfPath != null) {
4547
setupUdfPath(udfPath);
4648
}
49+
this.envVariables = envVariables;
4750
}
4851

4952
/**
@@ -92,7 +95,8 @@ private TableResult executeStatement(String statement) {
9295
} else {
9396
System.out.println(statement);
9497
log.info("Executing statement:\n{}", statement);
95-
tableResult = tableEnv.executeSql(replaceWithEnv(statement));
98+
tableResult =
99+
tableEnv.executeSql(EnvironmentVariablesUtils.replaceWithEnv(statement, envVariables));
96100
}
97101
} catch (Exception e) {
98102
log.error("Failed to execute statement: {}", statement, e);

src/main/java/com/datasqrl/SqlRunner.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
package com.datasqrl;
1919

2020
import java.io.File;
21-
import java.util.*;
21+
import java.util.Map;
22+
import java.util.TreeMap;
2223
import java.util.concurrent.Callable;
2324
import lombok.SneakyThrows;
2425
import lombok.extern.slf4j.Slf4j;
@@ -29,7 +30,8 @@
2930
import org.apache.flink.table.api.TableResult;
3031
import org.apache.flink.util.FileUtils;
3132
import picocli.CommandLine;
32-
import picocli.CommandLine.*;
33+
import picocli.CommandLine.Command;
34+
import picocli.CommandLine.Option;
3335

3436
/** Main class for executing SQL scripts using picocli. */
3537
@Command(
@@ -67,7 +69,7 @@ public class SqlRunner implements Callable<Integer> {
6769
private String udfPath;
6870

6971
public static void main(String[] args) {
70-
int exitCode = new CommandLine(new SqlRunner()).execute(args);
72+
var exitCode = new CommandLine(new SqlRunner()).execute(args);
7173
System.exit(exitCode);
7274
}
7375

@@ -79,22 +81,27 @@ public Integer call() throws Exception {
7981
}
8082

8183
// Load configuration if configFile is provided
82-
Configuration configuration = new Configuration();
84+
var configuration = new Configuration();
8385
if (configFile != null) {
8486
configuration = loadConfigurationFromYaml(configFile);
8587
}
8688

89+
log.info("Environment variables");
90+
TreeMap<String, String> envVariables = new TreeMap<>(System.getenv());
91+
envVariables.forEach((name, value) -> log.info("{}: {}", name, value));
92+
8793
// Initialize SqlExecutor
88-
SqlExecutor sqlExecutor = new SqlExecutor(configuration, udfPath);
94+
var sqlExecutor = new SqlExecutor(configuration, udfPath, envVariables);
8995
TableResult tableResult;
9096
// Input validation and execution logic
9197
if (sqlFile != null) {
9298
// Single SQL file mode
93-
String script = FileUtils.readFileUtf8(sqlFile);
99+
var script = FileUtils.readFileUtf8(sqlFile);
100+
EnvironmentVariablesUtils.validateEnvironmentVariables(envVariables, script);
94101
tableResult = sqlExecutor.executeScript(script);
95102
} else if (planFile != null) {
96103
// Compiled plan JSON file
97-
String planJson = FileUtils.readFileUtf8(planFile);
104+
var planJson = FileUtils.readFileUtf8(planFile);
98105
planJson = replaceScriptWithEnv(planJson);
99106

100107
tableResult = sqlExecutor.executeCompiledPlan(planJson);
@@ -120,10 +127,10 @@ private String replaceScriptWithEnv(String script) {
120127
}
121128

122129
public static ObjectMapper getObjectMapper() {
123-
ObjectMapper objectMapper = new ObjectMapper();
130+
var objectMapper = new ObjectMapper();
124131

125132
// Register the custom deserializer module
126-
SimpleModule module = new SimpleModule();
133+
var module = new SimpleModule();
127134
module.addDeserializer(String.class, new JsonEnvVarDeserializer());
128135
objectMapper.registerModule(module);
129136
return objectMapper;
@@ -138,8 +145,7 @@ public static ObjectMapper getObjectMapper() {
138145
*/
139146
private Configuration loadConfigurationFromYaml(File configFile) throws Exception {
140147
log.info("Loading configuration from {}", configFile.getAbsolutePath());
141-
Configuration configuration =
142-
GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath());
148+
var configuration = GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath());
143149
return configuration;
144150
}
145151
}

src/main/java/com/datasqrl/SqlUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
import java.util.ArrayList;
44
import java.util.List;
5+
import lombok.experimental.UtilityClass;
56

67
/** Utility class for parsing SQL scripts. */
8+
@UtilityClass
79
class SqlUtils {
810

911
private static final String STATEMENT_DELIMITER = ";"; // a statement should end with `;`

0 commit comments

Comments
 (0)