Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using Amazon.DynamoDBv2.Model;
using NUnit.Framework;
using Persistence.DynamoDB;

public class When_using_transactional_session : NServiceBusAcceptanceTest
{
Expand Down Expand Up @@ -117,6 +119,53 @@ public async Task Should_send_messages_and_store_document_in_dynamo_session_on_t
Assert.That(documents.Count, Is.EqualTo(1));
}

[TestCase(true)]
[TestCase(false)]
public async Task Should_send_messages_on_transactional_session_commit_even_when_dynamo_session_not_used(bool outboxEnabled)
{
var endpointIdentifier = Conventions.EndpointNamingConvention(typeof(AnEndpoint));
var messageId = Guid.NewGuid().ToString("N");

var context = await Scenario.Define<Context>()
.WithEndpoint<AnEndpoint>(s => s.When(async (_, ctx) =>
{
using var scope = ctx.ServiceProvider.CreateScope();
using var transactionalSession = scope.ServiceProvider.GetRequiredService<ITransactionalSession>();
await transactionalSession.Open(new DynamoOpenSessionOptions());

var sendOptions = new SendOptions();
sendOptions.SetMessageId(messageId);
sendOptions.RouteToThisEndpoint();

await transactionalSession.Send(new SampleMessage(), sendOptions, CancellationToken.None);

await transactionalSession.Commit(CancellationToken.None).ConfigureAwait(false);
}))
.Done(c => c.MessageReceived)
.Run();

var documents = await SetupFixture.DynamoDBClient.QueryAsync(new QueryRequest()
{
TableName = SetupFixture.TableConfiguration.TableName,
ConsistentRead = true,
KeyConditionExpression = "#pk = :pk",
ExpressionAttributeNames =
new Dictionary<string, string>()
{
{ "#pk", SetupFixture.TableConfiguration.PartitionKeyName }
},
ExpressionAttributeValues = new Dictionary<string, AttributeValue>()
{
{ ":pk", new AttributeValue(OutboxPersister.OutboxPartitionKey(endpointIdentifier, messageId)) }
}
});
Assert.Multiple(() =>
{
Assert.That(documents.Count, outboxEnabled ? Is.EqualTo(1) : Is.Zero);
Assert.That(context.MessageReceived, Is.True);
});
}

[TestCase(true)]
[TestCase(false)]
public async Task Should_not_send_messages_if_session_is_not_committed(bool outboxEnabled)
Expand Down
16 changes: 8 additions & 8 deletions src/NServiceBus.Persistence.DynamoDB/Outbox/OutboxPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public Task<IOutboxTransaction> BeginTransaction(ContextBag context,
},
ExpressionAttributeValues = new Dictionary<string, AttributeValue>(1)
{
{ ":outboxId", new AttributeValue { S = OutboxPartitionKey(messageId) } }
{ ":outboxId", new AttributeValue { S = OutboxPartitionKey(endpointIdentifier, messageId) } }
},
TableName = configuration.Table.TableName
};
Expand Down Expand Up @@ -185,7 +185,7 @@ IReadOnlyCollection<TransactWriteItem> Serialize(OutboxMessage outboxMessage, Co
{
{
configuration.Table.PartitionKeyName,
new AttributeValue { S = OutboxPartitionKey(outboxMessage.MessageId) }
new AttributeValue { S = OutboxPartitionKey(endpointIdentifier, outboxMessage.MessageId) }
},
{
configuration.Table.SortKeyName,
Expand Down Expand Up @@ -220,7 +220,7 @@ IReadOnlyCollection<TransactWriteItem> Serialize(OutboxMessage outboxMessage, Co
{
Item = new Dictionary<string, AttributeValue>(6)
{
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(outboxMessage.MessageId)}},
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(endpointIdentifier, outboxMessage.MessageId)}},
{configuration.Table.SortKeyName, new AttributeValue {S = OutboxOperationSortKey(outboxMessage.MessageId, n)}},
{MessageId, new AttributeValue {S = operation.MessageId}},
{
Expand Down Expand Up @@ -288,7 +288,7 @@ public async Task SetAsDispatched(string messageId, ContextBag context,
{
Key = new Dictionary<string, AttributeValue>(2)
{
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(messageId)}},
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(endpointIdentifier, messageId)}},
{configuration.Table.SortKeyName, new AttributeValue {S = OutboxMetadataSortKey(messageId)}}
},
UpdateExpression = "SET #dispatched = :dispatched, #dispatched_at = :dispatched_at, #ttl = :ttl",
Expand Down Expand Up @@ -323,7 +323,7 @@ public async Task SetAsDispatched(string messageId, ContextBag context,
{
Key = new Dictionary<string, AttributeValue>(2)
{
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(messageId)}},
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(endpointIdentifier, messageId)}},
{configuration.Table.SortKeyName, new AttributeValue {S = OutboxOperationSortKey(messageId, i)}}
}
}
Expand All @@ -341,9 +341,9 @@ await dynamoDbClient.BatchWriteItemWithRetries(writeRequestBatches, configuratio
.ConfigureAwait(false);
}

string OutboxPartitionKey(string messageId) => $"OUTBOX#{endpointIdentifier}#{messageId}";
string OutboxMetadataSortKey(string messageId) => $"OUTBOX#METADATA#{messageId}";
string OutboxOperationSortKey(string messageId, int messageNumber) => $"OUTBOX#OPERATION#{messageId}#{messageNumber:D4}";
internal static string OutboxPartitionKey(string endpointIdentifier, string messageId) => $"OUTBOX#{endpointIdentifier}#{messageId}";
static string OutboxMetadataSortKey(string messageId) => $"OUTBOX#METADATA#{messageId}";
static string OutboxOperationSortKey(string messageId, int messageNumber) => $"OUTBOX#OPERATION#{messageId}#{messageNumber:D4}";

readonly IAmazonDynamoDB dynamoDbClient;
readonly OutboxPersistenceConfiguration configuration;
Expand Down
Loading