Source code for yt.frontends.ramses.data_structures

import os
import weakref
from collections import defaultdict
from itertools import product
from pathlib import Path
from typing import Optional

import numpy as np

from yt.arraytypes import blankRecordArray
from yt.data_objects.index_subobjects.octree_subset import OctreeSubset
from yt.data_objects.particle_filters import add_particle_filter
from yt.data_objects.static_output import Dataset
from yt.funcs import mylog, setdefaultattr
from yt.geometry.geometry_handler import YTDataChunk
from yt.geometry.oct_container import RAMSESOctreeContainer
from yt.geometry.oct_geometry_handler import OctreeIndex
from yt.utilities.cython_fortran_utils import FortranFile as fpu
from yt.utilities.lib.cosmology_time import friedman
from yt.utilities.on_demand_imports import _f90nml as f90nml
from yt.utilities.physical_constants import kb, mp
from yt.utilities.physical_ratios import cm_per_mpc

from .definitions import (
    OUTPUT_DIR_EXP,
    OUTPUT_DIR_RE,
    STANDARD_FILE_RE,
    field_aliases,
    particle_families,
    ramses_header,
)
from .field_handlers import get_field_handlers
from .fields import _X, RAMSESFieldInfo
from .hilbert import get_cpu_list
from .io_utils import fill_hydro, read_amr
from .particle_handlers import get_particle_handlers


class RAMSESFileSanitizer:
    """A class to handle the different files that can be passed and associated
    safely to a RAMSES output."""

    root_folder = None  # Path | None: path to the root folder
    info_fname = None  # Path | None: path to the info file
    group_name = None  # str | None: name of the first group folder (if any)

    def __init__(self, filename):
        # Make the resolve optional, so that it works with symlinks
        paths_to_try = (Path(filename), Path(filename).resolve())

        self.original_filename = filename

        self.output_dir = None
        self.info_fname = None

        check_functions = (self.test_with_standard_file, self.test_with_folder_name)

        # Loop on both the functions and the tested paths
        for path, check_fun in product(paths_to_try, check_functions):
            ok, output_dir, group_dir, info_fname = check_fun(path)
            if ok:
                break

        # Early exit if the ok flag is False
        if not ok:
            return

        self.root_folder = output_dir
        self.group_name = group_dir.name if group_dir else None
        self.info_fname = info_fname
    def validate(self) -> None:
        # Raise a TypeError if self.original_filename is not a valid path;
        # we also want to expand '$USER' and '~' because os.path.exists('~')
        # is always False.
        filename: str = os.path.expanduser(self.original_filename)
        if not os.path.exists(filename):
            raise FileNotFoundError(rf"No such file or directory '{filename!s}'")
        if self.root_folder is None:
            raise ValueError(
                f"Could not determine output directory from '{filename!s}'\n"
                f"Expected a directory name of form {OUTPUT_DIR_EXP!r} "
                "containing an info_*.txt file and amr_* files."
            )

        # This last case is (erroneously?) marked as unreachable by mypy.
        # If/when this bug is fixed upstream, mypy will warn that the unused
        # 'type: ignore' comment can be removed.
        if self.info_fname is None:  # type: ignore [unreachable]
            raise ValueError(f"Failed to detect info file from '{filename!s}'")
    @property
    def is_valid(self) -> bool:
        try:
            self.validate()
        except (TypeError, FileNotFoundError, ValueError):
            return False
        else:
            return True
    @staticmethod
    def check_standard_files(folder, iout):
        """Return True if the folder contains an amr file and the info file."""
        # Check that the "amr_" and "info_" files exist
        ok = (folder / f"amr_{iout}.out00001").is_file()
        ok &= (folder / f"info_{iout}.txt").is_file()
        return ok
    @staticmethod
    def _match_output_and_group(
        path: Path,
    ) -> tuple[Path, Optional[Path], Optional[str]]:
        # Make sure we work with a directory of the form `output_XXXXX`
        for p in (path, path.parent):
            match = OUTPUT_DIR_RE.match(p.name)
            if match:
                path = p
                break

        if match is None:
            return path, None, None

        iout = match.group(1)

        # See whether a folder named `group_YYYYY` exists
        group_dir = path / "group_00001"
        if group_dir.is_dir():
            return path, group_dir, iout
        else:
            return path, None, iout
    @classmethod
    def test_with_folder_name(
        cls, output_dir: Path
    ) -> tuple[bool, Optional[Path], Optional[Path], Optional[Path]]:
        output_dir, group_dir, iout = cls._match_output_and_group(output_dir)

        ok = output_dir.is_dir() and iout is not None

        info_fname: Optional[Path]
        if ok:
            parent_dir = group_dir or output_dir
            ok &= cls.check_standard_files(parent_dir, iout)
            info_fname = parent_dir / f"info_{iout}.txt"
        else:
            info_fname = None

        return ok, output_dir, group_dir, info_fname
    @classmethod
    def test_with_standard_file(
        cls, filename: Path
    ) -> tuple[bool, Optional[Path], Optional[Path], Optional[Path]]:
        output_dir, group_dir, iout = cls._match_output_and_group(filename.parent)

        ok = (
            filename.is_file()
            and STANDARD_FILE_RE.match(filename.name) is not None
            and iout is not None
        )

        info_fname: Optional[Path]
        if ok:
            parent_dir = group_dir or output_dir
            ok &= cls.check_standard_files(parent_dir, iout)
            info_fname = parent_dir / f"info_{iout}.txt"
        else:
            info_fname = None

        return ok, output_dir, group_dir, info_fname
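
# Illustrative usage sketch (not part of the yt API): validating a candidate
# RAMSES output by hand before loading it. The path "output_00080" is a
# hypothetical example.
def _example_sanitize(path="output_00080"):
    sanitizer = RAMSESFileSanitizer(path)
    if sanitizer.is_valid:
        # root_folder, group_name and info_fname were filled in by __init__
        print(sanitizer.root_folder, sanitizer.group_name, sanitizer.info_fname)
    else:
        # validate() raises an exception explaining exactly what is missing
        sanitizer.validate()
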
class RAMSESDomainFile:
    _last_mask = None
    _last_selector_id = None

    def __init__(self, ds, domain_id):
        self.ds = ds
        self.domain_id = domain_id

        num = os.path.basename(ds.parameter_filename).split(".")[0].split("_")[1]
        rootdir = ds.root_folder
        basedir = os.path.abspath(os.path.dirname(ds.parameter_filename))
        part_file_descriptor = f"{basedir}/part_file_descriptor.txt"
        if ds.num_groups > 0:
            igroup = ((domain_id - 1) // ds.group_size) + 1
            basename = "%s/group_%05i/%%s_%s.out%05i" % (
                rootdir,
                igroup,
                num,
                domain_id,
            )
        else:
            basename = "%s/%%s_%s.out%05i" % (basedir, num, domain_id)
        for t in ["grav", "amr"]:
            setattr(self, f"{t}_fn", basename % t)
        self._part_file_descriptor = part_file_descriptor
        self._read_amr_header()

        # Autodetect field files
        field_handlers = [FH(self) for FH in get_field_handlers() if FH.any_exist(ds)]
        self.field_handlers = field_handlers
        for fh in field_handlers:
            mylog.debug("Detected fluid type %s in domain_id=%s", fh.ftype, domain_id)
            fh.detect_fields(ds)
            # self._add_ftype(fh.ftype)

        # Autodetect particle files
        particle_handlers = [
            PH(self) for PH in get_particle_handlers() if PH.any_exist(ds)
        ]
        self.particle_handlers = particle_handlers
        for ph in particle_handlers:
            mylog.debug(
                "Detected particle type %s in domain_id=%s", ph.ptype, domain_id
            )
            ph.read_header()
            # self._add_ptype(ph.ptype)

        # Load the AMR structure
        self._read_amr()

    _hydro_offset = None
    _level_count = None

    def __repr__(self):
        return "RAMSESDomainFile: %i" % self.domain_id

    @property
    def level_count(self):
        lvl_count = None
        for fh in self.field_handlers:
            # Accessing fh.offset triggers the computation of the offsets
            # and level counts as a side effect
            fh.offset
            if lvl_count is None:
                lvl_count = fh.level_count.copy()
            else:
                lvl_count += fh._level_count
        return lvl_count

    @property
    def amr_file(self):
        if hasattr(self, "_amr_file"):
            self._amr_file.seek(0)
            return self._amr_file

        f = fpu(self.amr_fn)
        self._amr_file = f
        f.seek(0)
        return f

    def _read_amr_header(self):
        hvals = {}
        f = self.amr_file
        f.seek(0)

        for header in ramses_header(hvals):
            hvals.update(f.read_attrs(header))
        # For speedup, skip reading of 'headl' and 'taill'
        f.skip(2)
        hvals["numbl"] = f.read_vector("i")

        # That's the header, now we skip a few.
        hvals["numbl"] = np.array(hvals["numbl"]).reshape(
            (hvals["nlevelmax"], hvals["ncpu"])
        )
        f.skip()
        if hvals["nboundary"] > 0:
            f.skip(2)
            self.ngridbound = f.read_vector("i").astype("int64")
        else:
            self.ngridbound = np.zeros(hvals["nlevelmax"], dtype="int64")
        free_mem = f.read_attrs((("free_mem", 5, "i"),))  # NOQA
        ordering = f.read_vector("c")  # NOQA
        f.skip(4)
        # Now we're at the tree itself
        # Now we iterate over each level and each CPU.
        self.amr_header = hvals

        # Update nlevelmax if the user imposed a maximum level
        force_max_level, convention = self.ds._force_max_level
        if convention == "yt":
            force_max_level += self.ds.min_level + 1
        self.amr_header["nlevelmax"] = min(
            force_max_level, self.amr_header["nlevelmax"]
        )

        self.amr_offset = f.tell()
        self.local_oct_count = hvals["numbl"][
            self.ds.min_level :, self.domain_id - 1
        ].sum()
        self.total_oct_count = hvals["numbl"][self.ds.min_level :, :].sum(axis=0)

    def _read_amr(self):
        """Open the oct file, read in octs level-by-level.

        For each oct, only the position, index, level and domain are needed -
        its position in the octree is found automatically.
        The most important is finding all the information to feed
        oct_handler.add
        """
        self.oct_handler = RAMSESOctreeContainer(
            self.ds.domain_dimensions / 2,
            self.ds.domain_left_edge,
            self.ds.domain_right_edge,
        )
        root_nodes = self.amr_header["numbl"][self.ds.min_level, :].sum()
        self.oct_handler.allocate_domains(self.total_oct_count, root_nodes)
        mylog.debug(
            "Reading domain AMR % 4i (%0.3e, %0.3e)",
            self.domain_id,
            self.total_oct_count.sum(),
            self.ngridbound.sum(),
        )

        f = self.amr_file
        f.seek(self.amr_offset)

        min_level = self.ds.min_level
        max_level = read_amr(
            f, self.amr_header, self.ngridbound, min_level, self.oct_handler
        )

        self.max_level = max_level
        self.oct_handler.finalize()

        # Close AMR file
        f.close()
    def included(self, selector):
        if getattr(selector, "domain_id", None) is not None:
            return selector.domain_id == self.domain_id
        domain_ids = self.oct_handler.domain_identify(selector)
        return self.domain_id in domain_ids
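
# Illustrative sketch (assuming a flat output with no group_XXXXX subfolders;
# the path and numbers are hypothetical): RAMSESDomainFile derives its per-CPU
# file names from the pattern "<kind>_<iout>.out<icpu>", mirroring the
# basename logic in __init__ above.
def _example_domain_filenames(basedir="output_00080", iout="00080", domain_id=3):
    basename = "%s/%%s_%s.out%05i" % (basedir, iout, domain_id)
    return [basename % kind for kind in ("amr", "grav")]
    # -> ["output_00080/amr_00080.out00003", "output_00080/grav_00080.out00003"]
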
class RAMSESDomainSubset(OctreeSubset):
    _domain_offset = 1
    _block_order = "F"

    _base_domain = None

    def __init__(
        self,
        base_region,
        domain,
        ds,
        num_zones=2,
        num_ghost_zones=0,
        base_grid=None,
    ):
        super().__init__(base_region, domain, ds, num_zones, num_ghost_zones)

        self._base_grid = base_grid

        if num_ghost_zones > 0:
            if not all(ds.periodicity):
                mylog.warning(
                    "Ghost zones will wrongly assume the domain to be periodic."
                )
            # Create a base domain *with no ghost zones* so that we can
            # access self._base_domain.fwidth
            base_domain = RAMSESDomainSubset(ds.all_data(), domain, ds, num_zones)
            self._base_domain = base_domain
        elif num_ghost_zones < 0:
            raise RuntimeError(
                "Cannot initialize a domain subset with a negative number "
                "of ghost zones, was called with num_ghost_zones=%s"
                % num_ghost_zones
            )

    def _fill_no_ghostzones(self, fd, fields, selector, file_handler):
        ndim = self.ds.dimensionality
        # Here we get a copy of the file, which we skip through and read the
        # bits we want.
        oct_handler = self.oct_handler
        all_fields = [f for ft, f in file_handler.field_list]
        fields = [f for ft, f in fields]
        data = {}
        cell_count = selector.count_oct_cells(self.oct_handler, self.domain_id)

        levels, cell_inds, file_inds = self.oct_handler.file_index_octs(
            selector, self.domain_id, cell_count
        )

        # Initializing data container
        for field in fields:
            data[field] = np.zeros(cell_count, "float64")

        cpu_list = [self.domain_id - 1]
        fill_hydro(
            fd,
            file_handler.offset,
            file_handler.level_count,
            cpu_list,
            levels,
            cell_inds,
            file_inds,
            ndim,
            all_fields,
            fields,
            data,
            oct_handler,
        )
        return data

    def _fill_with_ghostzones(
        self, fd, fields, selector, file_handler, num_ghost_zones
    ):
        ndim = self.ds.dimensionality
        ncpu = self.ds.parameters["ncpu"]
        # Here we get a copy of the file, which we skip through and read the
        # bits we want.
        oct_handler = self.oct_handler
        all_fields = [f for ft, f in file_handler.field_list]
        fields = [f for ft, f in fields]
        tr = {}

        cell_count = (
            selector.count_octs(self.oct_handler, self.domain_id) * self.nz**ndim
        )

        gz_cache = getattr(self, "_ghost_zone_cache", None)
        if gz_cache:
            levels, cell_inds, file_inds, domains = gz_cache
        else:
            gz_cache = (
                levels,
                cell_inds,
                file_inds,
                domains,
            ) = self.oct_handler.file_index_octs_with_ghost_zones(
                selector, self.domain_id, cell_count, self._num_ghost_zones
            )
            self._ghost_zone_cache = gz_cache

        # Initializing data container
        for field in fields:
            tr[field] = np.zeros(cell_count, "float64")

        cpu_list = list(range(ncpu))
        fill_hydro(
            fd,
            file_handler.offset,
            file_handler.level_count,
            cpu_list,
            levels,
            cell_inds,
            file_inds,
            ndim,
            all_fields,
            fields,
            tr,
            oct_handler,
            domains=domains,
        )
        return tr

    @property
    def fwidth(self):
        fwidth = super().fwidth

        if self._num_ghost_zones > 0:
            fwidth = fwidth.reshape(-1, 8, 3)
            n_oct = fwidth.shape[0]
            # new_fwidth contains the fwidth of the oct+ghost zones
            # this is a constant array in each oct, so we simply copy
            # the oct value using numpy fancy-indexing
            new_fwidth = np.zeros((n_oct, self.nz**3, 3), dtype=fwidth.dtype)
            new_fwidth[:, :, :] = fwidth[:, 0:1, :]
            fwidth = new_fwidth.reshape(-1, 3)
        return fwidth

    @property
    def fcoords(self):
        num_ghost_zones = self._num_ghost_zones
        if num_ghost_zones == 0:
            return super().fcoords

        oh = self.oct_handler

        indices = oh.fill_index(self.selector).reshape(-1, 8)
        oct_inds, cell_inds = oh.fill_octcellindex_neighbours(
            self.selector, self._num_ghost_zones
        )

        N_per_oct = self.nz**3
        oct_inds = oct_inds.reshape(-1, N_per_oct)
        cell_inds = cell_inds.reshape(-1, N_per_oct)

        inds = indices[oct_inds, cell_inds]

        fcoords = self.ds.arr(
            oh.fcoords(self.selector)[inds].reshape(-1, 3), "unitary"
        )

        return fcoords
    def fill(self, fd, fields, selector, file_handler):
        if self._num_ghost_zones == 0:
            return self._fill_no_ghostzones(fd, fields, selector, file_handler)
        else:
            return self._fill_with_ghostzones(
                fd, fields, selector, file_handler, self._num_ghost_zones
            )
    def retrieve_ghost_zones(self, ngz, fields, smoothed=False):
        if smoothed:
            mylog.warning(
                "%s.retrieve_ghost_zones was called with the "
                "`smoothed` argument set to True. This is not supported, "
                "ignoring it.",
                self,
            )
            smoothed = False

        _subset_with_gz = getattr(self, "_subset_with_gz", {})

        try:
            new_subset = _subset_with_gz[ngz]
            mylog.debug(
                "Reusing previous subset with %s ghost zones for domain %s",
                ngz,
                self.domain_id,
            )
        except KeyError:
            new_subset = RAMSESDomainSubset(
                self.base_region,
                self.domain,
                self.ds,
                num_ghost_zones=ngz,
                base_grid=self,
            )
            _subset_with_gz[ngz] = new_subset

        # Cache the fields
        new_subset.get_data(fields)
        self._subset_with_gz = _subset_with_gz

        return new_subset
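
# Illustrative sketch: retrieve_ghost_zones caches one subset per ghost-zone
# count, so asking twice for the same `ngz` reuses the cached subset. Here
# `subset` stands for an existing RAMSESDomainSubset and `fields` for a list
# of fields already detected on the dataset (both hypothetical).
def _example_ghost_zone_cache(subset, fields, ngz=1):
    a = subset.retrieve_ghost_zones(ngz, fields)
    b = subset.retrieve_ghost_zones(ngz, fields)
    assert a is b  # served from subset._subset_with_gz on the second call
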
class RAMSESIndex(OctreeIndex):
    def __init__(self, ds, dataset_type="ramses"):
        self.fluid_field_list = ds._fields_in_file
        self.dataset_type = dataset_type
        self.dataset = weakref.proxy(ds)
        self.index_filename = self.dataset.parameter_filename
        self.directory = os.path.dirname(self.index_filename)

        self.max_level = None

        self.float_type = np.float64
        super().__init__(ds, dataset_type)

    def _initialize_oct_handler(self):
        if self.ds._bbox is not None:
            cpu_list = get_cpu_list(self.dataset, self.dataset._bbox)
        else:
            cpu_list = range(self.dataset["ncpu"])

        self.domains = [RAMSESDomainFile(self.dataset, i + 1) for i in cpu_list]

        total_octs = sum(
            dom.local_oct_count for dom in self.domains  # + dom.ngridbound.sum()
        )

        force_max_level, convention = self.ds._force_max_level
        if convention == "yt":
            force_max_level += self.ds.min_level + 1
        self.max_level = min(
            force_max_level, max(dom.max_level for dom in self.domains)
        )
        self.num_grids = total_octs

    def _detect_output_fields(self):
        dsl = set()

        # Get the detected particle fields
        for domain in self.domains:
            for ph in domain.particle_handlers:
                dsl.update(set(ph.field_offsets.keys()))
        self.particle_field_list = list(dsl)

        cosmo = self.ds.cosmological_simulation
        for f in self.particle_field_list[:]:
            if f[1] == "particle_birth_time" and cosmo:
                self.particle_field_list.append((f[0], "conformal_birth_time"))

        # Get the detected fields
        dsl = set()
        for fh in self.domains[0].field_handlers:
            dsl.update(set(fh.field_list))
        self.fluid_field_list = list(dsl)

        self.field_list = self.particle_field_list + self.fluid_field_list

    def _identify_base_chunk(self, dobj):
        if getattr(dobj, "_chunk_info", None) is None:
            domains = [dom for dom in self.domains if dom.included(dobj.selector)]
            base_region = getattr(dobj, "base_region", dobj)
            if len(domains) > 1:
                mylog.debug("Identified %s intersecting domains", len(domains))
            subsets = [
                RAMSESDomainSubset(
                    base_region,
                    domain,
                    self.dataset,
                    num_ghost_zones=dobj._num_ghost_zones,
                )
                for domain in domains
            ]
            dobj._chunk_info = subsets
        dobj._current_chunk = list(self._chunk_all(dobj))[0]

    def _chunk_all(self, dobj):
        oobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
        yield YTDataChunk(dobj, "all", oobjs, None)

    def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None):
        sobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
        for og in sobjs:
            if ngz > 0:
                g = og.retrieve_ghost_zones(ngz, [], smoothed=True)
            else:
                g = og
            yield YTDataChunk(dobj, "spatial", [g], None)

    def _chunk_io(self, dobj, cache=True, local_only=False):
        oobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
        for subset in oobjs:
            yield YTDataChunk(dobj, "io", [subset], None, cache=cache)

    def _initialize_level_stats(self):
        levels = sum(dom.level_count for dom in self.domains)
        desc = {"names": ["numcells", "level"], "formats": ["int64"] * 2}
        max_level = self.dataset.min_level + self.dataset.max_level + 2
        self.level_stats = blankRecordArray(desc, max_level)
        self.level_stats["level"] = list(range(max_level))
        self.level_stats["numcells"] = [0 for i in range(max_level)]
        for level in range(self.dataset.min_level + 1):
            self.level_stats[level + 1]["numcells"] = 2 ** (
                level * self.dataset.dimensionality
            )
        for level in range(self.max_level + 1):
            self.level_stats[level + self.dataset.min_level + 1]["numcells"] = levels[
                :, level
            ].sum()

    def _get_particle_type_counts(self):
        npart = {k: 0 for k in self.ds.particle_types if k != "all"}
        for dom in self.domains:
            for fh in dom.particle_handlers:
                count = fh.local_particle_count
                npart[fh.ptype] += count
        return npart
    def print_stats(self):
        """
        Prints out (stdout) relevant information about the simulation.

        This function prints information based on the fluid on the grids,
        and therefore does not work for DM only runs.
        """
        if not self.fluid_field_list:
            print("This function is not implemented for DM only runs")
            return

        self._initialize_level_stats()

        header = "{:>3}\t{:>14}\t{:>14}".format("level", "# cells", "# cells^3")
        print(header)
        print(f"{len(header.expandtabs()) * '-'}")
        for level in range(self.dataset.min_level + self.dataset.max_level + 2):
            print(
                "% 3i\t% 14i\t% 14i"
                % (
                    level,
                    self.level_stats["numcells"][level],
                    np.ceil(self.level_stats["numcells"][level] ** (1.0 / 3)),
                )
            )
        print("-" * 46)
        print(" \t% 14i" % (self.level_stats["numcells"].sum()))
        print("\n")

        dx = self.get_smallest_dx()
        try:
            print(f"z = {self.dataset.current_redshift:0.8f}")
        except Exception:
            pass
        print(
            "t = {:0.8e} = {:0.8e} = {:0.8e}".format(
                self.ds.current_time.in_units("code_time"),
                self.ds.current_time.in_units("s"),
                self.ds.current_time.in_units("yr"),
            )
        )
        print("\nSmallest Cell:")
        for item in ("Mpc", "pc", "AU", "cm"):
            print(f"\tWidth: {dx.in_units(item):0.3e}")
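
# Illustrative usage sketch (assumes yt is importable and a RAMSES output at
# the hypothetical path "output_00080"): print_stats is reached through the
# dataset's index.
def _example_print_stats(path="output_00080/info_00080.txt"):
    import yt

    ds = yt.load(path)
    ds.index.print_stats()  # per-level cell counts, current time, smallest cell
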
class RAMSESDataset(Dataset):
    _load_requirements = ["f90nml"]
    _index_class = RAMSESIndex
    _field_info_class = RAMSESFieldInfo
    gamma = 1.4  # This will get replaced on hydro_fn open

    def __init__(
        self,
        filename,
        dataset_type="ramses",
        fields=None,
        storage_filename=None,
        units_override=None,
        unit_system="cgs",
        extra_particle_fields=None,
        cosmological=None,
        bbox=None,
        max_level=None,
        max_level_convention=None,
        default_species_fields=None,
    ):
        """
        fields: An array of hydro variable fields in order of position in the
            hydro_XXXXX.outYYYYY file. If set to None, will try a default set
            of fields.

        extra_particle_fields: An array of extra particle variables in order
            of position in the particle_XXXXX.outYYYYY file.

        cosmological: If set to None, automatically detect cosmological
            simulation. If a boolean, force its value.
        """
        # Here we want to initiate a traceback, if the reader is not built.
        if isinstance(fields, str):
            fields = field_aliases[fields]

        self._fields_in_file = fields
        # By default, extra fields have not triggered a warning
        self._warned_extra_fields = defaultdict(lambda: False)
        self._extra_particle_fields = extra_particle_fields
        self.force_cosmological = cosmological
        self._bbox = bbox

        self._force_max_level = self._sanitize_max_level(
            max_level, max_level_convention
        )

        file_handler = RAMSESFileSanitizer(filename)

        # Ensure validation happens even if the class is instantiated
        # directly rather than from yt.load
        file_handler.validate()

        # Sanitize the filename
        info_fname = file_handler.info_fname

        if file_handler.group_name is not None:
            self.num_groups = len(
                [_ for _ in file_handler.root_folder.glob("group_?????") if _.is_dir()]
            )
        else:
            self.num_groups = 0
        self.root_folder = file_handler.root_folder

        Dataset.__init__(
            self,
            info_fname,
            dataset_type,
            units_override=units_override,
            unit_system=unit_system,
            default_species_fields=default_species_fields,
        )

        # Add the particle types
        ptypes = []
        for PH in get_particle_handlers():
            if PH.any_exist(self):
                ptypes.append(PH.ptype)

        ptypes = tuple(ptypes)
        self.particle_types = self.particle_types_raw = ptypes

        # Add the fluid types
        for FH in get_field_handlers():
            FH.purge_detected_fields(self)
            if FH.any_exist(self):
                self.fluid_types += (FH.ftype,)

        self.storage_filename = storage_filename

    @staticmethod
    def _sanitize_max_level(max_level, max_level_convention):
        # NOTE: the initialisation of the dataset class sets self.min_level
        # _and_ requires force_max_level to be set, so we cannot convert
        # between the yt and ramses conventions here.
        if max_level is None and max_level_convention is None:
            return (2**999, "yt")

        # Check max_level is a valid, positive integer
        if not isinstance(max_level, (int, np.integer)):
            raise TypeError(
                f"Expected `max_level` to be a positive integer, got {max_level} "
                f"with type {type(max_level)} instead."
            )
        if max_level < 0:
            raise ValueError(
                f"Expected `max_level` to be a positive integer, got {max_level} "
                "instead."
            )

        # Check max_level_convention is set and acceptable
        if max_level_convention is None:
            raise ValueError(
                f"Received `max_level`={max_level}, but no `max_level_convention`. "
                "Valid conventions are 'yt' and 'ramses'."
            )
        if max_level_convention not in ("ramses", "yt"):
            raise ValueError(
                f"Invalid convention {max_level_convention}. "
                "Valid choices are 'yt' and 'ramses'."
            )
        return (max_level, max_level_convention)
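
    # Worked example of the two conventions (a sketch; the numbers are
    # hypothetical): for a run with levelmin=7 (so self.min_level == 6),
    # max_level=3 with max_level_convention="yt" is converted downstream via
    # force_max_level += min_level + 1, i.e. it caps the octree at RAMSES
    # level 3 + 6 + 1 = 10, whereas max_level=10 with
    # max_level_convention="ramses" caps it at RAMSES level 10 directly.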
    def create_field_info(self, *args, **kwa):
        """Extend create_field_info to add the particle types."""
        super().create_field_info(*args, **kwa)

        # Register particle filters
        if ("io", "particle_family") in self.field_list:
            for fname, value in particle_families.items():

                def loc(val):
                    def closure(pfilter, data):
                        filter = data[(pfilter.filtered_type, "particle_family")] == val
                        return filter

                    return closure

                add_particle_filter(
                    fname, loc(value), filtered_type="io", requires=["particle_family"]
                )

            for k in particle_families.keys():
                mylog.info("Adding particle_type: %s", k)
                self.add_particle_filter(f"{k}")
    def __str__(self):
        return self.basename.rsplit(".", 1)[0]

    def _set_code_unit_attributes(self):
        """
        Generates the conversion to various physical units based
        on the parameter file
        """
        # Loading the units from the info file
        boxlen = self.parameters["boxlen"]
        length_unit = self.parameters["unit_l"]
        density_unit = self.parameters["unit_d"]
        time_unit = self.parameters["unit_t"]

        # Calculating derived units (except velocity and temperature, done below)
        mass_unit = density_unit * length_unit**3
        magnetic_unit = np.sqrt(4 * np.pi * mass_unit / (time_unit**2 * length_unit))
        pressure_unit = density_unit * (length_unit / time_unit) ** 2

        # TODO:
        # Generalize the temperature field to account for ionization.
        # For now assume an atomic ideal gas with cosmic abundances (x_H = 0.76)
        mean_molecular_weight_factor = _X**-1

        setdefaultattr(self, "density_unit", self.quan(density_unit, "g/cm**3"))
        setdefaultattr(self, "magnetic_unit", self.quan(magnetic_unit, "gauss"))
        setdefaultattr(self, "pressure_unit", self.quan(pressure_unit, "dyne/cm**2"))
        setdefaultattr(self, "time_unit", self.quan(time_unit, "s"))
        setdefaultattr(self, "mass_unit", self.quan(mass_unit, "g"))
        setdefaultattr(
            self, "velocity_unit", self.quan(length_unit, "cm") / self.time_unit
        )
        temperature_unit = (
            self.velocity_unit**2 * mp * mean_molecular_weight_factor / kb
        )
        setdefaultattr(self, "temperature_unit", temperature_unit.in_units("K"))

        # Only the length unit gets scaled by a factor of boxlen
        setdefaultattr(self, "length_unit", self.quan(length_unit * boxlen, "cm"))

    def _parse_parameter_file(self):
        # Hardcoded for now.
        # These should be explicitly obtained from the file, but for now that
        # will wait until a reorganization of the source tree and better
        # generalization.
        self.dimensionality = 3
        self.refine_by = 2
        self.parameters["HydroMethod"] = "ramses"
        self.parameters["Time"] = 1.0  # default unit is 1...

        # We now execute the same logic Oliver's code does
        rheader = {}

        def read_rhs(f, cast):
            line = f.readline().strip()
            if line and "=" in line:
                key, val = (_.strip() for _ in line.split("="))
                rheader[key] = cast(val)
                return key
            else:
                return None

        def cast_a_else_b(cast_a, cast_b):
            def caster(val):
                try:
                    return cast_a(val)
                except ValueError:
                    return cast_b(val)

            return caster

        with open(self.parameter_filename) as f:
            # Standard: the first six entries are ncpu, ndim, levelmin,
            # levelmax, ngridmax, nstep_coarse
            for _ in range(6):
                read_rhs(f, int)
            f.readline()

            # Standard: the next 11 entries are boxlen, time, aexp, h0,
            # omega_m, omega_l, omega_k, omega_b, unit_l, unit_d, unit_t
            for _ in range(11):
                key = read_rhs(f, float)

            # Read non-standard extra fields until hitting the ordering type
            while key != "ordering type":
                key = read_rhs(f, cast_a_else_b(float, str))

            # This next line deserves some comment. We specify a min_level that
            # corresponds to the minimum level in the RAMSES simulation. RAMSES
            # is one-indexed, but it also does refer to the *oct* dimensions --
            # so this means that a levelmin of 1 would have *1* oct in it. So a
            # levelmin of 2 would have 8 octs at the root mesh level.
            self.min_level = rheader["levelmin"] - 1

            # Now we read the hilbert indices
            self.hilbert_indices = {}
            if rheader["ordering type"] == "hilbert":
                f.readline()  # header
                for _ in range(rheader["ncpu"]):
                    dom, mi, ma = f.readline().split()
                    self.hilbert_indices[int(dom)] = (float(mi), float(ma))

        if rheader["ordering type"] != "hilbert" and self._bbox is not None:
            raise NotImplementedError(
                "The ordering %s is not compatible with the `bbox` argument."
                % rheader["ordering type"]
            )
        self.parameters.update(rheader)

        self.domain_left_edge = np.zeros(3, dtype="float64")
        self.domain_dimensions = np.ones(3, dtype="int32") * 2 ** (self.min_level + 1)
        self.domain_right_edge = np.ones(3, dtype="float64")
        # This is likely not true, but it's not clear
        # how to determine the boundary conditions
        self._periodicity = (True, True, True)

        if self.force_cosmological is not None:
            is_cosmological = self.force_cosmological
        else:
            # These conditions seem to always be true for
            # non-cosmological datasets
            is_cosmological = not (
                rheader["time"] >= 0 and rheader["H0"] == 1 and rheader["aexp"] == 1
            )

        if not is_cosmological:
            self.cosmological_simulation = 0
            self.current_redshift = 0
            self.hubble_constant = 0
            self.omega_matter = 0
            self.omega_lambda = 0
        else:
            self.cosmological_simulation = 1
            self.current_redshift = (1.0 / rheader["aexp"]) - 1.0
            self.omega_lambda = rheader["omega_l"]
            self.omega_matter = rheader["omega_m"]
            self.hubble_constant = rheader["H0"] / 100.0  # This is H100

        force_max_level, convention = self._force_max_level
        if convention == "yt":
            force_max_level += self.min_level + 1
        self.max_level = min(force_max_level, rheader["levelmax"]) - self.min_level - 1

        if self.cosmological_simulation == 0:
            self.current_time = self.parameters["time"]
        else:
            self.tau_frw, self.t_frw, self.dtau, self.n_frw, self.time_tot = friedman(
                self.omega_matter,
                self.omega_lambda,
                1.0 - self.omega_matter - self.omega_lambda,
            )

            age = self.parameters["time"]
            iage = 1 + int(10.0 * age / self.dtau)
            iage = np.min([iage, self.n_frw // 2 + (iage - self.n_frw // 2) // 10])

            try:
                self.time_simu = self.t_frw[iage] * (age - self.tau_frw[iage - 1]) / (
                    self.tau_frw[iage] - self.tau_frw[iage - 1]
                ) + self.t_frw[iage - 1] * (age - self.tau_frw[iage]) / (
                    self.tau_frw[iage - 1] - self.tau_frw[iage]
                )

                self.current_time = (
                    (self.time_tot + self.time_simu)
                    / (self.hubble_constant * 1e7 / cm_per_mpc)
                    / self.parameters["unit_t"]
                )
            except IndexError:
                mylog.warning(
                    "Yt could not convert conformal time to physical time. "
                    "Yt will assume the simulation is *not* cosmological."
                )
                self.cosmological_simulation = 0
                self.current_time = self.parameters["time"]

        if self.num_groups > 0:
            self.group_size = rheader["ncpu"] // self.num_groups

        # Read the namelist.txt file (if any)
        self.read_namelist()
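
    # Schematic layout of the info_XXXXX.txt file that _parse_parameter_file
    # expects (a sketch; all values here are hypothetical):
    #
    #     ncpu        =         16
    #     ndim        =          3
    #     levelmin    =          7
    #     levelmax    =         14
    #     ngridmax    =     800000
    #     nstep_coarse=       1562
    #     <blank line>
    #     boxlen      =  0.100000000000000E+01
    #     ... 10 more float entries (time, aexp, H0, omega_*, unit_*) ...
    #     <possible non-standard extra entries>
    #     ordering type=hilbert
    #     <one "DOMAIN ind_min ind_max" row per CPU when ordering is hilbert>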
    def read_namelist(self):
        """Read the namelist.txt file in the output folder, if present."""
        namelist_file = os.path.join(self.root_folder, "namelist.txt")
        if os.path.exists(namelist_file):
            try:
                with open(namelist_file) as f:
                    nml = f90nml.read(f)
            except ImportError as e:
                nml = f"An error occurred when reading the namelist: {str(e)}"
            except (ValueError, StopIteration, AssertionError) as err:
                # Note: f90nml may raise a StopIteration, a ValueError or an
                # AssertionError if the namelist is not valid.
                mylog.warning(
                    "Could not parse `namelist.txt` file as it was malformed:",
                    exc_info=err,
                )
                return

            self.parameters["namelist"] = nml
    @classmethod
    def _is_valid(cls, filename: str, *args, **kwargs) -> bool:
        if cls._missing_load_requirements():
            return False

        return RAMSESFileSanitizer(filename).is_valid
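
# End-to-end usage sketch (assumes yt is installed and a RAMSES output at the
# hypothetical path "output_00080"): loading a dataset, capping the refinement
# level, and inspecting what the frontend detected.
def _example_load_ramses():
    import yt

    ds = yt.load(
        "output_00080/info_00080.txt",
        max_level=10,
        max_level_convention="ramses",
    )
    print(ds.particle_types)  # particle types detected from the part files
    print(ds.fluid_types)  # fluid types detected from the hydro files
    print(ds.parameters.get("namelist"))  # parsed namelist.txt, if present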