Skip to content

Commit f6ed489

Browse files
committed
MB-35458 [SR]: Move SyncWrite completion to bg DurabilityCompletionTask
[[Re-apply after fixing error in DurabilityCompletionTask::run (skipping last vBucket).]] Change how SyncWrites which are Resolved and awaiting Completion are handled, by moving the final VBucket::commit() / abort() into a background task - DurabilityCompletionTask. +Background+ There are two reasons for making this change: a) Performance - specifically latency of front-end worker threads. By moving completion into a background task, we reduce the amount of work done on the thread which actually detected the SyncWrite was resolved - typically the front-end DCP threads when a DCP_SEQNO_ACK is processed. Given that we SEQNO_ACK at the end of Snapshot, A single SEQNO_ACK could result in committing multiple SyncWrites. Committing one SyncWrite is similar to a normal front-end Set operation, so there is potentially a non-trivial amount of work needed to be done when completing SyncWrites, which could tie up the front-end thread (causing other Connections to have to wait) for a noticable amount of time. b) Simplification of lock management. Doing completion in a background task simplifies lock management, for example we avoid lock inversions with earlier locks acquired during dcpSeqnoAck when attemping to later call notifySeqnoAvailable when this was done on the original thread. +Problem+ While (a) was the first reason identified for making this change (see MB-33092), (b) is the reason this change is being made now. During testing the following lock-order-inversion was seen: WARNING: ThreadSanitizer: lock-order-inversion (potential deadlock) Cycle in lock order graph: Stream::streamMutex => StreamContainer::rwlock => Stream::streamMutex The crux of the issue is the processing of DCP_SEQNO_ACKNOWLEDGED messages by the DcpProducer - this acquires the Stream::streamMutex before calling VBucket::seqnoAcknowledged(), however that function currently results in VBucket::commit() being called to synchronously complete the SyncWrite; which in turn must nodify all connected replica that a new seqno is available, requiring StreamContainer::rwlock to be acquired: Mutex StreamContainer::rwlock acquired here while holding mutex Stream::streamMutex in thread T15: ... couchbase#6 StreamContainer<std::shared_ptr<Stream> >::rlock() #7 DcpProducer::notifySeqnoAvailable(Vbid, unsigned long) ... #13 VBucket::commit(...) #14 ActiveDurabilityMonitor::commit(...) #15 ActiveDurabilityMonitor::processCompletedSyncWriteQueue() #16 ActiveDurabilityMonitor::seqnoAckReceived(...) #17 VBucket::seqnoAcknowledged(...) #18 ActiveStream::seqnoAck(...) #19 DcpProducer::seqno_acknowledged(...) ... Mutex Stream::streamMutex previously acquired by the same thread here: ... couchbase#3 std::lock_guard<std::mutex>::lock_guard(std::mutex&) couchbase#4 ActiveStream::seqnoAck(...) couchbase#5 DcpProducer::seqno_acknowledged(...) ... This conflicts with the ordering seen when sending items out on the DCP connection - inside DcpProducer::step() where the StreamContainer::rwlock is acquired first, then ActiveStream::mutex acquired later: Mutex Stream::streamMutex acquired here while holding mutex StreamContainer::rwlock in thread T15: ... couchbase#3 std::lock_guard<std::mutex>::lock_guard(std::mutex&) couchbase#4 ActiveStream::next() couchbase#5 DcpProducer::getNextItem() couchbase#6 DcpProducer::step(dcp_message_producers*) ... Mutex StreamContainer::rwlock previously acquired by the same thread here: #0 pthread_rwlock_rdlock <null> (libtsan.so.0+0x00000002c98b) ... couchbase#4 std::shared_lock<cb::RWLock>::shared_lock(cb::RWLock&) couchbase#5 StreamContainer<>::ResumableIterationHandle::ResumableIterationHandle() couchbase#6 StreamContainer<>::startResumable() #7 DcpProducer::getNextItem() #8 DcpProducer::step(dcp_message_producers*) ... +Solution+ The processing of resolved SyncWrites moved into a new background task. Instead of immediately processing them within ActiveDM::seqnoAckReceived(), that function notifies the new NonIO DurabilityCompletionTask that there are SyncWrites waiting for completion. DurabilityCompletionTask maintains a bool per vBucket indicating if there are SyncWrites for that vBucket pending completion. When the task is run, for each flag which is true it calls VBucket::processResolvedSyncWrites() for the associated VBucket. +Implementation Notes+ Currently there is just a single DurabilityCompletionTask (per Bucket), this was chosen as 1 task per vBucket (i.e. 1024 per Bucket) would be inefficient for our current background task scheduler (both in terms of latency to schedule each task for only one vBucket's worth of work, and in terms of managing that many tasks in the future queue). However, that does _potentially_ mean there's fewer resources (threads) available to complete SyncWrites on - previously that work could be done concurrently on all frontend threads (~O(num_cpus). Now the same work only has 1 thread available to run on (there's only a single DurabilityCompletionTask). _If_ this becomes a bottleneck we could look at increasing the number of DurabilityCompletionTask - e.g. sharding all vBuckets across multiple tasks like flusher / bgfetcher. Change-Id: I33ecfa78b03b4d2120b5d05f54984b24ce038fd8 Reviewed-on: http://review.couchbase.org/113749 Reviewed-by: Ben Huddleston <ben.huddleston@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent dd410e5 commit f6ed489

35 files changed

+457
-75
lines changed

engines/ep/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ ADD_LIBRARY(ep_objs OBJECT
227227
src/defragmenter_visitor.cc
228228
src/diskdockey.cc
229229
src/durability/active_durability_monitor.cc
230+
src/durability/durability_completion_task.cc
230231
src/durability/durability_monitor.cc
231232
src/durability/durability_monitor_impl.cc
232233
src/durability/passive_durability_monitor.cc

engines/ep/benchmarks/defragmenter_bench.cc

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class DefragmentBench : public benchmark::Fixture {
5656
/*table*/ nullptr,
5757
std::make_shared<DummyCB>(),
5858
/*newSeqnoCb*/ nullptr,
59+
[](Vbid) { return; },
5960
NoopSyncWriteCompleteCb,
6061
NoopSeqnoAckCb,
6162
config,

engines/ep/benchmarks/item_compressor_bench.cc

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class ItemCompressorBench : public benchmark::Fixture {
5555
/*table*/ nullptr,
5656
std::make_shared<DummyCB>(),
5757
/*newSeqnoCb*/ nullptr,
58+
[](Vbid) { return; },
5859
NoopSyncWriteCompleteCb,
5960
NoopSeqnoAckCb,
6061
config,

engines/ep/src/durability/active_durability_monitor.cc

+13-9
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ void ActiveDurabilityMonitor::setReplicationTopology(
545545
s->setReplicationTopology(topology, *resolvedQueue);
546546
}
547547

548-
processCompletedSyncWriteQueue();
548+
checkForResolvedSyncWrites();
549549
}
550550

551551
int64_t ActiveDurabilityMonitor::getHighPreparedSeqno() const {
@@ -620,8 +620,9 @@ ENGINE_ERROR_CODE ActiveDurabilityMonitor::seqnoAckReceived(
620620
seqnoAckReceivedPostProcessHook();
621621
}
622622

623-
// Process the Completed Queue, committing all items and removing them.
624-
processCompletedSyncWriteQueue();
623+
// Check if any there's now any resolved SyncWrites which should be
624+
// completed.
625+
checkForResolvedSyncWrites();
625626

626627
return ENGINE_SUCCESS;
627628
}
@@ -640,7 +641,7 @@ void ActiveDurabilityMonitor::processTimeout(
640641
// the correct locks).
641642
state.wlock()->removeExpired(asOf, *resolvedQueue);
642643

643-
processCompletedSyncWriteQueue();
644+
checkForResolvedSyncWrites();
644645
}
645646

646647
void ActiveDurabilityMonitor::notifyLocalPersistence() {
@@ -729,6 +730,13 @@ void ActiveDurabilityMonitor::addStatsForChain(
729730
}
730731
}
731732

733+
void ActiveDurabilityMonitor::checkForResolvedSyncWrites() {
734+
if (resolvedQueue->empty()) {
735+
return;
736+
}
737+
vb.notifySyncWritesPendingCompletion();
738+
}
739+
732740
void ActiveDurabilityMonitor::processCompletedSyncWriteQueue() {
733741
std::lock_guard<ResolvedQueue::ConsumerLock> lock(
734742
resolvedQueue->getConsumerLock());
@@ -1697,11 +1705,7 @@ void ActiveDurabilityMonitor::checkForCommit() {
16971705
// the resolvedQueue (under the correct locks).
16981706
state.wlock()->updateHighPreparedSeqno(*resolvedQueue);
16991707

1700-
// @todo: Consider to commit in a dedicated function for minimizing
1701-
// contention on front-end threads, as this function is supposed to
1702-
// execute under VBucket-level lock.
1703-
1704-
processCompletedSyncWriteQueue();
1708+
checkForResolvedSyncWrites();
17051709
}
17061710

17071711
template <class exception>

engines/ep/src/durability/active_durability_monitor.h

+9-3
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,12 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
282282
*/
283283
void removedQueuedAck(const std::string& node);
284284

285+
/**
286+
* For all items in the completedSWQueue, call VBucket::commit /
287+
* VBucket::abort as appropriate, then remove the item from the queue.
288+
*/
289+
void processCompletedSyncWriteQueue();
290+
285291
/**
286292
* @return all of the currently tracked writes
287293
*/
@@ -363,10 +369,10 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
363369
const ReplicationChain& chain) const;
364370

365371
/**
366-
* For all items in the completedSWQueue, call VBucket::commit /
367-
* VBucket::abort as appropriate, then remove the item from the queue.
372+
* Checks if the resolvedQueue contains any SyncWrites awaiting completion,
373+
* and if so notifies the VBucket.
368374
*/
369-
void processCompletedSyncWriteQueue();
375+
void checkForResolvedSyncWrites();
370376

371377
// The stats object for the owning Bucket
372378
EPStats& stats;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2019 Couchbase, Inc
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "durability_completion_task.h"
19+
20+
#include "ep_engine.h"
21+
#include "executorpool.h"
22+
#include "vbucket.h"
23+
24+
#include <climits>
25+
26+
using namespace std::chrono_literals;
27+
28+
DurabilityCompletionTask::DurabilityCompletionTask(
29+
EventuallyPersistentEngine& engine)
30+
: GlobalTask(&engine, TaskId::DurabilityCompletionTask),
31+
pendingVBs(engine.getConfiguration().getMaxVbuckets()),
32+
vbid(0) {
33+
for (auto& vb : pendingVBs) {
34+
vb.store(false);
35+
}
36+
}
37+
38+
bool DurabilityCompletionTask::run() {
39+
if (engine->getEpStats().isShutdown) {
40+
return false;
41+
}
42+
43+
// Start by putting ourselves back to sleep once run() completes.
44+
// If a new VB is notified (or a VB is re-notified after it is processed in
45+
// the loop below) then that will cause the task to be re-awoken.
46+
snooze(INT_MAX);
47+
// Clear the wakeUpScheduled flag - that allows notifySyncWritesToComplete()
48+
// to wake up (re-schedule) this task if new vBuckets have SyncWrites which
49+
// need completing.
50+
wakeUpScheduled.store(false);
51+
52+
const auto startTime = std::chrono::steady_clock::now();
53+
54+
// Loop for each vBucket, starting from where we previously left off.
55+
// For each vbucket, if the pending flag is set then clear it, and process
56+
// its resolved SyncWrites.
57+
for (size_t count = 0; count < pendingVBs.size();
58+
count++, vbid = (vbid + 1) % pendingVBs.size()) {
59+
if (pendingVBs[vbid].exchange(false)) {
60+
engine->getVBucket(Vbid(vbid))->processResolvedSyncWrites();
61+
}
62+
// Yield back to scheduler if we have exceeded the maximum runtime
63+
// for a single execution.
64+
auto runtime = std::chrono::steady_clock::now() - startTime;
65+
if (runtime > maxChunkDuration) {
66+
wakeUp();
67+
break;
68+
}
69+
}
70+
71+
return true;
72+
}
73+
74+
void DurabilityCompletionTask::notifySyncWritesToComplete(Vbid vbid) {
75+
bool expected = false;
76+
if (!pendingVBs[vbid.get()].compare_exchange_strong(expected, true)) {
77+
// This VBucket transitioned from false -> true - wake ourselves up so
78+
// we can start to process the SyncWrites.
79+
expected = false;
80+
81+
// Performance: Only wake up the task once (and don't repeatedly try to
82+
// wake if it's already scheduled to wake) - ExecutorPool::wake() isn't
83+
// super cheap so avoid it if already pending.
84+
if (wakeUpScheduled.compare_exchange_strong(expected, true)) {
85+
ExecutorPool::get()->wake(getId());
86+
}
87+
}
88+
}
89+
90+
const std::chrono::steady_clock::duration
91+
DurabilityCompletionTask::maxChunkDuration = 25ms;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2019 Couchbase, Inc
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#pragma once
18+
19+
#include "globaltask.h"
20+
#include <memcached/vbucket.h>
21+
22+
/*
23+
* This task is used to complete (commit or abort) all SyncWrites which have
24+
* been resolved by each vbucket's ActiveDM.
25+
*
26+
* This is done in a separate task to reduce the amount of work done on
27+
* the thread which actually detected the SyncWrite was resolved - typically
28+
* the front-end DCP threads when a DCP_SEQNO_ACK is processed.
29+
* Given that we SEQNO_ACK at the end of Snapshot, A single SEQNO_ACK could
30+
* result in committing multiple SyncWrites, and Committing one SyncWrite is
31+
* similar to a normal front-end Set operation, we want to move this to a
32+
* background task.
33+
*
34+
* Additionally, by doing this in a background task it simplifies lock
35+
* management, for example we avoid lock inversions with earlier locks acquired
36+
* during dcpSeqnoAck when attemping to later call notifySeqnoAvailable when
37+
* this was done on the original thread.
38+
*/
39+
class DurabilityCompletionTask : public GlobalTask {
40+
public:
41+
DurabilityCompletionTask(EventuallyPersistentEngine& engine);
42+
43+
bool run() override;
44+
45+
std::string getDescription() override {
46+
return "DurabilityCompletionTask";
47+
}
48+
49+
std::chrono::microseconds maxExpectedDuration() override {
50+
// Task shouldn't run much longer than maxChunkDuration; given we yield
51+
// after that duration - however _could_ exceed a bit given we check
52+
// the duration on each vBucket. As such add a 2x margin of error.
53+
return std::chrono::duration_cast<std::chrono::microseconds>(
54+
2 * maxChunkDuration);
55+
}
56+
57+
/**
58+
* Notifies the task that the given vBucket has SyncWrite(s) ready to
59+
* be completed.
60+
* If the given vBucket isn't already pending, then will wake up the task
61+
* for it to run.
62+
*/
63+
void notifySyncWritesToComplete(Vbid vbid);
64+
65+
private:
66+
/**
67+
* A flag for each (possible) Vbid, set to true if there are SyncWrites
68+
* which need to be resolved.
69+
*/
70+
std::vector<std::atomic_bool> pendingVBs;
71+
72+
/// The index of the vBucket to check for resolved SyncWrites next
73+
/// in run().
74+
/// Kept as member variable (and not just local) so we resume from the
75+
/// vBucket we left off from, to ensure fair scheduling.
76+
int vbid;
77+
78+
/**
79+
* Flag which is used to check if a wakeup has already been scheduled for
80+
* this task.
81+
*/
82+
std::atomic<bool> wakeUpScheduled{false};
83+
84+
/// Maximum duration this task should execute for before yielding back to
85+
/// the ExecutorPool (to allow other tasks to run).
86+
static const std::chrono::steady_clock::duration maxChunkDuration;
87+
};

engines/ep/src/ep_bucket.cc

+1
Original file line numberDiff line numberDiff line change
@@ -1169,6 +1169,7 @@ VBucketPtr EPBucket::makeVBucket(
11691169
std::move(table),
11701170
flusherCb,
11711171
std::move(newSeqnoCb),
1172+
makeSyncWriteResolvedCB(),
11721173
makeSyncWriteCompleteCB(),
11731174
makeSeqnoAckCB(),
11741175
engine.getConfiguration(),

engines/ep/src/ep_vb.cc

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ EPVBucket::EPVBucket(Vbid i,
4747
std::unique_ptr<FailoverTable> table,
4848
std::shared_ptr<Callback<Vbid>> flusherCb,
4949
NewSeqnoCallback newSeqnoCb,
50+
SyncWriteResolvedCallback syncWriteResolvedCb,
5051
SyncWriteCompleteCallback syncWriteCb,
5152
SeqnoAckCallback seqnoAckCb,
5253
Configuration& config,
@@ -69,6 +70,7 @@ EPVBucket::EPVBucket(Vbid i,
6970
flusherCb,
7071
std::make_unique<StoredValueFactory>(st),
7172
std::move(newSeqnoCb),
73+
syncWriteResolvedCb,
7274
syncWriteCb,
7375
seqnoAckCb,
7476
config,

engines/ep/src/ep_vb.h

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class EPVBucket : public VBucket {
3939
std::unique_ptr<FailoverTable> table,
4040
std::shared_ptr<Callback<Vbid>> flusherCb,
4141
NewSeqnoCallback newSeqnoCb,
42+
SyncWriteResolvedCallback syncWriteResolvedCb,
4243
SyncWriteCompleteCallback syncWriteCb,
4344
SeqnoAckCallback seqnoAckCb,
4445
Configuration& config,

engines/ep/src/ephemeral_bucket.cc

+1
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ VBucketPtr EphemeralBucket::makeVBucket(
208208
lastSnapEnd,
209209
std::move(table),
210210
std::move(newSeqnoCb),
211+
makeSyncWriteResolvedCB(),
211212
makeSyncWriteCompleteCB(),
212213
makeSeqnoAckCB(),
213214
engine.getConfiguration(),

engines/ep/src/ephemeral_vb.cc

+2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ EphemeralVBucket::EphemeralVBucket(
4444
uint64_t lastSnapEnd,
4545
std::unique_ptr<FailoverTable> table,
4646
NewSeqnoCallback newSeqnoCb,
47+
SyncWriteResolvedCallback syncWriteResolvedCb,
4748
SyncWriteCompleteCallback syncWriteCb,
4849
SeqnoAckCallback seqnoAckCb,
4950
Configuration& config,
@@ -65,6 +66,7 @@ EphemeralVBucket::EphemeralVBucket(
6566
/*flusherCb*/ nullptr,
6667
std::make_unique<OrderedStoredValueFactory>(st),
6768
std::move(newSeqnoCb),
69+
syncWriteResolvedCb,
6870
syncWriteCb,
6971
seqnoAckCb,
7072
config,

engines/ep/src/ephemeral_vb.h

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class EphemeralVBucket : public VBucket {
3939
uint64_t lastSnapEnd,
4040
std::unique_ptr<FailoverTable> table,
4141
NewSeqnoCallback newSeqnoCb,
42+
SyncWriteResolvedCallback syncWriteResolvedCb,
4243
SyncWriteCompleteCallback syncWriteCb,
4344
SeqnoAckCallback seqnoAckCb,
4445
Configuration& config,

engines/ep/src/kv_bucket.cc

+13
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "connmap.h"
4040
#include "dcp/dcpconnmap.h"
4141
#include "defragmenter.h"
42+
#include "durability/durability_completion_task.h"
4243
#include "durability_timeout_task.h"
4344
#include "ep_engine.h"
4445
#include "ep_time.h"
@@ -450,6 +451,10 @@ bool KVBucket::initialize() {
450451
config.getDurabilityTimeoutTaskInterval()));
451452
ExecutorPool::get()->schedule(durabilityTimeoutTask);
452453

454+
durabilityCompletionTask =
455+
std::make_shared<DurabilityCompletionTask>(engine);
456+
ExecutorPool::get()->schedule(durabilityCompletionTask);
457+
453458
ExTask workloadMonitorTask =
454459
std::make_shared<WorkLoadMonitor>(&engine, false);
455460
ExecutorPool::get()->schedule(workloadMonitorTask);
@@ -2625,6 +2630,14 @@ uint16_t KVBucket::getNumOfVBucketsInState(vbucket_state_t state) const {
26252630
return vbMap.getVBStateCount(state);
26262631
}
26272632

2633+
SyncWriteResolvedCallback KVBucket::makeSyncWriteResolvedCB() {
2634+
return [this](Vbid vbid) {
2635+
if (this->durabilityCompletionTask) {
2636+
this->durabilityCompletionTask->notifySyncWritesToComplete(vbid);
2637+
}
2638+
};
2639+
}
2640+
26282641
SyncWriteCompleteCallback KVBucket::makeSyncWriteCompleteCB() {
26292642
auto& engine = this->engine;
26302643
return [&engine](const void* cookie, ENGINE_ERROR_CODE status) {

0 commit comments

Comments
 (0)