From 2cd1cbb200908f78ca7502c5cb41a181dee3822e Mon Sep 17 00:00:00 2001 From: Su Lifan Date: Wed, 24 Nov 2021 15:15:50 +0800 Subject: [PATCH 1/5] Initial support for keys following zipfian This patch allows memtier generate keys that follows zipfian distribution. An additional parameter --key-zipf-exp is introduced, meaning P(Key = n) ~ n^{-exp}, which is bounded to (0, 5) to be sane. The range of keys are limited to positive in this version. Signed-off-by: Su Lifan --- client.h | 4 ++ memtier_benchmark.1 | 5 ++ memtier_benchmark.cpp | 25 +++++++++- memtier_benchmark.h | 1 + obj_gen.cpp | 106 +++++++++++++++++++++++++++++++++++++++++- obj_gen.h | 18 ++++++- 6 files changed, 156 insertions(+), 3 deletions(-) diff --git a/client.h b/client.h index 06da93bd..08ac0875 100755 --- a/client.h +++ b/client.h @@ -124,6 +124,8 @@ class client : public connections_manager { return OBJECT_GENERATOR_KEY_RANDOM; } else if (cfg->key_pattern[index] == 'G') { return OBJECT_GENERATOR_KEY_GAUSSIAN; + } else if (cfg->key_pattern[index] == 'Z') { + return OBJECT_GENERATOR_KEY_ZIPFIAN; } else { if (index == key_pattern_set) return OBJECT_GENERATOR_KEY_SET_ITER; @@ -137,6 +139,8 @@ class client : public connections_manager { return OBJECT_GENERATOR_KEY_RANDOM; } else if (cmd->key_pattern == 'G') { return OBJECT_GENERATOR_KEY_GAUSSIAN; + } else if (cmd->key_pattern == 'Z') { + return OBJECT_GENERATOR_KEY_ZIPFIAN; } else { return index; } diff --git a/memtier_benchmark.1 b/memtier_benchmark.1 index c363b5bb..c67659df 100644 --- a/memtier_benchmark.1 +++ b/memtier_benchmark.1 @@ -184,6 +184,7 @@ Key ID maximum value (default: 10000000) \fB\-\-key\-pattern\fR=\fI\,PATTERN\/\fR Set:Get pattern (default: R:R) G for Gaussian distribution. +Z for Zipfian distribution (will limit keys to positive). R for uniform Random. S for Sequential. P for Parallel (Sequential were each client has a subset of the key\-range). @@ -195,6 +196,10 @@ The standard deviation used in the Gaussian distribution \fB\-\-key\-median\fR The median point used in the Gaussian distribution (default is the center of the key range) +.TP +\fB\-\-key\-zipf\-exp\fR +The exponent used in the zipf distribution, limit to (0, 5) +(default is 1, though any number >2 seems insane)\n .SS "WAIT Options:" .TP \fB\-\-wait\-ratio\fR=\fI\,RATIO\/\fR diff --git a/memtier_benchmark.cpp b/memtier_benchmark.cpp index c2b91ab7..318ae971 100755 --- a/memtier_benchmark.cpp +++ b/memtier_benchmark.cpp @@ -225,6 +225,7 @@ static void config_print_to_json(json_handler * jsonhandler, struct benchmark_co jsonhandler->write_obj("key_pattern" ,"\"%s\"", cfg->key_pattern); jsonhandler->write_obj("key_stddev" ,"%f", cfg->key_stddev); jsonhandler->write_obj("key_median" ,"%f", cfg->key_median); + jsonhandler->write_obj("key_zipf_exp" ,"%f", cfg->key_zipf_exp); jsonhandler->write_obj("reconnect_interval","%u", cfg->reconnect_interval); jsonhandler->write_obj("multi_key_get" ,"%u", cfg->multi_key_get); jsonhandler->write_obj("authenticate" ,"\"%s\"", cfg->authenticate ? cfg->authenticate : ""); @@ -365,6 +366,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf o_key_pattern, o_key_stddev, o_key_median, + o_key_zipf_exp, o_show_config, o_hide_histogram, o_print_percentiles, @@ -439,6 +441,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf { "key-pattern", 1, 0, o_key_pattern }, { "key-stddev", 1, 0, o_key_stddev }, { "key-median", 1, 0, o_key_median }, + { "key-zipf-exp", 1, 0, o_key_zipf_exp}, { "reconnect-interval", 1, 0, o_reconnect_interval }, { "multi-key-get", 1, 0, o_multi_key_get }, { "authenticate", 1, 0, 'a' }, @@ -690,6 +693,14 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf return -1; } break; + case o_key_zipf_exp: + endptr = NULL; + cfg->key_zipf_exp = strtod(optarg, &endptr); + if (cfg->key_zipf_exp <= 0 || cfg->key_zipf_exp >= 5 || !endptr || *endptr != '\0') { + fprintf(stderr, "error: key-zipf-exp must be within interval (0, 5).\n"); + return -1; + } + break; case o_key_pattern: cfg->key_pattern = optarg; @@ -697,12 +708,14 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf (cfg->key_pattern[key_pattern_set] != 'R' && cfg->key_pattern[key_pattern_set] != 'S' && cfg->key_pattern[key_pattern_set] != 'G' && + cfg->key_pattern[key_pattern_set] != 'Z' && cfg->key_pattern[key_pattern_set] != 'P') || (cfg->key_pattern[key_pattern_get] != 'R' && cfg->key_pattern[key_pattern_get] != 'S' && cfg->key_pattern[key_pattern_get] != 'G' && + cfg->key_pattern[key_pattern_get] != 'Z' && cfg->key_pattern[key_pattern_get] != 'P')) { - fprintf(stderr, "error: key-pattern must be in the format of [S/R/G/P]:[S/R/G/P].\n"); + fprintf(stderr, "error: key-pattern must be in the format of [S/R/G/P/Z]:[S/R/G/P/Z].\n"); return -1; } @@ -939,12 +952,15 @@ void usage() { " --key-pattern=PATTERN Set:Get pattern (default: R:R)\n" " G for Gaussian distribution.\n" " R for uniform Random.\n" + " Z for zipf distribution (will limit keys to positive).\n" " S for Sequential.\n" " P for Parallel (Sequential were each client has a subset of the key-range).\n" " --key-stddev The standard deviation used in the Gaussian distribution\n" " (default is key range / 6)\n" " --key-median The median point used in the Gaussian distribution\n" " (default is the center of the key range)\n" + " --key-zipf-exp The exponent used in the zipf distribution, limit to (0, 5)\n" + " (default is 1, though any number >2 seems insane)\n" "\n" "WAIT Options:\n" " --wait-ratio=RATIO Set:Wait ratio (default is no WAIT commands - 1:0)\n" @@ -1465,6 +1481,13 @@ int main(int argc, char *argv[]) obj_gen->set_key_distribution(cfg.key_stddev, cfg.key_median); } obj_gen->set_expiry_range(cfg.expiry_range.min, cfg.expiry_range.max); + if (cfg.key_pattern[key_pattern_set] == 'Z' || cfg.key_pattern[key_pattern_get] == 'Z') { + if (cfg.key_zipf_exp == 0.0) { + // user can't specify 0.0, so 0.0 means unset + cfg.key_zipf_exp = 1.0; + } + obj_gen->set_key_zipf_distribution(cfg.key_zipf_exp); + } // Prepare output file FILE *outfile; diff --git a/memtier_benchmark.h b/memtier_benchmark.h index 7bbab0a3..c5ff1568 100644 --- a/memtier_benchmark.h +++ b/memtier_benchmark.h @@ -79,6 +79,7 @@ struct benchmark_config { unsigned long long key_maximum; double key_stddev; double key_median; + double key_zipf_exp; const char *key_pattern; unsigned int reconnect_interval; int multi_key_get; diff --git a/obj_gen.cpp b/obj_gen.cpp index 1a6168e0..3183877f 100644 --- a/obj_gen.cpp +++ b/obj_gen.cpp @@ -150,6 +150,14 @@ object_generator::object_generator(size_t n_key_iterators/*= OBJECT_GENERATOR_KE m_key_max(0), m_key_stddev(0), m_key_median(0), + m_key_zipf_min(0), + m_key_zipf_max(0), + m_key_zipf_exp(1), + m_key_zipf_1mexp(0), + m_key_zipf_1mexpInv(0), + m_key_zipf_Hmin(0), + m_key_zipf_Hmax(0), + m_key_zipf_s(0), m_value_buffer(NULL), m_random_fd(-1), m_value_buffer_size(0), @@ -172,6 +180,14 @@ object_generator::object_generator(const object_generator& copy) : m_key_max(copy.m_key_max), m_key_stddev(copy.m_key_stddev), m_key_median(copy.m_key_median), + m_key_zipf_min(copy.m_key_zipf_min), + m_key_zipf_max(copy.m_key_zipf_max), + m_key_zipf_exp(copy.m_key_zipf_exp), + m_key_zipf_1mexp(copy.m_key_zipf_1mexp), + m_key_zipf_1mexpInv(copy.m_key_zipf_1mexpInv), + m_key_zipf_Hmin(copy.m_key_zipf_Hmin), + m_key_zipf_Hmax(copy.m_key_zipf_Hmax), + m_key_zipf_s(copy.m_key_zipf_s), m_value_buffer(NULL), m_random_fd(-1), m_value_buffer_size(0), @@ -348,6 +364,47 @@ void object_generator::set_key_distribution(double key_stddev, double key_median m_key_median = key_median; } +// should be called after set_key_range in memtier_benchmark.cpp +void object_generator::set_key_zipf_distribution(double key_exp) +{ + const double eps = 1e-4; + + if (key_exp < eps) + m_key_zipf_exp = 0.; + else if (fabs(key_exp - 1) < eps) + m_key_zipf_exp = 1.; + else + m_key_zipf_exp = key_exp; + + if (m_key_min == 0) + m_key_zipf_min = 1; + else + m_key_zipf_min = m_key_min; + + if (m_key_max <= m_key_zipf_min) + m_key_zipf_max = m_key_zipf_min; + else + m_key_zipf_max = m_key_max; + + if (m_key_zipf_exp < eps) + return; // degenerated to uniform distribution + else if (fabs(key_exp - 1) < eps) { + m_key_zipf_Hmin = log(m_key_zipf_min + 0.5) - 1. / m_key_zipf_min; + m_key_zipf_Hmax = log(m_key_zipf_max + 0.5); + double t = log(m_key_zipf_min + 1.5) - 1. / (m_key_zipf_min + 1); + m_key_zipf_s = m_key_zipf_min + 1 - exp(t); + } else { + m_key_zipf_1mexp = 1. - m_key_zipf_exp; + m_key_zipf_1mexpInv = 1. / m_key_zipf_1mexp; + m_key_zipf_Hmin = pow(m_key_zipf_min + 0.5, m_key_zipf_1mexp) - + m_key_zipf_1mexp * pow(m_key_zipf_min, -m_key_zipf_exp); + m_key_zipf_Hmax = pow(m_key_zipf_max + 0.5, m_key_zipf_1mexp); + double t = pow(m_key_zipf_min + 1.5, m_key_zipf_1mexp) - + m_key_zipf_1mexp * pow(m_key_zipf_min + 1, -m_key_zipf_exp); + m_key_zipf_s = m_key_zipf_min + 1 - pow(t, m_key_zipf_1mexpInv); + } +} + // return a random number between r_min and r_max unsigned long long object_generator::random_range(unsigned long long r_min, unsigned long long r_max) { @@ -361,15 +418,62 @@ unsigned long long object_generator::normal_distribution(unsigned long long r_mi return m_random.gaussian_distribution_range(r_stddev, r_median, r_min, r_max); } +// following sampler is based on: +// Rejection-inversion to generate variates from monotone discrete distributions +// ACM Transactions on Modeling and Computer Simulation. +// Volume 6 Issue 3 July 1996 pp 169–184 +// https://doi.org/10.1145/235025.235029 +unsigned long long object_generator::zipf_distribution() +{ + const double eps = 1e-4; + + if (m_key_zipf_exp < eps) + return random_range(m_key_zipf_min, m_key_zipf_max); + else if (fabs(m_key_zipf_exp - 1.0) < eps) { + while (true) { + double p = m_random.get_random() / (double)(m_random.get_random_max()); + double u = p * (m_key_zipf_Hmax - m_key_zipf_Hmin) + m_key_zipf_Hmin; + double x = exp(u); + if (x < m_key_zipf_min - 0.5) + x = m_key_zipf_min + 0.5; + if (x >= m_key_zipf_max + 0.5) + x = m_key_zipf_max; + double k = floor(x + 0.5); + if (k - x <= m_key_zipf_s) + return k; + if (u > log(k + 0.5) - 1. / k) + return k; + } + } else { + while (true) { + double p = m_random.get_random() / (double)(m_random.get_random_max()); + double u = p * (m_key_zipf_Hmax - m_key_zipf_Hmin) + m_key_zipf_Hmin; + double x = pow(u, m_key_zipf_1mexpInv); + if (x < m_key_zipf_min - 0.5) + x = m_key_zipf_min + 0.5; + if (x >= m_key_zipf_max + 0.5) + x = m_key_zipf_max; + double k = floor(x + 0.5); + if (k - x <= m_key_zipf_s) + return k; + double t = (u - pow(k + 0.5, m_key_zipf_1mexp)); + if (m_key_zipf_1mexpInv * t > -pow(k, -m_key_zipf_exp)) + return k; + } + } +} + unsigned long long object_generator::get_key_index(int iter) { - assert(iter < static_cast(m_next_key.size()) && iter >= OBJECT_GENERATOR_KEY_GAUSSIAN); + assert(iter < static_cast(m_next_key.size()) && iter >= OBJECT_GENERATOR_KEY_ZIPFIAN); unsigned long long k; if (iter==OBJECT_GENERATOR_KEY_RANDOM) { k = random_range(m_key_min, m_key_max); } else if(iter==OBJECT_GENERATOR_KEY_GAUSSIAN) { k = normal_distribution(m_key_min, m_key_max, m_key_stddev, m_key_median); + } else if(iter == OBJECT_GENERATOR_KEY_ZIPFIAN) { + k = zipf_distribution(); } else { if (m_next_key[iter] < m_key_min) m_next_key[iter] = m_key_min; diff --git a/obj_gen.h b/obj_gen.h index e1c1cc0b..89ae57a1 100644 --- a/obj_gen.h +++ b/obj_gen.h @@ -47,7 +47,7 @@ class gaussian_noise: public random_generator { private: double gaussian_distribution(const double &stddev); bool m_hasSpare; - double m_spare; + double m_spare; }; class data_object { @@ -75,6 +75,7 @@ class data_object { #define OBJECT_GENERATOR_KEY_GET_ITER 0 #define OBJECT_GENERATOR_KEY_RANDOM -1 #define OBJECT_GENERATOR_KEY_GAUSSIAN -2 +#define OBJECT_GENERATOR_KEY_ZIPFIAN -3 class object_generator { public: @@ -98,6 +99,19 @@ class object_generator { unsigned long long m_key_max; double m_key_stddev; double m_key_median; + + // zipf will only be used for key generation + // adjusted min and max key for zipf, may be difference from user specified + unsigned long long m_key_zipf_min; + unsigned long long m_key_zipf_max; + // other persist data across generations + double m_key_zipf_exp; + double m_key_zipf_1mexp; + double m_key_zipf_1mexpInv; + double m_key_zipf_Hmin; + double m_key_zipf_Hmax; + double m_key_zipf_s; + data_object m_object; std::vector m_next_key; @@ -121,6 +135,7 @@ class object_generator { unsigned long long random_range(unsigned long long r_min, unsigned long long r_max); unsigned long long normal_distribution(unsigned long long r_min, unsigned long long r_max, double r_stddev, double r_median); + unsigned long long zipf_distribution(); void set_random_data(bool random_data); void set_data_size_fixed(unsigned int size); @@ -131,6 +146,7 @@ class object_generator { void set_key_prefix(const char *key_prefix); void set_key_range(unsigned long long key_min, unsigned long long key_max); void set_key_distribution(double key_stddev, double key_median); + void set_key_zipf_distribution(double key_exp); void set_random_seed(int seed); unsigned long long get_key_index(int iter); From 3fcfe426fad0664696d373acd256f799f65b70c9 Mon Sep 17 00:00:00 2001 From: Su Lifan Date: Mon, 6 Dec 2021 11:10:52 +0800 Subject: [PATCH 2/5] update bash-completion for zipf argument Signed-off-by: Su Lifan --- bash-completion/memtier_benchmark | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bash-completion/memtier_benchmark b/bash-completion/memtier_benchmark index a10003c5..db3f53b8 100644 --- a/bash-completion/memtier_benchmark +++ b/bash-completion/memtier_benchmark @@ -27,7 +27,7 @@ _memtier_completions() options_no_args=("--debug" "--show-config" "--hide-histogram" "--distinct-client-seed" "--randomize"\ "--random-data" "--data-verify" "--verify-only" "--generate-keys" "--key-stddev"\ - "--key-median" "--no-expiry" "--cluster-mode" "--help" "--version"\ + "--key-median" "--key-zipf-exp" "--no-expiry" "--cluster-mode" "--help" "--version"\ "-D" "-R" "-h" "-v") options_comp=("--protocol" "-P" "--key-pattern" "--data-size-pattern" "--command-key-pattern") From 61429167157984567c095e8c4d21067d0159fe29 Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Tue, 27 May 2025 21:45:04 +0100 Subject: [PATCH 3/5] Add test to check Zipfian key distribution - tracks and counts command execution using redis-py Monitor class --- tests/tests_oss_zipfian_distribution.py | 151 ++++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 tests/tests_oss_zipfian_distribution.py diff --git a/tests/tests_oss_zipfian_distribution.py b/tests/tests_oss_zipfian_distribution.py new file mode 100644 index 00000000..7d0b7082 --- /dev/null +++ b/tests/tests_oss_zipfian_distribution.py @@ -0,0 +1,151 @@ +import tempfile +import threading +from collections import Counter +import redis +import math + +import redis.client + +from include import ( + get_default_memtier_config, + add_required_env_arguments, + ensure_clean_benchmark_folder, + addTLSArgs, + agg_info_commandstats, + assert_minimum_memtier_outcomes, + get_expected_request_count, +) +from mbdirector.benchmark import Benchmark +from mbdirector.runner import RunConfig + + +class MonitorThread(threading.Thread): + """Monitor Redis commands and count key accesses""" + + def __init__(self, connection, stop_commands: set[str] = None): + threading.Thread.__init__(self) + self.monitor: redis.client.Monitor = connection.monitor() + self.stop_commands: set[str] = stop_commands or { + "INFO COMMANDSTATS", + "FLUSHALL", + } + + self.key_counts: Counter = Counter() + + def run(self): + try: + with self.monitor as m: + for command_info in m.listen(): + command = command_info.get("command") + + if command.upper() in self.stop_commands: + break + + parts = command.split() + if len(parts) >= 2 and parts[0].upper() in {"SET", "GET"}: + key = parts[1] + self.key_counts[key] += 1 + + except redis.ConnectionError: + # stop monitoring: server connection was closed + pass + + +def correlation_coeficient(x: list[float], y: list[float]) -> float: + """Calculate Pearson correlation coefficient between two lists of numbers""" + n = len(x) + mean_x = sum(x) / n + mean_y = sum(y) / n + + numerator = sum((x[i] - mean_x) * (y[i] - mean_y) for i in range(n)) + + sum_sq_x = sum((x[i] - mean_x) ** 2 for i in range(n)) + sum_sq_y = sum((y[i] - mean_y) ** 2 for i in range(n)) + denominator = math.sqrt(sum_sq_x * sum_sq_y) + + return numerator / denominator if denominator != 0 else 0 + + +def test_zipfian_key_distribution(env): + """Test that the Zipfian key-pattern follows Zipf's law""" + key_min = 1 + key_max = 10000 + + # Configure benchmark with Zipfian distribution + benchmark_specs = { + "name": env.testName, + "args": [ + "--ratio=1:1", # Both SET and GET operations + "--key-pattern=Z:Z", # Zipfian for both SET and GET + f"--key-minimum={key_min}", + f"--key-maximum={key_max}", + ], + } + + addTLSArgs(benchmark_specs, env) + config = get_default_memtier_config(threads=2, clients=10) + master_nodes_list = env.getMasterNodesList() + overall_expected_request_count = get_expected_request_count( + config, key_min, key_max + ) + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + # Create a temporary directory + test_dir = tempfile.mkdtemp() + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + + monitor_threads = [] + master_nodes_connections = env.getOSSMasterNodesConnectionList() + # Start monitoring Redis commands + for conn in master_nodes_connections: + monitor_thread = MonitorThread(conn) + monitor_thread.start() + monitor_threads.append(monitor_thread) + + # Run the benchmark + memtier_ok = benchmark.run() + + # Verify the benchmark ran successfully + merged_command_stats = {"cmdstat_set": {"calls": 0}, "cmdstat_get": {"calls": 0}} + overall_request_count = agg_info_commandstats( + master_nodes_connections, merged_command_stats + ) + assert_minimum_memtier_outcomes( + config, env, memtier_ok, overall_expected_request_count, overall_request_count + ) + + # Combine results from all master nodes' monitor threads + combined_key_counts = Counter() + for thread in monitor_threads: + thread.join() # waits for monitor thread to finish + combined_key_counts.update(thread.key_counts) + + # Verify Zipfian properties + if len(combined_key_counts) > 0: + # Sort keys by frequency (ascending) + sorted_keys = sorted( + combined_key_counts.items(), key=lambda x: x[1], reverse=True + ) + + # Verify that frequency follows Zipf's law + # in Zipfian, frequency of kth element is proportional to 1/k^s + # so log(frequency) should be roughly linear with log(rank) + ranks = list(range(1, len(sorted_keys) + 1)) + _, frequencies = zip(*sorted_keys) + + # Calculate correlation between log(rank) and log(frequency) + log_ranks = [math.log(r) for r in ranks] + log_freqs = [math.log(f) if f > 0 else 0 for f in frequencies] + correlation = correlation_coeficient(log_ranks, log_freqs) + + env.debugPrint( + f"Correlation between log(rank) and log(frequency): {correlation}" + ) + + # should be close to -1 for a perfect law relationship + # should be negative since log(rank) is increasing and log(freq) is decreasing + env.assertTrue(correlation < -0.8) From 8f7ef19e9a50cfd2eae3fa2984651734d68b4ccf Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Tue, 27 May 2025 22:58:12 +0100 Subject: [PATCH 4/5] Add test to validate impact of Zipfian's exponent on key distribution - Refactor Zipfian tests to avoid repetition --- tests/tests_oss_zipfian_distribution.py | 187 +++++++++--------------- tests/zipfian_benchmark_runner.py | 134 +++++++++++++++++ 2 files changed, 200 insertions(+), 121 deletions(-) create mode 100644 tests/zipfian_benchmark_runner.py diff --git a/tests/tests_oss_zipfian_distribution.py b/tests/tests_oss_zipfian_distribution.py index 7d0b7082..ac791782 100644 --- a/tests/tests_oss_zipfian_distribution.py +++ b/tests/tests_oss_zipfian_distribution.py @@ -1,54 +1,8 @@ -import tempfile -import threading from collections import Counter -import redis import math +from itertools import pairwise -import redis.client - -from include import ( - get_default_memtier_config, - add_required_env_arguments, - ensure_clean_benchmark_folder, - addTLSArgs, - agg_info_commandstats, - assert_minimum_memtier_outcomes, - get_expected_request_count, -) -from mbdirector.benchmark import Benchmark -from mbdirector.runner import RunConfig - - -class MonitorThread(threading.Thread): - """Monitor Redis commands and count key accesses""" - - def __init__(self, connection, stop_commands: set[str] = None): - threading.Thread.__init__(self) - self.monitor: redis.client.Monitor = connection.monitor() - self.stop_commands: set[str] = stop_commands or { - "INFO COMMANDSTATS", - "FLUSHALL", - } - - self.key_counts: Counter = Counter() - - def run(self): - try: - with self.monitor as m: - for command_info in m.listen(): - command = command_info.get("command") - - if command.upper() in self.stop_commands: - break - - parts = command.split() - if len(parts) >= 2 and parts[0].upper() in {"SET", "GET"}: - key = parts[1] - self.key_counts[key] += 1 - - except redis.ConnectionError: - # stop monitoring: server connection was closed - pass +from zipfian_benchmark_runner import ZipfianBenchmarkRunner def correlation_coeficient(x: list[float], y: list[float]) -> float: @@ -66,86 +20,77 @@ def correlation_coeficient(x: list[float], y: list[float]) -> float: return numerator / denominator if denominator != 0 else 0 +def analyze_zipfian_correlation(key_counts: Counter) -> float: + """Analyze key distribution and return correlation coefficient for Zipf's law validation""" + + # Sort keys by frequency (descending) + sorted_keys = sorted(key_counts.items(), key=lambda x: x[1], reverse=True) + + # Verify that frequency follows Zipf's law + # in Zipfian, frequency of kth element is proportional to 1/k^s + # so log(frequency) should be roughly linear with log(rank) + ranks = list(range(1, len(sorted_keys) + 1)) + _, frequencies = zip(*sorted_keys) + + # Calculate correlation between log(rank) and log(frequency) + log_ranks = [math.log(r) for r in ranks] + log_freqs = [math.log(f) if f > 0 else 0 for f in frequencies] + + return correlation_coeficient(log_ranks, log_freqs) + + +def calculate_concentration_ratio(distribution: Counter, top_n: int = 10) -> float: + """Calculate the concentration ratio of top N keys in the distribution""" + top_n_sum = sum(sorted(distribution.values(), reverse=True)[:top_n]) + total_sum = sum(distribution.values()) + + return top_n_sum / total_sum if total_sum > 0 else 0.0 + + def test_zipfian_key_distribution(env): """Test that the Zipfian key-pattern follows Zipf's law""" key_min = 1 key_max = 10000 - # Configure benchmark with Zipfian distribution - benchmark_specs = { - "name": env.testName, - "args": [ - "--ratio=1:1", # Both SET and GET operations - "--key-pattern=Z:Z", # Zipfian for both SET and GET - f"--key-minimum={key_min}", - f"--key-maximum={key_max}", - ], - } - - addTLSArgs(benchmark_specs, env) - config = get_default_memtier_config(threads=2, clients=10) - master_nodes_list = env.getMasterNodesList() - overall_expected_request_count = get_expected_request_count( - config, key_min, key_max - ) - - add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) - - # Create a temporary directory - test_dir = tempfile.mkdtemp() - config = RunConfig(test_dir, env.testName, config, {}) - ensure_clean_benchmark_folder(config.results_dir) - - benchmark = Benchmark.from_json(config, benchmark_specs) - - monitor_threads = [] - master_nodes_connections = env.getOSSMasterNodesConnectionList() - # Start monitoring Redis commands - for conn in master_nodes_connections: - monitor_thread = MonitorThread(conn) - monitor_thread.start() - monitor_threads.append(monitor_thread) - - # Run the benchmark - memtier_ok = benchmark.run() - - # Verify the benchmark ran successfully - merged_command_stats = {"cmdstat_set": {"calls": 0}, "cmdstat_get": {"calls": 0}} - overall_request_count = agg_info_commandstats( - master_nodes_connections, merged_command_stats - ) - assert_minimum_memtier_outcomes( - config, env, memtier_ok, overall_expected_request_count, overall_request_count - ) - - # Combine results from all master nodes' monitor threads - combined_key_counts = Counter() - for thread in monitor_threads: - thread.join() # waits for monitor thread to finish - combined_key_counts.update(thread.key_counts) - - # Verify Zipfian properties - if len(combined_key_counts) > 0: - # Sort keys by frequency (ascending) - sorted_keys = sorted( - combined_key_counts.items(), key=lambda x: x[1], reverse=True - ) + # Use the helper class to run benchmark and collect data + runner = ZipfianBenchmarkRunner(env, key_min, key_max) + combined_key_counts = runner.run_benchmark_and_collect_key_counting(env.testName) + + # Verify Zipfian properties using helper function + correlation = analyze_zipfian_correlation(combined_key_counts) - # Verify that frequency follows Zipf's law - # in Zipfian, frequency of kth element is proportional to 1/k^s - # so log(frequency) should be roughly linear with log(rank) - ranks = list(range(1, len(sorted_keys) + 1)) - _, frequencies = zip(*sorted_keys) + # should be close to -1 for a perfect law relationship + # should be negative since log(rank) is increasing and log(freq) is decreasing + env.assertTrue(correlation < -0.8) - # Calculate correlation between log(rank) and log(frequency) - log_ranks = [math.log(r) for r in ranks] - log_freqs = [math.log(f) if f > 0 else 0 for f in frequencies] - correlation = correlation_coeficient(log_ranks, log_freqs) - env.debugPrint( - f"Correlation between log(rank) and log(frequency): {correlation}" +def test_zipfian_exponent_effect(env): + """Test different Zipfian exponents and verify they affect the distribution""" + key_min = 1 + key_max = 10000 + + # Test with different exponents + zipf_exponents = {0.5, 1.0, 2.0} + distributions = {} + + # Run benchmarks for each exponent + runner = ZipfianBenchmarkRunner(env, key_min, key_max) + + for zipf_exp in zipf_exponents: + test_name = f"{env.testName}_exp_{zipf_exp}" + combined_key_counts = runner.run_benchmark_and_collect_key_counting( + test_name, zipf_exp=zipf_exp ) + distributions[zipf_exp] = combined_key_counts + + # Verify that higher exponents lead to more skewed distributions + sorted_exponents = sorted(zipf_exponents) + for exp1, exp2 in pairwise(sorted_exponents): + dist1 = distributions[exp1] + dist2 = distributions[exp2] + + concentration1 = calculate_concentration_ratio(dist1) + concentration2 = calculate_concentration_ratio(dist2) - # should be close to -1 for a perfect law relationship - # should be negative since log(rank) is increasing and log(freq) is decreasing - env.assertTrue(correlation < -0.8) + # Higher exponent should have higher concentration in top keys + env.assertTrue(concentration2 > concentration1) diff --git a/tests/zipfian_benchmark_runner.py b/tests/zipfian_benchmark_runner.py new file mode 100644 index 00000000..b1d87973 --- /dev/null +++ b/tests/zipfian_benchmark_runner.py @@ -0,0 +1,134 @@ +import tempfile +import threading +import redis +from collections import Counter + +from include import ( + get_default_memtier_config, + add_required_env_arguments, + ensure_clean_benchmark_folder, + addTLSArgs, + agg_info_commandstats, + assert_minimum_memtier_outcomes, + get_expected_request_count, +) +from mbdirector.benchmark import Benchmark +from mbdirector.runner import RunConfig + + +class MonitorThread(threading.Thread): + """Monitor Redis commands and count key accesses""" + + def __init__(self, connection, stop_commands: set[str] = None): + threading.Thread.__init__(self) + self.monitor: redis.client.Monitor = connection.monitor() + self.stop_commands: set[str] = stop_commands or { + "INFO COMMANDSTATS", + "FLUSHALL", + } + self.key_counts: Counter = Counter() + + def run(self): + try: + with self.monitor as m: + for command_info in m.listen(): + command = command_info.get("command") + + if command.upper() in self.stop_commands: + break + + parts = command.split() + if len(parts) >= 2 and parts[0].upper() in {"SET", "GET"}: + key = parts[1] + self.key_counts[key] += 1 + + except redis.ConnectionError: + # stop monitoring: server connection was closed + pass + + +class ZipfianBenchmarkRunner: + """Helper class to run Zipfian distribution benchmarks and collect key access data""" + + def __init__( + self, env, key_min: int, key_max: int, threads: int = 2, clients: int = 10 + ): + self.env = env + self.key_min = key_min + self.key_max = key_max + self.threads = threads + self.clients = clients + + def run_benchmark_and_collect_key_counting( + self, test_name: str, zipf_exp: float = None + ) -> Counter: + """Run a complete benchmark and return key access distribution data""" + # Create benchmark specs + args = [ + "--ratio=1:1", # Both SET and GET operations + "--key-pattern=Z:Z", # Zipfian for both SET and GET + f"--key-minimum={self.key_min}", + f"--key-maximum={self.key_max}", + ] + + if zipf_exp is not None: + args.append(f"--key-zipf-exp={zipf_exp}") + + benchmark_specs = {"name": test_name, "args": args} + + addTLSArgs(benchmark_specs, self.env) + + config = get_default_memtier_config(threads=self.threads, clients=self.clients) + master_nodes_list = self.env.getMasterNodesList() + overall_expected_request_count = get_expected_request_count( + config, self.key_min, self.key_max + ) + + add_required_env_arguments(benchmark_specs, config, self.env, master_nodes_list) + + # Create temporary directory and run config + test_dir = tempfile.mkdtemp() + run_config = RunConfig(test_dir, test_name, config, {}) + ensure_clean_benchmark_folder(run_config.results_dir) + + benchmark = Benchmark.from_json(run_config, benchmark_specs) + + # Setup monitoring + master_nodes_connections = self.env.getOSSMasterNodesConnectionList() + + monitor_threads = [] + for conn in master_nodes_connections: + # prevent accumulating stats from previous runs + conn.execute_command("CONFIG", "RESETSTAT") + + # start monitoring connection + monitor_thread = MonitorThread(conn) + monitor_thread.start() + monitor_threads.append(monitor_thread) + + # Run the benchmark + memtier_ok = benchmark.run() + + # Verify the benchmark ran successfully + merged_command_stats = { + "cmdstat_set": {"calls": 0}, + "cmdstat_get": {"calls": 0}, + } + overall_request_count = agg_info_commandstats( + master_nodes_connections, merged_command_stats + ) + assert_minimum_memtier_outcomes( + run_config, + self.env, + memtier_ok, + overall_expected_request_count, + overall_request_count, + ) + + """Collect and combine results from all monitor threads""" + combined_key_counts = Counter() + for thread in monitor_threads: + thread.join() # waits for monitor thread to finish + combined_key_counts.update(thread.key_counts) + + return combined_key_counts From 51a4673132f64bb412e8420d92fbca229ebef1bf Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Wed, 28 May 2025 09:30:34 +0100 Subject: [PATCH 5/5] Enhance --help docs for key-zipf-exp option to clarify impact of higher exponents --- memtier_benchmark.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/memtier_benchmark.cpp b/memtier_benchmark.cpp index 27dc68ac..e7da28f3 100755 --- a/memtier_benchmark.cpp +++ b/memtier_benchmark.cpp @@ -1068,6 +1068,7 @@ void usage() { " --key-median The median point used in the Gaussian distribution\n" " (default is the center of the key range)\n" " --key-zipf-exp The exponent used in the zipf distribution, limit to (0, 5)\n" + " Higher exponents result in higher concentration in top keys\n" " (default is 1, though any number >2 seems insane)\n" "\n" "WAIT Options:\n"