Skip to content

Commit 6a585e9

Browse files
gregns1bbrks
authored andcommitted
CBG-3356: Add current version to ChangeEntry (#6575)
* CBG-3356: add CV to change entry, test that it corretcly populates when calling for changes. Tests need to activate a channel cache as backfill for channel cache not yet implemented * updates to fix failing tests. Added cv to version type returned by Putting a doc and deleting a doc to make testing easier * minor changes * updates after rebase * fix for test failure * updates from rebase * updated comment * changes in response to commmets * updates to fix test failures * rebase + lint skip * updates to update the doc id changes test I have to actually test the codepath
1 parent 1648a4e commit 6a585e9

File tree

6 files changed

+145
-26
lines changed

6 files changed

+145
-26
lines changed

db/changes.go

+26-13
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,20 @@ type ChangesOptions struct {
4444
// A changes entry; Database.GetChanges returns an array of these.
4545
// Marshals into the standard CouchDB _changes format.
4646
type ChangeEntry struct {
47-
Seq SequenceID `json:"seq"`
48-
ID string `json:"id"`
49-
Deleted bool `json:"deleted,omitempty"`
50-
Removed base.Set `json:"removed,omitempty"`
51-
Doc json.RawMessage `json:"doc,omitempty"`
52-
Changes []ChangeRev `json:"changes"`
53-
Err error `json:"err,omitempty"` // Used to notify feed consumer of errors
54-
allRemoved bool // Flag to track whether an entry is a removal in all channels visible to the user.
55-
branched bool
56-
backfill backfillFlag // Flag used to identify non-client entries used for backfill synchronization (di only)
57-
principalDoc bool // Used to indicate _user/_role docs
58-
Revoked bool `json:"revoked,omitempty"`
59-
collectionID uint32
47+
Seq SequenceID `json:"seq"`
48+
ID string `json:"id"`
49+
Deleted bool `json:"deleted,omitempty"`
50+
Removed base.Set `json:"removed,omitempty"`
51+
Doc json.RawMessage `json:"doc,omitempty"`
52+
Changes []ChangeRev `json:"changes"`
53+
Err error `json:"err,omitempty"` // Used to notify feed consumer of errors
54+
allRemoved bool // Flag to track whether an entry is a removal in all channels visible to the user.
55+
branched bool
56+
backfill backfillFlag // Flag used to identify non-client entries used for backfill synchronization (di only)
57+
principalDoc bool // Used to indicate _user/_role docs
58+
Revoked bool `json:"revoked,omitempty"`
59+
collectionID uint32
60+
CurrentVersion *SourceAndVersion `json:"current_version,omitempty"` // the current version of the change entry
6061
}
6162

6263
const (
@@ -481,6 +482,12 @@ func makeChangeEntry(logEntry *LogEntry, seqID SequenceID, channel channels.ID)
481482
principalDoc: logEntry.IsPrincipal,
482483
collectionID: logEntry.CollectionID,
483484
}
485+
// populate CurrentVersion entry if log entry has sourceID and Version populated
486+
// This allows current version to be nil in event of CV not being populated on log entry
487+
// allowing omitempty to work as expected
488+
if logEntry.SourceID != "" && logEntry.Version != 0 {
489+
change.CurrentVersion = &SourceAndVersion{SourceID: logEntry.SourceID, Version: logEntry.Version}
490+
}
484491
if logEntry.Flags&channels.Removed != 0 {
485492
change.Removed = base.SetOf(channel.Name)
486493
}
@@ -1281,6 +1288,12 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio
12811288
row.Seq = SequenceID{Seq: populatedDoc.Sequence}
12821289
row.SetBranched((populatedDoc.Flags & channels.Branched) != 0)
12831290

1291+
if populatedDoc.HLV != nil {
1292+
cv := SourceAndVersion{}
1293+
cv.SourceID, cv.Version = populatedDoc.HLV.GetCurrentVersion()
1294+
row.CurrentVersion = &cv
1295+
}
1296+
12841297
var removedChannels []string
12851298

12861299
userCanSeeDocChannel := false

db/changes_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,39 @@ func TestDocDeletionFromChannelCoalescedRemoved(t *testing.T) {
290290
printChanges(changes)
291291
}
292292

293+
func TestCVPopulationOnChangeEntry(t *testing.T) {
294+
db, ctx := setupTestDB(t)
295+
defer db.Close(ctx)
296+
collection := GetSingleDatabaseCollectionWithUser(t, db)
297+
collectionID := collection.GetCollectionID()
298+
bucketUUID := db.BucketUUID
299+
300+
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
301+
302+
authenticator := db.Authenticator(base.TestCtx(t))
303+
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
304+
require.NoError(t, err)
305+
require.NoError(t, authenticator.Save(user))
306+
307+
collection.user, _ = authenticator.GetUser("alice")
308+
309+
// Make channel active
310+
_, err = db.channelCache.GetChanges(ctx, channels.NewID("A", collectionID), getChangesOptionsWithZeroSeq(t))
311+
require.NoError(t, err)
312+
313+
_, doc, err := collection.Put(ctx, "doc1", Body{"channels": []string{"A"}})
314+
require.NoError(t, err)
315+
316+
require.NoError(t, collection.WaitForPendingChanges(base.TestCtx(t)))
317+
318+
changes, err := collection.GetChanges(ctx, base.SetOf("A"), getChangesOptionsWithZeroSeq(t))
319+
require.NoError(t, err)
320+
321+
assert.Equal(t, doc.ID, changes[0].ID)
322+
assert.Equal(t, bucketUUID, changes[0].CurrentVersion.SourceID)
323+
assert.Equal(t, doc.Cas, changes[0].CurrentVersion.Version)
324+
}
325+
293326
func TestDocDeletionFromChannelCoalesced(t *testing.T) {
294327
if base.TestUseXattrs() {
295328
t.Skip("This test is known to be failing against couchbase server with XATTRS enabled. Same error as TestDocDeletionFromChannelCoalescedRemoved")

db/database_test.go

+16-10
Original file line numberDiff line numberDiff line change
@@ -1054,6 +1054,7 @@ func TestConflicts(t *testing.T) {
10541054
db, ctx := setupTestDB(t)
10551055
defer db.Close(ctx)
10561056
collection := GetSingleDatabaseCollectionWithUser(t, db)
1057+
bucketUUID := db.BucketUUID
10571058

10581059
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
10591060

@@ -1125,15 +1126,19 @@ func TestConflicts(t *testing.T) {
11251126
Conflicts: true,
11261127
ChangesCtx: base.TestCtx(t),
11271128
}
1129+
fetchedDoc, _, err := collection.GetDocWithXattr(ctx, "doc", DocUnmarshalCAS)
1130+
require.NoError(t, err)
1131+
11281132
changes, err := collection.GetChanges(ctx, channels.BaseSetOf(t, "all"), options)
11291133
assert.NoError(t, err, "Couldn't GetChanges")
11301134
assert.Equal(t, 1, len(changes))
11311135
assert.Equal(t, &ChangeEntry{
1132-
Seq: SequenceID{Seq: 3},
1133-
ID: "doc",
1134-
Changes: []ChangeRev{{"rev": "2-b"}, {"rev": "2-a"}},
1135-
branched: true,
1136-
collectionID: collectionID,
1136+
Seq: SequenceID{Seq: 3},
1137+
ID: "doc",
1138+
Changes: []ChangeRev{{"rev": "2-b"}, {"rev": "2-a"}},
1139+
branched: true,
1140+
collectionID: collectionID,
1141+
CurrentVersion: &SourceAndVersion{SourceID: bucketUUID, Version: fetchedDoc.Cas},
11371142
}, changes[0],
11381143
)
11391144

@@ -1164,11 +1169,12 @@ func TestConflicts(t *testing.T) {
11641169
assert.NoError(t, err, "Couldn't GetChanges")
11651170
assert.Equal(t, 1, len(changes))
11661171
assert.Equal(t, &ChangeEntry{
1167-
Seq: SequenceID{Seq: 4},
1168-
ID: "doc",
1169-
Changes: []ChangeRev{{"rev": "2-a"}, {"rev": rev3}},
1170-
branched: true,
1171-
collectionID: collectionID,
1172+
Seq: SequenceID{Seq: 4},
1173+
ID: "doc",
1174+
Changes: []ChangeRev{{"rev": "2-a"}, {"rev": rev3}},
1175+
branched: true,
1176+
collectionID: collectionID,
1177+
CurrentVersion: &SourceAndVersion{SourceID: bucketUUID, Version: doc.Cas},
11721178
}, changes[0])
11731179

11741180
}

db/hybrid_logical_vector.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ type HybridLogicalVector struct {
3030

3131
// SourceAndVersion is a structure used to add a new entry to a HLV
3232
type SourceAndVersion struct {
33-
SourceID string
34-
Version uint64
33+
SourceID string `json:"source_id"`
34+
Version uint64 `json:"version"`
3535
}
3636

3737
func CreateVersion(source string, version uint64) SourceAndVersion {

rest/changes_test.go

+62
Original file line numberDiff line numberDiff line change
@@ -273,3 +273,65 @@ func TestWebhookWinningRevChangedEvent(t *testing.T) {
273273
assert.Equal(t, 5, int(atomic.LoadUint32(&WinningRevChangedCount)))
274274
assert.Equal(t, 6, int(atomic.LoadUint32(&DocumentChangedCount)))
275275
}
276+
277+
func TestCVPopulationOnChangesViaAPI(t *testing.T) {
278+
rtConfig := RestTesterConfig{
279+
SyncFn: `function(doc) {channel(doc.channels)}`,
280+
}
281+
rt := NewRestTester(t, &rtConfig)
282+
defer rt.Close()
283+
ctx := base.TestCtx(t)
284+
collection := rt.GetSingleTestDatabaseCollection()
285+
bucketUUID := rt.GetDatabase().BucketUUID
286+
const DocID = "doc1"
287+
288+
// activate channel cache
289+
_, err := rt.WaitForChanges(0, "/{{.keyspace}}/_changes", "", true)
290+
require.NoError(t, err)
291+
292+
resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/"+DocID, `{"channels": ["ABC"]}`)
293+
RequireStatus(t, resp, http.StatusCreated)
294+
295+
require.NoError(t, collection.WaitForPendingChanges(base.TestCtx(t)))
296+
297+
changes, err := rt.WaitForChanges(1, "/{{.keyspace}}/_changes", "", true)
298+
require.NoError(t, err)
299+
300+
fetchedDoc, _, err := collection.GetDocWithXattr(ctx, DocID, db.DocUnmarshalCAS)
301+
require.NoError(t, err)
302+
303+
assert.Equal(t, "doc1", changes.Results[0].ID)
304+
assert.Equal(t, bucketUUID, changes.Results[0].CurrentVersion.SourceID)
305+
assert.Equal(t, fetchedDoc.Cas, changes.Results[0].CurrentVersion.Version)
306+
}
307+
308+
func TestCVPopulationOnDocIDChanges(t *testing.T) {
309+
rtConfig := RestTesterConfig{
310+
SyncFn: `function(doc) {channel(doc.channels)}`,
311+
}
312+
rt := NewRestTester(t, &rtConfig)
313+
defer rt.Close()
314+
ctx := base.TestCtx(t)
315+
collection := rt.GetSingleTestDatabaseCollection()
316+
bucketUUID := rt.GetDatabase().BucketUUID
317+
const DocID = "doc1"
318+
319+
// activate channel cache
320+
_, err := rt.WaitForChanges(0, "/{{.keyspace}}/_changes", "", true)
321+
require.NoError(t, err)
322+
323+
resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/"+DocID, `{"channels": ["ABC"]}`)
324+
RequireStatus(t, resp, http.StatusCreated)
325+
326+
require.NoError(t, collection.WaitForPendingChanges(base.TestCtx(t)))
327+
328+
changes, err := rt.WaitForChanges(1, fmt.Sprintf(`/{{.keyspace}}/_changes?filter=_doc_ids&doc_ids=%s`, DocID), "", true)
329+
require.NoError(t, err)
330+
331+
fetchedDoc, _, err := collection.GetDocWithXattr(ctx, DocID, db.DocUnmarshalCAS)
332+
require.NoError(t, err)
333+
334+
assert.Equal(t, "doc1", changes.Results[0].ID)
335+
assert.Equal(t, bucketUUID, changes.Results[0].CurrentVersion.SourceID)
336+
assert.Equal(t, fetchedDoc.Cas, changes.Results[0].CurrentVersion.Version)
337+
}

rest/changestest/changes_api_test.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,7 @@ func TestChangesFromCompoundSinceViaDocGrant(t *testing.T) {
861861
}
862862
}`})
863863
defer rt.Close()
864+
collection := rt.GetSingleTestDatabaseCollection()
864865

865866
// Create user with access to channel NBC:
866867
ctx := rt.Context()
@@ -928,14 +929,18 @@ func TestChangesFromCompoundSinceViaDocGrant(t *testing.T) {
928929
// Write another doc
929930
_ = rt.PutDoc("mix-1", `{"channel":["ABC", "PBS", "HBO"]}`)
930931

932+
fetchedDoc, _, err := collection.GetDocWithXattr(ctx, "mix-1", db.DocUnmarshalSync)
933+
require.NoError(t, err)
934+
mixSource, mixVersion := fetchedDoc.HLV.GetCurrentVersion()
935+
931936
cacheWaiter.AddAndWait(1)
932937

933938
// Issue a changes request with a compound since value from the last changes response
934939
// ensure we don't backfill from the start, but have everything from the compound sequence onwards
935940
expectedResults = []string{
936941
`{"seq":"8:2","id":"hbo-1","changes":[{"rev":"1-46f8c67c004681619052ee1a1cc8e104"}]}`,
937942
`{"seq":8,"id":"grant-1","changes":[{"rev":"1-c5098bb14d12d647c901850ff6a6292a"}]}`,
938-
`{"seq":9,"id":"mix-1","changes":[{"rev":"1-32f69cdbf1772a8e064f15e928a18f85"}]}`,
943+
fmt.Sprintf(`{"seq":9,"id":"mix-1","changes":[{"rev":"1-32f69cdbf1772a8e064f15e928a18f85"}], "current_version":{"source_id": "%s", "version": %d}}`, mixSource, mixVersion),
939944
}
940945

941946
t.Run("grant via existing channel", func(t *testing.T) {

0 commit comments

Comments
 (0)