Skip to content

Commit 0da12ab

Browse files
committed
Add tests for ConcurrencyLimiterBlockingBackPressureHandler (#1251)
1 parent af9d6ce commit 0da12ab

File tree

4 files changed

+54
-15
lines changed

4 files changed

+54
-15
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerFactories.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public static CompositeBackPressureHandler compositeBackPressureHandler(Containe
147147
public static ConcurrencyLimiterBlockingBackPressureHandler concurrencyLimiterBackPressureHandler(
148148
ContainerOptions<?, ?> options) {
149149
return ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
150-
.totalPermits(options.getMaxConcurrentMessages()).throughputConfiguration(options.getBackPressureMode())
150+
.totalPermits(options.getMaxConcurrentMessages())
151151
.acquireTimeout(options.getMaxDelayBetweenPolls()).build();
152152
}
153153

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ConcurrencyLimiterBlockingBackPressureHandler.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,6 @@ public static class Builder {
126126

127127
private Duration acquireTimeout;
128128

129-
private BackPressureMode backPressureMode;
130-
131129
public Builder batchSize(int batchSize) {
132130
this.batchSize = batchSize;
133131
return this;
@@ -143,14 +141,9 @@ public Builder acquireTimeout(Duration acquireTimeout) {
143141
return this;
144142
}
145143

146-
public Builder throughputConfiguration(BackPressureMode backPressureConfiguration) {
147-
this.backPressureMode = backPressureConfiguration;
148-
return this;
149-
}
150-
151144
public ConcurrencyLimiterBlockingBackPressureHandler build() {
152145
Assert.noNullElements(
153-
Arrays.asList(this.batchSize, this.totalPermits, this.acquireTimeout, this.backPressureMode),
146+
Arrays.asList(this.batchSize, this.totalPermits, this.acquireTimeout),
154147
"Missing configuration");
155148
Assert.isTrue(this.batchSize > 0, "The batch size must be greater than 0");
156149
Assert.isTrue(this.totalPermits >= this.batchSize, "Total permits must be greater than the batch size");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.awspring.cloud.sqs.listener;
2+
3+
import org.junit.jupiter.api.BeforeEach;
4+
import org.junit.jupiter.api.Test;
5+
6+
import java.time.Duration;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
class ConcurrencyLimiterBlockingBackPressureHandlerTest {
11+
12+
private static final int BATCH_SIZE = 5;
13+
private static final int TOTAL_PERMITS = 10;
14+
15+
private ConcurrencyLimiterBlockingBackPressureHandler handler;
16+
17+
@BeforeEach
18+
void setUp() {
19+
handler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
20+
.totalPermits(TOTAL_PERMITS)
21+
.batchSize(BATCH_SIZE)
22+
.acquireTimeout(Duration.ofMillis(100))
23+
.build();
24+
}
25+
26+
@Test
27+
void request_shouldAcquirePermits() throws InterruptedException {
28+
// Requesting a first batch should acquire the permits
29+
assertThat(handler.request(BATCH_SIZE)).isEqualTo(BATCH_SIZE);
30+
// Requesting a second batch should acquire the remaining permits
31+
assertThat(handler.request(BATCH_SIZE)).isEqualTo(BATCH_SIZE);
32+
// No permits left
33+
assertThat(handler.request(1)).isZero();
34+
}
35+
36+
@Test
37+
void release_shouldAllowFurtherRequests() throws InterruptedException {
38+
// Given all permits are acquired
39+
assertThat(handler.request(TOTAL_PERMITS)).isEqualTo(TOTAL_PERMITS);
40+
assertThat(handler.request(1)).isZero();
41+
// When releasing some permits, new requests should be allowed
42+
handler.release(3, BackPressureHandler.ReleaseReason.PROCESSED);
43+
assertThat(handler.request(5)).isEqualTo(3); // Only 3 permits were released so far
44+
}
45+
}
46+

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/ThroughputBackPressureHandlerTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ void lowThroughputMode_shouldReturnZeroUntilRelease(BackPressureHandler.ReleaseR
4040
int batchSize = 5;
4141
assertThat(handler.request(batchSize)).isEqualTo(batchSize);
4242
// When a second batch is requested, it should return zero permits (because low throughput mode)
43-
assertThat(handler.request(batchSize)).isEqualTo(0);
43+
assertThat(handler.request(batchSize)).isZero();
4444
// When a batch is requested after a release, the expected permits should be
4545
// returned depending on the release reason
4646
handler.release(1, releaseReason);
@@ -64,13 +64,13 @@ void highThroughputMode_shouldAllowMultipleConcurrentRequests() throws Interrupt
6464
handler.release(5, BackPressureHandler.ReleaseReason.NONE_FETCHED);
6565
assertThat(handler.request(batchSize)).isEqualTo(batchSize);
6666
// And subsequent requests should return zero permits until the current batch finishes with NONE_FETCHED
67-
assertThat(handler.request(batchSize)).isEqualTo(0);
68-
assertThat(handler.request(batchSize)).isEqualTo(0);
67+
assertThat(handler.request(batchSize)).isZero();
68+
assertThat(handler.request(batchSize)).isZero();
6969
handler.release(5, BackPressureHandler.ReleaseReason.NONE_FETCHED);
7070
assertThat(handler.request(batchSize)).isEqualTo(5);
7171
// or until it (the current batch) finishes with PARTIAL_FETCH
72-
assertThat(handler.request(batchSize)).isEqualTo(0);
73-
assertThat(handler.request(batchSize)).isEqualTo(0);
72+
assertThat(handler.request(batchSize)).isZero();
73+
assertThat(handler.request(batchSize)).isZero();
7474
handler.release(3, BackPressureHandler.ReleaseReason.PARTIAL_FETCH);
7575
assertThat(handler.request(batchSize)).isEqualTo(5);
7676
}
@@ -79,7 +79,7 @@ void highThroughputMode_shouldAllowMultipleConcurrentRequests() throws Interrupt
7979
void drain_shouldSetDrainedAndReturnTrue() throws InterruptedException {
8080
boolean result = handler.drain(Duration.ofSeconds(1));
8181
assertThat(result).isTrue();
82-
assertThat(handler.request(5)).isEqualTo(0);
82+
assertThat(handler.request(5)).isZero();
8383
}
8484

8585
}

0 commit comments

Comments
 (0)