"""Module for estimating Azimuth Synchronous Signal (azss)"""
import numpy as np
from numpy.polynomial import legendre as L
from scipy.interpolate import interp1d
from sotodlib import core, tod_ops
from sotodlib.tod_ops import bin_signal, apodize, filters, pca
import logging
logger = logging.getLogger(__name__)
def _check_azcoverage(aman, flags, az=None, coverage_threshold=0.95,
exclude_turnarounds=True):
"""
Check if the az coverage is sufficient for fitting.
Parameters
----------
az: array-like
A 1D numpy array representing the azimuth angles.
flags: RangesMatrix
Flags indicating invalid samples.
coverage_threshold: float, optional
Threshold for azimuth coverage to consider it sufficient. Default is 0.95.
exclude_turnarounds: bool, optional
Whether to exclude turnaround regions in the azimuth coverage check. Default is True.
Returns
-------
bad_dets: boolean array
A boolean array indicating which detectors have insufficient azimuth coverage.
coverages: float array
An array of azimuth coverage fractions for each detector.
"""
if az is None:
az = aman.boresight.az
if exclude_turnarounds:
tot_range = np.ptp(az[~aman.flags.turnarounds.mask()])
else:
tot_range = np.ptp(az)
if isinstance(flags, str):
flags = aman.flags.get(flags)
coverages = np.array([np.ptp(az[fl])/tot_range if sum(fl) != 0 else 0 for fl in ~flags.mask()])
return coverages < coverage_threshold, coverages
def _valid_dets_mask(azss_stats, min_valid_bins=1):
"""
Boolean mask of detectors usable for fitting / interpolation.
A detector is considered valid if it has at least ``min_valid_bins``
non-NaN bins and is not flagged in ``azss_stats['bad_dets']``, if present.
"""
valid = (~np.isnan(azss_stats.binned_signal)).sum(axis=1) >= min_valid_bins
if 'bad_dets' in azss_stats:
valid &= ~azss_stats['bad_dets']
return valid
def bin_by_az(aman, signal=None, az=None, azrange=None, bins=100, flags=None,
apodize_edges=True, apodize_edges_samps=1600,
apodize_flags=True, apodize_flags_samps=200):
"""
Bins a signal by azimuth angle.
Parameters
----------
aman: TOD
core.AxisManager
signal: array-like, optional
numpy array of signal to be binned. If None, the signal is taken from aman.signal.
az: array-like, optional
A 1D numpy array representing the azimuth angles. If not provided, the azimuth angles are taken from aman.boresight.az attribute.
azrange: array-like, optional
A list specifying the range of azimuth angles to consider for binning. Defaults to None.
If None, [min(az), max(az)] will be used for binning.
bins: int or sequence of scalars
If bins is an int, it defines the number of equal-width bins in the given azrange (100, by default).
If bins is a sequence, it defines the bin edges, including the rightmost edge, allowing for non-uniform bin widths.
If ``bins`` is a sequence, ``bins`` overwrite ``azrange``.
flags: str or RangesMatrix or Ranges, optional
Flag indicating whether to exclude flagged samples when binning the signal.
If provided by a string, `aman.flags.get(flags)` is used for the flags.
Default is no mask applied.
apodize_edges : bool, optional
If True, applies an apodization window to the edges of the signal. Defaults to True.
apodize_edges_samps : int, optional
The number of samples over which to apply the edge apodization window. Defaults to 1600.
apodize_flags : bool, optional
If True, applies an apodization window based on the flags. Defaults to True.
apodize_flags_samps : int, optional
The number of samples over which to apply the flags apodization window. Defaults to 200.
Returns
-------
binning_dict: A dictionary containing the binned signal values.
* 'bin_edges' : float array of bin edges
* 'bin_centers': center of each bin of azimuth
* 'bin_counts': counts of binned samples
* 'binned_signal': binned signal
* 'binned_signal_sigma': estimated sigma of binned signal
"""
if apodize_edges:
weight_for_signal = apodize.get_apodize_window_for_ends(aman, apodize_samps=apodize_edges_samps)
if isinstance(flags, str):
flags = aman.flags.get(flags)
if (flags is not None) and apodize_flags:
flags_mask = flags.mask()
# check the flags dimension
if flags_mask.ndim == 1:
flag_is_1d = True
else:
all_columns_same = np.all(np.all(flags_mask == flags_mask[0, :], axis=0))
if all_columns_same:
flag_is_1d = True
flags_mask = flags_mask[0]
else:
flag_is_1d = False
if flag_is_1d:
weight_for_signal = weight_for_signal * apodize.get_apodize_window_from_flags(aman,
flags=flags,
apodize_samps=apodize_flags_samps)
else:
weight_for_signal = weight_for_signal[np.newaxis, :] * apodize.get_apodize_window_from_flags(aman,
flags=flags,
apodize_samps=apodize_flags_samps)
else:
if (flags is not None) and apodize_flags:
weight_for_signal = apodize.get_apodize_window_from_flags(aman, flags=flags, apodize_samps=apodize_flags_samps)
else:
weight_for_signal = None
binning_dict = bin_signal(aman, bin_by=az, signal=signal,
range=azrange, bins=bins, flags=flags, weight_for_signal=weight_for_signal)
return binning_dict
def _legendre_x(az, binned_az, m, fit_range, bin_width):
"""
Build Legendre x-coordinates (rescaled to [-1, 1]) for both the sample
azimuth array and the bin centers, given a 1D mask m of valid bins.
If fit_range is None the range is inferred from the bin centers
selected by m (extended by half a bin on each side).
"""
if fit_range is None:
az_min = binned_az[m].min() - bin_width / 2
az_max = binned_az[m].max() + bin_width / 2
else:
az_min, az_max = fit_range[0], fit_range[1]
span = az_max - az_min
mid = az_min + az_max
x_samp = (2 * az - mid) / span
x_bin = np.where(m, (2 * binned_az - mid) / span, np.nan)
return x_samp, x_bin
def _fit_one_det(binned_signal_i, x_bin, m, sigma_i, max_mode):
"""
Fit a Legendre polynomial to one detector's binned signal.
Returns
-------
coeffs : ndarray, shape (max_mode + 1,)
binned_model : ndarray, shape (nbins,) (NaN at masked bins)
sum_of_squares : float
redchi2 : float
"""
coeffs = L.legfit(x_bin[m], binned_signal_i[m], max_mode)
binned_model = np.where(m, L.legval(x_bin, coeffs), np.nan)
ssq = np.sum((binned_signal_i[m] - binned_model[m])**2)
redchi2 = ssq / sigma_i**2 / (m.sum() - max_mode - 1)
return coeffs, binned_model, ssq, redchi2
def fit_azss(az, azss_stats, max_mode, modes_axis_name='azss_modes', fit_range=None, overwrite=False):
"""
Function for fitting Legendre polynomials to signal binned in azimuth.
Parameters
----------
az: array-like
azimuth array from main axis manager
azss_stats: AxisManager
Axis manager containing binned signal and azimuth used for fitting.
Created by ``get_azss`` function.
max_mode: integer
Highest order Legendre polynomial to include in the fit.
modes_axis_name: string, optional
The name to assign to the LabelAxis of azss legendre modes when method is 'fit'.
fit_range: list
Azimuth range used to renormalized to the [-1,1] range spanned
by the Legendre polynomials for fitting. Default is the max-min
span in the ``binned_az`` array passed in via ``azss_stats``.
overwrite: bool
If overwrite is true will refit the data even if the fit parameters are
already stored in azss_stats. If False will just use the stored
parameters in azss_stats to compute and return the model.
Returns
-------
model_sig_tod: array-like
Model fit for each detector size ndets x n_samps
"""
bin_width = azss_stats.binned_az[1] - azss_stats.binned_az[0]
ndets = azss_stats.dets.count
nbins = azss_stats.bin_az_samps.count
m_2d = ~np.isnan(azss_stats.binned_signal)
# Number of valid bins cannot be smaller than mode of Legendre function.
valid_dets = _valid_dets_mask(azss_stats, min_valid_bins=max_mode + 1)
model = np.zeros((ndets, len(az)))
coeffs = np.zeros((ndets, max_mode + 1))
binned_model = np.full((ndets, nbins), np.nan)
sum_of_squares = np.full(ndets, np.nan)
redchi2s = np.full(ndets, np.nan)
if not valid_dets.any():
logger.warning('All the detectors have low az coverage and cannot make model')
return model
use_cached = ('coeffs' in azss_stats) and not overwrite
is_uniform = np.array_equal(m_2d[valid_dets].any(axis=0),
m_2d[valid_dets].all(axis=0))
if is_uniform:
m = m_2d[valid_dets][0]
x_samp, x_legendre_bin_centers = _legendre_x(
az, azss_stats.binned_az, m, fit_range, bin_width)
if use_cached:
return L.legval(x_samp, azss_stats.coeffs.T)
# Vectorized Legendre fit over all valid detectors at once.
c = L.legfit(x_legendre_bin_centers[m],
azss_stats.binned_signal[valid_dets][:, m].T,
max_mode).T # shape: (n_valid, max_mode + 1)
coeffs[valid_dets] = c
bm = L.legval(x_legendre_bin_centers, c.T)
binned_model[valid_dets] = np.where(m, bm, np.nan)
model[valid_dets] = L.legval(x_samp, c.T)
diff = azss_stats.binned_signal[valid_dets][:, m] - bm[:, m]
sum_of_squares[valid_dets] = np.sum(diff**2, axis=-1)
redchi2s[valid_dets] = (
sum_of_squares[valid_dets]
/ azss_stats.uniform_binned_signal_sigma[valid_dets]**2
/ (m.sum() - max_mode - 1)
)
else:
for i in np.where(valid_dets)[0]:
m_i = m_2d[i]
x_samp, x_legendre_bin_centers = _legendre_x(
az, azss_stats.binned_az, m_i, fit_range, bin_width)
if use_cached:
model[i] = L.legval(x_samp, azss_stats.coeffs[i])
continue
(coeffs[i],
binned_model[i],
sum_of_squares[i],
redchi2s[i]) = _fit_one_det(
azss_stats.binned_signal[i], x_legendre_bin_centers, m_i,
azss_stats.uniform_binned_signal_sigma[i], max_mode)
model[i] = L.legval(x_samp, coeffs[i])
if use_cached:
return model
mode_names = np.array([f'legendre{mode}' for mode in range(max_mode + 1)], dtype='<U10')
modes_axis = core.LabelAxis(name=modes_axis_name, vals=mode_names)
azss_stats.add_axis(modes_axis)
azss_stats.wrap('binned_model', binned_model, [(0, 'dets'), (1, 'bin_az_samps')])
azss_stats.wrap('x_legendre_bin_centers', x_legendre_bin_centers, [(0, 'bin_az_samps')])
azss_stats.wrap('coeffs', coeffs, [(0, 'dets'), (1, modes_axis_name)])
azss_stats.wrap('redchi2s', redchi2s, [(0, 'dets')])
return model
[docs]
def get_azss(aman, signal='signal', az=None, azrange=None, bins=100, flags=None, scan_flags=None,
apodize_edges=True, apodize_edges_samps=1600, apodize_flags=True, apodize_flags_samps=200,
apply_prefilt=True, prefilt_cfg=None, prefilt_detrend='linear',
method='interpolate', max_mode=None, modes_axis_name='azss_modes', subtract_in_place=False,
merge_stats=True, azss_stats_name='azss_stats',
merge_model=True, azss_model_name='azss_model', coverage_threshold=0.95,
exclude_turnarounds=True, return_det_mask=False):
"""
Derive azss (Azimuth Synchronous Signal) statistics and model from the given axismanager data.
**NOTE:** This function does not modify the ``signal`` unless ``subtract_in_place = True``.
Parameters
----------
aman: TOD
core.AxisManager
signal: array-like, optional
A numpy array representing the signal to be used for azss extraction. If not provided, the signal is taken from aman.signal.
az: array-like, optional
A 1D numpy array representing the azimuth angles. If not provided, the azimuth angles are taken from aman.boresight.az.
azrange: list, optional
A list specifying the range of azimuth angles to consider for binning. Defaults to [-np.pi, np.pi].
If None, [min(az), max(az)] will be used for binning.
bins: int or sequence of scalars
If bins is an int, it defines the number of equal-width bins in the given azrange (100, by default).
If bins is a sequence, it defines the bin edges, including the rightmost edge, allowing for non-uniform bin widths.
If `bins` is a sequence, `bins` overwrite `azrange`.
flags : str or Rannges or RangesMatrix, optional
Flag indicating whether to exclude flagged samples when binning the signal.
Default is no mask applied.
scan_flags : str or Ranges, optional
Subtract in the scan/time region specified by flags.
Typically `flags.left_scan` or `flags.right_scan` will be used.
If we estimate and subtract azss in left going scan only, then
`flags` should be a "union of glitch_flags and flags.right_scan (or ~flags.left_scan)", and
`scan_flags` should be `flags.left_scan`.
apodize_edges : bool, optional
If True, applies an apodization window to the edges of the signal. Defaults to True.
apodize_edges_samps : int, optional
The number of samples over which to apply the edge apodization window. Defaults to 1600.
apodize_flags : bool, optional
If True, applies an apodization window based on the flags. Defaults to True.
apodize_flags_samps : int, optional
The number of samples over which to apply the flags apodization window. Defaults to 200.
apply_prefilt : bool, optional
If True, applies a pre-filter to the signal before azss extraction. Defaults to True.
prefilt_cfg : dict, optional
Configuration for the pre-filter. Defaults to {'type': 'sine2', 'cutoff': 0.005, 'trans_width': 0.005}.
prefilt_detrend : str, optional
Method for detrending before filtering. Defaults to 'linear'.
method: str
The method to use for azss modeling. Options are 'interpolate' and 'fit'.
In 'interpolate', binned signal is used directly.
In 'fit', fitting is applied to the binned signal.
Defaults to 'interpolate'.
max_mode: integer, optional
The number of Legendre modes to use for azss when method is 'fit'. Required when method is 'fit'.
modes_axis_name: string, optional
The name assigned to the LabelAxis of azss legendre modes when method is 'fit'.
Set a unique name when fitting azss with different max_mode values to avoid axis name conflicts.
Defaults to 'azss_modes'.
subtract_in_place: bool
If True, it subtract the modeled tod from original signal. The aman.signal will be modified.
merge_stats: boolean, optional
Boolean flag indicating whether to merge the azss statistics with aman. Defaults to True.
azss_stats_name: string, optional
The name to assign to the merged azss statistics. Defaults to 'azss_stats'.
merge_model: boolean, optional
Boolean flag indicating whether to merge the azss model with the aman. Defaults to True.
azss_model_name: string, optional
The name to assign to the merged azss model. Defaults to 'azss_model'.
coverage_threshold: float, optional
Minimum azimuth coverage fraction required. Default 0.8
exclude_turnarounds: bool, optional
Exclude turnarounds when checking coverage. Default True
return_det_mask: bool, optional
If True, return detector mask along with model. Default False
Returns
-------
Tuple:
- azss_stats: core.AxisManager
- azss statistics including: azumith bin centers, bin counts, binned signal, std of each detector-az bin, std of each detector.
- If ``method=fit`` then also includes: binned legendre model, legendre bin centers, fit coefficients, reduced chi2.
- If ``return_det_mask=True`` then also includes: bad_dets, coverages.
- model_sig_tod: numpy.array
- azss model as a function of time either from fits or interpolation depending on ``method`` argument.
"""
if prefilt_cfg is None:
prefilt_cfg = {'type': 'sine2', 'cutoff': 0.005, 'trans_width': 0.005}
prefilt = filters.get_hpf(prefilt_cfg)
if signal is None:
# signal_name variable to be deleted when tod_ops.fourier_filter is updated
signal_name = 'signal'
signal = aman[signal_name]
elif isinstance(signal, str):
signal_name = signal
signal = aman[signal_name]
elif isinstance(signal, np.ndarray):
raise TypeError("Currently ndarray not supported, need update to tod_ops.fourier_filter module to remove signal_name argument.")
else:
raise TypeError("Signal must be None, str, or ndarray")
if scan_flags is None:
scan_flags = np.ones(aman.samps.count, dtype=bool)
elif isinstance(scan_flags, str):
scan_flags = aman.flags.get(scan_flags).mask()
else:
scan_flags = scan_flags.mask()
if apply_prefilt:
# This requires signal to be a string.
signal = np.array(tod_ops.fourier_filter(
aman, prefilt, detrend=prefilt_detrend, signal_name=signal_name)
)
if az is None:
az = aman.boresight.az
# do binning
binning_dict = bin_by_az(aman, signal=signal, az=az, azrange=azrange, bins=bins, flags=flags,
apodize_edges=apodize_edges, apodize_edges_samps=apodize_edges_samps,
apodize_flags=apodize_flags, apodize_flags_samps=apodize_flags_samps,)
bin_centers = binning_dict['bin_centers']
bin_counts = binning_dict['bin_counts']
binned_signal = binning_dict['binned_signal']
binned_signal_sigma = binning_dict['binned_signal_sigma']
uniform_binned_signal_sigma = np.nanmedian(binned_signal_sigma, axis=-1)
azss_stats = core.AxisManager(aman.dets)
azss_stats.wrap('binned_az', bin_centers, [(0, core.IndexAxis('bin_az_samps', count=bins))])
azss_stats.wrap('bin_counts', bin_counts, [(0, 'dets'), (1, 'bin_az_samps')])
azss_stats.wrap('binned_signal', binned_signal, [(0, 'dets'), (1, 'bin_az_samps')])
azss_stats.wrap('binned_signal_sigma', binned_signal_sigma, [(0, 'dets'), (1, 'bin_az_samps')])
azss_stats.wrap('uniform_binned_signal_sigma', uniform_binned_signal_sigma, [(0, 'dets')])
if return_det_mask:
bad_dets, coverages = _check_azcoverage(aman, flags=flags, az=az,
coverage_threshold=coverage_threshold,
exclude_turnarounds=exclude_turnarounds)
azss_stats.wrap('bad_dets', bad_dets, [(0, 'dets')])
azss_stats.wrap('az_coverage', coverages, [(0, 'dets')])
model_sig_tod = get_azss_model(aman, azss_stats, az, method, max_mode, modes_axis_name, azrange)
if merge_stats:
aman.wrap(azss_stats_name, azss_stats)
if merge_model:
aman.wrap(azss_model_name, model_sig_tod, [(0, 'dets'), (1, 'samps')])
if subtract_in_place:
aman[signal_name][:, scan_flags] -= model_sig_tod.astype(signal.dtype)[:, scan_flags]
return azss_stats, model_sig_tod
def get_azss_model(aman, azss_stats, az=None, method='interpolate',
max_mode=None, modes_axis_name='azss_modes', azrange=None):
"""
Function to return the azss template for subtraction given the azss_stats AxisManager
Parameters
----------
aman: AxisManager
The axis manager containing the signal
azss_stats: AxisManager
Contains binned azss statistics
az: array-like, optional
Azimuth array. If None, uses aman.boresight.az
method: str
Method for modeling: 'interpolate' or 'fit'
max_mode: int, optional
Maximum Legendre mode for 'fit' method
modes_axis_name: string, optional
The name assigned to the LabelAxis of azss legendre modes when method is 'fit'.
azrange: list, optional
Azimuth range for fitting
Returns
-------
model: array-like
AZSS model for each detector
"""
if az is None:
az = aman.boresight.az
if method == 'fit':
if not isinstance(max_mode, int):
raise ValueError('max_mode is not provided as integer')
model = fit_azss(
az=az, azss_stats=azss_stats, max_mode=max_mode,
modes_axis_name=modes_axis_name, fit_range=azrange)
if method == 'interpolate':
model = np.zeros((aman.dets.count, aman.samps.count))
valid_dets = _valid_dets_mask(azss_stats, min_valid_bins=1)
if not valid_dets.any():
logger.warning('All the detectors have low az coverage and cannot make model')
return model
m_2d = ~np.isnan(azss_stats.binned_signal)
is_uniform = np.array_equal(m_2d[valid_dets].any(axis=0),
m_2d[valid_dets].all(axis=0))
if is_uniform:
m = m_2d[valid_dets][0]
f_template = interp1d(azss_stats.binned_az[m], azss_stats.binned_signal[:, m][valid_dets, :], fill_value='extrapolate')
model[valid_dets, :] = f_template(az)
else:
for i in np.where(valid_dets)[0]:
m = m_2d[i]
f_template = interp1d(azss_stats.binned_az[m], azss_stats.binned_signal[i][m], fill_value='extrapolate')
model[i, :] = f_template(az)
if np.any(~np.isfinite(model)):
logger.warning('azss model has nan. set zero to nan but this may make glitch')
model[~np.isfinite(model)] = 0
if 'bad_dets' in azss_stats:
model[azss_stats['bad_dets'], :] = 0
return model
[docs]
def subtract_azss(aman, azss_stats, signal='signal', method='interpolate', max_mode=None,
modes_axis_name='azss_modes', azrange=None,
scan_flags=None, subtract_name='azss_remove', in_place=False, remove_template=False):
"""
Subtract the scan synchronous signal (azss) template from the
signal in the given axis manager.
Parameters
----------
aman : AxisManager
The axis manager containing the signal and the azss template.
azss_stats: AxisManager
Contains AxisManager from get_azss.
signal : str, optional
The name of the field in the axis manager containing the signal to be processed.
Defaults to 'signal'.
method: str
The method to use for azss modeling. Options are 'interpolate' and 'fit'.
max_mode: integer, optinal
The number of Legendre modes to use for azss when method is 'fit'. Required when method is 'fit'.
azrange:
Azimuth range used for 'fit' method.
scan_flags: str or Ranges, optional
Subtract in the scan/time region specified by flags.
Typically `flags.left_scan` or `flags.right_scan` will be used.
subtract_name : str, optional
The name of the field in the axis manager that will store the azss-subtracted signal.
Only used if in_place is False. Defaults to 'azss_remove'.
in_place : bool, optional
If True, the subtraction is done in place, modifying the original signal in the axis manager.
If False, the result is stored in a new field specified by subtract_name. Defaults to False.
Returns
-------
None
"""
if signal is None:
signal_name = 'signal'
signal = aman[signal_name]
elif isinstance(signal, str):
signal_name = signal
signal = aman[signal_name]
elif isinstance(signal, np.ndarray):
if np.shape(signal) != (aman.dets.count, aman.samps.count):
raise ValueError("When passing signal as ndarray shape must match (n_dets x n_samps).")
signal_name = None
else:
raise TypeError("Signal must be None, str, or ndarray")
model = get_azss_model(aman, azss_stats, method=method, max_mode=max_mode,
modes_axis_name=modes_axis_name, azrange=azrange)
if scan_flags is None:
scan_flags = np.ones(aman.samps.count, dtype=bool)
elif isinstance(scan_flags, str):
scan_flags = aman.flags.get(scan_flags).mask()
else:
scan_flags = scan_flags.mask()
if in_place:
if signal_name is None:
signal[:, scan_flags] -= model.astype(signal.dtype)[:, scan_flags]
else:
aman[signal_name][:, scan_flags] -= model.astype(aman[signal_name].dtype)[:, scan_flags]
else:
subtracted = signal[:, scan_flags] - model.astype(signal.dtype)[:, scan_flags]
aman.wrap(subtract_name, subtracted, [(0, 'dets'), (1, 'samps')])
def subtract_azss_template(
aman,
signal='signal',
azss='azss_stats',
method='interpolate',
scan_flags=None,
pca_modes=None,
subtract=True,
):
"""
Make azss template model that is "common" to all detectors and subtract
If pca_modes are specified, make template by pca, otherwise make template by weighted mean.
Parameters
----------
aman: AxisManager
The axis manager containing the signal and the azss template.
signal: str or array-like
numpy array of signal to be binned. If None, the signal is taken from aman.signal.
azss: str or azss_stats AxisManager
azss_stats AxisManager generated by `azss.get_azss`
method : str
The method to use for azss modeling. Option is 'interpolate' only now.
scan_flags: str or flags, optional
Subtract template model in the time region specified by flags.
Typically `flags.left_scan` or `flags.right_scan` will be used.
pca_modes: integer, optinal
Number of pca modes for making azss template
subtract: boolean, optional
If true subtract azss template from signal
Returns
-------
azss_template_model
"""
if isinstance(signal, str):
signal = aman.get(signal)
if isinstance(azss, str):
if azss in aman:
azss = aman.get(azss)
elif azss in aman.preprocess:
azss = aman.preprocess.get(azss)
else:
raise ValueError(f'{azss} field not found in aman or aman.preprocess.')
if scan_flags is None:
scan_flags = np.ones(aman.samps.count, dtype=bool)
elif isinstance(scan_flags, str):
scan_flags = aman.flags.get(scan_flags).mask()
else:
scan_flags = scan_flags.mask()
mask = ~np.any(np.isnan(azss.binned_signal), axis=0)
if pca_modes is not None:
# make pca model
template = np.full((azss.dets.count, azss.bin_az_samps.count), np.nan)
if aman.dets.count < pca_modes:
raise ValueError(f'The number of pca modes {pca_modes} is '
f'larger than the number of detectors {aman.dets.count}.')
p = pca.get_pca(aman, signal=azss.binned_signal[:, mask])
R = p.R[:, :pca_modes]
template[:, mask] = R @ R.T @ azss.binned_signal[:, mask]
else:
# make commom mode template
weighted_azss = np.sum(azss.binned_signal * azss.binned_signal_sigma**-2, axis=0)
weight = np.sum(azss.binned_signal_sigma**-2, axis=0)
template = weighted_azss / weight
# Least square fit the amplitude of azss per each detector
coefs = np.sum(template[np.newaxis, mask] * azss.binned_signal[:, mask] * azss.binned_signal_sigma[:, mask]**-2, axis=1)
coefs /= np.sum(template[np.newaxis, mask]**2 * azss.binned_signal_sigma[:, mask]**-2, axis=1)
template = coefs[:, np.newaxis] * template[np.newaxis, :]
if method == 'interpolate':
f_template = interp1d(azss.binned_az[mask], template[:, mask], fill_value='extrapolate')
model = f_template(aman.boresight.az)
else:
raise ValueError(f'{method} is not supported yet')
if np.any(~np.isfinite(model)):
logger.warning('azss model has nan. set zero to nan but this may make glitch')
model[~np.isfinite(model)] = 0
if subtract:
signal[:, scan_flags] -= model[:, scan_flags]
return model