Source code for sotodlib.toast.workflows.data

# Copyright (c) 2023-2024 Simons Observatory.
# Full license can be found in the top level "LICENSE" file.
"""Data I/O operations.
"""

import os
import numpy as np
from astropy import units as u
import toast
import toast.ops

from .. import ops as so_ops
from .job import workflow_timer


[docs] def setup_load_data_context(operators): """Add commandline args and operators for loading from a context. Args: operators (list): The list of operators to extend. Returns: None """ operators.append( so_ops.LoadContext( name="load_context", enabled=False, ) ) return
[docs] @workflow_timer def load_data_context(job, otherargs, runargs, data): """Load data from a Context. Args: job (namespace): The configured operators and templates for this job. otherargs (namespace): Other commandline arguments. runargs (namespace): Job related runtime parameters. data (Data): The data container. Returns: None """ # Configured operators for this job job_ops = job.operators # Load it if job_ops.load_context.enabled: # Special handling of ACT data. In this case, the user may have # set the MOBY2_TOD_STAGING_PATH environment variable to force the # data loader to make a copy into ramdisk for opening. This will # break parallel access, since each process tries to create (and # then delete) the same file. Here we append the process rank to # the staging directory if "MOBY2_TOD_STAGING_PATH" in os.environ: procdir = os.path.join( os.environ["MOBY2_TOD_STAGING_PATH"], f"{data.comm.world_rank}", f"{os.getpid()}", ) os.makedirs(procdir, exist_ok=True) os.environ["MOBY2_TOD_STAGING_PATH"] = procdir job_ops.load_context.apply(data)
[docs] def setup_load_data_hdf5(operators): """Add commandline args and operators for loading from HDF5 files. Args: operators (list): The list of operators to extend. Returns: None """ operators.append(toast.ops.LoadHDF5(name="load_hdf5", enabled=False))
[docs] @workflow_timer def load_data_hdf5(job, otherargs, runargs, data): """Load data from one or more HDF5 files. Args: job (namespace): The configured operators and templates for this job. otherargs (namespace): Other commandline arguments. runargs (namespace): Job related runtime parameters. data (Data): The data container. Returns: None """ # Configured operators for this job job_ops = job.operators # Load it if job_ops.load_hdf5.enabled: job_ops.load_hdf5.apply(data)
[docs] def setup_load_data_books(operators): """Add commandline args and operators for directly loading L3 books. Args: operators (list): The list of operators to extend. Returns: None """ operators.append(so_ops.LoadBooks(name="load_books", enabled=False))
[docs] @workflow_timer def load_data_books(job, otherargs, runargs, data): """Load data from one or more books. Args: job (namespace): The configured operators and templates for this job. otherargs (namespace): Other commandline arguments. runargs (namespace): Job related runtime parameters. data (Data): The data container. Returns: None """ # Configured operators for this job job_ops = job.operators # Load it if job_ops.load_books.enabled: job_ops.load_books.apply(data)
[docs] def setup_save_data_hdf5(operators): """Add commandline args and operators for saving to HDF5 files. Args: operators (list): The list of operators to extend. Returns: None """ operators.append(toast.ops.SaveHDF5(name="save_hdf5", enabled=False))
[docs] @workflow_timer def save_data_hdf5(job, otherargs, runargs, data): """Save data to HDF5 files. Args: job (namespace): The configured operators and templates for this job. otherargs (namespace): Other commandline arguments. runargs (namespace): Job related runtime parameters. data (Data): The data container. Returns: None """ # Configured operators for this job job_ops = job.operators # Dump it if job_ops.save_hdf5.enabled: if hasattr(otherargs, "out_dir"): hdf5_out = os.path.join(otherargs.out_dir, "data") job_ops.save_hdf5.volume = hdf5_out job_ops.save_hdf5.apply(data)
[docs] def setup_save_data_books(operators): """Add commandline args and operators for saving to L3 books. Args: operators (list): The list of operators to extend. Returns: None """ operators.append(so_ops.SaveBooks(name="save_books", enabled=False))
[docs] @workflow_timer def save_data_books(job, otherargs, runargs, data): """Save data to books. Args: job (namespace): The configured operators and templates for this job. otherargs (namespace): Other commandline arguments. runargs (namespace): Job related runtime parameters. data (Data): The data container. Returns: None """ # Configured operators for this job job_ops = job.operators # Load it if job_ops.save_books.enabled: job_ops.save_books.apply(data)