# -*- coding: utf-8 -*-
#
# Load data from Field Trip .mat files
#
# Builtin/3rd party package imports
import re
import h5py
import numpy as np
from scipy import io as sio
from tqdm import tqdm
# Local imports
from syncopy.shared.errors import SPYValueError, SPYInfo, SPYWarning
from syncopy.shared.parsers import io_parser, sequence_parser, scalar_parser
from syncopy.datatype import AnalogData
__all__ = ["load_ft_raw"]
# Required fields for the ft_datatype_raw
req_fields_raw = ("time", "trial", "label")
def load_ft_raw(filename, list_only=False, select_structures=None, include_fields=None, mem_use=4000):
    """
    Imports raw time-series data from Field Trip
    into potentially multiple :class:`~syncopy.AnalogData` objects,
    one for each structure found within the MAT-file.

    The aim is to parse each FT data structure, which
    have the following fields (Syncopy analogon on the right):

    +--------------------+------------+
    | FT                 | Syncopy    |
    +====================+============+
    | label              | channel    |
    +--------------------+------------+
    | trial              | trial      |
    +--------------------+------------+
    | time               | time       |
    +--------------------+------------+
    | trialinfo          | trialinfo  |
    +--------------------+------------+
    | fsample (optional) | samplerate |
    +--------------------+------------+
    | cfg                | cfg        |
    +--------------------+------------+

    Limitations:
        The FT `cfg` contains a lot of meta data which at the
        moment we don't import into Syncopy. Syncopy however has
        its own `cfg` mirroring FT's functionality (replay analyses)

        FT's `sampleinfo` is not generally compatible with Syncopy

    Parameters
    ----------
    filename: str
        Path to the MAT-file
    list_only: bool, optional
        Set to `True` to return only a list containing the names
        of the structures found
    select_structures: sequence or None, optional
        Sequence of strings, one for each structure,
        the default `None` will load all structures found
    include_fields: sequence, optional
        Additional MAT-File fields within each structure to
        be imported. They can be accessed via the `AnalogData.info`
        attribute.
    mem_use: int
        The amount of RAM requested for the import process in MB. Note that < v7.3
        MAT-File formats can only be loaded at once. For MAT-File v7.3 this should
        be at least twice the size of a single trial.

    Returns
    -------
    out_dict: dict
        Dictionary with the names of the structures as keys loaded from the MAT-File,
        and :class:`~syncopy.AnalogData` datasets as values

    Notes
    -----
    For MAT-File < v7.3 the MAT-file gets loaded completely
    into RAM using :func:`scipy.io.loadmat`, but its size should be capped by Matlab at 2GB.
    The >v7.3 MAT-files are in hdf5 format and will be read in trial-by-trial,
    this should be the Matlab default for MAT-files exceeding 2GB.

    See also
    --------
    `MAT-File formats <https://de.mathworks.com/help/matlab/import_export/mat-file-versions.html>`_
    `Field Trip datastructures <https://www.fieldtriptoolbox.org/development/datastructure/>`_

    Examples
    --------
    Load two structures `'Data_K'` and `'Data_KB'` from a MAT-File `example.mat`:

    >>> dct = load_ft_raw('example.mat', select_structures=('Data_K', 'Data_KB'))

    Access the individual :class:`~syncopy.AnalogData` datasets:

    >>> data_kb = dct['Data_KB']
    >>> data_k = dct['Data_K']

    Load all structures from `example.mat` plus additional field `'chV1'`:

    >>> dct = load_ft_raw('example.mat', include_fields=('chV1',))

    Access the additionally loaded field:

    >>> dct['Data_K'].info['chV1']

    Just peek into the MAT-File and get a list of the contained structures:

    >>> load_ft_raw('example.mat', list_only=True)
    >>> ['Data_K', 'Data_KB']
    """

    # -- Input validation --

    io_parser(filename, isfile=True)
    if select_structures is not None:
        sequence_parser(select_structures, varname="select_structures", content_type=str)
    if include_fields is not None:
        sequence_parser(include_fields, varname="include_fields", content_type=str)
    scalar_parser(mem_use, varname="mem_use", ntype="int_like", lims=[1, np.inf])

    # -- MAT-File Format --

    version = _get_Matlab_version(filename)
    msg = f"Reading MAT-File version {version} "
    SPYInfo(msg)

    # new hdf container format, use h5py
    if version >= 7.3:
        h5File = h5py.File(filename, "r")
        # keys containing '#' are Matlab-internal bookkeeping (e.g. '#refs#')
        struct_keys = [key for key in h5File.keys() if "#" not in key]
        struct_container = h5File
        struct_reader = lambda struct: _read_hdf_structure(
            struct, h5File=h5File, mem_use=mem_use, include_fields=include_fields
        )
    # old format <2GB, use scipy's MAT reader
    else:
        if mem_use < 2000:
            # the legacy format must be read into RAM in one go,
            # so refuse to proceed with an insufficient budget
            msg = "MAT-File version < 7.3 does not support lazy loading"
            msg += f"\nReading {filename} might take up to 2GB of RAM, you requested only {mem_use / 1000}GB"
            SPYInfo(msg)
            lgl = "2000 or more MB"
            actual = f"{mem_use}"
            raise SPYValueError(lgl, varname="mem_use", actual=actual)
        raw_dict = sio.loadmat(filename, mat_dtype=True, simplify_cells=True)
        # keys containing '__' are scipy/Matlab bookkeeping ('__header__', ...)
        struct_keys = [skey for skey in raw_dict.keys() if "__" not in skey]
        struct_container = raw_dict
        struct_reader = lambda struct: _read_dict_structure(struct, include_fields=include_fields)

    msg = f"Found {len(struct_keys)} structure(s): {struct_keys} in {filename}"
    SPYInfo(msg)

    if list_only:
        return struct_keys

    if len(struct_keys) == 0:
        # NOTE(review): the exception was constructed but never raised here,
        # letting an empty MAT-File slip through silently - now raised
        raise SPYValueError(
            legal="At least one structure",
            varname=filename,
            actual="No structure found",
        )

    # -- IO Operations --

    out_dict = {}
    # load only a subset
    if select_structures is not None:
        keys = select_structures
    # load all structures found
    else:
        keys = struct_keys

    for skey in keys:
        if skey not in struct_keys:
            msg = f"Could not find structure `{skey}` in {filename}"
            SPYWarning(msg)
            continue
        structure = struct_container[skey]
        _check_req_fields(req_fields_raw, structure)

        # the AnalogData objs
        adata = struct_reader(structure)

        # Write log-entry
        msg = f"loaded struct `{skey}` from Matlab file version {version}\n"
        msg += f"\tsource file: {filename}"
        adata.log = msg

        out_dict[skey] = adata

    return out_dict
def _read_hdf_structure(h5Group, h5File, mem_use, include_fields=None):
    """
    Parse one FT raw-data structure from a >= v7.3 (hdf5) MAT-File
    into an :class:`~syncopy.AnalogData` object.

    Each Matlab structure contained in
    a hdf5 MAT-File is a h5py Group object.
    Each key of this Group corresponds to
    a field in the Matlab structure.

    This is the translation from FT to Syncopy:

    +--------------------+------------+
    | FT                 | Syncopy    |
    +--------------------+------------+
    | label              | channel    |
    | trial              | trial      |
    | time               | time       |
    | fsample (optional) | samplerate |
    | cfg                | X          |
    +--------------------+------------+

    Parameters
    ----------
    h5Group : h5py.Group
        Group representing a single FT structure
    h5File : h5py.File
        Open MAT-File handle, needed to dereference the hdf5
        object references stored inside the group
    mem_use : int
        RAM budget in MB; a single trial must fit well within it
    include_fields : sequence of str or None, optional
        Additional structure fields to copy into ``AData.info``

    Returns
    -------
    AData : :class:`~syncopy.AnalogData`
    """
    # for user info
    struct_name = h5Group.name[1:]

    # this should be fixed upstream such that
    # the `defaultDimord` is indeed the default :)
    AData = AnalogData(dimord=AnalogData._defaultDimord)

    # probably better to define an abstract mapping
    # if we want to support more FT formats in the future

    # these are numpy arrays holding hdf5 object references
    # i.e. one per trial, channel, time (per trial)
    trl_refs = h5Group["trial"][:, 0]
    time_refs = h5Group["time"][:, 0]
    chan_refs = h5Group["label"][0, :]

    if "fsample" in h5Group:
        AData.samplerate = h5Group["fsample"][0, 0]
    else:
        # derive the rate from the 1st trial's time vector
        AData.samplerate = _infer_fsample(h5File[time_refs[0]])

    # -- retrieve shape information --

    nTrials = trl_refs.size
    # peek into 1st trial to determine the number of channels
    _, nChannels = h5File[trl_refs[0]].shape

    # compute total hdf5 shape
    # we stack along 1st axis
    trlSamples = [h5File[ref].shape[0] for ref in trl_refs]
    # in samples
    mean_trl_size = np.mean(trlSamples)
    nTotalSamples = np.sum(trlSamples)

    # get sample indices: [start, stop) pairs per trial
    si = np.r_[0, np.cumsum(trlSamples)]
    sampleinfo = np.column_stack([si[:-1], si[1:]])

    itemsize = h5File[trl_refs[0]].dtype.itemsize
    # in Mbyte
    trl_size = itemsize * mean_trl_size * nChannels / 1e6

    # assumption: single trial fits into RAM
    if trl_size >= 0.4 * mem_use:
        lgl = f"{2.5 * trl_size} or more MB"
        actual = f"{mem_use}"
        raise SPYValueError(lgl, varname="mem_use", actual=actual)

    # -- IO process --

    # create new hdf5 dataset for our AnalogData
    # with the default dimord ['time', 'channel']
    # and our default data type np.float32 -> implicit casting!
    with h5py.File(AData.filename, mode="w") as h5FileOut:
        ADset = h5FileOut.create_dataset("data", dtype=np.float32, shape=[nTotalSamples, nChannels])
        pbar = tqdm(trl_refs, desc=f"{struct_name} - loading {nTrials} trials", disable=None)
        SampleCounter = 0  # trial stacking
        # one swipe per trial
        for tr in pbar:
            trl_array = h5File[tr]
            # in samples
            trl_samples = trl_array.shape[0]
            ADset[SampleCounter : SampleCounter + trl_samples, :] = trl_array
            SampleCounter += trl_samples
        pbar.close()
        AData.data = ADset
    AData._reopen()

    # -- trialdefinition --

    offsets = []
    # we need to look into the time vectors for each trial
    for time_r in time_refs:
        offsets.append(h5File[time_r][0, 0])
    # seconds -> samples
    offsets = np.rint(np.array(offsets) * AData.samplerate)

    trl_def = np.hstack([sampleinfo, offsets[:, None]])
    # check if there is a 'trialinfo'
    try:
        trl_def = np.hstack([trl_def, h5Group["trialinfo"]])
    except KeyError:
        pass
    AData.trialdefinition = trl_def

    # each channel label is an integer array with shape (X, 1),
    # where `X` is the number of ascii encoded characters
    channels = ["".join(map(chr, h5File[cr][:, 0])) for cr in chan_refs]
    AData.channel = channels

    # -- Additional Fields --

    if include_fields is not None:
        AData.info = {}
        # additional fields in MAT-File
        afields = [k for k in h5Group.keys() if k not in req_fields_raw]
        msg = f"Found following additional fields: {afields}"
        SPYInfo(msg, caller="load_ft_raw")
        for field in include_fields:
            if field not in h5Group:
                msg = f"Could not find additional field {field} in {struct_name}"
                SPYWarning(msg, caller="load_ft_raw")
                continue
            dset = h5Group[field]
            # we only support fields pointing
            # directly to a dataset containing actual data
            # and not references to larger objects
            if isinstance(dset[0], h5py.Reference):
                msg = f"Could not read additional field '{field}'\n"
                msg += "Only simple fields holding str labels or 1D arrays are supported atm"
                SPYWarning(msg)
                continue
            # ASCII encoding via uint16
            if dset.dtype == np.uint16 and len(dset.shape) == 2:
                AData.info[field] = _parse_MAT_hdf_strings(dset).tolist()
            # numerical data can be written
            # directly into the info dict
            elif dset.dtype == np.float64:
                AData.info[field] = dset[...].tolist()
            else:
                msg = f"Could not read additional field '{field}'\n"
                msg += "Unknown data type, only 1D numerical or string arrays/fields supported"
                SPYWarning(msg)
                continue

    return AData
def _read_dict_structure(structure, include_fields=None):
    """
    Local helper to parse a single FT structure
    and return an :class:`~syncopy.AnalogData` object.

    Only for Matlab data format version < 7.3
    which was opened via scipy.io.loadmat!

    This is the translation from FT to Syncopy:

    +--------------------+------------+
    | FT                 | Syncopy    |
    +--------------------+------------+
    | label              | channel    |
    | trial              | trial      |
    | time               | time       |
    | fsample (optional) | samplerate |
    | cfg                | X          |
    +--------------------+------------+

    Each trial in FT has nChannels x nSamples ordering,
    Syncopy has nSamples x nChannels

    Parameters
    ----------
    structure : dict
        One FT structure as returned by ``scipy.io.loadmat``
        with ``simplify_cells=True``
    include_fields : sequence of str or None, optional
        Additional structure fields to copy into ``AData.info``

    Returns
    -------
    AData : :class:`~syncopy.AnalogData`
    """
    # initialize AnalogData
    if "fsample" in structure:
        samplerate = structure["fsample"]
    else:
        # derive the rate from the 1st trial's time vector
        samplerate = _infer_fsample(structure["time"][0])
    AData = AnalogData(samplerate=samplerate)

    # compute total hdf5 shape
    # we use fixed stacking along 1st axis
    # but channel x sample ordering in FT
    nTotalSamples = np.sum([trl.shape[1] for trl in structure["trial"]])
    nChannels = structure["trial"][0].shape[0]

    sampleinfo = []
    with h5py.File(AData._filename, "w") as h5file:
        dset = h5file.create_dataset("data", dtype=np.float32, shape=[nTotalSamples, nChannels])
        stack_count = 0
        for trl in structure["trial"]:
            trl_size = trl.shape[1]
            # transpose FT's channel x sample into Syncopy's sample x channel;
            # default data type np.float32 -> implicit casting!
            dset[stack_count : stack_count + trl_size] = trl.T.astype(np.float32)
            # construct on the fly to cover all the trials
            sampleinfo.append(np.array([stack_count, stack_count + trl_size]))
            stack_count += trl_size
        AData.data = dset
    AData._reopen()

    sampleinfo = np.array(sampleinfo)

    # get the channel ids
    channels = structure["label"]
    # set the channel ids
    AData.channel = list(channels.astype(str))

    # get the offsets: 1st time point of each trial, seconds -> samples
    offsets = np.array([tvec[0] for tvec in structure["time"]])
    offsets *= AData.samplerate

    # build trialdefinition
    trl_def = np.column_stack([sampleinfo, offsets])
    # check if there is a 'trialinfo'
    try:
        trl_def = np.hstack([trl_def, structure["trialinfo"]])
    except KeyError:
        pass
    AData.trialdefinition = trl_def

    # -- Additional Fields --
    if include_fields is not None:
        AData.info = {}
        # additional fields in MAT-File
        afields = [k for k in structure.keys() if k not in req_fields_raw]
        msg = f"Found following additional fields: {afields}"
        SPYInfo(msg, caller="load_ft_raw")
        for field in include_fields:
            if field not in structure:
                msg = f"Could not find additional field {field}"
                SPYWarning(msg, caller="load_ft_raw")
                continue
            # we only support fields pointing directly to some data
            # no nested structures!
            # NOTE(review): this accepts only 0-dim (scalar) entries although the
            # warning below mentions 1D arrays - confirm intended behavior
            if np.ndim(structure[field]) != 0:
                msg = f"Could not read additional nested field '{field}'\n"
                msg += "Only simple fields holding str labels or 1D arrays are supported"
                SPYWarning(msg)
                continue
            AData.info[field] = structure[field].tolist()

    return AData
def _get_Matlab_version(filename):
"""
Peeks into the 1st line of a .mat file
and extracts the version information.
Works for both < 7.3 and newer MAT-files.
"""
with open(filename, "rb") as matfile:
line1 = next(matfile)
# relevant information
header = line1[:76].decode()
# matches for example 'MATLAB 5.01'
# with the version as only capture group
pattern = re.compile(r"^MATLAB\s(\d*\.\d*)")
match = pattern.match(header)
if not match:
lgl = "recognizable .mat file"
actual = "can not recognize .mat file"
raise SPYValueError(lgl, filename, actual)
version = float(match.group(1))
return version
def _check_req_fields(req_fields, structure):
"""
Just check the the minimal required fields
(aka keys in Python) are present in a
Matlab structure
Works for both old-style (dict) and
new-style (hdf5 Group) MAT-file structures.
"""
for key in req_fields:
if key not in structure:
lgl = f"{key} present in MAT structure"
actual = f"{key} missing"
raise SPYValueError(lgl, "MAT structure", actual)
def _infer_fsample(time_vector):
"""
Akin to `ft_datatype_raw` determine
the sampling frequency from the sampling
times
"""
return np.mean(np.diff(time_vector))
def _parse_MAT_hdf_strings(dataset):
"""
Expects a hdf5 dataset of shape (X, N),
where X is the number of characters in
a single string, and N is the number of strings.
The entries themselves are are of integer type,
the ASCII encoding of strings in Matlab v7.3.
Intended for small(!!) string datasets containing
for example some labels
"""
# FIXME: a simple `for in ascii_arr in dataset` might do the trick as well
# (no need to enumerate)?
str_seq = []
for i, ascii_arr in enumerate(dataset[...].T):
string = "".join(map(chr, ascii_arr))
str_seq.append(string)
return np.array(str_seq)