Skip to content

Add FilteringAdapter in SQS #1388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,20 @@
package io.awspring.cloud.sqs.config;

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.listener.AbstractMessageListenerContainer;
import io.awspring.cloud.sqs.listener.AsyncComponentAdapters;
import io.awspring.cloud.sqs.listener.AsyncMessageListener;
import io.awspring.cloud.sqs.listener.ContainerComponentFactory;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.MessageListener;
import io.awspring.cloud.sqs.listener.MessageListenerContainer;
import io.awspring.cloud.sqs.listener.*;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Consumer;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/**
* Base implementation for a {@link MessageListenerContainerFactory}. Contains the components and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,24 @@

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.listener.AsyncMessageListener;
import io.awspring.cloud.sqs.listener.ContainerComponentFactory;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.MessageListener;
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer;
import io.awspring.cloud.sqs.listener.*;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

import java.util.ArrayList;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* {@link MessageListenerContainerFactory} implementation for creating {@link SqsMessageListenerContainer} instances. A
* factory can be assigned to a {@link io.awspring.cloud.sqs.annotation.SqsListener @SqsListener} by using the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
import java.time.Duration;

import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter;
import io.awspring.cloud.sqs.support.filter.MessageFilter;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.retry.backoff.BackOffPolicy;
Expand Down Expand Up @@ -59,6 +62,8 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,

private final AcknowledgementMode acknowledgementMode;

private final MessageFilter<?> messageFilter;

@Nullable
private final AcknowledgementOrdering acknowledgementOrdering;

Expand Down Expand Up @@ -92,6 +97,8 @@ protected AbstractContainerOptions(Builder<?, ?> builder) {
this.acknowledgementThreshold = builder.acknowledgementThreshold;
this.componentsTaskExecutor = builder.componentsTaskExecutor;
this.acknowledgementResultTaskExecutor = builder.acknowledgementResultTaskExecutor;
this.messageFilter = builder.messageFilter;

Assert.isTrue(this.maxMessagesPerPoll <= this.maxConcurrentMessages, String.format(
"messagesPerPoll should be less than or equal to maxConcurrentMessages. Values provided: %s and %s respectively",
this.maxMessagesPerPoll, this.maxConcurrentMessages));
Expand Down Expand Up @@ -164,6 +171,11 @@ public MessagingMessageConverter<?> getMessageConverter() {
return this.messageConverter;
}

@Override
public MessageFilter<?> getMessageFilter() {
return this.messageFilter;
}

@Nullable
@Override
public Duration getAcknowledgementInterval() {
Expand Down Expand Up @@ -244,6 +256,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private AcknowledgementMode acknowledgementMode = DEFAULT_ACKNOWLEDGEMENT_MODE;

private MessageFilter<?> messageFilter = new DefaultMessageFilter<>();

@Nullable
private AcknowledgementOrdering acknowledgementOrdering;

Expand Down Expand Up @@ -280,6 +294,7 @@ protected Builder(AbstractContainerOptions<?, ?> options) {
this.acknowledgementThreshold = options.acknowledgementThreshold;
this.componentsTaskExecutor = options.componentsTaskExecutor;
this.acknowledgementResultTaskExecutor = options.acknowledgementResultTaskExecutor;
this.messageFilter = options.messageFilter;
}

@Override
Expand Down Expand Up @@ -400,6 +415,13 @@ public B messageConverter(MessagingMessageConverter<?> messageConverter) {
return self();
}

@Override
public B messageFilter(MessageFilter<?> messageFilter) {
Assert.notNull(messageFilter, "messageFilter cannot be null");
this.messageFilter = messageFilter;
return self();
}

@SuppressWarnings("unchecked")
private B self() {
return (B) this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;

/**
* Base implementation for {@link MessageListenerContainer} with {@link SmartLifecycle} and component management
* capabilities.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/**
* Utility class for adapting blocking components to their asynchronous variants.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import java.time.Duration;
import java.util.Collection;

import io.awspring.cloud.sqs.support.filter.MessageFilter;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.retry.backoff.BackOffPolicy;
Expand Down Expand Up @@ -139,6 +141,11 @@ default BackOffPolicy getPollBackOffPolicy() {
*/
MessagingMessageConverter<?> getMessageConverter();

/** Return the message filter applied before message processing.
* @return the message filter.
*/
MessageFilter<?> getMessageFilter();

/**
* Return the maximum interval between acknowledgements for batch acknowledgements.
* @return the interval.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import java.time.Duration;

import io.awspring.cloud.sqs.support.filter.MessageFilter;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.backoff.BackOffPolicy;

Expand Down Expand Up @@ -187,6 +189,14 @@ default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
*/
B messageConverter(MessagingMessageConverter<?> messageConverter);

/**
* Set the {@link MessagingMessageConverter} for this container.
*
* @param messageFilter the message filter.
* @return this instance.
*/
B messageFilter(MessageFilter<?> messageFilter);

/**
* Create the {@link ContainerOptions} instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.BatchingAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.ImmediateAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.sink.BatchMessageSink;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import io.awspring.cloud.sqs.listener.sink.OrderedMessageSink;
import io.awspring.cloud.sqs.listener.sink.*;
import io.awspring.cloud.sqs.listener.sink.adapter.MessageGroupingSinkAdapter;
import io.awspring.cloud.sqs.listener.sink.adapter.MessageVisibilityExtendingSinkAdapter;
import io.awspring.cloud.sqs.listener.source.FifoSqsMessageSource;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import java.time.Duration;
import java.util.Collection;
import java.util.function.Function;

import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter;
import io.awspring.cloud.sqs.support.filter.MessageFilter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -69,7 +70,7 @@ public MessageSource<T> createMessageSource(SqsContainerOptions options) {

@Override
public MessageSink<T> createMessageSink(SqsContainerOptions options) {
MessageSink<T> deliverySink = createDeliverySink(options.getListenerMode());
MessageSink<T> deliverySink = createDeliverySink(options.getListenerMode(), options);
MessageSink<T> wrappedDeliverySink = maybeWrapWithVisibilityAdapter(deliverySink,
options.getMessageVisibility());
return maybeWrapWithMessageGroupingAdapter(options, wrappedDeliverySink);
Expand All @@ -84,10 +85,11 @@ private MessageSink<T> maybeWrapWithMessageGroupingAdapter(SqsContainerOptions o
}

// @formatter:off
private MessageSink<T> createDeliverySink(ListenerMode listenerMode) {
private MessageSink<T> createDeliverySink(ListenerMode listenerMode, SqsContainerOptions options) {
MessageFilter<T> filter = (MessageFilter<T>) options.getMessageFilter();
return ListenerMode.SINGLE_MESSAGE.equals(listenerMode)
? new OrderedMessageSink<>()
: new BatchMessageSink<>();
? new FilteredOrderedMessageSink<>(filter)
: new FilteredBatchMessageSink<>(filter);
}

private MessageSink<T> maybeWrapWithVisibilityAdapter(MessageSink<T> deliverySink, @Nullable Duration messageVisibility) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.BatchingAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.ImmediateAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.sink.BatchMessageSink;
import io.awspring.cloud.sqs.listener.sink.FanOutMessageSink;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import io.awspring.cloud.sqs.listener.sink.*;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import io.awspring.cloud.sqs.listener.source.StandardSqsMessageSource;
import java.time.Duration;
import java.util.Collection;

import io.awspring.cloud.sqs.support.filter.DefaultMessageFilter;
import io.awspring.cloud.sqs.support.filter.MessageFilter;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -57,12 +58,12 @@ public MessageSource<T> createMessageSource(SqsContainerOptions options) {
// @formatter:off
@Override
public MessageSink<T> createMessageSink(SqsContainerOptions options) {
MessageFilter<T> filter = (MessageFilter<T>) options.getMessageFilter();
return ListenerMode.SINGLE_MESSAGE.equals(options.getListenerMode())
? new FanOutMessageSink<>()
: new BatchMessageSink<>();
? new FilteredFanOutMessageSink<>(filter) : new FilteredBatchMessageSink<>(filter);
}
// @formatter:on

// @formatter:on
@Override
public AcknowledgementProcessor<T> createAcknowledgementProcessor(SqsContainerOptions options) {
validateAcknowledgementOrdering(options);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.awspring.cloud.sqs.listener.sink;

import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.support.filter.MessageFilter;
import org.springframework.messaging.Message;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FilteredBatchMessageSink<T> extends BatchMessageSink<T> {

private final MessageFilter<T> filter;

public FilteredBatchMessageSink(MessageFilter<T> filter) {
this.filter = filter;
}

@Override
protected CompletableFuture<Void> doEmit(Collection<Message<T>> messages, MessageProcessingContext<T> context) {
List<Message<T>> filtered = new ArrayList<>(messages.size());
for (Message<T> message : messages) {
if (filter.process(message)) {
filtered.add(message);
}
}

if (filtered.isEmpty()) {
return CompletableFuture.completedFuture(null);
}

return super.doEmit(filtered, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.awspring.cloud.sqs.listener.sink;

import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.support.filter.MessageFilter;
import org.springframework.messaging.Message;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FilteredFanOutMessageSink<T> extends FanOutMessageSink<T> {

private final MessageFilter<T> filter;

public FilteredFanOutMessageSink(MessageFilter<T> filter) {
this.filter = filter;
}

@Override
protected CompletableFuture<Void> doEmit(Collection<Message<T>> messages, MessageProcessingContext<T> context) {
List<Message<T>> filtered = new ArrayList<>(messages.size());
for (Message<T> message : messages) {
if (filter.process(message)) {
filtered.add(message);
}
}

if (filtered.isEmpty()) {
return CompletableFuture.completedFuture(null);
}

return super.doEmit(filtered, context);
}
}
Loading