Skip to content

Commit d468795

Browse files
committed
[FLINK-34467] add lineage integration for jdbc connector
1 parent 134d858 commit d468795

File tree

37 files changed

+1361
-122
lines changed

37 files changed

+1361
-122
lines changed

.github/workflows/backwards_compatibility.yml

Lines changed: 0 additions & 103 deletions
This file was deleted.

.github/workflows/weekly.yml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,7 @@ jobs:
2929
if: github.repository_owner == 'apache'
3030
strategy:
3131
matrix:
32-
flink_branches: [{
33-
flink: 1.19-SNAPSHOT,
34-
jdk: '8, 11, 17, 21',
35-
branch: main
36-
},
32+
flink_branches: [
3733
{
3834
flink: 1.20-SNAPSHOT,
3935
jdk: '8, 11, 17, 21',

.java-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
11
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getLineageVertex()> calls method <org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getSqlTemplate()> in (JdbcSource.java:215)

flink-connector-jdbc-core/archunit-violations/6cdea252-f400-4c13-bc99-b325f2ebe333

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@ Method <org.apache.flink.connector.jdbc.core.table.sink.JdbcOutputFormatBuilder.
4848
Method <org.apache.flink.connector.jdbc.core.table.sink.JdbcOutputFormatBuilder.getPrimaryKey(org.apache.flink.table.data.RowData, [Lorg.apache.flink.table.data.RowData$FieldGetter;)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (JdbcOutputFormatBuilder.java:0)
4949
Method <org.apache.flink.connector.jdbc.core.table.sink.JdbcOutputFormatBuilder.setFieldDataTypes([Lorg.apache.flink.table.types.DataType;)> has parameter of type <[Lorg.apache.flink.table.types.DataType;> in (JdbcOutputFormatBuilder.java:0)
5050
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.createInputSplits(int)> has return type <[Lorg.apache.flink.core.io.InputSplit;> in (JdbcRowDataInputFormat.java:0)
51-
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.getInputSplitAssigner([Lorg.apache.flink.core.io.InputSplit;)> calls constructor <org.apache.flink.api.common.io.DefaultInputSplitAssigner.<init>([Lorg.apache.flink.core.io.InputSplit;)> in (JdbcRowDataInputFormat.java:287)
51+
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.getInputSplitAssigner([Lorg.apache.flink.core.io.InputSplit;)> calls constructor <org.apache.flink.api.common.io.DefaultInputSplitAssigner.<init>([Lorg.apache.flink.core.io.InputSplit;)> in (JdbcRowDataInputFormat.java:295)
5252
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataInputFormat.getInputSplitAssigner([Lorg.apache.flink.core.io.InputSplit;)> has parameter of type <[Lorg.apache.flink.core.io.InputSplit;> in (JdbcRowDataInputFormat.java:0)
5353
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataLookupFunction.getDbConnection()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (JdbcRowDataLookupFunction.java:0)
54+
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataLookupFunction.getLineageVertex()> calls method <org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(org.apache.flink.table.types.DataType)> in (JdbcRowDataLookupFunction.java:242)
55+
Method <org.apache.flink.connector.jdbc.core.table.source.JdbcRowDataLookupFunction.getLineageVertex()> calls method <org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType(org.apache.flink.table.types.logical.LogicalType)> in (JdbcRowDataLookupFunction.java:243)
5456
Method <org.apache.flink.connector.jdbc.datasource.connections.xa.SimpleXaConnectionProvider.from(javax.sql.XADataSource)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SimpleXaConnectionProvider.java:0)
5557
Method <org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId.deserialize([B)> calls constructor <org.apache.flink.core.memory.DataInputDeserializer.<init>([B)> in (TransactionId.java:96)
5658
Method <org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId.deserialize([B)> calls method <org.apache.flink.core.memory.DataInputDeserializer.readInt()> in (TransactionId.java:101)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
org.apache.flink.connector.jdbc.lineage.DefaultJdbcExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2+
org.apache.flink.connector.jdbc.lineage.JdbcLocation$Builder.build(): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
3+
org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
4+
org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated

flink-connector-jdbc-core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ under the License.
5656
<optional>true</optional>
5757
</dependency>
5858

59+
<dependency>
60+
<groupId>io.openlineage</groupId>
61+
<artifactId>openlineage-sql-java</artifactId>
62+
</dependency>
63+
5964
<!-- Tests -->
6065

6166
<dependency>

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,23 @@
2424
import org.apache.flink.api.common.io.InputFormat;
2525
import org.apache.flink.api.common.io.RichInputFormat;
2626
import org.apache.flink.api.common.io.statistics.BaseStatistics;
27+
import org.apache.flink.api.connector.source.Boundedness;
2728
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
2829
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2930
import org.apache.flink.configuration.Configuration;
3031
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
3132
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
3233
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3334
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
35+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
36+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
3437
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
3538
import org.apache.flink.core.io.GenericInputSplit;
3639
import org.apache.flink.core.io.InputSplit;
3740
import org.apache.flink.core.io.InputSplitAssigner;
41+
import org.apache.flink.streaming.api.lineage.LineageDataset;
42+
import org.apache.flink.streaming.api.lineage.LineageVertex;
43+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
3844
import org.apache.flink.types.Row;
3945
import org.apache.flink.util.Preconditions;
4046

@@ -53,6 +59,8 @@
5359
import java.sql.Time;
5460
import java.sql.Timestamp;
5561
import java.util.Arrays;
62+
import java.util.Collections;
63+
import java.util.Optional;
5664

5765
/**
5866
* InputFormat to read data from a database and generate Rows. The InputFormat has to be configured
@@ -107,7 +115,7 @@
107115
@Deprecated
108116
@Experimental
109117
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
110-
implements ResultTypeQueryable<Row> {
118+
implements LineageVertexProvider, ResultTypeQueryable<Row> {
111119

112120
protected static final long serialVersionUID = 2L;
113121
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class);
@@ -344,6 +352,19 @@ public static JdbcInputFormatBuilder buildJdbcInputFormat() {
344352
return new JdbcInputFormatBuilder();
345353
}
346354

355+
@Override
356+
public LineageVertex getLineageVertex() {
357+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
358+
new DefaultTypeDatasetFacet(getProducedType());
359+
Optional<String> nameOpt = LineageUtils.nameOf(queryTemplate);
360+
String namespace = LineageUtils.namespaceOf(connectionProvider);
361+
LineageDataset dataset =
362+
LineageUtils.datasetOf(
363+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
364+
return LineageUtils.sourceLineageVertexOf(
365+
Boundedness.BOUNDED, Collections.singleton(dataset));
366+
}
367+
347368
/** Builder for {@link JdbcInputFormat}. */
348369
public static class JdbcInputFormatBuilder {
349370
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,16 @@
3434
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3535
import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
3636
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
37+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
3738
import org.apache.flink.core.io.SimpleVersionedSerializer;
39+
import org.apache.flink.streaming.api.lineage.LineageDataset;
40+
import org.apache.flink.streaming.api.lineage.LineageVertex;
41+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
3842

3943
import java.io.IOException;
4044
import java.util.Collection;
4145
import java.util.Collections;
46+
import java.util.Optional;
4247

4348
/**
4449
* Flink Sink to produce data into a jdbc database.
@@ -47,7 +52,9 @@
4752
*/
4853
@PublicEvolving
4954
public class JdbcSink<IN>
50-
implements StatefulSink<IN, JdbcWriterState>, TwoPhaseCommittingSink<IN, JdbcCommitable> {
55+
implements LineageVertexProvider,
56+
StatefulSink<IN, JdbcWriterState>,
57+
TwoPhaseCommittingSink<IN, JdbcCommitable> {
5158

5259
private final DeliveryGuarantee deliveryGuarantee;
5360
private final JdbcConnectionProvider connectionProvider;
@@ -113,4 +120,13 @@ public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() {
113120
public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() {
114121
return new JdbcWriterStateSerializer();
115122
}
123+
124+
@Override
125+
public LineageVertex getLineageVertex() {
126+
Optional<String> nameOpt = LineageUtils.nameOf(queryStatement.query());
127+
String namespace = LineageUtils.namespaceOf(connectionProvider);
128+
LineageDataset dataset =
129+
LineageUtils.datasetOf(nameOpt.orElse(""), namespace, Collections.emptyList());
130+
return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
131+
}
116132
}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,26 +34,36 @@
3434
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
3535
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
3636
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
37+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
3738
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
3839
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
3940
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
4041
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
4142
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
4243
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
44+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
45+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
4346
import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
4447
import org.apache.flink.core.io.SimpleVersionedSerializer;
48+
import org.apache.flink.streaming.api.lineage.LineageDataset;
49+
import org.apache.flink.streaming.api.lineage.LineageVertex;
50+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
4551
import org.apache.flink.util.Preconditions;
4652

4753
import javax.annotation.Nullable;
4854

4955
import java.io.Serializable;
5056
import java.util.ArrayList;
57+
import java.util.Arrays;
58+
import java.util.Collections;
5159
import java.util.Objects;
60+
import java.util.Optional;
5261

5362
/** JDBC source. */
5463
@PublicEvolving
5564
public class JdbcSource<OUT>
56-
implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
65+
implements LineageVertexProvider,
66+
Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
5767
ResultTypeQueryable<OUT> {
5868

5969
private final Boundedness boundedness;
@@ -195,4 +205,18 @@ public boolean equals(Object o) {
195205
&& deliveryGuarantee == that.deliveryGuarantee
196206
&& Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings);
197207
}
208+
209+
@Override
210+
public LineageVertex getLineageVertex() {
211+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
212+
new DefaultTypeDatasetFacet(getTypeInformation());
213+
SqlTemplateSplitEnumerator enumerator =
214+
(SqlTemplateSplitEnumerator) sqlSplitEnumeratorProvider.create();
215+
Optional<String> nameOpt = LineageUtils.nameOf(enumerator.getSqlTemplate());
216+
String namespace = LineageUtils.namespaceOf(connectionProvider);
217+
LineageDataset dataset =
218+
LineageUtils.datasetOf(
219+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
220+
return LineageUtils.sourceLineageVertexOf(boundedness, Collections.singleton(dataset));
221+
}
198222
}

0 commit comments

Comments
 (0)