diff --git a/src/main/java/com/amazon/sqs/javamessaging/AbstractSQSConnectionFactory.java b/src/main/java/com/amazon/sqs/javamessaging/AbstractSQSConnectionFactory.java
new file mode 100644
index 0000000..a25d783
--- /dev/null
+++ b/src/main/java/com/amazon/sqs/javamessaging/AbstractSQSConnectionFactory.java
@@ -0,0 +1,29 @@
+package com.amazon.sqs.javamessaging;
+
+import jakarta.jms.ConnectionFactory;
+import jakarta.jms.JMSContext;
+import jakarta.jms.JMSRuntimeException;
+import jakarta.jms.QueueConnectionFactory;
+
+public abstract class AbstractSQSConnectionFactory implements ConnectionFactory, QueueConnectionFactory {
+
+ @Override
+ public JMSContext createContext() {
+ throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
+ }
+
+ @Override
+ public JMSContext createContext(String userName, String password) {
+ throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
+ }
+
+ @Override
+ public JMSContext createContext(String userName, String password, int sessionMode) {
+ throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
+ }
+
+ @Override
+ public JMSContext createContext(int sessionMode) {
+ throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
+ }
+}
diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSAsyncMessagingClientWrapper.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSAsyncMessagingClientWrapper.java
new file mode 100644
index 0000000..4efbb1d
--- /dev/null
+++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSAsyncMessagingClientWrapper.java
@@ -0,0 +1,394 @@
+package com.amazon.sqs.javamessaging;
+
+import jakarta.jms.InvalidDestinationException;
+import jakarta.jms.JMSException;
+import jakarta.jms.JMSSecurityException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.awscore.AwsRequest;
+import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
+import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
+import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
+import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
+import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
+import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
+import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
+
+import java.util.Set;
+
+
+/**
+ * This is a JMS Wrapper of SqsAsyncClient
. This class changes all
+ * AwsServiceException
and SdkException
into
+ * JMSException/JMSSecurityException.
+ */
+public class AmazonSQSAsyncMessagingClientWrapper implements AmazonSQSMessagingClient {
+ private static final Logger LOG = LoggerFactory.getLogger(AmazonSQSAsyncMessagingClientWrapper.class);
+
+ /**
+ * List of exceptions that can classified as security. These exceptions
+ * are not thrown during connection-set-up rather after the service
+ * calls of the SqsAsyncClient
.
+ */
+ private static final Set SECURITY_EXCEPTION_ERROR_CODES = Set.of("MissingClientTokenId",
+ "InvalidClientTokenId", "MissingAuthenticationToken", "AccessDenied");
+
+ private final SqsAsyncClient amazonSQSClient;
+ private final AwsCredentialsProvider credentialsProvider;
+
+ /**
+ * @param amazonSQSClient The AWS SDK Client for SQS.
+ * @throws JMSException if the client is null
+ */
+ public AmazonSQSAsyncMessagingClientWrapper(SqsAsyncClient amazonSQSClient) throws JMSException {
+ this(amazonSQSClient, null);
+ }
+
+ /**
+ * @param amazonSQSClient The AWS SDK Client for SQS.
+ * @throws JMSException if the client is null
+ */
+ public AmazonSQSAsyncMessagingClientWrapper(SqsAsyncClient amazonSQSClient, AwsCredentialsProvider credentialsProvider) throws JMSException {
+ if (amazonSQSClient == null) {
+ throw new JMSException("Amazon SQS client cannot be null");
+ }
+ this.amazonSQSClient = amazonSQSClient;
+ this.credentialsProvider = credentialsProvider;
+ }
+
+ /**
+ * If one uses any other AWS SDK operations other than explicitly listed
+ * here, the exceptions thrown by those operations will not be wrapped as
+ * JMSException
.
+ *
+ * @return amazonSQSClient
+ */
+ @Override
+ public SqsAsyncClient getAmazonSQSClient() {
+ return amazonSQSClient;
+ }
+
+ /**
+ * Calls deleteMessage
and wraps SdkException
. This is used to
+ * acknowledge single messages, so that they can be deleted from SQS queue.
+ *
+ * @param deleteMessageRequest Container for the necessary parameters to execute the
+ * deleteMessage service method on SqsAsyncClient.
+ * @throws JMSException
+ */
+ @Override
+ public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSException {
+ try {
+ amazonSQSClient.deleteMessage(prepareRequest(deleteMessageRequest));
+ } catch (SdkException e) {
+ throw handleException(e, "deleteMessage");
+ }
+ }
+
+ /**
+ * Calls deleteMessageBatch
and wraps
+ * SdkException
. This is used to acknowledge multiple
+ * messages on client_acknowledge mode, so that they can be deleted from SQS
+ * queue.
+ *
+ * @param deleteMessageBatchRequest Container for the necessary parameters to execute the
+ * deleteMessageBatch service method on SqsAsyncClient. This is the
+ * batch version of deleteMessage. Max batch size is 10.
+ * @return The response from the deleteMessageBatch service method, as
+ * returned by SqsAsyncClient
+ * @throws JMSException
+ */
+ @Override
+ public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws JMSException {
+ try {
+ return amazonSQSClient.deleteMessageBatch(prepareRequest(deleteMessageBatchRequest)).join();
+ } catch (SdkException e) {
+ throw handleException(e, "deleteMessageBatch");
+ }
+ }
+
+ /**
+ * Calls sendMessage
and wraps
+ * AmazonClientException
.
+ *
+ * @param sendMessageRequest Container for the necessary parameters to execute the
+ * sendMessage service method on SqsAsyncClient.
+ * @return The response from the sendMessage service method, as returned by
+ * SqsAsyncClient
+ * @throws JMSException
+ */
+ @Override
+ public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws JMSException {
+ try {
+ return amazonSQSClient.sendMessage(prepareRequest(sendMessageRequest)).join();
+ } catch (SdkException e) {
+ throw handleException(e, "sendMessage");
+ }
+ }
+
+ /**
+ * Check if the requested queue exists. This function calls
+ * GetQueueUrl
for the given queue name, returning true on
+ * success, false if it gets QueueDoesNotExistException
.
+ *
+ * @param queueName the queue to check
+ * @return true if the queue exists, false if it doesn't.
+ * @throws JMSException
+ */
+ @Override
+ public boolean queueExists(String queueName) throws JMSException {
+ try {
+ GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(queueName).build();
+ amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest));
+ return true;
+ } catch (QueueDoesNotExistException e) {
+ return false;
+ } catch (SdkException e) {
+ throw handleException(e, "getQueueUrl");
+ }
+ }
+
+ /**
+ * Check if the requested queue exists. This function calls
+ * GetQueueUrl
for the given queue name with the given owner
+ * accountId, returning true on success, false if it gets
+ * QueueDoesNotExistException
.
+ *
+ * @param queueName the queue to check
+ * @param queueOwnerAccountId The AWS accountId of the account that created the queue
+ * @return true if the queue exists, false if it doesn't.
+ * @throws JMSException
+ */
+ @Override
+ public boolean queueExists(String queueName, String queueOwnerAccountId) throws JMSException {
+ try {
+ GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder()
+ .queueName(queueName)
+ .queueOwnerAWSAccountId(queueOwnerAccountId)
+ .build();
+ amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest));
+ return true;
+ } catch (QueueDoesNotExistException e) {
+ return false;
+ } catch (SdkException e) {
+ throw handleException(e, "getQueueUrl");
+ }
+ }
+
+ /**
+ * Gets the queueUrl of a queue given a queue name.
+ *
+ * @param queueName
+ * @return The response from the GetQueueUrl service method, as returned by
+ * SqsAsyncClient, which will include queue`s URL
+ * @throws JMSException
+ */
+ @Override
+ public GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException {
+ GetQueueUrlRequest request = GetQueueUrlRequest.builder()
+ .queueName(queueName)
+ .build();
+ return getQueueUrl(request);
+ }
+
+ /**
+ * Gets the queueUrl of a queue given a queue name owned by the provided accountId.
+ *
+ * @param queueName
+ * @param queueOwnerAccountId The AWS accountId of the account that created the queue
+ * @return The response from the GetQueueUrl service method, as returned by
+ * SqsAsyncClient, which will include queue`s URL
+ * @throws JMSException
+ */
+ @Override
+ public GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccountId) throws JMSException {
+ GetQueueUrlRequest request = GetQueueUrlRequest.builder()
+ .queueName(queueName)
+ .queueOwnerAWSAccountId(queueOwnerAccountId)
+ .build();
+ return getQueueUrl(request);
+ }
+
+ /**
+ * Calls getQueueUrl
and wraps SdkException
+ *
+ * @param getQueueUrlRequest Container for the necessary parameters to execute the
+ * getQueueUrl service method on SqsAsyncClient.
+ * @return The response from the GetQueueUrl service method, as returned by
+ * SqsAsyncClient, which will include queue`s URL
+ * @throws JMSException
+ */
+ @Override
+ public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws JMSException {
+ try {
+ return amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest)).join();
+ } catch (SdkException e) {
+ throw handleException(e, "getQueueUrl");
+ }
+ }
+
+ /**
+ * Calls createQueue
to create the queue with the default queue attributes,
+ * and wraps SdkException
+ *
+ * @param queueName
+ * @return The response from the createQueue service method, as returned by
+ * SqsAsyncClient. This call creates a new queue, or returns the URL of
+ * an existing one.
+ * @throws JMSException
+ */
+ @Override
+ public CreateQueueResponse createQueue(String queueName) throws JMSException {
+ return createQueue(CreateQueueRequest.builder().queueName(queueName).build());
+ }
+
+ /**
+ * Calls createQueue
to create the queue with the provided queue attributes
+ * if any, and wraps SdkException
+ *
+ * @param createQueueRequest Container for the necessary parameters to execute the
+ * createQueue service method on SqsAsyncClient.
+ * @return The response from the createQueue service method, as returned by
+ * SqsAsyncClient. This call creates a new queue, or returns the URL of
+ * an existing one.
+ * @throws JMSException
+ */
+ @Override
+ public CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) throws JMSException {
+ try {
+ return amazonSQSClient.createQueue(prepareRequest(createQueueRequest)).join();
+ } catch (SdkException e) {
+ throw handleException(e, "createQueue");
+ }
+ }
+
+ /**
+ * Calls receiveMessage
and wraps SdkException
. Used by
+ * {@link SQSMessageConsumerPrefetch} to receive up to minimum of
+ * (numberOfMessagesToPrefetch
,10) messages from SQS queue into consumer
+ * prefetch buffers.
+ *
+ * @param receiveMessageRequest Container for the necessary parameters to execute the
+ * receiveMessage service method on SqsAsyncClient.
+ * @return The response from the ReceiveMessage service method, as returned
+ * by SqsAsyncClient.
+ * @throws JMSException
+ */
+ @Override
+ public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws JMSException {
+ try {
+ return amazonSQSClient.receiveMessage(prepareRequest(receiveMessageRequest)).join();
+ } catch (SdkException e) {
+ throw handleException(e, "receiveMessage");
+ }
+ }
+
+ /**
+ * Calls changeMessageVisibility
and wraps SdkException
. This is
+ * used to for negative acknowledge of a single message, so that messages can be received again without any delay.
+ *
+ * @param changeMessageVisibilityRequest Container for the necessary parameters to execute the
+ * changeMessageVisibility service method on SqsAsyncClient.
+ * @throws JMSException
+ */
+ @Override
+ public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException {
+ try {
+ amazonSQSClient.changeMessageVisibility(prepareRequest(changeMessageVisibilityRequest));
+ } catch (SdkException e) {
+ throw handleException(e, "changeMessageVisibility");
+ }
+ }
+
+ /**
+ * Calls changeMessageVisibilityBatch
and wraps SdkException
. This is
+ * used to for negative acknowledge of messages in batch, so that messages
+ * can be received again without any delay.
+ *
+ * @param changeMessageVisibilityBatchRequest Container for the necessary parameters to execute the
+ * changeMessageVisibilityBatch service method on SqsClient.
+ * @return The response from the changeMessageVisibilityBatch service
+ * method, as returned by SqsClient.
+ * @throws JMSException
+ */
+ @Override
+ public ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch(
+ ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws JMSException {
+ try {
+ return amazonSQSClient.changeMessageVisibilityBatch(prepareRequest(changeMessageVisibilityBatchRequest)).join();
+ } catch (SdkException e) {
+ throw handleException(e, "changeMessageVisibilityBatch");
+ }
+ }
+
+ /**
+ * Create generic error message for AwsServiceException
. Message include
+ * Action, RequestId, HTTPStatusCode, and AmazonErrorCode.
+ */
+ private String logAndGetAmazonServiceException(AwsServiceException ase, String action) {
+ String errorMessage = "AmazonServiceException: " + action + ". RequestId: " + ase.requestId() +
+ "\nHTTPStatusCode: " + ase.statusCode() + " AmazonErrorCode: " + errorCode(ase);
+ LOG.error(errorMessage, ase);
+ return errorMessage;
+ }
+
+ /**
+ * Create generic error message for SdkException
. Message include
+ * Action.
+ */
+ private String logAndGetAmazonClientException(SdkException ace, String action) {
+ String errorMessage = "AmazonClientException: " + action + ".";
+ LOG.error(errorMessage, ace);
+ return errorMessage;
+ }
+
+ private JMSException handleException(SdkException e, String operationName) {
+ JMSException jmsException;
+ if (e instanceof AwsServiceException se) {
+ if (e instanceof QueueDoesNotExistException) {
+ jmsException = new InvalidDestinationException(
+ logAndGetAmazonServiceException(se, operationName), errorCode(se));
+ } else if (isJMSSecurityException(se)) {
+ jmsException = new JMSSecurityException(
+ logAndGetAmazonServiceException(se, operationName), errorCode(se));
+ } else {
+ jmsException = new JMSException(
+ logAndGetAmazonServiceException(se, operationName), errorCode(se));
+ }
+
+ } else {
+ jmsException = new JMSException(logAndGetAmazonClientException(e, operationName));
+ }
+ jmsException.initCause(e);
+ return jmsException;
+ }
+
+ private static String errorCode(AwsServiceException e) {
+ return e.awsErrorDetails() != null && e.awsErrorDetails().errorCode() != null ? e.awsErrorDetails().errorCode() : "";
+ }
+
+
+ private static boolean isJMSSecurityException(AwsServiceException e) {
+ return SECURITY_EXCEPTION_ERROR_CODES.contains(errorCode(e));
+ }
+
+ private T prepareRequest(T request) {
+ return credentialsProvider == null ? request : (T) request.toBuilder().overrideConfiguration(
+ AwsRequestOverrideConfiguration.builder().credentialsProvider(credentialsProvider).build())
+ .build();
+ }
+
+}
diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClient.java
new file mode 100644
index 0000000..7949441
--- /dev/null
+++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClient.java
@@ -0,0 +1,37 @@
+package com.amazon.sqs.javamessaging;
+
+import jakarta.jms.JMSException;
+import software.amazon.awssdk.awscore.AwsClient;
+import software.amazon.awssdk.services.sqs.model.*;
+
+public interface AmazonSQSMessagingClient {
+
+ AwsClient getAmazonSQSClient();
+
+ void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSException;
+
+ DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws JMSException;
+
+ SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws JMSException;
+
+ boolean queueExists(String queueName) throws JMSException;
+
+ boolean queueExists(String queueName, String queueOwnerAccountId) throws JMSException;
+
+ GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException;
+
+ GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccountId) throws JMSException;
+
+ GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws JMSException;
+
+ CreateQueueResponse createQueue(String queueName) throws JMSException;
+
+ CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) throws JMSException;
+
+ ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws JMSException;
+
+ void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException;
+
+ ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch(
+ ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws JMSException;
+}
diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapper.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapper.java
index 3c82b4b..254ba26 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapper.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapper.java
@@ -49,7 +49,7 @@
* AwsServiceException
and SdkException
into
* JMSException/JMSSecurityException.
*/
-public class AmazonSQSMessagingClientWrapper {
+public class AmazonSQSMessagingClientWrapper implements AmazonSQSMessagingClient {
private static final Logger LOG = LoggerFactory.getLogger(AmazonSQSMessagingClientWrapper.class);
/**
@@ -90,6 +90,7 @@ public AmazonSQSMessagingClientWrapper(SqsClient amazonSQSClient, AwsCredentials
*
* @return amazonSQSClient
*/
+ @Override
public SqsClient getAmazonSQSClient() {
return amazonSQSClient;
}
@@ -102,6 +103,7 @@ public SqsClient getAmazonSQSClient() {
* deleteMessage service method on SqsClient.
* @throws JMSException
*/
+ @Override
public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSException {
try {
amazonSQSClient.deleteMessage(prepareRequest(deleteMessageRequest));
@@ -123,6 +125,7 @@ public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSE
* returned by SqsClient
* @throws JMSException
*/
+ @Override
public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws JMSException {
try {
return amazonSQSClient.deleteMessageBatch(prepareRequest(deleteMessageBatchRequest));
@@ -141,6 +144,7 @@ public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest d
* SqsClient
* @throws JMSException
*/
+ @Override
public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) throws JMSException {
try {
return amazonSQSClient.sendMessage(prepareRequest(sendMessageRequest));
@@ -158,6 +162,7 @@ public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) th
* @return true if the queue exists, false if it doesn't.
* @throws JMSException
*/
+ @Override
public boolean queueExists(String queueName) throws JMSException {
try {
GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder().queueName(queueName).build();
@@ -181,6 +186,7 @@ public boolean queueExists(String queueName) throws JMSException {
* @return true if the queue exists, false if it doesn't.
* @throws JMSException
*/
+ @Override
public boolean queueExists(String queueName, String queueOwnerAccountId) throws JMSException {
try {
GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder()
@@ -204,6 +210,7 @@ public boolean queueExists(String queueName, String queueOwnerAccountId) throws
* SqsClient, which will include queue`s URL
* @throws JMSException
*/
+ @Override
public GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException {
GetQueueUrlRequest request = GetQueueUrlRequest.builder()
.queueName(queueName)
@@ -220,6 +227,7 @@ public GetQueueUrlResponse getQueueUrl(String queueName) throws JMSException {
* SqsClient, which will include queue`s URL
* @throws JMSException
*/
+ @Override
public GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccountId) throws JMSException {
GetQueueUrlRequest request = GetQueueUrlRequest.builder()
.queueName(queueName)
@@ -237,6 +245,7 @@ public GetQueueUrlResponse getQueueUrl(String queueName, String queueOwnerAccoun
* SqsClient, which will include queue`s URL
* @throws JMSException
*/
+ @Override
public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws JMSException {
try {
return amazonSQSClient.getQueueUrl(prepareRequest(getQueueUrlRequest));
@@ -255,6 +264,7 @@ public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) th
* an existing one.
* @throws JMSException
*/
+ @Override
public CreateQueueResponse createQueue(String queueName) throws JMSException {
return createQueue(CreateQueueRequest.builder().queueName(queueName).build());
}
@@ -270,6 +280,7 @@ public CreateQueueResponse createQueue(String queueName) throws JMSException {
* an existing one.
* @throws JMSException
*/
+ @Override
public CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) throws JMSException {
try {
return amazonSQSClient.createQueue(prepareRequest(createQueueRequest));
@@ -290,6 +301,7 @@ public CreateQueueResponse createQueue(CreateQueueRequest createQueueRequest) th
* by SqsClient.
* @throws JMSException
*/
+ @Override
public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws JMSException {
try {
return amazonSQSClient.receiveMessage(prepareRequest(receiveMessageRequest));
@@ -306,6 +318,7 @@ public ReceiveMessageResponse receiveMessage(ReceiveMessageRequest receiveMessag
* changeMessageVisibility service method on SqsClient.
* @throws JMSException
*/
+ @Override
public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException {
try {
amazonSQSClient.changeMessageVisibility(prepareRequest(changeMessageVisibilityRequest));
@@ -325,6 +338,7 @@ public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessage
* method, as returned by SqsClient.
* @throws JMSException
*/
+ @Override
public ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatch(
ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws JMSException {
try {
diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSAsyncConnectionFactory.java b/src/main/java/com/amazon/sqs/javamessaging/SQSAsyncConnectionFactory.java
new file mode 100644
index 0000000..ab20537
--- /dev/null
+++ b/src/main/java/com/amazon/sqs/javamessaging/SQSAsyncConnectionFactory.java
@@ -0,0 +1,110 @@
+package com.amazon.sqs.javamessaging;
+
+import jakarta.jms.JMSException;
+import jakarta.jms.QueueConnection;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
+
+import java.util.function.Supplier;
+
+public class SQSAsyncConnectionFactory extends AbstractSQSConnectionFactory {
+ private final ProviderConfiguration providerConfiguration;
+ private final Supplier amazonSQSClientSupplier;
+
+ /**
+ * Creates a SQSConnectionFactory that uses default ProviderConfiguration
+ * and SqsClientAsyncBuilder.standard() for creating SqsAsyncClient connections.
+ * Every SQSConnection will have its own copy of SqsAsyncClient.
+ */
+ public SQSAsyncConnectionFactory() {
+ this(new ProviderConfiguration());
+ }
+
+ /**
+ * Creates a SQSConnectionFactory that uses SqsAsyncClientBuilder.standard() for creating SqsAsyncClient connections.
+ * Every SQSConnection will have its own copy of SqsAsyncClient.
+ */
+ public SQSAsyncConnectionFactory(ProviderConfiguration providerConfiguration) {
+ this(providerConfiguration, SqsAsyncClient.create());
+ }
+
+ /**
+ * Creates a SQSConnectionFactory that uses the provided SqsAsyncClient connection.
+ * Every SQSConnection will use the same provided SqsAsyncClient.
+ */
+ public SQSAsyncConnectionFactory(ProviderConfiguration providerConfiguration, final SqsAsyncClient client) {
+ if (providerConfiguration == null) {
+ throw new IllegalArgumentException("Provider configuration cannot be null");
+ }
+ if (client == null) {
+ throw new IllegalArgumentException("AmazonSQS client cannot be null");
+ }
+ this.providerConfiguration = providerConfiguration;
+ this.amazonSQSClientSupplier = () -> client;
+ }
+
+ /**
+ * Creates a SQSConnectionFactory that uses the provided SqsClientBuilder for creating AmazonSQS client connections.
+ * Every SQSConnection will have its own copy of AmazonSQS client created through the provided builder.
+ */
+ public SQSAsyncConnectionFactory(ProviderConfiguration providerConfiguration, final SqsAsyncClientBuilder clientBuilder) {
+ if (providerConfiguration == null) {
+ throw new IllegalArgumentException("Provider configuration cannot be null");
+ }
+ if (clientBuilder == null) {
+ throw new IllegalArgumentException("AmazonSQS client builder cannot be null");
+ }
+ this.providerConfiguration = providerConfiguration;
+ this.amazonSQSClientSupplier = clientBuilder::build;
+ }
+
+
+ @Override
+ public SQSConnection createConnection() throws JMSException {
+ try {
+ SqsAsyncClient amazonSQS = amazonSQSClientSupplier.get();
+ return createConnection(amazonSQS, null);
+ } catch (RuntimeException e) {
+ throw (JMSException) new JMSException("Error creating SQS client: " + e.getMessage()).initCause(e);
+ }
+ }
+
+ @Override
+ public SQSConnection createConnection(String awsAccessKeyId, String awsSecretKey) throws JMSException {
+ AwsBasicCredentials basicAWSCredentials = AwsBasicCredentials.create(awsAccessKeyId, awsSecretKey);
+ return createConnection(basicAWSCredentials);
+ }
+
+ public SQSConnection createConnection(AwsCredentials awsCredentials) throws JMSException {
+ AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider.create(awsCredentials);
+ return createConnection(awsCredentialsProvider);
+ }
+
+ public SQSConnection createConnection(AwsCredentialsProvider awsCredentialsProvider) throws JMSException {
+ try {
+ SqsAsyncClient amazonSQS = amazonSQSClientSupplier.get();
+ return createConnection(amazonSQS, awsCredentialsProvider);
+ } catch (Exception e) {
+ throw (JMSException) new JMSException("Error creating SQS client: " + e.getMessage()).initCause(e);
+ }
+ }
+
+ private SQSConnection createConnection(SqsAsyncClient amazonSQS, AwsCredentialsProvider awsCredentialsProvider) throws JMSException {
+ AmazonSQSAsyncMessagingClientWrapper amazonSQSClientJMSWrapper = new AmazonSQSAsyncMessagingClientWrapper(amazonSQS, awsCredentialsProvider);
+ return new SQSConnection(amazonSQSClientJMSWrapper, providerConfiguration.getNumberOfMessagesToPrefetch());
+ }
+
+ @Override
+ public QueueConnection createQueueConnection() throws JMSException {
+ return createConnection();
+ }
+
+ @Override
+ public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
+ return createConnection(userName, password);
+ }
+}
diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java b/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java
index da4dd14..ad6e380 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java
@@ -31,7 +31,7 @@
import jakarta.jms.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.awscore.AwsClient;
import java.util.Collections;
import java.util.Set;
@@ -84,7 +84,7 @@ public class SQSConnection implements Connection, QueueConnection {
/** Used for interactions with connection state. */
private final Object stateLock = new Object();
- private final AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private final AmazonSQSMessagingClient amazonSQSClient;
/**
* Configures the amount of messages that can be prefetched by a consumer. A
@@ -106,7 +106,7 @@ public class SQSConnection implements Connection, QueueConnection {
private final Set sessions = Collections.newSetFromMap(new ConcurrentHashMap<>());
- SQSConnection(AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper, int numberOfMessagesToPrefetch) {
+ SQSConnection(AmazonSQSMessagingClient amazonSQSClientJMSWrapper, int numberOfMessagesToPrefetch) {
amazonSQSClient = amazonSQSClientJMSWrapper;
this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch;
@@ -116,9 +116,9 @@ public class SQSConnection implements Connection, QueueConnection {
* Get the AmazonSQSClient used by this connection. This can be used to do administrative operations
* that aren't included in the JMS specification, e.g. creating new queues.
*
- * @return the SqsClient used by this connection
+ * @return the AwsClient used by this connection
*/
- public SqsClient getAmazonSQSClient() {
+ public AwsClient getAmazonSQSClient() {
return amazonSQSClient.getAmazonSQSClient();
}
@@ -130,7 +130,7 @@ public SqsClient getAmazonSQSClient() {
*
* @return wrapped version of the AmazonSQSClient used by this connection
*/
- public AmazonSQSMessagingClientWrapper getWrappedAmazonSQSClient() {
+ public AmazonSQSMessagingClient getWrappedAmazonSQSClient() {
return amazonSQSClient;
}
diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java b/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java
index 99c3add..aaf58fb 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java
@@ -14,12 +14,8 @@
*/
package com.amazon.sqs.javamessaging;
-import jakarta.jms.ConnectionFactory;
-import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
-import jakarta.jms.JMSRuntimeException;
import jakarta.jms.QueueConnection;
-import jakarta.jms.QueueConnectionFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -48,7 +44,7 @@
* methods.
*/
-public class SQSConnectionFactory implements ConnectionFactory, QueueConnectionFactory {
+public class SQSConnectionFactory extends AbstractSQSConnectionFactory {
private final ProviderConfiguration providerConfiguration;
private final Supplier amazonSQSClientSupplier;
@@ -116,26 +112,6 @@ public SQSConnection createConnection(String awsAccessKeyId, String awsSecretKey
return createConnection(basicAWSCredentials);
}
- @Override
- public JMSContext createContext() {
- throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
- }
-
- @Override
- public JMSContext createContext(String userName, String password) {
- throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
- }
-
- @Override
- public JMSContext createContext(String userName, String password, int sessionMode) {
- throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
- }
-
- @Override
- public JMSContext createContext(int sessionMode) {
- throw new JMSRuntimeException(SQSMessagingClientConstants.UNSUPPORTED_METHOD);
- }
-
public SQSConnection createConnection(AwsCredentials awsCredentials) throws JMSException {
AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider.create(awsCredentials);
return createConnection(awsCredentialsProvider);
diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java
index 7ea5258..f50b091 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java
@@ -63,7 +63,7 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
protected static final String ALL = "All";
- private final AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private final AmazonSQSMessagingClient amazonSQSClient;
private final String queueUrl;
@@ -124,7 +124,7 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
SQSMessageConsumerPrefetch(SQSSessionCallbackScheduler sqsSessionRunnable, Acknowledger acknowledger,
NegativeAcknowledger negativeAcknowledger, SQSQueueDestination sqsDestination,
- AmazonSQSMessagingClientWrapper amazonSQSClient, int numberOfMessagesToPrefetch) {
+ AmazonSQSMessagingClient amazonSQSClient, int numberOfMessagesToPrefetch) {
this.amazonSQSClient = amazonSQSClient;
this.numberOfMessagesToPrefetch = numberOfMessagesToPrefetch;
diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java
index b192099..52b85eb 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java
@@ -86,11 +86,11 @@ public class SQSMessageProducer implements MessageProducer, QueueSender {
*/
final AtomicBoolean closed = new AtomicBoolean(false);
- private final AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private final AmazonSQSMessagingClient amazonSQSClient;
private final SQSQueueDestination sqsDestination;
private final SQSSession parentSQSSession;
- SQSMessageProducer(AmazonSQSMessagingClientWrapper amazonSQSClient, SQSSession parentSQSSession,
+ SQSMessageProducer(AmazonSQSMessagingClient amazonSQSClient, SQSSession parentSQSSession,
SQSQueueDestination destination) throws JMSException {
this.sqsDestination = destination;
this.amazonSQSClient = amazonSQSClient;
diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java b/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java
index ed81646..e80b628 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/SQSSession.java
@@ -128,7 +128,7 @@ public class SQSSession implements Session, QueueSession {
*/
private volatile boolean closing = false;
- private final AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private final AmazonSQSMessagingClient amazonSQSClient;
private final SQSConnection parentSQSConnection;
/**
diff --git a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AcknowledgeMode.java b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AcknowledgeMode.java
index e22e424..0ef7824 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AcknowledgeMode.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AcknowledgeMode.java
@@ -14,7 +14,7 @@
*/
package com.amazon.sqs.javamessaging.acknowledge;
-import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper;
+import com.amazon.sqs.javamessaging.AmazonSQSMessagingClient;
import com.amazon.sqs.javamessaging.SQSSession;
/**
@@ -60,7 +60,7 @@ public int getOriginalAcknowledgeMode() {
* @param parentSQSSession
* the associated session for the acknowledger
*/
- public Acknowledger createAcknowledger(AmazonSQSMessagingClientWrapper amazonSQSClient, SQSSession parentSQSSession) {
+ public Acknowledger createAcknowledger(AmazonSQSMessagingClient amazonSQSClient, SQSSession parentSQSSession) {
return switch (this) {
case ACK_AUTO -> new AutoAcknowledger(amazonSQSClient, parentSQSSession);
case ACK_RANGE -> new RangedAcknowledger(amazonSQSClient, parentSQSSession);
diff --git a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AutoAcknowledger.java b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AutoAcknowledger.java
index f155a2a..9d8d997 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AutoAcknowledger.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/AutoAcknowledger.java
@@ -14,7 +14,7 @@
*/
package com.amazon.sqs.javamessaging.acknowledge;
-import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper;
+import com.amazon.sqs.javamessaging.AmazonSQSMessagingClient;
import com.amazon.sqs.javamessaging.SQSSession;
import com.amazon.sqs.javamessaging.message.SQSMessage;
import jakarta.jms.JMSException;
@@ -31,10 +31,10 @@
*/
public class AutoAcknowledger implements Acknowledger {
- private final AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private final AmazonSQSMessagingClient amazonSQSClient;
private final SQSSession session;
- public AutoAcknowledger(AmazonSQSMessagingClientWrapper amazonSQSClient, SQSSession session) {
+ public AutoAcknowledger(AmazonSQSMessagingClient amazonSQSClient, SQSSession session) {
this.amazonSQSClient = amazonSQSClient;
this.session = session;
}
diff --git a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java
index 4b4b581..82e6c5f 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/NegativeAcknowledger.java
@@ -14,7 +14,7 @@
*/
package com.amazon.sqs.javamessaging.acknowledge;
-import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper;
+import com.amazon.sqs.javamessaging.AmazonSQSMessagingClient;
import com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.MessageManager;
import com.amazon.sqs.javamessaging.SQSMessagingClientConstants;
import com.amazon.sqs.javamessaging.message.SQSMessage;
@@ -39,9 +39,9 @@ public class NegativeAcknowledger extends BulkSQSOperation {
private static final int NACK_TIMEOUT = 0;
- private final AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private final AmazonSQSMessagingClient amazonSQSClient;
- public NegativeAcknowledger(AmazonSQSMessagingClientWrapper amazonSQSClient) {
+ public NegativeAcknowledger(AmazonSQSMessagingClient amazonSQSClient) {
this.amazonSQSClient = amazonSQSClient;
}
diff --git a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/RangedAcknowledger.java b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/RangedAcknowledger.java
index 72e4b4f..5d855da 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/RangedAcknowledger.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/RangedAcknowledger.java
@@ -14,7 +14,7 @@
*/
package com.amazon.sqs.javamessaging.acknowledge;
-import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper;
+import com.amazon.sqs.javamessaging.AmazonSQSMessagingClient;
import com.amazon.sqs.javamessaging.SQSSession;
import com.amazon.sqs.javamessaging.message.SQSMessage;
import jakarta.jms.JMSException;
@@ -41,13 +41,13 @@
public class RangedAcknowledger extends BulkSQSOperation implements Acknowledger {
private static final Logger LOG = LoggerFactory.getLogger(RangedAcknowledger.class);
- private final AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private final AmazonSQSMessagingClient amazonSQSClient;
private final SQSSession session;
private final Queue unAckMessages;
- public RangedAcknowledger(AmazonSQSMessagingClientWrapper amazonSQSClient, SQSSession session) {
+ public RangedAcknowledger(AmazonSQSMessagingClient amazonSQSClient, SQSSession session) {
this.amazonSQSClient = amazonSQSClient;
this.session = session;
this.unAckMessages = new LinkedList<>();
diff --git a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/UnorderedAcknowledger.java b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/UnorderedAcknowledger.java
index c77469c..026960f 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/acknowledge/UnorderedAcknowledger.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/acknowledge/UnorderedAcknowledger.java
@@ -14,7 +14,7 @@
*/
package com.amazon.sqs.javamessaging.acknowledge;
-import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper;
+import com.amazon.sqs.javamessaging.AmazonSQSMessagingClient;
import com.amazon.sqs.javamessaging.SQSSession;
import com.amazon.sqs.javamessaging.message.SQSMessage;
import jakarta.jms.JMSException;
@@ -32,7 +32,7 @@
*/
public class UnorderedAcknowledger implements Acknowledger {
- private final AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private final AmazonSQSMessagingClient amazonSQSClient;
private final SQSSession session;
@@ -40,7 +40,7 @@ public class UnorderedAcknowledger implements Acknowledger {
// identifier
private final Map unAckMessages;
- public UnorderedAcknowledger (AmazonSQSMessagingClientWrapper amazonSQSClient, SQSSession session) {
+ public UnorderedAcknowledger (AmazonSQSMessagingClient amazonSQSClient, SQSSession session) {
this.amazonSQSClient = amazonSQSClient;
this.session = session;
this.unAckMessages = new HashMap<>();
diff --git a/src/test/java/com/amazon/sqs/javamessaging/AcknowledgerCommon.java b/src/test/java/com/amazon/sqs/javamessaging/AcknowledgerCommon.java
index a73f837..4eb58e5 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/AcknowledgerCommon.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/AcknowledgerCommon.java
@@ -35,7 +35,7 @@ public class AcknowledgerCommon {
protected String baseQueueUrl = "queueUrl";
protected Acknowledger acknowledger;
- protected AmazonSQSMessagingClientWrapper amazonSQSClient;
+ protected AmazonSQSMessagingClient amazonSQSClient;
protected List populatedMessages = new ArrayList<>();
/*
diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapperTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapperTest.java
index 69fd08f..8f1cae0 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapperTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapperTest.java
@@ -49,7 +49,7 @@ public class AmazonSQSMessagingClientWrapperTest {
private static final String OWNER_ACCOUNT_ID = "accountId";
private SqsClient amazonSQSClient;
- private AmazonSQSMessagingClientWrapper wrapper;
+ private AmazonSQSMessagingClient wrapper;
@BeforeEach
public void setup() throws JMSException {
diff --git a/src/test/java/com/amazon/sqs/javamessaging/AutoAcknowledgerTest.java b/src/test/java/com/amazon/sqs/javamessaging/AutoAcknowledgerTest.java
index 05c6cb9..ad10e2c 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/AutoAcknowledgerTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/AutoAcknowledgerTest.java
@@ -41,7 +41,7 @@ public class AutoAcknowledgerTest {
private static final String RECEIPT_HANDLE = "ReceiptHandle";
private AutoAcknowledger acknowledger;
- private AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private AmazonSQSMessagingClient amazonSQSClient;
private SQSSession session;
@BeforeEach
diff --git a/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java b/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java
index 045e776..1d4511d 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/MessageListenerConcurrentOperationTest.java
@@ -44,7 +44,7 @@ public class MessageListenerConcurrentOperationTest {
private static final String QUEUE_NAME = "queueName";
private static final int NUMBER_OF_MESSAGES_TO_PREFETCH = 10;
- private AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private AmazonSQSMessagingClient amazonSQSClient;
private SQSMessageConsumerPrefetch.MessageManager msgManager;
private volatile SQSSession session;
private volatile SQSConnection connection;
diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java
index c2a7b43..0f3ed5a 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/SQSConnectionTest.java
@@ -58,7 +58,7 @@ public void setup() throws JMSException {
destination = new SQSQueueDestination(QUEUE_NAME, QUEUE_URL);
int numberOfMessagesToPrefetch = 10;
- AmazonSQSMessagingClientWrapper amazonSQSClientJMSWrapper = mock(AmazonSQSMessagingClientWrapper.class);
+ AmazonSQSMessagingClient amazonSQSClientJMSWrapper = mock(AmazonSQSMessagingClientWrapper.class);
sqsConnection = spy(new SQSConnection(amazonSQSClientJMSWrapper, numberOfMessagesToPrefetch));
session1 = mock(SQSSession.class);
diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java
index a6d079d..5f4c597 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchFifoTest.java
@@ -65,7 +65,7 @@ public class SQSMessageConsumerPrefetchFifoTest {
private NegativeAcknowledger negativeAcknowledger;
private SQSMessageConsumerPrefetch consumerPrefetch;
- private AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private AmazonSQSMessagingClient amazonSQSClient;
/**
* Test one full prefetch operation works as expected
diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java
index 4d52c70..aff5bcc 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java
@@ -91,7 +91,7 @@ public class SQSMessageConsumerPrefetchTest {
private ExponentialBackoffStrategy backoffStrategy;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- private AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private AmazonSQSMessagingClient amazonSQSClient;
/**
* Test one full prefetch operation works as expected
diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerFifoTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerFifoTest.java
index 3fa1b98..0dcb6b9 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerFifoTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerFifoTest.java
@@ -62,7 +62,7 @@ public class SQSMessageProducerFifoTest {
private SQSMessageProducer producer;
private SQSQueueDestination destination;
- private AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private AmazonSQSMessagingClient amazonSQSClient;
private Acknowledger acknowledger;
@BeforeEach
diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java
index ace019a..d6bca02 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java
@@ -72,7 +72,7 @@ public class SQSMessageProducerTest {
private SQSMessageProducer producer;
private SQSQueueDestination destination;
private SQSSession sqsSession;
- private AmazonSQSMessagingClientWrapper amazonSQSClient;
+ private AmazonSQSMessagingClient amazonSQSClient;
private Acknowledger acknowledger;
@BeforeEach
diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java
index 5dc4b51..cbac0a2 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/SQSSessionTest.java
@@ -80,7 +80,7 @@ public class SQSSessionTest {
private SQSMessageConsumer consumer2;
private SQSMessageProducer producer1;
private SQSMessageProducer producer2;
- private AmazonSQSMessagingClientWrapper sqsClientJMSWrapper;
+ private AmazonSQSMessagingClient sqsClientJMSWrapper;
@BeforeEach
public void setup() throws JMSException {