Source code for sotodlib.site_pipeline.update_smurf_caldbs

""" 
Script to import tuning and readout id channel mapping, and detector calibration
information into manifest dbs for book loading.  At present this just works in
the configuration where it has access to both level 2 and level 3 indexing. This
is technically possible with just level 3 data / indexing but requires some
still non-existant tools. 

Configuration file required::

    config = {
        'archive': {
            'detset': {
                'root_dir': '/path/to/detset/root',
                'index': 'detset.sqlite',
                'h5file': 'detset.h5',
                'context': 'context.yaml',
                'write_relpath': True
            },
            'det_cal': {
                'root_dir': '/path/to/det_cal/root',
                'index': 'det_cal.sqlite',
                'h5file': 'det_cal.h5',
                'context': 'context.yaml',
                'failed_obsid_cache': 'failed_obsids.yaml',
                'write_relpath': True
            },
        },
        'g3tsmurf': 'g3tsmurf_hwp_config.yaml',
        'imprinter': 'imprinter.yaml',
    }
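
Example invocation (the config filename here is illustrative; the flags are
defined in ``get_parser`` below)::

    python -m sotodlib.site_pipeline.update_smurf_caldbs --config config.yaml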
"""

import traceback
import os
import argparse
import yaml
from dataclasses import dataclass, astuple, fields
import numpy as np
from tqdm.auto import tqdm
import logging

from typing import Optional, Union

from sotodlib import core
from sotodlib.io.metadata import write_dataset
from sotodlib.io.load_smurf import G3tSmurf, TuneSets
from sotodlib.io.load_book import load_smurf_npy_data, get_cal_obsids
from sotodlib.io.imprinter import Imprinter
import sotodlib.site_pipeline.util as sp_util

# Stolen from pysmurf: max bias voltage / num_bits
DEFAULT_RTM_BIT_TO_VOLT = 10 / 2**19
DEFAULT_pA_per_phi0 = 9e6
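# These defaults are only used when neither the IV nor the bias-step metadata
# provides rtm_bit_to_volt / pA_per_phi0 (see get_cal_resset below).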

logger = logging.getLogger('smurf_caldbs')
if not logger.hasHandlers():
    sp_util.init_logger('smurf_caldbs')



def main(config: Union[str, dict],
         overwrite: Optional[bool] = False,
         skip_detset: bool = False, skip_detcal: bool = False):
    if not skip_detset:
        smurf_detset_info(config, overwrite)
    if not skip_detcal:
        run_update_det_caldb(config, overwrite=overwrite)

def smurf_detset_info(config: Union[str, dict],
                      overwrite: Optional[bool] = False):
    """Write out the updates for the manifest database with information about
    the readout ids present inside each detset.
    """
    if isinstance(config, str):
        with open(config, 'r') as f:
            config = yaml.safe_load(f)

    h5_path = config['archive']['detset']['h5file']
    idx_path = config['archive']['detset']['index']
    root_dir = config['archive']['detset'].get('root_dir')
    if root_dir is not None:
        h5_path = os.path.join(root_dir, h5_path)
        idx_path = os.path.join(root_dir, idx_path)
    h5_relpath = os.path.relpath(h5_path, start=os.path.dirname(idx_path))
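    # Storing the path of the h5 archive relative to the index (see
    # write_relpath below) lets the two files be relocated together.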

    config['g3tsmurf'] = os.path.expandvars(config['g3tsmurf'])
    config['imprinter'] = os.path.expandvars(config['imprinter'])

    SMURF = G3tSmurf.from_configs(config['g3tsmurf'])
    session = SMURF.Session()

    imprinter = Imprinter(config['imprinter'])
    ctx = core.Context(config['archive']['detset']['context'])
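
    # Create the detset ManifestDb with a 'dets:detset' -> 'dataset' scheme if
    # no index exists yet; otherwise open the existing one.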

    if not os.path.exists(idx_path):
        scheme = core.metadata.ManifestScheme()
        scheme.add_exact_match('dets:detset')
        scheme.add_data_field('dataset')
        db = core.metadata.ManifestDb(idx_path, scheme=scheme)
    else:
        db = core.metadata.ManifestDb(idx_path)

    keys = [
        "dets:readout_id",
        "dets:smurf.band",
        "dets:smurf.channel",
        "dets:smurf.frequency",
        "dets:stream_id",
        "dets:wafer_slot",
        "dets:tel_tube",
    ]

    stream_maps = {}
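    # Map each stream_id (slot) to its (wafer_slot, tel_tube) pair from the
    # imprinter tube configuration; slot order within a tube sets the ws index.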
    for tube in imprinter.tubes:
        for s, slot in enumerate(imprinter.tubes[tube]['slots']):
            stream_maps[slot] = (f'ws{s}', tube)

    c = ctx.obsfiledb.conn.execute('select distinct name from detsets')
    ctx_detsets = [r[0] for r in c]
    added_detsets = db.get_entries(['dataset'])['dataset']
    detsets = session.query(TuneSets).all()

    for ts in detsets:
        if ts.name not in ctx_detsets:
            logger.debug(f"{ts.name} not in obsfiledb, ignoring")
            continue
        if ts.name in added_detsets and not overwrite:
            continue
        
        det_rs = core.metadata.ResultSet(keys=keys)
        for channel in ts.channels:
            det_rs.append({
                'dets:readout_id': channel.name,
                'dets:smurf.band': channel.band,
                'dets:smurf.channel': channel.channel,
                'dets:smurf.frequency': channel.frequency,
                'dets:stream_id': ts.stream_id,
                'dets:wafer_slot': stream_maps[ts.stream_id][0],
                'dets:tel_tube': stream_maps[ts.stream_id][1],
            })
        write_dataset(
            det_rs, h5_path, ts.name,
            overwrite=overwrite,
        )
        # add new entries to database
        write_relpath = config['archive']['detset'].get('write_relpath', True)
        if ts.name not in added_detsets:
            db_data = {'dets:detset': ts.name,
                       'dataset': ts.name}
            path = h5_relpath if write_relpath else h5_path
            db.add_entry(db_data, filename=path)


@dataclass
class CalInfo:
    """
    Class that contains detector calibration information that will go into
    the caldb.

    Attributes
    ------------
    readout_id: str
        Readout id of the detector
    r_tes: float
        Detector resistance [Ohms], determined through bias steps while the
        detector is biased
    r_frac: float
        Fractional resistance of the TES, given by r_tes / r_n
    p_bias: float
        Bias power on the TES [J] computed using bias steps at the bias point
    s_i: float
        Current responsivity of the TES [1/V] computed using bias steps at
        the bias point
    phase_to_pW: float
        Phase to power conversion factor [pW/rad] computed using s_i,
        pA_per_phi0, and the detector polarity
    v_bias: float
        Commanded bias voltage [V] on the bias line of the detector for the
        observation
    tau_eff: float
        Effective thermal time constant [sec] of the detector, measured from
        bias steps
    bg: int
        Bias group of the detector. Taken from IV curve data, which contains
        bgmap data taken immediately prior to the IV. This will be -1 if the
        detector is unassigned
    polarity: int
        Polarity of the detector response for a positive change in bias
        current while the detector is superconducting. This is needed to
        correct for detectors that have a reversed response.
    r_n: float
        Normal resistance of the TES [Ohms] calculated from IV curve data
    p_sat: float
        "Saturation power" of the TES [J] calculated from IV curve data.
        This is defined as the electrical bias power at which the TES
        resistance is 90% of the normal resistance.
    """
    readout_id: str = ''

    # From bias steps
    r_tes: float = np.nan        # Ohm
    r_frac: float = np.nan
    p_bias: float = np.nan       # J
    s_i: float = np.nan          # 1/V
    phase_to_pW: float = np.nan  # pW/rad
    v_bias: float = np.nan       # V
    tau_eff: float = np.nan      # sec

    # From IV
    bg: int = -1
    polarity: int = 1
    r_n: float = np.nan
    p_sat: float = np.nan        # J

    @classmethod
    def dtype(cls):
        """Returns the ResultSet dtype for an item based on this class"""
        dtype = []
        for field in fields(cls):
            if field.name == 'readout_id':
                dt = ('dets:readout_id', '<U40')
            else:
                dt = (field.name, field.type)
            dtype.append(dt)
        return dtype
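

# Note: get_cal_resset() below packs CalInfo rows into a structured numpy
# array using this dtype, roughly as follows (the readout id shown is a
# placeholder):
#
#     cals = [CalInfo('<readout_id>')]
#     arr = np.array([astuple(c) for c in cals], dtype=CalInfo.dtype())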


@dataclass
class CalResult:
    success: bool = False
    fail_msg: str = ''
    resset: Optional[core.metadata.ResultSet] = None


def get_cal_resset(ctx: core.Context, obs_id) -> CalResult:
    """
    Returns the calibration ResultSet for a given obs_id. This pulls IV and
    bias step data for each detset in the observation, and uses that to
    compute CalInfo for each detector in the observation.
    """
    am = ctx.get_obs(
        obs_id, samples=(0, 1), ignore_missing=True, no_signal=True,
        on_missing={'det_cal': 'skip'}
    )
    cals = [CalInfo(rid) for rid in am.det_info.readout_id]
    if len(cals) == 0:
        return CalResult(success=False,
                         fail_msg="No detectors present in observation.")
    if 'smurf' not in am.det_info:
        return CalResult(success=False,
                         fail_msg="Missing smurf info for observation")

    iv_obsids = get_cal_obsids(ctx, obs_id, 'iv')

    rtm_bit_to_volt = None
    pA_per_phi0 = None
    ivas = {dset: None for dset in iv_obsids}
    for dset, oid in iv_obsids.items():
        if oid is not None:
            ivas[dset] = load_smurf_npy_data(ctx, oid, 'iv')
            if rtm_bit_to_volt is None:
                rtm_bit_to_volt = ivas[dset]['meta']['rtm_bit_to_volt']
                pA_per_phi0 = ivas[dset]['meta']['pA_per_phi0']
        else:
            logger.debug("missing IV data for %s", dset)

    bias_step_obsids = get_cal_obsids(ctx, obs_id, 'bias_steps')
    bsas = {dset: None for dset in bias_step_obsids}
    for dset, oid in bias_step_obsids.items():
        if oid is not None:
            bsas[dset] = load_smurf_npy_data(ctx, oid, 'bias_step_analysis')
            if rtm_bit_to_volt is None:
                rtm_bit_to_volt = bsas[dset]['meta']['rtm_bit_to_volt']
                pA_per_phi0 = bsas[dset]['meta']['pA_per_phi0']
        else:
            logger.debug("missing bias step data for %s", dset)

    if rtm_bit_to_volt is None:
        rtm_bit_to_volt = DEFAULT_RTM_BIT_TO_VOLT
    if pA_per_phi0 is None:
        pA_per_phi0 = DEFAULT_pA_per_phi0

    # Add IV info
    for i, cal in enumerate(cals):
        band = am.det_info.smurf.band[i]
        chan = am.det_info.smurf.channel[i]
        detset = am.det_info.detset[i]
        iva = ivas[detset]
        if iva is None:
            # No IV analysis for this detset
            continue
        ridx = np.where(
            (iva['bands'] == band) & (iva['channels'] == chan)
        )[0]
        if len(ridx) == 0:
            # Channel doesn't exist in IV analysis
            continue
        ridx = ridx[0]
        cal.bg = iva['bgmap'][ridx]
        cal.polarity = iva['polarity'][ridx]
        cal.r_n = iva['R_n'][ridx]
        cal.p_sat = iva['p_sat'][ridx]

    obs_biases = dict(zip(am.bias_lines.vals,
                          am.biases[:, 0] * 2 * rtm_bit_to_volt))
    bias_line_is_valid = {k: True for k in obs_biases.keys()}

    # Check to see if biases have changed between bias steps and obs
    for bsa in bsas.values():
        if bsa is None:
            continue
        for bg, vb_bsa in enumerate(bsa['Vbias']):
            bl_label = f"{bsa['meta']['stream_id']}_b{bg:0>2}"
            if np.isnan(vb_bsa):
                bias_line_is_valid[bl_label] = False
                continue
            if np.abs(vb_bsa - obs_biases[bl_label]) > 0.1:
                logger.debug(
                    "bias step and obs biases don't match for %s", bl_label)
                bias_line_is_valid[bl_label] = False

    # Add bias step info
    for i, cal in enumerate(cals):
        band = am.det_info.smurf.band[i]
        chan = am.det_info.smurf.channel[i]
        detset = am.det_info.detset[i]
        stream_id = am.det_info.stream_id[i]
        bg = cal.bg
        bsa = bsas[detset]

        if bsa is None or bg == -1:
            continue

        bl_label = f'{stream_id}_b{bg:0>2}'
        if not bias_line_is_valid[bl_label]:
            continue

        ridx = np.where(
            (bsa['bands'] == band) & (bsa['channels'] == chan)
        )[0]
        if len(ridx) == 0:
            # Channel doesn't exist in bias step analysis
            continue
        ridx = ridx[0]

        cal.r_tes = bsa['R0'][ridx]
        cal.r_frac = bsa['Rfrac'][ridx]
        cal.p_bias = bsa['Pj'][ridx]
        cal.s_i = bsa['Si'][ridx]
        cal.tau_eff = bsa['tau_eff'][ridx]
        cal.v_bias = bsa['Vbias'][bg]
        # pA_per_phi0 / (2 pi) converts phase [rad] to TES current [pA];
        # dividing by s_i [1/V] then yields power [pW].
        cal.phase_to_pW = pA_per_phi0 / (2 * np.pi) / cal.s_i * cal.polarity

    rset = core.metadata.ResultSet.from_friend(np.array(
        [astuple(c) for c in cals], dtype=CalInfo.dtype()
    ))
    return CalResult(success=True, resset=rset)


def get_obs_with_detsets(ctx, detset_idx):
    """Gets all observations with type 'obs' that have detset data"""
    db = core.metadata.ManifestDb(detset_idx)
    detsets = db.get_entries(['dataset'])['dataset']
    obs_ids = set()
    for dset in detsets:
        obs_ids = obs_ids.union(ctx.obsfiledb.get_obs_with_detset(dset))
    return obs_ids


def add_to_failed_cache(obs_id, cache_path, msg):
    logger.info(f"Adding {obs_id} to failed obsid cache with msg: {msg}")
    if os.path.exists(cache_path):
        with open(cache_path, 'r') as f:
            cache = yaml.safe_load(f)
    else:
        cache = {}
    cache[str(obs_id)] = msg
    with open(cache_path, 'w') as f:
        yaml.safe_dump(cache, f)


def get_failed_obsids(cache_path):
    if cache_path is None:
        return set()
    if not os.path.exists(cache_path):
        return set()
    with open(cache_path, 'r') as f:
        cache = yaml.safe_load(f)
    return set(cache.keys())


def update_det_caldb(ctx, idx_path, detset_idx, h5_path, show_pb=False,
                     format_exc=False, overwrite=False,
                     failed_obsid_cache=None, root_dir=None,
                     write_relpath=True, run_single=False):
    """
    Updates the detector caldb with new observations. This will find
    calibration data for all observations that have detset data and are not
    in the failed_obsid_cache, if specified.

    Args
    -----
    ctx: core.Context
        Context object; must contain detset metadata
    idx_path: str
        Path to the det_cal sqlite manifest db
    detset_idx: str
        Path to the detset manifestdb
    h5_path: str
        Path to the h5 file to which result sets are written.
    show_pb: bool
        If True, will show a progress bar and time remaining
    format_exc: bool
        If True, will log the full traceback whenever an exception is thrown.
    overwrite: bool
        If True, will overwrite existing entries in the h5 file.
    failed_obsid_cache: str
        Path to store failed obs-ids to avoid re-running them. If None, will
        not use a failed obsid cache and will attempt to add calibration info
        for all observations with detset data that are not in the det_cal
        manifest.
    root_dir: str
        Root directory of the det_cal dbs. If set, ``idx_path``, ``h5_path``,
        and ``failed_obsid_cache`` are interpreted relative to the root_dir.
    write_relpath: bool
        If True, when adding entries to the manifestdb, will use the h5 path
        relative to the idx_path.
    run_single: bool
        If True, will stop after writing a single entry to the det-cal db
    """
    if root_dir is not None:
        h5_path = os.path.join(root_dir, h5_path)
        idx_path = os.path.join(root_dir, idx_path)
        if failed_obsid_cache is not None:
            failed_obsid_cache = os.path.join(root_dir, failed_obsid_cache)
    h5_relpath = os.path.relpath(h5_path, start=os.path.dirname(idx_path))

    if not os.path.exists(idx_path):
        scheme = core.metadata.ManifestScheme()
        scheme.add_exact_match('obs:obs_id')
        scheme.add_data_field('dataset')
        db = core.metadata.ManifestDb(scheme=scheme)
        db.to_file(idx_path)
    db = core.metadata.ManifestDb(idx_path)

    obsids_with_detsets = get_obs_with_detsets(ctx, detset_idx)
    failed_obsids = get_failed_obsids(failed_obsid_cache)
    obsids_with_obs = set(
        ctx.obsdb.query("type=='obs'")['obs_id']) - failed_obsids
    remaining_obsids = obsids_with_detsets.intersection(obsids_with_obs)
    if not overwrite:
        existing_obsids = set(db.get_entries(['dataset'])['dataset'])
        remaining_obsids = remaining_obsids - existing_obsids

    # Sort remaining obs_ids by timestamp
    remaining_obsids = sorted(remaining_obsids,
                              key=(lambda s: s.split('_')[1]))
    logger.info("%d datasets to add", len(remaining_obsids))

    for obs_id in tqdm(remaining_obsids, disable=(not show_pb)):
        try:
            result = get_cal_resset(ctx, obs_id)
            if result.success:
                logger.info("Writing metadata for %s", obs_id)
                write_dataset(result.resset, h5_path, obs_id,
                              overwrite=overwrite)
                path = h5_relpath if write_relpath else h5_path
                db.add_entry({
                    'obs:obs_id': obs_id,
                    'dataset': obs_id,
                }, filename=path, replace=overwrite)
            else:
                logger.error("Failed on %s: %s", obs_id, result.fail_msg)
                if failed_obsid_cache is not None:
                    add_to_failed_cache(obs_id, failed_obsid_cache,
                                        result.fail_msg)
        except Exception as e:
            logger.error("Failed on %s: %s", obs_id, e)
            if format_exc:
                logger.error(traceback.format_exc())

        if run_single:
            break


def run_update_det_caldb(config_path, overwrite=False):
    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)

    format_exc = config['archive']['det_cal'].get('format_exc', False)

    detset_idx = config['archive']['detset']['index']
    detset_root_path = config['archive']['detset'].get('root_dir')
    if detset_root_path is not None:
        detset_idx = os.path.join(detset_root_path, detset_idx)

    ctx = core.Context(config['archive']['det_cal']['context'])
    update_det_caldb(
        ctx,
        config['archive']['det_cal']['index'],
        detset_idx,
        config['archive']['det_cal']['h5file'],
        show_pb=config['archive']['det_cal'].get('show_pb', False),
        format_exc=format_exc,
        overwrite=overwrite,
        failed_obsid_cache=config['archive']['det_cal'].get(
            'failed_obsid_cache'),
        root_dir=config['archive']['det_cal'].get('root_dir'),
        write_relpath=config['archive']['det_cal'].get('write_relpath', True),
        run_single=config['archive']['det_cal'].get('run_single', False),
    )


def get_parser(parser=None):
    if parser is None:
        parser = argparse.ArgumentParser()
    parser.add_argument('--config', type=str, help="configuration file")
    parser.add_argument('--skip-detset', action='store_true',
                        help="Skip detset update")
    parser.add_argument('--skip-detcal', action='store_true',
                        help="Skip detcal update")
    parser.add_argument('--overwrite', action='store_true',
                        help="Overwrite existing entries")
    return parser


if __name__ == "__main__":
    parser = get_parser(parser=None)
    args = parser.parse_args()
    main(**vars(args))