# -*- coding: utf-8 -*-
#
# Syncopy spectral estimation methods
#
# Builtin/3rd party package imports
import numpy as np
# Syncopy imports
from syncopy.shared.parsers import data_parser, scalar_parser, array_parser
from syncopy.shared.tools import get_defaults, get_frontend_cfg
from syncopy.datatype import SpectralData
from syncopy.shared.errors import (
SPYValueError,
SPYTypeError,
SPYWarning,
SPYInfo,
SPYLog,
)
from syncopy.shared.kwarg_decorators import (
unwrap_cfg,
unwrap_select,
detect_parallel_client,
)
from syncopy.shared.tools import best_match
from syncopy.shared.const_def import spectralConversions
import syncopy as spy
from syncopy.shared.input_processors import (
process_taper,
process_foi,
process_padding,
check_effective_parameters,
check_passed_kwargs,
)
# method specific imports - they should go!
from syncopy.specest.fooofspy import default_fooof_opt
import syncopy.specest.wavelets as spywave
import syncopy.specest.superlet as superlet
from .wavelet import get_optimal_wavelet_scales
# Local imports
from .compRoutines import (
SuperletTransform,
WaveletTransform,
MultiTaperFFT,
MultiTaperFFTConvol,
FooofSpy,
)
availableFooofOutputs = ["fooof", "fooof_aperiodic", "fooof_peaks"]
availableOutputs = tuple(spectralConversions.keys())
availableWavelets = ("Morlet", "Paul", "DOG", "Ricker", "Marr", "Mexican_hat")
availableMethods = ("mtmfft", "mtmconvol", "wavelet", "superlet", "welch")
[docs]@unwrap_cfg
@unwrap_select
@detect_parallel_client
def freqanalysis(
data,
method="mtmfft",
output="pow",
keeptrials=True,
foi=None,
foilim=None,
pad="maxperlen",
polyremoval=0,
taper="hann",
demean_taper=False,
taper_opt=None,
tapsmofrq=None,
nTaper=None,
keeptapers=False,
toi="all",
t_ftimwin=None,
wavelet="Morlet",
width=6,
order=None,
order_max=None,
order_min=1,
c_1=3,
adaptive=False,
out=None,
fooof_opt=None,
ft_compat=False,
**kwargs,
):
"""
Perform (time-)frequency analysis of Syncopy :class:`~syncopy.AnalogData` objects
**Usage Summary**
Options available in all analysis methods:
* **output** : one of :data:`~syncopy.specest.freqanalysis.availableOutputs`;
return power spectra, complex Fourier spectra or absolute values.
* **foi**/**foilim** : frequencies of interest; either array of frequencies or
frequency window (not both)
* **keeptrials** : return individual trials or grand average
* **polyremoval** : de-trending method to use (0 = mean, 1 = linear or `None`)
List of available analysis methods and respective distinct options:
"mtmfft" : (Multi-)tapered Fourier transform
Perform frequency analysis on time-series trial data using either a single
taper window (Hanning) or many tapers based on the discrete prolate
spheroidal sequence (DPSS) that maximize energy concentration in the main
lobe.
* **taper** : one of :data:`~syncopy.shared.const_def.availableTapers`
* **tapsmofrq** : spectral smoothing box for slepian tapers (in Hz)
* **nTaper** : number of orthogonal tapers for slepian tapers
* **keeptapers** : return individual tapers or average
* **pad**: either pad to an absolute length or set to `'nextpow2'`
Post-processing of the resulting spectra with FOOOF is available
via setting `output` to one of `'fooof'`, `'fooof_aperiodic'` or
`'fooof_peaks'`, see below for details. The returned spectrum represents
the full fooofed spectrum for `'fooof'`, the aperiodic
fit for `'fooof_aperiodic'`, and the peaks (Gaussians fit to them) for
`'fooof_peaks'`. Returned data is in linear scale. Noisy input
data will most likely lead to fitting issues with fooof, always inspect
your results!
"mtmconvol" : (Multi-)tapered sliding window Fourier transform
Perform time-frequency analysis on time-series trial data based on a sliding
window short-time Fourier transform using either a single Hanning taper or
multiple DPSS tapers.
* **taper** : one of :data:`~syncopy.shared.const_def.availableTapers`
* **tapsmofrq** : spectral smoothing box for slepian tapers (in Hz)
* **nTaper** : number of orthogonal tapers for slepian tapers
* **keeptapers** : return individual tapers or average
* **toi** : time-points of interest; can be either an array representing
analysis window centroids (in sec), a scalar between 0 and 1 encoding
the percentage of overlap between adjacent windows or "all" to center
a window on every sample in the data.
* **t_ftimwin** : sliding window length (in sec)
"welch" : Welch's method for the estimation of power spectra based on
time-averaging over short, modified periodograms. Here, *modified* means that
a taper is applied.
See [Welch1967]_ for details.
* **toi** : time-points of interest; a scalar between 0 and 1 encoding
the percentage of overlap between adjacent windows.
* **t_ftimwin** : sliding window length (in sec)
* **taper** : one of :data:`~syncopy.shared.const_def.availableTapers`
* **tapsmofrq** : spectral smoothing box for slepian tapers (in Hz)
* **nTaper** : number of orthogonal tapers for slepian tapers
* **keeptapers** : must be `False` with Welch. For multi-tapering,
taper averaging happens as part of the modified periodogram computation,
i.e., before the window averaging performed by Welch.
"wavelet" : (Continuous non-orthogonal) wavelet transform
Perform time-frequency analysis on time-series trial data using a non-orthogonal
continuous wavelet transform.
* **wavelet** : one of :data:`~syncopy.specest.freqanalysis.availableWavelets`
* **toi** : time-points of interest; can be either an array representing
time points (in sec) or "all"(pre-trimming and subsampling of results)
* **width** : Nondimensional frequency constant of Morlet wavelet function (>= 6)
* **order** : Order of Paul wavelet function (>= 4) or derivative order
of real-valued DOG wavelets (2 = mexican hat)
"superlet" : Superlet transform
Perform time-frequency analysis on time-series trial data using
the super-resolution superlet transform (SLT) from [Moca2021]_.
* **order_max** : Maximal order of the superlet
* **order_min** : Minimal order of the superlet
* **c_1** : Number of cycles of the base Morlet wavelet
* **adaptive** : If set to `True` perform fractional adaptive SLT,
otherwise perform multiplicative SLT
**Full documentation below**
Parameters
----------
data : `~syncopy.AnalogData`
A non-empty Syncopy :class:`~syncopy.datatype.AnalogData` object
method : str
Spectral estimation method, one of :data:`~syncopy.specest.freqanalysis.availableMethods`
(see below).
output : str
Output of spectral estimation. One of :data:`~syncopy.specest.freqanalysis.availableOutputs` (see below);
use `'pow'` for power spectrum (:obj:`numpy.float32`), `'fourier'` for complex
Fourier coefficients (:obj:`numpy.complex64`) or `'abs'` for absolute
values (:obj:`numpy.float32`). Use one of `'fooof'`, `'fooof_aperiodic'` or
`'fooof_peaks'` to request post-processing of the results with FOOOF, also see
the `'fooof_opt'` parameter description.
keeptrials : bool
If `True` spectral estimates of individual trials are returned, otherwise
results are averaged across trials.
foi : array-like or None
Frequencies of interest (Hz) for output. If desired frequencies cannot be
matched exactly, the closest possible frequencies are used. If `foi` is `None`
or ``foi = "all"``, all attainable frequencies (i.e., zero to Nyquist / 2)
are selected.
foilim : array-like (floats [fmin, fmax]) or None or "all"
Frequency-window ``[fmin, fmax]`` (in Hz) of interest. Window
specifications must be sorted (e.g., ``[90, 70]`` is invalid) and not NaN
but may be unbounded (e.g., ``[-np.inf, 60.5]`` is valid). Edges `fmin`
and `fmax` are included in the selection. If `foilim` is `None` or
``foilim = "all"``, all frequencies are selected.
pad : 'maxperlen', float or 'nextpow2'
For the default `maxperlen`, no padding is performed in case of equal
length trials, while trials of varying lengths are padded to match the
longest trial. If `pad` is a number all trials are padded so that `pad` indicates
the absolute length of all trials after padding in seconds. For instance
``pad = 2`` pads all trials to an absolute length of 2000 samples, if and
only if the longest trial contains at maximum 2000 samples and the
samplerate is 1kHz. If `pad` is `'nextpow2'` all trials are padded to the
nearest power of two (in samples) of the longest trial.
polyremoval : int or None
Order of polynomial used for de-trending data in the time domain prior
to spectral analysis. A value of 0 corresponds to subtracting the mean
("de-meaning"), ``polyremoval = 1`` removes linear trends (subtracting the
least squares fit of a linear polynomial).
If `polyremoval` is `None`, no de-trending is performed. Note that
for spectral estimation de-meaning is very advisable and hence also the
default.
tapsmofrq : float or None
Only valid if `method` is `'mtmfft'` or `'mtmconvol'`
Enables multi-tapering and sets the amount of one-sided spectral
smoothing with slepian tapers in Hz.
nTaper : int or None
Only valid if `method` is `'mtmfft'` or `'mtmconvol'` and `tapsmofrq` is set.
Number of orthogonal tapers to use for multi-tapering. It is not recommended to set the number
of tapers manually! Leave at `None` for the optimal number to be set automatically.
taper : str or None, optional
Only valid if ``method`` is ``'mtmfft'`` or ``'mtmconvol'``. Windowing function,
one of :data:`~syncopy.shared.const_def.availableTapers`
For multi-tapering with slepian tapers use `tapsmofrq` directly.
demean_taper : bool
Set to `True` to perform de-meaning after tapering. Recommended for later Granger
analysis with :func:`~syncopy.connectivityanalysis`. Only valid for ``method='mtmfft'``.
taper_opt : dict or None
Dictionary with keys for additional taper parameters.
For example :func:`~scipy.signal.windows.kaiser` has
the additional parameter 'beta'. For multi-tapering use `tapsmofrq` directly.
keeptapers : bool
Only valid if `method` is `'mtmfft'` or `'mtmconvol'` and multi-tapering enabled
via setting `tapsmofrq`.
If `True`, return spectral estimates for each taper.
Otherwise power spectrum is averaged across tapers,
if and only if `output` is `pow`.
toi : float or array-like or "all"
**Mandatory input** for time-frequency analysis methods (`method` is either
`"mtmconvol"` or `"wavelet"` or `"superlet"`).
If `toi` is scalar, it must be a value between 0 and 1 indicating the
percentage of overlap between time-windows specified by `t_ftimwin` (only
valid if `method` is `'mtmconvol'`).
If `toi` is an array it explicitly selects the centroids of analysis
windows (in seconds), if `toi` is `"all"`, analysis windows are centered
on all samples in the data for `method="mtmconvol"`. For wavelet based
methods (`"wavelet"` or `"superlet"`) toi needs to be either an
equidistant array of time points or "all".
t_ftimwin : positive float
Only valid if `method` is `'mtmconvol'` or `'welch'`. Sliding window length (in seconds).
wavelet : str
Only valid if `method` is `'wavelet'`. Wavelet function to use, one of
:data:`~syncopy.specest.freqanalysis.availableWavelets` (see below).
width : positive float
Only valid if `method` is `'wavelet'` and `wavelet` is `'Morlet'`. Nondimensional
frequency constant of Morlet wavelet function. This number should be >= 6,
which corresponds to 6 cycles within the analysis window to ensure sufficient
spectral sampling.
order : positive int
Only valid if `method` is `'wavelet'` and `wavelet` is `'Paul'` or `'DOG'`. Order
of the wavelet function. If `wavelet` is `'Paul'`, `order` should be chosen
>= 4 to ensure that the analysis window contains at least a single oscillation.
At an order of 40, the Paul wavelet exhibits about the same number of cycles
as the Morlet wavelet with a `width` of 6.
All other supported wavelets functions are *real-valued* derivatives of
Gaussians (DOGs). Hence, if `wavelet` is `'DOG'`, `order` represents the derivative order.
The special case of a second order DOG yields a function known as "Mexican Hat",
"Marr" or "Ricker" wavelet, which can be selected alternatively by setting
`wavelet` to `'Mexican_hat'`, `'Marr'` or `'Ricker'`. **Note**: A real-valued
wavelet function encodes *only* information about peaks and discontinuities
in the signal and does *not* provide any information about amplitude or phase.
order_max : int
Only valid if `method` is `'superlet'`.
Maximal order of the superlet set. Controls the maximum
number of cycles within a SL together
with the `c_1` parameter: c_max = c_1 * order_max
order_min : int
Only valid if `method` is `'superlet'`.
Minimal order of the superlet set. Controls
the minimal number of cycles within a SL together
with the `c_1` parameter: c_min = c_1 * order_min
Note that for admissability reasons c_min should be at least 3!
c_1 : int
Only valid if `method` is `'superlet'`.
Number of cycles of the base Morlet wavelet. If set to lower
than 3 increase `order_min` as to never have less than 3 cycles
in a wavelet!
adaptive : bool
Only valid if `method` is `'superlet'`.
Wether to perform multiplicative SLT or fractional adaptive SLT.
If set to True, the order of the wavelet set will increase
linearly with the frequencies of interest from `order_min`
to `order_max`. If set to False the same SL will be used for
all frequencies.
fooof_opt : dict or None
Only valid if `method` is `'mtmfft'` and `output` is one of
`'fooof'`, `'fooof_aperiodic'`, or `'fooof_peaks'`.
Additional keyword arguments passed to the `FOOOF` constructor. Available
arguments include ``'peak_width_limits'``, ``'max_n_peaks'``, ``'min_peak_height'``,
``'peak_threshold'``, and ``'aperiodic_mode'``.
Please refer to the
`FOOOF docs <https://fooof-tools.github.io/fooof/generated/fooof.FOOOF.html#fooof.FOOOF>`_
for the meanings and the defaults.
See the FOOOF reference [Donoghue2020]_ for details.
ft_compat : bool, optional
Set to `True` to use Field Trip's spectral normalization for FFT based methods
(``method='mtmfft'`` and ``method='mtmconvol'``). So spectral power is NOT
independent of the padding size!
Returns
-------
spec : :class:`~syncopy.SpectralData`
(Time-)frequency spectrum of input data. The `spec` may contain additional metadata,
based on the `method` used to compute it:
* For `method='mtmfft'` when `output` is one of
`'fooof'`, `'fooof_aperiodic'`, or `'fooof_peaks'`, the `spec.metadata` property contains
the keys listed and explained in :data:`~syncopy.specest.compRoutines.FooofSpy.metadata_keys`.
Notes
-----
.. [Moca2021] Moca, Vasile V., et al. "Time-frequency super-resolution with superlets."
Nature communications 12.1 (2021): 1-18.
.. [Donoghue2020] Donoghue et al. 2020, DOI 10.1038/s41593-020-00744-x.
.. [Welch1967] Welch. "The use of fast Fourier transform for the estimation of power spectra: A method based on time averaging over short, modified periodograms.", 1976, DOI 10.1109/TAU.1967.1161901
**Options**
.. autodata:: syncopy.specest.freqanalysis.availableMethods
.. autodata:: syncopy.specest.freqanalysis.availableOutputs
.. autodata:: syncopy.shared.const_def.availableTapers
.. autodata:: syncopy.specest.freqanalysis.availableWavelets
Examples
--------
Generate 10 seconds of white noise, sampled at 1000 Hz:
>>> import syncopy.tests.synth_data as synth_data
>>> wn = synth_data.white_noise(nTrials=2, nChannels=3, nSamples=10000, samplerate=1000)
Configure Welch's method to estimate the power spectral density with a window length of 0.25 seconds
and an overlap of 50 percent between the windows:
>>> cfg = spy.get_defaults(spy.freqanalysis)
>>> cfg.method = "welch"
>>> cfg.t_ftimwin = 0.25 # Window length in seconds.
>>> cfg.toi = 0.5 # Overlap between periodograms (0.5 = 50 percent overlap).
Run Welch:
>>> psd = spy.freqanalysis(cfg, wn)
Visualize the result for the first trial:
>>> _, ax = res.singlepanelplot(trials=0, logscale=False)
>>> ax.set_title("Welch result")
See also
--------
syncopy.specest.mtmfft.mtmfft : (multi-)tapered Fourier transform of multi-channel time series data
syncopy.specest.mtmconvol.mtmconvol : time-frequency analysis of multi-channel time series data with a sliding window FFT
syncopy.specest.wavelet.wavelet : time-frequency analysis of multi-channel time series data using a wavelet transform
syncopy.specest.fooofspy.fooofspy : parameterization of neural power spectra with the 'fitting oscillations & one over f' method
numpy.fft.fft : NumPy's reference FFT implementation
scipy.signal.stft : SciPy's Short Time Fourier Transform
"""
# Make sure our one mandatory input object can be processed
try:
data_parser(data, varname="data", dataclass="AnalogData", writable=None, empty=False)
except Exception as exc:
raise exc
timeAxis = data.dimord.index("time")
# Get everything of interest in local namespace
defaults = get_defaults(freqanalysis)
lcls = locals()
# check for ineffective additional kwargs
check_passed_kwargs(lcls, defaults, frontend_name="freqanalysis")
new_cfg = get_frontend_cfg(defaults, lcls, kwargs)
is_fooof = False
if method == "mtmfft" and output.startswith("fooof"):
is_fooof = True
output_fooof = output
output = "pow" # We need to change this as the mtmfft running first will complain otherwise.
if is_fooof:
if output_fooof not in availableFooofOutputs:
lgl = "'" + "or '".join(opt + "' " for opt in availableFooofOutputs)
raise SPYValueError(legal=lgl, varname="output_fooof", actual=output_fooof)
# Ensure a valid computational method was selected
if method not in availableMethods:
lgl = "'" + "or '".join(opt + "' " for opt in availableMethods)
raise SPYValueError(legal=lgl, varname="method", actual=method)
# Ensure a valid output format was selected
valid_outputs = spectralConversions.keys()
if output not in valid_outputs:
lgl = "'" + "or '".join(opt + "' " for opt in valid_outputs)
raise SPYValueError(legal=lgl, varname="output", actual=output)
# Parse all Boolean keyword arguments
for vname in ["keeptrials", "keeptapers", "demean_taper", "ft_compat"]:
if not isinstance(lcls[vname], bool):
raise SPYTypeError(lcls[vname], varname=vname, expected="Bool")
# If only a subset of `data` is to be processed, make some necessary adjustments
# of the sampleinfo and trial lengths
if data.selection is not None:
# Refuse to go ahead with active time selection and provided `toi` on top`
if any(tsel != slice(None) for tsel in data.selection.time) and isinstance(toi, (np.ndarray, list)):
lgl = "no `toi` specification due to active in-place time-selection in input dataset"
raise SPYValueError(legal=lgl, varname="toi", actual=toi)
sinfo = data.selection.trialdefinition[:, :2]
trialList = data.selection.trial_ids
else:
trialList = list(range(len(data.trials)))
sinfo = data.sampleinfo
lenTrials = np.diff(sinfo).squeeze()
if not lenTrials.shape:
lenTrials = lenTrials[None]
numTrials = len(trialList)
# check polyremoval
if polyremoval is not None:
scalar_parser(polyremoval, varname="polyremoval", ntype="int_like", lims=[0, 1])
# --- Padding ---
# Sliding window FFT does not support "fancy" padding
if method in ["mtmconvol", "welch"] and isinstance(pad, str) and pad != defaults["pad"]:
msg = (
"methods 'mtmconvol' and 'welch' only support in-place padding for windows "
+ "exceeding trial boundaries. Your choice of `pad = '{}'` will be ignored. "
)
SPYWarning(msg.format(pad))
if method == "mtmfft":
# the actual number of samples in case of later padding
minSampleNum = process_padding(pad, lenTrials, data.samplerate)
else:
minSampleNum = lenTrials.min()
# Compute length (in seconds) of shortest trial
minTrialLength = minSampleNum / data.samplerate
# Shortcut to data sampling interval
dt = 1 / data.samplerate
foi, foilim = process_foi(foi, foilim, data.samplerate)
# see also https://docs.obspy.org/_modules/obspy/signal/detrend.html#polynomial
if polyremoval is not None:
try:
scalar_parser(polyremoval, varname="polyremoval", lims=[0, 1], ntype="int_like")
except Exception as exc:
raise exc
# Prepare keyword dict for logging (use `lcls` to get actually provided
# keyword values, not defaults set above)
log_dct = {
"method": method,
"output": output_fooof if is_fooof else output,
"keeptapers": keeptapers,
"keeptrials": keeptrials,
"polyremoval": polyremoval,
"pad": pad,
}
SPYLog(f"Running specest method '{method}'.", loglevel="DEBUG")
# --------------------------------
# 1st: Check time-frequency inputs
# to prepare/sanitize `toi`
# --------------------------------
if method in ["mtmconvol", "wavelet", "superlet", "welch"]:
# Get start/end timing info respecting potential in-place selection
if toi is None:
raise SPYTypeError(toi, varname="toi", expected="scalar or array-like or 'all'")
if data.selection is not None:
tStart = data.selection.trialdefinition[:, 2] / data.samplerate
else:
tStart = data._t0 / data.samplerate
tEnd = tStart + lenTrials / data.samplerate
# for these methods only 'all' or an equidistant array
# of time points (sub-sampling, trimming) are valid
if method in ["wavelet", "superlet"]:
valid = True
if np.issubdtype(type(toi), np.number):
valid = False
elif isinstance(toi, str):
if toi != "all":
valid = False
else:
# take everything
preSelect = [slice(None)] * numTrials
postSelect = [slice(None)] * numTrials
elif not iter(toi):
valid = False
# this is the sequence type - can only be an interval!
else:
try:
array_parser(
toi,
varname="toi",
hasinf=False,
hasnan=False,
lims=[tStart.min(), tEnd.max()],
dims=(None,),
)
except Exception as exc:
raise exc
toi = np.array(toi)
# check for equidistancy
if not np.allclose(np.diff(toi, 2), np.zeros(len(toi) - 2)):
valid = False
# trim (preSelect) and subsample output (postSelect)
else:
preSelect = []
postSelect = []
# get sample intervals and relative indices from toi
for tk in range(numTrials):
start = int(data.samplerate * (toi[0] - tStart[tk]))
stop = int(data.samplerate * (toi[-1] - tStart[tk]) + 1)
preSelect.append(slice(max(0, start), max(stop, stop - start)))
smpIdx = np.minimum(lenTrials[tk] - 1, data.samplerate * (toi - tStart[tk]) - start)
postSelect.append(smpIdx.astype(np.intp))
# get out if sth wasn't right
if not valid:
lgl = "array of equidistant time-points or 'all' for wavelet based methods"
raise SPYValueError(legal=lgl, varname="toi", actual=toi)
# Update `log_dct` w/method-specific options (use `lcls` to get actually
# provided keyword values, not defaults set in here)
log_dct["toi"] = lcls["toi"]
# --------------------------------------------
# Check options specific to mtm*-methods
# (particularly tapers and foi/freqs alignment)
# --------------------------------------------
if "mtm" in method or method == "welch":
if method in ["mtmconvol", "welch"]:
# get the sliding window size
try:
scalar_parser(t_ftimwin, varname="t_ftimwin", lims=[dt, minTrialLength])
except Exception as exc:
SPYInfo("Please specify 't_ftimwin' parameter.. exiting!")
raise exc
# this is the effective sliding window FFT sample size
minSampleNum = int(t_ftimwin * data.samplerate)
if method == "welch":
if keeptapers:
raise SPYValueError(
legal="keeptapers='False' with method='welch'",
varname="keeptapers",
actual=keeptapers,
)
if output != "pow":
raise SPYValueError(
legal="output='pow' with method='welch'",
varname="output",
actual=output,
)
# Construct array of maximally attainable frequencies
freqs = np.fft.rfftfreq(minSampleNum, dt)
# Match desired frequencies as close as possible to
# actually attainable freqs
# these are the frequencies attached to the SpectralData by the CR!
if foi is not None:
foi, _ = best_match(freqs, foi, squash_duplicates=True)
elif foilim is not None:
foi, _ = best_match(freqs, foilim, span=True, squash_duplicates=True)
else:
msg = f"Automatic FFT frequency selection from {freqs[0]:.1f}Hz to " f"{freqs[-1]:.1f}Hz"
SPYInfo(msg)
foi = freqs
log_dct["foi"] = foi
# Abort if desired frequency selection is empty
if foi.size == 0:
lgl = "non-empty frequency specification"
act = "empty frequency selection"
raise SPYValueError(legal=lgl, varname="foi/foilim", actual=act)
# sanitize taper selection and/or retrieve dpss settings
taper, taper_opt = process_taper(
taper,
taper_opt,
tapsmofrq,
nTaper,
keeptapers,
foimax=foi.max(),
samplerate=data.samplerate,
nSamples=lenTrials.mean(), # best we can do here
output=output,
)
# Update `log_dct` w/method-specific options
log_dct["taper"] = taper
if taper_opt and taper == "dpss":
log_dct["nTaper"] = taper_opt["Kmax"]
log_dct["tapsmofrq"] = tapsmofrq
elif taper_opt:
log_dct["taper_opt"] = taper_opt
# -------------------------------------------------------
# Now, prepare explicit compute-classes for chosen method
# -------------------------------------------------------
if method == "mtmfft":
check_effective_parameters(MultiTaperFFT, defaults, lcls)
# method specific parameters
method_kwargs = {
"samplerate": data.samplerate,
"taper": taper,
"taper_opt": taper_opt,
"nSamples": minSampleNum,
"demean_taper": demean_taper,
"ft_compat": ft_compat,
}
# Set up compute-class
specestMethod = MultiTaperFFT(
foi=foi,
timeAxis=timeAxis,
keeptapers=keeptapers,
polyremoval=polyremoval,
output=output,
method_kwargs=method_kwargs,
)
elif method in ["mtmconvol", "welch"]:
check_effective_parameters(MultiTaperFFTConvol, defaults, lcls)
# Process `toi` for sliding window multi taper fft,
# we have to account for three scenarios: (1) center sliding
# windows on all samples in (selected) trials (2) `toi` was provided as
# percentage indicating the degree of overlap b/w time-windows and (3) a set
# of discrete time points was provided. These three cases are encoded in
# `overlap, i.e., ``overlap > 1` => all, `0 < overlap < 1` => percentage,
# `overlap < 0` => discrete `toi`
# overlap = None
if isinstance(toi, str):
if method == "welch":
lgl = "toi to be a float in range [0, 1] for method='welch'"
raise SPYValueError(legal=lgl, varname="toi", actual=toi)
if toi != "all":
lgl = "`toi = 'all'` to center analysis windows on all time-points"
raise SPYValueError(legal=lgl, varname="toi", actual=toi)
equidistant = True
overlap = np.inf
elif np.issubdtype(type(toi), np.number):
scalar_parser(toi, varname="toi", lims=[0, 1])
overlap = toi
equidistant = True
# this captures all other cases, e.i. toi is of sequence type
else:
if method == "welch":
lgl = "toi to be a float in range [0, 1] for method='welch'"
raise SPYValueError(legal=lgl, varname="toi", actual=toi)
overlap = -1
array_parser(
toi,
varname="toi",
hasinf=False,
hasnan=False,
lims=[tStart.min(), tEnd.max()],
dims=(None,),
)
toi = np.array(toi)
tSteps = np.diff(toi)
if (tSteps < 0).any():
lgl = "ordered list/array of time-points"
act = "unsorted list/array"
raise SPYValueError(legal=lgl, varname="toi", actual=act)
# Account for round-off errors: if toi spacing is almost at sample interval
# manually correct it
if np.isclose(tSteps.min(), dt):
tSteps[np.isclose(tSteps, dt)] = dt
if tSteps.min() < dt:
msg = f"`toi` selection too fine, max. time resolution is {dt}s"
SPYWarning(msg)
# This is imho a bug in NumPy - even `arange` and `linspace` may produce
# arrays that are numerically not exactly equidistant - `unique` will
# show several entries here - use `allclose` to identify "even" spacings
equidistant = np.allclose(tSteps, [tSteps[0]] * tSteps.size)
# If `toi` was 'all' or a percentage, use entire time interval of (selected)
# trials and check if those trials have *approximately* equal length
if toi is None:
if not np.allclose(lenTrials, [minSampleNum] * lenTrials.size):
msg = (
"processing trials of different lengths (min = {}; max = {} samples)"
+ " with `toi = 'all'`"
)
SPYWarning(msg.format(int(minSampleNum), int(lenTrials.max())))
# number of samples per window
nperseg = int(t_ftimwin * data.samplerate)
halfWin = int(nperseg / 2)
postSelect = slice(None) # select all is the default
if 0 <= overlap <= 1: # `toi` is percentage
noverlap = min(nperseg - 1, int(overlap * nperseg))
# windows get shifted exactly 1 sample
# to get a spectral estimate at each sample
else:
noverlap = nperseg - 1
# `toi` is array
if overlap < 0:
# Compute necessary padding at begin/end of trials to fit sliding windows
offStart = ((toi[0] - tStart) * data.samplerate).astype(np.intp)
padBegin = halfWin - offStart
padBegin = ((padBegin > 0) * padBegin).astype(np.intp)
offEnd = ((tEnd - toi[-1]) * data.samplerate).astype(np.intp)
padEnd = halfWin - offEnd
padEnd = ((padEnd > 0) * padEnd).astype(np.intp)
# Treat equi-distant `toi` arrays with spacing large enough that windows
# do not overlap as if they were not equidistant
if tSteps.max() * data.samplerate > halfWin and equidistant:
equidistant = False
# Compute sample-indices (one slice/list per trial) from time-selections
soi = []
if equidistant:
# soi just trims the input data to the [toi[0], toi[-1]] interval
# postSelect then subsamples the spectral esimate to the user given toi
postSelect = []
for tk in range(numTrials):
start = max(0, int(round(data.samplerate * (toi[0] - tStart[tk]) - halfWin)))
stop = int(round(data.samplerate * (toi[-1] - tStart[tk]) + halfWin + 1))
soi.append(slice(start, max(stop, stop - start)))
# chosen toi subsampling interval in sample units, min. is 1;
# compute `delta_idx` s.t. stop - start / delta_idx == toi.size
delta_idx = int(round((soi[0].stop - soi[0].start) / toi.size))
delta_idx = delta_idx if delta_idx > 1 else 1
postSelect = slice(None, None, delta_idx)
else:
for tk in range(numTrials):
starts = (data.samplerate * (toi - tStart[tk]) - halfWin).astype(np.intp)
starts += padBegin[tk]
stops = (data.samplerate * (toi - tStart[tk]) + halfWin + 1).astype(np.intp)
stops += padBegin[tk]
stops = np.maximum(stops, stops - starts, dtype=np.intp)
soi.append([slice(start, stop) for start, stop in zip(starts, stops)])
# postSelect here remains slice(None), as resulting spectrum
# has exactly one entry for each soi
# `toi` is percentage or "all"
else:
soi = [slice(None)] * numTrials
# Collect keyword args for `mtmconvol` in dictionary
method_kwargs = {
"samplerate": data.samplerate,
"nperseg": nperseg,
"noverlap": noverlap,
"taper": taper,
"taper_opt": taper_opt,
}
# Set up compute-class
specestMethod = MultiTaperFFTConvol(
soi,
postSelect,
equidistant=equidistant,
toi=toi,
foi=foi,
timeAxis=timeAxis,
keeptapers=keeptapers,
polyremoval=polyremoval,
output=output,
method_kwargs=method_kwargs,
)
elif method == "wavelet":
check_effective_parameters(WaveletTransform, defaults, lcls)
# Check wavelet selection
if wavelet not in availableWavelets:
lgl = "'" + "or '".join(opt + "' " for opt in availableWavelets)
raise SPYValueError(legal=lgl, varname="wavelet", actual=wavelet)
if wavelet not in ["Morlet", "Paul"]:
msg = (
"the chosen wavelet '{}' is real-valued and does not provide "
+ "any information about amplitude or phase of the data. This wavelet function "
+ "may be used to isolate peaks or discontinuities in the signal. "
)
SPYWarning(msg.format(wavelet))
# Check for consistency of `width`, `order` and `wavelet`
if wavelet == "Morlet":
try:
scalar_parser(width, varname="width", lims=[1, np.inf])
except Exception as exc:
raise exc
wfun = getattr(spywave, wavelet)(w0=width)
else:
if width != lcls["width"]:
msg = "option `width` has no effect for wavelet '{}'"
SPYWarning(msg.format(wavelet))
if wavelet == "Paul":
try:
scalar_parser(order, varname="order", lims=[4, np.inf], ntype="int_like")
except Exception as exc:
raise exc
wfun = getattr(spywave, wavelet)(m=order)
elif wavelet == "DOG":
try:
scalar_parser(order, varname="order", lims=[1, np.inf], ntype="int_like")
except Exception as exc:
raise exc
wfun = getattr(spywave, wavelet)(m=order)
else:
if order is not None:
msg = "option `order` has no effect for wavelet '{}'"
SPYWarning(msg.format(wavelet))
wfun = getattr(spywave, wavelet)()
# automatic frequency selection
if foi is None and foilim is None:
scales = get_optimal_wavelet_scales(
wfun.scale_from_period, # all availableWavelets sport one!
int(minTrialLength * data.samplerate),
dt,
)
foi = 1 / wfun.fourier_period(scales)
msg = f"Setting frequencies of interest to {foi[0]:.1f}-" f"{foi[-1]:.1f}Hz"
SPYInfo(msg)
else:
if foilim is not None:
foi = np.arange(foilim[0], foilim[1] + 1, dtype=float)
# 0 frequency is not valid
foi[foi < 0.01] = 0.01
scales = wfun.scale_from_period(1 / foi)
# Update `log_dct` w/method-specific options (use `lcls` to get actually
# provided keyword values, not defaults set in here)
log_dct["foi"] = foi
log_dct["wavelet"] = lcls["wavelet"]
log_dct["width"] = lcls["width"]
log_dct["order"] = lcls["order"]
# method specific parameters
method_kwargs = {
"samplerate": data.samplerate,
"scales": scales,
"wavelet": wfun,
}
# Set up compute-class
specestMethod = WaveletTransform(
preSelect,
postSelect,
toi=toi,
timeAxis=timeAxis,
polyremoval=polyremoval,
output=output,
method_kwargs=method_kwargs,
)
elif method == "superlet":
check_effective_parameters(SuperletTransform, defaults, lcls)
# check and parse superlet specific arguments
if order_max is None:
lgl = "Positive integer needed for order_max"
raise SPYValueError(legal=lgl, varname="order_max", actual=None)
else:
scalar_parser(order_max, varname="order_max", lims=[1, np.inf], ntype="int_like")
scalar_parser(order_min, varname="order_min", lims=[1, order_max], ntype="int_like")
scalar_parser(c_1, varname="c_1", lims=[1, np.inf], ntype="int_like")
# if no frequencies are user selected, take a sensitive default
if foi is None and foilim is None:
scales = get_optimal_wavelet_scales(
superlet.scale_from_period, int(minTrialLength * data.samplerate), dt
)
foi = 1 / superlet.fourier_period(scales)
msg = f"Setting frequencies of interest to {foi[0]:.1f}-" f"{foi[-1]:.1f}Hz"
SPYInfo(msg)
else:
if foilim is not None:
# frequency range in 1Hz steps
foi = np.arange(foilim[0], foilim[1] + 1, dtype=float)
# 0 frequency is not valid
foi[foi < 0.01] = 0.01
scales = superlet.scale_from_period(1.0 / foi)
# FASLT needs ordered frequencies low - high
# meaning the scales have to go high - low
if adaptive:
if len(scales) < 2:
lgl = "A range of frequencies"
act = "Single frequency"
raise SPYValueError(legal=lgl, varname="foi", actual=act)
if np.any(np.diff(scales) > 0):
msg = "Sorting frequencies low to high for adaptive SLT.."
SPYWarning(msg)
scales = np.sort(scales)[::-1]
log_dct["foi"] = foi
log_dct["c_1"] = lcls["c_1"]
log_dct["order_max"] = lcls["order_max"]
log_dct["order_min"] = lcls["order_min"]
# method specific parameters
method_kwargs = {
"samplerate": data.samplerate,
"scales": scales,
"order_max": order_max,
"order_min": order_min,
"c_1": c_1,
"adaptive": adaptive,
}
# Set up compute-class
specestMethod = SuperletTransform(
preSelect,
postSelect,
toi=toi,
timeAxis=timeAxis,
polyremoval=polyremoval,
output=output,
method_kwargs=method_kwargs,
)
# -------------------------------------------------
# Sanitize output and call the ComputationalRoutine
# -------------------------------------------------
out = SpectralData(dimord=SpectralData._defaultDimord)
# Perform actual computation
specestMethod.initialize(
data,
out._stackingDim,
chan_per_worker=kwargs.get("chan_per_worker"),
keeptrials=keeptrials,
)
specestMethod.compute(data, out, parallel=kwargs.get("parallel"), log_dict=log_dct)
# FOOOF is a post-processing method of MTMFFT output, so we handle it here, once
# the MTMFFT has finished.
if is_fooof:
# Use the output of the MTMFFMT method as the new data and create new output data.
fooof_data = out
fooof_out = SpectralData(dimord=SpectralData._defaultDimord)
# method specific parameters
if fooof_opt is None:
fooof_opt = default_fooof_opt
# These go into the FOOOF constructor, so we keep them separate from the fooof_settings below.
fooof_kwargs = {
**default_fooof_opt,
**fooof_opt,
} # Join the ones from fooof_opt (the user) into the default fooof_kwargs.
# Settings used during the FOOOF analysis (that are NOT passed to FOOOF constructor).
# The user cannot influence these: in_freqs is derived from mtmfft output, freq_range is always None (=full mtmfft output spectrum).
# We still define them here, and they are passed through to the backend and actually used there.
fooof_settings = {
"in_freqs": fooof_data.freq,
"freq_range": None, # or something like [2, 40] to limit frequency range (post processing). Currently not exposed to user.
}
if fooof_data.freq[0] == 0:
# FOOOF does not work with input frequency zero in the data.
raise SPYValueError(
legal="a frequency range that does not include zero. Use 'foi' or 'foilim' to restrict.",
varname="foi/foilim",
actual="Frequency range from {} to {}.".format(min(fooof_data.freq), max(fooof_data.freq)),
)
# Set up compute-class
# - the output must be one of 'fooof', 'fooof_aperiodic',
# or 'fooof_peaks'.
# - everything passed as method_kwargs is passed as arguments
# to the fooof.FOOOF() constructor or functions, the other args are
# used elsewhere.
fooofMethod = FooofSpy(
output=output_fooof,
fooof_settings=fooof_settings,
method_kwargs=fooof_kwargs,
)
# Update `log_dct` w/method-specific options
log_dct["fooof_method"] = output_fooof
log_dct["fooof_opt"] = fooof_kwargs
# Perform actual computation
fooofMethod.initialize(
fooof_data,
fooof_out._stackingDim,
chan_per_worker=kwargs.get("chan_per_worker"),
keeptrials=keeptrials,
)
fooofMethod.compute(fooof_data, fooof_out, parallel=kwargs.get("parallel"), log_dict=log_dct)
out = fooof_out
# Perform mtmconvolv post-processing for `method='welch'`.
if method == "welch":
welch_data = out
out = spy.mean(welch_data, dim="time")
# Attach potential older cfg's from the input
# to support chained frontend calls.
out.cfg.update(data.cfg)
# Attach frontend parameters for replay.
out.cfg.update({"freqanalysis": new_cfg})
return out