import sys
from typing import TYPE_CHECKING
import numpy as np
from unyt import Unit
from yt._typing import FieldName, FieldType
from yt.funcs import is_sequence, just_one
from yt.geometry.api import Geometry
from yt.utilities.lib.misc_utilities import obtain_relative_velocity_vector
from yt.utilities.math_utils import (
get_cyl_r_component,
get_cyl_theta_component,
get_cyl_z_component,
get_sph_phi_component,
get_sph_r_component,
get_sph_theta_component,
)
from .derived_field import NeedsParameter, ValidateParameter, ValidateSpatial
if sys.version_info >= (3, 11):
from typing import assert_never
else:
from typing_extensions import assert_never
if TYPE_CHECKING:
# avoid circular imports
from yt.fields.field_info_container import FieldInfoContainer
[docs]
def get_bulk(data, basename, unit):
if data.has_field_parameter(f"bulk_{basename}"):
bulk = data.get_field_parameter(f"bulk_{basename}")
else:
bulk = [0, 0, 0] * unit
return bulk
[docs]
def create_magnitude_field(
registry,
basename,
field_units,
ftype="gas",
slice_info=None,
validators=None,
sampling_type=None,
):
axis_order = registry.ds.coordinates.axis_order
field_components = [(ftype, f"{basename}_{ax}") for ax in axis_order]
if sampling_type is None:
sampling_type = "local"
def _magnitude(field, data):
fn = field_components[0]
if data.has_field_parameter(f"bulk_{basename}"):
fn = (fn[0], f"relative_{fn[1]}")
d = data[fn]
mag = (d) ** 2
for idim in range(1, registry.ds.dimensionality):
fn = field_components[idim]
if data.has_field_parameter(f"bulk_{basename}"):
fn = (fn[0], f"relative_{fn[1]}")
mag += (data[fn]) ** 2
return np.sqrt(mag)
registry.add_field(
(ftype, f"{basename}_magnitude"),
sampling_type=sampling_type,
function=_magnitude,
units=field_units,
validators=validators,
)
[docs]
def create_relative_field(
registry, basename, field_units, ftype="gas", slice_info=None, validators=None
):
axis_order = registry.ds.coordinates.axis_order
field_components = [(ftype, f"{basename}_{ax}") for ax in axis_order]
def relative_vector(ax):
def _relative_vector(field, data):
iax = axis_order.index(ax)
d = data[field_components[iax]]
bulk = get_bulk(data, basename, d.unit_quantity)
return d - bulk[iax]
return _relative_vector
for d in axis_order:
registry.add_field(
(ftype, f"relative_{basename}_{d}"),
sampling_type="local",
function=relative_vector(d),
units=field_units,
validators=validators,
)
[docs]
def create_los_field(
registry,
basename,
field_units,
ftype="gas",
slice_info=None,
*,
sampling_type="local",
):
axis_order = registry.ds.coordinates.axis_order
# Here we need to check if we are a particle field, so that we can
# correctly identify the "bulk" field parameter corresponding to
# this vector field.
if sampling_type == "particle":
basenm = basename.removeprefix("particle_")
else:
basenm = basename
validators = [
ValidateParameter(f"bulk_{basenm}"),
ValidateParameter("axis", {"axis": [0, 1, 2]}),
]
field_comps = [(ftype, f"{basename}_{ax}") for ax in axis_order]
def _los_field(field, data):
if data.has_field_parameter(f"bulk_{basenm}"):
fns = [(fc[0], f"relative_{fc[1]}") for fc in field_comps]
else:
fns = field_comps
ax = data.get_field_parameter("axis")
if is_sequence(ax):
# Make sure this is a unit vector
ax /= np.sqrt(np.dot(ax, ax))
ret = data[fns[0]] * ax[0] + data[fns[1]] * ax[1] + data[fns[2]] * ax[2]
elif ax in [0, 1, 2]:
ret = data[fns[ax]]
else:
raise NeedsParameter(["axis"])
return ret
registry.add_field(
(ftype, f"{basename}_los"),
sampling_type=sampling_type,
function=_los_field,
units=field_units,
validators=validators,
display_name=rf"\mathrm{{Line of Sight {basename.capitalize()}}}",
)
[docs]
def create_squared_field(
registry, basename, field_units, ftype="gas", slice_info=None, validators=None
):
axis_order = registry.ds.coordinates.axis_order
field_components = [(ftype, f"{basename}_{ax}") for ax in axis_order]
def _squared(field, data):
fn = field_components[0]
if data.has_field_parameter(f"bulk_{basename}"):
fn = (fn[0], f"relative_{fn[1]}")
squared = data[fn] * data[fn]
for idim in range(1, registry.ds.dimensionality):
fn = field_components[idim]
squared += data[fn] * data[fn]
return squared
registry.add_field(
(ftype, f"{basename}_squared"),
sampling_type="local",
function=_squared,
units=field_units,
validators=validators,
)
[docs]
def create_vector_fields(
registry: "FieldInfoContainer",
basename: FieldName,
field_units,
ftype: FieldType = "gas",
slice_info=None,
) -> None:
# slice_info would be the left, the right, and the factor.
# For example, with the old Enzo-ZEUS fields, this would be:
# slice(None, -2, None)
# slice(1, -1, None)
# 1.0
# Otherwise, we default to a centered difference.
if slice_info is None:
sl_left = slice(None, -2, None)
sl_right = slice(2, None, None)
div_fac = 2.0
else:
sl_left, sl_right, div_fac = slice_info
axis_order = registry.ds.coordinates.axis_order
xn, yn, zn = ((ftype, f"{basename}_{ax}") for ax in axis_order)
# Is this safe?
if registry.ds.dimensionality < 3:
zn = ("index", "zeros")
if registry.ds.dimensionality < 2:
yn = ("index", "zeros")
create_relative_field(
registry,
basename,
field_units,
ftype=ftype,
slice_info=slice_info,
validators=[ValidateParameter(f"bulk_{basename}")],
)
create_magnitude_field(
registry,
basename,
field_units,
ftype=ftype,
slice_info=slice_info,
validators=[ValidateParameter(f"bulk_{basename}")],
)
axis_names = registry.ds.coordinates.axis_order
geometry: Geometry = registry.ds.geometry
if geometry is Geometry.CARTESIAN:
# The following fields are invalid for curvilinear geometries
def _spherical_radius_component(field, data):
"""The spherical radius component of the vector field
Relative to the coordinate system defined by the *normal* vector,
*center*, and *bulk_* field parameters.
"""
normal = data.get_field_parameter("normal")
vectors = obtain_relative_velocity_vector(
data, (xn, yn, zn), f"bulk_{basename}"
)
theta = data["index", "spherical_theta"]
phi = data["index", "spherical_phi"]
rv = get_sph_r_component(vectors, theta, phi, normal)
# Now, anywhere that radius is in fact zero, we want to zero out our
# return values.
rv[np.isnan(theta)] = 0.0
return rv
registry.add_field(
(ftype, f"{basename}_spherical_radius"),
sampling_type="local",
function=_spherical_radius_component,
units=field_units,
validators=[
ValidateParameter("normal"),
ValidateParameter("center"),
ValidateParameter(f"bulk_{basename}"),
],
)
create_los_field(
registry, basename, field_units, ftype=ftype, slice_info=slice_info
)
def _radial(field, data):
return data[ftype, f"{basename}_spherical_radius"]
def _radial_absolute(field, data):
return np.abs(data[ftype, f"{basename}_spherical_radius"])
def _tangential(field, data):
return np.sqrt(
data[ftype, f"{basename}_spherical_theta"] ** 2.0
+ data[ftype, f"{basename}_spherical_phi"] ** 2.0
)
registry.add_field(
(ftype, f"radial_{basename}"),
sampling_type="local",
function=_radial,
units=field_units,
validators=[ValidateParameter("normal"), ValidateParameter("center")],
)
registry.add_field(
(ftype, f"radial_{basename}_absolute"),
sampling_type="local",
function=_radial_absolute,
units=field_units,
)
registry.add_field(
(ftype, f"tangential_{basename}"),
sampling_type="local",
function=_tangential,
units=field_units,
)
def _spherical_theta_component(field, data):
"""The spherical theta component of the vector field
Relative to the coordinate system defined by the *normal* vector,
*center*, and *bulk_* field parameters.
"""
normal = data.get_field_parameter("normal")
vectors = obtain_relative_velocity_vector(
data, (xn, yn, zn), f"bulk_{basename}"
)
theta = data["index", "spherical_theta"]
phi = data["index", "spherical_phi"]
return get_sph_theta_component(vectors, theta, phi, normal)
registry.add_field(
(ftype, f"{basename}_spherical_theta"),
sampling_type="local",
function=_spherical_theta_component,
units=field_units,
validators=[
ValidateParameter("normal"),
ValidateParameter("center"),
ValidateParameter(f"bulk_{basename}"),
],
)
def _spherical_phi_component(field, data):
"""The spherical phi component of the vector field
Relative to the coordinate system defined by the *normal* vector,
*center*, and *bulk_* field parameters.
"""
normal = data.get_field_parameter("normal")
vectors = obtain_relative_velocity_vector(
data, (xn, yn, zn), f"bulk_{basename}"
)
phi = data["index", "spherical_phi"]
return get_sph_phi_component(vectors, phi, normal)
registry.add_field(
(ftype, f"{basename}_spherical_phi"),
sampling_type="local",
function=_spherical_phi_component,
units=field_units,
validators=[
ValidateParameter("normal"),
ValidateParameter("center"),
ValidateParameter(f"bulk_{basename}"),
],
)
def _cp_vectors(ax):
def _cp_val(field, data):
vec = data.get_field_parameter(f"cp_{ax}_vec")
tr = data[xn[0], f"relative_{xn[1]}"] * vec.d[0]
tr += data[yn[0], f"relative_{yn[1]}"] * vec.d[1]
tr += data[zn[0], f"relative_{zn[1]}"] * vec.d[2]
return tr
return _cp_val
for ax in "xyz":
registry.add_field(
(ftype, f"cutting_plane_{basename}_{ax}"),
sampling_type="local",
function=_cp_vectors(ax),
units=field_units,
)
def _divergence(field, data):
ds = div_fac * just_one(data["index", "dx"])
f = data[xn[0], f"relative_{xn[1]}"][sl_right, 1:-1, 1:-1] / ds
f -= data[xn[0], f"relative_{xn[1]}"][sl_left, 1:-1, 1:-1] / ds
ds = div_fac * just_one(data["index", "dy"])
f += data[yn[0], f"relative_{yn[1]}"][1:-1, sl_right, 1:-1] / ds
f -= data[yn[0], f"relative_{yn[1]}"][1:-1, sl_left, 1:-1] / ds
ds = div_fac * just_one(data["index", "dz"])
f += data[zn[0], f"relative_{zn[1]}"][1:-1, 1:-1, sl_right] / ds
f -= data[zn[0], f"relative_{zn[1]}"][1:-1, 1:-1, sl_left] / ds
new_field = data.ds.arr(np.zeros(data[xn].shape, dtype="f8"), str(f.units))
new_field[1:-1, 1:-1, 1:-1] = f
return new_field
def _divergence_abs(field, data):
return np.abs(data[ftype, f"{basename}_divergence"])
field_units = Unit(field_units, registry=registry.ds.unit_registry)
div_units = field_units / registry.ds.unit_system["length"]
registry.add_field(
(ftype, f"{basename}_divergence"),
sampling_type="local",
function=_divergence,
units=div_units,
validators=[ValidateSpatial(1), ValidateParameter(f"bulk_{basename}")],
)
registry.add_field(
(ftype, f"{basename}_divergence_absolute"),
sampling_type="local",
function=_divergence_abs,
units=div_units,
)
def _tangential_over_magnitude(field, data):
tr = (
data[ftype, f"tangential_{basename}"]
/ data[ftype, f"{basename}_magnitude"]
)
return np.abs(tr)
registry.add_field(
(ftype, f"tangential_over_{basename}_magnitude"),
sampling_type="local",
function=_tangential_over_magnitude,
take_log=False,
)
def _cylindrical_radius_component(field, data):
"""The cylindrical radius component of the vector field
Relative to the coordinate system defined by the *normal* vector,
*center*, and *bulk_* field parameters.
"""
normal = data.get_field_parameter("normal")
vectors = obtain_relative_velocity_vector(
data, (xn, yn, zn), f"bulk_{basename}"
)
theta = data["index", "cylindrical_theta"]
return get_cyl_r_component(vectors, theta, normal)
registry.add_field(
(ftype, f"{basename}_cylindrical_radius"),
sampling_type="local",
function=_cylindrical_radius_component,
units=field_units,
validators=[ValidateParameter("normal")],
)
def _cylindrical_theta_component(field, data):
"""The cylindrical theta component of the vector field
Relative to the coordinate system defined by the *normal* vector,
*center*, and *bulk_* field parameters.
"""
normal = data.get_field_parameter("normal")
vectors = obtain_relative_velocity_vector(
data, (xn, yn, zn), f"bulk_{basename}"
)
theta = data["index", "cylindrical_theta"].copy()
theta = np.tile(theta, (3,) + (1,) * len(theta.shape))
return get_cyl_theta_component(vectors, theta, normal)
registry.add_field(
(ftype, f"{basename}_cylindrical_theta"),
sampling_type="local",
function=_cylindrical_theta_component,
units=field_units,
validators=[
ValidateParameter("normal"),
ValidateParameter("center"),
ValidateParameter(f"bulk_{basename}"),
],
)
def _cylindrical_z_component(field, data):
"""The cylindrical z component of the vector field
Relative to the coordinate system defined by the *normal* vector,
*center*, and *bulk_* field parameters.
"""
normal = data.get_field_parameter("normal")
vectors = obtain_relative_velocity_vector(
data, (xn, yn, zn), f"bulk_{basename}"
)
return get_cyl_z_component(vectors, normal)
registry.add_field(
(ftype, f"{basename}_cylindrical_z"),
sampling_type="local",
function=_cylindrical_z_component,
units=field_units,
validators=[
ValidateParameter("normal"),
ValidateParameter("center"),
ValidateParameter(f"bulk_{basename}"),
],
)
elif (
geometry is Geometry.POLAR
or geometry is Geometry.CYLINDRICAL
or geometry is Geometry.SPHERICAL
): # Create Cartesian fields for curvilinear coordinates
if geometry is Geometry.POLAR:
def _cartesian_x(field, data):
return data[ftype, f"{basename}_r"] * np.cos(data[ftype, "theta"])
def _cartesian_y(field, data):
return data[ftype, f"{basename}_r"] * np.sin(data[ftype, "theta"])
def _cartesian_z(field, data):
return data[ftype, f"{basename}_z"]
elif geometry is Geometry.CYLINDRICAL:
def _cartesian_x(field, data):
if data.ds.dimensionality == 2:
return data[ftype, f"{basename}_r"]
elif data.ds.dimensionality == 3:
return data[ftype, f"{basename}_r"] * np.cos(
data[ftype, "theta"]
) - data[ftype, f"{basename}_theta"] * np.sin(data[ftype, "theta"])
def _cartesian_y(field, data):
if data.ds.dimensionality == 2:
return data[ftype, f"{basename}_z"]
elif data.ds.dimensionality == 3:
return data[ftype, f"{basename}_r"] * np.sin(
data[ftype, "theta"]
) + data[ftype, f"{basename}_theta"] * np.cos(data[ftype, "theta"])
def _cartesian_z(field, data):
return data[ftype, f"{basename}_z"]
elif geometry is Geometry.SPHERICAL:
def _cartesian_x(field, data):
if data.ds.dimensionality == 2:
return data[ftype, f"{basename}_r"] * np.sin(
data[ftype, "theta"]
) + data[ftype, f"{basename}_theta"] * np.cos(data[ftype, "theta"])
elif data.ds.dimensionality == 3:
return (
data[ftype, f"{basename}_r"]
* np.sin(data[ftype, "theta"])
* np.cos(data[ftype, "phi"])
+ data[ftype, f"{basename}_theta"]
* np.cos(data[ftype, "theta"])
* np.cos(data[ftype, "phi"])
- data[ftype, f"{basename}_phi"] * np.sin(data[ftype, "phi"])
)
def _cartesian_y(field, data):
if data.ds.dimensionality == 2:
return data[ftype, f"{basename}_r"] * np.cos(
data[ftype, "theta"]
) - data[f"{basename}_theta"] * np.sin(data[ftype, "theta"])
elif data.ds.dimensionality == 3:
return (
data[ftype, f"{basename}_r"]
* np.sin(data[ftype, "theta"])
* np.sin(data[ftype, "phi"])
+ data[ftype, f"{basename}_theta"]
* np.cos(data[ftype, "theta"])
* np.sin(data[ftype, "phi"])
+ data[ftype, f"{basename}_phi"] * np.cos(data[ftype, "phi"])
)
def _cartesian_z(field, data):
return data[ftype, f"{basename}_r"] * np.cos(
data[ftype, "theta"]
) - data[ftype, f"{basename}_theta"] * np.sin(data[ftype, "theta"])
else:
assert_never(geometry)
# it's redundant to define a cartesian x field for 1D data
if registry.ds.dimensionality >= 2:
registry.add_field(
(ftype, f"{basename}_cartesian_x"),
sampling_type="local",
function=_cartesian_x,
units=field_units,
display_field=True,
)
registry.add_field(
(ftype, f"{basename}_cartesian_y"),
sampling_type="local",
function=_cartesian_y,
units=field_units,
display_field=True,
)
registry.add_field(
(ftype, f"{basename}_cartesian_z"),
sampling_type="local",
function=_cartesian_z,
units=field_units,
display_field=True,
)
elif (
geometry is Geometry.GEOGRAPHIC
or geometry is Geometry.INTERNAL_GEOGRAPHIC
or geometry is Geometry.SPECTRAL_CUBE
):
# nothing to do
pass
else:
assert_never(geometry)
if registry.ds.geometry is Geometry.SPHERICAL:
def _cylindrical_radius_component(field, data):
return (
np.sin(data[ftype, "theta"]) * data[ftype, f"{basename}_r"]
+ np.cos(data[ftype, "theta"]) * data[ftype, f"{basename}_theta"]
)
registry.add_field(
(ftype, f"{basename}_cylindrical_radius"),
sampling_type="local",
function=_cylindrical_radius_component,
units=field_units,
display_field=True,
)
registry.alias(
(ftype, f"{basename}_cylindrical_z"),
(ftype, f"{basename}_cartesian_z"),
)
# define vector components appropriate for 'theta'-normal plots.
# The projection plane is called 'conic plane' in the code base as well as docs.
# Contrary to 'poloidal' and 'toroidal', this isn't a widely spread
# naming convention, but here it is exposed to users as part of dedicated
# field names, so it needs to be stable.
def _conic_x(field, data):
rax = axis_names.index("r")
pax = axis_names.index("phi")
bc = data.get_field_parameter(f"bulk_{basename}")
return np.cos(data[ftype, "phi"]) * (
data[ftype, f"{basename}_r"] - bc[rax]
) - np.sin(data[ftype, "phi"]) * (data[ftype, f"{basename}_phi"] - bc[pax])
def _conic_y(field, data):
rax = axis_names.index("r")
pax = axis_names.index("phi")
bc = data.get_field_parameter(f"bulk_{basename}")
return np.sin(data[ftype, "phi"]) * (
data[ftype, f"{basename}_r"] - bc[rax]
) + np.cos(data[ftype, "phi"]) * (data[ftype, f"{basename}_phi"] - bc[pax])
if registry.ds.dimensionality == 3:
registry.add_field(
(ftype, f"{basename}_conic_x"),
sampling_type="local",
function=_conic_x,
units=field_units,
display_field=True,
)
registry.add_field(
(ftype, f"{basename}_conic_y"),
sampling_type="local",
function=_conic_y,
units=field_units,
display_field=True,
)
[docs]
def create_averaged_field(
registry,
basename,
field_units,
ftype="gas",
slice_info=None,
validators=None,
weight="mass",
):
if validators is None:
validators = []
validators += [ValidateSpatial(1, [(ftype, basename)])]
def _averaged_field(field, data):
def atleast_4d(array):
if array.ndim == 3:
return array[..., None]
else:
return array
nx, ny, nz, ngrids = atleast_4d(data[ftype, basename]).shape
new_field = data.ds.arr(
np.zeros((nx - 2, ny - 2, nz - 2, ngrids), dtype=np.float64),
(just_one(data[ftype, basename]) * just_one(data[ftype, weight])).units,
)
weight_field = data.ds.arr(
np.zeros((nx - 2, ny - 2, nz - 2, ngrids), dtype=np.float64),
data[ftype, weight].units,
)
i_i, j_i, k_i = np.mgrid[0:3, 0:3, 0:3]
for i, j, k in zip(i_i.ravel(), j_i.ravel(), k_i.ravel(), strict=True):
sl = (
slice(i, nx - (2 - i)),
slice(j, ny - (2 - j)),
slice(k, nz - (2 - k)),
)
new_field += (
atleast_4d(data[ftype, basename])[sl]
* atleast_4d(data[ftype, weight])[sl]
)
weight_field += atleast_4d(data[ftype, weight])[sl]
# Now some fancy footwork
new_field2 = data.ds.arr(
np.zeros((nx, ny, nz, ngrids)), data[ftype, basename].units
)
new_field2[1:-1, 1:-1, 1:-1] = new_field / weight_field
if data[ftype, basename].ndim == 3:
return new_field2[..., 0]
else:
return new_field2
registry.add_field(
(ftype, f"averaged_{basename}"),
sampling_type="cell",
function=_averaged_field,
units=field_units,
validators=validators,
)