# Copyright (c) 2023-2024 Simons Observatory.
# Full license can be found in the top level "LICENSE" file.
"""Simulated observing / scanning motion of the telescope.
"""
import numpy as np
from astropy import units as u
import toast
from toast.observation import default_values as defaults
import toast.ops
from .. import ops as so_ops
from ..instrument import simulated_telescope
from .job import workflow_timer
[docs]
def setup_simulate_observing(parser, operators):
"""Add commandline args and operators for simulated observing.
Args:
parser (ArgumentParser): The parser to update.
operators (list): The list of operators to extend.
Returns:
None
"""
parser.add_argument(
"--hardware", required=False, default=None, help="Input hardware file"
)
parser.add_argument(
"--det_info_file",
required=False,
default=None,
help="Input detector info file for real hardware maps",
)
parser.add_argument(
"--det_info_version",
required=False,
default=None,
help="Detector info file version such as 'Cv4'",
)
parser.add_argument(
"--thinfp",
required=False,
type=int,
help="Thin the focalplane by this factor",
)
parser.add_argument(
"--bands",
required=False,
default=None,
help="Comma-separated list of bands: LAT_f030 (27GHz), LAT_f040 (39GHz), "
"LAT_f090 (93GHz), LAT_f150 (145GHz), "
"LAT_f230 (225GHz), LAT_f290 (285GHz), "
"SAT_f030 (27GHz), SAT_f040 (39GHz), "
"SAT_f090 (93GHz), SAT_f150 (145GHz), "
"SAT_f230 (225GHz), SAT_f290 (285GHz). ",
)
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument(
"--telescope",
default=None,
help="Telescope to simulate: LAT, SAT1, SAT2, SAT3, SAT4.",
)
group.add_argument(
"--tube_slots",
default=None,
help="Comma-separated list of optics tube slots: c1 (LAT_UHF), i5 (LAT_UHF), "
" i6 (LAT_MF), i1 (LAT_MF), i3 (LAT_MF), i4 (LAT_MF), o6 (LAT_LF),"
" ST1 (SAT_MF), ST2 (SAT_MF), ST3 (SAT_UHF), ST4 (SAT_LF).",
)
group.add_argument(
"--wafer_slots", default=None, help="Comma-separated list of wafer slots. "
)
parser.add_argument(
"--sample_rate",
required=False,
default=10,
help="Sampling rate",
type=float,
)
parser.add_argument(
"--schedule", required=False, default=None, help="Input observing schedule"
)
parser.add_argument(
"--sort_schedule",
required=False,
default=False,
action="store_true",
help="Sort the observing schedule by mean boresight RA. "
"This can limit the area of sky each process group deals with.",
)
parser.add_argument(
"--realization",
required=False,
default=None,
help="Realization index",
type=int,
)
parser.add_argument(
"--pwv_limit",
required=False,
type=float,
help="If set, discard observations with simulated PWV "
"higher than the limit [mm]",
)
operators.append(
toast.ops.SimGround(
name="sim_ground",
weather="atacama",
detset_key="pixel",
session_split_key="wafer_slot",
enabled=False,
so3g_compat_mode=True,
)
)
operators.append(so_ops.CoRotator(name="corotate_lat"))
operators.append(toast.ops.PerturbHWP(name="perturb_hwp", enabled=False))
# Detector quaternion pointing
operators.append(
toast.ops.PointingDetectorSimple(
name="det_pointing_azel_sim",
boresight=defaults.boresight_azel,
quats="quats_azel_sim",
enabled=False,
)
)
operators.append(
toast.ops.PointingDetectorSimple(
name="det_pointing_radec_sim",
boresight=defaults.boresight_radec,
quats="quats_radec_sim",
enabled=False,
)
)
[docs]
@workflow_timer
def simulate_observing(job, otherargs, runargs, comm):
"""Simulate the observing motion of the selected detectors with the schedule.
Args:
job (namespace): The configured operators and templates for this job.
otherargs (namespace): Other commandline arguments.
runargs (namespace): Job related runtime parameters.
comm (MPI.Comm): The MPI world communicator (or None).
Returns:
(Data): The simulated data.
"""
log = toast.utils.Logger.get()
timer = toast.timing.Timer()
timer.start()
# Configured operators for this job
job_ops = job.operators
# Make sure we have the required bands and schedule. These might
# not be set during a dry-run, but if we got this far they need to
# be set.
if otherargs.bands is None:
msg = "bands must be specified for simulated observing"
log.error_rank(msg, comm=comm)
raise RuntimeError(msg)
if otherargs.schedule is None:
msg = "schedule must be specified for simulated observing"
log.error_rank(msg, comm=comm)
raise RuntimeError(msg)
# Simulated telescope
telescope = simulated_telescope(
hwfile=otherargs.hardware,
det_info_file=otherargs.det_info_file,
det_info_version=otherargs.det_info_version,
telescope_name=otherargs.telescope,
sample_rate=otherargs.sample_rate * u.Hz,
bands=otherargs.bands,
wafer_slots=otherargs.wafer_slots,
tube_slots=otherargs.tube_slots,
thinfp=otherargs.thinfp,
comm=comm,
)
ndet = len(telescope.focalplane.detectors)
log.info_rank(
f" Simulated focalplane with {ndet} detectors "
f"(thinfp = {otherargs.thinfp}) in",
comm=comm,
timer=timer,
)
# Load the schedule file
schedule = toast.schedule.GroundSchedule()
schedule.read(otherargs.schedule, comm=comm)
log.info_rank(" Loaded schedule in", comm=comm, timer=timer)
if otherargs.sort_schedule:
schedule.sort_by_RA()
log.info_rank(" Loaded schedule in", comm=comm, timer=timer)
if otherargs.sort_schedule:
schedule.sort_by_RA()
log.info_rank(" Sorted schedule in", comm=comm, timer=timer)
mem = toast.utils.memreport(msg="(whole node)", comm=comm, silent=True)
log.info_rank(f" After loading schedule: {mem}", comm)
# Get the process group size
group_size = toast.job_group_size(
comm,
runargs,
schedule=schedule,
focalplane=telescope.focalplane,
full_pointing=otherargs.full_pointing,
)
# Create the toast communicator.
toast_comm = toast.Comm(world=comm, groupsize=group_size)
log.info_rank(
f" Created a TOAST communicator with {toast_comm.world_size} ranks"
f" and {toast_comm.ngroups} process groups of "
f"{toast_comm.group_size} ranks each",
comm,
)
# The data container
data = toast.Data(comm=toast_comm)
timer.clear()
timer.start()
job_ops.mem_count.prefix = "Before Simulation"
job_ops.mem_count.apply(data)
# Simulate the telescope pointing
job_ops.sim_ground.telescope = telescope
job_ops.sim_ground.enabled = True
job_ops.sim_ground.schedule = schedule
if job_ops.sim_ground.weather is None:
job_ops.sim_ground.weather = telescope.site.name
if otherargs.realization is not None:
job_ops.sim_ground.realization = otherargs.realization
log.info_rank(" Running simulated observing...", comm=comm)
job_ops.sim_ground.apply(data)
log.info_rank(" Simulated telescope pointing in", comm=comm, timer=timer)
job_ops.mem_count.prefix = "After Scan Simulation"
job_ops.mem_count.apply(data)
if otherargs.pwv_limit is not None:
iobs = 0
ngood = 0
nbad = 0
while iobs < len(data.obs):
pwv = data.obs[iobs].telescope.site.weather.pwv.to_value(u.mm)
if pwv <= otherargs.pwv_limit:
ngood += 1
iobs += 1
else:
nbad += 1
del data.obs[iobs]
if len(data.obs) == 0:
msg = (
f"PWV limit = {otherargs.pwv_limit} mm rejected all "
f"{nbad} observations assigned to this process"
)
raise RuntimeError(msg)
if toast_comm.comm_group_rank is not None:
nbad = toast_comm.comm_group_rank.allreduce(nbad)
ngood = toast_comm.comm_group_rank.allreduce(ngood)
log.info_rank(
f" Discarded {nbad} / {ngood + nbad} observations "
f"with PWV > {otherargs.pwv_limit} mm in",
comm=comm,
timer=timer,
)
# Apply LAT co-rotation
if job_ops.corotate_lat.enabled:
log.info_rank(" Running simulated LAT corotation...", comm=comm)
job_ops.corotate_lat.apply(data)
log.info_rank(" Apply LAT co-rotation in", comm=comm, timer=timer)
# Perturb HWP spin
if job_ops.perturb_hwp.enabled:
log.info_rank(" Running simulated HWP perturbation...", comm=comm)
job_ops.perturb_hwp.apply(data)
log.info_rank(" Perturbed HWP rotation in", comm=comm, timer=timer)
return data