From fb2711fa08841d6294cba98f915b906f98e7144d Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Fri, 14 Mar 2025 18:07:52 +0100 Subject: [PATCH 1/2] Add required "flink-connector-kafka" classes locally without any modification Source state: https://github.com/apache/flink-connector-kafka/tree/3.2.0 --- flink-jar-runner/pom.xml | 15 + .../DynamicKafkaDeserializationSchema.java | 273 +++++++ .../kafka/table/KafkaDynamicSource.java | 678 ++++++++++++++++++ .../kafka/table/KafkaDynamicTableFactory.java | 449 ++++++++++++ .../table/UpsertKafkaDynamicTableFactory.java | 427 +++++++++++ pom.xml | 1 + 6 files changed, 1843 insertions(+) create mode 100644 flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java create mode 100644 flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java create mode 100644 flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java create mode 100644 flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java diff --git a/flink-jar-runner/pom.xml b/flink-jar-runner/pom.xml index ca47924..9ab9ff1 100644 --- a/flink-jar-runner/pom.xml +++ b/flink-jar-runner/pom.xml @@ -30,6 +30,12 @@ 2024 + + org.apache.flink + flink-connector-base + ${flink.version} + provided + org.apache.flink flink-streaming-java @@ -251,6 +257,15 @@ + + org.apache.flink:flink-connector-kafka + + org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema** + org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource** + org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory** + org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory** + + diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java new file mode 100644 index 0000000..ba26a1e --- /dev/null +++ b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.DeserializationException; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** A specific {@link KafkaSerializationSchema} for {@link KafkaDynamicSource}. */ +class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema { + + private static final long serialVersionUID = 1L; + + private final @Nullable DeserializationSchema keyDeserialization; + + private final DeserializationSchema valueDeserialization; + + private final boolean hasMetadata; + + private final BufferingCollector keyCollector; + + private final OutputProjectionCollector outputCollector; + + private final TypeInformation producedTypeInfo; + + private final boolean upsertMode; + + DynamicKafkaDeserializationSchema( + int physicalArity, + @Nullable DeserializationSchema keyDeserialization, + int[] keyProjection, + DeserializationSchema valueDeserialization, + int[] valueProjection, + boolean hasMetadata, + MetadataConverter[] metadataConverters, + TypeInformation producedTypeInfo, + boolean upsertMode) { + if (upsertMode) { + Preconditions.checkArgument( + keyDeserialization != null && keyProjection.length > 0, + "Key must be set in upsert mode for deserialization schema."); + } + this.keyDeserialization = keyDeserialization; + this.valueDeserialization = valueDeserialization; + this.hasMetadata = hasMetadata; + this.keyCollector = new BufferingCollector(); + this.outputCollector = + new OutputProjectionCollector( + physicalArity, + keyProjection, + valueProjection, + metadataConverters, + upsertMode); + this.producedTypeInfo = producedTypeInfo; + this.upsertMode = upsertMode; + } + + @Override + public void open(DeserializationSchema.InitializationContext context) throws Exception { + if (keyDeserialization != null) { + keyDeserialization.open(context); + } + valueDeserialization.open(context); + } + + @Override + public boolean isEndOfStream(RowData nextElement) { + return false; + } + + @Override + public RowData deserialize(ConsumerRecord record) throws Exception { + throw new IllegalStateException("A collector is required for deserializing."); + } + + @Override + public void deserialize(ConsumerRecord record, Collector collector) + throws Exception { + // shortcut in case no output projection is required, + // also not for a cartesian product with the keys + if (keyDeserialization == null && !hasMetadata) { + valueDeserialization.deserialize(record.value(), collector); + return; + } + + // buffer key(s) + if (keyDeserialization != null) { + keyDeserialization.deserialize(record.key(), keyCollector); + } + + // project output while emitting values + outputCollector.inputRecord = record; + outputCollector.physicalKeyRows = keyCollector.buffer; + outputCollector.outputCollector = collector; + if (record.value() == null && upsertMode) { + // collect tombstone messages in 
upsert mode by hand + outputCollector.collect(null); + } else { + valueDeserialization.deserialize(record.value(), outputCollector); + } + keyCollector.buffer.clear(); + } + + @Override + public TypeInformation getProducedType() { + return producedTypeInfo; + } + + // -------------------------------------------------------------------------------------------- + + interface MetadataConverter extends Serializable { + Object read(ConsumerRecord record); + } + + // -------------------------------------------------------------------------------------------- + + private static final class BufferingCollector implements Collector, Serializable { + + private static final long serialVersionUID = 1L; + + private final List buffer = new ArrayList<>(); + + @Override + public void collect(RowData record) { + buffer.add(record); + } + + @Override + public void close() { + // nothing to do + } + } + + // -------------------------------------------------------------------------------------------- + + /** + * Emits a row with key, value, and metadata fields. + * + *

<p>The collector is able to handle the following kinds of keys:
+     *
+     * <ul>
+     *   <li>No key is used.
+     *   <li>A key is used.
+     *   <li>The deserialization schema emits multiple keys.
+     *   <li>Keys and values have overlapping fields.
+     *   <li>Keys are used and value is null.
+     * </ul>
+ */ + private static final class OutputProjectionCollector + implements Collector, Serializable { + + private static final long serialVersionUID = 1L; + + private final int physicalArity; + + private final int[] keyProjection; + + private final int[] valueProjection; + + private final MetadataConverter[] metadataConverters; + + private final boolean upsertMode; + + private transient ConsumerRecord inputRecord; + + private transient List physicalKeyRows; + + private transient Collector outputCollector; + + OutputProjectionCollector( + int physicalArity, + int[] keyProjection, + int[] valueProjection, + MetadataConverter[] metadataConverters, + boolean upsertMode) { + this.physicalArity = physicalArity; + this.keyProjection = keyProjection; + this.valueProjection = valueProjection; + this.metadataConverters = metadataConverters; + this.upsertMode = upsertMode; + } + + @Override + public void collect(RowData physicalValueRow) { + // no key defined + if (keyProjection.length == 0) { + emitRow(null, (GenericRowData) physicalValueRow); + return; + } + + // otherwise emit a value for each key + for (RowData physicalKeyRow : physicalKeyRows) { + emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow); + } + } + + @Override + public void close() { + // nothing to do + } + + private void emitRow( + @Nullable GenericRowData physicalKeyRow, + @Nullable GenericRowData physicalValueRow) { + final RowKind rowKind; + if (physicalValueRow == null) { + if (upsertMode) { + rowKind = RowKind.DELETE; + } else { + throw new DeserializationException( + "Invalid null value received in non-upsert mode. Could not to set row kind for output record."); + } + } else { + rowKind = physicalValueRow.getRowKind(); + } + + final int metadataArity = metadataConverters.length; + final GenericRowData producedRow = + new GenericRowData(rowKind, physicalArity + metadataArity); + + for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) { + assert physicalKeyRow != null; + producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos)); + } + + if (physicalValueRow != null) { + for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) { + producedRow.setField( + valueProjection[valuePos], physicalValueRow.getField(valuePos)); + } + } + + for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) { + producedRow.setField( + physicalArity + metadataPos, + metadataConverters[metadataPos].read(inputRecord)); + } + + outputCollector.collect(producedRow); + } + } +} diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java new file mode 100644 index 0000000..5123064 --- /dev/null +++ b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java @@ -0,0 +1,678 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; +import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import 
java.util.stream.Stream; + +/** A version-agnostic Kafka {@link ScanTableSource}. */ +@Internal +public class KafkaDynamicSource + implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown { + + private static final String KAFKA_TRANSFORMATION = "kafka"; + + // -------------------------------------------------------------------------------------------- + // Mutable attributes + // -------------------------------------------------------------------------------------------- + + /** Data type that describes the final output of the source. */ + protected DataType producedDataType; + + /** Metadata that is appended at the end of a physical source row. */ + protected List metadataKeys; + + /** Watermark strategy that is used to generate per-partition watermark. */ + protected @Nullable WatermarkStrategy watermarkStrategy; + + // -------------------------------------------------------------------------------------------- + // Format attributes + // -------------------------------------------------------------------------------------------- + + private static final String VALUE_METADATA_PREFIX = "value."; + + /** Data type to configure the formats. */ + protected final DataType physicalDataType; + + /** Optional format for decoding keys from Kafka. */ + protected final @Nullable DecodingFormat> keyDecodingFormat; + + /** Format for decoding values from Kafka. */ + protected final DecodingFormat> valueDecodingFormat; + + /** Indices that determine the key fields and the target position in the produced row. */ + protected final int[] keyProjection; + + /** Indices that determine the value fields and the target position in the produced row. */ + protected final int[] valueProjection; + + /** Prefix that needs to be removed from fields when constructing the physical data type. */ + protected final @Nullable String keyPrefix; + + // -------------------------------------------------------------------------------------------- + // Kafka-specific attributes + // -------------------------------------------------------------------------------------------- + + /** The Kafka topics to consume. */ + protected final List topics; + + /** The Kafka topic pattern to consume. */ + protected final Pattern topicPattern; + + /** Properties for the Kafka consumer. */ + protected final Properties properties; + + /** + * The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). + */ + protected final StartupMode startupMode; + + /** + * Specific startup offsets; only relevant when startup mode is {@link + * StartupMode#SPECIFIC_OFFSETS}. + */ + protected final Map specificStartupOffsets; + + /** + * The start timestamp to locate partition offsets; only relevant when startup mode is {@link + * StartupMode#TIMESTAMP}. + */ + protected final long startupTimestampMillis; + + /** The bounded mode for the contained consumer (default is an unbounded data stream). */ + protected final BoundedMode boundedMode; + + /** + * Specific end offsets; only relevant when bounded mode is {@link + * BoundedMode#SPECIFIC_OFFSETS}. + */ + protected final Map specificBoundedOffsets; + + /** + * The bounded timestamp to locate partition offsets; only relevant when bounded mode is {@link + * BoundedMode#TIMESTAMP}. + */ + protected final long boundedTimestampMillis; + + /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. 
* */ + protected final boolean upsertMode; + + protected final String tableIdentifier; + + public KafkaDynamicSource( + DataType physicalDataType, + @Nullable DecodingFormat> keyDecodingFormat, + DecodingFormat> valueDecodingFormat, + int[] keyProjection, + int[] valueProjection, + @Nullable String keyPrefix, + @Nullable List topics, + @Nullable Pattern topicPattern, + Properties properties, + StartupMode startupMode, + Map specificStartupOffsets, + long startupTimestampMillis, + BoundedMode boundedMode, + Map specificBoundedOffsets, + long boundedTimestampMillis, + boolean upsertMode, + String tableIdentifier) { + // Format attributes + this.physicalDataType = + Preconditions.checkNotNull( + physicalDataType, "Physical data type must not be null."); + this.keyDecodingFormat = keyDecodingFormat; + this.valueDecodingFormat = + Preconditions.checkNotNull( + valueDecodingFormat, "Value decoding format must not be null."); + this.keyProjection = + Preconditions.checkNotNull(keyProjection, "Key projection must not be null."); + this.valueProjection = + Preconditions.checkNotNull(valueProjection, "Value projection must not be null."); + this.keyPrefix = keyPrefix; + // Mutable attributes + this.producedDataType = physicalDataType; + this.metadataKeys = Collections.emptyList(); + this.watermarkStrategy = null; + // Kafka-specific attributes + Preconditions.checkArgument( + (topics != null && topicPattern == null) + || (topics == null && topicPattern != null), + "Either Topic or Topic Pattern must be set for source."); + this.topics = topics; + this.topicPattern = topicPattern; + this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); + this.startupMode = + Preconditions.checkNotNull(startupMode, "Startup mode must not be null."); + this.specificStartupOffsets = + Preconditions.checkNotNull( + specificStartupOffsets, "Specific offsets must not be null."); + this.startupTimestampMillis = startupTimestampMillis; + this.boundedMode = + Preconditions.checkNotNull(boundedMode, "Bounded mode must not be null."); + this.specificBoundedOffsets = + Preconditions.checkNotNull( + specificBoundedOffsets, "Specific bounded offsets must not be null."); + this.boundedTimestampMillis = boundedTimestampMillis; + this.upsertMode = upsertMode; + this.tableIdentifier = tableIdentifier; + } + + @Override + public ChangelogMode getChangelogMode() { + return valueDecodingFormat.getChangelogMode(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { + final DeserializationSchema keyDeserialization = + createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix); + + final DeserializationSchema valueDeserialization = + createDeserialization(context, valueDecodingFormat, valueProjection, null); + + final TypeInformation producedTypeInfo = + context.createTypeInformation(producedDataType); + + final KafkaSource kafkaSource = + createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo); + + return new DataStreamScanProvider() { + @Override + public DataStream produceDataStream( + ProviderContext providerContext, StreamExecutionEnvironment execEnv) { + if (watermarkStrategy == null) { + watermarkStrategy = WatermarkStrategy.noWatermarks(); + } + DataStreamSource sourceStream = + execEnv.fromSource( + kafkaSource, watermarkStrategy, "KafkaSource-" + tableIdentifier); + providerContext.generateUid(KAFKA_TRANSFORMATION).ifPresent(sourceStream::uid); + return sourceStream; + } + + @Override + public boolean isBounded() { + 
return kafkaSource.getBoundedness() == Boundedness.BOUNDED; + } + }; + } + + @Override + public Map listReadableMetadata() { + final Map metadataMap = new LinkedHashMap<>(); + + // according to convention, the order of the final row must be + // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA + // where the format metadata has highest precedence + + // add value format metadata with prefix + valueDecodingFormat + .listReadableMetadata() + .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value)); + + // add connector metadata + Stream.of(ReadableMetadata.values()) + .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType)); + + return metadataMap; + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { + // separate connector and format metadata + final List formatMetadataKeys = + metadataKeys.stream() + .filter(k -> k.startsWith(VALUE_METADATA_PREFIX)) + .collect(Collectors.toList()); + final List connectorMetadataKeys = new ArrayList<>(metadataKeys); + connectorMetadataKeys.removeAll(formatMetadataKeys); + + // push down format metadata + final Map formatMetadata = valueDecodingFormat.listReadableMetadata(); + if (formatMetadata.size() > 0) { + final List requestedFormatMetadataKeys = + formatMetadataKeys.stream() + .map(k -> k.substring(VALUE_METADATA_PREFIX.length())) + .collect(Collectors.toList()); + valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys); + } + + this.metadataKeys = connectorMetadataKeys; + this.producedDataType = producedDataType; + } + + @Override + public boolean supportsMetadataProjection() { + return false; + } + + @Override + public void applyWatermark(WatermarkStrategy watermarkStrategy) { + this.watermarkStrategy = watermarkStrategy; + } + + @Override + public DynamicTableSource copy() { + final KafkaDynamicSource copy = + new KafkaDynamicSource( + physicalDataType, + keyDecodingFormat, + valueDecodingFormat, + keyProjection, + valueProjection, + keyPrefix, + topics, + topicPattern, + properties, + startupMode, + specificStartupOffsets, + startupTimestampMillis, + boundedMode, + specificBoundedOffsets, + boundedTimestampMillis, + upsertMode, + tableIdentifier); + copy.producedDataType = producedDataType; + copy.metadataKeys = metadataKeys; + copy.watermarkStrategy = watermarkStrategy; + return copy; + } + + @Override + public String asSummaryString() { + return "Kafka table source"; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KafkaDynamicSource that = (KafkaDynamicSource) o; + return Objects.equals(producedDataType, that.producedDataType) + && Objects.equals(metadataKeys, that.metadataKeys) + && Objects.equals(physicalDataType, that.physicalDataType) + && Objects.equals(keyDecodingFormat, that.keyDecodingFormat) + && Objects.equals(valueDecodingFormat, that.valueDecodingFormat) + && Arrays.equals(keyProjection, that.keyProjection) + && Arrays.equals(valueProjection, that.valueProjection) + && Objects.equals(keyPrefix, that.keyPrefix) + && Objects.equals(topics, that.topics) + && Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern)) + && Objects.equals(properties, that.properties) + && startupMode == that.startupMode + && Objects.equals(specificStartupOffsets, that.specificStartupOffsets) + && startupTimestampMillis == that.startupTimestampMillis + && boundedMode == that.boundedMode + && 
Objects.equals(specificBoundedOffsets, that.specificBoundedOffsets) + && boundedTimestampMillis == that.boundedTimestampMillis + && Objects.equals(upsertMode, that.upsertMode) + && Objects.equals(tableIdentifier, that.tableIdentifier) + && Objects.equals(watermarkStrategy, that.watermarkStrategy); + } + + @Override + public int hashCode() { + return Objects.hash( + producedDataType, + metadataKeys, + physicalDataType, + keyDecodingFormat, + valueDecodingFormat, + Arrays.hashCode(keyProjection), + Arrays.hashCode(valueProjection), + keyPrefix, + topics, + topicPattern, + properties, + startupMode, + specificStartupOffsets, + startupTimestampMillis, + boundedMode, + specificBoundedOffsets, + boundedTimestampMillis, + upsertMode, + tableIdentifier, + watermarkStrategy); + } + + // -------------------------------------------------------------------------------------------- + + protected KafkaSource createKafkaSource( + DeserializationSchema keyDeserialization, + DeserializationSchema valueDeserialization, + TypeInformation producedTypeInfo) { + + final KafkaDeserializationSchema kafkaDeserializer = + createKafkaDeserializationSchema( + keyDeserialization, valueDeserialization, producedTypeInfo); + + final KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder(); + + if (topics != null) { + kafkaSourceBuilder.setTopics(topics); + } else { + kafkaSourceBuilder.setTopicPattern(topicPattern); + } + + switch (startupMode) { + case EARLIEST: + kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest()); + break; + case LATEST: + kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest()); + break; + case GROUP_OFFSETS: + String offsetResetConfig = + properties.getProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + OffsetResetStrategy.NONE.name()); + OffsetResetStrategy offsetResetStrategy = getResetStrategy(offsetResetConfig); + kafkaSourceBuilder.setStartingOffsets( + OffsetsInitializer.committedOffsets(offsetResetStrategy)); + break; + case SPECIFIC_OFFSETS: + Map offsets = new HashMap<>(); + specificStartupOffsets.forEach( + (tp, offset) -> + offsets.put( + new TopicPartition(tp.getTopic(), tp.getPartition()), + offset)); + kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(offsets)); + break; + case TIMESTAMP: + kafkaSourceBuilder.setStartingOffsets( + OffsetsInitializer.timestamp(startupTimestampMillis)); + break; + } + + switch (boundedMode) { + case UNBOUNDED: + kafkaSourceBuilder.setUnbounded(new NoStoppingOffsetsInitializer()); + break; + case LATEST: + kafkaSourceBuilder.setBounded(OffsetsInitializer.latest()); + break; + case GROUP_OFFSETS: + kafkaSourceBuilder.setBounded(OffsetsInitializer.committedOffsets()); + break; + case SPECIFIC_OFFSETS: + Map offsets = new HashMap<>(); + specificBoundedOffsets.forEach( + (tp, offset) -> + offsets.put( + new TopicPartition(tp.getTopic(), tp.getPartition()), + offset)); + kafkaSourceBuilder.setBounded(OffsetsInitializer.offsets(offsets)); + break; + case TIMESTAMP: + kafkaSourceBuilder.setBounded(OffsetsInitializer.timestamp(boundedTimestampMillis)); + break; + } + + kafkaSourceBuilder + .setProperties(properties) + .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer)); + + return kafkaSourceBuilder.build(); + } + + private OffsetResetStrategy getResetStrategy(String offsetResetConfig) { + return Arrays.stream(OffsetResetStrategy.values()) + .filter(ors -> ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT))) + .findAny() + .orElseThrow( + () -> + new IllegalArgumentException( + 
String.format( + "%s can not be set to %s. Valid values: [%s]", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + offsetResetConfig, + Arrays.stream(OffsetResetStrategy.values()) + .map(Enum::name) + .map(String::toLowerCase) + .collect(Collectors.joining(","))))); + } + + private KafkaDeserializationSchema createKafkaDeserializationSchema( + DeserializationSchema keyDeserialization, + DeserializationSchema valueDeserialization, + TypeInformation producedTypeInfo) { + final MetadataConverter[] metadataConverters = + metadataKeys.stream() + .map( + k -> + Stream.of(ReadableMetadata.values()) + .filter(rm -> rm.key.equals(k)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .map(m -> m.converter) + .toArray(MetadataConverter[]::new); + + // check if connector metadata is used at all + final boolean hasMetadata = metadataKeys.size() > 0; + + // adjust physical arity with value format's metadata + final int adjustedPhysicalArity = + DataType.getFieldDataTypes(producedDataType).size() - metadataKeys.size(); + + // adjust value format projection to include value format's metadata columns at the end + final int[] adjustedValueProjection = + IntStream.concat( + IntStream.of(valueProjection), + IntStream.range( + keyProjection.length + valueProjection.length, + adjustedPhysicalArity)) + .toArray(); + + return new DynamicKafkaDeserializationSchema( + adjustedPhysicalArity, + keyDeserialization, + keyProjection, + valueDeserialization, + adjustedValueProjection, + hasMetadata, + metadataConverters, + producedTypeInfo, + upsertMode); + } + + private @Nullable DeserializationSchema createDeserialization( + DynamicTableSource.Context context, + @Nullable DecodingFormat> format, + int[] projection, + @Nullable String prefix) { + if (format == null) { + return null; + } + DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType); + if (prefix != null) { + physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix); + } + return format.createRuntimeDecoder(context, physicalFormatDataType); + } + + // -------------------------------------------------------------------------------------------- + // Metadata handling + // -------------------------------------------------------------------------------------------- + + enum ReadableMetadata { + TOPIC( + "topic", + DataTypes.STRING().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(ConsumerRecord record) { + return StringData.fromString(record.topic()); + } + }), + + PARTITION( + "partition", + DataTypes.INT().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(ConsumerRecord record) { + return record.partition(); + } + }), + + HEADERS( + "headers", + // key and value of the map are nullable to make handling easier in queries + DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()) + .notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(ConsumerRecord record) { + final Map map = new HashMap<>(); + for (Header header : record.headers()) { + map.put(StringData.fromString(header.key()), header.value()); + } + return new GenericMapData(map); + } + }), + + LEADER_EPOCH( + "leader-epoch", + DataTypes.INT().nullable(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(ConsumerRecord record) { + return 
record.leaderEpoch().orElse(null); + } + }), + + OFFSET( + "offset", + DataTypes.BIGINT().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(ConsumerRecord record) { + return record.offset(); + } + }), + + TIMESTAMP( + "timestamp", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(ConsumerRecord record) { + return TimestampData.fromEpochMillis(record.timestamp()); + } + }), + + TIMESTAMP_TYPE( + "timestamp-type", + DataTypes.STRING().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(ConsumerRecord record) { + return StringData.fromString(record.timestampType().toString()); + } + }); + + final String key; + + final DataType dataType; + + final MetadataConverter converter; + + ReadableMetadata(String key, DataType dataType, MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + } +} diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java new file mode 100644 index 0000000..fe1905e --- /dev/null +++ b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.source.KafkaSourceOptions; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE; +import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARTITIONER; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.StartupOptions; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getBoundedOptions; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getFlinkKafkaPartitioner; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getStartupOptions; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateTableSinkOptions; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateTableSourceOptions; + +/** + * Factory for creating configured instances of {@link KafkaDynamicSource} and {@link + * KafkaDynamicSink}. 
+ */ +@Internal +public class KafkaDynamicTableFactory + implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaDynamicTableFactory.class); + private static final ConfigOption SINK_SEMANTIC = + ConfigOptions.key("sink.semantic") + .stringType() + .noDefaultValue() + .withDescription("Optional semantic when committing."); + + public static final String IDENTIFIER = "kafka"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + final Set> options = new HashSet<>(); + options.add(PROPS_BOOTSTRAP_SERVERS); + return options; + } + + @Override + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(FactoryUtil.FORMAT); + options.add(KEY_FORMAT); + options.add(KEY_FIELDS); + options.add(KEY_FIELDS_PREFIX); + options.add(VALUE_FORMAT); + options.add(VALUE_FIELDS_INCLUDE); + options.add(TOPIC); + options.add(TOPIC_PATTERN); + options.add(PROPS_GROUP_ID); + options.add(SCAN_STARTUP_MODE); + options.add(SCAN_STARTUP_SPECIFIC_OFFSETS); + options.add(SCAN_TOPIC_PARTITION_DISCOVERY); + options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); + options.add(SINK_PARTITIONER); + options.add(SINK_PARALLELISM); + options.add(DELIVERY_GUARANTEE); + options.add(TRANSACTIONAL_ID_PREFIX); + options.add(SINK_SEMANTIC); + options.add(SCAN_BOUNDED_MODE); + options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); + options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); + return options; + } + + @Override + public Set> forwardOptions() { + return Stream.of( + PROPS_BOOTSTRAP_SERVERS, + PROPS_GROUP_ID, + TOPIC, + TOPIC_PATTERN, + SCAN_STARTUP_MODE, + SCAN_STARTUP_SPECIFIC_OFFSETS, + SCAN_TOPIC_PARTITION_DISCOVERY, + SCAN_STARTUP_TIMESTAMP_MILLIS, + SINK_PARTITIONER, + SINK_PARALLELISM, + TRANSACTIONAL_ID_PREFIX) + .collect(Collectors.toSet()); + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + + final Optional>> keyDecodingFormat = + getKeyDecodingFormat(helper); + + final DecodingFormat> valueDecodingFormat = + getValueDecodingFormat(helper); + + helper.validateExcept(PROPERTIES_PREFIX); + + final ReadableConfig tableOptions = helper.getOptions(); + + validateTableSourceOptions(tableOptions); + + validatePKConstraints( + context.getObjectIdentifier(), + context.getPrimaryKeyIndexes(), + context.getCatalogTable().getOptions(), + valueDecodingFormat); + + final StartupOptions startupOptions = getStartupOptions(tableOptions); + + final BoundedOptions boundedOptions = getBoundedOptions(tableOptions); + + final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions()); + + // add topic-partition discovery + final Duration partitionDiscoveryInterval = + tableOptions.get(SCAN_TOPIC_PARTITION_DISCOVERY); + properties.setProperty( + KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), + Long.toString(partitionDiscoveryInterval.toMillis())); + + final DataType physicalDataType = context.getPhysicalRowDataType(); + + final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + + final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); + + final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + + return createKafkaTableSource( + physicalDataType, + keyDecodingFormat.orElse(null), + valueDecodingFormat, + keyProjection, + valueProjection, + 
keyPrefix, + getSourceTopics(tableOptions), + getSourceTopicPattern(tableOptions), + properties, + startupOptions.startupMode, + startupOptions.specificOffsets, + startupOptions.startupTimestampMillis, + boundedOptions.boundedMode, + boundedOptions.specificOffsets, + boundedOptions.boundedTimestampMillis, + context.getObjectIdentifier().asSummaryString()); + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + final TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper( + this, autoCompleteSchemaRegistrySubject(context)); + + final Optional>> keyEncodingFormat = + getKeyEncodingFormat(helper); + + final EncodingFormat> valueEncodingFormat = + getValueEncodingFormat(helper); + + helper.validateExcept(PROPERTIES_PREFIX); + + final ReadableConfig tableOptions = helper.getOptions(); + + final DeliveryGuarantee deliveryGuarantee = validateDeprecatedSemantic(tableOptions); + validateTableSinkOptions(tableOptions); + + KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions); + + validatePKConstraints( + context.getObjectIdentifier(), + context.getPrimaryKeyIndexes(), + context.getCatalogTable().getOptions(), + valueEncodingFormat); + + final DataType physicalDataType = context.getPhysicalRowDataType(); + + final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + + final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); + + final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + + final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null); + + return createKafkaTableSink( + physicalDataType, + keyEncodingFormat.orElse(null), + valueEncodingFormat, + keyProjection, + valueProjection, + keyPrefix, + tableOptions.get(TOPIC).get(0), + getKafkaProperties(context.getCatalogTable().getOptions()), + getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null), + deliveryGuarantee, + parallelism, + tableOptions.get(TRANSACTIONAL_ID_PREFIX)); + } + + // -------------------------------------------------------------------------------------------- + + private static Optional>> getKeyDecodingFormat( + TableFactoryHelper helper) { + final Optional>> keyDecodingFormat = + helper.discoverOptionalDecodingFormat( + DeserializationFormatFactory.class, KEY_FORMAT); + keyDecodingFormat.ifPresent( + format -> { + if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) { + throw new ValidationException( + String.format( + "A key format should only deal with INSERT-only records. " + + "But %s has a changelog mode of %s.", + helper.getOptions().get(KEY_FORMAT), + format.getChangelogMode())); + } + }); + return keyDecodingFormat; + } + + private static Optional>> getKeyEncodingFormat( + TableFactoryHelper helper) { + final Optional>> keyEncodingFormat = + helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT); + keyEncodingFormat.ifPresent( + format -> { + if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) { + throw new ValidationException( + String.format( + "A key format should only deal with INSERT-only records. 
" + + "But %s has a changelog mode of %s.", + helper.getOptions().get(KEY_FORMAT), + format.getChangelogMode())); + } + }); + return keyEncodingFormat; + } + + private static DecodingFormat> getValueDecodingFormat( + TableFactoryHelper helper) { + return helper.discoverOptionalDecodingFormat( + DeserializationFormatFactory.class, FactoryUtil.FORMAT) + .orElseGet( + () -> + helper.discoverDecodingFormat( + DeserializationFormatFactory.class, VALUE_FORMAT)); + } + + private static EncodingFormat> getValueEncodingFormat( + TableFactoryHelper helper) { + return helper.discoverOptionalEncodingFormat( + SerializationFormatFactory.class, FactoryUtil.FORMAT) + .orElseGet( + () -> + helper.discoverEncodingFormat( + SerializationFormatFactory.class, VALUE_FORMAT)); + } + + private static void validatePKConstraints( + ObjectIdentifier tableName, + int[] primaryKeyIndexes, + Map options, + Format format) { + if (primaryKeyIndexes.length > 0 + && format.getChangelogMode().containsOnly(RowKind.INSERT)) { + Configuration configuration = Configuration.fromMap(options); + String formatName = + configuration + .getOptional(FactoryUtil.FORMAT) + .orElse(configuration.get(VALUE_FORMAT)); + throw new ValidationException( + String.format( + "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint" + + " on the table, because it can't guarantee the semantic of primary key.", + tableName.asSummaryString(), formatName)); + } + } + + private static DeliveryGuarantee validateDeprecatedSemantic(ReadableConfig tableOptions) { + if (tableOptions.getOptional(SINK_SEMANTIC).isPresent()) { + LOG.warn( + "{} is deprecated and will be removed. Please use {} instead.", + SINK_SEMANTIC.key(), + DELIVERY_GUARANTEE.key()); + return DeliveryGuarantee.valueOf( + tableOptions.get(SINK_SEMANTIC).toUpperCase().replace("-", "_")); + } + return tableOptions.get(DELIVERY_GUARANTEE); + } + + // -------------------------------------------------------------------------------------------- + + protected KafkaDynamicSource createKafkaTableSource( + DataType physicalDataType, + @Nullable DecodingFormat> keyDecodingFormat, + DecodingFormat> valueDecodingFormat, + int[] keyProjection, + int[] valueProjection, + @Nullable String keyPrefix, + @Nullable List topics, + @Nullable Pattern topicPattern, + Properties properties, + StartupMode startupMode, + Map specificStartupOffsets, + long startupTimestampMillis, + BoundedMode boundedMode, + Map specificEndOffsets, + long endTimestampMillis, + String tableIdentifier) { + return new KafkaDynamicSource( + physicalDataType, + keyDecodingFormat, + valueDecodingFormat, + keyProjection, + valueProjection, + keyPrefix, + topics, + topicPattern, + properties, + startupMode, + specificStartupOffsets, + startupTimestampMillis, + boundedMode, + specificEndOffsets, + endTimestampMillis, + false, + tableIdentifier); + } + + protected KafkaDynamicSink createKafkaTableSink( + DataType physicalDataType, + @Nullable EncodingFormat> keyEncodingFormat, + EncodingFormat> valueEncodingFormat, + int[] keyProjection, + int[] valueProjection, + @Nullable String keyPrefix, + String topic, + Properties properties, + FlinkKafkaPartitioner partitioner, + DeliveryGuarantee deliveryGuarantee, + Integer parallelism, + @Nullable String transactionalIdPrefix) { + return new KafkaDynamicSink( + physicalDataType, + physicalDataType, + keyEncodingFormat, + valueEncodingFormat, + keyProjection, + valueProjection, + keyPrefix, + topic, + properties, + partitioner, + deliveryGuarantee, + false, + 
SinkBufferFlushMode.DISABLED, + parallelism, + transactionalIdPrefix); + } +} diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java new file mode 100644 index 0000000..cebe27f --- /dev/null +++ b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; +import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getBoundedOptions; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateScanBoundedMode; + +/** Upsert-Kafka factory. 
*/ +public class UpsertKafkaDynamicTableFactory + implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + public static final String IDENTIFIER = "upsert-kafka"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + final Set> options = new HashSet<>(); + options.add(PROPS_BOOTSTRAP_SERVERS); + options.add(TOPIC); + options.add(KEY_FORMAT); + options.add(VALUE_FORMAT); + return options; + } + + @Override + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + options.add(KEY_FIELDS_PREFIX); + options.add(VALUE_FIELDS_INCLUDE); + options.add(SINK_PARALLELISM); + options.add(SINK_BUFFER_FLUSH_INTERVAL); + options.add(SINK_BUFFER_FLUSH_MAX_ROWS); + options.add(SCAN_BOUNDED_MODE); + options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); + options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); + options.add(DELIVERY_GUARANTEE); + options.add(TRANSACTIONAL_ID_PREFIX); + return options; + } + + @Override + public Set> forwardOptions() { + return Stream.of(DELIVERY_GUARANTEE, TRANSACTIONAL_ID_PREFIX).collect(Collectors.toSet()); + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + + ReadableConfig tableOptions = helper.getOptions(); + DecodingFormat> keyDecodingFormat = + helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT); + DecodingFormat> valueDecodingFormat = + helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT); + + // Validate the option data type. + helper.validateExcept(PROPERTIES_PREFIX); + validateSource( + tableOptions, + keyDecodingFormat, + valueDecodingFormat, + context.getPrimaryKeyIndexes()); + + Tuple2 keyValueProjections = + createKeyValueProjections(context.getCatalogTable()); + String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + Properties properties = getKafkaProperties(context.getCatalogTable().getOptions()); + // always use earliest to keep data integrity + StartupMode earliest = StartupMode.EARLIEST; + + final BoundedOptions boundedOptions = getBoundedOptions(tableOptions); + + return new KafkaDynamicSource( + context.getPhysicalRowDataType(), + keyDecodingFormat, + new DecodingFormatWrapper(valueDecodingFormat), + keyValueProjections.f0, + keyValueProjections.f1, + keyPrefix, + getSourceTopics(tableOptions), + getSourceTopicPattern(tableOptions), + properties, + earliest, + Collections.emptyMap(), + 0, + boundedOptions.boundedMode, + boundedOptions.specificOffsets, + boundedOptions.boundedTimestampMillis, + true, + context.getObjectIdentifier().asSummaryString()); + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper( + this, autoCompleteSchemaRegistrySubject(context)); + + final ReadableConfig tableOptions = helper.getOptions(); + + EncodingFormat> keyEncodingFormat = + helper.discoverEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT); + EncodingFormat> valueEncodingFormat = + helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT); + + // Validate the option data type. 
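For reference, a table backed by this factory is typically declared through the Table API; the snippet below is a minimal sketch only (topic, column, and broker names are made up) and is not part of the patched sources. It shows the options that requiredOptions() and validatePKConstraints enforce: a PRIMARY KEY plus 'key.format', 'value.format', and 'properties.bootstrap.servers'.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaExampleSketch {
    public static void main(String[] args) {
        // Streaming-mode table environment; broker and topic names are placeholders.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'upsert-kafka' derives the key columns from the PRIMARY KEY
        // (see createKeyValueProjections below) and always reads from EARLIEST.
        tEnv.executeSql(
                "CREATE TABLE user_scores ("
                        + "  user_id STRING,"
                        + "  score BIGINT,"
                        + "  PRIMARY KEY (user_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'upsert-kafka',"
                        + "  'topic' = 'user-scores',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'key.format' = 'json',"
                        + "  'value.format' = 'json'"
                        + ")");
    }
}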
+ helper.validateExcept(PROPERTIES_PREFIX); + validateSink( + tableOptions, + keyEncodingFormat, + valueEncodingFormat, + context.getPrimaryKeyIndexes()); + KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions); + + Tuple2 keyValueProjections = + createKeyValueProjections(context.getCatalogTable()); + final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions()); + + Integer parallelism = tableOptions.get(SINK_PARALLELISM); + + int batchSize = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS); + Duration batchInterval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL); + SinkBufferFlushMode flushMode = + new SinkBufferFlushMode(batchSize, batchInterval.toMillis()); + + // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}. + // it will use hash partition if key is set else in round-robin behaviour. + return new KafkaDynamicSink( + context.getPhysicalRowDataType(), + context.getPhysicalRowDataType(), + keyEncodingFormat, + new EncodingFormatWrapper(valueEncodingFormat), + keyValueProjections.f0, + keyValueProjections.f1, + keyPrefix, + tableOptions.get(TOPIC).get(0), + properties, + null, + tableOptions.get(DELIVERY_GUARANTEE), + true, + flushMode, + parallelism, + tableOptions.get(TRANSACTIONAL_ID_PREFIX)); + } + + private Tuple2 createKeyValueProjections(ResolvedCatalogTable catalogTable) { + ResolvedSchema schema = catalogTable.getResolvedSchema(); + // primary key should validated earlier + List keyFields = schema.getPrimaryKey().get().getColumns(); + DataType physicalDataType = schema.toPhysicalRowDataType(); + + Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions()); + // upsert-kafka will set key.fields to primary key fields by default + tableOptions.set(KEY_FIELDS, keyFields); + + int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); + int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); + + return Tuple2.of(keyProjection, valueProjection); + } + + // -------------------------------------------------------------------------------------------- + // Validation + // -------------------------------------------------------------------------------------------- + + private static void validateSource( + ReadableConfig tableOptions, + Format keyFormat, + Format valueFormat, + int[] primaryKeyIndexes) { + validateTopic(tableOptions); + validateScanBoundedMode(tableOptions); + validateFormat(keyFormat, valueFormat, tableOptions); + validatePKConstraints(primaryKeyIndexes); + } + + private static void validateSink( + ReadableConfig tableOptions, + Format keyFormat, + Format valueFormat, + int[] primaryKeyIndexes) { + validateTopic(tableOptions); + validateFormat(keyFormat, valueFormat, tableOptions); + validatePKConstraints(primaryKeyIndexes); + validateSinkBufferFlush(tableOptions); + } + + private static void validateTopic(ReadableConfig tableOptions) { + List topic = tableOptions.get(TOPIC); + if (topic.size() > 1) { + throw new ValidationException( + "The 'upsert-kafka' connector doesn't support topic list now. 
" + + "Please use single topic as the value of the parameter 'topic'."); + } + } + + private static void validateFormat( + Format keyFormat, Format valueFormat, ReadableConfig tableOptions) { + if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) { + String identifier = tableOptions.get(KEY_FORMAT); + throw new ValidationException( + String.format( + "'upsert-kafka' connector doesn't support '%s' as key format, " + + "because '%s' is not in insert-only mode.", + identifier, identifier)); + } + if (!valueFormat.getChangelogMode().containsOnly(RowKind.INSERT)) { + String identifier = tableOptions.get(VALUE_FORMAT); + throw new ValidationException( + String.format( + "'upsert-kafka' connector doesn't support '%s' as value format, " + + "because '%s' is not in insert-only mode.", + identifier, identifier)); + } + } + + private static void validatePKConstraints(int[] schema) { + if (schema.length == 0) { + throw new ValidationException( + "'upsert-kafka' tables require to define a PRIMARY KEY constraint. " + + "The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. " + + "The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys."); + } + } + + private static void validateSinkBufferFlush(ReadableConfig tableOptions) { + int flushMaxRows = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS); + long flushIntervalMs = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis(); + if (flushMaxRows > 0 && flushIntervalMs > 0) { + // flush is enabled + return; + } + if (flushMaxRows <= 0 && flushIntervalMs <= 0) { + // flush is disabled + return; + } + // one of them is set which is not allowed + throw new ValidationException( + String.format( + "'%s' and '%s' must be set to be greater than zero together to enable sink buffer flushing.", + SINK_BUFFER_FLUSH_MAX_ROWS.key(), SINK_BUFFER_FLUSH_INTERVAL.key())); + } + + // -------------------------------------------------------------------------------------------- + // Format wrapper + // -------------------------------------------------------------------------------------------- + + /** + * It is used to wrap the decoding format and expose the desired changelog mode. It's only works + * for insert-only format. + */ + protected static class DecodingFormatWrapper + implements DecodingFormat> { + private final DecodingFormat> innerDecodingFormat; + + private static final ChangelogMode SOURCE_CHANGELOG_MODE = + ChangelogMode.newBuilder() + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + + public DecodingFormatWrapper( + DecodingFormat> innerDecodingFormat) { + this.innerDecodingFormat = innerDecodingFormat; + } + + @Override + public DeserializationSchema createRuntimeDecoder( + DynamicTableSource.Context context, DataType producedDataType) { + return innerDecodingFormat.createRuntimeDecoder(context, producedDataType); + } + + @Override + public ChangelogMode getChangelogMode() { + return SOURCE_CHANGELOG_MODE; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + DecodingFormatWrapper that = (DecodingFormatWrapper) obj; + return Objects.equals(innerDecodingFormat, that.innerDecodingFormat); + } + + @Override + public int hashCode() { + return Objects.hash(innerDecodingFormat); + } + } + + /** + * It is used to wrap the encoding format and expose the desired changelog mode. 
It's only works + * for insert-only format. + */ + protected static class EncodingFormatWrapper + implements EncodingFormat> { + private final EncodingFormat> innerEncodingFormat; + + public static final ChangelogMode SINK_CHANGELOG_MODE = + ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + + public EncodingFormatWrapper( + EncodingFormat> innerEncodingFormat) { + this.innerEncodingFormat = innerEncodingFormat; + } + + @Override + public SerializationSchema createRuntimeEncoder( + DynamicTableSink.Context context, DataType consumedDataType) { + return innerEncodingFormat.createRuntimeEncoder(context, consumedDataType); + } + + @Override + public ChangelogMode getChangelogMode() { + return SINK_CHANGELOG_MODE; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + EncodingFormatWrapper that = (EncodingFormatWrapper) obj; + return Objects.equals(innerEncodingFormat, that.innerEncodingFormat); + } + + @Override + public int hashCode() { + return Objects.hash(innerEncodingFormat); + } + } +} diff --git a/pom.xml b/pom.xml index 84164b2..76aed3e 100644 --- a/pom.xml +++ b/pom.xml @@ -178,6 +178,7 @@
com/mycila/maven/plugin/license/templates/APACHE-2.txt
+ src/main/java/org/apache/flink/** src/test/resources/** m2e-target/** bin/** From e855980ecf82f8f7112c0062ad01db52450177a8 Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Fri, 14 Mar 2025 19:39:22 +0100 Subject: [PATCH 2/2] Add deserialization failure handler implementation --- .../kafka/table/DeserFailureHandler.java | 88 +++++++++++++++++++ .../table/DeserFailureHandlerOptions.java | 63 +++++++++++++ .../kafka/table/DeserFailureHandlerType.java | 41 +++++++++ .../kafka/table/DeserFailureProducer.java | 83 +++++++++++++++++ .../DynamicKafkaDeserializationSchema.java | 15 +++- .../kafka/table/KafkaDynamicSource.java | 12 ++- .../kafka/table/KafkaDynamicTableFactory.java | 16 +++- .../table/UpsertKafkaDynamicTableFactory.java | 9 +- 8 files changed, 316 insertions(+), 11 deletions(-) create mode 100644 flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandler.java create mode 100644 flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandlerOptions.java create mode 100644 flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandlerType.java create mode 100644 flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureProducer.java diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandler.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandler.java new file mode 100644 index 0000000..96e95d5 --- /dev/null +++ b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.Serializable; +import java.util.Properties; + +import static org.apache.flink.streaming.connectors.kafka.table.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_HANDLER; +import static org.apache.flink.streaming.connectors.kafka.table.DeserFailureHandlerOptions.SCAN_DESER_FAILURE_TOPIC; + +public class DeserFailureHandler { + + private static final Logger LOG = LoggerFactory.getLogger(DeserFailureHandler.class); + + private final DeserFailureHandlerType handlerType; + private final @Nullable DeserFailureProducer producer; + + DeserFailureHandler(DeserFailureHandlerType handlerType, @Nullable DeserFailureProducer producer) { + this.handlerType = handlerType; + this.producer = producer; + } + + static DeserFailureHandler of(ReadableConfig tableOptions, Properties consumerProps) { + DeserFailureHandlerType handlerType = tableOptions.get(SCAN_DESER_FAILURE_HANDLER); + + DeserFailureProducer producer = + handlerType == DeserFailureHandlerType.KAFKA + ? new DeserFailureProducer(tableOptions.get(SCAN_DESER_FAILURE_TOPIC), consumerProps) + : null; + + return new DeserFailureHandler(handlerType, producer); + } + + void deserWithFailureHandling(ConsumerRecord record, DeserializationCaller deser) + throws IOException { + + try { + deser.call(); + } catch (IOException e) { + if (DeserFailureHandlerType.NONE == handlerType) { + throw e; + + } else if (DeserFailureHandlerType.LOG == handlerType) { + LOG.info( + "Deserialization failure occurred for record. Topic: {}, Partition: {}, Offset: {}", + record.topic(), + record.partition(), + record.offset()); + + } else if (DeserFailureHandlerType.KAFKA == handlerType) { + LOG.info( + "Deserialization failure occurred for record, sending it to the configured topic ({}). Topic: {}, Partition: {}, Offset: {}", + producer.getTopic(), + record.topic(), + record.partition(), + record.offset()); + producer.send(record); + } + } + } + + interface DeserializationCaller extends Serializable { + void call() throws IOException; + } +} diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandlerOptions.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandlerOptions.java new file mode 100644 index 0000000..cb5896c --- /dev/null +++ b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandlerOptions.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
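To make the handler's contract concrete: only IOException thrown by the wrapped call is handled; NONE rethrows it, while LOG and KAFKA log and skip the record (KAFKA additionally forwards it). The snippet below is a hypothetical same-package sketch using the package-private constructor directly; it is not shipped with this patch, and the record contents are made up.

package org.apache.flink.streaming.connectors.kafka.table;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;

/** Hypothetical same-package sketch of the handler semantics; not part of the patch. */
class DeserFailureHandlerSemanticsSketch {

    static void demo() throws IOException {
        // A made-up record standing in for a payload the configured format cannot decode.
        ConsumerRecord<byte[], byte[]> broken =
                new ConsumerRecord<>("orders", 0, 42L, new byte[0], new byte[] {0x00});

        // LOG: the IOException from the deserializer is caught and logged; the row is skipped.
        DeserFailureHandler logging = new DeserFailureHandler(DeserFailureHandlerType.LOG, null);
        logging.deserWithFailureHandling(broken, () -> {
            throw new IOException("simulated deserialization failure");
        });

        // NONE (the default): the exception propagates and fails the job, as before this patch.
        DeserFailureHandler failing = new DeserFailureHandler(DeserFailureHandlerType.NONE, null);
        failing.deserWithFailureHandling(broken, () -> {
            throw new IOException("simulated deserialization failure");
        });
    }
}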
+ */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.util.StringUtils; + +public class DeserFailureHandlerOptions { + + public static final ConfigOption SCAN_DESER_FAILURE_HANDLER = + ConfigOptions.key("scan.deser-failure.handler") + .enumType(DeserFailureHandlerType.class) + .defaultValue(DeserFailureHandlerType.NONE); + + public static final ConfigOption SCAN_DESER_FAILURE_TOPIC = + ConfigOptions.key("scan.deser-failure.topic") + .stringType() + .noDefaultValue(); + + public static void validateDeserFailureHandlerOptions(ReadableConfig tableOptions) { + var handler = tableOptions.get(SCAN_DESER_FAILURE_HANDLER); + var topic = tableOptions.get(SCAN_DESER_FAILURE_TOPIC); + + if (handler == DeserFailureHandlerType.KAFKA && StringUtils.isNullOrWhitespaceOnly(topic)) { + throw new ValidationException( + String.format( + "'%s' is set to '%s', but '%s' is not specified.", + SCAN_DESER_FAILURE_HANDLER.key(), + DeserFailureHandlerType.KAFKA, + SCAN_DESER_FAILURE_TOPIC.key())); + } + + if (handler != DeserFailureHandlerType.KAFKA && !StringUtils.isNullOrWhitespaceOnly(topic)) { + throw new ValidationException( + String.format( + "'%s' is not set to '%s', but '%s' is specified.", + SCAN_DESER_FAILURE_HANDLER.key(), + DeserFailureHandlerType.KAFKA, + SCAN_DESER_FAILURE_TOPIC.key())); + } + } + + private DeserFailureHandlerOptions() {} +} diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandlerType.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandlerType.java new file mode 100644 index 0000000..de819c0 --- /dev/null +++ b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureHandlerType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.table; + +public enum DeserFailureHandlerType { + /** + * No deserialization failure handling is applied. + * In case of a problematic record, the application will fail. + * This is the default setting. + */ + NONE, + + /** + * In case of a problematic record, helpful information will be logged. + * The application continues the execution. + */ + LOG, + + /** + * In case of a problematic record, helpful information will be logged, + * and the record will be sent to a configured Kafka topic as well. + * The application continues the execution. 
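In SQL, these options go into the table's WITH clause; the sketch below is illustrative only (broker, topic, and column names are placeholders) and shows the 'kafka' handler together with its mandatory dead-letter topic, the combination that validateDeserFailureHandlerOptions above enforces.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DeserFailureOptionsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'scan.deser-failure.handler' = 'kafka' requires 'scan.deser-failure.topic';
        // with 'log' or the default 'none' the topic option must be left unset.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  order_id STRING,"
                        + "  amount DOUBLE"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'orders',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'format' = 'json',"
                        + "  'scan.startup.mode' = 'earliest-offset',"
                        + "  'scan.deser-failure.handler' = 'kafka',"
                        + "  'scan.deser-failure.topic' = 'orders-deser-failures'"
                        + ")");
    }
}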
+ */ + KAFKA +} diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureProducer.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureProducer.java new file mode 100644 index 0000000..e19013a --- /dev/null +++ b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DeserFailureProducer.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.table; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +class DeserFailureProducer implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(DeserFailureProducer.class); + + private final String topic; + private final Properties producerProps; + + private transient KafkaProducer kafkaProducer; + + DeserFailureProducer(String topic, Properties consumerProps) { + this.topic = checkNotNull(topic); + + producerProps = new Properties(); + producerProps.putAll(consumerProps); + producerProps.setProperty( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + producerProps.setProperty( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + } + + private void init() { + if (kafkaProducer == null) { + LOG.debug("Initializing deserialization failure producer."); + kafkaProducer = new KafkaProducer<>(producerProps); + } + } + + void send(ConsumerRecord record) { + init(); + + if (record == null) { + LOG.info("Unable to send deserialization failed record: Record was null."); + } else if (kafkaProducer == null) { + LOG.warn("Unable to send deserialization failed record: Kafka producer is not initialized."); + } else { + kafkaProducer.send( + new ProducerRecord<>( + topic, + null, + null, + record.key(), + record.value(), + record.headers())); + } + } + + public String getTopic() { + return topic; + } +} diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java index ba26a1e..62c2e71 100644 --- 
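Because the producer forwards the failed record's key, value, and headers verbatim (leaving partition and timestamp unset), the failure topic can be inspected with a plain Kafka consumer. The snippet below is an illustrative sketch reusing the placeholder broker and topic names from the DDL sketch above.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class DeserFailureTopicInspectorSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "deser-failure-inspector");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders-deser-failures"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Key, value, and headers are the untouched bytes of the original source record.
                System.out.printf(
                        "key=%s valueBytes=%d headers=%s%n",
                        record.key() == null ? null : new String(record.key()),
                        record.value() == null ? 0 : record.value().length,
                        record.headers());
            }
        }
    }
}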
a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java +++ b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java @@ -56,6 +56,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema keyDeserialization, @@ -65,7 +67,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema producedTypeInfo, - boolean upsertMode) { + boolean upsertMode, + DeserFailureHandler deserFailureHandler) { if (upsertMode) { Preconditions.checkArgument( keyDeserialization != null && keyProjection.length > 0, @@ -84,6 +87,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema record, Collector valueDeserialization.deserialize(record.value(), collector)); return; } // buffer key(s) if (keyDeserialization != null) { - keyDeserialization.deserialize(record.key(), keyCollector); + deserFailureHandler.deserWithFailureHandling( + record, () -> keyDeserialization.deserialize(record.key(), keyCollector)); } // project output while emitting values @@ -127,7 +133,8 @@ public void deserialize(ConsumerRecord record, Collector valueDeserialization.deserialize(record.value(), outputCollector)); } keyCollector.buffer.clear(); } diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java index 5123064..a06255f 100644 --- a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java +++ b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java @@ -171,6 +171,8 @@ public class KafkaDynamicSource protected final String tableIdentifier; + protected final DeserFailureHandler deserFailureHandler; + public KafkaDynamicSource( DataType physicalDataType, @Nullable DecodingFormat> keyDecodingFormat, @@ -188,7 +190,8 @@ public KafkaDynamicSource( Map specificBoundedOffsets, long boundedTimestampMillis, boolean upsertMode, - String tableIdentifier) { + String tableIdentifier, + DeserFailureHandler deserFailureHandler) { // Format attributes this.physicalDataType = Preconditions.checkNotNull( @@ -228,6 +231,7 @@ public KafkaDynamicSource( this.boundedTimestampMillis = boundedTimestampMillis; this.upsertMode = upsertMode; this.tableIdentifier = tableIdentifier; + this.deserFailureHandler = deserFailureHandler; } @Override @@ -344,7 +348,8 @@ public DynamicTableSource copy() { specificBoundedOffsets, boundedTimestampMillis, upsertMode, - tableIdentifier); + tableIdentifier, + deserFailureHandler); copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; @@ -550,7 +555,8 @@ private KafkaDeserializationSchema createKafkaDeserializationSchema( hasMetadata, metadataConverters, producedTypeInfo, - upsertMode); + upsertMode, + deserFailureHandler); } private @Nullable DeserializationSchema createDeserialization( diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java index fe1905e..e3910df 100644 --- a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java +++ 
b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java @@ -65,6 +65,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.flink.streaming.connectors.kafka.table.DeserFailureHandlerOptions.*; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; @@ -152,6 +153,8 @@ public Set> optionalOptions() { options.add(SCAN_BOUNDED_MODE); options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); + options.add(SCAN_DESER_FAILURE_HANDLER); + options.add(SCAN_DESER_FAILURE_TOPIC); return options; } @@ -194,6 +197,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { context.getCatalogTable().getOptions(), valueDecodingFormat); + validateDeserFailureHandlerOptions(tableOptions); + final StartupOptions startupOptions = getStartupOptions(tableOptions); final BoundedOptions boundedOptions = getBoundedOptions(tableOptions); @@ -215,6 +220,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + final DeserFailureHandler deserFailureHandler = DeserFailureHandler.of(tableOptions, properties); + return createKafkaTableSource( physicalDataType, keyDecodingFormat.orElse(null), @@ -231,7 +238,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.boundedMode, boundedOptions.specificOffsets, boundedOptions.boundedTimestampMillis, - context.getObjectIdentifier().asSummaryString()); + context.getObjectIdentifier().asSummaryString(), + deserFailureHandler); } @Override @@ -395,7 +403,8 @@ protected KafkaDynamicSource createKafkaTableSource( BoundedMode boundedMode, Map specificEndOffsets, long endTimestampMillis, - String tableIdentifier) { + String tableIdentifier, + DeserFailureHandler deserFailureHandler) { return new KafkaDynamicSource( physicalDataType, keyDecodingFormat, @@ -413,7 +422,8 @@ protected KafkaDynamicSource createKafkaTableSource( specificEndOffsets, endTimestampMillis, false, - tableIdentifier); + tableIdentifier, + deserFailureHandler); } protected KafkaDynamicSink createKafkaTableSink( diff --git a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java index cebe27f..86683f0 100644 --- a/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java +++ b/flink-jar-runner/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java @@ -54,6 +54,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.flink.streaming.connectors.kafka.table.DeserFailureHandlerOptions.*; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; @@ -111,6 +112,8 @@ public Set> optionalOptions() { 
options.add(SCAN_BOUNDED_MODE); options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); + options.add(SCAN_DESER_FAILURE_HANDLER); + options.add(SCAN_DESER_FAILURE_TOPIC); options.add(DELIVERY_GUARANTEE); options.add(TRANSACTIONAL_ID_PREFIX); return options; @@ -148,6 +151,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { final BoundedOptions boundedOptions = getBoundedOptions(tableOptions); + final DeserFailureHandler deserFailureHandler = DeserFailureHandler.of(tableOptions, properties); + return new KafkaDynamicSource( context.getPhysicalRowDataType(), keyDecodingFormat, @@ -165,7 +170,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.specificOffsets, boundedOptions.boundedTimestampMillis, true, - context.getObjectIdentifier().asSummaryString()); + context.getObjectIdentifier().asSummaryString(), + deserFailureHandler); } @Override @@ -251,6 +257,7 @@ private static void validateSource( validateScanBoundedMode(tableOptions); validateFormat(keyFormat, valueFormat, tableOptions); validatePKConstraints(primaryKeyIndexes); + validateDeserFailureHandlerOptions(tableOptions); } private static void validateSink(