# Source code for yt.frontends.enzo_e.data_structures

import os
from functools import cached_property

import numpy as np

from yt.data_objects.index_subobjects.grid_patch import AMRGridPatch
from yt.data_objects.static_output import Dataset
from yt.fields.field_info_container import NullFunc
from yt.frontends.enzo.misc import cosmology_get_units
from yt.frontends.enzo_e.fields import EnzoEFieldInfo
from yt.frontends.enzo_e.misc import (
    get_block_info,
    get_child_index,
    get_listed_subparam,
    get_root_block_id,
    get_root_blocks,
    is_parent,
    nested_dict_get,
)
from yt.funcs import get_pbar, setdefaultattr
from yt.geometry.grid_geometry_handler import GridIndex
from yt.utilities.cosmology import Cosmology
from yt.utilities.logger import ytLogger as mylog
from yt.utilities.on_demand_imports import _h5py as h5py, _libconf as libconf


class EnzoEGrid(AMRGridPatch):
    """
    Class representing a single EnzoE Grid instance.
    """

    _id_offset = 0
    _refine_by = 2

    def __init__(self, id, index, block_name, filename=None):
        """
        Returns an instance of EnzoEGrid with *id*, associated with
        *filename* and *index*.
        """
        # All of the field parameters will be passed to us as needed.
        AMRGridPatch.__init__(self, id, filename=filename, index=index)
        self.block_name = block_name
        # Lazily allocated by add_child(); None means "no children recorded".
        self._children_ids = None
        # -1 is the sentinel for "no parent" (see the Parent property).
        self._parent_id = -1
        self.Level = -1

    def __repr__(self):
        return "EnzoEGrid_%04d" % self.id

    def _prepare_grid(self):
        """Copies all the appropriate attributes from the index."""
        h = self.index  # cache it
        my_ind = self.id - self._id_offset
        self.ActiveDimensions = h.grid_dimensions[my_ind]
        self.LeftEdge = h.grid_left_edge[my_ind]
        self.RightEdge = h.grid_right_edge[my_ind]

    def get_parent_id(self, desc_block_name):
        """
        Return the grid id of the immediate parent of the block named
        *desc_block_name*, found by descending from this grid.

        The generation gap between two blocks is computed from the
        difference in the lengths of their (stripped) block names divided
        by the dimensionality; descent stops once the gap is one
        generation or less.

        Raises
        ------
        RuntimeError
            If *desc_block_name* is this grid's own block name.
        """
        if self.block_name == desc_block_name:
            raise RuntimeError("Child and parent are the same!")
        dim = self.ds.dimensionality
        # Strip the leading type character and ':' separators so that the
        # remaining string length tracks the refinement depth.
        d_block = desc_block_name[1:].replace(":", "")
        parent = self

        while True:
            a_block = parent.block_name[1:].replace(":", "")
            gengap = (len(d_block) - len(a_block)) / dim
            if gengap <= 1:
                return parent.id
            # Step one level down toward the descendant block.
            cid = get_child_index(a_block, d_block)
            parent = self.index.grids[parent._children_ids[cid]]

    def add_child(self, child):
        """Record *child* in this grid's table of child grid ids."""
        if self._children_ids is None:
            # One slot per possible child (refine_by**dimensionality);
            # -1 marks an unfilled slot.
            self._children_ids = -1 * np.ones(
                self._refine_by**self.ds.dimensionality, dtype=np.int64
            )
        a_block = self.block_name[1:].replace(":", "")
        d_block = child.block_name[1:].replace(":", "")
        cid = get_child_index(a_block, d_block)
        self._children_ids[cid] = child.id

    @cached_property
    def particle_count(self):
        # Map each particle type to its count in this grid, determined by
        # reading the size of one sampled field per type from the grid's
        # HDF5 block file. Cached: the file is read at most once.
        with h5py.File(self.filename, mode="r") as f:
            fnstr = "{}/{}".format(
                self.block_name,
                self.ds.index.io._sep.join(["particle", "%s", "%s"]),
            )
            return {
                ptype: f.get(fnstr % (ptype, pfield)).size
                for ptype, pfield in self.ds.index.io.sample_pfields.items()
            }

    @cached_property
    def total_particles(self) -> int:
        # Total particle count summed over all particle types.
        return sum(self.particle_count.values())

    @property
    def Parent(self):
        # None for root-level grids (parent id sentinel of -1).
        if self._parent_id == -1:
            return None
        return self.index.grids[self._parent_id]

    @property
    def Children(self):
        if self._children_ids is None:
            return []
        return [self.index.grids[cid] for cid in self._children_ids]
class EnzoEHierarchy(GridIndex):
    """Grid index for Enzo-E datasets, built from the ``.block_list`` file."""

    _strip_path = False
    grid = EnzoEGrid
    _preload_implemented = True

    def __init__(self, ds, dataset_type):
        self.dataset_type = dataset_type
        self.directory = os.path.dirname(ds.parameter_filename)
        self.index_filename = ds.parameter_filename
        if os.path.getsize(self.index_filename) == 0:
            raise OSError(-1, "File empty", self.index_filename)

        GridIndex.__init__(self, ds, dataset_type)
        self.dataset.dataset_type = self.dataset_type

    def _count_grids(self):
        # One grid per line of the block_list file. Read in fixed-size
        # chunks and count newlines so large block lists never need to be
        # held in memory whole.
        fblock_size = 32768
        with open(self.ds.parameter_filename) as f:
            f.seek(0, 2)
            file_size = f.tell()
            nblocks = np.ceil(float(file_size) / fblock_size).astype(np.int64)
            f.seek(0)
            offset = f.tell()
            ngrids = 0
            for _ in range(nblocks):
                my_block = min(fblock_size, file_size - offset)
                buff = f.read(my_block)
                ngrids += buff.count("\n")
                offset += my_block
        self.num_grids = ngrids
        self.dataset_type = "enzo_e"

    def _parse_index(self):
        """
        Build one EnzoEGrid per line of the block_list file and wire up
        the parent/child relationships between grids.
        """
        self.grids = np.empty(self.num_grids, dtype="object")
        c = 1
        pbar = get_pbar("Parsing Hierarchy", self.num_grids)
        fblock_size = 32768
        # place child blocks after the root blocks
        rbdim = self.ds.root_block_dimensions
        nroot_blocks = rbdim.prod()
        child_id = nroot_blocks

        last_pid = None
        with open(self.ds.parameter_filename) as f:
            f.seek(0, 2)
            file_size = f.tell()
            nblocks = np.ceil(float(file_size) / fblock_size).astype(np.int64)
            f.seek(0)
            offset = f.tell()
            # Carry over any partial trailing line between chunks.
            lstr = ""
            for _ib in range(nblocks):
                fblock = min(fblock_size, file_size - offset)
                buff = lstr + f.read(fblock)
                bnl = 0
                for _inl in range(buff.count("\n")):
                    nnl = buff.find("\n", bnl)
                    line = buff[bnl:nnl]
                    block_name, block_file = line.split()

                    # Handling of the B, B_, and B__ blocks is consistent with
                    # other unrefined blocks
                    level, left, right = get_block_info(block_name)
                    rbindex = get_root_block_id(block_name)
                    # Flatten the 3D root-block index into a single id.
                    rbid = (
                        rbindex[0] * rbdim[1:].prod()
                        + rbindex[1] * rbdim[2:].prod()
                        + rbindex[2]
                    )

                    # There are also blocks at lower level than the
                    # real root blocks. These can be ignored.
                    if level == 0:
                        check_root = get_root_blocks(block_name).prod()
                        if check_root < nroot_blocks:
                            level = -1

                    if level == -1:
                        grid_id = child_id
                        parent_id = -1
                        child_id += 1
                    elif level == 0:
                        grid_id = rbid
                        parent_id = -1
                    else:
                        grid_id = child_id
                        # Try the last parent_id first: consecutive lines
                        # often share a parent, saving a tree descent.
                        if last_pid is not None and is_parent(
                            self.grids[last_pid].block_name, block_name
                        ):
                            parent_id = last_pid
                        else:
                            parent_id = self.grids[rbid].get_parent_id(block_name)
                        last_pid = parent_id
                        child_id += 1

                    my_grid = self.grid(
                        grid_id,
                        self,
                        block_name,
                        filename=os.path.join(self.directory, block_file),
                    )
                    my_grid.Level = level
                    my_grid._parent_id = parent_id

                    self.grids[grid_id] = my_grid
                    self.grid_levels[grid_id] = level
                    self.grid_left_edge[grid_id] = left
                    self.grid_right_edge[grid_id] = right
                    self.grid_dimensions[grid_id] = self.ds.active_grid_dimensions

                    if level > 0:
                        self.grids[parent_id].add_child(my_grid)

                    bnl = nnl + 1
                    pbar.update(c)
                    c += 1
                lstr = buff[bnl:]
                offset += fblock
        pbar.finish()

        # Rescale edges from unit-cube coordinates into the domain.
        slope = self.ds.domain_width / self.ds.arr(np.ones(3), "code_length")
        self.grid_left_edge = self.grid_left_edge * slope + self.ds.domain_left_edge
        self.grid_right_edge = self.grid_right_edge * slope + self.ds.domain_left_edge

    def _populate_grid_objects(self):
        for g in self.grids:
            g._prepare_grid()
            g._setup_dx()
        self.max_level = self.grid_levels.max()

    def _setup_derived_fields(self):
        super()._setup_derived_fields()
        # NOTE(review): this loop filters down to non-tuple-named particle
        # fields with real functions, but its body is empty — it currently
        # has no effect. Looks like leftover scaffolding; confirm against
        # upstream history before removing.
        for fname, field in self.ds.field_info.items():
            if not field.particle_type:
                continue
            if isinstance(fname, tuple):
                continue
            if field._function is NullFunc:
                continue

    def _get_particle_type_counts(self):
        # Sum each particle type's count over every grid in the index.
        return {
            ptype: sum(g.particle_count[ptype] for g in self.grids)
            for ptype in self.ds.particle_types_raw
        }

    def _detect_output_fields(self):
        self.field_list = []
        # Do this only on the root processor to save disk work.
        if self.comm.rank in (0, None):
            # Just check the first grid.
            grid = self.grids[0]
            field_list, ptypes = self.io._read_field_names(grid)
            mylog.debug("Grid %s has: %s", grid.id, field_list)
            sample_pfields = self.io.sample_pfields
        else:
            field_list = None
            ptypes = None
            sample_pfields = None
        # Broadcast the root processor's results to all ranks.
        self.field_list = list(self.comm.mpi_bcast(field_list))
        self.dataset.particle_types = list(self.comm.mpi_bcast(ptypes))
        self.dataset.particle_types_raw = self.dataset.particle_types[:]
        self.io.sample_pfields = self.comm.mpi_bcast(sample_pfields)
class EnzoEDataset(Dataset):
    """
    Enzo-E-specific output, set at a fixed time.
    """

    _load_requirements = ["h5py", "libconf"]
    refine_by = 2
    _index_class = EnzoEHierarchy
    _field_info_class = EnzoEFieldInfo
    _suffix = ".block_list"
    particle_types: tuple[str, ...] = ()
    particle_types_raw = None

    def __init__(
        self,
        filename,
        dataset_type=None,
        parameter_override=None,
        conversion_override=None,
        storage_filename=None,
        units_override=None,
        unit_system="cgs",
        default_species_fields=None,
    ):
        """
        This class is a stripped down class that simply reads and parses
        *filename* without looking at the index. *dataset_type* gets passed
        to the index to pre-determine the style of data-output. However, it
        is not strictly necessary. Optionally you may specify a
        *parameter_override* dictionary that will override anything in the
        parameter file and a *conversion_override* dictionary that consists
        of {fieldname : conversion_to_cgs} that will override the #DataCGS.
        """
        self.fluid_types += ("enzoe",)
        if parameter_override is None:
            parameter_override = {}
        self._parameter_override = parameter_override
        if conversion_override is None:
            conversion_override = {}
        self._conversion_override = conversion_override
        self.storage_filename = storage_filename
        Dataset.__init__(
            self,
            filename,
            dataset_type,
            units_override=units_override,
            unit_system=unit_system,
            default_species_fields=default_species_fields,
        )

    def _parse_parameter_file(self):
        """
        Parses the parameter file and establishes the various
        dictionaries.
        """
        with open(self.parameter_filename) as f:
            # get dimension from first block name
            b0, fn0 = f.readline().strip().split()
        level0, left0, right0 = get_block_info(b0, min_dim=0)
        root_blocks = get_root_blocks(b0)
        self.dimensionality = left0.size
        self._periodicity = tuple(np.ones(self.dimensionality, dtype=bool))

        lcfn = self.parameter_filename[: -len(self._suffix)] + ".libconfig"
        if os.path.exists(lcfn):
            with open(lcfn) as lf:
                self.parameters = libconf.load(lf)

            # Enzo-E ignores all cosmology parameters if "cosmology" is not in
            # the Physics:list parameter
            physics_list = nested_dict_get(
                self.parameters, ("Physics", "list"), default=[]
            )
            if "cosmology" in physics_list:
                self.cosmological_simulation = 1
                co_pars = [
                    "hubble_constant_now",
                    "omega_matter_now",
                    "omega_lambda_now",
                    "comoving_box_size",
                    "initial_redshift",
                ]
                co_dict = {
                    attr: nested_dict_get(
                        self.parameters, ("Physics", "cosmology", attr)
                    )
                    for attr in co_pars
                }
                for attr in ["hubble_constant", "omega_matter", "omega_lambda"]:
                    setattr(self, attr, co_dict[f"{attr}_now"])

                # Current redshift is not stored, so it's not possible
                # to set all cosmological units yet.
                # Get the time units and use that to figure out redshift.
                k = cosmology_get_units(
                    self.hubble_constant,
                    self.omega_matter,
                    co_dict["comoving_box_size"],
                    co_dict["initial_redshift"],
                    0,
                )
                setdefaultattr(self, "time_unit", self.quan(k["utim"], "s"))
                co = Cosmology(
                    hubble_constant=self.hubble_constant,
                    omega_matter=self.omega_matter,
                    omega_lambda=self.omega_lambda,
                )
            else:
                self.cosmological_simulation = 0
        else:
            self.cosmological_simulation = 0

        # Read domain geometry and grid metadata from the first block file.
        with h5py.File(os.path.join(self.directory, fn0), "r") as fh:
            self.domain_left_edge = fh.attrs["lower"]
            self.domain_right_edge = fh.attrs["upper"]
            if "version" in fh.attrs:
                version = fh.attrs.get("version").tobytes().decode("ascii")
            else:
                version = None  # earliest recorded version is '0.9.0'
            self.parameters["version"] = version

            # all blocks are the same size
            ablock = fh[list(fh.keys())[0]]
            self.current_time = ablock.attrs["time"][0]
            self.parameters["current_cycle"] = ablock.attrs["cycle"][0]
            gsi = ablock.attrs["enzo_GridStartIndex"]
            gei = ablock.attrs["enzo_GridEndIndex"]
            assert len(gsi) == len(gei) == 3  # sanity check
            # Enzo-E technically allows each axis to have different ghost zone
            # depths (this feature is not really used in practice)
            self.ghost_zones = gsi
            # sanity check: no ghost zones beyond the dataset's dimensionality
            assert (self.ghost_zones[self.dimensionality :] == 0).all()
            self.root_block_dimensions = root_blocks
            self.active_grid_dimensions = gei - gsi + 1
            self.grid_dimensions = ablock.attrs["enzo_GridDimension"]
            self.domain_dimensions = root_blocks * self.active_grid_dimensions

        if self.cosmological_simulation:
            self.current_redshift = co.z_from_t(self.current_time * self.time_unit)

        # Pad periodicity out to 3 dimensions.
        self._periodicity += (False,) * (3 - self.dimensionality)

        self._parse_fluid_prop_params()

    def _parse_fluid_prop_params(self):
        """
        Parse the fluid properties.
        """
        fp_params = nested_dict_get(
            self.parameters, ("Physics", "fluid_props"), default=None
        )

        if fp_params is not None:
            # in newer versions of enzo-e, this data is specified in a
            # centralized parameter group called Physics:fluid_props
            # - for internal reasons related to backwards compatibility,
            #   treatment of this physics-group is somewhat special (compared
            #   to the cosmology group). The parameters in this group are
            #   honored even if Physics:list does not include "fluid_props"
            self.gamma = nested_dict_get(fp_params, ("eos", "gamma"))
            de_type = nested_dict_get(
                fp_params, ("dual_energy", "type"), default="disabled"
            )
            uses_de = de_type != "disabled"
        else:
            # in older versions, these parameters were more scattered
            self.gamma = nested_dict_get(self.parameters, ("Field", "gamma"))
            uses_de = False
            for method in ("ppm", "mhd_vlct"):
                subparams = get_listed_subparam(
                    self.parameters, "Method", method, default=None
                )
                if subparams is not None:
                    uses_de = subparams.get("dual_energy", False)
        self.parameters["uses_dual_energy"] = uses_de

    def _set_code_unit_attributes(self):
        """
        Set code-unit attributes from either cosmology parameters or the
        explicit Units section; only fills units not already set.
        """
        if self.cosmological_simulation:
            box_size = self.parameters["Physics"]["cosmology"]["comoving_box_size"]
            k = cosmology_get_units(
                self.hubble_constant,
                self.omega_matter,
                box_size,
                self.parameters["Physics"]["cosmology"]["initial_redshift"],
                self.current_redshift,
            )
            # Now some CGS values
            setdefaultattr(self, "length_unit", self.quan(box_size, "Mpccm/h"))
            setdefaultattr(
                self,
                "mass_unit",
                self.quan(k["urho"], "g/cm**3") * (self.length_unit.in_cgs()) ** 3,
            )
            setdefaultattr(self, "velocity_unit", self.quan(k["uvel"], "cm/s"))
        else:
            p = self.parameters
            for d, u in [("length", "cm"), ("time", "s")]:
                val = nested_dict_get(p, ("Units", d), default=1)
                setdefaultattr(self, f"{d}_unit", self.quan(val, u))
            mass = nested_dict_get(p, ("Units", "mass"))
            if mass is None:
                # Fall back to deriving mass from density when given.
                density = nested_dict_get(p, ("Units", "density"))
                if density is not None:
                    mass = density * self.length_unit**3
                else:
                    mass = 1
            setdefaultattr(self, "mass_unit", self.quan(mass, "g"))
            setdefaultattr(self, "velocity_unit", self.length_unit / self.time_unit)

        magnetic_unit = np.sqrt(
            4 * np.pi * self.mass_unit / (self.time_unit**2 * self.length_unit)
        )
        magnetic_unit = np.float64(magnetic_unit.in_cgs())
        setdefaultattr(self, "magnetic_unit", self.quan(magnetic_unit, "gauss"))

    def __str__(self):
        return self.basename[: -len(self._suffix)]

    @classmethod
    def _is_valid(cls, filename: str, *args, **kwargs) -> bool:
        # A valid Enzo-E dataset is a .block_list file whose first line
        # names a parseable block and an existing block file.
        ddir = os.path.dirname(filename)
        if not filename.endswith(cls._suffix):
            return False
        if cls._missing_load_requirements():
            return False
        try:
            with open(filename) as f:
                block, block_file = f.readline().strip().split()
                get_block_info(block)
                if not os.path.exists(os.path.join(ddir, block_file)):
                    return False
        except Exception:
            return False
        return True