Skip to content

Commit 02e997c

Browse files
authored
Add support for multi-method listeners
1 parent 9e0cfae commit 02e997c

File tree

17 files changed

+917
-9
lines changed

17 files changed

+917
-9
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,34 @@ Any number of `@SqsListener` annotations can be used in a bean class, and each a
632632

633633
NOTE: Queues declared in the same annotation will share the container, though each will have separate throughput and acknowledgement controls.
634634

635+
===== Handling Different Payload In The Same Listener
636+
637+
It's possible to handle different payloads in the same listener by annotating handler methods with the `@SqsHandler` annotation.
638+
Here's a sample:
639+
[source, java]
640+
----
641+
@SqsListener("myQueue")
642+
public class MyListener {
643+
644+
@SqsHandler
645+
public void handle(String message) {
646+
System.out.println(message);
647+
}
648+
649+
@SqsHandler
650+
public void handle(MyPojo pojo) {
651+
System.out.println(pojo);
652+
}
653+
654+
@SqsHandler(isDefault = true)
655+
public void handle(Object message) {
656+
System.out.println(message);
657+
}
658+
}
659+
----
660+
661+
The `isDefault = true` parameter designates a method as the fallback handler for messages that don't match any other handler's parameter type.
662+
635663
===== SNS Messages
636664

637665
Since 3.1.1, when receiving SNS messages through the `@SqsListener`, the message includes all attributes of the `SnsNotification`. To only receive need the `Message` part of the payload, you can utilize the `@SnsNotificationMessage` annotation.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.sample;
17+
18+
import io.awspring.cloud.sqs.annotation.SqsHandler;
19+
import io.awspring.cloud.sqs.annotation.SqsListener;
20+
import io.awspring.cloud.sqs.operations.SqsTemplate;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
import org.springframework.boot.ApplicationRunner;
24+
import org.springframework.context.annotation.Bean;
25+
import org.springframework.context.annotation.Configuration;
26+
27+
/**
28+
* Sample class to demonstrate how to handle multiple message types in a single listener
29+
* with {@link SqsHandler} annotation.
30+
*
31+
* @author José Iêdo
32+
*/
33+
@Configuration
34+
@SqsListener(queueNames = SpringSqsHandlerSample.QUEUE_NAME)
35+
public class SpringSqsHandlerSample {
36+
37+
public static final String QUEUE_NAME = "multi-method-queue";
38+
private static final Logger LOGGER = LoggerFactory.getLogger(SpringSqsHandlerSample.class);
39+
40+
private interface BaseMessage { }
41+
private record SampleRecord(String propertyOne, String propertyTwo) { }
42+
private record AnotherSampleRecord(String propertyOne, String propertyTwo) implements BaseMessage { }
43+
44+
@SqsHandler
45+
void handleMessage(SampleRecord message) {
46+
LOGGER.info("Received message of type SampleRecord: {}", message);
47+
}
48+
49+
@SqsHandler
50+
void handleMessage(BaseMessage message) {
51+
LOGGER.info("Received message of type BaseMessage: {}", message);
52+
}
53+
54+
@SqsHandler(isDefault = true)
55+
void handleMessage(Object message) {
56+
LOGGER.info("Received message of type Object: {}", message);
57+
}
58+
59+
60+
@Bean
61+
public ApplicationRunner sendMessageToQueueWithMultipleHandlers(SqsTemplate sqsTemplate) {
62+
return args -> {
63+
sqsTemplate.send(QUEUE_NAME, new SampleRecord("Hello!", "From SQS!"));
64+
sqsTemplate.send(QUEUE_NAME, new AnotherSampleRecord("Hello!", "From SQS!"));
65+
sqsTemplate.send(QUEUE_NAME, "Hello!");
66+
};
67+
}
68+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/AbstractListenerAnnotationBeanPostProcessor.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.HashSet;
3636
import java.util.List;
3737
import java.util.Map;
38+
import java.util.Set;
3839
import java.util.concurrent.atomic.AtomicInteger;
3940
import java.util.stream.Collectors;
4041
import java.util.stream.Stream;
@@ -76,6 +77,7 @@
7677
*
7778
* @author Tomaz Fernandes
7879
* @author Joao Calassio
80+
* @author José Iêdo
7981
* @since 3.0
8082
*/
8183
public abstract class AbstractListenerAnnotationBeanPostProcessor<A extends Annotation>
@@ -120,16 +122,33 @@ protected void detectAnnotationsAndRegisterEndpoints(Object bean, Class<?> targe
120122
Map<Method, A> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
121123
(MethodIntrospector.MetadataLookup<A>) method -> AnnotatedElementUtils.findMergedAnnotation(method,
122124
getAnnotationClass()));
123-
if (annotatedMethods.isEmpty()) {
125+
126+
A classListener = AnnotatedElementUtils.findMergedAnnotation(targetClass, getAnnotationClass());
127+
boolean hasMethodLevelListeners = !annotatedMethods.isEmpty();
128+
boolean hasClassLevelListeners = classListener != null;
129+
130+
if (!hasMethodLevelListeners && !hasClassLevelListeners) {
124131
this.nonAnnotatedClasses.add(targetClass);
125132
}
126-
annotatedMethods.entrySet().stream()
127-
.map(entry -> createAndConfigureEndpoint(bean, entry.getKey(), entry.getValue()))
128-
.forEach(this.endpointRegistrar::registerEndpoint);
133+
else {
134+
if (hasMethodLevelListeners) {
135+
annotatedMethods.entrySet().stream()
136+
.map(entry -> createAndConfigureEndpoint(bean, entry.getKey(), entry.getValue()))
137+
.forEach(this.endpointRegistrar::registerEndpoint);
138+
}
139+
140+
if (hasClassLevelListeners) {
141+
Set<Method> handlerMethods = getHandlerMethods(targetClass);
142+
createAndConfigureMultiMethodEndpoint(bean, targetClass, classListener,
143+
new ArrayList<>(handlerMethods));
144+
}
145+
}
129146
}
130147

131148
protected abstract Class<A> getAnnotationClass();
132149

150+
protected abstract Set<Method> getHandlerMethods(Class<?> targetClass);
151+
133152
private Endpoint createAndConfigureEndpoint(Object bean, Method method, A annotation) {
134153
Endpoint endpoint = createEndpoint(annotation);
135154
ConfigUtils.INSTANCE.acceptIfInstance(endpoint, HandlerMethodEndpoint.class, hme -> {
@@ -140,8 +159,30 @@ private Endpoint createAndConfigureEndpoint(Object bean, Method method, A annota
140159
return endpoint;
141160
}
142161

162+
private void createAndConfigureMultiMethodEndpoint(Object bean, Class<?> targetClass, A classListener,
163+
List<Method> handlerMethods) {
164+
Assert.notEmpty(handlerMethods, "No handler method found for listener in class: " + targetClass);
165+
Method defaultMethod = getDefaultHandlerMethod(targetClass, handlerMethods);
166+
167+
Endpoint endpoint = createMultiMethodEndpoint(classListener, handlerMethods, defaultMethod, bean);
168+
for (Method method : handlerMethods) {
169+
ConfigUtils.INSTANCE.acceptIfInstance(endpoint, HandlerMethodEndpoint.class, hme -> {
170+
hme.setBean(bean);
171+
hme.setMethod(method);
172+
hme.setHandlerMethodFactory(this.delegatingHandlerMethodFactory);
173+
});
174+
}
175+
176+
this.endpointRegistrar.registerEndpoint(endpoint);
177+
}
178+
179+
protected abstract Method getDefaultHandlerMethod(Class<?> targetClass, List<Method> handlerMethods);
180+
143181
protected abstract Endpoint createEndpoint(A sqsListenerAnnotation);
144182

183+
protected abstract Endpoint createMultiMethodEndpoint(A sqsListenerAnnotation, List<Method> methods,
184+
@Nullable Method defaultMethod, Object bean);
185+
145186
protected Collection<String> resolveEndpointNames(String[] endpointNames) {
146187
return Arrays.stream(endpointNames).map(this::resolveExpression)
147188
.flatMap(resolvedName -> resolveAsStrings(resolvedName).stream()).collect(Collectors.toList());
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.annotation;
17+
18+
import java.lang.annotation.Documented;
19+
import java.lang.annotation.ElementType;
20+
import java.lang.annotation.Retention;
21+
import java.lang.annotation.RetentionPolicy;
22+
import java.lang.annotation.Target;
23+
import org.springframework.messaging.handler.annotation.MessageMapping;
24+
25+
/**
26+
* Methods that are from classes annotated with {@link SqsListener} and are annotated with {@link SqsHandler} will be
27+
* marked as the target of the SQS message listener based on the message payload type.
28+
*
29+
* <p>
30+
* Each payload type must have exactly one corresponding method.
31+
* <p>
32+
* If no method matches the payload type, a method marked as the default (using {@code isDefault = true}) will be invoked.
33+
* Only one method can be designated as the default.
34+
*
35+
* @author José Iêdo
36+
*/
37+
@Target(ElementType.METHOD)
38+
@Retention(RetentionPolicy.RUNTIME)
39+
@Documented
40+
@MessageMapping
41+
public @interface SqsHandler {
42+
43+
/**
44+
* Indicates whether this method should be used as the default fallback method if no other {@link SqsHandler}
45+
* method matches the payload type.
46+
*
47+
* @return {@code true} if this is the default method, {@code false} otherwise
48+
*/
49+
boolean isDefault() default false;
50+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,10 @@
7676
* @author Matej Nedic
7777
* @author Tomaz Fernandes
7878
* @author Joao Calassio
79+
* @author José Iêdo
7980
* @since 1.1
8081
*/
81-
@Target(ElementType.METHOD)
82+
@Target({ ElementType.METHOD, ElementType.TYPE })
8283
@Retention(RetentionPolicy.RUNTIME)
8384
@Documented
8485
@MessageMapping

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.fasterxml.jackson.databind.ObjectMapper;
1919
import io.awspring.cloud.sqs.config.Endpoint;
20+
import io.awspring.cloud.sqs.config.MultiMethodSqsEndpoint;
2021
import io.awspring.cloud.sqs.config.SqsBeanNames;
2122
import io.awspring.cloud.sqs.config.SqsEndpoint;
2223
import io.awspring.cloud.sqs.listener.SqsHeaders;
@@ -26,18 +27,25 @@
2627
import io.awspring.cloud.sqs.support.resolver.QueueAttributesMethodArgumentResolver;
2728
import io.awspring.cloud.sqs.support.resolver.SqsMessageMethodArgumentResolver;
2829
import io.awspring.cloud.sqs.support.resolver.VisibilityHandlerMethodArgumentResolver;
30+
import java.lang.reflect.Method;
2931
import java.util.ArrayList;
3032
import java.util.Arrays;
3133
import java.util.Collection;
3234
import java.util.List;
35+
import java.util.Set;
36+
import org.springframework.core.MethodIntrospector;
37+
import org.springframework.core.annotation.AnnotationUtils;
38+
import org.springframework.lang.Nullable;
3339
import org.springframework.messaging.converter.MessageConverter;
3440
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
41+
import org.springframework.util.ReflectionUtils;
3542

3643
/**
3744
* {@link AbstractListenerAnnotationBeanPostProcessor} implementation for {@link SqsListener @SqsListener}.
3845
*
3946
* @author Tomaz Fernandes
4047
* @author Joao Calassio
48+
* @author José Iêdo
4149
* @since 3.0
4250
*/
4351
public class SqsListenerAnnotationBeanPostProcessor extends AbstractListenerAnnotationBeanPostProcessor<SqsListener> {
@@ -62,6 +70,22 @@ protected Endpoint createEndpoint(SqsListener sqsListenerAnnotation) {
6270
.acknowledgementMode(resolveAcknowledgement(sqsListenerAnnotation.acknowledgementMode())).build();
6371
}
6472

73+
@Override
74+
protected Endpoint createMultiMethodEndpoint(SqsListener sqsListenerAnnotation, List<Method> methods,
75+
@Nullable Method defaultMethod, Object bean) {
76+
return MultiMethodSqsEndpoint.builder()
77+
.factoryBeanName(resolveAsString(sqsListenerAnnotation.factory(), "factory"))
78+
.queueNames(resolveEndpointNames(sqsListenerAnnotation.value())).bean(bean).methods(methods)
79+
.defaultMethod(defaultMethod).id(getEndpointId(sqsListenerAnnotation.id()))
80+
.sqsEndpoint(createEndpoint(sqsListenerAnnotation)).build();
81+
}
82+
83+
@Override
84+
protected Set<Method> getHandlerMethods(Class<?> targetClass) {
85+
return MethodIntrospector.selectMethods(targetClass, (ReflectionUtils.MethodFilter) method -> AnnotationUtils
86+
.findAnnotation(method, SqsHandler.class) != null);
87+
}
88+
6589
@Override
6690
protected String getGeneratedIdPrefix() {
6791
return GENERATED_ID_PREFIX;
@@ -90,4 +114,21 @@ protected Collection<HandlerMethodArgumentResolver> createAdditionalArgumentReso
90114
return argumentResolvers;
91115
}
92116

117+
@Override
118+
protected Method getDefaultHandlerMethod(Class<?> targetClass, List<Method> handlerMethods) {
119+
Method defaultMethod = null;
120+
for (Method method : handlerMethods) {
121+
SqsHandler annotation = method.getAnnotation(SqsHandler.class);
122+
if (annotation.isDefault()) {
123+
if (defaultMethod != null) {
124+
throw new IllegalArgumentException(
125+
"There is more than one default method for the same listener in class: " + targetClass);
126+
}
127+
else {
128+
defaultMethod = method;
129+
}
130+
}
131+
}
132+
return defaultMethod;
133+
}
93134
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractEndpoint.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
* Base class for implementing a {@link HandlerMethodEndpoint}.
4343
*
4444
* @author Tomaz Fernandes
45+
* @author José Iêdo
4546
* @since 3.0
4647
*/
4748
public abstract class AbstractEndpoint implements HandlerMethodEndpoint {
@@ -90,6 +91,14 @@ public void setBean(Object bean) {
9091
this.bean = bean;
9192
}
9293

94+
/**
95+
* Get the bean instance to be used when handling a message for this endpoint.
96+
* @return the bean instance.
97+
*/
98+
public Object getBean() {
99+
return this.bean;
100+
}
101+
93102
/**
94103
* Set the method to be used when handling a message for this endpoint.
95104
* @param method the method.
@@ -109,6 +118,15 @@ public void setHandlerMethodFactory(MessageHandlerMethodFactory handlerMethodFac
109118
this.handlerMethodFactory = handlerMethodFactory;
110119
}
111120

121+
/**
122+
* Get the {@link MessageHandlerMethodFactory} to be used for handling messages in this endpoint.
123+
* @return the factory.
124+
*/
125+
@Nullable
126+
public MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
127+
return this.handlerMethodFactory;
128+
}
129+
112130
@Override
113131
public void configureListenerMode(Consumer<ListenerMode> consumer) {
114132
List<MethodParameter> parameters = getMethodParameters();

0 commit comments

Comments
 (0)