Source code for yt.frontends.tipsy.data_structures

import glob
import os
import struct

import numpy as np

from yt.data_objects.static_output import ParticleFile
from yt.frontends.sph.data_structures import SPHDataset, SPHParticleIndex
from yt.utilities.cosmology import Cosmology
from yt.utilities.physical_constants import G
from yt.utilities.physical_ratios import cm_per_kpc

from .fields import TipsyFieldInfo


[docs] class TipsyFile(ParticleFile): def __init__(self, ds, io, filename, file_id, range=None): super().__init__(ds, io, filename, file_id, range) if not hasattr(io, "_field_list"): io._create_dtypes(self) # Check automatically what the domain size is io._update_domain(self) self._calculate_offsets(io._field_list) def _calculate_offsets(self, field_list, pcounts=None): self.field_offsets = self.io._calculate_particle_offsets(self, None)
[docs] class TipsyDataset(SPHDataset): _index_class = SPHParticleIndex _file_class = TipsyFile _field_info_class = TipsyFieldInfo _particle_mass_name = "Mass" _particle_coordinates_name = "Coordinates" _sph_ptypes = ("Gas",) _header_spec = ( ("time", "d"), ("nbodies", "i"), ("ndim", "i"), ("nsph", "i"), ("ndark", "i"), ("nstar", "i"), ("dummy", "i"), ) def __init__( self, filename, dataset_type="tipsy", field_dtypes=None, unit_base=None, parameter_file=None, cosmology_parameters=None, index_order=None, index_filename=None, kdtree_filename=None, kernel_name=None, bounding_box=None, units_override=None, unit_system="cgs", default_species_fields=None, ): # Because Tipsy outputs don't have a fixed domain boundary, one can # specify a bounding box which effectively gives a domain_left_edge # and domain_right_edge self.bounding_box = bounding_box self.filter_bbox = bounding_box is not None if field_dtypes is None: field_dtypes = {} success, self.endian = self._validate_header(filename) if not success: print("SOMETHING HAS GONE WRONG. NBODIES != SUM PARTICLES.") print( "{} != (sum == {} + {} + {})".format( self.parameters["nbodies"], self.parameters["nsph"], self.parameters["ndark"], self.parameters["nstar"], ) ) print("Often this can be fixed by changing the 'endian' parameter.") print("This defaults to '>' but may in fact be '<'.") raise RuntimeError self.storage_filename = None # My understanding is that dtypes are set on a field by field basis, # not on a (particle type, field) basis self._field_dtypes = field_dtypes self._unit_base = unit_base or {} self._cosmology_parameters = cosmology_parameters if parameter_file is not None: parameter_file = os.path.abspath(parameter_file) self._param_file = parameter_file filename = os.path.abspath(filename) if units_override is not None: raise RuntimeError( "units_override is not supported for TipsyDataset. " + "Use unit_base instead." ) super().__init__( filename, dataset_type=dataset_type, unit_system=unit_system, index_order=index_order, index_filename=index_filename, kdtree_filename=kdtree_filename, kernel_name=kernel_name, default_species_fields=default_species_fields, ) def __str__(self): return os.path.basename(self.parameter_filename) def _parse_parameter_file(self): # Parsing the header of the tipsy file, from this we obtain # the snapshot time and particle counts. f = open(self.parameter_filename, "rb") hh = self.endian + "".join(str(b) for a, b in self._header_spec) hvals = { a: c for (a, b), c in zip( self._header_spec, struct.unpack(hh, f.read(struct.calcsize(hh))), strict=True, ) } self.parameters.update(hvals) self._header_offset = f.tell() # These are always true, for now. self.dimensionality = 3 self.refine_by = 2 self.parameters["HydroMethod"] = "sph" # Read in parameter file, if available. if self._param_file is None: pfn = glob.glob(os.path.join(self.directory, "*.param")) assert len(pfn) < 2, "More than one param file is in the data directory" if pfn == []: pfn = None else: pfn = pfn[0] else: pfn = self._param_file if pfn is not None: for line in (l.strip() for l in open(pfn)): # skip comment lines and blank lines l = line.strip() if l.startswith("#") or l == "": continue # parse parameters according to tipsy parameter type param, val = (i.strip() for i in line.split("=", 1)) val = val.split("#")[0] if param.startswith("n") or param.startswith("i"): val = int(val) elif param.startswith("d"): val = float(val) elif param.startswith("b"): val = bool(float(val)) self.parameters[param] = val self.current_time = hvals["time"] self.domain_dimensions = np.ones(3, "int32") periodic = self.parameters.get("bPeriodic", True) period = self.parameters.get("dPeriod", None) self._periodicity = (periodic, periodic, periodic) self.cosmological_simulation = float( self.parameters.get("bComove", self._cosmology_parameters is not None) ) if self.cosmological_simulation and period is None: period = 1.0 if self.bounding_box is None: if periodic and period is not None: # If we are periodic, that sets our domain width to # either 1 or dPeriod. self.domain_left_edge = np.zeros(3, "float64") - 0.5 * period self.domain_right_edge = np.zeros(3, "float64") + 0.5 * period else: self.domain_left_edge = None self.domain_right_edge = None else: # This ensures that we know a bounding box has been applied self._domain_override = True bbox = np.array(self.bounding_box, dtype="float64") if bbox.shape == (2, 3): bbox = bbox.transpose() self.domain_left_edge = bbox[:, 0] self.domain_right_edge = bbox[:, 1] # If the cosmology parameters dictionary got set when data is # loaded, we can assume it's a cosmological data set if self.cosmological_simulation == 1.0: cosm = self._cosmology_parameters or {} # In comoving simulations, time stores the scale factor a self.scale_factor = hvals["time"] dcosm = { "current_redshift": (1.0 / self.scale_factor) - 1.0, "omega_lambda": self.parameters.get( "dLambda", cosm.get("omega_lambda", 0.0) ), "omega_matter": self.parameters.get( "dOmega0", cosm.get("omega_matter", 0.0) ), "hubble_constant": self.parameters.get( "dHubble0", cosm.get("hubble_constant", 1.0) ), } for param in dcosm.keys(): pval = dcosm[param] setattr(self, param, pval) else: kpc_unit = self.parameters.get("dKpcUnit", 1.0) self._unit_base["cm"] = 1.0 / (kpc_unit * cm_per_kpc) self.filename_template = self.parameter_filename self.file_count = 1 f.close() def _set_derived_attrs(self): if self.bounding_box is None and ( self.domain_left_edge is None or self.domain_right_edge is None ): self.domain_left_edge = np.array([np.nan, np.nan, np.nan]) self.domain_right_edge = np.array([np.nan, np.nan, np.nan]) self.index super()._set_derived_attrs() def _set_code_unit_attributes(self): # First try to set units based on parameter file if self.cosmological_simulation: mu = self.parameters.get("dMsolUnit", 1.0) self.mass_unit = self.quan(mu, "Msun") lu = self.parameters.get("dKpcUnit", 1000.0) # In cosmological runs, lengths are stored as length*scale_factor self.length_unit = self.quan(lu, "kpc") * self.scale_factor density_unit = self.mass_unit / (self.length_unit / self.scale_factor) ** 3 if "dHubble0" in self.parameters: # Gasoline's internal hubble constant, dHubble0, is stored in # units of proper code time self.hubble_constant *= np.sqrt(G * density_unit) # Finally, we scale the hubble constant by 100 km/s/Mpc self.hubble_constant /= self.quan(100, "km/s/Mpc") # If we leave it as a YTQuantity, the cosmology object # used below will add units back on. self.hubble_constant = self.hubble_constant.to_value("") else: mu = self.parameters.get("dMsolUnit", 1.0) self.mass_unit = self.quan(mu, "Msun") lu = self.parameters.get("dKpcUnit", 1.0) self.length_unit = self.quan(lu, "kpc") # If unit base is defined by the user, override all relevant units if self._unit_base is not None: for my_unit in ["length", "mass", "time"]: if my_unit in self._unit_base: my_val = self._unit_base[my_unit] my_val = ( self.quan(*my_val) if isinstance(my_val, tuple) else self.quan(my_val) ) setattr(self, f"{my_unit}_unit", my_val) # Finally, set the dependent units if self.cosmological_simulation: cosmo = Cosmology( hubble_constant=self.hubble_constant, omega_matter=self.omega_matter, omega_lambda=self.omega_lambda, ) self.current_time = cosmo.lookback_time(self.current_redshift, 1e6) # mass units are rho_crit(z=0) * domain volume mu = ( cosmo.critical_density(0.0) * (1 + self.current_redshift) ** 3 * self.length_unit**3 ) self.mass_unit = self.quan(mu.in_units("Msun"), "Msun") density_unit = self.mass_unit / (self.length_unit / self.scale_factor) ** 3 # need to do this again because we've modified the hubble constant self.unit_registry.modify("h", self.hubble_constant) else: density_unit = self.mass_unit / self.length_unit**3 if not hasattr(self, "time_unit"): self.time_unit = 1.0 / np.sqrt(density_unit * G) @staticmethod def _validate_header(filename): """ This method automatically detects whether the tipsy file is big/little endian and is not corrupt/invalid. It returns a tuple of (Valid, endianswap) where Valid is a boolean that is true if the file is a tipsy file, and endianswap is the endianness character '>' or '<'. """ try: f = open(filename, "rb") except Exception: return False, 1 try: f.seek(0, os.SEEK_END) fs = f.tell() f.seek(0, os.SEEK_SET) # Read in the header t, n, ndim, ng, nd, ns = struct.unpack("<diiiii", f.read(28)) except (OSError, struct.error): return False, 1 endianswap = "<" # Check Endianness if ndim < 1 or ndim > 3: endianswap = ">" f.seek(0) t, n, ndim, ng, nd, ns = struct.unpack(">diiiii", f.read(28)) # File is borked if this is true. The header is 28 bytes, and may # Be followed by a 4 byte pad. Next comes gas particles, which use # 48 bytes, followed by 36 bytes per dark matter particle, and 44 bytes # per star particle. If positions are stored as doubles, each of these # sizes is increased by 12 bytes. if ( fs != 28 + 48 * ng + 36 * nd + 44 * ns and fs != 28 + 60 * ng + 48 * nd + 56 * ns and fs != 32 + 48 * ng + 36 * nd + 44 * ns and fs != 32 + 60 * ng + 48 * nd + 56 * ns ): f.close() return False, 0 f.close() return True, endianswap @classmethod def _is_valid(cls, filename: str, *args, **kwargs) -> bool: return TipsyDataset._validate_header(filename)[0]