import numpy as np
from operator import attrgetter
import copy
import warnings
import re
from so3g.proj import Ranges, RangesMatrix
import sotodlib.core as core
import sotodlib.tod_ops as tod_ops
import sotodlib.obs_ops as obs_ops
from sotodlib.hwp import hwp, hwp_angle_model
import sotodlib.coords.planets as planets
from sotodlib.core.flagman import (has_any_cuts, has_all_cut,
count_cuts, flag_cut_select,
sparse_to_ranges_matrix)
from sotodlib.preprocess import preprocess_util as pp_util
from .pcore import _Preprocess, _FracFlaggedMixIn
from .. import flag_utils
from ..core import AxisManager
logger = pp_util.init_logger("preprocess")
[docs]
class FFTTrim(_Preprocess):
"""Trim the AxisManager to optimize for faster FFTs later in the pipeline.
All processing configs go to `fft_trim`
.. autofunction:: sotodlib.tod_ops.fft_trim
"""
name = "fft_trim"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
start_stop = tod_ops.fft_trim(aman, **self.process_cfgs)
proc_aman.restrict(self.process_cfgs.get('axis', 'samps'), (start_stop))
return aman, proc_aman
[docs]
class Detrend(_Preprocess):
"""Detrend the signal. All processing configs go to `detrend_tod`
.. autofunction:: sotodlib.tod_ops.detrend_tod
"""
name = "detrend"
def __init__(self, step_cfgs):
self.signal = step_cfgs.get('signal', 'signal')
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
tod_ops.detrend_tod(aman, signal_name=self.signal,
**self.process_cfgs)
return aman, proc_aman
class DetBiasFlags(_FracFlaggedMixIn, _Preprocess):
"""
Derive poorly biased detectors from IV and Bias Step data. Save results
in proc_aman under the "det_bias_cuts" field.
.. autofunction:: sotodlib.tod_ops.flags.get_det_bias_flags
"""
name = "det_bias_flags"
_influx_field = "det_bias_flags_frac"
def __init__(self, step_cfgs):
self.save_name = "det_bias_flags"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
dbc_aman = tod_ops.flags.get_det_bias_flags(aman, merge=False, full_output=True,
**self.calc_cfgs)
self.save(proc_aman, dbc_aman)
return aman, proc_aman
def save(self, proc_aman, dbc_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, dbc_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
keep = ~has_all_cut(proc_aman.det_bias_flags.det_bias_flags)
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
def plot(self, aman, proc_aman, filename):
if self.plot_cfgs is None:
return
if self.plot_cfgs:
from .preprocess_plot import plot_det_bias_flags
filename = filename.replace('{ctime}', f'{str(aman.timestamps[0])[:5]}')
filename = filename.replace('{obsid}', aman.obs_info.obs_id)
det = aman.dets.vals[0]
ufm = det.split('_')[2]
plot_det_bias_flags(aman, proc_aman['det_bias_flags'], rfrac_range=self.calc_cfgs['rfrac_range'],
psat_range=self.calc_cfgs['psat_range'], filename=filename.replace('{name}', f'{ufm}_bias_cuts_venn'))
[docs]
class Trends(_FracFlaggedMixIn, _Preprocess):
"""Calculate the trends in the data to look for unlocked detectors. All
calculation configs go to `get_trending_flags`.
Saves results in proc_aman under the "trend" field.
Data selection can have key "kind" equal to "any" or "all."
Example config block::
- name : "trends"
signal: "signal" # optional
calc:
max_trend: 2.5
t_piece: 100
save: True
plot: True
select:
kind: "any"
.. autofunction:: sotodlib.tod_ops.flags.get_trending_flags
"""
name = "trends"
_influx_field = "trend_flags_frac"
def __init__(self, step_cfgs):
self.signal = step_cfgs.get('signal', 'signal')
self.save_name = "trends"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
_, trend_aman = tod_ops.flags.get_trending_flags(
aman, merge=False, full_output=True,
signal=aman[self.signal], **self.calc_cfgs)
aman.wrap("trends", trend_aman)
self.save(proc_aman, trend_aman)
return aman, proc_aman
def save(self, proc_aman, trend_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, trend_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
if self.select_cfgs["kind"] == "any":
keep = ~has_any_cuts(proc_aman.trends.trend_flags)
elif self.select_cfgs["kind"] == "all":
keep = ~has_all_cut(proc_aman.trends.trend_flags)
else:
raise ValueError(f"Entry '{self.select_cfgs['kind']}' not"
"understood. Expect 'any' or 'all'")
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
def plot(self, aman, proc_aman, filename):
if self.plot_cfgs is None:
return
if self.plot_cfgs:
from .preprocess_plot import plot_trending_flags
filename = filename.replace('{ctime}', f'{str(aman.timestamps[0])[:5]}')
filename = filename.replace('{obsid}', aman.obs_info.obs_id)
det = aman.dets.vals[0]
ufm = det.split('_')[2]
plot_trending_flags(aman, proc_aman['trends'], filename=filename.replace('{name}', f'{ufm}_trending_flags'))
[docs]
class GlitchDetection(_FracFlaggedMixIn, _Preprocess):
"""Run glitch detection algorithm to find glitches. All calculation configs
go to `get_glitch_flags`
Saves retsults in proc_aman under the "glitches" field.
Data section should define a glitch significant "sig_glitch" and a maximum
number of glitches "max_n_glitch."
Example configuration block::
- name: "glitches"
glitch_name: "my_glitches"
calc:
signal_name: "hwpss_remove"
t_glitch: 0.00001
buffer: 10
hp_fc: 1
n_sig: 10
subscan: False
save: True
plot:
plot_ds_factor: 50
select:
max_n_glitch: 10
sig_glitch: 10
.. autofunction:: sotodlib.tod_ops.flags.get_glitch_flags
"""
name = "glitches"
_influx_field = "glitch_flags_frac"
def __init__(self, step_cfgs):
self.glitch_name = step_cfgs.get('glitch_name', 'glitches')
self.save_name = self.glitch_name
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
_, glitch_aman = tod_ops.flags.get_glitch_flags(aman,
merge=False, full_output=True, **self.calc_cfgs
)
aman.wrap(self.glitch_name, glitch_aman)
self.save(proc_aman, glitch_aman)
if self.calc_cfgs.get('save_plot', False):
flag_utils.plot_glitch_stats(aman, save_path=self.calc_cfgs['save_plot'])
return aman, proc_aman
def save(self, proc_aman, glitch_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, glitch_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
flag = sparse_to_ranges_matrix(
proc_aman[self.glitch_name].glitch_detection > self.select_cfgs["sig_glitch"]
)
n_cut = count_cuts(flag)
keep = n_cut <= self.select_cfgs["max_n_glitch"]
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
def plot(self, aman, proc_aman, filename):
if self.plot_cfgs is None:
return
if self.plot_cfgs:
from .preprocess_plot import plot_signal_diff, plot_flag_stats
filename = filename.replace('{ctime}', f'{str(aman.timestamps[0])[:5]}')
filename = filename.replace('{obsid}', aman.obs_info.obs_id)
det = aman.dets.vals[0]
ufm = det.split('_')[2]
plot_signal_diff(aman, proc_aman[self.glitch_name], flag_type='glitches', flag_threshold=self.select_cfgs.get("max_n_glitch", 10),
plot_ds_factor=self.plot_cfgs.get("plot_ds_factor", 50), filename=filename.replace('{name}', f'{ufm}_glitch_signal_diff'))
plot_flag_stats(aman, proc_aman[self.glitch_name], flag_type='glitches', filename=filename.replace('{name}', f'{ufm}_glitch_stats'))
[docs]
class FixJumps(_Preprocess):
"""
Repairs the jump heights given a set of jump flags and heights.
Example config block::
- name: "fix_jumps"
signal: "signal" # optional
process:
jumps_aman: "jumps_2pi"
.. autofunction:: sotodlib.tod_ops.jumps.jumpfix_subtract_heights
"""
name = "fix_jumps"
def __init__(self, step_cfgs):
self.signal = step_cfgs.get('signal', 'signal')
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
field = self.process_cfgs['jumps_aman']
aman[self.signal] = tod_ops.jumps.jumpfix_subtract_heights(
aman[self.signal], proc_aman[field].jump_flag.mask(),
inplace=True, heights=proc_aman[field].jump_heights)
return aman, proc_aman
[docs]
class Jumps(_FracFlaggedMixIn, _Preprocess):
"""Run generic jump finding and fixing algorithm.
calc_cfgs should have 'function' defined as one of
'find_jumps', 'twopi_jumps' or 'slow_jumps'. Any additional configs to the
jump function goes in 'jump_configs'.
Saves results in proc_aman under the "jumps" field.
Data section should define a maximum number of jumps "max_n_jumps".
Example config block::
- name: "jumps"
calc:
function: "twopi_jumps"
save:
jumps_name: "jumps_2pi"
plot:
plot_ds_factor: 50
select:
max_n_jumps: 5
.. autofunction:: sotodlib.tod_ops.jumps.find_jumps
"""
name = "jumps"
_influx_field = "jump_flags_frac"
def __init__(self, step_cfgs):
self.signal = step_cfgs.get('signal', 'signal')
self.save_name = 'jumps' if isinstance(step_cfgs.get('save'), bool) else step_cfgs.get('save', {}).get('jumps_name', 'jumps')
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
function = self.calc_cfgs.get("function", "find_jumps")
cfgs = self.calc_cfgs.get('jump_configs', {})
if function == 'find_jumps':
func = tod_ops.jumps.find_jumps
elif function == 'twopi_jumps':
func = tod_ops.jumps.twopi_jumps
elif function == 'slow_jumps':
func = tod_ops.jumps.slow_jumps
else:
raise ValueError("function must be 'find_jumps', 'twopi_jumps' or"
f"'slow_jumps'. Received {function}")
jumps, heights = func(aman, merge=False, fix=False,
signal=aman[self.signal], **cfgs)
jump_aman = tod_ops.jumps.jumps_aman(aman, jumps, heights)
self.save(proc_aman, jump_aman)
return aman, proc_aman
def save(self, proc_aman, jump_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, jump_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
name = self.save_cfgs.get('jumps_name', 'jumps')
n_cut = count_cuts(proc_aman[name].jump_flag)
keep = n_cut <= self.select_cfgs["max_n_jumps"]
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
return keep
def plot(self, aman, proc_aman, filename):
if self.plot_cfgs is None:
return
if self.plot_cfgs:
from .preprocess_plot import plot_signal_diff, plot_flag_stats
filename = filename.replace('{ctime}', f'{str(aman.timestamps[0])[:5]}')
filename = filename.replace('{obsid}', aman.obs_info.obs_id)
det = aman.dets.vals[0]
ufm = det.split('_')[2]
name = self.save_cfgs.get('jumps_name', 'jumps')
plot_signal_diff(aman, proc_aman[name], flag_type='jumps', flag_threshold=self.select_cfgs.get("max_n_jumps", 5),
plot_ds_factor=self.plot_cfgs.get("plot_ds_factor", 50), filename=filename.replace('{name}', f'{ufm}_jump_signal_diff'))
plot_flag_stats(aman, proc_aman[name], flag_type='jumps', filename=filename.replace('{name}', f'{ufm}_jumps_stats'))
[docs]
class PSDCalc(_Preprocess):
""" Calculate the PSD of the data and add it to the AxisManager under the
"psd" field.
Note: noverlap = 0 amd full_output = True are recommended to get unbiased
median white noise estimation by Noise.
Example config block::
- "name : "psd"
"signal: "signal" # optional
"wrap": "psd" # optional
"process":
"nperseg": 1024 # optional
"noverlap": 0 # optional
"wrap_name": "psd" # optional
"subscan": False # optional
"full_output": True # optional
.. autofunction:: sotodlib.tod_ops.fft_ops.calc_psd
"""
name = "psd"
def __init__(self, step_cfgs):
self.signal = step_cfgs.get('signal', 'signal')
self.wrap = step_cfgs.get('wrap', 'psd')
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
full_output = self.process_cfgs.get('full_output')
if full_output:
freqs, Pxx, nseg = tod_ops.fft_ops.calc_psd(aman, signal=aman[self.signal],
**self.process_cfgs)
else:
freqs, Pxx = tod_ops.fft_ops.calc_psd(aman, signal=aman[self.signal],
**self.process_cfgs)
fft_aman = core.AxisManager(aman.dets,
core.OffsetAxis("nusamps", len(freqs)))
pxx_axis_map = [(0, "dets"), (1, "nusamps")]
if self.process_cfgs.get('subscan', False):
fft_aman.wrap("Pxx_ss", Pxx, pxx_axis_map+[(2, aman.subscans)])
Pxx = np.nanmean(Pxx, axis=-1) # Mean of subscans
if full_output:
fft_aman.wrap("nseg_ss", nseg, [(0, aman.subscans)])
nseg = np.nansum(nseg)
fft_aman.wrap("freqs", freqs, [(0,"nusamps")])
fft_aman.wrap("Pxx", Pxx, pxx_axis_map)
if full_output:
fft_aman.wrap("nseg", nseg)
aman.wrap(self.wrap, fft_aman)
return aman, proc_aman
def calc_and_save(self, aman, proc_aman):
if "frequency_cutoffs" in proc_aman:
proc_aman["frequency_cutoffs"].wrap(self.wrap, proc_aman["frequency_cutoffs"][self.signal])
return aman, proc_aman
def plot(self, aman, proc_aman, filename):
if self.plot_cfgs is None:
return
if self.plot_cfgs:
from .preprocess_plot import plot_psd
filename = filename.replace('{ctime}', f'{str(aman.timestamps[0])[:5]}')
filename = filename.replace('{obsid}', aman.obs_info.obs_id)
det = aman.dets.vals[0]
ufm = det.split('_')[2]
filename = filename.replace('{name}', f'{ufm}_{self.wrap}')
plot_psd(aman, signal=attrgetter(f"{self.wrap}.Pxx")(proc_aman),
xx=attrgetter(f"{self.wrap}.freqs")(aman), filename=filename, **self.plot_cfgs)
class NoiseRatio(_Preprocess):
""" Compute ratios of "signal band" to white noise in PSDs.
Example config block::
- name: "noise_ratio"
psd: "psdQ"
wrap: "noise_ratio_Q"
subscan: False
calc:
f_sel: [0.04, 0.14]
f_wn: [0.6, 1.0]
save: True
select:
r_max: 1.19
select_per_detector: True
.. autofunction:: sotodlib.tod_ops.fft_ops.noise_ratio
"""
name = "noise_ratio"
def __init__(self, step_cfgs):
self.psd = step_cfgs.get('psd', 'psd')
self.wrap = step_cfgs.get('wrap', 'noise_ratio')
self.subscan = step_cfgs.get('subscan', False)
self.save_name = self.wrap
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
if self.psd not in aman:
raise ValueError("PSD is not saved in AxisManager")
psd = aman[self.psd]
pxx = psd.Pxx_ss if self.subscan else psd.Pxx
if self.calc_cfgs is None:
self.calc_cfgs = {}
calc_aman = tod_ops.fft_ops.noise_ratio(proc_aman, pxx, psd.freqs, subscan=self.subscan, **self.calc_cfgs)
self.save(proc_aman, calc_aman)
return aman, proc_aman
def save(self, proc_aman, calc_aman):
if self.save_cfgs is None:
return
else:
proc_aman.wrap(self.save_name, calc_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
per_detector = self.select_cfgs.get("select_per_detector", True)
if per_detector:
noise_ratio = proc_aman[self.wrap]["rdets"]
else:
noise_ratio = proc_aman[self.wrap]["rmean"]
if self.subscan:
noise_ratio = np.nanmean(noise_ratio, axis=-1) # Mean over subscans
keep = np.ones_like(meta.dets.vals, dtype=bool)
keep &= (noise_ratio <= np.float64(self.select_cfgs["r_max"]))
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
[docs]
class GetStats(_Preprocess):
"""
Get basic statistics from a TOD or its power spectrum.
Example config block::
- name : "tod_stats"
signal: "signal" # optional
wrap: "tod_stats" # optional
calc:
stat_names: ["median", "std"]
split_subscans: False # optional
psd_mask: # optional, for cutting a power spectrum in frequency
freqs: "psd.freqs"
low_f: 1
high_f: 10
save: True
"""
name = "tod_stats"
def __init__(self, step_cfgs):
self.signal = step_cfgs.get('signal', 'signal')
self.wrap = step_cfgs.get('wrap', 'tod_stats')
self.save_name = self.wrap
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
if self.calc_cfgs.get('psd_mask') is not None:
mask_dict = self.calc_cfgs.get('psd_mask')
_f = attrgetter(mask_dict['freqs'])
try:
freqs = _f(aman)
except KeyError:
freqs = _f(proc_aman)
low_f, high_f = mask_dict['low_f'], mask_dict['high_f']
fmask = np.all([freqs >= low_f, freqs <= high_f], axis=0)
self.calc_cfgs['mask'] = fmask
del self.calc_cfgs['psd_mask']
_f = attrgetter(self.signal)
try:
signal = _f(aman)
except KeyError:
signal = _f(proc_aman)
stats_aman = tod_ops.flags.get_stats(aman, signal, **self.calc_cfgs)
self.save(proc_aman, stats_aman)
return aman, proc_aman
def save(self, proc_aman, stats_aman):
if not(self.save_cfgs is None):
proc_aman.wrap(self.save_name, stats_aman)
def plot(self, aman, proc_aman, filename):
if self.plot_cfgs is None:
return
if self.plot_cfgs:
from .preprocess_plot import plot_signal
filename = filename.replace('{ctime}', f'{str(aman.timestamps[0])[:5]}')
filename = filename.replace('{obsid}', aman.obs_info.obs_id)
det = aman.dets.vals[0]
ufm = det.split('_')[2]
filename = filename.replace('{name}', f'{ufm}_{self.signal}')
plot_signal(aman, signal_name=self.signal, x_name="timestamps", filename=filename, **self.plot_cfgs)
class CutBadDistribution(_Preprocess):
"""
Detector cuts to keep a statistic within some bounds of a gaussian distribution.
Example config::
- name: "cut_bad_dist"
select:
param_name: wn_signal
outlier_range: [0.5, 2.0]
kurtosis_threshold: 2.0
blame_max: False
blame_min: False
For parameter options see: :func:`sotodlib.tod_ops.flags.get_good_distribution_flags`
"""
name = "cut_bad_dist"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
stat_name = self.select_cfgs.get('param_name', 'wn_signal')
if stat_name in meta:
keep = tod_ops.flags.get_good_distribution_flags(meta, **self.select_cfgs)
elif stat_name in proc_aman:
keep = tod_ops.flags.get_good_distribution_flags(proc_aman, **self.select_cfgs)
else:
raise ValueError(f'{stat_name} not found in aman or proc_aman not applying bad dist cut.')
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
[docs]
class Noise(_Preprocess):
"""
Estimate the white noise levels in the data. Assumes the PSD has been
wrapped into the preprocessing AxisManager. All calculation configs go to
``calc_wn``.
Saves the results into the "noise" field of proc_aman.
Can run data selection of a "max_noise" value.
When ``fit: True``, the parameter ``wn_est`` can be a float or the name of an
axis manager containing an array named ``white_noise``. If not specified,
the white noise is calculated with ``calc_wn()`` and used for ``wn_est``.
The calculated white noise will be stored in the noise fit axis manager.
Example config block for fitting PSD::
- name: "noise"
fit: True
subscan: False
calc:
fwhite: (5, 10)
lowf: 1
f_max: 25
mask: True
wn_est: noise
fixed_param: 'wn'
binning: True
fit_method: log_curve_fit #or likelihood
curve_fit_kwargs:
maxfev: 20000
save: True
select:
max_noise: 2000
require_finite_fit: True
Set ``select.require_finite_fit`` to ``True`` to drop detectors whose fit
parameters contain NaNs (indicating a failed noise fit).
Example config block for calculating white noise only::
- name: "noise"
fit: False
subscan: False
calc:
low_f: 5
high_f: 20
save: True
select:
min_noise: 18e-6
max_noise: 80e-6
If ``fit: True`` this operation will run
:func:`sotodlib.tod_ops.fft_ops.fit_noise_model`, else it will run
:func:`sotodlib.tod_ops.fft_ops.calc_wn`.
"""
name = "noise"
_influx_field = "median_white_noise"
def __init__(self, step_cfgs):
self.psd = step_cfgs.get('psd', 'psd')
self.fit = step_cfgs.get('fit', False)
self.subscan = step_cfgs.get('subscan', False)
self.save_name = 'noise' if isinstance(step_cfgs.get('save'), bool) else step_cfgs.get('save', {}).get('wrap_name', 'noise')
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
if self.psd not in aman:
raise ValueError("PSD is not saved in AxisManager")
psd = aman[self.psd]
pxx = psd.Pxx_ss if self.subscan else psd.Pxx
if "frequency_cutoffs" in proc_aman:
frequency_cutoff = proc_aman["frequency_cutoffs"][self.psd]
else:
frequency_cutoff=None
def check_frequency_cutoff(fmin, fmax):
# limit upper frequency cutoffs to hwp freq
if 'hwp_angle' in aman and frequency_cutoff is not None:
hwp_freq = (np.sum(np.abs(np.diff(np.unwrap(aman.hwp_angle)))) /
(aman.timestamps[-1] - aman.timestamps[0])) / (2 * np.pi)
if fmax > frequency_cutoff:
logger.warning(f"Upper freq={fmax} > hwp freq cutoff={frequency_cutoff:.2f}. Limiting to hwp freq cutoff.")
fmax = frequency_cutoff
if fmin is not None and fmin >= fmax:
raise ValueError(f"lower freq={fmin} >= upper freq={fmax}")
return fmax
if self.calc_cfgs is None:
self.calc_cfgs = {}
if self.fit:
fcfgs = copy.deepcopy(self.calc_cfgs)
fit_method = fcfgs.pop('fit_method', None)
curve_fit_kwargs = fcfgs.pop('curve_fit_kwargs', None)
if curve_fit_kwargs is None and fit_method == 'log_curve_fit':
curve_fit_kwargs = {}
if curve_fit_kwargs is not None and not isinstance(curve_fit_kwargs, dict):
raise TypeError("calc.curve_fit_kwargs must be a mapping when provided")
fixed_param = fcfgs.get('fixed_param', [])
wn_est = fcfgs.get('wn_est', None)
calc_wn = False
if isinstance(wn_est, str):
if wn_est in proc_aman:
fcfgs['wn_est'] = proc_aman[wn_est].white_noise
else:
calc_wn = True
if calc_wn or wn_est is None:
wn_f_low, wn_f_high = fcfgs.get('fwhite', (5, 10))
wn_f_high = check_frequency_cutoff(wn_f_low, wn_f_high)
fcfgs['wn_est'] = tod_ops.fft_ops.calc_wn(aman, pxx=pxx,
freqs=psd.freqs,
nseg=psd.get('nseg'),
low_f=wn_f_low,
high_f=wn_f_high)
if fcfgs.get('subscan') is None:
fcfgs['subscan'] = self.subscan
fcfgs.pop('fwhite', None)
f_max = check_frequency_cutoff(fcfgs.get("lowf", None), fcfgs.pop("f_max", 100))
if fit_method is not None:
fcfgs['fit_method'] = fit_method
if curve_fit_kwargs is not None:
fcfgs['curve_fit_kwargs'] = curve_fit_kwargs
calc_aman = tod_ops.fft_ops.fit_noise_model(aman, pxx=pxx,
f=psd.freqs,
merge_fit=True,
f_max=f_max,
**fcfgs)
if calc_wn or wn_est is None:
if not self.subscan:
calc_aman.wrap("white_noise", fcfgs['wn_est'], [(0,"dets")])
calc_aman.wrap("std", fcfgs['wn_est']*np.sqrt(psd.freqs[-1]-psd.freqs[0]), [(0,"dets")])
else:
calc_aman.wrap("white_noise", fcfgs['wn_est'], [(0,"dets"), (1,"subscans")])
calc_aman.wrap("std", fcfgs['wn_est']*np.sqrt(psd.freqs[-1]-psd.freqs[0]), [(0,"dets"), (1,"subscans")])
else:
wn_f_low = self.calc_cfgs.get("low_f", 5)
wn_f_high = self.calc_cfgs.get("high_f", 10)
wn_f_high = check_frequency_cutoff(wn_f_low, wn_f_high)
wn = tod_ops.fft_ops.calc_wn(aman, pxx=pxx,
freqs=psd.freqs,
nseg=psd.get('nseg'),
low_f=wn_f_low,
high_f=wn_f_high)
if not self.subscan:
calc_aman = core.AxisManager(aman.dets)
calc_aman.wrap("white_noise", wn, [(0,"dets")])
calc_aman.wrap("std", wn*np.sqrt(psd.freqs[-1]-psd.freqs[0]), [(0,"dets")])
else:
calc_aman = core.AxisManager(aman.dets, aman.subscan_info.subscans)
calc_aman.wrap("white_noise", wn, [(0,"dets"), (1,"subscans")])
calc_aman.wrap("std", wn*np.sqrt(psd.freqs[-1]-psd.freqs[0]), [(0,"dets"), (1,"subscans")])
self.save(proc_aman, calc_aman)
return aman, proc_aman
def save(self, proc_aman, noise):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, noise)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
if isinstance(self.save_cfgs, bool):
noise_aman = proc_aman[self.select_cfgs.get('name', 'noise')]
elif 'wrap_name' in self.save_cfgs:
noise_aman = proc_aman[self.select_cfgs.get('name', self.save_cfgs['wrap_name'])]
else:
noise_aman = proc_aman[self.select_cfgs.get('name', 'noise')]
if self.fit:
wnix = np.where(noise_aman.noise_model_coeffs.vals == 'white_noise')[0][0]
wn = noise_aman.fit[:,wnix]
fkix = np.where(noise_aman.noise_model_coeffs.vals == 'fknee')[0][0]
fk = noise_aman.fit[:,fkix]
else:
wn = noise_aman.white_noise
fk = None
if self.subscan:
wn = np.nanmean(wn, axis=-1) # Mean over subscans
if fk is not None:
fk = np.nanmean(fk, axis=-1) # Mean over subscans
keep = np.ones_like(wn, dtype=bool)
if "max_noise" in self.select_cfgs.keys():
keep &= (wn <= np.float64(self.select_cfgs["max_noise"]))
if "min_noise" in self.select_cfgs.keys():
keep &= (wn >= np.float64(self.select_cfgs["min_noise"]))
if fk is not None and "max_fknee" in self.select_cfgs.keys():
keep &= (fk <= np.float64(self.select_cfgs["max_fknee"]))
if self.fit and self.select_cfgs.get("require_finite_fit", False):
fit_vals = np.asarray(noise_aman.fit)
fit_flat = fit_vals.reshape(fit_vals.shape[0], -1)
keep &= np.all(np.isfinite(fit_flat), axis=1)
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
[docs]
class Calibrate(_Preprocess):
"""Calibrate the timestreams based on some provided information.
Type of calibration is decided by process["kind"]
1. "single_value" : multiplies entire signal by the single value
process["val"]
2. "array" : takes the dot product of the array with the entire signal. The
array is specified by ``process["cal_array"]``, which must exist in
``aman``. The array can be nested within additional ``AxisManager``
objects, for instance ``det_cal.phase_to_pW``.
Example config block(s)::
- name: "calibrate"
process:
kind: "single_value"
divide: True # If true will divide instead of multiply.
# phase_to_pA: 9e6/(2*np.pi)
val: 1432394.4878270582
- name: "calibrate"
process:
kind: "array"
cal_array: "cal.array"
select:
cut_array: "cal.missing_cal" # should be 0 where cal is good 1 where missing.
"""
name = "calibrate"
def __init__(self, step_cfgs):
self.signal = step_cfgs.get('signal', 'signal')
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
if self.process_cfgs["kind"] == "single_value":
if self.process_cfgs.get("divide", False):
aman[self.signal] /= self.process_cfgs["val"]
else:
aman[self.signal] *= self.process_cfgs["val"]
elif self.process_cfgs["kind"] == "array":
field = self.process_cfgs["cal_array"]
_f = attrgetter(field)
if self.process_cfgs.get("proc_aman_cal", False):
cal_arr = _f(proc_aman)
else:
cal_arr = _f(aman)
cal_arr = cal_arr.astype(np.float32)
if self.process_cfgs.get("divide", False):
aman[self.signal] = np.divide(aman[self.signal].T, cal_arr).T
else:
aman[self.signal] = np.multiply(aman[self.signal].T, cal_arr).T
else:
raise ValueError(f"Entry '{self.process_cfgs['kind']}'"
" not understood")
return aman, proc_aman
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
keep = meta[self.select_cfgs['cut_array']] == 0
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
[docs]
class EstimateHWPSS(_Preprocess):
"""
Builds a HWPSS Template. Calc configs go to ``hwpss_model``.
Results of fitting saved if field specified by calc["name"].
Example config block::
- "name : "estimate_hwpss"
"calc":
"signal_name": "signal" # optional
"hwpss_stats_name": "hwpss_stats"
"save": True
.. autofunction:: sotodlib.hwp.hwp.get_hwpss
"""
name = "estimate_hwpss"
_influx_field = "hwpss_coeffs"
_influx_percentiles = [0, 50, 75, 90, 95, 100]
def __init__(self, step_cfgs):
self.save_name = step_cfgs.get('calc', {}).get("hwpss_stats_name")
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
hwpss_stats = hwp.get_hwpss(aman, **self.calc_cfgs)
self.save(proc_aman, hwpss_stats)
return aman, proc_aman
def save(self, proc_aman, hwpss_stats):
if self.save_cfgs:
proc_aman.wrap(self.save_name, hwpss_stats)
else:
return
def plot(self, aman, proc_aman, filename):
if self.plot_cfgs is None:
return
if self.plot_cfgs:
from .preprocess_plot import plot_4f_2f_counts, plot_hwpss_fit_status
filename = filename.replace('{ctime}', f'{str(aman.timestamps[0])[:5]}')
filename = filename.replace('{obsid}', aman.obs_info.obs_id)
det = aman.dets.vals[0]
ufm = det.split('_')[2]
plot_4f_2f_counts(aman, filename=filename.replace('{name}', f'{ufm}_4f_2f_counts'))
plot_hwpss_fit_status(aman, proc_aman[self.calc_cfgs["hwpss_stats_name"]], filename=filename.replace('{name}', f'{ufm}_hwpss_stats'))
@classmethod
def gen_metric(cls, meta, proc_aman):
""" Generate a QA metric for the coefficients of the HWPSS fit.
Coefficient percentiles and mean are recorded for every mode and detset.
Arguments
---------
meta : AxisManager
The full metadata container.
proc_aman : AxisManager
The metadata containing just the output of this process.
Returns
-------
line : dict
InfluxDB line entry elements to be fed to
`site_pipeline.monitor.Monitor.record`
"""
# record one metric per wafer_slot per bandpass
# add specified tags
from ..qa.metrics import _get_tag, _has_tag
tag_keys = {
"wafer_slot": "wafer_slot",
"tel_tube": "tel_tube",
}
if _has_tag(meta.det_info, 'wafer.bandpass'):
bandpasses = meta.det_info.wafer.bandpass
tag_keys["bandpass"] = "wafer.bandpass"
else:
bandpasses = meta.det_info.det_cal.bandpass
tag_keys["bandpass"] = "det_cal.bandpass"
tags = []
vals = []
import re
for bp in np.unique(bandpasses):
for ws in np.unique(meta.det_info.wafer_slot):
subset = np.where(
(meta.det_info.wafer_slot == ws) & (bandpasses == bp)
)[0]
if len(subset) > 0:
# get the coefficients for every detector
coeff = proc_aman.hwpss_stats.coeffs[subset]
# mask those that were not set
nonzero = np.any(coeff != 0.0, axis=1)
# calculate amplitude of each mode
mode_labels = list(proc_aman.hwpss_stats.modes.vals)
num_re = re.compile(r"^[SC](\d+)$")
nums = sorted(list(set([num_re.match(l).group(1) for l in mode_labels])))
coeff_amp = np.zeros((coeff.shape[0], len(nums)), coeff.dtype)
amp_labels = []
for i, n in enumerate(nums):
c_ind = mode_labels.index(f"C{n}")
s_ind = mode_labels.index(f"S{n}")
coeff_amp[:, i] = np.sqrt(coeff[:, c_ind]**2 + coeff[:, s_ind]**2)
amp_labels.append(f"A{n}")
# record percentiles over detectors and fraction of samples flagged
perc = np.percentile(coeff_amp[nonzero], cls._influx_percentiles, axis=0)
mean = coeff_amp[nonzero].mean(axis=0)
tags_base = {
k: _get_tag(meta.det_info, i, subset[0]) for k, i in tag_keys.items() if _has_tag(meta.det_info, i)
}
tags_base["telescope"] = meta.obs_info.telescope
# loop over percentiles and coefficient labels
for pi, p in enumerate(cls._influx_percentiles):
for l in amp_labels:
t_new = tags_base.copy()
t_new.update({"mode": l, "det_stat": f"percentile_{p}"})
tags.append(t_new)
vals += list(perc[pi])
# finally also record the mean
for l in amp_labels:
t_new = tags_base.copy()
t_new.update({"mode": l, "det_stat": "mean"})
tags.append(t_new)
vals += list(mean)
obs_time = [meta.obs_info.timestamp] * len(tags)
return {
"field": cls._influx_field,
"values": vals,
"timestamps": obs_time,
"tags": tags,
}
[docs]
class SubtractHWPSS(_Preprocess):
"""Subtracts a HWPSS template from signal.
Example config block::
- name: "subtract_hwpss"
hwpss_stats: "hwpss_stats"
process:
subtract_name: "hwpss_remove"
.. autofunction:: sotodlib.hwp.hwp.subtract_hwpss
"""
name = "subtract_hwpss"
def __init__(self, step_cfgs):
self.hwpss_stats = step_cfgs.get('hwpss_stats', 'hwpss_stats')
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
if not(proc_aman[self.hwpss_stats] is None):
modes = [int(m[1:]) for m in proc_aman[self.hwpss_stats].modes.vals[::2]]
if sim:
hwpss_stats = hwp.get_hwpss(aman, merge_stats=False, merge_model=False,
modes=modes)
template = hwp.harms_func(aman.hwp_angle, modes, hwpss_stats.coeffs)
else:
template = hwp.harms_func(aman.hwp_angle, modes,
proc_aman[self.hwpss_stats].coeffs)
if 'hwpss_model' in aman._fields:
aman.move('hwpss_model', None)
aman.wrap('hwpss_model', template, [(0, 'dets'), (1, 'samps')])
hwp.subtract_hwpss(
aman,
subtract_name = self.process_cfgs["subtract_name"]
)
return aman, proc_aman
class A2Stats(_Preprocess):
"""
Calculate statistical metrics for A2, the 2f-demodulated Q and U signals.
Takes the following ``calc`` config options:
:stat_names: (*list*) List of strings identifying which statistics to calculate.
Refer to ``sotodlib.tod_ops.flags.get_stats`` (below) for available stats.
Default is ``["mean", "median", "var", "ptp"]``.
:subscan: (*bool*) Whether to calculate stats for each subscan separately.
Default is ``False``.
Example config block::
- name: "a2_stats"
calc:
stat_names: ["mean", "median", "var", "ptp"]
subscan: True
save: True
.. autofunction:: sotodlib.hwp.hwp.demod_tod
.. autofunction:: sotodlib.tod_ops.flags.get_stats
"""
name = "a2_stats"
def __init__(self, step_cfgs):
self.save_name = 'a2_stats'
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
# Get A2 signal using the demod_tod function
_, demodQ, demodU = hwp.demod_tod(aman, demod_mode=2, wrap=False)
# Compute A2 stats
stat_names = self.calc_cfgs.get("stat_names", ["mean", "median", "var", "ptp"])
split_subscans = self.calc_cfgs.get("subscan", False)
# Q
a2stats_aman = tod_ops.flags.get_stats(aman, demodQ, stat_names=stat_names, split_subscans=split_subscans)
for sn in stat_names:
a2stats_aman.move(sn, f"{sn}Q")
# U
a2stats_aman.merge(tod_ops.flags.get_stats(aman, demodU, stat_names=stat_names, split_subscans=split_subscans))
for sn in stat_names:
a2stats_aman.move(sn, f"{sn}U")
self.save(proc_aman, a2stats_aman)
return aman, proc_aman
def save(self, proc_aman, a2_stats):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, a2_stats)
[docs]
class Apodize(_Preprocess):
"""Apodize the edges of a signal. All process configs go to `apodize_cosine`
.. autofunction:: sotodlib.tod_ops.apodize.apodize_cosine
"""
name = "apodize"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
tod_ops.apodize.apodize_cosine(aman, **self.process_cfgs)
return aman, proc_aman
[docs]
class Demodulate(_Preprocess):
"""
Demodulate the TOD. All process configs go to ``demod_tod``.
Example config block::
- name: "demodulate"
process:
trim_samps: 6000
demod_cfgs:
bpf_cfg: {'type': 'sine2', 'center': 8, 'width': 3.8, 'trans_width': 0.1}
lpf_cfg: {'type': 'sine2', 'cutoff': 1.9, 'trans_width': 0.1}
If you want to set filters with respect to actual HWP rotation frequency,
you can pass strings like below. ``*`` is needed after the number you want
to multiply HWP freq by::
- name: "demodulate"
process:
trim_samps: 6000
demod_cfgs:
# You can set float number or str (i.e., ``'4*f_HWP'``) as configs
bpf_cfg: {'type': 'sine2', 'center': '4*f_HWP', 'width': '3.8*f_HWP', 'trans_width': 0.1}
lpf_cfg: {'type': 'sine2', 'cutoff': '1.9*f_HWP', 'trans_width': 0.1}
.. autofunction:: sotodlib.hwp.hwp.demod_tod
"""
name = "demodulate"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
hwp.demod_tod(aman, **self.process_cfgs["demod_cfgs"])
if self.process_cfgs.get("trim_samps"):
trim = self.process_cfgs["trim_samps"]
proc_aman.restrict('samps', (aman.samps.offset + trim,
aman.samps.offset + aman.samps.count - trim))
aman.restrict('samps', (aman.samps.offset + trim,
aman.samps.offset + aman.samps.count - trim))
if 'frequency_cutoffs' in proc_aman:
hwp_freq = (np.sum(np.abs(np.diff(np.unwrap(aman.hwp_angle)))) /
(aman.timestamps[-1] - aman.timestamps[0])) / (2 * np.pi)
lpf_cfg = self.process_cfgs["demod_cfgs"].get("lpf_cfg", None)
if lpf_cfg is not None:
for k, v in lpf_cfg.items():
if k == 'cutoff':
if isinstance(v, str):
freq_cutoff = hwp_freq*float(v.split('*')[0])
else:
freq_cutoff = v
else:
freq_cutoff = 0.95*hwp_freq
if 'dsT' in proc_aman['frequency_cutoffs']:
proc_aman['frequency_cutoffs'].move('dsT', None)
proc_aman['frequency_cutoffs'].wrap('dsT', freq_cutoff)
if 'demodQ' in proc_aman['frequency_cutoffs']:
proc_aman['frequency_cutoffs'].move('demodQ', None)
proc_aman['frequency_cutoffs'].wrap('demodQ', freq_cutoff)
if 'demodU' in proc_aman['frequency_cutoffs']:
proc_aman['frequency_cutoffs'].move('demodU', None)
proc_aman['frequency_cutoffs'].wrap('demodU', freq_cutoff)
return aman, proc_aman
class AzSS(_Preprocess):
"""Estimates Azimuth Synchronous Signal (AzSS) by binning signal by azimuth of boresight and subtract.
All process configs go to ``get_azss``. If ``method`` is 'interpolate', no fitting applied
and binned signal is directly used as AzSS model. If ``method`` is 'fit', Legendre polynominal
fitting will be applied and used as AzSS model. If ``subtract`` is True in process, subtract AzSS model
from signal in place.
Example configuration block::
- name: "azss"
calc:
signal: 'demodQ'
azss_stats_name: 'azss_statsQ'
azrange: [-1.57079, 7.85398]
bins: 1080
flags: 'glitch_flags'
merge_stats: True
merge_model: False
save: True
process:
subtract: True
If we estimate and subtract azss in left going scans only,
make union of glitch_flags and scan_flags first
- name : "union_flags"
process:
flag_labels: ['glitches.glitch_flags', 'turnaround_flags.right_scan']
total_flags_label: 'glitch_flags_left'
- name: "azss"
calc:
signal: 'demodQ'
azss_stats_name: 'azss_statsQ_left'
azrange: [-1.57079, 7.85398]
bins: 1080
flags: 'glitch_flags_left'
scan_flags: 'left_scan'
merge_stats: True
merge_model: False
save: True
process:
subtract: True
.. autofunction:: sotodlib.tod_ops.azss.get_azss
"""
name = "azss"
def __init__(self, step_cfgs):
self.save_name = step_cfgs.get('calc', {}).get("azss_stats_name")
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
if self.process_cfgs:
self.save(proc_aman, aman[self.calc_cfgs['azss_stats_name']])
else:
calc_aman, _ = tod_ops.azss.get_azss(aman, **self.calc_cfgs)
self.save(proc_aman, calc_aman)
return aman, proc_aman
def save(self, proc_aman, azss_stats):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, azss_stats)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
if 'subtract_in_place' in self.calc_cfgs:
raise ValueError('calc_cfgs.subtract_in_place is not allowed use process_cfgs.subtract')
if self.process_cfgs is None:
# This handles the case if no process configs are passed.
return aman, proc_aman
if self.process_cfgs.get("subtract"):
if self.calc_cfgs.get('azss_stats_name') in proc_aman:
if sim:
tod_ops.azss.get_azss(aman, subtract_in_place=True, **self.calc_cfgs)
else:
tod_ops.azss.subtract_azss(
aman,
proc_aman.get(self.calc_cfgs.get('azss_stats_name')),
signal=self.calc_cfgs.get('signal', 'signal'),
scan_flags=self.calc_cfgs.get('scan_flags'),
method=self.calc_cfgs.get('method', 'interpolate'),
max_mode=self.calc_cfgs.get('max_mode'),
azrange=self.calc_cfgs.get('azrange'),
in_place=True
)
else:
tod_ops.azss.get_azss(aman, subtract_in_place=True, **self.calc_cfgs)
else:
tod_ops.azss.get_azss(aman, **self.calc_cfgs)
return aman, proc_aman
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
if 'bad_dets' in proc_aman[self.save_name]:
keep = ~proc_aman[self.save_name]['bad_dets']
else:
keep = np.ones(meta.dets.count, dtype=bool)
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
class SubtractAzSSTemplate(_Preprocess):
"""Subtract Azimuth Synchronous Signal (AzSS) common template.
Make common template by weighted mean or pca.
This requires to calculate AzSS beforehand.
Example configuration block::
- name: "subtract_azss_template"
process:
signal: 'signal'
azss: 'azss_stats_left'
method: 'interpolate'
scan_flags: 'left_scan'
pca_modes: 1
subtract: True
.. autofunction:: sotodlib.tod_ops.azss.subtract_azss_template
"""
name = "subtract_azss_template"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
process_cfgs = copy.deepcopy(self.process_cfgs)
if sim:
process_cfgs["azss"] = proc_aman.get(process_cfgs["azss"])
tod_ops.azss.subtract_azss_template(aman, **process_cfgs)
return aman, proc_aman
[docs]
class GlitchFill(_Preprocess):
"""Fill glitches. All process configs go to `fill_glitches`.
Notes on flags. If flags are provided as step_cfgs, `proc_aman.get(flags)` is used.
If provided as process_cfgs, `aman.get(glitch_flags)` is used instead.
Example configuration block::
- name: "glitchfill"
signal: "hwpss_remove"
flags: "glitches.glitch_flags" # optional
process:
nbuf: 10
use_pca: False
modes: 1
in_place: True
glitch_flags: "glitch_flags"
wrap: None
.. autofunction:: sotodlib.tod_ops.gapfill.fill_glitches
"""
name = "glitchfill"
def __init__(self, step_cfgs):
self.signal = step_cfgs.get('signal', 'signal')
self.flags = step_cfgs.get('flags')
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
if self.flags is not None:
glitch_flags=proc_aman.get(self.flags)
tod_ops.gapfill.fill_glitches(
aman, signal=aman[self.signal],
glitch_flags=glitch_flags,
**self.process_cfgs)
else:
tod_ops.gapfill.fill_glitches(
aman, signal=aman[self.signal],
**self.process_cfgs)
return aman, proc_aman
[docs]
class FlagTurnarounds(_Preprocess):
"""From the Azimuth encoder data, flag turnarounds, left-going, and right-going.
All process configs go to ``get_turnaround_flags``. If the ``method`` key
is not included in the preprocess config file calc configs then it will
default to 'scanspeed'.
.. autofunction:: sotodlib.tod_ops.flags.get_turnaround_flags
"""
name = 'flag_turnarounds'
def __init__(self, step_cfgs):
self.save_name = "turnaround_flags"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
if self.calc_cfgs is None:
self.calc_cfgs = {}
self.calc_cfgs['method'] = 'scanspeed'
elif not('method' in self.calc_cfgs):
self.calc_cfgs['method'] = 'scanspeed'
if self.calc_cfgs['method'] == 'scanspeed':
ta, left, right = tod_ops.flags.get_turnaround_flags(aman, **self.calc_cfgs)
calc_aman = core.AxisManager(aman.dets, aman.samps)
calc_aman.wrap('turnarounds', ta, [(0, 'dets'), (1, 'samps')])
calc_aman.wrap('left_scan', left, [(0, 'dets'), (1, 'samps')])
calc_aman.wrap('right_scan', right, [(0, 'dets'), (1, 'samps')])
if self.calc_cfgs['method'] == 'az':
ta = tod_ops.flags.get_turnaround_flags(aman, **self.calc_cfgs)
calc_aman = core.AxisManager(aman.dets, aman.samps)
calc_aman.wrap('turnarounds', ta, [(0, 'dets'), (1, 'samps')])
if ('merge_subscans' not in self.calc_cfgs) or (self.calc_cfgs['merge_subscans']):
calc_aman.wrap('subscan_info', aman.subscan_info)
self.save(proc_aman, calc_aman)
return aman, proc_aman
def save(self, proc_aman, turn_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, turn_aman)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
tod_ops.flags.get_turnaround_flags(aman, **self.process_cfgs)
return aman, proc_aman
[docs]
class SubPolyf(_Preprocess):
"""Fit TOD in each subscan with polynominal of given order and subtract it.
All process configs go to `sotodlib.tod_ops.sub_polyf`.
.. autofunction:: sotodlib.tod_ops.subscan_polyfilter
"""
name = 'sub_polyf'
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
tod_ops.sub_polyf.subscan_polyfilter(aman, **self.process_cfgs)
return aman, proc_aman
[docs]
class DarkDets(_Preprocess):
"""Find dark detectors in the data.
Saves results in proc_aman under the "dark_dets" field.
Example config block::
- name : "dark_dets"
signal: "signal" # optional
calc: True
save: True
select: True
.. autofunction:: sotodlib.tod_ops.flags.get_dark_dets
"""
name = "dark_dets"
def __init__(self, step_cfgs):
self.save_name = "darks"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
mskdarks = tod_ops.flags.get_dark_dets(aman, merge=False)
dark_aman = core.AxisManager(aman.dets, aman.samps)
dark_aman.wrap('darks', mskdarks, [(0, 'dets'), (1, 'samps')])
self.save(proc_aman, dark_aman)
return aman, proc_aman
def save(self, proc_aman, dark_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, dark_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
keep = ~has_all_cut(proc_aman.darks.darks)
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
class LoadPremadeFlags(_Preprocess):
"""Load premade flags from aman.
Saves results in proc_aman under the "premade_flags_name" field of aman.
In addtion, you can select detectors based on the loaded flags.
This is mainly used for simulation for planet mapmaking.
When simulated planet is made, we need corresponding flags
to carry out the same preprocessing as the real planet mapmaking.
This class is useful whenever you want to premake custum flags and use it.
E.g., if aman contains 'sim_jupiter_flag', you can load it to proc_aman as follows:
Example config block::
- name : "load_premade_flags"
load_premade_flag_name: 'sim_jupiter_flag'
calc:
inv_flag: True
save: True
select: # optional
kind: "any"
invert: True
"""
name = "load_premade_flags"
def __init__(self, step_cfgs):
self.premade_flags_name = step_cfgs.get('load_premade_flag_name', 'premade_flags')
self.save_name = self.premade_flags_name
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
print('flags_name', self.premade_flags_name)
premade_flags = aman.flags.get(self.premade_flags_name)
source_aman = core.AxisManager(aman.dets, aman.samps)
source_aman.wrap(self.premade_flags_name, premade_flags, [(0, 'dets'), (1, 'samps')])
if self.calc_cfgs.get('inv_flag'):
source_aman.wrap(self.premade_flags_name + '_inv',
RangesMatrix.from_mask(~premade_flags.mask()),
[(0, 'dets'), (1, 'samps')])
self.save(proc_aman, source_aman)
return aman, proc_aman
def save(self, proc_aman, source_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, source_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
keep = flag_cut_select(proc_aman[self.premade_flags_name][self.premade_flags_name],
kind=self.select_cfgs.get("kind", 'all'),
invert=self.select_cfgs.get('invert', False))
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
[docs]
class SourceFlags(_Preprocess):
"""Calculate the source flags in the data.
All calculation configs go to `get_source_flags`.
Saves results in proc_aman under the "source_flags" field.
Example config block::
- name : "source_flags"
source_flags_name: "my_source_flags"
calc:
mask: {'shape': 'circle',
'xyr': [0, 0, 1.]}
center_on: ['jupiter', 'moon'] # list of str
res: 20 # arcmin
max_pix: 4000000 # max number of allowed pixels in map
distance: 0 # max distance of footprint from source in degrees
save: True
select: True # optional
select_source: 'jupiter' # list of str or str. If not provided, all sources from center_on are selected.
kind: 'any' # 'any', 'all', or float (0.0 < kind < 1.0)
invert: False # optional, if True logic is filipped.
Examples:
1. invert=False, kind='any' → Select detectors with **no** True flags (e.g., for Moon cut).
2. invert=True, kind='any' → Select detectors with **any** True flags (e.g., for planet selection).
3. invert=False, kind=0.4 → Select detectors with <40% of True flags.
.. autofunction:: sotodlib.tod_ops.flags.get_source_flags
"""
name = "source_flags"
def __init__(self, step_cfgs):
self.source_flags_name = step_cfgs.get('source_flags_name', 'source_flags')
self.save_name = self.source_flags_name
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
from sotodlib.coords.planets import SOURCE_LIST
source_name = np.atleast_1d(self.calc_cfgs.get('center_on', 'planet'))
if 'planet' in source_name:
source_list = [x for x in aman.tags if x in SOURCE_LIST]
if len(source_list) == 0:
raise ValueError("No tags match source list")
else:
source_list = [planets.get_source_list_fromstr(isource) for isource in source_name]
# find if source is within footprint + distance
positions = planets.get_nearby_sources(tod=aman, source_list=source_list,
distance=self.calc_cfgs.get('distance', 0))
source_aman = core.AxisManager(aman.dets, aman.samps)
for p in positions:
center_on = planets.get_source_list_fromstr(p[0])
source_flags = tod_ops.flags.get_source_flags(aman,
merge=self.calc_cfgs.get('merge', False),
overwrite=self.calc_cfgs.get('overwrite', True),
source_flags_name=self.calc_cfgs.get('source_flags_name', None),
mask=self.calc_cfgs.get('mask', None),
center_on=center_on,
res=self.calc_cfgs.get('res', None),
max_pix=self.calc_cfgs.get('max_pix', None))
source_aman.wrap(p[0], source_flags, [(0, 'dets'), (1, 'samps')])
# if inv_flag is set, add inverse of source flags
if self.calc_cfgs.get('inv_flag'):
source_aman.wrap(p[0]+ '_inv',
RangesMatrix.from_mask(~source_flags.mask()),
[(0, 'dets'), (1, 'samps')])
# add sources that were not nearby from source list
for source in source_name:
if source not in source_aman._fields:
source_aman.wrap(source, RangesMatrix.zeros([aman.dets.count, aman.samps.count]),
[(0, 'dets'), (1, 'samps')])
if self.calc_cfgs.get('inv_flag'):
source_aman.wrap(source + '_inv',
RangesMatrix.ones([aman.dets.count, aman.samps.count]),
[(0, 'dets'), (1, 'samps')])
self.save(proc_aman, source_aman)
return aman, proc_aman
def save(self, proc_aman, source_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, source_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
source_flags = meta.preprocess.source_flags
else:
source_flags = proc_aman[self.save_name]
if isinstance(self.select_cfgs, bool):
if self.select_cfgs:
select_list = np.atleast_1d(self.calc_cfgs.get('center_on', 'planet'))
else:
if in_place:
return meta
else:
return np.ones(meta.dets.count, dtype=bool)
else:
select_list = np.atleast_1d(self.select_cfgs.get("select_source", self.calc_cfgs.get('center_on', 'planet')))
if 'planet' in select_list:
from sotodlib.coords.planets import SOURCE_LIST
select_list = [x for x in meta.tags if x in SOURCE_LIST]
if len(select_list) == 0:
raise ValueError("No tags match source list")
if isinstance(self.select_cfgs, bool):
inverts = False
else:
inverts = self.select_cfgs.get("invert", False)
if isinstance(inverts, bool):
inverts = [inverts]*len(select_list)
elif len(inverts) != len(select_list):
raise ValueError("Length of intert must match length of select_source, or just bool")
if isinstance(self.select_cfgs, bool):
kinds = "all"
else:
kinds = self.select_cfgs.get("kind", 'all') # default of 'all' is for backward compatibility
if isinstance(kinds, (str, float)):
kinds = [kinds]*len(select_list)
elif len(kinds) != len(select_list):
raise ValueError("Length of kinds must match length of select_source, or just str")
keep_all = np.ones(meta.dets.count, dtype=bool)
for source, kind, invert in zip(select_list, kinds, inverts):
if source in source_flags._fields:
keep_all &= flag_cut_select(source_flags[source], kind, invert=invert)
if in_place:
meta.restrict("dets", meta.dets.vals[keep_all])
return meta
else:
return keep_all
class HWPAngleModel(_Preprocess):
"""Apply hwp angle model to the TOD.
Saves results in proc_aman under the "hwp_angle" field.
Example config block::
- name : "hwp_angle_model"
process: True
calc:
on_sign_ambiguous: 'fail'
save: True
.. autofunction:: sotodlib.hwp.hwp_angle_model.apply_hwp_angle_model
"""
name = "hwp_angle_model"
def __init__(self, step_cfgs):
self.save_name = "hwp_angle"
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
if (not 'hwp_angle' in aman._fields) and ('hwp_angle' in proc_aman._fields):
aman.wrap('hwp_angle', proc_aman['hwp_angle']['hwp_angle'],
[(0, 'samps')])
else:
hwp_angle_model.apply_hwp_angle_model(aman, **self.calc_cfgs)
return aman, proc_aman
def calc_and_save(self, aman, proc_aman):
if (not 'hwp_angle' in aman._fields):
hwp_angle_model.apply_hwp_angle_model(aman, **self.calc_cfgs)
hwp_angle_aman = core.AxisManager(aman.samps)
hwp_angle_aman.wrap('hwp_angle', aman.hwp_angle, [(0, 'samps')])
self.save(proc_aman, hwp_angle_aman)
return aman, proc_aman
def save(self, proc_aman, hwp_angle_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, hwp_angle_aman)
[docs]
class FourierFilter(_Preprocess):
"""
Applies a chain of Fourier filters (defined in fft_ops) to the data.
Example config file entry for one filter::
- name: "fourier_filter"
process:
filt_function: "timeconst_filter"
filter_params:
timeconst: "det_cal.tau_eff"
invert: True
Example for passing in a different signal name and wrapping into a new
field::
- name: "fourier_filter"
wrap_name: "lpf_demodQ"
signal_name: "demodQ"
process:
filt_function: "sine2"
filter_params:
cutoff: 1
trans_width: 0.1
Example config file entry for two filters::
- name: "fourier_filter"
process:
filters:
- name: "iir_filter"
filter_params:
invert: True
- name: "timeconst_filter"
filter_params:
timeconst: "det_cal.tau_eff"
invert: True
Or with params from a noise fit::
- name: "fourier_filter"
process:
noise_fit_array: "noiseQ_fit"
filters:
- name: "iir_filter"
filter_params:
invert: True
- name: "timeconst_filter"
filter_params:
timeconst: "det_cal.tau_eff"
invert: True
See :ref:`fourier-filters` documentation for more details.
"""
name = 'fourier_filter'
def __init__(self, step_cfgs):
self.signal_name = step_cfgs.get('signal_name', 'signal')
# By default signal is overwritted by the filtered signal
self.wrap_name = step_cfgs.get('wrap_name', 'signal')
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
field = self.process_cfgs.get("noise_fit_array", None)
if field:
noise_fit = proc_aman[field]
else:
noise_fit = None
filt_list = self.process_cfgs.get("filters", None)
if filt_list is None:
filt_list = [{
"name": self.process_cfgs.get("filt_function", "high_pass_butter4"),
"filter_params": self.process_cfgs.get("filter_params", None)
}]
filters = []
for spec in filt_list:
fname = spec.get("name")
params = tod_ops.fft_ops.build_filt_params_dict(
fname,
noise_fit=noise_fit,
filter_params=spec.get("filter_params")
)
ffun = getattr(tod_ops.filters, fname)
filters.append(ffun(**params))
filt = tod_ops.filters.FilterChain(filters)
filt_tod = tod_ops.filters.fourier_filter(aman, filt,
signal_name=self.signal_name)
if self.wrap_name in aman._fields:
aman.move(self.wrap_name, None)
aman.wrap(self.wrap_name, filt_tod, [(0, 'dets'), (1, 'samps')])
if self.process_cfgs.get("trim_samps"):
trim = self.process_cfgs["trim_samps"]
aman.restrict('samps', (aman.samps.offset + trim,
aman.samps.offset + aman.samps.count - trim))
proc_aman.restrict('samps', (proc_aman.samps.offset + trim,
proc_aman.samps.offset + proc_aman.samps.count - trim))
return aman, proc_aman
class DetcalNanCuts(_Preprocess):
"""
Remove detectors with NaN values in the specified det_cal metadata fields.
Example config file entry::
- name: "detcal_nan_cuts"
select:
fields: [tau_eff, phase_to_pW]
"""
name = 'detcal_nan_cuts'
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
select_fields = self.select_cfgs.get("fields")
keep = np.ones(meta.dets.count, dtype=bool)
for field in select_fields:
keep &= ~np.isnan(meta.det_cal[field])
if in_place:
meta.restrict('dets', meta.dets.vals[keep])
else:
return keep
[docs]
class PCARelCal(_Preprocess):
"""
Estimate the relcal factor from the atmosphere using PCA.
Example configuration file entry::
- name: 'pca_relcal'
signal: 'lpf_sig'
pca_run: 'run1'
calc:
pca:
xfac: 2
yfac: 1.5
calc_good_medianw: True
lpf:
type: "sine2"
cutoff: 1
trans_width: 0.1
trim_samps: 2000
save: True
plot:
plot_ds_factor: 20
See :ref:`pca-background` for more details on the method.
"""
name = 'pca_relcal'
def __init__(self, step_cfgs):
self.signal = step_cfgs.get('signal', 'signal')
self.run = step_cfgs.get('pca_run', 'run1')
self.bandpass = step_cfgs.get('bandpass_key', 'wafer.bandpass')
self.run_name = f'{self.signal}_{self.run}'
self.save_name = self.run_name
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
self.plot_signal = self.signal
if self.calc_cfgs.get("lpf") is not None:
filt = tod_ops.filters.get_lpf(self.calc_cfgs.get("lpf"))
filt_tod = tod_ops.fourier_filter(aman, filt, signal_name='signal')
filt_aman = core.AxisManager(aman.dets, aman.samps)
filt_aman.wrap(self.signal, filt_tod, [(0, 'dets'), (1, 'samps')])
if self.calc_cfgs.get("trim_samps") is not None:
trim = self.calc_cfgs["trim_samps"]
filt_aman.restrict('samps', (filt_aman.samps.offset + trim,
filt_aman.samps.offset + filt_aman.samps.count - trim))
if self.plot_cfgs:
self.plot_signal = filt_aman[self.signal]
bands = np.unique(aman.det_info[self.bandpass])
bands = bands[bands != 'NC']
# align samps w/ proc_aman to include samps restriction when loading back from db.
rc_aman = core.AxisManager(proc_aman.dets, proc_aman.samps)
pca_det_mask = np.full(aman.dets.count, False, dtype=bool)
relcal = np.zeros(aman.dets.count)
pca_weight0 = np.zeros(aman.dets.count)
for band in bands:
m0 = aman.det_info[self.bandpass] == band
if self.plot_cfgs is not None:
rc_aman.wrap(f'{band}_idx', m0, [(0, 'dets')])
band_aman = aman.restrict('dets', aman.dets.vals[m0], in_place=False)
filt_aman = filt_aman.restrict('dets', aman.dets.vals[m0], in_place=False)
band_aman.merge(filt_aman)
pca_out = tod_ops.pca.get_pca(band_aman,signal=band_aman[self.signal])
pca_signal = tod_ops.pca.get_pca_model(band_aman, pca_out,
signal=band_aman[self.signal])
if self.calc_cfgs.get("pca") is None:
result_aman = tod_ops.pca.pca_cuts_and_cal(band_aman, pca_signal)
else:
result_aman = tod_ops.pca.pca_cuts_and_cal(band_aman, pca_signal, **self.calc_cfgs.get("pca"))
pca_det_mask[m0] = np.logical_or(pca_det_mask[m0], result_aman['pca_det_mask'])
relcal[m0] = result_aman['relcal']
pca_weight0[m0] = result_aman['pca_weight0']
if self.plot_cfgs is not None:
rc_aman.wrap(f'{band}_pca_mode0', result_aman['pca_mode0'], [(0, 'samps')])
rc_aman.wrap(f'{band}_xbounds', result_aman['xbounds'])
rc_aman.wrap(f'{band}_ybounds', result_aman['ybounds'])
rc_aman.wrap(f'{band}_median', result_aman['median'])
rc_aman.wrap('pca_det_mask', pca_det_mask, [(0, 'dets')])
rc_aman.wrap('relcal', relcal, [(0, 'dets')])
rc_aman.wrap('pca_weight0', pca_weight0, [(0, 'dets')])
self.save(proc_aman, rc_aman)
return aman, proc_aman
def save(self, proc_aman, pca_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, pca_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
keep = ~proc_aman[self.run_name]['pca_det_mask']
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
def plot(self, aman, proc_aman, filename):
if self.plot_cfgs is None:
return
if self.plot_cfgs:
from .preprocess_plot import plot_pcabounds
filename = filename.replace('{ctime}', f'{str(aman.timestamps[0])[:5]}')
filename = filename.replace('{obsid}', aman.obs_info.obs_id)
det = aman.dets.vals[0]
ufm = det.split('_')[2]
bands = np.unique(aman.det_info[self.bandpass])
bands = bands[bands != 'NC']
for band in bands:
if f'{band}_pca_mode0' in proc_aman[self.run_name]:
pca_aman = aman.restrict('dets', aman.dets.vals[proc_aman[self.run_name][f'{band}_idx']], in_place=False)
if self.calc_cfgs.get("lpf") is not None:
trim = self.calc_cfgs["trim_samps"]
pca_aman.restrict('samps', (pca_aman.samps.offset + trim,
pca_aman.samps.offset + pca_aman.samps.count - trim))
band_aman = proc_aman[self.run_name].restrict('dets', aman.dets.vals[proc_aman[self.run_name][f'{band}_idx']], in_place=False)
plot_pcabounds(pca_aman, band_aman, filename=filename.replace('{name}', f'{ufm}_{band}_pca'), signal=self.plot_signal, band=band, plot_ds_factor=self.plot_cfgs.get('plot_ds_factor', 20))
proc_aman[self.run_name].move(f'{band}_idx', None)
proc_aman[self.run_name].move(f'{band}_pca_mode0', None)
proc_aman[self.run_name].move(f'{band}_xbounds', None)
proc_aman[self.run_name].move(f'{band}_ybounds', None)
proc_aman[self.run_name].move(f'{band}_median', None)
class PCAFilter(_Preprocess):
"""
Applies a pca filter to the data. `model_signal` is used to calculate
the PCA modes, which are then subtracted from `signal`. If `model_signal`
is not provided, `signal` is used for both. An example use case is to
use a low-pass filtered version of the signal to calculate the PCA modes.
example config file entry::
- name: "pca_filter"
model_signal: "lpf_signal" # optional, if not provided, use signal
signal: "signal"
process:
n_modes: 10
See :ref:`pca-background` for more details on the method.
"""
name = 'pca_filter'
def __init__(self, step_cfgs):
self.signal = step_cfgs.get('signal', 'signal')
self.model_signal = step_cfgs.get('model_signal', None)
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
n_modes = self.process_cfgs.get('n_modes')
signal = aman.get(self.signal)
if self.model_signal:
model_signal = aman.get(self.model_signal)
else:
model_signal = signal
if aman.dets.count < n_modes:
raise ValueError(f'The number of pca modes {n_modes} is '
f'larger than the number of detectors {aman.dets.count}.')
model = tod_ops.pca.get_pca_model(aman, signal=model_signal, n_modes=n_modes)
_ = tod_ops.pca.add_model(aman, model, signal=signal, scale=-1)
return aman, proc_aman
class GetCommonMode(_Preprocess):
"""
Calculate common mode.
example config file entry::
- name: "get_common_mode"
wrap_name: "common_demodQ"
calc:
noise_fit: True
f_max: 2.0
signal: "signal"
method: "median"
save: True
If ``noise_fit`` is True, the 1/f noise fit parameters of the common mode
is wrapped together.
.. autofunction:: sotodlib.tod_ops.pca.get_common_mode
"""
name = 'get_common_mode'
def __init__(self, step_cfgs):
self.wrap_name = step_cfgs.get('wrap_name', 'common_mode')
self.save_name = self.wrap_name
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
cmcfgs = copy.deepcopy(self.calc_cfgs)
for k in cmcfgs.keys():
if k not in ['signal', 'method']:
cmcfgs.pop(k)
common_mode = tod_ops.pca.get_common_mode(aman, **cmcfgs)
if self.calc_cfgs['noise_fit']:
common_aman = tod_ops.fft_ops.get_common_noise_params(aman, signal=common_mode,
f_max=self.calc_cfgs['f_max'])
samps = core.OffsetAxis('samps', aman.samps.count)
common_aman.wrap(self.wrap_name, common_mode, [(0, samps)])
else:
common_aman = core.AxisManager(aman.samps)
common_aman.wrap(self.wrap_name, common_mode, [(0, 'samps')])
self.save(proc_aman, common_aman)
return aman, proc_aman
def save(self, proc_aman, common_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, common_aman)
class FilterForSources(_Preprocess):
"""
Mask and gap-fill the signal at samples flagged by source_flags.
Then PCA the resulting time ordered data.
example config file entry::
- name: "filter_for_sources"
signal: "signal"
process:
n_modes: 10
source_flags: "source_flags"
edge_guard: 10 # Number of samples to make the first and last flags False
trim_samps: 100
pca_wrap: "pca_model" # optional, if provided, the PCA model is wrapped into aman under this key
.. autofunction:: sotodlib.coords.planets.filter_for_sources
"""
name = 'filter_for_sources'
def __init__(self, step_cfgs):
self.signal = step_cfgs.get('signal', 'signal')
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
n_modes = self.process_cfgs.get('n_modes')
signal = aman.get(self.signal)
flags = aman.flags.get(self.process_cfgs.get('source_flags'))
edge_guard = self.process_cfgs.get('edge_guard')
pca_wrap = self.process_cfgs.get('pca_wrap',None)
if aman.dets.count < n_modes:
raise ValueError(f'The number of pca modes {n_modes} is '
f'larger than the number of detectors {aman.dets.count}.')
planets.filter_for_sources(aman, signal=signal, source_flags=flags, n_modes=n_modes, edge_guard=edge_guard, pca_wrap=pca_wrap)
if self.process_cfgs.get("trim_samps"):
trim = self.process_cfgs["trim_samps"]
proc_aman.restrict('samps', (aman.samps.offset + trim,
aman.samps.offset + aman.samps.count - trim))
aman.restrict('samps', (aman.samps.offset + trim,
aman.samps.offset + aman.samps.count - trim))
return aman, proc_aman
class PTPFlags(_Preprocess):
"""Find detectors with anomalous peak-to-peak signal.
Saves results in proc_aman under the "ptp_flags" field.
Example config block::
- name : "ptp_flags"
calc:
signal_name: "dsT"
kurtosis_threshold: 6
save: True
select: True
.. autofunction:: sotodlib.tod_ops.flags.get_ptp_flags
"""
name = "ptp_flags"
def __init__(self, step_cfgs):
self.save_name = "ptp_flags"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
mskptps = tod_ops.flags.get_ptp_flags(aman, **self.calc_cfgs)
ptp_aman = core.AxisManager(aman.dets, aman.samps)
ptp_aman.wrap('ptp_flags', mskptps, [(0, 'dets'), (1, 'samps')])
self.save(proc_aman, ptp_aman)
return aman, proc_aman
def save(self, proc_aman, ptp_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, ptp_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
keep = ~has_all_cut(proc_aman.ptp_flags.ptp_flags)
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
class InvVarFlags(_Preprocess):
"""Find detectors with too high inverse variance.
Saves results in proc_aman under the "inv_var_flags" field.
Example config block::
- name : "inv_var_flags"
calc:
signal_name: "demodQ"
nsigma: 6
save: True
select: True
.. autofunction:: sotodlib.tod_ops.flags.get_inv_var_flags
"""
name = "inv_var_flags"
def __init__(self, step_cfgs):
self.save_name = "inv_var_flags"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
msk = tod_ops.flags.get_inv_var_flags(aman, **self.calc_cfgs)
inv_var_aman = core.AxisManager(aman.dets, aman.samps)
inv_var_aman.wrap('inv_var_flags', msk, [(0, 'dets'), (1, 'samps')])
self.save(proc_aman, inv_var_aman)
return aman, proc_aman
def save(self, proc_aman, inv_var_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, inv_var_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
keep = ~has_all_cut(proc_aman.inv_var_flags.inv_var_flags)
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
return keep
class EstimateT2P(_Preprocess):
"""Estimate T to P leakage coefficients.
Saves results in proc_aman under the "t2p" field.
Example config block::
- name : "estimate_t2p"
fit_in_freq : False
calc:
T_sig_name: 'dsT'
Q_sig_name: 'demodQ'
U_sig_name: 'demodU'
joint_fit: True
trim_samps: 2000
lpf_cfgs:
type: 'sine2'
cutoff: 0.5
trans_width: 0.1
flag_name: 'exclude' # a field in aman.flags can combine with union_flags.
save: True
.. autofunction:: sotodlib.tod_ops.t2pleakage.get_t2p_coeffs
"""
name = "estimate_t2p"
def __init__(self, step_cfgs):
self.fit_in_freq = step_cfgs.get('fit_in_freq', False)
self.save_name = "t2p"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
if self.fit_in_freq:
t2p_aman = tod_ops.t2pleakage.get_t2p_coeffs_in_freq(aman, **self.calc_cfgs)
else:
t2p_aman = tod_ops.t2pleakage.get_t2p_coeffs(aman, **self.calc_cfgs)
self.save(proc_aman, t2p_aman)
return aman, proc_aman
def save(self, proc_aman, t2p_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, t2p_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
keep = tod_ops.t2pleakage.get_t2p_cuts(meta, t2p_aman=proc_aman.t2p, in_place=False, **self.select_cfgs)
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
else:
return keep
class SubtractT2P(_Preprocess):
"""Subtract T to P leakage.
Example config block::
- name : "subtract_t2p"
process: {}
.. autofunction:: sotodlib.tod_ops.t2pleakage.subtract_t2p
"""
name = "subtract_t2p"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if sim and data_aman is not None:
data_aman.restrict("dets", aman.dets.vals)
data_aman.wrap("demodQ", aman.demodQ, [(0, 'dets'), (1, 'samps')], overwrite=True)
data_aman.wrap("demodU", aman.demodU, [(0, 'dets'), (1, 'samps')], overwrite=True)
if self.process_cfgs.get("fit_in_freq"):
t2p_aman = tod_ops.t2pleakage.get_t2p_coeffs_in_freq(
data_aman,
merge_stats=False,
ML_fit=True
)
else:
t2p_aman = tod_ops.t2pleakage.get_t2p_coeffs(
data_aman,
merge_stats=False
)
tod_ops.t2pleakage.subtract_t2p(
aman,
t2p_aman,
T_signal=data_aman.dsT
)
else:
pcfg = copy.deepcopy(self.process_cfgs)
pcfg.pop("fit_in_freq", None)
tod_ops.t2pleakage.subtract_t2p(aman, proc_aman['t2p'],
**pcfg)
return aman, proc_aman
class SplitFlags(_Preprocess):
"""Get flags used for map splitting/bundling.
Saves results in proc_aman under the "split_flags" field.
Example config block::
- name : "split_flags"
calc:
high_gain: 0.115
high_noise: 3.5e-5
high_tau: 1.5e-3
det_A: A
pol_angle: 35
crossover: BL
high_leakage: 1.0e-3
high_2f: 1.5e-3
right_focal_plane: 0
top_focal_plane: 0
central_pixels: 0.071
save: True
.. autofunction:: sotodlib.obs_ops.flags.get_split_flags
"""
name = "split_flags"
def __init__(self, step_cfgs):
self.save_name = "split_flags"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
split_flg_aman = obs_ops.splits.get_split_flags(aman, proc_aman, split_cfg=self.calc_cfgs)
self.save(proc_aman, split_flg_aman)
return aman, proc_aman
def save(self, proc_aman, split_flg_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, split_flg_aman)
class UnionFlags(_Preprocess):
"""Do the union of relevant flags for mapping
Typically you would include turnarounds, glitches, etc.
Saves results for aman under the "flags.[total_flags_label]" field.
Example config block::
- name : "union_flags"
process:
flag_labels: ['jumps_2pi.jump_flag', 'glitches.glitch_flags', 'turnaround_flags.turnarounds']
total_flags_label: 'glitch_flags'
"""
name = "union_flags"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
warnings.warn("UnionFlags function is deprecated and only kept to allow loading of old process archives. Use generalized method CombineFlags")
from so3g.proj import RangesMatrix
total_flags = RangesMatrix.zeros([proc_aman.dets.count, proc_aman.samps.count]) # get an empty flags with shape (Ndets,Nsamps)
for label in self.process_cfgs['flag_labels']:
_label = attrgetter(label)
total_flags += _label(proc_aman) # The + operator is the union operator in this case
if 'flags' not in aman._fields:
from sotodlib.core import FlagManager
aman.wrap('flags', FlagManager.for_tod(aman))
if self.process_cfgs['total_flags_label'] in aman['flags']:
aman['flags'].move(self.process_cfgs['total_flags_label'], None)
aman['flags'].wrap(self.process_cfgs['total_flags_label'], total_flags)
return aman, proc_aman
class CombineFlags(_Preprocess):
"""Do the combination of relevant flags for mapping
Saves results for aman under the "flags.[total_flags_label]" field.
Example config block::
- name : "combine_flags"
process:
flag_labels: ['glitches.glitch_flags', 'source_flags.jupiter_inv']
total_flags_label: 'glitch_flags'
method: 'union' # You can select a method from ['union', '+', 'intersect', '*', 'except', '-'].
#method: ['+', '*'] # Or you can pass individual method for each flags as a list.
# Length of list must match the length of flag_labels.
# If a list, the first method must be '+', as if adding the first flag set to an empty flag set.
# Operations are performed strictly from Left to Right, '*' are not performed first.
"""
name = "combine_flags"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
from so3g.proj import RangesMatrix
if isinstance(self.process_cfgs['method'], list):
if len(self.process_cfgs['flag_labels']) != len(self.process_cfgs['method']):
raise ValueError("The length of method does not match to the length of flag_labels")
elif any(method not in ['+', 'union', '*', 'intersect', '-', 'except'] for method in self.process_cfgs['method']):
raise ValueError("One or more methods in list are not valid")
elif self.process_cfgs['method'] in ['+', 'union', '*', 'intersect', '-', 'except']:
self.process_cfgs['method'] = ['+'] + (len(self.process_cfgs['flag_labels']) - 1)*[self.process_cfgs['method']]
else:
raise ValueError("The method matches neither list nor the one of the valid operations")
total_flags = RangesMatrix.zeros([proc_aman.dets.count, proc_aman.samps.count]) # get an empty flags with shape (Ndets,Nsamps)
for i, (method, label) in enumerate(zip(self.process_cfgs['method'], self.process_cfgs['flag_labels'])):
_label = attrgetter(label)
if i == 0:
total_flags += _label(proc_aman) # First flags must be added.
if method in ['*', 'intersect']:
warnings.warn("The first method is neither '+' nor 'union'. Interpreted as '+' by default.")
else:
if method in ['+', 'union']:
total_flags += _label(proc_aman) # The + operator is the union operator in this case
elif method in ['*', 'intersect']:
total_flags *= _label(proc_aman) # The * operator is the intersect operator in this case
elif method in ['-', 'except']:
total_flags *= ~ _label(proc_aman) # The - operator is the except operator in this case
if 'flags' not in aman._fields:
from sotodlib.core import FlagManager
aman.wrap('flags', FlagManager.for_tod(aman))
if self.process_cfgs['total_flags_label'] in aman['flags']:
aman['flags'].move(self.process_cfgs['total_flags_label'], None)
aman['flags'].wrap(self.process_cfgs['total_flags_label'], total_flags)
return aman, proc_aman
class RotateFocalPlane(_Preprocess):
""" Interpret the boresight rotation effect as a focal plane rotation
and update them accordingly. This applies constant rotation to focal plane.
If hwp=True, rotations of gamma are reflected. This updates boresight.roll
and focal_plane, in place.
Example config block::
- name : "rotate_focal_plane"
process:
hwp: True
.. autofunction:: sotodlib.coords.demod.rotate_focal_plane
"""
name = "rotate_focal_plane"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
from sotodlib.coords import demod
demod.rotate_focal_plane(aman, **self.process_cfgs)
return aman, proc_aman
class RotateQU(_Preprocess):
"""Rotate Q and U components to/from telescope coordinates.
Example config block::
- name : "rotate_qu"
process:
sign: 1
offset: 0
update_focal_plane: True
.. autofunction:: sotodlib.coords.demod.rotate_demodQU
"""
name = "rotate_qu"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
from sotodlib.coords import demod
demod.rotate_demodQU(aman, **self.process_cfgs)
return aman, proc_aman
class SubtractQUCommonMode(_Preprocess):
"""Subtract Q and U common mode.
Example config block::
- name : 'subtract_qu_common_mode'
signal_name_Q: 'demodQ'
signal_name_U: 'demodU'
process: True
calc: True
save: True
.. autofunction:: sotodlib.tod_ops.deproject.subtract_qu_common_mode
"""
name = "subtract_qu_common_mode"
def __init__(self, step_cfgs):
self.signal_name_Q = step_cfgs.get('signal_Q', 'demodQ')
self.signal_name_U = step_cfgs.get('signal_U', 'demodU')
self.save_name = "qu_common_mode_coeffs"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
coeff_aman = get_qu_common_mode_coeffs(aman, Q_signal, U_signal, merge)
self.save(proc_aman, aman)
return aman, proc_aman
def save(self, proc_aman, aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, aman['qu_common_mode_coeffs'])
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
if 'qu_common_mode_coeffs' in proc_aman:
tod_ops.deproject.subtract_qu_common_mode(aman, self.signal_name_Q, self.signal_name_U,
coeff_aman=proc_aman['qu_common_mode_coeffs'],
merge=False)
else:
tod_ops.deproject.subtract_qu_common_mode(aman, self.signal_name_Q,
self.signal_name_U, merge=True)
return aman, proc_aman
class ScanFreqCut(_Preprocess):
"""Apply high-pass cut at the scan frequency.
Example config block::
- name : 'scan_freq_cut'
process:
signal_name_T: 'dsT'
signal_name_Q: 'demodQ'
signal_name_U: 'demodU'
"""
name = "scan_freq_cut"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
scan_freq = tod_ops.utils.get_scan_freq(aman)
hpf_cfg = {'type': 'sine2', 'cutoff': scan_freq, 'trans_width': scan_freq/10}
filt = tod_ops.get_hpf(hpf_cfg)
n = tod_ops.fft_ops.find_superior_integer(aman.samps.count)
rfft = tod_ops.fft_ops.RFFTObj.for_shape(aman.dets.count, n, "BOTH")
tname = self.process_cfgs["signal_name_T"]
qname = self.process_cfgs["signal_name_Q"]
uname = self.process_cfgs["signal_name_U"]
# Copy results into the existing arrays to avoid aliasing the shared rfft workspace
aman[tname][:] = tod_ops.fourier_filter(aman, filt, signal_name=tname, detrend="linear", rfft=rfft)
aman[qname][:] = tod_ops.fourier_filter(aman, filt, signal_name=qname, detrend="linear", rfft=rfft)
aman[uname][:] = tod_ops.fourier_filter(aman, filt, signal_name=uname, detrend="linear", rfft=rfft)
return aman, proc_aman
class FocalplaneNanFlags(_Preprocess):
"""Find additional detectors which have nans
in their focal plane coordinates.
Saves results in proc_aman under the "fp_flags" field.
Example config block::
- name : "fp_flags"
signal: "signal" # optional
calc:
merge: False
save: True
select: True
.. autofunction:: sotodlib.tod_ops.flags.get_focalplane_flags
"""
name = "fp_flags"
def __init__(self, step_cfgs):
self.save_name = "fp_flags"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
mskfp = tod_ops.flags.get_focalplane_flags(aman, **self.calc_cfgs)
fp_aman = core.AxisManager(aman.dets, aman.samps)
fp_aman.wrap('fp_nans', mskfp, [(0, 'dets'), (1, 'samps')])
self.save(proc_aman, fp_aman)
return aman, proc_aman
def save(self, proc_aman, fp_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, fp_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
keep = ~has_all_cut(proc_aman.fp_flags.fp_nans)
if in_place:
meta.restrict("dets", meta.dets.vals[keep])
return meta
return keep
class PointingModel(_Preprocess):
"""Apply pointing model to the TOD.
Saves results in proc_aman under the "pointing" field.
Example config block::
- name : "pointing_model"
process: True
.. autofunction:: sotodlib.coords.pointing_model.apply_pointing_model
"""
name = "pointing_model"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
from sotodlib.coords import pointing_model
if self.process_cfgs:
pointing_model.apply_pointing_model(aman)
return aman, proc_aman
class BadSubscanFlags(_Preprocess):
"""Identifies and flags bad subscans.
Example config block::
- name : "noisy_subscan_flags"
stats_name: tod_stats # optional
calc:
nstd_lim: 5.0
merge: False
save: True
select: True
.. autofunction:: sotodlib.tod_ops.flags.get_badsubscan_flags
"""
name = "noisy_subscan_flags"
def __init__(self, step_cfgs):
self.stats_name = step_cfgs.get('stats_name', 'tod_stats')
self.save_name = ["noisy_subscan_flags", "noisy_dets_flags"]
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
from so3g.proj import RangesMatrix, Ranges
msk_ss = RangesMatrix.zeros((aman.dets.count, aman.samps.count))
msk_det = np.ones(aman.dets.count, dtype=bool)
signal = self.calc_cfgs["subscan_stats"] if isinstance(self.calc_cfgs["subscan_stats"], list) else [self.calc_cfgs["subscan_stats"]]
calc_cfgs_copy = copy.deepcopy(self.calc_cfgs)
for sig in signal:
calc_cfgs_copy["subscan_stats"] = proc_aman[self.stats_name+"_"+sig[-1]]
_msk_ss, _msk_det = tod_ops.flags.get_noisy_subscan_flags(
aman, **calc_cfgs_copy)
msk_ss += _msk_ss # True for bad samps
msk_det &= _msk_det # True for good dets
ss_aman = core.AxisManager(aman.dets, aman.samps)
ss_aman.wrap("valid_subscans", msk_ss, [(0, 'dets'), (1, 'samps')])
det_aman = core.AxisManager(aman.dets)
det_aman.wrap("valid_dets", msk_det, [(0, 'dets')])
self.save(proc_aman, ss_aman, "noisy_subscan_flags")
self.save(proc_aman, det_aman, "noisy_dets_flags")
return aman, proc_aman
def save(self, proc_aman, calc_aman, name):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(name, calc_aman)
def select(self, meta, proc_aman=None, in_place=True):
if self.select_cfgs is None:
return meta
if proc_aman is None:
proc_aman = meta.preprocess
keep = proc_aman.noisy_dets_flags.valid_dets
if in_place:
meta.restrict('dets', proc_aman.dets.vals[keep])
return meta
return keep
class CorrectIIRParams(_Preprocess):
"""Correct missing iir_params by default values.
This corrects iir_params only when the observation is within the time_range
that is known to have problem.
Example config block::
- name: "correct_iir_params"
process: True
.. autofunction:: sotodlib.obs_ops.utils.correct_iir_params
"""
name = "correct_iir_params"
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
from sotodlib.obs_ops import correct_iir_params
correct_iir_params(aman)
if "frequency_cutoffs" in proc_aman and np.isnan(proc_aman["frequency_cutoffs"]["signal"]):
n = len(aman.timestamps)
delta_t = (aman.timestamps[-1] - aman.timestamps[0])/n
freqs = np.fft.rfftfreq(n, delta_t)
iir = tod_ops.filters.iir_filter()(freqs, aman)
mag = np.abs(iir) / np.max(np.abs(iir))
# 3dB scale
scale = 10 ** (-3. / 20)
freq_cutoff = freqs[np.min(np.where(np.array(mag < scale * np.max(mag)))[0])]
proc_aman["frequency_cutoffs"]["signal"] = freq_cutoff
return aman, proc_aman
class TrimFlagEdge(_Preprocess):
"""Trim edge until given flags of all detectors are False
To find first and last sample id that has False (i.e., no flags applied) for all detectors.
This is for avoiding glitchfill problem for data whose edge has flags of True.
Example config block::
- name: "trim_flag_edge"
process:
flags: "pca_exclude"
.. autofunction:: sotodlib.core.flagman.find_common_edge_idx
"""
name = 'trim_flag_edge'
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
flags = aman.flags.get(self.process_cfgs.get('flags'))
trimst, trimen = core.flagman.find_common_edge_idx(flags)
aman.restrict('samps', (aman.samps.offset + trimst,
aman.samps.offset + trimen))
proc_aman.restrict('samps', (proc_aman.samps.offset + trimst,
proc_aman.samps.offset + trimen))
return aman, proc_aman
class AcuDropFlags(_Preprocess):
"""Expands ACU drop flag fields in aman to all detectors. ACU drop flags
indicate where samples are missing from the ACU data due to aggregator
failures.
Example config block::
- name: "acu_drop_flags"
calc:
buffer: 200 # disable buffering by setting to False or None.
name: "acu_drop_flags"
merge: True
save: True
"""
name = "acu_drop_flags"
def __init__(self, step_cfgs):
self.save_name = "acu_drops"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
if "acu_drops" in aman.flags:
acu_drops = RangesMatrix([aman.flags.get("acu_drops") for _ in range(aman.dets.count)])
buffer = self.calc_cfgs.get("buffer", 200)
if buffer:
acu_drops = acu_drops.buffer(buffer)
else:
acu_drops = RangesMatrix.zeros(
shape=(aman.dets.count, aman.samps.count))
if self.calc_cfgs.get("merge", True):
aman.flags.wrap('acu_drop_flags', acu_drops, [(0, 'dets'), (1, 'samps')])
flag_aman = core.AxisManager(aman.dets, aman.samps)
flag_aman.wrap(self.calc_cfgs['name'], acu_drops, [(0, 'dets'), (1, 'samps')])
self.save(proc_aman, flag_aman)
return aman, proc_aman
def save(self, proc_aman, flag_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, flag_aman)
class SmurfGapsFlags(_Preprocess):
"""Expand smurfgaps flag of each stream_id to all detectors
smurfgaps flags indicates the samples of each stream_id where the
lost frames are filled in the bookbinding process.
Example config block::
- name: "smurfgaps_flags"
calc:
buffer: 200
name: "smurfgaps"
merge: True
save: True
.. autofunction:: sotodlib.tod_ops.flags.expand_smurfgaps_flags
"""
name = "smurfgaps_flags"
def __init__(self, step_cfgs):
self.save_name = "smurfgaps"
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
smurfgaps = tod_ops.flags.expand_smurfgaps_flags(aman, **self.calc_cfgs)
flag_aman = core.AxisManager(aman.dets, aman.samps)
flag_aman.wrap(self.calc_cfgs['name'], smurfgaps, [(0, 'dets'), (1, 'samps')])
self.save(proc_aman, flag_aman)
return aman, proc_aman
def save(self, proc_aman, flag_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, flag_aman)
class GetTauHWP(_Preprocess):
"""Analyze observation with hwp spinning up or spinning down and
compute the timeconstant of detectors from hwp speed dependence of
the angle of half-wave plate synchronous signal.
Example config block::
- name: "get_tau_hwp"
calc:
width: 1000
apodize_samps: 2000
trim_samps: 2000
min_fhwp: 1
max_fhwp: 2
demod_mode: 4
name: "tau_hwp"
merge: False
save: True
.. autofunction:: sotodlib.hwp.hwp.get_tau_hwp
"""
name = "get_tau_hwp"
def __init__(self, step_cfgs):
self.save_name = step_cfgs.get('calc', {}).get('name')
super().__init__(step_cfgs)
def calc_and_save(self, aman, proc_aman):
tau_hwp_aman = hwp.get_tau_hwp(aman, **self.calc_cfgs)
self.save(proc_aman, tau_hwp_aman)
def save(self, proc_aman, tau_hwp_aman):
if self.save_cfgs is None:
return
if self.save_cfgs:
proc_aman.wrap(self.save_name, tau_hwp_aman)
class Move(_Preprocess):
"""Rename or remove a data field.
To delete the field, pass new_name=None.
Example config block::
- name: "move"
process:
name: "name"
new_name: "new_name"
.. autofunction:: sotodlib.core.axisman.AxisManager.move
"""
name = 'move'
def __init__(self, step_cfgs):
self.save_name = None
super().__init__(step_cfgs)
def process(self, aman, proc_aman, sim=False, data_aman=None):
if data_aman is not None:
raise NotImplementedError("No support for using data AxisManager in process")
aman.move(**self.process_cfgs)
return aman, proc_aman
_Preprocess.register(SplitFlags)
_Preprocess.register(SubtractT2P)
_Preprocess.register(EstimateT2P)
_Preprocess.register(InvVarFlags)
_Preprocess.register(PTPFlags)
_Preprocess.register(PCARelCal)
_Preprocess.register(PCAFilter)
_Preprocess.register(GetCommonMode)
_Preprocess.register(FilterForSources)
_Preprocess.register(FourierFilter)
_Preprocess.register(Trends)
_Preprocess.register(FFTTrim)
_Preprocess.register(Detrend)
_Preprocess.register(GlitchDetection)
_Preprocess.register(Jumps)
_Preprocess.register(FixJumps)
_Preprocess.register(PSDCalc)
_Preprocess.register(NoiseRatio)
_Preprocess.register(Noise)
_Preprocess.register(Calibrate)
_Preprocess.register(EstimateHWPSS)
_Preprocess.register(SubtractHWPSS)
_Preprocess.register(A2Stats)
_Preprocess.register(Apodize)
_Preprocess.register(Demodulate)
_Preprocess.register(AzSS)
_Preprocess.register(SubtractAzSSTemplate)
_Preprocess.register(GlitchFill)
_Preprocess.register(FlagTurnarounds)
_Preprocess.register(SubPolyf)
_Preprocess.register(DetBiasFlags)
_Preprocess.register(SSOFootprint)
_Preprocess.register(DarkDets)
_Preprocess.register(LoadPremadeFlags)
_Preprocess.register(SourceFlags)
_Preprocess.register(HWPAngleModel)
_Preprocess.register(GetStats)
_Preprocess.register(UnionFlags)
_Preprocess.register(CombineFlags)
_Preprocess.register(RotateFocalPlane)
_Preprocess.register(RotateQU)
_Preprocess.register(SubtractQUCommonMode)
_Preprocess.register(ScanFreqCut)
_Preprocess.register(FocalplaneNanFlags)
_Preprocess.register(PointingModel)
_Preprocess.register(BadSubscanFlags)
_Preprocess.register(CorrectIIRParams)
_Preprocess.register(DetcalNanCuts)
_Preprocess.register(TrimFlagEdge)
_Preprocess.register(AcuDropFlags)
_Preprocess.register(SmurfGapsFlags)
_Preprocess.register(GetTauHWP)
_Preprocess.register(Move)
_Preprocess.register(CutBadDistribution)