yt v2.3 documentation

Parallel Computation With YT

YT has been instrumented with the ability to compute many – most, even – quantities in parallel. This utilizes the package mpi4py to parallelize using the Message Passing Interface, typically installed on clusters.

Capabilities

Currently, YT is able to perform the following actions in parallel:

This list covers just about every action YT can take! Additionally, almost all scripts will benefit from parallelization without any modification. The goal of Parallel-YT has been to retain API compatibility and abstract all parallelism.

Setting Up Parallel YT

To run scripts in parallel, you must first install mpi4py. Instructions for doing so are provided on the mpipy website. Once that has been accomplished, you’re all done! You just need to launch your scripts with mpirun (or equivalent) and signal to YT that you want to run them in parallel.

For instance, the following script, which we’ll save as my_script.py:

from yt.mods import *
pf = load("RD0035/RedshiftOutput0035")
v, c = pf.h.find_max("Density")
print v, c
pc = PlotCollection(pf, center = [0.5, 0.5, 0.5])
pc.add_projection("Density", 0)
pc.save()

Will execute the finding of the maximum density and the projection in parallel if launched in parallel. To do so, at the command line you would execute

$ mpirun -np 16 python2.7 my_script.py --parallel

if you wanted it to run in parallel. If you run into problems, the you can use Remote and Disconnected Debugging to examine what went wrong.

Warning

If you manually interact with the filesystem, not through YT, you will have to ensure that you only execute your functions on the root processor. You can do this with the function only_on_root().

It’s important to note that all of the processes listed in capabilities work – and no additional work is necessary to parallelize those processes. Furthermore, the yt command itself recognizes the --parallel option, so those commands will work in parallel as well.

The Derived Quantities and Profile objects must both have the lazy_reader option set to True when they are instantiated. What this does is to operate on a grid-by-grid decomposed basis. In yt version 1.5 and the trunk, this has recently been set to be the default.

Types of Parallelism

In order to divide up the work, YT will attempt to send different tasks to different processors. However, to minimize inter-process communication, YT will decompose the information in different ways based on the task.

Spatial Decomposition

During this process, the hierarchy will be decomposed along either all three axes or along an image plane, if the process is that of projection. This type of parallelism is overall less efficient than grid-based parallelism, but it has been shown to obtain good results overall.

The following operations use spatial decomposition:

  • Halo finding
  • Merger tree
  • Two point functions
  • Volume rendering
  • Radial column density

Grid Decomposition

The alternative to spatial decomposition is a simple round-robin of the grids. This process alows YT to pool data access to a given Enzo data file, which ultimately results in faster read times and better parallelism.

The following operations use grid decomposition:

  • Projections
  • Slices
  • Cutting planes
  • Derived Quantities
  • 1-, 2-, and 3-D profiles
  • Isocontours & flux calculations

Object-Based

In a fashion similar to grid decomposition, computation can be parallelized over objects. This is especially useful for embarrasingly parallel tasks where the items to be worked on can be split into seperate chunks and saved to a list. The list is then split up and each MPI task performs parts of it independently.

Parallelizing Your Analysis

It is easy within YT to parallelize a list of tasks, as long as those tasks are independent of one another. Using object-based parallelism, the function parallel_objects() will automatically split up a list of tasks over the specified number of processors (or cores). Please see this heavily-commented example:

# This is necessary to prevent a race-condition where each copy of
# yt attempts to save information about datasets to the same file on disk,
# simultaneously. This will be fixed, eventually...
from yt.config import ytcfg; ytcfg["yt","serialize"] = "False"
# As always...
from yt.mods import *
import glob

# The number 4, below, is the number of processes to parallelize over, which
# is generally equal to the number of MPI tasks the job is launched with.
# If num_procs is set to zero or a negative number, the for loop below
# will be run such that each iteration of the loop is done by a single MPI
# task. Put another way, setting it to zero means that no matter how many
# MPI tasks the job is run with, num_procs will default to the number of
# MPI tasks automatically.
num_procs = 4

# fns is a list of all Enzo hierarchy files in directories one level down.
fns = glob.glob("*/*.hierarchy")
fns.sort()
# This dict will store information collected in the loop, below.
# Inside the loop each task will have a local copy of the dict, but
# the dict will be combined once the loop finishes.
my_storage = {}
# In this example, because the storage option is used in the
# parallel_objects function, the loop yields a tuple, which gets used
# as (sto, fn) inside the loop.
# In the loop, sto is essentially my_storage, but a local copy of it.
# If data does not need to be combined after the loop is done, the line
# would look like:
#       for fn in parallel_objects(fns, num_procs):
for sto, fn in parallel_objects(fns, num_procs, storage = my_storage):
    # Open a data file, remembering that fn is different on each task.
    pf = load(fn)
    dd = pf.h.all_data()
    # This copies fn and the min/max of density to the local copy of
    # my_storage
    sto.result_id = fn
    sto.result = dd.quantities["Extrema"]("Density")
    # Makes and saves a plot of the gas density.
    pc = PlotCollection(pf, [0.5, 0.5, 0.5])
    pc.add_projection("Density", 0)
    pc.save()
# At this point, as the loop exits, the local copies of my_storage are
# combined such that all tasks now have an identical and full version of
# my_storage. Until this point, each task is unaware of what the other
# tasks have produced.
# Below, the values in my_storage are printed by only one task. The other
# tasks do nothing.
if ytcfg.getint("yt", "__topcomm_parallel_rank") == 0:
    for fn, vals in sorted(my_storage.items()):
        print fn, vals

This example above can be modified to loop over anything that can be saved to a Python list: halos, data files, arrays, and more.

Parallel Performance, Resources, and Tuning

Optimizing parallel jobs in YT is difficult; there are many parameters that affect how well and quickly the job runs. In many cases, the only way to find out what the minimum (or optimal) number of processors is, or amount of memory needed, is through trial and error. However, this section will attempt to provide some insight into what are good starting values for a given parallel task.

Grid Decomposition

In general, these types of parallel calculations scale very well with number of processors. They are also fairly memory-conservative. The two limiting factors is therefore the number of grids in the dataset, and the speed of the disk the data is stored on. There is no point in running a parallel job of this kind with more processors than grids, because the extra processors will do absolutely nothing, and will in fact probably just serve to slow down the whole calculation due to the extra overhead. The speed of the disk is also a consideration - if it is not a high-end parallel file system, adding more tasks will not speed up the calculation if the disk is already swamped with activity.

The best advice for these sort of calculations is to run with just a few processors and go from there, seeing if it the runtime improves noticeably.

Projections, Slices, and Cutting Planes

Projections, slices and cutting planes are the most common methods of creating two-dimensional representations of data. All three have been parallelized in a grid-based fashion.

  • Projections: projections are parallelized utilizing a quad-tree approach. Data is loaded for each processor, typically by a process that consolidates open/close/read operations, and each grid is then iterated over and cells are deposited into a data structure that stores values correpsonding to positions in the two-dimensional plane. This provides excellent load balancing, and in serial is quite fast. However, as of yt 2.3, the operation by which quadtrees are joined across processors scales poorly; while memory consumption scales well, the time to completion does not. As such, projections can often be done very fast when operating only on a single processor! The quadtree algorithm can be used inline (and, indeed, it is for this reason that it is slow.) It is recommended that you attempt to project in serial before projecting in parallel; even for the very largest datasets (Enzo 1024^3 root grid with 7 levels of refinement) in the absence of IO the quadtree algorithm takes only three minutes or so on a decent processor.
  • Slices: to generate a slice, grids that intersect a given slice are iterated over and their finest-resolution cells are deposited. The grids are decomposed via standard load balancing. While this operation is parallel, it is almost never necessary to slice a dataset in parallel, as all data is loaded on demand anyway. The slice operation has been parallelized so as to enable slicing when running in situ.
  • Cutting planes: cutting planes are parallelized exactly as slices are. However, in contrast to slices, because the data-selection operation can be much more time consuming, cutting planes often benefit from parallelism.

Object-Based

Like grid decomposition, it does not help to run with more processors than the number of objects to be iterated over. There is also the matter of the kind of work being done on each object, and whether it is disk-intensive, cpu-intensive, or memory-intensive. It is up to the user to figure out what limits the performance of their script, and use the correct amount of resources, accordingly.

Disk-intensive jobs are limited by the speed of the file system, as above, and extra processors beyond its capability are likely counter-productive. It may require some testing or research (e.g. supercomputer documentation) to find out what the file system is capable of.

If it is cpu-intensive, it’s best to use as many processors as possible and practical.

For a memory-intensive job, each processor needs to be able to allocate enough memory, which may mean using fewer than the maximum number of tasks per compute node, and increasing the number of nodes. The memory used per processor should be calculated, compared to the memory on each compute node, which dictates how many tasks per node. After that, the number of processors used overall is dictated by the disk system or CPU-intensity of the job.

Domain Decomposition

The various types of analysis that utilize domain decomposition use them in different enough ways that they are be discussed separately.

Halo-Finding

Halo finding, along with the merger tree that uses halo finding, operates on the particles in the volume, and is therefore mostly grid-agnostic. Generally, the biggest concern for halo finding is the amount of memory needed. There is subtle art in estimating the amount of memory needed for halo finding, but a rule of thumb is that Parallel HOP (parallelHF()) is the most memory-intensive, followed by plain HOP (HaloFinder()), with Friends of Friends (FOFHaloFinder()) being the most memory-conservative. It has been found that parallelHF() needs roughly 1 MB of memory per 5,000 particles, although recent work has improved this and the memory requirement is now smaller than this. But this is a good starting point for beginning to calculate the memory required for halo-finding.

Two point functions

Please see Strategies for Computational Efficiency for more details.

Volume Rendering

The simplest way to think about volume rendering, and the radial column density module that uses it, is that it load-balances over the grids in the dataset. Each processor is given roughly the same sized volume to operate on. In practice, there are just a few things to keep in mind when doing volume rendering. First, it only uses a power of two number of processors. If the job is run with 100 processors, only 64 of them will actually do anything. Second, the absolute maximum number of processors is the number of grids. But in order to keep work distributed evenly, typically the number of processors should be no greater than one-eighth or one-quarter the number of processors that were used to produce the dataset.

Additional Tips

  • Don’t be afraid to change how a parallel job is run. Change the number of processors, or memory allocated, and see if things work better or worse. After all, it’s just a computer, it doesn’t pass moral judgment!

  • Similarly, human time is more valuable than computer time. Try increasing the number of processors, and see if the runtime drops significantly. There will be a sweet spot between speed of run and the waiting time in the job scheduler queue; it may be worth trying to find it.

  • If you are using object-based parallelism but doing CPU-intensive computations on each object, you may find that setting :py:data:’num_procs’ equal to the number of processors per compute node can lead to significant speedups. By default, most mpi implimentations will assign tasks to processors on a ‘by-slot’ basis, so this setting will tell yt to do computations on a single object using only the processors on a single compute node. A nice application for this type of parallelism is calculating a list of derived quantities for a large number of simulation outputs.

  • It is impossible to tune a parallel operation without understanding what’s going on. Read the documentation, look at the underlying code, or talk to other yt users. Get informed!

  • Sometimes it is difficult to know if a job is cpu, memory, or disk intensive, especially if the parallel job utilizes several of the kinds of parallelism discussed above. In this case, it may be worthwhile to put some simple timers in your script (as below) around different parts.

    from yt.mods import *
    import time
    
    pf = load("DD0152")
    t0 = time.time()
    bigstuff, hugestuff = StuffFinder(pf)
    BigHugeStuffParallelFunction(pf, bigstuff, hugestuff)
    t1 = time.time()
    for i in range(1000000):
        tinystuff, ministuff = GetTinyMiniStuffOffDisk("in%06d.txt" % i)
        array = TinyTeensyParallelFunction(pf, tinystuff, ministuff)
        SaveTinyMiniStuffToDisk("out%06d.txt" % i, array)
    t2 = time.time()
    
    if ytcfg.getint("yt", "__topcomm_parallel_rank") == 0:
        print "BigStuff took %.5e sec, TinyStuff took %.5e sec" % (t1 - t0, t2 - t1)
    
  • Remember that if the script handles disk IO explicitly, and does not use a built-in yt function to write data to disk, care must be taken to avoid race-conditions. Be explicit about which MPI task writes to disk using a construction something like this:

    if ytcfg.getint("yt", "__topcomm_parallel_rank") == 0:
        file = open("out.txt", "w")
        file.write(stuff)
        file.close()
    
  • Many supercomputers allow users to ssh into the nodes that their job is running on. Many job schedulers send the names of the nodes that are used in the notification emails, or a command like qstat -f NNNN, where NNNN is the job ID, will also show this information. By ssh-ing into nodes, the memory usage of each task can be viewed in real-time as the job runs (using top, for example), and can give valuable feedback about the resources the task requires.