Skip to content

feat(eviction): Add eviction based on RSS memory. DRAFT #4772

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/server/dfly_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,22 @@ Usage: dragonfly [FLAGS]
// export MIMALLOC_VERBOSE=1 to see the options before the override.
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);

mi_option_enable(mi_option_purge_decommits);
DCHECK(mi_option_get(mi_option_reset_decommits) == 1);

mi_option_set(mi_option_purge_delay, 0);
DCHECK(!mi_option_get(mi_option_reset_delay));

/*
mi_option_arena_eager_commit
MIMALLOC_PAGE_RESET=1
MIMALLOC_RESET_DECOMMITS=1
MIMALLOC_ABANDONED_PAGE_RESET=1
MIMALLOC_RESET_DELAY=1

MIMALLOC_ARENA_EAGER_COMMIT=0
*/

fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize);

Expand Down
124 changes: 85 additions & 39 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ extern "C" {
#include "server/search/doc_index.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "server/tiering/common.h"
#include "server/transaction.h"
#include "util/fibers/proactor_base.h"

using namespace std;
using namespace ::dfly::tiering::literals;

ABSL_FLAG(float, mem_defrag_threshold, 0.7,
"Minimum percentage of used memory relative to maxmemory cap before running "
Expand Down Expand Up @@ -65,6 +67,9 @@ ABSL_FLAG(double, eviction_memory_budget_threshold, 0.1,
"Eviction starts when the free memory (including RSS memory) drops below "
"eviction_memory_budget_threshold * max_memory_limit.");

ABSL_FLAG(uint64_t, force_decommit_threshold, 8_MB,
"The threshold of memory to force decommit when memory is under pressure.");

ABSL_DECLARE_FLAG(uint32_t, max_eviction_per_heartbeat);

namespace dfly {
Expand Down Expand Up @@ -216,45 +221,6 @@ size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t gl
return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0;
}

/* Calculates the number of bytes to evict based on memory and rss memory usage. */
size_t CalculateEvictionBytes() {
const size_t shards_count = shard_set->size();
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);

const size_t shard_memory_budget_threshold =
size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count;

const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);

// Calculate how many bytes we need to evict on this shard
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
shard_memory_budget_threshold);

// TODO: Eviction due to rss usage is not working well as it causes eviction
// of to many keys untill we finally see decrease in rss. We need to improve
// this logic before we enable it.
/*
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
// We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
memory const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;

// Calculate how much rss memory is used by all shards
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);

// Try to evict more bytes if we are close to the rss memory limit
goal_bytes = std::max(
goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory,
shard_rss_memory_budget_threshold));
}
*/

return goal_bytes;
}

} // namespace

__thread EngineShard* EngineShard::shard_ = nullptr;
Expand Down Expand Up @@ -794,6 +760,7 @@ void EngineShard::RetireExpiredAndEvict() {
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();

size_t deleted_bytes = 0;
size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0;

for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
Expand All @@ -805,6 +772,7 @@ void EngineShard::RetireExpiredAndEvict() {
if (expt->size() > pt->size() / 4) {
DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target);

deleted_bytes += stats.deleted_bytes;
eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes));
counter_[TTL_TRAVERSE].IncBy(stats.traversed);
counter_[TTL_DELETE].IncBy(stats.deleted);
Expand All @@ -820,9 +788,87 @@ void EngineShard::RetireExpiredAndEvict() {
<< " bytes. Max eviction per heartbeat: "
<< GetFlag(FLAGS_max_eviction_per_heartbeat);

deleted_bytes += evicted_bytes;
eviction_goal -= std::min(eviction_goal, evicted_bytes);
}
}

if (deleted_bytes > 0) {
// Fast decommit without force
// No force to not preemt during decommit
mi_heap_t* heap = ServerState::tlocal()->data_heap();
mi_heap_collect(heap, false);
}

auto& deleted_bytes_before_decommit = eviction_state_.deleted_bytes_before_decommit;
eviction_state_.deleted_bytes_before_rss_update += deleted_bytes;
deleted_bytes_before_decommit += deleted_bytes;

if (deleted_bytes_before_decommit >= absl::GetFlag(FLAGS_force_decommit_threshold)) {
// Decommit with force
deleted_bytes_before_decommit = 0;
ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory);

io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();
if (sdata_res) {
const size_t total_rss = FetchRssMemory(sdata_res.value());
rss_mem_current.store(total_rss, memory_order_relaxed);
}
}
}

size_t EngineShard::CalculateEvictionBytes() {
const size_t shards_count = shard_set->size();
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);

const size_t shard_memory_budget_threshold =
size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count;

const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);

// Calculate how many bytes we need to evict on this shard
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
shard_memory_budget_threshold);

DVLOG(2) << "Memory goal bytes: " << goal_bytes << ", used memory: " << global_used_memory
<< ", memory limit: " << max_memory_limit;

// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
/* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
* memory */
const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;

// Calculate how much rss memory is used by all shards
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);

auto& global_rss_memory_at_prev_eviction = eviction_state_.global_rss_memory_at_prev_eviction;
auto& deleted_bytes_before_rss_update = eviction_state_.deleted_bytes_before_rss_update;
if (global_used_rss_memory < eviction_state_.global_rss_memory_at_prev_eviction) {
deleted_bytes_before_rss_update -=
std::min(deleted_bytes_before_rss_update,
(global_rss_memory_at_prev_eviction - global_used_rss_memory) / shards_count);
}

global_rss_memory_at_prev_eviction = global_used_rss_memory;

// Try to evict more bytes if we are close to the rss memory limit
const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard(
max_rss_memory, global_used_rss_memory - deleted_bytes_before_rss_update * shards_count,
shard_rss_memory_budget_threshold);

LOG_IF(INFO, rss_goal_bytes > 0)
<< "Rss memory goal bytes: " << rss_goal_bytes
<< ", rss used memory: " << global_used_rss_memory
<< ", rss memory limit: " << max_rss_memory
<< ", deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update;

goal_bytes = std::max(goal_bytes, rss_goal_bytes);
}
return goal_bytes;
}

void EngineShard::CacheStats() {
Expand Down
10 changes: 10 additions & 0 deletions src/server/engine_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ class EngineShard {
void ResetScanState();
};

struct EvictionTaskState {
size_t deleted_bytes_before_decommit = 0;
size_t deleted_bytes_before_rss_update = 0;
size_t global_rss_memory_at_prev_eviction = 0;
};

EngineShard(util::ProactorBase* pb, mi_heap_t* heap);

// blocks the calling fiber.
Expand All @@ -223,6 +229,9 @@ class EngineShard {
void Heartbeat();
void RetireExpiredAndEvict();

/* Calculates the number of bytes to evict based on memory and rss memory usage. */
size_t CalculateEvictionBytes();

void CacheStats();

// We are running a task that checks whether we need to
Expand Down Expand Up @@ -263,6 +272,7 @@ class EngineShard {
IntentLock shard_lock_;

uint32_t defrag_task_ = 0;
EvictionTaskState eviction_state_; // Used on eviction fiber
util::fb2::Fiber fiber_heartbeat_periodic_;
util::fb2::Done fiber_heartbeat_periodic_done_;

Expand Down
4 changes: 1 addition & 3 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -998,9 +998,7 @@ bool ShouldDenyOnOOM(const CommandId* cid) {
uint64_t start_ns = absl::GetCurrentTimeNanos();
auto memory_stats = etl.GetMemoryUsage(start_ns);

if (memory_stats.used_mem > max_memory_limit ||
(etl.rss_oom_deny_ratio > 0 &&
memory_stats.rss_mem > (max_memory_limit * etl.rss_oom_deny_ratio))) {
if (memory_stats.used_mem > max_memory_limit) {
DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss "
<< memory_stats.rss_mem << " ,limit " << max_memory_limit;
etl.stats.oom_error_cmd_cnt++;
Expand Down
Loading
Loading