-
-
Notifications
You must be signed in to change notification settings - Fork 336
Add FilteringAdapter in SQS #1388
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
# Conflicts: # spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java
...-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java
Outdated
Show resolved
Hide resolved
...-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java
Outdated
Show resolved
Hide resolved
...d-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java
Outdated
Show resolved
Hide resolved
Hey @imsosleepy tnx on making the PR! Will check PR next week since I am on the conference this week and Tomaz is on vacation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @imsosleepy,
Tnx on PR to me this looks good.
I left 2 comments neat picks nothing big and have few questions.
I would like to see more in terms of filter, this can be done later or in this PR. Firstly what I would like to ask if you are okey on working on this in this PR to extend it or we tackle this in new PR? :)
Second @tomazfernandes please check if you agree with it.
-
I think we should support something like this.
@SqsListener(filterMethod = "filterMethodBean"
) -
@tomazfernandes just to make sure since this is happening inside container and we are actually not allowing messageListener impl call and ack will be autoamatic in default case?
...-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java
Show resolved
Hide resolved
@Override | ||
public CompletableFuture<Void> onMessage(Collection<Message<T>> messages) { | ||
Collection<Message<T>> filteredMessages = messages.stream() | ||
.filter(filter::process) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neat pick again but please switch to for. Since size will be always maximum 10 lets switch to for loop to take benefit of performance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @imsosleepy for the changes and @MatejNedic for looking into it!
The MessageFilter
interface looks good.
I'm not sureabout filtering messages at the message listener level since we'd still have the message go through the interceptors and error handlers.
I believe it would make more sense to filter it at the Sink level, so the message never gets emitted and doesn't go through the other components, we just acknowledge it.
What do you folks think?
@tomazfernandes I agree makes more sense to me as well. Abstraction What about Are you sure message will go through error handler if it ends without exception in filter tho? |
There's nothing really you can do through the annotation that you can't do with the For this particular case, I would suggest we stick with the What do you think, makes sense? |
Yeah, if it doesn't return an error it won't go through the error handler, but if it does return an error, we'll now have a message we don't want to process in the error handler, or perhaps no message at all which would likely trigger another error. If we prevent the message from entering the processing pipeline altogether I believe it'll be simpler to manage the outcomes. |
Thanks so much for the detailed feedback @tomazfernandes and @MatejNedic! I agree that filtering at the Sink level makes a lot more sense and helps avoid unnecessary processing through interceptors and error handlers. That said, I’m still wrapping my head around the architecture around MessageSink and how exactly to plug in filtering at that level. If you have a more concrete suggestion or example of where the filtering logic should live and how it should be wired into the container, that would help me move faster and avoid misalignment. I’ll dig a bit deeper on my end in the meantime. Thanks again for the guidance — really appreciate it! |
As suggested, I’ve moved the message filtering logic to the MessageSink layer to avoid unnecessary processing through interceptors and error handlers. Before proceeding with additional test for batch and FIFO modes, I’d appreciate a review of the current structure to ensure we're aligned. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM flexible easy to use I like it!
📢 Type of change
📜 Description
Added support for filtering SQS messages before they reach the listener using a MessageFilter.
The filter can be configured through SqsMessageListenerContainerFactory, and only messages that match the condition will be passed to the listener.
💡 Motivation and Context
Previously, all incoming messages were forwarded to the listener, requiring manual filtering within business logic.
This change enables declarative filtering at the container level, helping reduce unnecessary processing and keeping listener logic clean.
💚 How did you test it?
Added integration test SqsMessageFilterIntegrationTests to verify that filtered messages are not delivered to the listener.
Confirmed that only messages satisfying the filter are passed through and acknowledged.
📝 Checklist
🔮 Next steps
This initial implementation supports filtering on a single message basis only, and does not yet account for batch or asynchronous filtering scenarios.
As suggested in #1373, we plan to extend support to:
(e.g., message attributes, custom headers, or system metadata)