import numpy as np
from yt._typing import FieldType
from yt.fields.derived_field import ValidateParameter, ValidateSpatial
from yt.fields.field_info_container import FieldInfoContainer
from yt.funcs import just_one
from yt.geometry.api import Geometry
from yt.utilities.exceptions import YTDimensionalityError, YTFieldNotFound
from .field_plugin_registry import register_field_plugin
from .vector_operations import create_magnitude_field, create_squared_field
[docs]
@register_field_plugin
def setup_fluid_vector_fields(
registry: FieldInfoContainer, ftype: FieldType = "gas", slice_info=None
) -> None:
# Current implementation for gradient is not valid for curvilinear geometries
geometry: Geometry = registry.ds.geometry
if geometry is not Geometry.CARTESIAN:
return
unit_system = registry.ds.unit_system
# slice_info would be the left, the right, and the factor.
# For example, with the old Enzo-ZEUS fields, this would be:
# slice(None, -2, None)
# slice(1, -1, None)
# 1.0
# Otherwise, we default to a centered difference.
if slice_info is None:
sl_left = slice(None, -2, None)
sl_right = slice(2, None, None)
div_fac = 2.0
else:
sl_left, sl_right, div_fac = slice_info
sl_center = slice(1, -1, None)
def _baroclinic_vorticity_x(field, data):
rho2 = data[ftype, "density"].astype("float64", copy=False) ** 2
return (
data[ftype, "pressure_gradient_y"] * data[ftype, "density_gradient_z"]
- data[ftype, "pressure_gradient_z"] * data[ftype, "density_gradient_z"]
) / rho2
def _baroclinic_vorticity_y(field, data):
rho2 = data[ftype, "density"].astype("float64", copy=False) ** 2
return (
data[ftype, "pressure_gradient_z"] * data[ftype, "density_gradient_x"]
- data[ftype, "pressure_gradient_x"] * data[ftype, "density_gradient_z"]
) / rho2
def _baroclinic_vorticity_z(field, data):
rho2 = data[ftype, "density"].astype("float64", copy=False) ** 2
return (
data[ftype, "pressure_gradient_x"] * data[ftype, "density_gradient_y"]
- data[ftype, "pressure_gradient_y"] * data[ftype, "density_gradient_x"]
) / rho2
bv_validators = [ValidateSpatial(1, [(ftype, "density"), (ftype, "pressure")])]
for ax in "xyz":
n = f"baroclinic_vorticity_{ax}"
registry.add_field(
(ftype, n),
sampling_type="cell",
function=eval(f"_{n}"),
validators=bv_validators,
units=unit_system["frequency"] ** 2,
)
create_magnitude_field(
registry,
"baroclinic_vorticity",
unit_system["frequency"] ** 2,
ftype=ftype,
slice_info=slice_info,
validators=bv_validators,
)
def _vorticity_x(field, data):
vz = data[ftype, "relative_velocity_z"]
vy = data[ftype, "relative_velocity_y"]
f = (vz[sl_center, sl_right, sl_center] - vz[sl_center, sl_left, sl_center]) / (
div_fac * just_one(data["index", "dy"])
)
f -= (
vy[sl_center, sl_center, sl_right] - vy[sl_center, sl_center, sl_left]
) / (div_fac * just_one(data["index", "dz"]))
new_field = data.ds.arr(np.zeros_like(vz, dtype=np.float64), f.units)
new_field[sl_center, sl_center, sl_center] = f
return new_field
def _vorticity_y(field, data):
vx = data[ftype, "relative_velocity_x"]
vz = data[ftype, "relative_velocity_z"]
f = (vx[sl_center, sl_center, sl_right] - vx[sl_center, sl_center, sl_left]) / (
div_fac * just_one(data["index", "dz"])
)
f -= (
vz[sl_right, sl_center, sl_center] - vz[sl_left, sl_center, sl_center]
) / (div_fac * just_one(data["index", "dx"]))
new_field = data.ds.arr(np.zeros_like(vz, dtype=np.float64), f.units)
new_field[sl_center, sl_center, sl_center] = f
return new_field
def _vorticity_z(field, data):
vx = data[ftype, "relative_velocity_x"]
vy = data[ftype, "relative_velocity_y"]
f = (vy[sl_right, sl_center, sl_center] - vy[sl_left, sl_center, sl_center]) / (
div_fac * just_one(data["index", "dx"])
)
f -= (
vx[sl_center, sl_right, sl_center] - vx[sl_center, sl_left, sl_center]
) / (div_fac * just_one(data["index", "dy"]))
new_field = data.ds.arr(np.zeros_like(vy, dtype=np.float64), f.units)
new_field[sl_center, sl_center, sl_center] = f
return new_field
vort_validators = [
ValidateSpatial(1, [(ftype, f"velocity_{d}") for d in "xyz"]),
ValidateParameter("bulk_velocity"),
]
for ax in "xyz":
n = f"vorticity_{ax}"
registry.add_field(
(ftype, n),
sampling_type="cell",
function=eval(f"_{n}"),
units=unit_system["frequency"],
validators=vort_validators,
)
create_magnitude_field(
registry,
"vorticity",
unit_system["frequency"],
ftype=ftype,
slice_info=slice_info,
validators=vort_validators,
)
create_squared_field(
registry,
"vorticity",
unit_system["frequency"] ** 2,
ftype=ftype,
slice_info=slice_info,
validators=vort_validators,
)
def _vorticity_stretching_x(field, data):
return data[ftype, "velocity_divergence"] * data[ftype, "vorticity_x"]
def _vorticity_stretching_y(field, data):
return data[ftype, "velocity_divergence"] * data[ftype, "vorticity_y"]
def _vorticity_stretching_z(field, data):
return data[ftype, "velocity_divergence"] * data[ftype, "vorticity_z"]
for ax in "xyz":
n = f"vorticity_stretching_{ax}"
registry.add_field(
(ftype, n),
sampling_type="cell",
function=eval(f"_{n}"),
units=unit_system["frequency"] ** 2,
validators=vort_validators,
)
create_magnitude_field(
registry,
"vorticity_stretching",
unit_system["frequency"] ** 2,
ftype=ftype,
slice_info=slice_info,
validators=vort_validators,
)
def _vorticity_growth_x(field, data):
return (
-data[ftype, "vorticity_stretching_x"]
- data[ftype, "baroclinic_vorticity_x"]
)
def _vorticity_growth_y(field, data):
return (
-data[ftype, "vorticity_stretching_y"]
- data[ftype, "baroclinic_vorticity_y"]
)
def _vorticity_growth_z(field, data):
return (
-data[ftype, "vorticity_stretching_z"]
- data[ftype, "baroclinic_vorticity_z"]
)
for ax in "xyz":
n = f"vorticity_growth_{ax}"
registry.add_field(
(ftype, n),
sampling_type="cell",
function=eval(f"_{n}"),
units=unit_system["frequency"] ** 2,
validators=vort_validators,
)
def _vorticity_growth_magnitude(field, data):
result = np.sqrt(
data[ftype, "vorticity_growth_x"] ** 2
+ data[ftype, "vorticity_growth_y"] ** 2
+ data[ftype, "vorticity_growth_z"] ** 2
)
dot = data.ds.arr(np.zeros(result.shape), "")
for ax in "xyz":
dot += (
data[ftype, f"vorticity_{ax}"] * data[ftype, f"vorticity_growth_{ax}"]
).to_ndarray()
result = np.sign(dot) * result
return result
registry.add_field(
(ftype, "vorticity_growth_magnitude"),
sampling_type="cell",
function=_vorticity_growth_magnitude,
units=unit_system["frequency"] ** 2,
validators=vort_validators,
take_log=False,
)
def _vorticity_growth_magnitude_absolute(field, data):
return np.sqrt(
data[ftype, "vorticity_growth_x"] ** 2
+ data[ftype, "vorticity_growth_y"] ** 2
+ data[ftype, "vorticity_growth_z"] ** 2
)
registry.add_field(
(ftype, "vorticity_growth_magnitude_absolute"),
sampling_type="cell",
function=_vorticity_growth_magnitude_absolute,
units=unit_system["frequency"] ** 2,
validators=vort_validators,
)
def _vorticity_growth_timescale(field, data):
domegax_dt = data[ftype, "vorticity_x"] / data[ftype, "vorticity_growth_x"]
domegay_dt = data[ftype, "vorticity_y"] / data[ftype, "vorticity_growth_y"]
domegaz_dt = data[ftype, "vorticity_z"] / data[ftype, "vorticity_growth_z"]
return np.sqrt(domegax_dt**2 + domegay_dt**2 + domegaz_dt**2)
registry.add_field(
(ftype, "vorticity_growth_timescale"),
sampling_type="cell",
function=_vorticity_growth_timescale,
units=unit_system["time"],
validators=vort_validators,
)
########################################################################
# With radiation pressure
########################################################################
def _vorticity_radiation_pressure_x(field, data):
rho = data[ftype, "density"].astype("float64", copy=False)
return (
data[ftype, "radiation_acceleration_y"] * data[ftype, "density_gradient_z"]
- data[ftype, "radiation_acceleration_z"]
* data[ftype, "density_gradient_y"]
) / rho
def _vorticity_radiation_pressure_y(field, data):
rho = data[ftype, "density"].astype("float64", copy=False)
return (
data[ftype, "radiation_acceleration_z"] * data[ftype, "density_gradient_x"]
- data[ftype, "radiation_acceleration_x"]
* data[ftype, "density_gradient_z"]
) / rho
def _vorticity_radiation_pressure_z(field, data):
rho = data[ftype, "density"].astype("float64", copy=False)
return (
data[ftype, "radiation_acceleration_x"] * data[ftype, "density_gradient_y"]
- data[ftype, "radiation_acceleration_y"]
* data[ftype, "density_gradient_x"]
) / rho
vrp_validators = [
ValidateSpatial(
1,
[
(ftype, "density"),
(ftype, "radiation_acceleration_x"),
(ftype, "radiation_acceleration_y"),
(ftype, "radiation_acceleration_z"),
],
)
]
for ax in "xyz":
n = f"vorticity_radiation_pressure_{ax}"
registry.add_field(
(ftype, n),
sampling_type="cell",
function=eval(f"_{n}"),
units=unit_system["frequency"] ** 2,
validators=vrp_validators,
)
create_magnitude_field(
registry,
"vorticity_radiation_pressure",
unit_system["frequency"] ** 2,
ftype=ftype,
slice_info=slice_info,
validators=vrp_validators,
)
def _vorticity_radiation_pressure_growth_x(field, data):
return (
-data[ftype, "vorticity_stretching_x"]
- data[ftype, "baroclinic_vorticity_x"]
- data[ftype, "vorticity_radiation_pressure_x"]
)
def _vorticity_radiation_pressure_growth_y(field, data):
return (
-data[ftype, "vorticity_stretching_y"]
- data[ftype, "baroclinic_vorticity_y"]
- data[ftype, "vorticity_radiation_pressure_y"]
)
def _vorticity_radiation_pressure_growth_z(field, data):
return (
-data[ftype, "vorticity_stretching_z"]
- data[ftype, "baroclinic_vorticity_z"]
- data[ftype, "vorticity_radiation_pressure_z"]
)
for ax in "xyz":
n = f"vorticity_radiation_pressure_growth_{ax}"
registry.add_field(
(ftype, n),
sampling_type="cell",
function=eval(f"_{n}"),
units=unit_system["frequency"] ** 2,
validators=vrp_validators,
)
def _vorticity_radiation_pressure_growth_magnitude(field, data):
result = np.sqrt(
data[ftype, "vorticity_radiation_pressure_growth_x"] ** 2
+ data[ftype, "vorticity_radiation_pressure_growth_y"] ** 2
+ data[ftype, "vorticity_radiation_pressure_growth_z"] ** 2
)
dot = data.ds.arr(np.zeros(result.shape), "")
for ax in "xyz":
dot += (
data[ftype, f"vorticity_{ax}"] * data[ftype, f"vorticity_growth_{ax}"]
).to_ndarray()
result = np.sign(dot) * result
return result
registry.add_field(
(ftype, "vorticity_radiation_pressure_growth_magnitude"),
sampling_type="cell",
function=_vorticity_radiation_pressure_growth_magnitude,
units=unit_system["frequency"] ** 2,
validators=vrp_validators,
take_log=False,
)
def _vorticity_radiation_pressure_growth_magnitude_absolute(field, data):
return np.sqrt(
data[ftype, "vorticity_radiation_pressure_growth_x"] ** 2
+ data[ftype, "vorticity_radiation_pressure_growth_y"] ** 2
+ data[ftype, "vorticity_radiation_pressure_growth_z"] ** 2
)
registry.add_field(
(ftype, "vorticity_radiation_pressure_growth_magnitude_absolute"),
sampling_type="cell",
function=_vorticity_radiation_pressure_growth_magnitude_absolute,
units="s**(-2)",
validators=vrp_validators,
)
def _vorticity_radiation_pressure_growth_timescale(field, data):
domegax_dt = (
data[ftype, "vorticity_x"]
/ data[ftype, "vorticity_radiation_pressure_growth_x"]
)
domegay_dt = (
data[ftype, "vorticity_y"]
/ data[ftype, "vorticity_radiation_pressure_growth_y"]
)
domegaz_dt = (
data[ftype, "vorticity_z"]
/ data[ftype, "vorticity_radiation_pressure_growth_z"]
)
return np.sqrt(domegax_dt**2 + domegay_dt**2 + domegaz_dt**2)
registry.add_field(
(ftype, "vorticity_radiation_pressure_growth_timescale"),
sampling_type="cell",
function=_vorticity_radiation_pressure_growth_timescale,
units=unit_system["time"],
validators=vrp_validators,
)
def _shear(field, data):
"""
Shear is defined as [(dvx/dy + dvy/dx)^2 + (dvz/dy + dvy/dz)^2 +
(dvx/dz + dvz/dx)^2 ]^(0.5)
where dvx/dy = [vx(j-1) - vx(j+1)]/[2dy]
and is in units of s^(-1)
(it's just like vorticity except add the derivative pairs instead
of subtracting them)
"""
if data.ds.geometry != "cartesian":
raise NotImplementedError("shear is only supported in cartesian geometries")
try:
vx = data[ftype, "relative_velocity_x"]
vy = data[ftype, "relative_velocity_y"]
except YTFieldNotFound as e:
raise YTDimensionalityError(
"shear computation requires 2 velocity components"
) from e
dvydx = (
vy[sl_right, sl_center, sl_center] - vy[sl_left, sl_center, sl_center]
) / (div_fac * just_one(data["index", "dx"]))
dvxdy = (
vx[sl_center, sl_right, sl_center] - vx[sl_center, sl_left, sl_center]
) / (div_fac * just_one(data["index", "dy"]))
f = (dvydx + dvxdy) ** 2.0
del dvydx, dvxdy
try:
vz = data[ftype, "relative_velocity_z"]
dvzdy = (
vz[sl_center, sl_right, sl_center] - vz[sl_center, sl_left, sl_center]
) / (div_fac * just_one(data["index", "dy"]))
dvydz = (
vy[sl_center, sl_center, sl_right] - vy[sl_center, sl_center, sl_left]
) / (div_fac * just_one(data["index", "dz"]))
f += (dvzdy + dvydz) ** 2.0
del dvzdy, dvydz
dvxdz = (
vx[sl_center, sl_center, sl_right] - vx[sl_center, sl_center, sl_left]
) / (div_fac * just_one(data["index", "dz"]))
dvzdx = (
vz[sl_right, sl_center, sl_center] - vz[sl_left, sl_center, sl_center]
) / (div_fac * just_one(data["index", "dx"]))
f += (dvxdz + dvzdx) ** 2.0
del dvxdz, dvzdx
except YTFieldNotFound:
# the absence of a z velocity component is not blocking
pass
np.sqrt(f, out=f)
new_field = data.ds.arr(np.zeros_like(data[ftype, "velocity_x"]), f.units)
new_field[sl_center, sl_center, sl_center] = f
return new_field
registry.add_field(
(ftype, "shear"),
sampling_type="cell",
function=_shear,
validators=[
ValidateSpatial(
1, [(ftype, "velocity_x"), (ftype, "velocity_y"), (ftype, "velocity_z")]
),
ValidateParameter("bulk_velocity"),
],
units=unit_system["frequency"],
)
def _shear_criterion(field, data):
"""
Divide by c_s to leave shear in units of length**-1, which
can be compared against the inverse of the local cell size (1/dx)
to determine if refinement should occur.
"""
return data[ftype, "shear"] / data[ftype, "sound_speed"]
registry.add_field(
(ftype, "shear_criterion"),
sampling_type="cell",
function=_shear_criterion,
units=unit_system["length"] ** -1,
validators=[
ValidateSpatial(
1,
[
(ftype, "sound_speed"),
(ftype, "velocity_x"),
(ftype, "velocity_y"),
(ftype, "velocity_z"),
],
)
],
)
def _shear_mach(field, data):
"""
Dimensionless shear (shear_mach) is defined nearly the same as shear,
except that it is scaled by the local dx/dy/dz and the local sound speed.
So it results in a unitless quantity that is effectively measuring
shear in mach number.
In order to avoid discontinuities created by multiplying by dx/dy/dz at
grid refinement boundaries, we also multiply by 2**GridLevel.
Shear (Mach) = [(dvx + dvy)^2 + (dvz + dvy)^2 +
(dvx + dvz)^2 ]^(0.5) / c_sound
"""
if data.ds.geometry != "cartesian":
raise NotImplementedError(
"shear_mach is only supported in cartesian geometries"
)
try:
vx = data[ftype, "relative_velocity_x"]
vy = data[ftype, "relative_velocity_y"]
except YTFieldNotFound as e:
raise YTDimensionalityError(
"shear_mach computation requires 2 velocity components"
) from e
dvydx = (
vy[sl_right, sl_center, sl_center] - vy[sl_left, sl_center, sl_center]
) / div_fac
dvxdy = (
vx[sl_center, sl_right, sl_center] - vx[sl_center, sl_left, sl_center]
) / div_fac
f = (dvydx + dvxdy) ** 2.0
del dvydx, dvxdy
try:
vz = data[ftype, "relative_velocity_z"]
dvzdy = (
vz[sl_center, sl_right, sl_center] - vz[sl_center, sl_left, sl_center]
) / div_fac
dvydz = (
vy[sl_center, sl_center, sl_right] - vy[sl_center, sl_center, sl_left]
) / div_fac
f += (dvzdy + dvydz) ** 2.0
del dvzdy, dvydz
dvxdz = (
vx[sl_center, sl_center, sl_right] - vx[sl_center, sl_center, sl_left]
) / div_fac
dvzdx = (
vz[sl_right, sl_center, sl_center] - vz[sl_left, sl_center, sl_center]
) / div_fac
f += (dvxdz + dvzdx) ** 2.0
del dvxdz, dvzdx
except YTFieldNotFound:
# the absence of a z velocity component is not blocking
pass
f *= (
2.0 ** data["index", "grid_level"][sl_center, sl_center, sl_center]
/ data[ftype, "sound_speed"][sl_center, sl_center, sl_center]
) ** 2.0
np.sqrt(f, out=f)
new_field = data.ds.arr(np.zeros_like(vx), f.units)
new_field[sl_center, sl_center, sl_center] = f
return new_field
vs_fields = [
(ftype, "sound_speed"),
(ftype, "velocity_x"),
(ftype, "velocity_y"),
(ftype, "velocity_z"),
]
registry.add_field(
(ftype, "shear_mach"),
sampling_type="cell",
function=_shear_mach,
units="",
validators=[ValidateSpatial(1, vs_fields), ValidateParameter("bulk_velocity")],
)