-
Notifications
You must be signed in to change notification settings - Fork 48
Add Redis PRQ observability infrastructure #690
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
131 changes: 131 additions & 0 deletions
131
...e/Halibut.Tests/Queue/Redis/MessageSerialiserAndDataStreamStorageExceptionObserverTest.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
#if NET8_0_OR_GREATER | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using FluentAssertions; | ||
using Halibut.Diagnostics; | ||
using Halibut.Diagnostics.LogCreators; | ||
using Halibut.Logging; | ||
using Halibut.Queue; | ||
using Halibut.Queue.MessageStreamWrapping; | ||
using Halibut.Queue.QueuedDataStreams; | ||
using Halibut.Queue.Redis; | ||
using Halibut.Queue.Redis.MessageStorage; | ||
using Halibut.Queue.Redis.RedisHelpers; | ||
using Halibut.Tests.Builders; | ||
using Halibut.Tests.Queue.Redis.Utils; | ||
using Halibut.Tests.Support; | ||
using Halibut.Tests.Support.Logging; | ||
using Halibut.Tests.Support.TestAttributes; | ||
using Halibut.Tests.Util; | ||
using Halibut.TestUtils.Contracts; | ||
using Halibut.Transport.Protocol; | ||
using NUnit.Framework; | ||
|
||
namespace Halibut.Tests.Queue.Redis | ||
{ | ||
[RedisTest] | ||
public class MessageSerialiserAndDataStreamStorageExceptionObserverTest : BaseTest | ||
{ | ||
        /// <summary> | ||
        /// Verifies that when the data stream storage throws while a request is being queued, | ||
        /// the configured exception observer is handed the original exception together with the | ||
        /// name of the failing method, and the caller still receives a failure wrapping that exception. | ||
        /// </summary> | ||
        [Test] | ||
        public async Task WhenStoreDataStreamsThrows_ExceptionObserverShouldBeNotified() | ||
        { | ||
            // Arrange | ||
            var endpoint = new Uri("poll://" + Guid.NewGuid()); | ||
            await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade(); | ||
            var redisTransport = new HalibutRedisTransport(redisFacade); | ||
|
||
            // The storage double throws this exact instance, so the assertions below can use reference equality. | ||
            var expectedException = new InvalidOperationException("Test exception from storage"); | ||
            var throwingDataStreamStorage = new ThrowingStoreDataStreamsForDistributedQueues(expectedException); | ||
            var testObserver = new TestMessageSerialiserAndDataStreamStorageExceptionObserver(); | ||
|
||
            var queueMessageSerializer = new QueueMessageSerializerBuilder().Build(); | ||
            var logFactory = new TestContextLogCreator("Redis", LogLevel.Trace).ToCachingLogFactory(); | ||
            // The throwing storage and the observer under test are both injected through the queue factory. | ||
            var factory = new RedisPendingRequestQueueFactory( | ||
                queueMessageSerializer, | ||
                throwingDataStreamStorage, | ||
                new RedisNeverLosesData(), | ||
                redisTransport, | ||
                new HalibutTimeoutsAndLimits(), | ||
                logFactory, | ||
                testObserver); | ||
|
||
            // Cast to the concrete queue type to reach WaitUntilQueueIsSubscribedToReceiveMessages. | ||
            var sut = (RedisPendingRequestQueue)factory.CreateQueue(endpoint); | ||
            await sut.WaitUntilQueueIsSubscribedToReceiveMessages(); | ||
|
||
            // Create a request with data streams to trigger PrepareRequest -> StoreDataStreams | ||
            // NOTE(review): the request targets "poll://test-endpoint" rather than the queue's `endpoint`; | ||
            // presumably irrelevant here because storage fails before delivery - confirm. | ||
            var request = new RequestMessageBuilder("poll://test-endpoint").Build(); | ||
            request.Params = new[] { new ComplexObjectMultipleDataStreams(DataStream.FromString("hello"), DataStream.FromString("world")) }; | ||
|
||
            // Act & Assert | ||
            var exception = await AssertThrowsAny.Exception(async () => await sut.QueueAndWaitAsync(request, CancellationToken)); | ||
|
||
            // Verify that the observer was called | ||
            testObserver.ObservedException.Should().NotBeNull("Exception observer should have been called"); | ||
            testObserver.ObservedException.Should().BeSameAs(expectedException, "Observer should receive the original exception"); | ||
            testObserver.MethodName.Should().Be(nameof(IMessageSerialiserAndDataStreamStorage.PrepareRequest), "Observer should know which method threw the exception"); | ||
|
||
            // Verify the original exception is still thrown | ||
            // The caller sees a wrapper: the storage exception is expected as its InnerException. | ||
            exception.Should().NotBeNull("Original exception should still be thrown"); | ||
            exception.InnerException.Should().BeSameAs(expectedException, "Original exception should be preserved"); | ||
        } | ||
|
||
public class QueueMessageSerializerBuilder | ||
{ | ||
public QueueMessageSerializer Build() | ||
{ | ||
var typeRegistry = new TypeRegistry(); | ||
typeRegistry.Register(typeof(IComplexObjectService)); | ||
|
||
StreamCapturingJsonSerializer StreamCapturingSerializer() | ||
{ | ||
var settings = MessageSerializerBuilder.CreateSerializer(); | ||
var binder = new RegisteredSerializationBinder(typeRegistry); | ||
settings.SerializationBinder = binder; | ||
return new StreamCapturingJsonSerializer(settings); | ||
} | ||
|
||
return new QueueMessageSerializer(StreamCapturingSerializer, new MessageStreamWrappers()); | ||
} | ||
} | ||
|
||
class ThrowingStoreDataStreamsForDistributedQueues : IStoreDataStreamsForDistributedQueues | ||
{ | ||
readonly Exception exceptionToThrow; | ||
|
||
public ThrowingStoreDataStreamsForDistributedQueues(Exception exceptionToThrow) | ||
{ | ||
this.exceptionToThrow = exceptionToThrow; | ||
} | ||
|
||
public Task<byte[]> StoreDataStreams(IReadOnlyList<DataStream> dataStreams, CancellationToken cancellationToken) | ||
{ | ||
throw exceptionToThrow; | ||
} | ||
|
||
public Task RehydrateDataStreams(byte[] dataStreamMetadata, List<IRehydrateDataStream> rehydrateDataStreams, CancellationToken cancellationToken) | ||
{ | ||
throw exceptionToThrow; | ||
} | ||
|
||
public ValueTask DisposeAsync() | ||
{ | ||
return ValueTask.CompletedTask; | ||
} | ||
} | ||
|
||
class TestMessageSerialiserAndDataStreamStorageExceptionObserver : IMessageSerialiserAndDataStreamStorageExceptionObserver | ||
{ | ||
public Exception? ObservedException { get; private set; } | ||
public string? MethodName { get; private set; } | ||
|
||
public void OnException(Exception exception, string methodName) | ||
{ | ||
ObservedException = exception; | ||
MethodName = methodName; | ||
} | ||
} | ||
} | ||
} | ||
#endif |
180 changes: 180 additions & 0 deletions
180
source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeObserverTest.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
#if NET8_0_OR_GREATER | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
using FluentAssertions; | ||
using Halibut.Queue.Redis.RedisHelpers; | ||
using Halibut.Tests.Queue.Redis.Utils; | ||
using Halibut.Tests.Support; | ||
using Halibut.Tests.Util; | ||
using NUnit.Framework; | ||
using StackExchange.Redis; | ||
|
||
namespace Halibut.Tests.Queue.Redis.RedisHelpers | ||
{ | ||
[RedisTest] | ||
public class RedisFacadeObserverTest : BaseTest | ||
{ | ||
        /// <summary> | ||
        /// Verifies that while Redis connections are being killed, the observer is told about each failed | ||
        /// operation with WillRetry set, and that once connectivity returns the in-flight read succeeds | ||
        /// and both a connection failure and a connection restoration have been reported. | ||
        /// </summary> | ||
        [Test] | ||
        public async Task WhenRedisConnectionGoesDown_ObserverShouldBeNotifiedOfRetryExceptions() | ||
        { | ||
            // Arrange | ||
            // All Redis traffic goes through the port forwarder so the test can sever and restore connections. | ||
            using var portForwarder = PortForwardingToRedisBuilder.ForwardingToRedis(Logger); | ||
            var testObserver = new TestRedisFacadeObserver(); | ||
|
||
            await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade(portForwarder, redisFacadeObserver: testObserver); | ||
|
||
            // Verify Redis is working initially | ||
            await redisFacade.SetString("foo", "bar", TimeSpan.FromMinutes(1), CancellationToken); | ||
            (await redisFacade.GetString("foo", CancellationToken)).Should().Be("bar"); | ||
|
||
            // Kill Redis connections to simulate network issues | ||
            portForwarder.EnterKillNewAndExistingConnectionsMode(); | ||
|
||
            // This should trigger retries and call the observer | ||
            // Deliberately not awaited yet: the call stays in flight while connections are down | ||
            // and is only awaited after they have been restored. | ||
            var getStringTask = redisFacade.GetString("foo", CancellationToken); | ||
|
||
            // Wait for retries to happen, then restore connection | ||
            await ShouldEventually.Eventually(() => testObserver.ExecuteWithRetryExceptions.Count.Should().BeGreaterThanOrEqualTo(1), | ||
                TimeSpan.FromSeconds(30), | ||
                CancellationToken); | ||
            portForwarder.ReturnToNormalMode(); | ||
|
||
            // The operation should eventually succeed | ||
            var result = await getStringTask; | ||
            result.Should().Be("bar"); | ||
|
||
            // Verify that the observer was called with retry exceptions | ||
            // Because the read ultimately succeeded, every recorded failure is expected to have been retried. | ||
            testObserver.ExecuteWithRetryExceptions.Should().NotBeEmpty("Observer should have been called for retry exceptions during connection issues"); | ||
            testObserver.ExecuteWithRetryExceptions.Should().AllSatisfy(ex => | ||
            { | ||
                ex.Exception.Should().NotBeNull("Exception should not be null"); | ||
                ex.WillRetry.Should().BeTrue(); | ||
            }); | ||
|
||
            testObserver.ConnectionRestorations.Count.Should().BeGreaterThanOrEqualTo(1); | ||
            testObserver.ConnectionFailures.Count.Should().BeGreaterThanOrEqualTo(1); | ||
        } | ||
|
||
        /// <summary> | ||
        /// Verifies that when Redis connections are killed and never restored, a read fails once the | ||
        /// shortened retry window is exhausted, the observer is told about the failed operation with | ||
        /// WillRetry cleared, and a connection failure but no restoration is reported. | ||
        /// </summary> | ||
        [Test] | ||
        public async Task WhenRedisConnectionGoesDown_AndStaysDown_ObserverShouldBeNotifiedOfRetryExceptions() | ||
        { | ||
            // Arrange | ||
            using var portForwarder = PortForwardingToRedisBuilder.ForwardingToRedis(Logger); | ||
            var testObserver = new TestRedisFacadeObserver(); | ||
|
||
            await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade(portForwarder, redisFacadeObserver: testObserver); | ||
            // Shorten the retry window so the failing read gives up quickly instead of stalling the test. | ||
            redisFacade.MaxDurationToRetryFor = TimeSpan.FromSeconds(1); | ||
|
||
            // Verify Redis is working initially | ||
            await redisFacade.SetString("foo", "bar", TimeSpan.FromMinutes(1), CancellationToken); | ||
            (await redisFacade.GetString("foo", CancellationToken)).Should().Be("bar"); | ||
|
||
            // Kill Redis connections to simulate network issues | ||
            portForwarder.EnterKillNewAndExistingConnectionsMode(); | ||
|
||
            // This should trigger retries and call the observer, then ultimately fail | ||
            var exception = await AssertThrowsAny.Exception(async () => | ||
                await redisFacade.GetString("foo", CancellationToken)); | ||
|
||
            exception.Should().NotBeNull("Operation should fail when Redis stays down"); | ||
|
||
            // Verify that the observer was called with retry exceptions | ||
            // NOTE(review): this requires EVERY recorded failure to have WillRetry == false, which presumably | ||
            // means no attempt inside the 1 second window is reported as retryable - confirm this is the | ||
            // intended contract rather than "only the final failure is not retried". | ||
            testObserver.ExecuteWithRetryExceptions.Should().NotBeEmpty("Observer should have been called for retry exceptions during connection issues"); | ||
            testObserver.ExecuteWithRetryExceptions.Should().AllSatisfy(ex => | ||
            { | ||
                ex.Exception.Should().NotBeNull("Exception should not be null"); | ||
                ex.WillRetry.Should().BeFalse(); | ||
            }); | ||
|
||
            // The forwarder is never returned to normal mode, so no restoration should have been observed. | ||
            testObserver.ConnectionRestorations.Count.Should().Be(0); | ||
            testObserver.ConnectionFailures.Count.Should().BeGreaterThanOrEqualTo(1); | ||
        } | ||
|
||
class TestRedisFacadeObserver : IRedisFacadeObserver | ||
{ | ||
readonly object mutex = new object(); | ||
readonly List<(string? EndPoint, ConnectionFailureType FailureType, Exception? Exception)> connectionFailures = new(); | ||
readonly List<(string? EndPoint, string Message)> errorMessages = new(); | ||
readonly List<string?> connectionRestorations = new(); | ||
readonly List<(Exception Exception, bool WillRetry)> executeWithRetryExceptions = new(); | ||
|
||
public List<(string? EndPoint, ConnectionFailureType FailureType, Exception? Exception)> ConnectionFailures | ||
{ | ||
get | ||
{ | ||
lock (mutex) | ||
{ | ||
return new List<(string? EndPoint, ConnectionFailureType FailureType, Exception? Exception)>(connectionFailures); | ||
} | ||
} | ||
} | ||
|
||
public List<(string? EndPoint, string Message)> ErrorMessages | ||
{ | ||
get | ||
{ | ||
lock (mutex) | ||
{ | ||
return new List<(string? EndPoint, string Message)>(errorMessages); | ||
} | ||
} | ||
} | ||
|
||
public List<string?> ConnectionRestorations | ||
{ | ||
get | ||
{ | ||
lock (mutex) | ||
{ | ||
return new List<string?>(connectionRestorations); | ||
} | ||
} | ||
} | ||
|
||
public List<(Exception Exception, bool WillRetry)> ExecuteWithRetryExceptions | ||
{ | ||
get | ||
{ | ||
lock (mutex) | ||
{ | ||
return new List<(Exception Exception, bool WillRetry)>(executeWithRetryExceptions); | ||
} | ||
} | ||
} | ||
|
||
public void OnRedisConnectionFailed(string? endPoint, ConnectionFailureType failureType, Exception? exception) | ||
{ | ||
lock (mutex) | ||
{ | ||
connectionFailures.Add((endPoint, failureType, exception)); | ||
} | ||
} | ||
|
||
public void OnRedisServerRepliedWithAnErrorMessage(string? endPoint, string message) | ||
{ | ||
lock (mutex) | ||
{ | ||
errorMessages.Add((endPoint, message)); | ||
} | ||
} | ||
|
||
public void OnRedisConnectionRestored(string? endPoint) | ||
{ | ||
lock (mutex) | ||
{ | ||
connectionRestorations.Add(endPoint); | ||
} | ||
} | ||
|
||
public void OnRedisOperationFailed(Exception exception, bool willRetry) | ||
{ | ||
lock (mutex) | ||
{ | ||
executeWithRetryExceptions.Add((exception, willRetry)); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
#endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
19 changes: 19 additions & 0 deletions
19
...but/Queue/Redis/MessageStorage/IMessageSerialiserAndDataStreamStorageExceptionObserver.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
using System; | ||
|
||
namespace Halibut.Queue.Redis.MessageStorage | ||
{ | ||
    /// <summary> | ||
    /// Observes exceptions that occur within <c>IMessageSerialiserAndDataStreamStorage</c> implementations. | ||
    /// This allows monitoring and logging of errors raised during message serialization and data stream operations. | ||
    /// </summary> | ||
    public interface IMessageSerialiserAndDataStreamStorageExceptionObserver | ||
    { | ||
        /// <summary> | ||
        /// Called when an exception occurs in any <c>IMessageSerialiserAndDataStreamStorage</c> method. | ||
        /// Errors caught here are most likely caused by the Redis Pending Request Queue itself. | ||
        /// </summary> | ||
        /// <param name="exception">The exception that was raised.</param> | ||
        /// <param name="methodName"> | ||
        /// The name of the <c>IMessageSerialiserAndDataStreamStorage</c> method where the exception occurred, | ||
        /// for example <c>PrepareRequest</c>. | ||
        /// </param> | ||
        void OnException(Exception exception, string methodName); | ||
    } | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's been a while since I've been in this code base, but why are we doing this for .NET 8 or higher only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question: it's because the queue is only supported on .NET 8 or higher. The Redis Pending Request Queue (PRQ) does not exist for net48. Because screw net48!