Source code for sotodlib.hwp.hwp

import numpy as np
from scipy.optimize import curve_fit
from lmfit import Model as LmfitModel
from sotodlib import core, tod_ops
from sotodlib.tod_ops import filters, apodize, fft_ops
import logging

logger = logging.getLogger(__name__)


[docs] def get_hwpss(aman, signal=None, hwp_angle=None, bin_signal=True, bins=360, lin_reg=True, modes=[1, 2, 3, 4, 5, 6, 7, 8], apply_prefilt=True, prefilt_cfg=None, prefilt_detrend='linear', flags=None, apodize_edges=True, apodize_edges_samps=1600, apodize_flags=True, apodize_flags_samps=200, merge_stats=True, hwpss_stats_name='hwpss_stats', merge_model=True, hwpss_model_name='hwpss_model'): r""" Extracts HWP synchronous signal (HWPSS) from a time-ordered data (TOD) using linear regression or curve-fitting. The curve-fitting or linear regression are either run on the full time ordered data vs hwp angle or the time ordered data binned in hwp_angle. If the curve-fitting option is used it must be performed on the binned data. Parameters ---------- aman : AxisManager object The TOD to extract HWPSS from. signal : str or None The field name in the axis manager to use for the TOD signal. If not provided, ``signal`` will be used. hwp_angle : array-like, optional The HWP angle for each sample in `aman`. If not provided, `aman.hwp_angle` will be used. bin_signal : bool, optional Whether to bin the TOD signal into HWP angle bins before extracting HWPSS. Default is `True`. bins : int, optional The number of HWP angle bins to use if `bin_signal` is `True`. Default is 360. lin_reg : bool, optional Whether to use linear regression to extract HWPSS from the binned signal. If `False`, curve-fitting will be used instead. Default is `True`. modes : list of int, optional The HWPSS harmonic modes to extract. Default is [1, 2, 3, 4, 5, 6, 7, 8]. apply_prefilt : bool, optional Whether to apply a high-pass filter to signal before extracting HWPSS. Default is `True`. If run through preprocess and `signal` is not `aman.signal` then default to `False`. prefilt_cfg : dict, optional The configuration of the high-pass filter, in Hz. Only used if `apply_prefilt` is `True`. Default is sine2 filter of with cutoff frequency of 1.0 Hz and trans_width of 1.0 Hz. prefilt_detrend: str or None Method of detrending when you apply prefilter. Default is `linear`. If data is already detrended or you do not want to detrend, set it to `None`. flags : str or RangesMatrix or Ranges, optional Flags to be masked out before extracting HWPSS. If Default is None, and no mask will be applied. If provided by a string, `aman.flags.get(flags)` is used for the flags. merge_stats : bool, optional Whether to add the extracted HWPSS statistics to `aman` as new axes. Default is `True`. hwpss_stats_name : str, optional The name to use for the new field containing the HWPSS statistics if `merge_stats` is `True`. Default is 'hwpss_stats'. merge_extract : bool, optional Whether to add the extracted HWPSS to `aman` as a new signal field. Default is `True`. hwpss_extract_name : str, optional The name to use for the new signal field containing the extracted HWPSS if `merge_extract` is `True`. Default is 'hwpss_extract'. Returns ------- hwpss_stats : AxisManager object The extracted HWPSS and its statistics. The statistics include: - **coeffs** (n_dets x n_modes) : coefficients of the model .. math:: \sum_n \mathrm{coeffs}[2n]\sin{(\mathrm{modes}[n] \chi_{\mathrm{hwp}})} + \mathrm{coeffs}[2n+1]\cos{(\mathrm{modes}[n] \chi_{\mathrm{hwp}})} where the sum on n range(len(modes)). **Note**: n_modes is 2*len(modes) - **covars** (n_dets x n_modes x n_modes) : variance covariance matrix of the fitted coefficients for each detector. - **redchi2** (n_dets) : reduced chi^2 of the fit for each detector. **In the binned case the following are returned:** - **binned_angle** (n_bins) : binned version of hwp_angle in range (0, 2pi] with number of bins set by bins argument. - **bin_counts** (n_dets x n_bins): sample counts of each bin for each detector. - **binned_signal** (n_dets x n_bins) : binned signal for each detector. - **sigma_bin** (n_dets) : average over all bins of the standard deviation of the signal within each bin. **In the non-binned case the following are returned:** - **sigma_tod** (n_dets) : estimate of the standard deviation of the signal using function ``estimate_sigma_tod`` """ if prefilt_cfg is None: prefilt_cfg = {'type': 'sine2', 'cutoff': 1.0, 'trans_width': 1.0} prefilt = filters.get_hpf(prefilt_cfg) if signal is None: #signal_name variable to be deleted when tod_ops.fourier_filter is updated signal_name = 'signal' signal = aman[signal_name] elif isinstance(signal, str): signal_name = signal signal = aman[signal_name] elif isinstance(signal, np.ndarray): raise TypeError("Currently ndarray not supported, need update to tod_ops.fourier_filter module to remove signal_name argument.") else: raise TypeError("Signal must be None, str, or ndarray") if apply_prefilt: # This requires signal to be a string. signal = np.array(tod_ops.fourier_filter( aman, prefilt, detrend=prefilt_detrend, signal_name=signal_name) ) if hwp_angle is None: hwp_angle = aman.hwp_angle # define hwpss_stats mode_names = [] for mode in modes: mode_names.append(f'S{mode}') mode_names.append(f'C{mode}') hwpss_stats = core.AxisManager(aman.dets, core.LabelAxis( name='modes', vals=np.array(mode_names, dtype='<U3'))) if bin_signal: hwp_angle_bin_centers, bin_counts, binned_hwpss, hwpss_sigma_bin = get_binned_hwpss( aman, signal, hwp_angle=None, bins=bins, flags=flags, apodize_edges=apodize_edges, apodize_edges_samps=apodize_edges_samps, apodize_flags=apodize_flags, apodize_flags_samps=apodize_flags_samps,) # check bin count num_invalid_bins = np.count_nonzero(np.isnan(binned_hwpss[0][:])) if num_invalid_bins > 0: logger.warning(f'There are {num_invalid_bins} bins with zero samples. ' + 'You maybe using simulation data whose hwp speed is perfectly constant, ' + 'or your specification of number of bins is too large.') # wrap hwpss_stats.wrap('binned_angle', hwp_angle_bin_centers, [ (0, core.IndexAxis('bin_hwp_samps', count=bins))]) hwpss_stats.wrap('bin_counts', bin_counts, [ (0, 'dets'), (1, 'bin_hwp_samps')]) hwpss_stats.wrap('binned_signal', binned_hwpss, [ (0, 'dets'), (1, 'bin_hwp_samps')]) hwpss_stats.wrap('sigma_bin', hwpss_sigma_bin, [(0, 'dets')]) if lin_reg: fitsig_binned, coeffs, covars, redchi2s = hwpss_linreg( x=hwp_angle_bin_centers, ys=binned_hwpss, yerrs=hwpss_sigma_bin, modes=modes) else: params_init = guess_hwpss_params( x=hwp_angle_bin_centers, ys=binned_hwpss, modes=modes) fitsig_binned, coeffs, covars, redchi2s = hwpss_curvefit(x=hwp_angle_bin_centers, ys=binned_hwpss, yerrs=hwpss_sigma_bin, modes=modes, params_init=params_init) # tod template fitsig_tod = harms_func(hwp_angle, modes, coeffs) # wrap the optimal values and stats hwpss_stats.wrap('binned_model', fitsig_binned, [(0, 'dets'), (1, 'bin_hwp_samps')]) hwpss_stats.wrap('coeffs', coeffs, [(0, 'dets'), (1, 'modes')]) hwpss_stats.wrap('covars', covars, [ (0, 'dets'), (1, 'modes'), (2, 'modes')]) hwpss_stats.wrap('redchi2s', redchi2s, [(0, 'dets')]) else: if isinstance(flags, str): flags = aman.flags.get(flags) if flags is None: m = np.ones([aman.dets.count, aman.samps.count], dtype=bool) else: m = ~flags.mask() hwpss_sigma_tod = estimate_sigma_tod(signal, hwp_angle) hwpss_stats.wrap('sigma_tod', hwpss_sigma_tod, [(0, 'dets')]) if lin_reg: fitsig_tod, coeffs, covars, redchi2s = hwpss_linreg( x=hwp_angle, ys=signal, yerrs=hwpss_sigma_tod, modes=modes) else: raise ValueError('Curve-fitting for TOD are specified.' + 'It will take too long time and return meaningless result.' + 'Specify (bin_signal, lin_reg) = (True, True) or (True, False) or (False, True)') hwpss_stats.wrap('coeffs', coeffs, [(0, 'dets'), (1, 'modes')]) hwpss_stats.wrap('covars', covars, [ (0, 'dets'), (1, 'modes'), (2, 'modes')]) hwpss_stats.wrap('redchi2s', redchi2s, [(0, 'dets')]) if merge_stats: aman.wrap(hwpss_stats_name, hwpss_stats) if merge_model: aman.wrap(hwpss_model_name, fitsig_tod, [(0, 'dets'), (1, 'samps')]) return hwpss_stats
def get_binned_hwpss(aman, signal=None, hwp_angle=None, bins=360, flags=None, apodize_edges=True, apodize_edges_samps=1600, apodize_flags=True, apodize_flags_samps=200): """ Bin time-ordered data by the HWP angle and return the binned signal and its standard deviation. Parameters ---------- aman : TOD The Axismanager object to be binned. signal : str, optional The name of the signal to be binned. Defaults to aman.signal if not specified. hwp_angle : str, optional The name of the timestream of hwp_angle. Defaults to aman.hwp_angle if not specified. bins : int, optional The number of HWP angle bins to use. Default is 360. flags : str or RangesMatrix or Ranges. optional Flag indicating whether to exclude flagged samples when binning the signal. If provided by a string, `aman.flags.get(flags)` is used for the flags. Default is no mask applied. apodize_edges : bool, optional If True, applies an apodization window to the edges of the signal. Defaults to True. apodize_edges_samps : int, optional The number of samples over which to apply the edge apodization window. Defaults to 1600. apodize_flags : bool, optional If True, applies an apodization window based on the flags. Defaults to True. apodize_flags_samps : int, optional The number of samples over which to apply the flags apodization window. Defaults to 200. Returns ------- aman_proc: The AxisManager object which contains * center of each bin of hwp_angle * binned hwp synchrounous signal * estimated sigma of binned signal """ if signal is None: signal = aman.signal if hwp_angle is None: hwp_angle = aman['hwp_angle'] if apodize_edges: weight_for_signal = apodize.get_apodize_window_for_ends(aman, apodize_samps=apodize_edges_samps) if isinstance(flags, str): flags = aman.flags.get(flags) if (flags is not None) and apodize_flags: flags_mask = flags.mask() if flags_mask.ndim == 1: flag_is_1d = True else: all_columns_same = np.all(np.all(flags_mask == flags_mask[0, :], axis=0)) if all_columns_same: flag_is_1d = True flags_mask = flags_mask[0] else: flag_is_1d = False if flag_is_1d: weight_for_signal = weight_for_signal * apodize.get_apodize_window_from_flags(aman, flags=flags, apodize_samps=apodize_flags_samps) else: weight_for_signal = weight_for_signal[np.newaxis, :] * apodize.get_apodize_window_from_flags(aman, flags=flags, apodize_samps=apodize_flags_samps) else: if (flags is not None) and apodize_flags: weight_for_signal = apodize.get_apodize_window_from_flags(aman, flags=flags, apodize_samps=apodize_flags_samps) else: weight_for_signal = None binning_dict = tod_ops.bin_signal(aman, bin_by=hwp_angle, range=[0, 2*np.pi], bins=bins, signal=signal, flags=flags, weight_for_signal=weight_for_signal) bin_centers = binning_dict['bin_centers'] bin_counts = binning_dict['bin_counts'] binned_hwpss = binning_dict['binned_signal'] binned_hwpss_sigma = binning_dict['binned_signal_sigma'] # use median of sigma of each bin as uniform sigma for a detector hwpss_sigma = np.nanmedian(binned_hwpss_sigma, axis=-1) return bin_centers, bin_counts, binned_hwpss, hwpss_sigma def hwpss_linreg(x, ys, yerrs, modes): """ Performs a linear regression of the input data ys as a function of x, using a set of sine and cosine basis functions defined by the input modes. Returns the fitted signal, the coefficients of the basis functions, their covariance matrix, and the reduced chi-square. Parameters ----------- x : numpy.ndarray The independent variable values of the data points to fit. ys : numpy.ndarray The dependent variable values of the data points to fit. yerrs : numpy.ndarray The error estimates of the dependent variable values. modes : list of int The frequencies of the sine and cosine basis functions to use. Returns ------- fitsig : numpy.ndarray The fitted signal, obtained by evaluating the model with the optimal coefficients. coeffs : numpy.ndarray The coefficients of the sine and cosine basis functions that best fit the data. covars : numpy.ndarray The covariance matrix of the coefficients, estimated from the data errors. redchi2s : numpy.ndarray The reduced chi-square statistic of the fit, computed for each data point. """ m = np.isnan(ys[0][:]) xn = np.copy(x) x = x[~m] ys = ys[:, ~m] vects = np.zeros([2*len(modes), x.shape[0]], dtype='float32') for i, mode in enumerate(modes): vects[2*i, :] = np.sin(mode*x) vects[2*i+1, :] = np.cos(mode*x) I = np.linalg.inv(np.tensordot(vects, vects, (1, 1))) coeffs = np.matmul(ys, vects.T) coeffs = np.dot(I, coeffs.T).T fitsig = harms_func(x, modes, coeffs) # covariance of coefficients covars = np.zeros((ys.shape[0], 2*len(modes), 2*len(modes))) for det_idx in range(ys.shape[0]): covars[det_idx, :, :] = I * yerrs[det_idx]**2 # reduced chi-square redchi2s = np.sum( ((ys - fitsig)/yerrs[:, np.newaxis])**2, axis=-1) / (x.shape[0] - 2*len(modes)) fitsig = harms_func(xn, modes, coeffs) return fitsig, coeffs, covars, redchi2s def harms_func(x, modes, coeffs): """ calculates the harmonics function given the input values, modes and coefficients. Parameters ---------- x (numpy.ndarray): Input values modes (list): List of modes to be used in the harmonics function coeffs (numpy.ndarray): Coefficients of the harmonics function Returns ------- numpy.ndarray: The calculated harmonics function. """ vects = np.zeros([2*len(modes), x.shape[0]], dtype=coeffs.dtype if coeffs is not None else 'float32') for i, mode in enumerate(modes): vects[2*i, :] = np.sin(mode*x) vects[2*i+1, :] = np.cos(mode*x) if coeffs is None: return vects else: harmonics = np.matmul(coeffs, vects, casting='no') return harmonics def guess_hwpss_params(x, ys, modes): """ Compute initial guess for the coefficients of a harmonics-based fit to data. Parameters ---------- x : array-like of shape (nsamps,) ys : array-like of shape (ndets, nsamps) modes : array-like of shape (nmodes,) List of modes to use in the fit. Returns ------- Params_init : ndarray of shape (m, 2*p) Initial guess for the coefficients of a harmonics-based fit to the data. """ m = np.isnan(ys[0][:]) x = x[~m] ys = ys[:, ~m] vects = harms_func(x, modes, coeffs=None) for i, mode in enumerate(modes): vects[2*i, :] = np.sin(mode*x) vects[2*i+1, :] = np.cos(mode*x) params_init = 2 * np.matmul(ys, vects.T) / x.shape[0] return params_init def wrapper_harms_func(x, modes, *args): """ A wrapper function for the harmonics function to be used for fitting data using Scipy's curve-fitting algorithm. Parameters ---------- x : array-like The x-values of the data points to be fitted. modes : array-like An array of integers representing the modes of the harmonics function. *args : tuple A tuple of arguments. The first argument should be an array of coefficients used to calculate the harmonics function. Returns ------- y : array-like An array of the same length as x representing the values of the harmonics function evaluated at x using the given modes and coefficients. """ coeffs = np.array(args[0]) return harms_func(x, modes, coeffs) def hwpss_curvefit(x, ys, yerrs, modes, params_init=None): """ Fit harmonics to input data using scipy's curve_fit method. Parameters ---------- x : array_like 1-D array of x values. ys : array_like 2-D array of y values for each detector. yerrs : array_like 1-D array of the standard deviation of the y values for each detector. modes : array_like 1-D array of mode numbers to be fitted. params_init : array_like, optional 2-D array of initial parameter values for each detector. Default is None. Returns ------- fitsig : ndarray 2-D array of the fitted values for each detector. coeffs : ndarray 2-D array of the fitted coefficients for each detector. covars : ndarray 3-D array of the covariance matrix of the fitted coefficients for each detector. redchi2s : ndarray 1-D array of the reduced chi-square values for each detector. Notes ----- This function fits a set of harmonic functions to the input data using scipy's curve_fit method. The `modes` parameter specifies the mode numbers to be fitted. The `params_init` parameter can be used to provide initial guesses for the fit parameters. """ N_dets = ys.shape[0] N_samps = ys.shape[-1] N_modes = len(modes) # Handle binned data w/ 0 counts in a bin. m = np.isnan(ys[0][:]) xn = np.copy(x) x = x[~m] ys = ys[:, ~m] if params_init is None: params_init = np.zeros((N_dets, 2*N_modes)) coeffs = np.zeros((N_dets, 2*len(modes))) covars = np.zeros((N_dets, 2*len(modes), 2*len(modes))) redchi2s = np.zeros(N_dets) fitsig = np.zeros((N_dets, N_samps)) for det_idx in range(N_dets): p0 = params_init[det_idx] coeff, covar = curve_fit(lambda x, *p0: wrapper_harms_func(x, modes, p0), x, ys[det_idx], p0=p0, sigma=yerrs[det_idx] * np.ones_like(ys[det_idx]), absolute_sigma=True) coeffs[det_idx, :] = coeff covars[det_idx, :] = covar yfit = harms_func(x, modes, coeff) fitsig[det_idx, :] = harms_func(xn, modes, coeff) redchi2s[det_idx] = np.sum( ((ys[det_idx] - yfit) / yerrs[det_idx])**2) / (x.shape[0] - 2*len(modes)) return fitsig, coeffs, covars, redchi2s def estimate_sigma_tod(signal, hwp_angle): """ Estimate the noise level of a signal in a time-ordered data (TOD) using a half-wave plate (HWP) modulation. Parameters ---------- signal : ndarray A 2D numpy array of shape (n_dets, n_samps) containing the TOD of each detector. hwp_angle : ndarray A 1D numpy array containing the HWP angles in degrees. Returns ------- hwpss_sigma_tod : ndarray A 1D numpy array containing the estimated noise level for each detector. Notes ----- This function computes the mean of the signal in each period of HWP rotation and multiplies it by the square root of the number of samples in that period. The standard deviation of the resulting values for all periods is then computed and returned as the estimated sigma of each data point. """ hwp_zeros_idxes = np.where(np.abs(np.diff(hwp_angle)) > 5)[0][:] + 1 hwpss_sigma_tod = np.zeros((signal.shape[0], hwp_zeros_idxes.shape[0] - 1)) for i, (init_idx, end_idx) in enumerate(zip(hwp_zeros_idxes[:-1], hwp_zeros_idxes[1:])): hwpss_sigma_tod[:, i] = np.mean( signal[:, init_idx:end_idx], axis=-1) * np.sqrt(end_idx - init_idx) hwpss_sigma_tod = np.std(hwpss_sigma_tod, axis=-1) return hwpss_sigma_tod def subtract_hwpss(aman, signal='signal', hwpss_template_name='hwpss_model', subtract_name='hwpss_remove', in_place=False, remove_template=True): """ Subtract the half-wave plate synchronous signal (HWPSS) template from the signal in the given axis manager. Parameters ---------- aman : AxisManager The axis manager containing the signal and the HWPSS template. signal : str, optional The name of the field in the axis manager containing the signal to be processed. Defaults to 'signal'. hwpss_template_name : str, optional The name of the field in the axis manager containing the HWPSS template. Defaults to 'hwpss_model'. subtract_name : str, optional The name of the field in the axis manager that will store the HWPSS-subtracted signal. Only used if in_place is False. Defaults to 'hwpss_remove'. in_place : bool, optional If True, the subtraction is done in place, modifying the original signal in the axis manager. If False, the result is stored in a new field specified by subtract_name. Defaults to False. remove_template : bool, optional If True, the HWPSS template field is removed from the axis manager after subtraction. Defaults to True. Returns ------- None """ if signal is None: signal_name = 'signal' signal = aman[signal_name] elif isinstance(signal, str): signal_name = signal signal = aman[signal_name] elif isinstance(signal, np.ndarray): if np.shape(signal) != (aman.dets.count, aman.samps.count): raise ValueError("When passing signal as ndarray shape must match (n_dets x n_samps).") signal_name = None else: raise TypeError("Signal must be None, str, or ndarray") if in_place: if signal_name is None: signal -= aman[hwpss_template_name].astype(signal.dtype) else: aman[signal_name] -= aman[hwpss_template_name].astype(aman[signal_name].dtype) else: if subtract_name in aman._fields: aman[subtract_name] = np.subtract(signal, aman[hwpss_template_name], dtype='float32') else: aman.wrap(subtract_name, np.subtract(signal, aman[hwpss_template_name], dtype='float32'), [(0, 'dets'), (1, 'samps')]) if remove_template: aman.move(hwpss_template_name, None) def get_hwp_freq(timestamps, hwp_angle): """ Calculate the frequency of HWP rotation. Parameters: timestamps (array-like): An array of timestamps. hwp_angle (array-like): An array of HWP angles in radian Returns: float: The frequency of the HWP rotation in Hz. """ hwp_freq = (np.sum(np.abs(np.diff(np.unwrap(hwp_angle)))) / (timestamps[-1] - timestamps[0])) / (2 * np.pi) return hwp_freq
[docs] def demod_tod(aman, signal=None, demod_mode=4, bpf_cfg=None, lpf_cfg=None, wrap=True): """ Demodulate TOD based on HWP angle Parameters ---------- aman : AxisManager The AxisManager object signal : str, optional Axis name of the signal to demodulate in aman. Default is 'signal'. demod_mode : int, optional Demodulation mode. Default is 4 (i.e. 4th harmonic of HWP). bpf_cfg : dict Configuration for Band-pass filter applied to the TOD data before demodulation. If not specified, a sine-squared bandwidth filter of (demod_mode * HWP speed) +/- 0.95*(HWP speed) is used with transition width 0.1. Example) bpf_cfg = {'type': 'sine2', 'center': 8.0, 'width': 3.8, 'trans_width': 0.1} or bpf_cfg = {'type': 'sine2', 'center': '4*f_HWP', 'width': '1.9*f_HWP', 'trans_width': 0.1} See filters.get_bpf for details. lpf_cfg : dict Configuration for Low-pass filter applied to the demodulated TOD data. If not specified, a sine-squared filter with a cutoff frequency of 0.95*(HWP speed) and transition width 0.1 is used. Example) lpf_cfg = {'type': 'sine2', 'cutoff': 1.9, 'trans_width': 0.1} or lpf_cfg = {'type': 'sine2', 'cutoff': '0.95*f_HWP', 'trans_width': 0.1} See filters.get_lpf for details. wrap : bool, optional If True, the demodulated signal is wrapped and stored in the input aman container. If False, the demodulated signal is returned. Returns ------- None The demodulated TOD data is added to the input `aman` container as new signals: 'dsT' for the original signal filtered with `lpf`, 'demodQ' for the demodulated signal real component filtered with `lpf` and multiplied by 2, and 'demodU' for the demodulated signal imaginary component filtered with `lpf` and multiplied by 2. """ if signal is None: #signal_name variable to be deleted when tod_ops.fourier_filter is updated signal_name = 'signal' signal = aman[signal_name] elif isinstance(signal, str): signal_name = signal signal = aman[signal_name] elif isinstance(signal, np.ndarray): raise TypeError("Currently ndarray not supported, need update to tod_ops.fourier_filter module to remove signal_name argument.") else: raise TypeError("Signal must be None, str, or ndarray") # HWP speed in Hz speed = get_hwp_freq(timestamps=aman.timestamps, hwp_angle=aman.hwp_angle) if bpf_cfg is None: bpf_center = demod_mode * speed bpf_width = speed * 2. * 0.95 bpf_cfg = {'type': 'sine2', 'center': bpf_center, 'width': bpf_width, 'trans_width': 0.1} for k, v in bpf_cfg.items(): if isinstance(v, str): if k in ['center', 'width']: bpf_cfg[k] = speed*float(v.split('*')[0]) bpf = filters.get_bpf(bpf_cfg) if lpf_cfg is None: lpf_cutoff = speed * 0.95 lpf_cfg = {'type': 'sine2', 'cutoff': lpf_cutoff, 'trans_width': 0.1} for k, v in lpf_cfg.items(): if isinstance(v, str): if k == 'cutoff': lpf_cfg[k] = speed*float(v.split('*')[0]) lpf = filters.get_lpf(lpf_cfg) # Create a RFFT object to reuse n = fft_ops.find_superior_integer(aman.samps.count) rfft = fft_ops.RFFTObj.for_shape(aman.dets.count, n, 'BOTH') phasor = np.empty_like(aman.hwp_angle, dtype=np.promote_types(signal.dtype, np.complex64)) np.exp((demod_mode * 1j * aman.hwp_angle), out=phasor) demod = tod_ops.fourier_filter(aman, bpf, detrend=None, signal_name=signal_name, rfft=rfft) * 2. * phasor # Filter the demodulated signal demod_aman = core.AxisManager(aman.dets, aman.samps) demod_aman.wrap("timestamps", aman.timestamps, axis_map=[(0, 'samps')]) demod_aman.wrap("dsT", aman[signal_name].copy(), axis_map=[(0, 'dets'), (1, 'samps')]) demod_aman["dsT"][:] = tod_ops.fourier_filter(demod_aman, lpf, signal_name='dsT', detrend=None, rfft=rfft) demod_aman.wrap("demodQ", demod.real, axis_map=[(0, 'dets'), (1, 'samps')]) demod_aman["demodQ"][:] = tod_ops.fourier_filter(demod_aman, lpf, signal_name="demodQ", detrend=None, rfft=rfft) demod_aman.wrap("demodU", demod.imag, axis_map=[(0, 'dets'), (1, 'samps')]) demod_aman["demodU"][:] = tod_ops.fourier_filter(demod_aman, lpf, signal_name="demodU", detrend=None, rfft=rfft) # Destroy the RFFT object del rfft # Either wrap or return the demodulated signal if wrap: for fld in ['dsT', 'demodQ', 'demodU']: aman.wrap(fld, demod_aman[fld], axis_map=[(0, 'dets'), (1, 'samps')]) del demod_aman else: return demod_aman['dsT'], demod_aman['demodQ'], demod_aman['demodU']
def tau_model(fhwp, tau, AQ, AU, mode): return (AQ + 1j*AU) / (1 - 1j* mode * 2 * np.pi * tau * fhwp ) def get_tau_hwp( aman, signal='signal', lpf_cfg=None, bpf_cfg=None, width=1000, apodize_samps=2000, trim_samps=2000, min_fhwp=1., max_fhwp=2., demod_mode=4, wn=None, flags=None, full_output=False, merge=False, name='tau_hwp' ): """ Analyze observation with hwp spinning up or spinning down and compute the timeconstant of detectors. This demoulate tod and estimate timeconstant from hwp rotation speed depdndence of half-wave plate synchronous signal. Parameters ---------- aman : AxisManager AxisManager object containing the TOD data. signal: std optional Name of signal to process. Default is `signal`. lpf_cfg: dict optional Configuration for Low-pass filter applied before demodulation. bpf_cfg: dict optional Configuration for Band-pass filter applied before demodulation. width: int optional width of single section of TOD. apodize_samps: int optional Number of samples on tod ends to apodize. trim_samps: int optional Number of samples on tod ends to trim. min_fhwp: float optional Mininum rotation frequency of hwp to be used for calculation. max_fhwp: float optional Maximum rotation frequency of hwp to be used for calculation. demod_mode: int, optional Demodulation mode. Default is 4. flags: str optional Name of flags in `aman.flags` to use for fitting. wn: str or None Precomputed white noise level of signal to be used for weights of fitting. If none, the fitting weights will be 1. full_output: bool optional Whether to output all the statistics used for fitting. merge: bool optional Whether to merge calculated statistics to `aman`. Default is False. name: str optional Name under which to wrap the calculated statistics. Default is 'tau_hwp'. Returns ------- result : AxisManager An AxisManager containing time constants, their errors, and reduced chi-squared statistics. """ if lpf_cfg is None: lpf_cfg = {'type': 'sine2', 'cutoff': min_fhwp * 0.95, 'trans_width': 0.1} if bpf_cfg is None: # no band pass filter bpf_cfg = {'type': 'sine2', 'center': 0, 'width': 200, 'trans_width': 0.1} hwp_rate = np.gradient(np.unwrap(aman.hwp_angle) * 200 / 2 / np.pi) if 'hwp_rate' not in aman: aman.wrap('hwp_rate', hwp_rate, [(0, 'samps')]) if wn is None: # Calculate white noise from section with low hwp speed. idx = np.where(abs(hwp_rate) < .5)[0] s = aman.samps.offset + idx[0] e = aman.samps.offset + idx[-1] aman_short = aman.restrict('samps', (s, e), in_place=False) freqs, Pxx, nseg = fft_ops.calc_psd(aman_short, signal=aman_short.get(signal), noverlap=0, full_output=True) wn = fft_ops.calc_wn(aman_short, pxx=Pxx, freqs=freqs, nseg=nseg) tod_ops.detrend_tod(aman, signal_name=signal, method="median") tod_ops.apodize.apodize_cosine(aman, signal_name=signal, apodize_samps=apodize_samps) demod_tod(aman, signal=signal, demod_mode=demod_mode, lpf_cfg=lpf_cfg, bpf_cfg=bpf_cfg, wrap=True) idx = np.where((min_fhwp < abs(hwp_rate)) & (abs(hwp_rate) < max_fhwp))[0] s = idx[0] if trim_samps < idx[0] else trim_samps e = idx[-1] if trim_samps < aman.samps.count - idx[-1] \ else aman.samps.count - trim_samps s += aman.samps.offset e += aman.samps.offset aman.restrict('samps', (s, e), in_place=True) nsections = int(aman.samps.count / width) timestamps = np.zeros(nsections) hwp_rate = np.zeros(nsections) demodQ = np.zeros((nsections, aman.dets.count)) demodU = np.zeros((nsections, aman.dets.count)) weights = np.zeros((nsections, aman.dets.count)) mask = np.ones((nsections, aman.dets.count), dtype=bool) if isinstance(flags, str) and (flags in aman.flags): flags = ~aman.flags[flags].mask() if flags.ndim == 1: for i in range(nsections): msk = np.zeros(aman.samps.count, dtype=bool) msk[i*width:(i+1)*width] = flags[i*width:(i+1)*width] timestamps[i] = np.median(aman.timestamps[msk]) hwp_rate[i] = np.median(aman.hwp_rate[msk]) demodQ[i] = np.median(aman.demodQ[:, msk], axis=1) demodU[i] = np.median(aman.demodU[:, msk], axis=1) count = np.sum(msk) weights[i] = np.sqrt(count/200)/wn mask[i, :] = False if count == 0 else True elif flags.ndim == 2: for i in range(nsections): timestamps[i] = np.median(aman.timestamps[i*width:(i+1)*width]) hwp_rate[i] = np.median(aman.hwp_rate[i*width:(i+1)*width]) for j in range(aman.dets.count): msk = np.zeros(aman.samps.count, dtype=bool) msk[i*width:(i+1)*width] = flags[j, i*width:(i+1)*width] demodQ[i, j] = np.median(aman.demodQ[j, msk]) demodU[i, j] = np.median(aman.demodU[j, msk]) count = np.sum(msk) weights[i, j] = np.sqrt(count/200)/wn[j] mask[i, j] = False if count == 0 else True else: for i in range(nsections): timestamps[i] = np.median(aman.timestamps[i*width:(i+1)*width]) hwp_rate[i] = np.median(aman.hwp_rate[i*width:(i+1)*width]) demodQ[i] = np.median(aman.demodQ[:, i*width:(i+1)*width], axis=1) demodU[i] = np.median(aman.demodU[:, i*width:(i+1)*width], axis=1) weights[i] = np.sqrt(width/200)/wn logger.debug('Fit time constant') AQ = np.full(aman.dets.count, np.nan) AQ_error = np.full(aman.dets.count, np.nan) AU = np.full(aman.dets.count, np.nan) AU_error = np.full(aman.dets.count, np.nan) tau = np.full(aman.dets.count, np.nan) tau_error = np.full(aman.dets.count, np.nan) redchi2s = np.full(aman.dets.count, np.nan) for i in range(aman.dets.count): try: model = LmfitModel(tau_model, independent_vars=['fhwp']) model.set_param_hint('mode', vary=False) params = model.make_params( tau=1e-3, AQ=np.median(demodQ[:, i][mask[:, i]]), AU=np.median(demodU[:, i][mask[:, i]]), mode=demod_mode, ) fit = model.fit( data=demodQ[:, i][mask[:, i]] + 1j * demodU[:, i][mask[:, i]], params=params, fhwp=hwp_rate[mask[:, i]], weights=weights[:, i][mask[:, i]] ) AQ[i] = fit.params['AQ'].value AQ_error[i] = fit.params['AQ'].stderr AU[i] = fit.params['AU'].value AU_error[i] = fit.params['AU'].stderr tau[i] = fit.params['tau'].value tau_error[i] = fit.params['tau'].stderr redchi2s[i] = fit.redchi except Exception: logger.debug(f'Failed to fit {aman.dets.vals[i]}') if full_output: sections = core.OffsetAxis('sections', count=nsections, offset=0) result = core.AxisManager(aman.dets, sections) result.wrap('timestamps', timestamps, axis_map=[(0, 'sections')]) result.wrap('hwp_rate', hwp_rate, axis_map=[(0, 'sections')]) result.wrap('demodQ', np.transpose(demodQ), axis_map=[(0, 'dets'), (1, 'sections')]) result.wrap('demodU', np.transpose(demodU), axis_map=[(0, 'dets'), (1, 'sections')]) result.wrap('weights', np.transpose(weights), axis_map=[(0, 'dets'), (1, 'sections')]) result.wrap('mask', np.transpose(mask), axis_map=[(0, 'dets'), (1, 'sections')]) else: result = core.AxisManager(aman.dets) result.wrap('AQ', AQ, axis_map=[(0, 'dets')]) result.wrap('AQ_error', AQ_error, axis_map=[(0, 'dets')]) result.wrap('AU', AU, axis_map=[(0, 'dets')]) result.wrap('AU_error', AU_error, axis_map=[(0, 'dets')]) result.wrap('tau_hwp', tau, axis_map=[(0, 'dets')]) result.wrap('tau_hwp_error', tau_error, axis_map=[(0, 'dets')]) result.wrap('redchi2s', redchi2s, axis_map=[(0, 'dets')]) if merge: aman.wrap(name, result) return result