From 7281ca25b56b797fd5efd63ac587c1f7f0445b9b Mon Sep 17 00:00:00 2001 From: Vladyslav Date: Tue, 5 Nov 2024 18:49:16 +0100 Subject: [PATCH] Add sqs async client support --- .../AbstractSQSConnectionFactory.java | 29 ++ .../AmazonSQSAsyncMessagingClientWrapper.java | 394 ++++++++++++++++++ .../AmazonSQSMessagingClient.java | 37 ++ .../AmazonSQSMessagingClientWrapper.java | 16 +- .../SQSAsyncConnectionFactory.java | 110 +++++ .../sqs/javamessaging/SQSConnection.java | 12 +- .../javamessaging/SQSConnectionFactory.java | 26 +- .../SQSMessageConsumerPrefetch.java | 4 +- .../sqs/javamessaging/SQSMessageProducer.java | 4 +- .../amazon/sqs/javamessaging/SQSSession.java | 2 +- .../acknowledge/AcknowledgeMode.java | 4 +- .../acknowledge/AutoAcknowledger.java | 6 +- .../acknowledge/NegativeAcknowledger.java | 6 +- .../acknowledge/RangedAcknowledger.java | 6 +- .../acknowledge/UnorderedAcknowledger.java | 6 +- .../sqs/javamessaging/AcknowledgerCommon.java | 2 +- .../AmazonSQSMessagingClientWrapperTest.java | 2 +- .../javamessaging/AutoAcknowledgerTest.java | 2 +- ...essageListenerConcurrentOperationTest.java | 2 +- .../sqs/javamessaging/SQSConnectionTest.java | 2 +- .../SQSMessageConsumerPrefetchFifoTest.java | 2 +- .../SQSMessageConsumerPrefetchTest.java | 2 +- .../SQSMessageProducerFifoTest.java | 2 +- .../javamessaging/SQSMessageProducerTest.java | 2 +- .../sqs/javamessaging/SQSSessionTest.java | 2 +- 25 files changed, 621 insertions(+), 61 deletions(-) create mode 100644 src/main/java/com/amazon/sqs/javamessaging/AbstractSQSConnectionFactory.java create mode 100644 src/main/java/com/amazon/sqs/javamessaging/AmazonSQSAsyncMessagingClientWrapper.java create mode 100644 src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClient.java create mode 100644 src/main/java/com/amazon/sqs/javamessaging/SQSAsyncConnectionFactory.java diff --git a/src/main/java/com/amazon/sqs/javamessaging/AbstractSQSConnectionFactory.java b/src/main/java/com/amazon/sqs/javamessaging/AbstractSQSConnectionFactory.java new file mode 100644 index 0000000..a25d783 --- /dev/null +++ b/src/main/java/com/amazon/sqs/javamessaging/AbstractSQSConnectionFactory.java @@ -0,0 +1,29 @@ +package com.amazon.sqs.javamessaging; + +import jakarta.jms.ConnectionFactory; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSRuntimeException; +import jakarta.jms.QueueConnectionFactory; + +public abstract class AbstractSQSConnectionFactory implements ConnectionFactory, QueueConnectionFactory { + + @Override + public JMSContext createContext() { + throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD); + } + + @Override + public JMSContext createContext(String userName, String password) { + throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD); + } + + @Override + public JMSContext createContext(String userName, String password, int sessionMode) { + throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD); + } + + @Override + public JMSContext createContext(int sessionMode) { + throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD); + } +} diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSAsyncMessagingClientWrapper.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSAsyncMessagingClientWrapper.java new file mode 100644 index 0000000..4efbb1d --- /dev/null +++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSAsyncMessagingClientWrapper.java @@ -0,0 +1,394 @@ +package com.amazon.sqs.javamessaging; + +import jakarta.jms.InvalidDestinationException; +import jakarta.jms.JMSException; +import jakarta.jms.JMSSecurityException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.awscore.AwsRequest; +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; +import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; +import software.amazon.awssdk.services.sqs.model.CreateQueueResponse; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; +import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; + +import java.util.Set; + + +/** + * This is a JMS Wrapper of SqsAsyncClient. This class changes all + * AwsServiceException and SdkException into + * JMSException/JMSSecurityException. + */ +public class AmazonSQSAsyncMessagingClientWrapper implements AmazonSQSMessagingClient { + private static final Logger LOG = LoggerFactory.getLogger(AmazonSQSAsyncMessagingClientWrapper.class); + + /** + * List of exceptions that can classified as security. These exceptions + * are not thrown during connection-set-up rather after the service + * calls of the SqsAsyncClient. + */ + private static final Set SECURITY_EXCEPTION_ERROR_CODES = Set.of("MissingClientTokenId", + "InvalidClientTokenId", "MissingAuthenticationToken", "AccessDenied"); + + private final SqsAsyncClient amazonSQSClient; + private final AwsCredentialsProvider credentialsProvider; + + /** + * @param amazonSQSClient The AWS SDK Client for SQS. + * @throws JMSException if the client is null + */ + public AmazonSQSAsyncMessagingClientWrapper(SqsAsyncClient amazonSQSClient) throws JMSException { + this(amazonSQSClient, null); + } + + /** + * @param amazonSQSClient The AWS SDK Client for SQS. + * @throws JMSException if the client is null + */ + public AmazonSQSAsyncMessagingClientWrapper(SqsAsyncClient amazonSQSClient, AwsCredentialsProvider credentialsProvider) throws JMSException { + if (amazonSQSClient == null) { + throw new JMSException("Amazon SQS client cannot be null"); + } + this.amazonSQSClient = amazonSQSClient; + this.credentialsProvider = credentialsProvider; + } + + /** + * If one uses any other AWS SDK operations other than explicitly listed + * here, the exceptions thrown by those operations will not be wrapped as + * JMSException. + * + * @return amazonSQSClient + */ + @Override + public SqsAsyncClient getAmazonSQSClient() { + return amazonSQSClient; + } + + /** + * Calls deleteMessage and wraps SdkException. This is used to + * acknowledge single messages, so that they can be deleted from SQS queue. + * + * @param deleteMessageRequest Container for the necessary parameters to execute the + * deleteMessage service method on SqsAsyncClient. + * @throws JMSException + */ + @Override + public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSException { + try { + amazonSQSClient.deleteMessage(prepareRequest(deleteMessageRequest)); + } catch (SdkException e) { + throw handleException(e, "deleteMessage"); + } + } + + /** + * Calls deleteMessageBatch and wraps + * SdkException. This is used to acknowledge multiple + * messages on client_acknowledge mode, so that they can be deleted from SQS + * queue. + * + * @param deleteMessageBatchRequest Container for the necessary parameters to execute the + * deleteMessageBatch service method on SqsAsyncClient. This is the + * batch version of deleteMessage. Max batch size is 10. + * @return The response from the deleteMessageBatch service method, as + * returned by SqsAsyncClient + * @throws JMSException + */ + @Override + public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws JMSException { + try { + return amazonSQSClient.deleteMessageBatch(prepareRequest(deleteMessageBatchRequest)).join(); + } catch (SdkException e) { + throw handleException(e, "deleteMessageBatch"); + } + } + + /** + * Calls sendMessage and wraps + * AmazonClientException. + * + * @param sendMessageRequest Container for the necessary parameters to execute the + * sendMessage service method on SqsAsyncClient. + * @return The response from the sendMessage service method, as returned by + * SqsAsyncClient + * @throws JMSException + */ + @Override + public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws JMSException { + try { + return amazonSQSClient.sendMessage(prepareRequest(sendMessageRequest)).join(); + } catch (SdkException e) { + throw handleException(e, "sendMessage"); + } + } + + /** + * Check if the requested queue exists. This function calls + * GetQueueUrl for the given queue name, returning true on + * success, false if it gets QueueDoesNotExistException. + * + * @param queueName the queue to check + * @return true if the queue exists, false if it doesn't. + * @throws JMSException + */ + @Override + public boolean queueExists(String queueName) throws JMSException { + try { + GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(queueName).build(); + amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest)); + return true; + } catch (QueueDoesNotExistException e) { + return false; + } catch (SdkException e) { + throw handleException(e, "getQueueUrl"); + } + } + + /** + * Check if the requested queue exists. This function calls + * GetQueueUrl for the given queue name with the given owner + * accountId, returning true on success, false if it gets + * QueueDoesNotExistException. + * + * @param queueName the queue to check + * @param queueOwnerAccountId The AWS accountId of the account that created the queue + * @return true if the queue exists, false if it doesn't. + * @throws JMSException + */ + @Override + public boolean queueExists(String queueName, String queueOwnerAccountId) throws JMSException { + try { + GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder() + .queueName(queueName) + .queueOwnerAWSAccountId(queueOwnerAccountId) + .build(); + amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest)); + return true; + } catch (QueueDoesNotExistException e) { + return false; + } catch (SdkException e) { + throw handleException(e, "getQueueUrl"); + } + } + + /** + * Gets the queueUrl of a queue given a queue name. + * + * @param queueName + * @return The response from the GetQueueUrl service method, as returned by + * SqsAsyncClient, which will include queue`s URL + * @throws JMSException + */ + @Override + public GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException { + GetQueueUrlRequest request = GetQueueUrlRequest.builder() + .queueName(queueName) + .build(); + return getQueueUrl(request); + } + + /** + * Gets the queueUrl of a queue given a queue name owned by the provided accountId. + * + * @param queueName + * @param queueOwnerAccountId The AWS accountId of the account that created the queue + * @return The response from the GetQueueUrl service method, as returned by + * SqsAsyncClient, which will include queue`s URL + * @throws JMSException + */ + @Override + public GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccountId) throws JMSException { + GetQueueUrlRequest request = GetQueueUrlRequest.builder() + .queueName(queueName) + .queueOwnerAWSAccountId(queueOwnerAccountId) + .build(); + return getQueueUrl(request); + } + + /** + * Calls getQueueUrl and wraps SdkException + * + * @param getQueueUrlRequest Container for the necessary parameters to execute the + * getQueueUrl service method on SqsAsyncClient. + * @return The response from the GetQueueUrl service method, as returned by + * SqsAsyncClient, which will include queue`s URL + * @throws JMSException + */ + @Override + public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws JMSException { + try { + return amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest)).join(); + } catch (SdkException e) { + throw handleException(e, "getQueueUrl"); + } + } + + /** + * Calls createQueue to create the queue with the default queue attributes, + * and wraps SdkException + * + * @param queueName + * @return The response from the createQueue service method, as returned by + * SqsAsyncClient. This call creates a new queue, or returns the URL of + * an existing one. + * @throws JMSException + */ + @Override + public CreateQueueResponse createQueue(String queueName) throws JMSException { + return createQueue(CreateQueueRequest.builder().queueName(queueName).build()); + } + + /** + * Calls createQueue to create the queue with the provided queue attributes + * if any, and wraps SdkException + * + * @param createQueueRequest Container for the necessary parameters to execute the + * createQueue service method on SqsAsyncClient. + * @return The response from the createQueue service method, as returned by + * SqsAsyncClient. This call creates a new queue, or returns the URL of + * an existing one. + * @throws JMSException + */ + @Override + public CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) throws JMSException { + try { + return amazonSQSClient.createQueue(prepareRequest(createQueueRequest)).join(); + } catch (SdkException e) { + throw handleException(e, "createQueue"); + } + } + + /** + * Calls receiveMessage and wraps SdkException. Used by + * {@link SQSMessageConsumerPrefetch} to receive up to minimum of + * (numberOfMessagesToPrefetch,10) messages from SQS queue into consumer + * prefetch buffers. + * + * @param receiveMessageRequest Container for the necessary parameters to execute the + * receiveMessage service method on SqsAsyncClient. + * @return The response from the ReceiveMessage service method, as returned + * by SqsAsyncClient. + * @throws JMSException + */ + @Override + public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws JMSException { + try { + return amazonSQSClient.receiveMessage(prepareRequest(receiveMessageRequest)).join(); + } catch (SdkException e) { + throw handleException(e, "receiveMessage"); + } + } + + /** + * Calls changeMessageVisibility and wraps SdkException. This is + * used to for negative acknowledge of a single message, so that messages can be received again without any delay. + * + * @param changeMessageVisibilityRequest Container for the necessary parameters to execute the + * changeMessageVisibility service method on SqsAsyncClient. + * @throws JMSException + */ + @Override + public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException { + try { + amazonSQSClient.changeMessageVisibility(prepareRequest(changeMessageVisibilityRequest)); + } catch (SdkException e) { + throw handleException(e, "changeMessageVisibility"); + } + } + + /** + * Calls changeMessageVisibilityBatch and wraps SdkException. This is + * used to for negative acknowledge of messages in batch, so that messages + * can be received again without any delay. + * + * @param changeMessageVisibilityBatchRequest Container for the necessary parameters to execute the + * changeMessageVisibilityBatch service method on SqsClient. + * @return The response from the changeMessageVisibilityBatch service + * method, as returned by SqsClient. + * @throws JMSException + */ + @Override + public ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch( + ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws JMSException { + try { + return amazonSQSClient.changeMessageVisibilityBatch(prepareRequest(changeMessageVisibilityBatchRequest)).join(); + } catch (SdkException e) { + throw handleException(e, "changeMessageVisibilityBatch"); + } + } + + /** + * Create generic error message for AwsServiceException. Message include + * Action, RequestId, HTTPStatusCode, and AmazonErrorCode. + */ + private String logAndGetAmazonServiceException(AwsServiceException ase, String action) { + String errorMessage = "AmazonServiceException: " + action + ". RequestId: " + ase.requestId() + + "\nHTTPStatusCode: " + ase.statusCode() + " AmazonErrorCode: " + errorCode(ase); + LOG.error(errorMessage, ase); + return errorMessage; + } + + /** + * Create generic error message for SdkException. Message include + * Action. + */ + private String logAndGetAmazonClientException(SdkException ace, String action) { + String errorMessage = "AmazonClientException: " + action + "."; + LOG.error(errorMessage, ace); + return errorMessage; + } + + private JMSException handleException(SdkException e, String operationName) { + JMSException jmsException; + if (e instanceof AwsServiceException se) { + if (e instanceof QueueDoesNotExistException) { + jmsException = new InvalidDestinationException( + logAndGetAmazonServiceException(se, operationName), errorCode(se)); + } else if (isJMSSecurityException(se)) { + jmsException = new JMSSecurityException( + logAndGetAmazonServiceException(se, operationName), errorCode(se)); + } else { + jmsException = new JMSException( + logAndGetAmazonServiceException(se, operationName), errorCode(se)); + } + + } else { + jmsException = new JMSException(logAndGetAmazonClientException(e, operationName)); + } + jmsException.initCause(e); + return jmsException; + } + + private static String errorCode(AwsServiceException e) { + return e.awsErrorDetails() != null && e.awsErrorDetails().errorCode() != null ? e.awsErrorDetails().errorCode() : ""; + } + + + private static boolean isJMSSecurityException(AwsServiceException e) { + return SECURITY_EXCEPTION_ERROR_CODES.contains(errorCode(e)); + } + + private T prepareRequest(T request) { + return credentialsProvider == null ? request : (T) request.toBuilder().overrideConfiguration( + AwsRequestOverrideConfiguration.builder().credentialsProvider(credentialsProvider).build()) + .build(); + } + +} diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClient.java new file mode 100644 index 0000000..7949441 --- /dev/null +++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClient.java @@ -0,0 +1,37 @@ +package com.amazon.sqs.javamessaging; + +import jakarta.jms.JMSException; +import software.amazon.awssdk.awscore.AwsClient; +import software.amazon.awssdk.services.sqs.model.*; + +public interface AmazonSQSMessagingClient { + + AwsClient getAmazonSQSClient(); + + void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSException; + + DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws JMSException; + + SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws JMSException; + + boolean queueExists(String queueName) throws JMSException; + + boolean queueExists(String queueName, String queueOwnerAccountId) throws JMSException; + + GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException; + + GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccountId) throws JMSException; + + GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws JMSException; + + CreateQueueResponse createQueue(String queueName) throws JMSException; + + CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) throws JMSException; + + ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws JMSException; + + void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException; + + ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch( + ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws JMSException; +} diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapper.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapper.java index 3c82b4b..254ba26 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapper.java +++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapper.java @@ -49,7 +49,7 @@ * AwsServiceException and SdkException into * JMSException/JMSSecurityException. */ -public class AmazonSQSMessagingClientWrapper { +public class AmazonSQSMessagingClientWrapper implements AmazonSQSMessagingClient { private static final Logger LOG = LoggerFactory.getLogger(AmazonSQSMessagingClientWrapper.class); /** @@ -90,6 +90,7 @@ public AmazonSQSMessagingClientWrapper(SqsClient amazonSQSClient, AwsCredentials * * @return amazonSQSClient */ + @Override public SqsClient getAmazonSQSClient() { return amazonSQSClient; } @@ -102,6 +103,7 @@ public SqsClient getAmazonSQSClient() { * deleteMessage service method on SqsClient. * @throws JMSException */ + @Override public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSException { try { amazonSQSClient.deleteMessage(prepareRequest(deleteMessageRequest)); @@ -123,6 +125,7 @@ public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSE * returned by SqsClient * @throws JMSException */ + @Override public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws JMSException { try { return amazonSQSClient.deleteMessageBatch(prepareRequest(deleteMessageBatchRequest)); @@ -141,6 +144,7 @@ public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest d * SqsClient * @throws JMSException */ + @Override public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws JMSException { try { return amazonSQSClient.sendMessage(prepareRequest(sendMessageRequest)); @@ -158,6 +162,7 @@ public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) th * @return true if the queue exists, false if it doesn't. * @throws JMSException */ + @Override public boolean queueExists(String queueName) throws JMSException { try { GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(queueName).build(); @@ -181,6 +186,7 @@ public boolean queueExists(String queueName) throws JMSException { * @return true if the queue exists, false if it doesn't. * @throws JMSException */ + @Override public boolean queueExists(String queueName, String queueOwnerAccountId) throws JMSException { try { GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder() @@ -204,6 +210,7 @@ public boolean queueExists(String queueName, String queueOwnerAccountId) throws * SqsClient, which will include queue`s URL * @throws JMSException */ + @Override public GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException { GetQueueUrlRequest request = GetQueueUrlRequest.builder() .queueName(queueName) @@ -220,6 +227,7 @@ public GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException { * SqsClient, which will include queue`s URL * @throws JMSException */ + @Override public GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccountId) throws JMSException { GetQueueUrlRequest request = GetQueueUrlRequest.builder() .queueName(queueName) @@ -237,6 +245,7 @@ public GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccoun * SqsClient, which will include queue`s URL * @throws JMSException */ + @Override public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws JMSException { try { return amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest)); @@ -255,6 +264,7 @@ public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) th * an existing one. * @throws JMSException */ + @Override public CreateQueueResponse createQueue(String queueName) throws JMSException { return createQueue(CreateQueueRequest.builder().queueName(queueName).build()); } @@ -270,6 +280,7 @@ public CreateQueueResponse createQueue(String queueName) throws JMSException { * an existing one. * @throws JMSException */ + @Override public CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) throws JMSException { try { return amazonSQSClient.createQueue(prepareRequest(createQueueRequest)); @@ -290,6 +301,7 @@ public CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) th * by SqsClient. * @throws JMSException */ + @Override public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws JMSException { try { return amazonSQSClient.receiveMessage(prepareRequest(receiveMessageRequest)); @@ -306,6 +318,7 @@ public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessag * changeMessageVisibility service method on SqsClient. * @throws JMSException */ + @Override public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException { try { amazonSQSClient.changeMessageVisibility(prepareRequest(changeMessageVisibilityRequest)); @@ -325,6 +338,7 @@ public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessage * method, as returned by SqsClient. * @throws JMSException */ + @Override public ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch( ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws JMSException { try { diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSAsyncConnectionFactory.java b/src/main/java/com/amazon/sqs/javamessaging/SQSAsyncConnectionFactory.java new file mode 100644 index 0000000..ab20537 --- /dev/null +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSAsyncConnectionFactory.java @@ -0,0 +1,110 @@ +package com.amazon.sqs.javamessaging; + +import jakarta.jms.JMSException; +import jakarta.jms.QueueConnection; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder; + +import java.util.function.Supplier; + +public class SQSAsyncConnectionFactory extends AbstractSQSConnectionFactory { + private final ProviderConfiguration providerConfiguration; + private final Supplier amazonSQSClientSupplier; + + /** + * Creates a SQSConnectionFactory that uses default ProviderConfiguration + * and SqsClientAsyncBuilder.standard() for creating SqsAsyncClient connections. + * Every SQSConnection will have its own copy of SqsAsyncClient. + */ + public SQSAsyncConnectionFactory() { + this(new ProviderConfiguration()); + } + + /** + * Creates a SQSConnectionFactory that uses SqsAsyncClientBuilder.standard() for creating SqsAsyncClient connections. + * Every SQSConnection will have its own copy of SqsAsyncClient. + */ + public SQSAsyncConnectionFactory(ProviderConfiguration providerConfiguration) { + this(providerConfiguration, SqsAsyncClient.create()); + } + + /** + * Creates a SQSConnectionFactory that uses the provided SqsAsyncClient connection. + * Every SQSConnection will use the same provided SqsAsyncClient. + */ + public SQSAsyncConnectionFactory(ProviderConfiguration providerConfiguration, final SqsAsyncClient client) { + if (providerConfiguration == null) { + throw new IllegalArgumentException("Provider configuration cannot be null"); + } + if (client == null) { + throw new IllegalArgumentException("AmazonSQS client cannot be null"); + } + this.providerConfiguration = providerConfiguration; + this.amazonSQSClientSupplier = () -> client; + } + + /** + * Creates a SQSConnectionFactory that uses the provided SqsClientBuilder for creating AmazonSQS client connections. + * Every SQSConnection will have its own copy of AmazonSQS client created through the provided builder. + */ + public SQSAsyncConnectionFactory(ProviderConfiguration providerConfiguration, final SqsAsyncClientBuilder clientBuilder) { + if (providerConfiguration == null) { + throw new IllegalArgumentException("Provider configuration cannot be null"); + } + if (clientBuilder == null) { + throw new IllegalArgumentException("AmazonSQS client builder cannot be null"); + } + this.providerConfiguration = providerConfiguration; + this.amazonSQSClientSupplier = clientBuilder::build; + } + + + @Override + public SQSConnection createConnection() throws JMSException { + try { + SqsAsyncClient amazonSQS = amazonSQSClientSupplier.get(); + return createConnection(amazonSQS, null); + } catch (RuntimeException e) { + throw (JMSException) new JMSException("Error creating SQS client: " + e.getMessage()).initCause(e); + } + } + + @Override + public SQSConnection createConnection(String awsAccessKeyId, String awsSecretKey) throws JMSException { + AwsBasicCredentials basicAWSCredentials = AwsBasicCredentials.create(awsAccessKeyId, awsSecretKey); + return createConnection(basicAWSCredentials); + } + + public SQSConnection createConnection(AwsCredentials awsCredentials) throws JMSException { + AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider.create(awsCredentials); + return createConnection(awsCredentialsProvider); + } + + public SQSConnection createConnection(AwsCredentialsProvider awsCredentialsProvider) throws JMSException { + try { + SqsAsyncClient amazonSQS = amazonSQSClientSupplier.get(); + return createConnection(amazonSQS, awsCredentialsProvider); + } catch (Exception e) { + throw (JMSException) new JMSException("Error creating SQS client: " + e.getMessage()).initCause(e); + } + } + + private SQSConnection createConnection(SqsAsyncClient amazonSQS, AwsCredentialsProvider awsCredentialsProvider) throws JMSException { + AmazonSQSAsyncMessagingClientWrapper amazonSQSClientJMSWrapper = new AmazonSQSAsyncMessagingClientWrapper(amazonSQS, awsCredentialsProvider); + return new SQSConnection(amazonSQSClientJMSWrapper, providerConfiguration.getNumberOfMessagesToPrefetch()); + } + + @Override + public QueueConnection createQueueConnection() throws JMSException { + return createConnection(); + } + + @Override + public QueueConnection createQueueConnection(String userName, String password) throws JMSException { + return createConnection(userName, password); + } +} diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java b/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java index da4dd14..ad6e380 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java @@ -31,7 +31,7 @@ import jakarta.jms.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.awscore.AwsClient; import java.util.Collections; import java.util.Set; @@ -84,7 +84,7 @@ public class SQSConnection implements Connection, QueueConnection { /** Used for interactions with connection state. */ private final Object stateLock = new Object(); - private final AmazonSQSMessagingClientWrapper amazonSQSClient; + private final AmazonSQSMessagingClient amazonSQSClient; /** * Configures the amount of messages that can be prefetched by a consumer. A @@ -106,7 +106,7 @@ public class SQSConnection implements Connection, QueueConnection { private final Set sessions = Collections.newSetFromMap(new ConcurrentHashMap<>()); - SQSConnection(AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper, int numberOfMessagesToPrefetch) { + SQSConnection(AmazonSQSMessagingClient amazonSQSClientJMSWrapper, int numberOfMessagesToPrefetch) { amazonSQSClient = amazonSQSClientJMSWrapper; this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch; @@ -116,9 +116,9 @@ public class SQSConnection implements Connection, QueueConnection { * Get the AmazonSQSClient used by this connection. This can be used to do administrative operations * that aren't included in the JMS specification, e.g. creating new queues. * - * @return the SqsClient used by this connection + * @return the AwsClient used by this connection */ - public SqsClient getAmazonSQSClient() { + public AwsClient getAmazonSQSClient() { return amazonSQSClient.getAmazonSQSClient(); } @@ -130,7 +130,7 @@ public SqsClient getAmazonSQSClient() { * * @return wrapped version of the AmazonSQSClient used by this connection */ - public AmazonSQSMessagingClientWrapper getWrappedAmazonSQSClient() { + public AmazonSQSMessagingClient getWrappedAmazonSQSClient() { return amazonSQSClient; } diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java b/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java index 99c3add..aaf58fb 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java @@ -14,12 +14,8 @@ */ package com.amazon.sqs.javamessaging; -import jakarta.jms.ConnectionFactory; -import jakarta.jms.JMSContext; import jakarta.jms.JMSException; -import jakarta.jms.JMSRuntimeException; import jakarta.jms.QueueConnection; -import jakarta.jms.QueueConnectionFactory; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -48,7 +44,7 @@ * methods. */ -public class SQSConnectionFactory implements ConnectionFactory, QueueConnectionFactory { +public class SQSConnectionFactory extends AbstractSQSConnectionFactory { private final ProviderConfiguration providerConfiguration; private final Supplier amazonSQSClientSupplier; @@ -116,26 +112,6 @@ public SQSConnection createConnection(String awsAccessKeyId, String awsSecretKey return createConnection(basicAWSCredentials); } - @Override - public JMSContext createContext() { - throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD); - } - - @Override - public JMSContext createContext(String userName, String password) { - throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD); - } - - @Override - public JMSContext createContext(String userName, String password, int sessionMode) { - throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD); - } - - @Override - public JMSContext createContext(int sessionMode) { - throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD); - } - public SQSConnection createConnection(AwsCredentials awsCredentials) throws JMSException { AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider.create(awsCredentials); return createConnection(awsCredentialsProvider); diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java index 7ea5258..f50b091 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java @@ -63,7 +63,7 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager { protected static final String ALL = "All"; - private final AmazonSQSMessagingClientWrapper amazonSQSClient; + private final AmazonSQSMessagingClient amazonSQSClient; private final String queueUrl; @@ -124,7 +124,7 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager { SQSMessageConsumerPrefetch(SQSSessionCallbackScheduler sqsSessionRunnable, Acknowledger acknowledger, NegativeAcknowledger negativeAcknowledger, SQSQueueDestination sqsDestination, - AmazonSQSMessagingClientWrapper amazonSQSClient, int numberOfMessagesToPrefetch) { + AmazonSQSMessagingClient amazonSQSClient, int numberOfMessagesToPrefetch) { this.amazonSQSClient = amazonSQSClient; this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch; diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java index b192099..52b85eb 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java @@ -86,11 +86,11 @@ public class SQSMessageProducer implements MessageProducer, QueueSender { */ final AtomicBoolean closed = new AtomicBoolean(false); - private final AmazonSQSMessagingClientWrapper amazonSQSClient; + private final AmazonSQSMessagingClient amazonSQSClient; private final SQSQueueDestination sqsDestination; private final SQSSession parentSQSSession; - SQSMessageProducer(AmazonSQSMessagingClientWrapper amazonSQSClient, SQSSession parentSQSSession, + SQSMessageProducer(AmazonSQSMessagingClient amazonSQSClient, SQSSession parentSQSSession, SQSQueueDestination destination) throws JMSException { this.sqsDestination = destination; this.amazonSQSClient = amazonSQSClient; diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java b/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java index ed81646..e80b628 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java @@ -128,7 +128,7 @@ public class SQSSession implements Session, QueueSession { */ private volatile boolean closing = false; - private final AmazonSQSMessagingClientWrapper amazonSQSClient; + private final AmazonSQSMessagingClient amazonSQSClient; private final SQSConnection parentSQSConnection; /** diff --git a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AcknowledgeMode.java b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AcknowledgeMode.java index e22e424..0ef7824 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AcknowledgeMode.java +++ b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AcknowledgeMode.java @@ -14,7 +14,7 @@ */ package com.amazon.sqs.javamessaging.acknowledge; -import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper; +import com.amazon.sqs.javamessaging.AmazonSQSMessagingClient; import com.amazon.sqs.javamessaging.SQSSession; /** @@ -60,7 +60,7 @@ public int getOriginalAcknowledgeMode() { * @param parentSQSSession * the associated session for the acknowledger */ - public Acknowledger createAcknowledger(AmazonSQSMessagingClientWrapper amazonSQSClient, SQSSession parentSQSSession) { + public Acknowledger createAcknowledger(AmazonSQSMessagingClient amazonSQSClient, SQSSession parentSQSSession) { return switch (this) { case ACK_AUTO -> new AutoAcknowledger(amazonSQSClient, parentSQSSession); case ACK_RANGE -> new RangedAcknowledger(amazonSQSClient, parentSQSSession); diff --git a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AutoAcknowledger.java b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AutoAcknowledger.java index f155a2a..9d8d997 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AutoAcknowledger.java +++ b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AutoAcknowledger.java @@ -14,7 +14,7 @@ */ package com.amazon.sqs.javamessaging.acknowledge; -import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper; +import com.amazon.sqs.javamessaging.AmazonSQSMessagingClient; import com.amazon.sqs.javamessaging.SQSSession; import com.amazon.sqs.javamessaging.message.SQSMessage; import jakarta.jms.JMSException; @@ -31,10 +31,10 @@ */ public class AutoAcknowledger implements Acknowledger { - private final AmazonSQSMessagingClientWrapper amazonSQSClient; + private final AmazonSQSMessagingClient amazonSQSClient; private final SQSSession session; - public AutoAcknowledger(AmazonSQSMessagingClientWrapper amazonSQSClient, SQSSession session) { + public AutoAcknowledger(AmazonSQSMessagingClient amazonSQSClient, SQSSession session) { this.amazonSQSClient = amazonSQSClient; this.session = session; } diff --git a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java index 4b4b581..82e6c5f 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java +++ b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java @@ -14,7 +14,7 @@ */ package com.amazon.sqs.javamessaging.acknowledge; -import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper; +import com.amazon.sqs.javamessaging.AmazonSQSMessagingClient; import com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.MessageManager; import com.amazon.sqs.javamessaging.SQSMessagingClientConstants; import com.amazon.sqs.javamessaging.message.SQSMessage; @@ -39,9 +39,9 @@ public class NegativeAcknowledger extends BulkSQSOperation { private static final int NACK_TIMEOUT = 0; - private final AmazonSQSMessagingClientWrapper amazonSQSClient; + private final AmazonSQSMessagingClient amazonSQSClient; - public NegativeAcknowledger(AmazonSQSMessagingClientWrapper amazonSQSClient) { + public NegativeAcknowledger(AmazonSQSMessagingClient amazonSQSClient) { this.amazonSQSClient = amazonSQSClient; } diff --git a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/RangedAcknowledger.java b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/RangedAcknowledger.java index 72e4b4f..5d855da 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/RangedAcknowledger.java +++ b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/RangedAcknowledger.java @@ -14,7 +14,7 @@ */ package com.amazon.sqs.javamessaging.acknowledge; -import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper; +import com.amazon.sqs.javamessaging.AmazonSQSMessagingClient; import com.amazon.sqs.javamessaging.SQSSession; import com.amazon.sqs.javamessaging.message.SQSMessage; import jakarta.jms.JMSException; @@ -41,13 +41,13 @@ public class RangedAcknowledger extends BulkSQSOperation implements Acknowledger { private static final Logger LOG = LoggerFactory.getLogger(RangedAcknowledger.class); - private final AmazonSQSMessagingClientWrapper amazonSQSClient; + private final AmazonSQSMessagingClient amazonSQSClient; private final SQSSession session; private final Queue unAckMessages; - public RangedAcknowledger(AmazonSQSMessagingClientWrapper amazonSQSClient, SQSSession session) { + public RangedAcknowledger(AmazonSQSMessagingClient amazonSQSClient, SQSSession session) { this.amazonSQSClient = amazonSQSClient; this.session = session; this.unAckMessages = new LinkedList<>(); diff --git a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/UnorderedAcknowledger.java b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/UnorderedAcknowledger.java index c77469c..026960f 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/UnorderedAcknowledger.java +++ b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/UnorderedAcknowledger.java @@ -14,7 +14,7 @@ */ package com.amazon.sqs.javamessaging.acknowledge; -import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper; +import com.amazon.sqs.javamessaging.AmazonSQSMessagingClient; import com.amazon.sqs.javamessaging.SQSSession; import com.amazon.sqs.javamessaging.message.SQSMessage; import jakarta.jms.JMSException; @@ -32,7 +32,7 @@ */ public class UnorderedAcknowledger implements Acknowledger { - private final AmazonSQSMessagingClientWrapper amazonSQSClient; + private final AmazonSQSMessagingClient amazonSQSClient; private final SQSSession session; @@ -40,7 +40,7 @@ public class UnorderedAcknowledger implements Acknowledger { // identifier private final Map unAckMessages; - public UnorderedAcknowledger (AmazonSQSMessagingClientWrapper amazonSQSClient, SQSSession session) { + public UnorderedAcknowledger (AmazonSQSMessagingClient amazonSQSClient, SQSSession session) { this.amazonSQSClient = amazonSQSClient; this.session = session; this.unAckMessages = new HashMap<>(); diff --git a/src/test/java/com/amazon/sqs/javamessaging/AcknowledgerCommon.java b/src/test/java/com/amazon/sqs/javamessaging/AcknowledgerCommon.java index a73f837..4eb58e5 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AcknowledgerCommon.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AcknowledgerCommon.java @@ -35,7 +35,7 @@ public class AcknowledgerCommon { protected String baseQueueUrl = "queueUrl"; protected Acknowledger acknowledger; - protected AmazonSQSMessagingClientWrapper amazonSQSClient; + protected AmazonSQSMessagingClient amazonSQSClient; protected List populatedMessages = new ArrayList<>(); /* diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapperTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapperTest.java index 69fd08f..8f1cae0 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapperTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapperTest.java @@ -49,7 +49,7 @@ public class AmazonSQSMessagingClientWrapperTest { private static final String OWNER_ACCOUNT_ID = "accountId"; private SqsClient amazonSQSClient; - private AmazonSQSMessagingClientWrapper wrapper; + private AmazonSQSMessagingClient wrapper; @BeforeEach public void setup() throws JMSException { diff --git a/src/test/java/com/amazon/sqs/javamessaging/AutoAcknowledgerTest.java b/src/test/java/com/amazon/sqs/javamessaging/AutoAcknowledgerTest.java index 05c6cb9..ad10e2c 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AutoAcknowledgerTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AutoAcknowledgerTest.java @@ -41,7 +41,7 @@ public class AutoAcknowledgerTest { private static final String RECEIPT_HANDLE = "ReceiptHandle"; private AutoAcknowledger acknowledger; - private AmazonSQSMessagingClientWrapper amazonSQSClient; + private AmazonSQSMessagingClient amazonSQSClient; private SQSSession session; @BeforeEach diff --git a/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java b/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java index 045e776..1d4511d 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java @@ -44,7 +44,7 @@ public class MessageListenerConcurrentOperationTest { private static final String QUEUE_NAME = "queueName"; private static final int NUMBER_OF_MESSAGES_TO_PREFETCH = 10; - private AmazonSQSMessagingClientWrapper amazonSQSClient; + private AmazonSQSMessagingClient amazonSQSClient; private SQSMessageConsumerPrefetch.MessageManager msgManager; private volatile SQSSession session; private volatile SQSConnection connection; diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java index c2a7b43..0f3ed5a 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java @@ -58,7 +58,7 @@ public void setup() throws JMSException { destination = new SQSQueueDestination(QUEUE_NAME, QUEUE_URL); int numberOfMessagesToPrefetch = 10; - AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper = mock(AmazonSQSMessagingClientWrapper.class); + AmazonSQSMessagingClient amazonSQSClientJMSWrapper = mock(AmazonSQSMessagingClientWrapper.class); sqsConnection = spy(new SQSConnection(amazonSQSClientJMSWrapper, numberOfMessagesToPrefetch)); session1 = mock(SQSSession.class); diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java index a6d079d..5f4c597 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java @@ -65,7 +65,7 @@ public class SQSMessageConsumerPrefetchFifoTest { private NegativeAcknowledger negativeAcknowledger; private SQSMessageConsumerPrefetch consumerPrefetch; - private AmazonSQSMessagingClientWrapper amazonSQSClient; + private AmazonSQSMessagingClient amazonSQSClient; /** * Test one full prefetch operation works as expected diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java index 4d52c70..aff5bcc 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java @@ -91,7 +91,7 @@ public class SQSMessageConsumerPrefetchTest { private ExponentialBackoffStrategy backoffStrategy; private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - private AmazonSQSMessagingClientWrapper amazonSQSClient; + private AmazonSQSMessagingClient amazonSQSClient; /** * Test one full prefetch operation works as expected diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerFifoTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerFifoTest.java index 3fa1b98..0dcb6b9 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerFifoTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerFifoTest.java @@ -62,7 +62,7 @@ public class SQSMessageProducerFifoTest { private SQSMessageProducer producer; private SQSQueueDestination destination; - private AmazonSQSMessagingClientWrapper amazonSQSClient; + private AmazonSQSMessagingClient amazonSQSClient; private Acknowledger acknowledger; @BeforeEach diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java index ace019a..d6bca02 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java @@ -72,7 +72,7 @@ public class SQSMessageProducerTest { private SQSMessageProducer producer; private SQSQueueDestination destination; private SQSSession sqsSession; - private AmazonSQSMessagingClientWrapper amazonSQSClient; + private AmazonSQSMessagingClient amazonSQSClient; private Acknowledger acknowledger; @BeforeEach diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java index 5dc4b51..cbac0a2 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java @@ -80,7 +80,7 @@ public class SQSSessionTest { private SQSMessageConsumer consumer2; private SQSMessageProducer producer1; private SQSMessageProducer producer2; - private AmazonSQSMessagingClientWrapper sqsClientJMSWrapper; + private AmazonSQSMessagingClient sqsClientJMSWrapper; @BeforeEach public void setup() throws JMSException {