-
Notifications
You must be signed in to change notification settings - Fork 139
CBG-4605: dcp mode for caching tool #7483
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
86542a1
13603b1
5164c0b
f425a04
be9929a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,41 +28,51 @@ import ( | |
var hlc = rosmar.NewHybridLogicalClock(0) | ||
|
||
type dcpDataGen struct { | ||
seqAlloc *sequenceAllocator | ||
delays []time.Duration | ||
dbCtx *db.DatabaseContext | ||
client *base.DCPClient | ||
numChannels int | ||
seqAlloc *sequenceAllocator | ||
delays []time.Duration | ||
dbCtx *db.DatabaseContext | ||
client *base.DCPClient | ||
numChannelsPerDoc int | ||
numTotalChannels int | ||
simRapidUpdate bool | ||
} | ||
|
||
func (dcp *dcpDataGen) vBucketCreation(ctx context.Context) { | ||
// setup the sync data for test run | ||
testSync := dcp.setupSyncDataForTestRun() | ||
delayIndex := 0 | ||
// vBucket creation logic | ||
for i := 0; i < 1024; i++ { | ||
time.Sleep(500 * time.Millisecond) // we need a slight delay each iteration otherwise many vBuckets end up writing at the same times | ||
if i == 520 { | ||
go dcp.syncSeqVBucketCreation(ctx, uint16(i), 2*time.Second, testSync) // sync seq hot vBucket so high delay | ||
go dcp.syncSeqVBucketCreation(ctx, uint16(i), 2*time.Second) // sync seq hot vBucket so high delay | ||
} else { | ||
// iterate through provided delays and assign to vBucket, when we get to end of delay list reset index and | ||
// start from start again this will ensure some consistency between runs of the same parameters | ||
if delayIndex == len(dcp.delays) { | ||
delayIndex = 0 // reset index so we don't go out of bounds | ||
} | ||
go dcp.vBucketGoroutine(ctx, uint16(i), dcp.delays[delayIndex], testSync) | ||
go dcp.vBucketGoroutine(ctx, uint16(i), dcp.delays[delayIndex]) | ||
delayIndex++ | ||
} | ||
} | ||
} | ||
|
||
func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay time.Duration, syncData db.SyncData) { | ||
func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay time.Duration) { | ||
numGoroutines.Add(1) | ||
defer numGoroutines.Add(-1) | ||
vbSeq := uint64(0) | ||
chanCount := 0 | ||
var err error | ||
var newArr []byte | ||
var seqList []uint64 | ||
if delay.Nanoseconds() == 0 { | ||
// mutate as fast as possible | ||
for { | ||
sgwSeqno := dcp.seqAlloc.nextSeq() | ||
var sgwSeqno uint64 | ||
if dcp.simRapidUpdate && vbSeq%2 == 0 { // simulate rapid update on subset of vBuckets if enabled | ||
seqList = dcp.seqAlloc.nextNSequences(5) | ||
} else { | ||
sgwSeqno = dcp.seqAlloc.nextSeq() | ||
} | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
|
@@ -80,7 +90,11 @@ func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay | |
Key: []byte("key-" + strconv.FormatUint(vbSeq, 10) + "-" + strconv.FormatUint(sgwSeqno, 10)), | ||
} | ||
|
||
newArr, err := mutateSyncData(sgwSeqno, syncData) | ||
if sgwSeqno == 0 { | ||
newArr, chanCount, err = dcp.mutateWithDedupe(seqList, chanCount) | ||
} else { | ||
newArr, chanCount, err = dcp.mutateSyncData(sgwSeqno, chanCount) | ||
} | ||
if err != nil { | ||
log.Printf("Error setting sequence: %v", err) | ||
return | ||
|
@@ -95,7 +109,13 @@ func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay | |
defer ticker.Stop() | ||
// we have goroutine with a write delay associated with it | ||
for { | ||
sgwSeqno := dcp.seqAlloc.nextSeq() // allocate seq before wait on ticker | ||
var sgwSeqno uint64 | ||
// allocate seq before wait on ticker | ||
if dcp.simRapidUpdate && vbSeq%2 == 0 { // simulate rapid update on subset of vBuckets if enabled | ||
seqList = dcp.seqAlloc.nextNSequences(5) | ||
} else { | ||
sgwSeqno = dcp.seqAlloc.nextSeq() | ||
} | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
|
@@ -113,7 +133,11 @@ func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay | |
Key: []byte("key-" + strconv.FormatUint(vbSeq, 10) + "-" + strconv.FormatUint(sgwSeqno, 10)), | ||
} | ||
|
||
newArr, err := mutateSyncData(sgwSeqno, syncData) | ||
if sgwSeqno == 0 { | ||
newArr, chanCount, err = dcp.mutateWithDedupe(seqList, chanCount) | ||
} else { | ||
newArr, chanCount, err = dcp.mutateSyncData(sgwSeqno, chanCount) | ||
} | ||
if err != nil { | ||
log.Printf("Error setting sequence: %v", err) | ||
return | ||
|
@@ -124,12 +148,15 @@ func (dcp *dcpDataGen) vBucketGoroutine(ctx context.Context, vbNo uint16, delay | |
} | ||
} | ||
|
||
func (dcp *dcpDataGen) syncSeqVBucketCreation(ctx context.Context, vbNo uint16, delay time.Duration, syncData db.SyncData) { | ||
func (dcp *dcpDataGen) syncSeqVBucketCreation(ctx context.Context, vbNo uint16, delay time.Duration) { | ||
numGoroutines.Add(1) | ||
defer numGoroutines.Add(-1) | ||
ticker := time.NewTicker(delay) | ||
defer ticker.Stop() | ||
vbSeq := uint64(0) | ||
chanCount := 0 | ||
var err error | ||
var newArr []byte | ||
go func() { | ||
numGoroutines.Add(1) | ||
defer numGoroutines.Add(-1) | ||
|
@@ -178,7 +205,7 @@ func (dcp *dcpDataGen) syncSeqVBucketCreation(ctx context.Context, vbNo uint16, | |
Key: []byte("key-" + strconv.FormatUint(vbSeq, 10) + "-" + strconv.FormatUint(sgwSeqno, 10)), | ||
} | ||
|
||
newArr, err := mutateSyncData(sgwSeqno, syncData) | ||
newArr, chanCount, err = dcp.mutateSyncData(sgwSeqno, chanCount) | ||
if err != nil { | ||
log.Printf("Error setting sequence: %v", err) | ||
return | ||
|
@@ -189,34 +216,74 @@ func (dcp *dcpDataGen) syncSeqVBucketCreation(ctx context.Context, vbNo uint16, | |
} | ||
} | ||
|
||
func updateSyncData(seq uint64, syncData db.SyncData) ([]byte, error) { | ||
syncData.Sequence = seq | ||
syncData.RecentSequences = []uint64{seq} | ||
// update seq info on channel set map | ||
for _, v := range syncData.ChannelSet { | ||
v.Start = seq | ||
func (dcp *dcpDataGen) mutateSyncData(sgwSeqno uint64, chanCount int) ([]byte, int, error) { | ||
chanMap := make(channels.ChannelMap) | ||
chanSet := make([]db.ChannelSetEntry, 0, dcp.numChannelsPerDoc) | ||
chanSetMap := base.Set{} | ||
for i := 0; i < dcp.numChannelsPerDoc; i++ { | ||
if chanCount == dcp.numTotalChannels { | ||
chanCount = 0 // reset channel count so we don't go out of bounds | ||
} | ||
chanName := "test-" + strconv.Itoa(chanCount) | ||
chanMap[chanName] = nil | ||
chanSet = append(chanSet, db.ChannelSetEntry{ | ||
Name: chanName, | ||
Start: sgwSeqno, | ||
}) | ||
chanSetMap[chanName] = struct{}{} | ||
chanCount++ | ||
} | ||
revInf := db.RevInfo{ | ||
ID: "1-abc", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is writing 1-abc fine for changing these tests, or should you use 2-def since this will be a subsequent revision? It probably doesn't matter. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree I should be building out the rev tree properly when running in mode to replicate many updates to a document (I have added this to function |
||
Channels: chanSetMap, | ||
} | ||
revTree := db.RevTree{ | ||
"1-abc": &revInf, | ||
} | ||
|
||
syncData := db.SyncData{ | ||
Sequence: sgwSeqno, | ||
CurrentRev: "1-abc", | ||
History: revTree, | ||
Channels: chanMap, | ||
ChannelSet: chanSet, | ||
TimeSaved: time.Now(), | ||
Cas: "0x000008cc2ee83118", | ||
ClusterUUID: "6a1a82a8ea79aa8b82d3f5667892d9ce", | ||
Crc32c: "0x615126c4", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you intentionally setting cas and crc32 to invalid values that will trigger an import if I understand correctly? You could macro expand these values There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With regards to the CRC32 value, I don't think for this purpose it matters so going to remove it along with the fake cluster UUID. The cas I will pass into the function and properly set for correctness. |
||
RecentSequences: []uint64{sgwSeqno}, | ||
} | ||
byteArrSync, err := json.Marshal(syncData) | ||
if err != nil { | ||
log.Printf("Error marshalling sync data: %v", err) | ||
return nil, err | ||
return nil, 0, err | ||
} | ||
|
||
return byteArrSync, nil | ||
inp := sgbucket.Xattr{ | ||
Name: "_sync", | ||
Value: byteArrSync, | ||
} | ||
encodedVal := sgbucket.EncodeValueWithXattrs([]byte(`{"some":"body"}`), inp) | ||
return encodedVal, chanCount, nil | ||
} | ||
|
||
func (dcp *dcpDataGen) setupSyncDataForTestRun() db.SyncData { | ||
func (dcp *dcpDataGen) mutateWithDedupe(seqs []uint64, chanCount int) ([]byte, int, error) { | ||
chanMap := make(channels.ChannelMap) | ||
chanSet := make([]db.ChannelSetEntry, 0, dcp.numChannels) | ||
chanSet := make([]db.ChannelSetEntry, 0, dcp.numChannelsPerDoc) | ||
currSeq := seqs[len(seqs)-1] // grab current seq form end of seq list | ||
chanSetMap := base.Set{} | ||
for i := 0; i < dcp.numChannels; i++ { | ||
numChan := strconv.Itoa(i) | ||
chanMap["test-"+numChan] = nil | ||
for i := 0; i < dcp.numChannelsPerDoc; i++ { | ||
if chanCount == dcp.numTotalChannels { | ||
chanCount = 0 // reset channel count so we don't go out of bounds | ||
} | ||
chanName := "test-" + strconv.Itoa(chanCount) | ||
chanMap[chanName] = nil | ||
chanSet = append(chanSet, db.ChannelSetEntry{ | ||
Name: "test-" + numChan, | ||
Name: chanName, | ||
Start: currSeq, | ||
}) | ||
chanSetMap["test-"+numChan] = struct{}{} | ||
chanSetMap[chanName] = struct{}{} | ||
chanCount++ | ||
} | ||
revInf := db.RevInfo{ | ||
ID: "1-abc", | ||
|
@@ -226,33 +293,30 @@ func (dcp *dcpDataGen) setupSyncDataForTestRun() db.SyncData { | |
"1-abc": &revInf, | ||
} | ||
|
||
// return some sync data for the test, with channel info according to test parameters (sequence info added later) | ||
// This will make generation of syn data more efficient, instead of generating the whole sync data object we just | ||
// update sequence information on each mutation | ||
return db.SyncData{ | ||
Sequence: 0, | ||
CurrentRev: "1-abc", | ||
History: revTree, | ||
Channels: chanMap, | ||
ChannelSet: chanSet, | ||
TimeSaved: time.Now(), | ||
Cas: "0x000008cc2ee83118", | ||
ClusterUUID: "6a1a82a8ea79aa8b82d3f5667892d9ce", | ||
Crc32c: "0x615126c4", | ||
syncData := db.SyncData{ | ||
Sequence: currSeq, | ||
CurrentRev: "1-abc", | ||
History: revTree, | ||
Channels: chanMap, | ||
ChannelSet: chanSet, | ||
TimeSaved: time.Now(), | ||
Cas: "0x000008cc2ee83118", | ||
ClusterUUID: "6a1a82a8ea79aa8b82d3f5667892d9ce", | ||
Crc32c: "0x615126c4", | ||
RecentSequences: seqs, | ||
} | ||
} | ||
|
||
func mutateSyncData(sgwSeqno uint64, syncData db.SyncData) ([]byte, error) { | ||
newArr, err := updateSyncData(sgwSeqno, syncData) | ||
byteArrSync, err := json.Marshal(syncData) | ||
if err != nil { | ||
return nil, err | ||
log.Printf("Error marshalling sync data: %v", err) | ||
return nil, 0, err | ||
} | ||
|
||
inp := sgbucket.Xattr{ | ||
Name: "_sync", | ||
Value: newArr, | ||
Value: byteArrSync, | ||
} | ||
encodedVal := sgbucket.EncodeValueWithXattrs([]byte(`{"some":"body"}`), inp) | ||
return encodedVal, nil | ||
return encodedVal, chanCount, nil | ||
} | ||
|
||
func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucket, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) (*base.DCPClient, error) { | ||
|
@@ -264,7 +328,7 @@ func createDCPClient(t *testing.T, ctx context.Context, bucket *base.GocbV2Bucke | |
AgentPriority: gocbcore.DcpAgentPriorityMed, | ||
CheckpointPrefix: "", | ||
} | ||
|
||
// fake client that we want to hook into | ||
client, err := base.NewDCPClientForTest(ctx, t, "test", callback, options, bucket, 1024) | ||
if err != nil { | ||
return nil, err | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know this matters, is this something that is worth writing a jitter into so we get variable load, or is this just a throttle?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea for a jitter. May work for a future enhancement for the tool. The idea of this sleep here is to stop all vBucket goroutines starting at the same time, I found that if we were starting all vBuckets at the same time the vBuckets with the same delays were allocating and writing sequences to the cache at the same time, so it wasn’t giving a real world example of how the caching system would work. The jitter suggestion here would work and would arguably make the tool more user friendly too, but for the purpose of us evaluating fixes as they go into the cache I think having a more standardised approach like the sleep would work better.
I worry with random jitter between 0 and N you could end up with a run with higher delays and a run with lower delays and they could give different throughput results for the same configuration making it hard to evaluate if fixes as they go into the cache are actually producing any throughput difference. I could be overthinking this though