Skip to content

Commit 89cddfb

Browse files
authored
Merge pull request #18 from DataSQRL/upgrade_client_api
Bump flink client library version
2 parents 8b34802 + 1060e46 commit 89cddfb

File tree

5 files changed

+480
-110
lines changed

5 files changed

+480
-110
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@
135135

136136
<dependency>
137137
<groupId>com.nextbreakpoint</groupId>
138-
<artifactId>com.nextbreakpoint.flinkclient</artifactId>
139-
<version>1.0.4</version>
138+
<artifactId>com.nextbreakpoint.flink.client</artifactId>
139+
<version>1.1.4</version>
140140
<scope>test</scope>
141141
</dependency>
142142

src/test/docker/docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ services:
3838
- datasqrl_network
3939

4040
jobmanager:
41+
user: 1000:1000
4142
image: flink:${flink-base-image}
4243
environment:
4344
- JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl
@@ -70,6 +71,7 @@ services:
7071
- datasqrl_network
7172

7273
taskmanager:
74+
user: 1000:1000
7375
image: flink:${flink-base-image}
7476
environment:
7577
- JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl

src/test/java/com/datasqrl/AbstractITSupport.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import static java.util.concurrent.TimeUnit.SECONDS;
2020
import static org.awaitility.Awaitility.await;
2121

22-
import com.nextbreakpoint.flinkclient.api.ApiException;
23-
import com.nextbreakpoint.flinkclient.api.FlinkApi;
24-
import com.nextbreakpoint.flinkclient.model.JobIdsWithStatusOverview;
22+
import com.nextbreakpoint.flink.client.api.ApiException;
23+
import com.nextbreakpoint.flink.client.api.FlinkApi;
24+
import com.nextbreakpoint.flink.client.model.JobIdsWithStatusOverview;
25+
import com.nextbreakpoint.flink.client.model.TerminationMode;
26+
import java.time.Duration;
2527
import java.util.Optional;
2628
import java.util.concurrent.TimeUnit;
2729
import lombok.extern.slf4j.Slf4j;
@@ -38,9 +40,17 @@ static void waitServiceStart() throws ApiException {
3840
client.getApiClient().setBasePath(serverUrl());
3941

4042
int timeout = (int) TimeUnit.MINUTES.toMillis(2);
41-
client.getApiClient().setConnectTimeout(timeout);
42-
client.getApiClient().setReadTimeout(timeout);
43-
client.getApiClient().setWriteTimeout(timeout);
43+
client
44+
.getApiClient()
45+
.setHttpClient(
46+
client
47+
.getApiClient()
48+
.getHttpClient()
49+
.newBuilder()
50+
.connectTimeout(Duration.ofMinutes(2))
51+
.writeTimeout(Duration.ofMinutes(2))
52+
.readTimeout(Duration.ofMinutes(2))
53+
.build());
4454

4555
await()
4656
.atMost(100, SECONDS)
@@ -49,16 +59,16 @@ static void waitServiceStart() throws ApiException {
4959
.until(
5060
() -> {
5161
log.info("Awaiting for custody-api");
52-
return client.getJobs() != null;
62+
return client.getJobsOverview() != null;
5363
});
5464

55-
final JobIdsWithStatusOverview statusOverview = client.getJobs();
65+
final JobIdsWithStatusOverview statusOverview = client.getJobIdsWithStatusesOverview();
5666
statusOverview
5767
.getJobs()
5868
.forEach(
5969
jobIdWithStatus -> {
6070
try {
61-
client.terminateJob(jobIdWithStatus.getId(), "cancel");
71+
client.cancelJob(jobIdWithStatus.getId(), TerminationMode.CANCEL);
6272
} catch (ApiException ignored) {
6373
}
6474
});

src/test/java/com/datasqrl/FlinkMainIT.java

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,23 @@
1515
*/
1616
package com.datasqrl;
1717

18+
import static java.util.concurrent.TimeUnit.SECONDS;
1819
import static org.assertj.core.api.Assertions.assertThat;
19-
import static org.assertj.core.api.Assertions.assertThatNoException;
20+
import static org.junit.jupiter.api.Assertions.fail;
2021

21-
import com.nextbreakpoint.flinkclient.model.JarRunResponseBody;
22-
import com.nextbreakpoint.flinkclient.model.JarUploadResponseBody;
23-
import com.nextbreakpoint.flinkclient.model.JarUploadResponseBody.StatusEnum;
22+
import com.nextbreakpoint.flink.client.model.JarRunResponseBody;
23+
import com.nextbreakpoint.flink.client.model.JobExceptionsInfoWithHistory;
24+
import com.nextbreakpoint.flink.client.model.JobStatus;
25+
import com.nextbreakpoint.flink.client.model.TerminationMode;
26+
import com.nextbreakpoint.flink.client.model.UploadStatus;
2427
import java.io.File;
2528
import java.util.ArrayList;
2629
import java.util.Arrays;
2730
import java.util.List;
2831
import java.util.stream.Collectors;
2932
import java.util.stream.Stream;
3033
import lombok.SneakyThrows;
34+
import org.apache.flink.shaded.curator5.com.google.common.base.Objects;
3135
import org.apache.flink.shaded.curator5.com.google.common.collect.Lists;
3236
import org.junit.jupiter.params.ParameterizedTest;
3337
import org.junit.jupiter.params.provider.Arguments;
@@ -79,34 +83,42 @@ void givenPlansScript_whenExecuting_thenSuccess(String filename, boolean config)
7983
}
8084

8185
@SneakyThrows
82-
void execute(String... arguments) {
83-
File jarFile = new File("target/flink-jar-runner.uber.jar");
86+
JarRunResponseBody execute(String... arguments) {
87+
var jarFile = new File("target/flink-jar-runner.uber.jar");
8488

85-
JarUploadResponseBody uploadResponse = client.uploadJar(jarFile);
89+
var uploadResponse = client.uploadJar(jarFile);
8690

87-
assertThat(uploadResponse.getStatus()).isEqualTo(StatusEnum.SUCCESS);
91+
assertThat(uploadResponse.getStatus()).isEqualTo(UploadStatus.SUCCESS);
8892

8993
// Step 2: Extract jarId from the response
9094
String jarId =
9195
uploadResponse.getFilename().substring(uploadResponse.getFilename().lastIndexOf("/") + 1);
9296

9397
// Step 3: Submit the job
94-
assertThatNoException()
95-
.as("Running script %s", Arrays.toString(arguments))
96-
.isThrownBy(
97-
() -> {
98-
JarRunResponseBody jobResponse =
99-
client.runJar(
100-
jarId,
101-
null,
102-
null,
103-
null,
104-
Arrays.stream(arguments).collect(Collectors.joining(",")),
105-
null,
106-
1);
107-
String jobId = jobResponse.getJobid();
108-
assertThat(jobId).isNotNull();
109-
});
98+
var jobResponse =
99+
client.submitJobFromJar(
100+
jarId,
101+
null,
102+
null,
103+
null,
104+
null,
105+
Arrays.stream(arguments).collect(Collectors.joining(",")),
106+
null,
107+
1);
108+
String jobId = jobResponse.getJobid();
109+
assertThat(jobId).isNotNull();
110+
111+
SECONDS.sleep(10);
112+
113+
var status = client.getJobStatusInfo(jobId);
114+
if (Objects.equal(status.getStatus(), JobStatus.RUNNING)) {
115+
client.cancelJob(jobId, TerminationMode.CANCEL);
116+
} else {
117+
JobExceptionsInfoWithHistory exceptions = client.getJobExceptions(jobId, 5, null);
118+
fail(exceptions.toString());
119+
}
120+
121+
return jobResponse;
110122
}
111123

112124
@ParameterizedTest(name = "{0}")

0 commit comments

Comments
 (0)