Skip to content

Commit 9cc8f89

Browse files
gregns1bbrkstorcolvin
committed
CBG-3212: add api to fetch a document by its CV value (#6579)
* CBG-3212: add api to fetch a document by its CV value * test fix * rebased SourceAndVersion -> Version rename * Update currentRevChannels on CV revcache load and doc.updateChannels * fix spelling * Remove currentRevChannels * Move common GetRev/GetCV work into documentRevisionForRequest function * Pass revision.RevID into authorizeUserForChannels * Update db/crud.go Co-authored-by: Tor Colvin <tor.colvin@couchbase.com> --------- Co-authored-by: Ben Brooks <ben.brooks@couchbase.com> Co-authored-by: Tor Colvin <tor.colvin@couchbase.com>
1 parent 85ed9bf commit 9cc8f89

File tree

6 files changed

+247
-37
lines changed

6 files changed

+247
-37
lines changed

db/crud.go

+41-11
Original file line numberDiff line numberDiff line change
@@ -313,14 +313,29 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
313313
// No rev ID given, so load active revision
314314
revision, err = db.revisionCache.GetActive(ctx, docid, includeBody)
315315
}
316-
317316
if err != nil {
318317
return DocumentRevision{}, err
319318
}
320319

320+
return db.documentRevisionForRequest(ctx, docid, revision, &revid, nil, maxHistory, historyFrom)
321+
}
322+
323+
// documentRevisionForRequest processes the given DocumentRevision and returns a version of it for a given client request, depending on access, deleted, etc.
324+
func (db *DatabaseCollectionWithUser) documentRevisionForRequest(ctx context.Context, docID string, revision DocumentRevision, revID *string, cv *Version, maxHistory int, historyFrom []string) (DocumentRevision, error) {
325+
// ensure only one of cv or revID is specified
326+
if cv != nil && revID != nil {
327+
return DocumentRevision{}, fmt.Errorf("must have one of cv or revID in documentRevisionForRequest (had cv=%v revID=%v)", cv, revID)
328+
}
329+
var requestedVersion string
330+
if revID != nil {
331+
requestedVersion = *revID
332+
} else if cv != nil {
333+
requestedVersion = cv.String()
334+
}
335+
321336
if revision.BodyBytes == nil {
322337
if db.ForceAPIForbiddenErrors() {
323-
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docid), base.MD(revid))
338+
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docID), base.MD(requestedVersion))
324339
return DocumentRevision{}, ErrForbidden
325340
}
326341
return DocumentRevision{}, ErrMissing
@@ -339,16 +354,17 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
339354
_, requestedHistory = trimEncodedRevisionsToAncestor(ctx, requestedHistory, historyFrom, maxHistory)
340355
}
341356

342-
isAuthorized, redactedRev := db.authorizeUserForChannels(docid, revision.RevID, revision.Channels, revision.Deleted, requestedHistory)
357+
isAuthorized, redactedRevision := db.authorizeUserForChannels(docID, revision.RevID, cv, revision.Channels, revision.Deleted, requestedHistory)
343358
if !isAuthorized {
344-
if revid == "" {
359+
// client just wanted active revision, not a specific one
360+
if requestedVersion == "" {
345361
return DocumentRevision{}, ErrForbidden
346362
}
347363
if db.ForceAPIForbiddenErrors() {
348-
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docid), base.MD(revid))
364+
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docID), base.MD(requestedVersion))
349365
return DocumentRevision{}, ErrForbidden
350366
}
351-
return redactedRev, nil
367+
return redactedRevision, nil
352368
}
353369

354370
// If the revision is a removal cache entry (no body), but the user has access to that removal, then just
@@ -357,13 +373,26 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
357373
return DocumentRevision{}, ErrMissing
358374
}
359375

360-
if revision.Deleted && revid == "" {
376+
if revision.Deleted && requestedVersion == "" {
361377
return DocumentRevision{}, ErrDeleted
362378
}
363379

364380
return revision, nil
365381
}
366382

383+
func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, cv *Version, includeBody bool) (revision DocumentRevision, err error) {
384+
if cv != nil {
385+
revision, err = db.revisionCache.GetWithCV(ctx, docid, cv, includeBody, RevCacheOmitDelta)
386+
} else {
387+
revision, err = db.revisionCache.GetActive(ctx, docid, includeBody)
388+
}
389+
if err != nil {
390+
return DocumentRevision{}, err
391+
}
392+
393+
return db.documentRevisionForRequest(ctx, docid, revision, nil, cv, 0, nil)
394+
}
395+
367396
// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
368397
// returns nil.
369398
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRevID, toRevID string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
@@ -395,7 +424,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
395424
if fromRevision.Delta != nil {
396425
if fromRevision.Delta.ToRevID == toRevID {
397426

398-
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
427+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
399428
if !isAuthorized {
400429
return nil, &redactedBody, nil
401430
}
@@ -418,7 +447,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
418447
}
419448

420449
deleted := toRevision.Deleted
421-
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, toRevision.Channels, deleted, toRevision.History)
450+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, toRevision.Channels, deleted, toRevision.History)
422451
if !isAuthorized {
423452
return nil, &redactedBody, nil
424453
}
@@ -477,7 +506,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
477506
return nil, nil, nil
478507
}
479508

480-
func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {
509+
func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, cv *Version, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {
481510

482511
if col.user != nil {
483512
if err := col.user.AuthorizeAnyCollectionChannel(col.ScopeName, col.Name, channels); err != nil {
@@ -489,6 +518,7 @@ func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID str
489518
RevID: revID,
490519
History: history,
491520
Deleted: isDeleted,
521+
CV: cv,
492522
}
493523
if isDeleted {
494524
// Deletions are denoted by the deleted message property during 2.x replication
@@ -1044,7 +1074,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
10441074
if existingDoc != nil {
10451075
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs[base.SyncXattrName], existingDoc.Xattrs[db.userXattrKey()], existingDoc.Cas, DocUnmarshalRev)
10461076
if unmarshalErr != nil {
1047-
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling exsiting doc")
1077+
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling existing doc")
10481078
}
10491079
matchRev = doc.CurrentRev
10501080
}

db/crud_test.go

+179
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
sgbucket "github.com/couchbase/sg-bucket"
2222
"github.com/couchbase/sync_gateway/base"
23+
"github.com/couchbase/sync_gateway/channels"
2324
"github.com/stretchr/testify/assert"
2425
"github.com/stretchr/testify/require"
2526
)
@@ -1872,3 +1873,181 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
18721873
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
18731874
assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev)
18741875
}
1876+
1877+
// TestGetCVWithDocResidentInCache:
1878+
// - Two test cases, one with doc a user will have access to, one without
1879+
// - Purpose is to have a doc that is resident in rev cache and use the GetCV function to retrieve these docs
1880+
// - Assert that the doc the user has access to is corrected fetched
1881+
// - Assert the doc the user doesn't have access to is fetched but correctly redacted
1882+
func TestGetCVWithDocResidentInCache(t *testing.T) {
1883+
const docID = "doc1"
1884+
1885+
testCases := []struct {
1886+
name string
1887+
docChannels []string
1888+
access bool
1889+
}{
1890+
{
1891+
name: "getCVWithUserAccess",
1892+
docChannels: []string{"A"},
1893+
access: true,
1894+
},
1895+
{
1896+
name: "getCVWithoutUserAccess",
1897+
docChannels: []string{"B"},
1898+
access: false,
1899+
},
1900+
}
1901+
for _, testCase := range testCases {
1902+
t.Run(testCase.name, func(t *testing.T) {
1903+
db, ctx := setupTestDB(t)
1904+
defer db.Close(ctx)
1905+
collection := GetSingleDatabaseCollectionWithUser(t, db)
1906+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
1907+
1908+
// Create a user with access to channel A
1909+
authenticator := db.Authenticator(base.TestCtx(t))
1910+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
1911+
require.NoError(t, err)
1912+
require.NoError(t, authenticator.Save(user))
1913+
collection.user, err = authenticator.GetUser("alice")
1914+
require.NoError(t, err)
1915+
1916+
// create doc with the channels for the test case
1917+
docBody := Body{"channels": testCase.docChannels}
1918+
rev, doc, err := collection.Put(ctx, docID, docBody)
1919+
require.NoError(t, err)
1920+
1921+
vrs := doc.HLV.Version
1922+
src := doc.HLV.SourceID
1923+
sv := &Version{Value: vrs, SourceID: src}
1924+
revision, err := collection.GetCV(ctx, docID, sv, true)
1925+
require.NoError(t, err)
1926+
if testCase.access {
1927+
assert.Equal(t, rev, revision.RevID)
1928+
assert.Equal(t, sv, revision.CV)
1929+
assert.Equal(t, docID, revision.DocID)
1930+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
1931+
} else {
1932+
assert.Equal(t, rev, revision.RevID)
1933+
assert.Equal(t, sv, revision.CV)
1934+
assert.Equal(t, docID, revision.DocID)
1935+
assert.Equal(t, []byte(RemovedRedactedDocument), revision.BodyBytes)
1936+
}
1937+
})
1938+
}
1939+
}
1940+
1941+
// TestGetByCVForDocNotResidentInCache:
1942+
// - Setup db with rev cache size of 1
1943+
// - Put two docs forcing eviction of the first doc
1944+
// - Use GetCV function to fetch the first doc, forcing the rev cache to load the doc from bucket
1945+
// - Assert the doc revision fetched is correct to the first doc we created
1946+
func TestGetByCVForDocNotResidentInCache(t *testing.T) {
1947+
db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{
1948+
RevisionCacheOptions: &RevisionCacheOptions{
1949+
Size: 1,
1950+
},
1951+
})
1952+
defer db.Close(ctx)
1953+
collection := GetSingleDatabaseCollectionWithUser(t, db)
1954+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
1955+
1956+
// Create a user with access to channel A
1957+
authenticator := db.Authenticator(base.TestCtx(t))
1958+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
1959+
require.NoError(t, err)
1960+
require.NoError(t, authenticator.Save(user))
1961+
collection.user, err = authenticator.GetUser("alice")
1962+
require.NoError(t, err)
1963+
1964+
const (
1965+
doc1ID = "doc1"
1966+
doc2ID = "doc2"
1967+
)
1968+
1969+
revBody := Body{"channels": []string{"A"}}
1970+
rev, doc, err := collection.Put(ctx, doc1ID, revBody)
1971+
require.NoError(t, err)
1972+
1973+
// put another doc that should evict first doc from cache
1974+
_, _, err = collection.Put(ctx, doc2ID, revBody)
1975+
require.NoError(t, err)
1976+
1977+
// get by CV should force a load from bucket and have a cache miss
1978+
vrs := doc.HLV.Version
1979+
src := doc.HLV.SourceID
1980+
sv := &Version{Value: vrs, SourceID: src}
1981+
revision, err := collection.GetCV(ctx, doc1ID, sv, true)
1982+
require.NoError(t, err)
1983+
1984+
// assert the fetched doc is the first doc we added and assert that we did in fact get cache miss
1985+
assert.Equal(t, int64(1), db.DbStats.Cache().RevisionCacheMisses.Value())
1986+
assert.Equal(t, rev, revision.RevID)
1987+
assert.Equal(t, sv, revision.CV)
1988+
assert.Equal(t, doc1ID, revision.DocID)
1989+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
1990+
}
1991+
1992+
// TestGetCVActivePathway:
1993+
// - Two test cases, one with doc a user will have access to, one without
1994+
// - Purpose is top specify nil CV to the GetCV function to force the GetActive code pathway
1995+
// - Assert doc that is created is fetched correctly when user has access to doc
1996+
// - Assert that correct error is returned when user has no access to the doc
1997+
func TestGetCVActivePathway(t *testing.T) {
1998+
const docID = "doc1"
1999+
2000+
testCases := []struct {
2001+
name string
2002+
docChannels []string
2003+
access bool
2004+
}{
2005+
{
2006+
name: "activeFetchWithUserAccess",
2007+
docChannels: []string{"A"},
2008+
access: true,
2009+
},
2010+
{
2011+
name: "activeFetchWithoutUserAccess",
2012+
docChannels: []string{"B"},
2013+
access: false,
2014+
},
2015+
}
2016+
for _, testCase := range testCases {
2017+
t.Run(testCase.name, func(t *testing.T) {
2018+
db, ctx := setupTestDB(t)
2019+
defer db.Close(ctx)
2020+
collection := GetSingleDatabaseCollectionWithUser(t, db)
2021+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
2022+
2023+
// Create a user with access to channel A
2024+
authenticator := db.Authenticator(base.TestCtx(t))
2025+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
2026+
require.NoError(t, err)
2027+
require.NoError(t, authenticator.Save(user))
2028+
collection.user, err = authenticator.GetUser("alice")
2029+
require.NoError(t, err)
2030+
2031+
// test get active path by specifying nil cv
2032+
revBody := Body{"channels": testCase.docChannels}
2033+
rev, doc, err := collection.Put(ctx, docID, revBody)
2034+
require.NoError(t, err)
2035+
revision, err := collection.GetCV(ctx, docID, nil, true)
2036+
2037+
if testCase.access == true {
2038+
require.NoError(t, err)
2039+
vrs := doc.HLV.Version
2040+
src := doc.HLV.SourceID
2041+
sv := &Version{Value: vrs, SourceID: src}
2042+
assert.Equal(t, rev, revision.RevID)
2043+
assert.Equal(t, sv, revision.CV)
2044+
assert.Equal(t, docID, revision.DocID)
2045+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
2046+
} else {
2047+
require.Error(t, err)
2048+
assert.ErrorContains(t, err, ErrForbidden.Error())
2049+
assert.Equal(t, DocumentRevision{}, revision)
2050+
}
2051+
})
2052+
}
2053+
}

db/document.go

+16-18
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,17 @@ type SyncData struct {
9999
removedRevisionBodyKeys map[string]string // keys of non-winning revisions that have been removed (and so may require deletion), indexed by revID
100100
}
101101

102+
// determine set of current channels based on removal entries.
103+
func (sd *SyncData) getCurrentChannels() base.Set {
104+
ch := base.SetOf()
105+
for channelName, channelRemoval := range sd.Channels {
106+
if channelRemoval == nil || channelRemoval.Seq == 0 {
107+
ch.Add(channelName)
108+
}
109+
}
110+
return ch
111+
}
112+
102113
func (sd *SyncData) HashRedact(salt string) SyncData {
103114

104115
// Creating a new SyncData with the redacted info. We copy all the information which stays the same and create new
@@ -178,12 +189,11 @@ type Document struct {
178189
Cas uint64 // Document cas
179190
rawUserXattr []byte // Raw user xattr as retrieved from the bucket
180191

181-
Deleted bool
182-
DocExpiry uint32
183-
RevID string
184-
DocAttachments AttachmentsMeta
185-
inlineSyncData bool
186-
currentRevChannels base.Set // A base.Set of the current revision's channels (determined by SyncData.Channels at UnmarshalJSON time)
192+
Deleted bool
193+
DocExpiry uint32
194+
RevID string
195+
DocAttachments AttachmentsMeta
196+
inlineSyncData bool
187197
}
188198

189199
type historyOnlySyncData struct {
@@ -971,7 +981,6 @@ func (doc *Document) updateChannels(ctx context.Context, newChannels base.Set) (
971981
doc.updateChannelHistory(channel, doc.Sequence, true)
972982
}
973983
}
974-
doc.currentRevChannels = newChannels
975984
if changed != nil {
976985
base.InfofCtx(ctx, base.KeyCRUD, "\tDoc %q / %q in channels %q", base.UD(doc.ID), doc.CurrentRev, base.UD(newChannels))
977986
changedChannels, err = channels.SetFromArray(changed, channels.KeepStar)
@@ -1081,17 +1090,6 @@ func (doc *Document) UnmarshalJSON(data []byte) error {
10811090
doc.SyncData = *syncData.SyncData
10821091
}
10831092

1084-
// determine current revision's channels and store in-memory (avoids doc.Channels iteration at access-check time)
1085-
if len(doc.Channels) > 0 {
1086-
ch := base.SetOf()
1087-
for channelName, channelRemoval := range doc.Channels {
1088-
if channelRemoval == nil || channelRemoval.Seq == 0 {
1089-
ch.Add(channelName)
1090-
}
1091-
}
1092-
doc.currentRevChannels = ch
1093-
}
1094-
10951093
// Unmarshal the rest of the doc body as map[string]interface{}
10961094
if err := doc._body.Unmarshal(data); err != nil {
10971095
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalJSON() doc with id: %s. Error: %v", base.UD(doc.ID), err))

db/revision_cache_interface.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore,
275275
return revCacheLoaderForDocument(ctx, backingStore, doc, id.RevID)
276276
}
277277

278-
// revCacheLoaderForCv will load a document from the bucket using the CV, comapre the fetched doc and the CV specified in the function,
278+
// revCacheLoaderForCv will load a document from the bucket using the CV, compare the fetched doc and the CV specified in the function,
279279
// and will still return revid for purpose of populating the Rev ID lookup map on the cache
280280
func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingStore, id IDandCV, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
281281
cv := Version{
@@ -337,7 +337,7 @@ func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCache
337337
if err = doc.HasCurrentVersion(cv); err != nil {
338338
return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, revid, err
339339
}
340-
channels = doc.currentRevChannels
340+
channels = doc.SyncData.getCurrentChannels()
341341
revid = doc.CurrentRev
342342

343343
return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, revid, err

0 commit comments

Comments
 (0)