CBG-4605: dcp mode for caching tool #7483
delayIndex := 0
// vBucket creation logic
for i := 0; i < 1024; i++ {
	time.Sleep(500 * time.Millisecond) // we need a slight delay each iteration, otherwise many vBuckets end up writing at the same time
I don't know if this matters, but is this worth writing jitter into so that we get variable load, or is this just a throttle?
Good idea on the jitter; it could work as a future enhancement for the tool. The sleep is there to stop all vBucket goroutines from starting at the same time. I found that when every vBucket started together, the vBuckets with the same delays were allocating and writing sequences to the cache at the same time, so the run wasn't a realistic example of how the caching system would work. The jitter suggestion would work here and would arguably make the tool more user-friendly too, but for the purpose of evaluating fixes as they go into the cache, I think a more standardised approach like the sleep would work better.
I worry that with random jitter between 0 and N, one run could end up with higher delays and another with lower delays, giving different throughput results for the same configuration. That would make it hard to tell whether fixes going into the cache actually produce any throughput difference. I could be overthinking this, though.
I haven't really been paying as much attention to this, but I thought I'd give some readability comments.
	chanCount++
}
revInf := db.RevInfo{
	ID: "1-abc",
Is writing 1-abc fine for changing these tests, or should you use 2-def since this will be a subsequent revision?
It probably doesn't matter.
I agree I should be building out the rev tree properly when running in the mode that replicates many updates to a document (I have added this to the function mutateWithDedupe).
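As a rough sketch of what building out successive revisions could look like, assuming revision IDs of the form `<generation>-<digest>` as in the `1-abc` and `2-def` examples above. The helper `nextRevID` is hypothetical and is not the code added to mutateWithDedupe.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// nextRevID takes a parent revision ID such as "1-abc" and returns a child ID
// whose generation is one higher and whose digest is the supplied value.
// Hypothetical helper for illustration; the name and signature are assumptions.
func nextRevID(parent, digest string) (string, error) {
	genStr, _, ok := strings.Cut(parent, "-")
	if !ok {
		return "", fmt.Errorf("malformed revision ID %q", parent)
	}
	gen, err := strconv.Atoi(genStr)
	if err != nil || gen < 1 {
		return "", fmt.Errorf("malformed generation in revision ID %q", parent)
	}
	return fmt.Sprintf("%d-%s", gen+1, digest), nil
}

func main() {
	rev := "1-abc"
	// Simulate two further updates to the same document.
	for _, digest := range []string{"def", "ghi"} {
		next, err := nextRevID(rev, digest)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %s\n", rev, next)
		rev = next
	}
}
```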
	Cas: "0x000008cc2ee83118",
	ClusterUUID: "6a1a82a8ea79aa8b82d3f5667892d9ce",
	Crc32c: "0x615126c4",
If I understand correctly, these cas and crc32 values are invalid and will trigger an import. Is that intentional?
You could macro expand these values.
As for the CRC32 value, I don't think it matters for this purpose, so I am going to remove it along with the fake cluster UUID. I will pass the cas into the function and set it properly for correctness.
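A small sketch of formatting a numeric cas into the string form seen in the diff. This assumes the string is "0x" followed by the little-endian hex encoding of the 64-bit cas, which is a reading of the example value and is not stated in this thread. The helper name `casToHex` is hypothetical.

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// casToHex renders a 64-bit cas as "0x" followed by 16 hex digits in
// little-endian byte order. Assumption: this matches the string format used
// for the Cas field in the diff; the helper itself is illustrative only.
func casToHex(cas uint64) string {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], cas)
	return "0x" + hex.EncodeToString(buf[:])
}

func main() {
	// Under the little-endian assumption, this cas produces the value from the diff.
	fmt.Println(casToHex(0x1831e82ecc080000)) // prints 0x000008cc2ee83118
}
```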
base/dcp_client.go
Outdated
	dc.startWorkers(dc.ctx)
}

// NewDCPClientForTest creates a dcp client object for caching test purposes, does not actually create a client you can stream to. TEST ONLY.
What's the purpose of this separate constructor? If this is only to support a different number of vbuckets, then I'd just create a newDCPClientWithVbucketCount() and have all the common data there.
I'm not sure that this code is going to be sensitive to the number of vbuckets, but presumably it is to the number of workers?
The idea of having this separate constructor was to avoid the call to GetMaxVbno inside NewDCPClient. Given this test isn't using a real bucket on the server, this call would fail, so I needed to be able to construct the DCP client without it being connected to the server in any way.
I thought about passing a flag into NewDCPClient to get around this, but decided against changing functions like that just for a test tool.
I was just thinking of making the extra option to the constructor private:
// NewDCPClient constructs a DCPClient which returns DCP events using the callback function.
func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket) (*DCPClient, error) {
	numVbuckets, err := bucket.GetMaxVbno()
	if err != nil {
		return nil, fmt.Errorf("Unable to determine maxVbNo when creating DCP client: %w", err)
	}
	return newDCPClientWithvBucketCount(ctx, ID, callback, options, bucket, numVbuckets)
}

func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) {
	return newDCPClientWithvBucketCount(ctx, ID, callback, options, bucket, numVbuckets)
}

// newDCPClientWithvBucketCount constructs a DCPClient simulating the number of vbuckets. See NewDCPClient for usage.
func newDCPClientWithvBucketCount(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) {
	numWorkers := DefaultNumWorkers
	if options.NumWorkers > 0 {
		numWorkers = options.NumWorkers
	}
	if options.AgentPriority == gocbcore.DcpAgentPriorityHigh {
		return nil, fmt.Errorf("sync gateway should not set high priority for DCP feeds")
	}
	if options.CheckpointPrefix == "" {
		if options.MetadataStoreType == DCPMetadataStoreCS {
			return nil, fmt.Errorf("callers must specify a checkpoint prefix when persisting metadata")
		}
	}
	client := &DCPClient{
		ctx:                 ctx,
		workers:             make([]*DCPWorker, numWorkers),
		numVbuckets:         numVbuckets,
		callback:            callback,
		ID:                  ID,
		spec:                bucket.GetSpec(),
		supportsCollections: bucket.IsSupported(sgbucket.BucketStoreFeatureCollections),
		terminator:          make(chan bool),
		doneChannel:         make(chan error, 1),
		failOnRollback:      options.FailOnRollback,
		checkpointPrefix:    options.CheckpointPrefix,
		dbStats:             options.DbStats,
		agentPriority:       options.AgentPriority,
		collectionIDs:       options.CollectionIDs,
	}
	// Initialize active vbuckets
	client.activeVbuckets = make(map[uint16]struct{})
	for vbNo := uint16(0); vbNo < numVbuckets; vbNo++ {
		client.activeVbuckets[vbNo] = struct{}{}
	}
	checkpointPrefix := fmt.Sprintf("%s:%v", client.checkpointPrefix, ID)
	switch options.MetadataStoreType {
	case DCPMetadataStoreCS:
		// TODO: Change GetSingleDataStore to a metadata Store?
		metadataStore := bucket.DefaultDataStore()
		client.metadata = NewDCPMetadataCS(ctx, metadataStore, numVbuckets, numWorkers, checkpointPrefix)
	case DCPMetadataStoreInMemory:
		client.metadata = NewDCPMetadataMem(numVbuckets)
	default:
		return nil, fmt.Errorf("Unknown Metadatatype: %d", options.MetadataStoreType)
	}
	if options.InitialMetadata != nil {
		for vbID, meta := range options.InitialMetadata {
			client.metadata.SetMeta(uint16(vbID), meta)
		}
	}
	if len(client.collectionIDs) == 0 {
		client.collectionIDs = []uint32{DefaultCollectionID}
	}
	client.oneShot = options.OneShot
	return client, nil
}
I actually feel more strongly about a refactor of this type, since the test function omits the ability to tune the number of workers and so has already deviated from the behavior of the non-test function. We aren't tuning this number, but it can also be a source of bottlenecks and something that could be tested.
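To make the shape of that refactor concrete outside the Sync Gateway codebase, here is a stripped-down, self-contained sketch. All types and names in it (`client`, `options`, `vbucketSource`, `defaultNumWorkers`, and the three constructors) are hypothetical stand-ins, not the real DCP client API. It only shows that the public and test constructors share one private constructor, so options such as the worker count behave the same on both paths.

```go
package main

import (
	"errors"
	"fmt"
)

// defaultNumWorkers is a hypothetical default, chosen only for this sketch.
const defaultNumWorkers = 8

type options struct{ NumWorkers int }

type client struct {
	numVbuckets uint16
	numWorkers  int
}

// vbucketSource stands in for the bucket; only the vBucket lookup matters here.
type vbucketSource interface {
	GetMaxVbno() (uint16, error)
}

// newClient is the production path: it asks the bucket for the vBucket count.
func newClient(src vbucketSource, opts options) (*client, error) {
	n, err := src.GetMaxVbno()
	if err != nil {
		return nil, fmt.Errorf("unable to determine maxVbNo: %w", err)
	}
	return newClientWithVbucketCount(opts, n)
}

// newClientForTest skips the server lookup and takes the count directly.
func newClientForTest(opts options, numVbuckets uint16) (*client, error) {
	return newClientWithVbucketCount(opts, numVbuckets)
}

// newClientWithVbucketCount holds all shared setup, including worker tuning.
func newClientWithVbucketCount(opts options, numVbuckets uint16) (*client, error) {
	if numVbuckets == 0 {
		return nil, errors.New("numVbuckets must be greater than zero")
	}
	numWorkers := defaultNumWorkers
	if opts.NumWorkers > 0 {
		numWorkers = opts.NumWorkers
	}
	return &client{numVbuckets: numVbuckets, numWorkers: numWorkers}, nil
}

// offlineBucket simulates a bucket with no server behind it.
type offlineBucket struct{}

func (offlineBucket) GetMaxVbno() (uint16, error) { return 0, errors.New("no server") }

func main() {
	if _, err := newClient(offlineBucket{}, options{}); err != nil {
		fmt.Println("production path:", err)
	}
	c, err := newClientForTest(options{NumWorkers: 4}, 64)
	if err != nil {
		panic(err)
	}
	fmt.Println("test path:", c.numVbuckets, "vbuckets,", c.numWorkers, "workers")
}
```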
Ah, I see now. I will push an update to carry this out.
CBG-4605
Pre-review checklist
- (fmt.Print, log.Print, ...)
- (base.UD(docID), base.MD(dbName))
- docs/api
Integration Tests
GSI=true,xattrs=true
https://jenkins.sgwdev.com/job/SyncGateway-Integration/000/