Source code for syncopy.specest.mtmfft

# -*- coding: utf-8 -*-
#
# Spectral estimation with (multi-)tapered FFT
#

# Builtin/3rd party package imports
import numpy as np
from scipy import signal
import logging
import platform

# local imports
from ._norm_spec import _norm_spec, _norm_taper


[docs]def mtmfft( data_arr, samplerate, nSamples=None, taper="hann", taper_opt=None, demean_taper=False, ft_compat=False, ): """ (Multi-)tapered fast Fourier transform. Returns full complex Fourier transform for each taper. Multi-tapering only supported with Slepian windwows (`taper="dpss"`). Parameters ---------- data_arr : (N,) :class:`numpy.ndarray` Uniformly sampled multi-channel time-series data The 1st dimension is interpreted as the time axis samplerate : float Samplerate in Hz nSamples : int or None Absolute length of the (potentially to be padded) signals or `None` for no padding. taper : str or None Taper function to use, one of `scipy.signal.windows` Set to `None` for no tapering. taper_opt : dict or None Additional keyword arguments passed to the `taper` function. For multi-tapering with ``taper='dpss'`` set the keys `'Kmax'` and `'NW'`. For further details, please refer to the `SciPy docs <https://docs.scipy.org/doc/scipy/reference/signal.windows.html>`_ demean_taper : bool Set to `True` to perform de-meaning after tapering ft_compat : bool Set to `True` to use Field Trip's normalization, which is NOT independent of the padding size Returns ------- ftr : 3D :class:`numpy.ndarray` Complex output has shape ``(nTapers x nFreq x nChannels)``. freqs : 1D :class:`numpy.ndarray` Array of Fourier frequencies Notes ----- For a (MTM) power spectral estimate average the absolute squared transforms across tapers: ``Sxx = np.real(ftr * ftr.conj()).mean(axis=0)`` The FFT result is normalized such that this yields the spectral power. For a clean harmonic this will give a peak power of `A**2 / 2`, with `A` as harmonic amplitude. """ # attach dummy channel axis in case only a # single signal/channel is the input if data_arr.ndim < 2: data_arr = data_arr[:, np.newaxis] # raw length without padding signal_length = data_arr.shape[0] if nSamples is None: nSamples = signal_length nChannels = data_arr.shape[1] freqs = np.fft.rfftfreq(nSamples, 1 / samplerate) nFreq = freqs.size # no taper is boxcar if taper is None: taper = "boxcar" if taper_opt is None: taper_opt = {} taper_func = getattr(signal.windows, taper) # only really 2d if taper='dpss' with Kmax > 1 # here we take the actual signal lengths! windows = np.atleast_2d(taper_func(signal_length, **taper_opt)) # normalize window with total (after padding) length windows = _norm_taper(taper, windows, nSamples) # Fourier transforms (nTapers x nFreq x nChannels) ftr = np.zeros((windows.shape[0], nFreq, nChannels), dtype="complex64") logger = logging.getLogger("syncopy_" + platform.node()) logger.debug( f"Running mtmfft on {len(windows)} windows, data chunk has {nSamples} samples and {nChannels} channels." ) for taperIdx, win in enumerate(windows): win = np.tile(win, (nChannels, 1)).T win *= data_arr # de-mean again after tapering - needed for Granger! if demean_taper: win -= win.mean(axis=0) ftr[taperIdx] = np.fft.rfft(win, n=nSamples, axis=0) # FT uses potentially padded length `nSamples`, which dilutes the power if ft_compat: ftr[taperIdx] = _norm_spec(ftr[taperIdx], nSamples, samplerate) # here the normalization adapts such that padding is NOT changing power else: ftr[taperIdx] = _norm_spec( ftr[taperIdx], signal_length * np.sqrt(nSamples / signal_length), samplerate, ) return ftr, freqs
def _get_dpss_pars(tapsmofrq, nSamples, samplerate): """Helper function to retrieve dpss parameters from tapsmofrq""" # taper width parameter in sample units NW = tapsmofrq * nSamples / samplerate # from the minBw formula in `input_processors.process_taper` # Kmax is at least 1! Kmax = int(2 * NW - 1) # optimal number of tapers # ..but NW can be 0.9999999999999999.. # catch those floating point issues Kmax = Kmax if Kmax > 1 else 1 return NW, Kmax