Skip to content

Commit f697986

Browse files
authored
[FLINK-36236] Fix overriding the connection provider in JdbcSourceBuilder
1 parent 4d3351e commit f697986

File tree

3 files changed

+34
-4
lines changed

3 files changed

+34
-4
lines changed

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,20 +162,25 @@ public JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> getSqlSplitEnumerato
162162
}
163163

164164
@VisibleForTesting
165-
public TypeInformation<OUT> getTypeInformation() {
165+
TypeInformation<OUT> getTypeInformation() {
166166
return typeInformation;
167167
}
168168

169169
@VisibleForTesting
170-
public Configuration getConfiguration() {
170+
Configuration getConfiguration() {
171171
return configuration;
172172
}
173173

174174
@VisibleForTesting
175-
public ResultExtractor<OUT> getResultExtractor() {
175+
ResultExtractor<OUT> getResultExtractor() {
176176
return resultExtractor;
177177
}
178178

179+
@VisibleForTesting
180+
JdbcConnectionProvider getConnectionProvider() {
181+
return connectionProvider;
182+
}
183+
179184
@VisibleForTesting
180185
@Override
181186
public boolean equals(Object o) {

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,10 @@ public JdbcSourceBuilder<OUT> setOptionalSqlSplitEnumeratorState(
267267
}
268268

269269
public JdbcSource<OUT> build() {
270-
this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build());
270+
if (this.connectionProvider == null) {
271+
this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build());
272+
}
273+
271274
if (resultSetFetchSize > 0) {
272275
this.configuration.set(JdbcSourceOptions.RESULTSET_FETCH_SIZE, resultSetFetchSize);
273276
}

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020

2121
import org.apache.flink.api.common.typeinfo.TypeHint;
2222
import org.apache.flink.api.common.typeinfo.TypeInformation;
23+
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
2324
import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings;
2425
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
2526
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
27+
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
28+
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
2629
import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
2730
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
2831
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
@@ -181,4 +184,23 @@ void testSetStreamingSemantic() {
181184
.isInstanceOf(IllegalArgumentException.class)
182185
.hasMessage(JdbcSourceBuilder.INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
183186
}
187+
188+
@Test
189+
void testSetConnectionProvider() {
190+
assertThatThrownBy(() -> JdbcSource.builder().setConnectionProvider(null))
191+
.isInstanceOf(NullPointerException.class);
192+
193+
final JdbcConnectionOptions options =
194+
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
195+
.withUrl(dbUrl)
196+
.withDriverName(driverName)
197+
.build();
198+
199+
final JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(options);
200+
201+
final JdbcSource<Row> jdbcSource =
202+
sourceBuilder.setConnectionProvider(connectionProvider).build();
203+
204+
assertThat(jdbcSource.getConnectionProvider()).isSameAs(connectionProvider);
205+
}
184206
}

0 commit comments

Comments
 (0)