Source code for syncopy.statistics.timelockanalysis

# -*- coding: utf-8 -*-
#
# Syncopy timelock-analysis methods
#

import os
import numpy as np
import h5py
from copy import deepcopy

# Syncopy imports

import syncopy as spy
from syncopy.shared.parsers import data_parser
from syncopy.shared.kwarg_decorators import (
    unwrap_cfg,
    unwrap_select,
    detect_parallel_client,
)
from syncopy.shared.input_processors import (
    check_effective_parameters,
    check_passed_kwargs,
)
from syncopy.shared.tools import get_defaults, get_frontend_cfg
from syncopy.shared.errors import SPYValueError, SPYTypeError, SPYInfo, SPYWarning
from syncopy.shared.latency import get_analysis_window, create_trial_selection

# local imports
from syncopy.statistics.compRoutines import Covariance

__all__ = ["timelockanalysis"]


[docs]@unwrap_cfg @unwrap_select @detect_parallel_client def timelockanalysis( data, latency="maxperiod", covariance=False, ddof=None, trials="all", keeptrials=False, **kwargs ): """ Average, variance and covariance for :class:`~syncopy.AnalogData` objects across trials. If input ``data`` is not timelocked already, trial cutting and selections will be applied according to the ``latency`` setting. Parameters ---------- data : Syncopy :class:`~syncopy.AnalogData` object Syncopy :class:`~syncopy.AnalogData` object to be averaged across trials latency : array_like or {'maxperiod', 'minperiod', 'prestim', 'poststim'} Either set desired time window (`[begin, end]`) in seconds, 'maxperiod' (default) for the maximum period available or `'minperiod' for minimal time-window all trials share, or `'prestim'` (all t < 0) or `'poststim'` (all t > 0) FieldTrip note: this also sets `covarianceWindow` covariance : bool Set to ``True`` to also compute covariance over channels ddof : int or None Degrees of freedom for covariance estimation, defaults to ``N - 1`` trials : 'all' or sequence Trial selection for FieldTrip compatibility, alternatively use standard Syncopy ``select`` dictionary which also allows additional selections over channels. keeptrials : bool Set to ``False`` to get single trial covariances in ``out.cov`` Returns ------- out : :class:`~syncopy.TimeLockData` Time locked data object, with additional datasets: "avg", "var" and "cov" if ``convariance`` was set to ``True`` """ # -- check user input -- data_parser(data, varname="data", empty=False, dataclass="AnalogData") if ddof is not None: if not isinstance(ddof, int) or ddof < 0: lgl = "positive integer value" act = ddof raise SPYValueError(lgl, "ddof", act) if not isinstance(covariance, bool): raise SPYTypeError(covariance, varname="covariance", expected="bool") if not isinstance(keeptrials, bool): raise SPYTypeError(covariance, varname="keeptrials", expected="bool") # latency gets checked within selectdata(latency=...) # -- standard block to check and store provided kwargs/cfg -- defaults = get_defaults(timelockanalysis) lcls = locals() # check for ineffective additional kwargs check_passed_kwargs(lcls, defaults, frontend_name="timelockanalysis") # save frontend call in cfg new_cfg = get_frontend_cfg(defaults, lcls, kwargs) log_dict = { "latency": latency, "covariance": covariance, "ddof": ddof, "trials": trials, } # -- create outtput object -- # start empty, data gets added later tld = spy.TimeLockData(samplerate=data.samplerate) # -- propagate old cfg and attach this one -- tld.cfg.update(data.cfg) tld.cfg.update({"timelockanalysis": new_cfg}) # to restore later as we apply selection inside here select_backup = None if data.selection is None else deepcopy(data.selection.select) if data.selection is not None: if trials != "all" and data.selection.select.get("trials") is not None: lgl = "either `trials != 'all'` or selection" act = "trial keyword and trial selection" raise SPYValueError(lgl, "trials", act) # evaluate legacy `trials` keyword value as selection elif trials != "all": select = data.selection.select select["trials"] = trials data.selectdata(select, inplace=True) elif trials != "all": # error handling done here data.selectdata(trials=trials, inplace=True) # digest selections if data.selection is not None: # select trials either via selection of keyword: trl_def = data.selection.trialdefinition sinfo = data.selection.trialdefinition[:, :2] trials = data.selection.trials trl_ivl = data.selection.trialintervals # beginnings and ends of all (selected) trials in trigger-relative time trl_starts, trl_ends = trl_ivl[:, 0], trl_ivl[:, 1] else: trl_def = data.trialdefinition sinfo = data.sampleinfo trials = data.trials # beginnings and ends of all (selected) trials in trigger-relative time trl_starts, trl_ends = data.trialintervals[:, 0], data.trialintervals[:, 1] # --- apply `latency` (time window of analysis) --- if data.selection is not None: # update selection data.selectdata(data.selection.select, latency=latency, inplace=True) else: # create new selection data.selectdata(latency=latency, inplace=True) # stream copy cut/selected trials/time window into new dataset # by exploiting the in place selection dset = _dataset_from_trials(data, dset_name="data", filename=tld._gen_filename()) # no copy here tld.data = dset tld.trialdefinition = data.selection.trialdefinition # now calculate via standard statistics avg = spy.mean(tld, dim="trials", parallel=False) var = spy.var(tld, dim="trials", parallel=False) # attach data to TimeLockData tld._update_dataset("avg", avg.data) tld._update_dataset("var", var.data) # explicitly delete unneeded objects but keep the data avg._persistent_hdf5, var._persistent_hdf5 = True, True del avg, var # -- set up covariance CR -- if covariance: check_effective_parameters(Covariance, defaults, lcls, besides=["covariance", "trials", "latency"]) covCR = Covariance(ddof=ddof, statAxis=data.dimord.index("time")) # dimord is time x freq x channel x channel out = spy.CrossSpectralData(dimord=spy.CrossSpectralData._defaultDimord) covCR.initialize( data, out._stackingDim, keeptrials=keeptrials, ) # and compute covCR.compute(data, out, parallel=kwargs.get("parallel"), log_dict=log_dict) # attach computed cov as array tld._update_dataset("cov", out.data[:, 0, ...].squeeze()) # -- restore initial selection or wipe -- if select_backup: # this rewrites the cfg data.selectdata(select_backup, inplace=True) else: data.selection = None # erase local selection entry data.cfg.pop("selectdata") return tld
def _dataset_from_trials(spy_data, dset_name="new_data", filename=None): """ Helper to construct a new dataset from a TrialIndexer, respecting selections This function is only needed if a dataset is to be transferred between two different Syncopy data classes, for the same data class a standard ``new = old.selectdata(..., inplace=False)`` does the trick. """ stackDim = spy_data._stackingDim # get the TrialIndexer if spy_data.selection is None: trials = spy_data.trials else: trials = spy_data.selection.trials # shapes have to match except for stacking dim # which is guaranteed by the source trials Indexer stackingDimSize = sum([trl.shape[stackDim] for trl in trials]) # index of 1st trial idx0 = 0 if spy_data.selection is None else spy_data.selection.trial_ids[0] new_shape = list(trials[idx0].shape) # plug in stacking dimension new_shape[stackDim] = stackingDimSize if filename is None: # generates new name with same extension filename = spy_data._gen_filename() # create new hdf5 File and dataset with h5py.File(filename, mode="w") as h5f: new_ds = h5f.create_dataset(dset_name, shape=new_shape) # all-to-all indexer idx = [slice(None) for _ in range(len(new_shape))] # stacking dim chunk size counter stacking = 0 # now stream the trials into the new dataset for trl in trials: # length along stacking dimension trl_len = trl.shape[stackDim] # define the chunk and increment stacking dim indexer idx[stackDim] = slice(stacking, stacking + trl_len) stacking += trl_len # insert the trial new_ds[tuple(idx)] = trl # open again for reading and return dataset directly return h5py.File(filename, mode="r+")[dset_name]