Source code for yt.frontends.ramses.data_structures

import os
import weakref
from collections import defaultdict
from functools import cached_property
from itertools import product
from pathlib import Path

import numpy as np

from yt.arraytypes import blankRecordArray
from yt.data_objects.index_subobjects.octree_subset import OctreeSubset
from yt.data_objects.particle_filters import add_particle_filter
from yt.data_objects.static_output import Dataset
from yt.funcs import mylog, setdefaultattr
from yt.geometry.geometry_handler import YTDataChunk
from yt.geometry.oct_container import RAMSESOctreeContainer
from yt.geometry.oct_geometry_handler import OctreeIndex
from yt.utilities.cython_fortran_utils import FortranFile as fpu
from yt.utilities.lib.cosmology_time import t_frw, tau_frw
from yt.utilities.on_demand_imports import _f90nml as f90nml
from yt.utilities.physical_constants import kb, mp

from .definitions import (
    OUTPUT_DIR_EXP,
    OUTPUT_DIR_RE,
    STANDARD_FILE_RE,
    field_aliases,
    particle_families,
    ramses_header,
)
from .field_handlers import get_field_handlers
from .fields import _X, RAMSESFieldInfo
from .hilbert import get_intersecting_cpus
from .io_utils import fill_hydro, read_amr
from .particle_handlers import get_particle_handlers


[docs] class RAMSESFileSanitizer: """A class to handle the different files that can be passed and associated safely to a RAMSES output.""" root_folder = None # Path | None: path to the root folder info_fname = None # Path | None: path to the info file group_name = None # str | None: name of the first group folder (if any) def __init__(self, filename): # Make the resolve optional, so that it works with symlinks paths_to_try = (Path(filename), Path(filename).resolve()) self.original_filename = filename self.output_dir = None self.info_fname = None check_functions = (self.test_with_standard_file, self.test_with_folder_name) # Loop on both the functions and the tested paths for path, check_fun in product(paths_to_try, check_functions): ok, output_dir, group_dir, info_fname = check_fun(path) if ok: break # Early exit if the ok flag is False if not ok: return self.root_folder = output_dir self.group_name = group_dir.name if group_dir else None self.info_fname = info_fname
[docs] def validate(self) -> None: # raise a TypeError if self.original_filename is not a valid path # we also want to expand '$USER' and '~' because os.path.exists('~') is always False filename: str = os.path.expanduser(self.original_filename) if not os.path.exists(filename): raise FileNotFoundError(rf"No such file or directory '{filename!s}'") if self.root_folder is None: raise ValueError( f"Could not determine output directory from '{filename!s}'\n" f"Expected a directory name of form {OUTPUT_DIR_EXP!r} " "containing an info_*.txt file and amr_* files." ) # This last case is (erroneously ?) marked as unreachable by mypy # If/when this bug is fixed upstream, mypy will warn that the unused # 'type: ignore' comment can be removed if self.info_fname is None: # type: ignore [unreachable] raise ValueError(f"Failed to detect info file from '{filename!s}'")
@property def is_valid(self) -> bool: try: self.validate() except (TypeError, FileNotFoundError, ValueError): return False else: return True
[docs] @staticmethod def check_standard_files(folder, iout): """Return True if the folder contains an amr file and the info file.""" # Check that the "amr_" and "info_" files exist ok = (folder / f"amr_{iout}.out00001").is_file() ok &= (folder / f"info_{iout}.txt").is_file() return ok
@staticmethod def _match_output_and_group( path: Path, ) -> tuple[Path, Path | None, str | None]: # Make sure we work with a directory of the form `output_XXXXX` for p in (path, path.parent): match = OUTPUT_DIR_RE.match(p.name) if match: path = p break if match is None: return path, None, None iout = match.group(1) # See whether a folder named `group_YYYYY` exists group_dir = path / "group_00001" if group_dir.is_dir(): return path, group_dir, iout else: return path, None, iout
[docs] @classmethod def test_with_folder_name( cls, output_dir: Path ) -> tuple[bool, Path | None, Path | None, Path | None]: output_dir, group_dir, iout = cls._match_output_and_group(output_dir) ok = output_dir.is_dir() and iout is not None info_fname: Path | None if ok: parent_dir = group_dir or output_dir ok &= cls.check_standard_files(parent_dir, iout) info_fname = parent_dir / f"info_{iout}.txt" else: info_fname = None return ok, output_dir, group_dir, info_fname
[docs] @classmethod def test_with_standard_file( cls, filename: Path ) -> tuple[bool, Path | None, Path | None, Path | None]: output_dir, group_dir, iout = cls._match_output_and_group(filename.parent) ok = ( filename.is_file() and STANDARD_FILE_RE.match(filename.name) is not None and iout is not None ) info_fname: Path | None if ok: parent_dir = group_dir or output_dir ok &= cls.check_standard_files(parent_dir, iout) info_fname = parent_dir / f"info_{iout}.txt" else: info_fname = None return ok, output_dir, group_dir, info_fname
[docs] class RAMSESDomainFile: _last_mask = None _last_selector_id = None _hydro_offset = None _level_count = None _oct_handler_initialized = False _amr_header_initialized = False def __init__(self, ds, domain_id): self.ds = ds self.domain_id = domain_id num = os.path.basename(ds.parameter_filename).split(".")[0].split("_")[1] rootdir = ds.root_folder basedir = os.path.abspath(os.path.dirname(ds.parameter_filename)) basename = "%s/%%s_%s.out%05i" % (basedir, num, domain_id) part_file_descriptor = f"{basedir}/part_file_descriptor.txt" if ds.num_groups > 0: igroup = ((domain_id - 1) // ds.group_size) + 1 basename = "%s/group_%05i/%%s_%s.out%05i" % ( rootdir, igroup, num, domain_id, ) else: basename = "%s/%%s_%s.out%05i" % (basedir, num, domain_id) for t in ["grav", "amr"]: setattr(self, f"{t}_fn", basename % t) self._part_file_descriptor = part_file_descriptor self.max_level = self.ds.parameters["levelmax"] - self.ds.parameters["levelmin"] # Autodetect field files field_handlers = [FH(self) for FH in get_field_handlers() if FH.any_exist(ds)] self.field_handlers = field_handlers for fh in field_handlers: mylog.debug("Detected fluid type %s in domain_id=%s", fh.ftype, domain_id) fh.detect_fields(ds) # self._add_ftype(fh.ftype) # Autodetect particle files particle_handlers = [ PH(self) for PH in get_particle_handlers() if PH.any_exist(ds) ] self.particle_handlers = particle_handlers def __repr__(self): return "RAMSESDomainFile: %i" % self.domain_id @property def level_count(self): lvl_count = None for fh in self.field_handlers: fh.offset if lvl_count is None: lvl_count = fh.level_count.copy() else: lvl_count += fh._level_count return lvl_count @property def amr_file(self): if hasattr(self, "_amr_file") and not self._amr_file.close: self._amr_file.seek(0) return self._amr_file f = fpu(self.amr_fn) self._amr_file = f f.seek(0) return f def _read_amr_header(self): if self._amr_header_initialized: return hvals = {} with self.amr_file as f: f.seek(0) for header in ramses_header(hvals): hvals.update(f.read_attrs(header)) # For speedup, skip reading of 'headl' and 'taill' f.skip(2) hvals["numbl"] = f.read_vector("i") # That's the header, now we skip a few. hvals["numbl"] = np.array(hvals["numbl"]).reshape( (hvals["nlevelmax"], hvals["ncpu"]) ) f.skip() if hvals["nboundary"] > 0: f.skip(2) self._ngridbound = f.read_vector("i").astype("int64") else: self._ngridbound = np.zeros(hvals["nlevelmax"], dtype="int64") _free_mem = f.read_attrs((("free_mem", 5, "i"),)) _ordering = f.read_vector("c") f.skip(4) # Now we're at the tree itself # Now we iterate over each level and each CPU. position = f.tell() self._amr_header = hvals self._amr_offset = position # The maximum effective level is the deepest level # that has a non-zero number of octs nocts_to_this_level = hvals["numbl"].sum(axis=1).cumsum() self._max_level = ( np.argwhere(nocts_to_this_level == nocts_to_this_level[-1])[0][0] - self.ds.parameters["levelmin"] + 1 ) # update levelmax force_max_level, convention = self.ds._force_max_level if convention == "yt": force_max_level += self.ds.min_level + 1 self._amr_header["nlevelmax"] = min( force_max_level, self._amr_header["nlevelmax"] ) self._local_oct_count = hvals["numbl"][ self.ds.min_level :, self.domain_id - 1 ].sum() imin, imax = self.ds.min_level, self._amr_header["nlevelmax"] self._total_oct_count = hvals["numbl"][imin:imax, :].sum(axis=0) self._amr_header_initialized = True @property def ngridbound(self): self._read_amr_header() return self._ngridbound @property def amr_offset(self): self._read_amr_header() return self._amr_offset @property def max_level(self): self._read_amr_header() return self._max_level @max_level.setter def max_level(self, value): self._max_level = value @property def total_oct_count(self): self._read_amr_header() return self._total_oct_count @property def local_oct_count(self): self._read_amr_header() return self._local_oct_count @property def amr_header(self): self._read_amr_header() return self._amr_header @cached_property def oct_handler(self): """Open the oct file, read in octs level-by-level. For each oct, only the position, index, level and domain are needed - its position in the octree is found automatically. The most important is finding all the information to feed oct_handler.add """ self._read_amr_header() oct_handler = RAMSESOctreeContainer( self.ds.domain_dimensions / 2, self.ds.domain_left_edge, self.ds.domain_right_edge, ) root_nodes = self.amr_header["numbl"][self.ds.min_level, :].sum() oct_handler.allocate_domains(self.total_oct_count, root_nodes) mylog.debug( "Reading domain AMR % 4i (%0.3e, %0.3e)", self.domain_id, self.total_oct_count.sum(), self.ngridbound.sum(), ) with self.amr_file as f: f.seek(self.amr_offset) min_level = self.ds.min_level max_level = read_amr( f, self.amr_header, self.ngridbound, min_level, oct_handler ) oct_handler.finalize() new_max_level = max_level if new_max_level > self.max_level: raise RuntimeError( f"The maximum level detected in the AMR file ({new_max_level}) " f" does not match the expected number {self.max_level}." ) self.max_level = new_max_level self._oct_handler_initialized = True return oct_handler
[docs] def included(self, selector): if getattr(selector, "domain_id", None) is not None: return selector.domain_id == self.domain_id domain_ids = self.oct_handler.domain_identify(selector) return self.domain_id in domain_ids
[docs] class RAMSESDomainSubset(OctreeSubset): _domain_offset = 1 _block_order = "F" _base_domain = None def __init__( self, base_region, domain, ds, num_zones=2, num_ghost_zones=0, base_grid=None, ): super().__init__(base_region, domain, ds, num_zones, num_ghost_zones) self._base_grid = base_grid if num_ghost_zones > 0: if not all(ds.periodicity): mylog.warning( "Ghost zones will wrongly assume the domain to be periodic." ) # Create a base domain *with no self._base_domain.fwidth base_domain = RAMSESDomainSubset(ds.all_data(), domain, ds, num_zones) self._base_domain = base_domain elif num_ghost_zones < 0: raise RuntimeError( "Cannot initialize a domain subset with a negative number " f"of ghost zones, was called with {num_ghost_zones=}" ) @property def oct_handler(self): return self.domain.oct_handler def _fill_no_ghostzones(self, fd, fields, selector, file_handler): ndim = self.ds.dimensionality # Here we get a copy of the file, which we skip through and read the # bits we want. oct_handler = self.oct_handler all_fields = [f for ft, f in file_handler.field_list] fields = [f for ft, f in fields] data = {} cell_count = selector.count_oct_cells(self.oct_handler, self.domain_id) # Initializing data container for field in fields: data[field] = np.zeros(cell_count, "float64") # Do an early exit if the cell count is null if cell_count == 0: return data level_inds, cell_inds, file_inds = self.oct_handler.file_index_octs( selector, self.domain_id, cell_count ) cpu_list = [self.domain_id - 1] fill_hydro( fd, file_handler.offset, file_handler.level_count, cpu_list, level_inds, cell_inds, file_inds, ndim, all_fields, fields, data, oct_handler, ) return data def _fill_with_ghostzones( self, fd, fields, selector, file_handler, num_ghost_zones ): ndim = self.ds.dimensionality ncpu = self.ds.parameters["ncpu"] # Here we get a copy of the file, which we skip through and read the # bits we want. oct_handler = self.oct_handler all_fields = [f for ft, f in file_handler.field_list] fields = [f for ft, f in fields] tr = {} cell_count = ( selector.count_octs(self.oct_handler, self.domain_id) * self.nz**ndim ) # Initializing data container for field in fields: tr[field] = np.zeros(cell_count, "float64") # Do an early exit if the cell count is null if cell_count == 0: return tr gz_cache = getattr(self, "_ghost_zone_cache", None) if gz_cache: level_inds, cell_inds, file_inds, domain_inds = gz_cache else: gz_cache = ( level_inds, cell_inds, file_inds, domain_inds, ) = self.oct_handler.file_index_octs_with_ghost_zones( selector, self.domain_id, cell_count, self._num_ghost_zones ) self._ghost_zone_cache = gz_cache cpu_list = list(range(ncpu)) fill_hydro( fd, file_handler.offset, file_handler.level_count, cpu_list, level_inds, cell_inds, file_inds, ndim, all_fields, fields, tr, oct_handler, domain_inds=domain_inds, ) return tr @property def fwidth(self): fwidth = super().fwidth if self._num_ghost_zones > 0: fwidth = fwidth.reshape(-1, 8, 3) n_oct = fwidth.shape[0] # new_fwidth contains the fwidth of the oct+ghost zones # this is a constant array in each oct, so we simply copy # the oct value using numpy fancy-indexing new_fwidth = np.zeros((n_oct, self.nz**3, 3), dtype=fwidth.dtype) new_fwidth[:, :, :] = fwidth[:, 0:1, :] fwidth = new_fwidth.reshape(-1, 3) return fwidth @property def fcoords(self): num_ghost_zones = self._num_ghost_zones if num_ghost_zones == 0: return super().fcoords oh = self.oct_handler indices = oh.fill_index(self.selector).reshape(-1, 8) oct_inds, cell_inds = oh.fill_octcellindex_neighbours( self.selector, self._num_ghost_zones ) N_per_oct = self.nz**3 oct_inds = oct_inds.reshape(-1, N_per_oct) cell_inds = cell_inds.reshape(-1, N_per_oct) inds = indices[oct_inds, cell_inds] fcoords = self.ds.arr(oh.fcoords(self.selector)[inds].reshape(-1, 3), "unitary") return fcoords
[docs] def fill(self, fd, fields, selector, file_handler): if self._num_ghost_zones == 0: return self._fill_no_ghostzones(fd, fields, selector, file_handler) else: return self._fill_with_ghostzones( fd, fields, selector, file_handler, self._num_ghost_zones )
[docs] def retrieve_ghost_zones(self, ngz, fields, smoothed=False): if smoothed: mylog.warning( "%s.retrieve_ghost_zones was called with the " "`smoothed` argument set to True. This is not supported, " "ignoring it.", self, ) smoothed = False _subset_with_gz = getattr(self, "_subset_with_gz", {}) try: new_subset = _subset_with_gz[ngz] mylog.debug( "Reusing previous subset with %s ghost zones for domain %s", ngz, self.domain_id, ) except KeyError: new_subset = RAMSESDomainSubset( self.base_region, self.domain, self.ds, num_ghost_zones=ngz, base_grid=self, ) _subset_with_gz[ngz] = new_subset # Cache the fields new_subset.get_data(fields) self._subset_with_gz = _subset_with_gz return new_subset
[docs] class RAMSESIndex(OctreeIndex): def __init__(self, ds, dataset_type="ramses"): self.fluid_field_list = ds._fields_in_file self.dataset_type = dataset_type self.dataset = weakref.proxy(ds) self.index_filename = self.dataset.parameter_filename self.directory = os.path.dirname(self.index_filename) self.float_type = np.float64 super().__init__(ds, dataset_type) def _initialize_oct_handler(self): if self.ds._bbox is not None: cpu_list = get_intersecting_cpus(self.dataset, self.dataset._bbox) else: cpu_list = range(self.dataset["ncpu"]) self.domains = [RAMSESDomainFile(self.dataset, i + 1) for i in cpu_list] @cached_property def max_level(self): force_max_level, convention = self.ds._force_max_level if convention == "yt": force_max_level += self.ds.min_level + 1 return min(force_max_level, max(dom.max_level for dom in self.domains)) @cached_property def num_grids(self): return sum( dom.local_oct_count for dom in self.domains # + dom.ngridbound.sum() ) def _detect_output_fields(self): dsl = set() # Get the detected particle fields for ph in self.domains[0].particle_handlers: dsl.update(set(ph.field_offsets.keys())) self.particle_field_list = list(dsl) # Get the detected fields dsl = set() for fh in self.domains[0].field_handlers: dsl.update(set(fh.field_list)) self.fluid_field_list = list(dsl) self.field_list = self.particle_field_list + self.fluid_field_list def _identify_base_chunk(self, dobj): use_fast_hilbert = ( hasattr(dobj, "get_bbox") and self.ds.parameters["ordering type"] == "hilbert" ) if getattr(dobj, "_chunk_info", None) is None: if use_fast_hilbert: idoms = { idom + 1 for idom in get_intersecting_cpus(self.ds, dobj, factor=3) } # If the oct handler has been initialized, use it domains = [] for dom in self.domains: # Hilbert indexing is conservative, so reject all those that # aren't in the bbox if dom.domain_id not in idoms: continue # If the domain has its oct handler, refine the selection if dom._oct_handler_initialized and not dom.included(dobj.selector): continue mylog.debug("Identified domain %s", dom.domain_id) domains.append(dom) if len(domains) >= 1: mylog.info( "Identified % 5d/% 5d intersecting domains (% 5d through hilbert key indexing)", len(domains), len(self.domains), len(idoms), ) else: domains = [dom for dom in self.domains if dom.included(dobj.selector)] if len(domains) >= 1: mylog.info("Identified %s intersecting domains", len(domains)) base_region = getattr(dobj, "base_region", dobj) subsets = [ RAMSESDomainSubset( base_region, domain, self.dataset, num_ghost_zones=dobj._num_ghost_zones, ) for domain in domains ] dobj._chunk_info = subsets dobj._current_chunk = list(self._chunk_all(dobj))[0] def _chunk_all(self, dobj): oobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info) yield YTDataChunk(dobj, "all", oobjs, None) def _chunk_spatial(self, dobj, ngz, sort=None, preload_fields=None): sobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info) for og in sobjs: if ngz > 0: g = og.retrieve_ghost_zones(ngz, [], smoothed=True) else: g = og yield YTDataChunk(dobj, "spatial", [g], None) def _chunk_io(self, dobj, cache=True, local_only=False): oobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info) for subset in oobjs: yield YTDataChunk(dobj, "io", [subset], None, cache=cache) def _initialize_level_stats(self): levels = sum(dom.level_count for dom in self.domains) desc = {"names": ["numcells", "level"], "formats": ["int64"] * 2} max_level = self.dataset.min_level + self.dataset.max_level + 2 self.level_stats = blankRecordArray(desc, max_level) self.level_stats["level"] = list(range(max_level)) self.level_stats["numcells"] = [0 for i in range(max_level)] for level in range(self.dataset.min_level + 1): self.level_stats[level + 1]["numcells"] = 2 ** ( level * self.dataset.dimensionality ) for level in range(levels.shape[1]): ncell = levels[:, level].sum() self.level_stats[level + self.dataset.min_level + 1]["numcells"] = ncell def _get_particle_type_counts(self): npart = 0 npart = {k: 0 for k in self.ds.particle_types if k != "all"} for dom in self.domains: for fh in dom.particle_handlers: count = fh.local_particle_count npart[fh.ptype] += count return npart
[docs] def print_stats(self): """ Prints out (stdout) relevant information about the simulation This function prints information based on the fluid on the grids, and therefore does not work for DM only runs. """ if not self.fluid_field_list: print("This function is not implemented for DM only runs") return self._initialize_level_stats() header = "{:>3}\t{:>14}\t{:>14}".format("level", "# cells", "# cells^3") print(header) print(f"{len(header.expandtabs()) * '-'}") for level in range(self.dataset.min_level + self.dataset.max_level + 2): print( "% 3i\t% 14i\t% 14i" % ( level, self.level_stats["numcells"][level], np.ceil(self.level_stats["numcells"][level] ** (1.0 / 3)), ) ) print("-" * 46) print(" \t% 14i" % (self.level_stats["numcells"].sum())) print("\n") dx = self.get_smallest_dx() try: print(f"z = {self.dataset.current_redshift:0.8f}") except Exception: pass print( "t = {:0.8e} = {:0.8e} = {:0.8e}".format( self.ds.current_time.in_units("code_time"), self.ds.current_time.in_units("s"), self.ds.current_time.in_units("yr"), ) ) print("\nSmallest Cell:") for item in ("Mpc", "pc", "AU", "cm"): print(f"\tWidth: {dx.in_units(item):0.3e}")
[docs] class RAMSESDataset(Dataset): _index_class = RAMSESIndex _field_info_class = RAMSESFieldInfo gamma = 1.4 # This will get replaced on hydro_fn open # RAMSES-specific parameters force_cosmological: bool | None _force_max_level: tuple[int, str] _bbox: list[list[float]] | None _self_shielding: bool | None = None def __init__( self, filename, dataset_type="ramses", fields=None, storage_filename=None, units_override=None, unit_system="cgs", extra_particle_fields=None, cosmological=None, bbox=None, max_level=None, max_level_convention=None, default_species_fields=None, self_shielding=None, use_conformal_time=None, ): # Here we want to initiate a traceback, if the reader is not built. if isinstance(fields, str): fields = field_aliases[fields] """ fields: An array of hydro variable fields in order of position in the hydro_XXXXX.outYYYYY file. If set to None, will try a default set of fields. extra_particle_fields: An array of extra particle variables in order of position in the particle_XXXXX.outYYYYY file. cosmological: If set to None, automatically detect cosmological simulation. If a boolean, force its value. self_shielding: If set to True, assume gas is self-shielded above 0.01 mp/cm^3. This affects the fields related to cooling and the mean molecular weight. """ self._fields_in_file = fields # By default, extra fields have not triggered a warning self._warned_extra_fields = defaultdict(lambda: False) self._extra_particle_fields = extra_particle_fields self.force_cosmological = cosmological self._bbox = bbox self._force_max_level = self._sanitize_max_level( max_level, max_level_convention ) file_handler = RAMSESFileSanitizer(filename) # ensure validation happens even if the class is instantiated # directly rather than from yt.load file_handler.validate() # Sanitize the filename info_fname = file_handler.info_fname if file_handler.group_name is not None: self.num_groups = len( [_ for _ in file_handler.root_folder.glob("group_?????") if _.is_dir()] ) else: self.num_groups = 0 self.root_folder = file_handler.root_folder Dataset.__init__( self, info_fname, dataset_type, units_override=units_override, unit_system=unit_system, default_species_fields=default_species_fields, ) # Add the particle types ptypes = [] for PH in get_particle_handlers(): if PH.any_exist(self): ptypes.append(PH.ptype) ptypes = tuple(ptypes) self.particle_types = self.particle_types_raw = ptypes # Add the fluid types for FH in get_field_handlers(): FH.purge_detected_fields(self) if FH.any_exist(self): self.fluid_types += (FH.ftype,) if use_conformal_time is not None: self.use_conformal_time = use_conformal_time elif self.cosmological_simulation: if "rt" in self.fluid_types: self.use_conformal_time = False else: self.use_conformal_time = True else: self.use_conformal_time = False self.storage_filename = storage_filename self.self_shielding = self_shielding @property def self_shielding(self) -> bool: if self._self_shielding is not None: return self._self_shielding # Read namelist.txt file (if any) has_namelist = self.read_namelist() if not has_namelist: self._self_shielding = False return self._self_shielding nml = self.parameters["namelist"] # "self_shielding" is stored in physics_params in older versions of the code physics_params = nml.get("physics_params", default={}) # and in "cooling_params" in more recent ones cooling_params = nml.get("cooling_params", default={}) self_shielding = physics_params.get("self_shielding", False) self_shielding |= cooling_params.get("self_shielding", False) self._self_shielding = self_shielding return self_shielding @self_shielding.setter def self_shielding(self, value): self._self_shielding = value @staticmethod def _sanitize_max_level(max_level, max_level_convention): # NOTE: the initialisation of the dataset class sets # self.min_level _and_ requires force_max_level # to be set, so we cannot convert from to yt/ramses # conventions if max_level is None and max_level_convention is None: return (2**999, "yt") # Check max_level is a valid, positive integer if not isinstance(max_level, (int, np.integer)): raise TypeError( f"Expected `max_level` to be a positive integer, got {max_level} " f"with type {type(max_level)} instead." ) if max_level < 0: raise ValueError( f"Expected `max_level` to be a positive integer, got {max_level} " "instead." ) # Check max_level_convention is set and acceptable if max_level_convention is None: raise ValueError( f"Received `max_level`={max_level}, but no `max_level_convention`. " "Valid conventions are 'yt' and 'ramses'." ) if max_level_convention not in ("ramses", "yt"): raise ValueError( f"Invalid convention {max_level_convention}. " "Valid choices are 'yt' and 'ramses'." ) return (max_level, max_level_convention)
[docs] def create_field_info(self, *args, **kwa): """Extend create_field_info to add the particles types.""" super().create_field_info(*args, **kwa) # Register particle filters if ("io", "particle_family") in self.field_list: for fname, value in particle_families.items(): def loc(val): def closure(pfilter, data): filter = data[pfilter.filtered_type, "particle_family"] == val return filter return closure add_particle_filter( fname, loc(value), filtered_type="io", requires=["particle_family"] ) for k in particle_families.keys(): mylog.info("Adding particle_type: %s", k) self.add_particle_filter(f"{k}")
def __str__(self): return self.basename.rsplit(".", 1)[0] def _set_code_unit_attributes(self): """ Generates the conversion to various physical _units based on the parameter file """ # loading the units from the info file boxlen = self.parameters["boxlen"] length_unit = self.parameters["unit_l"] density_unit = self.parameters["unit_d"] time_unit = self.parameters["unit_t"] # calculating derived units (except velocity and temperature, done below) mass_unit = density_unit * length_unit**3 magnetic_unit = np.sqrt(4 * np.pi * mass_unit / (time_unit**2 * length_unit)) pressure_unit = density_unit * (length_unit / time_unit) ** 2 # TODO: # Generalize the temperature field to account for ionization # For now assume an atomic ideal gas with cosmic abundances (x_H = 0.76) mean_molecular_weight_factor = _X**-1 setdefaultattr(self, "density_unit", self.quan(density_unit, "g/cm**3")) setdefaultattr(self, "magnetic_unit", self.quan(magnetic_unit, "gauss")) setdefaultattr(self, "pressure_unit", self.quan(pressure_unit, "dyne/cm**2")) setdefaultattr(self, "time_unit", self.quan(time_unit, "s")) setdefaultattr(self, "mass_unit", self.quan(mass_unit, "g")) setdefaultattr( self, "velocity_unit", self.quan(length_unit, "cm") / self.time_unit ) temperature_unit = ( self.velocity_unit**2 * mp * mean_molecular_weight_factor / kb ) setdefaultattr(self, "temperature_unit", temperature_unit.in_units("K")) # Only the length unit get scales by a factor of boxlen setdefaultattr(self, "length_unit", self.quan(length_unit * boxlen, "cm")) def _parse_parameter_file(self): # hardcoded for now # These should be explicitly obtained from the file, but for now that # will wait until a reorganization of the source tree and better # generalization. self.dimensionality = 3 self.refine_by = 2 self.parameters["HydroMethod"] = "ramses" self.parameters["Time"] = 1.0 # default unit is 1... # We now execute the same logic Oliver's code does rheader = {} def read_rhs(f, cast): line = f.readline().strip() if line and "=" in line: key, val = (_.strip() for _ in line.split("=")) rheader[key] = cast(val) return key else: return None def cast_a_else_b(cast_a, cast_b): def caster(val): try: return cast_a(val) except ValueError: return cast_b(val) return caster with open(self.parameter_filename) as f: # Standard: first six are ncpu, ndim, levelmin, levelmax, ngridmax, nstep_coarse for _ in range(6): read_rhs(f, int) f.readline() # Standard: next 11 are boxlen, time, aexp, h0, omega_m, omega_l, omega_k, omega_b, unit_l, unit_d, unit_t for _ in range(11): key = read_rhs(f, float) # Read non standard extra fields until hitting the ordering type while key != "ordering type": key = read_rhs(f, cast_a_else_b(float, str)) # This next line deserves some comment. We specify a min_level that # corresponds to the minimum level in the RAMSES simulation. RAMSES is # one-indexed, but it also does refer to the *oct* dimensions -- so # this means that a levelmin of 1 would have *1* oct in it. So a # levelmin of 2 would have 8 octs at the root mesh level. self.min_level = rheader["levelmin"] - 1 # Now we read the hilbert indices self.hilbert_indices = {} if rheader["ordering type"] == "hilbert": f.readline() # header for _ in range(rheader["ncpu"]): dom, mi, ma = f.readline().split() self.hilbert_indices[int(dom)] = (float(mi), float(ma)) if rheader["ordering type"] != "hilbert" and self._bbox is not None: raise NotImplementedError( f"The ordering {rheader['ordering type']} " "is not compatible with the `bbox` argument." ) self.parameters.update(rheader) self.domain_left_edge = np.zeros(3, dtype="float64") self.domain_dimensions = np.ones(3, dtype="int32") * 2 ** (self.min_level + 1) self.domain_right_edge = np.ones(3, dtype="float64") # This is likely not true, but it's not clear # how to determine the boundary conditions self._periodicity = (True, True, True) if self.force_cosmological is not None: is_cosmological = self.force_cosmological else: # These conditions seem to always be true for non-cosmological datasets is_cosmological = not ( rheader["time"] >= 0 and rheader["H0"] == 1 and rheader["aexp"] == 1 ) if not is_cosmological: self.cosmological_simulation = False self.current_redshift = 0 self.hubble_constant = 0 self.omega_matter = 0 self.omega_lambda = 0 else: self.cosmological_simulation = True self.current_redshift = (1.0 / rheader["aexp"]) - 1.0 self.omega_lambda = rheader["omega_l"] self.omega_matter = rheader["omega_m"] self.hubble_constant = rheader["H0"] / 100.0 # This is H100 force_max_level, convention = self._force_max_level if convention == "yt": force_max_level += self.min_level + 1 self.max_level = min(force_max_level, rheader["levelmax"]) - self.min_level - 1 if not self.cosmological_simulation: self.current_time = self.parameters["time"] else: aexp_grid = np.geomspace(1e-3, 1, 2_000, endpoint=False) z_grid = 1 / aexp_grid - 1 self.tau_frw = tau_frw(self, z_grid) self.t_frw = t_frw(self, z_grid) self.current_time = t_frw(self, self.current_redshift).to("Gyr") if self.num_groups > 0: self.group_size = rheader["ncpu"] // self.num_groups self.read_namelist()
[docs] def read_namelist(self) -> bool: """Read the namelist.txt file in the output folder, if present""" namelist_file = os.path.join(self.root_folder, "namelist.txt") if not os.path.exists(namelist_file): return False try: with open(namelist_file) as f: nml = f90nml.read(f) except ImportError as err: mylog.warning( "`namelist.txt` file found but missing package f90nml to read it:", exc_info=err, ) return False except (ValueError, StopIteration, AssertionError) as err: # Note: f90nml may raise a StopIteration, a ValueError or an AssertionError if # the namelist is not valid. mylog.warning( "Could not parse `namelist.txt` file as it was malformed:", exc_info=err, ) return False self.parameters["namelist"] = nml return True
@classmethod def _is_valid(cls, filename: str, *args, **kwargs) -> bool: if cls._missing_load_requirements(): return False return RAMSESFileSanitizer(filename).is_valid