Skip to content

Commit 681d36d

Browse files
authored
Merge pull request #1 from DataSQRL/split_classes
Move each class to a new file
2 parents f0b1551 + 4bf8ef5 commit 681d36d

File tree

4 files changed

+292
-277
lines changed

4 files changed

+292
-277
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.datasqrl;
2+
3+
import java.io.IOException;
4+
import java.util.Map;
5+
import java.util.regex.Matcher;
6+
import java.util.regex.Pattern;
7+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
8+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
9+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
10+
11+
class JsonEnvVarDeserializer extends JsonDeserializer<String> {
12+
13+
private Map<String, String> env;
14+
15+
public JsonEnvVarDeserializer() {
16+
env = System.getenv();
17+
}
18+
19+
public JsonEnvVarDeserializer(Map<String, String> env) {
20+
this.env = env;
21+
}
22+
23+
@Override
24+
public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
25+
String value = p.getText();
26+
return replaceWithEnv(this.env, value);
27+
}
28+
29+
public String replaceWithEnv(Map<String, String> env, String value) {
30+
Pattern pattern = Pattern.compile("\\$\\{(.+?)\\}");
31+
Matcher matcher = pattern.matcher(value);
32+
StringBuffer result = new StringBuffer();
33+
while (matcher.find()) {
34+
String key = matcher.group(1);
35+
String envVarValue = env.get(key);
36+
if (envVarValue != null) {
37+
matcher.appendReplacement(result, Matcher.quoteReplacement(envVarValue));
38+
}
39+
}
40+
matcher.appendTail(result);
41+
42+
return result.toString();
43+
}
44+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package com.datasqrl;
2+
3+
import java.io.File;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.regex.Matcher;
7+
import java.util.regex.Pattern;
8+
import org.apache.flink.configuration.Configuration;
9+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10+
import org.apache.flink.table.api.EnvironmentSettings;
11+
import org.apache.flink.table.api.PlanReference;
12+
import org.apache.flink.table.api.TableEnvironment;
13+
import org.apache.flink.table.api.TableResult;
14+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
18+
/**
19+
* Class for executing SQL scripts programmatically.
20+
*/
21+
class SqlExecutor {
22+
23+
private static final Logger log = LoggerFactory.getLogger(SqlExecutor.class);
24+
25+
private static final Pattern SET_STATEMENT_PATTERN = Pattern.compile(
26+
"SET\\s+'(\\S+)'\\s*=\\s*'(.+)';?", Pattern.CASE_INSENSITIVE);
27+
28+
private final TableEnvironment tableEnv;
29+
30+
public SqlExecutor(Configuration configuration, String udfPath) {
31+
StreamExecutionEnvironment sEnv;
32+
try {
33+
sEnv = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
34+
} catch (Exception e) {
35+
throw e;
36+
}
37+
38+
EnvironmentSettings tEnvConfig = EnvironmentSettings.newInstance()
39+
.withConfiguration(configuration)
40+
.build();
41+
42+
this.tableEnv = StreamTableEnvironment.create(sEnv, tEnvConfig);
43+
44+
// Apply configuration settings
45+
tableEnv.getConfig().addConfiguration(configuration);
46+
47+
if (udfPath != null) {
48+
setupUdfPath(udfPath);
49+
}
50+
}
51+
52+
/**
53+
* Executes a single SQL script.
54+
*
55+
* @param script The SQL script content.
56+
* @return
57+
* @throws Exception If execution fails.
58+
*/
59+
public TableResult executeScript(String script) throws Exception {
60+
List<String> statements = SqlUtils.parseStatements(script);
61+
TableResult tableResult = null;
62+
for (String statement : statements) {
63+
tableResult = executeStatement(statement);
64+
}
65+
//
66+
// TableEnvironmentImpl tEnv1 = (TableEnvironmentImpl) tableEnv;
67+
//
68+
// StatementSetOperation parse = (StatementSetOperation)tEnv1.getParser()
69+
// .parse(statements.get(statements.size()-1)).get(0);
70+
//
71+
// CompiledPlan plan = tEnv1.compilePlan(parse.getOperations());
72+
// plan.writeToFile("/Users/henneberger/flink-jar-runner/src/test/resources/sql/compiled-plan-udf.json");
73+
74+
return tableResult;
75+
}
76+
77+
/**
78+
* Executes a single SQL statement.
79+
*
80+
* @param statement The SQL statement.
81+
* @return
82+
*/
83+
private TableResult executeStatement(String statement) {
84+
TableResult tableResult = null;
85+
try {
86+
Matcher setMatcher = SET_STATEMENT_PATTERN.matcher(statement.trim());
87+
88+
if (setMatcher.matches()) {
89+
// Handle SET statements
90+
String key = setMatcher.group(1);
91+
String value = setMatcher.group(2);
92+
tableEnv.getConfig().getConfiguration().setString(key, value);
93+
log.info("Set configuration: {} = {}", key, value);
94+
} else {
95+
System.out.println(statement);
96+
log.info("Executing statement:\n{}", statement);
97+
tableResult = tableEnv.executeSql(replaceWithEnv(statement));
98+
}
99+
} catch (Exception e) {
100+
log.error("Failed to execute statement: {}", statement, e);
101+
throw e;
102+
}
103+
return tableResult;
104+
}
105+
106+
public String replaceWithEnv(String command) {
107+
Map<String, String> envVariables = System.getenv();
108+
Pattern pattern = Pattern.compile("\\$\\{(.*?)\\}");
109+
110+
String substitutedStr = command;
111+
StringBuffer result = new StringBuffer();
112+
// First pass to replace environment variables
113+
Matcher matcher = pattern.matcher(substitutedStr);
114+
while (matcher.find()) {
115+
String key = matcher.group(1);
116+
String envValue = envVariables.getOrDefault(key, "");
117+
matcher.appendReplacement(result, Matcher.quoteReplacement(envValue));
118+
}
119+
matcher.appendTail(result);
120+
121+
return result.toString();
122+
}
123+
124+
/**
125+
* Sets up the UDF path in the TableEnvironment.
126+
*
127+
* @param udfPath The path to UDFs.
128+
*/
129+
private void setupUdfPath(String udfPath) {
130+
// Load and register UDFs from the provided path
131+
log.info("Setting up UDF path: {}", udfPath);
132+
// Implementation depends on how UDFs are packaged and should be loaded
133+
// For example, you might use a URLClassLoader to load JARs from udfPath
134+
try {
135+
File udfDir = new File(udfPath);
136+
if (udfDir.exists() && udfDir.isDirectory()) {
137+
File[] jarFiles = udfDir.listFiles((dir, name) -> name.endsWith(".jar"));
138+
if (jarFiles != null) {
139+
for (File jarFile : jarFiles) {
140+
tableEnv.executeSql("ADD JAR 'file://" + jarFile.getAbsolutePath() + "'");
141+
log.info("Added UDF JAR: {}", jarFile.getAbsolutePath());
142+
}
143+
}
144+
} else {
145+
log.warn("UDF path does not exist or is not a directory: {}", udfPath);
146+
}
147+
} catch (Exception e) {
148+
log.error("Failed to set up UDF path", e);
149+
}
150+
}
151+
152+
/**
153+
* Executes a compiled plan from JSON.
154+
*
155+
* @param planJson The JSON content of the compiled plan.
156+
* @return
157+
* @throws Exception If execution fails.
158+
*/
159+
protected TableResult executeCompiledPlan(String planJson) throws Exception {
160+
log.info("Executing compiled plan from JSON.");
161+
try {
162+
PlanReference planReference = PlanReference.fromJsonString(planJson);
163+
TableResult result = tableEnv.executePlan(planReference);
164+
log.info("Compiled plan executed.");
165+
return result;
166+
} catch (Exception e) {
167+
log.error("Failed to execute compiled plan", e);
168+
throw e;
169+
}
170+
}
171+
}

0 commit comments

Comments
 (0)