Skip to content

Commit 42da25f

Browse files
Ability to set SmartLifecycle.phase to SqsMessageListenerContainer/DefaultListenerContainerRegistry (#821)
* Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface * Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface * Add phase to the SqsMessageListenerContainer and its builder for the SmartLifecycle interface * Add phase to the DefaultListenerContainerRegistry * Improve documentation * Fix merge commit --------- Co-authored-by: Tomaz Fernandes <76525045+tomazfernandes@users.noreply.github.com>
1 parent 0bd7be9 commit 42da25f

File tree

8 files changed

+51
-2
lines changed

8 files changed

+51
-2
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,8 @@ The `MessageListenerContainer` interface extends `SmartLifecycle`, which provide
934934
Containers created from `@SqsListener` annotations are registered in a `MessageListenerContainerRegistry` bean that is registered by the framework.
935935
The containers themselves are not Spring-managed beans, and the registry is responsible for managing these containers` lifecycle in application startup and shutdown.
936936

937+
NOTE: The `DefaultListenerContainerRegistry ` implementation provided by the framework allows the phase value to be set through the `setPhase` method. The default value is `MessageListenerContainer.DEFAULT_PHASE`.
938+
937939
At startup, the containers will make requests to `SQS` to retrieve the queues` urls for the provided queue names or ARNs, and for retrieving `QueueAttributes` if so configured.
938940
Providing queue urls instead of names and not requesting queue attributes can result in slightly better startup times since there's no need for such requests.
939941

@@ -962,6 +964,8 @@ MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient
962964
}
963965
----
964966

967+
NOTE: The `SqsMessageListenerContainer.builder()` allows to specify the `SmartLifecycle.phase`, to override the default value defined in `MessageListenerContainer.DEFAULT_PHASE`
968+
965969
===== Retrieving Containers from the Registry
966970

967971
Containers can be retrieved by fetching the `MessageListenerContainer` bean from the container and using the `getListenerContainers` and `getContainerById` methods.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public abstract class AbstractMessageListenerContainer<T, O extends ContainerOpt
6969
private AsyncAcknowledgementResultCallback<T> acknowledgementResultCallback = new AsyncAcknowledgementResultCallback<T>() {
7070
};
7171

72+
private int phase = DEFAULT_PHASE;
73+
7274
/**
7375
* Create an instance with the provided {@link ContainerOptions}
7476
* @param containerOptions the options instance.
@@ -162,6 +164,14 @@ public void setComponentFactories(Collection<ContainerComponentFactory<T, O>> co
162164
this.containerComponentFactories = containerComponentFactories;
163165
}
164166

167+
/**
168+
* Set the phase for the SmartLifecycle for this container instance.
169+
* @param phase the phase.
170+
*/
171+
public void setPhase(int phase) {
172+
this.phase = phase;
173+
}
174+
165175
/**
166176
* Returns the {@link ContainerOptions} instance for this container. Changed options will take effect on container
167177
* restart.
@@ -252,6 +262,10 @@ public boolean isRunning() {
252262
return this.isRunning;
253263
}
254264

265+
public int getPhase() {
266+
return this.phase;
267+
}
268+
255269
@Override
256270
public boolean isAutoStartup() {
257271
return containerOptions.isAutoStartup();

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public class DefaultListenerContainerRegistry implements MessageListenerContaine
5353

5454
private volatile boolean running = false;
5555

56+
private int phase = MessageListenerContainer.DEFAULT_PHASE;
57+
5658
@Override
5759
public void registerListenerContainer(MessageListenerContainer<?> listenerContainer) {
5860
Assert.notNull(listenerContainer, "listenerContainer cannot be null");
@@ -100,4 +102,12 @@ public boolean isRunning() {
100102
return this.running;
101103
}
102104

105+
@Override
106+
public int getPhase() {
107+
return phase;
108+
}
109+
110+
public void setPhase(int phase) {
111+
this.phase = phase;
112+
}
103113
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/MessageListenerContainer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
*/
2929
public interface MessageListenerContainer<T> extends SmartLifecycle {
3030

31+
int DEFAULT_PHASE = Integer.MAX_VALUE;
32+
3133
/**
3234
* Get the container id.
3335
* @return the id.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.function.Consumer;
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
34+
import org.springframework.context.SmartLifecycle;
3435
import org.springframework.messaging.Message;
3536
import org.springframework.util.Assert;
3637
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
@@ -177,6 +178,8 @@ public static class Builder<T> {
177178

178179
private AcknowledgementResultCallback<T> acknowledgementResultCallback;
179180

181+
private Integer phase;
182+
180183
public Builder<T> id(String id) {
181184
this.id = id;
182185
return this;
@@ -250,6 +253,11 @@ public Builder<T> configure(Consumer<SqsContainerOptionsBuilder> options) {
250253
return this;
251254
}
252255

256+
public Builder<T> phase(Integer phase) {
257+
this.phase = phase;
258+
return this;
259+
}
260+
253261
// @formatter:off
254262
public SqsMessageListenerContainer<T> build() {
255263
SqsMessageListenerContainer<T> container = new SqsMessageListenerContainer<>(this.sqsAsyncClient);
@@ -262,9 +270,11 @@ public SqsMessageListenerContainer<T> build() {
262270
.acceptIfNotNull(this.acknowledgementResultCallback, container::setAcknowledgementResultCallback)
263271
.acceptIfNotNull(this.asyncAcknowledgementResultCallback, container::setAcknowledgementResultCallback)
264272
.acceptIfNotNull(this.containerComponentFactories, container::setComponentFactories)
265-
.acceptIfNotEmpty(this.queueNames, container::setQueueNames);
273+
.acceptIfNotEmpty(this.queueNames, container::setQueueNames)
274+
.acceptIfNotNullOrElse(container::setPhase, this.phase, DEFAULT_PHASE);
266275
this.messageInterceptors.forEach(container::addMessageInterceptor);
267276
this.asyncMessageInterceptors.forEach(container::addMessageInterceptor);
277+
268278
container.configure(this.optionsConsumer);
269279
return container;
270280
}

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainerTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ void shouldAdaptBlockingComponents() {
7373
.isInstanceOf(AsyncComponentAdapters.AbstractThreadingComponentAdapter.class)
7474
.extracting("blockingMessageInterceptor").isEqualTo(interceptor);
7575

76+
assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);
7677
}
7778

7879
@Test
@@ -101,6 +102,7 @@ void shouldSetAsyncComponents() {
101102
assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback);
102103
assertThat(container.getContainerComponentFactories()).containsExactlyElementsOf(componentFactories);
103104
assertThat(container.getMessageInterceptors()).containsExactly(interceptor);
105+
assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);
104106

105107
}
106108

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ void shouldRegisterListenerContainer() {
3939
given(container.getId()).willReturn(id);
4040
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
4141
registry.registerListenerContainer(container);
42+
assertThat(registry.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);
4243
}
4344

4445
@Test
@@ -47,9 +48,11 @@ void shouldGetListenerContainer() {
4748
String id = "test-container-id";
4849
given(container.getId()).willReturn(id);
4950
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
51+
registry.setPhase(2);
5052
registry.registerListenerContainer(container);
5153
MessageListenerContainer<?> containerFromRegistry = registry.getContainerById(id);
5254
assertThat(containerFromRegistry).isEqualTo(container);
55+
assertThat(registry.getPhase()).isEqualTo(2);
5356
}
5457

5558
@Test

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/SqsMessageListenerContainerTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,12 @@ void shouldCreateFromBuilderWithBlockingComponents() {
6161
List<ContainerComponentFactory<Object, SqsContainerOptions>> componentFactories = Collections
6262
.singletonList(componentFactory);
6363
List<String> queueNames = Arrays.asList("test-queue-name-1", "test-queue-name-2");
64+
Integer phase = 2;
6465

6566
SqsMessageListenerContainer<Object> container = SqsMessageListenerContainer.builder().messageListener(listener)
6667
.sqsAsyncClient(client).errorHandler(errorHandler).componentFactories(componentFactories)
6768
.acknowledgementResultCallback(callback).messageInterceptor(interceptor1)
68-
.messageInterceptor(interceptor2).queueNames(queueNames).build();
69+
.messageInterceptor(interceptor2).queueNames(queueNames).phase(phase).build();
6970

7071
assertThat(container.getMessageListener())
7172
.isInstanceOf(AsyncComponentAdapters.AbstractThreadingComponentAdapter.class)
@@ -90,6 +91,8 @@ void shouldCreateFromBuilderWithBlockingComponents() {
9091
assertThat(container).extracting("sqsAsyncClient").isEqualTo(client);
9192

9293
assertThat(container.getQueueNames()).containsExactlyElementsOf(queueNames);
94+
95+
assertThat(container.getPhase()).isEqualTo(phase);
9396
}
9497

9598
@Test
@@ -114,6 +117,7 @@ void shouldCreateFromBuilderWithAsyncComponents() {
114117
assertThat(container.getErrorHandler()).isEqualTo(errorHandler);
115118
assertThat(container.getAcknowledgementResultCallback()).isEqualTo(callback);
116119
assertThat(container.getMessageInterceptors()).containsExactly(interceptor1, interceptor2);
120+
assertThat(container.getPhase()).isEqualTo(MessageListenerContainer.DEFAULT_PHASE);
117121
}
118122

119123
@Test

0 commit comments

Comments
 (0)