Source code for yt.frontends.adaptahop.io

"""
AdaptaHOP data-file handling function




"""

from functools import partial
from operator import attrgetter

import numpy as np

from yt.utilities.cython_fortran_utils import FortranFile
from yt.utilities.io_handler import BaseParticleIOHandler

from .definitions import ATTR_T


[docs] class IOHandlerAdaptaHOPBinary(BaseParticleIOHandler): _dataset_type = "adaptahop_binary" _offsets = None # Location of halos in the file _particle_positions = None # Buffer of halo position def _read_fluid_selection(self, chunks, selector, fields, size): raise NotImplementedError def _read_chunk_data(self, chunk, fields): raise NotImplementedError def _yield_coordinates(self, data_file): yield "halos", self._get_particle_positions() def _read_particle_coords(self, chunks, ptf): # This will read chunks and yield the results. # Only support halo reading for now. assert len(ptf) == 1 assert list(ptf.keys())[0] == "halos" ptype = "halos" for data_file in self._sorted_chunk_iterator(chunks): pcount = ( data_file.ds.parameters["nhalos"] + data_file.ds.parameters["nsubs"] ) if pcount == 0: continue pos = self._get_particle_positions() yield ptype, [pos[:, i] for i in range(3)], 0.0 def _read_particle_fields(self, chunks, ptf, selector): # Now we have all the sizes, and we can allocate # Only support halo reading for now. assert len(ptf) == 1 assert list(ptf.keys())[0] == "halos" def iterate_over_attributes(attr_list): for attr, *_ in attr_list: if isinstance(attr, tuple): yield from attr else: yield attr halo_attributes = self.ds._halo_attributes attr_pos = partial(_find_attr_position, halo_attributes=halo_attributes) for data_file in self._sorted_chunk_iterator(chunks): pcount = ( data_file.ds.parameters["nhalos"] + data_file.ds.parameters["nsubs"] ) if pcount == 0: continue ptype = "halos" field_list0 = sorted(ptf[ptype], key=attr_pos) field_list_pos = [f"raw_position_{k}" for k in "xyz"] field_list = sorted(set(field_list0 + field_list_pos), key=attr_pos) with FortranFile(self.ds.parameter_filename) as fpu: params = fpu.read_attrs(self.ds._header_attributes) todo = _todo_from_attributes(field_list, self.ds._halo_attributes) nhalos = params["nhalos"] + params["nsubs"] data = np.zeros((nhalos, len(field_list))) for ihalo in range(nhalos): jj = 0 for it in todo: if isinstance(it, int): fpu.skip(it) else: tmp = fpu.read_attrs(it) for key in iterate_over_attributes(it): v = tmp[key] if key not in field_list: continue data[ihalo, jj] = v jj += 1 ipos = [field_list.index(k) for k in field_list_pos] w = self.ds.domain_width.to("code_length")[0].value / 2 x, y, z = (data[:, i] + w for i in ipos) mask = selector.select_points(x, y, z, 0.0) del x, y, z if mask is None: continue for field in field_list0: i = field_list.index(field) yield (ptype, field), data[mask, i] def _count_particles(self, data_file): nhalos = data_file.ds.parameters["nhalos"] + data_file.ds.parameters["nsubs"] return {"halos": nhalos} def _identify_fields(self, data_file): fields = [] for attr, _1, _2 in self.ds._halo_attributes: if isinstance(attr, str): fields.append(("halos", attr)) else: for a in attr: fields.append(("halos", a)) return fields, {} # ----------------------------------------------------- # Specific to AdaptaHOP def _get_particle_positions(self): """Read the particles and return them in code_units""" data = getattr(self, "_particle_positions", None) if data is not None: return data with FortranFile(self.ds.parameter_filename) as fpu: params = fpu.read_attrs(self.ds._header_attributes) todo = _todo_from_attributes( ( "particle_identifier", "raw_position_x", "raw_position_y", "raw_position_z", ), self.ds._halo_attributes, ) nhalos = params["nhalos"] + params["nsubs"] data = np.zeros((nhalos, 3)) offset_map = np.zeros((nhalos, 2), dtype="int64") for ihalo in range(nhalos): ipos = fpu.tell() for it in todo: if isinstance(it, int): fpu.skip(it) elif it[0][0] != "particle_identifier": # Small optimisation here: we can read as vector # dt = fpu.read_attrs(it) # data[ihalo, 0] = dt['particle_position_x'] # data[ihalo, 1] = dt['particle_position_y'] # data[ihalo, 2] = dt['particle_position_z'] data[ihalo, :] = fpu.read_vector(it[0][-1]) else: halo_id = fpu.read_int() offset_map[ihalo, 0] = halo_id offset_map[ihalo, 1] = ipos data = self.ds.arr(data, "code_length") + self.ds.domain_width / 2 # Make sure halos are loaded in increasing halo_id order assert np.all(np.diff(offset_map[:, 0]) > 0) # Cache particle positions as one do not expect a large number of halos anyway self._particle_positions = data self._offsets = offset_map return data
[docs] def members(self, ihalo): offset = self._offsets[ihalo, 1] todo = _todo_from_attributes(("particle_identities",), self.ds._halo_attributes) with FortranFile(self.ds.parameter_filename) as fpu: fpu.seek(offset) if isinstance(todo[0], int): fpu.skip(todo.pop(0)) members = fpu.read_attrs(todo.pop(0))["particle_identities"] return members
def _sorted_chunk_iterator(self, chunks): data_files = self._get_data_files(chunks) yield from sorted(data_files, key=attrgetter("filename"))
def _todo_from_attributes(attributes: ATTR_T, halo_attributes: ATTR_T): # Helper function to generate a list of read-skip instructions given a list of # attributes. This is used to skip fields most of the fields when reading # the tree_brick files. iskip = 0 todo: list[int | list[tuple[tuple[str, ...] | str, int, str]]] = [] attributes = tuple(set(attributes)) for i, (attrs, l, k) in enumerate(halo_attributes): attrs_list: tuple[str, ...] if isinstance(attrs, tuple): if not all(isinstance(a, str) for a in attrs): raise TypeError(f"Expected a single str or a tuple of str, got {attrs}") attrs_list = attrs else: attrs_list = (attrs,) ok = False for attr in attrs_list: if attr in attributes: ok = True break if i == 0: if ok: state = "read" todo.append([]) else: state = "skip" if ok: if state == "skip": # Switched from skip to read, store skip information and start # new read list todo.append(iskip) todo.append([]) iskip = 0 if not isinstance(todo[-1], list): raise TypeError todo[-1].append((attrs, l, k)) state = "read" else: iskip += 1 state = "skip" if state == "skip" and iskip > 0: todo.append(iskip) return todo def _find_attr_position(key, halo_attributes: ATTR_T) -> int: j = 0 for attrs, *_ in halo_attributes: if not isinstance(attrs, tuple): attrs = (attrs,) for a in attrs: if key == a: return j j += 1 raise KeyError