From c4f3f9ae9abeb94b94c00df3a23e29a0d51197af Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Thu, 4 Jul 2024 12:35:23 +0200 Subject: [PATCH 1/4] =?UTF-8?q?=E2=9C=A8=20Allow=20applying=20content-type?= =?UTF-8?q?=20without=20having=20NServiceBus.ContentType=20header=20entry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Daniel Marbach --- .../When_customizing_outgoing_messages.cs | 67 +++++++++++++++++++ .../APIApprovals.Approve.approved.txt | 1 + .../RabbitMQTransport.cs | 9 ++- .../RabbitMQTransportInfrastructure.cs | 4 +- .../Sending/MessageDispatcher.cs | 6 +- 5 files changed, 84 insertions(+), 3 deletions(-) create mode 100644 src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_customizing_outgoing_messages.cs diff --git a/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_customizing_outgoing_messages.cs b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_customizing_outgoing_messages.cs new file mode 100644 index 000000000..711807707 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ.AcceptanceTests/When_customizing_outgoing_messages.cs @@ -0,0 +1,67 @@ +namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests +{ + using System.Threading.Tasks; + using AcceptanceTesting; + using global::RabbitMQ.Client.Events; + using NServiceBus.AcceptanceTests; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NUnit.Framework; + + class When_customizing_outgoing_messages : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_set_value() + { + var scenario = await Scenario.Define() + .WithEndpoint(b => b.When((bus, c) => bus.SendLocal(new Message()))) + .Done(c => c.MessageReceived) + .Run(); + + Assert.That(scenario.BasicDeliverEventArgs.BasicProperties.AppId, Is.EqualTo("MyValue")); + } + + public class Receiver : EndpointConfigurationBuilder + { + public Receiver() + { + EndpointSetup(endpointConfiguration => + { + endpointConfiguration.ConfigureRabbitMQTransport().OutgoingNativeMessageCustomization = + (operation, properties) => + { + properties.AppId = "MyValue"; + }; + }); + } + + class MyEventHandler : IHandleMessages + { + Context testContext; + + public MyEventHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(Message message, IMessageHandlerContext context) + { + testContext.BasicDeliverEventArgs = context.Extensions.Get(); + testContext.MessageReceived = true; + + return Task.CompletedTask; + } + } + } + + public class Message : IMessage + { + } + + class Context : ScenarioContext + { + public bool MessageReceived { get; set; } + + public BasicDeliverEventArgs BasicDeliverEventArgs { get; set; } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index 76ab4163d..01faebeab 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -22,6 +22,7 @@ namespace NServiceBus public System.TimeSpan HeartbeatInterval { get; set; } public System.Func MessageIdStrategy { get; set; } public System.TimeSpan NetworkRecoveryInterval { get; set; } + public System.Action OutgoingNativeMessageCustomization { get; set; } public NServiceBus.PrefetchCountCalculation PrefetchCountCalculation { get; set; } public System.TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker { get; set; } public bool UseExternalAuthMechanism { get; set; } diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs index e1ab2ef06..96c76b251 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs @@ -5,11 +5,13 @@ using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; + using RabbitMQ.Client; using RabbitMQ.Client.Events; using Transport; using Transport.RabbitMQ; using ConnectionFactory = Transport.RabbitMQ.ConnectionFactory; + /// /// Transport definition for RabbitMQ. /// @@ -91,6 +93,11 @@ public TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker } } + /// + /// The callback to use when customizing the message before dispatching it to the broker. + /// + public Action OutgoingNativeMessageCustomization { get; set; } + /// /// The calculation method for the prefetch count. The default is 3 times the maximum concurrency value. /// @@ -192,7 +199,7 @@ public override async Task Initialize(HostSettings host var converter = new MessageConverter(MessageIdStrategy); var infra = new RabbitMQTransportInfrastructure(hostSettings, receivers, connectionFactory, - RoutingTopology, channelProvider, converter, TimeToWaitBeforeTriggeringCircuitBreaker, + RoutingTopology, channelProvider, converter, OutgoingNativeMessageCustomization, TimeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation, NetworkRecoveryInterval, SupportsDelayedDelivery); if (hostSettings.SetupInfrastructure) diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs index b603d5e0a..01f1e7868 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs @@ -5,6 +5,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; + using global::RabbitMQ.Client; sealed class RabbitMQTransportInfrastructure : TransportInfrastructure { @@ -17,6 +18,7 @@ sealed class RabbitMQTransportInfrastructure : TransportInfrastructure public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings, ConnectionFactory connectionFactory, IRoutingTopology routingTopology, ChannelProvider channelProvider, MessageConverter messageConverter, + Action messageCustomization, TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation, TimeSpan networkRecoveryInterval, bool supportsDelayedDelivery) { @@ -26,7 +28,7 @@ public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSetting this.networkRecoveryInterval = networkRecoveryInterval; this.supportsDelayedDelivery = supportsDelayedDelivery; - Dispatcher = new MessageDispatcher(channelProvider, supportsDelayedDelivery); + Dispatcher = new MessageDispatcher(channelProvider, messageCustomization, supportsDelayedDelivery); Receivers = receiverSettings.Select(x => CreateMessagePump(hostSettings, x, messageConverter, timeToWaitBeforeTriggeringCircuitBreaker, prefetchCountCalculation)) .ToDictionary(x => x.Id, x => x); } diff --git a/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs b/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs index ee9a64e5c..817c79a00 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs @@ -10,11 +10,13 @@ class MessageDispatcher : IMessageDispatcher { readonly ChannelProvider channelProvider; + readonly Action messageCustomization; readonly bool supportsDelayedDelivery; - public MessageDispatcher(ChannelProvider channelProvider, bool supportsDelayedDelivery) + public MessageDispatcher(ChannelProvider channelProvider, Action messageCustomization, bool supportsDelayedDelivery) { this.channelProvider = channelProvider; + this.messageCustomization = messageCustomization ?? (static (_, _) => { }); this.supportsDelayedDelivery = supportsDelayedDelivery; } @@ -55,6 +57,7 @@ Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChan var properties = new BasicProperties(); properties.Fill(message, transportOperation.Properties); + messageCustomization(transportOperation, properties); return channel.SendMessage(transportOperation.Destination, message, properties, cancellationToken); } @@ -67,6 +70,7 @@ Task PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwar var properties = new BasicProperties(); properties.Fill(message, transportOperation.Properties); + messageCustomization(transportOperation, properties); return channel.PublishMessage(transportOperation.MessageType, message, properties, cancellationToken); } From fdc6257d566f6a139cc0e0620f8a4f60e1cc36d8 Mon Sep 17 00:00:00 2001 From: Szymon Pobiega Date: Mon, 28 Oct 2024 12:23:46 +0100 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: Daniel Marbach --- .../RabbitMQTransport.cs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs index 96c76b251..f928e788e 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs @@ -94,8 +94,18 @@ public TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker } /// - /// The callback to use when customizing the message before dispatching it to the broker. + /// Gets or sets the action that allows customization of the native + /// just before it is dispatched to the rabbitmq client. /// + /// + /// + /// This customization is applied after any configured transport customizations, meaning that + /// any changes made here may override or conflict with previous transport-level adjustments. + /// Exercise caution, as modifying the message at this stage can lead to unintended behavior + /// downstream if the message structure or properties are altered in ways that do not align + /// with expectations elsewhere in the system. + /// + /// public Action OutgoingNativeMessageCustomization { get; set; } /// From c3136d6d27c28cd97f4f024a47f809c38f849aac Mon Sep 17 00:00:00 2001 From: Szymon Pobiega Date: Mon, 4 Nov 2024 09:44:49 +0100 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Daniel Marbach --- src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs index f928e788e..e66822c4b 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs @@ -11,7 +11,6 @@ using Transport.RabbitMQ; using ConnectionFactory = Transport.RabbitMQ.ConnectionFactory; - /// /// Transport definition for RabbitMQ. /// From b8849de5c5dd49da9566fe0accf89a6526bc5197 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 5 Nov 2024 09:41:20 +0100 Subject: [PATCH 4/4] =?UTF-8?q?=E2=9A=9C=EF=B8=8F=20Boyscout=20rule?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RabbitMQTransport.cs | 28 +++++++++++++++---- .../Sending/MessageDispatcher.cs | 6 +++- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs index e66822c4b..c10783eab 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs @@ -199,17 +199,35 @@ public override async Task Initialize(HostSettings host certCollection = new X509Certificate2Collection(ClientCertificate); } - var connectionFactory = new ConnectionFactory(hostSettings.Name, ConnectionConfiguration, certCollection, !ValidateRemoteCertificate, - UseExternalAuthMechanism, HeartbeatInterval, NetworkRecoveryInterval, additionalClusterNodes); + var connectionFactory = new ConnectionFactory( + hostSettings.Name, + ConnectionConfiguration, + certCollection, + !ValidateRemoteCertificate, + UseExternalAuthMechanism, + HeartbeatInterval, + NetworkRecoveryInterval, + additionalClusterNodes + ); var channelProvider = new ChannelProvider(connectionFactory, NetworkRecoveryInterval, RoutingTopology); await channelProvider.CreateConnection(cancellationToken).ConfigureAwait(false); var converter = new MessageConverter(MessageIdStrategy); - var infra = new RabbitMQTransportInfrastructure(hostSettings, receivers, connectionFactory, - RoutingTopology, channelProvider, converter, OutgoingNativeMessageCustomization, TimeToWaitBeforeTriggeringCircuitBreaker, - PrefetchCountCalculation, NetworkRecoveryInterval, SupportsDelayedDelivery); + var infra = new RabbitMQTransportInfrastructure( + hostSettings, + receivers, + connectionFactory, + RoutingTopology, + channelProvider, + converter, + OutgoingNativeMessageCustomization, + TimeToWaitBeforeTriggeringCircuitBreaker, + PrefetchCountCalculation, + NetworkRecoveryInterval, + SupportsDelayedDelivery + ); if (hostSettings.SetupInfrastructure) { diff --git a/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs b/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs index 817c79a00..c7af80f19 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs @@ -13,7 +13,11 @@ class MessageDispatcher : IMessageDispatcher readonly Action messageCustomization; readonly bool supportsDelayedDelivery; - public MessageDispatcher(ChannelProvider channelProvider, Action messageCustomization, bool supportsDelayedDelivery) + public MessageDispatcher( + ChannelProvider channelProvider, + Action messageCustomization, + bool supportsDelayedDelivery + ) { this.channelProvider = channelProvider; this.messageCustomization = messageCustomization ?? (static (_, _) => { });