Skip to content

Commit e6d77ff

Browse files
committed
Move BackPressureHandler factory methods to BackPressureHandlerFactories class (#1251)
1 parent c408b0d commit e6d77ff

File tree

6 files changed

+192
-164
lines changed

6 files changed

+192
-164
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
222222

223223
private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO;
224224

225-
private static final BackPressureHandlerFactory DEFAULT_BACKPRESSURE_FACTORY = BackPressureHandlerFactory::semaphoreBackPressureHandler;
225+
private static final BackPressureHandlerFactory DEFAULT_BACKPRESSURE_FACTORY = BackPressureHandlerFactories::semaphoreBackPressureHandler;
226226

227227
private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE;
228228

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import java.time.Duration;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
22+
/**
23+
* Spring Cloud AWS provides the following {@link BackPressureHandler} implementations:
24+
* <ul>
25+
* <li>{@link ConcurrencyLimiterBlockingBackPressureHandler}: Limits the maximum number of messages that can be *
26+
* processed concurrently by the application.</li> *
27+
* <li>{@link ThroughputBackPressureHandler}: Adapts the throughput dynamically between high and low modes in order to *
28+
* reduce SQS pull costs when few messages are coming in.</li> *
29+
* <li>{@link CompositeBackPressureHandler}: Allows combining multiple {@link BackPressureHandler} together and ensures
30+
* * they cooperate.</li> *
31+
* </ul>
32+
* <p>
33+
* Below are a few examples of how common use cases can be achieved. Keep in mind you can always create your own *
34+
* {@link BackPressureHandler} implementation and if needed combine it with the provided ones thanks to the *
35+
* {@link CompositeBackPressureHandler}. * *
36+
* <h3>A BackPressureHandler limiting the max concurrency with high throughput</h3> * *
37+
*
38+
* <pre>{@code
39+
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> {
40+
* return ConcurrencyLimiterBlockingBackPressureHandler.builder()
41+
* .batchSize(containerOptions.getMaxMessagesPerPoll())
42+
* .totalPermits(containerOptions.getMaxConcurrentMessages())
43+
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
44+
* .throughputConfiguration(BackPressureMode.FIXED_HIGH_THROUGHPUT)
45+
* .build()
46+
* }}</pre>
47+
* <p>
48+
* * *
49+
* <h3>A BackPressureHandler limiting the max concurrency with dynamic throughput</h3> * *
50+
*
51+
* <pre>{@code
52+
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> {
53+
* int batchSize = containerOptions.getMaxMessagesPerPoll();
54+
* var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
55+
* .batchSize(batchSize)
56+
* .totalPermits(containerOptions.getMaxConcurrentMessages())
57+
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
58+
* .throughputConfiguration(BackPressureMode.AUTO)
59+
* .build()
60+
* var throughputBackPressureHandler = ThroughputBackPressureHandler.builder()
61+
* .batchSize(batchSize)
62+
* .build();
63+
* return new CompositeBackPressureHandler(List.of(
64+
* concurrencyLimiterBlockingBackPressureHandler,
65+
* throughputBackPressureHandler
66+
* ),
67+
* batchSize,
68+
* standbyLimitPollingInterval
69+
* );
70+
* }}</pre>
71+
*/
72+
public class BackPressureHandlerFactories {
73+
74+
private BackPressureHandlerFactories() {
75+
}
76+
77+
/**
78+
* Creates a new {@link SemaphoreBackPressureHandler} instance based on the provided {@link ContainerOptions}.
79+
*
80+
* @param options the container options.
81+
* @return the created SemaphoreBackPressureHandler.
82+
*/
83+
public static BatchAwareBackPressureHandler semaphoreBackPressureHandler(ContainerOptions<?, ?> options) {
84+
return SemaphoreBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
85+
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls())
86+
.throughputConfiguration(options.getBackPressureMode()).build();
87+
}
88+
89+
/**
90+
* Creates a new {@link BackPressureHandler} instance based on the provided {@link ContainerOptions} combining a
91+
* {@link ConcurrencyLimiterBlockingBackPressureHandler}, a {@link ThroughputBackPressureHandler} and a
92+
* {@link FullBatchBackPressureHandler}. The exact combination of depends on the given {@link ContainerOptions}.
93+
*
94+
* @param options the container options.
95+
* @param maxIdleWaitTime the maximum amount of time to wait for a permit to be released in case no permits were
96+
* obtained.
97+
* @return the created SemaphoreBackPressureHandler.
98+
*/
99+
public static BatchAwareBackPressureHandler adaptativeThroughputBackPressureHandler(ContainerOptions<?, ?> options,
100+
Duration maxIdleWaitTime) {
101+
BackPressureMode backPressureMode = options.getBackPressureMode();
102+
103+
var concurrencyLimiterBlockingBackPressureHandler = concurrencyLimiterBackPressureHandler(options);
104+
if (backPressureMode == BackPressureMode.FIXED_HIGH_THROUGHPUT) {
105+
return concurrencyLimiterBlockingBackPressureHandler;
106+
}
107+
var backPressureHandlers = new ArrayList<BackPressureHandler>();
108+
backPressureHandlers.add(concurrencyLimiterBlockingBackPressureHandler);
109+
110+
// The ThroughputBackPressureHandler should run second in the chain as it is non-blocking.
111+
// Running it first would result in more polls as it would potentially limit the
112+
// ConcurrencyLimiterBlockingBackPressureHandler to a lower amount of requested permits
113+
// which means the ConcurrencyLimiterBlockingBackPressureHandler blocking behavior would
114+
// not be optimally leveraged.
115+
if (backPressureMode == BackPressureMode.AUTO
116+
|| backPressureMode == BackPressureMode.ALWAYS_POLL_MAX_MESSAGES) {
117+
backPressureHandlers.add(throughputBackPressureHandler(options));
118+
}
119+
120+
// The FullBatchBackPressureHandler should run last in the chain to ensure that a full batch is requested or not
121+
if (backPressureMode == BackPressureMode.ALWAYS_POLL_MAX_MESSAGES) {
122+
backPressureHandlers.add(fullBatchBackPressureHandler(options));
123+
}
124+
return compositeBackPressureHandler(options, maxIdleWaitTime, backPressureHandlers);
125+
}
126+
127+
/**
128+
* Creates a new {@link ConcurrencyLimiterBlockingBackPressureHandler} instance based on the provided
129+
* {@link ContainerOptions}.
130+
*
131+
* @param options the container options.
132+
* @return the created ConcurrencyLimiterBlockingBackPressureHandler.
133+
*/
134+
public static CompositeBackPressureHandler compositeBackPressureHandler(ContainerOptions<?, ?> options,
135+
Duration maxIdleWaitTime, List<BackPressureHandler> backPressureHandlers) {
136+
return new CompositeBackPressureHandler(List.copyOf(backPressureHandlers), options.getMaxMessagesPerPoll(),
137+
maxIdleWaitTime);
138+
}
139+
140+
/**
141+
* Creates a new {@link ConcurrencyLimiterBlockingBackPressureHandler} instance based on the provided
142+
* {@link ContainerOptions}.
143+
*
144+
* @param options the container options.
145+
* @return the created ConcurrencyLimiterBlockingBackPressureHandler.
146+
*/
147+
public static ConcurrencyLimiterBlockingBackPressureHandler concurrencyLimiterBackPressureHandler(
148+
ContainerOptions<?, ?> options) {
149+
return ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
150+
.totalPermits(options.getMaxConcurrentMessages()).throughputConfiguration(options.getBackPressureMode())
151+
.acquireTimeout(options.getMaxDelayBetweenPolls()).build();
152+
}
153+
154+
/**
155+
* Creates a new {@link ThroughputBackPressureHandler} instance based on the provided {@link ContainerOptions}.
156+
*
157+
* @param options the container options.
158+
* @return the created ThroughputBackPressureHandler.
159+
*/
160+
public static ThroughputBackPressureHandler throughputBackPressureHandler(ContainerOptions<?, ?> options) {
161+
return ThroughputBackPressureHandler.builder().build();
162+
}
163+
164+
/**
165+
* Creates a new {@link FullBatchBackPressureHandler} instance based on the provided {@link ContainerOptions}.
166+
*
167+
* @param options the container options.
168+
* @return the created FullBatchBackPressureHandler.
169+
*/
170+
public static FullBatchBackPressureHandler fullBatchBackPressureHandler(ContainerOptions<?, ?> options) {
171+
return FullBatchBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll()).build();
172+
}
173+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerFactory.java

Lines changed: 1 addition & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -15,63 +15,12 @@
1515
*/
1616
package io.awspring.cloud.sqs.listener;
1717

18-
import java.time.Duration;
19-
import java.util.ArrayList;
20-
import java.util.List;
21-
2218
/**
2319
* A factory for creating {@link BackPressureHandler} for managing queue consumption backpressure. Implementations can
2420
* configure each the {@link BackPressureHandler} according to its strategies, using the provided
2521
* {@link ContainerOptions}.
2622
* <p>
27-
* Spring Cloud AWS provides the following {@link BackPressureHandler} implementations:
28-
* <ul>
29-
* <li>{@link ConcurrencyLimiterBlockingBackPressureHandler}: Limits the maximum number of messages that can be
30-
* processed concurrently by the application.</li>
31-
* <li>{@link ThroughputBackPressureHandler}: Adapts the throughput dynamically between high and low modes in order to
32-
* reduce SQS pull costs when few messages are coming in.</li>
33-
* <li>{@link CompositeBackPressureHandler}: Allows combining multiple {@link BackPressureHandler} together and ensures
34-
* they cooperate.</li>
35-
* </ul>
36-
* <p>
37-
* Below are a few examples of how common use cases can be achieved. Keep in mind you can always create your own
38-
* {@link BackPressureHandler} implementation and if needed combine it with the provided ones thanks to the
39-
* {@link CompositeBackPressureHandler}.
40-
*
41-
* <h3>A BackPressureHandler limiting the max concurrency with high throughput</h3>
42-
*
43-
* <pre>{@code
44-
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> {
45-
* return ConcurrencyLimiterBlockingBackPressureHandler.builder()
46-
* .batchSize(containerOptions.getMaxMessagesPerPoll())
47-
* .totalPermits(containerOptions.getMaxConcurrentMessages())
48-
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
49-
* .throughputConfiguration(BackPressureMode.FIXED_HIGH_THROUGHPUT)
50-
* .build()
51-
* }}</pre>
52-
*
53-
* <h3>A BackPressureHandler limiting the max concurrency with dynamic throughput</h3>
54-
*
55-
* <pre>{@code
56-
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> {
57-
* int batchSize = containerOptions.getMaxMessagesPerPoll();
58-
* var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
59-
* .batchSize(batchSize)
60-
* .totalPermits(containerOptions.getMaxConcurrentMessages())
61-
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
62-
* .throughputConfiguration(BackPressureMode.AUTO)
63-
* .build()
64-
* var throughputBackPressureHandler = ThroughputBackPressureHandler.builder()
65-
* .batchSize(batchSize)
66-
* .build();
67-
* return new CompositeBackPressureHandler(List.of(
68-
* concurrencyLimiterBlockingBackPressureHandler,
69-
* throughputBackPressureHandler
70-
* ),
71-
* batchSize,
72-
* standbyLimitPollingInterval
73-
* );
74-
* }}</pre>
23+
* A set of default implementations are provided by the {@link BackPressureHandlerFactories} class.
7524
*/
7625
public interface BackPressureHandlerFactory {
7726

@@ -86,98 +35,4 @@ public interface BackPressureHandlerFactory {
8635
* @return the created BackPressureHandler
8736
*/
8837
BackPressureHandler createBackPressureHandler(ContainerOptions<?, ?> containerOptions);
89-
90-
/**
91-
* Creates a new {@link SemaphoreBackPressureHandler} instance based on the provided {@link ContainerOptions}.
92-
*
93-
* @param options the container options.
94-
* @return the created SemaphoreBackPressureHandler.
95-
*/
96-
static BatchAwareBackPressureHandler semaphoreBackPressureHandler(ContainerOptions<?, ?> options) {
97-
return SemaphoreBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
98-
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls())
99-
.throughputConfiguration(options.getBackPressureMode()).build();
100-
}
101-
102-
/**
103-
* Creates a new {@link BackPressureHandler} instance based on the provided {@link ContainerOptions} combining a
104-
* {@link ConcurrencyLimiterBlockingBackPressureHandler}, a {@link ThroughputBackPressureHandler} and a
105-
* {@link FullBatchBackPressureHandler}. The exact combination of depends on the given {@link ContainerOptions}.
106-
*
107-
* @param options the container options.
108-
* @param maxIdleWaitTime the maximum amount of time to wait for a permit to be released in case no permits were
109-
* obtained.
110-
* @return the created SemaphoreBackPressureHandler.
111-
*/
112-
static BatchAwareBackPressureHandler adaptativeThroughputBackPressureHandler(ContainerOptions<?, ?> options,
113-
Duration maxIdleWaitTime) {
114-
BackPressureMode backPressureMode = options.getBackPressureMode();
115-
116-
var concurrencyLimiterBlockingBackPressureHandler = concurrencyLimiterBackPressureHandler(options);
117-
if (backPressureMode == BackPressureMode.FIXED_HIGH_THROUGHPUT) {
118-
return concurrencyLimiterBlockingBackPressureHandler;
119-
}
120-
var backPressureHandlers = new ArrayList<BackPressureHandler>();
121-
backPressureHandlers.add(concurrencyLimiterBlockingBackPressureHandler);
122-
123-
// The ThroughputBackPressureHandler should run second in the chain as it is non-blocking.
124-
// Running it first would result in more polls as it would potentially limit the
125-
// ConcurrencyLimiterBlockingBackPressureHandler to a lower amount of requested permits
126-
// which means the ConcurrencyLimiterBlockingBackPressureHandler blocking behavior would
127-
// not be optimally leveraged.
128-
if (backPressureMode == BackPressureMode.AUTO
129-
|| backPressureMode == BackPressureMode.ALWAYS_POLL_MAX_MESSAGES) {
130-
backPressureHandlers.add(throughputBackPressureHandler(options));
131-
}
132-
133-
// The FullBatchBackPressureHandler should run last in the chain to ensure that a full batch is requested or not
134-
if (backPressureMode == BackPressureMode.ALWAYS_POLL_MAX_MESSAGES) {
135-
backPressureHandlers.add(fullBatchBackPressureHandler(options));
136-
}
137-
return compositeBackPressureHandler(options, maxIdleWaitTime, backPressureHandlers);
138-
}
139-
140-
/**
141-
* Creates a new {@link ConcurrencyLimiterBlockingBackPressureHandler} instance based on the provided
142-
* {@link ContainerOptions}.
143-
*
144-
* @param options the container options.
145-
* @return the created ConcurrencyLimiterBlockingBackPressureHandler.
146-
*/
147-
static CompositeBackPressureHandler compositeBackPressureHandler(ContainerOptions<?, ?> options,
148-
Duration maxIdleWaitTime, List<BackPressureHandler> backPressureHandlers) {
149-
return new CompositeBackPressureHandler(List.copyOf(backPressureHandlers), options.getMaxMessagesPerPoll(),
150-
maxIdleWaitTime);
151-
}
152-
153-
/**
154-
* Creates a new {@link ConcurrencyLimiterBlockingBackPressureHandler} instance based on the provided
155-
* {@link ContainerOptions}.
156-
* @param options the container options.
157-
* @return the created ConcurrencyLimiterBlockingBackPressureHandler.
158-
*/
159-
static ConcurrencyLimiterBlockingBackPressureHandler concurrencyLimiterBackPressureHandler(
160-
ContainerOptions<?, ?> options) {
161-
return ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
162-
.totalPermits(options.getMaxConcurrentMessages()).throughputConfiguration(options.getBackPressureMode())
163-
.acquireTimeout(options.getMaxDelayBetweenPolls()).build();
164-
}
165-
166-
/**
167-
* Creates a new {@link ThroughputBackPressureHandler} instance based on the provided {@link ContainerOptions}.
168-
* @param options the container options.
169-
* @return the created ThroughputBackPressureHandler.
170-
*/
171-
static ThroughputBackPressureHandler throughputBackPressureHandler(ContainerOptions<?, ?> options) {
172-
return ThroughputBackPressureHandler.builder().build();
173-
}
174-
175-
/**
176-
* Creates a new {@link FullBatchBackPressureHandler} instance based on the provided {@link ContainerOptions}.
177-
* @param options the container options.
178-
* @return the created FullBatchBackPressureHandler.
179-
*/
180-
static FullBatchBackPressureHandler fullBatchBackPressureHandler(ContainerOptions<?, ?> options) {
181-
return FullBatchBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll()).build();
182-
}
18338
}

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsBackPressureIntegrationTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,9 @@ void staticBackPressureLimitShouldCapQueueProcessingCapacity(int staticLimit, in
132132
options -> options.maxMessagesPerPoll(5).maxConcurrentMessages(5)
133133
.backPressureMode(BackPressureMode.AUTO).maxDelayBetweenPolls(Duration.ofSeconds(1))
134134
.pollTimeout(Duration.ofSeconds(1))
135-
.backPressureHandlerFactory(containerOptions -> BackPressureHandlerFactory
135+
.backPressureHandlerFactory(containerOptions -> BackPressureHandlerFactories
136136
.compositeBackPressureHandler(containerOptions, Duration.ofMillis(50L),
137-
List.of(limiter, BackPressureHandlerFactory
137+
List.of(limiter, BackPressureHandlerFactories
138138
.concurrencyLimiterBackPressureHandler(containerOptions)))))
139139
.messageListener(msg -> {
140140
int concurrentRqs = concurrentRequest.incrementAndGet();
@@ -170,9 +170,9 @@ void zeroBackPressureLimitShouldStopQueueProcessing() throws Exception {
170170
options -> options.maxMessagesPerPoll(5).maxConcurrentMessages(5)
171171
.backPressureMode(BackPressureMode.AUTO).maxDelayBetweenPolls(Duration.ofSeconds(1))
172172
.pollTimeout(Duration.ofSeconds(1))
173-
.backPressureHandlerFactory(containerOptions -> BackPressureHandlerFactory
173+
.backPressureHandlerFactory(containerOptions -> BackPressureHandlerFactories
174174
.compositeBackPressureHandler(containerOptions, Duration.ofMillis(50L),
175-
List.of(limiter, BackPressureHandlerFactory
175+
List.of(limiter, BackPressureHandlerFactories
176176
.concurrencyLimiterBackPressureHandler(containerOptions)))))
177177
.messageListener(msg -> {
178178
int concurrentRqs = concurrentRequest.incrementAndGet();
@@ -214,9 +214,9 @@ void changeInBackPressureLimitShouldAdaptQueueProcessingCapacity() throws Except
214214
options -> options.maxMessagesPerPoll(5).maxConcurrentMessages(5)
215215
.backPressureMode(BackPressureMode.AUTO).maxDelayBetweenPolls(Duration.ofSeconds(1))
216216
.pollTimeout(Duration.ofSeconds(1))
217-
.backPressureHandlerFactory(containerOptions -> BackPressureHandlerFactory
217+
.backPressureHandlerFactory(containerOptions -> BackPressureHandlerFactories
218218
.compositeBackPressureHandler(containerOptions, Duration.ofMillis(50L),
219-
List.of(limiter, BackPressureHandlerFactory
219+
List.of(limiter, BackPressureHandlerFactories
220220
.concurrencyLimiterBackPressureHandler(containerOptions)))))
221221
.messageListener(msg -> {
222222
try {
@@ -441,9 +441,9 @@ void unsynchronizedChangesInBackPressureLimitShouldAdaptQueueProcessingCapacity(
441441
.backPressureMode(BackPressureMode.AUTO).maxDelayBetweenPolls(Duration.ofSeconds(1))
442442
.pollTimeout(Duration.ofSeconds(1))
443443
.backPressureHandlerFactory(containerOptions -> new StatisticsBphDecorator(
444-
BackPressureHandlerFactory.compositeBackPressureHandler(containerOptions,
444+
BackPressureHandlerFactories.compositeBackPressureHandler(containerOptions,
445445
Duration.ofMillis(50L),
446-
List.of(limiter, BackPressureHandlerFactory
446+
List.of(limiter, BackPressureHandlerFactories
447447
.concurrencyLimiterBackPressureHandler(containerOptions))),
448448
eventsCsvWriter)))
449449
.messageListener(msg -> {

0 commit comments

Comments
 (0)