Source code for yt.frontends.rockstar.data_structures

import glob
import os

import numpy as np

from yt.data_objects.static_output import ParticleDataset
from yt.frontends.halo_catalog.data_structures import HaloCatalogFile
from yt.funcs import setdefaultattr
from yt.geometry.particle_geometry_handler import ParticleIndex
from yt.utilities import fortran_utils as fpu
from yt.utilities.cosmology import Cosmology

from .definitions import header_dt
from .fields import RockstarFieldInfo


[docs] class RockstarBinaryFile(HaloCatalogFile): def __init__(self, ds, io, filename, file_id, range): with open(filename, "rb") as f: self.header = fpu.read_cattrs(f, header_dt, "=") self._position_offset = f.tell() f.seek(0, os.SEEK_END) self._file_size = f.tell() super().__init__(ds, io, filename, file_id, range) def _read_particle_positions(self, ptype, f=None): """ Read all particle positions in this file. """ if f is None: close = True f = open(self.filename, "rb") else: close = False pcount = self.header["num_halos"] pos = np.empty((pcount, 3), dtype="float64") f.seek(self._position_offset, os.SEEK_SET) halos = np.fromfile(f, dtype=self.io._halo_dt, count=pcount) for i, ax in enumerate("xyz"): pos[:, i] = halos[f"particle_position_{ax}"].astype("float64") if close: f.close() return pos
[docs] class RockstarDataset(ParticleDataset): _index_class = ParticleIndex _file_class = RockstarBinaryFile _field_info_class = RockstarFieldInfo _suffix = ".bin" def __init__( self, filename, dataset_type="rockstar_binary", units_override=None, unit_system="cgs", index_order=None, index_filename=None, ): super().__init__( filename, dataset_type, units_override=units_override, unit_system=unit_system, ) def _parse_parameter_file(self): with open(self.parameter_filename, "rb") as f: hvals = fpu.read_cattrs(f, header_dt) hvals.pop("unused") self.dimensionality = 3 self.refine_by = 2 prefix = ".".join(self.parameter_filename.rsplit(".", 2)[:-2]) self.filename_template = f"{prefix}.%(num)s{self._suffix}" self.file_count = len(glob.glob(prefix + ".*" + self._suffix)) # Now we can set up things we already know. self.cosmological_simulation = 1 self.current_redshift = (1.0 / hvals["scale"]) - 1.0 self.hubble_constant = hvals["h0"] self.omega_lambda = hvals["Ol"] self.omega_matter = hvals["Om"] cosmo = Cosmology( hubble_constant=self.hubble_constant, omega_matter=self.omega_matter, omega_lambda=self.omega_lambda, ) self.current_time = cosmo.lookback_time(self.current_redshift, 1e6).in_units( "s" ) self._periodicity = (True, True, True) self.particle_types = "halos" self.particle_types_raw = "halos" self.domain_left_edge = np.array([0.0, 0.0, 0.0]) self.domain_right_edge = np.array([hvals["box_size"]] * 3) self.domain_dimensions = np.ones(3, "int32") self.parameters.update(hvals) def _set_code_unit_attributes(self): z = self.current_redshift setdefaultattr(self, "length_unit", self.quan(1.0 / (1.0 + z), "Mpc / h")) setdefaultattr(self, "mass_unit", self.quan(1.0, "Msun / h")) setdefaultattr(self, "velocity_unit", self.quan(1.0, "km / s")) setdefaultattr(self, "time_unit", self.length_unit / self.velocity_unit) @classmethod def _is_valid(cls, filename: str, *args, **kwargs) -> bool: if not filename.endswith(".bin"): return False try: with open(filename, mode="rb") as f: header = fpu.read_cattrs(f, header_dt) except OSError: return False else: return header["magic"] == 18077126535843729616