From 07cae9fdcef361f9da3d3fad75677efbc2a4ec33 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 10 Jan 2025 19:40:02 +0000 Subject: [PATCH 01/17] Unskip topology multi-actor conflict tests --- topologytest/hlv_test.go | 4 ---- topologytest/multi_actor_conflict_test.go | 10 ---------- 2 files changed, 14 deletions(-) diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go index 24f45b1882..01e007374a 100644 --- a/topologytest/hlv_test.go +++ b/topologytest/hlv_test.go @@ -91,10 +91,6 @@ func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee if backingPeers[peerName] { continue } - if peer.Type() == PeerTypeCouchbaseLite { - // FIXME: Skipping Couchbase Lite tests for multi actor conflicts, CBG-4434 - continue - } docBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, peerName, topologyDescription)) docVersion := peer.CreateDocument(dsName, docID, docBody) t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta) diff --git a/topologytest/multi_actor_conflict_test.go b/topologytest/multi_actor_conflict_test.go index ce2290d40f..434dc9dc7a 100644 --- a/topologytest/multi_actor_conflict_test.go +++ b/topologytest/multi_actor_conflict_test.go @@ -9,7 +9,6 @@ package topologytest import ( - "strings" "testing" ) @@ -42,9 +41,6 @@ func TestMultiActorConflictCreate(t *testing.T) { // 7. assert that the documents are deleted on all peers and have hlv sources equal to the number of active peers func TestMultiActorConflictUpdate(t *testing.T) { for _, topology := range append(simpleTopologies, Topologies...) { - if strings.Contains(topology.description, "CBL") { - t.Skip("CBL actor can generate conflicts and push replication fails with conflict for doc in blip tester CBL-4267") - } t.Run(topology.description, func(t *testing.T) { collectionName, peers, replications := setupTests(t, topology) replications.Stop() @@ -74,9 +70,6 @@ func TestMultiActorConflictUpdate(t *testing.T) { // 7. assert that the documents are deleted on all peers and have hlv sources equal to the number of active peers func TestMultiActorConflictDelete(t *testing.T) { for _, topology := range append(simpleTopologies, Topologies...) { - if strings.Contains(topology.description, "CBL") { - t.Skip("CBL actor can generate conflicts and push replication fails with conflict for doc in blip tester CBL-4267") - } t.Run(topology.description, func(t *testing.T) { collectionName, peers, replications := setupTests(t, topology) replications.Stop() @@ -110,9 +103,6 @@ func TestMultiActorConflictDelete(t *testing.T) { // 11. assert that the documents are resurrected on all peers and have hlv sources equal to the number of active peers and the document body is equivalent to the last write func TestMultiActorConflictResurrect(t *testing.T) { for _, topology := range append(simpleTopologies, Topologies...) 
{ - if strings.Contains(topology.description, "CBL") { - t.Skip("CBL actor can generate conflicts and push replication fails with conflict for doc in blip tester CBL-4267") - } t.Run(topology.description, func(t *testing.T) { collectionName, peers, replications := setupTests(t, topology) replications.Stop() From c30df88a94fc84a7f6f9139ddb427b7b387008cc Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 10 Jan 2025 21:27:02 +0000 Subject: [PATCH 02/17] Resolve conflicts on pull and rewrite local winner as new version to be pushed back --- db/hybrid_logical_vector.go | 8 +- rest/utilities_testing_blip_client.go | 124 +++++++++++++++--- topologytest/couchbase_lite_mock_peer_test.go | 6 +- topologytest/couchbase_server_peer_test.go | 4 +- topologytest/hlv_test.go | 48 ++++--- topologytest/multi_actor_conflict_test.go | 33 +++-- topologytest/sync_gateway_peer_test.go | 6 +- 7 files changed, 166 insertions(+), 63 deletions(-) diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 9fce119b10..dc9368edfe 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -346,13 +346,13 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector *HybridLogicalVecto // for source if the local version for that source is lower for i, v := range otherVector.PreviousVersions { if hlv.PreviousVersions[i] == 0 { - hlv.setPreviousVersion(i, v) + hlv.SetPreviousVersion(i, v) } else { // if we get here then there is entry for this source in PV so we must check if its newer or not otherHLVPVValue := v localHLVPVValue := hlv.PreviousVersions[i] if localHLVPVValue < otherHLVPVValue { - hlv.setPreviousVersion(i, v) + hlv.SetPreviousVersion(i, v) } } } @@ -384,8 +384,8 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi return outputSpec } -// setPreviousVersion will take a source/version pair and add it to the HLV previous versions map -func (hlv *HybridLogicalVector) setPreviousVersion(source string, version uint64) { +// SetPreviousVersion will take a source/version pair and add it to the HLV previous versions map +func (hlv *HybridLogicalVector) SetPreviousVersion(source string, version uint64) { if hlv.PreviousVersions == nil { hlv.PreviousVersions = make(HLVVersions) } diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index a14c2e1b2c..d3f968351d 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -37,6 +37,22 @@ const ( RevtreeSubtestName = "revTree" ) +type BlipTesterClientConflictResolverType string + +const ( + ConflictResolverLastWriteWins BlipTesterClientConflictResolverType = "lww" + + ConflictResolverDefault = ConflictResolverLastWriteWins +) + +func (c BlipTesterClientConflictResolverType) IsValid() bool { + switch c { + case ConflictResolverLastWriteWins: + return true + } + return false +} + type BlipTesterClientOpts struct { ClientDeltas bool // Support deltas on the client side Username string @@ -62,6 +78,8 @@ type BlipTesterClientOpts struct { // SourceID is used to define the SourceID for the blip client SourceID string + + ConflictResolver BlipTesterClientConflictResolverType } // defaultBlipTesterClientRevsLimit is the number of revisions sent as history when the client replicates - older revisions are not sent, and may not be stored. 
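[Editor's note on the AddNewerVersions hunk above: the previous-versions merge keeps, per source, whichever value is higher. A minimal standalone sketch of that rule, illustrative only - it uses a plain map[string]uint64 in place of HLVVersions, and mergePreviousVersions is not a function from the patch.]

package main

import "fmt"

// mergePreviousVersions keeps, for each source, the higher of the two values -
// the same "newer wins per source" rule AddNewerVersions applies to the HLV
// previous-versions map.
func mergePreviousVersions(local, other map[string]uint64) map[string]uint64 {
	if local == nil {
		local = make(map[string]uint64)
	}
	for source, otherValue := range other {
		if localValue, ok := local[source]; !ok || localValue < otherValue {
			local[source] = otherValue
		}
	}
	return local
}

func main() {
	local := map[string]uint64{"s1": 10, "s2": 30}
	other := map[string]uint64{"s1": 20, "s3": 5}
	// s1 is replaced (20 > 10), s2 is kept (30 is already newer), s3 is added.
	fmt.Println(mergePreviousVersions(local, other))
}
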
@@ -281,6 +299,12 @@ func (cd *clientDoc) currentVersion(t testing.TB) *db.Version { return &rev.version.CV } +func (cd *clientDoc) _currentVersion(t testing.TB) *db.Version { + rev, err := cd._latestRev() + require.NoError(t, err) + return &rev.version.CV +} + type BlipTesterCollectionClient struct { parent *BlipTesterClient @@ -571,20 +595,41 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { doc.lock.Lock() defer doc.lock.Unlock() + var incomingVersion DocVersion var newVersion DocVersion var hlv db.HybridLogicalVector if btc.UseHLV() { + var incomingHLV *db.HybridLogicalVector if revHistory != "" { - existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory) + incomingHLV, _, err = db.ExtractHLVFromBlipMessage(revHistory) require.NoError(btr.TB(), err, "error extracting HLV %q: %v", revHistory, err) - hlv = *existingVersion + hlv = *incomingHLV } - v, err := db.ParseVersion(revID) + incomingCV, err := db.ParseVersion(revID) require.NoError(btr.TB(), err, "error parsing version %q: %v", revID, err) - newVersion = DocVersion{CV: v} - require.NoError(btr.TB(), hlv.AddVersion(v)) + incomingVersion = DocVersion{CV: incomingCV} + + clientCV := doc._currentVersion(btc.TB()) + // incoming rev older than stored client version and comes from a different source - need to resolve + if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID { + btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV) + switch btc.BlipTesterClientOpts.ConflictResolver { + case ConflictResolverLastWriteWins: + // generate a new version for the resolution and write it to the remote HLV + v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())} + require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add incoming HLV into client HLV") + newVersion = DocVersion{CV: v} + hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value) + default: + btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver) + } + } else { + newVersion = DocVersion{CV: incomingCV} + } + require.NoError(btc.TB(), hlv.AddVersion(newVersion.CV), "couldn't add newVersion CV into doc HLV") } else { newVersion = DocVersion{RevTreeID: revID} + incomingVersion = newVersion } docRev := clientDocRev{ @@ -614,12 +659,16 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { // store the new sequence for a replaced rev for tests waiting for this specific rev doc._seqsByVersions[replacedVersion] = newClientSeq } - doc._latestServerVersion = newVersion + // store the _incoming_ version - not newVersion - since we may have written a resolved conflict which will need pushing back + doc._latestServerVersion = incomingVersion if !msg.NoReply() { response := msg.Response() response.SetBody([]byte(`[]`)) } + + // new sequence written, wake up changes feeds for push + btcr._seqCond.Broadcast() return } @@ -794,24 +843,53 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { doc.lock.Lock() defer doc.lock.Unlock() - var newVersion DocVersion + var incomingVersion DocVersion + var versionToWrite DocVersion var hlv db.HybridLogicalVector if btc.UseHLV() { + var incomingHLV *db.HybridLogicalVector if revHistory != "" { - existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory) + incomingHLV, _, err = db.ExtractHLVFromBlipMessage(revHistory) require.NoError(btr.TB(), err, "error 
extracting HLV %q: %v", revHistory, err) - hlv = *existingVersion + hlv = *incomingHLV } - v, err := db.ParseVersion(revID) + incomingCV, err := db.ParseVersion(revID) require.NoError(btr.TB(), err, "error parsing version %q: %v", revID, err) - newVersion = DocVersion{CV: v} - require.NoError(btr.TB(), hlv.AddVersion(v)) + incomingVersion = DocVersion{CV: incomingCV} + + // fetch client's latest version to do conflict check and resolution + latestClientRev, err := doc._latestRev() + require.NoError(btc.TB(), err, "couldn't get latest revision for doc %q", docID) + if latestClientRev != nil { + clientCV := latestClientRev.version.CV + + // incoming rev older than stored client version and comes from a different source - need to resolve + if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID { + btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV) + switch btc.BlipTesterClientOpts.ConflictResolver { + case ConflictResolverLastWriteWins: + // local wins so write the local body back as a new resolved version (based on incoming HLV) to push + body = latestClientRev.body + v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())} + require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add incoming HLV into client HLV") + versionToWrite = DocVersion{CV: v} + hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value) + default: + btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver) + } + } else { + // no conflict - accept incoming rev + versionToWrite = DocVersion{CV: incomingCV} + } + } + require.NoError(btc.TB(), hlv.AddVersion(versionToWrite.CV), "couldn't add new CV into doc HLV") } else { - newVersion = DocVersion{RevTreeID: revID} + versionToWrite = DocVersion{RevTreeID: revID} + incomingVersion = versionToWrite } docRev := clientDocRev{ clientSeq: newClientSeq, - version: newVersion, + version: versionToWrite, HLV: hlv, body: body, message: msg, @@ -835,12 +913,16 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { // store the new sequence for a replaced rev for tests waiting for this specific rev doc._seqsByVersions[replacedVersion] = newClientSeq } - doc._latestServerVersion = newVersion + // store the _incoming_ version - not versionToWrite - since we may have written a resolved conflict which will need pushing back + doc._latestServerVersion = incomingVersion if !msg.NoReply() { response := msg.Response() response.SetBody([]byte(`[]`)) } + + // new sequence written, wake up changes feeds for push + btcr._seqCond.Broadcast() } btr.bt.blipContext.HandlerForProfile[db.MessageGetAttachment] = func(msg *blip.Message) { @@ -1007,11 +1089,17 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes if !opts.AllowCreationWithoutBlipTesterClientRunner && !btcRunner.initialisedInsideRunnerCode { require.FailNow(btcRunner.TB(), "must initialise BlipTesterClient inside Run() method") } - id, err := uuid.NewRandom() - require.NoError(btcRunner.TB(), err) + if opts.ConflictResolver == "" { + opts.ConflictResolver = ConflictResolverDefault + } + if !opts.ConflictResolver.IsValid() { + require.FailNow(btcRunner.TB(), "invalid conflict resolver %q", opts.ConflictResolver) + } if opts.SourceID == "" { - opts.SourceID = fmt.Sprintf("btc-%d", id.ID()) + opts.SourceID = "blipclient" } + id, err := uuid.NewRandom() + 
require.NoError(btcRunner.TB(), err) client = &BlipTesterClient{ BlipTesterClientOpts: *opts, diff --git a/topologytest/couchbase_lite_mock_peer_test.go b/topologytest/couchbase_lite_mock_peer_test.go index 05000a4827..7f82b62ee8 100644 --- a/topologytest/couchbase_lite_mock_peer_test.go +++ b/topologytest/couchbase_lite_mock_peer_test.go @@ -65,7 +65,11 @@ func (p *CouchbaseLiteMockPeer) GetDocument(dsName sgbucket.DataStoreName, docID bodyBytes, meta := p.getLatestDocVersion(dsName, docID) require.NotNil(p.TB(), meta, "docID:%s not found on %s", docID, p) var body db.Body - require.NoError(p.TB(), base.JSONUnmarshal(bodyBytes, &body)) + // it's easier if all clients can return consistent bodies for tombstones + // lets just settle on nil, since we still need special handling anyway for `` vs `{}` so unmarshal doesn't barf + if len(bodyBytes) > 0 && string(bodyBytes) != base.EmptyDocument { + require.NoError(p.TB(), base.JSONUnmarshal(bodyBytes, &body)) + } return *meta, body } diff --git a/topologytest/couchbase_server_peer_test.go b/topologytest/couchbase_server_peer_test.go index 0ca143ff3f..ae41ca7682 100644 --- a/topologytest/couchbase_server_peer_test.go +++ b/topologytest/couchbase_server_peer_test.go @@ -342,6 +342,8 @@ func getBodyAndVersion(peer Peer, collection sgbucket.DataStore, docID string) ( require.NoError(peer.TB(), err) // get hlv to construct DocVersion var body db.Body - require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body)) + if len(docBytes) > 0 { + require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body)) + } return getDocVersion(docID, peer, cas, xattrs), body } diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go index 01e007374a..a55da352ed 100644 --- a/topologytest/hlv_test.go +++ b/topologytest/hlv_test.go @@ -15,6 +15,7 @@ import ( "github.com/couchbase/sync_gateway/base" "github.com/couchbase/sync_gateway/db" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -67,6 +68,25 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, p } } +// waitForConvergingVersion waits for the same document version to reach all peers. 
+func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) { + t.Logf("waiting for converged doc versions across all peers") + require.EventuallyWithT(t, func(c *assert.CollectT) { + for peerAid, peerA := range peers.SortedPeers() { + docMetaA, bodyA := peerA.GetDocument(dsName, docID) + for peerBid, peerB := range peers.SortedPeers() { + if peerAid == peerBid { + continue + } + docMetaB, bodyB := peerB.GetDocument(dsName, docID) + cvA, cvB := docMetaA.CV(t), docMetaB.CV(t) + require.Equalf(c, cvA, cvB, "CV mismatch: %s:%#v != %s:%#v", peerAid, docMetaA, peerBid, docMetaB) + require.Equalf(c, bodyA, bodyB, "body mismatch: %s:%s != %s:%s", peerAid, bodyA, peerBid, bodyB) + } + } + }, totalWaitTime, pollInterval) +} + // removeSyncGatewayBackingPeers will check if there is sync gateway in topology, if so will track the backing CBS // so we can skip creating docs on these peers (avoiding conflicts between docs created on the SGW and cbs) func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool { @@ -82,9 +102,9 @@ func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool { return peersToRemove } -// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents, then -// returns the last peer to have a doc created on it -func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) { +// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents. +// It is not known at this stage which write the "winner" will be, since conflict resolution can happen at replication time which may not be LWW, or may be LWW but with a new value. +func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) { backingPeers := removeSyncGatewayBackingPeers(peers) documentVersion := make([]BodyAndVersion, 0, len(peers)) for peerName, peer := range peers { @@ -96,15 +116,10 @@ func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta) documentVersion = append(documentVersion, docVersion) } - index := len(documentVersion) - 1 - lastWrite = documentVersion[index] - - return lastWrite } -// updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations, then -// returns the last peer to have a doc updated on it. 
-func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) { +// updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations +func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) { backingPeers := removeSyncGatewayBackingPeers(peers) var documentVersion []BodyAndVersion for peerName, peer := range peers { @@ -116,15 +131,10 @@ func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee t.Logf("updateVersion: %#v", docVersion.docMeta) documentVersion = append(documentVersion, docVersion) } - index := len(documentVersion) - 1 - lastWrite = documentVersion[index] - - return lastWrite } -// deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions, then -// returns the last peer to have a doc deleted on it -func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) (lastWrite BodyAndVersion) { +// deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions +func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) { backingPeers := removeSyncGatewayBackingPeers(peers) var documentVersion []BodyAndVersion for peerName, peer := range peers { @@ -135,10 +145,6 @@ func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers t.Logf("deleteVersion: %#v", deleteVersion) documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName}) } - index := len(documentVersion) - 1 - lastWrite = documentVersion[index] - - return lastWrite } // getDocID returns a unique doc ID for the test case. Note: when running with Couchbase Server and -count > 1, this will return duplicate IDs for count 2 and higher and they can conflict due to the way bucket pool works. 
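[Editor's note: since these helpers no longer return a predicted winner, convergence is asserted by polling. For reference, the shape of that polling is the testify EventuallyWithT/CollectT pattern used by waitForConvergingVersion above. The sketch below is illustrative only - waitForSameValue and its fetch functions are hypothetical names, not code from the patch.]

package topologytest

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// waitForSameValue is a hypothetical reduction of waitForConvergingVersion: it
// polls a set of named fetch functions until they all agree. Assertions made
// against the CollectT only fail a single attempt; the wait as a whole fails
// only if no attempt passes before the timeout elapses.
func waitForSameValue(t *testing.T, fetch map[string]func() string) {
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		seen := map[string]string{}
		for name, f := range fetch {
			seen[name] = f()
		}
		// pairwise comparison, mirroring the peer-vs-peer loop above
		for nameA, valueA := range seen {
			for nameB, valueB := range seen {
				assert.Equalf(c, valueA, valueB, "%s and %s disagree: %q != %q", nameA, nameB, valueA, valueB)
			}
		}
	}, 10*time.Second, 100*time.Millisecond)
}
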
diff --git a/topologytest/multi_actor_conflict_test.go b/topologytest/multi_actor_conflict_test.go index 434dc9dc7a..7825f2a0ff 100644 --- a/topologytest/multi_actor_conflict_test.go +++ b/topologytest/multi_actor_conflict_test.go @@ -23,10 +23,9 @@ func TestMultiActorConflictCreate(t *testing.T) { replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, docVersion) - + waitForConvergingVersion(t, collectionName, peers, replications, docID) }) } } @@ -46,16 +45,16 @@ func TestMultiActorConflictUpdate(t *testing.T) { replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, replications, docID) replications.Stop() - docVersion = updateConflictingDocs(t, collectionName, peers, docID, topology.description) + updateConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, replications, docID) }) } } @@ -75,16 +74,16 @@ func TestMultiActorConflictDelete(t *testing.T) { replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, replications, docID) replications.Stop() - lastWrite := deleteConflictDocs(t, collectionName, peers, docID) + deleteConflictDocs(t, collectionName, peers, docID) replications.Start() - waitForTombstoneVersion(t, collectionName, peers, replications, docID, lastWrite) + waitForConvergingVersion(t, collectionName, peers, replications, docID) }) } } @@ -108,23 +107,23 @@ func TestMultiActorConflictResurrect(t *testing.T) { replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, replications, docID) replications.Stop() - lastWrite := deleteConflictDocs(t, collectionName, peers, docID) + deleteConflictDocs(t, collectionName, peers, docID) replications.Start() - waitForTombstoneVersion(t, collectionName, peers, replications, docID, lastWrite) + waitForConvergingVersion(t, collectionName, peers, replications, docID) replications.Stop() - lastWriteVersion := updateConflictingDocs(t, collectionName, peers, docID, topology.description) + updateConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, lastWriteVersion) + waitForConvergingVersion(t, collectionName, peers, replications, docID) }) } } diff --git a/topologytest/sync_gateway_peer_test.go 
b/topologytest/sync_gateway_peer_test.go index 887047799b..c05665044d 100644 --- a/topologytest/sync_gateway_peer_test.go +++ b/topologytest/sync_gateway_peer_test.go @@ -63,7 +63,11 @@ func (p *SyncGatewayPeer) GetDocument(dsName sgbucket.DataStoreName, docID strin collection, ctx := p.getCollection(dsName) doc, err := collection.GetDocument(ctx, docID, db.DocUnmarshalAll) require.NoError(p.TB(), err) - return DocMetadataFromDocument(doc), doc.Body(ctx) + var body db.Body + if !doc.IsDeleted() { + body = doc.Body(ctx) + } + return DocMetadataFromDocument(doc), body } // CreateDocument creates a document on the peer. The test will fail if the document already exists. From a40b38365d58899c255bd3131fe8473b13249162 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Mon, 13 Jan 2025 15:24:16 +0000 Subject: [PATCH 03/17] Add missing versionToWrite case from attempted cleanup --- rest/utilities_testing_blip_client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index d3f968351d..8bfa6ddc04 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -881,6 +881,9 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { // no conflict - accept incoming rev versionToWrite = DocVersion{CV: incomingCV} } + } else { + // no existing rev - accept incoming rev + versionToWrite = DocVersion{CV: incomingCV} } require.NoError(btc.TB(), hlv.AddVersion(versionToWrite.CV), "couldn't add new CV into doc HLV") } else { From 69e5d6e57ed165e1d55efda71b3b2db34b588a25 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Mon, 13 Jan 2025 17:52:24 +0000 Subject: [PATCH 04/17] Fix incorrect parsing of HLV history in BLIP rev message on BlipTesterClient --- db/hybrid_logical_vector.go | 35 +++++++++++++++++++++++++++ rest/utilities_testing_blip_client.go | 8 +++--- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index dc9368edfe..076fecf4bc 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -443,6 +443,41 @@ func (hlv *HybridLogicalVector) ToHistoryForHLV() string { return s.String() } +func FromHistoryForHLV(history string) (*HybridLogicalVector, error) { + hlv := NewHybridLogicalVector() + // split the history string into PV and MV + versionSets := strings.Split(history, ";") + switch len(versionSets) { + case 0: + // no versions present + return hlv, nil + case 2: + // MV + mvs := strings.Split(versionSets[1], ",") + for _, mv := range mvs { + v, err := ParseVersion(mv) + if err != nil { + return nil, err + } + hlv.MergeVersions[v.SourceID] = v.Value + } + fallthrough + case 1: + // PV + pvs := strings.Split(versionSets[0], ",") + for _, pv := range pvs { + v, err := ParseVersion(pv) + if err != nil { + return nil, err + } + hlv.PreviousVersions[v.SourceID] = v.Value + } + default: + return nil, fmt.Errorf("Invalid history string format") + } + return hlv, nil +} + // appendRevocationMacroExpansions adds macro expansions for the channel map. 
Not strictly an HLV operation // but putting the function here as it's required when the HLV's current version is being macro expanded func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec, channelNames []string) (updatedSpec []sgbucket.MacroExpansionSpec) { diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index 8bfa6ddc04..cbb7de1682 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -601,8 +601,8 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { if btc.UseHLV() { var incomingHLV *db.HybridLogicalVector if revHistory != "" { - incomingHLV, _, err = db.ExtractHLVFromBlipMessage(revHistory) - require.NoError(btr.TB(), err, "error extracting HLV %q: %v", revHistory, err) + incomingHLV, err = db.FromHistoryForHLV(revHistory) + require.NoError(btr.TB(), err, "error extracting HLV history %q: %v", revHistory, err) hlv = *incomingHLV } incomingCV, err := db.ParseVersion(revID) @@ -849,8 +849,8 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { if btc.UseHLV() { var incomingHLV *db.HybridLogicalVector if revHistory != "" { - incomingHLV, _, err = db.ExtractHLVFromBlipMessage(revHistory) - require.NoError(btr.TB(), err, "error extracting HLV %q: %v", revHistory, err) + incomingHLV, err = db.FromHistoryForHLV(revHistory) + require.NoError(btr.TB(), err, "error extracting HLV history %q: %v", revHistory, err) hlv = *incomingHLV } incomingCV, err := db.ParseVersion(revID) From dba14fba4c7a5fa399caaad816543ecb7d16faa6 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Mon, 13 Jan 2025 18:01:53 +0000 Subject: [PATCH 05/17] fix lint - rip out more unused code --- topologytest/hlv_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go index a55da352ed..e5e06835d7 100644 --- a/topologytest/hlv_test.go +++ b/topologytest/hlv_test.go @@ -106,7 +106,6 @@ func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool { // It is not known at this stage which write the "winner" will be, since conflict resolution can happen at replication time which may not be LWW, or may be LWW but with a new value. 
func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) { backingPeers := removeSyncGatewayBackingPeers(peers) - documentVersion := make([]BodyAndVersion, 0, len(peers)) for peerName, peer := range peers { if backingPeers[peerName] { continue @@ -114,14 +113,12 @@ func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee docBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, peerName, topologyDescription)) docVersion := peer.CreateDocument(dsName, docID, docBody) t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta) - documentVersion = append(documentVersion, docVersion) } } // updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) { backingPeers := removeSyncGatewayBackingPeers(peers) - var documentVersion []BodyAndVersion for peerName, peer := range peers { if backingPeers[peerName] { continue @@ -129,21 +126,18 @@ func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee docBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "update"}`, peerName, topologyDescription)) docVersion := peer.WriteDocument(dsName, docID, docBody) t.Logf("updateVersion: %#v", docVersion.docMeta) - documentVersion = append(documentVersion, docVersion) } } // deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) { backingPeers := removeSyncGatewayBackingPeers(peers) - var documentVersion []BodyAndVersion for peerName, peer := range peers { if backingPeers[peerName] { continue } deleteVersion := peer.DeleteDocument(dsName, docID) t.Logf("deleteVersion: %#v", deleteVersion) - documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName}) } } From e873a40f257eac76956bcb50ad16fa64a35329dd Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Mon, 13 Jan 2025 18:10:41 +0000 Subject: [PATCH 06/17] Fix TestBlipNonDeltaSyncPush flake --- rest/blip_api_delta_sync_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/rest/blip_api_delta_sync_test.go b/rest/blip_api_delta_sync_test.go index 7c5ca79f04..b69e2dcd50 100644 --- a/rest/blip_api_delta_sync_test.go +++ b/rest/blip_api_delta_sync_test.go @@ -960,18 +960,22 @@ func TestBlipNonDeltaSyncPush(t *testing.T) { defer client.Close() client.ClientDeltas = false - btcRunner.StartPull(client.id) - btcRunner.StartPush(client.id) // create doc1 rev 1-0335a345b6ffed05707ccc4cbc1b67f4 version := rt.PutDocDirectly(docID, JsonToMap(t, `{"greetings": [{"hello": "world!"}, {"hi": "alice"}]}`)) + btcRunner.StartOneshotPull(client.id) data := btcRunner.WaitForVersion(client.id, docID, version) assert.Equal(t, `{"greetings":[{"hello":"world!"},{"hi":"alice"}]}`, string(data)) + // create doc1 rev 2-abcxyz on client newRev := btcRunner.AddRev(client.id, docID, &version, []byte(`{"greetings":[{"hello":"world!"},{"hi":"alice"},{"howdy":"bob"}]}`)) - // Check EE is delta, and CE is full-body replication + + btcRunner.StartPushWithOpts(client.id, BlipTesterPushOptions{Continuous: false, Since: "0"}) + msg := client.waitForReplicationMessage(collection, 2) + // ensure message is type rev + require.Equal(t, 
db.MessageRev, msg.Profile()) // Check the request was NOT sent with a deltaSrc property assert.Equal(t, "", msg.Properties[db.RevMessageDeltaSrc]) From 7c9523999d71f24255c2971599840ac8edc9e9ac Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Mon, 13 Jan 2025 19:25:42 +0000 Subject: [PATCH 07/17] Address PR comments - add safety assertion to ensure changes is filtering out already known revs --- db/hybrid_logical_vector.go | 2 +- rest/utilities_testing_blip_client.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 076fecf4bc..e8a6f5f6d0 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -384,7 +384,7 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi return outputSpec } -// SetPreviousVersion will take a source/version pair and add it to the HLV previous versions map +// SetPreviousVersion will take a source/version pair and sets the value for the given source in the previous versions map func (hlv *HybridLogicalVector) SetPreviousVersion(source string, version uint64) { if hlv.PreviousVersions == nil { hlv.PreviousVersions = make(HLVVersions) diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index cbb7de1682..39eb9752ca 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -863,6 +863,11 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { if latestClientRev != nil { clientCV := latestClientRev.version.CV + // safety check - ensure SG is not sending a rev that we already had - ensures changes feed messaging is working correctly to prevent + if clientCV.SourceID == incomingCV.SourceID && clientCV.Value == incomingCV.Value { + require.FailNow(btc.TB(), "incoming revision %v is equal to client revision %v - should've been filtered via changes response before ending up as a rev", incomingCV, clientCV) + } + // incoming rev older than stored client version and comes from a different source - need to resolve if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID { btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV) From ea85414a0c495baa6698594279fa12950378e569 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Tue, 14 Jan 2025 12:47:22 +0000 Subject: [PATCH 08/17] realign delete/doc update codepaths --- rest/utilities_testing_blip_client.go | 56 +++++++++++++++++---------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index 39eb9752ca..7e50a35346 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -596,7 +596,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { defer doc.lock.Unlock() var incomingVersion DocVersion - var newVersion DocVersion + var versionToWrite DocVersion var hlv db.HybridLogicalVector if btc.UseHLV() { var incomingHLV *db.HybridLogicalVector @@ -609,32 +609,48 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { require.NoError(btr.TB(), err, "error parsing version %q: %v", revID, err) incomingVersion = DocVersion{CV: incomingCV} - clientCV := doc._currentVersion(btc.TB()) - // incoming rev older than stored client version and comes from a different source - need to resolve - if incomingCV.Value < clientCV.Value && incomingCV.SourceID 
!= clientCV.SourceID { - btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV) - switch btc.BlipTesterClientOpts.ConflictResolver { - case ConflictResolverLastWriteWins: - // generate a new version for the resolution and write it to the remote HLV - v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())} - require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add incoming HLV into client HLV") - newVersion = DocVersion{CV: v} - hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value) - default: - btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver) + // fetch client's latest version to do conflict check and resolution + latestClientRev, err := doc._latestRev() + require.NoError(btc.TB(), err, "couldn't get latest revision for doc %q", docID) + if latestClientRev != nil { + clientCV := latestClientRev.version.CV + + // safety check - ensure SG is not sending a rev that we already had - ensures changes feed messaging is working correctly to prevent + if clientCV.SourceID == incomingCV.SourceID && clientCV.Value == incomingCV.Value { + require.FailNow(btc.TB(), "incoming revision %v is equal to client revision %v - should've been filtered via changes response before ending up as a rev", incomingCV, clientCV) + } + + // incoming rev older than stored client version and comes from a different source - need to resolve + if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID { + btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV) + switch btc.BlipTesterClientOpts.ConflictResolver { + case ConflictResolverLastWriteWins: + // local wins so write the local body back as a new resolved version (based on incoming HLV) to push + body = latestClientRev.body + v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())} + require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add incoming HLV into client HLV") + versionToWrite = DocVersion{CV: v} + hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value) + default: + btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver) + } + } else { + // no conflict - accept incoming rev + versionToWrite = DocVersion{CV: incomingCV} } } else { - newVersion = DocVersion{CV: incomingCV} + // no existing rev - accept incoming rev + versionToWrite = DocVersion{CV: incomingCV} } - require.NoError(btc.TB(), hlv.AddVersion(newVersion.CV), "couldn't add newVersion CV into doc HLV") + require.NoError(btc.TB(), hlv.AddVersion(versionToWrite.CV), "couldn't add new CV into doc HLV") } else { - newVersion = DocVersion{RevTreeID: revID} - incomingVersion = newVersion + versionToWrite = DocVersion{RevTreeID: revID} + incomingVersion = versionToWrite } - docRev := clientDocRev{ + clientSeq: newClientSeq, - version: newVersion, + version: versionToWrite, body: body, HLV: hlv, isDelete: true, From 1ee5e290e7dc2d652d25b9f2a86d926a4f067624 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Tue, 14 Jan 2025 12:48:07 +0000 Subject: [PATCH 09/17] Improve logging and print global peer state after failing to wait for converging versions --- topologytest/hlv_test.go | 20 ++++++++++++++++---- topologytest/version_test.go | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) 
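[Editor's note: the conflict decision applied by the realigned rev handlers above can be condensed as follows. This is an illustrative sketch with simplified types - version, resolveLWW and localSource are hypothetical names, not the patch's code; the real handler additionally records the losing CV via SetPreviousVersion and reuses the local body for the resolved write.]

package rest

import "time"

// version is a simplified stand-in for db.Version (a source ID plus a value
// that increases with each write).
type version struct {
	SourceID string
	Value    uint64
}

// resolveLWW condenses the last-write-wins branch of the rev handlers above:
// an incoming revision that is older than the client's and from a different
// source is a conflict, and the local write wins - it is re-stamped with a new
// version from the client's own source so the resolution can be pushed back.
// Otherwise the incoming version is accepted as-is.
func resolveLWW(clientCV, incomingCV version, localSource string) (winner version, localWins bool) {
	if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID {
		// conflict: keep the local write and mint a new CV for the resolved version
		return version{SourceID: localSource, Value: uint64(time.Now().UnixNano())}, true
	}
	// no conflict detected: take the incoming revision
	return incomingCV, false
}
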
diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go index e5e06835d7..c66b0e134a 100644 --- a/topologytest/hlv_test.go +++ b/topologytest/hlv_test.go @@ -71,7 +71,7 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, p // waitForConvergingVersion waits for the same document version to reach all peers. func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) { t.Logf("waiting for converged doc versions across all peers") - require.EventuallyWithT(t, func(c *assert.CollectT) { + require.EventuallyWithTf(t, func(c *assert.CollectT) { for peerAid, peerA := range peers.SortedPeers() { docMetaA, bodyA := peerA.GetDocument(dsName, docID) for peerBid, peerB := range peers.SortedPeers() { @@ -84,7 +84,19 @@ func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, require.Equalf(c, bodyA, bodyB, "body mismatch: %s:%s != %s:%s", peerAid, bodyA, peerBid, bodyB) } } - }, totalWaitTime, pollInterval) + }, totalWaitTime, pollInterval, "Peers did not converge on version for doc %q\nGlobal state for all peers:\n%s", docID, peers.PrintGlobalDocState(t, dsName, docID)) +} + +// PrintGlobalDocState returns the current state of a document across all peers, and also logs it on `t`. +func (p Peers) PrintGlobalDocState(t testing.TB, dsName base.ScopeAndCollectionName, docID string) string { + var globalState strings.Builder + for peerName, peer := range p { + docMeta, body := peer.GetDocument(dsName, docID) + globalState.WriteString(fmt.Sprintf("====\npeer(%s)\n----\n%#v\nbody:%v\n", peerName, docMeta, body)) + } + globalStateStr := globalState.String() + t.Logf("Global doc %q state for all peers:\n%s", docID, globalStateStr) + return globalStateStr } // removeSyncGatewayBackingPeers will check if there is sync gateway in topology, if so will track the backing CBS @@ -125,7 +137,7 @@ func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee } docBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "update"}`, peerName, topologyDescription)) docVersion := peer.WriteDocument(dsName, docID, docBody) - t.Logf("updateVersion: %#v", docVersion.docMeta) + t.Logf("%s - updateVersion: %#v", peerName, docVersion.docMeta) } } @@ -137,7 +149,7 @@ func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers continue } deleteVersion := peer.DeleteDocument(dsName, docID) - t.Logf("deleteVersion: %#v", deleteVersion) + t.Logf("%s - deleteVersion: %#v", peerName, deleteVersion) } } diff --git a/topologytest/version_test.go b/topologytest/version_test.go index 45ae1f2e5e..387d005053 100644 --- a/topologytest/version_test.go +++ b/topologytest/version_test.go @@ -69,7 +69,7 @@ func DocMetadataFromDocument(doc *db.Document) DocMetadata { } func (v DocMetadata) GoString() string { - return fmt.Sprintf("DocMetadata{\nDocID:%s\n\tRevTreeID:%s\n\tHLV:%+v\n\tMou:%+v\n\tCas:%d\n\tImplicitHLV:%+v\n}", v.DocID, v.RevTreeID, v.HLV, v.Mou, v.Cas, v.ImplicitHLV) + return fmt.Sprintf("DocMetadata{\n\tDocID: %q,\n\tRevTreeID:%q,\n\tHLV:%+v,\n\tMou:%+v,\n\tCas:%d,\n\tImplicitHLV:%+v,\n}", v.DocID, v.RevTreeID, v.HLV, v.Mou, v.Cas, v.ImplicitHLV) } // DocMetadataFromDocVersion returns metadata DocVersion from the given document and version. 
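[Editor's note: the DocMetadata.GoString tweak above works because fmt consults the GoStringer interface when formatting with %#v. A small self-contained illustration of that mechanism - debugMeta is a made-up type, not part of the patch.]

package main

import "fmt"

// debugMeta implements fmt.GoStringer, so %#v prints the custom multi-line
// form returned by GoString instead of Go's default struct literal syntax -
// the same mechanism DocMetadata relies on for readable log output.
type debugMeta struct {
	DocID     string
	RevTreeID string
	Cas       uint64
}

func (m debugMeta) GoString() string {
	return fmt.Sprintf("debugMeta{\n\tDocID: %q,\n\tRevTreeID: %q,\n\tCas: %d,\n}", m.DocID, m.RevTreeID, m.Cas)
}

func main() {
	fmt.Printf("%#v\n", debugMeta{DocID: "doc1", RevTreeID: "2-abc", Cas: 12345})
}
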
From 1b058a697f85fffcbc7acabaa535d59899d11077 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Tue, 14 Jan 2025 12:51:57 +0000 Subject: [PATCH 10/17] Remove unused method --- rest/utilities_testing_blip_client.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index 7e50a35346..4e658c1258 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -299,12 +299,6 @@ func (cd *clientDoc) currentVersion(t testing.TB) *db.Version { return &rev.version.CV } -func (cd *clientDoc) _currentVersion(t testing.TB) *db.Version { - rev, err := cd._latestRev() - require.NoError(t, err) - return &rev.version.CV -} - type BlipTesterCollectionClient struct { parent *BlipTesterClient From d08b9e4844781b9b41564bf0adcc3e6fe599d49a Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Tue, 14 Jan 2025 13:21:40 +0000 Subject: [PATCH 11/17] Allow BlipTesterClient conflict resolution to resolve a server tombstone in conflict to a non-deletion client doc --- rest/utilities_testing_blip_client.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index 4e658c1258..54280db9ac 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -592,6 +592,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { var incomingVersion DocVersion var versionToWrite DocVersion var hlv db.HybridLogicalVector + isDelete := true if btc.UseHLV() { var incomingHLV *db.HybridLogicalVector if revHistory != "" { @@ -619,8 +620,9 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV) switch btc.BlipTesterClientOpts.ConflictResolver { case ConflictResolverLastWriteWins: - // local wins so write the local body back as a new resolved version (based on incoming HLV) to push + // local wins so write the local back as a new resolved version (based on incoming HLV) to push body = latestClientRev.body + isDelete = latestClientRev.isDelete v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())} require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add incoming HLV into client HLV") versionToWrite = DocVersion{CV: v} @@ -647,7 +649,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { version: versionToWrite, body: body, HLV: hlv, - isDelete: true, + isDelete: isDelete, message: msg, } From bbe66bb0193c83d9f7531a78e5b47eb357640934 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Tue, 14 Jan 2025 16:26:19 +0000 Subject: [PATCH 12/17] Delay PrintGlobalDocState evaluation until failed EventuallyWithT --- topologytest/hlv_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go index c66b0e134a..69d0494d75 100644 --- a/topologytest/hlv_test.go +++ b/topologytest/hlv_test.go @@ -71,7 +71,7 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, p // waitForConvergingVersion waits for the same document version to reach all peers. 
func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) { t.Logf("waiting for converged doc versions across all peers") - require.EventuallyWithTf(t, func(c *assert.CollectT) { + if !assert.EventuallyWithT(t, func(c *assert.CollectT) { for peerAid, peerA := range peers.SortedPeers() { docMetaA, bodyA := peerA.GetDocument(dsName, docID) for peerBid, peerB := range peers.SortedPeers() { @@ -84,7 +84,10 @@ func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, require.Equalf(c, bodyA, bodyB, "body mismatch: %s:%s != %s:%s", peerAid, bodyA, peerBid, bodyB) } } - }, totalWaitTime, pollInterval, "Peers did not converge on version for doc %q\nGlobal state for all peers:\n%s", docID, peers.PrintGlobalDocState(t, dsName, docID)) + }, totalWaitTime, pollInterval) { + // do if !assert->require pattern so we can delay PrintGlobalDocState evaluation + require.FailNowf(t, "Peers did not converge on version", "Global state for doc %q on all peers:\n%s", docID, peers.PrintGlobalDocState(t, dsName, docID)) + } } // PrintGlobalDocState returns the current state of a document across all peers, and also logs it on `t`. From afd96ee1a8e96b9aae4d87a3ac3f2ba090b1de74 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Tue, 14 Jan 2025 16:52:56 +0000 Subject: [PATCH 13/17] Skip the two failing multi-actor conflict tests for the two affected topologies --- topologytest/multi_actor_conflict_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/topologytest/multi_actor_conflict_test.go b/topologytest/multi_actor_conflict_test.go index 7825f2a0ff..1b4cd5d30c 100644 --- a/topologytest/multi_actor_conflict_test.go +++ b/topologytest/multi_actor_conflict_test.go @@ -70,6 +70,13 @@ func TestMultiActorConflictUpdate(t *testing.T) { func TestMultiActorConflictDelete(t *testing.T) { for _, topology := range append(simpleTopologies, Topologies...) { t.Run(topology.description, func(t *testing.T) { + switch topology.description { + case "2x CBL<->SG<->CBS XDCR only 1.3", + "CBL<->SG<->CBS1 CBS1<->CBS2 1.2": + // FIXME: CBG-4458 + t.Skip("CBG-4458") + } + collectionName, peers, replications := setupTests(t, topology) replications.Stop() @@ -103,6 +110,13 @@ func TestMultiActorConflictDelete(t *testing.T) { func TestMultiActorConflictResurrect(t *testing.T) { for _, topology := range append(simpleTopologies, Topologies...) { t.Run(topology.description, func(t *testing.T) { + switch topology.description { + case "2x CBL<->SG<->CBS XDCR only 1.3", + "CBL<->SG<->CBS1 CBS1<->CBS2 1.2": + // FIXME: CBG-4458 + t.Skip("CBG-4458") + } + collectionName, peers, replications := setupTests(t, topology) replications.Stop() From 3568d2ff06ee2acba3da2f3233039c981257dee7 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Tue, 14 Jan 2025 17:10:43 +0000 Subject: [PATCH 14/17] Update comment for ExtractHLVFromBlipMessage to clarify example when multiple mv and pv are present --- db/hybrid_logical_vector.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index e8a6f5f6d0..28cb1350bf 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -492,8 +492,8 @@ func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec, // ExtractHLVFromBlipMessage extracts the full HLV a string in the format seen over Blip // blip string may be the following formats // 1. cv only: cv -// 2. cv and pv: cv;pv -// 3. cv, pv, and mv: cv;mv;pv +// 2. 
cv and pv: cv;pv1,pv2 +// 3. cv, pv, and mv: cv;mv1,mv2;pv1,pv2 // // Function will return list of revIDs if legacy rev ID was found in the HLV history section (PV) // TODO: CBG-3662 - Optimise once we've settled on and tested the format with CBL From 1b42fc22925d70c8421da33dfdf2c954666c643d Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Tue, 14 Jan 2025 17:31:30 +0000 Subject: [PATCH 15/17] Fix misused require.FailNow where failureMessage was absent --- rest/utilities_testing_blip_client.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index 54280db9ac..a08768a5ef 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -136,7 +136,7 @@ func (c *BlipTesterCollectionClient) OneShotDocsSince(ctx context.Context, since continue } else if latestDocSeq := doc.latestSeq(); latestDocSeq != seq { // this entry should've been cleaned up from _seqStore - require.FailNow(c.TB(), "seq %d found in _seqStore but latestSeq for doc %d - this should've been pruned out!", seq, latestDocSeq) + require.FailNowf(c.TB(), "found old seq in _seqStore", "seq %d found in _seqStore but latestSeq for doc %d - this should've been pruned out!", seq, latestDocSeq) continue } if !yield(seq, doc) { @@ -384,7 +384,7 @@ func (btcc *BlipTesterCollectionClient) _getClientDoc(docID string) (*clientDoc, } clientDoc, ok := btcc._seqStore[seq] if !ok { - require.FailNow(btcc.TB(), "docID %q found in _seqFromDocID but seq %d not in _seqStore %v", docID, seq, btcc._seqStore) + require.FailNowf(btcc.TB(), "seq not found in _seqStore", "docID %q found in _seqFromDocID but seq %d not in _seqStore %v", docID, seq, btcc._seqStore) return nil, false } return clientDoc, ok @@ -612,7 +612,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { // safety check - ensure SG is not sending a rev that we already had - ensures changes feed messaging is working correctly to prevent if clientCV.SourceID == incomingCV.SourceID && clientCV.Value == incomingCV.Value { - require.FailNow(btc.TB(), "incoming revision %v is equal to client revision %v - should've been filtered via changes response before ending up as a rev", incomingCV, clientCV) + require.FailNowf(btc.TB(), "incoming revision is equal to client revision", "incoming revision %v is equal to client revision %v - should've been filtered via changes response before ending up as a rev", incomingCV, clientCV) } // incoming rev older than stored client version and comes from a different source - need to resolve @@ -696,7 +696,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { response.SetError("HTTP", http.StatusUnprocessableEntity, "test code intentionally rejected delta") return } - require.FailNow(btr.TB(), "expected delta rev message to be sent without noreply flag: %+v", msg) + require.FailNowf(btr.TB(), "expected delta rev message to be sent without noreply flag", "msg: %+v", msg) } // unmarshal body to extract deltaSrc @@ -707,7 +707,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { var old db.Body doc, ok := btcr.getClientDoc(docID) if !ok { - require.FailNow(btc.TB(), "docID %q not found in _seqFromDocID", docID) + require.FailNowf(btc.TB(), "doc not found", "docID %q not found", docID) return } var deltaSrcVersion DocVersion @@ -877,7 +877,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { // safety check - ensure SG is not 
sending a rev that we already had - ensures changes feed messaging is working correctly to prevent if clientCV.SourceID == incomingCV.SourceID && clientCV.Value == incomingCV.Value { - require.FailNow(btc.TB(), "incoming revision %v is equal to client revision %v - should've been filtered via changes response before ending up as a rev", incomingCV, clientCV) + require.FailNowf(btc.TB(), "incoming revision is equal to client revision", "incoming revision %v is equal to client revision %v - should've been filtered via changes response before ending up as a rev", incomingCV, clientCV) } // incoming rev older than stored client version and comes from a different source - need to resolve @@ -1045,7 +1045,7 @@ func (btc *BlipTesterCollectionClient) updateLastReplicatedVersion(docID string, defer btc.seqLock.Unlock() doc, ok := btc._getClientDoc(docID) if !ok { - require.FailNow(btc.TB(), "docID %q not found in _seqFromDocID", docID) + require.FailNowf(btc.TB(), "doc not found", "docID %q", docID) return } doc.setLatestServerVersion(version) @@ -1056,7 +1056,7 @@ func (btc *BlipTesterCollectionClient) getLastReplicatedVersion(docID string) (v defer btc.seqLock.Unlock() doc, ok := btc._getClientDoc(docID) if !ok { - require.FailNow(btc.TB(), "docID %q not found in _seqFromDocID", docID) + require.FailNowf(btc.TB(), "doc not found", "docID %q", docID) return DocVersion{}, false } doc.lock.RLock() @@ -1113,7 +1113,7 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes opts.ConflictResolver = ConflictResolverDefault } if !opts.ConflictResolver.IsValid() { - require.FailNow(btcRunner.TB(), "invalid conflict resolver %q", opts.ConflictResolver) + require.FailNowf(btcRunner.TB(), "invalid conflict resolver", "invalid conflict resolver %q", opts.ConflictResolver) } if opts.SourceID == "" { opts.SourceID = "blipclient" @@ -1258,7 +1258,7 @@ func (btcRunner *BlipTestClientRunner) Collection(clientID uint32, collectionNam return collectionClient } } - require.FailNow(btcRunner.clients[clientID].TB(), "Could not find collection %s in BlipTesterClient", collectionName) + require.FailNowf(btcRunner.clients[clientID].TB(), "unknown collection", "Could not find collection %s in BlipTesterClient", collectionName) return nil } @@ -1573,7 +1573,7 @@ func (btc *BlipTesterCollectionClient) StartPullSince(options BlipTesterPullOpti errorDomain := subChangesResponse.Properties["Error-Domain"] errorCode := subChangesResponse.Properties["Error-Code"] if errorDomain != "" && errorCode != "" { - require.FailNowf(btc.TB(), "error %s %s from subChanges with body: %s", errorDomain, errorCode, string(rspBody)) + require.FailNowf(btc.TB(), "error from subchanges", "error %s %s from subChanges with body: %s", errorDomain, errorCode, string(rspBody)) } } @@ -1856,7 +1856,7 @@ func (btc *BlipTesterCollectionClient) ProcessInlineAttachments(inputBody []byte // push the stub as-is continue } - require.FailNow(btc.TB(), "couldn't find data or stub property for inline attachment %s:%v", attachmentName, inlineAttachment) + require.FailNowf(btc.TB(), "couldn't find data or stub property for inline attachment", "att name %s:%v", attachmentName, inlineAttachment) } // Transform inline attachment data into metadata @@ -1899,7 +1899,7 @@ func (btc *BlipTesterCollectionClient) GetVersion(docID string, docVersion DocVe rev, ok := doc._revisionsBySeq[revSeq] if !ok { - require.FailNow(btc.TB(), "seq %q for docID %q found but no rev in _seqStore", revSeq, docID) + require.FailNowf(btc.TB(), "no rev seq in 
_seqStore", "seq %q for docID %q found but no rev in _seqStore", revSeq, docID) return nil, false } From 85a13767ca8faf81daa0a63319f95edc120370b4 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Tue, 21 Jan 2025 18:18:16 +0000 Subject: [PATCH 16/17] fix post-rebase --- db/hybrid_logical_vector.go | 2 +- topologytest/hlv_test.go | 4 ++-- topologytest/multi_actor_conflict_test.go | 14 -------------- 3 files changed, 3 insertions(+), 17 deletions(-) diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 28cb1350bf..1954aa3eb7 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -273,7 +273,7 @@ func (hlv *HybridLogicalVector) InvalidateMV() { if source == hlv.SourceID { continue } - hlv.setPreviousVersion(source, value) + hlv.SetPreviousVersion(source, value) } hlv.MergeVersions = nil } diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go index 69d0494d75..20d5a4632e 100644 --- a/topologytest/hlv_test.go +++ b/topologytest/hlv_test.go @@ -69,7 +69,7 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, p } // waitForConvergingVersion waits for the same document version to reach all peers. -func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) { +func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, replications Replications, docID string) { t.Logf("waiting for converged doc versions across all peers") if !assert.EventuallyWithT(t, func(c *assert.CollectT) { for peerAid, peerA := range peers.SortedPeers() { @@ -86,7 +86,7 @@ func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, } }, totalWaitTime, pollInterval) { // do if !assert->require pattern so we can delay PrintGlobalDocState evaluation - require.FailNowf(t, "Peers did not converge on version", "Global state for doc %q on all peers:\n%s", docID, peers.PrintGlobalDocState(t, dsName, docID)) + require.FailNowf(t, "Peers did not converge on version", "Global state for doc %q on all peers:\n%s\nReplications: %s", docID, peers.PrintGlobalDocState(t, dsName, docID), replications) } } diff --git a/topologytest/multi_actor_conflict_test.go b/topologytest/multi_actor_conflict_test.go index 1b4cd5d30c..7825f2a0ff 100644 --- a/topologytest/multi_actor_conflict_test.go +++ b/topologytest/multi_actor_conflict_test.go @@ -70,13 +70,6 @@ func TestMultiActorConflictUpdate(t *testing.T) { func TestMultiActorConflictDelete(t *testing.T) { for _, topology := range append(simpleTopologies, Topologies...) { t.Run(topology.description, func(t *testing.T) { - switch topology.description { - case "2x CBL<->SG<->CBS XDCR only 1.3", - "CBL<->SG<->CBS1 CBS1<->CBS2 1.2": - // FIXME: CBG-4458 - t.Skip("CBG-4458") - } - collectionName, peers, replications := setupTests(t, topology) replications.Stop() @@ -110,13 +103,6 @@ func TestMultiActorConflictDelete(t *testing.T) { func TestMultiActorConflictResurrect(t *testing.T) { for _, topology := range append(simpleTopologies, Topologies...) 
{ t.Run(topology.description, func(t *testing.T) { - switch topology.description { - case "2x CBL<->SG<->CBS XDCR only 1.3", - "CBL<->SG<->CBS1 CBS1<->CBS2 1.2": - // FIXME: CBG-4458 - t.Skip("CBG-4458") - } - collectionName, peers, replications := setupTests(t, topology) replications.Stop() From f7014ad46d9e7e003cef02e38d970bf91df72fee Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Tue, 21 Jan 2025 18:27:01 +0000 Subject: [PATCH 17/17] align with release/anemone --- db/hybrid_logical_vector.go | 6 +++--- rest/utilities_testing_blip_client.go | 7 ++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 1954aa3eb7..4ef559db7c 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -491,9 +491,9 @@ func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec, // ExtractHLVFromBlipMessage extracts the full HLV a string in the format seen over Blip // blip string may be the following formats -// 1. cv only: cv -// 2. cv and pv: cv;pv1,pv2 -// 3. cv, pv, and mv: cv;mv1,mv2;pv1,pv2 +// 1. cv only: cv +// 2. cv and pv: cv;pv1,pv2 +// 3. cv+mv and pv: cv,mv1,mv2;pv1,pv2 // // Function will return list of revIDs if legacy rev ID was found in the HLV history section (PV) // TODO: CBG-3662 - Optimise once we've settled on and tested the format with CBL diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index a08768a5ef..1311711e6a 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -596,6 +596,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { if btc.UseHLV() { var incomingHLV *db.HybridLogicalVector if revHistory != "" { + // TODO: Replace with new Beta version/handling incomingHLV, err = db.FromHistoryForHLV(revHistory) require.NoError(btr.TB(), err, "error extracting HLV history %q: %v", revHistory, err) hlv = *incomingHLV @@ -1115,11 +1116,11 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes if !opts.ConflictResolver.IsValid() { require.FailNowf(btcRunner.TB(), "invalid conflict resolver", "invalid conflict resolver %q", opts.ConflictResolver) } - if opts.SourceID == "" { - opts.SourceID = "blipclient" - } id, err := uuid.NewRandom() require.NoError(btcRunner.TB(), err) + if opts.SourceID == "" { + opts.SourceID = fmt.Sprintf("btc-%d", id.ID()) + } client = &BlipTesterClient{ BlipTesterClientOpts: *opts,