Skip to content

Commit 3053ed0

Browse files
brun0-4ugustorafaelcgpava
authored andcommitted
ImmediateRetryAsyncErrorHandler changes msg visibility timeout to zero. (#1314)
1 parent 89bc6f2 commit 3053ed0

File tree

4 files changed

+424
-0
lines changed

4 files changed

+424
-0
lines changed

docs/src/main/asciidoc/sqs.adoc

+28
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,34 @@ If the error handler execution succeeds, i.e. no error is thrown from the error
12481248
IMPORTANT: If the message should not be acknowledged and the `ON_SUCCESS` acknowledgement mode is set, it's important to propagate the error.
12491249
For simply executing an action in case of errors, an `interceptor` should be used instead, checking the presence of the `throwable` argument for detecting a failed execution.
12501250

1251+
==== Immediate Retry Error Handler
1252+
As mentioned in <<Error Handling>>, by default, messages that cause an error in the listener are only retried after the visibility timeout has expired.
1253+
1254+
Starting with version 3.4, Spring Cloud AWS SQS includes the `ImmediateRetryAsyncErrorHandler`, which sets the visibility timeout to zero to enable immediate retry of failed messages.
1255+
1256+
When using auto-configured factory, simply declare a `@Bean` and the error handler will be set
1257+
1258+
[source, java]
1259+
----
1260+
@Bean
1261+
public AsyncErrorHandler<Object> asyncErrorHandler() {
1262+
return new ImmediateRetryAsyncErrorHandler<>();
1263+
}
1264+
----
1265+
1266+
Alternatively, `ImmediateRetryAsyncErrorHandler` can be set in the `MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:
1267+
1268+
[source, java]
1269+
----
1270+
@Bean
1271+
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
1272+
return SqsMessageListenerContainerFactory
1273+
.builder()
1274+
.sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
1275+
.errorHandler(new ImmediateRetryAsyncErrorHandler<>())
1276+
.build();
1277+
}
1278+
----
12511279

12521280
=== Message Conversion and Payload Deserialization
12531281

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener.errorhandler;
17+
18+
import io.awspring.cloud.sqs.MessageHeaderUtils;
19+
import io.awspring.cloud.sqs.listener.QueueMessageVisibility;
20+
import io.awspring.cloud.sqs.listener.SqsHeaders;
21+
import io.awspring.cloud.sqs.listener.Visibility;
22+
import java.util.Collection;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.stream.Collectors;
25+
26+
import org.springframework.messaging.Message;
27+
28+
/**
29+
* A default error handler implementation for asynchronous message processing.
30+
*
31+
* <p>
32+
* This error handler attempts to set the SQS message visibility timeout to zero whenever an exception occurs,
33+
* effectively making the message immediately available for reprocessing.
34+
*
35+
* <p>
36+
* When AcknowledgementMode is set to ON_SUCCESS (the default value),
37+
* returning a failed future will prevent the message from being acknowledged
38+
*
39+
* @author Bruno Garcia
40+
* @author Rafael Pavarini
41+
*/
42+
public class ImmediateRetryAsyncErrorHandler<T> implements AsyncErrorHandler<T> {
43+
44+
@Override
45+
public CompletableFuture<Void> handle(Message<T> message, Throwable t) {
46+
return changeTimeoutToZero(message)
47+
.thenCompose(theVoid -> CompletableFuture.failedFuture(t));
48+
}
49+
50+
@Override
51+
public CompletableFuture<Void> handle(Collection<Message<T>> messages, Throwable t) {
52+
return changeTimeoutToZero(messages)
53+
.thenCompose(theVoid -> CompletableFuture.failedFuture(t));
54+
55+
}
56+
57+
private CompletableFuture<Void> changeTimeoutToZero(Message<T> message) {
58+
Visibility visibilityTimeout = getVisibilityTimeout(message);
59+
return visibilityTimeout.changeToAsync(0);
60+
}
61+
62+
private CompletableFuture<Void> changeTimeoutToZero(Collection<Message<T>> messages) {
63+
QueueMessageVisibility firstVisibilityMessage = (QueueMessageVisibility) getVisibilityTimeout(messages.iterator().next());
64+
65+
Collection<Message<?>> castMessages = messages.stream()
66+
.map(m -> (Message<?>) m)
67+
.collect(Collectors.toList());
68+
69+
return firstVisibilityMessage.toBatchVisibility(castMessages).changeToAsync(0);
70+
}
71+
72+
private Visibility getVisibilityTimeout(Message<T> message) {
73+
return MessageHeaderUtils.getHeader(message, SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER, Visibility.class);
74+
}
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
package io.awspring.cloud.sqs.integration;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.awspring.cloud.sqs.MessageHeaderUtils;
5+
import io.awspring.cloud.sqs.annotation.SqsListener;
6+
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
7+
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
8+
import io.awspring.cloud.sqs.listener.SqsHeaders;
9+
import io.awspring.cloud.sqs.listener.errorhandler.ImmediateRetryAsyncErrorHandler;
10+
import io.awspring.cloud.sqs.operations.SqsTemplate;
11+
import org.junit.jupiter.api.BeforeAll;
12+
import org.junit.jupiter.api.Test;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
import org.springframework.beans.factory.annotation.Autowired;
16+
import org.springframework.boot.test.context.SpringBootTest;
17+
import org.springframework.context.annotation.Bean;
18+
import org.springframework.context.annotation.Configuration;
19+
import org.springframework.context.annotation.Import;
20+
import org.springframework.messaging.Message;
21+
import org.springframework.messaging.handler.annotation.Header;
22+
import org.springframework.messaging.support.MessageBuilder;
23+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
24+
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
25+
26+
import java.time.Duration;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.UUID;
31+
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.ConcurrentHashMap;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.stream.Collectors;
36+
import java.util.stream.IntStream;
37+
38+
import static java.util.Collections.singletonMap;
39+
import static org.assertj.core.api.Assertions.assertThat;
40+
41+
/**
42+
* Integration tests for SQS ErrorHandler integration.
43+
*
44+
* @author Bruno Garcia
45+
* @author Rafael Pavarini
46+
*/
47+
48+
@SpringBootTest
49+
public class SqsErrorHandlerIntegrationTests extends BaseSqsIntegrationTest {
50+
51+
private static final Logger logger = LoggerFactory.getLogger(SqsErrorHandlerIntegrationTests.class);
52+
53+
static final String SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_QUEUE_NAME = "success_visibility_timeout_to_zero_test_queue";
54+
55+
static final String SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_BATCH_QUEUE_NAME = "success_visibility_batch_timeout_to_zero_test_queue";
56+
57+
static final String SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_FACTORY = "receivesMessageErrorFactory";
58+
59+
@BeforeAll
60+
static void beforeTests() {
61+
SqsAsyncClient client = createAsyncClient();
62+
CompletableFuture.allOf(
63+
createQueue(client, SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_QUEUE_NAME,
64+
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "500")),
65+
createQueue(client, SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_BATCH_QUEUE_NAME,
66+
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "500")))
67+
.join();
68+
}
69+
70+
71+
@Autowired
72+
LatchContainer latchContainer;
73+
74+
@Autowired
75+
SqsTemplate sqsTemplate;
76+
77+
@Autowired
78+
ObjectMapper objectMapper;
79+
80+
@Test
81+
void receivesMessageVisibilityTimeout() throws Exception {
82+
String messageBody = UUID.randomUUID().toString();
83+
sqsTemplate.send(SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_QUEUE_NAME, messageBody);
84+
logger.debug("Sent message to queue {} with messageBody {}", SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_QUEUE_NAME,
85+
messageBody);
86+
87+
assertThat(latchContainer.receivesRetryMessageQuicklyLatch.await(10, TimeUnit.SECONDS)).isTrue();
88+
}
89+
90+
@Test
91+
void receivesMessageVisibilityTimeoutBatch() throws Exception {
92+
List<Message<String>> messages = create10Messages("receivesMessageVisibilityTimeoutBatch");
93+
94+
sqsTemplate.sendManyAsync(SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_BATCH_QUEUE_NAME, messages);
95+
logger.debug("Sent message to queue {} with messageBody {}",
96+
SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_BATCH_QUEUE_NAME, messages);
97+
98+
assertThat(latchContainer.receivesRetryBatchMessageQuicklyLatch.await(10, TimeUnit.SECONDS)).isTrue();
99+
}
100+
101+
static class ImmediateRetryAsyncErrorHandlerListener {
102+
103+
@Autowired
104+
LatchContainer latchContainer;
105+
106+
private static final Map<String, Long> previousReceivedMessageTimestamps = new ConcurrentHashMap<>();
107+
108+
private static final int MAX_EXPECTED_ELAPSED_TIME_BETWEEN_MSG_RECEIVES_IN_MS = 5000;
109+
110+
@SqsListener(queueNames = SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_QUEUE_NAME, messageVisibilitySeconds = "500", factory = SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_FACTORY, id = "visibilityErrHandler")
111+
CompletableFuture<Void> listen(Message<String> message,
112+
@Header(SqsHeaders.SQS_QUEUE_NAME_HEADER) String queueName) {
113+
logger.info("Received message {} from queue {}", message, queueName);
114+
String msgId = MessageHeaderUtils.getHeader(message, "id", UUID.class).toString();
115+
Long prevReceivedMessageTimestamp = previousReceivedMessageTimestamps.get(msgId);
116+
if (prevReceivedMessageTimestamp == null) {
117+
previousReceivedMessageTimestamps.put(msgId, System.currentTimeMillis());
118+
return CompletableFuture
119+
.failedFuture(new RuntimeException("Expected exception from visibility-err-handler"));
120+
}
121+
122+
long elapsedTimeBetweenMessageReceivesInMs = System.currentTimeMillis() - prevReceivedMessageTimestamp;
123+
if (elapsedTimeBetweenMessageReceivesInMs < MAX_EXPECTED_ELAPSED_TIME_BETWEEN_MSG_RECEIVES_IN_MS) {
124+
latchContainer.receivesRetryMessageQuicklyLatch.countDown();
125+
}
126+
127+
return CompletableFuture.completedFuture(null);
128+
}
129+
}
130+
131+
static class ImmediateRetryAsyncBatchErrorHandlerListener {
132+
133+
@Autowired
134+
LatchContainer latchContainer;
135+
136+
private static final Map<String, Long> previousReceivedMessageTimestamps = new ConcurrentHashMap<>();
137+
138+
private static final int MAX_EXPECTED_ELAPSED_TIME_BETWEEN_BATCH_MSG_RECEIVES_IN_MS = 5000;
139+
140+
@SqsListener(queueNames = SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_BATCH_QUEUE_NAME, messageVisibilitySeconds = "500", factory = SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_FACTORY, id = "visibilityBatchErrHandler")
141+
CompletableFuture<Void> listen(List<Message<String>> messages) {
142+
logger.info("Received messages {} from queue {}", MessageHeaderUtils.getId(messages),
143+
messages.get(0).getHeaders().get(SqsHeaders.SQS_QUEUE_NAME_HEADER));
144+
145+
for(Message<String> message : messages) {
146+
String msgId = MessageHeaderUtils.getHeader(message, "id", UUID.class).toString();
147+
if (!previousReceivedMessageTimestamps.containsKey(msgId)) {
148+
previousReceivedMessageTimestamps.put(msgId, System.currentTimeMillis());
149+
return CompletableFuture.failedFuture(new RuntimeException("Expected exception from visibility-err-handler"));
150+
}
151+
else {
152+
long timediff = System.currentTimeMillis() - previousReceivedMessageTimestamps.get(msgId);
153+
if (MAX_EXPECTED_ELAPSED_TIME_BETWEEN_BATCH_MSG_RECEIVES_IN_MS > timediff) {
154+
latchContainer.receivesRetryBatchMessageQuicklyLatch.countDown();
155+
}
156+
}
157+
}
158+
159+
return CompletableFuture.completedFuture(null);
160+
}
161+
}
162+
163+
static class LatchContainer {
164+
final CountDownLatch receivesRetryMessageQuicklyLatch = new CountDownLatch(1);
165+
final CountDownLatch receivesRetryBatchMessageQuicklyLatch = new CountDownLatch(10);
166+
}
167+
168+
@Import(SqsBootstrapConfiguration.class)
169+
@Configuration
170+
static class SQSConfiguration {
171+
172+
// @formatter:off
173+
@Bean(name = SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_FACTORY)
174+
public SqsMessageListenerContainerFactory<Object> errorHandlerVisibility() {
175+
return SqsMessageListenerContainerFactory
176+
.builder()
177+
.configure(options -> options
178+
.maxConcurrentMessages(10)
179+
.pollTimeout(Duration.ofSeconds(10))
180+
.maxMessagesPerPoll(10)
181+
.queueAttributeNames(Collections.singletonList(QueueAttributeName.QUEUE_ARN))
182+
.maxDelayBetweenPolls(Duration.ofSeconds(10)))
183+
.errorHandler(new ImmediateRetryAsyncErrorHandler<>())
184+
.sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
185+
.build();
186+
}
187+
// @formatter:on
188+
189+
@Bean
190+
ImmediateRetryAsyncErrorHandlerListener immediateRetryAsyncErrorHandlerListener() {
191+
return new ImmediateRetryAsyncErrorHandlerListener();
192+
}
193+
194+
@Bean
195+
ImmediateRetryAsyncBatchErrorHandlerListener immediateRetryAsyncBatchErrorHandlerListener() {
196+
return new ImmediateRetryAsyncBatchErrorHandlerListener();
197+
}
198+
199+
LatchContainer latchContainer = new LatchContainer();
200+
201+
@Bean
202+
LatchContainer latchContainer() {
203+
return this.latchContainer;
204+
}
205+
206+
@Bean
207+
ObjectMapper objectMapper() {
208+
return new ObjectMapper();
209+
}
210+
211+
@Bean
212+
SqsTemplate sqsTemplate() {
213+
return SqsTemplate.builder().sqsAsyncClient(BaseSqsIntegrationTest.createAsyncClient()).build();
214+
}
215+
}
216+
217+
private List<Message<String>> create10Messages(String testName) {
218+
return IntStream.range(0, 10).mapToObj(index -> testName + "-payload-" + index)
219+
.map(payload -> MessageBuilder.withPayload(payload).build()).collect(Collectors.toList());
220+
}
221+
}

0 commit comments

Comments
 (0)