import os
import warnings
from functools import partial
import numpy as np
from yt import units
from yt._typing import KnownFieldsT
from yt.fields.field_detector import FieldDetector
from yt.fields.field_info_container import FieldInfoContainer
from yt.frontends.ramses.io import convert_ramses_conformal_time_to_physical_time
from yt.utilities.cython_fortran_utils import FortranFile
from yt.utilities.lib.cosmology_time import t_frw
from yt.utilities.linear_interpolators import BilinearFieldInterpolator
from yt.utilities.logger import ytLogger as mylog
from yt.utilities.physical_constants import (
boltzmann_constant_cgs,
mass_hydrogen_cgs,
mh,
mp,
)
from .field_handlers import RTFieldFileHandler
b_units = "code_magnetic"
ra_units = "code_length / code_time**2"
rho_units = "code_density"
vel_units = "code_velocity"
pressure_units = "code_pressure"
ener_units = "code_mass * code_velocity**2"
specific_ener_units = "code_velocity**2"
ang_mom_units = "code_mass * code_velocity * code_length"
cooling_function_units = " erg * cm**3 /s"
cooling_function_prime_units = " erg * cm**3 /s/K"
flux_unit = "1 / code_length**2 / code_time"
number_density_unit = "1 / code_length**3"
known_species_masses = {
sp: mh * v
for sp, v in [
("HI", 1.0),
("HII", 1.0),
("Electron", 1.0),
("HeI", 4.0),
("HeII", 4.0),
("HeIII", 4.0),
("H2I", 2.0),
("H2II", 2.0),
("HM", 1.0),
("DI", 2.0),
("DII", 2.0),
("HDI", 3.0),
]
}
known_species_names = {
"HI": "H_p0",
"HII": "H_p1",
"Electron": "El",
"HeI": "He_p0",
"HeII": "He_p1",
"HeIII": "He_p2",
"H2I": "H2_p0",
"H2II": "H2_p1",
"HM": "H_m1",
"DI": "D_p0",
"DII": "D_p1",
"HDI": "HD_p0",
}
_cool_axes = ("lognH", "logT") # , "logTeq")
_cool_arrs = (
("cooling_primordial", cooling_function_units),
("heating_primordial", cooling_function_units),
("cooling_compton", cooling_function_units),
("heating_compton", cooling_function_units),
("cooling_metal", cooling_function_units),
("cooling_primordial_prime", cooling_function_prime_units),
("heating_primordial_prime", cooling_function_prime_units),
("cooling_compton_prime", cooling_function_prime_units),
("heating_compton_prime", cooling_function_prime_units),
("cooling_metal_prime", cooling_function_prime_units),
("mu", None),
("abundances", None),
)
_cool_species = (
"Electron_number_density",
"HI_number_density",
"HII_number_density",
"HeI_number_density",
"HeII_number_density",
"HeIII_number_density",
)
_X = 0.76 # H fraction, hardcoded
_Y = 0.24 # He fraction, hardcoded
[docs]
class RAMSESFieldInfo(FieldInfoContainer):
known_other_fields: KnownFieldsT = (
("Density", (rho_units, ["density"], None)),
("x-velocity", (vel_units, ["velocity_x"], None)),
("y-velocity", (vel_units, ["velocity_y"], None)),
("z-velocity", (vel_units, ["velocity_z"], None)),
("Pres_IR", (pressure_units, ["pres_IR", "pressure_IR"], None)),
("Pressure", (pressure_units, ["pressure"], None)),
("Metallicity", ("", ["metallicity"], None)),
("HII", ("", ["H_p1_fraction"], None)),
("HeII", ("", ["He_p1_fraction"], None)),
("HeIII", ("", ["He_p2_fraction"], None)),
("x-acceleration", (ra_units, ["acceleration_x"], None)),
("y-acceleration", (ra_units, ["acceleration_y"], None)),
("z-acceleration", (ra_units, ["acceleration_z"], None)),
("Potential", (specific_ener_units, ["potential"], None)),
("B_x_left", (b_units, ["magnetic_field_x_left"], None)),
("B_x_right", (b_units, ["magnetic_field_x_right"], None)),
("B_y_left", (b_units, ["magnetic_field_y_left"], None)),
("B_y_right", (b_units, ["magnetic_field_y_right"], None)),
("B_z_left", (b_units, ["magnetic_field_z_left"], None)),
("B_z_right", (b_units, ["magnetic_field_z_right"], None)),
)
known_particle_fields: KnownFieldsT = (
("particle_position_x", ("code_length", [], None)),
("particle_position_y", ("code_length", [], None)),
("particle_position_z", ("code_length", [], None)),
("particle_velocity_x", (vel_units, [], None)),
("particle_velocity_y", (vel_units, [], None)),
("particle_velocity_z", (vel_units, [], None)),
("particle_mass", ("code_mass", [], None)),
("particle_identity", ("", ["particle_index"], None)),
("particle_refinement_level", ("", [], None)),
("particle_birth_time", ("code_time", ["age"], None)),
("conformal_birth_time", ("", [], None)),
("particle_metallicity", ("", [], None)),
("particle_family", ("", [], None)),
("particle_tag", ("", [], None)),
# sink field parameters
("particle_mass", ("code_mass", [], None)),
("particle_angular_momentum_x", (ang_mom_units, [], None)),
("particle_angular_momentum_y", (ang_mom_units, [], None)),
("particle_angular_momentum_z", (ang_mom_units, [], None)),
("particle_formation_time", ("code_time", [], None)),
("particle_accretion_rate", ("code_mass/code_time", [], None)),
("particle_delta_mass", ("code_mass", [], None)),
("particle_rho_gas", (rho_units, [], None)),
("particle_cs**2", (vel_units, [], None)),
("particle_etherm", (ener_units, [], None)),
("particle_velocity_x_gas", (vel_units, [], None)),
("particle_velocity_y_gas", (vel_units, [], None)),
("particle_velocity_z_gas", (vel_units, [], None)),
("particle_mass_bh", ("code_mass", [], None)),
("particle_level", ("", [], None)),
("particle_radius_star", ("code_length", [], None)),
)
known_sink_fields: KnownFieldsT = (
("particle_position_x", ("code_length", [], None)),
("particle_position_y", ("code_length", [], None)),
("particle_position_z", ("code_length", [], None)),
("particle_velocity_x", (vel_units, [], None)),
("particle_velocity_y", (vel_units, [], None)),
("particle_velocity_z", (vel_units, [], None)),
("particle_mass", ("code_mass", [], None)),
("particle_identifier", ("", ["particle_index"], None)),
("particle_birth_time", ("code_time", ["age"], None)),
("BH_real_accretion", ("code_mass/code_time", [], None)),
("BH_bondi_accretion", ("code_mass/code_time", [], None)),
("BH_eddington_accretion", ("code_mass/code_time", [], None)),
("BH_esave", (ener_units, [], None)),
("gas_spin_x", (ang_mom_units, [], None)),
("gas_spin_y", (ang_mom_units, [], None)),
("gas_spin_z", (ang_mom_units, [], None)),
("BH_spin_x", ("", [], None)),
("BH_spin_y", ("", [], None)),
("BH_spin_z", ("", [], None)),
("BH_spin", (ang_mom_units, [], None)),
("BH_efficiency", ("", [], None)),
)
[docs]
def setup_particle_fields(self, ptype):
super().setup_particle_fields(ptype)
def star_age_from_conformal_cosmo(field, data):
conformal_age = data[ptype, "conformal_birth_time"]
birth_time = convert_ramses_conformal_time_to_physical_time(
data.ds, conformal_age
)
return data.ds.current_time - birth_time
def star_age_from_physical_cosmo(field, data):
H0 = float(
data.ds.quan(data.ds.hubble_constant * 100, "km/s/Mpc").to("1/Gyr")
)
times = data[ptype, "conformal_birth_time"].value
time_tot = float(t_frw(data.ds, 0) * H0)
birth_time = (time_tot + times) / H0
t_out = float(data.ds.current_time.to("Gyr"))
return data.apply_units(t_out - birth_time, "Gyr")
def star_age(field, data):
formation_time = data[ptype, "particle_birth_time"]
return data.ds.current_time - formation_time
if self.ds.cosmological_simulation and self.ds.use_conformal_time:
fun = star_age_from_conformal_cosmo
elif self.ds.cosmological_simulation:
fun = star_age_from_physical_cosmo
else:
fun = star_age
self.add_field(
(ptype, "star_age"),
sampling_type="particle",
function=fun,
units=self.ds.unit_system["time"],
)
[docs]
def setup_fluid_fields(self):
def _temperature_over_mu(field, data):
rv = data["gas", "pressure"] / data["gas", "density"]
rv *= mass_hydrogen_cgs / boltzmann_constant_cgs
return rv
self.add_field(
("gas", "temperature_over_mu"),
sampling_type="cell",
function=_temperature_over_mu,
units=self.ds.unit_system["temperature"],
)
found_cooling_fields = self.create_cooling_fields()
if found_cooling_fields:
def _temperature(field, data):
return data["gas", "temperature_over_mu"] * data["gas", "mu"]
else:
def _temperature(field, data):
if not isinstance(data, FieldDetector):
warnings.warn(
"Trying to calculate temperature but the cooling tables "
"couldn't be found or read. yt will return T/µ instead of "
"T — this is equivalent to assuming µ=1.0. To suppress this, "
"derive the temperature from temperature_over_mu with "
"some values for mu.",
category=RuntimeWarning,
stacklevel=1,
)
return data["gas", "temperature_over_mu"]
self.add_field(
("gas", "temperature"),
sampling_type="cell",
function=_temperature,
units=self.ds.unit_system["temperature"],
)
self.species_names = [
known_species_names[fn]
for ft, fn in self.field_list
if fn in known_species_names
]
# See if we need to load the rt fields
rt_flag = RTFieldFileHandler.any_exist(self.ds)
if rt_flag: # rt run
self.create_rt_fields()
# Load magnetic fields
if ("gas", "magnetic_field_x_left") in self:
self.create_magnetic_fields()
# Potential field
if ("gravity", "Potential") in self:
self.create_gravity_fields()
[docs]
def create_gravity_fields(self):
def potential_energy(field, data):
return data["gas", "potential"] * data["gas", "cell_mass"]
self.add_field(
("gas", "potential_energy"),
sampling_type="cell",
function=potential_energy,
units=self.ds.unit_system["energy"],
)
[docs]
def create_magnetic_fields(self):
# Calculate cell-centred magnetic fields from face-centred
def mag_field(ax):
def _mag_field(field, data):
return (
data["gas", f"magnetic_field_{ax}_left"]
+ data["gas", f"magnetic_field_{ax}_right"]
) / 2
return _mag_field
for ax in self.ds.coordinates.axis_order:
self.add_field(
("gas", f"magnetic_field_{ax}"),
sampling_type="cell",
function=mag_field(ax),
units=self.ds.unit_system["magnetic_field_cgs"],
)
def _divB(field, data):
"""Calculate magnetic field divergence"""
out = np.zeros_like(data["gas", "magnetic_field_x_right"])
for ax in data.ds.coordinates.axis_order:
out += (
data["gas", f"magnetic_field_{ax}_right"]
- data["gas", f"magnetic_field_{ax}_left"]
)
return out / data["gas", "dx"]
self.add_field(
("gas", "magnetic_field_divergence"),
sampling_type="cell",
function=_divB,
units=self.ds.unit_system["magnetic_field_cgs"]
/ self.ds.unit_system["length"],
)
[docs]
def create_rt_fields(self):
self.ds.fluid_types += ("rt",)
p = RTFieldFileHandler.get_rt_parameters(self.ds).copy()
p.update(self.ds.parameters)
ngroups = p["nGroups"]
# Make sure rt_c_frac is at least as long as the number of levels in
# the simulation. Pad with either 1 (default) when using level-dependent
# reduced speed of light, otherwise pad with a constant value
if len(p["rt_c_frac"]) == 1:
pad_value = p["rt_c_frac"][0]
else:
pad_value = 1
rt_c_frac = np.pad(
p["rt_c_frac"],
(0, max(0, self.ds.max_level - len(["rt_c_frac"]) + 1)),
constant_values=pad_value,
)
rt_c = rt_c_frac * units.c / (p["unit_l"] / p["unit_t"])
dens_conv = (p["unit_np"] / rt_c).value / units.cm**3
########################################
# Adding the fields in the hydro_* files
def _temp_IR(field, data):
rv = data["gas", "pres_IR"] / data["gas", "density"]
rv *= mass_hydrogen_cgs / boltzmann_constant_cgs
return rv
self.add_field(
("gas", "temp_IR"),
sampling_type="cell",
function=_temp_IR,
units=self.ds.unit_system["temperature"],
)
def _species_density(field, data, species: str):
return data["gas", f"{species}_fraction"] * data["gas", "density"]
def _species_mass(field, data, species: str):
return data["gas", f"{species}_density"] * data["index", "cell_volume"]
for species in ["H_p1", "He_p1", "He_p2"]:
self.add_field(
("gas", species + "_density"),
sampling_type="cell",
function=partial(_species_density, species=species),
units=self.ds.unit_system["density"],
)
self.add_field(
("gas", species + "_mass"),
sampling_type="cell",
function=partial(_species_mass, species=species),
units=self.ds.unit_system["mass"],
)
########################################
# Adding the fields in the rt_ files
def gen_pdens(igroup):
def _photon_density(field, data):
# The photon density depends on the possibly level-dependent conversion factor.
ilvl = data["index", "grid_level"].astype("int64")
dc = dens_conv[ilvl]
rv = data["ramses-rt", f"Photon_density_{igroup + 1}"] * dc
return rv
return _photon_density
for igroup in range(ngroups):
self.add_field(
("rt", f"photon_density_{igroup + 1}"),
sampling_type="cell",
function=gen_pdens(igroup),
units=self.ds.unit_system["number_density"],
)
flux_conv = p["unit_pf"] / units.cm**2 / units.s
flux_unit = (
1 / self.ds.unit_system["time"] / self.ds.unit_system["length"] ** 2
).units
def gen_flux(key, igroup):
def _photon_flux(field, data):
rv = data["ramses-rt", f"Photon_flux_{key}_{igroup + 1}"] * flux_conv
return rv
return _photon_flux
for key in "xyz":
for igroup in range(ngroups):
self.add_field(
("rt", f"photon_flux_{key}_{igroup + 1}"),
sampling_type="cell",
function=gen_flux(key, igroup),
units=flux_unit,
)
[docs]
def create_cooling_fields(self) -> bool:
"Create cooling fields from the cooling files. Return True if successful."
num = os.path.basename(self.ds.parameter_filename).split(".")[0].split("_")[1]
filename = "%s/cooling_%05i.out" % (
os.path.dirname(self.ds.parameter_filename),
int(num),
)
if not os.path.exists(filename):
mylog.warning("This output has no cooling fields")
return False
# Function to create the cooling fields
def _create_field(name, interp_object, unit):
def _func(field, data):
shape = data["gas", "temperature_over_mu"].shape
# Ramses assumes a fraction X of Hydrogen within the non-metal gas.
# It has to be corrected by metallicity.
Z = data["gas", "metallicity"]
nH = ((1 - _Y) * (1 - Z) * data["gas", "density"] / mh).to("cm**-3")
if data.ds.self_shielding:
boost = np.maximum(np.exp(-nH / 0.01), 1e-20)
else:
boost = 1
d = {
"lognH": np.log10(nH / boost).ravel(),
"logT": np.log10(data["gas", "temperature_over_mu"]).ravel(),
}
rv = interp_object(d).reshape(shape)
if name[-1] != "mu":
rv = 10 ** interp_object(d).reshape(shape)
cool = data.ds.arr(rv, unit)
if "metal" in name[-1].split("_"):
cool = (
cool * data["gas", "metallicity"] / 0.02
) # Ramses uses Zsolar=0.02
elif "compton" in name[-1].split("_"):
cool = data.ds.arr(rv, unit + "/cm**3")
cool = (
cool / data["gas", "number_density"]
) # Compton cooling/heating is written to file in erg/s
return cool
self.add_field(name=name, sampling_type="cell", function=_func, units=unit)
# Load cooling files
avals = {}
tvals = {}
with FortranFile(filename) as fd:
n1, n2 = fd.read_vector("i")
for ax in _cool_axes:
avals[ax] = fd.read_vector("d")
for i, (tname, unit) in enumerate(_cool_arrs):
var = fd.read_vector("d")
if var.size == n1 and i == 0:
# If this case occurs, the cooling files were produced pre-2010 in
# a format that is no longer supported
mylog.warning(
"This cooling file format is no longer supported. "
"Cooling field loading skipped."
)
return False
if var.size == n1 * n2:
tvals[tname] = {
"data": var.reshape((n1, n2), order="F"),
"unit": unit,
}
else:
var = var.reshape((n1, n2, var.size // (n1 * n2)), order="F")
for i in range(var.shape[-1]):
tvals[_cool_species[i]] = {
"data": var[:, :, i],
"unit": "1/cm**3",
}
# Add the mu field first, as it is needed for the number density
interp = BilinearFieldInterpolator(
tvals["mu"]["data"],
(avals["lognH"], avals["logT"]),
["lognH", "logT"],
truncate=True,
)
_create_field(("gas", "mu"), interp, "dimensionless")
# Add the number density field, based on mu
def _number_density(field, data):
return data["gas", "density"] / mp / data["gas", "mu"]
self.add_field(
name=("gas", "number_density"),
sampling_type="cell",
function=_number_density,
units=number_density_unit,
)
# Add the cooling and heating fields, which need the number density field
for key in tvals:
if key != "mu":
interp = BilinearFieldInterpolator(
tvals[key]["data"],
(avals["lognH"], avals["logT"]),
["lognH", "logT"],
truncate=True,
)
_create_field(("gas", key), interp, tvals[key]["unit"])
# Add total cooling and heating fields
def _all_cool(field, data):
return (
data["gas", "cooling_primordial"]
+ data["gas", "cooling_metal"]
+ data["gas", "cooling_compton"]
)
def _all_heat(field, data):
return data["gas", "heating_primordial"] + data["gas", "heating_compton"]
self.add_field(
name=("gas", "cooling_total"),
sampling_type="cell",
function=_all_cool,
units=cooling_function_units,
)
self.add_field(
name=("gas", "heating_total"),
sampling_type="cell",
function=_all_heat,
units=cooling_function_units,
)
# Add net cooling fields
def _net_cool(field, data):
return data["gas", "cooling_total"] - data["gas", "heating_total"]
self.add_field(
name=("gas", "cooling_net"),
sampling_type="cell",
function=_net_cool,
units=cooling_function_units,
)
return True