import os
from collections import defaultdict
from typing import Union
import numpy as np
from yt._maintenance.deprecation import issue_deprecation_warning
from yt.frontends.ramses.definitions import VAR_DESC_RE, VERSION_RE
from yt.utilities.cython_fortran_utils import FortranFile
from yt.utilities.exceptions import (
YTFieldTypeNotFound,
YTFileNotParseable,
YTParticleOutputFormatNotImplemented,
)
from yt.utilities.io_handler import BaseIOHandler
from yt.utilities.logger import ytLogger as mylog
from yt.utilities.physical_ratios import cm_per_km, cm_per_mpc
[docs]
def convert_ramses_ages(ds, conformal_ages):
issue_deprecation_warning(
msg=(
"The `convert_ramses_ages' function is deprecated. It should be replaced "
"by the `convert_ramses_conformal_time_to_physical_age' function."
),
stacklevel=3,
since="4.0.3",
)
return convert_ramses_conformal_time_to_physical_age(ds, conformal_ages)
def _ramses_particle_binary_file_handler(particle, subset, fields, count):
"""General file handler for binary file, called by _read_particle_subset
Parameters
----------
particle : ``ParticleFileHandler``
the particle class we want to read
subset: ``RAMSESDomainSubset``
A RAMSES domain subset object
fields: list of tuple
The fields to read
count: integer
The number of elements to count
"""
tr = {}
ds = subset.domain.ds
current_time = ds.current_time.in_units("code_time").v
foffsets = particle.field_offsets
fname = particle.fname
data_types = particle.field_types
with FortranFile(fname) as fd:
# We do *all* conversion into boxlen here.
# This means that no other conversions need to be applied to convert
# positions into the same domain as the octs themselves.
for field in sorted(fields, key=lambda a: foffsets[a]):
if count == 0:
tr[field] = np.empty(0, dtype=data_types[field])
continue
fd.seek(foffsets[field])
dt = data_types[field]
tr[field] = fd.read_vector(dt)
if field[1].startswith("particle_position"):
np.divide(tr[field], ds["boxlen"], tr[field])
if ds.cosmological_simulation and field[1] == "particle_birth_time":
conformal_time = tr[field]
physical_age = convert_ramses_conformal_time_to_physical_age(
ds, conformal_time
)
tr[field] = current_time - physical_age
# arbitrarily set particles with zero conformal_age to zero
# particle_age. This corresponds to DM particles.
tr[field][conformal_time == 0] = 0
return tr
def _ramses_particle_csv_file_handler(particle, subset, fields, count):
"""General file handler for csv file, called by _read_particle_subset
Parameters
----------
particle: ``ParticleFileHandler``
the particle class we want to read
subset: ``RAMSESDomainSubset``
A RAMSES domain subset object
fields: list of tuple
The fields to read
count: integer
The number of elements to count
"""
from yt.utilities.on_demand_imports import _pandas as pd
tr = {}
ds = subset.domain.ds
current_time = ds.current_time.in_units("code_time").v
foffsets = particle.field_offsets
fname = particle.fname
list_field_ind = [
(field, foffsets[field]) for field in sorted(fields, key=lambda a: foffsets[a])
]
# read only selected fields
dat = pd.read_csv(
fname,
delimiter=",",
usecols=[ind for _field, ind in list_field_ind],
skiprows=2,
header=None,
)
for field, ind in list_field_ind:
tr[field] = dat[ind].to_numpy()
if field[1].startswith("particle_position"):
np.divide(tr[field], ds["boxlen"], tr[field])
if ds.cosmological_simulation and field[1] == "particle_birth_time":
conformal_time = tr[field]
physical_age = convert_ramses_conformal_time_to_physical_age(
ds, conformal_time
)
tr[field] = current_time - physical_age
# arbitrarily set particles with zero conformal_age to zero
# particle_age. This corresponds to DM particles.
tr[field][conformal_time == 0] = 0
return tr
[docs]
class IOHandlerRAMSES(BaseIOHandler):
_dataset_type = "ramses"
def _read_fluid_selection(self, chunks, selector, fields, size):
tr = defaultdict(list)
# Set of field types
ftypes = {f[0] for f in fields}
for chunk in chunks:
# Gather fields by type to minimize i/o operations
for ft in ftypes:
# Get all the fields of the same type
field_subs = list(filter(lambda f, ft=ft: f[0] == ft, fields))
# Loop over subsets
for subset in chunk.objs:
fname = None
for fh in subset.domain.field_handlers:
if fh.ftype == ft:
file_handler = fh
fname = fh.fname
break
if fname is None:
raise YTFieldTypeNotFound(ft)
# Now we read the entire thing
with FortranFile(fname) as fd:
# This contains the boundary information, so we skim through
# and pick off the right vectors
rv = subset.fill(fd, field_subs, selector, file_handler)
for ft, f in field_subs:
d = rv.pop(f)
mylog.debug(
"Filling %s with %s (%0.3e %0.3e) (%s zones)",
f,
d.size,
d.min(),
d.max(),
d.size,
)
tr[(ft, f)].append(d)
d = {}
for field in fields:
d[field] = np.concatenate(tr.pop(field))
return d
def _read_particle_coords(self, chunks, ptf):
pn = "particle_position_%s"
fields = [
(ptype, f"particle_position_{ax}")
for ptype, field_list in ptf.items()
for ax in "xyz"
]
for chunk in chunks:
for subset in chunk.objs:
rv = self._read_particle_subset(subset, fields)
for ptype in sorted(ptf):
yield ptype, (
rv[ptype, pn % "x"],
rv[ptype, pn % "y"],
rv[ptype, pn % "z"],
), 0.0
def _read_particle_fields(self, chunks, ptf, selector):
pn = "particle_position_%s"
chunks = list(chunks)
fields = [
(ptype, fname) for ptype, field_list in ptf.items() for fname in field_list
]
for ptype, field_list in sorted(ptf.items()):
for ax in "xyz":
if pn % ax not in field_list:
fields.append((ptype, pn % ax))
if ptype == "sink_csv":
subset = chunks[0].objs[0]
rv = self._read_particle_subset(subset, fields)
for ptype, field_list in sorted(ptf.items()):
x, y, z = (np.asarray(rv[ptype, pn % ax], "=f8") for ax in "xyz")
mask = selector.select_points(x, y, z, 0.0)
if mask is None:
mask = []
for field in field_list:
data = np.asarray(rv.pop((ptype, field))[mask], "=f8")
yield (ptype, field), data
else:
for chunk in chunks:
for subset in chunk.objs:
rv = self._read_particle_subset(subset, fields)
for ptype, field_list in sorted(ptf.items()):
x, y, z = (
np.asarray(rv[ptype, pn % ax], "=f8") for ax in "xyz"
)
mask = selector.select_points(x, y, z, 0.0)
if mask is None:
mask = []
for field in field_list:
data = np.asarray(rv.pop((ptype, field))[mask], "=f8")
yield (ptype, field), data
def _read_particle_subset(self, subset, fields):
"""Read the particle files."""
tr = {}
# Sequential read depending on particle type
for ptype in {f[0] for f in fields}:
# Select relevant files
subs_fields = filter(lambda f, ptype=ptype: f[0] == ptype, fields)
ok = False
for ph in subset.domain.particle_handlers:
if ph.ptype == ptype:
foffsets = ph.field_offsets
data_types = ph.field_types
ok = True
count = ph.local_particle_count
break
if not ok:
raise YTFieldTypeNotFound(ptype)
cosmo = self.ds.cosmological_simulation
if (ptype, "particle_birth_time") in foffsets and cosmo:
foffsets[ptype, "conformal_birth_time"] = foffsets[
ptype, "particle_birth_time"
]
data_types[ptype, "conformal_birth_time"] = data_types[
ptype, "particle_birth_time"
]
tr.update(ph.reader(subset, subs_fields, count))
return tr
def _read_part_binary_file_descriptor(fname: Union[str, "os.PathLike[str]"]):
"""
Read a file descriptor and returns the array of the fields found.
"""
# Mapping
mapping_list = [
("position_x", "particle_position_x"),
("position_y", "particle_position_y"),
("position_z", "particle_position_z"),
("velocity_x", "particle_velocity_x"),
("velocity_y", "particle_velocity_y"),
("velocity_z", "particle_velocity_z"),
("mass", "particle_mass"),
("identity", "particle_identity"),
("levelp", "particle_level"),
("family", "particle_family"),
("tag", "particle_tag"),
]
# Convert to dictionary
mapping = dict(mapping_list)
with open(fname) as f:
line = f.readline()
tmp = VERSION_RE.match(line)
mylog.debug("Reading part file descriptor %s.", fname)
if not tmp:
raise YTParticleOutputFormatNotImplemented()
version = int(tmp.group(1))
if version == 1:
# Skip one line (containing the headers)
line = f.readline()
fields = []
for i, line in enumerate(f.readlines()):
tmp = VAR_DESC_RE.match(line)
if not tmp:
raise YTFileNotParseable(fname, i + 1)
# ivar = tmp.group(1)
varname = tmp.group(2)
dtype = tmp.group(3)
if varname in mapping:
varname = mapping[varname]
else:
varname = f"particle_{varname}"
fields.append((varname, dtype))
else:
raise YTParticleOutputFormatNotImplemented()
return fields
def _read_part_csv_file_descriptor(fname: Union[str, "os.PathLike[str]"]):
"""
Read the file from the csv sink particles output.
"""
from yt.utilities.on_demand_imports import _pandas as pd
# Fields name from the default csv RAMSES sink algorithm in the yt default convention
mapping = {
" # id": "particle_identifier",
"msink": "particle_mass",
"x": "particle_position_x",
"y": "particle_position_y",
"z": "particle_position_z",
"vx": "particle_velocity_x",
"vy": "particle_velocity_y",
"vz": "particle_velocity_z",
"lx": "particle_angular_momentum_x",
"ly": "particle_angular_momentum_y",
"lz": "particle_angular_momentum_z",
"tform": "particle_formation_time",
"acc_Rate": "particle_accretion_Rate",
"del_mass": "particle_delta_mass",
"rho_gas": "particle_rho_gas",
"cs**2": "particle_sound_speed",
"etherm": "particle_etherm",
"vx_gas": "particle_velocity_x_gas",
"vy_gas": "particle_velocity_y_gas",
"vz_gas": "particle_velocity_z_gas",
"mbh": "particle_mass_bh",
"level": "particle_level",
"rsink_star": "particle_radius_star",
}
# read the all file to get the number of particle
dat = pd.read_csv(fname, delimiter=",")
fields = []
local_particle_count = len(dat)
for varname in dat.columns:
if varname in mapping:
varname = mapping[varname]
else:
varname = f"particle_{varname}"
fields.append(varname)
return fields, local_particle_count
def _read_fluid_file_descriptor(fname: Union[str, "os.PathLike[str]"], *, prefix: str):
"""
Read a file descriptor and returns the array of the fields found.
"""
# Mapping
mapping_list = [
("density", "Density"),
("velocity_x", "x-velocity"),
("velocity_y", "y-velocity"),
("velocity_z", "z-velocity"),
("pressure", "Pressure"),
("metallicity", "Metallicity"),
# Add mapping for ionized species
# Note: we expect internally that these names use the HII, HeII,
# HeIII, ... convention for historical reasons. So we need to map
# the names read from `hydro_file_descriptor.txt` to this
# convention.
# This will create fields like ("ramses", "HII") which are mapped
# to ("gas", "H_p1_fraction") in fields.py
("H_p1_fraction", "HII"),
("He_p1_fraction", "HeII"),
("He_p2_fraction", "HeIII"),
# Photon fluxes / densities are stored as `photon_density_XX`, so
# only 100 photon bands can be stored with this format. Let's be
# conservative and support up to 100 bands.
*[(f"photon_density_{i:02d}", f"Photon_density_{i:d}") for i in range(100)],
*[
(f"photon_flux_{i:02d}_{dim}", f"Photon_flux_{dim}_{i:d}")
for i in range(100)
for dim in "xyz"
],
]
# Add mapping for magnetic fields
mapping_list += [
(key, key)
for key in (
f"B_{dim}_{side}" for side in ["left", "right"] for dim in ["x", "y", "z"]
)
]
# Convert to dictionary
mapping = dict(mapping_list)
with open(fname) as f:
line = f.readline()
tmp = VERSION_RE.match(line)
mylog.debug("Reading fluid file descriptor %s.", fname)
if not tmp:
return []
version = int(tmp.group(1))
if version == 1:
# Skip one line (containing the headers)
line = f.readline()
fields = []
for i, line in enumerate(f.readlines()):
tmp = VAR_DESC_RE.match(line)
if not tmp:
raise YTFileNotParseable(fname, i + 1)
# ivar = tmp.group(1)
varname = tmp.group(2)
dtype = tmp.group(3)
if varname in mapping:
varname = mapping[varname]
else:
varname = f"{prefix}_{varname}"
fields.append((varname, dtype))
else:
mylog.error("Version %s", version)
raise YTParticleOutputFormatNotImplemented()
return fields