
Commit a52f15a

X-czh authored and libenchao committed
[FLINK-33265] Support source parallelism setting for Kafka connector
Close apache/flink-connector-kafka#134
1 parent f4015d1 commit a52f15a
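
For context, the new option lets a table pin the parallelism of its Kafka source operator per table. A minimal usage sketch (table name, topic, schema, and broker address are illustrative, not taken from this commit):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ScanParallelismExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 'scan.parallelism' = '4' pins the Kafka source operator to parallelism 4;
        // leaving it unset falls back to the global default parallelism.
        tEnv.executeSql(
                "CREATE TABLE Orders (\n"
                        + "  order_id STRING,\n"
                        + "  amount DOUBLE\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'json',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'scan.parallelism' = '4'\n"
                        + ")");

        tEnv.executeSql("SELECT * FROM Orders").print();
    }
}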

File tree

10 files changed: +170 -20 lines

docs/content.zh/docs/connectors/table/kafka.md

+8
@@ -342,6 +342,14 @@ CREATE TABLE KafkaTable (
       <td>Duration</td>
       <td>Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。需要显式地设置'scan.topic-partition-discovery.interval'为0才能关闭此功能</td>
     </tr>
+    <tr>
+      <td><h5>scan.parallelism</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>定义 Kafka source 算子的并行度。默认情况下会使用全局默认并行度。</td>
+    </tr>
     <tr>
       <td><h5>sink.partitioner</h5></td>
       <td>可选</td>

docs/content.zh/docs/connectors/table/upsert-kafka.md

+8
@@ -180,6 +180,14 @@ of all available metadata fields.
         </ul>
       </td>
     </tr>
+    <tr>
+      <td><h5>scan.parallelism</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>定义 upsert-kafka source 算子的并行度。默认情况下会使用全局默认并行度。</td>
+    </tr>
     <tr>
       <td><h5>sink.parallelism</h5></td>
       <td>可选</td>

docs/content/docs/connectors/table/kafka.md

+8
@@ -369,6 +369,14 @@ Connector Options
       <td>Duration</td>
       <td>Interval for consumer to discover dynamically created Kafka topics and partitions periodically. To disable this feature, you need to explicitly set the 'scan.topic-partition-discovery.interval' value to 0.</td>
     </tr>
+    <tr>
+      <td><h5>scan.parallelism</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the Kafka source operator. If not set, the global default parallelism is used.</td>
+    </tr>
     <tr>
       <td><h5>sink.partitioner</h5></td>
       <td>optional</td>

docs/content/docs/connectors/table/upsert-kafka.md

+8
@@ -192,6 +192,14 @@ Connector Options
        format which means that key columns appear in the data type for both the key and value format.
       </td>
     </tr>
+    <tr>
+      <td><h5>scan.parallelism</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the upsert-kafka source operator. If not set, the global default parallelism is used.</td>
+    </tr>
     <tr>
       <td><h5>sink.parallelism</h5></td>
       <td>optional</td>

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java

+1
@@ -106,6 +106,7 @@ public class KafkaConnectorOptions {
                                     ValueFieldsStrategy.EXCEPT_KEY))
                     .build());
 
+    public static final ConfigOption<Integer> SCAN_PARALLELISM = FactoryUtil.SOURCE_PARALLELISM;
     public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
 
     // --------------------------------------------------------------------------------------------
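
Note that SCAN_PARALLELISM simply aliases FactoryUtil.SOURCE_PARALLELISM, the connector-agnostic option whose key is 'scan.parallelism'. As a reference point, a declaration of that shape looks roughly like the sketch below (key, type, and missing default match the docs rows above; the class name and description text are illustrative, not copied from FactoryUtil):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class SourceParallelismOptionSketch {
    // Sketch only: key and Integer type match the docs in this commit;
    // no default value, so an unset option reads as null.
    public static final ConfigOption<Integer> SOURCE_PARALLELISM =
            ConfigOptions.key("scan.parallelism")
                    .intType()
                    .noDefaultValue()
                    .withDescription(
                            "Defines the parallelism of the source operator; "
                                    + "if absent, the global default parallelism is used.");
}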

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java

+18 -4
@@ -71,6 +71,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -171,6 +172,9 @@ public class KafkaDynamicSource
 
     protected final String tableIdentifier;
 
+    /** Parallelism of the physical Kafka consumer. * */
+    protected final @Nullable Integer parallelism;
+
     public KafkaDynamicSource(
             DataType physicalDataType,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -188,7 +192,8 @@ public KafkaDynamicSource(
             Map<KafkaTopicPartition, Long> specificBoundedOffsets,
             long boundedTimestampMillis,
             boolean upsertMode,
-            String tableIdentifier) {
+            String tableIdentifier,
+            @Nullable Integer parallelism) {
         // Format attributes
         this.physicalDataType =
                 Preconditions.checkNotNull(
@@ -228,6 +233,7 @@ public KafkaDynamicSource(
         this.boundedTimestampMillis = boundedTimestampMillis;
         this.upsertMode = upsertMode;
         this.tableIdentifier = tableIdentifier;
+        this.parallelism = parallelism;
     }
 
     @Override
@@ -267,6 +273,11 @@ public DataStream<RowData> produceDataStream(
             public boolean isBounded() {
                 return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
             }
+
+            @Override
+            public Optional<Integer> getParallelism() {
+                return Optional.ofNullable(parallelism);
+            }
         };
     }
 
@@ -344,7 +355,8 @@ public DynamicTableSource copy() {
                         specificBoundedOffsets,
                         boundedTimestampMillis,
                         upsertMode,
-                        tableIdentifier);
+                        tableIdentifier,
+                        parallelism);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
@@ -384,7 +396,8 @@ public boolean equals(Object o) {
                 && boundedTimestampMillis == that.boundedTimestampMillis
                 && Objects.equals(upsertMode, that.upsertMode)
                 && Objects.equals(tableIdentifier, that.tableIdentifier)
-                && Objects.equals(watermarkStrategy, that.watermarkStrategy);
+                && Objects.equals(watermarkStrategy, that.watermarkStrategy)
+                && Objects.equals(parallelism, that.parallelism);
     }
 
     @Override
@@ -409,7 +422,8 @@ public int hashCode() {
                 boundedTimestampMillis,
                 upsertMode,
                 tableIdentifier,
-                watermarkStrategy);
+                watermarkStrategy,
+                parallelism);
     }
 
     // --------------------------------------------------------------------------------------------
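
The wiring above is the heart of the feature: the anonymous DataStreamScanProvider returned by getScanRuntimeProvider now reports the configured parallelism, and an empty Optional tells the planner to fall back to its defaults. A self-contained sketch of the same pattern follows (not the commit's code, which builds the stream from a KafkaSource; produceDataStream is stubbed here, and the class name is hypothetical):

import java.util.Optional;

import javax.annotation.Nullable;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

/** Sketch: a scan provider that surfaces a user-configured parallelism to the planner. */
final class ParallelismAwareScanProvider implements DataStreamScanProvider {

    private final @Nullable Integer parallelism; // null means "use the global default"

    ParallelismAwareScanProvider(@Nullable Integer parallelism) {
        this.parallelism = parallelism;
    }

    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
        // A real provider builds the source DataStream here (e.g. from a KafkaSource).
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public boolean isBounded() {
        return false; // unbounded streaming source in this sketch
    }

    @Override
    public Optional<Integer> getParallelism() {
        // The planner applies this to the source transformation when present.
        return Optional.ofNullable(parallelism);
    }
}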

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java

+11 -3
@@ -74,6 +74,7 @@
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
@@ -152,6 +153,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SCAN_BOUNDED_MODE);
         options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
         options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
+        options.add(SCAN_PARALLELISM);
         return options;
     }
 
@@ -166,6 +168,7 @@ public Set<ConfigOption<?>> forwardOptions() {
                         SCAN_STARTUP_SPECIFIC_OFFSETS,
                         SCAN_TOPIC_PARTITION_DISCOVERY,
                         SCAN_STARTUP_TIMESTAMP_MILLIS,
+                        SCAN_PARALLELISM,
                         SINK_PARTITIONER,
                         SINK_PARALLELISM,
                         TRANSACTIONAL_ID_PREFIX)
@@ -215,6 +218,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
         final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
 
+        final Integer parallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null);
+
         return createKafkaTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -231,7 +236,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 boundedOptions.boundedMode,
                 boundedOptions.specificOffsets,
                 boundedOptions.boundedTimestampMillis,
-                context.getObjectIdentifier().asSummaryString());
+                context.getObjectIdentifier().asSummaryString(),
+                parallelism);
     }
 
     @Override
@@ -396,7 +402,8 @@ protected KafkaDynamicSource createKafkaTableSource(
             BoundedMode boundedMode,
             Map<KafkaTopicPartition, Long> specificEndOffsets,
             long endTimestampMillis,
-            String tableIdentifier) {
+            String tableIdentifier,
+            Integer parallelism) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -414,7 +421,8 @@ protected KafkaDynamicSource createKafkaTableSource(
                 specificEndOffsets,
                 endTimestampMillis,
                 false,
-                tableIdentifier);
+                tableIdentifier,
+                parallelism);
     }
 
     protected KafkaDynamicSink createKafkaTableSink(

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java

+6 -1
@@ -62,6 +62,7 @@
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
@@ -115,6 +116,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
         options.add(DELIVERY_GUARANTEE);
         options.add(TRANSACTIONAL_ID_PREFIX);
+        options.add(SCAN_PARALLELISM);
         return options;
     }
 
@@ -150,6 +152,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
         final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);
 
+        Integer parallelism = tableOptions.get(SCAN_PARALLELISM);
+
         return new KafkaDynamicSource(
                 context.getPhysicalRowDataType(),
                 keyDecodingFormat,
@@ -167,7 +171,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 boundedOptions.specificOffsets,
                 boundedOptions.boundedTimestampMillis,
                 true,
-                context.getObjectIdentifier().asSummaryString());
+                context.getObjectIdentifier().asSummaryString(),
+                parallelism);
     }
 
     @Override
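
A small asymmetry worth noting: KafkaDynamicTableFactory reads the option via getOptional(SCAN_PARALLELISM).orElse(null), while this factory uses a plain get(SCAN_PARALLELISM). Since SOURCE_PARALLELISM declares no default value, both return null when 'scan.parallelism' is unset, as the sketch below illustrates (class and method names are hypothetical):

import javax.annotation.Nullable;

import org.apache.flink.configuration.ReadableConfig;

import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;

class ScanParallelismReadSketch {
    // Both forms return null when 'scan.parallelism' is unset, because the
    // option declares no default value.
    static @Nullable Integer viaGet(ReadableConfig tableOptions) {
        return tableOptions.get(SCAN_PARALLELISM);
    }

    static @Nullable Integer viaOptional(ReadableConfig tableOptions) {
        return tableOptions.getOptional(SCAN_PARALLELISM).orElse(null);
    }
}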

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java

+59 -8
@@ -101,6 +101,7 @@
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.AVRO_CONFLUENT;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.DEBEZIUM_AVRO_CONFLUENT;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
+import static org.apache.flink.table.factories.FactoryUtil.SOURCE_PARALLELISM;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -212,14 +213,57 @@ public void testTableSource() {
                         KAFKA_SOURCE_PROPERTIES,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
 
         ScanTableSource.ScanRuntimeProvider provider =
                 actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         assertKafkaSource(provider);
     }
 
+    @Test
+    public void testTableSourceWithParallelism() {
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getBasicSourceOptions(),
+                        options -> options.put(SOURCE_PARALLELISM.key(), "100"));
+        final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions);
+        final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
+
+        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                new DecodingFormatMock(",", true);
+
+        // Test scan source equals
+        final KafkaDynamicSource expectedKafkaSource =
+                createExpectedScanSource(
+                        SCHEMA_DATA_TYPE,
+                        null,
+                        valueDecodingFormat,
+                        new int[0],
+                        new int[] {0, 1, 2},
+                        null,
+                        Collections.singletonList(TOPIC),
+                        null,
+                        KAFKA_SOURCE_PROPERTIES,
+                        StartupMode.SPECIFIC_OFFSETS,
+                        specificOffsets,
+                        0,
+                        100);
+        assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
+        final DataStreamScanProvider sourceProvider = (DataStreamScanProvider) provider;
+        assertThat(sourceProvider.getParallelism()).isPresent();
+        assertThat(sourceProvider.getParallelism()).hasValue(100);
+    }
+
     @Test
     public void testTableSourceWithPattern() {
         final Map<String, String> modifiedOptions =
@@ -254,7 +298,8 @@ public void testTableSourceWithPattern() {
                         KAFKA_SOURCE_PROPERTIES,
                         StartupMode.EARLIEST,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
         assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
 
@@ -295,7 +340,8 @@ public void testTableSourceWithKeyValue() {
                         KAFKA_FINAL_SOURCE_PROPERTIES,
                         StartupMode.GROUP_OFFSETS,
                         Collections.emptyMap(),
-                        0);
+                        0,
+                        null);
 
         assertThat(actualSource).isEqualTo(expectedKafkaSource);
     }
@@ -346,7 +392,8 @@ public void testTableSourceWithKeyValueAndMetadata() {
                         KAFKA_FINAL_SOURCE_PROPERTIES,
                         StartupMode.GROUP_OFFSETS,
                         Collections.emptyMap(),
-                        0);
+                        0,
+                        null);
         expectedKafkaSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedKafkaSource.metadataKeys = Collections.singletonList("timestamp");
 
@@ -1188,7 +1235,8 @@ public void testDiscoverPartitionByDefault() {
                         props,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualSource).isEqualTo(expectedKafkaSource);
         ScanTableSource.ScanRuntimeProvider provider =
                 actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -1226,7 +1274,8 @@ public void testDisableDiscoverPartition() {
                         props,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualSource).isEqualTo(expectedKafkaSource);
         ScanTableSource.ScanRuntimeProvider provider =
                 actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -1249,7 +1298,8 @@ private static KafkaDynamicSource createExpectedScanSource(
             Properties properties,
             StartupMode startupMode,
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
-            long startupTimestampMillis) {
+            long startupTimestampMillis,
+            @Nullable Integer parallelism) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -1267,7 +1317,8 @@ private static KafkaDynamicSource createExpectedScanSource(
                 Collections.emptyMap(),
                 0,
                 false,
-                FactoryMocks.IDENTIFIER.asSummaryString());
+                FactoryMocks.IDENTIFIER.asSummaryString(),
+                parallelism);
     }
 
     private static KafkaDynamicSink createExpectedSink(
