import abc
import os
from typing import Optional
from yt._typing import FieldKey
from yt.config import ytcfg
from yt.funcs import mylog
from yt.utilities.cython_fortran_utils import FortranFile
from .field_handlers import HandlerMixin
from .io import (
_ramses_particle_binary_file_handler,
_ramses_particle_csv_file_handler,
_read_part_binary_file_descriptor,
_read_part_csv_file_descriptor,
)
PARTICLE_HANDLERS: set[type["ParticleFileHandler"]] = set()
[docs]
def get_particle_handlers():
return PARTICLE_HANDLERS
[docs]
def register_particle_handler(ph):
PARTICLE_HANDLERS.add(ph)
[docs]
class ParticleFileHandler(abc.ABC, HandlerMixin):
"""
Abstract class to handle particles in RAMSES. Each instance
represents a single file (one domain).
To add support to a new particle file, inherit from this class and
implement all functions containing a `NotImplementedError`.
See `SinkParticleFileHandler` for an example implementation."""
_file_type = "particle"
# These properties are static properties
ptype: Optional[str] = None # The name to give to the particle type
fname: Optional[str] = None # The name of the file(s).
file_descriptor: Optional[str] = None # The name of the file descriptor (if any)
attrs: tuple[tuple[str, int, str], ...] # The attributes of the header
known_fields: Optional[
list[FieldKey]
] = None # A list of tuple containing the field name and its type
config_field: Optional[str] = None # Name of the config section (if any)
# These properties are computed dynamically
field_offsets = None # Mapping from field to offset in file
field_types = (
None # Mapping from field to the type of the data (float, integer, ...)
)
local_particle_count = None # The number of particle in the domain
def __init_subclass__(cls, *args, **kwargs):
"""
Registers subclasses at creation.
"""
super().__init_subclass__(*args, **kwargs)
if cls.ptype is not None:
register_particle_handler(cls)
cls._unique_registry = {}
return cls
def __init__(self, domain):
self.setup_handler(domain)
# Attempt to read the list of fields from the config file
if self.config_field and ytcfg.has_section(self.config_field):
cfg = ytcfg.get(self.config_field, "fields")
known_fields = []
for c in (_.strip() for _ in cfg.split("\n") if _.strip() != ""):
field, field_type = (_.strip() for _ in c.split(","))
known_fields.append((field, field_type))
self.known_fields = known_fields
[docs]
@abc.abstractmethod
def read_header(self):
"""
This function is called once per file. It should:
* read the header of the file and store any relevant information
* detect the fields in the file
* compute the offsets (location in the file) of each field
It is in charge of setting `self.field_offsets` and `self.field_types`.
* `field_offsets`: dictionary: tuple -> integer
A dictionary that maps `(type, field_name)` to their
location in the file (integer)
* `field_types`: dictionary: tuple -> character
A dictionary that maps `(type, field_name)` to their type
(character), following Python's struct convention.
"""
pass
[docs]
class DefaultParticleFileHandler(ParticleFileHandler):
ptype = "io"
fname = "part_{iout:05d}.out{icpu:05d}"
file_descriptor = "part_file_descriptor.txt"
config_field = "ramses-particles"
reader = _ramses_particle_binary_file_handler
attrs = (
("ncpu", 1, "i"),
("ndim", 1, "i"),
("npart", 1, "i"),
("localseed", -1, "i"),
("nstar_tot", 1, "i"),
("mstar_tot", 1, "d"),
("mstar_lost", 1, "d"),
("nsink", 1, "i"),
)
known_fields = [
("particle_position_x", "d"),
("particle_position_y", "d"),
("particle_position_z", "d"),
("particle_velocity_x", "d"),
("particle_velocity_y", "d"),
("particle_velocity_z", "d"),
("particle_mass", "d"),
("particle_identity", "i"),
("particle_refinement_level", "i"),
]
[docs]
def read_header(self):
if not self.exists:
self.field_offsets = {}
self.field_types = {}
self.local_particle_count = 0
return
fd = FortranFile(self.fname)
fd.seek(0, os.SEEK_END)
flen = fd.tell()
fd.seek(0)
hvals = {}
attrs = self.attrs
hvals.update(fd.read_attrs(attrs))
self.header = hvals
self.local_particle_count = hvals["npart"]
extra_particle_fields = self.ds._extra_particle_fields
if self.has_descriptor:
particle_fields = _read_part_binary_file_descriptor(self.file_descriptor)
else:
particle_fields = list(self.known_fields)
if extra_particle_fields is not None:
particle_fields += extra_particle_fields
if hvals["nstar_tot"] > 0 and extra_particle_fields is not None:
particle_fields += [
("particle_birth_time", "d"),
("particle_metallicity", "d"),
]
field_offsets = {}
_pfields = {}
ptype = self.ptype
# Read offsets
for field, vtype in particle_fields:
if fd.tell() >= flen:
break
field_offsets[ptype, field] = fd.tell()
_pfields[ptype, field] = vtype
fd.skip(1)
iextra = 0
while fd.tell() < flen:
iextra += 1
field, vtype = ("particle_extra_field_%i" % iextra, "d")
particle_fields.append((field, vtype))
field_offsets[ptype, field] = fd.tell()
_pfields[ptype, field] = vtype
fd.skip(1)
fd.close()
if iextra > 0 and not self.ds._warned_extra_fields["io"]:
mylog.warning(
"Detected %s extra particle fields assuming kind "
"`double`. Consider using the `extra_particle_fields` "
"keyword argument if you have unexpected behavior.",
iextra,
)
self.ds._warned_extra_fields["io"] = True
self.field_offsets = field_offsets
self.field_types = _pfields
[docs]
class SinkParticleFileHandler(ParticleFileHandler):
"""Handle sink files"""
ptype = "sink"
fname = "sink_{iout:05d}.out{icpu:05d}"
file_descriptor = "sink_file_descriptor.txt"
config_field = "ramses-sink-particles"
reader = _ramses_particle_binary_file_handler
attrs = (("nsink", 1, "i"), ("nindsink", 1, "i"))
known_fields = [
("particle_identifier", "i"),
("particle_mass", "d"),
("particle_position_x", "d"),
("particle_position_y", "d"),
("particle_position_z", "d"),
("particle_velocity_x", "d"),
("particle_velocity_y", "d"),
("particle_velocity_z", "d"),
("particle_birth_time", "d"),
("BH_real_accretion", "d"),
("BH_bondi_accretion", "d"),
("BH_eddington_accretion", "d"),
("BH_esave", "d"),
("gas_spin_x", "d"),
("gas_spin_y", "d"),
("gas_spin_z", "d"),
("BH_spin_x", "d"),
("BH_spin_y", "d"),
("BH_spin_z", "d"),
("BH_spin", "d"),
("BH_efficiency", "d"),
]
[docs]
def read_header(self):
if not self.exists:
self.field_offsets = {}
self.field_types = {}
self.local_particle_count = 0
return
fd = FortranFile(self.fname)
fd.seek(0, os.SEEK_END)
flen = fd.tell()
fd.seek(0)
hvals = {}
# Read the header of the file
attrs = self.attrs
hvals.update(fd.read_attrs(attrs))
self._header = hvals
# This is somehow a trick here: we only want one domain to
# be read, as ramses writes all the sinks in all the
# domains. Here, we set the local_particle_count to 0 except
# for the first domain to be red.
if getattr(self.ds, "_sink_file_flag", False):
self.local_particle_count = 0
else:
self.ds._sink_file_flag = True
self.local_particle_count = hvals["nsink"]
# Read the fields + add the sink properties
if self.has_descriptor:
fields = _read_part_binary_file_descriptor(self.file_descriptor)
else:
fields = list(self.known_fields)
# Note: this follows RAMSES convention.
for i in range(self.ds.dimensionality * 2 + 1):
for ilvl in range(self.ds.max_level + 1):
fields.append((f"particle_prop_{ilvl}_{i}", "d"))
field_offsets = {}
_pfields = {}
# Fill the fields, offsets and types
self.fields = []
for field, vtype in fields:
self.fields.append(field)
if fd.tell() >= flen:
break
field_offsets[self.ptype, field] = fd.tell()
_pfields[self.ptype, field] = vtype
fd.skip(1)
self.field_offsets = field_offsets
self.field_types = _pfields
fd.close()
[docs]
class SinkParticleFileHandlerCsv(ParticleFileHandler):
"""Handle sink files from a csv file, the format from the sink particle in ramses"""
ptype = "sink_csv"
fname = "sink_{iout:05d}.csv"
file_descriptor = None
config_field = "ramses-sink-particles"
reader = _ramses_particle_csv_file_handler
attrs = (("nsink", 1, "i"), ("nindsink", 1, "i"))
[docs]
def read_header(self):
if not self.exists:
self.field_offsets = {}
self.field_types = {}
self.local_particle_count = 0
return
field_offsets = {}
_pfields = {}
fields, self.local_particle_count = _read_part_csv_file_descriptor(self.fname)
for ind, field in enumerate(fields):
field_offsets[self.ptype, field] = ind
_pfields[self.ptype, field] = "d"
self.field_offsets = field_offsets
self.field_types = _pfields