Skip to content

Commit b34105b

Browse files
gregns1bbrks
authored andcommitted
CBG-3355: Add current version to channel cache (#6571)
* CBG-3255: Add current version to log entry for population on the channel cache. Pre-requisite for my work on adding CV to change entries. Only adds CV to log entry from docs seen over DCP at this time pending work on channel cache backfill * add comments and protect against panic in channel cache population * add more commnets * updated to move test and few lines populating log entry
1 parent 8a3e734 commit b34105b

File tree

7 files changed

+140
-0
lines changed

7 files changed

+140
-0
lines changed

channels/log_entry.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type LogEntry struct {
5454
PrevSequence uint64 // Sequence of previous active revision
5555
IsPrincipal bool // Whether the log-entry is a tracking entry for a principal doc
5656
CollectionID uint32 // Collection ID
57+
SourceID string // SourceID allocated to the doc's Current Version on the HLV
58+
Version uint64 // Version allocated to the doc's Current Version on the HLV
5759
}
5860

5961
func (l LogEntry) String() string {

db/change_cache.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
499499
if len(rawUserXattr) > 0 {
500500
collection.revisionCache.RemoveWithRev(docID, syncData.CurrentRev)
501501
}
502+
502503
change := &LogEntry{
503504
Sequence: syncData.Sequence,
504505
DocID: docID,
@@ -509,6 +510,10 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
509510
Channels: syncData.Channels,
510511
CollectionID: event.CollectionID,
511512
}
513+
if syncData.HLV != nil {
514+
change.SourceID = syncData.HLV.SourceID
515+
change.Version = syncData.HLV.Version
516+
}
512517

513518
millisecondLatency := int(feedLatency / time.Millisecond)
514519

db/change_cache_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,24 @@ func logEntry(seq uint64, docid string, revid string, channelNames []string, col
7474
return entry
7575
}
7676

77+
func testLogEntryWithCV(seq uint64, docid string, revid string, channelNames []string, collectionID uint32, sourceID string, version uint64) *LogEntry {
78+
entry := &LogEntry{
79+
Sequence: seq,
80+
DocID: docid,
81+
RevID: revid,
82+
TimeReceived: time.Now(),
83+
CollectionID: collectionID,
84+
SourceID: sourceID,
85+
Version: version,
86+
}
87+
channelMap := make(channels.ChannelMap)
88+
for _, channelName := range channelNames {
89+
channelMap[channelName] = nil
90+
}
91+
entry.Channels = channelMap
92+
return entry
93+
}
94+
7795
func TestSkippedSequenceList(t *testing.T) {
7896

7997
skipList := NewSkippedSequenceList()

db/changes_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,3 +521,45 @@ func BenchmarkChangesFeedDocUnmarshalling(b *testing.B) {
521521
}
522522

523523
}
524+
525+
// TestCurrentVersionPopulationOnChannelCache:
526+
// - Make channel active on cache
527+
// - Add a doc that is assigned this channel
528+
// - Get the sync data of that doc to assert against the HLV defined on it
529+
// - Wait for the channel cache to be populated with this doc write
530+
// - Assert the CV in the entry fetched from channel cache matches the sync data CV and the bucket UUID on the database context
531+
func TestCurrentVersionPopulationOnChannelCache(t *testing.T) {
532+
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyDCP, base.KeyCache, base.KeyHTTP)
533+
db, ctx := setupTestDB(t)
534+
defer db.Close(ctx)
535+
collection := GetSingleDatabaseCollectionWithUser(t, db)
536+
collectionID := collection.GetCollectionID()
537+
bucketUUID := db.BucketUUID
538+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
539+
540+
// Make channel active
541+
_, err := db.channelCache.GetChanges(ctx, channels.NewID("ABC", collectionID), getChangesOptionsWithZeroSeq(t))
542+
require.NoError(t, err)
543+
544+
// Put a doc that gets assigned a CV to populate the channel cache with
545+
_, _, err = collection.Put(ctx, "doc1", Body{"channels": []string{"ABC"}})
546+
require.NoError(t, err)
547+
err = collection.WaitForPendingChanges(base.TestCtx(t))
548+
require.NoError(t, err)
549+
550+
syncData, err := collection.GetDocSyncData(ctx, "doc1")
551+
require.NoError(t, err)
552+
uintCAS := base.HexCasToUint64(syncData.Cas)
553+
554+
// get entry of above doc from channel cache
555+
entries, err := db.channelCache.GetChanges(ctx, channels.NewID("ABC", collectionID), getChangesOptionsWithZeroSeq(t))
556+
require.NoError(t, err)
557+
require.NotNil(t, entries)
558+
559+
// assert that the source and version has been populated with the channel cache entry for the doc
560+
assert.Equal(t, "doc1", entries[0].DocID)
561+
assert.Equal(t, uintCAS, entries[0].Version)
562+
assert.Equal(t, bucketUUID, entries[0].SourceID)
563+
assert.Equal(t, syncData.HLV.SourceID, entries[0].SourceID)
564+
assert.Equal(t, syncData.HLV.Version, entries[0].Version)
565+
}

db/channel_cache_single_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -951,6 +951,23 @@ func verifyChannelDocIDs(entries []*LogEntry, docIDs []string) bool {
951951
return true
952952
}
953953

954+
type cvValues struct {
955+
source string
956+
version uint64
957+
}
958+
959+
func verifyCVEntries(entries []*LogEntry, cvs []cvValues) bool {
960+
for index, cv := range cvs {
961+
if entries[index].SourceID != cv.source {
962+
return false
963+
}
964+
if entries[index].Version != cv.version {
965+
return false
966+
}
967+
}
968+
return true
969+
}
970+
954971
func writeEntries(entries []*LogEntry) {
955972
for index, entry := range entries {
956973
log.Printf("%d:seq=%d, docID=%s, revID=%s", index, entry.Sequence, entry.DocID, entry.RevID)

db/channel_cache_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,58 @@ func TestChannelCacheMaxSize(t *testing.T) {
5353
assert.Equal(t, 4, int(maxEntries))
5454
}
5555

56+
// TestChannelCacheCurrentVersion:
57+
// - Makes channel channels active for channels used in test by requesting changes on each channel
58+
// - Add 4 docs to the channel cache with CV defined in the log entry
59+
// - Get changes for each channel in question and assert that the CV is populated in each entry expected
60+
func TestChannelCacheCurrentVersion(t *testing.T) {
61+
db, ctx := setupTestDB(t)
62+
defer db.Close(ctx)
63+
64+
cache := db.changeCache.getChannelCache()
65+
66+
collectionID := GetSingleDatabaseCollection(t, db.DatabaseContext).GetCollectionID()
67+
68+
// Make channels active
69+
_, err := cache.GetChanges(ctx, channels.NewID("chanA", collectionID), getChangesOptionsWithCtxOnly(t))
70+
require.NoError(t, err)
71+
_, err = cache.GetChanges(ctx, channels.NewID("chanB", collectionID), getChangesOptionsWithCtxOnly(t))
72+
require.NoError(t, err)
73+
_, err = cache.GetChanges(ctx, channels.NewID("chanC", collectionID), getChangesOptionsWithCtxOnly(t))
74+
require.NoError(t, err)
75+
_, err = cache.GetChanges(ctx, channels.NewID("chanD", collectionID), getChangesOptionsWithCtxOnly(t))
76+
require.NoError(t, err)
77+
78+
cache.AddToCache(ctx, testLogEntryWithCV(1, "doc1", "1-a", []string{"chanB", "chanC", "chanD"}, collectionID, "test1", 123))
79+
cache.AddToCache(ctx, testLogEntryWithCV(2, "doc2", "1-a", []string{"chanB", "chanC", "chanD"}, collectionID, "test2", 1234))
80+
cache.AddToCache(ctx, testLogEntryWithCV(3, "doc3", "1-a", []string{"chanC", "chanD"}, collectionID, "test3", 12345))
81+
cache.AddToCache(ctx, testLogEntryWithCV(4, "doc4", "1-a", []string{"chanC"}, collectionID, "test4", 123456))
82+
83+
// assert on channel cache entries for 'chanC'
84+
entriesChanC, err := cache.GetChanges(ctx, channels.NewID("chanC", collectionID), getChangesOptionsWithZeroSeq(t))
85+
assert.NoError(t, err)
86+
require.Len(t, entriesChanC, 4)
87+
assert.True(t, verifyChannelSequences(entriesChanC, []uint64{1, 2, 3, 4}))
88+
assert.True(t, verifyChannelDocIDs(entriesChanC, []string{"doc1", "doc2", "doc3", "doc4"}))
89+
assert.True(t, verifyCVEntries(entriesChanC, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}, {source: "test3", version: 12345}, {source: "test4", version: 123456}}))
90+
91+
// assert on channel cache entries for 'chanD'
92+
entriesChanD, err := cache.GetChanges(ctx, channels.NewID("chanD", collectionID), getChangesOptionsWithZeroSeq(t))
93+
assert.NoError(t, err)
94+
require.Len(t, entriesChanD, 3)
95+
assert.True(t, verifyChannelSequences(entriesChanD, []uint64{1, 2, 3}))
96+
assert.True(t, verifyChannelDocIDs(entriesChanD, []string{"doc1", "doc2", "doc3"}))
97+
assert.True(t, verifyCVEntries(entriesChanD, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}, {source: "test3", version: 12345}}))
98+
99+
// assert on channel cache entries for 'chanB'
100+
entriesChanB, err := cache.GetChanges(ctx, channels.NewID("chanB", collectionID), getChangesOptionsWithZeroSeq(t))
101+
assert.NoError(t, err)
102+
require.Len(t, entriesChanB, 2)
103+
assert.True(t, verifyChannelSequences(entriesChanB, []uint64{1, 2}))
104+
assert.True(t, verifyChannelDocIDs(entriesChanB, []string{"doc1", "doc2"}))
105+
assert.True(t, verifyCVEntries(entriesChanB, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}}))
106+
}
107+
56108
func getCacheUtilization(stats *base.CacheStats) (active, tombstones, removals int) {
57109
active = int(stats.ChannelCacheRevsActive.Value())
58110
tombstones = int(stats.ChannelCacheRevsTombstone.Value())

db/util_testing.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,10 @@ func (dbc *DatabaseContext) CollectionChannelViewForTest(tb testing.TB, collecti
459459
return collection.getChangesInChannelFromQuery(base.TestCtx(tb), channelName, startSeq, endSeq, 0, false)
460460
}
461461

462+
func (db *DatabaseContext) GetChannelCache() ChannelCache {
463+
return db.channelCache
464+
}
465+
462466
// Test-only version of GetPrincipal that doesn't trigger channel/role recalculation
463467
func (dbc *DatabaseContext) GetPrincipalForTest(tb testing.TB, name string, isUser bool) (info *auth.PrincipalConfig, err error) {
464468
ctx := base.TestCtx(tb)

0 commit comments

Comments
 (0)