CBG-4605: dcp mode for caching tool #7483

Open · wants to merge 5 commits into main

Conversation

@gregns1 (Contributor) commented Apr 15, 2025

CBG-4605

  • A mode that runs 1024 goroutines to mock vBuckets, calling directly into the mutation handler on the DCP client, to generate load on the caching feed.

Pre-review checklist

  • Removed debug logging (fmt.Print, log.Print, ...)
  • Logging sensitive data? Make sure it's tagged (e.g. base.UD(docID), base.MD(dbName))
  • Updated relevant information in the API specifications (such as endpoint descriptions, schemas, ...) in docs/api

Integration Tests

@gregns1 self-assigned this Apr 15, 2025
delayIndex := 0
// vBucket creation logic
for i := 0; i < 1024; i++ {
time.Sleep(500 * time.Millisecond) // we need a slight delay each iteration, otherwise many vBuckets end up writing at the same time
Collaborator:

I don't know if this matters, but is this something worth writing a jitter into so we get variable load, or is this just a throttle?

Contributor Author:

Good idea on the jitter; it could work as a future enhancement for the tool. The idea of the sleep here is to stop all vBucket goroutines starting at the same time. I found that when all vBuckets started simultaneously, the vBuckets with the same delays were allocating and writing sequences to the cache at the same time, so it wasn't giving a real-world picture of how the caching system would behave. The jitter suggestion would work and would arguably make the tool more user-friendly too, but for the purpose of evaluating fixes as they go into the cache, I think a more standardised approach like the fixed sleep works better.

I worry that with random jitter between 0 and N you could end up with one run having higher delays and another having lower delays, and they could give different throughput results for the same configuration, making it hard to evaluate whether fixes going into the cache are actually producing any throughput difference. I could be overthinking this, though.

@torcolvin (Collaborator) left a comment:

I haven't really been paying as much attention to this, but I thought I'd give some readability comments.

chanCount++
}
revInf := db.RevInfo{
ID: "1-abc",
Collaborator:

Is writing 1-abc fine for these tests, or should you use 2-def, since this will be a subsequent revision?

It probably doesn't matter.

Contributor Author:

I agree I should be building out the rev tree properly when running in the mode that replicates many updates to a document (I have added this to the mutateWithDedupe function).
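Building out the rev tree for repeated updates amounts to generating a chain of generation-prefixed revision IDs (1-abc, then a 2-… child, and so on). A minimal sketch of that idea, assuming a CouchDB-style "generation-digest" format; the `nextRevID` helper is hypothetical and not the PR's actual mutateWithDedupe code:

```go
package main

import (
	"crypto/md5"
	"fmt"
)

// nextRevID derives a child revision ID of the form "<gen>-<digest>" from
// the parent rev ID and the new body, so each simulated update to a document
// gets a distinct, deterministic revision one generation deeper.
func nextRevID(parent string, gen int, body []byte) string {
	digest := md5.Sum(append([]byte(parent), body...))
	return fmt.Sprintf("%d-%x", gen, digest[:6])
}

func main() {
	rev := "1-abc"
	for gen := 2; gen <= 4; gen++ {
		rev = nextRevID(rev, gen, []byte(`{"k":"v"}`))
		fmt.Println(rev)
	}
}
```

Hashing the parent rev into the child keeps the chain deterministic across runs, which matters for the same reproducibility reasons discussed for the startup delays.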

Comment on lines 251 to 253
Cas: "0x000008cc2ee83118",
ClusterUUID: "6a1a82a8ea79aa8b82d3f5667892d9ce",
Crc32c: "0x615126c4",
Collaborator:

Are you intentionally setting cas and crc32c to invalid values that will trigger an import, if I understand correctly?

You could macro expand these values

Contributor Author:

Regarding the CRC32 value, I don't think it matters for this purpose, so I'm going to remove it along with the fake cluster UUID. The cas I will pass into the function and set properly for correctness.

dc.startWorkers(dc.ctx)
}

// NewDCPClientForTest creates a dcp client object for caching test purposes, does not actually create a client you can stream to. TEST ONLY.
Collaborator:

What's the purpose of this separate constructor? If this is only to support a separate number of vbuckets, then I'd just create a newDCPClientWithVbucketCount() and have all the common data there.

I'm not sure that this code is going to be sensitive to the number of vbuckets, but presumably the number of workers?

Contributor Author:

The idea of having this separate constructor was to avoid the call to GetMaxVbNo inside NewDCPClient. Given this test isn't using a real bucket on the server, that call would fail, so I needed to be able to construct the DCP client without it being connected to the server in any way.

I thought about passing a flag into NewDCPClient to get around this, but decided against changing functions like that just for a test tool.

Collaborator:

I was just thinking of making the extra option to the constructor private:

// NewDCPClient constructs a DCPClient which returns DCP events using the callback function.
func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket) (*DCPClient, error) {
	numVbuckets, err := bucket.GetMaxVbno()
	if err != nil {
		return nil, fmt.Errorf("Unable to determine maxVbNo when creating DCP client: %w", err)
	}
	return newDCPClientWithvBucketCount(ctx, ID, callback, options, bucket, numVbuckets)
}

func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) {
	return newDCPClientWithvBucketCount(ctx, ID, callback, options, bucket, numVbuckets)
}

// newDCPClientWithvBucketCount constructs a DCPClient simulating the number of vbuckets. See NewDCPClient for usage.
func newDCPClientWithvBucketCount(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) {
	numWorkers := DefaultNumWorkers
	if options.NumWorkers > 0 {
		numWorkers = options.NumWorkers
	}

	if options.AgentPriority == gocbcore.DcpAgentPriorityHigh {
		return nil, fmt.Errorf("sync gateway should not set high priority for DCP feeds")
	}

	if options.CheckpointPrefix == "" {
		if options.MetadataStoreType == DCPMetadataStoreCS {
			return nil, fmt.Errorf("callers must specify a checkpoint prefix when persisting metadata")
		}
	}
	client := &DCPClient{
		ctx:                 ctx,
		workers:             make([]*DCPWorker, numWorkers),
		numVbuckets:         numVbuckets,
		callback:            callback,
		ID:                  ID,
		spec:                bucket.GetSpec(),
		supportsCollections: bucket.IsSupported(sgbucket.BucketStoreFeatureCollections),
		terminator:          make(chan bool),
		doneChannel:         make(chan error, 1),
		failOnRollback:      options.FailOnRollback,
		checkpointPrefix:    options.CheckpointPrefix,
		dbStats:             options.DbStats,
		agentPriority:       options.AgentPriority,
		collectionIDs:       options.CollectionIDs,
	}

	// Initialize active vbuckets
	client.activeVbuckets = make(map[uint16]struct{})
	for vbNo := uint16(0); vbNo < numVbuckets; vbNo++ {
		client.activeVbuckets[vbNo] = struct{}{}
	}

	checkpointPrefix := fmt.Sprintf("%s:%v", client.checkpointPrefix, ID)
	switch options.MetadataStoreType {
	case DCPMetadataStoreCS:
		// TODO: Change GetSingleDataStore to a metadata Store?
		metadataStore := bucket.DefaultDataStore()
		client.metadata = NewDCPMetadataCS(ctx, metadataStore, numVbuckets, numWorkers, checkpointPrefix)
	case DCPMetadataStoreInMemory:
		client.metadata = NewDCPMetadataMem(numVbuckets)
	default:
		return nil, fmt.Errorf("Unknown Metadatatype: %d", options.MetadataStoreType)
	}
	if options.InitialMetadata != nil {
		for vbID, meta := range options.InitialMetadata {
			client.metadata.SetMeta(uint16(vbID), meta)
		}
	}
	if len(client.collectionIDs) == 0 {
		client.collectionIDs = []uint32{DefaultCollectionID}
	}

	client.oneShot = options.OneShot

	return client, nil
}

I actually feel more strongly about a refactor of this type, since the test function omits the ability to tune the number of workers and so has already deviated from the behavior of the non-test function. We aren't tuning this number, but it can also be a source of bottlenecks and is something that could be tested.
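The worker-defaulting behavior the refactor preserves can be isolated to a few lines. This tiny sketch mirrors the defaulting logic in the constructor above (the names `defaultNumWorkers` and `effectiveWorkers` are illustrative, not from the codebase), showing why a test path that skips it silently loses the ability to tune worker count:

```go
package main

import "fmt"

// defaultNumWorkers stands in for the package's DefaultNumWorkers constant.
const defaultNumWorkers = 8

// effectiveWorkers mirrors the constructor's defaulting: a positive
// NumWorkers in the options overrides the default, zero falls back.
func effectiveWorkers(optWorkers int) int {
	if optWorkers > 0 {
		return optWorkers
	}
	return defaultNumWorkers
}

func main() {
	fmt.Println(effectiveWorkers(0), effectiveWorkers(16)) // prints: 8 16
}
```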

Contributor Author:

Ah, I see now; I'll push an update to carry this out.
