Skip to content

Add request specific policies #882

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import com.commercetools.api.defaultconfig.ApiRootBuilder;
import com.commercetools.api.defaultconfig.ServiceRegion;
import com.commercetools.api.models.category.Category;
import com.commercetools.api.models.product.ProductProjectionPagedSearchResponse;
import commercetools.utils.CommercetoolsTestUtils;

import io.vrap.rmf.base.client.ApiHttpMethod;
import io.vrap.rmf.base.client.ApiHttpResponse;
import io.vrap.rmf.base.client.http.HttpStatusCode;
import io.vrap.rmf.base.client.oauth2.ClientCredentials;
Expand Down Expand Up @@ -93,4 +95,48 @@ public void timeoutWithRetryTimeout() {
.getBody();
});
}

@Test
public void requestPolicies() {
String projectKey = CommercetoolsTestUtils.getProjectKey();

ProjectApiRoot b = ApiRootBuilder.of()
.defaultClient(ClientCredentials.of()
.withClientId(CommercetoolsTestUtils.getClientId())
.withClientSecret(CommercetoolsTestUtils.getClientSecret())
.build(),
ServiceRegion.GCP_EUROPE_WEST1)
.withTelemetryMiddleware((request, next) -> next.apply(request).thenApply((response) -> {
try {
Thread.sleep(2000); // ensure timeout
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return response;
}))
.withRequestPolicies(policies -> policies
.withRequestMatching(apiHttpRequest -> apiHttpRequest.getMethod().equals(ApiHttpMethod.POST),
policyBuilder -> policyBuilder.withTimeout(Duration.ofSeconds(10)))
.withRequestMatching(apiHttpRequest -> apiHttpRequest.getMethod().equals(ApiHttpMethod.GET),
policyBuilder -> policyBuilder.withTimeout(Duration.ofSeconds(1)))
.withAllOtherRequests(policyBuilder -> policyBuilder.withTimeout(Duration.ofSeconds(60))))
.build(projectKey);

Assertions.assertThatExceptionOfType(TimeoutExceededException.class).isThrownBy(() -> {
Category category = b.categories()
.withId("fdbaf4ea-fbc9-4fea-bac4-1d7e6c1995b3")
.get()
.executeBlocking()
.getBody();
});

ProductProjectionPagedSearchResponse searchResponse = b.productProjections()
.search()
.post()
.executeBlocking()
.getBody();

Assertions.assertThat(searchResponse).isNotNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public class CommercetoolsTestUtils {
static {
ApiRootBuilder builder = ApiRootBuilder.ofEnvironmentVariables()
.addConcurrentModificationMiddleware()
.withPolicies(
policyBuilder -> policyBuilder.withRetry(b -> b.maxRetries(5).statusCodes(singletonList(503))))
.withRequestPolicies(policyBuilder -> policyBuilder.withAllOtherRequests(
request -> request.withRetry(b -> b.maxRetries(5).statusCodes(singletonList(503)))))
.withErrorMiddleware(ErrorMiddleware.ExceptionMode.UNWRAP_COMPLETION_EXCEPTION);
projectApiRoot = builder.buildProjectRoot();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,14 @@
return with(clientBuilder -> clientBuilder.withPolicies(policyBuilder));
}

public ApiRootBuilder withRequestPolicies(final Function<RequestPolicyBuilder, RequestPolicyBuilder> fn) {
return with(clientBuilder -> clientBuilder.withRequestPolicies(fn));
}

public ApiRootBuilder withRequestPolicies(final RequestPolicyBuilder policyBuilder) {
return with(clientBuilder -> clientBuilder.withRequestPolicies(policyBuilder));

Check warning on line 329 in commercetools/commercetools-sdk-java-api/src/main/java/com/commercetools/api/defaultconfig/ApiRootBuilder.java

View check run for this annotation

Codecov / codecov/patch

commercetools/commercetools-sdk-java-api/src/main/java/com/commercetools/api/defaultconfig/ApiRootBuilder.java#L329

Added line #L329 was not covered by tests
}

public ApiRootBuilder withPolicyMiddleware(final PolicyMiddleware policyMiddleware) {
return with(clientBuilder -> clientBuilder.withPolicyMiddleware(policyMiddleware));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;

import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeExecutor;
import reactor.netty.http.HttpProtocol;

public class ExamplesTest {
Expand Down Expand Up @@ -121,18 +119,14 @@ public void customUrls() {
}

public void timeoutMiddleware() {
dev.failsafe.Timeout<ApiHttpResponse<byte[]>> timeout = dev.failsafe.Timeout
.<ApiHttpResponse<byte[]>> builder(Duration.ofSeconds(10))
.build();
FailsafeExecutor<ApiHttpResponse<byte[]>> failsafeExecutor = Failsafe.with(timeout);

ProjectApiRoot apiRoot = ApiRootBuilder.of()
.defaultClient(ClientCredentials.of()
.withClientId("your-client-id")
.withClientSecret("your-client-secret")
.build(),
ServiceRegion.GCP_EUROPE_WEST1)
.addMiddleware((request, next) -> failsafeExecutor.getStageAsync(() -> next.apply(request)))
.withRequestPolicies(
policies -> policies.withAllOtherRequests(request -> request.withTimeout(Duration.ofSeconds(10))))
.build("my-project");
}

Expand Down Expand Up @@ -401,8 +395,12 @@ public void retry() {
ProjectApiRoot apiRoot = ApiRootBuilder.of()
.defaultClient(ClientCredentials.of().withClientId("clientId").withClientSecret("clientSecret").build(),
ServiceRegion.GCP_EUROPE_WEST1)
.withPolicies(policies -> policies.withRetry(builder -> builder.maxRetries(5)
.statusCodes(Arrays.asList(BAD_GATEWAY_502, SERVICE_UNAVAILABLE_503, GATEWAY_TIMEOUT_504))))
.withRequestPolicies(
policies -> policies
.withAllOtherRequests(
request -> request.withRetry(builder -> builder.maxRetries(5)
.statusCodes(Arrays.asList(BAD_GATEWAY_502, SERVICE_UNAVAILABLE_503,
GATEWAY_TIMEOUT_504)))))
.build("my-project");
}

Expand Down Expand Up @@ -590,7 +588,8 @@ public void httpConcurrentLimitation() {
public void queueConcurrentLimitation() {
ApiRootBuilder.of()
// ...
.withPolicies(policies -> policies.withBulkhead(64, Duration.ofSeconds(10)))
.withRequestPolicies(policies -> policies
.withAllOtherRequests(request -> request.withBulkhead(64, Duration.ofSeconds(10))))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ public void retry() {
.defaultClient(ServiceRegion.GCP_EUROPE_WEST1.getApiUrl(),
ClientCredentials.of().withClientId("clientId").withClientSecret("clientSecret").build(),
ServiceRegion.GCP_EUROPE_WEST1.getOAuthTokenUrl())
.withPolicies(policies -> policies.withRetry(builder -> builder
.statusCodes(Arrays.asList(BAD_GATEWAY_502, SERVICE_UNAVAILABLE_503, GATEWAY_TIMEOUT_504))))
.withRequestPolicies(policies -> policies
.withAllOtherRequests(request -> request.withRetry(builder -> builder.statusCodes(
Arrays.asList(BAD_GATEWAY_502, SERVICE_UNAVAILABLE_503, GATEWAY_TIMEOUT_504)))))
.build("projectKey");

final CategoryPagedQueryResponse body = projectClient.categories().get().executeBlocking().getBody();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
private Supplier<OAuthMiddleware> oAuthMiddleware;
private Supplier<RetryRequestMiddleware> retryMiddleware;
private PolicyBuilder policyBuilder;
private RequestPolicyBuilder requestPolicyBuilder;
private Supplier<PolicyMiddleware> policyMiddleware;
private Supplier<QueueRequestMiddleware> queueMiddleware;
private Supplier<Middleware> correlationIdMiddleware;
Expand Down Expand Up @@ -1275,6 +1276,24 @@
return this;
}

/**
* add middleware for safe handling of failed requests
* @param requestPolicyBuilder the policy builder
* @return ClientBuilder instance
*/
public ClientBuilder withRequestPolicies(RequestPolicyBuilder requestPolicyBuilder) {
this.requestPolicyBuilder = requestPolicyBuilder;
this.policyMiddleware = requestPolicyBuilder::build;
return this;

Check warning on line 1287 in rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/ClientBuilder.java

View check run for this annotation

Codecov / codecov/patch

rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/ClientBuilder.java#L1285-L1287

Added lines #L1285 - L1287 were not covered by tests
}

public ClientBuilder withRequestPolicies(Function<RequestPolicyBuilder, RequestPolicyBuilder> fn) {
this.requestPolicyBuilder = fn
.apply(Optional.ofNullable(requestPolicyBuilder).orElse(RequestPolicyBuilder.of()));
this.policyMiddleware = requestPolicyBuilder::build;
return this;
}

/**
* add middleware for safe handling of failed requests
* @param policyMiddleware {@link PolicyMiddleware} to be used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@

import io.vrap.rmf.base.client.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dev.failsafe.*;
import dev.failsafe.spi.Scheduler;

Expand All @@ -21,6 +18,8 @@
*
* <p>The PolicyBuilder allows the combination of different policies for failing requests.</p>
*
* <p>In case you need different policies based on the request use the {@link RequestPolicyBuilder} instead</p>
*
* <p>The order of policies matters. For example applying a {@link #withTimeout(Duration) timeout} before
* {@link #withRetry(RetryPolicyBuilder)} retry} will time out across all requests whereas applying a timeout after the retry count
* against every single request even the retried ones.</p>
Expand All @@ -47,11 +46,6 @@
*/
public class PolicyBuilder {

static final String loggerName = ClientBuilder.COMMERCETOOLS + ".retry";

private static final InternalLogger logger = InternalLogger.getLogger(loggerName);
private static final Logger classLogger = LoggerFactory.getLogger(PolicyMiddleware.class);

private final List<Policy<ApiHttpResponse<byte[]>>> policies;

private final Scheduler scheduler;
Expand All @@ -61,6 +55,10 @@ public PolicyBuilder() {
this.scheduler = Scheduler.DEFAULT;
}

List<Policy<ApiHttpResponse<byte[]>>> getPolicies() {
return policies;
}

public PolicyBuilder(List<Policy<ApiHttpResponse<byte[]>>> policies) {
this.policies = policies;
this.scheduler = Scheduler.DEFAULT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
package io.vrap.rmf.base.client.http;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;

import io.vrap.rmf.base.client.ApiHttpRequest;
import io.vrap.rmf.base.client.ApiHttpResponse;

import dev.failsafe.Policy;
Expand All @@ -28,4 +31,23 @@
final List<Policy<ApiHttpResponse<byte[]>>> policies) {
return new PolicyMiddlewareImpl(Scheduler.of(scheduler), policies);
}

public static PolicyMiddleware of(
final List<Map.Entry<Predicate<ApiHttpRequest>, List<Policy<ApiHttpResponse<byte[]>>>>> policies,
final Scheduler scheduler) {
return new RequestPolicyMiddlewareImpl(scheduler, policies);
}

public static PolicyMiddleware of(
final List<Map.Entry<Predicate<ApiHttpRequest>, List<Policy<ApiHttpResponse<byte[]>>>>> policies,
final ScheduledExecutorService scheduler) {
return new RequestPolicyMiddlewareImpl(Scheduler.of(scheduler), policies);

Check warning on line 44 in rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyMiddleware.java

View check run for this annotation

Codecov / codecov/patch

rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyMiddleware.java#L44

Added line #L44 was not covered by tests
}

public static PolicyMiddleware of(
final List<Map.Entry<Predicate<ApiHttpRequest>, List<Policy<ApiHttpResponse<byte[]>>>>> policies,
final ExecutorService scheduler) {
return new RequestPolicyMiddlewareImpl(Scheduler.of(scheduler), policies);

Check warning on line 50 in rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyMiddleware.java

View check run for this annotation

Codecov / codecov/patch

rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyMiddleware.java#L50

Added line #L50 was not covered by tests
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@

package io.vrap.rmf.base.client.http;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Predicate;

import io.vrap.rmf.base.client.ApiHttpRequest;
import io.vrap.rmf.base.client.ApiHttpResponse;

import dev.failsafe.Policy;
import dev.failsafe.spi.Scheduler;

/**
* <h2>RequestPolicyBuilder</h2>
*
* <p>The RequestPolicyBuilder allows the combination of different policies for failing requests and apply them to matching
* requests.</p>
*
* <p>The order of policies matters. For example applying a {@link PolicyBuilder#withTimeout(Duration) timeout} before
* {@link PolicyBuilder#withRetry(RetryPolicyBuilder)} retry} will time out across all requests whereas applying a timeout after the retry count
* against every single request even the retried ones.</p>
*
* <h3 id="retry">Retry</h3>
*
* <h4>Retrying on HTTP status codes</h4>
*
* {@include.example io.vrap.rmf.base.client.http.RequestPolicyMiddlewareTest#retryConfigurationStatusCodes()}
*
* <h3>Retrying specific exceptions</h3>
*
* {@include.example io.vrap.rmf.base.client.http.RequestPolicyMiddlewareTest#retryConfigurationExceptions()}
*
* <h3 id="timeout">Timeout</h3>
*
* {@include.example io.vrap.rmf.base.client.http.RequestPolicyMiddlewareTest#timeoutConfiguration()}
*
* <h3 id="bulkhead">Bulkhead</h3>
*
* <p>Implementation of a Queue to limit the number of concurrent requests handled by the client</p>
*
* {@include.example io.vrap.rmf.base.client.http.RequestPolicyMiddlewareTest#queueConfiguration()}
*/
public class RequestPolicyBuilder {

private final Map<Predicate<ApiHttpRequest>, List<Policy<ApiHttpResponse<byte[]>>>> policies;

private final Scheduler scheduler;

public RequestPolicyBuilder() {
this.policies = new LinkedHashMap<>();
this.scheduler = Scheduler.DEFAULT;
}

public RequestPolicyBuilder(final Map<Predicate<ApiHttpRequest>, List<Policy<ApiHttpResponse<byte[]>>>> policies) {
this.policies = policies;
this.scheduler = Scheduler.DEFAULT;
}

Check warning on line 61 in rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java

View check run for this annotation

Codecov / codecov/patch

rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java#L58-L61

Added lines #L58 - L61 were not covered by tests

public RequestPolicyBuilder(final Scheduler scheduler,
final Map<Predicate<ApiHttpRequest>, List<Policy<ApiHttpResponse<byte[]>>>> policies) {
this.policies = policies;
this.scheduler = scheduler;
}

public RequestPolicyBuilder withScheduler(final ScheduledExecutorService scheduler) {
return new RequestPolicyBuilder(Scheduler.of(scheduler), policies);

Check warning on line 70 in rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java

View check run for this annotation

Codecov / codecov/patch

rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java#L70

Added line #L70 was not covered by tests
}

public RequestPolicyBuilder withScheduler(final ExecutorService scheduler) {
return new RequestPolicyBuilder(Scheduler.of(scheduler), policies);

Check warning on line 74 in rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java

View check run for this annotation

Codecov / codecov/patch

rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java#L74

Added line #L74 was not covered by tests
}

public RequestPolicyBuilder withScheduler(final Scheduler scheduler) {
return new RequestPolicyBuilder(scheduler, policies);

Check warning on line 78 in rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java

View check run for this annotation

Codecov / codecov/patch

rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java#L78

Added line #L78 was not covered by tests
}

public RequestPolicyBuilder withRequestMatching(final Predicate<ApiHttpRequest> predicate,
Function<PolicyBuilder, PolicyBuilder> fn) {
Map<Predicate<ApiHttpRequest>, List<Policy<ApiHttpResponse<byte[]>>>> policiesCopy = new LinkedHashMap<>(
policies);
policiesCopy.put(predicate, fn.apply(PolicyBuilder.of()).getPolicies());
return new RequestPolicyBuilder(scheduler, policiesCopy);
}

public RequestPolicyBuilder withAllOtherRequests(Function<PolicyBuilder, PolicyBuilder> fn) {
Map<Predicate<ApiHttpRequest>, List<Policy<ApiHttpResponse<byte[]>>>> policiesCopy = new LinkedHashMap<>(
policies);
policiesCopy.put(apiHttpRequest -> true, fn.apply(PolicyBuilder.of()).getPolicies());
return new RequestPolicyBuilder(scheduler, policiesCopy);
}

public PolicyMiddleware build() {
return PolicyMiddleware.of(new ArrayList<>(policies.entrySet()), scheduler);
}

public static RequestPolicyBuilder of() {
return new RequestPolicyBuilder();
}
}
Loading