Skip to content

Commit cbcf582

Browse files
BE: Impl a default timeout for http requests (#799)
Co-authored-by: German Osin <german.osin@gmail.com>
1 parent 8847b0d commit cbcf582

File tree

6 files changed

+27
-5
lines changed

6 files changed

+27
-5
lines changed

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
3939

4040
public RetryingKafkaConnectClient(ClustersProperties.ConnectCluster config,
4141
@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
42-
DataSize maxBuffSize) {
43-
super(new RetryingApiClient(config, truststoreConfig, maxBuffSize));
42+
DataSize maxBuffSize,
43+
Duration responseTimeout) {
44+
super(new RetryingApiClient(config, truststoreConfig, maxBuffSize, responseTimeout));
4445
}
4546

4647
private static Retry conflictCodeRetry() {
@@ -318,14 +319,16 @@ private static class RetryingApiClient extends ApiClient {
318319

319320
public RetryingApiClient(ClustersProperties.ConnectCluster config,
320321
ClustersProperties.TruststoreConfig truststoreConfig,
321-
DataSize maxBuffSize) {
322-
super(buildWebClient(maxBuffSize, config, truststoreConfig), null, null);
322+
DataSize maxBuffSize,
323+
Duration responseTimeout) {
324+
super(buildWebClient(maxBuffSize, responseTimeout, config, truststoreConfig), null, null);
323325
setBasePath(config.getAddress());
324326
setUsername(config.getUsername());
325327
setPassword(config.getPassword());
326328
}
327329

328330
public static WebClient buildWebClient(DataSize maxBuffSize,
331+
Duration responseTimeout,
329332
ClustersProperties.ConnectCluster config,
330333
ClustersProperties.TruststoreConfig truststoreConfig) {
331334
return new WebClientConfigurator()
@@ -341,6 +344,7 @@ public static WebClient buildWebClient(DataSize maxBuffSize,
341344
config.getPassword()
342345
)
343346
.configureBufferSize(maxBuffSize)
347+
.configureResponseTimeout(responseTimeout)
344348
.build();
345349
}
346350
}

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public static class PollingProperties {
7777
Integer pollTimeoutMs;
7878
Integer maxPageSize;
7979
Integer defaultPageSize;
80+
Integer responseTimeoutMs;
8081
}
8182

8283
@Data

api/src/main/java/io/kafbat/ui/config/WebclientProperties.java

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
public class WebclientProperties {
1414

1515
String maxInMemoryBufferSize;
16+
Integer responseTimeoutMs;
1617

1718
@PostConstruct
1819
public void validate() {

api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.kafbat.ui.util.KafkaServicesValidation;
1717
import io.kafbat.ui.util.ReactiveFailover;
1818
import io.kafbat.ui.util.WebClientConfigurator;
19+
import java.time.Duration;
1920
import java.util.HashMap;
2021
import java.util.List;
2122
import java.util.Map;
@@ -37,13 +38,18 @@
3738
public class KafkaClusterFactory {
3839

3940
private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");
41+
private static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofSeconds(20);
4042

4143
private final DataSize webClientMaxBuffSize;
44+
private final Duration responseTimeout;
4245

4346
public KafkaClusterFactory(WebclientProperties webclientProperties) {
4447
this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
4548
.map(DataSize::parse)
4649
.orElse(DEFAULT_WEBCLIENT_BUFFER);
50+
this.responseTimeout = Optional.ofNullable(webclientProperties.getResponseTimeoutMs())
51+
.map(Duration::ofMillis)
52+
.orElse(DEFAULT_RESPONSE_TIMEOUT);
4753
}
4854

4955
public KafkaCluster create(ClustersProperties properties,
@@ -147,7 +153,8 @@ private ReactiveFailover<KafkaConnectClientApi> connectClient(ClustersProperties
147153
url -> new RetryingKafkaConnectClient(
148154
connectCluster.toBuilder().address(url).build(),
149155
cluster.getSsl(),
150-
webClientMaxBuffSize
156+
webClientMaxBuffSize,
157+
responseTimeout
151158
),
152159
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
153160
"No alive connect instances available",

api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java

+6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
1111
import java.io.FileInputStream;
1212
import java.security.KeyStore;
13+
import java.time.Duration;
1314
import java.util.function.Consumer;
1415
import javax.annotation.Nullable;
1516
import javax.net.ssl.KeyManagerFactory;
@@ -144,6 +145,11 @@ public WebClientConfigurator configureCodecs(Consumer<ClientCodecConfigurer> con
144145
return this;
145146
}
146147

148+
public WebClientConfigurator configureResponseTimeout(Duration responseTimeout) {
149+
httpClient = httpClient.responseTimeout(responseTimeout);
150+
return this;
151+
}
152+
147153
public WebClient build() {
148154
return builder.clientConnector(new ReactorClientHttpConnector(httpClient)).build();
149155
}

contract/src/main/resources/swagger/kafbat-ui-api.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -4245,6 +4245,9 @@ components:
42454245
maxInMemoryBufferSize:
42464246
type: string
42474247
description: "examples: 20, 12KB, 5MB"
4248+
responseTimeoutMs:
4249+
type: integer
4250+
description: "general response timeout in milliseconds for all http requests"
42484251
kafka:
42494252
type: object
42504253
properties:

0 commit comments

Comments
 (0)