import os
import warnings
import weakref
import numpy as np
from yt.data_objects.index_subobjects.grid_patch import AMRGridPatch
from yt.data_objects.static_output import Dataset
from yt.fields.magnetic_field import get_magnetic_normalization
from yt.funcs import mylog, setdefaultattr
from yt.geometry.api import Geometry
from yt.geometry.grid_geometry_handler import GridIndex
from yt.utilities.chemical_formulas import compute_mu
from yt.utilities.file_handler import HDF5FileHandler
from .fields import ParthenonFieldInfo
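# Map the coordinate-system names stored in the file's Info/"Coordinates"
# attribute to yt geometries.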
_geom_map = {
"UniformCartesian": Geometry.CARTESIAN,
"UniformCylindrical": Geometry.CYLINDRICAL,
"UniformSpherical": Geometry.SPHERICAL,
}
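# Binary (i, j, k) index offsets of the eight octants produced when a block
# is refined by a factor of two in each dimension.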
# fmt: off
_cis = np.array(
[
[0, 0, 0],
[0, 0, 1],
[0, 1, 0],
[0, 1, 1],
[1, 0, 0],
[1, 0, 1],
[1, 1, 0],
[1, 1, 1],
],
dtype="int64",
)
# fmt: on
class ParthenonGrid(AMRGridPatch):
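    """A single Parthenon meshblock, exposed to yt as an AMR grid patch."""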
_id_offset = 0
def __init__(self, id, index, level):
AMRGridPatch.__init__(self, id, filename=index.index_filename, index=index)
self.Parent = None
self.Children = []
self.Level = level
def _setup_dx(self):
        # Compute the cell widths for this grid. We do not assume
        # dx == dy == dz here, although other parts of yt may.
id = self.id - self._id_offset
LE, RE = self.index.grid_left_edge[id, :], self.index.grid_right_edge[id, :]
self.dds = self.ds.arr((RE - LE) / self.ActiveDimensions, "code_length")
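        # Collapse degenerate dimensions to a unit cell width so that cell
        # volumes remain finite.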
if self.ds.dimensionality < 2:
self.dds[1] = 1.0
if self.ds.dimensionality < 3:
self.dds[2] = 1.0
self.field_data["dx"], self.field_data["dy"], self.field_data["dz"] = self.dds
def retrieve_ghost_zones(self, n_zones, fields, all_levels=False, smoothed=False):
if smoothed:
warnings.warn(
"ghost-zones interpolation/smoothing is not "
"currently supported for Parthenon data.",
category=RuntimeWarning,
stacklevel=2,
)
smoothed = False
return super().retrieve_ghost_zones(
n_zones, fields, all_levels=all_levels, smoothed=smoothed
)
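# Example of fetching one layer of ghost zones from a grid (the field name
# below is hypothetical and depends on the output):
#   g = ds.index.grids[0]
#   gz = g.retrieve_ghost_zones(1, [("parthenon", "prim_density")])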
class ParthenonHierarchy(GridIndex):
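    """Grid index built from the meshblock metadata of a Parthenon HDF5 dump."""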
grid = ParthenonGrid
_dataset_type = "parthenon"
_data_file = None
def __init__(self, ds, dataset_type="parthenon"):
self.dataset = weakref.proxy(ds)
self.directory = os.path.dirname(self.dataset.filename)
self.dataset_type = dataset_type
# for now, the index file is the dataset!
self.index_filename = self.dataset.filename
self._handle = ds._handle
GridIndex.__init__(self, ds, dataset_type)
def _detect_output_fields(self):
self.field_list = [("parthenon", k) for k in self.dataset._field_map]
def _count_grids(self):
self.num_grids = self._handle["Info"].attrs["NumMeshBlocks"]
def _parse_index(self):
num_grids = self._handle["Info"].attrs["NumMeshBlocks"]
        # TODO: In the unlikely case that this uses too much memory, implement
        # a chunked read along one dimension.
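        # "Locations" stores per-block coordinates along each axis; the first
        # and last entries of each row bound the block.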
x = self._handle["Locations"]["x"][:, :]
y = self._handle["Locations"]["y"][:, :]
z = self._handle["Locations"]["z"][:, :]
mesh_block_size = self._handle["Info"].attrs["MeshBlockSize"]
self.grids = np.empty(self.num_grids, dtype="object")
levels = self._handle["Levels"][:]
for i in range(num_grids):
self.grid_left_edge[i] = np.array(
[x[i, 0], y[i, 0], z[i, 0]], dtype="float64"
)
self.grid_right_edge[i] = np.array(
[x[i, -1], y[i, -1], z[i, -1]], dtype="float64"
)
self.grid_dimensions[i] = mesh_block_size
self.grids[i] = self.grid(i, self, levels[i])
if self.dataset.dimensionality <= 2:
self.grid_right_edge[:, 2] = self.dataset.domain_right_edge[2]
if self.dataset.dimensionality == 1:
self.grid_right_edge[:, 1:] = self.dataset.domain_right_edge[1:]
self.grid_particle_count = np.zeros([self.num_grids, 1], dtype="int64")
def _populate_grid_objects(self):
for g in self.grids:
g._prepare_grid()
g._setup_dx()
self.max_level = self._handle["Info"].attrs["MaxLevel"]
class ParthenonDataset(Dataset):
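    """Dataset for outputs of Parthenon-based codes such as AthenaPK."""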
_field_info_class = ParthenonFieldInfo
_dataset_type = "parthenon"
_index_class = ParthenonHierarchy
def __init__(
self,
filename,
dataset_type="parthenon",
storage_filename=None,
parameters=None,
units_override=None,
unit_system="cgs",
default_species_fields=None,
magnetic_normalization="gaussian",
):
self.fluid_types += ("parthenon",)
if parameters is None:
parameters = {}
self.specified_parameters = parameters
if units_override is None:
units_override = {}
self._handle = HDF5FileHandler(filename)
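        # "RootGridDomain" stores (min, max, ratio) triples for each dimension;
        # a ratio different from 1 indicates logarithmic grid spacing.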
xrat = self._handle["Info"].attrs["RootGridDomain"][2]
yrat = self._handle["Info"].attrs["RootGridDomain"][5]
zrat = self._handle["Info"].attrs["RootGridDomain"][8]
if xrat != 1.0 or yrat != 1.0 or zrat != 1.0:
raise NotImplementedError(
"Logarithmic grids not yet supported/tested in Parthenon frontend."
)
self._magnetic_factor = get_magnetic_normalization(magnetic_normalization)
self.geometry = _geom_map[self._handle["Info"].attrs["Coordinates"]]
        if self.geometry is Geometry.CYLINDRICAL:
axis_order = ("r", "theta", "z")
else:
axis_order = None
Dataset.__init__(
self,
filename,
dataset_type,
units_override=units_override,
unit_system=unit_system,
default_species_fields=default_species_fields,
axis_order=axis_order,
)
if storage_filename is None:
storage_filename = self.basename + ".yt"
self.storage_filename = storage_filename
def _set_code_unit_attributes(self):
"""
Generates the conversion to various physical _units based on the
parameter file
"""
for unit, cgs in [
("length", "cm"),
("time", "s"),
("mass", "g"),
]:
unit_param = f"Hydro/code_{unit}_cgs"
            # Use the units provided in the output file, if present
if unit_param in self.parameters:
setdefaultattr(
self, f"{unit}_unit", self.quan(self.parameters[unit_param], cgs)
)
# otherwise use code = cgs
else:
mylog.warning(f"Assuming 1.0 code_{unit} = 1.0 {cgs}")
setdefaultattr(self, f"{unit}_unit", self.quan(1.0, cgs))
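        # Derive the magnetic unit from the mass, time, and length units:
        # B = sqrt(factor * M / (T^2 * L)), which reduces to gauss in CGS.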
self.magnetic_unit = np.sqrt(
self._magnetic_factor
* self.mass_unit
/ (self.time_unit**2 * self.length_unit)
)
self.magnetic_unit.convert_to_units("gauss")
self.velocity_unit = self.length_unit / self.time_unit
def _parse_parameter_file(self):
self.parameters.update(self.specified_parameters)
for key, val in self._handle["Params"].attrs.items():
if key in self.parameters.keys():
mylog.warning(
f"Overriding existing {key!r} key in ds.parameters from data 'Params'"
)
self.parameters[key] = val
xmin, xmax = self._handle["Info"].attrs["RootGridDomain"][0:2]
ymin, ymax = self._handle["Info"].attrs["RootGridDomain"][3:5]
zmin, zmax = self._handle["Info"].attrs["RootGridDomain"][6:8]
self.domain_left_edge = np.array([xmin, ymin, zmin], dtype="float64")
self.domain_right_edge = np.array([xmax, ymax, zmax], dtype="float64")
self.domain_width = self.domain_right_edge - self.domain_left_edge
self.domain_dimensions = self._handle["Info"].attrs["RootGridSize"]
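        # Map each component name to its (HDF5 dataset name, component index).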
self._field_map = {}
dnames = self._handle["Info"].attrs["OutputDatasetNames"]
num_components = self._handle["Info"].attrs["NumComponents"]
if "OutputFormatVersion" in self._handle["Info"].attrs.keys():
self.output_format_version = self._handle["Info"].attrs[
"OutputFormatVersion"
]
else:
raise NotImplementedError("Could not determine OutputFormatVersion.")
        # For a single output variable, these attributes are scalars rather
        # than sequences; wrap them in tuples so the zip below works.
if isinstance(num_components, np.uint64):
dnames = (dnames,)
num_components = (num_components,)
component_name_offset = 0
for dname, num_component in zip(dnames, num_components, strict=False):
for j in range(num_component):
fname = self._handle["Info"].attrs["ComponentNames"][
j + component_name_offset
]
self._field_map[fname] = (dname, j)
component_name_offset = int(component_name_offset + num_component)
self.refine_by = 2
dimensionality = 3
if self.domain_dimensions[2] == 1:
dimensionality = 2
if self.domain_dimensions[1] == 1:
dimensionality = 1
self.dimensionality = dimensionality
self.current_time = self._handle["Info"].attrs["Time"]
self.num_ghost_zones = 0
self.field_ordering = "fortran"
self.boundary_conditions = [1] * 6
self.cosmological_simulation = False
if "periodicity" in self.parameters:
self._periodicity = tuple(self.parameters["periodicity"])
else:
boundary_conditions = self._handle["Info"].attrs["BoundaryConditions"]
inner_bcs = boundary_conditions[::2]
            # outer_bcs = boundary_conditions[1::2]
            # Self-consistency check (currently disabled): inner and outer
            # boundaries should agree on periodicity.
            # for inner_bc, outer_bc in zip(inner_bcs, outer_bcs):
            #     if (inner_bc == "periodic" or outer_bc == "periodic") and inner_bc != outer_bc:
            #         raise ValueError("Inconsistent periodicity in boundary conditions")
self._periodicity = tuple(bc == "periodic" for bc in inner_bcs)
if "gamma" in self.parameters:
self.gamma = float(self.parameters["gamma"])
elif "Hydro/AdiabaticIndex" in self.parameters:
self.gamma = self.parameters["Hydro/AdiabaticIndex"]
else:
mylog.warning(
"Adiabatic index gamma could not be determined. Falling back to 5/3."
)
self.gamma = 5.0 / 3.0
if "mu" in self.parameters:
self.mu = self.parameters["mu"]
elif "Hydro/mu" in self.parameters:
self.mu = self.parameters["Hydro/mu"]
# Legacy He_mass_fraction parameter implemented in AthenaPK
elif "Hydro/He_mass_fraction" in self.parameters:
He_mass_fraction = self.parameters["Hydro/He_mass_fraction"]
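            # Fully ionized H/He mix: 1/mu = 2 * X_H + (3/4) * X_He,
            # with X_H = 1 - X_He.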
self.mu = 1 / (He_mass_fraction * 3.0 / 4.0 + (1 - He_mass_fraction) * 2)
        # Fall back to a fully ionized primordial gas composition (and warn).
        else:
            mylog.warning(
                "Plasma composition could not be determined in data file. "
                "Falling back to fully ionized primordial composition."
            )
            self.mu = compute_mu(self.default_species_fields)
@classmethod
def _is_valid(cls, filename: str, *args, **kwargs) -> bool:
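        # Parthenon writes data dumps as .phdf and restart dumps as .rhdf;
        # both share the HDF5 layout this frontend reads.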
return filename.endswith((".phdf", ".rhdf"))
@property
def _skip_cache(self):
return True
def __str__(self):
return self.basename.rsplit(".", 1)[0]
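# Minimal usage sketch (the filename is hypothetical):
#   import yt
#   ds = yt.load("parthenon.prim.00000.phdf")  # dispatched here via _is_valid
#   ds.print_stats()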