Skip to content

Commit 856f877

Browse files
Add additional test coverage to verify sends work even when dynamosession not used (#925)
* Aditional test coverage * Assert multiple * Use partition key method to retrieve outbox records --------- Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com>
1 parent 2d40525 commit 856f877

File tree

2 files changed

+57
-8
lines changed

2 files changed

+57
-8
lines changed

src/NServiceBus.Persistence.DynamoDB.TransactionalSession.AcceptanceTests/When_using_transactional_session.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
using System.Threading.Tasks;
77
using Microsoft.Extensions.DependencyInjection;
88
using AcceptanceTesting;
9+
using AcceptanceTesting.Customization;
910
using Amazon.DynamoDBv2.Model;
1011
using NUnit.Framework;
12+
using Persistence.DynamoDB;
1113

1214
public class When_using_transactional_session : NServiceBusAcceptanceTest
1315
{
@@ -117,6 +119,53 @@ public async Task Should_send_messages_and_store_document_in_dynamo_session_on_t
117119
Assert.That(documents.Count, Is.EqualTo(1));
118120
}
119121

122+
[TestCase(true)]
123+
[TestCase(false)]
124+
public async Task Should_send_messages_on_transactional_session_commit_even_when_dynamo_session_not_used(bool outboxEnabled)
125+
{
126+
var endpointIdentifier = Conventions.EndpointNamingConvention(typeof(AnEndpoint));
127+
var messageId = Guid.NewGuid().ToString("N");
128+
129+
var context = await Scenario.Define<Context>()
130+
.WithEndpoint<AnEndpoint>(s => s.When(async (_, ctx) =>
131+
{
132+
using var scope = ctx.ServiceProvider.CreateScope();
133+
using var transactionalSession = scope.ServiceProvider.GetRequiredService<ITransactionalSession>();
134+
await transactionalSession.Open(new DynamoOpenSessionOptions());
135+
136+
var sendOptions = new SendOptions();
137+
sendOptions.SetMessageId(messageId);
138+
sendOptions.RouteToThisEndpoint();
139+
140+
await transactionalSession.Send(new SampleMessage(), sendOptions, CancellationToken.None);
141+
142+
await transactionalSession.Commit(CancellationToken.None).ConfigureAwait(false);
143+
}))
144+
.Done(c => c.MessageReceived)
145+
.Run();
146+
147+
var documents = await SetupFixture.DynamoDBClient.QueryAsync(new QueryRequest()
148+
{
149+
TableName = SetupFixture.TableConfiguration.TableName,
150+
ConsistentRead = true,
151+
KeyConditionExpression = "#pk = :pk",
152+
ExpressionAttributeNames =
153+
new Dictionary<string, string>()
154+
{
155+
{ "#pk", SetupFixture.TableConfiguration.PartitionKeyName }
156+
},
157+
ExpressionAttributeValues = new Dictionary<string, AttributeValue>()
158+
{
159+
{ ":pk", new AttributeValue(OutboxPersister.OutboxPartitionKey(endpointIdentifier, messageId)) }
160+
}
161+
});
162+
Assert.Multiple(() =>
163+
{
164+
Assert.That(documents.Count, outboxEnabled ? Is.EqualTo(1) : Is.Zero);
165+
Assert.That(context.MessageReceived, Is.True);
166+
});
167+
}
168+
120169
[TestCase(true)]
121170
[TestCase(false)]
122171
public async Task Should_not_send_messages_if_session_is_not_committed(bool outboxEnabled)

src/NServiceBus.Persistence.DynamoDB/Outbox/OutboxPersister.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public Task<IOutboxTransaction> BeginTransaction(ContextBag context,
4848
},
4949
ExpressionAttributeValues = new Dictionary<string, AttributeValue>(1)
5050
{
51-
{ ":outboxId", new AttributeValue { S = OutboxPartitionKey(messageId) } }
51+
{ ":outboxId", new AttributeValue { S = OutboxPartitionKey(endpointIdentifier, messageId) } }
5252
},
5353
TableName = configuration.Table.TableName
5454
};
@@ -185,7 +185,7 @@ IReadOnlyCollection<TransactWriteItem> Serialize(OutboxMessage outboxMessage, Co
185185
{
186186
{
187187
configuration.Table.PartitionKeyName,
188-
new AttributeValue { S = OutboxPartitionKey(outboxMessage.MessageId) }
188+
new AttributeValue { S = OutboxPartitionKey(endpointIdentifier, outboxMessage.MessageId) }
189189
},
190190
{
191191
configuration.Table.SortKeyName,
@@ -220,7 +220,7 @@ IReadOnlyCollection<TransactWriteItem> Serialize(OutboxMessage outboxMessage, Co
220220
{
221221
Item = new Dictionary<string, AttributeValue>(6)
222222
{
223-
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(outboxMessage.MessageId)}},
223+
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(endpointIdentifier, outboxMessage.MessageId)}},
224224
{configuration.Table.SortKeyName, new AttributeValue {S = OutboxOperationSortKey(outboxMessage.MessageId, n)}},
225225
{MessageId, new AttributeValue {S = operation.MessageId}},
226226
{
@@ -288,7 +288,7 @@ public async Task SetAsDispatched(string messageId, ContextBag context,
288288
{
289289
Key = new Dictionary<string, AttributeValue>(2)
290290
{
291-
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(messageId)}},
291+
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(endpointIdentifier, messageId)}},
292292
{configuration.Table.SortKeyName, new AttributeValue {S = OutboxMetadataSortKey(messageId)}}
293293
},
294294
UpdateExpression = "SET #dispatched = :dispatched, #dispatched_at = :dispatched_at, #ttl = :ttl",
@@ -323,7 +323,7 @@ public async Task SetAsDispatched(string messageId, ContextBag context,
323323
{
324324
Key = new Dictionary<string, AttributeValue>(2)
325325
{
326-
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(messageId)}},
326+
{configuration.Table.PartitionKeyName, new AttributeValue {S = OutboxPartitionKey(endpointIdentifier, messageId)}},
327327
{configuration.Table.SortKeyName, new AttributeValue {S = OutboxOperationSortKey(messageId, i)}}
328328
}
329329
}
@@ -341,9 +341,9 @@ await dynamoDbClient.BatchWriteItemWithRetries(writeRequestBatches, configuratio
341341
.ConfigureAwait(false);
342342
}
343343

344-
string OutboxPartitionKey(string messageId) => $"OUTBOX#{endpointIdentifier}#{messageId}";
345-
string OutboxMetadataSortKey(string messageId) => $"OUTBOX#METADATA#{messageId}";
346-
string OutboxOperationSortKey(string messageId, int messageNumber) => $"OUTBOX#OPERATION#{messageId}#{messageNumber:D4}";
344+
internal static string OutboxPartitionKey(string endpointIdentifier, string messageId) => $"OUTBOX#{endpointIdentifier}#{messageId}";
345+
static string OutboxMetadataSortKey(string messageId) => $"OUTBOX#METADATA#{messageId}";
346+
static string OutboxOperationSortKey(string messageId, int messageNumber) => $"OUTBOX#OPERATION#{messageId}#{messageNumber:D4}";
347347

348348
readonly IAmazonDynamoDB dynamoDbClient;
349349
readonly OutboxPersistenceConfiguration configuration;

0 commit comments

Comments
 (0)