Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[*.cs]

# Justification: Test project
dotnet_diagnostic.CA2007.severity = none

# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
dotnet_diagnostic.NSB0002.severity = suggestion
dotnet_diagnostic.PS0018.severity = suggestion
dotnet_diagnostic.PS0013.severity = suggestion

# Persistence library doesn't need saga analyzers
dotnet_diagnostic.NSB0004.severity = none
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting.Support;
using NServiceBus.AcceptanceTests;
using NServiceBus.Configuration.AdvancedExtensibility;

public class ConfigureEndpointDynamoDBPersistence : IConfigureEndpointTestExecution
{
public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata)
{
if (configuration.GetSettings().Get<bool>("Endpoint.SendOnly"))
{
return Task.CompletedTask;
}

// disable installers which are enabled by default in the standard endpoint templates
configuration.GetSettings().Set("Installers.Enable", false);

var persistence = configuration.UsePersistence<DynamoPersistence>();
persistence.DynamoClient(SetupFixture.DynamoDBClient);
persistence.UseSharedTable(SetupFixture.TableConfiguration);

var sagas = persistence.Sagas();
sagas.UseEventualConsistentReads = true;

return Task.CompletedTask;
}

public Task Cleanup() => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net8.0;net9.0</TargetFrameworks>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>..\NServiceBusTests.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\NServiceBus.Persistence.DynamoDB\NServiceBus.Persistence.DynamoDB.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.2.4" GeneratePathProperty="true" />
<PackageReference Include="NUnit" Version="4.3.2" />
<PackageReference Include="NUnit.Analyzers" Version="4.7.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

<ItemGroup>
<Compile Include="..\NServiceBus.Persistence.DynamoDB.Tests\ClientFactory.cs" />
</ItemGroup>

<ItemGroup Condition="'$(PkgNServiceBus_AcceptanceTests_Sources)' != ''">
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\ConfigureEndpointInMemoryPersistence.cs" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\ConfigureEndpointLearningPersistence.cs" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\ConventionEnforcementTests.cs" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\DeterministicGuid.cs" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Audit\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Correlation\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\DataBus\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\DelayedDelivery\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Forwarding\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Feature\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\MessageId\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Pipeline\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Recoverability\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Routing\**\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Satellites\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Scheduling\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\SelfVerification\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Serialization\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Timeout\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\TimeToBeReceived\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Tx\**\*.*" />
<Compile Remove="$(PkgNServiceBus_AcceptanceTests_Sources)\**\Versioning\*.*" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace NServiceBus.AcceptanceTests;

using System;
using System.IO;
using System.Threading.Tasks;
using Amazon.DynamoDBv2;
using NUnit.Framework;
using Persistence.DynamoDB;
using Persistence.DynamoDB.Tests;

[SetUpFixture]
public class SetupFixture
{
[OneTimeSetUp]
public async Task OneTimeSetUp()
{
TableConfiguration = new TableConfiguration
{
TableName = $"{DateTime.UtcNow.Ticks}_{Path.GetFileNameWithoutExtension(Path.GetTempFileName())}",
};

DynamoDBClient = ClientFactory.CreateDynamoDBClient();

var installer = new Installer(DynamoDBClient);

await installer.CreateTable(TableConfiguration);
}

[OneTimeTearDown]
public async Task OneTimeTearDown()
{
await DynamoDBClient.DeleteTableAsync(TableConfiguration.TableName);
DynamoDBClient.Dispose();
}

public static IAmazonDynamoDB DynamoDBClient;
public static TableConfiguration TableConfiguration;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace NServiceBus.AcceptanceTests;

using AcceptanceTesting.Support;

public partial class TestSuiteConstraints
{
public bool SupportsDtc { get; } = false;
public bool SupportsCrossQueueTransactions { get; } = true;
public bool SupportsNativePubSub { get; } = true;
public bool SupportsOutbox { get; } = true;
public bool SupportsDelayedDelivery { get; } = true;
public bool SupportsPurgeOnStartup { get; } = true;

public IConfigureEndpointTestExecution CreateTransportConfiguration() => new ConfigureEndpointAcceptanceTestingTransport(true, true, TransportTransactionMode.ReceiveOnly);

public IConfigureEndpointTestExecution CreatePersistenceConfiguration() => new ConfigureEndpointDynamoDBPersistence();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,20 @@ public partial class PersistenceTestsConfiguration : IDynamoClientProvider
{
static PersistenceTestsConfiguration()
{
SagaVariants = new[]
{
new TestFixtureData(new TestVariant(new PersistenceConfiguration(usePessimisticLocking: false))).SetArgDisplayNames("Optimistic"),
new TestFixtureData(new TestVariant(new PersistenceConfiguration(usePessimisticLocking: true))).SetArgDisplayNames("Pessimistic"),
};

OutboxVariants = new[]
{
new TestFixtureData(new TestVariant(new PersistenceConfiguration(usePessimisticLocking: false))).SetArgDisplayNames("Optimistic"),
};
SagaVariants =
[
new TestFixtureData(new TestVariant(new PersistenceConfiguration())).SetArgDisplayNames("Optimistic"),
new TestFixtureData(new TestVariant(new PersistenceConfiguration(UseEventualConsistentReads: true))).SetArgDisplayNames("Optimistic Eventual Consistent"),
new TestFixtureData(new TestVariant(new PersistenceConfiguration(UsePessimisticLocking: true))).SetArgDisplayNames("Pessimistic")
];

OutboxVariants =
[
new TestFixtureData(new TestVariant(new PersistenceConfiguration())).SetArgDisplayNames("Optimistic"),
];
}
public class PersistenceConfiguration
{
public readonly bool UsePessimisticLocking;

public PersistenceConfiguration(bool usePessimisticLocking)
{
UsePessimisticLocking = usePessimisticLocking;
}
}
public record PersistenceConfiguration(bool? UsePessimisticLocking = null, bool? UseEventualConsistentReads = null);

public bool SupportsDtc => false;

Expand All @@ -57,14 +51,25 @@ public Task Configure(CancellationToken cancellationToken = default)
{
var configuration = (PersistenceConfiguration)Variant.Values[0];

var sagaPersistenceConfiguration = new SagaPersistenceConfiguration
{
Table = SetupFixture.SagaTable,
LeaseAcquisitionTimeout = Variant.SessionTimeout ?? TimeSpan.FromSeconds(10)
};

if (configuration.UsePessimisticLocking.HasValue)
{
sagaPersistenceConfiguration.UsePessimisticLocking = SupportsPessimisticConcurrency = configuration.UsePessimisticLocking.Value;
}

if (configuration.UseEventualConsistentReads.HasValue)
{
sagaPersistenceConfiguration.UseEventualConsistentReads = configuration.UseEventualConsistentReads.Value;
}

SagaStorage = new SagaPersister(
Client,
new SagaPersistenceConfiguration
{
Table = SetupFixture.SagaTable,
UsePessimisticLocking = SupportsPessimisticConcurrency = configuration.UsePessimisticLocking,
LeaseAcquisitionTimeout = Variant.SessionTimeout ?? TimeSpan.FromSeconds(10)
},
sagaPersistenceConfiguration,
"PersistenceTest");

OutboxStorage = new OutboxPersister(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Persistence.DynamoDB.AcceptanceTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Persistence.DynamoDB.PersistenceTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Persistence.DynamoDB.PessimisticLock.AcceptanceTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Persistence.DynamoDB.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")]
Expand Down Expand Up @@ -40,6 +41,7 @@ namespace NServiceBus
public System.TimeSpan LeaseDuration { get; set; }
public System.Text.Json.JsonSerializerOptions MapperOptions { get; set; }
public NServiceBus.TableConfiguration Table { get; set; }
public bool UseEventualConsistentReads { get; set; }
public bool UsePessimisticLocking { get; set; }
}
public static class SynchronizedStorageSessionExtensions
Expand Down
6 changes: 6 additions & 0 deletions src/NServiceBus.Persistence.DynamoDB.sln
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NServiceBus.Persistence.Dyn
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NServiceBus.Persistence.DynamoDB.TransactionalSession.Tests", "NServiceBus.Persistence.DynamoDB.TransactionalSession.Tests\NServiceBus.Persistence.DynamoDB.TransactionalSession.Tests.csproj", "{EB66E88B-5B05-4D26-8FEF-739E325A14D6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests", "NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests\NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests.csproj", "{5C63649F-25A3-4568-95EC-AF98761992EC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -64,6 +66,10 @@ Global
{EB66E88B-5B05-4D26-8FEF-739E325A14D6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EB66E88B-5B05-4D26-8FEF-739E325A14D6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EB66E88B-5B05-4D26-8FEF-739E325A14D6}.Release|Any CPU.Build.0 = Release|Any CPU
{5C63649F-25A3-4568-95EC-AF98761992EC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5C63649F-25A3-4568-95EC-AF98761992EC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5C63649F-25A3-4568-95EC-AF98761992EC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5C63649F-25A3-4568-95EC-AF98761992EC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

<ItemGroup>
<InternalsVisibleTo Include="NServiceBus.Persistence.DynamoDB.AcceptanceTests" Key="$(NServiceBusTestsKey)" />
<InternalsVisibleTo Include="NServiceBus.Persistence.DynamoDB.EventualConsistent.AcceptanceTests" Key="$(NServiceBusTestsKey)" />
<InternalsVisibleTo Include="NServiceBus.Persistence.DynamoDB.PersistenceTests" Key="$(NServiceBusTestsKey)" />
<InternalsVisibleTo Include="NServiceBus.Persistence.DynamoDB.PessimisticLock.AcceptanceTests" Key="$(NServiceBusTestsKey)" />
<InternalsVisibleTo Include="NServiceBus.Persistence.DynamoDB.Tests" Key="$(NServiceBusTestsKey)" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,52 @@ public class SagaPersistenceConfiguration
/// <summary>
/// The configuration of the table used by the outbox persistence.
/// </summary>
public TableConfiguration Table { get; set; } = new TableConfiguration()
public TableConfiguration Table { get; set; } = new()
{
TimeToLiveAttributeName = null
};

/// <summary>
/// Enables eventual consistent reads on table containing the saga data. This might reduce costs for reads since
/// eventual consistent reads are cheaper than strongly consistent reads. However, it might lead to stale data being read which can lead
/// too more retries in case of concurrent updates to the same saga.
/// </summary>
/// <remarks>This setting is mutually exclusive to <see cref="UsePessimisticLocking"/> meaning when opting into eventual
/// consistent reads pessimistic locking is disabled when previously explicitly enabled and vice versa.</remarks>
public bool UseEventualConsistentReads
{
get => useEventualConsistentReads.GetValueOrDefault(false);
set
{
useEventualConsistentReads = value;
if (usePessimisticLocking.HasValue)
{
usePessimisticLocking = !value;
}
}
}

bool? useEventualConsistentReads;

/// <summary>
/// Enables pessimistic locking mode to avoid concurrent modifications to the same saga. Enable this mode to reduce retries due to optimistic concurrency control violations.
/// </summary>
public bool UsePessimisticLocking { get; set; } = false;
/// <remarks>This setting is mutually exclusive to <see cref="UseEventualConsistentReads"/> meaning when opting into pessimistic locking
/// eventual consistent reads are disabled when previously explicitly enabled and vice versa.</remarks>
public bool UsePessimisticLocking
{
get => usePessimisticLocking.GetValueOrDefault(false);
set
{
usePessimisticLocking = value;
if (useEventualConsistentReads.HasValue)
{
useEventualConsistentReads = !value;
}
}
}

bool? usePessimisticLocking;

/// <summary>
/// Determines whether the NServiceBus installer should create the Outbox table when enabled.
Expand Down
6 changes: 3 additions & 3 deletions src/NServiceBus.Persistence.DynamoDB/Saga/SagaPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public SagaPersister(IAmazonDynamoDB dynamoDbClient, SagaPersistenceConfiguratio
// Using optimistic concurrency control
var getItemRequest = new GetItemRequest
{
ConsistentRead = true,
ConsistentRead = !configuration.UseEventualConsistentReads,
Key = new Dictionary<string, AttributeValue>(2)
{
{ configuration.Table.PartitionKeyName, new AttributeValue { S = SagaPartitionKey(sagaId) } },
Expand All @@ -48,7 +48,7 @@ public SagaPersister(IAmazonDynamoDB dynamoDbClient, SagaPersistenceConfiguratio
};

var response = await dynamoDbClient.GetItemAsync(getItemRequest, cancellationToken).ConfigureAwait(false);
return !response.IsItemSet ? default : Deserialize<TSagaData>(response.Item, context);
return !response.IsItemSet ? null : Deserialize<TSagaData>(response.Item, context);
}

async Task<TSagaData?> ReadWithLock<TSagaData>(Guid sagaId, ContextBag context,
Expand Down Expand Up @@ -140,7 +140,7 @@ await Task.Delay(Random.Next(100, 300), cancellationToken)
var sagaData = Mapper.ToObject<TSagaData>(attributeValues, configuration.MapperOptions);
if (sagaData is null)
{
return default;
return null;
}
var currentVersion = int.Parse(attributeValues[Metadata].M[SagaMetadataAttributeNames.Version].N);
context.Set($"dynamo_version:{sagaData.Id}", currentVersion);
Expand Down