Skip to content

Commit 61c63b8

Browse files
authored
Merge pull request #77 from awslabs/robin-aws/receive_no_wait_without_prefetching
receiveNoWait() without prefetching
2 parents efa716d + 1d2cfc8 commit 61c63b8

File tree

4 files changed

+68
-43
lines changed

4 files changed

+68
-43
lines changed

src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
*/
1515
package com.amazon.sqs.javamessaging;
1616

17-
import java.net.URI;
1817
import java.util.ArrayDeque;
1918
import java.util.ArrayList;
19+
import java.util.Collections;
2020
import java.util.Iterator;
2121
import java.util.List;
2222
import java.util.Set;
@@ -218,7 +218,7 @@ public void run() {
218218
}
219219

220220
if (!isClosed()) {
221-
messages = getMessages(prefetchBatchSize);
221+
messages = getMessagesWithBackoff(prefetchBatchSize);
222222
}
223223

224224
if (messages != null && !messages.isEmpty()) {
@@ -240,38 +240,24 @@ public void run() {
240240
}
241241

242242
/**
243-
* Call <code>receiveMessage</code> with long-poll wait time of 20 seconds
244-
* with available prefetch batch size and potential re-tries.
243+
* Call <code>receiveMessage</code> with the given wait time.
245244
*/
246-
protected List<Message> getMessages(int prefetchBatchSize) throws InterruptedException {
245+
protected List<Message> getMessages(int batchSize, int waitTimeSeconds) throws JMSException {
247246

248-
assert prefetchBatchSize > 0;
247+
assert batchSize > 0;
249248

250249
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
251-
.withMaxNumberOfMessages(prefetchBatchSize)
250+
.withMaxNumberOfMessages(batchSize)
252251
.withAttributeNames(ALL)
253252
.withMessageAttributeNames(ALL)
254-
.withWaitTimeSeconds(WAIT_TIME_SECONDS);
253+
.withWaitTimeSeconds(waitTimeSeconds);
255254
//if the receive request is for FIFO queue, provide a unique receive request attempt it, so that
256255
//failed calls retried by SDK will claim the same messages
257256
if (sqsDestination.isFifo()) {
258257
receiveMessageRequest.withReceiveRequestAttemptId(UUID.randomUUID().toString());
259258
}
260-
List<Message> messages = null;
261-
try {
262-
ReceiveMessageResult receivedMessageResult = amazonSQSClient.receiveMessage(receiveMessageRequest);
263-
messages = receivedMessageResult.getMessages();
264-
retriesAttempted = 0;
265-
} catch (JMSException e) {
266-
LOG.warn("Encountered exception during receive in ConsumerPrefetch thread", e);
267-
try {
268-
sleep(backoffStrategy.delayBeforeNextRetry(retriesAttempted++));
269-
} catch (InterruptedException ex) {
270-
LOG.warn("Interrupted while retrying on receive", ex);
271-
throw ex;
272-
}
273-
}
274-
return messages;
259+
ReceiveMessageResult receivedMessageResult = amazonSQSClient.receiveMessage(receiveMessageRequest);
260+
return receivedMessageResult.getMessages();
275261
}
276262

277263
/**
@@ -288,6 +274,7 @@ protected void processReceivedMessages(List<Message> messages) {
288274
javax.jms.Message jmsMessage = convertToJMSMessage(message);
289275
messageManagers.add(new MessageManager(this, jmsMessage));
290276
} catch (JMSException e) {
277+
LOG.warn("Caught exception while converting received messages", e);
291278
nackMessages.add(message.getReceiptHandle());
292279
}
293280
}
@@ -311,6 +298,23 @@ protected void processReceivedMessages(List<Message> messages) {
311298
}
312299
}
313300

301+
protected List<Message> getMessagesWithBackoff(int batchSize) throws InterruptedException {
302+
try {
303+
List<Message> result = getMessages(batchSize, WAIT_TIME_SECONDS);
304+
retriesAttempted = 0;
305+
return result;
306+
} catch (JMSException e) {
307+
LOG.warn("Encountered exception during receive in ConsumerPrefetch thread", e);
308+
try {
309+
sleep(backoffStrategy.delayBeforeNextRetry(retriesAttempted++));
310+
return Collections.emptyList();
311+
} catch (InterruptedException ex) {
312+
LOG.warn("Interrupted while retrying on receive", ex);
313+
throw ex;
314+
}
315+
}
316+
}
317+
314318
protected void waitForPrefetch() throws InterruptedException {
315319
synchronized (stateLock) {
316320
while (numberOfMessagesToFetch() <= 0 && !isClosed()) {
@@ -524,6 +528,12 @@ javax.jms.Message receiveNoWait() throws JMSException {
524528

525529
MessageManager messageManager;
526530
synchronized (stateLock) {
531+
if (messageQueue.isEmpty() && numberOfMessagesToPrefetch == 0) {
532+
List<Message> messages = getMessages(1, 0);
533+
if (messages != null && !messages.isEmpty()) {
534+
processReceivedMessages(messages);
535+
}
536+
}
527537
messageManager = messageQueue.pollFirst();
528538
}
529539
if (messageManager != null) {

src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,9 +1192,7 @@ private static String getType(Object value) throws JMSException {
11921192
}
11931193

11941194
private static Object getObjectValue(String value, String type) throws JMSException {
1195-
if (STRING.equals(type) || NUMBER.equals(type)) {
1196-
return value;
1197-
} else if (INT.equals(type)) {
1195+
if (INT.equals(type)) {
11981196
return Integer.valueOf(value);
11991197
} else if (LONG.equals(type)) {
12001198
return Long.valueOf(value);
@@ -1211,6 +1209,8 @@ private static Object getObjectValue(String value, String type) throws JMSExcept
12111209
return Float.valueOf(value);
12121210
} else if (SHORT.equals(type)) {
12131211
return Short.valueOf(value);
1212+
} else if (type != null && (type.startsWith(STRING) || type.startsWith(NUMBER))) {
1213+
return value;
12141214
} else {
12151215
throw new JMSException(type + " is not a supported JMS property type");
12161216
}

src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ public void testStopWhenConsumerClosedDuringWaitForPrefetch() throws Interrupted
260260
verify(consumerPrefetch, times(2)).nackQueueMessages();
261261

262262
// Ensure we do not get messages when closed while waiting for prefetch
263-
verify(consumerPrefetch, never()).getMessages(anyInt());
263+
verify(consumerPrefetch, never()).getMessages(anyInt(), anyInt());
264264

265265
// Ensure we do not process any messages
266266
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));
@@ -296,7 +296,7 @@ public void testStopAfterInterruptWaitForStart() throws InterruptedException, JM
296296
verify(consumerPrefetch).nackQueueMessages();
297297

298298
verify(consumerPrefetch, never()).waitForPrefetch();
299-
verify(consumerPrefetch, never()).getMessages(anyInt());
299+
verify(consumerPrefetch, never()).getMessages(anyInt(), anyInt());
300300
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));
301301

302302
// Ensure retries attempt was not increased
@@ -335,7 +335,7 @@ public void testStopAfterErrorWaitForStart() throws InterruptedException, JMSExc
335335
verify(consumerPrefetch).nackQueueMessages();
336336

337337
verify(consumerPrefetch, never()).waitForPrefetch();
338-
verify(consumerPrefetch, never()).getMessages(anyInt());
338+
verify(consumerPrefetch, never()).getMessages(anyInt(), anyInt());
339339
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));
340340

341341
// Ensure retries attempt was not increased
@@ -371,7 +371,7 @@ public void testStopAfterInterruptWaitForPrefetch() throws InterruptedException,
371371
verify(consumerPrefetch).waitForPrefetch();
372372
verify(consumerPrefetch).nackQueueMessages();
373373

374-
verify(consumerPrefetch, never()).getMessages(anyInt());
374+
verify(consumerPrefetch, never()).getMessages(anyInt(), anyInt());
375375
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));
376376

377377
// Ensure retries attempt was not increased
@@ -412,7 +412,7 @@ public void testStopAfterErrorWaitForPrefetch() throws InterruptedException, JMS
412412
verify(consumerPrefetch).waitForPrefetch();
413413
verify(consumerPrefetch).nackQueueMessages();
414414

415-
verify(consumerPrefetch, never()).getMessages(anyInt());
415+
verify(consumerPrefetch, never()).getMessages(anyInt(), anyInt());
416416
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));
417417

418418
// Ensure retries attempt was not increased
@@ -435,7 +435,7 @@ public void testStopAfterInterruptGetMessages() throws InterruptedException, JMS
435435
doNothing()
436436
.when(consumerPrefetch).waitForPrefetch();
437437
doThrow(new InterruptedException("Interrupt"))
438-
.when(consumerPrefetch).getMessages(anyInt());
438+
.when(consumerPrefetch).getMessagesWithBackoff(anyInt());
439439

440440
/*
441441
* Run the prefetch
@@ -449,7 +449,7 @@ public void testStopAfterInterruptGetMessages() throws InterruptedException, JMS
449449
verify(consumerPrefetch).waitForStart();
450450
verify(consumerPrefetch).waitForPrefetch();
451451
verify(consumerPrefetch).nackQueueMessages();
452-
verify(consumerPrefetch).getMessages(anyInt());
452+
verify(consumerPrefetch).getMessagesWithBackoff(anyInt());
453453

454454
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));
455455

@@ -473,7 +473,7 @@ public void testStopAfterErrorGetMessages() throws InterruptedException, JMSExce
473473
doNothing()
474474
.when(consumerPrefetch).waitForPrefetch();
475475
doThrow(new Error("error"))
476-
.when(consumerPrefetch).getMessages(anyInt());
476+
.when(consumerPrefetch).getMessages(anyInt(), anyInt());
477477

478478
/*
479479
* Run the prefetch
@@ -492,7 +492,7 @@ public void testStopAfterErrorGetMessages() throws InterruptedException, JMSExce
492492
verify(consumerPrefetch).waitForStart();
493493
verify(consumerPrefetch).waitForPrefetch();
494494
verify(consumerPrefetch).nackQueueMessages();
495-
verify(consumerPrefetch).getMessages(anyInt());
495+
verify(consumerPrefetch).getMessages(anyInt(), anyInt());
496496

497497
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));
498498

@@ -1332,6 +1332,11 @@ public void testReceiveNoWaitEmpty() throws InterruptedException, JMSException {
13321332
*/
13331333
consumerPrefetch.running = true;
13341334

1335+
if (numberOfMessagesToPrefetch == 0) {
1336+
when(amazonSQSClient.receiveMessage(any(ReceiveMessageRequest.class)))
1337+
.thenReturn(new ReceiveMessageResult());
1338+
}
1339+
13351340
/*
13361341
* Call receive messages
13371342
*/
@@ -1542,7 +1547,7 @@ public void testGetMessages() throws InterruptedException, JMSException {
15421547
/*
15431548
* Get messages
15441549
*/
1545-
List<com.amazonaws.services.sqs.model.Message> result = consumerPrefetch.getMessages(prefetchBatchSize);
1550+
List<com.amazonaws.services.sqs.model.Message> result = consumerPrefetch.getMessagesWithBackoff(prefetchBatchSize);
15461551

15471552
/*
15481553
* Verify results
@@ -1555,18 +1560,18 @@ public void testGetMessages() throws InterruptedException, JMSException {
15551560
* Test Get Messages with illegal prefetch size
15561561
*/
15571562
@Test
1558-
public void testGetMessagesIllegalPrefetchSize() throws InterruptedException {
1563+
public void testGetMessagesIllegalPrefetchSize() throws JMSException {
15591564

15601565
int negativeSize = -10;
15611566
try {
1562-
consumerPrefetch.getMessages(negativeSize);
1567+
consumerPrefetch.getMessages(negativeSize, 0);
15631568
fail();
15641569
} catch(AssertionError ae) {
15651570
// expected exception
15661571
}
15671572

15681573
try {
1569-
consumerPrefetch.getMessages(0);
1574+
consumerPrefetch.getMessages(0, 0);
15701575
fail();
15711576
} catch(AssertionError ae) {
15721577
// expected exception
@@ -1597,9 +1602,9 @@ public void testGetMessagesJMSException() throws InterruptedException, JMSExcept
15971602
when(backoffStrategy.delayBeforeNextRetry(retriesAttempted + 1))
15981603
.thenReturn(secondSleepTime);
15991604

1600-
consumerPrefetch.getMessages(prefetchBatchSize);
1605+
consumerPrefetch.getMessagesWithBackoff(prefetchBatchSize);
16011606

1602-
consumerPrefetch.getMessages(prefetchBatchSize);
1607+
consumerPrefetch.getMessagesWithBackoff(prefetchBatchSize);
16031608

16041609
/*
16051610
* Verify results
@@ -1638,7 +1643,7 @@ public void testGetMessagesInterruptDuringBackoff() throws InterruptedException,
16381643
public void run() {
16391644
try {
16401645
beforeGetMessagesCall.countDown();
1641-
consumerPrefetch.getMessages(prefetchBatchSize);
1646+
consumerPrefetch.getMessagesWithBackoff(prefetchBatchSize);
16421647
} catch (InterruptedException e) {
16431648
recvInterruptedExceptionLatch.countDown();
16441649
e.printStackTrace();
@@ -1672,7 +1677,7 @@ public void testGetMessagesError() throws InterruptedException, JMSException {
16721677
.thenThrow(new Error());
16731678

16741679
try {
1675-
consumerPrefetch.getMessages(prefetchBatchSize);
1680+
consumerPrefetch.getMessages(prefetchBatchSize, 0);
16761681
} catch (Error e) {
16771682
// Expected error exception
16781683
}

src/test/java/com/amazon/sqs/javamessaging/message/SQSMessageTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class SQSMessageTest {
5353
final String myShort = "myShort";
5454
final String myByte = "myByte";
5555
final String myString = "myString";
56+
final String myCustomString = "myCustomString";
5657
final String myNumber = "myNumber";
5758

5859
@Before
@@ -326,6 +327,10 @@ public void testSQSMessageAttributeToProperty() throws JMSException {
326327
.withDataType(SQSMessagingClientConstants.STRING)
327328
.withStringValue("StringValue"));
328329

330+
messageAttributes.put(myCustomString, new MessageAttributeValue()
331+
.withDataType(SQSMessagingClientConstants.NUMBER + ".custom")
332+
.withStringValue("['one', 'two']"));
333+
329334
messageAttributes.put(myNumber, new MessageAttributeValue()
330335
.withDataType(SQSMessagingClientConstants.NUMBER)
331336
.withStringValue("500"));
@@ -374,6 +379,10 @@ public void testSQSMessageAttributeToProperty() throws JMSException {
374379
Assert.assertEquals(message.getObjectProperty(myString), "StringValue");
375380
Assert.assertEquals(message.getStringProperty(myString), "StringValue");
376381

382+
Assert.assertTrue(message.propertyExists(myCustomString));
383+
Assert.assertEquals(message.getObjectProperty(myCustomString), "['one', 'two']");
384+
Assert.assertEquals(message.getStringProperty(myCustomString), "['one', 'two']");
385+
377386
Assert.assertTrue(message.propertyExists(myNumber));
378387
Assert.assertEquals(message.getObjectProperty(myNumber), "500");
379388
Assert.assertEquals(message.getStringProperty(myNumber), "500");
@@ -395,6 +404,7 @@ public void testSQSMessageAttributeToProperty() throws JMSException {
395404
myShort,
396405
myByte,
397406
myString,
407+
myCustomString,
398408
myNumber,
399409
JMSX_DELIVERY_COUNT));
400410

0 commit comments

Comments
 (0)