Skip to content

Commit 54e02a7

Browse files
authored
Increase Kafka topic polling time in test to 10 seconds (#18040)
We've seen multiple failures where the 3 second timeout was breached. This happens in the test setup of many different tests so multiple individual test failures have been reported. Also replace deprecated method in Kafka API and simplify a couple other code paths. Signed-off-by: Andrew Ross <andrross@amazon.com>
1 parent e25f332 commit 54e02a7

File tree

1 file changed

+8
-12
lines changed
  • plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka

1 file changed

+8
-12
lines changed

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,33 +53,29 @@ public static void createTopic(String topicName, int numOfPartitions, String boo
5353
}
5454

5555
// validates topic is created
56-
await().atMost(3, TimeUnit.SECONDS).until(() -> checkTopicExistence(topicName, bootstrapServers));
56+
await().atMost(10, TimeUnit.SECONDS).until(() -> checkTopicExistence(topicName, bootstrapServers));
5757
}
5858

5959
public static boolean checkTopicExistence(String topicName, String bootstrapServers) {
6060
return getAdminClient(bootstrapServers, (client -> {
61-
Map<String, KafkaFuture<TopicDescription>> topics = client.describeTopics(List.of(topicName)).values();
61+
Map<String, KafkaFuture<TopicDescription>> topics = client.describeTopics(List.of(topicName)).topicNameValues();
6262

6363
try {
6464
return topics.containsKey(topicName) && topics.get(topicName).get().name().equals(topicName);
65-
} catch (InterruptedException e) {
66-
LOGGER.error("error on checkTopicExistence", e);
67-
return false;
68-
} catch (ExecutionException e) {
65+
} catch (InterruptedException | ExecutionException e) {
6966
LOGGER.error("error on checkTopicExistence", e);
7067
return false;
7168
}
7269
}));
7370
}
7471

7572
private static <Rep> Rep getAdminClient(String bootstrapServer, Function<AdminClient, Rep> function) {
76-
AdminClient adminClient = KafkaAdminClient.create(
77-
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer, AdminClientConfig.CLIENT_ID_CONFIG, "test")
78-
);
79-
try {
73+
try (
74+
AdminClient adminClient = KafkaAdminClient.create(
75+
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer, AdminClientConfig.CLIENT_ID_CONFIG, "test")
76+
)
77+
) {
8078
return function.apply(adminClient);
81-
} finally {
82-
adminClient.close();
8379
}
8480
}
8581
}

0 commit comments

Comments
 (0)