19
19
import java .util .Collections ;
20
20
import java .util .Iterator ;
21
21
import java .util .List ;
22
+ import java .util .Map ;
22
23
import java .util .Set ;
23
24
import java .util .UUID ;
24
25
@@ -96,14 +97,14 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
96
97
* Counter on how many messages are prefetched into internal messageQueue.
97
98
*/
98
99
protected int messagesPrefetched = 0 ;
99
-
100
+
100
101
/**
101
102
* Counter on how many messages have been explicitly requested.
102
103
* TODO: Consider renaming this class and several other variables now that
103
104
* this logic factors in message requests as well as prefetching.
104
105
*/
105
106
protected int messagesRequested = 0 ;
106
-
107
+
107
108
/**
108
109
* States of the prefetch thread
109
110
*/
@@ -123,7 +124,7 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
123
124
* 25ms delayInterval.
124
125
*/
125
126
protected ExponentialBackoffStrategy backoffStrategy = new ExponentialBackoffStrategy (25 ,25 ,2000 );
126
-
127
+
127
128
SQSMessageConsumerPrefetch (SQSSessionCallbackScheduler sqsSessionRunnable , Acknowledger acknowledger ,
128
129
NegativeAcknowledger negativeAcknowledger , SQSQueueDestination sqsDestination ,
129
130
AmazonSQSMessagingClientWrapper amazonSQSClient , int numberOfMessagesToPrefetch ) {
@@ -141,16 +142,16 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
141
142
MessageListener getMessageListener () {
142
143
return messageListener ;
143
144
}
144
-
145
+
145
146
void setMessageConsumer (SQSMessageConsumer messageConsumer ) {
146
147
this .messageConsumer = messageConsumer ;
147
148
}
148
-
149
+
149
150
@ Override
150
151
public SQSMessageConsumer getMessageConsumer () {
151
152
return messageConsumer ;
152
153
}
153
-
154
+
154
155
/**
155
156
* Sets the message listener.
156
157
* <P>
@@ -170,7 +171,7 @@ protected void setMessageListener(MessageListener messageListener) {
170
171
if (!running || isClosed ()) {
171
172
return ;
172
173
}
173
-
174
+
174
175
List <MessageManager > allPrefetchedMessages = new ArrayList <MessageManager >(messageQueue );
175
176
sqsSessionRunnable .scheduleCallBacks (messageListener , allPrefetchedMessages );
176
177
messageQueue .clear ();
@@ -181,7 +182,7 @@ protected void setMessageListener(MessageListener messageListener) {
181
182
messageListenerReady ();
182
183
}
183
184
}
184
-
185
+
185
186
/**
186
187
* Determine the number of messages we should attempt to fetch from SQS.
187
188
* Returns the difference between the number of messages needed (either for
@@ -191,7 +192,7 @@ private int numberOfMessagesToFetch() {
191
192
int numberOfMessagesNeeded = Math .max (numberOfMessagesToPrefetch , messagesRequested );
192
193
return Math .max (numberOfMessagesNeeded - messagesPrefetched , 0 );
193
194
}
194
-
195
+
195
196
/**
196
197
* Runs until the message consumer is closed and in-progress SQS
197
198
* <code>receiveMessage</code> call returns.
@@ -212,7 +213,7 @@ public void run() {
212
213
if (isClosed ()) {
213
214
break ;
214
215
}
215
-
216
+
216
217
synchronized (stateLock ) {
217
218
waitForStart ();
218
219
waitForPrefetch ();
@@ -240,7 +241,7 @@ public void run() {
240
241
}
241
242
}
242
243
}
243
-
244
+
244
245
/**
245
246
* Call <code>receiveMessage</code> with the given wait time.
246
247
*/
@@ -263,7 +264,7 @@ protected List<Message> getMessages(int batchSize, int waitTimeSeconds) throws J
263
264
ReceiveMessageResponse receivedMessageResult = amazonSQSClient .receiveMessage (receiveMessageRequestBuilder .build ());
264
265
return receivedMessageResult .messages ();
265
266
}
266
-
267
+
267
268
/**
268
269
* Converts the received message to JMS message, and pushes to messages to
269
270
* either callback scheduler for asynchronous message delivery or to
@@ -282,14 +283,14 @@ protected void processReceivedMessages(List<Message> messages) {
282
283
nackMessages .add (message .receiptHandle ());
283
284
}
284
285
}
285
-
286
+
286
287
synchronized (stateLock ) {
287
288
if (messageListener != null ) {
288
289
sqsSessionRunnable .scheduleCallBacks (messageListener , messageManagers );
289
290
} else {
290
291
messageQueue .addAll (messageManagers );
291
292
}
292
-
293
+
293
294
messagesPrefetched += messageManagers .size ();
294
295
notifyStateChange ();
295
296
}
@@ -363,11 +364,12 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep
363
364
throw new JMSException ("Not a supported JMS message type" );
364
365
}
365
366
}
366
-
367
+
367
368
jmsMessage .setJMSDestination (sqsDestination );
368
-
369
+
369
370
MessageAttributeValue replyToQueueNameAttribute = message .messageAttributes ().get (
370
371
SQSMessage .JMS_SQS_REPLY_TO_QUEUE_NAME );
372
+
371
373
MessageAttributeValue replyToQueueUrlAttribute = message .messageAttributes ().get (
372
374
SQSMessage .JMS_SQS_REPLY_TO_QUEUE_URL );
373
375
if (replyToQueueNameAttribute != null && replyToQueueUrlAttribute != null ) {
@@ -376,16 +378,27 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep
376
378
Destination replyToQueue = new SQSQueueDestination (replyToQueueName , replyToQueueUrl );
377
379
jmsMessage .setJMSReplyTo (replyToQueue );
378
380
}
379
-
381
+
380
382
MessageAttributeValue correlationIdAttribute = message .messageAttributes ().get (
381
383
SQSMessage .JMS_SQS_CORRELATION_ID );
382
384
if (correlationIdAttribute != null ) {
383
385
jmsMessage .setJMSCorrelationID (correlationIdAttribute .stringValue ());
384
386
}
385
-
387
+
388
+ jmsMessage .setJMSTimestamp (getJMSTimestamp (message ));
386
389
return jmsMessage ;
387
390
}
388
391
392
+ private long getJMSTimestamp (Message message ) {
393
+ Map <String , String > systemAttributes = message .attributesAsStrings ();
394
+ String timestamp = systemAttributes .get (SQSMessagingClientConstants .SENT_TIMESTAMP );
395
+ if (timestamp != null ) {
396
+ return Long .parseLong (timestamp );
397
+ } else {
398
+ return 0L ;
399
+ }
400
+ }
401
+
389
402
protected void nackQueueMessages () {
390
403
// Also nack messages already in the messageQueue
391
404
synchronized (stateLock ) {
@@ -411,7 +424,7 @@ protected void waitForStart() throws InterruptedException {
411
424
}
412
425
}
413
426
}
414
-
427
+
415
428
@ Override
416
429
public void messageDispatched () {
417
430
synchronized (stateLock ) {
@@ -433,7 +446,7 @@ public void messageListenerReady() {
433
446
}
434
447
}
435
448
}
436
-
449
+
437
450
void requestMessage () {
438
451
synchronized (stateLock ) {
439
452
messagesRequested ++;
@@ -447,7 +460,7 @@ private void unrequestMessage() {
447
460
notifyStateChange ();
448
461
}
449
462
}
450
-
463
+
451
464
public static class MessageManager {
452
465
453
466
private final PrefetchManager prefetchManager ;
@@ -471,7 +484,7 @@ public javax.jms.Message getMessage() {
471
484
javax .jms .Message receive () throws JMSException {
472
485
return receive (0 );
473
486
}
474
-
487
+
475
488
javax .jms .Message receive (long timeout ) throws JMSException {
476
489
if (cannotDeliver ()) {
477
490
return null ;
@@ -480,7 +493,7 @@ javax.jms.Message receive(long timeout) throws JMSException {
480
493
if (timeout < 0 ) {
481
494
timeout = 0 ;
482
495
}
483
-
496
+
484
497
MessageManager messageManager = null ;
485
498
synchronized (stateLock ) {
486
499
requestMessage ();
@@ -490,7 +503,7 @@ javax.jms.Message receive(long timeout) throws JMSException {
490
503
messageManager = messageQueue .pollFirst ();
491
504
} else {
492
505
long startTime = System .currentTimeMillis ();
493
-
506
+
494
507
long waitTime = 0 ;
495
508
while (messageQueue .isEmpty () && !isClosed () &&
496
509
(timeout == 0 || (waitTime = getWaitTime (timeout , startTime )) > 0 )) {
@@ -529,7 +542,7 @@ javax.jms.Message receiveNoWait() throws JMSException {
529
542
if (cannotDeliver ()) {
530
543
return null ;
531
544
}
532
-
545
+
533
546
MessageManager messageManager ;
534
547
synchronized (stateLock ) {
535
548
if (messageQueue .isEmpty () && numberOfMessagesToPrefetch == 0 ) {
@@ -576,22 +589,22 @@ void close() {
576
589
messageListener = null ;
577
590
}
578
591
}
579
-
592
+
580
593
/**
581
594
* Helper that notifies PrefetchThread that message is dispatched and AutoAcknowledge
582
595
*/
583
596
private javax .jms .Message messageHandler (MessageManager messageManager ) throws JMSException {
584
597
if (messageManager == null ) {
585
598
return null ;
586
- }
599
+ }
587
600
javax .jms .Message message = messageManager .getMessage ();
588
-
601
+
589
602
// Notify PrefetchThread that message is dispatched
590
603
this .messageDispatched ();
591
604
acknowledger .notifyMessageReceived ((SQSMessage ) message );
592
605
return message ;
593
606
}
594
-
607
+
595
608
private boolean cannotDeliver () throws JMSException {
596
609
if (!running ) {
597
610
return true ;
@@ -606,7 +619,7 @@ private boolean cannotDeliver() throws JMSException {
606
619
}
607
620
return false ;
608
621
}
609
-
622
+
610
623
/**
611
624
* Sleeps for the configured time.
612
625
*/
@@ -617,7 +630,7 @@ protected void sleep(long sleepTimeMillis) throws InterruptedException {
617
630
throw e ;
618
631
}
619
632
}
620
-
633
+
621
634
protected boolean isClosed () {
622
635
return closed ;
623
636
}
@@ -645,7 +658,7 @@ List<SQSMessageIdentifier> purgePrefetchedMessagesWithGroups(Set<String> affecte
645
658
646
659
notifyStateChange ();
647
660
}
648
-
661
+
649
662
return purgedMessages ;
650
663
}
651
664
}
0 commit comments