Source code for yt.frontends.enzo_e.fields

import numpy as np

from yt._typing import KnownFieldsT
from yt.fields.field_info_container import FieldInfoContainer
from yt.fields.particle_fields import add_union_field
from yt.frontends.enzo_e.misc import (
    get_listed_subparam,
    get_particle_mass_correction,
    nested_dict_get,
)

rho_units = "code_mass / code_length**3"
vel_units = "code_velocity"
acc_units = "code_velocity / code_time"
energy_units = "code_velocity**2"
b_units = "code_magnetic"

NODAL_FLAGS = {
    "bfieldi_x": [1, 0, 0],
    "bfieldi_y": [0, 1, 0],
    "bfieldi_z": [0, 0, 1],
}


[docs] class EnzoEFieldInfo(FieldInfoContainer): known_other_fields: KnownFieldsT = ( ("velocity_x", (vel_units, ["velocity_x"], None)), ("velocity_y", (vel_units, ["velocity_y"], None)), ("velocity_z", (vel_units, ["velocity_z"], None)), ("acceleration_x", (acc_units, ["acceleration_x"], None)), ("acceleration_y", (acc_units, ["acceleration_y"], None)), ("acceleration_z", (acc_units, ["acceleration_z"], None)), ("density", (rho_units, ["density"], None)), ("density_total", (rho_units, ["total_density"], None)), ("total_energy", (energy_units, ["specific_total_energy"], None)), ("internal_energy", (energy_units, ["specific_thermal_energy"], None)), ("bfield_x", (b_units, [], None)), ("bfield_y", (b_units, [], None)), ("bfield_z", (b_units, [], None)), ("bfieldi_x", (b_units, [], None)), ("bfieldi_y", (b_units, [], None)), ("bfieldi_z", (b_units, [], None)), ) known_particle_fields: KnownFieldsT = ( ("x", ("code_length", ["particle_position_x"], None)), ("y", ("code_length", ["particle_position_y"], None)), ("z", ("code_length", ["particle_position_z"], None)), ("vx", (vel_units, ["particle_velocity_x"], None)), ("vy", (vel_units, ["particle_velocity_y"], None)), ("vz", (vel_units, ["particle_velocity_z"], None)), ("ax", (acc_units, ["particle_acceleration_x"], None)), ("ay", (acc_units, ["particle_acceleration_y"], None)), ("az", (acc_units, ["particle_acceleration_z"], None)), ("mass", ("code_mass", ["particle_mass"], None)), ) def __init__(self, ds, field_list, slice_info=None): super().__init__(ds, field_list, slice_info=slice_info) # setup nodal flag information for field, arr in NODAL_FLAGS.items(): if ("enzoe", field) in self: finfo = self["enzoe", field] finfo.nodal_flag = np.array(arr)
[docs] def setup_fluid_fields(self): from yt.fields.magnetic_field import setup_magnetic_field_aliases self.setup_energy_field() setup_magnetic_field_aliases(self, "enzoe", [f"bfield_{ax}" for ax in "xyz"])
[docs] def setup_energy_field(self): unit_system = self.ds.unit_system # check if we have a field for internal energy has_ie_field = ("enzoe", "internal_energy") in self.field_list # check if we need to account for magnetic energy vlct_params = get_listed_subparam(self.ds.parameters, "Method", "mhd_vlct", {}) has_magnetic = "no_bfield" != vlct_params.get("mhd_choice", "no_bfield") # define the ("gas", "specific_thermal_energy") field # - this is already done for us if the simulation used the dual-energy # formalism AND ("enzoe", "internal_energy") was saved to disk if not (self.ds.parameters["uses_dual_energy"] and has_ie_field): def _tot_minus_kin(field, data): return ( data["enzoe", "total_energy"] - 0.5 * data["gas", "velocity_magnitude"] ** 2.0 ) if not has_magnetic: # thermal energy = total energy - kinetic energy self.add_field( ("gas", "specific_thermal_energy"), sampling_type="cell", function=_tot_minus_kin, units=unit_system["specific_energy"], ) else: # thermal energy = total energy - kinetic energy - magnetic energy def _sub_b(field, data): return ( _tot_minus_kin(field, data) - data["gas", "magnetic_energy_density"] / data["gas", "density"] ) self.add_field( ("gas", "specific_thermal_energy"), sampling_type="cell", function=_sub_b, units=unit_system["specific_energy"], )
[docs] def setup_particle_fields(self, ptype, ftype="gas", num_neighbors=64): super().setup_particle_fields(ptype, ftype=ftype, num_neighbors=num_neighbors) self.setup_particle_mass_field(ptype)
[docs] def setup_particle_mass_field(self, ptype): fname = "particle_mass" if ptype in self.ds.particle_unions: add_union_field(self, ptype, fname, "code_mass") return pdict = self.ds.parameters.get("Particle", None) if pdict is None: return constants = nested_dict_get(pdict, (ptype, "constants"), default=()) if not constants: return # constants should be a tuple consisting of multiple tuples of (name, type, value). # When there is only one entry, the enclosing tuple gets stripped, so we put it back. if not isinstance(constants[0], tuple): constants = (constants,) names = [c[0] for c in constants] if "mass" not in names: return val = constants[names.index("mass")][2] * self.ds.mass_unit if not self.ds.index.io._particle_mass_is_mass: val = val * get_particle_mass_correction(self.ds) def _pmass(field, data): return val * data[ptype, "particle_ones"] self.add_field( (ptype, fname), function=_pmass, units="code_mass", sampling_type="particle", )