Skip to content

Commit f5f1503

Browse files
committed
wip
1 parent 89cddfb commit f5f1503

File tree

10 files changed

+749
-15
lines changed

10 files changed

+749
-15
lines changed

pom.xml

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,14 @@
202202
<artifactId>flink-s3-fs-hadoop</artifactId>
203203
<version>${flink.version}</version>
204204
</dependency>
205+
<dependency>
206+
<groupId>org.apache.commons</groupId>
207+
<artifactId>commons-exec</artifactId>
208+
<version>1.4.0</version>
209+
<scope>test</scope>
210+
</dependency>
205211
</dependencies>
212+
206213
<repositories>
207214
<repository>
208215
<id>central</id>
@@ -226,11 +233,6 @@
226233

227234
<build>
228235
<resources>
229-
<resource>
230-
<targetPath>${project.basedir}/target</targetPath>
231-
<filtering>true</filtering>
232-
<directory>src/test/docker</directory>
233-
</resource>
234236
<resource>
235237
<directory>src/main/resources</directory>
236238
</resource>

src/test/docker/docker-compose.yml

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,17 @@ services:
3838
- datasqrl_network
3939

4040
jobmanager:
41-
user: 1000:1000
4241
image: flink:${flink-base-image}
4342
environment:
4443
- JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl
4544
- JDBC_USERNAME=postgres
4645
- JDBC_PASSWORD=postgres
47-
- PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
48-
- PROPERTIES_GROUP_ID=mygroupid
4946
- |
5047
FLINK_PROPERTIES=
5148
taskmanager.slot.timeout: 30000ms
5249
jobmanager.rpc.address: jobmanager
50+
state.checkpoint-storage: filesystem
51+
state.savepoints.dir: file:///tmp/flink-jar-runner
5352
command: jobmanager
5453
ports:
5554
- "8081:8081" # Flink JobManager REST port
@@ -62,27 +61,32 @@ services:
6261
- flink_conf:/opt/flink/conf
6362
- ./:/opt/flink/usrlib/
6463
- ./datasources/:/datasources/
64+
- /tmp:/tmp
6565
healthcheck:
6666
test: ["CMD-SHELL", "curl -f http://localhost:8081/ || exit 1"]
6767
interval: 1s
6868
timeout: 1s
6969
retries: 50
70+
restart: always
71+
deploy:
72+
resources:
73+
limits:
74+
memory: 2g
7075
networks:
7176
- datasqrl_network
7277

7378
taskmanager:
74-
user: 1000:1000
7579
image: flink:${flink-base-image}
7680
environment:
7781
- JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl
7882
- JDBC_USERNAME=postgres
7983
- JDBC_PASSWORD=postgres
80-
- PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
81-
- PROPERTIES_GROUP_ID=mygroupid
8284
- |
8385
FLINK_PROPERTIES=
8486
taskmanager.slot.timeout: 30000ms
8587
jobmanager.rpc.address: jobmanager
88+
state.checkpoint-storage: filesystem
89+
state.savepoints.dir: file:///tmp/flink-jar-runner
8690
command: taskmanager
8791
depends_on:
8892
jobmanager:
@@ -91,6 +95,12 @@ services:
9195
- flink_conf:/opt/flink/conf
9296
- ./:/opt/flink/usrlib/
9397
- ./datasources/:/datasources/
98+
- /tmp:/tmp
99+
restart: always
100+
deploy:
101+
resources:
102+
limits:
103+
memory: 2g
94104
networks:
95105
- datasqrl_network
96106

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl;
17+
18+
import java.io.ByteArrayOutputStream;
19+
import java.io.IOException;
20+
import java.nio.file.Path;
21+
import java.time.Duration;
22+
import lombok.experimental.UtilityClass;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.commons.exec.CommandLine;
25+
import org.apache.commons.exec.DefaultExecutor;
26+
import org.apache.commons.exec.ExecuteException;
27+
import org.apache.commons.exec.ExecuteWatchdog;
28+
import org.apache.commons.exec.PumpStreamHandler;
29+
30+
@Slf4j
31+
@UtilityClass
32+
public class CommandLineUtil {
33+
34+
public static String execute(Path workDir, String command) throws ExecuteException {
35+
command = command.replaceAll("\\s+", " ").trim();
36+
37+
log.info("Executing command: {}", command);
38+
39+
var cmdLine = CommandLine.parse(command);
40+
41+
var output = new ByteArrayOutputStream();
42+
var streamHandler = new PumpStreamHandler(output);
43+
44+
var executor =
45+
DefaultExecutor.builder()
46+
.setWorkingDirectory(workDir.toFile())
47+
.setExecuteStreamHandler(streamHandler)
48+
.get();
49+
executor.setExitValue(0);
50+
51+
var watchdog = ExecuteWatchdog.builder().setTimeout(Duration.ofMinutes(5)).get();
52+
executor.setWatchdog(watchdog);
53+
54+
try {
55+
var exitValue =
56+
// This call is synchronous and will block until the command completes
57+
executor.execute(cmdLine);
58+
log.info("Installation completed successfully with exit code: {}", exitValue);
59+
60+
return new String(output.toByteArray());
61+
} catch (IOException e) {
62+
var result = new String(output.toByteArray());
63+
log.error("Error while executing command:\n{}\noutput:\n{}", command, result);
64+
if (e instanceof ExecuteException) {
65+
ExecuteException ee = (ExecuteException) e;
66+
throw new ExecuteException(result, ee.getExitValue(), ee);
67+
}
68+
throw new RuntimeException(e);
69+
}
70+
}
71+
}

src/test/java/com/datasqrl/FlinkMainIT.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,36 @@
2525
import com.nextbreakpoint.flink.client.model.TerminationMode;
2626
import com.nextbreakpoint.flink.client.model.UploadStatus;
2727
import java.io.File;
28+
import java.nio.file.Files;
29+
import java.nio.file.Path;
2830
import java.util.ArrayList;
2931
import java.util.Arrays;
3032
import java.util.List;
33+
import java.util.concurrent.TimeUnit;
3134
import java.util.stream.Collectors;
3235
import java.util.stream.Stream;
3336
import lombok.SneakyThrows;
3437
import org.apache.flink.shaded.curator5.com.google.common.base.Objects;
3538
import org.apache.flink.shaded.curator5.com.google.common.collect.Lists;
39+
import org.junit.jupiter.api.AfterEach;
40+
import org.junit.jupiter.api.BeforeEach;
41+
import org.junit.jupiter.api.Test;
3642
import org.junit.jupiter.params.ParameterizedTest;
3743
import org.junit.jupiter.params.provider.Arguments;
3844
import org.junit.jupiter.params.provider.CsvSource;
3945
import org.junit.jupiter.params.provider.MethodSource;
4046

4147
class FlinkMainIT extends AbstractITSupport {
4248

49+
@BeforeEach
50+
@AfterEach
51+
void terminateJobs() throws Exception {
52+
var jobs = client.getJobsOverview();
53+
jobs.getJobs().stream()
54+
.filter(job -> Objects.equal(job.getState(), JobStatus.RUNNING))
55+
.forEach(job -> stopJobs(job.getJid()));
56+
}
57+
4358
static Stream<Arguments> sqlScripts() {
4459
var scripts = List.of("flink.sql", "test_sql.sql");
4560
var config = List.of(true, false);
@@ -82,6 +97,34 @@ void givenPlansScript_whenExecuting_thenSuccess(String filename, boolean config)
8297
execute(args.toArray(String[]::new));
8398
}
8499

100+
@Test
101+
void givenPlanScript_whenCheckPoint_thenResumeSuccessfulySuccess() throws Exception {
102+
String planFile = "/opt/flink/usrlib/sqrl/compiled-plan.json";
103+
var args = new ArrayList<String>();
104+
args.add("--planfile");
105+
args.add(planFile);
106+
args.add("--config-dir");
107+
args.add("/opt/flink/usrlib/config/");
108+
var jobResponse = execute(args.toArray(String[]::new));
109+
110+
TimeUnit.SECONDS.sleep(1);
111+
112+
Path savePoint = Path.of("/tmp/flink-jar-runner/" + System.currentTimeMillis());
113+
114+
CommandLineUtil.execute(
115+
Path.of("."),
116+
"docker exec -t flink-jar-runner-jobmanager-1 bin/flink stop "
117+
+ jobResponse.getJobid()
118+
+ " --savepointPath "
119+
+ savePoint);
120+
121+
assertThat(savePoint).isNotEmptyDirectory();
122+
123+
try (Stream<Path> filePathStream = Files.walk(savePoint)) {
124+
filePathStream.filter(Files::isRegularFile).forEach(System.out::println);
125+
}
126+
}
127+
85128
@SneakyThrows
86129
JarRunResponseBody execute(String... arguments) {
87130
var jarFile = new File("target/flink-jar-runner.uber.jar");
@@ -91,7 +134,7 @@ JarRunResponseBody execute(String... arguments) {
91134
assertThat(uploadResponse.getStatus()).isEqualTo(UploadStatus.SUCCESS);
92135

93136
// Step 2: Extract jarId from the response
94-
String jarId =
137+
var jarId =
95138
uploadResponse.getFilename().substring(uploadResponse.getFilename().lastIndexOf("/") + 1);
96139

97140
// Step 3: Submit the job
@@ -108,17 +151,20 @@ JarRunResponseBody execute(String... arguments) {
108151
String jobId = jobResponse.getJobid();
109152
assertThat(jobId).isNotNull();
110153

111-
SECONDS.sleep(10);
154+
SECONDS.sleep(2);
112155

156+
return jobResponse;
157+
}
158+
159+
@SneakyThrows
160+
private void stopJobs(String jobId) {
113161
var status = client.getJobStatusInfo(jobId);
114162
if (Objects.equal(status.getStatus(), JobStatus.RUNNING)) {
115163
client.cancelJob(jobId, TerminationMode.CANCEL);
116164
} else {
117165
JobExceptionsInfoWithHistory exceptions = client.getJobExceptions(jobId, 5, null);
118166
fail(exceptions.toString());
119167
}
120-
121-
return jobResponse;
122168
}
123169

124170
@ParameterizedTest(name = "{0}")

0 commit comments

Comments
 (0)