Source code for sotodlib.core.axisman

import numpy as np
from collections import OrderedDict as odict

import scipy.sparse as sparse
## "temporary" fix to deal with scipy>1.8 changing the sparse setup
try:
    from scipy.sparse import csr_array
except ImportError:
    from scipy.sparse import csr_matrix as csr_array

from .util import get_coindices


class AxisInterface:
    """Abstract base class for axes managed by AxisManager."""

    count = None
    name = None

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        raise NotImplementedError

    def _minirepr_(self):
        return self.__repr__()

    def copy(self):
        raise NotImplementedError
    
    def rename(self, name):
        self.name = name

    def resolve(self, src, axis_index=None):
        """Perform a check or promote-and-check of this Axis against a data
        object.

        The promotion step only applies to "unset" Axes, i.e. those
        here count is None.  Not all Axis types will be able to
        support this.  Promotion involves inspection of src and
        axis_index to fix free parameters in the Axis.  If promotion
        is successful, a new ("set") Axis is returned.  If promotion
        is attempted and fails then a ValueError is raised.X

        The check step involes confirming that the data object
        described by src (and axis_index) is compatible with the
        current axis (or with the axis resulting from axis Promotion).
        Typically that simply involves confirming that
        src.shape[axis_index] == self.count.  If the check fails, a
        ValueError is raised.

        Arguments:
          src: The data object to be wrapped (e.g. a numpy array)
          axis_index: The index of the data object to test for
            compatibility.

        Returns:
          axis: Either self, or the result of promotion.

        """
        # The default implementation performs the check step.
        # Subclasses may attempt promotion, then call this.
        ref_count = src.shape[axis_index]
        if self.count != ref_count:
            raise ValueError(
                "Dimension %i of data is incompatible with axis %s" %
                (axis_index, repr(self)))
        return self

    def restriction(self, selector):
        """Apply `selector` to the elements of this axis, returning a new Axis
        of the same type and an array indexing object (a slice or an
        array of integer indices) that may be used to extract the
        corresponding elements from a vector.

        See class header for acceptable selector objects.

        Returns (new_axis, ar_index).

        """
        raise NotImplementedError

    def intersection(self, friend, return_slices=False):
        """Find the intersection of this Axis and the friend Axis, returning a
        new Axis of the same type.  Optionally, also return array
        indexing objects that select the common elements from array
        dimensions corresponding to self and friend, respectively.

        See class header for acceptable selector objects.

        Returns (new_axis), or (new_axis, ar_index_self,
        ar_index_friend) if return_slices is True.

        """
        raise NotImplementedError


[docs] class IndexAxis(AxisInterface): """This class manages a simple integer-indexed axis. When intersecting data, the longer one will be simply truncated to match the shorter. Selectors must be slice objects (with stride 1!) or tuples to be passed into slice(), e.g. (0, 1000) or (0, None, 1).. """
[docs] def __init__(self, name, count=None): super().__init__(name) self.count = count
def copy(self): return IndexAxis(self.name, self.count) def __repr__(self): return 'IndexAxis(%s)' % self.count
[docs] def resolve(self, src, axis_index=None): if self.count is None: return IndexAxis(self.name, src.shape[axis_index]) return super().resolve(src, axis_index)
[docs] def restriction(self, selector): if not isinstance(selector, slice): sl = slice(*selector) else: sl = selector start, stop, stride = sl.indices(self.count) assert stride == 1 assert stop <= self.count return IndexAxis(self.name, stop - start), sl
[docs] def intersection(self, friend, return_slices=False): count_out = min(self.count, friend.count) ax = IndexAxis(self.name, count_out) if return_slices: return ax, slice(count_out), slice(count_out) else: return ax
[docs] class OffsetAxis(AxisInterface): """This class manages an integer-indexed axis, with an accounting for an integer offset of any single vector relative to some absolute reference point. For example, one vector could could have 100 elements at offset 50, and a second vector could have 100 elements at offset -20. On intersection, the result would have 30 elements at offset 50. The property `origin_tag` may be used to identify the absolute reference point. It could be a TOD name ('obs_2020-12-01') or a timestamp or whatever. Selectors must be slice objects (with stride 1!) or tuples to be passed into slice(), e.g. (0, 1000) or (0, None, 1). """ origin_tag = None offset = 0
[docs] def __init__(self, name, count=None, offset=0, origin_tag=None): super().__init__(name) self.count = count self.offset = offset self.origin_tag = origin_tag
def copy(self): return OffsetAxis(self.name, self.count, self.offset, self.origin_tag) def __repr__(self): return 'OffsetAxis(%s:%s%+i)' % ( self.count, self.origin_tag, self.offset) def _minirepr_(self): return 'OffsetAxis(%s)' % (self.count)
[docs] def resolve(self, src, axis_index=None): if self.count is None: return OffsetAxis(self.name, src.shape[axis_index]) return super().resolve(src, axis_index)
[docs] def restriction(self, selector): if not isinstance(selector, slice): sl = slice(*selector) else: sl = selector start, stop, stride = sl.indices(self.count + self.offset) assert stride == 1 assert start >= self.offset assert stop <= self.offset + self.count return (OffsetAxis(self.name, stop - start, start, self.origin_tag), slice(start - self.offset, stop - self.offset, stride))
[docs] def intersection(self, friend, return_slices=False): offset = max(self.offset, friend.offset) count = min(self.count + self.offset, friend.count + friend.offset) - offset count = max(count, 0) ax = OffsetAxis(self.name, count, offset, self.origin_tag) if return_slices: return ax, \ slice(offset - self.offset, count + offset - self.offset), \ slice(offset - friend.offset, count + offset - friend.offset) else: return ax
[docs] class LabelAxis(AxisInterface): """This class manages a string-labeled axis, i.e., an axis where each element has been given a unique name. The vector of names can be found in self.vals. Instantiation with labels that are not strings will raise a TypeError. On intersection of two vectors, only elements whose names appear in both axes will be preserved. Selectors should be lists (or arrays) of label strings. """
[docs] def __init__(self, name, vals=None): super().__init__(name) if vals is not None: if len(vals): vals = np.array(vals) else: vals = np.array([], dtype=np.str_) if vals.dtype.type is not np.str_: raise TypeError( 'LabelAxis labels must be strings not %s' % vals.dtype) self.vals = vals
@property def count(self): if self.vals is None: return None return len(self.vals) def __repr__(self): if self.vals is None: items = ['?'] elif len(self.vals) > 20: items = ([repr(v) for v in self.vals[:3]] + ['...'] + [repr(v) for v in self.vals[-4:]]) else: items = [repr(v) for v in self.vals] return 'LabelAxis(%s:' % self.count + ','.join(items) + ')' def _minirepr_(self): return 'LabelAxis(%s)' % (self.count) def copy(self): return LabelAxis(self.name, self.vals)
[docs] def resolve(self, src, axis_index=None): if self.count is None: raise RuntimeError( 'LabelAxis cannot be naively promoted from data.') return super().resolve(src, axis_index)
[docs] def restriction(self, selector): # Selector should be list of vals or a mask. Returns new axis and the # indices into self.vals that project out the elements. if self.vals is not None and isinstance(selector, np.ndarray) and selector.dtype == bool: selector = self.vals[selector] _, i0, i1 = get_coindices(selector, self.vals) assert len(i0) == len(selector) # not a strict subset! return LabelAxis(self.name, selector), i1
[docs] def intersection(self, friend, return_slices=False): _vals, i0, i1 = get_coindices(self.vals, friend.vals) ax = LabelAxis(self.name, _vals) if return_slices: return ax, i0, i1 else: return ax
[docs] class AxisManager: """A container for numpy arrays and other multi-dimensional data-carrying objects (including other AxisManagers). This object keeps track of which dimensions of each object are concordant, and allows one to slice all hosted data simultaneously. """
[docs] def __init__(self, *args): self._axes = odict() self._assignments = {} # data_name -> [ax0_name, ax1_name, ...] self._fields = odict() for a in args: if isinstance(a, AxisManager): # merge in the axes and copy in the values. self.merge(a) elif isinstance(a, AxisInterface): self._axes[a.name] = a.copy() else: raise ValueError("Cannot handle type %s in constructor." % a)
@property def shape(self): return tuple([a.count for a in self._axes.values()]) def copy(self, axes_only=False): out = AxisManager() for k, v in self._axes.items(): out._axes[k] = v if axes_only: return out for k, v in self._fields.items(): if np.isscalar(v) or v is None: out._fields[k] = v else: out._fields[k] = v.copy() for k, v in self._assignments.items(): out._assignments[k] = v.copy() return out def _managed_ids(self): ids = [id(self)] for v in self._fields.values(): if isinstance(v, AxisManager): ids.extend(v._managed_ids()) return ids def __delitem__(self, name): if name in self._fields: del self._fields[name] del self._assignments[name] elif name in self._axes: del self._axes[name] for v in self._assignments.values(): for i, n in enumerate(v): if n == name: v[i] = None else: raise KeyError(name)
[docs] def move(self, name, new_name): """Rename or remove a data field. To delete the field, pass new_name=None. """ if new_name is None: del self._fields[name] del self._assignments[name] else: self._fields[new_name] = self._fields.pop(name) self._assignments[new_name] = self._assignments.pop(name) return self
def add_axis(self, a): assert isinstance( a, AxisInterface) self._axes[a.name] = a.copy() def __contains__(self, name): return name in self._fields or name in self._axes def __getitem__(self, name): if name in self._fields: return self._fields[name] if name in self._axes: return self._axes[name] raise KeyError(name) def __setitem__(self, name, val): if name in self._fields: self._fields[name] = val else: raise KeyError(name) def __setattr__(self, name, value): # Assignment to members update those members if "_fields" in self.__dict__ and name in self._fields.keys(): self._fields[name] = value else: # Other assignments update this object self.__dict__[name] = value def __getattr__(self, name): # Prevent members from override special class members. if name.startswith("__"): raise AttributeError(name) return self[name] def __dir__(self): return sorted(tuple(self.__dict__.keys()) + tuple(self.keys())) def keys(self): return list(self._fields.keys()) + list(self._axes.keys()) def get(self, key, default=None): if key in self: return self[key] return default def shape_str(self, name): if np.isscalar(self._fields[name]) or self._fields[name] is None: return '' s = [] for n, ax in zip(self._fields[name].shape, self._assignments[name]): if ax is None: s.append('%i' % n) else: s.append('%s' % ax) return ','.join(s) def __repr__(self): def branch_marker(name): return '*' if isinstance(self._fields[name], AxisManager) else '' stuff = (['%s%s[%s]' % (k, branch_marker(k), self.shape_str(k)) for k in self._fields.keys()] + ['%s:%s' % (k, v._minirepr_()) for k, v in self._axes.items()]) return ("{}(".format(type(self).__name__) + ', '.join(stuff).replace('[]', '') + ")")
[docs] @staticmethod def concatenate(items, axis=0, other_fields='exact'): """Concatenate multiple AxisManagers along the specified axis, which can be an integer (corresponding to the order in items[0]._axes) or the string name of the axis. This operation is difficult to sanity check so it's best to use it only in simple, controlled cases! The first item is given significant privilege in defining what fields are relevant. Fields that appear in the first item, but do not share the target axis, will be treated as follows depending on the value of other_fields: - If other_fields='exact' will compare entries in all items and if they're identical will add it. Otherwise will fail with a ValueError. - If other_fields='fail', the function will fail with a ValueError. - If other_fields='first', the values from the first element of items will be copied into the output. - If other_fields='drop', the fields will simply be ignored (and the output will only contain fields that share the target axis). """ assert other_fields in ['exact', 'fail', 'first', 'drop'] if not isinstance(axis, str): axis = list(items[0]._axes.keys())[axis] fields = [] for name in items[0]._fields.keys(): ax_dim = None for i, ax in enumerate(items[0]._assignments[name]): if ax == axis: if ax_dim is not None: raise ValueError('Entry %s has axis %s on more than ' '1 dimension.' % (name, axis)) ax_dim = i if ax_dim is not None: fields.append((name, ax_dim)) # Design the new axis. vals = np.hstack([item._axes[axis].vals for item in items]) new_ax = LabelAxis(axis, vals) # Concatenate each entry. new_data = {} for name, ax_dim in fields: shape0 = None keepers = [] for item in items: shape1 = list(item._fields[name].shape) if 0 in shape1: continue shape1[ax_dim] = -1 # This dim doesn't have to match. if shape0 is None: shape0 = shape1 elif shape0 != shape1: raise ValueError('Field %s has incompatible shapes: ' % name + '%s and %s' % (shape0, shape1)) keepers.append(item._fields[name]) if len(keepers) == 0: # Well we tried. keepers = [items[0]._fields[name]] # Call class-specific concatenation if needed. if isinstance(keepers[0], AxisManager): new_data[name] = AxisManager.concatenate( keepers, axis=ax_dim, other_fields=other_fields) elif isinstance(keepers[0], np.ndarray): new_data[name] = np.concatenate(keepers, axis=ax_dim) elif isinstance(keepers[0], csr_array): # Note in scipy 1.11 the default format for vstack # and/or hstack seems to have change, as we started # seeing induced cso format here. Force preservation # of incoming format. if ax_dim == 0: new_data[name] = sparse.vstack(keepers, format=keepers[0].format) elif ax_dim == 1: new_data[name] = sparse.hstack(keepers, format=keepers[0].format) else: raise ValueError('sparse arrays cannot concatenate along ' f'axes greater than 1, received {ax_dim}') else: # The general compatible object should have a static # method called concatenate. new_data[name] = keepers[0].concatenate(keepers, axis=ax_dim) # Construct. new_axes = [] for ax_name, ax_def in items[0]._axes.items(): if ax_name == axis: ax_def = new_ax new_axes.append(ax_def) output = AxisManager(*new_axes) for k, v in items[0]._assignments.items(): axis_map = [(i, n) for i, n in enumerate(v) if n is not None] if isinstance(items[0][k], AxisManager): axis_map = None # wrap doesn't like this. if k in new_data: output.wrap(k, new_data[k], axis_map) else: if other_fields == "exact": ## if every item named k is a scalar err_msg = (f"The field '{k}' does not share axis '{axis}'; " f"{k} is not identical across all items " f"pass other_fields='drop' or 'first' or else " f"remove this field from the targets.") if np.any([np.isscalar(i[k]) for i in items]): if not np.all([np.isscalar(i[k]) for i in items]): raise ValueError(err_msg) if not np.all([np.array_equal(i[k], items[0][k], equal_nan=True) for i in items]): raise ValueError(err_msg) output.wrap(k, items[0][k], axis_map) continue elif not np.all([i[k].shape==items[0][k].shape for i in items]): raise ValueError(err_msg) elif not np.all([np.array_equal(i[k], items[0][k], equal_nan=True) for i in items]): raise ValueError(err_msg) output.wrap(k, items[0][k].copy(), axis_map) elif other_fields == 'fail': raise ValueError( f"The field '{k}' does not share axis '{axis}'; " f"pass other_fields='drop' or 'first' or else " f"remove this field from the targets.") elif other_fields == 'first': # Just copy it. if np.isscalar(items[0][k]): output.wrap(k, items[0][k], axis_map) else: output.wrap(k, items[0][k].copy(), axis_map) elif other_fields == 'drop': pass return output
# Add and remove data while maintaining internal consistency.
[docs] def wrap(self, name, data, axis_map=None, accept_new=True, accept_merge=True): """Add data into the AxisManager. Arguments: name (str): name of the new data. data: The data to register. This must be of an acceptable type, i.e. a numpy array or another AxisManager. If scalar (or None) then data will be directly added to _fields with no associated axis. axis_map: A list that assigns dimensions of data to particular Axes. Each entry in the list must be a tuple with the form (dim, name) or (dim, ax), where dim is the index of the dimension being described, name is a string giving the name of an axis already described in the present object, and ax is an AxisInterface object. """ # Don't permit AxisManager reference loops! if isinstance(data, AxisManager): assert(id(self) not in data._managed_ids()) assert(axis_map is None) axis_map = [(i, v) for i, v in enumerate(data._axes.values())] # Handle scalars if np.isscalar(data) or data is None: if name in self._fields: raise ValueError(f'Key: {name} already found in {self}') if np.iscomplex(data): # Complex values aren't supported by HDF scheme right now. raise ValueError(f'Cannot store complex value as scalar.') if isinstance(data, (np.integer, np.floating, np.str_, np.bool_)): # Convert sneaky numpy scalars to native python int/float/str data = data.item() self._fields[name] = data self._assignments[name] = [] return self # Promote input data to a full AxisManager, so we can call up # to self.merge. helper = AxisManager() assign = [None for s in data.shape] # Resolve each axis declaration into an axis object, and check # for conflict. If an axis is passed by name only, the # dimensions must agree with self. If a full axis definition # is included, then intersection will be performed, later. if axis_map is not None: for index, axis in axis_map: if not isinstance(axis, AxisInterface): # So it better be a string label... that we've heard of. if axis not in self._axes: raise ValueError("Axis assignment refers to unknown " "axis '%s'." % axis) axis = self._axes[axis] axis = axis.resolve(data, index) helper._axes[axis.name] = axis assign[index] = axis.name helper._fields[name] = data helper._assignments[name] = assign return self.merge(helper)
[docs] def wrap_new(self, name, shape=None, cls=None, **kwargs): """Create a new object and wrap it, with axes mapped. The shape can include axis names instead of ints, and that will cause the new object to be dimensioned properly and its axes mapped. Args: name (str): name of the new data. shape (tuple of int and std): shape in the same sense as numpy, except that instead of int it is allowed to pass the name of a managed axis. cls (callable): Constructor that should be used to construct the object; it will be called with all kwargs passed to this function, and with the resolved shape as described here. Defaults to numpy.ndarray. Examples: Construct a 2d array and assign it the name 'boresight_quat', with its first axis mapped to the AxisManager tod's "samps" axis: >>> tod.wrap_new('boresight_quat', shape=('samps', 4), dtype='float64') Create a new empty RangesMatrix, carrying a per-det, per-samp flags: >>> tod.wrap_new('glitch_flags', shape=('dets', 'samps'), cls=so3g.proj.RangesMatrix.zeros) """ if cls is None: cls = np.zeros # Turn the shape into a tuple of ints and an axis map. shape_ints, axis_map = [], [] for dim, s in enumerate(shape): if isinstance(s, int): shape_ints.append(s) elif isinstance(s, str): if s in self._axes: shape_ints.append(self._axes[s].count) axis_map.append((dim, self._axes[s])) else: raise ValueError(f'shape includes axis "{s}" which is ' f'not in _axes: {self._axes}') elif isinstance(s, AxisInterface): # Sure, why not. shape_ints.append(s.count) axis_map.append((dim, s.copy())) data = cls(shape=shape_ints, **kwargs) return self.wrap(name, data, axis_map=axis_map)[name]
[docs] def restrict_axes(self, axes, in_place=True): """Restrict this AxisManager by intersecting it with a set of Axis definitions. Arguments: axes (list or dict of Axis): in_place (bool): If in_place == True, the intersection is applied to self. Otherwise, a new object is returned, with data copied out. Returns: The restricted AxisManager. """ if in_place: dest = self else: dest = self.copy(axes_only=True) dest._assignments.update(self._assignments) sels = {} # If simple list/tuple of Axes is passed in, convert to dict if not isinstance(axes, dict): axes = {ax.name: ax for ax in axes} for name, ax in axes.items(): if name not in dest._axes: continue if dest._axes[name].count is None: dest._axes[name] = ax continue _, sel0, sel1 = ax.intersection(dest._axes[name], True) sels[name] = sel1 dest._axes[ax.name] = ax for k, v in self._fields.items(): if isinstance(v, AxisManager): dest._fields[k] = v.restrict_axes(axes, in_place=in_place) elif np.isscalar(v) or v is None: dest._fields[k] = v else: # I.e. an ndarray. sslice = [sels.get(ax, slice(None)) for ax in dest._assignments[k]] sslice = tuple(dest._broadcast_selector(sslice)) sslice = simplify_slice(sslice, v.shape) dest._fields[k] = v[sslice] return dest
@staticmethod def _broadcast_selector(sslice): """sslice is a list of selectors, which will typically be slice(), or an array of indexes. Returns a similar list of selectors, but with any indexing selectors promoted to a higher dimensionality so that the output object will be broadcast to the desired shape. For example if the input is (array([0,1]), slice(0,100,2), array([12,13,14])) then the output will be (array([[0],[1]]), slice(0,100,2), array([12,13,14])) and the result can then be used to index an array and produce a view with shape (2,50,3). """ ex_dim = 0 output = [s for s in sslice] for i in range(len(sslice) - 1, -1, -1): if isinstance(sslice[i], np.ndarray): output[i] = sslice[i].reshape(sslice[i].shape + (1,)*ex_dim) ex_dim += 1 return tuple(output)
[docs] def restrict(self, axis_name, selector, in_place=True): """Restrict the AxisManager by selecting a subset of items in some Axis. The Axis definition and all data fields mapped to that axis will be modified. Arguments: axis_name (str): The name of the Axis. selector (slice or special): Selector, in a form understood by the underlying Axis class (see the .restriction method for the Axis). in_place (bool): If True, modifications are made to this object. Otherwise, a new object with the restriction applied is returned. Returns: The AxisManager with restrictions applied. """ if in_place: dest = self else: dest = self.copy(axes_only=True) dest._assignments.update(self._assignments) new_ax, sl = dest._axes[axis_name].restriction(selector) for k, v in self._fields.items(): if isinstance(v, AxisManager): dest._fields[k] = v.copy() if axis_name in v._axes: dest._fields[k].restrict( axis_name, selector, ## copies of axes made above in_place=True ) elif np.isscalar(v) or v is None: dest._fields[k] = v else: sslice = [sl if n == axis_name else slice(None) for n in dest._assignments[k]] sslice = dest._broadcast_selector(sslice) if in_place: dest._fields[k] = v[sslice] else: dest._fields[k] = v[sslice].copy() dest._axes[axis_name] = new_ax return dest
[docs] @staticmethod def intersection_info(*items): """Given a list of AxisManagers, scan the axes and combine (intersect) any common axes. Returns a dict that maps axis name to restricted Axis object. """ # Get the strictest intersection of each axis. axes_out = odict() for aman in items: for ax in aman._axes.values(): if ax.count is None: continue if ax.name not in axes_out: axes_out[ax.name] = ax.copy() else: axes_out[ax.name] = axes_out[ax.name].intersection( ax, False) return axes_out
[docs] def merge(self, *amans): """Merge the data from other AxisMangers into this one. Axes with the same name will be intersected. """ # Before messing with anything, check for key interference. fields = set(self._fields.keys()) for aman in amans: newf = set(aman._fields.keys()) both = fields.intersection(newf) if len(both): raise ValueError(f'Key conflict: more than one merge target ' f'shares keys: {both}') fields.update(newf) # Get the intersected axis descriptions. axes_out = self.intersection_info(self, *amans) # Reduce the data in self, update our axes. self.restrict_axes(axes_out) # Import the other ones. for aman in amans: aman = aman.restrict_axes(axes_out, in_place=False) for k, v in aman._axes.items(): if k not in self._axes: self._axes[k] = v for k, v in aman._fields.items(): assert(k not in self._fields) # Should have been caught in pre-check self._fields[k] = v self._assignments.update(aman._assignments) return self
[docs] def save(self, dest, group=None, overwrite=False, compression=None): """Write this AxisManager data to an HDF5 group. This is an experimental feature primarily intended to assist with debugging. The schema is subject to change, and it's possible that not all objects supported by AxisManager can be serialized. Args: dest (str or h5py.Group): Place to save it (in combination with group). group (str or None): Group within the HDF5 file (relative to dest). overwrite (bool): If True, remove any existing thing at the specified address before writing there. compression (str or None): Compression filter to apply. E.g. 'gzip'. This string is passed directly to HDF5 dataset routines. Notes: If dest is a string, it is taken to be an HDF5 filename and is opened in 'a' mode. The group, in that case, is the full group name in the file where the data should be written. If dest is an h5py.Group, the group is the group name in the file relative to dest. The overwrite argument only matters if group is passed as a string. A RuntimeError is raised if the group address already exists and overwrite==False. For example, these are equivalent:: # Filename + group address: axisman.save('test.h5', 'x/y/z') # Open h5py.File + group address: with h5py.File('test.h5', 'a') as h: axisman.save(h, 'x/y/z') # Partial group address with h5py.File('test.h5', 'a') as h: g = h.create_group('x/y') axisman.save(g, 'z') When passing a filename, the code probably won't use a context manager... so if you want that protection, open your own h5py.File as in the 2nd and 3rd example. """ from .axisman_io import _save_axisman return _save_axisman(self, dest, group=group, overwrite=overwrite, compression=compression)
[docs] @classmethod def load(cls, src, group=None, fields=None): """Load a saved AxisManager from an HDF5 file and return it. See docs for save() function. The (src, group) args are combined in the same way as (dest, group) in the save function. Examples:: axisman = AxisManager.load('test.h5', 'x/y/z') with h5py.File('test.h5', 'r') as h: axisman = AxisManager.load(h, 'x/y/z') If the fields argument is specified, it must be a list of strings indicating what subfields of the stored AxisManager should be extracted. For nested entries, connect fields with ".". For example ``fields=['subaman.field1', 'subaman.field2']``. When fields is specified, _all_ axes from the AxisManager are included in the result, even if not directly referenced by the requested fields; this behavior is subject to change. """ from .axisman_io import _load_axisman return _load_axisman(src, group, cls, fields=fields)
def simplify_slice(sslice, shape): """Given a tuple of slices, such as what __getitem__ might produce, and the shape of the array it would be applied to, return a new tuple of slices that accomplices the same thing, but while avoiding costly general slices if possible.""" res = [] for n, s in zip(shape, sslice): # Numpy arrays slicing is expensive, and unnecessary if they just select # the same elemnts in the same order if isinstance(s, np.ndarray): # Is this a trivial numpy slice? If so, replace it if s.size == n and np.all(s == np.arange(n)): res.append(slice(None)) # Otherwise bail, and keep the whole original else: return sslice # For anything else just pass it through. This includes normal slices else: res.append(s) return tuple(res)