Source code for syncopy.specest.superlet

# -*- coding: utf-8 -*-
#
# Time-frequency analysis with superlets
# Based on 'Time-frequency super-resolution with superlets'
# by Moca et al., 2021 Nature Communications
#

# Builtin/3rd party package imports
import numpy as np
import logging
import platform
from scipy.signal import fftconvolve


[docs]def superlet( data_arr, samplerate, scales, order_max, order_min=1, c_1=3, adaptive=False, ): """ Performs Superlet Transform (SLT) according to Moca et al. [1]_ Both multiplicative SLT and fractional adaptive SLT are available. The former is recommended for a narrow frequency band of interest, whereas the latter is better suited for the analysis of a broad range of frequencies. A superlet (SL) is a set of Morlet wavelets with increasing number of cycles within the Gaussian envelope. Hence the bandwith is constrained more and more with more cycles yielding a sharper frequency resolution. Complementary the low cycle numbers will give a high time resolution. The SLT then is the geometric mean of the set of individual wavelet transforms, combining both wide and narrow-bandwidth wavelets into a super-resolution estimate. Parameters ---------- data_arr : nD :class:`numpy.ndarray` Uniformly sampled time-series data The 1st dimension is interpreted as the time axis samplerate : float Samplerate of the time-series in Hz scales : 1D :class:`numpy.ndarray` Set of scales to use in wavelet transform. Note that for the SL Morlet the relationship between scale and frequency simply is ``s(f) = 1/(2*pi*f)`` Need to be ordered high to low for `adaptive=True` order_max : int Maximal order of the superlet set. Controls the maximum number of cycles within a SL together with the `c_1` parameter: ``c_max = c_1 * order_max`` order_min : int Minimal order of the superlet set. Controls the minimal number of cycles within a SL together with the `c_1` parameter: ``c_min = c_1 * order_min`` Note that for admissability reasons c_min should be at least 3! c_1 : int Number of cycles of the base Morlet wavelet. If set to lower than 3 increase `order_min` as to never have less than 3 cycles in a wavelet! adaptive : bool Whether to perform fractional adaptive SLT or multiplicative SLT. If set to True, the order of the wavelet set will increase linearly with the frequencies of interest from `order_min` to `order_max`. If set to False the same SL will be used for all frequencies. Returns ------- gmean_spec : :class:`numpy.ndarray` Complex time-frequency representation of the input data. Shape is ``(len(scales),) + data_arr.shape`` Notes ----- .. [1] Moca, Vasile V., et al. "Time-frequency super-resolution with superlets." Nature communications 12.1 (2021): 1-18. """ logger = logging.getLogger("syncopy_" + platform.node()) # adaptive SLT if adaptive: logger.debug( f"Running fractional adaptive superlet transform with order_min={order_min}, order_max={order_max} and c_1={c_1} on data with shape {data_arr.shape}." ) gmean_spec = FASLT(data_arr, samplerate, scales, order_max, order_min, c_1) # multiplicative SLT else: logger.debug( f"Running multiplicative superlet transform with order_min={order_min}, order_max={order_max} and c_1={c_1} on data with shape {data_arr.shape}." ) gmean_spec = multiplicativeSLT(data_arr, samplerate, scales, order_max, order_min, c_1) return gmean_spec
def multiplicativeSLT(data_arr, samplerate, scales, order_max, order_min=1, c_1=3): dt = 1 / samplerate # create the complete multiplicative set spanning # order_min - order_max cycles = c_1 * np.arange(order_min, order_max + 1) order_num = order_max + 1 - order_min # number of different orders SL = [MorletSL(c) for c in cycles] # lowest order gmean_spec = cwtSL(data_arr, SL[0], scales, dt) gmean_spec = np.power(gmean_spec, 1 / order_num) for wavelet in SL[1:]: spec = cwtSL(data_arr, wavelet, scales, dt) gmean_spec *= np.power(spec, 1 / order_num) return gmean_spec def FASLT(data_arr, samplerate, scales, order_max, order_min=1, c_1=3): """Fractional adaptive SL transform For non-integer orders fractional SLTs are calculated in the interval [order, order+1) via: R(o_f) = R_1 * R_2 * ... * R_i * R_i+1 ** alpha with o_f = o_i + alpha """ dt = 1 / samplerate # frequencies of interest # from the scales for the SL Morlet fois = 1 / (2 * np.pi * scales) orders = compute_adaptive_order(fois, order_min, order_max) # create the complete superlet set from # all enclosed integer orders orders_int = np.int32(np.floor(orders)) cycles = c_1 * np.unique(orders_int) SL = [MorletSL(c) for c in cycles] # every scale needs a different exponent # for the geometric mean exponents = 1 / (orders - order_min + 1) # which frequencies/scales use the same integer orders order_jumps = np.where(np.diff(orders_int))[0] # each frequency/scale will have its own multiplicative SL # which overlap -> higher orders enclose all the lower orders assert len(SL) == len(order_jumps) + 1 # the fractions alphas = orders % orders_int # 1st order # lowest order is needed for all scales/frequencies gmean_spec = cwtSL(data_arr, SL[0], scales, dt) # 1st order <-> order_min # Geometric normalization according to scale dependent order gmean_spec = np.power(gmean_spec.T, exponents).T # we go to the next scale and order in any case.. # but for order_max == 1 for which order_jumps is empty last_jump = 1 for i, jump in enumerate(order_jumps): # relevant scales for the next order scales_o = scales[last_jump:] # order + 1 spec next_spec = cwtSL(data_arr, SL[i + 1], scales_o, dt) # which fractions for the current next_spec # in the interval [order, order+1) scale_span = slice(last_jump, jump + 1) gmean_spec[scale_span, :] *= np.power( next_spec[: jump - last_jump + 1].T, alphas[scale_span] * exponents[scale_span], ).T # multiply non-fractional next_spec for # all remaining scales/frequencies gmean_spec[jump + 1 :] *= np.power(next_spec[jump - last_jump + 1 :].T, exponents[jump + 1 :]).T # go to the next [order, order+1) interval last_jump = jump + 1 return gmean_spec def adaptiveSLT(data_arr, samplerate, scales, order_max, order_min=1, c_1=3): """This function is not used atm, it implements the non-fractional adaptive SLT. Kept here for reference/comparisons if ever needed""" dt = 1 / samplerate # frequencies of interest # from the scales for the SL Morlet # for len(orders) < len(scales) # multiple scales have the same order/wavelet set (discrete banding) fois = 1 / (2 * np.pi * scales) orders = compute_adaptive_order(fois, order_min, order_max) orders = np.int32(np.rint(orders)) # create the complete superlet cycles = c_1 * np.unique(orders) SL = [MorletSL(c) for c in cycles] # potentially every scale needs a different exponent # for the geometric mean exponents = 1 / orders # which frequencies/scales use the same SL order_jumps = np.where(np.diff(orders))[0] # if len(orders) >= len(scales) this is just # a continuous index array [0, 1, ..., len(scales) - 2] # as every scale has it's own order # otherwise it provides the mapping scales -> order assert len(SL) == len(order_jumps) + 1 == np.unique(orders).size # 1st order # lowest order is needed for all scales/frequencies gmean_spec = cwtSL(data_arr, SL[0], scales, dt) # 1st order <-> order_min # Geometric normalization according to scale dependent order gmean_spec = np.power(gmean_spec.T, exponents).T # each frequency/scale can have its own multiplicative SL # which overlap -> higher orders have all the lower orders for i, jump in enumerate(order_jumps): # relevant scales for that order scales_o = scales[jump + 1 :] wavelet = SL[i + 1] spec = cwtSL(data_arr, wavelet, scales_o, dt) # normalize according to scale dependent order spec = np.power(spec.T, exponents[jump + 1 :]).T gmean_spec[jump + 1 :] *= spec return gmean_spec class MorletSL: def __init__(self, c_i=3, k_sd=5): """The Morlet formulation according to Moca et al. shifts the admissability criterion from the central frequency to the number of cycles c_i within the Gaussian envelope which has a constant standard deviation of k_sd. """ self.c_i = c_i self.k_sd = k_sd def __call__(self, *args, **kwargs): return self.time(*args, **kwargs) def time(self, t, s=1.0): """ Complext Morlet wavelet in the SL formulation. Parameters ---------- t : float Time. If s is not specified, this can be used as the non-dimensional time t/s. s : float Scaling factor. Default is 1. Returns ------- out : complex Value of the Morlet wavelet at the given time """ ts = t / s # scaled time spread parameter # also includes scale normalisation! B_c = self.k_sd / (s * self.c_i * (2 * np.pi) ** 1.5) output = B_c * np.exp(1j * ts) output *= np.exp(-0.5 * (self.k_sd * ts / (2 * np.pi * self.c_i)) ** 2) return output def fourier_period(scale): """ This is the approximate Morlet fourier period as used in the source publication of Moca et al. 2021 Note that w0 (central frequency) is always 1 in this Morlet formulation, hence the scales are not compatible to the standard Wavelet definitions! """ return 2 * np.pi * scale def scale_from_period(period): return period / (2 * np.pi) def cwtSL(data, wavelet, scales, dt): """ The continuous Wavelet transform specifically for Morlets with the Superlet formulation of Moca et al. 2021. Differences to :func:`~syncopy.specest.wavelets.transform.cwt_time`: - Morlet support gets adjusted by number of cycles - normalisation is with 1/(scale * 4pi) - this way the absolute value of the spectrum (modulus) at the corresponding harmonic frequency is the harmonic signal's amplitude Notes ----- The time axis is expected to be along the 1st dimension. """ # wavelets can be complex so output is complex output = np.zeros((len(scales),) + data.shape, dtype=np.complex64) # this checks if really a Superlet Wavelet is being used if not isinstance(wavelet, MorletSL): raise ValueError("Wavelet is not of MorletSL type!") # 1st axis is time slices = [None for _ in data.shape] slices[0] = slice(None) # compute in time for ind, scale in enumerate(scales): t = _get_superlet_support(scale, dt, wavelet.c_i) # sample wavelet and normalise norm = dt**0.5 / (4 * np.pi) wavelet_data = norm * wavelet(t, scale) # this is an 1d array for sure! # np.convolve only works if support is capped # at signal lengths, as its output has shape # max(len(data), len(wavelet_data) output[ind, :] = fftconvolve(data, wavelet_data[tuple(slices)], mode="same") return output def _get_superlet_support(scale, dt, cycles): """ Effective support for the convolution is here not only scale but also cycle dependent. """ # number of points needed to capture wavelet M = 10 * scale * cycles / dt # times to use, centred at zero t = np.arange((-M + 1) / 2.0, (M + 1) / 2.0) * dt return t def compute_adaptive_order(freq, order_min, order_max): """ Computes the superlet order for a given frequency of interest for the fractional adaptive SLT (FASLT) according to equation 7 of Moca et al. 2021. This is a simple linear mapping between the minimal and maximal order onto the respective minimal and maximal frequencies. Note that `freq` should be ordered low to high. """ f_min, f_max = freq[0], freq[-1] order = (order_max - order_min) * (freq - f_min) / (f_max - f_min) # return np.int32(order_min + np.rint(order)) return order_min + order