Source code for yt.utilities.grid_data_format.writer

import os
import sys
from contextlib import contextmanager

import numpy as np

from yt import __version__ as yt_version
from yt.funcs import iter_fields
from yt.utilities.exceptions import YTGDFAlreadyExists
from yt.utilities.on_demand_imports import _h5py as h5py
from yt.utilities.parallel_tools.parallel_analysis_interface import (
    communication_system,
    parallel_objects,
)


def write_to_gdf(
    ds,
    gdf_path,
    fields=None,
    data_author=None,
    data_comment=None,
    dataset_units=None,
    particle_type_name="dark_matter",
    overwrite=False,
    **kwargs,
):
    """
    Write a dataset to the given path in the Grid Data Format.

    Parameters
    ----------
    ds : Dataset object
        The yt data to write out.
    gdf_path : string
        The path of the file to output.
    fields
        The field or list of fields to write out. If None, defaults to
        ds.field_list.
    data_author : string, optional
        The name of the author who wrote the data. Default: None.
    data_comment : string, optional
        A descriptive comment. Default: None.
    dataset_units : dictionary, optional
        A dictionary of (value, unit) tuples to set the default units
        of the dataset. Keys can be:

        * "length_unit"
        * "time_unit"
        * "mass_unit"
        * "velocity_unit"
        * "magnetic_unit"

        If not specified, these will carry over from the parent dataset.
    particle_type_name : string, optional
        The particle type of the particles in the dataset.
        Default: "dark_matter"
    overwrite : boolean, optional
        Whether or not to overwrite an already existing file. If False,
        attempting to overwrite an existing file will result in an
        exception.

    Examples
    --------
    >>> dataset_units = {"length_unit": (1.0, "Mpc"), "time_unit": (1.0, "Myr")}
    >>> write_to_gdf(
    ...     ds,
    ...     "clumps.h5",
    ...     data_author="John ZuHone",
    ...     dataset_units=dataset_units,
    ...     data_comment="My Really Cool Dataset",
    ...     overwrite=True,
    ... )
    """
    if fields is None:
        fields = ds.field_list

    fields = list(iter_fields(fields))

    with _create_new_gdf(
        ds,
        gdf_path,
        data_author,
        data_comment,
        dataset_units=dataset_units,
        particle_type_name=particle_type_name,
        overwrite=overwrite,
    ) as f:
        # now add the fields one-by-one
        _write_fields_to_gdf(ds, f, fields, particle_type_name)

def save_field(ds, fields, field_parameters=None):
    """
    Write a single field associated with the dataset ds to the
    backup file.

    Parameters
    ----------
    ds : Dataset object
        The yt dataset that the field is associated with.
    fields : field or list of fields
        The name(s) of the field(s) to save.
    field_parameters : dictionary
        A dictionary of field parameters to set.
    """
    fields = list(iter_fields(fields))
    for field_name in fields:
        if isinstance(field_name, tuple):
            field_name = field_name[1]
        field_obj = ds._get_field_info(field_name)
        if field_obj.sampling_type == "particle":
            print("Saving particle fields currently not supported.")
            return

    with _get_backup_file(ds) as f:
        # now save the field
        _write_fields_to_gdf(
            ds,
            f,
            fields,
            particle_type_name="dark_matter",
            field_parameters=field_parameters,
        )

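# Usage sketch (illustrative only): save_field writes a grid field into the
# dataset's backup GDF file at ds.backup_filename, creating that file if it
# does not exist yet. The dataset path and field name below are placeholder
# assumptions for the example, not requirements of the API.
#
#   import yt
#   ds = yt.load("IsolatedGalaxy/galaxy0030/galaxy0030")
#   save_field(ds, ("gas", "temperature"))
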
def _write_fields_to_gdf(
    ds, fhandle, fields, particle_type_name, field_parameters=None
):
    for field in fields:
        # add field info to field_types group
        g = fhandle["field_types"]
        # create the subgroup with the field's name
        fi = ds._get_field_info(field)
        ftype, fname = fi.name
        try:
            sg = g.create_group(fname)
        except ValueError:
            print(f"Error - File already contains field called {field}")
            sys.exit(1)

        # grab the display name and units from the field info container.
        display_name = fi.display_name
        units = fi.units

        # check that they actually contain something...
        if display_name:
            sg.attrs["field_name"] = np.bytes_(display_name)
        else:
            sg.attrs["field_name"] = np.bytes_(str(field))
        if units:
            sg.attrs["field_units"] = np.bytes_(units)
        else:
            sg.attrs["field_units"] = np.bytes_("None")
        # @todo: is this always true?
        sg.attrs["staggering"] = 0

    # first we must create the datasets on all processes.
    g = fhandle["data"]
    for grid in ds.index.grids:
        for field in fields:
            # sanitize and get the field info object
            fi = ds._get_field_info(field)
            ftype, fname = fi.name

            grid_group = g["grid_%010i" % (grid.id - grid._id_offset)]
            particles_group = grid_group["particles"]
            pt_group = particles_group[particle_type_name]

            if fi.sampling_type == "particle":
                # particle data
                pt_group.create_dataset(
                    fname, grid.ActiveDimensions, dtype="float64"
                )
            else:
                # a field
                grid_group.create_dataset(
                    fname, grid.ActiveDimensions, dtype="float64"
                )

    # now add the actual data, grid by grid
    g = fhandle["data"]
    data_source = ds.all_data()
    citer = data_source.chunks([], "io", local_only=True)
    for region in parallel_objects(citer):
        # is there a better way to get the grids in each chunk?
        for chunk in ds.index._chunk_io(region):
            for grid in chunk.objs:
                for field in fields:
                    # sanitize and get the field info object
                    fi = ds._get_field_info(field)
                    ftype, fname = fi.name

                    # set field parameters, if specified
                    if field_parameters is not None:
                        for k, v in field_parameters.items():
                            grid.set_field_parameter(k, v)

                    grid_group = g["grid_%010i" % (grid.id - grid._id_offset)]
                    particles_group = grid_group["particles"]
                    pt_group = particles_group[particle_type_name]

                    # add the field data to the grid group
                    # Check if this is a real field or particle data.
                    grid.get_data(field)
                    units = fhandle["field_types"][fname].attrs["field_units"]
                    if fi.sampling_type == "particle":
                        # particle data
                        dset = pt_group[fname]
                        dset[:] = grid[field].in_units(units)
                    else:
                        # a field
                        dset = grid_group[fname]
                        dset[:] = grid[field].in_units(units)


@contextmanager
def _get_backup_file(ds):
    backup_filename = ds.backup_filename
    if os.path.exists(backup_filename):
        # backup file already exists, open it. We use parallel
        # h5py if it is available.
        if communication_system.communicators[-1].size > 1 and h5py.get_config().mpi:
            mpi4py_communicator = communication_system.communicators[-1].comm
            f = h5py.File(
                backup_filename, mode="r+", driver="mpio", comm=mpi4py_communicator
            )
        else:
            f = h5py.File(backup_filename, mode="r+")
        yield f
        f.close()
    else:
        # backup file does not exist, create it
        with _create_new_gdf(
            ds,
            backup_filename,
            data_author=None,
            data_comment=None,
            particle_type_name="dark_matter",
        ) as f:
            yield f


@contextmanager
def _create_new_gdf(
    ds,
    gdf_path,
    data_author=None,
    data_comment=None,
    dataset_units=None,
    particle_type_name="dark_matter",
    overwrite=False,
    **kwargs,
):
    # Make sure we have the absolute path to the file first.
    gdf_path = os.path.abspath(gdf_path)

    # Is the file already there? If so, are we allowing overwriting?
    if os.path.exists(gdf_path) and not overwrite:
        raise YTGDFAlreadyExists(gdf_path)

    ###
    # Create and open the file with h5py. We use parallel
    # h5py if it is available.
    ###
    if communication_system.communicators[-1].size > 1 and h5py.get_config().mpi:
        mpi4py_communicator = communication_system.communicators[-1].comm
        f = h5py.File(gdf_path, mode="w", driver="mpio", comm=mpi4py_communicator)
    else:
        f = h5py.File(gdf_path, mode="w")

    ###
    # "gridded_data_format" group
    ###
    g = f.create_group("gridded_data_format")
    g.attrs["data_software"] = "yt"
    g.attrs["data_software_version"] = yt_version
    if data_author is not None:
        g.attrs["data_author"] = data_author
    if data_comment is not None:
        g.attrs["data_comment"] = data_comment

    ###
    # "simulation_parameters" group
    ###
    g = f.create_group("simulation_parameters")
    g.attrs["refine_by"] = ds.refine_by
    g.attrs["dimensionality"] = ds.dimensionality
    g.attrs["domain_dimensions"] = ds.domain_dimensions
    g.attrs["current_time"] = ds.current_time
    g.attrs["domain_left_edge"] = ds.domain_left_edge
    g.attrs["domain_right_edge"] = ds.domain_right_edge
    g.attrs["unique_identifier"] = ds.unique_identifier
    g.attrs["cosmological_simulation"] = ds.cosmological_simulation
    # @todo: Where is this in the yt API?
    g.attrs["num_ghost_zones"] = 0
    # @todo: Where is this in the yt API?
    g.attrs["field_ordering"] = 0
    # @todo: not yet supported by yt.
    g.attrs["boundary_conditions"] = np.array([0, 0, 0, 0, 0, 0], "int32")

    if ds.cosmological_simulation:
        g.attrs["current_redshift"] = ds.current_redshift
        g.attrs["omega_matter"] = ds.omega_matter
        g.attrs["omega_lambda"] = ds.omega_lambda
        g.attrs["hubble_constant"] = ds.hubble_constant

    if dataset_units is None:
        dataset_units = {}

    g = f.create_group("dataset_units")
    for u in ["length", "time", "mass", "velocity", "magnetic"]:
        unit_name = u + "_unit"
        if unit_name in dataset_units:
            value, units = dataset_units[unit_name]
        else:
            attr = getattr(ds, unit_name)
            value = float(attr)
            units = str(attr.units)
        d = g.create_dataset(unit_name, data=value)
        d.attrs["unit"] = units

    ###
    # "field_types" group
    ###
    g = f.create_group("field_types")

    ###
    # "particle_types" group
    ###
    g = f.create_group("particle_types")

    # @todo: Particle type iterator
    sg = g.create_group(particle_type_name)
    sg["particle_type_name"] = np.bytes_(particle_type_name)

    ###
    # root datasets -- info about the grids
    ###
    f["grid_dimensions"] = ds.index.grid_dimensions
    f["grid_left_index"] = np.array(
        [grid.get_global_startindex() for grid in ds.index.grids]
    ).reshape(ds.index.grid_dimensions.shape[0], 3)
    f["grid_level"] = ds.index.grid_levels.flat
    # @todo: Fill with proper values
    f["grid_parent_id"] = -np.ones(ds.index.grid_dimensions.shape[0])
    f["grid_particle_count"] = ds.index.grid_particle_count

    ###
    # "data" group -- where we should spend the most time
    ###
    g = f.create_group("data")
    for grid in ds.index.grids:
        # add group for this grid
        grid_group = g.create_group("grid_%010i" % (grid.id - grid._id_offset))
        # add group for the particles on this grid
        particles_group = grid_group.create_group("particles")
        particles_group.create_group(particle_type_name)

    yield f

    # close the file when done
    f.close()
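
# Layout sketch (illustrative only): the groups created above can be inspected
# directly with h5py. The file name "clumps.h5" is borrowed from the
# write_to_gdf docstring example; the actual contents depend on the dataset
# and the fields written.
#
#   from yt.utilities.on_demand_imports import _h5py as h5py
#
#   with h5py.File("clumps.h5", mode="r") as f:
#       print(list(f))                               # 'data', 'dataset_units', 'field_types', ...
#       print(dict(f["gridded_data_format"].attrs))  # data_software, data_software_version, ...
#       print(list(f["field_types"]))                # one subgroup per saved field
#       print(list(f["data"]))                       # one 'grid_%010i' group per grid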