Source code for yt.frontends.stream.io

import numpy as np

from yt.utilities.exceptions import YTDomainOverflow
from yt.utilities.io_handler import BaseIOHandler, BaseParticleIOHandler
from yt.utilities.logger import ytLogger as mylog


[docs] class IOHandlerStream(BaseIOHandler): _dataset_type = "stream" _vector_fields = {"particle_velocity": 3, "particle_position": 3} def __init__(self, ds): self.fields = ds.stream_handler.fields self.field_units = ds.stream_handler.field_units super().__init__(ds) def _read_data_set(self, grid, field): # This is where we implement processor-locking tr = self.fields[grid.id][field] if callable(tr): tr = tr(grid, field) # If it's particles, we copy. if len(tr.shape) == 1: return tr.copy() # New in-place unit conversion breaks if we don't copy first return tr def _read_fluid_selection(self, chunks, selector, fields, size): chunks = list(chunks) if any((ftype not in self.ds.fluid_types for ftype, fname in fields)): raise NotImplementedError rv = {} for field in fields: rv[field] = self.ds.arr(np.empty(size, dtype="float64")) ng = sum(len(c.objs) for c in chunks) mylog.debug( "Reading %s cells of %s fields in %s blocks", size, [f2 for f1, f2 in fields], ng, ) for field in fields: ftype, fname = field ind = 0 for chunk in chunks: for g in chunk.objs: ds = self.fields[g.id][ftype, fname] if callable(ds): ds = ds(g, field) ind += g.select(selector, ds, rv[field], ind) # caches return rv def _read_particle_coords(self, chunks, ptf): chunks = list(chunks) for chunk in chunks: for g in chunk.objs: if g.NumberOfParticles == 0: continue gf = self.fields[g.id] for ptype in sorted(ptf): if (ptype, "particle_position") in gf: x, y, z = gf[ptype, "particle_position"].T else: x, y, z = ( gf[ptype, f"particle_position_{ax}"] for ax in self.ds.coordinates.axis_order ) yield ptype, (x, y, z), 0.0 def _read_particle_fields(self, chunks, ptf, selector): chunks = list(chunks) for chunk in chunks: for g in chunk.objs: if g.NumberOfParticles == 0: continue gf = self.fields[g.id] for ptype, field_list in sorted(ptf.items()): if (ptype, "particle_position") in gf: x, y, z = gf[ptype, "particle_position"].T else: x, y, z = ( gf[ptype, f"particle_position_{ax}"] for ax in self.ds.coordinates.axis_order ) mask = selector.select_points(x, y, z, 0.0) if mask is None: continue for field in field_list: data = np.asarray(gf[ptype, field]) yield (ptype, field), data[mask] @property def _read_exception(self): return KeyError
[docs] class StreamParticleIOHandler(BaseParticleIOHandler): _dataset_type = "stream_particles" _vector_fields = {"particle_velocity": 3, "particle_position": 3} def __init__(self, ds): self.fields = ds.stream_handler.fields super().__init__(ds) def _read_particle_coords(self, chunks, ptf): for data_file in self._sorted_chunk_iterator(chunks): f = self.fields[data_file.filename] # This double-reads for ptype in sorted(ptf): yield ( ptype, ( f[ptype, "particle_position_x"], f[ptype, "particle_position_y"], f[ptype, "particle_position_z"], ), 0.0, ) def _read_smoothing_length(self, chunks, ptf, ptype): for data_file in self._sorted_chunk_iterator(chunks): f = self.fields[data_file.filename] return f[ptype, "smoothing_length"] def _read_particle_data_file(self, data_file, ptf, selector=None): return_data = {} f = self.fields[data_file.filename] for ptype, field_list in sorted(ptf.items()): if (ptype, "particle_position") in f: ppos = f[ptype, "particle_position"] x = ppos[:, 0] y = ppos[:, 1] z = ppos[:, 2] else: x, y, z = (f[ptype, f"particle_position_{ax}"] for ax in "xyz") if (ptype, "smoothing_length") in self.ds.field_list: hsml = f[ptype, "smoothing_length"] else: hsml = 0.0 if selector: mask = selector.select_points(x, y, z, hsml) if mask is None: continue for field in field_list: data = f[ptype, field] if selector: data = data[mask] return_data[ptype, field] = data return return_data def _yield_coordinates(self, data_file, needed_ptype=None): # self.fields[g.id][fname] is the pattern here for ptype in self.ds.particle_types_raw: if needed_ptype is not None and needed_ptype is not ptype: continue try: pos = np.column_stack( [ self.fields[data_file.filename][ (ptype, f"particle_position_{ax}") ] for ax in "xyz" ] ) except KeyError: pos = self.fields[data_file.filename][ptype, "particle_position"] if np.any(pos.min(axis=0) < data_file.ds.domain_left_edge) or np.any( pos.max(axis=0) > data_file.ds.domain_right_edge ): raise YTDomainOverflow( pos.min(axis=0), pos.max(axis=0), data_file.ds.domain_left_edge, data_file.ds.domain_right_edge, ) yield ptype, pos def _get_smoothing_length(self, data_file, dtype, shape): ptype = self.ds._sph_ptypes[0] return self.fields[data_file.filename][ptype, "smoothing_length"] def _count_particles(self, data_file): pcount = {} for ptype in self.ds.particle_types_raw: pcount[ptype] = 0 # stream datasets only have one "file" if data_file.file_id > 0: return pcount for ptype in self.ds.particle_types_raw: d = self.fields[data_file.filename] try: pcount[ptype] = d[ptype, "particle_position_x"].size except KeyError: pcount[ptype] = d[ptype, "particle_position"].shape[0] return pcount def _identify_fields(self, data_file): return self.fields[data_file.filename].keys(), {}
[docs] class IOHandlerStreamHexahedral(BaseIOHandler): _dataset_type = "stream_hexahedral" _vector_fields = {"particle_velocity": 3, "particle_position": 3} def __init__(self, ds): self.fields = ds.stream_handler.fields super().__init__(ds) def _read_fluid_selection(self, chunks, selector, fields, size): chunks = list(chunks) assert len(chunks) == 1 chunk = chunks[0] rv = {} for field in fields: ftype, fname = field rv[field] = np.empty(size, dtype="float64") ngrids = sum(len(chunk.objs) for chunk in chunks) mylog.debug( "Reading %s cells of %s fields in %s blocks", size, [fn for ft, fn in fields], ngrids, ) for field in fields: ind = 0 ftype, fname = field for chunk in chunks: for g in chunk.objs: ds = self.fields[g.mesh_id].get(field, None) if ds is None: ds = self.fields[g.mesh_id][fname] ind += g.select(selector, ds, rv[field], ind) # caches return rv
[docs] class IOHandlerStreamOctree(BaseIOHandler): _dataset_type = "stream_octree" _vector_fields = {"particle_velocity": 3, "particle_position": 3} def __init__(self, ds): self.fields = ds.stream_handler.fields super().__init__(ds) def _read_fluid_selection(self, chunks, selector, fields, size): rv = {} ind = 0 chunks = list(chunks) assert len(chunks) == 1 for chunk in chunks: assert len(chunk.objs) == 1 for subset in chunk.objs: field_vals = {} for field in fields: field_vals[field] = self.fields[ subset.domain_id - subset._domain_offset ][field] subset.fill(field_vals, rv, selector, ind) return rv
[docs] class IOHandlerStreamUnstructured(BaseIOHandler): _dataset_type = "stream_unstructured" def __init__(self, ds): self.fields = ds.stream_handler.fields super().__init__(ds) def _read_fluid_selection(self, chunks, selector, fields, size): chunks = list(chunks) rv = {} for field in fields: ftype, fname = field if ftype == "all": ci = np.concatenate( [mesh.connectivity_indices for mesh in self.ds.index.mesh_union] ) else: mesh_id = int(ftype[-1]) - 1 m = self.ds.index.meshes[mesh_id] ci = m.connectivity_indices num_elem = ci.shape[0] if fname in self.ds._node_fields: nodes_per_element = ci.shape[1] rv[field] = np.empty((num_elem, nodes_per_element), dtype="float64") else: rv[field] = np.empty(num_elem, dtype="float64") for field in fields: ind = 0 ftype, fname = field if ftype == "all": objs = list(self.ds.index.mesh_union) else: mesh_ids = [int(ftype[-1])] chunk = chunks[mesh_ids[0] - 1] objs = chunk.objs for g in objs: ds = self.fields[g.mesh_id].get(field, None) if ds is None: f = ("connect%d" % (g.mesh_id + 1), fname) ds = self.fields[g.mesh_id][f] ind += g.select(selector, ds, rv[field], ind) # caches rv[field] = rv[field][:ind] return rv