# -*- coding: utf-8 -*-
#
# Syncopy's abstract base class for continuous data + regular children
#
"""Uniformly sampled (continuous data).
This module holds classes to represent data with a uniformly sampled time axis.
"""
# Builtin/3rd party package imports
import inspect
import numpy as np
from abc import ABC
from collections.abc import Iterator
# Local imports
from .base_data import BaseData, FauxTrial
from .methods.definetrial import definetrial
from .base_data import BaseData
from syncopy.shared.parsers import scalar_parser, array_parser
from syncopy.shared.errors import SPYValueError, SPYError
from syncopy.shared.tools import best_match
from syncopy.plotting import sp_plotting, mp_plotting
from syncopy.io.nwb import _analog_timelocked_to_nwbfile
from .util import TimeIndexer
from syncopy import __pynwb__
if __pynwb__: # pragma: no cover
from pynwb import NWBHDF5IO
__all__ = ["AnalogData", "SpectralData", "CrossSpectralData", "TimeLockData"]
[docs]class ContinuousData(BaseData, ABC):
"""Abstract class for uniformly sampled data
Notes
-----
This class cannot be instantiated. Use one of the children instead.
"""
_infoFileProperties = BaseData._infoFileProperties + (
"samplerate",
"channel",
)
_hdfFileDatasetProperties = BaseData._hdfFileDatasetProperties + ("data",)
# all continuous data types have a time axis
_selectionKeyWords = BaseData._selectionKeyWords + ("latency",)
@property
def data(self):
"""
HDF5 dataset property representing contiguous
data without trialdefinition.
Trials are concatenated along the time axis.
"""
if getattr(self._data, "id", None) is not None:
if self._data.id.valid == 0:
lgl = "open HDF5 file"
act = "backing HDF5 file {} has been closed"
raise SPYValueError(legal=lgl, actual=act.format(self.filename), varname="data")
return self._data
@data.setter
def data(self, inData):
self._set_dataset_property(inData, "data")
if inData is None:
return
@property
def is_time_locked(self):
# check for equal offsets
if not np.unique(self.trialdefinition[:, 2]).size == 1:
return False
# check for equal sample sizes of the trials
if not np.unique(np.diff(self.sampleinfo, axis=1)).size == 1:
return False
return True
def __str__(self):
# Get list of print-worthy attributes
ppattrs = [
attr
for attr in self.__dir__()
if not (attr.startswith("_") or attr in ["log", "trialdefinition"])
]
ppattrs = [
attr
for attr in ppattrs
if not (inspect.ismethod(getattr(self, attr)) or isinstance(getattr(self, attr), Iterator))
]
if self.__class__.__name__ == "CrossSpectralData":
ppattrs.remove("channel")
ppattrs.sort()
# Construct string for pretty-printing class attributes
dsep = " by "
hdstr = "Syncopy {clname:s} object with fields\n\n"
ppstr = hdstr.format(clname=self.__class__.__name__)
maxKeyLength = max([len(k) for k in ppattrs])
printString = "{0:>" + str(maxKeyLength + 5) + "} : {1:}\n"
for attr in ppattrs:
value = getattr(self, attr)
if hasattr(value, "shape") and attr == "data" and self.sampleinfo is not None:
tlen = np.unique(np.diff(self.sampleinfo))
if tlen.size == 1:
trlstr = "of length {} ".format(str(tlen[0]))
else:
trlstr = ""
dsize = np.prod(self.data.shape) * self.data.dtype.itemsize / 1024**2
dunit = "MB"
if dsize > 1000:
dsize /= 1024
dunit = "GB"
valueString = "{} trials {}defined on ".format(str(len(self.trials)), trlstr)
valueString += (
"["
+ " x ".join([str(numel) for numel in value.shape])
+ "] {dt:s} {tp:s} "
+ "of size {sz:3.2f} {szu:s}"
)
valueString = valueString.format(
dt=self.data.dtype.name,
tp=self.data.__class__.__name__,
sz=dsize,
szu=dunit,
)
elif hasattr(value, "shape"):
valueString = (
"[" + " x ".join([str(numel) for numel in value.shape]) + "] element " + str(type(value))
)
elif isinstance(value, list):
if attr == "dimord" and value is not None:
valueString = dsep.join(dim for dim in self.dimord)
else:
valueString = "{0} element list".format(len(value))
elif isinstance(value, dict):
msg = "dictionary with {nk:s}keys{ks:s}"
keylist = value.keys()
showkeys = len(keylist) < 7
valueString = msg.format(
nk=str(len(keylist)) + " " if not showkeys else "",
ks=" '" + "', '".join(key for key in keylist) + "'" if showkeys else "",
)
else:
valueString = str(value)
ppstr += printString.format(attr, valueString)
ppstr += "\nUse `.log` to see object history"
return ppstr
@property
def _shapes(self):
if self.sampleinfo is not None:
shp = [list(self.data.shape) for k in range(self.sampleinfo.shape[0])]
for k, sg in enumerate(self.sampleinfo):
shp[k][self._stackingDim] = sg[1] - sg[0]
return [tuple(sp) for sp in shp]
@property
def channel(self):
""":class:`numpy.ndarray` : list of recording channel names"""
# if data exists but no user-defined channel labels, create them on the fly
if self._channel is None and self._data is not None:
nChannel = self.data.shape[self.dimord.index("channel")]
# default labels
return np.array(["channel" + str(i + 1).zfill(len(str(nChannel))) for i in range(nChannel)])
return self._channel
@channel.setter
def channel(self, channel):
if channel is None:
self._channel = None
return
if self.data is None:
raise SPYValueError(
"Syncopy: Cannot assign `channels` without data. " + "Please assign data first"
)
array_parser(
channel,
varname="channel",
ntype="str",
dims=(self.data.shape[self.dimord.index("channel")],),
)
self._channel = np.array(channel)
@property
def samplerate(self):
"""float: sampling rate of uniformly sampled data in Hz"""
return self._samplerate
@samplerate.setter
def samplerate(self, sr):
if sr is None:
self._samplerate = None
return
scalar_parser(sr, varname="samplerate", lims=[np.finfo("float").eps, np.inf])
self._samplerate = float(sr)
# we need a new TimeIndexer
if self.trialdefinition is not None:
self._time = TimeIndexer(self.trialdefinition, self.samplerate, list(self._trial_ids))
@BaseData.trialdefinition.setter
def trialdefinition(self, trldef):
# all-to-all trialdefinition
if trldef is None:
self._trialdefinition = np.array([[0, self.data.shape[self.dimord.index("time")], 0]])
self._trial_ids = [0]
else:
scount = self.data.shape[self.dimord.index("time")]
array_parser(trldef, varname="trialdefinition", dims=2)
if trldef.shape[-1] < 3:
lgl = "trialdefinition with at least 3 columns: [start, stop, offset]"
act = f"got only {trldef.shape[-1]} columns"
raise SPYValueError(lgl, "trialdefinition", act)
array_parser(
trldef[:, :2],
varname="sampleinfo",
hasnan=False,
hasinf=False,
ntype="int_like",
lims=[0, scount],
)
self._trialdefinition = trldef.copy()
self._trial_ids = np.arange(self.sampleinfo.shape[0])
self._time = TimeIndexer(self.trialdefinition, self.samplerate, list(self._trial_ids))
@property
def time(self):
"""indexable iterable of the time arrays"""
if self.samplerate is not None and self.sampleinfo is not None:
return self._time
# Helper function that grabs a single trial
[docs] def _get_trial(self, trialno):
idx = [slice(None)] * len(self.dimord)
idx[self._stackingDim] = slice(int(self.sampleinfo[trialno, 0]), int(self.sampleinfo[trialno, 1]))
return self._data[tuple(idx)]
[docs] def _is_empty(self):
return super()._is_empty() or self.samplerate is None
# Helper function that spawns a `FauxTrial` object given actual trial information
[docs] def _preview_trial(self, trialno):
"""
Generate a `FauxTrial` instance of a trial
Parameters
----------
trialno : int
Number of trial the `FauxTrial` object is intended to mimic
Returns
-------
faux_trl : :class:`syncopy.datatype.base_data.FauxTrial`
An instance of :class:`syncopy.datatype.base_data.FauxTrial` mainly
intended to be used in `noCompute` runs of
:meth:`syncopy.shared.computational_routine.ComputationalRoutine.computeFunction`
to avoid loading actual trial-data into memory.
Notes
-----
If an active in-place selection is found, the generated `FauxTrial` object
respects it (e.g., if only 2 of 10 channels are selected in-place, `faux_trl`
reports to only contain 2 channels)
See also
--------
syncopy.datatype.base_data.FauxTrial : class definition and further details
syncopy.shared.computational_routine.ComputationalRoutine : Syncopy compute engine
"""
shp = list(self.data.shape)
idx = [slice(None)] * len(self.dimord)
stop = int(self.sampleinfo[trialno, 1])
start = int(self.sampleinfo[trialno, 0])
shp[self._stackingDim] = stop - start
idx[self._stackingDim] = slice(start, stop)
# process existing data selections
if self.selection is not None:
# time-selection is most delicate due to trial-offset
tsel = self.selection.time[trialno]
if isinstance(tsel, slice):
if tsel.start is not None:
tstart = tsel.start
else:
tstart = 0
if tsel.stop is not None:
tstop = tsel.stop
else:
tstop = stop - start
# account for trial offsets and compute slicing index + shape
start = start + tstart
stop = start + (tstop - tstart)
idx[self._stackingDim] = slice(start, stop)
shp[self._stackingDim] = stop - start
else:
idx[self._stackingDim] = [tp + start for tp in tsel]
shp[self._stackingDim] = len(tsel)
# process the rest
dims = list(self.dimord)
dims.pop(self._stackingDim)
for dim in dims:
sel = getattr(self.selection, dim)
if sel is not None:
dimIdx = self.dimord.index(dim)
idx[dimIdx] = sel
if isinstance(sel, slice):
begin, end, delta = sel.start, sel.stop, sel.step
if sel.start is None:
begin = 0
elif sel.start < 0:
begin = shp[dimIdx] + sel.start
if sel.stop is None:
end = shp[dimIdx]
elif sel.stop < 0:
end = shp[dimIdx] + sel.stop
if sel.step is None:
delta = 1
shp[dimIdx] = int(np.ceil((end - begin) / delta))
idx[dimIdx] = slice(begin, end, delta)
elif isinstance(sel, list):
shp[dimIdx] = len(sel)
else:
shp[dimIdx] = 1
return FauxTrial(shp, tuple(idx), self.data.dtype, self.dimord)
# Make instantiation persistent in all subclasses
[docs] def __init__(self, data=None, channel=None, samplerate=None, **kwargs):
self._channel = None
self._samplerate = None
self._data = None
self._time = None
self.samplerate = samplerate # use setter for error-checking
# Call initializer
super().__init__(data=data, **kwargs)
# catches channel propagation
# from concatenation of syncopy data objects
if self._channel is None:
self.channel = channel
# overwrites channels from concatenation if desired
elif channel is not None:
self.channel = channel
if self.data is not None:
# In case of manual data allocation (reading routine would leave a
# mark in `cfg`), fill in missing info
if self.sampleinfo is None:
# First, fill in dimensional info
definetrial(self, kwargs.get("trialdefinition"))
# plotting, only virtual in the abc
[docs] def singlepanelplot(self):
raise NotImplementedError
[docs] def multipanelplot(self):
raise NotImplementedError
[docs]class AnalogData(ContinuousData):
"""Multi-channel, uniformly-sampled, analog (real float) data
This class can be used for representing any analog signal data with a time
and a channel axis such as local field potentials, firing rates, eye
position etc.
The data is always stored as a two-dimensional array on disk. On disk, Trials are
concatenated along the time axis.
Data is only read from disk on demand, similar to HDF5 files.
"""
_infoFileProperties = ContinuousData._infoFileProperties
_defaultDimord = ["time", "channel"]
_stackingDimLabel = "time"
_selectionKeyWords = ContinuousData._selectionKeyWords + ("channel",)
# "Constructor"
[docs] def __init__(
self,
data=None,
filename=None,
trialdefinition=None,
samplerate=None,
channel=None,
dimord=None,
):
"""Initialize an :class:`AnalogData` object.
Parameters
----------
data : 2D :class:numpy.ndarray or HDF5 dataset
multi-channel time series data with uniform sampling
filename : str
path to target filename that should be used for writing
trialdefinition : :class:`EventData` object or Mx3 array
[start, stop, trigger_offset] sample indices for `M` trials
samplerate : float
sampling rate in Hz
channel : str or list/array(str)
dimord : list(str)
ordered list of dimension labels
1. `filename` + `data` : create hdf dataset incl. sampleinfo @filename
2. just `data` : try to attach data (error checking done by :meth:`AnalogData.data.setter`)
See also
--------
:func:`syncopy.definetrial`
"""
# FIXME: I think escalating `dimord` to `BaseData` should be sufficient so that
# the `if any(key...) loop in `BaseData.__init__()` takes care of assigning a default dimord
if dimord is None:
dimord = self._defaultDimord
# Call parent initializer
super().__init__(
data=data,
filename=filename,
trialdefinition=trialdefinition,
samplerate=samplerate,
channel=channel,
dimord=dimord,
)
# set as instance attribute to allow modification
self._hdfFileAttributeProperties = BaseData._hdfFileAttributeProperties + (
"samplerate",
"channel",
)
# implement plotting
[docs] def singlepanelplot(self, shifted=True, **show_kwargs):
figax = sp_plotting.plot_AnalogData(self, shifted, **show_kwargs)
return figax
[docs] def multipanelplot(self, **show_kwargs):
figax = mp_plotting.plot_AnalogData(self, **show_kwargs)
return figax
[docs] def save_nwb(self, outpath, nwbfile=None, with_trialdefinition=True, is_raw=True):
"""Save AnalogData in Neurodata Without Borders (NWB) file format.
An NWBFile represents a single session of an experiment.
Parameters
----------
outpath : str, path-like. Where to save the NWB file, including file name and `.nwb` extension.
All directories in the path must exist. Example: `'mydata.nwb'`.
nwbfile : :class:`~pynwb.file.NWBFile` instance
Set to an existing instance to add an LFP signal with `is_raw=False`
with_trialdefinition : Boolean, whether to save the trial definition in the NWB file.
is_raw : Boolean, whether this is raw data (that should never change), as opposed to LFP data that
typically originates from some preprocessing, e.g., down-sampling and detrending. Determines where
data is stored in the NWB container, to make it easier for other software to interprete what
the data represents. If `is_raw` is `True`, the ``ElectricalSeries`` is stored directly in an
acquisition of the :class:`pynwb.NWBFile`. If False, it is stored inside an `LFP` instance in
a processing group called `ecephys`.
Returns
-------
nwbfile : :class:`~pynwb.file.NWBFile` instance
Can be used to further add meta-information or even data via the pynwb API.
To save use the :class:`pynwb.NWBHDF5IO` interface.
Notes
-----
Due to the very general architecture of the NWB format, many fields need to be interpreted
by software reading the format. Thus,
providing a generic function to save Syncopy data in NWB format is possible only if you know who will read it.
Depending on your target software, you may need to manually format the data using pynwb before writing
it to disk, or manually open it using pynwb before using it with the target software.
In place selections are ignored, the full dataset is exported. Create a new Syncopy data object from a selection
before calling this function if you want to export a subset only.
The Syncopy NWB reader only supports the NWB raw data format.
This function requires the optional 'pynwb' dependency to be installed.
"""
if not __pynwb__:
raise SPYError("NWB support is not available. Please install the 'pynwb' package.")
nwbfile = _analog_timelocked_to_nwbfile(
self,
nwbfile=nwbfile,
with_trialdefinition=with_trialdefinition,
is_raw=is_raw,
)
# Write the file to disk.
with NWBHDF5IO(outpath, "w") as io:
io.write(nwbfile)
return nwbfile
[docs]class SpectralData(ContinuousData):
"""
Multi-channel, real or complex spectral data
This class can be used for representing any data with a frequency, channel,
and optionally a time axis. The datatype can be complex or float.
"""
_infoFileProperties = ContinuousData._infoFileProperties + (
"taper",
"freq",
)
_hdfFileAttributeProperties = BaseData._hdfFileAttributeProperties + (
"samplerate",
"channel",
"freq",
)
_defaultDimord = ["time", "taper", "freq", "channel"]
_stackingDimLabel = "time"
_selectionKeyWords = ContinuousData._selectionKeyWords + (
"channel",
"frequency",
"taper",
)
@property
def taper(self):
""":class:`numpy.ndarray` : list of window functions used"""
if self._taper is None and self._data is not None:
nTaper = self.data.shape[self.dimord.index("taper")]
return np.array(["taper" + str(i + 1).zfill(len(str(nTaper))) for i in range(nTaper)])
return self._taper
@taper.setter
def taper(self, tpr):
if tpr is None:
self._taper = None
return
if self.data is None:
print("Syncopy core - taper: Cannot assign `taper` without data. " + "Please assing data first")
try:
array_parser(
tpr,
dims=(self.data.shape[self.dimord.index("taper")],),
varname="taper",
ntype="str",
)
except Exception as exc:
raise exc
self._taper = np.array(tpr)
@property
def freq(self):
""":class:`numpy.ndarray`: frequency axis in Hz"""
# if data exists but no user-defined frequency axis, create one on the fly
if self._freq is None and self._data is not None:
return np.arange(self.data.shape[self.dimord.index("freq")])
return self._freq
@freq.setter
def freq(self, freq):
if freq is None:
self._freq = None
return
if self.data is None:
print("Syncopy core - freq: Cannot assign `freq` without data. " + "Please assing data first")
return
array_parser(
freq,
varname="freq",
hasnan=False,
hasinf=False,
dims=(self.data.shape[self.dimord.index("freq")],),
)
self._freq = np.array(freq)
# Helper function that extracts frequency-related indices
def _get_freq(self, foi=None, foilim=None):
"""
`foi` is legacy, we use `foilim` for frequency selection
Error checking is performed by `Selector` class
"""
if foilim is not None:
_, selFreq = best_match(self.freq, foilim, span=True)
selFreq = selFreq.tolist()
if len(selFreq) > 1:
selFreq = slice(selFreq[0], selFreq[-1] + 1, 1)
elif foi is not None:
_, selFreq = best_match(self.freq, foi)
selFreq = selFreq.tolist()
if len(selFreq) > 1:
freqSteps = np.diff(selFreq)
if freqSteps.min() == freqSteps.max() == 1:
selFreq = slice(selFreq[0], selFreq[-1] + 1, 1)
else:
selFreq = slice(None)
return selFreq
# "Constructor"
[docs] def __init__(
self,
data=None,
filename=None,
trialdefinition=None,
samplerate=None,
channel=None,
taper=None,
freq=None,
dimord=None,
):
self._taper = None
self._freq = None
# FIXME: See similar comment above in `AnalogData.__init__()`
if dimord is None:
dimord = self._defaultDimord
# Call parent initializer
super().__init__(
data=data,
filename=filename,
trialdefinition=trialdefinition,
samplerate=samplerate,
channel=channel,
dimord=dimord,
)
# If __init__ attached data, be careful
if self.data is not None:
# In case of manual data allocation (reading routine would leave a
# mark in `cfg`), fill in missing info
if len(self.cfg) == 0:
# concat operations will set this!
if self.freq is None:
self.freq = freq
if self.taper is None:
self.taper = taper
# Dummy assignment: if we have no data but freq/taper labels,
# assign bogus to trigger setter warnings
else:
self.freq = freq
self.taper = taper
# implement plotting
[docs] def singlepanelplot(self, logscale=True, **show_kwargs):
figax = sp_plotting.plot_SpectralData(self, logscale, **show_kwargs)
return figax
[docs] def multipanelplot(self, **show_kwargs):
figax = mp_plotting.plot_SpectralData(self, **show_kwargs)
return figax
[docs]class CrossSpectralData(ContinuousData):
"""
Multi-channel real or complex spectral connectivity data
This class can be used for representing channel-channel interactions involving
frequency and optionally time or lag. The datatype can be complex or float.
"""
# Adapt `infoFileProperties` and `hdfFileAttributeProperties` from `ContinuousData`
_infoFileProperties = BaseData._infoFileProperties + (
"samplerate",
"channel_i",
"channel_j",
"freq",
)
_hdfFileAttributeProperties = BaseData._hdfFileAttributeProperties + (
"samplerate",
"channel_i",
"channel_j",
"freq",
)
_defaultDimord = ["time", "freq", "channel_i", "channel_j"]
_stackingDimLabel = "time"
_selectionKeyWords = ContinuousData._selectionKeyWords + (
"channel_i",
"channel_j",
"frequency",
)
_channel_i = None
_channel_j = None
_samplerate = None
_data = None
# Steal frequency-related stuff from `SpectralData`
_get_freq = SpectralData._get_freq
freq = SpectralData.freq
# override channel property to avoid accidental access
@property
def channel(self):
return "see channel_i and channel_j"
@channel.setter
def channel(self, channel):
if channel is None:
pass
else:
msg = f"CrossSpectralData has no 'channel' to set but dimord: {self._dimord}"
raise NotImplementedError(msg)
@property
def channel_i(self):
""":class:`numpy.ndarray` : list of recording channel names"""
# if data exists but no user-defined channel labels, create them on the fly
if self._channel_i is None and self._data is not None:
nChannel = self.data.shape[self.dimord.index("channel_i")]
return np.array(["channel" + str(i + 1).zfill(len(str(nChannel))) for i in range(nChannel)])
return self._channel_i
@channel_i.setter
def channel_i(self, channel_i):
""":class:`numpy.ndarray` : list of channel labels"""
if channel_i is None:
self._channel_i = None
return
if self.data is None:
raise SPYValueError(
"Syncopy: Cannot assign `channels` without data. " + "Please assign data first"
)
try:
array_parser(
channel_i,
varname="channel_i",
ntype="str",
dims=(self.data.shape[self.dimord.index("channel_i")],),
)
except Exception as exc:
raise exc
self._channel_i = np.array(channel_i)
@property
def channel_j(self):
""":class:`numpy.ndarray` : list of recording channel names"""
# if data exists but no user-defined channel labels, create them on the fly
if self._channel_j is None and self._data is not None:
nChannel = self.data.shape[self.dimord.index("channel_j")]
return np.array(["channel" + str(i + 1).zfill(len(str(nChannel))) for i in range(nChannel)])
return self._channel_j
@channel_j.setter
def channel_j(self, channel_j):
""":class:`numpy.ndarray` : list of channel labels"""
if channel_j is None:
self._channel_j = None
return
if self.data is None:
raise SPYValueError(
"Syncopy: Cannot assign `channels` without data. " + "Please assign data first"
)
try:
array_parser(
channel_j,
varname="channel_j",
ntype="str",
dims=(self.data.shape[self.dimord.index("channel_j")],),
)
except Exception as exc:
raise exc
self._channel_j = np.array(channel_j)
[docs] def __init__(
self,
data=None,
filename=None,
channel_i=None,
channel_j=None,
samplerate=None,
freq=None,
dimord=None,
):
self._freq = None
# Set dimensional labels
self.dimord = dimord
# Call parent initializer
super().__init__(data=data, filename=filename, samplerate=samplerate, dimord=dimord)
if freq is not None:
# set frequencies
self.freq = freq
[docs] def singlepanelplot(self, **show_kwargs):
return sp_plotting.plot_CrossSpectralData(self, **show_kwargs)
[docs]class TimeLockData(ContinuousData):
"""
Multi-channel, uniformly-sampled, time-locked data.
"""
_infoFileProperties = ContinuousData._infoFileProperties
_defaultDimord = ["time", "channel"]
_selectionKeyWords = ContinuousData._selectionKeyWords + ("channel",)
_stackingDimLabel = "time"
# "Constructor"
[docs] def __init__(
self,
data=None,
filename=None,
trialdefinition=None,
samplerate=None,
channel=None,
dimord=None,
):
"""
Initialize an :class:`TimeLockData` object.
Parameters
----------
data : 2D :class:numpy.ndarray or HDF5 dataset
multi-channel time series data with uniform sampling
filename : str
path to target filename that should be used for writing
samplerate : float
sampling rate in Hz
channel : str or list/array(str)
dimord : list(str)
ordered list of dimension labels
See also
--------
:func:`syncopy.definetrial`
"""
if dimord is None:
dimord = self._defaultDimord
# Call parent initializer
# trialdefinition has to come from a CR!
super().__init__(
data=data,
filename=filename,
trialdefinition=trialdefinition,
samplerate=samplerate,
channel=channel,
dimord=dimord,
)
# A `h5py.Dataset` holding the average of `data`, or `None` if not computed yet.
self._avg = None
# A `h5py.Dataset` holding variance of `data`, or `None` if not computed yet.
self._var = None
# A `h5py.Dataset` holding covariance of `data`, or `None` if not computed yet.
self._cov = None
# set as instance attribute to allow modification
self._hdfFileDatasetProperties = ContinuousData._hdfFileDatasetProperties + (
"avg",
"var",
"cov",
)
@property
def avg(self):
return self._avg
@property
def var(self):
return self._var
@property
def cov(self):
return self._cov
@ContinuousData.trialdefinition.setter
def trialdefinition(self, trldef):
"""
Override trialdefinition setter, which is special for time-locked data:
all trials have to have the same length and relative timings.
So the trialdefinition has the same offsets everywhere, and it has the general
simple structure:
[[0, nSamples, offset],
[nSamples, 2 * nSamples, offset],
[2 * nSamples, 3 * nSamples, offset],
...]
"""
# we need parent setter for basic validation
ContinuousData.trialdefinition.fset(self, trldef)
# now check for additional conditions
if not self.is_time_locked:
lgl = "trialdefinition with equally sized trials and common offsets"
act = "not timelock compatible trialdefinition"
raise SPYValueError(lgl, "trialdefinition", act)
# TODO - overload `time` property, as there is only one by definition!
# implement plotting
[docs] def singlepanelplot(self, shifted=True, **show_kwargs):
figax = sp_plotting.plot_AnalogData(self, shifted, **show_kwargs)
return figax
[docs] def multipanelplot(self, **show_kwargs):
figax = mp_plotting.plot_AnalogData(self, **show_kwargs)
return figax
[docs] def save_nwb(self, outpath, with_trialdefinition=True, is_raw=True):
"""Save TimeLockData in Neurodata Without Borders (NWB) file format.
An NWBFile represents a single session of an experiment.
Parameters
----------
outpath : str, path-like. Where to save the NWB file, including file name and `.nwb` extension. All directories in the path must exist. Example: `'mydata.nwb'`.
with_trialdefinition : Boolean, whether to save the trial definition in the NWB file.
is_raw : Boolean, whether this is raw data (that should never change), as opposed to LFP data that originates from some processing, e.g., down-sampling and
detrending. Determines where data is stored in the NWB container, to make it easier for other software to interprete what the data represents. If `is_raw` is `True`,
the `ElectricalSeries` is stored directly in an acquisition of the :class:`pynwb.NWBFile`. If False, it is stored inside an `LFP` instance in a processing group called `ecephys`.
Note that for the Syncopy NWB reader, the data should be stored as raw, so this is currently the default.
Returns
-------
None, called for side effect of writing the NWB file to disk.
Notes
-----
Due to the very general architecture of the NWB format, many fields need to be interpreted by software reading the format. Thus,
providing a generic function to save Syncopy data in NWB format is possible only if you know who will read it.
Depending on your target software, you may need to manually format the data using pynwb before writing it to disk, or manually
open it using pynwb before using it with the target software.
Selections are ignored, the full data is exported. Create a new Syncopy data object before calling this function if you want to export a subset only.
This function requires the optional 'pynwb' dependency to be installed.
"""
if not __pynwb__:
raise SPYError("NWB support is not available. Please install the 'pynwb' package.")
nwbfile = _analog_timelocked_to_nwbfile(
self, nwbfile=None, with_trialdefinition=with_trialdefinition, is_raw=is_raw
)
# Write the file to disk.
with NWBHDF5IO(outpath, "w") as io:
io.write(nwbfile)