Skip to content

Commit dda90a7

Browse files
committed
[FLINK-32609] Support Projection Pushdown
1 parent cb5c5c0 commit dda90a7

File tree

11 files changed

+1984
-122
lines changed

11 files changed

+1984
-122
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.kafka.config;
20+
21+
import org.apache.flink.annotation.Internal;
22+
23+
/**
24+
* Projection pushdown mode for {@link
25+
* org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource}.
26+
*/
27+
@Internal
28+
public enum FormatProjectionPushdownLevel {
29+
30+
/** The format does not support any kind of projection pushdown. */
31+
NONE,
32+
33+
/** The format supports projection pushdown for top-level fields only. */
34+
TOP_LEVEL,
35+
36+
/** The format supports projection pushdown for top-level and nested fields. */
37+
ALL
38+
}
Lines changed: 347 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,347 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.kafka.table;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.serialization.DeserializationSchema;
23+
import org.apache.flink.table.connector.Projection;
24+
import org.apache.flink.table.connector.format.DecodingFormat;
25+
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
26+
import org.apache.flink.table.connector.source.DynamicTableSource.Context;
27+
import org.apache.flink.table.data.GenericRowData;
28+
import org.apache.flink.table.data.RowData;
29+
import org.apache.flink.table.types.DataType;
30+
31+
import javax.annotation.Nullable;
32+
33+
import java.io.Serializable;
34+
import java.util.ArrayList;
35+
import java.util.Arrays;
36+
import java.util.Collections;
37+
import java.util.HashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.Objects;
41+
import java.util.Optional;
42+
import java.util.stream.Collectors;
43+
44+
/**
45+
* Decoding messages consists of two potential steps:
46+
*
47+
* <ol>
48+
* <li>Deserialization i.e deserializing the {@code byte[]} into a {@link RowData}. This process
49+
* is handled by a {@link DeserializationSchema}.
50+
* <li>Projection i.e. projecting any required fields from the deserialized {@link RowData}
51+
* (returned by the {@link DeserializationSchema} in the first step) to their positions in the
52+
* final produced {@link RowData}. This process is handled by a {@link Projector}.
53+
* </ol>
54+
*
55+
* <p>In order to decode messages correctly, the {@link DeserializationSchema} and the {@link
56+
* Projector} need to work together. For example, the {@link Projector} needs to know the positions
57+
* of the required fields in the {@link RowData} returned by the {@link DeserializationSchema} in
58+
* order to be able to correctly set fields in the final produced {@link RowData}.
59+
*
60+
* <p>That's why we have this {@link Decoder} class. This class ensures that the returned {@link
61+
* DeserializationSchema} and {@link Projector} will work together to decode messages correctly.
62+
*/
63+
@Internal
64+
public class Decoder {
65+
66+
/**
67+
* Can be null. Null is used inside {@link DynamicKafkaDeserializationSchema} to avoid
68+
* deserializing keys if not required.
69+
*/
70+
private final @Nullable DeserializationSchema<RowData> deserializationSchema;
71+
72+
/** Mapping of the physical position in the key to the target position in the RowData. */
73+
private final Projector projector;
74+
75+
private Decoder(
76+
final DeserializationSchema<RowData> deserializationSchema, final Projector projector) {
77+
this.deserializationSchema = deserializationSchema;
78+
this.projector = projector;
79+
}
80+
81+
/**
82+
* @param decodingFormat Optional format for decoding bytes.
83+
* @param tableDataType The data type representing the table schema.
84+
* @param dataTypeProjection Indices indicate the position of the field in the dataType
85+
* (key/value). Values indicate the position of the field in the tableSchema.
86+
* @param prefix Optional field prefix
87+
* @param projectedFields Indices indicate the position of the field in the produced Row. Values
88+
* indicate the position of the field in the table schema.
89+
* @param pushProjectionsIntoDecodingFormat if this is true and the format is a {@link
90+
* ProjectableDecodingFormat}, any {@param projectedFields} will be pushed down into the
91+
* {@link ProjectableDecodingFormat}. Otherwise, projections will be applied after
92+
* deserialization.
93+
* @return a {@link Decoder} instance.
94+
*/
95+
public static Decoder create(
96+
final Context context,
97+
final @Nullable DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
98+
final DataType tableDataType,
99+
final int[] dataTypeProjection,
100+
final @Nullable String prefix,
101+
final int[][] projectedFields,
102+
final List<String> metadataKeys,
103+
final boolean pushProjectionsIntoDecodingFormat) {
104+
if (decodingFormat == null) {
105+
return Decoder.noDeserializationOrProjection();
106+
} else {
107+
if (decodingFormat instanceof ProjectableDecodingFormat
108+
&& pushProjectionsIntoDecodingFormat) {
109+
return Decoder.projectInsideDeserializer(
110+
context,
111+
(ProjectableDecodingFormat<DeserializationSchema<RowData>>) decodingFormat,
112+
tableDataType,
113+
dataTypeProjection,
114+
prefix,
115+
projectedFields,
116+
metadataKeys);
117+
} else {
118+
return Decoder.projectAfterDeserializing(
119+
context,
120+
decodingFormat,
121+
tableDataType,
122+
dataTypeProjection,
123+
prefix,
124+
projectedFields,
125+
metadataKeys);
126+
}
127+
}
128+
}
129+
130+
/** @return a {@link DeserializationSchema} or null. */
131+
@Nullable
132+
public DeserializationSchema<RowData> getDeserializationSchema() {
133+
return deserializationSchema;
134+
}
135+
136+
/** @return a {@link Projector}. */
137+
public Projector getProjector() {
138+
return projector;
139+
}
140+
141+
private static Decoder noDeserializationOrProjection() {
142+
return new Decoder(null, new ProjectorImpl(Collections.emptyMap()));
143+
}
144+
145+
private static DataType toDataType(
146+
final DataType tableDataType,
147+
final int[] dataTypeProjection,
148+
final @Nullable String prefix) {
149+
final DataType temp = Projection.of(dataTypeProjection).project(tableDataType);
150+
return Optional.ofNullable(prefix)
151+
.map(s -> TableDataTypeUtils.stripRowPrefix(temp, s))
152+
.orElse(temp);
153+
}
154+
155+
private static Map<Integer, Integer> tableToDeserializedTopLevelPos(
156+
final int[] dataTypeProjection) {
157+
final HashMap<Integer, Integer> tableToDeserializedPos = new HashMap<>();
158+
for (int i = 0; i < dataTypeProjection.length; i++) {
159+
tableToDeserializedPos.put(dataTypeProjection[i], i);
160+
}
161+
return tableToDeserializedPos;
162+
}
163+
164+
private static int[] copyArray(final int[] arr) {
165+
return Arrays.copyOf(arr, arr.length);
166+
}
167+
168+
private static void addMetadataProjections(
169+
final DecodingFormat<?> decodingFormat,
170+
final int deserializedSize,
171+
final int physicalSize,
172+
final List<String> requestedMetadataKeys,
173+
final Map<List<Integer>, Integer> deserializedToProducedPos) {
174+
175+
if (!requestedMetadataKeys.isEmpty()) {
176+
decodingFormat.applyReadableMetadata(requestedMetadataKeys);
177+
178+
// project only requested metadata keys
179+
for (int i = 0; i < requestedMetadataKeys.size(); i++) {
180+
// metadata is always added to the end of the deserialized row by the DecodingFormat
181+
final int deserializedPos = deserializedSize + i;
182+
// we need to always add metadata to the end of the produced row
183+
final int producePos = physicalSize + i;
184+
deserializedToProducedPos.put(
185+
Collections.singletonList(deserializedPos), producePos);
186+
}
187+
}
188+
}
189+
190+
/**
191+
* This method generates a {@link Decoder} which pushes projections down directly into the
192+
* {@link ProjectableDecodingFormat} which takes care of projecting the fields during the
193+
* deserialization process itself.
194+
*/
195+
private static Decoder projectInsideDeserializer(
196+
final Context context,
197+
final ProjectableDecodingFormat<DeserializationSchema<RowData>>
198+
projectableDecodingFormat,
199+
final DataType tableDataType,
200+
final int[] dataTypeProjection,
201+
final @Nullable String prefix,
202+
final int[][] projectedFields,
203+
final List<String> metadataKeys) {
204+
final Map<Integer, Integer> tableToDeserializedTopLevelPos =
205+
tableToDeserializedTopLevelPos(dataTypeProjection);
206+
207+
final List<int[]> deserializerProjectedFields = new ArrayList<>();
208+
final Map<List<Integer>, Integer> deserializedToProducedPos = new HashMap<>();
209+
for (int producedPos = 0; producedPos < projectedFields.length; producedPos++) {
210+
final int[] tablePos = projectedFields[producedPos];
211+
final int tableTopLevelPos = tablePos[0];
212+
213+
final Integer dataTypeTopLevelPos =
214+
tableToDeserializedTopLevelPos.get(tableTopLevelPos);
215+
if (dataTypeTopLevelPos != null) {
216+
final int[] dataTypePos = copyArray(tablePos);
217+
dataTypePos[0] = dataTypeTopLevelPos;
218+
219+
deserializerProjectedFields.add(dataTypePos);
220+
221+
final int deserializedPos = deserializerProjectedFields.size() - 1;
222+
deserializedToProducedPos.put(
223+
Collections.singletonList(deserializedPos), producedPos);
224+
}
225+
}
226+
227+
addMetadataProjections(
228+
projectableDecodingFormat,
229+
deserializerProjectedFields.size(),
230+
projectedFields.length,
231+
metadataKeys,
232+
deserializedToProducedPos);
233+
234+
return new Decoder(
235+
projectableDecodingFormat.createRuntimeDecoder(
236+
context,
237+
toDataType(tableDataType, dataTypeProjection, prefix),
238+
deserializerProjectedFields.toArray(
239+
new int[deserializerProjectedFields.size()][])),
240+
new ProjectorImpl(deserializedToProducedPos));
241+
}
242+
243+
/**
244+
* This method generates a {@link Decoder} which deserializes the data fully using the {@link
245+
* DecodingFormat} and then applies any projections afterward.
246+
*/
247+
private static Decoder projectAfterDeserializing(
248+
final Context context,
249+
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
250+
final DataType tableDataType,
251+
final int[] dataTypeProjection,
252+
final @Nullable String prefix,
253+
final int[][] projectedFields,
254+
final List<String> metadataKeys) {
255+
final DataType dataType = toDataType(tableDataType, dataTypeProjection, prefix);
256+
final Map<Integer, Integer> tableToDeserializedTopLevelPos =
257+
tableToDeserializedTopLevelPos(dataTypeProjection);
258+
259+
final Map<List<Integer>, Integer> deserializedToProducedPos = new HashMap<>();
260+
for (int producedPos = 0; producedPos < projectedFields.length; producedPos++) {
261+
final int[] tablePos = projectedFields[producedPos];
262+
int tableTopLevelPos = tablePos[0];
263+
264+
final Integer deserializedTopLevelPos =
265+
tableToDeserializedTopLevelPos.get(tableTopLevelPos);
266+
if (deserializedTopLevelPos != null) {
267+
final int[] deserializedPos = copyArray(tablePos);
268+
deserializedPos[0] = deserializedTopLevelPos;
269+
270+
deserializedToProducedPos.put(
271+
Collections.unmodifiableList(
272+
Arrays.stream(deserializedPos)
273+
.boxed()
274+
.collect(Collectors.toList())),
275+
producedPos);
276+
}
277+
}
278+
279+
addMetadataProjections(
280+
decodingFormat,
281+
dataTypeProjection.length,
282+
projectedFields.length,
283+
metadataKeys,
284+
deserializedToProducedPos);
285+
286+
return new Decoder(
287+
decodingFormat.createRuntimeDecoder(context, dataType),
288+
new ProjectorImpl(deserializedToProducedPos));
289+
}
290+
291+
/** Projects fields from the deserialized row to their positions in the final produced row. */
292+
@Internal
293+
public interface Projector extends Serializable {
294+
/** Returns true if {@link #project} will not project any fields. */
295+
boolean isEmptyProjection();
296+
297+
/**
298+
* Returns true if deserialized positions are different from the final produced row
299+
* positions.
300+
*/
301+
boolean isProjectionNeeded();
302+
303+
/** Copies fields from the deserialized row to their final positions in the produced row. */
304+
void project(final RowData deserialized, final GenericRowData producedRow);
305+
}
306+
307+
private static class ProjectorImpl implements Projector {
308+
309+
private final Map<List<Integer>, Integer> deserializedToProducedPos;
310+
private final boolean isProjectionNeeded;
311+
312+
ProjectorImpl(final Map<List<Integer>, Integer> deserializedToProducedPos) {
313+
this.deserializedToProducedPos = deserializedToProducedPos;
314+
this.isProjectionNeeded =
315+
!deserializedToProducedPos.entrySet().stream()
316+
.allMatch(
317+
entry -> {
318+
final List<Integer> deserializedPos = entry.getKey();
319+
final List<Integer> producedPos =
320+
Collections.singletonList(entry.getValue());
321+
return Objects.equals(producedPos, deserializedPos);
322+
});
323+
}
324+
325+
@Override
326+
public boolean isEmptyProjection() {
327+
return deserializedToProducedPos.isEmpty();
328+
}
329+
330+
@Override
331+
public boolean isProjectionNeeded() {
332+
return isProjectionNeeded;
333+
}
334+
335+
@Override
336+
public void project(final RowData deserialized, final GenericRowData producedRow) {
337+
this.deserializedToProducedPos.forEach(
338+
(deserializedPos, targetPos) -> {
339+
Object value = deserialized;
340+
for (final Integer i : deserializedPos) {
341+
value = ((GenericRowData) value).getField(i);
342+
}
343+
producedRow.setField(targetPos, value);
344+
});
345+
}
346+
}
347+
}

0 commit comments

Comments
 (0)