import itertools
import logging
import os
import sys
import traceback
from functools import wraps
from io import StringIO
import numpy as np
from more_itertools import always_iterable
import yt.utilities.logger
from yt.config import ytcfg
from yt.data_objects.image_array import ImageArray
from yt.funcs import is_sequence
from yt.units.unit_registry import UnitRegistry # type: ignore
from yt.units.yt_array import YTArray
from yt.utilities.exceptions import YTNoDataInObjectError
from yt.utilities.lib.quad_tree import QuadTree, merge_quadtrees
from yt.utilities.logger import ytLogger as mylog
# We default to *no* parallelism unless it gets turned on, in which case this
# will be changed.
MPI = None
parallel_capable = False
dtype_names = {
"float32": "MPI.FLOAT",
"float64": "MPI.DOUBLE",
"int32": "MPI.INT",
"int64": "MPI.LONG",
"c": "MPI.CHAR",
}
op_names = {"sum": "MPI.SUM", "min": "MPI.MIN", "max": "MPI.MAX"}
class FilterAllMessages(logging.Filter):
"""
    This is a simple filter for logging.Logger instances that won't let
    any messages pass.
"""
def filter(self, record):
return 0
# Set up translation table and import things
def traceback_writer_hook(file_suffix=""):
def write_to_file(exc_type, exc, tb):
sys.__excepthook__(exc_type, exc, tb)
fn = f"yt_traceback{file_suffix}"
with open(fn, "w") as fhandle:
traceback.print_exception(exc_type, exc, tb, file=fhandle)
print(f"Wrote traceback to {fn}")
MPI.COMM_WORLD.Abort(1)
return write_to_file
def default_mpi_excepthook(exception_type, exception_value, tb):
traceback.print_tb(tb)
mylog.error("%s: %s", exception_type.__name__, exception_value)
comm = yt.communication_system.communicators[-1]
if comm.size > 1:
mylog.error("Error occurred on rank %d.", comm.rank)
MPI.COMM_WORLD.Abort(1)
def enable_parallelism(suppress_logging: bool = False, communicator=None) -> bool:
"""
This method is used inside a script to turn on MPI parallelism, via
mpi4py. More information about running yt in parallel can be found
here: https://yt-project.org/docs/3.0/analyzing/parallel_computation.html
Parameters
----------
suppress_logging : bool
If set to True, only rank 0 will log information after the initial
setup of MPI.
communicator : mpi4py.MPI.Comm
The MPI communicator to use. This controls which processes yt can see.
If not specified, will be set to COMM_WORLD.
Returns
-------
parallel_capable: bool
True if the call was successful. False otherwise.
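    Examples
    --------
    A minimal sketch of the intended usage near the top of a driver script
    (the dataset name below is a placeholder); the script itself must be
    launched through ``mpirun``/``mpiexec`` for more than one rank to exist:

    >>> import yt
    >>> yt.enable_parallelism()
    >>> ds = yt.load("my_dataset")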
"""
global parallel_capable, MPI
try:
from mpi4py import MPI as _MPI
except ImportError:
mylog.error("Could not enable parallelism: mpi4py is not installed")
return False
MPI = _MPI
exe_name = os.path.basename(sys.executable)
# if no communicator specified, set to COMM_WORLD
if communicator is None:
communicator = MPI.COMM_WORLD.Dup()
else:
communicator = communicator.Dup()
parallel_capable = communicator.size > 1
if not parallel_capable:
mylog.error(
"Could not enable parallelism: only one mpi process is running. "
"To remedy this, launch the Python interpreter as\n"
" mpirun -n <X> python3 <yourscript>.py # with X > 1 ",
)
return False
mylog.info(
"Global parallel computation enabled: %s / %s",
communicator.rank,
communicator.size,
)
communication_system.push(communicator)
ytcfg["yt", "internals", "global_parallel_rank"] = communicator.rank
ytcfg["yt", "internals", "global_parallel_size"] = communicator.size
ytcfg["yt", "internals", "parallel"] = True
if exe_name == "embed_enzo" or ("_parallel" in dir(sys) and sys._parallel): # type: ignore
ytcfg["yt", "inline"] = True
yt.utilities.logger.uncolorize_logging()
# Even though the uncolorize function already resets the format string,
# we reset it again so that it includes the processor.
f = logging.Formatter(
"P%03i %s" % (communicator.rank, yt.utilities.logger.ufstring)
)
if len(yt.utilities.logger.ytLogger.handlers) > 0:
yt.utilities.logger.ytLogger.handlers[0].setFormatter(f)
if ytcfg.get("yt", "parallel_traceback"):
sys.excepthook = traceback_writer_hook("_%03i" % communicator.rank)
else:
sys.excepthook = default_mpi_excepthook
if ytcfg.get("yt", "log_level") < 20:
yt.utilities.logger.ytLogger.warning(
"Log Level is set low -- this could affect parallel performance!"
)
dtype_names.update(
{
"float32": MPI.FLOAT,
"float64": MPI.DOUBLE,
"int32": MPI.INT,
"int64": MPI.LONG,
"c": MPI.CHAR,
}
)
op_names.update({"sum": MPI.SUM, "min": MPI.MIN, "max": MPI.MAX})
# Turn off logging on all but the root rank, if specified.
if suppress_logging:
if communicator.rank > 0:
mylog.addFilter(FilterAllMessages())
return True
# Because the dtypes will == correctly but do not hash the same, we need this
# function for dictionary access.
def get_mpi_type(dtype):
for dt, val in dtype_names.items():
if dt == dtype:
return val
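# For example (illustrative): after enable_parallelism() has swapped the string
# placeholders above for real MPI types, get_mpi_type(np.dtype("float64"))
# returns MPI.DOUBLE, whereas indexing dtype_names directly with the numpy
# dtype object would not find the key.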
class ObjectIterator:
"""
This is a generalized class that accepts a list of objects and then
attempts to intelligently iterate over them.
"""
def __init__(self, pobj, just_list=False, attr="_grids"):
self.pobj = pobj
if hasattr(pobj, attr) and getattr(pobj, attr) is not None:
gs = getattr(pobj, attr)
else:
gs = getattr(pobj._data_source, attr)
if len(gs) == 0:
raise YTNoDataInObjectError(pobj)
if hasattr(gs[0], "proc_num"):
# This one sort of knows about MPI, but not quite
self._objs = [
g
for g in gs
if g.proc_num == ytcfg.get("yt", "internals", "topcomm_parallel_rank")
]
self._use_all = True
else:
self._objs = gs
if hasattr(self._objs[0], "filename"):
self._objs = sorted(self._objs, key=lambda g: g.filename)
self._use_all = False
self.ng = len(self._objs)
self.just_list = just_list
def __iter__(self):
yield from self._objs
class ParallelObjectIterator(ObjectIterator):
"""
This takes an object, *pobj*, that implements ParallelAnalysisInterface,
and then does its thing, calling initialize and finalize on the object.
"""
def __init__(self, pobj, just_list=False, attr="_grids", round_robin=False):
ObjectIterator.__init__(self, pobj, just_list, attr=attr)
# pobj has to be a ParallelAnalysisInterface, so it must have a .comm
# object.
self._offset = pobj.comm.rank
self._skip = pobj.comm.size
# Note that we're doing this in advance, and with a simple means
# of choosing them; more advanced methods will be explored later.
if self._use_all:
self.my_obj_ids = np.arange(len(self._objs))
else:
if not round_robin:
self.my_obj_ids = np.array_split(
np.arange(len(self._objs)), self._skip
)[self._offset]
else:
self.my_obj_ids = np.arange(len(self._objs))[self._offset :: self._skip]
def __iter__(self):
for gid in self.my_obj_ids:
yield self._objs[gid]
if not self.just_list:
self.pobj._finalize_parallel()
def parallel_simple_proxy(func):
"""
This is a decorator that broadcasts the result of computation on a single
processor to all other processors. To do so, it uses the _processing and
_distributed flags in the object to check for blocks. Meant only to be
used on objects that subclass
:class:`~yt.utilities.parallel_tools.parallel_analysis_interface.ParallelAnalysisInterface`.
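    A minimal sketch of the intended usage, on a hypothetical subclass that
    supplies the ``_owner``, ``_processing`` and ``_distributed`` attributes
    the proxy inspects (all names below are illustrative only):

    >>> class MyAnalysis(ParallelAnalysisInterface):
    ...     _processing = False
    ...     _owner = 0
    ...     @parallel_simple_proxy
    ...     def summarize(self):
    ...         # evaluated only on the owning rank, then broadcast to all
    ...         return {"n_cells": 42}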
"""
@wraps(func)
def single_proc_results(self, *args, **kwargs):
retval = None
if hasattr(self, "dont_wrap"):
if func.__name__ in self.dont_wrap:
return func(self, *args, **kwargs)
if not parallel_capable or self._processing or not self._distributed:
return func(self, *args, **kwargs)
comm = _get_comm((self,))
if self._owner == comm.rank:
self._processing = True
retval = func(self, *args, **kwargs)
self._processing = False
# To be sure we utilize the root= kwarg, we manually access the .comm
# attribute, which must be an instance of MPI.Intracomm, and call bcast
# on that.
retval = comm.comm.bcast(retval, root=self._owner)
return retval
return single_proc_results
class ParallelDummy(type):
"""
    This is a metaclass that, at class-creation time, replaces all attributes
    that don't start with ``_`` with
    :func:`~yt.utilities.parallel_tools.parallel_analysis_interface.parallel_simple_proxy`-wrapped
    attributes.
"""
def __init__(cls, name, bases, d):
super().__init__(name, bases, d)
skip = d.pop("dont_wrap", [])
extra = d.pop("extra_wrap", [])
for attrname in d:
if attrname.startswith("_") or attrname in skip:
if attrname not in extra:
continue
attr = getattr(cls, attrname)
if callable(attr):
setattr(cls, attrname, parallel_simple_proxy(attr))
def parallel_passthrough(func):
"""
If we are not run in parallel, this function passes the input back as
output; otherwise, the function gets called. Used as a decorator.
"""
@wraps(func)
def passage(self, *args, **kwargs):
if not self._distributed:
return args[0]
return func(self, *args, **kwargs)
return passage
def _get_comm(args):
if len(args) > 0 and hasattr(args[0], "comm"):
comm = args[0].comm
else:
comm = communication_system.communicators[-1]
return comm
def parallel_blocking_call(func):
"""
This decorator blocks on entry and exit of a function.
"""
@wraps(func)
def barrierize(*args, **kwargs):
if not parallel_capable:
return func(*args, **kwargs)
mylog.debug("Entering barrier before %s", func.__name__)
comm = _get_comm(args)
comm.barrier()
retval = func(*args, **kwargs)
mylog.debug("Entering barrier after %s", func.__name__)
comm.barrier()
return retval
return barrierize
def parallel_root_only_then_broadcast(func):
"""
This decorator blocks and calls the function on the root processor and then
broadcasts the results to the other processors.
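    A sketch of typical use with a hypothetical function: rank 0 performs the
    read and every rank receives the broadcast result.

    >>> @parallel_root_only_then_broadcast
    ... def read_parameters(fn):
    ...     with open(fn) as f:
    ...         return f.read()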
"""
@wraps(func)
def root_only(*args, **kwargs):
if not parallel_capable:
return func(*args, **kwargs)
comm = _get_comm(args)
rv0 = None
if comm.rank == 0:
try:
rv0 = func(*args, **kwargs)
except Exception:
                traceback.print_exc()
rv = comm.mpi_bcast(rv0)
if not rv:
raise RuntimeError
return rv
return root_only
def parallel_root_only(func):
"""
This decorator blocks and calls the function on the root processor,
but does not broadcast results to the other processors.
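    A sketch of typical use with a hypothetical function: only rank 0 executes
    the body, while the other ranks wait for the all-clear broadcast.

    >>> @parallel_root_only
    ... def write_summary(fn):
    ...     with open(fn, "w") as f:
    ...         f.write("written by rank 0 only")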
"""
@wraps(func)
def root_only(*args, **kwargs):
if not parallel_capable:
return func(*args, **kwargs)
comm = _get_comm(args)
rv = None
if comm.rank == 0:
try:
rv = func(*args, **kwargs)
all_clear = 1
except Exception:
                traceback.print_exc()
all_clear = 0
else:
all_clear = None
all_clear = comm.mpi_bcast(all_clear)
if not all_clear:
raise RuntimeError
return rv
return root_only
class Workgroup:
def __init__(self, size, ranks, comm, name):
self.size = size
self.ranks = ranks
self.comm = comm
self.name = name
class ProcessorPool:
comm = None
size = None
ranks = None
available_ranks = None
tasks = None
def __init__(self):
self.comm = communication_system.communicators[-1]
self.size = self.comm.size
self.ranks = list(range(self.size))
self.available_ranks = list(range(self.size))
self.workgroups = []
def add_workgroup(self, size=None, ranks=None, name=None):
if size is None:
size = len(self.available_ranks)
if len(self.available_ranks) < size:
mylog.error(
"Not enough resources available, asked for %d have %d",
size,
                len(self.available_ranks),
)
raise RuntimeError
if ranks is None:
ranks = [self.available_ranks.pop(0) for i in range(size)]
# Default name to the workgroup number.
if name is None:
name = str(len(self.workgroups))
group = self.comm.comm.Get_group().Incl(ranks)
new_comm = self.comm.comm.Create(group)
if self.comm.rank in ranks:
communication_system.communicators.append(Communicator(new_comm))
self.workgroups.append(Workgroup(len(ranks), ranks, new_comm, name))
def free_workgroup(self, workgroup):
# If you want to actually delete the workgroup you will need to
# pop it out of the self.workgroups list so you don't have references
# that are left dangling, e.g. see free_all() below.
for i in workgroup.ranks:
if self.comm.rank == i:
communication_system.communicators.pop()
self.available_ranks.append(i)
self.available_ranks.sort()
def free_all(self):
for wg in self.workgroups:
self.free_workgroup(wg)
while self.workgroups:
self.workgroups.pop(0)
@classmethod
def from_sizes(cls, sizes):
pool = cls()
rank = pool.comm.rank
for i, size in enumerate(always_iterable(sizes)):
if is_sequence(size):
size, name = size
else:
name = "workgroup_%02i" % i
pool.add_workgroup(size, name=name)
for wg in pool.workgroups:
if rank in wg.ranks:
workgroup = wg
return pool, workgroup
def __getitem__(self, key):
for wg in self.workgroups:
if wg.name == key:
return wg
raise KeyError(key)
class ResultsStorage:
slots = ["result", "result_id"]
result = None
result_id = None
def parallel_objects(objects, njobs=0, storage=None, barrier=True, dynamic=False):
r"""This function dispatches components of an iterable to different
processors.
The parallel_objects function accepts an iterable, *objects*, and based on
the number of jobs requested and number of available processors, decides
how to dispatch individual objects to processors or sets of processors.
This can implicitly include multi-level parallelism, such that the
    processor groups assigned to each object can be composed of several or even
hundreds of processors. *storage* is also available, for collation of
results at the end of the iteration loop.
Calls to this function can be nested.
This should not be used to iterate over datasets --
:class:`~yt.data_objects.time_series.DatasetSeries` provides a much nicer
interface for that.
Parameters
----------
objects : Iterable
The list of objects to dispatch to different processors.
njobs : int
How many jobs to spawn. By default, one job will be dispatched for
each available processor.
storage : dict
This is a dictionary, which will be filled with results during the
course of the iteration. The keys will be the dataset
indices and the values will be whatever is assigned to the *result*
attribute on the storage during iteration.
barrier : bool
        Should a barrier be placed at the end of iteration?
dynamic : bool
This governs whether or not dynamic load balancing will be enabled.
This requires one dedicated processor; if this is enabled with a set of
128 processors available, only 127 will be available to iterate over
objects as one will be load balancing the rest.
Examples
--------
Here is a simple example of iterating over a set of centers and making
slice plots centered at each.
>>> for c in parallel_objects(centers):
... SlicePlot(ds, "x", "Density", center=c).save()
...
Here's an example of calculating the angular momentum vector of a set of
spheres, but with a set of four jobs of multiple processors each. Note
that we also store the results.
>>> storage = {}
>>> for sto, c in parallel_objects(centers, njobs=4, storage=storage):
... sp = ds.sphere(c, (100, "kpc"))
... sto.result = sp.quantities["AngularMomentumVector"]()
...
>>> for sphere_id, L in sorted(storage.items()):
... print(centers[sphere_id], L)
...
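    Since calls to this function can be nested, here is a sketch of two-level
    parallelism over datasets and fields (the names are illustrative only):

    >>> for ds in parallel_objects(datasets, njobs=2):
    ...     for field in parallel_objects(fields):
    ...         print(ds.all_data().quantities.extrema(field))
    ...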
"""
if dynamic:
from .task_queue import dynamic_parallel_objects
yield from dynamic_parallel_objects(objects, njobs=njobs, storage=storage)
return
if not parallel_capable:
njobs = 1
my_communicator = communication_system.communicators[-1]
my_size = my_communicator.size
if njobs <= 0:
njobs = my_size
if njobs > my_size:
mylog.error(
"You have asked for %s jobs, but you only have %s processors.",
njobs,
my_size,
)
raise RuntimeError
my_rank = my_communicator.rank
all_new_comms = np.array_split(np.arange(my_size), njobs)
for i, comm_set in enumerate(all_new_comms):
if my_rank in comm_set:
my_new_id = i
break
if parallel_capable:
communication_system.push_with_ids(all_new_comms[my_new_id].tolist())
to_share = {}
# If our objects object is slice-aware, like time series data objects are,
# this will prevent intermediate objects from being created.
oiter = itertools.islice(enumerate(objects), my_new_id, None, njobs)
for result_id, obj in oiter:
if storage is not None:
rstore = ResultsStorage()
rstore.result_id = result_id
yield rstore, obj
to_share[rstore.result_id] = rstore.result
else:
yield obj
if parallel_capable:
communication_system.pop()
if storage is not None:
# Now we have to broadcast it
new_storage = my_communicator.par_combine_object(
to_share, datatype="dict", op="join"
)
storage.update(new_storage)
if barrier:
my_communicator.barrier()
def parallel_ring(objects, generator_func, mutable=False):
r"""This function loops in a ring around a set of objects, yielding the
results of generator_func and passing from one processor to another to
avoid IO or expensive computation.
This function is designed to operate in sequence on a set of objects, where
the creation of those objects might be expensive. For instance, this could
be a set of particles that are costly to read from disk. Processor N will
run generator_func on an object, and the results of that will both be
yielded and passed to processor N-1. If the length of the objects is not
equal to the number of processors, then the final processor in the top
communicator will re-generate the data as needed.
In all likelihood, this function will only be useful internally to yt.
Parameters
----------
objects : Iterable
The list of objects to operate on.
generator_func : callable
This function will be called on each object, and the results yielded.
It must return a single NumPy array; for multiple values, it needs to
have a custom dtype.
mutable : bool
Should the arrays be considered mutable? Currently, this will only
work if the number of processors equals the number of objects.
Examples
--------
Here is a simple example of a ring loop around a set of integers, with a
custom dtype.
>>> dt = np.dtype([("x", "float64"), ("y", "float64"), ("z", "float64")])
>>> def gfunc(o):
... np.random.seed(o)
... rv = np.empty(1000, dtype=dt)
... rv["x"] = np.random.random(1000)
... rv["y"] = np.random.random(1000)
... rv["z"] = np.random.random(1000)
... return rv
...
>>> obj = range(8)
>>> for obj, arr in parallel_ring(obj, gfunc):
... print(arr["x"].sum(), arr["y"].sum(), arr["z"].sum())
...
"""
if mutable:
raise NotImplementedError
my_comm = communication_system.communicators[-1]
my_size = my_comm.size
my_rank = my_comm.rank # This will also be the first object we access
if not parallel_capable and not mutable:
for obj in objects:
yield obj, generator_func(obj)
return
generate_endpoints = len(objects) != my_size
    # gback == False: this rank sends its data backwards (to rank - 1)
    # gforw == False: this rank receives data from the rank ahead (rank + 1)
if len(objects) == my_size:
generate_endpoints = False
gback = False
gforw = False
else:
# In this case, the first processor (my_rank == 0) will generate.
generate_endpoints = True
gback = my_rank == 0
gforw = my_rank == my_size - 1
if generate_endpoints and mutable:
raise NotImplementedError
# Now we need to do pairwise sends
source = (my_rank + 1) % my_size
dest = (my_rank - 1) % my_size
oiter = itertools.islice(itertools.cycle(objects), my_rank, my_rank + len(objects))
idata = None
isize = np.zeros((1,), dtype="int64")
osize = np.zeros((1,), dtype="int64")
for obj in oiter:
if idata is None or gforw:
idata = generator_func(obj)
idtype = odtype = idata.dtype
if get_mpi_type(idtype) is None:
idtype = "c"
yield obj, idata
# We first send to the previous processor
tags = []
if not gforw:
tags.append(my_comm.mpi_nonblocking_recv(isize, source))
if not gback:
osize[0] = idata.size
tags.append(my_comm.mpi_nonblocking_send(osize, dest))
my_comm.mpi_Request_Waitall(tags)
odata = idata
tags = []
if not gforw:
idata = np.empty(isize[0], dtype=odtype)
tags.append(
my_comm.mpi_nonblocking_recv(idata.view(idtype), source, dtype=idtype)
)
if not gback:
tags.append(
my_comm.mpi_nonblocking_send(odata.view(idtype), dest, dtype=idtype)
)
my_comm.mpi_Request_Waitall(tags)
del odata
[docs]
class CommunicationSystem:
communicators: list["Communicator"] = []
def __init__(self):
self.communicators.append(Communicator(None))
def push(self, new_comm):
if not isinstance(new_comm, Communicator):
new_comm = Communicator(new_comm)
self.communicators.append(new_comm)
self._update_parallel_state(new_comm)
def push_with_ids(self, ids):
group = self.communicators[-1].comm.Get_group().Incl(ids)
new_comm = self.communicators[-1].comm.Create(group)
self.push(new_comm)
return new_comm
def _update_parallel_state(self, new_comm):
ytcfg["yt", "internals", "topcomm_parallel_size"] = new_comm.size
ytcfg["yt", "internals", "topcomm_parallel_rank"] = new_comm.rank
if new_comm.rank > 0 and ytcfg.get("yt", "serialize"):
ytcfg["yt", "only_deserialize"] = True
def pop(self):
self.communicators.pop()
self._update_parallel_state(self.communicators[-1])
def _reconstruct_communicator():
return communication_system.communicators[-1]
class Communicator:
    """
    This is an interface specification providing several useful utility
    functions for analyzing something in parallel.
    """
    comm = None
_grids = None
_distributed = None
__tocast = "c"
def __init__(self, comm=None):
self.comm = comm
self._distributed = comm is not None and self.comm.size > 1
def __del__(self):
if self.comm is not None:
self.comm.Free()
"""
This is an interface specification providing several useful utility
functions for analyzing something in parallel.
"""
def __reduce__(self):
# We don't try to reconstruct any of the properties of the communicator
# or the processors. In general, we don't want to.
return (_reconstruct_communicator, ())
def barrier(self):
if not self._distributed:
return
mylog.debug("Opening MPI Barrier on %s", self.comm.rank)
self.comm.Barrier()
def mpi_exit_test(self, data=False):
# data==True -> exit. data==False -> no exit
mine, statuses = self.mpi_info_dict(data)
if True in statuses.values():
raise RuntimeError("Fatal error. Exiting.")
return None
@parallel_passthrough
def par_combine_object(self, data, op, datatype=None):
# op can be chosen from:
# cat
# join
# data is selected to be of types:
# np.ndarray
# dict
# data field dict
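        # A sketch of the "join" path (illustrative names only): every rank
        # holds a partial dict and ends up with the union of all of them, e.g.
        #     local = {comm.rank: partial_result}
        #     merged = comm.par_combine_object(local, op="join", datatype="dict")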
if datatype is not None:
pass
elif isinstance(data, dict):
datatype = "dict"
elif isinstance(data, np.ndarray):
datatype = "array"
elif isinstance(data, list):
datatype = "list"
# Now we have our datatype, and we conduct our operation
if datatype == "dict" and op == "join":
if self.comm.rank == 0:
for i in range(1, self.comm.size):
data.update(self.comm.recv(source=i, tag=0))
else:
self.comm.send(data, dest=0, tag=0)
# Send the keys first, then each item one by one
# This is to prevent MPI from crashing when sending more
# than 2GiB of data over the network.
keys = self.comm.bcast(list(data.keys()), root=0)
for key in keys:
tmp = data.get(key, None)
data[key] = self.comm.bcast(tmp, root=0)
return data
elif datatype == "dict" and op == "cat":
field_keys = sorted(data.keys())
size = data[field_keys[0]].shape[-1]
sizes = np.zeros(self.comm.size, dtype="int64")
outsize = np.array(size, dtype="int64")
self.comm.Allgather([outsize, 1, MPI.LONG], [sizes, 1, MPI.LONG])
# This nested concatenate is to get the shapes to work out correctly;
# if we just add [0] to sizes, it will broadcast a summation, not a
# concatenation.
offsets = np.add.accumulate(np.concatenate([[0], sizes]))[:-1]
arr_size = self.comm.allreduce(size, op=MPI.SUM)
for key in field_keys:
dd = data[key]
rv = self.alltoallv_array(dd, arr_size, offsets, sizes)
data[key] = rv
return data
elif datatype == "array" and op == "cat":
if data is None:
ncols = -1
size = 0
dtype = "float64"
mylog.warning(
"Array passed to par_combine_object was None. "
"Setting dtype to float64. This may break things!"
)
else:
dtype = data.dtype
if len(data) == 0:
ncols = -1
size = 0
elif len(data.shape) == 1:
ncols = 1
size = data.shape[0]
else:
ncols, size = data.shape
ncols = self.comm.allreduce(ncols, op=MPI.MAX)
if ncols == 0:
data = np.zeros(0, dtype=dtype) # This only works for
elif data is None:
data = np.zeros((ncols, 0), dtype=dtype)
size = data.shape[-1]
sizes = np.zeros(self.comm.size, dtype="int64")
outsize = np.array(size, dtype="int64")
self.comm.Allgather([outsize, 1, MPI.LONG], [sizes, 1, MPI.LONG])
# This nested concatenate is to get the shapes to work out correctly;
# if we just add [0] to sizes, it will broadcast a summation, not a
# concatenation.
offsets = np.add.accumulate(np.concatenate([[0], sizes]))[:-1]
arr_size = self.comm.allreduce(size, op=MPI.SUM)
data = self.alltoallv_array(data, arr_size, offsets, sizes)
return data
elif datatype == "list" and op == "cat":
recv_data = self.comm.allgather(data)
# Now flatten into a single list, since this
# returns us a list of lists.
data = []
while recv_data:
data.extend(recv_data.pop(0))
return data
raise NotImplementedError
@parallel_passthrough
def mpi_bcast(self, data, root=0):
# The second check below makes sure that we know how to communicate
# this type of array. Otherwise, we'll pickle it.
if isinstance(data, np.ndarray) and get_mpi_type(data.dtype) is not None:
if self.comm.rank == root:
if isinstance(data, YTArray):
info = (
data.shape,
data.dtype,
str(data.units),
data.units.registry.lut,
)
if isinstance(data, ImageArray):
info += ("ImageArray",)
else:
info += ("YTArray",)
else:
info = (data.shape, data.dtype)
else:
info = ()
info = self.comm.bcast(info, root=root)
if self.comm.rank != root:
if len(info) == 5:
registry = UnitRegistry(lut=info[3], add_default_symbols=False)
if info[-1] == "ImageArray":
data = ImageArray(
np.empty(info[0], dtype=info[1]),
units=info[2],
registry=registry,
)
else:
data = YTArray(
np.empty(info[0], dtype=info[1]), info[2], registry=registry
)
else:
data = np.empty(info[0], dtype=info[1])
mpi_type = get_mpi_type(info[1])
self.comm.Bcast([data, mpi_type], root=root)
return data
else:
# Use pickled methods.
data = self.comm.bcast(data, root=root)
return data
def preload(self, grids, fields, io_handler):
# This is non-functional.
return
@parallel_passthrough
def mpi_allreduce(self, data, dtype=None, op="sum"):
op = op_names[op]
if isinstance(data, np.ndarray) and data.dtype != bool:
if dtype is None:
dtype = data.dtype
if dtype != data.dtype:
data = data.astype(dtype)
temp = data.copy()
self.comm.Allreduce(
[temp, get_mpi_type(dtype)], [data, get_mpi_type(dtype)], op
)
return data
else:
# We use old-school pickling here on the assumption the arrays are
# relatively small ( < 1e7 elements )
return self.comm.allreduce(data, op)
###
# Non-blocking stuff.
###
def mpi_nonblocking_recv(self, data, source, tag=0, dtype=None):
if not self._distributed:
return -1
if dtype is None:
dtype = data.dtype
mpi_type = get_mpi_type(dtype)
return self.comm.Irecv([data, mpi_type], source, tag)
def mpi_nonblocking_send(self, data, dest, tag=0, dtype=None):
if not self._distributed:
return -1
if dtype is None:
dtype = data.dtype
mpi_type = get_mpi_type(dtype)
return self.comm.Isend([data, mpi_type], dest, tag)
def mpi_Request_Waitall(self, hooks):
if not self._distributed:
return
MPI.Request.Waitall(hooks)
def mpi_Request_Waititer(self, hooks):
for _hook in hooks:
req = MPI.Request.Waitany(hooks)
yield req
def mpi_Request_Testall(self, hooks):
"""
This returns False if any of the request hooks are un-finished,
and True if they are all finished.
"""
if not self._distributed:
return True
return MPI.Request.Testall(hooks)
###
# End non-blocking stuff.
###
###
# Parallel rank and size properties.
###
@property
def size(self):
if not self._distributed:
return 1
return self.comm.size
@property
def rank(self):
if not self._distributed:
return 0
return self.comm.rank
def mpi_info_dict(self, info):
if not self._distributed:
return 0, {0: info}
data = None
if self.comm.rank == 0:
data = {0: info}
for i in range(1, self.comm.size):
data[i] = self.comm.recv(source=i, tag=0)
else:
self.comm.send(info, dest=0, tag=0)
mylog.debug("Opening MPI Broadcast on %s", self.comm.rank)
data = self.comm.bcast(data, root=0)
return self.comm.rank, data
def claim_object(self, obj):
if not self._distributed:
return
obj._owner = self.comm.rank
obj._distributed = True
def do_not_claim_object(self, obj):
if not self._distributed:
return
obj._owner = -1
obj._distributed = True
def write_on_root(self, fn):
if not self._distributed:
return open(fn, "w")
if self.comm.rank == 0:
return open(fn, "w")
else:
return StringIO()
def get_filename(self, prefix, rank=None):
if not self._distributed:
return prefix
if rank is None:
return "%s_%04i" % (prefix, self.comm.rank)
else:
return "%s_%04i" % (prefix, rank)
def is_mine(self, obj):
if not obj._distributed:
return True
return obj._owner == self.comm.rank
def send_quadtree(self, target, buf, tgd, args):
sizebuf = np.zeros(1, "int64")
sizebuf[0] = buf[0].size
self.comm.Send([sizebuf, MPI.LONG], dest=target)
self.comm.Send([buf[0], MPI.INT], dest=target)
self.comm.Send([buf[1], MPI.DOUBLE], dest=target)
self.comm.Send([buf[2], MPI.DOUBLE], dest=target)
def recv_quadtree(self, target, tgd, args):
sizebuf = np.zeros(1, "int64")
self.comm.Recv(sizebuf, source=target)
buf = [
np.empty((sizebuf[0],), "int32"),
np.empty((sizebuf[0], args[2]), "float64"),
np.empty((sizebuf[0],), "float64"),
]
self.comm.Recv([buf[0], MPI.INT], source=target)
self.comm.Recv([buf[1], MPI.DOUBLE], source=target)
self.comm.Recv([buf[2], MPI.DOUBLE], source=target)
return buf
@parallel_passthrough
def merge_quadtree_buffers(self, qt, merge_style):
# This is a modified version of pairwise reduction from Lisandro Dalcin,
# in the reductions demo of mpi4py
size = self.comm.size
rank = self.comm.rank
mask = 1
buf = qt.tobuffer()
print("PROC", rank, buf[0].shape, buf[1].shape, buf[2].shape)
sys.exit()
args = qt.get_args() # Will always be the same
tgd = np.array([args[0], args[1]], dtype="int64")
sizebuf = np.zeros(1, "int64")
while mask < size:
if (mask & rank) != 0:
target = (rank & ~mask) % size
# print("SENDING FROM %02i to %02i" % (rank, target))
buf = qt.tobuffer()
self.send_quadtree(target, buf, tgd, args)
# qt = self.recv_quadtree(target, tgd, args)
else:
target = rank | mask
if target < size:
# print("RECEIVING FROM %02i on %02i" % (target, rank))
buf = self.recv_quadtree(target, tgd, args)
qto = QuadTree(tgd, args[2], qt.bounds)
qto.frombuffer(buf[0], buf[1], buf[2], merge_style)
merge_quadtrees(qt, qto, style=merge_style)
del qto
# self.send_quadtree(target, qt, tgd, args)
mask <<= 1
if rank == 0:
buf = qt.tobuffer()
sizebuf[0] = buf[0].size
self.comm.Bcast([sizebuf, MPI.LONG], root=0)
if rank != 0:
buf = [
np.empty((sizebuf[0],), "int32"),
np.empty((sizebuf[0], args[2]), "float64"),
np.empty((sizebuf[0],), "float64"),
]
self.comm.Bcast([buf[0], MPI.INT], root=0)
self.comm.Bcast([buf[1], MPI.DOUBLE], root=0)
self.comm.Bcast([buf[2], MPI.DOUBLE], root=0)
self.refined = buf[0]
if rank != 0:
qt = QuadTree(tgd, args[2], qt.bounds)
qt.frombuffer(buf[0], buf[1], buf[2], merge_style)
return qt
def send_array(self, arr, dest, tag=0):
if not isinstance(arr, np.ndarray):
self.comm.send((None, None), dest=dest, tag=tag)
self.comm.send(arr, dest=dest, tag=tag)
return
tmp = arr.view(self.__tocast) # Cast to CHAR
# communicate type and shape and optionally units
if isinstance(arr, YTArray):
unit_metadata = (str(arr.units), arr.units.registry.lut)
if isinstance(arr, ImageArray):
unit_metadata += ("ImageArray",)
else:
unit_metadata += ("YTArray",)
else:
unit_metadata = ()
self.comm.send((arr.dtype.str, arr.shape) + unit_metadata, dest=dest, tag=tag)
self.comm.Send([arr, MPI.CHAR], dest=dest, tag=tag)
del tmp
def recv_array(self, source, tag=0):
metadata = self.comm.recv(source=source, tag=tag)
dt, ne = metadata[:2]
if ne is None and dt is None:
return self.comm.recv(source=source, tag=tag)
arr = np.empty(ne, dtype=dt)
if len(metadata) == 5:
registry = UnitRegistry(lut=metadata[3], add_default_symbols=False)
if metadata[-1] == "ImageArray":
arr = ImageArray(arr, units=metadata[2], registry=registry)
else:
arr = YTArray(arr, metadata[2], registry=registry)
tmp = arr.view(self.__tocast)
self.comm.Recv([tmp, MPI.CHAR], source=source, tag=tag)
return arr
def alltoallv_array(self, send, total_size, offsets, sizes):
if len(send.shape) > 1:
recv = []
for i in range(send.shape[0]):
recv.append(
self.alltoallv_array(send[i, :].copy(), total_size, offsets, sizes)
)
recv = np.array(recv)
return recv
offset = offsets[self.comm.rank]
tmp_send = send.view(self.__tocast)
recv = np.empty(total_size, dtype=send.dtype)
if isinstance(send, YTArray):
# We assume send.units is consistent with the units
# on the receiving end.
if isinstance(send, ImageArray):
recv = ImageArray(recv, units=send.units)
else:
recv = YTArray(recv, send.units)
recv[offset : offset + send.size] = send[:]
        dtr = send.dtype.itemsize // tmp_send.dtype.itemsize  # > 1
roff = [off * dtr for off in offsets]
rsize = [siz * dtr for siz in sizes]
tmp_recv = recv.view(self.__tocast)
self.comm.Allgatherv(
(tmp_send, tmp_send.size, MPI.CHAR), (tmp_recv, (rsize, roff), MPI.CHAR)
)
return recv
def probe_loop(self, tag, callback):
while True:
st = MPI.Status()
self.comm.Probe(MPI.ANY_SOURCE, tag=tag, status=st)
try:
callback(st)
except StopIteration:
mylog.debug("Probe loop ending.")
break
communication_system = CommunicationSystem()
class ParallelAnalysisInterface:
comm = None
_grids = None
_distributed = None
def __init__(self, comm=None):
if comm is None:
self.comm = communication_system.communicators[-1]
else:
self.comm = comm
self._grids = self.comm._grids
self._distributed = self.comm._distributed
def _get_objs(self, attr, *args, **kwargs):
if self._distributed:
rr = kwargs.pop("round_robin", False)
self._initialize_parallel(*args, **kwargs)
return ParallelObjectIterator(self, attr=attr, round_robin=rr)
return ObjectIterator(self, attr=attr)
def _get_grids(self, *args, **kwargs):
if self._distributed:
self._initialize_parallel(*args, **kwargs)
return ParallelObjectIterator(self, attr="_grids")
return ObjectIterator(self, attr="_grids")
def _get_grid_objs(self):
if self._distributed:
return ParallelObjectIterator(self, True, attr="_grids")
return ObjectIterator(self, True, attr="_grids")
def get_dependencies(self, fields):
deps = []
fi = self.ds.field_info
for field in fields:
if any(getattr(v, "ghost_zones", 0) > 0 for v in fi[field].validators):
continue
deps += list(
always_iterable(fi[field].get_dependencies(ds=self.ds).requested)
)
return list(set(deps))
def _initialize_parallel(self):
pass
def _finalize_parallel(self):
pass
def partition_index_2d(self, axis):
if not self._distributed:
return False, self.index.grid_collection(self.center, self.index.grids)
xax = self.ds.coordinates.x_axis[axis]
yax = self.ds.coordinates.y_axis[axis]
cc = MPI.Compute_dims(self.comm.size, 2)
mi = self.comm.rank
cx, cy = np.unravel_index(mi, cc)
x = np.mgrid[0 : 1 : (cc[0] + 1) * 1j][cx : cx + 2]
y = np.mgrid[0 : 1 : (cc[1] + 1) * 1j][cy : cy + 2]
DLE, DRE = self.ds.domain_left_edge.copy(), self.ds.domain_right_edge.copy()
LE = np.ones(3, dtype="float64") * DLE
RE = np.ones(3, dtype="float64") * DRE
LE[xax] = x[0] * (DRE[xax] - DLE[xax]) + DLE[xax]
RE[xax] = x[1] * (DRE[xax] - DLE[xax]) + DLE[xax]
LE[yax] = y[0] * (DRE[yax] - DLE[yax]) + DLE[yax]
RE[yax] = y[1] * (DRE[yax] - DLE[yax]) + DLE[yax]
mylog.debug("Dimensions: %s %s", LE, RE)
reg = self.ds.region(self.center, LE, RE)
return True, reg
def partition_index_3d(self, ds, padding=0.0, rank_ratio=1):
LE, RE = np.array(ds.left_edge), np.array(ds.right_edge)
# We need to establish if we're looking at a subvolume, in which case
# we *do* want to pad things.
if (LE == self.ds.domain_left_edge).all() and (
RE == self.ds.domain_right_edge
).all():
subvol = False
else:
subvol = True
if not self._distributed and not subvol:
return False, LE, RE, ds
if not self._distributed and subvol:
return True, LE, RE, self.ds.region(self.center, LE - padding, RE + padding)
elif ytcfg.get("yt", "inline"):
# At this point, we want to identify the root grid tile to which
# this processor is assigned.
# The only way I really know how to do this is to get the level-0
# grid that belongs to this processor.
grids = self.ds.index.select_grids(0)
root_grids = [g for g in grids if g.proc_num == self.comm.rank]
if len(root_grids) != 1:
raise RuntimeError
# raise KeyError
LE = root_grids[0].LeftEdge
RE = root_grids[0].RightEdge
return True, LE, RE, self.ds.region(self.center, LE, RE)
        cc = MPI.Compute_dims(self.comm.size // rank_ratio, 3)
mi = self.comm.rank % (self.comm.size // rank_ratio)
cx, cy, cz = np.unravel_index(mi, cc)
x = np.mgrid[LE[0] : RE[0] : (cc[0] + 1) * 1j][cx : cx + 2]
y = np.mgrid[LE[1] : RE[1] : (cc[1] + 1) * 1j][cy : cy + 2]
z = np.mgrid[LE[2] : RE[2] : (cc[2] + 1) * 1j][cz : cz + 2]
LE = np.array([x[0], y[0], z[0]], dtype="float64")
RE = np.array([x[1], y[1], z[1]], dtype="float64")
if padding > 0:
return True, LE, RE, self.ds.region(self.center, LE - padding, RE + padding)
return False, LE, RE, self.ds.region(self.center, LE, RE)
def partition_region_3d(self, left_edge, right_edge, padding=0.0, rank_ratio=1):
"""
Given a region, it subdivides it into smaller regions for parallel
analysis.
"""
LE, RE = left_edge[:], right_edge[:]
if not self._distributed:
raise NotImplementedError
        cc = MPI.Compute_dims(self.comm.size // rank_ratio, 3)
mi = self.comm.rank % (self.comm.size // rank_ratio)
cx, cy, cz = np.unravel_index(mi, cc)
x = np.mgrid[LE[0] : RE[0] : (cc[0] + 1) * 1j][cx : cx + 2]
y = np.mgrid[LE[1] : RE[1] : (cc[1] + 1) * 1j][cy : cy + 2]
z = np.mgrid[LE[2] : RE[2] : (cc[2] + 1) * 1j][cz : cz + 2]
LE = np.array([x[0], y[0], z[0]], dtype="float64")
RE = np.array([x[1], y[1], z[1]], dtype="float64")
if padding > 0:
return True, LE, RE, self.ds.region(self.center, LE - padding, RE + padding)
return False, LE, RE, self.ds.region(self.center, LE, RE)
def partition_index_3d_bisection_list(self):
"""
Returns an array that is used to drive _partition_index_3d_bisection,
below.
"""
def factor(n):
if n == 1:
return [1]
i = 2
limit = n**0.5
while i <= limit:
if n % i == 0:
                    ret = factor(n // i)
ret.append(i)
return ret
i += 1
return [n]
cc = MPI.Compute_dims(self.comm.size, 3)
si = self.comm.size
factors = factor(si)
xyzfactors = [factor(cc[0]), factor(cc[1]), factor(cc[2])]
# Each entry of cuts is a two element list, that is:
# [cut dim, number of cuts]
cuts = []
# The higher cuts are in the beginning.
# We're going to do our best to make the cuts cyclic, i.e. x, then y,
# then z, etc...
lastdim = 0
for f in factors:
nextdim = (lastdim + 1) % 3
while True:
if f in xyzfactors[nextdim]:
cuts.append([nextdim, f])
topop = xyzfactors[nextdim].index(f)
xyzfactors[nextdim].pop(topop)
lastdim = nextdim
break
nextdim = (nextdim + 1) % 3
return cuts
class GroupOwnership(ParallelAnalysisInterface):
def __init__(self, items):
ParallelAnalysisInterface.__init__(self)
self.num_items = len(items)
self.items = items
assert self.num_items >= self.comm.size
self.owned = list(range(self.comm.size))
self.pointer = 0
if parallel_capable:
communication_system.push_with_ids([self.comm.rank])
def __del__(self):
if parallel_capable:
communication_system.pop()
def inc(self, n=-1):
old_item = self.item
if n == -1:
n = self.comm.size
for _ in range(n):
if self.pointer >= self.num_items - self.comm.size:
break
self.owned[self.pointer % self.comm.size] += self.comm.size
self.pointer += 1
if self.item is not old_item:
self.switch()
def dec(self, n=-1):
old_item = self.item
if n == -1:
n = self.comm.size
for _ in range(n):
if self.pointer == 0:
break
self.owned[(self.pointer - 1) % self.comm.size] -= self.comm.size
self.pointer -= 1
if self.item is not old_item:
self.switch()
_last = None
@property
def item(self):
own = self.owned[self.comm.rank]
if self._last != own:
self._item = self.items[own]
self._last = own
return self._item