Skip to content

Commit 496d728

Browse files
committed
Add ability to specify a hierarchy namespace
1 parent 1374081 commit 496d728

File tree

4 files changed

+45
-14
lines changed

4 files changed

+45
-14
lines changed

src/Transport/AzureServiceBusTransport.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
7878
{
7979
TransportType = transportType,
8080
EnableCrossEntityTransactions = enableCrossEntityTransactions,
81-
Identifier = $"Client-{receiver.Id}-{receiver.ReceiveAddress}-{Guid.NewGuid()}",
81+
Identifier = HierarchyNamespace is null
82+
? $"Client-{receiver.Id}-{receiver.ReceiveAddress}-{Guid.NewGuid()}"
83+
: $"Client-{HierarchyNamespace}-{receiver.Id}-{receiver.ReceiveAddress}-{Guid.NewGuid()}",
8284
};
8385
ApplyRetryPolicyOptionsIfNeeded(receiveClientOptions);
8486
ApplyWebProxyIfNeeded(receiveClientOptions);
@@ -93,7 +95,9 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
9395
TransportType = transportType,
9496
// for the default client we never want things to automatically use cross entity transaction
9597
EnableCrossEntityTransactions = false,
96-
Identifier = $"Client-{hostSettings.Name}-{Guid.NewGuid()}"
98+
Identifier = HierarchyNamespace is null
99+
? $"Client-{hostSettings.Name}-{Guid.NewGuid()}"
100+
: $"Client-{HierarchyNamespace}-{hostSettings.Name}-{Guid.NewGuid()}"
97101
};
98102
ApplyRetryPolicyOptionsIfNeeded(defaultClientOptions);
99103
ApplyWebProxyIfNeeded(defaultClientOptions);
@@ -105,7 +109,7 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
105109
? new ServiceBusAdministrationClient(FullyQualifiedNamespace, TokenCredential)
106110
: new ServiceBusAdministrationClient(ConnectionString);
107111

108-
var infrastructure = new AzureServiceBusTransportInfrastructure(this, hostSettings, receiveSettingsAndClientPairs, defaultClient, administrationClient);
112+
var infrastructure = new AzureServiceBusTransportInfrastructure(this, hostSettings, receiveSettingsAndClientPairs, defaultClient, administrationClient, HierarchyNamespace);
109113

110114
if (hostSettings.SetupInfrastructure)
111115
{
@@ -114,7 +118,7 @@ await administrationClient.AssertNamespaceManageRightsAvailable(cancellationToke
114118

115119
var allQueues = infrastructure.Receivers
116120
.Select(r => r.Value.ReceiveAddress)
117-
.Concat(sendingAddresses)
121+
.Concat(sendingAddresses.Select(s => HierarchyNamespace is null || s.StartsWith(HierarchyNamespace) ? s : $"{HierarchyNamespace}/{s}"))
118122
.ToArray();
119123

120124
var queueCreator = new TopologyCreator(this);
@@ -193,6 +197,13 @@ internal set
193197

194198
TopicTopology topology;
195199

200+
/// <summary>
201+
/// Specifies the value that will be prepended to every entity name referenced by the endpoint.
202+
/// This will ensure that all entities will be part of the <see href="https://learn.microsoft.com/en-us/rest/api/servicebus/addressing-and-protocol">same hierarchy</see>
203+
/// within the ServiceBus namespace.
204+
/// </summary>
205+
public string? HierarchyNamespace { get; set; }
206+
196207
/// <summary>
197208
/// The maximum size used when creating queues and topics in GB.
198209
/// </summary>

src/Transport/AzureServiceBusTransportInfrastructure.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,24 @@ sealed class AzureServiceBusTransportInfrastructure : TransportInfrastructure
1616
readonly HostSettings hostSettings;
1717
readonly ServiceBusClient defaultClient;
1818
readonly ServiceBusAdministrationClient administrationClient;
19+
readonly string? hierarchyNamespace;
1920
readonly (ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs;
2021

2122
public AzureServiceBusTransportInfrastructure(
2223
AzureServiceBusTransport transportSettings,
2324
HostSettings hostSettings,
2425
(ReceiveSettings receiveSettings, ServiceBusClient client)[] receiveSettingsAndClientPairs,
2526
ServiceBusClient defaultClient,
26-
ServiceBusAdministrationClient administrationClient
27+
ServiceBusAdministrationClient administrationClient,
28+
string? hierarchyNamespace
2729
)
2830
{
2931
this.transportSettings = transportSettings;
3032

3133
this.hostSettings = hostSettings;
3234
this.defaultClient = defaultClient;
3335
this.administrationClient = administrationClient;
36+
this.hierarchyNamespace = hierarchyNamespace;
3437
this.receiveSettingsAndClientPairs = receiveSettingsAndClientPairs;
3538

3639
messageSenderRegistry = new MessageSenderRegistry();
@@ -39,6 +42,7 @@ ServiceBusAdministrationClient administrationClient
3942
defaultClient,
4043
messageSenderRegistry,
4144
transportSettings.Topology,
45+
hierarchyNamespace,
4246
transportSettings.OutgoingNativeMessageCustomization
4347
);
4448
Receivers = receiveSettingsAndClientPairs.ToDictionary(static settingsAndClient =>
@@ -138,7 +142,9 @@ await Task.WhenAll(Receivers.Values.Select(r => r.StopReceive(cancellationToken)
138142

139143
public override string ToTransportAddress(QueueAddress address)
140144
{
141-
var queue = new StringBuilder(address.BaseAddress);
145+
var queue = hierarchyNamespace is null
146+
? new StringBuilder(address.BaseAddress)
147+
: new StringBuilder($"{hierarchyNamespace}/{address.BaseAddress}");
142148

143149
if (address.Discriminator != null)
144150
{

src/Transport/Sending/MessageDispatcher.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class MessageDispatcher(
1313
ServiceBusClient defaultClient,
1414
MessageSenderRegistry messageSenderRegistry,
1515
TopicTopology topology,
16+
string? hierarchyNamespace,
1617
OutgoingNativeMessageCustomizationAction? customizerCallback = null)
1718
: IMessageDispatcher
1819
{
@@ -31,9 +32,7 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
3132
var unicastTransportOperations = outgoingMessages.UnicastTransportOperations;
3233
var multicastTransportOperations = outgoingMessages.MulticastTransportOperations;
3334

34-
var transportOperations =
35-
new List<IOutgoingTransportOperation>(unicastTransportOperations.Count +
36-
multicastTransportOperations.Count);
35+
var transportOperations = new List<IOutgoingTransportOperation>(unicastTransportOperations.Count + multicastTransportOperations.Count);
3736
transportOperations.AddRange(unicastTransportOperations);
3837
transportOperations.AddRange(multicastTransportOperations);
3938

@@ -44,7 +43,7 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
4443

4544
foreach (var operation in transportOperations)
4645
{
47-
var destination = operation.ExtractDestination(topology);
46+
var destination = operation.ExtractDestination(topology, hierarchyNamespace);
4847
switch (operation.RequiredDispatchConsistency)
4948
{
5049
case DispatchConsistency.Default:

src/Transport/Sending/OutgoingTransportOperationExtensions.cs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
namespace NServiceBus.Transport.AzureServiceBus;
22

33
using System;
4+
using System.Collections.Concurrent;
45
using Azure.Messaging.ServiceBus;
56
using Logging;
67

78
static class OutgoingTransportOperationExtensions
89
{
10+
static readonly ConcurrentDictionary<string, string> destinationCache = [];
11+
912
public static void ApplyCustomizationToOutgoingNativeMessage(
1013
this IOutgoingTransportOperation transportOperation,
1114
ServiceBusMessage message, TransportTransaction transportTransaction, ILog logger)
@@ -28,14 +31,19 @@ public static void ApplyCustomizationToOutgoingNativeMessage(
2831
}
2932

3033
public static string ExtractDestination(this IOutgoingTransportOperation outgoingTransportOperation,
31-
TopicTopology topology)
34+
TopicTopology topology,
35+
string? hierarchyNamespace)
3236
{
37+
string destination;
38+
3339
switch (outgoingTransportOperation)
3440
{
3541
case MulticastTransportOperation multicastTransportOperation:
36-
return topology.GetPublishDestination(multicastTransportOperation.MessageType);
42+
destination = topology.GetPublishDestination(multicastTransportOperation.MessageType);
43+
break;
44+
3745
case UnicastTransportOperation unicastTransportOperation:
38-
var destination = unicastTransportOperation.Destination;
46+
destination = unicastTransportOperation.Destination;
3947

4048
// Workaround for reply-to address set by ASB transport
4149
var index = unicastTransportOperation.Destination.IndexOf('@');
@@ -45,9 +53,16 @@ public static string ExtractDestination(this IOutgoingTransportOperation outgoin
4553
destination = destination[..index];
4654
}
4755

48-
return destination;
56+
break;
57+
4958
default:
5059
throw new ArgumentOutOfRangeException(nameof(outgoingTransportOperation));
5160
}
61+
62+
return destinationCache.GetOrAdd(
63+
destination,
64+
static (dest, ns) => ns is null || dest.StartsWith(ns)
65+
? dest
66+
: $"{ns}/{dest}", hierarchyNamespace);
5267
}
5368
}

0 commit comments

Comments
 (0)