caput.mpiarray#

A MPI-distribued numpy array.

This module provides a distributed array object which is a subclass of numpy.ndarray. The goal of this module is to enable seamless use of numpy arrays in a distributed memory context, while remaining as functionally similar to np.ndarray as possible.

Examples#

This example performs a transfrom from time-freq to lag-m space. This involves Fourier transforming each of these two axes of the distributed array:

import numpy as np
from mpi4py import MPI

from caput.mpiarray import MPIArray

nfreq = 32
nprod = 2
ntime = 32

Initialise array with (nfreq, nprod, ntime) global shape
darr1 = MPIArray((nfreq, nprod, ntime), dtype=np.float64)

# Load in data into parallel array
for lfi, fi in darr1.enumerate(axis=0):
    darr1[lfi] = load_freq_data(gfi)

# Perform m-transform (i.e. FFT)
darr2 = MPIArray.wrap(np.fft.fft(darr1, axis=1), axis=0)

# Redistribute to get all frequencies onto each process, this performs the
# global transpose using MPI to make axis=1 the distributed axis, and make
# axis=0 completely local.
darr3 = darr2.redistribute(axis=1)

# Perform the lag transform on the frequency direction.
darr4 = MPIArray.wrap(np.fft.irfft(darr3, axis=0), axis=1)

Note: If a user wishes to create an MPIArray from an ndarray, they should use MPIArray.wrap(). They should not use np.ndarray.view(MPIArray)(). Attributes will not be set correctly if they do.

Global Slicing#

The MPIArray also supports slicing with the global index using the MPIArray.global_slice property. This can be used for both fetching and assignment with global indices, supporting the basic slicing notation of numpy.

Its behaviour changes depending on the exact slice it gets:

  • A full slice (:) along the parallel axis returns a MPIArray on fetching, and accepts an MPIArray on assignment.

  • A partial slice (:) returns and accepts a numpy array on the rank holding the data, and None on other ranks.

It’s important to note that it never communicates data between ranks. It only ever operates on data held on the current rank.

Global Slicing Examples#

Here is an example of this in action. Create and set an MPI array:

>>> import numpy as np
>>> from caput.mpiarray import MPIArray
>>> from caput.util import mpitools
>>>
>>> arr = MPIArray((mpitools.size, 3), dtype=np.float64)
>>> arr[:] = 0.0
>>> for ri in range(mpitools.size):
...     if ri == mpitools.rank:
...         print(ri, arr)
...     mpitools.barrier()
0 [[0. 0. 0.]]

Use a global index to assign to the array

>>> arr.global_slice[3] = 17

Fetch a view of the whole array with a full slice

>>> arr2 = arr.global_slice[:, 2]

Print the third column of the array on all ranks

>>> for ri in range(mpitools.size):
...     if ri == mpitools.rank:
...         print(ri, arr2)
...     mpitools.barrier()
0 [0.]

Fetch a view of the whole array with a partial slice. The final two ranks should be None

>>> arr3 = arr.global_slice[:2, 2]
>>> for ri in range(mpitools.size):
...     if ri == mpitools.rank:
...         print(ri, arr3)
...     mpitools.barrier()
0 [0.]

Direct Slicing#

MPIArray supports direct slicing using [...] (implemented via MPIArray.__getitem__()). This can be used for both fetching and assignment. It is recommended to only index into the non-parallel axis or to do a full slice [:].

Direct Slicing Behaviour#

  • A full slice [:] will return a MPIArray on fetching, with identical properties to the original array.

  • Any indexing or slicing into the non-parallel axis, will also return a MPIArray. The number associated with the parallel axis, will be adjusted if a slice results in an axis reduction.

  • Any indexing into the parallel axis is discouraged. This behaviour is deprecated. For now, it will result into a local index on each rank, returning a regular np.ndarray, along with a warning. In the future, it is encouraged to index into the local array MPIArray.local_array, if you wish to locally index into the parallel axis

Direct Slicing Examples#

Deprecated since version 21.04: Direct indexing into parallel axis is DEPRECATED. For now, it will return a numpy array equal to local array indexing, along with a warning. This behaviour will be removed in the future.

>>> darr = MPIArray((mpitools.size,), axis=0)
>>> (darr[0] == darr.local_array[0]).all()
np.True_
>>> not hasattr(darr[0], "axis")
True

If you wish to index into local portion of a distributed array along its parallel axis, you need to index into the MPIArray.local_array.

>>> darr[:] = 1.0
>>> float(darr.local_array[0])
1.0

indexing into non-parallel axes returns an MPIArray with appropriate attributes Slicing could result in a reduction of axis, and a lower parallel axis number

>>> darr = MPIArray((4, mpitools.size), axis=1)
>>> darr[:] = mpitools.rank
>>> (darr[0] == mpitools.rank).all()
array([ True])
>>> darr[0].axis == 0
True

ufunc#

In NumPy, universal functions (or ufuncs) are functions that operate on ndarrays in an element-by-element fashion. MPIArray supports all ufunc calculations, except along the parallel axis.

ufunc Requirements#

  • Every input MPIArray must be distributed along the same axis.

  • If you pass a kwarg axis to the ufunc, it must not be the parallel axis.

ufunc Behaviour#

  • If no output are provided, the results are converted back to MPIArrays. The new array will either be parallel over the same axis as the input MPIArrays, or possibly one axis down if the ufunc is applied via a reduce method (i.e. the shape of the array is reduced by one axis).

  • For operations that normally reduce to a scalar, the scalars will be wrapped into a 1D array distributed across axis 0.

  • shape related attributes will be re-calculated.

ufunc Examples#

Create an array

>>> dist_arr = MPIArray((mpitools.size, 4), axis=0)
>>> dist_arr[:] = mpitools.rank

Element wise summation and .all() reduction

>>> (dist_arr + dist_arr == 2 * mpitools.rank).all()
array([ True])

Element wise multiplication and reduction

>>> (dist_arr * 2 == 2 * mpitools.rank).all()
array([ True])

The distributed axis is unchanged during an elementwise operation

>>> (dist_arr + dist_arr).axis == 0
True

An operation on multiple arrays with different parallel axes is not possible and will result in an exception

>>> (
...     MPIArray((mpitools.size, 4), axis=0) - MPIArray((mpitools.size, 4), axis=1)
... )
Traceback (most recent call last):
    ...
caput.mpiarray.AxisException: Input argument 1 has an incompatible distributed axis.

Summation across a non-parallel axis

>>> (dist_arr.sum(axis=1) == 4 * mpitools.rank).all()
array([ True])

A sum reducing across all axes will reduce across each local array and give a new distributed array with a single element on each rank.

>>> (dist_arr.sum() == 4 * 3 * mpitools.rank).all()
array([ True])
>>> (dist_arr.sum().local_shape) == (1,)
True
>>> (dist_arr.sum().global_shape) == (mpitools.size,)
True

Reduction methods might result in a decrease in the distributed axis number

>>> dist_arr = MPIArray((mpitools.size, 4, 3), axis=1)
>>> dist_arr.sum(axis=0).axis == 0
True

MPI.Comm#

mpi4py.MPI.Comm provides a wide variety of functions for communication across nodes They provide an upper-case and lower-case variant of many functions. With MPIArrays, please use the uppercase variant of the function. The lower-case variants involve an intermediate pickling process, which can lead to malformed arrays.

Exceptions#

AxisException

Exception for distributed axes related errors with MPIArrays.

UnsupportedOperation

Exception for when an operation cannot be performed with an MPIArray.

Classes#

MPIArray

A numpy-like array distributed across multiple processes.

Functions#

ones(→ MPIArray)

Generate an MPIArray filled with ones.

zeros(→ MPIArray)

Generate an MPIArray filled with zeros.