Skip to content

Commit 34d54e3

Browse files
Tree: Local Rollback (#24954)
This change implements local rollback in shared tree and adds some basic unit test for the scenario. This is modeled after the existing code for aborting a transaction, which behaves similarly. This is a precursor change to expanded fuzz coverage for rollback for all dds via the dds fuzz and incorporating tree into local server stress. There is a prototype of that here: #24930
1 parent 6fa2008 commit 34d54e3

File tree

5 files changed

+235
-0
lines changed

5 files changed

+235
-0
lines changed

packages/dds/tree/src/shared-tree-core/defaultResubmitMachine.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,19 @@ export class DefaultResubmitMachine<TChange> implements ResubmitMachine<TChange>
5959
this.inFlightQueue.push(commit);
6060
}
6161

62+
public onCommitRollback(commit: GraphCommit<TChange>): void {
63+
assert(
64+
this.inFlightQueue.length > 0 &&
65+
commit.revision === this.inFlightQueue[this.inFlightQueue.length - 1]?.revision,
66+
"must rollback latest commit in the in flight queue",
67+
);
68+
69+
if (this.latestInFlightCommitWithStaleEnrichments === this.inFlightQueue.length - 1) {
70+
this.latestInFlightCommitWithStaleEnrichments -= 1;
71+
}
72+
this.inFlightQueue.pop();
73+
}
74+
6275
public prepareForResubmit(toResubmit: readonly GraphCommit<TChange>[]): void {
6376
assert(
6477
!this.isInResubmitPhase,

packages/dds/tree/src/shared-tree-core/resubmitMachine.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ export interface ResubmitMachine<TChange> {
3737
*/
3838
onCommitSubmitted(commit: GraphCommit<TChange>): void;
3939

40+
/**
41+
* Must be called on a commit after rollback, so it can be removed
42+
* as it will never be (re)submitted.
43+
* @param commit - The commit that was rolled back
44+
*/
45+
onCommitRollback(commit: GraphCommit<TChange>): void;
46+
4047
/**
4148
* Must be called after a sequenced commit is applied.
4249
* Note that this may be called multiples times in a row after a number of sequenced commits have been applied

packages/dds/tree/src/shared-tree-core/sharedTreeCore.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,20 @@ export class SharedTreeCore<TEditor extends ChangeFamilyEditor, TChange>
418418
const enrichedCommit = this.resubmitMachine.peekNextCommit();
419419
this.submitCommit(enrichedCommit, localOpMetadata, true);
420420
}
421+
public rollback(content: JsonCompatibleReadOnly, localOpMetadata: unknown): void {
422+
// Empty context object is passed in, as our decode function is schema-agnostic.
423+
const {
424+
commit: { revision },
425+
} = this.messageCodec.decode(this.serializer.decode(content), {
426+
idCompressor: this.idCompressor,
427+
});
428+
const [commit] = this.editManager.findLocalCommit(revision);
429+
const { parent } = commit;
430+
assert(parent !== undefined, "must have parent");
431+
const [precedingCommit] = this.editManager.findLocalCommit(parent.revision);
432+
this.editManager.localBranch.removeAfter(precedingCommit);
433+
this.resubmitMachine.onCommitRollback(commit);
434+
}
421435

422436
public applyStashedOp(content: JsonCompatibleReadOnly): void {
423437
// Empty context object is passed in, as our decode function is schema-agnostic.
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*!
2+
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
3+
* Licensed under the MIT License.
4+
*/
5+
6+
import { strict as assert } from "node:assert";
7+
8+
import { createIdCompressor } from "@fluidframework/id-compressor/internal";
9+
10+
import {
11+
MockContainerRuntimeFactory,
12+
MockFluidDataStoreRuntime,
13+
MockStorage,
14+
} from "@fluidframework/test-runtime-utils/internal";
15+
16+
import { SharedTreeTestFactory } from "../utils.js";
17+
18+
import { SchemaFactory, TreeViewConfiguration } from "../../simple-tree/index.js";
19+
import { FlushMode } from "@fluidframework/runtime-definitions/internal";
20+
21+
const sf = new SchemaFactory("Test");
22+
class TestNode extends sf.objectRecursive("test node", {
23+
child: sf.optionalRecursive([sf.number]),
24+
}) {}
25+
26+
function setupTree() {
27+
const containerRuntimeFactory = new MockContainerRuntimeFactory({
28+
flushMode: FlushMode.TurnBased,
29+
});
30+
const dataStoreRuntime1 = new MockFluidDataStoreRuntime({
31+
idCompressor: createIdCompressor(),
32+
});
33+
34+
const factory = new SharedTreeTestFactory(() => {});
35+
36+
const containerRuntime = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1);
37+
const tree1 = factory.create(dataStoreRuntime1, "A");
38+
tree1.connect({
39+
deltaConnection: dataStoreRuntime1.createDeltaConnection(),
40+
objectStorage: new MockStorage(),
41+
});
42+
43+
const view = tree1.viewWith(new TreeViewConfiguration({ schema: TestNode }));
44+
view.initialize(new TestNode({}));
45+
containerRuntime.flush();
46+
return { view, containerRuntime, containerRuntimeFactory };
47+
}
48+
49+
describe("SharedTreeCore rollback", () => {
50+
it("should rollback a local insert operation", async () => {
51+
const { view, containerRuntime, containerRuntimeFactory } = setupTree();
52+
view.root = new TestNode({ child: 0 });
53+
assert.deepEqual(view.root.child, 0, "after local insert");
54+
55+
// Rollback local change
56+
containerRuntime.rollback?.();
57+
assert.deepEqual(view.root.child, undefined, "after rollback of insert");
58+
59+
// Process messages to ensure no-op
60+
containerRuntime.flush();
61+
containerRuntimeFactory.processAllMessages();
62+
assert.deepEqual(view.root.child, undefined, "after processAllMessages post-rollback");
63+
});
64+
65+
it("should rollback a local update operation", async () => {
66+
const { view, containerRuntime, containerRuntimeFactory } = setupTree();
67+
view.root = new TestNode({ child: 1 });
68+
containerRuntime.flush();
69+
containerRuntimeFactory.processAllMessages();
70+
assert.deepEqual(view.root.child, 1, "after initial insert");
71+
72+
// Local update
73+
view.root.child = 2;
74+
assert.deepEqual(view.root.child, 2, "after local update");
75+
76+
containerRuntime.rollback?.();
77+
assert.deepEqual(view.root.child, 1, "after rollback of update");
78+
});
79+
80+
it("should rollback a local delete operation", async () => {
81+
const { view, containerRuntime, containerRuntimeFactory } = setupTree();
82+
view.root = new TestNode({ child: 5 });
83+
containerRuntime.flush();
84+
containerRuntimeFactory.processAllMessages();
85+
assert.deepEqual(view.root.child, 5, "after initial insert");
86+
87+
// Local delete
88+
view.root.child = undefined;
89+
assert.deepEqual(view.root.child, undefined, "after local delete");
90+
91+
containerRuntime.rollback?.();
92+
assert.deepEqual(view.root.child, 5, "after rollback of delete");
93+
});
94+
95+
it("should rollback multiple local operations in sequence", async () => {
96+
const { view, containerRuntime, containerRuntimeFactory } = setupTree();
97+
view.root = new TestNode({ child: 10 });
98+
containerRuntime.flush();
99+
containerRuntimeFactory.processAllMessages();
100+
assert.deepEqual(view.root.child, 10, "after initial insert");
101+
102+
// Multiple local changes
103+
view.root.child = 20;
104+
view.root.child = undefined;
105+
view.root.child = 30;
106+
assert.deepEqual(view.root.child, 30, "after multiple local ops");
107+
108+
containerRuntime.rollback?.();
109+
assert.deepEqual(view.root.child, 10, "after rollback of multiple ops");
110+
});
111+
112+
it("should not rollback already flushed (acked) operations", async () => {
113+
const { view, containerRuntime, containerRuntimeFactory } = setupTree();
114+
view.root = new TestNode({ child: 100 });
115+
containerRuntime.flush();
116+
containerRuntimeFactory.processAllMessages();
117+
assert.deepEqual(view.root.child, 100, "after flush and process");
118+
119+
// Local change and flush again
120+
view.root.child = 200;
121+
containerRuntime.flush();
122+
containerRuntimeFactory.processAllMessages();
123+
assert.deepEqual(view.root.child, 200, "after second flush");
124+
125+
// Rollback should not affect acked changes
126+
containerRuntime.rollback?.();
127+
assert.deepEqual(view.root.child, 200, "rollback after flush (no effect)");
128+
});
129+
130+
it("should be a no-op if rollback is called with no pending changes", async () => {
131+
const { view, containerRuntime, containerRuntimeFactory } = setupTree();
132+
view.root = new TestNode({ child: 7 });
133+
containerRuntime.flush();
134+
containerRuntimeFactory.processAllMessages();
135+
assert.deepEqual(view.root.child, 7, "after flush");
136+
137+
containerRuntime.rollback?.();
138+
assert.deepEqual(view.root.child, 7, "rollback with no pending changes");
139+
});
140+
141+
it("should rollback local changes in presence of remote changes from another client", async () => {
142+
const containerRuntimeFactory = new MockContainerRuntimeFactory({
143+
flushMode: FlushMode.TurnBased,
144+
});
145+
const factory = new SharedTreeTestFactory(() => {});
146+
147+
// Client 1
148+
const dataStoreRuntime1 = new MockFluidDataStoreRuntime({
149+
idCompressor: createIdCompressor(),
150+
clientId: "1",
151+
});
152+
const containerRuntime1 =
153+
containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1);
154+
const tree1 = factory.create(dataStoreRuntime1, "A");
155+
tree1.connect({
156+
deltaConnection: dataStoreRuntime1.createDeltaConnection(),
157+
objectStorage: new MockStorage(),
158+
});
159+
const view1 = tree1.viewWith(new TreeViewConfiguration({ schema: TestNode }));
160+
view1.initialize(new TestNode({}));
161+
containerRuntime1.flush();
162+
163+
// Client 2
164+
const dataStoreRuntime2 = new MockFluidDataStoreRuntime({
165+
idCompressor: createIdCompressor(),
166+
clientId: "2",
167+
});
168+
const containerRuntime2 =
169+
containerRuntimeFactory.createContainerRuntime(dataStoreRuntime2);
170+
const tree2 = factory.create(dataStoreRuntime2, "A");
171+
tree2.connect({
172+
deltaConnection: dataStoreRuntime2.createDeltaConnection(),
173+
objectStorage: new MockStorage(),
174+
});
175+
const view2 = tree2.viewWith(new TreeViewConfiguration({ schema: TestNode }));
176+
view2.initialize(new TestNode({}));
177+
containerRuntime2.flush();
178+
179+
containerRuntimeFactory.processAllMessages();
180+
181+
// Client 1 makes a local change (not flushed)
182+
view1.root.child = 1;
183+
assert.deepEqual(view1.root.child, 1, "client 1 local change");
184+
185+
// Client 2 makes a local change and flushes
186+
view2.root.child = 2;
187+
containerRuntime2.flush();
188+
containerRuntimeFactory.processAllMessages();
189+
190+
// Rollback local change in client 1
191+
containerRuntime1.rollback?.();
192+
containerRuntime1.flush();
193+
containerRuntimeFactory.processAllMessages();
194+
195+
// Should reflect remote change from client 2
196+
assert.deepEqual(view1.root.child, 2, "client 1 after rollback and remote change");
197+
});
198+
});

packages/dds/tree/src/test/shared-tree-core/sharedTreeCore.spec.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,9 @@ describe("SharedTreeCore", () => {
410410
public onSequencedCommitApplied(isLocal: boolean): void {
411411
this.sequencingLog.push(isLocal);
412412
}
413+
public onCommitRollback(): void {
414+
throw new Error("not implemented");
415+
}
413416
}
414417

415418
interface Enrichment<T extends object> {

0 commit comments

Comments
 (0)