"""
Utilities for making MPI usage transparent.
This module exposes much of the functionality of :mod:`mpi4py` but will still
run in serial if mpi is not present on the system. It is thus useful for
writing code that can be run in either parallel or serial. Also it exposes all
attributes of the :mod:`mpi4py.MPI` by the :class:`SelfWrapper` class for
convenience (You can just use 'mpiutil.attr' instead of 'from mpi4py import MPI;
MPI.attr').
Functions
=========
.. autosummary::
:toctree: generated/
active_comm
active
close
partition_list
partition_list_mpi
mpilist
mpirange
barrier
bcast
parallel_map
typemap
split_m
split_all
split_local
gather_local
transpose_blocks
allocate_hdf5_dataset
lock_and_write_buffer
parallel_rows_write_hdf5
"""
import sys
import warnings
from types import ModuleType
import numpy as np
rank = 0
size = 1
_comm = None
world = None
rank0 = True
## Try to setup MPI and get the comm, rank and size.
## If not they should end up as rank=0, size=1.
try:
from mpi4py import MPI
_comm = MPI.COMM_WORLD
world = _comm
rank = _comm.Get_rank()
size = _comm.Get_size()
if _comm is not None and size > 1:
print "Starting MPI rank=%i [size=%i]" % (rank, size)
rank0 = True if rank == 0 else False
sys_excepthook = sys.excepthook
def mpi_excepthook(type, value, traceback):
sys_excepthook(type, value, traceback)
MPI.COMM_WORLD.Abort(1)
sys.excepthook = mpi_excepthook
except ImportError:
warnings.warn("Warning: mpi4py not installed.")
class _close_message(object):
def __repr__(self):
return "<Close message>"
[docs]def active_comm(aprocs):
"""Return a communicator consists of a list of processes in `aprocs`."""
if _comm is None:
return None
else:
# create a new communicator from active processes
comm = _comm.Create(_comm.Get_group().Incl(aprocs))
return comm
[docs]def active(aprocs):
"""Make a list of processes in `aprocs` active, while others wait."""
if _comm is None:
return None
else:
# create a new communicator from active processes
comm = _comm.Create(_comm.Get_group().Incl(aprocs))
if rank not in aprocs:
while True:
# Event loop.
# Sit here and await instructions.
# Blocking receive to wait for instructions.
task = _comm.recv(source=0, tag=MPI.ANY_TAG)
# Check if message is special sentinel signaling end.
# If so, stop.
if isinstance(task, _close_message):
break
return comm
[docs]def close(aprocs):
"""Send a message to the waiting processes to close their waiting."""
if rank0:
for i in list(set(range(size)) - set(aprocs)):
_comm.isend(_close_message(), dest=i)
[docs]def partition_list(full_list, i, n, method='con'):
"""Partition a list into `n` pieces. Return the `i`th partition."""
def _partition(N, n, i):
### If partiion `N` numbers into `n` pieces,
### return the start and stop of the `i`th piece
base = (N / n)
rem = N % n
num_lst = rem * [base+1] + (n - rem) * [base]
cum_num_lst = np.cumsum([0] + num_lst)
return cum_num_lst[i], cum_num_lst[i+1]
N = len(full_list)
start, stop = _partition(N, n, i)
if method == 'con':
return full_list[start:stop]
elif method == 'alt':
return full_list[i::n]
elif method == 'rand':
choices = np.random.permutation(N)[start:stop]
return [ full_list[i] for i in choices ]
else:
raise ValueError('Unknown partition method %s' % method)
[docs]def partition_list_mpi(full_list, method='con', comm=_comm):
"""Return the partition of a list specific to the current MPI process."""
if comm is not None:
rank = comm.rank
size = comm.size
return partition_list(full_list, rank, size, method=method)
# alias mpilist for partition_list_mpi for convenience
mpilist = partition_list_mpi
[docs]def mpirange(*args, **kargs):
"""An MPI aware version of `range`, each process gets its own sub section.
"""
full_list = range(*args)
method = kargs.get('method', 'con')
comm = kargs.get('comm', _comm)
return partition_list_mpi(full_list, method=method, comm=comm)
[docs]def barrier(comm=_comm):
if comm is not None and comm.size > 1:
comm.Barrier()
[docs]def bcast(data, root=0, comm=_comm):
if comm is not None and comm.size > 1:
return comm.bcast(data, root=root)
else:
return data
def allreduce(sendobj, op=None, comm=_comm):
if comm is not None and comm.size > 1:
return comm.allreduce(sendobj, op=(op or MPI.SUM))
else:
return sendobj
# def Gatherv(sendbuf, recvbuf, root=0, comm=_comm):
# if comm is not None and comm.size > 1:
# comm.Gatherv(sendbuf, recvbuf, root=root)
# else:
# # if they are just numpy data buffer
# recvbuf = sendbuf.copy()
# # TODO, other cases
# def Allgatherv(sendbuf, recvbuf, comm=_comm):
# if comm is not None and comm.size > 1:
# return _comm.Allgatherv(sendbuf, recvbuf)
# else:
# # if they are just numpy data buffer
# recvbuf = sendbuf.copy()
# # TODO, other cases
[docs]def parallel_map(func, glist, root=None, method='con', comm=_comm):
"""Apply a parallel map using MPI.
Should be called collectively on the same list. All ranks return the full
set of results.
Parameters
----------
func : function
Function to apply.
glist : list
List of map over. Must be globally defined.
root : None or Integer
Which process should gather the results, all processes will gather the results if None.
method: str
How to split `glist` to each process, can be 'con': continuously, 'alt': alternatively, 'rand': randomly. Default is 'con'.
comm : MPI communicator
MPI communicator that array is distributed over. Default is the gobal _comm.
Returns
-------
results : list
Global list of results.
"""
# Synchronize
barrier(comm=comm)
# If we're only on a single node, then just perform without MPI
if comm is None or comm.size == 1:
return [func(item) for item in glist]
# Pair up each list item with its position.
zlist = list(enumerate(glist))
# Partition list based on MPI rank
llist = partition_list_mpi(zlist, method=method, comm=comm)
# Operate on sublist
flist = [(ind, func(item)) for ind, item in llist]
barrier(comm=comm)
rlist = None
if root is None:
# Gather all results onto all ranks
rlist = comm.allgather(flist)
else:
# Gather all results onto the specified rank
rlist = comm.gather(flist, root=root)
if rlist is not None:
# Flatten the list of results
flatlist = [item for sublist in rlist for item in sublist]
# Sort into original order
sortlist = sorted(flatlist, key=(lambda item: item[0]))
# Synchronize
# barrier(comm=comm)
# Zip to remove indices and extract the return values into a list
return list(zip(*sortlist)[1])
else:
return None
[docs]def typemap(dtype):
"""Map a numpy dtype into an MPI_Datatype.
Parameters
----------
dtype : np.dtype
The numpy datatype.
Returns
-------
mpitype : MPI.Datatype
The MPI.Datatype.
"""
# Need to try both as the name of the typedoct changed in mpi4py 2.0
try:
return MPI.__TypeDict__[np.dtype(dtype).char]
except AttributeError:
return MPI._typedict[np.dtype(dtype).char]
[docs]def split_m(n, m):
"""
Split a range (0, n-1) into m sub-ranges of similar length
Parameters
----------
n : integer
Length of range to split.
m : integer
Number of subranges to split into.
Returns
-------
num : np.ndarray[m]
Number in each sub-range
start : np.ndarray[m]
Starting of each sub-range.
end : np.ndarray[m]
End of each sub-range.
See Also
--------
:fun:`split_all`, :fun:`split_local`
"""
base = (n / m)
rem = n % m
part = base * np.ones(m, dtype=np.int) + (np.arange(m) < rem).astype(np.int)
bound = np.cumsum(np.insert(part, 0, 0))
return np.array([part, bound[:m], bound[1:(m + 1)]])
[docs]def split_all(n, comm=_comm):
"""
Split a range (0, n-1) into sub-ranges for each MPI Process.
Parameters
----------
n : integer
Length of range to split.
comm : MPI Communicator, optional
MPI Communicator to use (default COMM_WORLD).
Returns
-------
num : np.ndarray[m]
Number for each rank.
start : np.ndarray[m]
Starting of each sub-range on a given rank.
end : np.ndarray[m]
End of each sub-range.
See Also
--------
:fun:`split_m`, :fun:`split_local`
"""
m = size if comm is None else comm.size
return split_m(n, m)
[docs]def split_local(n, comm=_comm):
"""
Split a range (0, n-1) into sub-ranges for each MPI Process. This returns
the parameters only for the current rank.
Parameters
----------
n : integer
Length of range to split.
comm : MPI Communicator, optional
MPI Communicator to use (default COMM_WORLD).
Returns
-------
num : integer
Number on this rank.
start : integer
Starting of the sub-range for this rank.
end : integer
End of rank for this rank.
See Also
--------
:fun:`split_all`, :fun:`split_local`
"""
pse = split_all(n, comm=comm)
m = rank if comm is None else comm.rank
return pse[:, m]
[docs]def gather_local(global_array, local_array, local_start, root=0, comm=_comm):
"""
Gather data array in each process to the global array in `root` process.
Parameters
----------
global_array : np.ndarray
The global array which will collect data from `local_array` in each process.
local_array : np.ndarray
The local array in each process to be collected to `global_array`.
local_start : N-tuple
The starting index of the local array to be placed in `global_array`.
root : integer
The process local array gathered to.
comm : MPI communicator
MPI communicator that array is distributed over. Default is MPI.COMM_WORLD.
"""
local_size = local_array.shape
if comm is None or comm.size == 1:
# only one process
slc = [slice(s, s+n) for (s, n) in zip(local_start, local_size)]
global_array[slc] = local_array.copy()
else:
local_sizes = comm.gather(local_size, root=root)
local_starts = comm.gather(local_start, root=root)
mpi_type = typemap(local_array.dtype)
# Each process should send its local sections.
if np.prod(local_size) > 0:
# send only when array is non-empty
sreq = comm.Isend([np.ascontiguousarray(local_array), mpi_type], dest=root, tag=0)
if comm.rank == root:
# list of processes which have non-empty array
nonempty_procs = [ i for i in range(comm.size) if np.prod(local_sizes[i]) > 0 ]
# create newtype corresponding to the local array section in the global array
sub_type = [ mpi_type.Create_subarray(global_array.shape, local_sizes[i], local_starts[i]).Commit() for i in nonempty_procs ] # default order=ORDER_C
# Post each receive
reqs = [ comm.Irecv([global_array, sub_type[si]], source=sr, tag=0) for (si, sr) in enumerate(nonempty_procs) ]
# Wait for requests to complete
MPI.Prequest.Waitall(reqs)
# Wait on send request. Important, as can get weird synchronisation
# bugs otherwise as processes exit before completing their send.
if np.prod(local_size) > 0:
sreq.Wait()
[docs]def transpose_blocks(row_array, shape, comm=_comm):
"""
Take a 2D matrix which is split between processes row-wise and split it
column wise between processes.
Parameters
----------
row_array : np.ndarray
The local section of the global array (split row wise).
shape : 2-tuple
The shape of the global array
comm : MPI communicator
MPI communicator that array is distributed over. Default is MPI.COMM_WORLD.
Returns
-------
col_array : np.ndarray
Local section of the global array (split column wise).
"""
if comm is None or comm.size == 1:
# only one process
if row_array.shape[:-1] == shape[:-1]:
# We are working on a single node and being asked to do
# a trivial transpose.
# Note that to mimic the mpi behaviour we have to allow the
# last index to be trimmed.
return row_array[..., :shape[-1]].copy()
else:
raise ValueError('Shape %s is incompatible with `row_array`s shape %s' % (shape, row_array.shape))
nr = shape[0]
nc = shape[-1]
nm = 1 if len(shape) <= 2 else np.prod(shape[1:-1])
pr, sr, er = split_local(nr, comm=comm) * nm
pc, sc, ec = split_local(nc, comm=comm)
par, sar, ear = split_all(nr, comm=comm) * nm
pac, sac, eac = split_all(nc, comm=comm)
#print pr, nc, shape, row_array.shape
row_array = row_array[:nr, ..., :nc].reshape(pr, nc)
requests_send = []
requests_recv = []
recv_buffer = np.empty((nr * nm, pc), dtype=row_array.dtype)
mpitype = typemap(row_array.dtype)
# Iterate over all processes row wise
for ir in range(comm.size):
# Get the start and end of each set of rows
sir, eir = sar[ir], ear[ir]
# Iterate over all processes column wise
for ic in range(comm.size):
# Get the start and end of each set of columns
sic, eic = sac[ic], eac[ic]
# Construct a unique tag
tag = ir * comm.size + ic
# Send and receive the messages as non-blocking passes
if comm.rank == ir:
# Construct the block to send by cutting out the correct
# columns
block = row_array[:, sic:eic].copy()
#print ir, ic, comm.rank, block.shape
# Send the message
request = comm.Isend([block, mpitype], dest=ic, tag=tag)
requests_send.append([ir, ic, request])
if comm.rank == ic:
# Receive the message into the correct set of rows of recv_buffer
request = comm.Irecv([recv_buffer[sir:eir], mpitype], source=ir, tag=tag)
requests_recv.append([ir, ic, request])
#print ir, ic, comm.rank, recv_buffer[sir:eir].shape
# Wait for all processes to have started their messages
comm.Barrier()
# For each node iterate over all sends and wait until completion
for ir, ic, request in requests_send:
stat = MPI.Status()
#try:
request.Wait(status=stat)
#except MPI.Exception:
# print comm.rank, ir, ic, sar[ir], ear[ir], sac[ic], eac[ic], shape
if stat.error != MPI.SUCCESS:
print "**** ERROR in MPI SEND (r: %i c: %i rank: %i) *****" % (ir, ic, comm.rank)
#print "rank %i: Done waiting on MPI SEND" % comm.rank
comm.Barrier()
# For each frequency iterate over all receives and wait until completion
for ir, ic, request in requests_recv:
stat = MPI.Status()
#try:
request.Wait(status=stat)
#except MPI.Exception:
# print comm.rank, (ir, ic), (ear[ir]-sar[ir], eac[ic]-sac[ic]),
#shape, recv_buffer[sar[ir]:ear[ir]].shape, recv_buffer.dtype, row_array.dtype
if stat.error != MPI.SUCCESS:
print "**** ERROR in MPI RECV (r: %i c: %i rank: %i) *****" % (ir, ir, comm.rank)
return recv_buffer.reshape(shape[:-1] + (pc,))
[docs]def allocate_hdf5_dataset(fname, dsetname, shape, dtype, comm=_comm):
"""Create a hdf5 dataset and return its offset and size.
The dataset will be created contiguously and immediately allocated,
however it will not be filled.
Parameters
----------
fname : string
Name of the file to write.
dsetname : string
Name of the dataset to write (must be at root level).
shape : tuple
Shape of the dataset.
dtype : numpy datatype
Type of the dataset.
comm : MPI communicator
Communicator over which to broadcast results.
Returns
-------
offset : integer
Offset into the file at which the dataset starts (in bytes).
size : integer
Size of the dataset in bytes.
"""
import h5py
state = None
if comm is None or comm.rank == 0:
# Create/open file
f = h5py.File(fname, 'a')
# Create dataspace and HDF5 datatype
sp = h5py.h5s.create_simple(shape, shape)
tp = h5py.h5t.py_create(dtype)
# Create a new plist and tell it to allocate the space for dataset
# immediately, but don't fill the file with zeros.
plist = h5py.h5p.create(h5py.h5p.DATASET_CREATE)
plist.set_alloc_time(h5py.h5d.ALLOC_TIME_EARLY)
plist.set_fill_time(h5py.h5d.FILL_TIME_NEVER)
# Create the dataset
dset = h5py.h5d.create(f.id, dsetname, tp, sp, plist)
# Get the offset of the dataset into the file.
state = dset.get_offset(), dset.get_storage_size()
f.close()
# state = comm.bcast(state, root=0)
state = bcast(state, root=0, comm=comm)
return state
[docs]def lock_and_write_buffer(obj, fname, offset, size):
"""Write the contents of a buffer to disk at a given offset, and explicitly
lock the region of the file whilst doing so.
Parameters
----------
obj : buffer
Data to write to disk.
fname : string
Filename to write.
offset : integer
Offset into the file to start writing at.
size : integer
Size of the region to write to (and lock).
"""
import os
import os.fcntl as fcntl
buf = buffer(obj)
if len(buf) > size:
raise Exception("Size doesn't match array length.")
fd = os.open(fname, os.O_RDRW | os.O_CREAT)
fcntl.lockf(fd, fcntl.LOCK_EX, size, offset, os.SEEK_SET)
nb = os.write(fd, buf)
if nb != len(buf):
raise Exception("Something funny happened with the reading.")
fcntl.lockf(fd, fcntl.LOCK_UN)
os.close(fd)
[docs]def parallel_rows_write_hdf5(fname, dsetname, local_data, shape, comm=_comm):
"""Write out array (distributed across processes row wise) into a HDF5 in parallel.
"""
offset, size = allocate_hdf5_dataset(fname, dsetname, shape, local_data.dtype, comm=comm)
lr, sr, er = split_local(shape[0], comm=comm)
nc = np.prod(shape[1:])
lock_and_write_buffer(local_data, fname, offset + sr * nc, lr * nc)
# this is a thin wrapper around THIS module (we patch sys.modules[__name__])
class SelfWrapper(ModuleType):
def __init__(self, self_module, baked_args={}):
for attr in ["__file__", "__hash__", "__buildins__", "__doc__", "__name__", "__package__"]:
setattr(self, attr, getattr(self_module, attr, None))
self.self_module = self_module
def __getattr__(self, name):
if name in globals():
return globals()[name]
elif _comm is not None and name in MPI.__dict__:
return MPI.__dict__[name]
def __call__(self, **kwargs):
# print 'here'
return SelfWrapper(self.self_module, kwargs)
self = sys.modules[__name__]
sys.modules[__name__] = SelfWrapper(self)