diff --git a/.gitignore b/.gitignore
index f72077d61..e827bd602 100644
--- a/.gitignore
+++ b/.gitignore
@@ -133,9 +133,13 @@ dmypy.json
 # Pyre type checker
 .pyre/
 
+# SMAC Logs
 *smac3-output_*
 *smac3_output*
 
+# Dask Logs
+tmp/smac_dask_slurm
+
 # macOS files
 .DS_Store
 
@@ -150,4 +154,5 @@ src
 .vscode
 projects
 
-_api
\ No newline at end of file
+_api
+branin.pkl
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2a357ade9..4888af5bf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@
 
 ## Examples
 - Add target function with additional arguments (#1134)
+- Adapt parallelization example (#1214)
 
 ## Improvements
 - Submit trials to runners in SMBO instead of running configs directly (#937)
@@ -19,6 +20,7 @@
 
 ## Documentation
 - Ask and tell without initial design and warmstarting
+- Add a description of parallelization in the documentation (#1226)
 
 ## Bugfixes
 - Ask and tell without initial design may no longer return a config from the initial design
   if it is not "removed".
diff --git a/docs/advanced_usage/9_parallelism.md b/docs/advanced_usage/9_parallelism.md
index 640c8f87f..828a3461d 100644
--- a/docs/advanced_usage/9_parallelism.md
+++ b/docs/advanced_usage/9_parallelism.md
@@ -1,41 +1,76 @@
 # Parallelism
 
-SMAC supports multiple workers natively via Dask. Just specify ``n_workers`` in the scenario and you are ready to go.
-
+To facilitate parallel execution, SMAC supports running multiple workers simultaneously via [Dask](https://www.dask.org/). Using this functionality splits SMAC into a main process and Dask workers that handle the execution.
+The main job runs the optimization process and coordinates the executor jobs. The executors receive the target function and hyperparameter configurations, evaluate them, and return their results. The executors stay alive between evaluations.
 
 !!! note
 
     Please keep in mind that additional workers are only used to evaluate trials. The main thread still
     orchestrates the optimization process, including training the surrogate model.
 
-
 !!! warning
 
-    Using high number of workers when the target function evaluation is fast might be counterproductive due to the
-    overhead of communcation. Consider using only one worker in this case.
-
+    When using multiple workers, SMAC is not reproducible anymore.
 
-!!! warning
-
-    When using multiple workers, SMAC is not reproducible anymore.
 
+## Parallelizing Locally
 
+To parallelize locally, i.e. run the workers on the same machine as the main job, specify the ``n_workers`` keyword when creating the scenario.
 
+```python
+Scenario(model.configspace, n_workers=5)
+```
 
-## Running on a Cluster
-You can also pass a custom dask client, e.g. to run on a slurm cluster.
-See our [parallelism example](../examples/1%20Basics/7_parallelization_cluster.md).
+## Parallelizing on SLURM
 
-!!! warning
+To use this split of main and executor jobs on a [SLURM cluster](https://slurm.schedmd.com/), SMAC supports manually specifying a [Dask](https://www.dask.org/) client.
+This allows executing the target function in dedicated SLURM jobs that are all configured with the same hardware requirements.
 
+!!! note
 
-    On some clusters you cannot spawn new jobs when running a SLURMCluster inside a
-    job instead of on the login node. No obvious errors might be raised but it can hang silently.
 
-!!! warning
+    While most SLURM clusters behave similarly, the example Dask client might not work for every cluster. For example, some clusters only allow spawning new jobs
+    from the login node.
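+
+If you are unsure whether the generated worker jobs match your cluster's requirements, one way to check is to print the batch script that `dask_jobqueue` would submit, for example (using the same placeholder resource values as the client shown below):
+
+```python
+from dask_jobqueue import SLURMCluster
+
+cluster = SLURMCluster(queue="partition_name", cores=4, memory="4 GB", walltime="00:10:00")
+print(cluster.job_script())  # shows the sbatch script that each worker job would use
+```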
-    Sometimes you need to modify your launch command which can be done with
-    `SLURMCluster.job_class.submit_command`.
+To configure SMAC properly for each cluster, you need to know the ports that allow communication between the main and worker jobs. The Dask client is then created as follows:
 
 ```python
-cluster.job_cls.submit_command = submit_command
-cluster.job_cls.cancel_command = cancel_command
-```
\ No newline at end of file
+...
+from smac import BlackBoxFacade, Scenario
+from dask.distributed import Client
+from dask_jobqueue import SLURMCluster
+
+n_workers = 5  # Number of SLURM worker jobs
+cluster = SLURMCluster(
+    queue="partition_name",  # Name of the partition
+    cores=4,  # CPU cores requested
+    memory="4 GB",  # RAM requested
+    walltime="00:10:00",  # Walltime limit for a runner job.
+    processes=1,  # Number of processes per worker
+    log_directory="tmp/smac_dask_slurm",  # Logging directory
+    nanny=False,  # False unless you want to use pynisher
+    worker_extra_args=[
+        "--worker-port",  # Worker port range
+        "60010:60100"],  # Worker port range
+    scheduler_options={
+        "port": 60001,  # Main Job Port
+    },
+)
+cluster.scale(jobs=n_workers)
+
+# Dask creates n_workers jobs on the cluster which stay open.
+client = Client(
+    address=cluster,
+)
+
+# Dask waits for n_workers workers to be created
+client.wait_for_workers(n_workers)
+
+# Now we use SMAC to find the best hyperparameters
+smac = BlackBoxFacade(
+    scenario,  # Pass scenario
+    model.train,  # Pass the target function
+    overwrite=True,  # Overrides any previous result
+    dask_client=client,  # Pass dask_client
+)
+incumbent = smac.optimize()
+```
+
+The full example of this code is given in the [parallelism example](../examples/1%20Basics/7_parallelization_cluster.md).
\ No newline at end of file
diff --git a/examples/1_basics/7_parallelization_cluster.py b/examples/1_basics/7_parallelization_cluster.py
index 0a8eb3a70..42b6316ec 100644
--- a/examples/1_basics/7_parallelization_cluster.py
+++ b/examples/1_basics/7_parallelization_cluster.py
@@ -1,66 +1,31 @@
 """Parallelization on Cluster
 
-An example of applying SMAC to optimize Branin using parallelization via Dask client on a
+An example of applying SMAC to optimize Branin using parallelization via Dask client on a
 SLURM cluster. If you do not want to use a cluster but your local machine, set dask_client
 to `None` and pass `n_workers` to the `Scenario`.
-
-Sometimes, the submitted jobs by the slurm client might be cancelled once it starts. In that
-case, you could try to start your job from a computing node
-
-:warning: On some clusters you cannot spawn new jobs when running a SLURMCluster inside a
-job instead of on the login node. No obvious errors might be raised but it can hang silently.
-
-Sometimes you need to modify your launch command which can be done with
-`SLURMCluster.job_class.submit_command`.
-
-```python
-cluster.job_cls.submit_command = submit_command
-cluster.job_cls.cancel_command = cancel_command
-```
-
-Here we optimize the synthetic 2d function Branin.
-We use the black-box facade because it is designed for black-box function optimization.
-The black-box facade uses a [Gaussian Process][GP] as its surrogate model.
-The facade works best on a numerical hyperparameter configuration space and should not
-be applied to problems with large evaluation budgets (up to 1000 evaluations).
""" -import numpy as np -from ConfigSpace import Configuration, ConfigurationSpace, Float from dask.distributed import Client from dask_jobqueue import SLURMCluster - from smac import BlackBoxFacade, Scenario __copyright__ = "Copyright 2025, Leibniz University Hanover, Institute of AI" __license__ = "3-clause BSD" +import numpy as np +from ConfigSpace import ConfigurationSpace, Float +from ConfigSpace import Configuration # for type hints -class Branin(object): - @property - def configspace(self) -> ConfigurationSpace: - cs = ConfigurationSpace(seed=0) +class Branin: + def __init__(self, seed: int = 0): + cs = ConfigurationSpace(seed=seed) x0 = Float("x0", (-5, 10), default=-5, log=False) x1 = Float("x1", (0, 15), default=2, log=False) cs.add([x0, x1]) - return cs + self.cs = cs def train(self, config: Configuration, seed: int = 0) -> float: - """Branin function - - Parameters - ---------- - config : Configuration - Contains two continuous hyperparameters, x0 and x1 - seed : int, optional - Not used, by default 0 - - Returns - ------- - float - Branin function value - """ x0 = config["x0"] x1 = config["x1"] a = 1.0 @@ -69,39 +34,40 @@ def train(self, config: Configuration, seed: int = 0) -> float: r = 6.0 s = 10.0 t = 1.0 / (8.0 * np.pi) - ret = a * (x1 - b * x0**2 + c * x0 - r) ** 2 + s * (1 - t) * np.cos(x0) + s - - return ret - + return a * (x1 - b * x0**2 + c * x0 - r) ** 2 \ + + s * (1 - t) * np.cos(x0) + s if __name__ == "__main__": model = Branin() # Scenario object specifying the optimization "environment" - scenario = Scenario(model.configspace, deterministic=True, n_trials=100, trial_walltime_limit=100) + scenario = Scenario( + model.cs, + deterministic=True, + n_trials=100, + trial_walltime_limit=100, + n_workers=5, + ) - # Create cluster - n_workers = 4 # Use 4 workers on the cluster + n_workers = 5 # Use 5 workers on the cluster # Please note that the number of workers is directly set in the # cluster / client. `scenario.n_workers` is ignored in this case. cluster = SLURMCluster( - # This is the partition of our slurm cluster. - queue="cpu_short", - # Your account name - # account="myaccount", - cores=1, - memory="1 GB", - # Walltime limit for each worker. Ensure that your function evaluations - # do not exceed this limit. - # More tips on this here: https://jobqueue.dask.org/en/latest/advanced-tips-and-tricks.html#how-to-handle-job-queueing-system-walltime-killing-workers - walltime="00:10:00", - processes=1, - log_directory="tmp/smac_dask_slurm", - # if you would like to limit the resources consumption of each function evaluation with pynisher, you need to - # set nanny as False - # Otherwise, an error `daemonic processes are not allowed to have children` will raise! - nanny=False, # if you do not use pynisher to limit the memory/time usage, feel free to set this one as True + queue="partition_name", # Name of the partition + cores=4, # CPU cores requested + memory="4 GB", # RAM requested + walltime="00:10:00", # Walltime limit for a runner job. 
+        processes=1,  # Number of processes per worker
+        log_directory="tmp/smac_dask_slurm",  # Logging directory
+        nanny=False,  # False unless you want to use pynisher
+        worker_extra_args=[
+            "--worker-port",  # Worker port range
+            "60010:60100"],  # Worker port range
+        scheduler_options={
+            "port": 60001,  # Main Job Port
+        },
+        # account="myaccount",  # Account name on the cluster (optional)
     )
     cluster.scale(jobs=n_workers)
 
@@ -110,8 +76,7 @@ def train(self, config: Configuration, seed: int = 0) -> float:
     client = Client(
         address=cluster,
     )
-    # Instead, you can also do
-    # client = cluster.get_client()
+    client.wait_for_workers(n_workers)
 
     # Now we use SMAC to find the best hyperparameters
     smac = BlackBoxFacade(
@@ -120,13 +85,5 @@ def train(self, config: Configuration, seed: int = 0) -> float:
         overwrite=True,  # Overrides any previous results that are found that are inconsistent with the meta-data
         dask_client=client,
     )
-
     incumbent = smac.optimize()
-
-    # Get cost of default configuration
-    default_cost = smac.validate(model.configspace.get_default_configuration())
-    print(f"Default cost: {default_cost}")
-
-    # Let's calculate the cost of the incumbent
-    incumbent_cost = smac.validate(incumbent)
-    print(f"Incumbent cost: {incumbent_cost}")
+    print(f"Best configuration found: {incumbent}")
\ No newline at end of file
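
For comparison with the cluster setup above, a minimal sketch of the local variant mentioned in the docstring could look like the following (no `SLURMCluster` and no `dask_client`; SMAC spawns the Dask workers itself based on `n_workers`). The `Branin` class here simply mirrors the one defined in the example:

```python
import numpy as np
from ConfigSpace import Configuration, ConfigurationSpace, Float

from smac import BlackBoxFacade, Scenario


class Branin:
    """Same synthetic 2D target function as in the example above."""

    def __init__(self, seed: int = 0):
        cs = ConfigurationSpace(seed=seed)
        cs.add([Float("x0", (-5, 10), default=-5), Float("x1", (0, 15), default=2)])
        self.cs = cs

    def train(self, config: Configuration, seed: int = 0) -> float:
        x0, x1 = config["x0"], config["x1"]
        b = 5.1 / (4.0 * np.pi**2)
        c = 5.0 / np.pi
        t = 1.0 / (8.0 * np.pi)
        return (x1 - b * x0**2 + c * x0 - 6.0) ** 2 + 10.0 * (1 - t) * np.cos(x0) + 10.0


if __name__ == "__main__":
    model = Branin()

    # n_workers alone enables local parallelism: SMAC creates the Dask workers
    # itself, so no dask_client is passed to the facade.
    scenario = Scenario(model.cs, deterministic=True, n_trials=100, n_workers=5)

    smac = BlackBoxFacade(scenario, model.train, overwrite=True)
    incumbent = smac.optimize()
    print(f"Best configuration found: {incumbent}")
```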