Skip to content

Commit c874e96

Browse files
committed
Test savepoints on compiled plans
Create new integration test that stops job with savepoint, changes compiled plan, restore jobs and check if changes took effect Signed-off-by: Marvin Froeder <marvin@datasqrl.com>
1 parent 89cddfb commit c874e96

File tree

10 files changed

+613
-13
lines changed

10 files changed

+613
-13
lines changed

pom.xml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,27 @@
147147
<scope>test</scope>
148148
</dependency>
149149

150+
<dependency>
151+
<groupId>org.apache.commons</groupId>
152+
<artifactId>commons-exec</artifactId>
153+
<version>1.4.0</version>
154+
<scope>test</scope>
155+
</dependency>
156+
157+
<!-- JDBI -->
158+
<dependency>
159+
<groupId>org.jdbi</groupId>
160+
<artifactId>jdbi3-core</artifactId>
161+
<version>3.45.1</version>
162+
<scope>test</scope>
163+
</dependency>
164+
<dependency>
165+
<groupId>org.jdbi</groupId>
166+
<artifactId>jdbi3-sqlobject</artifactId>
167+
<version>3.45.1</version>
168+
<scope>test</scope>
169+
</dependency>
170+
150171
<dependency>
151172
<groupId>org.apache.flink</groupId>
152173
<artifactId>flink-connector-kafka</artifactId>
@@ -203,6 +224,7 @@
203224
<version>${flink.version}</version>
204225
</dependency>
205226
</dependencies>
227+
206228
<repositories>
207229
<repository>
208230
<id>central</id>

src/test/docker/docker-compose.yml

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ networks:
2222

2323
services:
2424
postgres:
25-
image: postgres:16
25+
image: postgres:17
2626
environment:
2727
POSTGRES_USER: postgres
2828
POSTGRES_PASSWORD: postgres
2929
POSTGRES_DB: datasqrl
3030
ports:
3131
- "5432:5432"
32+
volumes:
33+
- ./sqrl/postgres-schema.sql:/docker-entrypoint-initdb.d/database-schema.sql:ro
3234
healthcheck:
3335
test: ["CMD-SHELL", "pg_isready -U postgres"]
3436
interval: 1s
@@ -38,18 +40,17 @@ services:
3840
- datasqrl_network
3941

4042
jobmanager:
41-
user: 1000:1000
4243
image: flink:${flink-base-image}
4344
environment:
4445
- JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl
4546
- JDBC_USERNAME=postgres
4647
- JDBC_PASSWORD=postgres
47-
- PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
48-
- PROPERTIES_GROUP_ID=mygroupid
4948
- |
5049
FLINK_PROPERTIES=
5150
taskmanager.slot.timeout: 30000ms
5251
jobmanager.rpc.address: jobmanager
52+
state.savepoints-storage: filesystem
53+
state.savepoints.dir: file:///tmp/flink-jar-runner
5354
command: jobmanager
5455
ports:
5556
- "8081:8081" # Flink JobManager REST port
@@ -62,27 +63,32 @@ services:
6263
- flink_conf:/opt/flink/conf
6364
- ./:/opt/flink/usrlib/
6465
- ./datasources/:/datasources/
66+
- /tmp:/tmp
6567
healthcheck:
6668
test: ["CMD-SHELL", "curl -f http://localhost:8081/ || exit 1"]
6769
interval: 1s
6870
timeout: 1s
6971
retries: 50
72+
restart: always
73+
deploy:
74+
resources:
75+
limits:
76+
memory: 2g
7077
networks:
7178
- datasqrl_network
7279

7380
taskmanager:
74-
user: 1000:1000
7581
image: flink:${flink-base-image}
7682
environment:
7783
- JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl
7884
- JDBC_USERNAME=postgres
7985
- JDBC_PASSWORD=postgres
80-
- PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
81-
- PROPERTIES_GROUP_ID=mygroupid
8286
- |
8387
FLINK_PROPERTIES=
8488
taskmanager.slot.timeout: 30000ms
8589
jobmanager.rpc.address: jobmanager
90+
state.savepoints-storage: filesystem
91+
state.savepoints.dir: file:///tmp/flink-jar-runner
8692
command: taskmanager
8793
depends_on:
8894
jobmanager:
@@ -91,6 +97,12 @@ services:
9197
- flink_conf:/opt/flink/conf
9298
- ./:/opt/flink/usrlib/
9399
- ./datasources/:/datasources/
100+
- /tmp:/tmp
101+
restart: always
102+
deploy:
103+
resources:
104+
limits:
105+
memory: 2g
94106
networks:
95107
- datasqrl_network
96108

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl;
17+
18+
import java.io.ByteArrayOutputStream;
19+
import java.io.IOException;
20+
import java.nio.file.Path;
21+
import java.time.Duration;
22+
import lombok.experimental.UtilityClass;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.commons.exec.CommandLine;
25+
import org.apache.commons.exec.DefaultExecutor;
26+
import org.apache.commons.exec.ExecuteException;
27+
import org.apache.commons.exec.ExecuteWatchdog;
28+
import org.apache.commons.exec.PumpStreamHandler;
29+
30+
@Slf4j
31+
@UtilityClass
32+
public class CommandLineUtil {
33+
34+
public static String execute(String command) throws ExecuteException {
35+
return execute(Path.of("."), command);
36+
}
37+
38+
public static String execute(Path workDir, String command) throws ExecuteException {
39+
command = command.replaceAll("\\s+", " ").trim();
40+
41+
log.info("Executing command: {}", command);
42+
43+
var cmdLine = CommandLine.parse(command);
44+
45+
var output = new ByteArrayOutputStream();
46+
var streamHandler = new PumpStreamHandler(output);
47+
48+
var executor =
49+
DefaultExecutor.builder()
50+
.setWorkingDirectory(workDir.toFile())
51+
.setExecuteStreamHandler(streamHandler)
52+
.get();
53+
executor.setExitValue(0);
54+
55+
var watchdog = ExecuteWatchdog.builder().setTimeout(Duration.ofMinutes(5)).get();
56+
executor.setWatchdog(watchdog);
57+
58+
try {
59+
var exitValue =
60+
// This call is synchronous and will block until the command completes
61+
executor.execute(cmdLine);
62+
log.info("Installation completed successfully with exit code: {}", exitValue);
63+
64+
return new String(output.toByteArray());
65+
} catch (IOException e) {
66+
var result = new String(output.toByteArray());
67+
log.error("Error while executing command:\n{}\noutput:\n{}", command, result);
68+
if (e instanceof ExecuteException) {
69+
ExecuteException ee = (ExecuteException) e;
70+
throw new ExecuteException(result, ee.getExitValue(), ee);
71+
}
72+
throw new RuntimeException(e);
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)