
Commit d3c3546

[FLINK-37282] Force colocation of kafka writer and kafka committer.
1 parent: 580d3ed

File tree: 4 files changed (+95, -14 lines)


flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 7 additions & 0 deletions
@@ -1,3 +1,4 @@
+Class <org.apache.flink.connector.kafka.sink.KafkaSink> implements interface <org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology> in (KafkaSink.java:0)
 Class <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionChange> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
 Class <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
 Class <org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer$ContextStateSerializer> extends class <org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton> in (FlinkKafkaProducer.java:0)
@@ -47,6 +48,12 @@ Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourc
 Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.isAvailable()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.resetToUnAvailable()> in (DynamicKafkaSourceReader.java:381)
 Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:496)
 Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:164)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:167)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:163)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:166)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:163)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (KafkaSink.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
 Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:152)
 Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getCurrentProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriter.java:0)
Lines changed: 2 additions & 0 deletions
@@ -1,4 +1,6 @@
 org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.open(org.apache.flink.api.common.serialization.SerializationSchema$InitializationContext, org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema$KafkaSinkContext): Argument leaf type org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema$KafkaSinkContext does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.serialize(java.lang.Object, org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema$KafkaSinkContext, java.lang.Long): Argument leaf type org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema$KafkaSinkContext does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream): Argument leaf type org.apache.flink.connector.kafka.sink.KafkaCommittable does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
+org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream): Argument leaf type org.apache.flink.streaming.api.connector.sink2.CommittableMessage does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.getPartitionOffsets(java.util.Collection, org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer$PartitionOffsetsRetriever): Argument leaf type org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer$PartitionOffsetsRetriever does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer$KafkaTransactionState, java.lang.Object, org.apache.flink.streaming.api.functions.sink.SinkFunction$Context): Argument leaf type org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer$KafkaTransactionState does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java

Lines changed: 23 additions & 1 deletion
@@ -21,13 +21,18 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
 import org.apache.flink.connector.kafka.lineage.LineageUtil;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.lineage.LineageVertex;
 import org.apache.flink.streaming.api.lineage.LineageVertexProvider;

@@ -66,7 +71,8 @@
 @PublicEvolving
 public class KafkaSink<IN>
         implements LineageVertexProvider,
-                TwoPhaseCommittingStatefulSink<IN, KafkaWriterState, KafkaCommittable> {
+                TwoPhaseCommittingStatefulSink<IN, KafkaWriterState, KafkaCommittable>,
+                SupportsPostCommitTopology<KafkaCommittable> {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
     private final DeliveryGuarantee deliveryGuarantee;

@@ -147,6 +153,22 @@ public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
         return new KafkaWriterStateSerializer();
     }

+    @Override
+    public void addPostCommitTopology(DataStream<CommittableMessage<KafkaCommittable>> committer) {
+        // this is a somewhat hacky way to ensure that the committer and writer are co-located
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE && transactionalIdPrefix != null) {
+            Transformation<?> transformation = committer.getTransformation();
+            // all sink transformations output CommittableMessage, so we can safely traverse the
+            // chain; custom colocation key is set before and should be preserved
+            while (transformation.getOutputType() instanceof CommittableMessageTypeInfo
+                    && transformation.getCoLocationGroupKey() == null) {
+                // colocate by transactionalIdPrefix, which should be unique
+                transformation.setCoLocationGroupKey(transactionalIdPrefix);
+                transformation = transformation.getInputs().get(0);
+            }
+        }
+    }
+
     @VisibleForTesting
     protected Properties getKafkaProducerConfig() {
         return kafkaProducerConfig;

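For context, the new colocation path only engages for DeliveryGuarantee.EXACTLY_ONCE with a non-null transactionalIdPrefix. Below is a minimal sketch (not part of this commit) of a job that would exercise it; the bootstrap servers, topic, prefix, and class name are illustrative placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaColocationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSink<String> sink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("localhost:9092") // placeholder address
                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                        // after this commit, also reused as the co-location group key
                        .setTransactionalIdPrefix("example-prefix")
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("example-topic") // placeholder topic
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        .build();

        // During translation, addPostCommitTopology walks back from the committer
        // through the CommittableMessage-typed transformations and tags them with
        // the transactionalIdPrefix, so subtask i of the writer and subtask i of
        // the committer are scheduled into the same slot.
        env.fromData("a", "b", "c").sinkTo(sink);
        env.execute();
    }
}

Because the key is the transactionalIdPrefix, which is unique per sink, two sinks in the same job keep separate co-location groups, and an explicitly set custom key is preserved, as the two new tests below verify.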
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java

Lines changed: 63 additions & 13 deletions
@@ -9,6 +9,10 @@
 import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.lineage.LineageVertex;

 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -35,34 +39,34 @@ void setup() {
     @Test
     public void testGetLineageVertexWhenSerializerNotAnKafkaDatasetFacetProvider() {
-        KafkaRecordSerializationSchema recordSerializer =
+        KafkaRecordSerializationSchema<Object> recordSerializer =
                 new KafkaRecordSerializationSchemaWithoutKafkaDatasetProvider();
-        KafkaSink sink =
-                new KafkaSink(
+        KafkaSink<Object> sink =
+                new KafkaSink<>(
                         DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer);

         assertThat(sink.getLineageVertex().datasets()).isEmpty();
     }

     @Test
     public void testGetLineageVertexWhenNoKafkaDatasetFacetReturnedFromSerializer() {
-        KafkaRecordSerializationSchema recordSerializer =
+        KafkaRecordSerializationSchema<Object> recordSerializer =
                 new KafkaRecordSerializationSchemaWithEmptyKafkaDatasetProvider();

-        KafkaSink sink =
-                new KafkaSink(
+        KafkaSink<Object> sink =
+                new KafkaSink<>(
                         DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer);

         assertThat(sink.getLineageVertex().datasets()).isEmpty();
     }

     @Test
     public void testGetLineageVertex() {
-        KafkaRecordSerializationSchema recordSerializer =
+        KafkaRecordSerializationSchema<Object> recordSerializer =
                 new TestingKafkaRecordSerializationSchema();

-        KafkaSink sink =
-                new KafkaSink(
+        KafkaSink<Object> sink =
+                new KafkaSink<>(
                         DeliveryGuarantee.EXACTLY_ONCE, kafkaProperties, "", recordSerializer);

         LineageVertex lineageVertex = sink.getLineageVertex();
@@ -91,8 +95,54 @@ public void testGetLineageVertex() {
                 .hasFieldOrPropertyWithValue("typeInformation", TypeInformation.of(String.class));
     }

-    private static class KafkaRecordSerializationSchemaWithoutKafkaDatasetProvider
-            implements KafkaRecordSerializationSchema {
+    @Test
+    public void testCoLocation() {
+        String colocationKey = "testCoLocation";
+        KafkaSink<Object> sink =
+                new KafkaSink<>(
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        kafkaProperties,
+                        colocationKey,
+                        new TestingKafkaRecordSerializationSchema());
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        env.<Object>fromData(1).sinkTo(sink);
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        assertThat(streamGraph.getStreamNodes())
+                .filteredOn(node -> !node.getInEdges().isEmpty())
+                .hasSize(2) // writer and committer
+                .extracting(StreamNode::getCoLocationGroup)
+                .containsOnly(colocationKey);
+    }
+
+    @Test
+    public void testPreserveCustomCoLocation() {
+        String colocationKey = "testPreserveCustomCoLocation";
+        String customColocationKey = "customCoLocation";
+        KafkaSink<Object> sink =
+                new KafkaSink<>(
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        kafkaProperties,
+                        colocationKey,
+                        new TestingKafkaRecordSerializationSchema());
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStreamSink<Object> stream = env.<Object>fromData(1).sinkTo(sink);
+        stream.getTransformation().setCoLocationGroupKey(customColocationKey);
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        assertThat(streamGraph.getStreamNodes())
+                .filteredOn(node -> !node.getInEdges().isEmpty())
+                .hasSize(2) // writer and committer
+                .extracting(StreamNode::getCoLocationGroup)
+                .containsOnly(customColocationKey);
+    }
+
+    private static class KafkaRecordSerializationSchemaWithoutKafkaDatasetProvider<T>
+            implements KafkaRecordSerializationSchema<Object> {
         @Nullable
         @Override
         public ProducerRecord<byte[], byte[]> serialize(
@@ -102,7 +152,7 @@ public ProducerRecord<byte[], byte[]> serialize(
         }

     private static class KafkaRecordSerializationSchemaWithEmptyKafkaDatasetProvider
-            implements KafkaRecordSerializationSchema, KafkaDatasetFacetProvider {
+            implements KafkaRecordSerializationSchema<Object>, KafkaDatasetFacetProvider {
         @Nullable
         @Override
         public ProducerRecord<byte[], byte[]> serialize(
@@ -117,7 +167,7 @@ public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
         }

     private static class TestingKafkaRecordSerializationSchema
-            implements KafkaRecordSerializationSchema,
+            implements KafkaRecordSerializationSchema<Object>,
                     KafkaDatasetFacetProvider,
                     TypeDatasetFacetProvider {
