Skip to content

Commit e855980

Browse files
committed
Add deserialization failure handler implementation
1 parent fb2711f commit e855980

8 files changed

+316
-11
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.kafka.table;
20+
21+
import org.apache.flink.configuration.ReadableConfig;
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import javax.annotation.Nullable;
27+
import java.io.IOException;
28+
import java.io.Serializable;
29+
import java.util.Properties;
30+
31+
import static org.apache.flink.streaming.connectors.kafka.table.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_HANDLER;
32+
import static org.apache.flink.streaming.connectors.kafka.table.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_TOPIC;
33+
34+
public class DeserFailureHandler {
35+
36+
private static final Logger LOG = LoggerFactory.getLogger(DeserFailureHandler.class);
37+
38+
private final DeserFailureHandlerType handlerType;
39+
private final @Nullable DeserFailureProducer producer;
40+
41+
DeserFailureHandler(DeserFailureHandlerType handlerType, @Nullable DeserFailureProducer producer) {
42+
this.handlerType = handlerType;
43+
this.producer = producer;
44+
}
45+
46+
static DeserFailureHandler of(ReadableConfig tableOptions, Properties consumerProps) {
47+
DeserFailureHandlerType handlerType = tableOptions.get(SCAN_DESER_FAILURE_HANDLER);
48+
49+
DeserFailureProducer producer =
50+
handlerType == DeserFailureHandlerType.KAFKA
51+
? new DeserFailureProducer(tableOptions.get(SCAN_DESER_FAILURE_TOPIC), consumerProps)
52+
: null;
53+
54+
return new DeserFailureHandler(handlerType, producer);
55+
}
56+
57+
void deserWithFailureHandling(ConsumerRecord<byte[], byte[]> record, DeserializationCaller deser)
58+
throws IOException {
59+
60+
try {
61+
deser.call();
62+
} catch (IOException e) {
63+
if (DeserFailureHandlerType.NONE == handlerType) {
64+
throw e;
65+
66+
} else if (DeserFailureHandlerType.LOG == handlerType) {
67+
LOG.info(
68+
"Deserialization failure occurred for record. Topic: {}, Partition: {}, Offset: {}",
69+
record.topic(),
70+
record.partition(),
71+
record.offset());
72+
73+
} else if (DeserFailureHandlerType.KAFKA == handlerType) {
74+
LOG.info(
75+
"Deserialization failure occurred for record, sending it to the configured topic ({}). Topic: {}, Partition: {}, Offset: {}",
76+
producer.getTopic(),
77+
record.topic(),
78+
record.partition(),
79+
record.offset());
80+
producer.send(record);
81+
}
82+
}
83+
}
84+
85+
interface DeserializationCaller extends Serializable {
86+
void call() throws IOException;
87+
}
88+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.kafka.table;
20+
21+
import org.apache.flink.configuration.ConfigOption;
22+
import org.apache.flink.configuration.ConfigOptions;
23+
import org.apache.flink.configuration.ReadableConfig;
24+
import org.apache.flink.table.api.ValidationException;
25+
import org.apache.flink.util.StringUtils;
26+
27+
public class DeserFailureHandlerOptions {
28+
29+
public static final ConfigOption<DeserFailureHandlerType> SCAN_DESER_FAILURE_HANDLER =
30+
ConfigOptions.key("scan.deser-failure.handler")
31+
.enumType(DeserFailureHandlerType.class)
32+
.defaultValue(DeserFailureHandlerType.NONE);
33+
34+
public static final ConfigOption<String> SCAN_DESER_FAILURE_TOPIC =
35+
ConfigOptions.key("scan.deser-failure.topic")
36+
.stringType()
37+
.noDefaultValue();
38+
39+
public static void validateDeserFailureHandlerOptions(ReadableConfig tableOptions) {
40+
var handler = tableOptions.get(SCAN_DESER_FAILURE_HANDLER);
41+
var topic = tableOptions.get(SCAN_DESER_FAILURE_TOPIC);
42+
43+
if (handler == DeserFailureHandlerType.KAFKA && StringUtils.isNullOrWhitespaceOnly(topic)) {
44+
throw new ValidationException(
45+
String.format(
46+
"'%s' is set to '%s', but '%s' is not specified.",
47+
SCAN_DESER_FAILURE_HANDLER.key(),
48+
DeserFailureHandlerType.KAFKA,
49+
SCAN_DESER_FAILURE_TOPIC.key()));
50+
}
51+
52+
if (handler != DeserFailureHandlerType.KAFKA && !StringUtils.isNullOrWhitespaceOnly(topic)) {
53+
throw new ValidationException(
54+
String.format(
55+
"'%s' is not set to '%s', but '%s' is specified.",
56+
SCAN_DESER_FAILURE_HANDLER.key(),
57+
DeserFailureHandlerType.KAFKA,
58+
SCAN_DESER_FAILURE_TOPIC.key()));
59+
}
60+
}
61+
62+
private DeserFailureHandlerOptions() {}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.kafka.table;
20+
21+
/** Strategies for reacting to a record that cannot be deserialized. */
public enum DeserFailureHandlerType {
    /**
     * No deserialization failure handling is applied. In case of a problematic record, the
     * application will fail. This is the default setting.
     */
    NONE,

    /**
     * In case of a problematic record, helpful information will be logged. The application
     * continues the execution.
     */
    LOG,

    /**
     * In case of a problematic record, helpful information will be logged, and the record will be
     * sent to a configured Kafka topic as well. The application continues the execution.
     */
    KAFKA
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.kafka.table;
20+
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.producer.KafkaProducer;
23+
import org.apache.kafka.clients.producer.ProducerConfig;
24+
import org.apache.kafka.clients.producer.ProducerRecord;
25+
import org.apache.kafka.common.serialization.ByteArraySerializer;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.io.Serializable;
30+
import java.util.Properties;
31+
32+
import static org.apache.flink.util.Preconditions.checkNotNull;
33+
34+
class DeserFailureProducer implements Serializable {
35+
36+
private static final Logger LOG = LoggerFactory.getLogger(DeserFailureProducer.class);
37+
38+
private final String topic;
39+
private final Properties producerProps;
40+
41+
private transient KafkaProducer<byte[], byte[]> kafkaProducer;
42+
43+
DeserFailureProducer(String topic, Properties consumerProps) {
44+
this.topic = checkNotNull(topic);
45+
46+
producerProps = new Properties();
47+
producerProps.putAll(consumerProps);
48+
producerProps.setProperty(
49+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
50+
producerProps.setProperty(
51+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
52+
}
53+
54+
private void init() {
55+
if (kafkaProducer == null) {
56+
LOG.debug("Initializing deserialization failure producer.");
57+
kafkaProducer = new KafkaProducer<>(producerProps);
58+
}
59+
}
60+
61+
void send(ConsumerRecord<byte[], byte[]> record) {
62+
init();
63+
64+
if (record == null) {
65+
LOG.info("Unable to send deserialization failed record: Record was null.");
66+
} else if (kafkaProducer == null) {
67+
LOG.warn("Unable to send deserialization failed record: Kafka producer is not initialized.");
68+
} else {
69+
kafkaProducer.send(
70+
new ProducerRecord<>(
71+
topic,
72+
null,
73+
null,
74+
record.key(),
75+
record.value(),
76+
record.headers()));
77+
}
78+
}
79+
80+
public String getTopic() {
81+
return topic;
82+
}
83+
}

flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
5656

5757
private final boolean upsertMode;
5858

59+
private final DeserFailureHandler deserFailureHandler;
60+
5961
DynamicKafkaDeserializationSchema(
6062
int physicalArity,
6163
@Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -65,7 +67,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
6567
boolean hasMetadata,
6668
MetadataConverter[] metadataConverters,
6769
TypeInformation<RowData> producedTypeInfo,
68-
boolean upsertMode) {
70+
boolean upsertMode,
71+
DeserFailureHandler deserFailureHandler) {
6972
if (upsertMode) {
7073
Preconditions.checkArgument(
7174
keyDeserialization != null && keyProjection.length > 0,
@@ -84,6 +87,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
8487
upsertMode);
8588
this.producedTypeInfo = producedTypeInfo;
8689
this.upsertMode = upsertMode;
90+
this.deserFailureHandler = deserFailureHandler;
8791
}
8892

8993
@Override
@@ -110,13 +114,15 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
110114
// shortcut in case no output projection is required,
111115
// also not for a cartesian product with the keys
112116
if (keyDeserialization == null && !hasMetadata) {
113-
valueDeserialization.deserialize(record.value(), collector);
117+
deserFailureHandler.deserWithFailureHandling(
118+
record, () -> valueDeserialization.deserialize(record.value(), collector));
114119
return;
115120
}
116121

117122
// buffer key(s)
118123
if (keyDeserialization != null) {
119-
keyDeserialization.deserialize(record.key(), keyCollector);
124+
deserFailureHandler.deserWithFailureHandling(
125+
record, () -> keyDeserialization.deserialize(record.key(), keyCollector));
120126
}
121127

122128
// project output while emitting values
@@ -127,7 +133,8 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
127133
// collect tombstone messages in upsert mode by hand
128134
outputCollector.collect(null);
129135
} else {
130-
valueDeserialization.deserialize(record.value(), outputCollector);
136+
deserFailureHandler.deserWithFailureHandling(
137+
record, () -> valueDeserialization.deserialize(record.value(), outputCollector));
131138
}
132139
keyCollector.buffer.clear();
133140
}

flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ public class KafkaDynamicSource
171171

172172
protected final String tableIdentifier;
173173

174+
protected final DeserFailureHandler deserFailureHandler;
175+
174176
public KafkaDynamicSource(
175177
DataType physicalDataType,
176178
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -188,7 +190,8 @@ public KafkaDynamicSource(
188190
Map<KafkaTopicPartition, Long> specificBoundedOffsets,
189191
long boundedTimestampMillis,
190192
boolean upsertMode,
191-
String tableIdentifier) {
193+
String tableIdentifier,
194+
DeserFailureHandler deserFailureHandler) {
192195
// Format attributes
193196
this.physicalDataType =
194197
Preconditions.checkNotNull(
@@ -228,6 +231,7 @@ public KafkaDynamicSource(
228231
this.boundedTimestampMillis = boundedTimestampMillis;
229232
this.upsertMode = upsertMode;
230233
this.tableIdentifier = tableIdentifier;
234+
this.deserFailureHandler = deserFailureHandler;
231235
}
232236

233237
@Override
@@ -344,7 +348,8 @@ public DynamicTableSource copy() {
344348
specificBoundedOffsets,
345349
boundedTimestampMillis,
346350
upsertMode,
347-
tableIdentifier);
351+
tableIdentifier,
352+
deserFailureHandler);
348353
copy.producedDataType = producedDataType;
349354
copy.metadataKeys = metadataKeys;
350355
copy.watermarkStrategy = watermarkStrategy;
@@ -550,7 +555,8 @@ private KafkaDeserializationSchema<RowData> createKafkaDeserializationSchema(
550555
hasMetadata,
551556
metadataConverters,
552557
producedTypeInfo,
553-
upsertMode);
558+
upsertMode,
559+
deserFailureHandler);
554560
}
555561

556562
private @Nullable DeserializationSchema<RowData> createDeserialization(

0 commit comments

Comments
 (0)