Skip to content

Commit b2357da

Browse files
authored
Merge pull request #74 from koculu/73-enhancement-create-evicttodisk-method-in-maintainer
Add EvictToDisk and refactor IMaintainer.
2 parents 9b45449 + b085602 commit b2357da

File tree

11 files changed

+106
-52
lines changed

11 files changed

+106
-52
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ Note: For small data you don't need a maintainer.
143143
// 2. Read/Write data
144144
zoneTree.Upsert(39, "Hello ZoneTree!");
145145

146-
// 3. Complete maintainer running tasks.
147-
maintainer.CompleteRunningTasks();
146+
// 3. Wait for background threads.
147+
maintainer.WaitForBackgroundThreads();
148148
```
149149

150150
## How to delete keys?

src/Playground/Benchmark/OldTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public static void Insert(WriteAheadLogMode mode, int count)
5959
zoneTree.Maintenance.MoveMutableSegmentForward();
6060
zoneTree.Maintenance.StartMergeOperation()?.Join();
6161
}
62-
basicMaintainer.CompleteRunningTasks();
62+
basicMaintainer.WaitForBackgroundThreads();
6363
new StatsCollector().LogWithColor(
6464
"Merged in:",
6565
stopWatch.ElapsedMilliseconds,
@@ -183,7 +183,7 @@ public static void InsertSingleAndMerge(WriteAheadLogMode mode, int count, int k
183183
zoneTree.Maintenance.MoveMutableSegmentForward();
184184
zoneTree.Maintenance.StartMergeOperation()?.Join();
185185

186-
basicMaintainer.CompleteRunningTasks();
186+
basicMaintainer.WaitForBackgroundThreads();
187187
new StatsCollector().LogWithColor(
188188
"Merged in:",
189189
stopWatch.ElapsedMilliseconds,
@@ -222,7 +222,7 @@ public static void Iterate(WriteAheadLogMode mode, int count)
222222
"Completed in:",
223223
stopWatch.ElapsedMilliseconds,
224224
ConsoleColor.Green);
225-
basicMaintainer.CompleteRunningTasks();
225+
basicMaintainer.WaitForBackgroundThreads();
226226
}
227227

228228
public static void MultipleIterate(WriteAheadLogMode mode, int count, int iteratorCount)
@@ -262,7 +262,7 @@ public static void MultipleIterate(WriteAheadLogMode mode, int count, int iterat
262262
"Completed in:",
263263
stopWatch.ElapsedMilliseconds,
264264
ConsoleColor.Green);
265-
basicMaintainer.CompleteRunningTasks();
265+
basicMaintainer.WaitForBackgroundThreads();
266266
}
267267

268268
private static IZoneTree<int, int> OpenOrCreateZoneTree(WriteAheadLogMode mode, string dataPath)

src/Playground/Benchmark/StevesChallenge.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void Insert(IStatsCollector stats)
7171
zoneTree.Maintenance.MoveMutableSegmentForward();
7272
zoneTree.Maintenance.StartMergeOperation()?.Join();
7373
}
74-
maintainer.CompleteRunningTasks();
74+
maintainer.WaitForBackgroundThreads();
7575

7676
stats.AddStage("Merged In", ConsoleColor.DarkCyan);
7777
}
@@ -100,7 +100,7 @@ public void Iterate(IStatsCollector stats)
100100
stats.AddStage(
101101
"Iterated in",
102102
ConsoleColor.Green);
103-
maintainer.CompleteRunningTasks();
103+
maintainer.WaitForBackgroundThreads();
104104
}
105105

106106
}

src/Playground/Benchmark/ZoneTreeTest1.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void Insert(IStatsCollector stats)
5151
zoneTree.Maintenance.MoveMutableSegmentForward();
5252
zoneTree.Maintenance.StartMergeOperation()?.Join();
5353
}
54-
maintainer.CompleteRunningTasks();
54+
maintainer.WaitForBackgroundThreads();
5555

5656
stats.AddStage("Merged In", ConsoleColor.DarkCyan);
5757
}
@@ -82,6 +82,6 @@ public void Iterate(IStatsCollector stats)
8282
stats.AddStage(
8383
"Iterated in",
8484
ConsoleColor.Green);
85-
maintainer.CompleteRunningTasks();
85+
maintainer.WaitForBackgroundThreads();
8686
}
8787
}

src/Playground/Benchmark/ZoneTreeTest2.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void Insert(IStatsCollector stats)
5454
zoneTree.Maintenance.MoveMutableSegmentForward();
5555
zoneTree.Maintenance.StartMergeOperation()?.Join();
5656
}
57-
maintainer.CompleteRunningTasks();
57+
maintainer.WaitForBackgroundThreads();
5858

5959
stats.AddStage("Merged In", ConsoleColor.DarkCyan);
6060
}
@@ -85,6 +85,6 @@ public void Iterate(IStatsCollector stats)
8585
stats.AddStage(
8686
"Iterated in",
8787
ConsoleColor.Green);
88-
maintainer.CompleteRunningTasks();
88+
maintainer.WaitForBackgroundThreads();
8989
}
9090
}

src/Playground/Benchmark/ZoneTreeTest3.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void Insert(IStatsCollector stats)
5151
zoneTree.Maintenance.ZoneTree.Maintenance.MoveMutableSegmentForward();
5252
zoneTree.Maintenance.ZoneTree.Maintenance.StartMergeOperation()?.Join();
5353
}
54-
maintainer.CompleteRunningTasks();
54+
maintainer.WaitForBackgroundThreads();
5555

5656
stats.AddStage("Merged In", ConsoleColor.DarkCyan);
5757
}
@@ -82,6 +82,6 @@ public void Iterate(IStatsCollector stats)
8282
stats.AddStage(
8383
"Iterated in",
8484
ConsoleColor.Green);
85-
maintainer.CompleteRunningTasks();
85+
maintainer.WaitForBackgroundThreads();
8686
}
8787
}

src/Playground/Test1.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public static void SeveralParallelTransactions()
6363
});
6464
}
6565
Console.WriteLine("Elapsed: " + stopWatch.ElapsedMilliseconds);
66-
basicMaintainer.CompleteRunningTasks();
66+
basicMaintainer.WaitForBackgroundThreads();
6767
zoneTree.Maintenance.SaveMetaData();
6868
}
6969

@@ -110,7 +110,7 @@ public static void TestReverseIterator(
110110
return true;
111111
});
112112
}
113-
maintainer.CompleteRunningTasks();
113+
maintainer.WaitForBackgroundThreads();
114114
}
115115

116116
upload.Stop();
@@ -247,7 +247,7 @@ public static void TestIteratorBehavior(
247247
});
248248
t1.Wait();
249249
zoneTree1.Maintenance.TryCancelMergeOperation();
250-
maintainer.CompleteRunningTasks();
250+
maintainer.WaitForBackgroundThreads();
251251
}
252252

253253

src/ZoneTree/Core/ZoneTreeMaintainer.cs

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Concurrent;
2+
using System.Threading;
23
using Tenray.ZoneTree.Logger;
34
using Tenray.ZoneTree.Segments;
45
using Tenray.ZoneTree.Segments.Disk;
@@ -36,12 +37,6 @@ public sealed class ZoneTreeMaintainer<TKey, TValue> : IMaintainer, IDisposable
3637
/// </summary>
3738
public IZoneTreeMaintenance<TKey, TValue> Maintenance { get; }
3839

39-
/// <inheritdoc/>
40-
public int MinimumSparseArrayLength { get; set; }
41-
42-
/// <inheritdoc/>
43-
public int SparseArrayStepLength { get; set; } = 1_000;
44-
4540
/// <inheritdoc/>
4641
public int ThresholdForMergeOperationStart { get; set; } = 0;
4742

@@ -105,17 +100,27 @@ void AttachEvents()
105100
Maintenance.OnDiskSegmentCreated += OnDiskSegmentCreated;
106101
Maintenance.OnMergeOperationEnded += OnMergeOperationEnded;
107102
Maintenance.OnZoneTreeIsDisposing += OnZoneTreeIsDisposing;
103+
Maintenance.OnBottomSegmentsMergeOperationEnded += OnBottomSegmentsMergeOperationEnded;
108104
}
109105

106+
110107
void OnZoneTreeIsDisposing(IZoneTreeMaintenance<TKey, TValue> zoneTree)
111108
{
112109
Trace("ZoneTree is disposing. ZoneTreeMaintainer disposal started.");
113110
PeriodicTimerCancellationTokenSource.Cancel();
114-
CompleteRunningTasks();
111+
WaitForBackgroundThreads();
115112
Dispose();
116113
Trace("ZoneTreeMaintainer is disposed.");
117114
}
118115

116+
void OnBottomSegmentsMergeOperationEnded(
117+
IZoneTreeMaintenance<TKey, TValue> zoneTree,
118+
MergeResult mergeResult)
119+
{
120+
Trace(mergeResult.ToString());
121+
MergerThreads.Remove(Environment.CurrentManagedThreadId, out _);
122+
}
123+
119124
void OnMergeOperationEnded(
120125
IZoneTreeMaintenance<TKey, TValue> zoneTree,
121126
MergeResult mergeResult)
@@ -168,7 +173,8 @@ void OnMutableSegmentMovedForward(IZoneTreeMaintenance<TKey, TValue> zoneTree)
168173
StartMerge();
169174
}
170175

171-
void StartMerge()
176+
/// <inheritdoc/>
177+
public void StartMerge()
172178
{
173179
lock (this)
174180
{
@@ -182,14 +188,39 @@ void StartMerge()
182188
}
183189
}
184190

191+
192+
/// <inheritdoc/>
193+
public void StartBottomSegmentsMerge(
194+
int fromIndex = 0, int toIndex = int.MaxValue)
195+
{
196+
lock (this)
197+
{
198+
var mergerThread = Maintenance
199+
.StartBottomSegmentsMergeOperation(fromIndex, toIndex);
200+
if (mergerThread == null)
201+
return;
202+
MergerThreads.AddOrUpdate(
203+
mergerThread.ManagedThreadId,
204+
mergerThread,
205+
(key, value) => mergerThread);
206+
}
207+
}
208+
185209
/// <inheritdoc/>
186-
public void TryCancelRunningTasks()
210+
public void TryCancelBackgroundThreads()
187211
{
188212
Maintenance.TryCancelMergeOperation();
213+
Maintenance.TryCancelBottomSegmentsMergeOperation();
189214
}
190215

191216
/// <inheritdoc/>
192-
public void CompleteRunningTasks()
217+
public void WaitForBackgroundThreads()
218+
{
219+
WaitForBackgroundThreadsAsync().Wait();
220+
}
221+
222+
/// <inheritdoc/>
223+
public async Task WaitForBackgroundThreadsAsync()
193224
{
194225
while (true)
195226
{
@@ -203,7 +234,7 @@ public void CompleteRunningTasks()
203234
if (t.ThreadState == ThreadState.Stopped)
204235
MergerThreads.TryRemove(a.Key, out var _);
205236
else
206-
t.Join();
237+
await Task.Run(() => t.Join());
207238
}
208239
Trace("Wait ended");
209240
}
@@ -245,6 +276,13 @@ void Trace(string msg)
245276
Logger.LogTrace(msg);
246277
}
247278

279+
/// <inheritdoc/>
280+
public void EvictToDisk()
281+
{
282+
Maintenance.MoveMutableSegmentForward();
283+
StartMerge();
284+
}
285+
248286
/// <summary>
249287
/// Disposes this maintainer.
250288
/// </summary>
@@ -256,5 +294,6 @@ public void Dispose()
256294
Maintenance.OnDiskSegmentCreated -= OnDiskSegmentCreated;
257295
Maintenance.OnMergeOperationEnded -= OnMergeOperationEnded;
258296
Maintenance.OnZoneTreeIsDisposing -= OnZoneTreeIsDisposing;
297+
Maintenance.OnBottomSegmentsMergeOperationEnded -= OnBottomSegmentsMergeOperationEnded;
259298
}
260299
}

src/ZoneTree/IMaintainer.cs

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,11 @@
55
/// merge operations and memory compaction.
66
/// </summary>
77
/// <remarks>
8-
/// You must complete or cancel all pending tasks of this maintainer
8+
/// You must complete or cancel all pending threads of this maintainer
99
/// before disposing.
1010
/// </remarks>
1111
public interface IMaintainer : IDisposable
1212
{
13-
/// <summary>
14-
/// Minimum sparse array length when a new disk segment is created.
15-
/// Default value is 0.
16-
/// </summary>
17-
int MinimumSparseArrayLength { get; set; }
18-
19-
/// <summary>
20-
/// Configures sparse array step length when the disk segment length is bigger than
21-
/// MinimumSparseArrayLength * SparseArrayStepLength.
22-
/// The default value is 1000.
23-
/// <remarks>The sparse array length reduce binary lookup range on disk segment
24-
/// to reduce IO.
25-
/// </remarks>
26-
/// </summary>
27-
int SparseArrayStepLength { get; set; }
28-
2913
/// <summary>
3014
/// Starts merge operation when records count
3115
/// in read-only segments exceeds this value.
@@ -61,12 +45,43 @@ public interface IMaintainer : IDisposable
6145
TimeSpan InactiveBlockCacheCleanupInterval { get; set; }
6246

6347
/// <summary>
64-
/// Tries cancel running tasks.
48+
/// Tries cancel background threads.
49+
/// </summary>
50+
void TryCancelBackgroundThreads();
51+
52+
/// <summary>
53+
/// Blocks the calling thread until all background threads have completed their execution.
54+
/// </summary>
55+
void WaitForBackgroundThreads();
56+
57+
/// <summary>
58+
/// Asynchronously waits for all background threads to complete.
59+
/// </summary>
60+
/// <returns>A task that represents the asynchronous wait operation.</returns>
61+
Task WaitForBackgroundThreadsAsync();
62+
63+
/// <summary>
64+
/// Evicts all in-memory data to disk by moving the mutable segment forward and initiating a merge process.
65+
/// </summary>
66+
/// <remarks>
67+
/// This method is responsible for freeing up memory in the LSM tree by moving data from the mutable in-memory segment to disk storage.
68+
/// It first advances the current mutable segment to a new state, ensuring that any data currently in memory is prepared for disk storage.
69+
/// Afterward, it starts the merging process, which combines the in-memory data with existing on-disk data to maintain the integrity
70+
/// and efficiency of the LSM tree structure.
71+
/// </remarks>
72+
void EvictToDisk();
73+
74+
/// <summary>
75+
/// Initiates the merge process in a new thread.
6576
/// </summary>
66-
void TryCancelRunningTasks();
77+
void StartMerge();
6778

6879
/// <summary>
69-
/// Waits until all running tasks are completed.
80+
/// Initiates a merge of selected bottom segments into a single bottom disk segment.
7081
/// </summary>
71-
void CompleteRunningTasks();
82+
/// <param name="fromIndex">The lower bound</param>
83+
/// <param name="toIndex">The upper bound</param>
84+
/// <returns></returns>
85+
void StartBottomSegmentsMerge(
86+
int fromIndex = 0, int toIndex = int.MaxValue);
7287
}

src/ZoneTree/docs/ZoneTree/README-NUGET.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ Note: For small data you don't need a maintainer.
141141
// 2. Read/Write data
142142
zoneTree.Upsert(39, "Hello ZoneTree!");
143143

144-
// 3. Complete maintainer running tasks.
145-
maintainer.CompleteRunningTasks();
144+
// 3. Wait for background threads.
145+
maintainer.WaitForBackgroundThreads();
146146
```
147147

148148
## How to delete keys?

0 commit comments

Comments
 (0)