Skip to content

Commit a6956b0

Browse files
authored
Merge pull request #68 from ferenc-csaky/safe-kafka-connectors
Add safe kafka connectors
2 parents 26ee92c + d3971ea commit a6956b0

File tree

28 files changed

+2957
-37
lines changed

28 files changed

+2957
-37
lines changed

.github/workflows/deploy.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ jobs:
2828
- name: Set up Docker Buildx
2929
uses: docker/setup-buildx-action@v2
3030

31-
- name: Set up JDK 11
31+
- name: Set up JDK
3232
uses: actions/setup-java@v3
3333
with:
3434
distribution: 'temurin'

.github/workflows/uber-jar.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020

2121
strategy:
2222
matrix:
23-
FLINK_PROFILE: [flink-1.19, flink-1.20]
23+
FLINK_PROFILE: [flink-1.19]
2424

2525
env:
2626
VERSION: ${{ github.event_name == 'release' && github.event.action == 'created' && github.ref_name || '1.0.0-SNAPSHOT' }}
@@ -32,7 +32,7 @@ jobs:
3232
- name: Set up Docker Buildx
3333
uses: docker/setup-buildx-action@v2
3434

35-
- name: Set up JDK 11
35+
- name: Set up JDK
3636
uses: actions/setup-java@v3
3737
with:
3838
distribution: 'temurin'
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

    Copyright © 2024 DataSQRL (contact@datasqrl.com)

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>com.datasqrl.flinkrunner</groupId>
    <artifactId>connectors</artifactId>
    <version>1.0.0-SNAPSHOT</version>
  </parent>

  <artifactId>kafka-safe-connector</artifactId>
  <description>Kafka connector with deserialization error handling</description>

  <dependencies>
    <!-- Flink APIs are provided by the runtime; never shade them into the connector jar. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <!-- NOTE(review): the Kafka connector is versioned independently of Flink,
         hence ${kafka.version} rather than ${flink.version} — property is assumed
         to be defined in the parent POM. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${kafka.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.auto.service</groupId>
      <artifactId>auto-service</artifactId>
    </dependency>

    <!-- Test-only dependencies. -->
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.assertj</groupId>
      <artifactId>assertj-core</artifactId>
      <version>${assertj.version}</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>${mockito.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
/*
 * Copyright © 2024 DataSQRL (contact@datasqrl.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datasqrl;

import static com.datasqrl.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_HANDLER;
import static com.datasqrl.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_TOPIC;

import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a deserialization attempt and reacts to {@link IOException}s according to the configured
 * {@link DeserFailureHandlerType}: rethrow ({@code NONE}), log and continue ({@code LOG}), or log
 * and forward the raw record to a dead-letter Kafka topic ({@code KAFKA}).
 *
 * <p>Non-{@link IOException} failures are intentionally not handled and propagate to the caller.
 */
public class DeserFailureHandler implements Serializable {

  private static final Logger LOG = LoggerFactory.getLogger(DeserFailureHandler.class);
  private static final long serialVersionUID = 1L;

  private final DeserFailureHandlerType handlerType;
  // Only non-null when handlerType == KAFKA (see #of).
  private final @Nullable DeserFailureProducer producer;

  DeserFailureHandler(
      DeserFailureHandlerType handlerType, @Nullable DeserFailureProducer producer) {
    this.handlerType = handlerType;
    this.producer = producer;
  }

  /**
   * Builds a handler from table options.
   *
   * @param tableOptions source table configuration, read for the handler type and topic
   * @param consumerProps Kafka consumer properties, reused to configure the dead-letter producer
   * @return a handler whose producer is created only for the {@code KAFKA} strategy
   */
  public static DeserFailureHandler of(ReadableConfig tableOptions, Properties consumerProps) {
    DeserFailureHandlerType type = tableOptions.get(SCAN_DESER_FAILURE_HANDLER);

    DeserFailureProducer deadLetterProducer = null;
    if (type == DeserFailureHandlerType.KAFKA) {
      deadLetterProducer =
          new DeserFailureProducer(tableOptions.get(SCAN_DESER_FAILURE_TOPIC), consumerProps);
    }

    return new DeserFailureHandler(type, deadLetterProducer);
  }

  /**
   * Invokes {@code deser} and applies the configured failure strategy if it throws an
   * {@link IOException}.
   *
   * @param record the raw Kafka record being deserialized, used for diagnostics and dead-lettering
   * @param deser the deserialization attempt to guard
   * @throws Exception the original failure when the strategy is {@code NONE}, or any
   *     non-{@link IOException} thrown by {@code deser}
   */
  public void deserWithFailureHandling(
      ConsumerRecord<byte[], byte[]> record, DeserializationCaller deser) throws Exception {

    try {
      deser.call();
    } catch (IOException e) {
      switch (handlerType) {
        case NONE:
          throw e;

        case LOG:
          LOG.warn(
              "Deserialization failure occurred. Topic: {}, Partition: {}, Offset: {}",
              record.topic(),
              record.partition(),
              record.offset());
          break;

        case KAFKA:
          LOG.warn(
              "Deserialization failure occurred, sending the record to the configured topic ({}). Topic: {}, Partition: {}, Offset: {}",
              producer.getTopic(),
              record.topic(),
              record.partition(),
              record.offset());
          producer.send(record);
          break;
      }

      LOG.debug("Failure cause", e);
      LOG.trace("Failed record: {}", record);
    }
  }

  /** A single deserialization attempt; implementations typically capture the record to decode. */
  public interface DeserializationCaller extends Serializable {
    void call() throws Exception;
  }
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl;
17+
18+
import org.apache.flink.configuration.ConfigOption;
19+
import org.apache.flink.configuration.ConfigOptions;
20+
import org.apache.flink.configuration.ReadableConfig;
21+
import org.apache.flink.table.api.ValidationException;
22+
import org.apache.flink.util.StringUtils;
23+
24+
public class DeserFailureHandlerOptions {
25+
26+
public static final ConfigOption<DeserFailureHandlerType> SCAN_DESER_FAILURE_HANDLER =
27+
ConfigOptions.key("scan.deser-failure.handler")
28+
.enumType(DeserFailureHandlerType.class)
29+
.defaultValue(DeserFailureHandlerType.NONE);
30+
31+
public static final ConfigOption<String> SCAN_DESER_FAILURE_TOPIC =
32+
ConfigOptions.key("scan.deser-failure.topic").stringType().noDefaultValue();
33+
34+
public static void validateDeserFailureHandlerOptions(ReadableConfig tableOptions) {
35+
var handler = tableOptions.get(SCAN_DESER_FAILURE_HANDLER);
36+
var topic = tableOptions.get(SCAN_DESER_FAILURE_TOPIC);
37+
38+
if (handler == DeserFailureHandlerType.KAFKA && StringUtils.isNullOrWhitespaceOnly(topic)) {
39+
throw new ValidationException(
40+
String.format(
41+
"'%s' is set to '%s', but '%s' is not specified.",
42+
SCAN_DESER_FAILURE_HANDLER.key(),
43+
DeserFailureHandlerType.KAFKA,
44+
SCAN_DESER_FAILURE_TOPIC.key()));
45+
}
46+
47+
if (handler != DeserFailureHandlerType.KAFKA && !StringUtils.isNullOrWhitespaceOnly(topic)) {
48+
throw new ValidationException(
49+
String.format(
50+
"'%s' is not set to '%s', but '%s' is specified.",
51+
SCAN_DESER_FAILURE_HANDLER.key(),
52+
DeserFailureHandlerType.KAFKA,
53+
SCAN_DESER_FAILURE_TOPIC.key()));
54+
}
55+
}
56+
57+
private DeserFailureHandlerOptions() {}
58+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl;
17+
18+
public enum DeserFailureHandlerType {
19+
/**
20+
* No deserialization failure handling is applied. In case of a problematic record, the
21+
* application will fail. This is the default setting.
22+
*/
23+
NONE,
24+
25+
/**
26+
* In case of a problematic record, helpful information will be logged. The application continues
27+
* the execution.
28+
*/
29+
LOG,
30+
31+
/**
32+
* In case of a problematic record, helpful information will be logged, and the record will be
33+
* sent to a configured Kafka topic as well. The application continues the execution.
34+
*/
35+
KAFKA
36+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
/*
 * Copyright © 2024 DataSQRL (contact@datasqrl.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datasqrl;

import static org.apache.flink.util.Preconditions.checkNotNull;

import java.io.Serializable;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Forwards records that failed deserialization to a dead-letter Kafka topic, preserving the
 * original key, value, and headers.
 *
 * <p>The {@link KafkaProducer} is not serializable, so it is held {@code transient} and created
 * lazily on first {@link #send}. Callers should invoke {@link #close()} when done so buffered
 * dead-letter records are flushed and the producer's resources are released.
 */
class DeserFailureProducer implements Serializable, AutoCloseable {

  private static final Logger LOG = LoggerFactory.getLogger(DeserFailureProducer.class);
  private static final long serialVersionUID = 1L;

  private final String topic;
  private final Properties producerProps;

  // Lazily (re)created after deserialization; KafkaProducer itself is not serializable.
  private transient KafkaProducer<byte[], byte[]> producer;

  /**
   * @param topic dead-letter topic name; must not be null
   * @param consumerProps consumer properties reused for the producer (e.g. bootstrap servers);
   *     serializers are overridden to raw byte arrays
   */
  DeserFailureProducer(String topic, Properties consumerProps) {
    this.topic = checkNotNull(topic);

    producerProps = new Properties();
    producerProps.putAll(consumerProps);
    producerProps.setProperty(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.setProperty(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  }

  private void init() {
    producer = new KafkaProducer<>(producerProps);
  }

  /** Sends the failed record to the dead-letter topic; a null record is logged and dropped. */
  void send(ConsumerRecord<byte[], byte[]> record) {
    if (producer == null) {
      init();
    }

    if (record == null) {
      LOG.info("Unable to send deserialization failed record: Record was null.");
    } else {
      // Partition and timestamp are left null so the producer assigns them.
      producer.send(
          new ProducerRecord<>(topic, null, null, record.key(), record.value(), record.headers()));
    }
  }

  /**
   * Closes the underlying producer, flushing any buffered dead-letter records. Without this,
   * records buffered by {@link KafkaProducer} could be silently lost on shutdown. Safe to call
   * multiple times or before the producer was ever created.
   */
  @Override
  public void close() {
    if (producer != null) {
      producer.close();
      producer = null;
    }
  }

  public String getTopic() {
    return topic;
  }
}

0 commit comments

Comments (0)