Source code for yt.geometry.unstructured_mesh_handler

import os
import weakref

import numpy as np

from yt.geometry.geometry_handler import Index, YTDataChunk
from yt.utilities.lib.mesh_utilities import smallest_fwidth
from yt.utilities.logger import ytLogger as mylog


class UnstructuredIndex(Index):
    """The Index subclass for unstructured and hexahedral mesh datasets."""

    _unsupported_objects = ("proj", "covering_grid", "smoothed_covering_grid")

    def __init__(self, ds, dataset_type):
        """Record dataset bookkeeping, then defer to the base ``Index``.

        Parameters
        ----------
        ds
            The dataset this index belongs to. Only a weak proxy to it is
            stored on the instance.
        dataset_type
            Dataset type identifier, stored as-is and forwarded to the base
            class initializer.
        """
        self.dataset_type = dataset_type
        # A weak proxy is stored rather than a strong reference to ``ds``.
        self.dataset = weakref.proxy(ds)
        self.index_filename = self.dataset.parameter_filename
        self.directory = os.path.dirname(self.index_filename)
        self.float_type = np.float64
        # NOTE(review): the attributes above are assigned before the base
        # initializer runs; presumably it relies on them -- keep this order.
        super().__init__(ds, dataset_type)

    def _setup_geometry(self):
        """Log the start of geometry setup and build the meshes."""
        mylog.debug("Initializing Unstructured Mesh Geometry Handler.")
        self._initialize_mesh()

    def get_smallest_dx(self):
        """
        Returns (in code units) the smallest cell size in the simulation.

        The value is the minimum of ``smallest_fwidth`` evaluated on every
        mesh in ``self.meshes``.

        Raises
        ------
        ValueError
            If ``self.meshes`` is empty (``min`` of an empty sequence).
        """
        per_mesh_widths = (
            smallest_fwidth(
                mesh.connectivity_coords,
                mesh.connectivity_indices,
                mesh._index_offset,
            )
            for mesh in self.meshes
        )
        return min(per_mesh_widths)

    def convert(self, unit):
        """Return the dataset's conversion factor registered for ``unit``."""
        return self.dataset.conversion_factors[unit]

    def _initialize_mesh(self):
        """Build the mesh objects; must be provided by subclasses.

        NOTE(review): the other methods read ``self.meshes``, so an override
        is presumably expected to populate it -- confirm against subclasses.
        """
        raise NotImplementedError

    def _identify_base_chunk(self, dobj):
        """Fill in ``dobj``'s chunk info, size and current chunk.

        ``_chunk_info`` and ``size`` are only assigned when they are missing
        or ``None``; ``_current_chunk`` is always replaced by the single
        "all" chunk.
        """
        if getattr(dobj, "_chunk_info", None) is None:
            dobj._chunk_info = self.meshes
        if getattr(dobj, "size", None) is None:
            dobj.size = self._count_selection(dobj)
        all_chunks = list(self._chunk_all(dobj))
        dobj._current_chunk = all_chunks[0]

    def _count_selection(self, dobj, meshes=None):
        """Return the summed ``count(dobj.selector)`` over ``meshes``.

        When ``meshes`` is ``None`` the objects in ``dobj._chunk_info`` are
        used instead.
        """
        if meshes is None:
            meshes = dobj._chunk_info
        return sum(mesh.count(dobj.selector) for mesh in meshes)

    def _chunk_all(self, dobj, cache=True):
        """Yield one "all" chunk covering every object of the current chunk.

        Falls back to ``dobj._chunk_info`` when the current chunk has no
        ``objs`` attribute.
        """
        all_objs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
        yield YTDataChunk(dobj, "all", all_objs, dobj.size, cache)

    def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):
        """Yield one "spatial" chunk per object with a non-empty selection.

        When ``ngz`` is positive, the chunk holds the object returned by
        ``retrieve_ghost_zones(ngz, [], smoothed=True)`` instead of the
        original object. ``sort`` and ``preload_fields`` are accepted but
        not used here.
        """
        spatial_objs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
        # This is where cutting of the Octree and load-balancing would be
        # performed. That may require a specialized selector object to cut
        # based on some space-filling curve index.
        for mesh in spatial_objs:
            if ngz > 0:
                chunk_obj = mesh.retrieve_ghost_zones(ngz, [], smoothed=True)
            else:
                chunk_obj = mesh
            # The size is always counted on the original object, not on the
            # ghost-zone version.
            n_selected = self._count_selection(dobj, [mesh])
            if n_selected == 0:
                continue
            yield YTDataChunk(dobj, "spatial", [chunk_obj], n_selected)

    def _chunk_io(self, dobj, cache=True, local_only=False):
        """Yield one "io" chunk per object of the current chunk.

        ``local_only`` is accepted but not used here.
        """
        io_objs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
        # The count does not depend on the loop variable, so compute it once
        # instead of re-counting every object for every yielded chunk.
        # NOTE(review): every io chunk is given the selection count over ALL
        # objects (unlike ``_chunk_spatial``, which counts per object). That
        # value is preserved here; confirm whether a per-subset count was
        # intended.
        total_selected = self._count_selection(dobj, io_objs)
        for subset in io_objs:
            yield YTDataChunk(dobj, "io", [subset], total_selected, cache=cache)