Skip to content

Commit 51bad1a

Browse files
committed
WIP
1 parent 93cb447 commit 51bad1a

File tree

4 files changed

+126
-61
lines changed

4 files changed

+126
-61
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.awspring.cloud.sqs.listener.source.AcknowledgementProcessingMessageSource;
3636
import io.awspring.cloud.sqs.listener.source.MessageSource;
3737
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
38+
import java.time.Duration;
3839
import java.util.ArrayList;
3940
import java.util.Collection;
4041
import java.util.List;
@@ -226,17 +227,17 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
226227

227228
protected BackPressureHandler createBackPressureHandler() {
228229
O containerOptions = getContainerOptions();
229-
BatchAwareBackPressureHandler backPressureHandler = SemaphoreBackPressureHandler.builder()
230-
.batchSize(containerOptions.getMaxMessagesPerPoll())
231-
.totalPermits(containerOptions.getMaxConcurrentMessages())
232-
.acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
233-
.throughputConfiguration(containerOptions.getBackPressureMode()).build();
230+
List<BackPressureHandler> backPressureHandlers = new ArrayList<>(2);
231+
Duration acquireTimeout = containerOptions.getMaxDelayBetweenPolls();
232+
int batchSize = containerOptions.getMaxMessagesPerPoll();
233+
backPressureHandlers.add(SemaphoreBackPressureHandler.builder().batchSize(batchSize)
234+
.totalPermits(containerOptions.getMaxConcurrentMessages()).acquireTimeout(acquireTimeout)
235+
.throughputConfiguration(containerOptions.getBackPressureMode()).build());
234236
if (containerOptions.getBackPressureLimiter() != null) {
235-
backPressureHandler = new BackPressureHandlerLimiter(backPressureHandler,
236-
containerOptions.getBackPressureLimiter(), containerOptions.getStandbyLimitPollingInterval(),
237-
containerOptions.getMaxDelayBetweenPolls());
237+
backPressureHandlers.add(new BackPressureHandlerLimiter(containerOptions.getBackPressureLimiter(),
238+
acquireTimeout, containerOptions.getStandbyLimitPollingInterval()));
238239
}
239-
return backPressureHandler;
240+
return new CompositeBackPressureHandler(backPressureHandlers, batchSize);
240241
}
241242

242243
protected TaskExecutor createSourcesTaskExecutor() {

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java

Lines changed: 12 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,7 @@
2727
*
2828
* @see BackPressureLimiter
2929
*/
30-
public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler {
31-
32-
/**
33-
* The {@link BatchAwareBackPressureHandler} which permits should be limited by the {@link #backPressureLimiter}.
34-
*/
35-
private final BatchAwareBackPressureHandler backPressureHandler;
30+
public class BackPressureHandlerLimiter implements BackPressureHandler {
3631

3732
/**
3833
* The {@link BackPressureLimiter} which computes a limit on how many permits can be requested at a given moment.
@@ -59,50 +54,34 @@ public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler
5954

6055
private final ReducibleSemaphore semaphore = new ReducibleSemaphore(0);
6156

62-
public BackPressureHandlerLimiter(BatchAwareBackPressureHandler backPressureHandler,
63-
BackPressureLimiter backPressureLimiter, Duration standbyLimitPollingInterval, Duration acquireTimeout) {
64-
this.backPressureHandler = backPressureHandler;
57+
public BackPressureHandlerLimiter(BackPressureLimiter backPressureLimiter, Duration acquireTimeout,
58+
Duration standbyLimitPollingInterval) {
6559
this.backPressureLimiter = backPressureLimiter;
6660
this.acquireTimeout = acquireTimeout;
6761
this.standbyLimitPollingInterval = standbyLimitPollingInterval;
6862
}
6963

70-
@Override
71-
public int requestBatch() throws InterruptedException {
72-
int permits = updatePermitsLimit();
73-
int batchSize = getBatchSize();
74-
if (permits < batchSize) {
75-
return acquirePermits(permits, backPressureHandler::request);
76-
}
77-
return acquirePermits(batchSize, p -> backPressureHandler.requestBatch());
78-
}
79-
80-
@Override
81-
public void releaseBatch() {
82-
semaphore.release(getBatchSize());
83-
backPressureHandler.releaseBatch();
84-
}
85-
86-
@Override
87-
public int getBatchSize() {
88-
return backPressureHandler.getBatchSize();
89-
}
90-
9164
@Override
9265
public int request(int amount) throws InterruptedException {
9366
int permits = Math.min(updatePermitsLimit(), amount);
94-
return acquirePermits(permits, backPressureHandler::request);
67+
if (permits == 0) {
68+
Thread.sleep(standbyLimitPollingInterval.toMillis());
69+
return 0;
70+
}
71+
if (semaphore.tryAcquire(permits, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
72+
return permits;
73+
}
74+
return 0;
9575
}
9676

9777
@Override
9878
public void release(int amount) {
9979
semaphore.release(amount);
100-
backPressureHandler.release(amount);
10180
}
10281

10382
@Override
10483
public boolean drain(Duration timeout) {
105-
return backPressureHandler.drain(timeout);
84+
return true;
10685
}
10786

10887
private int updatePermitsLimit() {
@@ -120,25 +99,6 @@ else if (newLimit > oldLimit) {
12099
});
121100
}
122101

123-
private interface PermitsRequester {
124-
int request(int amount) throws InterruptedException;
125-
}
126-
127-
private int acquirePermits(int amount, PermitsRequester permitsRequester) throws InterruptedException {
128-
if (amount == 0) {
129-
Thread.sleep(standbyLimitPollingInterval.toMillis());
130-
return 0;
131-
}
132-
if (semaphore.tryAcquire(amount, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
133-
int obtained = permitsRequester.request(amount);
134-
if (obtained < amount) {
135-
semaphore.release(amount - obtained);
136-
}
137-
return obtained;
138-
}
139-
return 0;
140-
}
141-
142102
private static class ReducibleSemaphore extends Semaphore {
143103

144104
ReducibleSemaphore(int permits) {
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import java.time.Duration;
19+
import java.util.List;
20+
21+
public class CompositeBackPressureHandler implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
22+
23+
private final List<BackPressureHandler> backPressureHandlers;
24+
25+
private final int batchSize;
26+
private String id;
27+
28+
public CompositeBackPressureHandler(List<BackPressureHandler> backPressureHandlers, int batchSize) {
29+
this.backPressureHandlers = backPressureHandlers;
30+
this.batchSize = batchSize;
31+
}
32+
33+
@Override
34+
public void setId(String id) {
35+
this.id = id;
36+
}
37+
38+
@Override
39+
public String getId() {
40+
return id;
41+
}
42+
43+
@Override
44+
public int requestBatch() throws InterruptedException {
45+
return request(batchSize);
46+
}
47+
48+
@Override
49+
public void releaseBatch() {
50+
release(batchSize);
51+
}
52+
53+
@Override
54+
public int getBatchSize() {
55+
return batchSize;
56+
}
57+
58+
@Override
59+
public int request(int amount) throws InterruptedException {
60+
int obtained = amount;
61+
int[] obtainedPerBph = new int[backPressureHandlers.size()];
62+
for (int i = 0; i < backPressureHandlers.size() && obtained > 0; i++) {
63+
obtainedPerBph[i] = backPressureHandlers.get(i).request(obtained);
64+
obtained = Math.min(obtained, obtainedPerBph[i]);
65+
}
66+
for (int i = 0; i < backPressureHandlers.size(); i++) {
67+
int obtainedForBph = obtainedPerBph[i];
68+
if (obtainedForBph > obtained) {
69+
if (amount == batchSize) {
70+
backPressureHandlers.get(i).release(amount);
71+
// FIXME what if we cannot acquire 'obtained' (< 'amount') permits?
72+
backPressureHandlers.get(i).request(obtained);
73+
}
74+
else {
75+
backPressureHandlers.get(i).release(obtainedForBph - obtained);
76+
}
77+
}
78+
}
79+
return obtained;
80+
}
81+
82+
@Override
83+
public void release(int amount) {
84+
for (BackPressureHandler handler : backPressureHandlers) {
85+
handler.release(amount);
86+
}
87+
}
88+
89+
@Override
90+
public boolean drain(Duration timeout) {
91+
boolean result = true;
92+
for (BackPressureHandler handler : backPressureHandlers) {
93+
result &= !handler.drain(timeout);
94+
}
95+
return result;
96+
}
97+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ public String getId() {
8080

8181
@Override
8282
public int request(int amount) throws InterruptedException {
83+
if (amount == batchSize) {
84+
return requestBatch();
85+
}
8386
return tryAcquire(amount, this.currentThroughputMode) ? amount : 0;
8487
}
8588

@@ -171,6 +174,10 @@ private void maybeSwitchToLowThroughputMode() {
171174

172175
@Override
173176
public void release(int amount) {
177+
if (amount == batchSize) {
178+
releaseBatch();
179+
return;
180+
}
174181
logger.trace("Releasing {} permits for {}. Permits left: {}", amount, this.id,
175182
this.semaphore.availablePermits());
176183
maybeSwitchToHighThroughputMode(amount);

0 commit comments

Comments
 (0)