# -*- coding: utf-8 -*-
#
# Syncopy's abstract base class for continuous data + regular children
#
"""Uniformly sampled (continuous data).
This module holds classes to represent data with a uniformly sampled time axis.
"""
# Builtin/3rd party package imports
import inspect
import numpy as np
from abc import ABC
from collections.abc import Iterator
# Local imports
from .base_data import BaseData, FauxTrial, _definetrial
from .methods.definetrial import definetrial
from syncopy.shared.parsers import scalar_parser, array_parser
from syncopy.shared.errors import SPYValueError, log
from syncopy.shared.tools import best_match
from syncopy.plotting import sp_plotting, mp_plotting
from .util import TimeIndexer
__all__ = ["AnalogData", "SpectralData", "CrossSpectralData", "TimeLockData"]
class ContinuousData(BaseData, ABC):
"""Abstract class for uniformly sampled data
Notes
-----
This class cannot be instantiated. Use one of the children instead.
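
    The concrete children defined in this module are :class:`AnalogData`,
    :class:`SpectralData`, :class:`CrossSpectralData` and :class:`TimeLockData`.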
"""
_infoFileProperties = BaseData._infoFileProperties + ("samplerate", "channel",)
_hdfFileDatasetProperties = BaseData._hdfFileDatasetProperties + ("data",)
# all continuous data types have a time axis
_selectionKeyWords = BaseData._selectionKeyWords + ('latency',)
@property
def data(self):
"""
HDF5 dataset property representing contiguous
data without trialdefinition.
Trials are concatenated along the time axis.
"""
if getattr(self._data, "id", None) is not None:
if self._data.id.valid == 0:
lgl = "open HDF5 file"
act = "backing HDF5 file {} has been closed"
raise SPYValueError(legal=lgl, actual=act.format(self.filename),
varname="data")
return self._data
@data.setter
def data(self, inData):
self._set_dataset_property(inData, "data")
if inData is None:
return
def __str__(self):
# Get list of print-worthy attributes
ppattrs = [attr for attr in self.__dir__()
if not (attr.startswith("_") or attr in ["log", "trialdefinition"])]
ppattrs = [attr for attr in ppattrs
if not (inspect.ismethod(getattr(self, attr))
or isinstance(getattr(self, attr), Iterator))]
if self.__class__.__name__ == "CrossSpectralData":
ppattrs.remove("channel")
ppattrs.sort()
# Construct string for pretty-printing class attributes
dsep = " by "
hdstr = "Syncopy {clname:s} object with fields\n\n"
ppstr = hdstr.format(clname=self.__class__.__name__)
maxKeyLength = max([len(k) for k in ppattrs])
printString = "{0:>" + str(maxKeyLength + 5) + "} : {1:}\n"
for attr in ppattrs:
value = getattr(self, attr)
if hasattr(value, 'shape') and attr == "data" and self.sampleinfo is not None:
tlen = np.unique(np.diff(self.sampleinfo))
if tlen.size == 1:
trlstr = "of length {} ".format(str(tlen[0]))
else:
trlstr = ""
dsize = np.prod(self.data.shape)*self.data.dtype.itemsize/1024**2
dunit = "MB"
if dsize > 1000:
dsize /= 1024
dunit = "GB"
valueString = "{} trials {}defined on ".format(str(len(self.trials)), trlstr)
valueString += "[" + " x ".join([str(numel) for numel in value.shape]) \
+ "] {dt:s} {tp:s} " +\
"of size {sz:3.2f} {szu:s}"
valueString = valueString.format(dt=self.data.dtype.name,
tp=self.data.__class__.__name__,
sz=dsize,
szu=dunit)
elif hasattr(value, 'shape'):
valueString = "[" + " x ".join([str(numel) for numel in value.shape]) \
+ "] element " + str(type(value))
elif isinstance(value, list):
if attr == "dimord" and value is not None:
valueString = dsep.join(dim for dim in self.dimord)
else:
valueString = "{0} element list".format(len(value))
elif isinstance(value, dict):
msg = "dictionary with {nk:s}keys{ks:s}"
keylist = value.keys()
showkeys = len(keylist) < 7
valueString = msg.format(nk=str(len(keylist)) + " " if not showkeys else "",
ks=" '" + "', '".join(key for key in keylist) + "'" if showkeys else "")
else:
valueString = str(value)
ppstr += printString.format(attr, valueString)
ppstr += "\nUse `.log` to see object history"
return ppstr
@property
def _shapes(self):
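        # Per-trial shapes: identical to the full dataset shape except along the
        # stacking (time) dimension, which is replaced by each trial's sample count.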
if self.sampleinfo is not None:
shp = [list(self.data.shape) for k in range(self.sampleinfo.shape[0])]
for k, sg in enumerate(self.sampleinfo):
shp[k][self._stackingDim] = sg[1] - sg[0]
return [tuple(sp) for sp in shp]
@property
def channel(self):
""" :class:`numpy.ndarray` : list of recording channel names """
# if data exists but no user-defined channel labels, create them on the fly
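        # e.g., with 12 channels the auto-generated labels are
        # "channel01", "channel02", ..., "channel12" (zero-padded to equal width)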
if self._channel is None and self._data is not None:
nChannel = self.data.shape[self.dimord.index("channel")]
return np.array(["channel" + str(i + 1).zfill(len(str(nChannel)))
for i in range(nChannel)])
return self._channel
@channel.setter
def channel(self, channel):
if channel is None:
self._channel = None
return
if self.data is None:
raise SPYValueError("Syncopy: Cannot assign `channels` without data. " +
"Please assign data first")
array_parser(channel, varname="channel", ntype="str",
dims=(self.data.shape[self.dimord.index("channel")],))
self._channel = np.array(channel)
@property
def samplerate(self):
"""float: sampling rate of uniformly sampled data in Hz"""
return self._samplerate
@samplerate.setter
def samplerate(self, sr):
if sr is None:
self._samplerate = None
return
scalar_parser(sr, varname="samplerate", lims=[np.finfo('float').eps, np.inf])
self._samplerate = float(sr)
# we need a new TimeIndexer
if self.trialdefinition is not None:
self._time = TimeIndexer(self.trialdefinition,
self.samplerate,
list(self._trial_ids))
@BaseData.trialdefinition.setter
def trialdefinition(self, trldef):
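        # Illustrative layout (not taken from actual data): each row of `trldef`
        # is [start_sample, stop_sample, offset], e.g. three trials of 100 samples
        # each with a pre-stimulus offset of -20 samples:
        #     [[  0, 100, -20],
        #      [100, 200, -20],
        #      [200, 300, -20]]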
# all-to-all trialdefinition
if trldef is None:
self._trialdefinition = np.array([[0, self.data.shape[self.dimord.index("time")], 0]])
self._trial_ids = [0]
else:
scount = self.data.shape[self.dimord.index("time")]
array_parser(trldef, varname="trialdefinition", dims=2)
array_parser(trldef[:, :2], varname="sampleinfo", hasnan=False,
hasinf=False, ntype="int_like", lims=[0, scount])
self._trialdefinition = trldef.copy()
self._trial_ids = np.arange(self.sampleinfo.shape[0])
self._time = TimeIndexer(self.trialdefinition,
self.samplerate,
list(self._trial_ids))
@property
def time(self):
"""indexable iterable of the time arrays"""
if self.samplerate is not None and self.sampleinfo is not None:
return self._time
# Helper function that grabs a single trial
    def _get_trial(self, trialno):
idx = [slice(None)] * len(self.dimord)
idx[self._stackingDim] = slice(int(self.sampleinfo[trialno, 0]), int(self.sampleinfo[trialno, 1]))
return self._data[tuple(idx)]
    def _is_empty(self):
return super()._is_empty() or self.samplerate is None
# Helper function that spawns a `FauxTrial` object given actual trial information
    def _preview_trial(self, trialno):
"""
Generate a `FauxTrial` instance of a trial
Parameters
----------
trialno : int
Number of trial the `FauxTrial` object is intended to mimic
Returns
-------
faux_trl : :class:`syncopy.datatype.base_data.FauxTrial`
An instance of :class:`syncopy.datatype.base_data.FauxTrial` mainly
intended to be used in `noCompute` runs of
:meth:`syncopy.shared.computational_routine.ComputationalRoutine.computeFunction`
to avoid loading actual trial-data into memory.
Notes
-----
If an active in-place selection is found, the generated `FauxTrial` object
respects it (e.g., if only 2 of 10 channels are selected in-place, `faux_trl`
reports to only contain 2 channels)
See also
--------
syncopy.datatype.base_data.FauxTrial : class definition and further details
syncopy.shared.computational_routine.ComputationalRoutine : Syncopy compute engine
"""
shp = list(self.data.shape)
idx = [slice(None)] * len(self.dimord)
stop = int(self.sampleinfo[trialno, 1])
start = int(self.sampleinfo[trialno, 0])
shp[self._stackingDim] = stop - start
idx[self._stackingDim] = slice(start, stop)
# process existing data selections
if self.selection is not None:
# time-selection is most delicate due to trial-offset
tsel = self.selection.time[trialno]
if isinstance(tsel, slice):
if tsel.start is not None:
tstart = tsel.start
else:
tstart = 0
if tsel.stop is not None:
tstop = tsel.stop
else:
tstop = stop - start
# account for trial offsets and compute slicing index + shape
start = start + tstart
stop = start + (tstop - tstart)
idx[self._stackingDim] = slice(start, stop)
shp[self._stackingDim] = stop - start
else:
idx[self._stackingDim] = [tp + start for tp in tsel]
shp[self._stackingDim] = len(tsel)
# process the rest
dims = list(self.dimord)
dims.pop(self._stackingDim)
for dim in dims:
sel = getattr(self.selection, dim)
if sel is not None:
dimIdx = self.dimord.index(dim)
idx[dimIdx] = sel
if isinstance(sel, slice):
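                        # normalize the slice: resolve `None` and negative start/stop
                        # against the dimension's extent so the resulting shape can be
                        # computed without touching the actual data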
begin, end, delta = sel.start, sel.stop, sel.step
if sel.start is None:
begin = 0
elif sel.start < 0:
begin = shp[dimIdx] + sel.start
if sel.stop is None:
end = shp[dimIdx]
elif sel.stop < 0:
end = shp[dimIdx] + sel.stop
if sel.step is None:
delta = 1
shp[dimIdx] = int(np.ceil((end - begin) / delta))
idx[dimIdx] = slice(begin, end, delta)
elif isinstance(sel, list):
shp[dimIdx] = len(sel)
else:
shp[dimIdx] = 1
return FauxTrial(shp, tuple(idx), self.data.dtype, self.dimord)
# Make instantiation persistent in all subclasses
    def __init__(self, data=None, channel=None, samplerate=None, **kwargs):
self._channel = None
self._samplerate = None
self._data = None
self._time = None
self.samplerate = samplerate # use setter for error-checking
# Call initializer
super().__init__(data=data, **kwargs)
# catches channel propagation
# from concatenation of syncopy data objects
if self._channel is None:
self.channel = channel
# overwrites channels from concatenation if desired
elif channel is not None:
self.channel = channel
if self.data is not None:
# In case of manual data allocation (reading routine would leave a
# mark in `cfg`), fill in missing info
if self.sampleinfo is None:
# First, fill in dimensional info
definetrial(self, kwargs.get("trialdefinition"))
# plotting, only virtual in the abc
    def singlepanelplot(self):
raise NotImplementedError
    def multipanelplot(self):
raise NotImplementedError
class AnalogData(ContinuousData):
"""Multi-channel, uniformly-sampled, analog (real float) data
This class can be used for representing any analog signal data with a time
and a channel axis such as local field potentials, firing rates, eye
position etc.
    The data is stored on disk as a two-dimensional array with trials
    concatenated along the time axis and is only read from disk on demand,
    since it is backed by an HDF5 dataset.
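
    Examples
    --------
    A minimal sketch, assuming :class:`AnalogData` is available from the
    top-level `syncopy` namespace (shapes follow the default dimord
    ``["time", "channel"]``):

    >>> import numpy as np
    >>> import syncopy as spy
    >>> arr = np.random.randn(1000, 4)            # 1000 samples, 4 channels
    >>> adata = spy.AnalogData(data=arr, samplerate=1000)
    >>> adata.channel = ["ch" + str(i + 1) for i in range(4)]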
"""
_infoFileProperties = ContinuousData._infoFileProperties
_defaultDimord = ["time", "channel"]
_stackingDimLabel = "time"
_selectionKeyWords = ContinuousData._selectionKeyWords + ('channel',)
# "Constructor"
    def __init__(self,
data=None,
filename=None,
trialdefinition=None,
samplerate=None,
channel=None,
dimord=None):
"""Initialize an :class:`AnalogData` object.
Parameters
----------
data : 2D :class:numpy.ndarray or HDF5 dataset
multi-channel time series data with uniform sampling
filename : str
path to target filename that should be used for writing
trialdefinition : :class:`EventData` object or Mx3 array
[start, stop, trigger_offset] sample indices for `M` trials
samplerate : float
sampling rate in Hz
channel : str or list/array(str)
dimord : list(str)
ordered list of dimension labels
1. `filename` + `data` : create hdf dataset incl. sampleinfo @filename
2. just `data` : try to attach data (error checking done by :meth:`AnalogData.data.setter`)
See also
--------
:func:`syncopy.definetrial`
"""
        # FIXME: I think escalating `dimord` to `BaseData` should be sufficient so that
        # the `if any(key...)` loop in `BaseData.__init__()` takes care of assigning a default dimord
if dimord is None:
dimord = self._defaultDimord
# Call parent initializer
super().__init__(data=data,
filename=filename,
trialdefinition=trialdefinition,
samplerate=samplerate,
channel=channel,
dimord=dimord)
# set as instance attribute to allow modification
self._hdfFileAttributeProperties = BaseData._hdfFileAttributeProperties + ("samplerate", "channel",)
# implement plotting
    def singlepanelplot(self, shifted=True, **show_kwargs):
figax = sp_plotting.plot_AnalogData(self, shifted, **show_kwargs)
return figax
    def multipanelplot(self, **show_kwargs):
figax = mp_plotting.plot_AnalogData(self, **show_kwargs)
return figax
class SpectralData(ContinuousData):
"""
Multi-channel, real or complex spectral data
This class can be used for representing any data with a frequency, channel,
and optionally a time axis. The datatype can be complex or float.
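
    Examples
    --------
    A minimal sketch, assuming :class:`SpectralData` is available from the
    top-level `syncopy` namespace (shapes follow the default dimord
    ``["time", "taper", "freq", "channel"]``):

    >>> import numpy as np
    >>> import syncopy as spy
    >>> arr = np.random.rand(100, 1, 50, 4)       # time x taper x freq x channel
    >>> spec = spy.SpectralData(data=arr, samplerate=200)
    >>> spec.freq = np.linspace(0, 100, 50)       # attach a frequency axis in Hz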
"""
_infoFileProperties = ContinuousData._infoFileProperties + ("taper", "freq",)
_hdfFileAttributeProperties = BaseData._hdfFileAttributeProperties +\
("samplerate", "channel", "freq",)
_defaultDimord = ["time", "taper", "freq", "channel"]
_stackingDimLabel = "time"
_selectionKeyWords = ContinuousData._selectionKeyWords + ('channel', 'frequency', 'taper',)
@property
def taper(self):
""" :class:`numpy.ndarray` : list of window functions used """
if self._taper is None and self._data is not None:
nTaper = self.data.shape[self.dimord.index("taper")]
return np.array(["taper" + str(i + 1).zfill(len(str(nTaper)))
for i in range(nTaper)])
return self._taper
@taper.setter
def taper(self, tpr):
if tpr is None:
self._taper = None
return
        if self.data is None:
            print("Syncopy core - taper: Cannot assign `taper` without data. " +
                  "Please assign data first")
            return
        array_parser(tpr, dims=(self.data.shape[self.dimord.index("taper")],),
                     varname="taper", ntype="str")
self._taper = np.array(tpr)
@property
def freq(self):
""":class:`numpy.ndarray`: frequency axis in Hz """
# if data exists but no user-defined frequency axis, create one on the fly
if self._freq is None and self._data is not None:
return np.arange(self.data.shape[self.dimord.index("freq")])
return self._freq
@freq.setter
def freq(self, freq):
if freq is None:
self._freq = None
return
        if self.data is None:
            print("Syncopy core - freq: Cannot assign `freq` without data. " +
                  "Please assign data first")
return
array_parser(freq, varname="freq", hasnan=False, hasinf=False,
dims=(self.data.shape[self.dimord.index("freq")],))
self._freq = np.array(freq)
# Helper function that extracts frequency-related indices
def _get_freq(self, foi=None, foilim=None):
"""
`foi` is legacy, we use `foilim` for frequency selection
Error checking is performed by `Selector` class
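
        For example, with a 2 Hz frequency grid starting at 0 Hz, `foilim=[8, 12]`
        selects all bins between 8 and 12 Hz and returns them as a single slice,
        whereas `foi` picks the closest matching bins individually.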
"""
if foilim is not None:
_, selFreq = best_match(self.freq, foilim, span=True)
selFreq = selFreq.tolist()
if len(selFreq) > 1:
selFreq = slice(selFreq[0], selFreq[-1] + 1, 1)
elif foi is not None:
_, selFreq = best_match(self.freq, foi)
selFreq = selFreq.tolist()
if len(selFreq) > 1:
freqSteps = np.diff(selFreq)
if freqSteps.min() == freqSteps.max() == 1:
selFreq = slice(selFreq[0], selFreq[-1] + 1, 1)
else:
selFreq = slice(None)
return selFreq
# "Constructor"
    def __init__(self,
data=None,
filename=None,
trialdefinition=None,
samplerate=None,
channel=None,
taper=None,
freq=None,
dimord=None):
self._taper = None
self._freq = None
# FIXME: See similar comment above in `AnalogData.__init__()`
if dimord is None:
dimord = self._defaultDimord
# Call parent initializer
super().__init__(data=data,
filename=filename,
trialdefinition=trialdefinition,
samplerate=samplerate,
channel=channel,
dimord=dimord)
# If __init__ attached data, be careful
if self.data is not None:
# In case of manual data allocation (reading routine would leave a
# mark in `cfg`), fill in missing info
if len(self.cfg) == 0:
# concat operations will set this!
if self.freq is None:
self.freq = freq
if self.taper is None:
self.taper = taper
# Dummy assignment: if we have no data but freq/taper labels,
# assign bogus to trigger setter warnings
else:
self.freq = freq
self.taper = taper
# implement plotting
    def singlepanelplot(self, logscale=True, **show_kwargs):
figax = sp_plotting.plot_SpectralData(self, logscale, **show_kwargs)
return figax
    def multipanelplot(self, **show_kwargs):
figax = mp_plotting.plot_SpectralData(self, **show_kwargs)
return figax
class CrossSpectralData(ContinuousData):
"""
Multi-channel real or complex spectral connectivity data
This class can be used for representing channel-channel interactions involving
frequency and optionally time or lag. The datatype can be complex or float.
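
    Objects of this type are typically produced by Syncopy's connectivity
    routines rather than instantiated by hand; channel pairings are addressed
    via the separate `channel_i` and `channel_j` dimensions.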
"""
# Adapt `infoFileProperties` and `hdfFileAttributeProperties` from `ContinuousData`
_infoFileProperties = BaseData._infoFileProperties +\
("samplerate", "channel_i", "channel_j", "freq", )
_hdfFileAttributeProperties = BaseData._hdfFileAttributeProperties +\
("samplerate", "channel_i", "channel_j", "freq", )
_defaultDimord = ["time", "freq", "channel_i", "channel_j"]
_stackingDimLabel = "time"
_selectionKeyWords = ContinuousData._selectionKeyWords + ('channel_i', 'channel_j', 'frequency',)
_channel_i = None
_channel_j = None
_samplerate = None
_data = None
# Steal frequency-related stuff from `SpectralData`
_get_freq = SpectralData._get_freq
freq = SpectralData.freq
# override channel property to avoid accidental access
@property
def channel(self):
return "see channel_i and channel_j"
@channel.setter
def channel(self, channel):
if channel is None:
pass
else:
msg = f"CrossSpectralData has no 'channel' to set but dimord: {self._dimord}"
raise NotImplementedError(msg)
@property
def channel_i(self):
""" :class:`numpy.ndarray` : list of recording channel names """
# if data exists but no user-defined channel labels, create them on the fly
if self._channel_i is None and self._data is not None:
nChannel = self.data.shape[self.dimord.index("channel_i")]
return np.array(["channel" + str(i + 1).zfill(len(str(nChannel)))
for i in range(nChannel)])
return self._channel_i
@channel_i.setter
def channel_i(self, channel_i):
""" :class:`numpy.ndarray` : list of channel labels """
if channel_i is None:
self._channel_i = None
return
if self.data is None:
raise SPYValueError("Syncopy: Cannot assign `channels` without data. " +
"Please assign data first")
        array_parser(channel_i, varname="channel_i", ntype="str",
                     dims=(self.data.shape[self.dimord.index("channel_i")],))
self._channel_i = np.array(channel_i)
@property
def channel_j(self):
""" :class:`numpy.ndarray` : list of recording channel names """
# if data exists but no user-defined channel labels, create them on the fly
if self._channel_j is None and self._data is not None:
nChannel = self.data.shape[self.dimord.index("channel_j")]
return np.array(["channel" + str(i + 1).zfill(len(str(nChannel)))
for i in range(nChannel)])
return self._channel_j
@channel_j.setter
def channel_j(self, channel_j):
""" :class:`numpy.ndarray` : list of channel labels """
if channel_j is None:
self._channel_j = None
return
if self.data is None:
raise SPYValueError("Syncopy: Cannot assign `channels` without data. " +
"Please assign data first")
        array_parser(channel_j, varname="channel_j", ntype="str",
                     dims=(self.data.shape[self.dimord.index("channel_j")],))
self._channel_j = np.array(channel_j)
    def __init__(self,
data=None,
filename=None,
channel_i=None,
channel_j=None,
samplerate=None,
freq=None,
dimord=None):
self._freq = None
# Set dimensional labels
self.dimord = dimord
# Call parent initializer
super().__init__(data=data,
filename=filename,
samplerate=samplerate,
dimord=dimord)
if freq is not None:
# set frequencies
self.freq = freq
    def singlepanelplot(self, **show_kwargs):
sp_plotting.plot_CrossSpectralData(self, **show_kwargs)
class TimeLockData(ContinuousData):
"""
Multi-channel, uniformly-sampled, time-locked data.
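
    In contrast to :class:`AnalogData`, all trials are required to have the
    same length and relative timing (see the `trialdefinition` setter).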
"""
_infoFileProperties = ContinuousData._infoFileProperties
_defaultDimord = ["time", "channel"]
_selectionKeyWords = ContinuousData._selectionKeyWords + ('channel',)
_stackingDimLabel = "time"
# "Constructor"
    def __init__(self,
data=None,
filename=None,
trialdefinition=None,
samplerate=None,
channel=None,
dimord=None):
"""
Initialize an :class:`TimeLockData` object.
Parameters
----------
data : 2D :class:numpy.ndarray or HDF5 dataset
multi-channel time series data with uniform sampling
filename : str
path to target filename that should be used for writing
samplerate : float
sampling rate in Hz
channel : str or list/array(str)
dimord : list(str)
ordered list of dimension labels
See also
--------
:func:`syncopy.definetrial`
"""
if dimord is None:
dimord = self._defaultDimord
# Call parent initializer
        # trialdefinition has to come from a ComputationalRoutine (CR)!
super().__init__(data=data,
filename=filename,
trialdefinition=trialdefinition,
samplerate=samplerate,
channel=channel,
dimord=dimord)
# A `h5py.Dataset` holding the average of `data`, or `None` if not computed yet.
self._avg = None
# A `h5py.Dataset` holding variance of `data`, or `None` if not computed yet.
self._var = None
# A `h5py.Dataset` holding covariance of `data`, or `None` if not computed yet.
self._cov = None
# set as instance attribute to allow modification
self._hdfFileDatasetProperties = ContinuousData._hdfFileDatasetProperties + ("avg", "var", "cov",)
@property
def avg(self):
return self._avg
@property
def var(self):
return self._var
@property
def cov(self):
return self._cov
@ContinuousData.trialdefinition.setter
def trialdefinition(self, trldef):
"""
Override trialdefinition setter, which is special for time-locked data:
all trials have to have the same length and relative timings.
So the trialdefinition has the same offsets everywhere, and it has the general
simple structure:
[[0, nSamples, offset],
[nSamples, 2 * nSamples, offset],
[2 * nSamples, 3 * nSamples, offset],
...]
"""
# we need parent setter for basic validation
ContinuousData.trialdefinition.fset(self, trldef)
# now check for additional conditions
        # FIXME: it is not clear whether time-locked data is expected
        # to have identical offsets across trials
# if not np.unique(trldef[:, 2]).size == 1:
# lgl = "equal offsets for timelocked data"
# act = "different offsets"
# raise SPYValueError(lgl, varname="trialdefinition", actual=act)
# diff-diff should give 0 -> same number of samples for each trial
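        # e.g. for rows [[0, 100, -20], [100, 200, -20], [200, 300, -20]] the
        # first differences are constant, hence the second differences are all zero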
if not np.all(np.diff(trldef, axis=0, n=2) == 0):
lgl = "all trials of same length for timelocked data"
act = "unequal sized trials defined"
raise SPYValueError(lgl, varname="trialdefinition", actual=act)
# TODO - overload `time` property, as there is only one by definition!
# implement plotting
    def singlepanelplot(self, shifted=True, **show_kwargs):
figax = sp_plotting.plot_AnalogData(self, shifted, **show_kwargs)
return figax
    def multipanelplot(self, **show_kwargs):
figax = mp_plotting.plot_AnalogData(self, **show_kwargs)
return figax