Commit bfd4438

Add deserialization handling skeleton
1 parent c0b96c5 commit bfd4438

4 files changed: +97 -35 lines changed
src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandler.java

Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
+package org.apache.flink.streaming.connectors.kafka.table;
+
+public enum DeserFailureHandler {
+    /**
+     * No deserialization failure handling is applied.
+     * In case of a problematic record, the application will fail.
+     * This is the default setting.
+     */
+    NONE,
+    LOG,
+    KAFKA
+}
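The LOG and KAFKA constants are left undocumented in this skeleton. Judging from the TODO comments in DynamicKafkaDeserializationSchema below and the 'scan.deser.failure-topic' option added in KafkaDynamicTableFactory, their intent could eventually be captured with Javadoc along these lines (a sketch only, not part of this commit):

    public enum DeserFailureHandler {
        /** Fail the job when a record cannot be deserialized (default behavior). */
        NONE,
        /** Log the problematic record and continue processing. */
        LOG,
        /** Forward the problematic record to a configured Kafka topic and continue processing. */
        KAFKA
    }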

src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java

Lines changed: 32 additions & 4 deletions

@@ -33,6 +33,7 @@
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -56,6 +57,10 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
 
     private final boolean upsertMode;
 
+    private final DeserFailureHandler deserFailureHandler;
+
+    private final @Nullable Object deserFailureTarget;
+
     DynamicKafkaDeserializationSchema(
             int physicalArity,
             @Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -65,7 +70,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
             boolean hasMetadata,
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
-            boolean upsertMode) {
+            boolean upsertMode,
+            DeserFailureHandler deserFailureHandler,
+            @Nullable Object deserFailureTarget) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keyDeserialization != null && keyProjection.length > 0,
@@ -84,6 +91,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
                         upsertMode);
         this.producedTypeInfo = producedTypeInfo;
         this.upsertMode = upsertMode;
+        this.deserFailureHandler = deserFailureHandler;
+        this.deserFailureTarget = deserFailureTarget;
     }
 
     @Override
@@ -110,13 +119,13 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
         // shortcut in case no output projection is required,
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
-            valueDeserialization.deserialize(record.value(), collector);
+            deserWithFailureHandling(record, () -> valueDeserialization.deserialize(record.value(), collector));
             return;
         }
 
         // buffer key(s)
         if (keyDeserialization != null) {
-            keyDeserialization.deserialize(record.key(), keyCollector);
+            deserWithFailureHandling(record, () -> keyDeserialization.deserialize(record.key(), keyCollector));
         }
 
         // project output while emitting values
@@ -127,7 +136,7 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
         } else {
-            valueDeserialization.deserialize(record.value(), outputCollector);
+            deserWithFailureHandling(record, () -> valueDeserialization.deserialize(record.value(), outputCollector));
         }
         keyCollector.buffer.clear();
     }
@@ -137,6 +146,25 @@ public TypeInformation<RowData> getProducedType() {
         return producedTypeInfo;
     }
 
+    void deserWithFailureHandling(ConsumerRecord<byte[], byte[]> record, DeserializationCaller deser)
+            throws IOException {
+        try {
+            deser.call();
+        } catch (IOException e) {
+            if (DeserFailureHandler.NONE == deserFailureHandler) {
+                throw e;
+            } else if (DeserFailureHandler.LOG == deserFailureHandler) {
+                // todo log the record
+            } else if (DeserFailureHandler.KAFKA == deserFailureHandler) {
+                // todo send record via Kafka producer
+            }
+        }
+    }
+
+    private interface DeserializationCaller extends Serializable {
+        void call() throws IOException;
+    }
+
     // --------------------------------------------------------------------------------------------
 
     interface MetadataConverter extends Serializable {
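The LOG and KAFKA branches above are only TODO placeholders. A minimal sketch of how they might eventually be filled in, assuming an SLF4J logger and a hypothetical dead-letter KafkaProducer<byte[], byte[]> wired up from 'scan.deser.failure-topic' (neither exists in this commit, and the eventual implementation may differ):

    import java.io.IOException;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /** Sketch of the failure-handling branches; assumes the DeserFailureHandler enum from this commit. */
    class DeserFailureHandlingSketch {

        private static final Logger LOG = LoggerFactory.getLogger(DeserFailureHandlingSketch.class);

        private final DeserFailureHandler handler; // from 'scan.deser.failure-handler'
        private final String failureTopic; // hypothetical, from 'scan.deser.failure-topic'
        private final KafkaProducer<byte[], byte[]> producer; // hypothetical dead-letter producer

        DeserFailureHandlingSketch(
                DeserFailureHandler handler, String failureTopic, KafkaProducer<byte[], byte[]> producer) {
            this.handler = handler;
            this.failureTopic = failureTopic;
            this.producer = producer;
        }

        void handle(ConsumerRecord<byte[], byte[]> record, IOException cause) throws IOException {
            switch (handler) {
                case NONE:
                    // default: keep today's behavior and fail the job
                    throw cause;
                case LOG:
                    // log the offending record and keep processing
                    LOG.warn(
                            "Skipping record that could not be deserialized (topic={}, partition={}, offset={})",
                            record.topic(), record.partition(), record.offset(), cause);
                    break;
                case KAFKA:
                    // forward the raw bytes to a dead-letter topic and keep processing
                    producer.send(new ProducerRecord<>(failureTopic, record.key(), record.value()));
                    break;
            }
        }
    }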

src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java

Lines changed: 15 additions & 3 deletions

@@ -171,6 +171,10 @@ public class KafkaDynamicSource
 
     protected final String tableIdentifier;
 
+    protected final DeserFailureHandler deserFailureHandler;
+
+    protected final @Nullable Object deserFailureTarget;
+
     public KafkaDynamicSource(
             DataType physicalDataType,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -188,7 +192,9 @@ public KafkaDynamicSource(
             Map<KafkaTopicPartition, Long> specificBoundedOffsets,
             long boundedTimestampMillis,
             boolean upsertMode,
-            String tableIdentifier) {
+            String tableIdentifier,
+            DeserFailureHandler deserFailureHandler,
+            @Nullable Object deserFailureTarget) {
         // Format attributes
         this.physicalDataType =
                 Preconditions.checkNotNull(
@@ -228,6 +234,8 @@ public KafkaDynamicSource(
         this.boundedTimestampMillis = boundedTimestampMillis;
         this.upsertMode = upsertMode;
         this.tableIdentifier = tableIdentifier;
+        this.deserFailureHandler = deserFailureHandler;
+        this.deserFailureTarget = deserFailureTarget;
     }
 
     @Override
@@ -344,7 +352,9 @@ public DynamicTableSource copy() {
                         specificBoundedOffsets,
                         boundedTimestampMillis,
                         upsertMode,
-                        tableIdentifier);
+                        tableIdentifier,
+                        deserFailureHandler,
+                        deserFailureTarget);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
@@ -550,7 +560,9 @@ private KafkaDeserializationSchema<RowData> createKafkaDeserializationSchema(
                 hasMetadata,
                 metadataConverters,
                 producedTypeInfo,
-                upsertMode);
+                upsertMode,
+                deserFailureHandler,
+                deserFailureTarget);
     }
 
     private @Nullable DeserializationSchema<RowData> createDeserialization(

src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java

Lines changed: 38 additions & 28 deletions

@@ -114,6 +114,18 @@ public class KafkaDynamicTableFactory
                     .noDefaultValue()
                     .withDescription("Optional semantic when committing.");
 
+    private static final ConfigOption<DeserFailureHandler> SCAN_DESER_FAILURE_HANDLER =
+            ConfigOptions.key("scan.deser.failure-handler")
+                    .enumType(DeserFailureHandler.class)
+                    .defaultValue(DeserFailureHandler.NONE)
+                    .withDescription("Optional strategy for handling records that fail deserialization.");
+
+    private static final ConfigOption<String> SCAN_DESER_FAILURE_TOPIC =
+            ConfigOptions.key("scan.deser.failure-topic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Optional Kafka topic that receives records that fail deserialization.");
+
     public static final String IDENTIFIER = "kafka";
 
     @Override
@@ -152,6 +164,8 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SCAN_BOUNDED_MODE);
         options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
         options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
+        options.add(SCAN_DESER_FAILURE_HANDLER);
+        options.add(SCAN_DESER_FAILURE_TOPIC);
         return options;
     }
 
@@ -201,8 +215,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
         final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
 
         // add topic-partition discovery
-        final Duration partitionDiscoveryInterval =
-                tableOptions.get(SCAN_TOPIC_PARTITION_DISCOVERY);
+        final Duration partitionDiscoveryInterval = tableOptions.get(SCAN_TOPIC_PARTITION_DISCOVERY);
         properties.setProperty(
                 KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
                 Long.toString(partitionDiscoveryInterval.toMillis()));
@@ -215,6 +228,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
         final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
 
+        final DeserFailureHandler deserFailureHandler = tableOptions.get(SCAN_DESER_FAILURE_HANDLER);
+
         return createKafkaTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -231,14 +246,15 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 boundedOptions.boundedMode,
                 boundedOptions.specificOffsets,
                 boundedOptions.boundedTimestampMillis,
-                context.getObjectIdentifier().asSummaryString());
+                context.getObjectIdentifier().asSummaryString(),
+                deserFailureHandler,
+                null);
     }
 
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
         final TableFactoryHelper helper =
-                FactoryUtil.createTableFactoryHelper(
-                        this, autoCompleteSchemaRegistrySubject(context));
+                FactoryUtil.createTableFactoryHelper(this, autoCompleteSchemaRegistrySubject(context));
 
         final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
                 getKeyEncodingFormat(helper);
@@ -291,17 +307,15 @@ public DynamicTableSink createDynamicTableSink(Context context) {
     private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
             TableFactoryHelper helper) {
         final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
-                helper.discoverOptionalDecodingFormat(
-                        DeserializationFormatFactory.class, KEY_FORMAT);
+                helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
         keyDecodingFormat.ifPresent(
                 format -> {
                     if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                         throw new ValidationException(
                                 String.format(
                                         "A key format should only deal with INSERT-only records. "
                                                 + "But %s has a changelog mode of %s.",
-                                        helper.getOptions().get(KEY_FORMAT),
-                                        format.getChangelogMode()));
+                                        helper.getOptions().get(KEY_FORMAT), format.getChangelogMode()));
                     }
                 });
         return keyDecodingFormat;
@@ -318,45 +332,37 @@ private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEnco
                                 String.format(
                                         "A key format should only deal with INSERT-only records. "
                                                 + "But %s has a changelog mode of %s.",
-                                        helper.getOptions().get(KEY_FORMAT),
-                                        format.getChangelogMode()));
+                                        helper.getOptions().get(KEY_FORMAT), format.getChangelogMode()));
                     }
                 });
         return keyEncodingFormat;
     }
 
     private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
             TableFactoryHelper helper) {
-        return helper.discoverOptionalDecodingFormat(
-                        DeserializationFormatFactory.class, FactoryUtil.FORMAT)
+        return helper
+                .discoverOptionalDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT)
                 .orElseGet(
-                        () ->
-                                helper.discoverDecodingFormat(
-                                        DeserializationFormatFactory.class, VALUE_FORMAT));
+                        () -> helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT));
     }
 
     private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
             TableFactoryHelper helper) {
-        return helper.discoverOptionalEncodingFormat(
-                        SerializationFormatFactory.class, FactoryUtil.FORMAT)
+        return helper
+                .discoverOptionalEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT)
                 .orElseGet(
-                        () ->
-                                helper.discoverEncodingFormat(
-                                        SerializationFormatFactory.class, VALUE_FORMAT));
+                        () -> helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT));
     }
 
     private static void validatePKConstraints(
             ObjectIdentifier tableName,
             int[] primaryKeyIndexes,
             Map<String, String> options,
             Format format) {
-        if (primaryKeyIndexes.length > 0
-                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+        if (primaryKeyIndexes.length > 0 && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
             Configuration configuration = Configuration.fromMap(options);
             String formatName =
-                    configuration
-                            .getOptional(FactoryUtil.FORMAT)
-                            .orElse(configuration.get(VALUE_FORMAT));
+                    configuration.getOptional(FactoryUtil.FORMAT).orElse(configuration.get(VALUE_FORMAT));
             throw new ValidationException(
                     String.format(
                             "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
@@ -395,7 +401,9 @@ protected KafkaDynamicSource createKafkaTableSource(
             BoundedMode boundedMode,
             Map<KafkaTopicPartition, Long> specificEndOffsets,
             long endTimestampMillis,
-            String tableIdentifier) {
+            String tableIdentifier,
+            DeserFailureHandler deserFailureHandler,
+            @Nullable Object deserFailureTarget) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -413,7 +421,9 @@ protected KafkaDynamicSource createKafkaTableSource(
                 specificEndOffsets,
                 endTimestampMillis,
                 false,
-                tableIdentifier);
+                tableIdentifier,
+                deserFailureHandler,
+                deserFailureTarget);
     }
 
     protected KafkaDynamicSink createKafkaTableSink(
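For context, a table could opt into the new options as shown below once the connector is built with this change. The table name, topic, schema, and broker address are made up for illustration, and note that the factory above still passes null as the failure target, so 'scan.deser.failure-topic' is registered but not yet consumed by this skeleton:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DeserFailureOptionExample {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Hypothetical table definition exercising the new connector options.
            tableEnv.executeSql(
                    "CREATE TABLE orders (\n"
                            + "  order_id STRING,\n"
                            + "  amount DOUBLE\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka',\n"
                            + "  'topic' = 'orders',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'properties.group.id' = 'orders-consumer',\n"
                            + "  'scan.startup.mode' = 'earliest-offset',\n"
                            + "  'format' = 'json',\n"
                            + "  'scan.deser.failure-handler' = 'LOG',\n"
                            + "  'scan.deser.failure-topic' = 'orders-deser-failures'\n"
                            + ")");

            // Once the LOG branch is implemented, a malformed JSON record on the
            // 'orders' topic would be logged and skipped instead of failing the job.
            tableEnv.executeSql("SELECT * FROM orders").print();
        }
    }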

0 commit comments