
# -*- coding: utf-8 -*-
#
# Syncopy's abstract base class for discrete data + regular children
#

# Builtin/3rd party package imports
import numpy as np
from abc import ABC
from collections.abc import Iterator
import inspect


# Local imports
from .base_data import BaseData, FauxTrial
from .methods.definetrial import definetrial
from syncopy.shared.parsers import scalar_parser, array_parser
from syncopy.shared.errors import SPYValueError, SPYError, SPYTypeError
from syncopy.plotting import spike_plotting

from syncopy.io.nwb import _spikedata_to_nwbfile

from syncopy import __pynwb__

if __pynwb__:  # pragma: no cover
    from pynwb import NWBHDF5IO


__all__ = ["SpikeData", "EventData"]


class DiscreteData(BaseData, ABC):
    """
    Abstract class for non-uniformly sampled data where only time-stamps are recorded.

    Notes
    -----
    This class cannot be instantiated. Use one of the children instead.
    """

    _infoFileProperties = BaseData._infoFileProperties + ("samplerate",)
    _hdfFileAttributeProperties = BaseData._hdfFileAttributeProperties + ("samplerate",)
    _selectionKeyWords = BaseData._selectionKeyWords + ("latency",)

    @property
    def data(self):
        """
        Array-like object representing data without trials.

        Trials are concatenated along the time axis.
        """
        if getattr(self._data, "id", None) is not None:
            if self._data.id.valid == 0:
                lgl = "open HDF5 file"
                act = "backing HDF5 file {} has been closed"
                raise SPYValueError(legal=lgl, actual=act.format(self.filename), varname="data")
        return self._data

    @data.setter
    def data(self, inData):
        """Also checks for integer type of data"""
        # this comes from BaseData
        self._set_dataset_property(inData, "data")
        if inData is not None:
            if not np.issubdtype(self.data.dtype, np.integer):
                raise SPYTypeError(self.data.dtype, "data", "integer like")

    def __str__(self):
        # Get list of print-worthy attributes
        ppattrs = [
            attr
            for attr in self.__dir__()
            if not (attr.startswith("_") or attr in ["log", "trialdefinition"])
        ]
        ppattrs = [
            attr
            for attr in ppattrs
            if hasattr(self, attr)
            and not (inspect.ismethod(getattr(self, attr)) or isinstance(getattr(self, attr), Iterator))
        ]
        ppattrs.sort()

        # Construct string for pretty-printing class attributes
        dsep = " by "
        hdstr = "Syncopy {clname:s} object with fields\n\n"
        ppstr = hdstr.format(clname=self.__class__.__name__)
        maxKeyLength = max([len(k) for k in ppattrs])
        printString = "{0:>" + str(maxKeyLength + 5) + "} : {1:}\n"
        for attr in ppattrs:
            value = getattr(self, attr)
            if hasattr(value, "shape") and attr == "data" and self.sampleinfo is not None:
                tlen = np.unique([sinfo[1] - sinfo[0] for sinfo in self.sampleinfo])
                if tlen.size == 1:
                    trlstr = "of length {} ".format(str(tlen[0]))
                else:
                    trlstr = ""
                dsize = np.prod(self.data.shape) * self.data.dtype.itemsize / 1024**2
                dunit = "MB"
                if dsize > 1000:
                    dsize /= 1024
                    dunit = "GB"
                valueString = "{} trials {}defined on ".format(str(len(self.trials)), trlstr)
                valueString += (
                    "["
                    + " x ".join([str(numel) for numel in value.shape])
                    + "] {dt:s} {tp:s} "
                    + "of size {sz:3.2f} {szu:s}"
                )
                valueString = valueString.format(
                    dt=self.data.dtype.name,
                    tp=self.data.__class__.__name__,
                    sz=dsize,
                    szu=dunit,
                )
            elif hasattr(value, "shape"):
                valueString = (
                    "[" + " x ".join([str(numel) for numel in value.shape]) + "] element " + str(type(value))
                )
            elif isinstance(value, list):
                if attr == "dimord" and value is not None:
                    valueString = dsep.join(dim for dim in self.dimord)
                else:
                    valueString = "{0} element list".format(len(value))
            elif isinstance(value, dict):
                msg = "dictionary with {nk:s}keys{ks:s}"
                keylist = value.keys()
                showkeys = len(keylist) < 7
                valueString = msg.format(
                    nk=str(len(keylist)) + " " if not showkeys else "",
                    ks=" '" + "', '".join(key for key in keylist) + "'" if showkeys else "",
                )
            else:
                valueString = str(value)
            ppstr += printString.format(attr, valueString)
        ppstr += "\nUse `.log` to see object history"
        return ppstr

    @property
    def sample(self):
        """Indices of all recorded samples"""
        if self.data is None:
            return None
        return self.data[:, self.dimord.index("sample")]

    @property
    def samplerate(self):
        """float: underlying sampling rate of non-uniform data acquisition"""
        return self._samplerate

    @samplerate.setter
    def samplerate(self, sr):
        if sr is None:
            self._samplerate = None
            return
        scalar_parser(sr, varname="samplerate", lims=[1, np.inf])
        self._samplerate = sr

    @BaseData.trialdefinition.setter
    def trialdefinition(self, trldef):
        if trldef is None:
            sidx = self.dimord.index("sample")
            self._trialdefinition = np.array(
                [[np.nanmin(self.data[:, sidx]), np.nanmax(self.data[:, sidx]), 0]]
            )
            self._trial_ids = [0]
        else:
            array_parser(trldef, varname="trialdefinition", dims=2)
            array_parser(
                trldef[:, :2],
                varname="sampleinfo",
                hasnan=False,
                hasinf=False,
                ntype="int_like",
                lims=[0, np.inf],
            )
            self._trialdefinition = trldef.copy()

            # Compute trial-IDs by matching data samples with provided trial-bounds
            samples = self.data[:, self.dimord.index("sample")]
            idx = np.searchsorted(samples, self.sampleinfo.ravel())
            idx = idx.reshape(self.sampleinfo.shape)
            self._trialslice = [slice(st, end) for st, end in idx]
            self.trialid = np.full(samples.shape, -1, dtype=int)
            for itrl, itrl_slice in enumerate(self._trialslice):
                self.trialid[itrl_slice] = itrl
            self._trial_ids = np.arange(self.sampleinfo.shape[0])

    @property
    def time(self):
        """list(float): trigger-relative time of each event"""
        if self.samplerate is not None and self.sampleinfo is not None:
            return [
                (trl[:, self.dimord.index("sample")] - self.sampleinfo[tk, 0] + self.trialdefinition[tk, 2])
                / self.samplerate
                for tk, trl in enumerate(self.trials)
            ]

    @property
    def trialid(self):
        """:class:`numpy.ndarray` of trial id associated with the sample"""
        return self._trialid

    @trialid.setter
    def trialid(self, trlid):
        """
        1d-array of the size of the total number of samples,
        encoding which sample belongs to which trial.
        """
        if trlid is None:
            self._trialid = None
            return
        if self.data is None:
            raise SPYError(
                "SyNCoPy core - trialid: Cannot assign `trialid` without data. "
                + "Please assign data first"
            )
        if (self.data.shape[0] == 0) and (trlid.shape[0] == 0):
            self._trialid = np.array(trlid, dtype=int)
            return
        scount = np.nanmax(self.data[:, self.dimord.index("sample")])
        array_parser(
            trlid,
            varname="trialid",
            dims=(self.data.shape[0],),
            hasnan=False,
            hasinf=False,
            ntype="int_like",
            lims=[-1, scount],
        )
        self._trialid = np.array(trlid, dtype=int)

    @property
    def trialtime(self):
        """list(:class:`numpy.ndarray`): trigger-relative sample times in s"""
        if self.samplerate is not None and self.sampleinfo is not None:
            sample0 = self.sampleinfo[:, 0] - self._t0
            sample0 = np.append(sample0, np.nan)[self.trialid]
            return (self.data[:, self.dimord.index("sample")] - sample0) / self.samplerate

    # Helper function that grabs a single trial
    def _get_trial(self, trialno):
        return self._data[self._trialslice[trialno], :]

    # Helper function that spawns a `FauxTrial` object given actual trial information
    def _preview_trial(self, trialno):
        """
        Generate a `FauxTrial` instance of a trial

        Parameters
        ----------
        trialno : int
            Number of trial the `FauxTrial` object is intended to mimic

        Returns
        -------
        faux_trl : :class:`syncopy.datatype.base_data.FauxTrial`
            An instance of :class:`syncopy.datatype.base_data.FauxTrial` mainly
            intended to be used in `noCompute` runs of
            :meth:`syncopy.shared.computational_routine.ComputationalRoutine.computeFunction`
            to avoid loading actual trial-data into memory.

        Notes
        -----
        If an active in-place selection is found, the generated `FauxTrial`
        object respects it (e.g., if only 2 of 10 channels are selected
        in-place, `faux_trl` reports to only contain 2 channels)

        See also
        --------
        syncopy.datatype.base_data.FauxTrial : class definition and further details
        syncopy.shared.computational_routine.ComputationalRoutine : Syncopy compute engine
        """
        trlSlice = self._trialslice[trialno]
        trialIdx = np.arange(trlSlice.start, trlSlice.stop)  # np.where(self.trialid == trialno)[0]

        nCol = len(self.dimord)
        idx = [[], slice(0, nCol)]
        if self.selection is not None:
            # selections are harmonized, just take `.time`
            idx[0] = trialIdx[self.selection.time[self.selection.trial_ids.index(trialno)]].tolist()
        else:
            idx[0] = trialIdx.tolist()
        shp = [len(idx[0]), nCol]

        return FauxTrial(shp, tuple(idx), self.data.dtype, self.dimord)
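
    # Illustrative sketch (not part of the original source): `_preview_trial`
    # reports the shape and indices a trial would have without reading the
    # backing HDF5 dataset. Assuming `spk` is a SpikeData object with trials:
    #
    #     faux = spk._preview_trial(0)
    #     faux.shape   # e.g. (nSpikesInTrial0, 3) -- no trial data was loaded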

    def __init__(self, data=None, samplerate=None, trialid=None, **kwargs):

        # set as instance attribute to allow (un-)registering of additional datasets
        self._hdfFileDatasetProperties = BaseData._hdfFileDatasetProperties + ("data",)

        # Assign (default) values
        self._trialid = None
        self._samplerate = None
        self._data = None

        self.samplerate = samplerate
        self.trialid = trialid

        # Call initializer
        super().__init__(data=data, **kwargs)

        if self.data is not None:
            if self.data.size == 0:
                # initialization with empty data not allowed
                raise SPYValueError("non empty data set", "data")

            # In case of manual data allocation (reading routine would leave a
            # mark in `cfg`), fill in missing info
            if self.sampleinfo is None:
                # Fill in dimensional info
                definetrial(self, kwargs.get("trialdefinition"))

    def save_nwb(self, **kwargs):
        raise NotImplementedError("Saving of this datatype to NWB files is not supported.")

    # plotting, only virtual in the abc
    def singlepanelplot(self):
        raise NotImplementedError

    def multipanelplot(self):
        raise NotImplementedError
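

# Illustrative sketch (not part of the original source): children of
# `DiscreteData` store one row per event; the `trialdefinition` setter above
# assigns rows to trials by their "sample" column via `np.searchsorted`, so
# trial bounds are start-inclusive and stop-exclusive. All values below are
# hypothetical:
#
#     import numpy as np
#     import syncopy as spy
#     arr = np.array([[2, 0, 0], [5, 0, 1], [12, 0, 0]])  # [sample, channel, unit]
#     spk = spy.SpikeData(data=arr, samplerate=1000.0)
#     spk.trialdefinition = np.array([[0, 10, 0], [10, 20, 0]])  # [start, stop, offset]
#     len(spk.trials)   # 2; samples 2 and 5 fall into trial 0, sample 12 into trial 1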

class SpikeData(DiscreteData):
    """Spike times of multi- and/or single units

    This class can be used for representing spike trains. The data is always
    stored as a two-dimensional [nSpikes x 3] array on disk with the columns
    being ``["sample", "channel", "unit"]``. The "unit" is the neuron a spike
    originated from.

    Note that in the raw data, a signal from one neuron may show up in several
    (nearby) channels, with different strengths. The waveform of the action
    potential is used as a signature to identify the signal of a neuron across
    channels. Once this has been done, it is known which neuron spiked when,
    and the channel information is typically no longer of interest. That is,
    spike data that is ready for scientific analysis typically contains only a
    single channel. This also means that "channel x unit z" refers to the same
    neuron as "channel y unit z", since the unit identifies the neuron
    regardless of the channel.

    Often, the raw data around individual spikes is saved along with the
    spikes, so that one can later infer the type of neuron (e.g.,
    inhibitory/excitatory) from it. We support this with the 'waveform'
    attribute of spy.SpikeData.

    Data is only read from disk on demand, similar to HDF5 files.
    """

    _infoFileProperties = DiscreteData._infoFileProperties + (
        "channel",
        "unit",
    )
    _defaultDimord = ["sample", "channel", "unit"]
    _stackingDimLabel = "sample"
    _selectionKeyWords = DiscreteData._selectionKeyWords + (
        "channel",
        "unit",
    )

    def _compute_unique_idx(self):
        """
        Use `np.unique` on the whole(!) dataset to compute globally available
        channel and unit indices only once.

        This function gets triggered by the constructor `if data is not None`,
        or at the latest when channel/unit labels are assigned with the
        respective setters.
        """
        # after data was added via selection or loading from file,
        # this function gets re-triggered by the channel/unit setters!
        if self.data is None:
            return

        # this is costly and loads the entire hdf5 dataset into memory!
        self.channel_idx = np.unique(self.data[:, self.dimord.index("channel")])
        self.unit_idx = np.unique(self.data[:, self.dimord.index("unit")])

    @property
    def channel(self):
        """:class:`numpy.ndarray` : list of original channel names for each unit"""
        return self._channel

    @channel.setter
    def channel(self, chan):
        if self.data is None:
            if chan is not None:
                raise SPYValueError(
                    "non-empty SpikeData",
                    "cannot assign `channel` without data. Please assign data first",
                )
            # No labels for no data is fine
            self._channel = chan
            return
        # there is data
        elif chan is None:
            raise SPYValueError("channel labels, cannot set `channel` to `None` with existing data.")

        # if we landed here, we have data and new labels

        # in case of selections and/or loading from file
        # the constructor was called with data=None, hence
        # we have to compute the unique indices here
        if self.channel_idx is None:
            self._compute_unique_idx()

        # we need as many labels as there are distinct channels
        nChan = self.channel_idx.size
        if nChan != len(chan):
            raise SPYValueError(f"exactly {nChan} channel label(s)")

        array_parser(chan, varname="channel", ntype="str", dims=(nChan,))
        self._channel = np.array(chan)

    def _default_channel_labels(self):
        """
        Creates the default channel labels
        """
        # channel entries in self.data are 0-based
        chan_max = self.channel_idx.max()
        channel_labels = np.array(
            ["channel" + str(int(i + 1)).zfill(len(str(chan_max))) for i in self.channel_idx]
        )
        return channel_labels

    @property
    def unit(self):
        """:class:`numpy.ndarray(str)` : unit names"""
        return self._unit

    @unit.setter
    def unit(self, unit):
        if self.data is None:
            if unit is not None:
                raise SPYValueError(
                    "non-empty SpikeData",
                    "cannot assign `unit` without data. Please assign data first",
                )
            # empty labels for empty data is fine
            self._unit = unit
            return
        # there is data
        elif unit is None:
            raise SPYValueError("unit labels, cannot set `unit` to `None` with existing data.")

        # in case of selections and/or loading from file
        # the constructor was called with data=None, hence
        # we have to compute this here
        if self.unit_idx is None:
            self._compute_unique_idx()

        # we need as many labels as there are distinct units
        nunit = self.unit_idx.size
        if nunit != len(unit):
            raise SPYValueError(f"exactly {nunit} unit label(s)")

        array_parser(unit, varname="unit", ntype="str", dims=(nunit,))
        self._unit = np.array(unit)

    def _default_unit_labels(self):
        """
        Creates the default unit labels
        """
        unit_max = self.unit_idx.max()
        return np.array(["unit" + str(int(i + 1)).zfill(len(str(unit_max))) for i in self.unit_idx])

    # Helper function that extracts by-trial unit-indices
    def _get_unit(self, trials, units=None):
        """
        Get relative by-trial indices of unit selections

        Parameters
        ----------
        trials : list
            List of trial-indices to perform selection on
        units : None or list
            List of unit-indices to be selected

        Returns
        -------
        indices : list of lists
            List of by-trial sample-indices corresponding to provided
            unit-selection. If `units` is `None`, `indices` is a list of
            universal (i.e., ``slice(None)``) selectors.

        Notes
        -----
        This class method is intended to be solely used by
        :class:`syncopy.datatype.selector.Selector` objects and thus has purely
        auxiliary character. Therefore, all input sanitization and error
        checking is left to :class:`syncopy.datatype.selector.Selector` and not
        performed here.

        See also
        --------
        syncopy.datatype.selector.Selector : Syncopy data selectors
        """
        if units is not None:
            indices = []
            for trlno in trials:
                thisTrial = self.data[self._trialslice[trlno], self.dimord.index("unit")]
                trialUnits = []
                for unit in units:
                    trialUnits += list(np.where(thisTrial == unit)[0])
                if len(trialUnits) > 1:
                    steps = np.diff(trialUnits)
                    if steps.min() == steps.max() == 1:
                        trialUnits = slice(trialUnits[0], trialUnits[-1] + 1, 1)
                indices.append(trialUnits)
        else:
            indices = [slice(None)] * len(trials)

        return indices

    @property
    def waveform(self):
        """The waveform of the spikes in the data.

        This is a tiny part of the raw data around the time point at which a
        spike was detected (by the recording hardware/software: keep in mind
        that the spike data Syncopy is working with is already pre-processed).
        From this sequence of samples, one can derive the waveform type of the
        spike (via classification), which may allow to derive the type of
        neuron that produced the spike.
        """
        return self._waveform

    @waveform.setter
    def waveform(self, waveform):
        """Set a waveform dataset from a numpy array, `None`, or an `h5py.Dataset` instance."""
        if self.data is None:
            if waveform is not None:
                raise SPYValueError(
                    legal="non-empty SpikeData",
                    varname="waveform",
                    actual="empty SpikeData main dataset (None). Cannot assign `waveform` "
                    "without data. Please assign data first.",
                )
        if waveform is None:
            self._set_dataset_property(waveform, "waveform")  # None
            self._unregister_dataset("waveform", del_attr=False)
            return
        if waveform.ndim < 2:
            raise SPYValueError(
                legal="waveform data with at least 2 dimensions",
                varname="waveform",
                actual=f"data with {waveform.ndim} dimensions",
            )
        if waveform.shape[0] != self.data.shape[0]:
            raise SPYValueError(
                f"waveform shape[0]={waveform.shape[0]} must equal nSpikes={self.data.shape[0]}. "
                + "Please create one waveform per spike in data.",
                varname="waveform",
                actual=f"wrong size waveform with shape {waveform.shape}",
            )

        self._set_dataset_property(waveform, "waveform")

    # "Constructor"
    def __init__(
        self,
        data=None,
        filename=None,
        trialdefinition=None,
        samplerate=None,
        channel=None,
        unit=None,
        dimord=None,
    ):
        """Initialize a :class:`SpikeData` object.

        Parameters
        ----------
        data : [nSpikes x 3] :class:`numpy.ndarray`
        filename : str
            path to filename or folder (spy container)
        trialdefinition : :class:`EventData` object or nTrials x 3 array
            [start, stop, trigger_offset] sample indices for each of the `nTrials` trials
        samplerate : float
            sampling rate in Hz
        channel : str or list/array(str)
            original channel names
        unit : str or list/array(str)
            names of all units
        dimord : list(str)
            ordered list of dimension labels

        1. `filename` + `data` : create hdf dataset incl. sampleinfo @filename
        2. `filename` no `data` : read from file (spy, hdf5 file)
        3. just `data` : try to attach data (error checking done by
           :meth:`SpikeData.data.setter`)

        See also
        --------
        :func:`syncopy.definetrial`
        """

        # instance attribute to allow modification
        self._hdfFileAttributeProperties = DiscreteData._hdfFileAttributeProperties + (
            "channel",
            "unit",
        )

        self._unit = None
        self.unit_idx = None
        self._channel = None
        self.channel_idx = None

        # Call parent initializer
        super().__init__(
            data=data,
            filename=filename,
            trialdefinition=trialdefinition,
            samplerate=samplerate,
            dimord=dimord,
        )

        self._hdfFileDatasetProperties += ("waveform",)
        self._waveform = None

        # for fast lookup and labels
        self._compute_unique_idx()

        # constructor gets `data=None` for empty inits, selections
        # and loading from file; can't set any labels in that case
        if channel is not None:
            # setter raises exception if data=None
            self.channel = channel
        elif data is not None:
            # data but no given labels
            self.channel = self._default_channel_labels()

        # same for unit
        if unit is not None:
            # setter raises exception if data=None
            self.unit = unit
        elif data is not None:
            self.unit = self._default_unit_labels()
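
    # Illustrative sketch (not part of the original source): labels passed to
    # the constructor must match the number of *distinct* channel/unit indices
    # found in `data`. Names and values below are hypothetical:
    #
    #     arr = np.array([[2, 0, 0], [5, 0, 1], [12, 0, 0]])  # one channel, two units
    #     spk = spy.SpikeData(data=arr, samplerate=1000.0,
    #                         channel=["chanA"], unit=["unitA", "unitB"])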

    def save_nwb(self, outpath, with_trialdefinition=True):
        """Save SpikeData in Neurodata Without Borders (NWB) file format.
        An NWBFile represents a single session of an experiment.

        Parameters
        ----------
        outpath : str, path-like
            Where to save the NWB file, including file name and `.nwb`
            extension. All directories in the path must exist.
            Example: `'mydata.nwb'`.
        with_trialdefinition : bool
            Whether to save the trial definition in the NWB file.

        Returns
        -------
        None, called for side effect of writing the NWB file to disk.

        Notes
        -----
        Due to the very general architecture of the NWB format, many fields
        need to be interpreted by software reading the format. Thus, providing
        a generic function to save Syncopy data in NWB format is possible only
        if you know who will read it. Depending on your target software, you
        may need to manually format the data using pynwb before writing it to
        disk, or manually open it using pynwb before using it with the target
        software.

        Selections are ignored, the full data is exported. Create a new Syncopy
        data object before calling this function if you want to export a subset
        only.
        """
        if not __pynwb__:
            raise SPYError("NWB support is not available. Please install the 'pynwb' package.")

        nwbfile = _spikedata_to_nwbfile(self, nwbfile=None, with_trialdefinition=with_trialdefinition)

        # Write the file to disk.
        with NWBHDF5IO(outpath, "w") as io:
            io.write(nwbfile)
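
    # Illustrative sketch (not part of the original source): exporting requires
    # the optional `pynwb` dependency; the file name below is hypothetical:
    #
    #     spk.save_nwb("session01_spikes.nwb", with_trialdefinition=True)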

    # implement plotting
    def singlepanelplot(self, **show_kwargs):
        figax = spike_plotting.plot_single_figure_SpikeData(self, **show_kwargs)
        return figax

    def multipanelplot(self, **show_kwargs):
        figax = spike_plotting.plot_multi_figure_SpikeData(self, **show_kwargs)
        return figax
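
    # Illustrative sketch (not part of the original source): the plotting
    # wrappers forward `show_kwargs` to the spike_plotting backend, e.g. to
    # restrict the figure to a subset of the data (the kwarg is an assumption):
    #
    #     figax = spk.singlepanelplot(trials=0)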


class EventData(DiscreteData):
    """Timestamps and integer codes of experimental events

    This class can be used for representing events during an experiment, e.g.
    when a stimulus was turned on. These usually occur at non-regular time
    points and have associated event codes.

    Data is only read from disk on demand, similar to HDF5 files.
    """

    _defaultDimord = ["sample", "eventid"]
    _stackingDimLabel = "sample"
    _selectionKeyWords = DiscreteData._selectionKeyWords + ("eventid",)

    @property
    def eventid(self):
        """numpy.ndarray(int): integer event code associated with each event"""
        if self.data is None:
            return None
        return np.unique(self.data[:, self.dimord.index("eventid")])

    # Helper function that extracts by-trial eventid-indices
    def _get_eventid(self, trials, eventids=None):
        """
        Get relative by-trial indices of event-id selections

        Parameters
        ----------
        trials : list
            List of trial-indices to perform selection on
        eventids : None or list
            List of event-id-indices to be selected

        Returns
        -------
        indices : list of lists
            List of by-trial sample-indices corresponding to provided
            event-id-selection. If `eventids` is `None`, `indices` is a list of
            universal (i.e., ``slice(None)``) selectors.

        Notes
        -----
        This class method is intended to be solely used by
        :class:`syncopy.datatype.selector.Selector` objects and thus has purely
        auxiliary character. Therefore, all input sanitization and error
        checking is left to :class:`syncopy.datatype.selector.Selector` and not
        performed here.

        See also
        --------
        syncopy.datatype.selector.Selector : Syncopy data selectors
        """
        if eventids is not None:
            indices = []
            for trlno in trials:
                thisTrial = self.data[self._trialslice[trlno], self.dimord.index("eventid")]
                trialEvents = []
                for event in eventids:
                    trialEvents += list(np.where(thisTrial == event)[0])
                if len(trialEvents) > 1:
                    steps = np.diff(trialEvents)
                    if steps.min() == steps.max() == 1:
                        trialEvents = slice(trialEvents[0], trialEvents[-1] + 1, 1)
                indices.append(trialEvents)
        else:
            indices = [slice(None)] * len(trials)

        return indices

    # "Constructor"
    def __init__(
        self,
        data=None,
        filename=None,
        trialdefinition=None,
        samplerate=None,
        dimord=None,
    ):
        """Initialize a :class:`EventData` object.

        Parameters
        ----------
        data : [nEvents x 2] :class:`numpy.ndarray`
        filename : str
            path to filename or folder (spy container)
        trialdefinition : :class:`EventData` object or nTrials x 3 array
            [start, stop, trigger_offset] sample indices for each of the `nTrials` trials
        samplerate : float
            sampling rate in Hz
        dimord : list(str)
            ordered list of dimension labels

        1. `filename` + `data` : create hdf dataset incl. sampleinfo @filename
        2. `filename` no `data` : read from file (spy, hdf5)
        3. just `data` : try to attach data (error checking done by
           :meth:`EventData.data.setter`)

        See also
        --------
        :func:`syncopy.definetrial`
        """
        if dimord is not None:
            # ensure that event data can have extra dimord columns
            if len(dimord) != len(self._defaultDimord):
                for col in self._defaultDimord:
                    if col not in dimord:
                        base = "dimensional label {}"
                        lgl = base.format("'" + col + "'")
                        raise SPYValueError(legal=lgl, varname="dimord")
                self._defaultDimord = dimord

        # Call parent initializer
        super().__init__(
            data=data,
            filename=filename,
            trialdefinition=trialdefinition,
            samplerate=samplerate,
            dimord=dimord,
        )

        self._hdfFileAttributeProperties = BaseData._hdfFileAttributeProperties + ("samplerate",)
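

# Illustrative sketch (not part of the original source): EventData pairs a
# sample index with an integer event code per row. All values are hypothetical:
#
#     evt_arr = np.array([[1, 31], [8, 32], [14, 31]])  # [sample, eventid]
#     evt = spy.EventData(data=evt_arr, samplerate=1000.0)
#     evt.eventid   # array([31, 32]) -- unique event codes present in the data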