import abc
import os
import struct
from collections.abc import Callable
from functools import cached_property
from itertools import chain, count
from typing import TYPE_CHECKING, Any
import numpy as np
from yt._typing import FieldKey
from yt.config import ytcfg
from yt.funcs import mylog
from yt.utilities.cython_fortran_utils import FortranFile
from .field_handlers import HandlerMixin
from .io import (
_ramses_particle_binary_file_handler,
_ramses_particle_csv_file_handler,
_read_part_binary_file_descriptor,
_read_part_csv_file_descriptor,
convert_ramses_conformal_time_to_physical_time,
)
if TYPE_CHECKING:
from yt.frontends.ramses.data_structures import RAMSESDomainSubset
PARTICLE_HANDLERS: set[type["ParticleFileHandler"]] = set()
def get_particle_handlers():
return PARTICLE_HANDLERS
def register_particle_handler(ph):
PARTICLE_HANDLERS.add(ph)
class ParticleFileHandler(abc.ABC, HandlerMixin):
"""
Abstract class to handle particles in RAMSES. Each instance
represents a single file (one domain).
To add support to a new particle file, inherit from this class and
implement all functions containing a `NotImplementedError`.
See `SinkParticleFileHandler` for an example implementation."""
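
    # A minimal, hypothetical sketch of a custom handler (the names below
    # are illustrative only; they are not part of yt or of the RAMSES
    # output format):
    #
    #     class MyTracerFileHandler(ParticleFileHandler):
    #         ptype = "my_tracer"
    #         fname = "tracer_{iout:05d}.out{icpu:05d}"
    #         attrs = (("ntracer", 1, "i"),)
    #         known_fields = [("particle_position_x", "d")]
    #         reader = _ramses_particle_binary_file_handler
    #
    #         def read_header(self):
    #             ...  # set _field_offsets, _field_types,
    #                  # _local_particle_count and _header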
_file_type = "particle"
## These properties are static
# The name to give to the particle type
ptype: str
    # Template for the name of the file(s)
fname: str
# The name of the file descriptor (if any)
file_descriptor: str | None = None
    # The attributes of the header, as (name, count, struct-typecode) tuples
attrs: tuple[tuple[str, int, str], ...]
    # A list of tuples, each containing a field name and its type
known_fields: list[FieldKey]
# The function to employ to read the file
reader: Callable[
["ParticleFileHandler", "RAMSESDomainSubset", list[tuple[str, str]], int],
dict[tuple[str, str], np.ndarray],
]
# Name of the config section (if any)
config_field: str | None = None
## These properties are computed dynamically
# Mapping from field to offset in file
_field_offsets: dict[tuple[str, str], int]
# Mapping from field to the type of the data (float, integer, ...)
_field_types: dict[tuple[str, str], str]
    # Number of particles in the domain
_local_particle_count: int
# The header of the file
_header: dict[str, Any]
def __init_subclass__(cls, *args, **kwargs):
"""
Registers subclasses at creation.
"""
super().__init_subclass__(*args, **kwargs)
if cls.ptype is not None:
register_particle_handler(cls)
cls._unique_registry = {}
return cls
def __init__(self, domain):
self.setup_handler(domain)
# Attempt to read the list of fields from the config file
if self.config_field and ytcfg.has_section(self.config_field):
cfg = ytcfg.get(self.config_field, "fields")
known_fields = []
for c in (_.strip() for _ in cfg.split("\n") if _.strip() != ""):
field, field_type = (_.strip() for _ in c.split(","))
known_fields.append((field, field_type))
self.known_fields = known_fields
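            # The "fields" entry is parsed as one "name, type" pair per
            # line, where the type is a struct typecode. A hypothetical
            # example:
            #
            #     particle_position_x, d
            #     particle_mass, d
            #     particle_family, i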
@abc.abstractmethod
def read_header(self):
"""
This function is called once per file. It should:
* read the header of the file and store any relevant information
* detect the fields in the file
* compute the offsets (location in the file) of each field
        It is in charge of setting `self._field_offsets` and `self._field_types`.
* `_field_offsets`: dictionary: tuple -> integer
A dictionary that maps `(type, field_name)` to their
location in the file (integer)
* `_field_types`: dictionary: tuple -> character
A dictionary that maps `(type, field_name)` to their type
(character), following Python's struct convention.
"""
pass
@property
def field_offsets(self) -> dict[tuple[str, str], int]:
if hasattr(self, "_field_offsets"):
return self._field_offsets
self.read_header()
return self._field_offsets
@property
def field_types(self) -> dict[tuple[str, str], str]:
if hasattr(self, "_field_types"):
return self._field_types
self.read_header()
return self._field_types
@property
def local_particle_count(self) -> int:
if hasattr(self, "_local_particle_count"):
return self._local_particle_count
self.read_header()
return self._local_particle_count
@property
def header(self) -> dict[str, Any]:
if hasattr(self, "_header"):
return self._header
self.read_header()
return self._header
def handle_field(
self, field: tuple[str, str], data_dict: dict[tuple[str, str], np.ndarray]
):
"""
This function allows custom code to be called to handle special cases,
such as the particle birth time.
It updates the `data_dict` dictionary with the new data.
Parameters
----------
field : tuple[str, str]
The field name.
data_dict : dict[tuple[str, str], np.ndarray]
A dictionary containing the data.
By default, this function does nothing.
"""
pass
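    # A hypothetical override could post-process a field in place, e.g.:
    #
    #     def handle_field(self, field, data_dict):
    #         if field == (self.ptype, "particle_birth_time"):
    #             data_dict[field] = my_conversion(data_dict[field])
    #
    # where `my_conversion` is a placeholder; see
    # DefaultParticleFileHandler.handle_field below for a real example.

# Fallback struct typecodes keyed by per-particle byte size, used when
# guessing a field's type from its on-disk record length (see
# DefaultParticleFileHandler.read_header).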
_default_dtypes: dict[int, str] = {
    1: "c",  # char
    2: "h",  # short
    4: "f",  # float
    8: "d",  # double
}
class DefaultParticleFileHandler(ParticleFileHandler):
ptype = "io"
fname = "part_{iout:05d}.out{icpu:05d}"
file_descriptor = "part_file_descriptor.txt"
config_field = "ramses-particles"
reader = _ramses_particle_binary_file_handler
attrs = (
("ncpu", 1, "i"),
("ndim", 1, "i"),
("npart", 1, "i"),
("localseed", -1, "i"),
("nstar_tot", 1, "i"),
("mstar_tot", 1, "d"),
("mstar_lost", 1, "d"),
("nsink", 1, "i"),
)
known_fields = [
("particle_position_x", "d"),
("particle_position_y", "d"),
("particle_position_z", "d"),
("particle_velocity_x", "d"),
("particle_velocity_y", "d"),
("particle_velocity_z", "d"),
("particle_mass", "d"),
("particle_identity", "i"),
("particle_refinement_level", "i"),
]
def read_header(self):
if not self.exists:
self._field_offsets = {}
self._field_types = {}
self._local_particle_count = 0
self._header = {}
return
flen = os.path.getsize(self.fname)
with FortranFile(self.fname) as fd:
hvals = dict(fd.read_attrs(self.attrs))
particle_field_pos = fd.tell()
self._header = hvals
self._local_particle_count = hvals["npart"]
extra_particle_fields = self.ds._extra_particle_fields
if self.has_descriptor:
particle_fields = _read_part_binary_file_descriptor(self.file_descriptor)
else:
particle_fields = list(self.known_fields)
if extra_particle_fields is not None:
particle_fields += extra_particle_fields
if (
hvals["nstar_tot"] > 0
and extra_particle_fields is not None
and ("particle_birth_time", "d") not in particle_fields
):
particle_fields += [
("particle_birth_time", "d"),
("particle_metallicity", "d"),
]
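        # Iterate over the expected fields first, then fall back to an
        # unlimited stream of generically-named extra fields, assumed to be
        # double precision.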
def build_iterator():
return chain(
particle_fields,
((f"particle_extra_field_{i}", "d") for i in count(1)),
)
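        # Fast scan: assume each record holds exactly npart values of the
        # expected type. Fortran records are wrapped by two 4-byte length
        # markers, hence the extra blockLen added per record.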
field_offsets = {}
_pfields = {}
ptype = self.ptype
blockLen = struct.calcsize("i") * 2
particle_fields_iterator = build_iterator()
ipos = particle_field_pos
while ipos < flen:
field, vtype = next(particle_fields_iterator)
field_offsets[ptype, field] = ipos
_pfields[ptype, field] = vtype
ipos += blockLen + struct.calcsize(vtype) * hvals["npart"]
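        # Slow path: the fast scan did not land exactly on the end of the
        # file, so re-read the records one by one and infer each field's
        # type from its actual on-disk size.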
if ipos != flen:
particle_fields_iterator = build_iterator()
with FortranFile(self.fname) as fd:
fd.seek(particle_field_pos)
ipos = particle_field_pos
while ipos < flen:
field, vtype = next(particle_fields_iterator)
old_pos = fd.tell()
field_offsets[ptype, field] = old_pos
fd.skip(1)
ipos = fd.tell()
record_len = ipos - old_pos - blockLen
exp_len = struct.calcsize(vtype) * hvals["npart"]
if record_len != exp_len:
# Guess record vtype from length
nbytes = record_len // hvals["npart"]
                        # NOTE: in some simulations (e.g. New Horizon), the
                        # per-particle size is not 1, 2, 4 or 8 bytes. In that
                        # case, fall back to assuming double precision.
vtype = _default_dtypes.get(nbytes, "d")
                        mylog.warning(
                            "Assuming `%s` has type %s, based on its record size",
                            field,
                            np.dtype(vtype),
                        )
_pfields[ptype, field] = vtype
if field.startswith("particle_extra_field_"):
iextra = int(field.split("_")[-1])
else:
iextra = 0
if iextra > 0 and not self.ds._warned_extra_fields["io"]:
mylog.warning(
"Detected %s extra particle fields assuming kind "
"`double`. Consider using the `extra_particle_fields` "
"keyword argument if you have unexpected behavior.",
iextra,
)
self.ds._warned_extra_fields["io"] = True
if (
self.ds.use_conformal_time
and (ptype, "particle_birth_time") in field_offsets
):
field_offsets[ptype, "conformal_birth_time"] = field_offsets[
ptype, "particle_birth_time"
]
_pfields[ptype, "conformal_birth_time"] = _pfields[
ptype, "particle_birth_time"
]
self._field_offsets = field_offsets
self._field_types = _pfields
@property
def birth_file_fname(self):
basename = os.path.abspath(self.ds.root_folder)
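        # Extract the output number from the parameter file name, e.g. a
        # (hypothetical) "info_00123.txt" yields iout == 123.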
iout = int(
os.path.basename(self.ds.parameter_filename).split(".")[0].split("_")[1]
)
icpu = self.domain_id
fname = os.path.join(basename, f"birth_{iout:05d}.out{icpu:05d}")
return fname
@cached_property
def has_birth_file(self):
return os.path.exists(self.birth_file_fname)
def handle_field(
self, field: tuple[str, str], data_dict: dict[tuple[str, str], np.ndarray]
):
_ptype, fname = field
if not (fname == "particle_birth_time" and self.ds.cosmological_simulation):
return
# If the birth files exist, read from them
if self.has_birth_file:
with FortranFile(self.birth_file_fname) as fd:
# Note: values are written in Gyr, so we need to convert back to code_time
data_dict[field] = (
self.ds.arr(fd.read_vector("d"), "Gyr").to("code_time").v
)
return
# Otherwise, convert conformal time to physical age
ds = self.ds
conformal_time = data_dict[field]
physical_time = (
convert_ramses_conformal_time_to_physical_time(ds, conformal_time)
.to("code_time")
.v
)
        # Arbitrarily map a conformal time of zero to a physical time of
        # zero; such particles are DM particles.
data_dict[field] = np.where(conformal_time != 0, physical_time, 0)
class SinkParticleFileHandler(ParticleFileHandler):
"""Handle sink files"""
ptype = "sink"
fname = "sink_{iout:05d}.out{icpu:05d}"
file_descriptor = "sink_file_descriptor.txt"
config_field = "ramses-sink-particles"
reader = _ramses_particle_binary_file_handler
attrs = (("nsink", 1, "i"), ("nindsink", 1, "i"))
known_fields = [
("particle_identifier", "i"),
("particle_mass", "d"),
("particle_position_x", "d"),
("particle_position_y", "d"),
("particle_position_z", "d"),
("particle_velocity_x", "d"),
("particle_velocity_y", "d"),
("particle_velocity_z", "d"),
("particle_birth_time", "d"),
("BH_real_accretion", "d"),
("BH_bondi_accretion", "d"),
("BH_eddington_accretion", "d"),
("BH_esave", "d"),
("gas_spin_x", "d"),
("gas_spin_y", "d"),
("gas_spin_z", "d"),
("BH_spin_x", "d"),
("BH_spin_y", "d"),
("BH_spin_z", "d"),
("BH_spin", "d"),
("BH_efficiency", "d"),
]
def read_header(self):
if not self.exists:
self._field_offsets = {}
self._field_types = {}
self._local_particle_count = 0
self._header = {}
return
        flen = os.path.getsize(self.fname)
        hvals = {}
        with FortranFile(self.fname) as fd:
            # Read the header of the file
            attrs = self.attrs
            hvals.update(fd.read_attrs(attrs))
            self._header = hvals
            # This is a bit of a trick: RAMSES writes all the sinks in all
            # the domains, so we only want a single domain to report them.
            # Set local_particle_count to 0 for every domain except the
            # first one to be read.
            if getattr(self.ds, "_sink_file_flag", False):
                self._local_particle_count = 0
            else:
                self.ds._sink_file_flag = True
                self._local_particle_count = hvals["nsink"]
            # Read the fields + add the sink properties
            if self.has_descriptor:
                fields = _read_part_binary_file_descriptor(self.file_descriptor)
            else:
                fields = list(self.known_fields)
            # Note: this follows the RAMSES convention.
            for i in range(self.ds.dimensionality * 2 + 1):
                for ilvl in range(self.ds.max_level + 1):
                    fields.append((f"particle_prop_{ilvl}_{i}", "d"))
            field_offsets = {}
            _pfields = {}
            # Fill the fields, offsets and types
            self.fields = []
            for field, vtype in fields:
                self.fields.append(field)
                if fd.tell() >= flen:
                    break
                field_offsets[self.ptype, field] = fd.tell()
                _pfields[self.ptype, field] = vtype
                fd.skip(1)
        self._field_offsets = field_offsets
        self._field_types = _pfields
class SinkParticleFileHandlerCsv(ParticleFileHandler):
"""Handle sink files from a csv file, the format from the sink particle in ramses"""
ptype = "sink_csv"
fname = "sink_{iout:05d}.csv"
file_descriptor = None
config_field = "ramses-sink-particles"
reader = _ramses_particle_csv_file_handler
attrs = (("nsink", 1, "i"), ("nindsink", 1, "i"))
def read_header(self):
if not self.exists:
self._field_offsets = {}
self._field_types = {}
self._local_particle_count = 0
self._header = {}
return
field_offsets = {}
_pfields = {}
fields, self._local_particle_count = _read_part_csv_file_descriptor(self.fname)
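        # For CSV data the "offset" is simply the column index, and every
        # field is read as double precision.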
for ind, field in enumerate(fields):
field_offsets[self.ptype, field] = ind
_pfields[self.ptype, field] = "d"
self._field_offsets = field_offsets
self._field_types = _pfields
def handle_field(
self, field: tuple[str, str], data_dict: dict[tuple[str, str], np.ndarray]
):
_ptype, fname = field
if not (fname == "particle_birth_time" and self.ds.cosmological_simulation):
return
# convert conformal time to physical age
ds = self.ds
conformal_time = data_dict[field]
physical_time = convert_ramses_conformal_time_to_physical_time(
ds, conformal_time
)
        # Arbitrarily map a conformal time of zero to a physical time of
        # zero; such particles are DM particles.
data_dict[field] = np.where(conformal_time > 0, physical_time, 0)