yt.utilities.parallel_tools.parallel_analysis_interface module

Parallel data mapping techniques for yt

class yt.utilities.parallel_tools.parallel_analysis_interface.CommunicationSystem[source]

Bases: object

communicators = [<yt.utilities.parallel_tools.parallel_analysis_interface.Communicator object>]
class yt.utilities.parallel_tools.parallel_analysis_interface.Communicator(comm=None)[source]

Bases: object

alltoallv_array(send, total_size, offsets, sizes)[source]
comm = None
get_filename(prefix, rank=None)[source]
merge_quadtree_buffers(qt, merge_style)[source]
mpi_Request_Testall(hooks)[source]

This returns False if any of the request hooks are un-finished, and True if they are all finished.

mpi_allreduce(data, dtype=None, op='sum')[source]
mpi_bcast(data, root=0)[source]
mpi_nonblocking_recv(data, source, tag=0, dtype=None)[source]
mpi_nonblocking_send(data, dest, tag=0, dtype=None)[source]
par_combine_object(data, op, datatype=None)[source]
preload(grids, fields, io_handler)[source]
probe_loop(tag, callback)[source]
recv_array(source, tag=0)[source]
recv_quadtree(target, tgd, args)[source]
send_array(arr, dest, tag=0)[source]
send_quadtree(target, buf, tgd, args)[source]
class yt.utilities.parallel_tools.parallel_analysis_interface.FilterAllMessages(name='')[source]

Bases: logging.Filter

This is a simple filter for logging.Logger instances that does not let any messages pass.
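The behavior can be reproduced with the standard library alone. A minimal sketch of a pass-nothing filter attached to a logger (DropEverything is an illustrative stand-in, not yt's class):

```python
import logging

class DropEverything(logging.Filter):
    """Reject every record, as FilterAllMessages does."""
    def filter(self, record):
        return False  # returning False discards the record

logger = logging.getLogger("filter_demo")
logger.addHandler(logging.StreamHandler())
logger.addFilter(DropEverything())
logger.warning("this message is silently dropped")
```

yt installs a filter like this on non-root ranks so that only rank 0 produces log output.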

class yt.utilities.parallel_tools.parallel_analysis_interface.GroupOwnership(items)[source]

Bases: yt.utilities.parallel_tools.parallel_analysis_interface.ParallelAnalysisInterface

comm = None
partition_index_3d(ds, padding=0.0, rank_ratio=1)

Returns an array that is used to drive _partition_index_3d_bisection, below.

partition_region_3d(left_edge, right_edge, padding=0.0, rank_ratio=1)

Given a region, it subdivides it into smaller regions for parallel analysis.

class yt.utilities.parallel_tools.parallel_analysis_interface.ObjectIterator(pobj, just_list=False, attr='_grids')[source]

Bases: object

This is a generalized class that accepts a list of objects and then attempts to intelligently iterate over them.

class yt.utilities.parallel_tools.parallel_analysis_interface.ParallelAnalysisInterface(comm=None)[source]

Bases: object

comm = None
partition_index_3d(ds, padding=0.0, rank_ratio=1)[source]

Returns an array that is used to drive _partition_index_3d_bisection, below.

partition_region_3d(left_edge, right_edge, padding=0.0, rank_ratio=1)[source]

Given a region, it subdivides it into smaller regions for parallel analysis.

class yt.utilities.parallel_tools.parallel_analysis_interface.ParallelDummy(name, bases, d)[source]

Bases: type

This is a base class that, on instantiation, replaces all attributes that don’t start with _ with parallel_simple_proxy()-wrapped attributes. Used as a metaclass.

mro() → list

Return a type's method resolution order.
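The wrapping behavior ParallelDummy describes can be sketched with a plain metaclass. Here tracing_proxy is an illustrative stand-in for parallel_simple_proxy, and the sketch wraps at class creation rather than at instantiation for brevity:

```python
import functools

def tracing_proxy(func):
    # Stand-in for parallel_simple_proxy: here it only records calls.
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        self._calls.append(func.__name__)
        return func(self, *args, **kwargs)
    return wrapper

class WrapPublicMethods(type):
    """Metaclass that wraps every public method, as ParallelDummy does."""
    def __new__(mcs, name, bases, namespace):
        for attr, value in list(namespace.items()):
            # Attributes starting with "_" are left untouched.
            if not attr.startswith("_") and callable(value):
                namespace[attr] = tracing_proxy(value)
        return super().__new__(mcs, name, bases, namespace)

class Analysis(metaclass=WrapPublicMethods):
    def __init__(self):
        self._calls = []
    def run(self):
        return "done"
```

After instantiation, calling any public method goes through the proxy, which is where yt inserts its parallel dispatch logic.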

class yt.utilities.parallel_tools.parallel_analysis_interface.ParallelObjectIterator(pobj, just_list=False, attr='_grids', round_robin=False)[source]

Bases: yt.utilities.parallel_tools.parallel_analysis_interface.ObjectIterator

This takes an object, pobj, that implements ParallelAnalysisInterface, and iterates over it in parallel, calling initialize and finalize on the object.

class yt.utilities.parallel_tools.parallel_analysis_interface.ProcessorPool[source]

Bases: object

add_workgroup(size=None, ranks=None, name=None)[source]
available_ranks = None
comm = None
classmethod from_sizes(sizes)[source]
ranks = None
size = None
tasks = None
class yt.utilities.parallel_tools.parallel_analysis_interface.ResultsStorage[source]

Bases: object

result = None
result_id = None
slots = ['result', 'result_id']
class yt.utilities.parallel_tools.parallel_analysis_interface.Workgroup(size, ranks, comm, name)[source]

Bases: object

yt.utilities.parallel_tools.parallel_analysis_interface.default_mpi_excepthook(exception_type, exception_value, tb)[source]
yt.utilities.parallel_tools.parallel_analysis_interface.enable_parallelism(suppress_logging=False, communicator=None)[source]

This method is used inside a script to turn on MPI parallelism via mpi4py. More information about running yt in parallel can be found in yt's parallel computation documentation.

  • suppress_logging (bool) – If set to True, only rank 0 will log information after the initial setup of MPI.
  • communicator (mpi4py.MPI.Comm) – The MPI communicator to use. This controls which processes yt can see. If not specified, will be set to COMM_WORLD.
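A typical way to call enable_parallelism at the top of a script (a sketch; the import-availability guard exists only so the snippet degrades gracefully when yt or mpi4py is not installed):

```python
import importlib.util

# Normally this script would be launched under MPI, e.g.:
#   mpirun -np 4 python script.py
have_yt = importlib.util.find_spec("yt") is not None
have_mpi4py = importlib.util.find_spec("mpi4py") is not None

if have_yt and have_mpi4py:
    import yt

    # Turn on MPI parallelism; with suppress_logging=True only rank 0 logs.
    yt.enable_parallelism(suppress_logging=True)
else:
    print("yt and/or mpi4py missing; nothing to enable")
```

The call must come before datasets are loaded so that yt can set up its communicators first.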

yt.utilities.parallel_tools.parallel_analysis_interface.parallel_blocking_call(func)[source]

This decorator blocks on entry and exit of a function.

yt.utilities.parallel_tools.parallel_analysis_interface.parallel_objects(objects, njobs=0, storage=None, barrier=True, dynamic=False)[source]

This function dispatches components of an iterable to different processors.

The parallel_objects function accepts an iterable, objects, and based on the number of jobs requested and number of available processors, decides how to dispatch individual objects to processors or sets of processors. This can implicitly include multi-level parallelism, such that the processor groups assigned each object can be composed of several or even hundreds of processors. storage is also available, for collation of results at the end of the iteration loop.

Calls to this function can be nested.

This should not be used to iterate over datasets – DatasetSeries provides a much nicer interface for that.

  • objects (Iterable) – The list of objects to dispatch to different processors.
  • njobs (int) – How many jobs to spawn. By default, one job will be dispatched for each available processor.
  • storage (dict) – This is a dictionary, which will be filled with results during the course of the iteration. The keys will be the dataset indices and the values will be whatever is assigned to the result attribute on the storage during iteration.
  • barrier (bool) – Should a barrier be placed at the end of iteration?
  • dynamic (bool) – This governs whether or not dynamic load balancing will be enabled. This requires one dedicated processor; if this is enabled with a set of 128 processors available, only 127 will be available to iterate over objects as one will be load balancing the rest.


Here is a simple example of iterating over a set of centers and making slice plots centered at each.

>>> for c in parallel_objects(centers):
...     SlicePlot(ds, "x", "Density", center=c).save()

Here’s an example of calculating the angular momentum vector of a set of spheres, but with a set of four jobs of multiple processors each. Note that we also store the results.

>>> storage = {}
>>> for sto, c in parallel_objects(centers, njobs=4, storage=storage):
...     sp = ds.sphere(c, (100, "kpc"))
...     sto.result = sp.quantities["AngularMomentumVector"]()
>>> for sphere_id, L in sorted(storage.items()):
...     print(centers[sphere_id], L)
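When parallelism has not been enabled, parallel_objects degrades to plain iteration. A rough pure-Python sketch of that serial path, including the storage protocol (serial_parallel_objects and _Store are illustrative stand-ins, not yt code):

```python
class _Store:
    """Mimics the per-object storage handle yielded alongside each object."""
    result = None

def serial_parallel_objects(objects, storage=None):
    """Serial fallback: the lone process handles every object in order."""
    for index, obj in enumerate(objects):
        if storage is None:
            yield obj
        else:
            sto = _Store()
            yield sto, obj          # caller assigns sto.result in the loop body
            storage[index] = sto.result

results = {}
for sto, c in serial_parallel_objects([1.0, 2.0, 3.0], storage=results):
    sto.result = c * c
```

In the real function, each rank iterates only over its assigned subset, and the storage dictionaries are merged across processors after the loop.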

yt.utilities.parallel_tools.parallel_analysis_interface.parallel_passthrough(func)[source]

If we are not running in parallel, this decorator passes the input back as output; otherwise, the wrapped function is called.

yt.utilities.parallel_tools.parallel_analysis_interface.parallel_ring(objects, generator_func, mutable=False)[source]

This function loops in a ring around a set of objects, yielding the results of generator_func and passing from one processor to another to avoid IO or expensive computation.

This function is designed to operate in sequence on a set of objects, where the creation of those objects might be expensive. For instance, this could be a set of particles that are costly to read from disk. Processor N will run generator_func on an object, and the results of that will both be yielded and passed to processor N-1. If the length of the objects is not equal to the number of processors, then the final processor in the top communicator will re-generate the data as needed.

In all likelihood, this function will only be useful internally to yt.

  • objects (Iterable) – The list of objects to operate on.
  • generator_func (callable) – This function will be called on each object, and the results yielded. It must return a single NumPy array; for multiple values, it needs to have a custom dtype.
  • mutable (bool) – Should the arrays be considered mutable? Currently, this will only work if the number of processors equals the number of objects.


Here is a simple example of a ring loop around a set of integers, with a custom dtype.

>>> dt = np.dtype([('x', 'float64'), ('y', 'float64'), ('z', 'float64')])
>>> def gfunc(o):
...     np.random.seed(o)
...     rv = np.empty(1000, dtype=dt)
...     rv['x'] = np.random.random(1000)
...     rv['y'] = np.random.random(1000)
...     rv['z'] = np.random.random(1000)
...     return rv
>>> objs = range(8)
>>> for obj, arr in parallel_ring(objs, gfunc):
...     print(arr['x'].sum(), arr['y'].sum(), arr['z'].sum())

yt.utilities.parallel_tools.parallel_analysis_interface.parallel_root_only(func)[source]

This decorator blocks and calls the function on the root processor, but does not broadcast results to the other processors.


yt.utilities.parallel_tools.parallel_analysis_interface.parallel_simple_proxy(func)[source]

This is a decorator that broadcasts the result of computation on a single processor to all other processors. To do so, it uses the _processing and _distributed flags on the object to check for blocks. Meant only to be used on objects that subclass ParallelAnalysisInterface.