import numpy as np
import logging
from . import flags
logger = logging.getLogger(__name__)
[docs]
def subscan_polyfilter(aman, degree, signal=None, exclude_turnarounds=False, mask=None, in_place=True):
"""
Apply polynomial filtering to subscan segments in a data array.
This function applies polynomial filtering to subscan segments within signal for each detector.
Subscan segments are defined based on the presence of flags such as 'left_scan' and 'right_scan'. Polynomial filtering
is used to remove low-degree polynomial trends within each subscan segment.
Arguments
---------
aman : AxisManager
degree : int
The degree of the polynomial to be removed.
signal : array-like, optional
The TOD signal to use. If not provided, `aman.signal` will be used.
exclude_turnarounds : bool
Optional. If True, turnarounds are excluded from subscan identification. Default is False.
mask : str or RangesMatrix
Optional. A mask used to select specific data points for filtering. Default is None.
If None, no mask is applied. If the mask is given in str, ``aman.flags['mask']`` is used as mask.
Arbitrary mask can be specified in the style of RangesMatrix.
in_place: bool
Optional. If True, `aman.signal` is overwritten with the processed signal.
Returns
-------
signal : array-like
The processed signal.
"""
if signal is None:
signal = aman.signal
if not(in_place):
signal = signal.copy()
if exclude_turnarounds:
if ("left_scan" not in aman.flags) or ("turnarounds" not in aman.flags):
logger.warning('aman does not have left/right scan or turnarounds flag. `sotodlib.flags.get_turnaround_flags` will be ran with default parameters')
_ = flags.get_turnaround_flags(aman)
valid_scan = np.logical_and(np.logical_or(aman.flags["left_scan"].mask(), aman.flags["right_scan"].mask()),
~aman.flags["turnarounds"].mask())
subscan_indices = _get_subscan_range_index(valid_scan)
else:
if ("left_scan" not in aman.flags):
logger.warning('aman does not have left/right scan. `sotodlib.flags.get_turnaround_flags` will be ran with default parameters')
_ = flags.get_turnaround_flags(aman)
subscan_indices_l = _get_subscan_range_index(aman.flags["left_scan"].mask())
subscan_indices_r = _get_subscan_range_index(aman.flags["right_scan"].mask())
subscan_indices = np.vstack([subscan_indices_l, subscan_indices_r])
subscan_indices= subscan_indices[np.argsort(subscan_indices[:, 0])]
if mask is None:
mask_array = np.zeros(aman.samps.count, dtype=bool)
elif type(mask) is str:
mask_array = aman.flags[mask].mask()
else:
mask_array = mask.mask()
is_matrix = len(mask_array.shape) > 1
t = aman.timestamps - aman.timestamps[0]
for i_det in range(aman.dets.count):
if is_matrix:
each_det_mask = mask_array[i_det]
else:
each_det_mask = mask_array
for start, end in subscan_indices:
if np.count_nonzero(~each_det_mask[start:end+1]) < degree:
# If degree of freedom is lower than zero, just subtract mean
signal[i_det, start:end+1] -= np.mean(signal[i_det, start:end+1])
else:
t_mean = np.mean(t[start:end+1])
pars = np.ma.polyfit(
np.ma.array(t[start:end+1]-t_mean, mask=each_det_mask[start:end+1]),
np.ma.array(signal[i_det,start:end+1], mask=each_det_mask[start:end+1]),
deg=degree)
signal[i_det,start:end+1] -= np.polyval(pars, t[start:end+1]-t_mean)
if in_place:
aman.signal = signal
return signal
def _get_subscan_range_index(scan_flag,_min=0):
"""
Get the indices of subscans in a binary flag array.
This function identifies subscan ranges within a binary flag array, where subscans are defined as consecutive
sequences of 'True' values in the input 'scan_flag' array.
Parameters:
- scan_flag (numpy.ndarray): A 1-dimensional binary flag array indicating the presence of subscans.
- _min (int, optional): The minimum length of a subscan range to consider. Default is 0.
Returns:
- numpy.ndarray: A 2-column array containing the start and end indices of subscan ranges in 'scan_flag' where
the length of each subscan is greater than or equal to '_min'.
"""
ones = np.where(scan_flag)[0]
diff = np.diff(ones)
starts = np.insert(ones[np.where(diff != 1)[0]+1], 0, ones[0])
ends = np.append(ones[np.where(diff != 1)[0] ], ones[-1])
indices = list(zip(starts, ends))
indices = np.array([(start, end) for start, end in indices if (end - start + 1 >= _min)])
return indices