import re
import datetime
import logging
import numpy as np
from scipy.optimize import fmin
from astropy import units
from astropy.utils import data as au_data
from skyfield import jpllib, api as skyfield_api
import so3g
from pixell import enmap
from .. import core, tod_ops, coords
logger = logging.getLogger(__name__)
# Default source list
SOURCE_LIST = ['mercury',
'venus',
'moon',
'mars',
'jupiter',
'saturn',
'uranus',
'neptune',
('tauA', 83.6272579, 22.02159891),
('rcw38', 134.7805107, -47.50911231),
('iras16183-4958', 245.5154, -50.09168292),
('iras19078+0901', 287.5575891, 9.107188994),
('rcw122', 260.0339538, -38.95673421),
('cenA', 201.3625336, -43.00797508),
('3c279', 194.0409868, -5.79174024),
('3c273', 187.2775626, 2.053532671),
('G025.36-00.14', 279.5264042, -6.793169326),
('QSO_J2253+1608', 343.4952422, 16.14301323),
('galactic_center', -93.5833, -29.0078)]
[docs]
def get_source_list_fromstr(target_source):
"""Get a source_list from SOURCE_LIST by name, or raise ValueError if not found.
"""
for isource in SOURCE_LIST:
if isinstance(isource, str):
isource_name = isource
elif isinstance(isource, tuple):
isource_name = isource[0]
if isource_name.lower() == target_source.lower():
return isource
raise ValueError(f'Source "{target_source}" not found in {SOURCE_LIST}.')
[docs]
def match_fixed_source(source_name):
"""Match source_name against a regular expression and if it has the
format 'Jxxx[+-pmn]yyy', where xxx and yyy are decimal numbers,
then RA, Dec = xxx, yyy in degrees will be returned.
If it doesn't match, nans are returned as RA and Dec.
Args:
source_name (str): Name of fixed-position source
Return:
RA (float): Right Ascension of source in degrees
Dec (float): Declination of source in degrees
match (bool): Whether the source_name match with regular expression
"""
m = re.match(
r'[jJ](?P<ra_deg>\d+(\.\d*)?)(?P<dec_sign>[+-pmn])(?P<dec_deg>\d+(\.\d*)?)', source_name)
if m:
sign = (-1)**(m['dec_sign'] in ['-', 'm', 'n'])
ra, dec = float(m['ra_deg']), sign * float(m['dec_deg'])
return ra, dec, m
else:
return np.nan, np.nan, m
[docs]
class SlowSource:
"""Class to track the time-dependent position of a slow-moving source,
such as a Solar System planet in equatorial coordinates.
"Slow-moving" is in relation to the time range of interest. A
linear approximation is good enough for Solar System, arcsecond
accuracy, on time scales of a few hours.
Args:
timestamp (float): reference time, as a unix timestamp.
ra (float): Right Ascension at reference time, in radians.
dec (float): Declination at reference time, in radians.
v_ra (float): rate of change of RA, in radians.
v_dec (float): rate of change of dec, in radians.
precision: not implemented, don't worry about it.
"""
def __init__(self, timestamp, ra, dec, v_ra=0., v_dec=0.,
precision=.0001 * coords.DEG):
self.timestamp = timestamp
self.ra = ra
self.dec = dec
self.v_ra = v_ra
self.v_dec = v_dec
[docs]
@classmethod
def for_named_source(cls, name, timestamp):
"""Returns a SlowSource for planet ``name``, with position and
peculiar velocity measured at time timestamp (float, unix).
"""
dt = 3600
ra0, dec0, distance = get_source_pos(name, timestamp)
ra1, dec1, distance = get_source_pos(name, timestamp + dt)
return cls(timestamp, ra0, dec0, ((ra1-ra0+np.pi) % (2*np.pi) - np.pi)/dt, (dec1-dec0)/dt)
[docs]
def pos(self, timestamps):
"""Get the (approximate) source position at the times given by the
array of unix timestamps.
Returns two arrays (RA, dec), the same shape as timestamps,
both in radians. The RA are not corrected into any particular
branch (the mapping from timestamp to RA is continuous).
"""
if not isinstance(timestamps, np.ndarray):
ra, dec = self.pos(np.array([timestamps]))
return ra[0], dec[0]
dt = timestamps - self.timestamp
return self.ra + self.v_ra * dt, self.dec + self.v_dec * dt
[docs]
def get_scan_q(tod, planet, boresight_offset=None, refq=None):
"""Identify the point (in time and azimuth space) at which the
specified planet crosses the boresight elevation for the
observation in tod. The rotation taking boresight coordinates to
celestial coordinates at that moment in time (and pointing) is
computed, and its conjugate is returned.
Boresight_offset is for taking into account off-centered wafer
that have position offset in the focal plane, which is either None
or a (xi, eta) tuple in radian.
The returned rotation is useful because it represents a fixed way
to rotate the celestial such that the target source ends up at
(xi, eta) = 0, with the telescope scan direction parallel to xi
and elevation parallel to eta; this is a useful system for
measuring beam and pointing parameters.
"""
if boresight_offset is None:
boresight_offset = (0, 0)
q_xieta = so3g.proj.quat.rotation_xieta(boresight_offset[0], boresight_offset[1])
if refq is None:
refq = so3g.proj.quat.rotation_xieta(0, 0)
# Get reference elevation...
el = np.median(tod.boresight.el[::10])
az = np.median(tod.boresight.az[::10])
t = (tod.timestamps[0] + tod.timestamps[-1]) / 2
if isinstance(planet, (list, tuple)):
_, ra, dec = planet
planet = SlowSource(t, float(ra) * coords.DEG,
float(dec) * coords.DEG)
else:
planet = SlowSource.for_named_source(planet, t)
def scan_q_model(t, az, el, planet):
csl = so3g.proj.CelestialSightLine.az_el(
t, az, el, weather='typical', site='so')
ra0, dec0 = planet.pos(t)
return csl.Q, ~so3g.proj.quat.rotation_lonlat(ra0, dec0) * csl.Q * q_xieta
def distance(p):
dt, daz = p
q, qnet = scan_q_model(t + dt, az + daz, el, planet)
lon, lat, phi = so3g.proj.quat.decompose_lonlat(qnet)
return 90 * coords.DEG - lat
p0 = np.array([0, 0])
X = fmin(distance, p0, disp=0, full_output=1)
p, fopt, n_iter, n_calls, warnflag = X
if warnflag != 0:
logger.warning('Source-scan solver failed to converge or otherwise '
f'complained! warnflag={warnflag}')
q, qnet = scan_q_model(t+p[0], az+p[1], el, planet)
psi = so3g.proj.quat.decompose_xieta(qnet)[2][0]
ra, dec = planet.pos(t+p[0])
rot = ~so3g.proj.quat.rotation_lonlat(ra, dec, psi)
return {'rot': rot,
'timestamp': t+p[0],
'az': az+p[1],
'el': el,
'ra': ra,
'dec': dec,
'psi': psi,
'planet': planet}
[docs]
def get_scan_P(tod, planet, boresight_offset=None, refq=None, res=None, size=None, **kw):
"""Get a standard Projection Matrix targeting a planet (or some
interesting fixed position), in source-scan coordinates.
Returns a Projection Matrix and the output from get_scan_q.
"""
logger.debug(f'get_scan_P: init for {planet}')
if res is None:
res = 0.01 * coords.DEG
X = get_scan_q(tod, planet, boresight_offset=boresight_offset, refq=refq)
rot = so3g.proj.quat.rotation_lonlat(0, 0) * X['rot']
wcs_kernel = coords.get_wcs_kernel('tan', 0., 0., res=res)
logger.debug(f'get_scan_P: getting projection matrix for {wcs_kernel}.')
P = coords.P.for_tod(tod, rot=rot, wcs_kernel=wcs_kernel, **kw)
if size is not None:
# Trim to a local square
mz = P.zeros(comps='T').submap([[-size/2, size/2], [size/2, -size/2]])
P.geom = enmap.Geometry(shape=mz.shape, wcs=mz.wcs)
return P, X
[docs]
def get_horizon_P(tod, az, el, receiver_fixed=False, **kw):
"""Get a standard Projection Matrix targeting arbitrary source (for example
drone) in horizon coordinates.
Args:
tod: AxisManager of the observation
az: azimuth of the target source (rad)
el: elevation of the target source (rad)
receiver_fixed:
If true, rotate the source centered coordinate to be receiver fixed.
This should be True for receiver fixed beam measurement.
This should be False for pol angle calibration with source on drone.
Return:
a Projection Matrix
"""
sight = so3g.proj.CelestialSightLine.for_horizon(
tod.timestamps,
tod.boresight.az,
tod.boresight.el,
tod.boresight.roll
)
pq = so3g.proj.quat.rotation_lonlat(-az, el)
if receiver_fixed:
rot = so3g.proj.quat.euler(2, -tod.boresight.roll)
sight.Q = so3g.proj.quat.rotation_lonlat(0, 0) * rot * ~pq * sight.Q
else:
sight.Q = so3g.proj.quat.rotation_lonlat(0, 0) * ~pq * sight.Q
P = coords.P.for_tod(tod, sight, **kw)
return P
[docs]
def filter_for_sources(tod=None, signal=None, source_flags=None,
n_modes=10, low_pass=None, wrap=None,
pca_wrap=None, edge_guard=None):
"""Mask and gap-fill the signal at samples flagged by source_flags.
Then PCA the resulting time ordered data. Restore the flagged
signal, remove the strongest modes from PCA.
If signal is not passed in tod.signal will be modified directly.
To leave tod.signal intact, pass in signal=tod.signal.copy().
Args:
tod: AxisManager from which defaults will be drawn.
signal: Time-ordered data to operate on. Defaults to
tod.signal.
source_flags: RangesMatrix to use for source flagging. Defaults
to tod.source_flags.
low_pass: Frequency, in Hz, at which to apply low pass filter to
signal. If None, no filtering is done. You can pass in a
filter from tod_ops.filters if you want.
n_modes (int): Number of eigenmodes to remove... interface
subject to change.
wrap (str): If specified, the result will be stored at
tod[wrap].
pca_wrap (str): If specified, the PCA model modes and weights calculated for subtraction will be wrapped into tod[pca_wrap].
edge_guard (int): Number of samples at the beginning and end of the flags to change them False.
Default is None. (Nothing happens.)
Returns:
The filtered signal.
"""
if source_flags is None and tod is not None:
source_flags = tod.source_flags
elif source_flags is None and tod is None:
raise ValueError("Must provide source_flags if tod is None")
if signal is None and tod is not None:
signal = tod.signal
elif signal is None and tod is None:
raise ValueError("Must provide signal if tod is None")
if signal is None:
raise ValueError("signal somehow still None!")
if edge_guard is not None:
if isinstance(edge_guard, int):
edgebl = np.zeros(source_flags.shape[-1], dtype=bool)
edgebl[edge_guard:-edge_guard] = True
edgerm = so3g.proj.ranges.RangesMatrix.from_mask(edgebl)
for iflag in source_flags:
iflag.intersect(edgerm)
else:
raise ValueError("edge_guard must be an int.")
# Get a reasonable gap fill.
signal_pca = signal.copy()
gaps = tod_ops.get_gap_fill(tod, signal=signal_pca, flags=source_flags)
gaps.swap(tod, signal=signal_pca)
# Low pass filter?
if low_pass is not None:
if tod is None:
raise ValueError("tod cannot be None when lowpassing")
if isinstance(low_pass, tod_ops.filters._chainable):
filt = low_pass
else:
filt = tod_ops.filters.low_pass_butter4(low_pass)
n_det, n = signal.shape
rfft = tod_ops.fft_ops.RFFTObj.for_shape(n_det, n, 'BOTH')
rfft.a[:] = signal_pca
rfft.t_forward()
times = tod.timestamps
delta_t = (times[-1]-times[0])/(tod.samps.count - 1)
freqs = np.fft.rfftfreq(n, delta_t)
filt.apply(freqs, tod, target=rfft.b)
signal_pca = rfft.t_backward()
del rfft
# Measure TOD means (after gap fill, low pass, etc).
if isinstance(n_modes, str) and n_modes == 'all':
# Don't overthink that.
signal -= signal_pca
else:
# Make sure PCA decomposition targets (signal_pca) are mean 0.
# It's convenient to remove the same levels from signal now,
# too.
levels = signal_pca.mean(axis=1)
signal_pca -= levels[:, None]
signal -= levels[:, None]
# Get PCA model and discard the source vectors.
pca = tod_ops.pca.get_pca_model(
tod, signal=signal_pca, n_modes=n_modes, wrap=pca_wrap)
del signal_pca
# Remove the PCA model.
tod_ops.pca.add_model(tod, pca, -1, signal=signal)
if wrap and tod is not None:
tod.wrap(wrap, signal, [(0, 'dets'), (1, 'samps')])
return signal
def _get_astrometric(source_name, timestamp, site="_default", planets=None):
"""
Derive skyfield's Astrometric object of a celestial source at a
specific timestamp and observing site, which is used to derive
radec/azel in get_source_pos/get_source_azel.
Args:
source_name: Planet name; in capitalized format, e.g. "Jupiter",
or fixed source specification with a format 'Jxxx[+-pmn]yyy',
where xxx and yyy are decimal numbers of RA, Dec in degrees.
timestamp: unix timestamp.
site (str or so3g.proj.EarthlySite): if this is a string, the
site will be looked up in so3g.proj.SITES dict.
planets (SpiceKernel): the ephemeris object for
skyfield.jpllib). If not passed in, a sensible default
ephemeris is used.
Returns:
planets: SpiceKernel object (for re-use)
astrometric: skyfield's astrometric object
Notes:
The SpiceKernel holds the ephemeris file open. To close that
file, run planets.close(). Deleting the object is not enough,
as the garbage collection can be lazy. After .close(), the
kernel can't be used for ephemeris stuff. Some operations on the
returned "astrometric" object will try to use the kernel, and
will fail. So keep planets around until you're done computing on
astrometric, then call close().
If you're doing lots of astrometric stuff, you can get the
kernel once and re-use it.
If the default ephemeris file (de421.bsp) is not found, it will
be downloaded (16M) and cached.
"""
if planets is None:
# Get the ephemeris
de_filename = core.get_local_file("de421.bsp")
planets = jpllib.SpiceKernel(de_filename)
else:
assert len(planets.codes) # Someone .closed this eph!
for k in [
source_name,
source_name + " barycenter",
]:
try:
target = planets[k]
break
except (ValueError, KeyError):
pass
else:
ra, dec, m = match_fixed_source(source_name)
if m:
# convert from degrees to hours
target = skyfield_api.Star(ra_hours=ra/15., dec_degrees=dec)
else:
options = list(planets.names().values())
raise ValueError(
f'Failed to find a match for "{source_name}" in ephemeris: {options}'
)
if isinstance(site, str):
site = so3g.proj.SITES[site]
observatory = site.skyfield_site(planets)
timescale = skyfield_api.load.timescale()
sf_timestamp = timescale.from_datetime(
datetime.datetime.fromtimestamp(timestamp, tz=skyfield_api.utc)
)
astrometric = observatory.at(sf_timestamp).observe(target)
return planets, astrometric
[docs]
def get_source_pos(source_name, timestamp, site='_default'):
"""Get the equatorial coordinates of a planet (or fixed-position
source, see note) at some time. Returns the apparent position,
accounting for geographical position on earth, but assuming no
atmospheric refraction.
Args:
source_name: Planet name; in capitalized format, e.g. "Jupiter",
or fixed source specification.
timestamp: unix timestamp.
site (str or so3g.proj.EarthlySite): if this is a string, the
site will be looked up in so3g.proj.SITES dict.
Returns:
ra (float): in radians.
dec (float): in radians.
distance (float): in AU.
Note:
Before checking in the ephemeris, the source_name will be
matched against a regular expression and if it has the format
'Jxxx[+-pmn]yyy', where xxx and yyy are decimal numbers, then a
fixed-position source at RA,Dec = xxx,yyy in degrees will be
processed. In that case, the distance is returned as Inf.
"""
# Check against fixed-position template...
ra, dec, m = match_fixed_source(source_name)
if m:
return ra * coords.DEG, dec * coords.DEG, float('inf')
# Derive from skyfield astrometric object
planets, amet0 = _get_astrometric(source_name, timestamp, site)
ra, dec, distance = amet0.radec()
planets.close()
return ra.to(units.rad).value, dec.to(units.rad).value, distance.to(units.au).value
[docs]
def get_source_azel(source_name, timestamp, site='_default'):
"""
Get the apparent azimuth and elevation of a celestial source at a
specific timestamp and observing site. Returns the apparent position,
accounting for geographical position on earth, but assuming no
atmospheric refraction.
Args:
source_name: Planet name; in capitalized format, e.g. "Jupiter"
timestamp (float): The Unix timestamp representing the time for
which to calculate azimuth and elevation.
site (str or so3g.proj.EarthlySite): if this is a string, the
site will be looked up in so3g.proj.SITES dict.
Returns:
az (float): in radians.
el (float): in radians.
distance (float): in AU.
"""
planets, amet0 = _get_astrometric(source_name, timestamp, site)
el, az, distance = amet0.apparent().altaz()
planets.close()
return az.to(units.rad).value, el.to(units.rad).value, distance.to(units.au).value
[docs]
def get_nearby_sources(tod=None, source_list=None, distance=1.):
"""Identify solar system objects (especially "planets") that might be
within a TOD's scan footprint.
Arguments:
tod (AxisManager): The data to check. Needs to have
focal_plane, boresight, timestamps.
source_list (list or None): A list of source names or None to
use a default list. Use simple planet names in lower case
(e.g. ['uranus']), or tuples with source name, RA, and dec in
degrees (e.g. [('tau_a', 83.63, 22.01)]).
distance (float): Maximum distance from the source center, in
degrees, to consider as "within the footprint". (This should
be at least the sum of the beam radius and the planet radius
... though there's usually no harm in going a bit larger than
that.)
Returns:
List of tuples (source_name, SlowSource) that satisfy the
"nearby" condition.
"""
# Make a full sky map with not very many pixels.
shape, wcs = enmap.fullsky_geometry(res=2 * coords.DEG, proj='car')
# Sight line
sight = so3g.proj.CelestialSightLine.az_el(
tod.timestamps, tod.boresight.az, tod.boresight.el, roll=tod.boresight.roll,
site='so', weather='typical')
# One central detector
xieta0, R, _ = coords.helpers.get_focal_plane_cover(tod, 0)
fp = so3g.proj.FocalPlane.from_xieta([xieta0[0]], [xieta0[1]])
asm = so3g.proj.Assembly.attach(sight, fp)
p = so3g.proj.Projectionist.for_geom(shape, wcs)
w = p.to_map(np.zeros((1, len(tod.timestamps)),
'float32')+1, asm, comps='T')
w = enmap.enmap(w, wcs=wcs)
if source_list is None:
source_list = SOURCE_LIST
positions = []
for source_name in source_list:
t = tod.timestamps[0]
if isinstance(source_name, (list, tuple)):
source_name, ra, dec = source_name
sl = coords.planets.SlowSource(t, float(ra) * coords.DEG,
float(dec) * coords.DEG)
else:
sl = coords.planets.SlowSource.for_named_source(source_name, t)
x = w.distance_from([[sl.dec], [sl.ra]])
md = x[w[0] != 0].min()
logger.debug(('Source {:12} is at ({:8.4f},{:8.4f}); '
'that is {:5.2f} degrees off footprint.').format(
source_name, sl.ra / coords.DEG,
sl.dec / coords.DEG, md/coords.DEG))
if md < (R * 1.1 + distance * coords.DEG):
positions.append((source_name, sl))
return positions
[docs]
def compute_source_flags(tod=None, P=None, mask=None, wrap=None,
center_on=None, res=None, max_pix=4e6):
"""Process masking instructions and create RangesMatrix that flags
samples in the TOD that are within the masked region. This
masking makes use of a map with the footprint encoded in P, so
flagging boundaries will correspond to pixel edges.
The interface for "mask" is subject to change -- use of a simple
text file is interim.
Args:
tod (AxisManager): the observation.
P (Projection Matrix): if passed in, must include a map geom
(shape and WCS). If None, will be created from get_scan_P
using center_on and res parameters.
mask: source masking instructions (see note).
wrap: key in tod at which to store the result.
res: If P is None, sets the target mask map resolution
(radians).
max_pix: If P is None, this sets the maximum acceptable number
of pixels for the mask map. This is to catch cases where an
incorrect source has been passed in, for example, leading to a
weird map footprint
Returns:
RangesMatrix marking the samples inside the masked region.
Notes:
The mask can be a dict or a list of dicts. Each dict must be of
the form::
{'shape': 'circle',
'xyr': (XI, ETA, R)}
where R is the radius of the circular mask, and XI and ETA are
the center of the circle, all in degrees.
"""
if P is None:
logger.info('Getting Projection Matrix ...')
P, X = get_scan_P(tod, center_on, res=res, comps='T')
shape, wcs = tuple(P.geom)
if shape[0] * shape[1] > max_pix:
raise ValueError(f'Mask map too large: {shape}')
if isinstance(mask, str):
# Assume it's a filename, and file is simple columns of (x, y,
# radius) in deg. (Deprecated!)
mask = [{'xyr': list(map(float, line.split()))}
for line in open(mask)]
mask_map = P.zeros()
_add_to_mask(mask, mask_map)
a = P.from_map(mask_map)
source_flags = so3g.proj.RangesMatrix(
[so3g.proj.Ranges.from_mask(r != 0) for r in a])
if wrap:
assert tod is not None, "Pass in a tod to 'wrap' the output."
tod.flags.wrap(wrap, source_flags, [(0, 'dets'), (1, 'samps')])
return source_flags
def _add_to_mask(req, mask_map):
# Helper for compute_source_flags.
if req is None:
raise ValueError(f'Requested mask is None. For no mask, pass [].')
if isinstance(req, (list, tuple)):
for _r in req:
# Also, maybe test this somehow?
_add_to_mask(_r, mask_map)
elif isinstance(req, dict):
shape = req.get('shape', 'circle')
if shape == 'circle':
x, y, r = req['xyr']
d = enmap.distance_from(mask_map.shape, mask_map.wcs,
[[y * coords.DEG], [x * coords.DEG]])
mask_map += 1. * (d < r * coords.DEG)
else:
raise ValueError(f'Unknown shape="{shape}" in mask request.')
else:
raise ValueError(f'Weird mask request: {req}')
[docs]
def load_detector_splits(tod=None, filename=None, dataset=None,
source=None, wrap=None):
"""Convert a partition of detectors into a dict of disjoint
RangesMatrix objects; such an object can be passed to make_map()
to efficiently make detector-split maps.
The "detector split" data can be read from an HDF5 dataset, or
passed in directly as an AxisManager.
Args:
tod (AxisManager): This is required, to get the list of dets and
the samps count.
filename (str): The HDF filename, or filename:dataset.
dataset (str): The HDF dataset (if not passed in with filename).
source (array, ResultSet or AxisManager): If not None, then
filename and dataset are ignored and this object is processed
(as though it had just been loaded from HDF).
wrap (str): If not None, the address in tod where to store the
loaded split data.
The format of detector splits, in an HDF5 dataset, is as one would
write from a ResultSet with columns ['dets:name', 'group'] (both
str). All detectors sharing a value in the group column will
grouped together and the group label will be that value.
If passing in "source" directly as a ResultSet, it should have
columns 'dets:name' and 'group'; if as an AxisManager then it
should have a 'dets' axis and a vector 'group' with shape
('dets',) providing the group name for each detector. If it is a
numpy array, it is assumed to correspond one-to-one with the .dets
axis of TOD and the array gives the group name for each detector.
Returns:
data_splits (dict of RangesMatrix): Each entry of the dict is a
RangesMatrix that can be interpreted as cuts to apply during
mapmaking. In this case the RangesMatrix will simply mark
each detector as either fully cut (flagged) or fully uncut.
"""
from sotodlib.io import metadata
if source is None:
if dataset is None:
filename, dataset = filename.split(':')
source = metadata.read_dataset(filename, dataset)
if isinstance(source, np.ndarray):
source, _s = core.AxisManager(tod.dets), source
source.wrap('group', _s)
elif isinstance(source, metadata.ResultSet):
di = core.metadata.loader.unconvert_det_info(tod.det_info)
source = core.metadata.loader.broadcast_resultset(
source, di, axis_key='name')
else:
source = source.copy() # is this an AxisManager?
source.restrict_axes([tod.dets])
if wrap:
tod.wrap(wrap, source)
yes = so3g.proj.RangesMatrix.zeros(tod.signal.shape[1])
flags = {}
for group in source['group']:
if group in flags:
continue
flags[group] = so3g.proj.RangesMatrix.ones((tod.signal.shape))
for i in (source['group'] == group).nonzero()[0]:
flags[group].ranges[i] = yes
return flags
[docs]
def get_det_weights(tod, signal=None, wrap=None,
outlier_clip=None):
"""Compute detector weights, based on variance of signal. If
outlier_clip is set, it is used to trim outliers. See code for
details, but think of it as the number of stdev away from the mean
weight that will be kept, and try 2.5.
Returns:
A det_weights array compatible with projection matrix functions.
"""
if signal is None:
signal = tod.signal
det_weights = np.zeros(signal.shape[:-1])
sigmas = signal.std(axis=-1)
ok = sigmas != 0
det_weights[ok] = sigmas[ok]**-2
if outlier_clip:
qs = np.quantile(det_weights[ok], [.16, .84]) # "1 sigma" lims
q0, dq = qs.mean(), np.diff(qs)[0]/2
ok *= ((q0 - outlier_clip * dq < det_weights) *
(det_weights < q0 + outlier_clip * dq))
return det_weights * ok
[docs]
def write_det_weights(tod, filename, dataset, det_weights=None):
"""Save detector weights to an HDF file dataset.
Args:
tod (AxisManager): provides the dets axis, and det_weights if
not passed explicitly.
filename (str or h5py.File): destination filename (or open File)
dataset (str): address in the HDF5 file to put the result (it
will be overwritten if exists already).
det_weights (array): the weights to write; must be in
correspondence with tod.dets. Defaults to tod.det_weights.
Returns:
ResultSet with the info that was written.
"""
from sotodlib.io import metadata
if det_weights is None:
det_weights = tod.det_weights
rs = core.metadata.ResultSet(['dets:name', 'det_weights'])
rs.rows.extend(list(zip(tod.dets.vals, det_weights)))
metadata.write_dataset(rs, filename, dataset, overwrite=True)
return rs
[docs]
def make_map(tod, center_on=None, scan_coords=True, thread_algo=False,
res=0.01*coords.DEG, size=None, wcs_kernel=None, comps='TQU',
signal=None,
det_weights=None,
filename=None, source_flags=None, cuts=None,
data_splits=None,
low_pass=None, n_modes=10,
eigentol=1e-3, info={}):
"""Make a compact source map from the TOD. Specify filename to write
things to disk; this should be a format string, for example
'{obs_id}_{map}.fits', where 'map' will be given values of
['binned', 'solved', 'weights'] and any other keys (obs_id in this
case) must be passed through info.
"""
assert (center_on is not None) # Pass in the source name, e.g. 'uranus'
# Test the map format string...
if filename is not None:
try:
filename.format(map='binned', **info)
except:
raise ValueError('Failed to process filename format "%s" with info=%s' %
(filename, info))
class MmTimer(coords.Timer):
PRINT_FUNC = logger.info
FMT = 'make_map: {msg}: {elapsed:.3f} seconds'
if signal is None:
signal = tod.signal
if thread_algo == 'none':
thread_algo = False
with MmTimer('setup thread_algo=%s' % thread_algo):
if scan_coords:
P, X = coords.planets.get_scan_P(tod, center_on, res=res, size=size,
comps=comps, cuts=cuts,
threads=thread_algo)
else:
planet = coords.planets.SlowSource.for_named_source(
center_on, tod.timestamps[0])
ra0, dec0 = planet.pos(tod.timestamps.mean())
wcsk = coords.get_wcs_kernel('tan', ra0, dec0, res=res)
P = coords.P.for_tod(tod, comps=comps,
cuts=cuts,
threads=thread_algo,
wcs_kernel=wcsk)
with MmTimer('get_proj_threads'):
P._get_proj_threads()
with MmTimer('filter for sources'):
filter_for_sources(tod, signal=signal, source_flags=source_flags,
low_pass=low_pass, n_modes=n_modes)
if det_weights is None:
det_weights = get_det_weights(tod, signal=signal, outlier_clip=2.)
if data_splits is not None:
# Clear P's internal cuts as we'll be modifying those to pass
# in directly.
base_cuts, P.cuts = P.cuts, None
output = {
'P': P,
'det_weights': det_weights,
'splits': {}
}
# Write out _map and _weights for each group.
for group_label, group_cuts in data_splits.items():
logger.info(f'Mapping split "{group_label}"')
if base_cuts is not None:
group_cuts = group_cuts + base_cuts
with MmTimer('getting weights'):
w = P.to_weights(cuts=group_cuts, det_weights=det_weights)
with MmTimer('getting map and applying inverse weights'):
m = P.remove_weights(
tod=tod, signal=signal, weights_map=w, cuts=group_cuts,
det_weights=det_weights, eigentol=eigentol)
output['splits'][group_label] = {
'binned': None,
'weights': w.astype('float32'),
'solved': m.astype('float32'),
}
if filename is not None:
m.astype('float32').write(
filename.format(map=f'{group_label}_map', **info))
w.astype('float32').write(
filename.format(map=f'{group_label}_weights', **info))
return output
with MmTimer('project signal and weight maps'):
map1 = P.to_map(tod, signal=signal, det_weights=det_weights)
wmap1 = P.to_weights(det_weights=det_weights)
with MmTimer('compute and apply inverse weights map'):
iwmap1 = P.to_inverse_weights(weights_map=wmap1, eigentol=eigentol)
map1b = P.remove_weights(map1, inverse_weights_map=iwmap1)
if filename is not None:
with MmTimer('Write out'):
map1b.write(filename.format(map='solved', **info))
if filename.format(map='x', **info) != filename.format(map='y', **info):
map1.write(filename.format(map='binned', **info))
wmap1.write(filename.format(map='weights', **info))
write_det_weights(tod, filename.format(map='detweights', **info).
replace('.fits', '.h5'), 'detweights',
det_weights=det_weights)
return {'binned': map1,
'weights': wmap1,
'solved': map1b,
'P': P,
'det_weights': det_weights,
}