import so3g.proj
import numpy as np
import scipy
from pixell import enmap, tilemap
from .helpers import _get_csl, _valid_arg, _not_both
from . import helpers
import logging
logger = logging.getLogger(__name__)
[docs]
class P:
"""Projection Matrix.
This class provides functions to apply a Projection Matrix (or its
transpose). The Projection Matrix, P, also sometimes called the
Pointing Matrix, describes how a vector of time-ordered
measurements d are determined by a vector of map pixels m::
d = P m
We are working in aspace, of course, where d is not a vector, but
rather an array of vectors, and m is not a vector, it's a
multi-dimensional array with two of the dimensions corresponding
to a rectangular pixelization of the sky.
If you are making filter+bin maps, you will want to use these functions:
- to_map
- to_inverse_weights
- remove_weights
If you are solving for a map iteratively you will probably just need:
- to_map
- from_map
Important keyword arguments that are used in many functions:
- tod: The AxisManager from which signal and pointing information
should be taken.
- signal: The array to use for time-ordered signal. If a string,
it will be looked up in tod. Defaults to 'signal'.
- det_weights: A vector of floats representing the inverse
variance of each detector, under the assumption of white noise.
- cuts: A RangesMatrix that identifies samples that should be
excluded from projection operations.
- comps: the component code (e.g. 'T', 'TQU', 'QU', ...) that
specifies the spin-components being modeled in the map.
- dest: the appropriately shaped array in which to place the
computed result (as an alternative to allocating a new object to
store the result).
Note that in the case of det_weights and cuts, the Projection
Matrix may also have cached values for those. It is an error to
pass either of these as a keyword argument to a projection routine
if a value has been cached for it.
Objects of this class cache certain pre-computed information (such
as the rotation taking boresight coordinates to celestial
coordinates) and certain context-dependent settings (such as a map
shape, WCS, and spin-component configuration). You may want to
inspect or borrow these results, perhaps to reuse them when
constructing new instances with slight modifications. The cached
attributes are:
- sight: A CelestialSightLine, representing the boresight pointing
in celestial coordinates. [samps]
- fp: G3VectorQuat representing the focal plane offsets of each
detector. When constructing with a tod argument, fp can be
automatically populated from tod.focal_plane; note that if hwp
is passed as True then the gamma (polarization) angles will be
reflected (gamma' = -gamma). [dets]
- geom: The target map geometry. This is a pixell.enmap.Geometry
object, with attributes .shape and .wcs; or possibly (if tiled)
a pixell.tilemap.TileGeometry.
- comps: String indicating the spin-components to include in maps.
E.g., 'T', 'QU', 'TQU'.
- rot: quat giving an additional fixed rotation to apply to get
from boresight to celestial coordinates. Not for long...
- cuts (optional): RangesMatrix indicating what samples to exclude
from projection operations (the indicated samples have
projection matrix element 0 in all components). [dets, samps]
- threads (optional): list of RangesMatrix objects, to control the
use of threads in TOD-to-map operations using OpenMP. See so3g
documentation. [[threads, dets, samps], ...]
- det_weights (optional): weights (one per detector) to apply to
time-ordered data when binning a map (and also when binning a
weights matrix). [dets]
- interpol (optional): How to interpolate the values for samples
between pixel centers. Forwarded to Projectionist. Valid
options are:
- None, 'nn' or 'nearest': Standard nearest neighbor mapmaking.
- 'lin' or 'bilinear': Linearly interpolate between the four
closest pixels.
Default: None
These things can be updated freely, with the following caveats:
- If the number of "samples" or "detectors" is changed in one
attribute, it will need to be changed in the others to match.
- The threads attribute, if in use, needs to be recomputed if
anything about the pointing changes (this includes map geometry
but does not include map components).
Setting the "threads" argument to certain special values will
activate different thread assignment algorithms:
- False: do not use threading; to_map projections will be
single-threaded.
- True: use the default algorithm, 'domdir'.
- None: same as True.
- 'simple': compute self.threads using simple map-stripe
algorithm.
- 'domdir': compute self.threads using dominant-direction
algorithm (recommended).
- 'tiles': for tiled geometries, design self.threads such that
each tile is assigned to a single thread (each thread may be in
charge of multiple tiles).
"""
def __init__(self, sight=None, fp=None, geom=None, comps='T',
cuts=None, threads=None, det_weights=None, interpol=None):
self.sight = sight
self.fp = fp
self.geom = wrap_geom(geom)
self.comps = comps
self.cuts = cuts
self.threads = threads
self.active_tiles = None
self.det_weights = det_weights
self.interpol = interpol
[docs]
@classmethod
def for_tod(cls, tod, sight=None, geom=None, comps='T',
rot=None, cuts=None, threads=None, det_weights=None,
timestamps=None, focal_plane=None, boresight=None,
boresight_equ=None, wcs_kernel=None, weather='typical',
site='so', interpol=None, hwp=False):
"""Set up a Projection Matrix for a TOD. This will ultimately call
the main P constructor, but some missing arguments will be
extracted from tod and computed along the way.
To determine the boresight pointing in celestial coordinates
(ultimately passed to constructor as sight=), the first
non-None item in the following list is used:
- the sight= keyword argument.
- the boresight_equ= keyword argument.
- the boresight= keyword argument
- tod.get('boresight_equ')
- tod.get('boresight')
If the map geometry geom is not specified, but the wcs_kernel
is provided, then get_footprint will be called to determine
the geom.
The per-detector positions and polarization directions are
extracted from focal_plane (xi, eta, gamma), or else
tod.focal_plane. If hwp=True, then the rotation angles
(gamma) are reflected (gamma' = -gamma) before storing in
self.fp.
"""
if sight is None:
if boresight_equ is None:
if boresight is None:
boresight_equ = tod.get('boresight_equ')
if boresight_equ is not None:
sight = so3g.proj.CelestialSightLine.for_lonlat(
boresight_equ.ra, boresight_equ.dec, boresight_equ.get('psi'))
else:
timestamps = _valid_arg(timestamps, 'timestamps', src=tod)
boresight = _valid_arg(boresight, 'boresight', src=tod)
assert(boresight is not None)
sight = so3g.proj.CelestialSightLine.az_el(
timestamps, boresight.az, boresight.el, roll=boresight.roll,
site=site, weather=weather)
else:
sight = _get_csl(sight)
# Apply a rotation from equatorial to map WCS coordinates.
if rot is not None:
sight.Q = rot * sight.Q
# Set up the detectors in the focalplane
fp = _valid_arg(focal_plane, 'focal_plane', src=tod)
xi, eta, gamma = fp.xi, fp.eta, fp.get('gamma')
if np.any(np.isnan(np.array([xi, eta]))):
raise ValueError('np.nan is included in xi or eta')
if gamma is not None and np.any(np.isnan(gamma)):
raise ValueError('np.nan is included in gamma')
assert (gamma is not None or not hwp)
if hwp:
gamma = -gamma
fp = so3g.proj.quat.rotation_xieta(xi, eta, gamma)
if geom is None and wcs_kernel is not None:
geom = helpers.get_footprint(tod, wcs_kernel, sight=sight)
return cls(sight=sight, fp=fp, geom=geom, comps=comps,
cuts=cuts, threads=threads, det_weights=det_weights,
interpol=interpol)
[docs]
@classmethod
def for_geom(cls, tod, geom, comps='TQU', timestamps=None,
focal_plane=None, boresight=None, rot=None, cuts=None):
"""Deprecated, use .for_tod."""
return cls.for_tod(tod, geom=geom, comps=comps,
timestamps=timestamps, focal_plane=focal_plane,
boresight=boresight, rot=rot, cuts=cuts)
[docs]
def zeros(self, super_shape=None, comps=None):
"""Returns an enmap concordant with this object's configured geometry
and component count.
Args:
super_shape (tuple): The leading dimensions of the array.
If None, self._comp_count(comps) is used.
comps: The component list, to override self.comps.
Returns:
An enmap with shape super_shape + self.geom.shape.
"""
if super_shape is None:
super_shape = (self._comp_count(comps), )
if self.tiled:
# Need to fully resolve tiling to get occupied tiles.
proj, _ = self._get_proj_threads()
return tilemap.from_tiles(proj.zeros(super_shape), self.geom)
else:
proj = self._get_proj()
return enmap.ndmap(proj.zeros(super_shape), wcs=self.geom.wcs)
[docs]
def to_map(self, tod=None, dest=None, comps=None, signal=None,
det_weights=None, cuts=None, eigentol=None):
"""Project time-ordered signal into a map. This performs the operation
m += P d
and returns m.
Args:
tod: AxisManager; possible source for 'signal', 'det_weights'.
dest (enmap): the map or array into which the data should be
accumulated. (If None, a new enmap is created and
initialized to zero.)
signal: The time-ordered data, d. If None, tod.signal is used.
det_weights: The per-detector weight vector. If None,
self.det_weights will be used; if that is not set then
uniform weights of 1 are applied.
cuts: Sample cuts to exclude from processing. If None,
self.cuts is used.
eigentol: This is ignored.
"""
signal = _valid_arg(signal, 'signal', src=tod)
det_weights = _not_both(det_weights, self.det_weights,
'det_weights', dtype='float32')
cuts = _not_both(cuts, self.cuts, 'cuts')
if comps is None: comps = self.comps
if dest is None: dest = self.zeros(comps=comps)
proj, threads = self._get_proj_threads(cuts=cuts)
proj.to_map(signal, self._get_asm(), output=self._prepare_map(dest),
det_weights=det_weights, comps=comps, threads=unwrap_ranges(threads))
return dest
[docs]
def to_weights(self, tod=None, dest=None, comps=None, signal=None,
det_weights=None, cuts=None):
"""Computes the weights matrix for the uncorrelated noise model and
returns it. I.e.:
W += P N^-1 P^T
and returns W. Here the inverse noise covariance has shape
(n_dets), and carries a single weight (1/var) value for each
detector.
Args:
tod (AxisManager): possible source for det_weights.
dest (enmap): the map or array into which the weights should
be accumulated. (If None, a new enmap is created and
initialized to zero.)
det_weights: The per-detector weight vector. If None,
tod.det_weights will be used; if that is not set then
uniform weights of 1 are applied.
cuts: Sample cuts to exclude from processing. If None,
self.cuts is used.
"""
det_weights = _not_both(det_weights, self.det_weights,
'det_weights', dtype='float32')
cuts = _not_both(cuts, self.cuts, 'cuts')
if comps is None:
comps = self.comps
if dest is None:
_n = self._comp_count(comps)
dest = self.zeros((_n, _n))
proj, threads = self._get_proj_threads(cuts=cuts)
proj.to_weights(self._get_asm(), output=self._prepare_map(dest),
det_weights=det_weights, comps=comps, threads=unwrap_ranges(threads))
return dest
[docs]
def to_inverse_weights(self, weights_map=None, tod=None, dest=None,
comps=None, signal=None, det_weights=None, cuts=None,
eigentol=1e-4,
):
"""Compute an inverse weights map, W^-1, from a weights map. If no
weights_map is passed in, it will be computed by calling
to_weights, passing through all other arguments.
"""
if weights_map is None:
logger.info('to_inverse_weights: calling .to_weights')
weights_map = self.to_weights(
tod=tod, comps=comps, signal=signal, det_weights=det_weights, cuts=cuts)
# Works for both normal and tiled maps
if dest is None: dest = np.zeros_like(weights_map)
dest[:] = helpers._invert_weights_map(weights_map, eigentol=eigentol, UPLO='U')
return dest
[docs]
def remove_weights(self, signal_map=None, weights_map=None, inverse_weights_map=None,
dest=None, **kwargs):
"""Apply the inverse weights matrix to a signal map.
m' = W^-1 m
If W or m are not fully specified, they will be computed by
calling other routines inline, with relevant arguments passed
through.
Args:
signal_map: The map m to filter.
inverse_weights_map: the matrix W^-1 to apply to the map.
Shape should be (n_comp, n_comp, n_row, n_col), but only
the upper diagonal in the first two dimensions needs to be
populated. If this is None, then "weights_map" is taken
as W, and it will be inverted and applied.
weights_map: the matrix W. Shape should be (n_comp, n_comp,
n_row, n_col), but only the upper diagonal in the first
two dimensions needs to be populated. If this is None,
then W will be computed by a call to
"""
if inverse_weights_map is None:
inverse_weights_map = self.to_inverse_weights(weights_map=weights_map, **kwargs)
if signal_map is None:
signal_map = self.to_map(**kwargs)
if dest is None: dest = np.zeros_like(signal_map)
dest[:] = helpers._apply_inverse_weights_map(inverse_weights_map, signal_map)
return dest
[docs]
def from_map(self, signal_map, dest=None, comps=None, wrap=None,
cuts=None, tod=None):
"""Project from a map into the time-domain.
d += P m
Args:
signal_map: The map m. This can probably be just about
anything supported by so3g.proj; it doesn't have to match
the internally configured geometry.
dest: Time-ordered data array, shape (dets, samps). If
None, a new array will be created to hold the result.
Otherwise, data are *accumulated* into d, so clear it
manually if you are trying to do d = P m.
comps (str): Projection components, if you want to override.
cuts: RangesMatrix, shape (dets, samps) flagging samples
that should not be populated. Defaults to empty.
wrap (str): If specified, wraps the result as tod[wrap]
(after removing whatever was in there).
Returns:
The dest array.
Notes:
Since this is a set of one-to-many operation, OpenMP can
be used without carefully assigning samples to threads.
"""
assert cuts is None # whoops, not implemented.
# _get_proj doesn't set up the tiling info, so
# must call get_proj_threads. This is ugly, since from_map
# doesn't need the threading structures. Can we find a better design?
if self.tiled: proj, _ = self._get_proj_threads()
else: proj = self._get_proj()
if comps is None:
comps = self.comps
tod_shape = (len(self.fp), len(self.sight.Q))
if dest is None:
dest = np.zeros(tod_shape, np.float32)
assert(dest.shape == tod_shape) # P.fp/P.sight and dest argument disagree
proj.from_map(self._prepare_map(signal_map), self._get_asm(), signal=dest, comps=comps)
if wrap is not None:
if wrap in tod:
del tod[wrap]
tod.wrap(wrap, dest, [(0, 'dets'), (1, 'samps')])
return dest
@property
def tiled(self):
"""Duck-typing to see if we're tiled or not. Reload-safe, unlike isinstance"""
try:
self.geom.ntile
return True
except AttributeError:
return False
def _comp_count(self, comps=None):
"""Returns the number of spin components for component code comps.
"""
if comps is None:
comps = self.comps
return len(comps)
def _get_proj(self):
if self.geom is None:
raise ValueError("Can't project without a geometry!")
# Backwards compatibility for old so3g
interpol_kw = _get_interpol_args(self.interpol)
if self.tiled:
return so3g.proj.Projectionist.for_tiled(
self.geom.shape, self.geom.wcs, self.geom.tile_shape,
active_tiles=self.active_tiles, **interpol_kw)
else:
return so3g.proj.Projectionist.for_geom(self.geom.shape,
self.geom.wcs, **interpol_kw)
def _get_proj_threads(self, cuts=None):
"""Return the Projectionist and sample-thread assignment for the
present geometry. If the thread assignment has not been
determined yet, it is done now and cached in self.threads. In
tiled geometries, if self.active_tiles has not been
determined, that is done now and cached.
The returned sample-thread assignment is modified by "cuts",
which defaults to self.cuts if passed as None. I.e. after
computing or looking up the full self.threads, the code
returns (proj, self.threads*~cuts).
Returns:
Tuple (proj, threads*cuts).
"""
proj = self._get_proj()
if cuts is None:
cuts = self.cuts
if self.tiled and self.active_tiles is None:
logger.info('_get_proj_threads: get_active_tiles')
if isinstance(self.threads, str) and self.threads == 'tiles':
logger.info('_get_proj_threads: assigning using "tiles"')
tile_info = proj.get_active_tiles(self._get_asm(), assign=True)
_tile_threads = wrap_ranges(tile_info['group_ranges'])
else:
tile_info = proj.get_active_tiles(self._get_asm())
self.active_tiles = tile_info['active_tiles']
proj = self._get_proj()
if self.threads is False:
return proj, ~cuts
if self.threads is None:
self.threads = 'domdir'
if isinstance(self.threads, str):
if self.threads in ['simple', 'domdir']:
logger.info(f'_get_proj_threads: assigning using "{self.threads}"')
self.threads = wrap_ranges(proj.assign_threads(
self._get_asm(), method=self.threads))
elif self.threads == 'tiles':
# Computed above unless logic failed us...
self.threads = _thile_threads
else:
raise ValueError('Request for unknown algo threads="%s"' % self.threads)
if cuts:
threads = [_t * ~cuts for _t in self.threads]
else:
threads = self.threads
return proj, threads
def _get_asm(self):
"""Bundles self.fp and self.sight into an "Assembly" for calling
so3g.proj routines."""
so3g_fp = so3g.proj.FocalPlane()
for i, q in enumerate(self.fp):
so3g_fp[f'a{i}'] = q
return so3g.proj.Assembly.attach(self.sight, so3g_fp)
def _prepare_map(self, map):
if self.tiled: return map.tiles
else: return map
class P_PrecompDebug:
def __init__(self, geom, pixels, phases):
self.geom = wrap_geom(geom).nopre
self.pixels = pixels
self.phases = phases
def zeros(self, super_shape=None):
if super_shape is None: super_shape = (self.phases.shape[2],)
return enmap.zeros(super_shape + self.geom.shape, self.geom.wcs)
def to_map(self, dest=None, signal=None, comps=None):
if dest is None: dest = self.zeros()
proj = so3g.ProjEng_Precomp_NonTiled()
proj.to_map(dest, self.pixels, self.phases, signal, None, None)
return dest
def from_map(self, signal_map, dest=None, comps=None):
if dest is None: dest = np.zeros(self.pixels.shape[:2], np.float32)
proj = so3g.ProjEng_Precomp_NonTiled()
proj.from_map(signal_map, self.pixels, self.phases, dest)
return dest
def wrap_geom(geom):
if isinstance(geom, tuple) or isinstance(geom, list):
return enmap.Geometry(*geom)
else:
return geom
# Helpers for backwards compatibility with old so3g. Consider removing
# these once transition is done. The idea is for sotodlib to use the
# more recent interface ("threads" is a list of RangesMatrix objects)
# while supporting use with older so3g (where "threads" is a single
# 3-d RangesMatrix).
def wrap_ranges(ranges):
# Run this on the "threads" result returned from so3g thread
# assignement routines, before storing the result internally.
if _so3g_ivals_format() == 1:
return [ranges]
else:
return ranges
def unwrap_ranges(ranges):
# Run this on the internally stored threads object before passing
# it to so3g projection routines.
if _so3g_ivals_format() == 1:
assert len(ranges) == 1, "Old so3g only supports simple (1-bunch) thread ranges, but got thread ranges with shape %s" % (str(ranges.shape))
return ranges[0]
else:
return ranges
def _so3g_ivals_format():
projclass = so3g.proj.Projectionist
if not hasattr(projclass, '_ivals_format'):
return 1
else:
return projclass._ivals_format
def _get_interpol_args(interpol):
if _so3g_ivals_format() >= 2:
return {'interpol': interpol}
assert interpol in [None, "nn", "nearest"], "Old so3g does not support interpolated mapmaking"
return {}