2 changes: 2 additions & 0 deletions .gitignore
@@ -112,3 +112,5 @@ venv.bak/
# Version file
src/vivarium_cluster_tools/_version.py

# macOS
*.DS_Store
1 change: 1 addition & 0 deletions docs/source/api_reference/psimulate/performance_logger.rst
@@ -0,0 +1 @@
.. automodule:: vivarium_cluster_tools.psimulate.performance_logger
@@ -0,0 +1 @@
.. automodule:: vivarium_cluster_tools.psimulate.worker.load_test_work_horse
23 changes: 1 addition & 22 deletions docs/source/conf.py
@@ -17,9 +17,6 @@
# documentation root, use os.path.abspath to make it absolute, like shown here.
from pathlib import Path

from docutils.nodes import Text
from sphinx.ext.intersphinx import missing_reference

import vivarium_cluster_tools

base_dir = Path(vivarium_cluster_tools.__file__).parent
@@ -53,6 +50,7 @@
# ones.
extensions = [
"sphinx.ext.autodoc",
"sphinx_autodoc_typehints",
"sphinx.ext.intersphinx",
"sphinx.ext.doctest",
"sphinx.ext.todo",
@@ -236,22 +234,3 @@
dtype, target = line.split(None, 1)
target = target.strip()
nitpick_ignore.append((dtype, target))


# Fix sphinx warnings for literal Ellipses in type hints.
def setup(app):
app.connect("missing-reference", __sphinx_issue_8127)


def __sphinx_issue_8127(app, env, node, contnode):
reftarget = node.get("reftarget", None)
if reftarget == "..":
node["reftype"] = "data"
node["reftarget"] = "Ellipsis"
text_node = next(iter(contnode.traverse(lambda n: n.tagname == "#text")))
replacement_node = Text("...", "")
if text_node.parent is not None:
text_node.parent.replace(text_node, replacement_node)
else: # e.g. happens in rtype fields
contnode = replacement_node
return missing_reference(app, env, node, contnode)
1 change: 1 addition & 0 deletions setup.py
@@ -45,6 +45,7 @@
"sphinx>=4.0",
"sphinx-rtd-theme",
"sphinx-click",
"sphinx-autodoc-typehints",
"IPython",
"matplotlib",
]
2 changes: 2 additions & 0 deletions src/vivarium_cluster_tools/cli_tools.py
@@ -4,6 +4,7 @@
================

"""

from pathlib import Path
from typing import Callable, List, Optional

@@ -49,6 +50,7 @@ class MinutesOrNone(click.ParamType):

def convert(self, value: str, param: str, ctx: click.Context) -> Optional[float]:
"""Converts the value to float seconds from minutes.

If conversion fails, calls the `fail` method from `click.ParamType`.
"""
try:
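Note: the ``convert`` method above follows click's custom parameter type protocol. As a minimal sketch of that protocol (the class body below is an illustrative assumption, not the module's exact code):

import click
from typing import Optional

class MinutesOrNone(click.ParamType):
    # Name shown in click's usage and error messages.
    name = "minutes"

    def convert(self, value: str, param, ctx) -> Optional[float]:
        # click routes every raw CLI string through convert().
        if value == "None":  # assumed handling of the literal 'None'
            return None
        try:
            # Minutes on the command line become float seconds internally.
            return float(value) * 60
        except ValueError:
            # fail() raises a UsageError with a consistent message format.
            self.fail(f"{value!r} is not a valid number of minutes", param, ctx)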
39 changes: 25 additions & 14 deletions src/vivarium_cluster_tools/psimulate/branches.py
@@ -6,6 +6,7 @@
Tools for managing the parameter space of a parallel run.

"""

from itertools import product
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple, Union
@@ -120,23 +121,24 @@ def __len__(self) -> int:
def calculate_input_draws(
input_draw_count: int, existing_draws: List[int] = None
) -> List[int]:
"""Determines a random sample of the GBD input draws to use given a draw
count and any existing draws.
"""Determines a random sample of the GBD input draws to use.

Notes
-----
The input draws returned account for the draw count provided and any
already-used draws.

Parameters
----------
input_draw_count
The number of draws to pull.

existing_draws
Any draws that have already been pulled and should not be pulled again.

Returns
-------
List[int]
A set of unique input draw numbers, guaranteed not to overlap with any
existing draw numbers.

"""
max_draw_count = NUMBER_OF_DRAWS
if input_draw_count > max_draw_count:
@@ -162,21 +164,18 @@ def calculate_random_seeds(
def calculate_random_seeds(
random_seed_count: int, existing_seeds: List[int] = None
) -> List[int]:
"""Generates random seeds to use given a count of seeds and any existing
seeds.
"""Generates random seeds to use given a count of seeds and any existing seeds.

Parameters
----------
random_seed_count
The number of random seeds to generate.

existing_seeds
Any random seeds that have already been generated and should not be
generated again.

Returns
-------
List[int]
A set of unique random seeds, guaranteed not to overlap with any
existing random seeds.

@@ -249,7 +248,7 @@ def _check_count_and_values(
count_name: str,
values_name: str,
max_count: int,
):
) -> None:
"""Checks input configuration count and values for integers outside of range.

Parameters
@@ -284,7 +283,8 @@ def _check_count_and_values(


def expand_branch_templates(templates: Dict) -> List[Dict]:
"""
"""Expand branch template lists into individual branches.

Take a list of dictionaries of configuration values (like the ones used in
experiment branch configurations) and expand it by taking any values which
are lists and creating a new set of branches which is made up of the
@@ -309,6 +309,14 @@
{'a': {'b': 2, 'c': 3, 'd': 6}}
]

Parameters
----------
templates
A dictionary of configuration values that may contain lists.

Returns
-------
A list of dictionaries, each representing a single branch configuration.
"""
expanded_branches = []

@@ -351,7 +359,7 @@ def expand_branch_templates(templates: Dict) -> List[Dict]:
return final_branches


def validate_artifact_path(artifact_path: str) -> str:
def validate_artifact_path(artifact_path: str) -> None:
"""Validates that the path to the data artifact from the branches file exists.

The path specified in the configuration must be absolute
@@ -361,10 +369,13 @@ def validate_artifact_path(artifact_path: str) -> str:
artifact_path
The path to the artifact.

Raises
------
FileNotFoundError
If the artifact path is not an absolute path or does not exist.

"""
path = Path(artifact_path)

if not path.is_absolute() or not path.exists():
raise FileNotFoundError(f"Cannot find artifact at path {path}")

return str(path)
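Note: the expansion that ``expand_branch_templates`` documents is essentially a cartesian product over the list-valued leaves of the config. A self-contained sketch of the idea (an illustrative reimplementation, not the module's actual code):

from itertools import product
from typing import Dict, List

def expand_branch_templates_sketch(template: Dict) -> List[Dict]:
    # Flatten the nested config into (key_path, value) pairs.
    def flatten(d, prefix=()):
        for key, value in d.items():
            if isinstance(value, dict):
                yield from flatten(value, prefix + (key,))
            else:
                yield prefix + (key,), value

    items = list(flatten(template))
    # Wrap scalars in single-element lists so product() sees uniform input.
    choices = [v if isinstance(v, list) else [v] for _, v in items]
    branches = []
    for combo in product(*choices):
        # Rebuild one nested dict per combination of leaf values.
        branch = {}
        for (path, _), value in zip(items, combo):
            node = branch
            for key in path[:-1]:
                node = node.setdefault(key, {})
            node[path[-1]] = value
        branches.append(branch)
    return branches

# expand_branch_templates_sketch({'a': {'b': [1, 2], 'c': 3, 'd': [4, 5, 6]}})
# yields the six branches shown in the docstring example above.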
19 changes: 8 additions & 11 deletions src/vivarium_cluster_tools/psimulate/cli.py
@@ -38,7 +38,6 @@ def psimulate():

You may initiate a new run with the ``run`` sub-command or restart a run
from where it was stopped by using the ``restart`` sub-command.

"""
pass

@@ -109,7 +108,6 @@ def run(
created with the same name as the MODEL_SPECIFICATION if one does not exist.
Results will be written to a further subdirectory named after the start time
of the simulation run.

"""
logs.configure_main_process_logging_to_terminal(options["verbose"])
main = handle_exceptions(runner.main, logger, options["with_debugger"])
@@ -146,12 +144,12 @@
)
@cli_tools.pass_shared_options(shared_options)
def restart(results_root, **options):
"""Restart a parallel simulation from a previous run at RESULTS_ROOT.
"""Restart a parallel simulation.

This restarts a parallel simulation from a previous run at RESULTS_ROOT.
Restarting will not erase existing results, but will start workers to
perform the remaining simulations. RESULTS_ROOT is expected to be an
output directory from a previous ``psimulate run`` invocation.

"""
logs.configure_main_process_logging_to_terminal(options["verbose"])
main = handle_exceptions(runner.main, logger, options["with_debugger"])
@@ -199,14 +197,13 @@ def restart(results_root, **options):
)
@cli_tools.pass_shared_options(shared_options)
def expand(results_root, **options):
"""Expand a previous run at RESULTS_ROOT by adding input draws and/or
random seeds.

Expanding will not erase existing results, but will start workers to perform
the additional simulations determined by the added draws/seeds.
RESULTS_ROOT is expected to be an output directory from a previous
``psimulate run`` invocation.
"""Expand a previous run.

This expands a previous run at RESULTS_ROOT by adding input draws and/or
random seeds. Expanding will not erase existing results, but will start
workers to perform the additional simulations determined by the added
draws/seeds. RESULTS_ROOT is expected to be an output directory from a
previous ``psimulate run`` invocation.
"""
logs.configure_main_process_logging_to_terminal(options["verbose"])
main = handle_exceptions(runner.main, logger, options["with_debugger"])
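Note: ``run``, ``restart``, and ``expand`` all hang off a single click group. The structure, heavily abbreviated (the real commands carry many more options and arguments than shown here), looks like:

import click

@click.group()
def psimulate():
    """Run and manage parallel simulations."""

@psimulate.command()
@click.argument("model_specification")
def run(model_specification):
    """Start a new parallel run from MODEL_SPECIFICATION."""

@psimulate.command()
@click.argument("results_root")
def restart(results_root):
    """Resume a stopped run from RESULTS_ROOT."""

@psimulate.command()
@click.argument("results_root")
def expand(results_root):
    """Add draws and/or seeds to a previous run at RESULTS_ROOT."""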
@@ -19,7 +19,6 @@
from vivarium_cluster_tools.psimulate import COMMANDS

if typing.TYPE_CHECKING:
# Cyclic import.
from vivarium_cluster_tools.psimulate.branches import Keyspace


30 changes: 15 additions & 15 deletions src/vivarium_cluster_tools/psimulate/performance_logger.py
@@ -15,7 +15,8 @@
def transform_perf_df_for_appending(
perf_df: pd.DataFrame, output_paths: OutputPaths
) -> pd.DataFrame:
"""
"""Transform performance DataFrame for appending to central logs.

Take the performance dataframe from the performance report and 1) turn the index into
columns so we can write to csv, 2) add an artifact name column, and 3) aggregate
scenario information into one column.
@@ -26,11 +27,10 @@ def transform_perf_df_for_appending(
DataFrame pulled from performance report with index values uniquely identifying each child
job and column values containing their performance data.
output_paths
OutputPaths object from paths.py containing information about the results directory.
OutputPaths object containing information about the results directory.

Returns
--------
DataFrame
-------
The transformed DataFrame which can be directly appended to our central logs. The data now
has a simple RangeIndex, the index values as columns, a new artifact name column, and a new
scenario parameters column.
@@ -74,9 +74,8 @@ def append_child_job_data(child_job_performance_data: pd.DataFrame) -> str:
DataFrame pulled from transform_perf_df_for_appending.

Returns
--------
str
str of the first file in our central logs containing child job data.
-------
The first file in our central logs containing child job data.
"""
log_files = glob.glob(
CENTRAL_PERFORMANCE_LOGS_DIRECTORY.as_posix() + "/log_summary_*.csv"
@@ -120,11 +119,11 @@ def generate_runner_job_data(
Parameters
----------
job_number
int of job number for our runner job.
The job number for our runner job.
output_paths
OutputPaths object from paths.py containing information about the results directory.
OutputPaths object containing information about the results directory.
first_file_with_data
str of the first file in our central logs containing child job data
The first file in our central logs containing child job data
launched by our runner job.
"""
runner_data = pd.DataFrame({"job_number": job_number}, index=[0])
@@ -144,17 +143,18 @@
def append_perf_data_to_central_logs(
perf_df: pd.DataFrame, output_paths: OutputPaths
) -> None:
"""Append performance data to the central logs. This consists of child job data
and runner data. The child job data will contain performance information and identifying
information for each child job and the runner data will contain data about the runner job
that launched these child jobs.
"""Append performance data to the central logs.

This consists of child job data and runner data. The child job data will contain
performance information and identifying information for each child job and the
runner data will contain data about the runner job that launched these child jobs.

Parameters
----------
perf_df
DataFrame pulled from performance report.
output_paths
OutputPaths object from paths.py containing information about the results directory.
OutputPaths object containing information about the results directory.
"""
if not output_paths.logging_to_central_results_directory:
logger.warning(
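Note: the three steps in ``transform_perf_df_for_appending``'s summary map onto a short pandas recipe. A sketch under assumed column names (the ``scenario_`` prefix and the ``artifact_name``/``scenario_parameters`` column names are illustrative guesses, not the module's actual schema):

import pandas as pd

def transform_for_appending_sketch(perf_df: pd.DataFrame, artifact_name: str) -> pd.DataFrame:
    # 1) Turn the identifying index levels into ordinary columns for CSV output.
    df = perf_df.reset_index()
    # 2) Tag every row with the artifact that produced it.
    df["artifact_name"] = artifact_name
    # 3) Aggregate the per-scenario columns into one string column.
    scenario_cols = [c for c in df.columns if c.startswith("scenario_")]
    df["scenario_parameters"] = df[scenario_cols].astype(str).agg(", ".join, axis=1)
    return df.drop(columns=scenario_cols)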
19 changes: 14 additions & 5 deletions src/vivarium_cluster_tools/psimulate/redis_dbs/launcher.py
Expand Up @@ -6,6 +6,7 @@
Creates redis databases to store job parameters and results.

"""

import atexit
import math
import socket
@@ -90,9 +91,14 @@


def _get_random_free_port() -> int:
# NOTE: this implementation is vulnerable to rare race conditions where some other
# process gets the same port after we free our socket but before we use the port
# number we got. Should be so rare in practice that it doesn't matter.
"""Get a random free port number.

Notes
-----
This implementation is vulnerable to rare race conditions where some other
process gets the same port after we free our socket but before we use the port
number we got. Should be so rare in practice that it doesn't matter.
"""
s = socket.socket()
s.bind(("", 0))
port = s.getsockname()[1]
@@ -101,6 +107,9 @@


def _expected_sufficient_workers(num_queues) -> int:
# Rough estimate of the number of workers needed to ensure that each queue gets
# at least one worker. cf. https://w.wiki/7bnb
"""Estimate the number of workers needed.

The intent is to provide a rough estimate of the number of workers needed to
ensure that each queue gets at least one worker. cf. https://w.wiki/7bnb
"""
return int(math.ceil(num_queues * (math.log(num_queues) + 0.57)))
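Note: the ``num_queues * (log(num_queues) + 0.57)`` expression is the coupon-collector expectation the docstring links to: with workers landing on n queues uniformly at random, about n(ln n + γ) assignments are expected before every queue has been hit at least once, where γ ≈ 0.577 is the Euler–Mascheroni constant (hence the 0.57). A quick numeric check of the formula:

import math

def expected_sufficient_workers(num_queues: int) -> int:
    # n * (ln n + 0.57) rounds the coupon-collector expectation n * H_n upward.
    return int(math.ceil(num_queues * (math.log(num_queues) + 0.57)))

for n in (2, 10, 100):
    print(n, expected_sufficient_workers(n))  # 2 -> 3, 10 -> 29, 100 -> 518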
15 changes: 10 additions & 5 deletions src/vivarium_cluster_tools/psimulate/results/processing.py
@@ -93,9 +93,11 @@ def _concat_results(


def _concat_preserve_types(df_list: List[pd.DataFrame]) -> pd.DataFrame:
"""Concatenate datasets and preserve all ``numpy`` dtypes (but not any
pandas-specific dtypes, e.g. categories become objects). We assume that all
dataframes in the list have identical columns and dtypes to the first.
"""Concatenate datasets and preserve all ``numpy`` dtypes.

This does not preserve any pandas-specific dtypes (e.g. categories become
objects). We assume that all dataframes in the list have identical columns
and dtypes to the first.
"""
dtype_mapping = {col: df_list[0][col].dtype for col in df_list[0].columns}
for df in df_list:
@@ -106,8 +108,11 @@

@vct_utils.backoff_and_retry(backoff_seconds=30, num_retries=3, log_function=logger.warning)
def _safe_write(df: pd.DataFrame, output_path: Path) -> None:
# Writing to some file types over and over balloons the file size so
# write to new file and move it over to avoid
"""Write a dataframe to disk, retrying on failure.

Writing to some file types over and over balloons the file size so instead we
write to a new file and move it over.
"""
temp_output_path = output_path.with_name(
output_path.stem + "_update" + output_path.suffix
)
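Note: the write-then-move idiom in ``_safe_write`` avoids in-place rewrites that balloon some file formats, and the final rename is atomic on a single filesystem. A minimal sketch of the idiom (the HDF call and key are assumptions for illustration, not the module's actual write):

from pathlib import Path
import pandas as pd

def safe_write_sketch(df: pd.DataFrame, output_path: Path) -> None:
    # Write the full dataset to a sibling temp file first...
    temp_path = output_path.with_name(output_path.stem + "_update" + output_path.suffix)
    df.to_hdf(temp_path, key="data", mode="w")  # assumed format and key
    # ...then atomically swap it into place (os.replace under the hood).
    temp_path.replace(output_path)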