Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/Halibut.Tests/ManyPollingTentacleTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Halibut.Logging;
using Halibut.Queue.QueuedDataStreams;
using Halibut.Queue.Redis;
using Halibut.Queue.Redis.MessageStorage;
using Halibut.Queue.Redis.RedisDataLossDetection;
using Halibut.Queue.Redis.RedisHelpers;
using Halibut.ServiceModel;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#if NET8_0_OR_GREATER
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's been a while since I've been in this code base, but why are we doing this for .NET 8 or higher only?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, because the queue is only supported in net8 or higher. The Redis PRQ does not exist for net48. Because screw net48!

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Halibut.Diagnostics;
using Halibut.Diagnostics.LogCreators;
using Halibut.Logging;
using Halibut.Queue;
using Halibut.Queue.MessageStreamWrapping;
using Halibut.Queue.QueuedDataStreams;
using Halibut.Queue.Redis;
using Halibut.Queue.Redis.MessageStorage;
using Halibut.Queue.Redis.RedisHelpers;
using Halibut.Tests.Builders;
using Halibut.Tests.Queue.Redis.Utils;
using Halibut.Tests.Support;
using Halibut.Tests.Support.Logging;
using Halibut.Tests.Support.TestAttributes;
using Halibut.Tests.Util;
using Halibut.TestUtils.Contracts;
using Halibut.Transport.Protocol;
using NUnit.Framework;

namespace Halibut.Tests.Queue.Redis
{
[RedisTest]
public class MessageSerialiserAndDataStreamStorageExceptionObserverTest : BaseTest
{
[Test]
public async Task WhenStoreDataStreamsThrows_ExceptionObserverShouldBeNotified()
{
// Arrange
var endpoint = new Uri("poll://" + Guid.NewGuid());
await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade();
var redisTransport = new HalibutRedisTransport(redisFacade);

var expectedException = new InvalidOperationException("Test exception from storage");
var throwingDataStreamStorage = new ThrowingStoreDataStreamsForDistributedQueues(expectedException);
var testObserver = new TestMessageSerialiserAndDataStreamStorageExceptionObserver();

var queueMessageSerializer = new QueueMessageSerializerBuilder().Build();
var logFactory = new TestContextLogCreator("Redis", LogLevel.Trace).ToCachingLogFactory();
var factory = new RedisPendingRequestQueueFactory(
queueMessageSerializer,
throwingDataStreamStorage,
new RedisNeverLosesData(),
redisTransport,
new HalibutTimeoutsAndLimits(),
logFactory,
testObserver);

var sut = (RedisPendingRequestQueue)factory.CreateQueue(endpoint);
await sut.WaitUntilQueueIsSubscribedToReceiveMessages();

// Create a request with data streams to trigger PrepareRequest -> StoreDataStreams
var request = new RequestMessageBuilder("poll://test-endpoint").Build();
request.Params = new[] { new ComplexObjectMultipleDataStreams(DataStream.FromString("hello"), DataStream.FromString("world")) };

// Act & Assert
var exception = await AssertThrowsAny.Exception(async () => await sut.QueueAndWaitAsync(request, CancellationToken));

// Verify that the observer was called
testObserver.ObservedException.Should().NotBeNull("Exception observer should have been called");
testObserver.ObservedException.Should().BeSameAs(expectedException, "Observer should receive the original exception");
testObserver.MethodName.Should().Be(nameof(IMessageSerialiserAndDataStreamStorage.PrepareRequest), "Observer should know which method threw the exception");

// Verify the original exception is still thrown
exception.Should().NotBeNull("Original exception should still be thrown");
exception.InnerException.Should().BeSameAs(expectedException, "Original exception should be preserved");
}

public class QueueMessageSerializerBuilder
{
public QueueMessageSerializer Build()
{
var typeRegistry = new TypeRegistry();
typeRegistry.Register(typeof(IComplexObjectService));

StreamCapturingJsonSerializer StreamCapturingSerializer()
{
var settings = MessageSerializerBuilder.CreateSerializer();
var binder = new RegisteredSerializationBinder(typeRegistry);
settings.SerializationBinder = binder;
return new StreamCapturingJsonSerializer(settings);
}

return new QueueMessageSerializer(StreamCapturingSerializer, new MessageStreamWrappers());
}
}

class ThrowingStoreDataStreamsForDistributedQueues : IStoreDataStreamsForDistributedQueues
{
readonly Exception exceptionToThrow;

public ThrowingStoreDataStreamsForDistributedQueues(Exception exceptionToThrow)
{
this.exceptionToThrow = exceptionToThrow;
}

public Task<byte[]> StoreDataStreams(IReadOnlyList<DataStream> dataStreams, CancellationToken cancellationToken)
{
throw exceptionToThrow;
}

public Task RehydrateDataStreams(byte[] dataStreamMetadata, List<IRehydrateDataStream> rehydrateDataStreams, CancellationToken cancellationToken)
{
throw exceptionToThrow;
}

public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
}

class TestMessageSerialiserAndDataStreamStorageExceptionObserver : IMessageSerialiserAndDataStreamStorageExceptionObserver
{
public Exception? ObservedException { get; private set; }
public string? MethodName { get; private set; }

public void OnException(Exception exception, string methodName)
{
ObservedException = exception;
MethodName = methodName;
}
}
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
#if NET8_0_OR_GREATER
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FluentAssertions;
using Halibut.Queue.Redis.RedisHelpers;
using Halibut.Tests.Queue.Redis.Utils;
using Halibut.Tests.Support;
using Halibut.Tests.Util;
using NUnit.Framework;
using StackExchange.Redis;

namespace Halibut.Tests.Queue.Redis.RedisHelpers
{
[RedisTest]
public class RedisFacadeObserverTest : BaseTest
{
[Test]
public async Task WhenRedisConnectionGoesDown_ObserverShouldBeNotifiedOfRetryExceptions()
{
// Arrange
using var portForwarder = PortForwardingToRedisBuilder.ForwardingToRedis(Logger);
var testObserver = new TestRedisFacadeObserver();

await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade(portForwarder, redisFacadeObserver: testObserver);

// Verify Redis is working initially
await redisFacade.SetString("foo", "bar", TimeSpan.FromMinutes(1), CancellationToken);
(await redisFacade.GetString("foo", CancellationToken)).Should().Be("bar");

// Kill Redis connections to simulate network issues
portForwarder.EnterKillNewAndExistingConnectionsMode();

// This should trigger retries and call the observer
var getStringTask = redisFacade.GetString("foo", CancellationToken);

// Wait for retries to happen, then restore connection
await ShouldEventually.Eventually(() => testObserver.ExecuteWithRetryExceptions.Count.Should().BeGreaterThanOrEqualTo(1),
TimeSpan.FromSeconds(30),
CancellationToken);
portForwarder.ReturnToNormalMode();

// The operation should eventually succeed
var result = await getStringTask;
result.Should().Be("bar");

// Verify that the observer was called with retry exceptions
testObserver.ExecuteWithRetryExceptions.Should().NotBeEmpty("Observer should have been called for retry exceptions during connection issues");
testObserver.ExecuteWithRetryExceptions.Should().AllSatisfy(ex =>
{
ex.Exception.Should().NotBeNull("Exception should not be null");
ex.WillRetry.Should().BeTrue();
});

testObserver.ConnectionRestorations.Count.Should().BeGreaterThanOrEqualTo(1);
testObserver.ConnectionFailures.Count.Should().BeGreaterThanOrEqualTo(1);
}

[Test]
public async Task WhenRedisConnectionGoesDown_AndStaysDown_ObserverShouldBeNotifiedOfRetryExceptions()
{
// Arrange
using var portForwarder = PortForwardingToRedisBuilder.ForwardingToRedis(Logger);
var testObserver = new TestRedisFacadeObserver();

await using var redisFacade = RedisFacadeBuilder.CreateRedisFacade(portForwarder, redisFacadeObserver: testObserver);
redisFacade.MaxDurationToRetryFor = TimeSpan.FromSeconds(1);

// Verify Redis is working initially
await redisFacade.SetString("foo", "bar", TimeSpan.FromMinutes(1), CancellationToken);
(await redisFacade.GetString("foo", CancellationToken)).Should().Be("bar");

// Kill Redis connections to simulate network issues
portForwarder.EnterKillNewAndExistingConnectionsMode();

// This should trigger retries and call the observer, then ultimately fail
var exception = await AssertThrowsAny.Exception(async () =>
await redisFacade.GetString("foo", CancellationToken));

exception.Should().NotBeNull("Operation should fail when Redis stays down");

// Verify that the observer was called with retry exceptions
testObserver.ExecuteWithRetryExceptions.Should().NotBeEmpty("Observer should have been called for retry exceptions during connection issues");
testObserver.ExecuteWithRetryExceptions.Should().AllSatisfy(ex =>
{
ex.Exception.Should().NotBeNull("Exception should not be null");
ex.WillRetry.Should().BeFalse();
});

testObserver.ConnectionRestorations.Count.Should().Be(0);
testObserver.ConnectionFailures.Count.Should().BeGreaterThanOrEqualTo(1);
}

class TestRedisFacadeObserver : IRedisFacadeObserver
{
readonly object mutex = new object();
readonly List<(string? EndPoint, ConnectionFailureType FailureType, Exception? Exception)> connectionFailures = new();
readonly List<(string? EndPoint, string Message)> errorMessages = new();
readonly List<string?> connectionRestorations = new();
readonly List<(Exception Exception, bool WillRetry)> executeWithRetryExceptions = new();

public List<(string? EndPoint, ConnectionFailureType FailureType, Exception? Exception)> ConnectionFailures
{
get
{
lock (mutex)
{
return new List<(string? EndPoint, ConnectionFailureType FailureType, Exception? Exception)>(connectionFailures);
}
}
}

public List<(string? EndPoint, string Message)> ErrorMessages
{
get
{
lock (mutex)
{
return new List<(string? EndPoint, string Message)>(errorMessages);
}
}
}

public List<string?> ConnectionRestorations
{
get
{
lock (mutex)
{
return new List<string?>(connectionRestorations);
}
}
}

public List<(Exception Exception, bool WillRetry)> ExecuteWithRetryExceptions
{
get
{
lock (mutex)
{
return new List<(Exception Exception, bool WillRetry)>(executeWithRetryExceptions);
}
}
}

public void OnRedisConnectionFailed(string? endPoint, ConnectionFailureType failureType, Exception? exception)
{
lock (mutex)
{
connectionFailures.Add((endPoint, failureType, exception));
}
}

public void OnRedisServerRepliedWithAnErrorMessage(string? endPoint, string message)
{
lock (mutex)
{
errorMessages.Add((endPoint, message));
}
}

public void OnRedisConnectionRestored(string? endPoint)
{
lock (mutex)
{
connectionRestorations.Add(endPoint);
}
}

public void OnRedisOperationFailed(Exception exception, bool willRetry)
{
lock (mutex)
{
executeWithRetryExceptions.Add((exception, willRetry));
}
}
}
}
}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@ namespace Halibut.Tests.Queue.Redis.Utils
{
public class RedisFacadeBuilder
{
public static RedisFacade CreateRedisFacade(string? host = null, int? port = 0, Guid? prefix = null)
public static RedisFacade CreateRedisFacade(string? host = null, int? port = 0, Guid? prefix = null, IRedisFacadeObserver? redisFacadeObserver = null)
{
port = port == 0 ? RedisTestHost.Port() : port;
return new RedisFacade((host??RedisTestHost.RedisHost) + ":" + port, (prefix ?? Guid.NewGuid()).ToString(), new TestContextLogCreator("Redis", LogLevel.Trace).CreateNewForPrefix(""));
return new RedisFacade((host??RedisTestHost.RedisHost) + ":" + port, (prefix ?? Guid.NewGuid()).ToString(), new TestContextLogCreator("Redis", LogLevel.Trace).CreateNewForPrefix(""), redisFacadeObserver);
}

public static RedisFacade CreateRedisFacade(PortForwarder portForwarder, Guid? prefix = null)
public static RedisFacade CreateRedisFacade(PortForwarder portForwarder, Guid? prefix = null, IRedisFacadeObserver? redisFacadeObserver = null)
{
return CreateRedisFacade(host: portForwarder.PublicEndpoint.Host,
port: portForwarder.ListeningPort,
prefix: prefix);
prefix: prefix,
redisFacadeObserver: redisFacadeObserver);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;

namespace Halibut.Queue.Redis.MessageStorage
{
/// <summary>
/// Observes exceptions that occur within IMessageSerialiserAndDataStreamStorage implementations.
/// This allows monitoring and logging of errors during message serialization and data stream operations.
/// </summary>
public interface IMessageSerialiserAndDataStreamStorageExceptionObserver
{
/// <summary>
/// Called when an exception occurs in any IMessageSerialiserAndDataStreamStorage method.
/// Errors caught here are most likely caused by the Redis Pending Request Queue itself.
/// </summary>
/// <param name="exception">The exception that was raised</param>
/// <param name="methodName">The name of the method where the exception occurred</param>
void OnException(Exception exception, string methodName);
}
}
Loading