import numpy as np
from scipy.optimize import curve_fit
from sotodlib import core, tod_ops
from sotodlib.tod_ops import bin_signal, filters
import logging
logger = logging.getLogger(__name__)
[docs]
def get_hwpss(aman, signal_name=None, hwp_angle=None, bin_signal=True, bins=360,
lin_reg=True, modes=[1, 2, 3, 4, 6, 8], apply_prefilt=True,
prefilt_cfg=None, prefilt_detrend='linear', flags=None,
merge_stats=True, hwpss_stats_name='hwpss_stats',
merge_model=True, hwpss_model_name='hwpss_model'):
"""
Extracts HWP synchronous signal (HWPSS) from a time-ordered data (TOD)
using linear regression or curve-fitting. The curve-fitting or linear
regression are either run on the full time ordered data vs hwp angle
or the time ordered data binned in hwp_angle. If the curve-fitting
option is used it must be performed on the binned data.
Parameters
----------
aman : AxisManager object
The TOD to extract HWPSS from.
signal_name : str
The field name in the axis manager to use for the TOD signal.
If not provided, ``signal`` will be used.
hwp_angle : array-like, optional
The HWP angle for each sample in `aman`. If not provided, `aman.hwp_angle` will be used.
bin_signal : bool, optional
Whether to bin the TOD signal into HWP angle bins before extracting HWPSS. Default is `True`.
bins : int, optional
The number of HWP angle bins to use if `bin_signal` is `True`. Default is 360.
lin_reg : bool, optional
Whether to use linear regression to extract HWPSS from the binned signal. If `False`, curve-fitting will be used instead.
Default is `True`.
modes : list of int, optional
The HWPSS harmonic modes to extract. Default is [1, 2, 3, 4, 6, 8].
apply_prefilt : bool, optional
Whether to apply a high-pass filter to signal before extracting HWPSS. Default is `True`.
If run through preprocess and `signal` is not `aman.signal` then default to `False`.
prefilt_cfg : dict, optional
The configuration of the high-pass filter, in Hz. Only used if `apply_prefilt` is `True`.
Default is sine2 filter of with cutoff frequency of 1.0 Hz and trans_width of 1.0 Hz.
prefilt_detrend: str or None
Method of detrending when you apply prefilter. Default is `linear`. If data is already detrended or you do not want to detrend,
set it to `None`.
flags : RangesMatrix, optional
Flags to be masked out before extracting HWPSS. If Default is None, and no mask will be applied.
merge_stats : bool, optional
Whether to add the extracted HWPSS statistics to `aman` as new axes. Default is `True`.
hwpss_stats_name : str, optional
The name to use for the new field containing the HWPSS statistics if `merge_stats` is `True`. Default is 'hwpss_stats'.
merge_extract : bool, optional
Whether to add the extracted HWPSS to `aman` as a new signal field. Default is `True`.
hwpss_extract_name : str, optional
The name to use for the new signal field containing the extracted HWPSS if `merge_extract` is `True`. Default is 'hwpss_extract'.
Returns
-------
hwpss_stats : AxisManager object
The extracted HWPSS and its statistics. The statistics include:
- **coeffs** (n_dets x n_modes) : coefficients of the model
.. math::
\sum_n \mathrm{coeffs}[2n]\sin{(\mathrm{modes}[n] \chi_{\mathrm{hwp}})} + \mathrm{coeffs}[2n+1]\cos{(\mathrm{modes}[n] \chi_{\mathrm{hwp}})}
where the sum on n range(len(modes)). **Note**: n_modes is 2*len(modes)
- **covars** (n_dets x n_modes x n_modes) : variance covariance matrix of the fitted coefficients for each detector.
- **redchi2** (n_dets) : reduced chi^2 of the fit for each detector.
**In the binned case the following are returned:**
- **binned_angle** (n_bins) : binned version of hwp_angle in range (0, 2pi] with number of bins set by bins argument.
- **binned_signal** (n_dets x n_bins) : binned signal for each detector.
- **sigma_bin** (n_dets) : average over all bins of the standard deviation of the signal within each bin.
**In the non-binned case the following are returned:**
- **sigma_tod** (n_dets) : estimate of the standard deviation of the signal using function ``estimate_sigma_tod``
"""
if prefilt_cfg is None:
prefilt_cfg = {'type': 'sine2', 'cutoff': 1.0, 'trans_width': 1.0}
prefilt = filters.get_hpf(prefilt_cfg)
if signal_name is None:
if apply_prefilt:
signal = np.array(tod_ops.fourier_filter(
aman, prefilt, detrend=prefilt_detrend, signal_name='signal'))
else:
signal = aman.signal
else:
if apply_prefilt:
signal = np.array(tod_ops.fourier_filter(
aman, prefilt, detrend=prefilt_detrend, signal_name=signal_name))
else:
signal = aman[signal_name]
if hwp_angle is None:
hwp_angle = aman.hwp_angle
# define hwpss_stats
mode_names = []
for mode in modes:
mode_names.append(f'S{mode}')
mode_names.append(f'C{mode}')
hwpss_stats = core.AxisManager(aman.dets, core.LabelAxis(
name='modes', vals=np.array(mode_names, dtype='<U3')))
if bin_signal:
hwp_angle_bin_centers, binned_hwpss, hwpss_sigma_bin = get_binned_hwpss(
aman, signal, hwp_angle=None, bins=bins, flags=flags)
# check bin count
num_invalid_bins = np.count_nonzero(np.isnan(binned_hwpss[0][:]))
if num_invalid_bins > 0:
logger.warning(f'There are {num_invalid_bins} bins with zero samples. ' +
'You maybe using simulation data whose hwp speed is perfectly constant, ' +
'or your specification of number of bins is too large.')
# wrap
hwpss_stats.wrap('binned_angle', hwp_angle_bin_centers, [
(0, core.IndexAxis('bin_samps', count=bins))])
hwpss_stats.wrap('binned_signal', binned_hwpss, [
(0, 'dets'), (1, 'bin_samps')])
hwpss_stats.wrap('sigma_bin', hwpss_sigma_bin, [(0, 'dets')])
if lin_reg:
fitsig_binned, coeffs, covars, redchi2s = hwpss_linreg(
x=hwp_angle_bin_centers, ys=binned_hwpss, yerrs=hwpss_sigma_bin, modes=modes)
else:
params_init = guess_hwpss_params(
x=hwp_angle_bin_centers, ys=binned_hwpss, modes=modes)
fitsig_binned, coeffs, covars, redchi2s = hwpss_curvefit(x=hwp_angle_bin_centers, ys=binned_hwpss, yerrs=hwpss_sigma_bin,
modes=modes, params_init=params_init)
# tod template
fitsig_tod = harms_func(hwp_angle, modes, coeffs)
# wrap the optimal values and stats
hwpss_stats.wrap('binned_model', fitsig_binned,
[(0, 'dets'), (1, 'bin_samps')])
hwpss_stats.wrap('coeffs', coeffs, [(0, 'dets'), (1, 'modes')])
hwpss_stats.wrap('covars', covars, [
(0, 'dets'), (1, 'modes'), (2, 'modes')])
hwpss_stats.wrap('redchi2s', redchi2s, [(0, 'dets')])
else:
if flags is None:
m = np.ones([aman.dets.count, aman.samps.count], dtype=bool)
else:
m = ~flags.mask()
hwpss_sigma_tod = estimate_sigma_tod(signal, hwp_angle)
hwpss_stats.wrap('sigma_tod', hwpss_sigma_tod, [(0, 'dets')])
if lin_reg:
fitsig_tod, coeffs, covars, redchi2s = hwpss_linreg(
x=hwp_angle, ys=signal, yerrs=hwpss_sigma_tod, modes=modes)
else:
raise ValueError('Curve-fitting for TOD are specified.' +
'It will take too long time and return meaningless result.' +
'Specify (bin_signal, lin_reg) = (True, True) or (True, False) or (False, True)')
hwpss_stats.wrap('coeffs', coeffs, [(0, 'dets'), (1, 'modes')])
hwpss_stats.wrap('covars', covars, [
(0, 'dets'), (1, 'modes'), (2, 'modes')])
hwpss_stats.wrap('redchi2s', redchi2s, [(0, 'dets')])
if merge_stats:
aman.wrap(hwpss_stats_name, hwpss_stats)
if merge_model:
aman.wrap(hwpss_model_name, fitsig_tod, [(0, 'dets'), (1, 'samps')])
return hwpss_stats
def get_binned_hwpss(aman, signal=None, hwp_angle=None,
bins=360, flags=None):
"""
Bin time-ordered data by the HWP angle and return the binned signal and its standard deviation.
Parameters
----------
aman : TOD
The Axismanager object to be binned.
signal : str, optional
The name of the signal to be binned. Defaults to aman.signal if not specified.
hwp_angle : str, optional
The name of the timestream of hwp_angle. Defaults to aman.hwp_angle if not specified.
bins : int, optional
The number of HWP angle bins to use. Default is 360.
flags : None or RangesMatrix
Flag indicating whether to exclude flagged samples when binning the signal.
Default is no mask applied.
Returns
-------
aman_proc:
The AxisManager object which contains
* center of each bin of hwp_angle
* binned hwp synchrounous signal
* estimated sigma of binned signal
"""
if signal is None:
signal = aman.signal
if hwp_angle is None:
hwp_angle = aman['hwp_angle']
binning_dict = bin_signal(aman, bin_by=hwp_angle, range=[0, 2*np.pi],
bins=bins, signal=signal, flags=flags)
bin_centers = binning_dict['bin_centers']
binned_hwpss = binning_dict['binned_signal']
binned_hwpss_sigma = binning_dict['binned_signal_sigma']
# use median of sigma of each bin as uniform sigma for a detector
hwpss_sigma = np.nanmedian(binned_hwpss_sigma, axis=-1)
return bin_centers, binned_hwpss, hwpss_sigma
def hwpss_linreg(x, ys, yerrs, modes):
"""
Performs a linear regression of the input data ys as a function of x, using a set of sine and cosine
basis functions defined by the input modes. Returns the fitted signal, the coefficients of the
basis functions, their covariance matrix, and the reduced chi-square.
Parameters
-----------
x : numpy.ndarray
The independent variable values of the data points to fit.
ys : numpy.ndarray
The dependent variable values of the data points to fit.
yerrs : numpy.ndarray
The error estimates of the dependent variable values.
modes : list of int
The frequencies of the sine and cosine basis functions to use.
Returns
-------
fitsig : numpy.ndarray
The fitted signal, obtained by evaluating the model with the optimal coefficients.
coeffs : numpy.ndarray
The coefficients of the sine and cosine basis functions that best fit the data.
covars : numpy.ndarray
The covariance matrix of the coefficients, estimated from the data errors.
redchi2s : numpy.ndarray
The reduced chi-square statistic of the fit, computed for each data point.
"""
m = np.isnan(ys[0][:])
xn = np.copy(x)
x = x[~m]
ys = ys[:, ~m]
vects = np.zeros([2*len(modes), x.shape[0]], dtype='float32')
for i, mode in enumerate(modes):
vects[2*i, :] = np.sin(mode*x)
vects[2*i+1, :] = np.cos(mode*x)
I = np.linalg.inv(np.tensordot(vects, vects, (1, 1)))
coeffs = np.matmul(ys, vects.T)
coeffs = np.dot(I, coeffs.T).T
fitsig = harms_func(x, modes, coeffs)
# covariance of coefficients
covars = np.zeros((ys.shape[0], 2*len(modes), 2*len(modes)))
for det_idx in range(ys.shape[0]):
covars[det_idx, :, :] = I * yerrs[det_idx]**2
# reduced chi-square
redchi2s = np.sum(
((ys - fitsig)/yerrs[:, np.newaxis])**2, axis=-1) / (x.shape[0] - 2*len(modes))
fitsig = harms_func(xn, modes, coeffs)
return fitsig, coeffs, covars, redchi2s
def harms_func(x, modes, coeffs):
"""
calculates the harmonics function given the input values, modes and coefficients.
Parameters
----------
x (numpy.ndarray): Input values
modes (list): List of modes to be used in the harmonics function
coeffs (numpy.ndarray): Coefficients of the harmonics function
Returns
-------
numpy.ndarray: The calculated harmonics function.
"""
vects = np.zeros([2*len(modes), x.shape[0]], dtype='float32')
for i, mode in enumerate(modes):
vects[2*i, :] = np.sin(mode*x)
vects[2*i+1, :] = np.cos(mode*x)
if coeffs is None:
return vects
else:
harmonics = np.matmul(coeffs, vects)
return harmonics
def guess_hwpss_params(x, ys, modes):
"""
Compute initial guess for the coefficients of a harmonics-based fit to data.
Parameters
----------
x : array-like of shape (nsamps,)
ys : array-like of shape (ndets, nsamps)
modes : array-like of shape (nmodes,)
List of modes to use in the fit.
Returns
-------
Params_init : ndarray of shape (m, 2*p)
Initial guess for the coefficients of a harmonics-based fit to the data.
"""
m = np.isnan(ys[0][:])
x = x[~m]
ys = ys[:, ~m]
vects = harms_func(x, modes, coeffs=None)
for i, mode in enumerate(modes):
vects[2*i, :] = np.sin(mode*x)
vects[2*i+1, :] = np.cos(mode*x)
params_init = 2 * np.matmul(ys, vects.T) / x.shape[0]
return params_init
def wrapper_harms_func(x, modes, *args):
"""
A wrapper function for the harmonics function to be used for fitting data using Scipy's curve-fitting algorithm.
Parameters
----------
x : array-like
The x-values of the data points to be fitted.
modes : array-like
An array of integers representing the modes of the harmonics function.
*args : tuple
A tuple of arguments. The first argument should be an array of coefficients used to calculate the harmonics function.
Returns
-------
y : array-like
An array of the same length as x representing the values of the harmonics function evaluated at x using the given
modes and coefficients.
"""
coeffs = np.array(args[0])
return harms_func(x, modes, coeffs)
def hwpss_curvefit(x, ys, yerrs, modes, params_init=None):
"""
Fit harmonics to input data using scipy's curve_fit method.
Parameters
----------
x : array_like
1-D array of x values.
ys : array_like
2-D array of y values for each detector.
yerrs : array_like
1-D array of the standard deviation of the y values for each detector.
modes : array_like
1-D array of mode numbers to be fitted.
params_init : array_like, optional
2-D array of initial parameter values for each detector. Default is None.
Returns
-------
fitsig : ndarray
2-D array of the fitted values for each detector.
coeffs : ndarray
2-D array of the fitted coefficients for each detector.
covars : ndarray
3-D array of the covariance matrix of the fitted coefficients for each detector.
redchi2s : ndarray
1-D array of the reduced chi-square values for each detector.
Notes
-----
This function fits a set of harmonic functions to the input data using scipy's curve_fit method.
The `modes` parameter specifies the mode numbers to be fitted.
The `params_init` parameter can be used to provide initial guesses for the fit parameters.
"""
N_dets = ys.shape[0]
N_samps = ys.shape[-1]
N_modes = len(modes)
# Handle binned data w/ 0 counts in a bin.
m = np.isnan(ys[0][:])
xn = np.copy(x)
x = x[~m]
ys = ys[:, ~m]
if params_init is None:
params_init = np.zeros((N_dets, 2*N_modes))
coeffs = np.zeros((N_dets, 2*len(modes)))
covars = np.zeros((N_dets, 2*len(modes), 2*len(modes)))
redchi2s = np.zeros(N_dets)
fitsig = np.zeros((N_dets, N_samps))
for det_idx in range(N_dets):
p0 = params_init[det_idx]
coeff, covar = curve_fit(lambda x, *p0: wrapper_harms_func(x, modes, p0),
x, ys[det_idx], p0=p0, sigma=yerrs[det_idx] *
np.ones_like(ys[det_idx]),
absolute_sigma=True)
coeffs[det_idx, :] = coeff
covars[det_idx, :] = covar
yfit = harms_func(x, modes, coeff)
fitsig[det_idx, :] = harms_func(xn, modes, coeff)
redchi2s[det_idx] = np.sum(
((ys[det_idx] - yfit) / yerrs[det_idx])**2) / (x.shape[0] - 2*len(modes))
return fitsig, coeffs, covars, redchi2s
def estimate_sigma_tod(signal, hwp_angle):
"""
Estimate the noise level of a signal in a time-ordered data (TOD) using a half-wave plate (HWP) modulation.
Parameters
----------
signal : ndarray
A 2D numpy array of shape (n_dets, n_samps) containing the TOD of each detector.
hwp_angle : ndarray
A 1D numpy array containing the HWP angles in degrees.
Returns
-------
hwpss_sigma_tod : ndarray
A 1D numpy array containing the estimated noise level for each detector.
Notes
-----
This function computes the mean of the signal in each period of HWP rotation and multiplies it
by the square root of the number of samples in that period. The standard deviation of the
resulting values for all periods is then computed and returned as the estimated sigma of each data point.
"""
hwp_zeros_idxes = np.where(np.abs(np.diff(hwp_angle)) > 5)[0][:] + 1
hwpss_sigma_tod = np.zeros((signal.shape[0], hwp_zeros_idxes.shape[0] - 1))
for i, (init_idx, end_idx) in enumerate(zip(hwp_zeros_idxes[:-1], hwp_zeros_idxes[1:])):
hwpss_sigma_tod[:, i] = np.mean(
signal[:, init_idx:end_idx], axis=-1) * np.sqrt(end_idx - init_idx)
hwpss_sigma_tod = np.std(hwpss_sigma_tod, axis=-1)
return hwpss_sigma_tod
def subtract_hwpss(aman, signal=None, hwpss_template=None,
subtract_name='hwpss_remove'):
"""
Subtract the half-wave plate synchronous signal (HWPSS) template from the
signal in the given axis manager.
Parameters
----------
aman : AxisManager
The axis manager containing the signal to which the HWPSS template will
be applied.
signal : ndarray, optional
The signal to which the HWPSS template will be applied. If `signal` is
None (default), the signal contained in the axis manager will be used.
hwpss_template : ndarray, optional
The HWPSS template to be subtracted from the signal. If `hwpss_template`
is None (default), the HWPSS template stored in the axis manager under
the key 'hwpss_extract' will be used.
subtract_name : str, optional
The name of the output axis manager that will contain the HWPSS-
subtracted signal. Defaults to 'hwpss_remove'.
Returns
-------
None
"""
if signal is None:
signal = aman.signal
if hwpss_template is None:
hwpss_template = aman['hwpss_model']
aman.wrap(subtract_name, np.subtract(
signal, hwpss_template), [(0, 'dets'), (1, 'samps')])
[docs]
def demod_tod(aman, signal_name='signal', demod_mode=4,
bpf_cfg=None, lpf_cfg=None):
"""
Demodulate TOD based on HWP angle
Parameters
----------
aman : AxisManager
The AxisManager object
signal_name : str, optional
Axis name of the demodulated signal in aman. Default is 'signal'.
demod_mode : int, optional
Demodulation mode. Default is 4.
bpf_cfg : dict
Configuration for Band-pass filter applied to the TOD data before demodulation.
If not specified, a 4th-order Butterworth filter of
(demod_mode * HWP speed) +/- 0.95*(HWP speed) is used.
Example) bpf_cfg = {'type': 'butter4', 'center': 8.0, 'width': 3.8}
See filters.get_bpf for details.
lpf_cfg : dict
Configuration for Low-pass filter applied to the demodulated TOD data. If not specified,
a 4th-order Butterworth filter with a cutoff frequency of 0.95*(HWP speed)
is used.
Example) lpf_cfg = {'type': 'butter4', 'cutoff': 1.9}
See filters.get_lpf for details.
Returns
-------
None
The demodulated TOD data is added to the input `aman` container as new signals:
'dsT' for the original signal filtered with `lpf`, 'demodQ' for the demodulated
signal real component filtered with `lpf` and multiplied by 2, and 'demodU' for
the demodulated signal imaginary component filtered with `lpf` and multiplied by 2.
"""
# HWP speed in Hz
speed = (np.sum(np.abs(np.diff(np.unwrap(aman.hwp_angle)))) /
(aman.timestamps[-1] - aman.timestamps[0])) / (2 * np.pi)
if bpf_cfg is None:
bpf_center = demod_mode * speed
bpf_width = speed * 2. * 0.95
bpf_cfg = {'type': 'sine2',
'center': bpf_center,
'width': bpf_width,
'trans_width': 0.1}
bpf = filters.get_bpf(bpf_cfg)
if lpf_cfg is None:
lpf_cutoff = speed * 0.95
lpf_cfg = {'type': 'sine2',
'cutoff': lpf_cutoff,
'trans_width': 0.1}
lpf = filters.get_lpf(lpf_cfg)
phasor = np.exp(demod_mode * 1.j * aman.hwp_angle)
demod = tod_ops.fourier_filter(aman, bpf, detrend=None,
signal_name=signal_name) * phasor
# dsT
aman.wrap_new('dsT', dtype='float32', shape=('dets', 'samps'))
aman.dsT = aman[signal_name]
aman['dsT'] = tod_ops.fourier_filter(
aman, lpf, signal_name='dsT', detrend=None)
# demodQ
aman.wrap_new('demodQ', dtype='float32', shape=('dets', 'samps'))
aman['demodQ'] = demod.real
aman['demodQ'] = tod_ops.fourier_filter(
aman, lpf, signal_name='demodQ', detrend=None) * 2.
# demodU
aman.wrap_new('demodU', dtype='float32', shape=('dets', 'samps'))
aman['demodU'] = demod.imag
aman['demodU'] = tod_ops.fourier_filter(
aman, lpf, signal_name='demodU', detrend=None) * 2.