Source code for syncopy.tests.misc

# -*- coding: utf-8 -*-
#
# Helper methods for testing routines
#

# Builtin/3rd party package imports
import subprocess
import sys
import os
import h5py
import tempfile
import time
import numpy as np
import dask.distributed as dd

# Local imports
from syncopy.datatype import AnalogData
from syncopy.shared.filetypes import _data_classname_to_extension, FILE_EXT
from syncopy import __plt__, __acme__

if __plt__:
    import matplotlib.pyplot as plt
    from matplotlib.backends.backend_agg import FigureCanvasAgg
if __acme__:
    import dask.distributed as dd


def is_win_vm():
    """
    Returns `True` if code is running on virtual Windows machine, `False`
    otherwise
    """

    # If we're not running on Windows abort
    if sys.platform != "win32":
        return False

    # Use the windows management instrumentation command-line to extract machine manufacturer
    out, err = subprocess.Popen(
        "wmic computersystem get manufacturer",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        shell=True,
    ).communicate()

    # If the vendor name contains any "virtual"-flavor, we're probably running
    # in a VM - if the above command triggered an error, abort
    if len(err) == 0:
        vendor = out.split()[1].lower()
        vmlist = ["vmware", "virtual", "virtualbox", "vbox", "qemu"]
        return any([virtual in vendor for virtual in vmlist])
    else:
        return False


def is_slurm_node():
    """
    Returns `True` if code is running on a SLURM-managed cluster node, `False`
    otherwise
    """

    # Simply test if the srun command is available
    out, err = subprocess.Popen(
        "srun --version",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        shell=True,
    ).communicate()
    if len(out) > 0:
        return True
    else:
        return False


[docs]def generate_artificial_data( nTrials=2, nChannels=2, equidistant=True, seed=42, overlapping=False, inmemory=True, dimord="default", ): """ Create :class:`~syncopy.AnalogData` object with synthetic harmonic signal(s) Parameters ---------- nTrials : int Number of trials to populate synthetic data object with nChannels : int Number of channels to populate synthetic object with equidistant : bool If `True`, trials of equal length are defined seed : None or int If `None`, imposed noise is completely random. If `seed` is an integer, it is used to fix the (initial) state of NumPy's random number generator :func:`numpy.random.default_rng`, i.e., objects created wtih same `seed` will be populated with identical artificial signals. overlapping : bool If `True`, constructed trials overlap inmemory : bool If `True`, the full `data` array (all channels across all trials) is allocated in memory (fast but dangerous for large arrays), otherwise the output data object's corresponding backing HDF5 file in `__storage__` is filled with synthetic data in a trial-by-trial manner (slow but safe even for very large datasets). dimord : str or list If `dimord` is "default", the constructed output object uses the default dimensional layout of a standard :class:`~syncopy.AnalogData` object. If `dimord` is a list (i.e., ``["channel", "time"]``) the provided sequence of dimensions is used. Returns ------- out : :class:`~syncopy.AnalogData` object Syncopy :class:`~syncopy.AnalogData` object with specified properties populated with a synthetic multivariate trigonometric signal. Notes ----- This is an auxiliary method that is intended purely for internal use. Thus, no error checking is performed. Examples -------- Generate small artificial :class:`~syncopy.AnalogData` object in memory .. code-block:: python >>> iAmSmall = generate_artificial_data(nTrials=5, nChannels=10, inmemory=True) >>> iAmSmall Syncopy AnalogData object with fields cfg : dictionary with keys '' channel : [10] element <class 'numpy.ndarray'> container : None data : 5 trials of length 3000 defined on [15000 x 10] float32 Dataset of size 0.57 MB dimord : 2 element list filename : /Users/pantaray/.spy/spy_158f_4d4153e3.analog mode : r+ sampleinfo : [5 x 2] element <class 'numpy.ndarray'> samplerate : 1000.0 tag : None time : 5 element list trialinfo : [5 x 0] element <class 'numpy.ndarray'> trials : 5 element iterable Use `.log` to see object history Generate artificial :class:`~syncopy.AnalogData` object of more substantial size on disk .. code-block:: python >>> iAmBig = generate_artificial_data(nTrials=50, nChannels=1024, inmemory=False) >>> iAmBig Syncopy AnalogData object with fields cfg : dictionary with keys '' channel : [1024] element <class 'numpy.ndarray'> container : None data : 200 trials of length 3000 defined on [600000 x 1024] float32 Dataset of size 2.29 GB dimord : 2 element list filename : /Users/pantaray/.spy/spy_158f_b80715fe.analog mode : r+ sampleinfo : [200 x 2] element <class 'numpy.ndarray'> samplerate : 1000.0 tag : None time : 200 element list trialinfo : [200 x 0] element <class 'numpy.ndarray'> trials : 200 element iterable Use `.log` to see object history """ # Create dummy 1d signal that will be blown up to fill channels later dt = 0.001 t = np.arange(0, 3, dt, dtype="float32") - 1.0 sig = np.cos(2 * np.pi * (7 * (np.heaviside(t, 1) * t - 1) + 10) * t) # Depending on chosen `dimord` either get default position of time-axis # in `AnalogData` objects or use provided `dimord` and reshape signal accordingly if dimord == "default": dimord = AnalogData._defaultDimord timeAxis = dimord.index("time") idx = [1, 1] idx[timeAxis] = -1 sig = np.repeat(sig.reshape(*idx), axis=idx.index(1), repeats=nChannels) # Initialize random number generator (with possibly user-provided seed-value) rng = np.random.default_rng(seed) # Either construct the full data array in memory using tiling or create # an HDF5 container in `__storage__` and fill it trial-by-trial # NOTE: use `swapaxes` here to ensure two objects created w/same seed really # are affected w/identical additive noise patterns, no matter their respective # `dimord`. out = AnalogData(samplerate=1 / dt, dimord=dimord) if inmemory: idx[timeAxis] = nTrials sig = np.tile(sig, idx) shp = [slice(None), slice(None)] for iTrial in range(nTrials): shp[timeAxis] = slice(iTrial * t.size, (iTrial + 1) * t.size) noise = rng.standard_normal((t.size, nChannels)).astype(sig.dtype) * 0.5 sig[tuple(shp)] += np.swapaxes(noise, timeAxis, 0) out.data = sig else: with h5py.File(out.filename, "w") as h5f: shp = list(sig.shape) shp[timeAxis] *= nTrials dset = h5f.create_dataset("data", shape=tuple(shp), dtype=sig.dtype) shp = [slice(None), slice(None)] for iTrial in range(nTrials): shp[timeAxis] = slice(iTrial * t.size, (iTrial + 1) * t.size) noise = rng.standard_normal((t.size, nChannels)).astype(sig.dtype) * 0.5 dset[tuple(shp)] = sig + np.swapaxes(noise, timeAxis, 0) dset.flush() out.data = h5py.File(out.filename, "r+")["data"] # Define by-trial offsets to generate (non-)equidistant/(non-)overlapping trials trialdefinition = np.zeros((nTrials, 3), dtype="int") if equidistant: equiOffset = 0 if overlapping: equiOffset = 100 offsets = np.full((nTrials,), equiOffset, dtype=sig.dtype) else: offsets = rng.integers(low=int(0.1 * t.size), high=int(0.2 * t.size), size=(nTrials,)) # Using generated offsets, construct trialdef array and make sure initial # and end-samples are within data bounds (only relevant if overlapping # trials are built) shift = (-1) ** (not overlapping) for iTrial in range(nTrials): trialdefinition[iTrial, :] = np.array( [ iTrial * t.size - shift * offsets[iTrial], (iTrial + 1) * t.size + shift * offsets[iTrial], -1000, ] ) if equidistant: trialdefinition[0, :2] += equiOffset trialdefinition[-1, :2] -= equiOffset else: trialdefinition[0, 0] = 0 trialdefinition[-1, 1] = nTrials * t.size out.definetrial(trialdefinition) return out
def construct_spy_filename(basepath, obj): basename = os.path.split(basepath)[1] objext = _data_classname_to_extension(obj.__class__.__name__) return os.path.join(basepath + FILE_EXT["dir"], basename + objext) def figs_equal(fig1, fig2, tol=None): """ Test if two figures are identical Parameters ---------- fig1 : matplotlib figure object Reference figure fig2 : matplotlib figure object Template figure tol : float Positive scalar (b/w 0 and 1) specifying tolerance level for considering `fig1` and `fig2` identical. If `None`, two figures have to be exact pixel-perfect copies to be qualified as identical. Returns ------- equal : bool `True` if `fig1` and `fig2` are identical, `False` otherwise Notes ----- This is an auxiliary method that is intended purely for internal use. Thus, no error checking is performed. Examples -------- >>> import numpy as np >>> import matplotlib.pyplot as plt >>> x = np.linspace(0, 2*np.pi, 100) >>> fig1 = plt.figure(); plt.plot(x, np.sin(x)) >>> figs_equal(fig1, fig1) True >>> fig2 = plt.figure(); plt.plot(x, np.sin(x), color="red") >>> figs_equal(fig1, fig2) False >>> figs_equal(fig1, fig2, tol=0.9) True """ plt.draw_all(force=True) with tempfile.NamedTemporaryFile(suffix=".png", mode="w") as img1: with tempfile.NamedTemporaryFile(suffix=".png", mode="w") as img2: fig1.savefig(img1.name) fig2.savefig(img2.name) if tol is None: return np.array_equal(plt.imread(img1.name), plt.imread(img2.name)) return np.allclose(plt.imread(img1.name), plt.imread(img2.name), atol=tol) def flush_local_cluster(testcluster, timeout=10): """ Resets a parallel computing client to avoid memory spilling """ if isinstance(testcluster, dd.LocalCluster): # client.restart() client = dd.get_client() client.close() time.sleep(1.0) client = dd.Client(testcluster) waiting = 0 while ( len([w["memory_limit"] for w in testcluster.scheduler_info["workers"].values()]) == 0 and waiting < timeout ): time.sleep(1.0) waiting += 1 return