From cd34ded5876f9f4d2ceda320127f12f5d366162b Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Mon, 22 Sep 2025 15:02:38 +1000 Subject: [PATCH 1/6] Add Redis observability infrastructure - Add IMessageSerialiserAndDataStreamStorageExceptionObserver interface for monitoring message serialization exceptions - Create MessageSerialiserAndDataStreamStorageWithExceptionObserver decorator to wrap existing implementations - Add NoOpMessageSerialiserAndDataStreamStorageExceptionObserver as default implementation - Update RedisPendingRequestQueueFactory to accept and use exception observer - Add IRedisFacadeObserver interface for monitoring Redis connection events and retry operations - Create NoOpRedisFacadeObserver as default implementation - Update RedisFacade to accept observer and notify on: - Redis connection failures (OnRedisConnectionFailed) - Redis error messages (OnRedisServerRepliedWithAnErrorMessage) - Redis connection restoration (OnRedisConnectionRestored) - Redis operation failures and retries (OnRedisOperationFailed) - Update RedisFacadeBuilder to accept observer parameter - Add comprehensive tests for both observer implementations - Update existing code to use no-op observers by default This enables monitoring and logging of Redis infrastructure issues and message processing errors. --- .../RedisPendingRequestQueueBuilder.cs | 3 +- .../Halibut.Tests/ManyPollingTentacleTests.cs | 1 + ...dDataStreamStorageExceptionObserverTest.cs | 131 +++++++++++++++++ .../RedisHelpers/RedisFacadeObserverTest.cs | 136 ++++++++++++++++++ .../Queue/Redis/Utils/RedisFacadeBuilder.cs | 9 +- ...erAndDataStreamStorageExceptionObserver.cs | 19 +++ ...dDataStreamStorageWithExceptionObserver.cs | 79 ++++++++++ ...erAndDataStreamStorageExceptionObserver.cs | 27 ++++ .../RedisHelpers/IRedisFacadeObserver.cs | 40 ++++++ .../RedisHelpers/NoOpRedisFacadeObserver.cs | 57 ++++++++ .../Queue/Redis/RedisHelpers/RedisFacade.cs | 21 ++- .../Redis/RedisPendingRequestQueueFactory.cs | 21 ++- 12 files changed, 531 insertions(+), 13 deletions(-) create mode 100644 source/Halibut.Tests/Queue/Redis/MessageSerialiserAndDataStreamStorageExceptionObserverTest.cs create mode 100644 source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs create mode 100644 source/Halibut/Queue/Redis/MessageStorage/IMessageSerialiserAndDataStreamStorageExceptionObserver.cs create mode 100644 source/Halibut/Queue/Redis/MessageStorage/MessageSerialiserAndDataStreamStorageWithExceptionObserver.cs create mode 100644 source/Halibut/Queue/Redis/MessageStorage/NoOpMessageSerialiserAndDataStreamStorageExceptionObserver.cs create mode 100644 source/Halibut/Queue/Redis/RedisHelpers/IRedisFacadeObserver.cs create mode 100644 source/Halibut/Queue/Redis/RedisHelpers/NoOpRedisFacadeObserver.cs diff --git a/source/Halibut.Tests/Builders/RedisPendingRequestQueueBuilder.cs b/source/Halibut.Tests/Builders/RedisPendingRequestQueueBuilder.cs index 90ce5b7a2..3fca441f6 100644 --- a/source/Halibut.Tests/Builders/RedisPendingRequestQueueBuilder.cs +++ b/source/Halibut.Tests/Builders/RedisPendingRequestQueueBuilder.cs @@ -65,7 +65,8 @@ public QueueHolder Build() var redisTransport = new HalibutRedisTransport(redisFacade); var dataStreamStore = new InMemoryStoreDataStreamsForDistributedQueues(); var messageSerializer = new QueueMessageSerializerBuilder().Build(); - var messageReaderWriter = new MessageSerialiserAndDataStreamStorage(messageSerializer, dataStreamStore); + var baseMessageReaderWriter = new MessageSerialiserAndDataStreamStorage(messageSerializer, dataStreamStore); + var messageReaderWriter = new MessageSerialiserAndDataStreamStorageWithExceptionObserver(baseMessageReaderWriter, NoOpMessageSerialiserAndDataStreamStorageExceptionObserver.Instance); var queue = new RedisPendingRequestQueue(endpoint, new RedisNeverLosesData(), log, redisTransport, messageReaderWriter, halibutTimeoutsAndLimits); if (defaultDelayBeforeSubscribingToRequestCancellation != null) diff --git a/source/Halibut.Tests/ManyPollingTentacleTests.cs b/source/Halibut.Tests/ManyPollingTentacleTests.cs index 8c1193def..5a228c8b9 100644 --- a/source/Halibut.Tests/ManyPollingTentacleTests.cs +++ b/source/Halibut.Tests/ManyPollingTentacleTests.cs @@ -12,6 +12,7 @@ using Halibut.Logging; using Halibut.Queue.QueuedDataStreams; using Halibut.Queue.Redis; +using Halibut.Queue.Redis.MessageStorage; using Halibut.Queue.Redis.RedisDataLossDetection; using Halibut.Queue.Redis.RedisHelpers; using Halibut.ServiceModel; diff --git a/source/Halibut.Tests/Queue/Redis/MessageSerialiserAndDataStreamStorageExceptionObserverTest.cs b/source/Halibut.Tests/Queue/Redis/MessageSerialiserAndDataStreamStorageExceptionObserverTest.cs new file mode 100644 index 000000000..f78f492e9 --- /dev/null +++ b/source/Halibut.Tests/Queue/Redis/MessageSerialiserAndDataStreamStorageExceptionObserverTest.cs @@ -0,0 +1,131 @@ +#if NET8_0_OR_GREATER +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Halibut.Diagnostics; +using Halibut.Diagnostics.LogCreators; +using Halibut.Logging; +using Halibut.Queue; +using Halibut.Queue.MessageStreamWrapping; +using Halibut.Queue.QueuedDataStreams; +using Halibut.Queue.Redis; +using Halibut.Queue.Redis.MessageStorage; +using Halibut.Queue.Redis.RedisHelpers; +using Halibut.Tests.Builders; +using Halibut.Tests.Queue.Redis.Utils; +using Halibut.Tests.Support; +using Halibut.Tests.Support.Logging; +using Halibut.Tests.Support.TestAttributes; +using Halibut.Tests.Util; +using Halibut.TestUtils.Contracts; +using Halibut.Transport.Protocol; +using NUnit.Framework; + +namespace Halibut.Tests.Queue.Redis +{ + [RedisTest] + public class MessageSerialiserAndDataStreamStorageExceptionObserverTest : BaseTest + { + [Test] + public async Task WhenStoreDataStreamsThrows_ExceptionObserverShouldBeNotified() + { + // Arrange + var endpoint = new Uri("poll://" + Guid.NewGuid()); + await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade(); + var redisTransport = new HalibutRedisTransport(redisFacade); + + var expectedException = new InvalidOperationException("Test exception from storage"); + var throwingDataStreamStorage = new ThrowingStoreDataStreamsForDistributedQueues(expectedException); + var testObserver = new TestMessageSerialiserAndDataStreamStorageExceptionObserver(); + + var queueMessageSerializer = new QueueMessageSerializerBuilder().Build(); + var logFactory = new TestContextLogCreator("Redis", LogLevel.Trace).ToCachingLogFactory(); + var factory = new RedisPendingRequestQueueFactory( + queueMessageSerializer, + throwingDataStreamStorage, + new RedisNeverLosesData(), + redisTransport, + new HalibutTimeoutsAndLimits(), + logFactory, + testObserver); + + var sut = (RedisPendingRequestQueue)factory.CreateQueue(endpoint); + await sut.WaitUntilQueueIsSubscribedToReceiveMessages(); + + // Create a request with data streams to trigger PrepareRequest -> StoreDataStreams + var request = new RequestMessageBuilder("poll://test-endpoint").Build(); + request.Params = new[] { new ComplexObjectMultipleDataStreams(DataStream.FromString("hello"), DataStream.FromString("world")) }; + + // Act & Assert + var exception = await AssertThrowsAny.Exception(async () => await sut.QueueAndWaitAsync(request, CancellationToken)); + + // Verify that the observer was called + testObserver.ObservedException.Should().NotBeNull("Exception observer should have been called"); + testObserver.ObservedException.Should().BeSameAs(expectedException, "Observer should receive the original exception"); + testObserver.MethodName.Should().Be(nameof(IMessageSerialiserAndDataStreamStorage.PrepareRequest), "Observer should know which method threw the exception"); + + // Verify the original exception is still thrown + exception.Should().NotBeNull("Original exception should still be thrown"); + exception.InnerException.Should().BeSameAs(expectedException, "Original exception should be preserved"); + } + + public class QueueMessageSerializerBuilder + { + public QueueMessageSerializer Build() + { + var typeRegistry = new TypeRegistry(); + typeRegistry.Register(typeof(IComplexObjectService)); + + StreamCapturingJsonSerializer StreamCapturingSerializer() + { + var settings = MessageSerializerBuilder.CreateSerializer(); + var binder = new RegisteredSerializationBinder(typeRegistry); + settings.SerializationBinder = binder; + return new StreamCapturingJsonSerializer(settings); + } + + return new QueueMessageSerializer(StreamCapturingSerializer, new MessageStreamWrappers()); + } + } + + class ThrowingStoreDataStreamsForDistributedQueues : IStoreDataStreamsForDistributedQueues + { + readonly Exception exceptionToThrow; + + public ThrowingStoreDataStreamsForDistributedQueues(Exception exceptionToThrow) + { + this.exceptionToThrow = exceptionToThrow; + } + + public Task StoreDataStreams(IReadOnlyList dataStreams, CancellationToken cancellationToken) + { + throw exceptionToThrow; + } + + public Task RehydrateDataStreams(byte[] dataStreamMetadata, List rehydrateDataStreams, CancellationToken cancellationToken) + { + throw exceptionToThrow; + } + + public ValueTask DisposeAsync() + { + return ValueTask.CompletedTask; + } + } + + class TestMessageSerialiserAndDataStreamStorageExceptionObserver : IMessageSerialiserAndDataStreamStorageExceptionObserver + { + public Exception? ObservedException { get; private set; } + public string? MethodName { get; private set; } + + public void OnException(Exception exception, string methodName) + { + ObservedException = exception; + MethodName = methodName; + } + } + } +} +#endif diff --git a/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs b/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs new file mode 100644 index 000000000..007ffd805 --- /dev/null +++ b/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs @@ -0,0 +1,136 @@ +#if NET8_0_OR_GREATER +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Halibut.Diagnostics; +using Halibut.Diagnostics.LogCreators; +using Halibut.Logging; +using Halibut.Queue.Redis.RedisHelpers; +using Halibut.Tests.Queue.Redis.Utils; +using Halibut.Tests.Support; +using Halibut.Tests.Support.Logging; +using Halibut.Tests.Support.TestAttributes; +using Halibut.Tests.Util; +using NUnit.Framework; +using StackExchange.Redis; + +namespace Halibut.Tests.Queue.Redis.RedisHelpers +{ + [RedisTest] + public class RedisFacadeObserverTest : BaseTest + { + [Test] + public async Task WhenRedisConnectionGoesDown_ObserverShouldBeNotifiedOfRetryExceptions() + { + // Arrange + using var portForwarder = PortForwardingToRedisBuilder.ForwardingToRedis(Logger); + var testObserver = new TestRedisFacadeObserver(); + + await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade(portForwarder, redisFacadeObserver: testObserver); + + // Verify Redis is working initially + await redisFacade.SetString("foo", "bar", TimeSpan.FromMinutes(1), CancellationToken); + (await redisFacade.GetString("foo", CancellationToken)).Should().Be("bar"); + + // Kill Redis connections to simulate network issues + portForwarder.EnterKillNewAndExistingConnectionsMode(); + + // This should trigger retries and call the observer + var getStringTask = redisFacade.GetString("foo", CancellationToken); + + // Wait a bit for retries to happen, then restore connection + await Task.Delay(6000); // By-default (somewhere) the redis client will wait 5s for a request to get to Redis so we need to wait longer than that. + portForwarder.ReturnToNormalMode(); + + // The operation should eventually succeed + var result = await getStringTask; + result.Should().Be("bar"); + + // Verify that the observer was called with retry exceptions + testObserver.ExecuteWithRetryExceptions.Should().NotBeEmpty("Observer should have been called for retry exceptions during connection issues"); + testObserver.ExecuteWithRetryExceptions.Should().AllSatisfy(ex => + { + ex.Exception.Should().NotBeNull("Exception should not be null"); + // We should have both retry attempts (willRetry=true) and potentially final failures (willRetry=false) + ex.WillRetry.Should().BeTrue(); + }); + + testObserver.ConnectionRestorations.Count.Should().BeGreaterThan(1); + testObserver.ConnectionFailures.Count.Should().BeGreaterThan(1); + } + + [Test] + public async Task WhenRedisConnectionGoesDown_AndStaysDown_ObserverShouldBeNotifiedOfRetryExceptions() + { + // Arrange + using var portForwarder = PortForwardingToRedisBuilder.ForwardingToRedis(Logger); + var testObserver = new TestRedisFacadeObserver(); + + await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade(portForwarder, redisFacadeObserver: testObserver); + redisFacade.MaxDurationToRetryFor = TimeSpan.FromSeconds(1); + + // Verify Redis is working initially + await redisFacade.SetString("foo", "bar", TimeSpan.FromMinutes(1), CancellationToken); + (await redisFacade.GetString("foo", CancellationToken)).Should().Be("bar"); + + // Kill Redis connections to simulate network issues + portForwarder.EnterKillNewAndExistingConnectionsMode(); + + // This should trigger retries and call the observer + var getStringTask = redisFacade.GetString("foo", CancellationToken); + + // Wait a bit for retries to happen, then restore connection + await Task.Delay(6000); // By-default (somewhere) the redis client will wait 5s for a request to get to Redis so we need to wait longer than that. + portForwarder.ReturnToNormalMode(); + + // The operation should eventually succeed + var result = await getStringTask; + result.Should().Be("bar"); + + // Verify that the observer was called with retry exceptions + testObserver.ExecuteWithRetryExceptions.Should().NotBeEmpty("Observer should have been called for retry exceptions during connection issues"); + testObserver.ExecuteWithRetryExceptions.Should().AllSatisfy(ex => + { + ex.Exception.Should().NotBeNull("Exception should not be null"); + // We should have both retry attempts (willRetry=true) and potentially final failures (willRetry=false) + ex.WillRetry.Should().BeFalse(); + }); + + testObserver.ConnectionRestorations.Count.Should().Be(0); + testObserver.ConnectionFailures.Count.Should().BeGreaterThan(1); + } + + + + class TestRedisFacadeObserver : IRedisFacadeObserver + { + public List<(string? EndPoint, ConnectionFailureType FailureType, Exception? Exception)> ConnectionFailures { get; } = new(); + public List<(string? EndPoint, string Message)> ErrorMessages { get; } = new(); + public List ConnectionRestorations { get; } = new(); + public List<(Exception Exception, bool WillRetry)> ExecuteWithRetryExceptions { get; } = new(); + + public void OnRedisConnectionFailed(string? endPoint, ConnectionFailureType failureType, Exception? exception) + { + ConnectionFailures.Add((endPoint, failureType, exception)); + } + + public void OnRedisServerRepliedWithAnErrorMessage(string? endPoint, string message) + { + ErrorMessages.Add((endPoint, message)); + } + + public void OnRedisConnectionRestored(string? endPoint) + { + ConnectionRestorations.Add(endPoint); + } + + public void OnRedisOperationFailed(Exception exception, bool willRetry) + { + ExecuteWithRetryExceptions.Add((exception, willRetry)); + } + } + } +} +#endif diff --git a/source/Halibut.Tests/Queue/Redis/Utils/RedisFacadeBuilder.cs b/source/Halibut.Tests/Queue/Redis/Utils/RedisFacadeBuilder.cs index 68ace900c..d53601f8e 100644 --- a/source/Halibut.Tests/Queue/Redis/Utils/RedisFacadeBuilder.cs +++ b/source/Halibut.Tests/Queue/Redis/Utils/RedisFacadeBuilder.cs @@ -10,17 +10,18 @@ namespace Halibut.Tests.Queue.Redis.Utils { public class RedisFacadeBuilder { - public static RedisFacade CreateRedisFacade(string? host = null, int? port = 0, Guid? prefix = null) + public static RedisFacade CreateRedisFacade(string? host = null, int? port = 0, Guid? prefix = null, IRedisFacadeObserver? redisFacadeObserver = null) { port = port == 0 ? RedisTestHost.Port() : port; - return new RedisFacade((host??RedisTestHost.RedisHost) + ":" + port, (prefix ?? Guid.NewGuid()).ToString(), new TestContextLogCreator("Redis", LogLevel.Trace).CreateNewForPrefix("")); + return new RedisFacade((host??RedisTestHost.RedisHost) + ":" + port, (prefix ?? Guid.NewGuid()).ToString(), new TestContextLogCreator("Redis", LogLevel.Trace).CreateNewForPrefix(""), redisFacadeObserver); } - public static RedisFacade CreateRedisFacade(PortForwarder portForwarder, Guid? prefix = null) + public static RedisFacade CreateRedisFacade(PortForwarder portForwarder, Guid? prefix = null, IRedisFacadeObserver? redisFacadeObserver = null) { return CreateRedisFacade(host: portForwarder.PublicEndpoint.Host, port: portForwarder.ListeningPort, - prefix: prefix); + prefix: prefix, + redisFacadeObserver: redisFacadeObserver); } } } \ No newline at end of file diff --git a/source/Halibut/Queue/Redis/MessageStorage/IMessageSerialiserAndDataStreamStorageExceptionObserver.cs b/source/Halibut/Queue/Redis/MessageStorage/IMessageSerialiserAndDataStreamStorageExceptionObserver.cs new file mode 100644 index 000000000..450df9c5b --- /dev/null +++ b/source/Halibut/Queue/Redis/MessageStorage/IMessageSerialiserAndDataStreamStorageExceptionObserver.cs @@ -0,0 +1,19 @@ +using System; + +namespace Halibut.Queue.Redis.MessageStorage +{ + /// + /// Observes exceptions that occur within IMessageSerialiserAndDataStreamStorage implementations. + /// This allows monitoring and logging of errors during message serialization and data stream operations. + /// + public interface IMessageSerialiserAndDataStreamStorageExceptionObserver + { + /// + /// Called when an exception occurs in any IMessageSerialiserAndDataStreamStorage method. + /// Errors caught here are most likely caused by the Redis Pending Request Queue itself. + /// + /// The exception that was raised + /// The name of the method where the exception occurred + void OnException(Exception exception, string methodName); + } +} diff --git a/source/Halibut/Queue/Redis/MessageStorage/MessageSerialiserAndDataStreamStorageWithExceptionObserver.cs b/source/Halibut/Queue/Redis/MessageStorage/MessageSerialiserAndDataStreamStorageWithExceptionObserver.cs new file mode 100644 index 000000000..e14ba53b6 --- /dev/null +++ b/source/Halibut/Queue/Redis/MessageStorage/MessageSerialiserAndDataStreamStorageWithExceptionObserver.cs @@ -0,0 +1,79 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Halibut.Queue.QueuedDataStreams; +using Halibut.Queue.Redis.RedisHelpers; +using Halibut.Transport.Protocol; + +namespace Halibut.Queue.Redis.MessageStorage +{ + /// + /// Decorator implementation of IMessageSerialiserAndDataStreamStorage that wraps another implementation + /// and notifies an observer when exceptions occur during any of the operations. + /// + public class MessageSerialiserAndDataStreamStorageWithExceptionObserver : IMessageSerialiserAndDataStreamStorage + { + readonly IMessageSerialiserAndDataStreamStorage inner; + readonly IMessageSerialiserAndDataStreamStorageExceptionObserver exceptionObserver; + + public MessageSerialiserAndDataStreamStorageWithExceptionObserver( + IMessageSerialiserAndDataStreamStorage inner, + IMessageSerialiserAndDataStreamStorageExceptionObserver exceptionObserver) + { + this.inner = inner ?? throw new ArgumentNullException(nameof(inner)); + this.exceptionObserver = exceptionObserver ?? throw new ArgumentNullException(nameof(exceptionObserver)); + } + + public async Task<(RedisStoredMessage, HeartBeatDrivenDataStreamProgressReporter)> PrepareRequest(RequestMessage request, CancellationToken cancellationToken) + { + try + { + return await inner.PrepareRequest(request, cancellationToken); + } + catch (Exception ex) + { + exceptionObserver.OnException(ex, nameof(PrepareRequest)); + throw; + } + } + + public async Task<(RequestMessage, RequestDataStreamsTransferProgress)> ReadRequest(RedisStoredMessage jsonRequest, CancellationToken cancellationToken) + { + try + { + return await inner.ReadRequest(jsonRequest, cancellationToken); + } + catch (Exception ex) + { + exceptionObserver.OnException(ex, nameof(ReadRequest)); + throw; + } + } + + public async Task PrepareResponse(ResponseMessage response, CancellationToken cancellationToken) + { + try + { + return await inner.PrepareResponse(response, cancellationToken); + } + catch (Exception ex) + { + exceptionObserver.OnException(ex, nameof(PrepareResponse)); + throw; + } + } + + public async Task ReadResponse(RedisStoredMessage jsonResponse, CancellationToken cancellationToken) + { + try + { + return await inner.ReadResponse(jsonResponse, cancellationToken); + } + catch (Exception ex) + { + exceptionObserver.OnException(ex, nameof(ReadResponse)); + throw; + } + } + } +} diff --git a/source/Halibut/Queue/Redis/MessageStorage/NoOpMessageSerialiserAndDataStreamStorageExceptionObserver.cs b/source/Halibut/Queue/Redis/MessageStorage/NoOpMessageSerialiserAndDataStreamStorageExceptionObserver.cs new file mode 100644 index 000000000..68dbb5438 --- /dev/null +++ b/source/Halibut/Queue/Redis/MessageStorage/NoOpMessageSerialiserAndDataStreamStorageExceptionObserver.cs @@ -0,0 +1,27 @@ +using System; + +namespace Halibut.Queue.Redis.MessageStorage +{ + /// + /// No-operation implementation of IMessageSerialiserAndDataStreamStorageExceptionObserver + /// that discards all exception notifications. This is used as a default when no specific + /// exception handling behavior is required. + /// + public class NoOpMessageSerialiserAndDataStreamStorageExceptionObserver : IMessageSerialiserAndDataStreamStorageExceptionObserver + { + /// + /// Gets a singleton instance of the no-op observer to avoid unnecessary allocations. + /// + public static readonly IMessageSerialiserAndDataStreamStorageExceptionObserver Instance = new NoOpMessageSerialiserAndDataStreamStorageExceptionObserver(); + + /// + /// Does nothing with the provided exception. All exceptions are ignored. + /// + /// The exception that occurred (ignored) + /// The method name where the exception occurred (ignored) + public void OnException(Exception exception, string methodName) + { + // No-op: intentionally does nothing + } + } +} diff --git a/source/Halibut/Queue/Redis/RedisHelpers/IRedisFacadeObserver.cs b/source/Halibut/Queue/Redis/RedisHelpers/IRedisFacadeObserver.cs new file mode 100644 index 000000000..ffc013db9 --- /dev/null +++ b/source/Halibut/Queue/Redis/RedisHelpers/IRedisFacadeObserver.cs @@ -0,0 +1,40 @@ +using System; +using StackExchange.Redis; + +namespace Halibut.Queue.Redis.RedisHelpers +{ + /// + /// Observes events and exceptions that occur within RedisFacade. + /// This allows monitoring and logging of Redis connection events and retry operations. + /// + public interface IRedisFacadeObserver + { + /// + /// Called when a Redis connection fails. + /// + /// The endpoint that failed + /// The type of failure + /// The exception that occurred, if any + void OnRedisConnectionFailed(string? endPoint, ConnectionFailureType failureType, Exception? exception); + + /// + /// Called when a Redis error message is received. + /// + /// The endpoint where the error occurred + /// The error message + void OnRedisServerRepliedWithAnErrorMessage(string? endPoint, string message); + + /// + /// Called when a Redis connection is restored. + /// + /// The endpoint that was restored + void OnRedisConnectionRestored(string? endPoint); + + /// + /// When an exception is raised trying to do an operation with redis, this method is called. + /// + /// The exception that occurred + /// True if the operation will be retried, false if it will fail + void OnRedisOperationFailed(Exception exception, bool willRetry); + } +} diff --git a/source/Halibut/Queue/Redis/RedisHelpers/NoOpRedisFacadeObserver.cs b/source/Halibut/Queue/Redis/RedisHelpers/NoOpRedisFacadeObserver.cs new file mode 100644 index 000000000..a1969b81e --- /dev/null +++ b/source/Halibut/Queue/Redis/RedisHelpers/NoOpRedisFacadeObserver.cs @@ -0,0 +1,57 @@ +using System; +using StackExchange.Redis; + +namespace Halibut.Queue.Redis.RedisHelpers +{ + /// + /// No-operation implementation of IRedisFacadeObserver that discards all notifications. + /// This is used as a default when no specific Redis monitoring behavior is required. + /// + public class NoOpRedisFacadeObserver : IRedisFacadeObserver + { + /// + /// Gets a singleton instance of the no-op observer to avoid unnecessary allocations. + /// + public static readonly IRedisFacadeObserver Instance = new NoOpRedisFacadeObserver(); + + /// + /// Does nothing with the connection failed notification. + /// + /// The endpoint that failed (ignored) + /// The type of failure (ignored) + /// The exception that occurred (ignored) + public void OnRedisConnectionFailed(string? endPoint, ConnectionFailureType failureType, Exception? exception) + { + // No-op: intentionally does nothing + } + + /// + /// Does nothing with the error message notification. + /// + /// The endpoint where the error occurred (ignored) + /// The error message (ignored) + public void OnRedisServerRepliedWithAnErrorMessage(string? endPoint, string message) + { + // No-op: intentionally does nothing + } + + /// + /// Does nothing with the connection restored notification. + /// + /// The endpoint that was restored (ignored) + public void OnRedisConnectionRestored(string? endPoint) + { + // No-op: intentionally does nothing + } + + /// + /// Does nothing with the retry exception notification. + /// + /// The exception that occurred (ignored) + /// Whether the operation will be retried (ignored) + public void OnRedisOperationFailed(Exception exception, bool willRetry) + { + // No-op: intentionally does nothing + } + } +} diff --git a/source/Halibut/Queue/Redis/RedisHelpers/RedisFacade.cs b/source/Halibut/Queue/Redis/RedisHelpers/RedisFacade.cs index af126fcf2..434a55d37 100644 --- a/source/Halibut/Queue/Redis/RedisHelpers/RedisFacade.cs +++ b/source/Halibut/Queue/Redis/RedisHelpers/RedisFacade.cs @@ -14,6 +14,7 @@ public class RedisFacade : IAsyncDisposable { readonly Lazy connection; readonly ILog log; + readonly IRedisFacadeObserver observer; // We can survive redis being unavailable for this amount of time. // Generally redis will try for 5s, we add our own retries to try for longer. internal TimeSpan MaxDurationToRetryFor = TimeSpan.FromSeconds(30); @@ -29,14 +30,17 @@ public class RedisFacade : IAsyncDisposable readonly CancelOnDisposeCancellationToken objectLifetimeCts; readonly CancellationToken objectLifeTimeCancellationToken; - public RedisFacade(string configuration, string keyPrefix, ILog log) : this(ConfigurationOptions.Parse(configuration), keyPrefix, log) + public RedisFacade(string configuration, string keyPrefix, ILog log, IRedisFacadeObserver? redisFacadeObserver = null) : + this(ConfigurationOptions.Parse(configuration), keyPrefix, log, redisFacadeObserver) { } - public RedisFacade(ConfigurationOptions redisOptions, string keyPrefix, ILog log) + + public RedisFacade(ConfigurationOptions redisOptions, string keyPrefix, ILog log, IRedisFacadeObserver? observer = null) { this.keyPrefix = keyPrefix + ":HalibutRedis"; this.log = log.ForContext(); + this.observer = observer ?? NoOpRedisFacadeObserver.Instance; objectLifetimeCts = new CancelOnDisposeCancellationToken(); objectLifeTimeCancellationToken = objectLifetimeCts.Token; @@ -59,16 +63,19 @@ public RedisFacade(ConfigurationOptions redisOptions, string keyPrefix, ILog log void OnConnectionFailed(object? sender, ConnectionFailedEventArgs e) { log.Write(EventType.Error, "Redis connection failed - EndPoint: {0}, Failure: {1}, Exception: {2}", e.EndPoint, e.FailureType, e.Exception?.Message); + observer.OnRedisConnectionFailed(e.EndPoint?.ToString(), e.FailureType, e.Exception); } void OnErrorMessage(object? sender, RedisErrorEventArgs e) { log.Write(EventType.Error, "Redis error - EndPoint: {0}, Message: {1}", e.EndPoint, e.Message); + observer.OnRedisServerRepliedWithAnErrorMessage(e.EndPoint?.ToString(), e.Message); } void OnConnectionRestored(object? sender, ConnectionFailedEventArgs e) { log.Write(EventType.Diagnostic, "Redis connection restored - EndPoint: {0}", e.EndPoint); + observer.OnRedisConnectionRestored(e.EndPoint?.ToString()); } async Task ExecuteWithRetry(Func> operation, CancellationToken cancellationToken) @@ -87,8 +94,11 @@ async Task ExecuteWithRetry(Func> operation, CancellationToken can { return await operation(); } - catch (Exception ex) when (stopwatch.Elapsed < MaxDurationToRetryFor && !combinedToken.IsCancellationRequested) + catch (Exception ex) when (!combinedToken.IsCancellationRequested) { + bool willRetry = stopwatch.Elapsed < MaxDurationToRetryFor; + observer.OnRedisOperationFailed(ex, willRetry); + if (!willRetry) throw; log?.Write(EventType.Diagnostic, $"Redis operation failed, retrying in {retryDelay.TotalSeconds}s: {ex.Message}"); await Task.Delay(retryDelay, combinedToken); } @@ -112,8 +122,11 @@ async Task ExecuteWithRetry(Func operation, CancellationToken cancellation await operation(); return; } - catch (Exception ex) when (stopwatch.Elapsed < MaxDurationToRetryFor && !combinedToken.IsCancellationRequested) + catch (Exception ex) when (!combinedToken.IsCancellationRequested) { + bool willRetry = stopwatch.Elapsed < MaxDurationToRetryFor; + observer.OnRedisOperationFailed(ex, willRetry); + if (!willRetry) throw; log?.Write(EventType.Diagnostic, $"Redis operation failed, retrying in {retryDelay.TotalSeconds}s: {ex.Message}"); await Task.Delay(retryDelay, combinedToken); } diff --git a/source/Halibut/Queue/Redis/RedisPendingRequestQueueFactory.cs b/source/Halibut/Queue/Redis/RedisPendingRequestQueueFactory.cs index 4d50bc983..4509cfc78 100644 --- a/source/Halibut/Queue/Redis/RedisPendingRequestQueueFactory.cs +++ b/source/Halibut/Queue/Redis/RedisPendingRequestQueueFactory.cs @@ -19,31 +19,44 @@ public class RedisPendingRequestQueueFactory : IPendingRequestQueueFactory readonly ILogFactory logFactory; readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits; readonly IWatchForRedisLosingAllItsData watchForRedisLosingAllItsData; + readonly IMessageSerialiserAndDataStreamStorageExceptionObserver exceptionObserver; + readonly Func? queueDecorator; public RedisPendingRequestQueueFactory( QueueMessageSerializer queueMessageSerializer, IStoreDataStreamsForDistributedQueues dataStreamStorage, IWatchForRedisLosingAllItsData watchForRedisLosingAllItsData, HalibutRedisTransport halibutRedisTransport, - HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, - ILogFactory logFactory) + HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, + ILogFactory logFactory, + IMessageSerialiserAndDataStreamStorageExceptionObserver? exceptionObserver = null, + Func? queueDecorator = null) { this.queueMessageSerializer = queueMessageSerializer; this.dataStreamStorage = dataStreamStorage; this.halibutRedisTransport = halibutRedisTransport; this.logFactory = logFactory; + this.queueDecorator = queueDecorator; this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits; this.watchForRedisLosingAllItsData = watchForRedisLosingAllItsData; + this.exceptionObserver = exceptionObserver ?? NoOpMessageSerialiserAndDataStreamStorageExceptionObserver.Instance; } public IPendingRequestQueue CreateQueue(Uri endpoint) { - return new RedisPendingRequestQueue(endpoint, + var baseStorage = new MessageSerialiserAndDataStreamStorage(queueMessageSerializer, dataStreamStorage); + var storageWithObserver = new MessageSerialiserAndDataStreamStorageWithExceptionObserver(baseStorage, exceptionObserver); + + var queue = new RedisPendingRequestQueue(endpoint, watchForRedisLosingAllItsData, logFactory.ForEndpoint(endpoint), halibutRedisTransport, - new MessageSerialiserAndDataStreamStorage(queueMessageSerializer, dataStreamStorage), + storageWithObserver, halibutTimeoutsAndLimits); + + if (queueDecorator != null) return queueDecorator(queue); + + return queue; } } From 460059140afc0f87095e1a5f345901b6e5fb289a Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Mon, 22 Sep 2025 15:17:43 +1000 Subject: [PATCH 2/6] . --- .../Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs b/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs index 007ffd805..e85662f2f 100644 --- a/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs +++ b/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs @@ -53,7 +53,6 @@ public async Task WhenRedisConnectionGoesDown_ObserverShouldBeNotifiedOfRetryExc testObserver.ExecuteWithRetryExceptions.Should().AllSatisfy(ex => { ex.Exception.Should().NotBeNull("Exception should not be null"); - // We should have both retry attempts (willRetry=true) and potentially final failures (willRetry=false) ex.WillRetry.Should().BeTrue(); }); @@ -94,15 +93,12 @@ public async Task WhenRedisConnectionGoesDown_AndStaysDown_ObserverShouldBeNotif testObserver.ExecuteWithRetryExceptions.Should().AllSatisfy(ex => { ex.Exception.Should().NotBeNull("Exception should not be null"); - // We should have both retry attempts (willRetry=true) and potentially final failures (willRetry=false) ex.WillRetry.Should().BeFalse(); }); testObserver.ConnectionRestorations.Count.Should().Be(0); testObserver.ConnectionFailures.Count.Should().BeGreaterThan(1); } - - class TestRedisFacadeObserver : IRedisFacadeObserver { From 981376353301541f634956ef5e97fb466ef7aebb Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Mon, 22 Sep 2025 15:23:59 +1000 Subject: [PATCH 3/6] . --- .../Redis/RedisHelpers/RedisFacadeObserverTest.cs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs b/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs index e85662f2f..5e876ea27 100644 --- a/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs +++ b/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs @@ -77,16 +77,11 @@ public async Task WhenRedisConnectionGoesDown_AndStaysDown_ObserverShouldBeNotif // Kill Redis connections to simulate network issues portForwarder.EnterKillNewAndExistingConnectionsMode(); - // This should trigger retries and call the observer - var getStringTask = redisFacade.GetString("foo", CancellationToken); + // This should trigger retries and call the observer, then ultimately fail + var exception = await AssertThrowsAny.Exception(async () => + await redisFacade.GetString("foo", CancellationToken)); - // Wait a bit for retries to happen, then restore connection - await Task.Delay(6000); // By-default (somewhere) the redis client will wait 5s for a request to get to Redis so we need to wait longer than that. - portForwarder.ReturnToNormalMode(); - - // The operation should eventually succeed - var result = await getStringTask; - result.Should().Be("bar"); + exception.Should().NotBeNull("Operation should fail when Redis stays down"); // Verify that the observer was called with retry exceptions testObserver.ExecuteWithRetryExceptions.Should().NotBeEmpty("Observer should have been called for retry exceptions during connection issues"); From ba9fd10b342ceb813dbc1da4a3c8c2a1e08a104e Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Mon, 22 Sep 2025 15:24:21 +1000 Subject: [PATCH 4/6] . --- .../Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs b/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs index 5e876ea27..4e4afbe8d 100644 --- a/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs +++ b/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs @@ -56,8 +56,8 @@ public async Task WhenRedisConnectionGoesDown_ObserverShouldBeNotifiedOfRetryExc ex.WillRetry.Should().BeTrue(); }); - testObserver.ConnectionRestorations.Count.Should().BeGreaterThan(1); - testObserver.ConnectionFailures.Count.Should().BeGreaterThan(1); + testObserver.ConnectionRestorations.Count.Should().BeGreaterThanOrEqualTo(1); + testObserver.ConnectionFailures.Count.Should().BeGreaterThanOrEqualTo(1); } [Test] @@ -92,7 +92,7 @@ public async Task WhenRedisConnectionGoesDown_AndStaysDown_ObserverShouldBeNotif }); testObserver.ConnectionRestorations.Count.Should().Be(0); - testObserver.ConnectionFailures.Count.Should().BeGreaterThan(1); + testObserver.ConnectionFailures.Count.Should().BeGreaterThanOrEqualTo(1); } class TestRedisFacadeObserver : IRedisFacadeObserver From 74f3204af9976dc7448fc870bece4cd29b963677 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Tue, 23 Sep 2025 09:42:39 +1000 Subject: [PATCH 5/6] . --- .../RedisHelpers/RedisFacadeObserverTest.cs | 85 +++++++++++++++---- 1 file changed, 69 insertions(+), 16 deletions(-) diff --git a/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs b/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs index 4e4afbe8d..b037123f8 100644 --- a/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs +++ b/source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs @@ -1,17 +1,11 @@ #if NET8_0_OR_GREATER using System; using System.Collections.Generic; -using System.Threading; using System.Threading.Tasks; using FluentAssertions; -using Halibut.Diagnostics; -using Halibut.Diagnostics.LogCreators; -using Halibut.Logging; using Halibut.Queue.Redis.RedisHelpers; using Halibut.Tests.Queue.Redis.Utils; using Halibut.Tests.Support; -using Halibut.Tests.Support.Logging; -using Halibut.Tests.Support.TestAttributes; using Halibut.Tests.Util; using NUnit.Framework; using StackExchange.Redis; @@ -40,8 +34,10 @@ public async Task WhenRedisConnectionGoesDown_ObserverShouldBeNotifiedOfRetryExc // This should trigger retries and call the observer var getStringTask = redisFacade.GetString("foo", CancellationToken); - // Wait a bit for retries to happen, then restore connection - await Task.Delay(6000); // By-default (somewhere) the redis client will wait 5s for a request to get to Redis so we need to wait longer than that. + // Wait for retries to happen, then restore connection + await ShouldEventually.Eventually(() => testObserver.ExecuteWithRetryExceptions.Count.Should().BeGreaterThanOrEqualTo(1), + TimeSpan.FromSeconds(30), + CancellationToken); portForwarder.ReturnToNormalMode(); // The operation should eventually succeed @@ -97,29 +93,86 @@ public async Task WhenRedisConnectionGoesDown_AndStaysDown_ObserverShouldBeNotif class TestRedisFacadeObserver : IRedisFacadeObserver { - public List<(string? EndPoint, ConnectionFailureType FailureType, Exception? Exception)> ConnectionFailures { get; } = new(); - public List<(string? EndPoint, string Message)> ErrorMessages { get; } = new(); - public List ConnectionRestorations { get; } = new(); - public List<(Exception Exception, bool WillRetry)> ExecuteWithRetryExceptions { get; } = new(); + readonly object mutex = new object(); + readonly List<(string? EndPoint, ConnectionFailureType FailureType, Exception? Exception)> connectionFailures = new(); + readonly List<(string? EndPoint, string Message)> errorMessages = new(); + readonly List connectionRestorations = new(); + readonly List<(Exception Exception, bool WillRetry)> executeWithRetryExceptions = new(); + + public List<(string? EndPoint, ConnectionFailureType FailureType, Exception? Exception)> ConnectionFailures + { + get + { + lock (mutex) + { + return new List<(string? EndPoint, ConnectionFailureType FailureType, Exception? Exception)>(connectionFailures); + } + } + } + + public List<(string? EndPoint, string Message)> ErrorMessages + { + get + { + lock (mutex) + { + return new List<(string? EndPoint, string Message)>(errorMessages); + } + } + } + + public List ConnectionRestorations + { + get + { + lock (mutex) + { + return new List(connectionRestorations); + } + } + } + + public List<(Exception Exception, bool WillRetry)> ExecuteWithRetryExceptions + { + get + { + lock (mutex) + { + return new List<(Exception Exception, bool WillRetry)>(executeWithRetryExceptions); + } + } + } public void OnRedisConnectionFailed(string? endPoint, ConnectionFailureType failureType, Exception? exception) { - ConnectionFailures.Add((endPoint, failureType, exception)); + lock (mutex) + { + connectionFailures.Add((endPoint, failureType, exception)); + } } public void OnRedisServerRepliedWithAnErrorMessage(string? endPoint, string message) { - ErrorMessages.Add((endPoint, message)); + lock (mutex) + { + errorMessages.Add((endPoint, message)); + } } public void OnRedisConnectionRestored(string? endPoint) { - ConnectionRestorations.Add(endPoint); + lock (mutex) + { + connectionRestorations.Add(endPoint); + } } public void OnRedisOperationFailed(Exception exception, bool willRetry) { - ExecuteWithRetryExceptions.Add((exception, willRetry)); + lock (mutex) + { + executeWithRetryExceptions.Add((exception, willRetry)); + } } } } From 2361938d95f3c439127ed37049746d6c287fcbbd Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Tue, 23 Sep 2025 09:44:16 +1000 Subject: [PATCH 6/6] . --- .../Halibut.Tests/Builders/RedisPendingRequestQueueBuilder.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/Halibut.Tests/Builders/RedisPendingRequestQueueBuilder.cs b/source/Halibut.Tests/Builders/RedisPendingRequestQueueBuilder.cs index 3fca441f6..90ce5b7a2 100644 --- a/source/Halibut.Tests/Builders/RedisPendingRequestQueueBuilder.cs +++ b/source/Halibut.Tests/Builders/RedisPendingRequestQueueBuilder.cs @@ -65,8 +65,7 @@ public QueueHolder Build() var redisTransport = new HalibutRedisTransport(redisFacade); var dataStreamStore = new InMemoryStoreDataStreamsForDistributedQueues(); var messageSerializer = new QueueMessageSerializerBuilder().Build(); - var baseMessageReaderWriter = new MessageSerialiserAndDataStreamStorage(messageSerializer, dataStreamStore); - var messageReaderWriter = new MessageSerialiserAndDataStreamStorageWithExceptionObserver(baseMessageReaderWriter, NoOpMessageSerialiserAndDataStreamStorageExceptionObserver.Instance); + var messageReaderWriter = new MessageSerialiserAndDataStreamStorage(messageSerializer, dataStreamStore); var queue = new RedisPendingRequestQueue(endpoint, new RedisNeverLosesData(), log, redisTransport, messageReaderWriter, halibutTimeoutsAndLimits); if (defaultDelayBeforeSubscribingToRequestCancellation != null)