Skip to content

Commit be38440

Browse files
Create Kafka Admin Client outside of the Parallel scheduler thread pool (#784)
1 parent 49894b8 commit be38440

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.Map;
99
import java.util.Optional;
1010
import java.util.Properties;
11+
import java.util.concurrent.CompletableFuture;
1112
import java.util.concurrent.ConcurrentHashMap;
1213
import java.util.concurrent.atomic.AtomicLong;
1314
import lombok.extern.slf4j.Slf4j;
@@ -40,7 +41,7 @@ public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
4041
}
4142

4243
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
43-
return Mono.fromSupplier(() -> {
44+
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
4445
Properties properties = new Properties();
4546
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
4647
properties.putAll(cluster.getProperties());
@@ -51,7 +52,7 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
5152
"kafbat-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet()
5253
);
5354
return AdminClient.create(properties);
54-
}).flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
55+
})).flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
5556
.onErrorMap(th -> new IllegalStateException(
5657
"Error while creating AdminClient for the cluster " + cluster.getName(), th));
5758
}

0 commit comments

Comments
 (0)