Skip to content

Commit 0bd7be9

Browse files
Add support for autoStartup (#827)
* Add support for autoStartup * Update DefaultListenerContainerRegistry.java --------- Co-authored-by: Tomaz Fernandes <76525045+tomazfernandes@users.noreply.github.com>
1 parent 4e37bb3 commit 0bd7be9

File tree

8 files changed

+109
-1
lines changed

8 files changed

+109
-1
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,13 @@ See AWS documentation for more information.
803803
After that period, the framework will try to perform a partial acquire with the available permits, resulting in a poll for less than `maxMessagesPerPoll` messages, unless otherwise configured.
804804
See <<Message Processing Throughput>>.
805805

806+
|`autoStartup`
807+
|true, false
808+
|true
809+
|Determines wherever container should start automatically. When set to false the
810+
container will not launch on startup, requiring manual intervention to start it.
811+
See <<Container Lifecycle>>.
812+
806813
|`listenerShutdownTimeout`
807814
|0 - undefined
808815
|10 seconds

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,
3737

3838
private final int maxMessagesPerPoll;
3939

40+
private final boolean autoStartup;
41+
4042
private final Duration pollTimeout;
4143

4244
private final Duration maxDelayBetweenPolls;
@@ -71,6 +73,7 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,
7173
protected AbstractContainerOptions(Builder<?, ?> builder) {
7274
this.maxConcurrentMessages = builder.maxConcurrentMessages;
7375
this.maxMessagesPerPoll = builder.maxMessagesPerPoll;
76+
this.autoStartup = builder.autoStartup;
7477
this.pollTimeout = builder.pollTimeout;
7578
this.maxDelayBetweenPolls = builder.maxDelayBetweenPolls;
7679
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
@@ -99,6 +102,11 @@ public int getMaxMessagesPerPoll() {
99102
return this.maxMessagesPerPoll;
100103
}
101104

105+
@Override
106+
public boolean isAutoStartup() {
107+
return this.autoStartup;
108+
}
109+
102110
@Override
103111
public Duration getPollTimeout() {
104112
return this.pollTimeout;
@@ -176,6 +184,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
176184

177185
private static final int DEFAULT_MAX_MESSAGES_PER_POLL = 10;
178186

187+
private static final boolean DEFAULT_AUTO_STARTUP = true;
188+
179189
private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(10);
180190

181191
private static final Duration DEFAULT_SEMAPHORE_TIMEOUT = Duration.ofSeconds(10);
@@ -196,6 +206,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
196206

197207
private int maxMessagesPerPoll = DEFAULT_MAX_MESSAGES_PER_POLL;
198208

209+
private boolean autoStartup = DEFAULT_AUTO_STARTUP;
210+
199211
private Duration pollTimeout = DEFAULT_POLL_TIMEOUT;
200212

201213
private Duration maxDelayBetweenPolls = DEFAULT_SEMAPHORE_TIMEOUT;
@@ -233,6 +245,7 @@ protected Builder() {
233245
protected Builder(AbstractContainerOptions<?, ?> options) {
234246
this.maxConcurrentMessages = options.maxConcurrentMessages;
235247
this.maxMessagesPerPoll = options.maxMessagesPerPoll;
248+
this.autoStartup = options.autoStartup;
236249
this.pollTimeout = options.pollTimeout;
237250
this.maxDelayBetweenPolls = options.maxDelayBetweenPolls;
238251
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
@@ -261,6 +274,12 @@ public B maxMessagesPerPoll(int maxMessagesPerPoll) {
261274
return self();
262275
}
263276

277+
@Override
278+
public B autoStartup(boolean autoStartup) {
279+
this.autoStartup = autoStartup;
280+
return self();
281+
}
282+
264283
@Override
265284
public B pollTimeout(Duration pollTimeout) {
266285
Assert.notNull(pollTimeout, "pollTimeout cannot be null");

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,11 @@ public boolean isRunning() {
252252
return this.isRunning;
253253
}
254254

255+
@Override
256+
public boolean isAutoStartup() {
257+
return containerOptions.isAutoStartup();
258+
}
259+
255260
@Override
256261
public void start() {
257262
if (this.isRunning) {

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ public interface ContainerOptions<O extends ContainerOptions<O, B>, B extends Co
5050
*/
5151
int getMaxMessagesPerPoll();
5252

53+
/**
54+
* Checks whether the container should be started automatically or manually. Default is true.
55+
*
56+
* @return true if the container starts automatically, false if it should be started manually
57+
*/
58+
boolean isAutoStartup();
59+
5360
/**
5461
* Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to
5562
* acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ public interface ContainerOptionsBuilder<B extends ContainerOptionsBuilder<B, O>
4646
*/
4747
B maxMessagesPerPoll(int maxMessagesPerPoll);
4848

49+
/**
50+
* Set whether the container should be started automatically or manually. By default, the container is set to start
51+
* automatically.
52+
*
53+
* @param autoStartup true if the container is set to start automatically, false if it should be started manually
54+
* @return this instance.
55+
*/
56+
B autoStartup(boolean autoStartup);
57+
4958
/**
5059
* Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to
5160
* acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
import io.awspring.cloud.sqs.LifecycleHandler;
1919
import java.util.Collection;
2020
import java.util.Collections;
21+
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.stream.Collectors;
2325
import org.slf4j.Logger;
2426
import org.slf4j.LoggerFactory;
27+
import org.springframework.context.SmartLifecycle;
2528
import org.springframework.lang.Nullable;
2629
import org.springframework.util.Assert;
2730

@@ -75,7 +78,9 @@ public MessageListenerContainer<?> getContainerById(String id) {
7578
public void start() {
7679
synchronized (this.lifecycleMonitor) {
7780
logger.debug("Starting {}", getClass().getSimpleName());
78-
LifecycleHandler.get().start(this.listenerContainers.values());
81+
List<MessageListenerContainer<?>> containersToStart = this.listenerContainers.values().stream()
82+
.filter(SmartLifecycle::isAutoStartup).collect(Collectors.toList());
83+
LifecycleHandler.get().start(containersToStart);
7984
this.running = true;
8085
logger.debug("{} started", getClass().getSimpleName());
8186
}

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.slf4j.Logger;
6868
import org.slf4j.LoggerFactory;
6969
import org.springframework.beans.factory.annotation.Autowired;
70+
import org.springframework.beans.factory.annotation.Qualifier;
7071
import org.springframework.boot.test.context.SpringBootTest;
7172
import org.springframework.context.annotation.Bean;
7273
import org.springframework.context.annotation.Configuration;
@@ -117,6 +118,8 @@ class SqsIntegrationTests extends BaseSqsIntegrationTest {
117118

118119
static final String MANUALLY_CREATE_CONTAINER_QUEUE_NAME = "manually_create_container_test_queue";
119120

121+
static final String MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME = "manually_create_inactive_container_test_queue";
122+
120123
static final String MANUALLY_CREATE_FACTORY_QUEUE_NAME = "manually_create_factory_test_queue";
121124

122125
static final String MAX_CONCURRENT_MESSAGES_QUEUE_NAME = "max_concurrent_messages_test_queue";
@@ -144,6 +147,7 @@ static void beforeTests() {
144147
createQueue(client, RESOLVES_PARAMETER_TYPES_QUEUE_NAME,
145148
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "20")),
146149
createQueue(client, MANUALLY_CREATE_CONTAINER_QUEUE_NAME),
150+
createQueue(client, MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME),
147151
createQueue(client, MANUALLY_CREATE_FACTORY_QUEUE_NAME),
148152
createQueue(client, MAX_CONCURRENT_MESSAGES_QUEUE_NAME)).join();
149153
}
@@ -157,6 +161,10 @@ static void beforeTests() {
157161
@Autowired
158162
ObjectMapper objectMapper;
159163

164+
@Autowired
165+
@Qualifier("inactiveContainer")
166+
MessageListenerContainer<Object> inactiveMessageListenerContainer;
167+
160168
@Test
161169
void receivesMessage() throws Exception {
162170
String messageBody = "receivesMessage-payload";
@@ -240,6 +248,17 @@ void manuallyCreatesContainer() throws Exception {
240248
assertThat(latchContainer.manuallyCreatedContainerLatch.await(10, TimeUnit.SECONDS)).isTrue();
241249
}
242250

251+
@Test
252+
void manuallyCreatesInactiveContainer() throws Exception {
253+
String messageBody = "Testing manually creates inactive container";
254+
assertThat(inactiveMessageListenerContainer.isRunning()).isFalse();
255+
sqsTemplate.send(MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME, messageBody);
256+
inactiveMessageListenerContainer.start();
257+
logger.debug("Sent message to queue {} with messageBody {}", MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME,
258+
messageBody);
259+
assertThat(latchContainer.manuallyInactiveCreatedContainerLatch.await(10, TimeUnit.SECONDS)).isTrue();
260+
}
261+
243262
// @formatter:off
244263
@Test
245264
void manuallyStartsContainerAndChangesComponent() throws Exception {
@@ -453,6 +472,7 @@ static class LatchContainer {
453472
final CountDownLatch acknowledgementCallbackSuccessLatch = new CountDownLatch(1);
454473
final CountDownLatch acknowledgementCallbackBatchLatch = new CountDownLatch(1);
455474
final CountDownLatch acknowledgementCallbackErrorLatch = new CountDownLatch(1);
475+
final CountDownLatch manuallyInactiveCreatedContainerLatch = new CountDownLatch(1);
456476
final CyclicBarrier maxConcurrentMessagesBarrier = new CyclicBarrier(21);
457477

458478
}
@@ -576,6 +596,23 @@ public MessageListenerContainer<Object> manuallyCreatedContainer() throws Except
576596
.build();
577597
}
578598

599+
@Bean("inactiveContainer")
600+
public MessageListenerContainer<Object> manuallyCreatedInactiveContainer() throws Exception {
601+
SqsAsyncClient client = BaseSqsIntegrationTest.createAsyncClient();
602+
String queueUrl = client.getQueueUrl(req -> req.queueName(MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME)).get()
603+
.queueUrl();
604+
return SqsMessageListenerContainer
605+
.builder()
606+
.queueNames(queueUrl)
607+
.sqsAsyncClient(client)
608+
.configure(options -> options
609+
.autoStartup(false)
610+
.maxDelayBetweenPolls(Duration.ofSeconds(1))
611+
.pollTimeout(Duration.ofSeconds(3)))
612+
.messageListener(msg -> {latchContainer.manuallyInactiveCreatedContainerLatch.countDown();})
613+
.build();
614+
}
615+
579616
@Bean
580617
public SqsMessageListenerContainer<String> manuallyCreatedFactory() {
581618
SqsMessageListenerContainerFactory<String> factory = new SqsMessageListenerContainerFactory<>();

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.mockito.BDDMockito.given;
2121
import static org.mockito.BDDMockito.then;
2222
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.times;
2324

2425
import org.junit.jupiter.api.Test;
2526

@@ -78,8 +79,11 @@ void shouldStartAndStopAllListenerContainers() {
7879
String id2 = "test-container-id-2";
7980
String id3 = "test-container-id-3";
8081
given(container1.getId()).willReturn(id1);
82+
given(container1.isAutoStartup()).willReturn(true);
8183
given(container2.getId()).willReturn(id2);
84+
given(container2.isAutoStartup()).willReturn(true);
8285
given(container3.getId()).willReturn(id3);
86+
given(container3.isAutoStartup()).willReturn(true);
8387
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
8488
registry.registerListenerContainer(container1);
8589
registry.registerListenerContainer(container2);
@@ -96,6 +100,21 @@ void shouldStartAndStopAllListenerContainers() {
96100
then(container3).should().stop();
97101
}
98102

103+
@Test
104+
void shouldNotStartContainerWithAutoStartupFalse() {
105+
MessageListenerContainer<Object> container1 = mock(MessageListenerContainer.class);
106+
String id1 = "test-container-id-1";
107+
given(container1.getId()).willReturn(id1);
108+
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
109+
registry.registerListenerContainer(container1);
110+
registry.start();
111+
assertThat(registry.isRunning()).isTrue();
112+
registry.stop();
113+
assertThat(registry.isRunning()).isFalse();
114+
then(container1).should(times(0)).start();
115+
then(container1).should().stop();
116+
}
117+
99118
@Test
100119
void shouldThrowIfIdAlreadyPresent() {
101120
MessageListenerContainer<Object> container = mock(MessageListenerContainer.class);

0 commit comments

Comments
 (0)