Skip to content

Commit 63328b1

Browse files
committed
feat(server): flush slots traverse better yield (#4821)
Signed-off-by: adi_holden <adi@dragonflydb.io>
1 parent 419daac commit 63328b1

File tree

2 files changed

+24
-31
lines changed

2 files changed

+24
-31
lines changed

src/server/cluster/cluster_family_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ TEST_F(ClusterFamilyTest, FlushSlots) {
723723
ExpectConditionWithinTimeout([&]() {
724724
return RunPrivileged({"dflycluster", "flushslots", "0", "0"}) == "OK";
725725
});
726-
726+
util::ThisFiber::SleepFor(1ms);
727727
EXPECT_THAT(RunPrivileged({"dflycluster", "getslotinfo", "slots", "0", "1"}),
728728
RespArray(ElementsAre(
729729
RespArray(ElementsAre(IntArg(0), "key_count", IntArg(0), "total_reads", _,
@@ -732,7 +732,7 @@ TEST_F(ClusterFamilyTest, FlushSlots) {
732732
"total_writes", _, "memory_bytes", _)))));
733733

734734
EXPECT_EQ(RunPrivileged({"dflycluster", "flushslots", "0", "1"}), "OK");
735-
735+
util::ThisFiber::SleepFor(1ms);
736736
EXPECT_THAT(
737737
RunPrivileged({"dflycluster", "getslotinfo", "slots", "0", "1"}),
738738
RespArray(ElementsAre(RespArray(ElementsAre(IntArg(0), "key_count", IntArg(0), "total_reads",

src/server/db_slice.cc

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -787,63 +787,56 @@ void DbSlice::Del(Context cntx, Iterator it) {
787787
}
788788

789789
void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
790+
VLOG(1) << "Start FlushSlotsFb";
790791
// Slot deletion can take time as it traverses all the database, hence it runs in fiber.
791792
// We want to flush all the data of a slot that was added till the time the call to FlushSlotsFb
792793
// was made. Therefore we delete slots entries with version < next_version
793794
uint64_t next_version = 0;
795+
uint64_t del_count = 0;
794796

795797
std::string tmp;
796-
auto del_entry_cb = [&](PrimeTable::iterator it) {
797-
std::string_view key = it->first.GetSlice(&tmp);
798-
SlotId sid = KeySlot(key);
799-
if (slot_ids.Contains(sid) && it.GetVersion() < next_version) {
800-
PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get());
798+
auto iterate_bucket = [&](PrimeTable::bucket_iterator it) {
799+
it.AdvanceIfNotOccupied();
800+
while (!it.is_done()) {
801+
std::string_view key = it->first.GetSlice(&tmp);
802+
SlotId sid = KeySlot(key);
803+
if (slot_ids.Contains(sid) && it.GetVersion() < next_version) {
804+
PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get());
805+
++del_count;
806+
}
807+
++it;
801808
}
802-
return true;
803809
};
804810

805811
auto on_change = [&](DbIndex db_index, const ChangeReq& req) {
806812
FiberAtomicGuard fg;
807813
PrimeTable* table = GetTables(db_index).first;
808814

809-
auto iterate_bucket = [&](DbIndex db_index, PrimeTable::bucket_iterator it) {
810-
it.AdvanceIfNotOccupied();
811-
while (!it.is_done()) {
812-
del_entry_cb(it);
813-
++it;
814-
}
815-
};
816-
817815
if (const PrimeTable::bucket_iterator* bit = req.update()) {
818816
if (!bit->is_done() && bit->GetVersion() < next_version) {
819-
iterate_bucket(db_index, *bit);
817+
iterate_bucket(*bit);
820818
}
821819
} else {
822820
string_view key = get<string_view>(req.change);
823-
table->CVCUponInsert(
824-
next_version, key,
825-
[db_index, next_version, iterate_bucket](PrimeTable::bucket_iterator it) {
826-
DCHECK_LT(it.GetVersion(), next_version);
827-
iterate_bucket(db_index, it);
828-
});
821+
table->CVCUponInsert(next_version, key,
822+
[next_version, iterate_bucket](PrimeTable::bucket_iterator it) {
823+
DCHECK_LT(it.GetVersion(), next_version);
824+
iterate_bucket(it);
825+
});
829826
}
830827
};
831828
next_version = RegisterOnChange(std::move(on_change));
832829

833830
ServerState& etl = *ServerState::tlocal();
834831
PrimeTable* pt = &db_arr_[0]->prime;
835832
PrimeTable::Cursor cursor;
836-
uint64_t i = 0;
833+
837834
do {
838-
PrimeTable::Cursor next = pt->Traverse(cursor, del_entry_cb);
839-
++i;
835+
PrimeTable::Cursor next = pt->TraverseBuckets(cursor, iterate_bucket);
840836
cursor = next;
841-
if (i % 100 == 0) {
842-
ThisFiber::Yield();
843-
}
844-
837+
ThisFiber::Yield();
845838
} while (cursor && etl.gstate() != GlobalState::SHUTTING_DOWN);
846-
839+
VLOG(1) << "FlushSlotsFb del count is: " << del_count;
847840
UnregisterOnChange(next_version);
848841

849842
etl.DecommitMemory(ServerState::kDataHeap);

0 commit comments

Comments
 (0)