Skip to content

Commit 11b25fa

Browse files
committed
Use a kafka-safe table in FlinkMainIT
Also added the custom classes to the uber JAR for proper classloading
1 parent 7af5ed9 commit 11b25fa

File tree

16 files changed

+320
-117
lines changed

16 files changed

+320
-117
lines changed

connectors/kafka-safe/pom.xml

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Copyright © 2024 DataSQRL (contact@datasqrl.com)
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
21+
<modelVersion>4.0.0</modelVersion>
22+
23+
<parent>
24+
<groupId>com.datasqrl.flinkrunner</groupId>
25+
<artifactId>connectors</artifactId>
26+
<version>1.0.0-SNAPSHOT</version>
27+
</parent>
28+
29+
<artifactId>kafka-safe</artifactId>
30+
31+
<dependencies>
32+
<dependency>
33+
<groupId>org.apache.flink</groupId>
34+
<artifactId>flink-core</artifactId>
35+
<version>${flink.version}</version>
36+
<scope>provided</scope>
37+
</dependency>
38+
39+
<dependency>
40+
<groupId>org.apache.flink</groupId>
41+
<artifactId>flink-streaming-java</artifactId>
42+
<version>${flink.version}</version>
43+
<scope>provided</scope>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>org.apache.flink</groupId>
48+
<artifactId>flink-table-api-java</artifactId>
49+
<version>${flink.version}</version>
50+
<scope>provided</scope>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>org.apache.flink</groupId>
55+
<artifactId>flink-table-api-java-bridge</artifactId>
56+
<version>${flink.version}</version>
57+
<scope>provided</scope>
58+
</dependency>
59+
60+
<dependency>
61+
<groupId>org.apache.flink</groupId>
62+
<artifactId>flink-connector-kafka</artifactId>
63+
<version>${kafka.version}</version>
64+
<scope>provided</scope>
65+
</dependency>
66+
67+
<dependency>
68+
<groupId>com.google.auto.service</groupId>
69+
<artifactId>auto-service</artifactId>
70+
</dependency>
71+
</dependencies>
72+
</project>

connectors/kafka/src/main/java/com/datasqrl/DeserFailureHandler.java renamed to connectors/kafka-safe/src/main/java/com/datasqrl/DeserFailureHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30-
public class DeserFailureHandler {
30+
public class DeserFailureHandler implements Serializable {
3131

3232
private static final Logger LOG = LoggerFactory.getLogger(DeserFailureHandler.class);
33+
private static final long serialVersionUID = 1L;
3334

3435
private final DeserFailureHandlerType handlerType;
3536
private final @Nullable DeserFailureProducer producer;

connectors/kafka/src/main/java/com/datasqrl/DeserFailureProducer.java renamed to connectors/kafka-safe/src/main/java/com/datasqrl/DeserFailureProducer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
class DeserFailureProducer implements Serializable {
3131

3232
private static final Logger LOG = LoggerFactory.getLogger(DeserFailureProducer.class);
33+
private static final long serialVersionUID = 1L;
3334

3435
private final String topic;
3536
private final Properties producerProps;

connectors/kafka/pom.xml

Lines changed: 0 additions & 74 deletions
This file was deleted.

connectors/pom.xml

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,29 @@
1616
limitations under the License.
1717
1818
-->
19-
<project xmlns="http://maven.apache.org/POM/4.0.0"
20-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
22-
23-
<modelVersion>4.0.0</modelVersion>
24-
25-
<parent>
26-
<groupId>com.datasqrl.flinkrunner</groupId>
27-
<artifactId>flink-sql-runner-parent</artifactId>
28-
<version>1.0.0-SNAPSHOT</version>
29-
</parent>
30-
31-
<artifactId>connectors</artifactId>
32-
<packaging>pom</packaging>
33-
34-
<modules>
35-
<module>kafka</module>
36-
</modules>
37-
38-
<dependencies>
39-
<dependency>
40-
<groupId>org.apache.flink</groupId>
41-
<artifactId>flink-connector-base</artifactId>
42-
<version>${flink.version}</version>
43-
<scope>provided</scope>
44-
</dependency>
45-
</dependencies>
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
21+
<modelVersion>4.0.0</modelVersion>
22+
23+
<parent>
24+
<groupId>com.datasqrl.flinkrunner</groupId>
25+
<artifactId>flink-sql-runner-parent</artifactId>
26+
<version>1.0.0-SNAPSHOT</version>
27+
</parent>
28+
29+
<artifactId>connectors</artifactId>
30+
<packaging>pom</packaging>
31+
32+
<modules>
33+
<module>kafka-safe</module>
34+
</modules>
35+
36+
<dependencies>
37+
<dependency>
38+
<groupId>org.apache.flink</groupId>
39+
<artifactId>flink-connector-base</artifactId>
40+
<version>${flink.version}</version>
41+
<scope>provided</scope>
42+
</dependency>
43+
</dependencies>
4644
</project>

flink-sql-runner/pom.xml

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -172,13 +172,12 @@
172172

173173
<dependency>
174174
<groupId>com.datasqrl.flinkrunner</groupId>
175-
<artifactId>system-functions-discovery</artifactId>
175+
<artifactId>kafka-safe</artifactId>
176176
<version>1.0.0-SNAPSHOT</version>
177-
<scope>provided</scope>
178177
</dependency>
179178
<dependency>
180179
<groupId>com.datasqrl.flinkrunner</groupId>
181-
<artifactId>kafka</artifactId>
180+
<artifactId>system-functions-discovery</artifactId>
182181
<version>1.0.0-SNAPSHOT</version>
183182
<scope>provided</scope>
184183
</dependency>
@@ -380,18 +379,6 @@
380379
<outputDirectory>${project.build.directory}</outputDirectory>
381380
</configuration>
382381
</execution>
383-
<execution>
384-
<id>copy-kafka-connector</id>
385-
<goals>
386-
<goal>copy-dependencies</goal>
387-
</goals>
388-
<phase>process-resources</phase>
389-
<configuration>
390-
<includeGroupIds>com.datasqrl.flinkrunner</includeGroupIds>
391-
<includeArtifactIds>kafka</includeArtifactIds>
392-
<outputDirectory>${project.build.directory}</outputDirectory>
393-
</configuration>
394-
</execution>
395382
</executions>
396383
</plugin>
397384
</plugins>

flink-sql-runner/src/main/docker/docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ services:
4545
- JDBC_URL=jdbc:postgresql://postgres:5432/datasqrl
4646
- JDBC_USERNAME=postgres
4747
- JDBC_PASSWORD=postgres
48+
- PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
49+
- PROPERTIES_GROUP_ID=mygroupid
4850
- |
4951
FLINK_PROPERTIES=
5052
taskmanager.slot.timeout: 30000ms

flink-sql-runner/src/test/java/com/datasqrl/FlinkMainIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,4 +296,10 @@ void givenUdfPlansScript_whenExecuting_thenSuccess() {
296296
String planFile = "/opt/flink/usrlib/plans/compiled-plan-udf.json";
297297
execute("--planfile", planFile, "--udfpath", "/opt/flink/usrlib/udfs/");
298298
}
299+
300+
@Test
301+
void givenKafkaPlanScript_whenExecuting_thenSuccess() {
302+
String planFile = "/opt/flink/usrlib/plans/kafka_plan.json";
303+
execute("--planfile", planFile);
304+
}
299305
}

0 commit comments

Comments
 (0)