Skip to content

Commit bb869a8

Browse files
Merge pull request #1468 from Particular/set-native-content-type
Allow setting customizing the outgoing messages by setting AMQP attributes just before dispatching the message to the broker
2 parents 49bee24 + b8849de commit bb869a8

File tree

5 files changed

+119
-7
lines changed

5 files changed

+119
-7
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
2+
{
3+
using System.Threading.Tasks;
4+
using AcceptanceTesting;
5+
using global::RabbitMQ.Client.Events;
6+
using NServiceBus.AcceptanceTests;
7+
using NServiceBus.AcceptanceTests.EndpointTemplates;
8+
using NUnit.Framework;
9+
10+
class When_customizing_outgoing_messages : NServiceBusAcceptanceTest
11+
{
12+
[Test]
13+
public async Task Should_set_value()
14+
{
15+
var scenario = await Scenario.Define<Context>()
16+
.WithEndpoint<Receiver>(b => b.When((bus, c) => bus.SendLocal(new Message())))
17+
.Done(c => c.MessageReceived)
18+
.Run();
19+
20+
Assert.That(scenario.BasicDeliverEventArgs.BasicProperties.AppId, Is.EqualTo("MyValue"));
21+
}
22+
23+
public class Receiver : EndpointConfigurationBuilder
24+
{
25+
public Receiver()
26+
{
27+
EndpointSetup<DefaultServer>(endpointConfiguration =>
28+
{
29+
endpointConfiguration.ConfigureRabbitMQTransport().OutgoingNativeMessageCustomization =
30+
(operation, properties) =>
31+
{
32+
properties.AppId = "MyValue";
33+
};
34+
});
35+
}
36+
37+
class MyEventHandler : IHandleMessages<Message>
38+
{
39+
Context testContext;
40+
41+
public MyEventHandler(Context testContext)
42+
{
43+
this.testContext = testContext;
44+
}
45+
46+
public Task Handle(Message message, IMessageHandlerContext context)
47+
{
48+
testContext.BasicDeliverEventArgs = context.Extensions.Get<BasicDeliverEventArgs>();
49+
testContext.MessageReceived = true;
50+
51+
return Task.CompletedTask;
52+
}
53+
}
54+
}
55+
56+
public class Message : IMessage
57+
{
58+
}
59+
60+
class Context : ScenarioContext
61+
{
62+
public bool MessageReceived { get; set; }
63+
64+
public BasicDeliverEventArgs BasicDeliverEventArgs { get; set; }
65+
}
66+
}
67+
}

src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace NServiceBus
2222
public System.TimeSpan HeartbeatInterval { get; set; }
2323
public System.Func<RabbitMQ.Client.Events.BasicDeliverEventArgs, string> MessageIdStrategy { get; set; }
2424
public System.TimeSpan NetworkRecoveryInterval { get; set; }
25+
public System.Action<NServiceBus.Transport.IOutgoingTransportOperation, RabbitMQ.Client.IBasicProperties> OutgoingNativeMessageCustomization { get; set; }
2526
public NServiceBus.PrefetchCountCalculation PrefetchCountCalculation { get; set; }
2627
public System.TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker { get; set; }
2728
public bool UseExternalAuthMechanism { get; set; }

src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Security.Cryptography.X509Certificates;
66
using System.Threading;
77
using System.Threading.Tasks;
8+
using RabbitMQ.Client;
89
using RabbitMQ.Client.Events;
910
using Transport;
1011
using Transport.RabbitMQ;
@@ -91,6 +92,21 @@ public TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker
9192
}
9293
}
9394

95+
/// <summary>
96+
/// Gets or sets the action that allows customization of the native <see cref="BasicProperties"/>
97+
/// just before it is dispatched to the rabbitmq client.
98+
/// </summary>
99+
/// <remarks>
100+
/// <para>
101+
/// This customization is applied after any configured transport customizations, meaning that
102+
/// any changes made here may override or conflict with previous transport-level adjustments.
103+
/// Exercise caution, as modifying the message at this stage can lead to unintended behavior
104+
/// downstream if the message structure or properties are altered in ways that do not align
105+
/// with expectations elsewhere in the system.
106+
/// </para>
107+
/// </remarks>
108+
public Action<IOutgoingTransportOperation, IBasicProperties> OutgoingNativeMessageCustomization { get; set; }
109+
94110
/// <summary>
95111
/// The calculation method for the prefetch count. The default is 3 times the maximum concurrency value.
96112
/// </summary>
@@ -183,17 +199,35 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
183199
certCollection = new X509Certificate2Collection(ClientCertificate);
184200
}
185201

186-
var connectionFactory = new ConnectionFactory(hostSettings.Name, ConnectionConfiguration, certCollection, !ValidateRemoteCertificate,
187-
UseExternalAuthMechanism, HeartbeatInterval, NetworkRecoveryInterval, additionalClusterNodes);
202+
var connectionFactory = new ConnectionFactory(
203+
hostSettings.Name,
204+
ConnectionConfiguration,
205+
certCollection,
206+
!ValidateRemoteCertificate,
207+
UseExternalAuthMechanism,
208+
HeartbeatInterval,
209+
NetworkRecoveryInterval,
210+
additionalClusterNodes
211+
);
188212

189213
var channelProvider = new ChannelProvider(connectionFactory, NetworkRecoveryInterval, RoutingTopology);
190214
await channelProvider.CreateConnection(cancellationToken).ConfigureAwait(false);
191215

192216
var converter = new MessageConverter(MessageIdStrategy);
193217

194-
var infra = new RabbitMQTransportInfrastructure(hostSettings, receivers, connectionFactory,
195-
RoutingTopology, channelProvider, converter, TimeToWaitBeforeTriggeringCircuitBreaker,
196-
PrefetchCountCalculation, NetworkRecoveryInterval, SupportsDelayedDelivery);
218+
var infra = new RabbitMQTransportInfrastructure(
219+
hostSettings,
220+
receivers,
221+
connectionFactory,
222+
RoutingTopology,
223+
channelProvider,
224+
converter,
225+
OutgoingNativeMessageCustomization,
226+
TimeToWaitBeforeTriggeringCircuitBreaker,
227+
PrefetchCountCalculation,
228+
NetworkRecoveryInterval,
229+
SupportsDelayedDelivery
230+
);
197231

198232
if (hostSettings.SetupInfrastructure)
199233
{

src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Text;
66
using System.Threading;
77
using System.Threading.Tasks;
8+
using global::RabbitMQ.Client;
89

910
sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
1011
{
@@ -17,6 +18,7 @@ sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
1718
public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings,
1819
ConnectionFactory connectionFactory, IRoutingTopology routingTopology,
1920
ChannelProvider channelProvider, MessageConverter messageConverter,
21+
Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization,
2022
TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation,
2123
TimeSpan networkRecoveryInterval, bool supportsDelayedDelivery)
2224
{
@@ -26,7 +28,7 @@ public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSetting
2628
this.networkRecoveryInterval = networkRecoveryInterval;
2729
this.supportsDelayedDelivery = supportsDelayedDelivery;
2830

29-
Dispatcher = new MessageDispatcher(channelProvider, supportsDelayedDelivery);
31+
Dispatcher = new MessageDispatcher(channelProvider, messageCustomization, supportsDelayedDelivery);
3032
Receivers = receiverSettings.Select(x => CreateMessagePump(hostSettings, x, messageConverter, timeToWaitBeforeTriggeringCircuitBreaker, prefetchCountCalculation))
3133
.ToDictionary(x => x.Id, x => x);
3234
}

src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,17 @@
1010
class MessageDispatcher : IMessageDispatcher
1111
{
1212
readonly ChannelProvider channelProvider;
13+
readonly Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization;
1314
readonly bool supportsDelayedDelivery;
1415

15-
public MessageDispatcher(ChannelProvider channelProvider, bool supportsDelayedDelivery)
16+
public MessageDispatcher(
17+
ChannelProvider channelProvider,
18+
Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization,
19+
bool supportsDelayedDelivery
20+
)
1621
{
1722
this.channelProvider = channelProvider;
23+
this.messageCustomization = messageCustomization ?? (static (_, _) => { });
1824
this.supportsDelayedDelivery = supportsDelayedDelivery;
1925
}
2026

@@ -56,6 +62,7 @@ ValueTask SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwar
5662

5763
var properties = new BasicProperties();
5864
properties.Fill(message, transportOperation.Properties);
65+
messageCustomization(transportOperation, properties);
5966

6067
return channel.SendMessage(transportOperation.Destination, message, properties, cancellationToken);
6168
}
@@ -68,6 +75,7 @@ ValueTask PublishMessage(MulticastTransportOperation transportOperation, Confirm
6875

6976
var properties = new BasicProperties();
7077
properties.Fill(message, transportOperation.Properties);
78+
messageCustomization(transportOperation, properties);
7179

7280
return channel.PublishMessage(transportOperation.MessageType, message, properties, cancellationToken);
7381
}

0 commit comments

Comments
 (0)