Source code for sotodlib.toast.workflows.sim_observe

# Copyright (c) 2023-2024 Simons Observatory.
# Full license can be found in the top level "LICENSE" file.
"""Simulated observing / scanning motion of the telescope.
"""

import numpy as np
from astropy import units as u
import toast
from toast.observation import default_values as defaults
import toast.ops

from .. import ops as so_ops
from ..instrument import simulated_telescope
from .job import workflow_timer


[docs] def setup_simulate_observing(parser, operators): """Add commandline args and operators for simulated observing. Args: parser (ArgumentParser): The parser to update. operators (list): The list of operators to extend. Returns: None """ parser.add_argument( "--hardware", required=False, default=None, help="Input hardware file" ) parser.add_argument( "--det_info_file", required=False, default=None, help="Input detector info file for real hardware maps", ) parser.add_argument( "--det_info_version", required=False, default=None, help="Detector info file version such as 'Cv4'", ) parser.add_argument( "--thinfp", required=False, type=int, help="Thin the focalplane by this factor", ) parser.add_argument( "--bands", required=False, default=None, help="Comma-separated list of bands: LAT_f030 (27GHz), LAT_f040 (39GHz), " "LAT_f090 (93GHz), LAT_f150 (145GHz), " "LAT_f230 (225GHz), LAT_f290 (285GHz), " "SAT_f030 (27GHz), SAT_f040 (39GHz), " "SAT_f090 (93GHz), SAT_f150 (145GHz), " "SAT_f230 (225GHz), SAT_f290 (285GHz). ", ) group = parser.add_mutually_exclusive_group(required=False) group.add_argument( "--telescope", default=None, help="Telescope to simulate: LAT, SAT1, SAT2, SAT3, SAT4.", ) group.add_argument( "--tube_slots", default=None, help="Comma-separated list of optics tube slots: c1 (LAT_UHF), i5 (LAT_UHF), " " i6 (LAT_MF), i1 (LAT_MF), i3 (LAT_MF), i4 (LAT_MF), o6 (LAT_LF)," " ST1 (SAT_MF), ST2 (SAT_MF), ST3 (SAT_UHF), ST4 (SAT_LF).", ) group.add_argument( "--wafer_slots", default=None, help="Comma-separated list of wafer slots. " ) parser.add_argument( "--sample_rate", required=False, default=10, help="Sampling rate", type=float, ) parser.add_argument( "--schedule", required=False, default=None, help="Input observing schedule" ) parser.add_argument( "--sort_schedule", required=False, default=False, action="store_true", help="Sort the observing schedule by mean boresight RA. " "This can limit the area of sky each process group deals with.", ) parser.add_argument( "--realization", required=False, default=None, help="Realization index", type=int, ) parser.add_argument( "--pwv_limit", required=False, type=float, help="If set, discard observations with simulated PWV " "higher than the limit [mm]", ) operators.append( toast.ops.SimGround( name="sim_ground", weather="atacama", detset_key="pixel", session_split_key="wafer_slot", enabled=False, so3g_compat_mode=True, ) ) operators.append(so_ops.CoRotator(name="corotate_lat")) operators.append(toast.ops.PerturbHWP(name="perturb_hwp", enabled=False)) # Detector quaternion pointing operators.append( toast.ops.PointingDetectorSimple( name="det_pointing_azel_sim", boresight=defaults.boresight_azel, quats="quats_azel_sim", enabled=False, ) ) operators.append( toast.ops.PointingDetectorSimple( name="det_pointing_radec_sim", boresight=defaults.boresight_radec, quats="quats_radec_sim", enabled=False, ) )
[docs] @workflow_timer def simulate_observing(job, otherargs, runargs, comm): """Simulate the observing motion of the selected detectors with the schedule. Args: job (namespace): The configured operators and templates for this job. otherargs (namespace): Other commandline arguments. runargs (namespace): Job related runtime parameters. comm (MPI.Comm): The MPI world communicator (or None). Returns: (Data): The simulated data. """ log = toast.utils.Logger.get() timer = toast.timing.Timer() timer.start() # Configured operators for this job job_ops = job.operators # Make sure we have the required bands and schedule. These might # not be set during a dry-run, but if we got this far they need to # be set. if otherargs.bands is None: msg = "bands must be specified for simulated observing" log.error_rank(msg, comm=comm) raise RuntimeError(msg) if otherargs.schedule is None: msg = "schedule must be specified for simulated observing" log.error_rank(msg, comm=comm) raise RuntimeError(msg) # Simulated telescope telescope = simulated_telescope( hwfile=otherargs.hardware, det_info_file=otherargs.det_info_file, det_info_version=otherargs.det_info_version, telescope_name=otherargs.telescope, sample_rate=otherargs.sample_rate * u.Hz, bands=otherargs.bands, wafer_slots=otherargs.wafer_slots, tube_slots=otherargs.tube_slots, thinfp=otherargs.thinfp, comm=comm, ) ndet = len(telescope.focalplane.detectors) log.info_rank( f" Simulated focalplane with {ndet} detectors " f"(thinfp = {otherargs.thinfp}) in", comm=comm, timer=timer, ) # Load the schedule file schedule = toast.schedule.GroundSchedule() schedule.read(otherargs.schedule, comm=comm) log.info_rank(" Loaded schedule in", comm=comm, timer=timer) if otherargs.sort_schedule: schedule.sort_by_RA() log.info_rank(" Loaded schedule in", comm=comm, timer=timer) if otherargs.sort_schedule: schedule.sort_by_RA() log.info_rank(" Sorted schedule in", comm=comm, timer=timer) mem = toast.utils.memreport(msg="(whole node)", comm=comm, silent=True) log.info_rank(f" After loading schedule: {mem}", comm) # Get the process group size group_size = toast.job_group_size( comm, runargs, schedule=schedule, focalplane=telescope.focalplane, full_pointing=otherargs.full_pointing, ) # Create the toast communicator. toast_comm = toast.Comm(world=comm, groupsize=group_size) log.info_rank( f" Created a TOAST communicator with {toast_comm.world_size} ranks" f" and {toast_comm.ngroups} process groups of " f"{toast_comm.group_size} ranks each", comm, ) # The data container data = toast.Data(comm=toast_comm) timer.clear() timer.start() job_ops.mem_count.prefix = "Before Simulation" job_ops.mem_count.apply(data) # Simulate the telescope pointing job_ops.sim_ground.telescope = telescope job_ops.sim_ground.enabled = True job_ops.sim_ground.schedule = schedule if job_ops.sim_ground.weather is None: job_ops.sim_ground.weather = telescope.site.name if otherargs.realization is not None: job_ops.sim_ground.realization = otherargs.realization log.info_rank(" Running simulated observing...", comm=comm) job_ops.sim_ground.apply(data) log.info_rank(" Simulated telescope pointing in", comm=comm, timer=timer) job_ops.mem_count.prefix = "After Scan Simulation" job_ops.mem_count.apply(data) if otherargs.pwv_limit is not None: iobs = 0 ngood = 0 nbad = 0 while iobs < len(data.obs): pwv = data.obs[iobs].telescope.site.weather.pwv.to_value(u.mm) if pwv <= otherargs.pwv_limit: ngood += 1 iobs += 1 else: nbad += 1 del data.obs[iobs] if len(data.obs) == 0: msg = ( f"PWV limit = {otherargs.pwv_limit} mm rejected all " f"{nbad} observations assigned to this process" ) raise RuntimeError(msg) if toast_comm.comm_group_rank is not None: nbad = toast_comm.comm_group_rank.allreduce(nbad) ngood = toast_comm.comm_group_rank.allreduce(ngood) log.info_rank( f" Discarded {nbad} / {ngood + nbad} observations " f"with PWV > {otherargs.pwv_limit} mm in", comm=comm, timer=timer, ) # Apply LAT co-rotation if job_ops.corotate_lat.enabled: log.info_rank(" Running simulated LAT corotation...", comm=comm) job_ops.corotate_lat.apply(data) log.info_rank(" Apply LAT co-rotation in", comm=comm, timer=timer) # Perturb HWP spin if job_ops.perturb_hwp.enabled: log.info_rank(" Running simulated HWP perturbation...", comm=comm) job_ops.perturb_hwp.apply(data) log.info_rank(" Perturbed HWP rotation in", comm=comm, timer=timer) return data