Skip to content

Commit 86cb132

Browse files
committed
NCBC-3957: Implement extReplaceBodyWithXattr
Motivation ========== This is an optimization for clusters that support the ReviveDocument and ReplaceBodyWithXattr flags. We don't have to store the document contents in our staged mutations in memory and move it across to the server when unstaging. Now we just tell the server to move the already staged contents across to the body of the doc. Modification ============ We just check to see if the server supports the flags, and use them. There are some error handling changes that are part of this as well - most of the changes have to do with that. See the docs on transactions for details. Results ======= All the extReplaceBodyWithXattr tests in FIT now pass. Change-Id: I49da87fbc4a4a693d246919c3a06f4291538983c Reviewed-on: https://review.couchbase.org/c/couchbase-net-client/+/225606 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Michael Reiche <michael.reiche@couchbase.com>
1 parent 3e45ac8 commit 86cb132

File tree

5 files changed

+101
-22
lines changed

5 files changed

+101
-22
lines changed

src/Couchbase/Client/Transactions/AttemptContext.cs

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1350,6 +1350,8 @@ await RepeatUntilSuccessOrThrow(async () =>
13501350
return Task.FromResult((sm.Doc.Cas, sm.Content));
13511351
}
13521352

1353+
1354+
13531355
private async Task UnstageInsertOrReplace(StagedMutation sm, ulong cas, object content, bool insertMode = false,
13541356
bool ambiguityResolutionMode = false, IRequestSpan? parentSpan = null)
13551357
{
@@ -1407,40 +1409,70 @@ await RepeatUntilSuccessOrThrow(async () =>
14071409
ambiguityResolutionMode = true;
14081410
return RepeatAction.RepeatWithDelay;
14091411
case ErrorClass.FailCasMismatch:
1410-
if (ambiguityResolutionMode)
1411-
{
1412-
throw _triage.AssertNotNull(triaged, ex);
1413-
}
1414-
else
1415-
{
1416-
cas = 0;
1417-
return RepeatAction.RepeatWithDelay;
1418-
}
1412+
RepeatAction returnVal;
1413+
(returnVal, cas) = await HandleDocChangedDuringCommit(sm, cas).CAF();
1414+
return returnVal;
14191415
case ErrorClass.FailDocNotFound:
14201416
// TODO: publish IllegalDocumentState event to the application.
14211417
Logger?.LogError("IllegalDocumentState: " + triaged.ec);
14221418
insertMode = true;
14231419
return RepeatAction.RepeatWithDelay;
14241420
case ErrorClass.FailDocAlreadyExists:
1421+
// we just move on if it is a replace
1422+
if (!insertMode)
1423+
return RepeatAction.NoRepeat;
14251424
if (ambiguityResolutionMode)
14261425
{
14271426
throw _triage.AssertNotNull(triaged, ex);
14281427
}
1429-
else
1430-
{
1431-
// TODO: publish an IllegalDocumentState event to the application.
1432-
Logger?.LogError("IllegalDocumentState: " + triaged.ec);
1433-
insertMode = false;
1434-
cas = 0;
1435-
return RepeatAction.RepeatWithDelay;
1436-
}
1428+
// now consider it a replace
1429+
insertMode = false;
1430+
cas = 0;
1431+
return RepeatAction.RepeatWithDelay;
14371432
}
14381433

14391434
throw _triage.AssertNotNull(triaged, ex);
14401435
}
14411436
}).CAF();
14421437
}
14431438

1439+
private async Task<(RepeatAction, ulong)> HandleDocChangedDuringCommit(StagedMutation sm, ulong cas)
1440+
{
1441+
Logger.LogDebug("handling doc changed during commit");
1442+
if (HasExpiredClientSide(sm.Doc.Id, DefaultTestHooks.HOOK_BEFORE_DOC_CHANGED_DURING_COMMIT))
1443+
{
1444+
throw CreateError(this, ErrorClass.FailExpiry)
1445+
.DoNotRollbackAttempt()
1446+
.Cause(new AttemptExpiredException(this,
1447+
"Commit expired in HandleDocChangedDuringCommit"))
1448+
.Build();
1449+
}
1450+
try
1451+
{
1452+
await _testHooks.BeforeDocChangedDuringCommit(this, sm.Doc.Id).CAF();
1453+
var doc = await this.GetWithMav(sm.Doc.Collection, sm.Doc.Id).CAF();
1454+
if (doc?.TransactionXattrs?.Id?.Transactionid != TransactionId ||
1455+
(doc?.TransactionXattrs == null)) {
1456+
return (RepeatAction.NoRepeat, cas);
1457+
}
1458+
// Same txn/attempt, so let's just retry with the new cas
1459+
cas = doc.Cas;
1460+
return (RepeatAction.RepeatNoDelay, cas);
1461+
} catch (Exception ex)
1462+
{
1463+
var triaged = _triage.TriageUnstageInsertOrReplaceErrors(ex, _expirationOvertimeMode);
1464+
Logger.LogDebug("handling doc changed during commit got {ec}", triaged.ec);
1465+
return triaged.ec switch
1466+
{
1467+
ErrorClass.FailTransient => (RepeatAction.RepeatWithDelay, cas),
1468+
ErrorClass.TransactionOperationFailed => throw new
1469+
TransactionOperationFailedException(ErrorClass.FailCasMismatch, false, true,
1470+
ex, TransactionOperationFailedException.FinalError.TransactionFailed),
1471+
_ => throw _triage.AssertNotNull(triaged, ex)
1472+
};
1473+
}
1474+
}
1475+
14441476
private async Task SetAtrCommit(IRequestSpan? parentSpan)
14451477
{
14461478
_ = _atr ?? throw new InvalidOperationException($"{nameof(SetAtrCommit)} without initializing ATR.");

src/Couchbase/Client/Transactions/DataAccess/DocumentRepository.cs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using Couchbase.Client.Transactions.DataModel;
1212
using Couchbase.Client.Transactions.Internal;
1313
using Couchbase.Client.Transactions.Support;
14+
using Couchbase.Core.Configuration.Server;
1415
using Newtonsoft.Json;
1516
using Newtonsoft.Json.Linq;
1617
using JsonSerializer = Newtonsoft.Json.JsonSerializer;
@@ -113,8 +114,43 @@ public DocumentRepository(TransactionContext overallContext, TimeSpan? keyValueT
113114
return (updatedDoc.Cas, updatedDoc.MutationToken);
114115
}
115116

117+
public bool SupportsReplaceBodyWithXattr(ICouchbaseCollection collection)
118+
{
119+
var bucket = collection.Scope.Bucket;
120+
if (bucket is CouchbaseBucket couchBucket)
121+
{
122+
// NOTE: we look for SUBDOC_REVIVE_DOCUMENT as this came slightly later than
123+
// ReplaceBodyWithXattr, and both are needed.
124+
return couchBucket.CurrentConfig?.BucketCapabilities.Contains(BucketCapabilities
125+
.SUBDOC_REVIVE_DOCUMENT) == true;
126+
}
127+
128+
return false;
129+
}
130+
116131
public async Task<(ulong updatedCas, MutationToken? mutationToken)> UnstageInsertOrReplace(ICouchbaseCollection collection, string docId, ulong cas, object finalDoc, bool insertMode)
117132
{
133+
if (SupportsReplaceBodyWithXattr(collection))
134+
{
135+
MutateInOptions opts;
136+
if (insertMode) {
137+
opts = GetMutateInOptions(StoreSemantics.AccessDeleted)
138+
.ReviveDocument(true);
139+
} else
140+
{
141+
opts = GetMutateInOptions(StoreSemantics.Replace)
142+
.Cas(cas);
143+
}
144+
var mutateResult = await collection.MutateInAsync(docId, specs =>
145+
specs.ReplaceBodyWithXattr(TransactionFields.StagedData)
146+
.Upsert(TransactionFields.TransactionInterfacePrefixOnly, string.Empty,
147+
isXattr: true, createPath: true)
148+
.Remove(TransactionFields.TransactionInterfacePrefixOnly,
149+
isXattr: true),
150+
opts).CAF();
151+
return (mutateResult.Cas, mutateResult.MutationToken);
152+
}
153+
// if bucket doesn't support ReplaceBodyWithXattr (and ReviveDocument)
118154
if (insertMode)
119155
{
120156
var opts = new InsertOptions().Defaults(_durability, _keyValueTimeout);
@@ -127,10 +163,10 @@ public DocumentRepository(TransactionContext overallContext, TimeSpan? keyValueT
127163
.Cas(cas)
128164
.Transcoder(_userDataTranscoder);
129165
var mutateResult = await collection.MutateInAsync(docId, specs =>
130-
specs.Upsert(TransactionFields.TransactionInterfacePrefixOnly, string.Empty,
131-
isXattr: true, createPath: true)
132-
.Remove(TransactionFields.TransactionInterfacePrefixOnly, isXattr: true)
133-
.SetDoc(finalDoc), opts).CAF();
166+
specs.Upsert(TransactionFields.TransactionInterfacePrefixOnly, string.Empty,
167+
isXattr: true, createPath: true)
168+
.Remove(TransactionFields.TransactionInterfacePrefixOnly, isXattr: true)
169+
.SetDoc(finalDoc), opts).CAF();
134170
return (mutateResult.Cas, mutateResult.MutationToken);
135171
}
136172
}

src/Couchbase/Client/Transactions/Error/ErrorClass.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ public static ErrorClass Classify(this Exception ex)
8989
return ErrorClass.FailCasMismatch;
9090
}
9191

92+
if (ex is DocumentAlreadyAliveException)
93+
{
94+
return ErrorClass.FailDocAlreadyExists;
95+
}
96+
9297
if (ex.IsFailTransient())
9398
{
9499
return ErrorClass.FailTransient;

src/Couchbase/Client/Transactions/Internal/Test/ITestHooks.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#nullable enable
22
using System;
33
using System.Threading.Tasks;
4+
using Couchbase.Client.Transactions.Components;
45
using Couchbase.Core.Compatibility;
56

67
#pragma warning disable CS1591
@@ -23,6 +24,7 @@ internal interface ITestHooks
2324
public Task<int?> BeforeDocCommitted(AttemptContext self, string id);
2425
public Task<int?> BeforeDocRolledBack(AttemptContext self, string id);
2526
public Task<int?> AfterDocCommittedBeforeSavingCas(AttemptContext self, string id);
27+
public Task <int?> BeforeDocChangedDuringCommit(AttemptContext self, string id);
2628
public Task<int?> AfterDocCommitted(AttemptContext self, string id);
2729
public Task<int?> AfterDocsCommitted( AttemptContext self);
2830
public Task<int?> BeforeDocRemoved(AttemptContext self, string id);
@@ -97,11 +99,14 @@ internal class DefaultTestHooks : ITestHooks
9799
public const string HOOK_QUERY_KV_REMOVE = "queryKvRemove";
98100
public const string HOOK_QUERY_KV_INSERT = "queryKvInsert";
99101
public const string HOOK_REMOVE_DOC = "removeDoc";
100-
public const string HOOK_QUERY_ROLLBACK = "queryRollback"; public virtual Task<int?> BeforeAtrCommit(AttemptContext self) => Task.FromResult<int?>(0);
102+
public const string HOOK_BEFORE_DOC_CHANGED_DURING_COMMIT = "beforeDocChangedDuringCommit";
103+
public const string HOOK_QUERY_ROLLBACK = "queryRollback";
104+
public virtual Task<int?> BeforeAtrCommit(AttemptContext self) => Task.FromResult<int?>(0);
101105
public virtual Task<int?> AfterAtrCommit(AttemptContext self) => Task.FromResult<int?>(0);
102106
public virtual Task<int?> BeforeDocCommitted(AttemptContext self, string id) => Task.FromResult<int?>(0);
103107
public virtual Task<int?> BeforeDocRolledBack(AttemptContext self, string id) => Task.FromResult<int?>(0);
104108
public virtual Task<int?> AfterDocCommittedBeforeSavingCas(AttemptContext self, string id) => Task.FromResult<int?>(0);
109+
public virtual Task<int?> BeforeDocChangedDuringCommit(AttemptContext self, string id) => Task.FromResult<int?>(0);
105110
public virtual Task<int?> AfterDocCommitted(AttemptContext self, string id) => Task.FromResult<int?>(0);
106111
public virtual Task<int?> AfterDocsCommitted( AttemptContext self) => Task.FromResult<int?>(0);
107112
public virtual Task<int?> BeforeDocRemoved(AttemptContext self, string id) => Task.FromResult<int?>(0);

src/Couchbase/Client/Transactions/ProtocolVersion.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public static IEnumerable<ExtensionName> ExtensionsSupported()
3232
yield return new ExtensionName("ExtQueryContext", "EXT_QUERY_CONTEXT", "QC");
3333
yield return new ExtensionName("ExtThreadSafe", "EXT_THREAD_SAFE", "TS");
3434
yield return new ExtensionName("ExtInsertExisting", "EXT_INSERT_EXISTING", "IX");
35+
yield return new ExtensionName("ExtReplaceBodyWithXattr", "EXT_REPLACE_BODY_WITH_XATTR","RX");
3536
}
3637

3738
internal static bool Supported(string shortCode) => SupportedShortCodes.Value.Contains(shortCode);

0 commit comments

Comments
 (0)