Skip to content

Commit 7d93e04

Browse files
authored
Merge pull request #76 from awslabs/robin-aws/delayed_messages
Add support for MessageProducer.setDeliveryDelay() (from JMS 2.0)
2 parents ce40ad9 + 078b574 commit 7d93e04

File tree

2 files changed

+80
-0
lines changed

2 files changed

+80
-0
lines changed

src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.Enumeration;
1818
import java.util.HashMap;
1919
import java.util.Map;
20+
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122

2223
import javax.jms.Destination;
@@ -55,6 +56,10 @@
5556
public class SQSMessageProducer implements MessageProducer, QueueSender {
5657
private static final Log LOG = LogFactory.getLog(SQSMessageProducer.class);
5758

59+
private long MAXIMUM_DELIVERY_DELAY_MILLISECONDS = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
60+
61+
private int deliveryDelaySeconds = 0;
62+
5863
/** This field is not actually used. */
5964
private long timeToLive;
6065
/** This field is not actually used. */
@@ -122,6 +127,10 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
122127
SendMessageRequest sendMessageRequest = new SendMessageRequest(queue.getQueueUrl(), sqsMessageBody);
123128
sendMessageRequest.setMessageAttributes(messageAttributes);
124129

130+
if (deliveryDelaySeconds != 0) {
131+
sendMessageRequest.setDelaySeconds(deliveryDelaySeconds);
132+
}
133+
125134
//for FIFO queues, we have to specify both MessageGroupId, which we obtain from standard property JMSX_GROUP_ID
126135
//and MessageDeduplicationId, which we obtain from a custom provider specific property JMS_SQS_DEDUPLICATION_ID
127136
//notice that this code does not validate if the values are actually set by the JMS user
@@ -493,7 +502,32 @@ public void setTimeToLive(long timeToLive) throws JMSException {
493502
public long getTimeToLive() throws JMSException {
494503
return timeToLive;
495504
}
505+
506+
/**
507+
* Sets the minimum length of time in milliseconds that must elapse after a
508+
* message is sent before the JMS provider may deliver the message to a consumer.
509+
* <p>
510+
* This must be a multiple of 1000, since SQS only supports delivery delays
511+
* in seconds.
512+
*/
513+
public void setDeliveryDelay(long deliveryDelay) {
514+
if (deliveryDelay < 0 || deliveryDelay > MAXIMUM_DELIVERY_DELAY_MILLISECONDS) {
515+
throw new IllegalArgumentException("Delivery delay must be non-negative and at most 15 minutes: " + deliveryDelay);
516+
}
517+
if (deliveryDelay % 1000 != 0) {
518+
throw new IllegalArgumentException("Delivery delay must be a multiple of 1000: " + deliveryDelay);
519+
}
520+
this.deliveryDelaySeconds = (int)(deliveryDelay / 1000);
521+
}
496522

523+
/**
524+
* Gets the minimum length of time in milliseconds that must elapse after a
525+
* message is sent before the JMS provider may deliver the message to a consumer.
526+
*/
527+
public long getDeliveryDelay() {
528+
return deliveryDelaySeconds * 1000;
529+
}
530+
497531
void checkClosed() throws IllegalStateException {
498532
if (closed.get()) {
499533
throw new IllegalStateException("The producer is closed.");

src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import java.util.HashSet;
4747
import java.util.List;
4848
import java.util.Map;
49+
import java.util.concurrent.TimeUnit;
50+
4951
import org.junit.Before;
5052
import org.junit.Test;
5153
import org.mockito.ArgumentCaptor;
@@ -699,6 +701,50 @@ public void testClose() throws JMSException {
699701
verify(sqsSession).removeProducer(producer);
700702
}
701703

704+
@Test
705+
public void testSetDeliveryDelay() throws JMSException {
706+
assertEquals(0, producer.getDeliveryDelay());
707+
708+
producer.setDeliveryDelay(2000);
709+
710+
assertEquals(2000, producer.getDeliveryDelay());
711+
712+
ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
713+
when(amazonSQSClient.sendMessage(requestCaptor.capture()))
714+
.thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1));
715+
716+
SQSTextMessage msg = new SQSTextMessage("Sorry I'm late!");
717+
producer.send(msg);
718+
719+
assertEquals(2, requestCaptor.getValue().getDelaySeconds().intValue());
720+
}
721+
722+
723+
@Test
724+
public void testSetDeliveryDelayInvalidDelays() throws JMSException {
725+
try {
726+
producer.setDeliveryDelay(-1);
727+
fail();
728+
} catch (IllegalArgumentException ide) {
729+
// expected
730+
}
731+
732+
try {
733+
producer.setDeliveryDelay(TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS));
734+
fail();
735+
} catch (IllegalArgumentException ide) {
736+
// expected
737+
}
738+
739+
try {
740+
producer.setDeliveryDelay(20);
741+
fail();
742+
} catch (IllegalArgumentException ide) {
743+
// expected
744+
}
745+
}
746+
747+
702748
private Map<String, MessageAttributeValue> createMessageAttribute(String type) {
703749
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
704750
messageAttributeValue.setDataType("String");

0 commit comments

Comments
 (0)