Source code for sotodlib.coords.pmat

import so3g.proj
import numpy as np
from pixell import enmap, tilemap

from .helpers import _get_csl, _valid_arg, _not_both, _confirm_wcs
from . import helpers
from . import healpix_utils as hp_utils

import logging
logger = logging.getLogger(__name__)


[docs] class P: """Projection Matrix. This class provides functions to apply a Projection Matrix (or its transpose). The Projection Matrix, P, also sometimes called the Pointing Matrix, describes how a vector of time-ordered measurements d are determined by a vector of map pixels m:: d = P m We are working in aspace, of course, where d is not a vector, but rather an array of vectors, and m is not a vector, it's a multi-dimensional array with two of the dimensions corresponding to a rectangular pixelization of the sky. If you are making filter+bin maps, you will want to use these functions: - to_map - to_inverse_weights - remove_weights If you are solving for a map iteratively you will probably just need: - to_map - from_map Important keyword arguments that are used in many functions: - tod: The AxisManager from which signal and pointing information should be taken. - signal: The array to use for time-ordered signal. If a string, it will be looked up in tod. Defaults to 'signal'. - det_weights: A vector of floats representing the inverse variance of each detector, under the assumption of white noise. - cuts: A RangesMatrix that identifies samples that should be excluded from projection operations. - comps: the component code (e.g. 'T', 'TQU', 'QU', ...) that specifies the spin-components being modeled in the map. - dest: the appropriately shaped array in which to place the computed result (as an alternative to allocating a new object to store the result). Note that in the case of det_weights and cuts, the Projection Matrix may also have cached values for those. It is an error to pass either of these as a keyword argument to a projection routine if a value has been cached for it. Objects of this class cache certain pre-computed information (such as the rotation taking boresight coordinates to celestial coordinates) and certain context-dependent settings (such as a map shape, WCS, and spin-component configuration). You may want to inspect or borrow these results, perhaps to reuse them when constructing new instances with slight modifications. The cached attributes are: - sight: A CelestialSightLine, representing the boresight pointing in celestial coordinates. [samps] - fp: FocalPlane object representing the focal plane offsets and T,P responses of each detector. When constructing with a tod argument, fp can be automatically populated from tod.focal_plane; note that if hwp is passed as True then the gamma (polarization) angles will be reflected (gamma' = -gamma). [dets] - geom: The target map geometry. This is a pixell.enmap.Geometry object, with attributes .shape and .wcs (for non-tiled rectpix); a pixell.tilemap.TileGeometry (if tiled); or a Healpix geometry as defined in coords.healpix_utils. - comps: String indicating the spin-components to include in maps. E.g., 'T', 'QU', 'TQU'. - rot: quat giving an additional fixed rotation to apply to get from boresight to celestial coordinates. Not for long... - cuts (optional): RangesMatrix indicating what samples to exclude from projection operations (the indicated samples have projection matrix element 0 in all components). [dets, samps] - threads (optional): list of RangesMatrix objects, to control the use of threads in TOD-to-map operations using OpenMP. See so3g documentation. [[threads, dets, samps], ...] - det_weights (optional): weights (one per detector) to apply to time-ordered data when binning a map (and also when binning a weights matrix). [dets] - interpol (optional): How to interpolate the values for samples between pixel centers. Forwarded to Projectionist. Valid options are: - None, 'nn' or 'nearest': Standard nearest neighbor mapmaking. - 'lin' or 'bilinear': Linearly interpolate between the four closest pixels. Default: None These things can be updated freely, with the following caveats: - If the number of "samples" or "detectors" is changed in one attribute, it will need to be changed in the others to match. - The threads attribute, if in use, needs to be recomputed if anything about the pointing changes (this includes map geometry but does not include map components). Setting the "threads" argument to certain special values will activate different thread assignment algorithms: - False: do not use threading; to_map projections will be single-threaded. - True: use the default algorithm, 'domdir'. - None: same as True. - 'simple': compute self.threads using simple map-stripe algorithm. - 'domdir': compute self.threads using dominant-direction algorithm (recommended). - 'tiles': for tiled geometries, design self.threads such that each tile is assigned to a single thread (each thread may be in charge of multiple tiles). """ def __init__(self, sight=None, fp=None, geom=None, comps='T', cuts=None, threads=None, det_weights=None, interpol=None): self.sight = sight self.fp = fp self.geom = wrap_geom(geom) self.comps = comps self.cuts = cuts self.threads = threads self.active_tiles = None self.det_weights = det_weights self.interpol = interpol self.pix_scheme = _infer_pix_scheme(self.geom) if self.pix_scheme == "healpix" and (self.geom.nside_tile is not None): if isinstance(self.geom.nside_tile, int): self.geom.nside_tile = min(self.geom.nside, self.geom.nside_tile) self.geom.ntile = True # Not used for healpix but needed for tiled()
[docs] @classmethod def for_tod(cls, tod, sight=None, geom=None, comps='T', rot=None, cuts=None, threads=None, det_weights=None, timestamps=None, focal_plane=None, boresight=None, boresight_equ=None, wcs_kernel=None, weather='typical', site='so', interpol=None, hwp=False, qp_kwargs={}): """Set up a Projection Matrix for a TOD. This will ultimately call the main P constructor, but some missing arguments will be extracted from tod and computed along the way. To determine the boresight pointing in celestial coordinates (ultimately passed to constructor as sight=), the first non-None item in the following list is used: - the sight= keyword argument. - the boresight_equ= keyword argument. - the boresight= keyword argument - tod.get('boresight_equ') - tod.get('boresight') qp_kwargs are passed through to qpoint to compute sight if sight and boresight_equ args are None. If the map geometry geom is not specified, but the wcs_kernel is provided, then get_footprint will be called to determine the geom. The per-detector positions and polarization directions are extracted from focal_plane (xi, eta, gamma), or else tod.focal_plane. If hwp=True, then the rotation angles (gamma) are reflected (gamma' = -gamma) before storing in self.fp. """ if sight is None: if boresight_equ is None: if boresight is None: boresight_equ = tod.get('boresight_equ') if boresight_equ is not None: sight = so3g.proj.CelestialSightLine.for_lonlat( boresight_equ.ra, boresight_equ.dec, boresight_equ.get('psi')) else: timestamps = _valid_arg(timestamps, 'timestamps', src=tod) boresight = _valid_arg(boresight, 'boresight', src=tod) assert(boresight is not None) sight = so3g.proj.CelestialSightLine.az_el( timestamps, boresight.az, boresight.el, roll=boresight.roll, site=site, weather=weather, **qp_kwargs) else: sight = _get_csl(sight) # Apply a rotation from equatorial to map WCS coordinates. if rot is not None: sight.Q = rot * sight.Q # Set up the detectors in the focalplane fp = helpers.get_fplane(tod, focal_plane=focal_plane, hwp=hwp) if geom is None and wcs_kernel is not None: geom = helpers.get_footprint(tod, wcs_kernel, sight=sight) return cls(sight=sight, fp=fp, geom=geom, comps=comps, cuts=cuts, threads=threads, det_weights=det_weights, interpol=interpol)
[docs] @classmethod def for_geom(cls, tod, geom, comps='TQU', timestamps=None, focal_plane=None, boresight=None, rot=None, cuts=None): """Deprecated, use .for_tod.""" return cls.for_tod(tod, geom=geom, comps=comps, timestamps=timestamps, focal_plane=focal_plane, boresight=boresight, rot=rot, cuts=cuts)
[docs] def zeros(self, super_shape=None, comps=None): """Returns an enmap concordant with this object's configured geometry and component count. Args: super_shape (tuple): The leading dimensions of the array. If None, self._comp_count(comps) is used. comps: The component list, to override self.comps. Returns: An enmap with shape super_shape + self.geom.shape. """ if super_shape is None: super_shape = (self._comp_count(comps), ) proj, _ = self._get_proj_tiles() if self.pix_scheme == 'healpix': return proj.zeros(super_shape) elif self.pix_scheme == 'rectpix': if self.tiled: # second super_shape here only needed when all tiles are empty return tilemap.from_tiles(proj.zeros(super_shape), self.geom.copy(pre=super_shape)) else: return enmap.ndmap(proj.zeros(super_shape), wcs=self.geom.wcs)
[docs] def to_map(self, tod=None, dest=None, comps=None, signal=None, det_weights=None, cuts=None, eigentol=None): """Project time-ordered signal into a map. This performs the operation m += P d and returns m. Args: tod: AxisManager; possible source for 'signal', 'det_weights'. dest (enmap): the map or array into which the data should be accumulated. (If None, a new enmap is created and initialized to zero.) signal: The time-ordered data, d. If None, tod.signal is used. det_weights: The per-detector weight vector. If None, self.det_weights will be used; if that is not set then uniform weights of 1 are applied. cuts: Sample cuts to exclude from processing. If None, self.cuts is used. eigentol: This is ignored. """ signal = _valid_arg(signal, 'signal', src=tod) det_weights = _not_both(det_weights, self.det_weights, 'det_weights', dtype='float32') cuts = _not_both(cuts, self.cuts, 'cuts') if comps is None: comps = self.comps if dest is None: dest = self.zeros(comps=comps) proj, threads = self._get_proj_threads(cuts=cuts) proj.to_map(signal, self._get_asm(), output=self._prepare_map(dest), det_weights=det_weights, comps=comps, threads=unwrap_ranges(threads)) return dest
[docs] def to_weights(self, tod=None, dest=None, comps=None, signal=None, det_weights=None, cuts=None): """Computes the weights matrix for the uncorrelated noise model and returns it. I.e.: W += P N^-1 P^T and returns W. Here the inverse noise covariance has shape (n_dets), and carries a single weight (1/var) value for each detector. Args: tod (AxisManager): possible source for det_weights. dest (enmap): the map or array into which the weights should be accumulated. (If None, a new enmap is created and initialized to zero.) det_weights: The per-detector weight vector. If None, tod.det_weights will be used; if that is not set then uniform weights of 1 are applied. cuts: Sample cuts to exclude from processing. If None, self.cuts is used. """ det_weights = _not_both(det_weights, self.det_weights, 'det_weights', dtype='float32') cuts = _not_both(cuts, self.cuts, 'cuts') if comps is None: comps = self.comps if dest is None: _n = self._comp_count(comps) dest = self.zeros((_n, _n)) proj, threads = self._get_proj_threads(cuts=cuts) proj.to_weights(self._get_asm(), output=self._prepare_map(dest), det_weights=det_weights, comps=comps, threads=unwrap_ranges(threads)) return dest
[docs] def to_inverse_weights(self, weights_map=None, tod=None, dest=None, comps=None, signal=None, det_weights=None, cuts=None, eigentol=1e-4, ): """Compute an inverse weights map, W^-1, from a weights map. If no weights_map is passed in, it will be computed by calling to_weights, passing through all other arguments. """ if weights_map is None: logger.info('to_inverse_weights: calling .to_weights') weights_map = self.to_weights( tod=tod, comps=comps, signal=signal, det_weights=det_weights, cuts=cuts) weights_map, uf_info = self._flatten_map(weights_map) if dest is not None: dest, uf_info = self._flatten_map(dest, uf_info) _dest = helpers._invert_weights_map( weights_map, eigentol=eigentol, UPLO='U') if dest is not None: dest[:] = _dest else: dest = _dest del _dest dest = self._unflatten_map(dest, uf_info) return dest
[docs] def remove_weights(self, signal_map=None, weights_map=None, inverse_weights_map=None, dest=None, **kwargs): """Apply the inverse weights matrix to a signal map. m' = W^-1 m If W or m are not fully specified, they will be computed by calling other routines inline, with relevant arguments passed through. Args: signal_map: The map m to filter. inverse_weights_map: the matrix W^-1 to apply to the map. Shape should be (n_comp, n_comp, n_row, n_col), but only the upper diagonal in the first two dimensions needs to be populated. If this is None, then "weights_map" is taken as W, and it will be inverted and applied. weights_map: the matrix W. Shape should be (n_comp, n_comp, n_row, n_col), but only the upper diagonal in the first two dimensions needs to be populated. If this is None, then W will be computed and inverted via ``self.to_inverse_weights``. """ if inverse_weights_map is None: inverse_weights_map = self.to_inverse_weights(weights_map=weights_map, **kwargs) if signal_map is None: signal_map = self.to_map(**kwargs) # Get flat numpy-compatible forms for the maps. signal_map, uf_info = self._flatten_map(signal_map) inverse_weights_map, uf_info = self._flatten_map(inverse_weights_map, uf_info) if dest is not None: dest, uf_info = self._flatten_map(dest, uf_info) dest = helpers._apply_inverse_weights_map(inverse_weights_map, signal_map, out=dest) dest = self._unflatten_map(dest, uf_info) return dest
[docs] def from_map(self, signal_map, dest=None, comps=None, wrap=None, cuts=None, tod=None): """Project from a map into the time-domain. d += P m Args: signal_map: The map m. This can probably be just about anything supported by so3g.proj; it doesn't have to match the internally configured geometry. dest: Time-ordered data array, shape (dets, samps). If None, a new array will be created to hold the result. Otherwise, data are *accumulated* into d, so clear it manually if you are trying to do d = P m. comps (str): Projection components, if you want to override. cuts: RangesMatrix, shape (dets, samps) flagging samples that should not be populated. Defaults to empty. wrap (str): If specified, wraps the result as tod[wrap] (after removing whatever was in there). Returns: The dest array. Notes: Since this is a set of one-to-many operation, OpenMP can be used without carefully assigning samples to threads. """ assert cuts is None # whoops, not implemented. # This is not free but it is pretty fast, doesn't do thread # assignments. proj, _ = self._get_proj_tiles() if comps is None: comps = self.comps tod_shape = (self.fp.ndet, len(self.sight.Q)) if dest is None: dest = np.zeros(tod_shape, np.float32) assert(dest.shape == tod_shape) # P.fp/P.sight and dest argument disagree if self.tiled and self.pix_scheme == 'rectpix': # so3g <= 0.1.15 has a dims check on signal_map that fails on the tiled map format. so3g.proj.wcs._ProjectionistBase.from_map( proj, self._prepare_map(signal_map), self._get_asm(), signal=dest, comps=comps) else: proj.from_map(self._prepare_map(signal_map), self._get_asm(), signal=dest, comps=comps) if wrap is not None: if wrap in tod: del tod[wrap] tod.wrap(wrap, dest, [(0, 'dets'), (1, 'samps')]) return dest
@property def tiled(self): """Duck-typing to see if we're tiled or not. Reload-safe, unlike isinstance""" try: self.geom.ntile return True except AttributeError: return False def _comp_count(self, comps=None): """Returns the number of spin components for component code comps. """ if comps is None: comps = self.comps return len(comps) def _get_proj(self): # Backwards compatibility for old so3g interpol_kw = _get_interpol_args(self.interpol) if self.geom is None: raise ValueError("Can't project without a geometry!") if self.pix_scheme == "healpix": return so3g.proj.ProjectionistHealpix.for_healpix( self.geom.nside, self.geom.nside_tile, self.active_tiles, self.geom.ordering, **interpol_kw) elif self.pix_scheme == "rectpix": if self.tiled: return so3g.proj.Projectionist.for_tiled( self.geom.shape, self.geom.wcs, self.geom.tile_shape, active_tiles=self.active_tiles, **interpol_kw) else: return so3g.proj.Projectionist.for_geom(self.geom.shape, self.geom.wcs, **interpol_kw) def _get_proj_tiles(self, assign=False): # Get Projectionist and compute self.active_tiles if it's not # already known. Return Projectionist with active_tiles set, # which is suitable for from_map and zeros (though not for # threaded to_map etc). proj = self._get_proj() if not self.tiled or (self.active_tiles is not None and not assign): return proj, {} tile_info = proj.get_active_tiles(self._get_asm(), assign=assign) self.active_tiles = tile_info['active_tiles'] if self.pix_scheme == "healpix": self.geom.nside_tile = proj.nside_tile # Update nside_tile if it was 'auto' self.geom.ntile = 12*proj.nside_tile**2 elif self.pix_scheme == 'rectpix': # Promote geometry to one with the active tiles marked. self.geom = tilemap.geometry( self.geom.shape, self.geom.wcs, self.geom.tile_shape, active=self.active_tiles) return self._get_proj(), tile_info def _get_proj_threads(self, cuts=None): """Return the Projectionist and sample-thread assignment for the present geometry. If the thread assignment has not been determined yet, it is done now and cached in self.threads. In tiled geometries, if self.active_tiles has not been determined, that is done now and cached. The returned sample-thread assignment is modified by "cuts", which defaults to self.cuts if passed as None. I.e. after computing or looking up the full self.threads, the code returns (proj, self.threads*~cuts). Returns: Tuple (proj, threads*cuts). """ proj = self._get_proj() if cuts is None: cuts = self.cuts if self.threads is None: if self.pix_scheme == "rectpix": self.threads = 'domdir' elif self.pix_scheme == "healpix": self.threads = 'tiles' if (self.geom.nside_tile is not None) else 'simple' need_tiles = (self.active_tiles is None) need_assign = (self.threads in ['tiles']) if need_tiles or need_assign: proj, tile_info = self._get_proj_tiles(need_assign) if need_assign: _tile_threads = wrap_ranges(tile_info['group_ranges']) if self.threads is False: return proj, ~cuts if isinstance(self.threads, str): if self.threads in ['simple', 'domdir']: logger.info(f'_get_proj_threads: assigning using "{self.threads}"') self.threads = wrap_ranges(proj.assign_threads( self._get_asm(), method=self.threads)) elif self.threads == 'tiles': # Computed above unless logic failed us... self.threads = _tile_threads else: raise ValueError('Request for unknown algo threads="%s"' % self.threads) if cuts: threads = [_t * ~cuts for _t in self.threads] else: threads = self.threads return proj, threads def _get_asm(self): """Bundles self.fp and self.sight into an "Assembly" for calling so3g.proj routines.""" return so3g.proj.Assembly.attach(self.sight, self.fp) def _prepare_map(self, map): """Gently reformat a map in order to send it to so3g.""" if self.tiled and self.pix_scheme == "rectpix": return list(map.tiles) else: return map def _flatten_map(self, map, uf_base=None): """Get a version of the map that is a numpy array, for passing to per-pixel math operations. Relies on (self.pix_scheme, self.tiled) to interpret map. This also tries to extract wcs info (if rectpix) from the map, for inline consistency checking (e.g. so we're not happily projecting into a map we loaded from disk that has the same shape but is off by a few pixels from what pmat thinks is the right footprint). It also looks at active_tiles / tile_list, and stores that for downstream compatibility checking with other flattened maps. If uf_base is passed in, it should be an unflatten_info dict (likely from a previous call to _flatten_map). The analysis here will be checked against it for compatibility and any missing values (ahem wcs) will be used to augment the unflatten_info that is returned. Returns: array: The map, reformatted as an array (could simply be the input arg map, or a view of that, or a copy if necessary). unflatten_info: dict with misc compatibility info. """ ufinfo = {'pix_scheme': self.pix_scheme, 'tiled': self.tiled} wcs = None crit_dims = 1 if self.pix_scheme == 'healpix': if self.tiled: ufinfo['tile_list'] = [_m is not None for _m in map] map = hp_utils.tiled_to_compressed(map, -1) crit_dims = 2 else: pass elif self.pix_scheme == 'rectpix': if self.tiled: if isinstance(map, tilemap.TileMap): wcs = map.geometry.wcs ufinfo['active_tiles'] = list(map.active) ufinfo['tile_geom'] = map.geometry.copy(pre=()) else: if isinstance(map, enmap.ndmap): wcs = map.wcs crit_dims = 2 ufinfo.update({'wcs': wcs, 'crit_dims': crit_dims, 'shape': map.shape}) if uf_base is not None: ufinfo['wcs'] = _check_compat(uf_base, ufinfo) return map, ufinfo def _unflatten_map(self, map, uf_info): """Restore a map to full format, assuming it's currently an ndarray. Intended as the inverse op to _flatten_map. Minimize the use of cached self.* here ... rely instead on uf_info. """ if uf_info['pix_scheme'] == 'healpix': if uf_info['tiled']: map = hp_utils.compressed_to_tiled(map, uf_info['tile_list'], -1) else: pass elif uf_info['pix_scheme'] == 'rectpix': if uf_info['tiled']: if not isinstance(map, tilemap.TileMap): g = uf_info['tile_geom'] g = tilemap.geometry(map.shape[:-1] + g.shape, g.wcs, g.tile_shape, active=uf_info['active_tiles']) map = tilemap.TileMap(map, g) else: if not isinstance(map, enmap.ndmap) and uf_info['wcs']: map = enmap.ndmap(map, uf_info['wcs']) return map
class P_PrecompDebug: def __init__(self, geom, pixels, phases): self.geom = wrap_geom(geom).nopre self.pixels = pixels self.phases = phases def zeros(self, super_shape=None): if super_shape is None: super_shape = (self.phases.shape[2],) return enmap.zeros(super_shape + self.geom.shape, self.geom.wcs) def to_map(self, dest=None, signal=None, comps=None): if dest is None: dest = self.zeros() proj = so3g.ProjEng_Precomp_NonTiled() proj.to_map(dest, self.pixels, self.phases, signal, None, None) return dest def from_map(self, signal_map, dest=None, comps=None): if dest is None: dest = np.zeros(self.pixels.shape[:2], np.float32) proj = so3g.ProjEng_Precomp_NonTiled() proj.from_map(signal_map, self.pixels, self.phases, dest) return dest def wrap_geom(geom): if isinstance(geom, tuple) or isinstance(geom, list): return enmap.Geometry(*geom) else: return geom # Helpers for backwards compatibility with old so3g. Consider removing # these once transition is done. The idea is for sotodlib to use the # more recent interface ("threads" is a list of RangesMatrix objects) # while supporting use with older so3g (where "threads" is a single # 3-d RangesMatrix). def wrap_ranges(ranges): # Run this on the "threads" result returned from so3g thread # assignement routines, before storing the result internally. if _so3g_ivals_format() == 1: return [ranges] else: return ranges def unwrap_ranges(ranges): # Run this on the internally stored threads object before passing # it to so3g projection routines. if _so3g_ivals_format() == 1: assert len(ranges) == 1, "Old so3g only supports simple (1-bunch) thread ranges, but got thread ranges with shape %s" % (str(ranges.shape)) return ranges[0] else: return ranges def _so3g_ivals_format(): projclass = so3g.proj.Projectionist if not hasattr(projclass, '_ivals_format'): return 1 else: return projclass._ivals_format def _get_interpol_args(interpol): if _so3g_ivals_format() >= 2: return {'interpol': interpol} assert interpol in [None, "nn", "nearest"], "Old so3g does not support interpolated mapmaking" return {} def _infer_pix_scheme(geom): fail_err = ValueError(f"Cannot determine pix_scheme from geom {geom}") # Healpix: geom contains 'nside' attribute if hasattr(geom, "nside"): pix_scheme = "healpix" # Rectpix: geom is None, an enmap Geometry or TileGeometry or a tuple/list that can be converted to one elif isinstance(geom, enmap.Geometry) or isinstance(geom, tilemap.TileGeometry): pix_scheme = "rectpix" elif isinstance(geom, tuple) or isinstance(geom, list): try: enmap.Geometry(*geom) pix_scheme = "rectpix" except: raise fail_err elif geom is None: pix_scheme = "rectpix" else: raise fail_err return pix_scheme def _check_compat(*uf_infos): # Given one or more "uf_info" dicts, as returned by _flatten_map, # check that the flattened arrays are pixel-correspondent, # including (for rectpix cases) the wcs. # # Raises an error if any of that doesn't pan out. On success, # returns the agreed-upon wcs (which could be None). ref_uf = uf_infos[0] for uf in uf_infos[1:]: for k in ['pix_scheme', 'tiled', 'active_tiles']: if ref_uf.get(k) != uf.get(k): raise ValueError(f"Inconsistent map structures: {uf_infos}") # Not sure how to handle broadcasting of lefter dims, so focus # on the pixel dim(s). dims_to_check = ref_uf['crit_dims'] if ref_uf['shape'][-dims_to_check:] != uf['shape'][-dims_to_check:]: raise ValueError(f"Non-broadcastable map shapes: {uf_infos}") # And the wcs. wcss = [uf.get('wcs') for uf in uf_infos] wcs_to_use = _confirm_wcs(*wcss) return wcs_to_use