 
 package com.datasqrl;
 
+import java.io.File;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.apache.flink.table.operations.StatementSetOperation;
 import org.apache.flink.util.FileUtils;
 import picocli.CommandLine;
-import picocli.CommandLine.*;
-import java.io.File;
-import java.util.*;
-import java.util.concurrent.Callable;
-
-/**
- * Main class for executing SQL scripts using picocli.
- */
-@Command(name = "SqlRunner", mixinStandardHelpOptions = true, version = "1.0", description = "Runs SQL scripts using Flink TableEnvironment.")
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+/** Main class for executing SQL scripts using picocli. */
+@Command(
+    name = "SqlRunner",
+    mixinStandardHelpOptions = true,
+    version = "1.0",
+    description = "Runs SQL scripts using Flink TableEnvironment.")
 @Slf4j
 public class SqlRunner implements Callable<Integer> {
 
-  @Option(names = {"-s", "--sqlfile"}, description = "SQL file to execute.")
+  @Option(
+      names = {"-s", "--sqlfile"},
+      description = "SQL file to execute.")
   private File sqlFile;
 
-  @Option(names = {"--block"}, description = "Wait for the flink job manager to exit.",
-      defaultValue = "false")
+  @Option(
+      names = {"--block"},
+      description = "Wait for the flink job manager to exit.",
+      defaultValue = "false")
   private boolean block;
 
-  @Option(names = {"--planfile"}, description = "Compiled plan JSON file.")
+  @Option(
+      names = {"--planfile"},
+      description = "Compiled plan JSON file.")
   private File planFile;
 
-  @Option(names = {"--configfile"}, description = "Configuration YAML file.")
+  @Option(
+      names = {"--configfile"},
+      description = "Configuration YAML file.")
   private File configFile;
 
-  @Option(names = {"--udfpath"}, description = "Path to UDFs.")
+  @Option(
+      names = {"--udfpath"},
+      description = "Path to UDFs.")
   private String udfPath;
 
   public static void main(String[] args) {
-    int exitCode = new CommandLine(new SqlRunner()).execute(args);
+    var exitCode = new CommandLine(new SqlRunner()).execute(args);
     System.exit(exitCode);
   }
 
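Note (not part of the diff): the restructured picocli options above are still wired through main(), so the runner can be exercised programmatically as well as from the command line. A minimal sketch; only the flag names come from the options declared above, and all argument values are placeholders:

  // Illustrative only: file paths are assumptions, not taken from this PR.
  String[] args = {
      "--planfile", "build/plan.json",        // compiled plan JSON file
      "--configfile", "conf/flink-conf.yaml", // Flink configuration YAML
      "--udfpath", "build/udfs",              // path to UDFs
      "--block"                               // wait for the Flink job manager to exit
  };
  var exitCode = new CommandLine(new SqlRunner()).execute(args);
  System.exit(exitCode);
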
@@ -70,22 +81,27 @@ public Integer call() throws Exception {
     }
 
     // Load configuration if configFile is provided
-    Configuration configuration = new Configuration();
+    var configuration = new Configuration();
     if (configFile != null) {
       configuration = loadConfigurationFromYaml(configFile);
     }
 
+    log.info("Environment variables");
+    TreeMap<String, String> envVariables = new TreeMap<>(System.getenv());
+    envVariables.forEach((name, value) -> log.info("{}: {}", name, value));
+
     // Initialize SqlExecutor
-    SqlExecutor sqlExecutor = new SqlExecutor(configuration, udfPath);
+    var sqlExecutor = new SqlExecutor(configuration, udfPath, envVariables);
     TableResult tableResult;
     // Input validation and execution logic
     if (sqlFile != null) {
       // Single SQL file mode
-      String script = FileUtils.readFileUtf8(sqlFile);
+      var script = FileUtils.readFileUtf8(sqlFile);
+      EnvironmentVariablesUtils.validateEnvironmentVariables(envVariables, script);
       tableResult = sqlExecutor.executeScript(script);
     } else if (planFile != null) {
       // Compiled plan JSON file
-      String planJson = FileUtils.readFileUtf8(planFile);
+      var planJson = FileUtils.readFileUtf8(planFile);
       planJson = replaceScriptWithEnv(planJson);
 
       tableResult = sqlExecutor.executeCompiledPlan(planJson);
@@ -110,12 +126,11 @@ private String replaceScriptWithEnv(String script) {
     return objectMapper.writeValueAsString(map);
   }
 
-
   public static ObjectMapper getObjectMapper() {
-    ObjectMapper objectMapper = new ObjectMapper();
+    var objectMapper = new ObjectMapper();
 
     // Register the custom deserializer module
-    SimpleModule module = new SimpleModule();
+    var module = new SimpleModule();
     module.addDeserializer(String.class, new JsonEnvVarDeserializer());
     objectMapper.registerModule(module);
     return objectMapper;
@@ -130,7 +145,7 @@ public static ObjectMapper getObjectMapper() {
    */
   private Configuration loadConfigurationFromYaml(File configFile) throws Exception {
     log.info("Loading configuration from {}", configFile.getAbsolutePath());
-    Configuration configuration = GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath());
+    var configuration = GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath());
     return configuration;
   }
-}
+}
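
Note (not part of the diff): the new EnvironmentVariablesUtils.validateEnvironmentVariables(envVariables, script) call is not shown here. A minimal sketch of what such a check could look like, assuming scripts reference environment variables as ${VAR_NAME} placeholders; the class body, its name, and the pattern are assumptions, not the project's actual implementation:

  package com.datasqrl;

  import java.util.Map;
  import java.util.regex.Matcher;
  import java.util.regex.Pattern;

  /**
   * Illustrative sketch only; the real EnvironmentVariablesUtils is not part of this diff.
   * Assumes SQL scripts reference environment variables as ${VAR_NAME} placeholders.
   */
  final class EnvironmentVariablesUtilsSketch {

    private static final Pattern ENV_VAR = Pattern.compile("\\$\\{(\\w+)}");

    private EnvironmentVariablesUtilsSketch() {}

    static void validateEnvironmentVariables(Map<String, String> envVariables, String script) {
      // Scan the script for ${VAR} placeholders and fail fast if any are unset.
      Matcher matcher = ENV_VAR.matcher(script);
      while (matcher.find()) {
        String name = matcher.group(1);
        if (!envVariables.containsKey(name)) {
          throw new IllegalStateException("Missing environment variable: " + name);
        }
      }
    }
  }

Validating before the script reaches Flink surfaces a missing variable as a clear startup error rather than a later planner or runtime failure.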