Skip to content

Commit 364fec9

Browse files
committed
Add kafka-safe and upsert-kafka-safe connector implementations
1 parent e566543 commit 364fec9

23 files changed

+2850
-42
lines changed

connectors/kafka-safe/pom.xml

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!--

 Copyright © 2024 DataSQRL (contact@datasqrl.com)

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.

-->
<!-- Module descriptor for the kafka-safe connector: a Kafka source wrapper that
     adds configurable deserialization-failure handling (see DeserFailureHandler). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>com.datasqrl.flinkrunner</groupId>
    <artifactId>connectors</artifactId>
    <version>1.0.0-SNAPSHOT</version>
  </parent>

  <artifactId>kafka-safe</artifactId>

  <dependencies>
    <!-- Flink APIs: provided scope — supplied by the Flink runtime at deployment. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <!-- NOTE(review): the Flink Kafka connector version is driven by ${kafka.version};
         confirm this property tracks the flink-connector-kafka release line, not the
         Kafka broker/client version. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${kafka.version}</version>
      <scope>provided</scope>
    </dependency>

    <!-- Generates META-INF/services entries for @AutoService-annotated factories;
         version managed by the parent POM. -->
    <dependency>
      <groupId>com.google.auto.service</groupId>
      <artifactId>auto-service</artifactId>
    </dependency>

    <!-- Test-only dependencies. -->
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.assertj</groupId>
      <artifactId>assertj-core</artifactId>
      <version>${assertj.version}</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>${mockito.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl;
17+
18+
import static com.datasqrl.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_HANDLER;
19+
import static com.datasqrl.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_TOPIC;
20+
21+
import java.io.IOException;
22+
import java.io.Serializable;
23+
import java.util.Properties;
24+
import javax.annotation.Nullable;
25+
import org.apache.flink.configuration.ReadableConfig;
26+
import org.apache.kafka.clients.consumer.ConsumerRecord;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
public class DeserFailureHandler implements Serializable {
31+
32+
private static final Logger LOG = LoggerFactory.getLogger(DeserFailureHandler.class);
33+
private static final long serialVersionUID = 1L;
34+
35+
private final DeserFailureHandlerType handlerType;
36+
private final @Nullable DeserFailureProducer producer;
37+
38+
DeserFailureHandler(
39+
DeserFailureHandlerType handlerType, @Nullable DeserFailureProducer producer) {
40+
this.handlerType = handlerType;
41+
this.producer = producer;
42+
}
43+
44+
public static DeserFailureHandler of(ReadableConfig tableOptions, Properties consumerProps) {
45+
DeserFailureHandlerType handlerType = tableOptions.get(SCAN_DESER_FAILURE_HANDLER);
46+
47+
DeserFailureProducer producer =
48+
handlerType == DeserFailureHandlerType.KAFKA
49+
? new DeserFailureProducer(tableOptions.get(SCAN_DESER_FAILURE_TOPIC), consumerProps)
50+
: null;
51+
52+
return new DeserFailureHandler(handlerType, producer);
53+
}
54+
55+
public void deserWithFailureHandling(
56+
ConsumerRecord<byte[], byte[]> record, DeserializationCaller deser) throws Exception {
57+
58+
try {
59+
deser.call();
60+
} catch (IOException e) {
61+
if (DeserFailureHandlerType.NONE == handlerType) {
62+
throw e;
63+
64+
} else if (DeserFailureHandlerType.LOG == handlerType) {
65+
LOG.warn(
66+
"Deserialization failure occurred. Topic: {}, Partition: {}, Offset: {}",
67+
record.topic(),
68+
record.partition(),
69+
record.offset());
70+
71+
} else if (DeserFailureHandlerType.KAFKA == handlerType) {
72+
LOG.warn(
73+
"Deserialization failure occurred, sending the record to the configured topic ({}). Topic: {}, Partition: {}, Offset: {}",
74+
producer.getTopic(),
75+
record.topic(),
76+
record.partition(),
77+
record.offset());
78+
producer.send(record);
79+
}
80+
81+
LOG.debug("Failure cause", e);
82+
LOG.trace("Failed record: {}", record);
83+
}
84+
}
85+
86+
public interface DeserializationCaller extends Serializable {
87+
void call() throws Exception;
88+
}
89+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl;
17+
18+
import org.apache.flink.configuration.ConfigOption;
19+
import org.apache.flink.configuration.ConfigOptions;
20+
import org.apache.flink.configuration.ReadableConfig;
21+
import org.apache.flink.table.api.ValidationException;
22+
import org.apache.flink.util.StringUtils;
23+
24+
public class DeserFailureHandlerOptions {
25+
26+
public static final ConfigOption<DeserFailureHandlerType> SCAN_DESER_FAILURE_HANDLER =
27+
ConfigOptions.key("scan.deser-failure.handler")
28+
.enumType(DeserFailureHandlerType.class)
29+
.defaultValue(DeserFailureHandlerType.NONE);
30+
31+
public static final ConfigOption<String> SCAN_DESER_FAILURE_TOPIC =
32+
ConfigOptions.key("scan.deser-failure.topic").stringType().noDefaultValue();
33+
34+
public static void validateDeserFailureHandlerOptions(ReadableConfig tableOptions) {
35+
var handler = tableOptions.get(SCAN_DESER_FAILURE_HANDLER);
36+
var topic = tableOptions.get(SCAN_DESER_FAILURE_TOPIC);
37+
38+
if (handler == DeserFailureHandlerType.KAFKA && StringUtils.isNullOrWhitespaceOnly(topic)) {
39+
throw new ValidationException(
40+
String.format(
41+
"'%s' is set to '%s', but '%s' is not specified.",
42+
SCAN_DESER_FAILURE_HANDLER.key(),
43+
DeserFailureHandlerType.KAFKA,
44+
SCAN_DESER_FAILURE_TOPIC.key()));
45+
}
46+
47+
if (handler != DeserFailureHandlerType.KAFKA && !StringUtils.isNullOrWhitespaceOnly(topic)) {
48+
throw new ValidationException(
49+
String.format(
50+
"'%s' is not set to '%s', but '%s' is specified.",
51+
SCAN_DESER_FAILURE_HANDLER.key(),
52+
DeserFailureHandlerType.KAFKA,
53+
SCAN_DESER_FAILURE_TOPIC.key()));
54+
}
55+
}
56+
57+
private DeserFailureHandlerOptions() {}
58+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl;
17+
18+
/** Strategies for reacting to a Kafka record that cannot be deserialized. */
public enum DeserFailureHandlerType {
  /**
   * Default: no handling at all — a problematic record causes the application to fail.
   */
  NONE,

  /**
   * A problematic record is logged with helpful context, and execution continues.
   */
  LOG,

  /**
   * A problematic record is logged and additionally forwarded to a configured Kafka dead-letter
   * topic; execution continues.
   */
  KAFKA
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright © 2024 DataSQRL (contact@datasqrl.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datasqrl;
17+
18+
import static org.apache.flink.util.Preconditions.checkNotNull;
19+
20+
import java.io.Serializable;
21+
import java.util.Properties;
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.apache.kafka.clients.producer.KafkaProducer;
24+
import org.apache.kafka.clients.producer.ProducerConfig;
25+
import org.apache.kafka.clients.producer.ProducerRecord;
26+
import org.apache.kafka.common.serialization.ByteArraySerializer;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
class DeserFailureProducer implements Serializable {
31+
32+
private static final Logger LOG = LoggerFactory.getLogger(DeserFailureProducer.class);
33+
private static final long serialVersionUID = 1L;
34+
35+
private final String topic;
36+
private final Properties producerProps;
37+
38+
private transient KafkaProducer<byte[], byte[]> producer;
39+
40+
DeserFailureProducer(String topic, Properties consumerProps) {
41+
this.topic = checkNotNull(topic);
42+
43+
producerProps = new Properties();
44+
producerProps.putAll(consumerProps);
45+
producerProps.setProperty(
46+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
47+
producerProps.setProperty(
48+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
49+
}
50+
51+
private void init() {
52+
producer = new KafkaProducer<>(producerProps);
53+
}
54+
55+
void send(ConsumerRecord<byte[], byte[]> record) {
56+
if (producer == null) {
57+
init();
58+
}
59+
60+
if (record == null) {
61+
LOG.info("Unable to send deserialization failed record: Record was null.");
62+
} else {
63+
producer.send(
64+
new ProducerRecord<>(topic, null, null, record.key(), record.value(), record.headers()));
65+
}
66+
}
67+
68+
public String getTopic() {
69+
return topic;
70+
}
71+
}

0 commit comments

Comments
 (0)