from collections import OrderedDict as odict
import yaml
import os
import importlib
import logging
import numpy as np
from . import metadata
from .util import tag_substr
from .axisman import AxisManager, OffsetAxis, AxisInterface
logger = logging.getLogger(__name__)
class Context(odict):
# Sets of special handlers may be registered in this class variable, then
# requested by name in the context.yaml key "context_hooks".
hook_sets = {}
[docs]
def __init__(self, filename=None, site_file=None, user_file=None,
data=None, load_list='all'):
"""Construct a Context object. Note this is an ordereddict with a few
attributes added on.
Args:
filename (str): Path to the dataset context file. If None,
that's fine.
site_file (str): Path to the site file. If None, then the
value of SOTODLIB_SITECONFIG environment variable is used;
unless that's unset in which case the file site.yaml in
the current directory will be used.
user_file (str): Path to the user file. If None, then the
value of SOTODLIB_USERCONFIG environment variable is used;
unless that's unset in which case the file
~/.sotodlib.yaml will be used.
data (dict or None): Optional dict of context data to merge
in, after loading the site, user and main context files.
Note the data are merged in with the usual rules (so items
in data['tags'] will me merged into self['tags'].)
load_list (str or list): A list of databases to load; some
combination of 'obsdb', 'detdb', 'obsfiledb', or the
string 'all' to load all of them (default).
"""
super().__init__()
# Start with site and user config.
site_ok, site_file, site_cfg = _read_cfg(
site_file, 'SOTODLIB_SITECONFIG',
os.path.join(os.getcwd(), 'site.yaml'))
logger.info(f'Using site_file={site_file}.')
user_ok, user_file, user_cfg = _read_cfg(
user_file, 'SOTODLIB_USERCONFIG',
os.path.expanduser('~/.sotodlib.yaml'))
logger.info(f'Using user_file={user_file}.')
self.update(site_cfg)
self.update_context(user_cfg)
ok, full_filename, context_cfg = _read_cfg(filename)
if filename is not None and not ok:
raise RuntimeError(
'Could not load requested context file %s' % filename)
logger.info(f'Using context_file={full_filename}.')
self.update_context(context_cfg)
# Update with anything the user passed in.
if data is not None:
self.update_context(data)
self.site_file = site_file
self.user_file = user_file
self.filename = full_filename
self.obsdb = None
self.detdb = None
self.obsfiledb = None
self.obs_detdb = None
for to_import in self.get('imports', []):
importlib.import_module(to_import)
# Activate the requested hook set
if self.get('context_hooks'):
self._hooks = self.hook_sets[self['context_hooks']]
else:
self._hooks = {}
# Check-default 'tags' dict.
self['tags'] = self._get_warn_missing('tags', {})
# Perform recursive substitution on strings defined in tags.
tag_substr(self, self['tags'])
# Load basic databases.
self.reload(load_list)
# Call a post-processing hook before returning to user?
self._call_hook('on-context-ready')
def _call_hook(self, hook_key, *args, **kwargs):
hook_func = self._hooks.get(hook_key)
if hook_func is None:
return
logger.info('Calling hook for %s: %s' % (hook_key, hook_func))
hook_func(self, *args, **kwargs)
def _get_warn_missing(self, k, default=None):
if not k in self:
logger.warning(f'Key "{k}" not present in context.')
return default
return self[k]
def update_context(self, new_stuff):
appendable = ['metadata']
mergeable = ['tags']
for k, v in new_stuff.items():
if k in appendable and k in self:
self[k].extend(v)
elif k in mergeable and k in self:
self[k].update(v)
else:
self[k] = v
[docs]
def reload(self, load_list='all'):
"""Load (or reload) certain databases associated with this dataset.
(Note we don't load any per-observation metadata here.)
"""
# Metadata support databases.
for key, cls in [('detdb', metadata.DetDb),
('obsdb', metadata.ObsDb),
('obsfiledb', metadata.ObsFileDb)]:
if (load_list == 'all' or key in load_list) and key in self:
db_file = self[key]
if not db_file.startswith('/'):
# Relative to context file.
db_file = os.path.join(os.path.split(self.filename)[0], db_file)
db_file = os.path.abspath(db_file)
logger.info(f'Loading {key} from {self[key]} -> {db_file}.')
try:
db = cls.from_file(db_file, force_new_db=False)
except Exception as e:
logger.error(f'DB failure when loading {key} from {self[key]} -> {db_file}\n')
raise e
setattr(self, key, db)
# The metadata loader.
if load_list == 'all' or 'loader' in load_list:
self.loader \
= metadata.SuperLoader(self, obsdb=self.obsdb)
[docs]
def get_obs(self,
obs_id=None,
dets=None,
samples=None,
filename=None,
detsets=None,
meta=None,
ignore_missing=None,
on_missing=None,
free_tags=None,
no_signal=None,
loader_type=None,
):
"""Load TOD and supporting metadata for some observation.
Most arguments to this function are also accepted by (and in
fact passed directly to) :func:`get_meta`, but are documented
here.
Args:
obs_id (multiple): The observation to load (see Notes).
dets (list, array, dict or ResultSet): The detectors to
read. If None, all dets will be read.
samples (tuple of ints): The start and stop sample indices.
If None, read all samples. (Note that some loader
functions might not support this argument.)
filename (str): The path to a file to load, instead of using
obs_id. It is still required that this file appear in the
obsfiledb, but this shortcut will automatically determine
the obs_id and the detector and sample range selections
that correspond to this single file.
detsets (list, array): The detsets to read (with None
equivalent to requesting all detsets).
meta (AxisManager): An AxisManager returned by get_meta
(though possibly with additional axis restrictions
applied) to use as a starting point for detector selection
and sample range. (This will eventually be passed back to
get_meta in the meta= argument, to fill in any missing
metadata fields.)
free_tags (list): Strings to match against the
obs_colon_tags fields for detector restrictions.
ignore_missing (bool): If True, don't fail when a metadata
item can't be loaded, just try to proceed without it.
on_missing (dict): If a metadata entry has a label that
matches a key in this dict, the corresponding value in
this dict will override the on_missing setting from the
metadata entry.
no_signal (bool): If True, the .signal will be set to None.
This is a way to get the axes and pointing info without
the (large) TOD blob. Not all loaders may support this.
loader_type (str): Name of the registered TOD loader
function to use (this will override whatever is specified
in context.yaml).
Notes:
It is acceptable to pass the ``obs_id`` argument by position
(first), but all other arguments should be passed by
keyword.
The ``obs_id`` can be any of the following:
- a string -- this is interpreted as the literal obs_id as
used in the ObsDb and ObsFileDb. Note however that this
string may include "free tags" (see below).
- a dict -- this is understood to be an ObsDb record, and
the value under key 'obs_id' will be used as the obs_id
(the other items will be ignored).
- an AxisManager -- this is a short-hand for passing an
object through meta=... . I.e., ``get_obs(obs_is=axisman)``
is treated the same way as ``get_obs(obs_id=None, meta=axisman)``.
Detector subselection is achieved through the ``dets``
argument. If this is a dict, the keys must all be fields
appearing in det_info. Typically det_info will include at
least readout_id and detset (this is the indexing
information from ObsFileDb). Some examples are::
dets={'readout_id': ['det_00', 'det_01']}
dets={'detset': 'wafer21'}
dets={'band': ['f090']}
dets={'detset': ['wafer21', 'wafer22'], 'band': ['f150']}
Each value in ``dets`` can be a single item, or a list or
numpy array of items. The keys may include an optional
'dets:' prefix.
If ``dets`` is passed as a list or numpy array, that is
equivalent to passing that value in through a dict with key
'readout_id'; e.g.::
dets=['det_00', 'det_01']
You can instead pass a "det_info" ResultSet directly into
the dets argument; that is equivalent to passing
dets=det_info['readout_id']. This is to accomodate the
following sort of pattern::
det_info = context.get_det_info(obs_id)
det_info = det_info.subset(rows=(det_info['band'] == 'f090'))
tod = context.get_obs(obs_id, dets=det_info)
The sample range to load is determined by the samples
argument. Use Python start/stop indexing; for example
samples=(0, -2) will try to read all but the last two
samples and samples=(100, None) will read all samples except
the first 100.
When passing in ``meta``, the obs_id, detector list, and
sample range will be extracted from that object. It is an
error to also specify ``obs_id``, ``dets``, ``samples``,
``filename``, or ``free_ags`` (but this could change).
"""
meta = self.get_meta(obs_id=obs_id, dets=dets, samples=samples,
filename=filename, detsets=detsets, meta=meta,
free_tags=free_tags, ignore_missing=ignore_missing,
on_missing=on_missing)
# Use the obs_id, dets, and samples from meta.
obs_id = meta['obs_info']['obs_id']
dets = list(meta.det_info['readout_id'])
if samples is None and 'samps' in meta:
samples = (meta.samps.offset, meta.samps.offset + meta.samps.count)
# Make sure standard obsloaders are registered ...
from ..io import load as _
# Load TOD.
if loader_type is None:
loader_type = self.get('obs_loader_type', 'default')
loader_func = OBSLOADER_REGISTRY[loader_type] # Register your loader?
aman = loader_func(self.obsfiledb, obs_id, dets=dets,
samples=samples, no_signal=no_signal)
if aman is None:
return meta
if meta is not None:
if 'det_info' in aman and 'det_info' in meta:
# If the loader added det_info, then perform a special
# merge. Duplicate keys should be avoided, because
# checking the values are the same is annoying.
_det_info = aman['det_info']
del aman['det_info']
_det_info.restrict_axes([meta.dets])
for k in meta.det_info._fields:
if k in _det_info._fields:
try:
check = np.all([meta['det_info'][k] ==_det_info[k]])
if check:
_det_info.move(k, None)
continue
except Exception as e:
pass
logger.error(f'Key "{k}" is present in det_info returned by '
f'observation loader as well as in metadata '
f'databases; The two versions are not '
f'comparable. dropping the loader version.')
_det_info.move(k, None)
meta.det_info.merge(_det_info)
aman.merge(meta)
return aman
[docs]
def get_meta(self,
obs_id=None,
dets=None,
samples=None,
filename=None,
detsets=None,
meta=None,
free_tags=None,
check=False,
ignore_missing=False,
on_missing=None,
det_info_scan=False):
"""Load supporting metadata for an observation and return it in an
AxisManager.
The arguments shared with :func:`get_obs` (``obs_id``,
``dets``, ``samples``, ``filename``, ``detsets``, ``meta``,
``free_tags``, ``ignore_missing``, ``on_missing``) have the
same meaning as in that function and are treated in the same
way.
Args:
check (bool): If True, run in a check mode where an attempt
is made to load each metadata entry, but the results are
not kept and instead the function returns a report on what
entries could / could not be loaded
det_info_scan (bool): If True, only process the metadata
entries that explicitly modify det_info.
Returns:
AxisManager with a .dets LabelAxis and .det_info and
.obs_info entries. If samples is specified, or if any
metadata loads triggered its creation, then the .samps
OffsetAxis is also created.
Notes:
When ``meta`` is passed in, it will be used to figure out
the obs_id and detector and sample selections; however a new
metadata AxisManager is returned. Users should not rely on
this; future improvements might modify meta in place, and
try to re-use entries already present rather than loading
them a second time.
"""
def _warn_conflict(preamble, **kwargs):
fails = {k: v for k, v in kwargs.items() if v is not None}
if len(fails):
logger.warning(f'{preamble}: arguments ignored: {fails}')
free_tag_fields = self.get('obs_colon_tags', [])
free_tags = list(free_tags) if free_tags else []
if filename is not None:
_warn_conflict(
'Passing filename={filename} to get_obs with incompatible other args',
obs_id=obs_id, detsets=detsets, samples=samples)
# Resolve this to an obs_id / detset combo.
info = self.obsfiledb.lookup_file(filename, resolve_paths=True)
obs_id = info['obs_id']
detsets = info['detsets']
if info['sample_range'] is None or None in info['sample_range']:
samples = None
logger.warning('Due to incomplete ObsFileDb info, passing filename=... '
'will cause *all* files for the detset covered '
'by that file to be loaded.')
else:
samples = info['sample_range']
# Handle some special cases for obs_id; at the end of this
# checks and conversion, obs_id should be a string.
if isinstance(obs_id, AxisManager):
# Just move that to the meta argument.
_warn_conflict(
'Argument obs_id=<AxisManager> is incompatible with other args',
meta=meta)
obs_id, meta = None, obs_id
elif isinstance(obs_id, dict):
obs_id = obs_id['obs_id'] # You passed in a dict.
elif isinstance(obs_id, str):
# If the obs_id has colon-coded free tags, extract them.
if ':' in obs_id:
tokens = obs_id.split(':')
obs_id = tokens[0]
free_tags.extend(tokens[1:])
if meta is not None:
_warn_conflict(
'Argument meta=<AxisManager> causes det/sample args to be ignored',
samples=samples, dets=dets, detsets=detsets)
obs_id = meta.obs_info['obs_id']
dets = {'dets:readout_id': list(meta.dets.vals)}
if 'samps' in meta:
samples = meta.samps.offset, meta.samps.offset + meta.samps.count
# Call a hook after preparing obs_id but before loading obs
self._call_hook('before-use-detdb', obs_id=obs_id)
# Identify whether we should use a detdb or an obs_detdb
# If there is an obs_detdb, use that.
# Otherwise, use whatever is in self.detdb, even if that is None.
if self.obs_detdb is not None:
detdb = self.obs_detdb
else:
detdb = self.detdb
# Initialize det_info, starting with detdb.
det_info = None
if detdb is not None:
det_info = detdb.props()
# Backwards compatibility -- add "readout_id" if not found.
if 'readout_id' not in det_info.keys:
logger.warning('DetDb does not contain "readout_id"; aliasing from "name".')
det_info.merge(metadata.ResultSet(
['readout_id'], [(name,) for name in det_info['name']]))
# Incorporate detset info from obsfiledb.
detsets_info = self.obsfiledb.get_det_table(obs_id)
det_info = metadata.merge_det_info(det_info, detsets_info)
# Make the request for SuperLoader
request = {'obs:obs_id': obs_id}
if detsets is not None:
request['dets:detset'] = detsets
# Convert dets argument to request entry(s)
if isinstance(dets, dict):
for k, v in dets.items():
if not k.startswith('dets:'):
k = 'dets:' + k
if k in request:
raise ValueError(f'Duplicate specification of dets field "{k}"')
request[k] = v
elif isinstance(dets, metadata.ResultSet):
request['dets:readout_id'] = dets['readout_id']
elif hasattr(dets, '__getitem__'):
# lists, tuples, arrays ...
request['dets:readout_id'] = dets
elif dets is not None:
# Try a cast ...
request['dets:readout_id'] = list(dets)
metadata_list = self._get_warn_missing('metadata', [])
meta = self.loader.load(metadata_list, request, det_info=det_info, check=check,
free_tags=free_tags, free_tag_fields=free_tag_fields,
det_info_scan=det_info_scan, ignore_missing=ignore_missing,
on_missing=on_missing)
if check:
return meta
if samples is not None:
if 'samps' in meta:
meta.restrict('samps', slice(*samples))
else:
start, stop = samples
assert(start >= 0 and stop >= 0) # This could be loosened using obsfiledb
axm = AxisManager(OffsetAxis('samps', stop - start, start, obs_id))
meta = meta.merge(axm)
return meta
[docs]
def get_det_info(self,
obs_id=None,
dets=None,
samples=None,
filename=None,
detsets=None,
meta=None,
free_tags=None,
on_missing=None):
"""Pass all arguments to :func:`get_meta(det_info_scan=True)`, and
then return only the det_info, as a ResultSet.
"""
if meta is None:
meta = self.get_meta(obs_id=obs_id, dets=dets, samples=samples,
filename=filename, detsets=detsets, free_tags=free_tags,
on_missing=on_missing, det_info_scan=True)
# Convert
def _unpack(aman):
items = []
for k in aman.keys():
if isinstance(aman[k], AxisManager):
sub_items = _unpack(aman[k])
for _k, _c in sub_items:
items.append((f'{k}.{_k}', _c))
elif isinstance(aman[k], AxisInterface):
pass
else:
items.append((k, aman[k]))
return items
items = _unpack(meta.det_info)
return metadata.ResultSet([k for k, v in items],
zip(*[v for k, v in items]))
def _read_cfg(filename=None, envvar=None, default=None):
    """Load a YAML file.  If filename is None, use the filename specified
    in the environment variable called envvar.  If that is not defined
    or decodes to None or an empty string, use the filename specified
    in default.

    Args:
      filename (str or None): Path to the file to load.
      envvar (str or None): Name of an environment variable to consult
        for the path, if filename is None.
      default (str or None): Path to use if neither of the above
        yields one.

    Returns (ok, full_path, data) where ok is a boolean indicating
    whether the file at full_path was found on the file-system,
    full_path is the full path to the resolved filename (or None if
    not resolved), and data is the decoded mapping (an empty
    OrderedDict if the file was not found, or was empty).

    """
    if filename is None and envvar is not None:
        filename = os.getenv(envvar)
    if not filename:
        # Covers both an unset and an empty-valued setting.
        filename = default
    if filename is None:
        return False, None, odict()

    filename = os.path.abspath(filename)
    if not os.path.exists(filename):
        return False, filename, odict()

    # Use a context manager so the file handle is closed promptly.
    with open(filename, 'r') as fin:
        data = yaml.safe_load(fin)
    if data is None:
        # An empty YAML document decodes to None; callers expect a
        # mapping.
        data = odict()
    return True, filename, data
def obsloader_template(db, obs_id, dets=None, prefix=None, samples=None,
                       no_signal=None,
                       **kwargs):
    """This function is here to document the API for "obsloader" functions
    used by the Context system.  "obsloader" functions are used to
    load time-ordered detector data (rather than supporting metadata)
    from file archives, and return an AxisManager.

    Args:
      db (ObsFileDB): The database supporting the data files.
      obs_id (str): The obs_id (as recognized by ObsFileDb).
      dets (list of str): The dets to load.  If None, all dets are
        loaded.  If an empty list, ancillary data for the observation
        is still loaded.
      prefix (str): The root address of the data files, if not already
        known to the ObsFileDb.  (This is passed through to ObsFileDb
        prefix= argument.)
      samples (tuple): The (start, end) indices of samples which
        should be loaded.  If start is None, 0 is used.  If end is
        None, sample_count is used.  Passing None is equivalent to
        passing (None, None).
      no_signal (bool): If True, loader should avoid reading signal
        data (if possible) and should set .signal=None in the output.
        Passing None is equivalent to passing False.

    Notes:
      This interface is subject to further extension.  When possible
      such extensions should take the form of optional arguments,
      whose default value is None and which are not activated except
      when needed.  This permits existing loaders to future-proof
      themselves by including ``**kwargs`` in the function signature
      but raising an exception if kwargs contains anything strange.
      See the body of this example function for template code to
      reject unexpected kwargs.

    Returns:
      An AxisManager with the data.

    Raises:
      RuntimeError: if any entry in kwargs has a value other than
        None.
      NotImplementedError: otherwise, always; this is only a
        template.

    """
    # Extra keyword arguments are tolerated only while they are None,
    # i.e. while the feature they would activate is not requested.
    if any(v is not None for v in kwargs.values()):
        raise RuntimeError(
            f"This loader function does not understand these kwargs: {kwargs}")
    raise NotImplementedError("This is just a template function.")
#: OBSLOADER_REGISTRY will be accessed by the Context system to load
#: TOD.  Context.get_obs looks up the loader function here by name;
#: the name is its ``loader_type`` argument or, if that is None, the
#: context entry "obs_loader_type" (falling back to 'default').  The
#: function obsloader_template, in this module, shows the signature
#: and describes the interface.
OBSLOADER_REGISTRY = {}