diff --git a/redisbench_admin/environments/oss_cluster.py b/redisbench_admin/environments/oss_cluster.py
index 526b6c3..c512e3b 100644
--- a/redisbench_admin/environments/oss_cluster.py
+++ b/redisbench_admin/environments/oss_cluster.py
@@ -67,9 +67,14 @@ def spin_up_local_redis_cluster(
     return redis_processes, redis_conns
 
 
-def setup_redis_cluster_from_conns(redis_conns, shard_count, shard_host, start_port):
+def setup_redis_cluster_from_conns(
+    redis_conns, shard_count, shard_host, start_port, db_nodes=1
+):
+    meet_cmds = []
     logging.info("Setting up cluster. Total {} primaries.".format(len(redis_conns)))
-    meet_cmds = generate_meet_cmds(shard_count, shard_host, start_port)
+    meet_cmds = generate_meet_cmds(
+        shard_count, shard_host, start_port, meet_cmds, 1, db_nodes
+    )
     status = setup_oss_cluster_from_conns(meet_cmds, redis_conns, shard_count)
     if status is True:
         for conn in redis_conns:
@@ -77,12 +82,38 @@ def setup_redis_cluster_from_conns(redis_conns, shard_count, shard_host, start_p
     return status
 
 
-def generate_meet_cmds(shard_count, shard_host, start_port):
-    meet_cmds = []
+def generate_host_port_pairs(
+    server_private_ips, shard_count, cluster_start_port, required_node_count=1
+):
+    host_port_pairs = []
+    (primaries_per_node, db_private_ips, _,) = split_primaries_per_db_nodes(
+        server_private_ips, None, shard_count, required_node_count
+    )
+    shard_start = 1
+    for node_n, primaries_this_node in enumerate(primaries_per_node, start=0):
+        server_private_ip = db_private_ips[node_n]
+        for master_shard_id in range(
+            shard_start, shard_start + primaries_this_node + 1
+        ):
+            shard_port = master_shard_id + cluster_start_port - 1
+            host_port_pairs.append([server_private_ip, shard_port])
 
-    for master_shard_id in range(1, shard_count + 1):
-        shard_port = master_shard_id + start_port - 1
-        meet_cmds.append("CLUSTER MEET {} {}".format(shard_host, shard_port))
+    return host_port_pairs
+
+
+def generate_meet_cmds(
+    shard_count,
+    server_private_ips,
+    cluster_start_port,
+    meet_cmds,
+    shard_start=1,
+    db_nodes=1,
+):
+    host_port_pairs = generate_host_port_pairs(
+        server_private_ips, shard_count, cluster_start_port, db_nodes
+    )
+    for pair in host_port_pairs:
+        meet_cmds.append("CLUSTER MEET {} {}".format(pair[0], pair[1]))
     return meet_cmds
 
 
@@ -199,3 +230,40 @@ def generate_cluster_redis_server_args(
 
 def get_cluster_dbfilename(port):
     return "cluster-node-port-{}.rdb".format(port)
+
+
+def split_primaries_per_db_nodes(
+    server_private_ips, server_public_ips, shard_count, required_node_count=None
+):
+    if type(server_public_ips) is str:
+        server_public_ips = [server_public_ips]
+    if type(server_private_ips) is str:
+        server_private_ips = [server_private_ips]
+    db_node_count = len(server_private_ips)
+    if required_node_count is None:
+        required_node_count = db_node_count
+    if db_node_count < required_node_count:
+        error_msg = "There are less servers ({}) than the required ones for this setup {}. Failing test...".format(
+            db_node_count, required_node_count
+        )
+        logging.error(error_msg)
+        raise Exception(error_msg)
+    if server_public_ips is not None:
+        server_public_ips = server_public_ips[0:required_node_count]
+    if server_private_ips is not None:
+        server_private_ips = server_private_ips[0:required_node_count]
+    primaries_per_db_node = shard_count // required_node_count
+    remainder_first_node = shard_count % required_node_count
+    first_node_primaries = primaries_per_db_node + remainder_first_node
+    logging.info("DB node {} will have {} primaries".format(1, first_node_primaries))
+    primaries_per_node = [first_node_primaries]
+    for node_n, node_id in enumerate(range(2, required_node_count + 1), start=2):
+        logging.info("Setting")
+        logging.info(
+            "DB node {} will have {} primaries".format(node_n, primaries_per_db_node)
+        )
+        primaries_per_node.append(primaries_per_db_node)
+    logging.info("Final primaries per node {}".format(primaries_per_node))
+    logging.info("Final server_private_ips {}".format(server_private_ips))
+    logging.info("Final server_public_ips {}".format(server_public_ips))
+    return primaries_per_node, server_private_ips, server_public_ips
diff --git a/redisbench_admin/export/export.py b/redisbench_admin/export/export.py
index d177255..46b350b 100644
--- a/redisbench_admin/export/export.py
+++ b/redisbench_admin/export/export.py
@@ -154,6 +154,7 @@ def export_command_logic(args, project_name, project_version):
             github_org,
             github_repo,
             triggering_env,
+            1,
         )
     logging.info("Parsed a total of {} metrics".format(len(timeseries_dict.keys())))
     logging.info(
@@ -213,6 +214,7 @@ def export_json_to_timeseries_dict(
     tf_github_org,
     tf_github_repo,
     triggering_env,
+    n_db_nodes=1,
 ):
     results_dict = {}
     for test_name, d in benchmark_file.items():
@@ -237,6 +239,7 @@ def export_json_to_timeseries_dict(
                     tf_github_repo,
                     triggering_env,
                     False,
+                    n_db_nodes,
                 )
                 results_dict[ts_name] = {
                     "labels": timeserie_tags.copy(),
diff --git a/redisbench_admin/run/cluster.py b/redisbench_admin/run/cluster.py
index 270cd1a..98ec728 100644
--- a/redisbench_admin/run/cluster.py
+++ b/redisbench_admin/run/cluster.py
@@ -7,7 +7,10 @@
 
 from redisbench_admin.utils.remote import execute_remote_commands
 
-from redisbench_admin.environments.oss_cluster import generate_cluster_redis_server_args
+from redisbench_admin.environments.oss_cluster import (
+    generate_cluster_redis_server_args,
+    split_primaries_per_db_nodes,
+)
 from redisbench_admin.utils.utils import wait_for_conn
 
 
@@ -101,8 +104,8 @@ def generate_startup_nodes_array(redis_conns):
 
 # noinspection PyBroadException
 def spin_up_redis_cluster_remote_redis(
-    server_public_ip,
-    server_private_ip,
+    server_public_ips,
+    server_private_ips,
     username,
     private_key,
     remote_module_files,
@@ -114,44 +117,60 @@
     modules_configuration_parameters_map,
     logname,
     redis_7=True,
+    required_db_nodes=1,
 ):
     logging.info("Generating the remote redis-server command arguments")
+    redis_process_commands = []
     logfiles = []
     logname_prefix = logname[: len(logname) - 4] + "-"
-    for master_shard_id in range(1, shard_count + 1):
-        shard_port = master_shard_id + start_port - 1
+    (
+        primaries_per_node,
+        server_private_ips,
+        server_public_ips,
+    ) = split_primaries_per_db_nodes(
+        server_private_ips, server_public_ips, shard_count, required_db_nodes
+    )
+    shard_start = 1
+    for node_n, primaries_this_node in enumerate(primaries_per_node, start=0):
+        server_private_ip = server_private_ips[node_n]
+        server_public_ip = server_public_ips[node_n]
+        for master_shard_id in range(
+            shard_start, shard_start + primaries_this_node + 1
+        ):
+            shard_port = master_shard_id + start_port - 1
 
-        command, logfile = generate_cluster_redis_server_args(
-            "redis-server",
-            dbdir_folder,
-            remote_module_files,
-            server_private_ip,
-            shard_port,
-            redis_configuration_parameters,
-            "yes",
-            modules_configuration_parameters_map,
-            logname_prefix,
-            "yes",
-            redis_7,
-        )
-        logging.error(
-            "Remote primary shard {} command: {}".format(
-                master_shard_id, " ".join(command)
+            command, logfile = generate_cluster_redis_server_args(
+                "redis-server",
+                dbdir_folder,
+                remote_module_files,
+                server_private_ip,
+                shard_port,
+                redis_configuration_parameters,
+                "yes",
+                modules_configuration_parameters_map,
+                logname_prefix,
+                "yes",
+                redis_7,
             )
-        )
-        logfiles.append(logfile)
-        redis_process_commands.append(" ".join(command))
-    res = execute_remote_commands(
-        server_public_ip, username, private_key, redis_process_commands, ssh_port
-    )
-    for pos, res_pos in enumerate(res):
-        [recv_exit_status, stdout, stderr] = res_pos
-        if recv_exit_status != 0:
             logging.error(
-                "Remote primary shard {} command returned exit code {}. stdout {}. stderr {}".format(
-                    pos, recv_exit_status, stdout, stderr
+                "Remote primary shard {} command: {}".format(
+                    master_shard_id, " ".join(command)
                 )
             )
+            logfiles.append(logfile)
+            redis_process_commands.append(" ".join(command))
+        res = execute_remote_commands(
+            server_public_ip, username, private_key, redis_process_commands, ssh_port
+        )
+        for pos, res_pos in enumerate(res):
+            [recv_exit_status, stdout, stderr] = res_pos
+            if recv_exit_status != 0:
+                logging.error(
+                    "Remote primary shard {} command returned exit code {}. stdout {}. stderr {}".format(
+                        pos, recv_exit_status, stdout, stderr
+                    )
+                )
+        shard_start = shard_start + primaries_this_node
     return logfiles
diff --git a/redisbench_admin/run/common.py b/redisbench_admin/run/common.py
index 28a6e72..4255e41 100644
--- a/redisbench_admin/run/common.py
+++ b/redisbench_admin/run/common.py
@@ -68,7 +68,7 @@ def prepare_benchmark_parameters(
     benchmark_config,
     benchmark_tool,
     server_plaintext_port,
-    server_private_ip,
+    server_private_ips,
     remote_results_file,
     isremote=False,
     current_workdir=None,
@@ -96,7 +96,7 @@
             isremote,
             remote_results_file,
             server_plaintext_port,
-            server_private_ip,
+            server_private_ips,
             client_public_ip,
             username,
             private_key,
@@ -116,7 +116,7 @@
             isremote,
             remote_results_file,
             server_plaintext_port,
-            server_private_ip,
+            server_private_ips,
             client_public_ip,
             username,
             private_key,
@@ -146,13 +146,17 @@ def prepare_benchmark_parameters_specif_tooling(
     isremote,
     remote_results_file,
     server_plaintext_port,
-    server_private_ip,
+    server_private_ips,
     client_public_ip,
     username,
     private_key,
     client_ssh_port,
     redis_password=None,
 ):
+    if type(server_private_ips) == list:
+        server_private_ip = server_private_ips[0]
+    else:
+        server_private_ip = server_private_ips
     if "redis-benchmark" in benchmark_tool:
         command_arr, command_str = prepare_redis_benchmark_command(
             benchmark_tool,
@@ -296,6 +300,7 @@ def common_exporter_logic(
     build_variant_name=None,
     running_platform=None,
     datapoints_timestamp=None,
+    n_db_nodes=1,
 ):
     per_version_time_series_dict = {}
     per_branch_time_series_dict = {}
@@ -341,6 +346,7 @@
             build_variant_name,
             running_platform,
             testcase_metric_context_paths,
+            n_db_nodes,
         )
         if tf_github_branch is not None and tf_github_branch != "":
             # extract per branch datapoints
@@ -363,6 +369,7 @@ def common_exporter_logic(
                 build_variant_name,
                 running_platform,
                 testcase_metric_context_paths,
+                n_db_nodes,
             )
         else:
             logging.error(
@@ -493,7 +500,11 @@ def extract_test_feasible_setups(
             "name": "oss-standalone",
             "type": "oss-standalone",
             "redis_topology": {"primaries": 1, "replicas": 0},
-            "resources": {"requests": {"cpu": "1000m"}, "limits": {"cpu": "2000m"}},
+            "resources": {
+                "nodes": 1,
+                "requests": {"cpu": "1000m"},
+                "limits": {"cpu": "2000m"},
+            },
         }
         logging.info(
             "Using a backwards compatible 'oss-standalone' setup, with settings: {}".format(
@@ -508,7 +519,16 @@ def get_setup_type_and_primaries_count(setup_settings):
     setup_type = setup_settings["type"]
     setup_name = setup_settings["name"]
     shard_count = setup_settings["redis_topology"]["primaries"]
-    return setup_name, setup_type, shard_count
+
+    node_count = 1
+    shard_placement = "dense"
+    if "redis_topology" in setup_settings:
+        if "placement" in setup_settings["redis_topology"]:
+            shard_placement = setup_settings["redis_topology"]["placement"]
+    if "resources" in setup_settings:
+        if "nodes" in setup_settings["resources"]:
+            node_count = setup_settings["resources"]["nodes"]
+    return setup_name, setup_type, shard_count, shard_placement, node_count
 
 
 def merge_default_and_config_metrics(
diff --git a/redisbench_admin/run/redistimeseries.py b/redisbench_admin/run/redistimeseries.py
index d3cf651..da0d95c 100644
--- a/redisbench_admin/run/redistimeseries.py
+++ b/redisbench_admin/run/redistimeseries.py
@@ -39,6 +39,7 @@ def prepare_timeseries_dict(
     build_variant_name=None,
     running_platform=None,
     datapoints_timestamp=None,
+    n_db_nodes=1,
 ):
     time_series_dict = {}
     # check which metrics to extract
@@ -67,6 +68,7 @@ def prepare_timeseries_dict(
         build_variant_name,
         running_platform,
         datapoints_timestamp,
+        n_db_nodes,
     )
     time_series_dict.update(per_version_time_series_dict)
     time_series_dict.update(per_branch_time_series_dict)
@@ -93,6 +95,7 @@ def add_standardized_metric_bybranch(
     metadata_tags={},
     build_variant_name=None,
     running_platform=None,
+    n_db_nodes=1,
 ):
     if metric_value is not None:
         tsname_use_case_duration = get_ts_metric_name(
@@ -109,6 +112,7 @@
             False,
             build_variant_name,
             running_platform,
+            n_db_nodes,
         )
         labels = get_project_ts_tags(
             tf_github_org,
@@ -119,6 +123,7 @@
             metadata_tags,
             build_variant_name,
             running_platform,
+            n_db_nodes,
         )
         labels["branch"] = tf_github_branch
         labels["deployment_name+branch"] = "{} {}".format(
@@ -238,6 +243,7 @@ def timeseries_test_sucess_flow(
     build_variant_name=None,
     running_platform=None,
     timeseries_dict=None,
+    n_db_nodes=1,
 ):
     testcase_metric_context_paths = []
     version_target_tables = None
@@ -266,6 +272,7 @@
         build_variant_name,
         running_platform,
         start_time_ms,
+        n_db_nodes,
     )
     if push_results_redistimeseries:
         logging.info(
diff --git a/redisbench_admin/run_async/terraform.py b/redisbench_admin/run_async/terraform.py
index d7c9d06..3a9cbbb 100644
--- a/redisbench_admin/run_async/terraform.py
+++ b/redisbench_admin/run_async/terraform.py
@@ -172,6 +172,7 @@ def setup_remote_environment(
     )
     _, _, _ = tf.refresh()
     tf_output = tf.output()
+    logging.error("TF OUTPUT setup_remote_environment: {}".format(tf_output))
     server_private_ip = tf_output_or_none(tf_output, "runner_private_ip")
     server_public_ip = tf_output_or_none(tf_output, "runner_public_ip")
     if server_private_ip is not None or server_public_ip is not None:
@@ -269,6 +270,7 @@ def terraform_spin_or_reuse_env(
     else:
         logging.info("Reusing remote setup {}".format(remote_id))
         tf = remote_envs[remote_id]
+    tf_output = tf.output()
     (
         tf_return_code,
         username,
@@ -277,7 +279,7 @@
         server_plaintext_port,
         client_private_ip,
         client_public_ip,
-    ) = retrieve_tf_connection_vars(None, tf)
+    ) = retrieve_tf_connection_vars(None, tf_output)
     return (
         client_public_ip,
         deployment_type,
diff --git a/redisbench_admin/run_local/local_db.py b/redisbench_admin/run_local/local_db.py
index 7d3e78a..c9a8b05 100644
--- a/redisbench_admin/run_local/local_db.py
+++ b/redisbench_admin/run_local/local_db.py
@@ -133,7 +133,9 @@ def local_db_spin(
             )
         )
         if is_process_alive(redis_process) is False:
-            raise Exception("Redis process is not alive. Failing test.")
+            raise Exception(
+                "Redis shard #{} is not alive. Failing test.".format(shardn)
+            )
     if setup_type == "oss-cluster":
         cluster_init_steps(clusterconfig, redis_conns, local_module_file)
diff --git a/redisbench_admin/run_local/run_local.py b/redisbench_admin/run_local/run_local.py
index d2c5d9f..2fd0c87 100644
--- a/redisbench_admin/run_local/run_local.py
+++ b/redisbench_admin/run_local/run_local.py
@@ -156,6 +156,8 @@ def run_local_command_logic(args, project_name, project_version):
                 setup_name,
                 setup_type,
                 shard_count,
+                _,
+                _,
             ) = get_setup_type_and_primaries_count(setup_settings)
             if args.allowed_setups != "":
                 allowed_setups = args.allowed_setups.split(",")
diff --git a/redisbench_admin/run_remote/remote_client.py b/redisbench_admin/run_remote/remote_client.py
index 4f52388..f50130f 100644
--- a/redisbench_admin/run_remote/remote_client.py
+++ b/redisbench_admin/run_remote/remote_client.py
@@ -38,7 +38,7 @@ def run_remote_client_tool(
     remote_results_file,
     return_code,
     server_plaintext_port,
-    server_private_ip,
+    server_private_ips,
     start_time_ms,
     start_time_str,
     username,
@@ -96,7 +96,7 @@ def run_remote_client_tool(
         benchmark_config,
         benchmark_tool,
         server_plaintext_port,
-        server_private_ip,
+        server_private_ips,
         remote_results_file,
         True,
         None,
diff --git a/redisbench_admin/run_remote/remote_db.py b/redisbench_admin/run_remote/remote_db.py
index 4c1ef8d..fb095cc 100644
--- a/redisbench_admin/run_remote/remote_db.py
+++ b/redisbench_admin/run_remote/remote_db.py
@@ -8,7 +8,10 @@
 
 import redis
 
-from redisbench_admin.environments.oss_cluster import setup_redis_cluster_from_conns
+from redisbench_admin.environments.oss_cluster import (
+    setup_redis_cluster_from_conns,
+    split_primaries_per_db_nodes,
+)
 from redisbench_admin.run.cluster import (
     spin_up_redis_cluster_remote_redis,
     debug_reload_rdb,
@@ -40,23 +43,27 @@
 def remote_tmpdir_prune(
-    server_public_ip, ssh_port, temporary_dir, username, private_key
+    server_public_ips, ssh_port, temporary_dir, username, private_key
 ):
-    execute_remote_commands(
-        server_public_ip,
-        username,
-        private_key,
-        [
-            "mkdir -p {}".format(temporary_dir),
-            "rm -rf {}/*.log".format(temporary_dir),
-            "rm -rf {}/*.config".format(temporary_dir),
-            "rm -rf {}/*.rdb".format(temporary_dir),
-            "rm -rf {}/*.out".format(temporary_dir),
-            "rm -rf {}/*.data".format(temporary_dir),
-            "pkill -9 redis-server",
-        ],
-        ssh_port,
-    )
+    if type(server_public_ips) is str:
+        server_public_ips = [server_public_ips]
+
+    for server_public_ip in server_public_ips:
+        execute_remote_commands(
+            server_public_ip,
+            username,
+            private_key,
+            [
+                "mkdir -p {}".format(temporary_dir),
+                "rm -rf {}/*.log".format(temporary_dir),
+                "rm -rf {}/*.config".format(temporary_dir),
+                "rm -rf {}/*.rdb".format(temporary_dir),
+                "rm -rf {}/*.out".format(temporary_dir),
+                "rm -rf {}/*.data".format(temporary_dir),
+                "pkill -9 redis-server",
+            ],
+            ssh_port,
+        )
 
 
 def is_single_endpoint(setup_type):
@@ -78,8 +85,8 @@ def remote_db_spin(
     required_modules,
     return_code,
     server_plaintext_port,
-    server_private_ip,
-    server_public_ip,
+    server_private_ips,
+    server_public_ips,
     setup_name,
     setup_type,
     shard_count,
@@ -102,6 +109,8 @@ def remote_db_spin(
     redis_password=None,
     flushall_on_every_test_start=False,
     ignore_keyspace_errors=False,
+    shard_placement="sparse",
+    required_node_count=1,
 ):
     (
         _,
@@ -119,7 +128,7 @@ def remote_db_spin(
         cp_local_dbdir_to_remote(
             dbdir_folder,
             private_key,
-            server_public_ip,
+            server_public_ips,
             temporary_dir,
             username,
         )
@@ -129,7 +138,7 @@ def remote_db_spin(
         db_ssh_port,
         private_key,
         remote_module_file_dir,
-        server_public_ip,
+        server_public_ips,
         username,
     )
     # setup Redis
@@ -138,9 +147,12 @@ def remote_db_spin(
     topology_setup_start_time = datetime.datetime.now()
     if setup_type == "oss-cluster":
         if skip_redis_setup is False:
+            logging.info(
+                "Setting up oss-cluster with {} nodes".format(required_node_count)
+            )
             logfiles = spin_up_redis_cluster_remote_redis(
-                server_public_ip,
-                server_private_ip,
+                server_public_ips,
+                server_private_ips,
                 username,
                 private_key,
                 remote_module_files,
@@ -152,42 +164,103 @@ def remote_db_spin(
                 modules_configuration_parameters_map,
                 logname,
                 redis_7,
+                required_node_count,
             )
-        try:
-            for p in range(cluster_start_port, cluster_start_port + shard_count):
-                local_redis_conn, ssh_tunnel = ssh_tunnel_redisconn(
-                    p,
-                    server_private_ip,
-                    server_public_ip,
-                    username,
-                    db_ssh_port,
-                    private_key,
-                    redis_password,
+        if shard_placement == "sparse":
+            logging.info(
+                "Setting up sparse placement between {} nodes".format(
+                    required_node_count
+                )
+            )
+            (
+                primaries_per_node,
+                db_private_ips,
+                db_public_ips,
+            ) = split_primaries_per_db_nodes(
+                server_private_ips,
+                server_public_ips,
+                shard_count,
+                required_node_count,
+            )
+        else:
+            logging.info(
+                "Setting up dense placement between {} nodes".format(
+                    required_node_count
+                )
+            )
+            (
+                primaries_per_node,
+                db_private_ips,
+                db_public_ips,
+            ) = split_primaries_per_db_nodes(
+                server_private_ips[0],
+                server_public_ips[0],
+                shard_count,
+                required_node_count,
+            )
+        logging.info(
+            "Shard placement is {}. {} primaries per node. DB private IPs: {}; DB public IPs {}".format(
DB private IPs: {}; DB public IPs {}".format( + shard_placement, primaries_per_node, db_private_ips, db_public_ips ) - local_redis_conn.ping() - redis_conns.append(local_redis_conn) - except redis.exceptions.ConnectionError as e: - logging.error("A error occurred while spinning DB: {}".format(e.__str__())) - logfile = logfiles[0] - - remote_file = "{}/{}".format(temporary_dir, logfile) - logging.error( - "Trying to fetch DB remote log {} into {}".format(remote_file, logfile) - ) - db_error_artifacts( - db_ssh_port, - dirname, - full_logfiles, - logname, - private_key, - s3_bucket_name, - s3_bucket_path, - server_public_ip, - temporary_dir, - True, - username, ) + shard_start = 1 + for node_n, primaries_this_node in enumerate(primaries_per_node, start=0): + server_private_ip = db_private_ips[node_n] + server_public_ip = db_public_ips[node_n] + for master_shard_id in range( + shard_start, shard_start + primaries_this_node + 1 + ): + shard_port = master_shard_id + cluster_start_port - 1 + try: + local_redis_conn, ssh_tunnel = ssh_tunnel_redisconn( + shard_port, + server_private_ip, + server_public_ip, + username, + db_ssh_port, + private_key, + redis_password, + ) + local_redis_conn.ping() + redis_conns.append(local_redis_conn) + except redis.exceptions.ConnectionError as e: + logging.error( + "A error occurred while spinning DB: {}".format(e.__str__()) + ) + logfile = logfiles[0] + + remote_file = "{}/{}".format(temporary_dir, logfile) + logging.error( + "Trying to fetch DB remote log {} into {}".format( + remote_file, logfile + ) + ) + db_error_artifacts( + db_ssh_port, + dirname, + full_logfiles, + logname, + private_key, + s3_bucket_name, + s3_bucket_path, + server_public_ip, + temporary_dir, + True, + username, + ) + shard_start = shard_start + primaries_this_node + + # we only use the 1st node for single endpoint tests + # for client connections, even on cluster setups we always reload the + # cluster info via cluster slots, so we can indeed only use the 1st node ip + if type(server_private_ips) is str: + server_private_ip = server_private_ips + server_public_ip = server_public_ips + else: + server_private_ip = server_private_ips[0] + server_public_ip = server_public_ips[0] + if is_single_endpoint(setup_type): try: if skip_redis_setup is False: @@ -234,8 +307,9 @@ def remote_db_spin( setup_redis_cluster_from_conns( redis_conns, shard_count, - server_private_ip, + server_private_ips, cluster_start_port, + required_node_count, ) server_plaintext_port = cluster_start_port diff --git a/redisbench_admin/run_remote/remote_env.py b/redisbench_admin/run_remote/remote_env.py index d5efd90..1e8e269 100644 --- a/redisbench_admin/run_remote/remote_env.py +++ b/redisbench_admin/run_remote/remote_env.py @@ -72,7 +72,16 @@ def remote_env_setup( tf_override_name, tf_folder_path, ) + n_db_hosts = 1 + n_client_hosts = 1 + if type(server_public_ip) == list: + n_db_hosts = len(server_public_ip) + if type(client_public_ip) == list: + n_client_hosts = len(client_public_ip) + return ( + n_db_hosts, + n_client_hosts, client_public_ip, server_plaintext_port, server_private_ip, diff --git a/redisbench_admin/run_remote/run_remote.py b/redisbench_admin/run_remote/run_remote.py index 111b957..0994b81 100644 --- a/redisbench_admin/run_remote/run_remote.py +++ b/redisbench_admin/run_remote/run_remote.py @@ -366,6 +366,8 @@ def run_remote_command_logic(args, project_name, project_version): setup_name, setup_type, shard_count, + shard_placement, + required_db_node_count, ) = 
             if args.allowed_setups != "":
                 allowed_setups = args.allowed_setups.split(",")
@@ -399,6 +401,8 @@
             client_artifacts_map = {}
             temporary_dir = get_tmp_folder_rnd()
             (
+                n_db_hosts,
+                n_client_hosts,
                 client_public_ip,
                 server_plaintext_port,
                 server_private_ip,
@@ -424,6 +428,25 @@
                 TF_OVERRIDE_REMOTE,
             )
 
+            if n_db_hosts < required_db_node_count:
+                logging.warning(
+                    "SKIPPING test named {}, for setup named {} of topology type {} given node_count={} and the setup has {} nodes.".format(
+                        test_name,
+                        setup_name,
+                        setup_type,
+                        required_db_node_count,
+                        n_db_hosts,
+                    )
+                )
+                continue
+
+            else:
+                logging.info(
+                    "Setup requires {} db nodes. This remote setup has {}. All OK".format(
+                        required_db_node_count, n_db_hosts
+                    )
+                )
+
             # after we've created the env, even on error we should always teardown
             # in case of some unexpected error we fail the test
             try:
@@ -505,6 +528,8 @@
                         redis_password,
                         flushall_on_every_test_start,
                         ignore_keyspace_errors,
+                        shard_placement,
+                        required_db_node_count,
                     )
                     if benchmark_type == "read-only":
                         ro_benchmark_set(
@@ -792,6 +817,7 @@
                         tf_triggering_env,
                         {"metric-type": "redis-metrics"},
                         expire_ms,
+                        n_db_hosts,
                     )
                     if collect_commandstats:
                         (
@@ -815,6 +841,7 @@
                         tf_triggering_env,
                         {"metric-type": "commandstats"},
                        expire_ms,
+                        n_db_hosts,
                     )
                     (
                         end_time_ms,
@@ -837,6 +864,7 @@
                         tf_triggering_env,
                         {"metric-type": "latencystats"},
                         expire_ms,
+                        n_db_hosts,
                     )
 
                 if setup_details["env"] is None:
@@ -889,6 +917,10 @@
                         tf_github_repo,
                         tf_triggering_env,
                         metadata_tags,
+                        None,
+                        None,
+                        None,
+                        n_db_hosts,
                     )
                     if branch_target_tables is not None:
                         for (
@@ -1196,6 +1228,7 @@ def export_redis_metrics(
     tf_triggering_env,
     metadata_dict=None,
     expire_ms=0,
+    n_db_nodes=1,
 ):
     datapoint_errors = 0
     datapoint_inserts = 0
@@ -1230,11 +1263,16 @@
                     metric_name,
                     metric_value,
                 ) in overall_end_time_metrics.items():
+                    setup_name_and_nodes = setup_name
+                    if n_db_nodes > 1:
+                        setup_name_and_nodes = setup_name_and_nodes + "-{}-nodes".format(
+                            n_db_nodes
+                        )
                     tsname_metric = "{}/{}/{}/benchmark_end/{}/{}".format(
                         sprefix,
                         test_name,
                         by_variant,
-                        setup_name,
+                        setup_name_and_nodes,
                         metric_name,
                     )
 
diff --git a/redisbench_admin/run_remote/standalone.py b/redisbench_admin/run_remote/standalone.py
index c76440f..bb0e263 100644
--- a/redisbench_admin/run_remote/standalone.py
+++ b/redisbench_admin/run_remote/standalone.py
@@ -17,7 +17,7 @@ def spin_up_standalone_remote_redis(
     temporary_dir,
-    server_public_ip,
+    server_public_ips,
     username,
     private_key,
     remote_module_files,
@@ -36,6 +36,17 @@ def spin_up_standalone_remote_redis(
         redis_7,
     )
 
+    if type(server_public_ips) is str:
+        server_public_ips = [server_public_ips]
+        server_public_ip = server_public_ips[0]
+        logging.info(
+            "Given we've received multiple IPs for DB server {} and this is a standalone we're using the first one: {}".format(
+                server_public_ips, server_public_ip
+            )
+        )
+    else:
+        server_public_ip = server_public_ips
+
     # start redis-server
     commands = [initial_redis_cmd]
     res = execute_remote_commands(
@@ -53,16 +64,21 @@ def spin_up_standalone_remote_redis(
 
 
 def cp_local_dbdir_to_remote(
-    dbdir_folder, private_key, server_public_ip, temporary_dir, username
+    dbdir_folder, private_key, server_public_ips, temporary_dir, username
 ):
     if dbdir_folder is not None:
-        logging.info(
-            "Copying entire content of {} into temporary path: {}".format(
-                dbdir_folder, temporary_dir
+        if type(server_public_ips) is str:
+            server_public_ips = [server_public_ips]
+        for server_public_ip in server_public_ips:
+            logging.info(
+                "Copying entire content of {} into temporary path: {} of remote IP {}".format(
+                    dbdir_folder, temporary_dir, server_public_ip
+                )
             )
-        )
-        ssh = SSHSession(server_public_ip, username, key_file=open(private_key, "r"))
-        ssh.put_all(dbdir_folder, temporary_dir)
+            ssh = SSHSession(
+                server_public_ip, username, key_file=open(private_key, "r")
+            )
+            ssh.put_all(dbdir_folder, temporary_dir)
 
 
 def remote_module_files_cp(
@@ -70,10 +86,13 @@ def remote_module_files_cp(
     port,
     private_key,
     remote_module_file_dir,
-    server_public_ip,
+    server_public_ips,
     username,
 ):
     remote_module_files = []
+    if type(server_public_ips) is str:
+        server_public_ips = [server_public_ips]
+
     if local_module_files is not None:
         for local_module_file in local_module_files:
             splitted_module_and_plugins = []
@@ -98,23 +117,24 @@ def remote_module_files_cp(
                     remote_module_file_dir,
                     os.path.basename(local_module_file_and_plugin),
                 )
-                # copy the module to the DB machine
-                copy_file_to_remote_setup(
-                    server_public_ip,
-                    username,
-                    private_key,
-                    local_module_file_and_plugin,
-                    remote_module_file,
-                    None,
-                    port,
-                )
-                execute_remote_commands(
-                    server_public_ip,
-                    username,
-                    private_key,
-                    ["chmod 755 {}".format(remote_module_file)],
-                    port,
-                )
+                for server_public_ip in server_public_ips:
+                    # copy the module to the DB machine
+                    copy_file_to_remote_setup(
+                        server_public_ip,
+                        username,
+                        private_key,
+                        local_module_file_and_plugin,
+                        remote_module_file,
+                        None,
+                        port,
+                    )
+                    execute_remote_commands(
+                        server_public_ip,
+                        username,
+                        private_key,
+                        ["chmod 755 {}".format(remote_module_file)],
+                        port,
+                    )
                 if pos > 1:
                     remote_module_files_in = remote_module_files_in + " "
                 remote_module_files_in = remote_module_files_in + remote_module_file
diff --git a/redisbench_admin/utils/remote.py b/redisbench_admin/utils/remote.py
index 9a7e815..6041d1d 100644
--- a/redisbench_admin/utils/remote.py
+++ b/redisbench_admin/utils/remote.py
@@ -106,6 +106,8 @@ def copy_file_to_remote_setup(
 def fetch_file_from_remote_setup(
     server_public_ip, username, private_key, local_file, remote_file
 ):
+    if type(server_public_ip) == list:
+        server_public_ip = server_public_ip[0]
     logging.info(
         "Retrieving remote file {} from remote server {} ".format(
             remote_file, server_public_ip
         )
@@ -148,6 +150,8 @@ def execute_remote_commands(
 
 
 def connect_remote_ssh(port, private_key, server_public_ip, username):
+    if type(server_public_ip) == list:
+        server_public_ip = server_public_ip[0]
     k = paramiko.RSAKey.from_private_key_file(private_key)
     c = paramiko.SSHClient()
     c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
@@ -265,6 +269,7 @@ def setup_remote_environment(
     )
     _, _, _ = tf.refresh()
     tf_output = tf.output()
+    logging.error("TF OUTPUT: {}".format(tf_output))
     server_private_ip = tf_output_or_none(tf_output, "server_private_ip")
     server_public_ip = tf_output_or_none(tf_output, "server_public_ip")
     client_private_ip = tf_output_or_none(tf_output, "client_private_ip")
@@ -291,17 +296,20 @@ def setup_remote_environment(
             "timeout_secs": tf_timeout_secs,
         },
     )
-    return retrieve_tf_connection_vars(return_code, tf)
+    tf_output = tf.output()
+    return retrieve_tf_connection_vars(return_code, tf_output)
 
 
-def retrieve_tf_connection_vars(return_code, tf):
-    tf_output = tf.output()
+def retrieve_tf_connection_vars(return_code, tf_output):
+    logging.error("TF OUTPUT setup_remote_environment: {}".format(tf_output))
     server_private_ip = tf_output["server_private_ip"]["value"][0]
     server_public_ip = tf_output["server_public_ip"]["value"][0]
     server_plaintext_port = 6379
     client_private_ip = tf_output["client_private_ip"]["value"][0]
     client_public_ip = tf_output["client_public_ip"]["value"][0]
     username = "ubuntu"
+    if "ssh_user" in tf_output:
+        username = tf_output["ssh_user"]["value"]
     return (
         return_code,
         username,
@@ -679,6 +687,7 @@ def extract_perversion_timeseries_from_results(
     build_variant_name=None,
     running_platform=None,
     testcase_metric_context_paths=[],
+    n_db_nodes=1,
 ):
     break_by_key = "version"
     break_by_str = "by.{}".format(break_by_key)
@@ -699,6 +708,7 @@ def extract_perversion_timeseries_from_results(
         build_variant_name,
         running_platform,
         testcase_metric_context_paths,
+        n_db_nodes,
     )
     return True, branch_time_series_dict, target_tables
 
@@ -720,6 +730,7 @@ def common_timeseries_extraction(
     build_variant_name=None,
     running_platform=None,
     testcase_metric_context_paths=[],
+    n_db_nodes=1,
 ):
     time_series_dict = {}
     target_tables = {}
@@ -754,6 +765,7 @@
                 tf_triggering_env,
                 time_series_dict,
                 use_metric_context_path,
+                n_db_nodes,
             )
             target_tables[target_table_keyname] = target_table_dict
 
@@ -782,6 +794,7 @@ def from_metric_kv_to_timeserie(
     tf_triggering_env,
     time_series_dict,
     use_metric_context_path,
+    n_db_nodes=1,
 ):
     timeserie_tags, ts_name = get_ts_tags_and_name(
         break_by_key,
@@ -801,6 +814,7 @@
         tf_github_repo,
         tf_triggering_env,
         use_metric_context_path,
+        n_db_nodes,
     )
     time_series_dict[ts_name] = {
         "labels": timeserie_tags.copy(),
@@ -877,6 +891,7 @@
     tf_github_repo,
     tf_triggering_env,
     use_metric_context_path,
+    n_db_nodes=1,
 ):
     # prepare tags
     timeserie_tags = get_project_ts_tags(
@@ -888,6 +903,7 @@
         metadata_tags,
         build_variant_name,
         running_platform,
+        n_db_nodes,
     )
     timeserie_tags[break_by_key] = break_by_value
     timeserie_tags["{}+{}".format("deployment_name", break_by_key)] = "{} {}".format(
@@ -903,6 +919,7 @@
         test_name, build_variant_name
     )
     timeserie_tags["metric"] = str(metric_name)
+    timeserie_tags["db_nodes"] = str(n_db_nodes)
     timeserie_tags["metric_name"] = metric_name
     timeserie_tags["metric_context_path"] = metric_context_path
     if metric_context_path is not None:
@@ -926,6 +943,7 @@
         use_metric_context_path,
         build_variant_name,
         running_platform,
+        n_db_nodes,
     )
     return timeserie_tags, ts_name
 
@@ -939,6 +957,7 @@ def get_project_ts_tags(
     metadata_tags={},
     build_variant_name=None,
     running_platform=None,
+    n_db_nodes=1,
 ):
     tags = {
         "github_org": tf_github_org,
@@ -947,6 +966,7 @@
         "deployment_type": deployment_type,
         "deployment_name": deployment_name,
         "triggering_env": tf_triggering_env,
+        "n_db_nodes": str(n_db_nodes),
     }
     if build_variant_name is not None:
         tags["build_variant"] = build_variant_name
@@ -972,6 +992,7 @@ def extract_perbranch_timeseries_from_results(
     build_variant_name=None,
     running_platform=None,
     testcase_metric_context_paths=[],
+    n_db_nodes=1,
 ):
     break_by_key = "branch"
     break_by_str = "by.{}".format(break_by_key)
@@ -992,6 +1013,7 @@ def extract_perbranch_timeseries_from_results(
         build_variant_name,
         running_platform,
         testcase_metric_context_paths,
+        n_db_nodes,
     )
     return True, branch_time_series_dict, target_tables
diff --git a/redisbench_admin/utils/utils.py b/redisbench_admin/utils/utils.py
index 4cd44be..6272511 100644
--- a/redisbench_admin/utils/utils.py
+++ b/redisbench_admin/utils/utils.py
@@ -326,6 +326,7 @@ def get_ts_metric_name(
     use_metric_context_path=False,
     build_variant_name=None,
     running_platform=None,
+    n_db_nodes=1,
 ):
     if use_metric_context_path:
         metric_name = "{}/{}".format(metric_name, metric_context_path)
@@ -339,6 +340,8 @@ def get_ts_metric_name(
         deployment_name = "/{}".format(deployment_name)
     else:
         deployment_name = ""
+    if n_db_nodes > 1:
+        deployment_name = deployment_name + "-{}-nodes".format(n_db_nodes)
     ts_name = (
         "ci.benchmarks.redislabs/{by}/"
         "{triggering_env}/{github_org}/{github_repo}/"
diff --git a/tests/test_common.py b/tests/test_common.py
index 7cca968..ac21f56 100644
--- a/tests/test_common.py
+++ b/tests/test_common.py
@@ -396,7 +396,10 @@ def test_extract_test_feasible_setups():
     ]
     assert standalone_setup_type == "oss-standalone"
     assert standalone_shard_count == 1
-    n, t, c = get_setup_type_and_primaries_count(test_setups["oss-standalone"])
+    n, t, c, placement, node_count = get_setup_type_and_primaries_count(
+        test_setups["oss-standalone"]
+    )
+    assert node_count == 1
     assert standalone_setup_type == t
     assert standalone_shard_count == c
 
@@ -406,8 +409,11 @@ def test_extract_test_feasible_setups():
     osscluster_shard_count = test_setups["oss-cluster-3-primaries"]["redis_topology"][
         "primaries"
     ]
-    n, t, c = get_setup_type_and_primaries_count(test_setups["oss-cluster-3-primaries"])
+    n, t, c, placement, node_count = get_setup_type_and_primaries_count(
+        test_setups["oss-cluster-3-primaries"]
+    )
     assert osscluster_setup_type == t
+    assert node_count == 1
     assert osscluster_shard_count == c
 
     # wrong read
diff --git a/tests/test_oss_cluster.py b/tests/test_oss_cluster.py
new file mode 100644
index 0000000..19123ed
--- /dev/null
+++ b/tests/test_oss_cluster.py
@@ -0,0 +1,73 @@
+from redisbench_admin.environments.oss_cluster import split_primaries_per_db_nodes
+
+
+def test_split_primaries_per_db_nodes():
+    (
+        primaries_per_node,
+        server_private_ips,
+        server_public_ips,
+    ) = split_primaries_per_db_nodes(
+        ["10.3.0.169", "10.3.0.55", "10.3.0.92"],
+        ["18.117.74.99", "3.144.3.160", "3.145.92.27"],
+        4,
+        1,
+    )
+    assert primaries_per_node == [4]
+    assert server_private_ips == ["10.3.0.169"]
+    assert server_public_ips == ["18.117.74.99"]
+
+    (
+        primaries_per_node,
+        server_private_ips,
+        server_public_ips,
+    ) = split_primaries_per_db_nodes(
+        ["10.3.0.169", "10.3.0.55", "10.3.0.92"],
+        ["18.117.74.99", "3.144.3.160", "3.145.92.27"],
+        4,
+        2,
+    )
+    assert primaries_per_node == [2, 2]
+    assert server_private_ips == ["10.3.0.169", "10.3.0.55"]
+    assert server_public_ips == ["18.117.74.99", "3.144.3.160"]
+
+    (
+        primaries_per_node,
+        server_private_ips,
+        server_public_ips,
+    ) = split_primaries_per_db_nodes(
+        ["10.3.0.169", "10.3.0.55", "10.3.0.92"],
+        ["18.117.74.99", "3.144.3.160", "3.145.92.27"],
+        5,
+        2,
+    )
+    assert primaries_per_node == [3, 2]
+    assert server_private_ips == ["10.3.0.169", "10.3.0.55"]
+    assert server_public_ips == ["18.117.74.99", "3.144.3.160"]
+
+    (
+        primaries_per_node,
+        server_private_ips,
+        server_public_ips,
+    ) = split_primaries_per_db_nodes(
+        ["10.3.0.169", "10.3.0.55", "10.3.0.92"],
+        ["18.117.74.99", "3.144.3.160", "3.145.92.27"],
+        6,
+        3,
+    )
+    assert primaries_per_node == [2, 2, 2]
+    assert server_private_ips == ["10.3.0.169", "10.3.0.55", "10.3.0.92"]
+    assert server_public_ips == ["18.117.74.99", "3.144.3.160", "3.145.92.27"]
+
+    (
+        primaries_per_node,
+        server_private_ips,
+        server_public_ips,
+    ) = split_primaries_per_db_nodes(
+        ["10.3.0.169", "10.3.0.55", "10.3.0.92"],
+        ["18.117.74.99", "3.144.3.160", "3.145.92.27"],
+        6,
+        None,
+    )
+    assert primaries_per_node == [2, 2, 2]
+    assert server_private_ips == ["10.3.0.169", "10.3.0.55", "10.3.0.92"]
+    assert server_public_ips == ["18.117.74.99", "3.144.3.160", "3.145.92.27"]
diff --git a/tests/test_redistimeseries.py b/tests/test_redistimeseries.py
index a6bd759..d71bb12 100644
--- a/tests/test_redistimeseries.py
+++ b/tests/test_redistimeseries.py
@@ -97,6 +97,8 @@ def test_timeseries_test_sucess_flow():
             },
             "build1",
             "platform1",
+            None,
+            1,
         )
         assert rts.exists(testcases_and_metric_context_path_setname)
 
@@ -206,6 +208,8 @@ def test_timeseries_test_sucess_flow():
             {"arch": "arm64", "os": "ubuntu:16.04", "compiler": "icc"},
             "build",
             "platform2",
+            None,
+            2,
         )
         assert "arm64".encode() in rts.smembers(project_archs_setname)
         assert "ubuntu:16.04".encode() in rts.smembers(project_oss_setname)
@@ -222,5 +226,40 @@ def test_timeseries_test_sucess_flow():
         assert len(rts.smembers(project_branches_setname)) == 1
         assert len(rts.smembers(project_versions_setname)) == 1
 
+        # check multi-node timeseries
+        rts.flushall()
+        deployment_type = "oss-cluster"
+        deployment_name = "oss-cluster-02-primaries"
+
+        timeseries_test_sucess_flow(
+            True,
+            project_version,
+            benchmark_config,
+            benchmark_duration_seconds,
+            dataset_load_duration_seconds,
+            metrics,
+            deployment_name,
+            deployment_type,
+            merged_exporter_timemetric_path,
+            results_dict,
+            rts,
+            start_time_ms,
+            test_name,
+            tf_github_branch,
+            tf_github_org,
+            tf_github_repo,
+            tf_triggering_env,
+            {"arch": "arm64", "os": "ubuntu:16.04", "compiler": "icc"},
+            "build",
+            "platform2",
+            None,
+            2,
+        )
+
+        keys = [x.decode() for x in rts.keys()]
+        for keyname in keys:
+            if "target_tables" not in keyname:
+                if rts.type(keyname) is "TSDB-TYPE":
+                    assert "-2-nodes" in keyname
     except redis.exceptions.ConnectionError:
         pass
diff --git a/tests/test_remote.py b/tests/test_remote.py
index bcac11d..9db75b0 100644
--- a/tests/test_remote.py
+++ b/tests/test_remote.py
@@ -3,7 +3,6 @@
 import redis
 import yaml
 
-
 from redisbench_admin.run.redistimeseries import (
     prepare_timeseries_dict,
     timeseries_test_sucess_flow,
@@ -22,6 +21,7 @@
     exporter_create_ts,
     get_overall_dashboard_keynames,
     common_timeseries_extraction,
+    retrieve_tf_connection_vars,
 )
 
 
@@ -474,3 +474,76 @@ def test_exporter_create_ts():
 
     except redis.exceptions.ConnectionError:
         pass
+
+
+def test_retrieve_tf_connection_vars():
+    tf_output = {
+        "client_private_ip": {
+            "sensitive": False,
+            "type": ["tuple", ["string"]],
+            "value": ["10.3.0.235"],
+        },
+        "client_public_ip": {
+            "sensitive": False,
+            "type": ["tuple", ["string"]],
+            "value": ["3.135.206.198"],
+        },
+        "server_private_ip": {
+            "sensitive": False,
+            "type": ["tuple", ["string"]],
+            "value": ["10.3.0.53"],
+        },
+        "server_public_ip": {
+            "sensitive": False,
+            "type": ["tuple", ["string"]],
+            "value": ["18.219.10.142"],
+        },
+    }
+    (
+        tf_return_code,
+        username,
+        server_private_ip,
+        server_public_ip,
+        server_plaintext_port,
+        client_private_ip,
+        client_public_ip,
+    ) = retrieve_tf_connection_vars(None, tf_output)
+    assert server_private_ip == "10.3.0.53"
+    assert server_public_ip == "18.219.10.142"
+    assert username == "ubuntu"
+
+    tf_output_new = {
+        "client_private_ip": {
+            "sensitive": False,
+            "type": ["tuple", [["tuple", ["string"]]]],
+            "value": [["10.3.0.175"]],
+        },
+        "client_public_ip": {
+            "sensitive": False,
+            "type": ["tuple", [["tuple", ["string"]]]],
+            "value": [["3.136.234.93"]],
+        },
+        "server_private_ip": {
+            "sensitive": False,
+            "type": ["tuple", [["tuple", ["string", "string", "string"]]]],
+            "value": [["10.3.0.236", "10.3.0.9", "10.3.0.211"]],
+        },
+        "server_public_ip": {
+            "sensitive": False,
+            "type": ["tuple", [["tuple", ["string", "string", "string"]]]],
+            "value": [["3.143.24.7", "13.58.158.80", "3.139.82.224"]],
+        },
+        "ssh_user": {"sensitive": False, "type": "string", "value": "ec2"},
+    }
+    (
+        tf_return_code,
+        username,
+        server_private_ip,
+        server_public_ip,
+        server_plaintext_port,
+        client_private_ip,
+        client_public_ip,
+    ) = retrieve_tf_connection_vars(None, tf_output_new)
+    assert server_private_ip == ["10.3.0.236", "10.3.0.9", "10.3.0.211"]
+    assert server_public_ip == ["3.143.24.7", "13.58.158.80", "3.139.82.224"]
+    assert username == "ec2"
diff --git a/tests/test_remote_env.py b/tests/test_remote_env.py
index 39c1fb9..d617b6c 100644
--- a/tests/test_remote_env.py
+++ b/tests/test_remote_env.py
@@ -34,6 +34,8 @@ def test_remote_env_setup():
     tf_setup_name_sufix = "suffix"
     benchmark_config = {}
     (
+        n_db_hosts,
+        n_client_hosts,
         client_public_ip,
         server_plaintext_port,
         server_private_ip,
@@ -59,6 +61,8 @@ def test_remote_env_setup():
     assert client_public_ip == "2.2.2.2"
     assert server_private_ip == "10.0.0.1"
     assert server_public_ip == "1.1.1.1"
+    assert n_client_hosts == 1
+    assert n_db_hosts == 1
 
     # using inventory but missing one manadatory key
     args = parser.parse_args(