Skip to content

Commit 580d3ed

Browse files
committed
[FLINK-37282] Add ProducerPool
Add first class producer pool that self-manages all resources and allows to recycle producers by transactional ids.
1 parent db07aa6 commit 580d3ed

File tree

7 files changed

+525
-19
lines changed

7 files changed

+525
-19
lines changed

flink-connector-kafka/archunit-violations/86dfd459-67a9-4b26-9b5c-0b0bbf22681a

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ org.apache.flink.connector.kafka.sink.KafkaWriterITCase does not satisfy: only o
2626
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
2727
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
2828
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
29+
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
30+
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
31+
org.apache.flink.connector.kafka.sink.internal.ProducerPoolImplITCase does not satisfy: only one of the following predicates match:\
32+
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
33+
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
34+
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
2935
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
3036
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
3137
org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only one of the following predicates match:\

flink-connector-kafka/archunit-violations/c0d94764-76a0-4c50-b617-70b1754c4612

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducer
5050
Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
5151
Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:152)
5252
Method <org.apache.flink.connector.kafka.sink.KafkaWriter.getCurrentProducer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaWriter.java:0)
53+
Method <org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl.getProducers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ProducerPoolImpl.java:0)
5354
Method <org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apache.flink.api.connector.source.SourceReaderContext, java.util.function.Consumer)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
5455
Method <org.apache.flink.connector.kafka.source.KafkaSource.getConfiguration()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
5556
Method <org.apache.flink.connector.kafka.source.KafkaSource.getKafkaSubscriber()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.kafka.sink.internal;
20+
21+
import org.apache.flink.annotation.Internal;
22+
23+
import java.util.Objects;
24+
25+
import static org.apache.flink.util.Preconditions.checkNotNull;
26+
27+
/**
28+
* An immutable class that represents a transactional id and a checkpoint id. It's used inside the
29+
* {@link ProducerPoolImpl} to keep track of the transactions that are currently in-flight. The
30+
* checkpoint id is used to subsume committed transactions wrt to recycling producers.
31+
*/
32+
@Internal
33+
public class CheckpointTransaction {
34+
private final String transactionalId;
35+
private final long checkpointId;
36+
37+
public CheckpointTransaction(String transactionalId, long checkpointId) {
38+
this.transactionalId = checkNotNull(transactionalId, "transactionalId must not be null");
39+
this.checkpointId = checkpointId;
40+
}
41+
42+
public long getCheckpointId() {
43+
return checkpointId;
44+
}
45+
46+
public String getTransactionalId() {
47+
return transactionalId;
48+
}
49+
50+
@Override
51+
public boolean equals(Object o) {
52+
if (this == o) {
53+
return true;
54+
}
55+
if (o == null || getClass() != o.getClass()) {
56+
return false;
57+
}
58+
CheckpointTransaction that = (CheckpointTransaction) o;
59+
return checkpointId == that.checkpointId
60+
&& Objects.equals(transactionalId, that.transactionalId);
61+
}
62+
63+
@Override
64+
public int hashCode() {
65+
return Objects.hash(transactionalId, checkpointId);
66+
}
67+
68+
@Override
69+
public String toString() {
70+
return "CheckpointTransaction{"
71+
+ "transactionalId='"
72+
+ transactionalId
73+
+ '\''
74+
+ ", checkpointId="
75+
+ checkpointId
76+
+ '}';
77+
}
78+
}

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
package org.apache.flink.connector.kafka.sink.internal;
2020

21-
import org.apache.flink.annotation.Internal;
22-
2321
import org.apache.kafka.clients.producer.Callback;
2422
import org.apache.kafka.clients.producer.KafkaProducer;
2523
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -46,9 +44,7 @@
4644
/**
4745
* A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state.
4846
*/
49-
@Internal
5047
public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
51-
5248
private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class);
5349
private static final String TRANSACTION_MANAGER_FIELD_NAME = "transactionManager";
5450
private static final String TRANSACTION_MANAGER_STATE_ENUM =
@@ -173,28 +169,51 @@ public void initTransactionId(String transactionalId) {
173169
}
174170
}
175171

172+
/**
173+
* Sets the transaction manager state to uninitialized.
174+
*
175+
* <p>Can only be called if the producer is in a transaction. Its main purpose is to resolve the
176+
* split brain scenario between writer and committer.
177+
*/
178+
public void transactionCompletedExternally() {
179+
checkState(inTransaction, "Not in transactional state");
180+
this.inTransaction = false;
181+
this.hasRecordsInTransaction = false;
182+
Object transactionManager = getTransactionManager();
183+
synchronized (transactionManager) {
184+
setField(transactionManager, "transactionalId", transactionalId);
185+
setField(
186+
transactionManager,
187+
"currentState",
188+
getTransactionManagerState("UNINITIALIZED"));
189+
}
190+
}
191+
192+
/**
193+
* Sets the transactional id and sets the transaction manager state to uninitialized.
194+
*
195+
* <p>Can only be called if the producer is not in a transaction.
196+
*/
176197
public void setTransactionId(String transactionalId) {
177-
if (!transactionalId.equals(this.transactionalId)) {
178-
checkState(
179-
!inTransaction,
180-
String.format("Another transaction %s is still open.", transactionalId));
181-
LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
182-
Object transactionManager = getTransactionManager();
183-
synchronized (transactionManager) {
184-
setField(transactionManager, "transactionalId", transactionalId);
185-
setField(
186-
transactionManager,
187-
"currentState",
188-
getTransactionManagerState("UNINITIALIZED"));
189-
this.transactionalId = transactionalId;
190-
}
198+
checkState(
199+
!inTransaction,
200+
String.format("Another transaction %s is still open.", transactionalId));
201+
LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
202+
this.transactionalId = transactionalId;
203+
Object transactionManager = getTransactionManager();
204+
synchronized (transactionManager) {
205+
setField(transactionManager, "transactionalId", transactionalId);
206+
setField(
207+
transactionManager,
208+
"currentState",
209+
getTransactionManagerState("UNINITIALIZED"));
191210
}
192211
}
193212

194213
/**
195214
* Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction}
196215
* is also adding new partitions to the transaction. flushNewPartitions method is moving this
197-
* logic to pre-commit/flush, to make resumeTransaction simpler. Otherwise resumeTransaction
216+
* logic to pre-commit/flush, to make resumeTransaction simpler. Otherwise, resumeTransaction
198217
* would require to restore state of the not yet added/"in-flight" partitions.
199218
*/
200219
private void flushNewPartitions() {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.kafka.sink.internal;
20+
21+
/** A pool of producers that can be recycled. */
22+
public interface ProducerPool extends AutoCloseable {
23+
/**
24+
* Notify the pool that a transaction has finished. The producer with the given transactional id
25+
* can be recycled.
26+
*/
27+
void recycleByTransactionId(String transactionalId);
28+
29+
/**
30+
* Get a producer for the given transactional id and checkpoint id. The producer is not recycled
31+
* until it is passed to the committer, the committer commits the transaction, and {@link
32+
* #recycleByTransactionId(String)} is called. Alternatively, the producer can be recycled by
33+
* {@link #recycle(FlinkKafkaInternalProducer)}.
34+
*/
35+
FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
36+
String transactionalId, long checkpointId);
37+
38+
/**
39+
* Explicitly recycle a producer. This is useful when the producer has not been passed to the
40+
* committer.
41+
*/
42+
void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer);
43+
}

0 commit comments

Comments
 (0)