diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java index d2c055e42..8efb732e5 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java @@ -162,20 +162,25 @@ public JdbcSqlSplitEnumeratorBase.Provider getSqlSplitEnumerato } @VisibleForTesting - public TypeInformation getTypeInformation() { + TypeInformation getTypeInformation() { return typeInformation; } @VisibleForTesting - public Configuration getConfiguration() { + Configuration getConfiguration() { return configuration; } @VisibleForTesting - public ResultExtractor getResultExtractor() { + ResultExtractor getResultExtractor() { return resultExtractor; } + @VisibleForTesting + JdbcConnectionProvider getConnectionProvider() { + return connectionProvider; + } + @VisibleForTesting @Override public boolean equals(Object o) { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java index 32c0f2b71..87b39f401 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java @@ -267,7 +267,10 @@ public JdbcSourceBuilder setOptionalSqlSplitEnumeratorState( } public JdbcSource build() { - this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build()); + if (this.connectionProvider == null) { + this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build()); + } + if (resultSetFetchSize > 0) { this.configuration.set(JdbcSourceOptions.RESULTSET_FETCH_SIZE, resultSetFetchSize); } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java index 9cdc909d3..d116433b5 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java @@ -20,9 +20,12 @@ import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings; import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator; import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; @@ -181,4 +184,23 @@ void testSetStreamingSemantic() { .isInstanceOf(IllegalArgumentException.class) .hasMessage(JdbcSourceBuilder.INVALID_CONTINUOUS_SLIDE_TIMING_HINT); } + + @Test + void testSetConnectionProvider() { + assertThatThrownBy(() -> JdbcSource.builder().setConnectionProvider(null)) + .isInstanceOf(NullPointerException.class); + + final JdbcConnectionOptions options = + new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl(dbUrl) + .withDriverName(driverName) + .build(); + + final JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(options); + + final JdbcSource jdbcSource = + sourceBuilder.setConnectionProvider(connectionProvider).build(); + + assertThat(jdbcSource.getConnectionProvider()).isSameAs(connectionProvider); + } }