
Commit fb2711f (parent: 1544884)

Add required "flink-connector-kafka" classes locally without any modification.
Source state: https://github.com/apache/flink-connector-kafka/tree/3.2.0

The classes are copied verbatim from the upstream 3.2.0 tag; the maven-shade-plugin filter added below then drops the upstream copies of the same classes from the shaded jar, so the local versions are the ones that get packaged.

File tree: 6 files changed, +1843 −0 lines


flink-jar-runner/pom.xml

Lines changed: 15 additions & 0 deletions
@@ -30,6 +30,12 @@
   <inceptionYear>2024</inceptionYear>
 
   <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-base</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-streaming-java</artifactId>
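
Note on the provided scope: presumably the copied connector classes need flink-connector-base at compile time, while at runtime the Flink distribution on the cluster supplies it, so the provided scope keeps it out of the shaded jar.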
@@ -251,6 +257,15 @@
           </excludes>
         </artifactSet>
         <filters>
+          <filter>
+            <artifact>org.apache.flink:flink-connector-kafka</artifact>
+            <excludes>
+              <exclude>org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema**</exclude>
+              <exclude>org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource**</exclude>
+              <exclude>org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory**</exclude>
+              <exclude>org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory**</exclude>
+            </excludes>
+          </filter>
           <filter>
             <!-- Do not copy the signatures in the META-INF folder.
                  Otherwise, this might cause SecurityExceptions when using the JAR. -->
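
The ** suffix on each exclude presumably also catches the inner classes of the upstream copies (e.g. DynamicKafkaDeserializationSchema$OutputProjectionCollector.class), so only the locally built versions remain in the shaded jar. A minimal sketch of how one could verify at runtime which copy was packaged; ShadingCheck is a hypothetical helper, not part of this commit:

    import java.net.URL;

    public class ShadingCheck {
        public static void main(String[] args) throws Exception {
            // Load the (package-private) class reflectively and report
            // which jar the JVM actually loaded it from.
            Class<?> clazz = Class.forName(
                    "org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema");
            URL source = clazz.getProtectionDomain().getCodeSource().getLocation();
            System.out.println(source); // should point at the shaded flink-jar-runner jar
        }
    }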
org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java (new file)

Lines changed: 273 additions & 0 deletions
@@ -0,0 +1,273 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka.table;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/** A specific {@link KafkaDeserializationSchema} for {@link KafkaDynamicSource}. */
class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {

    private static final long serialVersionUID = 1L;

    private final @Nullable DeserializationSchema<RowData> keyDeserialization;

    private final DeserializationSchema<RowData> valueDeserialization;

    private final boolean hasMetadata;

    private final BufferingCollector keyCollector;

    private final OutputProjectionCollector outputCollector;

    private final TypeInformation<RowData> producedTypeInfo;

    private final boolean upsertMode;

    DynamicKafkaDeserializationSchema(
            int physicalArity,
            @Nullable DeserializationSchema<RowData> keyDeserialization,
            int[] keyProjection,
            DeserializationSchema<RowData> valueDeserialization,
            int[] valueProjection,
            boolean hasMetadata,
            MetadataConverter[] metadataConverters,
            TypeInformation<RowData> producedTypeInfo,
            boolean upsertMode) {
        if (upsertMode) {
            Preconditions.checkArgument(
                    keyDeserialization != null && keyProjection.length > 0,
                    "Key must be set in upsert mode for deserialization schema.");
        }
        this.keyDeserialization = keyDeserialization;
        this.valueDeserialization = valueDeserialization;
        this.hasMetadata = hasMetadata;
        this.keyCollector = new BufferingCollector();
        this.outputCollector =
                new OutputProjectionCollector(
                        physicalArity,
                        keyProjection,
                        valueProjection,
                        metadataConverters,
                        upsertMode);
        this.producedTypeInfo = producedTypeInfo;
        this.upsertMode = upsertMode;
    }

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        if (keyDeserialization != null) {
            keyDeserialization.open(context);
        }
        valueDeserialization.open(context);
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        throw new IllegalStateException("A collector is required for deserializing.");
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector)
            throws Exception {
        // shortcut in case no output projection is required,
        // also not for a cartesian product with the keys
        if (keyDeserialization == null && !hasMetadata) {
            valueDeserialization.deserialize(record.value(), collector);
            return;
        }

        // buffer key(s)
        if (keyDeserialization != null) {
            keyDeserialization.deserialize(record.key(), keyCollector);
        }

        // project output while emitting values
        outputCollector.inputRecord = record;
        outputCollector.physicalKeyRows = keyCollector.buffer;
        outputCollector.outputCollector = collector;
        if (record.value() == null && upsertMode) {
            // collect tombstone messages in upsert mode by hand
            outputCollector.collect(null);
        } else {
            valueDeserialization.deserialize(record.value(), outputCollector);
        }
        keyCollector.buffer.clear();
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return producedTypeInfo;
    }

    // --------------------------------------------------------------------------------------------

    interface MetadataConverter extends Serializable {
        Object read(ConsumerRecord<?, ?> record);
    }

    // --------------------------------------------------------------------------------------------

    private static final class BufferingCollector implements Collector<RowData>, Serializable {

        private static final long serialVersionUID = 1L;

        private final List<RowData> buffer = new ArrayList<>();

        @Override
        public void collect(RowData record) {
            buffer.add(record);
        }

        @Override
        public void close() {
            // nothing to do
        }
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Emits a row with key, value, and metadata fields.
     *
     * <p>The collector is able to handle the following kinds of keys:
     *
     * <ul>
     *   <li>No key is used.
     *   <li>A key is used.
     *   <li>The deserialization schema emits multiple keys.
     *   <li>Keys and values have overlapping fields.
     *   <li>Keys are used and value is null.
     * </ul>
     */
    private static final class OutputProjectionCollector
            implements Collector<RowData>, Serializable {

        private static final long serialVersionUID = 1L;

        private final int physicalArity;

        private final int[] keyProjection;

        private final int[] valueProjection;

        private final MetadataConverter[] metadataConverters;

        private final boolean upsertMode;

        private transient ConsumerRecord<?, ?> inputRecord;

        private transient List<RowData> physicalKeyRows;

        private transient Collector<RowData> outputCollector;

        OutputProjectionCollector(
                int physicalArity,
                int[] keyProjection,
                int[] valueProjection,
                MetadataConverter[] metadataConverters,
                boolean upsertMode) {
            this.physicalArity = physicalArity;
            this.keyProjection = keyProjection;
            this.valueProjection = valueProjection;
            this.metadataConverters = metadataConverters;
            this.upsertMode = upsertMode;
        }

        @Override
        public void collect(RowData physicalValueRow) {
            // no key defined
            if (keyProjection.length == 0) {
                emitRow(null, (GenericRowData) physicalValueRow);
                return;
            }

            // otherwise emit a value for each key
            for (RowData physicalKeyRow : physicalKeyRows) {
                emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
            }
        }

        @Override
        public void close() {
            // nothing to do
        }

        private void emitRow(
                @Nullable GenericRowData physicalKeyRow,
                @Nullable GenericRowData physicalValueRow) {
            final RowKind rowKind;
            if (physicalValueRow == null) {
                if (upsertMode) {
                    rowKind = RowKind.DELETE;
                } else {
                    throw new DeserializationException(
                            "Invalid null value received in non-upsert mode. Could not set row kind for output record.");
                }
            } else {
                rowKind = physicalValueRow.getRowKind();
            }

            final int metadataArity = metadataConverters.length;
            final GenericRowData producedRow =
                    new GenericRowData(rowKind, physicalArity + metadataArity);

            for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
                assert physicalKeyRow != null;
                producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos));
            }

            if (physicalValueRow != null) {
                for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
                    producedRow.setField(
                            valueProjection[valuePos], physicalValueRow.getField(valuePos));
                }
            }

            for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
                producedRow.setField(
                        physicalArity + metadataPos,
                        metadataConverters[metadataPos].read(inputRecord));
            }

            outputCollector.collect(producedRow);
        }
    }
}
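
For orientation, here is a small, self-contained sketch (not part of the commit) of the row layout that OutputProjectionCollector#emitRow produces, assuming one key field projected to position 0, two value fields projected to positions 1 and 2, and a single metadata column appended after the physical fields; all concrete values are invented for illustration:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.RowKind;

    public class ProjectionExample {
        public static void main(String[] args) {
            // Rows as the key and value deserialization schemas would emit them.
            GenericRowData keyRow = GenericRowData.of(StringData.fromString("user-1"));
            GenericRowData valueRow = GenericRowData.of(StringData.fromString("click"), 42L);

            // physicalArity = 3 (1 key field + 2 value fields), metadataArity = 1.
            GenericRowData producedRow = new GenericRowData(RowKind.INSERT, 3 + 1);
            producedRow.setField(0, keyRow.getField(0));   // keyProjection = {0}
            producedRow.setField(1, valueRow.getField(0)); // valueProjection = {1, 2}
            producedRow.setField(2, valueRow.getField(1));
            producedRow.setField(3, 1234L);                // metadata column, e.g. record offset

            System.out.println(producedRow);
        }
    }

In the real class the metadata cell is filled by a MetadataConverter; since that interface has a single read method and extends Serializable, an implementation can be as small as a lambda such as record -> record.offset().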
