Source code for sotodlib.site_pipeline.update_det_match

import h5py
import numpy as np
import os
import yaml
from copy import deepcopy
from typing import Optional, Dict, List
from dataclasses import dataclass
from tqdm.auto import tqdm
import logging
import argparse
import time

from sotodlib.coords import det_match, optics
from sotodlib.coords.det_match_solutions import SolutionsCfg, solve_all
from sotodlib import core
from sotodlib.core.metadata import ManifestDb
from sotodlib.io.metadata import write_dataset


logger = logging.getLogger("update_det_match")
if len(logger.handlers) == 0:
    fmt = logging.Formatter("%(asctime)s %(levelname)s %(name)s : %(message)s")
    handler = logging.StreamHandler()
    handler.setFormatter(fmt)
    logger.addHandler(handler)
logger.setLevel(logging.INFO)


def get_detset_time(detset: str) -> float:
    """
    Gets timestamp associated with a detset. Will parse this from the detset
    name, assuming it is of the form <stream_id>_<time>_tune.
    """
    return float(detset.split('_')[-2])


[docs] @dataclass class UpdateDetMatchesConfig: """ Configuration for update script Args ------ results_path: str Path to directory where results such as matches, manifestdbs, and h5 files will be stored. context_path: str Path to context file. This must contain detset and det_cal metadata. site_pipeline_root: str Path to root of site-pipeline-configs. If ``$SITE_PIPELINE_CONFIG_DIR`` is set in the environment, that will be used as the default. wafer_map_path: str Path to wafer-map to be used to find det-match solution files. If not specified, defaults to ``<site_pipeline_root>/shared/detmapping/wafer_map.yaml``. freq_offset_range_args: Optional[Tuple[float, float, float]] If this is not None, for each match, we will scan over a range of freq-offsets to determine the optimal offset to use. If set, must contain a tuple of floats, containing ([start,] stop, [step,]) that will be passed directly to ``np.arange``. If it is None, will just run with the match with freq_offset_mhz=0. match_pars: Optional[Dict] If not None, will be passed directly to ``det_match.MatchParams`` that is used by the det-match function. detset_meta_name: str Name of the metadata entry in the context that contains detset info. detcal_meta_name: str Name of the metadata entry in the context that contains det_cal info. ufms: Optional[List[str]] List of ufm names to run update_det_match on. Will run on all ufms for which detsets exist if None. show_pb: bool Will show progress bar when scanning freq-offset. apply_solution_pointing: bool If True, pointing information computed from design-detector positions will be used in the ``merged`` detset of the match. write_relpath: bool If True, will use the relative path to the h5 file (relative to the db path) when writing to the manifestdb solution_type: str Type of solutions to use. Must be one of ['kaiwen_handmade', 'resonator_set']. If 'kaiwen_handmade', will use the handmade solutions from Kaiwen pulled from the wafer_map file in the site-pipeline-configs. If `resonator_set`, must also specify the ``resonator_set_dir`` to pull solutions from. resonator_set_dir: Optional[str] If ``solution_type`` is 'resonator_set', this must be specified and contain the path to the resonator-set solutions. This directory must have a res-set npy file for each stream_id that is expected in the matching, formatted like ``<resonator_set_dir>/<stream_id>.npy``, which contains the result from ``np.save(fname, match.merged.as_array())``. time_before_cache_failure: float Time in seconds before a failed detset will be added to the cache. This is to prevent new detsets still acquiring data from being added right away. Attributes ------------- freq_offsets : Optional[np.ndarray] If not None, contains freq_offsets determined by ``freq_offset_range_args`` which will be scanned over. """ results_path: str context_path: str site_pipeline_root:str = os.environ.get('SITE_PIPELINE_CONFIG_DIR') wafer_map_path: Optional[str] = None freq_offset_range_args: Optional[tuple[float, float, float]] = (-4, 4, 0.3) match_pars: Optional[Dict] = None detset_meta_name : str = 'smurf' detcal_meta_name: str = 'det_cal' ufms: Optional[List[str]] = None show_pb: bool = False apply_solution_pointing: bool = True write_relpath: bool = True solution_type: str = 'kaiwen_handmade' resonator_set_dir: Optional[str] = None time_before_cache_failure: float = float(3600 * 24 * 7) start_time: int = 0 stop_time: int = 2**32 def __post_init__(self): if self.site_pipeline_root is None: raise ValueError("Must set site_pipeline_root, or SITE_PIPELINE_CONFIG_DIR env var") if self.wafer_map_path is None: self.wafer_map_path = os.path.join( self.site_pipeline_root, 'shared/detmapping/wafer_map.yaml') if self.freq_offset_range_args is not None: self.freq_offsets = np.arange(*self.freq_offset_range_args) else: self.freq_offsets = None if self.match_pars is None: self.match_pars = {} if not os.path.exists(self.results_path): raise FileNotFoundError(f"Results dir does not exist: {self.results_path}") allowed_solution_types = ['kaiwen_handmade', 'resonator_set'] if self.solution_type not in allowed_solution_types: raise ValueError( f"Solution type ({self.solution_type}) must be a member of: {allowed_solution_types}") if self.solution_type == 'resonator_set': if self.resonator_set_dir is None: raise ValueError("Must specify resonator_set_dir for solution_type='resonator_set'") if self.ufms is not None: self.ufms = [u.lower() for u in self.ufms]
class Runner: def __init__(self, cfg: UpdateDetMatchesConfig): self.cfg = cfg self.ctx = core.Context(cfg.context_path) self.detset_db = None self.detcal_db = None with open(self.cfg.wafer_map_path, 'r') as f: self.wafer_map = yaml.safe_load(f) self.ufm_to_fp_file = os.path.join( cfg.site_pipeline_root, 'shared/focalplane/ufm_to_fp.yaml') for d in self.ctx['metadata']: if 'name' in d: entry_name = d['name'] elif 'label' in d: entry_name = d['label'] else: continue if entry_name == cfg.detset_meta_name: self.detset_db = core.metadata.ManifestDb(d['db']) elif entry_name == cfg.detcal_meta_name: self.detcal_db = core.metadata.ManifestDb(d['db']) if self.detset_db is None: raise Exception( f"Could not find detset metadata entry with name: {cfg.detset_meta_name}") if self.detcal_db is None: raise Exception( f"Could not find detcal metadata entry with name: {cfg.detcal_meta_name}") self.failed_detset_cache_path = os.path.join(cfg.results_path, 'failed_detsets.yaml') self.match_dir = os.path.join(cfg.results_path, 'matches') if not os.path.exists(self.match_dir): os.mkdir(self.match_dir) def get_remaining_detsets(self) -> List[str]: detsets_all = set(self.detset_db.get_entries(['dataset'])['dataset']) if self.cfg.ufms: detsets_all = set([d for d in detsets_all if '_'.join(d.split('_')[:2]) in self.cfg.ufms]) ts = [get_detset_time(d) for d in detsets_all] detsets_all = set([d for t, d in zip(ts, detsets_all) if t > self.cfg.start_time and t < self.cfg.stop_time]) failed_detsets = set(get_failed_detsets(self.failed_detset_cache_path)) finished_detsets = set([os.path.splitext(f)[0] for f in os.listdir(self.match_dir)]) remaining_detsets = list(detsets_all - failed_detsets - finished_detsets) return remaining_detsets def load_solution_set(runner: Runner, stream_id: str, wafer_slot=None): cfg = runner.cfg if cfg.solution_type == 'kaiwen_handmade': sol_file = os.path.join( os.path.dirname(runner.cfg.wafer_map_path), runner.wafer_map[stream_id]['solution'] ) teltype = runner.wafer_map[stream_id]['tel_type'] if wafer_slot is None: # Pull from detmapping cfg wafer_slot = runner.wafer_map[stream_id]['wafer_slot'] fp_pars = optics.get_ufm_to_fp_pars(teltype, wafer_slot, runner.ufm_to_fp_file) rs = det_match.ResSet.from_solutions(sol_file, fp_pars=fp_pars, platform=teltype) rs.name = 'sol' return rs elif cfg.solution_type == 'resonator_set': sol_file = os.path.join(cfg.resonator_set_dir, f"{stream_id}.npy") rs_arr = np.load(sol_file) rs = det_match.ResSet.from_array(rs_arr) rs.name = 'sol' return rs def add_to_failed_cache(cache_file, detset, msg, cfg: UpdateDetMatchesConfig): if time.time() - get_detset_time(detset) < cfg.time_before_cache_failure: logger.info(f"{detset} is too recent to add to failed cache") return if os.path.exists(cache_file): with open(cache_file, 'r') as f: x = yaml.safe_load(f) else: x = {} x[str(detset)] = str(msg) with open(cache_file, 'w') as f: yaml.dump(x, f) def get_failed_detsets(cache_file): if not os.path.exists(cache_file): return [] with open(cache_file, 'r') as f: x = yaml.safe_load(f) return list(x.keys()) def run_match_aman(runner: Runner, aman, detset, wafer_slot=None): stream_id = aman.det_info.stream_id[aman.det_info.detset == detset][0] rs0 = det_match.ResSet.from_aman(aman, stream_id) rs0.name = 'meas' rs1 = load_solution_set(runner, stream_id, wafer_slot=wafer_slot) rs1.name = 'sol' match_pars = det_match.MatchParams(**runner.cfg.match_pars) freq_offsets = runner.cfg.freq_offsets if freq_offsets is not None: costs, opt_freq = scan_for_freq_offset( rs0, rs1, freq_offsets, show_pb=runner.cfg.show_pb, match_pars=match_pars, ) match_pars.freq_offset_mhz = opt_freq match = det_match.Match(rs0, rs1, match_pars=match_pars, apply_dst_pointing=runner.cfg.apply_solution_pointing) return match def run_match(runner: Runner, detset: str) -> bool: """ Creates match files for specified detset, along with any other unmatched detsets in the loaded observation. If match fails for a known reason, this will add it to the failed_detset_cache so that it is not re-attempted. Returns ------- success: bool True if match was successful """ # Find obs-id with cal info obs_all = set(runner.ctx.obsdb.query("type=='obs'")['obs_id']) cur = runner.ctx.obsfiledb.conn.execute(f""" select distinct obs_id from files where detset=='{detset}' """) obsids_with_cal = set(runner.detcal_db.get_entries(['dataset'])['dataset']) obs_dset = {r[0] for r in cur} obs_ids = sorted(list( obs_all.intersection(obs_dset).intersection(obsids_with_cal)), key=lambda s:s.split('_')[1])[::-1] if len(obs_ids) == 0: add_to_failed_cache( runner.failed_detset_cache_path, detset, "NO_OBSID_WITH_CAL", runner.cfg ) logger.error(f"Cannot find obsid for detset {detset}") return False obs_id = obs_ids[0] book_dir = os.path.dirname(runner.ctx.obsfiledb.get_files(obs_id)[detset][0][0]) book_idx_file = os.path.join(book_dir, 'M_index.yaml') with open(book_idx_file, 'r') as f: book_idx = yaml.safe_load(f) aman = runner.ctx.get_meta(obs_id, ignore_missing=True) finished_detsets = set([os.path.splitext(f)[0] for f in os.listdir(runner.match_dir)]) new_detsets = [] for detset in np.unique(aman.det_info.detset): if ( detset not in finished_detsets and get_detset_time(detset) > runner.cfg.start_time and get_detset_time(detset) < runner.cfg.stop_time ): new_detsets.append(detset) logger.info(f"Loaded obs_id {obs_id}. Running matches for detsets:") for ds in new_detsets: logger.info(f" - {ds}") for ds in new_detsets: stream_id = aman.det_info.stream_id[aman.det_info.detset == ds][0] # Try to get wafer slot info from book idx if 'wafer_slots' in book_idx: for ws in book_idx['wafer_slots']: if ws['stream_id'] == stream_id: wafer_slot = ws['wafer_slot'] break else: logger.error( f"Could not find wafer_slot from book index for ds={detset}, " f"obs_id={obs_id}" ) raise Exception("Could not find wafer-slot") else: wafer_slot = None match = run_match_aman(runner, aman, ds, wafer_slot=wafer_slot) fpath = os.path.join(runner.match_dir, f"{ds}.h5") match.save(fpath) logger.info(f"Saved match to file: {fpath}") return True def scan_for_freq_offset(rs0, rs1, freq_offsets, match_pars=None, show_pb=True): """ Scans through a list of frequency offsets to find optimal match between two res-sets. Returns ---------- costs : np.ndarray Costs of size ``len(freq_offsets)`` containing the matching cost at each frequency opt_freq : float Optimal offset-frequency for match. """ if match_pars is None: match_pars = det_match.MatchParams() else: match_pars = deepcopy(match_pars) freq_offsets = np.array(freq_offsets) rs0 = deepcopy(rs0) rs1 = deepcopy(rs1) costs = np.full_like(freq_offsets, np.nan) for i, offset in enumerate(tqdm(freq_offsets, disable=(not show_pb))): match_pars.freq_offset_mhz = offset match = det_match.Match(rs0, rs1, match_pars=match_pars) costs[i] = match.matching_cost imin = np.argmin(costs) opt_freq = freq_offsets[imin] logger.info(f"Found freq offset: {opt_freq}") return costs, opt_freq def update_manifests(runner: Runner, detset): det_match_idx = os.path.join(runner.cfg.results_path, 'det_match.sqlite') det_match_h5 = os.path.join(runner.cfg.results_path, 'det_match.h5') assignment_idx = os.path.join(runner.cfg.results_path, 'assignment.sqlite') assignment_h5 = os.path.join(runner.cfg.results_path, 'assignment.h5') match_file = os.path.join(runner.cfg.results_path, f'matches/{detset}.h5') with h5py.File(match_file, 'r') as f: ra = np.array(f['merged']) names = list(ra.dtype.names) names[names.index('readout_id')] = 'dets:readout_id' names[names.index('det_id')] = 'dets:det_id' ra.dtype.names = tuple(names) assignment = ra[['dets:readout_id', 'dets:det_id']] def add_to_db(arr, db_path, h5_path, detset, write_relpath=True): write_dataset(core.metadata.ResultSet.from_friend(arr), h5_path, detset, overwrite=True) if not os.path.exists(db_path): scheme = core.metadata.ManifestScheme() scheme.add_exact_match('dets:detset') scheme.add_data_field('dataset') db = core.metadata.ManifestDb(db_path, scheme=scheme) if write_relpath: h5_path = os.path.relpath(h5_path, start=os.path.dirname(db_path)) db = core.metadata.ManifestDb(db_path) if detset not in db.get_entries(['dataset'])['dataset']: db.add_entry({ 'dets:detset': detset, 'dataset': detset }, h5_path) else: logger.warning(f"Dataset {detset} already exists in db: {db_path}") write_relpath = runner.cfg.write_relpath add_to_db(ra, det_match_idx, det_match_h5, detset, write_relpath=write_relpath) add_to_db(assignment, assignment_idx, assignment_h5, detset, write_relpath=write_relpath) def update_manifests_all(runner): det_match_idx = os.path.join(runner.cfg.results_path, 'det_match.sqlite') assignment_idx = os.path.join(runner.cfg.results_path, 'assignment.sqlite') if os.path.exists(det_match_idx): det_match_db = ManifestDb(det_match_idx) det_match_detsets = det_match_db.get_entries(['dataset'])['dataset'] else: det_match_detsets = [] if os.path.exists(assignment_idx): assignment_db = ManifestDb(assignment_idx) assignment_detsets = assignment_db.get_entries(['dataset'])['dataset'] else: assignment_detsets = [] indexed_detsets = set(det_match_detsets).intersection(set(assignment_detsets)) completed_detsets = set([os.path.splitext(f)[0] for f in os.listdir(runner.match_dir)]) for ds in (completed_detsets - indexed_detsets): logger.info(f"Adding {ds} to manifests") update_manifests(runner, ds) def get_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() parser.add_argument('config_file', type=str, help='path to config file') parser.add_argument('--run-all', action='store_true', help='run all detsets') parser.add_argument('--solutions-mode', action='store_true', help='create a det_match solution set') return parser def main(config_file: str, run_all: bool=False, solutions_mode: bool=False): """ Site-pipeline script to peform detector matching of tunesets to a solution set. ------ config_file: str Yaml configuration file for tuneset matching (or solution set matching if solutions_mode is True). run_all: bool Run all detsets if True, otherwise exit after first successful match. solutions_mode: bool Create a det_match solution instead of matching a solution to a tuneset. Requires a solutions configuration file. """ if solutions_mode: with open(config_file, "r") as f: cfg = SolutionsCfg(**yaml.safe_load(f)) solve_all(cfg) else: with open(config_file, 'r') as f: cfg = UpdateDetMatchesConfig(**yaml.safe_load(f)) runner = Runner(cfg) remaining_detsets = runner.get_remaining_detsets() logger.info(f"{len(remaining_detsets)} detsets to match") if run_all: update_manifests_all(runner) for detset in remaining_detsets: logger.info(f"running: {detset}") run_match(runner, detset) update_manifests_all(runner) else: for detset in remaining_detsets: success = run_match(runner, detset) update_manifests_all(runner) if success: break if __name__ == '__main__': parser = get_parser() args = parser.parse_args() main(args.config_file, run_all=args.run_all, solutions_mode=args.solutions_mode)