diff --git a/Snippets/MongoDB/MongoDB.sln b/Snippets/MongoDB/MongoDB.sln index 294e9bc6f97..d678ee31704 100644 --- a/Snippets/MongoDB/MongoDB.sln +++ b/Snippets/MongoDB/MongoDB.sln @@ -12,6 +12,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MongoDB_5", "MongoDB_5\Mong EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MongoDB_6", "MongoDB_6\MongoDB_6.csproj", "{58940814-447D-457B-B541-1BFB51820F7F}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MongoDB_7", "MongoDB_7\MongoDB_7.csproj", "{2146CB07-4B91-40AA-B52B-69A6A6302743}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -27,6 +29,8 @@ Global {D114D094-6ABB-4922-832B-ECBF8A213228}.Debug|Any CPU.Build.0 = Debug|Any CPU {58940814-447D-457B-B541-1BFB51820F7F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {58940814-447D-457B-B541-1BFB51820F7F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2146CB07-4B91-40AA-B52B-69A6A6302743}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2146CB07-4B91-40AA-B52B-69A6A6302743}.Debug|Any CPU.Build.0 = Debug|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Snippets/MongoDB/MongoDB_6/CustomProvider.cs b/Snippets/MongoDB/MongoDB_6/CustomProvider.cs new file mode 100644 index 00000000000..2525e821916 --- /dev/null +++ b/Snippets/MongoDB/MongoDB_6/CustomProvider.cs @@ -0,0 +1,30 @@ +using Microsoft.Extensions.DependencyInjection; +using MongoDB.Driver; +using NServiceBus; +using NServiceBus.Storage.MongoDB; + +#region MongoDBClientProvider + +class CustomMongoClientProvider + : IMongoClientProvider +{ + // get fully configured via DI + public CustomMongoClientProvider(IMongoClient mongoClient) + { + Client = mongoClient; + } + public IMongoClient Client { get; } +} +#endregion + +class CustomMongoClientProviderRegistration +{ + public CustomMongoClientProviderRegistration(EndpointConfiguration endpointConfiguration) + { + #region MongoDBCustomClientProviderRegistration + + endpointConfiguration.RegisterComponents(c => c.AddTransient()); + + #endregion + } +} \ No newline at end of file diff --git a/Snippets/MongoDB/MongoDB_6/MongoDB_6.csproj b/Snippets/MongoDB/MongoDB_6/MongoDB_6.csproj index 0626348e63b..8d1e92fb030 100644 --- a/Snippets/MongoDB/MongoDB_6/MongoDB_6.csproj +++ b/Snippets/MongoDB/MongoDB_6/MongoDB_6.csproj @@ -1,8 +1,8 @@ - net10.0 + net8.0 - + \ No newline at end of file diff --git a/Snippets/MongoDB/MongoDB_6/Usage.cs b/Snippets/MongoDB/MongoDB_6/Usage.cs index 58803fcc04b..105a6b43916 100644 --- a/Snippets/MongoDB/MongoDB_6/Usage.cs +++ b/Snippets/MongoDB/MongoDB_6/Usage.cs @@ -47,8 +47,28 @@ void TimeToKeepOutboxDeduplicationData(EndpointConfiguration endpointConfigurati { #region MongoDBOutboxCleanup + var outbox = endpointConfiguration.EnableOutbox(); + outbox.TimeToKeepOutboxDeduplicationData(TimeSpan.FromDays(30)); + + #endregion + } + + void DisableReadFallback(EndpointConfiguration endpointConfiguration) + { + #region MongoDBDisableReadFallback + + var outbox = endpointConfiguration.EnableOutbox(); + outbox.DisableReadFallback(); + + #endregion + } + + void DisableInstaller(EndpointConfiguration endpointConfiguration) + { + #region MongoDBDisableInstaller + var persistence = endpointConfiguration.UsePersistence(); - persistence.TimeToKeepOutboxDeduplicationData(TimeSpan.FromDays(30)); + persistence.DisableInstaller(); #endregion } diff --git a/Snippets/MongoDB/MongoDB_7/CustomProvider.cs b/Snippets/MongoDB/MongoDB_7/CustomProvider.cs new file mode 100644 index 00000000000..2525e821916 --- /dev/null +++ b/Snippets/MongoDB/MongoDB_7/CustomProvider.cs @@ -0,0 +1,30 @@ +using Microsoft.Extensions.DependencyInjection; +using MongoDB.Driver; +using NServiceBus; +using NServiceBus.Storage.MongoDB; + +#region MongoDBClientProvider + +class CustomMongoClientProvider + : IMongoClientProvider +{ + // get fully configured via DI + public CustomMongoClientProvider(IMongoClient mongoClient) + { + Client = mongoClient; + } + public IMongoClient Client { get; } +} +#endregion + +class CustomMongoClientProviderRegistration +{ + public CustomMongoClientProviderRegistration(EndpointConfiguration endpointConfiguration) + { + #region MongoDBCustomClientProviderRegistration + + endpointConfiguration.RegisterComponents(c => c.AddTransient()); + + #endregion + } +} \ No newline at end of file diff --git a/Snippets/MongoDB/MongoDB_7/DocumentVersion.cs b/Snippets/MongoDB/MongoDB_7/DocumentVersion.cs new file mode 100644 index 00000000000..4e8fdbc1140 --- /dev/null +++ b/Snippets/MongoDB/MongoDB_7/DocumentVersion.cs @@ -0,0 +1,30 @@ +using System; +using System.Threading.Tasks; +using MongoDB.Bson; +using MongoDB.Driver; + +class DocumentVersion +{ + async Task UpdateWithVersion(IMongoCollection collection, string versionFieldName, UpdateDefinitionBuilder updateBuilder, int currentVersion, Guid documentId) + { + #region MongoDBUpdateWithVersion + + UpdateDefinition updateDefinition = updateBuilder.Inc(versionFieldName, 1); + FilterDefinition filterDefinition = Builders.Filter.Eq("_id", documentId) + & Builders.Filter.Eq(versionFieldName, currentVersion); + + //Define other update operations on the document + + var modifiedDocument = await collection.FindOneAndUpdateAsync( + filter: filterDefinition, + update: updateDefinition, + options: new FindOneAndUpdateOptions { IsUpsert = false, ReturnDocument = ReturnDocument.After }); + + if (modifiedDocument == null) + { + //The document was not updated because the version was already incremented. + } + + #endregion + } +} diff --git a/Snippets/MongoDB/MongoDB_7/MongoDB_7.csproj b/Snippets/MongoDB/MongoDB_7/MongoDB_7.csproj new file mode 100644 index 00000000000..044411850df --- /dev/null +++ b/Snippets/MongoDB/MongoDB_7/MongoDB_7.csproj @@ -0,0 +1,8 @@ + + + net10.0 + + + + + \ No newline at end of file diff --git a/Snippets/MongoDB/MongoDB_7/SharedTransaction.cs b/Snippets/MongoDB/MongoDB_7/SharedTransaction.cs new file mode 100644 index 00000000000..16c78152ea2 --- /dev/null +++ b/Snippets/MongoDB/MongoDB_7/SharedTransaction.cs @@ -0,0 +1,25 @@ +using System.Threading.Tasks; +using NServiceBus; + +class SharedTransaction : IHandleMessages +{ + #region MongoDBHandlerSharedTransaction + + public Task Handle(MyMessage message, IMessageHandlerContext context) + { + var session = context.SynchronizedStorageSession.GetClientSession(); + var collection = session.Client.GetDatabase("mydatabase").GetCollection("mycollection"); + return collection.InsertOneAsync(session, new MyBusinessObject(), null, context.CancellationToken); + } + + #endregion +} + +class MyMessage +{ +} + +class MyBusinessObject +{ +} + diff --git a/Snippets/MongoDB/MongoDB_7/SharedTransactionDI.cs b/Snippets/MongoDB/MongoDB_7/SharedTransactionDI.cs new file mode 100644 index 00000000000..a222427f6bf --- /dev/null +++ b/Snippets/MongoDB/MongoDB_7/SharedTransactionDI.cs @@ -0,0 +1,30 @@ +using System.Threading.Tasks; +using NServiceBus.Storage.MongoDB; + +public class SharedTransactionDI +{ + #region MongoDBSharedTransactionDI + class MyService + { + IMongoSynchronizedStorageSession sharedSession; + + // Resolved from DI container + public MyService(IMongoSynchronizedStorageSession sharedSession) + { + this.sharedSession = sharedSession; + } + + public Task Create() + { + return sharedSession.MongoSession.Client + .GetDatabase("mydatabase") + .GetCollection("mycollection") + .InsertOneAsync(sharedSession.MongoSession, new MyBusinessObject()); + } + } + #endregion + + class MyBusinessObject + { + } +} diff --git a/Snippets/MongoDB/MongoDB_7/Usage.cs b/Snippets/MongoDB/MongoDB_7/Usage.cs new file mode 100644 index 00000000000..105a6b43916 --- /dev/null +++ b/Snippets/MongoDB/MongoDB_7/Usage.cs @@ -0,0 +1,98 @@ +using System; +using MongoDB.Driver; +using NServiceBus; + +public class Usage +{ + Usage(EndpointConfiguration endpointConfiguration) + { + #region MongoDBUsage + + endpointConfiguration.UsePersistence(); + + #endregion + } + + void MongoClient(EndpointConfiguration endpointConfiguration) + { + #region MongoDBClient + + var persistence = endpointConfiguration.UsePersistence(); + persistence.MongoClient(new MongoClient("SharedMongoUrl")); + + #endregion + } + + void DatabaseName(EndpointConfiguration endpointConfiguration) + { + #region MongoDBDatabaseName + + var persistence = endpointConfiguration.UsePersistence(); + persistence.DatabaseName("DatabaseName"); + + #endregion + } + + void UseTransactions(EndpointConfiguration endpointConfiguration) + { + #region MongoDBDisableTransactions + + var persistence = endpointConfiguration.UsePersistence(); + persistence.UseTransactions(false); + + #endregion + } + + void TimeToKeepOutboxDeduplicationData(EndpointConfiguration endpointConfiguration) + { + #region MongoDBOutboxCleanup + + var outbox = endpointConfiguration.EnableOutbox(); + outbox.TimeToKeepOutboxDeduplicationData(TimeSpan.FromDays(30)); + + #endregion + } + + void DisableReadFallback(EndpointConfiguration endpointConfiguration) + { + #region MongoDBDisableReadFallback + + var outbox = endpointConfiguration.EnableOutbox(); + outbox.DisableReadFallback(); + + #endregion + } + + void DisableInstaller(EndpointConfiguration endpointConfiguration) + { + #region MongoDBDisableInstaller + + var persistence = endpointConfiguration.UsePersistence(); + persistence.DisableInstaller(); + + #endregion + } + + void SBMakoCompatibility(EndpointConfiguration endpointConfiguration) + { + #region MongoDBSBMakoCompatibility + + var persistence = endpointConfiguration.UsePersistence(); + var compatibility = persistence.CommunityPersistenceCompatibility(); + compatibility.CollectionNamingConvention(type => type.Name); + compatibility.VersionElementName("DocumentVersion"); + + #endregion + } + + void TekmavenCompatibility(EndpointConfiguration endpointConfiguration) + { + #region MongoDBTekmavenCompatibility + + var persistence = endpointConfiguration.UsePersistence(); + var compatibility = persistence.CommunityPersistenceCompatibility(); + compatibility.VersionElementName("Version"); + + #endregion + } +} diff --git a/Snippets/MongoDB/MongoDB_6/prerelease.txt b/Snippets/MongoDB/MongoDB_7/prerelease.txt similarity index 100% rename from Snippets/MongoDB/MongoDB_6/prerelease.txt rename to Snippets/MongoDB/MongoDB_7/prerelease.txt diff --git a/menu/menu.yaml b/menu/menu.yaml index 7b91ae64fe1..3fd8ae67150 100644 --- a/menu/menu.yaml +++ b/menu/menu.yaml @@ -1160,6 +1160,8 @@ Articles: - Url: persistence/upgrades/mongodb-4to5 Title: Version 4 to 5 + - Url: persistence/upgrades/mongodb-5to6 + Title: Version 5 to 6 - Title: RavenDB Articles: - Url: persistence/upgrades/ravendb-9to10 diff --git a/persistence/mongodb/index.md b/persistence/mongodb/index.md index 1ec6a971e24..68bd9a7f02d 100644 --- a/persistence/mongodb/index.md +++ b/persistence/mongodb/index.md @@ -4,6 +4,7 @@ component: mongodb versions: '[2,)' related: - samples/mongodb +- samples/outbox/mongodb redirects: - persistence/mongodb-tekmaven - nservicebus/messaging/databus/mongodb-tekmaven @@ -46,6 +47,8 @@ Specify the database to use for NServiceBus documents using the following config snippet: MongoDBDatabaseName +partial: provider + ## Transactions MongoDB [transactions](https://docs.mongodb.com/manual/core/transactions/) are enabled and required by default. This allows the persister to use pessimistic locking and to update multiple saga instances and commit them atomically during message processing. @@ -89,14 +92,13 @@ snippet: MongoDBSharedTransactionDI The `TestableMongoSynchronizedStorageSession` class in the `NServiceBus.Testing` namespace has been provided to facilitate [testing a handler](/nservicebus/testing/) that utilizes the shared transaction feature. -## Outbox +## Outbox ## Storage format Outbox record documents are stored in a collection called `outboxrecord`. -> [!WARNING] -> Outbox documents are not separated by endpoint name which means that it's not supported for multiple logical endpoints to share the same database since [message identities are not unique across endpoints from a processing perspective](/nservicebus/outbox/#message-identity). +partial: outboxstorage ### Outbox cleanup @@ -104,6 +106,8 @@ When the outbox is enabled, the deduplication data is kept for seven days by def snippet: MongoDBOutboxCleanup +partial: outboxfallback + ## Saga concurrency When simultaneously handling messages, conflicts may occur. See below for examples of the exceptions which are thrown. _[Saga concurrency](/nservicebus/sagas/concurrency.md)_ explains how these conflicts are handled and contains guidance for high-load scenarios. @@ -135,3 +139,7 @@ MongoDB.Driver.MongoCommandException: Command update failed: WriteConflict. ``` include: saga-concurrency + +## Installer + +partial: installer \ No newline at end of file diff --git a/persistence/mongodb/index_installer_mongodb_[,6).partial.md b/persistence/mongodb/index_installer_mongodb_[,6).partial.md new file mode 100644 index 00000000000..657f1c522a9 --- /dev/null +++ b/persistence/mongodb/index_installer_mongodb_[,6).partial.md @@ -0,0 +1 @@ +Installers are not supported. Indexes are created regardless of the installer settings. \ No newline at end of file diff --git a/persistence/mongodb/index_installer_mongodb_[6,).partial.md b/persistence/mongodb/index_installer_mongodb_[6,).partial.md new file mode 100644 index 00000000000..c44b9721029 --- /dev/null +++ b/persistence/mongodb/index_installer_mongodb_[6,).partial.md @@ -0,0 +1,5 @@ +Indexes are only created when the installers are enabled. + +In non-development environments, where the indexes are managed by other means (e.g. Ops Manager), it may be necessary to explicitly disable the persistence installers if [standard installers](/nservicebus/operations/installers.md) need to be used for other purposes. + +snippet: MongoDBDisableInstaller \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_6/prerelease.txt b/persistence/mongodb/index_outboxfallback_mongodb_[,6).partial.md similarity index 100% rename from samples/mongodb/simple/MongoDB_6/prerelease.txt rename to persistence/mongodb/index_outboxfallback_mongodb_[,6).partial.md diff --git a/persistence/mongodb/index_outboxfallback_mongodb_[6,).partial.md b/persistence/mongodb/index_outboxfallback_mongodb_[6,).partial.md new file mode 100644 index 00000000000..4995c0c327b --- /dev/null +++ b/persistence/mongodb/index_outboxfallback_mongodb_[6,).partial.md @@ -0,0 +1,7 @@ +### Disable fallback reads + +By default, outbox records are retrieved using the new structured ID format. If no record is found, the persistence falls back to reading entries stored with the previous non-structured ID. This fallback ensures backward compatibility during the transition period. + +Fallback reads can be disabled once all legacy records have either expired or been dispatched, ensuring that only the structured format is queried. For details and recommendations, see the [upgrade guide](/persistence/upgrades/mongodb-5to6.md). + +snippet: MongoDBDisableReadFallback \ No newline at end of file diff --git a/persistence/mongodb/index_outboxstorage_mongodb_[,6).partial.md b/persistence/mongodb/index_outboxstorage_mongodb_[,6).partial.md new file mode 100644 index 00000000000..09377ed3333 --- /dev/null +++ b/persistence/mongodb/index_outboxstorage_mongodb_[,6).partial.md @@ -0,0 +1,2 @@ +> [!WARNING] +> Outbox documents are not separated by endpoint name. Because of that, multiple logical endpoints cannot share the same database since [message identities are not unique across endpoints from a processing perspective](/nservicebus/outbox/#message-identity). \ No newline at end of file diff --git a/persistence/mongodb/index_outboxstorage_mongodb_[6,).partial.md b/persistence/mongodb/index_outboxstorage_mongodb_[6,).partial.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/persistence/mongodb/index_provider_mongodb_[,6).partial.md b/persistence/mongodb/index_provider_mongodb_[,6).partial.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/persistence/mongodb/index_provider_mongodb_[6,).partial.md b/persistence/mongodb/index_provider_mongodb_[6,).partial.md new file mode 100644 index 00000000000..a826df33677 --- /dev/null +++ b/persistence/mongodb/index_provider_mongodb_[6,).partial.md @@ -0,0 +1,9 @@ +In cases when the `IMongoClient` is configured and used via dependency injection, a custom provider can be implemented: + +snippet: MongoDBClientProvider + +and then registered on the container + +snippet: MongoDBCustomClientProviderRegistration + +When hosting with the generic host, the registrations can also be done directly on the service collection. \ No newline at end of file diff --git a/persistence/upgrades/mongodb-5to6.md b/persistence/upgrades/mongodb-5to6.md new file mode 100644 index 00000000000..f10183002e3 --- /dev/null +++ b/persistence/upgrades/mongodb-5to6.md @@ -0,0 +1,87 @@ +--- +title: MongoDB Persistence Upgrade Version 5 to 6 +summary: Migration instructions on how to upgrade to MongoDB Persistence version 6 +reviewed: 2025-09-02 +component: mongodb +related: +- persistence/mongodb +isUpgradeGuide: true +--- + +## Minimum required client version + +The minimum required MongoDB client version has been raised to [3.4.3](https://www.nuget.org/packages/MongoDB.Driver/3.4.3). + +## TimeToKeepOutboxDeduplicationData method moved + +The `TimeToKeepOutboxDeduplicationData` method has been moved to the outbox settings: + +snippet: MongoDBOutboxCleanup + +## Installer support + +In previous versions, indexes were created automatically for all storage types, regardless of whether the installers were disabled. Starting with version 6, indexes are created only when the installers are enabled. + +This enables the possibility to take full control over the index creation by leveraging the [Ops Manager](https://www.mongodb.com/docs/ops-manager/current/data-explorer/indexes/) or any other preferred deployment mechanism. + +When installers are disabled, or when installers are enabled but the persistence installers are disabled with: + +snippet: MongoDBDisableInstaller + +the persistence assumes that all required infrastructure (including indexes) is already in place. If the necessary indexes are missing, system performance and reliability may be affected. + +## Outbox record storage layout changes + +Outbox records no longer use the message ID alone as the `_id`. In previous versions, this caused message loss in publish/subscribe scenarios when multiple endpoints shared the same database, since all subscribers wrote to the same outbox record. + +Starting with this version, outbox records include a partition key (defaulting to the endpoint name) as part of a structured `_id { pk, mid }`. This prevents the message loss by applying the deuplication per endpoint. + +- The implementation is backwards compatible meaning that the existing outbox records remain readable. The persistence performs backward-compatible reads for older entries, but all the new entries use the new structured format. +- Old records will continue to expire according to the configured time to keep deduplication data. +- If desired, the fallback reads can be disabled once no legacy records remain. + +For the new endpoints, it is recommended to disable the fallback reads. + +snippet: MongoDBDisableReadFallback + +For the existing endpoints, fallback reads should be enabled until at least the configured time to keep deduplication data has passed. Note that: + +- Dispatched records using the old format will expire after the configured retention time. +- Undispatched records do not expire and may remain in the outbox collection longer. + +The fallback reads should only be disabled once: + +1. All dispatched entries using the old format have expired. +2. No undispatched entries using the old format remain. + +One possible approach to get an understanding of the state of outbox collection is to execute the following MongoDB shell queries: + +```bash +db.getCollection('outboxrecord').aggregate( + [ + { + $match: { + _id: { $type: 'string' }, + Dispatched: { $ne: null } + } + }, + { $count: 'total_count_of_dispatched_records' } + ], + { maxTimeMS: 60000, allowDiskUse: true } +); +``` + +```bash +db.getCollection('outboxrecord').aggregate( + [ + { + $match: { + _id: { $type: 'string' }, + Dispatched: { $eq: null } + } + }, + { $count: 'total_count_of_undispatched_records' } + ], + { maxTimeMS: 60000, allowDiskUse: true } +); +``` \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_6/Client/Client.csproj b/samples/mongodb/simple/MongoDB_6/Client/Client.csproj index 3810bbebc8d..d2c1a7ff0fa 100644 --- a/samples/mongodb/simple/MongoDB_6/Client/Client.csproj +++ b/samples/mongodb/simple/MongoDB_6/Client/Client.csproj @@ -1,12 +1,12 @@ - net10.0 - preview + net8.0 enable Exe + 12.0 - + diff --git a/samples/mongodb/simple/MongoDB_6/Server/Server.csproj b/samples/mongodb/simple/MongoDB_6/Server/Server.csproj index 2fa0f28615e..554e695e8e1 100644 --- a/samples/mongodb/simple/MongoDB_6/Server/Server.csproj +++ b/samples/mongodb/simple/MongoDB_6/Server/Server.csproj @@ -1,14 +1,14 @@ - net10.0 + net8.0 Exe - preview + 12.0 enable - - + + \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_6/Shared/Shared.csproj b/samples/mongodb/simple/MongoDB_6/Shared/Shared.csproj index b761a1c0cd0..2b855a42eea 100644 --- a/samples/mongodb/simple/MongoDB_6/Shared/Shared.csproj +++ b/samples/mongodb/simple/MongoDB_6/Shared/Shared.csproj @@ -1,9 +1,9 @@ - net10.0 - preview + net8.0 + 12.0 - + \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/Client/Client.csproj b/samples/mongodb/simple/MongoDB_7/Client/Client.csproj new file mode 100644 index 00000000000..3810bbebc8d --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/Client/Client.csproj @@ -0,0 +1,14 @@ + + + net10.0 + preview + enable + Exe + + + + + + + + \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/Client/OrderCompletedHandler.cs b/samples/mongodb/simple/MongoDB_7/Client/OrderCompletedHandler.cs new file mode 100644 index 00000000000..4caece42932 --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/Client/OrderCompletedHandler.cs @@ -0,0 +1,10 @@ +using Microsoft.Extensions.Logging; + +public class OrderCompletedHandler(ILogger logger) : IHandleMessages +{ + public Task Handle(OrderCompleted message, IMessageHandlerContext context) + { + logger.LogInformation("Received OrderCompleted for OrderId {OrderId}", message.OrderId); + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/Client/Program.cs b/samples/mongodb/simple/MongoDB_7/Client/Program.cs new file mode 100644 index 00000000000..4fe068049ff --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/Client/Program.cs @@ -0,0 +1,41 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + + +Console.Title = "Client"; +var builder = Host.CreateApplicationBuilder(args); +var endpointConfiguration = new EndpointConfiguration("Samples.MongoDB.Client"); +endpointConfiguration.UseTransport(new LearningTransport()); +endpointConfiguration.UseSerialization(); + +builder.UseNServiceBus(endpointConfiguration); + +var host = builder.Build(); +await host.StartAsync(); + +var messageSession = host.Services.GetRequiredService(); +Console.WriteLine("Press 'enter' to send a StartOrder messages"); + +while (true) +{ + var key = Console.ReadKey(); + Console.WriteLine(); + + if (key.Key != ConsoleKey.Enter) + { + break; + } + + var orderId = Guid.NewGuid(); + var startOrder = new StartOrder + { + OrderId = orderId + }; + + await messageSession.Send("Samples.MongoDB.Server", startOrder); + + Console.WriteLine($"StartOrder Message sent with OrderId {orderId}"); + +} + +await builder.Build().StopAsync(); \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/MongoDB.sln b/samples/mongodb/simple/MongoDB_7/MongoDB.sln new file mode 100644 index 00000000000..a4a582c753a --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/MongoDB.sln @@ -0,0 +1,34 @@ +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.14.36301.6 d17.14 +MinimumVisualStudioVersion = 15.0.26730.12 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "Server\Server.csproj", "{48F718EE-6C45-41BA-80EC-81BF34D4A623}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shared", "Shared\Shared.csproj", "{DD438DB2-9C03-4BC0-BA52-BB7A35098458}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "Client\Client.csproj", "{2FE71442-7F81-428E-B945-D564850D6564}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{8EC462FD-D22E-90A8-E5CE-7E832BA40C5D}" + ProjectSection(SolutionItems) = preProject + prerelease.txt = prerelease.txt + EndProjectSection +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {48F718EE-6C45-41BA-80EC-81BF34D4A623}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {48F718EE-6C45-41BA-80EC-81BF34D4A623}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DD438DB2-9C03-4BC0-BA52-BB7A35098458}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DD438DB2-9C03-4BC0-BA52-BB7A35098458}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2FE71442-7F81-428E-B945-D564850D6564}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2FE71442-7F81-428E-B945-D564850D6564}.Debug|Any CPU.Build.0 = Debug|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {2A3D61F0-1091-40B4-B4A5-E3282EFE5FBC} + EndGlobalSection +EndGlobal diff --git a/samples/mongodb/simple/MongoDB_7/Server/CompleteOrder.cs b/samples/mongodb/simple/MongoDB_7/Server/CompleteOrder.cs new file mode 100644 index 00000000000..7d479bdded9 --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/Server/CompleteOrder.cs @@ -0,0 +1,4 @@ +public class CompleteOrder +{ + public string OrderDescription { get; set; } +} \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/Server/OrderSaga.cs b/samples/mongodb/simple/MongoDB_7/Server/OrderSaga.cs new file mode 100644 index 00000000000..a1da870b8d1 --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/Server/OrderSaga.cs @@ -0,0 +1,47 @@ +using Microsoft.Extensions.Logging; +#region thesaga + +public class OrderSaga(ILogger logger) : + Saga, + IAmStartedByMessages, + IHandleTimeouts +{ + protected override void ConfigureHowToFindSaga(SagaPropertyMapper mapper) + { + mapper.MapSaga(sagaData => sagaData.OrderId) + .ToMessage(message => message.OrderId); + } + + public Task Handle(StartOrder message, IMessageHandlerContext context) + { + Data.OrderId = message.OrderId; + var orderDescription = $"The saga for order {message.OrderId}"; + Data.OrderDescription = orderDescription; + + logger.LogInformation("Received StartOrder message {OrderId}. Starting Saga", Data.OrderId); + logger.LogInformation("Order will complete in 5 seconds"); + + var timeoutData = new CompleteOrder + { + OrderDescription = orderDescription + }; + + return RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData); + } + + public Task Timeout(CompleteOrder state, IMessageHandlerContext context) + { + logger.LogInformation("Saga with OrderId {OrderId} completed", Data.OrderId); + + var orderCompleted = new OrderCompleted + { + OrderId = Data.OrderId + }; + + MarkAsComplete(); + + return context.Publish(orderCompleted); + } +} + +#endregion \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/Server/OrderSagaData.cs b/samples/mongodb/simple/MongoDB_7/Server/OrderSagaData.cs new file mode 100644 index 00000000000..f5ec0515ca9 --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/Server/OrderSagaData.cs @@ -0,0 +1,10 @@ +#region sagadata + +public class OrderSagaData : ContainSagaData +{ + public Guid OrderId { get; set; } + + public string OrderDescription { get; set; } +} + +#endregion \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/Server/Program.cs b/samples/mongodb/simple/MongoDB_7/Server/Program.cs new file mode 100644 index 00000000000..98a4c299158 --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/Server/Program.cs @@ -0,0 +1,20 @@ +using Microsoft.Extensions.Hosting; + + +Console.Title = "Server"; +var builder = Host.CreateApplicationBuilder(args); + +#region mongoDbConfig + +var endpointConfiguration = new EndpointConfiguration("Samples.MongoDB.Server"); +var persistence = endpointConfiguration.UsePersistence(); +persistence.DatabaseName("Samples_MongoDB_Server"); + +#endregion + +endpointConfiguration.EnableInstallers(); +endpointConfiguration.UseSerialization(); +endpointConfiguration.UseTransport(new LearningTransport()); + +builder.UseNServiceBus(endpointConfiguration); +await builder.Build().RunAsync(); \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/Server/Server.csproj b/samples/mongodb/simple/MongoDB_7/Server/Server.csproj new file mode 100644 index 00000000000..9c608e62165 --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/Server/Server.csproj @@ -0,0 +1,14 @@ + + + net10.0 + Exe + preview + enable + + + + + + + + \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/Shared/OrderCompleted.cs b/samples/mongodb/simple/MongoDB_7/Shared/OrderCompleted.cs new file mode 100644 index 00000000000..f14ac7b001d --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/Shared/OrderCompleted.cs @@ -0,0 +1,7 @@ +using System; +using NServiceBus; + +public class OrderCompleted : IEvent +{ + public Guid OrderId { get; set; } +} \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/Shared/Shared.csproj b/samples/mongodb/simple/MongoDB_7/Shared/Shared.csproj new file mode 100644 index 00000000000..98f3133861c --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/Shared/Shared.csproj @@ -0,0 +1,9 @@ + + + net10.0 + preview + + + + + \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/Shared/StartOrder.cs b/samples/mongodb/simple/MongoDB_7/Shared/StartOrder.cs new file mode 100644 index 00000000000..f87f4f3c5e8 --- /dev/null +++ b/samples/mongodb/simple/MongoDB_7/Shared/StartOrder.cs @@ -0,0 +1,7 @@ +using System; +using NServiceBus; + +public class StartOrder : IMessage +{ + public Guid OrderId { get; set; } +} \ No newline at end of file diff --git a/samples/mongodb/simple/MongoDB_7/prerelease.txt b/samples/mongodb/simple/MongoDB_7/prerelease.txt new file mode 100644 index 00000000000..e69de29bb2d diff --git a/samples/outbox/mongodb/Core_10/MongoDbOutbox.sln b/samples/outbox/mongodb/Core_10/MongoDbOutbox.sln new file mode 100644 index 00000000000..b99a7bb3241 --- /dev/null +++ b/samples/outbox/mongodb/Core_10/MongoDbOutbox.sln @@ -0,0 +1,19 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.29728.190 +MinimumVisualStudioVersion = 15.0.26730.12 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample", "Sample\Sample.csproj", "{FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}.Debug|Any CPU.Build.0 = Debug|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/samples/outbox/mongodb/Core_10/Sample/Helper.cs b/samples/outbox/mongodb/Core_10/Sample/Helper.cs new file mode 100644 index 00000000000..39bfc8e62d4 --- /dev/null +++ b/samples/outbox/mongodb/Core_10/Sample/Helper.cs @@ -0,0 +1,23 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using NServiceBus; +public static class Helper +{ + public static Task SendDuplicates(IMessageSession context, TMessage message, int totalCount) + { + var duplicatedMessageId = Guid.NewGuid().ToString(); + + var tasks = Enumerable.Range(0, totalCount) + .Select(i => + { + var options = new SendOptions(); + options.RouteToThisEndpoint(); + options.SetMessageId(duplicatedMessageId); + + return context.Send(message, options); + }); + + return Task.WhenAll(tasks); + } +} \ No newline at end of file diff --git a/samples/outbox/mongodb/Core_10/Sample/MyHandler.cs b/samples/outbox/mongodb/Core_10/Sample/MyHandler.cs new file mode 100644 index 00000000000..bc15778b3a8 --- /dev/null +++ b/samples/outbox/mongodb/Core_10/Sample/MyHandler.cs @@ -0,0 +1,29 @@ +using System; +using System.Threading.Tasks; +using MongoDB.Bson; +using NServiceBus; + +public class MyHandler : IHandleMessages +{ + #region Handler + public async Task Handle(MyMessage message, IMessageHandlerContext context) + { + Console.WriteLine($"Processing MessageId {context.MessageId}"); + + var mongoPersistenceSession = context.SynchronizedStorageSession.MongoPersistenceSession(); + + await mongoPersistenceSession.MongoSession!.Client.GetDatabase("Samples_Outbox_Demo") + .GetCollection("businessobjects") + .InsertOneAsync(mongoPersistenceSession.MongoSession, new MyDocument + { + MessageId = context.MessageId, + }, cancellationToken: context.CancellationToken); + } + #endregion +} + +public class MyDocument +{ + public ObjectId Id { get; set; } + public string MessageId { get; set; } +} \ No newline at end of file diff --git a/samples/outbox/mongodb/Core_10/Sample/MyMessage.cs b/samples/outbox/mongodb/Core_10/Sample/MyMessage.cs new file mode 100644 index 00000000000..1686f5539f7 --- /dev/null +++ b/samples/outbox/mongodb/Core_10/Sample/MyMessage.cs @@ -0,0 +1,5 @@ +using NServiceBus; + +public class MyMessage : IMessage +{ +} \ No newline at end of file diff --git a/samples/outbox/mongodb/Core_10/Sample/Program.cs b/samples/outbox/mongodb/Core_10/Sample/Program.cs new file mode 100644 index 00000000000..424eee9ab14 --- /dev/null +++ b/samples/outbox/mongodb/Core_10/Sample/Program.cs @@ -0,0 +1,60 @@ +using System; +using System.Threading.Tasks; +using MongoDB.Driver; +using NServiceBus; + +class Program +{ + static async Task Main() + { + Console.Title = "RabbitMQMongoDBOutbox"; + + #region ConfigureTransport + var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDb.Outbox"); + + var rabbitMqTransport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "host=localhost;username=rabbitmq;password=rabbitmq") + { + TransportTransactionMode = TransportTransactionMode.ReceiveOnly + }; + endpointConfiguration.UseSerialization(); + endpointConfiguration.UseTransport(rabbitMqTransport); + #endregion + + #region ConfigurePersistence + + var persistence = endpointConfiguration.UsePersistence(); + persistence.MongoClient(new MongoClient("mongodb://127.0.0.1:27017/?replicaSet=tr0")); + persistence.DatabaseName("Samples_Outbox_Demo"); + #endregion + + endpointConfiguration.EnableInstallers(); + + #region SampleSteps + + // STEP 1: Run code as is, duplicates can be observed in console and database + + // STEP 2: Uncomment this line to enable the Outbox. Duplicates will be suppressed. + //endpointConfiguration.EnableOutbox(); + + // STEP 3: Comment out this line to allow concurrent processing. Concurrency exceptions will + // occur in the console window, but only 5 entries will be made in the database. + endpointConfiguration.LimitMessageProcessingConcurrencyTo(1); + + #endregion + var endpointInstance = await Endpoint.Start(endpointConfiguration); + + Console.WriteLine("Endpoint started. Press Enter to send 5 sets of duplicate messages..."); + Console.ReadLine(); + + for (var i = 0; i < 5; i++) + { + var myMessage = new MyMessage(); + await Helper.SendDuplicates(endpointInstance, myMessage, totalCount: 2); + } + + await Task.Delay(5000); + Console.WriteLine("Press any key to exit"); + Console.ReadKey(); + await endpointInstance.Stop(); + } +} diff --git a/samples/outbox/mongodb/Core_10/Sample/Sample.csproj b/samples/outbox/mongodb/Core_10/Sample/Sample.csproj new file mode 100644 index 00000000000..bd85d0adfe5 --- /dev/null +++ b/samples/outbox/mongodb/Core_10/Sample/Sample.csproj @@ -0,0 +1,12 @@ + + + net10.0 + Exe + preview + + + + + + + \ No newline at end of file diff --git a/samples/outbox/mongodb/Core_10/docker-compose.yml b/samples/outbox/mongodb/Core_10/docker-compose.yml new file mode 100644 index 00000000000..08df42bb3b7 --- /dev/null +++ b/samples/outbox/mongodb/Core_10/docker-compose.yml @@ -0,0 +1,38 @@ +name: outbox +services: + rabbit: + image: rabbitmq:3-management + environment: + RABBITMQ_DEFAULT_USER: rabbitmq + RABBITMQ_DEFAULT_PASS: rabbitmq + RABBITMQ_DEFAULT_VHOST: / + ports: + - "15672:15672" + - "5672:5672" + + mongodb: + image: mongo:6.0 + container_name: mongodb + command: ["--replSet", "tr0", "--bind_ip_all"] + ports: + - "27017:27017" + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 5s + timeout: 5s + retries: 30 + + mongodb-init: + image: mongo:6.0 + container_name: mongodb-init + depends_on: + mongodb: + condition: service_healthy + # One-shot container to initialize the single-node replica set + entrypoint: + [ + "bash", + "-lc", + "mongosh --host mongodb:27017 --eval \"rs.initiate({_id:'tr0', members:[{_id:0, host:'127.0.0.1:27017'}]})\" || echo 'Replica set already initialized.'" + ] + restart: "no" \ No newline at end of file diff --git a/samples/outbox/mongodb/Core_10/prerelease.txt b/samples/outbox/mongodb/Core_10/prerelease.txt new file mode 100644 index 00000000000..e69de29bb2d diff --git a/samples/outbox/mongodb/Core_9/MongoDbOutbox.sln b/samples/outbox/mongodb/Core_9/MongoDbOutbox.sln new file mode 100644 index 00000000000..b99a7bb3241 --- /dev/null +++ b/samples/outbox/mongodb/Core_9/MongoDbOutbox.sln @@ -0,0 +1,19 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.29728.190 +MinimumVisualStudioVersion = 15.0.26730.12 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample", "Sample\Sample.csproj", "{FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FAA2FC1D-4554-4AA7-8D5A-334DC8AB601C}.Debug|Any CPU.Build.0 = Debug|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/samples/outbox/mongodb/Core_9/Sample/Helper.cs b/samples/outbox/mongodb/Core_9/Sample/Helper.cs new file mode 100644 index 00000000000..39bfc8e62d4 --- /dev/null +++ b/samples/outbox/mongodb/Core_9/Sample/Helper.cs @@ -0,0 +1,23 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using NServiceBus; +public static class Helper +{ + public static Task SendDuplicates(IMessageSession context, TMessage message, int totalCount) + { + var duplicatedMessageId = Guid.NewGuid().ToString(); + + var tasks = Enumerable.Range(0, totalCount) + .Select(i => + { + var options = new SendOptions(); + options.RouteToThisEndpoint(); + options.SetMessageId(duplicatedMessageId); + + return context.Send(message, options); + }); + + return Task.WhenAll(tasks); + } +} \ No newline at end of file diff --git a/samples/outbox/mongodb/Core_9/Sample/MyHandler.cs b/samples/outbox/mongodb/Core_9/Sample/MyHandler.cs new file mode 100644 index 00000000000..bc15778b3a8 --- /dev/null +++ b/samples/outbox/mongodb/Core_9/Sample/MyHandler.cs @@ -0,0 +1,29 @@ +using System; +using System.Threading.Tasks; +using MongoDB.Bson; +using NServiceBus; + +public class MyHandler : IHandleMessages +{ + #region Handler + public async Task Handle(MyMessage message, IMessageHandlerContext context) + { + Console.WriteLine($"Processing MessageId {context.MessageId}"); + + var mongoPersistenceSession = context.SynchronizedStorageSession.MongoPersistenceSession(); + + await mongoPersistenceSession.MongoSession!.Client.GetDatabase("Samples_Outbox_Demo") + .GetCollection("businessobjects") + .InsertOneAsync(mongoPersistenceSession.MongoSession, new MyDocument + { + MessageId = context.MessageId, + }, cancellationToken: context.CancellationToken); + } + #endregion +} + +public class MyDocument +{ + public ObjectId Id { get; set; } + public string MessageId { get; set; } +} \ No newline at end of file diff --git a/samples/outbox/mongodb/Core_9/Sample/MyMessage.cs b/samples/outbox/mongodb/Core_9/Sample/MyMessage.cs new file mode 100644 index 00000000000..1686f5539f7 --- /dev/null +++ b/samples/outbox/mongodb/Core_9/Sample/MyMessage.cs @@ -0,0 +1,5 @@ +using NServiceBus; + +public class MyMessage : IMessage +{ +} \ No newline at end of file diff --git a/samples/outbox/mongodb/Core_9/Sample/Program.cs b/samples/outbox/mongodb/Core_9/Sample/Program.cs new file mode 100644 index 00000000000..70ec7b24123 --- /dev/null +++ b/samples/outbox/mongodb/Core_9/Sample/Program.cs @@ -0,0 +1,60 @@ +using System; +using System.Threading.Tasks; +using MongoDB.Driver; +using NServiceBus; + +class Program +{ + static async Task Main() + { + Console.Title = "RabbitMQMongoDBOutbox"; + + #region ConfigureTransport + var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDb.Outbox"); + + var rabbitMqTransport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "host=localhost;username=rabbitmq;password=rabbitmq") + { + TransportTransactionMode = TransportTransactionMode.ReceiveOnly + }; + endpointConfiguration.UseSerialization(); + endpointConfiguration.UseTransport(rabbitMqTransport); + #endregion + + #region ConfigurePersistence + + var persistence = endpointConfiguration.UsePersistence(); + persistence.MongoClient(new MongoClient("mongodb://127.0.0.1:27017/?replicaSet=tr0")); + persistence.DatabaseName("Samples_Outbox_Demo"); + #endregion + + endpointConfiguration.EnableInstallers(); + + #region SampleSteps + + // STEP 1: Run code as is, duplicates can be observed in console and database + + // STEP 2: Uncomment this line to enable the Outbox. Duplicates will be suppressed. + // endpointConfiguration.EnableOutbox().DisableReadFallback(); + + // STEP 3: Comment out this line to allow concurrent processing. Concurrency exceptions will + // occur in the console window, but only 5 entries will be made in the database. + endpointConfiguration.LimitMessageProcessingConcurrencyTo(1); + + #endregion + var endpointInstance = await Endpoint.Start(endpointConfiguration); + + Console.WriteLine("Endpoint started. Press Enter to send 5 sets of duplicate messages..."); + Console.ReadLine(); + + for (var i = 0; i < 5; i++) + { + var myMessage = new MyMessage(); + await Helper.SendDuplicates(endpointInstance, myMessage, totalCount: 2); + } + + await Task.Delay(5000); + Console.WriteLine("Press any key to exit"); + Console.ReadKey(); + await endpointInstance.Stop(); + } +} diff --git a/samples/outbox/mongodb/Core_9/Sample/Sample.csproj b/samples/outbox/mongodb/Core_9/Sample/Sample.csproj new file mode 100644 index 00000000000..c242842443e --- /dev/null +++ b/samples/outbox/mongodb/Core_9/Sample/Sample.csproj @@ -0,0 +1,12 @@ + + + net8.0 + Exe + 12.0 + + + + + + + \ No newline at end of file diff --git a/samples/outbox/mongodb/Core_9/docker-compose.yml b/samples/outbox/mongodb/Core_9/docker-compose.yml new file mode 100644 index 00000000000..08df42bb3b7 --- /dev/null +++ b/samples/outbox/mongodb/Core_9/docker-compose.yml @@ -0,0 +1,38 @@ +name: outbox +services: + rabbit: + image: rabbitmq:3-management + environment: + RABBITMQ_DEFAULT_USER: rabbitmq + RABBITMQ_DEFAULT_PASS: rabbitmq + RABBITMQ_DEFAULT_VHOST: / + ports: + - "15672:15672" + - "5672:5672" + + mongodb: + image: mongo:6.0 + container_name: mongodb + command: ["--replSet", "tr0", "--bind_ip_all"] + ports: + - "27017:27017" + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 5s + timeout: 5s + retries: 30 + + mongodb-init: + image: mongo:6.0 + container_name: mongodb-init + depends_on: + mongodb: + condition: service_healthy + # One-shot container to initialize the single-node replica set + entrypoint: + [ + "bash", + "-lc", + "mongosh --host mongodb:27017 --eval \"rs.initiate({_id:'tr0', members:[{_id:0, host:'127.0.0.1:27017'}]})\" || echo 'Replica set already initialized.'" + ] + restart: "no" \ No newline at end of file diff --git a/samples/outbox/mongodb/sample.md b/samples/outbox/mongodb/sample.md new file mode 100644 index 00000000000..d234b75a383 --- /dev/null +++ b/samples/outbox/mongodb/sample.md @@ -0,0 +1,133 @@ +--- +title: Using Outbox with MongoDB +summary: Demonstrates how the Outbox handles duplicate messages using RabbitMQ and MongoDB hosted in Docker containers. +reviewed: 2025-09-03 +component: Core +related: +- transports/rabbitmq +- persistence/mongodb +--- + +Integrates the [RabbitMQ Server transport](/transports/rabbitmq/) with [MongoDB persistence](/persistence/mongodb/). + +This sample demonstrates how the Outbox feature works to ensure the atomic processing of a message in MongoDB, ensuring that messages sent and received are kept consistent with any modifications made to business data in a database. + +This sample uses [Docker Compose](https://docs.docker.com/compose/) to provide dependencies. It is not necessary to have installed instances of RabbitMQ or SQL Server. + +downloadbutton + +## Prerequisites + +1. Install [Docker](https://www.docker.com/products/docker-desktop). +2. Install [Docker Compose](https://docs.docker.com/compose/install/). +3. If running Docker on Windows, [set Docker to use Linux containers](https://docs.docker.com/docker-for-windows/#switch-between-windows-and-linux-containers). +4. In the sample directory, execute the following to set up the RabbitMQ and CosmosDb emulator instances: + +```shell +> docker compose up --detach +``` + +Once complete, the RabbitMQ administration can be reached via [http://localhost:15672/](http://localhost:15672/) with username `rabbitmq` and password `rabbitmq`. + +## Running the project + +The code consists of a single NServiceBus endpoint project, which simulates receiving duplicated messages (normally received due to at-least-once delivery guarantees of the message broker) and processing them under three different circumstances. + +1. Without protection, resulting in duplicated processing of messages. +2. Using the Outbox but with a maximum message concurrency of `1`. +3. Using the Outbox but with multiple messages being processed simultaneously, relying on the concurrency exception thrown by the database to ensure exactly-once successful processing of messages. + +Selecting each step is accomplished via commenting and uncommenting code in the **Program.cs** file: + +snippet: SampleSteps + +### Step 1: No protection + +First, run the sample as-is. It's easy to see from the console output that each MessageId is processed twice. The endpoint has no way to know that it's handling duplicated messages. + +```text +Endpoint started. Press Enter to send 5 sets of duplicate messages... + +Processing MessageId f4589be7-efaa-4d7b-8fc7-414ac6e5ddfa +Processing MessageId f4589be7-efaa-4d7b-8fc7-414ac6e5ddfa +Processing MessageId 78c4be6d-02f1-4dc6-9aa0-e1298dcb28f9 +Processing MessageId 78c4be6d-02f1-4dc6-9aa0-e1298dcb28f9 +Processing MessageId 96ed249b-ac80-4eea-be16-eede5f368b09 +Processing MessageId 96ed249b-ac80-4eea-be16-eede5f368b09 +Processing MessageId 95c6f325-22e2-4832-8a2a-0988ce318f40 +Processing MessageId 95c6f325-22e2-4832-8a2a-0988ce318f40 +Processing MessageId 1333e38d-b076-41e1-92d6-a2b7a699f62f +Processing MessageId 1333e38d-b076-41e1-92d6-a2b7a699f62f +Press any key to exit +``` + +The message handler also writes the received MessageId to the **businessobject** collection in the database, as a simulation of writing business data. To see it, execute `db["businessobjects"].find()` when connected to the database: + +| _id | MessageId | +|----|--------------------------------------| +| 68b85e08946aee270f9b9a73 | f4589be7-efaa-4d7b-8fc7-414ac6e5ddfa | +| 68b85e08946aee270f9b9a74 | f4589be7-efaa-4d7b-8fc7-414ac6e5ddfa | +| 68b85e08946aee270f9b9a75 | 78c4be6d-02f1-4dc6-9aa0-e1298dcb28f9 | +| 68b85e08946aee270f9b9a76 | 78c4be6d-02f1-4dc6-9aa0-e1298dcb28f9 | +| 68b85e08946aee270f9b9a77 | 96ed249b-ac80-4eea-be16-eede5f368b09 | +| 68b85e08946aee270f9b9a78 | 96ed249b-ac80-4eea-be16-eede5f368b09 | +| 68b85e08946aee270f9b9a79 | 95c6f325-22e2-4832-8a2a-0988ce318f40 | +| 68b85e08946aee270f9b9a7a | 95c6f325-22e2-4832-8a2a-0988ce318f40 | +| 68b85e08946aee270f9b9a7b | 1333e38d-b076-41e1-92d6-a2b7a699f62f | +| 68b85e08946aee270f9b9a7c | 1333e38d-b076-41e1-92d6-a2b7a699f62f | + +### Step 2: Outbox, 1 message at a time + +Next, uncomment the line in **Program.cs** commented as Step 2, which enables the Outbox feature. + +It's clear from the console output that each MessageId is only processed a single time, and the message handler does not execute for the duplicate messages: + +```text +Endpoint started. Press Enter to send 5 sets of duplicate messages... + +Processing MessageId 4d9b207b-f7e3-48f3-a8a9-8bf00afe0bd5 +Processing MessageId cfda075b-2ebd-4256-b127-20fbf719873c +Processing MessageId 48cb433c-2cda-4835-a40c-1f06e7c4ace5 +Processing MessageId 93132c07-e39d-4eb1-9d34-db4dee493f17 +Processing MessageId 247706a4-c231-441c-b5fd-9f2defe6bb6f +Press any key to exit +``` + +The same is true in MongoDB: + +| Id | MessageId | +|----|--------------------------------------| +| 68b85ec0296a7fe3459973eb | 4d9b207b-f7e3-48f3-a8a9-8bf00afe0bd5 | +| 68b85ec0296a7fe3459973ec | cfda075b-2ebd-4256-b127-20fbf719873c | +| 68b85ec0296a7fe3459973ed | 48cb433c-2cda-4835-a40c-1f06e7c4ace5 | +| 68b85ec0296a7fe3459973ee | 93132c07-e39d-4eb1-9d34-db4dee493f17 | +| 68b85ec0296a7fe3459973ef | 247706a4-c231-441c-b5fd-9f2defe6bb6f | + +## Code walk-through + +In **Program.cs**, an NServiceBus endpoint is created and configured to use the RabbitMQ transport, connecting to the broker instance hosted in Docker: + +snippet: ConfigureTransport + +Next, [MongoDB Persistence](/persistence/mongodb/) is configured to connect to the local MongoDB instance hosted in Docker. + +snippet: ConfigurePersistence + +**MyHandler.cs** contains the message handler. + +snippet: Handler + +The message handler: + +1. Logs the `MessageId` to the console. + +> [!NOTE] +> It's absolutely essential that business data is manipulated using the same connection and transaction that NServiceBus opens to manage the Outbox data. The Outbox feature relies on combining the manipulation of Outbox and business data in the same local database transaction to guarantee consistency between messaging operations and database manipulations within the scope of processing a message. + +## Cleaning up + +Once finished with the sample, the RabbitMQ and local CosmosDb instances can be cleaned up using: + +```shell +> docker compose down +```