Skip to content

Commit cf3f567

Browse files
committed
Add tests for CompositeBackPressureHandlerTest (#1251)
1 parent 4d3c13d commit cf3f567

File tree

7 files changed

+281
-69
lines changed

7 files changed

+281
-69
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import io.awspring.cloud.sqs.listener.source.MessageSource;
3737
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
3838
import io.awspring.cloud.sqs.support.observation.AbstractListenerObservation;
39-
import java.time.Duration;
4039
import java.util.ArrayList;
4140
import java.util.Collection;
4241
import java.util.List;

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerFactories.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,8 @@ public static BatchAwareBackPressureHandler adaptativeThroughputBackPressureHand
133133
*/
134134
public static CompositeBackPressureHandler compositeBackPressureHandler(ContainerOptions<?, ?> options,
135135
Duration maxIdleWaitTime, List<BackPressureHandler> backPressureHandlers) {
136-
return new CompositeBackPressureHandler(List.copyOf(backPressureHandlers), options.getMaxMessagesPerPoll(),
137-
maxIdleWaitTime);
136+
return CompositeBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
137+
.noPermitsReturnedWaitTimeout(maxIdleWaitTime).backPressureHandlers(backPressureHandlers).build();
138138
}
139139

140140
/**
@@ -147,8 +147,8 @@ public static CompositeBackPressureHandler compositeBackPressureHandler(Containe
147147
public static ConcurrencyLimiterBlockingBackPressureHandler concurrencyLimiterBackPressureHandler(
148148
ContainerOptions<?, ?> options) {
149149
return ConcurrencyLimiterBlockingBackPressureHandler.builder().batchSize(options.getMaxMessagesPerPoll())
150-
.totalPermits(options.getMaxConcurrentMessages())
151-
.acquireTimeout(options.getMaxDelayBetweenPolls()).build();
150+
.totalPermits(options.getMaxConcurrentMessages()).acquireTimeout(options.getMaxDelayBetweenPolls())
151+
.build();
152152
}
153153

154154
/**

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.locks.ReentrantLock;
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
26+
import org.springframework.util.Assert;
2627

2728
/**
2829
* Composite {@link BackPressureHandler} implementation that delegates the back-pressure handling to a list of
@@ -50,23 +51,23 @@ public class CompositeBackPressureHandler implements BatchAwareBackPressureHandl
5051

5152
private static final Logger logger = LoggerFactory.getLogger(CompositeBackPressureHandler.class);
5253

53-
private final List<BackPressureHandler> backPressureHandlers;
54+
private String id;
5455

5556
private final int batchSize;
5657

58+
private final Duration noPermitsReturnedWaitTimeout;
59+
60+
private final List<BackPressureHandler> backPressureHandlers;
61+
5762
private final ReentrantLock noPermitsReturnedWaitLock = new ReentrantLock();
5863

5964
private final Condition permitsReleasedCondition = noPermitsReturnedWaitLock.newCondition();
6065

61-
private final Duration noPermitsReturnedWaitTimeout;
62-
63-
private String id;
66+
private CompositeBackPressureHandler(Builder builder) {
67+
this.batchSize = builder.batchSize;
68+
this.noPermitsReturnedWaitTimeout = builder.noPermitsReturnedWaitTimeout;
69+
this.backPressureHandlers = List.copyOf(builder.backPressureHandlers);
6470

65-
public CompositeBackPressureHandler(List<BackPressureHandler> backPressureHandlers, int batchSize,
66-
Duration noPermitsReturnedWaitTimeout) {
67-
this.backPressureHandlers = backPressureHandlers;
68-
this.batchSize = batchSize;
69-
this.noPermitsReturnedWaitTimeout = noPermitsReturnedWaitTimeout;
7071
}
7172

7273
@Override
@@ -168,4 +169,37 @@ public boolean drain(Duration timeout) {
168169
private static Duration maxDuration(Duration first, Duration second) {
169170
return first.compareTo(second) > 0 ? first : second;
170171
}
172+
173+
public static Builder builder() {
174+
return new Builder();
175+
}
176+
177+
public static class Builder {
178+
179+
private int batchSize;
180+
private Duration noPermitsReturnedWaitTimeout;
181+
private List<BackPressureHandler> backPressureHandlers;
182+
183+
public Builder backPressureHandlers(List<BackPressureHandler> backPressureHandlers) {
184+
this.backPressureHandlers = backPressureHandlers;
185+
return this;
186+
}
187+
188+
public Builder batchSize(int batchSize) {
189+
this.batchSize = batchSize;
190+
return this;
191+
}
192+
193+
public Builder noPermitsReturnedWaitTimeout(Duration noPermitsReturnedWaitTimeout) {
194+
this.noPermitsReturnedWaitTimeout = noPermitsReturnedWaitTimeout;
195+
return this;
196+
}
197+
198+
public CompositeBackPressureHandler build() {
199+
Assert.notNull(this.batchSize, "Missing configuration for batch size");
200+
Assert.notNull(this.noPermitsReturnedWaitTimeout, "Missing configuration for noPermitsReturnedWaitTimeout");
201+
Assert.noNullElements(this.backPressureHandlers, "backPressureHandlers must not be null");
202+
return new CompositeBackPressureHandler(this);
203+
}
204+
}
171205
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ConcurrencyLimiterBlockingBackPressureHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,7 @@ public Builder acquireTimeout(Duration acquireTimeout) {
142142
}
143143

144144
public ConcurrencyLimiterBlockingBackPressureHandler build() {
145-
Assert.noNullElements(
146-
Arrays.asList(this.batchSize, this.totalPermits, this.acquireTimeout),
145+
Assert.noNullElements(Arrays.asList(this.batchSize, this.totalPermits, this.acquireTimeout),
147146
"Missing configuration");
148147
Assert.isTrue(this.batchSize > 0, "The batch size must be greater than 0");
149148
Assert.isTrue(this.totalPermits >= this.batchSize, "Total permits must be greater than the batch size");
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.time.Duration;
22+
import java.util.List;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import org.jetbrains.annotations.NotNull;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
28+
class CompositeBackPressureHandlerTest {
29+
30+
private BackPressureHandler handler1;
31+
private BackPressureHandler handler2;
32+
33+
@BeforeEach
34+
void setUp() {
35+
handler1 = mock(BackPressureHandler.class);
36+
handler2 = mock(BackPressureHandler.class);
37+
}
38+
39+
@Test
40+
void request_shouldDelegateToHandlersAndReturnMinPermits() throws InterruptedException {
41+
// given
42+
CompositeBackPressureHandler compositeHandler = compositeHandlerBuilder()
43+
.noPermitsReturnedWaitTimeout(Duration.ofSeconds(30)).backPressureHandlers(List.of(handler1, handler2))
44+
.build();
45+
when(handler1.request(5)).thenReturn(5);
46+
when(handler2.request(5)).thenReturn(3);
47+
// when
48+
int permits = compositeHandler.request(5);
49+
// then
50+
assertThat(permits).isEqualTo(3);
51+
verify(handler1).request(5);
52+
verify(handler2).request(5);
53+
}
54+
55+
@Test
56+
void release_shouldDelegateToHandlers() {
57+
// given
58+
CompositeBackPressureHandler compositeHandler = compositeHandlerBuilder()
59+
.noPermitsReturnedWaitTimeout(Duration.ofSeconds(30)).backPressureHandlers(List.of(handler1, handler2))
60+
.build();
61+
// when
62+
compositeHandler.release(2, BackPressureHandler.ReleaseReason.PROCESSED);
63+
// then
64+
verify(handler1).release(2, BackPressureHandler.ReleaseReason.PROCESSED);
65+
verify(handler2).release(2, BackPressureHandler.ReleaseReason.PROCESSED);
66+
}
67+
68+
@Test
69+
void request_shouldWaitIfNoPermitsAndTimeout() throws InterruptedException {
70+
// given
71+
CompositeBackPressureHandler compositeHandler = compositeHandlerBuilder()
72+
.noPermitsReturnedWaitTimeout(Duration.ofSeconds(5)).backPressureHandlers(List.of(handler1, handler2))
73+
.build();
74+
when(handler1.request(5)).thenReturn(0);
75+
when(handler2.request(5)).thenReturn(0);
76+
// when
77+
long start = System.nanoTime();
78+
int permits = compositeHandler.request(5);
79+
Duration duration = Duration.ofNanos(System.nanoTime() - start);
80+
// then
81+
assertThat(permits).isZero();
82+
assertThat(duration).isGreaterThanOrEqualTo(Duration.ofSeconds(1L));
83+
}
84+
85+
@Test
86+
void request_shouldPassReducedPermitsToSubsequentHandlers() throws InterruptedException {
87+
// given
88+
CompositeBackPressureHandler compositeHandler = compositeHandlerBuilder()
89+
.noPermitsReturnedWaitTimeout(Duration.ofSeconds(30)).backPressureHandlers(List.of(handler1, handler2))
90+
.build();
91+
when(handler1.request(10)).thenReturn(5);
92+
when(handler2.request(5)).thenReturn(5);
93+
// when
94+
int permits = compositeHandler.request(10);
95+
// then
96+
assertThat(permits).isEqualTo(5);
97+
verify(handler1).request(10);
98+
verify(handler2).request(5);
99+
}
100+
101+
@Test
102+
void request_whenLaterHandlerReturnsLessPermits_shouldReleaseDiffWithLimitedOnPreviousHandlers()
103+
throws InterruptedException {
104+
// given
105+
BackPressureHandler handler3 = mock(BackPressureHandler.class);
106+
CompositeBackPressureHandler compositeHandler = compositeHandlerBuilder()
107+
.noPermitsReturnedWaitTimeout(Duration.ofMillis(50))
108+
.backPressureHandlers(List.of(handler1, handler2, handler3)).build();
109+
when(handler1.request(5)).thenReturn(4);
110+
when(handler2.request(4)).thenReturn(2);
111+
when(handler3.request(2)).thenReturn(1);
112+
// when
113+
int permits = compositeHandler.request(5);
114+
// then
115+
assertThat(permits).isEqualTo(1);
116+
verify(handler1).request(5);
117+
verify(handler2).request(4);
118+
verify(handler3).request(2);
119+
verify(handler1).release(3, BackPressureHandler.ReleaseReason.LIMITED);
120+
verify(handler2).release(1, BackPressureHandler.ReleaseReason.LIMITED);
121+
verify(handler3, never()).release(anyInt(), any());
122+
}
123+
124+
@Test
125+
void request_shouldUnblockWhenPermitsAreReleased() throws InterruptedException {
126+
// given
127+
CompositeBackPressureHandler compositeHandler = compositeHandlerBuilder()
128+
.noPermitsReturnedWaitTimeout(Duration.ofSeconds(30)).backPressureHandlers(List.of(handler1, handler2))
129+
.build();
130+
when(handler1.request(5)).thenReturn(0, 5);
131+
when(handler2.request(5)).thenReturn(5);
132+
133+
AtomicInteger result = new AtomicInteger(-1);
134+
Thread requester = new Thread(() -> {
135+
try {
136+
// when
137+
result.set(compositeHandler.request(5));
138+
}
139+
catch (InterruptedException e) {
140+
Thread.currentThread().interrupt();
141+
}
142+
});
143+
requester.start();
144+
Thread.sleep(200); // Ensure requester is waiting
145+
assertThat(requester.isAlive()).isTrue();
146+
// when
147+
compositeHandler.release(5, BackPressureHandler.ReleaseReason.PROCESSED);
148+
requester.join(2000);
149+
// then
150+
assertThat(requester.isAlive()).isFalse();
151+
assertThat(result.get()).isZero();
152+
assertThat(compositeHandler.request(5)).isEqualTo(5);
153+
}
154+
155+
private static CompositeBackPressureHandler.@NotNull Builder compositeHandlerBuilder() {
156+
return CompositeBackPressureHandler.builder().batchSize(5);
157+
}
158+
}

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/ConcurrencyLimiterBlockingBackPressureHandlerTest.java

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,25 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.awspring.cloud.sqs.listener;
217

3-
import org.junit.jupiter.api.BeforeEach;
4-
import org.junit.jupiter.api.Test;
18+
import static org.assertj.core.api.Assertions.assertThat;
519

620
import java.time.Duration;
7-
8-
import static org.assertj.core.api.Assertions.assertThat;
21+
import org.junit.jupiter.api.BeforeEach;
22+
import org.junit.jupiter.api.Test;
923

1024
class ConcurrencyLimiterBlockingBackPressureHandlerTest {
1125

@@ -14,33 +28,29 @@ class ConcurrencyLimiterBlockingBackPressureHandlerTest {
1428

1529
private ConcurrencyLimiterBlockingBackPressureHandler handler;
1630

17-
@BeforeEach
18-
void setUp() {
19-
handler = ConcurrencyLimiterBlockingBackPressureHandler.builder()
20-
.totalPermits(TOTAL_PERMITS)
21-
.batchSize(BATCH_SIZE)
22-
.acquireTimeout(Duration.ofMillis(100))
23-
.build();
24-
}
25-
26-
@Test
27-
void request_shouldAcquirePermits() throws InterruptedException {
31+
@BeforeEach
32+
void setUp() {
33+
handler = ConcurrencyLimiterBlockingBackPressureHandler.builder().totalPermits(TOTAL_PERMITS)
34+
.batchSize(BATCH_SIZE).acquireTimeout(Duration.ofMillis(100)).build();
35+
}
36+
37+
@Test
38+
void request_shouldAcquirePermits() throws InterruptedException {
2839
// Requesting a first batch should acquire the permits
29-
assertThat(handler.request(BATCH_SIZE)).isEqualTo(BATCH_SIZE);
40+
assertThat(handler.request(BATCH_SIZE)).isEqualTo(BATCH_SIZE);
3041
// Requesting a second batch should acquire the remaining permits
31-
assertThat(handler.request(BATCH_SIZE)).isEqualTo(BATCH_SIZE);
32-
// No permits left
33-
assertThat(handler.request(1)).isZero();
34-
}
42+
assertThat(handler.request(BATCH_SIZE)).isEqualTo(BATCH_SIZE);
43+
// No permits left
44+
assertThat(handler.request(1)).isZero();
45+
}
3546

36-
@Test
37-
void release_shouldAllowFurtherRequests() throws InterruptedException {
47+
@Test
48+
void release_shouldAllowFurtherRequests() throws InterruptedException {
3849
// Given all permits are acquired
3950
assertThat(handler.request(TOTAL_PERMITS)).isEqualTo(TOTAL_PERMITS);
40-
assertThat(handler.request(1)).isZero();
51+
assertThat(handler.request(1)).isZero();
4152
// When releasing some permits, new requests should be allowed
42-
handler.release(3, BackPressureHandler.ReleaseReason.PROCESSED);
53+
handler.release(3, BackPressureHandler.ReleaseReason.PROCESSED);
4354
assertThat(handler.request(5)).isEqualTo(3); // Only 3 permits were released so far
44-
}
55+
}
4556
}
46-

0 commit comments

Comments
 (0)