From caa457f399d9e1cb6585125a7feba37dfc1aa574 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Tue, 7 Jan 2025 21:05:10 -0800 Subject: [PATCH 01/24] Update ManagementClient to check default connection configuration --- .../Administration/BrokerVerifier.cs | 1 + .../ManagementClient/ManagementClient.cs | 136 ++++++++++++++++-- 2 files changed, 123 insertions(+), 14 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs index 99730d95c..3edb23c23 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs @@ -23,6 +23,7 @@ public async Task Initialize(CancellationToken cancellationToken = default) { if (managementClientAvailable) { + await managementClient.ValidateConnectionConfiguration(cancellationToken).ConfigureAwait(false); var response = await managementClient.GetOverview(cancellationToken).ConfigureAwait(false); if (response.HasValue) { diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs index ae05e9684..f652f8c4c 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs @@ -12,28 +12,58 @@ namespace NServiceBus.Transport.RabbitMQ.ManagementClient; class ManagementClient { - readonly HttpClient httpClient; - readonly string virtualHost; + HttpClient httpClient; + + public int Port { get; private set; } + public string UserName { get; private set; } + public string Password { get; private set; } + + public readonly string Host; + public readonly bool UseTls; + public readonly string VirtualHost; + + readonly ConnectionConfiguration ConnectionConfiguration; readonly string escapedVirtualHost; + const int defaultManagementPort = 15672; + const int defaultManagementTlsPort = 15671; + const string defaultUserName = "guest"; + const string defaultPassword = "guest"; + public ManagementClient(ConnectionConfiguration connectionConfiguration) { ArgumentNullException.ThrowIfNull(connectionConfiguration, nameof(connectionConfiguration)); + ConnectionConfiguration = connectionConfiguration; - virtualHost = connectionConfiguration.VirtualHost; - escapedVirtualHost = Uri.EscapeDataString(virtualHost); + Host = ConnectionConfiguration.Host; + VirtualHost = ConnectionConfiguration.VirtualHost; + escapedVirtualHost = Uri.EscapeDataString(VirtualHost); + Port = ConnectionConfiguration.Port; + UserName = ConnectionConfiguration.UserName; + Password = ConnectionConfiguration.Password; + UseTls = ConnectionConfiguration.UseTls; - var uriBuilder = new UriBuilder + httpClient = CreateHttpClient(Port, UserName, Password); + } + + public async Task ValidateConnectionConfiguration(CancellationToken cancellationToken = default) + { + if (await IsConnectionValid(cancellationToken).ConfigureAwait(false) + || await IsConnectionValidWithDefaultPort(cancellationToken).ConfigureAwait(false) + || await IsConnectionValidWithDefaultAuthorization(cancellationToken).ConfigureAwait(false)) { - Scheme = connectionConfiguration.UseTls ? "https" : "http", - Host = connectionConfiguration.Host, - Port = connectionConfiguration.Port, - }; + return; + } - httpClient = new HttpClient { BaseAddress = uriBuilder.Uri }; - httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue( - "Basic", - Convert.ToBase64String(Encoding.ASCII.GetBytes($"{connectionConfiguration.UserName}:{connectionConfiguration.Password}"))); + SetDefaultManagementPort(); + SetManagementAuthorization(httpClient, defaultUserName, defaultPassword); + + if (await IsConnectionValid(cancellationToken).ConfigureAwait(false)) + { + return; + } + + throw new HttpRequestException("The management connection configuration could not be validated with the supplied connection string or the default values."); } public async Task> GetQueue(string queueName, CancellationToken cancellationToken = default) @@ -93,7 +123,7 @@ public async Task CreatePolicy(Policy policy, CancellationToken cancellationToke { ArgumentNullException.ThrowIfNull(policy, nameof(policy)); - policy.VirtualHost = virtualHost; + policy.VirtualHost = VirtualHost; var escapedPolicyName = Uri.EscapeDataString(policy.Name); var response = await httpClient.PutAsJsonAsync($"api/policies/{escapedVirtualHost}/{escapedPolicyName}", policy, cancellationToken) @@ -101,4 +131,82 @@ public async Task CreatePolicy(Policy policy, CancellationToken cancellationToke response.EnsureSuccessStatusCode(); } + + HttpClient CreateHttpClient(int port, string userName, string password) + { + var uriBuilder = new UriBuilder + { + Scheme = UseTls ? "https" : "http", + Host = Host, + Port = port, + }; + + Port = port; + UserName = userName ?? defaultUserName; + Password = password ?? defaultPassword; + var client = new HttpClient { BaseAddress = uriBuilder.Uri }; + SetManagementAuthorization(client, UserName, Password); + return client; + } + + void SetManagementAuthorization(HttpClient client, string userName, string password) => + client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue( + "Basic", + Convert.ToBase64String(Encoding.ASCII.GetBytes($"{userName}:{password}"))); + + void UpdateHttpClientPort(int port) => httpClient = CreateHttpClient(port, UserName, Password); + + void SetDefaultManagementPort() + { + if (ConnectionConfiguration.UseTls) + { + UpdateHttpClientPort(defaultManagementTlsPort); + return; + } + UpdateHttpClientPort(defaultManagementPort); + } + + async Task IsConnectionValid(CancellationToken cancellationToken) + { + try + { + var request = new HttpRequestMessage(HttpMethod.Head, httpClient.BaseAddress); + var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); + return response.IsSuccessStatusCode; + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception) + { + return false; + } + } + + async Task IsConnectionValidWithDefaultPort(CancellationToken cancellationToken) + { + SetDefaultManagementPort(); + + if (await IsConnectionValid(cancellationToken).ConfigureAwait(false)) + { + return true; + } + + UpdateHttpClientPort(ConnectionConfiguration.Port); + + return false; + } + + async Task IsConnectionValidWithDefaultAuthorization(CancellationToken cancellationToken) + { + SetManagementAuthorization(httpClient, defaultUserName, defaultPassword); + if (await IsConnectionValid(cancellationToken).ConfigureAwait(false)) + { + return true; + } + + SetManagementAuthorization(httpClient, ConnectionConfiguration.UserName, ConnectionConfiguration.Password); + return false; + } } From ab1916c242cfa5062e2173621ab83a5b6be8c18a Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Tue, 7 Jan 2025 22:49:41 -0800 Subject: [PATCH 02/24] Add connection validation test to management client tests --- .../ManagementClientTests.cs | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs index cede74f2e..f86fcd134 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs @@ -21,6 +21,57 @@ class ManagementClientTests static readonly ConnectionFactory connectionFactory = new(typeof(ManagementClientTests).FullName, connectionConfiguration, null, false, false, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []); static readonly ManagementClient client = new(managementConnectionConfiguration); + const int defaultBrokerPort = 5672; + const int defaultBrokerTlsPort = 5671; + const int defaultManagementPort = 15672; + const int defaultManagementTlsPort = 15671; + const string defaultUserName = "guest"; + const string defaultPassword = "guest"; + + static IEnumerable ConnectionConfigurationTestCases() + { + yield return new TestCaseData( + $"Host=wronghost1;VirtualHost=/;Port={defaultBrokerPort};UserName={defaultUserName};Password={defaultPassword};UseTls=False", + defaultManagementPort, + defaultUserName, + defaultPassword); + yield return new TestCaseData( + $"Host=wronghost2;VirtualHost=/;Port={defaultBrokerTlsPort};UserName={defaultUserName};Password={defaultPassword};UseTls=True", + defaultManagementTlsPort, + defaultUserName, + defaultPassword); + yield return new TestCaseData( + $"Host=wronghost3;VirtualHost=/;Port=12345;UserName={defaultUserName};Password={defaultPassword};UseTls=False", + 12345, + defaultUserName, + defaultPassword); + yield return new TestCaseData( + $"Host=wronghost4;VirtualHost=/;Port={defaultBrokerPort};UserName=fakeUser;Password=fakePassword;UseTls=True", + defaultManagementPort, + "fakeUser", + "fakePassword"); + yield return new TestCaseData( + $"Host=wronghost5;VirtualHost=/;Port={defaultBrokerPort};UseTls=True", + defaultManagementPort, + defaultUserName, + defaultPassword); + } + [Test, TestCaseSource(nameof(ConnectionConfigurationTestCases))] + public void ValidateConnectionConfiguration_Should_Set_Default_Port_And_Authorization_Configurations(string connectionString, int expectedPort, string expectedUserName, string expectedPassword) + { + var connectionConfiguration = ConnectionConfiguration.Create(connectionString); + var client = new ManagementClient(connectionConfiguration); + var exception = Assert.ThrowsAsync(async () => await client.ValidateConnectionConfiguration()); + + Assert.Multiple(() => + { + Assert.That(exception!.Message, Is.EqualTo("The management connection configuration could not be validated with the supplied connection string or the default values.")); + Assert.That(client.Port, Is.EqualTo(expectedPort)); + Assert.That(client.UserName, Is.EqualTo(expectedUserName)); + Assert.That(client.Password, Is.EqualTo(expectedPassword)); + }); + } + [Test] public async Task GetQueue_Should_Return_Queue_Information_When_Exists() { From 0e19d9ad20a7019f99026ae98f66087c6bd25c88 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Wed, 8 Jan 2025 11:37:07 -0800 Subject: [PATCH 03/24] Revert ConnectionConfiguration changes for management api --- .../ConnectionConfigurationTests.cs | 8 +++--- .../ManagementClientTests.cs | 2 +- .../Configuration/ConnectionConfiguration.cs | 25 +++---------------- .../RabbitMQTransport.cs | 20 +++++++-------- 4 files changed, 19 insertions(+), 36 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs index c3af26cdd..b5bdf4a85 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs @@ -22,7 +22,7 @@ class ConnectionConfigurationTests static HostSettings HostSettings { get; } = new(nameof(ConnectionConfigurationTests), nameof(ConnectionConfigurationTests), null, null, false); static readonly ConnectionConfiguration brokerDefaults = ConnectionConfiguration.Create("host=localhost"); - static readonly ConnectionConfiguration managementDefaults = ConnectionConfiguration.Create("host=localhost", isManagementConnection: true); + static readonly ConnectionConfiguration managementDefaults = ConnectionConfiguration.Create("host=localhost"); static string CreateManagementConnectionString(string connectionString) { @@ -187,7 +187,7 @@ public void Should_set_default_port() Assert.Multiple(() => { Assert.That(brokerDefaults.Port, Is.EqualTo(5672)); - Assert.That(managementDefaults.Port, Is.EqualTo(15672)); + Assert.That(managementDefaults.Port, Is.EqualTo(5672)); }); } @@ -249,7 +249,7 @@ public void Should_configure_broker_and_management_connection_configurations_wit Assert.That(transport.ManagementConnectionConfiguration.Host, Is.EqualTo("localhost")); Assert.That(transport.ManagementConnectionConfiguration.UserName, Is.EqualTo("guest")); Assert.That(transport.ManagementConnectionConfiguration.Password, Is.EqualTo("guest")); - Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(15672)); // This should be set to the default management port + Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(5672)); // This should be set to the default management port Assert.That(transport.ManagementConnectionConfiguration.UseTls, Is.EqualTo(false)); }); } @@ -401,7 +401,7 @@ public async Task Should_set_default_port_values_for_broker_and_management_conne Assert.Multiple(() => { Assert.That(transport.BrokerConnectionConfiguration.Port, Is.EqualTo(5672)); - Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(15672)); + Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(5672)); }); } diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs index f86fcd134..9f4d74a98 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs @@ -17,7 +17,7 @@ class ManagementClientTests { static readonly string connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; static readonly ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create(connectionString); - static readonly ConnectionConfiguration managementConnectionConfiguration = ConnectionConfiguration.Create(connectionString, isManagementConnection: true); + static readonly ConnectionConfiguration managementConnectionConfiguration = ConnectionConfiguration.Create(connectionString); static readonly ConnectionFactory connectionFactory = new(typeof(ManagementClientTests).FullName, connectionConfiguration, null, false, false, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []); static readonly ManagementClient client = new(managementConnectionConfiguration); diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs index 80fe920e8..3eca08219 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs @@ -9,10 +9,8 @@ class ConnectionConfiguration { const bool defaultUseTls = false; - const int defaultBrokerPort = 5672; - const int defaultBrokerTlsPort = 5671; - const int defaultManagementPort = 15672; - const int defaultManagementTlsPort = 15671; + const int defaultPort = 5672; + const int defaultTlsPort = 5671; const string defaultVirtualHost = "/"; const string defaultUserName = "guest"; const string defaultPassword = "guest"; @@ -45,7 +43,7 @@ class ConnectionConfiguration UseTls = useTls; } - public static ConnectionConfiguration Create(string connectionString, bool isManagementConnection = false) + public static ConnectionConfiguration Create(string connectionString) { Dictionary dictionary; var invalidOptionsMessage = new StringBuilder(); @@ -61,10 +59,7 @@ public static ConnectionConfiguration Create(string connectionString, bool isMan var host = GetValue(dictionary, "host", string.Empty); var useTls = GetValue(dictionary, "useTls", bool.TryParse, defaultUseTls, invalidOptionsMessage); - var port = GetValue(dictionary, "port", int.TryParse, useTls ? - (isManagementConnection ? defaultManagementTlsPort : defaultBrokerTlsPort) : - (isManagementConnection ? defaultManagementPort : defaultBrokerPort), - invalidOptionsMessage); + var port = GetValue(dictionary, "port", int.TryParse, useTls ? defaultTlsPort : defaultPort, invalidOptionsMessage); var virtualHost = GetValue(dictionary, "virtualHost", defaultVirtualHost); var userName = GetValue(dictionary, "userName", defaultUserName); var password = GetValue(dictionary, "password", defaultPassword); @@ -77,18 +72,6 @@ public static ConnectionConfiguration Create(string connectionString, bool isMan return new ConnectionConfiguration(host, port, virtualHost, userName, password, useTls); } - public static ConnectionConfiguration ConvertToManagementConnection(ConnectionConfiguration brokerConnectionConfiguration) - { - var virtualHost = brokerConnectionConfiguration.VirtualHost; - var host = brokerConnectionConfiguration.Host; - var port = defaultManagementPort; - var useTls = brokerConnectionConfiguration.UseTls; - var userName = brokerConnectionConfiguration.UserName; - var password = brokerConnectionConfiguration.Password; - - return new ConnectionConfiguration(host, port, virtualHost, userName, password, useTls); - } - static Dictionary ParseAmqpConnectionString(string connectionString, StringBuilder invalidOptionsMessage) { var dictionary = new Dictionary(); diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs index 4c2b1d16e..3f67e3d51 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs @@ -53,9 +53,9 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin RoutingTopology = routingTopology.Create(); BrokerConnectionConfiguration = ConnectionConfiguration.Create(connectionString); - ManagementConnectionConfiguration = string.IsNullOrEmpty(managementConnectionString) ? - ConnectionConfiguration.ConvertToManagementConnection(BrokerConnectionConfiguration) : - ConnectionConfiguration.Create(managementConnectionString, isManagementConnection: true); + ManagementConnectionConfiguration = string.IsNullOrEmpty(managementConnectionString) + ? BrokerConnectionConfiguration + : ConnectionConfiguration.Create(managementConnectionString); } /// @@ -87,9 +87,9 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin RoutingTopology = routingTopology.Create(); BrokerConnectionConfiguration = ConnectionConfiguration.Create(connectionString); - ManagementConnectionConfiguration = string.IsNullOrEmpty(managementConnectionString) ? - ConnectionConfiguration.ConvertToManagementConnection(BrokerConnectionConfiguration) : - ConnectionConfiguration.Create(managementConnectionString, isManagementConnection: true); + ManagementConnectionConfiguration = string.IsNullOrEmpty(managementConnectionString) + ? BrokerConnectionConfiguration + : ConnectionConfiguration.Create(managementConnectionString); } internal ConnectionConfiguration BrokerConnectionConfiguration { get; set; } @@ -247,7 +247,7 @@ public override async Task Initialize(HostSettings host // Uses the legacy Management API connection string or default to the RabbitMQ broker connection credentials if (!string.IsNullOrEmpty(LegacyManagementApiConnectionString)) { - ManagementConnectionConfiguration = ConnectionConfiguration.Create(LegacyManagementApiConnectionString, isManagementConnection: true); + ManagementConnectionConfiguration = ConnectionConfiguration.Create(LegacyManagementApiConnectionString); } var brokerVerifier = new BrokerVerifier(connectionFactory, !DoNotUseManagementClient, ManagementConnectionConfiguration); @@ -318,9 +318,9 @@ void ValidateAndApplyLegacyConfiguration() BrokerConnectionConfiguration = ConnectionConfiguration.Create(LegacyApiConnectionString); // Uses the legacy management API connection string or build the string from the legacy broker connection configuration - ManagementConnectionConfiguration = !string.IsNullOrEmpty(LegacyManagementApiConnectionString) ? - ConnectionConfiguration.Create(LegacyManagementApiConnectionString, isManagementConnection: true) : - ConnectionConfiguration.ConvertToManagementConnection(BrokerConnectionConfiguration); + ManagementConnectionConfiguration = string.IsNullOrEmpty(LegacyManagementApiConnectionString) + ? BrokerConnectionConfiguration + : ConnectionConfiguration.Create(LegacyManagementApiConnectionString); } void VaildateTopologyFactory() From 3b0bb3ab62142320580bc7fa51899bdd827ea7e9 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Sun, 12 Jan 2025 23:54:10 -0800 Subject: [PATCH 04/24] Remove API constructor parameter and add property URL string --- .../RabbitMQTransportSettingsExtensions.cs | 4 +- .../RabbitMQTransport.cs | 67 ++++++------------- 2 files changed, 24 insertions(+), 47 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs index 6227c20f9..26afe4721 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs @@ -107,7 +107,7 @@ public static TransportExtensions ManagementConnectionString( ArgumentNullException.ThrowIfNull(transportExtensions); ArgumentException.ThrowIfNullOrWhiteSpace(connectionString); - transportExtensions.Transport.LegacyManagementApiConnectionString = connectionString; + transportExtensions.Transport.LegacyManagementApiUrl = connectionString; return transportExtensions; } @@ -122,7 +122,7 @@ public static TransportExtensions ManagementConnectionString( ArgumentNullException.ThrowIfNull(transportExtensions); ArgumentNullException.ThrowIfNull(getConnectionString); - transportExtensions.Transport.LegacyManagementApiConnectionString = getConnectionString(); + transportExtensions.Transport.LegacyManagementApiUrl = getConnectionString(); return transportExtensions; } diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs index 3f67e3d51..01e536926 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs @@ -9,6 +9,7 @@ using RabbitMQ.Client.Events; using Transport; using Transport.RabbitMQ; + using Transport.RabbitMQ.ManagementClient; using ConnectionFactory = Transport.RabbitMQ.ConnectionFactory; /// @@ -31,17 +32,6 @@ public class RabbitMQTransport : TransportDefinition /// The routing topology to use. /// The connection string to use when connecting to the broker. public RabbitMQTransport(RoutingTopology routingTopology, string connectionString) - : this(routingTopology, connectionString, null) - { - } - - /// - /// Creates a new instance of the RabbitMQ transport. - /// - /// The routing topology to use. - /// The connection string to use when connecting to the broker. - /// The connection string to use when connecting to the management API - public RabbitMQTransport(RoutingTopology routingTopology, string connectionString, string managementConnectionString) : base(TransportTransactionMode.ReceiveOnly, supportsDelayedDelivery: true, supportsPublishSubscribe: true, @@ -52,10 +42,6 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin RoutingTopology = routingTopology.Create(); BrokerConnectionConfiguration = ConnectionConfiguration.Create(connectionString); - - ManagementConnectionConfiguration = string.IsNullOrEmpty(managementConnectionString) - ? BrokerConnectionConfiguration - : ConnectionConfiguration.Create(managementConnectionString); } /// @@ -65,18 +51,6 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin /// The connection string to use when connecting to the broker. /// Should the delayed delivery infrastructure be created by the endpoint public RabbitMQTransport(RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) - : this(routingTopology, connectionString, enableDelayedDelivery, null) - { - } - - /// - /// Creates a new instance of the RabbitMQ transport. - /// - /// The routing topology to use. - /// The connection string to use when connecting to the broker. - /// Should the delayed delivery infrastructure be created by the endpoint - /// The connection string to use when connecting to the management API - public RabbitMQTransport(RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery, string managementConnectionString) : base(TransportTransactionMode.ReceiveOnly, supportsDelayedDelivery: enableDelayedDelivery, supportsPublishSubscribe: true, @@ -87,15 +61,10 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin RoutingTopology = routingTopology.Create(); BrokerConnectionConfiguration = ConnectionConfiguration.Create(connectionString); - ManagementConnectionConfiguration = string.IsNullOrEmpty(managementConnectionString) - ? BrokerConnectionConfiguration - : ConnectionConfiguration.Create(managementConnectionString); } internal ConnectionConfiguration BrokerConnectionConfiguration { get; set; } - internal ConnectionConfiguration ManagementConnectionConfiguration { get; set; } - internal IRoutingTopology RoutingTopology { get; set; } /// @@ -175,6 +144,14 @@ public PrefetchCountCalculation PrefetchCountCalculation /// public bool DoNotUseManagementClient { get; set; } = false; + /// + /// Basic authentication HTTP connection string to the RabbitMQ management API. + /// + /// + /// E.g. https://username:password@localhost:15671 + /// + public string ManagementApiUrl { get; set; } + /// /// The interval for heartbeats between the endpoint and the broker. /// @@ -244,13 +221,11 @@ public override async Task Initialize(HostSettings host additionalClusterNodes ); - // Uses the legacy Management API connection string or default to the RabbitMQ broker connection credentials - if (!string.IsNullOrEmpty(LegacyManagementApiConnectionString)) - { - ManagementConnectionConfiguration = ConnectionConfiguration.Create(LegacyManagementApiConnectionString); - } + var managementClient = !string.IsNullOrEmpty(ManagementApiUrl) + ? new ManagementClient(ManagementApiUrl, BrokerConnectionConfiguration.VirtualHost) + : new ManagementClient(BrokerConnectionConfiguration); - var brokerVerifier = new BrokerVerifier(connectionFactory, !DoNotUseManagementClient, ManagementConnectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, !DoNotUseManagementClient, managementClient); await brokerVerifier.Initialize(cancellationToken).ConfigureAwait(false); var channelProvider = new ChannelProvider(connectionFactory, NetworkRecoveryInterval, RoutingTopology); @@ -291,7 +266,7 @@ void ValidateAndApplyCertCollections() => certCollection ??= ClientCertificate ! internal string LegacyApiConnectionString { get; set; } - internal string LegacyManagementApiConnectionString { get; set; } + internal string LegacyManagementApiUrl { get; set; } internal Func TopologyFactory { get; set; } @@ -317,10 +292,9 @@ void ValidateAndApplyLegacyConfiguration() RoutingTopology = TopologyFactory(UseDurableExchangesAndQueues); BrokerConnectionConfiguration = ConnectionConfiguration.Create(LegacyApiConnectionString); - // Uses the legacy management API connection string or build the string from the legacy broker connection configuration - ManagementConnectionConfiguration = string.IsNullOrEmpty(LegacyManagementApiConnectionString) - ? BrokerConnectionConfiguration - : ConnectionConfiguration.Create(LegacyManagementApiConnectionString); + ManagementApiUrl = !string.IsNullOrEmpty(LegacyManagementApiUrl) + ? LegacyManagementApiUrl + : ManagementClient.CreateManagementConnectionString(BrokerConnectionConfiguration); } void VaildateTopologyFactory() @@ -338,9 +312,12 @@ void ValidateConnectionString() throw new Exception("A connection string must be configured with 'EndpointConfiguration.UseTransport().ConnectionString()` method."); } - if (!DoNotUseManagementClient && string.IsNullOrEmpty(LegacyManagementApiConnectionString)) + // Todo: Not sure if we should throw here. Even if the LegacyManagementApiUrl is null or empty the default connection + // values could still be tried. Only if the default values fail should an error be thrown that a connection could not + // be made and the ManagementApiUrl should be configured. + if (!DoNotUseManagementClient && string.IsNullOrEmpty(LegacyManagementApiUrl)) { - throw new Exception("A management API connection string must be configured with 'EndpointConfiguration.UseTransport().ManagementConnectionString()` method."); + throw new Exception("A management API connection string must be configured with 'EndpointConfiguration.UseTransport().ManagementApiUrl()` method."); } } } From 593c9aa2c96ac954cf0552f6cf932bafe7f7444f Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Sun, 12 Jan 2025 23:57:42 -0800 Subject: [PATCH 05/24] Update ManagementClient and brokerVerifier --- .../Administration/BrokerVerifier.cs | 13 +- .../ManagementClient/ManagementClient.cs | 237 ++++++++++++------ 2 files changed, 166 insertions(+), 84 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs index 3edb23c23..23660cb96 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs @@ -5,25 +5,24 @@ namespace NServiceBus.Transport.RabbitMQ; using System; using System.Threading; using System.Threading.Tasks; -using ManagementClient; +using ManagementClientClass = ManagementClient.ManagementClient; using NServiceBus.Logging; +using NServiceBus.Transport.RabbitMQ.ManagementClient; using Polly; -class BrokerVerifier(ConnectionFactory connectionFactory, bool managementClientAvailable, ConnectionConfiguration connectionConfiguration) +class BrokerVerifier(ConnectionFactory connectionFactory, bool managementClientAvailable, ManagementClientClass managementClient) { static readonly ILog Logger = LogManager.GetLogger(typeof(BrokerVerifier)); static readonly Version MinimumSupportedRabbitMqVersion = Version.Parse("3.10.0"); static readonly Version RabbitMqVersion4 = Version.Parse("4.0.0"); - readonly ManagementClient.ManagementClient managementClient = new(connectionConfiguration); - Version? brokerVersion; public async Task Initialize(CancellationToken cancellationToken = default) { if (managementClientAvailable) { - await managementClient.ValidateConnectionConfiguration(cancellationToken).ConfigureAwait(false); + await managementClient.ValidateManagementConnection(cancellationToken).ConfigureAwait(false); var response = await managementClient.GetOverview(cancellationToken).ConfigureAwait(false); if (response.HasValue) { @@ -126,7 +125,7 @@ bool ShouldOverrideDeliveryLimit(Queue queue) return true; } - static async Task GetFullQueueDetails(ManagementClient.ManagementClient managementClient, string queueName, CancellationToken cancellationToken) + static async Task GetFullQueueDetails(ManagementClientClass managementClient, string queueName, CancellationToken cancellationToken) { var retryPolicy = Polly.Policy .HandleResult>(response => response.Value?.EffectivePolicyDefinition is null) @@ -156,7 +155,7 @@ bool ShouldOverrideDeliveryLimit(Queue queue) return response?.Value?.EffectivePolicyDefinition is not null ? response.Value : null; } - static async Task SetDeliveryLimitViaPolicy(ManagementClient.ManagementClient managementClient, Queue queue, Version brokerVersion, CancellationToken cancellationToken) + static async Task SetDeliveryLimitViaPolicy(ManagementClientClass managementClient, Queue queue, Version brokerVersion, CancellationToken cancellationToken) { if (!string.IsNullOrEmpty(queue.AppliedPolicyName)) { diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs index f652f8c4c..4c29f0023 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs @@ -3,6 +3,7 @@ namespace NServiceBus.Transport.RabbitMQ.ManagementClient; using System; +using System.Collections.Generic; using System.Net.Http; using System.Net.Http.Headers; using System.Net.Http.Json; @@ -13,57 +14,96 @@ namespace NServiceBus.Transport.RabbitMQ.ManagementClient; class ManagementClient { HttpClient httpClient; - - public int Port { get; private set; } - public string UserName { get; private set; } - public string Password { get; private set; } - - public readonly string Host; - public readonly bool UseTls; - public readonly string VirtualHost; - - readonly ConnectionConfiguration ConnectionConfiguration; + readonly string virtualHost; readonly string escapedVirtualHost; const int defaultManagementPort = 15672; const int defaultManagementTlsPort = 15671; + const string defaultVirtualHost = "/"; const string defaultUserName = "guest"; const string defaultPassword = "guest"; + public ManagementClient(string managementApiUrl, string virtualHost) + { + ArgumentNullException.ThrowIfNull(managementApiUrl, nameof(managementApiUrl)); + + var managementUri = GenerateUri(managementApiUrl); + + var scheme = managementUri.Scheme; + var host = managementUri.Host; + var port = managementUri.Port; + var userName = managementUri.UserName; + var password = managementUri.Password; + + this.virtualHost = virtualHost; + escapedVirtualHost = Uri.EscapeDataString(virtualHost); + + httpClient = CreateHttpClient(scheme, host, port, userName, password); + } + public ManagementClient(ConnectionConfiguration connectionConfiguration) { ArgumentNullException.ThrowIfNull(connectionConfiguration, nameof(connectionConfiguration)); - ConnectionConfiguration = connectionConfiguration; - Host = ConnectionConfiguration.Host; - VirtualHost = ConnectionConfiguration.VirtualHost; - escapedVirtualHost = Uri.EscapeDataString(VirtualHost); - Port = ConnectionConfiguration.Port; - UserName = ConnectionConfiguration.UserName; - Password = ConnectionConfiguration.Password; - UseTls = ConnectionConfiguration.UseTls; + var scheme = connectionConfiguration.UseTls ? "https" : "http"; + var host = connectionConfiguration.Host ?? "localhost"; + var port = connectionConfiguration.UseTls ? defaultManagementTlsPort : defaultManagementPort; + var userName = connectionConfiguration.UserName ?? defaultUserName; + var password = connectionConfiguration.Password ?? defaultPassword; - httpClient = CreateHttpClient(Port, UserName, Password); + virtualHost = connectionConfiguration.VirtualHost ?? defaultVirtualHost; + escapedVirtualHost = Uri.EscapeDataString(virtualHost); + + httpClient = CreateHttpClient(scheme, host, port, userName, password); } - public async Task ValidateConnectionConfiguration(CancellationToken cancellationToken = default) + // This is used for testing + public ManagementClient(string virtualHost, HttpClient httpClient, string managementApiUrl) { - if (await IsConnectionValid(cancellationToken).ConfigureAwait(false) - || await IsConnectionValidWithDefaultPort(cancellationToken).ConfigureAwait(false) - || await IsConnectionValidWithDefaultAuthorization(cancellationToken).ConfigureAwait(false)) + var managementUri = GenerateUri(managementApiUrl); + + var userName = managementUri.UserName; + var password = managementUri.Password; + + // Clear these values for setting the baseAddress of the httpClient + managementUri.UserName = string.Empty; + managementUri.Password = string.Empty; + + this.httpClient = httpClient; + httpClient.BaseAddress = managementUri.Uri; + SetManagementClientAuthorization(httpClient, userName, password); + + this.virtualHost = virtualHost; + escapedVirtualHost = Uri.EscapeDataString(virtualHost); + } + + public async Task ValidateManagementConnection(CancellationToken cancellationToken = default) + { + // Check broker provided authentication + var (isValid, _) = await IsConnectionValid(cancellationToken).ConfigureAwait(false); + if (isValid) { return; } - SetDefaultManagementPort(); - SetManagementAuthorization(httpClient, defaultUserName, defaultPassword); + // Check default management authentication + SetManagementClientAuthorization(httpClient, defaultUserName, defaultPassword); - if (await IsConnectionValid(cancellationToken).ConfigureAwait(false)) + var (isDefaultValid, exception) = await IsConnectionValid(cancellationToken).ConfigureAwait(false); + if (exception != null || !isDefaultValid) { - return; + throw exception ?? new InvalidOperationException($"Connection to the management API could not be established with the default or provided URL. Update the RabbitMQTransport.ManagementApiUrl with the correct HTTP connection string."); } + } - throw new HttpRequestException("The management connection configuration could not be validated with the supplied connection string or the default values."); + public static string CreateManagementConnectionString(ConnectionConfiguration connectionConfiguration) + { + var scheme = connectionConfiguration.UseTls ? "https" : "http"; + var userName = connectionConfiguration.UserName ?? defaultUserName; + var password = connectionConfiguration.Password ?? defaultPassword; + var host = connectionConfiguration.Host ?? "localhost"; + var port = connectionConfiguration.UseTls ? defaultManagementTlsPort : defaultManagementPort; + return $"{scheme}://{userName}:{password}@{host}:{port}"; } public async Task> GetQueue(string queueName, CancellationToken cancellationToken = default) @@ -123,7 +163,7 @@ public async Task CreatePolicy(Policy policy, CancellationToken cancellationToke { ArgumentNullException.ThrowIfNull(policy, nameof(policy)); - policy.VirtualHost = VirtualHost; + policy.VirtualHost = virtualHost; var escapedPolicyName = Uri.EscapeDataString(policy.Name); var response = await httpClient.PutAsJsonAsync($"api/policies/{escapedVirtualHost}/{escapedPolicyName}", policy, cancellationToken) @@ -132,81 +172,124 @@ public async Task CreatePolicy(Policy policy, CancellationToken cancellationToke response.EnsureSuccessStatusCode(); } - HttpClient CreateHttpClient(int port, string userName, string password) + UriBuilder GenerateUri(string managementApiUrl) { - var uriBuilder = new UriBuilder + var dictionary = ParseManagementApiUrl(managementApiUrl); + + var invalidOptionsMessage = new StringBuilder(); + + var scheme = GetValue(dictionary, "scheme", "http"); + ValidateScheme(scheme, invalidOptionsMessage); + + var useTls = scheme.Equals("https", StringComparison.OrdinalIgnoreCase); + var host = GetValue(dictionary, "host", "localhost"); + var port = GetValue(dictionary, "port", int.TryParse, useTls ? defaultManagementTlsPort : defaultManagementPort, invalidOptionsMessage); + var userName = GetValue(dictionary, "userName", defaultUserName); + var password = GetValue(dictionary, "password", defaultPassword); + + if (invalidOptionsMessage.Length > 0) + { + throw new NotSupportedException(invalidOptionsMessage.ToString().TrimEnd('\r', '\n')); + } + + return new UriBuilder { - Scheme = UseTls ? "https" : "http", - Host = Host, + Scheme = scheme, + Host = host, Port = port, + UserName = userName, + Password = password, }; + } - Port = port; - UserName = userName ?? defaultUserName; - Password = password ?? defaultPassword; - var client = new HttpClient { BaseAddress = uriBuilder.Uri }; - SetManagementAuthorization(client, UserName, Password); - return client; + static void ValidateScheme(string scheme, StringBuilder invalidOptionsMessage) + { + if (!scheme.Equals("http", StringComparison.OrdinalIgnoreCase) && + !scheme.Equals("https", StringComparison.OrdinalIgnoreCase)) + { + _ = invalidOptionsMessage.AppendLine("Invalid scheme for RabbitMQ management API, use either 'http' or 'https' in the ManagementApiUrl"); + } } - void SetManagementAuthorization(HttpClient client, string userName, string password) => - client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue( - "Basic", - Convert.ToBase64String(Encoding.ASCII.GetBytes($"{userName}:{password}"))); + static string GetValue(Dictionary dictionary, string key, string defaultValue) + { + return dictionary.TryGetValue(key, out var value) ? value : defaultValue; + } - void UpdateHttpClientPort(int port) => httpClient = CreateHttpClient(port, UserName, Password); + delegate bool Convert(string input, out T output); - void SetDefaultManagementPort() + static T GetValue(Dictionary dictionary, string key, Convert convert, T defaultValue, StringBuilder invalidOptionsMessage) { - if (ConnectionConfiguration.UseTls) + if (dictionary.TryGetValue(key, out var value)) { - UpdateHttpClientPort(defaultManagementTlsPort); - return; + if (!convert(value, out defaultValue)) + { + invalidOptionsMessage.AppendLine($"'{value}' is not a valid {typeof(T).Name} value for the '{key}' connection string option."); + } } - UpdateHttpClientPort(defaultManagementPort); + + return defaultValue; } - async Task IsConnectionValid(CancellationToken cancellationToken) + HttpClient CreateHttpClient(string scheme, string host, int port, string userName, string password) { - try - { - var request = new HttpRequestMessage(HttpMethod.Head, httpClient.BaseAddress); - var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); - return response.IsSuccessStatusCode; - } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) - { - throw; - } - catch (Exception) + var uriBuilder = new UriBuilder { - return false; - } + Scheme = scheme, + Host = host, + Port = port, + }; + + var client = new HttpClient { BaseAddress = uriBuilder.Uri }; + SetManagementClientAuthorization(client, userName, password); + return client; } - async Task IsConnectionValidWithDefaultPort(CancellationToken cancellationToken) + static Dictionary ParseManagementApiUrl(string managementApiUrl) { - SetDefaultManagementPort(); - - if (await IsConnectionValid(cancellationToken).ConfigureAwait(false)) + var dictionary = new Dictionary(); + if (!Uri.TryCreate(managementApiUrl, UriKind.Absolute, out var uri)) { - return true; + throw new UriFormatException($"The RabbitMQTransport.ManagementApiUrl is not a valid URI format."); } + var useTls = uri.Scheme.Equals("https", StringComparison.OrdinalIgnoreCase); + + var isPortSpecified = managementApiUrl.Contains($":{uri.Port}"); + var port = !isPortSpecified && uri.IsDefaultPort + ? (useTls ? defaultManagementTlsPort : defaultManagementPort) + : uri.Port; + + var userInfo = uri.UserInfo.Split(':') ?? []; - UpdateHttpClientPort(ConnectionConfiguration.Port); + dictionary.Add("scheme", uri.Scheme); + dictionary.Add("host", uri.Host); + dictionary.Add("port", port.ToString()); + dictionary.Add("userName", userInfo.Length > 0 && !string.IsNullOrEmpty(userInfo[0]) ? userInfo[0] : defaultUserName); + dictionary.Add("password", userInfo.Length > 1 && !string.IsNullOrEmpty(userInfo[1]) ? userInfo[1] : defaultPassword); - return false; + return dictionary; } - async Task IsConnectionValidWithDefaultAuthorization(CancellationToken cancellationToken) + void SetManagementClientAuthorization(HttpClient client, string userName, string password) => + client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue( + "Basic", + Convert.ToBase64String(Encoding.ASCII.GetBytes($"{userName}:{password}"))); + + async Task<(bool IsValid, Exception? ex)> IsConnectionValid(CancellationToken cancellationToken) { - SetManagementAuthorization(httpClient, defaultUserName, defaultPassword); - if (await IsConnectionValid(cancellationToken).ConfigureAwait(false)) + try { - return true; + var request = new HttpRequestMessage(HttpMethod.Head, $"{httpClient.BaseAddress}api/overview"); + var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); + return (response.IsSuccessStatusCode, null); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + return (false, ex); } - - SetManagementAuthorization(httpClient, ConnectionConfiguration.UserName, ConnectionConfiguration.Password); - return false; } } From 6af8a66db964d5322908bf17fac5711932cc4ccd Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Sun, 12 Jan 2025 23:58:21 -0800 Subject: [PATCH 06/24] Remove not needed connection configuration tests --- .../ConnectionConfigurationTests.cs | 242 ++++-------------- 1 file changed, 47 insertions(+), 195 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs index b5bdf4a85..9541b37f2 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs @@ -7,8 +7,6 @@ namespace NServiceBus.Transport.RabbitMQ.Tests.ConnectionString using System.Linq; using System.Net.Http; using System.Text; - using System.Threading.Tasks; - using global::RabbitMQ.Client.Exceptions; using NUnit.Framework; using RabbitMQ; @@ -17,19 +15,10 @@ class ConnectionConfigurationTests { const string FakeConnectionString = "virtualHost=Copa;username=Copa;host=192.168.1.1:1234;password=abc_xyz;port=12345;useTls=true"; static string BrokerConnectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; - static string ManagementConnectionString => CreateManagementConnectionString(BrokerConnectionString); static HostSettings HostSettings { get; } = new(nameof(ConnectionConfigurationTests), nameof(ConnectionConfigurationTests), null, null, false); static readonly ConnectionConfiguration brokerDefaults = ConnectionConfiguration.Create("host=localhost"); - static readonly ConnectionConfiguration managementDefaults = ConnectionConfiguration.Create("host=localhost"); - - static string CreateManagementConnectionString(string connectionString) - { - var parameters = connectionString.Split(';').Select(param => param.Split('=')).ToDictionary(parts => parts[0], parts => parts[1]); - parameters["port"] = "15672"; - return string.Join(";", parameters.Select(kv => $"{kv.Key}={kv.Value}")); - } [Test] public void Should_correctly_parse_full_connection_string() @@ -184,233 +173,96 @@ public void Should_list_all_invalid_options() [Test] public void Should_set_default_port() { - Assert.Multiple(() => - { - Assert.That(brokerDefaults.Port, Is.EqualTo(5672)); - Assert.That(managementDefaults.Port, Is.EqualTo(5672)); - }); + Assert.That(brokerDefaults.Port, Is.EqualTo(5672)); } [Test] public void Should_set_default_virtual_host() { - Assert.Multiple(() => - { - Assert.That(brokerDefaults.VirtualHost, Is.EqualTo("/")); - Assert.That(managementDefaults.VirtualHost, Is.EqualTo("/")); - }); + Assert.That(brokerDefaults.VirtualHost, Is.EqualTo("/")); } [Test] public void Should_set_default_username() { - Assert.Multiple(() => - { - Assert.That(brokerDefaults.UserName, Is.EqualTo("guest")); - Assert.That(managementDefaults.UserName, Is.EqualTo("guest")); - }); + Assert.That(brokerDefaults.UserName, Is.EqualTo("guest")); } [Test] public void Should_set_default_password() { - Assert.Multiple(() => - { - Assert.That(brokerDefaults.Password, Is.EqualTo("guest")); - Assert.That(managementDefaults.Password, Is.EqualTo("guest")); - }); + Assert.That(brokerDefaults.Password, Is.EqualTo("guest")); } [Test] public void Should_set_default_use_tls() { - Assert.Multiple(() => - { - Assert.That(brokerDefaults.UseTls, Is.EqualTo(false)); - Assert.That(managementDefaults.UseTls, Is.EqualTo(false)); - }); - } - - [Test] - public void Should_configure_broker_and_management_connection_configurations_with_single_connection_string() - { - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "virtualHost=/;host=localhost;username=guest;password=guest;port=5672;useTls=false"); - - Assert.Multiple(() => - { - Assert.That(transport.BrokerConnectionConfiguration.VirtualHost, Is.EqualTo("/")); - Assert.That(transport.BrokerConnectionConfiguration.Host, Is.EqualTo("localhost")); - Assert.That(transport.BrokerConnectionConfiguration.UserName, Is.EqualTo("guest")); - Assert.That(transport.BrokerConnectionConfiguration.Password, Is.EqualTo("guest")); - Assert.That(transport.BrokerConnectionConfiguration.Port, Is.EqualTo(5672)); - Assert.That(transport.BrokerConnectionConfiguration.UseTls, Is.EqualTo(false)); - - Assert.That(transport.ManagementConnectionConfiguration.VirtualHost, Is.EqualTo("/")); - Assert.That(transport.ManagementConnectionConfiguration.Host, Is.EqualTo("localhost")); - Assert.That(transport.ManagementConnectionConfiguration.UserName, Is.EqualTo("guest")); - Assert.That(transport.ManagementConnectionConfiguration.Password, Is.EqualTo("guest")); - Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(5672)); // This should be set to the default management port - Assert.That(transport.ManagementConnectionConfiguration.UseTls, Is.EqualTo(false)); - }); - } - - [Test] - public void Should_configure_broker_and_management_connection_configurations_with_respective_connection_strings() - { - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "virtualHost=/;host=localhost;username=guest;password=guest;port=5672;useTls=false", FakeConnectionString); - - Assert.Multiple(() => - { - Assert.That(transport.BrokerConnectionConfiguration.VirtualHost, Is.EqualTo("/")); - Assert.That(transport.BrokerConnectionConfiguration.Host, Is.EqualTo("localhost")); - Assert.That(transport.BrokerConnectionConfiguration.UserName, Is.EqualTo("guest")); - Assert.That(transport.BrokerConnectionConfiguration.Password, Is.EqualTo("guest")); - Assert.That(transport.BrokerConnectionConfiguration.Port, Is.EqualTo(5672)); - Assert.That(transport.BrokerConnectionConfiguration.UseTls, Is.EqualTo(false)); - - Assert.That(transport.ManagementConnectionConfiguration.VirtualHost, Is.EqualTo("Copa")); - Assert.That(transport.ManagementConnectionConfiguration.Host, Is.EqualTo("192.168.1.1")); - Assert.That(transport.ManagementConnectionConfiguration.UserName, Is.EqualTo("Copa")); - Assert.That(transport.ManagementConnectionConfiguration.Password, Is.EqualTo("abc_xyz")); - Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(1234)); - Assert.That(transport.ManagementConnectionConfiguration.UseTls, Is.EqualTo(true)); - }); + Assert.That(brokerDefaults.UseTls, Is.EqualTo(false)); } - [Test] - public void Should_throw_on_invalid_management_credentials() - { - var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) - { - UserName = "Copa" - }; - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString, invalidManagementConnection.ToConnectionString()); - - var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])) - ?? throw new ArgumentNullException("exception"); - - Assert.That(exception.Message, Does.Contain("Could not access RabbitMQ Management API")); - } - - [Test] - public void Should_throw_on_invalid_management_host() - { - var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) - { - Host = "WrongHostName" - }; - - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString, invalidManagementConnection.ToConnectionString()); - - _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); - } - - [Test] - public void Should_throw_on_invalid_broker_connection_string() - { - var invalidBrokerConnection = new FakeConnectionConfiguration(host: "127.0.0.1", userName: "Copa"); - - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), invalidBrokerConnection.ToConnectionString(), ManagementConnectionString); - - var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])) - ?? throw new ArgumentNullException("exception"); - - Assert.That(exception.Message, Does.Contain("None of the specified endpoints were reachable")); - } - - [Test] - public void Should_throw_on_invalid_legacy_management_credentials() - { - var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) - { - UserName = "Copa" - }; - - // Create transport in legacy mode - var transport = new RabbitMQTransport - { - TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), - LegacyApiConnectionString = BrokerConnectionString, - LegacyManagementApiConnectionString = invalidManagementConnection.ToConnectionString() - }; - - var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); - - Assert.That(exception!.Message, Does.Contain("Could not access RabbitMQ Management API")); - } [Test] public void Should_throw_on_invalid_legacy_management_host() { - var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) - { - Host = "WrongHostName" - }; - // Create transport in legacy mode var transport = new RabbitMQTransport { TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), LegacyApiConnectionString = BrokerConnectionString, - LegacyManagementApiConnectionString = invalidManagementConnection.ToConnectionString() + LegacyManagementApiUrl = "http://copa:abc123xyz@wronghost:12345" }; _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); } - [Test] - public void Should_throw_on_invalid_legacy_broker_connection_string() - { - var invalidBrokerConnection = new FakeConnectionConfiguration(host: "localhost", port: "5672", virtualHost: "/", userName: "Copa", password: "guest", useTls: "false"); - - // Create transport in legacy mode - var transport = new RabbitMQTransport - { - TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), - LegacyApiConnectionString = invalidBrokerConnection.ToConnectionString(), - LegacyManagementApiConnectionString = ManagementConnectionString - }; - - var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])) - ?? throw new ArgumentNullException("exception"); - - Assert.That(exception.Message, Does.Contain("None of the specified endpoints were reachable")); - } - - [Test] - public void Should_connect_to_management_api_with_broker_credentials() - { - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString); - - Assert.DoesNotThrowAsync(async () => await transport.Initialize(HostSettings, [], [])); - } - - [Test] - public async Task Should_set_default_port_values_for_broker_and_management_connections() - { - var validConnectionWithoutPort = new FakeConnectionConfiguration(BrokerConnectionString) - { - Port = null - }; - - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), validConnectionWithoutPort.ToConnectionString()); - - _ = await transport.Initialize(HostSettings, [], []); - - Assert.Multiple(() => - { - Assert.That(transport.BrokerConnectionConfiguration.Port, Is.EqualTo(5672)); - Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(5672)); - }); - } + // Todo: Need to fix tests checking the legacyManagementApiUrl + // Scenario 1: legacyManagementApiUrl is null or empty + // Scenario 2: legacyManagementApiUrl host is invalid + // Scenario 3: legacyManagementApiUrl port is invalid + // Scenario 4: legacyManagementApiUrl credentials are invalid + + //[Test] + //public void Should_throw_on_invalid_legacy_management_credentials() + //{ + // // Create transport in legacy mode + // var transport = new RabbitMQTransport + // { + // TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + // LegacyApiConnectionString = BrokerConnectionString, + // LegacyManagementApiUrl = "http://copa:abc123xyz@localhost:12345" + // }; + + // var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + + // Assert.That(exception!.Message, Does.Contain("Could not access RabbitMQ Management API")); + //} + + //[Test] + //public void Should_throw_on_invalid_legacy_broker_connection_string() + //{ + // var invalidBrokerConnection = new FakeConnectionConfiguration(host: "localhost", port: "5672", virtualHost: "/", userName: "Copa", password: "guest", useTls: "false"); + + // // Create transport in legacy mode + // var transport = new RabbitMQTransport + // { + // TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + // LegacyApiConnectionString = invalidBrokerConnection.ToConnectionString(), + // //LegacyManagementApiUrl = "http://copa:guest@wronghost:12345" + // }; + + // var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])) + // ?? throw new ArgumentNullException("exception"); + + // Assert.That(exception.Message, Does.Contain("None of the specified endpoints were reachable")); + //} [Test] public void Should_not_throw_when_DoNotUseManagementClient_is_enabled_and_management_connection_is_invalid() { var invalidManagementConnection = new FakeConnectionConfiguration(host: "Copa"); - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString, invalidManagementConnection.ToConnectionString()) + var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString) { DoNotUseManagementClient = true }; From a2eb86362e89e4020b64523a050db890e7dca20e Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Mon, 13 Jan 2025 00:01:38 -0800 Subject: [PATCH 07/24] Add management client tests --- .../ManagementClientTests.cs | 255 ++++++++++++++---- 1 file changed, 205 insertions(+), 50 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs index 9f4d74a98..909b1a39e 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs @@ -5,21 +5,21 @@ namespace NServiceBus.Transport.RabbitMQ.Tests using System; using System.Collections.Generic; using System.Net; + using System.Net.Http; + using System.Text; + using System.Threading; using System.Threading.Tasks; using NServiceBus.Transport.RabbitMQ.ManagementClient; using NUnit.Framework; using NUnit.Framework.Internal; - using ConnectionFactory = ConnectionFactory; - [TestFixture] class ManagementClientTests { static readonly string connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; static readonly ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create(connectionString); - static readonly ConnectionConfiguration managementConnectionConfiguration = ConnectionConfiguration.Create(connectionString); static readonly ConnectionFactory connectionFactory = new(typeof(ManagementClientTests).FullName, connectionConfiguration, null, false, false, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []); - static readonly ManagementClient client = new(managementConnectionConfiguration); + ManagementClient managementClient; const int defaultBrokerPort = 5672; const int defaultBrokerTlsPort = 5671; @@ -27,49 +27,139 @@ class ManagementClientTests const int defaultManagementTlsPort = 15671; const string defaultUserName = "guest"; const string defaultPassword = "guest"; + const string defaultVirtualHost = "/"; - static IEnumerable ConnectionConfigurationTestCases() - { - yield return new TestCaseData( - $"Host=wronghost1;VirtualHost=/;Port={defaultBrokerPort};UserName={defaultUserName};Password={defaultPassword};UseTls=False", - defaultManagementPort, - defaultUserName, - defaultPassword); - yield return new TestCaseData( - $"Host=wronghost2;VirtualHost=/;Port={defaultBrokerTlsPort};UserName={defaultUserName};Password={defaultPassword};UseTls=True", - defaultManagementTlsPort, - defaultUserName, - defaultPassword); - yield return new TestCaseData( - $"Host=wronghost3;VirtualHost=/;Port=12345;UserName={defaultUserName};Password={defaultPassword};UseTls=False", - 12345, - defaultUserName, - defaultPassword); - yield return new TestCaseData( - $"Host=wronghost4;VirtualHost=/;Port={defaultBrokerPort};UserName=fakeUser;Password=fakePassword;UseTls=True", - defaultManagementPort, - "fakeUser", - "fakePassword"); - yield return new TestCaseData( - $"Host=wronghost5;VirtualHost=/;Port={defaultBrokerPort};UseTls=True", - defaultManagementPort, - defaultUserName, - defaultPassword); - } - [Test, TestCaseSource(nameof(ConnectionConfigurationTestCases))] - public void ValidateConnectionConfiguration_Should_Set_Default_Port_And_Authorization_Configurations(string connectionString, int expectedPort, string expectedUserName, string expectedPassword) - { - var connectionConfiguration = ConnectionConfiguration.Create(connectionString); - var client = new ManagementClient(connectionConfiguration); - var exception = Assert.ThrowsAsync(async () => await client.ValidateConnectionConfiguration()); + [SetUp] + public void SetUp() + { + var defaultManagementUrl = ManagementClient.CreateManagementConnectionString(connectionConfiguration); + managementClient = new(defaultManagementUrl, defaultVirtualHost); + } - Assert.Multiple(() => - { - Assert.That(exception!.Message, Is.EqualTo("The management connection configuration could not be validated with the supplied connection string or the default values.")); - Assert.That(client.Port, Is.EqualTo(expectedPort)); - Assert.That(client.UserName, Is.EqualTo(expectedUserName)); - Assert.That(client.Password, Is.EqualTo(expectedPassword)); - }); + [Test] + [TestCase("http://localhost", "guest", "guest", "http://localhost:15672")] + [TestCase("http://localhost:15672", "guest", "guest", "http://localhost:15672")] + [TestCase("http://copa:abc123xyz@localhost", "copa", "abc123xyz", "http://localhost:15672")] + [TestCase("http://copa:abc123xyz@localhost", "guest", "guest", "http://localhost:15672")] // The management client will try guest:guest if the provided credentials fail first + [TestCase("http://copa:abc123xyz@localhost:15672", "guest", "guest", "http://localhost:15672")] + [TestCase("http://guest:guest@localhost", "guest", "guest", "http://localhost:15672")] + [TestCase("http://guest:guest@localhost:15672", "guest", "guest", "http://localhost:15672")] + public void ValidateManagementConnection_Should_Not_Throw_With_Default_Management_Api_Connection( + string managementApiUrl, + string expectedUserName, + string expectedPassword, + string expectedUrl) + { + var HttpClient = CreateFakeHttpClient(request => FakeResponses.CheckRequestMessageConnection(request, expectedUserName, expectedPassword, expectedUrl)); + managementClient = CreateManagementClient(managementApiUrl, HttpClient); + + Assert.DoesNotThrowAsync(async () => await managementClient.ValidateManagementConnection()); + } + + [Test] + [TestCase("https://localhost", "guest", "guest", "https://localhost:15671")] + [TestCase("https://localhost:15671", "guest", "guest", "https://localhost:15671")] + [TestCase("https://copa:abc123xyz@localhost", "copa", "abc123xyz", "https://localhost:15671")] + [TestCase("https://copa:abc123xyz@localhost", "guest", "guest", "https://localhost:15671")] // The management client will try guest:guest if the provided credentials fail first + [TestCase("https://guest:guest@localhost", "guest", "guest", "https://localhost:15671")] + [TestCase("https://guest:guest@localhost:15671", "guest", "guest", "https://localhost:15671")] + public void ValidateManagementConnection_Should_Not_Throw_With_Default_Management_Api_Tls_Connection( + string managementApiUrl, + string expectedUserName, + string expectedPassword, + string expectedUrl) + { + var HttpClient = CreateFakeHttpClient(request => FakeResponses.CheckRequestMessageConnection(request, expectedUserName, expectedPassword, expectedUrl)); + var managementClient = CreateManagementClient(managementApiUrl, HttpClient); + + Assert.DoesNotThrowAsync(async () => await managementClient.ValidateManagementConnection()); + } + + [Test] + [TestCase("http://localhost", "admin", "admin")] + [TestCase("http://localhost:15672", "admin", "admin")] + [TestCase("https://localhost:15671", "admin", "admin")] + [TestCase("http://copa:abc123xyz@localhost", "admin", "admin")] + [TestCase("http://guest:guest@localhost", "admin", "admin")] + [TestCase("http://guest:guest@localhost:15672", "admin", "admin")] + [TestCase("https://guest:guest@localhost:15671", "admin", "admin")] + public void ValidateManagementConnection_Should_Throw_With_Invalid_Credentials( + string managementApiUrl, + string expectedUserName, + string expectedPassword) + { + var httpClient = CreateFakeHttpClient(request => FakeResponses.CheckAuthentication(request, expectedUserName, expectedPassword)); + var managementClient = CreateManagementClient(managementApiUrl, httpClient); + + var exception = Assert.ThrowsAsync(async () => await managementClient.ValidateManagementConnection()); + } + + [Test] + [TestCase("host=localhost", "guest", "guest", "http://guest:guest@localhost:15672")] + [TestCase("host=localhost;useTls=true", "guest", "guest", "https://guest:guest@localhost:15671")] + [TestCase("host=localhost;useTls=false", "guest", "guest", "http://guest:guest@localhost:15672")] + [TestCase("host=localhost;port=12345;useTls=true", "guest", "guest", "https://guest:guest@localhost:15671")] + [TestCase("host=localhost;port=12345;useTls=false", "guest", "guest", "http://guest:guest@localhost:15672")] + [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=true", "guest", "guest", "https://copa:abc123xyz@localhost:15671")] + [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=false", "guest", "guest", "http://copa:abc123xyz@localhost:15672")] + [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=true", "copa", "abc123xyz", "https://copa:abc123xyz@localhost:15671")] + [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=false", "copa", "abc123xyz", "http://copa:abc123xyz@localhost:15672")] + [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=true", "guest", "guest", "https://copa:abc123xyz@localhost:15671")] + [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=false", "guest", "guest", "http://copa:abc123xyz@localhost:15672")] + [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=true", "copa", "abc123xyz", "https://copa:abc123xyz@localhost:15671")] + [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=false", "copa", "abc123xyz", "http://copa:abc123xyz@localhost:15672")] + public void ValidateManagementConnection_Should_Not_Throw_With_Valid_Or_Default_Broker_ConnectionConfiguration( + string brokerConnectionString, + string expectedUserName, + string expectedPassword, + string expectedUrl) + { + var httpClient = CreateFakeHttpClient(request => FakeResponses.CheckAuthentication(request, expectedUserName, expectedPassword)); + var connectionConfiguration = ConnectionConfiguration.Create(brokerConnectionString); + var managementApiUrl = ManagementClient.CreateManagementConnectionString(connectionConfiguration); + Assert.That(string.Equals(managementApiUrl, expectedUrl), Is.True); + + var managementClient = CreateManagementClient(managementApiUrl, httpClient); + + Assert.DoesNotThrowAsync(async () => await managementClient.ValidateManagementConnection()); + } + + [Test] + [TestCase("host=localhost", "admin", "admin", "http://guest:guest@localhost:15672")] + [TestCase("host=localhost;useTls=true", "admin", "admin", "https://guest:guest@localhost:15671")] + [TestCase("host=localhost;useTls=false", "admin", "admin", "http://guest:guest@localhost:15672")] + [TestCase("host=localhost;port=12345;useTls=true", "admin", "admin", "https://guest:guest@localhost:15671")] + [TestCase("host=localhost;port=12345;useTls=false", "admin", "admin", "http://guest:guest@localhost:15672")] + [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=true", "admin", "admin", "https://copa:abc123xyz@localhost:15671")] + [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=false", "admin", "admin", "http://copa:abc123xyz@localhost:15672")] + [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=true", "admin", "admin", "https://copa:abc123xyz@localhost:15671")] + [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=false", "admin", "admin", "http://copa:abc123xyz@localhost:15672")] + [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=true", "admin", "admin", "https://copa:abc123xyz@localhost:15671")] + [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=false", "admin", "admin", "http://copa:abc123xyz@localhost:15672")] + [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=true", "admin", "admin", "https://copa:abc123xyz@localhost:15671")] + [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=false", "admin", "admin", "http://copa:abc123xyz@localhost:15672")] + public void ValidateManagementConnection_Should_Throw_With_Invalid_Management_Credentials_From_Broker_ConnectionConfiguration( + string brokerConnectionString, + string expectedUserName, + string expectedPassword, + string expectedUrl) + { + var httpClient = CreateFakeHttpClient(request => FakeResponses.CheckAuthentication(request, expectedUserName, expectedPassword)); + var connectionConfiguration = ConnectionConfiguration.Create(brokerConnectionString); + var managementApiUrl = ManagementClient.CreateManagementConnectionString(connectionConfiguration); + Assert.That(string.Equals(managementApiUrl, expectedUrl), Is.True); + + var managementClient = CreateManagementClient(managementApiUrl, httpClient); + + var exception = Assert.ThrowsAsync(async () => await managementClient.ValidateManagementConnection()); + } + + [Test] + public void Constructor_Should_Throw_With_Invalid_Scheme() + { + var managementApiUrl = "amqp:guest:guest@localhost:15672"; + + var exception = Assert.Throws(() => managementClient = new(managementApiUrl, defaultVirtualHost)); } [Test] @@ -80,7 +170,7 @@ public async Task GetQueue_Should_Return_Queue_Information_When_Exists() await CreateQuorumQueue(queueName).ConfigureAwait(false); // Act - var response = await client.GetQueue(queueName); + var response = await managementClient.GetQueue(queueName); // Assert Assert.Multiple(() => @@ -95,7 +185,7 @@ public async Task GetQueue_Should_Return_Queue_Information_When_Exists() public async Task GetOverview_Should_Return_Broker_Information() { // Act - var response = await client.GetOverview(); + var response = await managementClient.GetOverview(); // Assert Assert.Multiple(() => @@ -113,7 +203,7 @@ public async Task GetOverview_Should_Return_Broker_Information() public async Task GetFeatureFlags_Should_Return_FeatureFlag_Information() { // Act - var response = await client.GetFeatureFlags(); + var response = await managementClient.GetFeatureFlags(); // Assert Assert.Multiple(() => @@ -147,14 +237,14 @@ public async Task CreatePolicy_With_DeliveryLimit_Should_Be_Applied_To_Quorum_Qu Pattern = queueName, Priority = 100 }; - await client.CreatePolicy(policy); + await managementClient.CreatePolicy(policy); // Assert // It can take some time for updated policies to be applied, so we need to wait. // If this test is randomly failing, consider increasing the delay await Task.Delay(10000); - var response = await client.GetQueue(queueName); + var response = await managementClient.GetQueue(queueName); Assert.Multiple(() => { Assert.That(response.StatusCode, Is.EqualTo(HttpStatusCode.OK)); @@ -172,5 +262,70 @@ static async Task CreateQuorumQueue(string queueName) _ = await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments); } + + static string ConvertAuthenticationToBase64String(string userName, string password) + { + return Convert.ToBase64String(Encoding.ASCII.GetBytes($"{userName}:{password}")); + } + + static string DecodeAuthenticationHeader(string authenticationHeader) + { + byte[] data = Convert.FromBase64String(authenticationHeader); + return Encoding.ASCII.GetString(data); + } + + static HttpClient CreateFakeHttpClient(Func fakeResponse) => new(new FakeHttpMessageHandler { FakeResponse = fakeResponse }); + + static ManagementClient CreateManagementClient(string managementApiUrl, HttpClient httpClient) => new(defaultVirtualHost, httpClient, managementApiUrl); + + class FakeHttpMessageHandler : HttpMessageHandler + { + public Func? FakeResponse { get; set; } + + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken = default) + { + var response = FakeResponse?.Invoke(request) ?? FakeResponses.NotFound(); + return await Task.FromResult(response); + } + } + + static class FakeResponses + { + public static HttpResponseMessage Valid() => new() + { + StatusCode = HttpStatusCode.OK + }; + + public static HttpResponseMessage Unauthorized() => new() + { + StatusCode = HttpStatusCode.Unauthorized + }; + + public static HttpResponseMessage NotFound() => new() + { + StatusCode = HttpStatusCode.NotFound + }; + + public static HttpResponseMessage CheckRequestMessageConnection(HttpRequestMessage request, string userName, string password, string url) + { + var PathAndQuery = request.RequestUri?.PathAndQuery ?? string.Empty; + var requestUrl = request.RequestUri?.AbsoluteUri.Replace(PathAndQuery, string.Empty); + var credentials = request.Headers.Authorization?.Parameter; + var expectedCredentials = ConvertAuthenticationToBase64String(userName, password); + var isValidCredential = string.Equals(expectedCredentials, credentials); + var isValidBaseAddress = string.Equals(url, requestUrl); + + return !isValidCredential + ? Unauthorized() + : (isValidBaseAddress ? Valid() : NotFound()); + } + + public static HttpResponseMessage CheckAuthentication(HttpRequestMessage request, string userName, string password) + { + var credentials = request.Headers.Authorization?.Parameter; + var expectedCredentials = ConvertAuthenticationToBase64String(userName, password); + return string.Equals(expectedCredentials, credentials) ? Valid() : Unauthorized(); + } + } } } From 9c6b722d3cbef9303169de5a349e3e4b91686753 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Mon, 13 Jan 2025 00:02:07 -0800 Subject: [PATCH 08/24] Update approval file --- .../ApprovalFiles/APIApprovals.Approve.approved.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index a690d0f59..623f8cc4f 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -18,11 +18,10 @@ namespace NServiceBus { public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString) { } public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) { } - public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, string managementConnectionString) { } - public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery, string managementConnectionString) { } public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; } public bool DoNotUseManagementClient { get; set; } public System.TimeSpan HeartbeatInterval { get; set; } + public string ManagementApiUrl { get; set; } public System.Func MessageIdStrategy { get; set; } public System.TimeSpan NetworkRecoveryInterval { get; set; } public System.Action OutgoingNativeMessageCustomization { get; set; } From 43deb81a5af9e2df9e2a143e11561a7734617f1d Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Thu, 16 Jan 2025 00:59:40 -0800 Subject: [PATCH 09/24] Update management client models --- .../Administration/ManagementClient/Models/Queue.cs | 2 +- .../Administration/ManagementClient/Models/QueueArguments.cs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Queue.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Queue.cs index 9fdd72001..17ab2fff0 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Queue.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Queue.cs @@ -33,5 +33,5 @@ class Queue() public string? AppliedOperatorPolicyName { get; set; } [JsonExtensionData] - public IDictionary ExtraProperties { get; } = new Dictionary(); + public IDictionary ExtraProperties { get; init; } = new Dictionary(); } \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs index ecc99d1be..17edf049a 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs @@ -12,11 +12,11 @@ class QueueArguments [JsonConverter(typeof(QueueTypeConverter))] public QueueType? QueueType { get; set; } - [JsonPropertyName("x-delivery-limit")] + [JsonPropertyName("delivery_limit")] [JsonConverter(typeof(DeliveryLimitConverter))] public int? DeliveryLimit { get; set; } [JsonExtensionData] - public IDictionary ExtraProperties { get; } = new Dictionary(); + public IDictionary ExtraProperties { get; init; } = new Dictionary(); } From dede7aaee29cd991f4b33052ddd0a06f1dc62672 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Thu, 16 Jan 2025 02:14:37 -0800 Subject: [PATCH 10/24] Remove connection validation methods --- .../ManagementClient/ManagementClient.cs | 37 ------------------- 1 file changed, 37 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs index 4c29f0023..6125fc481 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs @@ -77,25 +77,6 @@ public ManagementClient(string virtualHost, HttpClient httpClient, string manage escapedVirtualHost = Uri.EscapeDataString(virtualHost); } - public async Task ValidateManagementConnection(CancellationToken cancellationToken = default) - { - // Check broker provided authentication - var (isValid, _) = await IsConnectionValid(cancellationToken).ConfigureAwait(false); - if (isValid) - { - return; - } - - // Check default management authentication - SetManagementClientAuthorization(httpClient, defaultUserName, defaultPassword); - - var (isDefaultValid, exception) = await IsConnectionValid(cancellationToken).ConfigureAwait(false); - if (exception != null || !isDefaultValid) - { - throw exception ?? new InvalidOperationException($"Connection to the management API could not be established with the default or provided URL. Update the RabbitMQTransport.ManagementApiUrl with the correct HTTP connection string."); - } - } - public static string CreateManagementConnectionString(ConnectionConfiguration connectionConfiguration) { var scheme = connectionConfiguration.UseTls ? "https" : "http"; @@ -274,22 +255,4 @@ void SetManagementClientAuthorization(HttpClient client, string userName, string client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue( "Basic", Convert.ToBase64String(Encoding.ASCII.GetBytes($"{userName}:{password}"))); - - async Task<(bool IsValid, Exception? ex)> IsConnectionValid(CancellationToken cancellationToken) - { - try - { - var request = new HttpRequestMessage(HttpMethod.Head, $"{httpClient.BaseAddress}api/overview"); - var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); - return (response.IsSuccessStatusCode, null); - } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) - { - throw; - } - catch (Exception ex) - { - return (false, ex); - } - } } From 3b6d064d00e4a4d59520a94708b5c34cc3ec936c Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Thu, 16 Jan 2025 02:18:10 -0800 Subject: [PATCH 11/24] Move FakeHttpClient --- .../FakeHttpClient.cs | 135 ++++++++++++++++++ .../ManagementClientTests.cs | 63 -------- 2 files changed, 135 insertions(+), 63 deletions(-) create mode 100644 src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs new file mode 100644 index 000000000..cef5a6140 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs @@ -0,0 +1,135 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.Tests +{ + using System; + using System.Net.Http; + using System.Net; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using ManagementClient; + using Queue = ManagementClient.Queue; + using System.Text.Json; + + class FakeHttpClient + { + public bool Testing { get; set; } + + public static string ConvertAuthenticationToBase64String(string userName, string password) => + Convert.ToBase64String(Encoding.ASCII.GetBytes($"{userName}:{password}")); + + public static string DecodeAuthenticationHeader(string authenticationHeader) + { + var data = Convert.FromBase64String(authenticationHeader); + return Encoding.ASCII.GetString(data); + } + + public static HttpClient CreateFakeHttpClient(Func fakeResponse) => new(new FakeHttpMessageHandler { FakeResponse = fakeResponse }, true); + + public class FakeHttpMessageHandler : HttpMessageHandler + { + public Func? FakeResponse { get; set; } + + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken = default) + { + //request?.RequestUri?.PathAndQuery.Contains("api/policies/"); + var response = FakeResponse?.Invoke(request) ?? FakeResponses.NotFound(); + return await Task.FromResult(response); + } + } + + public static class FakeResponses + { + public static HttpResponseMessage Valid() => new() + { + StatusCode = HttpStatusCode.OK + }; + + public static HttpResponseMessage Unauthorized() => new() + { + StatusCode = HttpStatusCode.Unauthorized + }; + + public static HttpResponseMessage NotFound() => new() + { + StatusCode = HttpStatusCode.NotFound + }; + + public static HttpResponseMessage GetQueue(QueueType queueType = QueueType.Quorum, int deliveryLimit = 20) + { + var queue = new Queue + { + Name = "test", + Arguments = new QueueArguments() + { + QueueType = queueType, + }, + DeliveryLimit = deliveryLimit + }; + + var json = JsonSerializer.Serialize(queue); + var httpContent = new StringContent(json); + var response = new HttpResponseMessage + { + StatusCode = HttpStatusCode.OK, + Content = httpContent + }; + return response; + } + + public static HttpResponseMessage GetOverview( + HttpRequestMessage request, + string? expectedUserName = null, + string? expectedPassword = null, + string? expectedUrl = null) + { + + var response = CheckRequestMessageConnection(request, expectedUserName, expectedPassword, expectedUrl); + if (response.StatusCode != HttpStatusCode.OK) + { + return response; + } + var overview = new Overview + { + ClusterName = "rabbit@my - rabbit", + ProductName = "RabbitMQ", + ProductVersion = new Version(4, 0, 3), + ManagementVersion = new Version(4, 0, 3), + RabbitMqVersion = new Version(4, 0, 3), + Node = "rabbit@my-rabbit" + }; + + var json = JsonSerializer.Serialize(overview); + var httpContent = new StringContent(json); + response.Content = httpContent; + return response; + } + + public static HttpResponseMessage CheckRequestMessageConnection( + HttpRequestMessage request, + string? expectedUserName = null, + string? expectedPassword = null, + string? expectedUrl = null) + { + var PathAndQuery = request.RequestUri?.PathAndQuery ?? string.Empty; + var requestUrl = request.RequestUri?.AbsoluteUri.Replace(PathAndQuery, string.Empty); + var credentials = request.Headers.Authorization?.Parameter; + var expectedCredentials = ConvertAuthenticationToBase64String(expectedUserName ?? "guest", expectedPassword ?? "guest"); + var isValidCredential = string.Equals(expectedCredentials, credentials); + var isValidBaseAddress = string.Equals(expectedUrl ?? "http://localhost:15672", requestUrl); + + return !isValidCredential + ? Unauthorized() + : (isValidBaseAddress ? Valid() : NotFound()); + } + + public static HttpResponseMessage CheckAuthentication(HttpRequestMessage request, string userName, string password) + { + var credentials = request.Headers.Authorization?.Parameter; + var expectedCredentials = ConvertAuthenticationToBase64String(userName, password); + return string.Equals(expectedCredentials, credentials) ? Valid() : Unauthorized(); + } + } + } +} diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs index 909b1a39e..45c833bdc 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs @@ -263,69 +263,6 @@ static async Task CreateQuorumQueue(string queueName) _ = await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments); } - static string ConvertAuthenticationToBase64String(string userName, string password) - { - return Convert.ToBase64String(Encoding.ASCII.GetBytes($"{userName}:{password}")); - } - - static string DecodeAuthenticationHeader(string authenticationHeader) - { - byte[] data = Convert.FromBase64String(authenticationHeader); - return Encoding.ASCII.GetString(data); - } - - static HttpClient CreateFakeHttpClient(Func fakeResponse) => new(new FakeHttpMessageHandler { FakeResponse = fakeResponse }); - static ManagementClient CreateManagementClient(string managementApiUrl, HttpClient httpClient) => new(defaultVirtualHost, httpClient, managementApiUrl); - - class FakeHttpMessageHandler : HttpMessageHandler - { - public Func? FakeResponse { get; set; } - - protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken = default) - { - var response = FakeResponse?.Invoke(request) ?? FakeResponses.NotFound(); - return await Task.FromResult(response); - } - } - - static class FakeResponses - { - public static HttpResponseMessage Valid() => new() - { - StatusCode = HttpStatusCode.OK - }; - - public static HttpResponseMessage Unauthorized() => new() - { - StatusCode = HttpStatusCode.Unauthorized - }; - - public static HttpResponseMessage NotFound() => new() - { - StatusCode = HttpStatusCode.NotFound - }; - - public static HttpResponseMessage CheckRequestMessageConnection(HttpRequestMessage request, string userName, string password, string url) - { - var PathAndQuery = request.RequestUri?.PathAndQuery ?? string.Empty; - var requestUrl = request.RequestUri?.AbsoluteUri.Replace(PathAndQuery, string.Empty); - var credentials = request.Headers.Authorization?.Parameter; - var expectedCredentials = ConvertAuthenticationToBase64String(userName, password); - var isValidCredential = string.Equals(expectedCredentials, credentials); - var isValidBaseAddress = string.Equals(url, requestUrl); - - return !isValidCredential - ? Unauthorized() - : (isValidBaseAddress ? Valid() : NotFound()); - } - - public static HttpResponseMessage CheckAuthentication(HttpRequestMessage request, string userName, string password) - { - var credentials = request.Headers.Authorization?.Parameter; - var expectedCredentials = ConvertAuthenticationToBase64String(userName, password); - return string.Equals(expectedCredentials, credentials) ? Valid() : Unauthorized(); - } - } } } From 87dbfad3588a1f20b7cb1ff2aedc545e511810d9 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Thu, 16 Jan 2025 02:18:49 -0800 Subject: [PATCH 12/24] Update logger for testing BrokerVerifier --- .../Administration/BrokerVerifier.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs index 23660cb96..ee90a431a 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs @@ -10,9 +10,9 @@ namespace NServiceBus.Transport.RabbitMQ; using NServiceBus.Transport.RabbitMQ.ManagementClient; using Polly; -class BrokerVerifier(ConnectionFactory connectionFactory, bool managementClientAvailable, ManagementClientClass managementClient) +class BrokerVerifier(ConnectionFactory connectionFactory, bool managementClientAvailable, ManagementClientClass managementClient, ILog? logger = null) { - static readonly ILog Logger = LogManager.GetLogger(typeof(BrokerVerifier)); + readonly ILog Logger = logger ?? LogManager.GetLogger(typeof(BrokerVerifier)); static readonly Version MinimumSupportedRabbitMqVersion = Version.Parse("3.10.0"); static readonly Version RabbitMqVersion4 = Version.Parse("4.0.0"); @@ -22,7 +22,6 @@ public async Task Initialize(CancellationToken cancellationToken = default) { if (managementClientAvailable) { - await managementClient.ValidateManagementConnection(cancellationToken).ConfigureAwait(false); var response = await managementClient.GetOverview(cancellationToken).ConfigureAwait(false); if (response.HasValue) { @@ -125,7 +124,7 @@ bool ShouldOverrideDeliveryLimit(Queue queue) return true; } - static async Task GetFullQueueDetails(ManagementClientClass managementClient, string queueName, CancellationToken cancellationToken) + async Task GetFullQueueDetails(ManagementClientClass managementClient, string queueName, CancellationToken cancellationToken) { var retryPolicy = Polly.Policy .HandleResult>(response => response.Value?.EffectivePolicyDefinition is null) From e710dc2d8bb703d81b062ef4af865c951c3539d9 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Thu, 16 Jan 2025 02:19:19 -0800 Subject: [PATCH 13/24] Add BrokerVerifier Tests --- .../BrokerVerifierTests.cs | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs new file mode 100644 index 000000000..a511bfaf3 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs @@ -0,0 +1,144 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.Tests +{ + using System; + using NUnit.Framework; + using ManagementClient; + using NServiceBus.Logging; + using System.Collections.Generic; + using System.Linq; + using ConnectionFactory = ConnectionFactory; + using System.Threading.Tasks; + using System.Diagnostics; + using System.Net; + + [TestFixture] + class BrokerVerifierTests + { + static readonly string connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; + static readonly ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create(connectionString); + static readonly ConnectionFactory connectionFactory = new(typeof(ManagementClientTests).FullName, connectionConfiguration, null, false, false, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []); + + [Test] + public void Initialize_Should_Get_Response_When_Management_Client_Is_Available_And_Valid() + { + var managementClient = new ManagementClient(connectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); + + Assert.DoesNotThrowAsync(async () => await brokerVerifier.Initialize()); + } + + [Test] + public void Initialize_Should_Should_Warn_When_Management_Is_Disabled_With_Version_4_Or_Greater() + { + var managementClient = new ManagementClient(connectionConfiguration); + var fakeLogger = new FakeLogger(); + var brokerVerifier = new BrokerVerifier(connectionFactory, false, managementClient, fakeLogger); + + Assert.DoesNotThrowAsync(async () => await brokerVerifier.Initialize()); + + Assert.That(fakeLogger.Messages, Has.Exactly(1).Items); + Assert.That(fakeLogger.Messages.FirstOrDefault(), Does.Contain("Use of RabbitMQ Management API has been disabled.")); + } + + [Test] + public async Task ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy() + { + var queueName = nameof(ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy); + var policyName = $"nsb.{queueName}.delivery-limit"; + await CreateQuorumQueue(queueName); + var managementClient = new ManagementClient(connectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); + + await brokerVerifier.Initialize(); + await brokerVerifier.ValidateDeliveryLimit(queueName); + + var maxWaitTime = TimeSpan.FromSeconds(30); + var pollingInterval = TimeSpan.FromSeconds(2); + var stopwatch = Stopwatch.StartNew(); + while (stopwatch.Elapsed < maxWaitTime) + { + var response = await managementClient.GetQueue(queueName); + if (response.StatusCode == HttpStatusCode.OK + && response.Value is not null + && response.Value.EffectivePolicyDefinition is not null + && response.Value.DeliveryLimit.Equals(-1) + && response.Value.AppliedPolicyName == policyName + && response.Value.EffectivePolicyDefinition.DeliveryLimit == -1) + { + // Policy applied successfully + return; + } + await Task.Delay(pollingInterval); + } + + Assert.Fail($"Policy '{policyName}' was not applied to queue '{queueName}' within {maxWaitTime.TotalSeconds} seconds."); + } + + [Test] + public async Task ValidateDeliveryLimit_Should_Throw_When_Queue_Argument_Delivery_Limit_Set() + { + var queueName = nameof(ValidateDeliveryLimit_Should_Throw_When_Queue_Argument_Delivery_Limit_Set); + await CreateQuorumQueueWithDeliveryLimit(queueName, 5); + var managementClient = new ManagementClient(connectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); + + await brokerVerifier.Initialize(); + + _ = Assert.ThrowsAsync(async () => await brokerVerifier.ValidateDeliveryLimit(queueName)); + } + + static async Task CreateQuorumQueue(string queueName) + { + using var connection = await connectionFactory.CreateConnection($"{queueName} connection").ConfigureAwait(false); + using var channel = await connection.CreateChannelAsync().ConfigureAwait(false); + var arguments = new Dictionary { { "x-queue-type", "quorum" } }; + + _ = await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments); + } + + static async Task CreateQuorumQueueWithDeliveryLimit(string queueName, int deliveryLimit) + { + using var connection = await connectionFactory.CreateConnection($"{queueName} connection").ConfigureAwait(false); + using var channel = await connection.CreateChannelAsync().ConfigureAwait(false); + var arguments = new Dictionary { { "x-queue-type", "quorum" }, { "delivery_limit", deliveryLimit } }; + + _ = await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments); + } + } + + + + class FakeLogger : ILog + { + public List Messages { get; } = []; + + public bool IsDebugEnabled => throw new NotImplementedException(); + + public bool IsInfoEnabled => throw new NotImplementedException(); + + public bool IsWarnEnabled => throw new NotImplementedException(); + + public bool IsErrorEnabled => throw new NotImplementedException(); + + public bool IsFatalEnabled => throw new NotImplementedException(); + + public void Debug(string message, Exception exception) => throw new NotImplementedException(); + public void DebugFormat(string format, params object[] args) => throw new NotImplementedException(); + public void Info(string message, Exception exception) => throw new NotImplementedException(); + public void InfoFormat(string format, params object[] args) => throw new NotImplementedException(); + public void Warn(string message, Exception exception) => Messages.Add(message); + public void WarnFormat(string format, params object[] args) => Messages.Add(format); + public void Error(string message, Exception exception) => throw new NotImplementedException(); + public void ErrorFormat(string format, params object[] args) => throw new NotImplementedException(); + public void Fatal(string message, Exception exception) => throw new NotImplementedException(); + public void FatalFormat(string format, params object[] args) => throw new NotImplementedException(); + public void Debug(string message) => throw new NotImplementedException(); + public void Info(string message) => throw new NotImplementedException(); + public void Error(string message) => Messages.Add(message); + public void Fatal(string message) => throw new NotImplementedException(); + public void Warn(string message) => Messages.Add(message); + } + +} From c9d740e45c50d362b69c5eb765778c89325c55b7 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Thu, 16 Jan 2025 02:19:47 -0800 Subject: [PATCH 14/24] Update ManagementClient tests --- .../ManagementClientTests.cs | 164 ++++++------------ 1 file changed, 50 insertions(+), 114 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs index 45c833bdc..b90fa83e0 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs @@ -4,14 +4,14 @@ namespace NServiceBus.Transport.RabbitMQ.Tests { using System; using System.Collections.Generic; + using System.Diagnostics; using System.Net; using System.Net.Http; - using System.Text; - using System.Threading; using System.Threading.Tasks; using NServiceBus.Transport.RabbitMQ.ManagementClient; using NUnit.Framework; using NUnit.Framework.Internal; + using static NServiceBus.Transport.RabbitMQ.Tests.FakeHttpClient; [TestFixture] class ManagementClientTests @@ -19,7 +19,8 @@ class ManagementClientTests static readonly string connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; static readonly ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create(connectionString); static readonly ConnectionFactory connectionFactory = new(typeof(ManagementClientTests).FullName, connectionConfiguration, null, false, false, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []); - ManagementClient managementClient; + string defaultManagementUrl; + ManagementClient? managementClient; const int defaultBrokerPort = 5672; const int defaultBrokerTlsPort = 5671; @@ -30,132 +31,55 @@ class ManagementClientTests const string defaultVirtualHost = "/"; [SetUp] - public void SetUp() - { - var defaultManagementUrl = ManagementClient.CreateManagementConnectionString(connectionConfiguration); - managementClient = new(defaultManagementUrl, defaultVirtualHost); - } + public void SetUp() => defaultManagementUrl = ManagementClient.CreateManagementConnectionString(connectionConfiguration); [Test] [TestCase("http://localhost", "guest", "guest", "http://localhost:15672")] + [TestCase("https://localhost", "guest", "guest", "https://localhost:15671")] [TestCase("http://localhost:15672", "guest", "guest", "http://localhost:15672")] - [TestCase("http://copa:abc123xyz@localhost", "copa", "abc123xyz", "http://localhost:15672")] - [TestCase("http://copa:abc123xyz@localhost", "guest", "guest", "http://localhost:15672")] // The management client will try guest:guest if the provided credentials fail first - [TestCase("http://copa:abc123xyz@localhost:15672", "guest", "guest", "http://localhost:15672")] + [TestCase("https://localhost:15671", "guest", "guest", "https://localhost:15671")] [TestCase("http://guest:guest@localhost", "guest", "guest", "http://localhost:15672")] + [TestCase("https://guest:guest@localhost", "guest", "guest", "https://localhost:15671")] [TestCase("http://guest:guest@localhost:15672", "guest", "guest", "http://localhost:15672")] - public void ValidateManagementConnection_Should_Not_Throw_With_Default_Management_Api_Connection( + [TestCase("https://guest:guest@localhost:15671", "guest", "guest", "https://localhost:15671")] + public async Task GetOverview_Should_Return_Success_With_Valid_Default_Connection_Values( string managementApiUrl, string expectedUserName, string expectedPassword, string expectedUrl) { - var HttpClient = CreateFakeHttpClient(request => FakeResponses.CheckRequestMessageConnection(request, expectedUserName, expectedPassword, expectedUrl)); + var HttpClient = CreateFakeHttpClient(request => FakeResponses.GetOverview(request, expectedUserName, expectedPassword, expectedUrl)); managementClient = CreateManagementClient(managementApiUrl, HttpClient); - Assert.DoesNotThrowAsync(async () => await managementClient.ValidateManagementConnection()); - } + var result = await managementClient.GetOverview(); - [Test] - [TestCase("https://localhost", "guest", "guest", "https://localhost:15671")] - [TestCase("https://localhost:15671", "guest", "guest", "https://localhost:15671")] - [TestCase("https://copa:abc123xyz@localhost", "copa", "abc123xyz", "https://localhost:15671")] - [TestCase("https://copa:abc123xyz@localhost", "guest", "guest", "https://localhost:15671")] // The management client will try guest:guest if the provided credentials fail first - [TestCase("https://guest:guest@localhost", "guest", "guest", "https://localhost:15671")] - [TestCase("https://guest:guest@localhost:15671", "guest", "guest", "https://localhost:15671")] - public void ValidateManagementConnection_Should_Not_Throw_With_Default_Management_Api_Tls_Connection( - string managementApiUrl, - string expectedUserName, - string expectedPassword, - string expectedUrl) - { - var HttpClient = CreateFakeHttpClient(request => FakeResponses.CheckRequestMessageConnection(request, expectedUserName, expectedPassword, expectedUrl)); - var managementClient = CreateManagementClient(managementApiUrl, HttpClient); - - Assert.DoesNotThrowAsync(async () => await managementClient.ValidateManagementConnection()); + Assert.That(result.StatusCode, Is.EqualTo(HttpStatusCode.OK)); } [Test] - [TestCase("http://localhost", "admin", "admin")] - [TestCase("http://localhost:15672", "admin", "admin")] - [TestCase("https://localhost:15671", "admin", "admin")] - [TestCase("http://copa:abc123xyz@localhost", "admin", "admin")] - [TestCase("http://guest:guest@localhost", "admin", "admin")] - [TestCase("http://guest:guest@localhost:15672", "admin", "admin")] - [TestCase("https://guest:guest@localhost:15671", "admin", "admin")] - public void ValidateManagementConnection_Should_Throw_With_Invalid_Credentials( + [TestCase("http://localhost", "user", "password", "http://localhost:15672")] + [TestCase("https://localhost", "user", "password", "https://localhost:15671")] + [TestCase("http://localhost:15672", "user", "password", "http://localhost:15672")] + [TestCase("https://localhost:15671", "user", "password", "https://localhost:15671")] + [TestCase("http://guest:guest@localhost", "user", "password", "http://localhost:15672")] + [TestCase("https://guest:guest@localhost", "user", "password", "https://localhost:15671")] + [TestCase("http://guest:guest@localhost:15672", "user", "password", "http://localhost:15672")] + [TestCase("https://guest:guest@localhost:15671", "user", "password", "https://localhost:15671")] + public async Task GetOverview_Should_Return_Unauthorized_With_Invalid_Credentials( string managementApiUrl, string expectedUserName, - string expectedPassword) - { - var httpClient = CreateFakeHttpClient(request => FakeResponses.CheckAuthentication(request, expectedUserName, expectedPassword)); - var managementClient = CreateManagementClient(managementApiUrl, httpClient); - - var exception = Assert.ThrowsAsync(async () => await managementClient.ValidateManagementConnection()); - } - - [Test] - [TestCase("host=localhost", "guest", "guest", "http://guest:guest@localhost:15672")] - [TestCase("host=localhost;useTls=true", "guest", "guest", "https://guest:guest@localhost:15671")] - [TestCase("host=localhost;useTls=false", "guest", "guest", "http://guest:guest@localhost:15672")] - [TestCase("host=localhost;port=12345;useTls=true", "guest", "guest", "https://guest:guest@localhost:15671")] - [TestCase("host=localhost;port=12345;useTls=false", "guest", "guest", "http://guest:guest@localhost:15672")] - [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=true", "guest", "guest", "https://copa:abc123xyz@localhost:15671")] - [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=false", "guest", "guest", "http://copa:abc123xyz@localhost:15672")] - [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=true", "copa", "abc123xyz", "https://copa:abc123xyz@localhost:15671")] - [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=false", "copa", "abc123xyz", "http://copa:abc123xyz@localhost:15672")] - [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=true", "guest", "guest", "https://copa:abc123xyz@localhost:15671")] - [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=false", "guest", "guest", "http://copa:abc123xyz@localhost:15672")] - [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=true", "copa", "abc123xyz", "https://copa:abc123xyz@localhost:15671")] - [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=false", "copa", "abc123xyz", "http://copa:abc123xyz@localhost:15672")] - public void ValidateManagementConnection_Should_Not_Throw_With_Valid_Or_Default_Broker_ConnectionConfiguration( - string brokerConnectionString, - string expectedUserName, - string expectedPassword, - string expectedUrl) - { - var httpClient = CreateFakeHttpClient(request => FakeResponses.CheckAuthentication(request, expectedUserName, expectedPassword)); - var connectionConfiguration = ConnectionConfiguration.Create(brokerConnectionString); - var managementApiUrl = ManagementClient.CreateManagementConnectionString(connectionConfiguration); - Assert.That(string.Equals(managementApiUrl, expectedUrl), Is.True); - - var managementClient = CreateManagementClient(managementApiUrl, httpClient); - - Assert.DoesNotThrowAsync(async () => await managementClient.ValidateManagementConnection()); - } - - [Test] - [TestCase("host=localhost", "admin", "admin", "http://guest:guest@localhost:15672")] - [TestCase("host=localhost;useTls=true", "admin", "admin", "https://guest:guest@localhost:15671")] - [TestCase("host=localhost;useTls=false", "admin", "admin", "http://guest:guest@localhost:15672")] - [TestCase("host=localhost;port=12345;useTls=true", "admin", "admin", "https://guest:guest@localhost:15671")] - [TestCase("host=localhost;port=12345;useTls=false", "admin", "admin", "http://guest:guest@localhost:15672")] - [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=true", "admin", "admin", "https://copa:abc123xyz@localhost:15671")] - [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=false", "admin", "admin", "http://copa:abc123xyz@localhost:15672")] - [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=true", "admin", "admin", "https://copa:abc123xyz@localhost:15671")] - [TestCase("host=localhost;username=copa;password=abc123xyz;useTls=false", "admin", "admin", "http://copa:abc123xyz@localhost:15672")] - [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=true", "admin", "admin", "https://copa:abc123xyz@localhost:15671")] - [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=false", "admin", "admin", "http://copa:abc123xyz@localhost:15672")] - [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=true", "admin", "admin", "https://copa:abc123xyz@localhost:15671")] - [TestCase("host=localhost;username=copa;password=abc123xyz;port=12345;useTls=false", "admin", "admin", "http://copa:abc123xyz@localhost:15672")] - public void ValidateManagementConnection_Should_Throw_With_Invalid_Management_Credentials_From_Broker_ConnectionConfiguration( - string brokerConnectionString, - string expectedUserName, string expectedPassword, string expectedUrl) { - var httpClient = CreateFakeHttpClient(request => FakeResponses.CheckAuthentication(request, expectedUserName, expectedPassword)); - var connectionConfiguration = ConnectionConfiguration.Create(brokerConnectionString); - var managementApiUrl = ManagementClient.CreateManagementConnectionString(connectionConfiguration); - Assert.That(string.Equals(managementApiUrl, expectedUrl), Is.True); - - var managementClient = CreateManagementClient(managementApiUrl, httpClient); + var HttpClient = CreateFakeHttpClient(request => FakeResponses.GetOverview(request, expectedUserName, expectedPassword, expectedUrl)); + managementClient = CreateManagementClient(managementApiUrl, HttpClient); - var exception = Assert.ThrowsAsync(async () => await managementClient.ValidateManagementConnection()); + var result = await managementClient.GetOverview(); + Assert.That(result.StatusCode, Is.EqualTo(HttpStatusCode.Unauthorized)); } [Test] - public void Constructor_Should_Throw_With_Invalid_Scheme() + public void Should_Throw_With_Invalid_Scheme() { var managementApiUrl = "amqp:guest:guest@localhost:15672"; @@ -166,6 +90,7 @@ public void Constructor_Should_Throw_With_Invalid_Scheme() public async Task GetQueue_Should_Return_Queue_Information_When_Exists() { // Arrange + managementClient = new(defaultManagementUrl, defaultVirtualHost); var queueName = nameof(GetQueue_Should_Return_Queue_Information_When_Exists); await CreateQuorumQueue(queueName).ConfigureAwait(false); @@ -185,6 +110,7 @@ public async Task GetQueue_Should_Return_Queue_Information_When_Exists() public async Task GetOverview_Should_Return_Broker_Information() { // Act + managementClient = new(defaultManagementUrl, defaultVirtualHost); var response = await managementClient.GetOverview(); // Assert @@ -203,6 +129,7 @@ public async Task GetOverview_Should_Return_Broker_Information() public async Task GetFeatureFlags_Should_Return_FeatureFlag_Information() { // Act + managementClient = new(defaultManagementUrl, defaultVirtualHost); var response = await managementClient.GetFeatureFlags(); // Assert @@ -221,8 +148,9 @@ public async Task GetFeatureFlags_Should_Return_FeatureFlag_Information() public async Task CreatePolicy_With_DeliveryLimit_Should_Be_Applied_To_Quorum_Queues(int deliveryLimit) { // Arrange + managementClient = new(defaultManagementUrl, defaultVirtualHost); var queueName = nameof(CreatePolicy_With_DeliveryLimit_Should_Be_Applied_To_Quorum_Queues); - var policyName = $"{queueName} policy"; + var policyName = $"nsb.{queueName}"; await CreateQuorumQueue(queueName); // Act @@ -242,23 +170,31 @@ public async Task CreatePolicy_With_DeliveryLimit_Should_Be_Applied_To_Quorum_Qu // Assert // It can take some time for updated policies to be applied, so we need to wait. - // If this test is randomly failing, consider increasing the delay - await Task.Delay(10000); - var response = await managementClient.GetQueue(queueName); - Assert.Multiple(() => + // If this test is randomly failing, consider increasing the maxWaitTime + var maxWaitTime = TimeSpan.FromSeconds(30); + var pollingInterval = TimeSpan.FromSeconds(2); + var stopwatch = Stopwatch.StartNew(); + while (stopwatch.Elapsed < maxWaitTime) { - Assert.That(response.StatusCode, Is.EqualTo(HttpStatusCode.OK)); - Assert.That(response.Value, Is.Not.Null); - Assert.That(response.Value?.AppliedPolicyName, Is.EqualTo(policyName)); - Assert.That(response.Value?.EffectivePolicyDefinition?.DeliveryLimit, Is.EqualTo(deliveryLimit)); - }); + var response = await managementClient.GetQueue(queueName); + if (response.StatusCode == HttpStatusCode.OK + && response.Value != null + && response.Value.AppliedPolicyName == policyName + && response.Value.EffectivePolicyDefinition?.DeliveryLimit == deliveryLimit) + { + // Policy applied successfully + return; + } + await Task.Delay(pollingInterval); + } + Assert.Fail($"Policy '{policyName}' was not applied to queue '{queueName}' within {maxWaitTime.TotalSeconds} seconds."); } static async Task CreateQuorumQueue(string queueName) { using var connection = await connectionFactory.CreateConnection($"{queueName} connection").ConfigureAwait(false); using var channel = await connection.CreateChannelAsync().ConfigureAwait(false); - var arguments = new Dictionary { { "x-queue-type", "quorum" } }; + var arguments = new Dictionary { { "x-queue-type", "quorum" }, { "delivery_limit", 5 } }; _ = await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments); } From c95279d5d90c45c0e2e795a23d874d6fac683d79 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Thu, 16 Jan 2025 21:12:08 -0800 Subject: [PATCH 15/24] Add more broker verifier tests --- .../BrokerVerifierTests.cs | 51 +++++++++++++++++-- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs index a511bfaf3..25e9d486d 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs @@ -54,6 +54,8 @@ public async Task ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy() await brokerVerifier.Initialize(); await brokerVerifier.ValidateDeliveryLimit(queueName); + // It can take some time for updated policies to be applied, so we need to wait. + // If this test is randomly failing, consider increasing the maxWaitTime var maxWaitTime = TimeSpan.FromSeconds(30); var pollingInterval = TimeSpan.FromSeconds(2); var stopwatch = Stopwatch.StartNew(); @@ -77,16 +79,59 @@ public async Task ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy() } [Test] - public async Task ValidateDeliveryLimit_Should_Throw_When_Queue_Argument_Delivery_Limit_Set() + public async Task ValidateDeliveryLimit_Should_Throw_When_Queue_Argument_Has_Delivery_Limit_Not_Set_To_Unlimited() { - var queueName = nameof(ValidateDeliveryLimit_Should_Throw_When_Queue_Argument_Delivery_Limit_Set); + var queueName = nameof(ValidateDeliveryLimit_Should_Throw_When_Queue_Argument_Has_Delivery_Limit_Not_Set_To_Unlimited); await CreateQuorumQueueWithDeliveryLimit(queueName, 5); var managementClient = new ManagementClient(connectionConfiguration); var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); await brokerVerifier.Initialize(); - _ = Assert.ThrowsAsync(async () => await brokerVerifier.ValidateDeliveryLimit(queueName)); + var exception = Assert.ThrowsAsync(async () => await brokerVerifier.ValidateDeliveryLimit(queueName)); + Assert.That(exception.Message, Does.Contain($"The delivery limit for {queueName} is set to 5 by a queue argument. " + + $"This can interfere with the transport's retry implementation")); + } + + [Test] + public async Task ValidateDeliveryLimit_Should_Throw_When_Delivery_Limit_Cannot_Be_Validated() + { + var queueName = nameof(ValidateDeliveryLimit_Should_Throw_When_Delivery_Limit_Cannot_Be_Validated); + await CreateQuorumQueue(queueName); + var managementClient = new ManagementClient(connectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); + + await brokerVerifier.Initialize(); + + var exception = Assert.ThrowsAsync(async () => await brokerVerifier.ValidateDeliveryLimit("WrongQueue")); + Assert.That(exception.Message, Does.Contain($"Could not retrieve full queue details for WrongQueue")); + } + + [Test] + public async Task ValidateDeliveryLimit_Should_Throw_When_A_Policy_On_Queue_Has_Delivery_Limit_Not_Set_To_Unlimited() + { + // Arrange + var deliveryLimit = 15; + var queueName = nameof(ValidateDeliveryLimit_Should_Throw_When_A_Policy_On_Queue_Has_Delivery_Limit_Not_Set_To_Unlimited); + var managementClient = new ManagementClient(connectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); + var policy = new Policy + { + Name = $"nsb.{queueName}.delivery-limit", + ApplyTo = PolicyTarget.QuorumQueues, + Definition = new PolicyDefinition { DeliveryLimit = deliveryLimit }, + Pattern = queueName, + Priority = 100 + }; + + // Act + await CreateQuorumQueue(queueName); + await brokerVerifier.Initialize(); + await managementClient.CreatePolicy(policy).ConfigureAwait(false); + var exception = Assert.ThrowsAsync(async () => await brokerVerifier.ValidateDeliveryLimit(queueName)); + + // Assert + Assert.That(exception.Message, Does.Contain($"The RabbitMQ policy {policy.Name} is setting delivery limit to {deliveryLimit} for {queueName}")); } static async Task CreateQuorumQueue(string queueName) From 7e5ec802f08eb7ab87f3479bad642422f5df1d89 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Thu, 16 Jan 2025 21:12:29 -0800 Subject: [PATCH 16/24] update fake http client --- src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs index cef5a6140..816f4b5fc 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs @@ -33,7 +33,6 @@ public class FakeHttpMessageHandler : HttpMessageHandler protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken = default) { - //request?.RequestUri?.PathAndQuery.Contains("api/policies/"); var response = FakeResponse?.Invoke(request) ?? FakeResponses.NotFound(); return await Task.FromResult(response); } From 4df30b95dee621698074977b8081a56d425d4e1b Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Fri, 17 Jan 2025 16:46:47 -0800 Subject: [PATCH 17/24] Remove management client tests from connection configuration tests --- .../ConnectionConfigurationTests.cs | 147 ------------------ 1 file changed, 147 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs index 9541b37f2..9139a9db6 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs @@ -3,10 +3,6 @@ namespace NServiceBus.Transport.RabbitMQ.Tests.ConnectionString { using System; - using System.Collections.Generic; - using System.Linq; - using System.Net.Http; - using System.Text; using NUnit.Framework; using RabbitMQ; @@ -199,148 +195,5 @@ public void Should_set_default_use_tls() { Assert.That(brokerDefaults.UseTls, Is.EqualTo(false)); } - - - - [Test] - public void Should_throw_on_invalid_legacy_management_host() - { - // Create transport in legacy mode - var transport = new RabbitMQTransport - { - TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), - LegacyApiConnectionString = BrokerConnectionString, - LegacyManagementApiUrl = "http://copa:abc123xyz@wronghost:12345" - }; - - _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); - } - - // Todo: Need to fix tests checking the legacyManagementApiUrl - // Scenario 1: legacyManagementApiUrl is null or empty - // Scenario 2: legacyManagementApiUrl host is invalid - // Scenario 3: legacyManagementApiUrl port is invalid - // Scenario 4: legacyManagementApiUrl credentials are invalid - - //[Test] - //public void Should_throw_on_invalid_legacy_management_credentials() - //{ - // // Create transport in legacy mode - // var transport = new RabbitMQTransport - // { - // TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), - // LegacyApiConnectionString = BrokerConnectionString, - // LegacyManagementApiUrl = "http://copa:abc123xyz@localhost:12345" - // }; - - // var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); - - // Assert.That(exception!.Message, Does.Contain("Could not access RabbitMQ Management API")); - //} - - //[Test] - //public void Should_throw_on_invalid_legacy_broker_connection_string() - //{ - // var invalidBrokerConnection = new FakeConnectionConfiguration(host: "localhost", port: "5672", virtualHost: "/", userName: "Copa", password: "guest", useTls: "false"); - - // // Create transport in legacy mode - // var transport = new RabbitMQTransport - // { - // TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), - // LegacyApiConnectionString = invalidBrokerConnection.ToConnectionString(), - // //LegacyManagementApiUrl = "http://copa:guest@wronghost:12345" - // }; - - // var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])) - // ?? throw new ArgumentNullException("exception"); - - // Assert.That(exception.Message, Does.Contain("None of the specified endpoints were reachable")); - //} - - [Test] - public void Should_not_throw_when_DoNotUseManagementClient_is_enabled_and_management_connection_is_invalid() - { - var invalidManagementConnection = new FakeConnectionConfiguration(host: "Copa"); - - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString) - { - DoNotUseManagementClient = true - }; - - Assert.DoesNotThrowAsync(async () => await transport.Initialize(HostSettings, [], [])); - } - - public class FakeConnectionConfiguration - { - internal string Host { get; set; } - - internal string? Port { get; set; } - - internal string? VirtualHost { get; set; } - - internal string? UserName { get; set; } - - internal string? Password { get; set; } - - internal string? UseTls { get; set; } - - internal FakeConnectionConfiguration( - string host, - string? port = null, - string? virtualHost = null, - string? userName = null, - string? password = null, - string? useTls = null) - { - Host = host; - Port = port; - VirtualHost = virtualHost; - UserName = userName; - Password = password; - UseTls = useTls; - } - - internal FakeConnectionConfiguration(string connectionString) - { - var parameters = connectionString.Split(';').Select(param => param.Split('=')).ToDictionary(parts => parts[0].ToLower(), parts => parts[1]); - - Host = parameters["host"]; - Port = GetParameterValue(parameters, "port"); - VirtualHost = GetParameterValue(parameters, "virtualhost"); - UserName = GetParameterValue(parameters, "username"); - Password = GetParameterValue(parameters, "password"); - UseTls = GetParameterValue(parameters, "usetls"); - } - - static string? GetParameterValue(Dictionary parameters, string key) => parameters.TryGetValue(key, out var value) ? value : null; - - internal string ToConnectionString() - { - var sb = new StringBuilder(); - _ = sb.Append($"{nameof(Host)}={Host}"); - - if (!string.IsNullOrEmpty(VirtualHost)) - { - _ = sb.Append($";{nameof(VirtualHost)}={VirtualHost}"); - } - if (!string.IsNullOrEmpty(Port)) - { - _ = sb.Append($";{nameof(Port)}={Port}"); - } - if (!string.IsNullOrEmpty(UserName)) - { - _ = sb.Append($";{nameof(UserName)}={UserName}"); - } - if (!string.IsNullOrEmpty(Password)) - { - _ = sb.Append($";{nameof(Password)}={Password}"); - } - if (!string.IsNullOrEmpty(UseTls)) - { - _ = sb.Append($";{nameof(UseTls)}={UseTls}"); - } - return sb.ToString(); - } - } } } \ No newline at end of file From ddc9a8bc5f45440a0f4691f265e0e52db30161bb Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Fri, 17 Jan 2025 16:47:30 -0800 Subject: [PATCH 18/24] Create transport initialize tests for the management URL --- .../InitializeTransportTests.cs | 113 ++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 src/NServiceBus.Transport.RabbitMQ.Tests/InitializeTransportTests.cs diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/InitializeTransportTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/InitializeTransportTests.cs new file mode 100644 index 000000000..630931f88 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/InitializeTransportTests.cs @@ -0,0 +1,113 @@ +namespace NServiceBus.Transport.RabbitMQ.Tests +{ + using System; + using System.Net.Http; + using NServiceBus.Transport.RabbitMQ.Tests.ConnectionString; + using NUnit.Framework; + + [TestFixture] + class InitializeTransportTests + { + static string BrokerConnectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; + static HostSettings HostSettings { get; } = new(nameof(ConnectionConfigurationTests), nameof(ConnectionConfigurationTests), null, null, false); + + [Test] + public void Should_not_throw_with_valid_legacy_management_url() + { + var broker = ConnectionConfiguration.Create(BrokerConnectionString); + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = $"{(broker.UseTls ? "https" : "http")}://{broker.UserName}:{broker.Password}@{broker.Host}:{(broker.UseTls ? "15671" : "15672")}", + DoNotUseManagementClient = false + }; + + Assert.DoesNotThrowAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + [Test] + public void Should_throw_on_invalid_management_scheme() + { + var broker = ConnectionConfiguration.Create(BrokerConnectionString); + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = $"{(broker.UseTls ? "http" : "https")}://guest:guest@{broker.Host}:{(broker.UseTls ? "15671" : "15672")}", + DoNotUseManagementClient = false + }; + + _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + [Test] + public void Should_throw_on_invalid_legacy_management_credentials() + { + var broker = ConnectionConfiguration.Create(BrokerConnectionString); + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = $"{(broker.UseTls ? "https" : "http")}://copa:abc123xyz@{broker.Host}:{(broker.UseTls ? "15671" : "15672")}", + DoNotUseManagementClient = false + }; + + _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + [Test] + public void Should_throw_on_invalid_management_host() + { + var broker = ConnectionConfiguration.Create(BrokerConnectionString); + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = $"{(broker.UseTls ? "https" : "http")}://guest:guest@wronghost:{(broker.UseTls ? "15671" : "15672")}", + DoNotUseManagementClient = false + }; + + _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + [Test] + public void Should_throw_on_invalid_management_port() + { + var broker = ConnectionConfiguration.Create(BrokerConnectionString); + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = $"{(broker.UseTls ? "https" : "http")}://guest:guest@{broker.Host}:12345", + DoNotUseManagementClient = false + }; + + _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + string CreateLegacyManagementApiUrl( + bool useTls, + string host, + string userName, + string password, + string port) + { + var scheme = useTls ? "https" : "http"; + return $"{scheme}://{userName}:{password}@{host}:{port}"; + } + + static RabbitMQTransport CreateTransport(string managementApiUrl) => new() + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = managementApiUrl, + DoNotUseManagementClient = false + }; + } +} From 711fc04f899219dc6312a11e686b4a42f45f041e Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Fri, 17 Jan 2025 16:48:28 -0800 Subject: [PATCH 19/24] Update the legacy public API --- .../RabbitMQTransportSettingsExtensions.cs | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs index 26afe4721..a25946437 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs @@ -102,12 +102,12 @@ public static TransportExtensions ConnectionString(this Trans [PreObsolete("https://github.com/Particular/NServiceBus/issues/6811", Message = "The configuration has been moved to RabbitMQTransport class.", Note = "Should not be converted to an ObsoleteEx until API mismatch described in issue is resolved.")] - public static TransportExtensions ManagementConnectionString(this TransportExtensions transportExtensions, string connectionString) + public static TransportExtensions ManagementApiUrl(this TransportExtensions transportExtensions, string connectionUrl) { ArgumentNullException.ThrowIfNull(transportExtensions); - ArgumentException.ThrowIfNullOrWhiteSpace(connectionString); + ArgumentException.ThrowIfNullOrWhiteSpace(connectionUrl); - transportExtensions.Transport.LegacyManagementApiUrl = connectionString; + transportExtensions.Transport.LegacyManagementApiUrl = connectionUrl; return transportExtensions; } @@ -117,12 +117,12 @@ public static TransportExtensions ManagementConnectionString( [PreObsolete("https://github.com/Particular/NServiceBus/issues/6811", Message = "The configuration has been moved to RabbitMQTransport class.", Note = "Should not be converted to an ObsoleteEx until API mismatch described in issue is resolved.")] - public static TransportExtensions ManagementConnectionString(this TransportExtensions transportExtensions, Func getConnectionString) + public static TransportExtensions ManagementApiUrl(this TransportExtensions transportExtensions, Func getConnectionUrl) { ArgumentNullException.ThrowIfNull(transportExtensions); - ArgumentNullException.ThrowIfNull(getConnectionString); + ArgumentNullException.ThrowIfNull(getConnectionUrl); - transportExtensions.Transport.LegacyManagementApiUrl = getConnectionString(); + transportExtensions.Transport.LegacyManagementApiUrl = getConnectionUrl(); return transportExtensions; } @@ -368,5 +368,21 @@ public static TransportExtensions UseExternalAuthMechanism(th transportExtensions.Transport.UseExternalAuthMechanism = true; return transportExtensions; } + + /// + /// Specifies that an external authentication mechanism should be used for client authentication. + /// + /// + [PreObsolete("https://github.com/Particular/NServiceBus/issues/6811", + ReplacementTypeOrMember = "RabbitMQTransport.UseExternalAuthMechanism", + Message = "The configuration has been moved to RabbitMQTransport class.", + Note = "Should not be converted to an ObsoleteEx until API mismatch described in issue is resolved.")] + public static TransportExtensions DoNotUseManagementClient(this TransportExtensions transportExtensions) + { + ArgumentNullException.ThrowIfNull(transportExtensions); + + transportExtensions.Transport.DoNotUseManagementClient = true; + return transportExtensions; + } } } From 3cdb4d4cf8c90e4f277dd77f74b2f9112a8d4478 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Fri, 17 Jan 2025 16:48:41 -0800 Subject: [PATCH 20/24] Fix approval files --- .../APIApprovals.Approve.approved.txt | 5 +- .../APIApprovals.Approve.approved_old.txt | 81 +++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) create mode 100644 src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved_old.txt diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index 623f8cc4f..69248209c 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -43,8 +43,9 @@ namespace NServiceBus public static NServiceBus.TransportExtensions CustomMessageIdStrategy(this NServiceBus.TransportExtensions transportExtensions, System.Func customIdStrategy) { } public static NServiceBus.TransportExtensions DisableDurableExchangesAndQueues(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions DisableRemoteCertificateValidation(this NServiceBus.TransportExtensions transportExtensions) { } - public static NServiceBus.TransportExtensions ManagementConnectionString(this NServiceBus.TransportExtensions transportExtensions, System.Func getConnectionString) { } - public static NServiceBus.TransportExtensions ManagementConnectionString(this NServiceBus.TransportExtensions transportExtensions, string connectionString) { } + public static NServiceBus.TransportExtensions DoNotUseManagementClient(this NServiceBus.TransportExtensions transportExtensions) { } + public static NServiceBus.TransportExtensions ManagementApiUrl(this NServiceBus.TransportExtensions transportExtensions, System.Func getConnectionUrl) { } + public static NServiceBus.TransportExtensions ManagementApiUrl(this NServiceBus.TransportExtensions transportExtensions, string connectionUrl) { } public static NServiceBus.TransportExtensions PrefetchCount(this NServiceBus.TransportExtensions transportExtensions, ushort prefetchCount) { } public static NServiceBus.TransportExtensions PrefetchMultiplier(this NServiceBus.TransportExtensions transportExtensions, int prefetchMultiplier) { } public static NServiceBus.TransportExtensions SetClientCertificate(this NServiceBus.TransportExtensions transportExtensions, System.Security.Cryptography.X509Certificates.X509Certificate2 clientCertificate) { } diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved_old.txt b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved_old.txt new file mode 100644 index 000000000..623f8cc4f --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved_old.txt @@ -0,0 +1,81 @@ +[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Transport.RabbitMQ.AcceptanceTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] +[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Transport.RabbitMQ.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] +namespace NServiceBus +{ + public static class NonPersistentDeliveryModeExtensions + { + public static void UseNonPersistentDeliveryMode(this NServiceBus.PublishOptions options) { } + public static void UseNonPersistentDeliveryMode(this NServiceBus.ReplyOptions options) { } + public static void UseNonPersistentDeliveryMode(this NServiceBus.SendOptions options) { } + } + public delegate long PrefetchCountCalculation(int maximumConcurrency); + public enum QueueType + { + Classic = 0, + Quorum = 1, + } + public class RabbitMQTransport : NServiceBus.Transport.TransportDefinition + { + public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString) { } + public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) { } + public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; } + public bool DoNotUseManagementClient { get; set; } + public System.TimeSpan HeartbeatInterval { get; set; } + public string ManagementApiUrl { get; set; } + public System.Func MessageIdStrategy { get; set; } + public System.TimeSpan NetworkRecoveryInterval { get; set; } + public System.Action OutgoingNativeMessageCustomization { get; set; } + public NServiceBus.PrefetchCountCalculation PrefetchCountCalculation { get; set; } + public System.TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker { get; set; } + public bool UseExternalAuthMechanism { get; set; } + public bool ValidateRemoteCertificate { get; set; } + public void AddClusterNode(string hostName, bool useTls) { } + public void AddClusterNode(string hostName, int port, bool useTls) { } + public override System.Collections.Generic.IReadOnlyCollection GetSupportedTransactionModes() { } + public override System.Threading.Tasks.Task Initialize(NServiceBus.Transport.HostSettings hostSettings, NServiceBus.Transport.ReceiveSettings[] receivers, string[] sendingAddresses, System.Threading.CancellationToken cancellationToken = default) { } + } + public static class RabbitMQTransportSettingsExtensions + { + public static NServiceBus.TransportExtensions AddClusterNode(this NServiceBus.TransportExtensions transportExtensions, string hostName, bool useTls) { } + public static NServiceBus.TransportExtensions AddClusterNode(this NServiceBus.TransportExtensions transportExtensions, string hostName, int port, bool useTls) { } + public static NServiceBus.TransportExtensions ConnectionString(this NServiceBus.TransportExtensions transportExtensions, System.Func getConnectionString) { } + public static NServiceBus.TransportExtensions ConnectionString(this NServiceBus.TransportExtensions transportExtensions, string connectionString) { } + public static NServiceBus.TransportExtensions CustomMessageIdStrategy(this NServiceBus.TransportExtensions transportExtensions, System.Func customIdStrategy) { } + public static NServiceBus.TransportExtensions DisableDurableExchangesAndQueues(this NServiceBus.TransportExtensions transportExtensions) { } + public static NServiceBus.TransportExtensions DisableRemoteCertificateValidation(this NServiceBus.TransportExtensions transportExtensions) { } + public static NServiceBus.TransportExtensions ManagementConnectionString(this NServiceBus.TransportExtensions transportExtensions, System.Func getConnectionString) { } + public static NServiceBus.TransportExtensions ManagementConnectionString(this NServiceBus.TransportExtensions transportExtensions, string connectionString) { } + public static NServiceBus.TransportExtensions PrefetchCount(this NServiceBus.TransportExtensions transportExtensions, ushort prefetchCount) { } + public static NServiceBus.TransportExtensions PrefetchMultiplier(this NServiceBus.TransportExtensions transportExtensions, int prefetchMultiplier) { } + public static NServiceBus.TransportExtensions SetClientCertificate(this NServiceBus.TransportExtensions transportExtensions, System.Security.Cryptography.X509Certificates.X509Certificate2 clientCertificate) { } + public static NServiceBus.TransportExtensions SetClientCertificate(this NServiceBus.TransportExtensions transportExtensions, string path, string password) { } + public static NServiceBus.TransportExtensions SetHeartbeatInterval(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan heartbeatInterval) { } + public static NServiceBus.TransportExtensions SetNetworkRecoveryInterval(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan networkRecoveryInterval) { } + public static NServiceBus.TransportExtensions TimeToWaitBeforeTriggeringCircuitBreaker(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan waitTime) { } + public static NServiceBus.TransportExtensions UseConventionalRoutingTopology(this NServiceBus.TransportExtensions transportExtensions, NServiceBus.QueueType queueType) { } + public static NServiceBus.TransportExtensions UseCustomRoutingTopology(this NServiceBus.TransportExtensions transportExtensions, System.Func topologyFactory) { } + public static NServiceBus.TransportExtensions UseDirectRoutingTopology(this NServiceBus.TransportExtensions transportExtensions, NServiceBus.QueueType queueType, System.Func routingKeyConvention = null, System.Func exchangeNameConvention = null) { } + public static NServiceBus.TransportExtensions UseExternalAuthMechanism(this NServiceBus.TransportExtensions transportExtensions) { } + public static NServiceBus.TransportExtensions UseTransport(this NServiceBus.EndpointConfiguration config) + where T : NServiceBus.RabbitMQTransport { } + } + public class RoutingTopology + { + public static NServiceBus.RoutingTopology Conventional(NServiceBus.QueueType queueType, bool useDurableEntities = true) { } + public static NServiceBus.RoutingTopology Custom(NServiceBus.Transport.RabbitMQ.IRoutingTopology routingTopology) { } + public static NServiceBus.RoutingTopology Direct(NServiceBus.QueueType queueType, bool useDurableEntities = true, System.Func routingKeyConvention = null, System.Func exchangeNameConvention = null) { } + } +} +namespace NServiceBus.Transport.RabbitMQ +{ + public interface IRoutingTopology + { + System.Threading.Tasks.ValueTask BindToDelayInfrastructure(RabbitMQ.Client.IChannel channel, string address, string deliveryExchange, string routingKey, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.ValueTask Initialize(RabbitMQ.Client.IChannel channel, System.Collections.Generic.IEnumerable receivingAddresses, System.Collections.Generic.IEnumerable sendingAddresses, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.ValueTask Publish(RabbitMQ.Client.IChannel channel, System.Type type, NServiceBus.Transport.OutgoingMessage message, RabbitMQ.Client.BasicProperties properties, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.ValueTask RawSendInCaseOfFailure(RabbitMQ.Client.IChannel channel, string address, System.ReadOnlyMemory body, RabbitMQ.Client.BasicProperties properties, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.ValueTask Send(RabbitMQ.Client.IChannel channel, string address, NServiceBus.Transport.OutgoingMessage message, RabbitMQ.Client.BasicProperties properties, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.ValueTask SetupSubscription(RabbitMQ.Client.IChannel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName, System.Threading.CancellationToken cancellationToken = default); + System.Threading.Tasks.ValueTask TeardownSubscription(RabbitMQ.Client.IChannel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName, System.Threading.CancellationToken cancellationToken = default); + } +} \ No newline at end of file From 05e51538e4df964d183d43a3d1d1d22a649e4f51 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Fri, 17 Jan 2025 16:51:47 -0800 Subject: [PATCH 21/24] remove old approval file --- .../APIApprovals.Approve.approved_old.txt | 81 ------------------- 1 file changed, 81 deletions(-) delete mode 100644 src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved_old.txt diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved_old.txt b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved_old.txt deleted file mode 100644 index 623f8cc4f..000000000 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved_old.txt +++ /dev/null @@ -1,81 +0,0 @@ -[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Transport.RabbitMQ.AcceptanceTests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] -[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"NServiceBus.Transport.RabbitMQ.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001007f16e21368ff041183fab592d9e8ed37e7be355e93323147a1d29983d6e591b04282e4da0c9e18bd901e112c0033925eb7d7872c2f1706655891c5c9d57297994f707d16ee9a8f40d978f064ee1ffc73c0db3f4712691b23bf596f75130f4ec978cf78757ec034625a5f27e6bb50c618931ea49f6f628fd74271c32959efb1c5")] -namespace NServiceBus -{ - public static class NonPersistentDeliveryModeExtensions - { - public static void UseNonPersistentDeliveryMode(this NServiceBus.PublishOptions options) { } - public static void UseNonPersistentDeliveryMode(this NServiceBus.ReplyOptions options) { } - public static void UseNonPersistentDeliveryMode(this NServiceBus.SendOptions options) { } - } - public delegate long PrefetchCountCalculation(int maximumConcurrency); - public enum QueueType - { - Classic = 0, - Quorum = 1, - } - public class RabbitMQTransport : NServiceBus.Transport.TransportDefinition - { - public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString) { } - public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) { } - public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; } - public bool DoNotUseManagementClient { get; set; } - public System.TimeSpan HeartbeatInterval { get; set; } - public string ManagementApiUrl { get; set; } - public System.Func MessageIdStrategy { get; set; } - public System.TimeSpan NetworkRecoveryInterval { get; set; } - public System.Action OutgoingNativeMessageCustomization { get; set; } - public NServiceBus.PrefetchCountCalculation PrefetchCountCalculation { get; set; } - public System.TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker { get; set; } - public bool UseExternalAuthMechanism { get; set; } - public bool ValidateRemoteCertificate { get; set; } - public void AddClusterNode(string hostName, bool useTls) { } - public void AddClusterNode(string hostName, int port, bool useTls) { } - public override System.Collections.Generic.IReadOnlyCollection GetSupportedTransactionModes() { } - public override System.Threading.Tasks.Task Initialize(NServiceBus.Transport.HostSettings hostSettings, NServiceBus.Transport.ReceiveSettings[] receivers, string[] sendingAddresses, System.Threading.CancellationToken cancellationToken = default) { } - } - public static class RabbitMQTransportSettingsExtensions - { - public static NServiceBus.TransportExtensions AddClusterNode(this NServiceBus.TransportExtensions transportExtensions, string hostName, bool useTls) { } - public static NServiceBus.TransportExtensions AddClusterNode(this NServiceBus.TransportExtensions transportExtensions, string hostName, int port, bool useTls) { } - public static NServiceBus.TransportExtensions ConnectionString(this NServiceBus.TransportExtensions transportExtensions, System.Func getConnectionString) { } - public static NServiceBus.TransportExtensions ConnectionString(this NServiceBus.TransportExtensions transportExtensions, string connectionString) { } - public static NServiceBus.TransportExtensions CustomMessageIdStrategy(this NServiceBus.TransportExtensions transportExtensions, System.Func customIdStrategy) { } - public static NServiceBus.TransportExtensions DisableDurableExchangesAndQueues(this NServiceBus.TransportExtensions transportExtensions) { } - public static NServiceBus.TransportExtensions DisableRemoteCertificateValidation(this NServiceBus.TransportExtensions transportExtensions) { } - public static NServiceBus.TransportExtensions ManagementConnectionString(this NServiceBus.TransportExtensions transportExtensions, System.Func getConnectionString) { } - public static NServiceBus.TransportExtensions ManagementConnectionString(this NServiceBus.TransportExtensions transportExtensions, string connectionString) { } - public static NServiceBus.TransportExtensions PrefetchCount(this NServiceBus.TransportExtensions transportExtensions, ushort prefetchCount) { } - public static NServiceBus.TransportExtensions PrefetchMultiplier(this NServiceBus.TransportExtensions transportExtensions, int prefetchMultiplier) { } - public static NServiceBus.TransportExtensions SetClientCertificate(this NServiceBus.TransportExtensions transportExtensions, System.Security.Cryptography.X509Certificates.X509Certificate2 clientCertificate) { } - public static NServiceBus.TransportExtensions SetClientCertificate(this NServiceBus.TransportExtensions transportExtensions, string path, string password) { } - public static NServiceBus.TransportExtensions SetHeartbeatInterval(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan heartbeatInterval) { } - public static NServiceBus.TransportExtensions SetNetworkRecoveryInterval(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan networkRecoveryInterval) { } - public static NServiceBus.TransportExtensions TimeToWaitBeforeTriggeringCircuitBreaker(this NServiceBus.TransportExtensions transportExtensions, System.TimeSpan waitTime) { } - public static NServiceBus.TransportExtensions UseConventionalRoutingTopology(this NServiceBus.TransportExtensions transportExtensions, NServiceBus.QueueType queueType) { } - public static NServiceBus.TransportExtensions UseCustomRoutingTopology(this NServiceBus.TransportExtensions transportExtensions, System.Func topologyFactory) { } - public static NServiceBus.TransportExtensions UseDirectRoutingTopology(this NServiceBus.TransportExtensions transportExtensions, NServiceBus.QueueType queueType, System.Func routingKeyConvention = null, System.Func exchangeNameConvention = null) { } - public static NServiceBus.TransportExtensions UseExternalAuthMechanism(this NServiceBus.TransportExtensions transportExtensions) { } - public static NServiceBus.TransportExtensions UseTransport(this NServiceBus.EndpointConfiguration config) - where T : NServiceBus.RabbitMQTransport { } - } - public class RoutingTopology - { - public static NServiceBus.RoutingTopology Conventional(NServiceBus.QueueType queueType, bool useDurableEntities = true) { } - public static NServiceBus.RoutingTopology Custom(NServiceBus.Transport.RabbitMQ.IRoutingTopology routingTopology) { } - public static NServiceBus.RoutingTopology Direct(NServiceBus.QueueType queueType, bool useDurableEntities = true, System.Func routingKeyConvention = null, System.Func exchangeNameConvention = null) { } - } -} -namespace NServiceBus.Transport.RabbitMQ -{ - public interface IRoutingTopology - { - System.Threading.Tasks.ValueTask BindToDelayInfrastructure(RabbitMQ.Client.IChannel channel, string address, string deliveryExchange, string routingKey, System.Threading.CancellationToken cancellationToken = default); - System.Threading.Tasks.ValueTask Initialize(RabbitMQ.Client.IChannel channel, System.Collections.Generic.IEnumerable receivingAddresses, System.Collections.Generic.IEnumerable sendingAddresses, System.Threading.CancellationToken cancellationToken = default); - System.Threading.Tasks.ValueTask Publish(RabbitMQ.Client.IChannel channel, System.Type type, NServiceBus.Transport.OutgoingMessage message, RabbitMQ.Client.BasicProperties properties, System.Threading.CancellationToken cancellationToken = default); - System.Threading.Tasks.ValueTask RawSendInCaseOfFailure(RabbitMQ.Client.IChannel channel, string address, System.ReadOnlyMemory body, RabbitMQ.Client.BasicProperties properties, System.Threading.CancellationToken cancellationToken = default); - System.Threading.Tasks.ValueTask Send(RabbitMQ.Client.IChannel channel, string address, NServiceBus.Transport.OutgoingMessage message, RabbitMQ.Client.BasicProperties properties, System.Threading.CancellationToken cancellationToken = default); - System.Threading.Tasks.ValueTask SetupSubscription(RabbitMQ.Client.IChannel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName, System.Threading.CancellationToken cancellationToken = default); - System.Threading.Tasks.ValueTask TeardownSubscription(RabbitMQ.Client.IChannel channel, NServiceBus.Unicast.Messages.MessageMetadata type, string subscriberName, System.Threading.CancellationToken cancellationToken = default); - } -} \ No newline at end of file From af3ad95906e19f618db64c9f97eb12d6421e6243 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Mon, 20 Jan 2025 13:18:07 -0800 Subject: [PATCH 22/24] Fix queue argument property name for x-delivery-limit --- src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs | 2 +- .../Administration/ManagementClient/Models/QueueArguments.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs index 25e9d486d..fa0e7d0a9 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs @@ -147,7 +147,7 @@ static async Task CreateQuorumQueueWithDeliveryLimit(string queueName, int deliv { using var connection = await connectionFactory.CreateConnection($"{queueName} connection").ConfigureAwait(false); using var channel = await connection.CreateChannelAsync().ConfigureAwait(false); - var arguments = new Dictionary { { "x-queue-type", "quorum" }, { "delivery_limit", deliveryLimit } }; + var arguments = new Dictionary { { "x-queue-type", "quorum" }, { "x-delivery-limit", deliveryLimit } }; _ = await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments); } diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs index 17edf049a..4d5b9a7e5 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs @@ -12,7 +12,7 @@ class QueueArguments [JsonConverter(typeof(QueueTypeConverter))] public QueueType? QueueType { get; set; } - [JsonPropertyName("delivery_limit")] + [JsonPropertyName("x-delivery-limit")] [JsonConverter(typeof(DeliveryLimitConverter))] public int? DeliveryLimit { get; set; } From c4d7d023bb05b53916cf10465050a588149f9f40 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Mon, 20 Jan 2025 13:20:39 -0800 Subject: [PATCH 23/24] Remove check on legacy management api url --- src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs index 01e536926..eb66a3ed4 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs @@ -311,14 +311,6 @@ void ValidateConnectionString() { throw new Exception("A connection string must be configured with 'EndpointConfiguration.UseTransport().ConnectionString()` method."); } - - // Todo: Not sure if we should throw here. Even if the LegacyManagementApiUrl is null or empty the default connection - // values could still be tried. Only if the default values fail should an error be thrown that a connection could not - // be made and the ManagementApiUrl should be configured. - if (!DoNotUseManagementClient && string.IsNullOrEmpty(LegacyManagementApiUrl)) - { - throw new Exception("A management API connection string must be configured with 'EndpointConfiguration.UseTransport().ManagementApiUrl()` method."); - } } } } \ No newline at end of file From 85a5a5bf8712417b3c4ae730cb03a419d56e0b91 Mon Sep 17 00:00:00 2001 From: Travis Nickels Date: Mon, 20 Jan 2025 13:20:50 -0800 Subject: [PATCH 24/24] Refactoring --- .../BrokerVerifierTests.cs | 7 +++---- src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs | 1 + .../InitializeTransportTests.cs | 2 +- .../Administration/ManagementClient/ManagementClient.cs | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs index fa0e7d0a9..c263eac49 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs @@ -82,14 +82,15 @@ public async Task ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy() public async Task ValidateDeliveryLimit_Should_Throw_When_Queue_Argument_Has_Delivery_Limit_Not_Set_To_Unlimited() { var queueName = nameof(ValidateDeliveryLimit_Should_Throw_When_Queue_Argument_Has_Delivery_Limit_Not_Set_To_Unlimited); - await CreateQuorumQueueWithDeliveryLimit(queueName, 5); + var delivery_limit = 5; + await CreateQuorumQueueWithDeliveryLimit(queueName, delivery_limit); var managementClient = new ManagementClient(connectionConfiguration); var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); await brokerVerifier.Initialize(); var exception = Assert.ThrowsAsync(async () => await brokerVerifier.ValidateDeliveryLimit(queueName)); - Assert.That(exception.Message, Does.Contain($"The delivery limit for {queueName} is set to 5 by a queue argument. " + + Assert.That(exception.Message, Does.Contain($"The delivery limit for {queueName} is set to {delivery_limit} by a queue argument. " + $"This can interfere with the transport's retry implementation")); } @@ -153,8 +154,6 @@ static async Task CreateQuorumQueueWithDeliveryLimit(string queueName, int deliv } } - - class FakeLogger : ILog { public List Messages { get; } = []; diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs index 816f4b5fc..85faaa8f3 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs @@ -89,6 +89,7 @@ public static HttpResponseMessage GetOverview( { return response; } + var overview = new Overview { ClusterName = "rabbit@my - rabbit", diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/InitializeTransportTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/InitializeTransportTests.cs index 630931f88..a73551c15 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/InitializeTransportTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/InitializeTransportTests.cs @@ -8,7 +8,7 @@ [TestFixture] class InitializeTransportTests { - static string BrokerConnectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; + static readonly string BrokerConnectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; static HostSettings HostSettings { get; } = new(nameof(ConnectionConfigurationTests), nameof(ConnectionConfigurationTests), null, null, false); [Test] diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs index 6125fc481..17e7c44e6 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs @@ -13,7 +13,7 @@ namespace NServiceBus.Transport.RabbitMQ.ManagementClient; class ManagementClient { - HttpClient httpClient; + readonly HttpClient httpClient; readonly string virtualHost; readonly string escapedVirtualHost;