# -*- coding: utf-8 -*-
#
# Decorators for Syncopy metafunctions and `computeFunction`s
#
# Builtin/3rd party package imports
import functools
import h5py
import inspect
import numpy as np
import dask.distributed as dd
import syncopy as spy
from syncopy.shared.errors import (
SPYTypeError,
SPYValueError,
SPYError,
SPYWarning,
SPYInfo,
)
from syncopy.shared.tools import StructDict
from syncopy.shared.metadata import h5_add_metadata, parse_cF_returns
# Local imports
from .dask_helpers import check_slurm_available, check_workers_available
from .log import get_logger
__all__ = []
def unwrap_cfg(func):
"""
Decorator that unwraps `cfg` "structure" in metafunction call
Parameters
----------
func : callable
Typically a Syncopy metafunction such as :func:`~syncopy.freqanalysis`
Returns
-------
wrapper_cfg : callable
Wrapped function; `wrapper_cfg` extracts keyword arguments from a possibly
provided `cfg` option "structure" based on the following logic:
1. Probe the positional argument list of `func` for a regular Python dict or
:class:`~syncopy.StructDict`. *Every hit* is assumed to be a `cfg` option
"structure" and removed from the list. Raises a
:class:`~syncopy.shared.errors.SPYValueError` if (a) more than one
dict (or :class:`~syncopy.StructDict`) is found in the provided positional
arguments, (b) a keyword is provided both via `cfg` and explicitly, or (c) `cfg` is
provided as positional as well as keyword argument.
2. If no `cfg` is found in the positional arguments, check `func`'s keyword
arguments for a provided `cfg` entry. Raises a
:class:`~syncopy.shared.errors.SPYValueError` if `cfg` was provided
as positional as well as keyword argument, and a
:class:`~syncopy.shared.errors.SPYTypeError` if the `cfg` keyword
entry is not a Python dict or :class:`~syncopy.StructDict`.
3. If `cfg` was found either in positional or keyword arguments, then
(a) process its "linguistic" boolean keys (convert any "yes"/"no" entries
to `True`/`False`) and then (b) extract any existing "data"/"dataset"
entry/entries. Raises a :class:`~syncopy.shared.errors.SPYValueError`
if `cfg` contains both a "data" and "dataset" entry.
4. Perform the actual unwrapping: at this point, a provided `cfg` only
contains keyword arguments of `func`. If the (first) input object `data`
was provided as `cfg` entry, it already exists in the local namespace.
If not, then by convention, `data` is the first element of the
(remaining) positional argument list. Thus, the metafunction can now
be called via ``func(data, *args, **kwargs)``.
5. Amend the docstring of `func`: add a one-liner mentioning the possibility
of using `cfg` when calling `func` to the header of its docstring.
Append a paragraph to the docstring's "Notes" section illustrating
how to call `func` with a `cfg` option "structure" that specifically
uses `func` and its input parameters. Note: both amendments are only
inserted in `func`'s docstring if the respective sections already exist.
Notes
-----
This decorator is primarily intended as bookkeeper for Syncopy metafunctions.
It permits "FieldTrip-style" calls of Syncopy metafunctions by "Pythonizing"
(processing and subsequent unpacking) of dict-like `cfg` objects. This
standardization allows all other Syncopy decorators (refer to the "See also" section)
to safely use standard Python ``*args`` and ``**kwargs`` input arguments.
Supported call signatures:
* ``func(cfg, data)``: `cfg` exclusively contains keyword arguments of `func`,
`data` is a Syncopy data object.
* ``func(data, cfg)``: same as above
* ``func(data, cfg=cfg)``: same as above, but `cfg` itself is provided as
keyword argument
* ``func(cfg)``: `cfg` contains a field `data` or `dataset` (not both!)
holding one or more Syncopy data objects used as input of `func`
* ``func(cfg=cfg)``: same as above with `cfg` being provided as keyword
* ``func(data, kw1=val1, kw2=val2)``: standard Python call style with keywords
being provided explicitly
* ``func(data, cfg, kw2=val2)``: valid if `cfg` does NOT contain `'kw2'`
Invalid call signatures:
* ``func(data, cfg, cfg=cfg)``: `cfg` must not be provided as positional and
keyword argument
* ``func(cfg, {})``: every dict in `func`'s positional argument list is interpreted
as `cfg` "structure"
* ``func(data, cfg=value)``: `cfg` must be a Python dict or :class:`~syncopy.StructDict`
* ``func(data, cfg, kw1=val1)``: invalid if keyword `'kw1'` also appears in `cfg`
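For illustration, assuming :func:`~syncopy.freqanalysis` as the wrapped metafunction,
the following two calls would be equivalent (a sketch; the available keywords depend
on the metafunction being wrapped):
>>> spy.freqanalysis(data, method="mtmfft")
>>> cfg = spy.StructDict()
>>> cfg.method = "mtmfft"
>>> spy.freqanalysis(cfg, data)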
See also
--------
unwrap_select : extract `select` keyword and process in-place data-selections
process_io : set up
:meth:`~syncopy.shared.computational_routine.ComputationalRoutine.computeFunction`-calls
based on parallel processing setup
detect_parallel_client : controls parallel processing engine via `parallel` keyword
_append_docstring : local helper for manipulating docstrings
_append_signature : local helper for manipulating function signatures
"""
# Perform a little introspection gymnastics to get the name of the first
# positional and keyword argument of `func` (if we only find anonymous `**kwargs`,
# come up with an exemplary keyword - `kwarg0` is only used in the generated docstring)
funcParams = inspect.signature(func).parameters
paramList = list(funcParams)
kwargList = [pName for pName, pVal in funcParams.items() if pVal.default != pVal.empty]
arg0 = paramList[0]
if len(kwargList) > 0:
kwarg0 = kwargList[0]
else:
kwarg0 = "some_parameter"
@functools.wraps(func)
def wrapper_cfg(*args, **kwargs):
# First, parse positional arguments for dict-type inputs (`k` counts the
# no. of dicts provided) and convert tuple of positional args to list
cfg = None
k = 0
args = list(args)
for argidx, arg in enumerate(args):
if isinstance(arg, dict):
cfgidx = argidx
k += 1
# If a dict was found, assume it's a `cfg` dict and extract it from
# the positional argument list; if more than one dict was found, abort
if k == 1:
cfg = args.pop(cfgidx)
elif k > 1:
raise SPYValueError(
legal="single `cfg` input",
varname="cfg",
actual="{0:d} `cfg` objects in input arguments".format(k),
)
# Now parse provided keywords for `cfg` entry - if `cfg` was already
# provided as positional argument, abort
if kwargs.get("cfg") is not None:
if cfg:
lgl = "`cfg` either as positional or keyword argument, not both"
raise SPYValueError(legal=lgl, varname="cfg")
cfg = kwargs.pop("cfg")
# If `cfg` was detected either in positional or keyword arguments, process it
if cfg:
# If `cfg` is not dict-like, abort (`StructDict` is a `dict` child)
if not isinstance(cfg, dict):
raise SPYTypeError(cfg, varname="cfg", expected="dictionary-like")
# check if we have saved pre-sets (replay a frontend run via out.cfg)
if func.__name__ in cfg.keys():
cfg = StructDict(cfg[func.__name__])
# IMPORTANT: create a copy of `cfg` using `StructDict` constructor to
# not manipulate `cfg` in user's namespace!
cfg = StructDict(cfg)
# If a metafunction is called using `cfg`, any value for a keyword argument
# (not just non-default ones) must be provided *either* via `cfg` *or* as a
# standard keyword, not both.
# NOTE: frontend defaults not set by the user do NOT appear in `kwargs`!
for key in kwargs:
# these get special treatment below
if key in ["data", "dataset"]:
continue
elif key in cfg:
lgl = f"parameter set either via `cfg.{key}=...` or directly via keyword"
act = f"parameter `{key}` set in both `cfg` and via explicit keyword"
raise SPYValueError(legal=lgl, varname=f"cfg/{key}", actual=act)
# now attach the explicit set keywords to `cfg`
# to be passed to the func call
else:
cfg[key] = kwargs[key]
# Translate any existing "yes" and "no" fields to `True` and `False`
for key in cfg.keys():
if str(cfg[key]) == "yes":
cfg[key] = True
elif str(cfg[key]) == "no":
cfg[key] = False
# No explicit `cfg`: rename `kwargs` to `cfg` to consolidate processing below;
# IMPORTANT: this does *not* create a copy of `kwargs`, thus the `pop`-ing
# below actually manipulates `kwargs` as well - crucial for the `kwargs.get("data")`
# error checking!
else:
cfg = kwargs
# If `cfg` contains keys 'data' or 'dataset' extract corresponding
# entry and make it a positional argument (abort if both 'data'
# and 'dataset' are present)
data = cfg.pop("data", None)
if cfg.get("dataset"):
if data:
lgl = "either 'data' or 'dataset' in `cfg`/keywords, not both"
raise SPYValueError(legal=lgl, varname="cfg")
data = cfg.pop("dataset")
# If `cfg` did not contain `data`, look into `kwargs`
if data is None:
data = kwargs.pop("data", None)
if kwargs.get("dataset"):
if data:
lgl = "either `data` or `dataset` keyword, not both"
raise SPYValueError(legal=lgl, varname="data/dataset")
data = kwargs.pop("dataset")
# If a Syncopy data object was provided via `cfg`/keywords, ensure the positional
# args do *not* contain additional objects, ensure the keyword args (besides `cfg`)
# do *not* contain additional objects, and ensure `data` actually is a Syncopy
# data object. Finally, rename the remaining positional arguments
if data:
if any([isinstance(arg, spy.datatype.base_data.BaseData) for arg in args]):
lgl = (
"Syncopy data object provided either via `cfg`/keyword or "
+ "positional arguments, not both"
)
raise SPYValueError(legal=lgl, varname="cfg/data")
if kwargs.get("data") or kwargs.get("dataset"):
lgl = "Syncopy data object provided either via `cfg` or as " + "keyword argument, not both"
raise SPYValueError(legal=lgl, varname="cfg.data")
if not isinstance(data, spy.datatype.base_data.BaseData):
raise SPYError("`data` must be Syncopy data object!")
posargs = args
# If `data` was not provided via `cfg` or as kw-arg, parse positional arguments
if data is None:
posargs = []
while args:
arg = args.pop(0)
if data is not None and isinstance(arg, spy.datatype.base_data.BaseData):
lgl = "only one Syncopy data object"
raise SPYValueError(lgl, varname="data")
if isinstance(arg, spy.datatype.base_data.BaseData):
data = arg
else:
posargs.append(arg)
# if there was no Syncopy data found at this point,
# we call the wrapped function without it
if data is None:
return func(*posargs, **cfg)
else:
# Call function with unfolded `data` + modified positional/keyword args
return func(data, *posargs, **cfg)
# Append two-liner to docstring header mentioning the use of `cfg`
introEntry = (
" \n"
+ " The parameters listed below can be provided as is or a via a `cfg`\n"
+ " configuration 'structure', see Notes for details. \n"
)
wrapper_cfg.__doc__ = _append_docstring(wrapper_cfg, introEntry, insert_in="Header", at_end=True)
# Append a paragraph explaining the use of `cfg` by an example that explicitly
# mentions `func`'s name and input parameters
notesEntry = (
" This function can be either called providing its input arguments directly\n"
+ " or via a `cfg` configuration 'structure'. For instance, the following function\n"
+ " calls are equivalent\n"
+ " \n"
+ " >>> spy.{fname:s}({arg0:s}, {kwarg0:s}=...)\n"
+ " >>> cfg = spy.StructDict()\n"
+ " >>> cfg.{kwarg0:s} = ...\n"
+ " >>> spy.{fname:s}(cfg, {arg0:s})\n"
+ " >>> cfg.{arg0:s} = {arg0:s}\n"
+ " >>> spy.{fname:s}(cfg)\n"
+ " \n"
+ " Please refer to :doc:`/user/fieldtrip` for further details. \n\n"
)
wrapper_cfg.__doc__ = _append_docstring(
wrapper_cfg,
notesEntry.format(fname=func.__name__, arg0=arg0, kwarg0=kwarg0),
insert_in="Notes",
at_end=False,
)
return wrapper_cfg
def unwrap_select(func):
"""
Decorator for handling in-place data selections via `select` keyword
Parameters
----------
func : callable
Typically a Syncopy metafunction such as :func:`~syncopy.freqanalysis`
Returns
-------
wrapper_select : callable
Wrapped function; `wrapper_select` extracts `select` from keywords
provided to `func` and uses it to set the `._selector` property of the
input object(s). After successfully calling `func` with the modified input,
`wrapper_select` modifies `func` itself:
1. The "Parameters" section in the docstring of `func` is amended by an
entry explaining the usage of `select` (that mostly points to
:func:`~syncopy.selectdata`). Note: `func`'s docstring is only extended
if it has a "Parameters" section.
2. If not already present, `select` is added as optional keyword (with
default value `None`) to the signature of `func`.
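For instance, a call using an in-place selection might look as follows (a sketch;
the available selection keys depend on the data type, see :func:`syncopy.selectdata`):
>>> spy.freqanalysis(data, method="mtmfft", select={"trials": [0, 1], "channel": [0, 1]})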
Notes
-----
This decorator assumes that `func` has already been processed by
:func:`~syncopy.shared.kwarg_decorators.unwrap_cfg` and hence expects
`func` to obey standard Python call signature ``func(*args, **kwargs)``.
In other words, :func:`~syncopy.shared.kwarg_decorators.unwrap_select` is
intended as "inner" decorator of metafunctions, for instance
.. code-block:: python
@unwrap_cfg
@unwrap_select
def somefunction(data, kw1="default", kw2=None, **kwargs):
...
**Important** The metafunction `func` **must** accept "anonymous" keywords
via a ``**kwargs`` dictionary. This requirement is due to the fact that
:func:`~syncopy.shared.kwarg_decorators.unwrap_cfg` cowardly refuses to change
the byte-code of `func`, that is, `select` is not actually added as a new
keyword to `func`, only the corresponding signature is manipulated.
Thus, if `func` does not support a `kwargs` parameter dictionary,
using this decorator will have *strange* consequences. Specifically, `select`
will show up in `func`'s signature but it won't be actually usable:
.. code-block:: python
@unwrap_cfg
@unwrap_select
def somefunction(data, kw1="default", kw2=None):
...
>>> help(somefunction)
somefunction(data, kw1="default", kw2=None, select=None)
...
>>> somefunction(data, select=None)
TypeError: somefunction() got an unexpected keyword argument 'select'
See also
--------
unwrap_cfg : Decorator for processing `cfg` "structs"
detect_parallel_client : controls parallel processing engine via `parallel` keyword
"""
@functools.wraps(func)
def wrapper_select(*args, **kwargs):
# Either extract `select` from the input keywords and cycle through the positional
# arguments to apply the in-place selection to the Syncopy object, or raise
# an error if a selection is already attached and `select` is not None
select = kwargs.get("select", None)
attached_selection = False
for obj in args:
# this hits all Syncopy data objects
if hasattr(obj, "selection"):
if obj.selection is None and select is not None:
obj.selection = select
attached_selection = True
# we have one and only one input data object
break
else:
if select is not None:
raise SPYError(
f"Selection found both in kwarg 'selection' ({select}) and in \npassed Syncopy Data object of type '{type(obj)}' ({obj.selection})"
)
# Call function with modified data object(s)
res = func(*args, **kwargs)
# Wipe data-selection slot to not alter user objects
# if the selection got attached by this wrapper here
for obj in args:
if hasattr(obj, "selection") and attached_selection:
obj.selection = None
return res
# Append `select` keyword entry to wrapped function's docstring and signature
selectDocEntry = (
" select : dict or :class:`~syncopy.shared.tools.StructDict` or str\n"
+ " In-place selection of subset of input data for processing. Please refer\n"
+ " to :func:`syncopy.selectdata` for further usage details."
)
wrapper_select.__doc__ = _append_docstring(func, selectDocEntry)
wrapper_select.__signature__ = _append_signature(func, "select")
return wrapper_select
def detect_parallel_client(func):
"""
Decorator for handling parallelization via `parallel` keyword/client detection
Any already initialized Dask cluster takes precedence for both `parallel=True` and
`parallel=None`. This is checked via `dd.get_client()`: if a Dask cluster was set up
beforehand, Syncopy (and potentially ACME) simply passes it through to the compute classes.
If no cluster is running, only an explicit `parallel=True` spawns either a new
Dask cluster via ACME (if on a slurm cluster) or a new LocalCluster as default fallback.
The LocalCluster is closed again after the wrapped function has exited.
If `parallel` is `None`:
First attempts to connect to a running dask parallel processing client. If successful,
`parallel` is set to `True` and updated in `func`'s keyword argument dict.
If no client is found, `parallel` is set to `False`.
If `parallel` is `True` and ACME is installed AND we are on a slurm cluster:
Do nothing and forward all the parallelization setup with `parallel=True`
to the CR and ultimately ACME.
If `parallel` is `True` and ACME is NOT installed OR we are NOT on a slurm cluster:
Fire up a standard dask LocalCluster and forward `parallel=True` to `func`.
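For example, an already running Dask client is picked up automatically (a sketch,
assuming :func:`~syncopy.freqanalysis` as the wrapped metafunction):
>>> import dask.distributed as dd
>>> cluster = dd.LocalCluster(n_workers=2)
>>> client = dd.Client(cluster)
>>> spec = spy.freqanalysis(data)  # `parallel=None`: the running client is detected and used
>>> client.close(); cluster.close()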
Parameters
----------
func : callable
Typically a Syncopy metafunction such as :func:`~syncopy.freqanalysis`
Returns
-------
parallel_client_detector : callable
Wrapped function; `parallel_client_detector` attempts to extract `parallel`
from keywords provided to `func`.
After successfully calling `func` with the modified
input arguments, `parallel_client_detector` modifies `func` itself:
1. The "Parameters" section in the docstring of `func` is amended by an
entry explaining the usage of `parallel`. Note: `func`'s docstring is
only extended if it has a "Parameters" section.
2. If not already present, `parallel` is added as optional keyword (with
default value `None`) to the signature of `func`.
Notes
-----
This decorator assumes that `func` has already been processed by
:func:`~syncopy.shared.kwarg_decorators.unwrap_cfg` and hence expects
`func` to obey standard Python call signature ``func(*args, **kwargs)``.
In other words, :func:`~syncopy.shared.kwarg_decorators.detect_parallel_client`
is intended as "inner" decorator of, e.g., metafunctions. See Notes in
the docstring of :func:`~syncopy.shared.kwarg_decorators.unwrap_select` for
further details.
See also
--------
unwrap_select : extract `select` keyword and process in-place data-selections
unwrap_cfg : Decorator for processing `cfg` "structs"
"""
# timeout in seconds for dask worker allocation
dask_timeout = 600
@functools.wraps(func)
def parallel_client_detector(*args, **kwargs):
logger = get_logger()
# Extract `parallel` keyword: if `parallel` is `False`, nothing happens
parallel = kwargs.get("parallel")
kill_spawn = False
has_slurm = check_slurm_available()
# warning only emitted if slurm available but no ACME or Dask client
slurm_msg = ""
# if acme is around, let it manage everything assuming we are on the ESI cluster
if spy.__acme__ and parallel is not False:
try:
client = dd.get_client()
parallel = True
except ValueError:
if parallel:
msg = (
f"Could not find a running dask cluster!\n"
"Try `esi_cluster_setup` from ACME to set up a cluster on the ESI HPC\n"
"..computing sequentially"
)
logger.important(msg)
parallel = False
# This effectively searches for a global dask cluster and sets
# parallel=True if one was found. If no cluster was found, parallel is set to False,
# i.e. no LocalCluster is spawned automatically; that requires an explicit `parallel=True`.
elif parallel is None:
# w/o acme interface dask directly
try:
client = dd.get_client()
# wait for at least 1 worker
check_workers_available(client, timeout=dask_timeout, n_workers=1)
msg = f"..attaching to running Dask client:\n\t{client}"
logger.important(msg)
parallel = True
except ValueError:
parallel = False
# If parallel processing was requested but ACME is not installed
# and no other Dask cluster is running,
# initialize a local dask cluster as fallback for local machines
elif parallel is True:
# if a cluster is already reachable, do nothing
try:
client = dd.get_client()
# wait for at least 1 worker
check_workers_available(client, timeout=dask_timeout, n_workers=1)
msg = f"..attaching to running Dask client:\n{client}"
logger.important(msg)
except ValueError:
# we are on an HPC but ACME and/or a Dask client is missing;
# a LocalCluster still gets created
if has_slurm and not spy.__acme__:
slurm_msg = (
"We are apparently on a slurm cluster but\n"
"Syncopy could not find a Dask client.\n"
"Syncopy does not provide an "
"automatic Dask SLURMCluster on its own!"
"\nPlease consider configuring your own dask cluster "
"via `dask_jobqueue.SLURMCluster()`"
"\n\nCreating a LocalCluster as fallback.."
)
SPYWarning(slurm_msg)
# -- spawn fallback local cluster --
cluster = dd.LocalCluster()
# attaches to local cluster residing in global namespace
dd.Client(cluster)
kill_spawn = True
msg = "No running Dask cluster found, created a local instance:\n" f"\t{cluster.scheduler}"
logger.important(msg)
# Add/update `parallel` to/in keyword args
kwargs["parallel"] = parallel
results = func(*args, **kwargs)
# kill local cluster
if kill_spawn:
# disconnect
dd.get_client().close()
# and kill
cluster.close()
# print the warning again in case it got drowned out by other output
if slurm_msg:
SPYWarning(slurm_msg)
return results
# Append `parallel` keyword entry to wrapped function's docstring and signature
parallelDocEntry = (
" parallel : None or bool\n"
+ " If `None` (recommended), processing is automatically performed in \n"
+ " parallel (i.e., concurrently across trials/channel-groups), provided \n"
+ " a dask parallel processing client is running and available. \n"
+ " Parallel processing can be manually disabled by setting `parallel` \n"
+ " to `False`. If `parallel` is `True` but no parallel processing client\n"
+ " is running, computing will be performed sequentially."
)
parallel_client_detector.__doc__ = _append_docstring(func, parallelDocEntry)
parallel_client_detector.__signature__ = _append_signature(func, "parallel")
return parallel_client_detector
def process_io(func):
"""
Decorator for handling parallel execution of a
:meth:`~syncopy.shared.computational_routine.ComputationalRoutine.computeFunction`
Parameters
----------
func : callable
A Syncopy :meth:`~syncopy.shared.computational_routine.ComputationalRoutine.computeFunction`
Returns
-------
wrapper_io : callable
Wrapped function; `wrapper_io` changes the way it invokes the wrapped
`computeFunction` and processes its output based on the type of the
provided first positional argument `trl_dat`.
* `trl_dat` : dict
Wrapped `computeFunction` is executed concurrently; `trl_dat` was
assembled by
:meth:`~syncopy.shared.computational_routine.ComputationalRoutine.compute_parallel`
and contains information for parallel workers (particularly, paths and
dataset indices of HDF5 files for reading source data and writing results).
Nothing is returned (the output of the wrapped `computeFunction` is
directly written to disk).
* `trl_dat` : :class:`numpy.ndarray` or :class:`~syncopy.datatype.base_data.FauxTrial` object
Wrapped `computeFunction` is executed sequentially (either during dry-
run phase or in purely sequential computations); `trl_dat` is directly
propagated to the wrapped `computeFunction` and its output is returned
(either a tuple or :class:`numpy.ndarray`, depending on the value of
`noCompute`, see
:meth:`~syncopy.shared.computational_routine.ComputationalRoutine.computeFunction`
for details)
Notes
-----
Parallel execution supports two writing modes: concurrent storage of results
in multiple HDF5 files or sequential writing of array blocks in a single
output HDF5 file. In both situations, the output array returned by
:meth:`~syncopy.shared.computational_routine.ComputationalRoutine.computeFunction`
is immediately written to disk and **not** propagated back to the caller to
avoid inter-worker network communication.
In case of parallel writing, trial-channel blocks are stored in individual
HDF5 files (virtual sources) that are consolidated into a single
:class:`h5py.VirtualLayout` which is subsequently used to allocate a virtual
dataset inside a newly created HDF5 file (located in Syncopy's temporary
storage folder).
Conversely, in case of sequential writing, each resulting array is written
sequentially to an existing single output HDF5 file using a distributed mutex
for access control to prevent write collisions.
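In the concurrent case, `trl_dat` is expected to provide (at least) the entries accessed
in `wrapper_io` below, roughly along these lines (a sketch, not an exhaustive specification):
.. code-block:: python
    trl_dat = {
        "keeptrials": True,            # keep single-trial results or accumulate an average
        "infile": "/path/to/in.h5",    # source HDF5 file
        "indset": "data",              # source dataset name
        "ingrid": ...,                 # index grid selecting the source block
        "inshape": ...,                # expected shape of the source block
        "sigrid": ...,                 # fancy-indexing grid (only used if `fancy` is True)
        "fancy": False,                # whether fancy indexing is required
        "vdsdir": None,                # virtual-dataset dir (`None` triggers sequential writing)
        "outfile": "/path/to/out.h5",  # target HDF5 file
        "outdset": "data",             # target dataset name
        "outgrid": ...,                # index grid locating the result in the target dataset
        "outshape": ...,               # expected shape of the result
        "dtype": ...,                  # dtype of the result
        "call_id": ...,                # unique suffix used when attaching metadata
    }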
See also
--------
unwrap_cfg : Decorator for processing `cfg` "structs"
"""
@functools.wraps(func)
def wrapper_io(trl_dat, *wrkargs, **kwargs):
# `trl_dat` is a NumPy array or `FauxTrial` object: execute the wrapped
# function and return its result
if not isinstance(trl_dat, (dict, tuple)):
# Adding the metadata is done in compute_sequential(), nothing to do here.
# Note that the return value of 'func' in the next line may be a tuple containing
# both the ndarray for 'data', and the 'details'.
return func(trl_dat, *wrkargs, **kwargs)
# compatibility with the input arguments the CRs produce: ill-formatted tuples
# which mix dicts, lists and even slices
if isinstance(trl_dat, tuple):
wrkargs = trl_dat[1:]
trl_dat = trl_dat[0]
# The fun part: `trl_dat` is a dictionary holding components for parallelization
keeptrials = trl_dat["keeptrials"]
infilename = trl_dat["infile"]
indset = trl_dat["indset"]
ingrid = trl_dat["ingrid"]
inshape = trl_dat["inshape"]
sigrid = trl_dat["sigrid"]
fancy = trl_dat["fancy"]
vdsdir = trl_dat["vdsdir"]
outfilename = trl_dat["outfile"]
outdset = trl_dat["outdset"]
outgrid = trl_dat["outgrid"]
outshape = trl_dat["outshape"]
outdtype = trl_dat["dtype"]
call_id = trl_dat["call_id"]
# === STEP 1 === read data into memory
# Catch empty source-array selections; this workaround is not
# necessary for h5py version 2.10+ (see https://github.com/h5py/h5py/pull/1174)
if any([not sel for sel in ingrid]):
res, details = np.empty(outshape, dtype=outdtype), {}
else:
with h5py.File(infilename, mode="r") as h5fin:
if fancy:
arr = np.array(h5fin[indset][ingrid])[np.ix_(*sigrid)]
else:
arr = np.array(h5fin[indset][ingrid])
# === STEP 2 === perform computation
# Ensure input array shape was not inflated by scalar selection
# tuple, e.g., ``e=np.ones((2,2)); e[0,:].shape = (2,)`` not ``(1,2)``
# (use an explicit `shape` assignment here to avoid copies)
arr.shape = inshape
# Now, actually call wrapped function
# Put new outputs here!
res, details = parse_cF_returns(func(arr, *wrkargs, **kwargs))
# User-supplied cFs may return a single numpy.ndarray, or a 2-tuple of type (ndarray, sdict) where
# 'ndarray' is a numpy.ndarray containing computation results to be stored in the Syncopy
# data type (like AnalogData),
# and 'sdict' is a shallow dictionary containing meta data that will be temporarily
# attached to the hdf5 container(s)
# during the compute run, but removed/collected and returned as separate return values
# to the user in the frontend.
# In case scalar selections have been performed, explicitly assign
# desired output shape to re-create "lost" singleton dimensions
# (use an explicit `shape` assignment here to avoid copies)
res.shape = outshape
# === STEP 3 === write result to disk
# Write result to multiple stand-alone HDF files or use a mutex to write to a
# common single file (sequentially)
if vdsdir is not None:
with h5py.File(outfilename, "w") as h5fout:
h5fout.create_dataset(outdset, data=res)
h5_add_metadata(h5fout, details, unique_key_suffix=call_id)
h5fout.flush()
else:
# Create distributed lock (use unique name so it's synced across workers)
lock = dd.lock.Lock(name="sequential_write")
# Either (continue to) compute average or write current chunk
lock.acquire()
with h5py.File(outfilename, "r+") as h5fout:
main_dset = h5fout[outdset]
if keeptrials:
main_dset[outgrid] = res
else:
main_dset[()] += res
h5_add_metadata(h5fout, details, unique_key_suffix=call_id)
h5fout.flush()
lock.release()
return None # result has already been written to disk
return wrapper_io
def _append_docstring(func, supplement, insert_in="Parameters", at_end=True):
"""
Local helper to automate text insertions in docstrings
Parameters
----------
func : callable
Typically a (wrapped) Syncopy metafunction such as :func:`~syncopy.freqanalysis`
supplement : str
Text entry to be added to `func`'s docstring. Has to be already formatted
correctly for its intended destination section, specifically respecting
indentation and line-breaks (e.g., following double-indentation of variable
descriptions in the "Parameters" section)
insert_in : str
Name of section `supplement` should be inserted into. Available options
are `"Header"` (part of the docstring before "Parameters"), `"Parameters"`,
`"Returns"`, `"Notes"` and `"See also"`. Note that the section specified
by `insert_in` has to already exist in `func`'s docstring, otherwise
`supplement` is *not* inserted.
at_end : bool
If `True`, `supplement` is appended at the end of the section specified
by `insert_in`. If `False`, `supplement` is included at the beginning of
the respective section.
Returns
-------
newDocString : str
A copy of `func`'s docstring with `supplement` inserted at the location
specified by `insert_in` and `at_end`.
Notes
-----
This routine is a local auxiliary method that is purely intended for internal
use. Thus, no error checking is performed.
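A minimal illustration of how this helper is used within this module (a sketch;
`someFunction` is a hypothetical, already documented metafunction):
>>> docEntry = (
...     "    select : dict or str\n"
...     "        In-place selection of a subset of the input data."
... )
>>> someFunction.__doc__ = _append_docstring(someFunction, docEntry, insert_in="Parameters")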
See also
--------
_append_signature : extend a function's signature
"""
if func.__doc__ is None:
return
# these are the four whitespace characters right in front of every docstring line
space4 = " "
# "Header" insertions always work (an empty docstring is enough to do this).
# Otherwise ensure the provided `insert_in` section already exists, i.e.,
# partitioned `sectionHeading` == queried `sectionTitle`
if insert_in == "Header":
sectionText, sectionDivider, rest = func.__doc__.partition("Parameters\n")
textBefore = ""
sectionHeading = ""
else:
sectionTitle = insert_in + "\n"
textBefore, sectionHeading, textAfter = func.__doc__.partition(sectionTitle)
if sectionHeading != sectionTitle: # `insert_in` was not found in docstring
return func.__doc__
sectionText, sectionDivider, rest = textAfter.partition("\n\n")
sectionTextList = sectionText.splitlines(keepends=True)
if at_end:
insertAtLine = -1
while sectionTextList[insertAtLine].isspace():
insertAtLine -= 1
insertAtLine = min(-1, insertAtLine + 1)
# to avoid clipping the last line of a parameter description
if sectionTextList[-1] != space4:
sectionTextList.append("\n")
sectionTextList.append(space4)
else:
# this is the 1st line break or the ' --------'
insertAtLine = 1
sectionText = "".join(sectionTextList[:insertAtLine])
sectionText += supplement
sectionText += "".join(sectionTextList[insertAtLine:])
newDocString = textBefore + sectionHeading + sectionText + sectionDivider + rest
return newDocString
def _append_signature(func, kwname, kwdefault=None):
"""
Local helper to automate keyword argument insertions in function signatures
Parameters
----------
func : callable
Typically a (wrapped) Syncopy metafunction such as :func:`~syncopy.freqanalysis`
kwname : str
Name of keyword argument to be added to `func`'s signature
kwdefault : None or any valid type
Default value of keyword argument specified by `kwname`
Returns
-------
newSignature : inspect.Signature
A copy of `func`'s signature with ``kwname=kwdefault`` included as last
named keyword argument (before ``**kwargs``). If `kwname` already exists
in `func`'s named keyword arguments, `newSignature` is an identical copy
of `func`'s signature.
Notes
-----
This function **does not** change `func`'s byte-code, that is, it does not
actually add a new keyword argument to `func` but just appends a named parameter
to `func`'s signature. As a consequence, `func` **must** accept "anonymous"
keywords via a ``**kwargs`` dictionary for this manipulation to work as
intended. If `func` does not support a ``kwargs`` parameter dictionary,
`kwname` with default value `kwdefault` will be listed in `func`'s signature
but trying to use it will trigger an "unexpected keyword argument" `TypeError`.
This routine is a local auxiliary method that is purely intended for internal
use. Thus, no error checking is performed.
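For illustration (a sketch, using a hypothetical function `someFunction`):
>>> def someFunction(data, kw1="default", **kwargs):
...     pass
>>> someFunction.__signature__ = _append_signature(someFunction, "select")
>>> import inspect
>>> print(inspect.signature(someFunction))
(data, kw1='default', select=None, **kwargs)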
See also
--------
_append_docstring : extend a function's docstring
"""
funcSignature = inspect.signature(func)
if kwname in list(funcSignature.parameters):
newSignature = funcSignature
else:
paramList = list(funcSignature.parameters.values())
keyword = inspect.Parameter(kwname, inspect.Parameter.POSITIONAL_OR_KEYWORD, default=kwdefault)
if paramList[-1].name == "kwargs":
paramList.insert(-1, keyword)
else:
paramList.append(keyword)
newSignature = inspect.Signature(parameters=paramList)
return newSignature