import numpy as np
from yt.data_objects.selection_objects.data_selection_objects import (
YTSelectionContainer,
)
from yt.funcs import mylog
from yt.utilities.lib.mesh_utilities import fill_fcoords, fill_fwidths
[docs]
class UnstructuredMesh(YTSelectionContainer):
# This is a base class, not meant to be used directly.
_spatial = False
_connectivity_length = -1
_type_name = "unstructured_mesh"
_skip_add = True
_index_offset = 0
_con_args = ("mesh_id", "filename", "connectivity_indices", "connectivity_coords")
def __init__(
self, mesh_id, filename, connectivity_indices, connectivity_coords, index
):
super().__init__(index.dataset, None)
self.filename = filename
self.mesh_id = mesh_id
# This is where we set up the connectivity information
self.connectivity_indices = connectivity_indices
if connectivity_indices.shape[1] != self._connectivity_length:
if self._connectivity_length == -1:
self._connectivity_length = connectivity_indices.shape[1]
else:
raise RuntimeError
self.connectivity_coords = connectivity_coords
self.ds = index.dataset
self._index = index
self._last_mask = None
self._last_count = -1
self._last_selector_id = None
def _check_consistency(self):
if self.connectivity_indices.shape[1] != self._connectivity_length:
raise RuntimeError
for gi in range(self.connectivity_indices.shape[0]):
ind = self.connectivity_indices[gi, :] - self._index_offset
coords = self.connectivity_coords[ind, :]
for i in range(3):
assert np.unique(coords[:, i]).size == 2
mylog.debug("Connectivity is consistent.")
def __repr__(self):
return "UnstructuredMesh_%04i" % (self.mesh_id)
[docs]
def get_global_startindex(self):
"""
Return the integer starting index for each dimension at the current
level.
"""
raise NotImplementedError
[docs]
def convert(self, datatype):
"""
This will attempt to convert a given unit to cgs from code units. It
either returns the multiplicative factor or throws a KeyError.
"""
return self.ds[datatype]
@property
def shape(self):
raise NotImplementedError
def _generate_container_field(self, field):
raise NotImplementedError
[docs]
def select_fcoords(self, dobj=None):
# This computes centroids!
mask = self._get_selector_mask(dobj.selector)
if mask is None:
return np.empty((0, 3), dtype="float64")
centers = fill_fcoords(
self.connectivity_coords, self.connectivity_indices, self._index_offset
)
return centers[mask, :]
[docs]
def select_fwidth(self, dobj):
raise NotImplementedError
[docs]
def select_icoords(self, dobj):
raise NotImplementedError
[docs]
def select_ires(self, dobj):
raise NotImplementedError
[docs]
def select_tcoords(self, dobj):
raise NotImplementedError
[docs]
def deposit(self, positions, fields=None, method=None, kernel_name="cubic"):
raise NotImplementedError
[docs]
def select_blocks(self, selector):
mask = self._get_selector_mask(selector)
yield self, mask
[docs]
def select(self, selector, source, dest, offset):
mask = self._get_selector_mask(selector)
count = self.count(selector)
if count == 0:
return 0
dest[offset : offset + count] = source[mask, ...]
return count
[docs]
def count(self, selector):
mask = self._get_selector_mask(selector)
if mask is None:
return 0
return self._last_count
[docs]
def count_particles(self, selector, x, y, z):
# We don't cache the selector results
count = selector.count_points(x, y, z, 0.0)
return count
[docs]
def select_particles(self, selector, x, y, z):
mask = selector.select_points(x, y, z, 0.0)
return mask
def _get_selector_mask(self, selector):
if hash(selector) == self._last_selector_id:
mask = self._last_mask
else:
self._last_mask = mask = selector.fill_mesh_cell_mask(self)
self._last_selector_id = hash(selector)
if mask is None:
self._last_count = 0
else:
self._last_count = mask.sum()
return mask
[docs]
def select_fcoords_vertex(self, dobj=None):
mask = self._get_selector_mask(dobj.selector)
if mask is None:
return np.empty((0, self._connectivity_length, 3), dtype="float64")
vertices = self.connectivity_coords[self.connectivity_indices - 1]
return vertices[mask, :, :]
[docs]
class SemiStructuredMesh(UnstructuredMesh):
_connectivity_length = 8
_type_name = "semi_structured_mesh"
_container_fields = ("dx", "dy", "dz")
def __repr__(self):
return "SemiStructuredMesh_%04i" % (self.mesh_id)
def _generate_container_field(self, field):
if self._current_chunk is None:
self.index._identify_base_chunk(self)
if field == "dx":
return self._current_chunk.fwidth[:, 0]
elif field == "dy":
return self._current_chunk.fwidth[:, 1]
elif field == "dz":
return self._current_chunk.fwidth[:, 2]
[docs]
def select_fwidth(self, dobj):
mask = self._get_selector_mask(dobj.selector)
if mask is None:
return np.empty((0, 3), dtype="float64")
widths = fill_fwidths(
self.connectivity_coords, self.connectivity_indices, self._index_offset
)
return widths[mask, :]
[docs]
def select_ires(self, dobj):
ind = np.zeros(self.connectivity_indices.shape[0])
mask = self._get_selector_mask(dobj.selector)
if mask is None:
return np.empty(0, dtype="int32")
return ind[mask]
[docs]
def select_tcoords(self, dobj):
mask = self._get_selector_mask(dobj.selector)
if mask is None:
return np.empty(0, dtype="float64")
dt, t = dobj.selector.get_dt_mesh(self, mask.sum(), self._index_offset)
return dt, t
def _get_selector_mask(self, selector):
if hash(selector) == self._last_selector_id:
mask = self._last_mask
else:
self._last_mask = mask = selector.fill_mesh_cell_mask(self)
self._last_selector_id = hash(selector)
if mask is None:
self._last_count = 0
else:
self._last_count = mask.sum()
return mask
[docs]
def select(self, selector, source, dest, offset):
mask = self._get_selector_mask(selector)
count = self.count(selector)
if count == 0:
return 0
# Note: this likely will not work with vector fields.
dest[offset : offset + count] = source.flat[mask]
return count