#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import numpy as np
import scipy.interpolate
import h5py
from copy import copy
import so3g
from spt3g import core
import logging
import yaml
import datetime
import sotodlib
import traceback
logger = logging.getLogger(__name__)
[docs]
class G3tHWP():
def __init__(self, config_file=None):
"""
Class to manage L2 HK data into HWP angle g3.
Args
-----
config_file: str
path to config yaml file
"""
if config_file is not None:
if os.path.exists(config_file):
self.config_file = config_file
self.configs = yaml.safe_load(open(self.config_file, "r"))
logger.info("Loading config from " + self.config_file)
else:
logger.warning(
"Cannot find config file, use all default values")
self.configs = {}
else:
logger.warning("Cannot find config file, use all default values")
self.configs = {}
self._start = 0
self._end = 0
self._file_list = None
self._start = self.configs.get('start', 0)
self._end = self.configs.get('end', 0)
self._file_list = self.configs.get('file_list', None)
self._data_dir = self.configs.get('data_dir', None)
self._margin = self.configs.get('margin', 10)
# 1st/2nd encoder readout
self._field_instance = self.configs.get('field_instance',
'satp1.hwp-bbb-e1.feeds.HWPEncoder')
self._field_instance_sub = self.configs.get('field_instance_sub',
'satp1.hwp-bbb-e2.feeds.HWPEncoder')
self._field_list = self.configs.get('field_list',
['rising_edge_count', 'irig_time', 'counter',
'counter_index', 'irig_synch_pulse_clock_time',
'irig_synch_pulse_clock_counts', 'quad'])
# Size of pakcets sent from the BBB
# 120 in the latest version, 150 in the previous version
self._pkt_size = self.configs.get('pkt_size', 120)
# IRIG type
# 0: 1Hz IRIG (default), 1: 10Hz IRIG
self._irig_type = self.configs.get('irig_type', 0)
# Number of encoder slits per HWP revolution
self._num_edges = self.configs.get('num_edges', 570 * 2)
# Reference slit edgen width
self._ref_edges = self.configs.get('ref_edges', 2)
# Number of encoder slits per HWP revolution pre-process
self._edges_per_rev = self._num_edges - self._ref_edges
# Reference slit angle
self._delta_angle = 2 * np.pi / self._num_edges
# Search range of reference slot
self._ref_range = self.configs.get('ref_range', 0.1)
# Threshoild for outlier data to calculate nominal slit width
self._slit_width_lim = self.configs.get('slit_width_lim', 0.1)
# The distance from the hwp center to the fine encoder slots (mm)
self._encoder_disk_radius = self.configs.get(
'encoder_disk_radius', 346.25)
# The time period and amount of irig desynchronization
# [ start_time, stop_time, amount of time shift ]
self._irig_desync = self.configs.get('irig_desync', None)
# Output path + filename
self._output = self.configs.get('output', None)
# encoder suffixes
self._suffixes = ['_1', '_2']
[docs]
def load_data(self, start=None, end=None,
data_dir=None, instance=None):
"""
Loads house keeping data for a given time range and
returns HWP parameters in L2 HK .g3 file
Args
-----
start : timestamp or datetime
start time for data, assumed to be in UTC unless specified
end : timestamp or datetime
end time for data, assumed to be in UTC unless specified
data_dir : str or None
path to HK g3 file, overwrite config file
instance : str or None
instance of field list, overwrite config file
ex.) lab data 'observatory.HBA.feeds.HWPEncoder'
ex.) site data 'satp1.hwp-bbb-e1.feeds.HWPEncoder'
ex.) site data 'satp3.hwp-bbb-a1.feeds.HWPEncoder'
Returns
----
dict
{alias[i] : (time[i], data[i])}
"""
if start is not None and end is not None:
self._start = start
self._end = end
if self._start is None:
logger.error("Cannot find time range")
return {}
if isinstance(start, datetime.datetime):
if start.tzinfo is None:
logger.warning(
'No tzinfo info in start argument, set to utc timezone')
start = start.replace(tzinfo=datetime.timezone.utc)
self._start = start.timestamp()
if isinstance(end, datetime.datetime):
if end.tzinfo is None:
logger.warning(
'No tzinfo info in end argument, set to utc timezone')
end = start.replace(tzinfo=datetime.timezone.utc)
self._end = end.timestamp()
if data_dir is not None:
self._data_dir = data_dir
if self._data_dir is None:
logger.error("Cannot find data directory")
return {}
if instance is not None:
self._field_instance = instance
# load housekeeping data with hwp keys
logger.info('Loading HK data files ')
logger.info("input time range: " +
str(self._start) + " - " + str(self._end))
fields, alias = self._key_formatting()
data = so3g.hk.load_range(
self._start,
self._end,
fields,
alias,
data_dir=self._data_dir)
if not any(data):
logger.info('HWP is not spinning in time range {' + str(
self._start) + ' - ' + str(self._end) + '}, data is empty')
return data
[docs]
def load_file(self, file_list=None, instance=None):
"""
Loads house keeping data with specified g3 files.
Return HWP parameters from SO HK data.
Args
-----
file_list : str or list or None
path and file name of input level 2 HK g3 file
instance : str or None
instance of field list, overwrite config file
ex.) lab data 'observatory.HBA.feeds.HWPEncoder'
ex.) site data 'satp1.hwp-bbb-e1.feeds.HWPEncoder'
ex.) site data 'satp3.hwp-bbb-a1.feeds.HWPEncoder'
Returns
----
dict
{alias[i] : (time[i], data[i])}
"""
if file_list is None and self._file_list is None:
logger.error('Cannot find input g3 file')
return {}
if file_list is not None:
self._file_list = file_list
if instance is not None:
self._field_instance = instance
# load housekeeping files with hwp keys
scanner = so3g.hk.HKArchiveScanner()
if not (isinstance(self._file_list, list)
or isinstance(self._file_list, np.ndarray)):
self._file_list = [self._file_list]
for f in self._file_list:
if not os.path.exists(f):
logger.error('Cannot find input g3 file')
return {}
scanner.process_file(f)
logger.info("Loading HK data files: {}".format(
' '.join(map(str, self._file_list))))
arc = scanner.finalize()
if not any(arc.get_fields()[0]):
self._start = 0
self._end = 0
return {}
fields, alias = self._key_formatting()
if not np.any([f in arc.get_fields()[0].keys() for f in fields]):
logger.info(
"HWP is not spinning in input g3 files or cannot find field")
return {}
if self._start == 0 and self._end == 0:
self._start = np.min([arc.simple(f)[0][0]
for f in fields if f in arc.get_fields()[0].keys()])
self._end = np.max([arc.simple(f)[0][-1]
for f in fields if f in arc.get_fields()[0].keys()])
data = {a: arc.simple(f) for a, f in zip(
alias, fields) if f in arc.get_fields()[0].keys()}
return data
def _key_formatting(self):
"""
Formatting hwp housekeeping field names and aliases
Return
-----
fields, alias
"""
# 1st encoder readout
fields = [self._field_instance + '_full.' + f if 'counter' in f
else self._field_instance + '.' + f for f in self._field_list]
alias = [a + '_1' for a in self._field_list]
# 2nd encoder readout
if self._field_instance_sub is not None:
fields += [self._field_instance_sub + '_full.' + f if 'counter' in f
else self._field_instance_sub + '.' + f for f in self._field_list]
alias += [a + '_2' for a in self._field_list]
# metadata key
meta_keys = {
'pid_direction': 'hwp-pid.feeds.hwppid.direction',
}
platform = self._field_instance.split('.')[0]
for k, f in meta_keys.items():
alias.append(k)
fields.append(platform + '.' + f)
return fields, alias
def _data_formatting(self, data, suffix):
"""
Formatting encoder data
Args
-----
data : dict
HWP HK data from load_data
suffix: Specify whether to use 1st or 2nd encoder, '_1' or '_2'
'_1' for 1st encoder, '_2' for 2nd encoder
Returns
--------
dict
{'rising_edge_count', 'irig_time', 'counter', 'counter_index', 'quad', 'quad_time'}
"""
keys = ['rising_edge_count', 'irig_time',
'counter', 'counter_index', 'quad', 'quad_time']
out = {k: data[k+suffix][1] if k+suffix in data.keys() else []
for k in keys}
# irig part
if 'irig_time'+suffix not in data.keys():
logger.warning(
'All IRIG time is not correct for encoder' + suffix)
return out
if self._irig_type == 1:
out['irig_time'] = data['irig_synch_pulse_clock_time'+suffix][1]
out['rising_edge_count'] = data['irig_synch_pulse_clock_counts'+suffix][1]
# encoder part
if 'counter'+suffix not in data.keys():
logger.warning(
'No encoder data is available for encoder'+suffix)
return out
if 'quad' + suffix in data:
out['quad'] = data['quad'+suffix][1]
out['quad_time'] = data['quad'+suffix][0]
else:
out['quad'] = []
out['quad_time'] = []
return out
def _slowdata_process(self, fast_time, irig_time, suffix):
""" Diagnose hwp status and output status flags
Returns
--------
dict
{stable, locked, hwp_rate, slow_time}
Notes
------
- Time definition -
if fast_time exists: slow_time = fast_time
elif: irig_time exists but no fast_time, slow_time = irig_time
else: slow_time is per 10 sec array
"""
slow_time = np.arange(self._start, self._end, 10)
if len(irig_time) == 0:
out = {
'locked'+suffix: np.zeros_like(slow_time, dtype=bool),
'stable'+suffix: np.zeros_like(slow_time, dtype=bool),
'hwp_rate'+suffix: np.zeros_like(slow_time, dtype=np.float32),
'slow_time'+suffix: slow_time,
}
return out
if len(fast_time) == 0:
fast_irig_time = irig_time
locked = np.zeros_like(irig_time, dtype=bool)
stable = np.zeros_like(irig_time, dtype=bool)
hwp_rate = np.zeros_like(irig_time, dtype=np.float32)
else:
# hwp speed calc. (approximate using ref)
ref_indexes = self._ref_indexes
if isinstance(self._ref_indexes, tuple):
ref_indexes = self._ref_indexes[0]
hwp_rate_ref = 1 / np.diff(fast_time[ref_indexes])
hwp_rate = [hwp_rate_ref[0] for i in range(ref_indexes[0])]
for n in range(len(np.diff(ref_indexes))):
hwp_rate += [hwp_rate_ref[n]
for r in range(np.diff(ref_indexes)[n])]
hwp_rate += [hwp_rate_ref[-1] for i in range(len(fast_time) -
ref_indexes[-1])]
fast_irig_time = fast_time
locked = np.ones_like(fast_time, dtype=bool)
locked[hwp_rate == 0] = False
stable = np.ones_like(fast_time, dtype=bool)
# irig only status
irig_only_time = irig_time[
(irig_time < fast_time[0]) | (irig_time > fast_time[-1])]
irig_only_locked = np.zeros_like(irig_only_time, dtype=bool)
irig_only_hwp_rate = np.zeros_like(
irig_only_time, dtype=np.float32)
fast_irig_time = np.append(irig_only_time, fast_time)
fast_irig_idx = np.argsort(fast_irig_time)
fast_irig_time = fast_irig_time[fast_irig_idx]
locked = np.append(irig_only_locked, locked)[fast_irig_idx]
hwp_rate = np.append(irig_only_hwp_rate, hwp_rate)[fast_irig_idx]
stable = np.ones_like(fast_irig_time, dtype=bool)
# slow status
slow_time = slow_time[
(slow_time < fast_irig_time[0]) | (slow_time > fast_irig_time[-1])]
slow_locked = np.zeros_like(slow_time, dtype=bool)
slow_stable = np.zeros_like(slow_time, dtype=bool)
slow_hwp_rate = np.zeros_like(slow_time, dtype=np.float32)
slow_time = np.append(slow_time, fast_irig_time)
slow_idx = np.argsort(slow_time)
slow_time = slow_time[slow_idx]
locked = np.append(slow_locked, locked)[slow_idx]
stable = np.append(slow_stable, stable)[slow_idx]
hwp_rate = np.append(slow_hwp_rate, hwp_rate)[slow_idx]
locked[hwp_rate == 0] = False
return {'locked'+suffix: locked, 'stable'+suffix: stable, 'hwp_rate'+suffix: hwp_rate, 'slow_time'+suffix: slow_time}
[docs]
def analyze(self, data, ratio=None, mod2pi=True, fast=True, suffix='_1'):
"""
Analyze HWP angle solution
to be checked by hardware that 0 is CW and 1 is CCW from (sky side) consistently for all SAT
Args
-----
data : dict
HWP HK data from load_data
ratio : float, optional
parameter for referelce slit
threshold = 2 slit distances +/- ratio
mod2pi : bool, optional
If True, return hwp angle % 2pi
fast : bool, optional
If True, run fast fill_ref algorithm
Returns
--------
dict
{fast_time, angle, slow_time, stable, locked, hwp_rate, ref_indexes, filled_flag}
Notes
------
* fast_time: timestamp
* IRIG synched timing (~2kHz)
* angle (float): IRIG synched HWP angle in radian
* slow_time: timestamp
* time list of slow block
* stable: bool
* if non-zero, indicates the HWP spin state is known.
* i.e. it is either spinning at a measurable rate, or stationary.
* When this flag is non-zero, the hwp_rate field can be taken at face value.
* locked: bool
* if non-zero, indicates the HWP is spinning and the position solution is working.
* In this case one should find the hwp_angle populated in the fast data block.
* hwp_rate: float
* the "approximate" HWP spin rate, with sign, in revs / second.
* Use placeholder value of 0 for cases when not "stable".
* ref_indexes: int
* Indexes of of reference slots.
* filled_flag: bool
* Flag indicating the points that are filled due to packet drop.
* num_dropped_packets
* number of dropped encoder packets
* num_dropped_packets_irig
* number of dropped irig packets
* num_glitches
* number of encoder data point glitches, unexpected data points
* num_value_glitches
* number of encoder value glitches, points with value shift due to glitches
* num_glitches_irig
* number of irig data point glitches, unexpected data points
* num_value_glitches_irig
* number of irig value glitches, points with value shift due to glitches
* num_dead_rots
* number rotations that failed to fix glitches
* num_dropped_slits
* number rotations that failed to fix glitches
"""
if not any(data):
logger.info("no HWP field data")
d = self._data_formatting(data, suffix)
# hwp angle calc.
if ratio is not None:
logger.info(f"Overwriting reference slit threshold by {ratio}.")
self._ref_range = ratio
out = {}
logger.info("Start calclulating angle.")
if len(d['irig_time']) == 0:
logger.warning('There is no correct IRIG timing. Stop analyze.')
else:
fast_time, angle = self._hwp_angle_calculator(
d['counter'], d['counter_index'], d['irig_time'],
d['rising_edge_count'], d['quad_time'], d['quad'],
mod2pi, fast)
if len(fast_time) == 0:
logger.warning('analyzed encoder data is None')
out.update(self._slowdata_process(
fast_time, d['irig_time'], suffix))
out['fast_time'+suffix] = fast_time
out['angle'+suffix] = angle
out['quad'+suffix] = self._quad_corrected
out['ref_indexes'+suffix] = self._ref_indexes
# generate flags
filled_flag = np.zeros_like(fast_time, dtype=bool)
for edge in self._edges_dropped_pkts:
filled_flag[(self._encd_clk >= edge[0]) & (self._encd_clk <= edge[1])] = True
out['filled_flag'+suffix] = filled_flag
out['num_dropped_packets'+suffix] = int(self._num_dropped_pkts)
out['num_dropped_packets_irig'+suffix] = int(self._num_dropped_pkts_irig)
out['num_glitches'+suffix] = int(self._num_glitches)
out['num_glitches_irig'+suffix] = int(self._num_glitches_irig)
out['num_value_glitches'+suffix] = int(self._num_value_glitches)
out['num_value_glitches_irig'+suffix] = int(self._num_value_glitches_irig)
out['num_dead_rots'+suffix] = int(self._num_dead_rots)
out['num_dropped_slits'+suffix] = int(self._num_dropped_slits)
return out
def eval_angle(self, solved, poly_order=3, suffix='_1'):
"""
Evaluate the non-uniformity of hwp angle timestamp (template) and subtract it
The raw hwp angle timestamp is kept.
Args
-----
solved: dict
dict data from analyze
poly_order:
order of polynomial filtering for removing drift of hwp speed
for evaluating the non-uniformity of hwp angle.
suffix:
'_1' for 1st encoder, '_2' for 2nd encoder
Returns
--------
output: dict
{fast_time, fast_time_raw, template, template_err, ...}
Notes
------
* template: float array (ratio)
* Averaged non-uniformity of the hwp angle
* normalized by the step of the angle encoder
* template_err: float array
* Error bar of template
non-uniformity of hwp angle comes from following reasons,
- non-uniformity of encoder slits
- sag of rotor
- bad tuning of the comparator threshold of DriverBoard
- degradation of LED
and the non-uniformity can be time-dependent.
Need to evaluate and subtract it before interpolating hwp angle into Smurf timestamps.
The non-uniformity of encoder slots creates additional hwp angle jitter.
The maximum possible additional jitter is comparable to the requirement of angle jitter.
We make an template of encoder slits and subtract it from the timestamp.
"""
if 'fast_time_raw'+suffix in solved.keys():
logger.info(
'Non-uniformity is already subtracted. Calculation is skipped.')
return
def detrend(array, deg):
x = np.linspace(-1, 1, len(array))
p,_ ,_ ,_ ,_ = np.polyfit(x, array, deg=deg, full=True) # supress rank warning
pv = np.polyval(p, x)
return array - pv
logger.info('Remove non-uniformity from hwp angle and overwrite')
ref_indexes = solved['ref_indexes'+suffix]
fast_time = solved['fast_time'+suffix]
# Trim only the timestamps of integer revolutions
ft = fast_time[ref_indexes[0]:ref_indexes[-2]+1]
# remove rotation frequency drift for making a template of encoder slits
ft = detrend(ft, deg=poly_order)
# make template from difference of time
template_slit = np.diff(ft).reshape(len(solved['ref_indexes'+suffix])-2, self._num_edges)
template_err = np.std(template_slit, axis=0)
template_slit = np.average(template_slit, axis=0) # take average of all revolutions
template_slit = np.cumsum(template_slit)
template_slit -= np.average(template_slit) # remove global time ofset
subtract = np.roll(np.tile(template_slit, int(np.ceil(len(fast_time)/self._num_edges))), ref_indexes[0]+1)
subtract = subtract[:len(fast_time)]
# subtract template, keep raw timestamp
solved['fast_time_raw'+suffix] = copy(fast_time)
solved['fast_time'+suffix] = fast_time - subtract
# Normalize template by the width of slit
average_dt_slit = np.average(np.diff(fast_time - subtract))
solved['template'+suffix] = template_slit / average_dt_slit
solved['template_err'+suffix] = template_err / average_dt_slit
def template_subtraction(self, solved, suffix='_1'):
""" Template subtraction taking into account the drift of hwp rotation speed
"""
ref_indexes = solved['ref_indexes'+suffix]
fast_time = solved['fast_time'+suffix]
counter = np.arange(len(fast_time))
spl = scipy.interpolate.CubicSpline(counter[ref_indexes], fast_time[ref_indexes])
dt_smoothed = spl(counter)
dt_derivative = spl.derivative()(counter)
template = np.split(fast_time - dt_smoothed, ref_indexes)[1:-1]
template_err = np.std(template, axis=0)
template = np.average(template, axis=0)
template -= np.average(template) # no global shift
template_model = np.roll(template, ref_indexes[0])
template_model = np.tile(template_model, int(np.ceil(len(fast_time)/self._num_edges)))[:len(fast_time)]
template_model = template_model * dt_derivative / np.average(dt_derivative)
solved['fast_time_raw'+suffix] = copy(fast_time)
solved['fast_time'+suffix] = fast_time - template_model
# Normalize template by the width of slit
average_dt_slit = np.average(np.diff(fast_time - template_model))
solved['template'+suffix] = np.diff(template, append=template[0]) / average_dt_slit
solved['template_t'+suffix] = template / average_dt_slit
solved['template_t_err'+suffix] = template_err / average_dt_slit
return
def eval_offcentering(self, solved):
"""
Evaluate the off-centering of the hwp from the phase difference between two encoders.
Assume that slot pattern subraction is already applied
* Definition of offcentering must be clear.
Args
-----
solved: dict
dict solved from template_subtraction
{fast_time_1, angle_2, fast_time_2, angle_2, ...}
Returns
--------
output: dict
{offcenter_idx1, offcenter_idx2, offcentering, offset_time}
Notes
------
* offcenter_idx1: int
* index of the solved['fast_time_1'] for which offcentering is estimated.
* offcenter_idx2: int
* index of the solved['fast_time_2'] for which offcentering is estimated.
* offcentering: float
* Offcentering (mm) at solved['fast_time(_2)'][offcenter_idx1(2)].
* offset_time: float
* Offset time of the encoder signals induced by the offcentering.
* Offset time is the delayed (advanced) timing of the encoder1 (2) in sec.
"""
logger.info('Remove offcentering effect from hwp angle and overwrite')
# Find the first point where two encoder data are available
# and offcentering is calculable
t1_0 = solved['fast_time_1'][solved['ref_indexes_1']][0]
t2_0 = solved['fast_time_2'][solved['ref_indexes_2']][0]
for i1_start in solved["ref_indexes_1"]:
if solved['fast_time_1'][i1_start] > t2_0:
break
else:
raise ValueError('Cannot correct offcentering because '
'two encoder data do not overlap')
for i2_start in solved["ref_indexes_2"]:
if solved['fast_time_2'][i2_start] > t1_0:
break
else:
raise ValueError('Cannot correct offcentering because '
'two encoder data do not overlap')
t1_start = solved['fast_time_1'][i1_start]
t2_start = solved['fast_time_2'][i2_start]
if t1_start <= t2_start:
if i1_start >= int(self._num_edges/2):
i1_start -= int(self._num_edges/2)
i2_start -= self._num_edges
else:
i2_start -= int(self._num_edges/2)
else:
if i2_start >= int(self._num_edges/2):
i1_start -= self._num_edges
i2_start -= int(self._num_edges/2)
else:
i1_start -= int(self._num_edges/2)
# Calculate offcentering to the end of the shorter encoder data.
if len(solved["fast_time_1"][i1_start:]) > len(solved["fast_time_2"][i2_start:]):
idx_length = len(solved["fast_time_2"][i2_start:])
else:
idx_length = len(solved["fast_time_1"][i1_start:])
offcenter_idx1 = np.arange(
i1_start, i1_start+idx_length-1)
offcenter_idx2 = np.arange(
i2_start, i2_start+idx_length-1)
# Calculate the offset time of the encoders induced by the offcentering.
offset_time = (solved["fast_time_1"][offcenter_idx1] -
solved["fast_time_2"][offcenter_idx2])/2
# Calculate the offcentering (mm).
period = (solved["fast_time_1"][offcenter_idx1+1] -
solved["fast_time_1"][offcenter_idx1])*self._num_edges
# When data is extremely noisy, period gets zero and offcentering becomes nan
period[period == 0] = np.median(period)
offset_angle = 2 * np.pi * offset_time / period
offcentering = np.tan(offset_angle) * self._encoder_disk_radius
solved['offcenter_idx1'] = offcenter_idx1
solved['offcenter_idx2'] = offcenter_idx2
solved['offcentering'] = offcentering
solved['offset_time'] = offset_time
return
def correct_offcentering(self, solved):
"""
Correct the timing of solved['fast_time'] which is delayed (advanced) by the offcentering.
Args
-----
solved: dict
dict solved from template_subtraction
{fast_time_1, angle_1, fast_time_2, angle_2, ...}
offcentering: dict
dict solved from eval_offcentering
{offcenter_idx1, offcenter_idx2, offcentering, offset_time}
Returns
--------
output: dict
{fast_time, angle, fast_time_2, angle_2, ...}
Notes
------
* offcenter_idx1: int
* index of the solved['fast_time_1'] for which offcentering is estimated.
* offcenter_idx2: int
* index of the solved['fast_time_2'] for which offcentering is estimated.
* offcentering: float
* Offcentering (mm) at solved['fast_time_1(2)'][offcenter_idx1(2)].
* offset_time: float
* Offset time of the encoder signals induced by the offcentering.
* Offset time is the delayed (advanced) timing of the encoder1 (2) in sec.
* We should allow to correct the offcentering by external input, since offcentering measurement is not always available.
"""
# Skip the correction when the offcentering estimation doesn't exist.
if 'offcentering' not in solved.keys():
logger.warning(
'Offcentering info does not exist. Offcentering correction is skipped.')
return
offcenter_idx1 = solved['offcenter_idx1']
offcenter_idx2 = solved['offcenter_idx2']
offset_time = solved['offset_time']
solved['fast_time_raw_1'] = solved['fast_time_raw_1'][offcenter_idx1]
solved['fast_time_raw_2'] = solved['fast_time_raw_2'][offcenter_idx2]
solved['fast_time_1'] = solved['fast_time_1'][offcenter_idx1] - offset_time
solved['fast_time_2'] = solved['fast_time_2'][offcenter_idx2] + offset_time
solved['angle_1'] = solved['angle_1'][offcenter_idx1]
solved['angle_2'] = solved['angle_2'][offcenter_idx2]
return
[docs]
def write_solution(self, solved, output=None):
"""
Output HWP angle + flags as SO HK g3 format
Args
-----
solved: dict
dict data from analyze
output: str or None
output path + file name, override config file
Notes
-----------
Output file format
* Provider: 'hwp'
* Fast block
* 'hwp.hwp_angle'
* Slow block
* 'hwp.stable'
* 'hwp.locked'
* 'hwp.hwp_rate'
- fast_time: timestamp
IRIG synched timing (~2kHz)
- angle: float
IRIG synched HWP angle in radian
- slow_time: timestamp
time list of slow block
- stable: bool
if non-zero, indicates the HWP spin state is known.
i.e. it is either spinning at a measurable rate, or stationary.
When this flag is non-zero, the hwp_rate field can be taken at face value.
- locked: bool
if non-zero, indicates the HWP is spinning and the position solution is working.
In this case one should find the hwp_angle populated in the fast data block.
- hwp_rate: float
the "approximate" HWP spin rate, with sign, in revs / second.
Use placeholder value of 0 for cases when not "locked".
"""
if self._output is None and output is None:
logger.warning('Not specified output file')
return
if output is not None:
self._output = output
if len(solved) == 0:
logger.warning('input data is empty, skip writing')
return
if len(solved['fast_time']) == 0:
logger.info('write no rotation data, skip writing')
return
session = so3g.hk.HKSessionHelper(hkagg_version=2)
writer = core.G3Writer(output)
writer.Process(session.session_frame())
prov_id = session.add_provider('hwp')
writer.Process(session.status_frame())
# Divide the full time span into equal intervals
start_time = solved['slow_time'].min()
end_time = solved['slow_time'].max()
if np.any(solved['fast_time']):
start_time = min(start_time, solved['fast_time'].min())
end_time = max(end_time, solved['fast_time'].max())
frame_length = 60 # seconds
while start_time < end_time:
t0, t1 = start_time, start_time + frame_length
# Write a slow frame?
s = (t0 <= solved['slow_time']) * (solved['slow_time'] < t1)
if np.any(s):
frame = session.data_frame(prov_id)
slow_block = core.G3TimesampleMap()
slow_block.times = core.G3VectorTime(
[core.G3Time(_t * core.G3Units.s) for _t in solved['slow_time'][s]])
slow_block['stable'] = core.G3VectorInt(solved['stable'][s])
slow_block['locked'] = core.G3VectorInt(solved['locked'][s])
slow_block['hwp_rate'] = core.G3VectorDouble(
solved['hwp_rate'][s])
frame['block_names'].append('slow')
frame['blocks'].append(slow_block)
writer.Process(frame)
# Write a fast frame?
s = (t0 <= solved['fast_time']) * (solved['fast_time'] < t1)
if np.any(s):
frame = session.data_frame(prov_id)
fast_block = core.G3TimesampleMap()
fast_block.times = core.G3VectorTime(
[core.G3Time(_t * core.G3Units.s) for _t in solved['fast_time'][s]])
fast_block['hwp_angle'] = core.G3VectorDouble(
solved['angle'][s])
frame['block_names'].append('fast')
frame['blocks'].append(fast_block)
writer.Process(frame)
start_time += frame_length
return
def _set_empty_axes(self, aman, suffix=None):
if suffix is None:
aman.wrap_new('hwp_angle', shape=('samps', ), dtype=np.float64)
aman.wrap('primary_encoder', 0)
aman.wrap('version', 0)
aman.wrap('pid_direction', 0)
aman.wrap('offcenter_direction', 0)
aman.wrap_new('offcenter', shape=(2,), dtype=np.float64)
aman.wrap('sotodlib_version', sotodlib.__version__)
else:
aman.wrap_new('hwp_angle_ver1'+suffix,
shape=('samps', ), dtype=np.float64)
aman.wrap_new('hwp_angle_ver2'+suffix,
shape=('samps', ), dtype=np.float64)
aman.wrap_new('hwp_angle_ver3'+suffix,
shape=('samps', ), dtype=np.float64)
aman.wrap_new('quad'+suffix, shape=('samps', ), dtype=int)
aman.wrap('quad_direction'+suffix, 0)
aman.wrap_new('stable'+suffix, shape=('samps', ), dtype=bool)
aman.wrap_new('locked'+suffix, shape=('samps', ), dtype=bool)
aman.wrap_new('hwp_rate'+suffix, shape=('samps', ), dtype=np.float32)
aman.wrap_new('template'+suffix, shape=(self._num_edges, ), dtype=np.float64)
aman.wrap_new('template_t'+suffix, shape=(self._num_edges, ), dtype=np.float64)
aman.wrap_new('template_t_err'+suffix, shape=(self._num_edges, ), dtype=np.float64)
aman.wrap_new('filled_flag'+suffix, shape=('samps', ), dtype=bool)
aman.wrap('num_dropped_packets'+suffix, 0)
aman.wrap('num_dropped_packets_irig'+suffix, 0)
aman.wrap('num_glitches'+suffix, 0)
aman.wrap('num_glitches_irig'+suffix, 0)
aman.wrap('num_value_glitches'+suffix, 0)
aman.wrap('num_value_glitches_irig'+suffix, 0)
aman.wrap('num_dead_rots'+suffix, 0)
aman.wrap('num_dropped_slits'+suffix, 0)
aman.wrap('version'+suffix, 1)
aman.wrap('logger'+suffix, 'Not set')
return aman
def _set_raw_axes(self, aman, data):
""" Set raw encoder data in aman """
for k, v in data.items():
aman.wrap('raw_' + k, np.array(v))
return aman
def _load_raw_axes(self, aman, output, h5_address):
""" Load raw encoder data from h5 """
fileds, alias = self._key_formatting()
data = {}
f = h5py.File(output)
for a in alias:
if 'raw_' + a in f[h5_address].keys():
v = f[h5_address]['raw_' + a][:]
data[a] = v
f.close()
return data
def _angle_interpolation(self, timestamp1, angle, timestamp2):
"""Linearly interpolate the angle to the timestamp of the detector readout.
Fill outside the data range constant by the first and last values of the angle."""
return np.interp(timestamp2, timestamp1, angle, left=angle[0], right=angle[-1])
def _bool_interpolation(self, timestamp1, data, timestamp2, fill, round_option):
"""Linearly interpolate the boolean array. fill values by False outside of the
data range
Args
fill: Outside of data range is filled by this value, 0 or 1
round_option: round option, 'floor' or 'ceil'
"""
interp = np.interp(timestamp2, timestamp1, data, left=fill, right=fill)
if round_option == 'floor':
interp = np.floor(interp)
elif round_option == 'ceil':
interp = np.ceil(interp)
else:
raise ValueError(f'round option {round_option} is not supported.')
return interp.astype(bool)
[docs]
def set_data(self, tod, h5_filename=None):
"""
Output HWP hk data as AxisManager format. The results are stored in HDF5 files.
We save the copy of raw hwp encoder hk data into HDF5 file, to save time for
re-calculating the hwp angle solutions.
Args
----
tod: AxisManager
h5_filename:
If this is not None, try to load raw encoder data from hdf5 file
Notes
-----
Output file format
- Raw output
x is a number different for each data and each observation
- raw_rising_edge_count_1/2: int (2, x)
- raw_irig_time_1/2: float (2, x)
- raw_counter_1/2: float (2, x)
- raw_counter_index_1/2: int (2, x)
- raw_irig_synch_pulse_clock_time_1/2: float (2, x)
- raw_irig_synch_pulse_clock_counts_1/2: int (2, x)
- raw_quad_1/2: bool (2, x)
- raw_pid_direction: bool (2, x)
"""
if len(tod.timestamps) == 0:
logger.warning('Input data is empty.')
return
aman = sotodlib.core.AxisManager(tod.samps)
start = int(tod.timestamps[0])-self._margin
end = int(tod.timestamps[-1])+self._margin
self.data = {}
if h5_filename is not None:
logger.info('Loading raw encoder data from h5')
obs_id = tod.obs_info['obs_id']
self.data = self._load_raw_axes(aman, h5_filename, obs_id)
else:
try:
self.data = self.load_data(start, end)
except Exception as e:
logger.error(
f"Exception '{e}' thrown while loading HWP data. The specified encoder field is missing.")
print(traceback.format_exc())
finally:
self._set_raw_axes(aman, self.data)
return aman
[docs]
def make_solution(self, tod):
"""
Output HWP angle, flags, metadata as AxisManager format
The results are stored in HDF5 files. Since HWP angle solution HDF5 files are large,
we automatically split into the new output files.
Args
----
tod: AxisManager
Notes
-----
Output file format
- Primary output
- timestamp: float (samps,)
SMuRF synched timestamp
- hwp_angle: float (samps,)
The latest version of the SMuRF synched HWP angle in radian.
* ver1: HWP angle calculated from the raw encoder signal.
* ver2: HWP angle after the template subtraction.
* ver3: HWP angle after the template and off-centering subtraction.
The field 'version' indicates which version this hwp_angle is.
- primaty encoder: int
This field indicates which encoder is used for hwp_angle, 1 or 2
- version: int
This field indicates the version of the HWP angle in hwp_angle.
- Supplementary output
Suffix _1/2 indicates the encoder_1 or encoder_2
- version_1/2: int
This field indicates the version of the HWP angle of each encoder.
- hwp_angle_ver1/2/3_1/2: float (samps,)
This field stores the ver1/2/3 angle data.
- stable_1/2: bool (samps,)
If non-zero, indicates the HWP spin state is known.
i.e. it is either spinning at a measurable rate, or stationary.
When this flag is non-zero, the hwp_rate field can be taken at face value.
- locked_1/2: bool (samp,)
If non-zero, indicates the HWP is spinning and the position solution is working.
In this case one should find the hwp_angle populated in the fast data block.
- hwp_rate_1/2: float (samps,)
The "approximate" HWP spin rate, with sign, in revs / second.
Use placeholder value of 0 for cases when not "locked".
- logger_1/2: str
Log message for angle calculation status
'No HWP data', 'HWP data too short',
'Angle calculation failed', 'Angle calculation succeeded'
- filled_flag_1/2: bool (samps,)
Array to indicate the data points that are filled due to packet drop.
- quad_1/2: int (quad,)
0 or 1 or -1. 0 means no data
- template_1/2: float (1140,)
Template of the non uniformity of hwp encoder plate
- template_err_1/2: float (1140,)
Error bar of template
- offcenter: float (2,)
- (average offcenter, std of offcenter) unit is (mm)
- Rotation direction
Rotation direction estimated by several methods.
0 or 1 or -1. 0 means no data.
- quad_direction_1/2: int
Estimation by median encoder quadrature for each encoder
- pid_direction: int
Estimation by median pid controller commanded direction
- offcenter_direction: int
Estimation by the offcentering measured by the time offset between two encoders.
"""
aman = sotodlib.core.AxisManager(tod.samps)
aman.wrap_new('timestamps', ('samps', ))[:] = tod.timestamps
self._set_empty_axes(aman)
if 'pid_direction' in self.data.keys():
pid_direction = np.nanmedian(self.data['pid_direction'][1])*2 - 1
if pid_direction in [1, -1]:
aman['pid_direction'] = pid_direction
else:
aman['pid_direction'] = 0
solved = {}
success = []
# version 0
# No data or angle calculation is failed
for suffix in self._suffixes:
logger.info('Start analyzing encoder'+suffix)
self._set_empty_axes(aman, suffix)
# load data
if not 'counter' + suffix in self.data.keys():
logger.warning('No HWP data in the specified timestamps.')
aman['logger'+suffix] = 'No HWP data'
success.append(False)
continue
try:
solved.update(self.analyze(self.data, mod2pi=False, suffix=suffix))
except Exception as e:
logger.error(
f"Exception '{e}' thrown while calculating HWP angle. Angle calculation failed.")
aman['logger'+suffix] = 'Angle calculation failed'
success.append(False)
print(traceback.format_exc())
continue
if len(solved) == 0 or ('fast_time'+suffix not in solved.keys()) or len(solved['fast_time'+suffix]) == 0:
logger.info(
'No correct rotation data in the specified timestamps.')
aman['logger'+suffix] = 'No HWP data'
success.append(False)
continue
aman['logger'+suffix] = 'Angle calculation succeeded'
success.append(True)
# Correct ambiguity of references when angle solution is succeeded
# and only one of the encoders have ambiguity
ref_ambiguous = ['angle' + suffix in solved.keys() and
isinstance(solved['angle' + suffix], tuple) for suffix in self._suffixes]
if sum(ref_ambiguous) == 2:
logger.error('Both encoders have ambiguity in references. Abort angle calculation')
for suffix in self._suffixes:
aman['logger'+suffix] += ', failed to correct ambiguous references'
success = [False, False]
if sum(ref_ambiguous) == 1 and sum(success) < 2:
logger.error('Cannot correct ambiguity of references. Abort angle calculation')
ambiguous_suffix = np.array(self._suffixes)[ref_ambiguous][0]
aman['logger'+ambiguous_suffix] += ', failed to correct ambiguous references'
success = [False, False]
if sum(ref_ambiguous) == 1 and sum(success) == 2:
ambiguous_suffix = np.array(self._suffixes)[ref_ambiguous][0]
good_suffix = np.array(self._suffixes)[np.logical_not(ref_ambiguous)][0]
good_angle = np.interp(solved['fast_time' + ambiguous_suffix],
solved['fast_time' + good_suffix], solved['angle' + good_suffix])
for i, ambiguous_angle in enumerate(solved['angle' + ambiguous_suffix]):
median_diff = np.mod(np.median(ambiguous_angle - good_angle) - np.pi, 2 * np.pi) - np.pi
logger.debug(median_diff)
if abs(abs(median_diff) - np.pi) < 2 * np.arctan(5 / self._encoder_disk_radius):
solved['angle' + ambiguous_suffix] = solved['angle' + ambiguous_suffix][i]
solved['ref_indexes' + ambiguous_suffix] = solved['ref_indexes' + ambiguous_suffix][i]
logger.warning(f'Corrected ambiguous references of encoder{ambiguous_suffix}')
break
if isinstance(solved['angle' + ambiguous_suffix], tuple):
logger.error('Cannot correct ambiguity of references.')
aman['logger'+ambiguous_suffix] += ', failed to correct ambiguous references'
success = success & np.logical_not(ref_ambiguous)
# version 1
# angle calculation succeeded
for i, suffix in enumerate(self._suffixes):
if not success[i]:
continue
aman['version'+suffix] = 1
aman['stable'+suffix] = self._bool_interpolation(
solved['slow_time'+suffix], solved['stable'+suffix], tod.timestamps, 0, 'floor')
aman['locked'+suffix] = self._bool_interpolation(
solved['slow_time'+suffix], solved['locked'+suffix], tod.timestamps, 0, 'floor')
aman['hwp_rate'+suffix] = np.interp(
tod.timestamps, solved['slow_time'+suffix], solved['hwp_rate'+suffix], left=0, right=0)
quad = scipy.interpolate.interp1d(
solved['fast_time'+suffix], solved['quad'+suffix], kind='linear', fill_value='extrapolate')(tod.timestamps)
aman['quad'+suffix] = np.array([1 if q else -1 for q in quad])
aman['quad_direction'+suffix] = np.nanmedian(aman['quad'+suffix])
filled_flag = np.zeros_like(solved['fast_time'+suffix], dtype=bool)
filled_flag[solved['filled_flag'+suffix]] = 1
aman['filled_flag'+suffix] = self._bool_interpolation(
solved['fast_time'+suffix], filled_flag, tod.timestamps, 1, 'ceil')
aman['hwp_angle_ver1'+suffix] = np.mod(self._angle_interpolation(
solved['fast_time'+suffix], solved['angle'+suffix], tod.timestamps), 2*np.pi)
aman['num_dropped_packets'+suffix] = solved['num_dropped_packets'+suffix]
aman['num_dropped_packets_irig'+suffix] = solved['num_dropped_packets_irig'+suffix]
aman['num_glitches'+suffix] = solved['num_glitches'+suffix]
aman['num_glitches_irig'+suffix] = solved['num_glitches_irig'+suffix]
aman['num_value_glitches'+suffix] = solved['num_value_glitches'+suffix]
aman['num_value_glitches_irig'+suffix] = solved['num_value_glitches_irig'+suffix]
aman['num_dead_rots'+suffix] = solved['num_dead_rots'+suffix]
aman['num_dropped_slits'+suffix] = solved['num_dropped_slits'+suffix]
# version 2
# calculate template subtracted angle
for i, suffix in enumerate(self._suffixes):
if not success[i]:
continue
try:
self.template_subtraction(solved, suffix=suffix)
aman['hwp_angle_ver2'+suffix] = np.mod(self._angle_interpolation(
solved['fast_time'+suffix], solved['angle'+suffix], tod.timestamps), 2*np.pi)
aman['version'+suffix] = 2
aman['template'+suffix] = solved['template'+suffix]
aman['template_t'+suffix] = solved['template_t'+suffix]
aman['template_t_err'+suffix] = solved['template_t_err'+suffix]
except Exception as e:
logger.error(
f"Exception '{e}' thrown while the template subtraction.")
print(traceback.format_exc())
# version 3
# calculate off-centering corrected angle
if (aman.version_1 == 2 and aman.version_2 == 2):
try:
self.eval_offcentering(solved)
self.correct_offcentering(solved)
for suffix in self._suffixes:
aman['hwp_angle_ver3'+suffix] = np.mod(self._angle_interpolation(
solved['fast_time'+suffix], solved['angle'+suffix], tod.timestamps), 2*np.pi)
aman['version'+suffix] = 3
aman['offcenter'] = np.array([np.average(solved['offcentering']), np.std(solved['offcentering'])])
aman.offcenter_direction = np.sign(aman['offcenter'][0])
except Exception as e:
logger.error(
f"Exception '{e}' thrown while the off-centering correction.")
print(traceback.format_exc())
else:
logger.warning(
'Offcentering calculation is only available when two encoders are operating. Skipped.')
# make the hwp angle solution with highest version as hwp_angle
highest_version = int(np.max([aman.version_1, aman.version_2]))
primary_encoder = int(np.argmax([aman.version_1, aman.version_2]) + 1)
logger.info(f'Save hwp_angle_ver{highest_version}_{primary_encoder} as hwp_angle')
aman.hwp_angle = aman[f'hwp_angle_ver{highest_version}_{primary_encoder}']
aman.primary_encoder = primary_encoder
aman.version = highest_version
return aman
def _hwp_angle_calculator(
self,
counter,
counter_idx,
irig_time,
rising_edge,
quad_time,
quad,
mod2pi,
fast):
# counter: BBB counter values for encoder signal edges
self._encd_clk = counter
# counter_index: index numbers for detected edges by BBB
self._encd_cnt = counter_idx
# irig_time: decoded time in second since the unix epoch
self._irig_time = irig_time
# rising_edge_count: BBB clcok count values for the IRIG on-time
# reference marker risinge edge
self._rising_edge = rising_edge
# reference slit indexes
self._ref_indexes = []
# temporary placeholder for full references
self._ref_clk_tmp = None
# quad: quadrature signal to determine rotation direction
self._quad_time = quad_time
self._quad = quad
self._quad_corrected = []
# return arrays
self._time = []
self._angle = []
# metadata of packet drop
self._num_dropped_pkts = 0
self._num_dropped_pkts_irig = 0
self._edges_dropped_pkts = []
self._num_dropped_slits = 0
# glitch statistics
self._num_glitches = 0
self._num_value_glitches = 0
self._num_glitches_irig = 0
self._num_value_glitches_irig = 0
self._num_dead_rots = 0
# if no data, skip analysis
if len(self._encd_clk) < self._num_edges:
return [], []
# check duplication in data
self._duplication_check()
# check IRIG timing quality
self._irig_quality_check()
self._fix_irig_desync()
# check 32 bit internal counter overflow glitch
self._process_counter_overflow_glitch()
# treat counter index reset due to agent reboot
self._process_counter_index_reset()
# check packet drop
self._encoder_packet_sort()
self._fill_dropped_packets()
# reference finding
_status_find_ref = self._find_refs()
if not _status_find_ref:
return [], []
# correct references with glitches
self._correct_bad_refs()
# reference filling
if fast:
self._fill_refs_fast()
else:
self._fill_refs()
# If encoder slit is missing, temporarily "correct" refereces
# to fix glitches. ambiguity of references needs to be corrected
# by comparing two encoders
for i in range(1, 3):
diff_ref = np.median(np.diff(self._ref_indexes[::i + 1])) - self._num_edges
if 0 <= diff_ref < 3:
logger.warning(f'Detected {i} missing slit, '
'ambiguous references needs to be corrected')
self._ref_clk_tmp = self._ref_clk
self._num_dropped_slits = i
refs = self._find_true_refs()
self._ref_indexes = refs[0]
break
# glitch removal
self._fix_datapoint_glitches()
if len(self._encd_clk) < self._num_edges:
return [], []
self._fix_value_glitches()
# assign IRIG synched timestamp
self._time = scipy.interpolate.interp1d(
self._rising_edge,
self._irig_time,
kind='linear',
fill_value='extrapolate')(self._encd_clk)
self._quad_corrected = self._quad_form(
scipy.interpolate.interp1d(
self._quad_time,
self._quad_form(self._quad),
kind='linear',
fill_value='extrapolate')(self._time))
# If references are ambiguous, list up all possible patterns of
# reference indexes and angles
if self._num_dropped_slits > 0:
_angle = []
_ref_indexes = []
# restore all the reference indexes
self._ref_indexes = np.where(np.isin(self._encd_clk, self._ref_clk_tmp))[0]
refs = self._find_true_refs()
for ref in refs:
self._ref_indexes = ref
self._generate_sub_data(ref_clk=True)
self._calc_angle_linear(mod2pi)
_angle.append(self._angle)
_ref_indexes.append(self._ref_indexes)
self._angle = tuple(_angle)
self._ref_indexes = tuple(_ref_indexes)
else:
self._calc_angle_linear(mod2pi)
logger.debug('qualitycheck')
logger.debug('_time: ' + str(len(self._time)))
logger.debug('_angle: ' + str(len(self._angle)))
logger.debug('_encd_cnt: ' + str(len(self._encd_cnt)))
logger.debug('_encd_clk: ' + str(len(self._encd_clk)))
logger.debug('_ref_cnt: ' + str(len(self._ref_cnt)))
logger.debug('_ref_indexes: ' + str(len(self._ref_indexes)))
if isinstance(self._angle, tuple):
if np.any([len(self._time) != len(angle) for angle in self._angle]):
logger.warning('Failed to calculate hwp angle!')
return [], []
else:
if len(self._time) != len(self._angle):
logger.warning('Failed to calculate hwp angle!')
return [], []
logger.info('hwp angle calculation is finished.')
return self._time, self._angle
def _find_refs(self):
""" Find reference slits """
# Function to find the mean of an array without outliers
def _mean(arr):
high = np.percentile(arr, 90)
low = np.percentile(arr, 10)
return np.mean(arr[(arr < high) & (arr > low)])
# Function to find the mean of an array larger than the median without outliers
def _mean_high(arr):
high = np.percentile(arr, 90)
low = np.percentile(arr, 55)
return np.mean(arr[(arr < high) & (arr > low)])
# Generate self._encd_diff
self._generate_sub_data(ref_clk=False)
# Find the instantaneous average encoder clk spacing between datapoints
rot_spacing = np.arange(0, len(self._encd_diff), self._num_edges - 2)
diff_split = np.split(self._encd_diff, rot_spacing[1:])
clk_split = [np.average(clk) for clk in np.split(self._encd_clk, rot_spacing[1:])]
slit_dist = np.interp(self._encd_clk, clk_split, [_mean(_diff) for _diff in diff_split])
slit_dist_high = np.interp(self._encd_clk, clk_split, [_mean_high(_diff) for _diff in diff_split])
# Generate thresholds to distinguish references and glitched references
ref_hi_cond = (self._ref_edges + 1) * slit_dist * (1 + self._ref_range)
ref_lo_cond = (self._ref_edges + 1) * slit_dist * (1 - self._ref_range)
ref_gl_cond = slit_dist_high * (1 + self._ref_range)
# Normal references
ref_ind = np.where((self._encd_diff > ref_lo_cond) & (self._encd_diff < ref_hi_cond))[0]
# Potential glitched references
ref_ind_glitch = np.where((self._encd_diff < ref_lo_cond) & (self._encd_diff > ref_gl_cond))[0]
# Find and add glitched references to normal references
used_ind_pairs = []
def _before(x, ind):
return abs(x - ind) + 1e8*(1 + np.sign(x - ind))
def _after(x, ind):
return abs(x - ind) + 1e8*(1 - np.sign(x - ind))
if len(ref_ind) > 0:
for ind in ref_ind_glitch:
# Check that there is sufficient space before and after the glitched reference
before = ref_ind[np.argmin(_before(ref_ind, ind))]
after = ref_ind[np.argmin(_after(ref_ind, ind))]
if all(np.array([after - ind, ind - before]) > 0.9 * self._num_edges):
if (before, after) in used_ind_pairs:
continue
used_ind_pairs.append((before, after))
ref_ind = np.append(ref_ind, ind)
# Define the reference slit line to be the line before
# the two "missing" lines
# Store the count and clock values of the reference lines
self._ref_indexes = np.sort(ref_ind)
if len(self._ref_indexes) == 0:
if len(self._encd_diff) < self._num_edges:
logger.warning(
'cannot find reference points, # of data is less than # of slit')
else:
logger.warning(
'cannot find reference points, please adjust parameters!')
return False
# check quality of ref_indexes
number_of_bad_refs = np.sum(np.diff(self._ref_indexes) != self._num_edges - 2)
if number_of_bad_refs > 0:
logger.warning(f'There are {number_of_bad_refs} bad ref indexes')
self._ref_clk = self._encd_clk[self._ref_indexes]
self._ref_cnt = self._encd_cnt[self._ref_indexes]
logger.debug('found {} reference points'.format(
len(self._ref_indexes)))
return True
def _generate_sub_data(self, ref_clk=True):
""" Re-generates datafields derived from _encd_clk, _encd_cnt, and _ref_indexes """
self._encd_diff = np.ediff1d(self._encd_clk, to_begin=self._encd_clk[1]-self._encd_clk[0])
# Only generate if self._ref_indexes exists
if ref_clk:
self._ref_clk = np.take(self._encd_clk, self._ref_indexes)
self._ref_cnt = np.take(self._encd_cnt, self._ref_indexes)
return True
def _correct_bad_refs(self, plot=False, **kwargs):
""" Corrects glitches in reference slits """
# Generate statistics of the references
self._ref_mean = np.mean(self._encd_diff[self._ref_indexes])
self._ref_std = np.std(self._encd_diff[self._ref_indexes])
# Find the glitched reference correction size
ref_corr_size = np.zeros(len(self._ref_indexes))
for index, ind in enumerate(self._ref_indexes):
# Only correct references 5 stds away from the mean
if self._encd_diff[ind] < self._ref_mean - 5*self._ref_std:
# Find the expected correction value and step size
# in the forward direction
cvf, csf = self._find_corr_size(ind, 1)
# Find the expected correction value and step size
# in the reverse direction
cvr, csr = self._find_corr_size(ind, -1)
# Return the step size which corrects better
ref_corr_size[index] = csf if cvf < cvr else -csr
# Mask out points that are glitched
encd_mask = np.ones(len(self._encd_clk))
for index, ind in enumerate(np.copy(self._ref_indexes)):
corr = int(ref_corr_size[index])
if corr == 0:
continue
elif corr > 0:
# Create glitch mask and edit other reference
encd_mask[ind:ind + corr] *= 0
self._ref_indexes[index + 1:] -= corr
else:
# Create glitch mask and edit other reference
encd_mask[ind + corr:ind] *= 0
self._ref_indexes[index:] += corr
# Apply mask
self._encd_clk = self._encd_clk[encd_mask.astype(bool)]
for ind, value in enumerate(encd_mask.astype(bool)):
if not value:
self._encd_cnt[ind:] -= 1
self._encd_cnt = self._encd_cnt[encd_mask.astype(bool)]
# Re-generate various arrays
self._generate_sub_data()
return True
def _find_corr_size(self, ind, direction, glitch_steps = 10):
""" Finds how many datapoints to sum over in order to fix the glitch """
# Residual of summing up differences
running_avg = self._encd_diff[ind] - self._ref_mean
# Sum in a direction until the resisual starts increasing
for step in np.arange(1, 1 + glitch_steps):
# Make sure the next step index is reasonable
step_index = min(max(0, int(ind + direction*step)), len(self._encd_diff) - 1)
new_avg = running_avg + self._encd_diff[step_index]
if abs(new_avg) < abs(running_avg):
running_avg = new_avg
else:
# Return the minimum residual and the number of steps to reach it
return abs(running_avg), step - 1
else:
logger.warning('Error: Reference optimization took too long')
return self._ref_mean, 0
def _fill_refs(self):
""" Fill in the reference edges """
# If no references, say that the first sample is theta = 0
# This case comes up for testing with a function generator
if len(self._ref_clk) == 0:
self._ref_clk = [self._encd_clk[0]]
self._ref_cnt = [self._encd_cnt[0]]
return
# Loop over all of the reference slits
for ii in range(len(self._ref_indexes)):
logger.debug("\r {:.2f} %".format(
100. * ii / len(self._ref_indexes)), end="")
# Location of this slit
ref_index = self._ref_indexes[ii]
# Linearly interpolate the missing slits
clks_to_add = np.linspace(
self._encd_clk[ref_index - 1], self._encd_clk[ref_index], self._ref_edges + 2)[1:-1]
self._encd_clk = np.insert(self._encd_clk, ref_index, clks_to_add)
# Adjust the encoder count values for the added lines
# Add 2 to all future counts and interpolate the counts
# for the two added slits
self._encd_cnt[ref_index:] += self._ref_edges
cnts_to_add = np.linspace(
self._encd_cnt[ref_index - 1], self._encd_cnt[ref_index], self._ref_edges + 2)[1:-1]
self._encd_cnt = np.insert(self._encd_cnt, ref_index, cnts_to_add)
# Also adjsut the reference count values in front of
# this one for the added lines
self._ref_cnt[ii + 1:] += self._ref_edges
# Adjust the reference index values in front of this one
# for the added lines
self._ref_indexes[ii + 1:] += self._ref_edges
return
def _fill_refs_fast(self):
""" Fill in the reference edges """
# If no references, say that the first sample is theta = 0
# This case comes up for testing with a function generator
if len(self._ref_clk) == 0:
self._ref_clk = [self._encd_clk[0]]
self._ref_cnt = [self._encd_cnt[0]]
return True
# Split the encoder clk/cnt arrays based on the references
encd_clk_split = np.split(self._encd_clk, self._ref_indexes)
filled_clks = [encd_clk_split[0]]
encd_cnt_split = np.split(self._encd_cnt, self._ref_indexes)
filled_cnts = [encd_cnt_split[0]]
# Loop over every split
for i, ref_ind in enumerate(np.copy(self._ref_indexes)):
start_clk = self._encd_clk[ref_ind-1]
end_clk = self._encd_clk[ref_ind]
# Insert extra clks to fill the reference
filled_clks.append([int(start_clk+(end_clk-start_clk)/3),
int(start_clk+(end_clk-start_clk)*2/3)])
filled_clks.append(encd_clk_split[i+1])
# Adjust the cnt for added clks
filled_cnts.append(2*i + encd_cnt_split[i+1][0] + np.array([0, 1]))
filled_cnts.append(2*(i + 1) + encd_cnt_split[i+1])
# Adjust the ref for added clks
self._ref_indexes[i+1:] += self._ref_edges
# Convert split arrays back into single array
self._encd_clk = np.array([clk for entry in filled_clks for clk in entry])
self._encd_cnt = np.array([cnt for entry in filled_cnts for cnt in entry])
self._generate_sub_data()
return True
def _find_true_refs(self):
# If encoder slit is missing, true references and fake references are mixed
# find true references from the distance between references
# this is one of the slowest processes
max_gl = 20 # maximum number of glitches
unique_refs = set()
for i in range(len(self._ref_indexes)):
refs = [self._ref_indexes[i]]
for j in range(i, 0, -1):
r = self._ref_indexes[j]
if 0 <= refs[-1] - r - self._num_edges < max_gl:
refs.append(r)
refs = refs[::-1]
for j in range(i + 1, len(self._ref_indexes)):
r = self._ref_indexes[j]
if 0 <= r - refs[-1] - self._num_edges < max_gl:
refs.append(r)
unique_refs.add(tuple(refs))
# sort by length, very short ones are not true references
refs = sorted([np.array(v) for v in unique_refs], key=len, reverse=True)
refs = [v for v in refs if len(v) > len(refs[0]) / 2]
return refs
def _fix_datapoint_glitches(self):
""" Removes glitches that add encoder datapoints """
# Find how many extra datapoints there are per revolution
self._glitches = np.ediff1d(self._ref_indexes, to_end=self._num_edges) - self._num_edges
encd_diff_split = np.split(self._encd_diff, self._ref_indexes)
bad_fills = []
dead_rots = []
total_mask = [np.ones(len(encd_diff_split[0]), dtype=bool)]
# Loop through every rotation and find which points to mask out
for i, diff in enumerate(encd_diff_split[1:-1]):
# Only process rotations which have a glitch
if self._glitches[i] != 0:
# Assuming the signal has some static duty cycle, the clk difference between
# two points follows a bimodal distribution. These lines find the average
# clk diff peaks of that distribution as well as whether the rotation starts
# with a high or low clock difference
high, low = self._find_high_low_values(diff)
start_high = self._find_rot_start_type(diff)
# Generate a glitch mask for this rotation
result, mask = self._glitch_mask(diff, high, low, start_high)
# If the mask could not be generated, check if the duty cycle was incorrectly calculated
if not result:
result, mask = self._glitch_mask(diff, high, low, not start_high)
# If the mask could still not be generated, check if it was because of a packet drop or remove the rotation
if not result:
start_clk = self._ref_clk[i]
end_clk = self._ref_clk[i+1]
# If the rotation had a packet drop
if len(diff) >= self._num_edges and \
np.any([edges[0] < end_clk and start_clk < edges[1] for edges in self._edges_dropped_pkts]):
# Fill the rotation with data from an adjacent rotation
if i == 0:
gap_clk = self._encd_clk[self._ref_indexes[i+1]:self._ref_indexes[i+2]] \
- self._encd_clk[self._ref_indexes[i+1]] + self._encd_clk[self._ref_indexes[i]]
else:
if self._ref_indexes[i - 1] in bad_fills:
bad_fills.append(self._ref_indexes[i])
gap_clk = self._encd_clk[(self._ref_clk[i-1] <= self._encd_clk) &
(self._encd_clk < self._ref_clk[i])]
gap_clk = gap_clk[total_mask[-1]] - self._ref_clk[i-1] + self._ref_clk[i]
corr_factor = (self._ref_clk[i+1] - self._ref_clk[i]) * \
(self._num_edges - 1) / (gap_clk[-1] - gap_clk[0]) / (self._num_edges)
gap_clk = corr_factor * (gap_clk - gap_clk[0]) + gap_clk[0]
fill_clk = np.zeros(len(diff))
fill_clk[:self._num_edges] = gap_clk
self._encd_clk[(self._ref_clk[i] <= self._encd_clk) &
(self._encd_clk < self._ref_clk[i+1])] = fill_clk
mask = np.full(len(diff), False)
mask[:self._num_edges] = np.logical_not(mask[:self._num_edges])
else:
logger.debug(f'{i} dead rots index')
if i == 0: # For removing first rotation of spin up data if necessary
total_mask = [np.zeros(len(encd_diff_split[0]), dtype=bool)]
self._ref_indexes[1:] -= len(encd_diff_split[0])
mask = np.full(len(diff), False)
dead_rots.append(i)
total_mask.append(mask)
# Find how many glitches were removed
num_glitches = len(mask) - np.sum(mask)
self._ref_indexes[i+1:] -= num_glitches
logger.debug(f'{num_glitches}, {np.where(~np.array(mask))[0]}')
elif min(diff) < 0.1*np.median(diff):
# Sometimes a dropped packet gets filled with glitched data
bad_fills.append(self._ref_indexes[i])
total_mask.append(np.ones(len(diff), dtype=bool))
else:
total_mask.append(np.ones(len(diff), dtype=bool))
# For removing last rotation of spin down data if necessary
if len(encd_diff_split) - 3 in dead_rots:
total_mask.append(np.zeros(len(encd_diff_split[-1]), dtype=bool))
self._ref_indexes = self._ref_indexes[:-1]
else:
total_mask.append(np.ones(len(encd_diff_split[-1]), dtype=bool))
# Join the individual rotation masks into a single overall mask
total_mask = np.array([bool(mask) for entry in total_mask for mask in entry])
self._num_glitches = int(sum(~total_mask))
if self._num_glitches > 0:
logger.warning(f'{self._num_glitches} glitches are removed')
if len(dead_rots) > 0:
self._num_dead_rots = len(dead_rots)
logger.warning(f'Could not remove glitches from {len(dead_rots)} rotations')
self._encd_clk = self._encd_clk[total_mask]
self._encd_cnt -= np.cumsum(np.logical_not(total_mask).astype(int))
self._encd_cnt = self._encd_cnt[total_mask]
self._ref_indexes = np.delete(self._ref_indexes, dead_rots)
# Correct for packet drop fills which have glitches
for ref in bad_fills:
_before = self._ref_indexes[self._ref_indexes < ref]
before_ref = max(_before) if len(_before) > 0 else False
_after = self._ref_indexes[self._ref_indexes > ref]
after_ref = min(_after) if len(_after) > 0 else False
fill_ref = before_ref if before_ref else after_ref
if not fill_ref:
logger.warning(f'Cannot correct glitches, data might be too short')
continue
fill_values = (self._encd_clk[ref + self._num_edges] - self._encd_clk[ref]) * \
(self._encd_clk[fill_ref:fill_ref + self._num_edges] - self._encd_clk[fill_ref]) / \
(self._encd_clk[fill_ref + self._num_edges] - self._encd_clk[fill_ref]) + \
self._encd_clk[ref]
self._encd_clk[ref:ref + self._num_edges] = fill_values
return True
def _find_high_low_values(self, arr):
""" Finds the two peaks of an array with a bimodal distribution """
# This function leads to warning of empty array when arr is very short
med = np.median(arr[arr > 0.3*np.median(arr)])
high = np.median(arr[arr > med])
low = np.median(arr[np.logical_and(arr < med, arr > 0.3*med)])
return high, low
def _find_rot_start_type(self, arr):
""" Finds the starting pattern of an array with a bimodal distribution """
even = np.median(np.diff(arr)[::2][:20])
odd = np.median(np.diff(arr)[1::2][:20])
return True if even < odd else False
def _glitch_mask(self, diffs, high, low, start):
"""
Generates a mask of 'good' points in an array which is expected to follow
a bimodal distribution
"""
return_mask = [True, True]
toggle = not start
diff_sum = 0
prev_res = 0
# Iterate over ever point in the array, checking the rolling sum
# against what it is expected to be. If the residual doesn't follow
# the expected pattern, mask that point as a glitch
for ind, diff in enumerate(diffs[2:]):
diff_sum += diff
res = abs(high-diff_sum) if toggle else abs(low-diff_sum)
if prev_res > res:
diff_forward = diff_sum + diffs[ind+3] if len(diffs) > ind + 3 else diff_sum
comp = 2*low + high if toggle else low + 2*high
if abs(comp-diff_forward) > abs(high+low-diff_forward):
return_mask.append(False)
prev_res = res
continue
return_mask.append(True)
diff_sum = diff
toggle = not toggle
prev_res = abs(high-diff_sum) if toggle else abs(low-diff_sum)
else:
return_mask.append(True)
return_mask = return_mask[1:]
if np.sum(return_mask) == self._num_edges + 1 and diffs[-1] < 0.3 * low:
return_mask[-1] = False
# Check that the number of valid points is what we expect in one rotation
if np.sum(return_mask) == self._num_edges:
return True, return_mask
else:
return False, None
def _fix_value_glitches(self):
""" Removes glitches that change the encoder value """
self._generate_sub_data()
diff_matrix = np.split(self._encd_diff, self._ref_indexes)
# Find the average value in each rotation
norm_matrix = np.array([[np.mean(diff[1:])] for diff in diff_matrix])
# Average rotation template
template = np.median(diff_matrix[1:-1]/norm_matrix[1:-1], axis=0)
# Handle the first and last rotations
cut_start = self._num_edges * int(len(diff_matrix[0]) / self._num_edges + 1) - len(diff_matrix[0])
expectation = np.tile(template, int(len(self._encd_diff) / self._num_edges) + 2)
expectation = expectation[cut_start:cut_start + len(self._encd_diff)]
# Smoothly vary the expectation depending on speed
spl = scipy.interpolate.CubicSpline(self._encd_cnt[self._ref_indexes], self._encd_clk[self._ref_indexes])
expectation *= spl.derivative()(self._encd_cnt)
# Find which encd values differ from expectation in a way that
# looks like a glitch. At 2 Hz points 5% away from expectations are
# regarded as value glutches.
# Increase the threshold depending on rotation speed
error = self._encd_diff - expectation
fhwp_inv = np.diff(self._encd_clk[self._ref_indexes]) / 2e8
thresholds = 0.05 * 2 * fhwp_inv
for i, threshold in zip(self._ref_indexes[:-1], thresholds):
for j in range(self._num_edges):
dist = abs(error[i+j]) - threshold * expectation[i+j]
if dist > 0 and dist < expectation[i+j]:
dist_inds = 3 if j == self._num_edges - 1 else 1
dist_sign = np.sign(error[i+j])
try:
error[i+j] -= dist_sign*dist
error[i+j+1:i+j+1+dist_inds] += dist_sign*dist/dist_inds
self._num_value_glitches += 1
except IndexError:
pass
# Recreate the encoder clock after accounting for value glitches
self._encd_clk = self._encd_clk + np.cumsum(expectation + error - self._encd_diff)
self._generate_sub_data()
return True
def _calc_angle_linear(self, mod2pi=True):
""" Calculate hwp angle of encoder counters for each revolution.
The ref_indexes are at the "rising edge" of reference/missing slit.
On the other hand, the actual reference point is the center between
the "riging edge" and "falling edge" of the referece/missing slit.
Therefore, the angle of ref_indexes should be `2 pi * integer - half of
slit width (360 deg / num_edges / 2)`.
"""
self._encd_cnt_split = np.split(self._encd_cnt, self._ref_indexes)
angle_first_revolution = (self._encd_cnt_split[0] - self._ref_cnt[0]) * \
(2 * np.pi / self._num_edges)
angle_last_revolution = (self._encd_cnt_split[-1] - self._ref_cnt[-1]) * \
(2 * np.pi / self._num_edges) + (len(self._ref_cnt) - 1) * 2 * np.pi
self._angle = np.concatenate(
[(self._encd_cnt_split[i] - self._ref_cnt[i]) *
(2 * np.pi /np.diff(self._ref_indexes)[i - 1]) + i * 2 * np.pi
for i in range(1, len(self._encd_cnt_split) - 1)]
)
self._angle = np.concatenate(
[angle_first_revolution, self._angle.flatten(), angle_last_revolution])
# Subtract half of slit width
self._angle -= np.pi / self._num_edges
if mod2pi:
self._angle = self._angle % (2 * np.pi)
return
def _duplication_check(self):
""" Check the duplication in hk data and remove it """
unique_array, unique_index = np.unique(
self._encd_cnt, return_index=True)
if len(unique_array) != len(self._encd_cnt):
logger.warning(
'Duplication is found in encoder data, performing correction.')
self._encd_cnt = unique_array
self._encd_clk = self._encd_clk[unique_index]
unique_array, unique_index = np.unique(
self._rising_edge, return_index=True)
if len(unique_array) != len(self._rising_edge):
logger.warning(
'Duplication is found in IRIG data, performing correction.')
self._rising_edge = unique_array
self._irig_time = self._irig_time[unique_index]
def _irig_quality_check(self):
""" IRIG timing quality check """
idx = np.where(np.diff(self._irig_time) == 1)[0]
if self._irig_type == 1:
idx = np.where(np.isclose(np.diff(self._irig_time),
np.full(len(self._irig_time)-1, 0.1)))[0]
if len(self._irig_time) - 1 == len(idx):
return
# check packet drop or data point glitches
elif len(self._irig_time) > len(idx) and len(idx) > 0:
self._num_glitches_irig = np.sum(np.diff(self._irig_time) < 1)
if self._num_glitches_irig > 0:
logger.warning(f'{self._num_glitches_irig} additional irig time is detected')
self._num_dropped_pkts_irig = np.sum(np.diff(self._irig_time) > 1)
if self._num_dropped_pkts_irig > 0:
logger.warning(f'{self._num_dropped_pkts_irig} irig packet drops is detected')
if np.any(np.diff(self._irig_time) > 5):
logger.warning(
'a part of the IRIG time is incorrect, performing the correction process...')
self._irig_time = self._irig_time[idx]
self._rising_edge = self._rising_edge[idx]
logger.warning('deleted wrong irig_time, indices: ' +
str(np.where(np.diff(self._irig_time) != 1)[0]))
else:
self._irig_time = np.array([])
self._rising_edge = np.array([])
return
# check value glitches of irig rising edges
def min_distance_clk(time, interval):
mod = np.mod(time, interval)
return np.min([mod, interval - mod], axis=0)
bbb_clk = np.diff(self._rising_edge)
avg_bbb_clk = np.median(bbb_clk)
self._num_value_glitches_irig = np.sum(min_distance_clk(bbb_clk, avg_bbb_clk) >= 1e5)
if self._num_value_glitches_irig > 0:
logger.warning(f'{self._num_value_glitches_irig} glitched irig_time is detected')
idx = np.where(min_distance_clk(bbb_clk, avg_bbb_clk) < 1e5)[0]
if len(self._irig_time) > len(idx) and len(idx) > 0:
self._irig_time = self._irig_time[idx]
self._rising_edge = self._rising_edge[idx]
def _fix_irig_desync(self):
""" Fix IRIG desynchronization by adding constant time offset """
if self._irig_desync is None:
return
for t0, t1, dt in self._irig_desync:
desynced = (t0 <= self._irig_time) & (self._irig_time <= t1)
if np.any(desynced):
logger.warning('irig time has known desynchronization, apply correction')
self._irig_time[desynced] -= dt
def _process_counter_overflow_glitch(self):
"""
Treat glitches due to 32 bit internal counter overflow
We suspect that this is a glitch caused by the very occasional failure of the encoder counter overflow correction
due to latency or other problems on the pc running the encoder agent.
"""
idx = np.where((np.diff(self._encd_clk)>=2**32-1) & (np.diff(self._encd_clk)<2**32+1e6))[0] + 1
if len(idx) > 0:
logger.warning(f'{len(idx)} counter overflow glitches are found, perform correction.')
for i in idx:
self._encd_clk[i] -= 2**32
def _process_counter_index_reset(self):
""" Treat counter index reset due to agent reboot """
idx = np.where(np.diff(self._encd_cnt) < -1e4)[0] + 1
if len(idx) > 0:
logger.warning(f'{len(idx)} counter resets are found, perform correction.')
for i in idx:
self._encd_cnt[i:] = self._encd_cnt[i:] + abs(np.diff(self._encd_cnt)[i-1]) + 1
def _fill_dropped_packets(self):
""" Estimate the number of dropped packets """
cnt_diff = np.diff(self._encd_cnt)
dropped_samples = np.sum(cnt_diff[cnt_diff >= self._pkt_size] - 1)
self._num_dropped_pkts = dropped_samples // self._pkt_size
if self._num_dropped_pkts > 0:
logger.warning('{} dropped packets are found, performing fill process'.format(
self._num_dropped_pkts))
for _ in np.where(np.diff(self._encd_cnt) > 1)[0]:
index = np.where(np.diff(self._encd_cnt) > 1)[0][0]
gap = int(np.diff(self._encd_cnt)[index]) - 1
# Fill dropped counters with counters one before or one after rotation.
# This filling method works even when the reference slot counter is dropped.
rot_needed = int(np.ceil(gap/self._edges_per_rev)) + 1
self._edges_dropped_pkts.append((self._encd_clk[index], self._encd_clk[index+1]))
if index - rot_needed*self._edges_per_rev >= 0:
rot_before_index = index - rot_needed*self._edges_per_rev
gap_clk = self._encd_clk[rot_before_index:rot_before_index + gap + 2] \
- self._encd_clk[rot_before_index] + self._encd_clk[index]
corr_factor = (self._encd_clk[index + 1] - self._encd_clk[index])/(gap_clk[-1] - gap_clk[0])
gap_clk = (corr_factor*(gap_clk - gap_clk[0]) + gap_clk[0])[1:-1]
else:
rot_after_index = index + rot_needed*self._edges_per_rev + 1
gap_clk = self._encd_clk[rot_after_index - gap - 1:rot_after_index + 1] \
- self._encd_clk[rot_after_index] + self._encd_clk[index + 1]
corr_factor = (self._encd_clk[index + 1] - self._encd_clk[index])/(gap_clk[-1] - gap_clk[0])
gap_clk = (corr_factor*(gap_clk - gap_clk[-1]) + gap_clk[-1])[1:-1]
gap_cnt = np.arange(self._encd_cnt[index] + 1, self._encd_cnt[index+1])
self._encd_cnt = np.insert(self._encd_cnt, index + 1, gap_cnt)
self._encd_clk = np.insert(self._encd_clk, index + 1, gap_clk)
self._edges_dropped_pkts = np.array(self._edges_dropped_pkts)
return True
def _encoder_packet_sort(self):
cnt_diff = np.diff(self._encd_cnt)
if np.any(cnt_diff != 1):
logger.debug(
'a part of the counter is incorrect')
if np.any(cnt_diff < 0):
if 1 - self._pkt_size in cnt_diff:
logger.warning(
'Packet flip found, performing sort process')
idx = np.argsort(self._encd_cnt)
self._encd_clk = self._encd_clk[idx]
else:
logger.warning('Packet drop exists')
else:
logger.debug('no need to fix encoder index')
return
def _quad_form(self, quad):
# bit process
quad[(quad >= 0.5)] = 1
quad[(quad < 0.5)] = 0
offset = 0
outlier_count = 0
for quad_split in np.array_split(quad, 1 + np.floor(len(quad) / 100)):
if quad_split.mean() > 0.1 and quad_split.mean() < 0.9:
for j in range(len(quad_split)):
quad[j + offset] = int(quad_split.mean() + 0.5)
offset += len(quad_split)
continue
outlier = np.argwhere(
np.abs(quad_split.mean() - quad_split) > 0.5).flatten()
if len(outlier) > 5:
outlier_count += 1
for i in outlier:
if i == 0:
ii, iii = i + 1, i + 2
elif i == outlier[-1]:
ii, iii = i - 1, i - 2
else:
ii, iii = i - 1, i + 1
if quad_split[i] + quad_split[ii] + quad_split[iii] == 1:
quad[i + offset] = 0
if quad_split[i] + quad_split[ii] + quad_split[iii] == 2:
quad[i + offset] = 1
offset += len(quad_split)
if outlier_count > 0:
logger.warning("flipping quad was corrected by mean value "
f"in {outlier_count} sections.")
return quad