Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ Note: For small data you don't need a maintainer.
// 2. Read/Write data
zoneTree.Upsert(39, "Hello ZoneTree!");

// 3. Complete maintainer running tasks.
maintainer.CompleteRunningTasks();
// 3. Wait for background threads.
maintainer.WaitForBackgroundThreads();
```

## How to delete keys?
Expand Down
8 changes: 4 additions & 4 deletions src/Playground/Benchmark/OldTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void Insert(WriteAheadLogMode mode, int count)
zoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.StartMergeOperation()?.Join();
}
basicMaintainer.CompleteRunningTasks();
basicMaintainer.WaitForBackgroundThreads();
new StatsCollector().LogWithColor(
"Merged in:",
stopWatch.ElapsedMilliseconds,
Expand Down Expand Up @@ -183,7 +183,7 @@ public static void InsertSingleAndMerge(WriteAheadLogMode mode, int count, int k
zoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.StartMergeOperation()?.Join();

basicMaintainer.CompleteRunningTasks();
basicMaintainer.WaitForBackgroundThreads();
new StatsCollector().LogWithColor(
"Merged in:",
stopWatch.ElapsedMilliseconds,
Expand Down Expand Up @@ -222,7 +222,7 @@ public static void Iterate(WriteAheadLogMode mode, int count)
"Completed in:",
stopWatch.ElapsedMilliseconds,
ConsoleColor.Green);
basicMaintainer.CompleteRunningTasks();
basicMaintainer.WaitForBackgroundThreads();
}

public static void MultipleIterate(WriteAheadLogMode mode, int count, int iteratorCount)
Expand Down Expand Up @@ -262,7 +262,7 @@ public static void MultipleIterate(WriteAheadLogMode mode, int count, int iterat
"Completed in:",
stopWatch.ElapsedMilliseconds,
ConsoleColor.Green);
basicMaintainer.CompleteRunningTasks();
basicMaintainer.WaitForBackgroundThreads();
}

private static IZoneTree<int, int> OpenOrCreateZoneTree(WriteAheadLogMode mode, string dataPath)
Expand Down
4 changes: 2 additions & 2 deletions src/Playground/Benchmark/StevesChallenge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void Insert(IStatsCollector stats)
zoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.StartMergeOperation()?.Join();
}
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();

stats.AddStage("Merged In", ConsoleColor.DarkCyan);
}
Expand Down Expand Up @@ -100,7 +100,7 @@ public void Iterate(IStatsCollector stats)
stats.AddStage(
"Iterated in",
ConsoleColor.Green);
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}

}
4 changes: 2 additions & 2 deletions src/Playground/Benchmark/ZoneTreeTest1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void Insert(IStatsCollector stats)
zoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.StartMergeOperation()?.Join();
}
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();

stats.AddStage("Merged In", ConsoleColor.DarkCyan);
}
Expand Down Expand Up @@ -82,6 +82,6 @@ public void Iterate(IStatsCollector stats)
stats.AddStage(
"Iterated in",
ConsoleColor.Green);
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}
}
4 changes: 2 additions & 2 deletions src/Playground/Benchmark/ZoneTreeTest2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void Insert(IStatsCollector stats)
zoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.StartMergeOperation()?.Join();
}
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();

stats.AddStage("Merged In", ConsoleColor.DarkCyan);
}
Expand Down Expand Up @@ -85,6 +85,6 @@ public void Iterate(IStatsCollector stats)
stats.AddStage(
"Iterated in",
ConsoleColor.Green);
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}
}
4 changes: 2 additions & 2 deletions src/Playground/Benchmark/ZoneTreeTest3.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void Insert(IStatsCollector stats)
zoneTree.Maintenance.ZoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.ZoneTree.Maintenance.StartMergeOperation()?.Join();
}
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();

stats.AddStage("Merged In", ConsoleColor.DarkCyan);
}
Expand Down Expand Up @@ -82,6 +82,6 @@ public void Iterate(IStatsCollector stats)
stats.AddStage(
"Iterated in",
ConsoleColor.Green);
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}
}
6 changes: 3 additions & 3 deletions src/Playground/Test1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void SeveralParallelTransactions()
});
}
Console.WriteLine("Elapsed: " + stopWatch.ElapsedMilliseconds);
basicMaintainer.CompleteRunningTasks();
basicMaintainer.WaitForBackgroundThreads();
zoneTree.Maintenance.SaveMetaData();
}

Expand Down Expand Up @@ -110,7 +110,7 @@ public static void TestReverseIterator(
return true;
});
}
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}

upload.Stop();
Expand Down Expand Up @@ -247,7 +247,7 @@ public static void TestIteratorBehavior(
});
t1.Wait();
zoneTree1.Maintenance.TryCancelMergeOperation();
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}


Expand Down
61 changes: 50 additions & 11 deletions src/ZoneTree/Core/ZoneTreeMaintainer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Threading;
using Tenray.ZoneTree.Logger;
using Tenray.ZoneTree.Segments;
using Tenray.ZoneTree.Segments.Disk;
Expand Down Expand Up @@ -36,14 +37,8 @@
/// </summary>
public IZoneTreeMaintenance<TKey, TValue> Maintenance { get; }

/// <inheritdoc/>
public int MinimumSparseArrayLength { get; set; }

/// <inheritdoc/>
public int SparseArrayStepLength { get; set; } = 1_000;

/// <inheritdoc/>
public int ThresholdForMergeOperationStart { get; set; } = 0;

Check warning on line 41 in src/ZoneTree/Core/ZoneTreeMaintainer.cs

View workflow job for this annotation

GitHub Actions / build

Member 'ThresholdForMergeOperationStart' is explicitly initialized to its default value (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1805)

/// <inheritdoc/>
public int MaximumReadOnlySegmentCount { get; set; } = 64;
Expand Down Expand Up @@ -105,17 +100,27 @@
Maintenance.OnDiskSegmentCreated += OnDiskSegmentCreated;
Maintenance.OnMergeOperationEnded += OnMergeOperationEnded;
Maintenance.OnZoneTreeIsDisposing += OnZoneTreeIsDisposing;
Maintenance.OnBottomSegmentsMergeOperationEnded += OnBottomSegmentsMergeOperationEnded;
}


void OnZoneTreeIsDisposing(IZoneTreeMaintenance<TKey, TValue> zoneTree)
{
Trace("ZoneTree is disposing. ZoneTreeMaintainer disposal started.");
PeriodicTimerCancellationTokenSource.Cancel();
CompleteRunningTasks();
WaitForBackgroundThreads();
Dispose();
Trace("ZoneTreeMaintainer is disposed.");
}

void OnBottomSegmentsMergeOperationEnded(
IZoneTreeMaintenance<TKey, TValue> zoneTree,
MergeResult mergeResult)
{
Trace(mergeResult.ToString());
MergerThreads.Remove(Environment.CurrentManagedThreadId, out _);
}

void OnMergeOperationEnded(
IZoneTreeMaintenance<TKey, TValue> zoneTree,
MergeResult mergeResult)
Expand Down Expand Up @@ -168,7 +173,8 @@
StartMerge();
}

void StartMerge()
/// <inheritdoc/>
public void StartMerge()
{
lock (this)
{
Expand All @@ -182,14 +188,39 @@
}
}


/// <inheritdoc/>
public void StartBottomSegmentsMerge(
int fromIndex = 0, int toIndex = int.MaxValue)
{
lock (this)
{
var mergerThread = Maintenance
.StartBottomSegmentsMergeOperation(fromIndex, toIndex);
if (mergerThread == null)
return;
MergerThreads.AddOrUpdate(
mergerThread.ManagedThreadId,
mergerThread,
(key, value) => mergerThread);
}
}

/// <inheritdoc/>
public void TryCancelRunningTasks()
public void TryCancelBackgroundThreads()
{
Maintenance.TryCancelMergeOperation();
Maintenance.TryCancelBottomSegmentsMergeOperation();
}

/// <inheritdoc/>
public void CompleteRunningTasks()
public void WaitForBackgroundThreads()
{
WaitForBackgroundThreadsAsync().Wait();
}

/// <inheritdoc/>
public async Task WaitForBackgroundThreadsAsync()
{
while (true)
{
Expand All @@ -203,7 +234,7 @@
if (t.ThreadState == ThreadState.Stopped)
MergerThreads.TryRemove(a.Key, out var _);
else
t.Join();
await Task.Run(() => t.Join());
}
Trace("Wait ended");
}
Expand Down Expand Up @@ -245,6 +276,13 @@
Logger.LogTrace(msg);
}

/// <inheritdoc/>
public void EvictToDisk()
{
Maintenance.MoveMutableSegmentForward();
StartMerge();
}

/// <summary>
/// Disposes this maintainer.
/// </summary>
Expand All @@ -256,5 +294,6 @@
Maintenance.OnDiskSegmentCreated -= OnDiskSegmentCreated;
Maintenance.OnMergeOperationEnded -= OnMergeOperationEnded;
Maintenance.OnZoneTreeIsDisposing -= OnZoneTreeIsDisposing;
Maintenance.OnBottomSegmentsMergeOperationEnded -= OnBottomSegmentsMergeOperationEnded;
}
}
57 changes: 36 additions & 21 deletions src/ZoneTree/IMaintainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,11 @@
/// merge operations and memory compaction.
/// </summary>
/// <remarks>
/// You must complete or cancel all pending tasks of this maintainer
/// You must complete or cancel all pending threads of this maintainer
/// before disposing.
/// </remarks>
public interface IMaintainer : IDisposable
{
/// <summary>
/// Minimum sparse array length when a new disk segment is created.
/// Default value is 0.
/// </summary>
int MinimumSparseArrayLength { get; set; }

/// <summary>
/// Configures sparse array step length when the disk segment length is bigger than
/// MinimumSparseArrayLength * SparseArrayStepLength.
/// The default value is 1000.
/// <remarks>The sparse array length reduce binary lookup range on disk segment
/// to reduce IO.
/// </remarks>
/// </summary>
int SparseArrayStepLength { get; set; }

/// <summary>
/// Starts merge operation when records count
/// in read-only segments exceeds this value.
Expand Down Expand Up @@ -61,12 +45,43 @@ public interface IMaintainer : IDisposable
TimeSpan InactiveBlockCacheCleanupInterval { get; set; }

/// <summary>
/// Tries cancel running tasks.
/// Tries cancel background threads.
/// </summary>
void TryCancelBackgroundThreads();

/// <summary>
/// Blocks the calling thread until all background threads have completed their execution.
/// </summary>
void WaitForBackgroundThreads();

/// <summary>
/// Asynchronously waits for all background threads to complete.
/// </summary>
/// <returns>A task that represents the asynchronous wait operation.</returns>
Task WaitForBackgroundThreadsAsync();

/// <summary>
/// Evicts all in-memory data to disk by moving the mutable segment forward and initiating a merge process.
/// </summary>
/// <remarks>
/// This method is responsible for freeing up memory in the LSM tree by moving data from the mutable in-memory segment to disk storage.
/// It first advances the current mutable segment to a new state, ensuring that any data currently in memory is prepared for disk storage.
/// Afterward, it starts the merging process, which combines the in-memory data with existing on-disk data to maintain the integrity
/// and efficiency of the LSM tree structure.
/// </remarks>
void EvictToDisk();

/// <summary>
/// Initiates the merge process in a new thread.
/// </summary>
void TryCancelRunningTasks();
void StartMerge();

/// <summary>
/// Waits until all running tasks are completed.
/// Initiates a merge of selected bottom segments into a single bottom disk segment.
/// </summary>
void CompleteRunningTasks();
/// <param name="fromIndex">The lower bound</param>
/// <param name="toIndex">The upper bound</param>
/// <returns></returns>
void StartBottomSegmentsMerge(
int fromIndex = 0, int toIndex = int.MaxValue);
}
4 changes: 2 additions & 2 deletions src/ZoneTree/docs/ZoneTree/README-NUGET.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ Note: For small data you don't need a maintainer.
// 2. Read/Write data
zoneTree.Upsert(39, "Hello ZoneTree!");

// 3. Complete maintainer running tasks.
maintainer.CompleteRunningTasks();
// 3. Wait for background threads.
maintainer.WaitForBackgroundThreads();
```

## How to delete keys?
Expand Down
2 changes: 1 addition & 1 deletion src/ZoneTree/docs/ZoneTree/guide/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ for (var i = 0; i < 10_000_000; ++i){
}

// Ensure maintainer merge operations are completed before tree disposal.
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
```

## How to merge data to the disk segment manually?
Expand Down
Loading