
Commit f9167c3

lvyanquan authored and leonardBang committed
[FLINK-36659] Bump version of flink to 2.0.0.
1 parent da67474 commit f9167c3

File tree

22 files changed (+151, -109 lines)


.github/workflows/backwards_compatibility.yml

Lines changed: 2 additions & 2 deletions
@@ -29,8 +29,8 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        flink: [1.18-SNAPSHOT, 1.19-SNAPSHOT]
-        jdk: [8, 11, 17]
+        flink: [2.0-SNAPSHOT, 2.1-SNAPSHOT]
+        jdk: [17]
 
     env:
       MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120

.github/workflows/push_pr.yml

Lines changed: 2 additions & 2 deletions
@@ -28,8 +28,8 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [1.20.0]
-        jdk: [ '8, 11, 17, 21' ]
+        flink: [2.0.0]
+        jdk: [ '17' ]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
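
Note: the JDK matrix in these workflows narrows to 17 because Flink 2.0 drops support for Java 8 and 11; Java 17 is the minimum supported runtime for the 2.x line.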

.github/workflows/weekly.yml

Lines changed: 9 additions & 6 deletions
@@ -30,13 +30,16 @@ jobs:
     strategy:
       matrix:
         flink_branches: [{
-          flink: 1.19-SNAPSHOT,
-          jdk: '8, 11, 17, 21',
+          flink: 2.1-SNAPSHOT,
+          jdk: '17',
           branch: main
-        },
-        {
-          flink: 1.20-SNAPSHOT,
-          jdk: '8, 11, 17, 21',
+        }, {
+          flink: 2.0-SNAPSHOT,
+          jdk: '17',
+          branch: main
+        }, {
+          flink: 2.0.0,
+          jdk: '17',
           branch: main
         }, {
           flink: 1.19.1,

flink-connector-jdbc-backward-compatibility/pom.xml

Lines changed: 16 additions & 2 deletions
@@ -17,6 +17,7 @@
   <packaging>jar</packaging>
 
   <properties>
+    <postgres.version>42.7.3</postgres.version>
     <surefire.module.config>
       --add-opens=java.base/java.util=ALL-UNNAMED
       --add-opens=java.base/java.lang=ALL-UNNAMED
@@ -58,17 +59,30 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-connector-jdbc</artifactId>
+      <artifactId>flink-connector-jdbc-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-jdbc-postgres</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-connector-jdbc</artifactId>
+      <artifactId>flink-connector-jdbc-postgres</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>${postgres.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.testcontainers</groupId>
       <artifactId>postgresql</artifactId>
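
Note: the backward-compatibility module now depends on the split connector modules (flink-connector-jdbc-core plus the dialect-specific flink-connector-jdbc-postgres) instead of the old monolithic flink-connector-jdbc artifact, and it declares the PostgreSQL driver explicitly, presumably because the split modules no longer supply it transitively.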

flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java

Lines changed: 4 additions & 4 deletions
@@ -18,16 +18,16 @@
 
 package org.apache.flink.connector.jdbc.backward.compatibility;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
-import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
 import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 
 import org.junit.jupiter.api.Test;
@@ -72,7 +72,7 @@ public List<TableManaged> getManagedTables() {
     @Test
     public void testAtLeastOnce() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
 
         assertResult(new ArrayList<>());
@@ -98,7 +98,7 @@ public void testAtLeastOnce() throws Exception {
     @Test
     public void testExactlyOnce() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
 
         assertResult(new ArrayList<>());
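
Note: env.setRestartStrategy(...) and the RestartStrategies configuration classes were removed in Flink 2.0; restart behavior is now configured through Configuration options, which the RestartStrategyUtils test helper wraps. A minimal sketch of the configuration-based equivalent, assuming the standard RestartStrategyOptions key:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NoRestartExample {
    public static void main(String[] args) throws Exception {
        // Flink 2.0 style: restart behavior comes from configuration,
        // not from RestartStrategies objects on the environment.
        Configuration conf = new Configuration();
        conf.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        env.fromData("a", "b", "c").print();
        env.execute("no-restart-sketch");
    }
}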

flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java

Lines changed: 8 additions & 8 deletions
@@ -19,18 +19,18 @@
 package org.apache.flink.connector.jdbc.backward.compatibility;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
+import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
+import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
-import org.apache.flink.connector.jdbc.source.JdbcSource;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -101,7 +101,7 @@ void init() throws SQLException {
     @Test
     void testReadWithoutParallelismWithoutParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
         JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
                 JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -122,7 +122,7 @@ void testReadWithoutParallelismWithoutParamsProvider() throws Exception {
     @Test
     void testReadWithoutParallelismWithParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
         JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
                 JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -146,7 +146,7 @@ void testReadWithoutParallelismWithParamsProvider() throws Exception {
     @Test
     void testReadWithParallelismWithoutParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(2);
         JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
                 JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -167,7 +167,7 @@ void testReadWithParallelismWithoutParamsProvider() throws Exception {
     @Test
     void testReadWithParallelismWithParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
        env.setParallelism(2);
         JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
                 JdbcSource.<JdbcTestFixture.TestEntry>builder()

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java

Lines changed: 4 additions & 1 deletion
@@ -305,7 +305,10 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
                     pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
             Schema tableSchema = schemaBuilder.build();
 
-            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), getOptions(tablePath));
+            return CatalogTable.newBuilder()
+                    .schema(tableSchema)
+                    .options(getOptions(tablePath))
+                    .build();
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed getting table %s", tablePath.getFullName()), e);
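
Note: CatalogTable.of(schema, comment, partitionKeys, options) was removed in Flink 2.0 in favor of the builder, which defaults the fields the catalog no longer sets. A sketch of the full equivalent, assuming the builder exposes comment and partitionKeys setters as in recent Flink releases; the schema and options here are made up for illustration:

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;

import java.util.Collections;
import java.util.Map;

public class CatalogTableBuilderExample {
    public static void main(String[] args) {
        Schema schema = Schema.newBuilder().column("id", "INT").build();
        CatalogTable table =
                CatalogTable.newBuilder()
                        .schema(schema)
                        .comment("example table") // was the second argument of of(...)
                        .partitionKeys(Collections.emptyList()) // was the third argument
                        .options(Map.of("connector", "jdbc"))
                        .build();
        System.out.println(table.getOptions());
    }
}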

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java

Lines changed: 24 additions & 17 deletions
@@ -20,8 +20,11 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
@@ -47,7 +50,9 @@
  */
 @PublicEvolving
 public class JdbcSink<IN>
-        implements StatefulSink<IN, JdbcWriterState>, TwoPhaseCommittingSink<IN, JdbcCommitable> {
+        implements Sink<IN>,
+                SupportsWriterState<IN, JdbcWriterState>,
+                SupportsCommitter<JdbcCommitable> {
 
     private final DeliveryGuarantee deliveryGuarantee;
     private final JdbcConnectionProvider connectionProvider;
@@ -74,14 +79,28 @@ public static <IN> JdbcSinkBuilder<IN> builder() {
 
     @Override
     @Internal
-    public JdbcWriter<IN> createWriter(InitContext context) throws IOException {
+    public JdbcWriter<IN> createWriter(WriterInitContext context) throws IOException {
         return restoreWriter(context, Collections.emptyList());
     }
 
+    @Override
+    @Internal
+    public Committer<JdbcCommitable> createCommitter(CommitterInitContext committerInitContext)
+            throws IOException {
+        return new JdbcCommitter(deliveryGuarantee, connectionProvider, exactlyOnceOptions);
+    }
+
+    @Override
+    @Internal
+    public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() {
+        return new JdbcCommitableSerializer();
+    }
+
     @Override
     @Internal
     public JdbcWriter<IN> restoreWriter(
-            InitContext context, Collection<JdbcWriterState> recoveredState) throws IOException {
+            WriterInitContext context, Collection<JdbcWriterState> recoveredState)
+            throws IOException {
         JdbcOutputSerializer<IN> outputSerializer =
                 JdbcOutputSerializer.of(
                         context.createInputSerializer(), context.isObjectReuseEnabled());
@@ -96,18 +115,6 @@ public JdbcWriter<IN> restoreWriter(
                 context);
     }
 
-    @Override
-    @Internal
-    public Committer<JdbcCommitable> createCommitter() throws IOException {
-        return new JdbcCommitter(deliveryGuarantee, connectionProvider, exactlyOnceOptions);
-    }
-
-    @Override
-    @Internal
-    public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() {
-        return new JdbcCommitableSerializer();
-    }
-
     @Override
     @Internal
     public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() {
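
Note: this is the FLIP-372 shape of the Sink V2 API: the removed StatefulSink and TwoPhaseCommittingSink super-interfaces become the plain Sink interface plus opt-in capability mixins (SupportsWriterState, SupportsCommitter). A self-contained sketch of the committer half of that pattern; LoggingSink and its String committable are hypothetical and much simpler than the connector's JdbcCommitable machinery:

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class LoggingSink implements Sink<String>, SupportsCommitter<String> {

    @Override
    public SinkWriter<String> createWriter(WriterInitContext context) throws IOException {
        // The writer buffers records; prepareCommit() hands them over at
        // checkpoint time, as JdbcWriter does with JdbcCommitable.
        return new CommittingSinkWriter<String, String>() {
            private final List<String> buffer = new ArrayList<>();

            @Override
            public void write(String element, Context context) {
                buffer.add(element);
            }

            @Override
            public Collection<String> prepareCommit() {
                List<String> committables = new ArrayList<>(buffer);
                buffer.clear();
                return committables;
            }

            @Override
            public void flush(boolean endOfInput) {}

            @Override
            public void close() {}
        };
    }

    @Override
    public Committer<String> createCommitter(CommitterInitContext context) {
        return new Committer<String>() {
            @Override
            public void commit(Collection<CommitRequest<String>> requests) {
                // A real committer would make the side effect durable here.
                requests.forEach(r -> System.out.println("committed: " + r.getCommittable()));
            }

            @Override
            public void close() {}
        };
    }

    @Override
    public SimpleVersionedSerializer<String> getCommittableSerializer() {
        return new SimpleVersionedSerializer<String>() {
            @Override
            public int getVersion() {
                return 1;
            }

            @Override
            public byte[] serialize(String committable) {
                return committable.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public String deserialize(int version, byte[] serialized) {
                return new String(serialized, StandardCharsets.UTF_8);
            }
        };
    }
}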

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java

Lines changed: 10 additions & 12 deletions
@@ -18,9 +18,9 @@
 package org.apache.flink.connector.jdbc.core.datastream.sink.writer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.InitContext;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
@@ -55,8 +55,8 @@
  */
 @Internal
 public class JdbcWriter<IN>
-        implements StatefulSink.StatefulSinkWriter<IN, JdbcWriterState>,
-                TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, JdbcCommitable> {
+        implements StatefulSinkWriter<IN, JdbcWriterState>,
+                CommittingSinkWriter<IN, JdbcCommitable> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcWriter.class);
 
@@ -75,7 +75,7 @@ public JdbcWriter(
             JdbcOutputSerializer<IN> outputSerializer,
             DeliveryGuarantee deliveryGuarantee,
             Collection<JdbcWriterState> recoveredState,
-            Sink.InitContext initContext)
+            InitContext initContext)
             throws IOException {
 
         this.deliveryGuarantee =
@@ -85,9 +85,7 @@ public JdbcWriter(
 
         pendingRecords = false;
         this.lastCheckpointId =
-                initContext
-                        .getRestoredCheckpointId()
-                        .orElse(Sink.InitContext.INITIAL_CHECKPOINT_ID - 1);
+                initContext.getRestoredCheckpointId().orElse(InitContext.INITIAL_CHECKPOINT_ID - 1);
 
         checkNotNull(connectionProvider, "connectionProvider must be defined");
 
@@ -106,9 +104,9 @@ public JdbcWriter(
 
         TransactionId transactionId =
                 TransactionId.create(
-                        initContext.getJobId().getBytes(),
-                        initContext.getSubtaskId(),
-                        initContext.getNumberOfParallelSubtasks());
+                        initContext.getJobInfo().getJobId().getBytes(),
+                        initContext.getTaskInfo().getIndexOfThisSubtask(),
+                        initContext.getTaskInfo().getNumberOfParallelSubtasks());
 
         this.jdbcTransaction =
                 new XaTransaction(
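
Note: the flat InitContext accessors (getJobId, getSubtaskId, getNumberOfParallelSubtasks) were removed in 2.0 in favor of the JobInfo and TaskInfo holders used above. A small sketch of the new shape; describeSubtask is a hypothetical helper:

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.connector.sink2.InitContext;

public final class SubtaskIdentity {

    private SubtaskIdentity() {}

    // Mirrors how the writer above derives its TransactionId inputs.
    public static String describeSubtask(InitContext context) {
        JobID jobId = context.getJobInfo().getJobId();
        int subtask = context.getTaskInfo().getIndexOfThisSubtask();
        int parallelism = context.getTaskInfo().getNumberOfParallelSubtasks();
        return jobId + ", subtask " + subtask + " of " + parallelism;
    }
}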

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java

Lines changed: 5 additions & 5 deletions
@@ -105,15 +105,15 @@ public JdbcSourceSplitReader(
         this.config = Preconditions.checkNotNull(config);
         this.typeInformation = Preconditions.checkNotNull(typeInformation);
         this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
-        this.resultSetType = config.getInteger(RESULTSET_TYPE);
-        this.resultSetConcurrency = config.getInteger(RESULTSET_CONCURRENCY);
-        this.resultSetFetchSize = config.getInteger(RESULTSET_FETCH_SIZE);
-        this.autoCommit = config.getBoolean(AUTO_COMMIT);
+        this.resultSetType = config.get(RESULTSET_TYPE);
+        this.resultSetConcurrency = config.get(RESULTSET_CONCURRENCY);
+        this.resultSetFetchSize = config.get(RESULTSET_FETCH_SIZE);
+        this.autoCommit = config.get(AUTO_COMMIT);
         this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
         this.splits = new ArrayDeque<>();
         this.hasNextRecordCurrentSplit = false;
         this.currentSplit = null;
-        int splitReaderFetchBatchSize = config.getInteger(READER_FETCH_BATCH_SIZE);
+        int splitReaderFetchBatchSize = config.get(READER_FETCH_BATCH_SIZE);
         Preconditions.checkArgument(
                 splitReaderFetchBatchSize > 0 && splitReaderFetchBatchSize < Integer.MAX_VALUE);
         this.splitReaderFetchBatchSize = splitReaderFetchBatchSize;
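
Note: Flink 2.0 removes Configuration's type-specific accessors (getInteger, getBoolean, and friends); the generic get(ConfigOption<T>) already returns the option's declared type, so the call sites simply shorten. A small sketch with a hypothetical option:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class TypedOptionExample {

    // Hypothetical option, for illustration only.
    static final ConfigOption<Integer> FETCH_SIZE =
            ConfigOptions.key("demo.fetch-size").intType().defaultValue(1024);

    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set(FETCH_SIZE, 2048);
        int fetchSize = config.get(FETCH_SIZE); // was config.getInteger(FETCH_SIZE)
        System.out.println(fetchSize);
    }
}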

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
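
Note: SinkFunction, RichSinkFunction and SinkFunctionProvider still exist in Flink 2.0 but moved under .legacy packages as deprecated migration aids, which is why this file and the tests above only change imports.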

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java

Lines changed: 7 additions & 7 deletions
@@ -20,13 +20,13 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -45,11 +45,10 @@ public GenericJdbcSinkFunction(@Nonnull JdbcOutputFormat<T, ?, ?> outputFormat)
     }
 
     @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
+    public void open(OpenContext openContext) throws Exception {
+        super.open(openContext);
         // Recheck if execution config change
-        serializer.withObjectReuseEnabled(
-                getRuntimeContext().getExecutionConfig().isObjectReuseEnabled());
+        serializer.withObjectReuseEnabled(getRuntimeContext().isObjectReuseEnabled());
         outputFormat.open(serializer);
     }
 
@@ -76,6 +75,7 @@ public void close() {
     public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
         this.serializer =
                 JdbcOutputSerializer.of(
-                        ((TypeInformation<T>) type).createSerializer(executionConfig));
+                        ((TypeInformation<T>) type)
+                                .createSerializer(executionConfig.getSerializerConfig()));
     }
 }
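
Note: two more 2.0 API moves surface in this file: RichFunction.open(Configuration) became open(OpenContext) per FLIP-344, and TypeInformation.createSerializer now takes the SerializerConfig extracted from ExecutionConfig. A minimal sketch of the new lifecycle hook; UppercaseMapper is hypothetical:

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;

public class UppercaseMapper extends RichMapFunction<String, String> {

    @Override
    public void open(OpenContext openContext) throws Exception {
        // OpenContext carries no per-job Configuration; runtime details come
        // from getRuntimeContext(), e.g. object reuse via
        // getRuntimeContext().isObjectReuseEnabled() as in the sink above.
    }

    @Override
    public String map(String value) {
        return value.toUpperCase();
    }
}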
