Source code for sotodlib.toast.workflows.sim_sky

# Copyright (c) 2023-2026 Simons Observatory.
# Full license can be found in the top level "LICENSE" file.
"""Simulated detector response to sky signal.
"""

import numpy as np
from astropy import units as u
import toast
import toast.ops

from .. import ops as so_ops
from ..instrument import simulated_telescope
from .job import workflow_timer


def setup_simulate_sky_map_signal(operators):
    """Add commandline args and operators for scanning from a map.

    Every operator registered here starts out disabled.  Each family of
    scan operators (HEALPix and WCS) comes with its own Stokes weights and
    pixelization operators; when those are left disabled, the scan
    operators use the same pointing matrix as the mapmaking.

    Args:
        operators (list):  The list of operators to extend.

    Returns:
        None

    """
    register = operators.append

    # The HEALPix and WCS pointing operators write to the same detdata keys.
    weights_key = "weights_scan_map"
    pixels_key = "pixels_scan_map"

    # Scanning from a_lm expansions
    register(toast.ops.ScanAlm(name="scan_alm", enabled=False))

    # Scanning from HEALPix maps, plus the optional dedicated pointing
    register(toast.ops.ScanHealpixMap(name="scan_map", enabled=False))
    register(
        toast.ops.ScanHealpixDetectorMap(name="scan_detector_map", enabled=False)
    )
    register(
        toast.ops.StokesWeights(
            name="scan_map_weights",
            mode="IQU",
            weights=weights_key,
            enabled=False,
        )
    )
    register(
        toast.ops.PixelsHealpix(
            name="scan_map_pixels",
            pixels=pixels_key,
            enabled=False,
        )
    )

    # Scanning from WCS maps, plus the optional dedicated pointing
    register(toast.ops.ScanWCSMap(name="scan_wcs_map", enabled=False))
    register(
        toast.ops.ScanWCSDetectorMap(name="scan_wcs_detector_map", enabled=False)
    )
    register(
        toast.ops.StokesWeights(
            name="scan_wcs_map_weights",
            mode="IQU",
            weights=weights_key,
            enabled=False,
        )
    )
    register(
        toast.ops.PixelsWCS(
            name="scan_wcs_map_pixels",
            projection="CAR",
            pixels=pixels_key,
            resolution=(0.005 * u.degree, 0.005 * u.degree),
            submaps=1,
            auto_bounds=True,
            enabled=False,
        )
    )
def _scan_maps_with_pointing(
    job, otherargs, data, scan_ops, pixels_op, weights_op, custom_pixel_dist
):
    """Configure and apply every enabled scan operator of one map flavor.

    Args:
        job (namespace):  The configured operators and templates for this job.
        otherargs (namespace):  Other commandline arguments.  Only
            `full_pointing` is read.
        data (Data):  The data container.
        scan_ops (iterable):  The map scanning operators to run.
        pixels_op (Operator):  The dedicated pixelization operator for these
            scan operators.  Used only if it is enabled.
        weights_op (Operator):  The dedicated Stokes weights operator for
            these scan operators.  Used only if it is enabled.
        custom_pixel_dist (str):  The pixel distribution key to use when the
            dedicated pixelization operator is enabled.

    Returns:
        None

    """
    job_ops = job.operators
    for scan_op in scan_ops:
        if not scan_op.enabled:
            continue

        if pixels_op.enabled:
            # We are using a custom pointing matrix
            pixels_op.detector_pointing = job_ops.det_pointing_radec_sim
            scan_op.pixel_dist = custom_pixel_dist
            scan_op.pixel_pointing = pixels_op
        else:
            # We are using the same pointing matrix as the mapmaking
            scan_op.pixel_dist = job_ops.binner_final.pixel_dist
            scan_op.pixel_pointing = job.pixels_final

        if weights_op.enabled:
            weights_op.detector_pointing = job_ops.det_pointing_radec_sim
            scan_op.stokes_weights = weights_op
        else:
            scan_op.stokes_weights = job_ops.weights_radec

        # Only touch `save_pointing` on operators that expose it.  The
        # original WCS loop had this guard; it is applied to both flavors.
        if hasattr(scan_op, "save_pointing"):
            scan_op.save_pointing |= otherargs.full_pointing

        scan_op.apply(data)

        if pixels_op.enabled:
            # Clean up our custom pointing
            toast.ops.Delete(
                detdata=[
                    pixels_op.pixels,
                    job_ops.det_pointing_radec_sim.quats,
                ]
            ).apply(data)
        if weights_op.enabled:
            # Clean up our custom pointing
            toast.ops.Delete(detdata=[weights_op.weights]).apply(data)


@workflow_timer
def simulate_sky_map_signal(job, otherargs, runargs, data):
    """Scan a sky map into detector signals.

    This uses detector pointing to sample from a map into detector
    timestreams.  The maps should already be smoothed with any desired
    beam effects.

    We scan the sky with the "final" pointing model in case that is
    different from the solver pointing model.

    Args:
        job (namespace):  The configured operators and templates for this job.
        otherargs (namespace):  Other commandline arguments.
        runargs (namespace):  Job related runtime parameters.
        data (Data):  The data container.

    Returns:
        None

    Raises:
        RuntimeError:  If both HEALPix and WCS scanning are enabled, or if
            the simulation detector pointing operator is not the same object
            as the data reduction one and the matching `scan_map_pixels` /
            `scan_wcs_map_pixels` operator is not enabled.

    """
    log = toast.utils.Logger.get()

    # Configured operators for this job
    job_ops = job.operators

    scan_healpix = job_ops.scan_map.enabled or job_ops.scan_detector_map.enabled
    scan_wcs = (
        job_ops.scan_wcs_map.enabled or job_ops.scan_wcs_detector_map.enabled
    )

    # Check for conflicting options
    if scan_healpix and scan_wcs:
        msg = "Cannot scan from both healpix and WCS maps"
        log.error(msg)
        raise RuntimeError(msg)

    if job_ops.det_pointing_radec_sim is not job_ops.det_pointing_radec:
        # The mapmaking pointing matrix cannot be reused for the simulation,
        # so the dedicated pixelization operator is mandatory.
        if scan_healpix and not job_ops.scan_map_pixels.enabled:
            msg = (
                "Simulation pointing is different from data reduction pointing. "
                " You must enable scan_map_pixels"
            )
            log.error(msg)
            raise RuntimeError(msg)
        if scan_wcs and not job_ops.scan_wcs_map_pixels.enabled:
            msg = (
                "Simulation pointing is different from data reduction pointing. "
                " You must enable scan_wcs_map_pixels"
            )
            log.error(msg)
            raise RuntimeError(msg)

    # Scan signal from a_lm
    scan_alm = job_ops.scan_alm
    if scan_alm.enabled:
        scan_alm.detector_pointing = job_ops.det_pointing_radec_sim
        scan_alm.stokes_weights = job_ops.weights_radec
        scan_alm.apply(data)

    # Scan signal from HEALPix maps
    _scan_maps_with_pointing(
        job,
        otherargs,
        data,
        (job_ops.scan_map, job_ops.scan_detector_map),
        job_ops.scan_map_pixels,
        job_ops.scan_map_weights,
        "scan_map_pixel_dist",
    )

    # Scan signal from WCS maps
    _scan_maps_with_pointing(
        job,
        otherargs,
        data,
        (job_ops.scan_wcs_map, job_ops.scan_wcs_detector_map),
        job_ops.scan_wcs_map_pixels,
        job_ops.scan_wcs_map_weights,
        "scan_wcs_map_pixel_dist",
    )
def setup_simulate_conviqt_signal(operators):
    """Add commandline args and operators for beam convolution with conviqt.

    Both operators are registered disabled.

    Args:
        operators (list):  The list of operators to extend.

    Returns:
        None

    """
    # Registered under the name "conviqt"
    plain_conviqt = toast.ops.SimConviqt(name="conviqt", enabled=False)
    operators.append(plain_conviqt)

    # Registered under the name "conviqt_teb"
    teb_conviqt = toast.ops.SimTEBConviqt(name="conviqt_teb", enabled=False)
    operators.append(teb_conviqt)
@workflow_timer
def simulate_conviqt_signal(job, otherargs, runargs, data):
    """Use libconviqt to generate beam-convolved sky signal timestreams.

    This uses detector pointing and input beam and sky a_lm expansions to
    generate detector timestreams.  The `conviqt` operator is applied when
    the ground simulation has no HWP angle set, and `conviqt_teb` when it
    does; in both cases only if that operator is enabled.

    Args:
        job (namespace):  The configured operators and templates for this job.
        otherargs (namespace):  Other commandline arguments.
        runargs (namespace):  Job related runtime parameters.
        data (Data):  The data container.

    Returns:
        None

    Raises:
        RuntimeError:  If `conviqt` is enabled while an HWP angle is set, or
            `conviqt_teb` is enabled while no HWP angle is set.

    """
    log = toast.utils.Logger.get()

    # Configured operators for this job
    ops = job.operators

    # Reject operator choices that do not match the HWP configuration.
    mismatch = None
    if ops.conviqt.enabled and ops.sim_ground.hwp_angle is not None:
        mismatch = (
            "Data has a half-wave plate. Use conviqt_teb operator "
            "instead of conviqt."
        )
    elif ops.conviqt_teb.enabled and ops.sim_ground.hwp_angle is None:
        mismatch = (
            "Data has no half-wave plate. Use conviqt operator "
            "instead of conviqt_teb."
        )
    if mismatch is not None:
        log.error(mismatch)
        raise RuntimeError(mismatch)

    # Select the single operator that matches the HWP configuration.
    hwp_angle = ops.sim_ground.hwp_angle
    has_hwp = hwp_angle is not None
    convolver = ops.conviqt_teb if has_hwp else ops.conviqt
    if not convolver.enabled:
        return

    convolver.comm = data.comm.comm_world
    convolver.detector_pointing = ops.det_pointing_radec_sim
    if has_hwp:
        # Only the TEB operator is given the HWP angle
        convolver.hwp_angle = hwp_angle
    convolver.apply(data)