import re
import numpy as np
from ..core import metadata
def _has_tag(root, keys):
keys = keys.split(".")
for key in keys:
if key in root:
root = root[key]
else:
return False
return True
def _get_tag(root, keys, i):
keys = keys.split(".")
for key in keys:
root = root[key]
return root[i]
[docs]
class QAMetric(object):
""" Base class for quality assurance metrics to be recorded in Influx,
derived from processed metadata files.
The base class includes methods for recording a metric value to Influx,
checking if a given obs_id has been recorded for this metric, and fetching
a list of obs_id's that can be recorded.
Metrics are labelled by the observation ID they describe, an Influx 'measurement'
and 'field', and any number of additional tags. A measurement can have multiple
fields associated with it, and here we've tried to organize the metrics so that
a source of information (e.g. HWP angle solutions) is a measurement and relevant
quantities describing it (e.g. HWP success, mean rate, etc) are fields.
Subclasses should implement the `_process` and `_get_available_obs` methods to
access the source of data they are going to tap, as well as define the
`_influx_meas` and `_influx_field` attributes.
"""
# these will be defined by child classes
_influx_meas = None # measurement to record to
_influx_field = None # the field to populate
def __init__(self, context, monitor, log="qa_metrics_log"):
""" A QA metric base class.
Arguments
---------
context : core.Context
Context that includes all necessary metadata to generate metrics.
monitor : site_pipeline.monitor.Monitor
InfluxDB connection.
log : str
InfluxDB measurement that is used to log new entries.
"""
self.context = context
self.monitor = monitor
self._influx_log = log
[docs]
def exists(self, obs_id, tags={}):
""" Check if a metric exists for this obs_id in Influx.
Arguments
---------
obs_id : str
The observation ID to check.
tags : dict (optional)
Further restrict to given tags.
Returns
-------
exists : bool
Whether it exists or not.
"""
return self.monitor.check(self._influx_field, obs_id, tags, log=self._influx_log)
[docs]
def get_existing_obs(self):
""" Get a list of observations already recorded to Influx.
"""
# query influx log measurement for observations of this field
res = self.monitor.client.query(
f"SELECT observation FROM {self._influx_log} WHERE \"metric_type\" = '{self._influx_field}'"
).get_points()
return [r["observation"] for r in list(res)]
[docs]
def process_and_record(self, obs_id, meta=None):
""" Generate a metric for this obs_id and record it to InfluxDB.
Arguments
---------
obs_id : str
The observation ID to process.
meta : AxisManager (optional)
The metadata for this observation ID. If not provided will read from file.
"""
if meta is None:
meta = self.context.get_meta(obs_id, ignore_missing=True)
if meta.obs_info.obs_id != obs_id:
raise Exception(f"Metadata does not correspond to obs_id {obs_id}.")
line = self._process(meta)
log_tags = {"metric_type": self._influx_field} # used to identify this entry
self.monitor.record(**line, log=self._influx_log, measurement=self._influx_meas, log_tags=log_tags, qa_metrics=True, obs_id=obs_id)
self.monitor.write()
[docs]
def get_new_obs(self):
""" Get a list of available observations not yet recorded to InfluxDB.
"""
# get a list of obs_id to update
avail_obs = set(self._get_available_obs())
exist_obs = set(self.get_existing_obs())
return avail_obs - exist_obs
def _process(self, meta):
""" Implement this to process an actual metric."""
raise NotImplementedError
def _get_available_obs(self):
""" Implement this for a given metric."""
raise NotImplementedError
[docs]
class PreprocessQA(QAMetric):
""" A metric derived from a preprocesstod process.
"""
_influx_meas = "preprocesstod"
_process_args = {}
def __init__(self, context, monitor, process_name, process_args={}, **kwargs):
""" In addition to the context and monitor, pass the name of the
preprocess process to record. It should have a `gen_metric` method
implemented and `_influx_field` attribute.
"""
self._process_args = process_args
super().__init__(context, monitor, **kwargs)
from sotodlib.preprocess import Pipeline
proc = Pipeline.PIPELINE.get(process_name, None)
if proc is None:
raise Exception(f"No preprocess process with name {process_name}")
self._pipe_proc = proc
# get the field name from the process
self._influx_field = self._pipe_proc._influx_field
def _process(self, meta):
return self._pipe_proc.gen_metric(meta, meta.preprocess, **self._process_args)
def _get_available_obs(self):
# find preprocess manifest file
man_file = [p["db"] for p in self.context["metadata"] if re.match(".*preprocess", p.get("db", ""))]
if len(man_file) == 0:
raise Exception(f"No preprocess metadata block in context {self.context.filename}.")
# load manifest and read available observations
man_db = metadata.ManifestDb.from_file(man_file[0])
return [o[0] for o in man_db.get_entries(["\"obs:obs_id\""]).asarray()]
# inherit from PreprocessQA to reuse available_obs method
class PreprocessValidDets(PreprocessQA):
"""A metric for the number of detectors deemed valid at the end of the preprocesstod
processing. For each wafer slot and bandpass, the number of detectors for which the
fraction of samples that were valid is greater than a configurable threshold is recorded.
The config entry supports a `process_args` block where the following options can be
specified:
tags : dict
The values are keys into `metadata.det_info` to record as tags with
the Influx line. The keys are addded to the default list
["wafer_slot", "tel_tube", "bandpass"] with "bandpass" being taken
from "wafer.bandpass" or "det_cal.bandpass" if the former isn't found.
thresh : float
The threshold for the fraction of valid samples above which a detector is
deemed good (default 0.75)
process_name : str
The process from which to read the valid dataset. (default 'glitches')
"""
_influx_meas = "preprocesstod"
_influx_field = "num_valid_dets"
def __init__(
self,
*args,
process_args={},
**kwargs
):
# bypass the PreprocessQA __init__
super(PreprocessQA, self).__init__(*args, **kwargs)
# extract parameters
self._tags = process_args.get("tags", {})
self._thresh = process_args.get("thresh", 0.75)
self._key = process_args.get("process_name", "glitches")
def _process(self, meta):
# add specified tags
tag_keys = {
"wafer_slot": "wafer_slot",
"tel_tube": "tel_tube",
}
if _has_tag(meta.det_info, 'wafer.bandpass'):
bandpasses = meta.det_info.wafer.bandpass
tag_keys["bandpass"] = "wafer.bandpass"
else:
bandpasses = meta.det_info.det_cal.bandpass
tag_keys["bandpass"] = "det_cal.bandpass"
tag_keys.update(self._tags)
# record one metric per wafer slot, per bandpass
# extract these tags for the metric
tags = []
vals = []
for bp in np.unique(bandpasses):
for ws in np.unique(meta.det_info.wafer_slot):
subset = np.where(
(meta.det_info.wafer_slot == ws) & (bandpasses == bp)
)[0]
if len(subset) > 0:
# Compute the number of samples that are valid
frac_valid = np.array([
np.dot(r.ranges(), [-1, 1]).sum() / len(subset)
for r in meta.preprocess[self._key].valid[subset]
])
# Count detectors with fraction valid above threshold
n_good = (frac_valid > self._thresh).sum()
# get the tags for this wafer (all detectors in this subset share these)
tags_i = {
k: _get_tag(meta.det_info, i, subset[0]) for k, i in tag_keys.items() if _has_tag(meta.det_info, i)
}
tags_i["telescope"] = meta.obs_info.telescope
# add tags and values to respective lists in order
vals.append(n_good)
tags.append(tags_i)
obs_time = [meta.obs_info.timestamp] * len(vals)
return {
"field": self._influx_field,
"values": vals,
"timestamps": obs_time,
"tags": tags,
}
# inherit from PreprocessQA to reuse available_obs method
class PreprocessArrayNoise(PreprocessQA):
"""Generate a QA metric for array Noise (usually NEP) values
for each wafer slot and bandpass.
The config entry supports a `process_args` block where the following
options can be specified:
tags : dict
The values are keys into `metadata.det_info` to record as tags with
the Influx line. The keys are addded to the default list
["wafer_slot", "tel_tube", "bandpass"] with "bandpass" being taken
from "wafer.bandpass" or "det_cal.bandpass" if the former isn't found.
noise_aman : str
The name of the axis manager that holds the white noise array.
(default 'noise')
"""
_influx_meas = "preprocesstod"
_influx_field = "array_noise"
def __init__(
self,
*args,
process_args={},
**kwargs
):
# bypass the PreprocessQA __init__
super(PreprocessQA, self).__init__(*args, **kwargs)
# extract parameters
self._tags = process_args.get("tags", {})
self._noise_aman = process_args.get("noise_aman", "noise")
self._field_name = process_args.get("field_name", "")
self._unit_factor = process_args.get("unit_factor", 1)
def _process(self, meta):
# record one metric per wafer_slot per bandpass
# extract these tags for the metric
tag_keys = {
"wafer_slot": "wafer_slot",
"tel_tube": "tel_tube",
}
if _has_tag(meta.det_info, 'wafer.bandpass'):
bandpasses = meta.det_info.wafer.bandpass
tag_keys["bandpass"] = "wafer.bandpass"
else:
bandpasses = meta.det_info.det_cal.bandpass
tag_keys["bandpass"] = "det_cal.bandpass"
tag_keys.update(self._tags)
tags = []
vals = []
for bp in np.unique(bandpasses):
for ws in np.unique(meta.det_info.wafer_slot):
subset = np.where(
(meta.det_info.wafer_slot == ws) & (bandpasses == bp)
)[0]
white_noise = meta.preprocess[self._noise_aman].white_noise[subset] * self._unit_factor
mask = (white_noise != 0) & (~np.isnan(white_noise))
good_indices = np.nonzero(mask)[0]
if good_indices.size > 0:
vals.append(np.sqrt(1.0 / np.nansum(1.0 / (white_noise[good_indices])**2)))
tags_base = {
k: _get_tag(meta.det_info, i, subset[0]) for k, i in tag_keys.items() if _has_tag(meta.det_info, i)
}
tags_base["telescope"] = meta.obs_info.telescope
tags.append(tags_base)
obs_time = [meta.obs_info.timestamp] * len(tags)
return {
"field": f"{self._influx_field}{'_' + self._field_name if self._field_name else ''}",
"values": vals,
"timestamps": obs_time,
"tags": tags,
}
# inherit from PreprocessQA to reuse available_obs method
class PreprocessDetNoise(PreprocessQA):
"""Generate a QA metric for per detector noise (usually NEP) values
for each wafer slot and bandpass.
The config entry supports a `process_args` block where the following
options can be specified:
tags : dict
The values are keys into `metadata.det_info` to record as tags with
the Influx line. The keys are addded to the default list
["wafer_slot", "tel_tube", "bandpass"] with "bandpass" being taken
from "wafer.bandpass" or "det_cal.bandpass" if the former isn't found.
noise_aman : str
The name of the axis manager that holds the white noise array.
(default 'noise')
"""
_influx_meas = "preprocesstod"
_influx_field = "det_noise"
def __init__(
self,
*args,
process_args={},
**kwargs
):
# bypass the PreprocessQA __init__
super(PreprocessQA, self).__init__(*args, **kwargs)
# extract parameters
self._tags = process_args.get("tags", {})
self._noise_aman = process_args.get("noise_aman", "noise")
self._field_name = process_args.get("field_name", "")
self._unit_factor = process_args.get("unit_factor", 1)
def _process(self, meta):
# record one metric per wafer_slot per bandpass
# extract these tags for the metric
tag_keys = {
"wafer_slot": "wafer_slot",
"tel_tube": "tel_tube",
}
if _has_tag(meta.det_info, 'wafer.bandpass'):
bandpasses = meta.det_info.wafer.bandpass
tag_keys["bandpass"] = "wafer.bandpass"
else:
bandpasses = meta.det_info.det_cal.bandpass
tag_keys["bandpass"] = "det_cal.bandpass"
tag_keys.update(self._tags)
tags = []
vals = []
for bp in np.unique(bandpasses):
for ws in np.unique(meta.det_info.wafer_slot):
subset = np.where(
(meta.det_info.wafer_slot == ws) & (bandpasses == bp)
)[0]
white_noise = meta.preprocess[self._noise_aman].white_noise[subset] * self._unit_factor
mask = (white_noise != 0) & (~np.isnan(white_noise))
good_indices = np.nonzero(mask)[0]
if good_indices.size > 0:
vals.append(np.sqrt(1.0 / np.nansum(1.0 / (white_noise[good_indices])**2)) * np.sqrt(len(good_indices)))
tags_base = {
k: _get_tag(meta.det_info, i, subset[0]) for k, i in tag_keys.items() if _has_tag(meta.det_info, i)
}
tags_base["telescope"] = meta.obs_info.telescope
tags.append(tags_base)
obs_time = [meta.obs_info.timestamp] * len(tags)
return {
"field": f"{self._influx_field}{'_' + self._field_name if self._field_name else ''}",
"values": vals,
"timestamps": obs_time,
"tags": tags,
}
[docs]
class HWPSolQA(QAMetric):
""" Base class for metrics derived from HWP angle solutions. Subclasses should
implement the `_process` method. Some quantities are derived twice, once for each
encoder, and these will require and `encoder` parameter to be provided to select
which one to produce a metric for. This is indicated by setting the `_needs_encoder`
attribute to `True`.
"""
_influx_meas = "hwp_solution"
_needs_encoder = False # set this flag if encoder needs to be specified
def __init__(self, context, monitor, encoder=None, **kwargs):
super().__init__(context, monitor, **kwargs)
self._encoder = encoder
self._tags = {}
if encoder is not None:
if str(encoder) not in ["1", "2"]:
raise Exception(f"Invalid value {encoder} for encoder parameter.")
self._tags = {"encoder": self._encoder}
elif self._needs_encoder:
raise Exception("This metric needs an encoder to be specified on creation.")
def _get_available_obs(self):
# find preprocess manifest file
man_file = [p["db"] for p in self.context["metadata"] if p.get("label", "") == "hwp_solution"]
if len(man_file) == 0:
raise Exception(f"No hwp_solution metadata block in context {self.context.filename}.")
# load manifest and read available observations
man_db = metadata.ManifestDb.from_file(man_file[0])
obs_re = re.compile("^obs_*.")
return [o[0] for o in man_db.get_entries(["\"obs:obs_id\""]).asarray() if obs_re.match(o[0])]
def _process(self, meta):
tags = self._tags.copy()
tags.update({"telescope": meta.obs_info.telescope})
obs_time = [meta.obs_info.timestamp]
return {
"field": self._influx_field,
"values": [self._gen_value(meta)],
"timestamps": obs_time,
"tags": [tags],
}
[docs]
class HWPSolSuccess(HWPSolQA):
""" Records success of the HWP angle solution calculation, for each encoder.
0: No data, 1: Success, 2: Fail
"""
_influx_field = "sol_success"
_needs_encoder = True
def _gen_value(self, meta):
if meta.hwp_solution[f"logger_{self._encoder}"] == 'No HWP data':
return 0
elif meta.hwp_solution[f"logger_{self._encoder}"] == "Angle calculation succeeded":
return 1
else:
return 2
[docs]
class HWPSolPrimaryEncoder(HWPSolQA):
""" The primary encoder used for the HWP angle calculation."""
_influx_field = "primary_encoder"
def _gen_value(self, meta):
return meta.hwp_solution["primary_encoder"]
[docs]
class HWPSolVersion(HWPSolQA):
""" The version of the solution used for the HWP angle calculation."""
_influx_field = "version"
def _gen_value(self, meta):
return meta.hwp_solution["version"]
[docs]
class HWPSolOffcenter(HWPSolQA):
""" Calculated offcentering of HWP angle solution."""
_influx_field = "offcenter"
def _gen_value(self, meta):
return meta.hwp_solution["offcenter"][0]
[docs]
class HWPSolOffcenterErr(HWPSolQA):
""" Standard error on the offcentering of HWP angle solution."""
_influx_field = "offcenter_err"
def _gen_value(self, meta):
return meta.hwp_solution["offcenter"][1]
[docs]
class HWPSolNumSamples(HWPSolQA):
""" The total number of encoder samples."""
_influx_field = "num_samples"
_needs_encoder = True
def _gen_value(self, meta):
flag_key = f"filled_flag_{self._encoder}"
return meta.hwp_solution[flag_key].size
[docs]
class HWPSolNumFlagged(HWPSolQA):
""" The number of encoder samples that were flagged."""
_influx_field = "num_flagged"
_needs_encoder = True
def _gen_value(self, meta):
flag_key = f"filled_flag_{self._encoder}"
return meta.hwp_solution[flag_key].sum()
[docs]
class HWPSolMeanRate(HWPSolQA):
""" The mean calculated HWP angle rate, for each encoder."""
_influx_field = "mean_rate"
_needs_encoder = True
def _gen_value(self, meta):
good_samp = ~meta.hwp_solution[f"filled_flag_{self._encoder}"]
nsamp = good_samp.sum()
rate = 0.0 if nsamp == 0 else (meta.hwp_solution[f"hwp_rate_{self._encoder}"] * good_samp).sum() / nsamp
return rate
[docs]
class HWPSolMeanTemplate(HWPSolQA):
""" The mean of the calculated template magnitude."""
_influx_field = "mean_template"
_needs_encoder = True
def _gen_value(self, meta):
return np.mean(np.abs(meta.hwp_solution[f"template_{self._encoder}"]))
class HWPSolNumDroppedPacketsEncoder(HWPSolQA):
""" The number of dropped encoder packets."""
_influx_field = "num_dropped_packets_encoder"
_needs_encoder = True
def _gen_value(self, meta):
return meta.hwp_solution[f"num_dropped_packets_{self._encoder}"]
class HWPSolNumDroppedPacketsIrig(HWPSolQA):
""" The number of dropped IRIG packets."""
_influx_field = "num_dropped_packets_irig"
_needs_encoder = True
def _gen_value(self, meta):
return meta.hwp_solution[f"num_dropped_packets_irig_{self._encoder}"]
class HWPSolNumDataPointGlitchesEncoder(HWPSolQA):
""" The number of data point glitches in encoder data."""
_influx_field = "num_data_point_glitches_encoder"
_needs_encoder = True
def _gen_value(self, meta):
return meta.hwp_solution[f"num_glitches_{self._encoder}"]
class HWPSolNumDataPointGlitchesIrig(HWPSolQA):
""" The number of data point glitches in IRIG data."""
_influx_field = "num_data_point_glitches_irig"
_needs_encoder = True
def _gen_value(self, meta):
return meta.hwp_solution[f"num_glitches_irig_{self._encoder}"]
class HWPSolNumValueGlitchesEncoder(HWPSolQA):
""" The number of value glitches in encoder data."""
_influx_field = "num_value_glitches_encoder"
_needs_encoder = True
def _gen_value(self, meta):
return meta.hwp_solution[f"num_value_glitches_{self._encoder}"]
class HWPSolNumValueGlitchesIrig(HWPSolQA):
""" The number of value glitches in IRIG data."""
_influx_field = "num_value_glitches_irig"
_needs_encoder = True
def _gen_value(self, meta):
return meta.hwp_solution[f"num_value_glitches_irig_{self._encoder}"]
class HWPSolDeadRotations(HWPSolQA):
""" The number of dead rotations that failed to correct glitches."""
_influx_field = "num_dead_rots"
_needs_encoder = True
def _gen_value(self, meta):
return meta.hwp_solution[f"num_dead_rots_{self._encoder}"]
class HWPSolDroppedSlits(HWPSolQA):
""" The number of dropped slits."""
_influx_field = "num_dropped_slits"
_needs_encoder = True
def _gen_value(self, meta):
return meta.hwp_solution[f"num_dropped_slits_{self._encoder}"]