Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 28 additions & 29 deletions src/CommandLine/AmazonServiceExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,39 @@
namespace NServiceBus.Transport.SQS.CommandLine
{
using System;
using System.Net;
using System.Threading.Tasks;
using Amazon.Runtime;
namespace NServiceBus.Transport.SQS.CommandLine;

using System;
using System.Net;
using System.Threading.Tasks;
using Amazon.Runtime;

static class AmazonServiceExtensions
static class AmazonServiceExtensions
{
public static async Task<T> RetryConflictsAsync<T>(this IAmazonService client, Func<Task<T>> a, Func<int, Task> onRetry)
{
public static async Task<T> RetryConflictsAsync<T>(this IAmazonService client, Func<Task<T>> a, Func<int, Task> onRetry)
{
_ = client;
_ = client;

var tryCount = 0;
var sleepTimeMs = 2000;
const int maxTryCount = 5;
var tryCount = 0;
var sleepTimeMs = 2000;
const int maxTryCount = 5;

while (true)
while (true)
{
try
{
try
tryCount++;
return await a().ConfigureAwait(false);
}
catch (AmazonServiceException ex)
when (ex.StatusCode == HttpStatusCode.Conflict &&
ex.ErrorCode == "OperationAborted")
{
if (tryCount >= maxTryCount)
{
tryCount++;
return await a().ConfigureAwait(false);
throw;
}
catch (AmazonServiceException ex)
when (ex.StatusCode == HttpStatusCode.Conflict &&
ex.ErrorCode == "OperationAborted")
{
if (tryCount >= maxTryCount)
{
throw;
}

var sleepTime = sleepTimeMs * tryCount;
await onRetry(sleepTime);
await Task.Delay(sleepTime).ConfigureAwait(false);
}
var sleepTime = sleepTimeMs * tryCount;
await onRetry(sleepTime);
await Task.Delay(sleepTime).ConfigureAwait(false);
}
}
}
Expand Down
183 changes: 91 additions & 92 deletions src/CommandLine/Bucket.cs
Original file line number Diff line number Diff line change
@@ -1,117 +1,116 @@
namespace NServiceBus.Transport.SQS.CommandLine
{
using System;
using System.Linq;
using System.Threading.Tasks;
using Amazon.S3;
using Amazon.S3.Model;
using Amazon.S3.Util;
namespace NServiceBus.Transport.SQS.CommandLine;

using System;
using System.Linq;
using System.Threading.Tasks;
using Amazon.S3;
using Amazon.S3.Model;
using Amazon.S3.Util;

static class Bucket
static class Bucket
{
public static async Task Create(IAmazonS3 s3, string endpointName, string bucketName)
{
public static async Task Create(IAmazonS3 s3, string endpointName, string bucketName)
{
await Console.Out.WriteLineAsync($"Creating bucket with name '{bucketName}' for endpoint '{endpointName}'.");
await Console.Out.WriteLineAsync($"Creating bucket with name '{bucketName}' for endpoint '{endpointName}'.");

if (!await AmazonS3Util.DoesS3BucketExistV2Async(s3, bucketName))
{
await s3.RetryConflictsAsync(async () =>
await s3.PutBucketAsync(new PutBucketRequest { BucketName = bucketName }).ConfigureAwait(false),
onRetry: async x => { await Console.Out.WriteLineAsync($"Conflict when creating S3 bucket, retrying after {x}ms."); }).ConfigureAwait(false);
if (!await AmazonS3Util.DoesS3BucketExistV2Async(s3, bucketName))
{
await s3.RetryConflictsAsync(async () =>
await s3.PutBucketAsync(new PutBucketRequest { BucketName = bucketName }).ConfigureAwait(false),
onRetry: async x => { await Console.Out.WriteLineAsync($"Conflict when creating S3 bucket, retrying after {x}ms."); }).ConfigureAwait(false);

await Console.Out.WriteLineAsync($"Created bucket with name '{bucketName}' for endpoint '{endpointName}'.");
}
else
{
await Console.Out.WriteLineAsync($"Bucket with name '{bucketName}' already exists.");
}
await Console.Out.WriteLineAsync($"Created bucket with name '{bucketName}' for endpoint '{endpointName}'.");
}

public static async Task EnableCleanup(IAmazonS3 s3, string endpointName, string bucketName, string keyPrefix, int expirationInDays)
else
{
await Console.Out.WriteLineAsync($"Adding lifecycle configuration to bucket name '{bucketName}' for endpoint '{endpointName}'.");
await Console.Out.WriteLineAsync($"Bucket with name '{bucketName}' already exists.");
}
}

var lifecycleConfig = await s3.GetLifecycleConfigurationAsync(bucketName).ConfigureAwait(false);
var setLifecycleConfig = lifecycleConfig.Configuration.Rules.All(x => x.Id != "NServiceBus.SQS.DeleteMessageBodies");
public static async Task EnableCleanup(IAmazonS3 s3, string endpointName, string bucketName, string keyPrefix, int expirationInDays)
{
await Console.Out.WriteLineAsync($"Adding lifecycle configuration to bucket name '{bucketName}' for endpoint '{endpointName}'.");

if (setLifecycleConfig)
{
await s3.RetryConflictsAsync(async () =>
await s3.PutLifecycleConfigurationAsync(new PutLifecycleConfigurationRequest
var lifecycleConfig = await s3.GetLifecycleConfigurationAsync(bucketName).ConfigureAwait(false);
var setLifecycleConfig = lifecycleConfig.Configuration.Rules.All(x => x.Id != "NServiceBus.SQS.DeleteMessageBodies");

if (setLifecycleConfig)
{
await s3.RetryConflictsAsync(async () =>
await s3.PutLifecycleConfigurationAsync(new PutLifecycleConfigurationRequest
{
BucketName = bucketName,
Configuration = new LifecycleConfiguration
{
BucketName = bucketName,
Configuration = new LifecycleConfiguration
{
Rules =
[
new LifecycleRule
Rules =
[
new LifecycleRule
{
Id = "NServiceBus.SQS.DeleteMessageBodies",
Filter = new LifecycleFilter
{
Id = "NServiceBus.SQS.DeleteMessageBodies",
Filter = new LifecycleFilter
LifecycleFilterPredicate = new LifecyclePrefixPredicate
{
LifecycleFilterPredicate = new LifecyclePrefixPredicate
{
Prefix = keyPrefix
}
},
Status = LifecycleRuleStatus.Enabled,
Expiration = new LifecycleRuleExpiration
{
Days = expirationInDays
Prefix = keyPrefix
}
},
Status = LifecycleRuleStatus.Enabled,
Expiration = new LifecycleRuleExpiration
{
Days = expirationInDays
}
]
}
}).ConfigureAwait(false),
onRetry: async x => { await Console.Out.WriteLineAsync($"Conflict when setting S3 lifecycle configuration, retrying after {x}ms."); }).ConfigureAwait(false);
}
]
}
}).ConfigureAwait(false),
onRetry: async x => { await Console.Out.WriteLineAsync($"Conflict when setting S3 lifecycle configuration, retrying after {x}ms."); }).ConfigureAwait(false);

await Console.Out.WriteLineAsync($"Added lifecycle configuration to bucket name '{bucketName}' for endpoint '{endpointName}'.");
}
else
{
await Console.Out.WriteLineAsync($"Lifecycle configuration already configured for bucket name '{bucketName}' for endpoint '{endpointName}'.");
}
await Console.Out.WriteLineAsync($"Added lifecycle configuration to bucket name '{bucketName}' for endpoint '{endpointName}'.");
}

public static async Task Delete(IAmazonS3 s3, string endpointName, string bucketName)
else
{
await Console.Out.WriteLineAsync($"Delete bucket with name '{bucketName}' for endpoint '{endpointName}'.");
await Console.Out.WriteLineAsync($"Lifecycle configuration already configured for bucket name '{bucketName}' for endpoint '{endpointName}'.");
}
}

if (await AmazonS3Util.DoesS3BucketExistV2Async(s3, bucketName))
public static async Task Delete(IAmazonS3 s3, string endpointName, string bucketName)
{
await Console.Out.WriteLineAsync($"Delete bucket with name '{bucketName}' for endpoint '{endpointName}'.");

if (await AmazonS3Util.DoesS3BucketExistV2Async(s3, bucketName))
{
var response = await s3.GetBucketLocationAsync(bucketName);
S3Region region;
switch (response.Location)
{
var response = await s3.GetBucketLocationAsync(bucketName);
S3Region region;
switch (response.Location)
{
case "":
{
region = new S3Region("us-east-1");
break;
}
case "EU":
{
region = S3Region.EUWest1;
break;
}
default:
region = response.Location;
case "":
{
region = new S3Region("us-east-1");
break;
}
case "EU":
{
region = S3Region.EUWest1;
break;
}
}
default:
region = response.Location;
break;
}

var deleteRequest = new DeleteBucketRequest
{
BucketName = bucketName,
BucketRegion = region,
};
var deleteRequest = new DeleteBucketRequest
{
BucketName = bucketName,
BucketRegion = region,
};

await s3.DeleteBucketAsync(deleteRequest).ConfigureAwait(false);
await s3.DeleteBucketAsync(deleteRequest).ConfigureAwait(false);

await Console.Out.WriteLineAsync($"Delete bucket with name '{bucketName}' for endpoint '{endpointName}'.");
}
else
{
await Console.Out.WriteLineAsync($"Bucket with name '{bucketName}' does not exist.");
}
await Console.Out.WriteLineAsync($"Delete bucket with name '{bucketName}' for endpoint '{endpointName}'.");
}
else
{
await Console.Out.WriteLineAsync($"Bucket with name '{bucketName}' does not exist.");
}
}
}
65 changes: 32 additions & 33 deletions src/CommandLine/CommandRunner.cs
Original file line number Diff line number Diff line change
@@ -1,39 +1,38 @@
namespace NServiceBus.Transport.SQS.CommandLine
{
using System;
using System.Threading.Tasks;
using Amazon;
using Amazon.S3;
using Amazon.SimpleNotificationService;
using Amazon.SQS;
using McMaster.Extensions.CommandLineUtils;
namespace NServiceBus.Transport.SQS.CommandLine;

static class CommandRunner
{
public static async Task Run(CommandOption accessKey, CommandOption secret, CommandOption region, Func<IAmazonSQS, IAmazonSimpleNotificationService, IAmazonS3, Task> func)
{
var useCredentialsFromOptions = accessKey.HasValue() && secret.HasValue();
var useRegionFromOptions = region.HasValue();
using System;
using System.Threading.Tasks;
using Amazon;
using Amazon.S3;
using Amazon.SimpleNotificationService;
using Amazon.SQS;
using McMaster.Extensions.CommandLineUtils;

var useFullConstructor = useCredentialsFromOptions && useRegionFromOptions;
static class CommandRunner
{
public static async Task Run(CommandOption accessKey, CommandOption secret, CommandOption region, Func<IAmazonSQS, IAmazonSimpleNotificationService, IAmazonS3, Task> func)
{
var useCredentialsFromOptions = accessKey.HasValue() && secret.HasValue();
var useRegionFromOptions = region.HasValue();

using var sqs = useFullConstructor ? new AmazonSQSClient(accessKey.Value(), secret.Value(), new AmazonSQSConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
useCredentialsFromOptions ? new AmazonSQSClient(accessKey.Value(), secret.Value(), new AmazonSQSConfig()) :
useRegionFromOptions ? new AmazonSQSClient(new AmazonSQSConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
new AmazonSQSClient(new AmazonSQSConfig());
using var sns = useFullConstructor ? new AmazonSimpleNotificationServiceClient(accessKey.Value(), secret.Value(), new AmazonSimpleNotificationServiceConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
useCredentialsFromOptions ? new AmazonSimpleNotificationServiceClient(accessKey.Value(), secret.Value(), new AmazonSimpleNotificationServiceConfig()) :
useRegionFromOptions ? new AmazonSimpleNotificationServiceClient(new AmazonSimpleNotificationServiceConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
new AmazonSimpleNotificationServiceClient(new AmazonSimpleNotificationServiceConfig());
using var s3 = useFullConstructor ? new AmazonS3Client(accessKey.Value(), secret.Value(), new AmazonS3Config { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
useCredentialsFromOptions ? new AmazonS3Client(accessKey.Value(), secret.Value(), new AmazonS3Config()) :
useRegionFromOptions ? new AmazonS3Client(new AmazonS3Config { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
new AmazonS3Client(new AmazonS3Config());
await func(sqs, sns, s3);
}
var useFullConstructor = useCredentialsFromOptions && useRegionFromOptions;

public const string AccessKeyId = "AWS_ACCESS_KEY_ID";
public const string Region = "AWS_REGION";
public const string SecretAccessKey = "AWS_SECRET_ACCESS_KEY";
using var sqs = useFullConstructor ? new AmazonSQSClient(accessKey.Value(), secret.Value(), new AmazonSQSConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
useCredentialsFromOptions ? new AmazonSQSClient(accessKey.Value(), secret.Value(), new AmazonSQSConfig()) :
useRegionFromOptions ? new AmazonSQSClient(new AmazonSQSConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
new AmazonSQSClient(new AmazonSQSConfig());
using var sns = useFullConstructor ? new AmazonSimpleNotificationServiceClient(accessKey.Value(), secret.Value(), new AmazonSimpleNotificationServiceConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
useCredentialsFromOptions ? new AmazonSimpleNotificationServiceClient(accessKey.Value(), secret.Value(), new AmazonSimpleNotificationServiceConfig()) :
useRegionFromOptions ? new AmazonSimpleNotificationServiceClient(new AmazonSimpleNotificationServiceConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
new AmazonSimpleNotificationServiceClient(new AmazonSimpleNotificationServiceConfig());
using var s3 = useFullConstructor ? new AmazonS3Client(accessKey.Value(), secret.Value(), new AmazonS3Config { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
useCredentialsFromOptions ? new AmazonS3Client(accessKey.Value(), secret.Value(), new AmazonS3Config()) :
useRegionFromOptions ? new AmazonS3Client(new AmazonS3Config { RegionEndpoint = RegionEndpoint.GetBySystemName(region.Value()) }) :
new AmazonS3Client(new AmazonS3Config());
await func(sqs, sns, s3);
}

public const string AccessKeyId = "AWS_ACCESS_KEY_ID";
public const string Region = "AWS_REGION";
public const string SecretAccessKey = "AWS_SECRET_ACCESS_KEY";
}
33 changes: 16 additions & 17 deletions src/CommandLine/DefaultConfigurationValues.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
namespace NServiceBus.Transport.SQS.CommandLine
{
using System;
namespace NServiceBus.Transport.SQS.CommandLine;

using System;

public static class DefaultConfigurationValues
{
public static readonly TimeSpan RetentionPeriod = TimeSpan.FromDays(4);
public static readonly TimeSpan MaximumQueueDelayTime = TimeSpan.FromMinutes(15);
public static readonly string S3KeyPrefix = string.Empty;
public static readonly string DelayedDeliveryQueueSuffix = "-delay.fifo";
public static class DefaultConfigurationValues
{
public static readonly TimeSpan RetentionPeriod = TimeSpan.FromDays(4);
public static readonly TimeSpan MaximumQueueDelayTime = TimeSpan.FromMinutes(15);
public static readonly string S3KeyPrefix = string.Empty;
public static readonly string DelayedDeliveryQueueSuffix = "-delay.fifo";

public static readonly string QueueNamePrefix = string.Empty;
public static readonly string TopicNamePrefix = string.Empty;
public static readonly string QueueNamePrefix = string.Empty;
public static readonly string TopicNamePrefix = string.Empty;

/* TODO: look into these, why are they different from the above?
public static readonly int AwsMaximumQueueDelayTime = (int)TimeSpan.FromMinutes(15).TotalSeconds;
public static readonly TimeSpan DelayedDeliveryQueueMessageRetentionPeriod = TimeSpan.FromDays(4);
public static readonly int DelayedDeliveryQueueDelayTime = Convert.ToInt32(Math.Ceiling(MaximumQueueDelayTime.TotalSeconds));
*/
}
/* TODO: look into these, why are they different from the above?
public static readonly int AwsMaximumQueueDelayTime = (int)TimeSpan.FromMinutes(15).TotalSeconds;
public static readonly TimeSpan DelayedDeliveryQueueMessageRetentionPeriod = TimeSpan.FromDays(4);
public static readonly int DelayedDeliveryQueueDelayTime = Convert.ToInt32(Math.Ceiling(MaximumQueueDelayTime.TotalSeconds));
*/
}
Loading