# Source code for yt.frontends.artio.data_structures

import os
import weakref
from collections import defaultdict

import numpy as np

from yt.data_objects.field_data import YTFieldData
from yt.data_objects.index_subobjects.octree_subset import OctreeSubset
from yt.data_objects.static_output import Dataset
from yt.data_objects.unions import ParticleUnion
from yt.frontends.artio import _artio_caller
from yt.frontends.artio._artio_caller import (
    ARTIOSFCRangeHandler,
    artio_fileset,
    artio_is_valid,
)
from yt.frontends.artio.fields import ARTIOFieldInfo
from yt.funcs import mylog, setdefaultattr
from yt.geometry import particle_deposit as particle_deposit
from yt.geometry.geometry_handler import Index, YTDataChunk
from yt.utilities.exceptions import YTParticleDepositionNotImplemented


class ARTIOOctreeSubset(OctreeSubset):
    """A selectable chunk of the ARTIO octree covering one contiguous range
    of space-filling-curve (SFC) indices, ``[sfc_start, sfc_end]``.

    Instances are created per SFC range by ``ARTIOIndex._identify_base_chunk``
    and act as the data-access objects for refined (oct) cells in that range.
    """

    _domain_offset = 0
    domain_id = -1
    _con_args = ("base_region", "sfc_start", "sfc_end", "oct_handler", "ds")
    _type_name = "octree_subset"
    _num_zones = 2  # oct subsets carry 2 zones per side per oct

    def __init__(self, base_region, sfc_start, sfc_end, oct_handler, ds):
        # NOTE: deliberately does not call OctreeSubset.__init__; all state
        # the base class relies on is set up directly here.
        self.field_data = YTFieldData()
        self.field_parameters = {}
        self.sfc_start = sfc_start
        self.sfc_end = sfc_end
        self._oct_handler = oct_handler
        self.ds = ds
        self._last_mask = None
        self._last_selector_id = None
        self._current_particle_type = "all"
        self._current_fluid_type = self.ds.default_fluid_type
        # The region/selector this subset was carved out of; reused by
        # deposition and selection code.
        self.base_region = base_region
        self.base_selector = base_region.selector

    @property
    def oct_handler(self):
        # Read-only access to the Cython oct handler for this SFC range.
        return self._oct_handler

    @property
    def min_ind(self):
        # First SFC index covered by this subset.
        return self.sfc_start

    @property
    def max_ind(self):
        # Last SFC index covered by this subset.
        return self.sfc_end

    def fill(self, fields, selector):
        """Read the given fluid ``fields`` for the cells chosen by
        ``selector`` and return them as a dict keyed by field tuple.

        Returns an empty list when no fields are requested (historical
        quirk: the empty case returns ``[]`` rather than ``{}``).
        """
        if len(fields) == 0:
            return []
        handle = self.oct_handler.artio_handle
        # Map (ftype, fname) tuples onto on-disk grid variable indices.
        field_indices = [
            handle.parameters["grid_variable_labels"].index(f) for (ft, f) in fields
        ]
        cell_count = selector.count_oct_cells(self.oct_handler, self.domain_id)
        self.data_size = cell_count
        # Per-cell (level, cell index, file index) needed by fill_sfc below;
        # the ordering of these arrays must match what fill_sfc expects.
        levels, cell_inds, file_inds = self.oct_handler.file_index_octs(
            selector, self.domain_id, cell_count
        )
        domain_counts = self.oct_handler.domain_count(selector)
        # One output buffer per requested field, filled in place by fill_sfc.
        tr = [np.zeros(cell_count, dtype="float64") for field in fields]
        self.oct_handler.fill_sfc(
            levels, cell_inds, file_inds, domain_counts, field_indices, tr
        )
        tr = dict(zip(fields, tr, strict=True))
        return tr

    def fill_particles(self, fields):
        """Read particle ``fields`` (a list of ``(ptype, fname)`` tuples) from
        this SFC range and return ``{ptype: {fname: ndarray}}``."""
        if len(fields) == 0:
            return {}
        ptype_indices = self.ds.particle_types
        # ARTIO addresses species by integer index, not by label.
        art_fields = [(ptype_indices.index(ptype), fname) for ptype, fname in fields]
        species_data = self.oct_handler.fill_sfc_particles(art_fields)
        tr = defaultdict(dict)
        # Now we need to sum things up and then fill
        for s, f in fields:
            count = 0
            dt = "float64"  # default
            i = ptype_indices.index(s)
            # No vector fields in ARTIO
            count += species_data[i, f].size
            dt = species_data[i, f].dtype
            tr[s][f] = np.zeros(count, dtype=dt)
            cp = 0
            # pop() releases the intermediate array once it has been copied.
            v = species_data.pop((i, f))
            tr[s][f][cp : cp + v.size] = v
            cp += v.size
        return tr
# We create something of a fake octree here. This is primarily to enable us to # reuse code for things like __getitem__ and the like. We will also create a # new oct_handler type that is functionally equivalent, except that it will # only manage the root mesh.
class ARTIORootMeshSubset(ARTIOOctreeSubset):
    """Fake-octree subset that manages only the root (unrefined) mesh of an
    SFC range, so root-mesh data can reuse the octree data-access code."""

    _num_zones = 1  # root mesh: a single zone per side
    _type_name = "sfc_subset"
    _selector_module = _artio_caller
    domain_id = -1

    def fill(self, fields, selector):
        """Read the given fluid ``fields`` for selected root-mesh cells.

        Unlike the octree variant, the size is discovered from the returned
        arrays rather than counted up front.
        """
        # We know how big these will be.
        if len(fields) == 0:
            return []
        handle = self.ds._handle
        # Map (ftype, fname) tuples onto on-disk grid variable indices.
        field_indices = [
            handle.parameters["grid_variable_labels"].index(f) for (ft, f) in fields
        ]
        tr = self.oct_handler.fill_sfc(selector, field_indices)
        self.data_size = tr[0].size
        tr = dict(zip(fields, tr, strict=True))
        return tr

    def deposit(self, positions, fields=None, method=None, kernel_name="cubic"):
        """Deposit particles at ``positions`` onto the root mesh using the
        named deposition ``method`` (e.g. "cic", "sum").

        Raises YTParticleDepositionNotImplemented if no matching
        ``deposit_<method>`` class exists in particle_deposit.
        """
        # Here we perform our particle deposition.
        if fields is None:
            fields = []
        cls = getattr(particle_deposit, f"deposit_{method}", None)
        if cls is None:
            raise YTParticleDepositionNotImplemented(method)
        nz = self.nz
        nvals = (nz, nz, nz, self.ires.size)
        # We allocate number of zones, not number of octs
        op = cls(nvals, kernel_name)
        op.initialize()
        mylog.debug(
            "Depositing %s (%s^3) particles into %s Root Mesh",
            positions.shape[0],
            positions.shape[0] ** 0.3333333,  # approximate cube root, log only
            nvals[-1],
        )
        # Ensure C-contiguous float64 inputs for the Cython deposit routine.
        pos = np.array(positions, dtype="float64")
        f64 = [np.array(f, dtype="float64") for f in fields]
        self.oct_handler.deposit(op, self.base_selector, pos, f64)
        vals = op.finalize()
        if vals is None:
            return
        return np.asfortranarray(vals)
class ARTIOIndex(Index):
    """Geometry/index handler for ARTIO datasets.

    The mesh is not constructed eagerly: SFC ranges are discovered lazily per
    data object in ``_identify_base_chunk`` and cached in ``range_handlers``.
    """

    def __init__(self, ds, dataset_type="artio"):
        self.dataset_type = dataset_type
        self.dataset = weakref.proxy(ds)
        # for now, the index file is the dataset!
        self.index_filename = self.dataset.parameter_filename
        self.directory = os.path.dirname(self.index_filename)
        self.max_level = ds.max_level
        # Cache of ARTIOSFCRangeHandler objects keyed by (sfc_start, sfc_end).
        self.range_handlers = {}
        # BUG FIX: find_max() caches its results in self._max_locations, but
        # this dict was never created, so the first find_max() call raised
        # AttributeError.  Initialize it here.
        self._max_locations = {}
        self.float_type = np.float64
        super().__init__(ds, dataset_type)

    @property
    def max_range(self):
        # Maximum SFC range size per chunk; forwarded from the dataset.
        return self.dataset.max_range

    def _setup_geometry(self):
        # Intentionally a no-op: geometry is built lazily per data object.
        mylog.debug("Initializing Geometry Handler empty for now.")

    def get_smallest_dx(self):
        """
        Returns (in code units) the smallest cell size in the simulation.
        """
        return (
            self.dataset.domain_width
            / (self.dataset.domain_dimensions * 2 ** (self.max_level))
        ).min()

    def _get_particle_type_counts(self):
        # this could be done in the artio C interface without creating temporary
        # arrays but I don't want to touch that code
        # if a future brave soul wants to try, take a look at
        # `read_sfc_particles` in _artio_caller.pyx
        result = {}
        ad = self.ds.all_data()
        for ptype in self.ds.particle_types_raw:
            result[ptype] = ad[ptype, "PID"].size
        return result

    def convert(self, unit):
        """Return the conversion factor registered for *unit* on the dataset."""
        return self.dataset.conversion_factors[unit]

    def find_max(self, field, finest_levels=3):
        """
        Returns (value, center) of location of maximum for a given field.

        Results are memoized per (field, finest_levels) in _max_locations.
        """
        if (field, finest_levels) in self._max_locations:
            return self._max_locations[field, finest_levels]
        mv, pos = self.find_max_cell_location(field, finest_levels)
        self._max_locations[field, finest_levels] = (mv, pos)
        return mv, pos

    def find_max_cell_location(self, field, finest_levels=3):
        """Locate the maximum of *field*, optionally restricting the search to
        the ``finest_levels`` finest refinement levels (pass ``False`` to
        search all levels).  Returns (max_value, position ndarray)."""
        # NOTE(review): Index has no all_data() of its own; presumably this
        # resolves through the dataset proxy — confirm against callers.
        source = self.all_data()
        if finest_levels is not False:
            source.min_level = self.max_level - finest_levels
        mylog.debug("Searching for maximum value of %s", field)
        max_val, mx, my, mz = source.quantities["MaxLocation"](field)
        mylog.info("Max Value is %0.5e at %0.16f %0.16f %0.16f", max_val, mx, my, mz)
        self.ds.parameters[f"Max{field}Value"] = max_val
        self.ds.parameters[f"Max{field}Pos"] = f"{mx, my, mz}"
        return max_val, np.array((mx, my, mz), dtype="float64")

    def _detect_output_fields(self):
        # Combine fluid and particle fields into the full field list.
        self.fluid_field_list = self._detect_fluid_fields()
        self.particle_field_list = self._detect_particle_fields()
        self.field_list = self.fluid_field_list + self.particle_field_list
        mylog.debug("Detected fields: %s", (self.field_list,))

    def _detect_fluid_fields(self):
        return [("artio", f) for f in self.ds.artio_parameters["grid_variable_labels"]]

    def _detect_particle_fields(self):
        fields = set()
        for i, ptype in enumerate(self.ds.particle_types):
            if ptype == "all":
                break  # This will always be after all intrinsic
            for fname in self.ds.particle_variables[i]:
                fields.add((ptype, fname))
        return list(fields)

    def _identify_base_chunk(self, dobj):
        """Attach chunk info (a list of root-mesh/octree subsets, one pair per
        SFC range) to *dobj* and set its current chunk."""
        if getattr(dobj, "_chunk_info", None) is None:
            try:
                # Fast path: the object covers the whole domain.
                all_data = all(dobj.left_edge == self.ds.domain_left_edge) and all(
                    dobj.right_edge == self.ds.domain_right_edge
                )
            except Exception:
                # Objects without left_edge/right_edge take the selector path.
                all_data = False
            base_region = getattr(dobj, "base_region", dobj)
            sfc_start = getattr(dobj, "sfc_start", None)
            sfc_end = getattr(dobj, "sfc_end", None)
            nz = getattr(dobj, "_num_zones", 0)
            if all_data:
                mylog.debug("Selecting entire artio domain")
                list_sfc_ranges = self.ds._handle.root_sfc_ranges_all(
                    max_range_size=self.max_range
                )
            elif sfc_start is not None and sfc_end is not None:
                mylog.debug("Restricting to %s .. %s", sfc_start, sfc_end)
                list_sfc_ranges = [(sfc_start, sfc_end)]
            else:
                mylog.debug("Running selector on artio base grid")
                list_sfc_ranges = self.ds._handle.root_sfc_ranges(
                    dobj.selector, max_range_size=self.max_range
                )
            ci = []
            # v = np.array(list_sfc_ranges)
            # list_sfc_ranges = [ (v.min(), v.max()) ]
            for start, end in list_sfc_ranges:
                # Reuse a cached handler for this range if one exists;
                # construct_mesh() is expensive.
                if (start, end) in self.range_handlers.keys():
                    range_handler = self.range_handlers[start, end]
                else:
                    range_handler = ARTIOSFCRangeHandler(
                        self.ds.domain_dimensions,
                        self.ds.domain_left_edge,
                        self.ds.domain_right_edge,
                        self.ds._handle,
                        start,
                        end,
                    )
                    range_handler.construct_mesh()
                    self.range_handlers[start, end] = range_handler
                # nz encodes which subsets the requesting object wants:
                # 2 -> octree only, 1 -> root mesh only, anything else -> both.
                if nz != 2:
                    ci.append(
                        ARTIORootMeshSubset(
                            base_region,
                            start,
                            end,
                            range_handler.root_mesh_handler,
                            self.ds,
                        )
                    )
                if nz != 1 and range_handler.total_octs > 0:
                    ci.append(
                        ARTIOOctreeSubset(
                            base_region,
                            start,
                            end,
                            range_handler.octree_handler,
                            self.ds,
                        )
                    )
            dobj._chunk_info = ci
            if len(list_sfc_ranges) > 1:
                mylog.info("Created %d chunks for ARTIO", len(list_sfc_ranges))
        dobj._current_chunk = list(self._chunk_all(dobj))[0]

    def _data_size(self, dobj, dobjs):
        # Total cell count across the subsets in dobjs.
        size = 0
        for d in dobjs:
            size += d.data_size
        return size

    def _chunk_all(self, dobj):
        oobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
        yield YTDataChunk(dobj, "all", oobjs, None, cache=True)

    def _chunk_spatial(self, dobj, ngz, preload_fields=None):
        # Ghost zones are not implemented for ARTIO.
        if ngz > 0:
            raise NotImplementedError
        sobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
        for og in sobjs:
            # NOTE: the ngz > 0 branch below is unreachable (guarded above);
            # kept to mirror the other frontends' chunking pattern.
            if ngz > 0:
                g = og.retrieve_ghost_zones(ngz, [], smoothed=True)
            else:
                g = og
            yield YTDataChunk(dobj, "spatial", [g], None, cache=True)

    def _chunk_io(self, dobj, cache=True, local_only=False):
        # _current_chunk is made from identify_base_chunk
        oobjs = getattr(dobj._current_chunk, "objs", dobj._chunk_info)
        for chunk in oobjs:
            yield YTDataChunk(dobj, "io", [chunk], None, cache=cache)

    def _read_fluid_fields(self, fields, dobj, chunk=None):
        """Read *fields* for *dobj*; returns (fields_read, fields_to_generate)."""
        if len(fields) == 0:
            return {}, []
        if chunk is None:
            self._identify_base_chunk(dobj)
        fields_to_return = {}
        fields_to_read, fields_to_generate = self._split_fields(fields)
        if len(fields_to_read) == 0:
            return {}, fields_to_generate
        fields_to_return = self.io._read_fluid_selection(
            self._chunk_io(dobj), dobj.selector, fields_to_read
        )
        return fields_to_return, fields_to_generate

    def _icoords_to_fcoords(
        self,
        icoords: np.ndarray,
        ires: np.ndarray,
        axes: tuple[int, ...] | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        """
        Accepts icoords and ires and returns appropriate fcoords and fwidth.
        Mostly useful for cases where we have irregularly spaced or structured
        grids.
        """
        # Cell width per row: domain_width / (root dims * refine_by**level).
        dds = self.ds.domain_width[axes,] / (
            self.ds.domain_dimensions[axes,] * self.ds.refine_by ** ires[:, None]
        )
        # Cell centers: half-cell offset from the left edge.
        pos = (0.5 + icoords) * dds + self.ds.domain_left_edge[axes,]
        return pos, dds
class ARTIODataset(Dataset):
    """Dataset class for ARTIO filesets.

    *filename* is the ``.art`` header file; the fileset prefix is derived by
    stripping that extension.  *max_range* bounds the size of each SFC chunk.
    """

    _handle = None
    _index_class = ARTIOIndex
    _field_info_class = ARTIOFieldInfo

    def __init__(
        self,
        filename,
        dataset_type="artio",
        storage_filename=None,
        max_range=1024,
        units_override=None,
        unit_system="cgs",
        default_species_fields=None,
    ):
        # Guard against re-initialization once a fileset handle exists.
        if self._handle is not None:
            return
        self.max_range = max_range
        self.fluid_types += ("artio",)
        self._filename = filename
        # assumes filename ends in ".art" (4 chars) — enforced by _is_valid
        self._fileset_prefix = filename[:-4]
        self._handle = artio_fileset(bytes(self._fileset_prefix, "utf-8"))
        self.artio_parameters = self._handle.parameters
        # Here we want to initiate a traceback, if the reader is not built.
        Dataset.__init__(
            self,
            filename,
            dataset_type,
            units_override=units_override,
            unit_system=unit_system,
            default_species_fields=default_species_fields,
        )
        self.storage_filename = storage_filename

    def _set_code_unit_attributes(self):
        # Units come from the header-derived parameters filled in by
        # _parse_parameter_file; setdefaultattr respects units_override.
        setdefaultattr(self, "mass_unit", self.quan(self.parameters["unit_m"], "g"))
        setdefaultattr(self, "length_unit", self.quan(self.parameters["unit_l"], "cm"))
        setdefaultattr(self, "time_unit", self.quan(self.parameters["unit_t"], "s"))
        setdefaultattr(self, "velocity_unit", self.length_unit / self.time_unit)

    def _parse_parameter_file(self):
        """Populate dataset attributes (domain, particles, cosmology, units)
        from the ARTIO header parameters."""
        # hard-coded -- not provided by headers
        self.dimensionality = 3
        self.refine_by = 2
        self.parameters["HydroMethod"] = "artio"
        self.parameters["Time"] = 1.0  # default unit is 1...

        # read header
        self.num_grid = self._handle.num_grid
        self.domain_dimensions = np.ones(3, dtype="int32") * self.num_grid
        self.domain_left_edge = np.zeros(3, dtype="float64")
        self.domain_right_edge = np.ones(3, dtype="float64") * self.num_grid

        # TODO: detect if grid exists
        self.min_level = 0  # ART has min_level=0
        self.max_level = self.artio_parameters["grid_max_level"][0]

        # TODO: detect if particles exist
        if self._handle.has_particles:
            self.num_species = self.artio_parameters["num_particle_species"][0]
            # Every species carries at least PID and SPECIES.
            self.particle_variables = [
                ["PID", "SPECIES"] for i in range(self.num_species)
            ]

            # If multiple N-BODY species exist, they all have the same name,
            # which can lead to conflict if not renamed
            # A particle union will be created later to hold all N-BODY
            # particles and will take the name "N-BODY"
            labels = self.artio_parameters["particle_species_labels"]
            if labels.count("N-BODY") > 1:
                # Renames happen in place on the parameter list, so later
                # reads of particle_species_labels see the new names.
                for species, label in enumerate(labels):
                    if label == "N-BODY":
                        labels[species] = f"N-BODY_{species}"

            self.particle_types_raw = self.artio_parameters["particle_species_labels"]
            self.particle_types = tuple(self.particle_types_raw)

            for species in range(self.num_species):
                # Mass would be best as a derived field,
                # but wouldn't detect under 'all'
                label = self.artio_parameters["particle_species_labels"][species]
                if "N-BODY" in label:
                    self.particle_variables[species].append("MASS")

                if self.artio_parameters["num_primary_variables"][species] > 0:
                    self.particle_variables[species].extend(
                        self.artio_parameters[
                            "species_%02d_primary_variable_labels" % (species,)
                        ]
                    )
                if self.artio_parameters["num_secondary_variables"][species] > 0:
                    self.particle_variables[species].extend(
                        self.artio_parameters[
                            "species_%02d_secondary_variable_labels" % (species,)
                        ]
                    )
        else:
            self.num_species = 0
            self.particle_variables = []
            self.particle_types = ()
        self.particle_types_raw = self.particle_types

        self.current_time = self.quan(
            self._handle.tphys_from_tcode(self.artio_parameters["tl"][0]), "yr"
        )

        # detect cosmology
        if "abox" in self.artio_parameters:
            self.cosmological_simulation = True

            abox = self.artio_parameters["abox"][0]
            self.omega_lambda = self.artio_parameters["OmegaL"][0]
            self.omega_matter = self.artio_parameters["OmegaM"][0]
            self.hubble_constant = self.artio_parameters["hubble"][0]
            self.current_redshift = 1.0 / self.artio_parameters["auni"][0] - 1.0
            self.current_redshift_box = 1.0 / abox - 1.0

            self.parameters["initial_redshift"] = (
                1.0 / self.artio_parameters["auni_init"][0] - 1.0
            )
            self.parameters["CosmologyInitialRedshift"] = self.parameters[
                "initial_redshift"
            ]

            # Cosmological unit scalings carry factors of the box expansion
            # parameter abox.
            self.parameters["unit_m"] = self.artio_parameters["mass_unit"][0]
            self.parameters["unit_t"] = self.artio_parameters["time_unit"][0] * abox**2
            self.parameters["unit_l"] = self.artio_parameters["length_unit"][0] * abox

            if self.artio_parameters["DeltaDC"][0] != 0:
                mylog.warning(
                    "DeltaDC != 0, which implies auni != abox. "
                    "Be sure you understand which expansion parameter "
                    "is appropriate for your use! (Gnedin, Kravtsov, & Rudd 2011)"
                )
        else:
            self.cosmological_simulation = False

            self.parameters["unit_l"] = self.artio_parameters["length_unit"][0]
            self.parameters["unit_t"] = self.artio_parameters["time_unit"][0]
            self.parameters["unit_m"] = self.artio_parameters["mass_unit"][0]

        # hard coded assumption of 3D periodicity
        self._periodicity = (True, True, True)

    def create_field_info(self):
        super().create_field_info()
        # only make the particle union if there are multiple DM species.
        # If there are multiple, "N-BODY_0" will be the first species. If there
        # are not multiple, they will be all under "N-BODY"
        if "N-BODY_0" in self.particle_types_raw:
            dm_labels = [
                label for label in self.particle_types_raw if "N-BODY" in label
            ]
            # Use the N-BODY label for the union to be consistent with the
            # previous single mass N-BODY case, where this label was used for
            # all N-BODY particles by default
            pu = ParticleUnion("N-BODY", dm_labels)
            self.add_particle_union(pu)

    @classmethod
    def _is_valid(cls, filename: str, *args, **kwargs) -> bool:
        # a valid artio header file starts with a prefix and ends with .art
        name, _, ext = filename.rpartition(".")
        if ext != "art":
            return False
        return artio_is_valid(bytes(name, "utf-8"))