diff --git a/src/main/java/com/datasqrl/JsonEnvVarDeserializer.java b/src/main/java/com/datasqrl/JsonEnvVarDeserializer.java new file mode 100644 index 0000000..f394330 --- /dev/null +++ b/src/main/java/com/datasqrl/JsonEnvVarDeserializer.java @@ -0,0 +1,44 @@ +package com.datasqrl; + +import java.io.IOException; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer; + +class JsonEnvVarDeserializer extends JsonDeserializer { + + private Map env; + + public JsonEnvVarDeserializer() { + env = System.getenv(); + } + + public JsonEnvVarDeserializer(Map env) { + this.env = env; + } + + @Override + public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + String value = p.getText(); + return replaceWithEnv(this.env, value); + } + + public String replaceWithEnv(Map env, String value) { + Pattern pattern = Pattern.compile("\\$\\{(.+?)\\}"); + Matcher matcher = pattern.matcher(value); + StringBuffer result = new StringBuffer(); + while (matcher.find()) { + String key = matcher.group(1); + String envVarValue = env.get(key); + if (envVarValue != null) { + matcher.appendReplacement(result, Matcher.quoteReplacement(envVarValue)); + } + } + matcher.appendTail(result); + + return result.toString(); + } +} \ No newline at end of file diff --git a/src/main/java/com/datasqrl/SqlExecutor.java b/src/main/java/com/datasqrl/SqlExecutor.java new file mode 100644 index 0000000..2e09e24 --- /dev/null +++ b/src/main/java/com/datasqrl/SqlExecutor.java @@ -0,0 +1,171 @@ +package com.datasqrl; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.PlanReference; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class for executing SQL scripts programmatically. + */ +class SqlExecutor { + + private static final Logger log = LoggerFactory.getLogger(SqlExecutor.class); + + private static final Pattern SET_STATEMENT_PATTERN = Pattern.compile( + "SET\\s+'(\\S+)'\\s*=\\s*'(.+)';?", Pattern.CASE_INSENSITIVE); + + private final TableEnvironment tableEnv; + + public SqlExecutor(Configuration configuration, String udfPath) { + StreamExecutionEnvironment sEnv; + try { + sEnv = StreamExecutionEnvironment.getExecutionEnvironment(configuration); + } catch (Exception e) { + throw e; + } + + EnvironmentSettings tEnvConfig = EnvironmentSettings.newInstance() + .withConfiguration(configuration) + .build(); + + this.tableEnv = StreamTableEnvironment.create(sEnv, tEnvConfig); + + // Apply configuration settings + tableEnv.getConfig().addConfiguration(configuration); + + if (udfPath != null) { + setupUdfPath(udfPath); + } + } + + /** + * Executes a single SQL script. + * + * @param script The SQL script content. + * @return + * @throws Exception If execution fails. + */ + public TableResult executeScript(String script) throws Exception { + List statements = SqlUtils.parseStatements(script); + TableResult tableResult = null; + for (String statement : statements) { + tableResult = executeStatement(statement); + } +// +// TableEnvironmentImpl tEnv1 = (TableEnvironmentImpl) tableEnv; +// +// StatementSetOperation parse = (StatementSetOperation)tEnv1.getParser() +// .parse(statements.get(statements.size()-1)).get(0); +// +// CompiledPlan plan = tEnv1.compilePlan(parse.getOperations()); +// plan.writeToFile("/Users/henneberger/flink-jar-runner/src/test/resources/sql/compiled-plan-udf.json"); + + return tableResult; + } + + /** + * Executes a single SQL statement. + * + * @param statement The SQL statement. + * @return + */ + private TableResult executeStatement(String statement) { + TableResult tableResult = null; + try { + Matcher setMatcher = SET_STATEMENT_PATTERN.matcher(statement.trim()); + + if (setMatcher.matches()) { + // Handle SET statements + String key = setMatcher.group(1); + String value = setMatcher.group(2); + tableEnv.getConfig().getConfiguration().setString(key, value); + log.info("Set configuration: {} = {}", key, value); + } else { + System.out.println(statement); + log.info("Executing statement:\n{}", statement); + tableResult = tableEnv.executeSql(replaceWithEnv(statement)); + } + } catch (Exception e) { + log.error("Failed to execute statement: {}", statement, e); + throw e; + } + return tableResult; + } + + public String replaceWithEnv(String command) { + Map envVariables = System.getenv(); + Pattern pattern = Pattern.compile("\\$\\{(.*?)\\}"); + + String substitutedStr = command; + StringBuffer result = new StringBuffer(); + // First pass to replace environment variables + Matcher matcher = pattern.matcher(substitutedStr); + while (matcher.find()) { + String key = matcher.group(1); + String envValue = envVariables.getOrDefault(key, ""); + matcher.appendReplacement(result, Matcher.quoteReplacement(envValue)); + } + matcher.appendTail(result); + + return result.toString(); + } + + /** + * Sets up the UDF path in the TableEnvironment. + * + * @param udfPath The path to UDFs. + */ + private void setupUdfPath(String udfPath) { + // Load and register UDFs from the provided path + log.info("Setting up UDF path: {}", udfPath); + // Implementation depends on how UDFs are packaged and should be loaded + // For example, you might use a URLClassLoader to load JARs from udfPath + try { + File udfDir = new File(udfPath); + if (udfDir.exists() && udfDir.isDirectory()) { + File[] jarFiles = udfDir.listFiles((dir, name) -> name.endsWith(".jar")); + if (jarFiles != null) { + for (File jarFile : jarFiles) { + tableEnv.executeSql("ADD JAR 'file://" + jarFile.getAbsolutePath() + "'"); + log.info("Added UDF JAR: {}", jarFile.getAbsolutePath()); + } + } + } else { + log.warn("UDF path does not exist or is not a directory: {}", udfPath); + } + } catch (Exception e) { + log.error("Failed to set up UDF path", e); + } + } + + /** + * Executes a compiled plan from JSON. + * + * @param planJson The JSON content of the compiled plan. + * @return + * @throws Exception If execution fails. + */ + protected TableResult executeCompiledPlan(String planJson) throws Exception { + log.info("Executing compiled plan from JSON."); + try { + PlanReference planReference = PlanReference.fromJsonString(planJson); + TableResult result = tableEnv.executePlan(planReference); + log.info("Compiled plan executed."); + return result; + } catch (Exception e) { + log.error("Failed to execute compiled plan", e); + throw e; + } + } +} \ No newline at end of file diff --git a/src/main/java/com/datasqrl/SqlRunner.java b/src/main/java/com/datasqrl/SqlRunner.java index 9c47eb8..1e055b1 100644 --- a/src/main/java/com/datasqrl/SqlRunner.java +++ b/src/main/java/com/datasqrl/SqlRunner.java @@ -17,36 +17,22 @@ package com.datasqrl; -import java.io.IOException; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.CompiledPlan; -import org.apache.flink.table.api.PlanReference; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.operations.StatementSetOperation; import org.apache.flink.util.FileUtils; import picocli.CommandLine; import picocli.CommandLine.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.util.*; import java.util.concurrent.Callable; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** * Main class for executing SQL scripts using picocli. @@ -147,267 +133,4 @@ private Configuration loadConfigurationFromYaml(File configFile) throws Exceptio Configuration configuration = GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath()); return configuration; } -} - -/** - * Class for executing SQL scripts programmatically. - */ -class SqlExecutor { - - private static final Logger log = LoggerFactory.getLogger(SqlExecutor.class); - - private static final Pattern SET_STATEMENT_PATTERN = Pattern.compile( - "SET\\s+'(\\S+)'\\s*=\\s*'(.+)';?", Pattern.CASE_INSENSITIVE); - - private final TableEnvironment tableEnv; - - public SqlExecutor(Configuration configuration, String udfPath) { - StreamExecutionEnvironment sEnv; - try { - sEnv = StreamExecutionEnvironment.getExecutionEnvironment(configuration); - } catch (Exception e) { - throw e; - } - - EnvironmentSettings tEnvConfig = EnvironmentSettings.newInstance() - .withConfiguration(configuration) - .build(); - - this.tableEnv = StreamTableEnvironment.create(sEnv, tEnvConfig); - - // Apply configuration settings - tableEnv.getConfig().addConfiguration(configuration); - - if (udfPath != null) { - setupUdfPath(udfPath); - } - } - - /** - * Executes a single SQL script. - * - * @param script The SQL script content. - * @return - * @throws Exception If execution fails. - */ - public TableResult executeScript(String script) throws Exception { - List statements = SqlUtils.parseStatements(script); - TableResult tableResult = null; - for (String statement : statements) { - tableResult = executeStatement(statement); - } -// -// TableEnvironmentImpl tEnv1 = (TableEnvironmentImpl) tableEnv; -// -// StatementSetOperation parse = (StatementSetOperation)tEnv1.getParser() -// .parse(statements.get(statements.size()-1)).get(0); -// -// CompiledPlan plan = tEnv1.compilePlan(parse.getOperations()); -// plan.writeToFile("/Users/henneberger/flink-jar-runner/src/test/resources/sql/compiled-plan-udf.json"); - - return tableResult; - } - - /** - * Executes a single SQL statement. - * - * @param statement The SQL statement. - * @return - */ - private TableResult executeStatement(String statement) { - TableResult tableResult = null; - try { - Matcher setMatcher = SET_STATEMENT_PATTERN.matcher(statement.trim()); - - if (setMatcher.matches()) { - // Handle SET statements - String key = setMatcher.group(1); - String value = setMatcher.group(2); - tableEnv.getConfig().getConfiguration().setString(key, value); - log.info("Set configuration: {} = {}", key, value); - } else { - System.out.println(statement); - log.info("Executing statement:\n{}", statement); - tableResult = tableEnv.executeSql(replaceWithEnv(statement)); - } - } catch (Exception e) { - log.error("Failed to execute statement: {}", statement, e); - throw e; - } - return tableResult; - } - - public String replaceWithEnv(String command) { - Map envVariables = System.getenv(); - Pattern pattern = Pattern.compile("\\$\\{(.*?)\\}"); - - String substitutedStr = command; - StringBuffer result = new StringBuffer(); - // First pass to replace environment variables - Matcher matcher = pattern.matcher(substitutedStr); - while (matcher.find()) { - String key = matcher.group(1); - String envValue = envVariables.getOrDefault(key, ""); - matcher.appendReplacement(result, Matcher.quoteReplacement(envValue)); - } - matcher.appendTail(result); - - return result.toString(); - } - - /** - * Sets up the UDF path in the TableEnvironment. - * - * @param udfPath The path to UDFs. - */ - private void setupUdfPath(String udfPath) { - // Load and register UDFs from the provided path - log.info("Setting up UDF path: {}", udfPath); - // Implementation depends on how UDFs are packaged and should be loaded - // For example, you might use a URLClassLoader to load JARs from udfPath - try { - File udfDir = new File(udfPath); - if (udfDir.exists() && udfDir.isDirectory()) { - File[] jarFiles = udfDir.listFiles((dir, name) -> name.endsWith(".jar")); - if (jarFiles != null) { - for (File jarFile : jarFiles) { - tableEnv.executeSql("ADD JAR 'file://" + jarFile.getAbsolutePath() + "'"); - log.info("Added UDF JAR: {}", jarFile.getAbsolutePath()); - } - } - } else { - log.warn("UDF path does not exist or is not a directory: {}", udfPath); - } - } catch (Exception e) { - log.error("Failed to set up UDF path", e); - } - } - - /** - * Executes a compiled plan from JSON. - * - * @param planJson The JSON content of the compiled plan. - * @return - * @throws Exception If execution fails. - */ - protected TableResult executeCompiledPlan(String planJson) throws Exception { - log.info("Executing compiled plan from JSON."); - try { - PlanReference planReference = PlanReference.fromJsonString(planJson); - TableResult result = tableEnv.executePlan(planReference); - log.info("Compiled plan executed."); - return result; - } catch (Exception e) { - log.error("Failed to execute compiled plan", e); - throw e; - } - } -} - -/** - * Utility class for parsing SQL scripts. - */ -class SqlUtils { - - private static final String STATEMENT_DELIMITER = ";"; // a statement should end with `;` - private static final String LINE_DELIMITER = "\n"; - - private static final String COMMENT_PATTERN = "(--.*)|(((/\\*)+?[\\w\\W]+?(\\*/)+))"; - - private static final String BEGIN_CERTIFICATE = "-----BEGIN CERTIFICATE-----"; - private static final String END_CERTIFICATE = "-----END CERTIFICATE-----"; - private static final String ESCAPED_BEGIN_CERTIFICATE = "======BEGIN CERTIFICATE====="; - private static final String ESCAPED_END_CERTIFICATE = "=====END CERTIFICATE====="; - - /** - * Parses SQL statements from a script. - * - * @param script The SQL script content. - * @return A list of individual SQL statements. - */ - public static List parseStatements(String script) { - String formatted = formatSqlFile(script).replaceAll(BEGIN_CERTIFICATE, - ESCAPED_BEGIN_CERTIFICATE).replaceAll(END_CERTIFICATE, ESCAPED_END_CERTIFICATE) - .replaceAll(COMMENT_PATTERN, "").replaceAll(ESCAPED_BEGIN_CERTIFICATE, BEGIN_CERTIFICATE) - .replaceAll(ESCAPED_END_CERTIFICATE, END_CERTIFICATE); - - List statements = new ArrayList<>(); - - StringBuilder current = null; - boolean statementSet = false; - for (String line : formatted.split("\n")) { - String trimmed = line.trim(); - if (trimmed.isBlank()) { - continue; - } - if (current == null) { - current = new StringBuilder(); - } - if (trimmed.startsWith("EXECUTE STATEMENT SET")) { - statementSet = true; - } - current.append(trimmed); - current.append("\n"); - if (trimmed.endsWith(STATEMENT_DELIMITER)) { - if (!statementSet || trimmed.equalsIgnoreCase("END;")) { - statements.add(current.toString()); - current = null; - statementSet = false; - } - } - } - return statements; - } - - /** - * Formats the SQL file content to ensure proper statement termination. - * - * @param content The SQL file content. - * @return Formatted SQL content. - */ - public static String formatSqlFile(String content) { - String trimmed = content.trim(); - StringBuilder formatted = new StringBuilder(); - formatted.append(trimmed); - if (!trimmed.endsWith(STATEMENT_DELIMITER)) { - formatted.append(STATEMENT_DELIMITER); - } - formatted.append(LINE_DELIMITER); - return formatted.toString(); - } -} - -class JsonEnvVarDeserializer extends JsonDeserializer { - - private Map env; - - public JsonEnvVarDeserializer() { - env = System.getenv(); - } - - public JsonEnvVarDeserializer(Map env) { - this.env = env; - } - - @Override - public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { - String value = p.getText(); - return replaceWithEnv(this.env, value); - } - - public String replaceWithEnv(Map env, String value) { - Pattern pattern = Pattern.compile("\\$\\{(.+?)\\}"); - Matcher matcher = pattern.matcher(value); - StringBuffer result = new StringBuffer(); - while (matcher.find()) { - String key = matcher.group(1); - String envVarValue = env.get(key); - if (envVarValue != null) { - matcher.appendReplacement(result, Matcher.quoteReplacement(envVarValue)); - } - } - matcher.appendTail(result); - - return result.toString(); - } } \ No newline at end of file diff --git a/src/main/java/com/datasqrl/SqlUtils.java b/src/main/java/com/datasqrl/SqlUtils.java new file mode 100644 index 0000000..0a48d06 --- /dev/null +++ b/src/main/java/com/datasqrl/SqlUtils.java @@ -0,0 +1,77 @@ +package com.datasqrl; + +import java.util.ArrayList; +import java.util.List; + +/** + * Utility class for parsing SQL scripts. + */ +class SqlUtils { + + private static final String STATEMENT_DELIMITER = ";"; // a statement should end with `;` + private static final String LINE_DELIMITER = "\n"; + + private static final String COMMENT_PATTERN = "(--.*)|(((/\\*)+?[\\w\\W]+?(\\*/)+))"; + + private static final String BEGIN_CERTIFICATE = "-----BEGIN CERTIFICATE-----"; + private static final String END_CERTIFICATE = "-----END CERTIFICATE-----"; + private static final String ESCAPED_BEGIN_CERTIFICATE = "======BEGIN CERTIFICATE====="; + private static final String ESCAPED_END_CERTIFICATE = "=====END CERTIFICATE====="; + + /** + * Parses SQL statements from a script. + * + * @param script The SQL script content. + * @return A list of individual SQL statements. + */ + public static List parseStatements(String script) { + String formatted = formatSqlFile(script).replaceAll(BEGIN_CERTIFICATE, + ESCAPED_BEGIN_CERTIFICATE).replaceAll(END_CERTIFICATE, ESCAPED_END_CERTIFICATE) + .replaceAll(COMMENT_PATTERN, "").replaceAll(ESCAPED_BEGIN_CERTIFICATE, BEGIN_CERTIFICATE) + .replaceAll(ESCAPED_END_CERTIFICATE, END_CERTIFICATE); + + List statements = new ArrayList<>(); + + StringBuilder current = null; + boolean statementSet = false; + for (String line : formatted.split("\n")) { + String trimmed = line.trim(); + if (trimmed.isBlank()) { + continue; + } + if (current == null) { + current = new StringBuilder(); + } + if (trimmed.startsWith("EXECUTE STATEMENT SET")) { + statementSet = true; + } + current.append(trimmed); + current.append("\n"); + if (trimmed.endsWith(STATEMENT_DELIMITER)) { + if (!statementSet || trimmed.equalsIgnoreCase("END;")) { + statements.add(current.toString()); + current = null; + statementSet = false; + } + } + } + return statements; + } + + /** + * Formats the SQL file content to ensure proper statement termination. + * + * @param content The SQL file content. + * @return Formatted SQL content. + */ + public static String formatSqlFile(String content) { + String trimmed = content.trim(); + StringBuilder formatted = new StringBuilder(); + formatted.append(trimmed); + if (!trimmed.endsWith(STATEMENT_DELIMITER)) { + formatted.append(STATEMENT_DELIMITER); + } + formatted.append(LINE_DELIMITER); + return formatted.toString(); + } +} \ No newline at end of file