Source code for syncopy.datatype.methods.selectdata

# -*- coding: utf-8 -*-
#
# Syncopy data selection methods
#

# Builtin/3rd party package imports
import numpy as np
import h5py

# Local imports
import syncopy as spy
from syncopy.shared.tools import get_frontend_cfg, get_defaults
from syncopy.shared.parsers import data_parser
from syncopy.shared.errors import SPYValueError, SPYTypeError, SPYInfo, log
from syncopy.shared.kwarg_decorators import (
    unwrap_cfg,
    process_io,
    detect_parallel_client,
)
from syncopy.shared.computational_routine import ComputationalRoutine
from syncopy.shared.latency import get_analysis_window, create_trial_selection

__all__ = ["selectdata"]


[docs]@unwrap_cfg @detect_parallel_client def selectdata( data, trials=None, channel=None, channel_i=None, channel_j=None, latency=None, frequency=None, taper=None, unit=None, eventid=None, inplace=False, clear=False, **kwargs, ): """ Create a new Syncopy object from a selection **Usage Notice** Syncopy offers two modes for selecting data: * **in-place** selections mark subsets of a Syncopy data object for processing via a ``select`` dictionary *without* creating a new object * **deep-copy** selections copy subsets of a Syncopy data object to keep and preserve in a new object created by :func:`~syncopy.selectdata` All Syncopy metafunctions, such as :func:`~syncopy.freqanalysis`, support **in-place** data selection via a ``select`` keyword, effectively avoiding potentially slow copy operations and saving disk space. The keys accepted by the `select` dictionary are identical to the keyword arguments discussed below. In addition, ``select = "all"`` can be used to select entire object contents. Examples >>> select = {"toilim" : [-0.25, 0]} >>> spy.freqanalysis(data, select=select) >>> # or equivalently >>> cfg = spy.get_defaults(spy.freqanalysis) >>> cfg.select = select >>> spy.freqanalysis(cfg, data) **Usage Summary** List of Syncopy data objects and respective valid data selectors: :class:`~syncopy.AnalogData` : trials, channel, toi/toilim Examples >>> spy.selectdata(data, trials=[0, 3, 5], channel=["channel01", "channel02"]) >>> cfg = spy.StructDict() >>> cfg.trials = [5, 3, 0]; cfg.toilim = [0.25, 0.5] >>> spy.selectdata(cfg, data) :class:`~syncopy.SpectralData` : trials, channel, toi/toilim, foi/foilim, taper Examples >>> spy.selectdata(data, trials=[0, 3, 5], channel=["channel01", "channel02"]) >>> cfg = spy.StructDict() >>> cfg.foi = [30, 40, 50]; cfg.taper = slice(2, 4) >>> spy.selectdata(cfg, data) :class:`~syncopy.EventData` : trials, toi/toilim, eventid Examples >>> spy.selectdata(data, toilim=[-1, 2.5], eventid=[0, 1]) >>> cfg = spy.StructDict() >>> cfg.trials = [0, 0, 1, 0]; cfg.eventid = slice(2, None) >>> spy.selectdata(cfg, data) :class:`~syncopy.SpikeData` : trials, toi/toilim, unit, channel Examples >>> spy.selectdata(data, toilim=[-1, 2.5], unit=range(0, 10)) >>> cfg = spy.StructDict() >>> cfg.toi = [1.25, 3.2]; cfg.trials = [0, 1, 2, 3] >>> spy.selectdata(cfg, data) **Note** Any property that is not specifically accessed via one of the provided selectors is taken as is, e.g., ``spy.selectdata(data, trials=[1, 2])`` selects the entire contents of trials no. 2 and 3, while ``spy.selectdata(data, channel=range(0, 50))`` selects the first 50 channels of `data` across all defined trials. Consequently, if no keywords are specified, the entire contents of `data` is selected. **Full documentation below** Parameters ---------- data : Syncopy data object A non-empty Syncopy data object. **Note** the type of `data` determines which keywords can be used. Some keywords are only valid for certain types of Syncopy objects, e.g., "freqs" is not a valid selector for an :class:`~syncopy.AnalogData` object. trials : list (integers) or None or "all" List of integers representing trial numbers to be selected; can include repetitions and need not be sorted (e.g., ``trials = [0, 1, 0, 0, 2]`` is valid) but must be finite and not NaN. If `trials` is `None`, or ``trials = "all"`` all trials are selected. channel : list (integers or strings), slice, range, str, int, None or "all" Channel-selection; can be a list of channel names (``['channel3', 'channel1']``), a list of channel indices (``[3, 5]``), a slice (``slice(3, 10)``) or range (``range(3, 10)``). Note that following Python conventions, channels are counted starting at zero, and range and slice selections are half-open intervals of the form `[low, high)`, i.e., low is included , high is excluded. Thus, ``channel = [0, 1, 2]`` or ``channel = slice(0, 3)`` selects the first up to (and including) the third channel. Selections can be unsorted and may include repetitions but must match exactly, be finite and not NaN. If `channel` is `None`, or ``channel = "all"`` all channels are selected. latency : [begin, end], {'maxperiod', 'minperiod', 'prestim', 'poststim', 'all'} or None Either set desired time window (`[begin, end]`) in seconds, 'maxperiod' (default) for the maximum period available or `'minperiod' for minimal time-window all trials share, or `'prestim'` (all t < 0) or `'poststim'` (all t > 0) If set this will apply a selection which is timelocked, meaning non-fitting (effectively too short) trials will be excluded frequency : list (floats [fmin, fmax]) or None or "all" Frequency-window ``[fmin, fmax]`` (in Hz) to be extracted. Window specifications must be sorted (e.g., ``[90, 70]`` is invalid) and not NaN but may be unbounded (e.g., ``[-np.inf, 60.5]`` is valid). Edges `fmin` and `fmax` are included in the selection. If `foilim` is `None` or ``foilim = "all"``, all frequencies are selected. taper : list (integers or strings), slice, range, str, int, None or "all" Taper-selection; can be a list of taper names (``['dpss-win-1', 'dpss-win-3']``), a list of taper indices (``[3, 5]``), a slice (``slice(3, 10)``) or range (``range(3, 10)``). Note that following Python conventions, tapers are counted starting at zero, and range and slice selections are half-open intervals of the form `[low, high)`, i.e., low is included , high is excluded. Thus, ``taper = [0, 1, 2]`` or ``taper = slice(0, 3)`` selects the first up to (and including) the third taper. Selections can be unsorted and may include repetitions but must match exactly, be finite and not NaN. If `taper` is `None` or ``taper = "all"``, all tapers are selected. unit : list (integers or strings), slice, range, str, int, None or "all" Unit-selection; can be a list of unit names (``['unit10', 'unit3']``), a list of unit indices (``[3, 5]``), a slice (``slice(3, 10)``) or range (``range(3, 10)``). Note that following Python conventions, units are counted starting at zero, and range and slice selections are half-open intervals of the form `[low, high)`, i.e., low is included , high is excluded. Thus, ``unit = [0, 1, 2]`` or ``unit = slice(0, 3)`` selects the first up to (and including) the third unit. Selections can be unsorted and may include repetitions but must match exactly, be finite and not NaN. If `unit` is `None` or ``unit = "all"``, all units are selected. eventid : list (integers), slice, range, int, None or "all" Event-ID-selection; can be a list of event-id codes (``[2, 0, 1]``), slice (``slice(0, 2)``) or range (``range(0, 2)``). Note that following Python conventions, range and slice selections are half-open intervals of the form `[low, high)`, i.e., low is included , high is excluded. Selections can be unsorted and may include repetitions but must match exactly, be finite and not NaN. If `eventid` is `None` or ``eventid = "all"``, all events are selected. inplace : bool If `inplace` is `True` **no** new object is created. Instead the provided selection is stored in the input object's `selection` attribute for later use. By default `inplace` is `False` and all calls to `selectdata` create a new Syncopy data object. clear : bool If `True` remove any active in-place selection. Note that in-place selections can also be removed manually by assinging `None` to the `selection` property, i.e., ``mydata.selection = None`` is equivalent to ``spy.selectdata(mydata, clear=True)`` or ``mydata.selectdata(clear=True)`` Returns ------- dataselection : Syncopy data object Syncopy data object of the same type as `data` but containing only the subset specified by provided selectors. Notes ----- This routine represents a convenience function for creating new Syncopy objects based on existing data entities. However, in many situations, the creation of a new object (and thus the allocation of additional disk-space) might not be necessary: all Syncopy metafunctions, such as :func:`~syncopy.freqanalysis`, support **in-place** data selection. Consider the following example: assume `data` is an :class:`~syncopy.AnalogData` object representing 220 trials of LFP recordings containing baseline (between second -0.25 and 0) and stimulus-on data (on the interval [0.25, 0.5]). To compute the baseline spectrum, data-selection does **not** have to be performed before calling :func:`~syncopy.freqanalysis` but instead can be done in-place: >>> import syncopy as spy >>> cfg = spy.get_defaults(spy.freqanalysis) >>> cfg.method = 'mtmfft' >>> cfg.taper = 'dpss' >>> cfg.output = 'pow' >>> cfg.tapsmofrq = 10 >>> # define baseline/stimulus-on ranges >>> baseSelect = {"toilim": [-0.25, 0]} >>> stimSelect = {"toilim": [0.25, 0.5]} >>> # in-place selection of baseline interval performed by `freqanalysis` >>> cfg.select = baseSelect >>> baselineSpectrum = spy.freqanalysis(cfg, data) >>> # in-place selection of stimulus-on time-frame performed by `freqanalysis` >>> cfg.select = stimSelect >>> stimonSpectrum = spy.freqanalysis(cfg, data) Especially for large data-sets, in-place data selection performed by Syncopy's metafunctions does not only save disk-space but can significantly increase performance. Examples -------- Use :func:`~syncopy.tests.misc.generate_artificial_data` to create a synthetic :class:`syncopy.AnalogData` object. >>> from syncopy.tests.misc import generate_artificial_data >>> adata = generate_artificial_data(nTrials=10, nChannels=32) Assume a hypothetical trial onset at second 2.0 with the first second of each trial representing baseline recordings. To extract only the stimulus-on period from `adata`, one could use >>> stimon = spy.selectdata(adata, toilim=[2.0, np.inf]) Note that this is equivalent to >>> stimon = adata.selectdata(toilim=[2.0, np.inf]) See also -------- :func:`syncopy.show` : Show (subsets) of Syncopy objects """ # Ensure our one mandatory input is usable data_parser(data, varname="data", empty=False) # Vet the only inputs not checked by `Selector` if not isinstance(inplace, bool): raise SPYTypeError(inplace, varname="inplace", expected="Boolean") if not isinstance(clear, bool): raise SPYTypeError(clear, varname="clear", expected="Boolean") # there is no `@unwrap_select` decorator in place here, # a `select` dictionary must therefore be directly passed via ** unpacking: # select = {'channel': [0]}; spy.selectdata(data, **select) if "select" in kwargs: lgl = "unpacked selection keywords directly, try `**select`" act = "`select` as explicit parameter" raise SPYValueError(legal=lgl, varname="selection kwargs", actual=act) # get input arguments into cfg dict new_cfg = get_frontend_cfg(get_defaults(selectdata), locals(), kwargs) if not inplace: out = data.__class__(dimord=data.dimord) # First collect all available keyword values into a dict selectDict = { "trials": trials, "channel": channel, "channel_i": channel_i, "channel_j": channel_j, "latency": latency, "frequency": frequency, "taper": taper, "unit": unit, "eventid": eventid, } # relevant selection keywords for the type of `data` expected = list(data._selectionKeyWords) # filter out typos like 'trails' if len(kwargs) > 0: kwargs.pop("parallel", None) if any([key not in expected for key in kwargs]): lgl = ( f"the following keywords for {data.__class__.__name__}: '" + "'".join(opt + "', " for opt in expected)[:-2] ) lgl += " and 'inplace', 'clear', 'parallel'" act = "dict with keys '" + "'".join(key + "', " for key in kwargs.keys())[:-2] raise SPYValueError(legal=lgl, varname="selection kwargs", actual=act) # get out if unsuitable selection keywords given, e.g. 'frequency' for AnalogData for key, value in selectDict.items(): if key not in expected and value is not None: lgl = f"one of {data.__class__._selectionKeyWords}" act = f"no `{key}` selection available for {data.__class__.__name__}" raise SPYValueError(lgl, "selection arguments", act) # now just keep going with the selection keys relevant for that particular data type selectDict = {key: selectDict[key] for key in data._selectionKeyWords} # First simplest case: determine whether we just need to clear an existing selection if clear: if any(value is not None for value in selectDict.values()): lgl = "no data selectors if `clear = True`" raise SPYValueError(lgl, varname="select", actual=selectDict) if data.selection is None: SPYInfo("No in-place selection found. ") else: data.selection = None SPYInfo("In-place selection cleared") return # first do a selection without latency as a possible subselection # of trials needs to be applied before the latency digesting functions # can be called (if the user by himself throws out non-fitting trials) selectDict.pop("latency") # Pass provided selections on to `Selector` class which performs error checking # this is an in-place selection! data.selection = selectDict # -- sort out trials if latency is set -- if latency is not None: if not isinstance(latency, str) or latency != "all": # sanity check done here, converts str arguments # ('maxperiod' and so on) into time window [start, end] of analysis window = get_analysis_window(data, latency) # this respects active inplace selections and # might update the trial selection to exclude non-fitting trials selectDict, numDiscard = create_trial_selection(data, window) if numDiscard > 0: msg = f"Discarded {numDiscard} trial(s) which did not fit into latency window" SPYInfo(msg) # update inplace selection selectDict["latency"] = window data.selection = selectDict # If an in-place selection was requested we're done. if inplace: # attach frontend parameters for replay data.cfg.update({"selectdata": new_cfg}) return # Inform the user what's about to happen selectionSize = _get_selection_size(data) if selectionSize > 1000: selectionSize /= 1024 sUnit = "GB" msg = ( "Copying {dsize:3.2f} {dunit:s} of data based on selection " + "to create new {objkind:s} object on disk" ) SPYInfo(msg.format(dsize=selectionSize, dunit=sUnit, objkind=data.__class__.__name__)) # Create inventory of all available selectors and actually provided values # to create a bookkeeping dict for logging log_dct = {"inplace": inplace, "clear": clear, "latency": latency} log_dct.update(selectDict) log_dct.update(**kwargs) # Fire up `ComputationalRoutine`-subclass to do the actual selecting/copying selectMethod = DataSelection() selectMethod.initialize(data, out._stackingDim, chan_per_worker=kwargs.get("chan_per_worker")) selectMethod.compute(data, out, parallel=kwargs.get("parallel"), log_dict=log_dct) # Handle selection of waveform for SpikeData objects if type(data) == spy.SpikeData and data.waveform is not None: if inplace: spy.log( "Inplace selection of SpikeData with waveform not supported for the waveform.", level="WARNING", ) else: fauxTrials = [data._preview_trial(trlno) for trlno in data.selection.trial_ids] spikes_by_trial = [f.idx[0] for f in fauxTrials] spike_idx = np.concatenate([np.array(x).ravel() for x in spikes_by_trial]) # Copy the proper subset of the waveform dataset to `out`, the new `SpikeData` object. hdf5_file_in = data._get_backing_hdf5_file_handle() hdf5_file_out = out._get_backing_hdf5_file_handle() # Copy the waveform dataset into the new file, trial by trial to prevent memory issues. ds = hdf5_file_out.create_dataset( "waveform", shape=(len(spike_idx), *data.waveform.shape[1:]), dtype=data.waveform.dtype, ) cur_new_idx = 0 for tidx, old_trial_indices in enumerate(spikes_by_trial): num_spikes_this_trial = len(old_trial_indices) new_indices = np.s_[cur_new_idx : cur_new_idx + num_spikes_this_trial] ds[new_indices, :, :] = hdf5_file_in["/waveform"][old_trial_indices, :, :] cur_new_idx = new_indices.stop out.waveform = ds # Wipe data-selection slot to not alter input object data.selection = None # attach cfg out.cfg.update(data.cfg) out.cfg.update({"selectdata": new_cfg}) # return newly created output object return out
def _get_selection_size(data): """ Local helper routine for computing the on-disk size of an active data-selection """ fauxTrials = [data._preview_trial(trlno) for trlno in data.selection.trial_ids] fauxSizes = [np.prod(ftrl.shape) * ftrl.dtype.itemsize for ftrl in fauxTrials] return sum(fauxSizes) / 1024**2 @process_io def _selectdata(trl, noCompute=False, chunkShape=None): if noCompute: return trl.shape, trl.dtype return trl class DataSelection(ComputationalRoutine): computeFunction = staticmethod(_selectdata) def process_metadata(self, data, out): # Get/set timing-related selection modifiers out.trialdefinition = data.selection.trialdefinition # if data.selection._timeShuffle: # FIXME: should be implemented down the road # out.time = data.selection.timepoints if data.selection._samplerate: out.samplerate = data.samplerate # Get/set dimensional attributes changed by selection for prop in data.selection._dimProps: selection = getattr(data.selection, prop) if selection is not None: if np.issubdtype(type(selection), np.number): selection = [selection] setattr(out, prop, getattr(data, prop)[selection])