Skip to content

Commit 1909806

Browse files
authored
Merge pull request #19 from DataSQRL/savepoint
Test savepoints on compiled plans
2 parents 89cddfb + c874e96 commit 1909806

File tree

10 files changed

+613
-13
lines changed

10 files changed

+613
-13
lines changed

pom.xml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,27 @@
147147
<scope>test</scope>
148148
</dependency>
149149

150+
<dependency>
151+
<groupId>org.apache.commons</groupId>
152+
<artifactId>commons-exec</artifactId>
153+
<version>1.4.0</version>
154+
<scope>test</scope>
155+
</dependency>
156+
157+
<!-- JDBI -->
158+
<dependency>
159+
<groupId>org.jdbi</groupId>
160+
<artifactId>jdbi3-core</artifactId>
161+
<version>3.45.1</version>
162+
<scope>test</scope>
163+
</dependency>
164+
<dependency>
165+
<groupId>org.jdbi</groupId>
166+
<artifactId>jdbi3-sqlobject</artifactId>
167+
<version>3.45.1</version>
168+
<scope>test</scope>
169+
</dependency>
170+
150171
<dependency>
151172
<groupId>org.apache.flink</groupId>
152173
<artifactId>flink-connector-kafka</artifactId>
@@ -203,6 +224,7 @@
203224
<version>${flink.version}</version>
204225
</dependency>
205226
</dependencies>
227+
206228
<repositories>
207229
<repository>
208230
<id>central</id>

src/test/docker/docker-compose.yml

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ networks:
2222

2323
services:
2424
postgres:
25-
image: postgres:16
25+
image: postgres:17
2626
environment:
2727
POSTGRES_USER: postgres
2828
POSTGRES_PASSWORD: postgres
2929
POSTGRES_DB: datasqrl
3030
ports:
3131
- "5432:5432"
32+
volumes:
33+
- ./sqrl/postgres-schema.sql:/docker-entrypoint-initdb.d/database-schema.sql:ro
3234
healthcheck:
3335
test: ["CMD-SHELL", "pg_isready -U postgres"]
3436
interval: 1s
@@ -38,18 +40,17 @@ services:
3840
- datasqrl_network
3941

4042
jobmanager:
41-
user: 1000:1000
4243
image: flink:${flink-base-image}
4344
environment:
4445
- JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl
4546
- JDBC_USERNAME=postgres
4647
- JDBC_PASSWORD=postgres
47-
- PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
48-
- PROPERTIES_GROUP_ID=mygroupid
4948
- |
5049
FLINK_PROPERTIES=
5150
taskmanager.slot.timeout: 30000ms
5251
jobmanager.rpc.address: jobmanager
52+
state.savepoints-storage: filesystem
53+
state.savepoints.dir: file:///tmp/flink-jar-runner
5354
command: jobmanager
5455
ports:
5556
- "8081:8081" # Flink JobManager REST port
@@ -62,27 +63,32 @@ services:
6263
- flink_conf:/opt/flink/conf
6364
- ./:/opt/flink/usrlib/
6465
- ./datasources/:/datasources/
66+
- /tmp:/tmp
6567
healthcheck:
6668
test: ["CMD-SHELL", "curl -f http://localhost:8081/ || exit 1"]
6769
interval: 1s
6870
timeout: 1s
6971
retries: 50
72+
restart: always
73+
deploy:
74+
resources:
75+
limits:
76+
memory: 2g
7077
networks:
7178
- datasqrl_network
7279

7380
taskmanager:
74-
user: 1000:1000
7581
image: flink:${flink-base-image}
7682
environment:
7783
- JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl
7884
- JDBC_USERNAME=postgres
7985
- JDBC_PASSWORD=postgres
80-
- PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
81-
- PROPERTIES_GROUP_ID=mygroupid
8286
- |
8387
FLINK_PROPERTIES=
8488
taskmanager.slot.timeout: 30000ms
8589
jobmanager.rpc.address: jobmanager
90+
state.savepoints-storage: filesystem
91+
state.savepoints.dir: file:///tmp/flink-jar-runner
8692
command: taskmanager
8793
depends_on:
8894
jobmanager:
@@ -91,6 +97,12 @@ services:
9197
- flink_conf:/opt/flink/conf
9298
- ./:/opt/flink/usrlib/
9399
- ./datasources/:/datasources/
100+
- /tmp:/tmp
101+
restart: always
102+
deploy:
103+
resources:
104+
limits:
105+
memory: 2g
94106
networks:
95107
- datasqrl_network
96108

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl;
17+
18+
import java.io.ByteArrayOutputStream;
19+
import java.io.IOException;
20+
import java.nio.file.Path;
21+
import java.time.Duration;
22+
import lombok.experimental.UtilityClass;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.commons.exec.CommandLine;
25+
import org.apache.commons.exec.DefaultExecutor;
26+
import org.apache.commons.exec.ExecuteException;
27+
import org.apache.commons.exec.ExecuteWatchdog;
28+
import org.apache.commons.exec.PumpStreamHandler;
29+
30+
@Slf4j
31+
@UtilityClass
32+
public class CommandLineUtil {
33+
34+
public static String execute(String command) throws ExecuteException {
35+
return execute(Path.of("."), command);
36+
}
37+
38+
public static String execute(Path workDir, String command) throws ExecuteException {
39+
command = command.replaceAll("\\s+", " ").trim();
40+
41+
log.info("Executing command: {}", command);
42+
43+
var cmdLine = CommandLine.parse(command);
44+
45+
var output = new ByteArrayOutputStream();
46+
var streamHandler = new PumpStreamHandler(output);
47+
48+
var executor =
49+
DefaultExecutor.builder()
50+
.setWorkingDirectory(workDir.toFile())
51+
.setExecuteStreamHandler(streamHandler)
52+
.get();
53+
executor.setExitValue(0);
54+
55+
var watchdog = ExecuteWatchdog.builder().setTimeout(Duration.ofMinutes(5)).get();
56+
executor.setWatchdog(watchdog);
57+
58+
try {
59+
var exitValue =
60+
// This call is synchronous and will block until the command completes
61+
executor.execute(cmdLine);
62+
log.info("Installation completed successfully with exit code: {}", exitValue);
63+
64+
return new String(output.toByteArray());
65+
} catch (IOException e) {
66+
var result = new String(output.toByteArray());
67+
log.error("Error while executing command:\n{}\noutput:\n{}", command, result);
68+
if (e instanceof ExecuteException) {
69+
ExecuteException ee = (ExecuteException) e;
70+
throw new ExecuteException(result, ee.getExitValue(), ee);
71+
}
72+
throw new RuntimeException(e);
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)