Skip to content

Dynamically configure SemaphoreBackPressureHandler with BackPressureLimiter (#1251) #1308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ec9f0a4
Dynamically configure SemaphoreBackPressureHandler with BackPressureL…
loicrouchon Dec 5, 2024
3da2be2
Use a wrapper approach for dynamically limit the permits of Semaphore…
loicrouchon Jan 2, 2025
296d05a
Introduce a CompositeBackPressureHandler allowing for composition of …
loicrouchon Jan 3, 2025
3f72277
Remove BackPressureHandlerLimiter from the library and make it user-c…
loicrouchon Feb 6, 2025
7dda70b
Move SQS BackPressureHandlers tests to a dedicated integration test (…
loicrouchon Feb 12, 2025
2558c5d
Add a wait condition to the CompositeBPH in case 0 permits were retur…
loicrouchon Feb 12, 2025
bd81aea
Enhance default methods for backward compatibility (#1251)
loicrouchon Feb 13, 2025
1e99159
Split SemaphoreBackPressureHandler into a ConcurrencyLimiterBlocking …
loicrouchon Feb 17, 2025
74dc430
Revert changes to SemaphoreBackPressureHandler not to change default …
loicrouchon Mar 3, 2025
7ed4607
Move SemaphoreBackPressureHandler#release(amount, reason) implementat…
loicrouchon Mar 4, 2025
dbe37d9
Address review comments
loicrouchon May 8, 2025
df7a9af
Introduce a BackPressureHandlerFactory for configuring SQS back press…
loicrouchon May 8, 2025
8deea58
Introduce factory methods for creating back-pressure handlers (#1251)
loicrouchon May 9, 2025
ea3c65a
Rename BackPressureHandlerFactory methods
loicrouchon May 9, 2025
a1c6b44
Simplify ThroughputBackPressureHandler not to count in flight message…
loicrouchon May 27, 2025
fd36d45
Introduce BlockerBackPressureHandler marker interface (#1251)
loicrouchon May 28, 2025
9316ce3
Move BackPressureHandler factory methods to BackPressureHandlerFactor…
loicrouchon May 28, 2025
77488f1
Improve javadoc clarity
loicrouchon Jun 10, 2025
af9d6ce
Add tests for ThroughputBackPressureHandler (#1251)
loicrouchon Jun 10, 2025
0da12ab
Add tests for ConcurrencyLimiterBlockingBackPressureHandler (#1251)
loicrouchon Jun 10, 2025
4d3c13d
Add tests for FullBatchBackPressureHandler (#1251)
loicrouchon Jun 10, 2025
cf3f567
Add tests for CompositeBackPressureHandlerTest (#1251)
loicrouchon Jun 11, 2025
ad707da
Limit requested permits to batch size for ThroughputBackPressureHandl…
loicrouchon Jul 1, 2025
c7e6329
Update BlockingBackPressureHandler javadoc (#1251)
loicrouchon Jul 1, 2025
b1a1f56
Improve SQS tests stability (#1251)
loicrouchon Jul 1, 2025
6c2e37d
Remove BatchAwareBackPressureHandler#releaseBatch() default implement…
loicrouchon Jul 14, 2025
0e99553
Document backpressure management (#1251)
loicrouchon Jul 17, 2025
a8f158b
Move backpressure management documentation to under '8.9. Message Pro…
loicrouchon Jul 17, 2025
a4e2f1e
Remove default implementation of deprecated methods (#1251)
loicrouchon Jul 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,

private final BackPressureMode backPressureMode;

private final BackPressureHandlerFactory backPressureHandlerFactory;

private final ListenerMode listenerMode;

private final MessagingMessageConverter<?> messageConverter;
Expand Down Expand Up @@ -90,6 +92,7 @@ protected AbstractContainerOptions(Builder<?, ?> builder) {
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout;
this.backPressureMode = builder.backPressureMode;
this.backPressureHandlerFactory = builder.backPressureHandlerFactory;
this.listenerMode = builder.listenerMode;
this.messageConverter = builder.messageConverter;
this.acknowledgementMode = builder.acknowledgementMode;
Expand Down Expand Up @@ -162,6 +165,11 @@ public BackPressureMode getBackPressureMode() {
return this.backPressureMode;
}

@Override
public BackPressureHandlerFactory getBackPressureHandlerFactory() {
return this.backPressureHandlerFactory;
}

@Override
public ListenerMode getListenerMode() {
return this.listenerMode;
Expand Down Expand Up @@ -232,6 +240,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO;

private static final BackPressureHandlerFactory DEFAULT_BACKPRESSURE_FACTORY = BackPressureHandlerFactories::semaphoreBackPressureHandler;

private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE;

private static final MessagingMessageConverter<?> DEFAULT_MESSAGE_CONVERTER = new SqsMessagingMessageConverter();
Expand All @@ -254,6 +264,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION;

private BackPressureHandlerFactory backPressureHandlerFactory = DEFAULT_BACKPRESSURE_FACTORY;

private Duration listenerShutdownTimeout = DEFAULT_LISTENER_SHUTDOWN_TIMEOUT;

private Duration acknowledgementShutdownTimeout = DEFAULT_ACKNOWLEDGEMENT_SHUTDOWN_TIMEOUT;
Expand Down Expand Up @@ -296,6 +308,7 @@ protected Builder(AbstractContainerOptions<?, ?> options) {
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
this.acknowledgementShutdownTimeout = options.acknowledgementShutdownTimeout;
this.backPressureMode = options.backPressureMode;
this.backPressureHandlerFactory = options.backPressureHandlerFactory;
this.listenerMode = options.listenerMode;
this.messageConverter = options.messageConverter;
this.acknowledgementMode = options.acknowledgementMode;
Expand Down Expand Up @@ -390,6 +403,12 @@ public B backPressureMode(BackPressureMode backPressureMode) {
return self();
}

@Override
public B backPressureHandlerFactory(BackPressureHandlerFactory backPressureHandlerFactory) {
this.backPressureHandlerFactory = backPressureHandlerFactory;
return self();
}

@Override
public B acknowledgementInterval(Duration acknowledgementInterval) {
Assert.notNull(acknowledgementInterval, "acknowledgementInterval cannot be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,9 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
}

protected BackPressureHandler createBackPressureHandler() {
return SemaphoreBackPressureHandler.builder().batchSize(getContainerOptions().getMaxMessagesPerPoll())
.totalPermits(getContainerOptions().getMaxConcurrentMessages())
.acquireTimeout(getContainerOptions().getMaxDelayBetweenPolls())
.throughputConfiguration(getContainerOptions().getBackPressureMode()).build();
O containerOptions = getContainerOptions();
BackPressureHandlerFactory factory = containerOptions.getBackPressureHandlerFactory();
return factory.createBackPressureHandler(containerOptions);
}

protected TaskExecutor createSourcesTaskExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,48 @@
public interface BackPressureHandler {

/**
* Request a number of permits. Each obtained permit allows the
* Requests a number of permits. Each obtained permit allows the
* {@link io.awspring.cloud.sqs.listener.source.MessageSource} to retrieve one message.
* @param amount the amount of permits to request.
* @return the amount of permits obtained.
* @throws InterruptedException if the Thread is interrupted while waiting for permits.
*/
int request(int amount) throws InterruptedException;

/**
* Releases the specified amount of permits for processed messages. Each message that has been processed should
* release one permit, whether processing was successful or not.
* <p>
* This method can be called in the following use cases:
* <ul>
* <li>{@link ReleaseReason#LIMITED}: all/some permits were not used because another BackPressureHandler has a lower
* permits limit and the difference in permits needs to be returned.</li>
* <li>{@link ReleaseReason#NONE_FETCHED}: none of the permits were actually used because no messages were retrieved
* from SQS. Permits need to be returned.</li>
* <li>{@link ReleaseReason#PARTIAL_FETCH}: some of the permits were used (some messages were retrieved from SQS).
* The unused ones need to be returned. The amount to be returned might be {@literal 0}, in which case it means all
* the permits will be used as the same number of messages were fetched from SQS.</li>
* <li>{@link ReleaseReason#PROCESSED}: a message processing finished, successfully or not.</li>
* </ul>
* @param amount the amount of permits to release.
* @param reason the reason why the permits were released.
*/
default void release(int amount, ReleaseReason reason) {
release(amount);
}

/**
* Release the specified amount of permits. Each message that has been processed should release one permit, whether
* processing was successful or not.
* @param amount the amount of permits to release.
*
* @deprecated This method is deprecated and will not be called by the Spring Cloud AWS SQS listener anymore.
* Implement {@link #release(int, ReleaseReason)} instead.
*/
void release(int amount);
@Deprecated
default void release(int amount) {
release(amount, ReleaseReason.PROCESSED);
}

/**
* Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and
Expand All @@ -52,4 +80,24 @@ public interface BackPressureHandler {
*/
boolean drain(Duration timeout);

enum ReleaseReason {
/**
* All/Some permits were not used because another BackPressureHandler has a lower permits limit and the permits
* difference need to be aligned across all handlers.
*/
LIMITED,
/**
* No messages were retrieved from SQS, so all permits need to be returned.
*/
NONE_FETCHED,
/**
* Some messages were fetched from SQS. Unused permits if any need to be returned.
*/
PARTIAL_FETCH,
/**
* The processing of one or more messages finished, successfully or not.
*/
PROCESSED;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.listener;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
* Spring Cloud AWS provides the following {@link BackPressureHandler} implementations:
* <ul>
* <li>{@link ConcurrencyLimiterBlockingBackPressureHandler}: Limits the maximum number of messages that can be *
* processed concurrently by the application.</li> *
* <li>{@link ThroughputBackPressureHandler}: Adapts the throughput dynamically between high and low modes in order to *
* reduce SQS pull costs when few messages are coming in.</li> *
* <li>{@link CompositeBackPressureHandler}: Allows combining multiple {@link BackPressureHandler} together and ensures
* * they cooperate.</li> *
* </ul>
* <p>
* Below are a few examples of how common use cases can be achieved. Keep in mind you can always create your own *
* {@link BackPressureHandler} implementation and if needed combine it with the provided ones thanks to the *
* {@link CompositeBackPressureHandler}. * *
* <h3>A BackPressureHandler limiting the max concurrency with high throughput</h3> * *
*
* <pre>{@code
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> {
* return ConcurrencyLimiterBlockingBackPressureHandler.builder()
* .batchSize(containerOptions.getMaxMessagesPerPoll())
* .totalPermits(containerOptions.getMaxConcurrentMessages())
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
* .throughputConfiguration(BackPressureMode.FIXED_HIGH_THROUGHPUT)
* .build()
* }}</pre>
* <p>
* * *
* <h3>A BackPressureHandler limiting the max concurrency with dynamic throughput</h3> * *
*
* <pre>{@code
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> {
* int batchSize = containerOptions.getMaxMessagesPerPoll();
* var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
* .batchSize(batchSize)
* .totalPermits(containerOptions.getMaxConcurrentMessages())
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
* .throughputConfiguration(BackPressureMode.AUTO)
* .build()
* var throughputBackPressureHandler = ThroughputBackPressureHandler.builder()
* .batchSize(batchSize)
* .build();
* return new CompositeBackPressureHandler(List.of(
* concurrencyLimiterBlockingBackPressureHandler,
* throughputBackPressureHandler
* ),
* batchSize,
* standbyLimitPollingInterval
* );
* }}</pre>
*/
public class BackPressureHandlerFactories {

private BackPressureHandlerFactories() {
}

/**
* Creates a new {@link SemaphoreBackPressureHandler} instance based on the provided {@link ContainerOptions}.
*
* @param options the container options.
* @return the created SemaphoreBackPressureHandler.
*/
public static BatchAwareBackPressureHandler semaphoreBackPressureHandler(ContainerOptions<?, ?> options) {
return SemaphoreBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls())
.throughputConfiguration(options.getBackPressureMode()).build();
}

/**
* Creates a new {@link BackPressureHandler} instance based on the provided {@link ContainerOptions} combining a
* {@link ConcurrencyLimiterBlockingBackPressureHandler}, a {@link ThroughputBackPressureHandler} and a
* {@link FullBatchBackPressureHandler}. The exact combination of depends on the given {@link ContainerOptions}.
*
* @param options the container options.
* @param maxIdleWaitTime the maximum amount of time to wait for a permit to be released in case no permits were
* obtained.
* @return the created SemaphoreBackPressureHandler.
*/
public static BatchAwareBackPressureHandler adaptativeThroughputBackPressureHandler(ContainerOptions<?, ?> options,
Duration maxIdleWaitTime) {
BackPressureMode backPressureMode = options.getBackPressureMode();

var concurrencyLimiterBlockingBackPressureHandler = concurrencyLimiterBackPressureHandler(options);
if (backPressureMode == BackPressureMode.FIXED_HIGH_THROUGHPUT) {
return concurrencyLimiterBlockingBackPressureHandler;
}
var backPressureHandlers = new ArrayList<BackPressureHandler>();
backPressureHandlers.add(concurrencyLimiterBlockingBackPressureHandler);

// The ThroughputBackPressureHandler should run second in the chain as it is non-blocking.
// Running it first would result in more polls as it would potentially limit the
// ConcurrencyLimiterBlockingBackPressureHandler to a lower amount of requested permits
// which means the ConcurrencyLimiterBlockingBackPressureHandler blocking behavior would
// not be optimally leveraged.
if (backPressureMode == BackPressureMode.AUTO
|| backPressureMode == BackPressureMode.ALWAYS_POLL_MAX_MESSAGES) {
backPressureHandlers.add(throughputBackPressureHandler(options));
}

// The FullBatchBackPressureHandler should run last in the chain to ensure that a full batch is requested or not
if (backPressureMode == BackPressureMode.ALWAYS_POLL_MAX_MESSAGES) {
backPressureHandlers.add(fullBatchBackPressureHandler(options));
}
return compositeBackPressureHandler(options, maxIdleWaitTime, backPressureHandlers);
}

/**
* Creates a new {@link ConcurrencyLimiterBlockingBackPressureHandler} instance based on the provided
* {@link ContainerOptions}.
*
* @param options the container options.
* @return the created ConcurrencyLimiterBlockingBackPressureHandler.
*/
public static CompositeBackPressureHandler compositeBackPressureHandler(ContainerOptions<?, ?> options,
Duration maxIdleWaitTime, List<BackPressureHandler> backPressureHandlers) {
return CompositeBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
.noPermitsReturnedWaitTimeout(maxIdleWaitTime).backPressureHandlers(backPressureHandlers).build();
}

/**
* Creates a new {@link ConcurrencyLimiterBlockingBackPressureHandler} instance based on the provided
* {@link ContainerOptions}.
*
* @param options the container options.
* @return the created ConcurrencyLimiterBlockingBackPressureHandler.
*/
public static ConcurrencyLimiterBlockingBackPressureHandler concurrencyLimiterBackPressureHandler(
ContainerOptions<?, ?> options) {
return ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls())
.build();
}

/**
* Creates a new {@link ThroughputBackPressureHandler} instance based on the provided {@link ContainerOptions}.
*
* @param options the container options.
* @return the created ThroughputBackPressureHandler.
*/
public static ThroughputBackPressureHandler throughputBackPressureHandler(ContainerOptions<?, ?> options) {
return ThroughputBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll()).build();
}

/**
* Creates a new {@link FullBatchBackPressureHandler} instance based on the provided {@link ContainerOptions}.
*
* @param options the container options.
* @return the created FullBatchBackPressureHandler.
*/
public static FullBatchBackPressureHandler fullBatchBackPressureHandler(ContainerOptions<?, ?> options) {
return FullBatchBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll()).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.listener;

/**
* Factory interface for creating {@link BackPressureHandler} instances to manage queue consumption backpressure.
* <p>
* Implementations of this interface are responsible for producing a new {@link BackPressureHandler} for each container,
* configured according to the provided {@link ContainerOptions}. This ensures that internal resources (such as counters
* or semaphores) are not shared across containers, which could lead to unintended side effects.
* <p>
* Default factory implementations can be found in the {@link BackPressureHandlerFactories} class.
*/
public interface BackPressureHandlerFactory {

/**
* Creates a new {@link BackPressureHandler} instance based on the provided {@link ContainerOptions}.
* <p>
* <strong>NOTE:</strong> <em>it is important for the factory to always return a new instance as otherwise it might
* result in a BackPressureHandler internal resources (counters, semaphores, ...) to be shared by multiple
* containers which is very likely not the desired behavior.</em>
*
* @param containerOptions the container options to use for creating the BackPressureHandler.
* @return the created BackPressureHandler
*/
BackPressureHandler createBackPressureHandler(ContainerOptions<?, ?> containerOptions);
}
Loading