-
-
Notifications
You must be signed in to change notification settings - Fork 337
Dynamically configure SemaphoreBackPressureHandler with BackPressureLimiter (#1251) #1308
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
loicrouchon
wants to merge
29
commits into
awspring:main
Choose a base branch
from
loicrouchon:feature/backpressure-limiter
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 25 commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
ec9f0a4
Dynamically configure SemaphoreBackPressureHandler with BackPressureL…
loicrouchon 3da2be2
Use a wrapper approach for dynamically limit the permits of Semaphore…
loicrouchon 296d05a
Introduce a CompositeBackPressureHandler allowing for composition of …
loicrouchon 3f72277
Remove BackPressureHandlerLimiter from the library and make it user-c…
loicrouchon 7dda70b
Move SQS BackPressureHandlers tests to a dedicated integration test (…
loicrouchon 2558c5d
Add a wait condition to the CompositeBPH in case 0 permits were retur…
loicrouchon bd81aea
Enhance default methods for backward compatibility (#1251)
loicrouchon 1e99159
Split SemaphoreBackPressureHandler into a ConcurrencyLimiterBlocking …
loicrouchon 74dc430
Revert changes to SemaphoreBackPressureHandler not to change default …
loicrouchon 7ed4607
Move SemaphoreBackPressureHandler#release(amount, reason) implementat…
loicrouchon dbe37d9
Address review comments
loicrouchon df7a9af
Introduce a BackPressureHandlerFactory for configuring SQS back press…
loicrouchon 8deea58
Introduce factory methods for creating back-pressure handlers (#1251)
loicrouchon ea3c65a
Rename BackPressureHandlerFactory methods
loicrouchon a1c6b44
Simplify ThroughputBackPressureHandler not to count in flight message…
loicrouchon fd36d45
Introduce BlockerBackPressureHandler marker interface (#1251)
loicrouchon 9316ce3
Move BackPressureHandler factory methods to BackPressureHandlerFactor…
loicrouchon 77488f1
Improve javadoc clarity
loicrouchon af9d6ce
Add tests for ThroughputBackPressureHandler (#1251)
loicrouchon 0da12ab
Add tests for ConcurrencyLimiterBlockingBackPressureHandler (#1251)
loicrouchon 4d3c13d
Add tests for FullBatchBackPressureHandler (#1251)
loicrouchon cf3f567
Add tests for CompositeBackPressureHandlerTest (#1251)
loicrouchon ad707da
Limit requested permits to batch size for ThroughputBackPressureHandl…
loicrouchon c7e6329
Update BlockingBackPressureHandler javadoc (#1251)
loicrouchon b1a1f56
Improve SQS tests stability (#1251)
loicrouchon 6c2e37d
Remove BatchAwareBackPressureHandler#releaseBatch() default implement…
loicrouchon 0e99553
Document backpressure management (#1251)
loicrouchon a8f158b
Move backpressure management documentation to under '8.9. Message Pro…
loicrouchon a4e2f1e
Remove default implementation of deprecated methods (#1251)
loicrouchon File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
173 changes: 173 additions & 0 deletions
173
...ud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerFactories.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
/* | ||
* Copyright 2013-2025 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.awspring.cloud.sqs.listener; | ||
|
||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
/** | ||
* Spring Cloud AWS provides the following {@link BackPressureHandler} implementations: | ||
* <ul> | ||
* <li>{@link ConcurrencyLimiterBlockingBackPressureHandler}: Limits the maximum number of messages that can be * | ||
* processed concurrently by the application.</li> * | ||
* <li>{@link ThroughputBackPressureHandler}: Adapts the throughput dynamically between high and low modes in order to * | ||
* reduce SQS pull costs when few messages are coming in.</li> * | ||
* <li>{@link CompositeBackPressureHandler}: Allows combining multiple {@link BackPressureHandler} together and ensures | ||
* * they cooperate.</li> * | ||
* </ul> | ||
* <p> | ||
* Below are a few examples of how common use cases can be achieved. Keep in mind you can always create your own * | ||
* {@link BackPressureHandler} implementation and if needed combine it with the provided ones thanks to the * | ||
* {@link CompositeBackPressureHandler}. * * | ||
* <h3>A BackPressureHandler limiting the max concurrency with high throughput</h3> * * | ||
* | ||
* <pre>{@code | ||
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> { | ||
* return ConcurrencyLimiterBlockingBackPressureHandler.builder() | ||
* .batchSize(containerOptions.getMaxMessagesPerPoll()) | ||
* .totalPermits(containerOptions.getMaxConcurrentMessages()) | ||
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls()) | ||
* .throughputConfiguration(BackPressureMode.FIXED_HIGH_THROUGHPUT) | ||
* .build() | ||
* }}</pre> | ||
* <p> | ||
* * * | ||
* <h3>A BackPressureHandler limiting the max concurrency with dynamic throughput</h3> * * | ||
* | ||
* <pre>{@code | ||
* containerOptionsBuilder.backPressureHandlerFactory(containerOptions -> { | ||
* int batchSize = containerOptions.getMaxMessagesPerPoll(); | ||
* var concurrencyLimiterBlockingBackPressureHandler = ConcurrencyLimiterBlockingBackPressureHandler.builder() | ||
* .batchSize(batchSize) | ||
* .totalPermits(containerOptions.getMaxConcurrentMessages()) | ||
* .acquireTimeout(containerOptions.getMaxDelayBetweenPolls()) | ||
* .throughputConfiguration(BackPressureMode.AUTO) | ||
* .build() | ||
* var throughputBackPressureHandler = ThroughputBackPressureHandler.builder() | ||
* .batchSize(batchSize) | ||
* .build(); | ||
* return new CompositeBackPressureHandler(List.of( | ||
* concurrencyLimiterBlockingBackPressureHandler, | ||
* throughputBackPressureHandler | ||
* ), | ||
* batchSize, | ||
* standbyLimitPollingInterval | ||
* ); | ||
* }}</pre> | ||
*/ | ||
public class BackPressureHandlerFactories { | ||
|
||
private BackPressureHandlerFactories() { | ||
} | ||
|
||
/** | ||
* Creates a new {@link SemaphoreBackPressureHandler} instance based on the provided {@link ContainerOptions}. | ||
* | ||
* @param options the container options. | ||
* @return the created SemaphoreBackPressureHandler. | ||
*/ | ||
public static BatchAwareBackPressureHandler semaphoreBackPressureHandler(ContainerOptions<?, ?> options) { | ||
return SemaphoreBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll()) | ||
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls()) | ||
.throughputConfiguration(options.getBackPressureMode()).build(); | ||
} | ||
|
||
/** | ||
* Creates a new {@link BackPressureHandler} instance based on the provided {@link ContainerOptions} combining a | ||
* {@link ConcurrencyLimiterBlockingBackPressureHandler}, a {@link ThroughputBackPressureHandler} and a | ||
* {@link FullBatchBackPressureHandler}. The exact combination of depends on the given {@link ContainerOptions}. | ||
* | ||
* @param options the container options. | ||
* @param maxIdleWaitTime the maximum amount of time to wait for a permit to be released in case no permits were | ||
* obtained. | ||
* @return the created SemaphoreBackPressureHandler. | ||
*/ | ||
public static BatchAwareBackPressureHandler adaptativeThroughputBackPressureHandler(ContainerOptions<?, ?> options, | ||
Duration maxIdleWaitTime) { | ||
BackPressureMode backPressureMode = options.getBackPressureMode(); | ||
|
||
var concurrencyLimiterBlockingBackPressureHandler = concurrencyLimiterBackPressureHandler(options); | ||
if (backPressureMode == BackPressureMode.FIXED_HIGH_THROUGHPUT) { | ||
return concurrencyLimiterBlockingBackPressureHandler; | ||
} | ||
var backPressureHandlers = new ArrayList<BackPressureHandler>(); | ||
backPressureHandlers.add(concurrencyLimiterBlockingBackPressureHandler); | ||
|
||
// The ThroughputBackPressureHandler should run second in the chain as it is non-blocking. | ||
// Running it first would result in more polls as it would potentially limit the | ||
// ConcurrencyLimiterBlockingBackPressureHandler to a lower amount of requested permits | ||
// which means the ConcurrencyLimiterBlockingBackPressureHandler blocking behavior would | ||
// not be optimally leveraged. | ||
if (backPressureMode == BackPressureMode.AUTO | ||
|| backPressureMode == BackPressureMode.ALWAYS_POLL_MAX_MESSAGES) { | ||
backPressureHandlers.add(throughputBackPressureHandler(options)); | ||
} | ||
|
||
// The FullBatchBackPressureHandler should run last in the chain to ensure that a full batch is requested or not | ||
if (backPressureMode == BackPressureMode.ALWAYS_POLL_MAX_MESSAGES) { | ||
backPressureHandlers.add(fullBatchBackPressureHandler(options)); | ||
} | ||
return compositeBackPressureHandler(options, maxIdleWaitTime, backPressureHandlers); | ||
} | ||
|
||
/** | ||
* Creates a new {@link ConcurrencyLimiterBlockingBackPressureHandler} instance based on the provided | ||
* {@link ContainerOptions}. | ||
* | ||
* @param options the container options. | ||
* @return the created ConcurrencyLimiterBlockingBackPressureHandler. | ||
*/ | ||
public static CompositeBackPressureHandler compositeBackPressureHandler(ContainerOptions<?, ?> options, | ||
Duration maxIdleWaitTime, List<BackPressureHandler> backPressureHandlers) { | ||
return CompositeBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll()) | ||
.noPermitsReturnedWaitTimeout(maxIdleWaitTime).backPressureHandlers(backPressureHandlers).build(); | ||
} | ||
|
||
/** | ||
* Creates a new {@link ConcurrencyLimiterBlockingBackPressureHandler} instance based on the provided | ||
* {@link ContainerOptions}. | ||
* | ||
* @param options the container options. | ||
* @return the created ConcurrencyLimiterBlockingBackPressureHandler. | ||
*/ | ||
public static ConcurrencyLimiterBlockingBackPressureHandler concurrencyLimiterBackPressureHandler( | ||
ContainerOptions<?, ?> options) { | ||
return ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll()) | ||
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls()) | ||
.build(); | ||
} | ||
|
||
/** | ||
* Creates a new {@link ThroughputBackPressureHandler} instance based on the provided {@link ContainerOptions}. | ||
* | ||
* @param options the container options. | ||
* @return the created ThroughputBackPressureHandler. | ||
*/ | ||
public static ThroughputBackPressureHandler throughputBackPressureHandler(ContainerOptions<?, ?> options) { | ||
return ThroughputBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll()).build(); | ||
} | ||
|
||
/** | ||
* Creates a new {@link FullBatchBackPressureHandler} instance based on the provided {@link ContainerOptions}. | ||
* | ||
* @param options the container options. | ||
* @return the created FullBatchBackPressureHandler. | ||
*/ | ||
public static FullBatchBackPressureHandler fullBatchBackPressureHandler(ContainerOptions<?, ?> options) { | ||
return FullBatchBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll()).build(); | ||
} | ||
} |
40 changes: 40 additions & 0 deletions
40
...loud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerFactory.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Copyright 2013-2025 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.awspring.cloud.sqs.listener; | ||
|
||
/** | ||
* Factory interface for creating {@link BackPressureHandler} instances to manage queue consumption backpressure. | ||
* <p> | ||
* Implementations of this interface are responsible for producing a new {@link BackPressureHandler} for each container, | ||
* configured according to the provided {@link ContainerOptions}. This ensures that internal resources (such as counters | ||
* or semaphores) are not shared across containers, which could lead to unintended side effects. | ||
* <p> | ||
* Default factory implementations can be found in the {@link BackPressureHandlerFactories} class. | ||
*/ | ||
public interface BackPressureHandlerFactory { | ||
loicrouchon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/** | ||
* Creates a new {@link BackPressureHandler} instance based on the provided {@link ContainerOptions}. | ||
* <p> | ||
* <strong>NOTE:</strong> <em>it is important for the factory to always return a new instance as otherwise it might | ||
* result in a BackPressureHandler internal resources (counters, semaphores, ...) to be shared by multiple | ||
* containers which is very likely not the desired behavior.</em> | ||
* | ||
* @param containerOptions the container options to use for creating the BackPressureHandler. | ||
* @return the created BackPressureHandler | ||
*/ | ||
BackPressureHandler createBackPressureHandler(ContainerOptions<?, ?> containerOptions); | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.