# Source code for sotodlib.core.metadata.obsfiledb

import sqlite3
import os
import sys
import argparse
from collections import OrderedDict
import numpy as np

from . import common
from .resultset import ResultSet

# Schema for the ObsFileDb sqlite tables.  Each key is a table name and
# each value is the list of column / constraint definitions for that
# table; ObsFileDb._create() joins each list with commas to build a
# "create table if not exists" statement.
TABLE_DEFS = {
    # Detector membership of each detset; (name, det) pairs are unique.
    'detsets': [
        "`name`    varchar(16)",
        "`det`     varchar(32)",
        "CONSTRAINT name_det UNIQUE (name, det)",
    ],
    # One row per data file: its (unique) name, the detset and obs_id
    # it belongs to, and the sample range recorded for it.
    'files': [
        "`name`    varchar(256) unique",
        "`detset`  varchar(16)",
        "`obs_id`  varchar(256)",
        "`sample_start` int",
        "`sample_stop`  int",
    ],
    # Per-frame records keyed by file_name (matched against files.name
    # when rows are deleted in drop_obs / drop_detset).
    'frame_offsets': [
        "`file_name` varchar(256)",
        "`frame_index` int",
        "`byte_offset` int",
        "`frame_type` varchar(16)",
        "`sample_start` int",
        "`sample_stop` int",
    ],
    # Key/value parameters; holds the 'obsfiledb_version' entry.
    'meta': [
        "`param` varchar(32) UNIQUE",
        "`value` varchar",
    ],
}


class ObsFileDb:
    """sqlite3-based database for managing large archives of files.

    The data model here is that each distinct "Observation" comprises
    co-sampled detector data for a large number of detectors.  Each
    detector belongs to a single "detset", and there is a set of files
    containing the data for each detset.  Finding the file that
    contains data for a particular detector is a matter of looking up
    what detset the detector is in, and looking up what file covers
    that detset.

    Note that many functions have a "commit" option, which simply
    affects whether the .commit is called on the database or not (it
    can be faster to suppress commit ops when running a batch of
    updates, and commit manually at the end).

    """

    #: The sqlite3 database connection.
    conn = None

    #: Path relative to which filenames in the database should be
    #: interpreted.  This only applies to relative filenames (those not
    #: starting with /).
    prefix = ''

    def __init__(self, map_file=None, prefix=None, init_db=True, readonly=False):
        """Instantiate an ObsFileDb.

        Arguments:
          map_file (string): sqlite database file to map.  Defaults to
            ':memory:'.  An already-open sqlite3.Connection is also
            accepted and is used as-is.
          prefix (string): as described in class documentation.
          init_db (bool): If True, attempt to create the database
            tables.
          readonly (bool): If True, the database file will be mapped
            in read-only mode.  Not valid on dbs held in :memory:.

        Raises:
          ValueError: if readonly is requested for a ':memory:' db.

        """
        if isinstance(map_file, sqlite3.Connection):
            self.conn = map_file
        else:
            if map_file is None:
                map_file = ':memory:'
            if readonly:
                if map_file == ':memory:':
                    raise ValueError('Cannot honor request for readonly db '
                                     'mapped to :memory:.')
                # Open exactly one connection, and open it read-only; a
                # plain connect() first would leak a handle and could
                # create the file.
                self.conn = sqlite3.connect(
                    'file:%s?mode=ro' % map_file, uri=True)
            else:
                self.conn = sqlite3.connect(map_file)

        self.conn.row_factory = sqlite3.Row  # access columns by name
        # Derive the prefix from the real path, not from the URI form.
        self.prefix = self._get_prefix(map_file, prefix)
        if init_db and not readonly:
            self._create()

    @staticmethod
    def _get_prefix(map_file, prefix):
        """Common logic for setting the file prefix based on map_file and
        prefix arguments.

        An explicit prefix always wins.  Otherwise the prefix is the
        directory (absolute, with trailing '/') containing map_file, or
        '' when there is no file path to work from (':memory:' or an
        existing sqlite3.Connection).

        """
        if prefix is not None:
            return prefix
        if isinstance(map_file, sqlite3.Connection) or map_file == ':memory:':
            return ''
        return os.path.split(os.path.abspath(map_file))[0] + '/'

    def to_file(self, filename, overwrite=True, fmt=None):
        """Write the present database to the indicated filename.

        Args:
          filename (str): the path to the output file.
          overwrite (bool): whether an existing file should be
            overwritten.
          fmt (str): 'sqlite', 'dump', or 'gz'.  Defaults to 'sqlite'
            unless the filename ends with '.gz', in which it is 'gz'.

        Returns:
          Whatever common.sqlite_to_file returns.

        """
        return common.sqlite_to_file(self.conn, filename,
                                     overwrite=overwrite, fmt=fmt)

    @classmethod
    def from_file(cls, filename, prefix=None, fmt=None, force_new_db=True):
        """This method calls
        :func:`sotodlib.core.metadata.common.sqlite_from_file`

        If prefix is not given, it is set to the directory containing
        filename (using the same logic as the constructor, so a bare
        filename maps to the current directory rather than to '/').

        """
        conn = common.sqlite_from_file(filename, fmt=fmt,
                                       force_new_db=force_new_db)
        return cls(conn, init_db=False,
                   prefix=cls._get_prefix(filename, prefix))

    @classmethod
    def for_dir(cls, path, filename='obsfiledb.sqlite', readonly=True):
        """Deprecated; use from_file().

        The readonly argument is accepted for compatibility but is not
        used; the database is loaded through from_file() with
        prefix=path.

        """
        print('Use of ObsFileDb.for_dir() is deprecated... use from_file.')
        return cls.from_file(os.path.join(path, filename), prefix=path)

    def copy(self, map_file=None, overwrite=False):
        """
        Duplicate the current database into a new database object,
        and return it.  If map_file is specified, the new database
        will be connected to that sqlite file on disk.  Note that a
        quick way of writing a Db to disk to call copy(map_file=...).

        Raises:
          RuntimeError: if map_file exists and overwrite is False.

        """
        if map_file is None:
            map_file = ':memory:'
        script = ' '.join(self.conn.iterdump())
        if map_file != ':memory:' and os.path.exists(map_file):
            if not overwrite:
                raise RuntimeError("Output database '%s' exists -- remove or "
                                   "pass overwrite=True to copy." % map_file)
            os.remove(map_file)
        new_db = ObsFileDb(map_file, init_db=False)
        new_db.conn.executescript(script)
        new_db.prefix = self.prefix
        return new_db

    def _get_version(self, conn=None):
        """Return the integer 'obsfiledb_version' stored in the meta table,
        or None if no such row exists.  conn may be a connection or
        cursor; it defaults to self.conn.

        """
        if conn is None:
            conn = self.conn
        # Bind the value as a parameter; a double-quoted "..." token is
        # an identifier in standard SQL and only works as a string
        # through a legacy sqlite fallback.
        rows = conn.execute('select value from meta where param=?',
                            ('obsfiledb_version',)).fetchall()
        if not rows:
            return None
        return int(rows[0][0])

    def _create(self):
        """
        Create the database tables if they do not already exist, and
        record the schema version (2) if no version is stored yet.
        """
        c = self.conn.cursor()
        for table_name, column_defs in TABLE_DEFS.items():
            q = ('create table if not exists `%s` (' % table_name +
                 ','.join(column_defs) + ')')
            c.execute(q)
        if self._get_version(conn=c) is None:
            c.execute('insert or ignore into meta (param,value) values (?,?)',
                      ('obsfiledb_version', 2))
        self.conn.commit()

    def add_detset(self, detset_name, detector_names, commit=True):
        """Add a detset to the detsets table.

        Arguments:
          detset_name (str): The (unique) name of this detset.
          detector_names (list of str): The detectors belonging to
            this detset.
          commit (bool): whether to commit after inserting.

        """
        self.conn.executemany(
            'insert into detsets (name,det) values (?,?)',
            [(detset_name, d) for d in detector_names])
        if commit:
            self.conn.commit()

    def add_obsfile(self, filename, obs_id, detset,
                    sample_start=None, sample_stop=None, commit=True):
        """Add an observation file to the files table.

        Arguments:
          filename (str): The filename, relative to the data base
            directory and without a leading /.
          obs_id (str): The observation id.
          detset (str): The detset name.
          sample_start (int): The observation sample index at the
            start of this file.
          sample_stop (int): sample_start + n_samples.
          commit (bool): whether to commit after inserting.

        """
        self.conn.execute(
            'insert into files (name,detset,obs_id,sample_start,sample_stop) '
            'values (?,?,?,?,?)',
            (filename, detset, obs_id, sample_start, sample_stop))
        if commit:
            self.conn.commit()

    # Retrieval

    def get_obs(self):
        """Returns a list of all obs_id present in this database.

        """
        c = self.conn.execute('select distinct obs_id from files')
        return [r[0] for r in c]

    def get_obs_with_detset(self, detset):
        """Returns a list of all obs_ids that include a specified detset"""
        # Parameterized so that detset names containing quotes work and
        # cannot alter the query.
        c = self.conn.execute(
            'select distinct obs_id from files where detset=?', (detset,))
        return [r[0] for r in c]

    def get_detsets(self, obs_id):
        """Returns a list of all detsets represented in the observation
        specified by obs_id.

        """
        c = self.conn.execute('select distinct detset from files '
                              'where obs_id=?', (obs_id,))
        return [r[0] for r in c]

    def get_dets(self, detset):
        """Returns a list of all detectors in the specified detset.

        """
        c = self.conn.execute('select det from detsets where name=?',
                              (detset,))
        return [r[0] for r in c]

    def get_det_table(self, obs_id):
        """Get table of detectors and detsets suitable for use with
        Context det_info.

        Returns Resultset with keys=['dets:detset','dets:readout_id'].

        """
        c = self.conn.execute(
            'select distinct detsets.name as `dets:detset`, '
            'det as `dets:readout_id` '
            'from detsets join files '
            'on files.detset=detsets.name where obs_id=?', (obs_id,))
        return ResultSet.from_cursor(c)

    def get_files(self, obs_id, detsets=None, prefix=None):
        """Get the file names associated with a particular obs_id and detsets.

        Arguments:
          obs_id (str): the observation of interest.
          detsets (iterable of str): detsets to include; defaults to
            all detsets found for obs_id.
          prefix (str): overrides self.prefix if not None.

        Returns:
          OrderedDict where the key is the detset name and the value
          is a list of tuples of the form (full_filename,
          sample_start, sample_stop).

        """
        if prefix is None:
            prefix = self.prefix
        if detsets is None:
            detsets = self.get_detsets(obs_id)
        # Materialize once: the iterable is needed both for counting
        # placeholders and for supplying the bound values.
        detsets = list(detsets)
        placeholders = ','.join('?' for _ in detsets)
        c = self.conn.execute('select detset, name, sample_start, sample_stop '
                              'from files where obs_id=? and detset in (%s) '
                              'order by detset, sample_start' % placeholders,
                              (obs_id,) + tuple(detsets))
        output = OrderedDict()
        for detset, name, start, stop in c:
            output.setdefault(detset, []).append(
                (os.path.join(prefix, name), start, stop))
        return output

    def lookup_file(self, filename, resolve_paths=True, prefix=None,
                    fail_ok=False):
        """Determine what, if any, obs_id (and detset and sample range) is
        associated with the specified data file.

        Args:
          filename (str): a string corresponding to a file that is
            covered by this db.  See note on how this is resolved.
          resolve_paths (bool): If True, then the incoming filename is
            treated as a path to a specific file on disk, and the
            database is queried for files that resolve to that same
            file on disk (accounting for prefix).  If False, then the
            incoming filename is taken as opaque text to match against
            the corresponding entries in the obsfiledb file "name"
            column (including whatever path information is in either
            of those strings).
          prefix (str): overrides self.prefix if not None.
          fail_ok (bool): If True, then None is returned if the
            filename is not found in the db (instead of raising
            RuntimeError).

        Returns:
          A dict with entries:

          - ``obs_id``: The obs_id
          - ``detsets``: A list containing the name of the single
            detset covered by this file.
          - ``sample_range``: A tuple with the start and stop sample
            indices for this file.

        Raises:
          RuntimeError: if no match is found (and fail_ok is False),
            or if more than one row matches.

        """
        if prefix is None:
            prefix = self.prefix

        if resolve_paths:
            # Clarify our target ...
            filename = os.path.realpath(filename)
            basename = os.path.split(filename)[1]
            # Do a non-conclusive match against the basename ...
            c = self.conn.execute(
                'select name, obs_id, detset, sample_start, sample_stop '
                'from files where name like ?', ('%' + basename,))
            # Keep only the rows that are definitely our target file.
            rows = [r for r in c.fetchall()
                    if os.path.realpath(os.path.join(prefix, r[0])) == filename]
        else:
            # Do literal exact matching of filename to database.
            c = self.conn.execute(
                'select name, obs_id, detset, sample_start, sample_stop '
                'from files where name=?', (filename,))
            rows = c.fetchall()

        if len(rows) == 0:
            if fail_ok:
                return None
            raise RuntimeError('No match found for "%s"' % filename)
        if len(rows) > 1:
            raise RuntimeError('Multiple matches found for "%s"' % filename)

        _, obs_id, detset, start, stop = tuple(rows[0])
        return {'obs_id': obs_id,
                'detsets': [detset],
                'sample_range': (start, stop)}

    def verify(self, prefix=None):
        """Check the filesystem for the presence of files described in the
        database.  Returns a dictionary containing this information in
        various forms; see code for details.

        This function is used internally by the drop_incomplete()
        function, and may also be useful for debugging file-finding
        problems.

        The returned dict has entries:

        - ``raw``: list of tuples (present, full_path, name, obs_id,
          detset, sample_start), one per row of the files table.
        - ``obs_id``: OrderedDict mapping obs_id to a dict with lists
          ``present`` and ``absent`` of (detset, sample_start).
        - ``grids``: OrderedDict mapping obs_id to a dict with
          ``detset`` and ``sample_start`` axis lists and a boolean
          ``grid`` array marking which combinations are present.

        """
        if prefix is None:
            prefix = self.prefix

        # Check for the presence of each listed file.
        c = self.conn.execute('select name, obs_id, detset, sample_start '
                              'from files')
        rows = []
        for r in c:
            fp = os.path.join(prefix, r[0])
            rows.append((os.path.exists(fp), fp) + tuple(r))

        obs = OrderedDict()
        for present, fullpath, name, obs_id, detset, sample_start in rows:
            entry = obs.setdefault(obs_id, {'present': [], 'absent': []})
            key = 'present' if present else 'absent'
            entry[key].append((detset, sample_start))

        # Make a detset, sample_start grid for each observation.
        # NOTE(review): the grid axes are built from the files that are
        # present only, so a detset (or sample_start) whose files are
        # all missing does not appear in the grid at all -- confirm
        # whether that is intended.
        grids = OrderedDict()
        for k, v in obs.items():
            items = v['present']
            detsets = list(set(a for a, b in items))
            sample_starts = list(set(b for a, b in items))
            grid = np.zeros((len(detsets), len(sample_starts)), bool)
            for a, b in items:
                grid[detsets.index(a), sample_starts.index(b)] = True
            grids[k] = {'detset': detsets,
                        'sample_start': sample_starts,
                        'grid': grid}
        return {'raw': rows,
                'obs_id': obs,
                'grids': grids}

    def drop_obs(self, obs_id):
        """Delete the specified obs_id from the database.  Returns a list of
        files that are no longer covered by the database (with prefix).

        """
        # What files does this affect?
        c = self.conn.execute('select name from files where obs_id=?',
                              (obs_id,))
        affected_files = [os.path.join(self.prefix, r[0]) for r in c]
        # Drop them (frame_offsets first, while files rows still exist
        # for the sub-select).
        self.conn.execute('delete from frame_offsets where file_name in '
                          '(select name from files where obs_id=?)',
                          (obs_id,))
        self.conn.execute('delete from files where obs_id=?',
                          (obs_id,))
        self.conn.commit()
        return affected_files

    def drop_detset(self, detset):
        """Delete the specified detset from the database.  Returns a list of
        files that are no longer covered by the database (with
        prefix).

        """
        # What files does this affect?
        c = self.conn.execute('select name from files where detset=?',
                              (detset,))
        affected_files = [os.path.join(self.prefix, r[0]) for r in c]
        # Drop them (frame_offsets first, while files rows still exist
        # for the sub-select).
        self.conn.execute('delete from frame_offsets where file_name in '
                          '(select name from files where detset=?)',
                          (detset,))
        self.conn.execute('delete from files where detset=?',
                          (detset,))
        self.conn.commit()
        return affected_files

    def drop_incomplete(self):
        """Compare the files actually present on the system to the ones
        listed in this database.  Drop detsets from each observation,
        as necessary, such that the database is consistent with the
        file system.

        Returns a list of files that are on the system but are no
        longer included in the database.

        """
        affected_files = []
        scan = self.verify()
        for obs_id, info in scan['grids'].items():
            # Drop any detset that does not have complete sample
            # coverage.
            detset_to_drop = np.any(~info['grid'], axis=1)
            for i in detset_to_drop.nonzero()[0]:
                key = (obs_id, info['detset'][i])
                affected_files.extend(
                    r[0] for r in self.conn.execute(
                        'select name from files where obs_id=? and detset=?',
                        key))
                # NOTE(review): unlike drop_obs / drop_detset, rows in
                # frame_offsets are not removed here -- confirm.
                self.conn.execute(
                    'delete from files where obs_id=? and detset=?', key)
        # Drop any detset that no longer appear in any files.
        self.conn.execute('delete from detsets where name not in '
                          '(select distinct detset from files)')
        self.conn.commit()
        self.conn.execute('vacuum')

        # Return the full paths of only the existing files that have
        # been dropped from the Db.  scan['raw'] rows are
        # (present, full_path, name, ...).
        dropped = set(affected_files)
        return [r[1] for r in scan['raw'] if r[0] and r[2] in dropped]

    def get_file_list(self, fout=None):
        """Returns a list of all files in the database, without the file
        prefix, sorted by observation / detset / sample_start.  This
        is the sort of list one might use with rsync --files-from.

        If you pass an open file or filename to fout, the names will
        be written there, too.  A filename must not already exist; a
        file opened here is closed before returning, while a passed-in
        file object is left open.

        """
        c = self.conn.execute('select name from files order by '
                              'obs_id, detset, sample_start')
        output = [r[0] for r in c]
        if isinstance(fout, str):
            assert(not os.path.exists(fout))
            with open(fout, 'w') as fh:
                for line in output:
                    fh.write(line + '\n')
        elif fout is not None:
            for line in output:
                fout.write(line + '\n')
        return output
def get_parser(parser=None):
    """Build (or extend) the argument parser for the ObsFileDb command
    line tool.

    Arguments:
      parser (argparse.ArgumentParser): parser to add arguments to; a
        new one is created if None.

    Returns:
      The parser, with a positional ``filename`` argument and
      sub-commands ``files``, ``reroot`` and ``fix-db`` (the selected
      sub-command is stored in ``mode``).

    """
    if parser is None:
        parser = argparse.ArgumentParser(
            epilog="""For details of individual modes, pass a dummy database argument
            followed by the mode and -h, e.g.: "%(prog)s x files -h" """)

    parser.add_argument('filename', help="Path to an ObsFileDb.",
                        metavar='obsfiledb.sqlite')

    cmdsubp = parser.add_subparsers(
        dest='mode')

    # "files"
    p = cmdsubp.add_parser(
        'files', usage="""Syntax:

    %(prog)s
    %(prog)s --all
    %(prog)s --clean

        This will print out a list of the files in the db,
        along with obs_id and detset.  Only a few lines
        will be shown, unless --all is passed.  To get a
        simple list of all files (for rsync or something),
        pass --clean.
        """,
        help="List the files referenced in the database.")
    p.add_argument('--clean', action='store_true',
                   help="Print a simple list of all files (for script digestion).")
    p.add_argument('--all', action='store_true',
                   help="Print all files, not an abbreviated list.")

    # "reroot"
    p = cmdsubp.add_parser(
        'reroot', help="Batch change filenames (by prefix) in the database.",
        usage="""Syntax:

    %(prog)s old_prefix new_prefix [output options]

Examples:

    %(prog)s /path/on/system1 /path/on/system2 -o my_new_manifest.sqlite
    %(prog)s /path/on/system1 /new_path/on/system1 --overwrite
    %(prog)s ./result1/obs_12345.h5 ./result2/obs_12345.h5 --overwrite

These operations will create a duplicate of the source ObsFileDb,
with only the filenames (potentially) altered.  Any filename that
starts with the first argument will be changed, in the output, to
instead start with the second argument.  When you do this you must
either say where to write the output (-o) or give the program
permission to overwrite your input database file.  Note that the
first argument need not match all entries in the database; you can
use it to pick out a subset (even a single entry).
        """)
    p.add_argument('old_prefix', help="Prefix to match in current database.")
    p.add_argument('new_prefix', help="Prefix to replace it with.")
    p.add_argument('--overwrite', action='store_true',
                   help="Store modified database in the same file.")
    p.add_argument('--output-db', '-o',
                   help="Store modified database in this file.")
    p.add_argument('--dry-run', action='store_true',
                   help="Run the conversion steps but do not write the results anywhere.")

    # "fix-db"
    p = cmdsubp.add_parser(
        'fix-db', help="Upgrade database (schema fixes, etc).",
        usage="""Syntax:

    %(prog)s [output options]
        """)
    p.add_argument('--overwrite', action='store_true',
                   help="Store modified database in the same file.")
    p.add_argument('--output-db', '-o',
                   help="Store modified database in this file.")
    p.add_argument('--dry-run', action='store_true',
                   help="Run the conversion steps but do not write the results anywhere.")

    return parser


def _load_db_for_rewrite(args, parser):
    """Validate the --overwrite / --output-db options shared by the
    modifying modes, set args.output_db, and return a fresh in-memory
    copy of the input database."""
    if args.overwrite:
        if args.output_db:
            parser.error("Specify only one of --overwrite or --output-db.")
        args.output_db = args.filename
    elif args.output_db is None:
        parser.error("Specify an output database name with --output-db, "
                     "or pass --overwrite to clobber.")
    return ObsFileDb.from_file(args.filename, force_new_db=True)


def main(args=None, parser=None):
    """Entry point for the so-metadata tool.

    Arguments:
      args: None (use sys.argv[1:]), a list of argument strings, or an
        already-parsed argparse.Namespace.
      parser (argparse.ArgumentParser): parser used for error
        reporting when args is a Namespace; built with get_parser()
        if not supplied.

    """
    if args is None:
        args = sys.argv[1:]
    if not isinstance(args, argparse.Namespace):
        # Covers both an explicit list and the sys.argv default.
        parser = get_parser()
        args = parser.parse_args(args)
    elif parser is None:
        # parser.error() is used below, so one must always exist.
        parser = get_parser()

    if args.mode is None:
        args.mode = 'summary'

    db = ObsFileDb.from_file(args.filename, force_new_db=False)

    if args.mode == 'files':
        # Get all files.
        rows = db.conn.execute(
            'select obs_id, name, detset from files '
            'order by obs_id, detset, name').fetchall()

        if args.clean:
            for obs_id, filename, detset in rows:
                print(filename)
        else:
            fmt = ' {obs_id} {detset} {filename}'
            hdr = fmt.format(obs_id="obs_id", detset="detset",
                             filename="Filename")
            print(hdr)
            print('-' * (len(hdr) + 20))
            n = len(rows)
            if n > 20 and not args.all:
                rows = rows[:10]
            for obs_id, filename, detset in rows:
                print(fmt.format(obs_id=obs_id, filename=filename,
                                 detset=detset))
            if len(rows) < n:
                print(fmt.format(obs_id='...',
                                 filename='+%i others' % (n - len(rows)),
                                 detset=''))
                print('(Pass --all to show all results.)')
            print()

    elif args.mode == 'reroot':
        # Reconnect with write?
        db = _load_db_for_rewrite(args, parser)

        # Get all files matching this prefix ...  An exact,
        # parameterized prefix comparison is used instead of a LIKE
        # pattern built from the argument, so quotes, '%' / '_' and
        # letter case in old_prefix cannot select the wrong rows.
        c = db.conn.execute(
            'select name from files where substr(name, 1, ?) = ?',
            (len(args.old_prefix), args.old_prefix))
        rows = c.fetchall()
        print('Found %i records matching prefix ...' % len(rows))

        print('Converting to new prefix ...')
        n_examples = 1

        if not args.dry_run:
            c = db.conn.cursor()

        for (name,) in rows:
            new_name = args.new_prefix + name[len(args.old_prefix):]
            if n_examples > 0:
                print(f' Example: converting filename\n'
                      f' "{name}"\n'
                      f' to\n'
                      f' "{new_name}"')
                n_examples -= 1
            if not args.dry_run:
                c.execute('update files set name=? where name=?',
                          (new_name, name))

        print('Saving to %s' % args.output_db)
        if not args.dry_run:
            db.conn.commit()
            c.execute('vacuum')
            db.to_file(args.output_db)

    elif args.mode == 'fix-db':
        # Reconnect with write?
        db = _load_db_for_rewrite(args, parser)

        # Get version ...
        v = db._get_version()
        print(f'Database reports as version = {v}')

        changes = False
        if v == 1:
            # Copy detsets to new table, where uniqueness has been
            # relaxed.  Re-do the meta table, where uniqueness is
            # enforced to prevent lots of redundant rows added by
            # _create().
            changes = True
            for line in [
                    'drop table meta',
                    'alter table detsets rename to old_detsets',
                    '*',
                    'insert into detsets (name, det) select name, det from old_detsets',
                    'drop table old_detsets',
            ]:
                if line == '*':
                    # Marker: (re)create tables with the current schema.
                    print('Creating updated tables.')
                    db._create()
                    continue
                print(f'Running: {line}')
                db.conn.execute(line)
            print()

        if changes:
            print('Saving to %s' % args.output_db)
            if not args.dry_run:
                db.conn.commit()
                db.conn.execute('vacuum')
                db.to_file(args.output_db)
        else:
            print('No changes to make.')

    else:
        parser.error(f'Unimplemented mode, "{args.mode}".')