Skip to content

Commit ec02585

Browse files
committed
MB-35594: Don't return estimate=0 for dcp-takeover stats before backfill
The 'dcp-takeover' stats are used by ns_server to estimate how many mutations are remaining on a DCP stream. However, the estimate value is not updated until the backfill task has run once (and scanned the disk file). As such, if 'dcp-takeover' stats are requested before that first backfill task has run, then they can incorrectly report '0' backfill items. To address this, change backfillRemaining to be of type boost::optional, initialized to an empty optional. Only when the backfill scan has completed (when the number of items remaining is determined) is the optional populated. Then, when stats are requested use a new status value "calculating-item-count" if the optional is empty (i.e. before scan). Change-Id: Id7049a0c13a8aab429f137d2f4b293567e360638 Reviewed-on: http://review.couchbase.org/114894 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: James Harrison <james.harrison@couchbase.com> Reviewed-by: Ben Huddleston <ben.huddleston@couchbase.com>
1 parent a9a4c3e commit ec02585

File tree

9 files changed

+156
-52
lines changed

9 files changed

+156
-52
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
5555
isBackfillTaskRunning(false),
5656
pendingBackfill(false),
5757
lastReadSeqno(st_seqno),
58-
backfillRemaining(0),
58+
backfillRemaining(),
5959
lastReadSeqnoUnSnapshotted(st_seqno),
6060
lastSentSeqno(st_seqno),
6161
curChkSeqno(st_seqno),
@@ -477,6 +477,16 @@ void ActiveStream::setVBucketStateAckRecieved() {
477477
notifyStreamReady();
478478
}
479479

480+
void ActiveStream::setBackfillRemaining(size_t value) {
481+
std::lock_guard<std::mutex> guard(streamMutex);
482+
backfillRemaining = value;
483+
}
484+
485+
void ActiveStream::clearBackfillRemaining() {
486+
std::lock_guard<std::mutex> guard(streamMutex);
487+
backfillRemaining.reset();
488+
}
489+
480490
std::unique_ptr<DcpResponse> ActiveStream::backfillPhase(
481491
std::lock_guard<std::mutex>& lh) {
482492
auto resp = nextQueuedItem();
@@ -505,15 +515,15 @@ std::unique_ptr<DcpResponse> ActiveStream::backfillPhase(
505515
// Only DcpResponse objects representing items from "disk" have a size
506516
// so only update backfillRemaining when non-zero
507517
if (resp->getApproximateSize()) {
508-
if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
509-
backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
518+
Expects(backfillRemaining.is_initialized());
519+
if (*backfillRemaining > 0) {
520+
(*backfillRemaining)--;
510521
}
511522
}
512523
}
513524

514525
if (!isBackfillTaskRunning && readyQ.empty()) {
515526
// Given readyQ.empty() is True resp will be NULL
516-
backfillRemaining.store(0, std::memory_order_relaxed);
517527
// The previous backfill has completed. Check to see if another
518528
// backfill needs to be scheduled.
519529
if (pendingBackfill) {
@@ -747,20 +757,24 @@ void ActiveStream::addTakeoverStats(const AddStatFn& add_stat,
747757
return;
748758
}
749759

750-
size_t total = backfillRemaining.load(std::memory_order_relaxed);
751-
if (backfillRemaining == 0) {
752-
Expects(!isPending());
753-
}
754-
760+
size_t total = 0;
761+
const char* status = nullptr;
755762
if (isBackfilling()) {
756-
add_casted_stat("status", "backfilling", add_stat, cookie);
763+
if (backfillRemaining) {
764+
status = "backfilling";
765+
total += *backfillRemaining;
766+
} else {
767+
status = "calculating-item-count";
768+
}
757769
} else {
758-
add_casted_stat("status", "in-memory", add_stat, cookie);
770+
status = "in-memory";
771+
}
772+
add_casted_stat("status", status, add_stat, cookie);
773+
774+
if (backfillRemaining) {
775+
add_casted_stat(
776+
"backfillRemaining", *backfillRemaining, add_stat, cookie);
759777
}
760-
add_casted_stat("backfillRemaining",
761-
backfillRemaining.load(std::memory_order_relaxed),
762-
add_stat,
763-
cookie);
764778

765779
size_t vb_items = vb.getNumItems();
766780
size_t chk_items = 0;
@@ -1422,6 +1436,9 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
14221436
producer->scheduleBackfillManager(
14231437
*vbucket, shared_from_this(), backfillStart, backfillEnd);
14241438
isBackfillTaskRunning.store(true);
1439+
/// Number of backfill items is unknown until the Backfill task
1440+
/// completes the scan phase - reset backfillRemaining counter.
1441+
backfillRemaining.reset();
14251442
} else {
14261443
if (reschedule) {
14271444
// Infrequent code path, see comment below.

engines/ep/src/dcp/active_stream.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,12 @@ class ActiveStream : public Stream,
144144

145145
void setVBucketStateAckRecieved();
146146

147-
void incrBackfillRemaining(size_t by) {
148-
backfillRemaining.fetch_add(by, std::memory_order_relaxed);
149-
}
147+
/// Set the number of backfill items remaining to the given value.
148+
void setBackfillRemaining(size_t value);
149+
150+
/// Clears the number of backfill items remaining, setting to an empty
151+
/// (unknown) value.
152+
void clearBackfillRemaining();
150153

151154
void markDiskSnapshot(uint64_t startSeqno,
152155
uint64_t endSeqno,
@@ -352,12 +355,13 @@ class ActiveStream : public Stream,
352355
snapshotted and put onto readyQ */
353356
AtomicMonotonic<uint64_t, ThrowExceptionPolicy> lastReadSeqno;
354357

355-
/* backfillRemaining is a stat recording the amount of
356-
* items remaining to be read from disk. It is an atomic
357-
* because otherwise the function incrBackfillRemaining
358-
* must acquire the streamMutex lock.
358+
/* backfillRemaining is a stat recording the amount of items remaining to
359+
* be read from disk.
360+
* Before the number of items to be backfilled has been determined (disk
361+
* scanned) it is empty.
362+
* Guarded by streamMutex.
359363
*/
360-
std::atomic<size_t> backfillRemaining;
364+
boost::optional<size_t> backfillRemaining;
361365

362366
std::unique_ptr<DcpResponse> backfillPhase(std::lock_guard<std::mutex>& lh);
363367

engines/ep/src/dcp/backfill_disk.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ backfill_status_t DCPBackfillDisk::create() {
247247
stream->setDead(status);
248248
transitionState(backfill_state_done);
249249
} else {
250-
stream->incrBackfillRemaining(scanCtx->documentCount);
250+
stream->setBackfillRemaining(scanCtx->documentCount);
251251
stream->markDiskSnapshot(
252252
startSeqno, scanCtx->maxSeqno, scanCtx->highCompletedSeqno);
253253
transitionState(backfill_state_scanning);

engines/ep/src/dcp/backfill_memory.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,11 @@ backfill_status_t DCPBackfillMemoryBuffered::create() {
150150
remaining count */
151151
while (rangeItr.curr() != rangeItr.end()) {
152152
if (static_cast<uint64_t>((*rangeItr).getBySeqno()) >= startSeqno) {
153-
/* Incr backfill remaining
153+
/* Set backfill remaining
154154
[EPHE TODO]: This will be inaccurate if do not backfill till end
155155
of the iterator
156156
*/
157-
stream->incrBackfillRemaining(rangeItr.count());
157+
stream->setBackfillRemaining(rangeItr.count());
158158

159159
/* Determine the endSeqno of the current snapshot.
160160
We want to send till requested endSeqno, but if that cannot

engines/ep/tests/ep_test_apis.cc

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1641,25 +1641,6 @@ void wait_for_stat_to_be_gte(EngineIface* h,
16411641
}
16421642
}
16431643

1644-
void wait_for_stat_to_be_lte(EngineIface* h,
1645-
const char* stat,
1646-
int final,
1647-
const char* stat_key,
1648-
const time_t max_wait_time_in_secs) {
1649-
useconds_t sleepTime = 128;
1650-
WaitTimeAccumulator<int> accumulator("to be less than or equal to", stat,
1651-
stat_key, final,
1652-
max_wait_time_in_secs);
1653-
for (;;) {
1654-
auto current = get_int_stat(h, stat, stat_key);
1655-
if (current <= final) {
1656-
break;
1657-
}
1658-
accumulator.incrementAndAbortIfLimitReached(current, sleepTime);
1659-
decayingSleep(&sleepTime);
1660-
}
1661-
}
1662-
16631644
void wait_for_expired_items_to_be(EngineIface* h,
16641645
int final,
16651646
const time_t max_wait_time_in_secs) {

engines/ep/tests/ep_test_apis.h

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,11 +472,14 @@ void wait_for_stat_to_be_gte(EngineIface* h,
472472
int final,
473473
const char* stat_key = NULL,
474474
const time_t max_wait_time_in_secs = 60);
475+
476+
template <typename T>
475477
void wait_for_stat_to_be_lte(EngineIface* h,
476478
const char* stat,
477-
int final,
479+
T final,
478480
const char* stat_key = NULL,
479481
const time_t max_wait_time_in_secs = 60);
482+
480483
void wait_for_expired_items_to_be(EngineIface* h,
481484
int final,
482485
const time_t max_wait_time_in_secs = 60);
@@ -677,6 +680,28 @@ void wait_for_stat_to_be(EngineIface* h,
677680
}
678681
}
679682

683+
template <typename T>
684+
void wait_for_stat_to_be_lte(EngineIface* h,
685+
const char* stat,
686+
T final,
687+
const char* stat_key,
688+
const time_t max_wait_time_in_secs) {
689+
useconds_t sleepTime = 128;
690+
WaitTimeAccumulator<T> accumulator("to be less than or equal to",
691+
stat,
692+
stat_key,
693+
final,
694+
max_wait_time_in_secs);
695+
for (;;) {
696+
auto current = get_stat<T>(h, stat, stat_key);
697+
if (current <= final) {
698+
break;
699+
}
700+
accumulator.incrementAndAbortIfLimitReached(current, sleepTime);
701+
decayingSleep(&sleepTime);
702+
}
703+
}
704+
680705
/**
681706
* Function that does an exponential wait for a 'val' to reach 'expected'
682707
*

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -788,10 +788,8 @@ ENGINE_ERROR_CODE TestDcpConsumer::openStreams() {
788788
std::stringstream stats_takeover;
789789
stats_takeover << "dcp-vbtakeover " << ctx.vbucket.get() << " "
790790
<< name.c_str();
791-
wait_for_stat_to_be_lte(h,
792-
"estimate",
793-
static_cast<int>(est),
794-
stats_takeover.str().c_str());
791+
wait_for_stat_to_be_lte(
792+
h, "estimate", est, stats_takeover.str().c_str());
795793
}
796794

797795
if (ctx.flags & DCP_ADD_STREAM_FLAG_DISKONLY) {

engines/ep/tests/mock/mock_stream.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class MockActiveStream : public ActiveStream {
107107
return lastReadSeqno;
108108
}
109109

110-
int getNumBackfillItemsRemaining() const {
110+
boost::optional<size_t> getNumBackfillItemsRemaining() const {
111111
return backfillRemaining;
112112
}
113113

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -739,7 +739,7 @@ TEST_P(StreamTest, BackfillOnly) {
739739

740740
// Check that backfill stats have been updated correctly
741741
EXPECT_EQ(numItems, stream->getNumBackfillItems());
742-
EXPECT_EQ(numItems, stream->getNumBackfillItemsRemaining());
742+
EXPECT_EQ(numItems, *stream->getNumBackfillItemsRemaining());
743743

744744
destroy_dcp_stream();
745745
}
@@ -1554,6 +1554,85 @@ TEST_P(SingleThreadedActiveStreamTest, DiskSnapshotSendsChkMarker) {
15541554
producer->cancelCheckpointCreatorTask();
15551555
}
15561556

1557+
/// Test that disk backfill remaining isn't prematurely zero (before counts
1558+
/// read from disk by backfill task).
1559+
TEST_P(SingleThreadedActiveStreamTest, DiskBackfillInitializingItemsRemaining) {
1560+
auto vb = engine->getVBucket(vbid);
1561+
auto& ckptMgr = *vb->checkpointManager;
1562+
1563+
// Delete initial stream (so we can re-create after items are only available
1564+
// from disk).
1565+
stream.reset();
1566+
1567+
// Store 3 items (to check backfill remaining counts).
1568+
// Add items, flush it to disk, then clear checkpoint to force backfill.
1569+
store_item(vbid, makeStoredDocKey("key1"), "value");
1570+
store_item(vbid, makeStoredDocKey("key2"), "value");
1571+
store_item(vbid, makeStoredDocKey("key3"), "value");
1572+
ckptMgr.createNewCheckpoint();
1573+
1574+
flushVBucketToDiskIfPersistent(vbid, 3);
1575+
1576+
bool newCKptCreated;
1577+
ASSERT_EQ(3, ckptMgr.removeClosedUnrefCheckpoints(*vb, newCKptCreated));
1578+
1579+
// Re-create producer now we have items only on disk.
1580+
setupProducer();
1581+
ASSERT_TRUE(stream->isBackfilling());
1582+
1583+
// Should report empty itemsRemaining, as a value of zero would mislead
1584+
// ns_server if they asked for stats before the backfill task runs (they
1585+
// would think backfill is complete).
1586+
EXPECT_FALSE(stream->getNumBackfillItemsRemaining());
1587+
1588+
bool statusFound = false;
1589+
auto checkStatusFn = [&statusFound](const char* key,
1590+
const uint16_t klen,
1591+
const char* val,
1592+
const uint32_t vlen,
1593+
gsl::not_null<const void*> cookie) {
1594+
if (std::string(key, klen) == "status") {
1595+
EXPECT_EQ(std::string(reinterpret_cast<const char*>(cookie.get())),
1596+
std::string(val, vlen));
1597+
statusFound = true;
1598+
}
1599+
};
1600+
1601+
// Should report status == "calculating-item-count" before backfill
1602+
// scan has occurred.
1603+
stream->addTakeoverStats(checkStatusFn, "calculating-item-count", *vb);
1604+
EXPECT_TRUE(statusFound);
1605+
1606+
// Run the backfill we scheduled when we transitioned to the backfilling
1607+
// state. Run the backfill task once to get initial item counts.
1608+
auto& bfm = producer->getBFM();
1609+
bfm.backfill();
1610+
EXPECT_EQ(3, *stream->getNumBackfillItemsRemaining());
1611+
// Should report status == "backfilling"
1612+
statusFound = false;
1613+
stream->addTakeoverStats(checkStatusFn, "backfilling", *vb);
1614+
EXPECT_TRUE(statusFound);
1615+
1616+
// Run again to actually scan (items remaining unchanged).
1617+
bfm.backfill();
1618+
EXPECT_EQ(3, *stream->getNumBackfillItemsRemaining());
1619+
statusFound = false;
1620+
stream->addTakeoverStats(checkStatusFn, "backfilling", *vb);
1621+
EXPECT_TRUE(statusFound);
1622+
1623+
// Finally run again to complete backfill (so it is shutdown in a clean
1624+
// fashion).
1625+
bfm.backfill();
1626+
1627+
// Consume the items from backfill; should update items remaining.
1628+
// Actually need to consume 4 items (snapshot_marker + 3x mutation).
1629+
stream->consumeBackfillItems(4);
1630+
EXPECT_EQ(0, *stream->getNumBackfillItemsRemaining());
1631+
statusFound = false;
1632+
stream->addTakeoverStats(checkStatusFn, "in-memory", *vb);
1633+
EXPECT_TRUE(statusFound);
1634+
}
1635+
15571636
/*
15581637
* MB-31410: In this test I simulate a DcpConsumer that receives messages
15591638
* while previous messages have been buffered. This simulates the system

0 commit comments

Comments
 (0)