# Source code for yt.frontends.enzo.data_structures

import os
import re
import time
import weakref
from collections import defaultdict
from functools import cached_property

import numpy as np
from more_itertools import always_iterable

from yt.data_objects.index_subobjects.grid_patch import AMRGridPatch
from yt.data_objects.static_output import Dataset
from yt.fields.field_info_container import NullFunc
from yt.frontends.enzo.misc import cosmology_get_units
from yt.funcs import get_pbar, iter_fields, setdefaultattr
from yt.geometry.geometry_handler import YTDataChunk
from yt.geometry.grid_geometry_handler import GridIndex
from yt.utilities.logger import ytLogger as mylog
from yt.utilities.on_demand_imports import _h5py as h5py, _libconf as libconf

from .fields import EnzoFieldInfo


class EnzoGrid(AMRGridPatch):
    """
    Class representing a single Enzo Grid instance.
    """

    def __init__(self, id, index):
        """
        Returns an instance of EnzoGrid with *id*, associated with *index*;
        the grid's filename is assigned later via set_filename.
        """
        # All of the field parameters will be passed to us as needed.
        AMRGridPatch.__init__(self, id, filename=None, index=index)
        self._children_ids = []
        self._parent_id = -1
        self.Level = -1
    def set_filename(self, filename):
        """
        Intelligently set the filename.
        """
        if filename is None:
            self.filename = filename
            return
        if self.index._strip_path:
            self.filename = os.path.join(
                self.index.directory, os.path.basename(filename)
            )
        elif filename[0] == os.path.sep:
            self.filename = filename
        else:
            self.filename = os.path.join(self.index.directory, filename)
        return
    @property
    def Parent(self):
        if self._parent_id == -1:
            return None
        return self.index.grids[self._parent_id - self._id_offset]

    @property
    def Children(self):
        return [self.index.grids[cid - self._id_offset] for cid in self._children_ids]

    @property
    def NumberOfActiveParticles(self):
        if not hasattr(self.index, "grid_active_particle_count"):
            return {}
        id = self.id - self._id_offset
        nap = {
            ptype: self.index.grid_active_particle_count[ptype][id]
            for ptype in self.index.grid_active_particle_count
        }
        return nap

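
# A minimal usage sketch (not part of the module): once the index is built,
# the Parent/Children properties above expose the AMR grid tree. The output
# name "DD0046/DD0046" is hypothetical.
#
#     import yt
#     ds = yt.load("DD0046/DD0046")
#     root = ds.index.grids[0]
#     for child in root.Children:
#         print(child.id, child.Level, child.Parent.id)
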
class EnzoGridInMemory(EnzoGrid):
    __slots__ = ["proc_num"]
    def set_filename(self, filename):
        pass

class EnzoGridGZ(EnzoGrid):
    __slots__ = ()
    def retrieve_ghost_zones(self, n_zones, fields, all_levels=False, smoothed=False):
        NGZ = self.ds.parameters.get("NumberOfGhostZones", 3)
        if n_zones > NGZ:
            return EnzoGrid.retrieve_ghost_zones(
                self, n_zones, fields, all_levels, smoothed
            )

        # ----- Below is mostly the original code, except we remove the field
        # ----- access section
        # We will attempt this by creating a datacube that is exactly bigger
        # than the grid by nZones*dx in each direction
        nl = self.get_global_startindex() - n_zones
        new_left_edge = nl * self.dds + self.ds.domain_left_edge
        # Something different needs to be done for the root grid, though
        level = self.Level
        kwargs = {
            "dims": self.ActiveDimensions + 2 * n_zones,
            "num_ghost_zones": n_zones,
            "use_pbar": False,
        }
        # This should update the arguments to set the field parameters to be
        # those of this grid.
        kwargs.update(self.field_parameters)
        if smoothed:
            # cube = self.index.smoothed_covering_grid(
            #     level, new_left_edge, new_right_edge, **kwargs)
            cube = self.index.smoothed_covering_grid(level, new_left_edge, **kwargs)
        else:
            cube = self.index.covering_grid(level, new_left_edge, **kwargs)
        # ----- This is EnzoGrid.get_data, duplicated here mostly for
        # ----- efficiency's sake.
        start_zone = NGZ - n_zones
        if start_zone == 0:
            end_zone = None
        else:
            end_zone = -(NGZ - n_zones)
        sl = tuple(slice(start_zone, end_zone) for i in range(3))
        if fields is None:
            return cube
        for field in iter_fields(fields):
            if field in self.field_list:
                conv_factor = 1.0
                if field in self.ds.field_info:
                    conv_factor = self.ds.field_info[field]._convert_function(self)
                    if self.ds.field_info[field].sampling_type == "particle":
                        continue
                temp = self.index.io._read_raw_data_set(self, field)
                temp = temp.swapaxes(0, 2)
                cube.field_data[field] = np.multiply(temp, conv_factor, temp)[sl]
        return cube

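
# A minimal usage sketch (not part of the module), assuming a 3D dataset with
# WriteGhostZones enabled so that grids are EnzoGridGZ instances and "ds" has
# already been loaded:
#
#     g = ds.index.grids[0]
#     gz = g.retrieve_ghost_zones(1, ("enzo", "Density"))
#     # the cube spans ActiveDimensions plus one zone on each side
#     print(gz["enzo", "Density"].shape)
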
class EnzoHierarchy(GridIndex):
    _strip_path = False
    grid = EnzoGrid
    _preload_implemented = True

    def __init__(self, ds, dataset_type):
        self.dataset_type = dataset_type
        self.index_filename = os.path.abspath(f"{ds.parameter_filename}.hierarchy")
        if os.path.getsize(self.index_filename) == 0:
            raise OSError(-1, "File empty", self.index_filename)
        self.directory = os.path.dirname(self.index_filename)

        # For some reason, r8 seems to want Float64
        if "CompilerPrecision" in ds and ds["CompilerPrecision"] == "r4":
            self.float_type = "float32"
        else:
            self.float_type = "float64"

        GridIndex.__init__(self, ds, dataset_type)
        # sync it back
        self.dataset.dataset_type = self.dataset_type

    def _count_grids(self):
        self.num_grids = None
        test_grid = test_grid_id = None
        self.num_stars = 0
        for line in rlines(open(self.index_filename, "rb")):
            if (
                line.startswith("BaryonFileName")
                or line.startswith("ParticleFileName")
                or line.startswith("FileName ")
            ):
                test_grid = line.split("=")[-1].strip().rstrip()
            if line.startswith("NumberOfStarParticles"):
                self.num_stars = int(line.split("=")[-1])
            if line.startswith("Grid "):
                if self.num_grids is None:
                    self.num_grids = int(line.split("=")[-1])
                test_grid_id = int(line.split("=")[-1])
                if test_grid is not None:
                    break
        self._guess_dataset_type(self.ds.dimensionality, test_grid, test_grid_id)

    def _guess_dataset_type(self, rank, test_grid, test_grid_id):
        if test_grid[0] != os.path.sep:
            test_grid = os.path.join(self.directory, test_grid)
        if not os.path.exists(test_grid):
            test_grid = os.path.join(self.directory, os.path.basename(test_grid))
            mylog.debug("Your data uses the annoying hardcoded path.")
            self._strip_path = True
        if self.dataset_type is not None:
            return
        if rank == 3:
            mylog.debug("Detected packed HDF5")
            if self.parameters.get("WriteGhostZones", 0) == 1:
                self.dataset_type = "enzo_packed_3d_gz"
                self.grid = EnzoGridGZ
            else:
                self.dataset_type = "enzo_packed_3d"
        elif rank == 2:
            mylog.debug("Detected packed 2D")
            self.dataset_type = "enzo_packed_2d"
        elif rank == 1:
            mylog.debug("Detected packed 1D")
            self.dataset_type = "enzo_packed_1d"
        else:
            raise NotImplementedError

    # Sets are sorted, so that won't work!
    def _parse_index(self):
        def _next_token_line(token, f):
            for line in f:
                if line.startswith(token):
                    return line.split()[2:]

        # Pointer lines in the hierarchy file look like, e.g.:
        #     Pointer: Grid[1]->NextGridNextLevel = 2
        pattern = r"Pointer: Grid\[(\d*)\]->NextGrid(Next|This)Level = (\d*)\s+$"
        patt = re.compile(pattern)
        f = open(self.index_filename)
        self.grids = [self.grid(1, self)]
        self.grids[0].Level = 0
        si, ei, LE, RE, fn, npart = [], [], [], [], [], []
        pbar = get_pbar("Parsing Hierarchy ", self.num_grids)
        version = self.dataset.parameters.get("VersionNumber", None)
        params = self.dataset.parameters
        if version is None and "Internal" in params:
            version = float(params["Internal"]["Provenance"]["VersionNumber"])
        if version is not None and version >= 3.0:
            active_particles = True
            nap = {
                ap_type: []
                for ap_type in params["Physics"]["ActiveParticles"][
                    "ActiveParticlesEnabled"
                ]
            }
        else:
            if "AppendActiveParticleType" in self.parameters:
                nap = {}
                active_particles = True
                for ap_type in self.parameters.get("AppendActiveParticleType", []):
                    nap[ap_type] = []
            else:
                nap = None
                active_particles = False
        for grid_id in range(self.num_grids):
            pbar.update(grid_id + 1)
            # We will unroll this list
            si.append(_next_token_line("GridStartIndex", f))
            ei.append(_next_token_line("GridEndIndex", f))
            LE.append(_next_token_line("GridLeftEdge", f))
            RE.append(_next_token_line("GridRightEdge", f))
            nb = int(_next_token_line("NumberOfBaryonFields", f)[0])
            fn.append([None])
            if nb > 0:
                fn[-1] = _next_token_line("BaryonFileName", f)
            npart.append(int(_next_token_line("NumberOfParticles", f)[0]))
            # Below we find out what active particles exist in this grid,
            # and add their counts individually.
            if active_particles:
                ptypes = _next_token_line("PresentParticleTypes", f)
                counts = [int(c) for c in _next_token_line("ParticleTypeCounts", f)]
                for ptype in self.parameters.get("AppendActiveParticleType", []):
                    if ptype in ptypes:
                        nap[ptype].append(counts[ptypes.index(ptype)])
                    else:
                        nap[ptype].append(0)
            if nb == 0 and npart[-1] > 0:
                fn[-1] = _next_token_line("ParticleFileName", f)
            for line in f:
                if len(line) < 2:
                    break
                if line.startswith("Pointer:"):
                    vv = patt.findall(line)[0]
                    self.__pointer_handler(vv)
        pbar.finish()
        self._fill_arrays(ei, si, LE, RE, npart, nap)
        temp_grids = np.empty(self.num_grids, dtype="object")
        temp_grids[:] = self.grids
        self.grids = temp_grids
        self.filenames = fn

    def _initialize_grid_arrays(self):
        super()._initialize_grid_arrays()
        if "AppendActiveParticleType" in self.parameters.keys() and len(
            self.parameters["AppendActiveParticleType"]
        ):
            gac = {
                ptype: np.zeros(self.num_grids, dtype="i4")
                for ptype in self.parameters["AppendActiveParticleType"]
            }
            self.grid_active_particle_count = gac

    def _fill_arrays(self, ei, si, LE, RE, npart, nap):
        self.grid_dimensions.flat[:] = ei
        self.grid_dimensions -= np.array(si, dtype="i4")
        self.grid_dimensions += 1
        self.grid_left_edge.flat[:] = LE
        self.grid_right_edge.flat[:] = RE
        self.grid_particle_count.flat[:] = npart
        if nap is not None:
            for ptype in nap:
                self.grid_active_particle_count[ptype].flat[:] = nap[ptype]

    def __pointer_handler(self, m):
        sgi = int(m[2]) - 1
        if sgi == -1:
            return  # if it's 0, then we're done with that lineage
        # Okay, so, we have a pointer.
        # We make a new grid, with an id of the length+1
        # (recall, Enzo grids are 1-indexed)
        self.grids.append(self.grid(len(self.grids) + 1, self))
        # We'll just go ahead and make a weakref to cache
        second_grid = self.grids[sgi]  # zero-indexed already
        first_grid = self.grids[int(m[0]) - 1]
        if m[1] == "Next":
            first_grid._children_ids.append(second_grid.id)
            second_grid._parent_id = first_grid.id
            second_grid.Level = first_grid.Level + 1
        elif m[1] == "This":
            if first_grid.Parent is not None:
                first_grid.Parent._children_ids.append(second_grid.id)
            second_grid._parent_id = first_grid._parent_id
            second_grid.Level = first_grid.Level
        self.grid_levels[sgi] = second_grid.Level

    def _rebuild_top_grids(self, level=0):
        mylog.info("Rebuilding grids on level %s", level)
        cmask = self.grid_levels.flat == (level + 1)
        cmsum = cmask.sum()
        mask = np.zeros(self.num_grids, dtype="bool")
        for grid in self.select_grids(level):
            mask[:] = 0
            LE = self.grid_left_edge[grid.id - grid._id_offset]
            RE = self.grid_right_edge[grid.id - grid._id_offset]
            grids, grid_i = self.get_box_grids(LE, RE)
            mask[grid_i] = 1
            grid._children_ids = []
            cgrids = self.grids[(mask * cmask).astype("bool")]
            mylog.info("%s: %s / %s", grid, len(cgrids), cmsum)
            for cgrid in cgrids:
                grid._children_ids.append(cgrid.id)
                cgrid._parent_id = grid.id
        mylog.info("Finished rebuilding")

    def _populate_grid_objects(self):
        for g, f in zip(self.grids, self.filenames, strict=True):
            g._prepare_grid()
            g._setup_dx()
            g.set_filename(f[0])
        del self.filenames  # No longer needed.
        self.max_level = self.grid_levels.max()

    def _detect_active_particle_fields(self):
        ap_list = self.dataset["AppendActiveParticleType"]
        _fields = {ap: [] for ap in ap_list}
        fields = []
        for ptype in self.dataset["AppendActiveParticleType"]:
            select_grids = self.grid_active_particle_count[ptype].flat
            if not np.any(select_grids):
                current_ptypes = self.dataset.particle_types
                new_ptypes = [p for p in current_ptypes if p != ptype]
                self.dataset.particle_types = new_ptypes
                self.dataset.particle_types_raw = new_ptypes
                continue
            if ptype != "DarkMatter":
                gs = self.grids[select_grids > 0]
                g = gs[0]
                handle = h5py.File(g.filename, "r")
                grid_group = handle[f"/Grid{g.id:08d}"]
                for pname in ["Active Particles", "Particles"]:
                    if pname in grid_group:
                        break
                else:
                    raise RuntimeError("Could not find active particle group in data.")
                node = grid_group[pname]
                for ptype in (str(p) for p in node):
                    if ptype not in _fields:
                        continue
                    for field in (str(f) for f in node[ptype]):
                        _fields[ptype].append(field)
                        if node[ptype][field].ndim > 1:
                            self.io._array_fields[field] = (
                                node[ptype][field].shape[1:],
                            )
                    fields += [(ptype, field) for field in _fields.pop(ptype)]
                handle.close()
        return set(fields)

    def _setup_derived_fields(self):
        super()._setup_derived_fields()
        aps = self.dataset.parameters.get("AppendActiveParticleType", [])
        for fname, field in self.ds.field_info.items():
            if field.sampling_type != "particle":
                continue
            if isinstance(fname, tuple):
                continue
            if field._function is NullFunc:
                continue
            for apt in aps:
                dd = field._copy_def()
                dd.pop("name")
                self.ds.field_info.add_field((apt, fname), sampling_type="cell", **dd)

    def _detect_output_fields(self):
        self.field_list = []
        # Do this only on the root processor to save disk work.
        if self.comm.rank in (0, None):
            mylog.info("Gathering a field list (this may take a moment.)")
            field_list = set()
            random_sample = self._generate_random_grids()
            for grid in random_sample:
                if not hasattr(grid, "filename"):
                    continue
                try:
                    gf = self.io._read_field_names(grid)
                except self.io._read_exception as e:
                    raise OSError(f"Grid {grid.id} is a bit funky?") from e
                mylog.debug("Grid %s has: %s", grid.id, gf)
                field_list = field_list.union(gf)
            if "AppendActiveParticleType" in self.dataset.parameters:
                ap_fields = self._detect_active_particle_fields()
                field_list = list(set(field_list).union(ap_fields))
                if not any(f[0] == "io" for f in field_list):
                    if "io" in self.dataset.particle_types_raw:
                        ptypes_raw = list(self.dataset.particle_types_raw)
                        ptypes_raw.remove("io")
                        self.dataset.particle_types_raw = tuple(ptypes_raw)
                    if "io" in self.dataset.particle_types:
                        ptypes = list(self.dataset.particle_types)
                        ptypes.remove("io")
                        self.dataset.particle_types = tuple(ptypes)
            ptypes = self.dataset.particle_types
            ptypes_raw = self.dataset.particle_types_raw
        else:
            field_list = None
            ptypes = None
            ptypes_raw = None
        self.field_list = list(self.comm.mpi_bcast(field_list))
        self.dataset.particle_types = list(self.comm.mpi_bcast(ptypes))
        self.dataset.particle_types_raw = list(self.comm.mpi_bcast(ptypes_raw))

    def _generate_random_grids(self):
        if self.num_grids > 40:
            rng = np.random.default_rng()
            starter = rng.integers(0, 20)
            random_sample = np.mgrid[starter : len(self.grids) - 1 : 20j].astype(
                "int32"
            )
            # We also add in a bit to make sure that some of the grids have
            # particles
            gwp = self.grid_particle_count > 0
            if np.any(gwp) and not np.any(gwp[(random_sample,)]):
                # We just add one grid.  This is not terribly efficient.
                first_grid = np.where(gwp)[0][0]
                random_sample.resize((21,))
                random_sample[-1] = first_grid
                mylog.debug("Added additional grid %s", first_grid)
            mylog.debug("Checking grids: %s", random_sample.tolist())
        else:
            random_sample = np.mgrid[0 : max(len(self.grids), 1)].astype("int32")
        return self.grids[(random_sample,)]

    def _get_particle_type_counts(self):
        try:
            ret = {}
            for ptype in self.grid_active_particle_count:
                ret[ptype] = self.grid_active_particle_count[ptype].sum()
            return ret
        except AttributeError:
            return super()._get_particle_type_counts()
    def find_particles_by_type(self, ptype, max_num=None, additional_fields=None):
        """
        Returns a structure of arrays with all of the particles' positions,
        velocities, masses, types, IDs, and attributes for a particle type
        **ptype** for a maximum of **max_num** particles.  If non-default
        particle fields are used, provide them in **additional_fields**.
        """
        # Not sure whether this routine should be in the general HierarchyType.
        if self.grid_particle_count.sum() == 0:
            mylog.info("Data contains no particles.")
            return None
        if additional_fields is None:
            additional_fields = [
                "metallicity_fraction",
                "creation_time",
                "dynamical_time",
            ]
        pfields = [f for f in self.field_list if f.startswith("particle_")]
        nattr = self.dataset["NumberOfParticleAttributes"]
        if nattr > 0:
            pfields += additional_fields[:nattr]
        # Find where the particles reside and count them
        if max_num is None:
            max_num = 1e100
        total = 0
        pstore = []
        for level in range(self.max_level, -1, -1):
            for grid in self.select_grids(level):
                index = np.where(grid["particle_type"] == ptype)[0]
                total += len(index)
                pstore.append(index)
                if total >= max_num:
                    break
            if total >= max_num:
                break
        result = None
        if total > 0:
            result = {}
            for p in pfields:
                result[p] = np.zeros(total, "float64")
            # Now we retrieve data for each field
            ig = count = 0
            for level in range(self.max_level, -1, -1):
                for grid in self.select_grids(level):
                    nidx = len(pstore[ig])
                    if nidx > 0:
                        for p in pfields:
                            result[p][count : count + nidx] = grid[p][pstore[ig]]
                        count += nidx
                    ig += 1
                    if count >= total:
                        break
                if count >= total:
                    break
            # Crop data if retrieved more than max_num
            if count > max_num:
                for p in pfields:
                    result[p] = result[p][0:max_num]
        return result

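
# A minimal usage sketch (not part of the module): fetch up to 1000 particles
# of Enzo particle type 2 from an already-loaded dataset. The type code and
# the returned field names follow the conventions used in the method above.
#
#     parts = ds.index.find_particles_by_type(2, max_num=1000)
#     if parts is not None:
#         print(parts["particle_position_x"][:10])
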
class EnzoHierarchyInMemory(EnzoHierarchy):
    grid = EnzoGridInMemory

    @cached_property
    def enzo(self):
        import enzo

        return enzo

    def __init__(self, ds, dataset_type=None):
        self.dataset_type = dataset_type
        self.float_type = "float64"
        self.dataset = weakref.proxy(ds)  # for _obtain_enzo
        self.float_type = self.enzo.hierarchy_information["GridLeftEdge"].dtype
        self.directory = os.getcwd()
        GridIndex.__init__(self, ds, dataset_type)

    def _initialize_data_storage(self):
        pass

    def _count_grids(self):
        self.num_grids = self.enzo.hierarchy_information["GridDimensions"].shape[0]

    def _parse_index(self):
        self._copy_index_structure()
        mylog.debug("Copying reverse tree")
        reverse_tree = self.enzo.hierarchy_information["GridParentIDs"].ravel().tolist()
        # Initial setup:
        mylog.debug("Reconstructing parent-child relationships")
        grids = []
        # We enumerate, so it's 0-indexed id and 1-indexed pid
        self.filenames = ["-1"] * self.num_grids
        for id, pid in enumerate(reverse_tree):
            grids.append(self.grid(id + 1, self))
            grids[-1].Level = self.grid_levels[id, 0]
            if pid > 0:
                grids[-1]._parent_id = pid
                grids[pid - 1]._children_ids.append(grids[-1].id)
        self.max_level = self.grid_levels.max()
        mylog.debug("Preparing grids")
        self.grids = np.empty(len(grids), dtype="object")
        for i, grid in enumerate(grids):
            if (i % 1e4) == 0:
                mylog.debug("Prepared % 7i / % 7i grids", i, self.num_grids)
            grid.filename = "Inline_processor_%07i" % (self.grid_procs[i, 0])
            grid._prepare_grid()
            grid._setup_dx()
            grid.proc_num = self.grid_procs[i, 0]
            self.grids[i] = grid
        mylog.debug("Prepared")

    def _initialize_grid_arrays(self):
        EnzoHierarchy._initialize_grid_arrays(self)
        self.grid_procs = np.zeros((self.num_grids, 1), "int32")

    def _copy_index_structure(self):
        # Dimensions are important!
        self.grid_dimensions[:] = self.enzo.hierarchy_information["GridEndIndices"][:]
        self.grid_dimensions -= self.enzo.hierarchy_information["GridStartIndices"][:]
        self.grid_dimensions += 1
        self.grid_left_edge[:] = self.enzo.hierarchy_information["GridLeftEdge"][:]
        self.grid_right_edge[:] = self.enzo.hierarchy_information["GridRightEdge"][:]
        self.grid_levels[:] = self.enzo.hierarchy_information["GridLevels"][:]
        self.grid_procs = self.enzo.hierarchy_information["GridProcs"].copy()
        self.grid_particle_count[:] = self.enzo.hierarchy_information[
            "GridNumberOfParticles"
        ][:]
    def save_data(self, *args, **kwargs):
        pass
    _cached_field_list = None
    _cached_derived_field_list = None

    def _generate_random_grids(self):
        my_rank = self.comm.rank
        my_grids = self.grids[self.grid_procs.ravel() == my_rank]
        if len(my_grids) > 40:
            rng = np.random.default_rng()
            starter = rng.integers(0, 20)
            random_sample = np.mgrid[starter : len(my_grids) - 1 : 20j].astype("int32")
            mylog.debug("Checking grids: %s", random_sample.tolist())
        else:
            random_sample = np.mgrid[0 : max(len(my_grids) - 1, 1)].astype("int32")
        return my_grids[(random_sample,)]

    def _chunk_io(self, dobj, cache=True, local_only=False):
        gfiles = defaultdict(list)
        gobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
        for g in gobjs:
            gfiles[g.filename].append(g)
        for fn in sorted(gfiles):
            if local_only:
                gobjs = [g for g in gfiles[fn] if g.proc_num == self.comm.rank]
                gfiles[fn] = gobjs
            gs = gfiles[fn]
            count = self._count_selection(dobj, gs)
            yield YTDataChunk(dobj, "io", gs, count, cache=cache)

class EnzoHierarchy1D(EnzoHierarchy):
    def _fill_arrays(self, ei, si, LE, RE, npart, nap):
        self.grid_dimensions[:, :1] = ei
        self.grid_dimensions[:, :1] -= np.array(si, dtype="i4")
        self.grid_dimensions += 1
        self.grid_left_edge[:, :1] = LE
        self.grid_right_edge[:, :1] = RE
        self.grid_particle_count.flat[:] = npart
        self.grid_left_edge[:, 1:] = 0.0
        self.grid_right_edge[:, 1:] = 1.0
        self.grid_dimensions[:, 1:] = 1
        if nap is not None:
            raise NotImplementedError

class EnzoHierarchy2D(EnzoHierarchy):
    def _fill_arrays(self, ei, si, LE, RE, npart, nap):
        self.grid_dimensions[:, :2] = ei
        self.grid_dimensions[:, :2] -= np.array(si, dtype="i4")
        self.grid_dimensions += 1
        self.grid_left_edge[:, :2] = LE
        self.grid_right_edge[:, :2] = RE
        self.grid_particle_count.flat[:] = npart
        self.grid_left_edge[:, 2] = 0.0
        self.grid_right_edge[:, 2] = 1.0
        self.grid_dimensions[:, 2] = 1
        if nap is not None:
            raise NotImplementedError

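
# Both subclasses above pad the unused axes so downstream machinery can assume
# 3D arrays: missing axes get a [0, 1] domain extent and a single cell. A
# standalone sketch of the same arithmetic (values are illustrative only):
#
#     import numpy as np
#     si, ei = np.array([[3]]), np.array([[63]])  # one hypothetical 1D grid
#     dims = np.ones((1, 3), dtype="i4")
#     dims[:, :1] = ei - si + 1                   # 61 active cells on x
#     # dims == [[61, 1, 1]]; y and z collapse to a single cell
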
class EnzoDataset(Dataset):
    """
    Enzo-specific output, set at a fixed time.
    """

    _load_requirements = ["h5py"]
    _index_class = EnzoHierarchy
    _field_info_class = EnzoFieldInfo

    def __init__(
        self,
        filename,
        dataset_type=None,
        parameter_override=None,
        conversion_override=None,
        storage_filename=None,
        units_override=None,
        unit_system="cgs",
        default_species_fields=None,
    ):
        """
        This class is a stripped down class that simply reads and parses
        *filename* without looking at the index.  *dataset_type* gets passed
        to the index to pre-determine the style of data-output.  However, it
        is not strictly necessary.  Optionally you may specify a
        *parameter_override* dictionary that will override anything in the
        parameter file and a *conversion_override* dictionary that consists
        of {fieldname : conversion_to_cgs} that will override the #DataCGS.
        """
        self.fluid_types += ("enzo",)
        if filename.endswith(".hierarchy"):
            filename = filename[:-10]
        if parameter_override is None:
            parameter_override = {}
        self._parameter_override = parameter_override
        if conversion_override is None:
            conversion_override = {}
        self._conversion_override = conversion_override
        self.storage_filename = storage_filename
        Dataset.__init__(
            self,
            filename,
            dataset_type,
            units_override=units_override,
            unit_system=unit_system,
            default_species_fields=default_species_fields,
        )

    def _setup_1d(self):
        self._index_class = EnzoHierarchy1D
        self.domain_left_edge = np.concatenate([[self.domain_left_edge], [0.0, 0.0]])
        self.domain_right_edge = np.concatenate([[self.domain_right_edge], [1.0, 1.0]])

    def _setup_2d(self):
        self._index_class = EnzoHierarchy2D
        self.domain_left_edge = np.concatenate([self.domain_left_edge, [0.0]])
        self.domain_right_edge = np.concatenate([self.domain_right_edge, [1.0]])
    def get_parameter(self, parameter, type=None):
        """
        Gets a parameter not in the parameterDict.
        """
        if parameter in self.parameters:
            return self.parameters[parameter]

        for line in open(self.parameter_filename):
            if line.find("#") >= 1:  # Keep the commented lines
                line = line[: line.find("#")]
            line = line.strip().rstrip()
            if len(line) < 2:
                continue
            try:
                param, vals = (s.strip() for s in line.split("=", 1))
            except ValueError:
                mylog.error("ValueError: '%s'", line)
                continue
            if parameter == param:
                if type is None:
                    t = vals.split()
                else:
                    t = [type(v) for v in vals.split()]
                if len(t) == 1:
                    self.parameters[param] = t[0]
                else:
                    self.parameters[param] = t
                if param.endswith("Units") and not param.startswith("Temperature"):
                    dataType = param[:-5]
                    self.conversion_factors[dataType] = self.parameters[param]
                return self.parameters[parameter]

        return ""
    @cached_property
    def unique_identifier(self) -> str:
        if "CurrentTimeIdentifier" in self.parameters:
            # enzo2
            return str(self.parameters["CurrentTimeIdentifier"])
        elif "MetaDataDatasetUUID" in self.parameters:
            # enzo2
            return str(self.parameters["MetaDataDatasetUUID"])
        elif "Internal" in self.parameters:
            # enzo3
            return str(
                self.parameters["Internal"]["Provenance"]["CurrentTimeIdentifier"]
            )
        else:
            return super().unique_identifier

    def _parse_parameter_file(self):
        """
        Parses the parameter file and establishes the various dictionaries.
        """
        # Let's read the file
        with open(self.parameter_filename) as f:
            line = f.readline().strip()
            f.seek(0)
            if line == "Internal:":
                self._parse_enzo3_parameter_file(f)
            else:
                self._parse_enzo2_parameter_file(f)

    def _parse_enzo3_parameter_file(self, f):
        self.parameters = p = libconf.load(f)
        sim = p["SimulationControl"]
        internal = p["Internal"]
        phys = p["Physics"]
        self.refine_by = sim["AMR"]["RefineBy"]
        self._periodicity = tuple(
            a == 3 for a in sim["Domain"]["LeftFaceBoundaryCondition"]
        )
        self.dimensionality = sim["Domain"]["TopGridRank"]
        self.domain_dimensions = np.array(
            sim["Domain"]["TopGridDimensions"], dtype="int64"
        )
        self.domain_left_edge = np.array(
            sim["Domain"]["DomainLeftEdge"], dtype="float64"
        )
        self.domain_right_edge = np.array(
            sim["Domain"]["DomainRightEdge"], dtype="float64"
        )
        self.gamma = phys["Hydro"]["Gamma"]
        self.current_time = internal["InitialTime"]
        self.cosmological_simulation = phys["Cosmology"]["ComovingCoordinates"]
        if self.cosmological_simulation == 1:
            cosmo = phys["Cosmology"]
            self.current_redshift = internal["CosmologyCurrentRedshift"]
            self.omega_lambda = cosmo["OmegaLambdaNow"]
            self.omega_matter = cosmo["OmegaMatterNow"]
            self.hubble_constant = cosmo["HubbleConstantNow"]
        else:
            self.current_redshift = 0.0
            self.omega_lambda = 0.0
            self.omega_matter = 0.0
            self.hubble_constant = 0.0
            self.cosmological_simulation = 0
        self.particle_types = ["DarkMatter"] + phys["ActiveParticles"][
            "ActiveParticlesEnabled"
        ]
        self.particle_types = tuple(self.particle_types)
        self.particle_types_raw = self.particle_types
        if self.dimensionality == 1:
            self._setup_1d()
        elif self.dimensionality == 2:
            self._setup_2d()

    def _parse_enzo2_parameter_file(self, f):
        for line in (l.strip() for l in f):
            if (len(line) < 2) or ("=" not in line):
                continue
            param, vals = (i.strip() for i in line.split("=", 1))
            # First we try to decipher what type of value it is.
            vals = vals.split()
            # Special case approaching.
            if "(do" in vals:
                vals = vals[:1]
            if len(vals) == 0:
                pcast = str  # Assume NULL output
            else:
                v = vals[0]
                # Figure out if it's castable to floating point:
                try:
                    float(v)
                except ValueError:
                    pcast = str
                else:
                    if any("." in v or "e+" in v or "e-" in v for v in vals):
                        pcast = float
                    elif v == "inf":
                        pcast = str
                    else:
                        pcast = int
            # Now we figure out what to do with it.
            if len(vals) == 0:
                vals = ""
            elif len(vals) == 1:
                vals = pcast(vals[0])
            else:
                vals = np.array([pcast(i) for i in vals if i != "-99999"])
            if param.startswith("Append"):
                if param not in self.parameters:
                    self.parameters[param] = []
                self.parameters[param].append(vals)
            else:
                self.parameters[param] = vals
        self.refine_by = self.parameters["RefineBy"]
        _periodicity = tuple(
            always_iterable(self.parameters["LeftFaceBoundaryCondition"] == 3)
        )
        self.dimensionality = self.parameters["TopGridRank"]
        if self.dimensionality > 1:
            self.domain_dimensions = self.parameters["TopGridDimensions"]
            if len(self.domain_dimensions) < 3:
                tmp = self.domain_dimensions.tolist()
                tmp.append(1)
                self.domain_dimensions = np.array(tmp)
                _periodicity += (False,)
            self.domain_left_edge = np.array(
                self.parameters["DomainLeftEdge"], "float64"
            ).copy()
            self.domain_right_edge = np.array(
                self.parameters["DomainRightEdge"], "float64"
            ).copy()
        else:
            self.domain_left_edge = np.array(
                self.parameters["DomainLeftEdge"], "float64"
            )
            self.domain_right_edge = np.array(
                self.parameters["DomainRightEdge"], "float64"
            )
            self.domain_dimensions = np.array(
                [self.parameters["TopGridDimensions"], 1, 1]
            )
            _periodicity += (False, False)
        assert len(_periodicity) == 3
        self._periodicity = _periodicity
        self.gamma = self.parameters["Gamma"]
        if self.parameters["ComovingCoordinates"]:
            self.cosmological_simulation = 1
            self.current_redshift = self.parameters["CosmologyCurrentRedshift"]
            self.omega_lambda = self.parameters["CosmologyOmegaLambdaNow"]
            self.omega_matter = self.parameters["CosmologyOmegaMatterNow"]
            self.omega_radiation = self.parameters.get(
                "CosmologyOmegaRadiationNow", 0.0
            )
            self.hubble_constant = self.parameters["CosmologyHubbleConstantNow"]
        else:
            self.current_redshift = 0.0
            self.omega_lambda = 0.0
            self.omega_matter = 0.0
            self.hubble_constant = 0.0
            self.cosmological_simulation = 0
        self.particle_types = []
        self.current_time = self.parameters["InitialTime"]
        if (
            self.parameters["NumberOfParticles"] > 0
            and "AppendActiveParticleType" in self.parameters.keys()
        ):
            # If this is the case, then we know we should have a DarkMatter
            # particle type, and we don't need the "io" type.
            self.parameters["AppendActiveParticleType"].append("DarkMatter")
        else:
            # We do not have an "io" type for Enzo particles if the
            # ActiveParticle machinery is on, as we simply will ignore any of
            # the non-DarkMatter particles in that case.  However, for older
            # datasets, we call this particle type "io".
self.particle_types = ["io"] for ptype in self.parameters.get("AppendActiveParticleType", []): self.particle_types.append(ptype) self.particle_types = tuple(self.particle_types) self.particle_types_raw = self.particle_types if self.dimensionality == 1: self._setup_1d() elif self.dimensionality == 2: self._setup_2d() def _set_code_unit_attributes(self): if self.cosmological_simulation: k = cosmology_get_units( self.hubble_constant, self.omega_matter, self.parameters["CosmologyComovingBoxSize"], self.parameters["CosmologyInitialRedshift"], self.current_redshift, ) # Now some CGS values box_size = self.parameters["CosmologyComovingBoxSize"] setdefaultattr(self, "length_unit", self.quan(box_size, "Mpccm/h")) setdefaultattr( self, "mass_unit", self.quan(k["urho"], "g/cm**3") * (self.length_unit.in_cgs()) ** 3, ) setdefaultattr(self, "time_unit", self.quan(k["utim"], "s")) setdefaultattr(self, "velocity_unit", self.quan(k["uvel"], "cm/s")) else: if "LengthUnits" in self.parameters: length_unit = self.parameters["LengthUnits"] mass_unit = self.parameters["DensityUnits"] * length_unit**3 time_unit = self.parameters["TimeUnits"] elif "SimulationControl" in self.parameters: units = self.parameters["SimulationControl"]["Units"] length_unit = units["Length"] mass_unit = units["Density"] * length_unit**3 time_unit = units["Time"] else: mylog.warning("Setting 1.0 in code units to be 1.0 cm") mylog.warning("Setting 1.0 in code units to be 1.0 s") length_unit = mass_unit = time_unit = 1.0 setdefaultattr(self, "length_unit", self.quan(length_unit, "cm")) setdefaultattr(self, "mass_unit", self.quan(mass_unit, "g")) setdefaultattr(self, "time_unit", self.quan(time_unit, "s")) setdefaultattr(self, "velocity_unit", self.length_unit / self.time_unit) density_unit = self.mass_unit / self.length_unit**3 magnetic_unit = np.sqrt(4 * np.pi * density_unit) * self.velocity_unit magnetic_unit = np.float64(magnetic_unit.in_cgs()) setdefaultattr(self, "magnetic_unit", self.quan(magnetic_unit, "gauss")) @classmethod def _is_valid(cls, filename: str, *args, **kwargs) -> bool: return filename.endswith(".hierarchy") or os.path.exists( f"{filename}.hierarchy" ) @classmethod def _guess_candidates(cls, base, directories, files): candidates = [ _ for _ in files if _.endswith(".hierarchy") and os.path.exists(os.path.join(base, _.rsplit(".", 1)[0])) ] # Typically, Enzo won't have nested outputs. return candidates, (len(candidates) == 0)
class EnzoDatasetInMemory(EnzoDataset):
    _index_class = EnzoHierarchyInMemory
    _dataset_type = "enzo_inline"

    def __init__(self, parameter_override=None, conversion_override=None):
        self.fluid_types += ("enzo",)
        if parameter_override is None:
            parameter_override = {}
        self._parameter_override = parameter_override
        if conversion_override is None:
            conversion_override = {}
        self._conversion_override = conversion_override
        Dataset.__init__(self, "InMemoryParameterFile", self._dataset_type)

    def _parse_parameter_file(self):
        enzo = self._obtain_enzo()
        ncalls = enzo.yt_parameter_file["NumberOfPythonCalls"]
        self._input_filename = f"cycle{ncalls:08d}"
        self.parameters["CurrentTimeIdentifier"] = time.time()
        self.parameters.update(enzo.yt_parameter_file)
        self.conversion_factors.update(enzo.conversion_factors)
        for i in self.parameters:
            if isinstance(self.parameters[i], tuple):
                self.parameters[i] = np.array(self.parameters[i])
            if i.endswith("Units") and not i.startswith("Temperature"):
                dataType = i[:-5]
                self.conversion_factors[dataType] = self.parameters[i]
        self.domain_left_edge = self.parameters["DomainLeftEdge"].copy()
        self.domain_right_edge = self.parameters["DomainRightEdge"].copy()
        for i in self.conversion_factors:
            if isinstance(self.conversion_factors[i], tuple):
                self.conversion_factors[i] = np.array(self.conversion_factors[i])
        for p, v in self._parameter_override.items():
            self.parameters[p] = v
        for p, v in self._conversion_override.items():
            self.conversion_factors[p] = v
        self.refine_by = self.parameters["RefineBy"]
        self._periodicity = tuple(
            always_iterable(self.parameters["LeftFaceBoundaryCondition"] == 3)
        )
        self.dimensionality = self.parameters["TopGridRank"]
        self.domain_dimensions = self.parameters["TopGridDimensions"]
        self.current_time = self.parameters["InitialTime"]
        if self.parameters["ComovingCoordinates"]:
            self.cosmological_simulation = 1
            self.current_redshift = self.parameters["CosmologyCurrentRedshift"]
            self.omega_lambda = self.parameters["CosmologyOmegaLambdaNow"]
            self.omega_matter = self.parameters["CosmologyOmegaMatterNow"]
            self.hubble_constant = self.parameters["CosmologyHubbleConstantNow"]
        else:
            self.current_redshift = 0.0
            self.omega_lambda = 0.0
            self.omega_matter = 0.0
            self.hubble_constant = 0.0
            self.cosmological_simulation = 0

    def _obtain_enzo(self):
        import enzo

        return enzo

    @classmethod
    def _is_valid(cls, filename: str, *args, **kwargs) -> bool:
        return False

# These next two functions are taken from
# http://www.reddit.com/r/Python/comments/6hj75/reverse_file_iterator/c03vms4
# Credit goes to "Brian" on Reddit.
def rblocks(f, blocksize=4096):
    """Read file as series of blocks from end of file to start.

    The data itself is in normal order, only the order of the blocks is
    reversed, i.e. "hello world" -> ["ld", "wor", "lo ", "hel"].
    Note that the file must be opened in binary mode.
    """
    if "b" not in f.mode.lower():
        raise Exception("File must be opened using binary mode.")
    size = os.stat(f.name).st_size
    fullblocks, lastblock = divmod(size, blocksize)

    # The first (end of file) block will be short, since this leaves
    # the rest aligned on a blocksize boundary.  This may be more
    # efficient than having the last (first in file) block be short.
    f.seek(-lastblock, 2)
    yield f.read(lastblock).decode("ascii")

    for i in range(fullblocks - 1, -1, -1):
        f.seek(i * blocksize)
        yield f.read(blocksize).decode("ascii")

def rlines(f, keepends=False):
    """Iterate through the lines of a file in reverse order.

    If keepends is true, line endings are kept as part of the line.
    """
    buf = ""
    for block in rblocks(f):
        buf = block + buf
        lines = buf.splitlines(keepends)
        # Return all lines except the first (since it may be partial).
        if lines:
            lines.reverse()
            buf = lines.pop()  # Last line becomes end of new first line.
            yield from lines
    yield buf  # First line.
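
# A minimal usage sketch (not part of the module): rlines() is how
# EnzoHierarchy._count_grids scans a .hierarchy file from the end, finding the
# highest-numbered "Grid = N" entry without reading the whole file forward.
# The filename below is hypothetical; the file must be opened in binary mode.
#
#     with open("DD0046/DD0046.hierarchy", "rb") as f:
#         for line in rlines(f):
#             if line.startswith("Grid = "):
#                 print("last grid entry:", line)
#                 break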