
Commit f4015d1

[FLINK-37282] Incorporate Backchannel and ProducerPool into EOSWriter
1 parent d3c3546 commit f4015d1

16 files changed: +257 -245 lines
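
For context, the commit title indicates that the exactly-once writer now owns the backchannel (committer-to-writer notifications about finished transactions) and the pool of transactional producers, which the violation-store changes below reflect (e.g. ExactlyOnceKafkaWriter.getProducerPool(), KafkaCommitter.getBackchannel()). The following is a minimal, hypothetical sketch of that pattern only; the names TransactionBackchannel, ProducerPool, and SimpleEosWriter are invented for illustration and are not the actual Flink classes or APIs.

// Hypothetical sketch of a committer->writer backchannel plus a producer pool.
// None of these classes exist in flink-connector-kafka under these names.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Channel on which the committer reports transactional ids it has finished committing. */
final class TransactionBackchannel {
    private final Queue<String> finishedTransactions = new ConcurrentLinkedQueue<>();

    /** Called by the committer after a transaction has been committed (or aborted). */
    void signalFinished(String transactionalId) {
        finishedTransactions.add(transactionalId);
    }

    /** Polled by the writer; returns null when nothing is pending. */
    String pollFinished() {
        return finishedTransactions.poll();
    }
}

/** Pool of transactional ids so committed producers can be reused instead of recreated. */
final class ProducerPool {
    private final Deque<String> freeTransactionalIds = new ArrayDeque<>();
    private final String prefix;
    private int counter;

    ProducerPool(String transactionalIdPrefix) {
        this.prefix = transactionalIdPrefix;
    }

    /** Reuse a recycled id if one is available, otherwise derive a fresh one from the prefix. */
    String acquireTransactionalId() {
        String id = freeTransactionalIds.poll();
        return id != null ? id : prefix + "-" + counter++;
    }

    /** Return an id whose transaction the committer has finished, making it reusable. */
    void recycle(String transactionalId) {
        freeTransactionalIds.push(transactionalId);
    }
}

/** Writer side: drains the backchannel before starting the next transaction. */
final class SimpleEosWriter {
    private final TransactionBackchannel backchannel;
    private final ProducerPool pool;

    SimpleEosWriter(TransactionBackchannel backchannel, ProducerPool pool) {
        this.backchannel = backchannel;
        this.pool = pool;
    }

    /** Recycles everything the committer has reported, then hands out a transactional id. */
    String startNewTransaction() {
        String id;
        while ((id = backchannel.pollFinished()) != null) {
            pool.recycle(id);
        }
        return pool.acquireTransactionalId();
    }
}

The point of the sketch is the ownership direction: the writer holds both the pool and the receiving end of the backchannel, so producer reuse is driven entirely from the writer without the committer ever touching the pool.
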

flink-connector-kafka/archunit-violations/984f05c0-ec82-405e-9bcc-d202dbe7202e

-3 lines changed

@@ -26,8 +26,6 @@ Constructor <org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(org.apache
 Constructor <org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(org.apache.flink.connector.base.DeliveryGuarantee, java.util.Properties, java.lang.String, org.apache.flink.api.connector.sink2.Sink$InitContext, org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema$InitializationContext, java.util.Collection)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (KafkaWriter.java:138)
 Constructor <org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(org.apache.flink.connector.base.DeliveryGuarantee, java.util.Properties, java.lang.String, org.apache.flink.api.connector.sink2.Sink$InitContext, org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema, org.apache.flink.api.common.serialization.SerializationSchema$InitializationContext, java.util.Collection)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (KafkaWriter.java:173)
 Constructor <org.apache.flink.connector.kafka.sink.KafkaWriterState.<init>(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (KafkaWriterState.java:28)
-Constructor <org.apache.flink.connector.kafka.sink.Recyclable.<init>(java.lang.Object, java.util.function.Consumer)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (Recyclable.java:31)
-Constructor <org.apache.flink.connector.kafka.sink.Recyclable.<init>(java.lang.Object, java.util.function.Consumer)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (Recyclable.java:32)
 Constructor <org.apache.flink.connector.kafka.sink.TransactionAborter.<init>(int, int, java.util.function.Function, java.util.function.Consumer)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (TransactionAborter.java:60)
 Constructor <org.apache.flink.connector.kafka.testutils.YamlFileMetadataService$ListConstructor.<init>(java.lang.Class)> calls constructor <org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.constructor.Constructor.<init>()> in (YamlFileMetadataService.java:270)
 Constructor <org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumerator.<init>(org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber, org.apache.flink.connector.kafka.source.enumerator.metadata.KafkaMetadataService, org.apache.flink.api.connector.source.SplitEnumeratorContext, org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer, org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer, java.util.Properties, org.apache.flink.api.connector.source.Boundedness, org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumState, org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy$StoppableKafkaEnumContextProxyFactory)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSourceEnumerator.java:0)
@@ -162,7 +160,6 @@ Method <org.apache.flink.connector.kafka.sink.KafkaWriter.close()> calls method
 Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getCurrentProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriter.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriter.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getTransactionalProducer(long)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (KafkaWriter.java:311)
-Method <org.apache.flink.connector.kafka.sink.Recyclable.getObject()> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (Recyclable.java:36)
 Method <org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource.getKafkaStreamSubscriber()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DynamicKafkaSource.java:0)
 Method <org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder.sanityCheck()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (DynamicKafkaSourceBuilder.java:290)
 Method <org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder.sanityCheck()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (DynamicKafkaSourceBuilder.java:292)

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

+7 -5 lines changed

@@ -48,11 +48,13 @@ Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourc
 Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.resetToUnAvailable()> in (DynamicKafkaSourceReader.java:381)
 Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:496)
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:164)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:167)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:163)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:166)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:163)
+Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
+Method <org.apache.flink.connector.kafka.sink.KafkaCommitter.getBackchannel()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaCommitter.java:0)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:168)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:171)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:167)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:170)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:167)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (KafkaSink.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:152)

flink-connector-kafka/archunit-violations/d853eb69-8c04-4246-9a5e-4f5911286b1d

+1 line changed

@@ -2,5 +2,6 @@ org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.open(org.ap
 org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.serialize(java.lang.Object, org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema$KafkaSinkContext, java.lang.Long): Argument leaf type org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema$KafkaSinkContext does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream): Argument leaf type org.apache.flink.connector.kafka.sink.KafkaCommittable does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream): Argument leaf type org.apache.flink.streaming.api.connector.sink2.CommittableMessage does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.kafka.sink.KafkaSink.createCommitter(org.apache.flink.api.connector.sink2.CommitterInitContext): Returned leaf type org.apache.flink.connector.kafka.sink.KafkaCommittable does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.getPartitionOffsets(java.util.Collection, org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer$PartitionOffsetsRetriever): Argument leaf type org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer$PartitionOffsetsRetriever does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer$KafkaTransactionState, java.lang.Object, org.apache.flink.streaming.api.functions.sink.SinkFunction$Context): Argument leaf type org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer$KafkaTransactionState does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
