Source code for yt.frontends.chombo.fields

import numpy as np

from yt._typing import KnownFieldsT
from yt.fields.field_info_container import (
    FieldInfoContainer,
    particle_deposition_functions,
    particle_vector_functions,
    standard_particle_fields,
)
from yt.units.unit_object import Unit  # type: ignore
from yt.utilities.exceptions import YTFieldNotFound

rho_units = "code_mass / code_length**3"
mom_units = "code_mass / (code_time * code_length**2)"
eden_units = "code_mass / (code_time**2 * code_length)"  # erg / cm^3
vel_units = "code_length / code_time"
b_units = "code_magnetic"


[docs] class ChomboFieldInfo(FieldInfoContainer): # no custom behaviour is needed yet pass
# Orion 2 Fields # We duplicate everything here from Boxlib, because we want to be able to # subclass it and that can be somewhat tricky.
[docs] class Orion2FieldInfo(ChomboFieldInfo): known_other_fields: KnownFieldsT = ( ("density", (rho_units, ["density"], None)), ("energy-density", (eden_units, ["total_energy_density"], None)), ("radiation-energy-density", (eden_units, ["radiation_energy_density"], None)), ("X-momentum", (mom_units, ["momentum_density_x"], None)), ("Y-momentum", (mom_units, ["momentum_density_y"], None)), ("Z-momentum", (mom_units, ["momentum_density_z"], None)), ("temperature", ("K", ["temperature"], None)), ("X-magnfield", (b_units, [], None)), ("Y-magnfield", (b_units, [], None)), ("Z-magnfield", (b_units, [], None)), ("directrad-dedt-density", (eden_units, ["directrad-dedt-density"], None)), ("directrad-dpxdt-density", (mom_units, ["directrad-dpxdt-density"], None)), ("directrad-dpydt-density", (mom_units, ["directrad-dpydt-density"], None)), ("directrad-dpzdt-density", (mom_units, ["directrad-dpzdt-density"], None)), ) known_particle_fields: KnownFieldsT = ( ("particle_mass", ("code_mass", [], None)), ("particle_position_x", ("code_length", [], None)), ("particle_position_y", ("code_length", [], None)), ("particle_position_z", ("code_length", [], None)), ("particle_momentum_x", ("code_mass*code_length/code_time", [], None)), ("particle_momentum_y", ("code_mass*code_length/code_time", [], None)), ("particle_momentum_z", ("code_mass*code_length/code_time", [], None)), # Note that these are *internal* agmomen ("particle_angmomen_x", ("code_length**2/code_time", [], None)), ("particle_angmomen_y", ("code_length**2/code_time", [], None)), ("particle_angmomen_z", ("code_length**2/code_time", [], None)), ("particle_mlast", ("code_mass", [], None)), ("particle_r", ("code_length", [], None)), ("particle_mdeut", ("code_mass", [], None)), ("particle_n", ("", [], None)), ("particle_mdot", ("code_mass/code_time", [], None)), ("particle_burnstate", ("", [], None)), ("particle_luminosity", ("", [], None)), ("particle_id", ("", ["particle_index"], None)), )
[docs] def setup_particle_fields(self, ptype): def _get_vel(axis): def velocity(field, data): return ( data[(ptype, f"particle_momentum_{axis}")] / data[(ptype, "particle_mass")] ) return velocity for ax in "xyz": self.add_field( (ptype, f"particle_velocity_{ax}"), sampling_type="particle", function=_get_vel(ax), units="code_length/code_time", ) super().setup_particle_fields(ptype)
[docs] def setup_fluid_fields(self): from yt.fields.magnetic_field import setup_magnetic_field_aliases unit_system = self.ds.unit_system def _thermal_energy_density(field, data): try: return ( data[("chombo", "energy-density")] - data[("gas", "kinetic_energy_density")] - data[("gas", "magnetic_energy_density")] ) except YTFieldNotFound: return ( data[("chombo", "energy-density")] - data[("gas", "kinetic_energy_density")] ) def _specific_thermal_energy(field, data): return data[("gas", "thermal_energy_density")] / data[("gas", "density")] def _magnetic_energy_density(field, data): ret = data[("chombo", "X-magnfield")] ** 2 if data.ds.dimensionality > 1: ret = ret + data[("chombo", "Y-magnfield")] ** 2 if data.ds.dimensionality > 2: ret = ret + data[("chombo", "Z-magnfield")] ** 2 return ret / 8.0 / np.pi def _specific_magnetic_energy(field, data): return data[("gas", "specific_magnetic_energy")] / data[("gas", "density")] def _kinetic_energy_density(field, data): p2 = data[("chombo", "X-momentum")] ** 2 if data.ds.dimensionality > 1: p2 = p2 + data[("chombo", "Y-momentum")] ** 2 if data.ds.dimensionality > 2: p2 = p2 + data[("chombo", "Z-momentum")] ** 2 return 0.5 * p2 / data[("gas", "density")] def _specific_kinetic_energy(field, data): return data[("gas", "kinetic_energy_density")] / data[("gas", "density")] def _temperature(field, data): c_v = data.ds.quan(data.ds.parameters["radiation.const_cv"], "erg/g/K") return data[("gas", "specific_thermal_energy")] / c_v def _get_vel(axis): def velocity(field, data): return ( data[("gas", f"momentum_density_{axis}")] / data[("gas", "density")] ) return velocity for ax in "xyz": self.add_field( ("gas", f"velocity_{ax}"), sampling_type="cell", function=_get_vel(ax), units=unit_system["velocity"], ) self.add_field( ("gas", "specific_thermal_energy"), sampling_type="cell", function=_specific_thermal_energy, units=unit_system["specific_energy"], ) self.add_field( ("gas", "thermal_energy_density"), sampling_type="cell", function=_thermal_energy_density, units=unit_system["pressure"], ) self.add_field( ("gas", "kinetic_energy_density"), sampling_type="cell", function=_kinetic_energy_density, units=unit_system["pressure"], ) self.add_field( ("gas", "specific_kinetic_energy"), sampling_type="cell", function=_specific_kinetic_energy, units=unit_system["specific_energy"], ) self.add_field( ("gas", "magnetic_energy_density"), sampling_type="cell", function=_magnetic_energy_density, units=unit_system["pressure"], ) self.add_field( ("gas", "specific_magnetic_energy"), sampling_type="cell", function=_specific_magnetic_energy, units=unit_system["specific_energy"], ) self.add_field( ("gas", "temperature"), sampling_type="cell", function=_temperature, units=unit_system["temperature"], ) setup_magnetic_field_aliases( self, "chombo", [f"{ax}-magnfield" for ax in "XYZ"] )
[docs] class ChomboPICFieldInfo3D(FieldInfoContainer): known_other_fields: KnownFieldsT = ( ("density", (rho_units, ["density", "Density"], None)), ( "potential", ("code_length**2 / code_time**2", ["potential", "Potential"], None), ), ("gravitational_field_x", ("code_length / code_time**2", [], None)), ("gravitational_field_y", ("code_length / code_time**2", [], None)), ("gravitational_field_z", ("code_length / code_time**2", [], None)), ) known_particle_fields: KnownFieldsT = ( ("particle_mass", ("code_mass", [], None)), ("particle_position_x", ("code_length", [], None)), ("particle_position_y", ("code_length", [], None)), ("particle_position_z", ("code_length", [], None)), ("particle_velocity_x", ("code_length / code_time", [], None)), ("particle_velocity_y", ("code_length / code_time", [], None)), ("particle_velocity_z", ("code_length / code_time", [], None)), ) # I am re-implementing this here to override a few default behaviors: # I don't want to skip output units for code_length and I do want # particle_fields to default to take_log = False.
[docs] def setup_particle_fields(self, ptype, ftype="gas", num_neighbors=64): skip_output_units = () for f, (units, aliases, dn) in sorted(self.known_particle_fields): units = self.ds.field_units.get((ptype, f), units) if ( f in aliases or ptype not in self.ds.particle_types_raw ) and units not in skip_output_units: u = Unit(units, registry=self.ds.unit_registry) output_units = str(u.get_cgs_equivalent()) else: output_units = units if (ptype, f) not in self.field_list: continue self.add_output_field( (ptype, f), sampling_type="particle", units=units, display_name=dn, output_units=output_units, take_log=False, ) for alias in aliases: self.alias((ptype, alias), (ptype, f), units=output_units) ppos_fields = [f"particle_position_{ax}" for ax in "xyz"] pvel_fields = [f"particle_velocity_{ax}" for ax in "xyz"] particle_vector_functions(ptype, ppos_fields, pvel_fields, self) particle_deposition_functions(ptype, "particle_position", "particle_mass", self) standard_particle_fields(self, ptype) # Now we check for any leftover particle fields for field in sorted(self.field_list): if field in self: continue if not isinstance(field, tuple): raise RuntimeError if field[0] not in self.ds.particle_types: continue self.add_output_field( field, sampling_type="particle", units=self.ds.field_units.get(field, ""), ) self.setup_smoothed_fields(ptype, num_neighbors=num_neighbors, ftype=ftype)
def _dummy_position(field, data): return 0.5 * np.ones_like(data[("all", "particle_position_x")]) def _dummy_velocity(field, data): return np.zeros_like(data[("all", "particle_velocity_x")]) def _dummy_field(field, data): return 0.0 * data[("chombo", "gravitational_field_x")] fluid_field_types = ["chombo", "gas"] particle_field_types = ["io", "all"]
[docs] class ChomboPICFieldInfo2D(ChomboPICFieldInfo3D): known_other_fields: KnownFieldsT = ( ("density", (rho_units, ["density", "Density"], None)), ( "potential", ("code_length**2 / code_time**2", ["potential", "Potential"], None), ), ("gravitational_field_x", ("code_length / code_time**2", [], None)), ("gravitational_field_y", ("code_length / code_time**2", [], None)), ) known_particle_fields: KnownFieldsT = ( ("particle_mass", ("code_mass", [], None)), ("particle_position_x", ("code_length", [], None)), ("particle_position_y", ("code_length", [], None)), ("particle_velocity_x", ("code_length / code_time", [], None)), ("particle_velocity_y", ("code_length / code_time", [], None)), ) def __init__(self, ds, field_list): super().__init__(ds, field_list) for ftype in fluid_field_types: self.add_field( (ftype, "gravitational_field_z"), sampling_type="cell", function=_dummy_field, units="code_length / code_time**2", ) for ptype in particle_field_types: self.add_field( (ptype, "particle_position_z"), sampling_type="particle", function=_dummy_position, units="code_length", ) self.add_field( (ptype, "particle_velocity_z"), sampling_type="particle", function=_dummy_velocity, units="code_length / code_time", )
[docs] class ChomboPICFieldInfo1D(ChomboPICFieldInfo3D): known_other_fields: KnownFieldsT = ( ("density", (rho_units, ["density", "Density"], None)), ( "potential", ("code_length**2 / code_time**2", ["potential", "Potential"], None), ), ("gravitational_field_x", ("code_length / code_time**2", [], None)), ) known_particle_fields: KnownFieldsT = ( ("particle_mass", ("code_mass", [], None)), ("particle_position_x", ("code_length", [], None)), ("particle_velocity_x", ("code_length / code_time", [], None)), ) def __init__(self, ds, field_list): super().__init__(ds, field_list) for ftype in fluid_field_types: self.add_field( (ftype, "gravitational_field_y"), sampling_type="cell", function=_dummy_field, units="code_length / code_time**2", ) self.add_field( (ftype, "gravitational_field_z"), sampling_type="cell", function=_dummy_field, units="code_length / code_time**2", ) for ptype in particle_field_types: self.add_field( (ptype, "particle_position_y"), sampling_type="particle", function=_dummy_position, units="code_length", ) self.add_field( (ptype, "particle_position_z"), sampling_type="particle", function=_dummy_position, units="code_length", ) self.add_field( (ptype, "particle_velocity_y"), sampling_type="particle", function=_dummy_velocity, units="code_length / code_time", ) self.add_field( (ptype, "particle_velocity_z"), sampling_type="particle", function=_dummy_velocity, units="code_length / code_time", )
[docs] class PlutoFieldInfo(ChomboFieldInfo): known_other_fields: KnownFieldsT = ( ("rho", (rho_units, ["density"], None)), ("prs", ("code_mass / (code_length * code_time**2)", ["pressure"], None)), ("vx1", (vel_units, ["velocity_x"], None)), ("vx2", (vel_units, ["velocity_y"], None)), ("vx3", (vel_units, ["velocity_z"], None)), ("bx1", (b_units, [], None)), ("bx2", (b_units, [], None)), ("bx3", (b_units, [], None)), ) known_particle_fields = ()
[docs] def setup_fluid_fields(self): from yt.fields.magnetic_field import setup_magnetic_field_aliases setup_magnetic_field_aliases(self, "chombo", [f"bx{ax}" for ax in [1, 2, 3]])