Design Guide: Syncopy Compute Classes#
A compute class represents the centerpiece of a Syncopy analysis routine.
The abstract base class
ComputationalRoutine is the concrete
realization of a general-purpose computing object. This class provides a
blueprint for implementing algorithmic strategies in Syncopy. Every
computational method in Syncopy consists of a core routine, the
computeFunction(), which can be executed either sequentially or fully in parallel. To unify
common instruction sequences and minimize code redundancy, Syncopy’s
ComputationalRoutine manages all pre- and post-processing steps
necessary during preparation and after termination of a calculation. This
permits developers to focus exclusively on the implementation of the actual
algorithmic details when including a new computational method in Syncopy.
In order for ComputationalRoutine to perform all required
computational management tasks, a
computeFunction() has to satisfy a few basic
requirements. Syncopy leverages a hierarchical parallelization paradigm
whose low-level foundation is represented by trial-based parallelism (its
open-ended higher levels may comprise by-object, by-experiment or
by-session parallelization). Thus, with
computeFunction() representing the
computational core of an (arbitrarily complex) overarching algorithm, it
has to be structured to support trial-based parallel computing.
Specifically, this means the scope of work of a
computeFunction() is a single
trial. Note that this also implies that any parallelism integrated in
computeFunction() has to be designed with higher-level parallel execution in mind
(e.g., concurrent processing of sessions on top of trials).
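The following sketch illustrates what restricting a computeFunction() to a single trial means in practice. The per-trial function knows nothing about other trials, and the loop over trials (sequential or concurrent) lives outside of it. The names demean_trial and run_trials are hypothetical. A standard-library thread pool merely stands in for Syncopy's dask-based parallel execution.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def demean_trial(arr, chunkShape=None, noCompute=None):
    """Hypothetical compute function: its scope of work is exactly one trial."""
    if noCompute:
        # dry-run: report expected shape and dtype without computing
        return arr.shape, arr.dtype
    return arr - arr.mean(axis=0)


def run_trials(trials, parallel=False):
    """Hypothetical driver: apply the per-trial function to every trial.

    The iteration over trials happens here, not inside the compute function,
    mirroring the trial-based lowest level of the parallelization hierarchy.
    """
    if parallel:
        with ThreadPoolExecutor() as pool:
            return list(pool.map(demean_trial, trials))
    return [demean_trial(trl) for trl in trials]


rng = np.random.default_rng(0)
trials = [rng.standard_normal((100 + 10 * k, 4)) for k in range(3)]
seq = run_trials(trials)
par = run_trials(trials, parallel=True)
```

Because the function only ever sees one trial, the sequential and concurrent drivers produce identical results.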
computeFunction() is a regular stand-alone Python function (not a
class method) that accepts a
numpy.ndarray as its first positional
argument and supports (at least) the two keyword arguments chunkShape and
noCompute. The numpy.ndarray represents aggregate data from one
trial (only data, no meta-information). Any required meta-info (such as
channel labels, trial definition records etc.) has to be passed to
computeFunction() either as additional (2nd and onward) positional or named keyword arguments
(chunkShape and noCompute are the only reserved keywords).
The return values of
computeFunction() are controlled by the noCompute keyword. During the
actual computation (i.e., if noCompute is not True),
computeFunction() returns exactly one
numpy.ndarray representing the
result of processing data from a single trial. The noCompute keyword is
used to perform a ‘dry-run’ of the processing operations to propagate the
expected numerical type and memory footprint of the result to
ComputationalRoutine without actually performing any
calculations. To optimize performance,
ComputationalRoutine uses the information gathered in the dry-runs for each trial to allocate
identically-sized array-blocks accommodating the largest (by shape)
result-array across all trials. In this manner a global block-size is
identified, which can subsequently be accessed inside
computeFunction() via the
chunkShape keyword during the actual computation.
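A minimal sketch of how per-trial dry-runs can be turned into one global block size is shown below. It rests on several assumptions. "Largest (by shape)" is taken to mean the element-wise maximum over all trial output shapes. The compute function downsample and the helpers dry_run and global_chunk_shape are hypothetical. In-memory NumPy blocks stand in for the HDF5 storage Syncopy actually uses.

```python
import numpy as np


def downsample(arr, chunkShape=None, noCompute=None):
    """Hypothetical compute function: keep every second sample of one trial."""
    nOut = arr.shape[0] // 2
    if noCompute:
        # dry-run: report expected output shape and dtype, do no work
        return (nOut, arr.shape[1]), arr.dtype
    return arr[: 2 * nOut : 2]


def dry_run(cfunc, trials):
    """Collect the (shape, dtype) pair that cfunc reports for every trial."""
    return [cfunc(trl, noCompute=True) for trl in trials]


def global_chunk_shape(specs):
    """Element-wise maximum over all per-trial output shapes (an assumption)."""
    shapes = np.array([shape for shape, _ in specs])
    return tuple(int(n) for n in shapes.max(axis=0))


rng = np.random.default_rng(42)
trials = [rng.standard_normal((n, 3)) for n in (100, 121, 90)]

specs = dry_run(downsample, trials)
chunkShape = global_chunk_shape(specs)
outdtype = np.result_type(*[dt for _, dt in specs])

# one identically sized block per trial, large enough for any trial's result
blocks = np.zeros((len(trials),) + chunkShape, dtype=outdtype)
for k, trl in enumerate(trials):
    res = downsample(trl, chunkShape=chunkShape)
    blocks[k, : res.shape[0], : res.shape[1]] = res
```

Trials with shorter results leave the tail of their block untouched, which is the price paid for uniform block sizes.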
Summarized, a valid
computeFunction(), cfunc, meets the following basic requirements:
>>> def cfunc(arr, arg1, arg2, ..., argN, chunkShape=None, noCompute=None, **kwargs)
where arr is a
numpy.ndarray representing trial data, arg1, …, argN are arbitrary positional arguments and chunkShape (a tuple if not None) as well as noCompute (bool if not None) are reserved keywords.
During the dry-run phase, i.e., if noCompute is True, the expected output shape and its
numpy.dtype are returned, otherwise the result of the computation (a
numpy.ndarray) is returned:
>>> def cfunc(arr, arg1, arg2, ..., argN, chunkShape=None, noCompute=None, **kwargs):
...     # determine expected output shape and numerical type...
...     if noCompute:
...         return outShape, outdtype
...     # the actual computation is happening here...
...     return res
Note that the shape and dtype of res have to agree with outShape and outdtype specified in the dry-run.
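As a concrete instance of this template, the hypothetical compute function below receives channel indices as meta-information through an additional positional argument. It also accepts an ordinary (non-reserved) keyword, gain. Its dry-run returns exactly the shape and dtype of the real result. The names select_and_scale, chanIdx and gain are illustrative only.

```python
import numpy as np


def select_and_scale(arr, chanIdx, chunkShape=None, noCompute=None, gain=1.0):
    """Hypothetical compute function following the cfunc template.

    arr     : trial data (samples x channels), the first positional argument
    chanIdx : meta-information passed as an additional positional argument
    gain    : an ordinary keyword argument (not reserved)
    """
    outShape = (arr.shape[0], len(chanIdx))
    outdtype = np.result_type(arr.dtype, np.float32)
    if noCompute:
        # dry-run: only report what the real computation will produce
        return outShape, outdtype
    # actual computation; cast so the dtype matches the dry-run exactly
    res = (arr[:, chanIdx] * gain).astype(outdtype)
    return res


trial = np.arange(12, dtype=np.int16).reshape(4, 3)
outShape, outdtype = select_and_scale(trial, [0, 2], noCompute=True)
res = select_and_scale(trial, [0, 2], gain=0.5)
```

The explicit cast in the last step guarantees that the dry-run and the actual result agree, even when NumPy's type promotion would otherwise produce a wider dtype.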
A simple instance of a
computeFunction() illustrating these concepts
is given in Examples.
The Algorithmic Layout of ComputationalRoutine#
ComputationalRoutine wraps an external
computeFunction() by executing all necessary auxiliary routines leading up to and following
the termination of the actual computation (memory pre-allocation, generation of
parallel/sequential instruction trees, processing and storage of results,
etc.). ComputationalRoutine is an abstract base
class that can represent any trial-concurrent computational tree. Thus, any
arbitrarily complex algorithmic pattern satisfying this single criterion
can be incorporated as a regular class into Syncopy with minimal
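implementation effort by simply inheriting from
ComputationalRoutine.
Internally, the operational principle of a
ComputationalRoutine is encapsulated in two class methods,
initialize() and compute(), which are invoked on an instance of a concrete subclass.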
The class is instantiated with (at least) the positional and keyword arguments of the associated
computeFunction() minus the trial-data array (the first positional argument of
computeFunction()) and the reserved keywords chunkShape and noCompute. Further, an additional keyword is reserved at class instantiation time: keeptrials controls whether data is averaged across trials after calculation (keeptrials = False). Thus, let Algo be a concrete subclass of
ComputationalRoutine, and let cfunc, defined akin to above
>>> def cfunc(arr, arg1, arg2, argN, chunkShape=None, noCompute=None, kwarg1="this", kwarg2=False)
be its corresponding
computeFunction(). Then a valid instantiation of Algo may look as follows:
>>> algorithm = Algo(arg1, arg2, argN, kwarg1="this", kwarg2=False)
Now algorithm is a regular Python class instance that inherits all required attributes from the parent base class
ComputationalRoutine. Note that ComputationalRoutine uses regular Python class attributes (__dict__ keys, not slots) to ensure maximal design flexibility for implementing novel computational strategies while keeping memory overhead limited due to the encapsulation of the actual computational workload in the static method computeFunction().
Before the algorithm instance of Algo can be used, a dry-run of the actual computation has to be performed by calling initialize() to determine the expected dimensionality and numerical type of the result,
>>> algorithm.initialize(data)
where data is a Syncopy data object representing the input quantity to be processed by algorithm.
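To make the division of labor between instantiation and initialize() more tangible, here is a heavily simplified, hypothetical stand-in named MiniRoutine. It is not Syncopy's ComputationalRoutine. It only mimics the two steps just described. The constructor stores the forwarded arguments as ordinary __dict__ attributes and reserves keeptrials. initialize() runs the dry-run over all trials. The class names, the scale function and the choice to pass a plain list of arrays instead of a Syncopy data object are all assumptions.

```python
import numpy as np


class MiniRoutine:
    """Hypothetical, heavily simplified stand-in for ComputationalRoutine.

    Only the argument handling of __init__ and the dry-run performed by
    initialize() are mimicked here.
    """

    computeFunction = None  # concrete subclasses bind a compute function

    def __init__(self, *argv, keeptrials=True, **cfg):
        # keeptrials is reserved at instantiation; all other arguments are
        # stored as ordinary __dict__ attributes and forwarded later on
        self.argv = list(argv)
        self.cfg = dict(cfg)
        self.keeptrials = keeptrials
        self.chunkShape = None
        self.dtype = None

    def initialize(self, trials):
        """Dry-run over all trials: collect shapes and dtypes, compute nothing."""
        specs = [
            self.computeFunction(trl, *self.argv, noCompute=True, **self.cfg)
            for trl in trials
        ]
        shapes = np.array([shape for shape, _ in specs])
        self.chunkShape = tuple(int(n) for n in shapes.max(axis=0))
        self.dtype = np.result_type(*[dt for _, dt in specs])


def scale(arr, factor, chunkShape=None, noCompute=None, offset=0.0):
    """Hypothetical compute function with one positional and one keyword argument."""
    if noCompute:
        return arr.shape, arr.dtype
    return arr * factor + offset


class Scaler(MiniRoutine):
    computeFunction = staticmethod(scale)


# mirrors Algo(arg1, ..., kwarg1=...): trial data and reserved keywords are omitted
algorithm = Scaler(2.0, offset=1.0)
algorithm.initialize([np.ones((5, 2)), np.ones((8, 2))])
```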
The compute() management method constitutes the functional core of
ComputationalRoutine. It handles memory pre-allocation, storage provisioning, the actual computation and processing of meta-information. These tasks are encapsulated in distinct class methods which are designed to perform the respective operations independently from the concrete computational procedure. Thus, most of these methods do not require any problem-specific adaptations and act as stand-alone administration routines. The only exception to this design concept is
process_metadata(), which is intended to attach meta-information to the final output object. Since modifications of meta-data are highly dependent on the nature of the performed calculation,
process_metadata() is the only abstract method of
ComputationalRoutine that needs to be supplied in addition to
computeFunction().
Several keywords control the workflow in compute():
Depending on the parallel keyword, processing is done either sequentially trial by trial (parallel = False) or concurrently across all trials (parallel = True). The two scenarios are handled by separate class methods,
compute_sequential() and compute_parallel(), respectively, that use independent operational frameworks for processing. However, both
compute_sequential() and compute_parallel() call an external
computeFunction() to perform the actual calculation.
The parallel_store keyword controls the employed storage mechanism: if True, the result of the computation is written in a fully concurrent manner where each worker saves its locally held data segment on disk leveraging the distributed access capabilities of virtual HDF5 datasets. If
If parallel_store = False and parallel is True, a mutex is used to lock a single HDF5 file for sequential writing. If parallel = parallel_store = False, the computation result is saved using standard single-process HDF5 writing.
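The three storage scenarios can be summarized as a small decision function. This is a hypothetical helper for illustration, not part of Syncopy. The combination parallel = False with parallel_store = True is not covered by the description above. The sketch assumes it falls back to single-process writing.

```python
def storage_mode(parallel, parallel_store):
    """Hypothetical helper mapping compute() keywords to a write strategy.

    Assumption: parallel_store only takes effect when parallel is True; the
    combination parallel=False, parallel_store=True is treated here like
    standard single-process writing.
    """
    if parallel and parallel_store:
        return "concurrent: each worker writes its segment, joined via a virtual HDF5 dataset"
    if parallel:
        return "mutex-locked sequential writes to a single HDF5 file"
    return "standard single-process HDF5 writing"
```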
The method keyword can be used to override the default selection of the processing function (compute_parallel() if parallel is True or compute_sequential() otherwise). Refer to the docstrings of compute_parallel() or compute_sequential() for details on the required structure of a concurrent or serial processing function.
The keyword log_dict can be used to provide a dictionary of keyword-value pairs that are passed on to process_metadata() to be attached to the final output object.
Going back to the exemplary algorithm instance of Algo discussed above, after initialization, the actual computation is kicked off with a single call of compute() with keywords pursuant to the intended computational workflow. For instance,
>>> algorithm.compute(data, out, parallel=True)
launches the parallel processing of data using the computational scheme implemented in cfunc and stores the result in the Syncopy object out.
To further clarify these concepts, Examples illustrates how to
encapsulate a simple algorithmic scheme in a subclass of
ComputationalRoutine that calls a custom
computeFunction().
Examples#
Consider the following example illustrating the implementation of a
(deliberately simple) filtering routine by subclassing
ComputationalRoutine and designing a
computeFunction().
As a first step, a
computeFunction() is defined:
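>>> import numpy as np
>>> from scipy import signal
>>> def lowpass(arr, b, a, noCompute=None, chunkShape=None):
...     # dry-run: the filtered output has the same shape and dtype as the input
...     if noCompute:
...         return arr.shape, arr.dtype
...     # zero-phase filtering along the time axis (rows of arr); casting back
...     # keeps the result consistent with the dtype reported in the dry-run
...     res = signal.filtfilt(b, a, arr.T, padlen=200).T
...     return res.astype(arr.dtype, copy=False)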
As detailed above, the first positional argument of lowpass is a
numpy.ndarray representing numerical data from a single trial, the
second and third positional arguments, b and a respectively, represent
filter coefficients. The only keyword arguments of lowpass are the
mandatory reserved keywords noCompute and chunkShape. With the
computeFunction() in place, a subclass of
ComputationalRoutine can be implemented:
>>> from syncopy.shared.computational_routine import ComputationalRoutine
>>> class LowPassFilter(ComputationalRoutine):
...     # bind the compute function as a static method
...     computeFunction = staticmethod(lowpass)
...
...     def process_metadata(self, data, out):
...         if self.keeptrials:
...             # keep the trial layout of the input
...             out.sampleinfo = np.array(data.sampleinfo)
...             out.trialinfo = np.array(data.trialinfo)
...             out._t0 = np.zeros((len(data.trials),))
...         else:
...             # trials were averaged: describe a single resulting trial
...             trl = np.array([[0, data.sampleinfo[0, 1], 0]])
...             out.sampleinfo = trl[:, :2]
...             out._t0 = trl[:, 2]
...             out.trialinfo = trl[:, 3:]
...         out.samplerate = data.samplerate
...         out.channel = np.array(data.channel)
Note that LowPassFilter simply binds the
computeFunction() lowpass as a static
method; no additional modifications are required. It further provides
process_metadata as a regular class method for setting all required
attributes of the output object out.
Suppose data is a Syncopy
AnalogData object holding data to be
filtered. To use the introduced filtering routine, the concrete class
LowPassFilter has to be instantiated first:
>>> myfilter = LowPassFilter(b, a)
This step performs the actual class initialization and sets up the attributes inherited from
ComputationalRoutine. Next, all necessary
pre-calculation management tasks need to be performed:
>>> myfilter.initialize(data)
Now, the myfilter instance holds references to the expected shape of the resulting output and its numerical type. The actual filtering is then performed by first allocating an empty Syncopy object for the result
>>> out = spy.AnalogData()
and subsequently invoking
>>> myfilter.compute(data, out)
This call performs several tasks: first, an HDF5 data-set of appropriate dimensions is allocated, then the actual filtering is performed sequentially, in a trial-by-trial succession (results are stored in the created HDF5 data-set, which is subsequently attached to out), and finally meta-data is written to the output object out using the supplied process_metadata class method.
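The three phases of such a sequential call (allocate, compute trial by trial, attach metadata) can be mimicked in a few lines of NumPy. This is a hypothetical mock, not Syncopy's implementation. A preallocated array replaces the HDF5 dataset, and MockOutput replaces a Syncopy data object. For simplicity, trials are concatenated along the first axis, as the sampleinfo bookkeeping in process_metadata suggests, rather than stored in identically sized blocks. The compute function halve assumes floating-point input.

```python
import numpy as np


def halve(arr, chunkShape=None, noCompute=None):
    """Hypothetical compute function (expects floating-point input)."""
    if noCompute:
        return arr.shape, arr.dtype
    return arr / 2


class MockOutput:
    """Hypothetical stand-in for an empty Syncopy data object."""

    def __init__(self):
        self.data = None
        self.sampleinfo = None


def mock_compute(cfunc, trials, out):
    """Mimic the three phases of a sequential compute() call."""
    # 1) allocate: size the output from the dry-run (in place of an HDF5 dataset)
    specs = [cfunc(trl, noCompute=True) for trl in trials]
    nRows = sum(shape[0] for shape, _ in specs)
    outdtype = np.result_type(*[dt for _, dt in specs])
    buffer = np.empty((nRows,) + tuple(specs[0][0][1:]), dtype=outdtype)

    # 2) compute: process trials one after another, writing results in place
    bounds = []
    start = 0
    for trl in trials:
        res = cfunc(trl)
        stop = start + res.shape[0]
        buffer[start:stop] = res
        bounds.append((start, stop))
        start = stop

    # 3) attach the data and simple meta-information to the output object
    out.data = buffer
    out.sampleinfo = np.array(bounds)
    return out


trials = [np.ones((3, 2)), np.full((5, 2), 4.0)]
out = mock_compute(halve, trials, MockOutput())
```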
To perform the calculation in a trial-concurrent manner, first launch a
dask client (using e.g., spy.esi_cluster_setup()), then re-initialize
the myfilter instance (to reset its attributes) and simply call compute
with the parallel keyword set to True:
>>> client = spy.esi_cluster_setup()
>>> myfilter.initialize(data)
>>> myfilter.compute(data, out, parallel=True)
For realizing more complex mechanisms, consult the implementations of
syncopy.freqanalysis() or other metafunctions in Syncopy.
Additional return values for backend and compute functions#
Description and Overview#
A compute function (cF) can pass a 2nd return value (a dict), which will be attached automatically and temporarily to the (virtual or non-virtual) hdf5 files used by the compute backend (cF + process_io wrapper) to store results. The returned dict is subject to various limitations of hdf5 container ‘attributes’, including:
- keys must be strings
- values must be ndarrays with dtype != object, and a size not exceeding 64 kB of data.
Note that your backend function can return whatever it wants as a 2nd return value, as long as you adapt/encode this in the cF to a dict following the rules mentioned above.
The return value will get attached to the hdf5 file(s) used to store computation results, and can later be extracted from the hdf5 file(s) in process_metadata.
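As a minimal sketch of these rules in practice, the hypothetical cF below receives a free-form dict from a hypothetical backend. It encodes the Python scalars as small ndarrays and checks the result against the attribute limits listed above before returning it as the 2nd value. The names backend_fit, fit_cF and check_h5_attrs are illustrative assumptions. So is the choice to have the dry-run return only shape and dtype.

```python
import numpy as np

MAX_ATTR_BYTES = 64 * 1024  # 64 kB limit per value, as listed above


def check_h5_attrs(metadata):
    """Raise if a dict violates the hdf5-attribute rules listed above."""
    for key, val in metadata.items():
        if not isinstance(key, str):
            raise TypeError(f"metadata key {key!r} is not a string")
        if not isinstance(val, np.ndarray):
            raise TypeError(f"metadata value for {key!r} is not an ndarray")
        if val.dtype == object:
            raise TypeError(f"metadata value for {key!r} has dtype object")
        if val.nbytes > MAX_ATTR_BYTES:
            raise ValueError(f"metadata value for {key!r} exceeds 64 kB")
    return metadata


def backend_fit(arr):
    """Hypothetical backend: a result plus a free-form 2nd return value."""
    details = {"all_finite": bool(np.isfinite(arr).all()), "n_samples": arr.shape[0]}
    return arr.mean(axis=0), details


def fit_cF(arr, chunkShape=None, noCompute=None):
    """Hypothetical cF passing on an hdf5-compatible dict as 2nd return value."""
    if noCompute:
        return (1, arr.shape[1]), np.dtype(np.float64)
    res, details = backend_fit(arr)
    # encode the backend's Python scalars as small ndarrays
    metadata = {
        "all_finite": np.array([details["all_finite"]]),
        "n_samples": np.array([details["n_samples"]]),
    }
    return res.astype(np.float64)[np.newaxis, :], check_h5_attrs(metadata)


res, meta = fit_cF(np.ones((10, 3)))
```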
Using the 2nd return value when writing a cF#
Users (as in: developers writing compute functions) are supposed to do the following:
- Have their backend function return a 2nd value (typically a dict, but this is not required).
- In the cF, accept the 2nd value returned from the backend function, remove or encode parts of it, and put them into a dict. All entries in the dict must comply with the hdf5 attribute rules mentioned above. Then pass this modified return value on (i.e., return the modified dict as a 2nd return value of the cF).
- Automatically: now the dict items get added to the hdf5 file as attributes, attached to a new hdf5 group called metadata, that is temporarily added to the hdf5 container(s). (Note that there may exist a single hdf5 container in the case of sequential storage, or several containers in case of parallel storage using virtual datasets. You do not need to worry about this.)
- In your process_metadata implementation (with signature def process_metadata(self, data, out):), call my_metadata = metadata_from_hdf5_file(out.filename) to obtain the metadata as a dictionary. (Note: By default, this will remove the added data from the hdf5 file(s) after retrieving it.)
- Do whatever you want with the metadata in process_metadata, e.g.:
  - Check the return value for signs that something went wrong in the backend/cF, and raise exceptions or print warnings accordingly.
  - To recompute absolute trial indices from relative ones (if a selection was active in the input data, and that makes sense for your cF), you can optionally call my_metadata_absind = metadata_trial_indices_abs(my_metadata, data.selection) on the collected metadata.
  - If you did some special encoding in the cF to fit the data into the dict with the hdf5 ‘attribute’ limitations, you may want to undo this.
  - To pass it to the frontend/user, one could add it to the info property or the log of out, or attach it to the Syncopy data instance out as a new attribute. Keep in mind that such an attribute will not be saved in that case, but the user calling the frontend will have access to it.
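A common case of "special encoding" is a list of text messages from the backend. Such a list cannot be stored as-is, because dtype=object is ruled out and h5py cannot store NumPy's unicode ('U') dtype. The sketch below encodes the messages as fixed-width bytes in the cF and decodes them again in process_metadata, re-emitting them as warnings. The helper names and the metadata key "warnings" are hypothetical. The metadata argument is assumed to be a plain dict mapping keys to ndarrays, so the exact key layout returned by metadata_from_hdf5_file is not reproduced here.

```python
import warnings

import numpy as np


def encode_messages(messages):
    """cF side: turn a list of str into an hdf5-attribute-friendly ndarray.

    Fixed-width bytes (dtype 'S') are used because dtype=object is not
    allowed and h5py cannot store NumPy's unicode ('U') dtype.
    """
    return np.array([msg.encode("utf-8") for msg in messages], dtype="S")


def decode_messages(arr):
    """process_metadata side: undo encode_messages."""
    return [item.decode("utf-8") for item in arr]


def report_backend_warnings(metadata, key="warnings"):
    """Re-emit messages recorded by the backend (key name is hypothetical)."""
    messages = decode_messages(metadata.get(key, np.array([], dtype="S")))
    for msg in messages:
        warnings.warn(f"backend reported: {msg}")
    return messages


enc = encode_messages(["trial skipped", "did not converge"])
```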