Source code for sotodlib.io.hkdb

"""
Module for loading L3 housekeeping data using a database that
indexes an archive of HK files.

"""
from dataclasses import dataclass, field
import yaml
import os
import logging
import time
import numpy as np
import pathlib
import fnmatch
import hashlib

import sqlalchemy as db
try:
    from sqlalchemy.orm import declarative_base
except ImportError:
    # Deprecated since SQLAlchemy 2.0, keep for backwards compatibility
    from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from typing import Union, List, Optional, Dict, Any
import so3g
from spt3g import core as spt3g_core
from tqdm.auto import tqdm

Base = declarative_base()
Session = sessionmaker()

log = logging.getLogger(__name__)

[docs] @dataclass class HkConfig: """ Configuration object for indexing and loading from an HK archive. If instantiating from a nested dictionary, the ``from_dict`` class method can be used to convert fields to their proper data types. Args ------------ hk_root: str Root directory for the HK archive db_file: Optional[str] Path to the hk index database if the database is an sqlite file. Either this or db_url must be set db_url: Optional[Union[str, db.URL]] URL used for db engine. Either this or db_file must be set echo_db: bool Whether database operations should be echoed file_idx_lookback_time: Optional[float] Time [sec] to look back when scanning for new files to index show_index_pb: bool If true, shows progress bar when indexing aliases: Dict[str, str] Aliases for hk fields. In this dict, the key is the alias name, and the value is the field descriptor, in the format of ``agent.feed.field``. For example:: { 'fp_temp': 'cryo-ls372-lsa21yc.temperatures.Channel_02_T', } These aliases are only used on load, and do not affect how data is stored in the hkdb. Aliases should be valid python identifiers, since they will be set as attributes in the HkResult object. """ hk_root: str db_file: Optional[str] = None db_url: Optional[Union[str, db.URL]] = None echo_db: bool = False file_idx_lookback_time: Optional[float] = None show_index_pb: bool = True aliases: Dict[str, str] = field(default_factory=dict) def __post_init__(self): if self.db_file is None and self.db_url is None: raise ValueError("Either db_file or db_url must be set") if self.db_file is not None and self.db_url is not None: raise ValueError("Only one of db_file or db_url must be set") if self.db_file is not None: self.db_url = f"sqlite:///{self.db_file}"
[docs] @classmethod def from_dict(cls, data: Dict[str, Any]) -> "HkConfig": """ Generates an HkConfig object from a dictionary whose keys are fields of the HkConfig dataclass. If the ``db_url`` is specified, it can be set as a string, a dictionary, or a sqlalchemy URL object. If it is of type dict, it will be converted to a URL object by passing it through to the keyword arguments of sqlalchemy's URL.create function. Environment variables will be expanded in both the string and dict representations. """ _db_url = data.get('db_url') if isinstance(_db_url, dict): url_dict = data['db_url'] for k, v in url_dict.items(): url_dict[k] = os.path.expandvars(v) data['db_url'] = db.URL.create(**url_dict) elif isinstance(_db_url, str): data['db_url'] = os.path.expandvars(data['db_url']) return cls(**data)
[docs] @classmethod def from_yaml(cls, path): with open(path, 'r') as f: return cls.from_dict(yaml.safe_load(f))
[docs] class HkFile(Base): """ Database entry for a hk file. Args ------ path: str Path to the hk file. start_time: float Starting ctime of the file end_time: float Ending ctime of the file size: int Size of file in bytes mod_time: float Modification time of file index_status: float Index status of file. Can be "unindexed", "indexed", or "failed" """ __tablename__ = 'hk_files' id = db.Column(db.Integer, primary_key=True) path = db.Column(db.String, nullable=False, unique=True) start_time = db.Column(db.Float) end_time = db.Column(db.Float) size = db.Column(db.Float) mod_time = db.Column(db.Float) index_status = db.Column(db.String)
[docs] class HkFrame(Base): """ Database entry for an hk frame. Args -------- file_id: int ID of the file containing the frame agent: str instance-id of the OCS agent for the frame feed: str Name of the OCS feed for the frame fields_hash: str Hash to identify the combination of fields included in this frame. byte_offset: int Offset of the frame in the G3 file in bytes start_time: float Starting ctime of the frame end_time: float Ending ctime of the frame """ __tablename__ = 'hk_frames' id = db.Column(db.Integer, primary_key=True) file_id = db.Column(db.Integer, db.ForeignKey('hk_files.id'), index=True) # index required on postgres, or # else delete is slow file = relationship('HkFile') agent = db.Column(db.String) feed = db.Column(db.String) fields_hash = db.Column(db.String) byte_offset = db.Column(db.Integer) start_time = db.Column(db.Float) end_time = db.Column(db.Float)
[docs] class HkDb: """ Helper class for createing database sessions Args ------ cfg : Union[HKConfig, str] Configuration object or path to configuration file """ def __init__(self, cfg: Union[HkConfig, str]): if isinstance(cfg, str): cfg = HkConfig.from_yaml(cfg) self.cfg = cfg self.engine = db.create_engine(cfg.db_url, echo=cfg.echo_db) Session.configure(bind=self.engine) self.Session = sessionmaker(bind=self.engine) Base.metadata.create_all(self.engine)
[docs] def update_file_index(hkcfg: HkConfig, session=None, subdirs=None, retry_failed=False): """Updates HkFiles database with new files on disk. If subdirs is specified, it must be a list of (full path) sub-directories of hk_root; those will be scanned and any new files added to database. Otherwise, all subdirs of hk_root will be scanned, subject to any restriction from hkcfg.file_idx_lookback_time. """ if session is None: hkdb = HkDb(hkcfg) session = hkdb.Session() if hkcfg.file_idx_lookback_time is not None and subdirs is None: min_ctime = time.time() - hkcfg.file_idx_lookback_time else: min_ctime = 0 all_files = [] if subdirs is None: log.debug(f"Getting dirs from {hkcfg.hk_root}.") subdirs = [os.path.join(hkcfg.hk_root, subdir) for subdir in os.listdir(hkcfg.hk_root)] else: _root = pathlib.Path(hkcfg.hk_root) assert all([_root in pathlib.Path(sdir).parents for sdir in subdirs]) log.debug(f"Scanning {len(subdirs)} subdirs (min_ctime={min_ctime}).") for sdir in subdirs: if min_ctime > 0 and min_ctime > os.path.getmtime(sdir): continue all_files.extend([ os.path.join(sdir, f) for f in os.listdir(sdir) if f.endswith('.g3') ]) existing_files = {'all': []} for k in ['failed', 'indexed', 'unindexed']: existing_files[k] = [ os.path.join(hkcfg.hk_root, path) for path, in session.query(HkFile.path).filter(HkFile.index_status == k).all() ] existing_files['all'].extend(existing_files[k]) if retry_failed: retry_files = sorted(list(set(all_files).intersection(existing_files['failed']))) log.info(f"Clearing {len(retry_files)} for retry...") for f in retry_files: relpath = os.path.relpath(f, hkcfg.hk_root) session.query(HkFile).filter(HkFile.path == relpath) \ .update({HkFile.index_status: 'unindexed'}) new_files = sorted(list(set(all_files) - set(existing_files['all']))) files = [] log.info(f"Adding {len(new_files)} new files to index...") for path in new_files: relpath = os.path.relpath(path, hkcfg.hk_root) files.append(HkFile( path=relpath, size=os.path.getsize(path), mod_time=os.path.getmtime(path), index_status='unindexed' )) session.add_all(files) session.commit() return { 'scanned_subdirs': len(subdirs), 'total_files': len(all_files), 'new_files': len(new_files), }
[docs] def get_frames_from_file( hkcfg: HkConfig, file: HkFile, return_on_fail=True ) -> List[HkFrame]: """ Returns HkFile and HkFrame objects corresponding to a given hk file. Args -------- file : HkFile HkFile object corresponding to the file return_on_fail : bool If True, if there is a runtime error while reading the g3 file (usually caused by a forced shutdown), the function will still return parsed frames. Returns --------- frames : List[HkFrame] List of all HkFrames in the file """ frames = [] if os.path.isabs(file.path): path = str(file.path) else: path = os.path.join(hkcfg.hk_root, str(file.path)) reader = so3g.G3IndexedReader(path) while True: byte_offset = reader.Tell() try: frame = reader.Process(None) except RuntimeError: log.error(f"Error processing file {file.path} byte offset: {byte_offset}") if return_on_fail: break else: raise if not frame: break else: frame = frame[0] if frame['hkagg_type'] != so3g.HKFrameType.data: continue # Process frame addr = frame['address'] _, agent, _, feed = addr.split('.') start_time, stop_time = 1<<32, 0 fields = [] for block in frame['blocks']: ts = np.array(block.times) / spt3g_core.G3Units.s fields.extend(block.keys()) start_time = min(start_time, ts[0].item()) stop_time = max(stop_time, ts[-1].item()) fields_hash = hashlib.sha256(','.join(sorted(fields)).encode('ascii')).hexdigest()[:16] frames.append(HkFrame( agent=agent, feed=feed, fields_hash=fields_hash, byte_offset=byte_offset, start_time=start_time, end_time=stop_time, file=file )) return frames
[docs] def update_frame_index(hkcfg: HkConfig, session=None): """Updates HkFrames database with frames from unindexed files""" if session is None: hkdb = HkDb(hkcfg) session = hkdb.Session() report = { 'new_files_indexed': 0, 'new_files_failed': 0, } files = session.query(HkFile).filter(HkFile.index_status == 'unindexed').all() log.info(f"Indexing {len(files)} files") for file in tqdm(files, disable=(not hkcfg.show_index_pb), ascii=True): frames = get_frames_from_file(hkcfg, file) file_start, file_end = 1<<32, 0 for f in frames: file_start = min(file_start, f.start_time) file_end = max(file_end, f.end_time) file.start_time = file_start file.end_time = file_end file.index_status ='indexed' try: session.add_all(frames) session.commit() report['new_files_indexed'] += 1 log.debug(f"Committed index for {file.path}.") except Exception as e: log.error(f"Failed to commit frames from {file.path}.") session.rollback() file.index_status = 'failed' session.commit() report['new_files_failed'] += 1 return report
[docs] def update_index_all(cfg: Union[HkConfig, str], subdirs=None, retry_failed: bool=False): """Updates all HK index databases, and returns a report.""" if isinstance(cfg, str): cfg = HkConfig.from_yaml(cfg) hkdb = HkDb(cfg) session = hkdb.Session() report1 = update_file_index(cfg, session=session, subdirs=subdirs, retry_failed=retry_failed) report2 = update_frame_index(cfg, session=session) return report1 | report2
[docs] def purge_unindexed_files(hkcfg: HkConfig): """Remove any 'unindexed' files from the database. This can be used to ignore files that disappeared from filesystem before being indexed. """ hkdb = HkDb(hkcfg) session = hkdb.Session() session.query(HkFile).filter(HkFile.index_status == 'unindexed').delete( synchronize_session=False) session.commit()
[docs] def reset_failed_files(hkcfg: HkConfig, pattern=None): """Reset "failed" files to the "unindexed" state. If pattern is specified, it should be an sql-compatible matching string, which will be matched against the path. Returns the number of changed rows. """ hkdb = HkDb(hkcfg) session = hkdb.Session() q = session.query(HkFile) \ .filter(HkFile.index_status == 'failed') if pattern is not None: q = q.filter(HkFile.path.like(pattern)) rowcount = q.update({HkFile.index_status: 'unindexed'}, synchronize_session=False) session.commit() return rowcount
[docs] def get_files_info(hkcfg: HkConfig, index_status=None, limit=None): """Query file rows; sorted by path. Optionally filter by index_status (list of str) and limit (int). Returns list of HkFile. """ hkdb = HkDb(hkcfg) session = hkdb.Session() q = session.query(HkFile) if index_status is not None: if isinstance(index_status, str): index_status = [index_status] q = q.filter(HkFile.index_status.in_(index_status)) q = q.order_by(HkFile.path) if limit: q = q.limit(limit) rows = q.all() session.close() return rows
##################### # HK Loading stuff #####################
[docs] @dataclass class Field: agent: str feed: str field: str def __str__(self): return f"{self.agent}.{self.feed}.{self.field}" @staticmethod def _wcmatch(a, b): if '*' in a: return fnmatch.fnmatch(b, a) if '*' in b: return fnmatch.fnmatch(a, b) return a == b
[docs] def matches(self, other): return (self.agent == other.agent and Field._wcmatch(self.feed, other.feed) and Field._wcmatch(self.field, other.field))
[docs] @classmethod def from_str(cls, s): try: agent, feed, field = s.split('.') except Exception: raise ValueError(f"Could not parse field: {s}") return cls(agent, feed, field)
[docs] @dataclass class LoadSpec: """ HK loading specification Args ----- cfg: HkConfig Configuration object fields: List[str] List of field specifications to load. This can either be a field descriptor, of the format ``agent.feed.field``, or an alias defined in the config. Field descriptors can contain wildcards in the feed and field portion., for instance ``agent.*.*`` will load all fields belonging to the specified agent. ``agent.feed.*`` and ``agent.*.*word*`` will also work as expected. start: float Start time to load end: float End time to load downsample_factor: int Downsample factor for data hkdb: Optional[HkDb] HkDb instance to use. If not specified, will create a new one from the cfg. This should be set manually if you are calling ``load_hk`` in a loop to prevent connection build-up. """ cfg: HkConfig fields: List[str] start: float end: float downsample_factor: int = 1 hkdb: Optional[HkDb] = None def __post_init__(self): fs = [] for f in self.fields: if f in self.cfg.aliases: fs.append(Field.from_str(self.cfg.aliases[f])) else: fs.append(Field.from_str(f)) self.fields = fs
[docs] class HkResult: """ Helper class for storing results of LoadHk. If aliases are set for any of the keys, they will be set as attributes of the object. Attributes ------------ data: dict Dict where the key is the field descriptor, and the value is a list where val[0] are timestamps, and val[1] is data. """ def __init__(self, data, aliases=None): if aliases is None: aliases = {} self._aliases = aliases self.data = data for alias, key in aliases.items(): if key in self.data: setattr(self, alias, self.data[key])
[docs] def save(self, path): np.savez(path, aliases=np.array(self._aliases), **self.data)
[docs] @classmethod def load(cls, path): d = np.load(path, allow_pickle=True) aliases = d['aliases'].item() data = {k: d[k] for k in d.files if k != 'aliases'} return cls(data, aliases=aliases)
[docs] def load_hk(load_spec: Union[LoadSpec, dict], show_pb=False, fields=None, start=None, end=None, _field_list_scan=False): """ Loads hk data Args ------ load_spec: LoadSpec Load specification. See docstrings of the LoadSpec class. show_pb: bool If true, will show a progressbar :) fields: List[str] Fields to load (overrides load_spec.fields). start: float Starting timestamp (overrides load_spec.start). end: float Ending timestamp (overrides load_spec.end). _field_list_scan: bool Run in special mode to support get_field_list. """ if isinstance(load_spec, dict): load_spec = LoadSpec(**load_spec) if load_spec.hkdb is not None: hkdb: HkDb = load_spec.hkdb else: hkdb = HkDb(load_spec.cfg) if fields is None: fields = load_spec.fields fields = [Field.from_str(f) if isinstance(f, str) else f for f in fields] if start is None: start = load_spec.start if end is None: end = load_spec.end agent_set = list(set(f.agent for f in fields)) file_spec = {} # {path: [offsets]} feeds = [] with hkdb.Session.begin() as sess: query = sess.query(HkFrame).filter( HkFrame.start_time <= end, HkFrame.end_time >= start, HkFrame.agent.in_(agent_set) ).order_by(HkFrame.start_time) for frame in query: if _field_list_scan: feed_key = (frame.agent, frame.feed, frame.fields_hash) if feed_key in feeds: continue feeds.append(feed_key) if frame.file.path not in file_spec: file_spec[frame.file.path] = [] file_spec[frame.file.path].append(frame.byte_offset) # Convert all paths to absolute paths based on cfg.hk_root def create_abs_path(path): if os.path.isabs(path): return path return os.path.join(load_spec.cfg.hk_root, path) file_spec = {create_abs_path(k): v for k, v in file_spec.items()} result = {} # {field: [timestamps, data]} field_misses = set() def get_result_field(agent, feed, field_name): f = Field(agent, feed, field_name) key = str(f) if key in result: return result[key] if key in field_misses: return None for field in fields: if field.matches(f): result[key] = [[], []] return result[key] # Cache field on miss field_misses.add(key) return None ds_factor = load_spec.downsample_factor nframes = np.sum([len(offsets) for offsets in file_spec.values()]) pb = tqdm(total=nframes, disable=(not show_pb)) for path in sorted(list(file_spec.keys())): offsets = file_spec[path] reader = so3g.G3IndexedReader(path) for offset in sorted(offsets): reader.Seek(offset) frame = reader.Process(None)[0] addr = frame['address'] _, agent, _, feed = addr.split('.') for block in frame['blocks']: ts = np.array(block.times)[::ds_factor] / spt3g_core.G3Units.s for field_name, data in block.items(): field = get_result_field(agent, feed, field_name) if field is None or _field_list_scan: continue field[0].append(ts) field[1].append(np.array(data)[::ds_factor]) pb.update() pb.close() if _field_list_scan: return list(result.keys()) for k, d in result.items(): if len(d[0]) == 0: result[k] = (np.array([]), np.array([])) else: result[k] = (np.hstack(d[0]), np.hstack(d[1])) return HkResult(result, aliases=load_spec.cfg.aliases)
[docs] def get_feed_list(load_spec: Union[LoadSpec, dict]) -> List[str]: """Return the list of feeds present in the db and for the time range specified by a LoadSpec. Args ---- load_spec: LoadSpec Load specification. See docstrings of the LoadSpec class. Returns ------- List[str] The list of feeds, as field spec strings, with wildcard for the field e.g. "an_agent.a_feed.*". Notes ----- The .start_time and .end_time are respected in the query, but the .fields entry is ignored in the search. """ if isinstance(load_spec, dict): load_spec = LoadSpec(**load_spec) if load_spec.hkdb is not None: hkdb: HkDb = load_spec.hkdb else: hkdb = HkDb(load_spec.cfg) pairs = set() with hkdb.Session.begin() as sess: query = sess.query(HkFrame.agent, HkFrame.feed).filter( HkFrame.start_time <= load_spec.end, HkFrame.end_time >= load_spec.start, ) for row in query: pairs.add(tuple(row)) return [f'{a}.{b}.*' for a, b in sorted(list(pairs))]
[docs] def get_field_list(load_spec: Union[LoadSpec, dict], fields: List[Field]=None) -> List[str]: """Inspect the HK files to get the field names associated with each feed covered by the load_spec (or by the fields argument). This is shallow search in that only a single frame from every ``agent.feed`` combination matching the fields list is inspected. Args ---- load_spec: LoadSpec Load specification. See docstrings of the LoadSpec class. fields: List of fields (which may include wildcards) to match against. Returns ------- List[str] The list of fields, as field spec strings. Notes ----- If fields is not specified, then it is taken from load_spec. But normally you'd want it from the get_feeds_list. The .start_time and .end_time are respected in the query, especially in the sense that the shallow data search will begin at .start_time. """ return load_hk(load_spec, fields=fields, _field_list_scan=True)