Skip to content

Commit 07f4320

Browse files
gregns1bbrkstorcolvin
authored
CBG-3212: add api to fetch a document by its CV value (#6579)
* CBG-3212: add api to fetch a document by its CV value * test fix * rebased SourceAndVersion -> Version rename * Update currentRevChannels on CV revcache load and doc.updateChannels * fix spelling * Remove currentRevChannels * Move common GetRev/GetCV work into documentRevisionForRequest function * Pass revision.RevID into authorizeUserForChannels * Update db/crud.go Co-authored-by: Tor Colvin <tor.colvin@couchbase.com> --------- Co-authored-by: Ben Brooks <ben.brooks@couchbase.com> Co-authored-by: Tor Colvin <tor.colvin@couchbase.com>
1 parent f185349 commit 07f4320

File tree

6 files changed

+247
-37
lines changed

6 files changed

+247
-37
lines changed

db/crud.go

+41-11
Original file line numberDiff line numberDiff line change
@@ -322,14 +322,29 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
322322
// No rev ID given, so load active revision
323323
revision, err = db.revisionCache.GetActive(ctx, docid, includeBody)
324324
}
325-
326325
if err != nil {
327326
return DocumentRevision{}, err
328327
}
329328

329+
return db.documentRevisionForRequest(ctx, docid, revision, &revid, nil, maxHistory, historyFrom)
330+
}
331+
332+
// documentRevisionForRequest processes the given DocumentRevision and returns a version of it for a given client request, depending on access, deleted, etc.
333+
func (db *DatabaseCollectionWithUser) documentRevisionForRequest(ctx context.Context, docID string, revision DocumentRevision, revID *string, cv *Version, maxHistory int, historyFrom []string) (DocumentRevision, error) {
334+
// ensure only one of cv or revID is specified
335+
if cv != nil && revID != nil {
336+
return DocumentRevision{}, fmt.Errorf("must have one of cv or revID in documentRevisionForRequest (had cv=%v revID=%v)", cv, revID)
337+
}
338+
var requestedVersion string
339+
if revID != nil {
340+
requestedVersion = *revID
341+
} else if cv != nil {
342+
requestedVersion = cv.String()
343+
}
344+
330345
if revision.BodyBytes == nil {
331346
if db.ForceAPIForbiddenErrors() {
332-
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docid), base.MD(revid))
347+
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docID), base.MD(requestedVersion))
333348
return DocumentRevision{}, ErrForbidden
334349
}
335350
return DocumentRevision{}, ErrMissing
@@ -348,16 +363,17 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
348363
_, requestedHistory = trimEncodedRevisionsToAncestor(ctx, requestedHistory, historyFrom, maxHistory)
349364
}
350365

351-
isAuthorized, redactedRev := db.authorizeUserForChannels(docid, revision.RevID, revision.Channels, revision.Deleted, requestedHistory)
366+
isAuthorized, redactedRevision := db.authorizeUserForChannels(docID, revision.RevID, cv, revision.Channels, revision.Deleted, requestedHistory)
352367
if !isAuthorized {
353-
if revid == "" {
368+
// client just wanted active revision, not a specific one
369+
if requestedVersion == "" {
354370
return DocumentRevision{}, ErrForbidden
355371
}
356372
if db.ForceAPIForbiddenErrors() {
357-
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docid), base.MD(revid))
373+
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docID), base.MD(requestedVersion))
358374
return DocumentRevision{}, ErrForbidden
359375
}
360-
return redactedRev, nil
376+
return redactedRevision, nil
361377
}
362378

363379
// If the revision is a removal cache entry (no body), but the user has access to that removal, then just
@@ -366,13 +382,26 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
366382
return DocumentRevision{}, ErrMissing
367383
}
368384

369-
if revision.Deleted && revid == "" {
385+
if revision.Deleted && requestedVersion == "" {
370386
return DocumentRevision{}, ErrDeleted
371387
}
372388

373389
return revision, nil
374390
}
375391

392+
func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, cv *Version, includeBody bool) (revision DocumentRevision, err error) {
393+
if cv != nil {
394+
revision, err = db.revisionCache.GetWithCV(ctx, docid, cv, includeBody, RevCacheOmitDelta)
395+
} else {
396+
revision, err = db.revisionCache.GetActive(ctx, docid, includeBody)
397+
}
398+
if err != nil {
399+
return DocumentRevision{}, err
400+
}
401+
402+
return db.documentRevisionForRequest(ctx, docid, revision, nil, cv, 0, nil)
403+
}
404+
376405
// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
377406
// returns nil.
378407
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRevID, toRevID string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
@@ -404,7 +433,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
404433
if fromRevision.Delta != nil {
405434
if fromRevision.Delta.ToRevID == toRevID {
406435

407-
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
436+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
408437
if !isAuthorized {
409438
return nil, &redactedBody, nil
410439
}
@@ -427,7 +456,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
427456
}
428457

429458
deleted := toRevision.Deleted
430-
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, toRevision.Channels, deleted, toRevision.History)
459+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, toRevision.Channels, deleted, toRevision.History)
431460
if !isAuthorized {
432461
return nil, &redactedBody, nil
433462
}
@@ -486,7 +515,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
486515
return nil, nil, nil
487516
}
488517

489-
func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {
518+
func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, cv *Version, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {
490519

491520
if col.user != nil {
492521
if err := col.user.AuthorizeAnyCollectionChannel(col.ScopeName, col.Name, channels); err != nil {
@@ -498,6 +527,7 @@ func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID str
498527
RevID: revID,
499528
History: history,
500529
Deleted: isDeleted,
530+
CV: cv,
501531
}
502532
if isDeleted {
503533
// Deletions are denoted by the deleted message property during 2.x replication
@@ -1064,7 +1094,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
10641094
if existingDoc != nil {
10651095
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattr, existingDoc.UserXattr, existingDoc.Cas, DocUnmarshalRev)
10661096
if unmarshalErr != nil {
1067-
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling exsiting doc")
1097+
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling existing doc")
10681098
}
10691099
matchRev = doc.CurrentRev
10701100
}

db/crud_test.go

+179
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
sgbucket "github.com/couchbase/sg-bucket"
2222
"github.com/couchbase/sync_gateway/base"
23+
"github.com/couchbase/sync_gateway/channels"
2324
"github.com/stretchr/testify/assert"
2425
"github.com/stretchr/testify/require"
2526
)
@@ -1867,3 +1868,181 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
18671868
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
18681869
assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev)
18691870
}
1871+
1872+
// TestGetCVWithDocResidentInCache:
1873+
// - Two test cases, one with doc a user will have access to, one without
1874+
// - Purpose is to have a doc that is resident in rev cache and use the GetCV function to retrieve these docs
1875+
// - Assert that the doc the user has access to is corrected fetched
1876+
// - Assert the doc the user doesn't have access to is fetched but correctly redacted
1877+
func TestGetCVWithDocResidentInCache(t *testing.T) {
1878+
const docID = "doc1"
1879+
1880+
testCases := []struct {
1881+
name string
1882+
docChannels []string
1883+
access bool
1884+
}{
1885+
{
1886+
name: "getCVWithUserAccess",
1887+
docChannels: []string{"A"},
1888+
access: true,
1889+
},
1890+
{
1891+
name: "getCVWithoutUserAccess",
1892+
docChannels: []string{"B"},
1893+
access: false,
1894+
},
1895+
}
1896+
for _, testCase := range testCases {
1897+
t.Run(testCase.name, func(t *testing.T) {
1898+
db, ctx := setupTestDB(t)
1899+
defer db.Close(ctx)
1900+
collection := GetSingleDatabaseCollectionWithUser(t, db)
1901+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
1902+
1903+
// Create a user with access to channel A
1904+
authenticator := db.Authenticator(base.TestCtx(t))
1905+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
1906+
require.NoError(t, err)
1907+
require.NoError(t, authenticator.Save(user))
1908+
collection.user, err = authenticator.GetUser("alice")
1909+
require.NoError(t, err)
1910+
1911+
// create doc with the channels for the test case
1912+
docBody := Body{"channels": testCase.docChannels}
1913+
rev, doc, err := collection.Put(ctx, docID, docBody)
1914+
require.NoError(t, err)
1915+
1916+
vrs := doc.HLV.Version
1917+
src := doc.HLV.SourceID
1918+
sv := &Version{Value: vrs, SourceID: src}
1919+
revision, err := collection.GetCV(ctx, docID, sv, true)
1920+
require.NoError(t, err)
1921+
if testCase.access {
1922+
assert.Equal(t, rev, revision.RevID)
1923+
assert.Equal(t, sv, revision.CV)
1924+
assert.Equal(t, docID, revision.DocID)
1925+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
1926+
} else {
1927+
assert.Equal(t, rev, revision.RevID)
1928+
assert.Equal(t, sv, revision.CV)
1929+
assert.Equal(t, docID, revision.DocID)
1930+
assert.Equal(t, []byte(RemovedRedactedDocument), revision.BodyBytes)
1931+
}
1932+
})
1933+
}
1934+
}
1935+
1936+
// TestGetByCVForDocNotResidentInCache:
1937+
// - Setup db with rev cache size of 1
1938+
// - Put two docs forcing eviction of the first doc
1939+
// - Use GetCV function to fetch the first doc, forcing the rev cache to load the doc from bucket
1940+
// - Assert the doc revision fetched is correct to the first doc we created
1941+
func TestGetByCVForDocNotResidentInCache(t *testing.T) {
1942+
db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{
1943+
RevisionCacheOptions: &RevisionCacheOptions{
1944+
Size: 1,
1945+
},
1946+
})
1947+
defer db.Close(ctx)
1948+
collection := GetSingleDatabaseCollectionWithUser(t, db)
1949+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
1950+
1951+
// Create a user with access to channel A
1952+
authenticator := db.Authenticator(base.TestCtx(t))
1953+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
1954+
require.NoError(t, err)
1955+
require.NoError(t, authenticator.Save(user))
1956+
collection.user, err = authenticator.GetUser("alice")
1957+
require.NoError(t, err)
1958+
1959+
const (
1960+
doc1ID = "doc1"
1961+
doc2ID = "doc2"
1962+
)
1963+
1964+
revBody := Body{"channels": []string{"A"}}
1965+
rev, doc, err := collection.Put(ctx, doc1ID, revBody)
1966+
require.NoError(t, err)
1967+
1968+
// put another doc that should evict first doc from cache
1969+
_, _, err = collection.Put(ctx, doc2ID, revBody)
1970+
require.NoError(t, err)
1971+
1972+
// get by CV should force a load from bucket and have a cache miss
1973+
vrs := doc.HLV.Version
1974+
src := doc.HLV.SourceID
1975+
sv := &Version{Value: vrs, SourceID: src}
1976+
revision, err := collection.GetCV(ctx, doc1ID, sv, true)
1977+
require.NoError(t, err)
1978+
1979+
// assert the fetched doc is the first doc we added and assert that we did in fact get cache miss
1980+
assert.Equal(t, int64(1), db.DbStats.Cache().RevisionCacheMisses.Value())
1981+
assert.Equal(t, rev, revision.RevID)
1982+
assert.Equal(t, sv, revision.CV)
1983+
assert.Equal(t, doc1ID, revision.DocID)
1984+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
1985+
}
1986+
1987+
// TestGetCVActivePathway:
1988+
// - Two test cases, one with doc a user will have access to, one without
1989+
// - Purpose is top specify nil CV to the GetCV function to force the GetActive code pathway
1990+
// - Assert doc that is created is fetched correctly when user has access to doc
1991+
// - Assert that correct error is returned when user has no access to the doc
1992+
func TestGetCVActivePathway(t *testing.T) {
1993+
const docID = "doc1"
1994+
1995+
testCases := []struct {
1996+
name string
1997+
docChannels []string
1998+
access bool
1999+
}{
2000+
{
2001+
name: "activeFetchWithUserAccess",
2002+
docChannels: []string{"A"},
2003+
access: true,
2004+
},
2005+
{
2006+
name: "activeFetchWithoutUserAccess",
2007+
docChannels: []string{"B"},
2008+
access: false,
2009+
},
2010+
}
2011+
for _, testCase := range testCases {
2012+
t.Run(testCase.name, func(t *testing.T) {
2013+
db, ctx := setupTestDB(t)
2014+
defer db.Close(ctx)
2015+
collection := GetSingleDatabaseCollectionWithUser(t, db)
2016+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
2017+
2018+
// Create a user with access to channel A
2019+
authenticator := db.Authenticator(base.TestCtx(t))
2020+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
2021+
require.NoError(t, err)
2022+
require.NoError(t, authenticator.Save(user))
2023+
collection.user, err = authenticator.GetUser("alice")
2024+
require.NoError(t, err)
2025+
2026+
// test get active path by specifying nil cv
2027+
revBody := Body{"channels": testCase.docChannels}
2028+
rev, doc, err := collection.Put(ctx, docID, revBody)
2029+
require.NoError(t, err)
2030+
revision, err := collection.GetCV(ctx, docID, nil, true)
2031+
2032+
if testCase.access == true {
2033+
require.NoError(t, err)
2034+
vrs := doc.HLV.Version
2035+
src := doc.HLV.SourceID
2036+
sv := &Version{Value: vrs, SourceID: src}
2037+
assert.Equal(t, rev, revision.RevID)
2038+
assert.Equal(t, sv, revision.CV)
2039+
assert.Equal(t, docID, revision.DocID)
2040+
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
2041+
} else {
2042+
require.Error(t, err)
2043+
assert.ErrorContains(t, err, ErrForbidden.Error())
2044+
assert.Equal(t, DocumentRevision{}, revision)
2045+
}
2046+
})
2047+
}
2048+
}

db/document.go

+16-18
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,17 @@ type SyncData struct {
9898
removedRevisionBodyKeys map[string]string // keys of non-winning revisions that have been removed (and so may require deletion), indexed by revID
9999
}
100100

101+
// determine set of current channels based on removal entries.
102+
func (sd *SyncData) getCurrentChannels() base.Set {
103+
ch := base.SetOf()
104+
for channelName, channelRemoval := range sd.Channels {
105+
if channelRemoval == nil || channelRemoval.Seq == 0 {
106+
ch.Add(channelName)
107+
}
108+
}
109+
return ch
110+
}
111+
101112
func (sd *SyncData) HashRedact(salt string) SyncData {
102113

103114
// Creating a new SyncData with the redacted info. We copy all the information which stays the same and create new
@@ -177,12 +188,11 @@ type Document struct {
177188
Cas uint64 // Document cas
178189
rawUserXattr []byte // Raw user xattr as retrieved from the bucket
179190

180-
Deleted bool
181-
DocExpiry uint32
182-
RevID string
183-
DocAttachments AttachmentsMeta
184-
inlineSyncData bool
185-
currentRevChannels base.Set // A base.Set of the current revision's channels (determined by SyncData.Channels at UnmarshalJSON time)
191+
Deleted bool
192+
DocExpiry uint32
193+
RevID string
194+
DocAttachments AttachmentsMeta
195+
inlineSyncData bool
186196
}
187197

188198
type historyOnlySyncData struct {
@@ -970,7 +980,6 @@ func (doc *Document) updateChannels(ctx context.Context, newChannels base.Set) (
970980
doc.updateChannelHistory(channel, doc.Sequence, true)
971981
}
972982
}
973-
doc.currentRevChannels = newChannels
974983
if changed != nil {
975984
base.InfofCtx(ctx, base.KeyCRUD, "\tDoc %q / %q in channels %q", base.UD(doc.ID), doc.CurrentRev, base.UD(newChannels))
976985
changedChannels, err = channels.SetFromArray(changed, channels.KeepStar)
@@ -1080,17 +1089,6 @@ func (doc *Document) UnmarshalJSON(data []byte) error {
10801089
doc.SyncData = *syncData.SyncData
10811090
}
10821091

1083-
// determine current revision's channels and store in-memory (avoids doc.Channels iteration at access-check time)
1084-
if len(doc.Channels) > 0 {
1085-
ch := base.SetOf()
1086-
for channelName, channelRemoval := range doc.Channels {
1087-
if channelRemoval == nil || channelRemoval.Seq == 0 {
1088-
ch.Add(channelName)
1089-
}
1090-
}
1091-
doc.currentRevChannels = ch
1092-
}
1093-
10941092
// Unmarshal the rest of the doc body as map[string]interface{}
10951093
if err := doc._body.Unmarshal(data); err != nil {
10961094
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalJSON() doc with id: %s. Error: %v", base.UD(doc.ID), err))

db/revision_cache_interface.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore,
275275
return revCacheLoaderForDocument(ctx, backingStore, doc, id.RevID)
276276
}
277277

278-
// revCacheLoaderForCv will load a document from the bucket using the CV, comapre the fetched doc and the CV specified in the function,
278+
// revCacheLoaderForCv will load a document from the bucket using the CV, compare the fetched doc and the CV specified in the function,
279279
// and will still return revid for purpose of populating the Rev ID lookup map on the cache
280280
func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingStore, id IDandCV, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
281281
cv := Version{
@@ -337,7 +337,7 @@ func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCache
337337
if err = doc.HasCurrentVersion(cv); err != nil {
338338
return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, revid, err
339339
}
340-
channels = doc.currentRevChannels
340+
channels = doc.SyncData.getCurrentChannels()
341341
revid = doc.CurrentRev
342342

343343
return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, revid, err

0 commit comments

Comments
 (0)