Preprocess

The preprocess module defines a standardized interface for TOD processing operations so that they can be easily implemented in automatic data analysis scripts. The core of the system has two parts: the _Preprocess modules and the Pipeline object. Each _Preprocess module defines how a TOD operation is run on an AxisManager TOD, and the Pipeline object defines the order of the operations and then runs them. The site-pipeline.preprocess_tod script runs and saves Pipelines on lists of observations, grouped by detset. The site-pipeline.preprocess_obs script is used for observation-level preprocessing. It is similar to site-pipeline.preprocess_tod but removes grouping by detset so that the entire observation is loaded, without signal. For example, pipeline steps such as DetBiasFlags require TOD-level data including signal, whereas SSOFootprint does not and uses observation-level data.

Preprocessing Pipelines

A preprocessing pipeline is a series of modules, each inheriting from _Preprocess, that are defined through a configuration file and intended to be run successively on an AxisManager containing time ordered data.

class sotodlib.preprocess.pcore._Preprocess(step_cfgs)[source]

The base class for preprocessing modules, which defines the functions and configuration keys that each module requires.

Each preprocess module has five overwritable functions that are called by the processing scripts in site_pipeline. Each of these functions is controlled by a specific key in a configuration dictionary passed to the module on creation.

The configuration dictionary has 6 special keys: name, process, calc, save, select, and plot. name is the name used to register the module with the PIPELINE registry. The other five keys are matched to functions in the module; if a key is not present, that function will be skipped when the preprocessing pipeline is run.
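As a rough illustration of how these keys gate what runs, the sketch below uses a plain dictionary for a single step. The option names under each key and the helper enabled_functions are hypothetical, chosen only for illustration; they are not taken from sotodlib.

```python
# Hypothetical step configuration; the options under each key are assumptions.
step_cfg = {
    "name": "noise",
    "calc": {"low_f": 5, "high_f": 10},
    "save": True,
    "select": {"max_noise": 2000},
}

# Keys other than "name" that map onto functions of a module.
FUNCTION_KEYS = ("process", "calc", "save", "select", "plot")


def enabled_functions(cfg):
    """Return the function keys present in cfg; absent keys mean 'skip'."""
    return [key for key in FUNCTION_KEYS if key in cfg]


print(enabled_functions(step_cfg))
```

In this toy configuration the process and plot keys are absent, so only the calc, save, and select functions would be called for the step.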

There are two special AxisManagers expected to be part of the preprocessing pipeline. aman is the “standard” time ordered data AxisManager that is loaded via our default styles. proc_aman is the preprocess AxisManager; it carries the data products that will be saved to whatever Metadata Archive is connected to the preprocessing pipeline.

process(aman, proc_aman, sim=False, data_aman=None)[source]

This function makes changes to the time ordered data AxisManager. Ex: calibrating or detrending the timestreams. This function will use any configuration information under the process key of the configuration dictionary and is not expected to change or alter proc_aman.

Parameters:
  • aman (AxisManager) – The time ordered data

  • proc_aman (AxisManager) – Any information generated by previous elements in the preprocessing pipeline.

  • sim (Bool) – False by default when analyzing data. Should be True when doing Transfer Function simulations and determining which steps should be run.

  • data_aman (AxisManager (Optional)) – An AxisManager containing the preprocessed data to be used by this process.

calc_and_save(aman, proc_aman)[source]

This function calculates data products of some sort off of the time ordered data AxisManager.

Ex: Calculating the white noise of the timestream. This function will use any configuration information under the calc key of the configuration dictionary and can call the save function to make changes to proc_aman.

Parameters:
  • aman (AxisManager) – The time ordered data

  • proc_aman (AxisManager) – Any information generated by previous elements in the preprocessing pipeline.

save(proc_aman, *args)[source]

This function wraps new information into the proc_aman and will use any configuration information under the save key of the configuration dictionary.

Parameters:
  • proc_aman (AxisManager) – Any information generated by previous elements in the preprocessing pipeline.

  • args (any) – Any additional information calc_and_save needs to send to the save function.

select(meta, proc_aman=None, in_place=True)[source]

This function runs any desired data selection of the preprocessing pipeline results. Assumes the pipeline has already been run and that the resulting proc_aman is now saved under the preprocess key in the meta AxisManager loaded via context.

Ex: removing detectors with white noise above some limit. This function will use any configuration information under the select key.

Parameters:
  • meta (AxisManager) – Metadata related to the specific observation

  • proc_aman (AxisManager) – Optional. Any information generated by previous elements in the preprocessing pipeline.

  • in_place (bool) – Optional. Apply selection and return restricted axis manager if True, else return the flag array.

Returns:

meta – Metadata where non-selected detectors have been removed

Return type:

AxisManager
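The in_place switch can be pictured with a toy stand-in. The sketch below uses a plain dictionary of per-detector noise values instead of an AxisManager, and it assumes that True in the returned flags marks a detector that is kept; both the data layout and that convention are assumptions made for illustration, not sotodlib behavior.

```python
def toy_select(noise_by_det, max_noise, in_place=True):
    """Keep detectors whose noise is at or below max_noise."""
    dets = list(noise_by_det)
    keep = [noise_by_det[det] <= max_noise for det in dets]
    if not in_place:
        return keep  # flag list only; the input is left untouched
    for det, ok in zip(dets, keep):
        if not ok:
            del noise_by_det[det]  # restrict the input itself
    return noise_by_det


noise = {"det_a": 1.2, "det_b": 9.5, "det_c": 0.8}
print(toy_select(noise, max_noise=2.0, in_place=False))
print(toy_select(noise, max_noise=2.0))
```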

plot(aman, proc_aman, filename)[source]

This function creates plots using results from calc_and_save.

Ex: Plotting det bias flags. This function will use any configuration information under the plot key of the configuration dictionary.

Parameters:
  • aman (AxisManager) – The time ordered data

  • proc_aman (AxisManager) – Any information generated by previous elements in the preprocessing pipeline.

  • filename (str) – Filename should be a concatenation of the global plot_dir config with a name with process step number and placeholder {name} as shown in Pipeline.run().

classmethod gen_metric(meta, proc_aman)[source]

Generate a QA metric from the output of this process.

Parameters:
  • meta (AxisManager) – Metadata related to the specific observation

  • proc_aman (AxisManager) – The output of the preprocessing pipeline.

Returns:

line – InfluxDB line entry elements to be fed to site_pipeline.monitor.Monitor.record

Return type:

dict

static register(process_class)[source]

Registers a new module with the PIPELINE registry.
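A name-keyed registry of this kind can be sketched in a few lines. Everything below (ToyStep, REGISTRY, the register helper, and the Detrend class) is a hypothetical stand-in written for illustration; it shows the general pattern only and is not the sotodlib implementation.

```python
class ToyStep:
    """Stand-in for a preprocessing module; not sotodlib's _Preprocess."""
    name = None

    def __init__(self, step_cfgs):
        self.cfgs = step_cfgs


# Toy registry mapping a step name to the class that implements it.
REGISTRY = {}


def register(step_class):
    """Add step_class to the registry under its name, refusing duplicates."""
    if step_class.name is None:
        raise ValueError("step class needs a name")
    if step_class.name in REGISTRY:
        raise ValueError(f"{step_class.name!r} is already registered")
    REGISTRY[step_class.name] = step_class


class Detrend(ToyStep):
    name = "detrend"


register(Detrend)

# A config entry can now be turned into an instance by name lookup.
cfg = {"name": "detrend", "process": {"method": "linear"}}
step = REGISTRY[cfg["name"]](cfg)
print(type(step).__name__)
```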

The preprocessing pipeline is defined in the Pipeline class. This class inherits from list so that you can easily find and interact with the various pipeline elements. Note that slicing a pipeline will return a list of process modules that can be used to make a new pipeline.
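The behavior described here follows from how Python treats list subclasses. The sketch below uses a hypothetical ToyPipeline holding step names as strings, not real preprocessing modules, to show that taking a slice yields a plain list, which can then be passed back to the constructor.

```python
class ToyPipeline(list):
    """Minimal list subclass standing in for a pipeline of step names."""

    def names(self):
        return [str(step) for step in self]


pipe = ToyPipeline(["detrend", "calibrate", "noise", "glitches"])

# Slicing a list subclass yields a plain list, not a ToyPipeline.
head = pipe[:2]
print(type(head).__name__)

# Passing the slice back to the constructor gives a new pipeline.
short_pipe = ToyPipeline(head)
print(short_pipe.names())
```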

class sotodlib.preprocess.pcore.Pipeline(modules, plot_dir='./', logger=None, wrap_valid=True)[source]

This class is designed to create and run pipelines out of a series of different preprocessing modules (classes that inherit from _Preprocess). It inherits from the list object. It also contains the registration of all possible preprocess modules in Pipeline.PIPELINE.

append(item)[source]

Append object to the end of the list.

insert(index, item)[source]

Insert object before index.

extend(index, other)[source]

Extend list by appending elements from the iterable.

run(aman, proc_aman=None, full_aman=None, select=True, sim=False, update_plot=False, data_amans=None)[source]

The main workhorse function for the pipeline class. This function takes an AxisManager TOD and successively runs the pipeline of preprocessing modules on the AxisManager. The order of operations called by run are:

for process in pipeline:
    process.process()
    process.calc_and_save()
        process.save() ## called by process.calc_and_save()
    process.select()

Parameters:
  • aman (AxisManager) – A TOD object. Generally expected to be raw, unprocessed data. This axismanager will be edited in place by the process and select functions of each preprocess module

  • proc_aman (AxisManager (Optional)) – A preprocess axismanager. If this is provided it is assumed that the pipeline has previously been run on this specific TOD and has returned this preprocess axismanager. In this case, calls to process.calc_and_save() are skipped as the information is expected to be present in this AxisManager.

  • full_aman (AxisManager (Optional)) – A preprocess axismanager. This axis manager stores the outputs of preprocessing functions (proc_aman) but without any of the detector or samps restrictions applied, thus maintaining its original shape. This is returned at the end of the pipeline. If not passed it is instantiated with the same number of dets and samps as aman.

  • select (boolean (Optional)) – if True, the aman detector axis is restricted as described in each preprocess module. Most pipelines are developed with select=True. Running select=False may produce unstable behavior

  • sim (boolean (Optional)) – if running on sim (sim=True), processes with the flag skip_on_sim will be skipped.

  • update_plot (boolean (Optional)) – if True, re-runs plotting (along with processes and selects) given proc_aman is aman.preprocess. This assumes process.calc_and_save() has been run on this aman before and has ingested flags and other information into proc_aman.

  • data_amans (dict (Optional)) – A dictionary of AxisManagers with keys (step, process.name) filled with AxisManager processed up to step-1. This is used to pre-load all data AxisManager which could be required when processing simulations (e.g. to provide a T2P template)

Returns:

  • full_aman (AxisManager) – A preprocess axismanager that contains all data products calculated throughout the running of the pipeline.

  • success (str) – A string that stores the name of the last process step that the pipeline completed. If the pipeline successfully finishes all steps, success = ‘end’.
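The call order and the meaning of the success string can be mimicked with a toy loop. In the sketch below, ToyStep, toy_run, and the stop_after flag are hypothetical; in particular, the condition that ends a run early is an assumption, since the text above only says that success names the last completed step.

```python
class ToyStep:
    """Records the order in which its functions are called."""

    def __init__(self, name, log, stop_after=False):
        self.name, self.log, self.stop_after = name, log, stop_after

    def process(self):
        self.log.append(f"{self.name}.process")

    def calc_and_save(self):
        self.log.append(f"{self.name}.calc_and_save")
        self.save()  # save is invoked from inside calc_and_save

    def save(self):
        self.log.append(f"{self.name}.save")

    def select(self):
        self.log.append(f"{self.name}.select")


def toy_run(steps, have_proc_aman=False, select=True):
    """Run steps in order; return the last step completed, or 'end'."""
    for step in steps:
        step.process()
        if not have_proc_aman:  # stored results exist, so skip the calc
            step.calc_and_save()
        if select:
            step.select()
        if step.stop_after:  # assumed early-exit condition
            return step.name
    return "end"


log = []
print(toy_run([ToyStep("detrend", log), ToyStep("noise", log)]))
print(log)
```

Passing have_proc_aman=True to toy_run mirrors the case where a previously computed proc_aman is supplied: only process and select appear in the log.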

Processing Scripts

These scripts are designed to be the ones that interact with specific configuration files and specific manifest databases.

sotodlib.site_pipeline.preprocess_tod.preprocess_tod(configs: str | dict, obs_id: str, group: dict, verbosity: int = 0, compress: bool = False, overwrite: bool = False)[source]

Meant to be run as part of a batched script, this function calls the preprocessing pipeline on a specific Observation ID and group combination and saves the results in the ManifestDb specified in the configs.

Parameters:
  • configs (str or dict) – Config file or loaded config dictionary.

  • obs_id (str or ResultSet entry) – obs_id or obs entry that is passed to context.get_obs.

  • group (list) – The group to be run. For example, this might be [‘ws0’, ‘f090’] if group_by (specified by the subobs->use key in the preprocess config) is [‘wafer_slot’, ‘wafer.bandpass’].

  • verbosity (int) – Log level. 0 = error, 1 = warn, 2 = info, 3 = debug.

  • compress (bool) – Whether or not to compress the preprocessing h5 files.

  • overwrite (bool) – If True, overwrite contents of temporary h5 files.

Returns:

  • out_dict (dict or None) – Dictionary output for init config from get_preproc_group_out_dict if preprocessing ran successfully for init layer or None if preprocessing was loaded or preproc_or_load_group failed.

  • errors (tuple) – A tuple containing the error from PreprocessErrors, an error message, and the traceback. Each will be None if preproc_or_load_group finished successfully.
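The relationship between group_by and group in the parameter description above can be made concrete by pairing the two lists positionally. The helper group_to_dets below is hypothetical, and whether sotodlib builds its detector restriction in exactly this way is an assumption; the sketch only shows how the example values line up.

```python
def group_to_dets(group_by, group):
    """Pair each grouping key with the matching value of one group."""
    if len(group_by) != len(group):
        raise ValueError("group_by and group must have the same length")
    return dict(zip(group_by, group))


group_by = ["wafer_slot", "wafer.bandpass"]
group = ["ws0", "f090"]
print(group_to_dets(group_by, group))
```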

sotodlib.site_pipeline.preprocess_tod.load_preprocess_tod_sim(obs_id, sim_map, configs='preprocess_configs.yaml', context=None, dets=None, meta=None, modulated=True, logger=<Logger preprocess (DEBUG)>)[source]

Loads the saved information from the preprocessing pipeline and runs the processing section of the pipeline on simulated data

Assumes preprocess_tod has already been run on the requested observation.

Parameters:
  • obs_id (multiple) – passed to context.get_obs to load AxisManager, see Notes for context.get_obs

  • sim_map (pixell.enmap.ndmap) – signal map containing (T, Q, U) fields

  • configs (string or dictionary) – config file or loaded config dictionary

  • dets (dict) – dets to restrict on from info in det_info. See context.get_meta.

  • meta (AxisManager) – Contains supporting metadata to use for loading. Can be pre-restricted in any way. See context.get_meta.

  • modulated (bool) – If True, apply the HWP angle model and scan the simulation into a modulated signal. If False, scan the simulation into demodulated timestreams.

Returns:

aman – Axis manager after running through the preprocessing steps. Returns None if all detectors are cut.

Return type:

core.AxisManager

sotodlib.site_pipeline.multilayer_preprocess_tod.multilayer_preprocess_tod(obs_id: str, configs_init: str | dict, configs_proc: str | dict, group: list, verbosity: int = 0, compress: bool = False, overwrite: bool = False)[source]

Meant to be run as part of a batched script, this function calls the preprocessing pipeline on a specific Observation ID and group combination and saves the results in the ManifestDb specified in the configs.

Parameters:
  • obs_id (str or ResultSet entry) – obs_id or obs entry that is passed to context.get_obs

  • configs_init (str or dict) – Config file or loaded config dictionary for first layer database.

  • configs_proc (str or dict) – Config file or loaded config dictionary for second layer database.

  • group (list) – The group to be run. For example, this might be [‘ws0’, ‘f090’] if group_by (specified by the subobs->use key in the preprocess config) is [‘wafer_slot’, ‘wafer.bandpass’].

  • verbosity (int) – The log level to use. 0 = error, 1 = warn, 2 = info, 3 = debug.

  • compress (bool) – Whether or not to compress the preprocessing h5 files.

  • overwrite (bool) – If True, overwrite contents of temporary h5 files.

Returns:

  • out_dict_init (dict or None) – Dictionary output for init config from get_preproc_group_out_dict if preprocessing ran successfully for init layer or None if preprocessing was loaded or preproc_or_load_group failed.

  • out_dict_proc (dict or None) – Dictionary output for proc config from get_preproc_group_out_dict if preprocessing ran successfully for proc layer or None if preprocessing was loaded, that layer was not run or loaded, or preproc_or_load_group failed.

  • errors (tuple) – A tuple containing the error from PreprocessErrors, an error message, and the traceback. Each will be None if preproc_or_load_group finished successfully.

sotodlib.site_pipeline.preprocess_obs.preprocess_obs(obs_id, configs, overwrite=False, logger=None, obs_group=None)[source]

Meant to be run as part of a batched script, this function calls the preprocessing pipeline on a specific Observation ID and saves the results in the ManifestDb specified in the configs.

Parameters:
  • obs_id (string or ResultSet entry) – obs_id or obs entry that is passed to context.get_obs

  • configs (string or dictionary) – config file or loaded config dictionary

  • overwrite (bool) – if True, overwrite existing entries in ManifestDb

  • logger (logging instance) – the logger to print to

  • obs_group (list of strings) – List of obs_ids within group

sotodlib.preprocess.preprocess_util.load_and_preprocess(obs_id, configs, context=None, dets=None, meta=None, no_signal=None, logger=None, return_full_aman=False)[source]

Loads the saved information from the preprocessing pipeline and runs the processing section of the pipeline.

Assumes preprocess_tod has already been run on the requested observation.

Parameters:
  • obs_id (multiple) – Passed to context.get_obs to load AxisManager, see Notes for context.get_obs

  • configs (string or dictionary) – Config file or loaded config dictionary

  • context (core.Context) – Optional. The Context file to use.

  • dets (dict) – Dets to restrict on from info in det_info. See context.get_meta.

  • meta (AxisManager) – Contains supporting metadata to use for loading. Can be pre-restricted in any way. See context.get_meta.

  • no_signal (bool) – If True, signal will be set to None. This is a way to get the axes and pointing info without the (large) TOD blob. Not all loaders may support this.

  • logger (PythonLogger) – Optional. Logger object. If None, a new logger is created.

  • return_full_aman (bool) – Optional. Return unrestricted axis manager alongside restricted aman if True, otherwise return None.

Returns:

  • aman (core.AxisManager or None) – Loaded and restricted axis manager with preprocessing metadata. Returns None if all detectors cut.

  • full_aman (core.AxisManager or None) – Unrestricted preprocessing axis manager. Used when running multilayer pipeline to ensure saved detector axis has the full size when saving metadata.

sotodlib.preprocess.preprocess_util.multilayer_load_and_preprocess(obs_id, configs_init, configs_proc, dets=None, meta=None, no_signal=None, logger=None, init_only=False, ignore_cfg_check=False, stop_for_sims=False)[source]

Loads the saved information from the preprocessing pipeline from a reference and a dependent database and runs the processing section of the pipeline for each.

Assumes preprocess_tod and multilayer_preprocess_tod have already been run on the requested observation.

Parameters:
  • obs_id (multiple) – Passed to context.get_obs to load AxisManager, see Notes for context.get_obs

  • configs_init (string or dictionary) – Config file or loaded config dictionary

  • configs_proc (string or dictionary) – Second config file or loaded config dictionary to load dependent databases generated using multilayer_preprocess_tod.py.

  • dets (dict) – Dets to restrict on from info in det_info. See context.get_meta.

  • meta (AxisManager) – Contains supporting metadata to use for loading. Can be pre-restricted in any way. See context.get_meta.

  • no_signal (bool) – If True, signal will be set to None. This is a way to get the axes and pointing info without the (large) TOD blob. Not all loaders may support this.

  • logger (PythonLogger) – Optional. Logger object or None will generate a new one.

  • init_only (bool) – Optional. If True, do not run the dependent pipeline.

  • ignore_cfg_check (bool) – If True, do not attempt to validate that configs_init is the same as the config used to create the existing init db.

  • stop_for_sims (bool) – Optional. If True, will stop before each step of the pipeline with the flag use_data_aman set to True. The intended use is to prepare all necessary data products that cannot be stored in the preprocessing database, to process simulations.

Returns:

aman – Loaded and restricted axis manager with preprocessing metadata. Returns None if all detectors cut.

Return type:

core.AxisManager or None

sotodlib.preprocess.preprocess_util.multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc, sim_map, meta=None, logger=None, init_only=False, ignore_cfg_check=False, data_amans=None, interpol=None)[source]

Loads the saved information from the preprocessing pipeline from a reference and a dependent database, loads the signal from a (simulated) map into the AxisManager and runs the processing section of the pipeline for both databases.

Assumes preprocess_tod and multilayer_preprocess_tod have already been run on the requested observation.

Parameters:
  • obs_id (multiple) – Passed to context.get_obs to load AxisManager, see Notes for context.get_obs

  • configs_init (string or dictionary) – Config file or loaded config dictionary

  • configs_proc (string or dictionary) – Second config file or loaded config dictionary to load dependent databases generated using multilayer_preprocess_tod.py.

  • sim_map (numpy.ndmap or enmap.ndmap) – Input simulated map to be observed

  • meta (AxisManager) – Contains supporting metadata to use for loading. Can be pre-restricted in any way. See context.get_meta.

  • no_signal (bool) – If True, signal will be set to None. This is a way to get the axes and pointing info without the (large) TOD blob. Not all loaders may support this.

  • logger (PythonLogger) – Optional. Logger object or None will generate a new one.

  • init_only (bool) – Optional. Whether or not to run the dependent pipeline.

  • ignore_cfg_check (bool) – If True, do not attempt to validate that configs_init is the same as the config used to create the existing init db.

  • data_amans (dict (Optional)) – A dictionary of AxisManagers with keys (step, process.name) filled with AxisManager processed up to step-1. This is used to pre-load all data AxisManager which could be required when processing simulations (e.g. to provide a T2P template)

  • interpol (str) – Optional. The sub-pixel interpolation to use in from_map

Returns:

aman – Loaded and restricted axis manager with preprocessing metadata. Returns None if all detectors cut.

Return type:

core.AxisManager or None

sotodlib.preprocess.preprocess_util.preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=None, overwrite=False, save_archive=False, save_proc_aman=True, compress=False, skip_missing=False, ignore_cfg_check=False)[source]

This function is expected to receive a single obs_id and a dets dictionary. The dets dictionary must match the grouping specified in the preprocess config files. The function accepts either one or two config strings or dicts representing an initial and a dependent pipeline stage. If the preprocess database entry for this obs_id-dets group already exists, the function loads back the processed TOD by calling either load_and_preprocess or multilayer_load_and_preprocess. If the db entry does not exist or the overwrite flag is set to True, the full preprocessing steps defined in the configs are run and, if save_proc_aman is True, the outputs are written to a unique h5 file. Any errors, the info to populate the database, the file path of the h5 file, and the processed TOD are returned from this function. Processed axis managers can be written to an archive and database by using cleanup_mandb (or setting save_archive to True), which consumes all of the outputs (except the processed TOD), writes to the database, and moves the multiple h5 files into fewer h5 files (each <= 10 GB).

Parameters:
  • obs_id (str) – Obs id to process or load

  • configs_init (str or dict) – Filepath or dictionary containing the preprocess configuration file.

  • dets (dict) – Dictionary specifying which detectors/wafers to load; see Context.obsdb.get_obs.

  • configs_proc (str or dict) – Filepath or dictionary containing a dependent preprocess configuration file.

  • logger (PythonLogger) – Optional. Logger object or None will generate a new one.

  • overwrite (bool) – Optional. Whether or not to overwrite existing entries in the preprocess manifest db.

  • save_archive (bool) – Call cleanup_mandb if True to save to the archive and database files in configs_init and configs_proc. Should be False if preproc_or_load_group is being called from within a parallelized script (i.e. python multiprocessing or MPI).

  • save_proc_aman (bool) – Whether or not to save the preprocessing axis manager. Required if saving into a preprocessing archive.

  • compress (bool) – Whether or not to compress the preprocessing data. Uses flacarray compression.

  • skip_missing (bool) – Do not attempt to run preprocessing pipeline if either of the preproc dbs don’t exist or the obs_id and group combination is not found.

  • ignore_cfg_check (bool) – If True, do not attempt to validate that configs_init is the same as the config used to create the existing init db when running multilayer_load_and_preprocess.

Returns:

  • aman (AxisManager or None) – Preprocessed axis manager if preproc_or_load_group finished successfully or None if it failed.

  • out_dict_init (dict or None) – Dictionary output for init config from get_preproc_group_out_dict if preprocessing ran successfully for init layer or None if preprocessing was loaded or preproc_or_load_group failed.

  • out_dict_proc (dict or None) – Dictionary output for proc config from get_preproc_group_out_dict if preprocessing ran successfully for proc layer or None if preprocessing was loaded, that layer was not run or loaded, or preproc_or_load_group failed.

  • errors (tuple) – A tuple containing the error from PreprocessErrors, an error message, and the traceback. Each will be None if preproc_or_load_group finished successfully.

Processing Util Functions

These functions support and are used within the driver processing scripts above and are useful for saving, loading, and verifying preprocessing archives and databases.

class sotodlib.preprocess.preprocess_util.PreprocessErrors[source]

Bases: object

Stores the various errors that can occur from the preprocessing functions.

LoadSuccess = 'load_success'
GetGroupsError = 'get_groups_error'
MetaDataError = 'get_meta_data_error'
NoDetsRemainError = 'no_dets_remain_error'
NoGroupOverlapError = 'no_group_overlap_error'
MultilayerPipelineLoadError = 'multilayer_load_and_preprocess_error'
SingleLayerPipelineLoadError = 'single_layer_load_and_preprocess_error'
PipeLineRunError = 'pipeline_run_error'
InitPipeLineRunError = 'init_pipeline_run_error'
ProcPipeLineRunError = 'proc_pipeline_run_error'
PipeLineStepError = 'pipeline_step_error'
NoInitDbError = 'no_init_db_error'
GroupOutputError = 'group_output_error'
ExecutorFutureError = 'executor_future_error'
SkipMissingError = 'skip_missing_error'

classmethod get_errors(e)[source]

sotodlib.preprocess.preprocess_util.filter_preproc_runlist_by_jobdb(jdb, jclass, db, run_list, group_by, overwrite=False, logger=None)[source]

Given a preprocess_tod or multilayer_preprocess_tod run list, checks whether each entry exists in the preprocess jobdb. If it failed or is done and overwrite is False, add it to the list of skipped obs_ids. If it doesn’t exist, is open, or is done but overwrite is True, add an open job to the jobdb.

Parameters:
  • jdb (JobManager) – The preprocessing jobdb.

  • jclass (str) – The jobdb class name.

  • db (ManifestDb or None) – Preprocessing database.

  • run_list (list) – List of (obs_id, group) tuples.

  • group_by (list) – How grouping is being done for preprocessing. Specified in the preprocessing config through the subobs.use entry.

  • overwrite (bool) – Whether or not to overwrite entries in the preprocessing db.

  • logger (PythonLogger) – A python logger.

Returns:

run_list – Run list with the subset of skipped entries removed.

Return type:

list
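The skip-or-run decision described for this function can be sketched with a dictionary standing in for the jobdb. The function toy_filter_run_list and the state strings are hypothetical. The sketch reads the rule as "skip if the job failed, or if it is done and overwrite is False", which is one interpretation of the description above rather than a statement of what the library does.

```python
def toy_filter_run_list(jobs, run_list, overwrite=False):
    """Return the entries to run and mark each of them as open in jobs."""
    kept = []
    for obs_id, group in run_list:
        key = (obs_id, tuple(group))
        state = jobs.get(key)  # None means the job does not exist yet
        if state == "failed" or (state == "done" and not overwrite):
            continue  # skipped entry
        jobs[key] = "open"
        kept.append((obs_id, group))
    return kept


jobs = {
    ("obs_1", ("ws0", "f090")): "done",
    ("obs_2", ("ws0", "f090")): "failed",
}
run_list = [
    ("obs_1", ["ws0", "f090"]),
    ("obs_2", ["ws0", "f090"]),
    ("obs_3", ["ws0", "f090"]),
]
print(toy_filter_run_list(jobs, run_list))
```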

sotodlib.preprocess.preprocess_util.init_logger(name, announce='', verbosity=2)[source]

Configure and return a logger for site_pipeline elements. It is disconnected from general sotodlib (propagate=False) and displays relative instead of absolute timestamps.

Parameters:
  • name (str) – The name of the logger

  • announce (str) – Initial message to be displayed after logger is instantiated.

  • verbosity (int) – Level of logger output: 0 = error, 1 = warning, 2 = info, 3 = debug.

Returns:

logger – The initialized logger object

Return type:

PythonLogger
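A logger with these properties can be approximated using only the standard logging module. The sketch below is not the sotodlib implementation: toy_init_logger, the LEVELS mapping, and the format string are assumptions. It relies on the standard relativeCreated record attribute, which gives milliseconds elapsed since the logging module was loaded.

```python
import logging

# Assumed mapping from the verbosity integer to standard logging levels.
LEVELS = {0: logging.ERROR, 1: logging.WARNING, 2: logging.INFO, 3: logging.DEBUG}


def toy_init_logger(name, announce="", verbosity=2):
    """Return a logger that does not propagate and prints relative times."""
    logger = logging.getLogger(name)
    logger.propagate = False  # keep messages out of parent loggers
    logger.setLevel(LEVELS[verbosity])
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            "%(relativeCreated)8.0f ms %(levelname)s: %(message)s"))
        logger.addHandler(handler)
    if announce:
        logger.info(announce)
    return logger


logger = toy_init_logger("toy_preprocess", announce="starting", verbosity=3)
logger.debug("debug messages are visible at verbosity 3")
```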

sotodlib.preprocess.preprocess_util.get_preprocess_context(configs, context=None)[source]

Load the provided config file and context file. To be used in preprocess_*.py site pipeline scripts. If the provided context file does not have a metadata entry for preprocess then one will be added based on the definition in the config file.

Parameters:
  • configs (str or dict) – The configuration file or dictionary.

  • context (str or core.Context, optional) – The context to use. If None, it is created from the configuration file.

Returns:

  • configs (dict) – The configuration dictionary.

  • context (core.Context) – The context file.

sotodlib.preprocess.preprocess_util.get_groups(obs_id, configs, context=None)[source]

Get subobs group method and groups. To be used in preprocess_*.py site pipeline scripts.

Parameters:
  • obs_id (str) – The obsid.

  • configs (str or dict) – The configuration dictionary.

  • context (core.Context) – The Context file to use.

Returns:

  • group_by (list of str) – The list of keys used to group the detectors.

  • groups (list of list of int) – The list of groups of detectors.

  • errors (tuple) – Tuple of errors or Nones.

sotodlib.preprocess.preprocess_util.get_preprocess_db(configs, group_by, logger=None)[source]

Get the ManifestDb for a given config, creating it if it does not exist.

Parameters:
  • configs (dict) – The configuration dictionary.

  • group_by (list of str) – The list of keys used to group the detectors.

  • logger (PythonLogger) – Optional. Logger object. If None, a new logger is created.

Returns:

db – ManifestDb object

Return type:

ManifestDb

sotodlib.preprocess.preprocess_util.swap_archive(config, fpath)[source]

Update the configuration archive policy filename, create an output archive directory if it doesn’t exist, and return a copy of the config.

Parameters:
  • config (dict) – The configuration dictionary.

  • fpath (str) – The archive policy filename to write to.

Returns:

tc – Copy of the configuration file with an updated archive policy filename

Return type:

dict
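The copy-and-update behavior can be sketched with the standard library. The nested layout config["archive"]["policy"]["filename"] used below is an assumption about how the preprocess config is organized, and toy_swap_archive is a hypothetical stand-in, not the function documented here.

```python
import copy
import os
import tempfile


def toy_swap_archive(config, fpath):
    """Return a copy of config whose archive policy points at fpath."""
    tc = copy.deepcopy(config)  # leave the caller's config untouched
    tc["archive"]["policy"]["filename"] = fpath
    out_dir = os.path.dirname(fpath)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)  # create the output directory
    return tc


config = {
    "archive": {
        "index": "preprocess.sqlite",
        "policy": {"type": "simple", "filename": "preprocess_archive.h5"},
    }
}

with tempfile.TemporaryDirectory() as tmp:
    new_path = os.path.join(tmp, "temp", "obs_group.h5")
    tc = toy_swap_archive(config, new_path)
    print(tc["archive"]["policy"]["filename"] == new_path)
    print(config["archive"]["policy"]["filename"])
```

Using a deep copy matters here because the filename sits in a nested dictionary; a shallow copy would share that inner dictionary with the original config.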

sotodlib.preprocess.preprocess_util.load_preprocess_det_select(obs_id, configs, context=None, dets=None, meta=None, logger=None)[source]

Loads the metadata information for the Observation and runs through any data selection specified by the Preprocessing Pipeline.

Parameters:
  • obs_id (multiple) – Passed to context.get_obs to load AxisManager, see Notes for context.get_obs

  • configs (string or dictionary) – Config file or loaded config dictionary

  • context (core.Context) – The Context file to use.

  • dets (dict) – Dets to restrict on from info in det_info. See context.get_meta.

  • meta (AxisManager) – Contains supporting metadata to use for loading. Can be pre-restricted in any way. See context.get_meta.

  • logger (PythonLogger) – Optional. Logger object. If None, a new logger is created.

Returns:

Restricted list of detector vals.

Return type:

list

sotodlib.preprocess.preprocess_util.find_db(obs_id, configs, dets, context=None, logger=None)[source]

This function checks whether the manifest db from a config file exists and whether it contains an entry for the provided obs id and set of detectors.

Parameters:
  • obs_id (str) – Obs id to process or load

  • configs (str or dict) – Filepath or dictionary containing the preprocess configuration file.

  • dets (dict) – Dictionary specifying which detectors/wafers to load; see Context.obsdb.get_obs.

  • context (core.Context) – Optional. Context object used for data loading/querying.

  • logger (PythonLogger) – Optional. Logger object or None will generate a new one.

Returns:

dbexist – True if db exists and entry for input detectors is found.

Return type:

bool

sotodlib.preprocess.preprocess_util.get_preproc_group_out_dict(obs_id, configs, dets, context=None, subdir='temp')[source]

This function returns a dictionary containing the data destination filename and the values to populate the manifest db.

Parameters:
  • obs_id (str) – Obs id to process or load

  • configs (str or dict) – Filepath or dictionary containing the preprocess configuration file.

  • dets (dict) – Dictionary specifying which detectors/wafers to load; see Context.obsdb.get_obs.

  • context (core.Context) – Optional. Context object used for data loading/querying.

  • subdir (str) – Optional. Subdirectory to save the output files into. If it does not exist, it is created.

Returns:

outputs – Dictionary including output filename of data file and information for corresponding database entry.

Return type:

dict

sotodlib.preprocess.preprocess_util.save_group_and_cleanup(obs_id, configs, context=None, subdir='temp', logger=None, remove=False)[source]

This function checks if any temporary files exist from a preprocessing run and, if so, adds them to the config policy file and creates an entry in the manifest db by calling cleanup_mandb. If the file exists but cannot be opened or if remove is True, the file will be deleted. Remove is intended to allow for overwrite=True in preprocess_tod.py and multilayer_preprocess_tod.py.

Parameters:
  • obs_id (str) – Obs id to process or load

  • configs (str or dict) – Filepath or dictionary containing the preprocess configuration file.

  • context (core.Context) – Optional. Context object used for data loading/querying.

  • subdir (str) – Optional. Subdirectory to save the output files into. If it does not exist, it is created.

  • logger (PythonLogger) – Optional. Logger object or None will generate a new one.

  • remove (bool) – Optional. Default is False. Whether to remove a file if found. Used when overwrite is True in driving functions.

Returns:

errors – Error from get_groups.

Return type:

tuple

sotodlib.preprocess.preprocess_util.cleanup_obs(obs_id, policy_dir, errlog, configs, context=None, subdir='temp', remove=False)[source]

For a given obs id, this function searches the policy_dir directory, if it exists, for any files with that obs id in their filename. If any are found, it runs save_group_and_cleanup for that obs id.

Parameters:
  • obs_id (str) – Obs id to check and clean up

  • policy_dir (str) – Directory containing the temporary per-group output files.

  • errlog (str) – Filepath to error logging file.

  • configs (str or dict) – Filepath or dictionary containing the preprocess configuration file.

  • context (core.Context) – Optional. Context object used for data loading/querying.

  • subdir (str) – Optional. Subdirectory to save the output files into.

  • remove (bool) – Optional. Default is False. Whether to remove a file if found. Used when overwrite is True in driving functions.

sotodlib.preprocess.preprocess_util.cleanup_mandb(out_dict, out_meta, errors, configs, logger=None, overwrite=False, db_manager=None)[source]

Function to update the manifest db when data is collected from the preproc_or_load_group function. If used in an mpi framework this function is expected to be run from rank 0 after a comm.gather. See the preproc_or_load_group docstring for the varying expected values of errors and the associated out_dict. This function will either:

1) Update the ManifestDb sqlite file and move the h5 archive from its temporary location to its permanent path if errors[0] is None and out_dict is not None. Deletes the temporary h5 file.

2) Return nothing if errors[0] is PreprocessErrors.LoadSuccess or both it and out_dict are None.

3) Otherwise, update the error log.
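The three outcomes listed above can be sketched as a small decision function. This is only an illustration of the documented branching, not the library's implementation: the Action enum, the choose_cleanup_action name, and the LOAD_SUCCESS sentinel (standing in for PreprocessErrors.LoadSuccess) are all hypothetical.

```python
from enum import Enum, auto


class Action(Enum):
    UPDATE_DB = auto()  # case 1: write the ManifestDb entry and move the h5 file
    NOTHING = auto()    # case 2: data was only loaded, or there is nothing to do
    LOG_ERROR = auto()  # case 3: record the failure in the error log


# Hypothetical stand-in for PreprocessErrors.LoadSuccess.
LOAD_SUCCESS = "load_success"


def choose_cleanup_action(errors, out_dict):
    """Return which of the three documented branches applies."""
    err = errors[0]
    if err is None and out_dict is not None:
        return Action.UPDATE_DB
    if err == LOAD_SUCCESS or (err is None and out_dict is None):
        return Action.NOTHING
    return Action.LOG_ERROR
```

In an MPI setting, a function like this would be evaluated on rank 0 for each gathered (errors, out_dict) pair.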

Parameters:
  • errors (tuple) – A tuple containing the error from PreprocessError, an error message, and the traceback. Each will be None if preproc_or_load_group finished successfully.

  • out_meta (tuple) – The tuple (obs_id, group).

  • outputs (dict) – Dictionary including entries for the temporary h5 filename (‘temp_file’) and the obs_id group metadata and db entry (db_data). See save_group for more info.

  • configs (dict) – Preprocessing configuration dictionary.

  • logger (PythonLogger) – Optional. Python logger.

  • overwrite (bool) – Optional. Delete the entry in the archive file if it exists and replace it with the new entry.

  • db_manager (DbBatchManager, optional) – External database batch manager for optimized operations. If provided, uses the manager instead of creating individual connections.

sotodlib.preprocess.preprocess_util.get_pcfg_check_aman(pipe)[source]

Given a preprocess pipeline class return an axis manager containing the ordered steps of the pipeline with all arguments for each step.

Parameters:

pipe (_Preprocess class) – Preprocess pipeline class from which to build the step argument axis manager.

sotodlib.preprocess.preprocess_util.check_cfg_match(ref, loaded, logger=None)[source]

Checks that the ref and loaded axis managers containing the ordered preprocess pipelines match one another.

Parameters:
  • ref (AxisManager) – Reference axis manager for cross checking

  • loaded (AxisManager) – Loaded axis manager for cross checking.

  • logger (PythonLogger) – Optional. Python logger object.

Example TOD Pipeline Configuration File

Suppose we want to run a simple pipeline that runs the glitch calculator and estimates the white noise levels of the data. A configuration file for the processing pipeline would look like:

# Context for the data
context_file: 'context.yaml'

# Plot directory prefix
plot_dir: './plots'

# How to subdivide observations
subobs:
    use: detset
    label: detset

# Metadata index & archive filenaming
archive:
    index: 'preprocess_archive.sqlite'
    policy:
        type: 'simple'
        filename: 'preprocess_archive.h5'

process_pipe:
    - name : "fft_trim"
      process:
        axis: 'samps'
        prefer: 'right'

    - name: "trends"
      calc:
        max_trend: 30
        n_pieces: 5
      save: True
      select:
        kind: "any"

    - name: "glitches"
      calc:
        t_glitch: 0.002
        hp_fc: 0.5
        n_sig: 10
        buffer: 20
      save: True
      select:
        max_n_glitch: 20
        sig_glitch: 30

    - name: "detrend"
      process:
        method: "linear"
        count: 10

    - name: "calibrate"
      process:
        kind: "single_value"
        ## phase_to_pA: 9e6/(2*np.pi)
        val: 1432394.4878270582

    - name: "psd"
      process:
        detrend: False
        window: "hann"

    - name: "noise"
      calc:
        low_f: 5
        high_f: 10
      save: True
      select:
        max_noise: 2000

This pipeline can be run through the functions in site_pipeline. Each entry under the “process_pipe” key is used to generate a Preprocess module based on the name it is registered under. These entries are then run in order through the processing pipe. For each module, the process function is always run before the calc_and_save function. The plot function can be run after calc_and_save when plot: True is set for a module that supports it.
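The call order described above can be illustrated with a toy runner. This is a minimal sketch and not the sotodlib Pipeline: ToyStep and run_toy_pipeline are hypothetical names, the real modules also receive the aman and proc_aman AxisManagers, and gating calc_and_save on the presence of the calc key is an assumption made for the illustration.

```python
class ToyStep:
    """Hypothetical stand-in for a _Preprocess module; it only records calls."""

    def __init__(self, step_cfg, log):
        self.name = step_cfg["name"]
        self.log = log

    def process(self):
        self.log.append((self.name, "process"))

    def calc_and_save(self):
        self.log.append((self.name, "calc_and_save"))

    def plot(self):
        self.log.append((self.name, "plot"))


def run_toy_pipeline(process_pipe):
    """Run each configured step in order and return the sequence of calls."""
    log = []
    for step_cfg in process_pipe:
        step = ToyStep(step_cfg, log)
        # A function is skipped when its key is absent from the step config.
        if "process" in step_cfg:
            step.process()
        if "calc" in step_cfg:
            step.calc_and_save()
        if step_cfg.get("plot"):
            step.plot()
    return log


example_pipe = [
    {"name": "fft_trim", "process": {"axis": "samps", "prefer": "right"}},
    {"name": "trends", "calc": {"max_trend": 30}, "save": True, "plot": True},
    {"name": "noise", "calc": {"low_f": 5, "high_f": 10}, "save": True},
]
call_log = run_toy_pipeline(example_pipe)
```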

Example Planet TOD Pipeline Configuration File

A pipeline for planet observations is similar to a regular TOD pipeline, except that we must first flag sources in the signal and gapfill them. After these extra initial steps, the configuration file should be equivalent to the one used for non-planet data processing:

# Context for the data
context_file: 'context.yaml'

# Plot directory prefix
plot_dir: './plots'

# How to subdivide observations
subobs:
    use: wafer_slot
    label: wafer_slot

# Metadata index & archive filenaming
archive:
    index: 'preprocess_archive.sqlite'
    policy:
        type: 'simple'
        filename: 'preprocess_archive.h5'

process_pipe:
    - name : "dark_dets"
      calc: True
      save: True
      select: True

    - name: "source_flags"
      calc:
        mask: {'shape': 'circle',
              'xyr': [0, 0, 1.]}
        center_on: 'jupiter' # set to 'planet' for variable according to planet tag of each obs (must use --planet-obs argument of site-pipeline script)
        res: 20 # np.radians(20/60)
        max_pix: 4.0e+6
      save: True

    - name: "glitchfill"
      flag_aman: "sources"
      flag: "source_flags"
      process:
        nbuf: 10
        use_pca: True
        modes: 3

Example Obs Pipeline Configuration File

Suppose we want to run an observation-level pipeline that creates an SSO footprint. A configuration file for the processing pipeline would look like:

# Context for the data
context_file: 'context.yaml'

# Plot directory prefix
plot_dir: './plots'

# Metadata index & archive filenaming
archive:
    index: 'preprocess_archive.sqlite'
    policy:
        type: 'simple'
        filename: 'preprocess_archive.h5'

process_pipe:
    - name: "sso_footprint"
      calc:
        # If you want to search for nearby sources, exclude source_list
        source_list: ['jupiter']
        distance: 20
        nstep: 100
      save: True
      plot:
        wafer_offsets: {'ws0': [-2.5, -0.5],
                        'ws1': [-2.5, -13],
                        'ws2': [-13, -7],
                        'ws3': [-13, 5],
                        'ws4': [-2.5, 11.5],
                        'ws5': [8.5, 5],
                        'ws6': [8.5, -7]}
        focal_plane: 'focal_plane_positions.npz'

Processing Modules

TOD Operations

class sotodlib.preprocess.processes.FFTTrim(step_cfgs)[source]

Trim the AxisManager to optimize for faster FFTs later in the pipeline. All processing configs go to fft_trim.

fft_trim(tod, axis='samps', prefer='right')

Restrict AxisManager sample range so that FFTs are efficient. This uses the find_inferior_integer function.

Parameters:
  • tod (AxisManager) – Target, which is modified in place.

  • axis (str) – Axis to target.

  • prefer (str) – One of [‘left’, ‘right’, ‘center’], indicating whether to trim away samples from the end, the beginning, or equally at the beginning and end (respectively).

Returns:

The (start, stop) indices to use to slice an array and get these samples.
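A rough sketch of the trimming idea follows. It assumes that an "FFT-friendly" length is one whose only prime factors are small; find_inferior_integer_sketch and trim_indices are hypothetical names, and the prime set (2, 3, 5, 7) is an assumption rather than the library's default.

```python
def find_inferior_integer_sketch(target, primes=(2, 3, 5, 7)):
    """Largest integer <= target whose only prime factors are in `primes`."""
    if target < 1:
        raise ValueError("target must be >= 1")
    n = target
    while True:
        m = n
        for p in primes:
            while m % p == 0:
                m //= p
        if m == 1:  # n factored completely into the allowed primes
            return n
        n -= 1


def trim_indices(n_samps, prefer="right"):
    """Return (start, stop) such that stop - start is an FFT-friendly length."""
    keep = find_inferior_integer_sketch(n_samps)
    cut = n_samps - keep
    if prefer == "left":      # trim samples away from the end
        start = 0
    elif prefer == "right":   # trim samples away from the beginning
        start = cut
    elif prefer == "center":  # trim (roughly) equally from both ends
        start = cut // 2
    else:
        raise ValueError("prefer must be 'left', 'right' or 'center'")
    return start, start + keep
```

For example, a 1003-sample record would be reduced to 1000 samples, since 1001, 1002 and 1003 all contain a prime factor larger than 7.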

class sotodlib.preprocess.processes.Detrend(step_cfgs)[source]

Detrend the signal. All processing configs go to detrend_tod.

detrend_tod(tod, method='linear', axis_name='samps', signal_name='signal', in_place=True, wrap_name=None, count=10)

Returns detrended data. Detrends data in place by default; pass in_place=False if you would like a copied array (such as if you’re just looking to use this in an FFT).

Using this with method=’mean’ and axis_name=’dets’ will remove a common mode from the detectors. Using this with method=’median’ and axis_name=’dets’ will remove a common mode from the detectors using the median rather than the mean.

Parameters:
  • tod (axis manager)

  • method (str) – method of detrending; can be ‘linear’, ‘mean’, or ‘median’

  • axis_name (str) – the axis along which to detrend. default is ‘samps’

  • signal_name (str) – the name of the signal to detrend. defaults to ‘signal’. Can have any shape as long as axis_name can be resolved.

  • in_place (bool) – If False it makes a copy of signal before detrending and returns the copy.

  • wrap_name (str or None) – If not None, wrap the detrended data into tod with this name.

  • count (int) – Number of samples to use, on each end, when measuring mean level for ‘linear’ detrend. Values larger than 1 suppress the influence of white noise.

Returns:

signal – Detrended signal. Done in place or on a copy depending on the in_place argument.

Return type:

array of type tod[signal_name]
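To make the role of count concrete, here is a minimal single-detector sketch of a ‘linear’ detrend. It assumes the trend is estimated from the mean level of the first and last count samples and that the mean is removed afterwards; linear_detrend_sketch is a hypothetical name and this is not necessarily how detrend_tod is implemented.

```python
def linear_detrend_sketch(signal, count=10):
    """Remove a straight line estimated from the two ends of `signal`."""
    n = len(signal)
    if n < 2:
        return [0.0] * n
    # Never use more than half of the record on each end.
    count = max(1, min(count, n // 2))
    head = sum(signal[:count]) / count
    tail = sum(signal[-count:]) / count
    rise = tail - head  # estimated total change across the record
    ramp_removed = [s - rise * i / (n - 1) for i, s in enumerate(signal)]
    mean = sum(ramp_removed) / n
    return [r - mean for r in ramp_removed]


line = [2.0 + 0.5 * i for i in range(11)]
flat = linear_detrend_sketch(line, count=1)
```

Averaging several samples at each end makes the estimated end levels less sensitive to white noise, which is the motivation given for count values larger than 1.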

class sotodlib.preprocess.processes.PSDCalc(step_cfgs)[source]

Calculate the PSD of the data and add it to the AxisManager under the “psd” field.

Note: noverlap = 0 and full_output = True are recommended to get unbiased median white noise estimation by Noise.

Example config block:

- name: "psd"
  signal: "signal" # optional
  wrap: "psd" # optional
  process:
    nperseg: 1024 # optional
    noverlap: 0 # optional
    wrap_name: "psd" # optional
    subscan: False # optional
    full_output: True # optional
calc_psd(aman, signal=None, timestamps=None, max_samples=262144, prefer='center', freq_spacing=None, merge=False, merge_suffix=None, overwrite=True, subscan=False, full_output=False, label_axis='dets', **kwargs)

Calculates the power spectral density of an input signal using signal.welch(). Data defaults to aman.signal and times default to aman.timestamps. By default nperseg is set to the power of 2 closest to 1/50th of the number of samples used; this can be overridden by providing nperseg or freq_spacing.

Parameters:
  • aman (AxisManager) – with (dets, samps) OR (channels, samps) axes.

  • signal (float ndarray) – data signal to pass to scipy.signal.welch().

  • timestamps (float ndarray) – timestamps associated with the data signal.

  • max_samples (int) – maximum samples along sample axis to send to welch.

  • prefer (str) – One of [‘left’, ‘right’, ‘center’], indicating what part of the array we would like to send to welch if cuts are required.

  • freq_spacing (float) – The approximate desired frequency spacing of the PSD. If None the default nperseg of ~1/50th the signal length is used. If an nperseg is explicitly passed then that will be used.

  • merge (bool) – if True merge results into axismanager.

  • merge_suffix (str, optional) – Suffix to append to the Pxx field name in aman. Defaults to None (merged as Pxx).

  • overwrite (bool) – if true will overwrite f, Pxx axes.

  • subscan (bool) – if True, compute psd on subscans.

  • full_output – if True this also outputs nseg, the number of segments used for welch, for correcting bias of median white noise estimation by calc_wn.

  • label_axis (str) – The name of LabelAxis in the input aman. Default is dets.

  • **kwargs – keyword args to be passed to signal.welch().

Returns:

  • freqs – array of frequencies corresponding to PSD calculated from welch.

  • Pxx – array of PSD values.

  • nseg – number of segments used for welch. This is returned only if full_output is True.
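The default segment length described above can be sketched as follows. The rounding is assumed to happen in log2 space, and the freq_spacing variant (sample rate divided by the desired spacing) is an assumption about how the option maps onto nperseg; both function names are hypothetical.

```python
import math


def default_nperseg(n_samps):
    """Power of 2 closest (in log2) to 1/50th of the number of samples."""
    return int(2 ** round(math.log2(n_samps / 50.0)))


def nperseg_from_spacing(sample_rate_hz, freq_spacing_hz):
    """Power of 2 giving approximately the requested frequency spacing."""
    return int(2 ** round(math.log2(sample_rate_hz / freq_spacing_hz)))
```

With the default max_samples of 262144, default_nperseg gives 4096 samples per segment.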

class sotodlib.preprocess.processes.Apodize(step_cfgs)[source]

Apodize the edges of a signal. All process configs go to apodize_cosine.

apodize_cosine(aman, signal_name='signal', apodize_samps=1600, in_place=True, apo_axis='apodized', window=None)

Function to smoothly taper the timestream to zero at the ends with a cosine function. If window is provided, aman[signal_name] is multiplied by that window function.

Parameters:
  • signal_name (str) – Axis to apodize

  • apodize_samps (int) – Number of samples on tod ends to apodize.

  • in_place (bool) – writes over signal with apodized version

  • apo_axis (str) – Axis to store the apodized signal if not in place.

  • window (numpy.ndarray) – Precomputed apodization window.
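A cosine edge taper of this kind can be sketched in a few lines. This is an illustration under the assumption of a quarter-cosine edge; cosine_window and apodize_sketch are hypothetical names, and the exact window shape used by apodize_cosine may differ.

```python
import math


def cosine_window(n_samps, apodize_samps):
    """Window equal to 1 in the middle, falling to 0 over each edge."""
    if apodize_samps < 2 or 2 * apodize_samps > n_samps:
        raise ValueError("need 2 <= apodize_samps <= n_samps / 2")
    # Quarter cosine falling from 1 to (numerically) 0.
    edge = [
        math.cos(0.5 * math.pi * k / (apodize_samps - 1))
        for k in range(apodize_samps)
    ]
    window = [1.0] * n_samps
    window[-apodize_samps:] = edge      # falling edge at the end
    window[:apodize_samps] = edge[::-1]  # rising edge at the start
    return window


def apodize_sketch(signal, apodize_samps):
    """Return a tapered copy of a single-detector signal."""
    window = cosine_window(len(signal), apodize_samps)
    return [s * w for s, w in zip(signal, window)]
```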

class sotodlib.preprocess.processes.SubPolyf(step_cfgs)[source]
Fit TOD in each subscan with a polynomial of given order and subtract it.

All process configs go to sotodlib.tod_ops.sub_polyf.

subscan_polyfilter(aman, degree, signal_name='signal', exclude_turnarounds=False, mask=None, exclusive=True, method='legendre', in_place=True)

Apply polynomial filtering to subscan segments in a data array. This function applies polynomial filtering to subscan segments within signal for each detector. Subscan segments are defined based on the presence of flags such as ‘left_scan’ and ‘right_scan’. Polynomial filtering is used to remove low-degree polynomial trends within each subscan segment.

Parameters:
  • aman (AxisManager)

  • degree (int) – The degree of the polynomial to be removed.

  • signal_name (string, optional) – The name of TOD signal to use. If not provided, aman.signal will be used.

  • exclude_turnarounds (bool) – Optional. If True, turnarounds are excluded from subscan identification. Default is False.

  • mask (str or RangesMatrix) – Optional. A mask used to select specific data points for filtering. If None, no mask is applied. If the mask is given in str, aman.flags['mask'] is used as mask. Arbitrary mask can be specified in the style of RangesMatrix.

  • exclusive – Optional. If True, the mask is used to exclude data from fitting. If False, the mask is used to include data for fitting. Default is True.

  • method (str) – Optional. Method to model the baseline of TOD. In the legendre method, the baseline model is constructed using the orthonormality of Legendre functions. In the polyfit method, numpy.polyfit is used. legendre is faster. Default is legendre.

  • in_place (bool) – Optional. If True, aman.signal is overwritten with the processed signal.

Returns:

signal – The processed signal.

Return type:

array-like

class sotodlib.preprocess.processes.Jumps(step_cfgs)[source]

Run generic jump finding and fixing algorithm.

calc_cfgs should have ‘function’ defined as one of ‘find_jumps’, ‘twopi_jumps’ or ‘slow_jumps’. Any additional configs for the jump function go in ‘jump_configs’.

Saves results in proc_aman under the “jumps” field.

Data section should define a maximum number of jumps “max_n_jumps”.

Example config block:

- name: "jumps"
  calc:
    function: "twopi_jumps"
  save:
    jumps_name: "jumps_2pi"
  plot:
      plot_ds_factor: 50
  select:
      max_n_jumps: 5
find_jumps(aman, signal, min_sigma, min_size, win_size, exact, fix: Literal[False] = False, inplace=False, merge=True, overwrite=False, name='jumps', ds=10, clean=80, **filter_pars) Tuple[RangesMatrix, csr_array]
find_jumps(aman, signal, min_sigma, min_size, win_size, exact, fix: Literal[True], inplace, merge, overwrite, name, ds, clean, **filter_pars) Tuple[RangesMatrix, csr_array, ndarray[Any, dtype[floating]]]

Find jumps in aman.signal_name with a matched filter for edge detection. Expects aman.signal_name to be 1D or 2D.

Parameters:
  • aman – axis manager.

  • signal – Signal to jumpfind on. If None then aman.signal is used.

  • min_sigma – Number of standard deviations to count as a jump, note that the standard deviation here is computed by std_est and is the white noise standard deviation, so it doesn’t include contributions from jumps or 1/f. If min_size is provided it will be used instead of this.

  • min_size – The smallest jump size counted as a jump. By default this is set to None and min_sigma is used instead, if set this will override min_sigma. If both min_sigma and min_size are None then the IQR is used as min_size.

  • win_size – Size of window used when peak finding. Also used for height estimation, should be of order jump width.

  • exact – If True, search for the exact jump location. If False, allow some uncertainty within the window (cheaper).

  • fix – Set to True to fix.

  • inplace – Whether or not signal should be fixed inplace.

  • merge – If True will wrap ranges matrix into aman.flags.<name>

  • overwrite – If True will overwrite existing content of aman.flags.<name>

  • name – String used to populate field in flagmanager if merge is True.

  • ds – Downsample factor used when computing noise level, the actual factor used is ds*win_size.

  • clean – Cleaning value to pass to estimate_heights. See that function for details.

  • **filter_pars – Parameters to pass to _filter

Returns:

  • jumps – RangesMatrix containing jumps in signal. If signal is 1D, Ranges is returned instead. There is some uncertainty on the order of a few samples. Jumps within a few samples of each other may not be distinguished.

  • heights – csr_array of jump heights.

  • fixed – signal with jumps fixed. Only returned if fix is set.
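The edge-detection idea can be illustrated with a deliberately simple step detector. This sketch is not the find_jumps algorithm: it compares the mean of win_size samples on either side of each sample and keeps local peaks at least min_size high. The name find_steps_sketch is hypothetical, and noise estimation (min_sigma), filtering, and downsampling are all omitted.

```python
def find_steps_sketch(signal, min_size, win_size):
    """Return (index, height) pairs for step-like jumps in a 1D signal."""
    n = len(signal)
    stat = [0.0] * n
    # Step statistic: mean just after minus mean just before each sample.
    for i in range(win_size, n - win_size + 1):
        left = sum(signal[i - win_size:i]) / win_size
        right = sum(signal[i:i + win_size]) / win_size
        stat[i] = right - left
    jumps = []
    for i in range(win_size, n - win_size + 1):
        lo, hi = max(0, i - win_size), min(n, i + win_size + 1)
        peak = max(abs(s) for s in stat[lo:hi])
        # Keep only samples that are the strongest response in their window.
        if abs(stat[i]) >= min_size and abs(stat[i]) == peak:
            jumps.append((i, stat[i]))
    return jumps


steps = [0.0] * 30 + [5.0] * 40 + [2.0] * 30
found = find_steps_sketch(steps, min_size=2.0, win_size=5)
```

As in the documented behaviour, two jumps closer together than the window size would not be distinguished by this approach.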

class sotodlib.preprocess.processes.FixJumps(step_cfgs)[source]

Repairs the jump heights given a set of jump flags and heights.

Example config block:

- name: "fix_jumps"
  signal: "signal" # optional
  process:
    jumps_aman: "jumps_2pi"
jumpfix_subtract_heights(x: ndarray[Any, dtype[floating]], jumps: RangesInt32 | RangesMatrix | ndarray[Any, dtype[bool_]], inplace: bool = False, heights: ndarray[Any, dtype[floating]] | csr_array | None = None, **kwargs) ndarray[Any, dtype[floating]]

Naive jump fixing routine where we subtract known heights between jumps. Note that you should expect a glitch at the jump locations. Works best if you buffer the jumps mask by a bit.

Parameters:
  • x – Data to jumpfix on, expects 1D or 2D.

  • jumps – Boolean mask or Ranges(Matrix) of jump locations. Should be the same shape as x.

  • inplace – Whether or not x should be fixed inplace.

  • heights – Array of jump heights, can be sparse. If None will be computed.

  • **kwargs – Additional arguments to pass to estimate_heights if heights is None.

Returns:

x_fixed – x with jumps removed. If inplace is True this is just a reference to x.
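The height-subtraction approach can be sketched for a single detector as a running offset. This is a simplified illustration with explicit jump indices rather than a mask or Ranges object; subtract_jump_heights is a hypothetical name.

```python
def subtract_jump_heights(x, jump_indices, heights):
    """Subtract the cumulative jump height from every sample after a jump."""
    fixed = list(x)  # work on a copy (the inplace=False behaviour)
    steps = dict(zip(jump_indices, heights))
    offset = 0.0
    for i in range(len(fixed)):
        offset += steps.get(i, 0.0)  # a jump at i shifts everything from i on
        fixed[i] -= offset
    return fixed


jumpy = [0.0, 0.0, 5.0, 5.0, 8.0]
repaired = subtract_jump_heights(jumpy, [2, 4], [5.0, 3.0])
```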

class sotodlib.preprocess.processes.FourierFilter(step_cfgs)[source]

Applies a chain of Fourier filters (defined in fft_ops) to the data.

Example config file entry for one filter:

- name: "fourier_filter"
  process:
    filt_function: "timeconst_filter"
    filter_params:
      timeconst: "det_cal.tau_eff"
      invert: True

Example for passing in a different signal name and wrapping into a new field:

- name: "fourier_filter"
  wrap_name: "lpf_demodQ"
  signal_name: "demodQ"
  process:
    filt_function: "sine2"
    filter_params:
      cutoff: 1
      trans_width: 0.1

Example config file entry for two filters:

- name: "fourier_filter"
  process:
    filters:
      - name: "iir_filter"
        filter_params:
          invert: True
      - name: "timeconst_filter"
        filter_params:
          timeconst: "det_cal.tau_eff"
          invert: True

Or with params from a noise fit:

- name: "fourier_filter"
  process:
    noise_fit_array: "noiseQ_fit"
    filters:
      - name: "iir_filter"
        filter_params:
          invert: True
      - name: "timeconst_filter"
        filter_params:
          timeconst: "det_cal.tau_eff"
          invert: True

See Fourier space filters documentation for more details.

Calibration

class sotodlib.preprocess.processes.Calibrate(step_cfgs)[source]

Calibrate the timestreams based on some provided information.

Type of calibration is decided by process[“kind”]

1. “single_value” : multiplies entire signal by the single value process[“val”]

2. “array” : takes the dot product of the array with the entire signal. The array is specified by process["cal_array"], which must exist in aman. The array can be nested within additional AxisManager objects, for instance det_cal.phase_to_pW.

Example config block(s):

- name: "calibrate"
  process:
    kind: "single_value"
    divide: True # If true will divide instead of multiply.
    # phase_to_pA: 9e6/(2*np.pi)
    val: 1432394.4878270582
- name: "calibrate"
  process:
    kind: "array"
    cal_array: "cal.array"
  select:
    cut_array: "cal.missing_cal" # should be 0 where cal is good 1 where missing.
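The two calibration kinds can be sketched on a nested-list signal of shape (dets, samps). The function names are hypothetical, and the "array" case is interpreted here as one scale factor per detector, which is an assumption about what the documentation calls the dot product. The constant in the comment of the config block can also be checked directly.

```python
import math

# The value in the example config, written out as in its comment.
PHASE_TO_PA = 9e6 / (2 * math.pi)


def calibrate_single_value(signal, val, divide=False):
    """Scale every sample by one number (kind: "single_value")."""
    if divide:
        return [[s / val for s in det] for det in signal]
    return [[s * val for s in det] for det in signal]


def calibrate_array(signal, cal_array):
    """Scale each detector by its own factor (kind: "array")."""
    if len(signal) != len(cal_array):
        raise ValueError("need one calibration factor per detector")
    return [[s * c for s in det] for det, c in zip(signal, cal_array)]
```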
class sotodlib.preprocess.processes.PCARelCal(step_cfgs)[source]

Estimate the relcal factor from the atmosphere using PCA.

Example configuration file entry:

- name: 'pca_relcal'
  signal: 'lpf_sig'
  pca_run: 'run1'
  calc:
      pca:
          xfac: 2
          yfac: 1.5
          calc_good_medianw: True
      lpf:
          type: "sine2"
          cutoff: 1
          trans_width: 0.1
      trim_samps: 2000
  save: True
  plot:
      plot_ds_factor: 20

See tod_ops.pca for more details on the method.

Flagging and Products

class sotodlib.preprocess.processes.Trends(step_cfgs)[source]

Calculate the trends in the data to look for unlocked detectors. All calculation configs go to get_trending_flags.

Saves results in proc_aman under the “trend” field.

Data selection can have key “kind” equal to “any” or “all.”

Example config block:

- name : "trends"
  signal: "signal" # optional
  calc:
    max_trend: 2.5
    t_piece: 100
  save: True
  plot: True
  select:
    kind: "any"

Flag Detectors with trends larger than max_trend. This function can be used to find unlocked detectors. Note that this is a rough cut and unflagged detectors can still have poor tracking.

Parameters:
  • aman (AxisManager) – The tod

  • max_trend (float) – Slope at which detectors are unlocked. The default is for use with phase units.

  • t_piece (float) – Duration in seconds of each piece the timestream is cut into when looking for trends.

  • max_samples (int) – Maximum samples to compute the slope with.

  • signal (array) – (Optional). Signal to use to generate flags, if None default is aman.signal.

  • timestamps (array) – (Optional). Timestamps to use to generate flags, default is aman.timestamps.

  • merge (bool) – If true, merges the generated flag into aman.

  • overwrite (bool) – If true, write over flag. If false, don’t.

  • name (str) – Name of flag to add to aman.flags if merge is True.

  • full_output (bool) – If true, returns calculated slope sizes

Returns:

  • cut (RangesMatrix) – RangesMatrix of trending regions

  • trends (AxisManager) – If full_output is true, calculated slopes and the sample edges where they were calculated.
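A piecewise trend check can be sketched as below for one detector. The slope of each piece is estimated from its end points, which is an assumption; piece_trend_flags is a hypothetical name and the real function returns a RangesMatrix rather than a list of booleans.

```python
def piece_trend_flags(timestamps, signal, max_trend=2.5, t_piece=100.0):
    """Return one boolean per piece: True where |slope| exceeds max_trend."""
    n = len(timestamps)
    edges = [0]
    for i in range(1, n):
        # Start a new piece once t_piece seconds have elapsed.
        if timestamps[i] - timestamps[edges[-1]] >= t_piece:
            edges.append(i)
    if edges[-1] != n - 1:
        edges.append(n - 1)
    flags = []
    for a, b in zip(edges[:-1], edges[1:]):
        slope = (signal[b] - signal[a]) / (timestamps[b] - timestamps[a])
        flags.append(abs(slope) > max_trend)
    return flags


times = [float(i) for i in range(301)]
late_drift = [0.0] * 201 + [5.0 * (i - 200) for i in range(201, 301)]
flags = piece_trend_flags(times, late_drift, max_trend=2.5, t_piece=100.0)
```

With select kind "any", a detector like this one would be cut because at least one piece is trending; with kind "all" it would be kept.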

class sotodlib.preprocess.processes.GlitchDetection(step_cfgs)[source]

Run glitch detection algorithm to find glitches. All calculation configs go to get_glitch_flags

Saves results in proc_aman under the “glitches” field.

Data section should define a glitch significance “sig_glitch” and a maximum number of glitches “max_n_glitch.”

Example configuration block:

- name: "glitches"
  glitch_name: "my_glitches"
  calc:
    signal_name: "hwpss_remove"
    t_glitch: 0.00001
    buffer: 10
    hp_fc: 1
    n_sig: 10
    subscan: False
  save: True
  plot:
      plot_ds_factor: 50
  select:
    max_n_glitch: 10
    sig_glitch: 10
get_glitch_flags(aman, t_glitch=0.002, hp_fc=0.5, n_sig=10, buffer=200, detrend=None, signal_name=None, merge=True, overwrite=False, name='glitches', full_output=False, edge_guard=2000, subscan=False)

Find glitches with Fourier filtering. Uses a translation from moby2 as its starting point.

Parameters:
  • aman (AxisManager) – The tod.

  • t_glitch (float) – Gaussian filter width.

  • hp_fc (float) – High pass filter cutoff.

  • n_sig (int or float) – Significance of detection.

  • buffer (int) – Amount to buffer flags around found location

  • detrend (str) – Detrend method to pass to fourier_filter

  • signal_name (str) – Field name in aman to detect glitches on. If None, defaults to signal.

  • merge (bool) – If true, add to aman.flags

  • name (string) – Name of flag to add to aman.flags

  • overwrite (bool) – If true, write over flag. If false, raise ValueError if name already exists in AxisManager

  • full_output (bool) – If true, return sparse matrix with the significance of the detected glitches

  • edge_guard (int) – Number of samples at the beginning and end of the tod to exclude from the returned glitch RangesMatrix. Defaults to 2000 samples (10 sec).

  • subscan (bool) – If True, compute the glitch threshold on a per-subscan basis. Includes turnarounds.

Returns:

flag – RangesMatrix object containing glitch mask.

Return type:

RangesMatrix
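The thresholding and buffering steps can be sketched for one detector as follows. This is not the get_glitch_flags algorithm: the high-pass and Gaussian filtering are omitted, the noise level is estimated with a median absolute deviation (an assumption), and flag_glitches_sketch is a hypothetical name.

```python
import statistics


def flag_glitches_sketch(signal, n_sig=10, buffer=2):
    """Return half-open (start, stop) ranges around outlier samples."""
    med = statistics.median(signal)
    mad = statistics.median(abs(s - med) for s in signal)
    sigma = 1.4826 * mad  # MAD scaled to a Gaussian standard deviation
    n = len(signal)
    mask = [False] * n
    for i, s in enumerate(signal):
        if abs(s - med) > n_sig * sigma:
            # Buffer the flag on both sides of the detected sample.
            for j in range(max(0, i - buffer), min(n, i + buffer + 1)):
                mask[j] = True
    ranges, start = [], None
    for i, flagged in enumerate(mask + [False]):
        if flagged and start is None:
            start = i
        elif not flagged and start is not None:
            ranges.append((start, i))
            start = None
    return ranges


noise = [0.1 if i % 2 == 0 else -0.1 for i in range(100)]
noise[50] = 10.0
glitch_ranges = flag_glitches_sketch(noise, n_sig=10, buffer=2)
```

A selection such as max_n_glitch would then compare the number of flagged ranges per detector against the configured limit.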

class sotodlib.preprocess.processes.GlitchFill(step_cfgs)[source]

Fill glitches. All process configs go to fill_glitches. Note on flags: if flags is provided in step_cfgs, proc_aman.get(flags) is used. If glitch_flags is provided in process_cfgs, aman.get(glitch_flags) is used instead.

Example configuration block:

- name: "glitchfill"
  signal: "hwpss_remove"
  flags: "glitches.glitch_flags" # optional
  process:
    nbuf: 10
    use_pca: False
    modes: 1
    in_place: True
    glitch_flags: "glitch_flags"
    wrap: None
fill_glitches(aman, nbuf=10, use_pca=False, modes=3, signal=None, glitch_flags=None, in_place=True, wrap=None)

This function fills pre-computed glitches provided by the caller in time-ordered data using either a polynomial (default) or PCA-based approach. Wraps the other functions in the tod_ops.gapfill module.

Parameters:
  • aman (AxisManager) – AxisManager to fill glitches in

  • nbuf (int) – Number of buffer samples to use in polynomial gap filling.

  • use_pca (bool) – Whether or not to fill glitches using pca model. Default is False

  • modes (int) – Number of modes in the pca to use if use_pca=True. Default is 3.

  • signal (ndarray or None) – Array of data to fill glitches in. If None then uses aman.signal. Default is None.

  • glitch_flags (str or RangesMatrix or None) – RangesMatrix containing flags to use for gap filling. If provided by a string, aman.flags.get(flags) is used for the flags. If None then uses aman.flags.glitches.

  • in_place (bool) – If False it makes a copy of signal before gap filling and returns the copy.

  • wrap (str or None) – If not None, wrap the gap filled data into tod with this name.

Returns:

signal – Returns ndarray with gaps filled from input signal.

Return type:

ndarray
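The polynomial gap-filling idea can be sketched with a first-order fit. This illustration fits a straight line to nbuf good samples on each side of one gap and evaluates it inside the gap; fill_gap_linear is a hypothetical name, and the polynomial order and the PCA option of the real function are not reproduced.

```python
def fill_gap_linear(signal, start, stop, nbuf=10):
    """Return a copy of `signal` with signal[start:stop] replaced by a line fit."""
    n = len(signal)
    # Indices of the good samples bracketing the gap.
    idx = list(range(max(0, start - nbuf), start))
    idx += list(range(stop, min(n, stop + nbuf)))
    if len(idx) < 2:
        raise ValueError("not enough good samples around the gap")
    x_mean = sum(idx) / len(idx)
    y_mean = sum(signal[i] for i in idx) / len(idx)
    sxx = sum((i - x_mean) ** 2 for i in idx)
    sxy = sum((i - x_mean) * (signal[i] - y_mean) for i in idx)
    slope = sxy / sxx  # least-squares slope of the bracketing samples
    filled = list(signal)
    for i in range(start, stop):
        filled[i] = y_mean + slope * (i - x_mean)
    return filled


ramp = [0.5 * i for i in range(50)]
corrupted = list(ramp)
corrupted[20:25] = [100.0] * 5
restored = fill_gap_linear(corrupted, 20, 25, nbuf=5)
```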

class sotodlib.preprocess.processes.Noise(step_cfgs)[source]

Estimate the white noise levels in the data. Assumes the PSD has been wrapped into the preprocessing AxisManager. All calculation configs go to calc_wn.

Saves the results into the “noise” field of proc_aman.

Data selection can be run on a “max_noise” value.

When fit: True, the parameter wn_est can be a float or the name of an axis manager containing an array named white_noise. If not specified, the white noise is calculated with calc_wn() and used for wn_est. The calculated white noise will be stored in the noise fit axis manager.

Example config block for fitting PSD:

- name: "noise"
  fit: True
  subscan: False
  calc:
    fwhite: (5, 10)
    lowf: 1
    f_max: 25
    mask: True
    wn_est: noise
    fixed_param: 'wn'
    binning: True
    fit_method: log_curve_fit #or likelihood
    curve_fit_kwargs:
        maxfev: 20000
  save: True
  select:
    max_noise: 2000
    require_finite_fit: True

Set select.require_finite_fit to True to drop detectors whose fit parameters contain NaNs (indicating a failed noise fit).

Example config block for calculating white noise only:

- name: "noise"
  fit: False
  subscan: False
  calc:
    low_f: 5
    high_f: 20
  save: True
  select:
    min_noise: 18e-6
    max_noise: 80e-6

If fit: True this operation will run sotodlib.tod_ops.fft_ops.fit_noise_model(), else it will run sotodlib.tod_ops.fft_ops.calc_wn().
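The white-noise-only case can be sketched as a median over a frequency band of the PSD. This assumes the white noise level is the square root of the median PSD between low_f and high_f (inclusive) for one detector; white_noise_level and passes_noise_cut are hypothetical names, and the bias correction available through nseg is not included.

```python
import math
import statistics


def white_noise_level(freqs, pxx, low_f=5.0, high_f=10.0):
    """Square root of the median PSD value within [low_f, high_f]."""
    band = [p for f, p in zip(freqs, pxx) if low_f <= f <= high_f]
    if not band:
        raise ValueError("no PSD bins inside the requested band")
    return math.sqrt(statistics.median(band))


def passes_noise_cut(wn, min_noise=None, max_noise=None):
    """Apply optional min_noise / max_noise selection to one detector."""
    if min_noise is not None and wn < min_noise:
        return False
    if max_noise is not None and wn > max_noise:
        return False
    return True


freqs = [float(f) for f in range(21)]
pxx = [100.0 if f < 5 else 4.0 for f in freqs]  # 1/f-like excess below 5 Hz
wn = white_noise_level(freqs, pxx, low_f=5, high_f=10)
```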

class sotodlib.preprocess.processes.FlagTurnarounds(step_cfgs)[source]
From the Azimuth encoder data, flag turnarounds, left-going, and right-going.

All process configs go to get_turnaround_flags. If the method key is not included in the preprocess config file calc configs then it will default to ‘scanspeed’.

get_turnaround_flags(aman, az=None, method='scanspeed', name='turnarounds', merge=True, merge_lr=True, overwrite=True, t_buffer=2.0, kernel_size=400, peak_threshold=0.1, rel_distance_peaks=0.3, truncate=False, qlim=1, merge_subscans=True, turnarounds_in_subscan=False)

Compute turnaround flags for a dataset.

Parameters:
  • aman (AxisManager) – Input axis manager.

  • az (Array) – (Optional). Azimuth data for turnaround flag computation. If not provided, it uses aman.boresight.az.

  • method (str) – (Optional). The method for computing turnaround flags. Options are az or scanspeed.

  • name (str) – (Optional). The name of the turnaround flag in aman.flags. Default is turnarounds

  • merge (bool) – (Optional). Merge the computed turnaround flags into aman.flags if True.

  • merge_lr (bool) – (Optional). Merge left and right scan flags as aman.flags.left_scan and aman.flags.right_scan if True.

  • overwrite (bool) – (Optional). Overwrite an existing flag in aman.flags with the same name.

  • t_buffer (float) – (Optional). Buffer time (in seconds) for flagging turnarounds in scanspeed method.

  • kernel_size (int) – (Optional). Size of the step-wise matched filter kernel used in scanspeed method.

  • peak_threshold (float) – (Optional). Peak threshold for identifying peaks in the matched filter response. It is a value used to determine the minimum peak height in the signal.

  • rel_distance_peaks (float) – (Optional). Relative distance between peaks. It specifies the minimum distance between peaks as a fraction of the approximate number of samples in one scan period.

  • truncate (bool) – (Optional). Truncate unstable scan segments if True in scanspeed method.

  • qlim (float) – (Optional). Azimuth threshold percentile for az method turnaround detection.

  • merge_subscans (bool) – (Optional). Also merge an AxisManager with subscan information.

  • turnarounds_in_subscan (bool) – (Optional). Turnarounds are included as part of a subscan.

Returns:

Ranges – The turnaround flags as a Ranges object.

Return type:

RangesMatrix

class sotodlib.preprocess.processes.DarkDets(step_cfgs)[source]

Find dark detectors in the data.

Saves results in proc_aman under the “dark_dets” field.

Example config block:

- name : "dark_dets"
  signal: "signal" # optional
  calc: True
  save: True
  select: True
get_dark_dets(aman, merge=True, overwrite=True, dark_flags_name='darks')

Identify and flag dark detectors in the given aman object.

Parameters:
  • aman (AxisManager) – The tod.

  • merge (bool, optional) – If True, merge the dark detector flags into the aman.flags. Default is True.

  • overwrite (bool, optional) – If True, overwrite existing flags with the same name. Default is True.

  • dark_flags_name (str, optional) – The name to use for the dark detector flags in aman.flags. Default is ‘darks’.

Returns:

mskdarks – A matrix of ranges indicating the dark detectors.

Return type:

RangesMatrix

Raises:

ValueError – If merge is True and dark_flags_name already exists in aman.flags and overwrite is False.

class sotodlib.preprocess.processes.SourceFlags(step_cfgs)[source]

Calculate the source flags in the data. All calculation configs go to get_source_flags.

Saves results in proc_aman under the “source_flags” field.

Example config block:

- name : "source_flags"
  source_flags_name: "my_source_flags"
  calc:
    mask: {'shape': 'circle',
           'xyr': [0, 0, 1.]}
    center_on: ['jupiter', 'moon'] # list of str
    res: 20 # arcmin
    max_pix: 4000000 # max number of allowed pixels in map
    distance: 0 # max distance of footprint from source in degrees
  save: True
  select: True # optional
    select_source: 'jupiter' # list of str or str. If not provided, all sources from center_on are selected.
    kind: 'any' # 'any', 'all', or float (0.0 < kind < 1.0)
    invert: False # optional, if True logic is flipped.
    # Examples:
    #     1. invert=False, kind='any' → Select detectors with **no** True flags (e.g., for Moon cut).
    #     2. invert=True, kind='any' → Select detectors with **any** True flags (e.g., for planet selection).
    #     3. invert=False, kind=0.4 → Select detectors with <40% of True flags.

get_source_flags(aman, merge=True, overwrite=True, source_flags_name=None, mask=None, center_on=None, res=None, max_pix=None)
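
The kind and invert options of the select step can be illustrated on a boolean detector-by-sample array. The sketch below is an assumption-laden stand-in, not the library's code: select_by_source_flags is a hypothetical helper, the flags are a dense NumPy array rather than a RangesMatrix, and the behavior of kind='all' and of a float kind at exactly the threshold is a guess consistent with the three examples in the config block.

```python
import numpy as np

def select_by_source_flags(flags, kind="any", invert=False):
    """Hypothetical helper: return a per-detector keep mask.

    flags: boolean array of shape (n_dets, n_samps), True where the source
    flag is set.
    """
    flags = np.asarray(flags, dtype=bool)
    if kind == "any":
        flagged = flags.any(axis=1)
    elif kind == "all":
        flagged = flags.all(axis=1)
    elif isinstance(kind, float) and 0.0 < kind < 1.0:
        # Assumption: a detector counts as flagged when its fraction of
        # True samples reaches the threshold.
        flagged = flags.mean(axis=1) >= kind
    else:
        raise ValueError("kind must be 'any', 'all', or a float in (0, 1)")
    # invert=False keeps unflagged detectors; invert=True keeps flagged ones.
    return flagged if invert else ~flagged

flags = np.array([
    [False, False, False, False],  # never near the source
    [True, False, False, False],   # 25% of samples flagged
    [True, True, True, True],      # always flagged
])

keep_moon_cut = select_by_source_flags(flags, kind="any", invert=False)
keep_planet = select_by_source_flags(flags, kind="any", invert=True)
keep_frac = select_by_source_flags(flags, kind=0.4, invert=False)
```

Here keep_moon_cut corresponds to example 1, keep_planet to example 2, and keep_frac to example 3.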

class sotodlib.preprocess.processes.GetStats(step_cfgs)[source]

Get basic statistics from a TOD or its power spectrum.

Example config block:

- name : "tod_stats"
  signal: "signal"  # optional
  wrap: "tod_stats" # optional
  calc:
    stat_names: ["median", "std"]
    split_subscans: False  # optional
    psd_mask:  # optional, for cutting a power spectrum in frequency
      freqs: "psd.freqs"
      low_f: 1
      high_f: 10
  save: True
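
To make the stat_names and psd_mask options concrete, the sketch below computes per-detector statistics over an optional frequency window. It is a rough NumPy illustration under stated assumptions: basic_stats and STAT_FUNCS are hypothetical names, only the two statistics from the example config are supported, and the low_f and high_f bounds are treated as inclusive.

```python
import numpy as np

# Assumption: only the two statistics named in the example config are mapped.
STAT_FUNCS = {"median": np.median, "std": np.std}

def basic_stats(data, stat_names, freqs=None, low_f=None, high_f=None):
    """Hypothetical helper: per-detector statistics along the last axis.

    data: array of shape (n_dets, n_samps) or (n_dets, n_freqs).
    freqs: optional frequency axis; if given, data is cut to
    low_f <= freqs <= high_f before the statistics are computed.
    """
    data = np.asarray(data, dtype=float)
    if freqs is not None:
        freqs = np.asarray(freqs, dtype=float)
        keep = np.ones(freqs.shape, dtype=bool)
        if low_f is not None:
            keep &= freqs >= low_f
        if high_f is not None:
            keep &= freqs <= high_f
        data = data[:, keep]
    return {name: STAT_FUNCS[name](data, axis=1) for name in stat_names}

# Two detectors, five frequency bins.
pxx = np.array([[1.0, 2.0, 3.0, 4.0, 5.0],
                [10.0, 10.0, 10.0, 10.0, 10.0]])
freqs = np.array([0.5, 1.0, 5.0, 10.0, 20.0])

stats = basic_stats(pxx, ["median", "std"], freqs=freqs, low_f=1, high_f=10)
```

With low_f: 1 and high_f: 10, only the three middle bins contribute to each detector's statistics.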

Obs Operations

class sotodlib.preprocess.processes.SSOFootprint(step_cfgs)[source]

Find nearby sources within a given distance, compute the SSO footprint, and plot each source on the focal plane.

Example config block:

- name: "sso_footprint"
  calc:
      # Note: all distances in degrees
      source_list: ['jupiter', 'moon', 'saturn'] # remove to find nearby sources
      distance: 20 # distance from boresight center
      nstep: 100
      telescope_flavor: 'sat' # options: ['sat', 'lat']
      wafer_hit_threshold: 10 # number of planet-wafer distances to consider being a source hit
      # for SATs:
      wafer_radius: 6
      wafer_centers: {'ws0': [-0.19791037, 0.08939717],
                      'ws1': [-0.014455856, -12.528095],
                      'ws2': [-10.867158, -6.2621593],
                      'ws3': [-10.835234, 6.2727923],
                      'ws4': [0.11142064, 12.461107],
                      'ws5': [10.878714, 6.273904],
                      'ws6': [10.870621, -6.2822847]}
      # for LAT:
      wafer_radius: 0.5
      wafer_centers: {'c1_ws0': [-0.36504516, 1.9619369e-05],
                      'c1_ws1': [0.18297304, 0.3164044],
                      'c1_ws2': [0.18297556, -0.31638196],
                      'i1_ws0': [-1.9073119, -0.8932063],
                      'i1_ws1': [-1.357702, -0.57522374],
                      'i1_ws2': [-1.3556796, -1.20838],
                      'i3_ws0': [1.1854928, -0.8960549],
                      'i3_ws1': [1.7332374, -0.57540596],
                      'i3_ws2': [1.7351116, -1.2087585],
                      'i4_ws0': [1.1789553, 0.89766216],
                      'i4_ws1': [1.7351091, 1.208781],
                      'i4_ws2': [1.7332398, 0.5754285],
                      'i5_ws0': [-0.35970667, 1.7832578],
                      'i5_ws1': [0.19053483, 2.0997307],
                      'i5_ws2': [0.1866497, 1.4668859],
                      'i6_ws0': [-1.9017702, 0.89197767],
                      'i6_ws1': [-1.3556821, 1.2084025],
                      'i6_ws2': [-1.3564061, 0.5815026]}
  save: True
  plot:
      # for SATs:
      wafer_offsets: {'ws0': [-2.5, -0.5],
                      'ws1': [-2.5, -13],
                      'ws2': [-13, -7],
                      'ws3': [-13, 5],
                      'ws4': [-2.5, 11.5],
                      'ws5': [8.5, 5],
                      'ws6': [8.5, -7]}
      focal_plane: '/so/home/msilvafe/shared_files/sat_hw_positions.npz'
      # for LAT:
      wafer_offsets: {'c1_ws0': [-0.6, 0.0],
                      'c1_ws1': [-0.0, 0.3],
                      'c1_ws2': [-0.0, -0.3],
                      'i1_ws0': [-2.1, -0.9],
                      'i1_ws1': [-1.6, -0.6],
                      'i1_ws2': [-1.6, -1.2],
                      'i3_ws0': [1.0, -0.9],
                      'i3_ws1': [1.5, -0.6],
                      'i3_ws2': [1.5, -1.2],
                      'i4_ws0': [1.0, 0.9],
                      'i4_ws1': [1.5, 1.2],
                      'i4_ws2': [1.5, 0.6],
                      'i5_ws0': [-0.6, 1.8],
                      'i5_ws1': [-0.0, 2.1],
                      'i5_ws2': [-0.0, 1.5],
                      'i6_ws0': [-2.1, 0.9],
                      'i6_ws1': [-1.6, 1.2],
                      'i6_ws2': [-1.6, 0.6]}
      focal_plane: '/so/home/dnguyen/repos/scripts/lat_hw_positions.npz'

get_sso(aman, sso, nstep=100)

Get the xi, eta position of a given SSO.

Parameters:
  • aman (AxisManager) – Input axis manager.

  • sso (str) – Name of input sso.

  • nstep (int) – Number of steps to downsample the TOD.

Returns:

  • xi (array) – Array of xi positions.

  • eta (array) – Array of eta positions.
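
One way to read wafer_radius, wafer_centers, and wafer_hit_threshold from the config block together with the xi, eta arrays is sketched below. This is an assumption, not the library's implementation: wafer_hits is a hypothetical helper, all coordinates are taken to be in degrees, a wafer is counted as hit when at least wafer_hit_threshold source positions fall within wafer_radius of its center, and the wafer centers are rounded illustrative values.

```python
import numpy as np

def wafer_hits(xi, eta, wafer_centers, wafer_radius, wafer_hit_threshold):
    """Hypothetical helper: map each wafer name to True if the source hits it.

    Assumption: xi, eta, wafer_centers, and wafer_radius share the same
    units (degrees here).
    """
    xi = np.asarray(xi, dtype=float)
    eta = np.asarray(eta, dtype=float)
    hits = {}
    for wafer, (xi0, eta0) in wafer_centers.items():
        # Distance from every source position to this wafer's center.
        dist = np.hypot(xi - xi0, eta - eta0)
        n_close = int(np.count_nonzero(dist < wafer_radius))
        hits[wafer] = n_close >= wafer_hit_threshold
    return hits

# Illustrative source track crossing the focal plane center along xi.
xi = np.linspace(-5.0, 5.0, 100)
eta = np.zeros_like(xi)

# Rounded, illustrative wafer centers (not the exact config values).
wafer_centers = {"ws0": [0.0, 0.0], "ws4": [0.0, 12.5]}

hits = wafer_hits(xi, eta, wafer_centers, wafer_radius=6, wafer_hit_threshold=10)
```

In this example the track stays within 6 degrees of ws0 for every step, while it never comes closer than 12.5 degrees to ws4.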