import numpy as np
from yt.data_objects.construction_data_containers import YTStreamline
from yt.funcs import get_pbar
from yt.units.yt_array import YTArray
from yt.utilities.amr_kdtree.api import AMRKDTree
from yt.utilities.parallel_tools.parallel_analysis_interface import (
ParallelAnalysisInterface,
parallel_passthrough,
)
def sanitize_length(length, ds):
# Ensure that lengths passed in with units are returned as code_length
# magnitudes without units
if isinstance(length, YTArray):
return ds.arr(length).in_units("code_length").d
else:
return length
class Streamlines(ParallelAnalysisInterface):
r"""A collection of streamlines that flow through the volume
The Streamlines object contains a collection of streamlines
defined as paths that are parallel to a specified vector field.
Parameters
----------
ds : ~yt.data_objects.static_output.Dataset
This is the dataset to streamline
positions : array_like
An array of initial starting positions of the streamlines.
    xfield : str or tuple of str, optional
        The x component of the vector field to be streamlined.
        Default: 'velocity_x'
    yfield : str or tuple of str, optional
        The y component of the vector field to be streamlined.
        Default: 'velocity_y'
    zfield : str or tuple of str, optional
        The z component of the vector field to be streamlined.
        Default: 'velocity_z'
    volume : `~yt.utilities.amr_kdtree.api.AMRKDTree`, optional
        The volume to be streamlined. Can be specified for
        finer-grained control, but otherwise will be automatically
        generated. At this point it must use the AMRKDTree.
        Default: None
dx : float, optional
Optionally specify the step size during the integration.
Default: minimum dx
length : float, optional
Optionally specify the length of integration.
Default: np.max(self.ds.domain_right_edge-self.ds.domain_left_edge)
    direction : int or float, optional
        Specifies the direction of integration. The magnitude of this
        value has no effect, only the sign.
        Default: 1
    get_magnitude : bool, optional
Specifies whether the Streamlines.magnitudes array should be
filled with the magnitude of the vector field at each point in
the streamline. This seems to be a ~10% hit to performance.
Default: False

    Examples
    --------
    >>> import matplotlib.pyplot as plt
    >>> import numpy as np
    >>> from mpl_toolkits.mplot3d import Axes3D
    >>> import yt
    >>> from yt.visualization.api import Streamlines
    >>> # Load the dataset and set some parameters
    >>> ds = yt.load("IsolatedGalaxy/galaxy0030/galaxy0030")
>>> c = np.array([0.5] * 3)
>>> N = 100
>>> scale = 1.0
>>> pos_dx = np.random.random((N, 3)) * scale - scale / 2.0
>>> pos = c + pos_dx
>>> # Define and construct streamlines
>>> streamlines = Streamlines(
... ds, pos, "velocity_x", "velocity_y", "velocity_z", length=1.0
... )
>>> streamlines.integrate_through_volume()
>>> # Make a 3D plot of the streamlines and save it to disk
>>> fig = plt.figure()
>>> ax = Axes3D(fig)
>>> for stream in streamlines.streamlines:
... stream = stream[np.all(stream != 0.0, axis=1)]
... ax.plot3D(stream[:, 0], stream[:, 1], stream[:, 2], alpha=0.1)
>>> plt.savefig("streamlines.png")
"""
def __init__(
self,
ds,
positions,
xfield="velocity_x",
yfield="velocity_x",
zfield="velocity_x",
volume=None,
dx=None,
length=None,
direction=1,
get_magnitude=False,
):
ParallelAnalysisInterface.__init__(self)
self.ds = ds
self.start_positions = sanitize_length(positions, ds)
self.N = self.start_positions.shape[0]
# I need a data object to resolve the field names to field tuples
# via _determine_fields()
ad = self.ds.all_data()
self.xfield = ad._determine_fields(xfield)[0]
self.yfield = ad._determine_fields(yfield)[0]
self.zfield = ad._determine_fields(zfield)[0]
self.get_magnitude = get_magnitude
self.direction = np.sign(direction)
if volume is None:
volume = AMRKDTree(self.ds)
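            # Attach the three vector-field components to the KD-tree bricks;
            # the boolean arguments select linear (non-log) field sampling and
            # ghost-zone handling (assumed meaning of the trailing flag).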
volume.set_fields(
[self.xfield, self.yfield, self.zfield], [False, False, False], False
)
volume.join_parallel_trees()
self.volume = volume
if dx is None:
dx = self.ds.index.get_smallest_dx()
self.dx = sanitize_length(dx, ds)
if length is None:
length = np.max(self.ds.domain_right_edge - self.ds.domain_left_edge)
self.length = sanitize_length(length, ds)
self.steps = int(self.length / self.dx) + 1
        # Recompute dx so that an integer number of steps of size dx exactly
        # covers the requested integration length.
self.dx = 1.0 * self.length / self.steps
self.streamlines = np.zeros((self.N, self.steps, 3), dtype="float64")
self.magnitudes = None
if self.get_magnitude:
self.magnitudes = np.zeros((self.N, self.steps), dtype="float64")
def integrate_through_volume(self):
nprocs = self.comm.size
my_rank = self.comm.rank
self.streamlines[my_rank::nprocs, 0, :] = self.start_positions[my_rank::nprocs]
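        # Streamlines are distributed round-robin over MPI ranks: each rank
        # seeds and integrates every nprocs-th streamline, and the full set is
        # reassembled afterwards in _finalize_parallel.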
pbar = get_pbar("Streamlining", self.N)
for i, stream in enumerate(self.streamlines[my_rank::nprocs]):
thismag = None
            if self.get_magnitude:
                # Index the same round-robin subset as the streamlines so that
                # each rank writes magnitudes only for its own streamlines.
                thismag = self.magnitudes[my_rank::nprocs][i]
step = self.steps
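            # `step` counts down from self.steps; stream[-step] is the most
            # recently computed point, so the path fills in from the front of
            # the array as the counter decreases.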
while step > 1:
this_node = self.volume.locate_node(stream[-step, :])
step = self._integrate_through_brick(
this_node, stream, step, mag=thismag
)
pbar.update(i + 1)
pbar.finish()
self._finalize_parallel(None)
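        # Attach units: positions are in code_length and magnitudes carry the
        # units of the streamlined field.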
self.streamlines = self.ds.arr(self.streamlines, "code_length")
if self.get_magnitude:
self.magnitudes = self.ds.arr(
self.magnitudes, self.ds.field_info[self.xfield].units
)
@parallel_passthrough
def _finalize_parallel(self, data):
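        # Each rank filled only its own subset of streamlines (all other rows
        # are still zero), so a sum-reduction across ranks reassembles the
        # complete arrays on every processor.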
self.streamlines = self.comm.mpi_allreduce(self.streamlines, op="sum")
if self.get_magnitude:
self.magnitudes = self.comm.mpi_allreduce(self.magnitudes, op="sum")
def _integrate_through_brick(self, node, stream, step, periodic=False, mag=None):
LE = self.ds.domain_left_edge.d
RE = self.ds.domain_right_edge.d
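        # Advance the streamline one dx at a time while it remains inside this
        # brick. Return 0 if it leaves the computational domain, or the number
        # of remaining steps once it crosses into a neighboring brick.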
while step > 1:
self.volume.get_brick_data(node)
brick = node.data
stream[-step + 1] = stream[-step]
if mag is None:
brick.integrate_streamline(
stream[-step + 1], self.direction * self.dx, None
)
else:
marr = [mag]
brick.integrate_streamline(
stream[-step + 1], self.direction * self.dx, marr
)
mag[-step + 1] = marr[0]
cur_stream = stream[-step + 1, :]
if np.sum(np.logical_or(cur_stream < LE, cur_stream >= RE)):
return 0
nLE = node.get_left_edge()
nRE = node.get_right_edge()
if np.sum(np.logical_or(cur_stream < nLE, cur_stream >= nRE)):
return step - 1
step -= 1
return step
def clean_streamlines(self):
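        """Remove unfilled entries from each streamline.

        Points containing a zero coordinate (left over when integration
        terminates early) are dropped, converting ``self.streamlines`` (and
        ``self.magnitudes``, if computed) into object arrays of
        variable-length paths.
        """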
temp = np.empty(self.N, dtype="object")
temp2 = np.empty(self.N, dtype="object")
for i, stream in enumerate(self.streamlines):
mask = np.all(stream != 0.0, axis=1)
temp[i] = stream[mask]
            if self.get_magnitude:
                temp2[i] = self.magnitudes[i, mask]
        self.streamlines = temp
        if self.get_magnitude:
            self.magnitudes = temp2
def path(self, streamline_id):
"""
        Returns a YTSelectionContainer1D object defined by a streamline.

        Parameters
----------
streamline_id : int
This defines which streamline from the Streamlines object
that will define the YTSelectionContainer1D object.

        Returns
        -------
        A YTStreamline (YTSelectionContainer1D) object.

        Examples
        --------
        >>> import matplotlib.pyplot as plt
        >>> from yt.visualization.api import Streamlines
>>> streamlines = Streamlines(ds, [0.5] * 3)
>>> streamlines.integrate_through_volume()
>>> stream = streamlines.path(0)
>>> fig, ax = plt.subplots()
>>> ax.set_yscale("log")
>>> ax.plot(stream["t"], stream["gas", "density"], "-x")
"""
return YTStreamline(
self.streamlines[streamline_id], ds=self.ds, length=self.length
)