Skip to content

Commit 8a3e734

Browse files
gregns1bbrks
authored andcommitted
CBG-3211: Add PutExistingRev for HLV (#6515)
* CBG-3210: Updating HLV on Put And PutExistingRev (#6366) * CBG-3209: Add cv index and retrieval for revision cache (#6491) * CBG-3209: changes for retreival of a doc from the rev cache via CV with backwards compatability in mind * fix failing test, add commnets * fix lint * updated to address comments * rebase chnages needed * updated to tests that call Get on revision cache * updates based of new direction with PR + addressing comments * updated to fix panic * updated to fix another panic * address comments * updates based off commnets * remove commnented out line * updates to skip test relying on import and update PutExistingRev doc update type to update HLV * updates to remove code adding rev id to value inside addToRevMapPostLoad. Added code to assign this inside value.store * remove redundent code * Add support for PutExistingCurrentVersion * updated to remove function not used anymore * remove duplicated code from dev time * fix linter errors + add assertions on body of doc update * address commnets * updates to add further test cases for AddNewerVersions function + fix some incorrect logic * updates to chnage helper function for creation of doc for tests. Also adress further comments * lint error * address comments, add new merge function for merge versions when hlv is in conflict. * updates to remove test case and test * remove unused function * rebase * missed current version name change * more missing updates to name changes
1 parent efb43ba commit 8a3e734

File tree

5 files changed

+397
-5
lines changed

5 files changed

+397
-5
lines changed

db/crud.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,6 +1059,103 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
10591059
return newRevID, doc, err
10601060
}
10611061

1062+
func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, docHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *SourceAndVersion, newRevID string, err error) {
1063+
var matchRev string
1064+
if existingDoc != nil {
1065+
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattr, existingDoc.UserXattr, existingDoc.Cas, DocUnmarshalRev)
1066+
if unmarshalErr != nil {
1067+
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling exsiting doc")
1068+
}
1069+
matchRev = doc.CurrentRev
1070+
}
1071+
generation, _ := ParseRevID(ctx, matchRev)
1072+
if generation < 0 {
1073+
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Invalid revision ID")
1074+
}
1075+
generation++
1076+
1077+
docUpdateEvent := ExistingVersion
1078+
allowImport := db.UseXattrs()
1079+
doc, newRevID, err = db.updateAndReturnDoc(ctx, newDoc.ID, allowImport, newDoc.DocExpiry, nil, docUpdateEvent, existingDoc, func(doc *Document) (resultDoc *Document, resultAttachmentData AttachmentData, createNewRevIDSkipped bool, updatedExpiry *uint32, resultErr error) {
1080+
// (Be careful: this block can be invoked multiple times if there are races!)
1081+
1082+
var isSgWrite bool
1083+
var crc32Match bool
1084+
1085+
// Is this doc an sgWrite?
1086+
if doc != nil {
1087+
isSgWrite, crc32Match, _ = doc.IsSGWrite(ctx, nil)
1088+
if crc32Match {
1089+
db.dbStats().Database().Crc32MatchCount.Add(1)
1090+
}
1091+
}
1092+
1093+
// If the existing doc isn't an SG write, import prior to updating
1094+
if doc != nil && !isSgWrite && db.UseXattrs() {
1095+
err := db.OnDemandImportForWrite(ctx, newDoc.ID, doc, newDoc.Deleted)
1096+
if err != nil {
1097+
return nil, nil, false, nil, err
1098+
}
1099+
}
1100+
1101+
// Conflict check here
1102+
// if doc has no HLV defined this is a new doc we haven't seen before, skip conflict check
1103+
if doc.HLV == nil {
1104+
doc.HLV = &HybridLogicalVector{}
1105+
addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV)
1106+
if addNewerVersionsErr != nil {
1107+
return nil, nil, false, nil, addNewerVersionsErr
1108+
}
1109+
} else {
1110+
if !docHLV.IsInConflict(*doc.HLV) {
1111+
// update hlv for all newer incoming source version pairs
1112+
addNewerVersionsErr := doc.HLV.AddNewerVersions(docHLV)
1113+
if addNewerVersionsErr != nil {
1114+
return nil, nil, false, nil, addNewerVersionsErr
1115+
}
1116+
} else {
1117+
base.InfofCtx(ctx, base.KeyCRUD, "conflict detected between the two HLV's for doc %s", base.UD(doc.ID))
1118+
// cancel rest of update, HLV needs to be sent back to client with merge versions populated
1119+
return nil, nil, false, nil, base.HTTPErrorf(http.StatusConflict, "Document revision conflict")
1120+
}
1121+
}
1122+
1123+
// Process the attachments, replacing bodies with digests.
1124+
newAttachments, err := db.storeAttachments(ctx, doc, newDoc.DocAttachments, generation, matchRev, nil)
1125+
if err != nil {
1126+
return nil, nil, false, nil, err
1127+
}
1128+
1129+
// generate rev id for new arriving doc
1130+
strippedBody, _ := stripInternalProperties(newDoc._body)
1131+
encoding, err := base.JSONMarshalCanonical(strippedBody)
1132+
if err != nil {
1133+
return nil, nil, false, nil, err
1134+
}
1135+
newRev := CreateRevIDWithBytes(generation, matchRev, encoding)
1136+
1137+
if err := doc.History.addRevision(newDoc.ID, RevInfo{ID: newRev, Parent: matchRev, Deleted: newDoc.Deleted}); err != nil {
1138+
base.InfofCtx(ctx, base.KeyCRUD, "Failed to add revision ID: %s, for doc: %s, error: %v", newRev, base.UD(newDoc.ID), err)
1139+
return nil, nil, false, nil, base.ErrRevTreeAddRevFailure
1140+
}
1141+
1142+
newDoc.RevID = newRev
1143+
1144+
return newDoc, newAttachments, false, nil, nil
1145+
})
1146+
1147+
if doc != nil && doc.HLV != nil {
1148+
if cv == nil {
1149+
cv = &SourceAndVersion{}
1150+
}
1151+
source, version := doc.HLV.GetCurrentVersion()
1152+
cv.SourceID = source
1153+
cv.Version = version
1154+
}
1155+
1156+
return doc, cv, newRevID, err
1157+
}
1158+
10621159
// Adds an existing revision to a document along with its history (list of rev IDs.)
10631160
func (db *DatabaseCollectionWithUser) PutExistingRev(ctx context.Context, newDoc *Document, docHistory []string, noConflicts bool, forceAllConflicts bool, existingDoc *sgbucket.BucketDocument, docUpdateEvent DocUpdateType) (doc *Document, newRevID string, err error) {
10641161
return db.PutExistingRevWithConflictResolution(ctx, newDoc, docHistory, noConflicts, nil, forceAllConflicts, existingDoc, docUpdateEvent)

db/crud_test.go

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"context"
1515
"encoding/json"
1616
"log"
17+
"reflect"
1718
"testing"
1819
"time"
1920

@@ -1675,3 +1676,194 @@ func TestAssignSequenceReleaseLoop(t *testing.T) {
16751676
releasedSequenceCount := db.DbStats.Database().SequenceReleasedCount.Value() - startReleasedSequenceCount
16761677
assert.Equal(t, int64(expectedReleasedSequenceCount), releasedSequenceCount)
16771678
}
1679+
1680+
// TestPutExistingCurrentVersion:
1681+
// - Put a document in a db
1682+
// - Assert on the update to HLV after that PUT
1683+
// - Construct a HLV to represent the doc created locally being updated on a client
1684+
// - Call PutExistingCurrentVersion simulating doc update arriving over replicator
1685+
// - Assert that the doc's HLV in the bucket has been updated correctly with the CV, PV and cvCAS
1686+
func TestPutExistingCurrentVersion(t *testing.T) {
1687+
db, ctx := setupTestDB(t)
1688+
defer db.Close(ctx)
1689+
1690+
bucketUUID := db.BucketUUID
1691+
collection := GetSingleDatabaseCollectionWithUser(t, db)
1692+
1693+
// create a new doc
1694+
key := "doc1"
1695+
body := Body{"key1": "value1"}
1696+
1697+
rev, _, err := collection.Put(ctx, key, body)
1698+
require.NoError(t, err)
1699+
1700+
// assert on HLV on that above PUT
1701+
syncData, err := collection.GetDocSyncData(ctx, "doc1")
1702+
assert.NoError(t, err)
1703+
uintCAS := base.HexCasToUint64(syncData.Cas)
1704+
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
1705+
assert.Equal(t, uintCAS, syncData.HLV.Version)
1706+
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
1707+
1708+
// store the cas version allocated to the above doc creation for creation of incoming HLV later in test
1709+
originalDocVersion := syncData.HLV.Version
1710+
1711+
// PUT an update to the above doc
1712+
body = Body{"key1": "value11"}
1713+
body[BodyRev] = rev
1714+
_, _, err = collection.Put(ctx, key, body)
1715+
require.NoError(t, err)
1716+
1717+
// grab the new version for the above update to assert against later in test
1718+
syncData, err = collection.GetDocSyncData(ctx, "doc1")
1719+
assert.NoError(t, err)
1720+
docUpdateVersion := syncData.HLV.Version
1721+
1722+
// construct a mock doc update coming over a replicator
1723+
body = Body{"key1": "value2"}
1724+
newDoc := createTestDocument(key, "", body, false, 0)
1725+
1726+
// construct a HLV that simulates a doc update happening on a client
1727+
// this means moving the current source version pair to PV and adding new sourceID and version pair to CV
1728+
pv := make(map[string]uint64)
1729+
pv[bucketUUID] = originalDocVersion
1730+
// create a version larger than the allocated version above
1731+
incomingVersion := docUpdateVersion + 10
1732+
incomingHLV := HybridLogicalVector{
1733+
SourceID: "test",
1734+
Version: incomingVersion,
1735+
PreviousVersions: pv,
1736+
}
1737+
1738+
// grab the raw doc from the bucket to pass into the PutExistingCurrentVersion function for the above simulation of
1739+
// doc update arriving over replicator
1740+
_, rawDoc, err := collection.GetDocumentWithRaw(ctx, key, DocUnmarshalSync)
1741+
require.NoError(t, err)
1742+
1743+
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, rawDoc)
1744+
require.NoError(t, err)
1745+
// assert on returned CV
1746+
assert.Equal(t, "test", cv.SourceID)
1747+
assert.Equal(t, incomingVersion, cv.Version)
1748+
assert.Equal(t, []byte(`{"key1":"value2"}`), doc._rawBody)
1749+
1750+
// assert on the sync data from the above update to the doc
1751+
// CV should be equal to CV of update on client but the cvCAS should be updated with the new update and
1752+
// PV should contain the old CV pair
1753+
syncData, err = collection.GetDocSyncData(ctx, "doc1")
1754+
assert.NoError(t, err)
1755+
uintCAS = base.HexCasToUint64(syncData.Cas)
1756+
1757+
assert.Equal(t, "test", syncData.HLV.SourceID)
1758+
assert.Equal(t, incomingVersion, syncData.HLV.Version)
1759+
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
1760+
// update the pv map so we can assert we have correct pv map in HLV
1761+
pv[bucketUUID] = docUpdateVersion
1762+
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
1763+
assert.Equal(t, "3-60b024c44c283b369116c2c2570e8088", syncData.CurrentRev)
1764+
}
1765+
1766+
// TestPutExistingCurrentVersionWithConflict:
1767+
// - Put a document in a db
1768+
// - Assert on the update to HLV after that PUT
1769+
// - Construct a HLV to represent the doc created locally being updated on a client
1770+
// - Call PutExistingCurrentVersion simulating doc update arriving over replicator
1771+
// - Assert conflict between the local HLV for the doc and the incoming mutation is correctly identified
1772+
// - Assert that the doc's HLV in the bucket hasn't been updated
1773+
func TestPutExistingCurrentVersionWithConflict(t *testing.T) {
1774+
db, ctx := setupTestDB(t)
1775+
defer db.Close(ctx)
1776+
1777+
bucketUUID := db.BucketUUID
1778+
collection := GetSingleDatabaseCollectionWithUser(t, db)
1779+
1780+
// create a new doc
1781+
key := "doc1"
1782+
body := Body{"key1": "value1"}
1783+
1784+
_, _, err := collection.Put(ctx, key, body)
1785+
require.NoError(t, err)
1786+
1787+
// assert on the HLV values after the above creation of the doc
1788+
syncData, err := collection.GetDocSyncData(ctx, "doc1")
1789+
assert.NoError(t, err)
1790+
uintCAS := base.HexCasToUint64(syncData.Cas)
1791+
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
1792+
assert.Equal(t, uintCAS, syncData.HLV.Version)
1793+
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
1794+
1795+
// create a new doc update to simulate a doc update arriving over replicator from, client
1796+
body = Body{"key1": "value2"}
1797+
newDoc := createTestDocument(key, "", body, false, 0)
1798+
incomingHLV := HybridLogicalVector{
1799+
SourceID: "test",
1800+
Version: 1234,
1801+
}
1802+
1803+
// grab the raw doc from the bucket to pass into the PutExistingCurrentVersion function
1804+
_, rawDoc, err := collection.GetDocumentWithRaw(ctx, key, DocUnmarshalSync)
1805+
require.NoError(t, err)
1806+
1807+
// assert that a conflict is correctly identified and the resulting doc and cv are nil
1808+
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, rawDoc)
1809+
require.Error(t, err)
1810+
assert.ErrorContains(t, err, "Document revision conflict")
1811+
assert.Nil(t, cv)
1812+
assert.Nil(t, doc)
1813+
1814+
// assert persisted doc hlv hasn't been updated
1815+
syncData, err = collection.GetDocSyncData(ctx, "doc1")
1816+
assert.NoError(t, err)
1817+
assert.Equal(t, bucketUUID, syncData.HLV.SourceID)
1818+
assert.Equal(t, uintCAS, syncData.HLV.Version)
1819+
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
1820+
}
1821+
1822+
// TestPutExistingCurrentVersionWithNoExistingDoc:
1823+
// - Purpose of this test is to test PutExistingRevWithBody code pathway where an
1824+
// existing doc is not provided from the bucket into the function simulating a new, not seen
1825+
// before doc entering this code path
1826+
func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
1827+
db, ctx := setupTestDB(t)
1828+
defer db.Close(ctx)
1829+
1830+
bucketUUID := db.BucketUUID
1831+
collection := GetSingleDatabaseCollectionWithUser(t, db)
1832+
1833+
// construct a mock doc update coming over a replicator
1834+
body := Body{"key1": "value2"}
1835+
newDoc := createTestDocument("doc2", "", body, false, 0)
1836+
1837+
// construct a HLV that simulates a doc update happening on a client
1838+
// this means moving the current source version pair to PV and adding new sourceID and version pair to CV
1839+
pv := make(map[string]uint64)
1840+
pv[bucketUUID] = 2
1841+
// create a version larger than the allocated version above
1842+
incomingVersion := uint64(2 + 10)
1843+
incomingHLV := HybridLogicalVector{
1844+
SourceID: "test",
1845+
Version: incomingVersion,
1846+
PreviousVersions: pv,
1847+
}
1848+
// call PutExistingCurrentVersion with empty existing doc
1849+
doc, cv, _, err := collection.PutExistingCurrentVersion(ctx, newDoc, incomingHLV, &sgbucket.BucketDocument{})
1850+
require.NoError(t, err)
1851+
assert.NotNil(t, doc)
1852+
// assert on returned CV value
1853+
assert.Equal(t, "test", cv.SourceID)
1854+
assert.Equal(t, incomingVersion, cv.Version)
1855+
assert.Equal(t, []byte(`{"key1":"value2"}`), doc._rawBody)
1856+
1857+
// assert on the sync data from the above update to the doc
1858+
// CV should be equal to CV of update on client but the cvCAS should be updated with the new update and
1859+
// PV should contain the old CV pair
1860+
syncData, err := collection.GetDocSyncData(ctx, "doc2")
1861+
assert.NoError(t, err)
1862+
uintCAS := base.HexCasToUint64(syncData.Cas)
1863+
assert.Equal(t, "test", syncData.HLV.SourceID)
1864+
assert.Equal(t, incomingVersion, syncData.HLV.Version)
1865+
assert.Equal(t, uintCAS, syncData.HLV.CurrentVersionCAS)
1866+
// update the pv map so we can assert we have correct pv map in HLV
1867+
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
1868+
assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev)
1869+
}

0 commit comments

Comments
 (0)