From 89084ddb8ec8e477d6b43cf922362df716214d7a Mon Sep 17 00:00:00 2001 From: dongha kim Date: Sun, 12 May 2024 20:17:51 +0900 Subject: [PATCH 01/12] Add MessagingMessagConverter bean --- .../sqs/SqsAutoConfiguration.java | 19 ++++++++++++------- .../converter/MessagingMessageConverter.java | 3 +++ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index aa41805a9..58b876748 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -31,6 +31,7 @@ import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; import io.awspring.cloud.sqs.operations.SqsTemplateBuilder; +import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -92,7 +93,8 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac ObjectProvider> errorHandler, ObjectProvider> asyncInterceptors, ObjectProvider> interceptors, - ObjectProvider objectMapperProvider) { + ObjectProvider objectMapperProvider, + MessagingMessageConverter messagingMessageConverter) { SqsMessageListenerContainerFactory factory = new SqsMessageListenerContainerFactory<>(); factory.configure(this::configureContainerOptions); @@ -101,15 +103,18 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac errorHandler.ifAvailable(factory::setErrorHandler); interceptors.forEach(factory::addMessageInterceptor); asyncInterceptors.forEach(factory::addMessageInterceptor); - objectMapperProvider.ifAvailable(objectMapper -> setObjectMapper(factory, objectMapper)); + objectMapperProvider.ifAvailable(objectMapper -> setObjectMapper(factory, objectMapper, messagingMessageConverter)); return factory; } - private void setObjectMapper(SqsMessageListenerContainerFactory factory, ObjectMapper objectMapper) { - // Object Mapper for early deserialization in MessageSource - var messageConverter = new SqsMessagingMessageConverter(); - messageConverter.setObjectMapper(objectMapper); - factory.configure(options -> options.messageConverter(messageConverter)); + @ConditionalOnMissingBean + @Bean + public MessagingMessageConverter defaultMessageConverter() { + return new SqsMessagingMessageConverter(); + } + + private void setObjectMapper(SqsMessageListenerContainerFactory factory, ObjectMapper objectMapper, MessagingMessageConverter messagingMessageConverter) { + factory.configure(options -> options.messageConverter(messagingMessageConverter)); } private void configureContainerOptions(ContainerOptionsBuilder options) { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java index cf62beddc..054a0c0d1 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java @@ -15,6 +15,7 @@ */ package io.awspring.cloud.sqs.support.converter; +import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.messaging.Message; /** @@ -38,4 +39,6 @@ public interface MessagingMessageConverter { */ S fromMessagingMessage(Message message); + void setObjectMapper(ObjectMapper objectMapper); + } From 2f0917f9fd0eaac0049ce9e3bde0e5846cb106c7 Mon Sep 17 00:00:00 2001 From: dongha kim Date: Sun, 12 May 2024 20:41:04 +0900 Subject: [PATCH 02/12] Add missing setObjectMapper method --- .../autoconfigure/sqs/SqsAutoConfiguration.java | 13 +++++++++---- .../converter/MessagingMessageConverter.java | 2 -- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index 58b876748..7a23324ce 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -94,7 +94,7 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac ObjectProvider> asyncInterceptors, ObjectProvider> interceptors, ObjectProvider objectMapperProvider, - MessagingMessageConverter messagingMessageConverter) { + MessagingMessageConverter messagingMessageConverter) { SqsMessageListenerContainerFactory factory = new SqsMessageListenerContainerFactory<>(); factory.configure(this::configureContainerOptions); @@ -109,12 +109,17 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac @ConditionalOnMissingBean @Bean - public MessagingMessageConverter defaultMessageConverter() { + public MessagingMessageConverter defaultMessageConverter() { return new SqsMessagingMessageConverter(); } - private void setObjectMapper(SqsMessageListenerContainerFactory factory, ObjectMapper objectMapper, MessagingMessageConverter messagingMessageConverter) { - factory.configure(options -> options.messageConverter(messagingMessageConverter)); + private void setObjectMapper(SqsMessageListenerContainerFactory factory, ObjectMapper objectMapper, MessagingMessageConverter messagingMessageConverter) { + if(messagingMessageConverter instanceof SqsMessagingMessageConverter sqsMessagingMessageConverter) { + sqsMessagingMessageConverter.setObjectMapper(objectMapper); + factory.configure(options -> options.messageConverter(sqsMessagingMessageConverter)); + } else { + factory.configure(options -> options.messageConverter(messagingMessageConverter)); + } } private void configureContainerOptions(ContainerOptionsBuilder options) { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java index 054a0c0d1..6ebaa1f62 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java @@ -39,6 +39,4 @@ public interface MessagingMessageConverter { */ S fromMessagingMessage(Message message); - void setObjectMapper(ObjectMapper objectMapper); - } From 2408678ad9b296b0fdc261d10bb56a5f8f556490 Mon Sep 17 00:00:00 2001 From: dongha kim Date: Sun, 12 May 2024 21:05:33 +0900 Subject: [PATCH 03/12] Refactoting instanceof object --- .../cloud/autoconfigure/sqs/SqsAutoConfiguration.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index 7a23324ce..061a96311 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -114,12 +114,11 @@ public MessagingMessageConverter defaultMessageConverter() { } private void setObjectMapper(SqsMessageListenerContainerFactory factory, ObjectMapper objectMapper, MessagingMessageConverter messagingMessageConverter) { - if(messagingMessageConverter instanceof SqsMessagingMessageConverter sqsMessagingMessageConverter) { - sqsMessagingMessageConverter.setObjectMapper(objectMapper); - factory.configure(options -> options.messageConverter(sqsMessagingMessageConverter)); - } else { - factory.configure(options -> options.messageConverter(messagingMessageConverter)); + if (messagingMessageConverter instanceof SqsMessagingMessageConverter) { + ((SqsMessagingMessageConverter)messagingMessageConverter).setObjectMapper(objectMapper); } + + factory.configure(options -> options.messageConverter(messagingMessageConverter)); } private void configureContainerOptions(ContainerOptionsBuilder options) { From a353fc4bb8a936c4929ef01b4767743e1127a253 Mon Sep 17 00:00:00 2001 From: dongha kim Date: Sun, 12 May 2024 21:13:48 +0900 Subject: [PATCH 04/12] Remove unused import --- .../cloud/sqs/support/converter/MessagingMessageConverter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java index 6ebaa1f62..cf62beddc 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/MessagingMessageConverter.java @@ -15,7 +15,6 @@ */ package io.awspring.cloud.sqs.support.converter; -import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.messaging.Message; /** From ace28e94a078d088890c2055a7b6cb9a335314ff Mon Sep 17 00:00:00 2001 From: dongha kim Date: Sun, 12 May 2024 21:39:04 +0900 Subject: [PATCH 05/12] Fix copyright and author --- .../awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index 061a96311..67ffb9c46 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2022 the original author or authors. + * Copyright 2013-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,6 +52,7 @@ * * @author Tomaz Fernandes * @author Maciej Walkowiak + * @author Dongha Kim * @since 3.0 */ @AutoConfiguration From 8d0e3779d86439ec2421f81a91c6f31a81c360bf Mon Sep 17 00:00:00 2001 From: dongha kim Date: Sun, 9 Jun 2024 16:25:41 +0900 Subject: [PATCH 06/12] Fix PR Review --- .../autoconfigure/sqs/SqsAutoConfiguration.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index 67ffb9c46..dbb6bea1a 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -104,7 +104,14 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac errorHandler.ifAvailable(factory::setErrorHandler); interceptors.forEach(factory::addMessageInterceptor); asyncInterceptors.forEach(factory::addMessageInterceptor); - objectMapperProvider.ifAvailable(objectMapper -> setObjectMapper(factory, objectMapper, messagingMessageConverter)); + objectMapperProvider.ifAvailable(objectMapper -> { + if (messagingMessageConverter instanceof SqsMessagingMessageConverter) { + ((SqsMessagingMessageConverter) messagingMessageConverter).setObjectMapper(objectMapper); + } + }); + + factory.configure(options -> options.messageConverter(messagingMessageConverter)); + return factory; } @@ -114,14 +121,6 @@ public MessagingMessageConverter defaultMessageConverter() { return new SqsMessagingMessageConverter(); } - private void setObjectMapper(SqsMessageListenerContainerFactory factory, ObjectMapper objectMapper, MessagingMessageConverter messagingMessageConverter) { - if (messagingMessageConverter instanceof SqsMessagingMessageConverter) { - ((SqsMessagingMessageConverter)messagingMessageConverter).setObjectMapper(objectMapper); - } - - factory.configure(options -> options.messageConverter(messagingMessageConverter)); - } - private void configureContainerOptions(ContainerOptionsBuilder options) { PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull(); mapper.from(this.sqsProperties.getListener().getMaxConcurrentMessages()).to(options::maxConcurrentMessages); From 3ea1f5cd24648177cbd98c550862c5db3252b9f9 Mon Sep 17 00:00:00 2001 From: daniel kim Date: Sun, 18 May 2025 19:41:21 +0900 Subject: [PATCH 07/12] feat : add message filter and test --- .../SqsMessageListenerContainerFactory.java | 20 +-- .../listener/AbstractContainerOptions.java | 22 ++++ .../cloud/sqs/listener/ContainerOptions.java | 7 ++ .../sqs/listener/ContainerOptionsBuilder.java | 10 ++ .../FilteringMessageListenerAdapter.java | 23 ++++ .../support/filter/DefaultMessageFilter.java | 10 ++ .../sqs/support/filter/MessageFilter.java | 7 ++ .../SqsMessageFilterIntegrationTests.java | 114 ++++++++++++++++++ 8 files changed, 205 insertions(+), 8 deletions(-) create mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FilteringMessageListenerAdapter.java create mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/filter/DefaultMessageFilter.java create mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/filter/MessageFilter.java create mode 100644 spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java index e2f9f4902..7b139914e 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java @@ -17,13 +17,7 @@ import io.awspring.cloud.sqs.ConfigUtils; import io.awspring.cloud.sqs.annotation.SqsListener; -import io.awspring.cloud.sqs.listener.AsyncMessageListener; -import io.awspring.cloud.sqs.listener.ContainerComponentFactory; -import io.awspring.cloud.sqs.listener.ContainerOptions; -import io.awspring.cloud.sqs.listener.MessageListener; -import io.awspring.cloud.sqs.listener.SqsContainerOptions; -import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder; -import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer; +import io.awspring.cloud.sqs.listener.*; import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback; import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; @@ -34,6 +28,9 @@ import java.util.Collection; import java.util.function.Consumer; import java.util.function.Supplier; + +import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter; +import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; @@ -146,7 +143,14 @@ protected SqsMessageListenerContainer createContainerInstance(Endpoint endpoi endpoint.getId() != null ? endpoint.getId() : endpoint.getLogicalNames()); Assert.notNull(this.sqsAsyncClientSupplier, "asyncClientSupplier not set"); SqsAsyncClient asyncClient = getSqsAsyncClientInstance(); - return new SqsMessageListenerContainer<>(asyncClient, containerOptions); + + SqsMessageListenerContainer container = new SqsMessageListenerContainer<>(asyncClient, containerOptions); + MessageFilter filter = (MessageFilter) containerOptions.getMessageFilter(); + if (!(filter instanceof DefaultMessageFilter)) { + container.setMessageListener(new FilteringMessageListenerAdapter<>(container.getMessageListener(), filter)); + } + + return container; } protected SqsAsyncClient getSqsAsyncClientInstance() { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java index 81f4eb3f2..c0626680d 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java @@ -20,6 +20,9 @@ import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; import java.time.Duration; + +import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter; +import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.springframework.core.task.TaskExecutor; import org.springframework.lang.Nullable; import org.springframework.retry.backoff.BackOffPolicy; @@ -59,6 +62,9 @@ public abstract class AbstractContainerOptions, private final AcknowledgementMode acknowledgementMode; + private final MessageFilter messageFilter; + + @Nullable private final AcknowledgementOrdering acknowledgementOrdering; @@ -92,6 +98,8 @@ protected AbstractContainerOptions(Builder builder) { this.acknowledgementThreshold = builder.acknowledgementThreshold; this.componentsTaskExecutor = builder.componentsTaskExecutor; this.acknowledgementResultTaskExecutor = builder.acknowledgementResultTaskExecutor; + this.messageFilter = builder.messageFilter; + Assert.isTrue(this.maxMessagesPerPoll <= this.maxConcurrentMessages, String.format( "messagesPerPoll should be less than or equal to maxConcurrentMessages. Values provided: %s and %s respectively", this.maxMessagesPerPoll, this.maxConcurrentMessages)); @@ -164,6 +172,9 @@ public MessagingMessageConverter getMessageConverter() { return this.messageConverter; } + @Override + public MessageFilter getMessageFilter() {return this.messageFilter; } + @Nullable @Override public Duration getAcknowledgementInterval() { @@ -244,6 +255,8 @@ protected abstract static class Builder, private AcknowledgementMode acknowledgementMode = DEFAULT_ACKNOWLEDGEMENT_MODE; + private MessageFilter messageFilter = new DefaultMessageFilter<>(); + @Nullable private AcknowledgementOrdering acknowledgementOrdering; @@ -280,6 +293,7 @@ protected Builder(AbstractContainerOptions options) { this.acknowledgementThreshold = options.acknowledgementThreshold; this.componentsTaskExecutor = options.componentsTaskExecutor; this.acknowledgementResultTaskExecutor = options.acknowledgementResultTaskExecutor; + this.messageFilter = options.messageFilter; } @Override @@ -400,6 +414,14 @@ public B messageConverter(MessagingMessageConverter messageConverter) { return self(); } + @Override + public B messageFilter(MessageFilter messageFilter) { + Assert.notNull(messageFilter, "messageFilter cannot be null"); + this.messageFilter = messageFilter; + return self(); + } + + @SuppressWarnings("unchecked") private B self() { return (B) this; diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java index ad7313cf6..9d3471bd0 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java @@ -20,6 +20,8 @@ import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import java.time.Duration; import java.util.Collection; + +import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.springframework.core.task.TaskExecutor; import org.springframework.lang.Nullable; import org.springframework.retry.backoff.BackOffPolicy; @@ -139,6 +141,11 @@ default BackOffPolicy getPollBackOffPolicy() { */ MessagingMessageConverter getMessageConverter(); + /** Return the message filter applied before message processing. + * @return the message filter. + * */ + MessageFilter getMessageFilter(); + /** * Return the maximum interval between acknowledgements for batch acknowledgements. * @return the interval. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java index 9d03b7964..29780630a 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java @@ -19,6 +19,8 @@ import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode; import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import java.time.Duration; + +import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.springframework.core.task.TaskExecutor; import org.springframework.retry.backoff.BackOffPolicy; @@ -187,6 +189,14 @@ default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) { */ B messageConverter(MessagingMessageConverter messageConverter); + /** + * Set the {@link MessagingMessageConverter} for this container. + * + * @param messageFilter the message filter. + * @return this instance. + */ + B messageFilter(MessageFilter messageFilter); + /** * Create the {@link ContainerOptions} instance. * diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FilteringMessageListenerAdapter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FilteringMessageListenerAdapter.java new file mode 100644 index 000000000..b1eabd449 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FilteringMessageListenerAdapter.java @@ -0,0 +1,23 @@ +package io.awspring.cloud.sqs.listener; + +import io.awspring.cloud.sqs.support.filter.MessageFilter; +import org.springframework.messaging.Message; + +public class FilteringMessageListenerAdapter implements MessageListener { + + private final AsyncMessageListener delegate; + private final MessageFilter filter; + + public FilteringMessageListenerAdapter(AsyncMessageListener delegate, MessageFilter filter) { + this.delegate = delegate; + this.filter = filter; + } + + @Override + public void onMessage(Message message) { + if (filter.process(message)) { + delegate.onMessage(message); + } + } +} + diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/filter/DefaultMessageFilter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/filter/DefaultMessageFilter.java new file mode 100644 index 000000000..f0f8bcddf --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/filter/DefaultMessageFilter.java @@ -0,0 +1,10 @@ +package io.awspring.cloud.sqs.support.filter; + +import org.springframework.messaging.Message; + +public class DefaultMessageFilter implements MessageFilter { + @Override + public boolean process(Message message) { + return true; + } +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/filter/MessageFilter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/filter/MessageFilter.java new file mode 100644 index 000000000..3c995c89a --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/filter/MessageFilter.java @@ -0,0 +1,7 @@ +package io.awspring.cloud.sqs.support.filter; + +import org.springframework.messaging.Message; + +public interface MessageFilter { + boolean process(Message message); +} diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java new file mode 100644 index 000000000..bb9c06953 --- /dev/null +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java @@ -0,0 +1,114 @@ +package io.awspring.cloud.sqs.integration; + +import io.awspring.cloud.sqs.annotation.SqsListener; +import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration; +import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; +import io.awspring.cloud.sqs.operations.SqsTemplate; +import io.awspring.cloud.sqs.support.filter.MessageFilter; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.messaging.Message; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +@SpringBootTest +public class SqsMessageFilterIntegrationTests extends BaseSqsIntegrationTest { + + private static final Logger logger = LoggerFactory.getLogger(SqsMessageFilterIntegrationTests.class); + private static final String FILTER_QUEUE_NAME = "test-filter-queue"; + + static final String MESSAGE_FILTERED_FACTORY = "messageFilteredFactory"; + + @Autowired + LatchContainer latchContainer; + + @Autowired + SqsTemplate sqsTemplate; + + @BeforeAll + static void beforeTests() { + SqsAsyncClient client = createAsyncClient(); + CompletableFuture.allOf( + createQueue(client, FILTER_QUEUE_NAME) + ).join(); + } + + record SampleRecord(String propertyOne, String propertyTwo) {} + + @Test + void shouldReceiveMessageThatPassesFilter() throws Exception { + sqsTemplate.send(FILTER_QUEUE_NAME, new SampleRecord("Hello!", "Filtered In")); + assertThat(latchContainer.messageReceivedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + // --- Configuration for test environment --- + @Import(SqsBootstrapConfiguration.class) + @Configuration + static class FilterTestConfig { + + @Bean(name = MESSAGE_FILTERED_FACTORY) + public SqsMessageListenerContainerFactory filteredFactory() { + return SqsMessageListenerContainerFactory.builder() + .configure(options -> options.messageFilter(new SampleRecordFilter())) + .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient) + .build(); + } + + @Bean + public SqsTemplate sqsTemplate() { + return SqsTemplate.builder() + .sqsAsyncClient(BaseSqsIntegrationTest.createAsyncClient()) + .build(); + } + + @Bean + public FilteringListener filteringListener() { + return new FilteringListener(); + } + + @Bean + public LatchContainer latchContainer() { + return new LatchContainer(); + } + } + + // --- Message Filter implementation --- + static class SampleRecordFilter implements MessageFilter { + @Override + public boolean process(Message message) { + logger.info("Filtering message: {}", message.getPayload()); + logger.info(String.valueOf("Hello".equals(message.getPayload().propertyOne()))); + return "Hello".equals(message.getPayload().propertyOne()); + } + } + + // --- Listener that receives filtered message --- + static class FilteringListener { + + @Autowired + LatchContainer latchContainer; + + @SqsListener(queueNames = FILTER_QUEUE_NAME, id = "filter-test-listener", factory = MESSAGE_FILTERED_FACTORY) + void listen(SampleRecord record) { + logger.info("Received message: {}", record); + latchContainer.messageReceivedLatch.countDown(); + } + } + + // --- Shared Latch for test synchronization --- + static class LatchContainer { + CountDownLatch messageReceivedLatch = new CountDownLatch(1); + } +} From 708de95e59ef5f75fcd754fb90f9fa15d28febbe Mon Sep 17 00:00:00 2001 From: daniel kim Date: Tue, 20 May 2025 11:21:10 +0900 Subject: [PATCH 08/12] fix : convert messageFilter to apater --- ...stractMessageListenerContainerFactory.java | 17 +++---- .../SqsMessageListenerContainerFactory.java | 21 +++------ .../AbstractMessageListenerContainer.java | 19 +++++--- .../sqs/listener/AsyncComponentAdapters.java | 44 +++++++++++++++++++ .../FilteringMessageListenerAdapter.java | 23 ---------- .../SqsMessageFilterIntegrationTests.java | 5 +-- 6 files changed, 71 insertions(+), 58 deletions(-) delete mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FilteringMessageListenerAdapter.java diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java index 1de54cbae..98fbb5241 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java @@ -16,26 +16,22 @@ package io.awspring.cloud.sqs.config; import io.awspring.cloud.sqs.ConfigUtils; -import io.awspring.cloud.sqs.listener.AbstractMessageListenerContainer; -import io.awspring.cloud.sqs.listener.AsyncComponentAdapters; -import io.awspring.cloud.sqs.listener.AsyncMessageListener; -import io.awspring.cloud.sqs.listener.ContainerComponentFactory; -import io.awspring.cloud.sqs.listener.ContainerOptions; -import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; -import io.awspring.cloud.sqs.listener.MessageListener; -import io.awspring.cloud.sqs.listener.MessageListenerContainer; +import io.awspring.cloud.sqs.listener.*; import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback; import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; +import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter; +import io.awspring.cloud.sqs.support.filter.MessageFilter; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.function.Consumer; -import org.springframework.messaging.Message; -import org.springframework.util.Assert; /** * Base implementation for a {@link MessageListenerContainerFactory}. Contains the components and @@ -180,6 +176,7 @@ public C createContainer(Endpoint endpoint) { B options = this.containerOptionsBuilder.createCopy(); configure(endpoint, options); C container = createContainerInstance(endpoint, options.build()); + endpoint.setupContainer(container); configureContainer(container, endpoint); return container; diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java index 7b139914e..768b46431 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java @@ -24,19 +24,17 @@ import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; -import java.util.ArrayList; -import java.util.Collection; -import java.util.function.Consumer; -import java.util.function.Supplier; - -import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter; -import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.util.Assert; import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.Consumer; +import java.util.function.Supplier; + /** * {@link MessageListenerContainerFactory} implementation for creating {@link SqsMessageListenerContainer} instances. A * factory can be assigned to a {@link io.awspring.cloud.sqs.annotation.SqsListener @SqsListener} by using the @@ -143,14 +141,7 @@ protected SqsMessageListenerContainer createContainerInstance(Endpoint endpoi endpoint.getId() != null ? endpoint.getId() : endpoint.getLogicalNames()); Assert.notNull(this.sqsAsyncClientSupplier, "asyncClientSupplier not set"); SqsAsyncClient asyncClient = getSqsAsyncClientInstance(); - - SqsMessageListenerContainer container = new SqsMessageListenerContainer<>(asyncClient, containerOptions); - MessageFilter filter = (MessageFilter) containerOptions.getMessageFilter(); - if (!(filter instanceof DefaultMessageFilter)) { - container.setMessageListener(new FilteringMessageListenerAdapter<>(container.getMessageListener(), filter)); - } - - return container; + return new SqsMessageListenerContainer<>(asyncClient, containerOptions); } protected SqsAsyncClient getSqsAsyncClientInstance() { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java index 9566fbb7a..a3437c2c5 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java @@ -21,11 +21,8 @@ import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.function.Consumer; +import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter; +import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.SmartLifecycle; @@ -33,6 +30,12 @@ import org.springframework.messaging.Message; import org.springframework.util.Assert; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.function.Consumer; + /** * Base implementation for {@link MessageListenerContainer} with {@link SmartLifecycle} and component management * capabilities. @@ -132,7 +135,11 @@ public void addMessageInterceptor(AsyncMessageInterceptor messageInterceptor) @Override public void setMessageListener(MessageListener messageListener) { Assert.notNull(messageListener, "messageListener cannot be null"); - this.messageListener = AsyncComponentAdapters.adapt(messageListener); + if(containerOptions.getMessageFilter() instanceof DefaultMessageFilter) { + this.messageListener = AsyncComponentAdapters.adapt(messageListener); + } else { + this.messageListener = AsyncComponentAdapters.adaptFilter(messageListener, (MessageFilter) containerOptions.getMessageFilter()); + } } @Override diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java index c2563002a..fbc42ee7b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java @@ -27,6 +27,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.function.Supplier; + +import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.task.TaskExecutor; @@ -76,6 +78,10 @@ public static AsyncMessageListener adapt(MessageListener messageListen return new BlockingMessageListenerAdapter<>(messageListener); } + public static AsyncMessageListener adaptFilter(MessageListener messageListener, MessageFilter messageFilter) { + return new FilteredMessageListenerAdapter<>(messageListener, messageFilter); + } + public static AsyncAcknowledgementResultCallback adapt( AcknowledgementResultCallback acknowledgementResultCallback) { return new BlockingAcknowledgementResultCallbackAdapter<>(acknowledgementResultCallback); @@ -214,6 +220,44 @@ public CompletableFuture onMessage(Collection> messages) { } } + private static class FilteredMessageListenerAdapter extends AbstractThreadingComponentAdapter + implements AsyncMessageListener { + + private final MessageListener filteredMessageListener; + private final MessageFilter filter; + + public FilteredMessageListenerAdapter(MessageListener filteredMessageListener, MessageFilter filter) { + this.filteredMessageListener = filteredMessageListener; + this.filter = filter; + } + + @Override + public CompletableFuture onMessage(Message message) { + if (filter.process(message)) { + return execute(() -> this.filteredMessageListener.onMessage(message)); + } + else { + logger.debug("Message filtered out: {}", message.getPayload()); + return CompletableFuture.completedFuture(null); + } + } + + @Override + public CompletableFuture onMessage(Collection> messages) { + Collection> filteredMessages = messages.stream() + .filter(filter::process) + .toList(); + + if (filteredMessages.isEmpty()) { + logger.debug("All messages were filtered out."); + return CompletableFuture.completedFuture(null); + } + + return execute(() -> this.filteredMessageListener.onMessage(filteredMessages)); + } + } + + private static class BlockingErrorHandlerAdapter extends AbstractThreadingComponentAdapter implements AsyncErrorHandler { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FilteringMessageListenerAdapter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FilteringMessageListenerAdapter.java deleted file mode 100644 index b1eabd449..000000000 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FilteringMessageListenerAdapter.java +++ /dev/null @@ -1,23 +0,0 @@ -package io.awspring.cloud.sqs.listener; - -import io.awspring.cloud.sqs.support.filter.MessageFilter; -import org.springframework.messaging.Message; - -public class FilteringMessageListenerAdapter implements MessageListener { - - private final AsyncMessageListener delegate; - private final MessageFilter filter; - - public FilteringMessageListenerAdapter(AsyncMessageListener delegate, MessageFilter filter) { - this.delegate = delegate; - this.filter = filter; - } - - @Override - public void onMessage(Message message) { - if (filter.process(message)) { - delegate.onMessage(message); - } - } -} - diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java index bb9c06953..ab206ed5d 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java @@ -48,7 +48,7 @@ static void beforeTests() { record SampleRecord(String propertyOne, String propertyTwo) {} @Test - void shouldReceiveMessageThatPassesFilter() throws Exception { + void shouldReceiveMessageThatPassesProcess() throws Exception { sqsTemplate.send(FILTER_QUEUE_NAME, new SampleRecord("Hello!", "Filtered In")); assertThat(latchContainer.messageReceivedLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -84,7 +84,6 @@ public LatchContainer latchContainer() { } } - // --- Message Filter implementation --- static class SampleRecordFilter implements MessageFilter { @Override public boolean process(Message message) { @@ -94,7 +93,6 @@ public boolean process(Message message) { } } - // --- Listener that receives filtered message --- static class FilteringListener { @Autowired @@ -107,7 +105,6 @@ void listen(SampleRecord record) { } } - // --- Shared Latch for test synchronization --- static class LatchContainer { CountDownLatch messageReceivedLatch = new CountDownLatch(1); } From d5cecc76ac9239cbded8307eef9c89f771615618 Mon Sep 17 00:00:00 2001 From: daniel kim Date: Tue, 20 May 2025 11:47:32 +0900 Subject: [PATCH 09/12] fix : test renewal --- .../SqsMessageFilterIntegrationTests.java | 70 +++++++++++++------ 1 file changed, 50 insertions(+), 20 deletions(-) diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java index ab206ed5d..4b456818e 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsMessageFilterIntegrationTests.java @@ -6,6 +6,7 @@ import io.awspring.cloud.sqs.operations.SqsTemplate; import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,9 +28,10 @@ public class SqsMessageFilterIntegrationTests extends BaseSqsIntegrationTest { private static final Logger logger = LoggerFactory.getLogger(SqsMessageFilterIntegrationTests.class); - private static final String FILTER_QUEUE_NAME = "test-filter-queue"; + private static final String FILTER_QUEUE_PASS = "filter-queue-pass"; + private static final String FILTER_QUEUE_BLOCK = "filter-queue-block"; - static final String MESSAGE_FILTERED_FACTORY = "messageFilteredFactory"; + private static final String FILTERING_FACTORY = "filteringFactory"; @Autowired LatchContainer latchContainer; @@ -38,30 +40,37 @@ public class SqsMessageFilterIntegrationTests extends BaseSqsIntegrationTest { SqsTemplate sqsTemplate; @BeforeAll - static void beforeTests() { + static void setupQueues() { SqsAsyncClient client = createAsyncClient(); CompletableFuture.allOf( - createQueue(client, FILTER_QUEUE_NAME) + createQueue(client, FILTER_QUEUE_PASS), + createQueue(client, FILTER_QUEUE_BLOCK) ).join(); } record SampleRecord(String propertyOne, String propertyTwo) {} @Test - void shouldReceiveMessageThatPassesProcess() throws Exception { - sqsTemplate.send(FILTER_QUEUE_NAME, new SampleRecord("Hello!", "Filtered In")); - assertThat(latchContainer.messageReceivedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + void shouldReceiveMessageThatPassesFilter() throws Exception { + sqsTemplate.send(FILTER_QUEUE_PASS, new SampleRecord("Hello", "Accepted")); + assertThat(latchContainer.latchForPass.await(10, TimeUnit.SECONDS)).isTrue(); } - // --- Configuration for test environment --- + @Test + void shouldNotReceiveMessageThatFailsFilter() throws Exception { + sqsTemplate.send(FILTER_QUEUE_BLOCK, new SampleRecord("NotHello", "Rejected")); + assertThat(latchContainer.latchForBlock.await(10, TimeUnit.SECONDS)).isFalse(); + } + + // Configuration @Import(SqsBootstrapConfiguration.class) @Configuration static class FilterTestConfig { - @Bean(name = MESSAGE_FILTERED_FACTORY) - public SqsMessageListenerContainerFactory filteredFactory() { + @Bean(name = FILTERING_FACTORY) + public SqsMessageListenerContainerFactory messageFilterFactory() { return SqsMessageListenerContainerFactory.builder() - .configure(options -> options.messageFilter(new SampleRecordFilter())) + .configure(options -> options.messageFilter(new AllowHelloOnlyFilter())) .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient) .build(); } @@ -74,8 +83,13 @@ public SqsTemplate sqsTemplate() { } @Bean - public FilteringListener filteringListener() { - return new FilteringListener(); + public FilteringListenerPass filteringListenerPass() { + return new FilteringListenerPass(); + } + + @Bean + public FilteringListenerBlock filteringListenerBlock() { + return new FilteringListenerBlock(); } @Bean @@ -84,28 +98,44 @@ public LatchContainer latchContainer() { } } - static class SampleRecordFilter implements MessageFilter { + // Sample Filter + static class AllowHelloOnlyFilter implements MessageFilter { @Override public boolean process(Message message) { logger.info("Filtering message: {}", message.getPayload()); - logger.info(String.valueOf("Hello".equals(message.getPayload().propertyOne()))); return "Hello".equals(message.getPayload().propertyOne()); } } - static class FilteringListener { + // Listener for PASS case + static class FilteringListenerPass { + + @Autowired + LatchContainer latchContainer; + + @SqsListener(queueNames = FILTER_QUEUE_PASS, id = "filter-pass", factory = FILTERING_FACTORY) + void listen(SampleRecord record) { + logger.info("Received (pass): {}", record); + latchContainer.latchForPass.countDown(); + } + } + + // Listener for BLOCK case + static class FilteringListenerBlock { @Autowired LatchContainer latchContainer; - @SqsListener(queueNames = FILTER_QUEUE_NAME, id = "filter-test-listener", factory = MESSAGE_FILTERED_FACTORY) + @SqsListener(queueNames = FILTER_QUEUE_BLOCK, id = "filter-block", factory = FILTERING_FACTORY) void listen(SampleRecord record) { - logger.info("Received message: {}", record); - latchContainer.messageReceivedLatch.countDown(); + logger.info("Received (block): {}", record); + latchContainer.latchForBlock.countDown(); } } + // Shared latch static class LatchContainer { - CountDownLatch messageReceivedLatch = new CountDownLatch(1); + final CountDownLatch latchForPass = new CountDownLatch(1); + final CountDownLatch latchForBlock = new CountDownLatch(1); } } From dcb93e9bc6c91b1b745e3d82f19f92590560750f Mon Sep 17 00:00:00 2001 From: daniel kim Date: Thu, 22 May 2025 09:57:43 +0900 Subject: [PATCH 10/12] fix : apply lint and add comment for adaptFilter --- .../AbstractMessageListenerContainerFactory.java | 3 --- .../cloud/sqs/listener/AbstractContainerOptions.java | 6 +++--- .../listener/AbstractMessageListenerContainer.java | 2 +- .../cloud/sqs/listener/AsyncComponentAdapters.java | 11 +++++++++-- .../awspring/cloud/sqs/listener/ContainerOptions.java | 2 +- 5 files changed, 14 insertions(+), 10 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java index 98fbb5241..09ce153d8 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java @@ -23,8 +23,6 @@ import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; -import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter; -import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.springframework.messaging.Message; import org.springframework.util.Assert; @@ -176,7 +174,6 @@ public C createContainer(Endpoint endpoint) { B options = this.containerOptionsBuilder.createCopy(); configure(endpoint, options); C container = createContainerInstance(endpoint, options.build()); - endpoint.setupContainer(container); configureContainer(container, endpoint); return container; diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java index c0626680d..e5adaa0fc 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java @@ -64,7 +64,6 @@ public abstract class AbstractContainerOptions, private final MessageFilter messageFilter; - @Nullable private final AcknowledgementOrdering acknowledgementOrdering; @@ -173,7 +172,9 @@ public MessagingMessageConverter getMessageConverter() { } @Override - public MessageFilter getMessageFilter() {return this.messageFilter; } + public MessageFilter getMessageFilter() { + return this.messageFilter; + } @Nullable @Override @@ -421,7 +422,6 @@ public B messageFilter(MessageFilter messageFilter) { return self(); } - @SuppressWarnings("unchecked") private B self() { return (B) this; diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java index a3437c2c5..8868e402a 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java @@ -135,7 +135,7 @@ public void addMessageInterceptor(AsyncMessageInterceptor messageInterceptor) @Override public void setMessageListener(MessageListener messageListener) { Assert.notNull(messageListener, "messageListener cannot be null"); - if(containerOptions.getMessageFilter() instanceof DefaultMessageFilter) { + if (containerOptions.getMessageFilter() instanceof DefaultMessageFilter) { this.messageListener = AsyncComponentAdapters.adapt(messageListener); } else { this.messageListener = AsyncComponentAdapters.adaptFilter(messageListener, (MessageFilter) containerOptions.getMessageFilter()); diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java index fbc42ee7b..a3a011fb9 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java @@ -78,6 +78,15 @@ public static AsyncMessageListener adapt(MessageListener messageListen return new BlockingMessageListenerAdapter<>(messageListener); } + /** + * Adapt the provided {@link MessageListener} and {@link MessageFilter} into a single {@link AsyncMessageListener} + * that only forwards messages passing the filter. + * + * @param messageListener the message listener to be adapted + * @param messageFilter the filter used to evaluate incoming messages + * @param the message payload type + * @return the adapted and filtered async message listener + */ public static AsyncMessageListener adaptFilter(MessageListener messageListener, MessageFilter messageFilter) { return new FilteredMessageListenerAdapter<>(messageListener, messageFilter); } @@ -237,7 +246,6 @@ public CompletableFuture onMessage(Message message) { return execute(() -> this.filteredMessageListener.onMessage(message)); } else { - logger.debug("Message filtered out: {}", message.getPayload()); return CompletableFuture.completedFuture(null); } } @@ -249,7 +257,6 @@ public CompletableFuture onMessage(Collection> messages) { .toList(); if (filteredMessages.isEmpty()) { - logger.debug("All messages were filtered out."); return CompletableFuture.completedFuture(null); } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java index 9d3471bd0..c5390c237 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java @@ -143,7 +143,7 @@ default BackOffPolicy getPollBackOffPolicy() { /** Return the message filter applied before message processing. * @return the message filter. - * */ + */ MessageFilter getMessageFilter(); /** From 37e7ee7d788f3279eda94af0d88394d98a56c3ca Mon Sep 17 00:00:00 2001 From: daniel kim Date: Tue, 3 Jun 2025 14:35:56 +0900 Subject: [PATCH 11/12] fix : replace stream filter with for loop for performance on small message batches --- .../cloud/sqs/listener/AsyncComponentAdapters.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java index a3a011fb9..24f388cd7 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java @@ -23,7 +23,10 @@ import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; + +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.function.Supplier; @@ -81,7 +84,6 @@ public static AsyncMessageListener adapt(MessageListener messageListen /** * Adapt the provided {@link MessageListener} and {@link MessageFilter} into a single {@link AsyncMessageListener} * that only forwards messages passing the filter. - * * @param messageListener the message listener to be adapted * @param messageFilter the filter used to evaluate incoming messages * @param the message payload type @@ -252,9 +254,12 @@ public CompletableFuture onMessage(Message message) { @Override public CompletableFuture onMessage(Collection> messages) { - Collection> filteredMessages = messages.stream() - .filter(filter::process) - .toList(); + List> filteredMessages = new ArrayList<>(); + for (Message message : messages) { + if (filter.process(message)) { + filteredMessages.add(message); + } + } if (filteredMessages.isEmpty()) { return CompletableFuture.completedFuture(null); From 1f97ca4aed8a2d24a33dd47f472f56e25a9d49d1 Mon Sep 17 00:00:00 2001 From: daniel kim Date: Tue, 3 Jun 2025 18:17:22 +0900 Subject: [PATCH 12/12] fix: remove message filtering from MessageListener and delegate it to the MessageSink --- .../AbstractMessageListenerContainer.java | 8 +-- .../sqs/listener/AsyncComponentAdapters.java | 65 ++----------------- .../sqs/listener/FifoSqsComponentFactory.java | 16 +++-- .../listener/StandardSqsComponentFactory.java | 13 ++-- .../sink/FilteredBatchMessageSink.java | 35 ++++++++++ .../sink/FilteredFanOutMessageSink.java | 35 ++++++++++ .../sink/FilteredOrderedMessageSink.java | 35 ++++++++++ 7 files changed, 127 insertions(+), 80 deletions(-) create mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredBatchMessageSink.java create mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredFanOutMessageSink.java create mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredOrderedMessageSink.java diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java index 8868e402a..952844761 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java @@ -21,8 +21,6 @@ import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; -import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter; -import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.SmartLifecycle; @@ -135,11 +133,7 @@ public void addMessageInterceptor(AsyncMessageInterceptor messageInterceptor) @Override public void setMessageListener(MessageListener messageListener) { Assert.notNull(messageListener, "messageListener cannot be null"); - if (containerOptions.getMessageFilter() instanceof DefaultMessageFilter) { - this.messageListener = AsyncComponentAdapters.adapt(messageListener); - } else { - this.messageListener = AsyncComponentAdapters.adaptFilter(messageListener, (MessageFilter) containerOptions.getMessageFilter()); - } + this.messageListener = AsyncComponentAdapters.adapt(messageListener); } @Override diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java index 24f388cd7..a11eca8bf 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java @@ -23,21 +23,17 @@ import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.function.Supplier; - -import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.task.TaskExecutor; import org.springframework.messaging.Message; import org.springframework.util.Assert; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Supplier; + /** * Utility class for adapting blocking components to their asynchronous variants. * @@ -81,18 +77,6 @@ public static AsyncMessageListener adapt(MessageListener messageListen return new BlockingMessageListenerAdapter<>(messageListener); } - /** - * Adapt the provided {@link MessageListener} and {@link MessageFilter} into a single {@link AsyncMessageListener} - * that only forwards messages passing the filter. - * @param messageListener the message listener to be adapted - * @param messageFilter the filter used to evaluate incoming messages - * @param the message payload type - * @return the adapted and filtered async message listener - */ - public static AsyncMessageListener adaptFilter(MessageListener messageListener, MessageFilter messageFilter) { - return new FilteredMessageListenerAdapter<>(messageListener, messageFilter); - } - public static AsyncAcknowledgementResultCallback adapt( AcknowledgementResultCallback acknowledgementResultCallback) { return new BlockingAcknowledgementResultCallbackAdapter<>(acknowledgementResultCallback); @@ -231,45 +215,6 @@ public CompletableFuture onMessage(Collection> messages) { } } - private static class FilteredMessageListenerAdapter extends AbstractThreadingComponentAdapter - implements AsyncMessageListener { - - private final MessageListener filteredMessageListener; - private final MessageFilter filter; - - public FilteredMessageListenerAdapter(MessageListener filteredMessageListener, MessageFilter filter) { - this.filteredMessageListener = filteredMessageListener; - this.filter = filter; - } - - @Override - public CompletableFuture onMessage(Message message) { - if (filter.process(message)) { - return execute(() -> this.filteredMessageListener.onMessage(message)); - } - else { - return CompletableFuture.completedFuture(null); - } - } - - @Override - public CompletableFuture onMessage(Collection> messages) { - List> filteredMessages = new ArrayList<>(); - for (Message message : messages) { - if (filter.process(message)) { - filteredMessages.add(message); - } - } - - if (filteredMessages.isEmpty()) { - return CompletableFuture.completedFuture(null); - } - - return execute(() -> this.filteredMessageListener.onMessage(filteredMessages)); - } - } - - private static class BlockingErrorHandlerAdapter extends AbstractThreadingComponentAdapter implements AsyncErrorHandler { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java index 3ed94b16a..9c4c08a7f 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/FifoSqsComponentFactory.java @@ -22,9 +22,7 @@ import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor; import io.awspring.cloud.sqs.listener.acknowledgement.BatchingAcknowledgementProcessor; import io.awspring.cloud.sqs.listener.acknowledgement.ImmediateAcknowledgementProcessor; -import io.awspring.cloud.sqs.listener.sink.BatchMessageSink; -import io.awspring.cloud.sqs.listener.sink.MessageSink; -import io.awspring.cloud.sqs.listener.sink.OrderedMessageSink; +import io.awspring.cloud.sqs.listener.sink.*; import io.awspring.cloud.sqs.listener.sink.adapter.MessageGroupingSinkAdapter; import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityExtendingSinkAdapter; import io.awspring.cloud.sqs.listener.source.FifoSqsMessageSource; @@ -32,6 +30,9 @@ import java.time.Duration; import java.util.Collection; import java.util.function.Function; + +import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter; +import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.Assert; @@ -69,7 +70,7 @@ public MessageSource createMessageSource(SqsContainerOptions options) { @Override public MessageSink createMessageSink(SqsContainerOptions options) { - MessageSink deliverySink = createDeliverySink(options.getListenerMode()); + MessageSink deliverySink = createDeliverySink(options.getListenerMode(), options); MessageSink wrappedDeliverySink = maybeWrapWithVisibilityAdapter(deliverySink, options.getMessageVisibility()); return maybeWrapWithMessageGroupingAdapter(options, wrappedDeliverySink); @@ -84,10 +85,11 @@ private MessageSink maybeWrapWithMessageGroupingAdapter(SqsContainerOptions o } // @formatter:off - private MessageSink createDeliverySink(ListenerMode listenerMode) { + private MessageSink createDeliverySink(ListenerMode listenerMode, SqsContainerOptions options) { + MessageFilter filter = (MessageFilter) options.getMessageFilter(); return ListenerMode.SINGLE_MESSAGE.equals(listenerMode) - ? new OrderedMessageSink<>() - : new BatchMessageSink<>(); + ? new FilteredOrderedMessageSink<>(filter) + : new FilteredBatchMessageSink<>(filter); } private MessageSink maybeWrapWithVisibilityAdapter(MessageSink deliverySink, @Nullable Duration messageVisibility) { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactory.java index cba188c70..494d136fa 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactory.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/StandardSqsComponentFactory.java @@ -21,13 +21,14 @@ import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor; import io.awspring.cloud.sqs.listener.acknowledgement.BatchingAcknowledgementProcessor; import io.awspring.cloud.sqs.listener.acknowledgement.ImmediateAcknowledgementProcessor; -import io.awspring.cloud.sqs.listener.sink.BatchMessageSink; -import io.awspring.cloud.sqs.listener.sink.FanOutMessageSink; -import io.awspring.cloud.sqs.listener.sink.MessageSink; +import io.awspring.cloud.sqs.listener.sink.*; import io.awspring.cloud.sqs.listener.source.MessageSource; import io.awspring.cloud.sqs.listener.source.StandardSqsMessageSource; import java.time.Duration; import java.util.Collection; + +import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter; +import io.awspring.cloud.sqs.support.filter.MessageFilter; import org.springframework.util.Assert; /** @@ -57,12 +58,12 @@ public MessageSource createMessageSource(SqsContainerOptions options) { // @formatter:off @Override public MessageSink createMessageSink(SqsContainerOptions options) { + MessageFilter filter = (MessageFilter) options.getMessageFilter(); return ListenerMode.SINGLE_MESSAGE.equals(options.getListenerMode()) - ? new FanOutMessageSink<>() - : new BatchMessageSink<>(); + ? new FilteredFanOutMessageSink<>(filter) : new FilteredBatchMessageSink<>(filter); } - // @formatter:on + // @formatter:on @Override public AcknowledgementProcessor createAcknowledgementProcessor(SqsContainerOptions options) { validateAcknowledgementOrdering(options); diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredBatchMessageSink.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredBatchMessageSink.java new file mode 100644 index 000000000..6188bd879 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredBatchMessageSink.java @@ -0,0 +1,35 @@ +package io.awspring.cloud.sqs.listener.sink; + +import io.awspring.cloud.sqs.listener.MessageProcessingContext; +import io.awspring.cloud.sqs.support.filter.MessageFilter; +import org.springframework.messaging.Message; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class FilteredBatchMessageSink extends BatchMessageSink { + + private final MessageFilter filter; + + public FilteredBatchMessageSink(MessageFilter filter) { + this.filter = filter; + } + + @Override + protected CompletableFuture doEmit(Collection> messages, MessageProcessingContext context) { + List> filtered = new ArrayList<>(messages.size()); + for (Message message : messages) { + if (filter.process(message)) { + filtered.add(message); + } + } + + if (filtered.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + return super.doEmit(filtered, context); + } +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredFanOutMessageSink.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredFanOutMessageSink.java new file mode 100644 index 000000000..a1a73bb56 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredFanOutMessageSink.java @@ -0,0 +1,35 @@ +package io.awspring.cloud.sqs.listener.sink; + +import io.awspring.cloud.sqs.listener.MessageProcessingContext; +import io.awspring.cloud.sqs.support.filter.MessageFilter; +import org.springframework.messaging.Message; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class FilteredFanOutMessageSink extends FanOutMessageSink { + + private final MessageFilter filter; + + public FilteredFanOutMessageSink(MessageFilter filter) { + this.filter = filter; + } + + @Override + protected CompletableFuture doEmit(Collection> messages, MessageProcessingContext context) { + List> filtered = new ArrayList<>(messages.size()); + for (Message message : messages) { + if (filter.process(message)) { + filtered.add(message); + } + } + + if (filtered.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + return super.doEmit(filtered, context); + } +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredOrderedMessageSink.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredOrderedMessageSink.java new file mode 100644 index 000000000..521170a99 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/sink/FilteredOrderedMessageSink.java @@ -0,0 +1,35 @@ +package io.awspring.cloud.sqs.listener.sink; + +import io.awspring.cloud.sqs.listener.MessageProcessingContext; +import io.awspring.cloud.sqs.support.filter.MessageFilter; +import org.springframework.messaging.Message; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class FilteredOrderedMessageSink extends OrderedMessageSink { + + private final MessageFilter filter; + + public FilteredOrderedMessageSink(MessageFilter filter) { + this.filter = filter; + } + + @Override + protected CompletableFuture doEmit(Collection> messages, MessageProcessingContext context) { + List> filtered = new ArrayList<>(messages.size()); + for (Message message : messages) { + if (filter.process(message)) { + filtered.add(message); + } + } + + if (filtered.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + return super.doEmit(filtered, context); + } +}