Skip to content

Commit 0f16af3

Browse files
committed
NCBC-4005 Implement extParallelUnstaging
Motivation ========== Unstaging in parallel will significantly improve performance of transactions which have many staged mutation. Modification ============ Do the unstaging in parallel. Limit the number of unstaging docs that are in-flight at any time to an arbitrary number of 100 for now, but that constant can be changed if needed. We keep up to 100 unstaging tasks in parallel, which sounds sufficient. Note this is only when they are done entirely with KV - a query in the transaction then makes the query server do the commit. Results ======= The extention's tests pass, no alarming failures elsewhere. Change-Id: I1f7515cc99e008cb8adf1bf2844d0a5fbd5c9a95 Reviewed-on: https://review.couchbase.org/c/couchbase-net-client/+/226777 Reviewed-by: Michael Reiche <michael.reiche@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent af6b3c1 commit 0f16af3

File tree

2 files changed

+45
-17
lines changed

2 files changed

+45
-17
lines changed

src/Couchbase/Client/Transactions/AttemptContext.cs

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class AttemptContext
5353
private readonly TransactionContext _overallContext;
5454
private readonly MergedTransactionConfig _config;
5555
private readonly ITestHooks _testHooks;
56+
private readonly int _unstagingConcurrency = 100;
5657
internal IRedactor Redactor { get; }
5758
internal AttemptStates AttemptState = AttemptStates.NOTHING_WRITTEN;
5859
private readonly ErrorTriage _triage;
@@ -1272,26 +1273,43 @@ private async Task UnstageDocs(IRequestSpan? parentSpan)
12721273
{
12731274
using var traceSpan = TraceSpan(parent: parentSpan);
12741275
var allStagedMutations = _stagedMutations.ToList();
1275-
foreach (var sm in allStagedMutations)
1276+
var sem = new SemaphoreSlim(_unstagingConcurrency);
1277+
var tasks = allStagedMutations.Select(async sm =>
12761278
{
1279+
await UnstageDoc(sm, sem).CAF();
1280+
});
1281+
await Task.WhenAll(tasks).CAF();
1282+
}
1283+
1284+
private async Task UnstageDoc(StagedMutation sm, SemaphoreSlim sem)
1285+
{
1286+
try
1287+
{
1288+
await sem.WaitAsync().CAF();
12771289
(var cas, var content) = await FetchIfNeededBeforeUnstage(sm).CAF();
12781290
switch (sm.Type)
12791291
{
12801292
case StagedMutationType.Remove:
12811293
await UnstageRemove(sm).CAF();
12821294
break;
12831295
case StagedMutationType.Insert:
1284-
await UnstageInsertOrReplace(sm, cas, content, insertMode: true, ambiguityResolutionMode: false)
1296+
await UnstageInsertOrReplace(sm, cas, content, insertMode: true,
1297+
ambiguityResolutionMode: false)
12851298
.CAF();
12861299
break;
12871300
case StagedMutationType.Replace:
12881301
await UnstageInsertOrReplace(sm, cas, content, insertMode: false,
12891302
ambiguityResolutionMode: false).CAF();
12901303
break;
12911304
default:
1292-
throw new InvalidOperationException($"Cannot un-stage transaction mutation of type {sm.Type}");
1305+
throw new InvalidOperationException(
1306+
$"Cannot un-stage transaction mutation of type {sm.Type}");
12931307
}
12941308
}
1309+
finally
1310+
{
1311+
sem.Release();
1312+
}
12951313
}
12961314

12971315
private async Task UnstageRemove(StagedMutation sm, bool ambiguityResolutionMode = false,
@@ -1848,24 +1866,33 @@ internal async Task RollbackWithKv(bool isAppRollback, IRequestSpan? parentSpan)
18481866

18491867
await SetAtrAborted(isAppRollback, traceSpan.Item).CAF();
18501868
var allMutations = _stagedMutations.ToList();
1851-
foreach (var sm in allMutations)
1869+
var sem = new SemaphoreSlim(_unstagingConcurrency);
1870+
var tasks = allMutations.Select(async sm =>
18521871
{
1853-
switch (sm.Type)
1872+
try
18541873
{
1855-
case StagedMutationType.Insert:
1856-
await RollbackStagedInsert(sm, traceSpan.Item).CAF();
1857-
break;
1858-
case StagedMutationType.Remove:
1859-
case StagedMutationType.Replace:
1860-
await RollbackStagedReplaceOrRemove(sm, traceSpan.Item).CAF();
1861-
break;
1862-
default:
1863-
throw new InvalidOperationException(sm.Type +
1864-
" is not a supported mutation type for rollback.");
1874+
await sem.WaitAsync().CAF();
1875+
switch (sm.Type)
1876+
{
1877+
case StagedMutationType.Insert:
1878+
await RollbackStagedInsert(sm, traceSpan.Item).CAF();
1879+
break;
1880+
case StagedMutationType.Remove:
1881+
case StagedMutationType.Replace:
1882+
await RollbackStagedReplaceOrRemove(sm, traceSpan.Item).CAF();
1883+
break;
1884+
default:
1885+
throw new InvalidOperationException(sm.Type +
1886+
" is not a supported mutation type for rollback.");
18651887

1888+
}
18661889
}
1867-
}
1868-
1890+
finally
1891+
{
1892+
sem.Release();
1893+
}
1894+
});
1895+
await Task.WhenAll(tasks).CAF();
18691896
await SetAtrRolledBack(traceSpan.Item).CAF();
18701897
}
18711898

src/Couchbase/Client/Transactions/ProtocolVersion.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public static IEnumerable<ExtensionName> ExtensionsSupported()
3333
yield return new ExtensionName("ExtThreadSafe", "EXT_THREAD_SAFE", "TS");
3434
yield return new ExtensionName("ExtInsertExisting", "EXT_INSERT_EXISTING", "IX");
3535
yield return new ExtensionName("ExtReplaceBodyWithXattr", "EXT_REPLACE_BODY_WITH_XATTR","RX");
36+
yield return new ExtensionName("ExtParallelUnstaging", "EXT_PARALLEL_UNSTAGING", "PU");
3637
}
3738

3839
internal static bool Supported(string shortCode) => SupportedShortCodes.Value.Contains(shortCode);

0 commit comments

Comments
 (0)