"""
Module for making in-memory mock-ups of :mod:`h5py` objects.
It is sometimes useful to have a consistent API for data that is independent
of whether that data lives on disk or in memory. :mod:`h5py` provides this to a
certain extent, having :class:`h5py.Dataset` objects that act very much like
:mod:`numpy` arrays. :mod:`memh5` extends this, providing an in-memory
containers, analogous to :class:`h5py.Group`, :class:`h5py.AttributeManager` and
:class:`h5py.Dataset` objects.
In addition to these basic classes that copy the :mod:`h5py` API, A higher
level data container is provided that utilizes these classes along with the
:mod:`h5py` to provide data that is transparently stored either in memory or on
disk.
This also allows the creation and use of :mod:`memh5` objects which can hold
data distributed over a number of MPI processes. These
:class:`MemDatasetDistributed` datasets hold :class:`caput.mpiarray.MPIArray`
objects and can be written to, and loaded from disk like normal :class:`memh5`
objects. Support for this must be explicitly enabled in the root group at
creation with the `distributed=True` flag.
.. warning::
It has been observed that the parallel write of distributed datasets can
lock up. This was when using macOS using `ompio` of OpenMPI 3.0.
Switching to `romio` as the MPI-IO backend helped here, but please report
any further issues.
Basic Classes
=============
- :py:class:`ro_dict`
- :py:class:`MemGroup`
- :py:class:`MemAttrs`
- :py:class:`MemDataset`
- :py:class:`MemDatasetCommon`
- :py:class:`MemDatasetDistributed`
High Level Container
====================
- :py:class:`MemDiskGroup`
- :py:class:`BasicCont`
Utility Functions
=================
- :py:meth:`attrs2dict`
- :py:meth:`is_group`
- :py:meth:`get_h5py_File`
- :py:meth:`copyattrs`
- :py:meth:`deep_group_copy`
"""
from collections.abc import Mapping
import datetime
import warnings
import posixpath
from ast import literal_eval
import json
import logging
import numpy as np
import h5py
from . import fileformats
from . import mpiutil
from . import mpiarray
from . import misc
logger = logging.getLogger(__name__)
try:
import zarr
except ImportError as err:
logger.info(f"zarr support disabled. Install zarr to change this: {err}")
zarr_available = False
else:
zarr_available = True
# Basic Classes
# -------------
[docs]class ro_dict(Mapping):
"""A dict that is read-only to the user.
This class isn't strictly read-only but it cannot be modified through the
traditional dict interface. This prevents the user from mistaking this for
a normal dictionary.
Provides the same interface for reading as the builtin python
:class:`dict` but no methods for writing.
Parameters
----------
d : dict
Initial data for the new dictionary.
"""
def __init__(self, d=None):
if not d:
d = {}
else:
d = dict(d)
self._dict = d
def __getitem__(self, key):
return self._dict[key]
def __len__(self):
return self._dict.__len__()
def __iter__(self):
return self._dict.__iter__()
def __eq__(self, other):
if not isinstance(other, ro_dict):
return False
return Mapping.__eq__(self, other) and self._dict == other._dict
class _Storage(dict):
"""Underlying container that provides storage backing for in-memory groups."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._attrs = MemAttrs()
@property
def attrs(self):
"""
Attributes attached to this object.
Returns
-------
attrs : MemAttrs
"""
return self._attrs
def __eq__(self, other):
if not isinstance(other, _Storage):
return False
return dict.__eq__(self, other) and self._attrs == other._attrs
class _StorageRoot(_Storage):
"""Root level of the storage tree."""
def __init__(self, distributed=False, comm=None):
super().__init__()
if distributed and comm is None:
logger.debug(
"No communicator set for distributed object, using `MPI.COMM_WORLD`"
)
comm = mpiutil.world
self._comm = comm
self._distributed = distributed
@property
def comm(self):
"""Reference to the MPI communicator."""
return self._comm
@property
def distributed(self):
return self._distributed
def __getitem__(self, key):
"""Implements Hierarchical path lookup."""
if "/" not in key:
return super().__getitem__(key)
# Format and split the path.
key = format_abs_path(key)
if key == "/":
return self
path_parts = key.split("/")[1:]
# Crawl the path.
out = self
for part in path_parts:
out = out[part]
return out
def __eq__(self, other):
if not isinstance(other, _StorageRoot):
return False
return (
_Storage.__eq__(self, other)
and self._comm == other._comm
and self._distributed == other._distributed
)
[docs]class MemAttrs(dict):
"""
In memory implementation of the :class:`h5py.AttributeManager`.
Currently just a normal dictionary.
"""
pass
class _MemObjMixin:
"""
Mixin represents the identity of an in-memory h5py-like object.
Implement a few attributes that all memh5 objects have, such as `parent`,
and `file`.
"""
@property
def _group_class(self):
return None
# Here I have to implement __new__ not __init__ since MemDiskGroup
# implements new and messes with parameters.
def __init__(self, storage_root=None, name=""):
self._storage_root = storage_root
if storage_root is not None and not posixpath.isabs(name):
# Should never happen, so this is mostly for debugging.
raise ValueError("Must be given an absolute path.")
self._name = name
@property
def name(self):
"""String giving the full path to this entry."""
return self._name
@property
def parent(self):
"""Parent :class:`MemGroup` that contains this group."""
parent_name, _ = posixpath.split(self.name)
return self._group_class._from_storage_root(self._storage_root, parent_name)
@property
def file(self):
"""Not a file at all but the top most :class:`MemGroup` of the tree."""
return self._group_class._from_storage_root(self._storage_root, "/")
def __eq__(self, other):
if hasattr(other, "_storage_root") and hasattr(other, "name"):
return (self._storage_root is other._storage_root) and (
self.name == other.name
)
return False
def __neq__(self, other):
return not self.__eq__(other)
class _BaseGroup(_MemObjMixin, Mapping):
"""
Implement the majority of the Group interface.
Subclasses must setup the underlying storage in thier constructors, as well
as implement `create_group` and `create_dataset`.
"""
@property
def _group_class(self):
return self.__class__
@property
def comm(self):
"""Reference to the MPI communicator."""
return getattr(self._storage_root, "comm", None)
@property
def distributed(self):
return getattr(self._storage_root, "distributed", False)
@property
def attrs(self):
"""
Attributes attached to this object.
Returns
-------
attrs : MemAttrs
"""
return self._get_storage().attrs
@classmethod
def _from_storage_root(cls, storage_root, name):
self = super(_BaseGroup, cls).__new__(cls)
super(_BaseGroup, self).__init__(storage_root, name)
return self
def _get_storage(self):
return self._storage_root[self.name]
def __getitem__(self, name):
"""Retrieve an object.
The *name* may be a relative or absolute path
"""
path = format_abs_path(posixpath.join(self.name, name))
out = self._storage_root[path]
# Cast the output.
if is_group(out) or isinstance(out, _Storage):
# Group like.
return self._group_class._from_storage_root(self._storage_root, path)
elif isinstance(out, MemDataset):
# Create back references for user facing mem datasets.
out = out.view()
out._storage_root = self._storage_root
return out
else:
# H5py dataset.
return out
def __delitem__(self, name):
"""Delete item from group."""
if name not in self:
raise KeyError("Key %s not present." % name)
path = posixpath.join(self.name, name)
parent_path, name = posixpath.split(path)
parent = self._storage_root[parent_path]
del parent[name]
def __len__(self):
return len(self._get_storage())
def __iter__(self):
keys = list(self._get_storage().keys())
for key in keys:
yield key
def require_dataset(self, name, shape, dtype, **kwargs):
"""Require a dataset to exist, create if it doesn't.
All arguments are passed through to create_dataset.
"""
try:
d = self[name]
except KeyError:
return self.create_dataset(name, shape=shape, dtype=dtype, **kwargs)
if is_group(d):
msg = "Entry '%s' exists and is not a Dataset." % name
raise TypeError(msg)
else:
return d
def require_group(self, name):
"""Require a group to exist, create if it doesn't."""
try:
g = self[name]
except KeyError:
return self.create_group(name)
if not is_group(g):
msg = "Entry '%s' exists and is not a Group." % name
raise TypeError(msg)
else:
return g
[docs]class MemGroup(_BaseGroup):
"""
In memory implementation of the :class:`h5py.Group`.
This class doubles as the memory implementation of :class:`h5py.File`,
object, since the distinction between a file and a group for in-memory data
is moot.
Parameters
----------
distributed : boolean, optional
Allow memh5 object to hold distributed datasets.
comm : MPI.Comm, optional
MPI Communicator to distributed over. If not set, use :obj:`MPI.COMM_WORLD`.
"""
def __init__(self, distributed=False, comm=None):
# Default constructor is only used to create the root group.
storage_root = _StorageRoot(distributed=distributed, comm=comm)
name = "/"
super().__init__(storage_root, name)
@property
def mode(self):
"""String indicating if group is readonly ("r") or read-write ("r+").
:class:`MemGroup` is always read-write.
"""
return "r+"
[docs] @classmethod
def from_group(cls, group):
"""Create a new instance by deep copying an existing group.
Agnostic as to whether the group to be copied is a `MemGroup` or an
`h5py.Group` (which includes `h5py.File` and `zarr.File` objects).
"""
if isinstance(group, MemGroup):
self = cls()
deep_group_copy(group, self)
return self
elif isinstance(group, (str, bytes)):
file_format = fileformats.guess_file_format(group)
return cls.from_file(group, file_format=file_format)
else:
raise RuntimeError(
f"Can't create an instance from type {type(group).__name__} "
f"(expected MemGroup, str or bytes)."
)
[docs] @classmethod
def from_hdf5(
cls,
filename,
distributed=False,
hints=True,
comm=None,
selections=None,
convert_dataset_strings=False,
convert_attribute_strings=True,
**kwargs,
):
"""Create a new instance by copying from an hdf5 group.
Any keyword arguments are passed on to the constructor for `h5py.File`.
Parameters
----------
filename : string
Name of file to load.
distributed : boolean, optional
Whether to load file in distributed mode.
hints : boolean, optional
If in distributed mode use hints to determine whether datasets are
distributed or not.
comm : MPI.Comm, optional
MPI communicator to distributed over. If :obj:`None` use
:obj:`MPI.COMM_WORLD`.
selections : dict
If this is not None, it should map dataset names to axis selections as valid
numpy indexes.
convert_attribute_strings : bool, optional
Try and convert attribute string types to unicode. Default is `True`.
convert_dataset_strings : bool, optional
Try and convert dataset string types to unicode. Default is `False`.
Returns
-------
group : memh5.Group
Root group of loaded file.
"""
return cls.from_file(
filename,
distributed,
hints,
comm,
selections,
convert_dataset_strings,
convert_attribute_strings,
file_format=fileformats.HDF5,
**kwargs,
)
[docs] @classmethod
def from_file(
cls,
filename,
distributed=False,
hints=True,
comm=None,
selections=None,
convert_dataset_strings=False,
convert_attribute_strings=True,
file_format=None,
**kwargs,
):
"""Create a new instance by copying from a file group.
Any keyword arguments are passed on to the constructor for `h5py.File` or
`zarr.File`.
Parameters
----------
filename : string
Name of file to load.
distributed : boolean, optional
Whether to load file in distributed mode.
hints : boolean, optional
If in distributed mode use hints to determine whether datasets are
distributed or not.
comm : MPI.Comm, optional
MPI communicator to distributed over. If :obj:`None` use
:obj:`MPI.COMM_WORLD`.
selections : dict
If this is not None, it should map dataset names to axis selections as valid
numpy indexes.
convert_attribute_strings : bool, optional
Try and convert attribute string types to unicode. Default is `True`.
convert_dataset_strings : bool, optional
Try and convert dataset string types to unicode. Default is `False`.
file_format : `fileformats.FileFormat`, optional
File format to use. Default is `None`, i.e. guess from the name.
Returns
-------
group : memh5.Group
Root group of loaded file.
"""
if comm is None:
comm = mpiutil.world
if file_format is None:
file_format = fileformats.guess_file_format(filename)
if comm is None:
if distributed:
warnings.warn(
"Cannot load file in distributed mode when there is no MPI"
"communicator!!"
)
distributed = False
if not distributed or not hints:
kwargs["mode"] = "r"
with file_format.open(filename, **kwargs) as f:
self = cls(distributed=distributed, comm=comm)
deep_group_copy(
f,
self,
selections=selections,
convert_attribute_strings=convert_attribute_strings,
convert_dataset_strings=convert_dataset_strings,
file_format=file_format,
)
else:
self = _distributed_group_from_file(
filename,
comm=comm,
hints=hints,
selections=selections,
convert_attribute_strings=convert_attribute_strings,
convert_dataset_strings=convert_dataset_strings,
file_format=file_format,
)
return self
[docs] def to_hdf5(
self,
filename,
mode="w",
hints=True,
convert_attribute_strings=True,
convert_dataset_strings=False,
**kwargs,
):
"""Replicate object on disk in an hdf5 file.
Any keyword arguments are passed on to the constructor for `h5py.File`.
Parameters
----------
filename : str
File to save into.
hints : boolean, optional
Whether to write hints into the file that described whether datasets
are distributed, or not.
convert_attribute_strings : bool, optional
Try and convert attribute string types to a unicode type that HDF5
understands. Default is `True`.
convert_dataset_strings : bool, optional
Try and convert dataset string types to bytestrings. Default is `False`.
"""
self.to_file(
filename,
mode,
hints,
convert_attribute_strings,
convert_dataset_strings,
fileformats.HDF5,
**kwargs,
)
[docs] def to_file(
self,
filename,
mode="w",
hints=True,
convert_attribute_strings=True,
convert_dataset_strings=False,
file_format=None,
**kwargs,
):
"""Replicate object on disk in an hdf5 or zarr file.
Any keyword arguments are passed on to the constructor for `h5py.File` or `zarr.File`.
Parameters
----------
filename : str
File to save into.
hints : boolean, optional
Whether to write hints into the file that described whether datasets
are distributed, or not.
convert_attribute_strings : bool, optional
Try and convert attribute string types to a unicode type that HDF5
understands. Default is `True`.
convert_dataset_strings : bool, optional
Try and convert dataset string types to bytestrings. Default is `False`.
file_format : `fileformats.FileFormat`, optional
File format to use. Default is `None`, i.e. guess from the name.
"""
if file_format is None:
file_format = fileformats.guess_file_format(filename)
if not self.distributed:
with file_format.open(filename, mode, **kwargs) as f:
deep_group_copy(
self,
f,
convert_attribute_strings=convert_attribute_strings,
convert_dataset_strings=convert_dataset_strings,
file_format=file_format,
)
else:
_distributed_group_to_file(
self,
filename,
mode,
convert_attribute_strings=convert_attribute_strings,
convert_dataset_strings=convert_dataset_strings,
file_format=file_format,
)
[docs] def create_group(self, name):
"""Create a group within the storage tree."""
path = format_abs_path(posixpath.join(self.name, name))
try:
self[name]
except KeyError:
pass
else:
raise ValueError("Entry %s exists." % name)
# If distributed, synchronise to ensure that we create group collectively
if self.distributed:
self.comm.Barrier()
parent_name = "/"
path_parts = path.split("/")
# In this loop, exception guaranteed not to be raised on first
# iteration, since we know that `parent_name + ''` exists.
for part in path_parts:
try:
parent_name = posixpath.join(parent_name, part)
parent_storage = self._storage_root[parent_name]
except KeyError:
parent_storage[part] = _Storage()
parent_name = posixpath.join(parent_name, part)
parent_storage = parent_storage[part]
if not isinstance(parent_storage, _Storage):
raise ValueError("Entry %s exists and is not a Group." % parent_name)
# Underlying storage has been created. Return the group object.
return self[name]
[docs] def create_dataset(
self,
name,
shape=None,
dtype=None,
data=None,
distributed=False,
distributed_axis=None,
chunks=None,
compression=None,
compression_opts=None,
**kwargs,
):
"""Create a new dataset.
Parameters
----------
name : string
Dataset name.
shape : tuple, optional
Shape tuple. This gives the global shape for a distributed dataset.
dtype : np.dtype, optional
Numpy datatype of the dataset.
data : np.ndarray or MPIArray, optional
Data array to initialise from. Uses a view of the original where possible.
distributed : boolean, optional
Create a distributed dataset or not.
distributed_axis : int, optional
Axis to distribute the data over. If specified with initialisation
data this will cause create a copy with the correct distribution.
compression : str or int
Name or identifier of HDF5 or Zarr compression filter.
compression_opts
See HDF5 and Zarr documentation for compression filters.
Compression options for the dataset.
Returns
-------
dset : memh5.MemDataset
"""
parent_name, name = posixpath.split(posixpath.join(self.name, name))
parent_name = format_abs_path(parent_name)
parent_storage = self.require_group(parent_name)._get_storage()
# If distributed, synchronise to ensure that we create group collectively
if self.distributed:
self.comm.Barrier()
if self.comm is None:
if distributed:
warnings.warn(
"Cannot create distributed dataset when there is no MPI communicator!!"
)
distributed = False
if kwargs:
msg = (
"No extra keyword arguments accepted, this is not an hdf5"
" object but a memory object mocked up to look like one."
)
raise TypeError(msg)
# XXX In future could accept extra arguments and use them if
# writing to disk.
# If data is set, copy out params from it.
if data is not None:
if shape is None:
shape = data.shape
if dtype is None:
dtype = data.dtype
# Otherwise shape is required.
if shape is None:
raise ValueError("shape must be provided.")
# Default dtype is float.
if dtype is None:
dtype = np.float64
# Convert to numpy dtype.
dtype = np.dtype(dtype)
# Create distributed dataset if data is an MPIArray
if isinstance(data, mpiarray.MPIArray) and data.comm is not None:
distributed = True
# Enforce that distributed datasets can only exist in distributed memh5 groups.
if not self.distributed and distributed:
raise RuntimeError(
"Cannot create a distributed dataset in a non-distributed group."
)
# Set the properties of the new dataset
full_path_name = posixpath.join(parent_name, name)
storage_root = None # Do no store the storage root. Creates cyclic references.
# If data is set (and consistent with shape/type), initialise the numpy array from it.
if (
data is not None
and shape == data.shape
and dtype is data.dtype
and hasattr(data, "view")
):
# Create parallel array if requested
if distributed:
# Ensure we are creating from an MPIArray
if not isinstance(data, mpiarray.MPIArray):
raise TypeError(
"Can only create distributed dataset from MPIArray."
)
# Ensure that we are distributing over the same communicator
if data.comm != self.comm:
raise RuntimeError(
"MPI communicator of array must match that of memh5 group."
)
# If the distributed_axis is specified ensure the data is distributed along it.
if distributed_axis is not None:
data = data.redistribute(axis=distributed_axis)
# Create distributed dataset
new_dataset = MemDatasetDistributed.from_mpi_array(
data,
name=full_path_name,
storage_root=storage_root,
chunks=chunks,
compression=compression,
compression_opts=compression_opts,
)
else:
# Create common dataset
new_dataset = MemDatasetCommon.from_numpy_array(
data,
name=full_path_name,
storage_root=storage_root,
chunks=chunks,
compression=compression,
compression_opts=compression_opts,
)
# Otherwise create an empty array and copy into it (if needed)
else:
# Just copy the data.
if distributed:
# Ensure that distributed_axis is set.
if distributed_axis is None:
raise RuntimeError(
"Distributed axis must be specified when creating dataset."
)
new_dataset = MemDatasetDistributed(
shape=shape,
dtype=dtype,
axis=distributed_axis,
comm=self.comm,
name=full_path_name,
storage_root=storage_root,
chunks=chunks,
compression=compression,
compression_opts=compression_opts,
)
else:
new_dataset = MemDatasetCommon(
shape=shape,
dtype=dtype,
name=full_path_name,
storage_root=storage_root,
chunks=chunks,
compression=compression,
compression_opts=compression_opts,
)
if data is not None:
new_dataset[:] = data[:]
# Add new dataset to group
parent_storage[name] = new_dataset
# Ensure __getitem__ is called.
return self[full_path_name]
[docs] def dataset_common_to_distributed(self, name, distributed_axis=0):
"""Convert a common dataset to a distributed one.
Parameters
----------
name : string
Dataset name.
distributed_axis : int, optional
Axis to distribute the data over.
Returns
-------
dset : memh5.MemDatasetDistributed
"""
dset = self[name]
if dset.distributed:
warnings.warn(
"%s is already a distributed dataset, redistribute it along the required axis %d"
% (name, distributed_axis)
)
dset.redistribute(distributed_axis)
return dset
dset_shape = dset.shape
dset_type = dset.dtype
dset_chunks = dset.chunks
dset_compression = dset.compression
dset_compression_opts = dset.compression_opts
dist_len = dset_shape[distributed_axis]
_, sd, ed = mpiutil.split_local(dist_len, comm=self.comm)
md = mpiarray.MPIArray(
dset_shape, axis=distributed_axis, comm=self.comm, dtype=dset_type
)
md.local_array[:] = dset[sd:ed].copy()
attr_dict = {} # temporarily save attrs of this dataset
copyattrs(dset.attrs, attr_dict)
del dset
new_dset = self.create_dataset(
name,
shape=dset_shape,
dtype=dset_type,
chunks=dset_chunks,
compression=dset_compression,
compression_opts=dset_compression_opts,
data=md,
distributed=True,
distributed_axis=distributed_axis,
)
copyattrs(attr_dict, new_dset.attrs)
return new_dset
[docs] def dataset_distributed_to_common(self, name):
"""Convert a distributed dataset to a common one.
Parameters
----------
name : string
Dataset name.
Returns
-------
dset : memh5.MemDatasetCommon
"""
dset = self[name]
if dset.common:
warnings.warn("%s is already a common dataset, no need to convert" % name)
return dset
dset_shape = dset.shape
dset_type = dset.dtype
dset_chunks = dset.chunks
dset_compression = dset.compression
dset_compression_opts = dset.compression_opts
global_array = np.zeros(dset_shape, dtype=dset_type)
local_start = dset.local_offset
nproc = 1 if self.comm is None else self.comm.size
# gather local distributed dataset to a global array for all procs
for rank in range(nproc):
mpiutil.gather_local(
global_array, dset.local_data, local_start, root=rank, comm=self.comm
)
attr_dict = {} # temporarily save attrs of this dataset
copyattrs(dset.attrs, attr_dict)
del dset
new_dset = self.create_dataset(
name,
data=global_array,
shape=dset_shape,
dtype=dset_type,
chunks=dset_chunks,
compression=dset_compression,
compression_opts=dset_compression_opts,
)
copyattrs(attr_dict, new_dset.attrs)
return new_dset
[docs]class MemDataset(_MemObjMixin):
"""Base class for an in memory implementation of :class:`h5py.Dataset`.
This is only an abstract base class. Use :class:`MemDatasetCommon` or
:class:`MemDatasetDistributed`.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._attrs = MemAttrs()
# Must be implemented by child classes
self._data = NotImplemented
@property
def _group_class(self):
return MemGroup
def view(self):
cls = self.__class__
out = cls.__new__(cls)
super(MemDataset, out).__init__(name=self.name, storage_root=self._storage_root)
out._attrs = self._attrs
out._data = self._data
out.chunks = self.chunks
out.compression = self.compression
out.compression_opts = self.compression_opts
return out
@property
def attrs(self):
"""Attributes attached to this object.
Returns
-------
attrs : MemAttrs
"""
return self._attrs
def resize(self):
# h5py datasets reshape() is different from numpy reshape.
msg = "Dataset reshaping not allowed. Perhapse make an new array view."
raise NotImplementedError(msg)
@property
def shape(self):
"""
Shape of the dataset.
Not implemented in base class.
"""
raise NotImplementedError("Not implemented in base class.")
@property
def dtype(self):
"""
numpy data type of the dataset.
Not implemented in base class.
"""
raise NotImplementedError("Not implemented in base class.")
@property
def chunks(self):
"""
Chunk shape of the dataset.
Not implemented in base class.
"""
raise NotImplementedError("Not implemented in base class.")
@property
def compression(self):
"""
Name or identifier of HDF5 compression filter for the dataset.
Not implemented in base class.
"""
raise NotImplementedError("Not implemented in base class.")
@property
def compression_opts(self):
"""
Compression options for the dataset.
See HDF5 documentation for compression filters.
Not implemented in base class.
"""
raise NotImplementedError("Not implemented in base class.")
def __getitem__(self, obj):
raise NotImplementedError("Not implemented in base class.")
def __setitem__(self, obj, val):
raise NotImplementedError("Not implemented in base class.")
def __len__(self):
raise NotImplementedError("Not implemented in base class.")
def __eq__(self, other):
if not isinstance(other, MemDataset):
return False
return _MemObjMixin.__eq__(self, other) and self._attrs == other._attrs
[docs]class MemDatasetCommon(MemDataset):
"""In memory implementation of :class:`h5py.Dataset`.
Inherits from :class:`MemDataset`. Encapsulates a numpy array mocked up to
look like an hdf5 dataset. Similar to h5py datasets, this implements
slicing like a numpy array but as it is not actually a many operations
won't work (e.g. ufuncs).
Parameters
----------
shape : tuple
Shape of array to initialise.
dtype : numpy dtype
Type of array to create.
"""
def __init__(
self,
shape,
dtype,
chunks=None,
compression=None,
compression_opts=None,
**kwargs,
):
super().__init__(**kwargs)
self._data = np.zeros(shape, dtype)
self._chunks = chunks
self._compression = compression
self._compression_opts = compression_opts
[docs] @classmethod
def from_numpy_array(
cls, data, chunks=None, compression=None, compression_opts=None, **kwargs
):
"""Initialise from a numpy array.
Parameters
----------
data : np.ndarray
Array to initialise from.
compression : str or int
Name or identifier of HDF5 or Zarr compression filter.
compression_opts
See HDF5 and Zarr documentation for compression filters.
Compression options for the dataset.
Returns
-------
dset : MemDatasetCommon
Dataset encapsulating the numpy array.
"""
if not isinstance(data, np.ndarray):
raise TypeError("Object must be a numpy array (or subclass).")
self = cls.__new__(cls)
super(MemDatasetCommon, self).__init__(**kwargs)
self._data = data
self._chunks = chunks
self._compression = compression
self._compression_opts = compression_opts
return self
@property
def comm(self):
"""Reference to the MPI communicator."""
return None
@property
def common(self):
return True
@property
def distributed(self):
return False
@property
def data(self):
return self._data
@property
def local_data(self):
return self._data
@property
def shape(self):
return self._data.shape
@property
def dtype(self):
return self._data.dtype
@property
def chunks(self):
return self._chunks
@chunks.setter
def chunks(self, val):
if val is None:
chunks = val
elif len(val) != len(self.shape):
raise ValueError(
f"Chunk size {val} is not compatible with dataset shape {self.shape}."
)
else:
chunks = ()
for i, l in enumerate(self.shape):
chunks += (min(val[i], l),)
self._chunks = chunks
@property
def compression(self):
return self._compression
@compression.setter
def compression(self, val):
self._compression = val
@property
def compression_opts(self):
return self._compression_opts
@compression_opts.setter
def compression_opts(self, val):
self._compression_opts = val
def __getitem__(self, obj):
return self._data[obj]
def __setitem__(self, obj, val):
self._data[obj] = val
def __len__(self):
return len(self._data)
def __iter__(self):
# This needs to be implemented to stop craziness happening when doing
# np.array(dset)
return self._data.__iter__()
def __repr__(self):
return '<memh5 common dataset %s: shape %s, type "%s">' % (
repr(self._name),
repr(self.shape),
repr(self.dtype),
)
def __eq__(self, other):
if not isinstance(other, MemDatasetCommon):
return False
return (
MemDataset.__eq__(self, other)
and self._data == other._data
and self._chunks == other._chunks
and self._compression == other._compression
and self._compression_opts == other._compression_opts
)
[docs]class MemDatasetDistributed(MemDataset):
"""Parallel, in-memory implementation of :class:`h5py.Dataset`.
Inherits from :class:`MemDataset`. Encapsulates an :class:`MPIArray` mocked
up to look like an `h5py` dataset. Similar to h5py datasets, this
implements slicing like a numpy array but as it is not actually a many
operations won't work (e.g. ufuncs).
Parameters
----------
shape : tuple
Shape of array to initialise. This is the *global* shape.
dtype : numpy dtype
Type of array to create.
axis : int, optional
Index of axis to distribute the array over.
comm : MPI.Comm, optional
MPI communicator to distribute over. If :obj:`None` use
:obj:`MPI.COMM_WORLD`.
"""
def __init__(
self,
shape,
dtype,
axis=0,
comm=None,
chunks=None,
compression=None,
compression_opts=None,
**kwargs,
):
super().__init__(**kwargs)
self._data = mpiarray.MPIArray(shape, axis=axis, comm=comm, dtype=dtype)
self._chunks = chunks
self._compression = compression
self._compression_opts = compression_opts
@classmethod
def from_mpi_array(
cls, data, chunks=None, compression=None, compression_opts=None, **kwargs
):
if not isinstance(data, mpiarray.MPIArray):
raise TypeError("Object must be a numpy array (or subclass).")
self = cls.__new__(cls)
super(MemDatasetDistributed, self).__init__(**kwargs)
self._data = data
self._chunks = chunks
self._compression = compression
self._compression_opts = compression_opts
return self
@property
def common(self):
return False
@property
def distributed(self):
return True
@property
def data(self):
return self._data
@property
def local_data(self):
return self._data.local_array
@property
def shape(self):
return self.global_shape
@property
def global_shape(self):
"""
Global shape of the distributed dataset.
The shape of the whole array that is distributed between multiple nodes.
"""
return self._data.global_shape
@property
def local_shape(self):
"""
Local shape of the distributed dataset.
The shape of the part of the distributed array that is allocated to *this* node.
"""
return self._data.local_shape
@property
def local_offset(self):
return self._data.local_offset
@property
def dtype(self):
"""The numpy data type of the dataset"""
return self._data.dtype
@property
def chunks(self):
"""The chunk shape of the dataset."""
return self._chunks
@chunks.setter
def chunks(self, val):
if val is None:
chunks = val
elif len(val) != len(self.shape):
raise ValueError(
f"Chunk size {val} is not compatible with dataset shape {self.shape}."
)
else:
chunks = ()
for i, l in enumerate(self.shape):
chunks += (min(val[i], l),)
self._chunks = chunks
@property
def compression(self):
return self._compression
@compression.setter
def compression(self, val):
self._compression = val
@property
def compression_opts(self):
return self._compression_opts
@compression_opts.setter
def compression_opts(self, val):
self._compression_opts = val
@property
def distributed_axis(self):
"""The index of the axis over which this dataset is distributed."""
return self._data.axis
@property
def comm(self):
"""Reference to the MPI communicator."""
return self._data._comm
[docs] def redistribute(self, axis):
"""Change the axis that the dataset is distributed over.
Parameters
----------
axis : integer
Axis to distribute over.
"""
self._data = self._data.redistribute(axis=axis)
def __getitem__(self, obj):
return self._data.global_slice[obj]
def __setitem__(self, obj, val):
self._data.global_slice[obj] = val
def __iter__(self):
# This needs to be implemented to stop craziness happening when doing
# np.array(dset)
return self._data.__iter__()
def __len__(self):
return len(self._data)
def __repr__(self):
return (
'<memh5 distributed dataset %s: global_shape %s, dist_axis %s, type "%s">'
% (
repr(self._name),
repr(self.global_shape),
repr(self.distributed_axis),
repr(self.dtype),
)
)
def __eq__(self, other):
if not isinstance(other, MemDatasetDistributed):
return False
return (
MemDataset.__eq__(self, other)
and self._data == other._data
and self._chunks == other._chunks
and self._compression == other._compression
and self._compression_opts == other._compression_opts
)
# Higher Level Data Containers
# ----------------------------
[docs]class MemDiskGroup(_BaseGroup):
"""Container whose data may either be stored on disk or in memory.
This container is intended to have the same basic API :class:`h5py.Group`
and :class:`MemGroup` but whose underlying data could live either on disk
or in memory.
Aside from providing a few convenience methods, this class isn't that
useful by itself. It is almost as easy to use :class:`h5py.Group`
or :class:`MemGroup` directly. Where it becomes more useful is for creating
more specialized data containers which can subclass this class. A basic
but useful example is provided in :class:`BasicCont`.
This class also supports the same distributed features as :class:`MemGroup`,
but only when wrapping that class. Attempting to create a distributed object
wrapping a :class:`h5py.File` object will raise an exception. For similar
reasons, :meth:`MemDiskGroup.to_disk` will not work, however,
:meth:`MemDiskGroup.save` will work fine.
Parameters
----------
data_group : :class:`h5py.Group`, :class:`MemGroup` or string, optional
Underlying :mod:`h5py` like data container where data will be stored.
If a string, open a h5py file with that name. If not
provided a new :class:`MemGroup` instance will be created.
distributed : boolean, optional
Allow the container to hold distributed datasets.
comm : MPI.Comm, optional
MPI Communicator to distributed over. If not set, use :obj:`MPI.COMM_WORLD`.
detect_subclass: boolean, optional
If *data_group* is specified, whether to inspect for a
'__memh5_subclass' attribute which specifies a subclass to return.
file_format : `fileformats.FileFormat`
File format to use. File format will be guessed if not supplied. Default `None`.
"""
def __init__(self, data_group=None, distributed=False, comm=None, file_format=None):
toclose = False
if comm is None:
comm = mpiutil.world
if distributed and comm is None:
warnings.warn(
"Cannot create distributed MemDiskGroup when there is no MPI communicator!!"
)
distributed = False
# If data group is not set, initialise a new MemGroup
if data_group is None:
data_group = MemGroup(distributed=distributed, comm=comm)
# If it is a MemDiskGroup then initialise a shallow copy
elif isinstance(data_group, MemDiskGroup):
data_group = data_group._storage_root
# Otherwise, presume it is an HDF5 Group-like object (which includes
# MemGroup and h5py.Group).
else:
data_group, toclose = get_file(
data_group, mode="a", file_format=file_format
)
# Zarr arrays are automatically flushed and closed
toclose = False if file_format == fileformats.HDF5 else toclose
# Check the distribution settings
if distributed:
if isinstance(data_group, h5py.Group) or (
zarr_available and isinstance(data_group, zarr.Group)
):
raise ValueError(
"Distributed MemDiskGroup cannot be created around h5py or zarr objects."
)
# Check parallel distribution is the same
if not data_group.distributed:
raise ValueError(
"Cannot create MemDiskGroup with different distributed setting to MemGroup to wrap."
)
# Check parallel communicator is the same
if comm and comm != data_group.comm:
raise ValueError(
"Cannot create MemDiskGroup with different MPI communicator to MemGroup to wrap."
)
self._toclose = toclose
super().__init__(storage_root=data_group, name=data_group.name)
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
@classmethod
def _detect_subclass_path(cls, group):
"""Determine the true class of `group` from its attributes (otherwise `cls`)."""
return group.attrs.get("__memh5_subclass", None)
@classmethod
def _resolve_subclass(cls, clspath):
"""Validate and return the subclass corresponding to classpath."""
if clspath is None:
return cls
# Try and get a reference to the requested class (warn if we cannot find it)
try:
new_cls = misc.import_class(clspath)
except (ImportError, KeyError):
warnings.warn("Could not import memh5 subclass %s" % clspath)
# Check that it is a subclass of MemDiskGroup
if not issubclass(new_cls, MemDiskGroup):
raise RuntimeError(
"Requested type (%s) is not an subclass of memh5.MemDiskGroup."
% clspath
)
return new_cls
[docs] @classmethod
def from_group(cls, data_group=None, detect_subclass=True):
"""Create data object from a given group.
This wraps the given group object, optionally returning the correct
subclass. This does *not* call `__init__` on the subclass when
this happens.
Parameters
----------
data_group : :class:`h5py.Group`, :class:`MemGroup` or string, optional
:mod:`h5py` like data containerto wrap.
detect_subclass: boolean, optional
If *data_group* is specified, whether to inspect for a
'__memh5_subclass' attribute which specifies a subclass to return.
Returns
-------
grp : MemDiskGroup
"""
if detect_subclass:
new_cls = cls._resolve_subclass(cls._detect_subclass_path(data_group))
else:
new_cls = cls
self = new_cls.__new__(new_cls)
MemDiskGroup.__init__(self, data_group=data_group)
return self
@property
def _data(self):
"""_data was renamed to _storage_root. This added for compatibility."""
return self._storage_root
def _finish_setup(self):
"""Finish the class setup *after* importing from a file."""
pass
[docs] def close(self):
"""Closes file if on disk if file was opened on initialization."""
if self.ondisk and hasattr(self, "_toclose") and self._toclose:
self._storage_root.close()
if hasattr(self, "_lockfile") and (self.comm is None or self.comm.rank is None):
fileformats.remove_file_or_dir(self._lockfile)
def __getitem__(self, name):
"""Retrieve an object.
The *name* may be a relative or absolute path
"""
value = super().__getitem__(name)
path = value.name
if is_group(value):
if not self.group_name_allowed(path):
msg = "Access to group %s not allowed." % path
raise KeyError(msg)
else:
if not self.dataset_name_allowed(path):
msg = "Access to dataset %s not allowed." % path
raise KeyError(msg)
return value
def __len__(self):
n = 0
for _ in self:
n += 1
return n
def __iter__(self):
for key in super().__iter__():
try:
_ = self[key]
except KeyError:
# This key name is not allowed (see __getitem__)
continue
yield key
@property
def ondisk(self):
"""Whether the data is stored on disk as opposed to in memory."""
return hasattr(self, "_storage_root") and (
isinstance(self._storage_root, h5py.File)
or (zarr_available and isinstance(self._storage_root, zarr.Group))
)
@classmethod
def _make_selections(cls, sel_args):
"""
Overwrite this method in your subclass if you want to implement downselection
of axes (e.g. when loading a container from an HDF5 file).
Parameters
----------
sel_args : dict
Should contain valid numpy indexes as values and axis names (str) as keys.
Returns
-------
dict
Mapping of dataset names to numpy indexes for downselection of the data.
May include multiple layers of dicts to map the dataset tree
"""
return None
# For creating new instances. #
[docs] @classmethod
def from_file(
cls,
file_,
ondisk=False,
distributed=False,
comm=None,
detect_subclass=True,
convert_attribute_strings=None,
convert_dataset_strings=None,
file_format=None,
**kwargs,
):
"""Create data object from analysis hdf5 file, store in memory or on disk.
If *ondisk* is True, do not load into memory but store data in h5py objects
that remain associated with the file on disk. This is almost identical
to the default constructor, when providing a file as the *data_group*
object, however provides more flexibility when opening the file through
the additional keyword arguments.
This does *not* call `__init__` on the subclass when restoring.
Parameters
----------
file_ : string or :class:`h5py.Group` object
File with the hdf5 data. File must be compatible with memh5 objects.
ondisk : bool
Whether the data should be stored in-place in *file_* or should be copied
into memory.
distributed : boolean, optional
Allow the container to hold distributed datasets.
comm : MPI.Comm, optional
MPI Communicator to distributed over. If not set, use :obj:`MPI.COMM_WORLD`.
detect_subclass: boolean, optional
If *data_group* is specified, whether to inspect for a
'__memh5_subclass' attribute which specifies a subclass to return.
convert_attribute_strings : bool, optional
Try and convert attribute string types to unicode. If not specified, look
up the name as a class attribute to find a default, and otherwise use
`True`.
convert_dataset_strings : bool, optional
Try and convert dataset string types to unicode. If not specified, look
up the name as a class attribute to find a default, and otherwise use
`False`.
<axis_name>_sel : list or slice
Axis selections can be given to only read a subset of the containers. A
slice can be given, or a list of specific array indices for that axis.
file_format : `fileformats.FileFormat`
File format to use. Default is `None`, i.e. guess from file name.
**kwargs : any other arguments
Any additional keyword arguments are passed to :class:`h5py.File`'s
constructor if *file_* is a filename and silently ignored otherwise.
"""
if file_format is None and not is_group(file_):
file_format = fileformats.guess_file_format(file_)
if file_format == fileformats.Zarr and not zarr_available:
raise RuntimeError("Unable to read zarr file, please install zarr.")
# Get a value for the conversion parameters, looking up on the class type if
# not supplied
if convert_attribute_strings is None:
convert_attribute_strings = getattr(cls, "convert_attribute_strings", True)
if convert_dataset_strings is None:
convert_dataset_strings = getattr(cls, "convert_dataset_strings", False)
lockfile = None
if not ondisk:
if (zarr_available and isinstance(file_, zarr.Group)) or isinstance(
file_, h5py.Group
):
file_ = file_.filename
if "mode" in kwargs:
del kwargs["mode"]
# Look for *_sel parameters in kwargs, collect and remove them from kwargs
sel_args = {}
for a in list(kwargs):
if a.endswith("_sel"):
sel_args[a[:-4]] = kwargs.pop(a)
# Axis selections won't warn if called from a baseclass without detecting
# the subclass
if sel_args and not detect_subclass and cls in [MemDiskGroup, BasicCont]:
warnings.warn(
"Cannot process axis selections as subclass is not known."
)
# Map selections to datasets
sel = cls._make_selections(sel_args)
data = MemGroup.from_file(
file_,
distributed=distributed,
comm=comm,
selections=sel,
convert_attribute_strings=convert_attribute_strings,
convert_dataset_strings=convert_dataset_strings,
file_format=file_format,
**kwargs,
)
toclose = False
else:
# Again, a compatibility hack
if is_group(file_):
data = file_
toclose = False
else:
kwargs.setdefault("mode", "a")
if distributed and file_format == fileformats.Zarr:
lockfile = f"{file_}.sync"
kwargs["synchronizer"] = zarr.ProcessSynchronizer(lockfile)
data = file_format.open(file_, **kwargs)
toclose = file_format == fileformats.HDF5
# Here we explicitly avoid calling __init__ on any derived class. Like
# with a pickle we want to restore the saved state only.
self = cls.from_group(data_group=data, detect_subclass=detect_subclass)
# ... skip the class initialisation, and use a special method
self._finish_setup()
self._toclose = toclose
if lockfile is not None:
self._comm = comm
self._lockfile = lockfile
return self
# Methods for manipulating and building the class. #
[docs] @staticmethod
def group_name_allowed(name):
"""Used by subclasses to restrict creation of and access to groups.
This method is called by :meth:`create_group`, :meth:`require_group`,
and :meth:`__getitem__` to check that the supplied group name is
allowed.
The idea is that subclasses that want to specialize and restrict the
layout of the data container can implement this method instead of
re-implementing the above mentioned methods.
Parameters
----------
name: string
Absolute path to proposed group.
Returns
-------
allowed : bool
``True``
"""
return True
[docs] @staticmethod
def dataset_name_allowed(name):
"""Used by subclasses to restrict creation of and access to datasets.
This method is called by :meth:`create_dataset`,
:meth:`require_dataset`, and :meth:`__getitem__` to check that the
supplied group name is allowed.
The idea is that subclasses that want to specialize and restrict the
layout of the data container can implement this method instead of
re-implementing the above mentioned methods.
Parameters
----------
name: string
Absolute path to proposed dataset.
Returns
-------
allowed : bool
``True``
"""
return True
[docs] def create_dataset(self, name, *args, **kwargs):
"""Create and return a new dataset.
All parameters are passed through to the :meth:`create_dataset` method of
the underlying storage, whether it be an :class:`h5py.Group` or a
:class:`MemGroup`.
"""
path = posixpath.join(self.name, name)
if not self.dataset_name_allowed(path):
msg = "Dataset name %s not allowed." % path
raise ValueError(msg)
new_dataset = self._data.create_dataset(path, *args, **kwargs)
return new_dataset
[docs] def dataset_common_to_distributed(self, name, distributed_axis=0):
"""Convert a common dataset to a distributed one.
Parameters
----------
name : string
Dataset name.
distributed_axis : int, optional
Axis to distribute the data over.
Returns
-------
dset : memh5.MemDatasetDistributed
"""
if isinstance(self._data, MemGroup):
return self._data.dataset_common_to_distributed(name, distributed_axis)
else:
raise RuntimeError(
"Can not convert a h5py or zarr dataset %s to distributed" % name
)
[docs] def dataset_distributed_to_common(self, name):
"""Convert a distributed dataset to a common one.
Parameters
----------
name : string
Dataset name.
Returns
-------
dset : memh5.MemDatasetCommon
"""
if isinstance(self._data, MemGroup):
return self._data.dataset_distributed_to_common(name)
else:
raise RuntimeError(
"Can not convert a h5py or zarr dataset %s to distributed" % name
)
[docs] def create_group(self, name):
"""Create and return a new group."""
path = posixpath.join(self.name, name)
if not self.group_name_allowed(path):
msg = "Group name %s not allowed." % path
raise ValueError(msg)
self._data.create_group(path)
return self._group_class._from_storage_root(self._data, path)
[docs] def to_memory(self):
"""Return a version of this data that lives in memory."""
if isinstance(self._data, MemGroup):
return self
else:
return self.__class__.from_file(self._data)
[docs] def to_disk(self, filename, file_format=fileformats.HDF5, **kwargs):
"""
Return a version of this data that lives on disk.
Parameters
----------
filename : str
File name.
file_format : `fileformats.FileFormat`
File format to use. Default `fileformats.HDF5`.
**kwargs
Keyword arguments passed through to the file creating, e.g. `mode`.
Returns
-------
Instance of this data object that is written to disk.
"""
if not isinstance(self._data, MemGroup):
msg = "This data already lives on disk. Copying to new file anyway."
warnings.warn(msg)
elif self._data.distributed:
raise NotImplementedError(
"Cannot run to_disk on a distributed object. Try running save instead."
)
self.save(filename, file_format=file_format)
return self.__class__.from_file(
filename, ondisk=True, file_format=file_format, **kwargs
)
[docs] def flush(self):
"""Flush the buffers of the underlying hdf5 file if on disk."""
if self.ondisk:
self._data.flush()
[docs] def save(
self,
filename,
convert_attribute_strings=None,
convert_dataset_strings=None,
file_format=fileformats.HDF5,
**kwargs,
):
"""Save data to hdf5/zarr file.
Parameters
----------
filename : str
Name of the file to save into.
convert_attribute_strings : bool, optional
Try and convert attribute string types to a format HDF5 understands. If
not specified, look up the name as a class attribute to find a default,
and otherwise use `True`.
convert_dataset_strings : bool, optional
Try and convert dataset string types to bytestrings before saving to
HDF5. If not specified, look up the name as a class attribute to find a
default, and otherwise use `False`.
file_format : `fileformats.FileFormat`
File format to use. Default `fileformats.HDF5`.
**kwargs
Keyword arguments passed through to the file creating, e.g. `mode`.
"""
if file_format == fileformats.Zarr and not zarr_available:
raise RuntimeError("Unable to write to zarr file, please install zarr.")
# Get a value for the conversion parameters, looking up on the instance if
# not supplied
if convert_attribute_strings is None:
convert_attribute_strings = getattr(self, "convert_attribute_strings", True)
if convert_dataset_strings is None:
convert_dataset_strings = getattr(self, "convert_dataset_strings", False)
# Write out a hint as to what the class of this object is, do this by
# inserting it into the attributes before saving out.
if "__memh5_subclass" not in self.attrs:
clspath = self.__class__.__module__ + "." + self.__class__.__name__
self.attrs["__memh5_subclass"] = clspath
if (zarr_available and isinstance(self._data, zarr.Group)) or isinstance(
self._data, h5py.File
):
with file_format.open(filename, **kwargs) as f:
deep_group_copy(self._data, f)
else:
self._data.to_file(
filename,
convert_attribute_strings=convert_attribute_strings,
convert_dataset_strings=convert_dataset_strings,
file_format=file_format,
**kwargs,
)
[docs]class BasicCont(MemDiskGroup):
"""Basic high level data container.
Inherits from :class:`MemDiskGroup`.
Basic one-level data container that allows any number of datasets in the
root group but no nesting. Data history tracking (in
:attr:`BasicCont.history`) and array axis interpretation (in
:attr:`BasicCont.index_map`) is also provided.
This container is intended to be an example of how a high level container,
with a strictly controlled data layout can be implemented by subclassing
:class:`MemDiskGroup`.
Parameters
----------
Parameters are passed through to the base class constructor.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Initialize new groups only if writable.
if self._data.file.mode == "r+":
self._data.require_group("history")
self._data.require_group("index_map")
self._data.require_group("reverse_map")
if "order" not in self._data["history"].attrs:
self._data["history"].attrs["order"] = "[]"
@property
def history(self):
"""Stores the analysis history for this data.
Do not try to add a new entry by assigning to an element of this
property. Use :meth:`~BasicCont.add_history` instead.
Returns
-------
history : read only dictionary
Each entry is a dictionary containing metadata about that stage in
history. There is also an 'order' entry which specifies how the
other entries are ordered in time.
"""
out = {}
for name, value in self._data["history"].items():
warnings.warn(
"memh5 dataset {} is using a deprecated history format. Read support of "
"files using this format will be continued for now, but you should "
"update the instance of caput that wrote this file.".format(self.name),
DeprecationWarning,
)
out[name] = value.attrs
for name, value in self._data["history"].attrs.items():
out[name] = value
# TODO: this seems like a trememndous hack. I've changed it to a safer version of
# eval, but this should probably be removed
out["order"] = literal_eval(
bytes_to_unicode(self._data["history"].attrs["order"])
)
return ro_dict(out)
@property
def index_map(self):
"""Stores representions of the axes of datasets.
The index map contains arrays used to interpret the axes of the
various datasets. For instance, the 'time', 'prod' and 'freq' axes of
the visibilities are described in the index map.
Do not try to add a new index_map by assigning to an item of this
property. Use :meth:`~BasicCont.create_index_map` instead.
Returns
-------
index_map : read only dictionary
Entries are 1D arrays used to interpret the axes of datasets.
"""
out = {}
for name, value in self._data["index_map"].items():
out[name] = value[:]
return ro_dict(out)
@property
def reverse_map(self):
"""Stores the reverse map from product index to stack index.
Do not try to add a new index_map by assigning to an item of this
property. Use :meth:`~BasicCont.create_index_map` instead.
Returns
-------
reverse_map : read only dictionary
Entry is a 1D arrays used to map from product index to stack index.
"""
out = {}
for name, value in self._data["reverse_map"].items():
out[name] = value[:]
return ro_dict(out)
[docs] def group_name_allowed(self, name):
"""No groups are exposed to the user. Returns ``False``."""
return False
[docs] def dataset_name_allowed(self, name):
"""Datasets may only be created and accessed in the root level group.
Returns ``True`` is *name* is a path in the root group i.e. '/dataset'.
"""
parent_name, name = posixpath.split(name)
return parent_name == "/"
[docs] def create_index_map(self, axis_name, index_map):
"""Create a new index map."""
self._data["index_map"].create_dataset(axis_name, data=index_map)
[docs] def del_index_map(self, axis_name):
"""Delete an index map."""
del self._data["index_map"][axis_name]
[docs] def create_reverse_map(self, axis_name, index_map):
"""Create a new reverse map."""
self._data["reverse_map"].create_dataset(axis_name, data=index_map)
[docs] def del_reverse_map(self, axis_name):
"""Delete an index map."""
del self._data["reverse_map"][axis_name]
[docs] def add_history(self, name, history=None):
"""
Create a new history entry.
Parameters
----------
name : str
Name for history entry.
history
History entry (optional). Needs to be json serializable.
Notes
-----
Previously only dictionaries with depth=1 were supported here. The key/value pairs of these
where added as attributes to the history group when written to disk. Reading the old
history format is still supported, however the history is now an attribute itself and
dictionaries of any depth are allowed as history entries.
"""
if name == "order":
raise ValueError(
'"order" is a reserved name and may not be the'
" name of a history entry."
)
if history is None:
history = {}
order = self.history["order"]
order = order + [name]
history_group = self._data["history"]
history_group.attrs["order"] = str(order)
history_group.attrs[name] = history
[docs] def redistribute(self, dist_axis):
"""Redistribute parallel datasets along a specified axis.
Parameters
----------
dist_axis : int, string, or list of
The axis can be specified by an integer index (positive or
negative), or by a string label which must correspond to an entry in
the `axis` attribute on the dataset. If a list is supplied, each
entry is tried in turn, which allows different datasets to be
redistributed along differently labelled axes.
"""
if not isinstance(dist_axis, (list, tuple)):
dist_axis = [dist_axis]
stack = list(self._data._storage_root.items())
# Crawl over the dataset tree and redistribute any matching datasets.
# NOTE: this is done using a non-recursive stack-based tree walk, the previous
# implementation used a recursive closure which generated a reference
# cycle and caused the entire container to be kept alive until an
# explicit gc run. So let this be a warning to be careful in this code.
while stack:
name, item = stack.pop()
# Recurse into subgroups
if isinstance(item, _Storage):
stack += list(item.items())
# Okay, we've found a distributed dataset, let's try and redistribute it
if isinstance(item, MemDatasetDistributed):
naxis = len(item.shape)
for axis in dist_axis:
# Try processing if this is a string
if isinstance(axis, str):
if "axis" in item.attrs and axis in item.attrs["axis"]:
axis = np.argwhere(item.attrs["axis"] == axis)[0, 0]
else:
continue
# Process if axis is an integer
elif isinstance(axis, int):
# Deal with negative axis index
if axis < 0:
axis = naxis + axis
# Check axis is within bounds
if axis >= naxis:
continue
# Excellent, found a matching axis, time to redistribute
item.redistribute(axis)
break
# Note that this clause is on the FOR.
else:
# If we are here we didn't find a matching axis, emit a warning
logger.info(
"Could not find axis (from %s) to distribute dataset %s over.",
str(dist_axis),
name,
)
# Utilities
# ---------
[docs]def attrs2dict(attrs):
"""Safely copy an h5py attributes object to a dictionary."""
out = {}
for key, value in attrs.items():
if isinstance(value, np.ndarray):
value = value.copy()
out[key] = value
return out
[docs]def is_group(obj):
"""Check if the object is a Group, which includes File objects.
In most cases, if it isn't a Group it's a Dataset, so this can be used to
check for Datasets as well.
"""
return hasattr(obj, "create_group")
[docs]def get_h5py_File(f, **kwargs):
"""Convenience function in order to not break old functionality."""
return get_file(f, file_format=fileformats.HDF5, **kwargs)
[docs]def get_file(f, file_format=None, **kwargs):
"""Checks if input is a `zarr`/`h5py.File` or filename and returns the former.
Parameters
----------
f : h5py/zarr Group or filename string
file_format : `fileformats.FileFormat`
File format to use. File format will be guessed if not supplied. Default `None`.
**kwargs : all keyword arguments
Passed to :class:`h5py.File` constructor or `zarr.open_group`. If `f` is already an open file,
silently ignores all keywords.
Returns
-------
f : hdf5 or zarr group
opened : bool
Whether the a file was opened or not (i.e. was already open).
"""
# Figure out if F is a file or a filename, and whether the file should be
# closed.
if is_group(f):
return f, False
else:
if file_format is None:
file_format = fileformats.guess_file_format(f)
if file_format == fileformats.Zarr and not zarr_available:
raise RuntimeError("Unable to open zarr file. Please install zarr.")
try:
f = file_format.open(f, **kwargs)
except IOError as e:
msg = "Opening file %s caused an error: " % str(f)
raise IOError(msg + str(e)) from e
return f, True
[docs]def copyattrs(a1, a2, convert_strings=False):
"""Copy attributes from one h5py/zarr/memh5 attribute object to another.
Parameters
----------
a1 : h5py/zarr/memh5 object
Attributes to copy from.
a1 : h5py/zarr/memh5 object
Attributes to copy into.
convert_strings : bool, optional
Convert string attributes (or lists/arrays of them) to ensure that they are
unicode.
"""
# Make sure everything is a copy.
a1 = attrs2dict(a1)
# When serializing dictionaries, add this in front of the string
json_prefix = "!!_memh5_json:"
def _map_unicode(value):
if not convert_strings:
return value
# Any arrays of numpy type unicode strings must be transformed before being
# copied into HDF5
if isinstance(a2, h5py.AttributeManager):
# As h5py will coerce the value to an array anyway, do it now such
# that the following test works
if isinstance(value, (tuple, list)):
value = np.array(value)
if isinstance(value, np.ndarray) and value.dtype.kind == "U":
value = value.astype(h5py.special_dtype(vlen=str))
return value
# If we are copying into memh5 ensure that any string are unicode
return bytes_to_unicode(value)
def _map_json(value):
# Serialize/deserialize "special" json values
class Memh5JSONEncoder(json.JSONEncoder):
"""
- Datetimes often appear in the configs (as they are parsed by PyYAML),
so we need to serialise them back to strings.
- Some old data format may have numpy arrays in `history["acq"]`. We have to convert
those to lists and decode byte objects.
"""
def default(self, o):
if isinstance(o, datetime.datetime):
return o.isoformat()
elif isinstance(o, np.number):
return o.data
elif isinstance(o, np.ndarray):
if len(o) == 1:
return o.tolist()[0]
return o.tolist()
elif isinstance(o, bytes): # pragma: py3
return o.decode()
# Let the default method raise the TypeError
return json.JSONEncoder.default(self, o)
if (
isinstance(value, (dict, np.ndarray, datetime.datetime))
and zarr_available
and isinstance(a2, zarr.attrs.Attributes)
) or (
isinstance(value, (dict, datetime.datetime))
and isinstance(a2, h5py.AttributeManager)
):
# Save to JSON converting datetimes.
encoder = Memh5JSONEncoder()
value = json_prefix + encoder.encode(value)
elif isinstance(value, str) and value.startswith(json_prefix):
# Read from JSON, keep serialised datetimes as strings
value = json.loads(value[len(json_prefix) :])
return value
for key in sorted(a1):
val = _map_unicode(a1[key])
val = _map_json(val)
if isinstance(val, np.generic): # zarr can't handle numpy types
val = val.item()
a2[key] = val
[docs]def deep_group_copy(
g1,
g2,
selections=None,
convert_dataset_strings=False,
convert_attribute_strings=True,
file_format=fileformats.HDF5,
skip_distributed=False,
postprocess=None,
):
"""
Copy full data tree from one group to another.
Copies from g1 to g2. An axis downselection can be specified by supplying the
parameter 'selections'. For example to select the first two indexes in
g1["foo"]["bar"], do
>>> g1 = MemGroup()
>>> foo = g1.create_group("foo")
>>> ds = foo.create_dataset(name="bar", data=np.arange(3))
>>> g2 = MemGroup()
>>> deep_group_copy(g1, g2, selections={"foo/bar": slice(2)})
>>> list(g2["foo"]["bar"])
[0, 1]
Parameters
----------
g1 : h5py.Group or zarr.Group
Deep copy from this group.
g2 : h5py.Group or zarr.Group
Deep copy to this group.
selections : dict
If this is not None, it should have a subset of the same hierarchical structure
as g1, but ultimately describe axis selections for group entries as valid
numpy indexes.
convert_attribute_strings : bool, optional
Convert string attributes (or lists/arrays of them) to ensure that they are
unicode.
convert_dataset_strings : bool, optional
Convert strings within datasets to ensure that they are unicode.
file_format : `fileformats.FileFormat`
File format to use. Default `fileformats.HDF5`.
skip_distributed : bool, optional
If `True` skip the write for any distributed dataset, and return a list of the
names of all datasets that were skipped. If `False` (default) throw a
`ValueError` if any distributed datasets are encountered.
postprocess : function, optional
A function that takes is called on each node, with the source and destination
entries, and can modify either.
Returns
-------
distributed_dataset_names : list
Names of the distributed datasets if `skip_distributed` is True. Otherwise
`None` is returned.
"""
distributed_dset_names = []
# only the case if zarr is not installed
if file_format.module is None:
raise RuntimeError("Can't deep_group_copy zarr file. Please install zarr.")
to_file = isinstance(g2, file_format.module.Group)
# Prepare a dataset for writing out, applying selections and transforming any
# datatypes
# Returns: (dtype, shape, data_to_write)
def _prepare_dataset(dset):
# Look for a selection for this dataset (also try without the leading "/")
try:
selection = selections.get(
dset.name, selections.get(dset.name[1:], slice(None))
)
except AttributeError:
selection = slice(None)
# Check if this is a distributed dataset and figure out if we can make this work
# out
if to_file and isinstance(dset, MemDatasetDistributed):
if not skip_distributed:
raise ValueError(
f"Cannot write out a distributed dataset ({dset.name}) "
"via this method."
)
elif selection != slice(None):
raise ValueError(
"Cannot apply a slice when writing out a distributed dataset "
f"({dset.name}) via this method."
)
else:
# If we get here, we should create the dataset, but not write out any data into it (i.e. return None)
distributed_dset_names.append(dset.name)
return dset.dtype, dset.shape, None
# Extract the data for the selection
data = entry[selection]
if convert_dataset_strings:
# Convert unicode strings back into ascii byte strings. This will break
# if there are characters outside of the ascii range
if to_file:
data = ensure_bytestring(data)
# Convert strings in an HDF5 dataset into unicode
else:
data = ensure_unicode(data)
elif to_file:
# If we shouldn't convert we at least need to ensure there aren't any
# Unicode characters before writing
data = check_unicode(entry)
return data.dtype, data.shape, data
# get compression options/chunking for this dataset
# Returns dict of compression and chunking arguments for create_dataset
def _prepare_compression_args(dset):
compression = getattr(dset, "compression", None)
compression_opts = getattr(dset, "compression_opts", None)
if to_file:
# massage args according to file format
compression_kwargs = file_format.compression_kwargs(
compression=compression,
compression_opts=compression_opts,
compressor=getattr(dset, "compressor", None),
)
else:
# in-memory case; use HDF5 compression args format for this case
compression_kwargs = fileformats.HDF5.compression_kwargs(
compression=compression, compression_opts=compression_opts
)
compression_kwargs["chunks"] = getattr(dset, "chunks", None)
# disable compression if not enabled for HDF5 files
# https://github.com/chime-experiment/Pipeline/issues/33
if (
to_file
and file_format == fileformats.HDF5
and not fileformats.HDF5.compression_enabled()
and isinstance(dset, MemDatasetDistributed)
):
compression_kwargs = {}
return compression_kwargs
# Do a non-recursive traversal of the tree, recreating the structure and attributes,
# and copying over any non-distributed datasets
stack = [g1]
while stack:
entry = stack.pop()
key = entry.name
if is_group(entry):
if key != g1.name:
# Only create group if we are above the starting level
g2.create_group(key)
stack += [entry[k] for k in sorted(entry, reverse=True)]
else: # Is a dataset
dtype, shape, data = _prepare_dataset(entry)
compression_kwargs = _prepare_compression_args(entry)
g2.create_dataset(
key,
shape=shape,
dtype=dtype,
data=data,
**compression_kwargs,
)
target = g2[key]
copyattrs(entry.attrs, target.attrs, convert_strings=convert_attribute_strings)
if postprocess:
postprocess(entry, target)
return distributed_dset_names if skip_distributed else None
def _distributed_group_to_file(
group,
fname,
mode,
hints=True,
convert_dataset_strings=False,
convert_attribute_strings=True,
file_format=None,
serial=False,
**kwargs,
):
"""Copy full data tree from distributed memh5 object into the destination file.
This routine works in two stages:
- First rank=0 copies all of the groups, attributes and non-distributed datasets
into the target file. The distributed datasets are identified and created in this
step, but their contents are not written. This is done by `deep_group_copy` to try
and centralize as much of the copying code.
- In the second step, the distributed datasets are written to disk. This is mostly
offloaded to `MPIArray.to_file`, but some code around this needs to change depending
on the file type, and if the data can be written in parallel.
"""
comm = group.comm
def apply_hints(source, dest):
if dest.name == "/":
dest.attrs["__memh5_distributed_file"] = True
elif isinstance(source, MemDatasetCommon):
dest.attrs["__memh5_distributed_dset"] = False
elif isinstance(source, MemDatasetDistributed):
dest.attrs["__memh5_distributed_dset"] = True
# Walk the full structure and separate out what we need to write
if comm.rank == 0:
with file_format.open(fname, mode) as fh:
distributed_dataset_names = deep_group_copy(
group,
fh,
convert_dataset_strings=convert_dataset_strings,
convert_attribute_strings=convert_attribute_strings,
skip_distributed=True,
file_format=file_format,
postprocess=(apply_hints if hints else None),
)
else:
distributed_dataset_names = None
distributed_dataset_names = comm.bcast(distributed_dataset_names)
def _write_distributed_datasets(dest):
for name in distributed_dataset_names:
dset = group[name]
data = check_unicode(dset)
data.to_file(
dest,
name,
chunks=dset.chunks,
compression=dset.compression,
compression_opts=dset.compression_opts,
file_format=file_format,
)
comm.Barrier()
# Write out the distributed parts of the file, this needs to be done slightly
# differently depending on the actual format we want to use (and if HDF5+MPI is
# available)
# NOTE: need to use mode r+ as the file should already exist
if file_format == fileformats.Zarr:
with fileformats.ZarrProcessSynchronizer(
f".{fname}.sync", group.comm
) as synchronizer, zarr.open_group(
store=fname, mode="r+", synchronizer=synchronizer
) as f:
_write_distributed_datasets(f)
elif file_format == fileformats.HDF5:
# Use MPI IO if possible, else revert to serialising
if h5py.get_config().mpi:
# Open file on all ranks
with misc.open_h5py_mpi(fname, "r+", comm=group.comm) as f:
if not f.is_mpi:
raise RuntimeError("Could not create file %s in MPI mode" % fname)
_write_distributed_datasets(f)
else:
_write_distributed_datasets(fname)
else:
raise ValueError(f"Unknown format={file_format}")
def _distributed_group_from_file(
fname,
comm=None,
_=True, # usually `hints`, but hints do not do anything in this method
convert_dataset_strings=False,
convert_attribute_strings=True,
file_format=fileformats.HDF5,
**kwargs,
):
"""
Restore full tree from an HDF5 file or Zarr group into a distributed memh5 object.
A `selections=` parameter may be supplied as parts of 'kwargs'. See
`_deep_group_copy' for a description.
"""
# Create root group
group = MemGroup(distributed=True, comm=comm)
comm = group.comm
selections = kwargs.pop("selections", None)
# == Create some internal functions for doing the read ==
# Copy over attributes with a broadcast from rank = 0
def _copy_attrs_bcast(h5item, memitem, **kwargs):
attr_dict = None
if comm.rank == 0:
attr_dict = dict(h5item.attrs)
attr_dict = comm.bcast(attr_dict, root=0)
copyattrs(attr_dict, memitem.attrs, **kwargs)
# Function to perform a recursive clone of the tree structure
def _copy_from_file(h5group, memgroup, selections=None):
# Copy over attributes
_copy_attrs_bcast(h5group, memgroup, convert_strings=convert_attribute_strings)
# Sort items to ensure consistent order
for key in sorted(h5group):
item = h5group[key]
try:
selection = selections.get(
item.name, selections.get(item.name[1:], None)
)
except AttributeError:
selection = None
# If group, create the entry and the recurse into it
if is_group(item):
new_group = memgroup.create_group(key)
_copy_from_file(item, new_group, selections)
# If dataset, create dataset
else:
# Check if we are in a distributed dataset
if ("__memh5_distributed_dset" in item.attrs) and item.attrs[
"__memh5_distributed_dset"
]:
distributed_axis = item.attrs.get("__memh5_distributed_axis", 0)
# Read from file into MPIArray
pdata = mpiarray.MPIArray.from_file(
h5group,
key,
axis=distributed_axis,
comm=comm,
sel=selection,
file_format=file_format,
)
# Create dataset from MPIArray
dset = memgroup.create_dataset(
key, data=pdata, distributed=True, distributed_axis=pdata.axis
)
else:
# Read common data onto rank zero and broadcast
cdata = None
if comm.rank == 0:
if selection is None:
cdata = item[:]
else:
cdata = item[selection]
# Convert ascii string back to unicode if requested
if convert_dataset_strings:
cdata = ensure_unicode(cdata)
cdata = comm.bcast(cdata, root=0)
# Create dataset from array
dset = memgroup.create_dataset(key, data=cdata, distributed=False)
# Copy attributes over into dataset
_copy_attrs_bcast(item, dset, convert_strings=convert_attribute_strings)
if file_format == fileformats.HDF5:
# Open file on all ranks
with misc.open_h5py_mpi(fname, "r", comm=comm) as f:
# Start recursive file read
_copy_from_file(f, group, selections)
else:
f = file_format.open(fname, "r")
_copy_from_file(f, group, selections)
# Final synchronisation
comm.Barrier()
return group
[docs]def bytes_to_unicode(s):
"""Ensure that a string (or collection of) are unicode.
Any byte strings found will be transformed into unicode. Standard
collections are processed recursively. Numpy arrays of byte strings
are converted. Any other types are returned as is.
Note that as HDF5 files will often contain ASCII strings which h5py
converts to byte strings this will be needed even when fully
transitioned to Python 3.
Parameters
----------
s : object
Object to convert.
Returns
-------
u : object
Converted object.
"""
if isinstance(s, bytes):
return s.decode("utf8")
if isinstance(s, np.ndarray) and s.dtype.kind == "S":
return s.astype(str)
if isinstance(s, (list, tuple, set)):
return s.__class__(bytes_to_unicode(t) for t in s)
if isinstance(s, dict):
return {bytes_to_unicode(k): bytes_to_unicode(v) for k, v in s.items()}
return s
[docs]def dtype_to_unicode(dt):
"""Convert byte strings in a dtype to unicode.
This will attempt to parse a numpy dtype and convert strings to unicode.
.. warning:: Custom alignment will not be preserved in these type conversions as
the byte and unicode string types are of different sizes.
Parameters
----------
dt : np.dtype
Data type to convert.
Returns
-------
new_dt : np.dtype
A new datatype with the converted string type.
"""
return _convert_dtype(dt, "|S", "<U")
[docs]def dtype_to_bytestring(dt):
"""Convert unicode strings in a dtype to byte strings.
This will attempt to parse a numpy dtype and convert strings to bytes.
.. warning:: Custom alignment will not be preserved in these type conversions as
the byte and unicode string types are of different sizes.
Parameters
----------
dt : np.dtype
Data type to convert.
Returns
-------
new_dt : np.dtype
A new datatype with the converted string type.
"""
return _convert_dtype(dt, "<U", "|S")
def _convert_dtype(dt, type_from, type_to):
"""Convert types in a numpy dtype to another type.
.. warning:: Custom alignment will not be preserved in these type conversions as
the byte and unicode string types are of different sizes.
Parameters
----------
dt : np.dtype
Data type to convert.
type_from : str
Type code (with alignment) to find.
type_to : str
Type code (with alignment) to convert to.
Returns
-------
new_dt : np.dtype
A new datatype with the converted string types.
"""
def _conv(t):
# If we get a tuple that should mean it's a type with some extended metadata, extract the
# type and throw away the metadata
if isinstance(t, tuple):
t = t[0]
return t.replace(type_from, type_to)
# For compound types we must recurse over the full compound type structure
def _iter_conv(x):
items = []
for item in x:
name = item[0]
type_ = item[1]
# Recursively convert the type
newtype = _iter_conv(type_) if isinstance(type_, list) else _conv(type_)
items.append((name, newtype))
return items
# For scalar types the conversion is easy
if not dt.names:
return np.dtype(_conv(dt.str))
# For compound types we need to iterate through
else:
return np.dtype(_iter_conv(dt.descr))
[docs]def has_kind(dt, kind):
"""Test if a numpy datatype has any fields of a specified type.
Parameters
----------
dt : np.dtype
Data type to convert.
kind : str
Numpy type code character. e.g. "S" for bytestring and "U" for unicode.
Returns
------
has_kind : bool
True if it contains the requested kind.
"""
# For scalar types the conversion is easy
if not dt.names:
return dt.kind == kind
# For compound types we must recurse over the full compound type structure
def _iter_conv(x):
for item in x:
type_ = item[1]
# Recursively convert the type
if isinstance(type_, list) and _iter_conv(type_):
return True
elif isinstance(type_, tuple) and type_[0][1] == kind:
return True
elif type_[1] == kind:
return True
return False
return _iter_conv(dt.descr)
[docs]def has_unicode(dt):
"""Test if data type contains any unicode fields.
See `has_kind`.
"""
return has_kind(dt, "U")
[docs]def has_bytestring(dt):
"""Test if data type contains any unicode fields.
See `has_kind`.
"""
return has_kind(dt, "S")
[docs]def ensure_bytestring(arr):
"""If needed convert the array to contain bytestrings not unicode.
Parameters
----------
arr : np.ndarray
Input array.
Returns
-------
arr_conv : np.ndarray
The converted array. If no conversion was required, just returns `arr`.
"""
if has_unicode(arr.dtype):
return arr.astype(dtype_to_bytestring(arr.dtype))
else:
return arr
[docs]def ensure_unicode(arr):
"""If needed convert the array to contain unicode strings not bytestrings.
Parameters
----------
arr : np.ndarray
Input array.
Returns
-------
arr_conv : np.ndarray
The converted array. If no conversion was required, just returns `arr`.
"""
if has_bytestring(arr.dtype):
return arr.astype(dtype_to_unicode(arr.dtype))
else:
return arr
[docs]def check_unicode(dset):
"""Test if dataset contains unicode so we can raise an appropriate error.
If there is no unicode, return the data from the array.
Parameters
----------
dset : MemDataset
Dataset to check.
Returns
-------
dset :
The converted array. If no conversion was required, just returns `arr`.
"""
if has_unicode(dset.dtype):
raise TypeError(
'Can not write dataset "%s" of unicode type into HDF5.' % dset.name
)
return dset.data