diff --git a/docs/content.zh/docs/connectors/table/jdbc.md b/docs/content.zh/docs/connectors/table/jdbc.md
index 8390c3525..dd759c7d8 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -212,6 +212,14 @@ ON myTopic.key = MyUserTable.id;
It determines whether each statement is automatically committed in a transaction. Some JDBC drivers, specifically
Postgres, may require this to be set to false in order to stream results.
+
+ scan.parallelism |
+ optional |
+ no |
+ (none) |
+ Integer |
+ Defines the parallelism of the JDBC source operator. By default, the global default parallelism is used. |
+
lookup.cache |
optional |
diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md
index b0ef84eb1..9d71be316 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -222,6 +222,14 @@ Connector Options
which determines whether each statement is committed in a transaction automatically. Some JDBC drivers, specifically
Postgres, may require this to be set to false in order to stream results.
+
+ scan.parallelism |
+ optional |
+ no |
+ (none) |
+ Integer |
+ Defines the parallelism of the JDBC source operator. If not set, the global default parallelism is used. |
+
lookup.cache |
optional |
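
For illustration, here is how the new option would be used in a table definition; a minimal sketch with hypothetical connection settings, where only 'scan.parallelism' is the point of the example:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcScanParallelismExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Hypothetical URL and table name; the JDBC source operator of this
        // table will run with parallelism 4 instead of the global default.
        tEnv.executeSql(
                "CREATE TABLE MyUserTable (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:postgresql://localhost:5432/mydb',\n"
                        + "  'table-name' = 'users',\n"
                        + "  'scan.parallelism' = '4'\n"
                        + ")");
    }
}
```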
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
index 37d5362ce..9eaa85272 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
@@ -80,6 +80,8 @@ public class JdbcConnectorOptions {
// Scan options
// -----------------------------------------------------------------------------------------
+ public static final ConfigOption<Integer> SCAN_PARALLELISM = FactoryUtil.SOURCE_PARALLELISM;
+
public static final ConfigOption<String> SCAN_PARTITION_COLUMN =
ConfigOptions.key("scan.partition.column")
.stringType()
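
Aliasing FactoryUtil.SOURCE_PARALLELISM, rather than redefining the key, keeps the option consistent with other connectors. For orientation, the shared option is shaped roughly like the following paraphrase; see the Flink source for the authoritative definition:

```java
// Paraphrase of FactoryUtil.SOURCE_PARALLELISM, not the exact Flink source:
// the key "scan.parallelism" is an Integer option with no default value, so
// getOptional(...) is empty unless the user sets it explicitly.
public static final ConfigOption<Integer> SOURCE_PARALLELISM =
        ConfigOptions.key("scan.parallelism")
                .intType()
                .noDefaultValue()
                .withDescription("Defines a custom parallelism for the source.");
```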
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
index e1c732eed..e23aeffb8 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
@@ -63,6 +63,7 @@
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.PASSWORD;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_AUTO_COMMIT;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_FETCH_SIZE;
+import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARALLELISM;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_COLUMN;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_LOWER_BOUND;
import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_NUM;
@@ -167,6 +168,7 @@ private JdbcReadOptions getJdbcReadOptions(ReadableConfig readableConfig) {
final Optional<String> partitionColumnName =
readableConfig.getOptional(SCAN_PARTITION_COLUMN);
final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
+ readableConfig.getOptional(SCAN_PARALLELISM).ifPresent(builder::setParallelism);
if (partitionColumnName.isPresent()) {
builder.setPartitionColumnName(partitionColumnName.get());
builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
@@ -243,6 +245,7 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(COMPATIBLE_MODE);
optionalOptions.add(USERNAME);
optionalOptions.add(PASSWORD);
+ optionalOptions.add(SCAN_PARALLELISM);
optionalOptions.add(SCAN_PARTITION_COLUMN);
optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
@@ -316,6 +319,13 @@ private void validateConfigOptions(ReadableConfig config, ClassLoader classLoade
}
}
+ if (config.getOptional(SCAN_PARALLELISM).isPresent() && config.get(SCAN_PARALLELISM) <= 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option should be positive, but is %s.",
+ SCAN_PARALLELISM.key(), config.get(SCAN_PARALLELISM)));
+ }
+
checkAllOrNone(config, new ConfigOption[] {LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});
if (config.get(LOOKUP_MAX_RETRIES) < 0) {
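
The check above reads the option twice; an equivalent form with a single lookup is sketched below, for reference only (the patch's version matches the surrounding validation style):

```java
// Single-lookup variant of the same validation; throws for 0 and negatives.
config.getOptional(SCAN_PARALLELISM)
        .filter(parallelism -> parallelism <= 0)
        .ifPresent(
                parallelism -> {
                    throw new IllegalArgumentException(
                            String.format(
                                    "The value of '%s' option should be positive, but is %s.",
                                    SCAN_PARALLELISM.key(), parallelism));
                });
```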
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
index d26898a53..e4947b1d8 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
@@ -190,7 +190,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
builder.setRowDataTypeInfo(
runtimeProviderContext.createTypeInformation(physicalRowDataType));
- return InputFormatProvider.of(builder.build());
+ return InputFormatProvider.of(builder.build(), readOptions.getParallelism());
}
@Override
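
The two-argument InputFormatProvider.of overload, available in newer Flink versions, accepts a nullable parallelism, where null means "let the planner decide". Written out explicitly, as a sketch:

```java
// Equivalent, more explicit form of the changed line above.
Integer parallelism = readOptions.getParallelism(); // null when scan.parallelism is unset
return parallelism == null
        ? InputFormatProvider.of(builder.build())               // planner default
        : InputFormatProvider.of(builder.build(), parallelism); // pinned parallelism
```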
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
index 064eb2a48..4781dd529 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
@@ -33,6 +33,7 @@ public class JdbcReadOptions implements Serializable {
private final int fetchSize;
private final boolean autoCommit;
+ private final Integer parallelism;
private JdbcReadOptions(
String query,
@@ -41,7 +42,8 @@ private JdbcReadOptions(
Long partitionUpperBound,
Integer numPartitions,
int fetchSize,
- boolean autoCommit) {
+ boolean autoCommit,
+ Integer parallelism) {
this.query = query;
this.partitionColumnName = partitionColumnName;
this.partitionLowerBound = partitionLowerBound;
@@ -50,6 +52,7 @@ private JdbcReadOptions(
this.fetchSize = fetchSize;
this.autoCommit = autoCommit;
+ this.parallelism = parallelism;
}
public Optional<String> getQuery() {
@@ -80,6 +83,10 @@ public boolean getAutoCommit() {
return autoCommit;
}
+ public Integer getParallelism() {
+ return parallelism;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -94,7 +101,8 @@ public boolean equals(Object o) {
&& Objects.equals(partitionUpperBound, options.partitionUpperBound)
&& Objects.equals(numPartitions, options.numPartitions)
&& Objects.equals(fetchSize, options.fetchSize)
- && Objects.equals(autoCommit, options.autoCommit);
+ && Objects.equals(autoCommit, options.autoCommit)
+ && Objects.equals(parallelism, options.parallelism);
} else {
return false;
}
@@ -110,6 +118,7 @@ public static class Builder {
protected int fetchSize = 0;
protected boolean autoCommit = true;
+ protected Integer scanParallelism;
/** optional, SQL query statement for this JDBC source. */
public Builder setQuery(String query) {
@@ -159,6 +168,12 @@ public Builder setAutoCommit(boolean autoCommit) {
return this;
}
+ /** optional, source scan parallelism. */
+ public Builder setParallelism(int scanParallelism) {
+ this.scanParallelism = scanParallelism;
+ return this;
+ }
+
public JdbcReadOptions build() {
return new JdbcReadOptions(
query,
@@ -167,7 +182,8 @@ public JdbcReadOptions build() {
partitionUpperBound,
numPartitions,
fetchSize,
- autoCommit);
+ autoCommit,
+ scanParallelism);
}
}
}
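
A usage sketch of the extended builder with hypothetical partition settings; if setParallelism is never called, getParallelism() returns null and the source falls back to the default parallelism:

```java
// Hypothetical values for illustration; every setter here is optional.
JdbcReadOptions readOptions =
        JdbcReadOptions.builder()
                .setPartitionColumnName("id")
                .setPartitionLowerBound(0L)
                .setPartitionUpperBound(9999L)
                .setNumPartitions(4)
                .setParallelism(4) // run the JDBC scan with 4 parallel subtasks
                .build();
```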
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
index 4e799a62d..135d7e5c3 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
@@ -125,6 +125,7 @@ void testJdbcReadProperties() {
properties.put("scan.partition.num", "10");
properties.put("scan.fetch-size", "20");
properties.put("scan.auto-commit", "false");
+ properties.put("scan.parallelism", "13");
DynamicTableSource actual = createTableSource(SCHEMA, properties);
@@ -141,6 +142,7 @@ void testJdbcReadProperties() {
.setNumPartitions(10)
.setFetchSize(20)
.setAutoCommit(false)
+ .setParallelism(13)
.build();
JdbcDynamicTableSource expected =
new JdbcDynamicTableSource(
@@ -366,6 +368,14 @@ void testJdbcValidation() {
assertThatThrownBy(() -> createTableSource(SCHEMA, finalProperties7))
.hasStackTraceContaining(
"The value of 'connection.max-retry-timeout' option must be in second granularity and shouldn't be smaller than 1 second, but is 100ms.");
+
+ // scan.parallelism must be positive (>= 1)
+ properties = getAllOptions();
+ properties.put("scan.parallelism", "-1");
+ Map<String, String> finalProperties8 = properties;
+ assertThatThrownBy(() -> createTableSource(SCHEMA, finalProperties8))
+ .hasStackTraceContaining(
+ "The value of 'scan.parallelism' option should be positive, but is -1.");
}
@Test