import os
import logging
import time
import sys
import copy
import yaml
import numpy as np
import h5py
import traceback
import inspect
from pathlib import Path
import re
from tqdm import tqdm
from sotodlib.hwp import hwp_angle_model
from sotodlib.coords import demod as demod_mm
from sotodlib.tod_ops import t2pleakage
from sotodlib.core.flagman import has_any_cuts
from sotodlib.site_pipeline.jobdb import JState
from sotodlib.core.util import H5ContextManager
from .. import core
from . import Pipeline
[docs]
class PreprocessErrors:
    """Namespace of the error identifiers reported by the preprocessing
    functions, together with a helper that renders an exception for
    reporting.
    """
    LoadSuccess = "load_success"
    GetGroupsError = "get_groups_error"
    MetaDataError = "get_meta_data_error"
    NoDetsRemainError = "no_dets_remain_error"
    NoGroupOverlapError = "no_group_overlap_error"
    MultilayerPipelineLoadError = "multilayer_load_and_preprocess_error"
    SingleLayerPipelineLoadError = "single_layer_load_and_preprocess_error"
    PipeLineRunError = "pipeline_run_error"
    InitPipeLineRunError = "init_pipeline_run_error"
    ProcPipeLineRunError = "proc_pipeline_run_error"
    PipeLineStepError = "pipeline_step_error"
    NoInitDbError = "no_init_db_error"
    GroupOutputError = "group_output_error"
    ExecutorFutureError = "executor_future_error"
    SkipMissingError = "skip_missing_error"

    @classmethod
    def get_errors(cls, e):
        """Render an exception as a message and a traceback string.

        Arguments
        ----------
        e : Exception
            The exception to describe.

        Returns
        -------
        errmsg : str
            The exception type followed by its string form.
        tb : str
            The formatted traceback attached to ``e``, joined into a
            single string.
        """
        message = "{}: {}".format(type(e), e)
        frames = traceback.format_tb(e.__traceback__)
        return message, "".join(frames)
def _get_aman_encodings(encodings, field):
"""Encodings for flacarray compression."""
if (
isinstance(field, np.ndarray)
and np.issubdtype(field.dtype, np.number)
and not np.isnan(field).any()
):
encodings["type"] = "flacarray"
if np.issubdtype(field.dtype, np.floating):
if field.dtype == np.float32:
quanta = 1e-7
else:
quanta = 1e-10
encodings["args"] = {
"level": 5,
"quanta": quanta,
}
return
elif isinstance(field, core.AxisManager):
for name in field._assignments:
subfield = field[name]
encodings[name] = {}
_get_aman_encodings(encodings[name], subfield)
[docs]
def filter_preproc_runlist_by_jobdb(jdb, jclass, db, run_list, group_by,
                                    overwrite=False, logger=None):
    """Given a preprocess_tod or multilayer_preprocess_tod run list, checks
    whether each entry exists in the preprocess jobdb.

    For every ``(obs_id, group)`` entry:

    * if no job with matching tags exists, a new job is created (all new
      jobs are committed together at the end) and the entry is kept;
    * if the job is done or failed and ``overwrite`` is False (and, for done
      jobs, the entry is present in ``db``), the entry is skipped;
    * if the job is done or failed and either ``overwrite`` is True or the
      done job has no matching entry in ``db``, the entry is kept; when
      ``overwrite`` is True the job is set back to open and its error tag is
      cleared;
    * jobs in any other state are left untouched and the entry is kept.

    Arguments
    ---------
    jdb : JobManager
        The preprocessing jobdb.
    jclass : str
        The jobdb class name.
    db : ManifestDb or None
        Preprocessing database. If None, done jobs are assumed to be present
        in the database.
    run_list : list
        List of (obs_id, group) tuples.
    group_by : list
        How grouping is being done for preprocessing. Specified in the
        preprocessing config through the subobs.use entry.
    overwrite : bool
        Whether or not to overwrite entries in the preprocessing db.
    logger : PythonLogger
        A python logger. If None, a new logger is created.

    Returns
    -------
    run_list : list
        Run list with the subset of skipped entries removed.
    """
    if logger is None:
        logger = init_logger("preprocess")
    run_list_skipped = []
    failed = 0
    done = 0
    existing_jobs = jdb.get_jobs(jclass)
    # Index existing jobs by their tags, ignoring the 'error' tag so that
    # lookups only depend on the obs_id and group tags.
    tags_to_job = {
        frozenset({k: v for k, v in j.tags.items() if k != 'error'}.items()): j
        for j in existing_jobs
    }
    new_jobs = []
    for r in tqdm(run_list, total=len(run_list),
                  desc="filtering by jobdb"):
        # Build the tag set identifying this (obs_id, group) entry.
        tags = {}
        tags["obs:obs_id"] = r[0]
        for gb, g in zip(group_by, r[1]):
            tags['dets:' + gb] = g
        job = tags_to_job.get(frozenset(tags.items()), None)
        if not job:
            # No job yet: queue a new one; committed in bulk after the loop.
            tags["error"] = None
            new_jobs.append(jdb.create_job(jclass, tags=tags, commit=False,
                                           check_existing=False))
        elif job.jstate in [JState.done, JState.failed]:
            found = True
            if job.jstate == JState.done and db is not None:
                # A done job only counts as found if the db holds an entry
                # whose group values match this run-list entry.
                x = db.inspect({'obs:obs_id': r[0]})
                found = any(
                    [a[f'dets:{gb}'] for gb in group_by] == r[1]
                    for a in x
                )
            if overwrite or not found:
                with jdb.locked(job) as j:
                    # NOTE(review): the nesting below was reconstructed from a
                    # source without indentation; confirm that the error-tag
                    # reset belongs under the overwrite check.
                    if overwrite == True:
                        j.jstate = "open"
                        for _t in j._tags:
                            if _t.key == "error":
                                _t.value = None
            else:
                if job.jstate == JState.done:
                    done += 1
                elif job.jstate == JState.failed:
                    failed += 1
                run_list_skipped.append(r)
    jdb.commit_jobs(new_jobs)
    logger.info(f"skipping {done} done jobs and {failed} failed jobs from jobdb")
    run_list = [r for r in run_list if r not in run_list_skipped]
    return run_list
def reopen_failed_preproc_jobs(jdb, jclass, error=None):
    """Helper function to re-open jobs that failed with particular errors
    so that they can be re-run with the run_from_jobdb argument.

    Each selected job is set back to the open state and its ``error`` tag
    is reset to None.

    Arguments
    ---------
    jdb : JobManager or str
        The preprocessing jobdb or a path to an existing jobdb file. If a
        path is given and the file does not exist, nothing is done.
    jclass : str
        The jobdb class name.
    error : str or None
        The PreprocessError name to re-open. If None, re-open all failed jobs.
    """
    if isinstance(jdb, str):
        if not os.path.isfile(jdb):
            # No jobdb file means there is nothing to re-open.
            return
        # Imported locally: only JState is imported at module level, so the
        # name JobManager would otherwise be undefined here.
        from sotodlib.site_pipeline.jobdb import JobManager
        jdb = JobManager(sqlite_file=jdb)

    for job in jdb.get_jobs(jclass=jclass, jstate=JState.failed):
        # Test `error is None` first so that jobs lacking an 'error' tag do
        # not raise when every failed job is to be re-opened.
        if error is not None and job.tags.get('error') != error:
            continue
        with jdb.locked(job) as j:
            j.jstate = JState.open
            for _t in j._tags:
                if _t.key == "error":
                    _t.value = None
class ArchivePolicy:
    """Storage policy helper that maps a product to an HDF5 filename and
    a dataset name inside that file.
    """

    @staticmethod
    def from_params(params):
        """Build the policy object selected by ``params['type']``.

        ``'simple'`` yields an ArchivePolicy and ``'directory'`` a
        DirectoryArchivePolicy; both receive ``params`` as keyword
        arguments. Any other type raises ValueError.
        """
        kind = params['type']
        if kind == 'simple':
            return ArchivePolicy(**params)
        if kind == 'directory':
            return DirectoryArchivePolicy(**params)
        raise ValueError('No handler for "type"="%s"' % kind)

    def __init__(self, **kwargs):
        # Only 'filename' is read; other keys (e.g. 'type') are ignored.
        self.filename = kwargs['filename']

    def get_dest(self, product_id):
        """Returns (hdf_filename, dataset_addr), where the dataset address
        is ``product_id`` itself.
        """
        destination = (self.filename, product_id)
        return destination
class DirectoryArchivePolicy:
    """Storage policy for products laid out directly on the filesystem
    under a root directory, using a format pattern for the sub-path.
    """

    def __init__(self, **kwargs):
        # Only 'root_dir' and 'pattern' are read; other keys are ignored.
        root, pattern = kwargs['root_dir'], kwargs['pattern']
        self.root_dir = root
        self.pattern = pattern

    def get_dest(self, **kw):
        """Returns full path to destination directory, built by formatting
        the pattern with ``kw`` and joining it onto the root directory.
        """
        relative = self.pattern.format(**kw)
        return os.path.join(self.root_dir, relative)
class _ReltimeFormatter(logging.Formatter):
def __init__(self, *args, t0=None, **kw):
super().__init__(*args, **kw)
if t0 is None:
t0 = time.time()
self.start_time = t0
def formatTime(self, record, datefmt=None):
if datefmt is None:
datefmt = '%8.3f'
return datefmt % (record.created - self.start_time)
[docs]
def init_logger(name, announce='', verbosity=2):
    """Configure and return a logger for site_pipeline elements. It is
    disconnected from general sotodlib (propagate=False) and displays
    relative instead of absolute timestamps.

    The logger itself is set to DEBUG; the verbosity is applied to its
    stream handler. A handler is only added when the logger has none;
    otherwise the level of the first existing stream handler is updated.

    Arguments
    ----------
    name : str
        The name of the logger
    announce : str
        Initial message to be displayed after logger is instantiated.
    verbosity : int
        Level of logger output
        0: Error
        1: Warning
        2: Info
        3: Debug
        Values outside this range are clamped to the nearest valid level.

    Returns
    -------
    logger : PythonLogger
        The initialized logger object
    """
    levels = (logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG)
    # Clamp so that an out-of-range verbosity cannot leave the level unset.
    level = levels[max(0, min(int(verbosity), len(levels) - 1))]

    logger = logging.getLogger(name)
    # Configure the logger before emitting the banner; otherwise a fresh
    # logger inherits the root logger's level and may drop the message.
    logger.propagate = False
    logger.setLevel(logging.DEBUG)

    # add handler only if it doesn't exist
    if not logger.handlers:
        ch = logging.StreamHandler(sys.stdout)
        formatter = _ReltimeFormatter('%(asctime)s: %(message)s (%(levelname)s)')
        ch.setLevel(level)
        ch.setFormatter(formatter)
        logger.addHandler(ch)
        # Split the reference time into whole seconds and a fraction so the
        # banner can show milliseconds.
        i, r = formatter.start_time // 1, formatter.start_time % 1
        text = (time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(i))
                + (',%03d' % (r*1000)))
        logger.info(f'{announce}Log timestamps are relative to {text}')
    else:
        for handler in logger.handlers:
            if isinstance(handler, logging.StreamHandler):
                handler.setLevel(level)
                break
    return logger
[docs]
def get_preprocess_context(configs, context=None):
    """Load the provided config file and context file. To be used in
    ``preprocess_*.py`` site pipeline scripts. If the provided context
    file does not have a metadata entry for preprocess then one will
    be added based on the definition in the config file.

    Arguments
    ----------
    configs : str or dict
        The configuration file or dictionary.
    context : str or core.Context, optional
        The context to use. If None, it is created from the configuration file.

    Returns
    -------
    configs : dict
        The configuration dictionary.
    context : core.Context
        The context, with a ``preprocess`` metadata entry guaranteed.
    """
    if isinstance(configs, str):
        # Context manager so the config file handle is always closed.
        with open(configs, "r") as f:
            configs = yaml.safe_load(f)

    if context is None:
        context = core.Context(configs["context_file"])
    if isinstance(context, str):
        context = core.Context(context)

    # If the context doesn't have the preprocess archive in it, add it.
    # This allows the same context to be used before and after calculations.
    if context.get("metadata") is None:
        context["metadata"] = []
    has_preprocess = any(
        entry.get("name") == "preprocess" or entry.get("label") == "preprocess"
        for entry in context.get("metadata")
    )
    if not has_preprocess:
        context["metadata"].append(
            {
                "db": configs["archive"]["index"],
                "name": "preprocess",
            }
        )
    return configs, context
[docs]
def get_groups(obs_id, configs, context=None):
    """Get subobs group method and groups. To be used in
    ``preprocess_*.py`` site pipeline scripts.

    Any exception raised while determining the groups is caught and
    reported through the returned ``errors`` tuple.

    Arguments
    ----------
    obs_id : str
        The obsid.
    configs : str or dict
        The configuration file or dictionary.
    context : core.Context
        The Context file to use. If None, it is created from the
        configuration.

    Returns
    -------
    group_by : list of str
        The list of keys used to group the detectors. Empty on error.
    groups : list of list
        The list of groups of detectors. Empty on error.
    errors : tuple
        ``(error, errmsg, traceback)`` on failure, otherwise a tuple of
        three Nones.
    """
    try:
        if isinstance(configs, str):
            # Context manager so the config file handle is always closed.
            with open(configs, "r") as f:
                configs = yaml.safe_load(f)
        if context is None:
            context = core.Context(configs["context_file"])

        group_by = np.atleast_1d(configs['subobs'].get('use', 'detset'))
        for i, gb in enumerate(group_by):
            # Keys may be given with a 'dets:' prefix; store them without it.
            if gb.startswith('dets:'):
                group_by[i] = gb.split(':', 1)[1]
            # Grouping by detset alone is read from the obsfiledb.
            if (gb == 'detset') and (len(group_by) == 1):
                groups = context.obsfiledb.get_detsets(obs_id)
                return group_by, [[g] for g in groups], (None, None, None)

        # Otherwise the groups are the distinct combinations of the grouping
        # keys found in the detector info.
        det_info = context.get_det_info(obs_id)
        rs = det_info.subset(keys=group_by).distinct()
        groups = [list(r.values()) for r in rs]
        return group_by, groups, (None, None, None)
    except Exception as e:
        error = PreprocessErrors.GetGroupsError
        errmsg, tb = PreprocessErrors.get_errors(e)
        return [], [], (error, errmsg, tb)
[docs]
def get_preprocess_db(configs, group_by, logger=None):
    """Open the ManifestDb named by the config, creating it first if the
    index file does not exist yet.

    A newly created database uses exact-match fields for ``obs:obs_id``
    and for one ``dets:<key>`` per grouping key, plus a ``dataset`` data
    field.

    Arguments
    ----------
    configs : dict
        The configuration dictionary.
    group_by : list of str
        The list of keys used to group the detectors.
    logger : PythonLogger
        Optional. Logger object. If None, a new logger
        is created.

    Returns
    -------
    db : ManifestDb
        ManifestDb object
    """
    if logger is None:
        logger = init_logger("preprocess")

    index_path = configs['archive']['index']
    if os.path.exists(index_path):
        return core.metadata.ManifestDb(index_path)

    logger.debug(f"Creating {index_path} as the archive index.")
    scheme = core.metadata.ManifestScheme()
    exact_keys = ['obs:obs_id'] + ['dets:' + gb for gb in group_by]
    for key in exact_keys:
        scheme.add_exact_match(key)
    scheme.add_data_field('dataset')
    return core.metadata.ManifestDb(index_path, scheme=scheme)
[docs]
def swap_archive(config, fpath):
    """Update the configuration archive policy filename,
    create an output archive directory if it doesn't exist,
    and return a copy of the config.

    The new filename is ``fpath`` joined onto the directory of the
    current archive policy filename. The input ``config`` is not modified.

    Arguments
    ----------
    config : dict
        The configuration dictionary.
    fpath : str
        The archive policy filename to write to.

    Returns
    -------
    tc : dict
        Copy of the configuration file with an updated archive policy filename
    """
    tc = copy.deepcopy(config)
    policy = tc['archive']['policy']
    policy['filename'] = os.path.join(os.path.dirname(policy['filename']), fpath)
    dname = os.path.dirname(policy['filename'])
    # A bare filename has an empty dirname; os.makedirs('') would raise, and
    # the current directory needs no creation anyway.
    if dname:
        os.makedirs(dname, exist_ok=True)
    return tc
[docs]
def load_preprocess_det_select(obs_id, configs, context=None,
                               dets=None, meta=None, logger=None):
    """Load the metadata for an observation and apply every data selection
    defined by the preprocessing pipeline, returning the surviving
    detectors.

    Each pipeline process is asked for its selection with
    ``in_place=False``; selections returned as arrays are combined with a
    logical AND, and other return values are ignored.

    Arguments
    ----------
    obs_id : multiple
        Passed to `context.get_obs` to load AxisManager, see Notes for
        `context.get_obs`
    configs : string or dictionary
        Config file or loaded config directory
    context : core.Context
        The Context file to use.
    dets : dict
        Dets to restrict on from info in det_info. See context.get_meta.
        Only used when ``meta`` is None.
    meta : AxisManager
        Contains supporting metadata to use for loading.
        Can be pre-restricted in any way. See context.get_meta.
    logger : PythonLogger
        Optional. Logger object. If None, a new logger
        is created.

    Returns
    -------
    list
        Restricted list of detector vals.
    """
    if logger is None:
        logger = init_logger("preprocess")

    configs, context = get_preprocess_context(configs, context)
    pipe = Pipeline(configs["process_pipe"], logger=logger)
    if meta is None:
        meta = context.get_meta(obs_id, dets=dets)

    logger.info("Restricting detectors on all processes")
    mask = np.ones(meta.dets.count, dtype=bool)
    for step in pipe[:]:
        selection = step.select(meta, in_place=False)
        if not isinstance(selection, np.ndarray):
            continue
        mask &= selection
    return meta.dets.vals[mask]
[docs]
def load_and_preprocess(obs_id, configs, context=None, dets=None, meta=None,
                        no_signal=None, logger=None, return_full_aman=False):
    """Load the products saved by the preprocessing pipeline and run the
    processing section of the pipeline on the observation.

    Assumes preprocess_tod has already been run on the requested observation.

    Detectors are restricted using the stored ``valid_data`` flags when
    those are available as an AxisManager; otherwise the pipeline's
    selection steps are evaluated.

    Arguments
    ----------
    obs_id : multiple
        Passed to `context.get_obs` to load AxisManager, see Notes for
        `context.get_obs`
    configs : string or dictionary
        Config file or loaded config directory
    context : core.Context
        Optional. The Context file to use.
    dets : dict
        Dets to restrict on from info in det_info. See context.get_meta.
    meta : AxisManager
        Contains supporting metadata to use for loading.
        Can be pre-restricted in any way. See context.get_meta.
    no_signal : bool
        If True, signal will be set to None.
        This is a way to get the axes and pointing info without
        the (large) TOD blob. Not all loaders may support this.
    logger : PythonLogger
        Optional. Logger object. If None, a new logger
        is created.
    return_full_aman : bool
        Optional. Return unrestricted axis manager alongside restricted aman
        if True, otherwise return None.

    Returns
    -------
    aman : core.AxisManager or None
        Loaded and restricted axis manager with preprocessing metadata.
        If all detectors are cut, a single ``None`` is returned instead of
        the ``(aman, full_aman)`` pair.
    full_aman : core.AxisManager or None
        Unrestricted preprocessing axis manager. Used when running multilayer
        pipeline to ensure saved detector axis has the full size when saving
        metadata.
    """
    if logger is None:
        logger = init_logger("preprocess")

    configs, context = get_preprocess_context(configs, context)
    meta = context.get_meta(obs_id, dets=dets, meta=meta)

    # Copy taken before any restriction so it keeps the full detector axis.
    full_aman = meta.preprocess.copy() if return_full_aman else None

    has_valid_data = (
        'valid_data' in meta.preprocess
        and isinstance(meta.preprocess.valid_data, core.AxisManager)
    )
    if has_valid_data:
        surviving = has_any_cuts(meta.preprocess.valid_data.valid_data)
        meta.restrict("dets", surviving)
    else:
        selected = load_preprocess_det_select(obs_id, configs=configs,
                                              context=context, dets=dets,
                                              meta=meta, logger=logger)
        meta.restrict("dets", [d for d in meta.dets.vals if d in selected])

    if meta.dets.count == 0:
        logger.info(f"No detectors left after cuts in obs {obs_id}")
        return None

    pipe = Pipeline(configs["process_pipe"], logger=logger)
    aman = context.get_obs(meta, no_signal=no_signal)
    pipe.run(aman, aman.preprocess, select=False)
    return aman, full_aman
[docs]
def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
                                   dets=None, meta=None, no_signal=None,
                                   logger=None, init_only=False,
                                   ignore_cfg_check=False,
                                   stop_for_sims=False):
    """Loads the saved information from the preprocessing pipeline from a
    reference and a dependent database and runs the processing section of
    the pipeline for each.

    Assumes preprocess_tod and multilayer_preprocess_tod have already been run
    on the requested observation.

    Arguments
    ----------
    obs_id : multiple
        Passed to `context.get_obs` to load AxisManager, see Notes for
        `context.get_obs`
    configs_init : string or dictionary
        Config file or loaded config directory
    configs_proc : string or dictionary
        Second config file or loaded config dictionary to load
        dependent databases generated using multilayer_preprocess_tod.py.
    dets : dict
        Dets to restrict on from info in det_info. See context.get_meta.
    meta : AxisManager
        Contains supporting metadata to use for loading.
        Can be pre-restricted in any way. See context.get_meta.
    no_signal : bool
        If True, signal will be set to None.
        This is a way to get the axes and pointing info without
        the (large) TOD blob. Not all loaders may support this.
    logger : PythonLogger
        Optional. Logger object or None will generate a new one.
    init_only : bool
        Optional. If True, do not run the dependent pipeline.
    ignore_cfg_check : bool
        If True, do not attempt to validate that configs_init is the same as
        the config used to create the existing init db.
    stop_for_sims: bool
        Optional. If True, will stop before each step of the pipeline
        with the flag `use_data_aman` set to True. The intended use is
        to prepare all necessary data products that cannot be stored in
        the preprocessing database, to process simulations.

    Returns
    -------
    aman : core.AxisManager, dict or None
        Loaded and restricted axis manager with preprocessing metadata.
        When ``stop_for_sims`` is True, the dictionary produced by
        ``run_pipeline_stepgroups`` is returned instead. Returns ``None``
        if either metadata object has no detectors.

    Raises
    ------
    ValueError
        If the init and proc grouping keys differ, or if the config
        dependency check fails.
    """
    if logger is None:
        logger = init_logger("preprocess")
    configs_init, context_init = get_preprocess_context(configs_init)
    meta_init = context_init.get_meta(obs_id, dets=dets, meta=meta)
    configs_proc, context_proc = get_preprocess_context(configs_proc)
    meta_proc = context_proc.get_meta(obs_id, dets=dets, meta=meta)
    # Count number of stops
    if stop_for_sims:
        num_stops = 0
        for process in configs_init["process_pipe"]:
            if process.get("use_data_aman", False):
                num_stops += 1
        for process in configs_proc["process_pipe"]:
            if process.get("use_data_aman", False):
                num_stops += 1
        logger.warning(
            "Currently running with `stop_for_sims=True`. "
            f"It will generate {num_stops} additional copies "
            "of the data AxisManager with a higher memory usage."
        )
    # Both layers must group detectors by the same keys.
    group_by_init = np.atleast_1d(configs_init['subobs'].get('use', 'detset'))
    group_by_proc = np.atleast_1d(configs_proc['subobs'].get('use', 'detset'))
    if (group_by_init != group_by_proc).any():
        raise ValueError('init and proc groups do not match')
    if meta_init.dets.count == 0 or meta_proc.dets.count == 0:
        logger.info(f"No detectors in obs {obs_id}")
        return None
    else:
        pipe_init = Pipeline(configs_init["process_pipe"], logger=logger)
        # aman_cfgs_ref is only defined when the check is enabled; the
        # short-circuiting `or` below keeps it from being read otherwise.
        if not ignore_cfg_check:
            aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
        if (
            ignore_cfg_check or
            check_cfg_match(aman_cfgs_ref, meta_proc.preprocess['pcfg_ref'],
                            logger=logger)
        ):
            pipe_proc = Pipeline(configs_proc["process_pipe"], logger=logger)
            logger.info("Restricting detectors on all init pipeline processes")
            # Prefer the stored valid_data flags; otherwise AND together the
            # array selections returned by each init process.
            if (
                'valid_data' in meta_init.preprocess and
                isinstance(meta_init.preprocess.valid_data, core.AxisManager)
            ):
                keep_all = has_any_cuts(meta_init.preprocess.valid_data.valid_data)
            else:
                keep_all = np.ones(meta_init.dets.count,dtype=bool)
                for process in pipe_init[:]:
                    keep = process.select(meta_init, in_place=False)
                    if isinstance(keep, np.ndarray):
                        keep_all &= keep
            meta_init.restrict("dets", meta_init.dets.vals[keep_all])
            # Keep the proc metadata on the detectors that survived init.
            meta_proc.restrict("dets", meta_init.dets.vals)
            logger.info("Restricting detectors on all proc pipeline processes")
            if (
                'valid_data' in meta_proc.preprocess and
                isinstance(meta_proc.preprocess.valid_data, core.AxisManager)
            ):
                keep_all = has_any_cuts(meta_proc.preprocess.valid_data.valid_data)
            else:
                keep_all = np.ones(meta_proc.dets.count, dtype=bool)
                for process in pipe_proc[:]:
                    keep = process.select(meta_proc, in_place=False)
                    if isinstance(keep, np.ndarray):
                        keep_all &= keep
            meta_proc.restrict("dets", meta_proc.dets.vals[keep_all])
            # Propagate the proc-layer cuts back to the init metadata, which
            # is what the observation is loaded from.
            meta_init.restrict('dets', meta_proc.dets.vals)
            aman = context_init.get_obs(meta_init, no_signal=no_signal)
            logger.info("Running initial pipeline")
            if stop_for_sims:
                # The 'last' entry is only produced when the dependent
                # pipeline will run afterwards.
                out_amans_init = run_pipeline_stepgroups(
                    pipe_init,
                    aman,
                    run_last_step=not(init_only)
                )
                if init_only:
                    return out_amans_init
            else:
                pipe_init.run(aman, aman.preprocess, select=False)
                if init_only:
                    return aman
            logger.info("Running dependent pipeline")
            if stop_for_sims:
                # Continue from the state after the full init pipeline.
                aman = out_amans_init[(len(pipe_init), 'last')]
            proc_aman = context_proc.get_meta(obs_id, meta=aman)
            # Drop the init-layer valid_data before merging in the proc-layer
            # preprocess fields.
            if 'valid_data' in aman.preprocess:
                aman.preprocess.move('valid_data', None)
            aman.preprocess.merge(proc_aman.preprocess)
            if stop_for_sims:
                out_amans = run_pipeline_stepgroups(
                    pipe_proc,
                    out_amans_init[(len(pipe_init), 'last')],
                )
                # Add the intermediate init-layer stops, leaving out the
                # 'last' entry that seeded the proc layer.
                out_amans.update({
                    (step, name): out_amans_init[(step, name)]
                    for (step, name) in out_amans_init
                    if name != 'last'
                })
                return out_amans
            else:
                pipe_proc.run(aman, aman.preprocess, select=False)
                return aman
        else:
            raise ValueError('Dependency check between configs failed.')
def run_pipeline_stepgroups(pipe, aman, run_last_step=False):
    """
    Run a Pipeline object in consecutive groups of steps, stopping before
    every process whose `use_data_aman` flag is set and saving a copy of
    the AxisManager at each stop.

    Arguments
    ----------
    pipe : Pipeline
        Pipeline object to run.
    aman : AxisManager
        AxisManager to process. The pipeline is run on a copy of it.
    run_last_step : bool
        If True, will create a dict item containing the
        AxisManager after running the full pipeline, stored under the
        key ``(len(pipe), 'last')``.

    Returns
    -------
    out_amans : dict
        Copies of the AxisManager keyed by ``(step, name)``, each taken
        after running the steps before ``step``. Empty if no process is
        flagged and ``run_last_step`` is False.
    """
    stops = [
        (index, proc.name)
        for index, proc in enumerate(pipe)
        if proc.use_data_aman
    ]
    if not (stops or run_last_step):
        return {}

    boundaries = [(0, pipe[0].name)] + stops
    if run_last_step:
        boundaries.append((len(pipe), 'last'))

    # Map each stop to the slice of steps that has to run to reach it.
    segments = {}
    for (start, _), (end, end_name) in zip(boundaries, boundaries[1:]):
        # A stop at the very first process has nothing to run before it;
        # only the current state of the AxisManager is saved.
        segments[end, end_name] = None if end == 0 else pipe[start:end]

    out_amans = {}
    working = aman.copy()
    for (step, name), segment in segments.items():
        if segment is not None:
            segment.run(working, aman.preprocess, select=False)
        out_amans[step, name] = working.copy()
    return out_amans
[docs]
def multilayer_load_and_preprocess_sim(obs_id, configs_init, configs_proc,
                                       sim_map, meta=None,
                                       logger=None, init_only=False,
                                       ignore_cfg_check=False,
                                       data_amans=None,
                                       interpol=None):
    """Loads the saved information from the preprocessing pipeline from a
    reference and a dependent database, loads the signal from a (simulated)
    map into the AxisManager and runs the processing section of the pipeline
    for both databases.

    Assumes preprocess_tod and multilayer_preprocess_tod have already been run
    on the requested observation.

    The observation is always loaded with ``no_signal=True``; the signal is
    filled in from ``sim_map``.

    Arguments
    ----------
    obs_id : multiple
        Passed to `context.get_obs` to load AxisManager, see Notes for
        `context.get_obs`
    configs_init : string or dictionary
        Config file or loaded config directory
    configs_proc : string or dictionary
        Second config file or loaded config dictionary to load
        dependent databases generated using multilayer_preprocess_tod.py.
    sim_map : numpy.ndmap or enmap.ndmap
        Input simulated map to be observed
    meta : AxisManager
        Contains supporting metadata to use for loading.
        Can be pre-restricted in any way. See context.get_meta.
    logger : PythonLogger
        Optional. Logger object or None will generate a new one.
    init_only : bool
        Optional. If True, do not run the dependent pipeline.
    ignore_cfg_check : bool
        If True, do not attempt to validate that configs_init is the same as
        the config used to create the existing init db.
    data_amans: dict (Optional)
        A dictionary of AxisManagers with keys (step, process.name)
        filled with AxisManager processed up to step-1. This is used
        to pre-load all data AxisManager which could be required when
        processing simulations (e.g. to provide a T2P template).
        It is passed to the dependent pipeline run only.
    interpol: str
        Optional. The sub-pixel interpolation to use in from_map

    Returns
    -------
    aman : core.AxisManager or None
        Loaded and restricted axis manager with preprocessing metadata.
        Returns ``None`` if either metadata object has no detectors.

    Raises
    ------
    ValueError
        If the init and proc grouping keys differ, or if the config
        dependency check fails.
    """
    if logger is None:
        logger = init_logger("preprocess")
    configs_init, context_init = get_preprocess_context(configs_init)
    meta_init = context_init.get_meta(obs_id, meta=meta)
    configs_proc, context_proc = get_preprocess_context(configs_proc)
    meta_proc = context_proc.get_meta(obs_id, meta=meta)
    # Both layers must group detectors by the same keys.
    group_by_init = np.atleast_1d(configs_init['subobs'].get('use', 'detset'))
    group_by_proc = np.atleast_1d(configs_proc['subobs'].get('use', 'detset'))
    if (group_by_init != group_by_proc).any():
        raise ValueError('init and proc groups do not match')
    if meta_init.dets.count == 0 or meta_proc.dets.count == 0:
        logger.info(f"No detectors in obs {obs_id}")
        return None
    else:
        pipe_init = Pipeline(configs_init["process_pipe"], logger=logger)
        # aman_cfgs_ref is only defined when the check is enabled; the
        # short-circuiting `or` below keeps it from being read otherwise.
        if not ignore_cfg_check:
            aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
        if ignore_cfg_check or check_cfg_match(
                aman_cfgs_ref,
                meta_proc.preprocess['pcfg_ref'],
                logger=logger):
            pipe_proc = Pipeline(configs_proc["process_pipe"], logger=logger)
            logger.info("Restricting detectors on all init pipeline processes")
            # Prefer the stored valid_data flags; otherwise AND together the
            # array selections returned by each init process.
            if (
                'valid_data' in meta_init.preprocess and
                isinstance(meta_init.preprocess.valid_data, core.AxisManager)
            ):
                keep_all = has_any_cuts(meta_init.preprocess.valid_data.valid_data)
            else:
                keep_all = np.ones(meta_init.dets.count,dtype=bool)
                for process in pipe_init[:]:
                    keep = process.select(meta_init, in_place=False)
                    if isinstance(keep, np.ndarray):
                        keep_all &= keep
            meta_init.restrict("dets", meta_init.dets.vals[keep_all])
            # Keep the proc metadata on the detectors that survived init.
            meta_proc.restrict("dets", meta_init.dets.vals)
            logger.info("Restricting detectors on all proc pipeline processes")
            if (
                'valid_data' in meta_proc.preprocess and
                isinstance(meta_proc.preprocess.valid_data, core.AxisManager)
            ):
                keep_all = has_any_cuts(meta_proc.preprocess.valid_data.valid_data)
            else:
                keep_all = np.ones(meta_proc.dets.count, dtype=bool)
                for process in pipe_proc[:]:
                    keep = process.select(meta_proc, in_place=False)
                    if isinstance(keep, np.ndarray):
                        keep_all &= keep
            meta_proc.restrict("dets", meta_proc.dets.vals[keep_all])
            meta_init.restrict('dets', meta_proc.dets.vals)
            # NOTE(review): the observation is loaded through context_init
            # but with meta_proc, whereas multilayer_load_and_preprocess
            # passes meta_init here; confirm this is intended.
            aman = context_init.get_obs(meta_proc, no_signal=True)
            # One needs to correct HWP model and gamma
            # before loading in the simulated map
            aman = hwp_angle_model.apply_hwp_angle_model(aman)
            if "wiregrid_cal" in aman.det_info:
                logger.info(f"gamma from wiregrid_cal")
                aman.move(
                    name="det_info.wiregrid_cal.gamma",
                    new_name="focal_plane.gamma"
                )
            # Remove the existing signal entry before from_map wraps the
            # simulated one (wrap=True).
            aman.move("signal", None)
            logger.info("Reading in simulated map")
            demod_mm.from_map(aman, sim_map, wrap=True, modulated=True, interpol=interpol)
            logger.info("Running initial pipeline")
            pipe_init.run(aman, aman.preprocess, sim=True)
            if init_only:
                return aman
            logger.info("Running dependent pipeline")
            proc_aman = context_proc.get_meta(obs_id, meta=aman)
            # Drop the init-layer valid_data before merging in the proc-layer
            # preprocess fields.
            if 'valid_data' in aman.preprocess:
                aman.preprocess.move('valid_data', None)
            aman.preprocess.merge(proc_aman.preprocess)
            pipe_proc.run(aman, aman.preprocess, sim=True, data_amans=data_amans)
            return aman
        else:
            raise ValueError('Dependency check between configs failed.')
[docs]
def find_db(obs_id, configs, dets, context=None, logger=None):
    """This function checks if the manifest db from
    a config file exists and searches if it contains
    an entry for the provided Obs id and set of detectors.

    Arguments
    ----------
    obs_id : str
        Obs id to process or load
    configs : str or dict
        Filepath or dictionary containing the preprocess configuration file.
    dets : dict
        Dictionary specifying which detectors/wafers to load see ``Context.obsdb.get_obs``.
        Its values are paired, in order, with the grouping keys from the
        config's ``subobs.use`` entry.
    context : core.Context
        Optional. Context object used for data loading/querying.
    logger : PythonLogger
        Optional. Logger object or None will generate a new one.

    Returns
    -------
    dbexist : bool
        True if db exists and entry for input detectors is found.
    """
    if logger is None:
        logger = init_logger("preprocess")
    if isinstance(configs, str):
        # Context manager so the config file handle is always closed.
        with open(configs, "r") as f:
            configs = yaml.safe_load(f)
    # NOTE(review): the context is not used below; it is still created
    # when missing so the function behaves as before.
    if context is None:
        context = core.Context(configs["context_file"])

    group_by = np.atleast_1d(configs['subobs'].get('use', 'detset'))
    group_vals = list(np.fromiter(dets.values(), dtype='<U32'))

    index_path = configs['archive']['index']
    if not os.path.exists(index_path):
        return False

    db = core.metadata.ManifestDb(index_path)
    dbix = {'obs:obs_id': obs_id}
    for gb, g in zip(group_by, group_vals):
        dbix[f'dets:{gb}'] = g

    dbexist = len(db.inspect(dbix)) != 0
    if dbexist:
        logger.debug(f"Entry {dbix} found in {index_path}")
    else:
        logger.debug(f"Entry {dbix} not found in {index_path}")
    return dbexist
def cleanup_archive(configs, logger=None):
    """This function finds the final preprocess archive file and deletes any
    datasets that are not found in the preprocess database. This helps avoid
    cases where the database writing was interrupted in a previous run.

    Only archive files whose stem contains digits are considered; the
    "final" file is the one whose last run of digits is numerically
    largest. Nothing is done if the database index does not exist.

    Arguments
    ----------
    configs : str or dict
        Filepath or dictionary containing the preprocess configuration file.
    logger : PythonLogger
        Optional. Logger object or None will generate a new one.
    """
    if isinstance(configs, str):
        # Context manager so the config file handle is always closed.
        with open(configs, "r") as f:
            configs = yaml.safe_load(f)
    if logger is None:
        logger = init_logger("preprocess")

    index_path = configs['archive']['index']
    if not os.path.exists(index_path):
        return

    db = core.metadata.ManifestDb(index_path)
    try:
        archive_path = configs["archive"]["policy"]["filename"]
        basename = os.path.splitext(os.path.basename(archive_path))[0]

        # Pair every numbered archive file with the last number in its stem.
        pattern = re.compile(r"\d+")
        numbered = []
        for p in Path(os.path.dirname(archive_path)).rglob(f"{basename}*.h5"):
            digits = pattern.findall(p.stem)
            if digits:
                numbered.append((int(digits[-1]), p))

        if numbered:
            latest_file = max(numbered, key=lambda t: t[0])[1]
            # remove datasets from last archive file if they are not in db
            db_datasets = {d['dataset'] for d in db.inspect()}
            with H5ContextManager(latest_file, mode="r+") as f:
                # Snapshot the keys: entries are deleted while looping.
                for key in list(f.keys()):
                    if key not in db_datasets:
                        logger.debug(f"{key} not found in db. deleting it from {latest_file}.")
                        del f[key]
    finally:
        # Close the database connection even if the cleanup fails.
        db.conn.close()
[docs]
def get_preproc_group_out_dict(obs_id, configs, dets, context=None, subdir='temp'):
    """This function returns a dictionary containing the data destination filename
    and the values to populate the manifest db.

    The temporary file is named ``<subdir>/<obs_id>_<group values>.h5``
    and placed next to the configured archive policy filename; its
    directory is created if needed.

    Arguments
    ----------
    obs_id : str
        Obs id to process or load
    configs : str or dict
        Filepath or dictionary containing the preprocess configuration file.
    dets : dict
        Dictionary specifying which detectors/wafers to load see
        ``Context.obsdb.get_obs``.
    context : core.Context
        Optional. Context object used for data loading/querying.
    subdir : str
        Optional. Subdirectory to save the output files into. If it does not
        exist, it is created.

    Returns
    -------
    outputs : dict
        Dictionary including output filename of data file (``temp_file``)
        and information for corresponding database entry (``db_data``).
    """
    if isinstance(configs, str):
        # Context manager so the config file handle is always closed.
        with open(configs, "r") as f:
            configs = yaml.safe_load(f)
    # NOTE(review): the context is not used below; it is still created
    # when missing so the function behaves as before.
    if context is None:
        context = core.Context(configs["context_file"])

    group_vals = list(np.fromiter(dets.values(), dtype='<U32'))
    group_by = np.atleast_1d(configs['subobs'].get('use', 'detset'))

    newpath = f'{subdir}/{obs_id}' + ''.join(f'_{cg}' for cg in group_vals)
    temp_config = swap_archive(configs, newpath + '.h5')
    policy = ArchivePolicy.from_params(temp_config['archive']['policy'])
    dest_file, dest_dataset = policy.get_dest(obs_id)

    # The dataset name carries the group values; keys other than 'detset'
    # are spelled out alongside their value.
    for gb, g in zip(group_by, group_vals):
        if gb == 'detset':
            dest_dataset += "_" + g
        else:
            dest_dataset += "_" + gb + "_" + str(g)

    # Collect index info.
    db_data = {'obs:obs_id': obs_id,
               'dataset': dest_dataset}
    for gb, g in zip(group_by, group_vals):
        db_data['dets:' + gb] = g

    # Collect info for saving h5 file.
    return {'temp_file': dest_file, 'db_data': db_data}
[docs]
def save_group_and_cleanup(obs_id, configs, context=None, subdir='temp',
                           logger=None, remove=False):
    """Check for temporary per-group files left by a preprocessing run and
    either archive them through ``cleanup_mandb`` or delete them.

    For every group returned by ``get_groups`` (groups containing ``'NC'``
    are skipped when grouping by ``wafer.bandpass``) the temporary file path
    is computed with ``get_preproc_group_out_dict``. If that file exists:

    * ``remove`` False: ``cleanup_mandb`` is called to move it into the
      archive and add the manifest db entry.
    * ``remove`` True: the file is deleted so the group will be re-run. This
      is intended to allow for overwrite=True in ``preprocess_tod.py`` and
      ``multilayer_preprocess_tod.py``.
    * an ``OSError`` is raised (e.g. the file cannot be opened), or the
      error message contains "destination object already exists": the
      temporary file is deleted.

    Arguments
    ----------
    obs_id : str
        Obs id to process or load
    configs : str or dict
        Filepath or dictionary containing the preprocess configuration file.
    context : core.Context
        Optional. Context object used for data loading/querying. If None, one
        is built from ``configs["context_file"]``.
    subdir : str
        Optional. Subdirectory that holds the temporary output files.
    logger : PythonLogger
        Optional. Logger object or None will generate a new one.
    remove : bool
        Optional. Default is False. Whether to remove a file if found.
        Used when ``overwrite`` is True in driving functions.

    Returns
    -------
    errors : tuple
        Error from get_groups.

    Raises
    ------
    Exception
        Any non-``OSError`` exception from the cleanup whose message does not
        contain "destination object already exists" is logged and re-raised.
    """
    if logger is None:
        logger = init_logger("preprocess")

    if isinstance(configs, str):
        # Close the config file as soon as it has been parsed.
        with open(configs, "r") as f:
            configs = yaml.safe_load(f)

    if context is None:
        context = core.Context(configs["context_file"])

    group_by, groups, errors = get_groups(obs_id, configs)

    # Drop groups flagged 'NC' when grouping by bandpass.
    if 'wafer.bandpass' in group_by:
        groups = [g for g in groups if 'NC' not in g]

    for g in groups:
        dets = dict(zip(group_by, g))
        # Forward the context so the helper does not build a new one for
        # every group.
        outputs_grp = get_preproc_group_out_dict(obs_id, configs, dets,
                                                 context=context,
                                                 subdir=subdir)
        temp_file = outputs_grp['temp_file']
        if not os.path.exists(temp_file):
            continue

        try:
            if not remove:
                cleanup_mandb(outputs_grp, (obs_id, g),
                              (None, None, None), configs, logger)
            else:
                # if we're overwriting, remove file so it will re-run
                os.remove(temp_file)
        except OSError:
            # remove if it can't be opened
            os.remove(temp_file)
        except Exception as e:
            if "destination object already exists" in str(e):
                # remove temp file it was copied but not deleted
                os.remove(temp_file)
            else:
                errmsg = f"{type(e).__name__}: {e}"
                tb = ''.join(traceback.format_tb(e.__traceback__))
                logger.error(
                    f"save_group_and_cleanup failed for {outputs_grp['temp_file']}:\n{errmsg}\n{tb}"
                )
                raise

    return errors
[docs]
def cleanup_obs(obs_id, policy_dir, errlog, configs, context=None,
                subdir='temp', remove=False):
    """For a given obs id, search ``policy_dir`` (if it is an existing
    directory) for any entry whose name contains ``obs_id``. If one is found,
    run ``save_group_and_cleanup`` for that obs id and append any error it
    reports to ``errlog``.

    Arguments
    ---------
    obs_id : str
        Obs id to check and clean up
    policy_dir : str
        Directory to temp per-group output files
    errlog : str
        Filepath to error logging file.
    configs : str or dict
        Filepath or dictionary containing the preprocess configuration file.
    context : core.Context
        Optional. Context object used for data loading/querying.
    subdir : str
        Optional. Subdirectory to save the output files into.
    remove : bool
        Optional. Default is False. Whether to remove a file if found.
        Used when ``overwrite`` is True in driving functions.
    """
    # isdir (rather than exists) so a regular file at policy_dir is treated
    # like a missing directory instead of making os.listdir raise.
    if not os.path.isdir(policy_dir):
        return

    if not any(obs_id in fname for fname in os.listdir(policy_dir)):
        return

    errors = save_group_and_cleanup(obs_id, configs, context,
                                    subdir=subdir, remove=remove)
    if errors[0] is not None:
        with open(errlog, 'a') as f:
            # Same column layout as the error log written by cleanup_mandb:
            # time, obs_id, group, error. No group applies here.
            f.write(f"{time.time()}, {obs_id}, n/a, {errors[0]}\n")
            f.write("\t" + (errors[1] or "") + (errors[2] or "") + "\n")
[docs]
def _build_lmsi_index(plot_dir, lmsi_config, aman):
    """Build the lmsi ``index.html`` for the plots of one observation.

    The plot directory is
    ``<plot_dir>/<first 5 characters of str(aman.timestamps[0])>/<obs_id>``.
    If it exists, the names of its ``*.png`` files are passed to
    ``lmsi.core`` together with ``lmsi_config`` and the output path
    ``index.html`` inside that directory.
    """
    new_plots = os.path.join(plot_dir,
                             f'{str(aman.timestamps[0])[:5]}',
                             aman.obs_info.obs_id)
    # Imported lazily so lmsi is only required when an lmsi_config is set.
    from pathlib import Path
    import lmsi.core as lmsi

    if os.path.exists(new_plots):
        lmsi.core([Path(x.name) for x in Path(new_plots).glob("*.png")],
                  Path(lmsi_config),
                  Path(os.path.join(new_plots, 'index.html')))


def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None,
                          logger=None, overwrite=False, save_archive=False,
                          save_proc_aman=True, compress=False,
                          skip_missing=False, ignore_cfg_check=False):
    """
    This function is expected to receive a single obs_id, and dets dictionary.
    The dets dictionary must match the grouping specified in the preprocess
    config files. It accepts either one or two config strings or dicts representing
    an initial and a dependent pipeline stage. If the preprocess database entry for
    this obsid-dets group already exists then this function will just load back the
    processed tod calling either the ``load_and_preprocess`` or
    ``multilayer_load_and_preprocess`` functions. If the db entry does not exist or
    the overwrite flag is set to True then the full preprocessing steps defined in
    the configs are run and if save_proc_aman is True, the outputs are written to a
    unique h5 file. Any errors, the info to populate the database, the file path of
    the h5 file, and the process tod are returned from this function. Processed axis
    managers can be written to an archive and database by using cleanup_mandb
    (or setting save_archive to True) which consumes all of the outputs
    (except the processed tod), writes to the database, and moves the multiple h5
    files into fewer h5 files (each <= 10 GB).

    Arguments
    ---------
    obs_id : str
        Obs id to process or load
    configs_init : str or dict
        Filepath or dictionary containing the preprocess configuration file.
    dets : dict
        Dictionary specifying which detectors/wafers to load see
        ``Context.obsdb.get_obs``.
    configs_proc : str or dict
        Filepath or dictionary containing a dependent preprocess configuration
        file.
    logger : PythonLogger
        Optional. Logger object or None will generate a new one.
    overwrite : bool
        Optional. Whether or not to overwrite existing entries in the
        preprocess manifest db.
    save_archive : bool
        Call cleanup_mandb if True to save to the archive and database files
        in configs_init and configs_proc. Should be False if preproc_or_load_group
        is being called from within a parallelized script (i.e. python multiprocessing or MPI).
    save_proc_aman : bool
        Whether or not to save the preprocessing axis manager. Required if saving into
        a preprocessing archive.
    compress : bool
        Whether or not to compress the preprocessing data. When True the axis
        manager is saved with ``compression="gzip"`` and the flacarray
        encodings produced by ``_get_aman_encodings``.
    skip_missing : bool
        Do not attempt to run preprocessing pipeline if either of the preproc dbs
        don't exist or the obs_id and group combination is not found.
    ignore_cfg_check : bool
        If True, do not attempt to validate that configs_init is the same as the config
        used to create the existing init db when running ``multilayer_load_and_preprocess``.

    Returns
    -------
    aman : AxisManager or None
        Preprocessed axis manager if preproc_or_load_group finished
        successfully or None if it failed.
    out_dict_init : dict or None
        Dictionary output for init config from get_preproc_group_out_dict
        if preprocessing ran successfully for init layer or ``None`` if
        preprocessing was loaded or ``preproc_or_load_group`` failed.
    out_dict_proc : dict or None
        Dictionary output for proc config from get_preproc_group_out_dict
        if preprocessing ran successfully for proc layer or ``None`` if
        preprocessing was loaded, that layer was not run or loaded, or
        ``preproc_or_load_group`` failed.
    errors : tuple
        A tuple containing the error from PreprocessError, an error message,
        and the traceback. Each will be None if preproc_or_load_group finished
        successfully by running the pipeline; the first element is
        ``PreprocessErrors.LoadSuccess`` when everything was loaded from
        existing dbs.
    """
    init_temp_subdir = "temp"
    proc_temp_subdir = "temp_proc"

    # ``== True`` is deliberate: any other value (including a non-empty
    # string) disables compression, as before.
    if compress == True:  # noqa: E712
        compress = "gzip"
    else:
        compress = None

    if logger is None:
        logger = init_logger("preprocess")

    group = list(np.fromiter(dets.values(), dtype='<U32'))

    # Do a try except around config and meta reading to catch metadata failures
    try:
        if isinstance(configs_init, str):
            # Context manager so the config file handle is closed promptly.
            with open(configs_init, "r") as f:
                configs_init = yaml.safe_load(f)
        context_init = core.Context(configs_init["context_file"])
        make_lmsi_init = configs_init.get("lmsi_config") is not None

        if configs_proc is not None:
            if isinstance(configs_proc, str):
                with open(configs_proc, "r") as f:
                    configs_proc = yaml.safe_load(f)
            context_proc = core.Context(configs_proc["context_file"])
            make_lmsi_proc = configs_proc.get("lmsi_config") is not None

            # Ensure grouping matches between init and proc configs
            group_by_init = np.atleast_1d(configs_init['subobs'].get('use', 'detset'))
            group_by_proc = np.atleast_1d(configs_proc['subobs'].get('use', 'detset'))
            if (group_by_init != group_by_proc).any():
                raise ValueError('init and proc groups do not match')
    except Exception as e:
        errmsg, tb = PreprocessErrors.get_errors(e)
        logger.error(f"Get configs/context failed for {obs_id}: {group}\n{errmsg}\n{tb}")
        return None, None, None, (PreprocessErrors.MetaDataError, errmsg, tb)

    db_init_exist = find_db(obs_id, configs_init, dets, logger=logger)
    if configs_proc is not None:
        db_proc_exist = find_db(obs_id, configs_proc, dets, logger=logger)
    else:
        db_proc_exist = False

    # Skip entries not in either db if the db exists but entry is not found.
    # Setting overwrite to True will bypass this and re-run.
    if not overwrite and skip_missing:
        # init db exists but entry not in init db
        if (
            os.path.exists(configs_init['archive']['index'])
            and not db_init_exist
        ):
            logger.warning(f"{obs_id}: {group} not found in init db and skip missing={skip_missing}")
            return None, None, None, (PreprocessErrors.SkipMissingError, None, None)

        # proc db exists but entry not in proc db
        if (
            configs_proc is not None
            and os.path.exists(configs_proc['archive']['index'])
            and not db_proc_exist
        ):
            logger.warning(f"{obs_id}: {group} not found in proc db and skip missing={skip_missing}")
            return None, None, None, (PreprocessErrors.SkipMissingError, None, None)

    # Cannot run if proc db exists but init db does not
    if db_proc_exist and not db_init_exist and not overwrite:
        logger.error("loading from proc db requires init db if overwrite is False")
        return None, None, None, (PreprocessErrors.NoInitDbError, None, None)

    if not overwrite:
        # Load first layer only
        if db_init_exist and not db_proc_exist:
            out_dict_init = None
            try:
                # need unrestricted proc aman for second layer
                return_full_aman = configs_proc is not None
                logger.info(f"Loading and applying preprocessing for initial layer db on {obs_id}:{group}")
                aman, proc_aman = load_and_preprocess(obs_id=obs_id, dets=dets, configs=configs_init,
                                                      logger=logger, return_full_aman=return_full_aman)
            except Exception as e:
                errmsg, tb = PreprocessErrors.get_errors(e)
                logger.error(f"Initial layer Pipeline Load Error for {obs_id}: {group}\n{errmsg}\n{tb}")
                return None, None, None, (PreprocessErrors.SingleLayerPipelineLoadError, errmsg, tb)

            # Return if not running proc db
            if configs_proc is None:
                logger.info(f"preproc_or_load_group finished successfully for {obs_id}:{group}")
                return aman, out_dict_init, None, (PreprocessErrors.LoadSuccess, None, None)
        # Load first and second layer
        elif db_init_exist and db_proc_exist:
            try:
                logger.info(f"Loading and applying preprocessing for both dbs on {obs_id}:{group}")
                aman = multilayer_load_and_preprocess(obs_id=obs_id, dets=dets, configs_init=configs_init,
                                                      configs_proc=configs_proc, logger=logger,
                                                      ignore_cfg_check=ignore_cfg_check)
                logger.info(f"preproc_or_load_group finished successfully for {obs_id}:{group}")
                return aman, None, None, (PreprocessErrors.LoadSuccess, None, None)
            except Exception as e:
                errmsg, tb = PreprocessErrors.get_errors(e)
                logger.error(f"Multilayer Pipeline Load Error for {obs_id}: {group}\n{errmsg}\n{tb}")
                return None, None, None, (PreprocessErrors.MultilayerPipelineLoadError, errmsg, tb)

    # Run first layer
    if not db_init_exist or overwrite:
        try:
            logger.info(f"Generating new init db entry for {obs_id}: {group}")
            pipe_init = Pipeline(configs_init["process_pipe"],
                                 plot_dir=configs_init["plot_dir"],
                                 logger=logger)
            # NOTE: the result is not used in this layer; the call is kept so
            # a failure here is still reported as an init pipeline run error.
            aman_cfgs_ref = get_pcfg_check_aman(pipe_init)
            out_dict_init = get_preproc_group_out_dict(obs_id,
                                                       configs_init,
                                                       dets,
                                                       subdir=init_temp_subdir)
            aman = context_init.get_obs(obs_id, dets=dets)
            tags = np.array(context_init.obsdb.get(aman.obs_info.obs_id,
                                                   tags=True)['tags'])
            aman.wrap('tags', tags)
            proc_aman, success = pipe_init.run(aman)
            aman.wrap('preprocess', proc_aman)
        except Exception as e:
            errmsg, tb = PreprocessErrors.get_errors(e)
            logger.error(f"Pipeline Run Error for {obs_id}: {group}\n{errmsg}\n{tb}")
            return None, None, None, (PreprocessErrors.InitPipeLineRunError, errmsg, tb)

        if success != 'end':
            logger.error(f"Init Pipeline Step Error for {obs_id}: {group}\nFailed at step {success}")
            return None, None, None, (PreprocessErrors.PipeLineStepError, success, None)

        if save_proc_aman:
            logger.info(f"Saving preprocessing axis manager to "
                        f"{out_dict_init['temp_file']}:{out_dict_init['db_data']['dataset']}")
            encodings = {}
            if compress is not None:
                _get_aman_encodings(encodings, proc_aman)
            proc_aman.save(out_dict_init['temp_file'],
                           out_dict_init['db_data']['dataset'],
                           compression=compress,
                           encodings=encodings,
                           overwrite=overwrite)

            # cleanup_mandb consumes the temp file written just above, so
            # archiving is only attempted after a save.
            if save_archive:
                logger.info(f"Adding result to init db for {obs_id}: {group}")
                cleanup_mandb(out_dict_init, (obs_id, group), (None, None, None),
                              configs_init, logger=logger, overwrite=overwrite)

        # Make init plots
        if make_lmsi_init:
            _build_lmsi_index(configs_init["plot_dir"],
                              configs_init["lmsi_config"],
                              aman)

        # Return if not running proc db
        if configs_proc is None:
            logger.info(f"preproc_or_load_group finished successfully for {obs_id}:{group}")
            return aman, out_dict_init, None, (None, None, None)

    # Run second layer
    if (not db_proc_exist or overwrite) and configs_proc is not None:
        try:
            logger.info(f"Generating new proc db entry for {obs_id}: {group}")
            # Remember which fields came from the init layer so they can be
            # stripped from the proc-layer output below.
            init_fields = aman.preprocess._fields.copy()
            init_fields.pop('valid_data', None)

            tags_proc = np.array(context_proc.obsdb.get(aman.obs_info.obs_id,
                                                        tags=True)['tags'])
            # Replace any tags from the init layer with the proc context tags.
            if "tags" in aman._fields:
                aman.move("tags", None)
            aman.wrap('tags', tags_proc)

            out_dict_proc = get_preproc_group_out_dict(obs_id,
                                                       configs_proc,
                                                       dets=dets,
                                                       subdir=proc_temp_subdir)
            pipe_proc = Pipeline(configs_proc["process_pipe"],
                                 plot_dir=configs_proc["plot_dir"], logger=logger)
            proc_aman, success = pipe_proc.run(aman, full_aman=proc_aman)

            # Store the init pipeline configuration alongside the proc output.
            pipe_init = Pipeline(configs_init["process_pipe"],
                                 plot_dir=configs_init["plot_dir"],
                                 logger=logger)
            proc_aman.wrap('pcfg_ref', get_pcfg_check_aman(pipe_init))

            for init_field in init_fields:
                if init_field in proc_aman:
                    proc_aman.move(init_field, None)
        except Exception as e:
            errmsg, tb = PreprocessErrors.get_errors(e)
            logger.error(f"Pipeline Run Error for {obs_id}: {group}\n{errmsg}\n{tb}")
            return None, out_dict_init, None, (PreprocessErrors.ProcPipeLineRunError, errmsg, tb)

        if success != 'end':
            logger.error(f"Proc Pipeline Step Error for {obs_id}: {group}\nFailed at step {success}")
            return None, out_dict_init, None, (PreprocessErrors.PipeLineStepError, success, None)

        if save_proc_aman:
            logger.info(f"Saving proc axis manager to "
                        f"{out_dict_proc['temp_file']}:{out_dict_proc['db_data']['dataset']}")
            encodings = {}
            if compress is not None:
                _get_aman_encodings(encodings, proc_aman)
            proc_aman.save(out_dict_proc['temp_file'],
                           out_dict_proc['db_data']['dataset'],
                           compression=compress,
                           encodings=encodings,
                           overwrite=overwrite)

            # As for the init layer: only archive what was just written.
            if save_archive:
                logger.info(f"Adding result to proc db for {obs_id}: {group}")
                cleanup_mandb(out_dict_proc, (obs_id, group), (None, None, None),
                              configs_proc, logger=logger, overwrite=overwrite)

        # Merge the proc-layer products into the returned axis manager; the
        # init-layer 'valid_data' is dropped first so the merge does not clash.
        if 'valid_data' in aman.preprocess:
            aman.preprocess.move('valid_data', None)
        aman.preprocess.merge(proc_aman)

        # Make proc plots
        if make_lmsi_proc:
            _build_lmsi_index(configs_proc["plot_dir"],
                              configs_proc["lmsi_config"],
                              aman)

        logger.info(f"preproc_or_load_group finished successfully for {obs_id}:{group}")
        return aman, out_dict_init, out_dict_proc, (None, None, None)
[docs]
def cleanup_mandb(out_dict, out_meta, errors, configs, logger=None, overwrite=False, db_manager=None):
    """Function to update the manifest db when data is collected from the
    ``preproc_or_load_group`` function. If used in an mpi framework this
    function is expected to be run from rank 0 after a ``comm.gather``.
    See the ``preproc_or_load_group`` docstring for the varying expected
    values of ``errors`` and the associated ``out_dict``. This function will
    either:

    1) Copy the contents of the temporary h5 file into the archive file,
       add the ManifestDb entry and delete the temporary h5 file, if
       ``out_dict`` is not ``None`` and ``out_dict['temp_file']`` exists.
    2) Return nothing if errors[0] is ``PreprocessErrors.LoadSuccess`` or both it
       and out_dict are None.
    3) Otherwise, update the error log (``errlog.txt`` next to the db index).

    Arguments
    ---------
    out_dict : dict or None
        Dictionary including entries for the temporary h5 filename
        ('temp_file') and the obs_id group metadata and db entry (db_data).
        See ``get_preproc_group_out_dict`` for more info.
    out_meta : tuple
        The tuple (obs_id, group).
    errors : tuple
        A tuple containing the error from PreprocessError, an error message,
        and the traceback. Each will be None if preproc_or_load_group finished
        successfully.
    configs : dict
        Preprocessing configuration dictionary.
    logger : PythonLogger
        Optional. Python logger.
    overwrite : bool
        Optional. Delete the entry in the archive file if it exists and
        replace it with the new entry.
    db_manager : DbBatchManager, optional
        External database batch manager for optimized operations.
        If provided, uses the manager instead of creating individual connections.
    """
    if logger is None:
        logger = init_logger("preprocess")

    if out_dict is not None and os.path.isfile(out_dict['temp_file']):
        obs_id, group = out_meta
        logger.info(f"Adding future result to db for {obs_id}: {group}")

        # Expects archive policy filename to be <path>/<filename>.h5 and then this adds
        # <path>/<filename>_<xxx>.h5 where xxx is a number that increments up from 0
        # whenever the file size exceeds 10 GB.
        policy_filename = configs['archive']['policy']['filename']
        folder = os.path.dirname(policy_filename)
        basename = os.path.splitext(policy_filename)[0]

        # Only absolute archive folders are created here. exist_ok guards
        # against another process creating the folder between the check and
        # the call.
        if os.path.isabs(folder) and not os.path.exists(folder):
            os.makedirs(folder, exist_ok=True)

        nfile = 0
        dest_file = basename + '_' + str(nfile).zfill(3) + '.h5'
        while os.path.exists(dest_file) and os.path.getsize(dest_file) > 10e9:
            nfile += 1
            dest_file = basename + '_' + str(nfile).zfill(3) + '.h5'

        group_by = [k.split(':')[-1] for k in out_dict['db_data'].keys() if 'dets' in k]
        # The db stores the archive path relative to the index location.
        h5_path = os.path.relpath(dest_file,
                                  start=os.path.dirname(configs['archive']['index']))

        src_file = out_dict['temp_file']
        with H5ContextManager(dest_file, mode='a') as f_dest:
            with H5ContextManager(src_file, mode='r') as f_src:
                for dts in f_src.keys():
                    # If the dataset or group already exists, delete it to overwrite
                    if overwrite and dts in f_dest:
                        del f_dest[dts]
                    f_src.copy(f_src[f'{dts}'], f_dest, f'{dts}')
                    for member in f_src[dts]:
                        if isinstance(f_src[f'{dts}/{member}'], h5py.Dataset):
                            f_src.copy(f_src[f'{dts}/{member}'], f_dest[f'{dts}'], f'{dts}/{member}')

        if db_manager is not None:
            # Use the batch manager
            db_manager.add_entry(out_dict['db_data'], h5_path)
        else:
            # Use the original approach for backward compatibility
            db = get_preprocess_db(configs, group_by, logger)
            try:
                if len(db.inspect(out_dict['db_data'])) == 0:
                    db.add_entry(out_dict['db_data'], h5_path)
            finally:
                # make sure we close the db each time, even if the
                # lookup or insert fails
                db.conn.close()

        os.remove(src_file)
    elif (
        errors[0] == PreprocessErrors.LoadSuccess or
        (errors[0] is None and out_dict is None)
    ):
        return
    else:
        folder = os.path.dirname(configs['archive']['index'])
        # An index given as a bare filename has an empty dirname; the error
        # log then goes to the current directory and nothing needs creating.
        if folder and not os.path.exists(folder):
            os.makedirs(folder, exist_ok=True)
        errlog = os.path.join(folder, 'errlog.txt')
        with open(errlog, 'a') as f:
            f.write(f"{time.time()}, {out_meta[0]}, {out_meta[1]}, {errors[0]}\n")
            f.write("\t" + (errors[1] or "") + (errors[2] or "") + "\n")
[docs]
def get_pcfg_check_aman(pipe):
    """
    Build an axis manager that records, in order, every step of a preprocess
    pipeline together with the public non-routine attributes of that step.

    Each step is stored under ``<index>_<step name>``. Dict-valued attributes
    become a nested axis manager whose entries are stringified; other
    non-scalar attributes are stored as their ``str`` representation and
    scalars are stored as they are.

    Arguments
    ---------
    pipe : _Preprocess class
        Preprocess pipeline class from which to build the step argument axis
        manager.
    """
    def _is_data_member(obj):
        return not inspect.isroutine(obj)

    pcfg_ref = core.AxisManager()
    for idx, step in enumerate(pipe):
        step_key = f'{idx}_{step.name}'
        pcfg_ref.wrap(step_key, core.AxisManager())

        for attr_name, attr_val in inspect.getmembers(step, _is_data_member):
            # Private / dunder attributes are not part of the configuration.
            if attr_name[0] == '_':
                continue

            if type(attr_val) is dict:
                pcfg_ref[step_key].wrap(attr_name, core.AxisManager())
                for arg_name, arg_val in attr_val.items():
                    pcfg_ref[step_key][attr_name].wrap(arg_name, str(arg_val))
            elif np.isscalar(attr_val):
                pcfg_ref[step_key].wrap(attr_name, attr_val)
            else:
                pcfg_ref[step_key].wrap(attr_name, str(attr_val))

    return pcfg_ref
def _check_assignment_length(a, b):
"""
Helper function to check if the set of assignments in axis manager ``a`` matches
the length of assignments in axis manager ``b``.
Arguments
---------
a : AxisManager
Primary axis manager to cross check assignments with.
b : AxisManager
Secondary axis manager to cross check assignments with
"""
aa = np.fromiter(a._assignments.keys(), dtype='<U32')
bb = np.fromiter(b._assignments.keys(), dtype='<U32')
if len(aa) != len(bb):
return False, None, None
else:
return True, aa, bb
[docs]
def check_cfg_match(ref, loaded, logger=None):
    """
    Checks that the ``ref`` and ``loaded`` axis managers containing the ordered
    preprocess pipelines match one another.

    The assignment names must agree in number and order. Entries that are
    axis managers on both sides are compared recursively, and a mismatch
    found at any depth makes the whole check fail. All other entries are
    compared with ``==``.

    Arguments
    ---------
    ref : AxisManager
        Reference axis manager for cross checking
    loaded : AxisManager
        Loaded axis manager for cross checking.
    logger : PythonLogger
        Optional. Python logger object.

    Returns
    -------
    bool
        True if the two axis managers match, False otherwise. A warning
        describing the first mismatch is logged when False is returned.
    """
    if logger is None:
        logger = init_logger("preprocess")

    check, ref_items, loaded_items = _check_assignment_length(ref, loaded)
    if not check:
        logger.warning('Config check fails due to pipeline list not being of equal length')
        return False

    for ri, li in zip(ref_items, loaded_items):
        if ri != li:
            logger.warning('Config check fails due to ordered pipeline element names not matching.')
            return False

        if type(ref[ri]) is core.AxisManager and type(loaded[li]) is core.AxisManager:
            # Propagate nested failures: the result of the recursive call
            # used to be discarded, so mismatching step arguments only
            # produced a warning while the overall check still passed.
            if not check_cfg_match(ref[ri], loaded[li], logger):
                return False
        elif ref[ri] == loaded[li]:
            continue
        else:
            logger.warning(f'Config check fails due to arguments of {li} not matching')
            return False

    return True