Skip to content

Commit 85ed9bf

Browse files
adamcfraserbbrks
authored andcommitted
CBG-3354 Channel query support for current version (#6625)
* CBG-3354 Channel query support for current version Adds current version to marshalled _sync.rev property for use with existing indexes. New struct RevAndVersion handles marshal/unmarshal of the rev property, and supports rev only (string) and rev/src/version (map). New structs SyncDataJSON and SyncDataAlias are used to encapsulate this handling at the persistence/marshalling layer. This avoids changes to use of SyncData.CurrentRev, and also avoids potential errors by not duplicating cv in SyncData. * Test updates based on PR feedback
1 parent 09259ee commit 85ed9bf

19 files changed

+423
-137
lines changed

channels/log_entry.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,15 @@ type LogEntry struct {
6060

6161
func (l LogEntry) String() string {
6262
return fmt.Sprintf(
63-
"seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d",
63+
"seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d source: %s version: %d",
6464
l.Sequence,
6565
l.DocID,
6666
l.RevID,
6767
l.VbNo,
6868
l.Type,
6969
l.CollectionID,
70+
l.SourceID,
71+
l.Version,
7072
)
7173
}
7274

db/change_cache.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,14 @@ func (entry *LogEntry) SetDeleted() {
120120
entry.Flags |= channels.Deleted
121121
}
122122

123+
func (entry *LogEntry) SetRevAndVersion(rv RevAndVersion) {
124+
entry.RevID = rv.RevTreeID
125+
if rv.CurrentSource != "" {
126+
entry.SourceID = rv.CurrentSource
127+
entry.Version = base.HexCasToUint64(rv.CurrentVersion)
128+
}
129+
}
130+
123131
type LogEntries []*LogEntry
124132

125133
// A priority-queue of LogEntries, kept ordered by increasing sequence #.

db/changes.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type ChangeEntry struct {
5757
principalDoc bool // Used to indicate _user/_role docs
5858
Revoked bool `json:"revoked,omitempty"`
5959
collectionID uint32
60-
CurrentVersion *Version `json:"current_version,omitempty"` // the current version of the change entry
60+
CurrentVersion *Version `json:"-"` // the current version of the change entry. (Not marshalled, pending REST support for cv)
6161
}
6262

6363
const (

db/changes_view.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type channelsViewRow struct {
2525
ID string
2626
Key []interface{} // Actually [channelName, sequence]
2727
Value struct {
28-
Rev string
28+
Rev RevAndVersion
2929
Flags uint8
3030
}
3131
}
@@ -42,13 +42,12 @@ func nextChannelViewEntry(ctx context.Context, results sgbucket.QueryResultItera
4242
entry := &LogEntry{
4343
Sequence: uint64(viewRow.Key[1].(float64)),
4444
DocID: viewRow.ID,
45-
RevID: viewRow.Value.Rev,
4645
Flags: viewRow.Value.Flags,
4746
TimeReceived: time.Now(),
4847
CollectionID: collectionID,
4948
}
49+
entry.SetRevAndVersion(viewRow.Value.Rev)
5050
return entry, true
51-
5251
}
5352

5453
func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIterator, collectionID uint32) (*LogEntry, bool) {
@@ -61,11 +60,11 @@ func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIter
6160
entry := &LogEntry{
6261
Sequence: queryRow.Sequence,
6362
DocID: queryRow.Id,
64-
RevID: queryRow.Rev,
6563
Flags: queryRow.Flags,
6664
TimeReceived: time.Now(),
6765
CollectionID: collectionID,
6866
}
67+
entry.SetRevAndVersion(queryRow.Rev)
6968

7069
if queryRow.RemovalRev != "" {
7170
entry.RevID = queryRow.RemovalRev

db/crud.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,7 +1042,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
10421042
func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, docHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) {
10431043
var matchRev string
10441044
if existingDoc != nil {
1045-
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattr, existingDoc.UserXattr, existingDoc.Cas, DocUnmarshalRev)
1045+
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs[base.SyncXattrName], existingDoc.Xattrs[db.userXattrKey()], existingDoc.Cas, DocUnmarshalRev)
10461046
if unmarshalErr != nil {
10471047
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling exsiting doc")
10481048
}
@@ -2828,10 +2828,11 @@ func (db *DatabaseCollectionWithUser) CheckProposedRev(ctx context.Context, doci
28282828
}
28292829

28302830
const (
2831-
xattrMacroCas = "cas"
2832-
xattrMacroValueCrc32c = "value_crc32c"
2833-
versionVectorVrsMacro = "_vv.vrs"
2834-
versionVectorCVCASMacro = "_vv.cvCas"
2831+
xattrMacroCas = "cas" // SyncData.Cas
2832+
xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c
2833+
xattrMacroCurrentRevVersion = "rev.vrs" // SyncDataJSON.RevAndVersion.CurrentVersion
2834+
versionVectorVrsMacro = "_vv.vrs" // PersistedHybridLogicalVector.Version
2835+
versionVectorCVCASMacro = "_vv.cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS
28352836
)
28362837

28372838
func macroExpandSpec(xattrName string) []sgbucket.MacroExpansionSpec {
@@ -2851,6 +2852,10 @@ func xattrCrc32cPath(xattrKey string) string {
28512852
return xattrKey + "." + xattrMacroValueCrc32c
28522853
}
28532854

2855+
func xattrCurrentRevVersionPath(xattrKey string) string {
2856+
return xattrKey + "." + xattrMacroCurrentRevVersion
2857+
}
2858+
28542859
func xattrCurrentVersionPath(xattrKey string) string {
28552860
return xattrKey + "." + versionVectorVrsMacro
28562861
}

db/database.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -960,15 +960,15 @@ func (c *DatabaseCollection) processForEachDocIDResults(ctx context.Context, cal
960960
found = results.Next(ctx, &viewRow)
961961
if found {
962962
docid = viewRow.Key
963-
revid = viewRow.Value.RevID
963+
revid = viewRow.Value.RevID.RevTreeID
964964
seq = viewRow.Value.Sequence
965965
channels = viewRow.Value.Channels
966966
}
967967
} else {
968968
found = results.Next(ctx, &queryRow)
969969
if found {
970970
docid = queryRow.Id
971-
revid = queryRow.RevID
971+
revid = queryRow.RevID.RevTreeID
972972
seq = queryRow.Sequence
973973
channels = make([]string, 0)
974974
// Query returns all channels, but we only want to return active channels

db/database_test.go

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1828,6 +1828,84 @@ func TestChannelView(t *testing.T) {
18281828
log.Printf("View Query returned entry (%d): %v", i, entry)
18291829
}
18301830
assert.Len(t, entries, 1)
1831+
require.Equal(t, "doc1", entries[0].DocID)
1832+
collection.RequireCurrentVersion(t, "doc1", entries[0].SourceID, entries[0].Version)
1833+
}
1834+
1835+
func TestChannelQuery(t *testing.T) {
1836+
1837+
db, ctx := setupTestDB(t)
1838+
defer db.Close(ctx)
1839+
collection := GetSingleDatabaseCollectionWithUser(t, db)
1840+
_, err := collection.UpdateSyncFun(ctx, `function(doc, oldDoc) {
1841+
channel(doc.channels);
1842+
}`)
1843+
require.NoError(t, err)
1844+
1845+
// Create doc
1846+
body := Body{"key1": "value1", "key2": 1234, "channels": "ABC"}
1847+
rev1ID, _, err := collection.Put(ctx, "doc1", body)
1848+
require.NoError(t, err, "Couldn't create doc1")
1849+
1850+
// Create a doc to test removal handling. Needs three revisions so that the removal rev (2) isn't
1851+
// the current revision
1852+
removedDocID := "removed_doc"
1853+
removedDocRev1, _, err := collection.Put(ctx, removedDocID, body)
1854+
require.NoError(t, err, "Couldn't create removed_doc")
1855+
removalSource, removalVersion := collection.GetDocumentCurrentVersion(t, removedDocID)
1856+
1857+
updatedChannelBody := Body{"_rev": removedDocRev1, "key1": "value1", "key2": 1234, "channels": "DEF"}
1858+
removalRev, _, err := collection.Put(ctx, removedDocID, updatedChannelBody)
1859+
require.NoError(t, err, "Couldn't update removed_doc")
1860+
1861+
updatedChannelBody = Body{"_rev": removalRev, "key1": "value1", "key2": 2345, "channels": "DEF"}
1862+
removedDocRev3, _, err := collection.Put(ctx, removedDocID, updatedChannelBody)
1863+
require.NoError(t, err, "Couldn't update removed_doc")
1864+
1865+
var entries LogEntries
1866+
1867+
// Test query retrieval via star channel and named channel (queries use different indexes)
1868+
testCases := []struct {
1869+
testName string
1870+
channelName string
1871+
}{
1872+
{
1873+
testName: "star channel",
1874+
channelName: "*",
1875+
},
1876+
{
1877+
testName: "named channel",
1878+
channelName: "ABC",
1879+
},
1880+
}
1881+
1882+
for _, testCase := range testCases {
1883+
t.Run(testCase.testName, func(t *testing.T) {
1884+
entries, err = collection.getChangesInChannelFromQuery(ctx, testCase.channelName, 0, 100, 0, false)
1885+
require.NoError(t, err)
1886+
1887+
for i, entry := range entries {
1888+
log.Printf("Channel Query returned entry (%d): %v", i, entry)
1889+
}
1890+
require.Len(t, entries, 2)
1891+
require.Equal(t, "doc1", entries[0].DocID)
1892+
require.Equal(t, rev1ID, entries[0].RevID)
1893+
collection.RequireCurrentVersion(t, "doc1", entries[0].SourceID, entries[0].Version)
1894+
1895+
removedDocEntry := entries[1]
1896+
require.Equal(t, removedDocID, removedDocEntry.DocID)
1897+
if testCase.channelName == "*" {
1898+
require.Equal(t, removedDocRev3, removedDocEntry.RevID)
1899+
collection.RequireCurrentVersion(t, removedDocID, removedDocEntry.SourceID, removedDocEntry.Version)
1900+
} else {
1901+
require.Equal(t, removalRev, removedDocEntry.RevID)
1902+
// TODO: Pending channel removal rev handling, CBG-3213
1903+
log.Printf("removal rev check of removal cv %s@%d is pending CBG-3213", removalSource, removalVersion)
1904+
//require.Equal(t, removalSource, removedDocEntry.SourceID)
1905+
//require.Equal(t, removalVersion, removedDocEntry.Version)
1906+
}
1907+
})
1908+
}
18311909

18321910
}
18331911

@@ -2450,7 +2528,7 @@ func TestDeleteWithNoTombstoneCreationSupport(t *testing.T) {
24502528
assert.NoError(t, err)
24512529

24522530
var doc Body
2453-
var xattr Body
2531+
var xattr SyncData
24542532

24552533
var xattrs map[string][]byte
24562534
// Ensure document has been added
@@ -2464,8 +2542,8 @@ func TestDeleteWithNoTombstoneCreationSupport(t *testing.T) {
24642542
assert.Equal(t, int64(1), db.DbStats.SharedBucketImport().ImportCount.Value())
24652543

24662544
assert.Nil(t, doc)
2467-
assert.Equal(t, "1-2cac91faf7b3f5e5fd56ff377bdb5466", xattr["rev"])
2468-
assert.Equal(t, float64(2), xattr["sequence"])
2545+
assert.Equal(t, "1-2cac91faf7b3f5e5fd56ff377bdb5466", xattr.CurrentRev)
2546+
assert.Equal(t, uint64(2), xattr.Sequence)
24692547
}
24702548

24712549
func TestResyncUpdateAllDocChannels(t *testing.T) {

db/document.go

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type ChannelSetEntry struct {
6666

6767
// The sync-gateway metadata stored in the "_sync" property of a Couchbase document.
6868
type SyncData struct {
69-
CurrentRev string `json:"rev"`
69+
CurrentRev string `json:"-"` // CurrentRev. Persisted as RevAndVersion in SyncDataJSON
7070
NewestRev string `json:"new_rev,omitempty"` // Newest rev, if different from CurrentRev
7171
Flags uint8 `json:"flags,omitempty"`
7272
Sequence uint64 `json:"sequence,omitempty"`
@@ -193,7 +193,7 @@ type historyOnlySyncData struct {
193193

194194
type revOnlySyncData struct {
195195
casOnlySyncData
196-
CurrentRev string `json:"rev"`
196+
CurrentRev RevAndVersion `json:"rev"`
197197
}
198198

199199
type casOnlySyncData struct {
@@ -1161,7 +1161,7 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata
11611161
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalHistory). Error: %v", base.UD(doc.ID), unmarshalErr))
11621162
}
11631163
doc.SyncData = SyncData{
1164-
CurrentRev: historyOnlyMeta.CurrentRev,
1164+
CurrentRev: historyOnlyMeta.CurrentRev.RevTreeID,
11651165
History: historyOnlyMeta.History,
11661166
Cas: historyOnlyMeta.Cas,
11671167
}
@@ -1174,7 +1174,7 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata
11741174
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalRev). Error: %v", base.UD(doc.ID), unmarshalErr))
11751175
}
11761176
doc.SyncData = SyncData{
1177-
CurrentRev: revOnlyMeta.CurrentRev,
1177+
CurrentRev: revOnlyMeta.CurrentRev.RevTreeID,
11781178
Cas: revOnlyMeta.Cas,
11791179
}
11801180
doc._rawBody = data
@@ -1231,7 +1231,7 @@ func (doc *Document) MarshalWithXattr() (data []byte, xdata []byte, err error) {
12311231
}
12321232
}
12331233

1234-
xdata, err = base.JSONMarshal(doc.SyncData)
1234+
xdata, err = base.JSONMarshal(&doc.SyncData)
12351235
if err != nil {
12361236
return nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattr() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err))
12371237
}
@@ -1252,3 +1252,82 @@ func (d *Document) HasCurrentVersion(cv Version) error {
12521252
}
12531253
return nil
12541254
}
1255+
1256+
// SyncDataAlias is an alias for SyncData that doesn't define custom MarshalJSON/UnmarshalJSON
1257+
type SyncDataAlias SyncData
1258+
1259+
// SyncDataJSON is the persisted form of SyncData, with RevAndVersion populated at marshal time
1260+
type SyncDataJSON struct {
1261+
*SyncDataAlias
1262+
RevAndVersion RevAndVersion `json:"rev"`
1263+
}
1264+
1265+
// MarshalJSON populates RevAndVersion using CurrentRev and the HLV (current) source and version.
1266+
// Marshals using SyncDataAlias to avoid recursion, and SyncDataJSON to add the combined RevAndVersion.
1267+
func (s SyncData) MarshalJSON() (data []byte, err error) {
1268+
1269+
var sdj SyncDataJSON
1270+
var sd SyncDataAlias
1271+
sd = (SyncDataAlias)(s)
1272+
sdj.SyncDataAlias = &sd
1273+
sdj.RevAndVersion.RevTreeID = s.CurrentRev
1274+
if s.HLV != nil {
1275+
sdj.RevAndVersion.CurrentSource = s.HLV.SourceID
1276+
sdj.RevAndVersion.CurrentVersion = string(base.Uint64CASToLittleEndianHex(s.HLV.Version))
1277+
}
1278+
return base.JSONMarshal(sdj)
1279+
}
1280+
1281+
// UnmarshalJSON unmarshals using SyncDataJSON, then sets currentRev on SyncData based on the value in RevAndVersion.
1282+
// The HLV's current version stored in RevAndVersion is ignored at unmarshal time - the value in the HLV is the source
1283+
// of truth.
1284+
func (s *SyncData) UnmarshalJSON(data []byte) error {
1285+
1286+
var sdj *SyncDataJSON
1287+
err := base.JSONUnmarshal(data, &sdj)
1288+
if err != nil {
1289+
return err
1290+
}
1291+
*s = SyncData(*sdj.SyncDataAlias)
1292+
s.CurrentRev = sdj.RevAndVersion.RevTreeID
1293+
return nil
1294+
}
1295+
1296+
// RevAndVersion is used to store both revTreeID and currentVersion in a single property, for backwards compatibility
1297+
// with existing indexes using rev. When only RevTreeID is specified, is marshalled/unmarshalled as a string. Otherwise
1298+
// marshalled normally.
1299+
type RevAndVersion struct {
1300+
RevTreeID string `json:"rev,omitempty"`
1301+
CurrentSource string `json:"src,omitempty"`
1302+
CurrentVersion string `json:"vrs,omitempty"` // String representation of version
1303+
}
1304+
1305+
// RevAndVersionJSON aliases RevAndVersion to support conditional unmarshalling from either string (revTreeID) or
1306+
// map (RevAndVersion) representations
1307+
type RevAndVersionJSON RevAndVersion
1308+
1309+
// Marshals RevAndVersion as simple string when only RevTreeID is specified - otherwise performs standard
1310+
// marshalling
1311+
func (rv RevAndVersion) MarshalJSON() (data []byte, err error) {
1312+
1313+
if rv.CurrentSource == "" {
1314+
return base.JSONMarshal(rv.RevTreeID)
1315+
}
1316+
return base.JSONMarshal(RevAndVersionJSON(rv))
1317+
}
1318+
1319+
// Unmarshals either from string (legacy, revID only) or standard RevAndVersion unmarshalling.
1320+
func (rv *RevAndVersion) UnmarshalJSON(data []byte) error {
1321+
1322+
if len(data) == 0 {
1323+
return nil
1324+
}
1325+
switch data[0] {
1326+
case '"':
1327+
return base.JSONUnmarshal(data, &rv.RevTreeID)
1328+
case '{':
1329+
return base.JSONUnmarshal(data, (*RevAndVersionJSON)(rv))
1330+
default:
1331+
return fmt.Errorf("unrecognized JSON format for RevAndVersion: %s", data)
1332+
}
1333+
}

0 commit comments

Comments
 (0)