Skip to content

Commit 097451b

Browse files
committed
Add UDF support to jar submitter
Signed-off-by: Daniel Henneberger <git@danielhenneberger.com>
1 parent a3ed7b7 commit 097451b

File tree

8 files changed

+575
-13
lines changed

8 files changed

+575
-13
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
<groupId>org.apache.flink</groupId>
3737
<artifactId>flink-table-planner_2.12</artifactId>
3838
<version>${flink.version}</version>
39-
<scope>test</scope>
39+
<scope>provided</scope>
4040
</dependency>
4141
<dependency>
4242
<groupId>org.apache.flink</groupId>
@@ -93,7 +93,7 @@
9393
<groupId>org.apache.flink</groupId>
9494
<artifactId>flink-table-api-java-bridge</artifactId>
9595
<version>1.19.1</version>
96-
<scope>compile</scope>
96+
<scope>provided</scope>
9797
</dependency>
9898
<dependency>
9999
<groupId>org.projectlombok</groupId>

src/main/java/com/datasqrl/SqlRunner.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@
2222
import org.apache.flink.configuration.GlobalConfiguration;
2323
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
2424
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
25+
import org.apache.flink.table.api.CompiledPlan;
2526
import org.apache.flink.table.api.PlanReference;
2627
import org.apache.flink.table.api.TableEnvironment;
2728
import org.apache.flink.table.api.EnvironmentSettings;
2829
import org.apache.flink.table.api.TableResult;
2930
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
31+
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
32+
import org.apache.flink.table.operations.StatementSetOperation;
3033
import org.apache.flink.util.FileUtils;
3134
import picocli.CommandLine;
3235
import picocli.CommandLine.*;
@@ -136,24 +139,23 @@ class SqlExecutor {
136139
public SqlExecutor(Configuration configuration, String udfPath) {
137140
StreamExecutionEnvironment sEnv;
138141
try {
139-
sEnv = new StreamExecutionEnvironment(configuration);
142+
sEnv = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
140143
} catch (Exception e) {
141144
throw e;
142145
}
143146

144147
EnvironmentSettings tEnvConfig = EnvironmentSettings.newInstance()
145148
.withConfiguration(configuration)
146-
// .withClassLoader(udfClassLoader)
147149
.build();
148150

149151
this.tableEnv = StreamTableEnvironment.create(sEnv, tEnvConfig);
150152

151153
// Apply configuration settings
152154
tableEnv.getConfig().addConfiguration(configuration);
153155

154-
// if (udfPath != null) {
155-
// setupUdfPath(udfPath);
156-
// }
156+
if (udfPath != null) {
157+
setupUdfPath(udfPath);
158+
}
157159
}
158160

159161
/**
@@ -176,7 +178,8 @@ public TableResult executeScript(String script) throws Exception {
176178
// .parse(statements.get(statements.size()-1)).get(0);
177179
//
178180
// CompiledPlan plan = tEnv1.compilePlan(parse.getOperations());
179-
// plan.writeToFile("/Users/henneberger/flink-jar-runner/src/test/resources/sql/compiled-plan.json");
181+
// plan.writeToFile("/Users/henneberger/flink-jar-runner/src/test/resources/sql/compiled-plan-udf.json");
182+
180183
return tableResult;
181184
}
182185

@@ -198,6 +201,7 @@ private TableResult executeStatement(String statement) {
198201
tableEnv.getConfig().getConfiguration().setString(key, value);
199202
log.info("Set configuration: {} = {}", key, value);
200203
} else {
204+
System.out.println(statement);
201205
log.info("Executing statement:\n{}", statement);
202206
tableResult = tableEnv.executeSql(statement);
203207
}

src/test/java/com/datasqrl/SqlRunnerTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@
1616
class SqlRunnerTest {
1717

1818
private File sqlFile;
19+
private File sqlUdfFile;
1920
private File planFile;
2021
private File configDir;
2122
private String udfPath;
2223

2324
@BeforeEach
2425
void setUp() throws URISyntaxException {
2526
sqlFile = new File(Resources.getResource("sql/test_sql.sql").toURI());
27+
sqlUdfFile = new File(Resources.getResource("sql/test_udf_sql.sql").toURI());
2628
planFile = new File(Resources.getResource("plans/test_plan.json").toURI());
2729
configDir = new File(Resources.getResource("config").toURI());
2830

2931
// Set UDF path to the 'udfs' directory in resources
30-
// udfPath = new File(classLoader.getResource("udfs").toURI()).getAbsolutePath();
32+
udfPath = new File(Resources.getResource("udfs").toURI()).getAbsolutePath();
3133
}
3234

3335
@Test
@@ -60,4 +62,20 @@ void testCompiledPlan() {
6062
// Assert success exit code (0 indicates the script executed successfully)
6163
assertEquals(0, exitCode);
6264
}
65+
66+
@Test
67+
void testUdf() {
68+
String[] args = {
69+
"--configfile", configDir.getAbsolutePath(),
70+
"-s", sqlUdfFile.getAbsolutePath(),
71+
"--udfpath", udfPath,
72+
"--block"
73+
};
74+
75+
CommandLine cmd = new CommandLine(new SqlRunner());
76+
int exitCode = cmd.execute(args);
77+
78+
// Assert success exit code (0 indicates the UDF script executed successfully)
79+
assertEquals(0, exitCode);
80+
}
6381
}
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
{
2+
"flinkVersion" : "1.19",
3+
"nodes" : [ {
4+
"id" : 6,
5+
"type" : "stream-exec-table-source-scan_1",
6+
"scanTableSource" : {
7+
"table" : {
8+
"identifier" : "`default_catalog`.`default_database`.`orders`",
9+
"resolvedTable" : {
10+
"schema" : {
11+
"columns" : [ {
12+
"name" : "order_id",
13+
"dataType" : "BIGINT"
14+
}, {
15+
"name" : "customer_id",
16+
"dataType" : "BIGINT"
17+
}, {
18+
"name" : "order_amount",
19+
"dataType" : "DECIMAL(10, 2)"
20+
}, {
21+
"name" : "order_status",
22+
"dataType" : "VARCHAR(2147483647)"
23+
}, {
24+
"name" : "order_time",
25+
"dataType" : "TIMESTAMP(3)"
26+
} ],
27+
"watermarkSpecs" : [ ]
28+
},
29+
"partitionKeys" : [ ],
30+
"options" : {
31+
"connector" : "datagen",
32+
"fields.customer_id.max" : "1000",
33+
"fields.customer_id.min" : "1",
34+
"fields.order_amount.max" : "500.00",
35+
"fields.order_amount.min" : "10.00",
36+
"fields.order_id.end" : "10000",
37+
"fields.order_id.kind" : "sequence",
38+
"fields.order_id.start" : "1",
39+
"fields.order_status.length" : "10",
40+
"number-of-rows" : "100"
41+
}
42+
}
43+
}
44+
},
45+
"outputType" : "ROW<`order_id` BIGINT, `customer_id` BIGINT, `order_amount` DECIMAL(10, 2), `order_status` VARCHAR(2147483647), `order_time` TIMESTAMP(3)>",
46+
"description" : "TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[order_id, customer_id, order_amount, order_status, order_time])",
47+
"inputProperties" : [ ]
48+
}, {
49+
"id" : 7,
50+
"type" : "stream-exec-calc_1",
51+
"projection" : [ {
52+
"kind" : "INPUT_REF",
53+
"inputIndex" : 0,
54+
"type" : "BIGINT"
55+
}, {
56+
"kind" : "CALL",
57+
"catalogName" : "`default_catalog`.`default_database`.`MYSCALARFUNCTION`",
58+
"class" : "com.myudf.MyScalarFunction",
59+
"operands" : [ {
60+
"kind" : "INPUT_REF",
61+
"inputIndex" : 1,
62+
"type" : "BIGINT"
63+
}, {
64+
"kind" : "INPUT_REF",
65+
"inputIndex" : 1,
66+
"type" : "BIGINT"
67+
} ],
68+
"type" : "BIGINT"
69+
}, {
70+
"kind" : "INPUT_REF",
71+
"inputIndex" : 2,
72+
"type" : "DECIMAL(10, 2)"
73+
}, {
74+
"kind" : "INPUT_REF",
75+
"inputIndex" : 3,
76+
"type" : "VARCHAR(2147483647)"
77+
}, {
78+
"kind" : "INPUT_REF",
79+
"inputIndex" : 4,
80+
"type" : "TIMESTAMP(3)"
81+
} ],
82+
"condition" : null,
83+
"inputProperties" : [ {
84+
"requiredDistribution" : {
85+
"type" : "UNKNOWN"
86+
},
87+
"damBehavior" : "PIPELINED",
88+
"priority" : 0
89+
} ],
90+
"outputType" : "ROW<`order_id` BIGINT, `EXPR$1` BIGINT, `order_amount` DECIMAL(10, 2), `order_status` VARCHAR(2147483647), `order_time` TIMESTAMP(3)>",
91+
"description" : "Calc(select=[order_id, MYSCALARFUNCTION(customer_id, customer_id) AS EXPR$1, order_amount, order_status, order_time])"
92+
}, {
93+
"id" : 8,
94+
"type" : "stream-exec-sink_1",
95+
"configuration" : {
96+
"table.exec.sink.keyed-shuffle" : "AUTO",
97+
"table.exec.sink.not-null-enforcer" : "ERROR",
98+
"table.exec.sink.rowtime-inserter" : "ENABLED",
99+
"table.exec.sink.type-length-enforcer" : "IGNORE",
100+
"table.exec.sink.upsert-materialize" : "AUTO"
101+
},
102+
"dynamicTableSink" : {
103+
"table" : {
104+
"identifier" : "`default_catalog`.`default_database`.`blackhole_orders`",
105+
"resolvedTable" : {
106+
"schema" : {
107+
"columns" : [ {
108+
"name" : "order_id",
109+
"dataType" : "BIGINT"
110+
}, {
111+
"name" : "customer_id",
112+
"dataType" : "BIGINT"
113+
}, {
114+
"name" : "order_amount",
115+
"dataType" : "DECIMAL(10, 2)"
116+
}, {
117+
"name" : "order_status",
118+
"dataType" : "VARCHAR(2147483647)"
119+
}, {
120+
"name" : "order_time",
121+
"dataType" : "TIMESTAMP(3)"
122+
} ],
123+
"watermarkSpecs" : [ ]
124+
},
125+
"partitionKeys" : [ ],
126+
"options" : {
127+
"connector" : "blackhole"
128+
}
129+
}
130+
}
131+
},
132+
"inputChangelogMode" : [ "INSERT" ],
133+
"inputProperties" : [ {
134+
"requiredDistribution" : {
135+
"type" : "UNKNOWN"
136+
},
137+
"damBehavior" : "PIPELINED",
138+
"priority" : 0
139+
} ],
140+
"outputType" : "ROW<`order_id` BIGINT, `EXPR$1` BIGINT, `order_amount` DECIMAL(10, 2), `order_status` VARCHAR(2147483647), `order_time` TIMESTAMP(3)>",
141+
"description" : "Sink(table=[default_catalog.default_database.blackhole_orders], fields=[order_id, EXPR$1, order_amount, order_status, order_time])"
142+
}, {
143+
"id" : 9,
144+
"type" : "stream-exec-calc_1",
145+
"projection" : [ {
146+
"kind" : "INPUT_REF",
147+
"inputIndex" : 0,
148+
"type" : "BIGINT"
149+
}, {
150+
"kind" : "CALL",
151+
"catalogName" : "`default_catalog`.`default_database`.`MYSCALARFUNCTION`",
152+
"class" : "com.myudf.MyScalarFunction",
153+
"operands" : [ {
154+
"kind" : "INPUT_REF",
155+
"inputIndex" : 1,
156+
"type" : "BIGINT"
157+
}, {
158+
"kind" : "INPUT_REF",
159+
"inputIndex" : 1,
160+
"type" : "BIGINT"
161+
} ],
162+
"type" : "BIGINT"
163+
}, {
164+
"kind" : "INPUT_REF",
165+
"inputIndex" : 2,
166+
"type" : "DECIMAL(10, 2)"
167+
}, {
168+
"kind" : "INPUT_REF",
169+
"inputIndex" : 4,
170+
"type" : "TIMESTAMP(3)"
171+
} ],
172+
"condition" : {
173+
"kind" : "CALL",
174+
"syntax" : "BINARY",
175+
"internalName" : "$<$1",
176+
"operands" : [ {
177+
"kind" : "INPUT_REF",
178+
"inputIndex" : 0,
179+
"type" : "BIGINT"
180+
}, {
181+
"kind" : "LITERAL",
182+
"value" : 10,
183+
"type" : "INT NOT NULL"
184+
} ],
185+
"type" : "BOOLEAN"
186+
},
187+
"inputProperties" : [ {
188+
"requiredDistribution" : {
189+
"type" : "UNKNOWN"
190+
},
191+
"damBehavior" : "PIPELINED",
192+
"priority" : 0
193+
} ],
194+
"outputType" : "ROW<`order_id` BIGINT, `EXPR$1` BIGINT, `order_amount` DECIMAL(10, 2), `order_time` TIMESTAMP(3)>",
195+
"description" : "Calc(select=[order_id, MYSCALARFUNCTION(customer_id, customer_id) AS EXPR$1, order_amount, order_time], where=[(order_id < 10)])"
196+
}, {
197+
"id" : 10,
198+
"type" : "stream-exec-sink_1",
199+
"configuration" : {
200+
"table.exec.sink.keyed-shuffle" : "AUTO",
201+
"table.exec.sink.not-null-enforcer" : "ERROR",
202+
"table.exec.sink.rowtime-inserter" : "ENABLED",
203+
"table.exec.sink.type-length-enforcer" : "IGNORE",
204+
"table.exec.sink.upsert-materialize" : "AUTO"
205+
},
206+
"dynamicTableSink" : {
207+
"table" : {
208+
"identifier" : "`default_catalog`.`default_database`.`blackhole_myorders`",
209+
"resolvedTable" : {
210+
"schema" : {
211+
"columns" : [ {
212+
"name" : "order_id",
213+
"dataType" : "BIGINT"
214+
}, {
215+
"name" : "customer_id",
216+
"dataType" : "BIGINT"
217+
}, {
218+
"name" : "order_amount",
219+
"dataType" : "DECIMAL(10, 2)"
220+
}, {
221+
"name" : "order_time",
222+
"dataType" : "TIMESTAMP(3)"
223+
} ],
224+
"watermarkSpecs" : [ ]
225+
},
226+
"partitionKeys" : [ ],
227+
"options" : {
228+
"connector" : "print"
229+
}
230+
}
231+
}
232+
},
233+
"inputChangelogMode" : [ "INSERT" ],
234+
"inputProperties" : [ {
235+
"requiredDistribution" : {
236+
"type" : "UNKNOWN"
237+
},
238+
"damBehavior" : "PIPELINED",
239+
"priority" : 0
240+
} ],
241+
"outputType" : "ROW<`order_id` BIGINT, `EXPR$1` BIGINT, `order_amount` DECIMAL(10, 2), `order_time` TIMESTAMP(3)>",
242+
"description" : "Sink(table=[default_catalog.default_database.blackhole_myorders], fields=[order_id, EXPR$1, order_amount, order_time])"
243+
} ],
244+
"edges" : [ {
245+
"source" : 6,
246+
"target" : 7,
247+
"shuffle" : {
248+
"type" : "FORWARD"
249+
},
250+
"shuffleMode" : "PIPELINED"
251+
}, {
252+
"source" : 7,
253+
"target" : 8,
254+
"shuffle" : {
255+
"type" : "FORWARD"
256+
},
257+
"shuffleMode" : "PIPELINED"
258+
}, {
259+
"source" : 6,
260+
"target" : 9,
261+
"shuffle" : {
262+
"type" : "FORWARD"
263+
},
264+
"shuffleMode" : "PIPELINED"
265+
}, {
266+
"source" : 9,
267+
"target" : 10,
268+
"shuffle" : {
269+
"type" : "FORWARD"
270+
},
271+
"shuffleMode" : "PIPELINED"
272+
} ]
273+
}

0 commit comments

Comments
 (0)