Skip to content

Commit 284c2e2

Browse files
committed
MB-54729: Add historyStartSeqno to ScanContext
Add a new member, historyStartSeqno to ScanContext for later use in DCP backfill when "change_stream=true". This will always be 0 for a KVStore which reports: StorageProperties::HistoryRetentionAvailable::No For a KVStore which reports: StorageProperties::HistoryRetentionAvailable::Yes This will be set to: 0: If no history is configured (retention_bytes=0) >0: When history is available, this value represents the lowest seqno at which a continuous change stream could be made available. This commit is not dependent upon Magma's changes to add history, but adds the ability for MockMagmaKVStore to "inject" a history start. Change-Id: I66db1d84af54f4a999f260d30b356a9a149ddf31
1 parent 86fb4e3 commit 284c2e2

File tree

4 files changed

+99
-13
lines changed

4 files changed

+99
-13
lines changed

engines/ep/src/kvstore/kvstore.cc

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,16 @@ ScanContext::ScanContext(
5454
std::unique_ptr<StatusCallback<CacheLookup>> cl,
5555
const std::vector<Collections::KVStore::DroppedCollection>&
5656
droppedCollections,
57-
uint64_t maxSeqno)
57+
uint64_t maxSeqno,
58+
uint64_t historyStartSeqno)
5859
: vbid(vbid),
5960
handle(std::move(handle)),
6061
docFilter(docFilter),
6162
valFilter(valFilter),
6263
logger(getGlobalBucketLogger().get()),
6364
collectionsContext(droppedCollections),
6465
maxSeqno(maxSeqno),
66+
historyStartSeqno(historyStartSeqno),
6567
callback(std::move(cb)),
6668
lookup(std::move(cl)) {
6769
Expects(callback != nullptr);
@@ -82,7 +84,8 @@ BySeqnoScanContext::BySeqnoScanContext(
8284
const vbucket_state& vbucketState,
8385
const std::vector<Collections::KVStore::DroppedCollection>&
8486
droppedCollections,
85-
std::optional<uint64_t> timestamp)
87+
std::optional<uint64_t> timestamp,
88+
uint64_t historyStartSeqno)
8689

8790
: ScanContext(vb,
8891
std::move(handle),
@@ -91,7 +94,8 @@ BySeqnoScanContext::BySeqnoScanContext(
9194
std::move(cb),
9295
std::move(cl),
9396
droppedCollections,
94-
end),
97+
end,
98+
historyStartSeqno),
9599
startSeqno(start),
96100
purgeSeqno(purgeSeqno),
97101
documentCount(_documentCount),
@@ -110,15 +114,17 @@ ByIdScanContext::ByIdScanContext(
110114
ValueFilter _valFilter,
111115
const std::vector<Collections::KVStore::DroppedCollection>&
112116
droppedCollections,
113-
uint64_t maxSeqno)
117+
uint64_t maxSeqno,
118+
uint64_t historyStartSeqno)
114119
: ScanContext(vb,
115120
std::move(handle),
116121
_docFilter,
117122
_valFilter,
118123
std::move(cb),
119124
std::move(cl),
120125
droppedCollections,
121-
maxSeqno),
126+
maxSeqno,
127+
historyStartSeqno),
122128
ranges(std::move(ranges)),
123129
lastReadKey(nullptr, 0) {
124130
}

engines/ep/src/kvstore/kvstore.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,8 @@ class ScanContext {
392392
std::unique_ptr<StatusCallback<CacheLookup>> cl,
393393
const std::vector<Collections::KVStore::DroppedCollection>&
394394
droppedCollections,
395-
uint64_t maxSeqno);
395+
uint64_t maxSeqno,
396+
uint64_t historyStartSeqno = 0);
396397

397398
virtual ~ScanContext() = default;
398399

@@ -412,14 +413,15 @@ class ScanContext {
412413
return *lookup;
413414
}
414415

415-
const Vbid vbid;
416+
const Vbid vbid{0};
416417
uint64_t lastReadSeqno{0};
417418
std::unique_ptr<KVFileHandle> handle;
418-
const DocumentFilter docFilter;
419-
const ValueFilter valFilter;
420-
BucketLogger* logger;
419+
const DocumentFilter docFilter{DocumentFilter::ALL_ITEMS};
420+
const ValueFilter valFilter{ValueFilter::KEYS_ONLY};
421+
BucketLogger* logger{nullptr};
421422
const Collections::VB::ScanContext collectionsContext;
422-
uint64_t maxSeqno;
423+
uint64_t maxSeqno{0};
424+
uint64_t historyStartSeqno{0};
423425

424426
/**
425427
* Cumulative count of bytes read from disk during this scan. Counts
@@ -454,7 +456,8 @@ class BySeqnoScanContext : public ScanContext {
454456
const vbucket_state& vbucketState,
455457
const std::vector<Collections::KVStore::DroppedCollection>&
456458
droppedCollections,
457-
std::optional<uint64_t> timestamp = {});
459+
std::optional<uint64_t> timestamp = {},
460+
uint64_t historyStartSeqno = 0);
458461

459462
const uint64_t startSeqno;
460463
const uint64_t purgeSeqno;
@@ -516,7 +519,8 @@ class ByIdScanContext : public ScanContext {
516519
ValueFilter _valFilter,
517520
const std::vector<Collections::KVStore::DroppedCollection>&
518521
droppedCollections,
519-
uint64_t maxSeqno);
522+
uint64_t maxSeqno,
523+
uint64_t historyStartSeqno = 0);
520524
std::vector<ByIdRange> ranges;
521525
// Key should be set by KVStore when a scan must be paused, this is where
522526
// a scan can resume from

engines/ep/tests/mock/mock_magma_kvstore.cc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,44 @@ magma::Status MockMagmaKVStore::runImplicitCompactKVStore(Vbid vbid) {
9090
magma::Status MockMagmaKVStore::newCheckpoint(Vbid vbid) {
9191
return magma->NewCheckpoint(vbid.get());
9292
}
93+
94+
std::unique_ptr<BySeqnoScanContext> MockMagmaKVStore::initBySeqnoScanContext(
95+
std::unique_ptr<StatusCallback<GetValue>> cb,
96+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
97+
Vbid vbid,
98+
uint64_t startSeqno,
99+
DocumentFilter options,
100+
ValueFilter valOptions,
101+
SnapshotSource source,
102+
std::unique_ptr<KVFileHandle> fileHandle) const {
103+
auto scanContext =
104+
MagmaKVStore::initBySeqnoScanContext(std::move(cb),
105+
std::move(cl),
106+
vbid,
107+
startSeqno,
108+
options,
109+
valOptions,
110+
source,
111+
std::move(fileHandle));
112+
113+
if (historyStartSeqno) {
114+
scanContext->historyStartSeqno = historyStartSeqno.value();
115+
}
116+
return scanContext;
117+
}
118+
119+
std::unique_ptr<ByIdScanContext> MockMagmaKVStore::initByIdScanContext(
120+
std::unique_ptr<StatusCallback<GetValue>> cb,
121+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
122+
Vbid vbid,
123+
const std::vector<ByIdRange>& ranges,
124+
DocumentFilter options,
125+
ValueFilter valOptions) const {
126+
auto scanContext = MagmaKVStore::initByIdScanContext(
127+
std::move(cb), std::move(cl), vbid, ranges, options, valOptions);
128+
129+
if (historyStartSeqno) {
130+
scanContext->historyStartSeqno = historyStartSeqno.value();
131+
}
132+
return scanContext;
133+
}

engines/ep/tests/mock/mock_magma_kvstore.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,34 @@ class MockMagmaKVStore : public MagmaKVStore {
1919
public:
2020
explicit MockMagmaKVStore(MagmaKVStoreConfig& config);
2121

22+
/**
23+
* See base-class comments for usage. However this "mock" version will
24+
* override the ScanContext::historyStartSeqno if this class defines
25+
* historyStartSeqno (see historyStartSeqno member below).
26+
*/
27+
std::unique_ptr<BySeqnoScanContext> initBySeqnoScanContext(
28+
std::unique_ptr<StatusCallback<GetValue>> cb,
29+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
30+
Vbid vbid,
31+
uint64_t startSeqno,
32+
DocumentFilter options,
33+
ValueFilter valOptions,
34+
SnapshotSource source,
35+
std::unique_ptr<KVFileHandle> fileHandle = nullptr) const override;
36+
37+
/**
38+
* See base-class comments for usage. However this "mock" version will
39+
* override the ScanContext::historyStartSeqno if this class defines
40+
* historyStartSeqno (see historyStartSeqno member below).
41+
*/
42+
std::unique_ptr<ByIdScanContext> initByIdScanContext(
43+
std::unique_ptr<StatusCallback<GetValue>> cb,
44+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
45+
Vbid vbid,
46+
const std::vector<ByIdRange>& ranges,
47+
DocumentFilter options,
48+
ValueFilter valOptions) const override;
49+
2250
ReadVBStateResult readVBStateFromDisk(Vbid vbid);
2351

2452
ReadVBStateResult readVBStateFromDisk(
@@ -75,6 +103,13 @@ class MockMagmaKVStore : public MagmaKVStore {
75103
std::function<bool()> snapshotVBucketErrorInjector;
76104

77105
StorageProperties storageProperties;
106+
107+
/**
108+
* The historyStartSeqno in this class when set will override the value
109+
* MagmaKVStore sets. This allows arbitrary placement of the history range
110+
* for testing of various two-phase backfills.
111+
*/
112+
std::optional<uint64_t> historyStartSeqno;
78113
};
79114

80115
#endif

0 commit comments

Comments
 (0)