import numpy as np
from more_itertools import collapse
from yt.data_objects.field_data import YTFieldData
from yt.fields.derived_field import DerivedField
from yt.frontends.ytdata.utilities import save_as_dataset
from yt.funcs import get_output_filename, is_sequence, iter_fields, mylog
from yt.units.unit_object import Unit # type: ignore
from yt.units.yt_array import YTQuantity, array_like_field
from yt.utilities.exceptions import (
YTIllDefinedBounds,
YTIllDefinedProfile,
YTProfileDataShape,
)
from yt.utilities.lib.misc_utilities import (
new_bin_profile1d,
new_bin_profile2d,
new_bin_profile3d,
)
from yt.utilities.lib.particle_mesh_operations import CICDeposit_2, NGPDeposit_2
from yt.utilities.parallel_tools.parallel_analysis_interface import (
ParallelAnalysisInterface,
parallel_objects,
)
def _sanitize_min_max_units(amin, amax, finfo, registry):
# returns a copy of amin and amax, converted to finfo's output units
umin = getattr(amin, "units", None)
umax = getattr(amax, "units", None)
if umin is None:
umin = Unit(finfo.output_units, registry=registry)
rmin = YTQuantity(amin, umin)
else:
rmin = amin.in_units(finfo.output_units)
if umax is None:
umax = Unit(finfo.output_units, registry=registry)
rmax = YTQuantity(amax, umax)
else:
rmax = amax.in_units(finfo.output_units)
return rmin, rmax
[docs]
def preserve_source_parameters(func):
    """Decorator that runs *func* with the profile's data-source field
    parameters temporarily installed on the second argument.

    The wrapped callable is expected to be invoked as
    ``func(prof, source, ...)``.  While ``func`` runs, ``source``'s
    ``field_parameters`` are swapped for ``prof._data_source``'s; afterwards
    the originals are restored.

    BUG FIX: restoration now happens in a ``finally`` block, so the source's
    original field parameters are put back even when ``func`` raises.
    """

    def save_state(*args, **kwargs):
        # Temporarily replace the 'field_parameters' for a
        # grid with the 'field_parameters' for the data source
        prof = args[0]
        source = args[1]
        if hasattr(source, "field_parameters"):
            old_params = source.field_parameters
            source.field_parameters = prof._data_source.field_parameters
            try:
                tr = func(*args, **kwargs)
            finally:
                # Restore even on exception so the source is never left
                # carrying the profile's parameters.
                source.field_parameters = old_params
        else:
            tr = func(*args, **kwargs)
        return tr

    return save_state
[docs]
class ProfileFieldAccumulator:
    """Per-chunk scratch storage for binned profile data.

    Holds, for ``n_fields`` profiled fields over a bin grid of shape
    ``size`` (a tuple), the accumulated weighted sums, means, variance
    terms, per-bin total weights, and a mask of bins that received data.
    """

    def __init__(self, n_fields, size):
        per_field_shape = size + (n_fields,)
        # Weighted field sums, running means, and variance accumulators —
        # one trailing axis entry per profiled field.
        self.values = np.zeros(per_field_shape, dtype="float64")
        self.mvalues = np.zeros(per_field_shape, dtype="float64")
        self.qvalues = np.zeros(per_field_shape, dtype="float64")
        # Which bins have received any data at all.
        self.used = np.zeros(size, dtype="bool")
        # Total weight deposited in each bin.
        self.weight_values = np.zeros(size, dtype="float64")
[docs]
class ProfileND(ParallelAnalysisInterface):
    """The profile object class.

    Base class for 1D/2D/3D binned profiles.  Subclasses define the bin
    axes (``self.bin_fields``, ``self.size``, the ``*_bins`` arrays) and
    implement ``_bin_chunk``; this class drives chunked, parallel binning
    and the cross-processor reduction of the results.
    """

    def __init__(self, data_source, weight_field=None):
        self.data_source = data_source
        self.ds = data_source.ds
        # Maps bare field names (and tuple names) to the resolved
        # (ftype, fname) tuples once fields have been profiled.
        self.field_map = {}
        self.field_info = {}
        self.field_data = YTFieldData()
        if weight_field is not None:
            self.standard_deviation = YTFieldData()
            weight_field = self.data_source._determine_fields(weight_field)[0]
        else:
            # Without a weight field there is no meaningful standard deviation.
            self.standard_deviation = None
        self.weight_field = weight_field
        self.field_units = {}
        ParallelAnalysisInterface.__init__(self, comm=data_source.comm)

    def add_fields(self, fields):
        """Add fields to profile

        Parameters
        ----------
        fields : list of field names
            A list of fields to create profile histograms for
        """
        fields = self.data_source._determine_fields(fields)
        for f in fields:
            self.field_info[f] = self.data_source.ds.field_info[f]
        temp_storage = ProfileFieldAccumulator(len(fields), self.size)
        citer = self.data_source.chunks([], "io")
        # Bin each IO chunk; parallel_objects distributes chunks over
        # processors, and _finalize_storage reduces the partial results.
        for chunk in parallel_objects(citer):
            self._bin_chunk(chunk, fields, temp_storage)
        self._finalize_storage(fields, temp_storage)

    def set_field_unit(self, field, new_unit):
        """Sets a new unit for the requested field

        Parameters
        ----------
        field : string or field tuple
            The name of the field that is to be changed.
        new_unit : string or Unit object
            The name of the new unit.
        """
        if field in self.field_units:
            self.field_units[field] = Unit(new_unit, registry=self.ds.unit_registry)
        else:
            # Fall back to the mapped tuple name for bare string fields.
            fd = self.field_map[field]
            if fd in self.field_units:
                self.field_units[fd] = Unit(new_unit, registry=self.ds.unit_registry)
            else:
                raise KeyError(f"{field} not in profile!")

    def _finalize_storage(self, fields, temp_storage):
        # Reduce per-processor accumulators into the final profile arrays
        # and populate self.field_data / self.standard_deviation.
        # We use our main comm here
        # This also will fill _field_data
        for i, _field in enumerate(fields):
            # q values are returned as q * weight but we want just q
            temp_storage.qvalues[..., i][temp_storage.used] /= (
                temp_storage.weight_values[temp_storage.used]
            )
        # get the profile data from all procs
        all_store = {self.comm.rank: temp_storage}
        all_store = self.comm.par_combine_object(all_store, "join", datatype="dict")
        all_val = np.zeros_like(temp_storage.values)
        all_mean = np.zeros_like(temp_storage.mvalues)
        all_std = np.zeros_like(temp_storage.qvalues)
        all_weight = np.zeros_like(temp_storage.weight_values)
        all_used = np.zeros_like(temp_storage.used, dtype="bool")
        # Combine the weighted mean and standard deviation from each processor.
        # For two samples with total weight, mean, and standard deviation
        # given by w, m, and s, their combined mean and standard deviation are:
        # m12 = (m1 * w1 + m2 * w2) / (w1 + w2)
        # s12 = (m1 * (s1**2 + (m1 - m12)**2) +
        #        m2 * (s2**2 + (m2 - m12)**2)) / (w1 + w2)
        # Here, the mvalues are m and the qvalues are s**2.
        for p in sorted(all_store.keys()):
            all_used += all_store[p].used
            old_mean = all_mean.copy()
            old_weight = all_weight.copy()
            all_weight[all_store[p].used] += all_store[p].weight_values[
                all_store[p].used
            ]
            for i, _field in enumerate(fields):
                all_val[..., i][all_store[p].used] += all_store[p].values[..., i][
                    all_store[p].used
                ]
                all_mean[..., i][all_store[p].used] = (
                    all_mean[..., i] * old_weight
                    + all_store[p].mvalues[..., i] * all_store[p].weight_values
                )[all_store[p].used] / all_weight[all_store[p].used]
                all_std[..., i][all_store[p].used] = (
                    old_weight
                    * (all_std[..., i] + (old_mean[..., i] - all_mean[..., i]) ** 2)
                    + all_store[p].weight_values
                    * (
                        all_store[p].qvalues[..., i]
                        + (all_store[p].mvalues[..., i] - all_mean[..., i]) ** 2
                    )
                )[all_store[p].used] / all_weight[all_store[p].used]
        # all_std held variances up to this point; take the square root once.
        all_std = np.sqrt(all_std)
        del all_store
        self.used = all_used
        blank = ~all_used
        self.weight = all_weight
        self.weight[blank] = 0.0
        for i, field in enumerate(fields):
            if self.weight_field is None:
                # Unweighted profile: store the plain sums.
                self.field_data[field] = array_like_field(
                    self.data_source, all_val[..., i], field
                )
            else:
                # Weighted profile: store means and their standard deviations.
                self.field_data[field] = array_like_field(
                    self.data_source, all_mean[..., i], field
                )
                self.standard_deviation[field] = array_like_field(
                    self.data_source, all_std[..., i], field
                )
                self.standard_deviation[field][blank] = 0.0
                self.weight = array_like_field(
                    self.data_source, self.weight, self.weight_field
                )
            self.field_data[field][blank] = 0.0
            self.field_units[field] = self.field_data[field].units
            if isinstance(field, tuple):
                self.field_map[field[1]] = field
            else:
                self.field_map[field] = field

    def _bin_chunk(self, chunk, fields, storage):
        # Subclass responsibility: deposit one chunk into `storage`.
        raise NotImplementedError

    def _filter(self, bin_fields):
        # cut_points is set to be everything initially, but
        # we also want to apply a filtering based on min/max
        # (strictly inside the outer bin edges, on every bin axis).
        pfilter = np.ones(bin_fields[0].shape, dtype="bool")
        for (mi, ma), data in zip(self.bounds, bin_fields, strict=True):
            pfilter &= data > mi
            pfilter &= data < ma
        return pfilter, [data[pfilter] for data in bin_fields]

    def _get_data(self, chunk, fields):
        # Return (field_values, weights, bin_field_values) for one chunk,
        # restricted to elements inside the bin bounds; None if nothing
        # survives the filter.
        # We are using chunks now, which will manage the field parameters and
        # the like.
        bin_fields = [chunk[bf] for bf in self.bin_fields]
        for i in range(1, len(bin_fields)):
            if bin_fields[0].shape != bin_fields[i].shape:
                raise YTProfileDataShape(
                    self.bin_fields[0],
                    bin_fields[0].shape,
                    self.bin_fields[i],
                    bin_fields[i].shape,
                )
        # We want to make sure that our fields are within the bounds of the
        # binning
        pfilter, bin_fields = self._filter(bin_fields)
        if not np.any(pfilter):
            return None
        arr = np.zeros((bin_fields[0].size, len(fields)), dtype="float64")
        for i, field in enumerate(fields):
            if pfilter.shape != chunk[field].shape:
                raise YTProfileDataShape(
                    self.bin_fields[0], bin_fields[0].shape, field, chunk[field].shape
                )
            units = chunk.ds.field_info[field].output_units
            arr[:, i] = chunk[field][pfilter].in_units(units)
        if self.weight_field is not None:
            if pfilter.shape != chunk[self.weight_field].shape:
                raise YTProfileDataShape(
                    self.bin_fields[0],
                    bin_fields[0].shape,
                    self.weight_field,
                    chunk[self.weight_field].shape,
                )
            units = chunk.ds.field_info[self.weight_field].output_units
            weight_data = chunk[self.weight_field].in_units(units)
        else:
            # Unweighted: every element counts equally.
            weight_data = np.ones(pfilter.shape, dtype="float64")
        weight_data = weight_data[pfilter]
        # So that we can pass these into
        return arr, weight_data, bin_fields

    def __getitem__(self, field):
        if field in self.field_data:
            fname = field
        else:
            # deal with string vs tuple field names and attempt to guess which field
            # we are supposed to be talking about
            fname = self.field_map.get(field, None)
            if isinstance(field, tuple):
                fname = self.field_map.get(field[1], None)
                if fname != field:
                    raise KeyError(
                        f"Asked for field '{field}' but only have data for "
                        f"fields '{list(self.field_data.keys())}'"
                    )
            elif isinstance(field, DerivedField):
                fname = self.field_map.get(field.name[1], None)
        if fname is None:
            raise KeyError(field)
        if getattr(self, "fractional", False):
            # Fractional profiles are dimensionless; return as stored.
            return self.field_data[fname]
        else:
            return self.field_data[fname].in_units(self.field_units[fname])

    def items(self):
        return [(k, self[k]) for k in self.field_data.keys()]

    def keys(self):
        return self.field_data.keys()

    def __iter__(self):
        # NOTE(review): this returns a sorted *list*, not an iterator —
        # iter(profile) would raise TypeError.  Likely intended:
        # `return iter(sorted(self.items()))`.  Left as-is; confirm before
        # changing, in case callers rely on the current (broken) behavior.
        return sorted(self.items())

    def _get_bins(self, mi, ma, n, take_log):
        # Build n+1 bin edges from mi to ma, log- or linearly spaced.
        if take_log:
            ret = np.logspace(np.log10(mi), np.log10(ma), n + 1)
            # at this point ret[0] and ret[-1] are not exactly equal to
            # mi and ma due to round-off error. Let's force them to be
            # mi and ma exactly to avoid incorrectly discarding cells near
            # the edges. See Issue #1300.
            ret[0], ret[-1] = mi, ma
            return ret
        else:
            return np.linspace(mi, ma, n + 1)

    def save_as_dataset(self, filename=None):
        r"""Export a profile to a reloadable yt dataset.

        This function will take a profile and output a dataset
        containing all relevant fields.  The resulting dataset
        can be reloaded as a yt dataset.

        Parameters
        ----------
        filename : str, optional
            The name of the file to be written.  If None, the name
            will be a combination of the original dataset plus
            the type of object, e.g., Profile1D.

        Returns
        -------
        filename : str
            The name of the file that has been created.

        Examples
        --------
        >>> import yt
        >>> ds = yt.load("enzo_tiny_cosmology/DD0046/DD0046")
        >>> ad = ds.all_data()
        >>> profile = yt.create_profile(
        ...     ad,
        ...     [("gas", "density"), ("gas", "temperature")],
        ...     ("gas", "mass"),
        ...     weight_field=None,
        ...     n_bins=(128, 128),
        ... )
        >>> fn = profile.save_as_dataset()
        >>> prof_ds = yt.load(fn)
        >>> print(prof_ds.data["gas", "mass"])
        (128, 128)
        >>> print(prof_ds.data["index", "x"].shape)  # x bins as 1D array
        (128,)
        >>> print(prof_ds.data["gas", "density"])  # x bins as 2D array
        (128, 128)
        >>> p = yt.PhasePlot(
        ...     prof_ds.data,
        ...     ("gas", "density"),
        ...     ("gas", "temperature"),
        ...     ("gas", "mass"),
        ...     weight_field=None,
        ... )
        >>> p.save()
        """
        keyword = f"{str(self.ds)}_{self.__class__.__name__}"
        filename = get_output_filename(filename, keyword, ".h5")
        args = ("field", "log")
        extra_attrs = {
            "data_type": "yt_profile",
            "profile_dimensions": self.size,
            "weight_field": self.weight_field,
            "fractional": self.fractional,
            "accumulation": self.accumulation,
        }
        data = {}
        data.update(self.field_data)
        data["weight"] = self.weight
        data["used"] = self.used.astype("float64")
        std = "standard_deviation"
        if self.weight_field is not None:
            std_data = getattr(self, std)
            data.update({(std, field[1]): std_data[field] for field in self.field_data})
        # Collect the bin-axis data; dimensionality is inferred from which
        # of the x/y/z attributes this (sub)class defines.
        dimensionality = 0
        bin_data = []
        for ax in "xyz":
            if hasattr(self, ax):
                dimensionality += 1
                data[ax] = getattr(self, ax)
                bin_data.append(data[ax])
                bin_field_name = f"{ax}_bins"
                data[bin_field_name] = getattr(self, bin_field_name)
                extra_attrs[f"{ax}_range"] = self.ds.arr(
                    [data[bin_field_name][0], data[bin_field_name][-1]]
                )
                for arg in args:
                    key = f"{ax}_{arg}"
                    extra_attrs[key] = getattr(self, key)
        # Also save the bin fields themselves on the full N-D bin grid.
        bin_fields = np.meshgrid(*bin_data)
        for i, ax in enumerate("xyz"[:dimensionality]):
            data[getattr(self, f"{ax}_field")] = bin_fields[i]
        extra_attrs["dimensionality"] = dimensionality
        ftypes = {field: "data" for field in data if field[0] != std}
        if self.weight_field is not None:
            ftypes.update({(std, field[1]): std for field in self.field_data})
        save_as_dataset(
            self.ds, filename, data, field_types=ftypes, extra_attrs=extra_attrs
        )
        return filename
[docs]
class ProfileNDFromDataset(ProfileND):
    """
    An ND profile object loaded from a ytdata dataset.

    Reconstructs the attributes a freshly-built profile would carry
    (bin arrays, field data, units, standard deviations) from a dataset
    previously written by ``ProfileND.save_as_dataset``.
    """

    def __init__(self, ds):
        ProfileND.__init__(self, ds.data, ds.parameters.get("weight_field", None))
        self.fractional = ds.parameters.get("fractional", False)
        self.accumulation = ds.parameters.get("accumulation", False)
        # Bookkeeping fields that must not be re-imported as profiled data.
        skip = ["used", "weight"]
        for axis in "xyz"[: ds.dimensionality]:
            bins_attr = f"{axis}_bins"
            field_attr = f"{axis}_field"
            log_attr = f"{axis}_log"
            setattr(self, axis, ds.data["data", axis])
            setattr(self, bins_attr, ds.data["data", bins_attr])
            bin_field = tuple(ds.parameters.get(field_attr, (None, None)))
            setattr(self, field_attr, bin_field)
            self.field_info[bin_field] = ds.field_info[bin_field]
            setattr(self, log_attr, ds.parameters.get(log_attr, False))
            skip += [axis, bins_attr, bin_field[1]]
        self.weight = ds.data["data", "weight"]
        self.used = ds.data["data", "used"].d.astype(bool)
        for field in ds.field_list:
            # Standard deviations are handled alongside their parent field.
            if field[0] == "standard_deviation" or field[1] in skip:
                continue
            self.field_map[field[1]] = field
            self.field_data[field] = ds.data[field]
            self.field_info[field] = ds.field_info[field]
            self.field_units[field] = ds.data[field].units
            std_key = ("standard_deviation", field[1])
            if std_key in ds.field_list:
                self.standard_deviation[field] = ds.data[std_key]
[docs]
class Profile1D(ProfileND):
    """An object that represents a 1D profile.

    Parameters
    ----------
    data_source : AMD3DData object
        The data object to be profiled
    x_field : string field name
        The field to profile as a function of
    x_n : integer
        The number of bins along the x direction.
    x_min : float
        The minimum value of the x profile field. If supplied without units,
        assumed to be in the output units for x_field.
    x_max : float
        The maximum value of the x profile field. If supplied without units,
        assumed to be in the output units for x_field.
    x_log : boolean
        Controls whether or not the bins for the x field are evenly
        spaced in linear (False) or log (True) space.
    weight_field : string field name
        The field to weight the profiled fields by.
    override_bins_x : array
        Array to set as xbins and ignore other parameters if set
    """

    def __init__(
        self,
        data_source,
        x_field,
        x_n,
        x_min,
        x_max,
        x_log,
        weight_field=None,
        override_bins_x=None,
    ):
        super().__init__(data_source, weight_field)
        self.x_field = data_source._determine_fields(x_field)[0]
        self.field_info[self.x_field] = self.data_source.ds.field_info[self.x_field]
        self.x_log = x_log
        # Interpret unitless bounds as being in the bin field's output units.
        x_min, x_max = _sanitize_min_max_units(
            x_min, x_max, self.field_info[self.x_field], self.ds.unit_registry
        )
        self.x_bins = array_like_field(
            data_source, self._get_bins(x_min, x_max, x_n, x_log), self.x_field
        )
        if override_bins_x is not None:
            # Explicit bin edges take precedence over min/max/n/log.
            self.x_bins = array_like_field(data_source, override_bins_x, self.x_field)
        self.size = (self.x_bins.size - 1,)
        self.bin_fields = (self.x_field,)
        # Bin centers.
        self.x = 0.5 * (self.x_bins[1:] + self.x_bins[:-1])

    def _bin_chunk(self, chunk, fields, storage):
        # Histogram one chunk of data into `storage` along the x bins.
        rv = self._get_data(chunk, fields)
        if rv is None:
            return
        fdata, wdata, (bf_x,) = rv
        bf_x.convert_to_units(self.field_info[self.x_field].output_units)
        bin_ind = np.digitize(bf_x, self.x_bins) - 1
        new_bin_profile1d(
            bin_ind,
            wdata,
            fdata,
            storage.weight_values,
            storage.values,
            storage.mvalues,
            storage.qvalues,
            storage.used,
        )
        # We've binned it!

    def set_x_unit(self, new_unit):
        """Sets a new unit for the x field

        Parameters
        ----------
        new_unit : string or Unit object
            The name of the new unit.
        """
        self.x_bins.convert_to_units(new_unit)
        self.x = 0.5 * (self.x_bins[1:] + self.x_bins[:-1])

    @property
    def bounds(self):
        # ((x_min, x_max),) from the outermost bin edges.
        return ((self.x_bins[0], self.x_bins[-1]),)

    def plot(self):
        r"""
        This returns a :class:`~yt.visualization.profile_plotter.ProfilePlot`
        with the fields that have been added to this object.
        """
        from yt.visualization.profile_plotter import ProfilePlot

        return ProfilePlot.from_profiles(self)

    def _export_prep(self, fields, only_used):
        # Shared setup for to_dataframe/to_astropy_table: choose which bins
        # to export, whether masking is needed, and resolve field names.
        if only_used:
            idxs = self.used
        else:
            idxs = slice(None, None, None)
        if not only_used and not np.all(self.used):
            # Exporting everything while some bins are empty: mask those.
            masked = True
        else:
            masked = False
        if fields is None:
            fields = self.field_data.keys()
        else:
            fields = self.data_source._determine_fields(fields)
        return idxs, masked, fields

    def to_dataframe(self, fields=None, only_used=False, include_std=False):
        r"""Export a profile object to a pandas DataFrame.

        This function will take a data object and construct from it and
        optionally a list of fields a pandas DataFrame object.  If pandas is
        not importable, this will raise ImportError.

        Parameters
        ----------
        fields : list of strings or tuple field names, default None
            If this is supplied, it is the list of fields to be exported into
            the DataFrame. If not supplied, whatever fields exist in the
            profile, along with the bin field, will be exported.
        only_used : boolean, default False
            If True, only the bins which have data will be exported. If False,
            all the bins will be exported, but the elements for those bins
            in the data arrays will be filled with NaNs.
        include_std : boolean, optional
            If True, include the standard deviation of the profile
            in the pandas DataFrame. It will appear in the table as the
            field name with "_stddev" appended, e.g. "velocity_x_stddev".
            Default: False

        Returns
        -------
        df : :class:`~pandas.DataFrame`
            The data contained in the profile.

        Examples
        --------
        >>> sp = ds.sphere("c", (0.1, "unitary"))
        >>> p = sp.profile(
        ...     ("index", "radius"), [("gas", "density"), ("gas", "temperature")]
        ... )
        >>> df1 = p.to_dataframe()
        >>> df2 = p.to_dataframe(fields=("gas", "density"), only_used=True)
        """
        from yt.utilities.on_demand_imports import _pandas as pd

        idxs, masked, fields = self._export_prep(fields, only_used)
        pdata = {self.x_field[-1]: self.x[idxs]}
        for field in fields:
            pdata[field[-1]] = self[field][idxs]
            if include_std:
                pdata[f"{field[-1]}_stddev"] = self.standard_deviation[field][idxs]
        df = pd.DataFrame(pdata)
        if masked:
            # Mask data columns (not the x column) for unused bins.
            mask = np.zeros(df.shape, dtype="bool")
            mask[~self.used, 1:] = True
            df.mask(mask, inplace=True)
        return df

    def to_astropy_table(self, fields=None, only_used=False, include_std=False):
        """
        Export the profile data to a :class:`~astropy.table.table.QTable`,
        which is a Table object which is unit-aware.  The QTable can then
        be exported to an ASCII file, FITS file, etc.

        See the AstroPy Table docs for more details:
        http://docs.astropy.org/en/stable/table/

        Parameters
        ----------
        fields : list of strings or tuple field names, default None
            If this is supplied, it is the list of fields to be exported into
            the DataFrame. If not supplied, whatever fields exist in the
            profile, along with the bin field, will be exported.
        only_used : boolean, optional
            If True, only the bins which are used are copied
            to the QTable as rows. If False, all bins are
            copied, but the bins which are not used are masked.
            Default: False
        include_std : boolean, optional
            If True, include the standard deviation of the profile
            in the AstroPy QTable. It will appear in the table as the
            field name with "_stddev" appended, e.g. "velocity_x_stddev".
            Default: False

        Returns
        -------
        qt : :class:`~astropy.table.QTable`
            The data contained in the profile.

        Examples
        --------
        >>> sp = ds.sphere("c", (0.1, "unitary"))
        >>> p = sp.profile(
        ...     ("index", "radius"), [("gas", "density"), ("gas", "temperature")]
        ... )
        >>> qt1 = p.to_astropy_table()
        >>> qt2 = p.to_astropy_table(fields=("gas", "density"), only_used=True)
        """
        from astropy.table import QTable

        idxs, masked, fields = self._export_prep(fields, only_used)
        qt = QTable(masked=masked)
        qt[self.x_field[-1]] = self.x[idxs].to_astropy()
        if masked:
            # NOTE(review): masking with `self.used` marks the *used* bins
            # as masked, which is the opposite of to_dataframe's
            # `~self.used` convention — looks inverted; confirm intent.
            qt[self.x_field[-1]].mask = self.used
        for field in fields:
            qt[field[-1]] = self[field][idxs].to_astropy()
            if masked:
                qt[field[-1]].mask = self.used
            if include_std:
                qt[f"{field[-1]}_stddev"] = self.standard_deviation[field][
                    idxs
                ].to_astropy()
                if masked:
                    qt[f"{field[-1]}_stddev"].mask = self.used
        return qt
[docs]
class Profile1DFromDataset(ProfileNDFromDataset, Profile1D):
    """
    A 1D profile object loaded from a ytdata dataset.
    """

    # BUG FIX: this method was named `__init` (missing trailing
    # underscores), so it was never called as the constructor and
    # instantiation only worked by falling through the MRO to
    # ProfileNDFromDataset.__init__.  Naming it `__init__` makes the
    # intended constructor explicit; runtime behavior is unchanged.
    def __init__(self, ds):
        ProfileNDFromDataset.__init__(self, ds)
[docs]
class Profile2D(ProfileND):
    """An object that represents a 2D profile.

    Parameters
    ----------
    data_source : AMD3DData object
        The data object to be profiled
    x_field : string field name
        The field to profile as a function of along the x axis.
    x_n : integer
        The number of bins along the x direction.
    x_min : float
        The minimum value of the x profile field. If supplied without units,
        assumed to be in the output units for x_field.
    x_max : float
        The maximum value of the x profile field. If supplied without units,
        assumed to be in the output units for x_field.
    x_log : boolean
        Controls whether or not the bins for the x field are evenly
        spaced in linear (False) or log (True) space.
    y_field : string field name
        The field to profile as a function of along the y axis
    y_n : integer
        The number of bins along the y direction.
    y_min : float
        The minimum value of the y profile field. If supplied without units,
        assumed to be in the output units for y_field.
    y_max : float
        The maximum value of the y profile field. If supplied without units,
        assumed to be in the output units for y_field.
    y_log : boolean
        Controls whether or not the bins for the y field are evenly
        spaced in linear (False) or log (True) space.
    weight_field : string field name
        The field to weight the profiled fields by.
    override_bins_x : array
        Array to set as xbins and ignore other parameters if set
    override_bins_y : array
        Array to set as ybins and ignore other parameters if set
    """

    def __init__(
        self,
        data_source,
        x_field,
        x_n,
        x_min,
        x_max,
        x_log,
        y_field,
        y_n,
        y_min,
        y_max,
        y_log,
        weight_field=None,
        override_bins_x=None,
        override_bins_y=None,
    ):
        super().__init__(data_source, weight_field)
        # X
        self.x_field = data_source._determine_fields(x_field)[0]
        self.x_log = x_log
        self.field_info[self.x_field] = self.data_source.ds.field_info[self.x_field]
        x_min, x_max = _sanitize_min_max_units(
            x_min, x_max, self.field_info[self.x_field], self.ds.unit_registry
        )
        self.x_bins = array_like_field(
            data_source, self._get_bins(x_min, x_max, x_n, x_log), self.x_field
        )
        if override_bins_x is not None:
            # Explicit bin edges take precedence over min/max/n/log.
            self.x_bins = array_like_field(data_source, override_bins_x, self.x_field)
        # Y
        self.y_field = data_source._determine_fields(y_field)[0]
        self.y_log = y_log
        self.field_info[self.y_field] = self.data_source.ds.field_info[self.y_field]
        y_min, y_max = _sanitize_min_max_units(
            y_min, y_max, self.field_info[self.y_field], self.ds.unit_registry
        )
        self.y_bins = array_like_field(
            data_source, self._get_bins(y_min, y_max, y_n, y_log), self.y_field
        )
        if override_bins_y is not None:
            self.y_bins = array_like_field(data_source, override_bins_y, self.y_field)
        self.size = (self.x_bins.size - 1, self.y_bins.size - 1)
        self.bin_fields = (self.x_field, self.y_field)
        # Bin centers along each axis.
        self.x = 0.5 * (self.x_bins[1:] + self.x_bins[:-1])
        self.y = 0.5 * (self.y_bins[1:] + self.y_bins[:-1])

    def _bin_chunk(self, chunk, fields, storage):
        # Histogram one chunk of data into `storage` on the (x, y) bin grid.
        rv = self._get_data(chunk, fields)
        if rv is None:
            return
        fdata, wdata, (bf_x, bf_y) = rv
        bf_x.convert_to_units(self.field_info[self.x_field].output_units)
        bin_ind_x = np.digitize(bf_x, self.x_bins) - 1
        bf_y.convert_to_units(self.field_info[self.y_field].output_units)
        bin_ind_y = np.digitize(bf_y, self.y_bins) - 1
        new_bin_profile2d(
            bin_ind_x,
            bin_ind_y,
            wdata,
            fdata,
            storage.weight_values,
            storage.values,
            storage.mvalues,
            storage.qvalues,
            storage.used,
        )
        # We've binned it!

    def set_x_unit(self, new_unit):
        """Sets a new unit for the x field

        Parameters
        ----------
        new_unit : string or Unit object
            The name of the new unit.
        """
        self.x_bins.convert_to_units(new_unit)
        self.x = 0.5 * (self.x_bins[1:] + self.x_bins[:-1])

    def set_y_unit(self, new_unit):
        """Sets a new unit for the y field

        Parameters
        ----------
        new_unit : string or Unit object
            The name of the new unit.
        """
        self.y_bins.convert_to_units(new_unit)
        self.y = 0.5 * (self.y_bins[1:] + self.y_bins[:-1])

    @property
    def bounds(self):
        # ((x_min, x_max), (y_min, y_max)) from the outermost bin edges.
        return ((self.x_bins[0], self.x_bins[-1]), (self.y_bins[0], self.y_bins[-1]))

    def plot(self):
        r"""
        This returns a :class:~yt.visualization.profile_plotter.PhasePlot with
        the fields that have been added to this object.
        """
        from yt.visualization.profile_plotter import PhasePlot

        return PhasePlot.from_profile(self)
[docs]
class Profile2DFromDataset(ProfileNDFromDataset, Profile2D):
    """
    A 2D profile object loaded from a ytdata dataset.
    """

    # BUG FIX: this method was named `__init` (missing trailing
    # underscores), so it was never called as the constructor and
    # instantiation only worked by falling through the MRO to
    # ProfileNDFromDataset.__init__.  Naming it `__init__` makes the
    # intended constructor explicit; runtime behavior is unchanged.
    def __init__(self, ds):
        ProfileNDFromDataset.__init__(self, ds)
[docs]
class ParticleProfile(Profile2D):
    """An object that represents a *deposited* 2D profile. This is like a
    Profile2D, except that it is intended for particle data.  Instead of just
    binning the particles, the added fields will be deposited onto the mesh
    using either the nearest-grid-point or cloud-in-cell interpolation kernels.

    Parameters
    ----------
    data_source : AMD3DData object
        The data object to be profiled
    x_field : string field name
        The field to profile as a function of along the x axis.
    x_n : integer
        The number of bins along the x direction.
    x_min : float
        The minimum value of the x profile field. If supplied without units,
        assumed to be in the output units for x_field.
    x_max : float
        The maximum value of the x profile field. If supplied without units,
        assumed to be in the output units for x_field.
    y_field : string field name
        The field to profile as a function of along the y axis
    y_n : integer
        The number of bins along the y direction.
    y_min : float
        The minimum value of the y profile field. If supplied without units,
        assumed to be in the output units for y_field.
    y_max : float
        The maximum value of the y profile field. If supplied without units,
        assumed to be in the output units for y_field.
    weight_field : string field name
        The field to use for weighting. Default is None.
    deposition : string, optional
        The interpolation kernel to be used for
        deposition. Valid choices:
        "ngp" : nearest grid point interpolation
        "cic" : cloud-in-cell interpolation
    """

    # Deposited profiles do not support accumulation or fractional output.
    accumulation = False
    fractional = False

    def __init__(
        self,
        data_source,
        x_field,
        x_n,
        x_min,
        x_max,
        x_log,
        y_field,
        y_n,
        y_min,
        y_max,
        y_log,
        weight_field=None,
        deposition="ngp",
    ):
        x_field = data_source._determine_fields(x_field)[0]
        y_field = data_source._determine_fields(y_field)[0]
        if deposition not in ["ngp", "cic"]:
            raise NotImplementedError(deposition)
        elif (x_log or y_log) and deposition != "ngp":
            # CIC assumes linearly spaced bins; degrade gracefully.
            mylog.warning(
                "cic deposition is only supported for linear axis "
                "scales, falling back to ngp deposition"
            )
            deposition = "ngp"
        self.deposition = deposition
        # Delegate all bin construction to Profile2D; x_log/y_log and
        # weight_field are passed through unchanged.
        super().__init__(
            data_source,
            x_field,
            x_n,
            x_min,
            x_max,
            x_log,
            y_field,
            y_n,
            y_min,
            y_max,
            y_log,
            weight_field=weight_field,
        )

    # Either stick the particle field in the nearest bin,
    # or spread it out using the 2D CIC deposition function
    def _bin_chunk(self, chunk, fields, storage):
        rv = self._get_data(chunk, fields)
        if rv is None:
            return
        fdata, wdata, (bf_x, bf_y) = rv
        # make sure everything has the same units before deposition.
        # the units will be scaled to the correct values later.
        if self.deposition == "ngp":
            func = NGPDeposit_2
        elif self.deposition == "cic":
            func = CICDeposit_2
        for fi, _field in enumerate(fields):
            if self.weight_field is None:
                deposit_vals = fdata[:, fi]
            else:
                # Deposit weighted values; the weight sum is deposited
                # separately below so means can be recovered.
                deposit_vals = wdata * fdata[:, fi]
            field_mask = np.zeros(self.size, dtype="uint8")
            func(
                bf_x,
                bf_y,
                deposit_vals,
                fdata[:, fi].size,
                storage.values[:, :, fi],
                field_mask,
                self.x_bins,
                self.y_bins,
            )
            locs = field_mask > 0
            storage.used[locs] = True
            if self.weight_field is not None:
                func(
                    bf_x,
                    bf_y,
                    wdata,
                    fdata[:, fi].size,
                    storage.weight_values,
                    field_mask,
                    self.x_bins,
                    self.y_bins,
                )
            else:
                storage.weight_values[locs] = 1.0
            # Per-bin mean = deposited (weighted) values / deposited weights.
            storage.mvalues[locs, fi] = (
                storage.values[locs, fi] / storage.weight_values[locs]
            )
        # We've binned it!
[docs]
class Profile3D(ProfileND):
"""An object that represents a 2D profile.
Parameters
----------
data_source : AMD3DData object
The data object to be profiled
x_field : string field name
The field to profile as a function of along the x axis.
x_n : integer
The number of bins along the x direction.
x_min : float
The minimum value of the x profile field. If supplied without units,
assumed to be in the output units for x_field.
x_max : float
The maximum value of the x profile field. If supplied without units,
assumed to be in the output units for x_field.
x_log : boolean
Controls whether or not the bins for the x field are evenly
spaced in linear (False) or log (True) space.
y_field : string field name
The field to profile as a function of along the y axis
y_n : integer
The number of bins along the y direction.
y_min : float
The minimum value of the y profile field. If supplied without units,
assumed to be in the output units for y_field.
y_max : float
The maximum value of the y profile field. If supplied without units,
assumed to be in the output units for y_field.
y_log : boolean
Controls whether or not the bins for the y field are evenly
spaced in linear (False) or log (True) space.
z_field : string field name
The field to profile as a function of along the z axis
z_n : integer
The number of bins along the z direction.
z_min : float
The minimum value of the z profile field. If supplied without units,
assumed to be in the output units for z_field.
z_max : float
The maximum value of thee z profile field. If supplied without units,
assumed to be in the output units for z_field.
z_log : boolean
Controls whether or not the bins for the z field are evenly
spaced in linear (False) or log (True) space.
weight_field : string field name
The field to weight the profiled fields by.
override_bins_x : array
Array to set as xbins and ignore other parameters if set
override_bins_y : array
Array to set as xbins and ignore other parameters if set
override_bins_z : array
Array to set as xbins and ignore other parameters if set
"""
def __init__(
self,
data_source,
x_field,
x_n,
x_min,
x_max,
x_log,
y_field,
y_n,
y_min,
y_max,
y_log,
z_field,
z_n,
z_min,
z_max,
z_log,
weight_field=None,
override_bins_x=None,
override_bins_y=None,
override_bins_z=None,
):
super().__init__(data_source, weight_field)
# X
self.x_field = data_source._determine_fields(x_field)[0]
self.x_log = x_log
self.field_info[self.x_field] = self.data_source.ds.field_info[self.x_field]
x_min, x_max = _sanitize_min_max_units(
x_min, x_max, self.field_info[self.x_field], self.ds.unit_registry
)
self.x_bins = array_like_field(
data_source, self._get_bins(x_min, x_max, x_n, x_log), self.x_field
)
if override_bins_x is not None:
self.x_bins = array_like_field(data_source, override_bins_x, self.x_field)
# Y
self.y_field = data_source._determine_fields(y_field)[0]
self.y_log = y_log
self.field_info[self.y_field] = self.data_source.ds.field_info[self.y_field]
y_min, y_max = _sanitize_min_max_units(
y_min, y_max, self.field_info[self.y_field], self.ds.unit_registry
)
self.y_bins = array_like_field(
data_source, self._get_bins(y_min, y_max, y_n, y_log), self.y_field
)
if override_bins_y is not None:
self.y_bins = array_like_field(data_source, override_bins_y, self.y_field)
# Z
self.z_field = data_source._determine_fields(z_field)[0]
self.z_log = z_log
self.field_info[self.z_field] = self.data_source.ds.field_info[self.z_field]
z_min, z_max = _sanitize_min_max_units(
z_min, z_max, self.field_info[self.z_field], self.ds.unit_registry
)
self.z_bins = array_like_field(
data_source, self._get_bins(z_min, z_max, z_n, z_log), self.z_field
)
if override_bins_z is not None:
self.z_bins = array_like_field(data_source, override_bins_z, self.z_field)
self.size = (self.x_bins.size - 1, self.y_bins.size - 1, self.z_bins.size - 1)
self.bin_fields = (self.x_field, self.y_field, self.z_field)
self.x = 0.5 * (self.x_bins[1:] + self.x_bins[:-1])
self.y = 0.5 * (self.y_bins[1:] + self.y_bins[:-1])
self.z = 0.5 * (self.z_bins[1:] + self.z_bins[:-1])
def _bin_chunk(self, chunk, fields, storage):
    """Digitize one chunk onto the 3D bin grid and accumulate its field
    values into the per-process ``storage`` accumulator."""
    rv = self._get_data(chunk, fields)
    if rv is None:
        # Nothing from this chunk intersects the selection; skip it.
        return
    fdata, wdata, (bf_x, bf_y, bf_z) = rv
    bin_indices = []
    for bf, field, edges in zip(
        (bf_x, bf_y, bf_z),
        (self.x_field, self.y_field, self.z_field),
        (self.x_bins, self.y_bins, self.z_bins),
    ):
        # Compare in the same units the bin edges were constructed in.
        bf.convert_to_units(self.field_info[field].output_units)
        bin_indices.append(np.digitize(bf, edges) - 1)
    new_bin_profile3d(
        bin_indices[0],
        bin_indices[1],
        bin_indices[2],
        wdata,
        fdata,
        storage.weight_values,
        storage.values,
        storage.mvalues,
        storage.qvalues,
        storage.used,
    )
    # Chunk accumulated.
@property
def bounds(self):
    """Outer (min, max) edge pairs of the x, y, and z bins."""
    return tuple(
        (edges[0], edges[-1])
        for edges in (self.x_bins, self.y_bins, self.z_bins)
    )
[docs]
def set_x_unit(self, new_unit):
    """Sets a new unit for the x field

    Parameters
    ----------
    new_unit : string or Unit object
        The name of the new unit.
    """
    edges = self.x_bins
    edges.convert_to_units(new_unit)
    # Recompute the bin centers in the new unit.
    self.x = (edges[:-1] + edges[1:]) * 0.5
[docs]
def set_y_unit(self, new_unit):
    """Sets a new unit for the y field

    Parameters
    ----------
    new_unit : string or Unit object
        The name of the new unit.
    """
    edges = self.y_bins
    edges.convert_to_units(new_unit)
    # Recompute the bin centers in the new unit.
    self.y = (edges[:-1] + edges[1:]) * 0.5
[docs]
def set_z_unit(self, new_unit):
    """Sets a new unit for the z field

    Parameters
    ----------
    new_unit : string or Unit object
        The name of the new unit.
    """
    edges = self.z_bins
    edges.convert_to_units(new_unit)
    # Recompute the bin centers in the new unit.
    self.z = (edges[:-1] + edges[1:]) * 0.5
[docs]
class Profile3DFromDataset(ProfileNDFromDataset, Profile3D):
    """
    A 3D profile object loaded from a ytdata dataset.
    """

    # BUG FIX: the constructor was misspelled ``__init`` (and the docstring
    # said "2D"), so this method was never invoked on instantiation and
    # construction silently fell through to the inherited ``__init__``.
    def __init__(self, ds):
        ProfileNDFromDataset.__init__(self, ds)
[docs]
def sanitize_field_tuple_keys(input_dict, data_source):
    """Return a copy of *input_dict* whose keys have been resolved into
    full (field type, field name) tuples via *data_source*; a ``None``
    input passes straight through."""
    if input_dict is None:
        return input_dict
    return {
        data_source._determine_fields(key)[0]: value
        for key, value in input_dict.items()
    }
[docs]
def create_profile(
    data_source,
    bin_fields,
    fields,
    n_bins=64,
    extrema=None,
    logs=None,
    units=None,
    weight_field=("gas", "mass"),
    accumulation=False,
    fractional=False,
    deposition="ngp",
    override_bins=None,
):
    r"""
    Create a 1, 2, or 3D profile object.

    The dimensionality of the profile object is chosen by the number of
    fields given in the bin_fields argument.

    Parameters
    ----------
    data_source : YTSelectionContainer Object
        The data object to be profiled.
    bin_fields : list of strings
        List of the binning fields for profiling.
    fields : list of strings
        The fields to be profiled.
    n_bins : int or list of ints
        The number of bins in each dimension.  If None, 64 bins for
        each bin are used for each bin field.
        Default: 64.
    extrema : dict of min, max tuples
        Minimum and maximum values of the bin_fields for the profiles.
        The keys correspond to the field names. Defaults to the extrema
        of the bin_fields of the dataset. If a units dict is provided, extrema
        are understood to be in the units specified in the dictionary.
    logs : dict of boolean values
        Whether or not to log the bin_fields for the profiles.
        The keys correspond to the field names. Defaults to the take_log
        attribute of the field.
    units : dict of strings
        The units of the fields in the profiles, including the bin_fields.
    weight_field : str or tuple field identifier
        The weight field for computing weighted average for the profile
        values.  If None, the profile values are sums of the data in
        each bin. Defaults to ("gas", "mass").
    accumulation : bool or list of bools
        If True, the profile values for a bin n are the cumulative sum of
        all the values from bin 0 to n.  If -True, the sum is reversed so
        that the value for bin n is the cumulative sum from bin N (total bins)
        to n.  If the profile is 2D or 3D, a list of values can be given to
        control the summation in each dimension independently.
        Default: False.
    fractional : bool
        If True the profile values are divided by the sum of all
        the profile data such that the profile represents a probability
        distribution function.
    deposition : strings
        Controls the type of deposition used for ParticlePhasePlots.
        Valid choices are 'ngp' and 'cic'. Default is 'ngp'. This parameter is
        ignored if the input fields are not of particle type.
    override_bins : dict of bins to profile plot with
        If set, ignores n_bins and extrema settings and uses the
        supplied bins to profile the field. If a units dict is provided,
        bins are understood to be in the units specified in the dictionary.

    Examples
    --------
    Create a 1d profile.  Access bin field from profile.x and field
    data from profile[<field_name>].

    >>> ds = load("DD0046/DD0046")
    >>> ad = ds.all_data()
    >>> profile = create_profile(
    ...     ad, [("gas", "density")], [("gas", "temperature"), ("gas", "velocity_x")]
    ... )
    >>> print(profile.x)
    >>> print(profile["gas", "temperature"])
    """
    bin_fields = data_source._determine_fields(bin_fields)
    fields = list(iter_fields(fields))
    is_pfield = [
        data_source.ds._get_field_info(f).sampling_type == "particle"
        for f in bin_fields + fields
    ]
    wf = None
    if weight_field is not None:
        wf = data_source.ds._get_field_info(weight_field)
        is_pfield.append(wf.sampling_type == "particle")
        # NOTE: from here on, wf holds the weight field's *name* tuple, not
        # its DerivedField object.
        wf = wf.name

    if len(bin_fields) > 1 and isinstance(accumulation, bool):
        accumulation = [accumulation for _ in range(len(bin_fields))]

    bin_fields = data_source._determine_fields(bin_fields)
    fields = data_source._determine_fields(fields)
    # Re-key all user-supplied dicts on full (ftype, fname) tuples.
    units = sanitize_field_tuple_keys(units, data_source)
    extrema = sanitize_field_tuple_keys(extrema, data_source)
    logs = sanitize_field_tuple_keys(logs, data_source)
    override_bins = sanitize_field_tuple_keys(override_bins, data_source)

    # Mixing particle and mesh fields is only allowed for SPH datasets,
    # where mesh-like ("local") fields are backed by particles.
    if any(is_pfield) and not all(is_pfield):
        if hasattr(data_source.ds, "_sph_ptypes"):
            is_local = [
                data_source.ds.field_info[f].sampling_type == "local"
                for f in bin_fields + fields
            ]
            if wf is not None:
                # BUG FIX: wf was rebound above to the field's name tuple,
                # which has no sampling_type attribute; look the field back
                # up in the field info container.
                is_local.append(
                    data_source.ds.field_info[wf].sampling_type == "local"
                )
            is_local_or_pfield = [
                pf or lf for (pf, lf) in zip(is_pfield, is_local, strict=True)
            ]
            if not all(is_local_or_pfield):
                raise YTIllDefinedProfile(
                    bin_fields, data_source._determine_fields(fields), wf, is_pfield
                )
        else:
            raise YTIllDefinedProfile(
                bin_fields, data_source._determine_fields(fields), wf, is_pfield
            )

    # Choose the profile class from the dimensionality and field types.
    if len(bin_fields) == 1:
        cls = Profile1D
    elif len(bin_fields) == 2 and all(is_pfield):
        if deposition == "cic":
            if logs is not None:
                if (bin_fields[0] in logs and logs[bin_fields[0]]) or (
                    bin_fields[1] in logs and logs[bin_fields[1]]
                ):
                    raise RuntimeError(
                        "CIC deposition is only implemented for linear-scaled axes"
                    )
            else:
                logs = {bin_fields[0]: False, bin_fields[1]: False}
            if any(accumulation) or fractional:
                raise RuntimeError(
                    "The accumulation and fractional keyword arguments must be "
                    "False for CIC deposition"
                )
        cls = ParticleProfile
    elif len(bin_fields) == 2:
        cls = Profile2D
    elif len(bin_fields) == 3:
        cls = Profile3D
    else:
        raise NotImplementedError

    # ParticleProfile can only use a particle-sampled weight field.
    if weight_field is not None and cls is ParticleProfile:
        (weight_field,) = data_source._determine_fields([weight_field])
        wf = data_source.ds._get_field_info(weight_field)
        if not wf.sampling_type == "particle":
            weight_field = None
    if not is_sequence(n_bins):
        n_bins = [n_bins] * len(bin_fields)
    if not is_sequence(accumulation):
        accumulation = [accumulation] * len(bin_fields)

    # Resolve per-axis log flags, falling back to each field's take_log.
    if logs is None:
        logs = {}
    logs_list = []
    for bin_field in bin_fields:
        if bin_field in logs:
            logs_list.append(logs[bin_field])
        else:
            logs_list.append(data_source.ds.field_info[bin_field].take_log)
    logs = logs_list

    # Are the extrema all Nones? Then treat them as though extrema was set as None
    if extrema is None or not any(collapse(extrema.values())):
        ex = [
            data_source.quantities["Extrema"](f, non_zero=l)
            for f, l in zip(bin_fields, logs, strict=True)
        ]
        # pad extrema by epsilon so cells at bin edges are not excluded
        for i, (mi, ma) in enumerate(ex):
            mi = mi - np.spacing(mi)
            ma = ma + np.spacing(ma)
            ex[i][0], ex[i][1] = mi, ma
    else:
        ex = []
        for bin_field in bin_fields:
            bf_units = data_source.ds.field_info[bin_field].output_units
            # Accept extrema keyed either on the bare field name or on the
            # full (ftype, fname) tuple.
            try:
                field_ex = list(extrema[bin_field[-1]])
            except KeyError as e:
                try:
                    field_ex = list(extrema[bin_field])
                except KeyError:
                    raise RuntimeError(
                        f"Could not find field {bin_field[-1]} or {bin_field} in extrema"
                    ) from e

            if isinstance(field_ex[0], tuple):
                field_ex = [data_source.ds.quan(*f) for f in field_ex]
            if any(exi is None for exi in field_ex):
                # Fill missing bounds from the data itself.
                try:
                    ds_extrema = data_source.quantities.extrema(bin_field)
                except AttributeError:
                    # ytdata profile datasets don't have data_source.quantities
                    bf_vals = data_source[bin_field]
                    ds_extrema = data_source.ds.arr([bf_vals.min(), bf_vals.max()])
                for i, exi in enumerate(field_ex):
                    if exi is None:
                        field_ex[i] = ds_extrema[i]
                        # pad extrema by epsilon so cells at bin edges are
                        # not excluded
                        field_ex[i] -= (-1) ** i * np.spacing(field_ex[i])
            if units is not None and bin_field in units:
                for i, exi in enumerate(field_ex):
                    if hasattr(exi, "units"):
                        field_ex[i] = exi.to(units[bin_field])
                    else:
                        field_ex[i] = data_source.ds.quan(exi, units[bin_field])
                fe = data_source.ds.arr(field_ex)
            else:
                if hasattr(field_ex, "units"):
                    fe = field_ex.to(bf_units)
                else:
                    fe = data_source.ds.arr(field_ex, bf_units)
            fe.convert_to_units(bf_units)
            field_ex = [fe[0].v, fe[1].v]
            if is_sequence(field_ex[0]):
                field_ex[0] = data_source.ds.quan(field_ex[0][0], field_ex[0][1])
                field_ex[0] = field_ex[0].in_units(bf_units)
            if is_sequence(field_ex[1]):
                field_ex[1] = data_source.ds.quan(field_ex[1][0], field_ex[1][1])
                field_ex[1] = field_ex[1].in_units(bf_units)
            ex.append(field_ex)

    # Normalize any user-supplied override bins into plain arrays in the
    # bin field's output units.
    if override_bins is not None:
        o_bins = []
        for bin_field in bin_fields:
            bf_units = data_source.ds.field_info[bin_field].output_units
            try:
                field_obin = override_bins[bin_field[-1]]
            except KeyError:
                field_obin = override_bins[bin_field]

            if field_obin is None:
                o_bins.append(None)
                continue

            if isinstance(field_obin, tuple):
                field_obin = data_source.ds.arr(*field_obin)
            if units is not None and bin_field in units:
                fe = data_source.ds.arr(field_obin, units[bin_field])
            else:
                if hasattr(field_obin, "units"):
                    fe = field_obin.to(bf_units)
                else:
                    fe = data_source.ds.arr(field_obin, bf_units)
            fe.convert_to_units(bf_units)
            field_obin = fe.d
            o_bins.append(field_obin)

    args = [data_source]
    for f, n, (mi, ma), l in zip(bin_fields, n_bins, ex, logs, strict=True):
        if mi <= 0 and l:
            raise YTIllDefinedBounds(mi, ma)
        args += [f, n, mi, ma, l]
    kwargs = {"weight_field": weight_field}
    if cls is ParticleProfile:
        kwargs["deposition"] = deposition
    if override_bins is not None:
        for o_bin, ax in zip(o_bins, ["x", "y", "z"], strict=False):
            kwargs[f"override_bins_{ax}"] = o_bin
    obj = cls(*args, **kwargs)
    obj.accumulation = accumulation
    obj.fractional = fractional
    if fields is not None:
        obj.add_fields(list(fields))
    for field in fields:
        if fractional:
            obj.field_data[field] /= obj.field_data[field].sum()
        for axis, acc in enumerate(accumulation):
            if not acc:
                continue
            temp = obj.field_data[field]
            temp = np.rollaxis(temp, axis)
            if weight_field is not None:
                temp_weight = obj.weight
                temp_weight = np.rollaxis(temp_weight, axis)
            if acc < 0:
                temp = temp[::-1]
                if weight_field is not None:
                    temp_weight = temp_weight[::-1]
            if weight_field is None:
                temp = temp.cumsum(axis=0)
            else:
                # avoid 0-division warnings by nan-masking
                _denom = temp_weight.cumsum(axis=0)
                _denom[_denom == 0.0] = np.nan
                temp = (temp * temp_weight).cumsum(axis=0) / _denom
            if acc < 0:
                temp = temp[::-1]
                if weight_field is not None:
                    temp_weight = temp_weight[::-1]
            temp = np.rollaxis(temp, axis)
            obj.field_data[field] = temp
            if weight_field is not None:
                temp_weight = np.rollaxis(temp_weight, axis)
                obj.weight = temp_weight
    if units is not None:
        for field, unit in units.items():
            field = data_source._determine_fields(field)[0]
            if field == obj.x_field:
                obj.set_x_unit(unit)
            elif field == getattr(obj, "y_field", None):
                obj.set_y_unit(unit)
            elif field == getattr(obj, "z_field", None):
                obj.set_z_unit(unit)
            else:
                obj.set_field_unit(field, unit)
    return obj