diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt index a690d0f59..69248209c 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt @@ -18,11 +18,10 @@ namespace NServiceBus { public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString) { } public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) { } - public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, string managementConnectionString) { } - public RabbitMQTransport(NServiceBus.RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery, string managementConnectionString) { } public System.Security.Cryptography.X509Certificates.X509Certificate2 ClientCertificate { get; set; } public bool DoNotUseManagementClient { get; set; } public System.TimeSpan HeartbeatInterval { get; set; } + public string ManagementApiUrl { get; set; } public System.Func MessageIdStrategy { get; set; } public System.TimeSpan NetworkRecoveryInterval { get; set; } public System.Action OutgoingNativeMessageCustomization { get; set; } @@ -44,8 +43,9 @@ namespace NServiceBus public static NServiceBus.TransportExtensions CustomMessageIdStrategy(this NServiceBus.TransportExtensions transportExtensions, System.Func customIdStrategy) { } public static NServiceBus.TransportExtensions DisableDurableExchangesAndQueues(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions DisableRemoteCertificateValidation(this NServiceBus.TransportExtensions transportExtensions) { } - public static NServiceBus.TransportExtensions ManagementConnectionString(this NServiceBus.TransportExtensions transportExtensions, System.Func getConnectionString) { } - public static NServiceBus.TransportExtensions ManagementConnectionString(this NServiceBus.TransportExtensions transportExtensions, string connectionString) { } + public static NServiceBus.TransportExtensions DoNotUseManagementClient(this NServiceBus.TransportExtensions transportExtensions) { } + public static NServiceBus.TransportExtensions ManagementApiUrl(this NServiceBus.TransportExtensions transportExtensions, System.Func getConnectionUrl) { } + public static NServiceBus.TransportExtensions ManagementApiUrl(this NServiceBus.TransportExtensions transportExtensions, string connectionUrl) { } public static NServiceBus.TransportExtensions PrefetchCount(this NServiceBus.TransportExtensions transportExtensions, ushort prefetchCount) { } public static NServiceBus.TransportExtensions PrefetchMultiplier(this NServiceBus.TransportExtensions transportExtensions, int prefetchMultiplier) { } public static NServiceBus.TransportExtensions SetClientCertificate(this NServiceBus.TransportExtensions transportExtensions, System.Security.Cryptography.X509Certificates.X509Certificate2 clientCertificate) { } diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs new file mode 100644 index 000000000..c263eac49 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/BrokerVerifierTests.cs @@ -0,0 +1,188 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.Tests +{ + using System; + using NUnit.Framework; + using ManagementClient; + using NServiceBus.Logging; + using System.Collections.Generic; + using System.Linq; + using ConnectionFactory = ConnectionFactory; + using System.Threading.Tasks; + using System.Diagnostics; + using System.Net; + + [TestFixture] + class BrokerVerifierTests + { + static readonly string connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; + static readonly ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create(connectionString); + static readonly ConnectionFactory connectionFactory = new(typeof(ManagementClientTests).FullName, connectionConfiguration, null, false, false, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []); + + [Test] + public void Initialize_Should_Get_Response_When_Management_Client_Is_Available_And_Valid() + { + var managementClient = new ManagementClient(connectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); + + Assert.DoesNotThrowAsync(async () => await brokerVerifier.Initialize()); + } + + [Test] + public void Initialize_Should_Should_Warn_When_Management_Is_Disabled_With_Version_4_Or_Greater() + { + var managementClient = new ManagementClient(connectionConfiguration); + var fakeLogger = new FakeLogger(); + var brokerVerifier = new BrokerVerifier(connectionFactory, false, managementClient, fakeLogger); + + Assert.DoesNotThrowAsync(async () => await brokerVerifier.Initialize()); + + Assert.That(fakeLogger.Messages, Has.Exactly(1).Items); + Assert.That(fakeLogger.Messages.FirstOrDefault(), Does.Contain("Use of RabbitMQ Management API has been disabled.")); + } + + [Test] + public async Task ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy() + { + var queueName = nameof(ValidateDeliveryLimit_Should_Set_Delivery_Limit_Policy); + var policyName = $"nsb.{queueName}.delivery-limit"; + await CreateQuorumQueue(queueName); + var managementClient = new ManagementClient(connectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); + + await brokerVerifier.Initialize(); + await brokerVerifier.ValidateDeliveryLimit(queueName); + + // It can take some time for updated policies to be applied, so we need to wait. + // If this test is randomly failing, consider increasing the maxWaitTime + var maxWaitTime = TimeSpan.FromSeconds(30); + var pollingInterval = TimeSpan.FromSeconds(2); + var stopwatch = Stopwatch.StartNew(); + while (stopwatch.Elapsed < maxWaitTime) + { + var response = await managementClient.GetQueue(queueName); + if (response.StatusCode == HttpStatusCode.OK + && response.Value is not null + && response.Value.EffectivePolicyDefinition is not null + && response.Value.DeliveryLimit.Equals(-1) + && response.Value.AppliedPolicyName == policyName + && response.Value.EffectivePolicyDefinition.DeliveryLimit == -1) + { + // Policy applied successfully + return; + } + await Task.Delay(pollingInterval); + } + + Assert.Fail($"Policy '{policyName}' was not applied to queue '{queueName}' within {maxWaitTime.TotalSeconds} seconds."); + } + + [Test] + public async Task ValidateDeliveryLimit_Should_Throw_When_Queue_Argument_Has_Delivery_Limit_Not_Set_To_Unlimited() + { + var queueName = nameof(ValidateDeliveryLimit_Should_Throw_When_Queue_Argument_Has_Delivery_Limit_Not_Set_To_Unlimited); + var delivery_limit = 5; + await CreateQuorumQueueWithDeliveryLimit(queueName, delivery_limit); + var managementClient = new ManagementClient(connectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); + + await brokerVerifier.Initialize(); + + var exception = Assert.ThrowsAsync(async () => await brokerVerifier.ValidateDeliveryLimit(queueName)); + Assert.That(exception.Message, Does.Contain($"The delivery limit for {queueName} is set to {delivery_limit} by a queue argument. " + + $"This can interfere with the transport's retry implementation")); + } + + [Test] + public async Task ValidateDeliveryLimit_Should_Throw_When_Delivery_Limit_Cannot_Be_Validated() + { + var queueName = nameof(ValidateDeliveryLimit_Should_Throw_When_Delivery_Limit_Cannot_Be_Validated); + await CreateQuorumQueue(queueName); + var managementClient = new ManagementClient(connectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); + + await brokerVerifier.Initialize(); + + var exception = Assert.ThrowsAsync(async () => await brokerVerifier.ValidateDeliveryLimit("WrongQueue")); + Assert.That(exception.Message, Does.Contain($"Could not retrieve full queue details for WrongQueue")); + } + + [Test] + public async Task ValidateDeliveryLimit_Should_Throw_When_A_Policy_On_Queue_Has_Delivery_Limit_Not_Set_To_Unlimited() + { + // Arrange + var deliveryLimit = 15; + var queueName = nameof(ValidateDeliveryLimit_Should_Throw_When_A_Policy_On_Queue_Has_Delivery_Limit_Not_Set_To_Unlimited); + var managementClient = new ManagementClient(connectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, true, managementClient); + var policy = new Policy + { + Name = $"nsb.{queueName}.delivery-limit", + ApplyTo = PolicyTarget.QuorumQueues, + Definition = new PolicyDefinition { DeliveryLimit = deliveryLimit }, + Pattern = queueName, + Priority = 100 + }; + + // Act + await CreateQuorumQueue(queueName); + await brokerVerifier.Initialize(); + await managementClient.CreatePolicy(policy).ConfigureAwait(false); + var exception = Assert.ThrowsAsync(async () => await brokerVerifier.ValidateDeliveryLimit(queueName)); + + // Assert + Assert.That(exception.Message, Does.Contain($"The RabbitMQ policy {policy.Name} is setting delivery limit to {deliveryLimit} for {queueName}")); + } + + static async Task CreateQuorumQueue(string queueName) + { + using var connection = await connectionFactory.CreateConnection($"{queueName} connection").ConfigureAwait(false); + using var channel = await connection.CreateChannelAsync().ConfigureAwait(false); + var arguments = new Dictionary { { "x-queue-type", "quorum" } }; + + _ = await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments); + } + + static async Task CreateQuorumQueueWithDeliveryLimit(string queueName, int deliveryLimit) + { + using var connection = await connectionFactory.CreateConnection($"{queueName} connection").ConfigureAwait(false); + using var channel = await connection.CreateChannelAsync().ConfigureAwait(false); + var arguments = new Dictionary { { "x-queue-type", "quorum" }, { "x-delivery-limit", deliveryLimit } }; + + _ = await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments); + } + } + + class FakeLogger : ILog + { + public List Messages { get; } = []; + + public bool IsDebugEnabled => throw new NotImplementedException(); + + public bool IsInfoEnabled => throw new NotImplementedException(); + + public bool IsWarnEnabled => throw new NotImplementedException(); + + public bool IsErrorEnabled => throw new NotImplementedException(); + + public bool IsFatalEnabled => throw new NotImplementedException(); + + public void Debug(string message, Exception exception) => throw new NotImplementedException(); + public void DebugFormat(string format, params object[] args) => throw new NotImplementedException(); + public void Info(string message, Exception exception) => throw new NotImplementedException(); + public void InfoFormat(string format, params object[] args) => throw new NotImplementedException(); + public void Warn(string message, Exception exception) => Messages.Add(message); + public void WarnFormat(string format, params object[] args) => Messages.Add(format); + public void Error(string message, Exception exception) => throw new NotImplementedException(); + public void ErrorFormat(string format, params object[] args) => throw new NotImplementedException(); + public void Fatal(string message, Exception exception) => throw new NotImplementedException(); + public void FatalFormat(string format, params object[] args) => throw new NotImplementedException(); + public void Debug(string message) => throw new NotImplementedException(); + public void Info(string message) => throw new NotImplementedException(); + public void Error(string message) => Messages.Add(message); + public void Fatal(string message) => throw new NotImplementedException(); + public void Warn(string message) => Messages.Add(message); + } + +} diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs index c3af26cdd..9139a9db6 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/Connection/ConnectionConfigurationTests.cs @@ -3,12 +3,6 @@ namespace NServiceBus.Transport.RabbitMQ.Tests.ConnectionString { using System; - using System.Collections.Generic; - using System.Linq; - using System.Net.Http; - using System.Text; - using System.Threading.Tasks; - using global::RabbitMQ.Client.Exceptions; using NUnit.Framework; using RabbitMQ; @@ -17,19 +11,10 @@ class ConnectionConfigurationTests { const string FakeConnectionString = "virtualHost=Copa;username=Copa;host=192.168.1.1:1234;password=abc_xyz;port=12345;useTls=true"; static string BrokerConnectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; - static string ManagementConnectionString => CreateManagementConnectionString(BrokerConnectionString); static HostSettings HostSettings { get; } = new(nameof(ConnectionConfigurationTests), nameof(ConnectionConfigurationTests), null, null, false); static readonly ConnectionConfiguration brokerDefaults = ConnectionConfiguration.Create("host=localhost"); - static readonly ConnectionConfiguration managementDefaults = ConnectionConfiguration.Create("host=localhost", isManagementConnection: true); - - static string CreateManagementConnectionString(string connectionString) - { - var parameters = connectionString.Split(';').Select(param => param.Split('=')).ToDictionary(parts => parts[0], parts => parts[1]); - parameters["port"] = "15672"; - return string.Join(";", parameters.Select(kv => $"{kv.Key}={kv.Value}")); - } [Test] public void Should_correctly_parse_full_connection_string() @@ -184,311 +169,31 @@ public void Should_list_all_invalid_options() [Test] public void Should_set_default_port() { - Assert.Multiple(() => - { - Assert.That(brokerDefaults.Port, Is.EqualTo(5672)); - Assert.That(managementDefaults.Port, Is.EqualTo(15672)); - }); + Assert.That(brokerDefaults.Port, Is.EqualTo(5672)); } [Test] public void Should_set_default_virtual_host() { - Assert.Multiple(() => - { - Assert.That(brokerDefaults.VirtualHost, Is.EqualTo("/")); - Assert.That(managementDefaults.VirtualHost, Is.EqualTo("/")); - }); + Assert.That(brokerDefaults.VirtualHost, Is.EqualTo("/")); } [Test] public void Should_set_default_username() { - Assert.Multiple(() => - { - Assert.That(brokerDefaults.UserName, Is.EqualTo("guest")); - Assert.That(managementDefaults.UserName, Is.EqualTo("guest")); - }); + Assert.That(brokerDefaults.UserName, Is.EqualTo("guest")); } [Test] public void Should_set_default_password() { - Assert.Multiple(() => - { - Assert.That(brokerDefaults.Password, Is.EqualTo("guest")); - Assert.That(managementDefaults.Password, Is.EqualTo("guest")); - }); + Assert.That(brokerDefaults.Password, Is.EqualTo("guest")); } [Test] public void Should_set_default_use_tls() { - Assert.Multiple(() => - { - Assert.That(brokerDefaults.UseTls, Is.EqualTo(false)); - Assert.That(managementDefaults.UseTls, Is.EqualTo(false)); - }); - } - - [Test] - public void Should_configure_broker_and_management_connection_configurations_with_single_connection_string() - { - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "virtualHost=/;host=localhost;username=guest;password=guest;port=5672;useTls=false"); - - Assert.Multiple(() => - { - Assert.That(transport.BrokerConnectionConfiguration.VirtualHost, Is.EqualTo("/")); - Assert.That(transport.BrokerConnectionConfiguration.Host, Is.EqualTo("localhost")); - Assert.That(transport.BrokerConnectionConfiguration.UserName, Is.EqualTo("guest")); - Assert.That(transport.BrokerConnectionConfiguration.Password, Is.EqualTo("guest")); - Assert.That(transport.BrokerConnectionConfiguration.Port, Is.EqualTo(5672)); - Assert.That(transport.BrokerConnectionConfiguration.UseTls, Is.EqualTo(false)); - - Assert.That(transport.ManagementConnectionConfiguration.VirtualHost, Is.EqualTo("/")); - Assert.That(transport.ManagementConnectionConfiguration.Host, Is.EqualTo("localhost")); - Assert.That(transport.ManagementConnectionConfiguration.UserName, Is.EqualTo("guest")); - Assert.That(transport.ManagementConnectionConfiguration.Password, Is.EqualTo("guest")); - Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(15672)); // This should be set to the default management port - Assert.That(transport.ManagementConnectionConfiguration.UseTls, Is.EqualTo(false)); - }); - } - - [Test] - public void Should_configure_broker_and_management_connection_configurations_with_respective_connection_strings() - { - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), "virtualHost=/;host=localhost;username=guest;password=guest;port=5672;useTls=false", FakeConnectionString); - - Assert.Multiple(() => - { - Assert.That(transport.BrokerConnectionConfiguration.VirtualHost, Is.EqualTo("/")); - Assert.That(transport.BrokerConnectionConfiguration.Host, Is.EqualTo("localhost")); - Assert.That(transport.BrokerConnectionConfiguration.UserName, Is.EqualTo("guest")); - Assert.That(transport.BrokerConnectionConfiguration.Password, Is.EqualTo("guest")); - Assert.That(transport.BrokerConnectionConfiguration.Port, Is.EqualTo(5672)); - Assert.That(transport.BrokerConnectionConfiguration.UseTls, Is.EqualTo(false)); - - Assert.That(transport.ManagementConnectionConfiguration.VirtualHost, Is.EqualTo("Copa")); - Assert.That(transport.ManagementConnectionConfiguration.Host, Is.EqualTo("192.168.1.1")); - Assert.That(transport.ManagementConnectionConfiguration.UserName, Is.EqualTo("Copa")); - Assert.That(transport.ManagementConnectionConfiguration.Password, Is.EqualTo("abc_xyz")); - Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(1234)); - Assert.That(transport.ManagementConnectionConfiguration.UseTls, Is.EqualTo(true)); - }); - } - - [Test] - public void Should_throw_on_invalid_management_credentials() - { - var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) - { - UserName = "Copa" - }; - - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString, invalidManagementConnection.ToConnectionString()); - - var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])) - ?? throw new ArgumentNullException("exception"); - - Assert.That(exception.Message, Does.Contain("Could not access RabbitMQ Management API")); - } - - [Test] - public void Should_throw_on_invalid_management_host() - { - var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) - { - Host = "WrongHostName" - }; - - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString, invalidManagementConnection.ToConnectionString()); - - _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); - } - - [Test] - public void Should_throw_on_invalid_broker_connection_string() - { - var invalidBrokerConnection = new FakeConnectionConfiguration(host: "127.0.0.1", userName: "Copa"); - - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), invalidBrokerConnection.ToConnectionString(), ManagementConnectionString); - - var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])) - ?? throw new ArgumentNullException("exception"); - - Assert.That(exception.Message, Does.Contain("None of the specified endpoints were reachable")); - } - - [Test] - public void Should_throw_on_invalid_legacy_management_credentials() - { - var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) - { - UserName = "Copa" - }; - - // Create transport in legacy mode - var transport = new RabbitMQTransport - { - TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), - LegacyApiConnectionString = BrokerConnectionString, - LegacyManagementApiConnectionString = invalidManagementConnection.ToConnectionString() - }; - - var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); - - Assert.That(exception!.Message, Does.Contain("Could not access RabbitMQ Management API")); - } - - [Test] - public void Should_throw_on_invalid_legacy_management_host() - { - var invalidManagementConnection = new FakeConnectionConfiguration(ManagementConnectionString) - { - Host = "WrongHostName" - }; - - // Create transport in legacy mode - var transport = new RabbitMQTransport - { - TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), - LegacyApiConnectionString = BrokerConnectionString, - LegacyManagementApiConnectionString = invalidManagementConnection.ToConnectionString() - }; - - _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); - } - - [Test] - public void Should_throw_on_invalid_legacy_broker_connection_string() - { - var invalidBrokerConnection = new FakeConnectionConfiguration(host: "localhost", port: "5672", virtualHost: "/", userName: "Copa", password: "guest", useTls: "false"); - - // Create transport in legacy mode - var transport = new RabbitMQTransport - { - TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), - LegacyApiConnectionString = invalidBrokerConnection.ToConnectionString(), - LegacyManagementApiConnectionString = ManagementConnectionString - }; - - var exception = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])) - ?? throw new ArgumentNullException("exception"); - - Assert.That(exception.Message, Does.Contain("None of the specified endpoints were reachable")); - } - - [Test] - public void Should_connect_to_management_api_with_broker_credentials() - { - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString); - - Assert.DoesNotThrowAsync(async () => await transport.Initialize(HostSettings, [], [])); - } - - [Test] - public async Task Should_set_default_port_values_for_broker_and_management_connections() - { - var validConnectionWithoutPort = new FakeConnectionConfiguration(BrokerConnectionString) - { - Port = null - }; - - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), validConnectionWithoutPort.ToConnectionString()); - - _ = await transport.Initialize(HostSettings, [], []); - - Assert.Multiple(() => - { - Assert.That(transport.BrokerConnectionConfiguration.Port, Is.EqualTo(5672)); - Assert.That(transport.ManagementConnectionConfiguration.Port, Is.EqualTo(15672)); - }); - } - - [Test] - public void Should_not_throw_when_DoNotUseManagementClient_is_enabled_and_management_connection_is_invalid() - { - var invalidManagementConnection = new FakeConnectionConfiguration(host: "Copa"); - - var transport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Quorum), BrokerConnectionString, invalidManagementConnection.ToConnectionString()) - { - DoNotUseManagementClient = true - }; - - Assert.DoesNotThrowAsync(async () => await transport.Initialize(HostSettings, [], [])); - } - - public class FakeConnectionConfiguration - { - internal string Host { get; set; } - - internal string? Port { get; set; } - - internal string? VirtualHost { get; set; } - - internal string? UserName { get; set; } - - internal string? Password { get; set; } - - internal string? UseTls { get; set; } - - internal FakeConnectionConfiguration( - string host, - string? port = null, - string? virtualHost = null, - string? userName = null, - string? password = null, - string? useTls = null) - { - Host = host; - Port = port; - VirtualHost = virtualHost; - UserName = userName; - Password = password; - UseTls = useTls; - } - - internal FakeConnectionConfiguration(string connectionString) - { - var parameters = connectionString.Split(';').Select(param => param.Split('=')).ToDictionary(parts => parts[0].ToLower(), parts => parts[1]); - - Host = parameters["host"]; - Port = GetParameterValue(parameters, "port"); - VirtualHost = GetParameterValue(parameters, "virtualhost"); - UserName = GetParameterValue(parameters, "username"); - Password = GetParameterValue(parameters, "password"); - UseTls = GetParameterValue(parameters, "usetls"); - } - - static string? GetParameterValue(Dictionary parameters, string key) => parameters.TryGetValue(key, out var value) ? value : null; - - internal string ToConnectionString() - { - var sb = new StringBuilder(); - _ = sb.Append($"{nameof(Host)}={Host}"); - - if (!string.IsNullOrEmpty(VirtualHost)) - { - _ = sb.Append($";{nameof(VirtualHost)}={VirtualHost}"); - } - if (!string.IsNullOrEmpty(Port)) - { - _ = sb.Append($";{nameof(Port)}={Port}"); - } - if (!string.IsNullOrEmpty(UserName)) - { - _ = sb.Append($";{nameof(UserName)}={UserName}"); - } - if (!string.IsNullOrEmpty(Password)) - { - _ = sb.Append($";{nameof(Password)}={Password}"); - } - if (!string.IsNullOrEmpty(UseTls)) - { - _ = sb.Append($";{nameof(UseTls)}={UseTls}"); - } - return sb.ToString(); - } + Assert.That(brokerDefaults.UseTls, Is.EqualTo(false)); } } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs new file mode 100644 index 000000000..85faaa8f3 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/FakeHttpClient.cs @@ -0,0 +1,135 @@ +#nullable enable + +namespace NServiceBus.Transport.RabbitMQ.Tests +{ + using System; + using System.Net.Http; + using System.Net; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using ManagementClient; + using Queue = ManagementClient.Queue; + using System.Text.Json; + + class FakeHttpClient + { + public bool Testing { get; set; } + + public static string ConvertAuthenticationToBase64String(string userName, string password) => + Convert.ToBase64String(Encoding.ASCII.GetBytes($"{userName}:{password}")); + + public static string DecodeAuthenticationHeader(string authenticationHeader) + { + var data = Convert.FromBase64String(authenticationHeader); + return Encoding.ASCII.GetString(data); + } + + public static HttpClient CreateFakeHttpClient(Func fakeResponse) => new(new FakeHttpMessageHandler { FakeResponse = fakeResponse }, true); + + public class FakeHttpMessageHandler : HttpMessageHandler + { + public Func? FakeResponse { get; set; } + + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken = default) + { + var response = FakeResponse?.Invoke(request) ?? FakeResponses.NotFound(); + return await Task.FromResult(response); + } + } + + public static class FakeResponses + { + public static HttpResponseMessage Valid() => new() + { + StatusCode = HttpStatusCode.OK + }; + + public static HttpResponseMessage Unauthorized() => new() + { + StatusCode = HttpStatusCode.Unauthorized + }; + + public static HttpResponseMessage NotFound() => new() + { + StatusCode = HttpStatusCode.NotFound + }; + + public static HttpResponseMessage GetQueue(QueueType queueType = QueueType.Quorum, int deliveryLimit = 20) + { + var queue = new Queue + { + Name = "test", + Arguments = new QueueArguments() + { + QueueType = queueType, + }, + DeliveryLimit = deliveryLimit + }; + + var json = JsonSerializer.Serialize(queue); + var httpContent = new StringContent(json); + var response = new HttpResponseMessage + { + StatusCode = HttpStatusCode.OK, + Content = httpContent + }; + return response; + } + + public static HttpResponseMessage GetOverview( + HttpRequestMessage request, + string? expectedUserName = null, + string? expectedPassword = null, + string? expectedUrl = null) + { + + var response = CheckRequestMessageConnection(request, expectedUserName, expectedPassword, expectedUrl); + if (response.StatusCode != HttpStatusCode.OK) + { + return response; + } + + var overview = new Overview + { + ClusterName = "rabbit@my - rabbit", + ProductName = "RabbitMQ", + ProductVersion = new Version(4, 0, 3), + ManagementVersion = new Version(4, 0, 3), + RabbitMqVersion = new Version(4, 0, 3), + Node = "rabbit@my-rabbit" + }; + + var json = JsonSerializer.Serialize(overview); + var httpContent = new StringContent(json); + response.Content = httpContent; + return response; + } + + public static HttpResponseMessage CheckRequestMessageConnection( + HttpRequestMessage request, + string? expectedUserName = null, + string? expectedPassword = null, + string? expectedUrl = null) + { + var PathAndQuery = request.RequestUri?.PathAndQuery ?? string.Empty; + var requestUrl = request.RequestUri?.AbsoluteUri.Replace(PathAndQuery, string.Empty); + var credentials = request.Headers.Authorization?.Parameter; + var expectedCredentials = ConvertAuthenticationToBase64String(expectedUserName ?? "guest", expectedPassword ?? "guest"); + var isValidCredential = string.Equals(expectedCredentials, credentials); + var isValidBaseAddress = string.Equals(expectedUrl ?? "http://localhost:15672", requestUrl); + + return !isValidCredential + ? Unauthorized() + : (isValidBaseAddress ? Valid() : NotFound()); + } + + public static HttpResponseMessage CheckAuthentication(HttpRequestMessage request, string userName, string password) + { + var credentials = request.Headers.Authorization?.Parameter; + var expectedCredentials = ConvertAuthenticationToBase64String(userName, password); + return string.Equals(expectedCredentials, credentials) ? Valid() : Unauthorized(); + } + } + } +} diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/InitializeTransportTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/InitializeTransportTests.cs new file mode 100644 index 000000000..a73551c15 --- /dev/null +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/InitializeTransportTests.cs @@ -0,0 +1,113 @@ +namespace NServiceBus.Transport.RabbitMQ.Tests +{ + using System; + using System.Net.Http; + using NServiceBus.Transport.RabbitMQ.Tests.ConnectionString; + using NUnit.Framework; + + [TestFixture] + class InitializeTransportTests + { + static readonly string BrokerConnectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; + static HostSettings HostSettings { get; } = new(nameof(ConnectionConfigurationTests), nameof(ConnectionConfigurationTests), null, null, false); + + [Test] + public void Should_not_throw_with_valid_legacy_management_url() + { + var broker = ConnectionConfiguration.Create(BrokerConnectionString); + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = $"{(broker.UseTls ? "https" : "http")}://{broker.UserName}:{broker.Password}@{broker.Host}:{(broker.UseTls ? "15671" : "15672")}", + DoNotUseManagementClient = false + }; + + Assert.DoesNotThrowAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + [Test] + public void Should_throw_on_invalid_management_scheme() + { + var broker = ConnectionConfiguration.Create(BrokerConnectionString); + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = $"{(broker.UseTls ? "http" : "https")}://guest:guest@{broker.Host}:{(broker.UseTls ? "15671" : "15672")}", + DoNotUseManagementClient = false + }; + + _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + [Test] + public void Should_throw_on_invalid_legacy_management_credentials() + { + var broker = ConnectionConfiguration.Create(BrokerConnectionString); + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = $"{(broker.UseTls ? "https" : "http")}://copa:abc123xyz@{broker.Host}:{(broker.UseTls ? "15671" : "15672")}", + DoNotUseManagementClient = false + }; + + _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + [Test] + public void Should_throw_on_invalid_management_host() + { + var broker = ConnectionConfiguration.Create(BrokerConnectionString); + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = $"{(broker.UseTls ? "https" : "http")}://guest:guest@wronghost:{(broker.UseTls ? "15671" : "15672")}", + DoNotUseManagementClient = false + }; + + _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + [Test] + public void Should_throw_on_invalid_management_port() + { + var broker = ConnectionConfiguration.Create(BrokerConnectionString); + // Create transport in legacy mode + var transport = new RabbitMQTransport + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = $"{(broker.UseTls ? "https" : "http")}://guest:guest@{broker.Host}:12345", + DoNotUseManagementClient = false + }; + + _ = Assert.ThrowsAsync(async () => await transport.Initialize(HostSettings, [], [])); + } + + string CreateLegacyManagementApiUrl( + bool useTls, + string host, + string userName, + string password, + string port) + { + var scheme = useTls ? "https" : "http"; + return $"{scheme}://{userName}:{password}@{host}:{port}"; + } + + static RabbitMQTransport CreateTransport(string managementApiUrl) => new() + { + TopologyFactory = durable => new ConventionalRoutingTopology(durable, QueueType.Quorum), + LegacyApiConnectionString = BrokerConnectionString, + LegacyManagementApiUrl = managementApiUrl, + DoNotUseManagementClient = false + }; + } +} diff --git a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs index cede74f2e..b90fa83e0 100644 --- a/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs +++ b/src/NServiceBus.Transport.RabbitMQ.Tests/ManagementClientTests.cs @@ -4,32 +4,98 @@ namespace NServiceBus.Transport.RabbitMQ.Tests { using System; using System.Collections.Generic; + using System.Diagnostics; using System.Net; + using System.Net.Http; using System.Threading.Tasks; using NServiceBus.Transport.RabbitMQ.ManagementClient; using NUnit.Framework; using NUnit.Framework.Internal; - using ConnectionFactory = ConnectionFactory; - + using static NServiceBus.Transport.RabbitMQ.Tests.FakeHttpClient; [TestFixture] class ManagementClientTests { static readonly string connectionString = Environment.GetEnvironmentVariable("RabbitMQTransport_ConnectionString") ?? "host=localhost"; static readonly ConnectionConfiguration connectionConfiguration = ConnectionConfiguration.Create(connectionString); - static readonly ConnectionConfiguration managementConnectionConfiguration = ConnectionConfiguration.Create(connectionString, isManagementConnection: true); static readonly ConnectionFactory connectionFactory = new(typeof(ManagementClientTests).FullName, connectionConfiguration, null, false, false, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []); - static readonly ManagementClient client = new(managementConnectionConfiguration); + string defaultManagementUrl; + ManagementClient? managementClient; + + const int defaultBrokerPort = 5672; + const int defaultBrokerTlsPort = 5671; + const int defaultManagementPort = 15672; + const int defaultManagementTlsPort = 15671; + const string defaultUserName = "guest"; + const string defaultPassword = "guest"; + const string defaultVirtualHost = "/"; + + [SetUp] + public void SetUp() => defaultManagementUrl = ManagementClient.CreateManagementConnectionString(connectionConfiguration); + + [Test] + [TestCase("http://localhost", "guest", "guest", "http://localhost:15672")] + [TestCase("https://localhost", "guest", "guest", "https://localhost:15671")] + [TestCase("http://localhost:15672", "guest", "guest", "http://localhost:15672")] + [TestCase("https://localhost:15671", "guest", "guest", "https://localhost:15671")] + [TestCase("http://guest:guest@localhost", "guest", "guest", "http://localhost:15672")] + [TestCase("https://guest:guest@localhost", "guest", "guest", "https://localhost:15671")] + [TestCase("http://guest:guest@localhost:15672", "guest", "guest", "http://localhost:15672")] + [TestCase("https://guest:guest@localhost:15671", "guest", "guest", "https://localhost:15671")] + public async Task GetOverview_Should_Return_Success_With_Valid_Default_Connection_Values( + string managementApiUrl, + string expectedUserName, + string expectedPassword, + string expectedUrl) + { + var HttpClient = CreateFakeHttpClient(request => FakeResponses.GetOverview(request, expectedUserName, expectedPassword, expectedUrl)); + managementClient = CreateManagementClient(managementApiUrl, HttpClient); + + var result = await managementClient.GetOverview(); + + Assert.That(result.StatusCode, Is.EqualTo(HttpStatusCode.OK)); + } + + [Test] + [TestCase("http://localhost", "user", "password", "http://localhost:15672")] + [TestCase("https://localhost", "user", "password", "https://localhost:15671")] + [TestCase("http://localhost:15672", "user", "password", "http://localhost:15672")] + [TestCase("https://localhost:15671", "user", "password", "https://localhost:15671")] + [TestCase("http://guest:guest@localhost", "user", "password", "http://localhost:15672")] + [TestCase("https://guest:guest@localhost", "user", "password", "https://localhost:15671")] + [TestCase("http://guest:guest@localhost:15672", "user", "password", "http://localhost:15672")] + [TestCase("https://guest:guest@localhost:15671", "user", "password", "https://localhost:15671")] + public async Task GetOverview_Should_Return_Unauthorized_With_Invalid_Credentials( + string managementApiUrl, + string expectedUserName, + string expectedPassword, + string expectedUrl) + { + var HttpClient = CreateFakeHttpClient(request => FakeResponses.GetOverview(request, expectedUserName, expectedPassword, expectedUrl)); + managementClient = CreateManagementClient(managementApiUrl, HttpClient); + + var result = await managementClient.GetOverview(); + Assert.That(result.StatusCode, Is.EqualTo(HttpStatusCode.Unauthorized)); + } + + [Test] + public void Should_Throw_With_Invalid_Scheme() + { + var managementApiUrl = "amqp:guest:guest@localhost:15672"; + + var exception = Assert.Throws(() => managementClient = new(managementApiUrl, defaultVirtualHost)); + } [Test] public async Task GetQueue_Should_Return_Queue_Information_When_Exists() { // Arrange + managementClient = new(defaultManagementUrl, defaultVirtualHost); var queueName = nameof(GetQueue_Should_Return_Queue_Information_When_Exists); await CreateQuorumQueue(queueName).ConfigureAwait(false); // Act - var response = await client.GetQueue(queueName); + var response = await managementClient.GetQueue(queueName); // Assert Assert.Multiple(() => @@ -44,7 +110,8 @@ public async Task GetQueue_Should_Return_Queue_Information_When_Exists() public async Task GetOverview_Should_Return_Broker_Information() { // Act - var response = await client.GetOverview(); + managementClient = new(defaultManagementUrl, defaultVirtualHost); + var response = await managementClient.GetOverview(); // Assert Assert.Multiple(() => @@ -62,7 +129,8 @@ public async Task GetOverview_Should_Return_Broker_Information() public async Task GetFeatureFlags_Should_Return_FeatureFlag_Information() { // Act - var response = await client.GetFeatureFlags(); + managementClient = new(defaultManagementUrl, defaultVirtualHost); + var response = await managementClient.GetFeatureFlags(); // Assert Assert.Multiple(() => @@ -80,8 +148,9 @@ public async Task GetFeatureFlags_Should_Return_FeatureFlag_Information() public async Task CreatePolicy_With_DeliveryLimit_Should_Be_Applied_To_Quorum_Queues(int deliveryLimit) { // Arrange + managementClient = new(defaultManagementUrl, defaultVirtualHost); var queueName = nameof(CreatePolicy_With_DeliveryLimit_Should_Be_Applied_To_Quorum_Queues); - var policyName = $"{queueName} policy"; + var policyName = $"nsb.{queueName}"; await CreateQuorumQueue(queueName); // Act @@ -96,30 +165,40 @@ public async Task CreatePolicy_With_DeliveryLimit_Should_Be_Applied_To_Quorum_Qu Pattern = queueName, Priority = 100 }; - await client.CreatePolicy(policy); + await managementClient.CreatePolicy(policy); // Assert // It can take some time for updated policies to be applied, so we need to wait. - // If this test is randomly failing, consider increasing the delay - await Task.Delay(10000); - var response = await client.GetQueue(queueName); - Assert.Multiple(() => + // If this test is randomly failing, consider increasing the maxWaitTime + var maxWaitTime = TimeSpan.FromSeconds(30); + var pollingInterval = TimeSpan.FromSeconds(2); + var stopwatch = Stopwatch.StartNew(); + while (stopwatch.Elapsed < maxWaitTime) { - Assert.That(response.StatusCode, Is.EqualTo(HttpStatusCode.OK)); - Assert.That(response.Value, Is.Not.Null); - Assert.That(response.Value?.AppliedPolicyName, Is.EqualTo(policyName)); - Assert.That(response.Value?.EffectivePolicyDefinition?.DeliveryLimit, Is.EqualTo(deliveryLimit)); - }); + var response = await managementClient.GetQueue(queueName); + if (response.StatusCode == HttpStatusCode.OK + && response.Value != null + && response.Value.AppliedPolicyName == policyName + && response.Value.EffectivePolicyDefinition?.DeliveryLimit == deliveryLimit) + { + // Policy applied successfully + return; + } + await Task.Delay(pollingInterval); + } + Assert.Fail($"Policy '{policyName}' was not applied to queue '{queueName}' within {maxWaitTime.TotalSeconds} seconds."); } static async Task CreateQuorumQueue(string queueName) { using var connection = await connectionFactory.CreateConnection($"{queueName} connection").ConfigureAwait(false); using var channel = await connection.CreateChannelAsync().ConfigureAwait(false); - var arguments = new Dictionary { { "x-queue-type", "quorum" } }; + var arguments = new Dictionary { { "x-queue-type", "quorum" }, { "delivery_limit", 5 } }; _ = await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments); } + + static ManagementClient CreateManagementClient(string managementApiUrl, HttpClient httpClient) => new(defaultVirtualHost, httpClient, managementApiUrl); } } diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs index 99730d95c..ee90a431a 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/BrokerVerifier.cs @@ -5,18 +5,17 @@ namespace NServiceBus.Transport.RabbitMQ; using System; using System.Threading; using System.Threading.Tasks; -using ManagementClient; +using ManagementClientClass = ManagementClient.ManagementClient; using NServiceBus.Logging; +using NServiceBus.Transport.RabbitMQ.ManagementClient; using Polly; -class BrokerVerifier(ConnectionFactory connectionFactory, bool managementClientAvailable, ConnectionConfiguration connectionConfiguration) +class BrokerVerifier(ConnectionFactory connectionFactory, bool managementClientAvailable, ManagementClientClass managementClient, ILog? logger = null) { - static readonly ILog Logger = LogManager.GetLogger(typeof(BrokerVerifier)); + readonly ILog Logger = logger ?? LogManager.GetLogger(typeof(BrokerVerifier)); static readonly Version MinimumSupportedRabbitMqVersion = Version.Parse("3.10.0"); static readonly Version RabbitMqVersion4 = Version.Parse("4.0.0"); - readonly ManagementClient.ManagementClient managementClient = new(connectionConfiguration); - Version? brokerVersion; public async Task Initialize(CancellationToken cancellationToken = default) @@ -125,7 +124,7 @@ bool ShouldOverrideDeliveryLimit(Queue queue) return true; } - static async Task GetFullQueueDetails(ManagementClient.ManagementClient managementClient, string queueName, CancellationToken cancellationToken) + async Task GetFullQueueDetails(ManagementClientClass managementClient, string queueName, CancellationToken cancellationToken) { var retryPolicy = Polly.Policy .HandleResult>(response => response.Value?.EffectivePolicyDefinition is null) @@ -155,7 +154,7 @@ bool ShouldOverrideDeliveryLimit(Queue queue) return response?.Value?.EffectivePolicyDefinition is not null ? response.Value : null; } - static async Task SetDeliveryLimitViaPolicy(ManagementClient.ManagementClient managementClient, Queue queue, Version brokerVersion, CancellationToken cancellationToken) + static async Task SetDeliveryLimitViaPolicy(ManagementClientClass managementClient, Queue queue, Version brokerVersion, CancellationToken cancellationToken) { if (!string.IsNullOrEmpty(queue.AppliedPolicyName)) { diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs index ae05e9684..17e7c44e6 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/ManagementClient.cs @@ -3,6 +3,7 @@ namespace NServiceBus.Transport.RabbitMQ.ManagementClient; using System; +using System.Collections.Generic; using System.Net.Http; using System.Net.Http.Headers; using System.Net.Http.Json; @@ -16,24 +17,74 @@ class ManagementClient readonly string virtualHost; readonly string escapedVirtualHost; + const int defaultManagementPort = 15672; + const int defaultManagementTlsPort = 15671; + const string defaultVirtualHost = "/"; + const string defaultUserName = "guest"; + const string defaultPassword = "guest"; + + public ManagementClient(string managementApiUrl, string virtualHost) + { + ArgumentNullException.ThrowIfNull(managementApiUrl, nameof(managementApiUrl)); + + var managementUri = GenerateUri(managementApiUrl); + + var scheme = managementUri.Scheme; + var host = managementUri.Host; + var port = managementUri.Port; + var userName = managementUri.UserName; + var password = managementUri.Password; + + this.virtualHost = virtualHost; + escapedVirtualHost = Uri.EscapeDataString(virtualHost); + + httpClient = CreateHttpClient(scheme, host, port, userName, password); + } + public ManagementClient(ConnectionConfiguration connectionConfiguration) { ArgumentNullException.ThrowIfNull(connectionConfiguration, nameof(connectionConfiguration)); - virtualHost = connectionConfiguration.VirtualHost; + var scheme = connectionConfiguration.UseTls ? "https" : "http"; + var host = connectionConfiguration.Host ?? "localhost"; + var port = connectionConfiguration.UseTls ? defaultManagementTlsPort : defaultManagementPort; + var userName = connectionConfiguration.UserName ?? defaultUserName; + var password = connectionConfiguration.Password ?? defaultPassword; + + virtualHost = connectionConfiguration.VirtualHost ?? defaultVirtualHost; escapedVirtualHost = Uri.EscapeDataString(virtualHost); - var uriBuilder = new UriBuilder - { - Scheme = connectionConfiguration.UseTls ? "https" : "http", - Host = connectionConfiguration.Host, - Port = connectionConfiguration.Port, - }; + httpClient = CreateHttpClient(scheme, host, port, userName, password); + } - httpClient = new HttpClient { BaseAddress = uriBuilder.Uri }; - httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue( - "Basic", - Convert.ToBase64String(Encoding.ASCII.GetBytes($"{connectionConfiguration.UserName}:{connectionConfiguration.Password}"))); + // This is used for testing + public ManagementClient(string virtualHost, HttpClient httpClient, string managementApiUrl) + { + var managementUri = GenerateUri(managementApiUrl); + + var userName = managementUri.UserName; + var password = managementUri.Password; + + // Clear these values for setting the baseAddress of the httpClient + managementUri.UserName = string.Empty; + managementUri.Password = string.Empty; + + this.httpClient = httpClient; + httpClient.BaseAddress = managementUri.Uri; + SetManagementClientAuthorization(httpClient, userName, password); + + this.virtualHost = virtualHost; + escapedVirtualHost = Uri.EscapeDataString(virtualHost); + } + + public static string CreateManagementConnectionString(ConnectionConfiguration connectionConfiguration) + { + var scheme = connectionConfiguration.UseTls ? "https" : "http"; + var userName = connectionConfiguration.UserName ?? defaultUserName; + var password = connectionConfiguration.Password ?? defaultPassword; + var host = connectionConfiguration.Host ?? "localhost"; + var port = connectionConfiguration.UseTls ? defaultManagementTlsPort : defaultManagementPort; + return $"{scheme}://{userName}:{password}@{host}:{port}"; } public async Task> GetQueue(string queueName, CancellationToken cancellationToken = default) @@ -101,4 +152,107 @@ public async Task CreatePolicy(Policy policy, CancellationToken cancellationToke response.EnsureSuccessStatusCode(); } + + UriBuilder GenerateUri(string managementApiUrl) + { + var dictionary = ParseManagementApiUrl(managementApiUrl); + + var invalidOptionsMessage = new StringBuilder(); + + var scheme = GetValue(dictionary, "scheme", "http"); + ValidateScheme(scheme, invalidOptionsMessage); + + var useTls = scheme.Equals("https", StringComparison.OrdinalIgnoreCase); + var host = GetValue(dictionary, "host", "localhost"); + var port = GetValue(dictionary, "port", int.TryParse, useTls ? defaultManagementTlsPort : defaultManagementPort, invalidOptionsMessage); + var userName = GetValue(dictionary, "userName", defaultUserName); + var password = GetValue(dictionary, "password", defaultPassword); + + if (invalidOptionsMessage.Length > 0) + { + throw new NotSupportedException(invalidOptionsMessage.ToString().TrimEnd('\r', '\n')); + } + + return new UriBuilder + { + Scheme = scheme, + Host = host, + Port = port, + UserName = userName, + Password = password, + }; + } + + static void ValidateScheme(string scheme, StringBuilder invalidOptionsMessage) + { + if (!scheme.Equals("http", StringComparison.OrdinalIgnoreCase) && + !scheme.Equals("https", StringComparison.OrdinalIgnoreCase)) + { + _ = invalidOptionsMessage.AppendLine("Invalid scheme for RabbitMQ management API, use either 'http' or 'https' in the ManagementApiUrl"); + } + } + + static string GetValue(Dictionary dictionary, string key, string defaultValue) + { + return dictionary.TryGetValue(key, out var value) ? value : defaultValue; + } + + delegate bool Convert(string input, out T output); + + static T GetValue(Dictionary dictionary, string key, Convert convert, T defaultValue, StringBuilder invalidOptionsMessage) + { + if (dictionary.TryGetValue(key, out var value)) + { + if (!convert(value, out defaultValue)) + { + invalidOptionsMessage.AppendLine($"'{value}' is not a valid {typeof(T).Name} value for the '{key}' connection string option."); + } + } + + return defaultValue; + } + + HttpClient CreateHttpClient(string scheme, string host, int port, string userName, string password) + { + var uriBuilder = new UriBuilder + { + Scheme = scheme, + Host = host, + Port = port, + }; + + var client = new HttpClient { BaseAddress = uriBuilder.Uri }; + SetManagementClientAuthorization(client, userName, password); + return client; + } + + static Dictionary ParseManagementApiUrl(string managementApiUrl) + { + var dictionary = new Dictionary(); + if (!Uri.TryCreate(managementApiUrl, UriKind.Absolute, out var uri)) + { + throw new UriFormatException($"The RabbitMQTransport.ManagementApiUrl is not a valid URI format."); + } + var useTls = uri.Scheme.Equals("https", StringComparison.OrdinalIgnoreCase); + + var isPortSpecified = managementApiUrl.Contains($":{uri.Port}"); + var port = !isPortSpecified && uri.IsDefaultPort + ? (useTls ? defaultManagementTlsPort : defaultManagementPort) + : uri.Port; + + var userInfo = uri.UserInfo.Split(':') ?? []; + + dictionary.Add("scheme", uri.Scheme); + dictionary.Add("host", uri.Host); + dictionary.Add("port", port.ToString()); + dictionary.Add("userName", userInfo.Length > 0 && !string.IsNullOrEmpty(userInfo[0]) ? userInfo[0] : defaultUserName); + dictionary.Add("password", userInfo.Length > 1 && !string.IsNullOrEmpty(userInfo[1]) ? userInfo[1] : defaultPassword); + + return dictionary; + } + + void SetManagementClientAuthorization(HttpClient client, string userName, string password) => + client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue( + "Basic", + Convert.ToBase64String(Encoding.ASCII.GetBytes($"{userName}:{password}"))); } diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Queue.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Queue.cs index 9fdd72001..17ab2fff0 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Queue.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/Queue.cs @@ -33,5 +33,5 @@ class Queue() public string? AppliedOperatorPolicyName { get; set; } [JsonExtensionData] - public IDictionary ExtraProperties { get; } = new Dictionary(); + public IDictionary ExtraProperties { get; init; } = new Dictionary(); } \ No newline at end of file diff --git a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs index ecc99d1be..4d5b9a7e5 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Administration/ManagementClient/Models/QueueArguments.cs @@ -17,6 +17,6 @@ class QueueArguments public int? DeliveryLimit { get; set; } [JsonExtensionData] - public IDictionary ExtraProperties { get; } = new Dictionary(); + public IDictionary ExtraProperties { get; init; } = new Dictionary(); } diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs index 80fe920e8..3eca08219 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Configuration/ConnectionConfiguration.cs @@ -9,10 +9,8 @@ class ConnectionConfiguration { const bool defaultUseTls = false; - const int defaultBrokerPort = 5672; - const int defaultBrokerTlsPort = 5671; - const int defaultManagementPort = 15672; - const int defaultManagementTlsPort = 15671; + const int defaultPort = 5672; + const int defaultTlsPort = 5671; const string defaultVirtualHost = "/"; const string defaultUserName = "guest"; const string defaultPassword = "guest"; @@ -45,7 +43,7 @@ class ConnectionConfiguration UseTls = useTls; } - public static ConnectionConfiguration Create(string connectionString, bool isManagementConnection = false) + public static ConnectionConfiguration Create(string connectionString) { Dictionary dictionary; var invalidOptionsMessage = new StringBuilder(); @@ -61,10 +59,7 @@ public static ConnectionConfiguration Create(string connectionString, bool isMan var host = GetValue(dictionary, "host", string.Empty); var useTls = GetValue(dictionary, "useTls", bool.TryParse, defaultUseTls, invalidOptionsMessage); - var port = GetValue(dictionary, "port", int.TryParse, useTls ? - (isManagementConnection ? defaultManagementTlsPort : defaultBrokerTlsPort) : - (isManagementConnection ? defaultManagementPort : defaultBrokerPort), - invalidOptionsMessage); + var port = GetValue(dictionary, "port", int.TryParse, useTls ? defaultTlsPort : defaultPort, invalidOptionsMessage); var virtualHost = GetValue(dictionary, "virtualHost", defaultVirtualHost); var userName = GetValue(dictionary, "userName", defaultUserName); var password = GetValue(dictionary, "password", defaultPassword); @@ -77,18 +72,6 @@ public static ConnectionConfiguration Create(string connectionString, bool isMan return new ConnectionConfiguration(host, port, virtualHost, userName, password, useTls); } - public static ConnectionConfiguration ConvertToManagementConnection(ConnectionConfiguration brokerConnectionConfiguration) - { - var virtualHost = brokerConnectionConfiguration.VirtualHost; - var host = brokerConnectionConfiguration.Host; - var port = defaultManagementPort; - var useTls = brokerConnectionConfiguration.UseTls; - var userName = brokerConnectionConfiguration.UserName; - var password = brokerConnectionConfiguration.Password; - - return new ConnectionConfiguration(host, port, virtualHost, userName, password, useTls); - } - static Dictionary ParseAmqpConnectionString(string connectionString, StringBuilder invalidOptionsMessage) { var dictionary = new Dictionary(); diff --git a/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs b/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs index 6227c20f9..a25946437 100644 --- a/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs +++ b/src/NServiceBus.Transport.RabbitMQ/Configuration/RabbitMQTransportSettingsExtensions.cs @@ -102,12 +102,12 @@ public static TransportExtensions ConnectionString(this Trans [PreObsolete("https://github.com/Particular/NServiceBus/issues/6811", Message = "The configuration has been moved to RabbitMQTransport class.", Note = "Should not be converted to an ObsoleteEx until API mismatch described in issue is resolved.")] - public static TransportExtensions ManagementConnectionString(this TransportExtensions transportExtensions, string connectionString) + public static TransportExtensions ManagementApiUrl(this TransportExtensions transportExtensions, string connectionUrl) { ArgumentNullException.ThrowIfNull(transportExtensions); - ArgumentException.ThrowIfNullOrWhiteSpace(connectionString); + ArgumentException.ThrowIfNullOrWhiteSpace(connectionUrl); - transportExtensions.Transport.LegacyManagementApiConnectionString = connectionString; + transportExtensions.Transport.LegacyManagementApiUrl = connectionUrl; return transportExtensions; } @@ -117,12 +117,12 @@ public static TransportExtensions ManagementConnectionString( [PreObsolete("https://github.com/Particular/NServiceBus/issues/6811", Message = "The configuration has been moved to RabbitMQTransport class.", Note = "Should not be converted to an ObsoleteEx until API mismatch described in issue is resolved.")] - public static TransportExtensions ManagementConnectionString(this TransportExtensions transportExtensions, Func getConnectionString) + public static TransportExtensions ManagementApiUrl(this TransportExtensions transportExtensions, Func getConnectionUrl) { ArgumentNullException.ThrowIfNull(transportExtensions); - ArgumentNullException.ThrowIfNull(getConnectionString); + ArgumentNullException.ThrowIfNull(getConnectionUrl); - transportExtensions.Transport.LegacyManagementApiConnectionString = getConnectionString(); + transportExtensions.Transport.LegacyManagementApiUrl = getConnectionUrl(); return transportExtensions; } @@ -368,5 +368,21 @@ public static TransportExtensions UseExternalAuthMechanism(th transportExtensions.Transport.UseExternalAuthMechanism = true; return transportExtensions; } + + /// + /// Specifies that an external authentication mechanism should be used for client authentication. + /// + /// + [PreObsolete("https://github.com/Particular/NServiceBus/issues/6811", + ReplacementTypeOrMember = "RabbitMQTransport.UseExternalAuthMechanism", + Message = "The configuration has been moved to RabbitMQTransport class.", + Note = "Should not be converted to an ObsoleteEx until API mismatch described in issue is resolved.")] + public static TransportExtensions DoNotUseManagementClient(this TransportExtensions transportExtensions) + { + ArgumentNullException.ThrowIfNull(transportExtensions); + + transportExtensions.Transport.DoNotUseManagementClient = true; + return transportExtensions; + } } } diff --git a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs index 4c2b1d16e..eb66a3ed4 100644 --- a/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs +++ b/src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs @@ -9,6 +9,7 @@ using RabbitMQ.Client.Events; using Transport; using Transport.RabbitMQ; + using Transport.RabbitMQ.ManagementClient; using ConnectionFactory = Transport.RabbitMQ.ConnectionFactory; /// @@ -31,17 +32,6 @@ public class RabbitMQTransport : TransportDefinition /// The routing topology to use. /// The connection string to use when connecting to the broker. public RabbitMQTransport(RoutingTopology routingTopology, string connectionString) - : this(routingTopology, connectionString, null) - { - } - - /// - /// Creates a new instance of the RabbitMQ transport. - /// - /// The routing topology to use. - /// The connection string to use when connecting to the broker. - /// The connection string to use when connecting to the management API - public RabbitMQTransport(RoutingTopology routingTopology, string connectionString, string managementConnectionString) : base(TransportTransactionMode.ReceiveOnly, supportsDelayedDelivery: true, supportsPublishSubscribe: true, @@ -52,10 +42,6 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin RoutingTopology = routingTopology.Create(); BrokerConnectionConfiguration = ConnectionConfiguration.Create(connectionString); - - ManagementConnectionConfiguration = string.IsNullOrEmpty(managementConnectionString) ? - ConnectionConfiguration.ConvertToManagementConnection(BrokerConnectionConfiguration) : - ConnectionConfiguration.Create(managementConnectionString, isManagementConnection: true); } /// @@ -65,18 +51,6 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin /// The connection string to use when connecting to the broker. /// Should the delayed delivery infrastructure be created by the endpoint public RabbitMQTransport(RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery) - : this(routingTopology, connectionString, enableDelayedDelivery, null) - { - } - - /// - /// Creates a new instance of the RabbitMQ transport. - /// - /// The routing topology to use. - /// The connection string to use when connecting to the broker. - /// Should the delayed delivery infrastructure be created by the endpoint - /// The connection string to use when connecting to the management API - public RabbitMQTransport(RoutingTopology routingTopology, string connectionString, bool enableDelayedDelivery, string managementConnectionString) : base(TransportTransactionMode.ReceiveOnly, supportsDelayedDelivery: enableDelayedDelivery, supportsPublishSubscribe: true, @@ -87,15 +61,10 @@ public RabbitMQTransport(RoutingTopology routingTopology, string connectionStrin RoutingTopology = routingTopology.Create(); BrokerConnectionConfiguration = ConnectionConfiguration.Create(connectionString); - ManagementConnectionConfiguration = string.IsNullOrEmpty(managementConnectionString) ? - ConnectionConfiguration.ConvertToManagementConnection(BrokerConnectionConfiguration) : - ConnectionConfiguration.Create(managementConnectionString, isManagementConnection: true); } internal ConnectionConfiguration BrokerConnectionConfiguration { get; set; } - internal ConnectionConfiguration ManagementConnectionConfiguration { get; set; } - internal IRoutingTopology RoutingTopology { get; set; } /// @@ -175,6 +144,14 @@ public PrefetchCountCalculation PrefetchCountCalculation /// public bool DoNotUseManagementClient { get; set; } = false; + /// + /// Basic authentication HTTP connection string to the RabbitMQ management API. + /// + /// + /// E.g. https://username:password@localhost:15671 + /// + public string ManagementApiUrl { get; set; } + /// /// The interval for heartbeats between the endpoint and the broker. /// @@ -244,13 +221,11 @@ public override async Task Initialize(HostSettings host additionalClusterNodes ); - // Uses the legacy Management API connection string or default to the RabbitMQ broker connection credentials - if (!string.IsNullOrEmpty(LegacyManagementApiConnectionString)) - { - ManagementConnectionConfiguration = ConnectionConfiguration.Create(LegacyManagementApiConnectionString, isManagementConnection: true); - } + var managementClient = !string.IsNullOrEmpty(ManagementApiUrl) + ? new ManagementClient(ManagementApiUrl, BrokerConnectionConfiguration.VirtualHost) + : new ManagementClient(BrokerConnectionConfiguration); - var brokerVerifier = new BrokerVerifier(connectionFactory, !DoNotUseManagementClient, ManagementConnectionConfiguration); + var brokerVerifier = new BrokerVerifier(connectionFactory, !DoNotUseManagementClient, managementClient); await brokerVerifier.Initialize(cancellationToken).ConfigureAwait(false); var channelProvider = new ChannelProvider(connectionFactory, NetworkRecoveryInterval, RoutingTopology); @@ -291,7 +266,7 @@ void ValidateAndApplyCertCollections() => certCollection ??= ClientCertificate ! internal string LegacyApiConnectionString { get; set; } - internal string LegacyManagementApiConnectionString { get; set; } + internal string LegacyManagementApiUrl { get; set; } internal Func TopologyFactory { get; set; } @@ -317,10 +292,9 @@ void ValidateAndApplyLegacyConfiguration() RoutingTopology = TopologyFactory(UseDurableExchangesAndQueues); BrokerConnectionConfiguration = ConnectionConfiguration.Create(LegacyApiConnectionString); - // Uses the legacy management API connection string or build the string from the legacy broker connection configuration - ManagementConnectionConfiguration = !string.IsNullOrEmpty(LegacyManagementApiConnectionString) ? - ConnectionConfiguration.Create(LegacyManagementApiConnectionString, isManagementConnection: true) : - ConnectionConfiguration.ConvertToManagementConnection(BrokerConnectionConfiguration); + ManagementApiUrl = !string.IsNullOrEmpty(LegacyManagementApiUrl) + ? LegacyManagementApiUrl + : ManagementClient.CreateManagementConnectionString(BrokerConnectionConfiguration); } void VaildateTopologyFactory() @@ -337,11 +311,6 @@ void ValidateConnectionString() { throw new Exception("A connection string must be configured with 'EndpointConfiguration.UseTransport().ConnectionString()` method."); } - - if (!DoNotUseManagementClient && string.IsNullOrEmpty(LegacyManagementApiConnectionString)) - { - throw new Exception("A management API connection string must be configured with 'EndpointConfiguration.UseTransport().ManagementConnectionString()` method."); - } } } } \ No newline at end of file