import hashlib
import itertools as it
import os
import pickle
import shutil
import sys
import tempfile
import unittest
from collections.abc import Callable, Mapping
from functools import wraps
from importlib.util import find_spec
from shutil import which
from typing import TYPE_CHECKING
from unittest import SkipTest
import matplotlib
import numpy as np
from more_itertools import always_iterable
from numpy.random import RandomState
from unyt.exceptions import UnitOperationError
from yt._maintenance.deprecation import issue_deprecation_warning
from yt.config import ytcfg
from yt.frontends.stream.data_structures import StreamParticlesDataset
from yt.funcs import is_sequence
from yt.loaders import load, load_particles
from yt.units.yt_array import YTArray, YTQuantity
if TYPE_CHECKING:
from collections.abc import Mapping
from yt._typing import AnyFieldKey
ANSWER_TEST_TAG = "answer_test"
# Expose assert_true and assert_less_equal from unittest.TestCase
# this is adopted from nose. Doing this here allows us to avoid importing
# nose at the top level.
def _deprecated_assert_func(func):
@wraps(func)
def retf(*args, **kwargs):
issue_deprecation_warning(
f"yt.testing.{func.__name__} is deprecated",
since="4.2",
stacklevel=3,
)
return func(*args, **kwargs)
return retf
class _Dummy(unittest.TestCase):
def nop(self):
pass
_t = _Dummy("nop")
assert_true = _deprecated_assert_func(_t.assertTrue)
assert_less_equal = _deprecated_assert_func(_t.assertLessEqual)
def assert_rel_equal(a1, a2, decimals, err_msg="", verbose=True):
from numpy.testing import assert_almost_equal
# We have nan checks in here because occasionally we have fields that get
# weighted without non-zero weights. I'm looking at you, particle fields!
if isinstance(a1, np.ndarray):
assert a1.size == a2.size
# Mask out NaNs
assert (np.isnan(a1) == np.isnan(a2)).all()
a1[np.isnan(a1)] = 1.0
a2[np.isnan(a2)] = 1.0
# Mask out 0
ind1 = np.array(np.abs(a1) < np.finfo(a1.dtype).eps)
ind2 = np.array(np.abs(a2) < np.finfo(a2.dtype).eps)
assert (ind1 == ind2).all()
a1[ind1] = 1.0
a2[ind2] = 1.0
elif np.any(np.isnan(a1)) and np.any(np.isnan(a2)):
return True
if not isinstance(a1, np.ndarray) and a1 == a2 == 0.0:
# NANS!
a1 = a2 = 1.0
return assert_almost_equal(
np.array(a1) / np.array(a2), 1.0, decimals, err_msg=err_msg, verbose=verbose
)
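# A minimal sanity-check sketch (illustrative, not part of the module): values
# whose ratio agrees with 1 to the requested number of decimals pass silently.
#
#   >>> import numpy as np
#   >>> assert_rel_equal(np.array([1.0, 2.0]), np.array([1.0 + 1e-6, 2.0]), 4)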
# tested: volume integral is 1.
def cubicspline_python(
x: float | np.ndarray,
) -> np.ndarray:
"""
Cubic spline SPH kernel function for testing against more
efficient Cython methods.
Parameters
----------
x:
impact parameter / smoothing length [dimensionless]
Returns
-------
value of the kernel function
"""
# C is 8/pi
_c = 8.0 / np.pi
x = np.asarray(x)
kernel = np.zeros(x.shape, dtype=x.dtype)
half1 = np.where(np.logical_and(x >= 0.0, x <= 0.5))
kernel[half1] = 1.0 - 6.0 * x[half1] ** 2 * (1.0 - x[half1])
half2 = np.where(np.logical_and(x > 0.5, x <= 1.0))
kernel[half2] = 2.0 * (1.0 - x[half2]) ** 3
return kernel * _c
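# Illustrative sanity check (added for exposition): the kernel is positive
# inside its support and drops to zero at x = 1.
#
#   >>> import numpy as np
#   >>> w = cubicspline_python(np.array([0.0, 0.25, 0.5, 1.0]))
#   >>> bool(np.all(w[:-1] > 0.0)) and float(w[-1]) == 0.0
#   True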
def integrate_kernel(
kernelfunc: Callable[[float], float], b: float, hsml: float
) -> float:
"""
integrates a kernel function over a line passing entirely
through it
Parameters:
-----------
kernelfunc:
the kernel function to integrate
b:
impact parameter
hsml:
smoothing length [same units as impact parameter]
Returns:
--------
the integral of the SPH kernel function.
units: 1 / units of b and hsml
"""
pre = 1.0 / hsml**2
x = b / hsml
xmax = np.sqrt(1.0 - x**2)
xmin = -1.0 * xmax
xe = np.linspace(xmin, xmax, 500) # shape: 500, x.shape
xc = 0.5 * (xe[:-1, ...] + xe[1:, ...])
dx = np.diff(xe, axis=0)
spv = kernelfunc(np.sqrt(xc**2 + x**2))
integral = np.sum(spv * dx, axis=0)
return pre * integral
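# Illustrative check (added for exposition): for an impact parameter of zero
# and hsml = 1, the line integral of the cubic spline above is 6/pi ~= 1.9099.
#
#   >>> col = integrate_kernel(cubicspline_python, 0.0, 1.0)
#   >>> bool(abs(float(col) - 1.9099) < 1e-3)
#   True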
_zeroperiods = np.array([0.0, 0.0, 0.0])
def distancematrix(
pos3_i0: np.ndarray,
pos3_i1: np.ndarray,
periodic: tuple[bool, bool, bool] = (True,) * 3,
periods: np.ndarray = _zeroperiods,
) -> np.ndarray:
"""
Calculates the distances between two arrays of points.
Parameters:
----------
pos3_i0: shape (first number of points, 3)
positions of the first set of points. The second index is
for positions along the different cartesian axes
pos3_i1: shape (second number of points, 3)
as pos3_i0, but for the second set of points
periodic:
are the positions along each axis periodic (True) or not
periods:
the periods along each axis. Ignored if positions in a given
direction are not periodic.
Returns:
--------
a 2D-array of distances between positions `pos3_i0` (changes along
index 0) and `pos3_i1` (changes along index 1)
"""
d2 = np.zeros((len(pos3_i0), len(pos3_i1)), dtype=pos3_i0.dtype)
for ax in range(3):
# 'center on' pos3_i1
_d = pos3_i0[:, ax, np.newaxis] - pos3_i1[np.newaxis, :, ax]
if periodic[ax]:
_period = periods[ax]
_d += 0.5 * _period # center on half box size
_d %= _period # wrap coordinate to 0 -- boxsize range
_d -= 0.5 * _period # center back to zero
d2 += _d**2
return np.sqrt(d2)
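# Illustrative sketch: with a unit period along each axis, points at x = 0.1
# and x = 0.9 are separated by 0.2 once periodically wrapped.
#
#   >>> import numpy as np
#   >>> p0 = np.array([[0.1, 0.5, 0.5]])
#   >>> p1 = np.array([[0.9, 0.5, 0.5]])
#   >>> d = distancematrix(p0, p1, periods=np.ones(3))
#   >>> float(np.round(d[0, 0], 6))
#   0.2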
def amrspace(extent, levels=7, cells=8):
"""Creates two numpy arrays representing the left and right bounds of
an AMR grid as well as an array for the AMR level of each cell.
Parameters
----------
extent : array-like
This is a sequence of length 2*ndims giving the bounds of each dimension.
For example, the 2D unit square would be given by [0.0, 1.0, 0.0, 1.0].
A 3D cylindrical grid may look like [0.0, 2.0, -1.0, 1.0, 0.0, 2*np.pi].
levels : int or sequence of ints, optional
This is the number of AMR refinement levels. If given as a sequence (of
length ndims), then each dimension will be refined down to this level.
All values in this array must be the same or zero. A zero valued dimension
indicates that this dim should not be refined. Taking the 3D cylindrical
example above, if we don't want to refine theta but want r and z refined to
level 5, we would set levels=(5, 5, 0).
cells : int, optional
This is the number of cells per refinement level.
Returns
-------
left : float ndarray, shape=(npoints, ndims)
The left AMR grid points.
right : float ndarray, shape=(npoints, ndims)
The right AMR grid points.
level : int ndarray, shape=(npoints,)
The AMR level for each point.
Examples
--------
>>> l, r, lvl = amrspace([0.0, 2.0, 1.0, 2.0, 0.0, 3.14], levels=(3, 3, 0), cells=2)
>>> print(l)
[[ 0. 1. 0. ]
[ 0.25 1. 0. ]
[ 0. 1.125 0. ]
[ 0.25 1.125 0. ]
[ 0.5 1. 0. ]
[ 0. 1.25 0. ]
[ 0.5 1.25 0. ]
[ 1. 1. 0. ]
[ 0. 1.5 0. ]
[ 1. 1.5 0. ]]
"""
extent = np.asarray(extent, dtype="f8")
dextent = extent[1::2] - extent[::2]
ndims = len(dextent)
if isinstance(levels, int):
minlvl = maxlvl = levels
levels = np.array([levels] * ndims, dtype="int32")
else:
levels = np.asarray(levels, dtype="int32")
minlvl = levels.min()
maxlvl = levels.max()
if minlvl != maxlvl and (minlvl != 0 or {minlvl, maxlvl} != set(levels)):
raise ValueError("all levels must have the same value or zero.")
dims_zero = levels == 0
dims_nonzero = ~dims_zero
ndims_nonzero = dims_nonzero.sum()
npoints = (cells**ndims_nonzero - 1) * maxlvl + 1
left = np.empty((npoints, ndims), dtype="float64")
right = np.empty((npoints, ndims), dtype="float64")
level = np.empty(npoints, dtype="int32")
# fill zero dims
left[:, dims_zero] = extent[::2][dims_zero]
right[:, dims_zero] = extent[1::2][dims_zero]
# fill non-zero dims
dcell = 1.0 / cells
left_slice = tuple(
(
slice(extent[2 * n], extent[2 * n + 1], extent[2 * n + 1])
if dims_zero[n]
else slice(0.0, 1.0, dcell)
)
for n in range(ndims)
)
right_slice = tuple(
(
slice(extent[2 * n + 1], extent[2 * n], -extent[2 * n + 1])
if dims_zero[n]
else slice(dcell, 1.0 + dcell, dcell)
)
for n in range(ndims)
)
left_norm_grid = np.reshape(np.mgrid[left_slice].T.flat[ndims:], (-1, ndims))
lng_zero = left_norm_grid[:, dims_zero]
lng_nonzero = left_norm_grid[:, dims_nonzero]
right_norm_grid = np.reshape(np.mgrid[right_slice].T.flat[ndims:], (-1, ndims))
rng_zero = right_norm_grid[:, dims_zero]
rng_nonzero = right_norm_grid[:, dims_nonzero]
level[0] = maxlvl
left[0, :] = extent[::2]
right[0, dims_zero] = extent[1::2][dims_zero]
right[0, dims_nonzero] = (dcell**maxlvl) * dextent[dims_nonzero] + extent[::2][
dims_nonzero
]
for i, lvl in enumerate(range(maxlvl, 0, -1)):
start = (cells**ndims_nonzero - 1) * i + 1
stop = (cells**ndims_nonzero - 1) * (i + 1) + 1
dsize = dcell ** (lvl - 1) * dextent[dims_nonzero]
level[start:stop] = lvl
left[start:stop, dims_zero] = lng_zero
left[start:stop, dims_nonzero] = lng_nonzero * dsize + extent[::2][dims_nonzero]
right[start:stop, dims_zero] = rng_zero
right[start:stop, dims_nonzero] = (
rng_nonzero * dsize + extent[::2][dims_nonzero]
)
return left, right, level
def _check_field_unit_args_helper(args: dict, default_args: dict):
values = list(args.values())
keys = list(args.keys())
if all(v is None for v in values):
for key in keys:
args[key] = default_args[key]
elif None in values:
raise ValueError(
"Error in creating a fake dataset:"
f" either all or none of the following arguments need to specified: {keys}."
)
elif any(len(v) != len(values[0]) for v in values):
raise ValueError(
"Error in creating a fake dataset:"
f" all the following arguments must have the same length: {keys}."
)
return list(args.values())
_fake_random_ds_default_fields = ("density", "velocity_x", "velocity_y", "velocity_z")
_fake_random_ds_default_units = ("g/cm**3", "cm/s", "cm/s", "cm/s")
_fake_random_ds_default_negative = (False, False, False, False)
def fake_random_ds(
ndims,
peak_value=1.0,
fields=None,
units=None,
particle_fields=None,
particle_field_units=None,
negative=False,
nprocs=1,
particles=0,
length_unit=1.0,
unit_system="cgs",
bbox=None,
default_species_fields=None,
):
from yt.loaders import load_uniform_grid
prng = RandomState(0x4D3D3D3)
if not is_sequence(ndims):
ndims = [ndims, ndims, ndims]
else:
assert len(ndims) == 3
if not is_sequence(negative):
if fields:
negative = [negative for f in fields]
else:
negative = None
fields, units, negative = _check_field_unit_args_helper(
{
"fields": fields,
"units": units,
"negative": negative,
},
{
"fields": _fake_random_ds_default_fields,
"units": _fake_random_ds_default_units,
"negative": _fake_random_ds_default_negative,
},
)
offsets = []
for n in negative:
if n:
offsets.append(0.5)
else:
offsets.append(0.0)
data = {}
for field, offset, u in zip(fields, offsets, units, strict=True):
v = (prng.random_sample(ndims) - offset) * peak_value
if field[0] == "all":
v = v.ravel()
data[field] = (v, u)
if particles:
if particle_fields is not None:
for field, unit in zip(particle_fields, particle_field_units, strict=True):
if field in ("particle_position", "particle_velocity"):
data["io", field] = (prng.random_sample((int(particles), 3)), unit)
else:
data["io", field] = (prng.random_sample(size=int(particles)), unit)
else:
for f in (f"particle_position_{ax}" for ax in "xyz"):
data["io", f] = (prng.random_sample(size=particles), "code_length")
for f in (f"particle_velocity_{ax}" for ax in "xyz"):
data["io", f] = (prng.random_sample(size=particles) - 0.5, "cm/s")
data["io", "particle_mass"] = (prng.random_sample(particles), "g")
ug = load_uniform_grid(
data,
ndims,
length_unit=length_unit,
nprocs=nprocs,
unit_system=unit_system,
bbox=bbox,
default_species_fields=default_species_fields,
)
return ug
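# A minimal usage sketch (illustrative; follows standard yt field access):
#
#   >>> ds = fake_random_ds(16, fields=("density",), units=("g/cm**3",))
#   >>> ad = ds.all_data()
#   >>> ad["gas", "density"].size == 16**3
#   True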
_geom_transforms = {
# These are the bounds we want. Cartesian we just assume goes 0 .. 1.
"cartesian": ((0.0, 0.0, 0.0), (1.0, 1.0, 1.0)),
"spherical": ((0.0, 0.0, 0.0), (1.0, np.pi, 2 * np.pi)),
"cylindrical": ((0.0, 0.0, 0.0), (1.0, 1.0, 2.0 * np.pi)), # rzt
"polar": ((0.0, 0.0, 0.0), (1.0, 2.0 * np.pi, 1.0)), # rtz
"geographic": ((-90.0, -180.0, 0.0), (90.0, 180.0, 1000.0)), # latlonalt
"internal_geographic": ((-90.0, -180.0, 0.0), (90.0, 180.0, 1000.0)), # latlondep
"spectral_cube": ((0.0, 0.0, 0.0), (1.0, 1.0, 1.0)),
}
_fake_amr_ds_default_fields = ("Density",)
_fake_amr_ds_default_units = ("g/cm**3",)
def fake_amr_ds(
fields=None,
units=None,
geometry="cartesian",
particles=0,
length_unit=None,
*,
domain_left_edge=None,
domain_right_edge=None,
):
from yt.loaders import load_amr_grids
fields, units = _check_field_unit_args_helper(
{
"fields": fields,
"units": units,
},
{
"fields": _fake_amr_ds_default_fields,
"units": _fake_amr_ds_default_units,
},
)
prng = RandomState(0x4D3D3D3)
default_LE, default_RE = _geom_transforms[geometry]
LE = np.array(domain_left_edge or default_LE, dtype="float64")
RE = np.array(domain_right_edge or default_RE, dtype="float64")
data = []
for gspec in _amr_grid_index:
level, left_edge, right_edge, dims = gspec
left_edge = left_edge * (RE - LE) + LE
right_edge = right_edge * (RE - LE) + LE
gdata = {
"level": level,
"left_edge": left_edge,
"right_edge": right_edge,
"dimensions": dims,
}
for f, u in zip(fields, units, strict=True):
gdata[f] = (prng.random_sample(dims), u)
if particles:
for i, f in enumerate(f"particle_position_{ax}" for ax in "xyz"):
pdata = prng.random_sample(particles)
pdata /= right_edge[i] - left_edge[i]
pdata += left_edge[i]
gdata["io", f] = (pdata, "code_length")
for f in (f"particle_velocity_{ax}" for ax in "xyz"):
gdata["io", f] = (prng.random_sample(particles) - 0.5, "cm/s")
gdata["io", "particle_mass"] = (prng.random_sample(particles), "g")
data.append(gdata)
bbox = np.array([LE, RE]).T
return load_amr_grids(
data, [32, 32, 32], geometry=geometry, bbox=bbox, length_unit=length_unit
)
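# Illustrative sketch: the grid specification used here (_amr_grid_index,
# defined near the end of this module) spans refinement levels 0 through 4.
#
#   >>> ds = fake_amr_ds()
#   >>> int(ds.index.max_level)
#   4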
_fake_particle_ds_default_fields = (
"particle_position_x",
"particle_position_y",
"particle_position_z",
"particle_mass",
"particle_velocity_x",
"particle_velocity_y",
"particle_velocity_z",
)
_fake_particle_ds_default_units = ("cm", "cm", "cm", "g", "cm/s", "cm/s", "cm/s")
_fake_particle_ds_default_negative = (False, False, False, False, True, True, True)
def fake_particle_ds(
fields=None,
units=None,
negative=None,
npart=16**3,
length_unit=1.0,
data=None,
):
from yt.loaders import load_particles
prng = RandomState(0x4D3D3D3)
if negative is not None and not is_sequence(negative):
negative = [negative for f in fields]
fields, units, negative = _check_field_unit_args_helper(
{
"fields": fields,
"units": units,
"negative": negative,
},
{
"fields": _fake_particle_ds_default_fields,
"units": _fake_particle_ds_default_units,
"negative": _fake_particle_ds_default_negative,
},
)
offsets = []
for n in negative:
if n:
offsets.append(0.5)
else:
offsets.append(0.0)
data = data if data else {}
for field, offset, u in zip(fields, offsets, units, strict=True):
if field in data:
v = data[field]
continue
if "position" in field:
v = prng.normal(loc=0.5, scale=0.25, size=npart)
np.clip(v, 0.0, 1.0, v)
v = prng.random_sample(npart) - offset
data[field] = (v, u)
bbox = np.array([[0.0, 1.0], [0.0, 1.0], [0.0, 1.0]])
ds = load_particles(data, 1.0, bbox=bbox)
return ds
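# A minimal usage sketch (illustrative): the default fields yield npart
# particles of type "io".
#
#   >>> ds = fake_particle_ds(npart=1000)
#   >>> ds.all_data()["io", "particle_mass"].size
#   1000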
def fake_tetrahedral_ds():
from yt.frontends.stream.sample_data.tetrahedral_mesh import (
_connectivity,
_coordinates,
)
from yt.loaders import load_unstructured_mesh
prng = RandomState(0x4D3D3D3)
# the distance from the origin
node_data = {}
dist = np.sum(_coordinates**2, 1)
node_data["connect1", "test"] = dist[_connectivity]
# each element gets a random number
elem_data = {}
elem_data["connect1", "elem"] = prng.rand(_connectivity.shape[0])
ds = load_unstructured_mesh(
_connectivity, _coordinates, node_data=node_data, elem_data=elem_data
)
return ds
def fake_hexahedral_ds(fields=None):
from yt.frontends.stream.sample_data.hexahedral_mesh import (
_connectivity,
_coordinates,
)
from yt.loaders import load_unstructured_mesh
prng = RandomState(0x4D3D3D3)
# the distance from the origin
node_data = {}
dist = np.sum(_coordinates**2, 1)
node_data["connect1", "test"] = dist[_connectivity - 1]
for field in always_iterable(fields):
node_data["connect1", field] = dist[_connectivity - 1]
# each element gets a random number
elem_data = {}
elem_data["connect1", "elem"] = prng.rand(_connectivity.shape[0])
ds = load_unstructured_mesh(
_connectivity - 1, _coordinates, node_data=node_data, elem_data=elem_data
)
return ds
def small_fake_hexahedral_ds():
from yt.loaders import load_unstructured_mesh
_coordinates = np.array(
[
[-1.0, -1.0, -1.0],
[0.0, -1.0, -1.0],
[-0.0, 0.0, -1.0],
[-1.0, -0.0, -1.0],
[-1.0, -1.0, 0.0],
[-0.0, -1.0, 0.0],
[-0.0, 0.0, -0.0],
[-1.0, 0.0, -0.0],
]
)
_connectivity = np.array([[1, 2, 3, 4, 5, 6, 7, 8]])
# the distance from the origin
node_data = {}
dist = np.sum(_coordinates**2, 1)
node_data["connect1", "test"] = dist[_connectivity - 1]
ds = load_unstructured_mesh(_connectivity - 1, _coordinates, node_data=node_data)
return ds
def fake_stretched_ds(N=16):
from yt.loaders import load_uniform_grid
rng = np.random.default_rng(seed=0x4D3D3D3)
data = {"density": rng.random((N, N, N))}
cell_widths = []
for _ in range(3):
cw = rng.random(N)
cw /= cw.sum()
cell_widths.append(cw)
return load_uniform_grid(
data,
[N, N, N],
bbox=np.array([[0.0, 1.0], [0.0, 1.0], [0.0, 1.0]]),
cell_widths=cell_widths,
)
def fake_vr_orientation_test_ds(N=96, scale=1):
"""
create a toy dataset that puts a sphere at (0,0,0), a single cube
on +x, two cubes on +y, and three cubes on +z in a domain from
[-1*scale,1*scale]**3. The lower planes
(x = -1*scale, y = -1*scale, z = -1*scale) are also given non-zero
values.
This dataset allows you to easily explore orientations and
handedness in VR and other renderings
Parameters
----------
N : integer
The number of cells along each direction
scale : float
A spatial scale; the domain boundaries will be multiplied by scale to
test datasets that have different spatial scales (e.g. data in CGS units)
"""
from yt.loaders import load_uniform_grid
xmin = ymin = zmin = -1.0 * scale
xmax = ymax = zmax = 1.0 * scale
dcoord = (xmax - xmin) / N
arr = np.zeros((N, N, N), dtype=np.float64)
arr[:, :, :] = 1.0e-4
bbox = np.array([[xmin, xmax], [ymin, ymax], [zmin, zmax]])
# coordinates -- in the notation data[i, j, k]
x = (np.arange(N) + 0.5) * dcoord + xmin
y = (np.arange(N) + 0.5) * dcoord + ymin
z = (np.arange(N) + 0.5) * dcoord + zmin
x3d, y3d, z3d = np.meshgrid(x, y, z, indexing="ij")
# sphere at the origin
c = np.array([0.5 * (xmin + xmax), 0.5 * (ymin + ymax), 0.5 * (zmin + zmax)])
r = np.sqrt((x3d - c[0]) ** 2 + (y3d - c[1]) ** 2 + (z3d - c[2]) ** 2)
arr[r < 0.05] = 1.0
arr[abs(x3d - xmin) < 2 * dcoord] = 0.3
arr[abs(y3d - ymin) < 2 * dcoord] = 0.3
arr[abs(z3d - zmin) < 2 * dcoord] = 0.3
# single cube on +x
xc = 0.75 * scale
dx = 0.05 * scale
idx = np.logical_and(
np.logical_and(x3d > xc - dx, x3d < xc + dx),
np.logical_and(
np.logical_and(y3d > -dx, y3d < dx), np.logical_and(z3d > -dx, z3d < dx)
),
)
arr[idx] = 1.0
# two cubes on +y
dy = 0.05 * scale
for yc in [0.65 * scale, 0.85 * scale]:
idx = np.logical_and(
np.logical_and(y3d > yc - dy, y3d < yc + dy),
np.logical_and(
np.logical_and(x3d > -dy, x3d < dy), np.logical_and(z3d > -dy, z3d < dy)
),
)
arr[idx] = 0.8
# three cubes on +z
dz = 0.05 * scale
for zc in [0.5 * scale, 0.7 * scale, 0.9 * scale]:
idx = np.logical_and(
np.logical_and(z3d > zc - dz, z3d < zc + dz),
np.logical_and(
np.logical_and(x3d > -dz, x3d < dz), np.logical_and(y3d > -dz, y3d < dz)
),
)
arr[idx] = 0.6
data = {"density": (arr, "g/cm**3")}
ds = load_uniform_grid(data, arr.shape, bbox=bbox)
return ds
def fake_sph_orientation_ds():
"""Returns an in-memory SPH dataset useful for testing
This dataset should have one particle at the origin, one more particle
along the x axis, two along y, and three along z. All particles will
have non-overlapping smoothing regions with a radius of 0.25, masses of 1,
densities of 1, and zero velocity.
"""
from yt import load_particles
npart = 7
# one particle at the origin, one particle along x-axis, two along y,
# three along z
data = {
"particle_position_x": (np.array([0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), "cm"),
"particle_position_y": (np.array([0.0, 0.0, 1.0, 2.0, 0.0, 0.0, 0.0]), "cm"),
"particle_position_z": (np.array([0.0, 0.0, 0.0, 0.0, 1.0, 2.0, 3.0]), "cm"),
"particle_mass": (np.ones(npart), "g"),
"particle_velocity_x": (np.zeros(npart), "cm/s"),
"particle_velocity_y": (np.zeros(npart), "cm/s"),
"particle_velocity_z": (np.zeros(npart), "cm/s"),
"smoothing_length": (0.25 * np.ones(npart), "cm"),
"density": (np.ones(npart), "g/cm**3"),
"temperature": (np.ones(npart), "K"),
}
bbox = np.array([[-4, 4], [-4, 4], [-4, 4]])
return load_particles(data=data, length_unit=1.0, bbox=bbox)
def fake_sph_grid_ds(hsml_factor=1.0):
"""Returns an in-memory SPH dataset useful for testing
This dataset should have 27 particles with the particles arranged uniformly
on a 3D grid. The bottom left corner is (0.5,0.5,0.5) and the top right
corner is (2.5,2.5,2.5). All particles will have non-overlapping smoothing
regions with a radius of 0.05, masses of 1, densities of 1, and zero
velocity.
"""
from yt import load_particles
npart = 27
x = np.empty(npart)
y = np.empty(npart)
z = np.empty(npart)
tot = 0
for i in range(0, 3):
for j in range(0, 3):
for k in range(0, 3):
x[tot] = i + 0.5
y[tot] = j + 0.5
z[tot] = k + 0.5
tot += 1
data = {
"particle_position_x": (x, "cm"),
"particle_position_y": (y, "cm"),
"particle_position_z": (z, "cm"),
"particle_mass": (np.ones(npart), "g"),
"particle_velocity_x": (np.zeros(npart), "cm/s"),
"particle_velocity_y": (np.zeros(npart), "cm/s"),
"particle_velocity_z": (np.zeros(npart), "cm/s"),
"smoothing_length": (0.05 * np.ones(npart) * hsml_factor, "cm"),
"density": (np.ones(npart), "g/cm**3"),
"temperature": (np.ones(npart), "K"),
}
bbox = np.array([[0, 3], [0, 3], [0, 3]])
return load_particles(data=data, length_unit=1.0, bbox=bbox)
def constantmass(i: int, j: int, k: int) -> float:
return 1.0
_xhat = np.array([1, 0, 0])
_yhat = np.array([0, 1, 0])
_zhat = np.array([0, 0, 1])
_floathalves = 0.5 * np.ones((3,), dtype=np.float64)
def fake_sph_flexible_grid_ds(
hsml_factor: float = 1.0,
nperside: int = 3,
periodic: bool = True,
e1hat: np.ndarray = _xhat,
e2hat: np.ndarray = _yhat,
e3hat: np.ndarray = _zhat,
offsets: np.ndarray = _floathalves,
massgenerator: Callable[[int, int, int], float] = constantmass,
unitrho: float = 1.0,
bbox: np.ndarray | None = None,
recenter: np.ndarray | None = None,
) -> StreamParticlesDataset:
"""Returns an in-memory SPH dataset useful for testing
Parameters:
-----------
hsml_factor:
all particles have smoothing lengths of `hsml_factor` * 0.5
nperside:
the dataset will have `nperside`**3 particles, arranged
uniformly on a 3D grid
periodic:
are the positions taken to be periodic? (applies to all
coordinate axes)
e1hat: shape (3,)
the first basis vector defining the 3D grid. If the basis
vectors are not normalized to 1 or not orthogonal, the spacing
or overlap between SPH particles will be affected, but this is
allowed.
e2hat: shape (3,)
the second basis vector defining the 3D grid. (See `e1hat`.)
e3hat: shape (3,)
the third basis vector defining the 3D grid. (See `e1hat`.)
offsets: shape (3,)
the zero point of the 3D grid along each coordinate axis
massgenerator:
a function assigning a mass to each particle, as a function of
the e[1-3]hat indices, in order
unitrho:
defines the density for a particle with mass 1 ('g'), and the
standard (uniform) grid `hsml_factor`.
bbox: if np.ndarray, shape is (2, 3)
the assumed enclosing volume of the particles. Should enclose
all the coordinate values. If not specified, a bbox is defined
which encloses all coordinates values with a margin. If
`periodic`, the size of the `bbox` along each coordinate is
also the period along that axis.
recenter:
if not `None`, after generating the grid, the positions are
periodically shifted to move the old center to this position.
Useful for testing periodicity handling.
This shift is relative to the halfway positions of the bbox
edges.
Returns:
--------
A `StreamParticlesDataset` object with particle positions, masses,
velocities (zero), smoothing lengths, and densities specified.
Values are in cgs units.
"""
npart = nperside**3
pos = np.empty((npart, 3), dtype=np.float64)
mass = np.empty((npart,), dtype=np.float64)
for i in range(0, nperside):
for j in range(0, nperside):
for k in range(0, nperside):
_pos = (
(offsets[0] + i) * e1hat
+ (offsets[1] + j) * e2hat
+ (offsets[2] + k) * e3hat
)
ind = nperside**2 * i + nperside * j + k
pos[ind, :] = _pos
mass[ind] = massgenerator(i, j, k)
rho = unitrho * mass
if bbox is None:
eps = 1e-3
margin = (1.0 + eps) * hsml_factor
bbox = np.array(
[
[np.min(pos[:, 0]) - margin, np.max(pos[:, 0]) + margin],
[np.min(pos[:, 1]) - margin, np.max(pos[:, 1]) + margin],
[np.min(pos[:, 2]) - margin, np.max(pos[:, 2]) + margin],
]
)
if recenter is not None:
periods = bbox[:, 1] - bbox[:, 0]
# old center -> new position
pos += -0.5 * periods[np.newaxis, :] + recenter[np.newaxis, :]
# wrap coordinates -> all in [0, boxsize) range
pos %= periods[np.newaxis, :]
# shift back to original bbox range
pos += (bbox[:, 0])[np.newaxis, :]
if not periodic:
# remove points outside bbox to avoid errors:
okinds = np.ones(len(mass), dtype=bool)
for ax in [0, 1, 2]:
okinds &= pos[:, ax] < bbox[ax, 1]
okinds &= pos[:, ax] >= bbox[ax, 0]
npart = sum(okinds)
else:
okinds = np.ones((npart,), dtype=bool)
data: Mapping[AnyFieldKey, tuple[np.ndarray, str]] = {
"particle_position_x": (np.copy(pos[okinds, 0]), "cm"),
"particle_position_y": (np.copy(pos[okinds, 1]), "cm"),
"particle_position_z": (np.copy(pos[okinds, 2]), "cm"),
"particle_mass": (np.copy(mass[okinds]), "g"),
"particle_velocity_x": (np.zeros(npart), "cm/s"),
"particle_velocity_y": (np.zeros(npart), "cm/s"),
"particle_velocity_z": (np.zeros(npart), "cm/s"),
"smoothing_length": (np.ones(npart) * 0.5 * hsml_factor, "cm"),
"density": (np.copy(rho[okinds]), "g/cm**3"),
}
ds = load_particles(
data=data,
bbox=bbox,
periodicity=(periodic,) * 3,
length_unit=1.0,
mass_unit=1.0,
time_unit=1.0,
velocity_unit=1.0,
)
ds.kernel_name = "cubic"
return ds
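# Illustrative sketch: nperside=2 gives 2**3 = 8 particles, each with the mass
# returned by `massgenerator` (1 g with the default `constantmass`).
#
#   >>> ds = fake_sph_flexible_grid_ds(nperside=2)
#   >>> ds.all_data()["io", "particle_mass"].size
#   8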
def fake_random_sph_ds(
npart: int,
bbox: np.ndarray,
periodic: bool | tuple[bool, bool, bool] = True,
massrange: tuple[float, float] = (0.5, 2.0),
hsmlrange: tuple[float, float] = (0.5, 2.0),
unitrho: float = 1.0,
) -> StreamParticlesDataset:
"""Returns an in-memory SPH dataset useful for testing
Parameters:
-----------
npart:
number of particles to generate
bbox: shape: (3, 2), units: "cm"
the assumed enclosing volume of the particles. Particle
positions are drawn uniformly from these ranges.
periodic:
are the positions taken to be periodic? If a single value,
that value is applied to all axes
massrange:
particle masses are drawn uniformly from this range (unit: "g")
hsmlrange: units: "cm"
particle smoothing lengths are drawn uniformly from this range
unitrho:
defines the density for a particle with mass 1 ("g"), and
smoothing length 1 ("cm").
Returns:
--------
A `StreamParticlesDataset` object with particle positions, masses,
velocities (zero), smoothing lengths, and densities specified.
Values are in cgs units.
"""
if not hasattr(periodic, "__len__"):
periodic = (periodic,) * 3
gen = np.random.default_rng(seed=0)
posx = gen.uniform(low=bbox[0][0], high=bbox[0][1], size=npart)
posy = gen.uniform(low=bbox[1][0], high=bbox[1][1], size=npart)
posz = gen.uniform(low=bbox[2][0], high=bbox[2][1], size=npart)
mass = gen.uniform(low=massrange[0], high=massrange[1], size=npart)
hsml = gen.uniform(low=hsmlrange[0], high=hsmlrange[1], size=npart)
dens = mass / hsml**3 * unitrho
data: Mapping[AnyFieldKey, tuple[np.ndarray, str]] = {
"particle_position_x": (posx, "cm"),
"particle_position_y": (posy, "cm"),
"particle_position_z": (posz, "cm"),
"particle_mass": (mass, "g"),
"particle_velocity_x": (np.zeros(npart), "cm/s"),
"particle_velocity_y": (np.zeros(npart), "cm/s"),
"particle_velocity_z": (np.zeros(npart), "cm/s"),
"smoothing_length": (hsml, "cm"),
"density": (dens, "g/cm**3"),
}
ds = load_particles(
data=data,
bbox=bbox,
periodicity=periodic,
length_unit=1.0,
mass_unit=1.0,
time_unit=1.0,
velocity_unit=1.0,
)
ds.kernel_name = "cubic"
return ds
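# A minimal usage sketch (illustrative): positions are drawn uniformly inside
# the supplied bbox, so all requested particles are selected by all_data().
#
#   >>> import numpy as np
#   >>> bbox = np.array([[0.0, 3.0], [0.0, 3.0], [0.0, 3.0]])
#   >>> ds = fake_random_sph_ds(100, bbox)
#   >>> ds.all_data()["io", "particle_mass"].size
#   100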
def construct_octree_mask(prng=RandomState(0x1D3D3D3), refined=None): # noqa B008
# Implementation taken from url:
# http://docs.hyperion-rt.org/en/stable/advanced/indepth_oct.html
if refined in (None, True):
refined = [True]
if not refined:
refined = [False]
return refined
# Loop over subcells
for _ in range(8):
# Insert criterion for whether cell should be sub-divided. Here we
# just use a random number to demonstrate.
divide = prng.random_sample() < 0.12
# Append boolean to overall list
refined.append(divide)
# If the cell is sub-divided, recursively divide it further
if divide:
construct_octree_mask(prng, refined)
return refined
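# Illustrative sketch: the mask is a flat, depth-first list of booleans with
# one entry for the root plus eight entries per refined cell, so its length is
# always 1 modulo 8.
#
#   >>> from numpy.random import RandomState
#   >>> mask = construct_octree_mask(RandomState(0x1D3D3D3))
#   >>> bool(mask[0]), len(mask) % 8
#   (True, 1)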
def fake_octree_ds(
prng=RandomState(0x4D3D3D3), # noqa B008
refined=None,
quantities=None,
bbox=None,
sim_time=0.0,
length_unit=None,
mass_unit=None,
time_unit=None,
velocity_unit=None,
magnetic_unit=None,
periodicity=(True, True, True),
num_zones=2,
partial_coverage=1,
unit_system="cgs",
):
from yt.loaders import load_octree
octree_mask = np.asarray(
construct_octree_mask(prng=prng, refined=refined), dtype=np.uint8
)
particles = np.sum(np.invert(octree_mask))
if quantities is None:
quantities = {}
quantities["gas", "density"] = prng.random_sample((particles, 1))
quantities["gas", "velocity_x"] = prng.random_sample((particles, 1))
quantities["gas", "velocity_y"] = prng.random_sample((particles, 1))
quantities["gas", "velocity_z"] = prng.random_sample((particles, 1))
ds = load_octree(
octree_mask=octree_mask,
data=quantities,
bbox=bbox,
sim_time=sim_time,
length_unit=length_unit,
mass_unit=mass_unit,
time_unit=time_unit,
velocity_unit=velocity_unit,
magnetic_unit=magnetic_unit,
periodicity=periodicity,
partial_coverage=partial_coverage,
num_zones=num_zones,
unit_system=unit_system,
)
return ds
def add_noise_fields(ds):
"""Add 4 classes of noise fields to a dataset"""
prng = RandomState(0x4D3D3D3)
def _binary_noise(field, data):
"""random binary data"""
return prng.randint(low=0, high=2, size=data.size).astype("float64")
def _positive_noise(field, data):
"""random strictly positive data"""
return prng.random_sample(data.size) + 1e-16
def _negative_noise(field, data):
"""random negative data"""
return -prng.random_sample(data.size)
def _even_noise(field, data):
"""random data with mixed signs"""
return 2 * prng.random_sample(data.size) - 1
ds.add_field(("gas", "noise0"), _binary_noise, sampling_type="cell")
ds.add_field(("gas", "noise1"), _positive_noise, sampling_type="cell")
ds.add_field(("gas", "noise2"), _negative_noise, sampling_type="cell")
ds.add_field(("gas", "noise3"), _even_noise, sampling_type="cell")
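# A minimal usage sketch (illustrative): noise1 is strictly positive by
# construction.
#
#   >>> ds = fake_random_ds(16)
#   >>> add_noise_fields(ds)
#   >>> ad = ds.all_data()
#   >>> bool((ad["gas", "noise1"] > 0).all())
#   True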
def expand_keywords(keywords, full=False):
"""
expand_keywords is a means for testing all possible keyword
arguments in the nosetests. Simply pass it a dictionary of all the
keyword arguments and all of the values for these arguments that you
want to test.
It will return a list of kwargs dicts containing combinations of
the various kwarg values you passed it. These can then be passed
to the appropriate function in nosetests.
If full=True, then every possible combination of keywords is produced,
otherwise, every keyword option is included at least once in the output
list. Be careful, by using full=True, you may be in for an exponentially
larger number of tests!
Parameters
----------
keywords : dict
a dictionary where the keys are the keywords for the function,
and the values of each key are the possible values that this key
can take in the function
full : bool
if set to True, every possible combination of given keywords is
returned
Returns
-------
array of dicts
An array of dictionaries to be individually passed to the appropriate
function matching these kwargs.
Examples
--------
>>> keywords = {}
>>> keywords["dpi"] = (50, 100, 200)
>>> keywords["cmap"] = ("cmyt.arbre", "cmyt.kelp")
>>> list_of_kwargs = expand_keywords(keywords)
>>> print(list_of_kwargs)
array([{'cmap': 'cmyt.arbre', 'dpi': 50},
{'cmap': 'cmyt.kelp', 'dpi': 100},
{'cmap': 'cmyt.arbre', 'dpi': 200}], dtype=object)
>>> list_of_kwargs = expand_keywords(keywords, full=True)
>>> print(list_of_kwargs)
array([{'cmap': 'cmyt.arbre', 'dpi': 50},
{'cmap': 'cmyt.arbre', 'dpi': 100},
{'cmap': 'cmyt.arbre', 'dpi': 200},
{'cmap': 'cmyt.kelp', 'dpi': 50},
{'cmap': 'cmyt.kelp', 'dpi': 100},
{'cmap': 'cmyt.kelp', 'dpi': 200}], dtype=object)
>>> for kwargs in list_of_kwargs:
... write_projection(*args, **kwargs)
"""
issue_deprecation_warning(
"yt.testing.expand_keywords is deprecated", since="4.2", stacklevel=3
)
# if we want every possible combination of keywords, use iter magic
if full:
keys = sorted(keywords)
list_of_kwarg_dicts = np.array(
[
dict(zip(keys, prod, strict=True))
for prod in it.product(*(keywords[key] for key in keys))
]
)
# if we just want to probe each keyword, but not necessarily every
# combination
else:
# Determine the maximum number of values any of the keywords has
num_lists = 0
for val in keywords.values():
if isinstance(val, str):
num_lists = max(1, num_lists)
else:
num_lists = max(len(val), num_lists)
# Construct array of kwargs dicts, each element of the list is a different
# **kwargs dict. each kwargs dict gives a different combination of
# the possible values of the kwargs
# initialize array
list_of_kwarg_dicts = np.array([{} for x in range(num_lists)])
# fill in array
for i in np.arange(num_lists):
list_of_kwarg_dicts[i] = {}
for key in keywords.keys():
# if it's a string, use it (there's only one)
if isinstance(keywords[key], str):
list_of_kwarg_dicts[i][key] = keywords[key]
# if there are more options, use the i'th val
elif i < len(keywords[key]):
list_of_kwarg_dicts[i][key] = keywords[key][i]
# if there are not more options, use the 0'th val
else:
list_of_kwarg_dicts[i][key] = keywords[key][0]
return list_of_kwarg_dicts
def skip(reason: str):
# a drop-in replacement for pytest.mark.skip decorator with nose-compatibility
def dec(func):
@wraps(func)
def wrapper(*args, **kwargs):
raise SkipTest(reason)
return wrapper
return dec
def skipif(condition: bool, reason: str):
# a drop-in replacement for pytest.mark.skipif decorator with nose-compatibility
def dec(func):
if condition:
return skip(reason)(func)
else:
return func
return dec
def requires_module(module):
"""
Decorator that takes a module name as an argument and checks whether it can
be imported. If the module is available, the decorated function is returned
unchanged; otherwise the test is skipped. This is so tests that depend on
certain modules will not fail if the module is not installed on the testing
platform.
"""
return skipif(find_spec(module) is None, reason=f"Missing required module {module}")
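# Illustrative usage sketch (the test function below is hypothetical):
#
#   >>> @requires_module("h5py")
#   ... def test_hdf5_roundtrip():
#   ...     import h5py  # skipped, not failed, when h5py is unavailable
#   ...     ...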
def requires_module_pytest(*module_names):
"""
This is a replacement for yt.testing.requires_module that's
compatible with pytest, and accepts an arbitrary number of requirements to
avoid stacking decorators
Important: this is meant to decorate test functions only; it won't work as a
decorator to fixture functions.
It's meant to be imported as
>>> from yt.testing import requires_module_pytest as requires_module
so that it can later be renamed to `requires_module`.
"""
# note: import pytest here so that it is not a hard requirement for
# importing yt.testing see https://github.com/yt-project/yt/issues/4507
import pytest
def deco(func):
missing = [name for name in module_names if find_spec(name) is None]
# note that order between these two decorators matters
@pytest.mark.skipif(
missing,
reason=f"missing requirement(s): {', '.join(missing)}",
)
@wraps(func)
def inner_func(*args, **kwargs):
return func(*args, **kwargs)
return inner_func
return deco
def requires_file(req_file):
condition = (
not os.path.exists(req_file)
and not os.path.exists(os.path.join(ytcfg.get("yt", "test_data_dir"), req_file))
and not ytcfg.get("yt", "internals", "strict_requires")
)
return skipif(condition, reason=f"Missing required file {req_file}")
def disable_dataset_cache(func):
@wraps(func)
def newfunc(*args, **kwargs):
restore_cfg_state = False
if not ytcfg.get("yt", "skip_dataset_cache"):
ytcfg["yt", "skip_dataset_cache"] = True
restore_cfg_state = True
rv = func(*args, **kwargs)
if restore_cfg_state:
ytcfg["yt", "skip_dataset_cache"] = False
return rv
return newfunc
@disable_dataset_cache
def units_override_check(fn):
from numpy.testing import assert_equal
units_list = ["length", "time", "mass", "velocity", "magnetic", "temperature"]
ds1 = load(fn)
units_override = {}
attrs1 = []
attrs2 = []
for u in units_list:
unit_attr = getattr(ds1, f"{u}_unit", None)
if unit_attr is not None:
attrs1.append(unit_attr)
units_override[f"{u}_unit"] = (unit_attr.v, unit_attr.units)
del ds1
ds2 = load(fn, units_override=units_override)
assert len(ds2.units_override) > 0
for u in units_list:
unit_attr = getattr(ds2, f"{u}_unit", None)
if unit_attr is not None:
attrs2.append(unit_attr)
assert_equal(attrs1, attrs2)
# This is an export of the 40 grids in IsolatedGalaxy that are of level 4 or
# lower. It's just designed to give a sample AMR index to deal with.
_amr_grid_index = [
[0, [0.0, 0.0, 0.0], [1.0, 1.0, 1.0], [32, 32, 32]],
[1, [0.25, 0.21875, 0.25], [0.5, 0.5, 0.5], [16, 18, 16]],
[1, [0.5, 0.21875, 0.25], [0.75, 0.5, 0.5], [16, 18, 16]],
[1, [0.21875, 0.5, 0.25], [0.5, 0.75, 0.5], [18, 16, 16]],
[1, [0.5, 0.5, 0.25], [0.75, 0.75, 0.5], [16, 16, 16]],
[1, [0.25, 0.25, 0.5], [0.5, 0.5, 0.75], [16, 16, 16]],
[1, [0.5, 0.25, 0.5], [0.75, 0.5, 0.75], [16, 16, 16]],
[1, [0.25, 0.5, 0.5], [0.5, 0.75, 0.75], [16, 16, 16]],
[1, [0.5, 0.5, 0.5], [0.75, 0.75, 0.75], [16, 16, 16]],
[2, [0.5, 0.5, 0.5], [0.71875, 0.71875, 0.71875], [28, 28, 28]],
[3, [0.5, 0.5, 0.5], [0.6640625, 0.65625, 0.6796875], [42, 40, 46]],
[4, [0.5, 0.5, 0.5], [0.59765625, 0.6015625, 0.6015625], [50, 52, 52]],
[2, [0.28125, 0.5, 0.5], [0.5, 0.734375, 0.71875], [28, 30, 28]],
[3, [0.3359375, 0.5, 0.5], [0.5, 0.671875, 0.6640625], [42, 44, 42]],
[4, [0.40625, 0.5, 0.5], [0.5, 0.59765625, 0.59765625], [48, 50, 50]],
[2, [0.5, 0.28125, 0.5], [0.71875, 0.5, 0.71875], [28, 28, 28]],
[3, [0.5, 0.3359375, 0.5], [0.671875, 0.5, 0.6640625], [44, 42, 42]],
[4, [0.5, 0.40625, 0.5], [0.6015625, 0.5, 0.59765625], [52, 48, 50]],
[2, [0.28125, 0.28125, 0.5], [0.5, 0.5, 0.71875], [28, 28, 28]],
[3, [0.3359375, 0.3359375, 0.5], [0.5, 0.5, 0.671875], [42, 42, 44]],
[
4,
[0.46484375, 0.37890625, 0.50390625],
[0.4765625, 0.390625, 0.515625],
[6, 6, 6],
],
[4, [0.40625, 0.40625, 0.5], [0.5, 0.5, 0.59765625], [48, 48, 50]],
[2, [0.5, 0.5, 0.28125], [0.71875, 0.71875, 0.5], [28, 28, 28]],
[3, [0.5, 0.5, 0.3359375], [0.6796875, 0.6953125, 0.5], [46, 50, 42]],
[4, [0.5, 0.5, 0.40234375], [0.59375, 0.6015625, 0.5], [48, 52, 50]],
[2, [0.265625, 0.5, 0.28125], [0.5, 0.71875, 0.5], [30, 28, 28]],
[3, [0.3359375, 0.5, 0.328125], [0.5, 0.65625, 0.5], [42, 40, 44]],
[4, [0.40234375, 0.5, 0.40625], [0.5, 0.60546875, 0.5], [50, 54, 48]],
[2, [0.5, 0.265625, 0.28125], [0.71875, 0.5, 0.5], [28, 30, 28]],
[3, [0.5, 0.3203125, 0.328125], [0.6640625, 0.5, 0.5], [42, 46, 44]],
[4, [0.5, 0.3984375, 0.40625], [0.546875, 0.5, 0.5], [24, 52, 48]],
[4, [0.546875, 0.41796875, 0.4453125], [0.5625, 0.4375, 0.5], [8, 10, 28]],
[4, [0.546875, 0.453125, 0.41796875], [0.5546875, 0.48046875, 0.4375], [4, 14, 10]],
[4, [0.546875, 0.4375, 0.4375], [0.609375, 0.5, 0.5], [32, 32, 32]],
[4, [0.546875, 0.4921875, 0.41796875], [0.56640625, 0.5, 0.4375], [10, 4, 10]],
[
4,
[0.546875, 0.48046875, 0.41796875],
[0.5703125, 0.4921875, 0.4375],
[12, 6, 10],
],
[4, [0.55859375, 0.46875, 0.43359375], [0.5703125, 0.48046875, 0.4375], [6, 6, 2]],
[2, [0.265625, 0.28125, 0.28125], [0.5, 0.5, 0.5], [30, 28, 28]],
[3, [0.328125, 0.3359375, 0.328125], [0.5, 0.5, 0.5], [44, 42, 44]],
[4, [0.4140625, 0.40625, 0.40625], [0.5, 0.5, 0.5], [44, 48, 48]],
]
def check_results(func):
r"""This is a decorator for a function to verify that the (numpy ndarray)
result of a function is what it should be.
This function is designed to be used for very light answer testing.
Essentially, it wraps around a larger function that returns a numpy array,
and that has results that should not change. It is not necessarily used
inside the testing scripts themselves, but inside testing scripts written
by developers during the testing of pull requests and new functionality.
If a hash is specified, it "wins" and the others are ignored. Otherwise,
tolerance is 1e-8 (just above single precision.)
The correct results will be stored if the command line contains
--answer-reference, and otherwise it will compare against the results on
disk. The filename will be func_results_ref_FUNCNAME.cpkl where FUNCNAME
is the name of the function being tested.
If you would like more control over the name of the pickle file the results
are stored in, you can pass the result_basename keyword argument to the
function you are testing. The check_results decorator will use the value
of the keyword to construct the filename of the results data file. If
result_basename is not specified, the name of the testing function is used.
This will raise an exception if the results are not correct.
Examples
--------
>>> @check_results
... def my_func(ds):
... return ds.domain_width
>>> my_func(ds)
>>> @check_results
... def field_checker(dd, field_name):
... return dd[field_name]
>>> field_checker(ds.all_data(), "density", result_basename="density")
"""
def compute_results(func):
@wraps(func)
def _func(*args, **kwargs):
name = kwargs.pop("result_basename", func.__name__)
rv = func(*args, **kwargs)
if hasattr(rv, "convert_to_base"):
rv.convert_to_base()
_rv = rv.ndarray_view()
else:
_rv = rv
mi = _rv.min()
ma = _rv.max()
st = _rv.std(dtype="float64")
su = _rv.sum(dtype="float64")
si = _rv.size
ha = hashlib.md5(_rv.tobytes()).hexdigest()
fn = f"func_results_ref_{name}.cpkl"
with open(fn, "wb") as f:
pickle.dump((mi, ma, st, su, si, ha), f)
return rv
return _func
import yt.startup_tasks as _startup_tasks
unparsed_args = _startup_tasks.unparsed_args
if "--answer-reference" in unparsed_args:
return compute_results(func)
def compare_results(func):
@wraps(func)
def _func(*args, **kwargs):
from numpy.testing import assert_allclose, assert_equal
name = kwargs.pop("result_basename", func.__name__)
rv = func(*args, **kwargs)
if hasattr(rv, "convert_to_base"):
rv.convert_to_base()
_rv = rv.ndarray_view()
else:
_rv = rv
vals = (
_rv.min(),
_rv.max(),
_rv.std(dtype="float64"),
_rv.sum(dtype="float64"),
_rv.size,
hashlib.md5(_rv.tobytes()).hexdigest(),
)
fn = f"func_results_ref_{name}.cpkl"
if not os.path.exists(fn):
print("Answers need to be created with --answer-reference .")
return False
with open(fn, "rb") as f:
ref = pickle.load(f)
print(f"Sizes: {vals[4] == ref[4]} ({vals[4]}, {ref[4]})")
assert_allclose(vals[0], ref[0], 1e-8, err_msg="min")
assert_allclose(vals[1], ref[1], 1e-8, err_msg="max")
assert_allclose(vals[2], ref[2], 1e-8, err_msg="std")
assert_allclose(vals[3], ref[3], 1e-8, err_msg="sum")
assert_equal(vals[4], ref[4])
print("Hashes equal: %s" % (vals[-1] == ref[-1]))
return rv
return _func
return compare_results(func)
def periodicity_cases(ds):
# This is a generator that yields things near the corners. It's good for
# getting different places to check periodicity.
yield (ds.domain_left_edge + ds.domain_right_edge) / 2.0
dx = ds.domain_width / ds.domain_dimensions
# We start one dx in, and only go to one in as well.
for i in (1, ds.domain_dimensions[0] - 2):
for j in (1, ds.domain_dimensions[1] - 2):
for k in (1, ds.domain_dimensions[2] - 2):
center = dx * np.array([i, j, k]) + ds.domain_left_edge
yield center
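# Illustrative sketch: one domain-center point plus the 2**3 near-corner
# positions, i.e. nine test locations in total.
#
#   >>> ds = fake_random_ds(16)
#   >>> len(list(periodicity_cases(ds)))
#   9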
def run_nose(
verbose=False,
run_answer_tests=False,
answer_big_data=False,
call_pdb=False,
module=None,
):
issue_deprecation_warning(
"yt.run_nose (aka yt.testing.run_nose) is deprecated. "
"Please do not rely on this function as it will be removed "
"in the process of migrating yt tests from nose to pytest.",
stacklevel=3,
since="4.1",
)
from yt.utilities.logger import ytLogger as mylog
from yt.utilities.on_demand_imports import _nose
orig_level = mylog.getEffectiveLevel()
mylog.setLevel(50)
nose_argv = sys.argv
nose_argv += ["--exclude=answer_testing", "--detailed-errors", "--exe"]
if call_pdb:
nose_argv += ["--pdb", "--pdb-failures"]
if verbose:
nose_argv.append("-v")
if run_answer_tests:
nose_argv.append("--with-answer-testing")
if answer_big_data:
nose_argv.append("--answer-big-data")
if module:
nose_argv.append(module)
initial_dir = os.getcwd()
yt_file = os.path.abspath(__file__)
yt_dir = os.path.dirname(yt_file)
if os.path.samefile(os.path.dirname(yt_dir), initial_dir):
# Provide a nice error message to work around nose bug
# see https://github.com/nose-devs/nose/issues/701
raise RuntimeError(
"""
The yt.run_nose function does not work correctly when invoked in
the same directory as the installed yt package. Try starting
a python session in a different directory before invoking yt.run_nose
again. Alternatively, you can also run the "nosetests" executable in
the current directory like so:
$ nosetests
"""
)
os.chdir(yt_dir)
try:
_nose.run(argv=nose_argv)
finally:
os.chdir(initial_dir)
mylog.setLevel(orig_level)
def assert_allclose_units(actual, desired, rtol=1e-7, atol=0, **kwargs):
"""Raise an error if two objects are not equal up to desired tolerance
This is a wrapper for :func:`numpy.testing.assert_allclose` that also
verifies unit consistency
Parameters
----------
actual : array-like
Array obtained (possibly with attached units)
desired : array-like
Array to compare with (possibly with attached units)
rtol : float, optional
Relative tolerance, defaults to 1e-7
atol : float or quantity, optional
Absolute tolerance. If units are attached, they must be consistent
with the units of ``actual`` and ``desired``. If no units are attached,
assumes the same units as ``desired``. Defaults to zero.
Notes
-----
Also accepts additional keyword arguments accepted by
:func:`numpy.testing.assert_allclose`, see the documentation of that
function for details.
"""
from numpy.testing import assert_allclose
# Create a copy to ensure this function does not alter input arrays
act = YTArray(actual)
des = YTArray(desired)
try:
des = des.in_units(act.units)
except UnitOperationError as e:
raise AssertionError(
f"Units of actual ({act.units}) and desired ({des.units}) "
"do not have equivalent dimensions"
) from e
rt = YTArray(rtol)
if not rt.units.is_dimensionless:
raise AssertionError(f"Units of rtol ({rt.units}) are not dimensionless")
if not isinstance(atol, YTArray):
at = YTQuantity(atol, des.units)
try:
at = at.in_units(act.units)
except UnitOperationError as e:
raise AssertionError(
f"Units of atol ({at.units}) and actual ({act.units}) "
"do not have equivalent dimensions"
) from e
# units have been validated, so we strip units before calling numpy
# to avoid spurious errors
act = act.value
des = des.value
rt = rt.value
at = at.value
return assert_allclose(act, des, rt, at, **kwargs)
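# A minimal usage sketch (illustrative): equivalent values expressed in
# compatible units compare equal; incompatible units raise AssertionError.
#
#   >>> assert_allclose_units(YTQuantity(1.0, "km"), YTQuantity(1000.0, "m"))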
def assert_fname(fname):
"""Function that checks file type using libmagic"""
if fname is None:
return
with open(fname, "rb") as fimg:
data = fimg.read()
image_type = ""
# see http://www.w3.org/TR/PNG/#5PNG-file-signature
if data.startswith(b"\211PNG\r\n\032\n"):
image_type = ".png"
# see http://www.mathguide.de/info/tools/media-types/image/jpeg
elif data.startswith(b"\377\330"):
image_type = ".jpeg"
elif data.startswith(b"%!PS-Adobe"):
data_str = data.decode("utf-8", "ignore")
if "EPSF" in data_str[: data_str.index("\n")]:
image_type = ".eps"
else:
image_type = ".ps"
elif data.startswith(b"%PDF"):
image_type = ".pdf"
extension = os.path.splitext(fname)[1]
assert image_type == extension, (
f"Expected an image of type {extension!r} but {fname!r} "
f"is an image of type {image_type!r}"
)
def requires_backend(backend):
"""Decorator to check for a specified matplotlib backend.
This decorator returns the decorated function if the specified `backend`
matches `matplotlib.get_backend()`; otherwise the test is skipped.
It can be used to execute a function only when a particular matplotlib
backend is in use.
Parameters
----------
backend : str
The value which is compared with the current matplotlib backend in use.
"""
return skipif(
backend.lower() != matplotlib.get_backend().lower(),
reason=f"'{backend}' backend not in use",
)
def requires_external_executable(*names):
missing = [name for name in names if which(name) is None]
return skipif(
len(missing) > 0, reason=f"missing external executable(s): {', '.join(missing)}"
)
class TempDirTest(unittest.TestCase):
"""
A test class that runs in a temporary directory and
removes it afterward.
"""
def setUp(self):
self.curdir = os.getcwd()
self.tmpdir = tempfile.mkdtemp()
os.chdir(self.tmpdir)
def tearDown(self):
os.chdir(self.curdir)
shutil.rmtree(self.tmpdir)
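# Illustrative usage sketch (hypothetical test case): files written by the
# test land in a throwaway directory that is removed on teardown.
#
#   >>> class TestOutputFiles(TempDirTest):
#   ...     def test_write(self):
#   ...         with open("scratch.txt", "w") as f:
#   ...             f.write("created inside the temporary directory")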
class ParticleSelectionComparison:
"""
This is a test helper class that takes a particle dataset, caches the
particles it has on disk (manually reading them using lower-level IO
routines), and then receives data objects whose selections it compares against
manually running each data object's selection routines. All supplied data objects
must be created from the input dataset.
"""
def __init__(self, ds):
self.ds = ds
# Construct an index so that we get all the data_files
ds.index
particles = {}
# hsml is the smoothing length we use for radial selection
hsml = {}
for data_file in ds.index.data_files:
for ptype, pos_arr in ds.index.io._yield_coordinates(data_file):
particles.setdefault(ptype, []).append(pos_arr)
if ptype in getattr(ds, "_sph_ptypes", ()):
hsml.setdefault(ptype, []).append(
ds.index.io._get_smoothing_length(
data_file, pos_arr.dtype, pos_arr.shape
)
)
for ptype in particles:
particles[ptype] = np.concatenate(particles[ptype])
if ptype in hsml:
hsml[ptype] = np.concatenate(hsml[ptype])
self.particles = particles
self.hsml = hsml
def compare_dobj_selection(self, dobj):
from numpy.testing import assert_array_almost_equal_nulp
for ptype in sorted(self.particles):
x, y, z = self.particles[ptype].T
# Set our radii to zero for now, I guess?
radii = self.hsml.get(ptype, 0.0)
sel_index = dobj.selector.select_points(x, y, z, radii)
if sel_index is None:
sel_pos = np.empty((0, 3))
else:
sel_pos = self.particles[ptype][sel_index, :]
obj_results = []
for chunk in dobj.chunks([], "io"):
obj_results.append(chunk[ptype, "particle_position"])
if any(_.size > 0 for _ in obj_results):
obj_results = np.concatenate(obj_results, axis=0)
else:
obj_results = np.empty((0, 3))
# Sometimes we get unitary scaling or other floating point noise. 5
# NULP should be OK. This is mostly for stuff like Rockstar, where
# the f32->f64 casting happens at different places depending on
# which code path we use.
assert_array_almost_equal_nulp(
np.asarray(sel_pos), np.asarray(obj_results), 5
)
def run_defaults(self):
"""
This runs lots of samples that touch different types of wraparounds.
Specifically, it does:
* sphere in center with radius 0.1 unitary
* sphere in center with radius 0.2 unitary
* sphere in each of the eight corners of the domain with radius 0.1 unitary
* sphere in center with radius 0.5 unitary
* box that covers 0.1 .. 0.9
* box from 0.8 .. 0.85
* box from 0.3..0.6, 0.2..0.8, 0.0..0.1
"""
sp1 = self.ds.sphere("c", (0.1, "unitary"))
self.compare_dobj_selection(sp1)
sp2 = self.ds.sphere("c", (0.2, "unitary"))
self.compare_dobj_selection(sp2)
centers = [
[0.04, 0.5, 0.5],
[0.5, 0.04, 0.5],
[0.5, 0.5, 0.04],
[0.04, 0.04, 0.04],
[0.96, 0.5, 0.5],
[0.5, 0.96, 0.5],
[0.5, 0.5, 0.96],
[0.96, 0.96, 0.96],
]
r = self.ds.quan(0.1, "unitary")
for center in centers:
c = self.ds.arr(center, "unitary") + self.ds.domain_left_edge.in_units(
"unitary"
)
if not all(self.ds.periodicity):
# filter out the periodic bits for non-periodic datasets
if any(c - r < self.ds.domain_left_edge) or any(
c + r > self.ds.domain_right_edge
):
continue
sp = self.ds.sphere(c, (0.1, "unitary"))
self.compare_dobj_selection(sp)
sp = self.ds.sphere("c", (0.5, "unitary"))
self.compare_dobj_selection(sp)
dd = self.ds.all_data()
self.compare_dobj_selection(dd)
# This is in raw numbers, so we can offset for the left edge
LE = self.ds.domain_left_edge.in_units("unitary").d
reg1 = self.ds.r[
(0.1 + LE[0], "unitary") : (0.9 + LE[0], "unitary"),
(0.1 + LE[1], "unitary") : (0.9 + LE[1], "unitary"),
(0.1 + LE[2], "unitary") : (0.9 + LE[2], "unitary"),
]
self.compare_dobj_selection(reg1)
reg2 = self.ds.r[
(0.8 + LE[0], "unitary") : (0.85 + LE[0], "unitary"),
(0.8 + LE[1], "unitary") : (0.85 + LE[1], "unitary"),
(0.8 + LE[2], "unitary") : (0.85 + LE[2], "unitary"),
]
self.compare_dobj_selection(reg2)
reg3 = self.ds.r[
(0.3 + LE[0], "unitary") : (0.6 + LE[0], "unitary"),
(0.2 + LE[1], "unitary") : (0.8 + LE[1], "unitary"),
(0.0 + LE[2], "unitary") : (0.1 + LE[2], "unitary"),
]
self.compare_dobj_selection(reg3)
def _deprecated_numpy_testing_reexport(func):
import numpy.testing as npt
npt_func = getattr(npt, func.__name__)
@wraps(npt_func)
def retf(*args, **kwargs):
__tracebackhide__ = True # Hide traceback for pytest
issue_deprecation_warning(
f"yt.testing.{func.__name__} is a pure re-export of "
f"numpy.testing.{func.__name__}, it will stop working in the future. "
"Please import this function directly from numpy instead.",
since="4.2",
stacklevel=3,
)
return npt_func(*args, **kwargs)
return retf
@_deprecated_numpy_testing_reexport
def assert_array_equal(): ...
@_deprecated_numpy_testing_reexport
def assert_almost_equal(): ...
@_deprecated_numpy_testing_reexport
def assert_equal(): ...
@_deprecated_numpy_testing_reexport
def assert_array_less(): ...
@_deprecated_numpy_testing_reexport
def assert_string_equal(): ...
@_deprecated_numpy_testing_reexport
def assert_array_almost_equal_nulp(): ...
@_deprecated_numpy_testing_reexport
def assert_allclose(): ...
@_deprecated_numpy_testing_reexport
def assert_raises(): ...
@_deprecated_numpy_testing_reexport
def assert_approx_equal(): ...
@_deprecated_numpy_testing_reexport
def assert_array_almost_equal(): ...