Skip to content

Commit 1679edc

Browse files
authored
CBG-3354 Channel query support for current version (#6625)
* CBG-3354 Channel query support for current version Adds current version to marshalled _sync.rev property for use with existing indexes. New struct RevAndVersion handles marshal/unmarshal of the rev property, and supports rev only (string) and rev/src/version (map). New structs SyncDataJSON and SyncDataAlias are used to encapsulate this handling at the persistence/marshalling layer. This avoids changes to use of SyncData.CurrentRev, and also avoids potential errors by not duplicating cv in SyncData. * Test updates based on PR feedback
1 parent fca186e commit 1679edc

18 files changed

+413
-130
lines changed

channels/log_entry.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,15 @@ type LogEntry struct {
6060

6161
func (l LogEntry) String() string {
6262
return fmt.Sprintf(
63-
"seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d",
63+
"seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d source: %s version: %d",
6464
l.Sequence,
6565
l.DocID,
6666
l.RevID,
6767
l.VbNo,
6868
l.Type,
6969
l.CollectionID,
70+
l.SourceID,
71+
l.Version,
7072
)
7173
}
7274

db/change_cache.go

+8
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,14 @@ func (entry *LogEntry) SetDeleted() {
120120
entry.Flags |= channels.Deleted
121121
}
122122

123+
func (entry *LogEntry) SetRevAndVersion(rv RevAndVersion) {
124+
entry.RevID = rv.RevTreeID
125+
if rv.CurrentSource != "" {
126+
entry.SourceID = rv.CurrentSource
127+
entry.Version = base.HexCasToUint64(rv.CurrentVersion)
128+
}
129+
}
130+
123131
type LogEntries []*LogEntry
124132

125133
// A priority-queue of LogEntries, kept ordered by increasing sequence #.

db/changes.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type ChangeEntry struct {
5757
principalDoc bool // Used to indicate _user/_role docs
5858
Revoked bool `json:"revoked,omitempty"`
5959
collectionID uint32
60-
CurrentVersion *Version `json:"current_version,omitempty"` // the current version of the change entry
60+
CurrentVersion *Version `json:"-"` // the current version of the change entry. (Not marshalled, pending REST support for cv)
6161
}
6262

6363
const (

db/changes_view.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type channelsViewRow struct {
2525
ID string
2626
Key []interface{} // Actually [channelName, sequence]
2727
Value struct {
28-
Rev string
28+
Rev RevAndVersion
2929
Flags uint8
3030
}
3131
}
@@ -42,13 +42,12 @@ func nextChannelViewEntry(ctx context.Context, results sgbucket.QueryResultItera
4242
entry := &LogEntry{
4343
Sequence: uint64(viewRow.Key[1].(float64)),
4444
DocID: viewRow.ID,
45-
RevID: viewRow.Value.Rev,
4645
Flags: viewRow.Value.Flags,
4746
TimeReceived: time.Now(),
4847
CollectionID: collectionID,
4948
}
49+
entry.SetRevAndVersion(viewRow.Value.Rev)
5050
return entry, true
51-
5251
}
5352

5453
func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIterator, collectionID uint32) (*LogEntry, bool) {
@@ -61,11 +60,11 @@ func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIter
6160
entry := &LogEntry{
6261
Sequence: queryRow.Sequence,
6362
DocID: queryRow.Id,
64-
RevID: queryRow.Rev,
6563
Flags: queryRow.Flags,
6664
TimeReceived: time.Now(),
6765
CollectionID: collectionID,
6866
}
67+
entry.SetRevAndVersion(queryRow.Rev)
6968

7069
if queryRow.RemovalRev != "" {
7170
entry.RevID = queryRow.RemovalRev

db/crud.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -2762,10 +2762,11 @@ func (db *DatabaseCollectionWithUser) CheckProposedRev(ctx context.Context, doci
27622762
}
27632763

27642764
const (
2765-
xattrMacroCas = "cas"
2766-
xattrMacroValueCrc32c = "value_crc32c"
2767-
versionVectorVrsMacro = "_vv.vrs"
2768-
versionVectorCVCASMacro = "_vv.cvCas"
2765+
xattrMacroCas = "cas" // SyncData.Cas
2766+
xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c
2767+
xattrMacroCurrentRevVersion = "rev.vrs" // SyncDataJSON.RevAndVersion.CurrentVersion
2768+
versionVectorVrsMacro = "_vv.vrs" // PersistedHybridLogicalVector.Version
2769+
versionVectorCVCASMacro = "_vv.cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS
27692770
)
27702771

27712772
func macroExpandSpec(xattrName string) []sgbucket.MacroExpansionSpec {
@@ -2785,6 +2786,10 @@ func xattrCrc32cPath(xattrKey string) string {
27852786
return xattrKey + "." + xattrMacroValueCrc32c
27862787
}
27872788

2789+
func xattrCurrentRevVersionPath(xattrKey string) string {
2790+
return xattrKey + "." + xattrMacroCurrentRevVersion
2791+
}
2792+
27882793
func xattrCurrentVersionPath(xattrKey string) string {
27892794
return xattrKey + "." + versionVectorVrsMacro
27902795
}

db/database.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -970,15 +970,15 @@ func (c *DatabaseCollection) processForEachDocIDResults(ctx context.Context, cal
970970
found = results.Next(ctx, &viewRow)
971971
if found {
972972
docid = viewRow.Key
973-
revid = viewRow.Value.RevID
973+
revid = viewRow.Value.RevID.RevTreeID
974974
seq = viewRow.Value.Sequence
975975
channels = viewRow.Value.Channels
976976
}
977977
} else {
978978
found = results.Next(ctx, &queryRow)
979979
if found {
980980
docid = queryRow.Id
981-
revid = queryRow.RevID
981+
revid = queryRow.RevID.RevTreeID
982982
seq = queryRow.Sequence
983983
channels = make([]string, 0)
984984
// Query returns all channels, but we only want to return active channels

db/database_test.go

+81-3
Original file line numberDiff line numberDiff line change
@@ -1828,6 +1828,84 @@ func TestChannelView(t *testing.T) {
18281828
log.Printf("View Query returned entry (%d): %v", i, entry)
18291829
}
18301830
assert.Equal(t, 1, len(entries))
1831+
require.Equal(t, "doc1", entries[0].DocID)
1832+
collection.RequireCurrentVersion(t, "doc1", entries[0].SourceID, entries[0].Version)
1833+
}
1834+
1835+
func TestChannelQuery(t *testing.T) {
1836+
1837+
db, ctx := setupTestDB(t)
1838+
defer db.Close(ctx)
1839+
collection := GetSingleDatabaseCollectionWithUser(t, db)
1840+
_, err := collection.UpdateSyncFun(ctx, `function(doc, oldDoc) {
1841+
channel(doc.channels);
1842+
}`)
1843+
require.NoError(t, err)
1844+
1845+
// Create doc
1846+
body := Body{"key1": "value1", "key2": 1234, "channels": "ABC"}
1847+
rev1ID, _, err := collection.Put(ctx, "doc1", body)
1848+
require.NoError(t, err, "Couldn't create doc1")
1849+
1850+
// Create a doc to test removal handling. Needs three revisions so that the removal rev (2) isn't
1851+
// the current revision
1852+
removedDocID := "removed_doc"
1853+
removedDocRev1, _, err := collection.Put(ctx, removedDocID, body)
1854+
require.NoError(t, err, "Couldn't create removed_doc")
1855+
removalSource, removalVersion := collection.GetDocumentCurrentVersion(t, removedDocID)
1856+
1857+
updatedChannelBody := Body{"_rev": removedDocRev1, "key1": "value1", "key2": 1234, "channels": "DEF"}
1858+
removalRev, _, err := collection.Put(ctx, removedDocID, updatedChannelBody)
1859+
require.NoError(t, err, "Couldn't update removed_doc")
1860+
1861+
updatedChannelBody = Body{"_rev": removalRev, "key1": "value1", "key2": 2345, "channels": "DEF"}
1862+
removedDocRev3, _, err := collection.Put(ctx, removedDocID, updatedChannelBody)
1863+
require.NoError(t, err, "Couldn't update removed_doc")
1864+
1865+
var entries LogEntries
1866+
1867+
// Test query retrieval via star channel and named channel (queries use different indexes)
1868+
testCases := []struct {
1869+
testName string
1870+
channelName string
1871+
}{
1872+
{
1873+
testName: "star channel",
1874+
channelName: "*",
1875+
},
1876+
{
1877+
testName: "named channel",
1878+
channelName: "ABC",
1879+
},
1880+
}
1881+
1882+
for _, testCase := range testCases {
1883+
t.Run(testCase.testName, func(t *testing.T) {
1884+
entries, err = collection.getChangesInChannelFromQuery(ctx, testCase.channelName, 0, 100, 0, false)
1885+
require.NoError(t, err)
1886+
1887+
for i, entry := range entries {
1888+
log.Printf("Channel Query returned entry (%d): %v", i, entry)
1889+
}
1890+
require.Len(t, entries, 2)
1891+
require.Equal(t, "doc1", entries[0].DocID)
1892+
require.Equal(t, rev1ID, entries[0].RevID)
1893+
collection.RequireCurrentVersion(t, "doc1", entries[0].SourceID, entries[0].Version)
1894+
1895+
removedDocEntry := entries[1]
1896+
require.Equal(t, removedDocID, removedDocEntry.DocID)
1897+
if testCase.channelName == "*" {
1898+
require.Equal(t, removedDocRev3, removedDocEntry.RevID)
1899+
collection.RequireCurrentVersion(t, removedDocID, removedDocEntry.SourceID, removedDocEntry.Version)
1900+
} else {
1901+
require.Equal(t, removalRev, removedDocEntry.RevID)
1902+
// TODO: Pending channel removal rev handling, CBG-3213
1903+
log.Printf("removal rev check of removal cv %s@%d is pending CBG-3213", removalSource, removalVersion)
1904+
//require.Equal(t, removalSource, removedDocEntry.SourceID)
1905+
//require.Equal(t, removalVersion, removedDocEntry.Version)
1906+
}
1907+
})
1908+
}
18311909

18321910
}
18331911

@@ -2451,7 +2529,7 @@ func TestDeleteWithNoTombstoneCreationSupport(t *testing.T) {
24512529
assert.NoError(t, err)
24522530

24532531
var doc Body
2454-
var xattr Body
2532+
var xattr SyncData
24552533

24562534
// Ensure document has been added
24572535
waitAndAssertCondition(t, func() bool {
@@ -2462,8 +2540,8 @@ func TestDeleteWithNoTombstoneCreationSupport(t *testing.T) {
24622540
assert.Equal(t, int64(1), db.DbStats.SharedBucketImport().ImportCount.Value())
24632541

24642542
assert.Nil(t, doc)
2465-
assert.Equal(t, "1-2cac91faf7b3f5e5fd56ff377bdb5466", xattr["rev"])
2466-
assert.Equal(t, float64(2), xattr["sequence"])
2543+
assert.Equal(t, "1-2cac91faf7b3f5e5fd56ff377bdb5466", xattr.CurrentRev)
2544+
assert.Equal(t, uint64(2), xattr.Sequence)
24672545
}
24682546

24692547
func TestResyncUpdateAllDocChannels(t *testing.T) {

db/document.go

+84-5
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ type ChannelSetEntry struct {
6565

6666
// The sync-gateway metadata stored in the "_sync" property of a Couchbase document.
6767
type SyncData struct {
68-
CurrentRev string `json:"rev"`
68+
CurrentRev string `json:"-"` // CurrentRev. Persisted as RevAndVersion in SyncDataJSON
6969
NewestRev string `json:"new_rev,omitempty"` // Newest rev, if different from CurrentRev
7070
Flags uint8 `json:"flags,omitempty"`
7171
Sequence uint64 `json:"sequence,omitempty"`
@@ -192,7 +192,7 @@ type historyOnlySyncData struct {
192192

193193
type revOnlySyncData struct {
194194
casOnlySyncData
195-
CurrentRev string `json:"rev"`
195+
CurrentRev RevAndVersion `json:"rev"`
196196
}
197197

198198
type casOnlySyncData struct {
@@ -1160,7 +1160,7 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata
11601160
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalHistory). Error: %v", base.UD(doc.ID), unmarshalErr))
11611161
}
11621162
doc.SyncData = SyncData{
1163-
CurrentRev: historyOnlyMeta.CurrentRev,
1163+
CurrentRev: historyOnlyMeta.CurrentRev.RevTreeID,
11641164
History: historyOnlyMeta.History,
11651165
Cas: historyOnlyMeta.Cas,
11661166
}
@@ -1173,7 +1173,7 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata
11731173
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalRev). Error: %v", base.UD(doc.ID), unmarshalErr))
11741174
}
11751175
doc.SyncData = SyncData{
1176-
CurrentRev: revOnlyMeta.CurrentRev,
1176+
CurrentRev: revOnlyMeta.CurrentRev.RevTreeID,
11771177
Cas: revOnlyMeta.Cas,
11781178
}
11791179
doc._rawBody = data
@@ -1230,7 +1230,7 @@ func (doc *Document) MarshalWithXattr() (data []byte, xdata []byte, err error) {
12301230
}
12311231
}
12321232

1233-
xdata, err = base.JSONMarshal(doc.SyncData)
1233+
xdata, err = base.JSONMarshal(&doc.SyncData)
12341234
if err != nil {
12351235
return nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattr() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err))
12361236
}
@@ -1251,3 +1251,82 @@ func (d *Document) HasCurrentVersion(cv Version) error {
12511251
}
12521252
return nil
12531253
}
1254+
1255+
// SyncDataAlias is an alias for SyncData that doesn't define custom MarshalJSON/UnmarshalJSON
1256+
type SyncDataAlias SyncData
1257+
1258+
// SyncDataJSON is the persisted form of SyncData, with RevAndVersion populated at marshal time
1259+
type SyncDataJSON struct {
1260+
*SyncDataAlias
1261+
RevAndVersion RevAndVersion `json:"rev"`
1262+
}
1263+
1264+
// MarshalJSON populates RevAndVersion using CurrentRev and the HLV (current) source and version.
1265+
// Marshals using SyncDataAlias to avoid recursion, and SyncDataJSON to add the combined RevAndVersion.
1266+
func (s SyncData) MarshalJSON() (data []byte, err error) {
1267+
1268+
var sdj SyncDataJSON
1269+
var sd SyncDataAlias
1270+
sd = (SyncDataAlias)(s)
1271+
sdj.SyncDataAlias = &sd
1272+
sdj.RevAndVersion.RevTreeID = s.CurrentRev
1273+
if s.HLV != nil {
1274+
sdj.RevAndVersion.CurrentSource = s.HLV.SourceID
1275+
sdj.RevAndVersion.CurrentVersion = string(base.Uint64CASToLittleEndianHex(s.HLV.Version))
1276+
}
1277+
return base.JSONMarshal(sdj)
1278+
}
1279+
1280+
// UnmarshalJSON unmarshals using SyncDataJSON, then sets currentRev on SyncData based on the value in RevAndVersion.
1281+
// The HLV's current version stored in RevAndVersion is ignored at unmarshal time - the value in the HLV is the source
1282+
// of truth.
1283+
func (s *SyncData) UnmarshalJSON(data []byte) error {
1284+
1285+
var sdj *SyncDataJSON
1286+
err := base.JSONUnmarshal(data, &sdj)
1287+
if err != nil {
1288+
return err
1289+
}
1290+
*s = SyncData(*sdj.SyncDataAlias)
1291+
s.CurrentRev = sdj.RevAndVersion.RevTreeID
1292+
return nil
1293+
}
1294+
1295+
// RevAndVersion is used to store both revTreeID and currentVersion in a single property, for backwards compatibility
1296+
// with existing indexes using rev. When only RevTreeID is specified, is marshalled/unmarshalled as a string. Otherwise
1297+
// marshalled normally.
1298+
type RevAndVersion struct {
1299+
RevTreeID string `json:"rev,omitempty"`
1300+
CurrentSource string `json:"src,omitempty"`
1301+
CurrentVersion string `json:"vrs,omitempty"` // String representation of version
1302+
}
1303+
1304+
// RevAndVersionJSON aliases RevAndVersion to support conditional unmarshalling from either string (revTreeID) or
1305+
// map (RevAndVersion) representations
1306+
type RevAndVersionJSON RevAndVersion
1307+
1308+
// Marshals RevAndVersion as simple string when only RevTreeID is specified - otherwise performs standard
1309+
// marshalling
1310+
func (rv RevAndVersion) MarshalJSON() (data []byte, err error) {
1311+
1312+
if rv.CurrentSource == "" {
1313+
return base.JSONMarshal(rv.RevTreeID)
1314+
}
1315+
return base.JSONMarshal(RevAndVersionJSON(rv))
1316+
}
1317+
1318+
// Unmarshals either from string (legacy, revID only) or standard RevAndVersion unmarshalling.
1319+
func (rv *RevAndVersion) UnmarshalJSON(data []byte) error {
1320+
1321+
if len(data) == 0 {
1322+
return nil
1323+
}
1324+
switch data[0] {
1325+
case '"':
1326+
return base.JSONUnmarshal(data, &rv.RevTreeID)
1327+
case '{':
1328+
return base.JSONUnmarshal(data, (*RevAndVersionJSON)(rv))
1329+
default:
1330+
return fmt.Errorf("unrecognized JSON format for RevAndVersion: %s", data)
1331+
}
1332+
}

0 commit comments

Comments
 (0)