Source code for yt.geometry.coordinates.geographic_coordinates

import numpy as np
import unyt

from yt.utilities.lib.pixelization_routines import pixelize_cartesian, pixelize_cylinder

from .coordinate_handler import (
    CoordinateHandler,
    _get_coord_fields,
    _setup_dummy_cartesian_coords_and_widths,
)


[docs] class GeographicCoordinateHandler(CoordinateHandler): radial_axis = "altitude" name = "geographic" def __init__(self, ds, ordering=None): if ordering is None: ordering = ("latitude", "longitude", self.radial_axis) super().__init__(ds, ordering) self.image_units = {} self.image_units[self.axis_id["latitude"]] = (None, None) self.image_units[self.axis_id["longitude"]] = (None, None) self.image_units[self.axis_id[self.radial_axis]] = ("deg", "deg")
[docs] def setup_fields(self, registry): # Missing implementation for x, y and z coordinates. _setup_dummy_cartesian_coords_and_widths(registry, axes=("x", "y", "z")) f1, f2 = _get_coord_fields(self.axis_id["latitude"], "") registry.add_field( ("index", "dlatitude"), sampling_type="cell", function=f1, display_field=False, units="", ) registry.add_field( ("index", "latitude"), sampling_type="cell", function=f2, display_field=False, units="", ) f1, f2 = _get_coord_fields(self.axis_id["longitude"], "") registry.add_field( ("index", "dlongitude"), sampling_type="cell", function=f1, display_field=False, units="", ) registry.add_field( ("index", "longitude"), sampling_type="cell", function=f2, display_field=False, units="", ) f1, f2 = _get_coord_fields(self.axis_id[self.radial_axis]) registry.add_field( ("index", f"d{self.radial_axis}"), sampling_type="cell", function=f1, display_field=False, units="code_length", ) registry.add_field( ("index", self.radial_axis), sampling_type="cell", function=f2, display_field=False, units="code_length", ) def _SphericalVolume(field, data): # We can use the transformed coordinates here. # Here we compute the spherical volume element exactly r = data["index", "r"] dr = data["index", "dr"] theta = data["index", "theta"] dtheta = data["index", "dtheta"] vol = ((r + 0.5 * dr) ** 3 - (r - 0.5 * dr) ** 3) / 3.0 vol *= np.cos(theta - 0.5 * dtheta) - np.cos(theta + 0.5 * dtheta) vol *= data["index", "dphi"] return vol registry.add_field( ("index", "cell_volume"), sampling_type="cell", function=_SphericalVolume, units="code_length**3", ) registry.alias(("index", "volume"), ("index", "cell_volume")) def _path_radial_axis(field, data): return data["index", f"d{self.radial_axis}"] registry.add_field( ("index", f"path_element_{self.radial_axis}"), sampling_type="cell", function=_path_radial_axis, units="code_length", ) def _path_latitude(field, data): # We use r here explicitly return data["index", "r"] * data["index", "dlatitude"] * np.pi / 180.0 registry.add_field( ("index", "path_element_latitude"), sampling_type="cell", function=_path_latitude, units="code_length", ) def _path_longitude(field, data): # We use r here explicitly return ( data["index", "r"] * data["index", "dlongitude"] * np.pi / 180.0 * np.sin((90 - data["index", "latitude"]) * np.pi / 180.0) ) registry.add_field( ("index", "path_element_longitude"), sampling_type="cell", function=_path_longitude, units="code_length", ) def _latitude_to_theta(field, data): # latitude runs from -90 to 90 # theta = 0 at +90 deg, np.pi at -90 return (90.0 - data["index", "latitude"]) * np.pi / 180.0 registry.add_field( ("index", "theta"), sampling_type="cell", function=_latitude_to_theta, units="", ) def _dlatitude_to_dtheta(field, data): return data["index", "dlatitude"] * np.pi / 180.0 registry.add_field( ("index", "dtheta"), sampling_type="cell", function=_dlatitude_to_dtheta, units="", ) def _longitude_to_phi(field, data): # longitude runs from -180 to 180 lonvals = data[("index", "longitude")] neglons = lonvals < 0.0 if np.any(neglons): lonvals[neglons] = lonvals[neglons] + 360.0 return lonvals * np.pi / 180.0 registry.add_field( ("index", "phi"), sampling_type="cell", function=_longitude_to_phi, units="" ) def _dlongitude_to_dphi(field, data): return data["index", "dlongitude"] * np.pi / 180.0 registry.add_field( ("index", "dphi"), sampling_type="cell", function=_dlongitude_to_dphi, units="", ) self._setup_radial_fields(registry)
def _setup_radial_fields(self, registry): # This stays here because we don't want to risk the field detector not # properly getting the data_source, etc. def _altitude_to_radius(field, data): surface_height = data.get_field_parameter("surface_height") if surface_height is None: if hasattr(data.ds, "surface_height"): surface_height = data.ds.surface_height else: surface_height = data.ds.quan(0.0, "code_length") return data["index", "altitude"] + surface_height registry.add_field( ("index", "r"), sampling_type="cell", function=_altitude_to_radius, units="code_length", ) registry.alias(("index", "dr"), ("index", "daltitude")) def _retrieve_radial_offset(self, data_source=None): # This returns the factor by which the radial field is multiplied and # the scalar its offset by. Typically the "factor" will *only* be # either 1.0 or -1.0. The order will be factor * r + offset. # Altitude is the radius from the central zone minus the radius of the # surface. Depth to radius is negative value of depth plus the # outermost radius. surface_height = None if data_source is not None: surface_height = data_source.get_field_parameter("surface_height") if surface_height is None: if hasattr(self.ds, "surface_height"): surface_height = self.ds.surface_height else: surface_height = self.ds.quan(0.0, "code_length") return surface_height, 1.0
[docs] def pixelize( self, dimension, data_source, field, bounds, size, antialias=True, periodic=True, *, return_mask=False, ): if self.axis_name[dimension] in ("latitude", "longitude"): buff, mask = self._cyl_pixelize( data_source, field, bounds, size, antialias, dimension ) elif self.axis_name[dimension] == self.radial_axis: buff, mask = self._ortho_pixelize( data_source, field, bounds, size, antialias, dimension, periodic ) else: raise NotImplementedError if return_mask: assert mask is None or mask.dtype == bool return buff, mask else: return buff
[docs] def pixelize_line(self, field, start_point, end_point, npoints): raise NotImplementedError
def _ortho_pixelize( self, data_source, field, bounds, size, antialias, dimension, periodic ): period = self.period[:2].copy() period[0] = self.period[self.x_axis[dimension]] period[1] = self.period[self.y_axis[dimension]] if hasattr(period, "in_units"): period = period.in_units("code_length").d # For a radial axis, px will correspond to longitude and py will # correspond to latitude. px = data_source["px"] pdx = data_source["pdx"] py = data_source["py"] pdy = data_source["pdy"] buff = np.full((size[1], size[0]), np.nan, dtype="float64") mask = pixelize_cartesian( buff, px, py, pdx, pdy, data_source[field], bounds, int(antialias), period, int(periodic), ) return buff, mask def _cyl_pixelize(self, data_source, field, bounds, size, antialias, dimension): offset, factor = self._retrieve_radial_offset(data_source) r = factor * data_source["py"] + offset # Because of the axis-ordering, dimensions 0 and 1 both have r as py # and the angular coordinate as px. But we need to figure out how to # convert our coordinate back to an actual angle, based on which # dimension we're in. pdx = data_source["pdx"].d * np.pi / 180 if self.axis_name[self.x_axis[dimension]] == "latitude": px = (data_source["px"].d + 90) * np.pi / 180 do_transpose = True elif self.axis_name[self.x_axis[dimension]] == "longitude": px = (data_source["px"].d + 180) * np.pi / 180 do_transpose = False else: # We should never get here! raise NotImplementedError buff = np.full((size[1], size[0]), np.nan, dtype="f8") mask = pixelize_cylinder( buff, r, data_source["pdy"], px, pdx, data_source[field], bounds, return_mask=True, ) if do_transpose: buff = buff.transpose() mask = mask.transpose() return buff, mask
[docs] def convert_from_cartesian(self, coord): raise NotImplementedError
[docs] def convert_to_cartesian(self, coord): offset, factor = self._retrieve_radial_offset() if isinstance(coord, np.ndarray) and len(coord.shape) > 1: rad = self.axis_id[self.radial_axis] lon = self.axis_id["longitude"] lat = self.axis_id["latitude"] r = factor * coord[:, rad] + offset colatitude = _latitude_to_colatitude(coord[:, lat]) phi = coord[:, lon] * np.pi / 180 nc = np.zeros_like(coord) # r, theta, phi nc[:, lat] = np.cos(phi) * np.sin(colatitude) * r nc[:, lon] = np.sin(phi) * np.sin(colatitude) * r nc[:, rad] = np.cos(colatitude) * r else: a, b, c = coord colatitude = _latitude_to_colatitude(b) phi = a * np.pi / 180 r = factor * c + offset nc = ( np.cos(phi) * np.sin(colatitude) * r, np.sin(phi) * np.sin(colatitude) * r, np.cos(colatitude) * r, ) return nc
[docs] def convert_to_cylindrical(self, coord): raise NotImplementedError
[docs] def convert_from_cylindrical(self, coord): raise NotImplementedError
[docs] def convert_to_spherical(self, coord): raise NotImplementedError
[docs] def convert_from_spherical(self, coord): raise NotImplementedError
_image_axis_name = None @property def image_axis_name(self): if self._image_axis_name is not None: return self._image_axis_name # This is the x and y axes labels that get displayed. For # non-Cartesian coordinates, we usually want to override these for # Cartesian coordinates, since we transform them. rv = { self.axis_id["latitude"]: ( "x / \\sin(\\mathrm{latitude})", "y / \\sin(\\mathrm{latitude})", ), self.axis_id["longitude"]: ("R", "z"), self.axis_id[self.radial_axis]: ("longitude", "latitude"), } for i in list(rv.keys()): rv[self.axis_name[i]] = rv[i] rv[self.axis_name[i].capitalize()] = rv[i] self._image_axis_name = rv return rv _x_pairs = ( ("latitude", "longitude"), ("longitude", "latitude"), ("altitude", "longitude"), ) _y_pairs = ( ("latitude", "altitude"), ("longitude", "altitude"), ("altitude", "latitude"), ) _data_projection = None @property def data_projection(self): # this will control the default projection to use when displaying data if self._data_projection is not None: return self._data_projection dpj = {} for ax in self.axis_order: if ax == self.radial_axis: dpj[ax] = "Mollweide" else: dpj[ax] = None self._data_projection = dpj return dpj _data_transform = None @property def data_transform(self): # this is the coordinate system on which the data is defined (the crs). if self._data_transform is not None: return self._data_transform dtx = {} for ax in self.axis_order: if ax == self.radial_axis: dtx[ax] = "PlateCarree" else: dtx[ax] = None self._data_transform = dtx return dtx @property def period(self): return self.ds.domain_width
[docs] def sanitize_center(self, center, axis): center, display_center = super().sanitize_center(center, axis) name = self.axis_name[axis] if name == self.radial_axis: display_center = center elif name == "latitude": display_center = ( 0.0 * display_center[0], 0.0 * display_center[1], 0.0 * display_center[2], ) elif name == "longitude": ri = self.axis_id[self.radial_axis] c = (self.ds.domain_right_edge[ri] + self.ds.domain_left_edge[ri]) / 2.0 display_center = [ 0.0 * display_center[0], 0.0 * display_center[1], 0.0 * display_center[2], ] display_center[self.axis_id["latitude"]] = c return center, display_center
[docs] def sanitize_width(self, axis, width, depth): name = self.axis_name[axis] if width is not None: width = super().sanitize_width(axis, width, depth) elif name == self.radial_axis: rax = self.radial_axis width = [ self.ds.domain_width[self.x_axis[rax]], self.ds.domain_width[self.y_axis[rax]], ] elif name == "latitude": ri = self.axis_id[self.radial_axis] # Remember, in spherical coordinates when we cut in theta, # we create a conic section width = [2.0 * self.ds.domain_width[ri], 2.0 * self.ds.domain_width[ri]] elif name == "longitude": ri = self.axis_id[self.radial_axis] width = [self.ds.domain_width[ri], 2.0 * self.ds.domain_width[ri]] return width
[docs] class InternalGeographicCoordinateHandler(GeographicCoordinateHandler): radial_axis = "depth" name = "internal_geographic" def _setup_radial_fields(self, registry): # Altitude is the radius from the central zone minus the radius of the # surface. def _depth_to_radius(field, data): outer_radius = data.get_field_parameter("outer_radius") if outer_radius is None: if hasattr(data.ds, "outer_radius"): outer_radius = data.ds.outer_radius else: # Otherwise, we assume that the depth goes to full depth, # so we can look at the domain right edge in depth. rax = self.axis_id[self.radial_axis] outer_radius = data.ds.domain_right_edge[rax] return -1.0 * data["index", "depth"] + outer_radius registry.add_field( ("index", "r"), sampling_type="cell", function=_depth_to_radius, units="code_length", ) registry.alias(("index", "dr"), ("index", "ddepth")) def _retrieve_radial_offset(self, data_source=None): # Depth means switching sign and adding to full radius outer_radius = None if data_source is not None: outer_radius = data_source.get_field_parameter("outer_radius") if outer_radius is None: if hasattr(self.ds, "outer_radius"): outer_radius = self.ds.outer_radius else: # Otherwise, we assume that the depth goes to full depth, # so we can look at the domain right edge in depth. rax = self.axis_id[self.radial_axis] outer_radius = self.ds.domain_right_edge[rax] return outer_radius, -1.0 _x_pairs = ( ("latitude", "longitude"), ("longitude", "latitude"), ("depth", "longitude"), ) _y_pairs = (("latitude", "depth"), ("longitude", "depth"), ("depth", "latitude"))
[docs] def sanitize_center(self, center, axis): center, display_center = super( GeographicCoordinateHandler, self ).sanitize_center(center, axis) name = self.axis_name[axis] if name == self.radial_axis: display_center = center elif name == "latitude": display_center = ( 0.0 * display_center[0], 0.0 * display_center[1], 0.0 * display_center[2], ) elif name == "longitude": ri = self.axis_id[self.radial_axis] offset, factor = self._retrieve_radial_offset() outermost = factor * self.ds.domain_left_edge[ri] + offset display_center = [ 0.0 * display_center[0], 0.0 * display_center[1], 0.0 * display_center[2], ] display_center[self.axis_id["latitude"]] = outermost / 2.0 return center, display_center
[docs] def sanitize_width(self, axis, width, depth): name = self.axis_name[axis] if width is not None: width = super(GeographicCoordinateHandler, self).sanitize_width( axis, width, depth ) elif name == self.radial_axis: rax = self.radial_axis width = [ self.ds.domain_width[self.x_axis[rax]], self.ds.domain_width[self.y_axis[rax]], ] elif name == "latitude": ri = self.axis_id[self.radial_axis] # Remember, in spherical coordinates when we cut in theta, # we create a conic section offset, factor = self._retrieve_radial_offset() outermost = factor * self.ds.domain_left_edge[ri] + offset width = [2.0 * outermost, 2.0 * outermost] elif name == "longitude": ri = self.axis_id[self.radial_axis] offset, factor = self._retrieve_radial_offset() outermost = factor * self.ds.domain_left_edge[ri] + offset width = [outermost, 2.0 * outermost] return width
def _latitude_to_colatitude(lat_vals): # convert latitude to theta, accounting for units, # including the case where the units are code_length # due to how yt stores the domain_center units for # geographic coordinates. if isinstance(lat_vals, unyt.unyt_array): if lat_vals.units.dimensions == unyt.dimensions.length: return (90.0 - lat_vals.d) * np.pi / 180.0 ninety = unyt.unyt_quantity(90.0, "degree") return (ninety - lat_vals).to("radian") return (90 - lat_vals) * np.pi / 180.0