import abc
import glob
import os
from functools import cached_property
from yt.config import ytcfg
from yt.funcs import mylog
from yt.utilities.cython_fortran_utils import FortranFile
from .io import _read_fluid_file_descriptor
from .io_utils import read_offset
# Global registry of all known field-handler classes. Populated at
# class-creation time by FieldFileHandler.__init_subclass__ via
# register_field_handler().
FIELD_HANDLERS: set[type["FieldFileHandler"]] = set()
def get_field_handlers():
    """Return the global set of registered field-handler classes."""
    return FIELD_HANDLERS
def register_field_handler(ph):
    """Add the handler class *ph* to the global FIELD_HANDLERS registry."""
    FIELD_HANDLERS.add(ph)
# Per-dataset cache of detected field lists, keyed by
# ds.unique_identifier, then by handler ftype (see
# FieldFileHandler.get/set/purge_detected_fields).
DETECTED_FIELDS = {}  # type: ignore
class HandlerMixin:
    """This contains all the shared methods to handle RAMSES files.

    This is not supposed to be user-facing.
    """
def setup_handler(self, domain):
    """
    Bind this handler to *domain* and resolve its on-disk paths.

    Sets ``self.ds``, ``self.domain``, ``self.domain_id``, rewrites
    ``self.fname`` to the full path of the data file and, when a file
    descriptor is declared, resolves its full path too.

    This is not intended to be overridden in most cases; if you need
    more flexibility, rewrite this function in the inherited class.

    Raises
    ------
    FileNotFoundError
        If the expected data file does not exist.
    """
    ds = domain.ds
    self.ds = ds
    self.domain = domain
    self.domain_id = domain.domain_id

    root = os.path.abspath(ds.root_folder)
    iout = int(os.path.basename(ds.parameter_filename).split(".")[0].split("_")[1])
    rel_name = self.fname.format(iout=iout, icpu=domain.domain_id)

    if ds.num_groups > 0:
        # Outputs split into groups: each group holds `group_size` cpus.
        igroup = ((domain.domain_id - 1) // ds.group_size) + 1
        full_path = os.path.join(root, f"group_{igroup:05d}", rel_name)
    else:
        full_path = os.path.join(root, rel_name)

    if not os.path.exists(full_path):
        raise FileNotFoundError(
            f"Could not find {self._file_type} file (type: {self.ftype}). "
            f"Tried {full_path}"
        )
    self.fname = full_path

    if self.file_descriptor is not None:
        if ds.num_groups > 0:
            # The particle file descriptor is *only* in the first group
            self.file_descriptor = os.path.join(
                root, "group_00001", self.file_descriptor
            )
        else:
            self.file_descriptor = os.path.join(root, self.file_descriptor)
@property
def exists(self):
    """
    Whether the file backing this instance is present on disk.

    Called for each file of this type found on disk. By default it only
    checks ``self.fname``; override for more complex cases.
    """
    return os.path.exists(self.fname)
@property
def has_descriptor(self):
    """
    Whether a *file descriptor* exists for this handler.

    By default this only checks that ``self.file_descriptor`` points to
    an existing path; override for more complex cases.
    """
    return os.path.exists(self.file_descriptor)
@classmethod
def any_exist(cls, ds):
    """
    Check whether any file of this kind exists for the dataset.

    Takes the class itself — not an instance — and a dataset.

    Arguments
    ---------
    ds : a Ramses Dataset

    Note
    ----
    Usually called once at initialization of the RAMSES Dataset
    structure; the answer is cached per dataset in
    ``cls._unique_registry``.
    """
    uid = ds.unique_identifier
    try:
        # Cached answer from a previous call for this dataset
        return cls._unique_registry[uid]
    except KeyError:
        pass

    iout = int(os.path.basename(ds.parameter_filename).split(".")[0].split("_")[1])
    candidate = os.path.join(
        os.path.split(ds.parameter_filename)[0],
        cls.fname.format(iout=iout, icpu=1),
    )
    found = os.path.exists(candidate)
    cls._unique_registry[uid] = found
    return found
class FieldFileHandler(abc.ABC, HandlerMixin):
    """
    Abstract class to handle fluid fields in RAMSES. Each instance
    represents a single file (one domain).

    To add support for a new field file, inherit from this class and
    implement all abstract methods.

    See `SinkParticleFileHandler` for an example implementation."""

    _file_type = "field"  # used in setup_handler's error message

    # These properties are static properties
    ftype: str | None = None  # The name to give to the field type
    fname: str | None = None  # The name of the file(s)
    # The attributes of the header, as (name, number of values, dtype char)
    attrs: tuple[tuple[str, int, str], ...] | None = None
    known_fields = None  # A list of tuple containing the field name and its type
    config_field: str | None = None  # Name of the config section (if any)
    file_descriptor: str | None = None  # The name of the file descriptor (if any)

    # These properties are computed dynamically
    field_offsets = None  # Mapping from field to offset in file
    field_types = (
        None  # Mapping from field to the type of the data (float, integer, ...)
    )
def __init_subclass__(cls, *args, **kwargs):
    """
    Register subclasses at class-creation time.

    Subclasses that define ``ftype`` are added to the global
    FIELD_HANDLERS registry; every subclass also gets a fresh
    ``_unique_registry`` cache (used by ``any_exist``).
    """
    super().__init_subclass__(*args, **kwargs)
    if cls.ftype is not None:
        register_field_handler(cls)
    cls._unique_registry = {}
    # NOTE: the original ended with `return cls`; the return value of
    # __init_subclass__ is ignored by Python, so it was dead code.
def __init__(self, domain):
    """Attach this handler to a single RAMSES domain (one file)."""
    self.setup_handler(domain)
@classmethod
@abc.abstractmethod
def detect_fields(cls, ds):
    """
    Called once to set up the fields of this type.

    Implementations should set the following class-level variables:

    * parameters: dictionary
        Dictionary containing the header variables. The keys should match
        those of `cls.attrs`
    * field_list: list of (ftype, fname)
        The list of the fields present in the file
    """
    pass
@classmethod
def get_detected_fields(cls, ds):
    """
    Look up previously detected fields for *ds* in the registry.

    Returns the cached field list, or None if nothing is cached for this
    dataset/field-type combination.
    """
    per_dataset = DETECTED_FIELDS.get(ds.unique_identifier, {})
    return per_dataset.get(cls.ftype)
@classmethod
def set_detected_fields(cls, ds, fields):
    """Store *fields* for *ds* in the global detection registry."""
    DETECTED_FIELDS.setdefault(ds.unique_identifier, {})[cls.ftype] = fields
@classmethod
def purge_detected_fields(cls, ds):
    """
    Drop all cached detection results for *ds*.

    This should be called on dataset creation to force field detection
    to run again.
    """
    DETECTED_FIELDS.pop(ds.unique_identifier, None)
@property
def level_count(self):
    """
    Number of cells per level.

    Computing ``self.offset`` stores the level counts in
    ``self._level_count`` as a side effect, so trigger it on first
    access.
    """
    if getattr(self, "_level_count", None) is None:
        self.offset  # evaluated for its side effect only
    return self._level_count
@cached_property
def offset(self):
    """
    Compute the offsets of the fields.

    By default, it skips the header (as defined by `cls.attrs`)
    and computes the offset at each level.

    It should be generic enough for most of the cases, but if the
    *structure* of your fluid file is non-canonical, change this.

    Side effect: also stores the per-level cell counts in
    ``self._level_count`` (consumed by the ``level_count`` property).
    """
    nvars = len(self.field_list)
    with FortranFile(self.fname) as fd:
        # Skip headers
        nskip = len(self.attrs)
        fd.skip(nskip)
        min_level = self.domain.ds.min_level
        # The file is as follows:
        # > headers
        # loop over levels
        #   loop over cpu domains
        #     > <ilevel>: current level
        #     > <nocts>: number of octs in level, domain
        #     loop over <nvars> variables (positions, velocities, density, ...)
        #       loop over <2*2*2> cells in each oct
        #         > <data> with shape (nocts, )
        #
        # So there are 8 * nvars records each with length (nocts, )
        # at each (level, cpus)
        offset, level_count = read_offset(
            fd,
            min_level,
            self.domain.domain_id,
            self.parameters["nvar"],
            self.domain.amr_header,
            Nskip=nvars * 8,
        )
    self._level_count = level_count
    return offset
@classmethod
def load_fields_from_yt_config(cls) -> list[str]:
    """
    Read this handler's field list from the yt configuration.

    Returns an empty list when no config section applies.
    """
    if not (cls.config_field and ytcfg.has_section(cls.config_field)):
        return []
    raw = ytcfg.get(cls.config_field, "fields")
    return [entry.strip() for entry in raw if entry.strip() != ""]
class HydroFieldFileHandler(FieldFileHandler):
    """Handler for RAMSES hydro files (``hydro_XXXXX.outYYYYY``)."""

    ftype = "ramses"
    fname = "hydro_{iout:05d}.out{icpu:05d}"
    file_descriptor = "hydro_file_descriptor.txt"
    config_field = "ramses-hydro"

    # Fortran header layout: (name, number of values, dtype char)
    attrs = (
        ("ncpu", 1, "i"),
        ("nvar", 1, "i"),
        ("ndim", 1, "i"),
        ("nlevelmax", 1, "i"),
        ("nboundary", 1, "i"),
        ("gamma", 1, "d"),
    )
@classmethod
def detect_fields(cls, ds):
    """
    Detect the hydro fields present in the dataset.

    The field names are resolved, in order of priority, from:

    1. the user-provided ``ds._fields_in_file``;
    2. the yt configuration (section ``ramses-hydro``);
    3. the ``hydro_file_descriptor.txt`` file, if present;
    4. a guess based on the number of variables (``nvar``).

    Sets ``cls.parameters`` and ``cls.field_list``, stores ``ds.gamma``,
    caches the result in the detection registry and returns the list of
    field names.
    """
    # Return the cached detection, if any
    cached = cls.get_detected_fields(ds)
    if cached:
        return cached

    num = os.path.basename(ds.parameter_filename).split(".")[0].split("_")[1]
    testdomain = 1  # Just pick the first domain file to read
    basepath = os.path.abspath(os.path.dirname(ds.parameter_filename))
    basename = "%s/%%s_%s.out%05i" % (basepath, num, testdomain)
    fname = basename % "hydro"
    fname_desc = os.path.join(basepath, cls.file_descriptor)

    # Read the Fortran header of the first hydro file
    with FortranFile(fname) as fd:
        hvals = fd.read_attrs(cls.attrs)
    cls.parameters = hvals

    # Store some metadata
    ds.gamma = hvals["gamma"]
    nvar = hvals["nvar"]

    if ds._fields_in_file is not None:
        # Case 1: fields are provided by users on construction of dataset
        fields = list(ds._fields_in_file)
    else:
        # Case 2: fields are provided by users in the config
        fields = cls.load_fields_from_yt_config()

        if not fields and os.path.exists(fname_desc):
            # Case 3: there is a hydro file descriptor
            mylog.debug("Reading hydro file descriptor.")
            # For now, we can only read double precision fields
            fields = [
                e[0]
                for e in _read_fluid_file_descriptor(fname_desc, prefix="hydro")
            ]
            # We get no fields for old-style hydro file descriptor

        if not fields:
            # Case 4: attempt autodetection with usual fields
            fields = cls._guess_hydro_fields(ds, nvar)
            mylog.debug(
                "No fields specified by user; automatically setting fields array to %s",
                fields,
            )

    # Allow some wiggle room for users to add too many variables
    count_extra = 0
    while len(fields) < nvar:
        fields.append(f"var_{len(fields)}")
        count_extra += 1
    if count_extra > 0:
        mylog.debug("Detected %s extra fluid fields.", count_extra)

    cls.field_list = [(cls.ftype, e) for e in fields]
    cls.set_detected_fields(ds, fields)
    return fields

@classmethod
def _guess_hydro_fields(cls, ds, nvar):
    """Guess the hydro field names from the variable count ``nvar``.

    An ``info_rt_*.txt`` file next to the parameter file marks an RT
    run; otherwise plain hydro or MHD layouts are assumed.

    Raises
    ------
    ValueError
        If ``nvar`` < 5 (1D/2D runs are unsupported).
    """
    foldername = os.path.abspath(os.path.dirname(ds.parameter_filename))
    rt_flag = any(glob.glob(os.sep.join([foldername, "info_rt_*.txt"])))
    hydro = ["Density", "x-velocity", "y-velocity", "z-velocity"]
    bfields = [
        "B_x_left",
        "B_y_left",
        "B_z_left",
        "B_x_right",
        "B_y_right",
        "B_z_right",
    ]
    if rt_flag:  # rt run
        if nvar < 10:
            mylog.info("Detected RAMSES-RT file WITHOUT IR trapping.")
            return hydro + ["Pressure", "Metallicity", "HII", "HeII", "HeIII"]
        mylog.info("Detected RAMSES-RT file WITH IR trapping.")
        return hydro + [
            "Pres_IR",
            "Pressure",
            "Metallicity",
            "HII",
            "HeII",
            "HeIII",
        ]
    if nvar < 5:
        # BUG FIX: the original log call had two %s placeholders but
        # passed no arguments, so the message was never interpolated;
        # the ValueError also carried no message.
        mylog.debug(
            "nvar=%s is too small! YT doesn't currently "
            "support 1D/2D runs in RAMSES",
            nvar,
        )
        raise ValueError(f"nvar={nvar} is too small for a 3D RAMSES run")
    # Basic hydro runs
    if nvar == 5:
        return hydro + ["Pressure"]
    if nvar < 11:
        return hydro + ["Pressure", "Metallicity"]
    # MHD runs - NOTE:
    # THE MHD MODULE WILL SILENTLY ADD 3 TO THE NVAR IN THE MAKEFILE
    if nvar == 11:
        return hydro + bfields + ["Pressure"]
    return hydro + bfields + ["Pressure", "Metallicity"]
class GravFieldFileHandler(FieldFileHandler):
    """Handler for RAMSES gravity files (``grav_XXXXX.outYYYYY``)."""

    ftype = "gravity"
    fname = "grav_{iout:05d}.out{icpu:05d}"
    config_field = "ramses-grav"

    # Fortran header layout: (name, number of values, dtype char)
    attrs = (
        ("ncpu", 1, "i"),
        ("nvar", 1, "i"),
        ("nlevelmax", 1, "i"),
        ("nboundary", 1, "i"),
    )
@classmethod
def detect_fields(cls, ds):
    """
    Detect the gravity fields present in the dataset.

    Fields come from the yt config (section ``ramses-grav``) when set,
    otherwise they are inferred from ``nvar``: ``ndim + 1`` variables
    means a potential plus the accelerations, otherwise accelerations
    only. Extra variables beyond the detected names are appended as
    ``var0``, ``var1``, ...

    Sets ``cls.parameters`` and ``cls.field_list``, caches the result
    and returns the list of field names.
    """
    # Return the cached detection, if any
    cached = cls.get_detected_fields(ds)
    if cached:
        return cached

    iout = int(str(ds).split("_")[1])
    basedir = os.path.split(ds.parameter_filename)[0]
    fname = os.path.join(basedir, cls.fname.format(iout=iout, icpu=1))
    with FortranFile(fname) as fd:
        cls.parameters = fd.read_attrs(cls.attrs)

    nvar = cls.parameters["nvar"]
    # FIX: the original assigned ndim twice (before and after reading
    # the header); a single assignment suffices.
    ndim = ds.dimensionality

    fields = cls.load_fields_from_yt_config()
    if not fields:
        if nvar == ndim + 1:
            fields = ["Potential"] + [f"{k}-acceleration" for k in "xyz"[:ndim]]
        else:
            fields = [f"{k}-acceleration" for k in "xyz"[:ndim]]
    ndetected = len(fields)

    if ndetected != nvar and not ds._warned_extra_fields["gravity"]:
        mylog.info("Detected %s extra gravity fields.", nvar - ndetected)
        ds._warned_extra_fields["gravity"] = True

    for i in range(nvar - ndetected):
        fields.append(f"var{i}")

    cls.field_list = [(cls.ftype, e) for e in fields]
    cls.set_detected_fields(ds, fields)
    return fields
class RTFieldFileHandler(FieldFileHandler):
    """Handler for RAMSES radiative-transfer files (``rt_XXXXX.outYYYYY``)."""

    ftype = "ramses-rt"
    fname = "rt_{iout:05d}.out{icpu:05d}"
    file_descriptor = "rt_file_descriptor.txt"
    config_field = "ramses-rt"

    # Fortran header layout: (name, number of values, dtype char)
    attrs = (
        ("ncpu", 1, "i"),
        ("nvar", 1, "i"),
        ("ndim", 1, "i"),
        ("nlevelmax", 1, "i"),
        ("nboundary", 1, "i"),
        ("gamma", 1, "d"),
    )
@classmethod
def detect_fields(cls, ds):
    """
    Detect the RT fields present in the dataset.

    Parses the ``info_rt_*.txt`` companion file line by line (the
    format is positional, hence the fixed read sequence below), stores
    the parsed header in ``cls.rt_parameters``, then resolves field
    names from ``ds._fields_in_file``, the yt config, the rt file
    descriptor, or per-photon-group defaults, in that order.
    """
    # Try to get the detected fields
    detected_fields = cls.get_detected_fields(ds)
    if detected_fields:
        return detected_fields

    fname = ds.parameter_filename.replace("info_", "info_rt_")

    rheader = {}

    def read_rhs(cast):
        # NOTE: `f` is the info_rt file handle bound later by the
        # `with open(fname) as f:` block below — closures resolve
        # names at call time, so this is safe.
        line = f.readline()
        p, v = line.split("=")
        rheader[p.strip()] = cast(v)

    with open(fname) as f:
        # Read nRTvar, nions, ngroups, iions
        for _ in range(4):
            read_rhs(int)

        # Try to read rtprecision.
        # Either it is present or the line is simply blank, so
        # we try to parse the line as an int, and if it fails,
        # we simply ignore it.
        try:
            read_rhs(int)
            f.readline()
        except ValueError:
            pass

        # Read X and Y fractions
        for _ in range(2):
            read_rhs(float)
        f.readline()

        # Read unit_np, unit_pfd
        for _ in range(2):
            read_rhs(float)

        # Read rt_c_frac
        # Note: when using variable speed of light, this line will contain
        # multiple values corresponding to the velocity at each level
        read_rhs(lambda line: [float(e) for e in line.split()])
        f.readline()

        # Read n star, t2star, g_star
        for _ in range(3):
            read_rhs(float)

        # Touchy part, we have to read the photon group properties
        mylog.debug("Not reading photon group properties")

        cls.rt_parameters = rheader

    ngroups = rheader["nGroups"]

    iout = int(str(ds).split("_")[1])
    basedir = os.path.split(ds.parameter_filename)[0]

    fname = os.path.join(basedir, cls.fname.format(iout=iout, icpu=1))
    fname_desc = os.path.join(basedir, cls.file_descriptor)
    with FortranFile(fname) as fd:
        cls.parameters = fd.read_attrs(cls.attrs)

    ok = False
    if ds._fields_in_file is not None:
        # Case 1: fields are provided by users on construction of dataset
        fields = list(ds._fields_in_file)
        ok = True
    else:
        # Case 2: fields are provided by users in the config
        fields = cls.load_fields_from_yt_config()
        ok = len(fields) > 0

    if not ok and os.path.exists(fname_desc):
        # Case 3: there is a rt file descriptor
        mylog.debug("Reading rt file descriptor.")
        # For now, we can only read double precision fields
        fields = [
            e[0] for e in _read_fluid_file_descriptor(fname_desc, prefix="rt")
        ]
        ok = len(fields) > 0

    if not ok:
        # Case 4: fall back to per-photon-group default names
        fields = []
        tmp = [
            "Photon_density_%s",
            "Photon_flux_x_%s",
            "Photon_flux_y_%s",
            "Photon_flux_z_%s",
        ]
        for ng in range(ngroups):
            fields.extend([t % (ng + 1) for t in tmp])

    cls.field_list = [(cls.ftype, e) for e in fields]
    cls.set_detected_fields(ds, fields)
    return fields
@classmethod
def get_rt_parameters(cls, ds):
    """
    Return the RT parameters parsed from the ``info_rt_*.txt`` file.

    Triggers field detection on first use, which populates
    ``cls.rt_parameters`` as a side effect.
    """
    # FIX: rt_parameters is only assigned inside detect_fields and the
    # class declares no default, so a plain attribute access raised
    # AttributeError (not a falsy value) before the first detection.
    if getattr(cls, "rt_parameters", None):
        return cls.rt_parameters
    # Call detect fields to get the rt_parameters
    cls.detect_fields(ds)
    return cls.rt_parameters