Commit 632624b

REP-2943 / REP-2944: Add filter parameter to /check endpoint and enable query filtering (#4)
The `/check` endpoint takes a JSON filter, which the verifier parses into a `map[string]any`. The change stream is started without query filtering: a change event on a filtered-out document still generates a recheck document, but that document is excluded during fetching and is therefore never compared by the verifier.
1 parent 1350e61 commit 632624b

File tree

8 files changed: +244 −29 lines changed

README.md

Lines changed: 17 additions & 2 deletions

@@ -63,7 +63,7 @@ To set a port, use `--serverPort <port number>`. The default is 27020.

-1. After launching the verifier (see above), you can send it requests to get it to start verifying. The verification process is started by using the `check` command. The verification process will keep running until you tell the verifier to stop. It will keep track of the inconsistencies it has found and will keep checking those inconsistencies, hoping that eventually they will resolve.
+1. After launching the verifier (see above), you can send it requests to get it to start verifying. The verification process is started by using the `check` command. An [optional `filter` parameter](#document-filtering) can be passed in the `check` request body to check only documents matching that filter. The verification process will keep running until you tell the verifier to stop. It will keep track of the inconsistencies it has found and will keep checking those inconsistencies, hoping that eventually they will resolve.

     curl -H "Content-Type: application/json" -X POST -d '{}' http://127.0.0.1:27020/api/v1/check

@@ -98,7 +98,6 @@ The verifier will now check to completion to make sure that there are no inconsi

 The verifier’s iterative process can handle data changes while it is running, until you hit the writesOff endpoint. However, it cannot handle DDL commands. If the verifier receives a DDL change stream event (drop, dropDatabase, rename), the verification will fail. If an untracked DDL event (create, createIndexes, dropIndexes, modify) occurs, the verifier may miss the change.
-
 # Benchmarking Results

 Ran on m6id.metal + M40 with 3 replica sets

@@ -131,6 +130,22 @@ The migration-verifier has two steps:

 5. Every document to check is written with a generation number. A checking round checks documents for a specific generation. When a check round begins, we start writing new documents with a new generation number
 6. The verifier fetches all collection/index/view information on the source and destination and confirms they are identical in every generation. This is duplicated work, but it's fast and convenient for the code.
+
+# Document Filtering
+
+Document filtering can be enabled by passing a `filter` parameter in the `check` request body when starting a check. The filter takes a JSON query whose syntax is identical to the [read operation query syntax](https://www.mongodb.com/docs/manual/tutorial/query-documents/#std-label-read-operations-query-argument). For example, the following command makes the verifier check only documents matching the filter `{"inFilter": {"$ne": false}}` for _all_ namespaces:
+
+    curl -H "Content-Type: application/json" -X POST -d '{"filter": {"inFilter": {"$ne": false}}}' http://127.0.0.1:27020/api/v1/check
+
+If a check is started with the above filter, the table below summarizes the verifier's behavior:
+
+| Source Document                                    | Destination Document                               | Verifier's Behavior                          |
+|----------------------------------------------------|----------------------------------------------------|----------------------------------------------|
+| `{"_id": <id>, "inFilter": true, "diff": "src"}`   | `{"_id": <id>, "inFilter": true, "diff": "dst"}`   | ❗ (Finds a document with differing content)  |
+| `{"_id": <id>, "inFilter": false, "diff": "src"}`  | `{"_id": <id>, "inFilter": false, "diff": "dst"}`  | ✅ (Skips the document)                       |
+| `{"_id": <id>, "inFilter": true, "diff": "src"}`   | `{"_id": <id>, "inFilter": false, "diff": "dst"}`  | ❗ (Finds a document missing on Destination)  |
+| `{"_id": <id>, "inFilter": false, "diff": "src"}`  | `{"_id": <id>, "inFilter": true, "diff": "dst"}`   | ❗ (Finds a document missing on Source)       |

 # Checking Failures

 Because the algorithm is generational, the only failures we care about are in the last generation to run. The first goal is to find the last generation:
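The table in the Document Filtering section can be modeled with a small standalone sketch (hypothetical illustration code, not the verifier's implementation): each side fetches documents with the filter applied, so a document that fails the filter is simply invisible on that side.

```go
package main

import "fmt"

// matchesFilter models the example filter {"inFilter": {"$ne": false}}:
// a document matches unless its inFilter field is explicitly false.
func matchesFilter(doc map[string]any) bool {
	v, ok := doc["inFilter"]
	return !ok || v != false
}

// verdict models the behavior table: a side only "sees" documents that
// match the filter, so a non-matching document looks missing there.
func verdict(src, dst map[string]any) string {
	srcSeen, dstSeen := matchesFilter(src), matchesFilter(dst)
	switch {
	case !srcSeen && !dstSeen:
		return "skip (both outside filter)"
	case srcSeen && !dstSeen:
		return "missing on destination"
	case !srcSeen && dstSeen:
		return "missing on source"
	default:
		return "compare contents"
	}
}

func main() {
	// Row 3 of the table: in-filter on source, out-of-filter on destination.
	fmt.Println(verdict(
		map[string]any{"_id": 1, "inFilter": true},
		map[string]any{"_id": 1, "inFilter": false},
	))
}
```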

internal/partitions/partition.go

Lines changed: 7 additions & 7 deletions

@@ -137,7 +137,7 @@ func (p *Partition) FindCmd(

 // (e.g. use the partitions on the source to read the destination for verification)
 // If the passed-in buildinfo indicates a mongodb version < 5.0, type bracketing is not used.
 // filterAndPredicates is a slice of filter criteria that's used to construct the "filter" field in the find option.
-func (p *Partition) GetFindOptions(buildInfo *bson.M, filterAndPredicates []bson.D) bson.D {
+func (p *Partition) GetFindOptions(buildInfo *bson.M, filterAndPredicates bson.A) bson.D {
 	if p == nil {
 		if len(filterAndPredicates) > 0 {
 			return bson.D{{"filter", bson.D{{"$and", filterAndPredicates}}}}

@@ -190,16 +190,16 @@ func (p *Partition) GetFindOptions(buildInfo *bson.M, filterAndPredicates []bson

 func (p *Partition) filterWithNoTypeBracketing() bson.D {
 	// We use $expr to avoid type bracketing and allow comparison of different _id types,
 	// and $literal to avoid MQL injection from an _id's value.
-	return bson.D{{"$and", bson.A{
+	return bson.D{{"$and", []bson.D{
 		// All _id values >= lower bound.
-		bson.D{{"$expr", bson.D{
+		{{"$expr", bson.D{
 			{"$gte", bson.A{
 				"$_id",
 				bson.D{{"$literal", p.Key.Lower}},
 			}},
 		}}},
 		// All _id values <= upper bound.
-		bson.D{{"$expr", bson.D{
+		{{"$expr", bson.D{
 			{"$lte", bson.A{
 				"$_id",
 				bson.D{{"$literal", p.Upper}},

@@ -212,10 +212,10 @@ func (p *Partition) filterWithNoTypeBracketing() bson.D {

 // partition. This filter will not properly handle mixed-type _ids -- if the upper and lower
 // bounds are of different types (except minkey/maxkey), nothing will be returned.
 func (p *Partition) filterWithTypeBracketing() bson.D {
-	return bson.D{{"$and", bson.A{
+	return bson.D{{"$and", []bson.D{
 		// All _id values >= lower bound.
-		bson.D{{"_id", bson.D{{"$gte", p.Key.Lower}}}},
+		{{"_id", bson.D{{"$gte", p.Key.Lower}}}},
 		// All _id values <= upper bound.
-		bson.D{{"_id", bson.D{{"$lte", p.Upper}}}},
+		{{"_id", bson.D{{"$lte", p.Upper}}}},
 	}}}
 }

internal/partitions/partition_test.go

Lines changed: 4 additions & 4 deletions

@@ -134,14 +134,14 @@ func makeExpectedFilter(lower, upper interface{}) bson.D {

 	return bson.D{{"$and", bson.A{
 		bson.D{{"$and", []bson.D{
 			// All _id values >= lower bound.
-			bson.D{{"$expr", bson.D{
+			{{"$expr", bson.D{
 				{"$gte", bson.A{
 					"$_id",
 					bson.D{{"$literal", lower}},
 				}},
 			}}},
 			// All _id values <= upper bound.
-			bson.D{{"$expr", bson.D{
+			{{"$expr", bson.D{
 				{"$lte", bson.A{
 					"$_id",
 					bson.D{{"$literal", upper}},

@@ -155,9 +155,9 @@ func makeExpectedFilterWithTypeBracketing(lower, upper interface{}) bson.D {

 	return bson.D{{"$and", bson.A{
 		bson.D{{"$and", []bson.D{
 			// All _id values >= lower bound.
-			bson.D{{"_id", bson.D{{"$gte", lower}}}},
+			{{"_id", bson.D{{"$gte", lower}}}},
 			// All _id values <= upper bound.
-			bson.D{{"_id", bson.D{{"$lte", upper}}}},
+			{{"_id", bson.D{{"$lte", upper}}}},
 		}}},
 	}}}
 }

internal/verifier/check.go

Lines changed: 3 additions & 3 deletions

@@ -33,9 +33,9 @@ var verificationStatusCheckInterval time.Duration = 15 * time.Second

 // testChan is a pair of channels for coordinating generations in tests.
 // testChan[0] is a channel signalled when a generation is complete.
 // testChan[1] is a channel signalled when Check should continue with the next generation.
-func (verifier *Verifier) Check(ctx context.Context) {
+func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) {
 	go func() {
-		err := verifier.CheckDriver(ctx, nil)
+		err := verifier.CheckDriver(ctx, filter)
 		if err != nil {
 			verifier.logger.Fatal().Err(err).Msgf("Fatal error in generation %d", verifier.generation)
 		}

@@ -122,7 +122,7 @@ func (verifier *Verifier) CheckWorker(ctx context.Context) error {

 	return nil
 }

-func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testChan ...chan struct{}) error {
+func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any, testChan ...chan struct{}) error {
 	verifier.mux.Lock()
 	if verifier.running {
 		verifier.mux.Unlock()

internal/verifier/migration_verifier.go

Lines changed: 8 additions & 3 deletions

@@ -135,7 +135,7 @@ type Verifier struct {

 	// A user-defined $match-compatible document-level query filter.
 	// The filter is applied to all namespaces in both initial checking and iterative checking.
 	// The verifier only checks documents within the filter.
-	globalFilter bson.D
+	globalFilter map[string]any
 }

 // VerificationStatus holds the Verification Status

@@ -397,18 +397,20 @@ func (verifier *Verifier) getGenerationWhileLocked() (int, bool) {

 	return verifier.generation, verifier.lastGeneration
 }

-func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates []bson.D) []bson.D {
+func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A) bson.A {
 	if verifier.globalFilter == nil {
+		verifier.logger.Debug().Msg("No filter to append; globalFilter is nil")
 		return predicates
 	}
+	verifier.logger.Debug().Str("filter", fmt.Sprintf("%v", verifier.globalFilter)).Msg("Appending filter to find query")
 	return append(predicates, verifier.globalFilter)
 }

 func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *bson.M,
 	startAtTs *primitive.Timestamp, task *VerificationTask) (*mongo.Cursor, error) {
 	var findOptions bson.D
 	runCommandOptions := options.RunCmd()
-	var andPredicates []bson.D
+	var andPredicates bson.A

 	if len(task.Ids) > 0 {
 		andPredicates = append(andPredicates, bson.D{{"_id", bson.M{"$in": task.Ids}}})

@@ -642,6 +644,9 @@ func (verifier *Verifier) ProcessVerifyTask(workerNum int, task *VerificationTas

 		ids = append(ids, v.ID)
 		dataSizes = append(dataSizes, v.dataSize)
 	}
+	// Update ids of the failed task so that only ids from mismatches are reported.
+	// Ids of matching documents are discarded and hidden from the mismatching documents report.
+	task.Ids = ids
 	err := verifier.InsertFailedCompareRecheckDocs(task.QueryFilter.Namespace, ids, dataSizes)
 	if err != nil {
 		verifier.logger.Error().Msgf("[Worker %d] Error inserting document mismatch into Recheck queue: %+v", workerNum, err)

internal/verifier/migration_verifier_test.go

Lines changed: 96 additions & 2 deletions

@@ -210,7 +210,7 @@ func (suite *MultiDataVersionTestSuite) TestVerifierFetchDocuments() {

 	expectTwoCommonDocs(srcDocumentMap, dstDocumentMap)

 	// Test fetchDocuments for ids with a global filter.
-	verifier.globalFilter = bson.D{{"num", bson.D{{"$lt", 100}}}}
+	verifier.globalFilter = map[string]any{"num": map[string]any{"$lt": 100}}
 	srcDocumentMap, dstDocumentMap, err = verifier.fetchDocuments(task)
 	suite.Require().NoError(err)
 	expectOneCommonDoc(srcDocumentMap, dstDocumentMap)

@@ -220,7 +220,7 @@ func (suite *MultiDataVersionTestSuite) TestVerifierFetchDocuments() {

 		Ns:       &partitions.Namespace{DB: "keyhole", Coll: "dealers"},
 		IsCapped: false,
 	}
-	verifier.globalFilter = bson.D{{"num", bson.D{{"$lt", 100}}}}
+	verifier.globalFilter = map[string]any{"num": map[string]any{"$lt": 100}}
 	srcDocumentMap, dstDocumentMap, err = verifier.fetchDocuments(task)
 	suite.Require().NoError(err)
 	expectOneCommonDoc(srcDocumentMap, dstDocumentMap)

@@ -1428,3 +1428,97 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {

 	// there should be a failure from the src insert
 	suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status)
 }
+
+func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() {
+	zerolog.SetGlobalLevel(zerolog.DebugLevel)
+
+	filter := map[string]any{"inFilter": map[string]any{"$ne": false}}
+	verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
+	verifier.SetSrcNamespaces([]string{"testDb1.testColl1"})
+	verifier.SetDstNamespaces([]string{"testDb2.testColl3"})
+	verifier.SetNamespaceMap()
+	verifier.SetIgnoreBSONFieldOrder(true)
+
+	ctx := context.Background()
+
+	srcColl := suite.srcMongoClient.Database("testDb1").Collection("testColl1")
+	dstColl := suite.dstMongoClient.Database("testDb2").Collection("testColl3")
+
+	// Document {_id: 1} should match; {_id: 2} should be ignored because it's not in the filter.
+	_, err := srcColl.InsertOne(ctx, bson.M{"_id": 1, "x": 42, "inFilter": true})
+	suite.Require().NoError(err)
+	_, err = srcColl.InsertOne(ctx, bson.M{"_id": 2, "x": 53, "inFilter": false})
+	suite.Require().NoError(err)
+	_, err = dstColl.InsertOne(ctx, bson.M{"_id": 1, "x": 42, "inFilter": true})
+	suite.Require().NoError(err)
+
+	checkDoneChan := make(chan struct{})
+	checkContinueChan := make(chan struct{})
+	go func() {
+		err := verifier.CheckDriver(ctx, filter, checkDoneChan, checkContinueChan)
+		suite.Require().NoError(err)
+	}()
+
+	waitForTasks := func() *VerificationStatus {
+		status, err := verifier.GetVerificationStatus()
+		suite.Require().NoError(err)
+
+		for status.TotalTasks == 0 && verifier.generation < 10 {
+			suite.T().Logf("TotalTasks is 0 (generation=%d); waiting another generation …", verifier.generation)
+			checkContinueChan <- struct{}{}
+			<-checkDoneChan
+			status, err = verifier.GetVerificationStatus()
+			suite.Require().NoError(err)
+		}
+		return status
+	}
+
+	// Wait for one generation to finish.
+	<-checkDoneChan
+	status := waitForTasks()
+	suite.Require().Equal(VerificationStatus{TotalTasks: 2, FailedTasks: 0, CompletedTasks: 2}, *status)
+
+	// Insert another document that is not in the filter.
+	_, err = srcColl.InsertOne(ctx, bson.M{"_id": 3, "x": 43, "inFilter": false})
+	suite.Require().NoError(err)
+
+	// Tell check to start the next generation.
+	checkContinueChan <- struct{}{}
+
+	// Wait for generation to finish.
+	<-checkDoneChan
+	status = waitForTasks()
+	// There should be no failures, since the inserted document is not in the filter.
+	suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status)
+
+	// Now insert in the source; this should come up next generation.
+	_, err = srcColl.InsertOne(ctx, bson.M{"_id": 4, "x": 44, "inFilter": true})
+	suite.Require().NoError(err)
+
+	// Tell check to start the next generation.
+	checkContinueChan <- struct{}{}
+
+	// Wait for one generation to finish.
+	<-checkDoneChan
+	status = waitForTasks()
+
+	// There should be a failure from the src insert of a document in the filter.
+	suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status)
+
+	// Now patch up the destination.
+	_, err = dstColl.InsertOne(ctx, bson.M{"_id": 4, "x": 44, "inFilter": true})
+	suite.Require().NoError(err)
+
+	// Continue.
+	checkContinueChan <- struct{}{}
+
+	// Wait for it to finish again; this should be a clean run.
+	<-checkDoneChan
+	status = waitForTasks()
+
+	// There should be no failures now, since they are equivalent at this point in time.
+	suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status)
+
+	// Turn writes off.
+	verifier.WritesOff(ctx)
+}
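The new test drives `CheckDriver` through its optional pair of `testChan` channels. Stripped of all verifier internals, the handshake reduces to the following standalone sketch (`run` is a hypothetical stand-in for the generation loop, not the verifier's code):

```go
package main

import "fmt"

// run models CheckDriver's generation loop under test: after each generation
// it signals done, then blocks until the caller allows the next generation.
func run(generations int, done chan<- struct{}, cont <-chan struct{}) {
	for g := 0; g < generations; g++ {
		// ... one generation of checking would happen here ...
		done <- struct{}{} // signal: generation g complete
		<-cont             // wait: caller says continue
	}
}

func main() {
	done := make(chan struct{})
	cont := make(chan struct{})
	go run(3, done, cont)

	// The test side: observe each generation boundary, assert on status,
	// optionally mutate data, then release the next generation.
	for g := 0; g < 3; g++ {
		<-done
		fmt.Println("generation", g, "finished")
		cont <- struct{}{}
	}
}
```

This is why the test can insert documents "between" generations deterministically: the verifier is parked on `<-cont` while the test mutates the collections.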

internal/verifier/web_server.go

Lines changed: 21 additions & 8 deletions

@@ -21,7 +21,7 @@ const RequestInProgressErrorDescription = "Another request is currently in progr

 // MigrationVerifierAPI represents the interaction webserver with mongosync
 type MigrationVerifierAPI interface {
-	Check(ctx context.Context)
+	Check(ctx context.Context, filter map[string]any)
 	WritesOff(ctx context.Context)
 	WritesOn(ctx context.Context)
 	GetProgress(ctx context.Context) (Progress, error)

@@ -121,10 +121,12 @@ func (server *WebServer) RequestAndResponseLogger() gin.HandlerFunc {

 	}
 }

-// Run starts the web server. This is a blocking call.
-// This function should only be called once during each WebServer's lifetime.
-func (server *WebServer) Run(ctx context.Context) error {
+func (server *WebServer) setupRouter() *gin.Engine {
 	gin.SetMode(gin.ReleaseMode)
+	// This is set so that gin does not decode JSON numbers into float64.
+	// The actual BSON type conversion for JSON numbers is deferred to go-driver.
+	gin.EnableJsonDecoderUseNumber()
+
 	router := gin.New()
 	router.Use(server.RequestAndResponseLogger(), gin.Recovery())

@@ -139,10 +141,15 @@ func (server *WebServer) Run(ctx context.Context) error {

 	}

 	router.HandleMethodNotAllowed = true
+	return router
+}

+// Run starts the web server. This is a blocking call.
+// This function should only be called once during each WebServer's lifetime.
+func (server *WebServer) Run(ctx context.Context) error {
 	server.srv = &http.Server{
 		Addr:    "0.0.0.0:" + strconv.Itoa(server.port),
-		Handler: router,
+		Handler: server.setupRouter(),
 	}

 	webServerCtx, shutDownWebServer := context.WithCancel(ctx)

@@ -177,15 +184,21 @@

 // EmptyRequest is for requests with an empty body
 type EmptyRequest struct{}

+// CheckRequest is for requests to the /check endpoint.
+type CheckRequest struct {
+	Filter map[string]any `json:"filter"`
+}
+
 func (server *WebServer) checkEndPoint(c *gin.Context) {
-	var json EmptyRequest
+	var req CheckRequest

-	if err := c.ShouldBindJSON(&json); err != nil {
+	if err := c.ShouldBindJSON(&req); err != nil {
+		err = errors.Wrap(err, "filter is not valid JSON")
 		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
 		return
 	}

-	server.Mapi.Check(context.Background())
+	server.Mapi.Check(context.Background(), req.Filter)
 	//if err != nil {
 	//	server.operationalErrorResponse(c, err)
 	//	return
