24 changes: 17 additions & 7 deletions sotodlib/toast/ops/mlmapmaker.py
@@ -488,6 +488,21 @@ def _wrap_obs(self, ob, dets, passinfo):

return axobs

@function_timer
def _add_obs(self, mapmaker, name, axobs, nmat, signal_estimate):
""" Add data to the mapmaker

Singled out for more granular timing
"""
mapmaker.add_obs(
name,
axobs,
deslope=self.deslope,
noise_model=nmat,
signal_estimate=signal_estimate,
)
return

@function_timer
def _init_mapmaker(
self, mapmaker, signal_map, mapmaker_prev, eval_prev, comm, gcomm, prefix,
@@ -749,13 +764,8 @@ def _exec(self, data, detectors=None, **kwargs):
else:
signal_estimate = None

mapmaker.add_obs(
ob.name,
axobs,
deslope=self.deslope,
noise_model=nmat,
signal_estimate=signal_estimate,
)
self._add_obs(mapmaker, ob.name, axobs, nmat, signal_estimate)

del axobs
del signal_estimate

215 changes: 191 additions & 24 deletions sotodlib/toast/ops/splits.py
@@ -1,4 +1,4 @@
# Copyright (c) 2024-2024 Simons Observatory.
# Copyright (c) 2024-2025 Simons Observatory.
# Full license can be found in the top level "LICENSE" file.

import os
@@ -12,7 +12,7 @@
from toast.timing import function_timer
from toast.intervals import IntervalList
from toast.observation import default_values as defaults
from toast.ops import Operator
from toast.ops import Operator, FlagIntervals
from toast import qarray as qa


@@ -442,6 +442,53 @@ class Splits(Operator):

splits = List([], help="The list of named splits to apply")

splits_as_flags = Bool(
False,
help="If True, splits are applied through flagging rather than views. "
"This can be more efficient if there are only a few samples in each "
"interval.",
)

skip_existing = Bool(
False,
help="If the mapmaker reports that all requested output files exist "
"for a split, skip the mapmaking step."
)

shared_flags = Unicode(
defaults.shared_flags,
allow_none=True,
help="Observation shared key for telescope flags to use when "
"splits_as_flags is True",
)

shared_flag_mask = Int(
128,
help="Bit mask value for flagging when splits_as_flags is True",
)

write_hits = Bool(True, help="If True, write the hits map")

write_cov = Bool(
False, help="If True, write the white noise covariance matrices."
)

write_invcov = Bool(
True,
help="If True, write the inverse white noise covariance matrices.",
)

write_rcond = Bool(
False, help="If True, write the reciprocal condition numbers."
)

write_map = Bool(True, help="If True, write the filtered/destriped map")

write_noiseweighted_map = Bool(
False,
help="If True, write the noise-weighted map (for fast co-add)",
)

def __init__(self, **kwargs):
super().__init__(**kwargs)
return
@@ -523,6 +570,7 @@ def _exec(self, data, detectors=None, **kwargs):
else:
map_binner = self.mapmaker.binning
pointing_view_save = map_binner.pixel_pointing.view
shared_flag_mask_save = map_binner.shared_flag_mask

# If the pixel distribution does not yet exist, we create it once prior to
# doing any splits.
@@ -542,36 +590,70 @@
if hasattr(self.mapmaker, trt):
setattr(self.mapmaker, trt, True)

# Optionally make copies of the shared flags
if self.splits_as_flags:
save_shared_flags = {}
for ob in data.obs:
if ob.comm_col_rank == 0:
save_shared_flags[ob.name] = ob.shared[self.shared_flags].data
else:
save_shared_flags[ob.name] = None
else:
save_shared_flags = None

# Loop over splits
for split_name, spl in self._split_obj.items():
log.info_rank(f"Running Split '{split_name}'", comm=data.comm.comm_world)
# Set mapmaker name based on split and the name of this
# Splits operator.
mname = f"{self.name}_{split_name}"
self.mapmaker.name = mname
log.info_rank(
f"Running Split '{split_name}'", comm=data.comm.comm_world
)

# Apply this split
for ob in data.obs:
spl.create_split(ob)

# Set mapmaking tools to use the current split interval list
map_binner.pixel_pointing.view = spl.split_intervals
if not map_binner.full_pointing:
# We are not using full pointing and so we clear the
# residual pointing for this split
toast.ops.Delete(
detdata=[
map_binner.pixel_pointing.pixels,
map_binner.stokes_weights.weights,
map_binner.pixel_pointing.detector_pointing.quats,
],
).apply(data)

# Run mapmaking
self.mapmaker.apply(data)

# Write
self.write_splits(data, split_name=split_name)
if self.skip_existing and self.splits_exist(data, split_name=split_name):
log.info_rank(
f"All outputs for split '{split_name}' exist, skipping...",
comm=data.comm.comm_world,
)
else:
if self.splits_as_flags:
# Split is applied through flagging, not as a view
# Flag samples outside the intervals by prefixing '~'
# to the view name
FlagIntervals(
shared_flags=self.shared_flags,
shared_flag_bytes=1,
view_mask=[
(f"~{spl.split_intervals}", np.uint8(self.shared_flag_mask))
],
reset=True,
).apply(data)
map_binner.shared_flag_mask |= self.shared_flag_mask
Comment on lines +628 to +636

Member:

Inside the Split base class, when a split is created an interval list is created and the per-detector flags are saved. When the split is removed, that interval list is purged and the per-detector flags are restored.

If you are applying the FlagIntervals operator, that will modify the shared flags and leave them in a different state at the end. I think in order to do this cleanly, you would need to save the shared_flag values, overwrite them here for each split, and then restore them when each split is removed.

Member Author:

Good point. Just modifying a user-specified flag bit is not enough. I added saving and restoring of the shared flags. I also expanded the existing unit test to confirm that intervals and flags produce the same output maps.
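
A minimal, standalone sketch of that save / overwrite / restore pattern. The names obs_0/obs_1, the sample range 3..7, and the array length are placeholders for illustration only; the real operator works on ob.shared[self.shared_flags] via FlagIntervals, as shown in the diff below, and 128 mirrors the shared_flag_mask default.

import numpy as np

# Hypothetical stand-ins for the per-observation shared flag arrays.
obs_flags = {
    "obs_0": np.zeros(10, dtype=np.uint8),
    "obs_1": np.zeros(10, dtype=np.uint8),
}
SPLIT_MASK = np.uint8(128)

# 1. Save the original shared flags once, before any split is applied.
saved = {name: flags.copy() for name, flags in obs_flags.items()}

# 2. For each split, flag every sample *outside* the split intervals
#    (the PR does this with FlagIntervals and a "~view" mask, reset=True).
for flags in obs_flags.values():
    outside = np.ones(flags.shape, dtype=bool)
    outside[3:7] = False  # pretend samples 3..6 fall inside this split
    flags[outside] |= SPLIT_MASK

# ... the mapmaker would run here, with SPLIT_MASK folded into its
#     shared_flag_mask ...

# 3. Restore the saved flags so the next split (and any later operator)
#    sees the observations in their original state.
for name, flags in obs_flags.items():
    flags[:] = saved[name]

assert all(np.array_equal(obs_flags[n], saved[n]) for n in obs_flags)
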

else:
# Set mapmaking tools to use the current split interval list
map_binner.pixel_pointing.view = spl.split_intervals

if not map_binner.full_pointing:
# We are not using full pointing and so we clear the
# residual pointing for this split
toast.ops.Delete(
detdata=[
map_binner.pixel_pointing.pixels,
map_binner.stokes_weights.weights,
map_binner.pixel_pointing.detector_pointing.quats,
],
).apply(data)

# Run mapmaking
self.mapmaker.apply(data)

# Write
self.write_splits(data, split_name=split_name)

# Remove split
for ob in data.obs:
@@ -581,13 +663,78 @@ def write_splits(self, data, split_name=None):
for k, v in mapmaker_save_traits.items():
setattr(self.mapmaker, k, v)
map_binner.pixel_pointing.view = pointing_view_save
map_binner.shared_flag_mask = shared_flag_mask_save

# Optionally restore shared flags
if save_shared_flags is not None:
for ob in data.obs:
ob.shared[self.shared_flags].set(
save_shared_flags[ob.name], offset=(0,), fromrank=0
)

return

def splits_exist(self, data, split_name=None):
"""Write out all split products."""
if not hasattr(self, "_split_obj"):
msg = "No splits have been created yet, cannot check existence"
raise RuntimeError(msg)

if hasattr(self.mapmaker, "map_binning"):
pixel_pointing = self.mapmaker.map_binning.pixel_pointing
else:
pixel_pointing = self.mapmaker.binning.pixel_pointing

if split_name is None:
to_write = dict(self._split_obj)
else:
to_write = {split_name: self._split_obj[split_name]}

if self.mapmaker.write_hdf5:
fname_suffix = "h5"
else:
fname_suffix = "fits"

all_exist = True
for spname, spl in to_write.items():
mname = f"{self.name}_{split_name}"
for prod, binner_key, write in [
("hits", None, self.write_hits),
("cov", None, self.write_cov),
("invcov", None, self.write_invcov),
("rcond", None, self.write_rcond),
("map", "binned", self.write_map),
("noiseweighted_map", "noiseweighted", self.write_noiseweighted_map),
]:
if not write:
continue
if binner_key is not None:
# get the product name from BinMap
mkey = getattr(self.mapmaker.binning, binner_key)
else:
# hits and covariance are not made by BinMap.
# Try synthesizing the product name
mkey = f"{mname}_{prod}"
fname = os.path.join(
self.output_dir,
f"{self.name}_{spname}_{prod}.{fname_suffix}"
)
if not os.path.isfile(fname):
all_exist = False
break
return all_exist

def write_splits(self, data, split_name=None):
"""Write out all split products."""
if not hasattr(self, "_split_obj"):
msg = "No splits have been created yet, cannot write"
raise RuntimeError(msg)

if hasattr(self.mapmaker, "map_binning"):
pixel_pointing = self.mapmaker.map_binning.pixel_pointing
else:
pixel_pointing = self.mapmaker.binning.pixel_pointing

if split_name is None:
to_write = dict(self._split_obj)
else:
@@ -600,10 +747,30 @@ def write_splits(self, data, split_name=None):

for spname, spl in to_write.items():
mname = f"{self.name}_{split_name}"
for prod in ["hits", "map", "invcov", "noiseweighted_map"]:
mkey = f"{mname}_{prod}"
for prod, binner_key, write in [
("hits", None, self.write_hits),
("cov", None, self.write_cov),
("invcov", None, self.write_invcov),
("rcond", None, self.write_rcond),
("map", "binned", self.write_map),
("noiseweighted_map", "noiseweighted", self.write_noiseweighted_map),
]:
if not write:
continue
if binner_key is not None:
# get the product name from BinMap
mkey = getattr(self.mapmaker.binning, binner_key)
else:
# hits and covariance are not made by BinMap.
# Try synthesizing the product name
mkey = f"{mname}_{prod}"
if mkey not in data:
msg = f"'{mkey}' not found in data. "
msg += f"Available keys are {data.keys()}"
raise RuntimeError(msg)
fname = os.path.join(
self.output_dir, f"{self.name}_{spname}_{prod}.{fname_suffix}"
self.output_dir,
f"{self.name}_{spname}_{prod}.{fname_suffix}"
)
data[mkey].write(
fname,
6 changes: 3 additions & 3 deletions sotodlib/toast/workflows/pointing.py
@@ -1,4 +1,4 @@
# Copyright (c) 2023-2024 Simons Observatory.
# Copyright (c) 2023-2025 Simons Observatory.
# Full license can be found in the top level "LICENSE" file.

import numpy as np
@@ -167,11 +167,11 @@ def select_pointing(job, otherargs, runargs, data):
if hasattr(job_ops, "binner"):
job_ops.binner.pixel_pointing = job.pixels_solve
job_ops.binner.stokes_weights = job.weights_solve
job_ops.binner.full_pointing = otherargs.full_pointing
job_ops.binner.full_pointing |= otherargs.full_pointing
if hasattr(job_ops, "binner_final"):
job_ops.binner_final.pixel_pointing = job.pixels_final
job_ops.binner_final.stokes_weights = job.weights_final
job_ops.binner_final.full_pointing = otherargs.full_pointing
job_ops.binner_final.full_pointing |= otherargs.full_pointing
# If we are not using a different binner for our final binning, use the
# same one as the solve.
if not job_ops.binner_final.enabled:
8 changes: 4 additions & 4 deletions sotodlib/toast/workflows/proc_characterize.py
@@ -1,4 +1,4 @@
# Copyright (c) 2023-2023 Simons Observatory.
# Copyright (c) 2023-2025 Simons Observatory.
# Full license can be found in the top level "LICENSE" file.
"""Timestream processing filters.
"""
@@ -116,7 +116,7 @@ def hn_map(job, otherargs, runargs, data):
job_ops.h_n.pixel_pointing = job.pixels_final
job_ops.h_n.pixel_dist = job_ops.binner_final.pixel_dist
job_ops.h_n.output_dir = otherargs.out_dir
job_ops.h_n.save_pointing = otherargs.full_pointing
job_ops.h_n.save_pointing |= otherargs.full_pointing
job_ops.h_n.apply(data)


@@ -154,7 +154,7 @@ def cadence_map(job, otherargs, runargs, data):
job_ops.cadence_map.pixel_pointing = job.pixels_final
job_ops.cadence_map.pixel_dist = job_ops.binner_final.pixel_dist
job_ops.cadence_map.output_dir = otherargs.out_dir
job_ops.cadence_map.save_pointing = otherargs.full_pointing
job_ops.cadence_map.save_pointing |= otherargs.full_pointing
job_ops.cadence_map.apply(data)


@@ -192,5 +192,5 @@ def crosslinking_map(job, otherargs, runargs, data):
job_ops.crosslinking.pixel_pointing = job.pixels_final
job_ops.crosslinking.pixel_dist = job_ops.binner_final.pixel_dist
job_ops.crosslinking.output_dir = otherargs.out_dir
job_ops.crosslinking.save_pointing = otherargs.full_pointing
job_ops.crosslinking.save_pointing |= otherargs.full_pointing
job_ops.crosslinking.apply(data)
4 changes: 3 additions & 1 deletion sotodlib/toast/workflows/proc_demodulation.py
@@ -91,7 +91,9 @@ def demodulate(job, otherargs, runargs, data):
comm=data.comm.comm_world,
timer=timer,
)
demod_weights = toast.ops.StokesWeightsDemod()
demod_weights = toast.ops.StokesWeightsDemod(
mode=job_ops.demodulate.mode
)
job_ops.weights_radec = demod_weights
if hasattr(job_ops, "binner"):
job_ops.binner.stokes_weights = demod_weights
4 changes: 2 additions & 2 deletions sotodlib/toast/workflows/proc_flagging.py
@@ -1,4 +1,4 @@
# Copyright (c) 2023-2024 Simons Observatory.
# Copyright (c) 2023-2025 Simons Observatory.
# Full license can be found in the top level "LICENSE" file.
"""Timestream flagging operations.
"""
@@ -343,5 +343,5 @@ def processing_mask(job, otherargs, runargs, data):
# We are using the same pointing matrix as the mapmaking
job_ops.processing_mask.pixel_dist = job_ops.binner.pixel_dist
job_ops.processing_mask.pixel_pointing = job.pixels_solve
job_ops.processing_mask.save_pointing = otherargs.full_pointing
job_ops.processing_mask.save_pointing |= otherargs.full_pointing
job_ops.processing_mask.apply(data)