Source code for sotodlib.qa.metrics

import re
import numpy as np

from ..core import metadata


def _has_tag(root, keys):
    keys = keys.split(".")
    for key in keys:
        if key in root:
            root = root[key]
        else:
            return False
    return True

def _get_tag(root, keys, i):
    keys = keys.split(".")
    for key in keys:
        root = root[key]
    return root[i]


[docs] class QAMetric(object): """ Base class for quality assurance metrics to be recorded in Influx, derived from processed metadata files. The base class includes methods for recording a metric value to Influx, checking if a given obs_id has been recorded for this metric, and fetching a list of obs_id's that can be recorded. Metrics are labelled by the observation ID they describe, an Influx 'measurement' and 'field', and any number of additional tags. A measurement can have multiple fields associated with it, and here we've tried to organize the metrics so that a source of information (e.g. HWP angle solutions) is a measurement and relevant quantities describing it (e.g. HWP success, mean rate, etc) are fields. Subclasses should implement the `_process` and `_get_available_obs` methods to access the source of data they are going to tap, as well as define the `_influx_meas` and `_influx_field` attributes. """ # these will be defined by child classes _influx_meas = None # measurement to record to _influx_field = None # the field to populate def __init__(self, context, monitor, log="qa_metrics_log"): """ A QA metric base class. Arguments --------- context : core.Context Context that includes all necessary metadata to generate metrics. monitor : site_pipeline.monitor.Monitor InfluxDB connection. log : str InfluxDB measurement that is used to log new entries. """ self.context = context self.monitor = monitor self._influx_log = log
[docs] def exists(self, obs_id, tags={}): """ Check if a metric exists for this obs_id in Influx. Arguments --------- obs_id : str The observation ID to check. tags : dict (optional) Further restrict to given tags. Returns ------- exists : bool Whether it exists or not. """ return self.monitor.check(self._influx_field, obs_id, tags, log=self._influx_log)
[docs] def get_existing_obs(self): """ Get a list of observations already recorded to Influx. """ # query influx log measurement for observations of this field res = self.monitor.client.query( f"SELECT observation FROM {self._influx_log} WHERE \"metric_type\" = '{self._influx_field}'" ).get_points() return [r["observation"] for r in list(res)]
[docs] def process_and_record(self, obs_id, meta=None): """ Generate a metric for this obs_id and record it to InfluxDB. Arguments --------- obs_id : str The observation ID to process. meta : AxisManager (optional) The metadata for this observation ID. If not provided will read from file. """ if meta is None: meta = self.context.get_meta(obs_id, ignore_missing=True) if meta.obs_info.obs_id != obs_id: raise Exception(f"Metadata does not correspond to obs_id {obs_id}.") line = self._process(meta) log_tags = {"metric_type": self._influx_field} # used to identify this entry self.monitor.record(**line, log=self._influx_log, measurement=self._influx_meas, log_tags=log_tags, qa_metrics=True, obs_id=obs_id) self.monitor.write()
[docs] def get_new_obs(self): """ Get a list of available observations not yet recorded to InfluxDB. """ # get a list of obs_id to update avail_obs = set(self._get_available_obs()) exist_obs = set(self.get_existing_obs()) return avail_obs - exist_obs
def _process(self, meta): """ Implement this to process an actual metric.""" raise NotImplementedError def _get_available_obs(self): """ Implement this for a given metric.""" raise NotImplementedError
[docs] class PreprocessQA(QAMetric): """ A metric derived from a preprocesstod process. """ _influx_meas = "preprocesstod" _process_args = {} def __init__(self, context, monitor, process_name, process_args={}, **kwargs): """ In addition to the context and monitor, pass the name of the preprocess process to record. It should have a `gen_metric` method implemented and `_influx_field` attribute. """ self._process_args = process_args super().__init__(context, monitor, **kwargs) from sotodlib.preprocess import Pipeline proc = Pipeline.PIPELINE.get(process_name, None) if proc is None: raise Exception(f"No preprocess process with name {process_name}") self._pipe_proc = proc # get the field name from the process self._influx_field = self._pipe_proc._influx_field def _process(self, meta): return self._pipe_proc.gen_metric(meta, meta.preprocess, **self._process_args) def _get_available_obs(self): # find preprocess manifest file man_file = [p["db"] for p in self.context["metadata"] if re.match(".*preprocess", p.get("db", ""))] if len(man_file) == 0: raise Exception(f"No preprocess metadata block in context {self.context.filename}.") # load manifest and read available observations man_db = metadata.ManifestDb.from_file(man_file[0]) return [o[0] for o in man_db.get_entries(["\"obs:obs_id\""]).asarray()]
# inherit from PreprocessQA to reuse available_obs method class PreprocessValidDets(PreprocessQA): """A metric for the number of detectors deemed valid at the end of the preprocesstod processing. For each wafer slot and bandpass, the number of detectors for which the fraction of samples that were valid is greater than a configurable threshold is recorded. The config entry supports a `process_args` block where the following options can be specified: tags : dict The values are keys into `metadata.det_info` to record as tags with the Influx line. The keys are addded to the default list ["wafer_slot", "tel_tube", "bandpass"] with "bandpass" being taken from "wafer.bandpass" or "det_cal.bandpass" if the former isn't found. thresh : float The threshold for the fraction of valid samples above which a detector is deemed good (default 0.75) process_name : str The process from which to read the valid dataset. (default 'glitches') """ _influx_meas = "preprocesstod" _influx_field = "num_valid_dets" def __init__( self, *args, process_args={}, **kwargs ): # bypass the PreprocessQA __init__ super(PreprocessQA, self).__init__(*args, **kwargs) # extract parameters self._tags = process_args.get("tags", {}) self._thresh = process_args.get("thresh", 0.75) self._key = process_args.get("process_name", "glitches") def _process(self, meta): # add specified tags tag_keys = { "wafer_slot": "wafer_slot", "tel_tube": "tel_tube", } if _has_tag(meta.det_info, 'wafer.bandpass'): bandpasses = meta.det_info.wafer.bandpass tag_keys["bandpass"] = "wafer.bandpass" else: bandpasses = meta.det_info.det_cal.bandpass tag_keys["bandpass"] = "det_cal.bandpass" tag_keys.update(self._tags) # record one metric per wafer slot, per bandpass # extract these tags for the metric tags = [] vals = [] for bp in np.unique(bandpasses): for ws in np.unique(meta.det_info.wafer_slot): subset = np.where( (meta.det_info.wafer_slot == ws) & (bandpasses == bp) )[0] if len(subset) > 0: # Compute the number of samples that are valid frac_valid = np.array([ np.dot(r.ranges(), [-1, 1]).sum() / len(subset) for r in meta.preprocess[self._key].valid[subset] ]) # Count detectors with fraction valid above threshold n_good = (frac_valid > self._thresh).sum() # get the tags for this wafer (all detectors in this subset share these) tags_i = { k: _get_tag(meta.det_info, i, subset[0]) for k, i in tag_keys.items() if _has_tag(meta.det_info, i) } tags_i["telescope"] = meta.obs_info.telescope # add tags and values to respective lists in order vals.append(n_good) tags.append(tags_i) obs_time = [meta.obs_info.timestamp] * len(vals) return { "field": self._influx_field, "values": vals, "timestamps": obs_time, "tags": tags, } # inherit from PreprocessQA to reuse available_obs method class PreprocessArrayNoise(PreprocessQA): """Generate a QA metric for array Noise (usually NEP) values for each wafer slot and bandpass. The config entry supports a `process_args` block where the following options can be specified: tags : dict The values are keys into `metadata.det_info` to record as tags with the Influx line. The keys are addded to the default list ["wafer_slot", "tel_tube", "bandpass"] with "bandpass" being taken from "wafer.bandpass" or "det_cal.bandpass" if the former isn't found. noise_aman : str The name of the axis manager that holds the white noise array. (default 'noise') """ _influx_meas = "preprocesstod" _influx_field = "array_noise" def __init__( self, *args, process_args={}, **kwargs ): # bypass the PreprocessQA __init__ super(PreprocessQA, self).__init__(*args, **kwargs) # extract parameters self._tags = process_args.get("tags", {}) self._noise_aman = process_args.get("noise_aman", "noise") self._field_name = process_args.get("field_name", "") self._unit_factor = process_args.get("unit_factor", 1) def _process(self, meta): # record one metric per wafer_slot per bandpass # extract these tags for the metric tag_keys = { "wafer_slot": "wafer_slot", "tel_tube": "tel_tube", } if _has_tag(meta.det_info, 'wafer.bandpass'): bandpasses = meta.det_info.wafer.bandpass tag_keys["bandpass"] = "wafer.bandpass" else: bandpasses = meta.det_info.det_cal.bandpass tag_keys["bandpass"] = "det_cal.bandpass" tag_keys.update(self._tags) tags = [] vals = [] for bp in np.unique(bandpasses): for ws in np.unique(meta.det_info.wafer_slot): subset = np.where( (meta.det_info.wafer_slot == ws) & (bandpasses == bp) )[0] white_noise = meta.preprocess[self._noise_aman].white_noise[subset] * self._unit_factor mask = (white_noise != 0) & (~np.isnan(white_noise)) good_indices = np.nonzero(mask)[0] if good_indices.size > 0: vals.append(np.sqrt(1.0 / np.nansum(1.0 / (white_noise[good_indices])**2))) tags_base = { k: _get_tag(meta.det_info, i, subset[0]) for k, i in tag_keys.items() if _has_tag(meta.det_info, i) } tags_base["telescope"] = meta.obs_info.telescope tags.append(tags_base) obs_time = [meta.obs_info.timestamp] * len(tags) return { "field": f"{self._influx_field}{'_' + self._field_name if self._field_name else ''}", "values": vals, "timestamps": obs_time, "tags": tags, } # inherit from PreprocessQA to reuse available_obs method class PreprocessDetNoise(PreprocessQA): """Generate a QA metric for per detector noise (usually NEP) values for each wafer slot and bandpass. The config entry supports a `process_args` block where the following options can be specified: tags : dict The values are keys into `metadata.det_info` to record as tags with the Influx line. The keys are addded to the default list ["wafer_slot", "tel_tube", "bandpass"] with "bandpass" being taken from "wafer.bandpass" or "det_cal.bandpass" if the former isn't found. noise_aman : str The name of the axis manager that holds the white noise array. (default 'noise') """ _influx_meas = "preprocesstod" _influx_field = "det_noise" def __init__( self, *args, process_args={}, **kwargs ): # bypass the PreprocessQA __init__ super(PreprocessQA, self).__init__(*args, **kwargs) # extract parameters self._tags = process_args.get("tags", {}) self._noise_aman = process_args.get("noise_aman", "noise") self._field_name = process_args.get("field_name", "") self._unit_factor = process_args.get("unit_factor", 1) def _process(self, meta): # record one metric per wafer_slot per bandpass # extract these tags for the metric tag_keys = { "wafer_slot": "wafer_slot", "tel_tube": "tel_tube", } if _has_tag(meta.det_info, 'wafer.bandpass'): bandpasses = meta.det_info.wafer.bandpass tag_keys["bandpass"] = "wafer.bandpass" else: bandpasses = meta.det_info.det_cal.bandpass tag_keys["bandpass"] = "det_cal.bandpass" tag_keys.update(self._tags) tags = [] vals = [] for bp in np.unique(bandpasses): for ws in np.unique(meta.det_info.wafer_slot): subset = np.where( (meta.det_info.wafer_slot == ws) & (bandpasses == bp) )[0] white_noise = meta.preprocess[self._noise_aman].white_noise[subset] * self._unit_factor mask = (white_noise != 0) & (~np.isnan(white_noise)) good_indices = np.nonzero(mask)[0] if good_indices.size > 0: vals.append(np.sqrt(1.0 / np.nansum(1.0 / (white_noise[good_indices])**2)) * np.sqrt(len(good_indices))) tags_base = { k: _get_tag(meta.det_info, i, subset[0]) for k, i in tag_keys.items() if _has_tag(meta.det_info, i) } tags_base["telescope"] = meta.obs_info.telescope tags.append(tags_base) obs_time = [meta.obs_info.timestamp] * len(tags) return { "field": f"{self._influx_field}{'_' + self._field_name if self._field_name else ''}", "values": vals, "timestamps": obs_time, "tags": tags, }
[docs] class HWPSolQA(QAMetric): """ Base class for metrics derived from HWP angle solutions. Subclasses should implement the `_process` method. Some quantities are derived twice, once for each encoder, and these will require and `encoder` parameter to be provided to select which one to produce a metric for. This is indicated by setting the `_needs_encoder` attribute to `True`. """ _influx_meas = "hwp_solution" _needs_encoder = False # set this flag if encoder needs to be specified def __init__(self, context, monitor, encoder=None, **kwargs): super().__init__(context, monitor, **kwargs) self._encoder = encoder self._tags = {} if encoder is not None: if str(encoder) not in ["1", "2"]: raise Exception(f"Invalid value {encoder} for encoder parameter.") self._tags = {"encoder": self._encoder} elif self._needs_encoder: raise Exception("This metric needs an encoder to be specified on creation.") def _get_available_obs(self): # find preprocess manifest file man_file = [p["db"] for p in self.context["metadata"] if p.get("label", "") == "hwp_solution"] if len(man_file) == 0: raise Exception(f"No hwp_solution metadata block in context {self.context.filename}.") # load manifest and read available observations man_db = metadata.ManifestDb.from_file(man_file[0]) obs_re = re.compile("^obs_*.") return [o[0] for o in man_db.get_entries(["\"obs:obs_id\""]).asarray() if obs_re.match(o[0])] def _process(self, meta): tags = self._tags.copy() tags.update({"telescope": meta.obs_info.telescope}) obs_time = [meta.obs_info.timestamp] return { "field": self._influx_field, "values": [self._gen_value(meta)], "timestamps": obs_time, "tags": [tags], }
[docs] class HWPSolSuccess(HWPSolQA): """ Records success of the HWP angle solution calculation, for each encoder. 0: No data, 1: Success, 2: Fail """ _influx_field = "sol_success" _needs_encoder = True def _gen_value(self, meta): if meta.hwp_solution[f"logger_{self._encoder}"] == 'No HWP data': return 0 elif meta.hwp_solution[f"logger_{self._encoder}"] == "Angle calculation succeeded": return 1 else: return 2
[docs] class HWPSolPrimaryEncoder(HWPSolQA): """ The primary encoder used for the HWP angle calculation.""" _influx_field = "primary_encoder" def _gen_value(self, meta): return meta.hwp_solution["primary_encoder"]
[docs] class HWPSolVersion(HWPSolQA): """ The version of the solution used for the HWP angle calculation.""" _influx_field = "version" def _gen_value(self, meta): return meta.hwp_solution["version"]
[docs] class HWPSolOffcenter(HWPSolQA): """ Calculated offcentering of HWP angle solution.""" _influx_field = "offcenter" def _gen_value(self, meta): return meta.hwp_solution["offcenter"][0]
[docs] class HWPSolOffcenterErr(HWPSolQA): """ Standard error on the offcentering of HWP angle solution.""" _influx_field = "offcenter_err" def _gen_value(self, meta): return meta.hwp_solution["offcenter"][1]
[docs] class HWPSolNumSamples(HWPSolQA): """ The total number of encoder samples.""" _influx_field = "num_samples" _needs_encoder = True def _gen_value(self, meta): flag_key = f"filled_flag_{self._encoder}" return meta.hwp_solution[flag_key].size
[docs] class HWPSolNumFlagged(HWPSolQA): """ The number of encoder samples that were flagged.""" _influx_field = "num_flagged" _needs_encoder = True def _gen_value(self, meta): flag_key = f"filled_flag_{self._encoder}" return meta.hwp_solution[flag_key].sum()
[docs] class HWPSolMeanRate(HWPSolQA): """ The mean calculated HWP angle rate, for each encoder.""" _influx_field = "mean_rate" _needs_encoder = True def _gen_value(self, meta): good_samp = ~meta.hwp_solution[f"filled_flag_{self._encoder}"] nsamp = good_samp.sum() rate = 0.0 if nsamp == 0 else (meta.hwp_solution[f"hwp_rate_{self._encoder}"] * good_samp).sum() / nsamp return rate
[docs] class HWPSolMeanTemplate(HWPSolQA): """ The mean of the calculated template magnitude.""" _influx_field = "mean_template" _needs_encoder = True def _gen_value(self, meta): return np.mean(np.abs(meta.hwp_solution[f"template_{self._encoder}"]))
class HWPSolNumDroppedPacketsEncoder(HWPSolQA): """ The number of dropped encoder packets.""" _influx_field = "num_dropped_packets_encoder" _needs_encoder = True def _gen_value(self, meta): return meta.hwp_solution[f"num_dropped_packets_{self._encoder}"] class HWPSolNumDroppedPacketsIrig(HWPSolQA): """ The number of dropped IRIG packets.""" _influx_field = "num_dropped_packets_irig" _needs_encoder = True def _gen_value(self, meta): return meta.hwp_solution[f"num_dropped_packets_irig_{self._encoder}"] class HWPSolNumDataPointGlitchesEncoder(HWPSolQA): """ The number of data point glitches in encoder data.""" _influx_field = "num_data_point_glitches_encoder" _needs_encoder = True def _gen_value(self, meta): return meta.hwp_solution[f"num_glitches_{self._encoder}"] class HWPSolNumDataPointGlitchesIrig(HWPSolQA): """ The number of data point glitches in IRIG data.""" _influx_field = "num_data_point_glitches_irig" _needs_encoder = True def _gen_value(self, meta): return meta.hwp_solution[f"num_glitches_irig_{self._encoder}"] class HWPSolNumValueGlitchesEncoder(HWPSolQA): """ The number of value glitches in encoder data.""" _influx_field = "num_value_glitches_encoder" _needs_encoder = True def _gen_value(self, meta): return meta.hwp_solution[f"num_value_glitches_{self._encoder}"] class HWPSolNumValueGlitchesIrig(HWPSolQA): """ The number of value glitches in IRIG data.""" _influx_field = "num_value_glitches_irig" _needs_encoder = True def _gen_value(self, meta): return meta.hwp_solution[f"num_value_glitches_irig_{self._encoder}"] class HWPSolDeadRotations(HWPSolQA): """ The number of dead rotations that failed to correct glitches.""" _influx_field = "num_dead_rots" _needs_encoder = True def _gen_value(self, meta): return meta.hwp_solution[f"num_dead_rots_{self._encoder}"] class HWPSolDroppedSlits(HWPSolQA): """ The number of dropped slits.""" _influx_field = "num_dropped_slits" _needs_encoder = True def _gen_value(self, meta): return meta.hwp_solution[f"num_dropped_slits_{self._encoder}"]