Source code for sotodlib.tod_ops.sub_polyf

import copy
import numpy as np
import logging
from scipy.special import eval_legendre
from sotodlib.tod_ops import flags
logger = logging.getLogger(__name__)

[docs] def subscan_polyfilter(aman, degree, signal_name="signal", exclude_turnarounds=False, mask=None, exclusive=True, method="legendre", in_place=True): """ Apply polynomial filtering to subscan segments in a data array. This function applies polynomial filtering to subscan segments within signal for each detector. Subscan segments are defined based on the presence of flags such as 'left_scan' and 'right_scan'. Polynomial filtering is used to remove low-degree polynomial trends within each subscan segment. Arguments --------- aman : AxisManager degree : int The degree of the polynomial to be removed. signal_name : string, optional The name of TOD signal to use. If not provided, `aman.signal` will be used. exclude_turnarounds : bool Optional. If True, turnarounds are excluded from subscan identification. Default is False. mask : str or RangesMatrix Optional. A mask used to select specific data points for filtering. If None, no mask is applied. If the mask is given in str, ``aman.flags['mask']`` is used as mask. Arbitrary mask can be specified in the style of RangesMatrix. exclusive : Optional. If True, the mask is used to exclude data from fitting. If False, the mask is used to include data for fitting. Default is True. method : str Optioal. Method to model the baseline of TOD. In `legendre` method, baseline model is constructed using orthonormality of Legendre function. In `polyfit` method, numpy.polyfit is used. `legendre` is faster. Default is `legendre`. in_place: bool Optional. If True, `aman.signal` is overwritten with the processed signal. Returns ------- signal : array-like The processed signal. """ if method not in ["polyfit", "legendre"] : raise ValueError("Only polyfit and legendre are acceptable.") if exclude_turnarounds: if ("left_scan" not in aman.flags) or ("turnarounds" not in aman.flags): logger.warning('aman does not have left/right scan or turnarounds flag. `sotodlib.flags.get_turnaround_flags` will be ran with default parameters') _ = flags.get_turnaround_flags(aman,truncate=True) ls_mask = aman.flags["left_scan"].mask() rs_mask = aman.flags["right_scan"].mask() ta_mask = aman.flags["turnarounds"].mask() valid_scan = np.logical_and(np.logical_or(ls_mask if ls_mask.ndim==1 else ls_mask[0], rs_mask if rs_mask.ndim==1 else rs_mask[0], ), ~ta_mask if ta_mask.ndim==1 else ta_mask[0]) subscan_indices = _get_subscan_range_index(valid_scan) else: if ("left_scan" not in aman.flags): logger.warning('aman does not have left/right scan. `sotodlib.flags.get_turnaround_flags` will be ran with default parameters') _ = flags.get_turnaround_flags(aman,truncate=True) ls_mask = aman.flags["left_scan"].mask() rs_mask = aman.flags["right_scan"].mask() ta_mask = aman.flags["turnarounds"].mask() subscan_indices_l = _get_subscan_range_index(ls_mask if ls_mask.ndim==1 else ls_mask[0]) subscan_indices_r = _get_subscan_range_index(rs_mask if rs_mask.ndim==1 else rs_mask[0]) subscan_indices = np.vstack([subscan_indices_l, subscan_indices_r]) subscan_indices= subscan_indices[np.argsort(subscan_indices[:, 0])] if subscan_indices[0][0] != 0: subscan_indices = np.vstack([ [0, subscan_indices[0][0]], subscan_indices]) if subscan_indices[-1][-1] != aman.samps.count: subscan_indices = np.vstack([ subscan_indices, [subscan_indices[-1][-1], aman.samps.count]]) signal = aman[signal_name] if not(in_place): signal = signal.copy() if mask is None: mask_array = np.zeros(aman.samps.count, dtype=bool) elif type(mask) is str: mask_array = aman.flags[mask].mask() else: mask_array = mask.mask() if exclusive is False: mask_array = ~mask_array is_matrix = len(mask_array.shape) > 1 if method == "polyfit": t = aman.timestamps - aman.timestamps[0] for i_det in range(aman.dets.count): if is_matrix: each_det_mask = mask_array[i_det] else: each_det_mask = mask_array for start, end in subscan_indices: if np.count_nonzero(~each_det_mask[start:end]) < degree: # If degree of freedom is lower than zero, just subtract mean logger.warning('polyfit degree is smaller than the number of valid data points') signal[i_det, start:end] -= np.mean(signal[i_det, start:end]) else: t_mean = np.mean(t[start:end]) pars = np.ma.polyfit( np.ma.array(t[start:end]-t_mean, mask=each_det_mask[start:end]), np.ma.array(signal[i_det,start:end], mask=each_det_mask[start:end]), deg=degree) signal[i_det,start:end] -= np.polyval(pars, t[start:end]-t_mean) elif method == "legendre": degree_corr = degree + 1 time = np.copy(aman["timestamps"]) for start, end in subscan_indices: ### Normalization constant of legendre function norm_vector = np.arange(degree_corr) norm_vector = 2./(2.*norm_vector+1) # Get each subscan to be filtered tod_mat = copy.deepcopy(signal[:, start:end]) # Scale time range into [-1,1] x = np.linspace(-1, 1, tod_mat.shape[1]) dx = np.mean(np.diff(x)) sub_time = time[start:end] # Generate legendre functions of each degree and store them in an array arr_legendre = np.array([eval_legendre(deg, x) for deg in range(degree_corr)]) # flag to know if the result is matrix formation flag_matrix = False # Modify TODs if mask is defined if mask is None : pass else : # if mask is matrix like, we should interpolate TOD det by det. if is_matrix : if np.sum((mask_array[:,start:end]).astype(np.int32)) > 0 : msk_indx = mask_array[:,start:end] for idet in range(tod_mat.shape[0]) : n_intep = np.sum((mask_array[idet,start:end]).astype(np.int32)) if n_intep > 0: if n_intep == tod_mat.shape[1] : continue interped = np.interp(np.flatnonzero(msk_indx[idet]),np.flatnonzero(~msk_indx[idet]), tod_mat[idet][~msk_indx[idet]]) tod_mat[idet,msk_indx[idet]] = interped else: # If mask does not affect this range, just go through. pass # If mask is array like, same ranges of each det will be interpolated. else : n_intep = np.sum((mask_array[start:end]).astype(np.int32)) if n_intep > 0 : if n_intep == tod_mat.shape[1] : continue msk_indx = mask_array[start:end] for idet in range(tod_mat.shape[0]) : interped = np.interp(np.flatnonzero(msk_indx),np.flatnonzero(~msk_indx), tod_mat[idet][~msk_indx]) tod_mat[idet,msk_indx] = interped else : pass means = np.mean(tod_mat, axis=1)[:, np.newaxis] tod_mat -= means # Make model to be subtracted coeffs = np.dot(arr_legendre, tod_mat.T) model = np.dot((coeffs/norm_vector[:, np.newaxis]).T,arr_legendre)*dx model += means signal[:,start:end] -= model if in_place: aman[signal_name] = signal return signal
def _get_subscan_range_index(scan_flag, _min=0): """ Get the indices of subscans in a binary flag array. This function identifies subscan ranges within a binary flag array, where subscans are defined as consecutive sequences of 'True' values in the input 'scan_flag' array. Parameters: - scan_flag (numpy.ndarray): A 1-dimensional binary flag array indicating the presence of subscans. - _min (int, optional): The minimum length of a subscan range to consider. Default is 0. Returns: - numpy.ndarray: A 2-column array containing the start and end indices of subscan ranges in 'scan_flag' where the length of each subscan is greater than or equal to '_min'. """ ones = np.where(scan_flag)[0] diff = np.diff(ones) starts = np.insert(ones[np.where(diff != 1)[0]+1], 0, ones[0]) ends = np.append(ones[np.where(diff != 1)[0] ], ones[-1]) indices = list(zip(starts, ends)) indices = np.array([(start, end+1) for start, end in indices if (end - start + 1 >= _min)]) return indices