#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import numpy as np
import scipy.interpolate
import h5py
import so3g
from spt3g import core
import logging
import yaml
import datetime
import sotodlib
import traceback
logger = logging.getLogger(__name__)
[docs]
class G3tHWP():
def __init__(self, config_file=None):
"""
Class to manage L2 HK data into HWP angle g3.
Args
-----
config_file: str
path to config yaml file
"""
if config_file is not None:
if os.path.exists(config_file):
self.config_file = config_file
self.configs = yaml.safe_load(open(self.config_file, "r"))
logger.info("Loading config from " + self.config_file)
else:
logger.warning(
"Cannot find config file, use all default values")
self.configs = {}
else:
logger.warning("Cannot find config file, use all default values")
self.configs = {}
self._start = 0
self._end = 0
self._file_list = None
self._start = self.configs.get('start', 0)
self._end = self.configs.get('end', 0)
self._file_list = self.configs.get('file_list', None)
self._data_dir = self.configs.get('data_dir', None)
self._margin = self.configs.get('margin', 10)
# 1st/2nd encoder readout
self._field_instance = self.configs.get('field_instance',
'satp1.hwp-bbb-e1.feeds.HWPEncoder')
self._field_instance_sub = self.configs.get('field_instance_sub',
'satp1.hwp-bbb-e2.feeds.HWPEncoder')
self._field_list = self.configs.get('field_list',
['rising_edge_count', 'irig_time', 'counter',
'counter_index', 'irig_synch_pulse_clock_time',
'irig_synch_pulse_clock_counts', 'quad'])
# Size of pakcets sent from the BBB
# 120 in the latest version, 150 in the previous version
self._pkt_size = self.configs.get('pkt_size', 120)
# IRIG type
# 0: 1Hz IRIG (default), 1: 10Hz IRIG
self._irig_type = self.configs.get('irig_type', 0)
# Number of encoder slits per HWP revolution
self._num_edges = self.configs.get('num_edges', 570 * 2)
# Reference slit edgen width
self._ref_edges = self.configs.get('ref_edges', 2)
# Reference slit angle
self._delta_angle = 2 * np.pi / self._num_edges
# Search range of reference slot
self._ref_range = self.configs.get('ref_range', 0.1)
# Threshoild for outlier data to calculate nominal slit width
self._slit_width_lim = self.configs.get('slit_width_lim', 0.1)
# The distance from the hwp center to the fine encoder slots (mm)
self._encoder_disk_radius = self.configs.get(
'encoder_disk_radius', 346.25)
# Output path + filename
self._output = self.configs.get('output', None)
# logger for write_solution_h5
self._write_solution_h5_logger = 'Not set'
# encoder suffixes
self._suffixes = ['_1', '_2']
[docs]
def load_data(self, start=None, end=None,
data_dir=None, instance=None):
"""
Loads house keeping data for a given time range and
returns HWP parameters in L2 HK .g3 file
Args
-----
start : timestamp or datetime
start time for data, assumed to be in UTC unless specified
end : timestamp or datetime
end time for data, assumed to be in UTC unless specified
data_dir : str or None
path to HK g3 file, overwrite config file
instance : str or None
instance of field list, overwrite config file
ex.) lab data 'observatory.HBA.feeds.HWPEncoder'
ex.) site data 'satp1.hwp-bbb-e1.feeds.HWPEncoder'
ex.) site data 'satp3.hwp-bbb-a1.feeds.HWPEncoder'
Returns
----
dict
{alias[i] : (time[i], data[i])}
"""
if start is not None and end is not None:
self._start = start
self._end = end
if self._start is None:
logger.error("Cannot find time range")
return {}
if isinstance(start, datetime.datetime):
if start.tzinfo is None:
logger.warning(
'No tzinfo info in start argument, set to utc timezone')
start = start.replace(tzinfo=datetime.timezone.utc)
self._start = start.timestamp()
if isinstance(end, datetime.datetime):
if end.tzinfo is None:
logger.warning(
'No tzinfo info in end argument, set to utc timezone')
end = start.replace(tzinfo=datetime.timezone.utc)
self._end = end.timestamp()
if data_dir is not None:
self._data_dir = data_dir
if self._data_dir is None:
logger.error("Cannot find data directory")
return {}
if instance is not None:
self._field_instance = instance
# load housekeeping data with hwp keys
logger.info('Loading HK data files ')
logger.info("input time range: " +
str(self._start) + " - " + str(self._end))
fields, alias = self._key_formatting()
data = so3g.hk.load_range(
self._start,
self._end,
fields,
alias,
data_dir=self._data_dir)
if not any(data):
logger.info('HWP is not spinning in time range {' + str(
self._start) + ' - ' + str(self._end) + '}, data is empty')
return data
[docs]
def load_file(self, file_list=None, instance=None):
"""
Loads house keeping data with specified g3 files.
Return HWP parameters from SO HK data.
Args
-----
file_list : str or list or None
path and file name of input level 2 HK g3 file
instance : str or None
instance of field list, overwrite config file
ex.) lab data 'observatory.HBA.feeds.HWPEncoder'
ex.) site data 'satp1.hwp-bbb-e1.feeds.HWPEncoder'
ex.) site data 'satp3.hwp-bbb-a1.feeds.HWPEncoder'
Returns
----
dict
{alias[i] : (time[i], data[i])}
"""
if file_list is None and self._file_list is None:
logger.error('Cannot find input g3 file')
return {}
if file_list is not None:
self._file_list = file_list
if instance is not None:
self._field_instance = instance
# load housekeeping files with hwp keys
scanner = so3g.hk.HKArchiveScanner()
if not (isinstance(self._file_list, list)
or isinstance(self._file_list, np.ndarray)):
self._file_list = [self._file_list]
for f in self._file_list:
if not os.path.exists(f):
logger.error('Cannot find input g3 file')
return {}
scanner.process_file(f)
logger.info("Loading HK data files: {}".format(
' '.join(map(str, self._file_list))))
arc = scanner.finalize()
if not any(arc.get_fields()[0]):
self._start = 0
self._end = 0
return {}
fields, alias = self._key_formatting()
if not np.any([f in arc.get_fields()[0].keys() for f in fields]):
logger.info(
"HWP is not spinning in input g3 files or cannot find field")
return {}
if self._start == 0 and self._end == 0:
self._start = np.min([arc.simple(f)[0][0]
for f in fields if f in arc.get_fields()[0].keys()])
self._end = np.max([arc.simple(f)[0][-1]
for f in fields if f in arc.get_fields()[0].keys()])
data = {a: arc.simple(f) for a, f in zip(
alias, fields) if f in arc.get_fields()[0].keys()}
return data
def _key_formatting(self):
"""
Formatting hwp housekeeping field names and aliases
Return
-----
fields, alias
"""
# 1st encoder readout
fields = [self._field_instance + '_full.' + f if 'counter' in f
else self._field_instance + '.' + f for f in self._field_list]
alias = [a + '_1' for a in self._field_list]
# 2nd encoder readout
if self._field_instance_sub is not None:
fields += [self._field_instance_sub + '_full.' + f if 'counter' in f
else self._field_instance_sub + '.' + f for f in self._field_list]
alias += [a + '_2' for a in self._field_list]
# metadata key
meta_keys = {
'pid_direction': 'hwp-pid.feeds.hwppid.direction',
}
platform = self._field_instance.split('.')[0]
for k, f in meta_keys.items():
alias.append(k)
fields.append(platform + '.' + f)
return fields, alias
def _data_formatting(self, data, suffix):
"""
Formatting encoder data
Args
-----
data : dict
HWP HK data from load_data
suffix: Specify whether to use 1st or 2nd encoder, '_1' or '_2'
'_1' for 1st encoder, '_2' for 2nd encoder
Returns
--------
dict
{'rising_edge_count', 'irig_time', 'counter', 'counter_index', 'quad', 'quad_time'}
"""
keys = ['rising_edge_count', 'irig_time',
'counter', 'counter_index', 'quad', 'quad_time']
out = {k: data[k+suffix][1] if k+suffix in data.keys() else []
for k in keys}
# irig part
if 'irig_time'+suffix not in data.keys():
logger.warning(
'All IRIG time is not correct for encoder' + suffix)
return out
if self._irig_type == 1:
out['irig_time'] = data['irig_synch_pulse_clock_time'+suffix][1]
out['rising_edge_count'] = data['irig_synch_pulse_clock_counts'+suffix][1]
# encoder part
if 'counter'+suffix not in data.keys():
logger.warning(
'No encoder data is available for encoder'+suffix)
return out
out['quad'] = data['quad'+suffix][1]
out['quad_time'] = data['quad'+suffix][0]
return out
def _slowdata_process(self, fast_time, irig_time, suffix):
""" Diagnose hwp status and output status flags
Returns
--------
dict
{stable, locked, hwp_rate, slow_time}
Notes
------
- Time definition -
if fast_time exists: slow_time = fast_time
elif: irig_time exists but no fast_time, slow_time = irig_time
else: slow_time is per 10 sec array
"""
slow_time = np.arange(self._start, self._end, 10)
if len(irig_time) == 0:
out = {
'locked'+suffix: np.zeros_like(slow_time, dtype=bool),
'stable'+suffix: np.zeros_like(slow_time, dtype=bool),
'hwp_rate'+suffix: np.zeros_like(slow_time, dtype=np.float32),
'slow_time'+suffix: slow_time,
}
return out
if len(fast_time) == 0:
fast_irig_time = irig_time
locked = np.zeros_like(irig_time, dtype=bool)
stable = np.zeros_like(irig_time, dtype=bool)
hwp_rate = np.zeros_like(irig_time, dtype=np.float32)
else:
# hwp speed calc. (approximate using ref)
hwp_rate_ref = 1 / np.diff(fast_time[self._ref_indexes])
hwp_rate = [hwp_rate_ref[0] for i in range(self._ref_indexes[0])]
for n in range(len(np.diff(self._ref_indexes))):
hwp_rate += [hwp_rate_ref[n]
for r in range(np.diff(self._ref_indexes)[n])]
hwp_rate += [hwp_rate_ref[-1] for i in range(len(fast_time) -
self._ref_indexes[-1])]
fast_irig_time = fast_time
locked = np.ones_like(fast_time, dtype=bool)
locked[np.where(hwp_rate == 0)] = False
stable = np.ones_like(fast_time, dtype=bool)
# irig only status
irig_only_time = irig_time[np.where(
(irig_time < fast_time[0]) | (irig_time > fast_time[-1]))]
irig_only_locked = np.zeros_like(irig_only_time, dtype=bool)
irig_only_hwp_rate = np.zeros_like(
irig_only_time, dtype=np.float32)
fast_irig_time = np.append(irig_only_time, fast_time)
fast_irig_idx = np.argsort(fast_irig_time)
fast_irig_time = fast_irig_time[fast_irig_idx]
locked = np.append(irig_only_locked, locked)[fast_irig_idx]
hwp_rate = np.append(irig_only_hwp_rate, hwp_rate)[fast_irig_idx]
stable = np.ones_like(fast_irig_time, dtype=bool)
# slow status
slow_time = slow_time[np.where(
(slow_time < fast_irig_time[0]) | (slow_time > fast_irig_time[-1]))]
slow_locked = np.zeros_like(slow_time, dtype=bool)
slow_stable = np.zeros_like(slow_time, dtype=bool)
slow_hwp_rate = np.zeros_like(slow_time, dtype=np.float32)
slow_time = np.append(slow_time, fast_irig_time)
slow_idx = np.argsort(slow_time)
slow_time = slow_time[slow_idx]
locked = np.append(slow_locked, locked)[slow_idx]
stable = np.append(slow_stable, stable)[slow_idx]
hwp_rate = np.append(slow_hwp_rate, hwp_rate)[slow_idx]
locked[np.where(hwp_rate == 0)] = False
return {'locked'+suffix: locked, 'stable'+suffix: stable, 'hwp_rate'+suffix: hwp_rate, 'slow_time'+suffix: slow_time}
[docs]
def analyze(self, data, ratio=None, mod2pi=True, fast=True, suffix='_1'):
"""
Analyze HWP angle solution
to be checked by hardware that 0 is CW and 1 is CCW from (sky side) consistently for all SAT
Args
-----
data : dict
HWP HK data from load_data
ratio : float, optional
parameter for referelce slit
threshold = 2 slit distances +/- ratio
mod2pi : bool, optional
If True, return hwp angle % 2pi
fast : bool, optional
If True, run fast fill_ref algorithm
Returns
--------
dict
{fast_time, angle, slow_time, stable, locked, hwp_rate, template, filled_indexes}
Notes
------
* fast_time: timestamp
* IRIG synched timing (~2kHz)
* angle (float): IRIG synched HWP angle in radian
* slow_time: timestamp
* time list of slow block
* stable: bool
* if non-zero, indicates the HWP spin state is known.
* i.e. it is either spinning at a measurable rate, or stationary.
* When this flag is non-zero, the hwp_rate field can be taken at face value.
* locked: bool
* if non-zero, indicates the HWP is spinning and the position solution is working.
* In this case one should find the hwp_angle populated in the fast data block.
* hwp_rate: float
* the "approximate" HWP spin rate, with sign, in revs / second.
* Use placeholder value of 0 for cases when not "stable".
* filled_indexes: boolean array
* indexes where we filled due to packet drop etc.
"""
if not any(data):
logger.info("no HWP field data")
d = self._data_formatting(data, suffix)
# hwp angle calc.
if ratio is not None:
logger.info(f"Overwriting reference slit threshold by {ratio}.")
self._ref_range = ratio
out = {}
logger.info("Start calclulating angle.")
if len(d['irig_time']) == 0:
logger.warning('There is no correct IRIG timing. Stop analyze.')
else:
fast_time, angle = self._hwp_angle_calculator(
d['counter'], d['counter_index'], d['irig_time'],
d['rising_edge_count'], d['quad_time'], d['quad'],
mod2pi, fast)
if len(fast_time) == 0:
logger.warning('analyzed encoder data is None')
out.update(self._slowdata_process(
fast_time, d['irig_time'], suffix))
out['fast_time'+suffix] = fast_time
out['angle'+suffix] = angle
out['quad'+suffix] = self._quad_corrected
out['ref_indexes'+suffix] = self._ref_indexes
out['filled_indexes'+suffix] = self._filled_indexes
return out
def eval_angle(self, solved, poly_order=3, suffix='_1'):
"""
Evaluate the non-uniformity of hwp angle timestamp and subtract
The raw hwp angle timestamp is kept.
Args
-----
solved: dict
dict data from analyze
poly_order:
order of polynomial filtering for removing drift of hwp speed
for evaluating the non-uniformity of hwp angle.
suffix:
'_1' for 1st encoder, '_2' for 2nd encoder
Returns
--------
output: dict
{fast_time, fast_time_raw, angle, slow_time, stable, locked, hwp_rate, template}
Notes
------
* template: float array (ratio)
* Averaged non-uniformity of the hwp angle
* normalized by the step of the angle encoder
non-uniformity of hwp angle comes from following reasons,
- non-uniformity of encoder slits
- sag of rotor
- bad tuning of the comparator threshold of DriverBoard
- degradation of LED
and the non-uniformity can be time-dependent.
Need to evaluate and subtract it before interpolating hwp angle into Smurf timestamps.
The non-uniformity of encoder slots creates additional hwp angle jitter.
The maximum possible additional jitter is comparable to the requirement of angle jitter.
We make an template of encoder slits and subtract it from the timestamp.
"""
if 'fast_time_raw'+suffix in solved.keys():
logger.info(
'Non-uniformity is already subtracted. Calculation is skipped.')
return
def detrend(array, deg=poly_order):
x = np.linspace(-1, 1, len(array))
p = np.polyfit(x, array, deg=deg)
pv = np.polyval(p, x)
return array - pv
logger.info('Remove non-uniformity from hwp angle and overwrite')
# template subtraction
ft = solved['fast_time'+suffix][solved['ref_indexes'+suffix]
[0]:solved['ref_indexes'+suffix][-2]+1]
# remove rotation frequency drift for making a template of encoder slits
ft = detrend(ft, deg=3)
# make template
template_slit = np.diff(ft).reshape(
len(solved['ref_indexes'+suffix])-2, self._num_edges)
template_slit = np.average(template_slit, axis=0)
average_slit = np.average(template_slit)
# subtract template, keep raw timestamp
subtract = np.cumsum(np.roll(np.tile(template_slit-average_slit, len(
self._ref_indexes) + 1), self._ref_indexes[0] + 1)[:len(solved['fast_time'+suffix])])
solved['fast_time_raw'+suffix] = solved['fast_time'+suffix]
solved['fast_time'+suffix] = solved['fast_time'+suffix] - subtract
solved['template'+suffix] = template_slit / \
np.average(np.diff(solved['fast_time'+suffix]))
def eval_offcentering(self, solved):
"""
Evaluate the off-centering of the hwp from the phase difference between two encoders.
Assume that slot pattern subraction is already applied
* Definition of offcentering must be clear.
Args
-----
solved: dict
dict solved from eval_angle
{fast_time_1, angle_2, fast_time_2, angle_2, ...}
Returns
--------
output: dict
{offcenter_idx1, offcenter_idx2, offcentering, offset_time}
Notes
------
* offcenter_idx1: int
* index of the solved['fast_time_1'] for which offcentering is estimated.
* offcenter_idx2: int
* index of the solved['fast_time_2'] for which offcentering is estimated.
* offcentering: float
* Offcentering (mm) at solved['fast_time(_2)'][offcenter_idx1(2)].
* offset_time: float
* Offset time of the encoder signals induced by the offcentering.
* Offset time is the delayed (advanced) timing of the encoder1 (2) in sec.
"""
logger.info('Remove offcentering effect from hwp angle and overwrite')
# Calculate offcentering from where the first reference slot was detected by the 2nd encoder.
if solved["ref_indexes_1"][0] > self._num_edges/2-1:
offcenter_idx1_start, offcenter_idx2_start = int(
solved["ref_indexes_1"][0]-self._num_edges/2), int(solved["ref_indexes_2"][0])
else:
offcenter_idx1_start, offcenter_idx2_start = int(
solved["ref_indexes_1"][1]-self._num_edges/2), int(solved["ref_indexes_2"][0])
# Calculate offcentering to the end of the shorter encoder data.
if len(solved["fast_time_1"][offcenter_idx1_start:]) > len(solved["fast_time_2"][offcenter_idx2_start:]):
idx_length = len(solved["fast_time_2"][offcenter_idx2_start:])
else:
idx_length = len(solved["fast_time_1"][offcenter_idx1_start:])
offcenter_idx1 = np.arange(
offcenter_idx1_start, offcenter_idx1_start+idx_length-1)
offcenter_idx2 = np.arange(
offcenter_idx2_start, offcenter_idx2_start+idx_length-1)
# Calculate the offset time of the encoders induced by the offcentering.
offset_time = (solved["fast_time_1"][offcenter_idx1] -
solved["fast_time_2"][offcenter_idx2])/2
# Calculate the offcentering (mm).
period = (solved["fast_time_1"][offcenter_idx1+1] -
solved["fast_time_1"][offcenter_idx1])*self._num_edges
offset_angle = offset_time/period*2*np.pi
offcentering = np.tan(offset_angle)*self._encoder_disk_radius
solved['offcenter_idx1'] = offcenter_idx1
solved['offcenter_idx2'] = offcenter_idx2
solved['offcentering'] = offcentering
solved['offset_time'] = offset_time
return
def correct_offcentering(self, solved):
"""
Correct the timing of solved['fast_time'] which is delayed (advanced) by the offcentering.
Args
-----
solved: dict
dict solved from eval_angle
{fast_time_1, angle_1, fast_time_2, angle_2, ...}
offcentering: dict
dict solved from eval_offcentering
{offcenter_idx1, offcenter_idx2, offcentering, offset_time}
Returns
--------
output: dict
{fast_time, angle, fast_time_2, angle_2, ...}
Notes
------
* offcenter_idx1: int
* index of the solved['fast_time_1'] for which offcentering is estimated.
* offcenter_idx2: int
* index of the solved['fast_time_2'] for which offcentering is estimated.
* offcentering: float
* Offcentering (mm) at solved['fast_time_1(2)'][offcenter_idx1(2)].
* offset_time: float
* Offset time of the encoder signals induced by the offcentering.
* Offset time is the delayed (advanced) timing of the encoder1 (2) in sec.
* We should allow to correct the offcentering by external input, since offcentering measurement is not always available.
"""
# Skip the correction when the offcentering estimation doesn't exist.
if 'offcentering' not in solved.keys():
logger.warning(
'Offcentering info does not exist. Offcentering correction is skipped.')
return
offcenter_idx1 = solved['offcenter_idx1']
offcenter_idx2 = solved['offcenter_idx2']
offset_time = solved['offset_time']
solved['fast_time_1'] = solved['fast_time_1'][offcenter_idx1] - offset_time
solved['fast_time_2'] = solved['fast_time_2'][offcenter_idx2] + offset_time
solved['angle_1'] = solved['angle_1'][offcenter_idx1]
solved['angle_2'] = solved['angle_2'][offcenter_idx2]
return
[docs]
def write_solution(self, solved, output=None):
"""
Output HWP angle + flags as SO HK g3 format
Args
-----
solved: dict
dict data from analyze
output: str or None
output path + file name, override config file
Notes
-----------
Output file format
* Provider: 'hwp'
* Fast block
* 'hwp.hwp_angle'
* Slow block
* 'hwp.stable'
* 'hwp.locked'
* 'hwp.hwp_rate'
- fast_time: timestamp
IRIG synched timing (~2kHz)
- angle: float
IRIG synched HWP angle in radian
- slow_time: timestamp
time list of slow block
- stable: bool
if non-zero, indicates the HWP spin state is known.
i.e. it is either spinning at a measurable rate, or stationary.
When this flag is non-zero, the hwp_rate field can be taken at face value.
- locked: bool
if non-zero, indicates the HWP is spinning and the position solution is working.
In this case one should find the hwp_angle populated in the fast data block.
- hwp_rate: float
the "approximate" HWP spin rate, with sign, in revs / second.
Use placeholder value of 0 for cases when not "locked".
"""
if self._output is None and output is None:
logger.warning('Not specified output file')
return
if output is not None:
self._output = output
if len(solved) == 0:
logger.warning('input data is empty, skip writing')
return
if len(solved['fast_time']) == 0:
logger.info('write no rotation data, skip writing')
return
session = so3g.hk.HKSessionHelper(hkagg_version=2)
writer = core.G3Writer(output)
writer.Process(session.session_frame())
prov_id = session.add_provider('hwp')
writer.Process(session.status_frame())
# Divide the full time span into equal intervals
start_time = solved['slow_time'].min()
end_time = solved['slow_time'].max()
if np.any(solved['fast_time']):
start_time = min(start_time, solved['fast_time'].min())
end_time = max(end_time, solved['fast_time'].max())
frame_length = 60 # seconds
while start_time < end_time:
t0, t1 = start_time, start_time + frame_length
# Write a slow frame?
s = (t0 <= solved['slow_time']) * (solved['slow_time'] < t1)
if np.any(s):
frame = session.data_frame(prov_id)
slow_block = core.G3TimesampleMap()
slow_block.times = core.G3VectorTime(
[core.G3Time(_t * core.G3Units.s) for _t in solved['slow_time'][s]])
slow_block['stable'] = core.G3VectorInt(solved['stable'][s])
slow_block['locked'] = core.G3VectorInt(solved['locked'][s])
slow_block['hwp_rate'] = core.G3VectorDouble(
solved['hwp_rate'][s])
frame['block_names'].append('slow')
frame['blocks'].append(slow_block)
writer.Process(frame)
# Write a fast frame?
s = (t0 <= solved['fast_time']) * (solved['fast_time'] < t1)
if np.any(s):
frame = session.data_frame(prov_id)
fast_block = core.G3TimesampleMap()
fast_block.times = core.G3VectorTime(
[core.G3Time(_t * core.G3Units.s) for _t in solved['fast_time'][s]])
fast_block['hwp_angle'] = core.G3VectorDouble(
solved['angle'][s])
frame['block_names'].append('fast')
frame['blocks'].append(fast_block)
writer.Process(frame)
start_time += frame_length
return
def _set_empty_axes(self, aman, suffix=None):
if suffix is None:
aman.wrap_new('hwp_angle', shape=('samps', ), dtype=np.float64)
aman.wrap('primary_encoder', 0)
aman.wrap('version', 1)
aman.wrap('pid_direction', 0)
aman.wrap('offcenter_direction', 0)
aman.wrap_new('offcenter', shape=(2,), dtype=np.float64)
aman.wrap('sotodlib_version', sotodlib.__version__)
else:
aman.wrap_new('hwp_angle_ver1'+suffix,
shape=('samps', ), dtype=np.float64)
aman.wrap_new('hwp_angle_ver2'+suffix,
shape=('samps', ), dtype=np.float64)
aman.wrap_new('hwp_angle_ver3'+suffix,
shape=('samps', ), dtype=np.float64)
aman.wrap_new('quad'+suffix, shape=('samps', ), dtype=int)
aman.wrap('quad_direction'+suffix, 0)
aman.wrap_new('stable'+suffix, shape=('samps', ), dtype=bool)
aman.wrap_new('locked'+suffix, shape=('samps', ), dtype=bool)
aman.wrap_new('hwp_rate'+suffix, shape=('samps', ), dtype=np.float16)
aman.wrap_new('template'+suffix, shape=(self._num_edges, ), dtype=np.float64)
aman.wrap_new('filled_flag'+suffix, shape=('samps', ), dtype=bool)
aman.wrap('version'+suffix, 1)
aman.wrap('logger'+suffix, self._write_solution_h5_logger)
return aman
def _set_raw_axes(self, aman, data):
""" Set raw encoder data in aman """
for k, v in data.items():
aman.wrap('raw_' + k, np.array(v))
return aman
def _load_raw_axes(self, aman, output, h5_address):
""" Load raw encoder data from h5 """
fileds, alias = self._key_formatting()
data = {}
f = h5py.File(output)
for a in alias:
if 'raw_' + a in f[h5_address].keys():
v = f[h5_address]['raw_' + a][:]
data[a] = v
f.close()
return data
def _bool_interpolation(self, timestamp1, data, timestamp2):
interp = scipy.interpolate.interp1d(
timestamp1, data, kind='linear', bounds_error=False)(timestamp2)
result = (interp > 0.999)
return result
[docs]
def set_data(self, tod, h5_filename=None):
"""
Output HWP hk data as AxisManager format. The results are stored in HDF5 files.
We save the copy of raw hwp encoder hk data into HDF5 file, to save time for
re-calculating the hwp angle solutions.
Args
----
tod: AxisManager
h5_filename:
If this is not None, try to load raw encoder data from hdf5 file
Notes
-----
Output file format
- Raw output
x is a number different for each data and each observation
- raw_rising_edge_count_1/2: int (2, x)
- raw_irig_time_1/2: float (2, x)
- raw_counter_1/2: float (2, x)
- raw_counter_index_1/2: int (2, x)
- raw_irig_synch_pulse_clock_time_1/2: float (2, x)
- raw_irig_synch_pulse_clock_counts_1/2: int (2, x)
- raw_quad_1/2: bool (2, x)
- raw_pid_direction: bool (2, x)
"""
if len(tod.timestamps) == 0:
logger.warning('Input data is empty.')
return
aman = sotodlib.core.AxisManager(tod.samps)
start = int(tod.timestamps[0])-self._margin
end = int(tod.timestamps[-1])+self._margin
self.data = {}
try:
if h5_filename is not None:
logger.info('Loading raw encoder data from h5')
try:
obs_id = tod.obs_info['obs_id']
self.data = self._load_raw_axes(aman, h5_filename, obs_id)
except Exception as e:
logger.error(f"Exception '{e}' thrown while loading HWP data from h5. Attempt to load from hk.")
self.data = self.load_data(start, end)
else:
self.data = self.load_data(start, end)
except Exception as e:
logger.error(
f"Exception '{e}' thrown while loading HWP data. The specified encoder field is missing.")
self._write_solution_h5_logger = 'HWP data too short'
print(traceback.format_exc())
finally:
self._set_raw_axes(aman, self.data)
return aman
[docs]
def make_solution(self, tod):
"""
Output HWP angle, flags, metadata as AxisManager format
The results are stored in HDF5 files. Since HWP angle solution HDF5 files are large,
we automatically split into the new output files.
Args
----
tod: AxisManager
Notes
-----
Output file format
- Primary output
- timestamp: float (samps,)
SMuRF synched timestamp
- hwp_angle: float (samps,)
The latest version of the SMuRF synched HWP angle in radian.
* ver1: HWP angle calculated from the raw encoder signal.
* ver2: HWP angle after the template subtraction.
* ver3: HWP angle after the template and off-centering subtraction.
The field 'version' indicates which version this hwp_angle is.
- primaty encoder: int
This field indicates which encoder is used for hwp_angle, 1 or 2
- version: int
This field indicates the version of the HWP angle in hwp_angle.
- Supplementary output
Suffix _1/2 indicates the encoder_1 or encoder_2
- version_1/2: int
This field indicates the version of the HWP angle of each encoder.
- hwp_angle_ver1/2/3_1/2: float (samps,)
This field stores the ver1/2/3 angle data.
- stable_1/2: bool (samps,)
If non-zero, indicates the HWP spin state is known.
i.e. it is either spinning at a measurable rate, or stationary.
When this flag is non-zero, the hwp_rate field can be taken at face value.
- locked_1/2: bool (samp,)
If non-zero, indicates the HWP is spinning and the position solution is working.
In this case one should find the hwp_angle populated in the fast data block.
- hwp_rate_1/2: float (samps,)
The "approximate" HWP spin rate, with sign, in revs / second.
Use placeholder value of 0 for cases when not "locked".
- logger_1/2: str
Log message for angle calculation status
'No HWP data', 'HWP data too short',
'Angle calculation failed', 'Angle calculation succeeded'
- filled_flag_1/2: bool (samps,)
Array to indicate the index of hwp angle filled due to packet drop, etc.
- quad_1/2: int (quad,)
0 or 1 or -1. 0 means no data
- template_1/2: float (1140,)
Template of the non uniformity of hwp encoder plate
- offcenter: float (2,)
- (average offcenter, std of offcenter) unit is (mm)
- Rotation direction
Rotation direction estimated by several methods.
0 or 1 or -1. 0 means no data.
- quad_direction_1/2: int
Estimation by median encoder quadrature for each encoder
- pid_direction: int
Estimation by median pid controller commanded direction
- offcenter_direction: int
Estimation by the offcentering measured by the time offset between two encoders.
To be implemented.
- template_direction: int
Estimation by the template of encoder plate.
To be implemented.
- scan_direction: int
Estimation by scan synchronous modulation of rotation speed.
"""
aman = sotodlib.core.AxisManager(tod.samps)
aman.wrap_new('timestamps', ('samps', ))[:] = tod.timestamps
self._set_empty_axes(aman)
if 'pid_direction' in self.data.keys():
pid_direction = np.nanmedian(self.data['pid_direction'][1])*2 - 1
if pid_direction in [1, -1]:
aman['pid_direction'] = pid_direction
else:
aman['pid_direction'] = 0
solved = {}
for suffix in self._suffixes:
logger.info('Start analyzing encoder'+suffix)
self._set_empty_axes(aman, suffix)
# load data
if not 'counter' + suffix in self.data.keys():
logger.warning('No HWP data in the specified timestamps.')
self._write_solution_h5_logger = 'No HWP data'
continue
# version 1
# calculate HWP angle
try:
solved.update(self.analyze(self.data, mod2pi=False, suffix=suffix))
except Exception as e:
logger.error(
f"Exception '{e}' thrown while calculating HWP angle. Angle calculation failed.")
self._write_solution_h5_logger = 'Angle calculation failed'
print(traceback.format_exc())
continue
if len(solved) == 0 or ('fast_time'+suffix not in solved.keys()) or len(solved['fast_time'+suffix]) == 0:
logger.info(
'No correct rotation data in the specified timestamps.')
self._write_solution_h5_logger = 'No HWP data'
continue
self._write_solution_h5_logger = 'Angle calculation succeeded'
aman['version'+suffix] = 1
aman['stable'+suffix] = self._bool_interpolation(
solved['slow_time'+suffix], solved['stable'+suffix], tod.timestamps)
aman['locked'+suffix] = self._bool_interpolation(
solved['slow_time'+suffix], solved['locked'+suffix], tod.timestamps)
aman['hwp_rate'+suffix] = scipy.interpolate.interp1d(
solved['slow_time'+suffix], solved['hwp_rate'+suffix], kind='linear', bounds_error=False)(tod.timestamps)
aman['logger'+suffix] = self._write_solution_h5_logger
quad = self._bool_interpolation(
solved['fast_time'+suffix], solved['quad'+suffix], tod.timestamps)
aman['quad'+suffix] = np.array([1 if q else -1 for q in quad])
aman['quad_direction'+suffix] = np.nanmedian(aman['quad'+suffix])
filled_flag = np.zeros_like(solved['fast_time'+suffix], dtype=bool)
filled_flag[solved['filled_indexes'+suffix]] = 1
filled_flag = scipy.interpolate.interp1d(
solved['fast_time'+suffix], filled_flag, kind='linear', bounds_error=False)(tod.timestamps)
aman['filled_flag'+suffix] = filled_flag.astype(bool)
aman['hwp_angle_ver1'+suffix] = np.mod(scipy.interpolate.interp1d(
solved['fast_time'+suffix], solved['angle'+suffix], kind='linear', bounds_error=False)(tod.timestamps), 2*np.pi)
# version 2
# calculate template subtracted angle
try:
self.eval_angle(solved, poly_order=3, suffix=suffix)
aman['hwp_angle_ver2'+suffix] = np.mod(scipy.interpolate.interp1d(
solved['fast_time'+suffix], solved['angle'+suffix], kind='linear', bounds_error=False)(tod.timestamps), 2*np.pi)
aman['version'+suffix] = 2
aman['template'+suffix] = solved['template'+suffix]
except Exception as e:
logger.error(
f"Exception '{e}' thrown while the template subtraction.")
print(traceback.format_exc())
# version 3
# calculate off-centering corrected angle
if (aman.version_1 == 2 and aman.version_2 == 2):
try:
self.eval_offcentering(solved)
self.correct_offcentering(solved)
for suffix in self._suffixes:
aman['hwp_angle_ver3'+suffix] = np.mod(scipy.interpolate.interp1d(
solved['fast_time'+suffix], solved['angle'+suffix], kind='linear', bounds_error=False)(tod.timestamps), 2*np.pi)
aman['version'+suffix] = 3
aman['offcenter'] = np.array([np.average(solved['offcentering']), np.std(solved['offcentering'])])
aman.offcenter_direction = np.sign(aman['offcenter'][0])
except Exception as e:
logger.error(
f"Exception '{e}' thrown while the off-centering correction.")
print(traceback.format_exc())
else:
logger.warning(
'Offcentering calculation is only available when two encoders are operating. Skipped.')
# make the hwp angle solution with highest version as hwp_angle
highest_version = int(np.max([aman.version_1, aman.version_2]))
primary_encoder = int(np.argmax([aman.version_1, aman.version_2]) + 1)
logger.info(f'Save hwp_angle_ver{highest_version}_{primary_encoder} as hwp_angle')
aman.hwp_angle = aman[f'hwp_angle_ver{highest_version}_{primary_encoder}']
aman.primary_encoder = primary_encoder
aman.version = highest_version
return aman
def _hwp_angle_calculator(
self,
counter,
counter_idx,
irig_time,
rising_edge,
quad_time,
quad,
mod2pi,
fast):
# counter: BBB counter values for encoder signal edges
self._encd_clk = counter
# counter_index: index numbers for detected edges by BBB
self._encd_cnt = counter_idx
# irig_time: decoded time in second since the unix epoch
self._irig_time = irig_time
# rising_edge_count: BBB clcok count values for the IRIG on-time
# reference marker risinge edge
self._rising_edge = rising_edge
# Reference slit indexes
self._ref_indexes = []
# quad: quadrature signal to determine rotation direction
self._quad_time = quad_time
self._quad = quad
self._quad_corrected = []
# return arrays
self._time = []
self._angle = []
# metadata of packet drop
self._num_dropped_pkts = 0
self._filled_indexes = []
# check duplication in data
self._duplication_check()
# check IRIG timing quality
self._irig_quality_check()
# check 32 bit internal counter overflow glitch
self._process_counter_overflow_glitch()
# treat counter index reset due to agent reboot
self._process_counter_index_reset()
# check packet drop
self._encoder_packet_sort()
self._fill_dropped_packets()
# assign IRIG synched timestamp
self._time = scipy.interpolate.interp1d(
self._rising_edge,
self._irig_time,
kind='linear',
fill_value='extrapolate')(self._encd_clk)
# Reject unexpected counter
idx = np.where((1 / np.diff(self._time) / self._num_edges) > 5.0)[0]
if len(idx) > 0:
self._encd_clk = np.delete(self._encd_clk, idx)
self._encd_cnt = self._encd_cnt[0] + \
np.arange(len(self._encd_cnt) - len(idx))
self._time = np.delete(self._time, idx)
# reference finding and fill its angle
_status_find_ref = self._find_refs()
if _status_find_ref == -1:
return [], []
if fast:
self._fill_refs_fast()
else:
self._fill_refs()
# re-assign IRIG synched timestamp
self._time = scipy.interpolate.interp1d(
self._rising_edge,
self._irig_time,
kind='linear',
fill_value='extrapolate')(self._encd_clk)
self._quad_corrected = self._quad_form(
scipy.interpolate.interp1d(
self._quad_time,
self._quad_form(self._quad),
kind='linear',
fill_value='extrapolate')(self._time))
# calculate hwp angle with IRIG timing
self._calc_angle_linear(mod2pi)
logger.debug('qualitycheck')
logger.debug('_time: ' + str(len(self._time)))
logger.debug('_angle: ' + str(len(self._angle)))
logger.debug('_encd_cnt: ' + str(len(self._encd_cnt)))
logger.debug('_encd_clk: ' + str(len(self._encd_clk)))
logger.debug('_ref_cnt: ' + str(len(self._ref_cnt)))
logger.debug('_ref_indexes: ' + str(len(self._ref_indexes)))
if len(self._time) != len(self._angle):
logger.warning('Failed to calculate hwp angle!')
return [], []
logger.info('hwp angle calculation is finished.')
return self._time, self._angle
def _find_refs(self):
""" Find reference slits """
# Calculate spacing between all clock values
diff = np.ediff1d(self._encd_clk) # [1:]
n = 0
diff_split = []
for i in range(len(diff)):
diff_split.append(diff[n:n + (self._num_edges - 2):1])
n += (self._num_edges - 2)
if n >= len(diff):
break
offset = 1
# Conditions for idenfitying the ref slit
# Slit distance somewhere between 2 slits:
# 2 slit distances (defined above) +/- 10%
for i in range(len(diff_split)):
_diff = diff_split[i]
# eliminate upper/lower _slit_width_lim
_diff_upperlim = np.percentile(
_diff, (1 - self._slit_width_lim) * 100)
_diff_lowerlim = np.percentile(_diff, self._slit_width_lim * 100)
__diff = _diff[np.where(
(_diff < _diff_upperlim) & (_diff > _diff_lowerlim))]
# Define mean value as nominal slit distance
if len(__diff) == 0:
continue
slit_dist = np.mean(__diff)
# Conditions for idenfitying the ref slit
# Slit distance somewhere between 2 slits:
# 2 slit distances (defined above) +/- ref_range
ref_hi_cond = ((self._ref_edges + 2) *
slit_dist * (1 + self._ref_range))
ref_lo_cond = ((self._ref_edges + 1) *
slit_dist * (1 - self._ref_range))
# Find the reference slit locations (indexes)
_ref_idx = np.argwhere(np.logical_and(
_diff < ref_hi_cond, _diff > ref_lo_cond)).flatten()
if len(_ref_idx) != 1:
continue
self._ref_indexes.append(_ref_idx[0] + offset)
offset += len(diff_split[i])
# Define the reference slit line to be the line before
# the two "missing" lines
# Store the count and clock values of the reference lines
self._ref_indexes = np.array(self._ref_indexes)
if len(self._ref_indexes) == 0:
if len(diff) < self._num_edges:
logger.warning(
'cannot find reference points, # of data is less than # of slit')
else:
logger.warning(
'cannot find reference points, please adjust parameters!')
return -1
# delete unexpected ref slit indexes
self._ref_indexes = np.delete(self._ref_indexes, np.where(
np.diff(self._ref_indexes) < self._num_edges - 10)[0])
self._ref_clk = self._encd_clk[self._ref_indexes]
self._ref_cnt = self._encd_cnt[self._ref_indexes]
logger.debug('found {} reference points'.format(
len(self._ref_indexes)))
return 0
def _fill_refs(self):
""" Fill in the reference edges """
# If no references, say that the first sample is theta = 0
# This case comes up for testing with a function generator
if len(self._ref_clk) == 0:
self._ref_clk = [self._encd_clk[0]]
self._ref_cnt = [self._encd_cnt[0]]
return
# Loop over all of the reference slits
for ii in range(len(self._ref_indexes)):
logger.debug("\r {:.2f} %".format(
100. * ii / len(self._ref_indexes)), end="")
# Location of this slit
ref_index = self._ref_indexes[ii]
# Linearly interpolate the missing slits
clks_to_add = np.linspace(
self._encd_clk[ref_index - 1], self._encd_clk[ref_index], self._ref_edges + 2)[1:-1]
self._encd_clk = np.insert(self._encd_clk, ref_index, clks_to_add)
# Adjust the encoder count values for the added lines
# Add 2 to all future counts and interpolate the counts
# for the two added slits
self._encd_cnt[ref_index:] += self._ref_edges
cnts_to_add = np.linspace(
self._encd_cnt[ref_index - 1], self._encd_cnt[ref_index], self._ref_edges + 2)[1:-1]
self._encd_cnt = np.insert(self._encd_cnt, ref_index, cnts_to_add)
# Also adjsut the reference count values in front of
# this one for the added lines
self._ref_cnt[ii + 1:] += self._ref_edges
# Adjust the reference index values in front of this one
# for the added lines
self._ref_indexes[ii + 1:] += self._ref_edges
return
def _fill_refs_fast(self):
""" Fill in the reference edges """
# If no references, say that the first sample is theta = 0
# This case comes up for testing with a function generator
if len(self._ref_clk) == 0:
self._ref_clk = [self._encd_clk[0]]
self._ref_cnt = [self._encd_cnt[0]]
return
# insert interpolate clk to reference points
lastsub = np.split(self._encd_clk, self._ref_indexes)[-1]
self._encd_clk = np.concatenate(
np.array(
[[sub_clk, np.linspace(self._encd_clk[ref_index - 1], self._encd_clk[ref_index], self._ref_edges + 2)[1:-1]]
for ref_index, sub_clk
in zip(self._ref_indexes, np.split(self._encd_clk, self._ref_indexes))], dtype=object
).flatten()
)
self._encd_clk = np.append(self._encd_clk, lastsub)
self._encd_cnt = self._encd_cnt[0] + np.arange(
len(self._encd_cnt) + len(self._ref_indexes) * self._ref_edges)
self._ref_indexes += np.arange(len(self._ref_indexes)
) * self._ref_edges
self._ref_cnt = self._encd_cnt[self._ref_indexes]
return
def _calc_angle_linear(self, mod2pi=True):
self._encd_cnt_split = np.split(self._encd_cnt, self._ref_indexes)
angle_first_revolution = (self._encd_cnt_split[0] - self._ref_cnt[0]) * \
(2 * np.pi / self._num_edges) % (2 * np.pi)
angle_last_revolution = (self._encd_cnt_split[-1] - self._ref_cnt[-1]) * \
(2 * np.pi / self._num_edges) % (2 * np.pi) + \
len(self._ref_cnt) * 2 * np.pi
self._angle = np.concatenate([(self._encd_cnt_split[i] - self._ref_cnt[i]) *
(2 * np.pi /
np.diff(self._ref_indexes)[i - 1])
% (2 * np.pi) + i * 2 * np.pi
for i in range(1, len(self._encd_cnt_split) - 1)])
self._angle = np.concatenate(
[angle_first_revolution, self._angle.flatten(), angle_last_revolution])
if mod2pi:
self._angle = self._angle % (2 * np.pi)
return
def _duplication_check(self):
""" Check the duplication in hk data and remove it """
unique_array, unique_index = np.unique(
self._encd_cnt, return_index=True)
if len(unique_array) != len(self._encd_cnt):
logger.warning(
'Duplication is found in encoder data, performing correction.')
self._encd_cnt = unique_array
self._encd_clk = self._encd_clk[unique_index]
unique_array, unique_index = np.unique(
self._rising_edge, return_index=True)
if len(unique_array) != len(self._rising_edge):
logger.warning(
'Duplication is found in IRIG data, performing correction.')
self._rising_edge = unique_array
self._irig_time = self._irig_time[unique_index]
def _irig_quality_check(self):
""" IRIG timing quality check """
idx = np.where(np.diff(self._irig_time) == 1)[0]
if self._irig_type == 1:
idx = np.where(np.isclose(np.diff(self._irig_time),
np.full(len(self._irig_time)-1, 0.1)))[0]
if len(self._irig_time) - 1 == len(idx):
return
elif len(self._irig_time) > len(idx) and len(idx) > 0:
if np.any(np.diff(self._irig_time) > 5):
logger.warning(
'a part of the IRIG time is incorrect, performing the correction process...')
self._irig_time = self._irig_time[idx]
self._rising_edge = self._rising_edge[idx]
logger.warning('deleted wrong irig_time, indices: ' +
str(np.where(np.diff(self._irig_time) != 1)[0]))
else:
self._irig_time = np.array([])
self._rising_edge = np.array([])
def _process_counter_overflow_glitch(self):
"""
Treat glitches due to 32 bit internal counter overflow
We suspect that this is a glitch caused by the very occasional failure of the encoder counter overflow correction
due to latency or other problems on the pc running the encoder agent.
"""
idx = np.where((np.diff(self._encd_clk)>=2**32-1) & (np.diff(self._encd_clk)<2**32+1e6))[0] + 1
if len(idx) > 0:
logger.warning(f'{len(idx)} counter overflow glitches are found, perform correction.')
for i in idx:
self._encd_clk[i] -= 2**32
def _process_counter_index_reset(self):
""" Treat counter index reset due to agent reboot """
idx = np.where(np.diff(self._encd_cnt) < -1e4)[0] + 1
if len(idx) > 0:
logger.warning(f'{len(idx)} counter resets are found, perform correction.')
for i in idx:
self._encd_cnt[i:] = self._encd_cnt[i:] + abs(np.diff(self._encd_cnt)[i-1]) + 1
def _fill_dropped_packets(self):
""" Estimate the number of dropped packets """
cnt_diff = np.diff(self._encd_cnt)
dropped_samples = np.sum(cnt_diff[cnt_diff >= self._pkt_size])
self._num_dropped_pkts = dropped_samples // (self._pkt_size - 1)
if self._num_dropped_pkts > 0:
logger.warning('{} dropped packets are found, performing fill process'.format(
self._num_dropped_pkts))
idx = np.where(np.diff(self._encd_cnt) > 1)[0]
for i in range(len(idx)):
ii = (np.where(np.diff(self._encd_cnt) > 1)[0])[0]
_diff = int(np.diff(self._encd_cnt)[ii])
# Fill dropped counters with counters one before or one after rotation.
# This filling method works even when the reference slot counter is dropped.
self._filled_indexes += list(range(ii + 1,
ii + 1 + self._pkt_size))
if ii - self._num_edges + self._ref_edges + 1 >= 0:
gap_clk = self._encd_clk[ii - self._num_edges + self._ref_edges + 1: ii+_diff - self._num_edges + self._ref_edges] \
- self._encd_clk[ii-self._num_edges +
self._ref_edges] + self._encd_clk[ii]
else:
gap_clk = self._encd_clk[ii - _diff + self._num_edges: ii - 1 + self._num_edges] \
- self._encd_clk[ii - _diff +
self._num_edges - 1] + self._encd_clk[ii]
gap_cnt = np.arange(self._encd_cnt[ii]+1, self._encd_cnt[ii+1])
self._encd_cnt = np.insert(self._encd_cnt, ii+1, gap_cnt)
self._encd_clk = np.insert(self._encd_clk, ii+1, gap_clk)
return
def _encoder_packet_sort(self):
cnt_diff = np.diff(self._encd_cnt)
if np.any(cnt_diff != 1):
logger.debug(
'a part of the counter is incorrect')
if np.any(cnt_diff < 0):
if 1 - self._pkt_size in cnt_diff:
logger.warning(
'Packet flip found, performing sort process')
idx = np.argsort(self._encd_cnt)
self._encd_clk = self._encd_clk[idx]
else:
logger.warning('Packet drop exists')
else:
logger.debug('no need to fix encoder index')
return
def _quad_form(self, quad):
# bit process
quad[(quad >= 0.5)] = 1
quad[(quad < 0.5)] = 0
offset = 0
for quad_split in np.array_split(quad, 1 + np.floor(len(quad) / 100)):
if quad_split.mean() > 0.1 and quad_split.mean() < 0.9:
for j in range(len(quad_split)):
quad[j + offset] = int(quad_split.mean() + 0.5)
offset += len(quad_split)
continue
outlier = np.argwhere(
np.abs(
quad_split.mean() -
quad_split) > 0.5).flatten()
if len(outlier) > 5:
logger.warning(
"flipping quad is corrected by mean value")
for i in outlier:
if i == 0:
ii, iii = i + 1, i + 2
elif i == outlier[-1]:
ii, iii = i - 1, i - 2
else:
ii, iii = i - 1, i + 1
if quad_split[i] + quad_split[ii] + quad_split[iii] == 1:
quad[i + offset] = 0
if quad_split[i] + quad_split[ii] + quad_split[iii] == 2:
quad[i + offset] = 1
offset += len(quad_split)
return quad