Source code for syncopy.datatype.base_data

# -*- coding: utf-8 -*-
#
# Syncopy's main abstract base class + helpers
#

# Builtin/3rd party package imports
import getpass
import socket
import time
import sys
import os
from abc import ABC, abstractmethod
from hashlib import blake2b
from itertools import chain
from types import GeneratorType
import shutil
import numpy as np
import h5py
import scipy as sp

# Local imports
import syncopy as spy
from .selector import Selector
from .util import TrialIndexer
from .methods.arithmetic import _process_operator
from .methods.selectdata import selectdata
from .methods.show import show
from syncopy.shared.tools import SerializableDict
from syncopy.shared.parsers import (
    array_parser,
    io_parser,
    filename_parser,
    data_parser,
)
from syncopy.shared.errors import (
    SPYInfo,
    SPYTypeError,
    SPYValueError,
    SPYError,
    SPYWarning,
)
from syncopy.datatype.methods.definetrial import definetrial as _definetrial
from syncopy import __version__, __storage__, __acme__, __sessionid__

if __acme__:
    import acme
    import dask


__all__ = []


[docs]class BaseData(ABC): """ Abstract base class for all data classes Data classes in Syncopy manage storing array data and metadata in HDF5 and JSON files, respectively. This base class contains the fundamental functionality shared across all data classes, that is, * properties for arrays that have a corresponding HDF5 datasets ('dataset properties') and the associated I/O * properties for data history (`BaseData.log` and `BaseData.cfg`) * methods and properties for defining trials on the data Further properties and methods are defined in subclasses, e.g. `syncopy.AnalogData`. """ #: properties that are written into the JSON file and HDF5 attributes upon save _infoFileProperties = ("dimord", "_version", "_log", "cfg", "info") _hdfFileAttributeProperties = ( "dimord", "_version", "_log", ) # all data types have a `trials` property _selectionKeyWords = ("trials",) #: properties that are mapped onto HDF5 datasets _hdfFileDatasetProperties = () # Checksum algorithm _checksum_algorithm = spy.__checksum_algorithm__.__name__ # Dummy allocations of class attributes that are actually initialized in subclasses _mode = None _stackingDimLabel = None # Set caller for `SPYWarning` to not have it show up as '<module>' _spwCaller = "BaseData.{}" # Attach data selection and output routines to make them available as class methods selectdata = selectdata show = show # Initialize hidden attributes used by all children _filename = None _trialdefinition = None _dimord = None _mode = None _lhd = ( "\n\t\t>>> SyNCopy v. 
{ver:s} <<< \n\n" + "Created: {timestamp:s} \n\n" + "System Profile: \n" + "{sysver:s} \n" + "ACME: {acver:s}\n" + "Dask: {daver:s}\n" + "NumPy: {npver:s}\n" + "SciPy: {spver:s}\n\n" + "--- LOG ---" ) _log_header = _lhd.format( ver=__version__, timestamp=time.asctime(), sysver=sys.version, acver=acme.__version__ if __acme__ else "--", daver=dask.__version__ if __acme__ else "--", npver=np.__version__, spver=sp.__version__, ) _log = "" @property @classmethod @abstractmethod def _defaultDimord(cls): return NotImplementedError @property def _stackingDim(self): if any(["DiscreteData" in str(base) for base in self.__class__.__mro__]): return 0 else: if self._stackingDimLabel is not None and self.dimord is not None: return self.dimord.index(self._stackingDimLabel) @property def cfg(self): """Dictionary of previous operations on data""" return self._cfg @cfg.setter def cfg(self, dct): """For loading only, for processing the frontends extend the existing (empty) cfg dictionary""" if not isinstance(dct, dict): raise SPYTypeError(dct, varname="cfg", expected="dictionary-like object") self._cfg = dct @property def info(self): """Dictionary of auxiliary meta information""" return self._info @info.setter def info(self, dct): """ Users usually want to extend the existing info dictionary, however it is possible to completely overwrite with a new dict """ if not isinstance(dct, dict): raise SPYTypeError(dct, varname="info", expected="dictionary-like object") self._info = SerializableDict(dct) @property def container(self): try: return filename_parser(self.filename)["container"] except SPYValueError: return None except Exception as exc: raise exc
[docs] def _register_dataset(self, propertyName, inData=None): """ Register a new dataset, so that it is handled during saving, comparison, copy and other operations. This dataset is not managed in any way during parallel operations and is intended for holding additional data things like statistics. Thus it is NOT safe to use this in a multi-threaded/parallel context, like in a compute function (cF). Parameters ---------- propertyName : str The name for the new dataset, this will be used as the dataset name in the hdf5 container when saving. It will be added as an attribute named `'_' + propertyName` to this SyncopyData object. Note that this means that your propertyName must not clash with other attribute names of syncopy data objects. To ensure the latter, it is recommended to use names with a prefix like `'dset_'`. Clashes will be detected and result in errors. in_data : None or np.ndarray or h5py.Dataset The data to store. Must have the final number of dimensions you want. """ if not propertyName in self._hdfFileDatasetProperties: self._hdfFileDatasetProperties = self._hdfFileDatasetProperties + (propertyName,) # trivial case if inData is None: setattr(self, "_" + propertyName, None) return supportedSetters = { np.ndarray: self._set_dataset_property_with_ndarray, h5py.Dataset: self._set_dataset_property_with_dataset, } try: # same attribute for both ndarray and hdf5 dataset ndim = inData.ndim except AttributeError: msg = "HDF5 dataset, or NumPy array" raise SPYTypeError(inData, varname="data", expected=msg) supportedSetters[type(inData)](inData, propertyName, ndim=ndim)
[docs] def _unregister_dataset(self, propertyName, del_from_file=True, del_attr=True): """ Unregister and delete an additional dataset from the Syncopy data object, and optionally delete it from the backing hdf5 file. Assumes that the backing h5py file is open in writeable mode. Parameters ---------- propertyName : str The name of the entry in `self._hdfFileDatasetProperties` to remove. The attribute named `'_' + propertyName` of this SyncopyData object will be deleted. del_from_file: bool Whether to remove the dataset named 'propertyName' from the backing hdf5 file on disk. del_attr: bool Whether to remove the dataset attribute from the Syncopy data object. """ if del_attr: if propertyName in self._hdfFileDatasetProperties: tmp_list = list(self._hdfFileDatasetProperties) tmp_list.remove(propertyName) self._hdfFileDatasetProperties = tuple(tmp_list) if hasattr(self, "_" + propertyName): delattr(self, "_" + propertyName) if del_from_file: if self.mode == "r": lgl = "HDF5 dataset with write or copy-on-write access" act = "read-only file" raise SPYValueError(legal=lgl, varname="mode", actual=act) if isinstance(self._data, h5py.Dataset): if isinstance(self._data.file, h5py.File): if propertyName in self._data.file.keys(): del self._data.file[propertyName] else: SPYWarning("Could not delete dataset from file.")
[docs] def _update_dataset(self, propertyName, inData=None): """ Resets an additional dataset which was already registered via ``_register_dataset`` to ``inData``. """ if getattr(self, "_" + propertyName) is not None: self._unregister_dataset(propertyName) self._register_dataset(propertyName, inData)
def _set_dataset_property(self, inData, propertyName, ndim=None):
    """Set property that is streamed from HDF dataset ('dataset property')

    This method automatically selects the appropriate set method
    according to the type of the input data (`inData`).

    Parameters
    ----------
    inData : str, np.ndarray, h5py.Dataset, list, generator or None
        Filename, array, list of arrays, list of syncopy objects,
        HDF5 dataset or generator object to be stored in property
    propertyName : str
        Name of the property. The actual data must reside in the attribute
        `"_" + propertyName`
    ndim : int
        Number of expected array dimensions.

    Raises
    ------
    SPYTypeError
        If the type of `inData` is not supported.
    """
    # Only the main dataset gets its dimensionality derived from the class
    if propertyName == "data":
        if any(["DiscreteData" in str(base) for base in self.__class__.__mro__]):
            ndim = 2
        if ndim is None:
            ndim = len(self._defaultDimord)

    supportedSetters = {
        GeneratorType: self._set_dataset_property_with_generator,
        list: self._set_dataset_property_with_list,
        str: self._set_dataset_property_with_str,
        np.ndarray: self._set_dataset_property_with_ndarray,
        h5py.Dataset: self._set_dataset_property_with_dataset,
        type(None): self._set_dataset_property_with_none,
    }

    # Look the setter up separately from calling it, so that a `KeyError`
    # raised *inside* a setter is not misreported as an unsupported input type
    setter = supportedSetters.get(type(inData))
    if setter is None:
        msg = "filename of HDF5 file, HDF5 dataset, list, generator or NumPy array"
        raise SPYTypeError(inData, varname="data", expected=msg)
    setter(inData, propertyName, ndim=ndim)
[docs] def _set_dataset_property_with_none(self, inData, propertyName, ndim): """Set a dataset property to None""" setattr(self, "_" + propertyName, None)
def _set_dataset_property_with_str(self, filename, propertyName, ndim):
    """Set a dataset property with a filename str

    Parameters
    ----------
    filename : str
        A filename pointing to a HDF5 file containing the dataset
        `propertyName`.
    propertyName : str
        Name of the property to be filled with the dataset
    ndim : int
        Number of expected array dimensions.

    Raises
    ------
    SPYValueError
        If the file cannot be opened as HDF5 file, or if it neither holds a
        dataset called `propertyName` nor exactly one dataset.
    """
    fpath, fname = io_parser(filename, varname="filename", isfile=True, exists=True)
    filename = os.path.join(fpath, fname)  # ensure `filename` is absolute path

    # A writable object opens the existing file in read/write mode
    md = self.mode
    if md == "w":
        md = "r+"

    try:
        h5f = h5py.File(filename, mode=md)
    except OSError as exc:
        err = "HDF5: " + str(exc)
        raise SPYValueError("accessible HDF5 file", actual=err, varname="data") from exc

    h5keys = list(h5f.keys())
    if propertyName not in h5keys and len(h5keys) != 1:
        # Do not leave the freshly opened handle dangling on failure
        h5f.close()
        lgl = "HDF5 file with only one 'data' dataset or single dataset of arbitrary name"
        act = "HDF5 file holding {} data-objects"
        raise SPYValueError(legal=lgl, actual=act.format(str(len(h5keys))), varname=propertyName)

    # A file with a single dataset is accepted regardless of the dataset's name
    dsetName = h5keys[0] if len(h5keys) == 1 else propertyName

    # Assign via the public property (no underscore prefix)
    setattr(self, propertyName, h5f[dsetName])
    self.filename = filename
def _set_dataset_property_with_ndarray(self, inData, propertyName, ndim):
    """Set a dataset property with a NumPy array

    If no data exists, a backing HDF5 dataset will be created.

    Parameters
    ----------
    inData : numpy.ndarray
        NumPy array to be stored in property of name `propertyName`
    propertyName : str
        Name of the property to be filled with `inData`. Will get an
        underscore (`'_'`) prefix added, so do not include that.
    ndim : int
        Number of expected array dimensions.

    Raises
    ------
    SPYValueError
        If an existing HDF5 dataset is to be overwritten but the object is
        in read-only mode, or shape/dtype of `inData` do not match the
        existing dataset; also if `propertyName` is not registered in
        `_hdfFileDatasetProperties` and `'_' + propertyName` already holds
        something that is neither `None` nor an HDF5 dataset.
    """

    # Ensure array has right no. of dimensions
    array_parser(inData, varname=f"{propertyName}", dims=ndim)

    # Gymnastics for `DiscreteData` objects w/non-standard `dimord`s.
    # This only applies to the 'main' dataset called 'data'. The checks are not needed
    # for additional, sequential datasets which people may attach.
    if propertyName == "data":
        self._check_dataset_property_discretedata(inData)
    else:
        if not hasattr(self, "_" + propertyName):
            setattr(self, "_" + propertyName, None)  # Prevent error on getattr call below.

    # If there is existing data, replace values if shape and type match
    if isinstance(getattr(self, "_" + propertyName), h5py.Dataset):
        prop = getattr(self, "_" + propertyName)
        if self.mode == "r":
            lgl = "dataset with write or copy-on-write access"
            act = "read-only file"
            raise SPYValueError(legal=lgl, varname=propertyName, actual=act)
        if prop.shape != inData.shape:
            lgl = "dataset with shape {}".format(str(prop.shape))
            act = "data with shape {}".format(str(inData.shape))
            raise SPYValueError(legal=lgl, varname=propertyName, actual=act)
        if prop.dtype != inData.dtype:
            lgl = "dataset of type {}".format(prop.dtype.name)
            act = "data of type {}".format(inData.dtype.name)
            raise SPYValueError(legal=lgl, varname=propertyName, actual=act)
        # in-place overwrite of the existing dataset's values
        prop[...] = inData

    # or create backing file on disk
    else:
        if self.filename is None:
            self.filename = self._gen_filename()

        # Unregistered names must not shadow an existing, non-dataset attribute
        if propertyName not in self._hdfFileDatasetProperties:
            if getattr(self, "_" + propertyName) is not None and not isinstance(
                getattr(self, "_" + propertyName), h5py.Dataset
            ):
                raise SPYValueError(
                    legal="propertyName that does not clash with existing attributes",
                    varname="propertyName",
                    actual=propertyName,
                )

        # Write into the already open file if there is one, otherwise
        # create the file (mode "w") just for writing the dataset
        h5f = self._get_backing_hdf5_file_handle()
        if h5f is None:
            with h5py.File(self.filename, "w") as h5f:
                h5f.create_dataset(propertyName, data=inData)
        else:
            h5f.create_dataset(propertyName, data=inData)

        # Open the file in the object's mode ("w" is mapped to "r+") and
        # keep the resulting dataset handle in the hidden attribute
        md = self.mode
        if md == "w":
            md = "r+"
        setattr(self, "_" + propertyName, h5py.File(self.filename, md)[propertyName])
[docs] def _set_dataset_property_with_dataset(self, inData, propertyName, ndim): """Set a dataset property with an already loaded HDF5 dataset Parameters ---------- inData : h5py.Dataset HDF5 dataset to be stored in property of name `propertyName` propertyName : str Name of the property to be filled with the dataset ndim : int Number of expected array dimensions. """ if inData.id.valid == 0: lgl = "open HDF5 file" act = "backing HDF5 file is closed" raise SPYValueError(legal=lgl, actual=act, varname="data") if propertyName == "data": # Ensure dataset has right no. of dimensions if inData.ndim != ndim: lgl = "{}-dimensional data".format(ndim) act = "{}-dimensional HDF5 dataset".format(inData.ndim) raise SPYValueError(legal=lgl, varname="data", actual=act) self._check_dataset_property_discretedata(inData) self.filename = inData.file.filename else: # creates hidden attribute behind the property on the fly if not hasattr(self, "_" + propertyName): setattr(self, "_" + propertyName, None) self._mode = inData.file.mode setattr(self, "_" + propertyName, inData)
[docs] def _set_dataset_property_with_list(self, inData, propertyName, ndim): """Set a dataset property with a list of NumPy arrays or syncopy data objects. Parameters ---------- inData : list list of :class:`numpy.ndarray`s or syncopy data objects. propertyName : str Name of the property to be filled with the concatenated array Can only be ``data`` for syncopy objects to be concatenated. ndim : int Number of expected array dimensions. """ # first catch empty lists if len(inData) == 0: msg = ( "Trying to set syncopy data with empty list, " f"setting `{propertyName}` dataset to `None`!" ) SPYWarning(msg) self._set_dataset_property_with_none(None, propertyName, ndim) return # check if we have consistent list entries check = np.sum([isinstance(val, np.ndarray) for val in inData]) # check has to be either 0 (no arrays) or len(inData) (all arrays) if check != 0 and check != len(inData): lgl = "consistent data types" act = "mix of NumPy arrays and other data types" raise SPYValueError(lgl, "data", act) # as we catched empty lists above, and checked against inconsistent # types we can do a hard instance check on the 1st entry only if isinstance(inData[0], np.ndarray): self._set_dataset_property_with_array_list(inData, propertyName, ndim) # alternatively must be all syncopy data objects else: for val in inData: data_parser(val) # this should not happen, as all derived classes hardcoded this in their setters if propertyName != "data": raise SPYError(f"Cannot concatenate syncopy objects for dataset {propertyName}") # if we landed here all is clear self._set_dataset_property_with_spy_list(inData, ndim)
def _set_dataset_property_with_array_list(self, inData, propertyName, ndim):
    """Set a dataset property with a list of NumPy arrays.

    The arrays are concatenated along `self._stackingDim`, stored via
    `_set_dataset_property_with_ndarray`, and a trialdefinition with one
    trial per array is attached to the object.

    Parameters
    ----------
    inData : list
        list of :class:`numpy.ndarray`s
        Each array corresponds to a trial. Arrays are stacked
        together to fill dataset.
    propertyName : str
        Name of the property to be filled with the concatenated array
    ndim : int
        Number of expected array dimensions.

    Raises
    ------
    SPYValueError
        If real and complex arrays are mixed, or if the array shapes are
        not compatible with each other.
    """

    # Check list entries: must be numeric, finite NumPy arrays
    for val in inData:
        array_parser(val, varname="data", hasinf=False, dims=ndim)

    # Ensure we don't have a mix of real/complex arrays
    if np.unique([np.iscomplexobj(val) for val in inData]).size > 1:
        lgl = "list of numeric NumPy arrays of same numeric type (real/complex)"
        act = "real and complex NumPy arrays"
        raise SPYValueError(legal=lgl, varname="data", actual=act)

    # Requirements for input arrays differ wrt data-class (`DiscreteData` always 2D)
    if any(["ContinuousData" in str(base) for base in self.__class__.__mro__]):

        # Ensure shapes match up
        if any(val.shape != inData[0].shape for val in inData):
            lgl = "NumPy arrays of identical shape"
            act = "NumPy arrays with mismatching shapes"
            raise SPYValueError(legal=lgl, varname="data", actual=act)
        # trial length = extent of each array along the "time" axis
        trialLens = [val.shape[self.dimord.index("time")] for val in inData]

    else:

        # Ensure all arrays have shape `(N, nCol)`
        if self.__class__.__name__ == "SpikeData":
            nCol = 3
        else:  # EventData
            nCol = inData[0].shape[1]
        if any(val.shape[1] != nCol for val in inData):
            lgl = "NumPy 2d-arrays with {} columns".format(nCol)
            act = "NumPy arrays of different shape"
            raise SPYValueError(legal=lgl, varname="data", actual=act)
        # trial length = largest (non-NaN) entry of each array's "sample" column
        trialLens = [np.nanmax(val[:, self.dimord.index("sample")]) for val in inData]

    nTrials = len(trialLens)

    # Use constructed quantities to set up trial layout matrix:
    # column 0 and 1 hold the cumulative start/stop of each trial, column 2 the offset
    accumSamples = np.cumsum(trialLens)
    trialdefinition = np.zeros((nTrials, 3))
    trialdefinition[1:, 0] = accumSamples[:-1]
    trialdefinition[:, 1] = accumSamples

    if self.samplerate is not None:
        # set standard offset to -1s
        trialdefinition[:, 2] = -self.samplerate
    else:
        trialdefinition[:, 2] = 0

    # Finally, concatenate provided arrays and let corresponding setting method
    # perform the actual HDF magic
    data = np.concatenate(inData, axis=self._stackingDim)
    self._set_dataset_property_with_ndarray(data, propertyName, ndim)
    self.trialdefinition = trialdefinition
def _set_dataset_property_with_spy_list(self, inData, ndim):
    """Set the `data` dataset property from a list of compatible
    syncopy data objects. This implements concatenation along trials
    of syncopy data objects.

    Empty objects (``data is None``) are skipped with a warning and do not
    contribute any trials. In-place selections on the inputs are ignored.

    Parameters
    ----------
    inData : list
        Non empty list of syncopy data objects, e.g. :class:`~syncopy.AnalogData`.
        Trials are stacked together to fill dataset.
    ndim : int
        Number of expected array dimensions.

    Raises
    ------
    SPYValueError
        If all objects are empty, if the reference dataset does not have
        `ndim` dimensions, or if the objects differ in stacking dimension,
        non-stacking shape or attribute values.
    """

    # -- dataset shape and object attribute inquiries --

    # take the 1st non-empty object as reference
    i_ref = None
    for idx, spy_obj in enumerate(inData):
        if spy_obj.data is None:
            SPYWarning(f"Skipping empty dataset {spy_obj.filename} for concatenation")
            continue
        i_ref = idx
        break

    # without any data there is no reference to compare against
    if i_ref is None:
        lgl = "at least one non-empty syncopy object"
        act = "only empty syncopy objects"
        raise SPYValueError(lgl, "data", act)

    spy_obj_ref = inData[i_ref]
    shape_ref = np.array(spy_obj_ref.data.shape)
    if len(shape_ref) != ndim:
        lgl = f"dataset with dimension of {ndim}"
        act = f"got dataset with dimension {len(shape_ref)}"
        raise SPYValueError(lgl, "data", act)
    stacking_dim_ref = spy_obj_ref._stackingDim

    # collect remaining attribute names like channel, freq, etc.
    attr_ref = [
        attr for attr in spy_obj_ref._hdfFileAttributeProperties if not attr.startswith("_")
    ]

    # boolean array to index non-stacking dimensions
    # for strict shape comparison
    bvec = np.ones(shape_ref.size, dtype=bool)
    bvec[stacking_dim_ref] = False

    # now loop again and check against all others
    lgl = "compatible syncopy objects for concatenation"
    stack_count = 0
    stack_objs = []  # the objects which actually contribute trials
    for spy_obj in inData[i_ref:]:
        if spy_obj.selection is not None:
            SPYWarning("In place selections will be ignored for concatenation!")
        if spy_obj.data is None:
            SPYWarning(f"Skipping empty dataset {spy_obj.filename} for concatenation")
            continue
        if spy_obj._stackingDim != stacking_dim_ref:
            act = f"different stacking dimensions, {stacking_dim_ref} and {spy_obj._stackingDim}"
            raise SPYValueError(lgl, "data", act)
        # catch mismatching dimensions (2d vs. 3d)
        if len(shape_ref) != len(spy_obj.data.shape):
            act = f"mismatching shapes, {tuple(shape_ref)} and {spy_obj.data.shape}"
            raise SPYValueError(lgl, "data", act)
        # shape tuple gets casted by numpy for array subtraction
        if not np.all((shape_ref - spy_obj.data.shape)[bvec] == 0):
            act = f"mismatching shapes, {tuple(shape_ref)} and {spy_obj.data.shape}"
            raise SPYValueError(lgl, "data", act)

        # check attributes like channel, freq, etc.
        # this also catches incompatible syncopy data types with same ndim,
        # e.g. SpectralData and CrossSpectralData
        for attr in spy_obj._hdfFileAttributeProperties:
            if attr.startswith("_"):
                continue
            attr_val = getattr(spy_obj, attr, None)
            if attr_val is None or attr not in attr_ref:
                act = f"missing attribute `{attr}` in {spy_obj.filename}"
                raise SPYValueError(lgl, "data", act)
            # now hard check values, should be all arrays/sequences
            # we want identical channel label, freq axis and so on..
            if not np.all(getattr(spy_obj_ref, attr) == attr_val):
                act = f"different attribute values for `{attr}`"
                raise SPYValueError(lgl, "data", act)

        # finally increment stack count
        stack_count += spy_obj.data.shape[stacking_dim_ref]
        stack_objs.append(spy_obj)

    # now we have all we need to compute
    # the shape of the concatenated object
    res_shape = shape_ref.copy()
    res_shape[stacking_dim_ref] = stack_count

    # finally create the chained trial generator; only objects that were
    # counted into `stack_count` may contribute, skipped (empty) ones must not
    trl_gen = chain(*[spy_obj.trials for spy_obj in stack_objs])

    # this setter is only valid for empty (new) syncopy objects
    # hence it should be fine to potentially re-define the dimord here
    self._stackingDimLabel = spy_obj_ref._stackingDimLabel

    # and route through the generator setter
    self._set_dataset_property_with_generator(
        trl_gen, propertyName="data", ndim=len(res_shape), shape=res_shape
    )

    # -- set attribute properties --

    # attach dummy selection to reference object
    # for easy propagation of properties
    spy.selectdata(spy_obj_ref, inplace=True)

    # Get/set dimensional attributes
    for prop in spy_obj_ref.selection._dimProps:
        selection = getattr(spy_obj_ref.selection, prop)
        if selection is not None:
            if np.issubdtype(type(selection), np.number):
                selection = [selection]
            setattr(self, prop, getattr(spy_obj_ref, prop)[selection])

    self.samplerate = spy_obj_ref.samplerate
    # remove the dummy selection again
    spy_obj_ref.selection = None
def _set_dataset_property_with_generator(self, gen, propertyName, ndim, shape=None):
    """
    Create a dataset from a generator yielding (single trial) numpy arrays.
    If `shape` is not given fall back to HDF5 resizable datasets along
    the stacking dimension.

    Expects empty property - will not try to overwrite datasets with generators!

    Parameters
    ----------
    gen : generator
        Generator yielding (single trial) numpy arrays. Their shapes
        have to match except along the `stacking_dim`
    propertyName : str
        The name of the property which manages the dataset
    ndim : int
        Number of dimensions of the numpy arrays
    shape : tuple
        The final shape of the hdf5 dataset. If left at `None`,
        the dataset will be resized along the stacking dimension
        for every trial drawn from the generator

    Raises
    ------
    SPYValueError
        If `propertyName` is not a registered dataset property, the property
        already holds an HDF5 dataset, the generator is exhausted, the first
        array does not have `ndim` dimensions, or a later array does not
        match the first one in the non-stacking dimensions.
    """

    if propertyName not in self._hdfFileDatasetProperties:
        raise SPYValueError(
            legal=f"one of {self._hdfFileDatasetProperties}",
            varname="propertyName",
            actual=propertyName,
        )

    # If there is existing data, get out
    if isinstance(getattr(self, "_" + propertyName), h5py.Dataset):
        lgl = "empty syncopy object"
        act = "non-empty syncopy object"
        raise SPYValueError(lgl, "data", act)

    # look at 1st trial to determine fixed dimensions
    try:
        trial1 = next(gen)
    except StopIteration:
        lgl = "non-exhausted generator"
        act = "exhausted generator"
        raise SPYValueError(lgl, "data", act)

    shape1 = list(trial1.shape)  # initial shape
    # further generated arrays will be checked against shape1

    if len(shape1) != ndim:
        lgl = f"arrays of dimension {ndim}"
        act = f"got array with dimension {len(shape1)}"
        raise SPYValueError(lgl, "data", act)

    # boolean array to index non-stacking dimensions
    # for strict shape comparison
    bvec = np.ones(len(shape1), dtype=bool)
    bvec[self._stackingDim] = False

    # prepare to resize hdf5
    if shape is None:
        # start with the shape of the 1st trial, unlimited along the stacking dimension
        shape = shape1
        maxshape = shape.copy()
        maxshape[self._stackingDim] = None
        resize = True
    else:
        maxshape = None
        resize = False

    # construct slicing index
    stack_idx = [np.s_[:] for _ in range(len(shape))]

    # -- write data --

    stack_count = 0
    trlSamples = []  # for constructing the trialdefinition

    # NOTE(review): `self.filename` is used as-is here, it is presumably
    # set before this method is called - confirm against the callers
    with h5py.File(self.filename, "w") as h5f:
        dset = h5f.create_dataset(propertyName, shape=shape, maxshape=maxshape, dtype=trial1.dtype)

        # we have to plug in the 1st trial already generated
        stack_step = trial1.shape[self._stackingDim]
        stack_idx[self._stackingDim] = np.s_[0:stack_step]
        dset[tuple(stack_idx)] = trial1
        stack_count += stack_step
        trlSamples.append(stack_step)

        # now stream through the arrays from the generator
        for trial in gen:
            # check shape except stacking dim
            if not np.all((shape1 - np.array(trial.shape))[bvec] == 0):
                lgl = "compatible trial shapes"
                act = f"mismatching shapes, {tuple(shape1)} and {trial.shape}"
                raise SPYValueError(lgl, "data", act)

            stack_step = trial.shape[self._stackingDim]
            # we have to resize for every trial if no total shape was given
            if resize:
                dset.resize(stack_count + stack_step, axis=self._stackingDim)
            # write the trial behind everything written so far
            stack_idx[self._stackingDim] = np.s_[stack_count : stack_count + stack_step]
            dset[tuple(stack_idx)] = trial
            stack_count += stack_step
            trlSamples.append(stack_step)

        # NOTE(review): the `with` block closes the file again, the handle
        # stored here is presumably refreshed by `_reopen` below - confirm
        setattr(self, "_" + propertyName, dset)

    self._reopen()

    # -- construct trialdefinition --

    if propertyName == "data":
        # trial borders from the cumulative number of stacked samples
        si = np.r_[0, np.cumsum(trlSamples)]
        sampleinfo = np.column_stack([si[:-1], si[1:]])
        # third column (offset) defaults to 0
        trialdefinition = np.column_stack([sampleinfo, np.zeros(len(sampleinfo))])

        if self.samplerate is not None:
            # set standard offset to -1s
            trialdefinition[:, 2] = -self.samplerate

        self.trialdefinition = trialdefinition
[docs] def _check_dataset_property_discretedata(self, inData): """Check `DiscreteData` input data for shape consistency Parameters ---------- inData : array/h5py.Dataset array-like to be stored as a `DiscreteData` data source """ # Special case `DiscreteData`: `dimord` encodes no. of expected cols/rows; # ensure this is consistent w/`inData`! if any(["DiscreteData" in str(base) for base in self.__class__.__mro__]): if len(self._defaultDimord) not in inData.shape: lgl = "array with {} columns corresponding to dimord {}" lgl = lgl.format(len(self._defaultDimord), self._defaultDimord) act = "array with shape {}".format(str(inData.shape)) raise SPYValueError(legal=lgl, varname="data", actual=act)
[docs] def _is_empty(self): return all([getattr(self, "_" + attr, None) is None for attr in self._hdfFileDatasetProperties])
@property
def dimord(self):
    """list(str): ordered list of data dimension labels"""
    return self._dimord


@dimord.setter
def dimord(self, dims):
    """Validate `dims` against `_defaultDimord` and allocate dimensional attributes

    `None` resets the dimord. Any other value has to be a 1d sequence of
    strings that contains exactly the labels of `self._defaultDimord`
    (in arbitrary order), otherwise a `SPYValueError` is raised.
    """
    # ensure `dims` can be safely compared to potentially existing `self._dimord`
    if dims is not None:
        array_parser(dims, varname="dims", ntype="str", dims=1)

    # NOTE: this only informs the user, the assignment below is still performed
    if self._dimord is not None and not dims == self._dimord:
        print(
            "Syncopy core - dimord: Cannot change `dimord` of object. "
            + "Functionality currently not supported"
        )

    if dims is None:
        self._dimord = None
        return

    # this enforces the _defaultDimord
    if set(dims) != set(self._defaultDimord):
        base = "dimensional labels {}"
        lgl = base.format("'" + "' x '".join(str(dim) for dim in self._defaultDimord) + "'")
        act = base.format("'" + "' x '".join(str(dim) for dim in dims) + "'")
        raise SPYValueError(legal=lgl, varname="dimord", actual=act)

    # this enforces that custom dimords are set for every axis
    if len(dims) != len(self._defaultDimord):
        lgl = f"Custom dimord has length {len(self._defaultDimord)}"
        act = f"Custom dimord has length {len(dims)}"
        raise SPYValueError(legal=lgl, varname="dimord", actual=act)

    # Canonical way to perform initial allocation of dimensional properties
    # (`self._channel = None`, `self._freq = None` etc.)
    self._dimord = list(dims)
    for dim in [dlabel for dlabel in dims if dlabel != "time"]:
        setattr(self, "_" + dim, None)


@property
def filename(self):
    """str: name of the backing file; multiple names are joined by ", " """
    # implicit support for multiple backing filenames: convert list to str
    if isinstance(self._filename, list):
        return ", ".join(self._filename)
    return self._filename


@filename.setter
def filename(self, fname):
    """Store `fname` as absolute path with the user directory expanded"""
    if not isinstance(fname, str):
        raise SPYTypeError(fname, varname="fname", expected="str")
    self._filename = os.path.abspath(os.path.expanduser(str(fname)))


@property
def log(self):
    """str: log of previous operations on data

    Note that accessing the property *prints* the log (header included)
    and evaluates to `None`.
    """
    print(self._log_header + self._log)


@log.setter
def log(self, msg):
    """This appends the assigned msg to the existing log"""
    if not isinstance(msg, str):
        raise SPYTypeError(msg, varname="log", expected="str")
    prefix = "\n\n|=== {user:s}@{host:s}: {time:s} ===|\n\n\t{caller:s}"
    # name of the function performing the assignment; a single leading
    # underscore is stripped, dunder names are kept as they are
    clr = sys._getframe().f_back.f_code.co_name
    if clr.startswith("_") and not clr.startswith("__"):
        clr = clr[1:]
    self._log += (
        prefix.format(
            user=getpass.getuser(),
            host=socket.gethostname(),
            time=time.asctime(),
            caller=clr + ": " if clr != "<module>" else "",
        )
        + msg
    )


@property
def mode(self):
    """str: write mode for data, 'r' for read-only, 'w' for writable

    FIXME: append/replace with HDF5?
    """
    return self._mode


@property
def tag(self):
    """str or None: "tag" entry of the parsed filename, `None` if parsing fails"""
    try:
        return filename_parser(self.filename)["tag"]
    except SPYValueError:
        return None


@mode.setter
def mode(self, md):
    """Change the access mode and re-open the backing HDF5 file accordingly

    Parameters
    ----------
    md : str
        One of 'r', 'r+', 'w' or 'c'; 'w' is silently mapped to 'r+'

    Raises
    ------
    SPYTypeError
        If `md` is not a string
    SPYValueError
        If `md` is not one of the supported options
    """
    # If the mode is not changing, don't do anything
    if md == self._mode:
        return

    # Ensure input makes sense and we actually have permission to change
    # the data access mode
    if not isinstance(md, str):
        raise SPYTypeError(md, varname="mode", expected="str")
    options = ["r", "r+", "w", "c"]
    if md not in options:
        lgl = "'" + "or '".join(opt + "' " for opt in options)
        raise SPYValueError(lgl, varname="mode", actual=md)

    # prevent accidental data loss by not allowing mode = "w" in h5py
    if md == "w":
        md = "r+"

    # If data is already attached to the object, flush and close. All
    # datasets need to be closed before the file can be re-opened with a
    # different mode. The dataset *handles* live in the hidden attributes,
    # `_hdfFileDatasetProperties` itself only holds their names.
    for propertyName in self._hdfFileDatasetProperties:
        dset = getattr(self, "_" + propertyName, None)
        if isinstance(dset, h5py.Dataset) and dset.id.valid:
            dset.flush()

    # This assumes that all datasets attached as properties are stored in
    # the same hdf5 file, and thus closing the file for the first property
    # handles all others. Classes without dataset properties have nothing
    # to close or re-attach.
    prop = None
    if self._hdfFileDatasetProperties:
        prop = getattr(self, self._hdfFileDatasetProperties[0])

    if prop is not None:
        prop.file.close()

        # Re-attach datasets
        for propertyName in self._hdfFileDatasetProperties:
            try:
                prop_value = h5py.File(self.filename, mode=md)[propertyName]
            except Exception:
                SPYInfo(f"Could not retrieve dataset '{propertyName}' from HDF5 file.")
                prop_value = None
            # `data` is assigned via its public name, all others via the hidden attribute
            prop_name = propertyName if propertyName == "data" else "_" + propertyName
            setattr(self, prop_name, prop_value)

    self._mode = md


@property
def selection(self):
    """Data selection specified by :class:`Selector`"""
    return self._selector


@selection.setter
def selection(self, select):
    """Build a :class:`Selector` from `select`; `None` removes the selection"""
    if select is None:
        self._selector = None
    else:
        self._selector = Selector(self, select)


@property
def trialdefinition(self):
    """nTrials x >=3 :class:`numpy.ndarray` of [start, end, offset, trialinfo[:]]"""
    if self._trialdefinition is not None:
        # to avoid hanging references
        return self._trialdefinition.copy()
    return None


@property
def sampleinfo(self):
    """nTrials x 2 :class:`numpy.ndarray` of [start, end] sample indices"""
    if self._trialdefinition is not None:
        return self._trialdefinition[:, :2]
    else:
        return None


@sampleinfo.setter
def sampleinfo(self, sinfo):
    raise SPYError("Cannot set sampleinfo. Use `BaseData.trialdefinition` instead.")


@property
def trial_ids(self):
    """Index list of trials"""
    if self._trialdefinition is not None:
        return self._trial_ids
    return None


@property
def trialintervals(self):
    """nTrials x 2 :class:`numpy.ndarray` of [start, end] times in seconds"""
    if self._trialdefinition is not None and self._samplerate is not None:
        # trial lengths in samples
        start_end = self.sampleinfo - self.sampleinfo[:, 0][:, None]
        start_end[:, 1] -= 1  # account for last time point
        # add offset and convert to seconds
        start_end = (start_end + self._t0[:, None]) / self._samplerate
        return start_end
    else:
        return None


@property
def _t0(self):
    """These are the (trigger) offsets"""
    if self._trialdefinition is not None:
        return self._trialdefinition[:, 2]
    else:
        return None


@property
def trials(self):
    """list-like iterable of trials"""
    if self.sampleinfo is not None:
        trial_ids = list(range(self.sampleinfo.shape[0]))
        # this is cheap as it just initializes an indexable generator
        # with no real data and/or computation!
        return TrialIndexer(self, trial_ids)
    else:
        return None


@property
def trialinfo(self):
    """nTrials x M :class:`numpy.ndarray` with numeric information about each trial

    Each trial can have M properties (condition, original trial no., ...) coded by
    numbers. This property are the fourth and onward columns of `BaseData._trialdefinition`.
    """
    if self._trialdefinition is not None:
        if self._trialdefinition.shape[1] > 3:
            return self._trialdefinition[:, 3:]
        else:
            # If trials are defined but no trialinfo return empty array with
            # nTrial rows, but 0 columns. This works well with np.hstack.
            return np.empty(shape=(len(self.trials), 0))
    else:
        return None


@trialinfo.setter
def trialinfo(self, trl):
    raise SPYError(
        "Cannot set trialinfo. Use `BaseData.trialdefinition` or `syncopy.definetrial` instead."
    )


# Helper function that grabs a single trial
[docs] @abstractmethod def _get_trial(self, trialno): pass
# Helper function that creates a `FauxTrial` object given actual trial information
[docs] @abstractmethod def _preview_trial(self, trialno): pass
# Convenience function, wiping contents of backing device from memory
def clear(self):
    """Flush all attached datasets

    Walks through the names in `_hdfFileDatasetProperties` and calls the
    `flush` method of every corresponding hidden attribute that is not `None`.
    """
    handles = (getattr(self, "_" + name) for name in self._hdfFileDatasetProperties)
    for handle in handles:
        if handle is None:
            continue
        handle.flush()
    return
[docs] def _close(self): """Close backing hdf5 file.""" self.clear() for propertyName in self._hdfFileDatasetProperties: dsetProp = getattr(self, "_" + propertyName) if isinstance(dsetProp, h5py.Dataset): if dsetProp.id.valid != 0: # Check whether backing HDF5 file is open. dsetProp.file.close()
[docs] def _get_backing_hdf5_file_handle(self): """Get handle to `h5py.File` instance of backing HDF5 file Checks all datasets in `self._hdfFileDatasetProperties` for valid handles, returns `None` if none found. Note that the mode of the returned instance depends on the current value of `self.mode`. """ for propertyName in self._hdfFileDatasetProperties: dsetProp = getattr(self, "_" + propertyName) if isinstance(dsetProp, h5py.Dataset): if dsetProp.id.valid != 0: return dsetProp.file return None
[docs] def _reopen(self): """Reattach datasets from backing hdf5 file. Respects current `self.mode`.""" for propertyName in self._hdfFileDatasetProperties: dsetProp = getattr(self, "_" + propertyName) if isinstance(dsetProp, h5py.Dataset): setattr( self, "_" + propertyName, h5py.File(self.filename, mode=self.mode)[propertyName], )
def copy(self):
    """
    Duplicate the entire object on disk by delegating to `spy.copy`.

    Returns
    -------
    cpy : Syncopy data object
        Reference to the copied data object on disk

    Notes
    -----
    For copying only a subset of the `data` use :func:`syncopy.selectdata` directly
    with the default `inplace=False` parameter.

    See also
    --------
    :func:`syncopy.save` : save to specific file path
    :func:`syncopy.selectdata` : creates copy of a selection with `inplace=False`
    """
    cpy = spy.copy(self)
    return cpy
# Attach trial-definition routine to not re-invent the wheel here definetrial = _definetrial # Wrapper that makes saving routine usable as class method
def save(self, container=None, tag=None, filename=None, overwrite=False):
    r"""Save data object as new ``spy`` container to disk (:func:`syncopy.save_data`)

    Thin wrapper that fills in missing arguments from the object's current
    on-disk location and then hands over to `spy.save`.

    Parameters
    ----------
    container : str
        Path to Syncopy container folder (\*.spy) to be used for saving. If
        omitted, a .spy extension will be added to the folder name.
    tag : str
        Tag to be appended to container basename
    filename : str
        Explicit path to data file. This is only necessary if the data should
        not be part of a container folder. An extension (\*.<dataclass>) will
        be added if omitted. The `tag` argument is ignored.
    overwrite : bool
        If `True` an existing HDF5 file and its accompanying JSON file is
        overwritten (without prompt).

    Raises
    ------
    SPYError
        If neither `container` nor `filename` is given and the object is not
        associated to a container (``self.container is None``)

    Notes
    -----
    * Without any of `container`, `tag` and `filename` the object is written
      back to `self.filename` and `overwrite` is forced to `True`.
    * With only `tag` provided, the folder of `self.filename` is used as
      container.

    Examples
    --------
    NOTE(review): the resulting paths are determined by `spy.save` - confirm
    against its documentation.

    >>> obj.save(filename="session1")
    >>> # --> os.getcwd()/session1.<dataclass>
    >>> # --> os.getcwd()/session1.<dataclass>.info

    >>> obj.save(filename="/tmp/session1")
    >>> # --> /tmp/session1.<dataclass>
    >>> # --> /tmp/session1.<dataclass>.info

    >>> obj.save(container="container.spy")
    >>> # --> os.getcwd()/container.spy/container.<dataclass>
    >>> # --> os.getcwd()/container.spy/container.<dataclass>.info

    >>> obj.save(container="/tmp/container.spy")
    >>> # --> /tmp/container.spy/container.<dataclass>
    >>> # --> /tmp/container.spy/container.<dataclass>.info
    """
    nothing_given = container is None and tag is None and filename is None

    # Ensure `obj.save()` simply overwrites on-disk representation of object
    if nothing_given:
        if self.container is None:
            msg = "Cannot create spy container in temporary storage {} - please provide explicit path. "
            raise SPYError(msg.format(__storage__))
        overwrite = True
        filename = self.filename

    # Support `obj.save(tag="newtag")`
    if container is None and filename is None:
        if self.container is None:
            msg = (
                "Object is not associated to an existing spy container - "
                "please save object first using an explicit path. "
            )
            raise SPYError(msg)
        container = filename_parser(self.filename)["folder"]

    spy.save(self, filename=filename, container=container, tag=tag, overwrite=overwrite)
# Helper function generating pseudo-random temp file-names
def _gen_filename(self):
    """Build a pseudo-random file name located in `__storage__`

    The name is composed of the session id, a randomly salted 4-byte
    blake2b hex digest and the class-specific file extension.
    """
    salt = os.urandom(blake2b.SALT_SIZE)
    token = blake2b(digest_size=4, salt=salt).hexdigest()
    basename = "spy_{sess:s}_{hash:s}{ext:s}".format(
        sess=__sessionid__,
        hash=token,
        ext=self._classname_to_extension(),
    )
    return os.path.join(__storage__, basename)
# Helper function converting object class-name to usable file extension
[docs] def _classname_to_extension(self): return "." + self.__class__.__name__.split("Data")[0].lower()
# Legacy support def __repr__(self): return self.__str__() # Make class contents readable from the command line @abstractmethod def __str__(self): pass # Destructor def __del__(self): # keep all datasets alive and open if self._persistent_hdf5: return # close hdf5 file for propertyName in self._hdfFileDatasetProperties: prop = getattr(self, "_" + propertyName) if prop is not None: try: prop.file.close() # can happen if the file was deleted elsewhere # or we exit un-gracefully from some undefined state except (ValueError, ImportError, TypeError, AttributeError): pass # remove from file system if __storage__ in self.filename and os.path.exists(self.filename): os.unlink(self.filename) shutil.rmtree(os.path.splitext(self.filename)[0], ignore_errors=True) # Support for basic arithmetic operations (no in-place computations supported yet) def __add__(self, other): return _process_operator(self, other, "+") def __radd__(self, other): return _process_operator(self, other, "+") def __sub__(self, other): return _process_operator(self, other, "-") def __rsub__(self, other): return _process_operator(self, other, "-") def __mul__(self, other): return _process_operator(self, other, "*") def __rmul__(self, other): return _process_operator(self, other, "*") def __truediv__(self, other): return _process_operator(self, other, "/") def __rtruediv__(self, other): return _process_operator(self, other, "/") def __pow__(self, other): return _process_operator(self, other, "**") def __eq__(self, other): # If other object is not a Syncopy data-class, get out if not "BaseData" in str(other.__class__.__mro__): SPYInfo("Not a Syncopy object") return False # Check if two Syncopy objects of same type/dimord are present try: data_parser(other, dimord=self.dimord, dataclass=self.__class__.__name__) except Exception as exc: SPYInfo("Syncopy object of different type/dimord") return False # First, ensure we have something to compare here if self._is_empty(): if not other._is_empty(): SPYInfo("Empty 
and non-empty Syncopy object") return False return True elif not self._is_empty(): if other._is_empty(): SPYInfo("Non-empty and empty Syncopy object") return False # If in-place selections are present, abort if self.selection is not None or other.selection is not None: err = "Cannot perform object comparison with existing in-place selection" raise SPYError(err) # Use `_infoFileProperties` to fetch dimensional object props: remove `dimord` # (has already been checked by `data_parser` above) and remove `cfg` (two # objects might be identical even if their history deviates) dimProps = [prop for prop in self._infoFileProperties if not prop.startswith("_")] dimProps = list(set(dimProps).difference(["dimord", "cfg"])) for prop in dimProps: val_this = getattr(self, prop) val_other = getattr(other, prop) if isinstance(val_this, np.ndarray) and isinstance(val_other, np.ndarray): isEqual = val_this.tolist() == val_other.tolist() # catch None elif val_this is None and val_other is not None: isEqual = False elif val_this is not None and val_other is None: isEqual = False else: isEqual = val_this == val_other if not isEqual: SPYInfo("Mismatch in {}".format(prop)) return False # Check if trial setup is identical if not np.array_equal(self.trialdefinition, other.trialdefinition): SPYInfo("Mismatch in trial layouts") return False # If an object is compared to itself (or its shallow copy), don't bother # juggling NumPy arrays but simply perform a quick dataset/filename comparison both_hdfFileDatasetProperties = self._hdfFileDatasetProperties + other._hdfFileDatasetProperties isEqual = True if self.filename == other.filename: for dsetName in both_hdfFileDatasetProperties: if hasattr(self, "_" + dsetName) and hasattr(other, "_" + dsetName): val_this = getattr(self, "_" + dsetName) val_other = getattr(other, "_" + dsetName) if isinstance(val_this, h5py.Dataset): isEqual = val_this == val_other if not isEqual: SPYInfo( f"HDF dataset '{dsetName}' mismatch for types '{type(val_this)}' 
and '{type(val_other)}'" ) return False else: SPYInfo(f"HDF dataset mismatch: extra dataset '{dsetName}' in one instance") return False else: for dsetName in both_hdfFileDatasetProperties: if dsetName != "data": if hasattr(self, "_" + dsetName) and hasattr(other, "_" + dsetName): val_this = getattr(self, "_" + dsetName) val_other = getattr(other, "_" + dsetName) if isinstance(val_this, h5py.Dataset): # isEqual = True # This case gets checked by trial below. isEqual = val_this == val_other elif val_this is None and val_other is None: isEqual = True if not isEqual: SPYInfo( f"HDF dataset '{dsetName}' mismatch for types '{type(val_this)}' and '{type(val_other)}'" ) return False else: SPYInfo(f"HDF dataset mismatch: extra dataset '{dsetName}' in one instance") return False # The other object really is a standalone Syncopy class instance and # everything but the data itself aligns; now the most expensive part: # trial by trial data comparison for tk in range(len(self.trials)): if not np.allclose(self.trials[tk], other.trials[tk]): SPYInfo("Mismatch in trial #{}".format(tk)) return False # If we made it this far, `self` and `other` really seem to be identical return True # Class "constructor"
def __init__(self, filename=None, dimord=None, mode="r+", **kwargs):
    """
    Set up hidden attributes, access mode, dimord, filename and datasets.

    Keys of kwargs are the datasets from _hdfFileDatasetProperties,
    and kwargs must *only* include datasets for which a property
    with a setter exists.

    1. filename + data = create HDF5 file at filename with data in it
    2. data only
    """
    dataset_names = self._hdfFileDatasetProperties

    # each instance needs its own cfg!
    self._cfg = {}
    self._info = SerializableDict()

    # set to `True` to keep backing hdf5 alive
    # when the destructor is hit
    self._persistent_hdf5 = False

    # Initialize hidden attributes
    for name in dataset_names:
        setattr(self, "_" + name, None)
    self._selector = None

    # Set mode
    self.mode = mode

    # If any dataset property contains data and no dimord is set, use the
    # default dimord
    data_provided = any(key in dataset_names and value is not None for key, value in kwargs.items())
    if data_provided and dimord is None:
        self.dimord = self._defaultDimord
    else:
        self.dimord = dimord

    # If a target filename is provided use it, otherwise generate random
    # filename in `syncopy.__storage__`
    self.filename = self._gen_filename() if filename is None else filename

    # Attach dataset properties and let set methods do error checking.
    for name in dataset_names:
        if name not in kwargs:
            continue
        setattr(self, name, kwargs[name])

    # Write initial log entry (has to be assigned right here: the log
    # setter records the name of the calling function)
    self.log = f"created {self.__class__.__name__:s} object"

    # Write version
    self._version = __version__
class FauxTrial:
    """
    Stand-in mockup of NumPy arrays representing trial data

    Parameters
    ----------
    shape : tuple
        Shape of source trial array
    idx : tuple
        Tuple of slices for extracting trial-data from source object's `data`
        dataset. The provided tuple **has** to be a proper indexing sequence,
        i.e., if `idx` refers to the `k`-th trial in `obj`, then ``obj.data[idx]``
        must slice `data` correctly so that ``obj.data[idx] == obj.trials[k]``
    dtype : :class:`numpy.dtype`
        Datatype of source trial array
    dimord : list
        Dimensional order of source trial array

    Returns
    -------
    faux_trl : FauxTrial object
        An instance of `FauxTrial` that essentially parrots :class:`numpy.ndarray`
        objects and can, thus, be used to feed "fake" trials into a
        :meth:`~syncopy.shared.computational_routine.ComputationalRoutine.computeFunction`
        to get the `noCompute` runs out of the way w/o actually loading trials
        into memory.

    See also
    --------
    syncopy.continuous_data.ContinuousData._preview_trial : makes use of this class
    """

    def __init__(self, shape, idx, dtype, dimord):
        # `shape` and `idx` are stored as tuples, `dtype` and `dimord` as provided
        self.shape = tuple(shape)
        self.idx = tuple(idx)
        self.dtype = dtype
        self.dimord = dimord

    def __str__(self):
        return "Trial placeholder of shape {} and datatype {}".format(str(self.shape), str(self.dtype))

    def __repr__(self):
        return self.__str__()

    def squeeze(self):
        """
        Return a new `FauxTrial` instance whose shape holds no 1's
        (parroting the NumPy original :func:`numpy.squeeze`); `idx`,
        `dtype` and `dimord` are passed on unchanged
        """
        squeezed = [extent for extent in self.shape if extent != 1]
        return FauxTrial(squeezed, self.idx, self.dtype, self.dimord)

    @property
    def T(self):
        """
        Return a new `FauxTrial` instance with reversed dimensions
        (parroting the NumPy original :func:`numpy.transpose`)
        """
        flipped_shape = self.shape[::-1]
        flipped_idx = self.idx[::-1]
        flipped_dimord = self.dimord[::-1]
        return FauxTrial(flipped_shape, flipped_idx, self.dtype, flipped_dimord)