import weakref
import numpy as np
import yt.geometry.particle_deposit as particle_deposit
from yt._typing import FieldKey
from yt.config import ytcfg
from yt.data_objects.selection_objects.data_selection_objects import (
YTSelectionContainer,
)
from yt.funcs import is_sequence
from yt.geometry.selection_routines import convert_mask_to_indices
from yt.units.yt_array import YTArray
from yt.utilities.exceptions import (
YTFieldTypeNotFound,
YTParticleDepositionNotImplemented,
)
from yt.utilities.lib.interpolators import ghost_zone_interpolate
from yt.utilities.lib.mesh_utilities import clamp_edges
from yt.utilities.nodal_data_utils import get_nodal_slices
class AMRGridPatch(YTSelectionContainer):
    """A single rectangular patch of a block-structured AMR index.

    Instances wrap one grid of the dataset's hierarchy and provide
    selection, deposition, and geometry helpers.  Edges, dimensions, and
    level information are copied from the index in ``_prepare_grid``.
    """

    _spatial = True
    # Plain grids carry no ghost zones; covering grids built from them may.
    _num_ghost_zones = 0
    _grids = None
    # Grid ids are offset by one; subtract this for zero-based array indexing.
    _id_offset = 1
    # Cache the most recent selector mask (see _get_selector_mask).
    _cache_mask = True
    _type_name = "grid"
    _skip_add = True
    _con_args = ("id", "filename")
    # Geometric fields generated on demand by _generate_container_field.
    _container_fields = (
        ("index", "dx"),
        ("index", "dy"),
        ("index", "dz"),
        ("index", "x"),
        ("index", "y"),
        ("index", "z"),
    )
    OverlappingSiblings = None

    def __init__(self, id, filename=None, index=None):
        # NOTE(review): the parameter name ``id`` shadows the builtin, but it
        # is part of the established public interface, so it is kept.
        super().__init__(index.dataset, None)
        self.id = id
        self._child_mask = self._child_indices = self._child_index_mask = None
        self.ds = index.dataset
        # Weak proxy avoids a reference cycle between grid and index.
        self._index = weakref.proxy(index)
        self.start_index = None
        self.filename = filename
        # Selector-mask cache state consumed by _get_selector_mask/count.
        self._last_mask = None
        self._last_count = -1
        self._last_selector_id = None
[docs]
def get_global_startindex(self):
"""
Return the integer starting index for each dimension at the current
level.
"""
if self.start_index is not None:
return self.start_index
if self.Parent is None:
left = self.LeftEdge.d - self.ds.domain_left_edge.d
start_index = left / self.dds.d
return np.rint(start_index).astype("int64").ravel()
pdx = self.Parent.dds.d
di = np.rint((self.LeftEdge.d - self.Parent.LeftEdge.d) / pdx)
start_index = self.Parent.get_global_startindex() + di
self.start_index = (start_index * self.ds.refine_by).astype("int64").ravel()
return self.start_index
def __getitem__(self, key):
tr = super().__getitem__(key)
try:
fields = self._determine_fields(key)
except YTFieldTypeNotFound:
return tr
finfo = self.ds._get_field_info(fields[0])
if not finfo.sampling_type == "particle":
num_nodes = 2 ** sum(finfo.nodal_flag)
new_shape = list(self.ActiveDimensions)
if num_nodes > 1:
new_shape += [num_nodes]
return tr.reshape(new_shape)
return tr
[docs]
def convert(self, datatype):
"""
This will attempt to convert a given unit to cgs from code units. It
either returns the multiplicative factor or throws a KeyError.
"""
return self.ds[datatype]
    @property
    def shape(self):
        # The grid's shape is simply its active (non-ghost) cell dimensions.
        return self.ActiveDimensions
def _reshape_vals(self, arr):
if len(arr.shape) == 3:
return arr
return arr.reshape(self.ActiveDimensions, order="C")
def _generate_container_field(self, field):
if self._current_chunk is None:
self.index._identify_base_chunk(self)
if field == ("index", "dx"):
tr = self._current_chunk.fwidth[:, 0]
elif field == ("index", "dy"):
tr = self._current_chunk.fwidth[:, 1]
elif field == ("index", "dz"):
tr = self._current_chunk.fwidth[:, 2]
elif field == ("index", "x"):
tr = self._current_chunk.fcoords[:, 0]
elif field == ("index", "y"):
tr = self._current_chunk.fcoords[:, 1]
elif field == ("index", "z"):
tr = self._current_chunk.fcoords[:, 2]
return self._reshape_vals(tr)
def _setup_dx(self):
# So first we figure out what the index is. We don't assume
# that dx=dy=dz, at least here. We probably do elsewhere.
id = self.id - self._id_offset
ds = self.ds
index = self.index
if self.Parent is not None:
if not hasattr(self.Parent, "dds"):
self.Parent._setup_dx()
self.dds = self.Parent.dds.d / self.ds.refine_by
else:
LE, RE = (index.grid_left_edge[id, :].d, index.grid_right_edge[id, :].d)
self.dds = (RE - LE) / self.ActiveDimensions
if self.ds.dimensionality < 3:
self.dds[2] = ds.domain_right_edge[2] - ds.domain_left_edge[2]
elif self.ds.dimensionality < 2:
self.dds[1] = ds.domain_right_edge[1] - ds.domain_left_edge[1]
self.dds = self.dds.view(YTArray)
self.dds.units = self.index.grid_left_edge.units
def __repr__(self):
cls_name = self.__class__.__name__
return f"{cls_name}_{self.id:04d} ({self.ActiveDimensions})"
    def __int__(self):
        # A grid coerces to its integer id.
        return self.id
    def clear_data(self):
        """
        Clear out the following things: child_mask, child_indices, all fields,
        all field parameters.
        """
        super().clear_data()
        # dds is derived state, so recompute it after wiping everything.
        self._setup_dx()
    def _prepare_grid(self):
        """Copies all the appropriate attributes from the index."""
        # This is definitely the slowest part of generating the index
        # Now we give it pointers to all of its attributes
        # Note that to keep in line with Enzo, we have broken PEP-8
        h = self.index  # cache it
        my_ind = self.id - self._id_offset
        self.ActiveDimensions = h.grid_dimensions[my_ind]
        self.LeftEdge = h.grid_left_edge[my_ind]
        self.RightEdge = h.grid_right_edge[my_ind]
        # This can be expensive so we allow people to disable this behavior
        # via a config option
        if ytcfg.get("yt", "reconstruct_index"):
            # Parent may be a single grid, a (possibly empty) sequence of
            # grids, or None depending on the frontend.
            if is_sequence(self.Parent) and len(self.Parent) > 0:
                p = self.Parent[0]
            else:
                p = self.Parent
            if p is not None and p != []:
                # clamp grid edges to an integer multiple of the parent cell
                # width
                clamp_edges(self.LeftEdge, p.LeftEdge, p.dds)
                clamp_edges(self.RightEdge, p.RightEdge, p.dds)
        h.grid_levels[my_ind, 0] = self.Level
        # This might be needed for streaming formats
        # self.Time = h.gridTimes[my_ind,0]
        self.NumberOfParticles = h.grid_particle_count[my_ind, 0]
[docs]
def get_position(self, index):
"""Returns center position of an *index*."""
pos = (index + 0.5) * self.dds + self.LeftEdge
return pos
    def _fill_child_mask(self, child, mask, tofill, dlevel=1):
        # Set the region of *mask* overlapped by *child* to *tofill*.
        # *dlevel* is the level difference between self and child; overlapping
        # siblings on the same level pass dlevel=0 (rf becomes rf**0 == 1).
        rf = self.ds.refine_by
        if dlevel != 1:
            rf = rf**dlevel
        # Global integer start indices of this grid and of the child.
        gi, cgi = self.get_global_startindex(), child.get_global_startindex()
        # Child extent expressed in this grid's cells, clipped to our bounds.
        startIndex = np.maximum(0, cgi // rf - gi)
        endIndex = np.minimum(
            (cgi + child.ActiveDimensions) // rf - gi, self.ActiveDimensions
        )
        # Widen degenerate axes by one cell so the slice is never empty.
        endIndex += startIndex == endIndex
        mask[
            startIndex[0] : endIndex[0],
            startIndex[1] : endIndex[1],
            startIndex[2] : endIndex[2],
        ] = tofill
@property
def child_mask(self):
"""
Generates self.child_mask, which is zero where child grids exist (and
thus, where higher resolution data is available).
"""
child_mask = np.ones(self.ActiveDimensions, "bool")
for child in self.Children:
self._fill_child_mask(child, child_mask, 0)
for sibling in self.OverlappingSiblings or []:
self._fill_child_mask(sibling, child_mask, 0, dlevel=0)
return child_mask
@property
def child_indices(self):
return self.child_mask == 0
@property
def child_index_mask(self):
"""
Generates self.child_index_mask, which is -1 where there is no child,
and otherwise has the ID of the grid that resides there.
"""
child_index_mask = np.zeros(self.ActiveDimensions, "int32") - 1
for child in self.Children:
self._fill_child_mask(child, child_index_mask, child.id)
for sibling in self.OverlappingSiblings or []:
self._fill_child_mask(sibling, child_index_mask, sibling.id, dlevel=0)
return child_index_mask
[docs]
def retrieve_ghost_zones(self, n_zones, fields, all_levels=False, smoothed=False):
# We will attempt this by creating a datacube that is exactly bigger
# than the grid by nZones*dx in each direction
nl = self.get_global_startindex() - n_zones
new_left_edge = nl * self.dds + self.ds.domain_left_edge
# Something different needs to be done for the root grid, though
level = self.Level
if all_levels:
level = self.index.max_level + 1
kwargs = {
"dims": self.ActiveDimensions + 2 * n_zones,
"num_ghost_zones": n_zones,
"use_pbar": False,
"fields": fields,
}
# This should update the arguments to set the field parameters to be
# those of this grid.
field_parameters = {}
field_parameters.update(self.field_parameters)
if smoothed:
cube = self.ds.smoothed_covering_grid(
level, new_left_edge, field_parameters=field_parameters, **kwargs
)
else:
cube = self.ds.covering_grid(
level, new_left_edge, field_parameters=field_parameters, **kwargs
)
cube._base_grid = self
return cube
[docs]
def get_vertex_centered_data(
self,
fields: list[FieldKey],
smoothed: bool = True,
no_ghost: bool = False,
):
# Make sure the field list has only unique entries
fields = list(set(fields))
new_fields = {}
for field in fields:
finfo = self.ds._get_field_info(field)
new_fields[field] = self.ds.arr(
np.zeros(self.ActiveDimensions + 1), finfo.output_units
)
if no_ghost:
for field in fields:
# Ensure we have the native endianness in this array. Avoid making
# a copy if possible.
old_field = np.asarray(self[field], dtype="=f8")
# We'll use the ghost zone routine, which will naturally
# extrapolate here.
input_left = np.array([0.5, 0.5, 0.5], dtype="float64")
output_left = np.array([0.0, 0.0, 0.0], dtype="float64")
# rf = 1 here
ghost_zone_interpolate(
1, old_field, input_left, new_fields[field], output_left
)
else:
cg = self.retrieve_ghost_zones(1, fields, smoothed=smoothed)
for field in fields:
src = cg[field].in_units(new_fields[field].units).d
dest = new_fields[field].d
np.add(dest, src[1:, 1:, 1:], dest)
np.add(dest, src[:-1, 1:, 1:], dest)
np.add(dest, src[1:, :-1, 1:], dest)
np.add(dest, src[1:, 1:, :-1], dest)
np.add(dest, src[:-1, 1:, :-1], dest)
np.add(dest, src[1:, :-1, :-1], dest)
np.add(dest, src[:-1, :-1, 1:], dest)
np.add(dest, src[:-1, :-1, :-1], dest)
np.multiply(dest, 0.125, dest)
return new_fields
[docs]
def select_icoords(self, dobj):
mask = self._get_selector_mask(dobj.selector)
if mask is None:
return np.empty((0, 3), dtype="int64")
coords = convert_mask_to_indices(mask, self._last_count)
coords += self.get_global_startindex()[None, :]
return coords
[docs]
def select_fcoords(self, dobj):
mask = self._get_selector_mask(dobj.selector)
if mask is None:
return np.empty((0, 3), dtype="float64")
coords = convert_mask_to_indices(mask, self._last_count).astype("float64")
coords += 0.5
coords *= self.dds.d[None, :]
coords += self.LeftEdge.d[None, :]
return coords
[docs]
def select_fwidth(self, dobj):
count = self.count(dobj.selector)
if count == 0:
return np.empty((0, 3), dtype="float64")
coords = np.empty((count, 3), dtype="float64")
for axis in range(3):
coords[:, axis] = self.dds.d[axis]
return coords
[docs]
def select_ires(self, dobj):
mask = self._get_selector_mask(dobj.selector)
if mask is None:
return np.empty(0, dtype="int64")
coords = np.empty(self._last_count, dtype="int64")
coords[:] = self.Level
return coords
[docs]
def select_tcoords(self, dobj):
dt, t = dobj.selector.get_dt(self)
return dt, t
    def smooth(self, *args, **kwargs):
        # Smoothing is not implemented for patch grids.
        raise NotImplementedError
    def particle_operation(self, *args, **kwargs):
        # Particle operations are not implemented for patch grids.
        raise NotImplementedError
[docs]
def deposit(self, positions, fields=None, method=None, kernel_name="cubic"):
# Here we perform our particle deposition.
cls = getattr(particle_deposit, f"deposit_{method}", None)
if cls is None:
raise YTParticleDepositionNotImplemented(method)
# We allocate number of zones, not number of octs. Everything
# inside this is Fortran ordered because of the ordering in the
# octree deposit routines, so we reverse it here to match the
# convention there
nvals = tuple(self.ActiveDimensions[::-1])
# append a dummy dimension because we are only depositing onto
# one grid
op = cls(nvals + (1,), kernel_name)
op.initialize()
if positions.size > 0:
op.process_grid(self, positions, fields)
vals = op.finalize()
if vals is None:
return
# Fortran-ordered, so transpose.
vals = vals.transpose()
# squeeze dummy dimension we appended above
return np.squeeze(vals, axis=0)
    def select_blocks(self, selector):
        # A patch grid is itself a single block: yield it with its mask.
        mask = self._get_selector_mask(selector)
        yield self, mask
    def _get_selector_mask(self, selector):
        # Return the boolean selection mask for *selector* over this grid,
        # reusing the cached mask when the same selector (by hash) is applied
        # twice in a row.  Side effect: refreshes self._last_count on a cache
        # miss; on a hit, _last_count already matches the cached mask.
        if self._cache_mask and hash(selector) == self._last_selector_id:
            mask = self._last_mask
        else:
            mask, count = selector.fill_mask_regular_grid(self)
            if self._cache_mask:
                self._last_mask = mask
                self._last_selector_id = hash(selector)
            if mask is None:
                self._last_count = 0
            else:
                self._last_count = count
        return mask
    def select(self, selector, source, dest, offset):
        # Copy the values of *source* selected by *selector* into *dest*
        # starting at *offset*; returns the number of values copied.
        mask = self._get_selector_mask(selector)
        count = self.count(selector)
        if count == 0:
            return 0
        dim = np.squeeze(self.ds.dimensionality)
        # Nonzero entries mark nodal (vertex-centered) axes: the source is
        # one cell larger than the grid along those axes.
        nodal_flag = source.shape[:dim] - self.ActiveDimensions[:dim]
        if sum(nodal_flag) == 0:
            dest[offset : offset + count] = source[mask]
        else:
            # Nodal fields: copy one slice per node into the trailing axis.
            slices = get_nodal_slices(source.shape, nodal_flag, dim)
            for i, sl in enumerate(slices):
                dest[offset : offset + count, i] = source[tuple(sl)][np.squeeze(mask)]
        return count
[docs]
def count(self, selector):
mask = self._get_selector_mask(selector)
if mask is None:
return 0
return self._last_count
[docs]
def count_particles(self, selector, x, y, z):
# We don't cache the selector results
count = selector.count_points(x, y, z, 0.0)
return count
[docs]
def select_particles(self, selector, x, y, z):
mask = selector.select_points(x, y, z, 0.0)
return mask