site_pipeline

The site_pipeline submodule contains programs supporting quick-turnaround data processing at the observatory.

Note

Documentation of interfaces is currently held separately. The focus here is on operation: command line parameters and config file syntax.

Command line interface

Usage

To execute a pipeline element from the command line, use the so-site-pipeline command. For example, make-source-flags can be invoked as:

so-site-pipeline make-source-flags [options]

To configure tab-completion of element names, in bash, run:

eval `so-site-pipeline --bash-completion`

Wrapping a pipeline script

In order to plug in nicely to the command line wrapper, element submodules should expose functions called main() and get_parser().

The get_parser() function should look like this:

import argparse

def get_parser(parser=None):
  if parser is None:
    parser = argparse.ArgumentParser()
  # element-specific args:
  parser.add_argument('obs_id', help="The obs_id to analyze.")
  parser.add_argument('--config-file', help="Config file.")
  return parser

When called by the so-site-pipeline wrapper, a parser will be passed in.

The main() function is the entry point to be called from the CLI or from Prefect. The arguments should include the arguments defined through the ArgumentParser, as well as any additional support for Prefect (such as a logger argument). For example:

def main(obs_id=None, config_file=None, logger=None):
  ...

If you want the submodule to be executable directly as a script (or through python -m), add a __main__ handling block like this one:

if __name__ == '__main__':
  util.main_launcher(main, get_parser)

To register a properly organized submodule in the so-site-pipeline command line wrapper, edit cli.py and see comments inline.

Pipeline Elements

update-g3tsmurf-db

This script is set up to create and maintain G3tSmurf databases. See details here.

update-book-plan

This script is designed to help with the bookbinding. It will search a given Level 2 G3tSmurf database for observations that overlap in time. Optional arguments allow information from sources such as the sorunlib database to be passed in to further filter the observations.

check-book

For a description and documentation of the config file format, see sotodlib.site_pipeline.check_book module autodocumentation below.

Command line arguments

Scan an “obs” or “oper” book and check for schema compliance; optionally update an obsfiledb.

usage: check-book [-h] [--config CONFIG] [--add] [--overwrite] book_dir
Positional Arguments
book_dir

Path to the Book.

Named Arguments
--config, -c

Path to config file with work-arounds and ObsFileDb config.

--add

After inspecting the book, add it to the ObsFileDb.

Default: False

--overwrite

If adding to ObsFileDb, remove existing references to this obs first (prevents ‘UNIQUE constraint’ error).

Default: False
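
For example, to check a book and register it in the ObsFileDb in one step (the paths are placeholders, and this assumes check-book is registered with the so-site-pipeline wrapper):

so-site-pipeline check-book --config check_book.yaml --add /path/to/book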

Module documentation

check_book.py

This module is an entry point to io.check_book, for checking obs/oper Books for internal consistency & proper schema. It may also be used to create/update an ObsFileDb for such Books.

A configuration file can be used to set the ObsFileDb filename and root path for ObsFileDb entries.

The config file can also be used to enable work-arounds and bypass certain exceptions (which should not be necessary on compliant books.)

At the time of this writing, a minimal config file might simply be:

# Database setup (this is the default).
obsfiledb: './obsfiledb.sqlite'

# For obsdb filenames, path relative to which those names should
# be specified.  (/ is the default.)
root_path: '/'

# Work-arounds
extra_extra_files: ['frame_splits.txt']

But here is a more complete example, with lots of work-arounds enabled:

# Database setup (this is the default).
obsfiledb: './obsfiledb.sqlite'

# For obsdb filenames, path relative to which those names should
# be specified.  (/ is the default.)
root_path: '/'

# Work-arounds
stream_file_pattern: 'D_obs_{stream_id}_{index:03d}.g3'
extra_extra_files: ['frame_splits.txt']
sample_range_inclusive_hack: True
tolerate_missing_ancil: True
tolerate_missing_ancil_timestamps: True
tolerate_timestamps_value_discrepancy: False

# Tolerate arbitrary extra files, except explicitly named ones
tolerate_stray_files: True
banned_files: ['frame_splits.txt']

# If stream_ids are not provided in metadata, list them here.
stream_ids:
  - ufm_mv14
  - ufm_mv18
  - ufm_mv19
  - ufm_mv22
  - ufm_mv6
  - ufm_mv7
  - ufm_mv9

# If detset names are not provided in metadata, provide a map from
# stream_id to detset name here.
detset_map:
  ufm_mv14: sch_mv14
  ufm_mv18: sch_mv18
  ufm_mv19: sch_mv19
  ufm_mv22: sch_mv22
  ufm_mv6:  sch_mv6
  ufm_mv7:  sch_mv7
  ufm_mv9:  sch_mv9
sotodlib.site_pipeline.check_book.get_parser(parser=None)[source]
sotodlib.site_pipeline.check_book.main(book_dir, config=None, add=None, overwrite=None)[source]

update-obsdb

For a description and documentation of the config file format, see sotodlib.site_pipeline.update_obsdb module autodocumentation below.

Command line arguments

usage: update-obsdb [-h] --config CONFIG [--recency RECENCY]
                    [--verbosity VERBOSITY] [--booktype BOOKTYPE]
                    [--overwrite]
Named Arguments
--config

ObsDb, ObsfileDb configuration file

--recency

Days to subtract from now to set as minimum ctime. If None, no minimum

--verbosity

Increase output verbosity. 0:Error, 1:Warning, 2:Info(default), 3:Debug

Default: 2

--booktype

Select book type to look for: obs, oper, both(default)

Default: “both”

--overwrite

If true, writes over existing entries

Default: False
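
For example, a nightly update that only scans books from the last week might look like this (the config filename is a placeholder, and this assumes the element is registered with the so-site-pipeline wrapper):

so-site-pipeline update-obsdb --config obsdb_config.yaml --recency 7 --booktype both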

Module documentation

update_obsdb.py

Create and/or update an obsdb and obsfiledb based on some books. The config file could be of the form:

base_dir: path_to_base_directories. Can be a list or a single string.
obsdb_cols:
  start_time: float
  stop_time: float
  n_samples: int
  telescope: str
  tube_slot: str
  type: str
  subtype: str

obsdb: dummyobsdb.sqlite
obsfiledb: dummyobsfiledb.sqlite
lat_tube_list_file: path to yaml dict matching tubes and bands
tolerate_stray_files: True
skip_bad_books: True
extra_extra_files:
- Z_bookbinder_log.txt
extra_files:
- M_index.yaml
- M_book.yaml
sotodlib.site_pipeline.update_obsdb.check_meta_type(bookpath: str)[source]
sotodlib.site_pipeline.update_obsdb.telescope_lookup(telescope: str)[source]

Set a number of common queries given a telescope name

Parameters:

telescope (str) – Name of telescope in M_index

sotodlib.site_pipeline.update_obsdb.main(config: str, recency: float | None = None, booktype: str | None = 'both', verbosity: int | None = 2, overwrite: bool | None = False)[source]

Create or update an obsdb for observation or operations data.

Parameters:
  • config (str) – Path to config file

  • recency (float) – How far back in time to look for databases, in days. If None, goes back to the UNIX start date (default: None)

  • booktype (str) – Look for observations or operations data or both (default: both)

  • verbosity (int) – Output verbosity. 0:Error, 1:Warning, 2:Info(default), 3:Debug

  • overwrite (bool) – if False, do not re-check existing entries

sotodlib.site_pipeline.update_obsdb.get_parser(parser=None)[source]
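
Since main() is importable, it can also be driven directly from Python, for example from a periodic job. A minimal sketch using the signature documented above (the config filename is a placeholder):

from sotodlib.site_pipeline import update_obsdb

# Hypothetical config path; re-scan books from the last 7 days.
update_obsdb.main(config='obsdb_config.yaml', recency=7.0, booktype='both')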

update-smurf-caldbs

This update script is used to add detset and calibration metadata to manifest dbs

Module Docs

Script to import the tuning / readout ID channel mapping and detector calibration information into manifest dbs for book loading. At present this only works in a configuration with access to both level 2 and level 3 indexing. Doing this with just level 3 data / indexing is technically possible but requires tools that do not yet exist.

Configuration file required:

config = {
    'archive': {
        'detset': {
            'root_dir': '/path/to/detset/root',
            'index': 'detset.sqlite',
            'h5file': 'detset.h5',
            'context': 'context.yaml',
            'write_relpath': True
        },
        'det_cal': {
            'root_dir': '/path/to/det_cal/root',
            'index': 'det_cal.sqlite',
            'h5file': 'det_cal.h5',
            'context': 'context.yaml',
            'failed_obsid_cache': 'failed_obsids.yaml',
            'write_relpath': True
        },
    },
    'g3tsmurf': 'g3tsmurf_hwp_config.yaml',
    'imprinter': 'imprinter.yaml',
}

The calibration info described below is used to populate the calibration db. For more information on how calibration info is computed in sodetlib, check out the following docs and source code:

class sotodlib.site_pipeline.update_smurf_caldbs.CalInfo(readout_id: str = '', r_tes: float = nan, r_frac: float = nan, p_bias: float = nan, s_i: float = nan, phase_to_pW: float = nan, v_bias: float = nan, tau_eff: float = nan, bg: int = -1, polarity: int = 1, r_n: float = nan, p_sat: float = nan)[source]

Class that contains detector calibration information that will go into the caldb.

readout_id

Readout id of detector

Type:

str

r_tes

Detector resistance [ohms], determined through bias steps while the detector is biased

Type:

float

r_frac

Fractional resistance of TES, given by r_tes / r_n

Type:

float

p_bias

Bias power on the TES [J] computed using bias steps at the bias point

Type:

float

s_i

Current responsivity of the TES [1/V] computed using bias steps at the bias point

Type:

float

phase_to_pW

Phase to power conversion factor [pW/rad] computed using s_i, pA_per_phi0, and detector polarity

Type:

float

v_bias

Commanded bias voltage [V] on the bias line of the detector for the observation

Type:

float

tau_eff

Effective thermal time constant [sec] of the detector, measured from bias steps

Type:

float

bg

Bias group of the detector. Taken from IV curve data, which contains bgmap data taken immediately prior to IV. This will be -1 if the detector is unassigned

Type:

int

polarity

Polarity of the detector response for a positive change in bias current while the detector is superconducting. This is needed to correct for detectors that have reversed response.

Type:

int

r_n

Normal resistance of the TES [Ohms] calculated from IV curve data

Type:

float

p_sat

“saturation power” of the TES [J] calculated from IV curve data. This is defined as the electrical bias power at which the TES resistance is 90% of the normal resistance.

Type:

float

Command line arguments

usage: update_smurf_caldbs.py [-h] [--config CONFIG] [--skip-detset]
                              [--skip-detcal] [--overwrite]
Named Arguments
--config

configuration file

--skip-detset

Skip detset update

Default: False

--skip-detcal

Skip detcal update

Default: False

--overwrite

Overwrite existing entries

Default: False
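
For example, to rebuild only the det_cal entries (the config filename is a placeholder; this assumes the module can be executed directly, as the usage string above suggests):

python -m sotodlib.site_pipeline.update_smurf_caldbs --config caldb_config.yaml --skip-detset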

Detector and Readout ID Mapping

These processes are interrelated and use a combination of the DetMap software package and sotodlib. The two scripts below are designed to use the same config files for simplicity and can be run with the level 2 G3tSmurf setup. The resulting ManifestDbs should work for both level 2 and level 3 SMuRF data.

make_det_info_wafer

This script uses array construction inputs to build detector IDs for a set of UFMs and save them in a ManifestDb / HDF5 file. The formatting of the ResultSet saved in the HDF5 file will map all this information into det_info.wafer when used with a correctly formatted context file and a readout to detector id mapping. The detector info mapping created by this script is stable as long as the same UFMs are used in the same optics tube positions, meaning it only needs to be re-made if the physical hardware setup changes.

Although the full config presented for make_read_det_match will also work, here is a more basic example:

det_db: "./det_info_wafer.db"
det_info: "./det_info_wafer.h5"
array_info_dir: "/home/so/git/site-pipeline-configs/shared/detmapping/design/"

arrays:
  - name: mv7
    stream_id: ufm_mv7
  - name: mv9
    stream_id: ufm_mv9

make_read_det_match

This script generates the readout ID to detector ID mapping required to translate between the detector hardware information (ex: pixel position) and the readout IDs of the resonators used to index the SMuRF data. The script uses the G3tSmurf database to generate a list of TuneSets (tune files) for a set of arrays / stream ids and runs the DetMap mapping software to generate a mapping between detectors and resonators. The saved metadata is formatted so that, with a correctly formatted Context file, the detector ids can be loaded automatically into the det_info AxisManager.

Config file format

Here’s an example configuration file. Many of these values depend on hardware setup and readout software setup. Making the detector ID info only requires a subset of these parameters but the processes are linked so it is probably worth always having the same configuration file. Tested mapping strategies include assignment and map_by_freq.

data_prefix : "/path/to/level2/data/"
g3tsmurf_db: "/path/to/g3tsmurf.db"
read_db: "/path/to/readout_2_detector_manifest.db"
read_info: "/path/to/readout_2_detector_hdf5.h5"
det_db : "/path/to/det_info/wafer/det_info_manifest.db"
det_info : "/path/to/det_info/wafer/det_info_hdf5.h5"

arrays:
  # name must match DetMap array names
  - name: "Cv4"
    stream_id: "ufm_cv4"
    # Based on hardware config
    north_is_highband: False
    dark_bias_lines: []
    # how we want to call DetMap
    mapping :
      version : 0
      strategy: "assignment"
      # parameters for mapping strategy
      params: {
        "output_parent_dir":"/writable/path/",
        "do_csv_output": False,
        "verbose": False,
        "save_layout_plot": False,
        "show_layout_plot": False,
        }

Context file format

To load these metadata with context, these entries must be part of the context file. Since the detector hardware information is loaded using the det_id field, which comes from the readout to detector mapping, the order of the metadata entries matters.

imports:
  - sotodlib.io.load_smurf
  - sotodlib.io.metadata

obs_loader_type: 'g3tsmurf'

metadata:
    - db: "/path/to/readout_2_detector_manifest.db"
      det_info: true
    - db: "/path/to/det_info/wafer/det_info_manifest.db"
      det_info: true
      multi: true

update_det_match

The update_det_match script will run the det_match module on any new detsets with available calibration metadata. It loads smurf and resonator information from the AxisManager metadata, and matches resonators against a solution file in the site-pipeline-configs.

To run, this script requires a config file described below. If run without the --all flag, it will only run one detset at a time. If run with the --all flag, it will continue running until all detsets have been matched.

usage: __main__.py [-h] [--all] config

Positional Arguments

config

path to config file

Named Arguments

--all

run all detsets

Default: False

Generated results

This generates the following data in the specified results directory:

  • A match file, at the path <results_path>/matches/<detset>.h5, is written for every detset.

  • The file <results_path>/assignment.sqlite is a ManifestDb that contains the mapping from readout-id to detector-id. This is compatible with the det_info_wafer and focal_plane metadata.

  • The <results_path>/det_match.sqlite file contains the det_match.Resonator data from the match for each resonator.

Configuration

This script takes in a config yaml file, which corresponds directly to the UpdateDetMatchesConfig class (see docs below).

For example, this can be run simply with the following config file:

results_path: /path/to/results
context_path: /path/to/context.yaml

Note that by default, this will run a scan of frequency offsets between the solution and the resonator metadata to find the freq-offset with the best match. To disable this, you can run a config file like the following:

results_path: /path/to/results
context_path: /path/to/context.yaml
freq_offset_range_args: null

Below is a more complex config used for SATp1 matching:

results_path: /so/metadata/satp1/manifests/det_match/satp1_det_match_240220m
context_path: /so/metadata/satp1/contexts/smurf_detcal.yaml
show_pb: False
freq_offset_range_args: NULL
apply_solution_pointing: False
solution_type: resonator_set
resonator_set_dir: /so/metadata/satp1/ancillary/detmatch_solutions/satp1_detmatch_solutions_240219r1
match_pars:
  freq_width: 0.2

Below is the full docs of the configuration class.

class sotodlib.site_pipeline.update_det_match.UpdateDetMatchesConfig(results_path: str, context_path: str, site_pipeline_root: str | None = None, wafer_map_path: str | None = None, freq_offset_range_args: tuple[float, float, float] | None = (-4, 4, 0.3), match_pars: Dict | None = None, detset_meta_name: str = 'smurf', detcal_meta_name: str = 'det_cal', show_pb: bool = False, apply_solution_pointing: bool = True, write_relpath: bool = True, solution_type: str = 'kaiwen_handmade', resonator_set_dir: str | None = None)[source]

Configuration for update script

Parameters:
  • results_path (str) – Path to directory where results such as matches, manifestdbs, and h5 files will be stored.

  • context_path (str) – Path to context file. This must contain detset and det_cal metadata.

  • site_pipeline_root (str) – Path to root of site-pipeline-configs. If $SITE_PIPELINE_CONFIG_DIR is set in the environment, that will be used as the default.

  • wafer_map_path (str) – Path to wafer-map to be used to find det-match solution files. If not specified, defaults to <site_pipeline_root>/shared/detmapping/wafer_map.yaml.

  • freq_offset_range_args (Optional[Tuple[float, float, float]]) – If this is not None, for each match, we will scan over a range of freq-offsets to determine the optimal offset to use. If set, must contain a tuple of floats, containing ([start,] stop, [step,]) that will be passed directly to np.arange. If it is None, will just run with the match with freq_offset_mhz=0.

  • match_pars (Optional[Dict]) – If not None, will be passed directly to det_match.MatchParams that is used by the det-match function.

  • detset_meta_name (str) – Name of the metadata entry in the context that contains detset info.

  • detcal_meta_name (str) – Name of the metadata entry in the context that contains det_cal info.

  • show_pb (bool) – Will show progress bar when scanning freq-offset.

  • apply_solution_pointing (bool) – If True, pointing information computed from design-detector positions will be used in the merged detset of the match.

  • write_relpath (bool) – If True, will use the relative path to the h5 file (relative to the db path) when writing to the manifestdb

  • solution_type (str) – Type of solutions to use. Must be one of [‘kaiwen_handmade’, ‘resonator_set’]. If ‘kaiwen_handmade’, will use the handmade solutions from Kaiwen pulled from the wafer_map file in the site-pipeline-configs. If resonator_set, must also specify the resonator_set_dir to pull solutions from.

  • resonator_set_dir (Optional[str]) – If solution_type is ‘resonator_set’, this must be specified and contain the path to the resonator-set solutions. This directory must have a res-set npy file for each stream_id that is expected in the matching, formatted like <resonator_set_dir>/<stream_id>.npy, which contains the result from np.save(fname, match.merged.as_array()).

freq_offsets

If not None, contains freq_offsets determined by freq_offset_range_args which will be scanned over.

Type:

Optional[np.ndarray]

analyze-bright-ptsrc

This script analyzes an observation of a bright point source (such as a planet) and performs per-detector fitting of beam parameters including amplitude and FWHM.

usage: __main__.py [-h] [--ctx_file CTX_FILE] --obs_id OBS_ID
                   --config_file_path CONFIG_FILE_PATH [--outdir OUTDIR]
                   [--test-mode] [--highpass] [--cutoff_high CUTOFF_HIGH]
                   [--lowpass] [--cutoff_low CUTOFF_LOW]
                   [--threshold_src THRESHOLD_SRC] [--do_abs_cal]
                   [--representative_dets REPRESENTATIVE_DETS [REPRESENTATIVE_DETS ...]]
                   [--plot_results]

Named Arguments

--ctx_file

The location of the context file.

--obs_id

Observation id in the context file.

--config_file_path

Location of configuration file that contains beam size and fitting parameters.

--outdir

The location for the .h5 output files to be stored.

--test-mode

Run analysis on a subset of detectors, to quickly check for problems.

Default: False

--highpass

If True, use highpass sine filter.

Default: False

--cutoff_high

The cutoff frequency to be used in the filtering.

--lowpass

If True, use lowpass sine filter.

Default: False

--cutoff_low

The cutoff frequency to be used in the filtering.

--threshold_src

The max amplitude required for the peak finding, given as a multiple of the standard deviation of the data.

Default: 10

--do_abs_cal

Do absolute calibration fit.

Default: False

--representative_dets

Representative detectors across the focal plane whose raw and fitted data should be plotted, given as a list of readout ids.

Default: “no detectors”

--plot_results

Make plots of planet footprint and fitted results.

Default: False

finalize-focal-plane

This element produces a finalized focal plane for a given array. It consumes the output of pointing fits (i.e. from analyze-bright-ptsrc) along with a detector map to combine results across multiple tuning epochs. It works by averaging the provided analyze-bright-ptsrc results, using weights determined by how well each fit matches the nominal template, to produce a final focal plane. An affine transformation that lines up the template focal plane (computed with physical optics) with the averaged results is then computed to create a “noise-free” focal plane.

This element also computes the receiver and optics tube “common mode” transformation. The optics tube common mode is how all of the arrays in one optics tube move together, and the receiver common mode is how all of the optics tubes move together. In the case of the SATs where there is only one tube, the optics tube common mode is always taken to be the identity. Given the smaller number of data points, these common modes are simple rigid transforms (shift and rotation) rather than a full affine transform.

finalize_focal_plane can optionally be run in a “per obs” mode where no averaging is done; in this case the output database is indexed by obs_id.

class sotodlib.site_pipeline.finalize_focal_plane.Transform(shift: numpy.ndarray[Any, numpy.dtype[numpy.floating]], xieta_affine: dataclasses.InitVar[numpy.ndarray[Any, numpy.dtype[numpy.floating]]], gamma_scale: dataclasses.InitVar[float])[source]
shift: ndarray[Any, dtype[floating]]
xieta_affine: dataclasses.InitVar[numpy.ndarray[Any, numpy.dtype[numpy.floating]]]
gamma_scale: dataclasses.InitVar[float]
affine: ndarray[Any, dtype[floating]]
scale: ndarray[Any, dtype[floating]]
shear: float
rot: float
classmethod identity()[source]
decompose()[source]
save(f, path, append='')[source]
class sotodlib.site_pipeline.finalize_focal_plane.Template(det_ids: numpy.ndarray[Any, numpy.dtype[numpy.str_]], fp: numpy.ndarray[Any, numpy.dtype[numpy.floating]], optical: numpy.ndarray[Any, numpy.dtype[numpy.bool_]], pointing_cfg: dataclasses.InitVar[Dict])[source]
det_ids: ndarray[Any, dtype[str_]]
fp: ndarray[Any, dtype[floating]]
optical: ndarray[Any, dtype[bool_]]
pointing_cfg: dataclasses.InitVar[Dict]
center: ndarray[Any, dtype[floating]]
spacing: ndarray[Any, dtype[floating]]
class sotodlib.site_pipeline.finalize_focal_plane.FocalPlane(template: sotodlib.site_pipeline.finalize_focal_plane.Template, stream_id: str, n_aman: dataclasses.InitVar[int])[source]
template: Template
stream_id: str
n_aman: dataclasses.InitVar[int]
full_fp: ndarray[Any, dtype[floating]]
tot_weight: ndarray[Any, dtype[floating]]
avg_fp: ndarray[Any, dtype[floating]]
weights: ndarray[Any, dtype[floating]]
transformed: ndarray[Any, dtype[floating]]
center_transformed: ndarray[Any, dtype[floating]]
have_gamma: bool = False
n_point: ndarray[Any, dtype[int64]]
n_gamma: ndarray[Any, dtype[int64]]
transform: Transform
transform_nocm: Transform
map_to_template(aman)[source]
add_fp(i, fp, weights, template_msk)[source]
save(f, db_info, group)[source]
class sotodlib.site_pipeline.finalize_focal_plane.OpticsTube(pointing_cfg: dataclasses.InitVar[Dict])[source]
pointing_cfg: dataclasses.InitVar[Dict]
name: str
focal_planes: List[FocalPlane]
center: ndarray[Any, dtype[floating]]
center_transformed: ndarray[Any, dtype[floating]]
transform: Transform
save(f, db_info)[source]
sotodlib.site_pipeline.finalize_focal_plane.gamma_fit(src, dst)[source]

Fit the transformation for gamma. Note that the periodicity here assumes things are in radians.

Parameters:
  • src – Source gamma in radians

  • dst – Destination gamma in radians

Returns:

Scale applied to src

shift: Shift applied to scale*src

Return type:

scale

sotodlib.site_pipeline.finalize_focal_plane.main()[source]

Config file format

Here’s an annotated example:

# There are two options to get the data in
# One is to pass in ResultSets like so:
resultsets:
  obs_1: # obs_id associated with this data
    # There are 3 possible ResultSets you can pass
    # pointing is mandatory
    pointing:
      - "PATH/TO/FITS.h5" # The path to the ResultSet
      - "focalplane" # The name of the ResultSet in the h5 file
    # polarization and detmap are optional
    polarization :
      - "PATH/TO/FITS.h5"
      - "polarization"
    detmap:
      - "PATH/TO/DETMAP.h5"
      - "merged"
  obs_2: ...
# When using results sets you also need to pass in additional metadata like
stream_id: "ufm_mv29"
wafer_slot: "ws0"
telescope_flavor: "SAT"
tube_slot: "st1"
# Note that in the ResultSets case only single wafer fits are supported

# You can also load the data in with context like so
context:
  path: PATH/TO/CONTEXT
  # There are two pointing fields in case we have both a tod and map fit for one obs_id
  # This may change down the line
  map_pointing: "map_pointing" # The name of the map based pointing metadata field
  tod_pointing: "tod_pointing" # The name of the TOD based pointing metadata field
  polarization: "polarization" # The name of the polarization metadata field (optional)
  # There are two ways to specify the observation: obs_id and query
  # Both can be provided
  obs_id: [obs_1, obs_2] # Pass in the obs_id directly
  query: QUERY # Pass in a query
  # You can pass in detector restrictions here as well
  dets: {} # Should be a dict you would pass to the dets arg of ctx.get_meta

per_obs: False # Set to true if you want to run in per obs mode
weight_factor: 1000 # Weights are computed with sigma=template_spacing/weight_factor.
                    # This is an advanced feature and should be used with caution.

# There are a few ways to pass in a template as well
template: "PATH/TO/TEMPLATE.h5" # As a h5 file with a ResultSet named the same as the UFM
gen_template: False # Or by setting this true to generate the template on the fly

# You also will need to provide some information for using the optics code
pipeline_config_dir : "PATH/TO/PIPELINE/CONFIGS" # If not provided the environment variable $PIPELINE_CONFIG_DIR is used
zemax_path: "PATH/TO/ID9_checked_trace_data.npz" # Only needed for the LAT

# Plotting info
plot: True # Set to output plot
plot_dir: "./plots" # Where to save plots

# Output info
outdir: "."
append: "test" # Will have a "_" before it.

Output file format

The results of finalize_focal_plane are stored in an HDF5 file containing multiple datasets. The datasets are made using the ResultSet class and can be loaded back as such, but metadata stored as attributes requires h5py.

The datasets and attributes are organized by tube and array as seen below:

focal_plane.h5
- (attr) center # The nominal center of the receiver on sky
- (attr) center_transformed # The center with the common mode transform applied
- (group) transform # The receiver common mode
- (group) tube1 # The first tube (ie st1, oti1, etc.)
  - (attr) center # The nominal center of the tube on sky
  - (attr) center_transformed # The center with the common mode transform applied
  - (group) transform # The tube common mode
  - (group) ufm_1 # The first ufm for this tube (ie ufm_mv29)
    - (attr) template_centers # The nominal center for this array
    - (attr) fit_centers # The fit center for this array
    - (group) transform # The transform for the ufm, includes parameters with and without the common mode
    - (dataset) focal_plane # The focal_plane with just fit positions
      - (attr) measured_gamma # If gamma was actually measured
    - (dataset) focal_plane_full # Also includes avg positions, weights, and counts
  - (group) ufm_2
    ...
  ...
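
As noted above, the datasets can be read back as ResultSets while the attributes require h5py. Here is a minimal loading sketch; the group names are placeholders for your actual tube and array:

import h5py
from sotodlib.io.metadata import read_dataset

path = 'focal_plane.h5'

# Fit positions as a ResultSet (hypothetical group names).
focal_plane = read_dataset(path, 'tube1/ufm_1/focal_plane')

# Attributes (centers, transform parameters, measured_gamma) need h5py.
with h5py.File(path, 'r') as f:
    print(f.attrs['center'])                                    # receiver center
    print(f['tube1'].attrs['center_transformed'])               # tube center after common mode
    print(f['tube1/ufm_1/focal_plane'].attrs['measured_gamma'])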

The focal_plane dataset contains four columns:

  • dets:det_id: The detector id

  • xi: The transformed template xi in radians

  • eta: The transformed template eta in radians

  • gamma: The transformed template gamma in radians.

If no polarization angles are provided then gamma will be populated with the nominal values from physical optics. There is an attribute called measured_gamma that will be False in this case.

The focal_plane_full dataset contains nine columns:

  • dets:det_id: The detector id

  • xi_t: The transformed template xi in radians

  • eta_t: The transformed template eta in radians

  • gamma_t: The transformed template gamma in radians.

  • xi_m: The measured xi in radians

  • eta_m: The measured eta in radians

  • gamma_m: The measured gamma in radians.

  • weights: The average weights of the measurements for this det.

  • n_point: The number of pointing fits used for the det.

  • n_gamma: The number of gamma fits used for this det.

All the attributes having to do with the centers of things are (1,3) arrays in the form ((xi), (eta), (gamma)) in radians.

The transformation for xi and eta is an affine transformation defined as \(m = An + t\), where:

  • m is the measured xi-eta pointing

  • n is the nominal xi-eta pointing

  • A is the 2x2 affine matrix

  • t is the final translation

A is then decomposed into a rotation of the xi-eta plane, a shear parameter, and a scale along each axis. This decomposition is done assuming the order A = rotation*shear*scale.
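
As an illustration of this convention (not the exact routine used by finalize_focal_plane), a short numpy sketch that composes A in the stated order and recovers the parameters might look like:

import numpy as np

def compose(rot, shear, scale):
    # Build A = rotation @ shear @ scale for the xi-eta plane.
    R = np.array([[np.cos(rot), -np.sin(rot)],
                  [np.sin(rot),  np.cos(rot)]])
    Sh = np.array([[1.0, shear],
                   [0.0, 1.0]])
    S = np.diag(scale)
    return R @ Sh @ S

A = compose(rot=0.01, shear=0.001, scale=(1.002, 0.998))
t = np.array([0.0005, -0.0002])   # translation (radians)
n = np.array([0.01, -0.02])       # nominal (xi, eta)
m = A @ n + t                     # measured (xi, eta)

# Recover the parameters from A (valid for positive scales):
rot_rec = np.arctan2(A[1, 0], A[0, 0])
R = np.array([[np.cos(rot_rec), -np.sin(rot_rec)],
              [np.sin(rot_rec),  np.cos(rot_rec)]])
U = R.T @ A                       # upper triangular: [[sx, shear*sy], [0, sy]]
scale_rec = (U[0, 0], U[1, 1])
shear_rec = U[0, 1] / U[1, 1]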

For gamma the transformation is also technically affine, but since it is in just one dimension it can be described by a single shift and scale.

All of these results are stored as attributes in the transform groups. These nominally are:

  • affine: The full affine matrix

  • shift: The shift in (xi, eta, gamma) in radians

  • scale: The scale along (xi, eta, gamma) in radians

  • rot: The rotation of the xi-eta plane

  • shear: The shear of the xi-eta plane

The transform groups for the arrays also include these attributes with the common mode removed; the names have _nocm appended (ie rot_nocm).

Since the common mode transformations are fit as rigid transforms rather than full affine transforms, scale will always be (1, 1, 1) and shear will be 0.

finalize_focal_plane will also output a ManifestDb as a file called db.sqlite in the output directory. By default this will be indexed by stream_id and will point to the focal_plane dataset. If you are running in per_obs mode then it will also be indexed by obs_id and will point to the results associated with that observation. Be warned that in this case there will only be entries for observations with pointing fits, so design your context accordingly.

preprocess-tod

This script is set up to run a preprocessing pipeline using the preprocess module. See details here for how to build a preprocessing pipeline.

This module includes the functions designed to be run as part of a batch script for automated analysis as well as options for loading AxisManagers that have all the preprocessing steps applied to them.

usage: __main__.py [-h] [--query QUERY] [--obs-id OBS_ID] [--overwrite]
                   [--min-ctime MIN_CTIME] [--max-ctime MAX_CTIME]
                   [--update-delay UPDATE_DELAY]
                   configs

Positional Arguments

configs

Preprocessing Configuration File

Named Arguments

--query

Query to pass to the observation list. Use 'string' to pass in strings within the query.

--obs-id

obs-id of particular observation if we want to run on just one

--overwrite

If true, overwrites existing entries in the database

Default: False

--min-ctime

Minimum timestamp for the beginning of an observation list

--max-ctime

Maximum timestamp for the beginning of an observation list

--update-delay

Number of days in the past from which to start the observation list.

preprocess-obs

This script is set up to run a preprocessing pipeline using the preprocess module. See details here for how to build an obs preprocessing pipeline.

This module is similar to preprocess_tod but removes grouping by detset so that the entire observation is loaded, without signal.

usage: __main__.py [-h] [--query QUERY] [--obs-id OBS_ID] [--overwrite]
                   [--min-ctime MIN_CTIME] [--max-ctime MAX_CTIME]
                   [--update-delay UPDATE_DELAY] [--tags [TAGS ...]]
                   configs

Positional Arguments

configs

Preprocessing Configuration File

Named Arguments

--query

Query to pass to the observation list. Use 'string' to pass in strings within the query.

--obs-id

obs-id of particular observation if we want to run on just one

--overwrite

If true, overwrites existing entries in the database

Default: False

--min-ctime

Minimum timestamp for the beginning of an observation list

--max-ctime

Maximum timestamp for the beginning of an observation list

--update-delay

Number of days in the past from which to start the observation list.

--tags

Observation tags. Ex: –tags ‘jupiter’ ‘setting’

make-source-flags

Command line arguments

usage: make-source-flags [-h] [-c CONFIG_FILE] [-v] obs_id
Positional Arguments
obs_id

Observation for which to generate flags.

Named Arguments
-c, --config-file

Configuration file.

-v, --verbose

Pass multiple times to increase verbosity.

Default: 0
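
For example (the obs_id and config filename are placeholders):

so-site-pipeline make-source-flags -c source_flags.yaml -v obs_1700000000_sat1_ws0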

Config file format

Here’s an annotated example:

# Context for <whatever>
context_file: ./context4_b.yaml

# How to subdivide observations (by detset, but call it "wafer_slot")
subobs:
  use: detset
  label: wafer_slot

# Metadata index & archive filenaming
archive:
  index: 'archive.sqlite'
  policy:
    type: 'simple'
    filename: 'archive.h5'

# Mask parameters
mask_params:
  mask_res: [2, 'arcmin']
  default: {'xyr': [0., 0., 0.1]}

make-uncal-beam-map

This module produces maps for a single observation of a bright point source. The observation is identified by an obs_id. The data for the observation may be divided into different detector groups; each ‘group’ will be loaded and mapped independently (this will normally be associated with a “detset”). The data for each observation in each group may be further subdivided into ‘data splits’; this normally corresponds to frequency “band”.

sotodlib.site_pipeline.make_uncal_beam_map.get_parser(parser=None)[source]
sotodlib.site_pipeline.make_uncal_beam_map.plot_map(bundle, filename=None, tod=None, obs_info=None, det_info=None, focal_plane=None, det_mask=None, group=None, subset=None, zoom_size=None, title=None, **kwargs)[source]
sotodlib.site_pipeline.make_uncal_beam_map.main(config_file=None, obs_id=None, verbose=0, test=False)[source]

Entry point.

Command line arguments

usage: make-uncal-beam-map [-h] [-c CONFIG_FILE] [-v] [--test] obs_id
Positional Arguments
obs_id

Observation for which to make source map.

Named Arguments
-c, --config-file

Configuration file.

-v, --verbose

Pass multiple times to increase verbosity.

Default: 0

--test

Reduce detector count for quick tests.

Default: False

Config file format

Here’s an annotated example:

# Data source
context_file: ./act_uranus/context.yaml

# Sub-observation data grouping
subobs:
  use: detset
  label: wafer_slot

# Database of results
archive:
  index: 'archive.sqlite'
  policy:
    type: 'directory'
    root_dir: './'
    pattern: 'maps/{product_id}'

# Output selection and naming
output:
  map_codes: ['solved', 'weights']
  pattern: '{product_id}_{split}_{map_code}.fits'

# Plot generation
plotting:
  zoom:
    f090: [10, arcmin]
    f150: [10, arcmin]

# Preprocessing
preprocessing:
  cal_keys: ['abscal', 'relcal']
  pointing_keys: ['boresight_offset']

# mapmaking parameters
mapmaking:
  force_source: Uranus
  res:
    f090: [15, arcsec]
    f150: [15, arcsec]

Inputs

The Context should cause the TOD to be loaded with all supporting metadata loaded into the AxisManager. Here are key members that will be processed:

  • Deconvolution step:

    • 'timeconst'

    • 'iir_params'

  • Calibration:

    • Whatever is listed in preprocessing.cal_keys

  • Pointing correction:

    • 'boresight_offset'

  • Demodulation and downsampling:

    • not implemented

  • Planet mapmaking:

    • 'source_flags'

    • 'glitch_flags' - optional

update-hwp-angle

Script for running updates on (or creating) an HWP angle g3 file. This script will run periodically even when the HWP is not spinning, meaning it is designed to work from something like a cronjob. The output HWP angle should be synchronized to SMuRF timing outside this script. See details here.

Command line arguments

Analyze HWP encoder data from level-2 HK data, and produce HWP angle solution for all times.

usage: update_hwp_angle [-h] -c CONFIG_FILE [-d DATA_DIR] [-o OUTPUT_DIR]
                        [--update-delay UPDATE_DELAY] [--file FILE]
                        [--verbose VERBOSE]
Named Arguments
-c, --config-file

Configuration File for running update_hwp_angle

-d, --data-dir

input data directory, overwrite config data_dir

-o, --output-dir

output data directory, overwrite config output_dir

--update-delay

Days to subtract from now to set as minimum ctime. Set to 0 to build from scratch

Default: 2

--file

Force processing of a specific file, overriding the standard selection process. The file must be in the usual data tree, though. You may specify either the file basename (1234567890.g3) or the full path.

--verbose

increase output verbosity. 0: Error, 1: Warning, 2: Info(default), 3: Debug

Default: 2
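
Since the script is intended to run periodically (e.g. from a cronjob, as noted above), a hypothetical crontab entry might look like the following; the config path is a placeholder, and this assumes the element is registered with the so-site-pipeline wrapper:

# Run hourly, rebuilding the last 2 days of HWP angle data.
0 * * * * so-site-pipeline update-hwp-angle -c /path/to/hwp_config.yaml --update-delay 2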

make-hwp-solutions

This element generates HWP angle-related metadata, which contains the calibrated HWP angle and flags. The HWP angle is synchronized with the input SMuRF timestamp. See details here.

Command line arguments

usage: make_hwp_solutions [-h] [-o OUTPUT_DIR] [--verbose VERBOSE]
                          [--overwrite] [--load-h5] [--query QUERY]
                          [--min-ctime MIN_CTIME] [--max-ctime MAX_CTIME]
                          [--obs-id OBS_ID]
                          context HWPconfig
Positional Arguments
context

Path to context yaml file to define observation for which to generate hwp angle.

HWPconfig

Path to HWP configuration yaml file.

Named Arguments
-o, --output-dir

output data directory, overwrite config output_dir

--verbose

increase output verbosity. 0: Error, 1: Warning, 2: Info(default), 3: Debug

Default: 2

--overwrite

If true, overwrites existing entries in the database

Default: False

--load-h5

If true, try to load raw encoder data from h5 file

Default: False

--query

Query to pass to the observation list. Use 'string' to pass in strings within the query.

--min-ctime

Minimum timestamp for the beginning of an observation list

--max-ctime

Maximum timestamp for the beginning of an observation list

--obs-id

obs-id of particular observation if we want to run on just one

make-ml-map

This submodule can be used to call the maximum likelihood mapmaker. The mapmaker will produce bin, div and sky maps. The mapmaker has several different flags (see the example config file below) that can be passed via the CLI or a config.yaml file. If an argument is not specified, a value is selected from a set of defaults.

The arguments freq, area and context are required; they should be supplied either through the CLI or the config.yaml.
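
For example, a minimal invocation supplying the required arguments on the CLI might look like this (paths are placeholders, and this assumes the element is registered with the so-site-pipeline wrapper):

so-site-pipeline make-ml-map --context context.yaml --freq f150 --area geometry.fits --odir ./output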

Command line arguments

usage: __main__.py [-h] [--config-file CONFIG_FILE] [--query QUERY]
                   [--freq FREQ] [--area AREA] [--odir ODIR] [--prefix PREFIX]
                   [-C COMPS] [-c CONTEXT] [-n NTOD] [--tods TODS]
                   [--nset NSET] [-N NMAT] [--max-dets MAX_DETS] [-S SITE]
                   [-v] [-q] [-@ CENTER_AT] [-w WINDOW] [-i INJECT] [--nocal]
                   [--nmat-dir NMAT_DIR] [--nmat-mode NMAT_MODE]
                   [-d DOWNSAMPLE] [--maxiter MAXITER] [-T TILED]
                   [-W WAFER [WAFER ...]]
Named Arguments
--config-file

Path to mapmaker config.yaml file

--query
--freq

Frequency band. (f090, f150…)

--area

Path to FITS file describing the mapping geometry

--odir

Directory for saving output maps

--prefix

Filename prefix. ({prefix}_sky_map.fits)

-C, --comps

T,Q, and/or U

-c, --context

Context containing TODs

-n, --ntod

Special case of tods above. Implemented as follows: [:ntod]

--tods

Restrict TOD selections by index

--nset

Number of detsets kept

-N, --nmat

‘corr’ or ‘uncorr’

--max-dets

Maximum number of dets kept

-S, --site

Observatory site

-v, --verbose
-q, --quiet
-@, --center-at
-w, --window
-i, --inject
--nocal

No relcal or abscal

Default: True

--nmat-dir

Directory to where nmats are loaded from/saved to

--nmat-mode

How to build the noise matrix. ‘build’: Always build from tod. ‘cache’: Use if available in nmat-dir, otherwise build and save. ‘load’: Load from nmat-dir, error if missing. ‘save’: Build from tod and save.

-d, --downsample

Downsample TOD by this factor

--maxiter

Maximum number of iterative steps

-T, --tiled
-W, --wafer

Detector wafer subset to map with

Default Mapmaker Values

The following code block contains the hard-coded default values for non-essential mapmaker arguments. They can be overridden in the CLI or in the config.yaml.

defaults = {"query": "1",
            "comps": "T",
            "ntod": None,
            "tods": None,
            "nset": None,
            "site": 'so_lat',
            "nmat": "corr",
            "max_dets": None,
            "verbose": 0,
            "quiet": 0,
            "center_at": None,
            "window": 0.0,
            "inject": None,
            "nocal": True,
            "nmat_dir": "/nmats",
            "nmat_mode": "build",
            "downsample": 1,
            "maxiter": 500,
            "tiled": 1,
            "wafer": None,
           }

Config file format

Example of a config file:

 # Query
query: "1"

# Context file containing TODs
context: 'context.yaml'

# Telescope info
freq: 'f150'
site: 'so_lat'

# Mapping area footprint
area: 'geometry.fits'

# Output Directory and file name prefix
odir: './output/'
prefix: 'my_maps'

# Detectors info. null by default
tods: [::100] # Restrict TOD selections by index
ntod: 3 # Special case of `tods` above. Implemented as follows: [:ntod]
nset: 10 # Number of detsets kept
max-dets: 200 # Maximum dets kept
wafer: 'w17' # Restrict which wafers are mapped. Can do multiple wafers

# Mapmaking meta
comps: 'T' # TQU
inject: null
nocal: True # No relcal or abscal
downsample: 1 # Downsample TOD by this factor
tiled: 0 # Tiling boolean (0 or 1)
nmat-dir: './nmats/' # Dir to save or load nmat
nmat: 'corr' # 'corr' or 'uncorr'
maxiter: 500 # Max number of iterative steps
nmat_mode: 'build' # 'cache', 'build', 'load' or 'save'
center_at: null
window: 0.0

# Scripting tools
verbose: True
quiet: False

QDS Monitor

The QDS Monitor is meant to be a simple-to-use class that allows users to publish the results of their calculations to a live monitor. The live monitor backend is an Influx Database, which is used with the SO Data Acquisition system, known as the Observatory Control System. This allows us to use the same live monitoring interface, Grafana.

Overview

The Monitor class wraps the InfluxDB interface and provides a few simple methods – check, record, and write – detailed in the API section.

check is meant to be used to check if the calculation already has been performed for the given observation/tag set. This can be used to ensure expensive calculations are not repeated when running batch jobs. record takes your calculations, timestamps, and a set of identifying tags, and queues them for batch writing to the InfluxDB. Finally, write will write your recorded results to the InfluxDB, clearing the queue.

This perhaps is best demonstrated with some examples, shown in the next section.

Examples

Simple Pseudocode

The general outline we’re aiming for is as follows:

from sotodlib.site_pipeline.monitor import Monitor

# Initialize DB Connection
monitor = Monitor('localhost', 8086, 'qdsDB')

# Load observation
tod = so_data_load.load_observation(context,
          observation_id, detectors_list)

# Compute statistic
result = interesting_calculation(tod)

# Tag and write to DB
tags = {'telescope': 'LAT', 'wafer': wafer_name}
monitor.record('white_noise_level', result, timestamp, tags)
monitor.write()

Real World Example

The following is a real world example of the Monitor in action. We’ll walk through the important parts, omitting some descriptive print statements. The full script is included below.

To start, we will import the module and create our Monitor object. You will need to know the address and port for your InfluxDB, as well as the name of the database within InfluxDB that you want to write to:

from sotodlib.site_pipeline.monitor import Monitor

monitor = Monitor('localhost', 8086, 'qds')

Note

Secure connection to an external InfluxDB is supported. To connect to https://example.com/influxdb/ use:

monitor = Monitor(host='example.com',
                  port=443,
                  username=u'username',
                  password=u'ENTER PASSWORD HERE',
                  path='influxdb',
                  ssl=True)

Let’s say we want to load some of the sims; we’ll create our Context and get the observations with:

context = core.Context('pipe_s0001_v2.yaml')
observations = context.obsfiledb.get_obs()

Then we can, for example, loop over all observations, determining the detectors and wafers in each observation:

for obs_id in observations:
    c = context.obsfiledb.conn.execute('select distinct DS.name, DS.det from detsets DS '
                    'join files on DS.name=files.detset '
                    'where obs_id=?', (obs_id,))
    dets_in_obs = [tuple(r) for r in c.fetchall()]
    wafers = np.unique([x[0] for x in dets_in_obs])

We’ll run our calculation for each wafer, so let’s loop over those now, building a detector list for the wafer, and loading the TOD for just those detectors and computing their FFTs:

for wafer in wafers:
    det_list = build_det_list(dets_in_obs, wafer)
    tod = so_data_load.load_observation(context.obsfiledb, obs_id, dets=det_list)

    # Compute ffts
    ffts, freqs = rfft(tod)
    det_white_noise = calculate_noise(tod, ffts, freqs)

Now we want to save our results to the monitor. To do this, we’ll need two other lists, one for the timestamps associated with each noise value (in this case, these are all the same, and use the first timestamp in the TOD), and one for the tags for each noise value (in this example we tag each detector individually with their detector ID, along with the wafer it is on and what telescope we’re working with – this probably is in the context somewhere, but I’m just writing in SAT1):

timestamps = np.ones(len(det_white_noise))*tod.timestamps[0]
base_tags = {'telescope': 'SAT1', 'wafer': wafer}
tag_list = []
for det in det_list:
    det_tag = dict(base_tags)
    det_tag['detector'] = det
    tag_list.append(det_tag)
log_tags = {'observation': obs_id, 'wafer': wafer}
monitor.record('white_noise_level', det_white_noise, timestamps, tag_list, 'detector_stats', log_tags=log_tags)
monitor.write()

We also include a set of log tags; these record that we’ve completed this calculation for this observation and wafer. Lastly we record the measurement, giving it the name “white_noise_level”, passing our three lists of equal length (det_white_noise, timestamps, tag_list), and recording the measurement as completed in the “detector_stats” log with the observation ID and wafer log tags.

Where these log tags could come in handy is if we need to stop and restart our calculation and want to skip recomputing the results. Since we saved the wafer along with the observation ID it would make sense to check at the wafer level loop:

for wafer in wafers:
    # Check calculation completed for this wafer
    check_tags = {'wafer': wafer}
    if monitor.check('white_noise_level', obs_id, check_tags):
        continue

Adding this to the top of our wafer loop will skip already recorded wafers for this observation id.

The example script in its entirety is shown here:

# Largely based on 20200514_FCT_Software_Example.ipynb from the pwg-fct
import numpy as np

from sotodlib import core
import sotodlib.io.load as so_data_load

from sotodlib.tod_ops import rfft

from sotodlib.site_pipeline.monitor import Monitor

monitor = Monitor('localhost', 56777, 'qds')

context = core.Context('pipe_s0001_v2.yaml')
observations = context.obsfiledb.get_obs()
print('Found {} Observations'.format(len(observations)))
o_list = range(len(observations)) # all observations

for o in o_list:
    obs_id = observations[o]
    print('Looking at observation #{} named {}'.format(o,obs_id))

    c = context.obsfiledb.conn.execute('select distinct DS.name, DS.det from detsets DS '
                            'join files on DS.name=files.detset '
                            'where obs_id=?', (obs_id,))
    dets_in_obs = [tuple(r) for r in c.fetchall()]
    wafers = np.unique([x[0] for x in dets_in_obs])

    print('There are {} detectors on {} wafers in this observation'.format(len(dets_in_obs), len(wafers)))

    for wafer in wafers:
        # Check calculation completed for this wafer
        check_tags = {'wafer': wafer}
        if monitor.check('white_noise_level', obs_id, check_tags):
            continue

        # Process Obs+Wafer
        # Build detector list for this wafer
        det_list = []
        for det in dets_in_obs:
            if det[0] == wafer:
                det_list.append(det[1])
        print('{} detectors on this wafer'.format(len(det_list)))

        tod = so_data_load.load_observation(context.obsfiledb, obs_id, dets=det_list )

        print('This observation is {} minutes long. Has {} detectors and {} samples'.format(round((tod.timestamps[-1]-tod.timestamps[0])/60.,2),
                                                                              tod.dets.count, tod.samps.count))

        print('This TOD AxisManager has Axes: ')
        for k in tod._axes:
            print('\t{} with {} entries'.format(tod[k].name, tod[k].count ) )

        print('This TOD  AxisManager has fields : [axes]')
        for k in tod._fields:
            print('\t{} : {}'.format(k, tod._assignments[k]) )
            if type(tod._fields[k]) is core.AxisManager:
                for kk in tod[k]._fields:
                    print('\t\t {} : {}'.format(kk, tod[k]._assignments[kk] ))

        # Compute the FFT and detector white noise levels
        ffts, freqs = rfft(tod)

        tsamp = np.median(np.diff(tod.timestamps))
        norm_fact = (1.0/tsamp)*np.sum(np.abs(np.hanning(tod.samps.count))**2)

        fmsk = freqs > 10
        det_white_noise = 1e6*np.median(np.sqrt(np.abs(ffts[:,fmsk])**2/norm_fact), axis=1)

        # Publish to monitor
        timestamps = np.ones(len(det_white_noise))*tod.timestamps[0]
        base_tags = {'telescope': 'LAT', 'wafer': wafer}
        tag_list = []
        for det in det_list:
            det_tag = dict(base_tags)
            det_tag['detector'] = det
            tag_list.append(det_tag)
        log_tags = {'observation': obs_id, 'wafer': wafer}
        monitor.record('white_noise_level', det_white_noise, timestamps, tag_list, 'detector_stats', log_tags=log_tags)
        monitor.write()

API

class sotodlib.site_pipeline.monitor.Monitor(host, port, database='qds', username='root', password='root', path='', ssl=False)[source]
classmethod from_configs(configs)[source]

Create a monitor from a configuration file

Parameters:

configs (dict or string) – configuration dictionary or string that’s a file name that can be loaded by yaml into a configuration dictionary

Return type:

connected Monitor Instance
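
The expected layout of the configuration is not spelled out here; assuming the keys simply mirror the constructor arguments shown above, a sketch might look like:

from sotodlib.site_pipeline.monitor import Monitor

# Assumed layout: keys mirroring the Monitor constructor arguments.
monitor_config = {
    'host': 'example.com',
    'port': 443,
    'database': 'qds',
    'username': 'username',
    'password': 'ENTER PASSWORD HERE',
    'path': 'influxdb',
    'ssl': True,
}
monitor = Monitor.from_configs(monitor_config)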

check(field, observation, tags, log='obs_process_log')[source]

Check if a monitored measurement has been recorded already.

All recorded measurement fields within the Monitor are tracked in a log within InfluxDB. This check will search this log with a search like:

SELECT {field} FROM "log" WHERE observation = {observation} AND
    {tag1} = '{value1}' AND {tag2} = '{value2}';
Parameters:
  • field (str) – Measurement field to check calculation for, i.e. “white_noise_level”

  • observation (str) – Observation ID

  • tags (dict) – Other tags to include in the AND search

  • log (str) – Measurement name for the log within influxdb

Returns:

True if calculation already performed, False otherwise

Return type:

bool

record(field, values, timestamps, tags, measurement, log='obs_process_log', log_tags=None)[source]

Record a monitored statistic to the InfluxDB. Values not written to DB until Monitor.write() is called.

Parameters:
  • field (str) – Measurement field, i.e. “white_noise_level”

  • values (list or np.array) – Values for the field for each unique set of tags and timestamps

  • timestamps (list or np.array) – Timestamps for the field values

  • tags (list of dict) – List of dictionaries containing tags for the InfluxDB

  • measurement (str) – InfluxDB measurement to record to

  • log (str) – InfluxDB measurement to use for logging completed calculation

  • log_tags (list of dict) – Tags to use for the log; typically you won’t want to record that you’ve completed a calculation for each individual detector, but rather for some higher level group. If this is None, tags will be used.

write()[source]

Write points to InfluxDB, clearing the queue.

Support

class sotodlib.site_pipeline.util.ArchivePolicy(**kwargs)[source]

Storage policy assistance. Helps to determine the HDF5 filename and dataset name for a result.

Make me better!

static from_params(params)[source]
get_dest(product_id)[source]

Returns (hdf_filename, dataset_addr).

class sotodlib.site_pipeline.util.DirectoryArchivePolicy(**kwargs)[source]

Storage policy for stuff organized directly on the filesystem.

get_dest(**kw)[source]

Returns full path to destination directory.

sotodlib.site_pipeline.util.parse_quantity(val, default_units=None)[source]

Convert an expression with units into an astropy Quantity.

Parameters:
  • val – the expression (see Notes).

  • default_units – the units to assume if they are not provided in val.

Returns:

The astropy Quantity decoded from the argument. Note the quantity is converted to the default_units, if they are provided.

Notes

The default_units, if provided, should be “unit-like”, by which we mean it is either:

  • An astropy Unit.

  • A string that astropy.units.Unit() can parse.

The val can be any of the following:

  • A tuple (x, u) or list [x, u], where x is a float and u is unit-like.

  • A string (x), where x can be parsed by astropy.units.Quantity.

  • A float (x), but only if default_units is not None.

Examples

>>> parse_quantity('100 arcsec')
<Quantity 100. arcsec>
>>> parse_quantity([12., 'deg'])
<Quantity 12. deg>
>>> parse_quantity('15 arcmin', 'deg')
<Quantity 0.25 deg>
>>> parse_quantity(100, 'm')
<Quantity 100. m>
sotodlib.site_pipeline.util.lookup_conditional(source, key, tags=None, default=<class 'KeyError'>)[source]

Lookup a value in a dict, with the possibility of descending through nested dictionaries using tags provided by the user.

This function returns source[key] unless source[key] is a dict, in which case the tags (a list of strings) are each tested in the dict to see if they lead to a sub-setting.

For example, if the source dictionary is {‘number’: {‘a’: 1, ‘b’: 2}} and the user requests key ‘number’, with tags=[‘a’], then the returned value will be 1.

If you want a dict to be returned literally, and not crawled further, include a dummy key ‘_stop_here’, with arbitrary value (this key will be removed from the result before returning to the user).

The key ‘_default’ will always cause a match, even if none of the other tags match. (This _default value also becomes the default if further recursion fails to yield an exact match.)

Parameters:
  • source (dict) – The parameter tree to search.

  • key (str) – The key to terminate the search on.

  • tags (list of str or None) – tags that may be auto-descended.

  • default – Value to return if the search does not resolve. The special value KeyError will instead cause a KeyError to be raised if the search is not resolved.

Examples:

source = {
  'my_param': {
    '_default': 100.,
    'f150': 90.
  }
}

lookup_conditional(source, 'my_param')
  => 100.

lookup_conditional(source, 'my_param', tags=['f090'])
  => 100.

lookup_conditional(source, 'my_param', tags=['f150'])
  => 90.

lookup_conditional(source, 'my_other_param')
  KeyError!

lookup_conditional(source, 'my_other_param', default=0)
  => 0

# Note _default takes precedence over default argument.
lookup_conditional(source, 'my_param', default=0)
  => 100.

# Nested example:
source = {
  'fit_params': {
    '_default': {
      'a': 12,
      'b': 100,
      '_stop_here': None,  # don't descend any further.
    },
    'f150': {
      'SAT': {
        'a': 1000,
        'b': 1200,
        '_stop_here': None,
      },
      'LAT': {
        'a': 1,
        'b': 2,
        '_stop_here': None,
      },
    },
  },
}

lookup_conditional(source, 'fit_params', tags=['f150', 'LAT'])
  => {'a': 1, 'b': 2}

lookup_conditional(source, 'fit_params', tags=['LAT'])
  => {'a': 12, 'b': 100}

lookup_conditional(source, 'fit_params', tags=['f150'])
  => {'a': 12, 'b': 100}
sotodlib.site_pipeline.util.init_logger(name, announce='')[source]

Configure and return a logger for site_pipeline elements. It is disconnected from general sotodlib (propagate=False) and displays relative instead of absolute timestamps.
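
A minimal usage sketch, e.g. at the top of a pipeline element:

from sotodlib.site_pipeline import util

logger = util.init_logger(__name__)
logger.info('element starting')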

sotodlib.site_pipeline.util.main_launcher(main_func, parser_func, args=None)[source]

Launch an element’s main entry point function, after generating a parser and executing it on the command line arguments (or args if it is passed in).

Parameters:
  • main_func – the main entry point for a pipeline element.

  • parser_func – the argument parser generation function for a pipeline element.

  • args (list of str) – arguments to parse (default is None, which will lead to sys.argv[1:]).

Returns:

Whatever main_func returns.