CBG-4605: dcp mode for caching tool #7483

Open · wants to merge 5 commits into base: main
Changes from 1 commit
12 changes: 12 additions & 0 deletions db/util_testing.go
@@ -104,6 +104,12 @@ func (db *DatabaseContext) CallProcessEntry(t *testing.T, ctx context.Context, l
db.changeCache.processEntry(ctx, log)
}

// GetCachedChanges will grab cached changes from the channel cache for the caching tool; not to be used outside test code.
func (db *DatabaseContext) GetCachedChanges(t *testing.T, ctx context.Context, chanID channels.ID) ([]*LogEntry, error) {
logs, err := db.changeCache.getChannelCache().GetCachedChanges(ctx, chanID)
return logs, err
}

func (db *DatabaseContext) NewDCPCachingCountWaiter(tb testing.TB) *StatWaiter {
return db.NewStatWaiter(db.DbStats.Database().DCPCachingCount, tb)
}
@@ -774,3 +780,9 @@ func GetIndexPartitionCount(t *testing.T, bucket *base.GocbV2Bucket, dsName sgbu
func (db *DatabaseContext) GetMutationListener(t *testing.T) changeListener {
return db.mutationListener
}

// InitChannel is a test-only function to initialize a channel in the channel cache.
func (db *DatabaseContext) InitChannel(ctx context.Context, t *testing.T, chanName string) error {
_, err := db.channelCache.getSingleChannelCache(ctx, channels.NewID(chanName, base.DefaultCollectionID))
return err
}
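
For orientation, a minimal sketch of how the caching tool's harness might exercise these two new test helpers; the surrounding test setup and the channel name are illustrative assumptions, not part of this diff:

```go
// Hypothetical read-back check (illustrative only): initialise a channel in the
// channel cache, then ask the cache what it currently holds for that channel.
func checkCachedChanges(t *testing.T, ctx context.Context, dbCtx *db.DatabaseContext) {
	if err := dbCtx.InitChannel(ctx, t, "test-0"); err != nil {
		t.Fatalf("failed to init channel: %v", err)
	}
	chanID := channels.NewID("test-0", base.DefaultCollectionID)
	entries, err := dbCtx.GetCachedChanges(t, ctx, chanID)
	if err != nil {
		t.Fatalf("failed to get cached changes: %v", err)
	}
	t.Logf("cached entries for %v: %d", chanID, len(entries))
}
```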
168 changes: 116 additions & 52 deletions tools/cache_perf_tool/dcpDataGeneration.go
@@ -28,41 +28,51 @@ import (
var hlc = rosmar.NewHybridLogicalClock(0)

type dcpDataGen struct {
seqAlloc *sequenceAllocator
delays []time.Duration
dbCtx *db.DatabaseContext
client *base.DCPClient
numChannels int
seqAlloc *sequenceAllocator
delays []time.Duration
dbCtx *db.DatabaseContext
client *base.DCPClient
numChannelsPerDoc int
numTotalChannels int
simRapidUpdate bool
}

func (dcp *dcpDataGen) vBucketCreation(ctx context.Context) {
// setup the sync data for test run
testSync := dcp.setupSyncDataForTestRun()
delayIndex := 0
// vBucket creation logic
for i := 0; i < 1024; i++ {
time.Sleep(500 * time.Millisecond) // we need a slight delay each iteration, otherwise many vBuckets end up writing at the same time
Collaborator:
I don't know if this matters; is this something that is worth writing a jitter into so we get variable load, or is this just a throttle?

Contributor Author:
Good idea for a jitter. It may work as a future enhancement for the tool. The idea of this sleep is to stop all vBucket goroutines starting at the same time. I found that if all vBuckets started at the same time, the vBuckets with the same delays were allocating and writing sequences to the cache at the same time, so it wasn't giving a real-world example of how the caching system would work. The jitter suggestion would work and would arguably make the tool more user friendly too, but for the purpose of evaluating fixes as they go into the cache, I think a more standardised approach like the sleep works better.

I worry that with random jitter between 0 and N you could end up with one run having higher delays and another having lower delays, and they could give different throughput results for the same configuration, making it hard to evaluate whether fixes going into the cache are actually producing any throughput difference. I could be overthinking this, though.
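
For reference, a rough sketch of what the jitter variant discussed above could look like if it were ever added; the jitter bound and the use of math/rand are illustrative assumptions, not something this PR implements:

```go
// Illustrative only: stagger vBucket goroutine start-up with a bounded random
// jitter on top of the fixed base delay (bounds chosen arbitrarily). The
// sync-seq hot vBucket special case is omitted for brevity.
func (dcp *dcpDataGen) vBucketCreationWithJitter(ctx context.Context) {
	const baseDelay = 500 * time.Millisecond
	const maxJitter = 200 * time.Millisecond
	delayIndex := 0
	for i := 0; i < 1024; i++ {
		time.Sleep(baseDelay + time.Duration(rand.Int63n(int64(maxJitter)))) // jitter in [0, maxJitter)
		if delayIndex == len(dcp.delays) {
			delayIndex = 0 // reset index so we don't go out of bounds
		}
		go dcp.vBucketGoroutine(ctx, uint16(i), dcp.delays[delayIndex])
		delayIndex++
	}
}
```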

if i == 520 {
go dcp.syncSeqVBucketCreation(ctx, uint16(i), 2*time.Second, testSync) // sync seq hot vBucket so high delay
go dcp.syncSeqVBucketCreation(ctx, uint16(i), 2*time.Second) // sync seq hot vBucket so high delay
} else {
// iterate through provided delays and assign one to each vBucket; when we reach the end of the delay list, reset the index and
// start from the beginning again. This ensures some consistency between runs with the same parameters
if delayIndex == len(dcp.delays) {
delayIndex = 0 // reset index so we don't go out of bounds
}
go dcp.vBucketGoroutine(ctx, uint16(i), dcp.delays[delayIndex], testSync)
go dcp.vBucketGoroutine(ctx, uint16(i), dcp.delays[delayIndex])
delayIndex++
}
}
}

func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay time.Duration, syncData db.SyncData) {
func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay time.Duration) {
numGoroutines.Add(1)
defer numGoroutines.Add(-1)
vbSeq := uint64(0)
chanCount := 0
var err error
var newArr []byte
var seqList []uint64
if delay.Nanoseconds() == 0 {
// mutate as fast as possible
for {
sgwSeqno := dcp.seqAlloc.nextSeq()
var sgwSeqno uint64
if dcp.simRapidUpdate && vbSeq%2 == 0 { // simulate rapid update on subset of vBuckets if enabled
seqList = dcp.seqAlloc.nextNSequences(5)
} else {
sgwSeqno = dcp.seqAlloc.nextSeq()
}
select {
case <-ctx.Done():
return
@@ -80,7 +90,11 @@ func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay
Key: []byte("key-" + strconv.FormatUint(vbSeq, 10) + "-" + strconv.FormatUint(sgwSeqno, 10)),
}

newArr, err := mutateSyncData(sgwSeqno, syncData)
if sgwSeqno == 0 {
newArr, chanCount, err = dcp.mutateWithDedupe(seqList, chanCount)
} else {
newArr, chanCount, err = dcp.mutateSyncData(sgwSeqno, chanCount)
}
if err != nil {
log.Printf("Error setting sequence: %v", err)
return
@@ -95,7 +109,13 @@ func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay
defer ticker.Stop()
// we have goroutine with a write delay associated with it
for {
sgwSeqno := dcp.seqAlloc.nextSeq() // allocate seq before wait on ticker
var sgwSeqno uint64
// allocate seq before wait on ticker
if dcp.simRapidUpdate && vbSeq%2 == 0 { // simulate rapid update on subset of vBuckets if enabled
seqList = dcp.seqAlloc.nextNSequences(5)
} else {
sgwSeqno = dcp.seqAlloc.nextSeq()
}
select {
case <-ctx.Done():
return
@@ -113,7 +133,11 @@ func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay
Key: []byte("key-" + strconv.FormatUint(vbSeq, 10) + "-" + strconv.FormatUint(sgwSeqno, 10)),
}

newArr, err := mutateSyncData(sgwSeqno, syncData)
if sgwSeqno == 0 {
newArr, chanCount, err = dcp.mutateWithDedupe(seqList, chanCount)
} else {
newArr, chanCount, err = dcp.mutateSyncData(sgwSeqno, chanCount)
}
if err != nil {
log.Printf("Error setting sequence: %v", err)
return
@@ -124,12 +148,15 @@ func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay
}
}

func (dcp *dcpDataGen) syncSeqVBucketCreation(ctx context.Context, vbNo uint16, delay time.Duration, syncData db.SyncData) {
func (dcp *dcpDataGen) syncSeqVBucketCreation(ctx context.Context, vbNo uint16, delay time.Duration) {
numGoroutines.Add(1)
defer numGoroutines.Add(-1)
ticker := time.NewTicker(delay)
defer ticker.Stop()
vbSeq := uint64(0)
chanCount := 0
var err error
var newArr []byte
go func() {
numGoroutines.Add(1)
defer numGoroutines.Add(-1)
@@ -178,7 +205,7 @@ func (dcp *dcpDataGen) syncSeqVBucketCreation(ctx context.Context, vbNo uint16,
Key: []byte("key-" + strconv.FormatUint(vbSeq, 10) + "-" + strconv.FormatUint(sgwSeqno, 10)),
}

newArr, err := mutateSyncData(sgwSeqno, syncData)
newArr, chanCount, err = dcp.mutateSyncData(sgwSeqno, chanCount)
if err != nil {
log.Printf("Error setting sequence: %v", err)
return
@@ -189,34 +216,74 @@ func (dcp *dcpDataGen) syncSeqVBucketCreation(ctx context.Context, vbNo uint16,
}
}

func updateSyncData(seq uint64, syncData db.SyncData) ([]byte, error) {
syncData.Sequence = seq
syncData.RecentSequences = []uint64{seq}
// update seq info on channel set map
for _, v := range syncData.ChannelSet {
v.Start = seq
func (dcp *dcpDataGen) mutateSyncData(sgwSeqno uint64, chanCount int) ([]byte, int, error) {
chanMap := make(channels.ChannelMap)
chanSet := make([]db.ChannelSetEntry, 0, dcp.numChannelsPerDoc)
chanSetMap := base.Set{}
for i := 0; i < dcp.numChannelsPerDoc; i++ {
if chanCount == dcp.numTotalChannels {
chanCount = 0 // reset channel count so we don't go out of bounds
}
chanName := "test-" + strconv.Itoa(chanCount)
chanMap[chanName] = nil
chanSet = append(chanSet, db.ChannelSetEntry{
Name: chanName,
Start: sgwSeqno,
})
chanSetMap[chanName] = struct{}{}
chanCount++
}
revInf := db.RevInfo{
ID: "1-abc",
Collaborator:
Is writing 1-abc fine for these tests, or should you use 2-def since this will be a subsequent revision? It probably doesn't matter.

Contributor Author:
I agree; I should be building out the rev tree properly when running in the mode that replicates many updates to a document (I have added this to the mutateWithDedupe function).
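
For illustration only, a two-revision tree for the rapid-update path might be shaped like the following; the mutateWithDedupe changes are not fully visible in this diff, so the field names and revision IDs here are assumptions rather than the author's code:

```go
// Illustrative sketch (not the author's code): build a rev tree where 2-def is
// a child of 1-abc, assuming db.RevInfo exposes ID, Parent and Channels fields.
func buildTwoRevTree(chanSetMap base.Set) (db.RevTree, string) {
	rev1 := db.RevInfo{ID: "1-abc", Channels: chanSetMap}
	rev2 := db.RevInfo{ID: "2-def", Parent: "1-abc", Channels: chanSetMap}
	tree := db.RevTree{
		"1-abc": &rev1,
		"2-def": &rev2,
	}
	return tree, "2-def" // CurrentRev should point at the latest revision in the tree
}
```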

Channels: chanSetMap,
}
revTree := db.RevTree{
"1-abc": &revInf,
}

syncData := db.SyncData{
Sequence: sgwSeqno,
CurrentRev: "1-abc",
History: revTree,
Channels: chanMap,
ChannelSet: chanSet,
TimeSaved: time.Now(),
Cas: "0x000008cc2ee83118",
ClusterUUID: "6a1a82a8ea79aa8b82d3f5667892d9ce",
Crc32c: "0x615126c4",
Collaborator:
Are you intentionally setting cas and crc32 to invalid values that will, if I understand correctly, trigger an import? You could macro expand these values.

Contributor Author:
With regards to the CRC32 value, I don't think it matters for this purpose, so I'm going to remove it along with the fake cluster UUID. The cas I will pass into the function and set properly for correctness.

RecentSequences: []uint64{sgwSeqno},
}
byteArrSync, err := json.Marshal(syncData)
if err != nil {
log.Printf("Error marshalling sync data: %v", err)
return nil, err
return nil, 0, err
}

return byteArrSync, nil
inp := sgbucket.Xattr{
Name: "_sync",
Value: byteArrSync,
}
encodedVal := sgbucket.EncodeValueWithXattrs([]byte(`{"some":"body"}`), inp)
return encodedVal, chanCount, nil
}

func (dcp *dcpDataGen) setupSyncDataForTestRun() db.SyncData {
func (dcp *dcpDataGen) mutateWithDedupe(seqs []uint64, chanCount int) ([]byte, int, error) {
chanMap := make(channels.ChannelMap)
chanSet := make([]db.ChannelSetEntry, 0, dcp.numChannels)
chanSet := make([]db.ChannelSetEntry, 0, dcp.numChannelsPerDoc)
currSeq := seqs[len(seqs)-1] // grab current seq from end of seq list
chanSetMap := base.Set{}
for i := 0; i < dcp.numChannels; i++ {
numChan := strconv.Itoa(i)
chanMap["test-"+numChan] = nil
for i := 0; i < dcp.numChannelsPerDoc; i++ {
if chanCount == dcp.numTotalChannels {
chanCount = 0 // reset channel count so we don't go out of bounds
}
chanName := "test-" + strconv.Itoa(chanCount)
chanMap[chanName] = nil
chanSet = append(chanSet, db.ChannelSetEntry{
Name: "test-" + numChan,
Name: chanName,
Start: currSeq,
})
chanSetMap["test-"+numChan] = struct{}{}
chanSetMap[chanName] = struct{}{}
chanCount++
}
revInf := db.RevInfo{
ID: "1-abc",
@@ -226,33 +293,30 @@ func (dcp *dcpDataGen) setupSyncDataForTestRun() db.SyncData {
"1-abc": &revInf,
}

// return some sync data for the test, with channel info according to test parameters (sequence info added later)
// This will make generation of sync data more efficient; instead of generating the whole sync data object we just
// update sequence information on each mutation
return db.SyncData{
Sequence: 0,
CurrentRev: "1-abc",
History: revTree,
Channels: chanMap,
ChannelSet: chanSet,
TimeSaved: time.Now(),
Cas: "0x000008cc2ee83118",
ClusterUUID: "6a1a82a8ea79aa8b82d3f5667892d9ce",
Crc32c: "0x615126c4",
syncData := db.SyncData{
Sequence: currSeq,
CurrentRev: "1-abc",
History: revTree,
Channels: chanMap,
ChannelSet: chanSet,
TimeSaved: time.Now(),
Cas: "0x000008cc2ee83118",
ClusterUUID: "6a1a82a8ea79aa8b82d3f5667892d9ce",
Crc32c: "0x615126c4",
RecentSequences: seqs,
}
}

func mutateSyncData(sgwSeqno uint64, syncData db.SyncData) ([]byte, error) {
newArr, err := updateSyncData(sgwSeqno, syncData)
byteArrSync, err := json.Marshal(syncData)
if err != nil {
return nil, err
log.Printf("Error marshalling sync data: %v", err)
return nil, 0, err
}

inp := sgbucket.Xattr{
Name: "_sync",
Value: newArr,
Value: byteArrSync,
}
encodedVal := sgbucket.EncodeValueWithXattrs([]byte(`{"some":"body"}`), inp)
return encodedVal, nil
return encodedVal, chanCount, nil
}

func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucket, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) (*base.DCPClient, error) {
@@ -264,7 +328,7 @@ func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucke
AgentPriority: gocbcore.DcpAgentPriorityMed,
CheckpointPrefix: "",
}

// fake client that we want to hook into
client, err := base.NewDCPClientForTest(ctx, t, "test", callback, options, bucket, 1024)
if err != nil {
return nil, err