Skip to content

Add FilteringAdapter in SQS #1388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open

Conversation

imsosleepy
Copy link
Contributor

📢 Type of change

  • Bugfix
  • New feature
  • Enhancement
  • Refactoring

📜 Description

Added support for filtering SQS messages before they reach the listener using a MessageFilter.

The filter can be configured through SqsMessageListenerContainerFactory, and only messages that match the condition will be passed to the listener.

💡 Motivation and Context

Previously, all incoming messages were forwarded to the listener, requiring manual filtering within business logic.

This change enables declarative filtering at the container level, helping reduce unnecessary processing and keeping listener logic clean.

💚 How did you test it?

Added integration test SqsMessageFilterIntegrationTests to verify that filtered messages are not delivered to the listener.

Confirmed that only messages satisfying the filter are passed through and acknowledged.

📝 Checklist

  • I reviewed submitted code
  • I added tests to verify changes
  • I updated reference documentation to reflect the change
  • All tests passing
  • No breaking changes

🔮 Next steps

This initial implementation supports filtering on a single message basis only, and does not yet account for batch or asynchronous filtering scenarios.

As suggested in #1373, we plan to extend support to:

  • Batch message filtering
  • Asynchronous filtering
  • Support for filtering based on fields other than the message payload
    (e.g., message attributes, custom headers, or system metadata)

@MatejNedic
Copy link
Member

Hey @imsosleepy tnx on making the PR! Will check PR next week since I am on the conference this week and Tomaz is on vacation.

Copy link
Member

@MatejNedic MatejNedic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @imsosleepy,
Tnx on PR to me this looks good.

I left 2 comments neat picks nothing big and have few questions.

I would like to see more in terms of filter, this can be done later or in this PR. Firstly what I would like to ask if you are okey on working on this in this PR to extend it or we tackle this in new PR? :)

Second @tomazfernandes please check if you agree with it.

  1. I think we should support something like this.
    @SqsListener(filterMethod = "filterMethodBean")

  2. @tomazfernandes just to make sure since this is happening inside container and we are actually not allowing messageListener impl call and ack will be autoamatic in default case?

@Override
public CompletableFuture<Void> onMessage(Collection<Message<T>> messages) {
Collection<Message<T>> filteredMessages = messages.stream()
.filter(filter::process)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat pick again but please switch to for. Since size will be always maximum 10 lets switch to for loop to take benefit of performance.

Copy link
Contributor

@tomazfernandes tomazfernandes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @imsosleepy for the changes and @MatejNedic for looking into it!

The MessageFilter interface looks good.

I'm not sureabout filtering messages at the message listener level since we'd still have the message go through the interceptors and error handlers.

I believe it would make more sense to filter it at the Sink level, so the message never gets emitted and doesn't go through the other components, we just acknowledge it.

What do you folks think?

@MatejNedic
Copy link
Member

@tomazfernandes I agree makes more sense to me as well. Abstraction MessageFilter is spot on what I would expect.

What about @SqsListener(messageFilter = "messageFilterBeanMethod" ) ? I think this makes much sense I see few use cases.

Are you sure message will go through error handler if it ends without exception in filter tho?

@tomazfernandes
Copy link
Contributor

What about @SqsListener(messageFilter = "messageFilterBeanMethod" ) ? I think this makes much sense I see few use cases.

There's nothing really you can do through the annotation that you can't do with the ConfigurationOptions. In general, my rule of thumb is to limit the amount of configurations on the annotation unless users explicitly ask for them, as otherwise we might end up with all configurations that are available on ContainerOptions in the annotation and that would get messy.

For this particular case, I would suggest we stick with the ContainerOptions for the scope of this PR to keep it simpler, and we can open an issue / separate PR to add the annotation configuration after we merge it if we think that's important.

What do you think, makes sense?

@tomazfernandes
Copy link
Contributor

Are you sure message will go through error handler if it ends without exception in filter tho?

Yeah, if it doesn't return an error it won't go through the error handler, but if it does return an error, we'll now have a message we don't want to process in the error handler, or perhaps no message at all which would likely trigger another error.

If we prevent the message from entering the processing pipeline altogether I believe it'll be simpler to manage the outcomes.

@imsosleepy
Copy link
Contributor Author

imsosleepy commented Jun 3, 2025

Thanks so much for the detailed feedback @tomazfernandes and @MatejNedic!

I agree that filtering at the Sink level makes a lot more sense and helps avoid unnecessary processing through interceptors and error handlers.

That said, I’m still wrapping my head around the architecture around MessageSink and how exactly to plug in filtering at that level. If you have a more concrete suggestion or example of where the filtering logic should live and how it should be wired into the container, that would help me move faster and avoid misalignment.

I’ll dig a bit deeper on my end in the meantime. Thanks again for the guidance — really appreciate it!

@imsosleepy imsosleepy closed this Jun 3, 2025
@imsosleepy imsosleepy reopened this Jun 3, 2025
@imsosleepy
Copy link
Contributor Author

As suggested, I’ve moved the message filtering logic to the MessageSink layer to avoid unnecessary processing through interceptors and error handlers.

Before proceeding with additional test for batch and FIFO modes, I’d appreciate a review of the current structure to ensure we're aligned.

Copy link
Member

@MatejNedic MatejNedic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM flexible easy to use I like it!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component: sqs SQS integration related issue
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants