Source code for yt.frontends.exodus_ii.io

import numpy as np

from yt.utilities.file_handler import NetCDF4FileHandler
from yt.utilities.io_handler import BaseIOHandler


[docs] class IOHandlerExodusII(BaseIOHandler): _particle_reader = False _dataset_type = "exodus_ii" _INDEX_OFFSET = 1 def __init__(self, ds): self.filename = ds.index_filename exodus_ii_handler = NetCDF4FileHandler(self.filename) self.handler = exodus_ii_handler super().__init__(ds) self.node_fields = ds._get_nod_names() self.elem_fields = ds._get_elem_names() def _read_particle_coords(self, chunks, ptf): pass def _read_particle_fields(self, chunks, ptf, selector): pass def _read_fluid_selection(self, chunks, selector, fields, size): # This needs to allocate a set of arrays inside a dictionary, where the # keys are the (ftype, fname) tuples and the values are arrays that # have been masked using whatever selector method is appropriate. The # dict gets returned at the end and it should be flat, with selected # data. Note that if you're reading grid data, you might need to # special-case a grid selector object. with self.handler.open_ds() as ds: chunks = list(chunks) rv = {} for field in fields: ftype, fname = field if ftype == "all": ci = np.concatenate( [ mesh.connectivity_indices - self._INDEX_OFFSET for mesh in self.ds.index.mesh_union ] ) else: ci = ds.variables[ftype][:] - self._INDEX_OFFSET num_elem = ci.shape[0] if fname in self.node_fields: nodes_per_element = ci.shape[1] rv[field] = np.zeros((num_elem, nodes_per_element), dtype="float64") elif fname in self.elem_fields: rv[field] = np.zeros(num_elem, dtype="float64") for field in fields: ind = 0 ftype, fname = field if ftype == "all": mesh_ids = [mesh.mesh_id + 1 for mesh in self.ds.index.mesh_union] objs = list(self.ds.index.mesh_union) else: mesh_ids = [int(ftype.replace("connect", ""))] chunk = chunks[mesh_ids[0] - 1] objs = chunk.objs if fname in self.node_fields: field_ind = self.node_fields.index(fname) fdata = ds.variables["vals_nod_var%d" % (field_ind + 1)] for g in objs: ci = g.connectivity_indices - self._INDEX_OFFSET data = fdata[self.ds.step][ci] ind += g.select(selector, data, rv[field], ind) # caches if fname in self.elem_fields: field_ind = self.elem_fields.index(fname) for g, mesh_id in zip(objs, mesh_ids, strict=True): fdata = ds.variables[ "vals_elem_var%deb%s" % (field_ind + 1, mesh_id) ][:] data = fdata[self.ds.step, :] ind += g.select(selector, data, rv[field], ind) # caches rv[field] = rv[field][:ind] return rv def _read_chunk_data(self, chunk, fields): # This reads the data from a single chunk, and is only used for # caching. pass