Skip to content

Commit a40821e

Browse files
committed
Simplify ThroughputBackPressureHandler not to count in flight messages (#1251)
1 parent baef9eb commit a40821e

File tree

2 files changed

+13
-57
lines changed

2 files changed

+13
-57
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,7 @@ static ConcurrencyLimiterBlockingBackPressureHandler concurrencyLimiterBackPress
169169
* @return the created ThroughputBackPressureHandler.
170170
*/
171171
static ThroughputBackPressureHandler throughputBackPressureHandler(ContainerOptions<?, ?> options) {
172-
return ThroughputBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
173-
.totalPermits(options.getMaxConcurrentMessages()).build();
172+
return ThroughputBackPressureHandler.builder().build();
174173
}
175174

176175
/**

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ThroughputBackPressureHandler.java

Lines changed: 12 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,18 @@
1818
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
1919
import java.time.Duration;
2020
import java.util.concurrent.atomic.AtomicBoolean;
21-
import java.util.concurrent.atomic.AtomicInteger;
2221
import java.util.concurrent.atomic.AtomicReference;
2322
import org.slf4j.Logger;
2423
import org.slf4j.LoggerFactory;
25-
import org.springframework.util.Assert;
2624

2725
/**
2826
* Non-blocking {@link BackPressureHandler} implementation that uses a switch between high and low throughput modes.
2927
* <p>
3028
* <strong>Throughput modes</strong>
3129
* <ul>
32-
* <li>In low-throughput mode, a single batch can be requested at a time. The number of permits that will be delivered
33-
* is adjusted so that the number of in flight messages will not exceed the batch size.</li>
30+
* <li>In low-throughput mode, a single batch can be requested at a time.</li>
3431
* <li>In high-throughput mode, multiple batches can be requested at a time. The number of permits that will be
35-
* delivered is adjusted so that the number of in flight messages will not exceed the maximum number of concurrent
36-
* messages. Note that for a single poll the maximum number of permits that will be delivered will not exceed the batch
37-
* size.</li>
32+
* delivered is the requested amount.</li>
3833
* </ul>
3934
* <p>
4035
* <strong>Throughput mode switch:</strong> The initial throughput mode is the low-throughput mode. If some messages are
@@ -47,26 +42,21 @@
4742
*
4843
* @see PollingMessageSource
4944
*/
50-
public class ThroughputBackPressureHandler implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
45+
public class ThroughputBackPressureHandler implements BackPressureHandler, IdentifiableContainerComponent {
5146

5247
private static final Logger logger = LoggerFactory.getLogger(ThroughputBackPressureHandler.class);
5348

54-
private final int batchSize;
55-
private final int maxConcurrentMessages;
56-
5749
private final AtomicReference<CurrentThroughputMode> currentThroughputMode = new AtomicReference<>(
5850
CurrentThroughputMode.LOW);
5951

60-
private final AtomicInteger inFlightRequests = new AtomicInteger(0);
52+
private final AtomicBoolean occupied = new AtomicBoolean(false);
6153

6254
private final AtomicBoolean drained = new AtomicBoolean(false);
6355

6456
private String id = getClass().getSimpleName();
6557

6658
private ThroughputBackPressureHandler(Builder builder) {
67-
this.batchSize = builder.batchSize;
68-
this.maxConcurrentMessages = builder.maxConcurrentMessages;
69-
logger.debug("ThroughputBackPressureHandler created with batchSize {}", this.batchSize);
59+
logger.debug("ThroughputBackPressureHandler created");
7060
}
7161

7262
public static Builder builder() {
@@ -83,35 +73,21 @@ public String getId() {
8373
return this.id;
8474
}
8575

86-
@Override
87-
public int requestBatch() throws InterruptedException {
88-
return request(this.batchSize);
89-
}
90-
9176
@Override
9277
public int request(int amount) throws InterruptedException {
9378
if (drained.get()) {
9479
return 0;
9580
}
96-
int amountCappedAtBatchSize = Math.min(amount, this.batchSize);
97-
int permits;
98-
int inFlight = inFlightRequests.get();
99-
if (CurrentThroughputMode.LOW == this.currentThroughputMode.get()) {
100-
// In low-throughput mode, we only acquire one batch at a time,
101-
// so we need to limit the available permits to the batchSize - inFlight messages.
102-
permits = Math.max(0, Math.min(amountCappedAtBatchSize, this.batchSize - inFlight));
103-
logger.debug("[{}] Acquired {} permits (low-throughput mode), requested: {}, in flight: {}", this.id,
104-
permits, amount, inFlight);
81+
CurrentThroughputMode throughputMode = this.currentThroughputMode.get();
82+
if (throughputMode == CurrentThroughputMode.LOW && this.occupied.get()) {
83+
logger.debug("[{}] No permits acquired because a batch already being processed in low throughput mode",
84+
this.id);
85+
return 0;
10586
}
10687
else {
107-
// In high-throughput mode, we can acquire more permits than the batch size,
108-
// but we need to limit the available permits to the maxConcurrentMessages - inFlight messages.
109-
permits = Math.max(0, Math.min(amountCappedAtBatchSize, this.maxConcurrentMessages - inFlight));
110-
logger.debug("[{}] Acquired {} permits (high-throughput mode), requested: {}, in flight: {}", this.id,
111-
permits, amount, inFlight);
88+
logger.debug("[{}] Acquired {} permits ({} mode)", this.id, amount, throughputMode);
89+
return amount;
11290
}
113-
inFlightRequests.addAndGet(permits);
114-
return permits;
11591
}
11692

11793
@Override
@@ -120,7 +96,6 @@ public void release(int amount, ReleaseReason reason) {
12096
return;
12197
}
12298
logger.debug("[{}] Releasing {} permits ({})", this.id, amount, reason);
123-
inFlightRequests.addAndGet(-amount);
12499
switch (reason) {
125100
case NONE_FETCHED -> updateThroughputMode(CurrentThroughputMode.HIGH, CurrentThroughputMode.LOW);
126101
case PARTIAL_FETCH -> updateThroughputMode(CurrentThroughputMode.LOW, CurrentThroughputMode.HIGH);
@@ -153,25 +128,7 @@ private enum CurrentThroughputMode {
153128

154129
public static class Builder {
155130

156-
private int batchSize;
157-
private int maxConcurrentMessages;
158-
159-
public Builder batchSize(int batchSize) {
160-
this.batchSize = batchSize;
161-
return this;
162-
}
163-
164-
public Builder totalPermits(int maxConcurrentMessages) {
165-
this.maxConcurrentMessages = maxConcurrentMessages;
166-
return this;
167-
}
168-
169131
public ThroughputBackPressureHandler build() {
170-
Assert.notNull(this.batchSize, "Missing batchSize configuration");
171-
Assert.isTrue(this.batchSize > 0, "batch size must be greater than 0");
172-
Assert.notNull(this.maxConcurrentMessages, "Missing maxConcurrentMessages configuration");
173-
Assert.notNull(this.maxConcurrentMessages >= this.batchSize,
174-
"maxConcurrentMessages must be greater than or equal to batchSize");
175132
return new ThroughputBackPressureHandler(this);
176133
}
177134
}

0 commit comments

Comments
 (0)