Source code for yt.frontends.parthenon.fields

import numpy as np

from yt._typing import KnownFieldsT
from yt.fields.field_info_container import FieldInfoContainer
from yt.funcs import mylog
from yt.utilities.physical_constants import kboltz, mh

mag_units = "code_magnetic"
pres_units = "code_mass/(code_length*code_time**2)"
rho_units = "code_mass / code_length**3"
vel_units = "code_length / code_time"
mom_units = "code_mass / code_length**2 / code_time"
eng_units = "code_mass / code_length / code_time**2"


[docs] def velocity_field(mom_field): def _velocity(field, data): return data[mom_field] / data["gas", "density"] return _velocity
def _cooling_time_field(field, data): cooling_time = ( data["gas", "density"] * data["gas", "specific_thermal_energy"] / data["gas", "cooling_rate"] ) # Set cooling time where Cooling_Rate==0 to infinity inf_ct_mask = data["cooling_rate"] == 0 cooling_time[inf_ct_mask] = data.ds.quan(np.inf, "s") return cooling_time
[docs] class ParthenonFieldInfo(FieldInfoContainer): known_other_fields: KnownFieldsT = ( # Need to provide info for both primitive and conserved variable as they # can be written indepdendently (or even in the same output file). # New field naming (i.e., "variable_component") of primitive variables ("prim_density", (rho_units, ["density"], None)), ("prim_velocity_1", (vel_units, ["velocity_x"], None)), ("prim_velocity_2", (vel_units, ["velocity_y"], None)), ("prim_velocity_3", (vel_units, ["velocity_z"], None)), ("prim_pressure", (pres_units, ["pressure"], None)), # Magnetic fields carry units of 1/sqrt(pi) so we cannot directly forward # and need to setup aliases below. ("prim_magnetic_field_1", (mag_units, [], None)), ("prim_magnetic_field_2", (mag_units, [], None)), ("prim_magnetic_field_3", (mag_units, [], None)), # New field naming (i.e., "variable_component") of conserved variables ("cons_density", (rho_units, ["density"], None)), ("cons_momentum_density_1", (mom_units, ["momentum_density_x"], None)), ("cons_momentum_density_2", (mom_units, ["momentum_density_y"], None)), ("cons_momentum_density_3", (mom_units, ["momentum_density_z"], None)), ("cons_total_energy_density", (eng_units, ["total_energy_density"], None)), # Magnetic fields carry units of 1/sqrt(pi) so we cannot directly forward # and need to setup aliases below. ("cons_magnetic_field_1", (mag_units, [], None)), ("cons_magnetic_field_2", (mag_units, [], None)), ("cons_magnetic_field_3", (mag_units, [], None)), # Legacy naming. Given that there's no conflict with the names above, # we can just define those here so that the frontend works with older data. ("Density", (rho_units, ["density"], None)), ("Velocity1", (mom_units, ["velocity_x"], None)), ("Velocity2", (mom_units, ["velocity_y"], None)), ("Velocity3", (mom_units, ["velocity_z"], None)), ("Pressure", (pres_units, ["pressure"], None)), ("MagneticField1", (mag_units, [], None)), ("MagneticField2", (mag_units, [], None)), ("MagneticField3", (mag_units, [], None)), ("MomentumDensity1", (mom_units, ["momentum_density_x"], None)), ("MomentumDensity2", (mom_units, ["momentum_density_y"], None)), ("MomentumDensity3", (mom_units, ["momentum_density_z"], None)), ("TotalEnergyDensity", (eng_units, ["total_energy_density"], None)), )
[docs] def setup_fluid_fields(self): from yt.fields.magnetic_field import setup_magnetic_field_aliases unit_system = self.ds.unit_system # Add velocity fields (if only momemtum densities are given) for i, comp in enumerate(self.ds.coordinates.axis_order): # Support both current and legacy scheme for mom_field_name in ["MomentumDensity", "cons_momentum_density_"]: mom_field = ("parthenon", f"{mom_field_name}{i+1}") if mom_field in self.field_list: self.add_field( ("gas", f"velocity_{comp}"), sampling_type="cell", function=velocity_field(mom_field), units=unit_system["velocity"], ) # Figure out thermal energy field if ("parthenon", "Pressure") in self.field_list or ( "parthenon", "prim_pressure", ) in self.field_list: # only show warning for non-AthenaPK codes if "Hydro/AdiabaticIndex" not in self.ds.parameters: mylog.warning( f"Adding a specific thermal energy field assuming an ideal gas with an " f"adiabatic index of {self.ds.gamma}" ) def _specific_thermal_energy(field, data): return ( data["gas", "pressure"] / (data.ds.gamma - 1.0) / data["gas", "density"] ) self.add_field( ("gas", "specific_thermal_energy"), sampling_type="cell", function=_specific_thermal_energy, units=unit_system["specific_energy"], ) elif ("parthenon", "TotalEnergyDensity") in self.field_list or ( "parthenon", "cons_total_energy_density", ) in self.field_list: def _specific_thermal_energy(field, data): eint = ( data["gas", "total_energy_density"] - data["gas", "kinetic_energy_density"] ) if ( ("parthenon", "MagneticField1") in self.field_list or ("parthenon", "prim_magnetic_field_1") in self.field_list or ("parthenon", "cons_magnetic_field_1") in self.field_list ): eint -= data["gas", "magnetic_energy_density"] return eint / data["gas", "density"] self.add_field( ("gas", "specific_thermal_energy"), sampling_type="cell", function=_specific_thermal_energy, units=unit_system["specific_energy"], ) # Add temperature field def _temperature(field, data): return ( (data["gas", "pressure"] / data["gas", "density"]) * data.ds.mu * mh / kboltz ) self.add_field( ("gas", "temperature"), sampling_type="cell", function=_temperature, units=unit_system["temperature"], ) # We can simply all all variants as only fields present will be added setup_magnetic_field_aliases( self, "parthenon", ["MagneticField%d" % ax for ax in (1, 2, 3)] ) setup_magnetic_field_aliases( self, "parthenon", ["prim_magnetic_field_%d" % ax for ax in (1, 2, 3)] ) setup_magnetic_field_aliases( self, "parthenon", ["cons_magnetic_field_%d" % ax for ax in (1, 2, 3)] )