From 196a7f6630cfc01643991a5dbb3a56d607056ef8 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Sun, 25 May 2025 19:08:44 +0100 Subject: [PATCH 1/5] FLINK-36236 Fix overriding the connection provider in JdbcSourceBuilder when set and add tests for the same. --- .../core/datastream/source/JdbcSource.java | 11 +++++++--- .../datastream/source/JdbcSourceBuilder.java | 5 ++++- .../source/JdbcSourceBuilderTest.java | 21 +++++++++++++++++++ 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java index d2c055e42..8efb732e5 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java @@ -162,20 +162,25 @@ public JdbcSqlSplitEnumeratorBase.Provider getSqlSplitEnumerato } @VisibleForTesting - public TypeInformation getTypeInformation() { + TypeInformation getTypeInformation() { return typeInformation; } @VisibleForTesting - public Configuration getConfiguration() { + Configuration getConfiguration() { return configuration; } @VisibleForTesting - public ResultExtractor getResultExtractor() { + ResultExtractor getResultExtractor() { return resultExtractor; } + @VisibleForTesting + JdbcConnectionProvider getConnectionProvider() { + return connectionProvider; + } + @VisibleForTesting @Override public boolean equals(Object o) { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java index 32c0f2b71..89cf7ef0d 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java @@ -267,7 +267,10 @@ public JdbcSourceBuilder setOptionalSqlSplitEnumeratorState( } public JdbcSource build() { - this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build()); + if(this.connectionProvider == null) { + this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build()); + } + if (resultSetFetchSize > 0) { this.configuration.set(JdbcSourceOptions.RESULTSET_FETCH_SIZE, resultSetFetchSize); } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java index 9cdc909d3..ae895f937 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java @@ -20,9 +20,12 @@ import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings; import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator; import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; @@ -181,4 +184,22 @@ void testSetStreamingSemantic() { .isInstanceOf(IllegalArgumentException.class) .hasMessage(JdbcSourceBuilder.INVALID_CONTINUOUS_SLIDE_TIMING_HINT); } + + @Test + void testSetConnectionProvider() { + assertThatThrownBy(() -> JdbcSource.builder().setConnectionProvider(null)) + .isInstanceOf(NullPointerException.class); + + final JdbcConnectionOptions options = + new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl(dbUrl) + .withDriverName(driverName) + .build(); + + final JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(options); + + JdbcSource jdbcSource = sourceBuilder.setConnectionProvider(connectionProvider).build(); + + assertThat(jdbcSource.getConnectionProvider()).isSameAs(connectionProvider); + } } From b282371775ec65d62093469bae36c34d30f6e23c Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Sun, 25 May 2025 19:19:15 +0100 Subject: [PATCH 2/5] amend --- flink-connector-jdbc-core/.idea/.gitignore | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 flink-connector-jdbc-core/.idea/.gitignore diff --git a/flink-connector-jdbc-core/.idea/.gitignore b/flink-connector-jdbc-core/.idea/.gitignore new file mode 100644 index 000000000..7bc07ec21 --- /dev/null +++ b/flink-connector-jdbc-core/.idea/.gitignore @@ -0,0 +1,10 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Environment-dependent path to Maven home directory +/mavenHomeManager.xml +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml From 887ccdbf461727b575e6c8117c5b2f5791de1308 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Sun, 25 May 2025 19:37:44 +0100 Subject: [PATCH 3/5] FLINK-36236 Fix overriding the connection provider in JdbcSourceBuilder when set and add tests for the same. --- .../jdbc/core/datastream/source/JdbcSourceBuilderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java index ae895f937..ba48c603f 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java @@ -198,7 +198,7 @@ void testSetConnectionProvider() { final JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(options); - JdbcSource jdbcSource = sourceBuilder.setConnectionProvider(connectionProvider).build(); + final JdbcSource jdbcSource = sourceBuilder.setConnectionProvider(connectionProvider).build(); assertThat(jdbcSource.getConnectionProvider()).isSameAs(connectionProvider); } From af34ebb5e69361a0ff422fecf082302a2253ae61 Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Sun, 25 May 2025 19:39:03 +0100 Subject: [PATCH 4/5] FLINK-36236 Fix overriding the connection provider in JdbcSourceBuilder when set and add tests for the same. --- flink-connector-jdbc-core/.idea/.gitignore | 10 ---------- 1 file changed, 10 deletions(-) delete mode 100644 flink-connector-jdbc-core/.idea/.gitignore diff --git a/flink-connector-jdbc-core/.idea/.gitignore b/flink-connector-jdbc-core/.idea/.gitignore deleted file mode 100644 index 7bc07ec21..000000000 --- a/flink-connector-jdbc-core/.idea/.gitignore +++ /dev/null @@ -1,10 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ -# Environment-dependent path to Maven home directory -/mavenHomeManager.xml -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml From 42e58f3e3d1a446920debf8f90607ddacbce63bd Mon Sep 17 00:00:00 2001 From: "raju.gupta" Date: Sun, 25 May 2025 19:50:23 +0100 Subject: [PATCH 5/5] FLINK-36236 Fix checkstyle and format violations. --- .../core/datastream/source/JdbcSourceBuilder.java | 2 +- .../datastream/source/JdbcSourceBuilderTest.java | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java index 89cf7ef0d..87b39f401 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java @@ -267,7 +267,7 @@ public JdbcSourceBuilder setOptionalSqlSplitEnumeratorState( } public JdbcSource build() { - if(this.connectionProvider == null) { + if (this.connectionProvider == null) { this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build()); } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java index ba48c603f..d116433b5 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java @@ -188,17 +188,18 @@ void testSetStreamingSemantic() { @Test void testSetConnectionProvider() { assertThatThrownBy(() -> JdbcSource.builder().setConnectionProvider(null)) - .isInstanceOf(NullPointerException.class); + .isInstanceOf(NullPointerException.class); final JdbcConnectionOptions options = - new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() - .withUrl(dbUrl) - .withDriverName(driverName) - .build(); + new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl(dbUrl) + .withDriverName(driverName) + .build(); final JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(options); - final JdbcSource jdbcSource = sourceBuilder.setConnectionProvider(connectionProvider).build(); + final JdbcSource jdbcSource = + sourceBuilder.setConnectionProvider(connectionProvider).build(); assertThat(jdbcSource.getConnectionProvider()).isSameAs(connectionProvider); }