Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/NServiceBus.Transport.SqlServer.UnitTests/.editorconfig
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
[*.cs]

# Justification: Test project
dotnet_diagnostic.CA2007.severity = none

# Justification: Tests don't support cancellation and don't need to forward IMessageHandlerContext.CancellationToken
dotnet_diagnostic.NSB0002.severity = suggestion

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
namespace NServiceBus.Transport.SqlServer.UnitTests
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

[TestFixture]
public class CachedSubscriptionStoreTests
{
// Reproduces a bug that previously existed in the original implementation that stored tasks and their outcomes in the cache (see #1588)
// leaving this test around to make sure such a problem doesn't occur again.
[Test]
public void Should_not_cache_cancelled_operations()
{
var subscriptionStore = new FakeSubscriptionStore
{
GetSubscribersAction = (_, token) => token.IsCancellationRequested ? Task.FromCanceled<List<string>>(token) : Task.FromResult(new List<string>())
};

var cache = new CachedSubscriptionStore(subscriptionStore, TimeSpan.FromSeconds(60));

Assert.Multiple(() =>
{
_ = Assert.ThrowsAsync<TaskCanceledException>(async () =>
await cache.GetSubscribers(typeof(object), new CancellationToken(true)));
Assert.DoesNotThrowAsync(async () => await cache.GetSubscribers(typeof(object), CancellationToken.None));
});
}

class FakeSubscriptionStore : ISubscriptionStore
{
public Func<Type, CancellationToken, Task<List<string>>> GetSubscribersAction { get; set; } =
(type, token) => Task.FromResult(new List<string>());

public Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default) =>
GetSubscribersAction(eventType, cancellationToken);

public Task Subscribe(string endpointName, string endpointAddress, Type eventType,
CancellationToken cancellationToken = default) =>
throw new NotImplementedException();

public Task Unsubscribe(string endpointName, Type eventType, CancellationToken cancellationToken = default) =>
throw new NotImplementedException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ namespace NServiceBus.Transport.SqlServer.UnitTests.Receiving
using NUnit.Framework;
using NServiceBus.Transport.SqlServer;

#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task

// Ideally the circuit breaker would use a time provider to allow for easier testing but that would require a significant refactor
// and we want keep the changes to a minimum for now to allow backporting to older versions.
[TestFixture]
Expand Down Expand Up @@ -222,5 +220,4 @@ public async Task Should_trigger_after_multiple_failures_and_timeout()
Assert.That(triggerActionCalled, Is.True, "The trigger action should be called after repeated failures and timeout.");
}
}
}
#pragma warning restore CA2007 // Consider calling ConfigureAwait on the awaited task
}
156 changes: 120 additions & 36 deletions src/NServiceBus.Transport.SqlServer/PubSub/CachedSubscriptionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,67 +3,151 @@ namespace NServiceBus.Transport.SqlServer
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;


class CachedSubscriptionStore : ISubscriptionStore
sealed class CachedSubscriptionStore : ISubscriptionStore, IDisposable
{
public CachedSubscriptionStore(ISubscriptionStore inner, TimeSpan cacheFor)
public async Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default)
{
this.inner = inner;
this.cacheFor = cacheFor;
var cacheKey = CacheKey(eventType);
var cachedSubscriptions = Cache.GetOrAdd(cacheKey,
static (_, state) => new CachedSubscriptions(state.inner, state.eventType, state.cacheFor),
(inner, eventType, cacheFor));

return await cachedSubscriptions.EnsureFresh(cancellationToken).ConfigureAwait(false);
}

public Task<List<string>> GetSubscribers(Type eventType, CancellationToken cancellationToken = default)
public async Task Subscribe(string endpointName, string endpointAddress, Type eventType, CancellationToken cancellationToken = default)
{
var cacheItem = Cache.GetOrAdd(CacheKey(eventType),
_ => new CacheItem
{
StoredUtc = DateTime.UtcNow,
Subscribers = inner.GetSubscribers(eventType, cancellationToken)
});

var age = DateTime.UtcNow - cacheItem.StoredUtc;
if (age >= cacheFor)
try
{
cacheItem.Subscribers = inner.GetSubscribers(eventType, cancellationToken);
cacheItem.StoredUtc = DateTime.UtcNow;
await inner.Subscribe(endpointName, endpointAddress, eventType, cancellationToken).ConfigureAwait(false);
}
finally
{
await Clear(CacheKey(eventType))
.ConfigureAwait(false);
}

return cacheItem.Subscribers;
}

public async Task Subscribe(string endpointName, string endpointAddress, Type eventType, CancellationToken cancellationToken = default)
public async Task Unsubscribe(string endpointName, Type eventType, CancellationToken cancellationToken = default)
{
await inner.Subscribe(endpointName, endpointAddress, eventType, cancellationToken).ConfigureAwait(false);
ClearForMessageType(CacheKey(eventType));
try
{
await inner.Unsubscribe(endpointName, eventType, cancellationToken).ConfigureAwait(false);
}
finally
{
await Clear(CacheKey(eventType))
.ConfigureAwait(false);
}
}

public async Task Unsubscribe(string endpointName, Type eventType, CancellationToken cancellationToken = default)
public void Dispose()
{
await inner.Unsubscribe(endpointName, eventType, cancellationToken).ConfigureAwait(false);
ClearForMessageType(CacheKey(eventType));
if (Cache.IsEmpty)
{
return;
}

foreach (var subscription in Cache.Values)
{
subscription.Dispose();
}

Cache.Clear();
}

void ClearForMessageType(string topic)
#pragma warning disable PS0018 // Clear should not be cancellable
ValueTask Clear(string cacheKey) => Cache.TryGetValue(cacheKey, out var cachedSubscriptions) ? cachedSubscriptions.Clear() : default;
#pragma warning restore PS0018

static string CacheKey(Type eventType) => eventType.FullName;

readonly ConcurrentDictionary<string, CachedSubscriptions> Cache = new();
readonly ISubscriptionStore inner;
readonly TimeSpan cacheFor;

public CachedSubscriptionStore(ISubscriptionStore inner, TimeSpan cacheFor)
{
Cache.TryRemove(topic, out _);
this.inner = inner;
this.cacheFor = cacheFor;
}

static string CacheKey(Type eventType)
sealed class CachedSubscriptions : IDisposable
{
return eventType.FullName;
}
readonly SemaphoreSlim fetchSemaphore = new(1, 1);

TimeSpan cacheFor;
ISubscriptionStore inner;
ConcurrentDictionary<string, CacheItem> Cache = new ConcurrentDictionary<string, CacheItem>();
List<string> cachedSubscriptions;
long cachedAtTimestamp;
readonly ISubscriptionStore store;
readonly Type eventType;
readonly TimeSpan cacheFor;

class CacheItem
{
public DateTime StoredUtc { get; set; } // Internal usage, only set/get using private
public Task<List<string>> Subscribers { get; set; }
public CachedSubscriptions(ISubscriptionStore store, Type eventType, TimeSpan cacheFor)
{
this.store = store;
this.eventType = eventType;
this.cacheFor = cacheFor;
}

public async ValueTask<List<string>> EnsureFresh(CancellationToken cancellationToken = default)
{
var cachedSubscriptionsSnapshot = cachedSubscriptions;
var cachedAtTimestampSnapshot = cachedAtTimestamp;

if (cachedSubscriptionsSnapshot != null && GetElapsedTime(cachedAtTimestampSnapshot) < cacheFor)
{
return cachedSubscriptionsSnapshot;
}

await fetchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

try
{
if (cachedSubscriptions != null && GetElapsedTime(cachedAtTimestamp) < cacheFor)
{
return cachedSubscriptions;
}

cachedSubscriptions = await store.GetSubscribers(eventType, cancellationToken).ConfigureAwait(false);
cachedAtTimestamp = Stopwatch.GetTimestamp();

return cachedSubscriptions;
}
finally
{
fetchSemaphore.Release();
}
}

static readonly double tickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;
static TimeSpan GetElapsedTime(long startingTimestamp) =>
GetElapsedTime(startingTimestamp, Stopwatch.GetTimestamp());
static TimeSpan GetElapsedTime(long startingTimestamp, long endingTimestamp) =>
new((long)((endingTimestamp - startingTimestamp) * tickFrequency));

#pragma warning disable PS0018 // Clear should not be cancellable
public async ValueTask Clear()
#pragma warning restore PS0018
{
try
{
await fetchSemaphore.WaitAsync(CancellationToken.None).ConfigureAwait(false);

cachedSubscriptions = null;
cachedAtTimestamp = 0;
}
finally
{
fetchSemaphore.Release();
}
}

public void Dispose() => fetchSemaphore.Dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ public async Task ChangeConcurrency(PushRuntimeSettings newLimitations, Cancella

public async Task StopReceive(CancellationToken cancellationToken = default)
{
if (messageReceivingCancellationTokenSource == null)
{
// already stopped or never started
return;
}

messageReceivingCancellationTokenSource?.Cancel();

using (cancellationToken.Register(() => messageProcessingCancellationTokenSource?.Cancel()))
Expand All @@ -145,6 +151,7 @@ public async Task StopReceive(CancellationToken cancellationToken = default)
messageProcessingCircuitBreaker.Dispose();
concurrencyLimiter.Dispose();
messageReceivingCancellationTokenSource?.Dispose();
messageReceivingCancellationTokenSource = null;
messageProcessingCancellationTokenSource?.Dispose();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,22 @@ public void ConfigureSendInfrastructure()
connectionFactory);
}

public override Task Shutdown(CancellationToken cancellationToken = default)
public override async Task Shutdown(CancellationToken cancellationToken = default)
{
return dueDelayedMessageProcessor?.Stop(cancellationToken) ?? Task.FromResult(0);
try
{
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
.ConfigureAwait(false);

if (dueDelayedMessageProcessor != null)
{
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
}
}
finally
{
(subscriptionStore as IDisposable)?.Dispose();
}
}

#pragma warning disable CS0618 // Type or member is obsolete
Expand Down
Loading