# Source code for sotodlib.qa.metrics

import re
import numpy as np

from ..core import metadata


def _has_tag(root, keys):
    keys = keys.split(".")
    for key in keys:
        if key in root:
            root = root[key]
        else:
            return False
    return True

def _get_tag(root, keys, i):
    keys = keys.split(".")
    for key in keys:
        root = root[key]
    return root[i]


class QAMetric(object):
    """
    Base class for quality assurance metrics to be recorded in Influx,
    derived from processed metadata files.

    The base class includes methods for recording a metric value to Influx,
    checking if a given obs_id has been recorded for this metric, and
    fetching a list of obs_id's that can be recorded.

    Metrics are labelled by the observation ID they describe, an Influx
    'measurement' and 'field', and any number of additional tags. A
    measurement can have multiple fields associated with it, and here we've
    tried to organize the metrics so that a source of information (e.g. HWP
    angle solutions) is a measurement and relevant quantities describing it
    (e.g. HWP success, mean rate, etc) are fields.

    Subclasses should implement the `_process` and `_get_available_obs`
    methods to access the source of data they are going to tap, as well as
    define the `_influx_meas` and `_influx_field` attributes.
    """

    # these will be defined by child classes
    _influx_meas = None  # measurement to record to
    _influx_field = None  # the field to populate

    def __init__(self, context, monitor, log="qa_metrics_log"):
        """ A QA metric base class.

        Arguments
        ---------
        context : core.Context
            Context that includes all necessary metadata to generate metrics.
        monitor : site_pipeline.monitor.Monitor
            InfluxDB connection.
        log : str
            InfluxDB measurement that is used to log new entries.
        """
        self.context = context
        self.monitor = monitor
        self._influx_log = log

    def exists(self, obs_id, tags=None):
        """ Check if a metric exists for this obs_id in Influx.

        Arguments
        ---------
        obs_id : str
            The observation ID to check.
        tags : dict (optional)
            Further restrict to given tags. Defaults to no extra tags.

        Returns
        -------
        exists : bool
            Whether it exists or not.
        """
        # Build a fresh dict per call: a shared `{}` default would be handed
        # to the monitor and any mutation there would leak into later calls.
        tags = {} if tags is None else tags
        return self.monitor.check(
            self._influx_field, obs_id, tags, log=self._influx_log
        )

    def get_existing_obs(self):
        """ Get a list of observations already recorded to Influx.

        Returns
        -------
        list
            The `observation` value of every point returned by the query
            on the log measurement for this metric's field.
        """
        # query influx log measurement for observations of this field
        res = self.monitor.client.query(
            f"select {self._influx_field}, observation from {self._influx_log}"
        ).get_points()
        return [r["observation"] for r in res]

    def process_and_record(self, obs_id, meta=None):
        """ Generate a metric for this obs_id and record it to InfluxDB.

        Arguments
        ---------
        obs_id : str
            The observation ID to process.
        meta : AxisManager (optional)
            The metadata for this observation ID. If not provided will read
            from file.

        Raises
        ------
        Exception
            If `meta.obs_info.obs_id` does not equal `obs_id`.
        """
        if meta is None:
            meta = self.context.get_meta(obs_id, ignore_missing=True)
        if meta.obs_info.obs_id != obs_id:
            raise Exception(f"Metadata does not correspond to obs_id {obs_id}.")
        line = self._process(meta)
        log_tags = {"observation": obs_id}  # used to identify this entry
        self.monitor.record(
            **line,
            log=self._influx_log,
            measurement=self._influx_meas,
            log_tags=log_tags,
        )
        self.monitor.write()

    def get_new_obs(self):
        """ Get the available observations not yet recorded to InfluxDB.

        Returns
        -------
        set
            Observation IDs returned by `_get_available_obs` that are not
            in `get_existing_obs`.
        """
        # get a list of obs_id to update
        avail_obs = set(self._get_available_obs())
        exist_obs = set(self.get_existing_obs())
        return avail_obs - exist_obs

    def _process(self, meta):
        """ Implement this to process an actual metric."""
        raise NotImplementedError

    def _get_available_obs(self):
        """ Implement this for a given metric."""
        raise NotImplementedError
class PreprocessQA(QAMetric):
    """ A metric derived from a preprocesstod process. """

    _influx_meas = "preprocesstod"
    _process_args = {}

    def __init__(self, context, monitor, process_name, process_args=None, **kwargs):
        """ In addition to the context and monitor, pass the name of the
        preprocess process to record. It should have a `gen_metric` method
        implemented and `_influx_field` attribute.

        Arguments
        ---------
        context, monitor
            Forwarded to the `QAMetric` constructor.
        process_name : str
            Key looked up in `Pipeline.PIPELINE`.
        process_args : dict (optional)
            Extra keyword arguments passed to the process's `gen_metric`.
            Defaults to no extra arguments.
        **kwargs
            Forwarded to the `QAMetric` constructor.

        Raises
        ------
        Exception
            If no process is registered under `process_name`.
        """
        # Use a None sentinel so instances never share one default dict.
        self._process_args = {} if process_args is None else process_args
        super().__init__(context, monitor, **kwargs)

        from sotodlib.preprocess import Pipeline
        proc = Pipeline.PIPELINE.get(process_name, None)
        if proc is None:
            raise Exception(f"No preprocess process with name {process_name}")
        self._pipe_proc = proc

        # get the field name from the process
        self._influx_field = self._pipe_proc._influx_field

    def _process(self, meta):
        """ Delegate metric generation to the process's `gen_metric`."""
        return self._pipe_proc.gen_metric(meta, meta.preprocess, **self._process_args)

    def _get_available_obs(self):
        """ List the obs_id entries of the context's preprocess manifest.

        Raises
        ------
        Exception
            If the context has no metadata block labelled "preprocess".
        """
        # find preprocess manifest file
        man_file = [
            p["db"] for p in self.context["metadata"]
            if p.get("label", "") == "preprocess"
        ]
        if not man_file:
            raise Exception(f"No preprocess metadata block in context {self.context.filename}.")

        # load manifest and read available observations
        man_db = metadata.ManifestDb.from_file(man_file[0])
        return [o[0] for o in man_db.get_entries(["\"obs:obs_id\""]).asarray()]
# inherit from PreprocessQA to reuse available_obs method
class PreprocessValidDets(PreprocessQA):
    """A metric for the number of detectors deemed valid at the end
    of the preprocesstod processing.

    For each wafer slot and bandpass, the number of detectors for which the
    fraction of samples that were valid is greater than a configurable
    threshold is recorded.

    The config entry supports a `process_args` block where the following
    options can be specified:

    tags : list
        Keys into `metadata.det_info` to record as tags with the Influx line.
        Added to the default list ["wafer_slot", "tel_tube", "wafer.bandpass"].
    thresh : float
        The threshold for the fraction of valid samples above which a
        detector is deemed good (default 0.75)
    process_name : str
        The process from which to read the valid dataset. (default 'glitches')
    """

    _influx_meas = "preprocesstod"
    _influx_field = "num_valid_dets"

    def __init__(
        self,
        *args,
        process_args=None,
        **kwargs
    ):
        # None sentinel instead of a shared mutable default dict
        if process_args is None:
            process_args = {}

        # bypass the PreprocessQA __init__
        super(PreprocessQA, self).__init__(*args, **kwargs)

        # extract parameters
        self._tags = process_args.get("tags", [])
        self._thresh = process_args.get("thresh", 0.75)
        self._key = process_args.get("process_name", "glitches")

    @staticmethod
    def _valid_fraction(ranges):
        """Fraction of a detector's samples covered by its valid ranges.

        NOTE(review): assumes each entry of the `valid` dataset is an so3g
        ranges object exposing `ranges()` (N x 2 [start, stop) intervals)
        and `count` (total number of samples) -- confirm against so3g.
        """
        n_total = ranges.count
        if n_total == 0:
            # nothing to be valid over; avoid a division by zero
            return 0.0
        # (stop - start) summed over intervals = number of valid samples
        return np.dot(ranges.ranges(), [-1, 1]).sum() / n_total

    def _process(self, meta):
        """ Count, per bandpass and wafer slot, the detectors whose valid
        sample fraction exceeds the threshold.

        Returns
        -------
        dict
            Keys "field", "values", "timestamps" and "tags", with one entry
            per non-empty (bandpass, wafer slot) combination.
        """
        # add specified tags
        tag_keys = ["wafer_slot", "tel_tube", "wafer.bandpass"]
        tag_keys += [t for t in self._tags if t not in tag_keys]

        # record one metric per wafer slot, per bandpass
        # extract these tags for the metric
        tags = []
        vals = []
        for bp in np.unique(meta.det_info.wafer.bandpass):
            for ws in np.unique(meta.det_info.wafer_slot):
                subset = np.where(
                    (meta.det_info.wafer_slot == ws) & (meta.det_info.wafer.bandpass == bp)
                )[0]
                if subset.size == 0:
                    # this bandpass does not occur on this wafer slot; there
                    # is no detector to take tags from, so skip it
                    continue

                # Fraction of valid samples for each detector. Normalise by
                # the number of samples, not by the number of detectors in
                # the subset, so the result is comparable to the threshold.
                frac_valid = np.array([
                    self._valid_fraction(r)
                    for r in meta.preprocess[self._key].valid[subset]
                ])
                # Count detectors with fraction valid above threshold
                n_good = (frac_valid > self._thresh).sum()

                # get the tags for this wafer (all detectors in this subset share these)
                tags_i = {
                    k: _get_tag(meta.det_info, k, subset[0])
                    for k in tag_keys if _has_tag(meta.det_info, k)
                }
                tags_i["telescope"] = meta.obs_info.telescope

                # add tags and values to respective lists in order
                vals.append(n_good)
                tags.append(tags_i)

        obs_time = [meta.obs_info.timestamp] * len(vals)
        return {
            "field": self._influx_field,
            "values": vals,
            "timestamps": obs_time,
            "tags": tags,
        }
class HWPSolQA(QAMetric):
    """
    Base class for metrics derived from HWP angle solutions.
    Subclasses should implement the `_process` method.

    Some quantities are derived twice, once for each encoder, and these will
    require an `encoder` parameter to be provided to select which one to
    produce a metric for. This is indicated by setting the `_needs_encoder`
    attribute to `True`.
    """

    _influx_meas = "hwp_solution"
    _needs_encoder = False  # set this flag if encoder needs to be specified

    def __init__(self, context, monitor, encoder=None, **kwargs):
        """ Set up the metric and, if given, validate the encoder selection.

        Arguments
        ---------
        context, monitor
            Forwarded to the `QAMetric` constructor.
        encoder : int or str (optional)
            Encoder to report on; its string form must be "1" or "2".
            When given it is also recorded as the `encoder` tag.
        **kwargs
            Forwarded to the `QAMetric` constructor.

        Raises
        ------
        Exception
            If `encoder` is invalid, or is missing while the subclass sets
            `_needs_encoder`.
        """
        super().__init__(context, monitor, **kwargs)

        self._encoder = encoder
        self._tags = {}
        if encoder is not None:
            if str(encoder) not in ["1", "2"]:
                raise Exception(f"Invalid value {encoder} for encoder parameter.")
            self._tags = {"encoder": self._encoder}
        elif self._needs_encoder:
            raise Exception("This metric needs an encoder to be specified on creation.")

    def _get_available_obs(self):
        """ List the manifest obs_id entries that start with "obs_".

        Raises
        ------
        Exception
            If the context has no metadata block labelled "hwp_solution".
        """
        # find hwp_solution manifest file
        man_file = [
            p["db"] for p in self.context["metadata"]
            if p.get("label", "") == "hwp_solution"
        ]
        if not man_file:
            raise Exception(f"No hwp_solution metadata block in context {self.context.filename}.")

        # load manifest and read available observations
        man_db = metadata.ManifestDb.from_file(man_file[0])
        # Require the literal "obs_" prefix. The earlier pattern "^obs_*."
        # made the underscore optional and so also accepted e.g. "obsX".
        obs_re = re.compile(r"^obs_")
        return [
            o[0] for o in man_db.get_entries(["\"obs:obs_id\""]).asarray()
            if obs_re.match(o[0])
        ]
class HWPSolSuccess(HWPSolQA):
    """ Records success of the HWP angle solution calculation, for each encoder."""

    _influx_field = "logger"
    _needs_encoder = True

    def _process(self, meta):
        """ Compare the selected encoder's logger entry to the success message.

        Returns a single-entry line dict ("field", "values", "timestamps",
        "tags") whose value is the result of that comparison.
        """
        status = meta.hwp_solution[f"logger_{self._encoder}"]
        succeeded = status == "Angle calculation succeeded"
        timestamp = meta.obs_info.timestamp
        return {
            "field": self._influx_field,
            "values": [succeeded],
            "timestamps": [timestamp],
            "tags": [self._tags],
        }
class HWPSolPrimaryEncoder(HWPSolQA):
    """ The primary encoder used for the HWP angle calculation."""

    _influx_field = "primary_encoder"

    def _process(self, meta):
        """ Report `meta.hwp_solution["primary_encoder"]` as a single entry.

        This metric does not require an encoder, so the tags are whatever
        was set at construction.
        """
        primary = meta.hwp_solution["primary_encoder"]
        timestamp = meta.obs_info.timestamp
        return {
            "field": self._influx_field,
            "values": [primary],
            "timestamps": [timestamp],
            "tags": [self._tags],
        }
class HWPSolVersion(HWPSolQA):
    """ The version of the solution used for the HWP angle calculation."""

    _influx_field = "version"

    def _process(self, meta):
        """ Report `meta.hwp_solution["version"]` as a single entry.

        This metric does not require an encoder, so the tags are whatever
        was set at construction.
        """
        version = meta.hwp_solution["version"]
        timestamp = meta.obs_info.timestamp
        return {
            "field": self._influx_field,
            "values": [version],
            "timestamps": [timestamp],
            "tags": [self._tags],
        }
class HWPSolOffcenter(HWPSolQA):
    """ Calculated offcentering of HWP angle solution."""

    _influx_field = "offcenter"

    def _process(self, meta):
        """ Report element 0 of `meta.hwp_solution["offcenter"]`.

        This metric does not require an encoder, so the tags are whatever
        was set at construction.
        """
        offcenter = meta.hwp_solution["offcenter"][0]
        timestamp = meta.obs_info.timestamp
        return {
            "field": self._influx_field,
            "values": [offcenter],
            "timestamps": [timestamp],
            "tags": [self._tags],
        }
class HWPSolOffcenterErr(HWPSolQA):
    """ Standard error on the offcentering of HWP angle solution."""

    _influx_field = "offcenter_err"

    def _process(self, meta):
        """ Report element 1 of `meta.hwp_solution["offcenter"]`.

        This metric does not require an encoder, so the tags are whatever
        was set at construction.
        """
        offcenter_err = meta.hwp_solution["offcenter"][1]
        timestamp = meta.obs_info.timestamp
        return {
            "field": self._influx_field,
            "values": [offcenter_err],
            "timestamps": [timestamp],
            "tags": [self._tags],
        }
class HWPSolNumSamples(HWPSolQA):
    """ The total number of encoder samples."""

    _influx_field = "num_samples"
    _needs_encoder = True

    def _process(self, meta):
        """ Report the `size` of the selected encoder's `filled_flag` entry."""
        flags = meta.hwp_solution[f"filled_flag_{self._encoder}"]
        n_samples = flags.size
        timestamp = meta.obs_info.timestamp
        return {
            "field": self._influx_field,
            "values": [n_samples],
            "timestamps": [timestamp],
            "tags": [self._tags],
        }
class HWPSolNumFlagged(HWPSolQA):
    """ The number of encoder samples that were flagged."""

    _influx_field = "num_flagged"
    _needs_encoder = True

    def _process(self, meta):
        """ Report the sum of the selected encoder's `filled_flag` entry."""
        flags = meta.hwp_solution[f"filled_flag_{self._encoder}"]
        n_flagged = flags.sum()
        timestamp = meta.obs_info.timestamp
        return {
            "field": self._influx_field,
            "values": [n_flagged],
            "timestamps": [timestamp],
            "tags": [self._tags],
        }
class HWPSolMeanRate(HWPSolQA):
    """ The mean calculated HWP angle rate, for each encoder."""

    _influx_field = "mean_rate"
    _needs_encoder = True

    def _process(self, meta):
        """ Average `hwp_rate` over the samples whose `filled_flag` is unset.

        The value is NaN when no sample is unflagged.
        """
        keep = ~meta.hwp_solution[f"filled_flag_{self._encoder}"]
        n_keep = keep.sum()
        if n_keep == 0:
            mean_rate = np.nan
        else:
            rates = meta.hwp_solution[f"hwp_rate_{self._encoder}"]
            # flagged samples are zeroed by the mask before summing
            mean_rate = (rates * keep).sum() / n_keep
        timestamp = meta.obs_info.timestamp
        return {
            "field": self._influx_field,
            "values": [mean_rate],
            "timestamps": [timestamp],
            "tags": [self._tags],
        }
class HWPSolMeanTemplate(HWPSolQA):
    """ The mean of the calculated template magnitude."""

    _influx_field = "mean_template"
    _needs_encoder = True

    def _process(self, meta):
        """ Report the mean absolute value of the selected encoder's template."""
        timestamp = meta.obs_info.timestamp
        template = meta.hwp_solution[f"template_{self._encoder}"]
        mean_magnitude = np.mean(np.abs(template))
        return {
            "field": self._influx_field,
            "values": [mean_magnitude],
            "timestamps": [timestamp],
            "tags": [self._tags],
        }