"""
CF Radial data structures
"""
import contextlib
import os
import weakref
import numpy as np
from unyt import unyt_array
from yt.data_objects.index_subobjects.grid_patch import AMRGridPatch
from yt.data_objects.static_output import Dataset
from yt.funcs import mylog
from yt.geometry.grid_geometry_handler import GridIndex
from yt.utilities.file_handler import NetCDF4FileHandler, valid_netcdf_signature
from yt.utilities.on_demand_imports import _xarray as xr
from .fields import CFRadialFieldInfo
class CFRadialGrid(AMRGridPatch):
_id_offset = 0
def __init__(self, id, index, level, dimensions):
super().__init__(id, filename=index.index_filename, index=index)
self.Parent = None
self.Children = []
self.Level = level
self.ActiveDimensions = dimensions
class CFRadialHierarchy(GridIndex):
grid = CFRadialGrid
def __init__(self, ds, dataset_type="cf_radial"):
self.dataset_type = dataset_type
self.dataset = weakref.proxy(ds)
# our index file is the dataset itself:
self.index_filename = self.dataset.parameter_filename
self.directory = os.path.dirname(self.index_filename)
        # float type for the simulation edges; must be float64 now
self.float_type = np.float64
super().__init__(ds, dataset_type)
def _detect_output_fields(self):
        # This sets self.field_list, which contains all of the available
        # on-disk fields, and records the units for each field.
self.field_list = []
units = {}
with self.ds._handle() as xr_ds_handle:
for key in xr_ds_handle.variables.keys():
if all(x in xr_ds_handle[key].dims for x in ["time", "z", "y", "x"]):
fld = ("cf_radial", key)
self.field_list.append(fld)
units[fld] = xr_ds_handle[key].units
self.ds.field_units.update(units)
def _count_grids(self):
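        # the CF Radial frontend treats the entire gridded volume as a single grid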
self.num_grids = 1
def _parse_index(self):
self.grid_left_edge[0][:] = self.ds.domain_left_edge[:]
self.grid_right_edge[0][:] = self.ds.domain_right_edge[:]
self.grid_dimensions[0][:] = self.ds.domain_dimensions[:]
self.grid_particle_count[0][0] = 0
self.grid_levels[0][0] = 0
self.max_level = 0
def _populate_grid_objects(self):
# only a single grid, no need to loop
g = self.grid(0, self, self.grid_levels.flat[0], self.grid_dimensions[0])
g._prepare_grid()
g._setup_dx()
self.grids = np.array([g], dtype="object")
class CFRadialDataset(Dataset):
_load_requirements = ["xarray", "pyart"]
_index_class = CFRadialHierarchy
_field_info_class = CFRadialFieldInfo
def __init__(
self,
filename,
dataset_type="cf_radial",
storage_filename=None,
storage_overwrite: bool = False,
grid_shape: tuple[int, int, int] | None = None,
grid_limit_x: tuple[float, float] | None = None,
grid_limit_y: tuple[float, float] | None = None,
grid_limit_z: tuple[float, float] | None = None,
units_override=None,
):
"""
Parameters
----------
        filename : str
            the path of the CF/Radial netCDF file to load
        dataset_type : str
            the dataset type; defaults to "cf_radial"
        storage_filename : Optional[str]
            the filename to which the gridded data is written when re-gridding
            is required. If not provided, "_yt_grid" is appended to the base of
            the dataset filename.
        storage_overwrite : bool
            if True and any gridding parameters are set, the storage_filename
            will be overwritten if it exists. Default is False.
        grid_shape : Optional[Tuple[int, int, int]]
            when gridding to cartesian, grid_shape is the number of cells in the
            z, y, x coordinates. If not provided, a default shape of
            (100, 100, 100) is used.
grid_limit_x : Optional[Tuple[float, float]]
The x range of the cartesian-gridded data in the form (xmin, xmax) with
x in the native radar range units
grid_limit_y : Optional[Tuple[float, float]]
The y range of the cartesian-gridded data in the form (ymin, ymax) with
y in the native radar range units
grid_limit_z : Optional[Tuple[float, float]]
The z range of the cartesian-gridded data in the form (zmin, zmax) with
z in the native radar range units
        units_override : Optional[dict]
            a dictionary of unit overrides passed on to the base Dataset class
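
        Examples
        --------
        A minimal, illustrative sketch of loading a CF/Radial file with explicit
        gridding parameters (the file path and values are hypothetical, and the
        grid limits are assumed to be in meters, the typical radar range unit):

        >>> import yt
        >>> ds = yt.load(
        ...     "radar_volume.nc",
        ...     grid_shape=(30, 200, 200),
        ...     grid_limit_x=(-50000.0, 50000.0),
        ...     grid_limit_y=(-50000.0, 50000.0),
        ...     grid_limit_z=(0.0, 15000.0),
        ...     storage_overwrite=True,
        ... )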
"""
self.fluid_types += ("cf_radial",)
with self._handle(filename=filename) as xr_ds_handle:
if "x" not in xr_ds_handle.coords:
if storage_filename is None:
f_base, f_ext = os.path.splitext(filename)
storage_filename = f_base + "_yt_grid" + f_ext
regrid = True
if os.path.exists(storage_filename):
# pyart grid.write will error if the filename exists, so this logic
# forces some explicit behavior to minimize confusion and avoid
# overwriting or deleting without explicit user consent.
if storage_overwrite:
os.remove(storage_filename)
elif any([grid_shape, grid_limit_x, grid_limit_y, grid_limit_z]):
mylog.warning(
"Ignoring provided grid parameters because %s exists.",
storage_filename,
)
mylog.warning(
"To re-grid, either provide a unique storage_filename or set "
"storage_overwrite to True to overwrite %s.",
storage_filename,
)
regrid = False
else:
mylog.info(
"loading existing re-gridded file: %s", storage_filename
)
regrid = False
if regrid:
mylog.info("Building cfradial grid")
from yt.utilities.on_demand_imports import _pyart as pyart
radar = pyart.io.read_cfradial(filename)
grid_limit_z = self._validate_grid_dim(radar, "z", grid_limit_z)
grid_limit_x = self._validate_grid_dim(radar, "x", grid_limit_x)
grid_limit_y = self._validate_grid_dim(radar, "y", grid_limit_y)
grid_limits = (grid_limit_z, grid_limit_y, grid_limit_x)
grid_shape = self._validate_grid_shape(grid_shape)
# note: grid_shape must be in (z, y, x) order.
self.grid_shape = grid_shape
self.grid_limits = grid_limits
mylog.info("Calling pyart.map.grid_from_radars ... ")
# this is fairly slow
grid = pyart.map.grid_from_radars(
(radar,),
grid_shape=self.grid_shape,
grid_limits=self.grid_limits,
)
mylog.info(
"Successfully built cfradial grid, writing to %s",
storage_filename,
)
mylog.info(
"Subsequent loads of %s will load the gridded file by default",
filename,
)
grid.write(storage_filename)
filename = storage_filename
super().__init__(filename, dataset_type, units_override=units_override)
self.storage_filename = storage_filename
self.refine_by = 2 # refinement factor between a grid and its subgrid
@contextlib.contextmanager
def _handle(self, filename: str | None = None):
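        # context manager: yields an open xarray dataset handle for this
        # dataset's file (or for the provided filename) and closes it on exit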
if filename is None:
if hasattr(self, "filename"):
filename = self.filename
else:
raise RuntimeError("Dataset has no filename yet.")
with xr.open_dataset(filename) as xrds:
yield xrds
def _validate_grid_dim(
self, radar, dim: str, grid_limit: tuple[float, float] | None = None
) -> tuple[float, float]:
if grid_limit is None:
if dim.lower() == "z":
gate_alt = radar.gate_altitude["data"]
gate_alt_units = radar.gate_altitude["units"]
grid_limit = (gate_alt.min(), gate_alt.max())
grid_limit = self._round_grid_guess(grid_limit, gate_alt_units)
mylog.info(
"grid_limit_z not provided, using max height range in data: (%f, %f)",
*grid_limit,
)
else:
max_range = radar.range["data"].max()
grid_limit = self._round_grid_guess(
(-max_range, max_range), radar.range["units"]
)
mylog.info(
"grid_limit_%s not provided, using max horizontal range in data: (%f, %f)",
dim,
*grid_limit,
)
        if len(grid_limit) != 2:
            raise ValueError(
                f"grid_limit_{dim} must contain 2 values (min, max), "
                f"but it has {len(grid_limit)}"
            )
return grid_limit
def _validate_grid_shape(
self, grid_shape: tuple[int, int, int] | None = None
) -> tuple[int, int, int]:
if grid_shape is None:
grid_shape = (100, 100, 100)
mylog.info(
"grid_shape not provided, using (nz, ny, nx) = (%i, %i, %i)",
*grid_shape,
)
        if len(grid_shape) != 3:
            raise ValueError(
                f"grid_shape must contain 3 values (nz, ny, nx), "
                f"but it has {len(grid_shape)}"
            )
return grid_shape
def _round_grid_guess(self, bounds: tuple[float, float], unit_str: str):
        # round the bounds outward to the nearest 10 km increment so that the
        # rounded range still contains the original grid_limit
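        # e.g., bounds equivalent to (-123.4, 156.7) km are widened to
        # (-130.0, 160.0) km before being converted back to the input units
        # (illustrative values)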
for findstr, repstr in self._field_info_class.unit_subs:
unit_str = unit_str.replace(findstr, repstr)
limits = unyt_array(bounds, unit_str).to("km")
limits[0] = np.floor(limits[0] / 10.0) * 10.0
limits[1] = np.ceil(limits[1] / 10.0) * 10.0
return tuple(limits.to(unit_str).tolist())
def _set_code_unit_attributes(self):
with self._handle() as xr_ds_handle:
length_unit = xr_ds_handle.variables["x"].attrs["units"]
self.length_unit = self.quan(1.0, length_unit)
self.mass_unit = self.quan(1.0, "kg")
self.time_unit = self.quan(1.0, "s")
def _parse_parameter_file(self):
self.parameters = {}
with self._handle() as xr_ds_handle:
x, y, z = (xr_ds_handle.coords[d] for d in "xyz")
self.domain_left_edge = np.array([x.min(), y.min(), z.min()])
self.domain_right_edge = np.array([x.max(), y.max(), z.max()])
self.dimensionality = 3
dims = [xr_ds_handle.sizes[d] for d in "xyz"]
self.domain_dimensions = np.array(dims, dtype="int64")
self._periodicity = (False, False, False)
# note: origin_latitude and origin_longitude arrays will have time
# as a dimension and the initial implementation here only handles
# the first index. Also, the time array may have a datetime dtype,
# so cast to float.
self.origin_latitude = xr_ds_handle.origin_latitude[0]
self.origin_longitude = xr_ds_handle.origin_longitude[0]
self.current_time = float(xr_ds_handle.time.values[0])
        # not a cosmological simulation, so zero out the cosmology parameters
self.cosmological_simulation = 0
self.current_redshift = 0.0
self.omega_lambda = 0.0
self.omega_matter = 0.0
self.hubble_constant = 0.0
@classmethod
def _is_valid(cls, filename: str, *args, **kwargs) -> bool:
        # This accepts a filename or a set of arguments and returns True or
        # False depending on whether the file is of the requested type.
if not valid_netcdf_signature(filename):
return False
if cls._missing_load_requirements():
return False
is_cfrad = False
try:
# note that we use the NetCDF4FileHandler here to avoid some
# issues with xarray opening datasets it cannot handle. Once
            # a dataset is identified as a CFRadialDataset, xarray is used
# for opening. See https://github.com/yt-project/yt/issues/3987
nc4_file = NetCDF4FileHandler(filename)
with nc4_file.open_ds(keepweakref=True) as ds:
con = "Conventions" # the attribute to check for file conventions
                # note that the attributes here are potentially space- or
                # comma-delimited strings, so we concatenate them into a single
                # string and then search for the CF/Radial substring.
cons = "" # the value of the Conventions attribute
for c in [con, con.lower(), "Sub_" + con.lower()]:
if hasattr(ds, c):
cons += getattr(ds, c)
is_cfrad = "CF/Radial" in cons or "CF-Radial" in cons
except (OSError, AttributeError, ImportError):
return False
return is_cfrad