Skip to content

Commit 37f06ae

Browse files
committed
fix: prevent deadlock in OutboxWorker
1 parent a59f3ec commit 37f06ae

File tree

8 files changed

+165
-13
lines changed

8 files changed

+165
-13
lines changed

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project>
22
<PropertyGroup Label="Package information">
33
<BaseVersionSuffix></BaseVersionSuffix>
4-
<BaseVersion>4.3.0$(BaseVersionSuffix)</BaseVersion>
4+
<BaseVersion>4.3.1$(BaseVersionSuffix)</BaseVersion>
55
<DatabasePackagesRevision>1</DatabasePackagesRevision>
66
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
77
</PropertyGroup>

docs/releases.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ uid: releases
44

55
# Releases
66

7+
## [4.3.1](https://github.com/BEagle1984/silverback/releases/tag/v4.3.1)
8+
9+
### Fixes
10+
11+
* Fix deadlock in `OutboxWorker` when `enforceMessageOrder=true` (default)
12+
713
## [4.3.0](https://github.com/BEagle1984/silverback/releases/tag/v4.3.0)
814

915
### What's new

src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxWorker.cs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -165,15 +165,28 @@ private async Task ProcessQueueAsync(
165165
{
166166
_logger.LogProcessingOutboxStoredMessage(i + 1, outboxMessages.Count);
167167

168-
await ProcessMessageAsync(
169-
outboxMessages[i],
170-
failedMessages,
171-
outboxReader,
172-
serviceProvider)
173-
.ConfigureAwait(false);
168+
try
169+
{
170+
await ProcessMessageAsync(
171+
outboxMessages[i],
172+
failedMessages,
173+
outboxReader,
174+
serviceProvider)
175+
.ConfigureAwait(false);
176+
}
177+
catch (Exception)
178+
{
179+
// Subtract the produce operations that will never be initiated
180+
Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - i - 1));
181+
throw;
182+
}
174183

175184
if (stoppingToken.IsCancellationRequested)
185+
{
186+
// Subtract the produce operations that will never be initiated
187+
Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - i - 1));
176188
break;
189+
}
177190
}
178191
}
179192
finally
@@ -247,8 +260,7 @@ private IProducerEndpoint GetTargetEndpoint(
247260

248261
var targetEndpoint = outboundRoutes
249262
.SelectMany(route => route.GetOutboundRouter(serviceProvider).Endpoints)
250-
.FirstOrDefault(
251-
endpoint => endpoint.Name == endpointName || endpoint.DisplayName == endpointName);
263+
.FirstOrDefault(endpoint => endpoint.Name == endpointName || endpoint.DisplayName == endpointName);
252264

253265
if (targetEndpoint == null)
254266
{
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFrameworks>$(DefaultTargetFrameworks)</TargetFrameworks>
5+
<RootNamespace>$(RootNamespace)</RootNamespace>
6+
<Version>$(BaseVersion)</Version>
7+
<Authors>$(Authors)</Authors>
8+
<Company>$(Company)</Company>
9+
<PackageLicenseExpression>$(License)</PackageLicenseExpression>
10+
<Copyright>$(Copyright)</Copyright>
11+
<PackageProjectUrl>$(ProjectUrl)</PackageProjectUrl>
12+
<RepositoryUrl>$(RepositoryUrl)</RepositoryUrl>
13+
<RepositoryType>$(RepositoryType)</RepositoryType>
14+
<GeneratePackageOnBuild>${GeneratePackageOnBuild}</GeneratePackageOnBuild>
15+
<Description>$(Description) This package contains an implementation of Silverback.Storage that stores the data in memory.</Description>
16+
<PackageIconUrl>$(IconUrl)</PackageIconUrl>
17+
<PackageTags>$(Tags)</PackageTags>
18+
<LangVersion>$(LangVersion)</LangVersion>
19+
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>
20+
<PackageId>Silverback.Storage.Memory</PackageId>
21+
<Product>Silverback.Storage.Memory</Product>
22+
</PropertyGroup>
23+
24+
<ItemGroup>
25+
<ProjectReference Include="..\Silverback.Core\Silverback.Core.csproj" />
26+
<ProjectReference Include="..\Silverback.Integration\Silverback.Integration.csproj" />
27+
<ProjectReference Include="..\Silverback.Integration.Kafka\Silverback.Integration.Kafka.csproj" />
28+
</ItemGroup>
29+
30+
<ItemGroup>
31+
<PackageReference Update="Microsoft.SourceLink.GitHub" Version="1.1.1" />
32+
</ItemGroup>
33+
34+
</Project>
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFrameworks>$(DefaultTargetFrameworks)</TargetFrameworks>
5+
<RootNamespace>$(RootNamespace)</RootNamespace>
6+
<Version>$(BaseVersion)</Version>
7+
<Authors>$(Authors)</Authors>
8+
<Company>$(Company)</Company>
9+
<PackageLicenseExpression>$(License)</PackageLicenseExpression>
10+
<Copyright>$(Copyright)</Copyright>
11+
<PackageProjectUrl>$(ProjectUrl)</PackageProjectUrl>
12+
<RepositoryUrl>$(RepositoryUrl)</RepositoryUrl>
13+
<RepositoryType>$(RepositoryType)</RepositoryType>
14+
<GeneratePackageOnBuild>${GeneratePackageOnBuild}</GeneratePackageOnBuild>
15+
<Description>$(Description) This package contains an implementation of Silverback.Storage that stores the data in Sqlite.</Description>
16+
<PackageIconUrl>$(IconUrl)</PackageIconUrl>
17+
<PackageTags>$(Tags)</PackageTags>
18+
<LangVersion>$(LangVersion)</LangVersion>
19+
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>
20+
<PackageId>Silverback.Storage.Sqlite</PackageId>
21+
<Product>Silverback.Storage.Sqlite</Product>
22+
</PropertyGroup>
23+
24+
<ItemGroup>
25+
<ProjectReference Include="..\Silverback.Core\Silverback.Core.csproj" />
26+
<ProjectReference Include="..\Silverback.Integration\Silverback.Integration.csproj" />
27+
<ProjectReference Include="..\Silverback.Storage.Memory\Silverback.Storage.Memory.csproj" />
28+
</ItemGroup>
29+
30+
<ItemGroup>
31+
<PackageReference Include="Microsoft.Data.Sqlite" Version="7.0.1" />
32+
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
33+
<PackageReference Update="Microsoft.SourceLink.GitHub" Version="1.1.1" />
34+
</ItemGroup>
35+
36+
</Project>

tests/Silverback.Integration.Tests/Messaging/Outbound/TransactionalOutbox/OutboxWorkerTests.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,5 +167,56 @@ await _outboxWriter.WriteAsync(
167167

168168
_broker.ProducedMessages.Should().HaveCount(3);
169169
}
170+
171+
[Fact]
172+
public async Task ProcessQueue_ProduceError_Retried()
173+
{
174+
await _outboxWriter.WriteAsync(
175+
new TestEventOne { Content = "Test" },
176+
null,
177+
null,
178+
"topic1",
179+
"topic1");
180+
await _outboxWriter.WriteAsync(
181+
new TestEventTwo { Content = "Test" },
182+
null,
183+
null,
184+
"topic2",
185+
"topic2");
186+
await _outboxWriter.WriteAsync(
187+
new TestEventOne { Content = "Test" },
188+
null,
189+
null,
190+
"topic1",
191+
"topic1");
192+
await _outboxWriter.WriteAsync(
193+
new TestEventOne { Content = "Test" },
194+
null,
195+
null,
196+
"topic1",
197+
"topic1");
198+
await _outboxWriter.CommitAsync();
199+
200+
_broker.FailProduceNumber = new[] { 2, 3 }; // Note: counter is per producer / topic
201+
202+
await _worker.ProcessQueueAsync(CancellationToken.None);
203+
204+
_broker.ProducedMessages.Should().HaveCount(2);
205+
_broker.ProducedMessages[0].Endpoint.Name.Should().Be("topic1");
206+
_broker.ProducedMessages[1].Endpoint.Name.Should().Be("topic2");
207+
208+
await _worker.ProcessQueueAsync(CancellationToken.None);
209+
210+
_broker.ProducedMessages.Should().HaveCount(2);
211+
_broker.ProducedMessages[0].Endpoint.Name.Should().Be("topic1");
212+
_broker.ProducedMessages[1].Endpoint.Name.Should().Be("topic2");
213+
214+
await _worker.ProcessQueueAsync(CancellationToken.None);
215+
216+
_broker.ProducedMessages.Should().HaveCount(3);
217+
_broker.ProducedMessages[0].Endpoint.Name.Should().Be("topic1");
218+
_broker.ProducedMessages[1].Endpoint.Name.Should().Be("topic2");
219+
_broker.ProducedMessages[2].Endpoint.Name.Should().Be("topic1");
220+
}
170221
}
171222
}

tests/Silverback.Integration.Tests/TestTypes/TestBroker.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public TestBroker(IServiceProvider serviceProvider)
2222

2323
public bool SimulateConnectIssues { get; set; }
2424

25+
public IReadOnlyCollection<int>? FailProduceNumber { get; set; }
26+
2527
protected override Task ConnectAsync(
2628
IReadOnlyCollection<IProducer> producers,
2729
IReadOnlyCollection<IConsumer> consumers)

tests/Silverback.Integration.Tests/TestTypes/TestProducer.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Collections.Generic;
66
using System.IO;
7+
using System.Linq;
78
using System.Threading.Tasks;
89
using NSubstitute;
910
using Silverback.Diagnostics;
@@ -17,6 +18,8 @@ namespace Silverback.Tests.Integration.TestTypes
1718
{
1819
public class TestProducer : Producer<TestBroker, TestProducerEndpoint>
1920
{
21+
private int _produceCount;
22+
2023
public TestProducer(
2124
TestBroker broker,
2225
TestProducerEndpoint endpoint,
@@ -51,7 +54,7 @@ public TestProducer(
5154
IReadOnlyCollection<MessageHeader>? headers,
5255
string actualEndpointName)
5356
{
54-
ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint));
57+
PerformMockProduce(messageBytes, headers);
5558
return null;
5659
}
5760

@@ -80,7 +83,7 @@ protected override void ProduceCore(
8083
Action<IBrokerMessageIdentifier?> onSuccess,
8184
Action<Exception> onError)
8285
{
83-
ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint));
86+
PerformMockProduce(messageBytes, headers);
8487
onSuccess.Invoke(null);
8588
}
8689

@@ -101,7 +104,7 @@ await messageStream.ReadAllAsync().ConfigureAwait(false),
101104
IReadOnlyCollection<MessageHeader>? headers,
102105
string actualEndpointName)
103106
{
104-
ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint));
107+
PerformMockProduce(messageBytes, headers);
105108
return Task.FromResult<IBrokerMessageIdentifier?>(null);
106109
}
107110

@@ -128,9 +131,17 @@ protected override Task ProduceCoreAsync(
128131
Action<IBrokerMessageIdentifier?> onSuccess,
129132
Action<Exception> onError)
130133
{
131-
ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint));
134+
PerformMockProduce(messageBytes, headers);
132135
onSuccess.Invoke(null);
133136
return Task.CompletedTask;
134137
}
138+
139+
private void PerformMockProduce(byte[]? messageBytes, IReadOnlyCollection<MessageHeader>? headers)
140+
{
141+
if (Broker.FailProduceNumber != null && Broker.FailProduceNumber.Contains(++_produceCount))
142+
throw new InvalidOperationException("Produce failed (mock).");
143+
144+
ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint));
145+
}
135146
}
136147
}

0 commit comments

Comments
 (0)