Skip to content

Commit 6297709

Browse files
ramonsmitsSzymonPobiega
authored andcommitted
✨ Allow applying content-type without having NServiceBus.ContentType header entry
Use global customization function
1 parent f59e618 commit 6297709

File tree

6 files changed

+68
-4
lines changed

6 files changed

+68
-4
lines changed

src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace NServiceBus
2222
public System.TimeSpan HeartbeatInterval { get; set; }
2323
public System.Func<RabbitMQ.Client.Events.BasicDeliverEventArgs, string> MessageIdStrategy { get; set; }
2424
public System.TimeSpan NetworkRecoveryInterval { get; set; }
25+
public System.Action<NServiceBus.Transport.IOutgoingTransportOperation, RabbitMQ.Client.IBasicProperties> OutgoingNativeMessageCustomization { get; set; }
2526
public NServiceBus.PrefetchCountCalculation PrefetchCountCalculation { get; set; }
2627
public System.TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker { get; set; }
2728
public bool UseExternalAuthMechanism { get; set; }

src/NServiceBus.Transport.RabbitMQ.TransportTests/ConfigureRabbitMQTransportInfrastructure.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,17 @@ public TransportDefinition CreateTransportDefinition()
1616
{
1717
var connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost";
1818

19-
var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Classic), connectionString, false);
19+
var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Classic), connectionString, false)
20+
{
21+
//Used by When_customizing_outgoing_messages test
22+
OutgoingNativeMessageCustomization = (operation, properties) =>
23+
{
24+
if (operation.Properties.TryGetValue("ContentType", out var overrideContentType))
25+
{
26+
properties.ContentType = overrideContentType;
27+
}
28+
}
29+
};
2030

2131
return transport;
2232
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
namespace NServiceBus.Transport.RabbitMQ.TransportTests;
2+
3+
using System.Collections.Generic;
4+
using System.Threading.Tasks;
5+
using global::RabbitMQ.Client;
6+
using global::RabbitMQ.Client.Events;
7+
using NServiceBus.TransportTests;
8+
using NUnit.Framework;
9+
10+
public class When_customizing_outgoing_messages : NServiceBusTransportTest
11+
{
12+
[Test]
13+
public async Task Should_override_default_values()
14+
{
15+
var onMessageInvoked = CreateTaskCompletionSource<MessageContext>();
16+
IReadOnlyBasicProperties basicProps = null;
17+
18+
await StartPump(
19+
(context, _) =>
20+
{
21+
var basicEventArgs = context.Extensions.Get<BasicDeliverEventArgs>();
22+
basicProps = basicEventArgs.BasicProperties;
23+
24+
return onMessageInvoked.SetCompleted(context);
25+
},
26+
(_, __) => Task.FromResult(ErrorHandleResult.RetryRequired),
27+
TransportTransactionMode.ReceiveOnly);
28+
29+
var dispatchProperties = new DispatchProperties { ["ContentType"] = "my-content" };
30+
31+
await SendMessage(InputQueueName, new Dictionary<string, string>
32+
{
33+
{"test-header", "original"}
34+
}, null, dispatchProperties);
35+
36+
var messageContext = await onMessageInvoked.Task;
37+
38+
Assert.That(basicProps.ContentType, Is.EqualTo("my-content"));
39+
}
40+
}

src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
using System.Security.Cryptography.X509Certificates;
66
using System.Threading;
77
using System.Threading.Tasks;
8+
using RabbitMQ.Client;
89
using RabbitMQ.Client.Events;
910
using Transport;
1011
using Transport.RabbitMQ;
1112
using ConnectionFactory = Transport.RabbitMQ.ConnectionFactory;
1213

14+
1315
/// <summary>
1416
/// Transport definition for RabbitMQ.
1517
/// </summary>
@@ -91,6 +93,11 @@ public TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker
9193
}
9294
}
9395

96+
/// <summary>
97+
/// The callback to use when customizing the message before dispatching it to the broker.
98+
/// </summary>
99+
public Action<IOutgoingTransportOperation, IBasicProperties> OutgoingNativeMessageCustomization { get; set; }
100+
94101
/// <summary>
95102
/// The calculation method for the prefetch count. The default is 3 times the maximum concurrency value.
96103
/// </summary>
@@ -192,7 +199,7 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
192199
var converter = new MessageConverter(MessageIdStrategy);
193200

194201
var infra = new RabbitMQTransportInfrastructure(hostSettings, receivers, connectionFactory,
195-
RoutingTopology, channelProvider, converter, TimeToWaitBeforeTriggeringCircuitBreaker,
202+
RoutingTopology, channelProvider, converter, OutgoingNativeMessageCustomization, TimeToWaitBeforeTriggeringCircuitBreaker,
196203
PrefetchCountCalculation, NetworkRecoveryInterval, SupportsDelayedDelivery);
197204

198205
if (hostSettings.SetupInfrastructure)

src/NServiceBus.Transport.RabbitMQ/RabbitMQTransportInfrastructure.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Text;
66
using System.Threading;
77
using System.Threading.Tasks;
8+
using global::RabbitMQ.Client;
89

910
sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
1011
{
@@ -17,6 +18,7 @@ sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
1718
public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings,
1819
ConnectionFactory connectionFactory, IRoutingTopology routingTopology,
1920
ChannelProvider channelProvider, MessageConverter messageConverter,
21+
Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization,
2022
TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation,
2123
TimeSpan networkRecoveryInterval, bool supportsDelayedDelivery)
2224
{
@@ -26,7 +28,7 @@ public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSetting
2628
this.networkRecoveryInterval = networkRecoveryInterval;
2729
this.supportsDelayedDelivery = supportsDelayedDelivery;
2830

29-
Dispatcher = new MessageDispatcher(channelProvider, supportsDelayedDelivery);
31+
Dispatcher = new MessageDispatcher(channelProvider, messageCustomization, supportsDelayedDelivery);
3032
Receivers = receiverSettings.Select(x => CreateMessagePump(hostSettings, x, messageConverter, timeToWaitBeforeTriggeringCircuitBreaker, prefetchCountCalculation))
3133
.ToDictionary(x => x.Id, x => x);
3234
}

src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
class MessageDispatcher : IMessageDispatcher
1111
{
1212
readonly ChannelProvider channelProvider;
13+
readonly Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization;
1314
readonly bool supportsDelayedDelivery;
1415

15-
public MessageDispatcher(ChannelProvider channelProvider, bool supportsDelayedDelivery)
16+
public MessageDispatcher(ChannelProvider channelProvider, Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization, bool supportsDelayedDelivery)
1617
{
1718
this.channelProvider = channelProvider;
19+
this.messageCustomization = messageCustomization ?? ((_, _) => { });
1820
this.supportsDelayedDelivery = supportsDelayedDelivery;
1921
}
2022

@@ -55,6 +57,7 @@ Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChan
5557

5658
var properties = new BasicProperties();
5759
properties.Fill(message, transportOperation.Properties);
60+
messageCustomization(transportOperation, properties);
5861

5962
return channel.SendMessage(transportOperation.Destination, message, properties, cancellationToken);
6063
}
@@ -67,6 +70,7 @@ Task PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwar
6770

6871
var properties = new BasicProperties();
6972
properties.Fill(message, transportOperation.Properties);
73+
messageCustomization(transportOperation, properties);
7074

7175
return channel.PublishMessage(transportOperation.MessageType, message, properties, cancellationToken);
7276
}

0 commit comments

Comments
 (0)