From 17bf20711e079e4ad61e2b6adc61da87669a08b3 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Fri, 3 May 2024 16:29:44 -0700 Subject: [PATCH 01/15] Branch out train from federate --- .../scheduler/comm_utils/job_utils.py | 112 ++++++-- .../scheduler/master/server_runner.py | 27 +- .../scheduler/slave/client_runner.py | 271 ++++++++---------- 3 files changed, 209 insertions(+), 201 deletions(-) diff --git a/python/fedml/computing/scheduler/comm_utils/job_utils.py b/python/fedml/computing/scheduler/comm_utils/job_utils.py index 384cbacd1d..036e01490c 100644 --- a/python/fedml/computing/scheduler/comm_utils/job_utils.py +++ b/python/fedml/computing/scheduler/comm_utils/job_utils.py @@ -13,18 +13,51 @@ from fedml.computing.scheduler.slave.client_constants import ClientConstants from fedml.computing.scheduler.comm_utils.sys_utils import get_python_program from fedml.computing.scheduler.scheduler_core.compute_cache_manager import ComputeCacheManager +from ..scheduler_entry.constants import Constants from dataclasses import dataclass, field, fields from fedml.computing.scheduler.slave.client_data_interface import FedMLClientDataInterface from fedml.core.common.singleton import Singleton from fedml.computing.scheduler.comm_utils.container_utils import ContainerUtils -from typing import List +from typing import List, Dict, Any, Optional import threading import json run_docker_without_gpu = False +@dataclass +class JobArgs: + request_json: Dict[str, Any] + conf_file_object: Any + fedml_config_object: field(default_factory=dict) + client_rank: Optional[int] = None + + def __post_init__(self): + self.run_config = self.request_json.get("run_config", {}) + self.run_params = self.run_config.get("parameters", {}) + self.client_rank = self.request_json.get("client_rank", 1) if self.client_rank is None else self.client_rank + self.job_yaml = self.run_params.get("job_yaml", {}) + self.job_yaml_default_none = self.run_params.get("job_yaml", None) + self.job_api_key = self.run_params.get("job_api_key", None) + self.job_api_key = self.job_yaml.get("fedml_run_dynamic_params", None) if self.job_api_key is None else self.job_api_key + self.assigned_gpu_ids = self.run_params.get("gpu_ids", None) + self.job_type = self.job_yaml.get("job_type", None) + # TODO: Can we remove task_type? + self.job_type = self.job_yaml.get("task_type", Constants.JOB_TASK_TYPE_TRAIN) if self.job_type is None else self.job_type + self.containerize = self.fedml_config_object.get("containerize", None) + self.image_pull_policy = self.fedml_config_object.get("image_pull_policy", Constants.IMAGE_PULL_POLICY_ALWAYS) + self.entry_args_dict = self.conf_file_object.get("fedml_entry_args", {}) + self.entry_args = self.entry_args_dict.get("entry_args", None) + self.scheduler_match_info = self.request_json.get("scheduler_match_info", {}) + self.env_args = self.fedml_config_object.get("environment_args", None) + self.docker_args = JobRunnerUtils.create_instance_from_dict(DockerArgs, + self.fedml_config_object.get("docker", {})) + self.executable_interpreter = ClientConstants.CLIENT_SHELL_PS \ + if platform.system() == ClientConstants.PLATFORM_WINDOWS else ClientConstants.CLIENT_SHELL_BASH + self.framework_type = self.job_yaml.get("framework_type", None) + + @dataclass class DockerArgs: image: str = SchedulerConstants.FEDML_DEFAULT_LAUNCH_IMAGE @@ -393,30 +426,49 @@ def create_instance_from_dict(data_class, input_dict: {}): return instance @staticmethod - def generate_bootstrap_commands(bootstrap_script_path, bootstrap_script_dir, bootstrap_script_file): - if os.path.exists(bootstrap_script_path): - bootstrap_stat = os.stat(bootstrap_script_path) - if platform.system() == 'Windows': - os.chmod(bootstrap_script_path, - bootstrap_stat.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) - bootstrap_scripts = "{}".format(bootstrap_script_path) - else: - os.chmod(bootstrap_script_path, - bootstrap_stat.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) - bootstrap_scripts = "cd {}; ./{}".format( - bootstrap_script_dir, os.path.basename(bootstrap_script_file)) - - bootstrap_scripts = str(bootstrap_scripts).replace('\\', os.sep).replace('/', os.sep) - shell_cmd_list = list() - shell_cmd_list.append(bootstrap_scripts) - return shell_cmd_list + def generate_bootstrap_commands(env_args, unzip_package_path) -> (List[str], str): + bootstrap_cmd_list = list() + bootstrap_script_path, bootstrap_script_dir, bootstrap_script_file = [None] * 3 + + if env_args is not None: + bootstrap_script_file = env_args.get("bootstrap", None) + if bootstrap_script_file is not None: + bootstrap_script_file = str(bootstrap_script_file).replace('\\', os.sep).replace('/', os.sep) + if platform.system() == 'Windows': + bootstrap_script_file = bootstrap_script_file.rstrip('.sh') + '.bat' + if bootstrap_script_file is not None: + bootstrap_script_dir = os.path.join(unzip_package_path, "fedml", + os.path.dirname(bootstrap_script_file)) + bootstrap_script_path = os.path.join( + bootstrap_script_dir, bootstrap_script_dir, os.path.basename(bootstrap_script_file) + ) + + if bootstrap_script_path: + logging.info("Bootstrap commands are being generated...") + if os.path.exists(bootstrap_script_path): + bootstrap_stat = os.stat(bootstrap_script_path) + if platform.system() == 'Windows': + os.chmod(bootstrap_script_path, + bootstrap_stat.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + bootstrap_scripts = "{}".format(bootstrap_script_path) + else: + os.chmod(bootstrap_script_path, + bootstrap_stat.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + bootstrap_scripts = "cd {}; ./{}".format( + bootstrap_script_dir, os.path.basename(bootstrap_script_file)) + + bootstrap_scripts = str(bootstrap_scripts).replace('\\', os.sep).replace('/', os.sep) + bootstrap_cmd_list.append(bootstrap_scripts) + if len(bootstrap_cmd_list): + logging.info(f"Generated following Bootstrap commands: {bootstrap_cmd_list}") + else: + logging.info("No Bootstrap commands generated.") + return bootstrap_cmd_list, bootstrap_script_file @staticmethod - def generate_job_execute_commands(run_id, edge_id, version, - package_type, executable_interpreter, entry_file_full_path, - conf_file_object, entry_args, assigned_gpu_ids, - job_api_key, client_rank, scheduler_match_info=None, - cuda_visible_gpu_ids_str=None): + def generate_job_execute_commands(run_id, edge_id, version, package_type, entry_file_full_path, + job_args: JobArgs, cuda_visible_gpu_ids_str: str = None): + shell_cmd_list = list() entry_commands_origin = list() @@ -430,12 +482,12 @@ def generate_job_execute_commands(run_id, edge_id, version, # Generate the export env list for publishing environment variables export_cmd = "set" if platform.system() == "Windows" else "export" export_config_env_list, config_env_name_value_map = JobRunnerUtils.parse_config_args_as_env_variables( - export_cmd, conf_file_object) + export_cmd, job_args.conf_file_object) # Generate the export env list about scheduler matching info for publishing environment variables export_match_env_list, match_env_name_value_map = \ JobRunnerUtils.assign_matched_resources_to_run_and_generate_envs( - run_id, export_cmd, scheduler_match_info + run_id, export_cmd, job_args.scheduler_match_info ) # Replace entry commands with environment variable values @@ -447,7 +499,7 @@ def generate_job_execute_commands(run_id, edge_id, version, ) # Replace entry arguments with environment variable values - entry_args = JobRunnerUtils.replace_entry_args_with_env_variable(entry_args, config_env_name_value_map) + entry_args = JobRunnerUtils.replace_entry_args_with_env_variable(job_args.entry_args, config_env_name_value_map) entry_args = JobRunnerUtils.replace_entry_args_with_env_variable(entry_args, match_env_name_value_map) # Add the export env list to the entry commands @@ -462,13 +514,13 @@ def generate_job_execute_commands(run_id, edge_id, version, entry_commands.insert(0, f"{export_cmd} FEDML_CURRENT_VERSION={version}\n") entry_commands.insert(0, f"{export_cmd} FEDML_ENV_VERSION={version}\n") entry_commands.insert(0, f"{export_cmd} FEDML_USING_MLOPS=true\n") - entry_commands.insert(0, f"{export_cmd} FEDML_CLIENT_RANK={client_rank}\n") + entry_commands.insert(0, f"{export_cmd} FEDML_CLIENT_RANK={job_args.client_rank}\n") entry_commands.insert(0, f"{export_cmd} FEDML_ENV_LOCAL_ON_PREMISE_PLATFORM_HOST={fedml.get_local_on_premise_platform_host()}\n") entry_commands.insert(0, f"{export_cmd} FEDML_ENV_LOCAL_ON_PREMISE_PLATFORM_PORT={fedml.get_local_on_premise_platform_port()}\n") - if job_api_key is not None and str(job_api_key).strip() != "": - random_out = sys_utils.random2(job_api_key, "FEDML@88119999GREAT") + if job_args.job_api_key is not None and str(job_args.job_api_key).strip() != "": + random_out = sys_utils.random2(job_args.job_api_key, "FEDML@88119999GREAT") random_list = random_out.split("FEDML_NEXUS@") entry_commands.insert(0, f"{export_cmd} FEDML_RUN_API_KEY={random_list[1]}\n") @@ -510,7 +562,7 @@ def generate_job_execute_commands(run_id, edge_id, version, entry_file_handle.close() # Generate the shell commands to be executed - shell_cmd_list.append(f"{executable_interpreter} {entry_file_full_path}") + shell_cmd_list.append(f"{job_args.executable_interpreter} {entry_file_full_path}") return shell_cmd_list diff --git a/python/fedml/computing/scheduler/master/server_runner.py b/python/fedml/computing/scheduler/master/server_runner.py index 238349a3e4..8054600a26 100755 --- a/python/fedml/computing/scheduler/master/server_runner.py +++ b/python/fedml/computing/scheduler/master/server_runner.py @@ -28,7 +28,7 @@ from ..comm_utils.job_cleanup import JobCleanup from ..scheduler_core.scheduler_matcher import SchedulerMatcher from ..comm_utils.constants import SchedulerConstants -from ..comm_utils.job_utils import JobRunnerUtils +from ..comm_utils.job_utils import JobRunnerUtils, JobArgs from ..comm_utils.run_process_utils import RunProcessUtils from ....core.mlops.mlops_runtime_log import MLOpsRuntimeLog @@ -902,24 +902,11 @@ def should_continue_run_job(self, run_id): return True def execute_job_task(self, entry_file_full_path, conf_file_full_path, run_id): - run_config = self.request_json["run_config"] - run_params = run_config.get("parameters", {}) - job_yaml = run_params.get("job_yaml", {}) - job_yaml_default_none = run_params.get("job_yaml", None) - job_api_key = job_yaml.get("run_api_key", None) - job_api_key = job_yaml.get("fedml_run_dynamic_params", None) if job_api_key is None else job_api_key - assigned_gpu_ids = run_params.get("gpu_ids", None) - framework_type = job_yaml.get("framework_type", None) - job_type = job_yaml.get("job_type", None) - job_type = job_yaml.get("task_type", Constants.JOB_TASK_TYPE_TRAIN) if job_type is None else job_type - conf_file_object = load_yaml_config(conf_file_full_path) - entry_args_dict = conf_file_object.get("fedml_entry_args", {}) - entry_args = entry_args_dict.get("arg_items", None) - - executable_interpreter = ClientConstants.CLIENT_SHELL_PS \ - if platform.system() == ClientConstants.PLATFORM_WINDOWS else ClientConstants.CLIENT_SHELL_BASH + job_args = JobArgs(request_json=self.request_json, + conf_file_object=load_yaml_config(conf_file_full_path), + client_rank=0) - if job_yaml_default_none is None: + if job_args.job_yaml_default_none is None: # Generate the job executing commands for previous federated learning (Compatibility) python_program = get_python_program() logging.info("Run the server: {} {} --cf {} --rank 0 --role server".format( @@ -942,9 +929,7 @@ def execute_job_task(self, entry_file_full_path, conf_file_full_path, run_id): # Generate the job executing commands job_executing_commands = JobRunnerUtils.generate_job_execute_commands( run_id=self.run_id, edge_id=self.edge_id, version=self.version, package_type=self.package_type, - executable_interpreter=executable_interpreter, entry_file_full_path=entry_file_full_path, - conf_file_object=conf_file_object, entry_args=entry_args, assigned_gpu_ids=assigned_gpu_ids, - job_api_key=job_api_key, client_rank=0) + entry_file_full_path=entry_file_full_path, job_args=job_args) # Run the job executing commands logging.info(f"Run the server job with job id {self.run_id}, device id {self.edge_id}.") diff --git a/python/fedml/computing/scheduler/slave/client_runner.py b/python/fedml/computing/scheduler/slave/client_runner.py index 79b5697728..a78b23fc0c 100755 --- a/python/fedml/computing/scheduler/slave/client_runner.py +++ b/python/fedml/computing/scheduler/slave/client_runner.py @@ -18,11 +18,12 @@ from urllib.parse import urljoin, urlparse import requests +from fedml.computing.scheduler.comm_utils import job_utils import fedml from ..comm_utils.constants import SchedulerConstants from ..comm_utils.job_cleanup import JobCleanup -from ..comm_utils.job_utils import JobRunnerUtils, DockerArgs +from ..comm_utils.job_utils import JobRunnerUtils, DockerArgs, JobArgs from ..comm_utils.run_process_utils import RunProcessUtils from ..scheduler_entry.constants import Constants from ....core.mlops.mlops_device_perfs import MLOpsDevicePerfStats @@ -139,7 +140,7 @@ def __repr__(self): def copy_runner(self): copy_runner = FedMLClientRunner(self.args) - copy_runner.disable_client_login = self.disable_client_login + copy_runner.disable_client_login = self.disable_client_login copy_runner.model_device_server = self.model_device_server copy_runner.model_device_client_list = self.model_device_client_list copy_runner.run_process_event = self.run_process_event @@ -161,7 +162,7 @@ def copy_runner(self): copy_runner.unique_device_id = self.unique_device_id copy_runner.args = self.args copy_runner.request_json = self.request_json - copy_runner.version =self.version + copy_runner.version = self.version copy_runner.device_id = self.device_id copy_runner.cur_dir = self.cur_dir copy_runner.cur_dir = self.cur_dir @@ -555,41 +556,52 @@ def run_impl(self): logging.info(" ") logging.info("====Your Run Logs Begin===") - process, is_launch_task, error_list = self.execute_job_task(unzip_package_path=unzip_package_path, - entry_file_full_path=entry_file_full_path, - conf_file_full_path=conf_file_full_path, - dynamic_args_config=dynamic_args_config, - fedml_config_object=self.fedml_config_object) - - logging.info("====Your Run Logs End===") - logging.info(" ") - logging.info(" ") - - ret_code, out, err = process.returncode if process else None, None, None - is_run_ok = sys_utils.is_runner_finished_normally(process.pid) - if is_launch_task: - is_run_ok = True - if error_list is not None and len(error_list) > 0: - is_run_ok = False - if ret_code is None or ret_code <= 0: - self.check_runner_stop_event() - - if is_run_ok: - if out is not None: - out_str = sys_utils.decode_our_err_result(out) - if out_str != "": - logging.info("{}".format(out_str)) + job_args = JobArgs(request_json=self.request_json, + conf_file_object=load_yaml_config(conf_file_full_path), + fedml_config_object=fedml_config_object) - self.mlops_metrics.report_client_id_status( - self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FINISHED, - server_id=self.server_id, run_id=run_id) + is_run_ok = False + if job_args.job_type == Constants.JOB_TASK_TYPE_TRAIN: + process, is_launch_task, error_list = self.execute_train_job_task(job_args=job_args, + unzip_package_path=unzip_package_path, + entry_file_full_path=entry_file_full_path) - if is_launch_task: - sys_utils.log_return_info(f"job {run_id}", ret_code) - else: - sys_utils.log_return_info(entry_file, ret_code) else: - is_run_ok = False + process, is_launch_task, error_list = self.execute_job_task(unzip_package_path=unzip_package_path, + entry_file_full_path=entry_file_full_path, + conf_file_full_path=conf_file_full_path, + dynamic_args_config=dynamic_args_config, + fedml_config_object=self.fedml_config_object) + + logging.info("====Your Run Logs End===") + logging.info(" ") + logging.info(" ") + + ret_code, out, err = process.returncode if process else None, None, None + is_run_ok = sys_utils.is_runner_finished_normally(process.pid) + if is_launch_task: + is_run_ok = True + if error_list is not None and len(error_list) > 0: + is_run_ok = False + if ret_code is None or ret_code <= 0: + self.check_runner_stop_event() + + if is_run_ok: + if out is not None: + out_str = sys_utils.decode_our_err_result(out) + if out_str != "": + logging.info("{}".format(out_str)) + + self.mlops_metrics.report_client_id_status( + self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FINISHED, + server_id=self.server_id, run_id=run_id) + + if is_launch_task: + sys_utils.log_return_info(f"job {run_id}", ret_code) + else: + sys_utils.log_return_info(entry_file, ret_code) + else: + is_run_ok = False if not is_run_ok: # If the run status is killed or finished, then return with the normal state. @@ -617,75 +629,70 @@ def run_impl(self): self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED, server_id=self.server_id, run_id=run_id) - def execute_job_task(self, unzip_package_path, entry_file_full_path, conf_file_full_path, dynamic_args_config, + def execute_train_job_task(self, job_args: JobArgs, unzip_package_path, entry_file_full_path): + self.check_runner_stop_event() + + self.mlops_metrics.report_client_id_status( + self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_RUNNING, run_id=self.run_id) + + bootstrap_cmd_list, bootstrap_script_file = JobRunnerUtils.generate_bootstrap_commands(job_args.env_args, + unzip_package_path) + + try: + job_executing_commands = JobRunnerUtils.generate_launch_docker_command(docker_args=job_args.docker_args, + run_id=self.run_id, + edge_id=self.edge_id, + unzip_package_path=unzip_package_path, + executable_interpreter=job_args.executable_interpreter, + entry_file_full_path=entry_file_full_path, + bootstrap_cmd_list=bootstrap_cmd_list, + cuda_visible_gpu_ids_str=self.cuda_visible_gpu_ids_str, + image_pull_policy=job_args.image_pull_policy) + + except Exception as e: + logging.error(f"Error occurred while generating containerized launch commands. " + f"Exception: {e}, Traceback: {traceback.format_exc()}") + return None, None, None + + if not job_executing_commands: + raise Exception("Failed to generate docker execution command") + + # Run the job executing commands + logging.info(f"Run the client job with job id {self.run_id}, device id {self.edge_id}.") + process, error_list = ClientConstants.execute_commands_with_live_logs( + job_executing_commands, callback=self.start_job_perf, error_processor=self.job_error_processor, + should_write_log_file=False if job_args.job_type == Constants.JOB_TASK_TYPE_FEDERATE else True) + is_launch_task = False if job_args.job_type == Constants.JOB_TASK_TYPE_FEDERATE else True + + return process, is_launch_task, error_list + + def execute_job_task(self, job_args: JobArgs, unzip_package_path, entry_file_full_path, conf_file_full_path, dynamic_args_config, fedml_config_object): - run_config = self.request_json["run_config"] - run_params = run_config.get("parameters", {}) - client_rank = self.request_json.get("client_rank", 1) - job_yaml = run_params.get("job_yaml", {}) - job_yaml_default_none = run_params.get("job_yaml", None) - job_api_key = job_yaml.get("run_api_key", None) - job_api_key = job_yaml.get("fedml_run_dynamic_params", None) if job_api_key is None else job_api_key - assigned_gpu_ids = run_params.get("gpu_ids", None) - job_type = job_yaml.get("job_type", None) - containerize = fedml_config_object.get("containerize", None) - image_pull_policy = fedml_config_object.get("image_pull_policy", Constants.IMAGE_PULL_POLICY_ALWAYS) - # TODO: Can we remove task_type? - job_type = job_yaml.get("task_type", Constants.JOB_TASK_TYPE_TRAIN) if job_type is None else job_type - conf_file_object = load_yaml_config(conf_file_full_path) - entry_args_dict = conf_file_object.get("fedml_entry_args", {}) - entry_args = entry_args_dict.get("arg_items", None) - scheduler_match_info = self.request_json.get("scheduler_match_info", {}) - if job_type == Constants.JOB_TASK_TYPE_TRAIN: - containerize = True if containerize is None else containerize # Bootstrap Info - bootstrap_script_path, bootstrap_script_dir, bootstrap_script_file = [None] * 3 env_args = fedml_config_object.get("environment_args", None) - - if env_args is not None: - bootstrap_script_file = env_args.get("bootstrap", None) - if bootstrap_script_file is not None: - bootstrap_script_file = str(bootstrap_script_file).replace('\\', os.sep).replace('/', os.sep) - if platform.system() == 'Windows': - bootstrap_script_file = bootstrap_script_file.rstrip('.sh') + '.bat' - if bootstrap_script_file is not None: - bootstrap_script_dir = os.path.join(unzip_package_path, "fedml", - os.path.dirname(bootstrap_script_file)) - bootstrap_script_path = os.path.join( - bootstrap_script_dir, bootstrap_script_dir, os.path.basename(bootstrap_script_file) - ) - - bootstrap_cmd_list = list() - if bootstrap_script_path: - logging.info("Bootstrap commands are being generated...") - bootstrap_cmd_list = JobRunnerUtils.generate_bootstrap_commands(bootstrap_script_path=bootstrap_script_path, - bootstrap_script_dir=bootstrap_script_dir, - bootstrap_script_file=bootstrap_script_file) - logging.info(f"Generated following Bootstrap commands: {bootstrap_cmd_list}") - - if not containerize: - if len(bootstrap_cmd_list) and not (job_type == Constants.JOB_TASK_TYPE_DEPLOY or - job_type == Constants.JOB_TASK_TYPE_SERVE): - bootstrapping_successful = self.run_bootstrap_script(bootstrap_cmd_list=bootstrap_cmd_list, - bootstrap_script_file=bootstrap_script_file) - - if not bootstrapping_successful: - logging.info("failed to update local fedml config.") - self.check_runner_stop_event() - # Send failed msg when exceptions. - self.cleanup_run_when_starting_failed(status=ClientConstants.MSG_MLOPS_CLIENT_STATUS_EXCEPTION) - raise Exception(f"Failed to execute following bootstrap commands: {bootstrap_cmd_list}") - - logging.info("cleanup the previous learning process and bootstrap process...") - ClientConstants.cleanup_learning_process(self.request_json["runId"]) - ClientConstants.cleanup_bootstrap_process(self.request_json["runId"]) - - executable_interpreter = ClientConstants.CLIENT_SHELL_PS \ - if platform.system() == ClientConstants.PLATFORM_WINDOWS else ClientConstants.CLIENT_SHELL_BASH - - if job_yaml_default_none is None: - # Generate the job executing commands for previous federated learning (Compatibility) + bootstrap_cmd_list, bootstrap_script_file = JobRunnerUtils.generate_bootstrap_commands(env_args, + unzip_package_path) + + # if not containerize: + if len(bootstrap_cmd_list) and not (job_args.job_type == Constants.JOB_TASK_TYPE_DEPLOY or + job_args.job_type == Constants.JOB_TASK_TYPE_SERVE): + bootstrapping_successful = self.run_bootstrap_script(bootstrap_cmd_list=bootstrap_cmd_list, + bootstrap_script_file=bootstrap_script_file) + + if not bootstrapping_successful: + logging.info("failed to update local fedml config.") + self.check_runner_stop_event() + # Send failed msg when exceptions. + self.cleanup_run_when_starting_failed(status=ClientConstants.MSG_MLOPS_CLIENT_STATUS_EXCEPTION) + raise Exception(f"Failed to execute following bootstrap commands: {bootstrap_cmd_list}") + + logging.info("cleanup the previous learning process and bootstrap process...") + ClientConstants.cleanup_learning_process(self.request_json["runId"]) + ClientConstants.cleanup_bootstrap_process(self.request_json["runId"]) + + # Generate the job executing commands for previous federated learning (Compatibility) + if job_args.job_yaml_default_none is None: python_program = get_python_program() logging.info("Run the client: {} {} --cf {} --rank {} --role client".format( python_program, entry_file_full_path, conf_file_full_path, str(dynamic_args_config.get("rank", 1)))) @@ -698,49 +705,8 @@ def execute_job_task(self, unzip_package_path, entry_file_full_path, conf_file_f process, error_list = ClientConstants.execute_commands_with_live_logs( shell_cmd_list, callback=self.callback_start_fl_job, should_write_log_file=False) is_launch_task = False - else: - self.check_runner_stop_event() - self.mlops_metrics.report_client_id_status( - self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_RUNNING, run_id=self.run_id) - - # Generate the job executing commands - job_executing_commands = JobRunnerUtils.generate_job_execute_commands( - self.run_id, self.edge_id, self.version, - self.package_type, executable_interpreter, entry_file_full_path, - conf_file_object, entry_args, assigned_gpu_ids, - job_api_key, client_rank, scheduler_match_info=scheduler_match_info, - cuda_visible_gpu_ids_str=self.cuda_visible_gpu_ids_str) - - if containerize is not None and containerize is True: - docker_args = fedml_config_object.get("docker", {}) - docker_args = JobRunnerUtils.create_instance_from_dict(DockerArgs, docker_args) - try: - job_executing_commands = JobRunnerUtils.generate_launch_docker_command(docker_args=docker_args, - run_id=self.run_id, - edge_id=self.edge_id, - unzip_package_path=unzip_package_path, - executable_interpreter=executable_interpreter, - entry_file_full_path=entry_file_full_path, - bootstrap_cmd_list=bootstrap_cmd_list, - cuda_visible_gpu_ids_str=self.cuda_visible_gpu_ids_str, - image_pull_policy=image_pull_policy) - except Exception as e: - logging.error(f"Error occurred while generating containerized launch commands. " - f"Exception: {e}, Traceback: {traceback.format_exc()}") - return None, None, None - - if not job_executing_commands: - raise Exception("Failed to generate docker execution command") - - # Run the job executing commands - logging.info(f"Run the client job with job id {self.run_id}, device id {self.edge_id}.") - process, error_list = ClientConstants.execute_commands_with_live_logs( - job_executing_commands, callback=self.start_job_perf, error_processor=self.job_error_processor, - should_write_log_file=False if job_type == Constants.JOB_TASK_TYPE_FEDERATE else True) - is_launch_task = False if job_type == Constants.JOB_TASK_TYPE_FEDERATE else True - - return process, is_launch_task, error_list + return process, is_launch_task, error_list def callback_start_fl_job(self, job_pid): ClientConstants.save_learning_process(self.run_id, job_pid) @@ -1001,6 +967,7 @@ def callback_start_train(self, topic, payload): self.run_process_event_map[run_id_str], self.run_process_completed_event_map[run_id_str], self.message_center.get_message_queue())) self.run_process_map[run_id_str].start() + # FIXME (@alaydshah): Seems this info is not really being persisted. ClientConstants.save_run_process(run_id, self.run_process_map[run_id_str].pid) def callback_stop_train(self, topic, payload): @@ -1238,7 +1205,7 @@ def response_device_info_to_mlops(self, topic, payload): if context is not None: response_payload["context"] = context self.message_center.send_message(response_topic, json.dumps(response_payload), run_id=run_id) - + def callback_report_device_info(self, topic, payload): payload_json = json.loads(payload) server_id = payload_json.get("server_id", 0) @@ -1555,7 +1522,6 @@ def on_agent_mqtt_connected(self, mqtt_client_object): self.add_message_listener(topic_stop_train, self.callback_stop_train) self.mqtt_mgr.add_message_listener(topic_stop_train, self.listener_message_dispatch_center) - # Setup MQTT message listener for client status switching topic_client_status = "fl_client/flclient_agent_" + str(self.edge_id) + "/status" self.add_message_listener(topic_client_status, self.callback_runner_id_status) @@ -1578,20 +1544,25 @@ def on_agent_mqtt_connected(self, mqtt_client_object): topic_request_edge_device_info_from_mlops = f"deploy/mlops/slave_agent/request_device_info/{self.edge_id}" self.add_message_listener(topic_request_edge_device_info_from_mlops, self.response_device_info_to_mlops) - self.mqtt_mgr.add_message_listener(topic_request_edge_device_info_from_mlops, self.listener_message_dispatch_center) + self.mqtt_mgr.add_message_listener(topic_request_edge_device_info_from_mlops, + self.listener_message_dispatch_center) topic_request_deploy_master_device_info_from_mlops = None if self.model_device_server_id is not None: topic_request_deploy_master_device_info_from_mlops = f"deploy/mlops/master_agent/request_device_info/{self.model_device_server_id}" - self.add_message_listener(topic_request_deploy_master_device_info_from_mlops, self.response_device_info_to_mlops) - self.mqtt_mgr.add_message_listener(topic_request_deploy_master_device_info_from_mlops, self.listener_message_dispatch_center) + self.add_message_listener(topic_request_deploy_master_device_info_from_mlops, + self.response_device_info_to_mlops) + self.mqtt_mgr.add_message_listener(topic_request_deploy_master_device_info_from_mlops, + self.listener_message_dispatch_center) topic_request_deploy_slave_device_info_from_mlops = None if self.model_device_client_edge_id_list is not None and len(self.model_device_client_edge_id_list) > 0: topic_request_deploy_slave_device_info_from_mlops = f"deploy/mlops/slave_agent/request_device_info/{self.model_device_client_edge_id_list[0]}" - self.add_message_listener(topic_request_deploy_slave_device_info_from_mlops, self.response_device_info_to_mlops) - self.mqtt_mgr.add_message_listener(topic_request_deploy_slave_device_info_from_mlops, self.listener_message_dispatch_center) - + self.add_message_listener(topic_request_deploy_slave_device_info_from_mlops, + self.response_device_info_to_mlops) + self.mqtt_mgr.add_message_listener(topic_request_deploy_slave_device_info_from_mlops, + self.listener_message_dispatch_center) + # Setup MQTT message listener to logout from MLOps. topic_client_logout = "mlops/client/logout/" + str(self.edge_id) self.add_message_listener(topic_client_logout, self.callback_client_logout) From d10ce964f0ba4477be8c0da566b6411ece027e93 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Fri, 3 May 2024 17:19:51 -0700 Subject: [PATCH 02/15] Keep federate execute component as it is --- .../scheduler/comm_utils/job_utils.py | 24 ++++--- .../scheduler/master/server_runner.py | 27 +++++-- .../scheduler/slave/client_runner.py | 70 ++++++++++--------- 3 files changed, 71 insertions(+), 50 deletions(-) diff --git a/python/fedml/computing/scheduler/comm_utils/job_utils.py b/python/fedml/computing/scheduler/comm_utils/job_utils.py index 036e01490c..cca1641280 100644 --- a/python/fedml/computing/scheduler/comm_utils/job_utils.py +++ b/python/fedml/computing/scheduler/comm_utils/job_utils.py @@ -427,7 +427,7 @@ def create_instance_from_dict(data_class, input_dict: {}): @staticmethod def generate_bootstrap_commands(env_args, unzip_package_path) -> (List[str], str): - bootstrap_cmd_list = list() + bootstrap_cmd_list, bootstrap_script_path = list() bootstrap_script_path, bootstrap_script_dir, bootstrap_script_file = [None] * 3 if env_args is not None: @@ -466,9 +466,11 @@ def generate_bootstrap_commands(env_args, unzip_package_path) -> (List[str], str return bootstrap_cmd_list, bootstrap_script_file @staticmethod - def generate_job_execute_commands(run_id, edge_id, version, package_type, entry_file_full_path, - job_args: JobArgs, cuda_visible_gpu_ids_str: str = None): - + def generate_job_execute_commands(run_id, edge_id, version, + package_type, executable_interpreter, entry_file_full_path, + conf_file_object, entry_args, assigned_gpu_ids, + job_api_key, client_rank, scheduler_match_info=None, + cuda_visible_gpu_ids_str=None): shell_cmd_list = list() entry_commands_origin = list() @@ -482,12 +484,12 @@ def generate_job_execute_commands(run_id, edge_id, version, package_type, entry_ # Generate the export env list for publishing environment variables export_cmd = "set" if platform.system() == "Windows" else "export" export_config_env_list, config_env_name_value_map = JobRunnerUtils.parse_config_args_as_env_variables( - export_cmd, job_args.conf_file_object) + export_cmd, conf_file_object) # Generate the export env list about scheduler matching info for publishing environment variables export_match_env_list, match_env_name_value_map = \ JobRunnerUtils.assign_matched_resources_to_run_and_generate_envs( - run_id, export_cmd, job_args.scheduler_match_info + run_id, export_cmd, scheduler_match_info ) # Replace entry commands with environment variable values @@ -499,7 +501,7 @@ def generate_job_execute_commands(run_id, edge_id, version, package_type, entry_ ) # Replace entry arguments with environment variable values - entry_args = JobRunnerUtils.replace_entry_args_with_env_variable(job_args.entry_args, config_env_name_value_map) + entry_args = JobRunnerUtils.replace_entry_args_with_env_variable(entry_args, config_env_name_value_map) entry_args = JobRunnerUtils.replace_entry_args_with_env_variable(entry_args, match_env_name_value_map) # Add the export env list to the entry commands @@ -514,13 +516,13 @@ def generate_job_execute_commands(run_id, edge_id, version, package_type, entry_ entry_commands.insert(0, f"{export_cmd} FEDML_CURRENT_VERSION={version}\n") entry_commands.insert(0, f"{export_cmd} FEDML_ENV_VERSION={version}\n") entry_commands.insert(0, f"{export_cmd} FEDML_USING_MLOPS=true\n") - entry_commands.insert(0, f"{export_cmd} FEDML_CLIENT_RANK={job_args.client_rank}\n") + entry_commands.insert(0, f"{export_cmd} FEDML_CLIENT_RANK={client_rank}\n") entry_commands.insert(0, f"{export_cmd} FEDML_ENV_LOCAL_ON_PREMISE_PLATFORM_HOST={fedml.get_local_on_premise_platform_host()}\n") entry_commands.insert(0, f"{export_cmd} FEDML_ENV_LOCAL_ON_PREMISE_PLATFORM_PORT={fedml.get_local_on_premise_platform_port()}\n") - if job_args.job_api_key is not None and str(job_args.job_api_key).strip() != "": - random_out = sys_utils.random2(job_args.job_api_key, "FEDML@88119999GREAT") + if job_api_key is not None and str(job_api_key).strip() != "": + random_out = sys_utils.random2(job_api_key, "FEDML@88119999GREAT") random_list = random_out.split("FEDML_NEXUS@") entry_commands.insert(0, f"{export_cmd} FEDML_RUN_API_KEY={random_list[1]}\n") @@ -562,7 +564,7 @@ def generate_job_execute_commands(run_id, edge_id, version, package_type, entry_ entry_file_handle.close() # Generate the shell commands to be executed - shell_cmd_list.append(f"{job_args.executable_interpreter} {entry_file_full_path}") + shell_cmd_list.append(f"{executable_interpreter} {entry_file_full_path}") return shell_cmd_list diff --git a/python/fedml/computing/scheduler/master/server_runner.py b/python/fedml/computing/scheduler/master/server_runner.py index 8054600a26..238349a3e4 100755 --- a/python/fedml/computing/scheduler/master/server_runner.py +++ b/python/fedml/computing/scheduler/master/server_runner.py @@ -28,7 +28,7 @@ from ..comm_utils.job_cleanup import JobCleanup from ..scheduler_core.scheduler_matcher import SchedulerMatcher from ..comm_utils.constants import SchedulerConstants -from ..comm_utils.job_utils import JobRunnerUtils, JobArgs +from ..comm_utils.job_utils import JobRunnerUtils from ..comm_utils.run_process_utils import RunProcessUtils from ....core.mlops.mlops_runtime_log import MLOpsRuntimeLog @@ -902,11 +902,24 @@ def should_continue_run_job(self, run_id): return True def execute_job_task(self, entry_file_full_path, conf_file_full_path, run_id): - job_args = JobArgs(request_json=self.request_json, - conf_file_object=load_yaml_config(conf_file_full_path), - client_rank=0) + run_config = self.request_json["run_config"] + run_params = run_config.get("parameters", {}) + job_yaml = run_params.get("job_yaml", {}) + job_yaml_default_none = run_params.get("job_yaml", None) + job_api_key = job_yaml.get("run_api_key", None) + job_api_key = job_yaml.get("fedml_run_dynamic_params", None) if job_api_key is None else job_api_key + assigned_gpu_ids = run_params.get("gpu_ids", None) + framework_type = job_yaml.get("framework_type", None) + job_type = job_yaml.get("job_type", None) + job_type = job_yaml.get("task_type", Constants.JOB_TASK_TYPE_TRAIN) if job_type is None else job_type + conf_file_object = load_yaml_config(conf_file_full_path) + entry_args_dict = conf_file_object.get("fedml_entry_args", {}) + entry_args = entry_args_dict.get("arg_items", None) + + executable_interpreter = ClientConstants.CLIENT_SHELL_PS \ + if platform.system() == ClientConstants.PLATFORM_WINDOWS else ClientConstants.CLIENT_SHELL_BASH - if job_args.job_yaml_default_none is None: + if job_yaml_default_none is None: # Generate the job executing commands for previous federated learning (Compatibility) python_program = get_python_program() logging.info("Run the server: {} {} --cf {} --rank 0 --role server".format( @@ -929,7 +942,9 @@ def execute_job_task(self, entry_file_full_path, conf_file_full_path, run_id): # Generate the job executing commands job_executing_commands = JobRunnerUtils.generate_job_execute_commands( run_id=self.run_id, edge_id=self.edge_id, version=self.version, package_type=self.package_type, - entry_file_full_path=entry_file_full_path, job_args=job_args) + executable_interpreter=executable_interpreter, entry_file_full_path=entry_file_full_path, + conf_file_object=conf_file_object, entry_args=entry_args, assigned_gpu_ids=assigned_gpu_ids, + job_api_key=job_api_key, client_rank=0) # Run the job executing commands logging.info(f"Run the server job with job id {self.run_id}, device id {self.edge_id}.") diff --git a/python/fedml/computing/scheduler/slave/client_runner.py b/python/fedml/computing/scheduler/slave/client_runner.py index a78b23fc0c..129989e439 100755 --- a/python/fedml/computing/scheduler/slave/client_runner.py +++ b/python/fedml/computing/scheduler/slave/client_runner.py @@ -560,7 +560,6 @@ def run_impl(self): conf_file_object=load_yaml_config(conf_file_full_path), fedml_config_object=fedml_config_object) - is_run_ok = False if job_args.job_type == Constants.JOB_TASK_TYPE_TRAIN: process, is_launch_task, error_list = self.execute_train_job_task(job_args=job_args, unzip_package_path=unzip_package_path, @@ -573,35 +572,35 @@ def run_impl(self): dynamic_args_config=dynamic_args_config, fedml_config_object=self.fedml_config_object) - logging.info("====Your Run Logs End===") - logging.info(" ") - logging.info(" ") - - ret_code, out, err = process.returncode if process else None, None, None - is_run_ok = sys_utils.is_runner_finished_normally(process.pid) - if is_launch_task: - is_run_ok = True - if error_list is not None and len(error_list) > 0: - is_run_ok = False - if ret_code is None or ret_code <= 0: - self.check_runner_stop_event() + logging.info("====Your Run Logs End===") + logging.info(" ") + logging.info(" ") + + ret_code, out, err = process.returncode if process else None, None, None + is_run_ok = sys_utils.is_runner_finished_normally(process.pid) + if is_launch_task: + is_run_ok = True + if error_list is not None and len(error_list) > 0: + is_run_ok = False + if ret_code is None or ret_code <= 0: + self.check_runner_stop_event() - if is_run_ok: - if out is not None: - out_str = sys_utils.decode_our_err_result(out) - if out_str != "": - logging.info("{}".format(out_str)) + if is_run_ok: + if out is not None: + out_str = sys_utils.decode_our_err_result(out) + if out_str != "": + logging.info("{}".format(out_str)) - self.mlops_metrics.report_client_id_status( - self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FINISHED, - server_id=self.server_id, run_id=run_id) + self.mlops_metrics.report_client_id_status( + self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FINISHED, + server_id=self.server_id, run_id=run_id) - if is_launch_task: - sys_utils.log_return_info(f"job {run_id}", ret_code) - else: - sys_utils.log_return_info(entry_file, ret_code) - else: - is_run_ok = False + if is_launch_task: + sys_utils.log_return_info(f"job {run_id}", ret_code) + else: + sys_utils.log_return_info(entry_file, ret_code) + else: + is_run_ok = False if not is_run_ok: # If the run status is killed or finished, then return with the normal state. @@ -666,17 +665,23 @@ def execute_train_job_task(self, job_args: JobArgs, unzip_package_path, entry_fi return process, is_launch_task, error_list - def execute_job_task(self, job_args: JobArgs, unzip_package_path, entry_file_full_path, conf_file_full_path, dynamic_args_config, + def execute_job_task(self, unzip_package_path, entry_file_full_path, conf_file_full_path, dynamic_args_config, fedml_config_object): + run_config = self.request_json["run_config"] + run_params = run_config.get("parameters", {}) + job_yaml = run_params.get("job_yaml", {}) + job_yaml_default_none = run_params.get("job_yaml", None) + job_type = job_yaml.get("job_type", None) + # TODO: Can we remove task_type? + job_type = job_yaml.get("task_type", Constants.JOB_TASK_TYPE_TRAIN) if job_type is None else job_type # Bootstrap Info env_args = fedml_config_object.get("environment_args", None) bootstrap_cmd_list, bootstrap_script_file = JobRunnerUtils.generate_bootstrap_commands(env_args, unzip_package_path) - # if not containerize: - if len(bootstrap_cmd_list) and not (job_args.job_type == Constants.JOB_TASK_TYPE_DEPLOY or - job_args.job_type == Constants.JOB_TASK_TYPE_SERVE): + if len(bootstrap_cmd_list) and not (job_type == Constants.JOB_TASK_TYPE_DEPLOY or + job_type == Constants.JOB_TASK_TYPE_SERVE): bootstrapping_successful = self.run_bootstrap_script(bootstrap_cmd_list=bootstrap_cmd_list, bootstrap_script_file=bootstrap_script_file) @@ -692,7 +697,7 @@ def execute_job_task(self, job_args: JobArgs, unzip_package_path, entry_file_ful ClientConstants.cleanup_bootstrap_process(self.request_json["runId"]) # Generate the job executing commands for previous federated learning (Compatibility) - if job_args.job_yaml_default_none is None: + if job_yaml_default_none is None: python_program = get_python_program() logging.info("Run the client: {} {} --cf {} --rank {} --role client".format( python_program, entry_file_full_path, conf_file_full_path, str(dynamic_args_config.get("rank", 1)))) @@ -705,7 +710,6 @@ def execute_job_task(self, job_args: JobArgs, unzip_package_path, entry_file_ful process, error_list = ClientConstants.execute_commands_with_live_logs( shell_cmd_list, callback=self.callback_start_fl_job, should_write_log_file=False) is_launch_task = False - return process, is_launch_task, error_list def callback_start_fl_job(self, job_pid): From 2c115d6ce62c5fbea6ff542c1e8e322007cf343f Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Sun, 5 May 2024 19:12:59 -0700 Subject: [PATCH 03/15] Making Singleton Thread-Safe --- python/fedml/core/common/singleton.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/python/fedml/core/common/singleton.py b/python/fedml/core/common/singleton.py index 69ee009155..4c206e9ba5 100644 --- a/python/fedml/core/common/singleton.py +++ b/python/fedml/core/common/singleton.py @@ -1,6 +1,16 @@ +import threading + + class Singleton(object): + _instance = None + # For thread safety + _lock = threading.Lock() + def __new__(cls, *args, **kw): - if not hasattr(cls, "_instance"): - orig = super(Singleton, cls) - cls._instance = orig.__new__(cls, *args, **kw) + if cls._instance is None: + with cls._lock: + # Another thread could have created the instance before we acquired the lock. So check that the + # instance is still nonexistent. + if cls._instance is None: + cls._instance = super().__new__(cls) return cls._instance From 64b0f05a6ccc6d7cf08480e48528130fccf259b9 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Mon, 6 May 2024 14:45:18 -0700 Subject: [PATCH 04/15] Bug fix --- python/fedml/computing/scheduler/slave/client_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fedml/computing/scheduler/slave/client_runner.py b/python/fedml/computing/scheduler/slave/client_runner.py index 129989e439..e089fe3ead 100755 --- a/python/fedml/computing/scheduler/slave/client_runner.py +++ b/python/fedml/computing/scheduler/slave/client_runner.py @@ -558,7 +558,7 @@ def run_impl(self): job_args = JobArgs(request_json=self.request_json, conf_file_object=load_yaml_config(conf_file_full_path), - fedml_config_object=fedml_config_object) + fedml_config_object=self.fedml_config_object) if job_args.job_type == Constants.JOB_TASK_TYPE_TRAIN: process, is_launch_task, error_list = self.execute_train_job_task(job_args=job_args, From b29d4e3ecff7ece7707947770a3d508c8f963af0 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Tue, 7 May 2024 16:10:04 -0700 Subject: [PATCH 05/15] Create container --- .../scheduler/slave/client_runner.py | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/python/fedml/computing/scheduler/slave/client_runner.py b/python/fedml/computing/scheduler/slave/client_runner.py index e089fe3ead..3d933a8052 100755 --- a/python/fedml/computing/scheduler/slave/client_runner.py +++ b/python/fedml/computing/scheduler/slave/client_runner.py @@ -17,11 +17,13 @@ import zipfile from urllib.parse import urljoin, urlparse +import docker.types import requests from fedml.computing.scheduler.comm_utils import job_utils import fedml from ..comm_utils.constants import SchedulerConstants +from ..comm_utils.container_utils import ContainerUtils from ..comm_utils.job_cleanup import JobCleanup from ..comm_utils.job_utils import JobRunnerUtils, DockerArgs, JobArgs from ..comm_utils.run_process_utils import RunProcessUtils @@ -665,6 +667,67 @@ def execute_train_job_task(self, job_args: JobArgs, unzip_package_path, entry_fi return process, is_launch_task, error_list + def create_docker_container(self, job_args: JobArgs, docker_args: DockerArgs, + unzip_package_path: str, entry_file_full_path: str, + bootstrap_cmd_list, image_pull_policy: str = None): + + docker_client = JobRunnerUtils.get_docker_client(docker_args=job_args.docker_args) + ContainerUtils.get_instance().pull_image_with_policy(image_pull_policy, job_args.docker_args.image, + client=docker_client) + + container_name = JobRunnerUtils.get_run_container_name(self.run_id) + JobRunnerUtils.remove_run_container_if_exists(container_name, docker_client) + device_requests = [] + volumes = [] + binds = {} + environment = {"MAIN_ENTRY": entry_file_full_path} + destination_launch_dir = "/home/fedml/launch" + + # Generate the bootstrap commands + auto_gen_bootstrap_file_name = "fedml-launch-bootstrap-auto-gen.sh" + if bootstrap_cmd_list: + bootstrap_script_file = os.path.join(unzip_package_path, auto_gen_bootstrap_file_name) + with open(bootstrap_script_file, "w") as f: + f.write("#!/bin/bash\n") + f.write("set -e\n") + f.write(f"cd {destination_launch_dir}\n") + f.write("\n".join(bootstrap_cmd_list)) + destination_bootstrap_dir = os.path.join(destination_launch_dir, auto_gen_bootstrap_file_name) + environment["BOOTSTRAP_DIR"] = destination_bootstrap_dir + + # Source Code Mounting + source_code_dir = os.path.join(unzip_package_path, "fedml") + volumes.append(source_code_dir) + binds[source_code_dir] = { + "bind": destination_launch_dir, + "mode": "rw" + } + + if self.cuda_visible_gpu_ids_str is not None: + gpu_id_list = self.cuda_visible_gpu_ids_str.split(",") + device_requests.append( + docker.types.DeviceRequest(device_ids=gpu_id_list, capabilities=[["gpu"]])) + logging.info(f"device_requests: {device_requests}") + + try: + container = docker_client.api.create_container( + image=docker_args.image, + name=container_name, + remove=True, + tty=True, + host_config=docker_client.api.create_host_config( + binds=binds, + devices=device_requests, + ), + ports=job_args.docker_args.ports, + volumes=volumes, + detach=True # Run container in detached mode + ) + except Exception as e: + logging.error(f"Failed to create docker container with Exception {e}. Traceback: {traceback.format_exc()}") + raise Exception(f"Failed to create docker container with Exception {e}") + return container + def execute_job_task(self, unzip_package_path, entry_file_full_path, conf_file_full_path, dynamic_args_config, fedml_config_object): run_config = self.request_json["run_config"] From 153fdd605e92cf022e454c6105e1c6b6d7455826 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Tue, 7 May 2024 17:17:15 -0700 Subject: [PATCH 06/15] Nits --- python/fedml/computing/scheduler/slave/client_runner.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python/fedml/computing/scheduler/slave/client_runner.py b/python/fedml/computing/scheduler/slave/client_runner.py index 3d933a8052..59e500f56a 100755 --- a/python/fedml/computing/scheduler/slave/client_runner.py +++ b/python/fedml/computing/scheduler/slave/client_runner.py @@ -639,6 +639,10 @@ def execute_train_job_task(self, job_args: JobArgs, unzip_package_path, entry_fi bootstrap_cmd_list, bootstrap_script_file = JobRunnerUtils.generate_bootstrap_commands(job_args.env_args, unzip_package_path) + container = self.create_docker_container(job_args=job_args, docker_args=job_args.docker_args, + unzip_package_path=unzip_package_path, + entry_file_full_path=entry_file_full_path, + bootstrap_cmd_list=bootstrap_cmd_list) try: job_executing_commands = JobRunnerUtils.generate_launch_docker_command(docker_args=job_args.docker_args, run_id=self.run_id, @@ -669,10 +673,11 @@ def execute_train_job_task(self, job_args: JobArgs, unzip_package_path, entry_fi def create_docker_container(self, job_args: JobArgs, docker_args: DockerArgs, unzip_package_path: str, entry_file_full_path: str, - bootstrap_cmd_list, image_pull_policy: str = None): + bootstrap_cmd_list): docker_client = JobRunnerUtils.get_docker_client(docker_args=job_args.docker_args) - ContainerUtils.get_instance().pull_image_with_policy(image_pull_policy, job_args.docker_args.image, + ContainerUtils.get_instance().pull_image_with_policy(image_pull_policy=job_args.image_pull_policy, + image_name=job_args.docker_args.image, client=docker_client) container_name = JobRunnerUtils.get_run_container_name(self.run_id) From 91308fe4a8421cfb9aa74c4abe724389999a5d85 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Tue, 7 May 2024 17:21:51 -0700 Subject: [PATCH 07/15] Bug fix --- python/fedml/computing/scheduler/comm_utils/job_utils.py | 2 +- python/fedml/computing/scheduler/slave/client_runner.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/fedml/computing/scheduler/comm_utils/job_utils.py b/python/fedml/computing/scheduler/comm_utils/job_utils.py index cca1641280..a0c0ecc165 100644 --- a/python/fedml/computing/scheduler/comm_utils/job_utils.py +++ b/python/fedml/computing/scheduler/comm_utils/job_utils.py @@ -427,7 +427,7 @@ def create_instance_from_dict(data_class, input_dict: {}): @staticmethod def generate_bootstrap_commands(env_args, unzip_package_path) -> (List[str], str): - bootstrap_cmd_list, bootstrap_script_path = list() + bootstrap_cmd_list = list() bootstrap_script_path, bootstrap_script_dir, bootstrap_script_file = [None] * 3 if env_args is not None: diff --git a/python/fedml/computing/scheduler/slave/client_runner.py b/python/fedml/computing/scheduler/slave/client_runner.py index 59e500f56a..a581521bc5 100755 --- a/python/fedml/computing/scheduler/slave/client_runner.py +++ b/python/fedml/computing/scheduler/slave/client_runner.py @@ -540,7 +540,7 @@ def run_impl(self): ##### if not os.path.exists(unzip_package_path): - logging.info("failed to unzip file.") + logging.error("Failed to unzip file.") self.check_runner_stop_event() # Send failed msg when exceptions. self.cleanup_run_when_starting_failed(status=ClientConstants.MSG_MLOPS_CLIENT_STATUS_EXCEPTION) From 99f3aee5007081480442916d504e667c6b0d2009 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Tue, 7 May 2024 17:26:55 -0700 Subject: [PATCH 08/15] Docker File for Launch --- devops/dockerfile/fedml-launch/job/Dockerfile | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 devops/dockerfile/fedml-launch/job/Dockerfile diff --git a/devops/dockerfile/fedml-launch/job/Dockerfile b/devops/dockerfile/fedml-launch/job/Dockerfile new file mode 100644 index 0000000000..b8a4277990 --- /dev/null +++ b/devops/dockerfile/fedml-launch/job/Dockerfile @@ -0,0 +1,27 @@ +# Image Name: fedml/fedml-launch-job:cu11.6 +ARG BASE_IMAGE=fedml/fedml:latest-torch1.13.1-cuda11.6-cudnn8-devel +FROM ${BASE_IMAGE} + +ARG HOME_DIR = /home/fedml +## Only Modify if you want to use a different version of FedML +RUN mkdir -p ${HOME_DIR}/fedml-pip +ADD ./python ${HOME_DIR}/fedml-pip +WORKDIR .${HOME_DIR}/fedml-pip +RUN pip3 install -e ./ + +# 1. Specify Bootrap path (If any) +ENV BOOTSTRAP_DIR="" + +# 2. MOUNT User's Local Folder (If any) +ENV DATA_FOLDER="" +VOLUME [ DATA_FOLDER ] + +# 3. MOUNT Fedml Home Folder +VOLUME ["/home/fedml/launch"] + +# 4. Enter the entrypoint +WORKDIR ${HOME_DIR}/launch +ENV MAIN_ENTRY="" + +# if bootstrap dir is not empty, then run the bootstrap script +CMD /bin/bash ${BOOTSTRAP_DIR}; python3 ${MAIN_ENTRY} From ecd2b77e1126a4697757baca4d3fab4f77e93672 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Tue, 7 May 2024 17:49:20 -0700 Subject: [PATCH 09/15] Update docker file, and docker image name in constants --- devops/dockerfile/fedml-launch/job/Dockerfile | 8 ++++---- python/fedml/computing/scheduler/comm_utils/constants.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/devops/dockerfile/fedml-launch/job/Dockerfile b/devops/dockerfile/fedml-launch/job/Dockerfile index b8a4277990..5ef1398343 100644 --- a/devops/dockerfile/fedml-launch/job/Dockerfile +++ b/devops/dockerfile/fedml-launch/job/Dockerfile @@ -2,12 +2,12 @@ ARG BASE_IMAGE=fedml/fedml:latest-torch1.13.1-cuda11.6-cudnn8-devel FROM ${BASE_IMAGE} -ARG HOME_DIR = /home/fedml +ARG HOME_DIR=/home/fedml ## Only Modify if you want to use a different version of FedML RUN mkdir -p ${HOME_DIR}/fedml-pip -ADD ./python ${HOME_DIR}/fedml-pip -WORKDIR .${HOME_DIR}/fedml-pip -RUN pip3 install -e ./ +COPY ./python ${HOME_DIR}/fedml-pip +WORKDIR ${HOME_DIR}/fedml-pip +RUN pip3 install ./ # 1. Specify Bootrap path (If any) ENV BOOTSTRAP_DIR="" diff --git a/python/fedml/computing/scheduler/comm_utils/constants.py b/python/fedml/computing/scheduler/comm_utils/constants.py index b1294181bb..d397d8f767 100644 --- a/python/fedml/computing/scheduler/comm_utils/constants.py +++ b/python/fedml/computing/scheduler/comm_utils/constants.py @@ -103,7 +103,7 @@ class SchedulerConstants: RUN_PROCESS_TYPE_BOOTSTRAP_PROCESS = "bootstrap-process" FEDML_DEFAULT_LAUNCH_CONTAINER_PREFIX = "fedml_default_launch_container" - FEDML_DEFAULT_LAUNCH_IMAGE = "fedml/fedml-default-launch:cu12.1-u22.04" + FEDML_DEFAULT_LAUNCH_IMAGE = "fedml/fedml-launch-job:cu11.6" FEDML_DEFAULT_LOG_DIR = ".fedml/fedml-client/fedml/logs" FEDML_DEFAULT_DATA_DIR = ".fedml/fedml-client/fedml/data" From 5dfe7d41b9934962f1beb23d880aa43db90ff929 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Tue, 7 May 2024 23:30:03 -0700 Subject: [PATCH 10/15] Update Docker, Bootstrapping --- devops/dockerfile/fedml-launch/job/Dockerfile | 8 +++--- .../scheduler/slave/client_runner.py | 27 +++++++++---------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/devops/dockerfile/fedml-launch/job/Dockerfile b/devops/dockerfile/fedml-launch/job/Dockerfile index 5ef1398343..ab15a0f934 100644 --- a/devops/dockerfile/fedml-launch/job/Dockerfile +++ b/devops/dockerfile/fedml-launch/job/Dockerfile @@ -9,8 +9,8 @@ COPY ./python ${HOME_DIR}/fedml-pip WORKDIR ${HOME_DIR}/fedml-pip RUN pip3 install ./ -# 1. Specify Bootrap path (If any) -ENV BOOTSTRAP_DIR="" +# 1. Specify Bootrap file path (If any) +ENV BOOTSTRAP_SCRIPT="" # 2. MOUNT User's Local Folder (If any) ENV DATA_FOLDER="" @@ -23,5 +23,5 @@ VOLUME ["/home/fedml/launch"] WORKDIR ${HOME_DIR}/launch ENV MAIN_ENTRY="" -# if bootstrap dir is not empty, then run the bootstrap script -CMD /bin/bash ${BOOTSTRAP_DIR}; python3 ${MAIN_ENTRY} +# Run Bootstrap Script and Main Entry File +CMD /bin/bash ${BOOTSTRAP_SCRIPT}; python3 ${MAIN_ENTRY} diff --git a/python/fedml/computing/scheduler/slave/client_runner.py b/python/fedml/computing/scheduler/slave/client_runner.py index a581521bc5..97050cdc5b 100755 --- a/python/fedml/computing/scheduler/slave/client_runner.py +++ b/python/fedml/computing/scheduler/slave/client_runner.py @@ -642,7 +642,7 @@ def execute_train_job_task(self, job_args: JobArgs, unzip_package_path, entry_fi container = self.create_docker_container(job_args=job_args, docker_args=job_args.docker_args, unzip_package_path=unzip_package_path, entry_file_full_path=entry_file_full_path, - bootstrap_cmd_list=bootstrap_cmd_list) + bootstrap_script_file=bootstrap_script_file) try: job_executing_commands = JobRunnerUtils.generate_launch_docker_command(docker_args=job_args.docker_args, run_id=self.run_id, @@ -673,12 +673,17 @@ def execute_train_job_task(self, job_args: JobArgs, unzip_package_path, entry_fi def create_docker_container(self, job_args: JobArgs, docker_args: DockerArgs, unzip_package_path: str, entry_file_full_path: str, - bootstrap_cmd_list): + bootstrap_script_file: str = None): docker_client = JobRunnerUtils.get_docker_client(docker_args=job_args.docker_args) - ContainerUtils.get_instance().pull_image_with_policy(image_pull_policy=job_args.image_pull_policy, - image_name=job_args.docker_args.image, - client=docker_client) + logging.info(f"Start pulling the launch job image {job_args.docker_args.image}... " + f"with policy {job_args.image_pull_policy}") + try: + ContainerUtils.get_instance().pull_image_with_policy(image_pull_policy=job_args.image_pull_policy, + image_name=job_args.docker_args.image, + client=docker_client) + except Exception as e: + raise Exception(f"Failed to pull the launch job image {job_args.docker_args.image} with Exception {e}") container_name = JobRunnerUtils.get_run_container_name(self.run_id) JobRunnerUtils.remove_run_container_if_exists(container_name, docker_client) @@ -689,16 +694,8 @@ def create_docker_container(self, job_args: JobArgs, docker_args: DockerArgs, destination_launch_dir = "/home/fedml/launch" # Generate the bootstrap commands - auto_gen_bootstrap_file_name = "fedml-launch-bootstrap-auto-gen.sh" - if bootstrap_cmd_list: - bootstrap_script_file = os.path.join(unzip_package_path, auto_gen_bootstrap_file_name) - with open(bootstrap_script_file, "w") as f: - f.write("#!/bin/bash\n") - f.write("set -e\n") - f.write(f"cd {destination_launch_dir}\n") - f.write("\n".join(bootstrap_cmd_list)) - destination_bootstrap_dir = os.path.join(destination_launch_dir, auto_gen_bootstrap_file_name) - environment["BOOTSTRAP_DIR"] = destination_bootstrap_dir + if bootstrap_script_file is not None: + environment["BOOTSTRAP_SCRIPT"] = bootstrap_script_file # Source Code Mounting source_code_dir = os.path.join(unzip_package_path, "fedml") From 04fa0cb7cd8e9202883d2117eeac251a90a7391a Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Wed, 8 May 2024 12:26:35 -0700 Subject: [PATCH 11/15] Fix Docker File --- devops/dockerfile/fedml-launch/job/Dockerfile | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/devops/dockerfile/fedml-launch/job/Dockerfile b/devops/dockerfile/fedml-launch/job/Dockerfile index ab15a0f934..c6a4ead6b4 100644 --- a/devops/dockerfile/fedml-launch/job/Dockerfile +++ b/devops/dockerfile/fedml-launch/job/Dockerfile @@ -8,9 +8,6 @@ RUN mkdir -p ${HOME_DIR}/fedml-pip COPY ./python ${HOME_DIR}/fedml-pip WORKDIR ${HOME_DIR}/fedml-pip RUN pip3 install ./ - -# 1. Specify Bootrap file path (If any) -ENV BOOTSTRAP_SCRIPT="" # 2. MOUNT User's Local Folder (If any) ENV DATA_FOLDER="" @@ -19,9 +16,10 @@ VOLUME [ DATA_FOLDER ] # 3. MOUNT Fedml Home Folder VOLUME ["/home/fedml/launch"] -# 4. Enter the entrypoint +# 4. Enter the bootstrap and job entrypoints WORKDIR ${HOME_DIR}/launch -ENV MAIN_ENTRY="" +ENV BOOTSTRAP_SCRIPT="fedml_bootstrap_generated.sh" +ENV MAIN_ENTRY="fedml_job_entry_pack.sh'" # Run Bootstrap Script and Main Entry File -CMD /bin/bash ${BOOTSTRAP_SCRIPT}; python3 ${MAIN_ENTRY} +CMD /bin/bash ${BOOTSTRAP_SCRIPT}; /bin/bash ${MAIN_ENTRY} \ No newline at end of file From 28aa266ea07db27d88f6f7fb3743919e3765c7d3 Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Wed, 8 May 2024 13:09:29 -0700 Subject: [PATCH 12/15] Fix Docker File Bug --- devops/dockerfile/fedml-launch/job/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/devops/dockerfile/fedml-launch/job/Dockerfile b/devops/dockerfile/fedml-launch/job/Dockerfile index c6a4ead6b4..0cca3b5b95 100644 --- a/devops/dockerfile/fedml-launch/job/Dockerfile +++ b/devops/dockerfile/fedml-launch/job/Dockerfile @@ -19,7 +19,7 @@ VOLUME ["/home/fedml/launch"] # 4. Enter the bootstrap and job entrypoints WORKDIR ${HOME_DIR}/launch ENV BOOTSTRAP_SCRIPT="fedml_bootstrap_generated.sh" -ENV MAIN_ENTRY="fedml_job_entry_pack.sh'" +ENV MAIN_ENTRY="fedml_job_entry_pack.sh" # Run Bootstrap Script and Main Entry File CMD /bin/bash ${BOOTSTRAP_SCRIPT}; /bin/bash ${MAIN_ENTRY} \ No newline at end of file From 9b58df47ac52c0693079a85a73e90ac6e9ee596d Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Wed, 8 May 2024 15:29:20 -0700 Subject: [PATCH 13/15] pip install fedml in Docker file instead of copying and building from source --- devops/dockerfile/fedml-launch/job/Dockerfile | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/devops/dockerfile/fedml-launch/job/Dockerfile b/devops/dockerfile/fedml-launch/job/Dockerfile index 0cca3b5b95..7a9717c8aa 100644 --- a/devops/dockerfile/fedml-launch/job/Dockerfile +++ b/devops/dockerfile/fedml-launch/job/Dockerfile @@ -4,11 +4,13 @@ FROM ${BASE_IMAGE} ARG HOME_DIR=/home/fedml ## Only Modify if you want to use a different version of FedML -RUN mkdir -p ${HOME_DIR}/fedml-pip -COPY ./python ${HOME_DIR}/fedml-pip -WORKDIR ${HOME_DIR}/fedml-pip -RUN pip3 install ./ - +RUN pip3 install -U fedml +## Only Uncomment if you want to use a local version of FedML +# RUN mkdir -p ${HOME_DIR}/fedml-pip +# COPY ./python ${HOME_DIR}/fedml-pip +# WORKDIR ${HOME_DIR}/fedml-pip +# RUN pip3 install ./ + # 2. MOUNT User's Local Folder (If any) ENV DATA_FOLDER="" VOLUME [ DATA_FOLDER ] @@ -22,4 +24,4 @@ ENV BOOTSTRAP_SCRIPT="fedml_bootstrap_generated.sh" ENV MAIN_ENTRY="fedml_job_entry_pack.sh" # Run Bootstrap Script and Main Entry File -CMD /bin/bash ${BOOTSTRAP_SCRIPT}; /bin/bash ${MAIN_ENTRY} \ No newline at end of file +CMD /bin/bash ${BOOTSTRAP_SCRIPT}; /bin/bash ${MAIN_ENTRY} From 17c857ad2b0fb9599a062e5cc21516b8fd2a0526 Mon Sep 17 00:00:00 2001 From: alaydshah Date: Wed, 8 May 2024 20:11:42 +0000 Subject: [PATCH 14/15] Fix container creation bugs --- .../scheduler/slave/client_runner.py | 44 ++++++++----------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/python/fedml/computing/scheduler/slave/client_runner.py b/python/fedml/computing/scheduler/slave/client_runner.py index 97050cdc5b..1525024f91 100755 --- a/python/fedml/computing/scheduler/slave/client_runner.py +++ b/python/fedml/computing/scheduler/slave/client_runner.py @@ -639,10 +639,9 @@ def execute_train_job_task(self, job_args: JobArgs, unzip_package_path, entry_fi bootstrap_cmd_list, bootstrap_script_file = JobRunnerUtils.generate_bootstrap_commands(job_args.env_args, unzip_package_path) - container = self.create_docker_container(job_args=job_args, docker_args=job_args.docker_args, + container = self.create_docker_container(docker_args=job_args.docker_args, unzip_package_path=unzip_package_path, - entry_file_full_path=entry_file_full_path, - bootstrap_script_file=bootstrap_script_file) + image_pull_policy=job_args.image_pull_policy) try: job_executing_commands = JobRunnerUtils.generate_launch_docker_command(docker_args=job_args.docker_args, run_id=self.run_id, @@ -671,34 +670,29 @@ def execute_train_job_task(self, job_args: JobArgs, unzip_package_path, entry_fi return process, is_launch_task, error_list - def create_docker_container(self, job_args: JobArgs, docker_args: DockerArgs, - unzip_package_path: str, entry_file_full_path: str, - bootstrap_script_file: str = None): + def create_docker_container(self, docker_args: DockerArgs, + unzip_package_path: str, + image_pull_policy: str = None): - docker_client = JobRunnerUtils.get_docker_client(docker_args=job_args.docker_args) - logging.info(f"Start pulling the launch job image {job_args.docker_args.image}... " - f"with policy {job_args.image_pull_policy}") + docker_client = JobRunnerUtils.get_docker_client(docker_args=docker_args) + logging.info(f"Start pulling the launch job image {docker_args.image}... " + f"with policy {image_pull_policy}") try: - ContainerUtils.get_instance().pull_image_with_policy(image_pull_policy=job_args.image_pull_policy, - image_name=job_args.docker_args.image, - client=docker_client) + ContainerUtils.get_instance().pull_image_with_policy(image_name=docker_args.image, + client=docker_client, + image_pull_policy=image_pull_policy) except Exception as e: - raise Exception(f"Failed to pull the launch job image {job_args.docker_args.image} with Exception {e}") + raise Exception(f"Failed to pull the launch job image {docker_args.image} with Exception {e}") container_name = JobRunnerUtils.get_run_container_name(self.run_id) JobRunnerUtils.remove_run_container_if_exists(container_name, docker_client) device_requests = [] volumes = [] binds = {} - environment = {"MAIN_ENTRY": entry_file_full_path} - destination_launch_dir = "/home/fedml/launch" - - # Generate the bootstrap commands - if bootstrap_script_file is not None: - environment["BOOTSTRAP_SCRIPT"] = bootstrap_script_file # Source Code Mounting source_code_dir = os.path.join(unzip_package_path, "fedml") + destination_launch_dir = "/home/fedml/launch" volumes.append(source_code_dir) binds[source_code_dir] = { "bind": destination_launch_dir, @@ -712,16 +706,16 @@ def create_docker_container(self, job_args: JobArgs, docker_args: DockerArgs, logging.info(f"device_requests: {device_requests}") try: + host_config = docker_client.api.create_host_config( + binds=binds, + device_requests=device_requests, + ) container = docker_client.api.create_container( image=docker_args.image, name=container_name, - remove=True, tty=True, - host_config=docker_client.api.create_host_config( - binds=binds, - devices=device_requests, - ), - ports=job_args.docker_args.ports, + host_config=host_config, + ports=docker_args.ports, volumes=volumes, detach=True # Run container in detached mode ) From 453471637874a9f42fc62e743854cebc526597cf Mon Sep 17 00:00:00 2001 From: Alay Shah Date: Thu, 9 May 2024 11:28:59 -0700 Subject: [PATCH 15/15] Nits --- .../computing/scheduler/comm_utils/job_utils.py | 3 +++ .../fedml/computing/scheduler/slave/client_runner.py | 12 +++++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/python/fedml/computing/scheduler/comm_utils/job_utils.py b/python/fedml/computing/scheduler/comm_utils/job_utils.py index a0c0ecc165..48c5fbc159 100644 --- a/python/fedml/computing/scheduler/comm_utils/job_utils.py +++ b/python/fedml/computing/scheduler/comm_utils/job_utils.py @@ -66,6 +66,9 @@ class DockerArgs: registry: str = "" ports: List[int] = field(default_factory=lambda: [2345]) + def __post_init__(self): + self.client = JobRunnerUtils.get_docker_client(self) + class JobRunnerUtils(Singleton): STATIC_RUN_LOCK_KEY_SUFFIX = "STATIC" diff --git a/python/fedml/computing/scheduler/slave/client_runner.py b/python/fedml/computing/scheduler/slave/client_runner.py index 1525024f91..f188e707f3 100755 --- a/python/fedml/computing/scheduler/slave/client_runner.py +++ b/python/fedml/computing/scheduler/slave/client_runner.py @@ -2,6 +2,7 @@ import logging import multiprocessing import sys +from datetime import datetime from multiprocessing import Process import os @@ -642,6 +643,7 @@ def execute_train_job_task(self, job_args: JobArgs, unzip_package_path, entry_fi container = self.create_docker_container(docker_args=job_args.docker_args, unzip_package_path=unzip_package_path, image_pull_policy=job_args.image_pull_policy) + try: job_executing_commands = JobRunnerUtils.generate_launch_docker_command(docker_args=job_args.docker_args, run_id=self.run_id, @@ -674,18 +676,17 @@ def create_docker_container(self, docker_args: DockerArgs, unzip_package_path: str, image_pull_policy: str = None): - docker_client = JobRunnerUtils.get_docker_client(docker_args=docker_args) logging.info(f"Start pulling the launch job image {docker_args.image}... " f"with policy {image_pull_policy}") try: ContainerUtils.get_instance().pull_image_with_policy(image_name=docker_args.image, - client=docker_client, + client=docker_args.client, image_pull_policy=image_pull_policy) except Exception as e: raise Exception(f"Failed to pull the launch job image {docker_args.image} with Exception {e}") container_name = JobRunnerUtils.get_run_container_name(self.run_id) - JobRunnerUtils.remove_run_container_if_exists(container_name, docker_client) + JobRunnerUtils.remove_run_container_if_exists(container_name, docker_args.client) device_requests = [] volumes = [] binds = {} @@ -706,11 +707,11 @@ def create_docker_container(self, docker_args: DockerArgs, logging.info(f"device_requests: {device_requests}") try: - host_config = docker_client.api.create_host_config( + host_config = docker_args.client.api.create_host_config( binds=binds, device_requests=device_requests, ) - container = docker_client.api.create_container( + container = docker_args.client.api.create_container( image=docker_args.image, name=container_name, tty=True, @@ -719,6 +720,7 @@ def create_docker_container(self, docker_args: DockerArgs, volumes=volumes, detach=True # Run container in detached mode ) + docker_args.client.api.start(container=container.get("Id")) except Exception as e: logging.error(f"Failed to create docker container with Exception {e}. Traceback: {traceback.format_exc()}") raise Exception(f"Failed to create docker container with Exception {e}")