Commit 557fd0d

MB-54729: DCP disk backfill a CDC stream

Backfill of a CDC stream has 3 possible outcomes.

If no history is retained, the backfill behaves no differently than it
would on 7.1. This case can occur because "history_retention_bytes=0" or
because the vbucket has explicitly "dropped" any retained history (a
corner case of history replication).

If history is retained, the backfill can behave in two ways depending on
where the backfill start seqno is relative to
ScanContext::historyStartSeqno.

Case 1: start >= ScanContext::historyStartSeqno

The backfill starts inside the retained history range. The backfill
generates a single DCP snapshot marker followed by the mutations. The DCP
snapshot will state "history" and "may contain duplicates".

Case 2: start < ScanContext::historyStartSeqno

The backfill starts below the retained history range. In this case the
disk snapshot is used to generate two DCP snapshots.

The first DCP snapshot is the non-history range: a KVStore::scan from the
requested start up to, but not including, ScanContext::historyStartSeqno.

The second DCP snapshot is the history range: a KVStore::scanAllVersions
from ScanContext::historyStartSeqno to the end of the disk snapshot's
sequence index.

In case 2, both DCP snapshot markers represent the entire disk snapshot.
That is, they both state the full disk range, and the MVS/HCS come from
the entire range.

For example:

The disk seqno index range is from 1 to 20, and this is split into two
sub-ranges, non-history (nh) and history (h):

    nh{1, 10} h{11, 20}

If a backfill occurs and the requested stream start is any of 0 to 10,
DCP will transmit two ranges using two markers as follows:

    snapshot marker 1:
        snapshot-range{start, 20}
        mvs = 20, hcs = 20
        flags = disk | checkpoint
        mutations start to 10

    snapshot marker 2:
        snapshot-range{start, 20}
        mvs = 20, hcs = 20
        flags = disk | checkpoint | history | may_contain_duplicates
        mutations 11 to 20

The implementation of the "history" range adds a new optional phase to
the DCP backfill state machine.

When the backfill transitions into backfill_state_scanning, the
variations of the backfill are checked for. From here the following paths
exist.

No history:
    backfill_state_scanning -> backfill_state_completing

    In this case the full snapshot is delivered from the
    backfill_state_scanning phase.

Only history:
    backfill_state_scanning -> backfill_state_scanning_history_snapshot

    In this case the full snapshot is delivered from the
    backfill_state_scanning_history_snapshot phase. The
    backfill_state_scanning phase has only inspected the ScanContext and
    skipped to history.

Two snapshots:
    backfill_state_scanning -> backfill_state_scanning_history_snapshot

    In this case both backfill phases are delivering "snapshots", but the
    same magma snapshot is the source.

Change-Id: I5a6df7ed929d99187a74a071c1d523d904cd6f7e
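
The three outcomes described in the commit message can be sketched as a small planning function. This is an illustration only, not code from the commit: `PlannedSnapshot`, `planBackfill` and the `k...` flag constants are hypothetical names, the flag bit values are placeholders rather than the real DCP wire values, and the sketch assumes the scan begins at the requested start seqno.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Placeholder flag bits for illustration; NOT the real DCP marker values.
constexpr uint32_t kDisk = 1u << 0;
constexpr uint32_t kCheckpoint = 1u << 1;
constexpr uint32_t kHistory = 1u << 2;
constexpr uint32_t kMayContainDuplicates = 1u << 3;

// One DCP snapshot produced from a disk snapshot (hypothetical type).
struct PlannedSnapshot {
    uint64_t markerStart; // start seqno stated in the marker
    uint64_t markerEnd; // end seqno stated in the marker
    uint64_t scanFirst; // first seqno this phase scans
    uint64_t scanLast; // last seqno this phase scans
    uint32_t flags;
};

// historyStart is std::nullopt when no history is retained.
std::vector<PlannedSnapshot> planBackfill(
        uint64_t start,
        uint64_t diskEnd,
        std::optional<uint64_t> historyStart) {
    assert(start <= diskEnd);
    const uint32_t base = kDisk | kCheckpoint;
    const uint32_t history = base | kHistory | kMayContainDuplicates;

    if (!historyStart) {
        // No history: a single snapshot, as before CDC.
        return {PlannedSnapshot{start, diskEnd, start, diskEnd, base}};
    }
    if (start >= *historyStart) {
        // Case 1: the backfill starts inside the history range.
        return {PlannedSnapshot{start, diskEnd, start, diskEnd, history}};
    }
    // Case 2: two snapshots. Both markers state the full disk range.
    return {PlannedSnapshot{start, diskEnd, start, *historyStart - 1, base},
            PlannedSnapshot{
                    start, diskEnd, *historyStart, diskEnd, history}};
}
```

With the example from the commit message, `planBackfill(0, 20, 11)` yields two entries: the first scans up to seqno 10 without the history flags and the second scans 11 to 20 with them, while both markers carry the end seqno 20.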
1 parent 4c2f71c commit 557fd0d

17 files changed: +811 -73 lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 29 additions & 16 deletions
@@ -279,7 +279,8 @@ bool ActiveStream::markDiskSnapshot(uint64_t startSeqno,
                                     uint64_t endSeqno,
                                     std::optional<uint64_t> highCompletedSeqno,
                                     uint64_t maxVisibleSeqno,
-                                    std::optional<uint64_t> timestamp) {
+                                    std::optional<uint64_t> timestamp,
+                                    SnapshotSource source) {
     {
         std::unique_lock<std::mutex> lh(streamMutex);

@@ -322,7 +323,8 @@ bool ActiveStream::markDiskSnapshot(uint64_t startSeqno,
         /* We may need to send the requested 'snap_start_seqno_' as the snapshot
            start when we are sending the first snapshot because the first
            snapshot could be resumption of a previous snapshot */
-        startSeqno = adjustStartIfFirstSnapshot(startSeqno);
+        startSeqno = adjustStartIfFirstSnapshot(
+                startSeqno, source != SnapshotSource::NoHistoryPrologue);

         VBucketPtr vb = engine->getVBucket(vb_);
         if (!vb) {
@@ -361,24 +363,32 @@ bool ActiveStream::markDiskSnapshot(uint64_t startSeqno,
         auto mvsToSend = supportSyncReplication()
                                  ? std::make_optional(maxVisibleSeqno)
                                  : std::nullopt;
+
+        auto flags = MARKER_FLAG_DISK | MARKER_FLAG_CHK;
+
+        if (source == SnapshotSource::History) {
+            flags |= (MARKER_FLAG_HISTORY |
+                      MARKER_FLAG_MAY_CONTAIN_DUPLICATE_KEYS);
+        }
+
         log(spdlog::level::level_enum::info,
             "{} ActiveStream::markDiskSnapshot: Sending disk snapshot with "
-            "start {}, end {}, and high completed {}, max visible {}",
+            "start:{}, end:{}, flags:0x{:x}, hcs:{}, mvs:{}",
             logPrefix,
             startSeqno,
             endSeqno,
+            flags,
             to_string_or_none(hcsToSend),
             to_string_or_none(mvsToSend));
-        pushToReadyQ(std::make_unique<SnapshotMarker>(
-                opaque_,
-                vb_,
-                startSeqno,
-                endSeqno,
-                MARKER_FLAG_DISK | MARKER_FLAG_CHK,
-                hcsToSend,
-                mvsToSend,
-                timestamp,
-                sid));
+        pushToReadyQ(std::make_unique<SnapshotMarker>(opaque_,
+                                                      vb_,
+                                                      startSeqno,
+                                                      endSeqno,
+                                                      flags,
+                                                      hcsToSend,
+                                                      mvsToSend,
+                                                      timestamp,
+                                                      sid));
         lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);

         if (!isDiskOnly()) {
@@ -2343,7 +2353,7 @@ bool ActiveStream::isSeqnoGapAtEndOfSnapshot(uint64_t streamSeqno) const {
 void ActiveStream::sendSnapshotAndSeqnoAdvanced(CheckpointType checkpointType,
                                                 uint64_t start,
                                                 uint64_t end) {
-    start = adjustStartIfFirstSnapshot(start);
+    start = adjustStartIfFirstSnapshot(start, true);

     const auto isCkptTypeDisk = isDiskCheckpointType(checkpointType);
     uint32_t flags = isCkptTypeDisk ? MARKER_FLAG_DISK : MARKER_FLAG_MEMORY;
@@ -2364,9 +2374,12 @@ void ActiveStream::sendSnapshotAndSeqnoAdvanced(CheckpointType checkpointType,
     queueSeqnoAdvanced();
 }

-uint64_t ActiveStream::adjustStartIfFirstSnapshot(uint64_t start) {
+uint64_t ActiveStream::adjustStartIfFirstSnapshot(uint64_t start,
+                                                  bool isCompleteSnapshot) {
     if (!firstMarkerSent) {
-        firstMarkerSent = true;
+        if (isCompleteSnapshot) {
+            firstMarkerSent = true;
+        }
         return std::min(snap_start_seqno_, start);
     }
     return start;
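
The effect of the new `isCompleteSnapshot` argument can be seen in isolation with a minimal stand-in. This is a sketch, not the real class: `FirstMarkerTracker` is a hypothetical type that mirrors only the two members the function touches, with `snapStartSeqno` standing in for `snap_start_seqno_`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the relevant ActiveStream state.
struct FirstMarkerTracker {
    uint64_t snapStartSeqno; // stands in for snap_start_seqno_
    bool firstMarkerSent = false;

    // Same shape as the function in the diff: the flag is only latched
    // once a marker that completes the snapshot has been seen.
    uint64_t adjustStartIfFirstSnapshot(uint64_t start,
                                        bool isCompleteSnapshot) {
        if (!firstMarkerSent) {
            if (isCompleteSnapshot) {
                firstMarkerSent = true;
            }
            return std::min(snapStartSeqno, start);
        }
        return start;
    }
};
```

Passing `false` for a NoHistoryPrologue marker leaves `firstMarkerSent` unset, so the History marker that follows from the same disk snapshot still has its start adjusted. Only after a call with `true` is a later start returned unchanged.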

engines/ep/src/dcp/active_stream.h

Lines changed: 28 additions & 2 deletions
@@ -190,6 +190,23 @@ class ActiveStream : public Stream,

     void setBackfillRemaining_UNLOCKED(size_t value);

+    // The source of the snapshot marker
+    //
+    // History - This is a range which has history, all updates to keys will be
+    //           returned.
+    // NoHistory - A range which does not have history, all keys are the most
+    //             recent updates.
+    // NoHistoryPrologue - This is the NoHistory range from a disk snapshot also
+    //                     also contains History. The backfill will cross from
+    //                     the no-history to history ranges.
+    //
+    // NoHistoryPrologue exists to indicate the case when a disk snapshot
+    // has both History and NoHistory ranges - in this case markDiskSnapshot
+    // for example will get invoked twice by the same source backfill. First
+    // NoHistoryPrologue, second History. This allows ActiveStream to
+    // distinguish from NoHistory which will not transition to History.
+    enum SnapshotSource { History, NoHistory, NoHistoryPrologue };
+
     /**
      * Queues a snapshot marker to be sent - only if there are items in
      * the backfill range which will be sent.
@@ -206,14 +223,16 @@ class ActiveStream : public Stream,
      * @param maxVisibleSeqno seqno of last visible (commit/mutation/system
      * event) item
      * @param timestamp of the disk snapshot (if available)
+     * @param source if the snapshot is a history or non-history snapshot
      * @return If the stream has queued a snapshot marker. If this is false, the
      * stream determined none of the items in the backfill would be sent
      */
     bool markDiskSnapshot(uint64_t startSeqno,
                           uint64_t endSeqno,
                           std::optional<uint64_t> highCompletedSeqno,
                           uint64_t maxVisibleSeqno,
-                          std::optional<uint64_t> timestamp);
+                          std::optional<uint64_t> timestamp,
+                          SnapshotSource source);

     /**
      * Queues a single "Out of Seqno Order" marker with the 'start' flag
@@ -759,9 +778,16 @@ class ActiveStream : public Stream,
      * the stream.
      * If firstMarkerSent is false this call will set it to true.
      * @param start a seqno we think should be the snapshot start
+     * @param isCompleteSnapshot a boolean which was added by the History/CDC
+     *        work. This bool should be true for when the snapshot is not spread
+     *        over a >1 markers - which is what CDC can do when it has to send
+     *        a disk snapshot as NoHistory{a,b} followed by History{c,d}. If
+     *        this bool is true, the stream can state that the first snapshot
+     *        has been fully processed (the marker of the first snapshot).
      * @return the snapshot start to use
      */
-    uint64_t adjustStartIfFirstSnapshot(uint64_t start);
+    uint64_t adjustStartIfFirstSnapshot(uint64_t start,
+                                        bool isCompleteSnapshot);

     /* The last sequence number queued from memory, but is yet to be
        snapshotted and put onto readyQ */

engines/ep/src/dcp/backfill_by_id_disk.cc

Lines changed: 23 additions & 11 deletions
@@ -93,6 +93,9 @@ backfill_status_t DCPBackfillByIdDisk::create() {
         stream->setDead(cb::mcbp::DcpStreamEndStatus::BackfillFail);
         transitionState(backfill_state_done);
     } else {
+        // Will check if a history scan is required.
+        setupForHistoryScan(*stream, *scanCtx, 0);
+
         bool markerSent = stream->markOSODiskSnapshot(scanCtx->maxSeqno);
         if (markerSent) {
             transitionState(backfill_state_scanning);
@@ -125,11 +128,29 @@ backfill_status_t DCPBackfillByIdDisk::scan() {
         return backfill_success;
     }

-    transitionState(backfill_state_completing);
+    if (historyScan) {
+        complete(*stream, false);
+        transitionState(backfill_state_scanning_history_snapshot);
+    } else {
+        transitionState(backfill_state_completing);
+    }

     return backfill_success;
 }

+void DCPBackfillByIdDisk::complete(ActiveStream& stream, bool cancelled) {
+    stream.completeOSOBackfill(
+            scanCtx->maxSeqno, runtime, scanCtx->diskBytesRead);
+
+    auto severity = cancelled ? spdlog::level::level_enum::info
+                              : spdlog::level::level_enum::debug;
+    stream.log(severity,
+               "({}) Backfill task cid:{} {}",
+               vbid,
+               cid.to_string(),
+               cancelled ? "cancelled" : "finished");
+}
+
 void DCPBackfillByIdDisk::complete(bool cancelled) {
     auto stream = streamPtr.lock();
     if (!stream) {
@@ -143,16 +164,7 @@ void DCPBackfillByIdDisk::complete(bool cancelled) {
         return;
     }

-    stream->completeOSOBackfill(
-            scanCtx->maxSeqno, runtime, scanCtx->diskBytesRead);
-
-    auto severity = cancelled ? spdlog::level::level_enum::info
-                              : spdlog::level::level_enum::debug;
-    stream->log(severity,
-                "({}) Backfill task cid:{} {}",
-                vbid,
-                cid.to_string(),
-                cancelled ? "cancelled" : "finished");
+    complete(*stream, cancelled);

     transitionState(backfill_state_done);
 }

engines/ep/src/dcp/backfill_by_id_disk.h

Lines changed: 2 additions & 0 deletions
@@ -50,6 +50,8 @@ class DCPBackfillByIdDisk : public DCPBackfillDisk {
      */
     void complete(bool cancelled) override;

+    void complete(ActiveStream& stream, bool cancelled);
+
     /// collection to scan for
     CollectionID cid;
 };

engines/ep/src/dcp/backfill_by_seqno_disk.cc

Lines changed: 43 additions & 11 deletions
@@ -141,6 +141,13 @@ backfill_status_t DCPBackfillBySeqnoDisk::create() {

         stream->setDead(cb::mcbp::DcpStreamEndStatus::Rollback);
         transitionState(backfill_state_done);
+        return backfill_success;
+    }
+
+    // Check if a history scan is required or if only a history scan is required
+    if (setupForHistoryScan(*stream, *scanCtx, startSeqno)) {
+        // The scan is completely inside the history window
+        transitionState(backfill_state_scanning_history_snapshot);
     } else {
         bool markerSent = markDiskSnapshot(*stream, *scanCtx, *kvstore);

@@ -177,8 +184,12 @@ backfill_status_t DCPBackfillBySeqnoDisk::scan() {
     auto& bySeqnoCtx = dynamic_cast<BySeqnoScanContext&>(*scanCtx);
     switch (kvstore->scan(bySeqnoCtx)) {
     case scan_success:
-        stream->setBackfillScanLastRead(scanCtx->lastReadSeqno);
-        transitionState(backfill_state_completing);
+        if (historyScan) {
+            transitionState(backfill_state_scanning_history_snapshot);
+        } else {
+            stream->setBackfillScanLastRead(scanCtx->lastReadSeqno);
+            transitionState(backfill_state_completing);
+        }
         return backfill_success;
     case scan_again:
         // Scan should run again (e.g. was paused by callback)
@@ -269,11 +280,22 @@ bool DCPBackfillBySeqnoDisk::markDiskSnapshot(ActiveStream& stream,
     if (stream.getFilter().isLegacyFilter()) {
         return markLegacyDiskSnapshot(stream, scanCtx, kvs);
     }
-    return stream.markDiskSnapshot(startSeqno,
-                                   scanCtx.maxSeqno,
-                                   scanCtx.persistedCompletedSeqno,
-                                   scanCtx.maxVisibleSeqno,
-                                   scanCtx.timestamp);
+    // HistoryScan: If a disk snapshot is being "split" into non-history and
+    // history ranges, then the endSeqno of this first range should show the
+    // entire snapshot. E.g.
+    // disk snapshot is [a...d], but split
+    // no-history[a..b]
+    // history[c..d]
+    // Then all of the markers from this backfill stats start:a, end:d and mvs
+    // hcs can only be valid once d is reached.
+    return stream.markDiskSnapshot(
+            startSeqno,
+            historyScan ? historyScan->snapshotMaxSeqno : scanCtx.maxSeqno,
+            scanCtx.persistedCompletedSeqno,
+            scanCtx.maxVisibleSeqno,
+            scanCtx.timestamp,
+            historyScan ? ActiveStream::SnapshotSource::NoHistoryPrologue
+                        : ActiveStream::SnapshotSource::NoHistory);
 }

 // This function is used for backfills where the stream is configured as a
@@ -348,7 +370,8 @@ bool DCPBackfillBySeqnoDisk::markLegacyDiskSnapshot(ActiveStream& stream,
                                        scanCtx.maxSeqno,
                                        scanCtx.persistedCompletedSeqno,
                                        scanCtx.maxVisibleSeqno,
-                                       scanCtx.timestamp);
+                                       scanCtx.timestamp,
+                                       ActiveStream::SnapshotSource::NoHistory);
     }

     // Need to figure out the maxSeqno/maxVisibleSeqno for calling
@@ -433,7 +456,12 @@ bool DCPBackfillBySeqnoDisk::markLegacyDiskSnapshot(ActiveStream& stream,
         if (gv.item->isCommitted()) {
             // Step 3. If this is a committed item, done.
             return stream.markDiskSnapshot(
-                    startSeqno, stats.highSeqno, {}, stats.highSeqno, {});
+                    startSeqno,
+                    stats.highSeqno,
+                    {},
+                    stats.highSeqno,
+                    {},
+                    ActiveStream::SnapshotSource::NoHistory);
         }
     } else if (gv.getStatus() != cb::engine_errc::no_such_key) {
         stream.log(spdlog::level::level_enum::warn,
@@ -546,8 +574,12 @@ bool DCPBackfillBySeqnoDisk::markLegacyDiskSnapshot(ActiveStream& stream,
             cb.maxVisibleSeqno < backfillRangeEndSeqno) {
             stream.setEndSeqno(cb.maxVisibleSeqno);
         }
-        return stream.markDiskSnapshot(
-                startSeqno, cb.maxVisibleSeqno, {}, cb.maxVisibleSeqno, {});
+        return stream.markDiskSnapshot(startSeqno,
+                                       cb.maxVisibleSeqno,
+                                       {},
+                                       cb.maxVisibleSeqno,
+                                       {},
+                                       ActiveStream::SnapshotSource::NoHistory);
     } else {
         endStreamIfNeeded();
         // Found nothing committed at all
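
The phase selection made in `DCPBackfillBySeqnoDisk::create()` and `scan()` in this file can be summarised as two small functions. This is a sketch under stated assumptions: `Phase`, `HistoryMode`, `phaseAfterCreate` and `phaseAfterScan` are hypothetical names that collapse the `setupForHistoryScan` result and the `historyScan` member into one enum, and the sketch assumes the marker was sent so that the non-history paths enter the scanning phase. What follows the history phase is not shown in these hunks, so it is left out.

```cpp
#include <cassert>

// Hypothetical mirror of the backfill phases named in the diff.
enum class Phase { Scanning, ScanningHistorySnapshot, Completing };

// Hypothetical summary of what the history setup decided.
enum class HistoryMode {
    None, // no history retained
    OnlyHistory, // start is inside the history range
    PrologueThenHistory // start is below the history range
};

// Phase entered once create() has succeeded.
Phase phaseAfterCreate(HistoryMode mode) {
    return mode == HistoryMode::OnlyHistory ? Phase::ScanningHistorySnapshot
                                            : Phase::Scanning;
}

// Phase entered once the non-history scan reports success.
Phase phaseAfterScan(HistoryMode mode) {
    return mode == HistoryMode::None ? Phase::Completing
                                     : Phase::ScanningHistorySnapshot;
}
```

Only the `PrologueThenHistory` mode visits both scanning phases, which is the case where the same disk snapshot produces two DCP snapshots.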
