Skip to content

CBG-4605: dcp mode for caching tool #7483

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions base/dcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"math"
"sync"
"testing"
"time"

"github.com/couchbase/gocbcore/v10"
Expand Down Expand Up @@ -657,3 +658,54 @@ func getLatestVbUUID(failoverLog []gocbcore.FailoverEntry) (vbUUID gocbcore.VbUU
// GetMetadataKeyPrefix returns the key prefix reported by the client's metadata (dc.metadata.GetKeyPrefix).
func (dc *DCPClient) GetMetadataKeyPrefix() string {
return dc.metadata.GetKeyPrefix()
}

// StartWorkersForTest starts the client's DCP workers by calling startWorkers with the context stored on the
// client. To be used for caching testing purposes only.
// NOTE(review): t is not used in the body; presumably it is only there to keep callers inside test code.
func (dc *DCPClient) StartWorkersForTest(t *testing.T) {
dc.startWorkers(dc.ctx)
}

// NewDCPClientForTest creates a dcp client object for caching test purposes, does not actually create a client you can stream to. TEST ONLY.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this separate constructor? If this is only to support a separate number of vbuckets, then I'd just create a newDCPClientWithVbucketCount() and have all the common data there.

I'm not sure that this code is going to be sensitive to the number of vbuckets, but presumably the number of workers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea of having this separate constructor was to avoid the call to GetMaxVbno inside NewDCPClient. Given that this test isn't using a real-world bucket on the server, that call would fail, so I needed to be able to construct the DCP client without it actually being connected to the server in any way.

I thought about passing in a flag to NewDCPClient to get around this but thought against changing any functions like that just for a test tool.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just thinking of making the extra option to the constructor private:

// NewDCPClient constructs a DCPClient which returns DCP events using the callback function.
//
// The vbucket count is read from the bucket via GetMaxVbno; if that call fails the error is wrapped and
// returned with a nil client. Otherwise construction is delegated to newDCPClientWithvBucketCount, which
// holds the logic shared with the test-only constructor.
func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket) (*DCPClient, error) {
	numVbuckets, err := bucket.GetMaxVbno()
	if err != nil {
		return nil, fmt.Errorf("Unable to determine maxVbNo when creating DCP client: %w", err)
	}
	return newDCPClientWithvBucketCount(ctx, ID, callback, options, bucket, numVbuckets)
}

// NewDCPClientForTest constructs a DCPClient with a caller-supplied vbucket count instead of asking the
// bucket for it. It forwards all arguments to newDCPClientWithvBucketCount. TEST ONLY.
// The t parameter is not used in the body.
func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) {
	return newDCPClientWithvBucketCount(ctx, ID, callback, options, bucket, numVbuckets)
}

// newDCPClientWithvBucketCount builds a DCPClient for an explicitly supplied number of vbuckets.
// See NewDCPClient for usage.
//
// It returns an error when options request high agent priority, when checkpoint-store metadata is
// requested without a checkpoint prefix, or when the metadata store type is not recognised.
func newDCPClientWithvBucketCount(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) {
	// Fall back to the default worker count unless the caller asked for a positive number.
	workerCount := options.NumWorkers
	if workerCount <= 0 {
		workerCount = DefaultNumWorkers
	}

	if options.AgentPriority == gocbcore.DcpAgentPriorityHigh {
		return nil, fmt.Errorf("sync gateway should not set high priority for DCP feeds")
	}
	if options.CheckpointPrefix == "" && options.MetadataStoreType == DCPMetadataStoreCS {
		return nil, fmt.Errorf("callers must specify a checkpoint prefix when persisting metadata")
	}

	// Every vbucket in [0, numVbuckets) starts out active.
	active := make(map[uint16]struct{})
	for vb := uint16(0); vb < numVbuckets; vb++ {
		active[vb] = struct{}{}
	}

	client := &DCPClient{
		ctx:                 ctx,
		workers:             make([]*DCPWorker, workerCount),
		numVbuckets:         numVbuckets,
		callback:            callback,
		ID:                  ID,
		spec:                bucket.GetSpec(),
		supportsCollections: bucket.IsSupported(sgbucket.BucketStoreFeatureCollections),
		terminator:          make(chan bool),
		doneChannel:         make(chan error, 1),
		failOnRollback:      options.FailOnRollback,
		checkpointPrefix:    options.CheckpointPrefix,
		dbStats:             options.DbStats,
		agentPriority:       options.AgentPriority,
		collectionIDs:       options.CollectionIDs,
		activeVbuckets:      active,
		oneShot:             options.OneShot,
	}

	switch options.MetadataStoreType {
	case DCPMetadataStoreInMemory:
		client.metadata = NewDCPMetadataMem(numVbuckets)
	case DCPMetadataStoreCS:
		// TODO: Change GetSingleDataStore to a metadata Store?
		// The persisted checkpoint key combines the configured prefix with the feed ID.
		prefixWithID := fmt.Sprintf("%s:%v", client.checkpointPrefix, ID)
		client.metadata = NewDCPMetadataCS(ctx, bucket.DefaultDataStore(), numVbuckets, workerCount, prefixWithID)
	default:
		return nil, fmt.Errorf("Unknown Metadatatype: %d", options.MetadataStoreType)
	}

	// Ranging over a nil InitialMetadata performs no iterations, so no explicit nil guard is required.
	for vbID, meta := range options.InitialMetadata {
		client.metadata.SetMeta(uint16(vbID), meta)
	}

	if len(client.collectionIDs) == 0 {
		client.collectionIDs = []uint32{DefaultCollectionID}
	}

	return client, nil
}

I actually feel more strongly about a refactor of this type since the test function actually omits the ability to tune num workers and so has already deviated from the behavior of the non test function. We aren't tuning this number but this can actually also be a source of bottlenecks and something that could be tested.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see now. I will push an update to carry this out.

// NewDCPClientForTest creates a DCPClient for caching test purposes using a caller-supplied vbucket count,
// so the bucket is not asked for its vbucket count. TEST ONLY.
//
// options.NumWorkers (when positive), options.InitialMetadata and options.OneShot are applied to the
// client instead of being silently dropped. The option validation errors are intentionally not added here,
// so existing callers do not gain new failure modes. The only error returned is for an unrecognised
// metadata store type. The t parameter is not used in the body.
func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*DCPClient, error) {
	// Use the default worker count unless the caller asked for a positive number.
	numWorkers := DefaultNumWorkers
	if options.NumWorkers > 0 {
		numWorkers = options.NumWorkers
	}

	client := &DCPClient{
		ctx:                 ctx,
		workers:             make([]*DCPWorker, numWorkers),
		numVbuckets:         numVbuckets,
		callback:            callback,
		ID:                  ID,
		spec:                bucket.GetSpec(),
		supportsCollections: bucket.IsSupported(sgbucket.BucketStoreFeatureCollections),
		terminator:          make(chan bool),
		doneChannel:         make(chan error, 1),
		failOnRollback:      options.FailOnRollback,
		checkpointPrefix:    options.CheckpointPrefix,
		dbStats:             options.DbStats,
		agentPriority:       options.AgentPriority,
		collectionIDs:       options.CollectionIDs,
		oneShot:             options.OneShot,
	}

	// Initialize active vbuckets
	client.activeVbuckets = make(map[uint16]struct{})
	for vbNo := uint16(0); vbNo < numVbuckets; vbNo++ {
		client.activeVbuckets[vbNo] = struct{}{}
	}

	checkpointPrefix := fmt.Sprintf("%s:%v", client.checkpointPrefix, ID)
	switch options.MetadataStoreType {
	case DCPMetadataStoreCS:
		// TODO: Change GetSingleDataStore to a metadata Store?
		metadataStore := bucket.DefaultDataStore()
		client.metadata = NewDCPMetadataCS(ctx, metadataStore, numVbuckets, numWorkers, checkpointPrefix)
	case DCPMetadataStoreInMemory:
		client.metadata = NewDCPMetadataMem(numVbuckets)
	default:
		return nil, fmt.Errorf("Unknown Metadatatype: %d", options.MetadataStoreType)
	}

	// Seed per-vbucket metadata supplied by the caller; ranging over a nil value performs no iterations.
	for vbID, meta := range options.InitialMetadata {
		client.metadata.SetMeta(uint16(vbID), meta)
	}

	if len(client.collectionIDs) == 0 {
		client.collectionIDs = []uint32{DefaultCollectionID}
	}

	return client, nil
}
17 changes: 17 additions & 0 deletions db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ func (db *DatabaseContext) CallProcessEntry(t *testing.T, ctx context.Context, l
db.changeCache.processEntry(ctx, log)
}

// GetCachedChanges returns the cached changes for chanID from the change cache's channel cache.
// Intended for the caching tool; not to be used outside test code.
func (db *DatabaseContext) GetCachedChanges(t *testing.T, ctx context.Context, chanID channels.ID) ([]*LogEntry, error) {
	return db.changeCache.getChannelCache().GetCachedChanges(ctx, chanID)
}

// NewDCPCachingCountWaiter returns a StatWaiter, built with db.NewStatWaiter, for the database's DCPCachingCount stat.
func (db *DatabaseContext) NewDCPCachingCountWaiter(tb testing.TB) *StatWaiter {
return db.NewStatWaiter(db.DbStats.Database().DCPCachingCount, tb)
}
Expand Down Expand Up @@ -769,3 +775,14 @@ func GetIndexPartitionCount(t *testing.T, bucket *base.GocbV2Bucket, dsName sgbu
require.Failf(t, "index not found", "index %s not found in %+v", indexName, output)
return 0
}

// GetMutationListener returns the database context's mutationListener field. To be used only for testing purposes.
// NOTE(review): the listener is returned by value, so callers receive a copy — confirm that is intended.
func (db *DatabaseContext) GetMutationListener(t *testing.T) changeListener {
return db.mutationListener
}

// InitChannel is a test-only helper that looks up the single-channel cache for chanName in the default
// collection via the channel cache, returning any error from that lookup. The cache itself is discarded.
func (db *DatabaseContext) InitChannel(ctx context.Context, t *testing.T, chanName string) error {
	chanID := channels.NewID(chanName, base.DefaultCollectionID)
	_, err := db.channelCache.getSingleChannelCache(ctx, chanID)
	return err
}
Loading
Loading