Source code for syncopy.connectivity.connectivity_analysis

# -*- coding: utf-8 -*-
#
# Syncopy connectivity analysis frontend
#

# Builtin/3rd party package imports
import numpy as np
import h5py

# Syncopy imports
import syncopy as spy
from syncopy.connectivity.AV_compRoutines import (
    NormalizeCrossSpectra,
    NormalizeCrossCov,
    GrangerCausality,
)
from syncopy.connectivity.ST_compRoutines import (
    CrossSpectra,
    CrossCovariance,
    SpectralDyadicProduct,
    PPC_column,
)
from syncopy.shared.input_processors import (
    process_taper,
    process_foi,
    process_padding,
    check_effective_parameters,
    check_passed_kwargs,
)
from syncopy.shared.kwarg_decorators import (
    unwrap_cfg,
    unwrap_select,
    detect_parallel_client,
)
from syncopy.shared.errors import SPYValueError, SPYWarning, SPYInfo, SPYTypeError

from syncopy.datatype import CrossSpectralData, AnalogData, SpectralData
from syncopy.shared.tools import get_defaults, best_match, get_frontend_cfg
from syncopy.shared.parsers import data_parser, scalar_parser, sequence_parser
from syncopy.shared.computational_routine import propagate_properties
from syncopy.statistics import jackknifing as jk
from syncopy.statistics import summary_stats as st

availableMethods = ("coh", "corr", "granger", "csd", "ppc")
connectivity_outputs = ("abs", "pow", "complex", "fourier", "angle", "real", "imag")


[docs]@unwrap_cfg @unwrap_select @detect_parallel_client def connectivityanalysis( data, method="coh", keeptrials=False, output="abs", foi=None, foilim=None, pad="maxperlen", channelcmb=None, polyremoval=0, tapsmofrq=None, nTaper=None, taper="hann", taper_opt=None, jackknife=False, **kwargs, ): """ Perform connectivity analysis of Syncopy :class:`~syncopy.SpectralData` OR directly :class:`~syncopy.AnalogData` objects In case the input is an :class:`~syncopy.AnalogData` object, a (multi-)tapered Fourier analysis is performed implicitly to arrive at the cross spectral densities needed for coherence, ppc and Granger causality estimates. Relevant parameters are the same as for :func:`~syncopy.freqanalysis` with ``method='mtmfft'``: ('foi', 'foilim', 'pad', 'tapsmofrq', 'nTaper', 'taper', 'taper_opt', 'polyremoval') If the input is already in the spectral domain, so ``data`` is of class :class:`~syncopy.SpectralData`, no additional modification of the spectra is performed, and all parameters above to control the spectral estimation have no effect. **Usage Summary** List of available analysis methods and respective distinct options: "coh" : (Multi-) tapered coherency estimate Compute the normalized cross spectral densities between channel combinations * **output** : one of ('abs', 'pow', 'complex', 'angle', 'imag' or 'real') * **channelcmb**: [senders, receivers], two sequences encoding channel names/indices * **jackknife**: set to `True` to compute the variance via jackknife resampling **Spectral analysis** (input is :class:`~syncopy.AnalogData`): * **taper** : one of :data:`~syncopy.shared.const_def.availableTapers` * **tapsmofrq** : spectral smoothing box for slepian tapers (in Hz) * **nTaper** : (optional) number of orthogonal tapers for slepian tapers * **pad**: either pad to an absolute length in seconds or set to `'nextpow2'` "csd" : ('Multi-) tapered cross spectra Computes the cross spectral estimates between channel combinations output is fixed: complex cross spectra * **channelcmb**: [senders, receivers], two sequences encoding channel names/indices * **keeptrials**: set to False to get single-trial cross-spectra **Spectral analysis** (input is :class:`~syncopy.AnalogData`): * **taper** : one of :data:`~syncopy.shared.const_def.availableTapers` * **tapsmofrq** : spectral smoothing box for slepian tapers (in Hz) * **nTaper** : (optional) number of orthogonal tapers for slepian tapers * **pad**: either pad to an absolute length in seconds or set to `'nextpow2'` "ppc" : Pairwise phase consistency, see [Vinck2010]_ Computes the PPC phase locking index for channel combinations. NOTE: Computations for all channel pairs can be slow, it is recommended to use the `channelcmb` parameter to compute only desired channel pairs, as this can lead to a significant speed up. output is fixed : real ppc spectrum * **channelcmb**: [senders, receivers], two sequences encoding channel names/indices **Spectral analysis** (input is :class:`~syncopy.AnalogData`): * **taper** : one of :data:`~syncopy.shared.const_def.availableTapers` * **tapsmofrq** : spectral smoothing box for slepian tapers (in Hz) * **nTaper** : (optional) number of orthogonal tapers for slepian tapers * **pad**: either pad to an absolute length in seconds or set to `'nextpow2'` "granger" : Spectral Granger-Geweke causality following [Dhamala2008]_ Computes linear causality estimates between channel combinations. WARNING: When inputting :class:`~syncopy.SpectralData` directly, it is very important that the previous `spy.freqanalysis` was done without foi/foilim specification as Granger causality needs all attainable frequencies (0, f_Nyquist)! It is possible to compute many channel pairs at once (default behavior), but this can be numerically unstable. The recommended way is to use the `channelcmb` parameter, which triggers pair-wise computation. * **channelcmb**: [senders, receivers], two sequences encoding channel names/indices * **jackknife**: set to `True` to compute the variance via jackknife resampling **Spectral analysis** (input is :class:`~syncopy.AnalogData`): * **taper** : one of :data:`~syncopy.shared.const_def.availableTapers` * **tapsmofrq** : spectral smoothing box for slepian tapers (in Hz) * **nTaper** : (optional, not recommended) number of slepian tapers * **pad**: either pad to an absolute length in seconds or set to `'nextpow2'` After the computation, information about the convergence and potential regularization of the cross-spectral densities can be obtained by inspecting the ``.info`` property of the resulting :class:~`syncopy.CrossSpectralData` object. Keys of that info-dict are: ```{'converged', 'max rel. err', 'reg. factor', 'initial cond. num'}```. "corr" : Cross-correlations Computes the one sided (positive lags) cross-correlations between all channel combinations of :class:`~syncopy.AnalogData`. The maximal lag is half the trial lengths. * **keeptrials** : set to `True` for single trial cross-correlations Parameters ---------- data : `~syncopy.AnalogData` A non-empty Syncopy :class:`~syncopy.SpectralData` or :class:`~syncopy.AnalogData` object method : str Connectivity estimation method, one of ``'coh'`, 'corr', 'granger', 'csd', 'ppc'`` output : str Relevant for coherence estimation (``method='coh'``) Use ``'pow'`` for absolute squared coherence, ``'abs'`` for absolute value of coherence , ``'complex'`` for the complex valued coherency or ``'angle'``, ``'imag'`` or ``'real'`` to extract the phase difference, imaginary or real part of the coherency respectively. keeptrials : bool Relevant only for cross-correlations (``method='corr'``) and cross spectra (``method='csd'``). If `True` single-trial cross-correlations/cross-spectra are returned. foi : array-like or None Frequencies of interest (Hz) for output. If desired frequencies cannot be matched exactly, the closest possible frequencies are used. If ``foi`` is ``None`` or ``foi = "all"``, all attainable frequencies (i.e., zero to Nyquist / 2) are selected. foilim : array-like [fmin, fmax] or None or "all" Frequency-window ``[fmin, fmax]`` (in Hz) of interest. The `foi` array will be constructed in 1Hz steps from `fmin` to `fmax` (inclusive). pad : 'maxperlen', float or 'nextpow2' - For the default ``'maxperlen'``, no padding is performed in case of equal length trials, while trials of varying lengths are padded to match the longest trial. If ``pad`` is a number all trials are padded so that ``pad`` indicates the absolute length of all trials after padding (in seconds). For instance ``pad = 2`` pads all trials to an absolute length of 2000 samples, if and only if the longest trial contains at maximum 2000 samples and the samplerate is 1kHz. If ``pad`` is ``'nextpow2'`` all trials are padded to the nearest power of two (in samples) of the longest trial. channelcmb : [senders, receivers], list of array like, optional Two sequences ``senders`` and ``receivers`` encoding channel names or indices. such that connectivity measure gets computed only for those (senders x receivers) channel combinations. Only supported for spectral measures ``'coh', 'csd', 'ppc', 'granger'`` and requires :class:`~syncopy.SpectralData` as input data type. tapsmofrq : float or None Only valid if ``method`` is ``'coh'``, ``'csd'`` or ``'granger'`` and input data is a :class:`~syncopy.AnalogData`. Enables multi-tapering and sets the amount of spectral smoothing with slepian tapers in Hz. nTaper : int or None Only valid if ``method`` is ``'coh'`` or ``'granger'`` and ``tapsmofrq`` is set. Number of orthogonal tapers to use for multi-tapering. It is not recommended to set the number of tapers manually! Leave at ``None`` for the optimal number to be set automatically. taper : str or None, optional Only valid if ``method`` is ``'coh'``, ``'csd'`` or ``'granger'`` and input data is a :class:`~syncopy.AnalogData`. Windowing function, one of :data:`~syncopy.specest.const_def.availableTapers` For multi-tapering with slepian tapers use `tapsmofrq` directly. taper_opt : dict or None Only valid if ``method`` is ``'coh'``, ``'csd'`` or ``'granger'`` and input data is a :class:`~syncopy.AnalogData`. Dictionary with keys for additional taper parameters. For example :func:`~scipy.signal.windows.kaiser` has the additional parameter 'beta'. For multi-tapering set ``tapsmofrq`` directly. jackknife: bool, optional Set to `True` to compute the variance via jackknife resampling, only available (and meaningful) for methods `coh` and `granger` Returns ------- out : `~syncopy.CrossSpectralData` The analyis result with dims ['time', 'freq', 'channel_i', channel_j']. The `out` may contain additional metadata, based on the `method` used to compute it: * For `method='granger'`, the `out.info` property contains the keys listed and explained in :data:`~syncopy.connectivity.AV_compRoutines.GrangerCausality.metadata_keys`. * if `jackknife=True`, `out.jack_var` and `out.jack_bias` contain the jackknife variance and bias Examples -------- In the following `adata` is an instance of :class:`~syncopy.AnalogData` Calculate the coherence between all channels with 2Hz spectral smoothing, and plot the results for two combinations between 30Hz and 90Hz: >>> coh = spy.connectivityanalysis(adata, method='coh', tapsmofrq=2) >>> coh.singlepanelplot(channel_i=0, channel_j=1, frequency=[30,90]) >>> coh.singlepanelplot(channel_i=1, channel_j=2, frequency=[30,90]) Compute the cross-correlation between channel 8 and 12 and plot the results for the first 200ms: >>> cfg = spy.StructDict() >>> cfg.method = 'corr' >>> cfg.select = {'channel': ['channel8', 'channel12']} >>> corr = spy.connectivityanalysis(adata, cfg) >>> corr.singlepanelplot(channel_i='channel8', channel_j='channel12', latency=[0, 0.2]) Estimate Granger causality only between channel pairs (0->3), (0->4), (2->3) and (2->4) First compute a :class:`~syncopy.SpectralData` with complex dtype and no frequency limitations (so no foi/foilim setting): >>> spec = spy.freqanalysis(adata, output='complex') Then the connectivity analysis: >>> cfg = spy.StructDict() >>> cfg.method = 'granger' >>> cfg.channelcmb = [[0, 2], [3, 4]] # [senders, receivers] >>> granger = spy.connectivityanalysis(spec, cfg) Plot the (2->4) results between 15Hz and 60Hz: >>> granger.singlepanelplot(channel_i='channel3', channel_j='channel5', frequency=[15, 60]) Notes ----- .. [Dhamala2008] Dhamala, Mukeshwar, Govindan Rangarajan, and Mingzhou Ding. "Analyzing information flow in brain networks with nonparametric Granger causality." Neuroimage 41.2 (2008): 354-362. .. [Vinck2010] Vinck, Martin, et al. "The pairwise phase consistency: a bias-free measure of rhythmic neuronal synchronization." Neuroimage 51.1 (2010): 112-122. """ # Make sure our one mandatory input object can be processed try: data_parser(data, varname="data", writable=None, empty=False) except Exception as exc: raise exc if not isinstance(data, (AnalogData, SpectralData)): lgl = "either AnalogData or SpectralData as input" act = f"{data.__class__.__name__}" raise SPYValueError(lgl, "data", act) timeAxis = data.dimord.index("time") # Get everything of interest in local namespace defaults = get_defaults(connectivityanalysis) lcls = locals() # check for ineffective additional kwargs check_passed_kwargs(lcls, defaults, frontend_name="connectivity") # Ensure a valid computational method was selected if method not in availableMethods: lgl = "'" + "or '".join(opt + "' " for opt in availableMethods) raise SPYValueError(legal=lgl, varname="method", actual=method) if not isinstance(jackknife, bool): raise SPYTypeError(jackknife, "jackknife", "boolean") if jackknife and method not in ["coh", "granger"]: # makes only sense for these methods spy.log( f"Jackknife is not available for method {method}", level="WARNING", caller="connectivityanalysis", ) jackknife = False # output settings are only relevant for coherence if method != "coh" and output != defaults["output"]: msg = f"Setting `output` for method {method} has no effect!" SPYWarning(msg) # if a subset selection is present # get sampleinfo and check for equidistancy if data.selection is not None: sinfo = data.selection.trialdefinition[:, :2] # user picked discrete set of time points else: sinfo = data.sampleinfo lenTrials = np.diff(sinfo).squeeze() nTrials = len(sinfo) # validate channel combinations parameter if channelcmb is not None: if not isinstance(data, SpectralData): expected = "SpectralData, `channelcmb` not supported for other data types, " raise SPYTypeError(data, "data", expected=expected) if not isinstance(channelcmb, list): raise SPYTypeError(channelcmb, "channelcmb", expected="list") if len(channelcmb) != 2: lgl = "list with exactly two elements: [senders, receivers]" raise SPYValueError(legal=lgl, varname="channelcmb", actual=f"length of {len(channelcmb)}") # can't have channel selection AND channelcmb parameter set at the same time if data.selection is not None and data.selection.channel != slice(None, None, 1): raise SPYValueError("either channel selection or use channelcmb", "select/channelcmb", "both") senders, receivers = channelcmb # make sure we have an iterable of consistent type sequence_parser(senders, varname="channelcmb[senders,") if not isinstance(senders[0], (str, int)): raise SPYTypeError(senders[0], "channelcmb[senders,", "either `int` or `str`") # fix type, either channel name (str) or index (int) cmb_type = type(senders[0]) chan_avail = data.channel if cmb_type == str else range(len(data.channel)) # repeat now with type check sequence_parser(senders, varname="channelcmb[senders,", content_type=cmb_type) # check also receivers sequence_parser(receivers, varname="channelcmb[,receivers]", content_type=cmb_type) # check that channels are available for chan in senders: if chan not in chan_avail: lgl = "names or indices of existing channels" act = chan raise SPYValueError(lgl, "channelcmb", act) for chan in receivers: if chan not in chan_avail: lgl = "names or indices of existing channels" act = chan raise SPYValueError(lgl, "channelcmb", act) # check padding if method == "corr" and pad != "maxperlen": lgl = "'maxperlen', no padding needed/allowed for cross-correlations" actual = f"{pad}" raise SPYValueError(legal=lgl, varname="pad", actual=actual) # check polyremoval if polyremoval is not None: scalar_parser(polyremoval, varname="polyremoval", ntype="int_like", lims=[0, 1]) # Prepare keyword dict for logging (use `lcls` to get actually provided # keyword values, not defaults set above) log_dict = { "method": method, "keeptrials": keeptrials, "polyremoval": polyremoval, "pad": pad, "channelcmb": channelcmb } new_cfg = get_frontend_cfg(defaults, lcls, kwargs) # --- method specific processing --- if method == "corr": if not isinstance(data, AnalogData): lgl = f"AnalogData instance as input for method {method}" actual = f"{data.__class__.__name__}" raise SPYValueError(lgl, "data", actual) if lcls["foi"] is not None: msg = "Parameter `foi` has no effect for method `corr`" SPYWarning(msg) check_effective_parameters(CrossCovariance, defaults, lcls, besides=("jackknife",)) # single trial cross-correlations if keeptrials: av_compRoutine = None # no trial average norm = True # normalize individual trials within the ST CR else: av_compRoutine = NormalizeCrossCov() norm = False # parallel computation over trials st_compRoutine = CrossCovariance( samplerate=data.samplerate, polyremoval=polyremoval, timeAxis=timeAxis, norm=norm, ) # hard coded as class attribute st_dimord = CrossCovariance.dimord # all these methods need the single trial cross spectra # we just have to sort out if we need an mtmfft first (for AnalogData input) elif method in ["csd", "coh", "ppc", "granger"]: if nTrials == 1: lgl = ( "multi-trial input data, spectral connectivity measures critically depend on trial averaging!" ) act = "only one trial" raise SPYValueError(lgl, "data", act) if keeptrials is not False and method in ("coh", "ppc", "granger"): lgl = f"False, trial averaging needed for method {method}!" act = keeptrials raise SPYValueError(lgl, varname="keeptrials", actual=act) # AnalogData - we have to setup implicit spectral analysis (mtmfft) if isinstance(data, AnalogData): # the actual number of samples in case of later padding nSamples = process_padding(pad, lenTrials, data.samplerate) check_effective_parameters(CrossSpectra, defaults, lcls, besides=("jackknife", "channelcmb")) st_compRoutine, st_dimord = cross_spectra( data, method, nSamples, foi, foilim, tapsmofrq, nTaper, taper, taper_opt, polyremoval, log_dict, timeAxis, ) # SpectralData input elif isinstance(data, SpectralData): # cross-spectra need complex input spectra if not np.issubdtype(data.data.dtype, np.complexfloating): lgl = "complex valued spectra, set `output='fourier'` in spy.freqanalysis!" act = "real valued spectral data" raise SPYValueError(lgl, "data", act) if method == "granger": # check that for SpectralData input, we have empty time axes # no time-resolved Granger supported atm if isinstance(data, SpectralData): if data.data.shape[data.dimord.index("time")] != len(data.trials): raise NotImplementedError( "Time resolved Granger causality from tf-spectra not available atm" ) # by constraining to output='fourier', detrimental taper averaging # gets already catched by freqanalysis! check_effective_parameters( SpectralDyadicProduct, defaults, lcls, besides=("jackknife", "channelcmb"), ) # -- channelcmb, sanity checks for channelcmb done above! -- # truly rectangular matrix operations (len(senders) ~= len(receivers)) # only meaningful for PPC and single trial cross-spectra if channelcmb is not None and method in ['ppc', 'csd']: senders, receivers = channelcmb # save current selection if data.selection is not None: select_backup = data.selection.select else: select_backup = None # get the indices/slice of the selected senders channels # by using temporary inplace selections data.selectdata(channel=senders, inplace=True) send_idx = data.selection.channel send_N = len(data.channel[send_idx]) # get the indices/slice of the selected receiver channels data.selectdata(channel=receivers, inplace=True) rec_idx = data.selection.channel rec_N = len(data.channel[rec_idx]) # revert to initial selection if any if select_backup: data.selectdata(select_backup, inplace=True) else: data.selection = None st_compRoutine = SpectralDyadicProduct(send_idx=send_idx, send_N=send_N, rec_idx=rec_idx, rec_N=rec_N) else: # there are no free parameters here, # everything had to be setup during freqanalysis! st_compRoutine = SpectralDyadicProduct() st_dimord = SpectralDyadicProduct.dimord # --- Set up of computation of single trial cross quantities is complete --- if method == "coh": if output not in connectivity_outputs: lgl = f"one of {connectivity_outputs}" raise SPYValueError(lgl, varname="output", actual=output) log_dict["output"] = output # final normalization after trial averaging av_compRoutine = NormalizeCrossSpectra(output=output) elif method == "ppc": # besides = ['jackknife'] # spectral analysis only possible with AnalogData if isinstance(data, AnalogData): besides = ["taper", "tapsmofrq", "nTaper"] else: besides = ['channelcmb'] check_effective_parameters(PPC_column, defaults, lcls, besides=besides) # this needs to be treated differently, as we need repeated # inits of the PPC CR to compute all trial pairs av_compRoutine = "ppc" elif method == "granger": besides = ["jackknife"] # spectral analysis only possible with AnalogData if isinstance(data, AnalogData): besides += ["taper", "tapsmofrq", "nTaper"] else: besides += ["channelcmb"] check_effective_parameters(GrangerCausality, defaults, lcls, besides=besides) # after trial averaging # hardcoded numerical parameters av_compRoutine = GrangerCausality(rtol=5e-6, nIter=100, cond_max=1e4) # here the single trial spectra are the final result elif method == "csd": av_compRoutine = None # ------------------------------------------------- # Call the chosen single trial ComputationalRoutine # ------------------------------------------------- # the single trial results need a new DataSet st_out = CrossSpectralData(dimord=st_dimord) # we need single trials for the jackknife and the PPC keeptrials = True if (keeptrials or (jackknife or method == "ppc")) else False # Perform the trial-parallelized computation of the matrix quantity st_compRoutine.initialize( data, st_out._stackingDim, chan_per_worker=None, # no parallelisation over channels possible keeptrials=keeptrials, ) # True for jackknifing st_compRoutine.compute(data, st_out, parallel=kwargs.get("parallel"), log_dict=log_dict) if jackknife: jack_in = st_out # single trials for the replicates # the trial average for the direct estimate by the av_compRoutine st_out = spy.mean(st_out, dim="trials") # compute all the leave-one-out (loo) trial average replicates replicates_avg = jk.trial_avg_replicates(jack_in) # ------------------------ # evaluate av_compRoutine # ------------------------ # for single trial cross-corr/cross spectra results # keeptrials can be True and hence we are done here if av_compRoutine is None: st_out.cfg.update(data.cfg) st_out.cfg.update({"connectivityanalysis": new_cfg}) return st_out # -- PPC computation -- # set up nTrials(nTrials-1) pair computations # which need an outer loop over nTrials as a single CR # can only compute one column of all the nTrials x nTrials combinations elif av_compRoutine == "ppc": # we need to average all the CR results, shapes match accumulator = np.zeros(st_out.trials[0].shape, dtype=np.float32) nTrials = len(st_out.trials) # to create the trial selections trl_arr = np.arange(nTrials) # upper triangle weights for grand average weights = np.arange(1, nTrials) / (nTrials - 1) # any selection got already digested by the preceding st_compRoutine # so we can loop over all trials for the upper triangular (w/o diagonal) for trl_idx in range(1, nTrials): # hdf5 index tuple to access a 2nd trial # needs to be done before(!) any trial subselection trl2_idx = st_out._preview_trial(trl_idx).idx hdf5_path = st_out._filename # create selection for upper triangle trl_bi = trl_arr < trl_idx st_out.selectdata(trials=trl_arr[trl_bi], inplace=True) # set up CR ppc_CR = PPC_column(trl2_idx=trl2_idx, hdf5_path=hdf5_path) # inner result trl_pairs = CrossSpectralData(dimord=st_dimord) ppc_CR.initialize(st_out, trl_pairs._stackingDim, chan_per_worker=None, keeptrials=True) ppc_CR.compute(st_out, trl_pairs, parallel=kwargs.get("parallel"), log_dict=log_dict) # now average the nTrials-1 remaining pairs trl_pairs_avg = st.mean(trl_pairs, dim="trials") accumulator += trl_pairs_avg.trials[0] * weights[trl_idx - 1] # reset selection st_out.selection = None # normalize and create single trial PPC output object accumulator *= 2 / nTrials out = CrossSpectralData(dimord=st_dimord, data=accumulator) time_axis = np.any(np.diff(st_out.trialdefinition)[:, 0] != 1) propagate_properties(st_out, out, keeptrials=False, time_axis=time_axis) # add log from last PPC CR call out.log = trl_pairs._log # -- Coherence and Granger -- else: out = CrossSpectralData(dimord=st_dimord) # full quadratic CSD for coherence in any case if method == 'coh' or channelcmb is None: # now take the trial average from the single trial CR as input av_compRoutine.initialize(st_out, out._stackingDim, chan_per_worker=None) av_compRoutine.pre_check() # make sure we got a trial_average av_compRoutine.compute(st_out, out, parallel=kwargs.get("parallel"), log_dict=log_dict) # loop over the pairs and write in CR external hdf5 to # collect results of individual pair results elif channelcmb is not None and method == 'granger': senders, receivers = channelcmb # create new filename fname = spy.CrossSpectralData().filename with h5py.File(fname, "w") as h5file: shape = (1, len(st_out.freq), len(senders), len(receivers)) dset = h5file.create_dataset("data", dtype=np.float32, shape=shape) # compute granger in pairs for idx1, ch1 in enumerate(senders): for idx2, ch2 in enumerate(receivers): # 2-channel quadratic csd st_out.selectdata(channel_i=[ch1, ch2], channel_j=[ch1, ch2], inplace=True) # single pair result pair_out = spy.CrossSpectralData(dimord=st_dimord) av_compRoutine.initialize(st_out, pair_out._stackingDim, chan_per_worker=None) av_compRoutine.pre_check() # make sure we got a trial_average av_compRoutine.compute(st_out, pair_out, parallel=kwargs.get("parallel"), log_dict=log_dict) # write result, idx1/idx2 directly encode positions in result matrix with h5py.File(fname, "r+") as h5file: dset = h5file['data'] # only direction sender(ch1) -> receiver(ch2) dset[0, :, idx1, idx2] = pair_out.data[0, :, 0, 1] # finally attach result matrix to result object with h5py.File(fname, "r") as h5file: dset = h5file['data'] out.data = dset out._reopen() # ..and attach metadata out._log = pair_out._log out.samplerate = st_out.samplerate # either str or int if isinstance(senders[0], str): out.channel_i = senders out.channel_j = receivers else: out.channel_i = data.channel[senders] out.channel_j = data.channel[receivers] # trivial trialdefinition out.trialdefinition = np.array([[0, 1., 0]]) # `out` is the direct estimate if jackknife: jack_rep = CrossSpectralData(dimord=st_dimord) av_compRoutine.initialize(replicates_avg, jack_rep._stackingDim) # without `pre_check` we can compute the replicates for all loo averages (in parallel!) av_compRoutine.compute( replicates_avg, jack_rep, parallel=kwargs.get("parallel"), log_dict=log_dict, ) # now compute bias and variance bias, variance = jk.bias_var(out, jack_rep) bias._persistent_hdf5 = True variance._persistent_hdf5 = True # and attach to output object out._register_dataset("jack_var", inData=variance.data) out._register_dataset("jack_bias", inData=bias.data) out.jack_var = out._jack_var out.jack_bias = out._jack_bias # just post-select specific channel combinations for coherence if channelcmb is not None and method == 'coh': senders, receivers = channelcmb out = out.selectdata(channel_i=senders) out = out.selectdata(channel_j=receivers) # attach potential older cfg's from the input # to support chained frontend calls.. out.cfg.update(data.cfg) # attach frontend parameters for replay new_cfg.update({"output": output}) out.cfg.update({"connectivityanalysis": new_cfg}) return out
def cross_spectra( data, method, nSamples, foi, foilim, tapsmofrq, nTaper, taper, taper_opt, polyremoval, log_dict, timeAxis, ): """ Helper to set up the CR to compute the single trial cross-spectra from AnalogData """ # --- Basic foi sanitization --- foi, foilim = process_foi(foi, foilim, data.samplerate) # --- Setting up specific Methods --- if method == "granger": if foi is not None or foilim is not None: lgl = "no foi specification for Granger analysis" actual = "foi or foilim specification" raise SPYValueError(lgl, "foi/foilim", actual) nChannels = len(data.channel) nTrials = len(data.trials) # warn user if this ratio is not small if nChannels / nTrials > 0.1: msg = "Multi-channel Granger analysis can be numerically unstable, it is recommended to have at least 10 times the number of trials compared to the number of channels. Try calculating in sub-groups of fewer channels!" SPYWarning(msg) # --- set up computation of the single trial CSDs --- # Construct array of maximally attainable frequencies freqs = np.fft.rfftfreq(nSamples, 1 / data.samplerate) # Match desired frequencies as close as possible to # actually attainable freqs # these are the frequencies attached to the CrossSpectralData by the CR! if foi is not None: foi, _ = best_match(freqs, foi, squash_duplicates=True) elif foilim is not None: foi, _ = best_match(freqs, foilim, span=True, squash_duplicates=True) elif foi is None and foilim is None: # Construct array of maximally attainable frequencies msg = f"Setting frequencies of interest to {freqs[0]:.1f}-" f"{freqs[-1]:.1f}Hz" SPYInfo(msg) foi = freqs # sanitize taper selection and retrieve dpss settings if data.selection is None: sinfo = data.sampleinfo else: sinfo = data.selection.trialdefinition[:, :2] lenTrials = np.diff(sinfo).squeeze() taper, taper_opt = process_taper( taper, taper_opt, tapsmofrq, nTaper, keeptapers=False, # ST_CSD's always average tapers foimax=foi.max(), samplerate=data.samplerate, nSamples=lenTrials.mean(), output="pow", ) # ST_CSD's always have this unit/norm log_dict["foi"] = foi log_dict["taper"] = taper if taper_opt and taper == "dpss": log_dict["nTaper"] = taper_opt["Kmax"] log_dict["tapsmofrq"] = tapsmofrq elif taper_opt: log_dict["taper_opt"] = taper_opt # parallel computation over trials st_compRoutine = CrossSpectra( samplerate=data.samplerate, nSamples=nSamples, taper=taper, taper_opt=taper_opt, demean_taper=method == "granger", polyremoval=polyremoval, timeAxis=timeAxis, foi=foi, ) # hard coded as class attribute st_dimord = CrossSpectra.dimord return st_compRoutine, st_dimord