Skip to content

Commit 308cc0d

Browse files
committed
[FLINK-37366] Allow configurable retry for Kafka topic metadata fetch
1 parent a52f15a commit 308cc0d

File tree

10 files changed

+89
-26
lines changed

10 files changed

+89
-26
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java

+12
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,18 @@ public class KafkaSourceOptions {
5858
.defaultValue(true)
5959
.withDescription("Whether to commit consuming offset on checkpoint.");
6060

61+
/**
 * Maximum number of retries for a topic metadata request, under the key
 * {@code topic.metadata.max.retry}. Defaults to 0.
 */
public static final ConfigOption<Integer> TOPIC_METADATA_REQUEST_MAX_RETRY =
62+
ConfigOptions.key("topic.metadata.max.retry")
63+
.intType()
64+
.defaultValue(0)
65+
.withDescription("Max number of retries for topic metadata request before failing the source.");
66+
67+
/**
 * Interval in milliseconds between retries of a topic metadata request, under the key
 * {@code topic.metadata.retry.interval.ms}. Defaults to the millisecond value of 30 seconds.
 */
public static final ConfigOption<Long> TOPIC_METADATA_REQUEST_RETRY_INTERVAL_MS =
68+
ConfigOptions.key("topic.metadata.retry.interval.ms")
69+
.longType()
70+
.defaultValue(Duration.ofSeconds(30).toMillis())
71+
.withDescription("The interval in milliseconds between retries for topic metadata request.");
72+
6173
@SuppressWarnings("unchecked")
6274
public static <T> T getOption(
6375
Properties props, ConfigOption<?> configOption, Function<String, T> parser) {

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ public void close() {
230230
* @return Set of subscribed {@link TopicPartition}s
231231
*/
232232
private Set<TopicPartition> getSubscribedTopicPartitions() {
233-
return subscriber.getSubscribedTopicPartitions(adminClient);
233+
// The properties are handed to the subscriber together with the admin client so that
// the subscriber can read options from them.
return subscriber.getSubscribedTopicPartitions(adminClient, properties);
234234
}
235235

236236
/**

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.io.Serializable;
2727
import java.util.List;
28+
import java.util.Properties;
2829
import java.util.Set;
2930
import java.util.regex.Pattern;
3031

@@ -51,9 +52,11 @@ public interface KafkaSubscriber extends Serializable {
5152
* Get a set of subscribed {@link TopicPartition}s.
5253
*
5354
* @param adminClient The admin client used to retrieve subscribed topic partitions.
55+
* @param properties The source properties; implementations may read options from them, such as the topic metadata retry settings.
5456
* @return A set of subscribed {@link TopicPartition}s
5557
*/
56-
Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient);
58+
Set<TopicPartition> getSubscribedTopicPartitions(
59+
AdminClient adminClient, Properties properties);
5760

5861
// ----------------- factory methods --------------
5962

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java

+48-10
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,45 @@
1818

1919
package org.apache.flink.connector.kafka.source.enumerator.subscriber;
2020

21+
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
22+
2123
import org.apache.kafka.clients.admin.AdminClient;
2224
import org.apache.kafka.clients.admin.TopicDescription;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2327

2428
import java.util.Map;
29+
import java.util.Properties;
2530
import java.util.Set;
31+
import java.util.concurrent.TimeUnit;
2632
import java.util.regex.Pattern;
2733
import java.util.stream.Collectors;
2834

2935
/** Static helpers for fetching topic metadata, shared by the {@link KafkaSubscriber} implementations. */
3036
class KafkaSubscriberUtils {
37+
private static final Logger LOG = LoggerFactory.getLogger(KafkaSubscriberUtils.class);
3138

3239
private KafkaSubscriberUtils() {}
3340

34-
static Map<String, TopicDescription> getAllTopicMetadata(AdminClient adminClient) {
41+
static Map<String, TopicDescription> getAllTopicMetadata(
42+
AdminClient adminClient, Properties properties) {
3543
try {
3644
Set<String> allTopicNames = adminClient.listTopics().names().get();
37-
return getTopicMetadata(adminClient, allTopicNames);
45+
return getTopicMetadata(adminClient, allTopicNames, properties);
3846
} catch (Exception e) {
3947
throw new RuntimeException("Failed to get metadata for all topics.", e);
4048
}
4149
}
4250

4351
static Map<String, TopicDescription> getTopicMetadata(
44-
AdminClient adminClient, Pattern topicPattern) {
52+
AdminClient adminClient, Pattern topicPattern, Properties properties) {
4553
try {
4654
Set<String> allTopicNames = adminClient.listTopics().names().get();
4755
Set<String> matchedTopicNames =
4856
allTopicNames.stream()
4957
.filter(name -> topicPattern.matcher(name).matches())
5058
.collect(Collectors.toSet());
51-
return getTopicMetadata(adminClient, matchedTopicNames);
59+
return getTopicMetadata(adminClient, matchedTopicNames, properties);
5260
} catch (Exception e) {
5361
throw new RuntimeException(
5462
String.format("Failed to get metadata for %s topics.", topicPattern.pattern()),
@@ -57,12 +65,42 @@ static Map<String, TopicDescription> getTopicMetadata(
5765
}
5866

5967
static Map<String, TopicDescription> getTopicMetadata(
60-
AdminClient adminClient, Set<String> topicNames) {
61-
try {
62-
return adminClient.describeTopics(topicNames).allTopicNames().get();
63-
} catch (Exception e) {
64-
throw new RuntimeException(
65-
String.format("Failed to get metadata for topics %s.", topicNames), e);
68+
AdminClient adminClient, Set<String> topicNames, Properties properties) {
69+
int maxRetries =
70+
KafkaSourceOptions.getOption(
71+
properties,
72+
KafkaSourceOptions.TOPIC_METADATA_REQUEST_MAX_RETRY,
73+
Integer::parseInt);
74+
long retryDelay =
75+
KafkaSourceOptions.getOption(
76+
properties,
77+
KafkaSourceOptions.TOPIC_METADATA_REQUEST_RETRY_INTERVAL_MS,
78+
Long::parseLong);
79+
for (int attempt = 0; attempt <= maxRetries; attempt++) {
80+
try {
81+
return adminClient.describeTopics(topicNames).allTopicNames().get();
82+
} catch (Exception e) {
83+
if (attempt == maxRetries) {
84+
throw new RuntimeException(
85+
String.format("Failed to get metadata for topics %s.", topicNames), e);
86+
} else {
87+
LOG.warn(
88+
"Attempt {} to get metadata for topics {} failed. Retrying in {} ms...",
89+
attempt,
90+
topicNames,
91+
retryDelay);
92+
try {
93+
TimeUnit.MILLISECONDS.sleep(retryDelay);
94+
} catch (InterruptedException ie) {
95+
Thread.currentThread().interrupt(); // Restore interrupted state
96+
LOG.error("Thread was interrupted during metadata fetch retry delay.", ie);
97+
}
98+
}
99+
}
66100
}
101+
// This statement will never be reached because either a valid result is returned or an
102+
// exception is thrown.
103+
throw new IllegalStateException(
104+
"Unexpected error in getTopicMetadata: reached unreachable code.");
67105
}
68106
}

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.HashSet;
3131
import java.util.Map;
3232
import java.util.Optional;
33+
import java.util.Properties;
3334
import java.util.Set;
3435
import java.util.stream.Collectors;
3536

@@ -46,15 +47,16 @@ class PartitionSetSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierP
4647
}
4748

4849
@Override
49-
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
50+
public Set<TopicPartition> getSubscribedTopicPartitions(
51+
AdminClient adminClient, Properties properties) {
5052
final Set<String> topicNames =
5153
subscribedPartitions.stream()
5254
.map(TopicPartition::topic)
5355
.collect(Collectors.toSet());
5456

5557
LOG.debug("Fetching descriptions for topics: {}", topicNames);
5658
final Map<String, TopicDescription> topicMetadata =
57-
getTopicMetadata(adminClient, topicNames);
59+
getTopicMetadata(adminClient, topicNames, properties);
5860

5961
Set<TopicPartition> existingSubscribedPartitions = new HashSet<>();
6062

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicListSubscriber.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.List;
3333
import java.util.Map;
3434
import java.util.Optional;
35+
import java.util.Properties;
3536
import java.util.Set;
3637

3738
import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
@@ -50,10 +51,11 @@ class TopicListSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProv
5051
}
5152

5253
@Override
53-
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
54+
public Set<TopicPartition> getSubscribedTopicPartitions(
55+
AdminClient adminClient, Properties properties) {
5456
LOG.debug("Fetching descriptions for topics: {}", topics);
5557
final Map<String, TopicDescription> topicMetadata =
56-
getTopicMetadata(adminClient, new HashSet<>(topics));
58+
getTopicMetadata(adminClient, new HashSet<>(topics), properties);
5759

5860
Set<TopicPartition> subscribedPartitions = new HashSet<>();
5961
for (TopicDescription topic : topicMetadata.values()) {

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.HashSet;
3232
import java.util.Map;
3333
import java.util.Optional;
34+
import java.util.Properties;
3435
import java.util.Set;
3536
import java.util.regex.Pattern;
3637

@@ -47,10 +48,11 @@ class TopicPatternSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierP
4748
}
4849

4950
@Override
50-
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
51+
public Set<TopicPartition> getSubscribedTopicPartitions(
52+
AdminClient adminClient, Properties properties) {
5153
LOG.debug("Fetching descriptions for {} topics on Kafka cluster", topicPattern.pattern());
5254
final Map<String, TopicDescription> matchedTopicMetadata =
53-
getTopicMetadata(adminClient, topicPattern);
55+
getTopicMetadata(adminClient, topicPattern, properties);
5456

5557
Set<TopicPartition> subscribedTopicPartitions = new HashSet<>();
5658

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Collections;
3939
import java.util.HashMap;
4040
import java.util.Map;
41+
import java.util.Properties;
4142
import java.util.Set;
4243
import java.util.regex.Pattern;
4344
import java.util.stream.Stream;
@@ -251,7 +252,7 @@ private KafkaSourceBuilder<String> getBasicBuilder() {
251252
/** Custom subscriber stub that always reports partition 0 of the topic named "topic". */
private static class ExampleCustomSubscriber implements KafkaSubscriber {
252253

253254
@Override
254-
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
255+
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient, Properties properties) {
255256
// Neither argument is consulted; the result is a fixed single-element set.
return Collections.singleton(new TopicPartition("topic", 0));
256257
}
257258
}

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void testGetLineageVertexWhenSubscriberNotAnKafkaDatasetFacetProvider() {
6161
new KafkaSubscriber() {
6262
@Override
6363
public Set<TopicPartition> getSubscribedTopicPartitions(
64-
AdminClient adminClient) {
64+
AdminClient adminClient, Properties properties) {
6565
// Stub: returns null without using the admin client or the properties.
return null;
6666
}
6767
})
@@ -176,7 +176,7 @@ public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
176176
}
177177

178178
@Override
179-
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
179+
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient, Properties properties) {
180180
// Stub: returns null without using the admin client or the properties.
return null;
181181
}
182182
}

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Collections;
3434
import java.util.HashSet;
3535
import java.util.List;
36+
import java.util.Properties;
3637
import java.util.Set;
3738
import java.util.regex.Pattern;
3839

@@ -46,13 +47,15 @@ public class KafkaSubscriberTest {
4647
private static final String TOPIC2 = "pattern-topic";
4748
private static final TopicPartition NON_EXISTING_TOPIC = new TopicPartition("removed", 0);
4849
private static AdminClient adminClient;
50+
private static Properties properties;
4951

5052
@BeforeClass
5153
public static void setup() throws Throwable {
5254
KafkaSourceTestEnv.setup();
5355
KafkaSourceTestEnv.createTestTopic(TOPIC1);
5456
KafkaSourceTestEnv.createTestTopic(TOPIC2);
5557
adminClient = KafkaSourceTestEnv.getAdminClient();
58+
properties = new Properties();
5659
}
5760

5861
@AfterClass
@@ -67,7 +70,7 @@ public void testTopicListSubscriber() {
6770
KafkaSubscriber subscriber =
6871
KafkaSubscriber.getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2));
6972
final Set<TopicPartition> subscribedPartitions =
70-
subscriber.getSubscribedTopicPartitions(adminClient);
73+
subscriber.getSubscribedTopicPartitions(adminClient, properties);
7174

7275
final Set<TopicPartition> expectedSubscribedPartitions =
7376
new HashSet<>(KafkaSourceTestEnv.getPartitionsForTopics(topics));
@@ -83,7 +86,7 @@ public void testNonExistingTopic() {
8386
KafkaSubscriber.getTopicListSubscriber(
8487
Collections.singletonList(NON_EXISTING_TOPIC.topic()));
8588

86-
assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient))
89+
assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient, properties))
8790
.isInstanceOf(RuntimeException.class)
8891
.satisfies(anyCauseMatches(UnknownTopicOrPartitionException.class));
8992
}
@@ -93,7 +96,7 @@ public void testTopicPatternSubscriber() {
9396
Pattern pattern = Pattern.compile("pattern.*");
9497
KafkaSubscriber subscriber = KafkaSubscriber.getTopicPatternSubscriber(pattern);
9598
final Set<TopicPartition> subscribedPartitions =
96-
subscriber.getSubscribedTopicPartitions(adminClient);
99+
subscriber.getSubscribedTopicPartitions(adminClient, properties);
97100

98101
final Set<TopicPartition> expectedSubscribedPartitions =
99102
new HashSet<>(
@@ -114,7 +117,7 @@ public void testPartitionSetSubscriber() {
114117
KafkaSubscriber subscriber = KafkaSubscriber.getPartitionSetSubscriber(partitions);
115118

116119
final Set<TopicPartition> subscribedPartitions =
117-
subscriber.getSubscribedTopicPartitions(adminClient);
120+
subscriber.getSubscribedTopicPartitions(adminClient, properties);
118121

119122
assertThat(subscribedPartitions).isEqualTo(partitions);
120123
assertThat(((KafkaDatasetIdentifierProvider) subscriber).getDatasetIdentifier().get())
@@ -128,7 +131,7 @@ public void testNonExistingPartition() {
128131
KafkaSubscriber.getPartitionSetSubscriber(
129132
Collections.singleton(nonExistingPartition));
130133

131-
assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient))
134+
assertThatThrownBy(() -> subscriber.getSubscribedTopicPartitions(adminClient, properties))
132135
.isInstanceOf(RuntimeException.class)
133136
.hasMessage(
134137
String.format(

0 commit comments

Comments
 (0)