Source code for caput.misc

"""A set of miscellaneous routines that don't really fit anywhere more specific."""

import importlib
import os

import numpy as np


[docs]def vectorize(**base_kwargs): """An improved vectorization decorator. Unlike the :class:`np.vectorize` decorator this version works on methods in addition to functions. It also gives an actual scalar value back for any scalar input, instead of returning a 0-dimension array. Parameters ---------- **kwargs Any keyword arguments accepted by :class:`np.vectorize` Returns ------- vectorized_function : func """ class _vectorize_desc: # See # http://www.ianbicking.org/blog/2008/10/decorators-and-descriptors.html # for a description of this pattern def __init__(self, func): # Save a reference to the function and set various properties so the # docstrings etc. get passed through self.func = func self.__doc__ = func.__doc__ self.__name__ = func.__name__ self.__module__ = func.__module__ def __call__(self, *args, **kwargs): # This gets called whenever the wrapped function is invoked arr = np.vectorize(self.func, **base_kwargs)(*args, **kwargs) if not arr.shape: arr = arr.item() return arr def __get__(self, obj, objtype=None): # As a descriptor, this gets called whenever this is used to wrap a # function, and simply binds it to the instance if obj is None: return self new_func = self.func.__get__(obj, objtype) return self.__class__(new_func) return _vectorize_desc
[docs]def scalarize(dtype=np.float64): """Handle scalars and other iterables being passed to numpy requiring code. Parameters ---------- dtype : np.dtype, optional The output datatype. Used only to set the return type of zero-length arrays. Returns ------- vectorized_function : func """ class _scalarize_desc: # See # http://www.ianbicking.org/blog/2008/10/decorators-and-descriptors.html # for a description of this pattern def __init__(self, func): # Save a reference to the function and set various properties so the # docstrings etc. get passed through self.func = func self.__doc__ = func.__doc__ self.__name__ = func.__name__ self.__module__ = func.__module__ def __call__(self, *args, **kwargs): # This gets called whenever the wrapped function is invoked args, scalar, empty = zip(*[self._make_array(a) for a in args]) if all(empty): return np.array([], dtype=dtype) ret = self.func(*args, **kwargs) if all(scalar): ret = ret[0] return ret @staticmethod def _make_array(x): # Change iterables to arrays and scalars into length-1 arrays from skyfield import timelib # Special handling for the slightly awkward skyfield types if isinstance(x, timelib.Time): if isinstance(x.tt, np.ndarray): scalar = False else: scalar = True x = x.ts.tt_jd(np.array([x.tt])) elif isinstance(x, np.ndarray): scalar = False elif isinstance(x, (list, tuple)): x = np.array(x) scalar = False else: x = np.array([x]) scalar = True return (x, scalar, len(x) == 0) def __get__(self, obj, objtype=None): # As a descriptor, this gets called whenever this is used to wrap a # function, and simply binds it to the instance if obj is None: return self new_func = self.func.__get__(obj, objtype) return self.__class__(new_func) return _scalarize_desc
[docs]def listize(**_): """Make functions that already work with `np.ndarray` or scalars accept lists. Also works with tuples. Returns ------- listized_function : func """ class _listize_desc: def __init__(self, func): # Save a reference to the function and set various properties so the # docstrings etc. get passed through self.func = func self.__doc__ = func.__doc__ self.__name__ = func.__name__ self.__module__ = func.__module__ def __call__(self, *args, **kwargs): # This gets called whenever the wrapped function is invoked new_args = [] for arg in args: if isinstance(arg, (list, tuple)): arg = np.array(arg) new_args.append(arg) return self.func(*new_args, **kwargs) def __get__(self, obj, objtype=None): # As a descriptor, this gets called whenever this is used to wrap a # function, and simply binds it to the instance if obj is None: return self new_func = self.func.__get__(obj, objtype) return self.__class__(new_func) return _listize_desc
[docs]def open_h5py_mpi(f, mode, use_mpi=True, comm=None): """Ensure that we have an h5py File object. Opens with MPI-IO if possible. The returned file handle is annotated with two attributes: `.is_mpi` which says whether the file was opened as an MPI file and `.opened` which says whether it was opened in this call. Parameters ---------- f : string, h5py.File or h5py.Group Filename to open, or already open file object. If already open this is just returned as is. mode : string Mode to open file in. use_mpi : bool, optional Whether to use MPI-IO or not (default True) comm : mpi4py.Comm, optional MPI communicator to use. Uses `COMM_WORLD` if not set. Returns ------- fh : h5py.File File handle for h5py.File, with two extra attributes `.is_mpi` and `.opened`. """ import h5py has_mpi = h5py.get_config().mpi if isinstance(f, str): # Open using MPI-IO if we can if has_mpi and use_mpi: from mpi4py import MPI comm = comm if comm is not None else MPI.COMM_WORLD fh = h5py.File(f, mode, driver="mpio", comm=comm) else: fh = h5py.File(f, mode) fh.opened = True elif isinstance(f, (h5py.File, h5py.Group)): fh = f fh.opened = False else: raise ValueError( f"Can't write to {f} (Expected a h5py.File, h5py.Group or str filename)." ) fh.is_mpi = fh.file.driver == "mpio" return fh
[docs]class lock_file: """Manage a lock file around a file creation operation. Parameters ---------- filename : str Final name for the file. preserve : bool, optional Keep the temporary file in the event of failure. comm : MPI.COMM, optional If present only rank=0 will create/remove the lock file and move the file. Returns ------- tmp_name : str File name to use in the locked block. Examples -------- >>> from . import memh5 >>> container = memh5.BasicCont() >>> with lock_file('file_to_create.h5') as fname: ... container.save(fname) ... """ def __init__(self, name, preserve=False, comm=None): if comm is not None and not hasattr(comm, "rank"): raise ValueError("comm argument does not seem to be an MPI communicator.") self.name = name # If comm not specified, set internal rank0 marker to True, # so that rank>0 tasks can open their own files self.rank0 = True if comm is None else comm.rank == 0 self.preserve = preserve def __enter__(self): if self.rank0: with open(self.lockfile, "w+") as fh: fh.write("") return self.tmpfile def __exit__(self, exc_type, exc_val, exc_tb): if self.rank0: # Check if exception was raised and delete the temp file if needed if exc_type is not None: if not self.preserve: os.remove(self.tmpfile) # Otherwise things were successful and we should move the file over else: os.rename(self.tmpfile, self.name) # Finally remove the lock file os.remove(self.lockfile) return False @property def tmpfile(self): """Full path to the lockfile (without file extension).""" base, fname = os.path.split(self.name) return os.path.join(base, "." + fname) @property def lockfile(self): """Full path to the lockfile (with file extension).""" return self.tmpfile + ".lock"
# TODO: remove this. This was to support a patching of this routine to support Python 2 # that used to exist in here. This will be removed when all other repos are changed to # use the version from `inspect`
[docs]def getfullargspec(*args, **kwargs): """See `inspect.getfullargspec`. This is a Python 2 patch that will be removed. """ import inspect import warnings warnings.warn( "This patch to support Python 2 is no longer needed and will be removed.", DeprecationWarning, ) return inspect.getfullargspec(*args, **kwargs)
[docs]def import_class(class_path): """Import class dynamically from a string. Parameters ---------- class_path : str Fully qualified path to the class. If only a single component, look up in the globals. Returns ------- class : class object The class we want to load. """ path_split = class_path.split(".") module_path = ".".join(path_split[:-1]) class_name = path_split[-1] if module_path: m = importlib.import_module(module_path) task_cls = getattr(m, class_name) else: task_cls = globals()[class_name] return task_cls