Source code for yt.frontends.http_stream.data_structures

import json
import time
from functools import cached_property

import numpy as np

from yt.data_objects.static_output import ParticleDataset, ParticleFile
from yt.frontends.sph.fields import SPHFieldInfo
from yt.geometry.particle_geometry_handler import ParticleIndex
from yt.utilities.on_demand_imports import _requests as requests


[docs] class HTTPParticleFile(ParticleFile): pass
[docs] class HTTPStreamDataset(ParticleDataset): _load_requirements = ["requests"] _index_class = ParticleIndex _file_class = HTTPParticleFile _field_info_class = SPHFieldInfo _particle_mass_name = "Mass" _particle_coordinates_name = "Coordinates" _particle_velocity_name = "Velocities" filename_template = "" def __init__( self, base_url, dataset_type="http_particle_stream", unit_system="cgs", index_order=None, index_filename=None, ): self.base_url = base_url super().__init__( "", dataset_type=dataset_type, unit_system=unit_system, index_order=index_order, index_filename=index_filename, ) def __str__(self): return self.base_url @cached_property def unique_identifier(self) -> str: return str(self.parameters.get("unique_identifier", time.time())) def _parse_parameter_file(self): self.dimensionality = 3 self.refine_by = 2 self.parameters["HydroMethod"] = "sph" # Here's where we're going to grab the JSON index file hreq = requests.get(self.base_url + "/yt_index.json") if hreq.status_code != 200: raise RuntimeError header = json.loads(hreq.content) header["particle_count"] = { int(k): header["particle_count"][k] for k in header["particle_count"] } self.parameters = header # Now we get what we need self.domain_left_edge = np.array(header["domain_left_edge"], "float64") self.domain_right_edge = np.array(header["domain_right_edge"], "float64") self.domain_dimensions = np.ones(3, "int32") self._periodicity = (True, True, True) self.current_time = header["current_time"] self.cosmological_simulation = int(header["cosmological_simulation"]) for attr in ( "current_redshift", "omega_lambda", "omega_matter", "hubble_constant", ): setattr(self, attr, float(header[attr])) self.file_count = header["num_files"] def _set_units(self): length_unit = float(self.parameters["units"]["length"]) time_unit = float(self.parameters["units"]["time"]) mass_unit = float(self.parameters["units"]["mass"]) density_unit = mass_unit / length_unit**3 velocity_unit = length_unit / time_unit self._unit_base = {} self._unit_base["cm"] = 1.0 / length_unit self._unit_base["s"] = 1.0 / time_unit super()._set_units() self.conversion_factors["velocity"] = velocity_unit self.conversion_factors["mass"] = mass_unit self.conversion_factors["density"] = density_unit @classmethod def _is_valid(cls, filename: str, *args, **kwargs) -> bool: if not filename.startswith("http://"): return False if cls._missing_load_requirements(): return False return requests.get(filename + "/yt_index.json").status_code == 200