diff --git a/docs/content.zh/docs/connectors/table/jdbc.md b/docs/content.zh/docs/connectors/table/jdbc.md
index 8390c3525..dd759c7d8 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -212,6 +212,14 @@ ON myTopic.key = MyUserTable.id;
       它决定了每个语句是否在事务中自动提交。有些 JDBC 驱动程序,特别是 Postgres,可能需要将此设置为 false 以便流化结果。
     </td>
    </tr>
+    <tr>
+      <td><h5>scan.parallelism</h5></td>
+      <td>可选</td>
+      <td>否</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>定义 JDBC source 算子的并行度。默认情况下会使用全局默认并行度。</td>
+    </tr>
    <tr>
      <td><h5>lookup.cache</h5></td>
      <td>可选</td>
diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md
index b0ef84eb1..9d71be316 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -222,6 +222,14 @@ Connector Options
       which determines whether each statement is committed in a transaction automatically. Some JDBC drivers, specifically Postgres, may require this to be set to false in order to stream results.
     </td>
    </tr>
+    <tr>
+      <td><h5>scan.parallelism</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the JDBC source operator. If not set, the global default parallelism is used.</td>
+    </tr>
    <tr>
      <td><h5>lookup.cache</h5></td>
      <td>optional</td>
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
index 37d5362ce..9eaa85272 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
@@ -80,6 +80,8 @@ public class JdbcConnectorOptions {
     // Scan options
     // -----------------------------------------------------------------------------------------

+    public static final ConfigOption<Integer> SCAN_PARALLELISM = FactoryUtil.SOURCE_PARALLELISM;
+
     public static final ConfigOption<String> SCAN_PARTITION_COLUMN =
             ConfigOptions.key("scan.partition.column")
                     .stringType()
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
index e1c732eed..e23aeffb8 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
@@ -63,6 +63,7 @@
 import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.PASSWORD;
 import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_AUTO_COMMIT;
 import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_FETCH_SIZE;
+import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARALLELISM;
 import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_COLUMN;
 import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_LOWER_BOUND;
 import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARTITION_NUM;
@@ -167,6 +168,7 @@ private JdbcReadOptions getJdbcReadOptions(ReadableConfig readableConfig) {
         final Optional<String> partitionColumnName =
                 readableConfig.getOptional(SCAN_PARTITION_COLUMN);
         final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
+        readableConfig.getOptional(SCAN_PARALLELISM).ifPresent(builder::setParallelism);
         if (partitionColumnName.isPresent()) {
             builder.setPartitionColumnName(partitionColumnName.get());
             builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
@@ -243,6 +245,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         optionalOptions.add(COMPATIBLE_MODE);
         optionalOptions.add(USERNAME);
         optionalOptions.add(PASSWORD);
+        optionalOptions.add(SCAN_PARALLELISM);
         optionalOptions.add(SCAN_PARTITION_COLUMN);
         optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
         optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
@@ -316,6 +319,13 @@ private void validateConfigOptions(ReadableConfig config, ClassLoader classLoader) {
             }
         }

+        if (config.getOptional(SCAN_PARALLELISM).isPresent() && config.get(SCAN_PARALLELISM) <= 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The value of '%s' option should be positive, but is %s.",
+                            SCAN_PARALLELISM.key(), config.get(SCAN_PARALLELISM)));
+        }
+
         checkAllOrNone(config, new ConfigOption<?>[] {LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});

         if (config.get(LOOKUP_MAX_RETRIES) < 0) {
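Note for reviewers: `SCAN_PARALLELISM` aliases `FactoryUtil.SOURCE_PARALLELISM`, so the connector reuses the shared `scan.parallelism` key rather than declaring its own option. The standalone sketch below restates the read-and-validate semantics the two hunks above introduce; the class name and `Configuration` setup are illustrative only, not part of the patch.

```java
import org.apache.flink.configuration.Configuration;

import static org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.SCAN_PARALLELISM;

public class ScanParallelismSemanticsSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set(SCAN_PARALLELISM, 4); // leaving it unset means "use the global default"

        // Same shape as the check added to validateConfigOptions above:
        // a present but non-positive value fails fast, before any source is built.
        config.getOptional(SCAN_PARALLELISM)
                .filter(parallelism -> parallelism <= 0)
                .ifPresent(
                        parallelism -> {
                            throw new IllegalArgumentException(
                                    String.format(
                                            "The value of '%s' option should be positive, but is %s.",
                                            SCAN_PARALLELISM.key(), parallelism));
                        });
    }
}
```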
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
index d26898a53..e4947b1d8 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
@@ -190,7 +190,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
         builder.setRowDataTypeInfo(
                 runtimeProviderContext.createTypeInformation(physicalRowDataType));

-        return InputFormatProvider.of(builder.build());
+        return InputFormatProvider.of(builder.build(), readOptions.getParallelism());
     }

     @Override
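The two-argument `InputFormatProvider.of(inputFormat, parallelism)` overload used above tolerates `null`: when `scan.parallelism` is not configured, `readOptions.getParallelism()` returns `null` and the source stays on the global default parallelism, matching the documented behavior. A minimal sketch of that round trip through the builder changed below (class name and partition values are made up for illustration):

```java
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;

public class ReadOptionsParallelismSketch {
    public static void main(String[] args) {
        // Explicit parallelism: carried through to InputFormatProvider.of(...).
        JdbcReadOptions pinned =
                JdbcReadOptions.builder()
                        .setPartitionColumnName("id")
                        .setPartitionLowerBound(0L)
                        .setPartitionUpperBound(1000L)
                        .setNumPartitions(10)
                        .setParallelism(4)
                        .build();
        System.out.println(pinned.getParallelism()); // 4

        // Parallelism never set: getParallelism() stays null, so the source
        // falls back to the global default parallelism.
        JdbcReadOptions defaults = JdbcReadOptions.builder().build();
        System.out.println(defaults.getParallelism()); // null
    }
}
```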
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
index 064eb2a48..4781dd529 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
@@ -33,6 +33,7 @@ public class JdbcReadOptions implements Serializable {

     private final int fetchSize;
     private final boolean autoCommit;
+    private final Integer parallelism;

     private JdbcReadOptions(
             String query,
@@ -41,7 +42,8 @@ private JdbcReadOptions(
             Long partitionUpperBound,
             Integer numPartitions,
             int fetchSize,
-            boolean autoCommit) {
+            boolean autoCommit,
+            Integer parallelism) {
         this.query = query;
         this.partitionColumnName = partitionColumnName;
         this.partitionLowerBound = partitionLowerBound;
@@ -50,6 +52,7 @@ private JdbcReadOptions(
         this.fetchSize = fetchSize;
         this.autoCommit = autoCommit;
+        this.parallelism = parallelism;
     }

     public Optional<String> getQuery() {
         return Optional.ofNullable(query);
@@ -80,6 +83,10 @@ public boolean getAutoCommit() {
         return autoCommit;
     }

+    public Integer getParallelism() {
+        return parallelism;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -94,7 +101,8 @@ public boolean equals(Object o) {
             && Objects.equals(partitionUpperBound, options.partitionUpperBound)
             && Objects.equals(numPartitions, options.numPartitions)
             && Objects.equals(fetchSize, options.fetchSize)
-            && Objects.equals(autoCommit, options.autoCommit);
+            && Objects.equals(autoCommit, options.autoCommit)
+            && Objects.equals(parallelism, options.parallelism);
         } else {
             return false;
         }
@@ -110,6 +118,7 @@ public static class Builder {

         protected int fetchSize = 0;
         protected boolean autoCommit = true;
+        protected Integer scanParallelism;

         /** optional, SQL query statement for this JDBC source. */
         public Builder setQuery(String query) {
@@ -159,6 +168,12 @@ public Builder setAutoCommit(boolean autoCommit) {
             return this;
         }

+        /** optional, source scan parallelism. */
+        public Builder setParallelism(int scanParallelism) {
+            this.scanParallelism = scanParallelism;
+            return this;
+        }
+
         public JdbcReadOptions build() {
             return new JdbcReadOptions(
                     query,
@@ -167,7 +182,8 @@ public JdbcReadOptions build() {
                     partitionUpperBound,
                     numPartitions,
                     fetchSize,
-                    autoCommit);
+                    autoCommit,
+                    scanParallelism);
         }
     }
 }
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
index 4e799a62d..135d7e5c3 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
@@ -125,6 +125,7 @@ void testJdbcReadProperties() {
         properties.put("scan.partition.num", "10");
         properties.put("scan.fetch-size", "20");
         properties.put("scan.auto-commit", "false");
+        properties.put("scan.parallelism", "13");

         DynamicTableSource actual = createTableSource(SCHEMA, properties);

@@ -141,6 +142,7 @@
                         .setNumPartitions(10)
                         .setFetchSize(20)
                         .setAutoCommit(false)
+                        .setParallelism(13)
                         .build();
         JdbcDynamicTableSource expected =
                 new JdbcDynamicTableSource(
@@ -366,6 +368,14 @@ void testJdbcValidation() {
         assertThatThrownBy(() -> createTableSource(SCHEMA, finalProperties7))
                 .hasStackTraceContaining(
                         "The value of 'connection.max-retry-timeout' option must be in second granularity and shouldn't be smaller than 1 second, but is 100ms.");
+
+        // scan parallelism >= 1
+        properties = getAllOptions();
+        properties.put("scan.parallelism", "-1");
+        Map<String, String> finalProperties8 = properties;
+        assertThatThrownBy(() -> createTableSource(SCHEMA, finalProperties8))
+                .hasStackTraceContaining(
+                        "The value of 'scan.parallelism' option should be positive, but is -1.");
     }

     @Test
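For completeness, a hedged end-to-end usage sketch of the new option via Table API DDL. The URL, table name, and schema are hypothetical, and running it requires the JDBC connector and a MySQL driver on the classpath:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcScanParallelismUsage {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 'scan.parallelism' = '4' pins the JDBC source operator to 4 subtasks;
        // without it the source runs at the global default parallelism.
        tEnv.executeSql(
                "CREATE TABLE MyUserTable (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:mysql://localhost:3306/mydb',\n"
                        + "  'table-name' = 'users',\n"
                        + "  'scan.partition.column' = 'id',\n"
                        + "  'scan.partition.num' = '4',\n"
                        + "  'scan.partition.lower-bound' = '0',\n"
                        + "  'scan.partition.upper-bound' = '1000',\n"
                        + "  'scan.parallelism' = '4'\n"
                        + ")");

        tEnv.executeSql("SELECT * FROM MyUserTable").print();
    }
}
```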