AMRVAC data structures
import os
import struct
import warnings
import weakref
from pathlib import Path
import numpy as np
from more_itertools import always_iterable
from yt.config import ytcfg
from yt.data_objects.index_subobjects.grid_patch import AMRGridPatch
from yt.data_objects.static_output import Dataset
from yt.funcs import mylog, setdefaultattr
from yt.geometry.api import Geometry
from yt.geometry.grid_geometry_handler import GridIndex
from yt.utilities.physical_constants import boltzmann_constant_cgs as kb_cgs
from .datfile_utils import get_header, get_tree_info
from .fields import AMRVACFieldInfo
from .io import read_amrvac_namelist
def _parse_geometry(geometry_tag: str) -> Geometry:
"""Translate AMRVAC's geometry tag to yt's format.
geometry_tag : str
A geometry tag as read from AMRVAC's datfile from v5.
geometry_yt : Geometry
An enum member of the yt.geometry.geometry_enum.Geometry class
>>> _parse_geometry("Polar_2.5D")
<Geometry.POLAR: 'polar'>
>>> _parse_geometry("Cartesian_2.5D")
<Geometry.CARTESIAN: 'cartesian'>
geometry_str, _, _dimension_str = geometry_tag.partition("_")
return Geometry(geometry_str.lower())
class AMRVACGrid(AMRGridPatch):
"""A class to populate AMRVACHierarchy.grids, setting parent/children relations."""
_id_offset = 0
def __init__(self, id, index, level):
# <level> should use yt's convention (start from 0)
super().__init__(id, filename=index.index_filename, index=index)
self.Parent = None
self.Children = []
self.Level = level
def get_global_startindex(self):
"""Refresh and retrieve the starting index for each dimension at current level.
self.start_index : int
start_index = (self.LeftEdge - self.ds.domain_left_edge) / self.dds
self.start_index = np.rint(start_index).astype("int64").ravel()
return self.start_index
def retrieve_ghost_zones(self, n_zones, fields, all_levels=False, smoothed=False):
if smoothed:
"ghost-zones interpolation/smoothing is not "
"currently supported for AMRVAC data.",
smoothed = False
return super().retrieve_ghost_zones(
n_zones, fields, all_levels=all_levels, smoothed=smoothed
class AMRVACHierarchy(GridIndex):
grid = AMRVACGrid
def __init__(self, ds, dataset_type="amrvac"):
self.dataset_type = dataset_type
self.dataset = weakref.proxy(ds)
# the index file *is* the datfile
self.index_filename = self.dataset.parameter_filename
self.directory = os.path.dirname(self.index_filename)
self.float_type = np.float64
super().__init__(ds, dataset_type)
def _detect_output_fields(self):
"""Parse field names from the header, as stored in self.dataset.parameters"""
self.field_list = [
(self.dataset_type, f) for f in self.dataset.parameters["w_names"]
def _count_grids(self):
"""Set self.num_grids from datfile header."""
self.num_grids = self.dataset.parameters["nleafs"]
def _parse_index(self):
"""Populate self.grid_* attributes from tree info from datfile header."""
with open(self.index_filename, "rb") as istream:
vaclevels, morton_indices, block_offsets = get_tree_info(istream)
assert (
== len(morton_indices)
== len(block_offsets)
== self.num_grids
self.block_offsets = block_offsets
# YT uses 0-based grid indexing:
# lowest level = 0, while AMRVAC uses 1 for lowest level
ytlevels = np.array(vaclevels, dtype="int32") - 1
self.grid_levels.flat[:] = ytlevels
self.min_level = np.min(ytlevels)
self.max_level = np.max(ytlevels)
assert self.max_level == self.dataset.parameters["levmax"] - 1
# some aliases for left/right edges computation in the coming loop
domain_width = self.dataset.parameters["xmax"] - self.dataset.parameters["xmin"]
block_nx = self.dataset.parameters["block_nx"]
xmin = self.dataset.parameters["xmin"]
dx0 = (
domain_width / self.dataset.parameters["domain_nx"]
) # dx at coarsest grid level (YT level 0)
dim = self.dataset.dimensionality
self.grids = np.empty(self.num_grids, dtype="object")
for igrid, (ytlevel, morton_index) in enumerate(
zip(ytlevels, morton_indices, strict=True)
dx = dx0 / self.dataset.refine_by**ytlevel
left_edge = xmin + (morton_index - 1) * block_nx * dx
# edges and dimensions are filled in a dimensionality-agnostic way
self.grid_left_edge[igrid, :dim] = left_edge
self.grid_right_edge[igrid, :dim] = left_edge + block_nx * dx
self.grid_dimensions[igrid, :dim] = block_nx
self.grids[igrid] = self.grid(igrid, self, ytlevels[igrid])
def _populate_grid_objects(self):
# required method
for g in self.grids:
class AMRVACDataset(Dataset):
_index_class = AMRVACHierarchy
_field_info_class = AMRVACFieldInfo
def __init__(
"""Instantiate AMRVACDataset.
filename : str
Path to a datfile.
dataset_type : str, optional
This should always be 'amrvac'.
units_override : dict, optional
A dictionary of physical normalisation factors to interpret on disk data.
unit_system : str, optional
Either "cgs" (default), "mks" or "code"
geometry_override : str, optional
A geometry flag formatted either according to either
AMRVAC or yt standards.
When this parameter is passed along with v5 or more newer datfiles,
will precede over their internal "geometry" tag.
parfiles : str or list, optional
One or more parfiles to be passed to
# note: geometry_override and parfiles are specific to this frontend
self._geometry_override = geometry_override
self._parfiles = []
if parfiles is not None:
parfiles = list(always_iterable(parfiles))
ppf = Path(parfiles[0])
if not ppf.is_absolute() and Path(filename).resolve().is_relative_to(
ytcfg["yt", "test_data_dir"]
"Looks like %s is relative to your test_data_dir. Assuming this is also true for parfiles.",
parfiles = [Path(ytcfg["yt", "test_data_dir"]) / pf for pf in parfiles]
self._parfiles = parfiles
namelist = None
namelist_gamma = None
c_adiab = None
e_is_internal = None
if parfiles is not None:
namelist = read_amrvac_namelist(parfiles)
if "hd_list" in namelist:
c_adiab = namelist["hd_list"].get("hd_adiab", 1.0)
namelist_gamma = namelist["hd_list"].get("hd_gamma")
elif "mhd_list" in namelist:
c_adiab = namelist["mhd_list"].get("mhd_adiab", 1.0)
namelist_gamma = namelist["mhd_list"].get("mhd_gamma")
if namelist_gamma is not None and self.gamma != namelist_gamma:
"Inconsistent values in gamma: datfile %s, parfiles %s",
if "method_list" in namelist:
e_is_internal = namelist["method_list"].get("solve_internal_e", False)
if c_adiab is not None:
# this complicated unit is required for the adiabatic equation
# of state to make physical sense
c_adiab *= (
self.mass_unit ** (1 - self.gamma)
* self.length_unit ** (2 + 3 * (self.gamma - 1))
/ self.time_unit**2
self.namelist = namelist
self._c_adiab = c_adiab
self._e_is_internal = e_is_internal
self.fluid_types += ("amrvac",)
# refinement factor between a grid and its subgrid
self.refine_by = 2
def _is_valid(cls, filename: str, *args, **kwargs) -> bool:
"""At load time, check whether data is recognized as AMRVAC formatted."""
validation = False
if filename.endswith(".dat"):
with open(filename, mode="rb") as istream:
fmt = "=i"
[datfile_version] = struct.unpack(
fmt, istream.read(struct.calcsize(fmt))
if 3 <= datfile_version < 6:
fmt = "=ii"
offset_tree, offset_blocks = struct.unpack(
fmt, istream.read(struct.calcsize(fmt))
istream.seek(0, 2)
file_size = istream.tell()
validation = (
offset_tree < file_size and offset_blocks < file_size
except Exception:
return validation
def _parse_parameter_file(self):
"""Parse input datfile's header. Apply geometry_override if specified."""
# required method
# populate self.parameters with header data
with open(self.parameter_filename, "rb") as istream:
self.current_time = self.parameters["time"]
self.dimensionality = self.parameters["ndim"]
# force 3D for this definition
dd = np.ones(3, dtype="int64")
dd[: self.dimensionality] = self.parameters["domain_nx"]
self.domain_dimensions = dd
if self.parameters.get("staggered", False):
"'staggered' flag was found, but is currently ignored (unsupported)"
# parse geometry
# by order of decreasing priority, we use
# - geometry_override
# - "geometry" parameter from datfile
# - if all fails, default to "cartesian"
self.geometry = Geometry.CARTESIAN
amrvac_geom = self.parameters.get("geometry", None)
if amrvac_geom is not None:
self.geometry = _parse_geometry(amrvac_geom)
elif self.parameters["datfile_version"] > 4:
"No 'geometry' flag found in datfile with version %d >4.",
if self._geometry_override is not None:
new_geometry = _parse_geometry(self._geometry_override)
if new_geometry == self.geometry:
mylog.info("geometry_override is identical to datfile parameter.")
self.geometry = new_geometry
"Overriding geometry, this may lead to surprising results."
except ValueError:
"Unable to parse geometry_override '%s' (will be ignored).",
# parse peridiocity
periodicity = self.parameters.get("periodic", ())
missing_dim = 3 - len(periodicity)
self._periodicity = (*periodicity, *(missing_dim * (False,)))
self.gamma = self.parameters.get("gamma", 5.0 / 3.0)
# parse domain edges
dle = np.zeros(3)
dre = np.ones(3)
dle[: self.dimensionality] = self.parameters["xmin"]
dre[: self.dimensionality] = self.parameters["xmax"]
self.domain_left_edge = dle
self.domain_right_edge = dre
# defaulting to non-cosmological
self.cosmological_simulation = 0
self.current_redshift = 0.0
self.omega_matter = 0.0
self.omega_lambda = 0.0
self.hubble_constant = 0.0
# units stuff ======================================================================
def _set_code_unit_attributes(self):
"""Reproduce how AMRVAC internally set up physical normalisation factors."""
# This gets called later than Dataset._override_code_units()
# This is the reason why it uses setdefaultattr: it will only fill in the gaps
# left by the "override", instead of overriding them again.
# note: yt sets hydrogen mass equal to proton mass, amrvac doesn't.
mp_cgs = self.quan(1.672621898e-24, "g") # This value is taken from AstroPy
# get self.length_unit if overrides are supplied, otherwise use default
length_unit = getattr(self, "length_unit", self.quan(1, "cm"))
namelist = read_amrvac_namelist(self._parfiles)
He_abundance = namelist.get("mhd_list", {}).get("he_abundance", 0.1)
# 1. calculations for mass, density, numberdensity
if "mass_unit" in self.units_override:
# in this case unit_mass is supplied (and has been set as attribute)
mass_unit = self.mass_unit
density_unit = mass_unit / length_unit**3
nd_unit = density_unit / ((1.0 + 4.0 * He_abundance) * mp_cgs)
# other case: numberdensity is supplied.
# Fall back to one (default) if no overrides supplied
nd_unit = self.quan(self.units_override["numberdensity_unit"])
except KeyError:
nd_unit = self.quan(
1.0, self.__class__.default_units["numberdensity_unit"]
density_unit = (1.0 + 4.0 * He_abundance) * mp_cgs * nd_unit
mass_unit = density_unit * length_unit**3
# 2. calculations for velocity
if "time_unit" in self.units_override:
# in this case time was supplied
velocity_unit = length_unit / self.time_unit
# other case: velocity was supplied.
# Fall back to None if no overrides supplied
velocity_unit = getattr(self, "velocity_unit", None)
# 3. calculations for pressure and temperature
if velocity_unit is None:
# velocity and time not given, see if temperature is given.
# Fall back to one (default) if not
temperature_unit = getattr(self, "temperature_unit", self.quan(1, "K"))
pressure_unit = (
(2.0 + 3.0 * He_abundance) * nd_unit * kb_cgs * temperature_unit
velocity_unit = (np.sqrt(pressure_unit / density_unit)).in_cgs()
# velocity is not zero if either time was given OR velocity was given
pressure_unit = (density_unit * velocity_unit**2).in_cgs()
temperature_unit = (
pressure_unit / ((2.0 + 3.0 * He_abundance) * nd_unit * kb_cgs)
# 4. calculations for magnetic unit and time
time_unit = getattr(
self, "time_unit", length_unit / velocity_unit
) # if time given use it, else calculate
magnetic_unit = (np.sqrt(4 * np.pi * pressure_unit)).to("gauss")
setdefaultattr(self, "mass_unit", mass_unit)
setdefaultattr(self, "density_unit", density_unit)
setdefaultattr(self, "length_unit", length_unit)
setdefaultattr(self, "velocity_unit", velocity_unit)
setdefaultattr(self, "time_unit", time_unit)
setdefaultattr(self, "temperature_unit", temperature_unit)
setdefaultattr(self, "pressure_unit", pressure_unit)
setdefaultattr(self, "magnetic_unit", magnetic_unit)
allowed_unit_combinations = [
{"numberdensity_unit", "temperature_unit", "length_unit"},
{"mass_unit", "temperature_unit", "length_unit"},
{"mass_unit", "time_unit", "length_unit"},
{"numberdensity_unit", "velocity_unit", "length_unit"},
{"mass_unit", "velocity_unit", "length_unit"},
default_units = {
"length_unit": "cm",
"time_unit": "s",
"mass_unit": "g",
"velocity_unit": "cm/s",
"magnetic_unit": "gauss",
"temperature_unit": "K",
# this is the one difference with Dataset.default_units:
# we accept numberdensity_unit as a valid override
"numberdensity_unit": "cm**-3",
def _validate_units_override_keys(cls, units_override):
"""Check that keys in units_override are consistent with AMRVAC's internal
normalisations factors.
# YT supports overriding other normalisations, this method ensures consistency
# between supplied 'units_override' items and those used by AMRVAC.
# AMRVAC's normalisations/units have 3 degrees of freedom.
# Moreover, if temperature unit is specified then velocity unit will be
# calculated accordingly, and vice-versa.
# We replicate this by allowing a finite set of combinations.
# there are only three degrees of freedom, so explicitly check for this
if len(units_override) > 3:
raise ValueError(
"More than 3 degrees of freedom were specified "
f"in units_override ({len(units_override)} given)"
# temperature and velocity cannot both be specified
if "temperature_unit" in units_override and "velocity_unit" in units_override:
raise ValueError(
"Either temperature or velocity is allowed in units_override, not both."
# check if provided overrides are allowed
suo = set(units_override)
for allowed_combo in cls.allowed_unit_combinations:
if suo.issubset(allowed_combo):
raise ValueError(
f"Combination {suo} passed to units_override "
"is not consistent with AMRVAC.\n"
f"Allowed combinations are {cls.allowed_unit_combinations}"
# syntax for mixing super with classmethod is weird...
super(cls, cls)._validate_units_override_keys(units_override)