Skip to content

Commit 5d84698

Browse files
authored
Added abstraction for Secondary Indexes building (#5035)
Added needed abstractions for handling secondary indexes and plugged them to plugin. This is the first step before plugging the real DuckDB implementation. 1. `SecondaryIndexSubscription` - responsible for listening to log and indexing events that were stored there. 2. Indexing and committing happens in the implementation of `ISecondaryIndexProcessor`. 3. `SecondaryIndexCheckpointTracker` is responsible for triggering index commits. Commits happen when the maximum size of pending batches is reached or when the maximum time has elapsed since the previous commit. 4. Commit settings can be configured through options: CheckpointCommitBatchSize and CheckpointCommitDelayMs exposed through database settings. 5. All of that is handled by the `SecondaryIndexBuilder` hosted service, responsible for running the subscription. You can see how it will work for DuckDB in simplified fake in-memory implementations. Besides that plugged integration tests based on the Connectors plugins. Expanded them to make it possible to run them in parallel. **_Note:_ I also made `_clusterNodeMutex` NOT be acquired for the in-memory database in `ClusterVNodeHostedService` (so just like the db block). I also released it on dispose it accordingly.** This enables having multiple instances of `ClusterVNodeHostedService` running in integration tests. Previous tests in connectors assumed that only one `ClusterVNodeHostedService` would be running in the assembly fixture, but that doesn't allow for testing different configuration settings (e.g. if the plugin was enabled or disabled).
1 parent 17d5253 commit 5d84698

26 files changed

+954
-201
lines changed

src/Connectors/KurrentDB.Connectors.Tests/ClusterVNodeApp.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using KurrentDB.Core.Messages;
1515
// using KurrentDB.Surge.Testing.Extensions;
1616
using Microsoft.AspNetCore.Builder;
17+
using Microsoft.AspNetCore.Hosting;
1718
using Microsoft.Extensions.Configuration;
1819
using Microsoft.Extensions.DependencyInjection;
1920
using Microsoft.Extensions.Hosting;
@@ -38,7 +39,7 @@ public class ClusterVNodeApp : IAsyncDisposable {
3839

3940
public async Task<(ClusterVNodeOptions Options, IServiceProvider Services)> Start(TimeSpan? readinessTimeout = null, Dictionary<string, string?>? overrides = null, Action<IServiceCollection>? configureServices = null) {
4041
var settings = overrides is not null
41-
? DefaultSettings.With(x => overrides.ForEach((key, value) => x[key] = value))
42+
? DefaultSettings.ToDictionary().With(x => overrides.ForEach((key, value) => x[key] = value))
4243
: DefaultSettings;
4344

4445
var options = GetClusterVNodeOptions(settings);
@@ -54,6 +55,8 @@ public class ClusterVNodeApp : IAsyncDisposable {
5455
.With(x => x.Services.AddSingleton<IHostedService>(esdb))
5556
.With(x => configureServices?.Invoke(x.Services));
5657

58+
builder.WebHost.ConfigureKestrel(serverOptions => serverOptions.ListenAnyIP(0));
59+
5760
App = builder.Build().With(x => esdb.Node.Startup.Configure(x));
5861

5962
await App.StartAsync();

src/Connectors/KurrentDB.Connectors.Tests/ClusterVNodeFixture.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ protected ClusterVNodeFixture() {
3737
public ILoggerFactory LoggerFactory { get; }
3838
public Faker Faker { get; }
3939

40-
public Action<IServiceCollection> ConfigureServices { get; init; } = _ => { };
41-
public Func<Task> OnSetup { get; init; } = () => Task.CompletedTask;
42-
public Func<Task> OnTearDown { get; init; } = () => Task.CompletedTask;
40+
public Action<IServiceCollection> ConfigureServices { get; init; } = _ => { };
41+
public Dictionary<string, string?>? Configuration { get; init; }
42+
public Func<Task> OnSetup { get; init; } = () => Task.CompletedTask;
43+
public Func<Task> OnTearDown { get; init; } = () => Task.CompletedTask;
4344

4445
public ClusterVNodeOptions NodeOptions { get; private set; } = null!;
4546
public IServiceProvider NodeServices { get; private set; } = null!;
@@ -48,7 +49,10 @@ protected ClusterVNodeFixture() {
4849
public ISubscriber Subscriber => NodeServices.GetRequiredService<ISubscriber>();
4950

5051
public async Task InitializeAsync() {
51-
var (options, services) = await ClusterVNodeApp.Start(configureServices: ConfigureServices);
52+
var (options, services) = await ClusterVNodeApp.Start(
53+
configureServices: ConfigureServices,
54+
overrides: Configuration
55+
);
5256

5357
NodeServices = services;
5458
NodeOptions = options;
@@ -59,6 +63,7 @@ public async Task InitializeAsync() {
5963
public async Task DisposeAsync() {
6064
try {
6165
await OnTearDown();
66+
await ClusterVNodeApp.DisposeAsync();
6267
}
6368
catch {
6469
// ignored

src/KurrentDB.Core.Tests/Helpers/MiniClusterNode.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,7 @@ public MiniClusterNode(string pathname, int debugIndex, IPEndPoint internalTcp,
7272
IPEndPoint httpEndPoint, EndPoint[] gossipSeeds, ISubsystem[] subsystems = null,
7373
bool enableTrustedAuth = false, int memTableSize = 1000, bool inMemDb = true,
7474
bool disableFlushToDisk = false, bool readOnlyReplica = false, int nodePriority = 0,
75-
string intHostAdvertiseAs = null, IExpiryStrategy expiryStrategy = null,
76-
IEnumerable<IVirtualStreamReader> virtualStreamReaders = null) {
75+
string intHostAdvertiseAs = null, IExpiryStrategy expiryStrategy = null) {
7776

7877
if (RuntimeInformation.IsOSX) {
7978
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport",
@@ -194,7 +193,7 @@ public MiniClusterNode(string pathname, int debugIndex, IPEndPoint internalTcp,
194193
options.Application.AllowAnonymousEndpointAccess,
195194
options.Application.AllowAnonymousStreamAccess,
196195
options.Application.OverrideAnonymousEndpointAccessForGossip).Create(components.MainQueue)]))),
197-
virtualStreamReaders ?? [],
196+
virtualStreamReader: null,
198197
Array.Empty<IPersistentSubscriptionConsumerStrategyFactory>(),
199198
new OptionsCertificateProvider(),
200199
configuration: inMemConf,

src/KurrentDB.Core.Tests/Helpers/MiniNode.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,10 +216,6 @@ public MiniNode(string pathname,
216216
HighHasher = hash32bit ? new ConstantHasher(0) : options.HighHasher,
217217
});
218218

219-
var virtualStreamReaders = subsystems
220-
.OfType<ISecondaryIndexingPlugin>()
221-
.SelectMany(indexingPlugin => indexingPlugin.IndicesVirtualStreamReaders);
222-
223219
Node = new ClusterVNode<TStreamId>(options, logFormatFactory,
224220
new AuthenticationProviderFactory(
225221
c => authenticationProviderFactory ?? new InternalAuthenticationProviderFactory(
@@ -231,7 +227,6 @@ public MiniNode(string pathname,
231227
options.Application.AllowAnonymousEndpointAccess,
232228
options.Application.AllowAnonymousStreamAccess,
233229
options.Application.OverrideAnonymousEndpointAccessForGossip).Create(c.MainQueue)]))),
234-
virtualStreamReaders: virtualStreamReaders,
235230
expiryStrategy: expiryStrategy,
236231
certificateProvider: new OptionsCertificateProvider(),
237232
configuration: inMemConf,

src/KurrentDB.Core/ClusterVNode.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public static ClusterVNode<TStreamId> Create<TStreamId>(
103103
ILogFormatAbstractorFactory<TStreamId> logFormatAbstractorFactory,
104104
AuthenticationProviderFactory authenticationProviderFactory = null,
105105
AuthorizationProviderFactory authorizationProviderFactory = null,
106-
IEnumerable<IVirtualStreamReader> virtualStreamReaders = null,
106+
VirtualStreamReader virtualStreamReader = null,
107107
IReadOnlyList<IPersistentSubscriptionConsumerStrategyFactory> factories = null,
108108
CertificateProvider certificateProvider = null,
109109
IConfiguration configuration = null,
@@ -115,7 +115,7 @@ public static ClusterVNode<TStreamId> Create<TStreamId>(
115115
logFormatAbstractorFactory,
116116
authenticationProviderFactory,
117117
authorizationProviderFactory,
118-
virtualStreamReaders,
118+
virtualStreamReader,
119119
factories,
120120
certificateProvider,
121121
configuration,
@@ -235,7 +235,7 @@ public ClusterVNode(ClusterVNodeOptions options,
235235
ILogFormatAbstractorFactory<TStreamId> logFormatAbstractorFactory,
236236
AuthenticationProviderFactory authenticationProviderFactory = null,
237237
AuthorizationProviderFactory authorizationProviderFactory = null,
238-
IEnumerable<IVirtualStreamReader> virtualStreamReaders = null,
238+
VirtualStreamReader virtualStreamReader = null,
239239
IReadOnlyList<IPersistentSubscriptionConsumerStrategyFactory>
240240
additionalPersistentSubscriptionConsumerStrategyFactories = null,
241241
CertificateProvider certificateProvider = null,
@@ -782,16 +782,16 @@ void StartSubsystems() {
782782
var nodeStatusListener = new NodeStateListenerService(_mainQueue, memLog);
783783
_mainBus.Subscribe<SystemMessage.StateChangeMessage>(nodeStatusListener);
784784

785-
var inMemReader = new VirtualStreamReader([
785+
virtualStreamReader ??= new VirtualStreamReader();
786+
virtualStreamReader.Register(
786787
gossipListener.Stream,
787-
nodeStatusListener.Stream,
788-
..virtualStreamReaders ?? []
789-
]);
788+
nodeStatusListener.Stream
789+
);
790790

791791
// Storage Reader
792792
var storageReader = new StorageReaderService<TStreamId>(_mainQueue, _mainBus, readIndex,
793793
logFormat.SystemStreams,
794-
readerThreadsCount, Db.Config.WriterCheckpoint.AsReadOnly(), inMemReader, _queueStatsManager,
794+
readerThreadsCount, Db.Config.WriterCheckpoint.AsReadOnly(), virtualStreamReader, _queueStatsManager,
795795
trackers.QueueTrackers);
796796

797797
_mainBus.Subscribe<SystemMessage.SystemInit>(storageReader);
@@ -1157,7 +1157,7 @@ GossipAdvertiseInfo GetGossipAdvertiseInfo() {
11571157
_mainBus.Subscribe<StorageMessage.EventCommitted>(subscrQueue);
11581158
_mainBus.Subscribe<StorageMessage.InMemoryEventCommitted>(subscrQueue);
11591159

1160-
var subscription = new SubscriptionsService<TStreamId>(_mainQueue, subscrQueue, _authorizationProvider, readIndex, inMemReader);
1160+
var subscription = new SubscriptionsService<TStreamId>(_mainQueue, subscrQueue, _authorizationProvider, readIndex, virtualStreamReader);
11611161
subscrBus.Subscribe<SystemMessage.SystemStart>(subscription);
11621162
subscrBus.Subscribe<SystemMessage.BecomeShuttingDown>(subscription);
11631163
subscrBus.Subscribe<TcpMessage.ConnectionClosed>(subscription);

src/KurrentDB.Core/Configuration/ClusterVNodeOptionsExtensions.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Net;
88
using System.Security.Cryptography.X509Certificates;
99
using EventStore.Plugins;
10+
using EventStore.Plugins.Subsystems;
1011
using KurrentDB.Common.Exceptions;
1112
using KurrentDB.Common.Utils;
1213
using KurrentDB.Core.Certificates;
@@ -20,6 +21,9 @@ public static ClusterVNodeOptions Reload(this ClusterVNodeOptions options) =>
2021
? options
2122
: ClusterVNodeOptions.FromConfiguration(options.ConfigurationRoot);
2223

24+
public static ClusterVNodeOptions WithPlugableComponents(this ClusterVNodeOptions options, ISubsystemsPlugin subsystemsPlugin) =>
25+
options with { PlugableComponents = [.. options.PlugableComponents, .. subsystemsPlugin.GetSubsystems()] };
26+
2327
public static ClusterVNodeOptions WithPlugableComponent(this ClusterVNodeOptions options, IPlugableComponent plugableComponent) =>
2428
options with { PlugableComponents = [.. options.PlugableComponents, plugableComponent] };
2529

src/KurrentDB.Core/Services/Storage/InMemory/VirtualStreamReader.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@
99
namespace KurrentDB.Core.Services.Storage.InMemory;
1010

1111
public class VirtualStreamReader : IVirtualStreamReader {
12-
private readonly IVirtualStreamReader[] _readers;
12+
private IVirtualStreamReader[] _readers;
1313

14-
public VirtualStreamReader(IVirtualStreamReader[] readers) {
15-
_readers = readers;
14+
public VirtualStreamReader(IVirtualStreamReader[] readers = null) {
15+
_readers = readers ?? [];
1616
}
1717

18+
public void Register(params IVirtualStreamReader[] readers) =>
19+
_readers = [.._readers, ..readers];
20+
1821
public ValueTask<ReadStreamEventsForwardCompleted> ReadForwards(ReadStreamEventsForward msg, CancellationToken token) {
1922
if (TryGetReader(msg.EventStreamId, out var reader))
2023
return reader.ReadForwards(msg, token);
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
2+
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
3+
4+
using KurrentDB.Core.Data;
5+
using KurrentDB.Core.Services.Storage.InMemory;
6+
using KurrentDB.SecondaryIndexing.Indices;
7+
8+
namespace KurrentDB.SecondaryIndexing.Tests.Indices;
9+
10+
public class FakeSecondaryIndex : ISecondaryIndex {
11+
public FakeSecondaryIndex(string streamName) {
12+
Committed = [];
13+
Processor = new FakeSecondaryIndexProcessor(Committed, Pending);
14+
Readers = [new FakeVirtualStreamReader(streamName, Committed.AsReadOnly())];
15+
}
16+
17+
public IList<ResolvedEvent> Committed { get; }
18+
public IList<ResolvedEvent> Pending { get; } = new List<ResolvedEvent>();
19+
20+
public ISecondaryIndexProcessor Processor { get; }
21+
public IReadOnlyList<IVirtualStreamReader> Readers { get; }
22+
public ValueTask Init(CancellationToken ct) => ValueTask.CompletedTask;
23+
24+
public ValueTask<ulong?> GetLastPosition(CancellationToken ct) =>
25+
ValueTask.FromResult(Committed.Select(@event => (ulong?)@event.Event.LogPosition).FirstOrDefault());
26+
27+
public ValueTask<ulong?> GetLastSequence(CancellationToken ct) =>
28+
ValueTask.FromResult(Committed.Select(@event => (ulong?)@event.Event.EventNumber).FirstOrDefault());
29+
30+
public void Dispose() { }
31+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
2+
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
3+
4+
using FluentStorage.Utils.Extensions;
5+
using KurrentDB.Core.Data;
6+
using KurrentDB.SecondaryIndexing.Indices;
7+
8+
namespace KurrentDB.SecondaryIndexing.Tests.Indices;
9+
10+
public class FakeSecondaryIndexProcessor(IList<ResolvedEvent> committed, IList<ResolvedEvent>? pending = null): ISecondaryIndexProcessor {
11+
private readonly object _lock = new();
12+
private readonly IList<ResolvedEvent> _pending = pending ?? [];
13+
14+
public ValueTask Index(ResolvedEvent resolvedEvent, CancellationToken token = default) {
15+
lock (_lock) {
16+
_pending.Add(resolvedEvent);
17+
}
18+
19+
return ValueTask.CompletedTask;
20+
}
21+
22+
public ValueTask Commit(CancellationToken token = default) {
23+
lock (_lock) {
24+
committed.AddRange(_pending);
25+
_pending.Clear();
26+
}
27+
return ValueTask.CompletedTask;
28+
}
29+
}

src/KurrentDB.SecondaryIndexing.Tests/FakeVirtualStreamReader.cs renamed to src/KurrentDB.SecondaryIndexing.Tests/Indices/FakeVirtualStreamReader.cs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@
55
using KurrentDB.Core.Messages;
66
using KurrentDB.Core.Services.Storage.InMemory;
77

8-
namespace KurrentDB.SecondaryIndexing.Tests;
8+
namespace KurrentDB.SecondaryIndexing.Tests.Indices;
99

10-
public class FakeVirtualStreamReader(string streamName, ResolvedEvent[] events) : IVirtualStreamReader {
10+
internal class FakeVirtualStreamReader(string streamName, IReadOnlyList<ResolvedEvent> events) : IVirtualStreamReader {
1111
public ValueTask<ClientMessage.ReadStreamEventsForwardCompleted> ReadForwards(
1212
ClientMessage.ReadStreamEventsForward msg,
1313
CancellationToken token) {
1414

1515
ReadStreamResult result;
16-
long nextEventNumber, lastEventNumber;
16+
long nextEventNumber, lastEventNumber, lastPosition;
17+
bool isEndOfStream;
1718

1819
var readEvents = events
19-
.Where(e => e.Event.EventNumber >= msg.FromEventNumber)
20+
.Skip((int)msg.FromEventNumber)
2021
.ToArray();
2122

2223
ResolvedEvent? lastEvent = readEvents.Length > 0 ? readEvents.Last(): null;
@@ -25,13 +26,16 @@ public class FakeVirtualStreamReader(string streamName, ResolvedEvent[] events)
2526
result = ReadStreamResult.NoStream;
2627
nextEventNumber = -1;
2728
lastEventNumber = ExpectedVersion.NoStream;
29+
lastPosition = -1;
30+
isEndOfStream = true;
2831
} else {
2932
result = ReadStreamResult.Success;
30-
lastEventNumber = lastEvent.Value.Event.EventNumber;
33+
lastEventNumber = Array.IndexOf(readEvents, lastEvent);
3134
nextEventNumber = lastEventNumber + 1;
35+
lastPosition = lastEvent.Value.Event.TransactionPosition;
36+
isEndOfStream = nextEventNumber == events.Count;
3237
}
3338

34-
3539
return ValueTask.FromResult(new ClientMessage.ReadStreamEventsForwardCompleted(
3640
msg.CorrelationId,
3741
msg.EventStreamId,
@@ -44,19 +48,20 @@ public class FakeVirtualStreamReader(string streamName, ResolvedEvent[] events)
4448
error: string.Empty,
4549
nextEventNumber: nextEventNumber,
4650
lastEventNumber: lastEventNumber,
47-
isEndOfStream: true,
48-
tfLastCommitPosition: 0L));
51+
isEndOfStream: isEndOfStream,
52+
tfLastCommitPosition: lastPosition));
4953
}
5054

5155
public ValueTask<ClientMessage.ReadStreamEventsBackwardCompleted> ReadBackwards(
5256
ClientMessage.ReadStreamEventsBackward msg,
5357
CancellationToken token) {
5458

5559
ReadStreamResult result;
56-
long nextEventNumber, lastEventNumber;
60+
long nextEventNumber, lastEventNumber, lastPosition;
5761

5862
var readEvents = events
59-
.Where(e => e.Event.EventNumber <= msg.FromEventNumber)
63+
.Reverse()
64+
.Skip((int)msg.FromEventNumber)
6065
.ToArray();
6166

6267
ResolvedEvent? lastEvent = readEvents.Length > 0 ? readEvents.First(): null;
@@ -65,10 +70,12 @@ public class FakeVirtualStreamReader(string streamName, ResolvedEvent[] events)
6570
result = ReadStreamResult.NoStream;
6671
nextEventNumber = -1;
6772
lastEventNumber = ExpectedVersion.NoStream;
73+
lastPosition = -1;
6874
} else {
6975
result = ReadStreamResult.Success;
70-
lastEventNumber = lastEvent.Value.Event.EventNumber;
76+
lastEventNumber = Array.IndexOf(readEvents, lastEvent);
7177
nextEventNumber = lastEventNumber - 1;
78+
lastPosition = lastEvent.Value.Event.LogPosition;
7279
}
7380

7481
return ValueTask.FromResult(new ClientMessage.ReadStreamEventsBackwardCompleted(
@@ -77,21 +84,21 @@ public class FakeVirtualStreamReader(string streamName, ResolvedEvent[] events)
7784
msg.FromEventNumber,
7885
msg.MaxCount,
7986
result,
80-
readEvents,
87+
readEvents.Reverse().ToArray(),
8188
StreamMetadata.Empty,
8289
isCachePublic: false,
8390
error: string.Empty,
8491
nextEventNumber: nextEventNumber,
8592
lastEventNumber: lastEventNumber,
86-
isEndOfStream: true,
87-
tfLastCommitPosition: 0L));
93+
isEndOfStream: lastEventNumber == 0,
94+
tfLastCommitPosition: lastPosition));
8895
}
8996

9097
public long GetLastEventNumber(string streamId) =>
91-
events.Length > 0 ? events.Last().Event.EventNumber : -1;
98+
events.Count > 0 ? events.Last().Event.EventNumber : -1;
9299

93100
public long GetLastIndexedPosition(string streamId) =>
94-
events.Length > 0 ? events.Last().Event.LogPosition : -1;
101+
events.Count > 0 ? events.Last().Event.LogPosition : -1;
95102

96103
public bool CanReadStream(string streamId) =>
97104
streamId == streamName;

0 commit comments

Comments
 (0)