38 changes: 25 additions & 13 deletions pipeline/bundling/bundle_maps.py
@@ -9,7 +9,7 @@
import itertools

import matplotlib.pyplot as plt

from procs_pool import get_exec_env

def car2healpix(norm_hits_map):
"""
@@ -19,7 +19,7 @@ def car2healpix(norm_hits_map):
return reproject.map2healpix(norm_hits_map, spin=[0])


def main(args):
def main(args, parallelizor=None):
"""
"""
args = args.copy() # Make sure we don't accidentally modify the input args
@@ -92,7 +92,7 @@ def main(args):
null_prop_val=split_inter_obs,
map_dir=args.map_dir,
abscal=args.abscal,
nproc=args.nproc
parallelizor=parallelizor
)

# Map naming convention
@@ -188,14 +188,8 @@ def main(args):
plt.close()


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Make bundled maps")
parser.add_argument(
"--config_file", type=str, help="yaml file with configuration."
)

args = parser.parse_args()
config = bundling_utils.Cfg.from_yaml(args.config_file)
def _main(config_file, parallelizor):
config = bundling_utils.Cfg.from_yaml(config_file)
its = [np.atleast_1d(x) for x in [config.freq_channel, config.wafer]]
patch_list = config.patch

@@ -230,7 +224,7 @@ def main(args):
print(null_prop_val)
config2.null_prop_val_inter_obs = null_prop_val
try:
main(config2)
main(config2, parallelizor)
except ValueError as e:
print(e)

@@ -243,7 +237,7 @@ def main(args):
print(split_val)
config2.split_label_intra_obs = split_val
try:
main(config2)
main(config2, parallelizor)
except ValueError as e:
print(e)

@@ -287,3 +281,21 @@ def main(args):
plot = enplot.plot(coadd_map*1e6, colorbar=True, color='gray', range="100:20:20", ticks=10, downgrade=2, autocrop=True)
enplot.write(savename.replace("{}.fits", "mapQ.png"), plot[1])
enplot.write(savename.replace("{}.fits", "mapU.png"), plot[2])

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Make bundled maps")
parser.add_argument(
"--config_file", type=str, help="yaml file with configuration."
)
parser.add_argument(
"--nproc", type=int, default=1, help="Number of parallel processes for concurrent futures."
)

args = parser.parse_args()
rank, executor, as_completed_callable = get_exec_env(args.nproc)
if rank == 0:
try:
nproc = executor.num_workers
except AttributeError:
nproc = executor._max_workers
_main(args.config_file, (executor, as_completed_callable, nproc))
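
A side note on the worker-count lookup in the new entry point: the mpi4py executor returned by get_exec_env exposes num_workers (as relied on above), while ProcessPoolExecutor only carries the private _max_workers, hence the try/except AttributeError. A small, hypothetical helper (not part of this PR) that factors out the same fallback might look like this:

def get_num_workers(executor, default=1):
    """Best-effort worker count for either executor type: try the public
    mpi4py-style attribute first, then ProcessPoolExecutor's private one,
    and fall back to `default` if neither is usable."""
    for attr in ("num_workers", "_max_workers"):
        n = getattr(executor, attr, None)
        if n:  # skips both None and 0
            return n
    return default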
44 changes: 21 additions & 23 deletions pipeline/bundling/bundling_utils.py
@@ -3,7 +3,7 @@
import healpy as hp
from astropy.io import fits
import h5py
from concurrent.futures import ProcessPoolExecutor, as_completed

from copy import deepcopy

from typing import Optional
@@ -169,7 +169,7 @@ def _get_map_template_hp(template_map=None, nside=512, dtype=np.float64):

def coadd_maps(maps_list, weights_list, hits_list=None, sign_list=None,
pix_type="hp", res_car=5., car_template_map=None,
dec_cut_car=None, fields_hp=None, abscal=1, nproc=1):
dec_cut_car=None, fields_hp=None, abscal=1, parallelizor=None):
"""
Coadd a list of weighted maps, a list of map weights, and
(optionally) a list of hits maps corresponding to a set of atomics.
@@ -202,8 +202,8 @@ def coadd_maps(maps_list, weights_list, hits_list=None, sign_list=None,
abscal: array
Multiplicative factor for each map. Output maps will be multiplied
by this number; the weights map will get abscal**-2
nproc: int
Number of parallel processes to use. 1 for serial.
parallelizor: tuple
(MPICommExecutor or ProcessPoolExecutor, as_completed_callable, num_workers)

Returns
-------
@@ -222,7 +222,7 @@ def coadd_maps(maps_list, weights_list, hits_list=None, sign_list=None,
if isinstance(abscal, list):
abscal = np.array(abscal)

sum_fn = _make_parallel_proc(sum_maps, nproc) if nproc > 1 else sum_maps
sum_fn = _make_parallel_proc(sum_maps, parallelizor) if parallelizor is not None else sum_maps

if pix_type == "car":
template = _get_map_template_car(car_template_map, res_car,
@@ -400,37 +400,38 @@ def _add_map(imap, omap, pix_type):
op=np.ndarray.__iadd__)


def _make_parallel_proc(fn, nproc_default):
def _make_parallel_proc(fn, parallelizor):
"""Parallelize a coaddition function using ProcessPoolExecutor.

Parameters
----------
fn: function
coaddition function with input params (list of filenames, template map)
and return: coadded map
nproc_default: int
Default number of processes to use
parallelizor: tuple
(MPICommExecutor or ProcessPoolExecutor, as_completed_callable, num_workers)

Returns
-------
fn: function
Parallelized coaddition function with same input params, plus optional
nproc=nproc_default
nproc=num_workers from parallelizor
"""
def parallel_fn(filenames, template, *args, nproc=nproc_default, **kwargs):
exe, as_completed, nproc = parallelizor
def parallel_fn(filenames, template, *args, nproc=nproc, **kwargs):
ibin = int(np.ceil(len(filenames)/nproc))
slices = [slice(iproc*ibin, (iproc+1)*ibin) for iproc in range(nproc)]
out = None
with ProcessPoolExecutor(nproc) as exe:
futures = [exe.submit(fn, filenames, template, *args,
islice=slices[iproc], **kwargs)
for iproc in range(nproc)]
for future in as_completed(futures):
if out is None:
out = future.result()
else:
out += future.result()
futures.remove(future)

futures = [exe.submit(fn, filenames, template, *args,
islice=slices[iproc], **kwargs)
for iproc in range(nproc)]
for future in as_completed(futures):
if out is None:
out = future.result()
else:
out += future.result()
futures.remove(future)

return out
return parallel_fn
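
To make the new calling convention concrete, here is a minimal, self-contained sketch (toy data and a toy reducer, no dependency on the repository) of how a (executor, as_completed_callable, num_workers) tuple is consumed: the work list is cut into num_workers slices, one task is submitted per slice, and partial results are accumulated as they complete, mirroring parallel_fn above.

from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np


def partial_sum(values, template, islice=slice(None)):
    """Toy stand-in for a coaddition function: accumulate one slice of values."""
    out = template.copy()
    out += np.sum(values[islice])
    return out


if __name__ == "__main__":
    nproc = 4
    with ProcessPoolExecutor(max_workers=nproc) as executor:
        parallelizor = (executor, as_completed, nproc)

        exe, as_completed_callable, nworkers = parallelizor
        values = np.arange(100)
        template = np.zeros(1)

        # Same slicing scheme as parallel_fn: ceil(len/nworkers) items per task.
        ibin = int(np.ceil(len(values) / nworkers))
        slices = [slice(i * ibin, (i + 1) * ibin) for i in range(nworkers)]

        futures = [exe.submit(partial_sum, values, template, islice=s)
                   for s in slices]
        out = None
        for future in as_completed_callable(futures):
            out = future.result() if out is None else out + future.result()
        print(out)  # [4950.]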
@@ -587,8 +588,6 @@ class Cfg:
Wafer label, e.g. 'ws0'. May be a list of strings.
save_fnames: bool
Save the atomic map filenames for each bundle
nproc: int
Number of parallel processes to use in coadd
atomic_list: str
Path to npy file of atomic map names to restrict the atomic db
abscal: bool
@@ -623,7 +622,6 @@ class Cfg:
car_map_template: Optional[str] = None
wafer: Optional[str] = None
save_fnames: bool = False
nproc: int = 1
atomic_list: Optional[str] = None
abscal: bool = False
tel: Optional[str] = None
8 changes: 4 additions & 4 deletions pipeline/bundling/coadder.py
@@ -324,7 +324,7 @@ def extract_ws_freq(self, input_str):
return ws, freq

def bundle(self, bundle_id, split_label=None, null_prop_val=None,
map_dir=None, abscal=False, nproc=1):
map_dir=None, abscal=False, parallelizor=None):
"""
Make a map bundle given a bundle ID and, optionally, null properties.

@@ -341,8 +341,8 @@ def bundle(self, bundle_id, split_label=None, null_prop_val=None,
belong to.
abscal: bool
Apply saved abscal factors if True.
nproc: int
Number of parallel processes to use. 1 for serial.
parallelizor: tuple
(MPICommExecutor or ProcessPoolExecutor, as_completed_callable, num_workers)

Returns
-------
@@ -367,7 +367,7 @@ def bundle(self, bundle_id, split_label=None, null_prop_val=None,

signal, weights, hits = coadd_maps(
fnames, weights_list, hits_list, pix_type=self.pix_type,
car_template_map=self.car_map_template, abscal=abfac, nproc=nproc
car_template_map=self.car_map_template, abscal=abfac, parallelizor=parallelizor
)

return signal, weights, hits, fnames
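
For context, a hedged usage sketch of the updated bundle() signature; coadder here stands for whatever bundler instance the pipeline constructs elsewhere, and only the parallelizor keyword is the point:

from concurrent.futures import ProcessPoolExecutor, as_completed


def bundle_in_parallel(coadder, bundle_id, nproc=8):
    """Coadd one bundle with a process-pool-backed parallelizor tuple."""
    with ProcessPoolExecutor(max_workers=nproc) as executor:
        return coadder.bundle(
            bundle_id,
            split_label=None,
            null_prop_val=None,
            abscal=False,
            parallelizor=(executor, as_completed, nproc),
        )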
142 changes: 142 additions & 0 deletions pipeline/bundling/procs_pool.py
@@ -0,0 +1,142 @@
from __future__ import annotations
from typing import Tuple, Union, List, Callable, Optional


def _get_mpi_comm() -> (
Tuple[bool, int, Optional["MPICommExecutor"], Optional[Callable]]
):
"""This private function tries to create an MPICommExecutor object and returns it if successful.

Returns
-------
bool
Whether an MPICommExecutor object was created successfully. This is important as only rank 0 creates the executor and all other ranks return None as an executor.
int
The global rank of the master process.
Optional["MPICommExecutor"]
If the executor was created successfully, this is the MPICommExecutor object. Otherwise, it is None.
Optional[Callable]
If the executor was created successfully, this is the as_completed function. Otherwise, it is None.
"""
try:
from mpi4py.futures import MPICommExecutor
from mpi4py.futures import as_completed
from mpi4py import MPI

comm = MPI.COMM_WORLD

rank = comm.Get_rank()
max_workers = comm.Get_size() - 1

return (
True,
rank,
MPICommExecutor(comm, root=0, max_workers=max_workers).__enter__(),
as_completed,
)
except (NameError, ImportError, ValueError):
return False, 0, None, None


def _get_concurrent_comm(
nprocs: Optional[int] = None,
) -> Tuple[bool, int, "ProcessPoolExecutor", Callable]:
    """Try to create a ProcessPoolExecutor and return it.

    Returns
    -------
    bool
        Whether a ProcessPoolExecutor object was created successfully
        (always True for this backend).
    int
        The rank of the calling process, which is always 0 here and therefore
        acts as the master process.
    ProcessPoolExecutor
        The ProcessPoolExecutor object.
    Callable
        The concurrent.futures as_completed function.
"""

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
from multiprocessing import set_start_method

set_start_method("spawn")
return True, 0, ProcessPoolExecutor(max_workers=nprocs), as_completed


def get_exec_env(
    nprocs: Optional[int] = None, priority: List[str] = ["mpi", "process_pool"]
) -> Tuple[int, Union["MPICommExecutor", "ProcessPoolExecutor"], Callable]:
"""This function sets up the execution environment for parallel processing based on the specified priority list.

    Since this function returns a rank value, the main function needs to go under an
>>> if rank == 0

block to ensure that the code is only run by the master process.

    In addition, the main function needs to take the executor and as_completed_callable as arguments.

    Let's assume that the main function is called main_func and its current state is as follows:
>>> def main_func():
>>> with ProcessPoolExecutor() as executor:
>>> futures = []
>>> for i in range(10):
>>> futures.append(executor.submit(some_func, i))
>>> for future in as_completed(futures):
>>> result = future.result()
>>> print(result)
>>> if __name__ == "__main__":
>>> main_func()


    Utilizing different executors requires changing the main function as follows:
>>> def main_func(executor, as_completed_callable):
>>> futures = []
>>> for i in range(10):
>>> futures.append(executor.submit(some_func, i))
>>> for future in as_completed_callable(futures):
>>> result = future.result()
>>> print(result)
>>> if __name__ == "__main__":
>>> rank, executor, as_completed_callable = get_exec_env(nprocs=4)
>>> if rank == 0:
>>> main_func(executor, as_completed_callable)


Parameters
----------
    nprocs : int, optional
        The number of worker processes for the process pool executor. If None,
        ProcessPoolExecutor falls back to its own default (the number of CPUs).
        Ignored by the MPI backend, where the communicator size sets the
        number of workers.
    priority : List[str], optional
        Ordered list of execution backends to try, from highest to lowest
        priority. Defaults to ["mpi", "process_pool"].

Returns
-------
    int
        The rank of the calling process. The master process, which receives
        the executor, has rank 0.

    Union["MPICommExecutor", "ProcessPoolExecutor"]
        The executor used to run the code in parallel. This is either an
        MPICommExecutor or a ProcessPoolExecutor, depending on the execution
        environment.

    Callable
        The as_completed function used to collect the results of the parallel
        execution, matching the type of the executor.

Raises
------
ValueError
When a valid executor is not available based on the execution priority list.
"""
    user_priority = list(priority)
    executor_created = False
    while not executor_created:
        if not user_priority:
            raise ValueError("No executor available")
        executor_mode = user_priority.pop(0)
        if executor_mode == "mpi":
            executor_created, rank, executor, as_completed_callable = (
                _get_mpi_comm()
            )
        elif executor_mode == "process_pool":
            executor_created, rank, executor, as_completed_callable = (
                _get_concurrent_comm(nprocs)
            )
        else:
            raise ValueError(f"Unknown executor mode: {executor_mode}")

return rank, executor, as_completed_callable
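
Finally, a short sketch of how a driver script is expected to use this module, mirroring the docstring above and the bundle_maps.py entry point. Under MPI it would be launched with something like mpiexec -n 5 python driver.py (one master rank plus four workers, matching max_workers = comm.Get_size() - 1); if mpi4py is unavailable, get_exec_env falls back to a ProcessPoolExecutor with nprocs workers. The file name driver.py and the toy square function are illustrative only.

# driver.py -- assumed to sit next to procs_pool.py
from procs_pool import get_exec_env


def square(x):
    return x * x


def main(executor, as_completed_callable):
    futures = [executor.submit(square, i) for i in range(10)]
    for future in as_completed_callable(futures):
        print(future.result())


if __name__ == "__main__":
    rank, executor, as_completed_callable = get_exec_env(nprocs=4)
    if rank == 0:
        # Only the master process holds a valid executor; other ranks get None.
        main(executor, as_completed_callable)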