# Source code for sotodlib.site_pipeline.util

import math
import os
import inspect
import logging
import time
import sys
import argparse

from astropy import units as u

from .. import core

class ArchivePolicy:
    """Storage policy assistance.  Helps to determine the HDF5
    filename and dataset name for a result.
    """

    @staticmethod
    def from_params(params):
        """Construct a policy object from a parameter dict.

        params['type'] selects the class: 'simple' yields an
        ArchivePolicy, 'directory' yields a DirectoryArchivePolicy.
        Any other value raises ValueError.
        """
        policy_type = params['type']
        if policy_type == 'simple':
            return ArchivePolicy(**params)
        if policy_type == 'directory':
            return DirectoryArchivePolicy(**params)
        raise ValueError('No handler for "type"="%s"' % policy_type)

    def __init__(self, **kwargs):
        # All products are stored in this single HDF5 file.
        self.filename = kwargs['filename']

    def get_dest(self, product_id):
        """Returns (hdf_filename, dataset_addr).
        """
        return self.filename, product_id
class DirectoryArchivePolicy:
    """Storage policy for stuff organized directly on the filesystem.
    """

    def __init__(self, **kwargs):
        # Base directory under which all products live.
        self.root_dir = kwargs['root_dir']
        # Format string; expanded with keyword args in get_dest.
        self.pattern = kwargs['pattern']

    def get_dest(self, **kw):
        """Returns full path to destination directory.
        """
        leaf = self.pattern.format(**kw)
        return os.path.join(self.root_dir, leaf)
def parse_quantity(val, default_units=None):
    """Convert an expression with units into an astropy Quantity.

    Args:
      val: the expression (see Notes).
      default_units: the units to assume if they are not provided in
        val.

    Returns:
      The astropy Quantity decoded from the argument.  Note the
      quantity is converted to the default_units, if they are
      provided.

    Raises:
      ValueError: if val is a bare number and default_units is None,
        or if val is of a type that cannot be decoded.

    Notes:
      The default_units, if provided, should be "unit-like", by which
      we mean it is either:

      - An astropy Unit.
      - A string that astropy.units.Unit() can parse.

      The val can be any of the following:

      - An astropy Quantity (passed through, apart from conversion to
        default_units).
      - A tuple (x, u) or list [x, u], where x is a float and u is
        unit-like.
      - A string (x), where x can be parsed by astropy.units.Quantity.
      - A float (x), but only if default_units is not None.

    Examples:
      >>> parse_quantity('100 arcsec')
      <Quantity 100. arcsec>
      >>> parse_quantity([12., 'deg'])
      <Quantity 12. deg>
      >>> parse_quantity('15 arcmin', 'deg')
      <Quantity 0.25 deg>
      >>> parse_quantity(100, 'm')
      <Quantity 100. m>

    """
    if default_units is not None:
        default_units = u.Unit(default_units)

    if isinstance(val, u.Quantity):
        # Already a Quantity; at most a unit conversion is needed below.
        q = val
    elif isinstance(val, str):
        q = u.Quantity(val)
    elif isinstance(val, (list, tuple)):
        q = val[0] * u.Unit(val[1])
    elif isinstance(val, (float, int)):
        if default_units is None:
            raise ValueError(
                f"Cannot decode argument '{val}' without default_units.")
        q = val * default_units
    else:
        # Previously an unhandled type fell through and triggered an
        # UnboundLocalError on q; fail with a clear message instead.
        raise ValueError(
            f"Cannot decode argument of type {type(val).__name__}: {val!r}")

    if default_units is not None:
        q = q.to(default_units)
    return q
def _filter_dict(d, bad_keys=['_stop_here']): if not isinstance(d, dict): return d # Support for lookup_conditional return {k: v for k, v in d.items() if k not in bad_keys}
def lookup_conditional(source, key, tags=None, default=KeyError):
    """Lookup a value in a dict, with the possibility of descending
    through nested dictionaries using tags provided by the user.

    This returns source[key], unless source[key] is a dict, in which
    case the tags (a list of strings) are each tested in that dict to
    see if they lead to a sub-setting.  For example, if the source
    dictionary is {'number': {'a': 1, 'b': 2}} and the user requests
    key 'number' with tags=['a'], the returned value is 1.

    To have a dict returned literally rather than descended into,
    include a dummy key '_stop_here' (removed from the result before
    it is returned).  The key '_default' always matches, even when no
    tag does, and its value also becomes the fallback if deeper
    recursion fails to resolve.

    Args:
      source (dict): The parameter tree to search.
      key (str): The key to terminate the search on.
      tags (list of str or None): tags that may be auto-descended.
      default: Value to return if the search does not resolve.  The
        special value KeyError will instead cause a KeyError to be
        raised if the search is not resolved.

    Examples::

        source = {'my_param': {'_default': 100., 'f150': 90.}}
        lookup_conditional(source, 'my_param')                 # -> 100.
        lookup_conditional(source, 'my_param', tags=['f090'])  # -> 100.
        lookup_conditional(source, 'my_param', tags=['f150'])  # -> 90.
        lookup_conditional(source, 'my_other_param')           # KeyError!
        lookup_conditional(source, 'my_other_param', default=0)  # -> 0
        # Note '_default' takes precedence over the default argument:
        lookup_conditional(source, 'my_param', default=0)      # -> 100.

    """
    if tags is None:
        tags = []
    if key is None:
        # Recursion branch: source is a candidate result to descend.
        if not isinstance(source, dict):
            return source
        if '_stop_here' in source:
            # Explicit terminator -- return this dict itself (cleaned).
            return _filter_dict(source)
        if '_default' in source:
            # A local _default supersedes the inherited fallback.
            default = _filter_dict(source['_default'])
        for tag in tags:
            if tag in source:
                return lookup_conditional(
                    source[tag], None, tags=tags, default=default)
        return default
    # Entry branch: resolve key, then decide whether to raise.
    result = default
    if key in source:
        result = lookup_conditional(
            source[key], None, tags=tags, default=default)
    if inspect.isclass(result) and issubclass(result, Exception):
        raise result(f"Failed to find key '{key}' in {source}")
    return result
class _ReltimeFormatter(logging.Formatter): def __init__(self, *args, t0=None, **kw): super().__init__(*args, **kw) if t0 is None: t0 = time.time() self.start_time = t0 def formatTime(self, record, datefmt=None): if datefmt is None: datefmt = '%8.3f' return datefmt % (record.created - self.start_time)
def init_logger(name, announce=''):
    """Configure and return a logger for site_pipeline elements.  It is
    disconnected from general sotodlib (propagate=False) and displays
    relative instead of absolute timestamps.
    """
    logger = logging.getLogger(name)
    # Attach a handler only on the first call for this name.
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        formatter = _ReltimeFormatter('%(asctime)s: %(message)s (%(levelname)s)')
        handler.setLevel(logging.INFO)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        # Announce the absolute time corresponding to t=0 of the
        # relative timestamps, with millisecond precision.
        secs, frac = formatter.start_time // 1, formatter.start_time % 1
        stamp = (time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(secs))
                 + (',%03d' % (frac * 1000)))
        logger.info(f'{announce}Log timestamps are relative to {stamp}')
    logger.propagate = False
    logger.setLevel(logging.DEBUG)
    return logger
def main_launcher(main_func, parser_func, args=None):
    """Launch an element's main entry point function, after generating
    a parser and executing it on the command line arguments (or args
    if it is passed in).

    Args:
      main_func: the main entry point for a pipeline element.
      parser_func: the argument parser generation function for a
        pipeline element.
      args (list of str): arguments to parse (default is None, which
        will lead to sys.argv[1:]).

    Returns:
      Whatever main_func returns.

    """
    if args is None:
        args = sys.argv[1:]
    # Parsed options are forwarded to main_func as keyword arguments.
    parsed = parser_func().parse_args(args=args)
    return main_func(**vars(parsed))