caput.mpiarray¶
An array class for containing MPI distributed array.
Examples
This example performs a transfrom from time-freq to lag-m space. This involves Fourier transforming each of these two axes of the distributed array:
import numpy as np
from mpi4py import MPI
from caput.mpiarray import MPIArray
nfreq = 32
nprod = 2
ntime = 32
Initialise array with (nfreq, nprod, ntime) global shape
darr1 = MPIArray((nfreq, nprod, ntime), dtype=np.float64)
# Load in data into parallel array
for lfi, fi in darr1.enumerate(axis=0):
darr1[lfi] = load_freq_data(gfi)
# Perform m-transform (i.e. FFT)
darr2 = MPIArray.wrap(np.fft.fft(darr1, axis=1), axis=0)
# Redistribute to get all frequencies onto each process, this performs the
# global transpose using MPI to make axis=1 the distributed axis, and make
# axis=0 completely local.
darr3 = darr2.redistribute(axis=1)
# Perform the lag transform on the frequency direction.
darr4 = MPIArray.wrap(np.fft.irfft(darr3, axis=0), axis=1)
Note: If a user wishes to create an MPIArray from an ndarray, they should use
MPIArray.wrap(). They should not use ndarray.view(MPIArray).
Attributes will not be set correctly if they do.
Global Slicing¶
The MPIArray also supports slicing with the global index using the
MPIArray.global_slice property. This can be used for both fetching
and assignment with global indices, supporting the basic slicing notation of
numpy.
Its behaviour changes depending on the exact slice it gets:
A full slice (:) along the parallel axis returns an
MPIArrayon fetching, and accepts anMPIArrayon assignment.A partial slice (:) returns and accepts a numpy array on the rank holding the data, and
Noneon other ranks.
It’s important to note that it never communicates data between ranks. It only ever operates on data held on the current rank.
Global Slicing Examples¶
Here is an example of this in action. Create and set an MPI array:
>>> import numpy as np
>>> from caput import mpiarray, mpiutil
>>>
>>> arr = mpiarray.MPIArray((mpiutil.size, 3), dtype=np.float64)
>>> arr[:] = 0.0
>>> for ri in range(mpiutil.size):
... if ri == mpiutil.rank:
... print(ri, arr)
... mpiutil.barrier()
0 [[0. 0. 0.]]
Use a global index to assign to the array
>>> arr.global_slice[3] = 17
Fetch a view of the whole array with a full slice
>>> arr2 = arr.global_slice[:, 2]
Print the third column of the array on all ranks
>>> for ri in range(mpiutil.size):
... if ri == mpiutil.rank:
... print(ri, arr2)
... mpiutil.barrier()
0 [0.]
Fetch a view of the whole array with a partial slice. The final two ranks should be None >>> arr3 = arr.global_slice[:2, 2] >>> for ri in range(mpiutil.size): … if ri == mpiutil.rank: … print(ri, arr3) … mpiutil.barrier() 0 [0.]
Direct Slicing¶
MPIArray supports direct slicing using […] (implemented via
__getitem__() ). This can be used for both fetching and assignment. It is
recommended to only index into the non-parallel axis or to do a full slice [:].
Direct Slicing Behaviour¶
A full slice [:] will return a
MPIArrayon fetching, with identical properties to the original array.Any indexing or slicing into the non-parallel axis, will also return a
MPIArray. The number associated with the parallel axis, will be adjusted if a slice results in an axis reduction.Any indexing into the parallel axis is discouraged. This behaviour is deprecated. For now, it will result into a local index on each rank, returning a regular numpy array, along with a warning. In the future, it is encouraged to index into the local array
MPIArray.local_array, if you wish to locally index into the parallel axis
Direct Slicing Examples¶
Deprecated since version 21.04: Direct indexing into parallel axis is DEPRECATED. For now, it will return a numpy array equal to local array indexing, along with a warning. This behaviour will be removed in the future.
>>> darr = mpiarray.MPIArray((mpiutil.size,), axis=0)
>>> (darr[0] == darr.local_array[0]).all()
True
>>> not hasattr(darr[0], "axis")
True
If you wish to index into local portion of a distributed array along its parallel
axis, you need to index into the MPIArray.local_array.
>>> darr[:] = 1.0
>>> darr.local_array[0]
1.0
indexing into non-parallel axes returns an MPIArray with appropriate attributes Slicing could result in a reduction of axis, and a lower parallel axis number
>>> darr = mpiarray.MPIArray((4, mpiutil.size), axis=1)
>>> darr[:] = mpiutil.rank
>>> (darr[0] == mpiutil.rank).all()
array([ True])
>>> darr[0].axis == 0
True
ufunc¶
In NumPy, universal functions (or ufuncs) are functions that operate on ndarrays
in an element-by-element fashion. MPIArray supports all ufunc calculations,
except along the parallel axis.
ufunc Requirements¶
All input
MPIArraymust be distributed along the same axis.If you pass a kwarg axis to the ufunc, it must not be the parallel axis.
ufunc Behaviour¶
If no output are provided, the results are converted back to MPIArrays. The new array will either be parallel over the same axis as the input MPIArrays, or possibly one axis down if the ufunc is applied via a reduce method (i.e. the shape of the array is reduced by one axis).
For operations that normally reduce to a scalar, the scalars will be wrapped into a 1D array distributed across axis 0.
shape related attributes will be re-calculated.
ufunc Examples¶
Create an array
>>> dist_arr = mpiarray.MPIArray((mpiutil.size, 4), axis=0)
>>> dist_arr[:] = mpiutil.rank
Element wise summation and .all() reduction
>>> (dist_arr + dist_arr == 2 * mpiutil.rank).all()
array([ True])
Element wise multiplication and reduction
>>> (dist_arr * 2 == 2 * mpiutil.rank).all()
array([ True])
The distributed axis is unchanged during an elementwise operation
>>> (dist_arr + dist_arr).axis == 0
True
An operation on multiple arrays with different parallel axes is not possible and will result in an exception
>>> (mpiarray.MPIArray((mpiutil.size, 4), axis=0) -
... mpiarray.MPIArray((mpiutil.size, 4), axis=1))
Traceback (most recent call last):
...
caput.mpiarray.AxisException: The distributed axis for all MPIArrays in an expression
should be the same
Summation across a non-parallel axis
>>> (dist_arr.sum(axis=1) == 4 * mpiutil.rank).all()
array([ True])
A sum reducing across all axes will reduce across each local array and give a new distributed array with a single element on each rank.
>>> (dist_arr.sum() == 4 * 3 * mpiutil.rank).all()
array([ True])
>>> (dist_arr.sum().local_shape) == (1,)
True
>>> (dist_arr.sum().global_shape) == (mpiutil.size,)
True
Reduction methods might result in a decrease in the distributed axis number
>>> dist_arr = mpiarray.MPIArray((mpiutil.size, 4, 3), axis=1)
>>> dist_arr.sum(axis=0).axis == 0
True
MPI.Comm¶
mpi4py.MPI.Comm provides a wide variety of functions for communications across nodes https://mpi4py.readthedocs.io/en/stable/overview.html?highlight=allreduce#collective-communications
They provide an upper-case and lower-case variant of many functions. With MPIArrays, please use the uppercase variant of the function. The lower-case variants involve an intermediate pickling process, which can lead to malformed arrays.
- exception caput.mpiarray.AxisException(message)[source]¶
Bases:
ExceptionException for distributed axes related errors with MPIArrays.
- class caput.mpiarray.MPIArray(global_shape, axis=0, comm=None, *args, **kwargs)[source]¶
Bases:
numpy.ndarrayA numpy array like object which is distributed across multiple processes.
- Parameters
global_shape (tuple) – The global array shape. The returned array will be distributed across the specified index.
axis (integer, optional) – The dimension to distribute the array across.
- allgather()[source]¶
Gather a full copy onto each rank.
- Returns
arr – The full global array.
- Return type
np.ndarray
- allreduce(op=None)[source]¶
Perform MPI reduction op within memory buffer.
Usage is restricted to arrays with a single element per rank. Returns same scalar final result to all ranks.
E.g. usage: mpi_array.sum().allreduce() to every rank.
- Parameters
op (MPI reduction operation) – Reduction operation to perform. Default: MPI.SUM
- Returns
result of reduction
- Return type
scalar
- property axis¶
Axis we are distributed over.
- Returns
axis
- Return type
integer
- property comm¶
The communicator over which the array is distributed.
- Returns
comm
- Return type
MPI.Comm
- enumerate(axis)[source]¶
Helper for enumerating over a given axis.
- Parameters
axis (integer) – Which access to enumerate over.
- Returns
iterator – An enumerator which returns the local index into the array and the global index it corresponds to.
- Return type
(local_index, global_index)
- classmethod from_file(f, dataset, comm=None, axis=0, sel=None, file_format=<class 'caput.fileformats.HDF5'>)[source]¶
Read MPIArray from an HDF5 dataset or Zarr array on disk in parallel.
- Parameters
f (filename, or h5py.File object) – File to read dataset from.
dataset (string) – Name of dataset to read from. Must exist.
comm (MPI.Comm, optional) – MPI communicator to distribute over. If None optional, use MPI.COMM_WORLD.
axis (int, optional) – Axis over which the read should be distributed. This can be used to select the most efficient axis for the reading.
sel (tuple, optional) – A tuple of slice objects used to make a selection from the array before reading. The output will be this selection from the dataset distributed over the given axis.
file_format (fileformats.HDF5 or fileformats.Zarr) – File format to use. Default fileformats.HDF5.
- Returns
array
- Return type
- classmethod from_hdf5(f, dataset, comm=None, axis=0, sel=None)[source]¶
Read MPIArray from an HDF5 dataset in parallel.
- Parameters
f (filename, or h5py.File object) – File to read dataset from.
dataset (string) – Name of dataset to read from. Must exist.
comm (MPI.Comm, optional) – MPI communicator to distribute over. If None optional, use MPI.COMM_WORLD.
axis (int, optional) – Axis over which the read should be distributed. This can be used to select the most efficient axis for the reading.
sel (tuple, optional) – A tuple of slice objects used to make a selection from the array before reading. The output will be this selection from the dataset distributed over the given axis.
- Returns
array
- Return type
- gather(rank=0)[source]¶
Gather a full copy onto a specific rank.
- Parameters
rank (int, optional) – Rank to gather onto. Default is rank=0
- Returns
arr – The full global array on the specified rank.
- Return type
np.ndarray, or None
- property global_shape¶
Global array shape.
- Returns
global_shape
- Return type
tuple
- property global_slice¶
Return an objects that presents a view of the array with global slicing.
- Returns
global_slice
- Return type
object
- property local_array¶
The view of the local numpy array.
- Returns
local_array
- Return type
np.ndarray
- property local_offset¶
Offset into global array.
This is equivalent to the global-index of the [0, 0, …] element of the local section.
- Returns
local_offset
- Return type
tuple
- property local_shape¶
Shape of local section.
- Returns
local_shape
- Return type
tuple
- redistribute(axis)[source]¶
Change the axis that the array is distributed over.
- Parameters
axis (integer) – Axis to distribute over.
- Returns
array – A new copy of the array distributed over the specified axis. Note that the local section will have changed.
- Return type
- reshape(*shape)[source]¶
Reshape the array.
Must not attempt to reshape the distributed axis. That axis must be given an input length None.
- Parameters
shape (tuple) – Tuple of axis lengths. The distributed must be given None.
- Returns
array – Reshaped MPIArray as a view of the original data.
- Return type
- to_file(f, dataset, create=False, chunks=None, compression=None, compression_opts=None, file_format=<class 'caput.fileformats.HDF5'>)[source]¶
Parallel write into a contiguous HDF5/Zarr dataset.
- Parameters
f (str, h5py.File, h5py.Group or zarr.Group) – File to write dataset into.
dataset (string) – Name of dataset to write into. Should not exist.
chunks –
compression (str or int) – Name or identifier of HDF5 compression filter.
compression_opts – See HDF5 documentation for compression filters. Compression options for the dataset.
- to_hdf5(f, dataset, create=False, chunks=None, compression=None, compression_opts=None)[source]¶
Parallel write into a contiguous HDF5 dataset.
- Parameters
f (str, h5py.File or h5py.Group) – File to write dataset into.
dataset (string) – Name of dataset to write into. Should not exist.
chunks –
compression (str or int) – Name or identifier of HDF5 compression filter.
compression_opts – See HDF5 documentation for compression filters. Compression options for the dataset.
- to_zarr(f, dataset, create, chunks, compression, compression_opts)[source]¶
Parallel write into a contiguous Zarr dataset.
- Parameters
f (str zarr.Group) – File to write dataset into.
dataset (string) – Name of dataset to write into. Should not exist.
chunks –
compression (str or int) – Name or identifier of HDF5 compression filter.
compression_opts – See HDF5 documentation for compression filters. Compression options for the dataset.
- transpose(*axes)[source]¶
Transpose the array axes.
- Parameters
axes (None, tuple of ints, or n ints) –
None or no argument: reverses the order of the axes.
tuple of ints: i in the j-th place in the tuple means a’s i-th axis becomes a.transpose()’s j-th axis.
n ints: same as an n-tuple of the same ints (this form is intended simply as a “convenience” alternative to the tuple form)
- Returns
array – Transposed MPIArray as a view of the original data.
- Return type
- classmethod wrap(array, axis, comm=None)[source]¶
Turn a set of numpy arrays into a distributed MPIArray object.
This is needed for functions such as np.fft.fft which always return an np.ndarray.
- Parameters
array (np.ndarray) – Array to wrap.
axis (integer) – Axis over which the array is distributed. The lengths are checked to try and ensure this is correct.
comm (MPI.Comm, optional) – The communicator over which the array is distributed. If None (default), use MPI.COMM_WORLD.
- Returns
dist_array – An MPIArray view of the input.
- Return type