"""
Module for making in-memory mock-ups of :mod:`h5py` objects.
.. currentmodule:: caput.memh5
It is sometimes useful to have a consistent API for data that is independent
of whether that data lives on disk or in memory. :mod:`h5py` provides this to a
certain extent, having :class:`h5py.Dataset` objects that act very much like
:mod:`numpy` arrays. :mod:`memh5` extends this, providing in-memory
containers analogous to :class:`h5py.Group`, :class:`h5py.AttributeManager` and
:class:`h5py.Dataset` objects.
In addition to these basic classes that copy the :mod:`h5py` API, a
higher-level data container is provided that utilizes these classes along with
:mod:`h5py` to provide data that is transparently stored either in memory or on
disk.
This also allows the creation and use of :mod:`memh5` objects which can hold
data distributed over a number of MPI processes. These
:class:`MemDatasetDistributed` datasets hold :class:`caput.mpiarray.MPIArray`
objects and can be written to, and loaded from disk like normal :class:`memh5`
objects. Support for this must be explicitly enabled in the root group at
creation with the `distributed=True` flag.
Basic Classes
=============
.. autosummary::
:toctree: generated/
ro_dict
MemGroup
MemAttrs
MemDataset
MemDatasetCommon
MemDatasetDistributed
High Level Container
====================
.. autosummary::
:toctree: generated/
MemDiskGroup
BasicCont
Utility Functions
=================
.. autosummary::
:toctree: generated/
attrs2dict
is_group
get_h5py_File
copyattrs
deep_group_copy
"""
import collections
import posixpath
import sys
import warnings

try:
    from collections.abc import Mapping
except ImportError:  # Python 2: the ABCs live directly in `collections`
    from collections import Mapping

import h5py
import numpy as np

from . import mpiutil
from . import mpiarray
# Basic Classes
# -------------
class ro_dict(Mapping):
    """A dict that is read-only to the user.

    This class isn't strictly read-only but it cannot be modified through the
    traditional dict interface. This prevents the user from mistaking this for
    a normal dictionary.

    Provides the same interface for reading as the builtin python
    :class:`dict` but no methods for writing.

    Parameters
    ----------
    d : dict, optional
        Initial data for the new dictionary. The contents are copied into a
        new :class:`dict`, so later changes to `d` are not reflected here.
    """

    def __init__(self, d=None):
        # A falsy argument (None, empty mapping) gives an empty dictionary;
        # anything else is copied so the caller's object is never shared.
        self._dict = dict(d) if d else {}

    def __getitem__(self, key):
        """Return the value stored under `key` (raises `KeyError` if absent)."""
        return self._dict[key]

    def __len__(self):
        """Number of entries."""
        return len(self._dict)

    def __iter__(self):
        """Iterate over the keys."""
        return iter(self._dict)
class _Storage(dict):
    """Dictionary node backing an in-memory group.

    The mapping itself holds the members of the group; a separate
    :class:`MemAttrs` instance holds the attributes attached to it.
    """

    def __init__(self, **kwargs):
        super(_Storage, self).__init__(**kwargs)
        # Every node gets its own, initially empty, attribute container.
        self._attrs = MemAttrs()

    @property
    def attrs(self):
        """The :class:`MemAttrs` instance attached to this node."""
        return self._attrs
class _StorageRoot(_Storage):
    """Root level of the storage tree.

    Parameters
    ----------
    distributed : boolean, optional
        Whether the tree is in distributed mode. Forced to `False` (with a
        warning if `True` was requested) when no communicator is available.
    comm : MPI.Comm, optional
        MPI communicator. If :obj:`None`, `mpiutil.world` is used.
    """

    def __init__(self, distributed=False, comm=None):
        super(_StorageRoot, self).__init__()

        if comm is None:
            comm = mpiutil.world
        self._comm = comm

        if self._comm is None:
            # Without a communicator distributed mode is impossible.
            if distributed:
                warnings.warn('Cannot be in distributed mode when there is no MPI communicator!!')
            self._distributed = False
        else:
            self._distributed = distributed

    @property
    def comm(self):
        """The MPI communicator in use (may be :obj:`None`)."""
        return self._comm

    @property
    def distributed(self):
        """Whether the tree is in distributed mode."""
        return self._distributed

    def __getitem__(self, key):
        """Implements hierarchical path lookup.

        A key without any ``'/'`` is looked up directly in this dictionary.
        Any other key is normalised with `format_abs_path` and resolved one
        component at a time, starting from this root.
        """
        if '/' not in key:
            return super(_StorageRoot, self).__getitem__(key)

        # Format the path; the bare root path refers to this object itself.
        key = format_abs_path(key)
        if key == '/':
            return self

        # Crawl the path, skipping the empty component before the leading '/'.
        node = self
        for part in key.split('/')[1:]:
            node = node[part]
        return node
class MemAttrs(dict):
    """In-memory stand-in for :class:`h5py.AttributeManager`.

    At present this adds nothing on top of the builtin :class:`dict`.
    """
class _MemObjMixin(object):
    """Mixin represents the identity of an in-memory h5py-like object.

    Implement a few attributes that all memh5 objects have, such as `parent`,
    and `file`. An object is identified by the storage root it belongs to and
    its absolute path (`name`) within that root.

    Parameters
    ----------
    storage_root : object, optional
        Root of the storage tree this object lives in.
    name : string, optional
        Path of the object. Must be absolute whenever `storage_root` is given.

    Raises
    ------
    ValueError
        If a storage root is given together with a non-absolute `name`.
    """

    @property
    def _group_class(self):
        # Class used to build the group objects returned by `parent` and
        # `file`. Subclasses override this; the mixin itself has none.
        return None

    def __init__(self, storage_root=None, name=''):
        super(_MemObjMixin, self).__init__()
        self._storage_root = storage_root
        if storage_root is not None and not posixpath.isabs(name):
            # Should never happen, so this is mostly for debugging.
            raise ValueError("Must be given an absolute path.")
        self._name = name

    @property
    def name(self):
        """String giving the full path to this entry."""
        return self._name

    @property
    def parent(self):
        """Parent :class:`MemGroup` that contains this group."""
        parent_name, _ = posixpath.split(self.name)
        return self._group_class._from_storage_root(self._storage_root, parent_name)

    @property
    def file(self):
        """Not a file at all but the top most :class:`MemGroup` of the tree."""
        return self._group_class._from_storage_root(self._storage_root, '/')

    def __eq__(self, other):
        """Equal when both refer to the same path in the very same storage root."""
        if hasattr(other, '_storage_root') and hasattr(other, 'name'):
            return ((self._storage_root is other._storage_root)
                    and (self.name == other.name))
        return False

    def __ne__(self, other):
        """Inverse of `__eq__`, so that ``!=`` is consistent with ``==``."""
        return not self.__eq__(other)

    # Historical (misspelt) name of `__ne__`, kept so explicit callers still work.
    __neq__ = __ne__
class _BaseGroup(_MemObjMixin, Mapping):
    """Implement the majority of the Group interface.

    Subclasses must set up the underlying storage in their constructors, as
    well as implement `create_group` and `create_dataset`.
    """

    @property
    def _group_class(self):
        # Groups hand out parents/children of their own concrete class.
        return self.__class__

    @property
    def comm(self):
        """Reference to the MPI communicator (:obj:`None` if the storage root has none)."""
        return getattr(self._storage_root, 'comm', None)

    @property
    def distributed(self):
        """Whether the storage root is in distributed mode (`False` if it does not say)."""
        return getattr(self._storage_root, 'distributed', False)

    @property
    def attrs(self):
        """Attributes attached to this object.

        Returns
        -------
        attrs : MemAttrs
        """
        return self._get_storage().attrs

    @classmethod
    def _from_storage_root(cls, storage_root, name):
        """Build a group object for `name` without running the subclass `__init__`."""
        self = super(_BaseGroup, cls).__new__(cls, storage_root, name)
        super(_BaseGroup, self).__init__(storage_root, name)
        return self

    def _get_storage(self):
        """Return the storage object found at this group's path."""
        return self._storage_root[self.name]

    def __getitem__(self, name):
        """Retrieve an object.

        The *name* may be a relative or absolute path.
        """
        path = format_abs_path(posixpath.join(self.name, name))
        out = self._storage_root[path]

        # Group-like entries are wrapped in a fresh group object; anything
        # else is a dataset and is returned as stored.
        if is_group(out) or isinstance(out, _Storage):
            return self._group_class._from_storage_root(self._storage_root, path)
        return out

    def __delitem__(self, name):
        """Delete item from group.

        Raises
        ------
        KeyError
            If `name` cannot be looked up in this group.
        """
        # Membership goes through `__getitem__`, so this behaves the same for
        # direct children and for paths, on Python 2 and 3 alike.
        if name not in self:
            raise KeyError("Key %s not present." % name)
        path = posixpath.join(self.name, name)
        parent_path, name = posixpath.split(path)
        parent = self._storage_root[parent_path]
        del parent[name]

    def __len__(self):
        return len(self._get_storage())

    def __iter__(self):
        for key in self._get_storage().keys():
            yield key

    def require_dataset(self, name, shape, dtype, **kwargs):
        """Require a dataset to exist, create if it doesn't.

        All arguments are passed through to `create_dataset`.

        Raises
        ------
        TypeError
            If an entry called `name` exists but is a group.
        """
        try:
            d = self[name]
        except KeyError:
            return self.create_dataset(name, shape=shape, dtype=dtype, **kwargs)

        if is_group(d):
            msg = "Entry '%s' exists and is not a Dataset." % name
            raise TypeError(msg)
        return d

    def require_group(self, name):
        """Require a group to exist, create if it doesn't.

        Raises
        ------
        TypeError
            If an entry called `name` exists but is not a group.
        """
        try:
            g = self[name]
        except KeyError:
            return self.create_group(name)

        if not is_group(g):
            msg = "Entry '%s' exists and is not a Group." % name
            raise TypeError(msg)
        return g
class MemGroup(_BaseGroup):
    """In memory implementation of the :class:`h5py.Group`.

    This class doubles as the memory implementation of :class:`h5py.File`,
    since the distinction between a file and a group for in-memory data
    is moot.

    Parameters
    ----------
    distributed : boolean, optional
        Allow memh5 object to hold distributed datasets.
    comm : MPI.Comm, optional
        MPI Communicator to distributed over. If not set, use :obj:`MPI.COMM_WORLD`.

    Attributes
    ----------
    attrs
    name
    parent
    file

    Methods
    -------
    __getitem__
    from_group
    from_hdf5
    to_hdf5
    create_group
    require_group
    create_dataset
    require_dataset
    """

    def __init__(self, distributed=False, comm=None):
        # Default constructor is only used to create the root group.
        storage_root = _StorageRoot(distributed=distributed, comm=comm)
        super(MemGroup, self).__init__(storage_root, '/')

    @property
    def mode(self):
        """String indicating if group is readonly ("r") or read-write ("r+").

        :class:`MemGroup` objects are always read-write.
        """
        return 'r+'

    @classmethod
    def from_group(cls, group):
        """Create a new instance by deep copying an existing group.

        Agnostic as to whether the group to be copied is a `MemGroup` or an
        `h5py.Group` (which includes `h5py.File` objects).
        """
        if isinstance(group, MemGroup):
            # Carry over the distributed settings so that a group holding
            # distributed datasets can be copied into the new root.
            self = cls(distributed=group.distributed, comm=group.comm)
            deep_group_copy(group, self)
            return self
        # NOTE(review): `from_hdf5` hands its argument to `h5py.File`; confirm
        # that an already open `h5py.Group` is acceptable there.
        return cls.from_hdf5(group)

    @classmethod
    def from_hdf5(cls, filename, distributed=False, hints=True, comm=None, **kwargs):
        """Create a new instance by copying from an hdf5 group.

        Any keyword arguments are passed on to the constructor for `h5py.File`.

        Parameters
        ----------
        filename : string
            Name of file to load.
        distributed : boolean, optional
            Whether to load file in distributed mode.
        hints : boolean, optional
            If in distributed mode use hints to determine whether datasets are
            distributed or not.
        comm : MPI.Comm, optional
            MPI communicator to distributed over. If :obj:`None` use
            :obj:`MPI.COMM_WORLD`.

        Returns
        -------
        group : memh5.Group
            Root group of loaded file.
        """
        if comm is None:
            comm = mpiutil.world

        if comm is None:
            if distributed:
                warnings.warn('Cannot load file in distributed mode when there is no MPI communicator!!')
            distributed = False

        if not distributed or not hints:
            # Plain load: copy the whole file into a fresh root group.
            with h5py.File(filename, **kwargs) as f:
                self = cls(distributed=distributed, comm=comm)
                deep_group_copy(f, self)
        else:
            self = _distributed_group_from_hdf5(filename, comm=comm, hints=hints)

        return self

    def to_hdf5(self, filename, hints=True, **kwargs):
        """Replicate object on disk in an hdf5 file.

        Any keyword arguments are passed on to the constructor for `h5py.File`.

        Parameters
        ----------
        filename : str
            File to save into.
        hints : boolean, optional
            Whether to write hints into the file that described whether datasets
            are distributed, or not.
        """
        if not self.distributed:
            with h5py.File(filename, **kwargs) as f:
                deep_group_copy(self, f)
        else:
            # NOTE(review): `hints` is accepted but not forwarded here; confirm
            # whether `_distributed_group_to_hdf5` takes a `hints` argument.
            _distributed_group_to_hdf5(self, filename, **kwargs)

    def create_group(self, name):
        """Create a group within the storage tree.

        Missing intermediate groups along the path are created as well.

        Raises
        ------
        ValueError
            If an entry called `name` already exists, or if a component of the
            path exists and is not a group.
        """
        path = format_abs_path(posixpath.join(self.name, name))

        try:
            self[name]
        except KeyError:
            pass
        else:
            raise ValueError('Entry %s exists.' % name)

        # If distributed, synchronise to ensure that we create group collectively
        if self.distributed:
            self.comm.Barrier()

        # Walk down from the root. The first component is the empty string
        # before the leading '/', which resolves to the root itself, so the
        # lookup cannot fail on the first iteration.
        parent_name = '/'
        parent_storage = self._storage_root
        for part in path.split('/'):
            parent_name = posixpath.join(parent_name, part)
            try:
                parent_storage = self._storage_root[parent_name]
            except KeyError:
                # Not there yet: create it inside the level reached so far.
                parent_storage[part] = _Storage()
                parent_storage = parent_storage[part]
            if not isinstance(parent_storage, _Storage):
                raise ValueError('Entry %s exists and is not a Group.'
                                 % parent_name)

        # Underlying storage has been created. Return the group object.
        return self[name]

    def create_dataset(self, name, shape=None, dtype=None, data=None,
                       distributed=False, distributed_axis=None, **kwargs):
        """Create a new dataset.

        Parameters
        ----------
        name : string
            Dataset name.
        shape : tuple, optional
            Shape tuple. This gives the global shape for a distributed dataset.
        dtype : np.dtype, optional
            Numpy datatype of the dataset.
        data : np.ndarray or MPIArray, optional
            Data array to initialise from. Uses a view of the original where possible.
        distributed : boolean, optional
            Create a distributed dataset or not.
        distributed_axis : int, optional
            Axis to distribute the data over. If specified with initialisation
            data this will cause create a copy with the correct distribution.

        Returns
        -------
        dset : memh5.MemDataset

        Raises
        ------
        TypeError
            If extra keyword arguments are given, or a distributed dataset is
            requested from data that is not an `MPIArray`.
        ValueError
            If neither `shape` nor `data` is given.
        RuntimeError
            If a distributed dataset is requested in a non-distributed group,
            the communicator of `data` differs from that of the group, or no
            `distributed_axis` is given when one is needed.
        """
        parent_name, name = posixpath.split(posixpath.join(self.name, name))
        parent_name = format_abs_path(parent_name)
        parent_storage = self.require_group(parent_name)._get_storage()

        # If distributed, synchronise to ensure that we create group collectively
        if self.distributed:
            self.comm.Barrier()

        if self.comm is None:
            if distributed:
                warnings.warn('Cannot create distributed dataset when there is no MPI communicator!!')
            distributed = False

        if kwargs:
            msg = ("No extra keyword arguments accepted, this is not an hdf5"
                   " object but a memory object mocked up to look like one.")
            raise TypeError(msg)
            # XXX In future could accept extra arguments and use them if
            # writing to disk.

        # If data is set, copy out params from it.
        if data is not None:
            if shape is None:
                shape = data.shape
            if dtype is None:
                dtype = data.dtype

        # Otherwise shape is required.
        if shape is None:
            raise ValueError('shape must be provided.')

        # Default dtype is float.
        if dtype is None:
            dtype = np.float64

        # Convert to numpy dtype.
        dtype = np.dtype(dtype)

        # Create distributed dataset if data is an MPIArray
        if isinstance(data, mpiarray.MPIArray) and data.comm is not None:
            distributed = True

        # Enforce that distributed datasets can only exist in distributed memh5 groups.
        if not self.distributed and distributed:
            raise RuntimeError('Cannot create a distributed dataset in a non-distributed group.')

        # If data is set (and consistent with shape/type), wrap it directly.
        # Dtypes are compared by value: equal dtypes need not be one object.
        if (data is not None and shape == data.shape
                and dtype == data.dtype and hasattr(data, 'view')):

            if distributed:
                # Ensure we are creating from an MPIArray
                if not isinstance(data, mpiarray.MPIArray):
                    raise TypeError('Can only create distributed dataset from MPIArray.')

                # Ensure that we are distributing over the same communicator
                if data.comm != self.comm:
                    raise RuntimeError('MPI communicator of array must match that of memh5 group.')

                # If the distributed_axis is specified ensure the data is distributed along it.
                if distributed_axis is not None:
                    data = data.redistribute(axis=distributed_axis)

                new_dataset = MemDatasetDistributed.from_mpi_array(data)
            else:
                new_dataset = MemDatasetCommon.from_numpy_array(data)

        # Otherwise create an empty array and copy into it (if needed)
        else:
            if distributed:
                # Ensure that distributed_axis is set.
                if distributed_axis is None:
                    raise RuntimeError('Distributed axis must be specified when creating dataset.')

                new_dataset = MemDatasetDistributed(shape=shape, dtype=dtype,
                                                    axis=distributed_axis,
                                                    comm=self.comm)
            else:
                new_dataset = MemDatasetCommon(shape=shape, dtype=dtype)

            if data is not None:
                new_dataset[:] = data[:]

        # Add new dataset to group (replacing any entry of the same name).
        parent_storage[name] = new_dataset

        # Set the properties of the new dataset
        new_dataset._name = posixpath.join(parent_name, name)
        new_dataset._storage_root = self._storage_root

        return new_dataset

    def dataset_common_to_distributed(self, name, distributed_axis=0):
        """Convert a common dataset to a distributed one.

        Parameters
        ----------
        name : string
            Dataset name.
        distributed_axis : int, optional
            Axis to distribute the data over.

        Returns
        -------
        dset : memh5.MemDatasetDistributed
        """
        dset = self[name]

        if dset.distributed:
            warnings.warn('%s is already a distributed dataset, redistribute it along the required axis %d' % (name, distributed_axis))
            dset.redistribute(distributed_axis)
            return dset

        dset_shape = dset.shape
        dset_type = dset.dtype
        dist_len = dset_shape[distributed_axis]
        ld, sd, ed = mpiutil.split_local(dist_len, comm=self.comm)
        md = mpiarray.MPIArray(dset_shape, axis=distributed_axis, comm=self.comm, dtype=dset_type)

        # Take this process's section along the *distributed* axis (not
        # blindly along axis 0) and copy it into the local array.
        local_sel = [slice(None)] * len(dset_shape)
        local_sel[distributed_axis] = slice(sd, ed)
        md.local_array[:] = dset[tuple(local_sel)]

        attr_dict = {}  # temporarily save attrs of this dataset
        copyattrs(dset.attrs, attr_dict)
        # Only drops the local reference; `create_dataset` replaces the entry.
        del dset
        new_dset = self.create_dataset(name, shape=dset_shape, dtype=dset_type, data=md, distributed=True, distributed_axis=distributed_axis)
        copyattrs(attr_dict, new_dset.attrs)

        return new_dset

    def dataset_distributed_to_common(self, name):
        """Convert a distributed dataset to a common one.

        Parameters
        ----------
        name : string
            Dataset name.

        Returns
        -------
        dset : memh5.MemDatasetCommon
        """
        dset = self[name]

        if dset.common:
            warnings.warn('%s is already a common dataset, no need to convert' % name)
            return dset

        dset_shape = dset.shape
        dset_type = dset.dtype
        global_array = np.zeros(dset_shape, dtype=dset_type)
        local_start = dset.local_offset
        nproc = 1 if self.comm is None else self.comm.size

        # gather local distributed dataset to a global array for all procs
        for rank in range(nproc):
            mpiutil.gather_local(global_array, dset.local_data, local_start, root=rank, comm=self.comm)

        attr_dict = {}  # temporarily save attrs of this dataset
        copyattrs(dset.attrs, attr_dict)
        # Only drops the local reference; `create_dataset` replaces the entry.
        del dset
        new_dset = self.create_dataset(name, data=global_array, shape=dset_shape, dtype=dset_type)
        copyattrs(attr_dict, new_dset.attrs)

        return new_dset
class MemDataset(_MemObjMixin):
    """Base class for an in memory implementation of :class:`h5py.Dataset`.

    This is only an abstract base class. Use :class:`MemDatasetCommon` or
    :class:`MemDatasetDistributed`.

    Attributes
    ----------
    attrs
    name
    parent
    file
    """

    def __init__(self, **kwargs):
        super(MemDataset, self).__init__(**kwargs)
        self._attrs = MemAttrs()

    @property
    def _group_class(self):
        # A dataset is not a group itself, so `parent` and `file` need to be
        # told which group class to build; without this they fail on `None`.
        return MemGroup

    @property
    def attrs(self):
        """Attributes attached to this object.

        Returns
        -------
        attrs : MemAttrs
        """
        return self._attrs

    def resize(self):
        """Not supported; always raises `NotImplementedError`."""
        # h5py datasets reshape() is different from numpy reshape.
        msg = "Dataset reshaping not allowed. Perhapse make an new array view."
        raise NotImplementedError(msg)

    @property
    def shape(self):
        """Shape of the dataset. Must be provided by subclasses."""
        raise NotImplementedError("Not implmemented in base class.")

    @property
    def dtype(self):
        """Datatype of the dataset. Must be provided by subclasses."""
        raise NotImplementedError("Not implmemented in base class.")

    def __getitem__(self, obj):
        raise NotImplementedError("Not implmemented in base class.")

    def __setitem__(self, obj, val):
        raise NotImplementedError("Not implmemented in base class.")

    def __len__(self):
        raise NotImplementedError("Not implmemented in base class.")
class MemDatasetCommon(MemDataset):
    """In memory implementation of :class:`h5py.Dataset`.

    Inherits from :class:`MemDataset`. Wraps a numpy array so that it looks
    like an hdf5 dataset. As with h5py datasets, indexing is forwarded to the
    array, but since this object is not itself a numpy array many operations
    won't work (e.g. ufuncs).

    Parameters
    ----------
    shape : tuple
        Shape of array to initialise.
    dtype : numpy dtype
        Type of array to create.

    Attributes
    ----------
    common
    distributed
    data
    local_data
    shape
    dtype

    Methods
    -------
    from_numpy_array
    """

    def __init__(self, shape, dtype):
        super(MemDatasetCommon, self).__init__()
        # Fresh datasets start out zero-filled.
        self._data = np.zeros(shape, dtype)

    @classmethod
    def from_numpy_array(cls, data):
        """Initialise from a numpy array.

        The array is stored as given, it is not copied.

        Parameters
        ----------
        data : np.ndarray
            Array to initialise from.

        Returns
        -------
        dset : MemDatasetCommon
            Dataset encapsulating the numpy array.

        Raises
        ------
        TypeError
            If `data` is not a numpy array (or subclass).
        """
        if not isinstance(data, np.ndarray):
            raise TypeError("Object must be a numpy array (or subclass).")

        # Skip our own `__init__` so that no zero array gets allocated.
        new = cls.__new__(cls)
        super(MemDatasetCommon, new).__init__()
        new._data = data
        return new

    @property
    def comm(self):
        """Always :obj:`None` for a common dataset."""
        return None

    @property
    def common(self):
        """Always `True` for a common dataset."""
        return True

    @property
    def distributed(self):
        """Always `False` for a common dataset."""
        return False

    @property
    def data(self):
        """The wrapped array."""
        return self._data

    @property
    def local_data(self):
        """The wrapped array (same object as `data`)."""
        return self._data

    @property
    def shape(self):
        """Shape of the wrapped array."""
        return self._data.shape

    @property
    def dtype(self):
        """Datatype of the wrapped array."""
        return self._data.dtype

    def __getitem__(self, obj):
        return self._data[obj]

    def __setitem__(self, obj, val):
        self._data[obj] = val

    def __len__(self):
        return len(self._data)

    def __iter__(self):
        # This needs to be implemented to stop craziness happening when doing
        # np.array(dset)
        return iter(self._data)

    def __repr__(self):
        fields = (repr(self._name), repr(self.shape), repr(self.dtype))
        return "<memh5 common dataset %s: shape %s, type \"%s\">" % fields
class MemDatasetDistributed(MemDataset):
    """Parallel, in-memory implementation of :class:`h5py.Dataset`.

    Inherits from :class:`MemDataset`. Encapsulates an :class:`MPIArray` mocked
    up to look like an `h5py` dataset. Similar to h5py datasets, this
    implements slicing like a numpy array but as it is not actually a numpy
    array many operations won't work (e.g. ufuncs).

    Parameters
    ----------
    shape : tuple
        Shape of array to initialise. This is the *global* shape.
    dtype : numpy dtype
        Type of array to create.
    axis : int, optional
        Index of axis to distribute the array over.
    comm : MPI.Comm, optional
        MPI communicator to distribute over. If :obj:`None` use
        :obj:`MPI.COMM_WORLD`.

    Attributes
    ----------
    common
    distributed
    data
    local_data
    shape
    global_shape
    local_shape
    local_offset
    dtype
    comm
    distributed_axis
    """

    def __init__(self, shape, dtype, axis=0, comm=None):
        super(MemDatasetDistributed, self).__init__()
        self._data = mpiarray.MPIArray(shape, axis=axis, comm=comm, dtype=dtype)

    @classmethod
    def from_mpi_array(cls, data):
        """Initialise from an :class:`MPIArray`.

        The array is stored as given, it is not copied.

        Parameters
        ----------
        data : MPIArray
            Array to initialise from.

        Returns
        -------
        dset : MemDatasetDistributed
            Dataset encapsulating the array.

        Raises
        ------
        TypeError
            If `data` is not an :class:`MPIArray` (or subclass).
        """
        # Validate first so nothing is constructed for bad input.
        if not isinstance(data, mpiarray.MPIArray):
            raise TypeError("Object must be an MPIArray (or subclass).")

        # Skip our own `__init__` so that no new array gets allocated.
        dset = cls.__new__(cls)
        MemDataset.__init__(dset)
        dset._data = data
        return dset

    @property
    def common(self):
        """Always `False` for a distributed dataset."""
        return False

    @property
    def distributed(self):
        """Always `True` for a distributed dataset."""
        return True

    @property
    def data(self):
        """The wrapped :class:`MPIArray`."""
        return self._data

    @property
    def local_data(self):
        """The `local_array` of the wrapped :class:`MPIArray`."""
        return self._data.local_array

    @property
    def shape(self):
        """Same as `global_shape`."""
        return self.global_shape

    @property
    def global_shape(self):
        """The `global_shape` of the wrapped array."""
        return self._data.global_shape

    @property
    def local_shape(self):
        """The `local_shape` of the wrapped array."""
        return self._data.local_shape

    @property
    def local_offset(self):
        """The `local_offset` of the wrapped array."""
        return self._data.local_offset

    @property
    def dtype(self):
        """Datatype of the wrapped array."""
        return self._data.dtype

    @property
    def distributed_axis(self):
        """The `axis` of the wrapped array."""
        return self._data.axis

    @property
    def comm(self):
        """MPI communicator of the wrapped array."""
        # Use the public accessor, as the rest of this module does.
        return self._data.comm

    def redistribute(self, axis):
        """Change the axis that the dataset is distributed over.

        Parameters
        ----------
        axis : integer
            Axis to distribute over.
        """
        self._data = self._data.redistribute(axis=axis)

    def __getitem__(self, obj):
        return self._data.global_slice[obj]

    def __setitem__(self, obj, val):
        self._data.global_slice[obj] = val

    def __iter__(self):
        # This needs to be implemented to stop craziness happening when doing
        # np.array(dset)
        return self._data.__iter__()

    def __len__(self):
        return len(self._data)

    def __repr__(self):
        return ("<memh5 distributed dataset %s: global_shape %s, dist_axis %s, type \"%s\">"
                % (repr(self._name), repr(self.global_shape), repr(self.distributed_axis), repr(self.dtype)))
# Higher Level Data Containers
# ----------------------------
[docs]class MemDiskGroup(_BaseGroup):
"""Container whose data may either be stored on disk or in memory.
This container is intended to have the same basic API :class:`h5py.Group`
and :class:`MemGroup` but whose underlying data could live either on disk
or in memory.
Aside from providing a few convenience methods, this class isn't that
useful by itself. It is almost as easy to use :class:`h5py.Group`
or :class:`MemGroup` directly. Where it becomes more useful is for creating
more specialized data containers which can subclass this class. A basic
but useful example is provided in :class:`BasicCont`.
This class also supports the same distributed features as :class:`MemGroup`,
but only when wrapping that class. Attempting to create a distributed object
wrapping a :class:`h5py.File` object will raise an exception. For similar
reasons, :meth:`MemDiskGroup.to_disk` will not work, however,
:meth:`MemDiskGroup.save` will work fine.
Parameters
----------
data_group : :class:`h5py.Group`, :class:`MemGroup` or string, optional
Underlying :mod:`h5py` like data container where data will be stored.
If a string, open a h5py file with that name. If not
provided a new :class:`MemGroup` instance will be created.
distributed : boolean, optional
Allow the container to hold distributed datasets.
comm : MPI.Comm, optional
MPI Communicator to distributed over. If not set, use :obj:`MPI.COMM_WORLD`.
detect_subclass: boolean, optional
If *data_group* is specified, whether to inspect for a
'__memh5_subclass' attribute which specifies a subclass to return.
Attributes
----------
attrs
name
parent
file
ondisk
Methods
-------
__getitem__
__delitem__
from_file
dataset_name_allowed
group_name_allowed
create_dataset
require_dataset
create_group
require_group
to_memory
to_disk
flush
close
save
"""
def __init__(self, data_group=None, distributed=False, comm=None):
    """Wrap `data_group`, or a new :class:`MemGroup` when none is given.

    Raises
    ------
    ValueError
        If `distributed` is requested around an h5py object, around a group
        that is not distributed, or with a communicator that differs from
        that of the wrapped group.
    """
    opened_here = False

    if comm is None:
        comm = mpiutil.world

    if comm is None:
        # No communicator at all, so distributed mode is impossible.
        if distributed:
            warnings.warn('Cannot create distributed MemDiskGroup when there is no MPI communicator!!')
        distributed = False

    if data_group is None:
        # Nothing to wrap: start from an empty in-memory group.
        data_group = MemGroup(distributed=distributed, comm=comm)
    elif isinstance(data_group, MemDiskGroup):
        # Shallow copy: share the storage of the other container.
        data_group = data_group._storage_root
    else:
        # Otherwise, presume it is an HDF5 Group-like object (which includes
        # MemGroup and h5py.Group).
        data_group, opened_here = get_h5py_File(data_group)

    if distributed:
        if isinstance(data_group, h5py.Group):
            raise ValueError('Distributed MemDiskGroup cannot be created around h5py objects.')
        # Check parallel distribution is the same
        if not data_group.distributed:
            raise ValueError('Cannot create MemDiskGroup with different distributed setting to MemGroup to wrap.')
        # Check parallel communicator is the same
        if comm and comm != data_group.comm:
            raise ValueError('Cannot create MemDiskGroup with different MPI communicator to MemGroup to wrap.')

    self._toclose = opened_here
    super(MemDiskGroup, self).__init__(storage_root=data_group, name=data_group.name)
@classmethod
def from_group(cls, data_group=None, detect_subclass=True):
    """Create data object from a given group.

    This wraps the given group object, optionally returning the correct
    subclass. This does *not* call `__init__` on the subclass when
    this happens.

    Parameters
    ----------
    data_group : :class:`h5py.Group`, :class:`MemGroup` or string, optional
        :mod:`h5py` like data container to wrap.
    detect_subclass: boolean, optional
        If *data_group* is specified, whether to inspect for a
        '__memh5_subclass' attribute which specifies a subclass to return.
        Only objects that expose `attrs` can be inspected.

    Returns
    -------
    grp : MemDiskGroup

    Raises
    ------
    RuntimeError
        If the class named in the attribute is not a subclass of
        :class:`MemDiskGroup`.
    """
    # Look for a hint as to the sub class we should return, this should be
    # in the attributes of the root. `None` and plain filenames carry no
    # attributes, so there is nothing to inspect for them.
    new_cls = cls
    attrs = getattr(data_group, 'attrs', None)

    if detect_subclass and attrs is not None and '__memh5_subclass' in attrs:
        from .pipeline import _import_class

        clspath = attrs['__memh5_subclass']

        # Try and get a reference to the requested class (warn if we cannot find it)
        try:
            new_cls = _import_class(clspath)
        except (ImportError, KeyError):
            warnings.warn('Could not import memh5 subclass %s' % clspath)

        # Check that the class we are about to instantiate (not the class
        # this method was called on) is a subclass of MemDiskGroup.
        if not issubclass(new_cls, MemDiskGroup):
            raise RuntimeError('Requested type (%s) is not an instance of memh5.MemDiskGroup.' % clspath)

    # Restore state only: deliberately bypass any subclass `__init__`.
    self = new_cls.__new__(new_cls)
    MemDiskGroup.__init__(self, data_group=data_group)

    return self
@property
def _data(self):
    """Compatibility alias: `_data` was renamed to `_storage_root`."""
    return self._storage_root
def _finish_setup(self):
    """Finish the class setup *after* importing from a file.

    Does nothing here.
    """
    return None
def __del__(self):
    """Close the underlying file if it is on disk and was opened on initialization."""
    # `_toclose` may be missing if construction never got that far.
    opened_here = getattr(self, '_toclose', False)
    if self.ondisk and opened_here:
        self._storage_root.close()
def __getitem__(self, name):
    """Retrieve an object.

    The *name* may be a relative or absolute path.

    Raises
    ------
    KeyError
        If `group_name_allowed` / `dataset_name_allowed` rejects the path of
        the entry that was found.
    """
    entry = super(MemDiskGroup, self).__getitem__(name)
    path = entry.name

    if is_group(entry):
        if self.group_name_allowed(path):
            return entry
        raise KeyError("Access to group %s not allowed." % path)

    if self.dataset_name_allowed(path):
        return entry
    raise KeyError("Access to dataset %s not allowed." % path)
def __len__(self):
    """Number of keys produced when iterating over this object."""
    return sum(1 for _ in self)
def __iter__(self):
    """Iterate over the keys of the entries that may be accessed."""
    for key in super(MemDiskGroup, self).__iter__():
        try:
            self[key]
        except KeyError:
            # This key name is not allowed (see __getitem__)
            continue
        else:
            yield key
@property
def ondisk(self):
    """Whether the data is stored on disk as opposed to in memory."""
    # A missing `_storage_root` counts as "not on disk".
    return isinstance(getattr(self, '_storage_root', None), h5py.File)
# For creating new instances. #
@classmethod
def from_file(cls, file_, ondisk=False, distributed=False, comm=None,
              detect_subclass=True, **kwargs):
    """Create data object from analysis hdf5 file, store in memory or on disk.

    If *ondisk* is True, do not load into memory but store data in h5py
    objects that remain associated with the file on disk. This is almost
    identical to the default constructor, when providing a file as the
    *data_group* object, however provides more flexibility when opening the
    file through the additional keyword arguments.

    This does *not* call `__init__` on the subclass when restoring;
    :meth:`_finish_setup` is called instead.

    Parameters
    ----------
    file_ : string or :class:`h5py.Group` object
        File with the hdf5 data. File must be compatible with memh5 objects.
    ondisk : bool
        Whether the data should be stored in-place in *file_* or should be
        copied into memory.
    distributed : boolean, optional
        Allow the container to hold distributed datasets. Only used when
        loading into memory (``ondisk=False``).
    comm : MPI.Comm, optional
        MPI communicator to distribute over. If not set, use
        :obj:`MPI.COMM_WORLD`. Only used when loading into memory.
    detect_subclass : boolean, optional
        Whether to inspect the data for a '__memh5_subclass' attribute which
        specifies a subclass to return. Forwarded to :meth:`from_group`.
    **kwargs : any other arguments
        Any additional keyword arguments are passed to :class:`h5py.File`'s
        constructor if *file_* is a filename and silently ignored otherwise.

    Returns
    -------
    obj : instance of *cls* (or of the detected subclass)
        ``obj._toclose`` records whether this call opened a file that the
        object is responsible for closing.
    """
    if not ondisk:
        # Loading into memory always goes through a filename, opened
        # read-only.
        if isinstance(file_, h5py.Group):
            file_ = file_.filename
        data = MemGroup.from_hdf5(file_, distributed=distributed, comm=comm,
                                  mode='r', **kwargs)
        toclose = False
    else:
        # Again, a compatibility hack: an already open group is used as is
        # and stays owned by the caller.
        if is_group(file_):
            data = file_
            toclose = False
        else:
            data = h5py.File(file_, **kwargs)
            toclose = True

    # Here we explicitly avoid calling __init__ on any derived class. Like
    # with a pickle we want to restore the saved state only. The caller's
    # `detect_subclass` choice must be passed on, otherwise it is ignored.
    self = cls.from_group(data_group=data, detect_subclass=detect_subclass)

    # ... skip the class initialisation, and use a special method
    self._finish_setup()
    self._toclose = toclose
    return self
# Methods for manipulating and building the class. #
def group_name_allowed(self, name):
    """Decide whether a group with the given path may be created/accessed.

    Called by :meth:`create_group`, :meth:`require_group` and
    :meth:`__getitem__`. Subclasses that want to restrict the layout of the
    container can override this method rather than re-implementing those
    methods.

    Parameters
    ----------
    name : string
        Absolute path to proposed group.

    Returns
    -------
    allowed : bool
        Always ``True`` in this implementation.
    """
    allowed = True
    return allowed
def dataset_name_allowed(self, name):
    """Decide whether a dataset with the given path may be created/accessed.

    Called by :meth:`create_dataset`, :meth:`require_dataset` and
    :meth:`__getitem__` to check that the supplied dataset name is allowed.
    Subclasses that want to restrict the layout of the container can
    override this method rather than re-implementing those methods.

    Parameters
    ----------
    name : string
        Absolute path to proposed dataset.

    Returns
    -------
    allowed : bool
        Always ``True`` in this implementation.
    """
    allowed = True
    return allowed
def create_dataset(self, name, *args, **kwargs):
    """Create and return a new dataset.

    *name* is joined onto this object's own path and checked with
    :meth:`dataset_name_allowed`. All remaining parameters are passed
    through to the ``create_dataset`` method of the underlying storage,
    whether it be an :class:`h5py.Group` or a :class:`MemGroup`.

    Raises
    ------
    ValueError
        If the resulting path is not an allowed dataset name.
    """
    full_path = posixpath.join(self.name, name)
    if self.dataset_name_allowed(full_path):
        return self._data.create_dataset(full_path, *args, **kwargs)
    raise ValueError("Dataset name %s not allowed." % full_path)
def dataset_common_to_distributed(self, name, distributed_axis=0):
    """Convert a common dataset to a distributed one.

    The conversion is delegated to the underlying :class:`MemGroup`; it is
    not available when the data is backed by anything else.

    Parameters
    ----------
    name : string
        Dataset name.
    distributed_axis : int, optional
        Axis to distribute the data over.

    Returns
    -------
    dset : memh5.MemDatasetDistributed

    Raises
    ------
    RuntimeError
        If the underlying storage is not a :class:`MemGroup`.
    """
    storage = self._data
    if not isinstance(storage, MemGroup):
        raise RuntimeError('Can not convert a h5py dataset %s to distributed' % name)
    return storage.dataset_common_to_distributed(name, distributed_axis)
def dataset_distributed_to_common(self, name):
    """Convert a distributed dataset to a common one.

    The conversion is delegated to the underlying :class:`MemGroup`; it is
    not available when the data is backed by anything else.

    Parameters
    ----------
    name : string
        Dataset name.

    Returns
    -------
    dset : memh5.MemDatasetCommon

    Raises
    ------
    RuntimeError
        If the underlying storage is not a :class:`MemGroup`.
    """
    if isinstance(self._data, MemGroup):
        return self._data.dataset_distributed_to_common(name)
    else:
        # The message names the conversion that was actually requested.
        raise RuntimeError('Can not convert a h5py dataset %s to common' % name)
def create_group(self, name):
    """Create and return a new group.

    *name* is joined onto this object's own path and checked with
    :meth:`group_name_allowed` before the group is created in the
    underlying storage.

    Raises
    ------
    ValueError
        If the resulting path is not an allowed group name.
    """
    full_path = posixpath.join(self.name, name)
    if self.group_name_allowed(full_path):
        storage = self._data
        storage.create_group(full_path)
        return self._group_class._from_storage_root(storage, full_path)
    raise ValueError("Group name %s not allowed." % full_path)
def to_memory(self):
    """Return a version of this data that lives in memory.

    If the storage is already a :class:`MemGroup` the object itself is
    returned; otherwise a new instance is built with :meth:`from_file`.
    """
    storage = self._data
    if not isinstance(storage, MemGroup):
        return self.__class__.from_file(storage)
    return self
def to_disk(self, filename, **kwargs):
    """Return a version of this data that lives on disk.

    The data is saved to *filename* with :meth:`save` and then re-opened
    with ``from_file(..., ondisk=True)``; *kwargs* are forwarded to that
    call. Data that is already on disk is copied anyway, with a warning.

    Raises
    ------
    NotImplementedError
        If the in-memory data is distributed.
    """
    storage = self._data
    if isinstance(storage, MemGroup):
        if storage.distributed:
            raise NotImplementedError("Cannot run to_disk on a distributed object. Try running save instead.")
    else:
        warnings.warn("This data already lives on disk. Copying to new file"
                      " anyway.")

    self.save(filename)
    return self.__class__.from_file(filename, ondisk=True, **kwargs)
def close(self):
    """Close the underlying hdf5 file; does nothing for in-memory data."""
    if not self.ondisk:
        return
    self._data.close()
def flush(self):
    """Flush the underlying hdf5 file's buffers; no-op for in-memory data."""
    if not self.ondisk:
        return
    self._data.flush()
def save(self, filename, **kwargs):
    """Save data to hdf5 file.

    Before writing, a '__memh5_subclass' attribute holding the dotted path
    of this object's class is added to ``self.attrs`` unless one is already
    present. *kwargs* are passed to :class:`h5py.File` when the data is on
    disk, or to the storage's ``to_hdf5`` otherwise.
    """
    # Record which class this is so that it can be restored on load.
    hint_key = '__memh5_subclass'
    if hint_key not in self.attrs:
        klass = self.__class__
        self.attrs[hint_key] = klass.__module__ + '.' + klass.__name__

    storage = self._data
    if isinstance(storage, h5py.File):
        # On-disk data: copy the whole tree into the new file.
        with h5py.File(filename, **kwargs) as f:
            deep_group_copy(storage, f)
    else:
        storage.to_hdf5(filename, **kwargs)
class BasicCont(MemDiskGroup):
    """Basic high level data container.

    Inherits from :class:`MemDiskGroup`.

    Basic one-level data container that allows any number of datasets in the
    root group but no nesting. Data history tracking (in
    :attr:`BasicCont.history`) and array axis interpretation (in
    :attr:`BasicCont.index_map`) is also provided.

    This container is intended to be an example of how a high level container,
    with a strictly controlled data layout can be implemented by subclassing
    :class:`MemDiskGroup`.

    Parameters
    ----------
    Parameters are passed through to the base class constructor.

    Attributes
    ----------
    index_map
    history

    Methods
    -------
    group_name_allowed
    dataset_name_allowed
    create_index_map
    del_index_map
    add_history
    redistribute
    """

    def __init__(self, *args, **kwargs):
        super(BasicCont, self).__init__(*args, **kwargs)
        # Initialize new groups only if writable.
        if self._data.file.mode == 'r+':
            self._data.require_group(u'history')
            self._data.require_group(u'index_map')
            # The order is stored as the string form of a list of names.
            if 'order' not in self._data['history'].attrs:
                self._data['history'].attrs[u'order'] = '[]'

    @property
    def history(self):
        """Stores the analysis history for this data.

        Do not try to add a new entry by assigning to an element of this
        property. Use :meth:`~BasicCont.add_history` instead.

        Returns
        -------
        history : read only dictionary
            Each entry is a dictionary containing metadata about that stage in
            history. There is also an 'order' entry which specifies how the
            other entries are ordered in time.
        """
        import ast

        history_group = self._data['history']
        out = {}
        for name, value in history_group.items():
            out[name] = value.attrs
        # SECURITY: the 'order' string may come straight from a file on disk.
        # It is only ever written as the `str()` of a list of names, so parse
        # it as a literal rather than evaluating it as arbitrary code.
        out[u'order'] = ast.literal_eval(history_group.attrs['order'])
        return ro_dict(out)

    @property
    def index_map(self):
        """Stores representations of the axes of datasets.

        The index map contains arrays used to interpret the axes of the
        various datasets. For instance, the 'time', 'prod' and 'freq' axes of
        the visibilities are described in the index map.

        Do not try to add a new index_map by assigning to an item of this
        property. Use :meth:`~BasicCont.create_index_map` instead.

        Returns
        -------
        index_map : read only dictionary
            Entries are 1D arrays used to interpret the axes of datasets.
        """
        out = {}
        for name, value in self._data['index_map'].items():
            out[name] = value[:]
        return ro_dict(out)

    def group_name_allowed(self, name):
        """No groups are exposed to the user. Returns ``False``."""
        return False

    def dataset_name_allowed(self, name):
        """Datasets may only be created and accessed in the root level group.

        Returns ``True`` if *name* is a path in the root group i.e. '/dataset'.
        """
        parent_name, name = posixpath.split(name)
        return parent_name == '/'

    def create_index_map(self, axis_name, index_map):
        """Create a new index map.

        Parameters
        ----------
        axis_name : string
            Name of the dataset created inside the 'index_map' group.
        index_map : array like
            Data stored in that dataset.
        """
        self._data['index_map'].create_dataset(axis_name, data=index_map)

    def del_index_map(self, axis_name):
        """Delete an index map."""
        del self._data['index_map'][axis_name]

    def add_history(self, name, history=None):
        """Create a new history entry.

        Parameters
        ----------
        name : string
            Name of the entry. 'order' is reserved.
        history : dict, optional
            Metadata stored as attributes of the new entry.

        Raises
        ------
        ValueError
            If *name* is 'order'.
        """
        if name == 'order':
            raise ValueError('"order" is a reserved name and may not be the'
                             ' name of a history entry.')
        if history is None:
            history = {}

        order = self.history['order']
        history_group = self._data["history"]

        # Create the entry before recording it, so that a failure to create
        # it (e.g. the name is already taken) leaves 'order' untouched.
        entry = history_group.create_group(name)
        for key, value in history.items():
            entry.attrs[key] = value
        history_group.attrs[u'order'] = str(order + [name])

    def redistribute(self, dist_axis):
        """Redistribute parallel datasets along a specified axis.

        Parameters
        ----------
        dist_axis : int, string, or list of
            The axis can be specified by an integer index (positive or
            negative), or by a string label which must correspond to an entry in
            the `axis` attribute on the dataset. If a list is supplied, each
            entry is tried in turn, which allows different datasets to be
            redistributed along differently labelled axes.
        """
        if not isinstance(dist_axis, (list, tuple)):
            dist_axis = [dist_axis]

        # `basestring` only exists on Python 2.
        try:
            string_types = basestring
        except NameError:
            string_types = str

        # Worker routine to crawl the tree and redistribute any parallel datasets
        def _tree_crawl(group):
            for name, item in group.items():

                # Recurse into subgroups
                if isinstance(item, MemGroup):
                    _tree_crawl(item)

                # Okay, we've found a distributed dataset, let's try and redistribute it
                if isinstance(item, MemDatasetDistributed):
                    naxis = len(item.shape)

                    for axis in dist_axis:
                        # Try processing if this is a string
                        if isinstance(axis, string_types):
                            if 'axis' in item.attrs and axis in item.attrs['axis']:
                                axis = np.argwhere(item.attrs['axis'] == axis)[0, 0]
                            else:
                                continue
                        # Process if axis is an integer
                        elif isinstance(axis, int):
                            # Deal with negative axis index
                            if axis < 0:
                                axis = naxis + axis

                        # Check axis is within bounds. A negative index below
                        # -naxis is still negative here and must be skipped too.
                        if axis < 0 or axis >= naxis:
                            continue

                        # Excellent, found a matching axis, time to redistribute
                        item.redistribute(axis)
                        break

                    # Note that this clause is on the FOR.
                    else:
                        # If we are here we didn't find a matching axis, emit a warning
                        if group.comm.rank == 0:
                            warnings.warn(('Could not find an axis (out of %s) '
                                           + 'to distributed dataset %s over.') % (str(dist_axis), name))

        _tree_crawl(self._data)
# Utilities
# ---------
def attrs2dict(attrs):
    """Safely copy an h5py attributes object to a dictionary.

    Parameters
    ----------
    attrs : mapping
        Any object providing ``items()``, e.g. an h5py attributes object or a
        plain :class:`dict`.

    Returns
    -------
    out : dict
        New dictionary with the same keys. Values that are
        :class:`numpy.ndarray` instances are copied; all other values are
        stored as is.
    """
    out = {}
    # `items()` rather than `iteritems()` so that plain dictionaries work on
    # Python 3 as well.
    for key, value in attrs.items():
        if isinstance(value, np.ndarray):
            value = value.copy()
        out[key] = value
    return out
def is_group(obj):
    """Tell whether *obj* is a Group (File objects count as groups).

    The check is duck-typed: anything with a ``create_group`` attribute is
    treated as a group. In most cases, if it isn't a Group it's a Dataset,
    so this can be used to check for Datasets as well.
    """
    marker = 'create_group'
    return hasattr(obj, marker)
[docs]def get_h5py_File(f, **kwargs):
"""Checks if input is an `h5py.File` or filename and returns the former.
Parameters
----------
f : h5py Group or filename string
**kwargs : all keyword arguments
Passed to :class:`h5py.File` constructor. If `f` is already an open file,
silently ignores all keywords.
Returns
-------
f : hdf5 group
opened : bool
Whether the a file was opened or not (i.e. was already open).
"""
# Figure out if F is a file or a filename, and whether the file should be
# closed.
if is_group(f):
opened = False
#if kwargs:
# msg = "Got some keyword arguments but File is alrady open."
# warnings.warn(msg)
else:
opened = True
try:
f = h5py.File(f, **kwargs)
except IOError as e:
msg = "Opening file %s caused an error: " % str(f)
new_e = IOError(msg + str(e))
raise new_e.__class__, new_e, sys.exc_info()[2]
return f, opened
def copyattrs(a1, a2):
    """Copy all attributes from *a1* into *a2*.

    Parameters
    ----------
    a1 : attributes object or dict
        Source of the attributes. It is first passed through
        :func:`attrs2dict` to make sure everything is a copy.
    a2 : attributes object or dict
        Destination; each key of *a1* is assigned into it, overwriting any
        existing entry of the same name.
    """
    # Make sure everything is a copy. `items()` works on both Python 2 and 3.
    for key, value in attrs2dict(a1).items():
        a2[key] = value
def deep_group_copy(g1, g2):
    """Copy full data tree from one group to another.

    Parameters
    ----------
    g1 : group
        Source group. Its attributes, sub-groups and datasets are copied
        recursively.
    g2 : group
        Destination group. Entries are created in it with ``create_group``
        and ``create_dataset``.
    """
    copyattrs(g1.attrs, g2.attrs)
    # `items()` is available on both Python 2 and 3, unlike `iteritems()`.
    for key, entry in g1.items():
        if is_group(entry):
            g2.create_group(key)
            deep_group_copy(entry, g2[key])
        else:
            g2.create_dataset(key, shape=entry.shape, dtype=entry.dtype,
                              data=entry)
            copyattrs(entry.attrs, g2[key].attrs)
def format_abs_path(path):
    """Return the absolute path *path* formatted without any extra '/'s.

    Repeated and trailing slashes are collapsed, e.g. ``'//a///b/'`` becomes
    ``'/a/b'``; the root stays ``'/'``.

    Raises
    ------
    ValueError
        If *path* is not absolute.
    """
    if not posixpath.isabs(path):
        raise ValueError("Absolute path must be provided.")
    # Empty components come from '//', a trailing '/' and the leading '/'.
    components = [part for part in path.split('/') if part]
    return '/' + '/'.join(components)
def _distributed_group_to_hdf5(group, fname, hints=True, **kwargs):
    """Private routine to copy full data tree from distributed memh5 object
    into an HDF5 file.

    Parameters
    ----------
    group : distributed memh5 group
        Group to write out; must have ``distributed`` set.
    fname : string
        Name of the file to write to. It is created (truncated) when *group*
        is the root group and re-opened for update otherwise.
    hints : bool, optional
        Whether to write the '__memh5_distributed_file' and
        '__memh5_distributed_dset' marker attributes.
    **kwargs
        Passed to :class:`h5py.File`. Any 'mode' entry is dropped because the
        open mode is fixed by each stage of the write.

    Raises
    ------
    RuntimeError
        If *group* is not distributed, or if a distributed dataset is
        missing from the file when its attributes are to be written.
    """
    if not group.distributed:
        raise RuntimeError('This should only run on distributed datasets [%s].' % group.name)

    comm = group.comm

    # Create a copy of the kwargs with no mode argument so that we can
    # override it. This copy must be used for *every* h5py.File call below,
    # otherwise a caller supplied 'mode' clashes with the positional mode.
    kwargs_nomode = kwargs.copy()
    kwargs_nomode.pop('mode', None)

    # Create group (or file)
    if comm.rank == 0:
        # If this is the root group, create the file and copy the file level
        # attrs
        if group.name == '/':
            with h5py.File(fname, 'w', **kwargs_nomode) as f:
                copyattrs(group.attrs, f.attrs)
                if hints:
                    f.attrs['__memh5_distributed_file'] = True
        # Create this group and copy attrs
        else:
            with h5py.File(fname, 'r+', **kwargs_nomode) as f:
                g = f.create_group(group.name)
                copyattrs(group.attrs, g.attrs)

    comm.Barrier()

    # Write out groups and distributed datasets, these operations must be
    # done collectively
    for key, entry in group.items():
        # Groups are written out by recursing; pass `hints` on so that
        # sub-groups honour the caller's choice.
        if is_group(entry):
            _distributed_group_to_hdf5(entry, fname, hints=hints, **kwargs_nomode)
        # Write out distributed datasets (only the data, the attributes are
        # written below)
        elif isinstance(entry, MemDatasetDistributed):
            arr = entry._data
            arr.to_hdf5(fname, entry.name)

    comm.Barrier()

    # Write out common datasets, and the attributes on distributed datasets
    if comm.rank == 0:
        with h5py.File(fname, 'r+', **kwargs_nomode) as f:
            for key, entry in group.items():
                # Write out common datasets and copy their attrs
                if isinstance(entry, MemDatasetCommon):
                    dset = f.create_dataset(entry.name, data=entry._data)
                    copyattrs(entry.attrs, dset.attrs)
                    if hints:
                        dset.attrs['__memh5_distributed_dset'] = False
                # Copy the attributes over for a distributed dataset
                elif isinstance(entry, MemDatasetDistributed):
                    if entry.name not in f:
                        raise RuntimeError('Distributed dataset should already have been created.')
                    copyattrs(entry.attrs, f[entry.name].attrs)
                    if hints:
                        f[entry.name].attrs['__memh5_distributed_dset'] = True

    comm.Barrier()
def _distributed_group_from_hdf5(fname, comm=None, hints=True, **kwargs):
    """Private routine to restore full tree from an HDF5 file into a
    distributed memh5 object.

    Parameters
    ----------
    fname : string
        Name of the file to read. It is opened read-only on all ranks.
    comm : MPI.Comm, optional
        Communicator handed to the new root :class:`MemGroup`; the group's
        own ``comm`` is used for all communication afterwards.
    hints : bool, optional
        Currently unused by this routine.
    **kwargs
        Currently unused by this routine.

    Returns
    -------
    group : MemGroup
        Distributed root group holding a copy of the file's tree. Datasets
        whose '__memh5_distributed_dset' attribute is set and true are
        loaded as distributed datasets, all others as common datasets.
    """
    # Create root group
    group = MemGroup(distributed=True, comm=comm)
    comm = group.comm

    # == Create some internal functions for doing the read ==

    # Copy over attributes with a broadcast from rank = 0
    def _copy_attrs_bcast(h5item, memitem):
        attr_dict = None
        if comm.rank == 0:
            attr_dict = dict(h5item.attrs.items())
        attr_dict = comm.bcast(attr_dict, root=0)
        copyattrs(attr_dict, memitem.attrs)

    # Function to perform a recursive clone of the tree structure. `f` is
    # the open file from the `with` block below.
    def _copy_from_file(h5group, memgroup):
        # Copy over attributes
        _copy_attrs_bcast(h5group, memgroup)

        for key, item in h5group.items():
            # If group, create the entry and then recurse into it
            if is_group(item):
                new_group = memgroup.create_group(key)
                _copy_from_file(item, new_group)
            # If dataset, create dataset
            else:
                # Check if we are in a distributed dataset
                if ('__memh5_distributed_dset' in item.attrs) and item.attrs['__memh5_distributed_dset']:
                    # Read from file into MPIArray. The dataset is looked up
                    # from the file root, so its full path is needed: the
                    # bare `key` is only correct inside the root group.
                    pdata = mpiarray.MPIArray.from_hdf5(f, item.name, comm=comm)
                    # Create dataset from MPIArray
                    dset = memgroup.create_dataset(key, data=pdata, distributed=True)
                else:
                    # Read common data onto rank zero and broadcast
                    cdata = None
                    if comm.rank == 0:
                        cdata = item[:]
                    cdata = comm.bcast(cdata, root=0)
                    # Create dataset from array
                    dset = memgroup.create_dataset(key, data=cdata, distributed=False)

                # Copy attributes over into dataset
                _copy_attrs_bcast(item, dset)

    # Open file on all ranks
    with h5py.File(fname, 'r') as f:
        # Start recursive file read
        _copy_from_file(f, group)

    # Final synchronisation
    comm.Barrier()

    return group