Source code for yt.fields.fluid_fields

import numpy as np

from yt.units.dimensions import current_mks
from yt.units.unit_object import Unit  # type: ignore
from yt.utilities.chemical_formulas import compute_mu
from yt.utilities.lib.misc_utilities import obtain_relative_velocity_vector

from .derived_field import ValidateParameter, ValidateSpatial
from .field_plugin_registry import register_field_plugin
from .vector_operations import (
    create_averaged_field,
    create_magnitude_field,
    create_vector_fields,
)


[docs] @register_field_plugin def setup_fluid_fields(registry, ftype="gas", slice_info=None): pc = registry.ds.units.physical_constants # slice_info would be the left, the right, and the factor. # For example, with the old Enzo-ZEUS fields, this would be: # slice(None, -2, None) # slice(1, -1, None) # 1.0 # Otherwise, we default to a centered difference. if slice_info is None: sl_left = slice(None, -2, None) sl_right = slice(2, None, None) div_fac = 2.0 else: sl_left, sl_right, div_fac = slice_info unit_system = registry.ds.unit_system if unit_system.base_units[current_mks] is None: mag_units = "magnetic_field_cgs" else: mag_units = "magnetic_field_mks" create_vector_fields( registry, "velocity", unit_system["velocity"], ftype, slice_info ) create_vector_fields( registry, "magnetic_field", unit_system[mag_units], ftype, slice_info ) def _cell_mass(field, data): return data[ftype, "density"] * data[ftype, "cell_volume"] registry.add_field( (ftype, "cell_mass"), sampling_type="cell", function=_cell_mass, units=unit_system["mass"], ) registry.alias((ftype, "mass"), (ftype, "cell_mass")) # momentum def momentum_xyz(v): def _momm(field, data): return data["gas", "mass"] * data["gas", f"velocity_{v}"] def _momd(field, data): return data["gas", "density"] * data["gas", f"velocity_{v}"] return _momm, _momd for v in registry.ds.coordinates.axis_order: _momm, _momd = momentum_xyz(v) registry.add_field( ("gas", f"momentum_{v}"), sampling_type="local", function=_momm, units=unit_system["momentum"], ) registry.add_field( ("gas", f"momentum_density_{v}"), sampling_type="local", function=_momd, units=unit_system["density"] * unit_system["velocity"], ) def _sound_speed(field, data): tr = data.ds.gamma * data[ftype, "pressure"] / data[ftype, "density"] return np.sqrt(tr) registry.add_field( (ftype, "sound_speed"), sampling_type="local", function=_sound_speed, units=unit_system["velocity"], ) def _radial_mach_number(field, data): """Radial component of M{|v|/c_sound}""" tr = data[ftype, "radial_velocity"] / data[ftype, "sound_speed"] return np.abs(tr) registry.add_field( (ftype, "radial_mach_number"), sampling_type="local", function=_radial_mach_number, units="", ) def _kinetic_energy_density(field, data): v = obtain_relative_velocity_vector(data) return 0.5 * data[ftype, "density"] * (v**2).sum(axis=0) registry.add_field( (ftype, "kinetic_energy_density"), sampling_type="local", function=_kinetic_energy_density, units=unit_system["pressure"], validators=[ValidateParameter("bulk_velocity")], ) def _mach_number(field, data): """M{|v|/c_sound}""" return data[ftype, "velocity_magnitude"] / data[ftype, "sound_speed"] registry.add_field( (ftype, "mach_number"), sampling_type="local", function=_mach_number, units="" ) def _courant_time_step(field, data): t1 = data[ftype, "dx"] / ( data[ftype, "sound_speed"] + np.abs(data[ftype, "velocity_x"]) ) t2 = data[ftype, "dy"] / ( data[ftype, "sound_speed"] + np.abs(data[ftype, "velocity_y"]) ) t3 = data[ftype, "dz"] / ( data[ftype, "sound_speed"] + np.abs(data[ftype, "velocity_z"]) ) tr = np.minimum(np.minimum(t1, t2), t3) return tr registry.add_field( (ftype, "courant_time_step"), sampling_type="cell", function=_courant_time_step, units=unit_system["time"], ) def _pressure(field, data): """M{(Gamma-1.0)*rho*E}""" tr = (data.ds.gamma - 1.0) * ( data[ftype, "density"] * data[ftype, "specific_thermal_energy"] ) return tr registry.add_field( (ftype, "pressure"), sampling_type="local", function=_pressure, units=unit_system["pressure"], ) def _kT(field, data): return (pc.kboltz * data[ftype, "temperature"]).in_units("keV") registry.add_field( (ftype, "kT"), sampling_type="local", function=_kT, units="keV", display_name="Temperature", ) def _metallicity(field, data): return data[ftype, "metal_density"] / data[ftype, "density"] registry.add_field( (ftype, "metallicity"), sampling_type="local", function=_metallicity, units="Zsun", ) def _metal_mass(field, data): Z = data[ftype, "metallicity"].to("dimensionless") return Z * data[ftype, "mass"] registry.add_field( (ftype, "metal_mass"), sampling_type="local", function=_metal_mass, units=unit_system["mass"], ) if len(registry.ds.field_info.species_names) > 0: def _number_density(field, data): field_data = np.zeros_like( data["gas", f"{data.ds.field_info.species_names[0]}_number_density"] ) for species in data.ds.field_info.species_names: field_data += data["gas", f"{species}_number_density"] return field_data else: def _number_density(field, data): mu = getattr(data.ds, "mu", compute_mu(data.ds.default_species_fields)) return data[ftype, "density"] / (pc.mh * mu) registry.add_field( (ftype, "number_density"), sampling_type="local", function=_number_density, units=unit_system["number_density"], ) def _mean_molecular_weight(field, data): return data[ftype, "density"] / (pc.mh * data[ftype, "number_density"]) registry.add_field( (ftype, "mean_molecular_weight"), sampling_type="local", function=_mean_molecular_weight, units="", ) setup_gradient_fields( registry, (ftype, "pressure"), unit_system["pressure"], slice_info ) setup_gradient_fields( registry, (ftype, "density"), unit_system["density"], slice_info ) create_averaged_field( registry, "density", unit_system["density"], ftype=ftype, slice_info=slice_info, weight="mass", )
[docs] def setup_gradient_fields(registry, grad_field, field_units, slice_info=None): assert isinstance(grad_field, tuple) ftype, fname = grad_field if slice_info is None: sl_left = slice(None, -2, None) sl_right = slice(2, None, None) div_fac = 2.0 else: sl_left, sl_right, div_fac = slice_info slice_3d = (slice(1, -1), slice(1, -1), slice(1, -1)) def grad_func(axi, ax): slice_3dl = slice_3d[:axi] + (sl_left,) + slice_3d[axi + 1 :] slice_3dr = slice_3d[:axi] + (sl_right,) + slice_3d[axi + 1 :] def func(field, data): block_order = getattr(data, "_block_order", "C") if block_order == "F": # Fortran-ordering: we need to swap axes here and # reswap below field_data = data[grad_field].swapaxes(0, 2) else: field_data = data[grad_field] dx = div_fac * data[ftype, f"d{ax}"] if ax == "theta": dx *= data[ftype, "r"] if ax == "phi": dx *= data[ftype, "r"] * np.sin(data[ftype, "theta"]) f = field_data[slice_3dr] / dx[slice_3d] f -= field_data[slice_3dl] / dx[slice_3d] new_field = np.zeros_like(data[grad_field], dtype=np.float64) new_field = data.ds.arr(new_field, field_data.units / dx.units) new_field[slice_3d] = f if block_order == "F": new_field = new_field.swapaxes(0, 2) return new_field return func field_units = Unit(field_units, registry=registry.ds.unit_registry) grad_units = field_units / registry.ds.unit_system["length"] for axi, ax in enumerate(registry.ds.coordinates.axis_order): f = grad_func(axi, ax) registry.add_field( (ftype, f"{fname}_gradient_{ax}"), sampling_type="local", function=f, validators=[ValidateSpatial(1, [grad_field])], units=grad_units, ) create_magnitude_field( registry, f"{fname}_gradient", grad_units, ftype=ftype, validators=[ValidateSpatial(1, [grad_field])], )