import numpy as np
from collections import OrderedDict
class ResultSet(object):
    """ResultSet is a special container for holding the results of
    database queries, i.e. columnar data.  The repr of a ResultSet
    states the name of its columns, and the number of rows::

      >>> print(rset)
      ResultSet<[array_code,freq_code], 17094 rows>

    You can access the column names in .keys::

      >>> print(rset.keys)
      ['array_code', 'freq_code']

    You can request a column by name, and a numpy array of values will
    be constructed for you:

      >>> rset['array_code']
      array(['LF1', 'LF1', 'LF1', ..., 'LF1', 'LF1', 'LF1'], dtype='<U3')

    You can request a row by number, and a dict will be constructed
    for you:

      >>> rset[10]
      {'base.array_code': 'LF1', 'base.freq_code': 'f027'}

    Note that the array or dict returned by indexing the ResultSet
    presents copies of the data, so changing those objects will not
    update the original ResultSet.

    You can also access the raw row data in .rows, which is a simple
    list of tuples.  If you want to edit the data in a ResultSet,
    modify those data rows directly, or else use ``.asarray()`` to get
    a numpy array, modify the result, and create a new ResultSet
    from that using the ``.from_friend`` constructor.

    You can get a structured numpy array using:

      >>> rset.asarray()
      array([('LF1', 'f027'), ('LF1', 'f027'), ('LF1', 'f027'), ...,
             ('LF1', 'f027'), ('LF1', 'f027'), ('LF1', 'f027')],
            dtype=[('array_code', '<U3'), ('freq_code', '<U4')])

    Slicing works along the row axis; and you can combine two results.
    So you could reorganize results like this, if you wanted:

      >>> rset[::2] + rset[1::2]
      ResultSet<[array_code,freq_code], 17094 rows>

    Finally, the .distinct() method returns a ResultSet containing the
    distinct elements:

      >>> rset.distinct()
      ResultSet<[array_code,freq_code], 14 rows>
    """
    #: Once instantiated, a list of the names of the ResultSet
    #: columns.
    keys = None

    #: Once instantiated, a list of the raw data tuples.
    rows = None
def __init__(self, keys, src=None):
self.keys = list(keys)
if src is None:
self.rows = []
else:
self.rows = [tuple(x) for x in src]
@classmethod
def from_friend(cls, source):
"""Return a new ResultSet populated with data from source.
If source is a ResultSet, a copy is made. If source is a
numpy structured array, the ResultSet is constructed based on
the dtype names and rows of source.
Otherwise, a TypeError is raised.
"""
if isinstance(source, np.ndarray):
keys = source.dtype.names # structured array?
return cls(keys, list(source))
if isinstance(source, ResultSet):
return cls(source.keys, source.rows)
raise TypeError(f"No implementation to construct {cls} from {source.__class__}.")
def copy(self):
return self.__class__(self.keys, self.rows)
def subset(self, keys=None, rows=None):
"""Returns a copy of the object, selecting only the keys and rows
specified.
Arguments:
keys: a list of keys to keep. None keeps all.
rows: a list or array of the integers representing which
rows to keep. This can also be specified as an array of
bools, of the same length as self.rows, to select row by
row. None keeps all.
"""
if keys is None:
keys = self.keys
def key_sel_func(row):
return row
else:
key_idx = [self.keys.index(k) for k in keys]
def key_sel_func(row):
return [row[i] for i in key_idx]
if rows is None:
new_rows = map(key_sel_func, self.rows)
elif isinstance(rows, np.ndarray) and rows.dtype == bool:
assert(len(rows) == len(self.rows))
new_rows = [key_sel_func(r) for r, s in zip(self.rows, rows) if s]
else:
new_rows = [key_sel_func(self.rows[i]) for i in rows]
return self.__class__(keys, new_rows)
@classmethod
def from_cursor(cls, cursor, keys=None):
"""Create a ResultSet using the results stored in cursor, an
sqlite.Cursor object. The cursor must have be configured so
that .description is populated.
"""
if keys is None:
keys = [c[0] for c in cursor.description]
self = cls(keys)
self.rows = [tuple(r) for r in cursor]
return self
def asarray(self, simplify_keys=False, hdf_compat=False):
"""Get a numpy structured array containing a copy of this data. The
names of the fields are taken from self.keys.
Args:
simplify_keys: If True, then the keys are stripped of any
prefix (such as 'base.'). This is mostly for DetDb, where
the table name can be annoying. An error is thrown if
this results in duplicate field names.
hdf_compat: If True, then 'U'-type columns (Unicode strings)
are converted to 'S'-type (byte strings), so it can be
stored in an HDF5 dataset.
"""
keys = [k for k in self.keys]
if simplify_keys: # remove prefixes
keys = [k.split('.')[-1] for k in keys]
assert(len(set(keys)) == len(keys)) # distinct.
columns = tuple(map(np.array, zip(*self.rows)))
if hdf_compat:
# Translate any Unicode columns to strings.
new_cols = []
for c in columns:
if c.dtype.char == 'U':
new_cols.append(c.astype('S'))
else:
new_cols.append(c)
columns = new_cols
dtype = [(k, c.dtype, c.shape[1:]) for k, c in zip(keys, columns)]
output = np.ndarray(shape=len(columns[0]), dtype=dtype)
for k, c in zip(keys, columns):
output[k] = c
return output
def distinct(self):
"""
Returns a ResultSet that is a copy of the present one, with
duplicates removed. The rows are sorted (according to python
sort).
"""
return self.__class__(self.keys, sorted(list(set(self.rows))))
def strip(self, patterns=[]):
"""For any keys that start with a string in patterns, remove that
string prefix from the key. Operates in place.
"""
for i, k in enumerate(self.keys):
for p in patterns:
if k.startswith(p):
self.keys[i] = k[len(p):]
break
assert(len(self.keys) == len(set(self.keys)))
def to_axismanager(self, axis_name="dets", axis_key="dets"):
"""Build an AxisManager directly from a ResultSet, projecting all columns
along a single axis. This requires no additional metadata to build
Args:
axis_name: string, name of the axis in the AxisManager
axis_key: string, name of the key in the ResultSet to put into the
axis labels. This key will not be added to the AxisManager
fields.
"""
from sotodlib import core
aman = core.AxisManager(
core.LabelAxis(axis_name, self[axis_key])
)
for k in self.keys:
if k == axis_key:
continue
if any([ x is None for x in self[k]]):
raise TypeError("None(s) found in key {}, these cannot be ".format(k)+
"nicely wrapped into an AxisManager")
aman.wrap(k, self[k], [(0,axis_name)])
return aman
def restrict_dets(self, restriction, detdb=None):
# There are 4 classes of keys:
# - dets:* keys appearing only in restriction
# - dets:* keys appearing only in self
# - dets:* keys appearing in both
# - other.
new_keys = [k for k in restriction if k.startswith('dets:')]
match_keys = []
for k in self.keys:
if k in new_keys:
match_keys.append(k)
new_keys.remove(k)
other_keys = [k for k in self.keys if k not in match_keys]
output_keys = new_keys + match_keys + other_keys # disjoint.
output_rows = []
for row in self:
row = dict(row) # copy
for k in match_keys:
if row[k] != restriction[k]:
break
else:
# You passed.
row.update({k: restriction[k] for k in new_keys})
output_rows.append([row[k] for k in output_keys])
# That's all.
return self.__class__(output_keys, output_rows)
# Everything else is just implementing container-like behavior
def __repr__(self):
keystr = 'empty'
if self.keys is not None:
keystr = ','.join(self.keys)
return ('{}<[{}], {} rows>'.format(self.__class__.__name__,
keystr, len(self)))
def __len__(self):
return len(self.rows)
def append(self, item):
vals = []
for k in self.keys:
if k not in item.keys():
raise ValueError(f"Item to append must include key '{k}'")
vals.append(item[k])
self.rows.append(tuple(vals))
def extend(self, items):
if not isinstance(items, ResultSet):
raise TypeError("Extension only valid for two ResultSet objects.")
if self.keys != items.keys:
raise ValueError("Keys do not match: {} <- {}".format(
self.keys, items.keys))
self.rows.extend(items.rows)
def __getitem__(self, item):
# Simple row look-up... convert to dict.
if isinstance(item, int) or isinstance(item, np.integer):
return OrderedDict([(k,v) for k, v in
zip(self.keys, self.rows[item])])
# Look-up by column...
if isinstance(item, str):
index = self.keys.index(item)
return np.array([x[index] for x in self.rows])
# Slicing.
output = self.__class__(self.keys, self.rows[item])
return output
def __iadd__(self, other):
self.extend(other)
return self
def __add__(self, other):
output = self.copy()
output += other
return output
@staticmethod
def concatenate(items, axis=0):
assert(axis == 0)
output = items[0].copy()
for item in items[1:]:
output += item
return output
def merge(self, src):
"""Merge with src, which must have same number of rows as self.
Duplicate columns are not allowed.
"""
if len(self) != len(src):
raise ValueError("self and src have different numbers of rows.")
for k in src.keys:
if k in self.keys:
raise ValueError("Duplicate key: %s" % k)
new_keys = self.keys + src.keys
new_rows = [r0 + r1 for r0, r1 in zip(self.rows, src.rows)]
self.keys, self.rows = new_keys, new_rows