Source code for sotodlib.site_pipeline.update_obsdb_ancil

"""update-obsdb-ancil

This script uses io.ancil modules to update archives of reduced
ancillary data and to update an obsdb with quantities computed based
on those ancillary data.

The entry point is :func:`main` but the different command functions
can be called directly.

"""

import argparse
import itertools
import logging
import time
from typing import Optional
import yaml

from sotodlib import core
from sotodlib.site_pipeline.utils.logging import init_logger
from sotodlib.site_pipeline.update_obsdb import main as uodb_main
from sotodlib.io import ancil


logger = init_logger('update-obsdb-ancil', 'update-obsdb-ancil: ')

DAY = 86400
DEFAULT_CONFIG = {
    'lookback_time':  7 * DAY,
    'datasets': {},
}
DEFAULT_CONFIG_FILENAME = 'ancil.yaml'


def _engines_iter(cfg, args_datasets):
    if args_datasets is None or len(args_datasets) == 0:
        args_datasets = cfg['datasets'].keys()
    for k in args_datasets:
        yield (k, ancil.utils.get_engine(k, cfg['datasets'][k]))


def _get_config(config_file):
    cfg = DEFAULT_CONFIG | yaml.safe_load(open(config_file, 'rb'))
    datasets = cfg.get('datasets')
    if datasets is None:
        return cfg
    for broadcast_key in ['data_prefix']:
        if (val := cfg.get(broadcast_key)) is None:
            continue
        for ds in datasets.values():
            if ds.get(broadcast_key) is None:
                ds[broadcast_key] = val
    return cfg


[docs] def import_records(config_file, time_range=None): cfg = _get_config(config_file) target_db = cfg['target_obsdb'] if cfg.get('upstream_obsdb') is None: raise RuntimeError("No upstream_obsdb is configured.") upstream_db = cfg['upstream_obsdb'] logger.info(f'Checking for new records in {upstream_db} ...') db = core.metadata.ObsDb(target_db) db_up = core.metadata.ObsDb(upstream_db) report = core.metadata.obsdb.diff_obsdbs(db, db_up) if not report['different']: logger.info(' ... databases are in sync.') return if not report['patchable']: logger.error(' ... upstream and target have irreconcilable differences.') raise RuntimeError("Target obsdb cannot be patched to match upstream.") pd = report['patch_data'] if len(pd['remove_obs']): raise RuntimeError("obsdb specifies obs removal.") if len(pd['remove_tags']): raise RuntimeError("obsdb specifies tags removal.") logger.info(f'Adding records -- {len(pd["new_obs"])} new obs ' f'and {len(pd["new_tags"])} new tags ...') core.metadata.obsdb.patch_obsdb(pd, db) logger.info(' ... done')
[docs] def update_base_data(config_file, time_range=None, datasets=None, full_scan=False, verbose=None): cfg = _get_config(config_file) if full_scan: logger.info(f'Performing full update of base data.') elif time_range is None: now = time.time() time_range = (now - cfg['lookback_time'], now) logger.info(f'Updating base data over time range {time_range}.') for dataset, engine in _engines_iter(cfg, datasets): logger.info(f'Updating base data for {dataset}...') engine.update_base(time_range) logger.info(f'Finished updating base data.')
[docs] def update_obsdb(config_file, time_range=None, datasets=None, redo=False, verbose=None): cfg = _get_config(config_file) show_pbar = verbose is None or verbose > 0 logger.info(f'Updating obsdb') for dataset, engine in _engines_iter(cfg, datasets): logger.info(f'Processing {dataset}...') engine.register_friends(cfg['datasets']) # Ensure target columns are present in db. obsdb = core.metadata.ObsDb(cfg['target_obsdb']) engine.obsdb_check(obsdb, create_cols=True) # Find records that need to be updated. q = engine.obsdb_query(time_range=time_range, redo=redo) logger.info(f'Query for records to update is: "{q}"') recs = obsdb.query(q) del obsdb logger.info(f' ... identified {len(recs)} items to update.') if len(recs) == 0: continue recs_it = iter(recs) while rec_bunch := list(itertools.islice(recs_it, 200)): results = engine.collect(rec_bunch, for_obsdb=True, show_pbar=show_pbar) logger.info(f' ... updating obsdb.') obsdb = core.metadata.ObsDb(cfg['target_obsdb']) for rec, result in zip(rec_bunch, results): logger.debug(f"{rec['obs_id']} : {result}") obsdb.update_obs(rec['obs_id'], result, commit=False) obsdb.conn.commit() del obsdb
[docs] def get_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() def add_args(p, *args): if '--config-file' in args: p.add_argument('--config-file', '-c', default=DEFAULT_CONFIG_FILENAME, help="Path to config file.") if '--dataset' in args: p.add_argument( '--dataset', '-d', action='append', help=""" Limit processing to only the specified dataset. If passed multiple times, the chosen datasets are processed in the corresponding order.""") if '--time-range' in args: p.add_argument( '--time-range', nargs=2, type=float, help=""" Restrict processing to items falling between the two provided unix timestamps.""") if '--time-range' in args or '--lookback-days' in args: p.add_argument( '--lookback-days', type=float, help=""" As an alternative to --time-range, restrict processing to items falling between the present time and the specified number of days in the past.""") parser.add_argument('--verbose', '-v', action='count', help= "Pass one or more times to increase logging verbosity.") sps = parser.add_subparsers(title='Commands', dest='command') p = sps.add_parser('update-base-data', help="""Update the base data for an ancillary data archive. This will typically trigger loading of the data from the source, and then reduction and storage in the local archive.""") add_args(p, '--config-file', '--dataset', '--time-range') p.add_argument('--full-scan', action='store_true') p = sps.add_parser('update-obsdb', help=""" Update obsdb records based on data already in the archives. This is typically run after refreshing the base data with update-base-data.""") add_args(p, '--config-file', '--dataset', '--time-range') p.add_argument('--redo', action='store_true') p = sps.add_parser('update-books', help=""" Update obsdb *and* obsfiledb by scanning data directories for new books. This calls site_pipeline.update_obsdb.""") add_args(p, '--config-file', '--lookback-days') p.add_argument('--redo', action='store_true') p = sps.add_parser('import-records', help="""Prepare the target obsdb for updates by pulling in rows from a primary "upstream" obsdb.""") add_args(p, '--config-file') p = sps.add_parser('check', help="""Read the config file and test datasets by attempting to instantiate them. This should not normally touch the underlying data or attempt to load new data from base sources.""") add_args(p, '--config-file', '--dataset', '--time-range') p = sps.add_parser('test', help="""Perform the obsdb data reduction on a specified obs_id. Results are not saved to obsdb.""") add_args(p, '--config-file', '--dataset') p.add_argument('--query', '-q', help= """ Use this obsdb query to select records for computation, instead of passing specific obs_id.""") p.add_argument('--compare', action='store_true', help= """ If passed, the computed values will be compared to values already present in the obsdb.""" ) p.add_argument('obs_id', nargs='*', help= "obs_id (pass multiple) to compute results for.") p = sps.add_parser('run-job', help="""Run a job defined in the config file.""") add_args(p, '--config-file') p.add_argument('job_name', help= " Name of job to run (matched to entry in config['job_defs']).") return parser
[docs] def main( command : Optional[str]=None, verbose : Optional[bool]=None, config_file : Optional[str]=None, time_range : Optional[list]=None, dataset : Optional[list]=None, lookback_days : Optional[float]=None, query : Optional[str]=None, obs_id : Optional[list]=None, full_scan : Optional[bool]=None, redo : Optional[bool]=None, compare : Optional[bool]=None, job_name : Optional[str]=None, chain_count: Optional[int]=0, ): """Entry point for CLI or job runner. Some parameters are only used by some commands. """ if verbose is None: verbose = 0 if verbose > 0: ancil.logger.setLevel(logging.DEBUG) logger.handlers[0].setLevel(logging.DEBUG) # Try to prevent job loops... assert chain_count < 16 if time_range is None: if lookback_days is not None: now = time.time() time_range = (now - lookback_days * DAY, now) elif lookback_days is not None: raise RuntimeError("Do not pass both time_range and lookback_days.") if command == 'update-base-data': update_base_data(config_file, datasets=dataset, time_range=time_range, full_scan=full_scan, verbose=verbose) elif command == 'update-obsdb': update_obsdb(config_file, datasets=dataset, time_range=time_range, redo=redo, verbose=verbose) elif command == 'update-books': uodb_main(config_file, recency=lookback_days, fastwalk=True, overwrite=redo) elif command == 'import-records': import_records(config_file) elif command == 'check': # Just trial load the config, processors. print(f'Loading config file {config_file} ...') cfg = _get_config(config_file) for dataset, engine in _engines_iter(cfg, dataset): print(f' dataset={dataset}') for k, v in engine.check_base().items(): print(f' {k}: {v}') elif command == 'test': # Run on some obs_ids. cfg = _get_config(config_file) obsdb = core.metadata.ObsDb(cfg['target_obsdb']) if query: assert len(obs_id) == 0, "User passed --query and obs_id" items = list(iter(obsdb.query(query))) else: items = [] for o in obs_id: if (oi := obsdb.get(o)) is None: print(f'No obsdb record for "{o}", skipping test.') else: items.append(oi) results = [{} for r in items] for dataset, engine in _engines_iter(cfg, dataset): print(dataset) engine.register_friends(cfg['datasets']) ei = iter(engine.getter(items, results)) for o, r in zip(items, results): _r = next(ei) r.update(_r) print(o['obs_id'], _r) if compare: ex = obsdb.get(o['obs_id']) for k1, k2 in engine._obsdb_map().items(): print(f' {k2:<20s}', ex.get(k2), r[k1]) print() print() elif command == 'run-job': cfg = _get_config(config_file) for job_def in cfg.get('job_defs', []): if job_def['name'] == job_name: break else: raise RuntimeError(f"Job '{job_name}' not found in {config_file} job_defs entry.") fail_count = 0 for step in job_def['steps']: print('Step ...') step_cfg = {'config_file': config_file} | step ignore_fail = step_cfg.pop('ignore_fail', False) carry_fail = step_cfg.pop('carry_fail', False) soldier_on = ignore_fail or carry_fail try: main(**step_cfg, chain_count=chain_count+1) except Exception as e: logger.error(f"Error on step {step}") if soldier_on: logger.warn(f"Step raised error (carry={carry_fail}); " "will continue with next steps.") logger.warn(f"Exception was: {e}") if carry_fail: fail_count += 1 else: raise e if fail_count: raise RuntimeError(f"Job carry-failed on {fail_count} errors.") else: raise RuntimeError(f"Invalid command: {command}")