Source code for yt.frontends.flash.io

from functools import cached_property
from itertools import groupby

import numpy as np

from yt.geometry.selection_routines import AlwaysSelector
from yt.utilities.io_handler import BaseIOHandler


# http://stackoverflow.com/questions/2361945/detecting-consecutive-integers-in-a-list
def particle_sequences(grids):
    g_iter = sorted(grids, key=lambda g: g.id)
    for _k, g in groupby(enumerate(g_iter), lambda i_x: i_x[0] - i_x[1].id):
        seq = [v[1] for v in g]
        yield seq[0], seq[-1]

def grid_sequences(grids):
    g_iter = sorted(grids, key=lambda g: g.id)
    for _k, g in groupby(enumerate(g_iter), lambda i_x1: i_x1[0] - i_x1[1].id):
        seq = [v[1] for v in g]
        yield seq
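
# A minimal illustrative sketch (not part of the module), assuming a stand-in
# grid object that only carries an ``id``: the two helpers above collapse runs
# of consecutive grid ids so that I/O can be issued as a few contiguous HDF5
# slices rather than one read per grid.
#
#     class FakeGrid:
#         def __init__(self, id):
#             self.id = id
#
#     grids = [FakeGrid(i) for i in (2, 3, 4, 7, 8, 10)]
#     # particle_sequences yields the first and last grid of each run:
#     #   (grid 2, grid 4), (grid 7, grid 8), (grid 10, grid 10)
#     # grid_sequences yields the runs themselves, here of lengths 3, 2, 1.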

def determine_particle_fields(handle):
    try:
        particle_fields = [
            s[0].decode("ascii", "ignore").strip()
            for s in handle["/particle names"][:]
        ]
        _particle_fields = {"particle_" + s: i for i, s in enumerate(particle_fields)}
    except KeyError:
        _particle_fields = {}
    return _particle_fields
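
# A minimal illustrative sketch (not part of the module); the names below are
# hypothetical.  "/particle names" stores one padded byte-string per column of
# "/tracer particles"; the helper maps each stripped name, prefixed with
# "particle_", to its column index:
#
#     # handle["/particle names"][:]  ~  [[b"posx "], [b"posy "], [b"posz "], [b"tag  "]]
#     # determine_particle_fields(handle)
#     #   -> {"particle_posx": 0, "particle_posy": 1, "particle_posz": 2, "particle_tag": 3}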

def _conditionally_split_arrays(inp_arrays, condition):
    output_true = []
    output_false = []
    for arr in inp_arrays:
        output_true.append(arr[condition])
        output_false.append(arr[~condition])
    return output_true, output_false
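
# A minimal illustrative sketch (not part of the module): every input array is
# partitioned by the same boolean mask, returning the "True" selections and the
# "False" selections as two parallel lists.
#
#     a = np.array([1, 2, 3, 4])
#     b = np.array([10, 20, 30, 40])
#     cond = np.array([True, False, True, False])
#     true_part, false_part = _conditionally_split_arrays([a, b], cond)
#     # true_part  -> [array([1, 3]), array([10, 30])]
#     # false_part -> [array([2, 4]), array([20, 40])]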

class IOHandlerFLASH(BaseIOHandler):
    _particle_reader = False
    _dataset_type = "flash_hdf5"

    def __init__(self, ds):
        super().__init__(ds)
        # Now we cache the particle fields
        self._handle = ds._handle
        self._particle_handle = ds._particle_handle
        self._particle_fields = determine_particle_fields(self._particle_handle)

    def _read_particles(
        self, fields_to_read, type, args, grid_list, count_list, conv_factors
    ):
        pass

    def io_iter(self, chunks, fields):
        f = self._handle
        for chunk in chunks:
            for field in fields:
                # Note that we *prefer* to iterate over the fields on the
                # outside; here, though, we're iterating over them on the
                # inside because we may exhaust our chunks.
                ftype, fname = field
                ds = f[f"/{fname}"]
                for gs in grid_sequences(chunk.objs):
                    start = gs[0].id - gs[0]._id_offset
                    end = gs[-1].id - gs[-1]._id_offset + 1
                    data = ds[start:end, :, :, :]
                    for i, g in enumerate(gs):
                        yield field, g, self._read_obj_field(g, field, (data, i))

    def _read_particle_coords(self, chunks, ptf):
        chunks = list(chunks)
        f_part = self._particle_handle
        p_ind = self.ds.index._particle_indices
        px, py, pz = (self._particle_fields[f"particle_pos{ax}"] for ax in "xyz")
        pblk = self._particle_fields["particle_blk"]
        blockless_buffer = self.ds.index._blockless_particle_count
        p_fields = f_part["/tracer particles"]
        assert len(ptf) == 1
        ptype = list(ptf.keys())[0]
        bxyz = []
        # We need to track all the particles that don't have blocks and make
        # sure they get yielded too.  But, we also want our particles to
        # largely line up with the grids they are resident in.  So we keep a
        # buffer of particles.  That buffer is checked for any "blockless"
        # particles, which get yielded at the end.
        for chunk in chunks:
            start = end = None
            for g1, g2 in particle_sequences(chunk.objs):
                start = p_ind[g1.id - g1._id_offset]
                end_nobuff = p_ind[g2.id - g2._id_offset + 1]
                end = end_nobuff + blockless_buffer
                maxlen = end_nobuff - start
                blk = np.asarray(p_fields[start:end, pblk], dtype="=i8") >= 0
                # Can we use something like np.choose here?
                xyz = [
                    np.asarray(p_fields[start:end, _], dtype="=f8")
                    for _ in (px, py, pz)
                ]
                (x, y, z), _xyz = _conditionally_split_arrays(xyz, blk)
                if _xyz[0].size > 0:
                    bxyz.append(_xyz)
                yield ptype, (x[:maxlen], y[:maxlen], z[:maxlen]), 0.0
        if len(bxyz) == 0:
            return
        bxyz = np.concatenate(bxyz, axis=-1)
        yield ptype, (bxyz[0, :], bxyz[1, :], bxyz[2, :]), 0.0

    def _read_particle_fields(self, chunks, ptf, selector):
        chunks = list(chunks)
        f_part = self._particle_handle
        p_ind = self.ds.index._particle_indices
        px, py, pz = (self._particle_fields[f"particle_pos{ax}"] for ax in "xyz")
        pblk = self._particle_fields["particle_blk"]
        blockless_buffer = self.ds.index._blockless_particle_count
        p_fields = f_part["/tracer particles"]
        assert len(ptf) == 1
        ptype = list(ptf.keys())[0]
        field_list = ptf[ptype]
        bxyz = []
        bfields = {(ptype, field): [] for field in field_list}
        for chunk in chunks:
            for g1, g2 in particle_sequences(chunk.objs):
                start = p_ind[g1.id - g1._id_offset]
                end_nobuff = p_ind[g2.id - g2._id_offset + 1]
                end = end_nobuff + blockless_buffer
                maxlen = end_nobuff - start
                blk = np.asarray(p_fields[start:end, pblk], dtype="=i8") >= 0
                xyz = [
                    np.asarray(p_fields[start:end, _], dtype="=f8")
                    for _ in (px, py, pz)
                ]
                (x, y, z), _xyz = _conditionally_split_arrays(xyz, blk)
                x, y, z = (_[:maxlen] for _ in (x, y, z))
                if _xyz[0].size > 0:
                    bxyz.append(_xyz)
                mask = selector.select_points(x, y, z, 0.0)
                blockless = (_xyz[0] > 0).any()
                # This checks whether none of the particles within these
                # blocks are included in the mask.  We need to also allow for
                # blockless particles to be selected.
                if mask is None and not blockless:
                    continue
                for field in field_list:
                    fi = self._particle_fields[field]
                    (data,), (bdata,) = _conditionally_split_arrays(
                        [p_fields[start:end, fi]], blk
                    )
                    # Note that we have to apply the mask to the split array,
                    # since we select on the split array.
                    if mask is not None:
                        # Be sure to truncate off the buffer length!!
                        yield (ptype, field), data[:maxlen][mask]
                    if blockless:
                        bfields[ptype, field].append(bdata)
        if len(bxyz) == 0:
            return
        bxyz = np.concatenate(bxyz, axis=-1)
        mask = selector.select_points(bxyz[0, :], bxyz[1, :], bxyz[2, :], 0.0)
        if mask is None:
            return
        for field in field_list:
            data_field = np.concatenate(bfields[ptype, field], axis=-1)
            yield (ptype, field), data_field[mask]

    def _read_obj_field(self, obj, field, ds_offset=None):
        if ds_offset is None:
            ds_offset = (None, -1)
        ds, offset = ds_offset
        # our context here includes datasets and whatnot that are opened in
        # the hdf5 file
        if ds is None:
            ds = self._handle[f"/{field[1]}"]
        if offset == -1:
            data = ds[obj.id - obj._id_offset, :, :, :].transpose()
        else:
            data = ds[offset, :, :, :].transpose()
        return data.astype("=f8")

    def _read_chunk_data(self, chunk, fields):
        f = self._handle
        rv = {}
        for g in chunk.objs:
            rv[g.id] = {}
        # Split into particles and non-particles
        fluid_fields, particle_fields = [], []
        for ftype, fname in fields:
            if ftype in self.ds.particle_types:
                particle_fields.append((ftype, fname))
            else:
                fluid_fields.append((ftype, fname))
        if len(particle_fields) > 0:
            selector = AlwaysSelector(self.ds)
            rv.update(
                self._read_particle_selection([chunk], selector, particle_fields)
            )
        if len(fluid_fields) == 0:
            return rv
        for field in fluid_fields:
            ftype, fname = field
            ds = f[f"/{fname}"]
            for gs in grid_sequences(chunk.objs):
                start = gs[0].id - gs[0]._id_offset
                end = gs[-1].id - gs[-1]._id_offset + 1
                data = ds[start:end, :, :, :].transpose()
                for i, g in enumerate(gs):
                    rv[g.id][field] = np.asarray(data[..., i], "=f8")
        return rv

class IOHandlerFLASHParticle(BaseIOHandler):
    _particle_reader = True
    _dataset_type = "flash_particle_hdf5"

    def __init__(self, ds):
        super().__init__(ds)
        # Now we cache the particle fields
        self._handle = ds._handle
        self._particle_fields = determine_particle_fields(self._handle)
        self._position_fields = [
            self._particle_fields[f"particle_pos{ax}"] for ax in "xyz"
        ]

    @property
    def chunksize(self):
        return 32**3

    def _read_fluid_selection(self, chunks, selector, fields, size):
        raise NotImplementedError

    def _read_particle_coords(self, chunks, ptf):
        chunks = list(chunks)
        data_files = set()
        assert len(ptf) == 1
        for chunk in chunks:
            for obj in chunk.objs:
                data_files.update(obj.data_files)
        px, py, pz = self._position_fields
        p_fields = self._handle["/tracer particles"]
        for data_file in sorted(data_files, key=lambda x: (x.filename, x.start)):
            pxyz = np.asarray(
                p_fields[data_file.start : data_file.end, [px, py, pz]], dtype="=f8"
            )
            yield "io", pxyz.T, 0.0

    def _yield_coordinates(self, data_file, needed_ptype=None):
        px, py, pz = self._position_fields
        p_fields = self._handle["/tracer particles"]
        pxyz = np.asarray(
            p_fields[data_file.start : data_file.end, [px, py, pz]], dtype="=f8"
        )
        yield ("io", pxyz)

    def _read_particle_data_file(self, data_file, ptf, selector=None):
        px, py, pz = self._position_fields
        p_fields = self._handle["/tracer particles"]
        si, ei = data_file.start, data_file.end
        data_return = {}
        # This should just be a single item
        for ptype, field_list in sorted(ptf.items()):
            x = np.asarray(p_fields[si:ei, px], dtype="=f8")
            y = np.asarray(p_fields[si:ei, py], dtype="=f8")
            z = np.asarray(p_fields[si:ei, pz], dtype="=f8")
            if selector:
                mask = selector.select_points(x, y, z, 0.0)
            del x, y, z
            if mask is None:
                continue
            for field in field_list:
                fi = self._particle_fields[field]
                data = p_fields[si:ei, fi]
                if selector:
                    data = data[mask]
                data_return[(ptype, field)] = data
        return data_return

    def _read_particle_fields(self, chunks, ptf, selector):
        assert len(ptf) == 1
        yield from super()._read_particle_fields(chunks, ptf, selector)

    @cached_property
    def _pcount(self):
        return self._handle["/localnp"][:].sum()

    def _count_particles(self, data_file):
        si, ei = data_file.start, data_file.end
        pcount = self._pcount
        if None not in (si, ei):
            pcount = np.clip(pcount - si, 0, ei - si)
        return {"io": pcount}

    def _identify_fields(self, data_file):
        fields = [("io", field) for field in self._particle_fields]
        return fields, {}
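
# A minimal, hedged usage sketch (not part of the module): yt selects these
# handlers through their ``_dataset_type`` attribute when a FLASH output is
# loaded, so they are normally exercised indirectly.  The file name below is
# hypothetical.
#
#     import yt
#     ds = yt.load("sedov_hdf5_plt_cnt_0003")  # hypothetical FLASH plot file
#     ad = ds.all_data()
#     dens = ad["flash", "dens"]  # fluid read goes through IOHandlerFLASH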