From f5f281bc1ac2849bc10b9516e7f515f04c6e96af Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Mon, 23 Dec 2024 08:52:05 -0800 Subject: [PATCH 01/16] Add PWV cuts --- sotodlib/toast/workflows/sim_observe.py | 35 +++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/sotodlib/toast/workflows/sim_observe.py b/sotodlib/toast/workflows/sim_observe.py index 5ba820eb2..ae5d8b203 100644 --- a/sotodlib/toast/workflows/sim_observe.py +++ b/sotodlib/toast/workflows/sim_observe.py @@ -98,6 +98,13 @@ def setup_simulate_observing(parser, operators): help="Realization index", type=int, ) + parser.add_argument( + "--pwv_limit", + required=False, + type=float, + help="If set, discard observations with simulated PWV " + "higher than the limit [mm]", + ) operators.append( toast.ops.SimGround( @@ -230,6 +237,34 @@ def simulate_observing(job, otherargs, runargs, comm): job_ops.mem_count.prefix = "After Scan Simulation" job_ops.mem_count.apply(data) + if otherargs.pwv_limit is not None: + iobs = 0 + ngood = 0 + nbad = 0 + while iobs < len(data.obs): + pwv = data.obs[iobs].telescope.site.weather.pwv.to_value(u.mm) + if pwv <= otherargs.pwv_limit: + ngood += 1 + iobs += 1 + else: + nbad += 1 + del data.obs[iobs] + if len(data.obs) == 0: + msg = ( + f"PWV limit = {otherargs.pwv_limit} mm rejected all " + f"{nbad} observations assigned to this process" + ) + raise RuntimeError(msg) + if toast_comm.comm_group_rank is not None: + nbad = toast_comm.comm_group_rank.allreduce(nbad) + ngood = toast_comm.comm_group_rank.allreduce(ngood) + log.info_rank( + f" Discarded {nbad} / {ngood + nbad} observations " + f"with PWV > {otherargs.pwv_limit} mm in", + comm=comm, + timer=timer, + ) + # Apply LAT co-rotation if job_ops.corotate_lat.enabled: log.info_rank(" Running simulated LAT corotation...", comm=comm) From f361d7f964644cb35f519ffee4eef9aa2ed845ff Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Mon, 23 Dec 2024 10:01:04 -0800 Subject: [PATCH 02/16] Put the add_obs call 
into a function to profile it separately --- sotodlib/toast/ops/mlmapmaker.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/sotodlib/toast/ops/mlmapmaker.py b/sotodlib/toast/ops/mlmapmaker.py index 0b601e2b9..d52b743b9 100644 --- a/sotodlib/toast/ops/mlmapmaker.py +++ b/sotodlib/toast/ops/mlmapmaker.py @@ -488,6 +488,21 @@ def _wrap_obs(self, ob, dets, passinfo): return axobs + @function_timer + def _add_obs(self, mapmaker, name, axobs, nmat, signal_estimate): + """ Add data to the mapmaker + + Singled out for more granular timing + """ + mapmaker.add_obs( + name, + axobs, + deslope=self.deslope, + noise_model=nmat, + signal_estimate=signal_estimate, + ) + return + @function_timer def _init_mapmaker( self, mapmaker, signal_map, mapmaker_prev, eval_prev, comm, gcomm, prefix, @@ -749,13 +764,8 @@ def _exec(self, data, detectors=None, **kwargs): else: signal_estimate = None - mapmaker.add_obs( - ob.name, - axobs, - deslope=self.deslope, - noise_model=nmat, - signal_estimate=signal_estimate, - ) + self._add_obs(mapmaker, ob.name, axobs, nmat, signal_estimate) + del axobs del signal_estimate From 2443631ca6f5b627b5d6bc16ea225992e42312df Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Mon, 21 Apr 2025 11:47:37 -0700 Subject: [PATCH 03/16] Make FilterBin more compatible with Splits --- sotodlib/toast/ops/splits.py | 51 ++++++++++++-- sotodlib/toast/workflows/proc_mapmaker.py | 5 ++ sotodlib/toast/workflows/sim_sky.py | 81 +++++++++++++---------- 3 files changed, 99 insertions(+), 38 deletions(-) diff --git a/sotodlib/toast/ops/splits.py b/sotodlib/toast/ops/splits.py index 7b11a527a..83c806528 100644 --- a/sotodlib/toast/ops/splits.py +++ b/sotodlib/toast/ops/splits.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024-2024 Simons Observatory. +# Copyright (c) 2024-2025 Simons Observatory. # Full license can be found in the top level "LICENSE" file. 
import os @@ -442,6 +442,24 @@ class Splits(Operator): splits = List([], help="The list of named splits to apply") + write_hits = Bool(True, help="If True, write the hits map") + + write_cov = Bool(False, help="If True, write the white noise covariance matrices.") + + write_invcov = Bool( + True, + help="If True, write the inverse white noise covariance matrices.", + ) + + write_rcond = Bool(False, help="If True, write the reciprocal condition numbers.") + + write_map = Bool(True, help="If True, write the filtered/destriped map") + + write_noiseweighted_map = Bool( + False, + help="If True, write the noise-weighted map (for fast co-add)", + ) + def __init__(self, **kwargs): super().__init__(**kwargs) return @@ -588,6 +606,11 @@ def write_splits(self, data, split_name=None): msg = "No splits have been created yet, cannot write" raise RuntimeError(msg) + if hasattr(self.mapmaker, "map_binning"): + pixel_pointing = self.mapmaker.map_binning.pixel_pointing + else: + pixel_pointing = self.mapmaker.binning.pixel_pointing + if split_name is None: to_write = dict(self._split_obj) else: @@ -600,10 +623,30 @@ def write_splits(self, data, split_name=None): for spname, spl in to_write.items(): mname = f"{self.name}_{split_name}" - for prod in ["hits", "map", "invcov", "noiseweighted_map"]: - mkey = f"{mname}_{prod}" + for prod, binner_key, write in [ + ("hits", None, self.write_hits), + ("cov", None, self.write_cov), + ("invcov", None, self.write_invcov), + ("rcond", None, self.write_rcond), + ("map", "binned", self.write_map), + ("noiseweighted_map", "noiseweighted", self.write_noiseweighted_map), + ]: + if not write: + continue + if binner_key is not None: + # get the product name from BinMap + mkey = getattr(self.mapmaker.binning, binner_key) + else: + # hits and covariance are not made by BinMap. + # Try synthesizing the product name + mkey = f"{mname}_{prod}" + if mkey not in data: + msg = f"'{mkey}' not found in data. 
" + smg += f"Available keys are {data.keys()}" + raise RuntimeError(msg) fname = os.path.join( - self.output_dir, f"{self.name}_{spname}_{prod}.{fname_suffix}" + self.output_dir, + f"{self.name}_{spname}_{prod}.{fname_suffix}" ) data[mkey].write( fname, diff --git a/sotodlib/toast/workflows/proc_mapmaker.py b/sotodlib/toast/workflows/proc_mapmaker.py index d3a41f862..b01445627 100644 --- a/sotodlib/toast/workflows/proc_mapmaker.py +++ b/sotodlib/toast/workflows/proc_mapmaker.py @@ -174,6 +174,11 @@ def mapmaker_select_noise_and_binner(job, otherargs, runargs, data): if job_tmpls.baselines.enabled: job_tmpls.noise_model = noise_model + elif job_ops.filterbin.enabled: + job_ops.filterbin.binning = job_ops.binner_final + job_ops.filterbin.binning.full_pointing = otherargs.full_pointing + job_ops.filterbin.det_data = job_ops.sim_noise.det_data + job_ops.filterbin.output_dir = otherargs.out_dir @workflow_timer diff --git a/sotodlib/toast/workflows/sim_sky.py b/sotodlib/toast/workflows/sim_sky.py index 74bddd76a..363c91d83 100644 --- a/sotodlib/toast/workflows/sim_sky.py +++ b/sotodlib/toast/workflows/sim_sky.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024 Simons Observatory. +# Copyright (c) 2023-2025 Simons Observatory. # Full license can be found in the top level "LICENSE" file. """Simulated detector response to sky signal. 
""" @@ -28,6 +28,7 @@ def setup_simulate_sky_map_signal(operators): """ operators.append(toast.ops.ScanHealpixMap(name="scan_map", enabled=False)) + operators.append(toast.ops.ScanHealpixDetectorMap(name="scan_detector_map", enabled=False)) operators.append( toast.ops.StokesWeights( name="scan_map_weights", @@ -89,53 +90,65 @@ def simulate_sky_map_signal(job, otherargs, runargs, data): # Configured operators for this job job_ops = job.operators - if job_ops.scan_map.enabled and job_ops.scan_wcs_map.enabled: + scan_healpix = job_ops.scan_map.enabled or job_ops.scan_detector_map.enabled + scan_wcs = job_ops.scan_wcs_map.enabled + scan_map = scan_healpix or scan_wcs + + # Check for conflicting options + + if scan_healpix and scan_wcs: msg = "Cannot scan from both healpix and WCS maps" log.error(msg) raise RuntimeError(msg) if job_ops.det_pointing_radec_sim is not job_ops.det_pointing_radec: - if job_ops.scan_map.enabled and not job_ops.scan_map_pixels.enabled: + if scan_healpix and not job_ops.scan_map_pixels.enabled: msg = "Simulation pointing is different from data reduction pointing. " \ " You must enable scan_map_pixels" log.error(msg) raise RuntimeError(msg) - if job_ops.scan_wcs_map.enabled and not job_ops.scan_wcs_map_pixels.enabled: + if scan_wcs and not job_ops.scan_wcs_map_pixels.enabled: msg = "Simulation pointing is different from data reduction pointing. 
" \ " You must enable scan_wcs_map_pixels" log.error(msg) raise RuntimeError(msg) - if job_ops.scan_map.enabled: - if job_ops.scan_map_pixels.enabled: - # We are using a custom pointing matrix - job_ops.scan_map_pixels.detector_pointing = job_ops.det_pointing_radec_sim - job_ops.scan_map.pixel_dist = "scan_map_pixel_dist" - job_ops.scan_map.pixel_pointing = job_ops.scan_map_pixels - else: - # We are using the same pointing matrix as the mapmaking - job_ops.scan_map.pixel_dist = job_ops.binner_final.pixel_dist - job_ops.scan_map.pixel_pointing = job.pixels_final - if job_ops.scan_map_weights.enabled: - job_ops.scan_map_weights.detector_pointing = job_ops.det_pointing_radec_sim - job_ops.scan_map.stokes_weights = job_ops.scan_map_weights - else: - job_ops.scan_map.stokes_weights = job_ops.weights_radec - job_ops.scan_map.save_pointing = otherargs.full_pointing - job_ops.scan_map.apply(data) - if job_ops.scan_map_pixels.enabled: - # Clean up our custom pointing - toast.ops.Delete(detdata=[ - job_ops.scan_map_pixels.pixels, - job_ops.det_pointing_radec_sim.quats, - ]).apply(data) - if job_ops.scan_map_weights.enabled: - # Clean up our custom pointing - toast.ops.Delete(detdata=[ - job_ops.scan_map_weights.weights, - ]).apply(data) - - if job_ops.scan_wcs_map.enabled: + # Scan signal from HEALPix maps + + for scan_op in job_ops.scan_map, job_ops.scan_detector_map: + if scan_op.enabled: + if job_ops.scan_map_pixels.enabled: + # We are using a custom pointing matrix + job_ops.scan_map_pixels.detector_pointing = job_ops.det_pointing_radec_sim + scan_op.pixel_dist = "scan_map_pixel_dist" + scan_op.pixel_pointing = job_ops.scan_map_pixels + else: + # We are using the same pointing matrix as the mapmaking + scan_op.pixel_dist = job_ops.binner_final.pixel_dist + scan_op.pixel_pointing = job.pixels_final + if job_ops.scan_map_weights.enabled: + job_ops.scan_map_weights.detector_pointing = job_ops.det_pointing_radec_sim + scan_op.stokes_weights = job_ops.scan_map_weights + else: 
+ scan_op.stokes_weights = job_ops.weights_radec + scan_op.save_pointing = otherargs.full_pointing + scan_op.apply(data) + if job_ops.scan_map_pixels.enabled: + # Clean up our custom pointing + toast.ops.Delete(detdata=[ + job_ops.scan_map_pixels.pixels, + job_ops.det_pointing_radec_sim.quats, + ]).apply(data) + if job_ops.scan_map_weights.enabled: + # Clean up our custom pointing + toast.ops.Delete(detdata=[ + job_ops.scan_map_weights.weights, + ]).apply(data) + data.info() + + # Scan signal from WCS maps + + if scan_wcs: if job_ops.scan_wcs_map_pixels.enabled: # We are using a custom pointing matrix job_ops.scan_wcs_map_pixels.detector_pointing = job_ops.det_pointing_radec_sim From 7998f733b6e0d7e1b43bc402cdf82e3f702b0eec Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Tue, 22 Apr 2025 12:02:20 -0700 Subject: [PATCH 04/16] Add option to apply splits as flags rather than views. Notable speedup in SAT sims. Requires an update in TOAST --- sotodlib/toast/ops/splits.py | 49 ++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/sotodlib/toast/ops/splits.py b/sotodlib/toast/ops/splits.py index 83c806528..5dd7e2d69 100644 --- a/sotodlib/toast/ops/splits.py +++ b/sotodlib/toast/ops/splits.py @@ -12,7 +12,7 @@ from toast.timing import function_timer from toast.intervals import IntervalList from toast.observation import default_values as defaults -from toast.ops import Operator +from toast.ops import Operator, FlagIntervals from toast import qarray as qa @@ -442,16 +442,39 @@ class Splits(Operator): splits = List([], help="The list of named splits to apply") + splits_as_flags = Bool( + False, + help="If True, splits are applied through flagging rather than views. 
" + "This can be more efficient if there are only a few samples in each " + "interval.", + ) + + shared_flags = Unicode( + defaults.shared_flags, + allow_none=True, + help="Observation shared key for telescope flags to use when " + "splits_as_flags is True", + ) + + shared_flag_mask = Int( + 128, + help="Bit mask value for flagging when splits_as_flags is True", + ) + write_hits = Bool(True, help="If True, write the hits map") - write_cov = Bool(False, help="If True, write the white noise covariance matrices.") + write_cov = Bool( + False, help="If True, write the white noise covariance matrices." + ) write_invcov = Bool( True, help="If True, write the inverse white noise covariance matrices.", ) - write_rcond = Bool(False, help="If True, write the reciprocal condition numbers.") + write_rcond = Bool( + False, help="If True, write the reciprocal condition numbers." + ) write_map = Bool(True, help="If True, write the filtered/destriped map") @@ -541,6 +564,7 @@ def _exec(self, data, detectors=None, **kwargs): else: map_binner = self.mapmaker.binning pointing_view_save = map_binner.pixel_pointing.view + shared_flag_mask_save = map_binner.shared_flag_mask # If the pixel distribution does yet exit, we create it once prior to # doing any splits. 
@@ -572,8 +596,22 @@ def _exec(self, data, detectors=None, **kwargs): for ob in data.obs: spl.create_split(ob) - # Set mapmaking tools to use the current split interval list - map_binner.pixel_pointing.view = spl.split_intervals + if self.splits_as_flags: + # Split is applied through flagging, not as a view + # Flag samples outside the intervals by prefixing '~' + # to the view name + FlagIntervals( + shared_flags=self.shared_flags, + shared_flag_bytes=1, + view_mask=[ + ("~" + spl.split_intervals, np.uint8(self.shared_flag_mask)) + ], + ).apply(data) + map_binner.shared_flag_mask |= self.shared_flag_mask + else: + # Set mapmaking tools to use the current split interval list + map_binner.pixel_pointing.view = spl.split_intervals + if not map_binner.full_pointing: # We are not using full pointing and so we clear the # residual pointing for this split @@ -599,6 +637,7 @@ def _exec(self, data, detectors=None, **kwargs): for k, v in mapmaker_save_traits.items(): setattr(self.mapmaker, k, v) map_binner.pixel_pointing.view = pointing_view_save + map_binner.pixel_pointing.shared_flag_mask = shared_flag_mask_save def write_splits(self, data, split_name=None): """Write out all split products.""" From 1b50d95a4d81589e0d996fc0d848f310082013ab Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Thu, 1 May 2025 10:57:27 -0700 Subject: [PATCH 05/16] Reset flags by default --- sotodlib/toast/ops/splits.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sotodlib/toast/ops/splits.py b/sotodlib/toast/ops/splits.py index 5dd7e2d69..796207dcf 100644 --- a/sotodlib/toast/ops/splits.py +++ b/sotodlib/toast/ops/splits.py @@ -604,8 +604,9 @@ def _exec(self, data, detectors=None, **kwargs): shared_flags=self.shared_flags, shared_flag_bytes=1, view_mask=[ - ("~" + spl.split_intervals, np.uint8(self.shared_flag_mask)) + (f"~{spl.split_intervals}", np.uint8(self.shared_flag_mask)) ], + reset=True, ).apply(data) map_binner.shared_flag_mask |= self.shared_flag_mask else: 
From 43a7ac38d9b7909ae1326cda6b6eab2497f4665d Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Thu, 31 Oct 2024 10:03:47 -0700 Subject: [PATCH 06/16] Add support for sorting the schedule --- sotodlib/toast/workflows/sim_observe.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sotodlib/toast/workflows/sim_observe.py b/sotodlib/toast/workflows/sim_observe.py index ae5d8b203..e46f798d0 100644 --- a/sotodlib/toast/workflows/sim_observe.py +++ b/sotodlib/toast/workflows/sim_observe.py @@ -193,6 +193,8 @@ def simulate_observing(job, otherargs, runargs, comm): # Load the schedule file schedule = toast.schedule.GroundSchedule() schedule.read(otherargs.schedule, comm=comm) + if otherargs.sort_schedule: + schedule.sort_by_RA() log.info_rank(" Loaded schedule in", comm=comm, timer=timer) if otherargs.sort_schedule: schedule.sort_by_RA() From 73da03be04290f210bccf10c34af314e2be99572 Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Wed, 6 Nov 2024 15:05:23 -0800 Subject: [PATCH 07/16] use a consistent communicator and report the size of the focalplane --- sotodlib/toast/workflows/sim_observe.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sotodlib/toast/workflows/sim_observe.py b/sotodlib/toast/workflows/sim_observe.py index e46f798d0..b22ff29b0 100644 --- a/sotodlib/toast/workflows/sim_observe.py +++ b/sotodlib/toast/workflows/sim_observe.py @@ -193,6 +193,7 @@ def simulate_observing(job, otherargs, runargs, comm): # Load the schedule file schedule = toast.schedule.GroundSchedule() schedule.read(otherargs.schedule, comm=comm) + log.info_rank(" Loaded schedule in", comm=comm, timer=timer) if otherargs.sort_schedule: schedule.sort_by_RA() log.info_rank(" Loaded schedule in", comm=comm, timer=timer) From 62b3365fb42d9519d672af6d5805742e933e708e Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Tue, 6 May 2025 11:46:05 -0700 Subject: [PATCH 08/16] Remove debug statement --- sotodlib/toast/workflows/sim_sky.py | 1 - 1 file changed, 1 deletion(-) diff --git 
a/sotodlib/toast/workflows/sim_sky.py b/sotodlib/toast/workflows/sim_sky.py index 363c91d83..90a5a7b81 100644 --- a/sotodlib/toast/workflows/sim_sky.py +++ b/sotodlib/toast/workflows/sim_sky.py @@ -144,7 +144,6 @@ def simulate_sky_map_signal(job, otherargs, runargs, data): toast.ops.Delete(detdata=[ job_ops.scan_map_weights.weights, ]).apply(data) - data.info() # Scan signal from WCS maps From 7e1312ee92aea3fd327e5455d695f1c0fc426992 Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Tue, 6 May 2025 11:46:17 -0700 Subject: [PATCH 09/16] Report the number and size of process groups --- sotodlib/toast/workflows/scripting.py | 6 ++++++ sotodlib/toast/workflows/sim_observe.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/sotodlib/toast/workflows/scripting.py b/sotodlib/toast/workflows/scripting.py index 12cffeeaf..185bdfca1 100644 --- a/sotodlib/toast/workflows/scripting.py +++ b/sotodlib/toast/workflows/scripting.py @@ -42,6 +42,12 @@ def load_or_simulate_observing(job, otherargs, runargs, comm): else: group_size = wrk.reduction_group_size(job, runargs, comm) toast_comm = toast.Comm(world=comm, groupsize=group_size) + log.info_rank( + f" Created a TOAST communicator with {toast_comm.world_size} ranks" + f" and {toast_comm.ngroups} process groups of " + f"{toast_comm.group_size} ranks each", + comm, + ) data = toast.Data(comm=toast_comm) # Load data from all formats wrk.load_data_hdf5(job, otherargs, runargs, data) diff --git a/sotodlib/toast/workflows/sim_observe.py b/sotodlib/toast/workflows/sim_observe.py index b22ff29b0..d6f9f464e 100644 --- a/sotodlib/toast/workflows/sim_observe.py +++ b/sotodlib/toast/workflows/sim_observe.py @@ -214,6 +214,12 @@ def simulate_observing(job, otherargs, runargs, comm): # Create the toast communicator. 
toast_comm = toast.Comm(world=comm, groupsize=group_size) + log.info_rank( + f" Created a TOAST communicator with {toast_comm.world_size} ranks" + f" and {toast_comm.ngroups} process groups of " + f"{toast_comm.group_size} ranks each", + comm, + ) # The data container data = toast.Data(comm=toast_comm) From 34eac4b3403c2a4b936052c43aba195953e0b83f Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Fri, 30 May 2025 16:22:51 -0700 Subject: [PATCH 10/16] Add single detector WCS scanning --- sotodlib/toast/workflows/sim_sky.py | 62 +++++++++++++++-------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/sotodlib/toast/workflows/sim_sky.py b/sotodlib/toast/workflows/sim_sky.py index 90a5a7b81..280acf523 100644 --- a/sotodlib/toast/workflows/sim_sky.py +++ b/sotodlib/toast/workflows/sim_sky.py @@ -43,6 +43,7 @@ def setup_simulate_sky_map_signal(operators): ) ) operators.append(toast.ops.ScanWCSMap(name="scan_wcs_map", enabled=False)) + operators.append(toast.ops.ScanWCSDetectorMap(name="scan_wcs_detector_map", enabled=False)) operators.append( toast.ops.StokesWeights( name="scan_wcs_map_weights", @@ -91,7 +92,9 @@ def simulate_sky_map_signal(job, otherargs, runargs, data): job_ops = job.operators scan_healpix = job_ops.scan_map.enabled or job_ops.scan_detector_map.enabled - scan_wcs = job_ops.scan_wcs_map.enabled + scan_wcs = ( + job_ops.scan_wcs_map.enabled or job_ops.scan_wcs_detector_map.enabled + ) scan_map = scan_healpix or scan_wcs # Check for conflicting options @@ -147,34 +150,35 @@ def simulate_sky_map_signal(job, otherargs, runargs, data): # Scan signal from WCS maps - if scan_wcs: - if job_ops.scan_wcs_map_pixels.enabled: - # We are using a custom pointing matrix - job_ops.scan_wcs_map_pixels.detector_pointing = job_ops.det_pointing_radec_sim - job_ops.scan_wcs_map.pixel_dist = "scan_wcs_map_pixel_dist" - job_ops.scan_wcs_map.pixel_pointing = job_ops.scan_wcs_map_pixels - else: - # We are using the same pointing matrix as the mapmaking - 
job_ops.scan_wcs_map.pixel_dist = job_ops.binner_final.pixel_dist - job_ops.scan_wcs_map.pixel_pointing = job.pixels_final - if job_ops.scan_wcs_map_weights.enabled: - job_ops.scan_wcs_map_weights.detector_pointing = job_ops.det_pointing_radec_sim - job_ops.scan_wcs_map.stokes_weights = job_ops.scan_wcs_map_weights - else: - job_ops.scan_wcs_map.stokes_weights = job_ops.weights_radec - job_ops.scan_wcs_map.save_pointing = otherargs.full_pointing - job_ops.scan_wcs_map.apply(data) - if job_ops.scan_wcs_map_pixels.enabled: - # Clean up our custom pointing - toast.ops.Delete(detdata=[ - job_ops.scan_wcs_map_pixels.pixels, - job_ops.det_pointing_radec_sim.quats, - ]).apply(data) - if job_ops.scan_wcs_map_weights.enabled: - # Clean up our custom pointing - toast.ops.Delete(detdata=[ - job_ops.scan_wcs_map_weights.weights, - ]).apply(data) + for scan_op in job_ops.scan_wcs_map, job_ops.scan_wcs_detector_map: + if scan_op.enabled: + if job_ops.scan_wcs_map_pixels.enabled: + # We are using a custom pointing matrix + job_ops.scan_wcs_map_pixels.detector_pointing = job_ops.det_pointing_radec_sim + scan_op.pixel_dist = "scan_wcs_map_pixel_dist" + scan_op.pixel_pointing = job_ops.scan_wcs_map_pixels + else: + # We are using the same pointing matrix as the mapmaking + scan_op.pixel_dist = job_ops.binner_final.pixel_dist + scan_op.pixel_pointing = job.pixels_final + if job_ops.scan_wcs_map_weights.enabled: + job_ops.scan_wcs_map_weights.detector_pointing = job_ops.det_pointing_radec_sim + scan_op.stokes_weights = job_ops.scan_wcs_map_weights + else: + scan_op.stokes_weights = job_ops.weights_radec + scan_op.save_pointing = otherargs.full_pointing + scan_op.apply(data) + if job_ops.scan_wcs_map_pixels.enabled: + # Clean up our custom pointing + toast.ops.Delete(detdata=[ + job_ops.scan_wcs_map_pixels.pixels, + job_ops.det_pointing_radec_sim.quats, + ]).apply(data) + if job_ops.scan_wcs_map_weights.enabled: + # Clean up our custom pointing + toast.ops.Delete(detdata=[ + 
job_ops.scan_wcs_map_weights.weights, + ]).apply(data) def setup_simulate_conviqt_signal(operators): From 6c159909d7f1b630aed6ae0921961dd10f443314 Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Mon, 23 Jun 2025 08:17:20 -0700 Subject: [PATCH 11/16] Don't override full_pointing=True with otherargs.full_pointing --- sotodlib/toast/workflows/pointing.py | 6 +++--- sotodlib/toast/workflows/proc_characterize.py | 8 ++++---- sotodlib/toast/workflows/proc_flagging.py | 4 ++-- sotodlib/toast/workflows/proc_mapmaker.py | 4 ++-- sotodlib/toast/workflows/proc_mapmaker_filterbin.py | 2 +- sotodlib/toast/workflows/sim_sky.py | 4 ++-- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sotodlib/toast/workflows/pointing.py b/sotodlib/toast/workflows/pointing.py index b3c2fec3f..625c34335 100644 --- a/sotodlib/toast/workflows/pointing.py +++ b/sotodlib/toast/workflows/pointing.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024 Simons Observatory. +# Copyright (c) 2023-2025 Simons Observatory. # Full license can be found in the top level "LICENSE" file. import numpy as np @@ -167,11 +167,11 @@ def select_pointing(job, otherargs, runargs, data): if hasattr(job_ops, "binner"): job_ops.binner.pixel_pointing = job.pixels_solve job_ops.binner.stokes_weights = job.weights_solve - job_ops.binner.full_pointing = otherargs.full_pointing + job_ops.binner.full_pointing |= otherargs.full_pointing if hasattr(job_ops, "binner_final"): job_ops.binner_final.pixel_pointing = job.pixels_final job_ops.binner_final.stokes_weights = job.weights_final - job_ops.binner_final.full_pointing = otherargs.full_pointing + job_ops.binner_final.full_pointing |= otherargs.full_pointing # If we are not using a different binner for our final binning, use the # same one as the solve. 
if not job_ops.binner_final.enabled: diff --git a/sotodlib/toast/workflows/proc_characterize.py b/sotodlib/toast/workflows/proc_characterize.py index 133fd9ee0..494f5dc55 100644 --- a/sotodlib/toast/workflows/proc_characterize.py +++ b/sotodlib/toast/workflows/proc_characterize.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2023 Simons Observatory. +# Copyright (c) 2023-2025 Simons Observatory. # Full license can be found in the top level "LICENSE" file. """Timestream processing filters. """ @@ -116,7 +116,7 @@ def hn_map(job, otherargs, runargs, data): job_ops.h_n.pixel_pointing = job.pixels_final job_ops.h_n.pixel_dist = job_ops.binner_final.pixel_dist job_ops.h_n.output_dir = otherargs.out_dir - job_ops.h_n.save_pointing = otherargs.full_pointing + job_ops.h_n.save_pointing |= otherargs.full_pointing job_ops.h_n.apply(data) @@ -154,7 +154,7 @@ def cadence_map(job, otherargs, runargs, data): job_ops.cadence_map.pixel_pointing = job.pixels_final job_ops.cadence_map.pixel_dist = job_ops.binner_final.pixel_dist job_ops.cadence_map.output_dir = otherargs.out_dir - job_ops.cadence_map.save_pointing = otherargs.full_pointing + job_ops.cadence_map.save_pointing |= otherargs.full_pointing job_ops.cadence_map.apply(data) @@ -192,5 +192,5 @@ def crosslinking_map(job, otherargs, runargs, data): job_ops.crosslinking.pixel_pointing = job.pixels_final job_ops.crosslinking.pixel_dist = job_ops.binner_final.pixel_dist job_ops.crosslinking.output_dir = otherargs.out_dir - job_ops.crosslinking.save_pointing = otherargs.full_pointing + job_ops.crosslinking.save_pointing |= otherargs.full_pointing job_ops.crosslinking.apply(data) diff --git a/sotodlib/toast/workflows/proc_flagging.py b/sotodlib/toast/workflows/proc_flagging.py index 4bd683d4d..60bd055d1 100644 --- a/sotodlib/toast/workflows/proc_flagging.py +++ b/sotodlib/toast/workflows/proc_flagging.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024 Simons Observatory. +# Copyright (c) 2023-2025 Simons Observatory. 
# Full license can be found in the top level "LICENSE" file. """Timestream flagging operations. """ @@ -343,5 +343,5 @@ def processing_mask(job, otherargs, runargs, data): # We are using the same pointing matrix as the mapmaking job_ops.processing_mask.pixel_dist = job_ops.binner.pixel_dist job_ops.processing_mask.pixel_pointing = job.pixels_solve - job_ops.processing_mask.save_pointing = otherargs.full_pointing + job_ops.processing_mask.save_pointing |= otherargs.full_pointing job_ops.processing_mask.apply(data) diff --git a/sotodlib/toast/workflows/proc_mapmaker.py b/sotodlib/toast/workflows/proc_mapmaker.py index b01445627..80162bffe 100644 --- a/sotodlib/toast/workflows/proc_mapmaker.py +++ b/sotodlib/toast/workflows/proc_mapmaker.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024 Simons Observatory. +# Copyright (c) 2023-2025 Simons Observatory. # Full license can be found in the top level "LICENSE" file. """Template regression mapmaking. """ @@ -176,7 +176,7 @@ def mapmaker_select_noise_and_binner(job, otherargs, runargs, data): job_tmpls.noise_model = noise_model elif job_ops.filterbin.enabled: job_ops.filterbin.binning = job_ops.binner_final - job_ops.filterbin.binning.full_pointing = otherargs.full_pointing + job_ops.filterbin.binning.full_pointing |= otherargs.full_pointing job_ops.filterbin.det_data = job_ops.sim_noise.det_data job_ops.filterbin.output_dir = otherargs.out_dir diff --git a/sotodlib/toast/workflows/proc_mapmaker_filterbin.py b/sotodlib/toast/workflows/proc_mapmaker_filterbin.py index 31919c723..353f1aa23 100644 --- a/sotodlib/toast/workflows/proc_mapmaker_filterbin.py +++ b/sotodlib/toast/workflows/proc_mapmaker_filterbin.py @@ -47,7 +47,7 @@ def mapmaker_filterbin(job, otherargs, runargs, data): if job_ops.filterbin.enabled: job_ops.filterbin.binning = job_ops.binner_final - job_ops.filterbin.binning.full_pointing = otherargs.full_pointing + job_ops.filterbin.binning.full_pointing |= otherargs.full_pointing job_ops.filterbin.det_data = 
job_ops.sim_noise.det_data job_ops.filterbin.output_dir = otherargs.out_dir if otherargs.obsmaps: diff --git a/sotodlib/toast/workflows/sim_sky.py b/sotodlib/toast/workflows/sim_sky.py index 280acf523..15fa3303a 100644 --- a/sotodlib/toast/workflows/sim_sky.py +++ b/sotodlib/toast/workflows/sim_sky.py @@ -134,7 +134,7 @@ def simulate_sky_map_signal(job, otherargs, runargs, data): scan_op.stokes_weights = job_ops.scan_map_weights else: scan_op.stokes_weights = job_ops.weights_radec - scan_op.save_pointing = otherargs.full_pointing + scan_op.save_pointing |= otherargs.full_pointing scan_op.apply(data) if job_ops.scan_map_pixels.enabled: # Clean up our custom pointing @@ -166,7 +166,7 @@ def simulate_sky_map_signal(job, otherargs, runargs, data): scan_op.stokes_weights = job_ops.scan_wcs_map_weights else: scan_op.stokes_weights = job_ops.weights_radec - scan_op.save_pointing = otherargs.full_pointing + scan_op.save_pointing |= otherargs.full_pointing scan_op.apply(data) if job_ops.scan_wcs_map_pixels.enabled: # Clean up our custom pointing From 518527a83caf88430fb7dcf39ebca1a7eb078567 Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Wed, 25 Jun 2025 02:41:18 -0700 Subject: [PATCH 12/16] Propagate stokes weight mode to demodulated weights --- sotodlib/toast/workflows/proc_demodulation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sotodlib/toast/workflows/proc_demodulation.py b/sotodlib/toast/workflows/proc_demodulation.py index b581311b5..94ed87136 100644 --- a/sotodlib/toast/workflows/proc_demodulation.py +++ b/sotodlib/toast/workflows/proc_demodulation.py @@ -91,7 +91,9 @@ def demodulate(job, otherargs, runargs, data): comm=data.comm.comm_world, timer=timer, ) - demod_weights = toast.ops.StokesWeightsDemod() + demod_weights = toast.ops.StokesWeightsDemod( + mode=job_ops.weights_radec.mode + ) job_ops.weights_radec = demod_weights if hasattr(job_ops, "binner"): job_ops.binner.stokes_weights = demod_weights From 
656923c686080802713e521c584df2aea885d635 Mon Sep 17 00:00:00 2001 From: keskitalo Date: Thu, 26 Jun 2025 09:33:01 +0300 Subject: [PATCH 13/16] Do not attempt to manipulate a non-existent trait --- sotodlib/toast/workflows/sim_sky.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sotodlib/toast/workflows/sim_sky.py b/sotodlib/toast/workflows/sim_sky.py index 15fa3303a..1693b47bc 100644 --- a/sotodlib/toast/workflows/sim_sky.py +++ b/sotodlib/toast/workflows/sim_sky.py @@ -166,7 +166,8 @@ def simulate_sky_map_signal(job, otherargs, runargs, data): scan_op.stokes_weights = job_ops.scan_wcs_map_weights else: scan_op.stokes_weights = job_ops.weights_radec - scan_op.save_pointing |= otherargs.full_pointing + if hasattr(scan_op, "save_pointing"): + scan_op.save_pointing |= otherargs.full_pointing scan_op.apply(data) if job_ops.scan_wcs_map_pixels.enabled: # Clean up our custom pointing From 176633ff385596a39b722839e3ca18ec265a847c Mon Sep 17 00:00:00 2001 From: Reijo Keskitalo Date: Thu, 26 Jun 2025 07:45:47 -0700 Subject: [PATCH 14/16] Skip over existing splits (when restarting a failed job) --- sotodlib/toast/ops/splits.py | 132 +++++++++++++----- sotodlib/toast/workflows/proc_demodulation.py | 2 +- 2 files changed, 99 insertions(+), 35 deletions(-) diff --git a/sotodlib/toast/ops/splits.py b/sotodlib/toast/ops/splits.py index 796207dcf..1161994b4 100644 --- a/sotodlib/toast/ops/splits.py +++ b/sotodlib/toast/ops/splits.py @@ -449,6 +449,12 @@ class Splits(Operator): "interval.", ) + skip_existing = Bool( + False, + help="If the mapmaker reports that all requested output files exist " + "for a split, skip the mapmaking step." 
+ ) + shared_flags = Unicode( defaults.shared_flags, allow_none=True, @@ -586,49 +592,57 @@ def _exec(self, data, detectors=None, **kwargs): # Loop over splits for split_name, spl in self._split_obj.items(): - log.info_rank(f"Running Split '{split_name}'", comm=data.comm.comm_world) # Set mapmaker name based on split and the name of this # Splits operator. mname = f"{self.name}_{split_name}" self.mapmaker.name = mname + log.info_rank( + f"Running Split '{split_name}'", comm=data.comm.comm_world + ) # Apply this split for ob in data.obs: spl.create_split(ob) - if self.splits_as_flags: - # Split is applied through flagging, not as a view - # Flag samples outside the intervals by prefixing '~' - # to the view name - FlagIntervals( - shared_flags=self.shared_flags, - shared_flag_bytes=1, - view_mask=[ - (f"~{spl.split_intervals}", np.uint8(self.shared_flag_mask)) - ], - reset=True, - ).apply(data) - map_binner.shared_flag_mask |= self.shared_flag_mask + if self.skip_existing and self.splits_exist(data, split_name=split_name): + log.info_rank( + f"All outputs for split '{split_name}' exist, skipping...", + comm=data.comm.comm_world, + ) else: - # Set mapmaking tools to use the current split interval list - map_binner.pixel_pointing.view = spl.split_intervals - - if not map_binner.full_pointing: - # We are not using full pointing and so we clear the - # residual pointing for this split - toast.ops.Delete( - detdata=[ - map_binner.pixel_pointing.pixels, - map_binner.stokes_weights.weights, - map_binner.pixel_pointing.detector_pointing.quats, - ], - ).apply(data) - - # Run mapmaking - self.mapmaker.apply(data) - - # Write - self.write_splits(data, split_name=split_name) + if self.splits_as_flags: + # Split is applied through flagging, not as a view + # Flag samples outside the intervals by prefixing '~' + # to the view name + FlagIntervals( + shared_flags=self.shared_flags, + shared_flag_bytes=1, + view_mask=[ + (f"~{spl.split_intervals}", np.uint8(self.shared_flag_mask)) 
+ ], + reset=True, + ).apply(data) + map_binner.shared_flag_mask |= self.shared_flag_mask + else: + # Set mapmaking tools to use the current split interval list + map_binner.pixel_pointing.view = spl.split_intervals + + if not map_binner.full_pointing: + # We are not using full pointing and so we clear the + # residual pointing for this split + toast.ops.Delete( + detdata=[ + map_binner.pixel_pointing.pixels, + map_binner.stokes_weights.weights, + map_binner.pixel_pointing.detector_pointing.quats, + ], + ).apply(data) + + # Run mapmaking + self.mapmaker.apply(data) + + # Write + self.write_splits(data, split_name=split_name) # Remove split for ob in data.obs: @@ -640,6 +654,56 @@ def _exec(self, data, detectors=None, **kwargs): map_binner.pixel_pointing.view = pointing_view_save map_binner.pixel_pointing.shared_flag_mask = shared_flag_mask_save + def splits_exist(self, data, split_name=None): + """Check whether all requested split products already exist on disk.""" + if not hasattr(self, "_split_obj"): + msg = "No splits have been created yet, cannot check existence" + raise RuntimeError(msg) + + if hasattr(self.mapmaker, "map_binning"): + pixel_pointing = self.mapmaker.map_binning.pixel_pointing + else: + pixel_pointing = self.mapmaker.binning.pixel_pointing + + if split_name is None: + to_write = dict(self._split_obj) + else: + to_write = {split_name: self._split_obj[split_name]} + + if self.mapmaker.write_hdf5: + fname_suffix = "h5" + else: + fname_suffix = "fits" + + all_exist = True + for spname, spl in to_write.items(): + mname = f"{self.name}_{split_name}" + for prod, binner_key, write in [ + ("hits", None, self.write_hits), + ("cov", None, self.write_cov), + ("invcov", None, self.write_invcov), + ("rcond", None, self.write_rcond), + ("map", "binned", self.write_map), + ("noiseweighted_map", "noiseweighted", self.write_noiseweighted_map), + ]: + if not write: + continue + if binner_key is not None: + # get the product name from BinMap + mkey = getattr(self.mapmaker.binning, binner_key) + else:
+ # hits and covariance are not made by BinMap. + # Try synthesizing the product name + mkey = f"{mname}_{prod}" + fname = os.path.join( + self.output_dir, + f"{self.name}_{spname}_{prod}.{fname_suffix}" + ) + if not os.path.isfile(fname): + all_exist = False + break + return all_exist + def write_splits(self, data, split_name=None): """Write out all split products.""" if not hasattr(self, "_split_obj"): @@ -682,7 +746,7 @@ def write_splits(self, data, split_name=None): mkey = f"{mname}_{prod}" if mkey not in data: msg = f"'{mkey}' not found in data. " - smg += f"Available keys are {data.keys()}" + msg += f"Available keys are {data.keys()}" raise RuntimeError(msg) fname = os.path.join( self.output_dir, diff --git a/sotodlib/toast/workflows/proc_demodulation.py b/sotodlib/toast/workflows/proc_demodulation.py index 94ed87136..64b9f8946 100644 --- a/sotodlib/toast/workflows/proc_demodulation.py +++ b/sotodlib/toast/workflows/proc_demodulation.py @@ -92,7 +92,7 @@ def demodulate(job, otherargs, runargs, data): timer=timer, ) demod_weights = toast.ops.StokesWeightsDemod( - mode=job_ops.weights_radec.mode + mode=job_ops.demodulate.mode ) job_ops.weights_radec = demod_weights if hasattr(job_ops, "binner"): From 9c33aaa5ddf296c31d75afa398ae71bcc94ce062 Mon Sep 17 00:00:00 2001 From: keskitalo Date: Wed, 17 Dec 2025 10:19:26 -0800 Subject: [PATCH 15/16] Make sure shared flags do not get altered --- sotodlib/toast/ops/splits.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/sotodlib/toast/ops/splits.py b/sotodlib/toast/ops/splits.py index 1161994b4..80e218e57 100644 --- a/sotodlib/toast/ops/splits.py +++ b/sotodlib/toast/ops/splits.py @@ -590,6 +590,17 @@ def _exec(self, data, detectors=None, **kwargs): if hasattr(self.mapmaker, trt): setattr(self.mapmaker, trt, True) + # Optionally make copies of the shared flags + if self.splits_as_flags: + save_shared_flags = {} + for ob in data.obs: + if ob.comm_col_rank == 0: + save_shared_flags[ob.name] = 
ob.shared[self.shared_flags].data + else: + save_shared_flags[ob.name] = None + else: + save_shared_flags = None + # Loop over splits for split_name, spl in self._split_obj.items(): # Set mapmaker name based on split and the name of this @@ -654,6 +665,15 @@ def _exec(self, data, detectors=None, **kwargs): map_binner.pixel_pointing.view = pointing_view_save map_binner.pixel_pointing.shared_flag_mask = shared_flag_mask_save + # Optionally restore shared flags + if save_shared_flags is not None: + for ob in data.obs: + ob.shared[self.shared_flags].set( + save_shared_flags[ob.name], offset=(0,), fromrank=0 + ) + + return + def splits_exist(self, data, split_name=None): """Write out all split products.""" if not hasattr(self, "_split_obj"): From 09e374b354305a3525f2ea379be4878e5d28c974 Mon Sep 17 00:00:00 2001 From: keskitalo Date: Wed, 17 Dec 2025 10:19:38 -0800 Subject: [PATCH 16/16] Add unit test for splits_as_flags --- tests/test_toast_splits.py | 48 ++++++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/tests/test_toast_splits.py b/tests/test_toast_splits.py index e469e6870..b48499f6a 100644 --- a/tests/test_toast_splits.py +++ b/tests/test_toast_splits.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024 Simons Observatory. +# Copyright (c) 2024-2025 Simons Observatory. # Full license can be found in the top level "LICENSE" file. """Check functionality of Splits. 
@@ -7,6 +7,7 @@ import unittest import astropy.units as u +import healpy as hp import numpy as np try: @@ -105,23 +106,50 @@ def test_split_maps(self): ) # Test that we can instantiate all the splits + split_names = [ + "all", + "left_going", + "right_going", + "outer_detectors", + "inner_detectors", + "polA", + "polB", + ] splits = so_ops.Splits( name="splits", - splits=[ - "all", - "left_going", - "right_going", - "outer_detectors", - "inner_detectors", - "polA", - "polB", - ], + splits=split_names, mapmaker=mapmaker, output_dir=self.outdir, ) splits.apply(data) + # Test that we get the same result using flags instead of intervals + splits2 = so_ops.Splits( + name="splits_w_flags", + splits=split_names, + mapmaker=mapmaker, + output_dir=self.outdir, + splits_as_flags=True, + ) + + splits2.apply(data) + + for split_name in split_names: + fname1 = os.path.join( + self.outdir, + f"{splits.name}_{split_name}_map.fits", + ) + fname2 = os.path.join( + self.outdir, + f"{splits2.name}_{split_name}_map.fits", + ) + m1 = hp.read_map(fname1, None) + m2 = hp.read_map(fname2, None) + assert np.all((m1 == 0) == (m2 == 0)) + mask = m1 != 0 + assert np.allclose(m1[mask], m2[mask]) + close_data_and_comm(data)