18
18
import java .util .Set ;
19
19
import java .util .concurrent .ConcurrentHashMap ;
20
20
21
- import javax .jms .IllegalStateException ;
22
- import javax .jms .Connection ;
23
- import javax .jms .ConnectionConsumer ;
24
- import javax .jms .ConnectionMetaData ;
25
- import javax .jms .Destination ;
26
- import javax .jms .ExceptionListener ;
27
- import javax .jms .InvalidClientIDException ;
28
- import javax .jms .JMSException ;
29
- import javax .jms .Queue ;
30
- import javax .jms .QueueConnection ;
31
- import javax .jms .QueueSession ;
32
- import javax .jms .ServerSessionPool ;
33
- import javax .jms .Session ;
34
- import javax .jms .Topic ;
21
+ import jakarta .jms .IllegalStateException ;
22
+ import jakarta .jms .Connection ;
23
+ import jakarta .jms .ConnectionConsumer ;
24
+ import jakarta .jms .ConnectionMetaData ;
25
+ import jakarta .jms .Destination ;
26
+ import jakarta .jms .ExceptionListener ;
27
+ import jakarta .jms .InvalidClientIDException ;
28
+ import jakarta .jms .JMSException ;
29
+ import jakarta .jms .Queue ;
30
+ import jakarta .jms .QueueConnection ;
31
+ import jakarta .jms .QueueSession ;
32
+ import jakarta .jms .ServerSessionPool ;
33
+ import jakarta .jms .Session ;
34
+ import jakarta .jms .Topic ;
35
35
36
36
import org .slf4j .Logger ;
37
37
import org .slf4j .LoggerFactory ;
@@ -107,7 +107,7 @@ public class SQSConnection implements Connection, QueueConnection {
107
107
*/
108
108
private volatile boolean actionOnConnectionTaken = false ;
109
109
110
- private final Set <Session > sessions = Collections .newSetFromMap (new ConcurrentHashMap <Session , Boolean >());
110
+ private final Set <Session > sessions = Collections .newSetFromMap (new ConcurrentHashMap <>());
111
111
112
112
SQSConnection (AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper , int numberOfMessagesToPrefetch ) {
113
113
amazonSQSClient = amazonSQSClientJMSWrapper ;
@@ -167,7 +167,7 @@ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode)
167
167
*
168
168
* @param transacted
169
169
* Only false is supported.
170
- * @param acknowledgeMode
170
+ * @param sessionMode
171
171
* Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
172
172
* <code>Session.CLIENT_ACKNOWLEDGE</code>,
173
173
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>, and
@@ -179,26 +179,26 @@ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode)
179
179
* transaction and acknowledge mode.
180
180
*/
181
181
@ Override
182
- public Session createSession (boolean transacted , int acknowledgeMode ) throws JMSException {
182
+ public Session createSession (boolean transacted , int sessionMode ) throws JMSException {
183
183
checkClosed ();
184
184
actionOnConnectionTaken = true ;
185
- if (transacted || acknowledgeMode == Session .SESSION_TRANSACTED )
185
+ if (transacted || sessionMode == Session .SESSION_TRANSACTED )
186
186
throw new JMSException ("SQSSession does not support transacted" );
187
187
188
188
SQSSession sqsSession ;
189
- if (acknowledgeMode == Session .AUTO_ACKNOWLEDGE ) {
190
- sqsSession = new SQSSession (this , AcknowledgeMode .ACK_AUTO .withOriginalAcknowledgeMode (acknowledgeMode ));
191
- } else if (acknowledgeMode == Session .CLIENT_ACKNOWLEDGE || acknowledgeMode == Session .DUPS_OK_ACKNOWLEDGE ) {
192
- sqsSession = new SQSSession (this , AcknowledgeMode .ACK_RANGE .withOriginalAcknowledgeMode (acknowledgeMode ));
193
- } else if (acknowledgeMode == SQSSession .UNORDERED_ACKNOWLEDGE ) {
194
- sqsSession = new SQSSession (this , AcknowledgeMode .ACK_UNORDERED .withOriginalAcknowledgeMode (acknowledgeMode ));
189
+ if (sessionMode == Session .AUTO_ACKNOWLEDGE ) {
190
+ sqsSession = new SQSSession (this , AcknowledgeMode .ACK_AUTO .withOriginalAcknowledgeMode (sessionMode ));
191
+ } else if (sessionMode == Session .CLIENT_ACKNOWLEDGE || sessionMode == Session .DUPS_OK_ACKNOWLEDGE ) {
192
+ sqsSession = new SQSSession (this , AcknowledgeMode .ACK_RANGE .withOriginalAcknowledgeMode (sessionMode ));
193
+ } else if (sessionMode == SQSSession .UNORDERED_ACKNOWLEDGE ) {
194
+ sqsSession = new SQSSession (this , AcknowledgeMode .ACK_UNORDERED .withOriginalAcknowledgeMode (sessionMode ));
195
195
} else {
196
196
LOG .error ("Unrecognized acknowledgeMode. Cannot create Session." );
197
197
throw new JMSException ("Unrecognized acknowledgeMode. Cannot create Session." );
198
198
}
199
199
synchronized (stateLock ) {
200
200
if (closing ) {
201
- /**
201
+ /*
202
202
* SQSSession's constructor has already started a SQSSessionCallbackScheduler which should be closed
203
203
* before leaving sqsSession object.
204
204
*/
@@ -207,18 +207,27 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
207
207
}
208
208
sessions .add (sqsSession );
209
209
210
- /**
210
+ /*
211
211
* Any new sessions created on a started connection should be
212
212
* started on creation
213
213
*/
214
214
if (running ) {
215
215
sqsSession .start ();
216
216
}
217
217
}
218
-
219
218
return sqsSession ;
220
219
}
221
220
221
+ @ Override
222
+ public Session createSession (int sessionMode ) throws JMSException {
223
+ return createSession (false , sessionMode );
224
+ }
225
+
226
+ @ Override
227
+ public Session createSession () throws JMSException {
228
+ throw new JMSException (SQSMessagingClientConstants .UNSUPPORTED_METHOD );
229
+ }
230
+
222
231
@ Override
223
232
public ExceptionListener getExceptionListener () throws JMSException {
224
233
checkClosing ();
@@ -303,7 +312,7 @@ public void start() throws JMSException {
303
312
* This means that a client can rely on the fact that none of its message
304
313
* listeners will be called and that all threads of control waiting for
305
314
* receive calls to return will not return with a message until the
306
- * connection is restarted. The receive timers for a stopped connection
315
+ * connection is restarted. The received timers for a stopped connection
307
316
* continue to advance, so receives may time out while the connection is
308
317
* stopped.
309
318
* <P>
@@ -336,7 +345,7 @@ public void stop() throws JMSException {
336
345
try {
337
346
for (Session session : sessions ) {
338
347
SQSSession sqsSession = (SQSSession ) session ;
339
- /**
348
+ /*
340
349
* Session stop call blocks until receives and/or
341
350
* message listeners in progress have completed.
342
351
*/
@@ -375,12 +384,11 @@ public void stop() throws JMSException {
375
384
*/
376
385
@ Override
377
386
public void close () throws JMSException {
378
-
379
387
if (closed ) {
380
388
return ;
381
389
}
382
390
383
- /**
391
+ /*
384
392
* A message listener must not attempt to close its own connection as
385
393
* this would lead to deadlock.
386
394
*/
@@ -411,7 +419,7 @@ public void close() throws JMSException {
411
419
412
420
}
413
421
}
414
- }/** Blocks until closing of the connection completes */
422
+ }/* Blocks until closing of the connection completes */
415
423
else {
416
424
synchronized (stateLock ) {
417
425
while (!closed ) {
@@ -432,7 +440,7 @@ public void close() throws JMSException {
432
440
* from list of Sessions.
433
441
*/
434
442
void removeSession (Session session ) throws JMSException {
435
- /**
443
+ /*
436
444
* No need to synchronize on stateLock assuming this can be only called
437
445
* by session.close(), on which point connection will not be worried
438
446
* about missing closing this session.
@@ -505,9 +513,22 @@ public ConnectionConsumer createConnectionConsumer(Destination destination, Stri
505
513
throw new JMSException (SQSMessagingClientConstants .UNSUPPORTED_METHOD );
506
514
}
507
515
516
+ @ Override
517
+ public ConnectionConsumer createSharedConnectionConsumer (Topic topic , String subscriptionName , String messageSelector , ServerSessionPool sessionPool , int maxMessages ) throws JMSException {
518
+ throw new JMSException (SQSMessagingClientConstants .UNSUPPORTED_METHOD );
519
+ }
520
+
508
521
/** This method is not supported. */
509
522
@ Override
510
- public ConnectionConsumer createDurableConnectionConsumer (Topic topic , String subscriptionName , String messageSelector ,
523
+ public ConnectionConsumer createDurableConnectionConsumer (
524
+ Topic topic , String subscriptionName , String messageSelector ,
525
+ ServerSessionPool sessionPool , int maxMessages ) throws JMSException {
526
+ throw new JMSException (SQSMessagingClientConstants .UNSUPPORTED_METHOD );
527
+ }
528
+
529
+ @ Override
530
+ public ConnectionConsumer createSharedDurableConnectionConsumer (
531
+ Topic topic , String subscriptionName , String messageSelector ,
511
532
ServerSessionPool sessionPool , int maxMessages ) throws JMSException {
512
533
throw new JMSException (SQSMessagingClientConstants .UNSUPPORTED_METHOD );
513
534
}
0 commit comments