Source code for sotodlib.coords.det_match

import warnings
from dataclasses import dataclass, fields, asdict, field
from typing import List, Optional, Tuple, Iterator
from copy import deepcopy
import h5py

import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import linear_sum_assignment

from sotodlib.coords import optics
from sotodlib.core import metadata
from sotodlib.io.metadata import write_dataset



[docs] def map_band_chans(b1, c1, b2, c2, chans_per_band=512): """ Returns an index mapping of length nchans1 from one set of bands and channels to another. Note that unmapped indices are returned as -1, so this must be handled before (or after) indexing or else you'll get weird results. Args: b1 (np.ndarray): Array of length nchans1 containing the smurf band of each channel c1 (np.ndarray): Array of length nchans1 containing the smurf channel of each channel b2 (np.ndarray): Array of length nchans2 containing the smurf band of each channel c2 (np.ndarray): Array of length nchans2 containing the smurf channel of each channel chans_per_band (int): Lets just hope this never changes. """ acs1 = b1 * chans_per_band + c1 acs2 = b2 * chans_per_band + c2 mapping = np.full_like(acs1, -1, dtype=int) for i, ac in enumerate(acs1): x = np.where(acs2 == ac)[0] if len(x > 0): mapping[i] = x[0] return mapping
[docs] def get_north_is_highband(bands, bgs): """ Checks if north is highband based on bgmapping. This will tell you if the majority of dets on the north side of the ufm (bgs 0-5) belong to highband (bands 4-7). """ highband = (bands > 3).astype(int) * 2 - 1 # 1 if highband, -1 if not north = (bgs < 6).astype(int) * 2 - 1 # 1 if north, -1 if not north[bgs == -1] = 0 return np.mean(highband * north) > 0
[docs] @dataclass class PointingConfig: """ Helper class for getting pointing info from an optics model. Args ----- fp_file: str Path to focal-plane file that is used by the optics module. wafer_slot: str Wafer slot of the UFM. For example: "ws0" tel_type: str Tel type for the optics model. Either "SAT" or "LAT" zemax_path: str If running for a "LAT" tel_type, the path to the zemax file must be specified. """ fp_file: str wafer_slot: str tel_type: str zemax_path: Optional[str] = None dx: float = field(init=False) dy: float = field(init=False) theta: float = field(init=False) fp_pars: dict = field(init=False) def __post_init__(self): if self.tel_type == 'LAT' and (self.zemax_path is None): return ValueError("zemax path must be set for 'LAT' tel_type") if self.tel_type not in ['SAT', 'LAT']: raise ValueError("tel_typ ") self.fp_pars = optics.get_ufm_to_fp_pars( self.tel_type, self.wafer_slot, self.fp_file ) self.dx = self.fp_pars['dx'] self.dy = self.fp_pars['dy'] self.theta = np.deg2rad(self.fp_pars['theta'])
[docs] def get_pointing(self, x, y, pol=0): xp = x * np.cos(self.theta) - y * np.sin(self.theta) + self.dx yp = x * np.sin(self.theta) + y * np.cos(self.theta) + self.dy if self.tel_type.upper() == 'SAT': xi, eta, gamma = optics.SAT_focal_plane( None, x=xp, y=yp, pol=pol ) elif self.tel_type.upper() == 'LAT': xi, eta, gamma = optics.LAT_focal_plane( None, zemax_path, x=xp, y=yp, pol=pol, ) return xi, eta, gamma
[docs] @dataclass class Resonator: """ Data structure to hold any resonator information. """ idx: int is_north: int res_freq: float res_qi: float = np.nan smurf_res_idx: int = -1 smurf_band: int = -1 smurf_channel: int = -1 smurf_subband: int = -1 readout_id: str = '' xi: float = np.nan eta: float = np.nan gamma: float = np.nan bg: int = -1 det_x: float = np.nan det_y: float = np.nan det_row: int = 0 det_col: int = 0 pixel_num: int = 0 det_rhomb: str = '' det_pol: str = '' det_freq: int = 0 det_bandpass: str = '' det_angle_raw_deg: float = np.nan det_angle_actual_deg: float = np.nan det_type: str = '' det_id: str = 'NO_MATCH' is_optical: int = 1 mux_bondpad: int = 0 mux_subband: str = '' mux_band: int = -1 mux_channel: int = -1 mux_layout_pos: int = -1 matched: int = 0 match_idx: int = -1
[docs] def apply_design_properties(smurf_res, design_res, in_place=False, apply_pointing=True): """ Combines two resonators into one, taking smurf-properties such as res-idx, smurf-band, and smurf-channel one, and design properties such as det position and polarization from the other. Args: smurf_res (Resonator): The resonator to take smurf properties from design_res (Resonator): The resonator to take design properties from in_place (bool): If True, the src_res will be modified. Otherwise, a new Resonator will be created and returned. """ if in_place: r = smurf_res else: r = deepcopy(smurf_res) design_props = [ 'bg', 'det_x', 'det_y', 'det_row', 'det_col', 'pixel_num', 'det_rhomb', 'det_pol', 'det_freq', 'det_bandpass', 'det_angle_raw_deg', 'det_angle_actual_deg', 'det_type', 'det_id', 'is_optical', 'mux_bondpad', 'mux_subband', 'mux_band', 'mux_channel', 'mux_layout_pos' ] if apply_pointing: design_props += ['xi', 'eta', 'gamma'] for prop in design_props: setattr(r, prop, getattr(design_res, prop)) return r
[docs] class ResSet: """ Class to hold a group of resonances. This provides easy interfaces for accessing Resonance fields as np arrays for fast computations and provides initialization functions from different data sources. """ def __init__(self, resonances: List[Resonator], name=None): self.resonances: List[Resonator] = resonances self.name = name def __iter__(self): yield from self.resonances def __getitem__(self, k): return self.resonances[k] def __len__(self): return len(self.resonances)
[docs] @classmethod def from_array(cls, arr, name=None, ignore_extra_fields=True): """ Creates a ResSet from a numpy structured array (resulting from ``as_array`` method). Args ----- arr: np.ndarray Structured ResSet array name: str Name for the res-set ignore_extra_fields: bool If True, this will ignore any fields from the array that are not in the Resonator dataclass. This may happen if loading an older saved array, where the resonator fields are not the same. """ resonators = [] names = arr.dtype.names field_names = [f.name for f in fields(Resonator)] for a in arr: # Remove any fields that are not in the Resonator class if # ignore_extra_fields is True. kw = dict(zip(names, a)) for name in names: if name in field_names: continue if ignore_extra_fields: del kw[name] else: raise ValueError(f"Field '{name}' not in Resonator class.") resonators.append(Resonator(**kw)) return cls(resonators, name=name)
[docs] def as_array(self): """ Returns resonance data in the form of a numpy structured array. """ dtype = [] data = [] for field in fields(Resonator): if field.type == str: typ = '<U50' else: typ = field.type dtype.append((field.name, typ)) for r in self.resonances: data.append(tuple(getattr(r, name) for name, _ in dtype)) return np.array(data, dtype=dtype)
[docs] @classmethod def from_aman(cls, aman, stream_id, det_cal=None, name=None): """ Load a resonator set from a Context object based on an obs_id Args ---------- aman: AxisManager Axis manager containing metadata stream_id: str Stream id for ResSet to load det_cal: AxisManager Detector calibration metadata. If not specified, will default to ``aman.det_cal`` """ m = aman.det_info.stream_id == stream_id if not np.any(m): raise ValueError(f"No channels with stream_id {stream_id} in obs") if det_cal is None: det_cal = aman.det_cal north_is_highband = get_north_is_highband( aman.det_info.smurf.band[m], det_cal.bg[m] ) resonators = [] for i, ri in enumerate(np.where(m)[0]): band, channel = aman.det_info.smurf.band[ri], aman.det_info.smurf.channel[ri] is_north = north_is_highband ^ (band < 4) readout_id = aman.det_info.readout_id[ri] bg = det_cal.bg[ri] res_freq=aman.det_info.smurf.frequency[ri] if res_freq >= 6000: res_freq -= 2000 res = Resonator( idx=i, is_north=is_north, res_freq=res_freq, smurf_band=band, smurf_channel=channel, readout_id=readout_id, bg=bg ) resonators.append(res) return cls(resonators, name=name)
[docs] @classmethod def from_tunefile(cls, tunefile, name=None, north_is_highband=True, resfit_file=None, bgmap_file=None): """ Creates an instance based on a smurf-tune file. If a resfit or bgmap file is included, that data will be added to the Resonance objects as well. Args: tunefile (str): Path to Pysmurf tunefile name (str): Name to label this ResSet north_is_highband (bool): True if the north-side of the array corresponds to bands 4-7 resfit_file (str): Path to file containing resonance fit data bgmap_file (str): Path to file containing bgmap data """ tune = np.load(tunefile, allow_pickle=True).item() resonances = [] idx = 0 for band, _v in tune.items(): if 'resonances' not in _v: continue for res_idx, d in _v['resonances'].items(): is_north = north_is_highband ^ (band < 4) res = Resonator( idx=idx, smurf_res_idx=res_idx, res_freq=d['freq'], smurf_band=band, is_north=is_north ) if d['channel'] != -1: res.smurf_channel = d['channel'] if res.smurf_band >= 4: res.res_freq -= 2000 resonances.append(res) idx += 1 rs = cls(resonances, name=name) if resfit_file is not None: rs.add_resfit_data(resfit_file) if bgmap_file is not None: rs.add_bgmap_data(bgmap_file) return rs
[docs] @classmethod def from_wafer_info_file(cls, wafer_info_file, array_name, name=None, pt_cfg: Optional[PointingConfig]=None): """ Initialize a ResSet from a wafer info file. This is a file that contains detector design information. Args ----- wafer_info_file: str Path to wafer info file array_name: str Array name, which is the key in the wafer-info-file. For example: "mv7". name: str Name to assign to the ResSet pt_cfg: PointingConfig If set, this will be used to get pointing info based on the optics model. If not set, pointing info will not be included in the ResSet. """ with h5py.File(wafer_info_file) as f: wafer_array = np.array(f[array_name]) resonators = [] idx = 0 for r in wafer_array: is_north = r['dets:wafer.coax'] == b'N' res = Resonator( idx=idx, det_id=r['dets:det_id'].decode(), mux_bondpad=r['dets:wafer.bond_pad'], mux_band=r['dets:wafer.mux_band'], mux_channel=r['dets:wafer.mux_channel'], mux_subband=r['dets:wafer.mux_subband'], mux_layout_pos=r['dets:wafer.mux_position'], res_freq=r['dets:wafer.design_freq_mhz'], bg=r['dets:wafer.bias_line'], det_pol=r['dets:wafer.pol'], det_bandpass=r['dets:wafer.bandpass'], det_row=r['dets:wafer.det_row'], det_col=r['dets:wafer.det_col'], det_rhomb=r['dets:wafer.rhombus'], det_type=r['dets:wafer.type'], det_x=r['dets:wafer.x'], det_y=r['dets:wafer.y'], det_angle_actual_deg=r['dets:wafer.angle'], is_north=is_north ) if pt_cfg is not None: res.xi, res.eta, res.gamma = pt_cfg.get_pointing(res.det_x, res.det_y) resonators.append(res) idx += 1 return cls(resonators, name=name)
[docs] @classmethod def from_solutions(cls, sol_file, north_is_highband=True, name=None, fp_pars=None, platform='SAT', zemax_path=None): """ Creates an instance from an input-solution file. This will include both design data, along with smurf-band and smurf-channel info. Resonance frequencies used here are the VNA freqs measured by Kaiwen. Args: sol_file (str): Path to solutions file name (str): Name to label this ResSet north_is_highband (bool): True if the north-side of the array corresponds to bands 4-8 fp_pars (dict): Result of the function ``sotododlib.coords.optics.get_ufm_to_fp_pars``. If this is None, detector positions will not be mapped to pointing angles. platform (str): 'SAT' or 'LAT'. Used to determine which focal plane function to use for pointing zemax_path (str): zemax path, required to get pointing for LAT optics """ resonances = [] if fp_pars is not None: theta = np.deg2rad(fp_pars['theta']) dx, dy = fp_pars['dx'], fp_pars['dy'] with open(sol_file, 'r') as f: labels = f.readline().split(',') # Read header for i in range(len(labels)): labels[i] = labels[i].strip() #Helper needed for when sol file has saved ints as floats def _int(val, null_val=None): is_null = (val == 'null' or val == '') if is_null and null_val is not None: return null_val return int(float(val)) i = 0 for line in f.readlines(): d = dict(zip(labels, line.split(','))) if not d['bias_line']: # Skip incomplete lines continue # is_north = north_is_highband ^ (_int(d['smurf_band']) < 4) # is_north = d['is_north'].lower().strip() == 'true' is_north = _int(d['bias_line']) < 6 is_optical = (d['is_optical'].lower() == 'true') r = Resonator( i, res_freq=float(d['freq_mhz']), smurf_band=_int(d['smurf_band']), bg=_int(d['bias_line']), det_x=float(d['det_x']), det_y=float(d['det_y']), det_rhomb=d['rhomb'], det_row=_int(d['det_row']), det_col=_int(d['det_col']), pixel_num=_int(d['pixel_num'], null_val=-1), det_type=d['det_type'], det_id=d['detector_id'].strip(), mux_band=_int(d['mux_band']), mux_channel=_int(d['mux_channel']), mux_subband=d['mux_subband'], mux_bondpad=d['bond_pad'], det_angle_raw_deg=float(d['angle_raw_deg']), det_angle_actual_deg=float(d['angle_actual_deg']), mux_layout_pos=_int(d['mux_layout_position']), det_bandpass=d['bandpass'], det_pol=d['pol'], is_optical=is_optical, is_north=is_north ) if _int(d['smurf_channel']) != -1: r.smurf_channel = _int(d['smurf_channel']) if fp_pars is not None: x, y = float(d['det_x']), float(d['det_y']) xp = x * np.cos(theta) - y * np.sin(theta) + dx yp = x * np.sin(theta) + y * np.cos(theta) + dy if platform.upper() == 'SAT': xi, eta, gamma = optics.SAT_focal_plane( None, x=xp, y=yp, pol=0 ) elif platform.upper() == 'LAT': xi, eta, gamma = optics.LAT_focal_plane( None, zemax_path, x=xp, y=yp, pol=0, ) else: raise ValueError( f"Unknown platform {platform}. Must be 'SAT' or 'LAT'" ) r.xi = xi r.eta = eta r.gamma = gamma resonances.append(r) i += 1 return cls(resonances, name=name)
[docs] def add_resfit_data(self, resfit_file): """ Adds resonator quality data from a res_fit file. """ resfits = np.load(resfit_file, allow_pickle=True).item() for r in self.resonances: d = resfits[r.smurf_band][r.smurf_res_idx] r.res_qi = d['derived_params']['Qi']
[docs] def add_bgmap_data(self, bgmap_file): """ Adds bias group data from an sodetlib bgmap file. """ bgmap = np.load(bgmap_file, allow_pickle=True).item() bands = np.array([res.smurf_band for res in self.resonances], dtype=int) chans = np.array([res.smurf_channel for res in self.resonances], dtype=int) idxmap = map_band_chans(bands, chans, bgmap['bands'], bgmap['channels']) for i, r in enumerate(self.resonances): idx = idxmap[i] if idx == -1: continue bg = bgmap['bgmap'][idx] if bg != -1: r.bg = bg
[docs] def add_pointing(self, bands, chans, xis, etas): """ Adds measured detector pointing to resonators. """ bs = np.array([res.smurf_band for res in self.resonances], dtype=int) cs = np.array([res.smurf_channel for res in self.resonances], dtype=int) idxmap = map_band_chans(bs, cs, bands, chans) for i, r in enumerate(self.resonances): idx = idxmap[i] if idx == -1: continue r.xi = xis[idx] r.eta = etas[idx]
[docs] @dataclass class MatchParams: """ Any constants / hardcoded values go here to be fiddled with Args: unassigned_slots (int): Number of additional "unassigned" node to use per-side freq_offset_mhz (float): constant offset between resonator-set frequencies to use in matching. freq_width (float): width of exponential to use in the frequency cost function (MHz). dist_width (float) width of exponential to use in the pointing cost function (rad) unmatched_good_res_pen (float): penalty to apply to leaving a resonator with a good qi unassigned good_res_qi_thresh (float): qi threshold that is considered "good" force_src_pointing (bool): If true, will assign a np.inf penalty to leaving a src resonator with a provided pointing unmatched. assigned_bg_unmatched_pen (float): Penalty to apply to leaving a resonator with an assigned bg unmatched unassigned_bg_unmatched_pen (float): Penalty to apply to leaving a resonator with an unassigned bg unmatched assigned_bg_mismatch_pen (float): Penalty to apply for matching a resonator with an assigned bias line to another one with a mis-matched bias line. unassigned_bg_mismatch_pen (float): Penalty to apply for matching a resonator with no bias line to another one with a mis-matched bias line. """ unassigned_slots: int = 1000 freq_offset_mhz: float = 0.0 freq_width: float = 2. dist_width: float =0.01 unmatched_good_res_pen: float = 10. good_res_qi_thresh: float = 100e3 force_src_pointing: bool = False assigned_bg_unmatched_pen: float = 100000 unassigned_bg_unmatched_pen: float = 10000 assigned_bg_mismatch_pen: float = 100000 unassigned_bg_mismatch_pen: float = 1
[docs] @dataclass class MatchingStats: unmatched_src: int = 0 unmatched_dst: int = 0 unmatched_src_with_pointing: int = 0 matched_chans: int = 0 mismatched_bg: int = 0 freq_diff_avg: float = 0.0 freq_err_avg: float = 0.0 pointing_err_avg: float = 0.0
[docs] class Match: """ Class for performing a Resonance Matching between two sets of resonators, labeled `src` and `dst`. In the matching algorithm there is basically no difference between `src` and `dst` res-sets, except: - When merged, smurf-data such as band, channel, and res-idx will be taken from the ``src`` res-set - The ``force_src_pointing`` param can be used to assign a very high penalty to leaving any `src` resonator that has pointing info unassigned. Args: src (ResSet): The source resonator set dst (ResSet): The dest resonator set match_pars (MatchParams): MatchParams object used in the matching algorithm. This can be used to tune the cost-function and matching. apply_dst_pointing (bool): If True, the ``merged`` res-set will take its pointing information from ``dst`` instead of ``src``. Attributes: src (ResSet): The source resonator set dst (ResSet): The dest resonator set match_pars (MatchParams): MatchParams object used in the matching algorithm. This can be used to tune the cost-function and matching. matching (np.ndarray): A 2xN array of indices, where the first row corresponds to the indices of the src resonators, and the second row corresponds to the indices of the dst resonators. If the index is larger than the size of the corresponding res-set, that means the paired resonator was not matched. merged (ResSet): A ResSet containing the merged resonators. This is created by applying the "design" properties of the dst resonators to the source resonators. If a source resonator is not matched to any dest resonator, it will be copied as-is. stats (MatchingStats): A MatchingStats object containing some statistics about the matching. """ def __init__(self, src: ResSet, dst: ResSet, match_pars: Optional[MatchParams]=None, apply_dst_pointing=True): self.src = src self.dst = dst if match_pars is None: self.match_pars = MatchParams() else: self.match_pars = match_pars self.apply_dst_pointing = apply_dst_pointing self.matching_cost = np.nan self.matching, self.merged = self._match() self.stats = self.get_stats() def _get_biadjacency_matrix(self): src_arr = self.src.as_array() dst_arr = self.dst.as_array() mat = np.zeros((len(self.src), len(self.dst)), dtype=float) # N/S mismatch m = src_arr['is_north'][:, None] != dst_arr['is_north'][None, :] mat[m] = np.inf # Frequency offset df = src_arr['res_freq'][:, None] - dst_arr['res_freq'][None, :] df -= self.match_pars.freq_offset_mhz mat += np.exp((np.abs(df / self.match_pars.freq_width)) ** 2) # BG mismatch bgs_mismatch = src_arr['bg'][:, None] != dst_arr['bg'][None, :] bgs_unassigned = (src_arr['bg'][:, None] == 1) | (dst_arr['bg'][None, :] == -1) m = bgs_mismatch & bgs_unassigned mat[m] += self.match_pars.unassigned_bg_mismatch_pen m = bgs_mismatch & (~bgs_unassigned) mat[m] += self.match_pars.assigned_bg_mismatch_pen # If pointing, add cost if assigned too far dd = np.sqrt( (src_arr['xi'][:, None] - dst_arr['xi'][None, :])**2 \ + (src_arr['eta'][:, None] - dst_arr['eta'][None, :])**2) m = ~np.isnan(dd) mat[m] += np.exp((np.abs(dd[m] / self.match_pars.dist_width)) ** 2) # Any remaining nans should be set to info so matrix is still solvable mat[np.isnan(mat)] = np.inf return mat def _get_unassigned_costs(self, rs, force_if_pointing=True): ra = rs.as_array() arr = np.zeros(len(rs)) # Additional cost for leaving good resonator unassigned is_good_res = ra['res_qi'] > self.match_pars.good_res_qi_thresh is_good_res[np.isnan(is_good_res)] = 0 arr[is_good_res] += self.match_pars.unmatched_good_res_pen bg_assigned = ra['bg'] != -1 arr[bg_assigned] += self.match_pars.assigned_bg_unmatched_pen arr[~bg_assigned] += self.match_pars.unassigned_bg_unmatched_pen # Infinite cost if has pointing if force_if_pointing: m = ~np.isnan(ra['xi']) arr[m] = np.inf return arr def _match(self): nside = max(len(self.src), len(self.dst)) + self.match_pars.unassigned_slots # Keep this square so all resonators are included in final matching mat_full = np.zeros((nside, nside), dtype=float) with warnings.catch_warnings(): warnings.filterwarnings( action='ignore', message='overflow encountered in exp*', category=RuntimeWarning ) mat_full[:len(self.src), :len(self.dst)] = self._get_biadjacency_matrix() mat_full[:len(self.src), len(self.dst):] = \ self._get_unassigned_costs( self.src, force_if_pointing=self.match_pars.force_src_pointing )[:, None] mat_full[len(self.src):, :len(self.dst)] = \ self._get_unassigned_costs(self.dst, force_if_pointing=False)[None, :] mat_full[len(self.src):, len(self.dst):] = 0 self.matching = np.array(linear_sum_assignment(mat_full)) self.matching_cost = mat_full[self.matching[0], self.matching[1]].sum() for r1, r2 in self.get_match_iter(include_unmatched=True): if r1 is None: r2.matched = 0 continue if r2 is None: r1.matched = 0 continue r1.matched = 1 r1.match_idx = r2.idx r2.matched = 1 r2.match_idx = r1.idx resonances = [] for r1 in self.src: if r1.matched: r2 = self.dst[r1.match_idx] r = apply_design_properties( r1, r2, in_place=False, apply_pointing=self.apply_dst_pointing ) else: r = deepcopy(r1) resonances.append(r) self.merged = ResSet(resonances, name='merged') return self.matching, self.merged
[docs] def get_match_iter(self, include_unmatched=True) \ -> Iterator[Tuple[Optional[Resonator], Optional[Resonator]]]: """ Returns an iterator over matched resonators (r1, r2). Args: include_unmatched (bool): If True, will include unmatched resonators, with the pair set to None. """ for i, j in zip(*self.matching): if i < len(self.src): r1 = self.src[i] else: r1 = None if j < len(self.dst): r2 = self.dst[j] else: r2 = None if (r1 is None) and (r2 is None): continue if (r1 is None) or (r2 is None): if not include_unmatched: continue yield r1, r2
[docs] def get_stats(self) -> MatchingStats: """ Gets stats associated with current matching. """ stats = MatchingStats() src_arr = self.src.as_array() dst_arr = self.dst.as_array() stats.unmatched_src = np.sum(~src_arr['matched'].astype(bool)) stats.unmatched_dst = np.sum(~dst_arr['matched'].astype(bool)) has_pt = ~np.isnan(src_arr['xi']) stats.unmatched_src_with_pointing = np.sum( (~src_arr['matched'].astype(bool)) & has_pt ) dangs = [] for r1, r2 in self.get_match_iter(include_unmatched=False): stats.matched_chans += 1 if r1.bg != r2.bg: stats.mismatched_bg += 1 stats.freq_diff_avg += r1.res_freq - r2.res_freq stats.freq_err_avg += np.abs( r1.res_freq - r2.res_freq - self.match_pars.freq_offset_mhz ) dangs.append( np.sqrt((r1.xi - r2.xi)**2 + (r1.eta - r2.eta)**2) ) stats.pointing_err_avg = np.nan if (~np.isnan(dangs)).any(): stats.pointing_err_avg = np.nanmean(dangs) stats.freq_diff_avg /= stats.matched_chans stats.freq_diff_avg = float(stats.freq_diff_avg) stats.freq_err_avg /= stats.matched_chans stats.freq_err_avg = float(stats.freq_err_avg) return stats
[docs] def save(self, path): """Saves match to HDF5 file""" with h5py.File(path, 'w') as fout: fout.create_group('meta') fout.create_group('meta/match_pars') for k, v in asdict(self.match_pars).items(): fout['meta/match_pars'][k] = v if self.src.name is not None: fout['meta/src_name'] = self.src.name if self.dst.name is not None: fout['meta/dst_name'] = self.dst.name write_dataset( metadata.ResultSet.from_friend(self.src.as_array()), fout, 'src') write_dataset( metadata.ResultSet.from_friend(self.dst.as_array()), fout, 'dst') write_dataset(np.array(self.matching), fout, 'matching') write_dataset( metadata.ResultSet.from_friend(self.merged.as_array()), fout, 'merged')
[docs] @classmethod def load(cls, path): """ Loads a match from a h5 file (resulting from ``save`` method) """ with h5py.File(path) as f: src = ResSet.from_array(np.array(f['src'])) if 'meta/src_name' in f: src.name = f['meta/src_name'][()].decode() dst = ResSet.from_array(np.array(f['dst'])) if 'meta/dst_name' in f: dst.name = f['meta/dst_name'][()].decode() match_pars = {} for k in f['meta/match_pars'].keys(): match_pars[k] = f['meta/match_pars'][k][()] match_pars = MatchParams(**match_pars) match = cls(src, dst, match_pars=match_pars) return match
[docs] def plot_match_freqs(m: Match, is_north=True, show_offset=False, xlim=None): """ Plots src and dst ResSet freqs in a match, along with their matching assignments. """ fig, ax = plt.subplots(figsize=(10, 6)) if not show_offset: offset = m.match_pars.freq_offset_mhz else: offset = 0 yw= 0.4 def plot_res(ax, r: Resonator, y=0, offset=0): if r is None: return if r.is_north != is_north: return xs = [r.res_freq+offset, r.res_freq+offset] ys = [y - yw, y+yw] if r.bg == -1: c = 'grey' else: c = f'C{r.bg}' ax.plot(xs, ys, c=c) for r1, r2 in m.get_match_iter(): plot_res(ax, r1, y=0) plot_res(ax, r2, y=1, offset=offset) for r1, r2 in m.get_match_iter(include_unmatched=False): if r1.is_north != is_north: continue ax.plot([r1.res_freq, r2.res_freq+offset], [yw, 1-yw], c='grey', alpha=.3) ax.set_yticks([yw/2, 1-yw/2]) labels = ['ResSet1', 'ResSet2'] for i, rs in enumerate([m.src, m.dst]): if rs.name is not None: labels[i] = rs.name ax.set_yticklabels(labels, fontsize=14, rotation=90, va='center') ax.set_ylim(0, 1) if xlim is not None: ax.set_xlim(*xlim) ax.set_xlabel("Resonance Freq (MHz)", fontsize=14) ax.set_title(f"Freq Matching (north={is_north})") return fig, ax
[docs] def plot_match_pointing(match: Match, show_pairs=True): fig, ax = plt.subplots(figsize=(10, 6)) sa, da = match.src.as_array(), match.dst.as_array() plt.plot(np.rad2deg(da['xi']), np.rad2deg(da['eta']), '.') plt.plot(np.rad2deg(sa['xi']), np.rad2deg(sa['eta']), '.') if show_pairs: for r1, r2 in match.get_match_iter(include_unmatched=False): plt.plot( *np.rad2deg(np.array([[r1.xi, r2.xi], [r1.eta, r2.eta]])), color='red', alpha=.2 ) ax.set_xlabel("Xi (deg)") ax.set_ylabel("Eta (deg)") return fig, ax