Skip to content

Commit 0d1f2c5

Browse files
committed
MB-54729: Enable history scan for CDC backfill
Replace the todo markers with code that now utilises the magma history API - this now means scanAllVersions for example is hooked into the magma history scanning API. Add new tests that validate multiple versions can be stored and returned. Also required are changes to unit tests to respect new expectation checks that occur in magma - primarily that flushing writes ordered batches - this is only a problem for tests which bypass the flusher and call KVStore directly. **** ISSUES **** ep-engine_ep_unit_tests does not pass: 1) Exception from magma MagmaKVStoreRollbackTest.Rollback hits the following exception GSL: Precondition failure: 'levelSize >= compactionState[level].history.Size' at /Users/jimwalker/Code/couchbase/neo/magma/lsm/lsm_tree.cc:895 2) Seg-fault in magma Seen in a number of tests, 1 example: CollectionsDcpEphemeralOrPersistent/CollectionsDcpParameterizedTest.DefaultCollectionDropped/persistent_magma_value_only Process 78731 stopped * thread couchbase#1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT) frame #0: 0x00000001012eb7b0 ep-engine_ep_unit_tests`magma::DocSequenceBuffer::GetKey(this=0x0000000118131700) at lsd.cc:75:36 [opt] 72 } 73 74 Slice DocSequenceBuffer::GetKey() { -> 75 seqFmt.Set(sortedList[offset]->seqno); 76 return seqFmt.Encode(); 77 } 78 * thread couchbase#1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT) * frame #0: 0x00000001012eb7b0 ep-engine_ep_unit_tests`magma::DocSequenceBuffer::GetKey(this=0x0000000118131700) at lsd.cc:75:36 [opt] frame couchbase#1: 0x0000000101361e2e ep-engine_ep_unit_tests`magma::mvccIteratorAdaptor::GetKey(this=0x0000000118536c00) at mvcc.h:249:25 [opt] frame couchbase#2: 0x000000010132b688 ep-engine_ep_unit_tests`magma::IteratorWithFilter::filterKeys(this=0x0000000118128350) at iterator.cc:214:32 [opt] frame couchbase#3: 0x000000010132de5b ep-engine_ep_unit_tests`magma::KVReader::ReadKVs(this=0x00007ff7bfefd550) at common.cc:70:19 [opt] frame couchbase#4: 0x0000000101378f63 ep-engine_ep_unit_tests`magma::LSMTree::writeSSTable(this=0x000000011855a820, w=0x00007ff7bfefd890, itr=0x0000000118128350, maxSn=10, stopFn=function<bool (const magma::Slice &)> @ 0x00007ff7bfefd860)>) at lsm_tree.cc:719:15 [opt] frame couchbase#5: 0x0000000101376ee8 ep-engine_ep_unit_tests`magma::LSMTree::writeSSTable(this=0x000000011855a820, appendMode=<unavailable>, itr=0x0000000118128350, sizeEstimate=<unavailable>, maxSn=10, stopFn=function<bool (const magma::Slice &)> @ 0x00007ff7bfefdb60)>) at lsm_tree.cc:682:17 [opt] frame couchbase#6: 0x00000001013761b2 ep-engine_ep_unit_tests`magma::LSMTree::writeMemtable(this=0x000000011855a820, memtable=0x000000011854c7a0) at lsm_tree.cc:449:21 [opt] frame #7: 0x000000010137753f ep-engine_ep_unit_tests`magma::LSMTree::doMemtableFlushWork(this=0x000000011855a820) at lsm_tree.cc:531:18 [opt] frame #8: 0x000000010139fe62 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] magma::LSMTree::newFlush(this=<unavailable>)::$_16::operator()() const at lsm_tree.cc:993:34 [opt] frame #9: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] decltype(__f=<unavailable>)::$_16&>(fp)()) std::__1::__invoke<magma::LSMTree::newFlush()::$_16&>(magma::LSMTree::newFlush()::$_16&) at type_traits:3918:1 [opt] frame #10: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] std::__1::tuple<magma::Status, magma::CheckpointTransaction> std::__1::__invoke_void_return_wrapper<std::__1::tuple<magma::Status, magma::CheckpointTransaction>, false>::__call<magma::LSMTree::newFlush(__args=<unavailable>)::$_16&>(magma::LSMTree::newFlush()::$_16&) at invoke.h:30:16 [opt] frame #11: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] std::__1::__function::__alloc_func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() at function.h:178:16 [opt] frame #12: 0x000000010139fe59 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() at function.h:352:12 [opt] frame #13: 0x00000001012f72af ep-engine_ep_unit_tests`magma::FlushWork::Execute() [inlined] std::__1::__function::__value_func<std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() const at function.h:505:16 [opt] frame #14: 0x00000001012f7296 ep-engine_ep_unit_tests`magma::FlushWork::Execute() [inlined] std::__1::function<std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=0x0000000118131560)() const at function.h:1182:12 [opt] frame #15: 0x00000001012f7292 ep-engine_ep_unit_tests`magma::FlushWork::Execute(this=0x0000000118131560) at flush_work.cc:61:29 [opt] frame #16: 0x0000000101389d5e ep-engine_ep_unit_tests`magma::KVStore::flushMemTables(this=0x00007ff7bfefe1c0)::$_38::operator()() at kvstore.cc:515:27 [opt] frame #17: 0x0000000101388fac ep-engine_ep_unit_tests`magma::KVStore::flushMemTables(this=0x000000010442a420, wal=<unavailable>, offset=(SegID = 1, SegOffset = 4096), flushMode=<unavailable>, blockMode=Blocking) at kvstore.cc:582:16 [opt] frame #18: 0x0000000101389a5a ep-engine_ep_unit_tests`magma::KVStore::FlushMemTables(this=<unavailable>, wal=<unavailable>, flushMode=<unavailable>, blockMode=<unavailable>) at kvstore.cc:387:12 [opt] frame #19: 0x00000001012fd9ba ep-engine_ep_unit_tests`magma::Magma::Impl::syncKVStore(this=0x000000011814f000, kvID=<unavailable>, checkpoint=true) at db.cc:1352:21 [opt] frame #20: 0x000000010132678a ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] magma::Magma::Impl::CompactKVStore(this=0x00007ff7bfefe400)>)::$_7::operator()() const at db.cc:880:23 [opt] frame #21: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] magma::Magma::Impl::CompactKVStore(this=<unavailable>)>)::$_8::operator()() const at db.cc:891:21 [opt] frame #22: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] decltype(__f=<unavailable>)>)::$_8&>(fp)()) std::__1::__invoke<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&>(magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&) at type_traits:3918:1 [opt] frame #23: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] void std::__1::__invoke_void_return_wrapper<void, true>::__call<magma::Magma::Impl::CompactKVStore(__args=<unavailable>)>)::$_8&>(magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&) at invoke.h:61:9 [opt] frame #24: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] std::__1::__function::__alloc_func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator(this=<unavailable>)() at function.h:178:16 [opt] frame #25: 0x0000000101326764 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator(this=<unavailable>)() at function.h:352:12 [opt] frame #26: 0x0000000101303138 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] std::__1::__function::__value_func<void ()>::operator(this=<unavailable>)() const at function.h:505:16 [opt] frame #27: 0x000000010130312d ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] std::__1::function<void ()>::operator(this=0x00007ff7bfefe4b0)() const at function.h:1182:12 [opt] frame #28: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] magma::defer::~defer(this=0x00007ff7bfefe4b0) at common.h:92:9 [opt] frame #29: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] magma::defer::~defer(this=0x00007ff7bfefe4b0) at common.h:91:14 [opt] frame #30: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(this=<unavailable>, kvID=<unavailable>, lowKey=0x00007ff7bfefe780, highKey=0x00007ff7bfefe780, makeCallback=magma::Magma::CompactionCallbackBuilder @ 0x00007ff7bfefe550)>) at db.cc:895:1 [opt] frame #31: 0x000000010130336c ep-engine_ep_unit_tests`magma::Magma::CompactKVStore(this=<unavailable>, kvID=0, lowKey=0x00007ff7bfefe780, highKey=<unavailable>, makeCallback=<unavailable>)>) at db.cc:901:18 [opt] frame #32: 0x000000010004fd3d ep-engine_ep_unit_tests`MagmaMemoryTrackingProxy::CompactKVStore(this=<unavailable>, kvID=0, lowKey=0x00007ff7bfefe780, highKey=0x00007ff7bfefe780, makeCallback=magma::Magma::CompactionCallbackBuilder @ 0x00007ff7bfefea00)>) at magma-memory-tracking-proxy.cc:190:19 [opt] frame #33: 0x00000001000a9eeb ep-engine_ep_unit_tests`MagmaKVStore::compactDBInternal(this=<unavailable>, vbLock=0x00007ff7bfefeda0, ctx=std::__1::shared_ptr<CompactionContext>::element_type @ 0x00000001184acc20 strong=3 weak=1) at magma-kvstore.cc:2590:29 [opt] frame #34: 0x00000001000a93ad ep-engine_ep_unit_tests`MagmaKVStore::compactDB(this=0x00000001067e6500, vbLock=0x00007ff7bfefeda0, ctx=nullptr) at magma-kvstore.cc:2445:12 [opt] frame #35: 0x00000001001d7eb0 ep-engine_ep_unit_tests`EPBucket::compactInternal(this=0x00000001067e6000, vb=0x00007ff7bfefed90, config=<unavailable>) at ep_bucket.cc:1398:25 [opt] frame #36: 0x00000001001d83f6 ep-engine_ep_unit_tests`EPBucket::doCompact(this=0x00000001067e6000, vbid=(vbid = 0), config=0x00007ff7bfefedf0, cookies=size=0) at ep_bucket.cc:1476:14 [opt] 3) Key sorting issue Magma now checks for sorted keys - it turns out KV flushing is violating that ordering. Need to know if KV should fix or is the magma check required?? Example: CollectionsDcpEphemeralOrPersistent/CollectionsLegacyDcpTest.default_collection_is_not_vbucket_highseqno_with_pending/persistent_nexus_couchstore_magma_value_only CRITICAL [(SynchronousEPEngine:default) magma_0]Fatal error: Found: preceding key(d2) > current key( _collection). If history is enabled, all keys in the batch must be sorted lexicographicall The problem is that the test flushes a prepare(default collection, key=d2) and create-collection(fruit) together. The flusher orders these... \0d2 \1create_fruit This is correct. But \0d2 is marked as a prepare, when flushed to disk it goes into a special namespace. This occurs in KVStore after the sorting. \0d2 becomes \2\0d2 And magma actually sees \2\0d2 \1create_fruit and we have violated the expects Change-Id: Ica9ea1b52c51f125c9e8839a0fca412834fc25f7
1 parent 65a919c commit 0d1f2c5

15 files changed

+214
-69
lines changed

engines/ep/src/kv_bucket.cc

+5
Original file line numberDiff line numberDiff line change
@@ -3098,6 +3098,11 @@ std::chrono::seconds KVBucket::getHistoryRetentionSeconds() const {
30983098

30993099
void KVBucket::setHistoryRetentionBytes(size_t bytes) {
31003100
historyRetentionBytes = bytes;
3101+
for (auto& i : vbMap.shards) {
3102+
KVShard* shard = i.get();
3103+
shard->getRWUnderlying()->setHistoryRetentionBytes(bytes);
3104+
}
3105+
31013106
}
31023107

31033108
size_t KVBucket::getHistoryRetentionBytes() const {

engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc

+7
Original file line numberDiff line numberDiff line change
@@ -4542,3 +4542,10 @@ std::unique_ptr<TransactionContext> CouchKVStore::begin(
45424542
return std::make_unique<CouchKVStoreTransactionContext>(
45434543
*this, vbid, std::move(pcb));
45444544
}
4545+
4546+
void CouchKVStore::setHistoryRetentionBytes(size_t size) {
4547+
// no-op.
4548+
// Note: StorageProperties reports that history scan is not supported, so
4549+
// we accept this attempt to set size, but will fail if a scanAllVersions
4550+
// is attempted.
4551+
}

engines/ep/src/kvstore/couch-kvstore/couch-kvstore.h

+2
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,8 @@ class CouchKVStore : public KVStore
410410
std::unique_ptr<TransactionContext> begin(
411411
Vbid vbid, std::unique_ptr<PersistenceCallback> pcb) override;
412412

413+
void setHistoryRetentionBytes(size_t size) override;
414+
413415
protected:
414416
/**
415417
* RAII holder for a couchstore LocalDoc object

engines/ep/src/kvstore/kvstore_iface.h

+5
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,11 @@ class KVStoreIface {
763763
* @param vbid ID of the vbucket being created
764764
*/
765765
virtual void prepareToCreateImpl(Vbid vbid) = 0;
766+
767+
/**
768+
* Method to configure the amount of history a vbucket should retain.
769+
*/
770+
virtual void setHistoryRetentionBytes(size_t size) = 0;
766771
};
767772

768773
std::string to_string(KVStoreIface::ReadVBStateStatus status);

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc

+16-10
Original file line numberDiff line numberDiff line change
@@ -863,7 +863,7 @@ StorageProperties MagmaKVStore::getStorageProperties() const {
863863
StorageProperties::AutomaticDeduplication::No,
864864
StorageProperties::PrepareCounting::No,
865865
StorageProperties::CompactionStaleItemCallbacks::Yes,
866-
StorageProperties::HistoryRetentionAvailable::No);
866+
StorageProperties::HistoryRetentionAvailable::Yes);
867867
return rv;
868868
}
869869

@@ -1683,8 +1683,7 @@ std::unique_ptr<BySeqnoScanContext> MagmaKVStore::initBySeqnoScanContext(
16831683
getDroppedStatus.String());
16841684
}
16851685

1686-
// @todo:assign this using magma->GetOldestHistorySeqno(snapshot);
1687-
auto historyStartSeqno = 0;
1686+
auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot);
16881687
if (logger->should_log(spdlog::level::info)) {
16891688
logger->info(
16901689
"MagmaKVStore::initBySeqnoScanContext {} seqno:{} endSeqno:{}"
@@ -1797,8 +1796,7 @@ std::unique_ptr<ByIdScanContext> MagmaKVStore::initByIdScanContext(
17971796
return nullptr;
17981797
}
17991798

1800-
// @todo:assign this using magma->GetOldestHistorySeqno(snapshot);
1801-
auto historyStartSeqno = 0;
1799+
auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot);
18021800
logger->info(
18031801
"MagmaKVStore::initByIdScanContext {} historyStartSeqno:{} "
18041802
"KeyIterator:{}",
@@ -1818,13 +1816,16 @@ std::unique_ptr<ByIdScanContext> MagmaKVStore::initByIdScanContext(
18181816
historyStartSeqno);
18191817
}
18201818

1819+
scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
1820+
return scan(ctx, magma::Magma::SeqIterator::Mode::Snapshot);
1821+
}
1822+
18211823
scan_error_t MagmaKVStore::scanAllVersions(BySeqnoScanContext& ctx) const {
1822-
// @todo use magma's mode
1823-
// return scan(ctx, magma::Magma::SeqIterator::Mode::History);
1824-
return scan(ctx);
1824+
return scan(ctx, magma::Magma::SeqIterator::Mode::History);
18251825
}
18261826

1827-
scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
1827+
scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx,
1828+
magma::Magma::SeqIterator::Mode mode) const {
18281829
if (ctx.lastReadSeqno == ctx.maxSeqno) {
18291830
logger->TRACE("MagmaKVStore::scan {} lastReadSeqno:{} == maxSeqno:{}",
18301831
ctx.vbid,
@@ -1837,7 +1838,8 @@ scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
18371838
startSeqno = ctx.lastReadSeqno + 1;
18381839
}
18391840
auto& mctx = dynamic_cast<MagmaScanContext&>(ctx);
1840-
for (mctx.itr->Seek(startSeqno, ctx.maxSeqno); mctx.itr->Valid();
1841+
for (mctx.itr->Initialize(startSeqno, ctx.maxSeqno, mode);
1842+
mctx.itr->Valid();
18411843
mctx.itr->Next()) {
18421844
Slice keySlice, metaSlice, valSlice;
18431845
uint64_t seqno;
@@ -3710,3 +3712,7 @@ std::pair<Status, uint64_t> MagmaKVStore::getOldestRollbackableHighSeqno(
37103712

37113713
return {status, seqno};
37123714
}
3715+
3716+
void MagmaKVStore::setHistoryRetentionBytes(size_t size) {
3717+
magma->SetHistoryRetentionSize(size);
3718+
}

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h

+5
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,8 @@ class MagmaKVStore : public KVStore {
567567
std::unique_ptr<TransactionContext> begin(
568568
Vbid vbid, std::unique_ptr<PersistenceCallback> pcb) override;
569569

570+
void setHistoryRetentionBytes(size_t size) override;
571+
570572
// Magma uses a unique logger with a prefix of magma so that all logging
571573
// calls from the wrapper thru magma will be prefixed with magma.
572574
std::shared_ptr<BucketLogger> logger;
@@ -768,6 +770,9 @@ class MagmaKVStore : public KVStore {
768770
const magma::Slice& valSlice,
769771
std::function<magma::Status(magma::Slice&)> valueRead) const;
770772

773+
scan_error_t scan(BySeqnoScanContext& ctx,
774+
magma::Magma::SeqIterator::Mode mode) const;
775+
771776
MagmaKVStoreConfig& configuration;
772777

773778
/**

engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.cc

+3-1
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,9 @@ magma::Status MagmaMemoryTrackingProxy::WriteDocs(
510510
docOperations,
511511
kvsRev,
512512
wrappedDocCallback,
513-
wrappedPostCallback);
513+
wrappedPostCallback,
514+
// @todo: don't force this - must be passed down
515+
magma::Magma::HistoryMode::Enabled);
514516
}
515517

516518
magma::Status MagmaMemoryTrackingProxy::NewCheckpoint(

engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc

+7
Original file line numberDiff line numberDiff line change
@@ -3299,3 +3299,10 @@ Vbid::id_type NexusKVStore::getCacheSlot(Vbid vbid) const {
32993299
uint64_t NexusKVStore::getPurgeSeqno(Vbid vbid) const {
33003300
return purgeSeqno.at(getCacheSlot(vbid));
33013301
}
3302+
3303+
void NexusKVStore::setHistoryRetentionBytes(size_t size) {
3304+
// no-op.
3305+
// Note: StorageProperties reports that history scan is not supported, so
3306+
// we accept this attempt to set size, but will fail if a scanAllVersions
3307+
// is attempted.
3308+
}

engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h

+1
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ class NexusKVStore : public KVStoreIface {
143143
void delSystemEvent(TransactionContext& txnCtx,
144144
const queued_item item) override;
145145
void endTransaction(Vbid vbid) override;
146+
void setHistoryRetentionBytes(size_t size) override;
146147

147148
/**
148149
* Unit test only hook called before we compact the first KVStore. Public as

engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.cc

+7
Original file line numberDiff line numberDiff line change
@@ -1973,3 +1973,10 @@ RocksDBKVStoreTransactionContext::RocksDBKVStoreTransactionContext(
19731973
: TransactionContext(kvstore, vbid, std::move(cb)),
19741974
pendingReqs(std::make_unique<RocksDBKVStore::PendingRequestQueue>()) {
19751975
}
1976+
1977+
void RocksDBKVStore::setHistoryRetentionBytes(size_t size) {
1978+
// no-op.
1979+
// Note: StorageProperties reports that history scan is not supported, so
1980+
// we accept this attempt to set size, but will fail if a scanAllVersions
1981+
// is attempted.
1982+
}

engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.h

+2
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,8 @@ class RocksDBKVStore : public KVStore {
333333
std::unique_ptr<TransactionContext> begin(
334334
Vbid vbid, std::unique_ptr<PersistenceCallback> pcb) override;
335335

336+
void setHistoryRetentionBytes(size_t size) override;
337+
336338
protected:
337339
// Write a batch of updates to the given database; measuring the time
338340
// taken and adding the timer to the commit histogram.

engines/ep/tests/mock/mock_kvstore.h

+1
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ class MockKVStore : public KVStore {
225225
prepareToRollback,
226226
(Vbid vbid),
227227
(override));
228+
MOCK_METHOD(void, setHistoryRetentionBytes, (size_t size), (override));
228229

229230
/**
230231
* Helper function to replace the existing read-write KVStore in the given

engines/ep/tests/module_tests/collections/collections_kvstore_test.cc

+51-18
Original file line numberDiff line numberDiff line change
@@ -88,25 +88,53 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest {
8888

8989
void applyEvents(TransactionContext& txnCtx,
9090
VB::Commit& commitData,
91-
const CollectionsManifest& cm) {
91+
const CollectionsManifest& cm,
92+
bool writeEventNow = true) {
9293
manifest.update(*vbucket, makeManifest(cm));
9394

9495
std::vector<queued_item> events;
9596
getEventsFromCheckpoint(events);
9697

9798
for (auto& ev : events) {
9899
commitData.collections.recordSystemEvent(*ev);
100+
if (writeEventNow) {
101+
if (ev->isDeleted()) {
102+
kvstore->delSystemEvent(txnCtx, ev);
103+
} else {
104+
kvstore->setSystemEvent(txnCtx, ev);
105+
}
106+
}
107+
}
108+
if (!writeEventNow) {
109+
std::move(events.begin(),
110+
events.end(),
111+
std::back_inserter(allEvents));
112+
}
113+
}
114+
115+
void applyEvents(TransactionContext& txnCtx,
116+
const CollectionsManifest& cm,
117+
bool writeEventNow = true) {
118+
return applyEvents(txnCtx, flush, cm, writeEventNow);
119+
}
120+
121+
// This function is to be used in conjunction with applyEvents when
122+
// writeEventNow=false allowing a test to better emulate the flusher and
123+
// write keys in a sorted batch. Tests can applyEvents so that collection
124+
// metadata management does updates, but defer the system event writing
125+
// until ready to commit
126+
void sortAndWriteAllEvents(TransactionContext& txnCtx) {
127+
std::sort(allEvents.begin(),
128+
allEvents.end(),
129+
OrderItemsForDeDuplication{});
130+
for (auto& ev : allEvents) {
99131
if (ev->isDeleted()) {
100132
kvstore->delSystemEvent(txnCtx, ev);
101133
} else {
102134
kvstore->setSystemEvent(txnCtx, ev);
103135
}
104136
}
105-
}
106-
107-
void applyEvents(TransactionContext& txnCtx,
108-
const CollectionsManifest& cm) {
109-
applyEvents(txnCtx, flush, cm);
137+
allEvents.clear();
110138
}
111139

112140
void checkUid(const Collections::KVStore::Manifest& md,
@@ -219,7 +247,8 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest {
219247
VB::Commit commitData(manifest);
220248
auto ctx = kvstore->begin(vbucket->getId(),
221249
std::make_unique<PersistenceCallback>());
222-
applyEvents(*ctx, commitData, cm);
250+
applyEvents(*ctx, commitData, cm, false);
251+
sortAndWriteAllEvents(*ctx);
223252
kvstore->commit(std::move(ctx), commitData);
224253
auto [status, md] = kvstore->getCollectionsManifest(Vbid(0));
225254
EXPECT_TRUE(status);
@@ -235,6 +264,7 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest {
235264
VBucketPtr vbucket;
236265
WriteCallback wc;
237266
DeleteCallback dc;
267+
std::vector<queued_item> allEvents;
238268
};
239269

240270
class CollectionsKVStoreTest
@@ -578,19 +608,21 @@ class CollectionRessurectionKVStoreTest
578608
auto ctx = kvstore->begin(vbucket->getId(),
579609
std::make_unique<PersistenceCallback>());
580610
cm.add(targetScope);
581-
applyEvents(*ctx, cm);
611+
applyEvents(*ctx, cm, false);
582612
cm.add(target, targetScope);
583-
applyEvents(*ctx, cm);
613+
applyEvents(*ctx, cm, false);
614+
sortAndWriteAllEvents(*ctx);
584615
kvstore->commit(std::move(ctx), flush);
585616
}
586617

587618
// runs a flush batch that will leave the target collection in dropped state
588619
void dropScope() {
589620
openScopeOpenCollection();
590-
cm.remove(targetScope);
591621
auto ctx = kvstore->begin(vbucket->getId(),
592622
std::make_unique<PersistenceCallback>());
593-
applyEvents(*ctx, cm);
623+
cm.remove(targetScope);
624+
applyEvents(*ctx, cm, false);
625+
sortAndWriteAllEvents(*ctx);
594626
kvstore->commit(std::move(ctx), flush);
595627
}
596628

@@ -704,9 +736,9 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() {
704736
std::make_unique<PersistenceCallback>());
705737
if (!cm.exists(targetScope)) {
706738
cm.add(targetScope);
707-
applyEvents(*ctx, cm);
739+
applyEvents(*ctx, cm, false);
708740
cm.add(target, targetScope);
709-
applyEvents(*ctx, cm);
741+
applyEvents(*ctx, cm, false);
710742
}
711743

712744
std::string expectedName = target.name;
@@ -715,22 +747,23 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() {
715747
// iterate cycles of remove/add
716748
for (int ii = 0; ii < getCycles(); ii++) {
717749
cm.remove(scope);
718-
applyEvents(*ctx, cm);
719-
750+
applyEvents(*ctx, cm, false);
720751
if (resurectWithNewName()) {
721752
expectedName = target.name + "_" + std::to_string(ii);
722753
scope.name = targetScope.name + "_" + std::to_string(ii);
723754
}
724755
cm.add(scope);
725-
applyEvents(*ctx, cm);
756+
applyEvents(*ctx, cm, false);
726757
cm.add({expectedName, target.uid}, scope);
727-
applyEvents(*ctx, cm);
758+
applyEvents(*ctx, cm, false);
728759
}
729760

730761
if (dropCollectionAtEnd()) {
731762
cm.remove(scope);
732-
applyEvents(*ctx, cm);
763+
applyEvents(*ctx, cm, false);
733764
}
765+
766+
sortAndWriteAllEvents(*ctx);
734767
kvstore->commit(std::move(ctx), flush);
735768

736769
// Now validate

0 commit comments

Comments
 (0)