import os
from functools import partial
import numpy as np
from yt import units
from yt._typing import KnownFieldsT
from yt.fields.field_info_container import FieldInfoContainer
from yt.frontends.ramses.io import convert_ramses_conformal_time_to_physical_age
from yt.utilities.cython_fortran_utils import FortranFile
from yt.utilities.linear_interpolators import BilinearFieldInterpolator
from yt.utilities.logger import ytLogger as mylog
from yt.utilities.physical_constants import (
boltzmann_constant_cgs,
mass_hydrogen_cgs,
mh,
mp,
)
from .field_handlers import RTFieldFileHandler
b_units = "code_magnetic"
ra_units = "code_length / code_time**2"
rho_units = "code_density"
vel_units = "code_velocity"
pressure_units = "code_pressure"
ener_units = "code_mass * code_velocity**2"
specific_ener_units = "code_velocity**2"
ang_mom_units = "code_mass * code_velocity * code_length"
cooling_function_units = " erg * cm**3 /s"
cooling_function_prime_units = " erg * cm**3 /s/K"
flux_unit = "1 / code_length**2 / code_time"
number_density_unit = "1 / code_length**3"
known_species_masses = {
sp: mh * v
for sp, v in [
("HI", 1.0),
("HII", 1.0),
("Electron", 1.0),
("HeI", 4.0),
("HeII", 4.0),
("HeIII", 4.0),
("H2I", 2.0),
("H2II", 2.0),
("HM", 1.0),
("DI", 2.0),
("DII", 2.0),
("HDI", 3.0),
]
}
known_species_names = {
"HI": "H_p0",
"HII": "H_p1",
"Electron": "El",
"HeI": "He_p0",
"HeII": "He_p1",
"HeIII": "He_p2",
"H2I": "H2_p0",
"H2II": "H2_p1",
"HM": "H_m1",
"DI": "D_p0",
"DII": "D_p1",
"HDI": "HD_p0",
}
_cool_axes = ("lognH", "logT") # , "logTeq")
_cool_arrs = (
("cooling_primordial", cooling_function_units),
("heating_primordial", cooling_function_units),
("cooling_compton", cooling_function_units),
("heating_compton", cooling_function_units),
("cooling_metal", cooling_function_units),
("cooling_primordial_prime", cooling_function_prime_units),
("heating_primordial_prime", cooling_function_prime_units),
("cooling_compton_prime", cooling_function_prime_units),
("heating_compton_prime", cooling_function_prime_units),
("cooling_metal_prime", cooling_function_prime_units),
("mu", None),
("abundances", None),
)
_cool_species = (
"Electron_number_density",
"HI_number_density",
"HII_number_density",
"HeI_number_density",
"HeII_number_density",
"HeIII_number_density",
)
_X = 0.76 # H fraction, hardcoded
_Y = 0.24 # He fraction, hardcoded
[docs]
class RAMSESFieldInfo(FieldInfoContainer):
known_other_fields: KnownFieldsT = (
("Density", (rho_units, ["density"], None)),
("x-velocity", (vel_units, ["velocity_x"], None)),
("y-velocity", (vel_units, ["velocity_y"], None)),
("z-velocity", (vel_units, ["velocity_z"], None)),
("Pres_IR", (pressure_units, ["pres_IR", "pressure_IR"], None)),
("Pressure", (pressure_units, ["pressure"], None)),
("Metallicity", ("", ["metallicity"], None)),
("HII", ("", ["H_p1_fraction"], None)),
("HeII", ("", ["He_p1_fraction"], None)),
("HeIII", ("", ["He_p2_fraction"], None)),
("x-acceleration", (ra_units, ["acceleration_x"], None)),
("y-acceleration", (ra_units, ["acceleration_y"], None)),
("z-acceleration", (ra_units, ["acceleration_z"], None)),
("Potential", (specific_ener_units, ["potential"], None)),
("B_x_left", (b_units, ["magnetic_field_x_left"], None)),
("B_x_right", (b_units, ["magnetic_field_x_right"], None)),
("B_y_left", (b_units, ["magnetic_field_y_left"], None)),
("B_y_right", (b_units, ["magnetic_field_y_right"], None)),
("B_z_left", (b_units, ["magnetic_field_z_left"], None)),
("B_z_right", (b_units, ["magnetic_field_z_right"], None)),
)
known_particle_fields: KnownFieldsT = (
("particle_position_x", ("code_length", [], None)),
("particle_position_y", ("code_length", [], None)),
("particle_position_z", ("code_length", [], None)),
("particle_velocity_x", (vel_units, [], None)),
("particle_velocity_y", (vel_units, [], None)),
("particle_velocity_z", (vel_units, [], None)),
("particle_mass", ("code_mass", [], None)),
("particle_identity", ("", ["particle_index"], None)),
("particle_refinement_level", ("", [], None)),
("particle_birth_time", ("code_time", ["age"], None)),
("conformal_birth_time", ("", [], None)),
("particle_metallicity", ("", [], None)),
("particle_family", ("", [], None)),
("particle_tag", ("", [], None)),
# sink field parameters
("particle_mass", ("code_mass", [], None)),
("particle_angular_momentum_x", (ang_mom_units, [], None)),
("particle_angular_momentum_y", (ang_mom_units, [], None)),
("particle_angular_momentum_z", (ang_mom_units, [], None)),
("particle_formation_time", ("code_time", [], None)),
("particle_accretion_Rate", ("code_mass/code_time", [], None)),
("particle_delta_mass", ("code_mass", [], None)),
("particle_rho_gas", (rho_units, [], None)),
("particle_cs**2", (vel_units, [], None)),
("particle_etherm", (ener_units, [], None)),
("particle_velocity_x_gas", (vel_units, [], None)),
("particle_velocity_y_gas", (vel_units, [], None)),
("particle_velocity_z_gas", (vel_units, [], None)),
("particle_mass_bh", ("code_mass", [], None)),
("particle_level", ("", [], None)),
("particle_radius_star", ("code_length", [], None)),
)
known_sink_fields: KnownFieldsT = (
("particle_position_x", ("code_length", [], None)),
("particle_position_y", ("code_length", [], None)),
("particle_position_z", ("code_length", [], None)),
("particle_velocity_x", (vel_units, [], None)),
("particle_velocity_y", (vel_units, [], None)),
("particle_velocity_z", (vel_units, [], None)),
("particle_mass", ("code_mass", [], None)),
("particle_identifier", ("", ["particle_index"], None)),
("particle_birth_time", ("code_time", ["age"], None)),
("BH_real_accretion", ("code_mass/code_time", [], None)),
("BH_bondi_accretion", ("code_mass/code_time", [], None)),
("BH_eddington_accretion", ("code_mass/code_time", [], None)),
("BH_esave", (ener_units, [], None)),
("gas_spin_x", (ang_mom_units, [], None)),
("gas_spin_y", (ang_mom_units, [], None)),
("gas_spin_z", (ang_mom_units, [], None)),
("BH_spin_x", ("", [], None)),
("BH_spin_y", ("", [], None)),
("BH_spin_z", ("", [], None)),
("BH_spin", (ang_mom_units, [], None)),
("BH_efficiency", ("", [], None)),
)
[docs]
def setup_particle_fields(self, ptype):
super().setup_particle_fields(ptype)
def star_age(field, data):
if data.ds.cosmological_simulation:
conformal_age = data[ptype, "conformal_birth_time"]
physical_age = convert_ramses_conformal_time_to_physical_age(
data.ds, conformal_age
)
return data.ds.arr(physical_age, "code_time")
else:
formation_time = data[ptype, "particle_birth_time"]
return data.ds.current_time - formation_time
self.add_field(
(ptype, "star_age"),
sampling_type="particle",
function=star_age,
units=self.ds.unit_system["time"],
)
[docs]
def setup_fluid_fields(self):
def _temperature(field, data):
rv = data["gas", "pressure"] / data["gas", "density"]
rv *= mass_hydrogen_cgs / boltzmann_constant_cgs
return rv
self.add_field(
("gas", "temperature"),
sampling_type="cell",
function=_temperature,
units=self.ds.unit_system["temperature"],
)
self.create_cooling_fields()
self.species_names = [
known_species_names[fn]
for ft, fn in self.field_list
if fn in known_species_names
]
# See if we need to load the rt fields
rt_flag = RTFieldFileHandler.any_exist(self.ds)
if rt_flag: # rt run
self.create_rt_fields()
# Load magnetic fields
if ("gas", "magnetic_field_x_left") in self:
self.create_magnetic_fields()
# Potential field
if ("gravity", "Potential") in self:
self.create_gravity_fields()
[docs]
def create_gravity_fields(self):
def potential_energy(field, data):
return data["gas", "potential"] * data["gas", "cell_mass"]
self.add_field(
("gas", "potential_energy"),
sampling_type="cell",
function=potential_energy,
units=self.ds.unit_system["energy"],
)
[docs]
def create_magnetic_fields(self):
# Calculate cell-centred magnetic fields from face-centred
def mag_field(ax):
def _mag_field(field, data):
return (
data[("gas", f"magnetic_field_{ax}_left")]
+ data[("gas", f"magnetic_field_{ax}_right")]
) / 2
return _mag_field
for ax in self.ds.coordinates.axis_order:
self.add_field(
("gas", f"magnetic_field_{ax}"),
sampling_type="cell",
function=mag_field(ax),
units=self.ds.unit_system["magnetic_field_cgs"],
)
def _divB(field, data):
"""Calculate magnetic field divergence"""
out = np.zeros_like(data[("gas", "magnetic_field_x_right")])
for ax in data.ds.coordinates.axis_order:
out += (
data[("gas", f"magnetic_field_{ax}_right")]
- data[("gas", f"magnetic_field_{ax}_left")]
)
return out / data[("gas", "dx")]
self.add_field(
("gas", "magnetic_field_divergence"),
sampling_type="cell",
function=_divB,
units=self.ds.unit_system["magnetic_field_cgs"]
/ self.ds.unit_system["length"],
)
[docs]
def create_rt_fields(self):
self.ds.fluid_types += ("rt",)
p = RTFieldFileHandler.get_rt_parameters(self.ds).copy()
p.update(self.ds.parameters)
ngroups = p["nGroups"]
# Make sure rt_c_frac is at least as long as the number of levels in
# the simulation. Pad with either 1 (default) when using level-dependent
# reduced speed of light, otherwise pad with a constant value
if len(p["rt_c_frac"]) == 1:
pad_value = p["rt_c_frac"][0]
else:
pad_value = 1
rt_c_frac = np.pad(
p["rt_c_frac"],
(0, max(0, self.ds.max_level - len(["rt_c_frac"]) + 1)),
constant_values=pad_value,
)
rt_c = rt_c_frac * units.c / (p["unit_l"] / p["unit_t"])
dens_conv = (p["unit_np"] / rt_c).value / units.cm**3
########################################
# Adding the fields in the hydro_* files
def _temp_IR(field, data):
rv = data["gas", "pres_IR"] / data["gas", "density"]
rv *= mass_hydrogen_cgs / boltzmann_constant_cgs
return rv
self.add_field(
("gas", "temp_IR"),
sampling_type="cell",
function=_temp_IR,
units=self.ds.unit_system["temperature"],
)
def _species_density(field, data, species: str):
return data["gas", f"{species}_fraction"] * data["gas", "density"]
def _species_mass(field, data, species: str):
return data["gas", f"{species}_density"] * data["index", "cell_volume"]
for species in ["H_p1", "He_p1", "He_p2"]:
self.add_field(
("gas", species + "_density"),
sampling_type="cell",
function=partial(_species_density, species=species),
units=self.ds.unit_system["density"],
)
self.add_field(
("gas", species + "_mass"),
sampling_type="cell",
function=partial(_species_mass, species=species),
units=self.ds.unit_system["mass"],
)
########################################
# Adding the fields in the rt_ files
def gen_pdens(igroup):
def _photon_density(field, data):
# The photon density depends on the possibly level-dependent conversion factor.
ilvl = data["index", "grid_level"].astype("int64")
dc = dens_conv[ilvl]
rv = data["ramses-rt", f"Photon_density_{igroup + 1}"] * dc
return rv
return _photon_density
for igroup in range(ngroups):
self.add_field(
("rt", f"photon_density_{igroup + 1}"),
sampling_type="cell",
function=gen_pdens(igroup),
units=self.ds.unit_system["number_density"],
)
flux_conv = p["unit_pf"] / units.cm**2 / units.s
flux_unit = (
1 / self.ds.unit_system["time"] / self.ds.unit_system["length"] ** 2
).units
def gen_flux(key, igroup):
def _photon_flux(field, data):
rv = data["ramses-rt", f"Photon_flux_{key}_{igroup + 1}"] * flux_conv
return rv
return _photon_flux
for key in "xyz":
for igroup in range(ngroups):
self.add_field(
("rt", f"photon_flux_{key}_{igroup + 1}"),
sampling_type="cell",
function=gen_flux(key, igroup),
units=flux_unit,
)
[docs]
def create_cooling_fields(self):
num = os.path.basename(self.ds.parameter_filename).split(".")[0].split("_")[1]
filename = "%s/cooling_%05i.out" % (
os.path.dirname(self.ds.parameter_filename),
int(num),
)
if not os.path.exists(filename):
mylog.warning("This output has no cooling fields")
return
# Function to create the cooling fields
def _create_field(name, interp_object, unit):
def _func(field, data):
shape = data[("gas", "temperature")].shape
d = {
"lognH": np.log10(_X * data[("gas", "density")] / mh).ravel(),
"logT": np.log10(data[("gas", "temperature")]).ravel(),
}
rv = interp_object(d).reshape(shape)
if name[-1] != "mu":
rv = 10 ** interp_object(d).reshape(shape)
cool = data.ds.arr(rv, unit)
if "metal" in name[-1].split("_"):
cool = (
cool * data[("gas", "metallicity")] / 0.02
) # Ramses uses Zsolar=0.02
elif "compton" in name[-1].split("_"):
cool = data.ds.arr(rv, unit + "/cm**3")
cool = (
cool / data[("gas", "number_density")]
) # Compton cooling/heating is written to file in erg/s
return cool
self.add_field(name=name, sampling_type="cell", function=_func, units=unit)
# Load cooling files
avals = {}
tvals = {}
with FortranFile(filename) as fd:
n1, n2 = fd.read_vector("i")
for ax in _cool_axes:
avals[ax] = fd.read_vector("d")
for i, (tname, unit) in enumerate(_cool_arrs):
var = fd.read_vector("d")
if var.size == n1 and i == 0:
# If this case occurs, the cooling files were produced pre-2010 in
# a format that is no longer supported
mylog.warning(
"This cooling file format is no longer supported. "
"Cooling field loading skipped."
)
return
if var.size == n1 * n2:
tvals[tname] = {
"data": var.reshape((n1, n2), order="F"),
"unit": unit,
}
else:
var = var.reshape((n1, n2, var.size // (n1 * n2)), order="F")
for i in range(var.shape[-1]):
tvals[_cool_species[i]] = {
"data": var[:, :, i],
"unit": "1/cm**3",
}
# Add the mu field first, as it is needed for the number density
interp = BilinearFieldInterpolator(
tvals["mu"]["data"],
(avals["lognH"], avals["logT"]),
["lognH", "logT"],
truncate=True,
)
_create_field(("gas", "mu"), interp, tvals["mu"]["unit"])
# Add the number density field, based on mu
def _number_density(field, data):
return data[("gas", "density")] / mp / data[("gas", "mu")]
self.add_field(
name=("gas", "number_density"),
sampling_type="cell",
function=_number_density,
units=number_density_unit,
)
# Add the cooling and heating fields, which need the number density field
for key in tvals:
if key != "mu":
interp = BilinearFieldInterpolator(
tvals[key]["data"],
(avals["lognH"], avals["logT"]),
["lognH", "logT"],
truncate=True,
)
_create_field(("gas", key), interp, tvals[key]["unit"])
# Add total cooling and heating fields
def _all_cool(field, data):
return (
data[("gas", "cooling_primordial")]
+ data[("gas", "cooling_metal")]
+ data[("gas", "cooling_compton")]
)
def _all_heat(field, data):
return (
data[("gas", "heating_primordial")] + data[("gas", "heating_compton")]
)
self.add_field(
name=("gas", "cooling_total"),
sampling_type="cell",
function=_all_cool,
units=cooling_function_units,
)
self.add_field(
name=("gas", "heating_total"),
sampling_type="cell",
function=_all_heat,
units=cooling_function_units,
)
# Add net cooling fields
def _net_cool(field, data):
return data[("gas", "cooling_total")] - data[("gas", "heating_total")]
self.add_field(
name=("gas", "cooling_net"),
sampling_type="cell",
function=_net_cool,
units=cooling_function_units,
)