Source code for sotodlib.toast.workflows.sim_sky

# Copyright (c) 2023-2023 Simons Observatory.
# Full license can be found in the top level "LICENSE" file.
"""Simulated detector response to sky signal.
"""

import numpy as np
from astropy import units as u
import toast
import toast.ops

from .. import ops as so_ops
from ..instrument import simulated_telescope
from .job import workflow_timer


[docs] def setup_simulate_sky_map_signal(operators): """Add commandline args and operators for scanning from a map. There are pointing matrix operators associated with each scan map operator and these are disabled by default. If not enabled, the scan map operator use the same pointing matrix as the mapmaking. Args: operators (list): The list of operators to extend. Returns: None """ operators.append(toast.ops.ScanHealpixMap(name="scan_map", enabled=False)) operators.append( toast.ops.StokesWeights( name="scan_map_weights", mode="IQU", weights="weights_scan_map", enabled=False, ) ) operators.append( toast.ops.PixelsHealpix( name="scan_map_pixels", pixels="pixels_scan_map", enabled=False ) ) operators.append(toast.ops.ScanWCSMap(name="scan_wcs_map", enabled=False)) operators.append( toast.ops.StokesWeights( name="scan_wcs_map_weights", mode="IQU", weights="weights_scan_map", enabled=False, ) ) operators.append( toast.ops.PixelsWCS( name="scan_wcs_map_pixels", projection="CAR", pixels="pixels_scan_map", resolution=(0.005 * u.degree, 0.005 * u.degree), submaps=1, auto_bounds=True, enabled=False, ) )
[docs] @workflow_timer def simulate_sky_map_signal(job, otherargs, runargs, data): """Scan a sky map into detector signals. This uses detector pointing to sample from a map into detector timestreams. The maps should already be smoothed with any desired beam effects. We scan the sky with the "final" pointing model in case that is different from the solver pointing model. Args: job (namespace): The configured operators and templates for this job. otherargs (namespace): Other commandline arguments. runargs (namespace): Job related runtime parameters. data (Data): The data container. Returns: None """ log = toast.utils.Logger.get() # Configured operators for this job job_ops = job.operators if job_ops.scan_map.enabled and job_ops.scan_wcs_map.enabled: msg = "Cannot scan from both healpix and WCS maps" log.error(msg) raise RuntimeError(msg) if job_ops.scan_map.enabled: if job_ops.scan_map_pixels.enabled: # We are using a custom pointing matrix job_ops.scan_map_pixels.detector_pointing = job_ops.det_pointing_radec job_ops.scan_map.pixel_dist = "scan_map_pixel_dist" job_ops.scan_map.pixel_pointing = job_ops.scan_map_pixels else: # We are using the same pointing matrix as the mapmaking job_ops.scan_map.pixel_dist = job_ops.binner_final.pixel_dist job_ops.scan_map.pixel_pointing = job.pixels_final if job_ops.scan_map_weights.enabled: job_ops.scan_map_weights.detector_pointing = job_ops.det_pointing_radec job_ops.scan_map.stokes_weights = job_ops.scan_map_weights else: job_ops.scan_map.stokes_weights = job_ops.weights_radec job_ops.scan_map.save_pointing = otherargs.full_pointing job_ops.scan_map.apply(data) if job_ops.scan_wcs_map.enabled: if job_ops.scan_wcs_map_pixels.enabled: # We are using a custom pointing matrix job_ops.scan_wcs_map_pixels.detector_pointing = job_ops.det_pointing_radec job_ops.scan_wcs_map.pixel_dist = "scan_wcs_map_pixel_dist" job_ops.scan_wcs_map.pixel_pointing = job_ops.scan_wcs_map_pixels else: # We are using the same pointing matrix as the mapmaking job_ops.scan_wcs_map.pixel_dist = job_ops.binner_final.pixel_dist job_ops.scan_wcs_map.pixel_pointing = job.pixels_final if job_ops.scan_wcs_map_weights.enabled: job_ops.scan_wcs_map_weights.detector_pointing = job_ops.det_pointing_radec job_ops.scan_wcs_map.stokes_weights = job_ops.scan_wcs_map_weights else: job_ops.scan_wcs_map.stokes_weights = job_ops.weights_radec job_ops.scan_wcs_map.save_pointing = otherargs.full_pointing job_ops.scan_wcs_map.apply(data)
[docs] def setup_simulate_conviqt_signal(operators): """Add commandline args and operators for beam covolution with conviqt. Args: operators (list): The list of operators to extend. Returns: None """ operators.append(toast.ops.SimConviqt(name="conviqt", enabled=False)) operators.append(toast.ops.SimTEBConviqt(name="conviqt_teb", enabled=False))
[docs] @workflow_timer def simulate_conviqt_signal(job, otherargs, runargs, data): """Use libconviqt to generate beam-convolved sky signal timestreams. This uses detector pointing and input beam and sky a_lm expansions to generate detector timestreams. Args: job (namespace): The configured operators and templates for this job. otherargs (namespace): Other commandline arguments. runargs (namespace): Job related runtime parameters. data (Data): The data container. Returns: None """ log = toast.utils.Logger.get() # Configured operators for this job job_ops = job.operators if job_ops.conviqt.enabled and job_ops.sim_ground.hwp_angle is not None: msg = "Data has a half-wave plate. Use conviqt_teb operator " msg += "instead of conviqt." log.error(msg) raise RuntimeError(msg) if job_ops.conviqt_teb.enabled and job_ops.sim_ground.hwp_angle is None: msg = "Data has no half-wave plate. Use conviqt operator " msg += "instead of conviqt_teb." log.error(msg) raise RuntimeError(msg) if job_ops.sim_ground.hwp_angle is None: if job_ops.conviqt.enabled: job_ops.conviqt.comm = data.comm.comm_world job_ops.conviqt.detector_pointing = job_ops.det_pointing_radec job_ops.conviqt.apply(data) else: if job_ops.conviqt_teb.enabled: job_ops.conviqt_teb.comm = data.comm.comm_world job_ops.conviqt_teb.detector_pointing = job_ops.det_pointing_radec job_ops.conviqt_teb.hwp_angle = job_ops.sim_ground.hwp_angle job_ops.conviqt_teb.apply(data)