Source code for sotodlib.core.metadata.detdb

import sqlite3
import gzip
import os

from .resultset import ResultSet
from . import common


class SchemaError(Exception):
    """
    This is raised in cases where the code detects a schema violation,
    such as tables not having the required named columns.
    """
    pass


class IntervalError(Exception):
    """
    This is raised in cases where the code detects that time intervals
    in a property table are of negative size or overlap with other
    intervals for the same det_id.
    """
    pass


TABLE_DEFS = {
    'dets': [
        "`id` integer primary key autoincrement",
        "`name` varchar(256) unique",
        ],
}
SPECIAL_COLS = ['det_id', 'time0', 'time1']


[docs] class DetDb(object): """ Detector database. The database stores data about a set of detectors. The ``dets`` table lists all valid detectors, associating a (time-invariant) name to each ``id``. The other tables in the database are user configurable "property tables" that must obey certain rules: 1. They have at least the following columns: - ``det_id`` integer - ``time0`` integer (unix timestamp) - ``time1`` integer (unix timestamp) 2. The values time0 and time1 define an interval ``[time0,time1)`` over which the data in the row is valid. Every row shall respect the constraint that ``time0 <= time1``. 3. No two rows in a property table shall have the same ``det_id`` and overlapping time intervals. Note that since the intervals are half-open, the intervals [t0, t1) and [t1, t2) do not overlap. """ #: A time-range that is meant to signify "all reasonable times"; #: in this case it spans from years 1970 - 2096. ALWAYS = (0., 4e9) #: Column definitions (a list of strings) that must appear in all #: Property Tables. TABLE_TEMPLATE = [ "`det_id` integer", "`time0` integer", "`time1` integer", ]
[docs] def __init__(self, map_file=None, init_db=True): """Instantiate a DetDb. If map_file is provided, the database will be connected to the indicated sqlite file on disk, and any changes made to this object be written back to the file. """ if isinstance(map_file, sqlite3.Connection): self.conn = map_file else: if map_file is None: map_file = ':memory:' self.conn = sqlite3.connect(map_file) self.conn.row_factory = sqlite3.Row # access columns by name if init_db: # Create dets table if not found. c = self.conn.cursor() c.execute("SELECT name FROM sqlite_master " "WHERE type='table' and name not like 'sqlite_%';") tables = [r[0] for r in c] if 'dets' not in tables: self.create_table('dets', TABLE_DEFS['dets'], raw=True)
def __len__(self): return self.conn.execute('select count(id) from dets').fetchone()[0] def _get_property_tables(self): """Return a list of all property tables.""" c = self.conn.cursor() c.execute("SELECT name FROM sqlite_master WHERE " "type='table' and name not like 'sqlite_%' " "and name != 'dets';") return [str(x[0]) for x in c] def _get_property_list(self, table_name): """Return a list of all property columns for the specified property table. """ c = self.conn.cursor() c.execute(f'PRAGMA table_info({table_name})') return [r[1] for r in c if r[1] not in SPECIAL_COLS]
[docs] def validate(self): """ Checks that the database is following internal rules. Specifically we check that a ``dets`` table exists and has the necessary columns; then we check that all other tables do not have overlapping property intervals. Raises SchemaError in the first case, IntervalError in the second. """ c = self.conn.cursor() c.execute("SELECT name FROM sqlite_master WHERE type='table' " "and name not like 'sqlite_%';") tables = [r[0] for r in c] if 'dets' not in tables: raise SchemaError("Database does not contain a `dets` table.") tables.remove('dets') for t in tables: try: c.execute("SELECT det_id,time0,time1 from `%s` " "order by det_id,time0" % t) except sqlite3.OperationalError as e: raise SchemaError("Key columns not found in table `%s`" % t) last_id, last_t1 = None, None for r in c: _id, _t0, _t1 = r if (_t1 < _t0): raise IntervalError( "Negative size time interval for table %s, " "det_id=%i" % (t, _id)) if _id == last_id: if _t0 < last_t1: raise IntervalError( "Overlapping interval for table %s, " "det_id=%i" % (t, _id)) last_t1 = _t1 else: last_id, last_t1 = _id, _t1
[docs] def create_table(self, table_name, column_defs, raw=False, commit=True): """Add a property table to the database. Args: table_name (str): The name of the new table. column_defs (list): A list of sqlite column definition strings. raw (bool): See below. commit (bool): Whether to commit the changes to the db. The special columns `det`, `time0` and `time1` will be pre-pended unless raw=True. An example of column_defs is:: column_defs=[ "`x_pos` float", "`y_pos` float", ] """ c = self.conn.cursor() pre_cols = self.TABLE_TEMPLATE if raw: pre_cols = [] q = ('create table if not exists `%s` (' % table_name + ','.join(pre_cols + column_defs) + ')') c.execute(q) if commit: self.conn.commit() return self
[docs] def copy(self, map_file=None, overwrite=False): """ Duplicate the current database into a new database object, and return it. If map_file is specified, the new database will be connected to that sqlite file on disk. Note that a quick way of writing a Db to disk to call copy(map_file=...) and then simply discard the returned object. """ if map_file is not None and os.path.exists(map_file): if overwrite: os.remove(map_file) else: raise RuntimeError("Output file %s exists (overwrite=True " "to overwrite)." % map_file) new_db = DetDb(map_file=map_file, init_db=False) script = ' '.join(self.conn.iterdump()) new_db.conn.executescript(script) return new_db
[docs] def to_file(self, filename, overwrite=True, fmt=None): """Write the present database to the indicated filename. Args: filename (str): the path to the output file. overwrite (bool): whether an existing file should be overwritten. fmt (str): 'sqlite', 'dump', or 'gz'. Defaults to 'sqlite' unless the filename ends with '.gz', in which it is 'gz'. """ if fmt is None: if filename.endswith('.gz'): fmt = 'gz' else: fmt = 'sqlite' if os.path.exists(filename) and not overwrite: raise RuntimeError(f'File {filename} exists; remove or pass ' 'overwrite=True.') if fmt == 'sqlite': self.copy(map_file=filename, overwrite=overwrite) elif fmt == 'dump': with open(filename, 'w') as fout: for line in self.conn.iterdump(): fout.write(line) elif fmt == 'gz': with gzip.GzipFile(filename, 'wb') as fout: for line in self.conn.iterdump(): fout.write(line.encode('utf-8')) else: raise RuntimeError(f'Unknown format "{fmt}" requested.')
[docs] @classmethod def from_file(cls, filename, fmt=None, force_new_db=True): """This method calls :func:`sotodlib.core.metadata.common.sqlite_from_file` """ conn = common.sqlite_from_file(filename, fmt=fmt, force_new_db=force_new_db) return cls(conn, init_db=False)
[docs] def reduce(self, dets=None, time0=None, time1=None, inplace=False): """Discard information from the database unless it is "relevant". Args: dets (list or ResultSet): A list of detectors names that are relevant. If this is a ResultSet, the 'name' column is used. If None, all dets are relevant. time0 (int): If time1 is None, then a property's time range must contain this time for it to be considered relevant. If time1 is not None, see below. time1 (int): Along with time0, forms a time range that have non-zero intersection with the property's time range for the entry to be considered relevant. inplace (bool): Whether to act on the present object, or to return a modified copy. Returns the reduced data (which is self, if inplace is True). """ if not inplace: return self.copy().reduce(dets, time0, time1, inplace=True) time_clause = '0' if time0 is not None: if time1 is None: time_clause = '(%i >= time0) or (time1 > %i)' % (time0, time0) else: assert(time1 >= time0) time_clause = '(%i <= time1) or (time0 < %i)' % (time0, time1) else: assert(time1 is None) c = self.conn.cursor() if dets is not None: # Convert to a list of names. if isinstance(dets, ResultSet): dets = dets['name'] # Create a temporary table to list dets we're keeping. self.create_table('_keepers', ["`name` varchar"], raw=True) for d in dets: c.execute('insert into _keepers (name) values (?)', (d,)) # Intersect dets against _keepers, discard _keepers. c.execute('delete from dets where not dets.name in ' '(select name from _keepers)') c.execute('drop table _keepers') det_clause = 'not det_id in (select id from dets)' else: det_clause = '0' # Remove orphaned rows from other tables. for t in self._get_property_tables(): c.execute('delete from %s where %s or %s' % (t, det_clause, time_clause)) self.conn.commit() # Compact the db. c.execute('vacuum') self.conn.commit() return self
# Construction.
[docs] def get_id(self, name, commit=True, create=True): """Returns a detector's internal id. If the detector isn't in the dets table yet, and create==True, then it is added. """ c = self.conn.execute('select id from dets where name=?', (name,)) db_id = c.fetchone() if db_id is not None: return db_id[0] if not create: raise ValueError("Detector {} not in table and " "create=False".format(name)) c = self.conn.execute('insert into dets (name) values (?)', (name,)) db_id = c.lastrowid if commit: self.conn.commit() return db_id
[docs] def add_props(self, table_, name_, time_range=None, commit=True, **kw): """Add property information for a detector. Args: table_ (str): The property table name. name_ (str): The detector name. time_range (pair of ints): The time range over which the property value is applicable. commit (bool): Whether or not to commit the db. All other keyword arguments are interpreted as data to write into the property table. """ if time_range is None: time_range = self.ALWAYS row_id = self.get_id(name_, create=True, commit=False) keys, values = zip(*kw.items()) key_string = ('det_id,time0,time1' + (''.join([',`{}`'] * len(keys)).format(*keys))) val_string = '?,?,?' + ''.join([',?'] * len(keys)) q = (f'insert into {table_} ({key_string}) ' f'values ({val_string})') self.conn.execute( q, (row_id, time_range[0], time_range[1]) + tuple(values)) if commit: self.conn.commit()
# Forward lookup.
[docs] def dets(self, timestamp=None, props={}): """ Get a list of detectors matching the conditions listed in the "props" dict. If timestamp is not provided, then time range restriction is not applied. Returns a list of detector names. """ # Accumulate a query, and args. q = 'select dets.name as name from dets' args = [] # Whatever we were given, convert it to a list of dicts. if isinstance(props, ResultSet): prop_sets = list(props.distinct()) elif isinstance(props, dict): prop_sets = [props] else: prop_sets = props # Expand each match row into query and args. other_tables = [] row_wheres = [] for props in prop_sets: r = [] for m, v in props.items(): if '.' in m: t, m = m.split('.', 1) else: t, m = 'base', m if t not in other_tables: other_tables.append(t) r.append((f'{t}.{m}=?', v)) row_wheres.append(r) # Joins. for t in other_tables: q += ' join %s on %s.det_id=dets.id' % (t, t) # Accumulate restriction strings... restricts = [] if timestamp is not None: time_clause = '%s.time0 <= ? and ? < %s.time1' for t in other_tables: restricts.append(time_clause % (t, t)) args.extend([timestamp, timestamp]) # Matching of each prop_set. prop_criteria = [] for r in row_wheres: if len(r) == 0: prop_criteria.append('1') continue conds, vals = zip(*r) prop_criteria.append(' and '.join(conds)) args.extend(vals) if len(prop_criteria) == 0: prop_criteria.append('0') restricts.append(' or '.join(['(' + pc + ')' for pc in prop_criteria])) # Apply restrictions... if (restricts): q += ' where ' + ' and '.join(restricts) q = q + ' group by id' c = self.conn.cursor() c.execute(q, tuple(args)) return ResultSet.from_cursor(c)
# Reverse lookup.
[docs] def props(self, dets=None, timestamp=None, props=None, concise=False): """ Get the value of the properties listed in props, for each detector identified in dets (a list of strings, or a ResultSet with a column called 'name'). """ # Create temporary table c = self.conn.cursor() c.execute('begin transaction') c.execute('drop table if exists _dets') c.execute('create temp table _dets (`name` varchar(32))') q = 'insert into _dets (name) values (?)' if dets is None: dets = self.dets()['name'] if isinstance(dets, ResultSet): dets = dets['name'] elif isinstance(dets, dict): # This is intended to handle a single row extracted from a # ResultSet. dets = [dets['name']] for a in dets: c.execute(q, (a,)) c.execute('end transaction') # Expand props argument. if props is None: props = [t + '.' for t in self._get_property_tables()] props, props_ = [], props for p in props_: if p.endswith('.'): table_p = self._get_property_list(p[:-1]) props.extend([p + _p for _p in table_p]) else: props.append(p) # Now look stuff up in it. other_tables = [] fields, keys = [], [] for i, m in enumerate(props): if '.' in m: t, f = m.split('.', 1) else: t, f = 'base', m if t not in other_tables: other_tables.append(t) key = f'{t}.{f}' keys.append(key) fields.append(f'{key} as result{i}') q = ('select ' + ', '.join(fields) + ' from _dets join dets on _dets.name=dets.name ' + ' '.join(['join %s on %s.det_id=dets.id' % (m, m) for m in other_tables])) c.execute(q) results = ResultSet.from_cursor(c, keys=keys) c.execute('drop table if exists _dets') results.strip(['base.']) return results
[docs] def intersect(self, *specs, resolve=False): """Intersect the provided detector specs. Each entry is either a list (or similar iterable) of detector names, or a dictionary specifying detector properties. If resolve=True, then the returned item is a list (rather than, possibly, a dict). """ if len(specs) == 0: return [] dicts = [s for s in specs if isinstance(s, dict)] others = [s for s in specs if not isinstance(s, dict)] # Reduce the dicts. req = {} for d in dicts: for k, v in d.items(): if k in req: if req[k] != v: return [] else: req[k] = v if len(others) == 0: if resolve: return self.dets(props=req)['name'] return req # Turn it into a list. req = self.dets(props=req)['name'] keepers = set(req) for other in others: keepers.intersection_update(other) return [n for n in req if n in keepers]
[docs] def get_example(): """Returns an example DetDb, mapped to RAM. The two property tables are called "base" and "geometry". This example is for demonstrating the code and interface and has no relation to any instrument's actual detector layout! """ db = DetDb() TABLES = [ ('base', [ "`instrument` varchar(32)", "`camera` varchar(32)", "`array_code` varchar(16)", "`array_class` varchar(16)", "`wafer_code` varchar(32)", "`freq_code` varchar(16)", "`det_type` varchar(32)", ]), ('geometry', [ "`wafer_x` float", "`wafer_y` float", "`wafer_pol` float", ]), ] for n, d in TABLES: print('Creating table %s' % n) db.create_table(n, d) tel_info = {'instrument': 'simonsobs', 'camera': 'latr'} det_names = [] for ar_type, bands, n_ar, n_wa, n_det in [ ('LF', [27, 39], 1, 3, 37), ('MF', [93, 145], 4, 3, 432), ('HF', [225, 278], 2, 3, 542) ]: print('Creating %s-type arrays...' % ar_type) for fi, f in enumerate(bands): for ar in range(n_ar): for wa in range(n_wa): iofs = (fi*n_ar*n_wa + n_wa*ar + wa)*n_det info = {'freq_code': 'f%03i' % f, 'array_class': ar_type, 'array_code': '%s%i' % (ar_type, ar+1), 'wafer_code': 'W%i' % (wa+1)} for i in range(iofs, iofs + n_det): det_names.append('%s%i_%05i' % (ar_type, ar+1, i)) db.add_props('base', det_names[-1], det_type='bolo', **tel_info, **info, commit=False) print('Committing {} detectors...'.format(len(det_names))) db.conn.commit() # Organize these dets in a big square. This is not the plan. n_row = int(len(det_names)**.5 + 1) for i in range(n_row): for j in range(n_row): d = i*n_row+j if d >= len(det_names): break x, y, ang = i * .02, j * .02, (i+j) % 12. * 15 db.add_props('geometry', det_names[d], commit=False, wafer_x=x, wafer_y=y, wafer_pol=ang) db.conn.commit() print('Checking the work...') db.validate() return db