# Source code for syncopy.io.utils

# -*- coding: utf-8 -*-
#
# Collection of I/O utility functions
#

# Builtin/3rd party package imports
import os
import sys
import shutil
import inspect
import numpy as np
from datetime import datetime
from glob import glob
from collections import OrderedDict
from tqdm import tqdm

if sys.platform == "win32":
    # tqdm breaks term colors on Windows - fix that (tqdm issue #446)
    # colorama is only imported when running on Windows
    import colorama

    # Tear down any active colorama setup, then initialize it again with
    # `strip=False` (presumably so that ANSI color sequences are not stripped
    # from the output -- confirm against the colorama documentation)
    colorama.deinit()
    colorama.init(strip=False)

# Local imports
from syncopy import __storage__, __sessionid__, __checksum_algorithm__, __spydir__
from syncopy.datatype.base_data import BaseData
from syncopy.datatype.util import get_dir_size
from syncopy.shared.parsers import scalar_parser
from syncopy.shared.errors import SPYTypeError, log
from syncopy.shared.queries import user_input

__all__ = ["cleanup", "clear"]

# Dictionary keys for the beginning of an info/json file that are not class
# properties. The pairs are listed in the order in which the keys are stored;
# all entries default to `None` except the memory layout ("order") and the
# name of the package-wide checksum algorithm.
startInfoDict = OrderedDict(
    [
        ("filename", None),
        ("dataclass", None),
        ("data_dtype", None),
        ("data_shape", None),
        ("data_offset", None),
        ("trl_dtype", None),
        ("trl_shape", None),
        ("trl_offset", None),
        ("file_checksum", None),
        ("order", "C"),
        ("checksum_algorithm", __checksum_algorithm__.__name__),
    ]
)


def hash_file(fname, bsize=65536):
    """
    Compute the checksum of a file using the package-wide checksum algorithm

    The file is opened in binary mode and fed to the hasher chunk by chunk,
    so it is never read into memory as a whole.

    Parameters
    ----------
    fname : str
        Path of the file to hash
    bsize : int
        Number of bytes requested per read

    Returns
    -------
    The value of `hexdigest()` of a `__checksum_algorithm__` instance that
    was updated with the entire file contents.

    Notes
    -----
    Internal helper routine, inputs are not parsed.
    """

    digest = __checksum_algorithm__()
    with open(fname, "rb") as stream:
        while True:
            chunk = stream.read(bsize)
            # An empty bytes object signals that there is nothing left to read
            if chunk == b"":
                break
            digest.update(chunk)
    return digest.hexdigest()


def _dangling_usage(paths):
    """
    Count the existing entries in `paths` and sum up their size on disk

    Parameters
    ----------
    paths : list of str
        Files and/or directories to inspect

    Returns
    -------
    count : int
        Number of entries that exist as a regular file or as a directory
    size_gb : float
        Accumulated size of those entries in GB (bytes / 1024**3);
        directories are walked recursively

    Notes
    -----
    Entries that raise an `OSError` while being inspected are reported via
    `log` and contribute neither to `count` nor to `size_gb`.
    """
    count = 0
    total_bytes = 0
    for path in paths:
        try:
            if os.path.isfile(path):
                total_bytes += os.path.getsize(path)
                count += 1
            elif os.path.isdir(path):
                # The sum is only added once the whole walk succeeded
                total_bytes += sum(
                    os.path.getsize(os.path.join(dirpth, fname))
                    for dirpth, _, fnames in os.walk(path)
                    for fname in fnames
                )
                count += 1
        except OSError as ex:
            log(
                f"Dangling file {path} no longer exists: {ex}. (Maybe already deleted.)",
                caller="cleanup",
            )
    return count, total_bytes / 1024**3


def cleanup(older_than=24, interactive=True, only_current_session=False):
    """
    Delete old files in temporary Syncopy folder

    The location of the temporary folder is stored in `syncopy.__storage__`.

    Parameters
    ----------
    older_than : int
        Age cutoff in hours; has to be a non-negative integer-like scalar.
        NOTE: the value is validated and echoed in the "nothing found"
        message, but candidates are currently **not** filtered by age: every
        `spy_*` entry in the storage folder is a removal candidate.
    interactive : bool
        Set to `False` to remove all (sessions and dangling files) at once
        without a prompt asking for confirmation
    only_current_session : bool
        Set to `True` to only remove dangling files associated to *this*
        Syncopy instance

    Raises
    ------
    SPYTypeError
        If `interactive` is not a `bool`. Invalid values of `older_than` are
        rejected by `scalar_parser`.

    Notes
    -----
    The options "D" and "R" offered by the interactive prompt both remove all
    detected dangling entries.

    Examples
    --------
    >>> spy.cleanup()
    """

    # Validate all inputs up front, before the storage folder is scanned
    scalar_parser(older_than, varname="older_than", ntype="int_like", lims=[0, np.inf])
    older_than = int(older_than)
    if not isinstance(interactive, bool):
        raise SPYTypeError(interactive, varname="interactive", expected="bool")

    # For clarification: show location of storage folder that is scanned here
    funcName = "Syncopy <{}>".format(inspect.currentframe().f_code.co_name)
    storage_size_gb, storage_num_files = get_dir_size(__storage__, out="GB")
    dirInfo = (
        "\n{name:s} Analyzing temporary storage folder '{dir:s}' containing "
        + "{numf:d} files with total size {sizegb:.2f} GB...\n"
    )
    log(
        dirInfo.format(
            name=funcName,
            dir=__storage__,
            numf=storage_num_files,
            sizegb=storage_size_gb,
        ),
        caller="cleanup",
    )

    # Collect removal candidates; names look like `spy_<sessionid>_...`, so the
    # session id is the second underscore-separated token of the base name
    candidates = glob(os.path.join(__storage__, "spy_*"))
    if only_current_session:
        dangling = [
            dat
            for dat in candidates
            if os.path.splitext(os.path.basename(dat))[0].split("_")[1] == __sessionid__
        ]
    else:
        dangling = candidates

    # Farewell if nothing's to do here
    if not dangling:
        ext = "Did not find any dangling data or Syncopy session remains " + "older than {age:d} hours."
        log(ext.format(age=older_than), caller="cleanup")
        spydir_size_gb, spydir_num_files = get_dir_size(__spydir__, out="GB")
        log(
            f"Note: {spydir_num_files} files with total size of {spydir_size_gb:.2f} GB left in spy dir '{__spydir__}'.",
            caller="cleanup",
        )
        return

    # Prepare info prompt for dangling files
    numdang, szdang = _dangling_usage(dangling)
    promptInfo = (
        "Found {numdang:d} dangling files not associated to any session "
        + "using {szdang:4.1f} GB of disk space. \n"
    ).format(numdang=numdang, szdang=szdang)

    # Put together the actual prompt message
    promptChoice = "\nPlease choose one of the following options:\n"
    promptOptions = (
        "[D]ANGLING FILE removal to delete anything not associated to sessions "
        + "(you will not be prompted for confirmation) \n"
        + "[R]EMOVE all dangling files at once "
        + "(you will not be prompted for confirmation)\n"
    )
    abortOption = "[C]ANCEL\n"
    promptValid = ["D", "R", "C"]

    # By default, ask what to do; if `interactive` is `False`, remove everything
    if interactive:
        choice = user_input(
            promptInfo + promptChoice + promptOptions + abortOption,
            valid=promptValid,
        )
    else:
        choice = "R"

    # Delete all dangling files at once
    if choice == "D":
        for dat in tqdm(dangling, desc="Deleting dangling data...", disable=None):
            _rm_session([dat])

    # Delete everything
    elif choice == "R":
        for contents in tqdm([[dat] for dat in dangling], desc="Deleting temporary data...", disable=None):
            _rm_session(contents)

    # Don't do anything for now, continue w/dangling data
    else:
        print("Aborting...")

    # Report on remaining data
    storage_size_gb, storage_num_files = get_dir_size(__storage__, out="GB")
    log(
        f"{storage_num_files} files with total size of {storage_size_gb:.2f} GB left in storage dir '{__storage__}'.",
        caller="cleanup",
    )
    spydir_size_gb, spydir_num_files = get_dir_size(__spydir__, out="GB")
    log(
        f"{spydir_num_files} files with total size of {spydir_size_gb:.2f} GB left in spy dir '{__spydir__}'.",
        caller="cleanup",
    )
def clear():
    """
    Clear Syncopy objects from memory

    Every `BaseData` instance found among the local variables of the calling
    frame has its `clear` method invoked; the number of objects treated this
    way is printed afterwards.

    Notes
    -----
    Syncopy objects are **not** loaded wholesale into memory. Only the
    corresponding meta-information is read from disk and held in memory.
    The underlying numerical data is streamed on-demand from disk leveraging
    HDF5's modified LRU (least recently used) page replacement algorithm.
    Thus, :func:`syncopy.clear` simply force-flushes all of Syncopy's HDF5
    backing devices to free up memory currently blocked by cached data chunks.

    Examples
    --------
    >>> spy.clear()
    """

    # Frame of this very function; its parent is the caller's frame
    ownFrame = sys._getframe()

    # For later reference: name tag built from the running function's name
    tag = "Syncopy <{}>".format(ownFrame.f_code.co_name)

    # Walk the caller's local namespace and flush every `BaseData` child
    flushed = 0
    for candidate in ownFrame.f_back.f_locals.values():
        if not isinstance(candidate, BaseData):
            continue
        candidate.clear()
        flushed += 1

    # Be talkative
    print("{name:s} flushed {objcount:d} objects from memory".format(name=tag, objcount=flushed))
def _rm_session(session_files):
    """
    Local helper for deleting tmp data of a given spy session

    Parameters
    ----------
    session_files : iterable of str
        Paths to remove. Regular files are unlinked, anything else is
        handed to `shutil.rmtree`.

    Notes
    -----
    Removal is best-effort: an `OSError` raised for one path (e.g., because
    it vanished in the meantime or cannot be accessed) is reported via `log`
    and does not prevent the remaining paths from being processed.
    """
    for path in session_files:
        try:
            if os.path.isfile(path):
                os.unlink(path)
            else:
                shutil.rmtree(path)
        except OSError as ex:
            # Keep going, but do not hide the failure from the user
            log(f"Could not remove '{path}': {ex}", caller="_rm_session")