caput.mpiarray

An array class for containing MPI distributed array.

Examples

This example performs a transfrom from time-freq to lag-m space. This involves Fourier transforming each of these two axes of the distributed array:

import numpy as np
from mpi4py import MPI

from caput.mpiarray import MPIArray

nfreq = 32
nprod = 2
ntime = 32

Initialise array with (nfreq, nprod, ntime) global shape
darr1 = MPIArray((nfreq, nprod, ntime), dtype=np.float64)

# Load in data into parallel array
for lfi, fi in darr1.enumerate(axis=0):
    darr1[lfi] = load_freq_data(gfi)

# Perform m-transform (i.e. FFT)
darr2 = MPIArray.wrap(np.fft.fft(darr1, axis=1), axis=0)

# Redistribute to get all frequencies onto each process, this performs the
# global transpose using MPI to make axis=1 the distributed axis, and make
# axis=0 completely local.
darr3 = darr2.redistribute(axis=1)

# Perform the lag transform on the frequency direction.
darr4 = MPIArray.wrap(np.fft.irfft(darr3, axis=0), axis=1)

Note: If a user wishes to create an MPIArray from an ndarray, they should use MPIArray.wrap(). They should not use ndarray.view(MPIArray). Attributes will not be set correctly if they do.

Global Slicing

The MPIArray also supports slicing with the global index using the MPIArray.global_slice property. This can be used for both fetching and assignment with global indices, supporting the basic slicing notation of numpy.

Its behaviour changes depending on the exact slice it gets:

  • A full slice (:) along the parallel axis returns an MPIArray on fetching, and accepts an MPIArray on assignment.

  • A partial slice (:) returns and accepts a numpy array on the rank holding the data, and None on other ranks.

It’s important to note that it never communicates data between ranks. It only ever operates on data held on the current rank.

Global Slicing Examples

Here is an example of this in action. Create and set an MPI array:

>>> import numpy as np
>>> from caput import mpiarray, mpiutil
>>>
>>> arr = mpiarray.MPIArray((mpiutil.size, 3), dtype=np.float64)
>>> arr[:] = 0.0
>>> for ri in range(mpiutil.size):
...    if ri == mpiutil.rank:
...        print(ri, arr)
...    mpiutil.barrier()
0 [[0. 0. 0.]]

Use a global index to assign to the array

>>> arr.global_slice[3] = 17

Fetch a view of the whole array with a full slice

>>> arr2 = arr.global_slice[:, 2]

Print the third column of the array on all ranks

>>> for ri in range(mpiutil.size):
...    if ri == mpiutil.rank:
...        print(ri, arr2)
...    mpiutil.barrier()
0 [0.]

Fetch a view of the whole array with a partial slice. The final two ranks should be None >>> arr3 = arr.global_slice[:2, 2] >>> for ri in range(mpiutil.size): … if ri == mpiutil.rank: … print(ri, arr3) … mpiutil.barrier() 0 [0.]

Direct Slicing

MPIArray supports direct slicing using […] (implemented via __getitem__() ). This can be used for both fetching and assignment. It is recommended to only index into the non-parallel axis or to do a full slice [:].

Direct Slicing Behaviour

  • A full slice [:] will return a MPIArray on fetching, with identical properties to the original array.

  • Any indexing or slicing into the non-parallel axis, will also return a MPIArray. The number associated with the parallel axis, will be adjusted if a slice results in an axis reduction.

  • Any indexing into the parallel axis is discouraged. This behaviour is deprecated. For now, it will result into a local index on each rank, returning a regular numpy array, along with a warning. In the future, it is encouraged to index into the local array MPIArray.local_array, if you wish to locally index into the parallel axis

Direct Slicing Examples

Deprecated since version 21.04: Direct indexing into parallel axis is DEPRECATED. For now, it will return a numpy array equal to local array indexing, along with a warning. This behaviour will be removed in the future.

>>> darr = mpiarray.MPIArray((mpiutil.size,), axis=0)
>>> (darr[0] == darr.local_array[0]).all()
True
>>> not hasattr(darr[0], "axis")
True

If you wish to index into local portion of a distributed array along its parallel axis, you need to index into the MPIArray.local_array.

>>> darr[:] = 1.0
>>> darr.local_array[0]
1.0

indexing into non-parallel axes returns an MPIArray with appropriate attributes Slicing could result in a reduction of axis, and a lower parallel axis number

>>> darr = mpiarray.MPIArray((4, mpiutil.size), axis=1)
>>> darr[:] = mpiutil.rank
>>> (darr[0] == mpiutil.rank).all()
array([ True])
>>> darr[0].axis == 0
True

ufunc

In NumPy, universal functions (or ufuncs) are functions that operate on ndarrays in an element-by-element fashion. MPIArray supports all ufunc calculations, except along the parallel axis.

ufunc Requirements

  • All input MPIArray must be distributed along the same axis.

  • If you pass a kwarg axis to the ufunc, it must not be the parallel axis.

ufunc Behaviour

  • If no output are provided, the results are converted back to MPIArrays. The new array will either be parallel over the same axis as the input MPIArrays, or possibly one axis down if the ufunc is applied via a reduce method (i.e. the shape of the array is reduced by one axis).

  • For operations that normally reduce to a scalar, the scalars will be wrapped into a 1D array distributed across axis 0.

  • shape related attributes will be re-calculated.

ufunc Examples

Create an array

>>> dist_arr = mpiarray.MPIArray((mpiutil.size, 4), axis=0)
>>> dist_arr[:] = mpiutil.rank

Element wise summation and .all() reduction

>>> (dist_arr + dist_arr == 2 * mpiutil.rank).all()
array([ True])

Element wise multiplication and reduction

>>> (dist_arr * 2 == 2 * mpiutil.rank).all()
array([ True])

The distributed axis is unchanged during an elementwise operation

>>> (dist_arr + dist_arr).axis == 0
True

An operation on multiple arrays with different parallel axes is not possible and will result in an exception

>>> (mpiarray.MPIArray((mpiutil.size, 4), axis=0) -
...  mpiarray.MPIArray((mpiutil.size, 4), axis=1))  
Traceback (most recent call last):
...
caput.mpiarray.AxisException: The distributed axis for all MPIArrays in an expression
should be the same

Summation across a non-parallel axis

>>> (dist_arr.sum(axis=1) == 4 * mpiutil.rank).all()
array([ True])

A sum reducing across all axes will reduce across each local array and give a new distributed array with a single element on each rank.

>>> (dist_arr.sum() == 4 * 3 * mpiutil.rank).all()
array([ True])
>>> (dist_arr.sum().local_shape) == (1,)
True
>>> (dist_arr.sum().global_shape) == (mpiutil.size,)
True

Reduction methods might result in a decrease in the distributed axis number

>>> dist_arr = mpiarray.MPIArray((mpiutil.size, 4, 3), axis=1)
>>> dist_arr.sum(axis=0).axis == 0
True

MPI.Comm

mpi4py.MPI.Comm provides a wide variety of functions for communications across nodes https://mpi4py.readthedocs.io/en/stable/overview.html?highlight=allreduce#collective-communications

They provide an upper-case and lower-case variant of many functions. With MPIArrays, please use the uppercase variant of the function. The lower-case variants involve an intermediate pickling process, which can lead to malformed arrays.

exception caput.mpiarray.AxisException(message)[source]

Bases: Exception

Exception for distributed axes related errors with MPIArrays.

class caput.mpiarray.DummyContext[source]

Bases: object

A completely dummy context manager.

class caput.mpiarray.MPIArray(global_shape, axis=0, comm=None, *args, **kwargs)[source]

Bases: numpy.ndarray

A numpy array like object which is distributed across multiple processes.

Parameters
  • global_shape (tuple) – The global array shape. The returned array will be distributed across the specified index.

  • axis (integer, optional) – The dimension to distribute the array across.

allgather()[source]

Gather a full copy onto each rank.

Returns

arr – The full global array.

Return type

np.ndarray

allreduce(op=None)[source]

Perform MPI reduction op within memory buffer.

Usage is restricted to arrays with a single element per rank. Returns same scalar final result to all ranks.

E.g. usage: mpi_array.sum().allreduce() to every rank.

Parameters

op (MPI reduction operation) – Reduction operation to perform. Default: MPI.SUM

Returns

result of reduction

Return type

scalar

property axis

Axis we are distributed over.

Returns

axis

Return type

integer

property comm

The communicator over which the array is distributed.

Returns

comm

Return type

MPI.Comm

copy()[source]

Return a copy of the MPIArray.

Returns

arr_copy

Return type

MPIArray

enumerate(axis)[source]

Helper for enumerating over a given axis.

Parameters

axis (integer) – Which access to enumerate over.

Returns

iterator – An enumerator which returns the local index into the array and the global index it corresponds to.

Return type

(local_index, global_index)

classmethod from_file(f, dataset, comm=None, axis=0, sel=None, file_format=<class 'caput.fileformats.HDF5'>)[source]

Read MPIArray from an HDF5 dataset or Zarr array on disk in parallel.

Parameters
  • f (filename, or h5py.File object) – File to read dataset from.

  • dataset (string) – Name of dataset to read from. Must exist.

  • comm (MPI.Comm, optional) – MPI communicator to distribute over. If None optional, use MPI.COMM_WORLD.

  • axis (int, optional) – Axis over which the read should be distributed. This can be used to select the most efficient axis for the reading.

  • sel (tuple, optional) – A tuple of slice objects used to make a selection from the array before reading. The output will be this selection from the dataset distributed over the given axis.

  • file_format (fileformats.HDF5 or fileformats.Zarr) – File format to use. Default fileformats.HDF5.

Returns

array

Return type

MPIArray

classmethod from_hdf5(f, dataset, comm=None, axis=0, sel=None)[source]

Read MPIArray from an HDF5 dataset in parallel.

Parameters
  • f (filename, or h5py.File object) – File to read dataset from.

  • dataset (string) – Name of dataset to read from. Must exist.

  • comm (MPI.Comm, optional) – MPI communicator to distribute over. If None optional, use MPI.COMM_WORLD.

  • axis (int, optional) – Axis over which the read should be distributed. This can be used to select the most efficient axis for the reading.

  • sel (tuple, optional) – A tuple of slice objects used to make a selection from the array before reading. The output will be this selection from the dataset distributed over the given axis.

Returns

array

Return type

MPIArray

gather(rank=0)[source]

Gather a full copy onto a specific rank.

Parameters

rank (int, optional) – Rank to gather onto. Default is rank=0

Returns

arr – The full global array on the specified rank.

Return type

np.ndarray, or None

property global_shape

Global array shape.

Returns

global_shape

Return type

tuple

property global_slice

Return an objects that presents a view of the array with global slicing.

Returns

global_slice

Return type

object

property local_array

The view of the local numpy array.

Returns

local_array

Return type

np.ndarray

property local_offset

Offset into global array.

This is equivalent to the global-index of the [0, 0, …] element of the local section.

Returns

local_offset

Return type

tuple

property local_shape

Shape of local section.

Returns

local_shape

Return type

tuple

redistribute(axis)[source]

Change the axis that the array is distributed over.

Parameters

axis (integer) – Axis to distribute over.

Returns

array – A new copy of the array distributed over the specified axis. Note that the local section will have changed.

Return type

MPIArray

reshape(*shape)[source]

Reshape the array.

Must not attempt to reshape the distributed axis. That axis must be given an input length None.

Parameters

shape (tuple) – Tuple of axis lengths. The distributed must be given None.

Returns

array – Reshaped MPIArray as a view of the original data.

Return type

MPIArray

to_file(f, dataset, create=False, chunks=None, compression=None, compression_opts=None, file_format=<class 'caput.fileformats.HDF5'>)[source]

Parallel write into a contiguous HDF5/Zarr dataset.

Parameters
  • f (str, h5py.File, h5py.Group or zarr.Group) – File to write dataset into.

  • dataset (string) – Name of dataset to write into. Should not exist.

  • chunks

  • compression (str or int) – Name or identifier of HDF5 compression filter.

  • compression_opts – See HDF5 documentation for compression filters. Compression options for the dataset.

to_hdf5(f, dataset, create=False, chunks=None, compression=None, compression_opts=None)[source]

Parallel write into a contiguous HDF5 dataset.

Parameters
  • f (str, h5py.File or h5py.Group) – File to write dataset into.

  • dataset (string) – Name of dataset to write into. Should not exist.

  • chunks

  • compression (str or int) – Name or identifier of HDF5 compression filter.

  • compression_opts – See HDF5 documentation for compression filters. Compression options for the dataset.

to_zarr(f, dataset, create, chunks, compression, compression_opts)[source]

Parallel write into a contiguous Zarr dataset.

Parameters
  • f (str zarr.Group) – File to write dataset into.

  • dataset (string) – Name of dataset to write into. Should not exist.

  • chunks

  • compression (str or int) – Name or identifier of HDF5 compression filter.

  • compression_opts – See HDF5 documentation for compression filters. Compression options for the dataset.

transpose(*axes)[source]

Transpose the array axes.

Parameters

axes (None, tuple of ints, or n ints) –

  • None or no argument: reverses the order of the axes.

  • tuple of ints: i in the j-th place in the tuple means a’s i-th axis becomes a.transpose()’s j-th axis.

  • n ints: same as an n-tuple of the same ints (this form is intended simply as a “convenience” alternative to the tuple form)

Returns

array – Transposed MPIArray as a view of the original data.

Return type

MPIArray

classmethod wrap(array, axis, comm=None)[source]

Turn a set of numpy arrays into a distributed MPIArray object.

This is needed for functions such as np.fft.fft which always return an np.ndarray.

Parameters
  • array (np.ndarray) – Array to wrap.

  • axis (integer) – Axis over which the array is distributed. The lengths are checked to try and ensure this is correct.

  • comm (MPI.Comm, optional) – The communicator over which the array is distributed. If None (default), use MPI.COMM_WORLD.

Returns

dist_array – An MPIArray view of the input.

Return type

MPIArray