Source code for yt.frontends.amrvac.fields

"""
AMRVAC-specific fields

"""


import functools

import numpy as np

from yt.fields.field_info_container import FieldInfoContainer
from yt.units import dimensions
from yt.utilities.logger import ytLogger as mylog

# We need to specify which fields we might have in our dataset.  The field info
# container subclass here will define which fields it knows about.  There are
# optionally methods on it that get called which can be subclassed.

direction_aliases = {
    "cartesian": ("x", "y", "z"),
    "polar": ("r", "theta", "z"),
    "cylindrical": ("r", "z", "theta"),
    "spherical": ("r", "theta", "phi"),
}


def _velocity(field, data, idir, prefix=None):
    """Velocity = linear momentum / density"""
    # This is meant to be used with functools.partial to produce
    # functions with only 2 arguments (field, data)
    # idir : int
    #    the direction index (1, 2 or 3)
    # prefix : str
    #    used to generalize to dust fields
    if prefix is None:
        prefix = ""
    moment = data["gas", "%smoment_%d" % (prefix, idir)]
    rho = data["gas", f"{prefix}density"]

    mask1 = rho == 0
    if mask1.any():
        mylog.info(
            "zeros found in %sdensity, "
            "patching them to compute corresponding velocity field.",
            prefix,
        )
        mask2 = moment == 0
        if not ((mask1 & mask2) == mask1).all():
            raise RuntimeError
        rho[mask1] = 1
    return moment / rho


code_density = "code_mass / code_length**3"
code_moment = "code_mass / code_length**2 / code_time"
code_pressure = "code_mass / code_length / code_time**2"


[docs] class AMRVACFieldInfo(FieldInfoContainer): # for now, define a finite family of dust fields (up to 100 species) MAXN_DUST_SPECIES = 100 known_dust_fields = [ ("rhod%d" % idust, (code_density, ["dust%d_density" % idust], None)) for idust in range(1, MAXN_DUST_SPECIES + 1) ] + [ ( "m%dd%d" % (idir, idust), (code_moment, ["dust%d_moment_%d" % (idust, idir)], None), ) for idust in range(1, MAXN_DUST_SPECIES + 1) for idir in (1, 2, 3) ] # format: (native(?) field, (units, [aliases], display_name)) # note: aliases will correspond to "gas" typed fields # whereas the native ones are "amrvac" typed known_other_fields = ( ("rho", (code_density, ["density"], None)), ("m1", (code_moment, ["moment_1"], None)), ("m2", (code_moment, ["moment_2"], None)), ("m3", (code_moment, ["moment_3"], None)), ("e", (code_pressure, ["energy_density"], None)), ("b1", ("code_magnetic", ["magnetic_1"], None)), ("b2", ("code_magnetic", ["magnetic_2"], None)), ("b3", ("code_magnetic", ["magnetic_3"], None)), ("Te", ("code_temperature", ["temperature"], None)), *known_dust_fields, ) known_particle_fields = () def _setup_velocity_fields(self, idust=None): if idust is None: dust_flag = dust_label = "" else: dust_flag = "d%d" % idust dust_label = "dust%d_" % idust us = self.ds.unit_system for idir, alias in enumerate(direction_aliases[self.ds.geometry], start=1): if ("amrvac", "m%d%s" % (idir, dust_flag)) not in self.field_list: break velocity_fn = functools.partial(_velocity, idir=idir, prefix=dust_label) self.add_field( ("gas", f"{dust_label}velocity_{alias}"), function=velocity_fn, units=us["velocity"], dimensions=dimensions.velocity, sampling_type="cell", ) self.alias( ("gas", "%svelocity_%d" % (dust_label, idir)), ("gas", f"{dust_label}velocity_{alias}"), units=us["velocity"], ) self.alias( ("gas", f"{dust_label}moment_{alias}"), ("gas", "%smoment_%d" % (dust_label, idir)), units=us["density"] * us["velocity"], ) def _setup_dust_fields(self): idust = 1 imax = self.__class__.MAXN_DUST_SPECIES while ("amrvac", "rhod%d" % idust) in self.field_list: if idust > imax: mylog.error( "Only the first %d dust species are currently read by yt. " "If you read this, please consider issuing a ticket. ", imax, ) break self._setup_velocity_fields(idust) idust += 1 n_dust_found = idust - 1 us = self.ds.unit_system if n_dust_found > 0: def _total_dust_density(field, data): tot = np.zeros_like(data[("gas", "density")]) for idust in range(1, n_dust_found + 1): tot += data["dust%d_density" % idust] return tot self.add_field( ("gas", "total_dust_density"), function=_total_dust_density, dimensions=dimensions.density, units=us["density"], sampling_type="cell", ) def dust_to_gas_ratio(field, data): return data[("gas", "total_dust_density")] / data[("gas", "density")] self.add_field( ("gas", "dust_to_gas_ratio"), function=dust_to_gas_ratio, dimensions=dimensions.dimensionless, sampling_type="cell", )
[docs] def setup_fluid_fields(self): from yt.fields.magnetic_field import setup_magnetic_field_aliases setup_magnetic_field_aliases(self, "amrvac", [f"mag{ax}" for ax in "xyz"]) self._setup_velocity_fields() # gas velocities self._setup_dust_fields() # dust derived fields (including velocities) # fields with nested dependencies are defined thereafter # by increasing level of complexity us = self.ds.unit_system def _kinetic_energy_density(field, data): # devnote : have a look at issue 1301 return 0.5 * data["gas", "density"] * data["gas", "velocity_magnitude"] ** 2 self.add_field( ("gas", "kinetic_energy_density"), function=_kinetic_energy_density, units=us["density"] * us["velocity"] ** 2, dimensions=dimensions.density * dimensions.velocity**2, sampling_type="cell", ) # magnetic energy density if ("amrvac", "b1") in self.field_list: def _magnetic_energy_density(field, data): emag = 0.5 * data["gas", "magnetic_1"] ** 2 for idim in "23": if ("amrvac", f"b{idim}") not in self.field_list: break emag += 0.5 * data["gas", f"magnetic_{idim}"] ** 2 # in AMRVAC the magnetic field is defined in units where mu0 = 1, # such that # Emag = 0.5*B**2 instead of Emag = 0.5*B**2 / mu0 # To correctly transform the dimensionality from gauss**2 -> rho*v**2, # we have to take mu0 into account. If we divide here, units when adding # the field should be us["density"]*us["velocity"]**2. # If not, they should be us["magnetic_field"]**2 and division should # happen elsewhere. emag /= 4 * np.pi # divided by mu0 = 4pi in cgs, # yt handles 'mks' and 'code' unit systems internally. return emag self.add_field( ("gas", "magnetic_energy_density"), function=_magnetic_energy_density, units=us["density"] * us["velocity"] ** 2, dimensions=dimensions.density * dimensions.velocity**2, sampling_type="cell", ) # Adding the thermal pressure field. # In AMRVAC we have multiple physics possibilities: # - if HD/MHD + energy equation P = (gamma-1)*(e - ekin (- emag)) for (M)HD # - if HD/MHD but solve_internal_e is true in parfile, P = (gamma-1)*e for both # - if (m)hd_energy is false in parfile (isothermal), P = c_adiab * rho**gamma def _full_thermal_pressure_HD(field, data): # energy density and pressure are actually expressed in the same unit pthermal = (data.ds.gamma - 1) * ( data["gas", "energy_density"] - data["gas", "kinetic_energy_density"] ) return pthermal def _full_thermal_pressure_MHD(field, data): pthermal = ( _full_thermal_pressure_HD(field, data) - (data.ds.gamma - 1) * data["gas", "magnetic_energy_density"] ) return pthermal def _polytropic_thermal_pressure(field, data): return (data.ds.gamma - 1) * data["gas", "energy_density"] def _adiabatic_thermal_pressure(field, data): return data.ds._c_adiab * data["gas", "density"] ** data.ds.gamma pressure_recipe = None if ("amrvac", "e") in self.field_list: if self.ds._e_is_internal: pressure_recipe = _polytropic_thermal_pressure mylog.info("Using polytropic EoS for thermal pressure.") elif ("amrvac", "b1") in self.field_list: pressure_recipe = _full_thermal_pressure_MHD mylog.info("Using full MHD energy for thermal pressure.") else: pressure_recipe = _full_thermal_pressure_HD mylog.info("Using full HD energy for thermal pressure.") elif self.ds._c_adiab is not None: pressure_recipe = _adiabatic_thermal_pressure mylog.info("Using adiabatic EoS for thermal pressure (isothermal).") mylog.warning( "If you used usr_set_pthermal you should " "redefine the thermal_pressure field." ) if pressure_recipe is not None: self.add_field( ("gas", "thermal_pressure"), function=pressure_recipe, units=us["density"] * us["velocity"] ** 2, dimensions=dimensions.density * dimensions.velocity**2, sampling_type="cell", ) # sound speed and temperature depend on thermal pressure def _sound_speed(field, data): return np.sqrt( data.ds.gamma * data["gas", "thermal_pressure"] / data["gas", "density"] ) self.add_field( ("gas", "sound_speed"), function=_sound_speed, units=us["velocity"], dimensions=dimensions.velocity, sampling_type="cell", ) else: mylog.warning( "e not found and no parfile passed, can not set thermal_pressure." )