diff --git a/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/.editorconfig b/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/.editorconfig new file mode 100644 index 00000000..1b8d54f8 --- /dev/null +++ b/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/.editorconfig @@ -0,0 +1,12 @@ +[*.cs] + +# Justification: Test project +dotnet_diagnostic.CA2007.severity = none + +# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken +dotnet_diagnostic.NSB0002.severity = suggestion +dotnet_diagnostic.PS0018.severity = suggestion +dotnet_diagnostic.PS0013.severity = suggestion + +# Persistence library doesn't need saga analyzers +dotnet_diagnostic.NSB0004.severity = none diff --git a/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/ConfigureEndpointDynamoDBPersistence.cs b/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/ConfigureEndpointDynamoDBPersistence.cs new file mode 100644 index 00000000..6d0e3024 --- /dev/null +++ b/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/ConfigureEndpointDynamoDBPersistence.cs @@ -0,0 +1,30 @@ +using System.Threading.Tasks; +using NServiceBus; +using NServiceBus.AcceptanceTesting.Support; +using NServiceBus.AcceptanceTests; +using NServiceBus.Configuration.AdvancedExtensibility; + +public class ConfigureEndpointDynamoDBPersistence : IConfigureEndpointTestExecution +{ + public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata) + { + if (configuration.GetSettings().Get("Endpoint.SendOnly")) + { + return Task.CompletedTask; + } + + // disable installers which are enabled by default in the standard endpoint templates + configuration.GetSettings().Set("Installers.Enable", false); + + var persistence = configuration.UsePersistence(); + persistence.DynamoClient(SetupFixture.DynamoDBClient); + persistence.UseSharedTable(SetupFixture.TableConfiguration); + + var sagas = persistence.Sagas(); + sagas.UseEventuallyConsistentReads = true; + + return Task.CompletedTask; + } + + public Task Cleanup() => Task.CompletedTask; +} \ No newline at end of file diff --git a/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests.csproj b/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests.csproj new file mode 100644 index 00000000..16afe2a7 --- /dev/null +++ b/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests.csproj @@ -0,0 +1,51 @@ + + + + net8.0;net9.0 + true + ..\NServiceBusTests.snk + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/SetupFixture.cs b/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/SetupFixture.cs new file mode 100644 index 00000000..54a78c3d --- /dev/null +++ b/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/SetupFixture.cs @@ -0,0 +1,38 @@ +namespace NServiceBus.AcceptanceTests; + +using System; +using System.IO; +using System.Threading.Tasks; +using Amazon.DynamoDBv2; +using NUnit.Framework; +using Persistence.DynamoDB; +using Persistence.DynamoDB.Tests; + +[SetUpFixture] +public class SetupFixture +{ + [OneTimeSetUp] + public async Task OneTimeSetUp() + { + TableConfiguration = new TableConfiguration + { + TableName = $"{DateTime.UtcNow.Ticks}_{Path.GetFileNameWithoutExtension(Path.GetTempFileName())}", + }; + + DynamoDBClient = ClientFactory.CreateDynamoDBClient(); + + var installer = new Installer(DynamoDBClient); + + await installer.CreateTable(TableConfiguration); + } + + [OneTimeTearDown] + public async Task OneTimeTearDown() + { + await DynamoDBClient.DeleteTableAsync(TableConfiguration.TableName); + DynamoDBClient.Dispose(); + } + + public static IAmazonDynamoDB DynamoDBClient; + public static TableConfiguration TableConfiguration; +} \ No newline at end of file diff --git a/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/TestSuiteConstraints.cs b/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/TestSuiteConstraints.cs new file mode 100644 index 00000000..f04f4246 --- /dev/null +++ b/src/NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests/TestSuiteConstraints.cs @@ -0,0 +1,17 @@ +namespace NServiceBus.AcceptanceTests; + +using AcceptanceTesting.Support; + +public partial class TestSuiteConstraints +{ + public bool SupportsDtc { get; } = false; + public bool SupportsCrossQueueTransactions { get; } = true; + public bool SupportsNativePubSub { get; } = true; + public bool SupportsOutbox { get; } = true; + public bool SupportsDelayedDelivery { get; } = true; + public bool SupportsPurgeOnStartup { get; } = true; + + public IConfigureEndpointTestExecution CreateTransportConfiguration() => new ConfigureEndpointAcceptanceTestingTransport(true, true, TransportTransactionMode.ReceiveOnly); + + public IConfigureEndpointTestExecution CreatePersistenceConfiguration() => new ConfigureEndpointDynamoDBPersistence(); +} \ No newline at end of file diff --git a/src/NServiceBus.Persistence.DynamoDB.PersistenceTests/PersistenceTestsConfiguration.cs b/src/NServiceBus.Persistence.DynamoDB.PersistenceTests/PersistenceTestsConfiguration.cs index 90834fc1..083c726b 100644 --- a/src/NServiceBus.Persistence.DynamoDB.PersistenceTests/PersistenceTestsConfiguration.cs +++ b/src/NServiceBus.Persistence.DynamoDB.PersistenceTests/PersistenceTestsConfiguration.cs @@ -14,26 +14,20 @@ public partial class PersistenceTestsConfiguration : IDynamoClientProvider { static PersistenceTestsConfiguration() { - SagaVariants = new[] - { - new TestFixtureData(new TestVariant(new PersistenceConfiguration(usePessimisticLocking: false))).SetArgDisplayNames("Optimistic"), - new TestFixtureData(new TestVariant(new PersistenceConfiguration(usePessimisticLocking: true))).SetArgDisplayNames("Pessimistic"), - }; - - OutboxVariants = new[] - { - new TestFixtureData(new TestVariant(new PersistenceConfiguration(usePessimisticLocking: false))).SetArgDisplayNames("Optimistic"), - }; + SagaVariants = + [ + new TestFixtureData(new TestVariant(new PersistenceConfiguration())).SetArgDisplayNames("Optimistic"), + new TestFixtureData(new TestVariant(new PersistenceConfiguration(UseEventuallyConsistentReads: true))).SetArgDisplayNames("Optimistic Eventual Consistent"), + new TestFixtureData(new TestVariant(new PersistenceConfiguration(UsePessimisticLocking: true))).SetArgDisplayNames("Pessimistic") + ]; + + OutboxVariants = + [ + new TestFixtureData(new TestVariant(new PersistenceConfiguration())).SetArgDisplayNames("Optimistic"), + ]; } - public class PersistenceConfiguration - { - public readonly bool UsePessimisticLocking; - public PersistenceConfiguration(bool usePessimisticLocking) - { - UsePessimisticLocking = usePessimisticLocking; - } - } + public record PersistenceConfiguration(bool? UsePessimisticLocking = null, bool? UseEventuallyConsistentReads = null); public bool SupportsDtc => false; @@ -57,14 +51,25 @@ public Task Configure(CancellationToken cancellationToken = default) { var configuration = (PersistenceConfiguration)Variant.Values[0]; + var sagaPersistenceConfiguration = new SagaPersistenceConfiguration + { + Table = SetupFixture.SagaTable, + LeaseAcquisitionTimeout = Variant.SessionTimeout ?? TimeSpan.FromSeconds(10) + }; + + if (configuration.UsePessimisticLocking.HasValue) + { + sagaPersistenceConfiguration.UsePessimisticLocking = SupportsPessimisticConcurrency = configuration.UsePessimisticLocking.Value; + } + + if (configuration.UseEventuallyConsistentReads.HasValue) + { + sagaPersistenceConfiguration.UseEventuallyConsistentReads = configuration.UseEventuallyConsistentReads.Value; + } + SagaStorage = new SagaPersister( Client, - new SagaPersistenceConfiguration - { - Table = SetupFixture.SagaTable, - UsePessimisticLocking = SupportsPessimisticConcurrency = configuration.UsePessimisticLocking, - LeaseAcquisitionTimeout = Variant.SessionTimeout ?? TimeSpan.FromSeconds(10) - }, + sagaPersistenceConfiguration, "PersistenceTest"); OutboxStorage = new OutboxPersister( diff --git a/src/NServiceBus.Persistence.DynamoDB.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.Persistence.DynamoDB.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index 30706acd..29acdf89 100644 --- a/src/NServiceBus.Persistence.DynamoDB.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.Persistence.DynamoDB.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -1,4 +1,5 @@ [assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Persistence.DynamoDB.AcceptanceTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] [assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Persistence.DynamoDB.PersistenceTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] [assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Persistence.DynamoDB.PessimisticLock.AcceptanceTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] [assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Persistence.DynamoDB.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] @@ -40,6 +41,7 @@ namespace NServiceBus public System.TimeSpan LeaseDuration { get; set; } public System.Text.Json.JsonSerializerOptions MapperOptions { get; set; } public NServiceBus.TableConfiguration Table { get; set; } + public bool UseEventuallyConsistentReads { get; set; } public bool UsePessimisticLocking { get; set; } } public static class SynchronizedStorageSessionExtensions diff --git a/src/NServiceBus.Persistence.DynamoDB.sln b/src/NServiceBus.Persistence.DynamoDB.sln index 2b060e1c..ed846315 100644 --- a/src/NServiceBus.Persistence.DynamoDB.sln +++ b/src/NServiceBus.Persistence.DynamoDB.sln @@ -26,6 +26,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NServiceBus.Persistence.Dyn EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NServiceBus.Persistence.DynamoDB.TransactionalSession.Tests", "NServiceBus.Persistence.DynamoDB.TransactionalSession.Tests\NServiceBus.Persistence.DynamoDB.TransactionalSession.Tests.csproj", "{EB66E88B-5B05-4D26-8FEF-739E325A14D6}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests", "NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests\NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests.csproj", "{5C63649F-25A3-4568-95EC-AF98761992EC}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -64,6 +66,10 @@ Global {EB66E88B-5B05-4D26-8FEF-739E325A14D6}.Debug|Any CPU.Build.0 = Debug|Any CPU {EB66E88B-5B05-4D26-8FEF-739E325A14D6}.Release|Any CPU.ActiveCfg = Release|Any CPU {EB66E88B-5B05-4D26-8FEF-739E325A14D6}.Release|Any CPU.Build.0 = Release|Any CPU + {5C63649F-25A3-4568-95EC-AF98761992EC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5C63649F-25A3-4568-95EC-AF98761992EC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5C63649F-25A3-4568-95EC-AF98761992EC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5C63649F-25A3-4568-95EC-AF98761992EC}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/NServiceBus.Persistence.DynamoDB/NServiceBus.Persistence.DynamoDB.csproj b/src/NServiceBus.Persistence.DynamoDB/NServiceBus.Persistence.DynamoDB.csproj index fff53f57..cc5b9447 100644 --- a/src/NServiceBus.Persistence.DynamoDB/NServiceBus.Persistence.DynamoDB.csproj +++ b/src/NServiceBus.Persistence.DynamoDB/NServiceBus.Persistence.DynamoDB.csproj @@ -19,6 +19,7 @@ + diff --git a/src/NServiceBus.Persistence.DynamoDB/Saga/SagaPersistenceConfiguration.cs b/src/NServiceBus.Persistence.DynamoDB/Saga/SagaPersistenceConfiguration.cs index a98caa59..50be65c4 100644 --- a/src/NServiceBus.Persistence.DynamoDB/Saga/SagaPersistenceConfiguration.cs +++ b/src/NServiceBus.Persistence.DynamoDB/Saga/SagaPersistenceConfiguration.cs @@ -12,15 +12,52 @@ public class SagaPersistenceConfiguration /// /// The configuration of the table used by the outbox persistence. /// - public TableConfiguration Table { get; set; } = new TableConfiguration() + public TableConfiguration Table { get; set; } = new() { TimeToLiveAttributeName = null }; + /// + /// Enables eventual consistent reads on table containing the saga data. This might reduce costs for reads since + /// eventual consistent reads are cheaper than strongly consistent reads. However, it might lead to stale data being read which can lead + /// too more retries in case of concurrent updates to the same saga. + /// + /// This setting is mutually exclusive to meaning when opting into eventual + /// consistent reads pessimistic locking is disabled when previously explicitly enabled and vice versa. + public bool UseEventuallyConsistentReads + { + get => useEventuallyConsistentReads.GetValueOrDefault(false); + set + { + useEventuallyConsistentReads = value; + if (usePessimisticLocking.HasValue) + { + usePessimisticLocking = !value; + } + } + } + + bool? useEventuallyConsistentReads; + /// /// Enables pessimistic locking mode to avoid concurrent modifications to the same saga. Enable this mode to reduce retries due to optimistic concurrency control violations. /// - public bool UsePessimisticLocking { get; set; } = false; + /// This setting is mutually exclusive to meaning when opting into pessimistic locking + /// eventual consistent reads are disabled when previously explicitly enabled and vice versa. + public bool UsePessimisticLocking + { + get => usePessimisticLocking.GetValueOrDefault(false); + set + { + usePessimisticLocking = value; + if (useEventuallyConsistentReads.HasValue) + { + useEventuallyConsistentReads = !value; + } + } + } + + bool? usePessimisticLocking; /// /// Determines whether the NServiceBus installer should create the Outbox table when enabled. diff --git a/src/NServiceBus.Persistence.DynamoDB/Saga/SagaPersister.cs b/src/NServiceBus.Persistence.DynamoDB/Saga/SagaPersister.cs index 8cec0835..24739d19 100644 --- a/src/NServiceBus.Persistence.DynamoDB/Saga/SagaPersister.cs +++ b/src/NServiceBus.Persistence.DynamoDB/Saga/SagaPersister.cs @@ -38,7 +38,7 @@ public SagaPersister(IAmazonDynamoDB dynamoDbClient, SagaPersistenceConfiguratio // Using optimistic concurrency control var getItemRequest = new GetItemRequest { - ConsistentRead = true, + ConsistentRead = !configuration.UseEventuallyConsistentReads, Key = new Dictionary(2) { { configuration.Table.PartitionKeyName, new AttributeValue { S = SagaPartitionKey(sagaId) } }, @@ -48,7 +48,7 @@ public SagaPersister(IAmazonDynamoDB dynamoDbClient, SagaPersistenceConfiguratio }; var response = await dynamoDbClient.GetItemAsync(getItemRequest, cancellationToken).ConfigureAwait(false); - return !response.IsItemSet ? default : Deserialize(response.Item, context); + return !response.IsItemSet ? null : Deserialize(response.Item, context); } async Task ReadWithLock(Guid sagaId, ContextBag context, @@ -140,7 +140,7 @@ await Task.Delay(Random.Next(100, 300), cancellationToken) var sagaData = Mapper.ToObject(attributeValues, configuration.MapperOptions); if (sagaData is null) { - return default; + return null; } var currentVersion = int.Parse(attributeValues[Metadata].M[SagaMetadataAttributeNames.Version].N); context.Set($"dynamo_version:{sagaData.Id}", currentVersion);