import os
import sys
from contextlib import contextmanager
import numpy as np
from yt import __version__ as yt_version
from yt.funcs import iter_fields
from yt.utilities.exceptions import YTGDFAlreadyExists
from yt.utilities.on_demand_imports import _h5py as h5py
from yt.utilities.parallel_tools.parallel_analysis_interface import (
    communication_system,
    parallel_objects,
)
def write_to_gdf(
    ds,
    gdf_path,
    fields=None,
    data_author=None,
    data_comment=None,
    dataset_units=None,
    particle_type_name="dark_matter",
    overwrite=False,
    **kwargs,
):
"""
Write a dataset to the given path in the Grid Data Format.
Parameters
----------
ds : Dataset object
The yt data to write out.
gdf_path : string
The path of the file to output.
fields
The field or list of fields to write out. If None, defaults to
ds.field_list.
data_author : string, optional
The name of the author who wrote the data. Default: None.
data_comment : string, optional
A descriptive comment. Default: None.
dataset_units : dictionary, optional
A dictionary of (value, unit) tuples to set the default units
of the dataset. Keys can be:
* "length_unit"
* "time_unit"
* "mass_unit"
* "velocity_unit"
* "magnetic_unit"
If not specified, these will carry over from the parent
dataset.
particle_type_name : string, optional
The particle type of the particles in the dataset. Default: "dark_matter"
overwrite : boolean, optional
Whether or not to overwrite an already existing file. If False, attempting
to overwrite an existing file will result in an exception.
Examples
--------
>>> dataset_units = {"length_unit": (1.0, "Mpc"), "time_unit": (1.0, "Myr")}
>>> write_to_gdf(
... ds,
... "clumps.h5",
... data_author="John ZuHone",
... dataset_units=dataset_units,
... data_comment="My Really Cool Dataset",
... overwrite=True,
... )
"""
    if fields is None:
        fields = ds.field_list

    fields = list(iter_fields(fields))

    with _create_new_gdf(
        ds,
        gdf_path,
        data_author,
        data_comment,
        dataset_units=dataset_units,
        particle_type_name=particle_type_name,
        overwrite=overwrite,
    ) as f:
        # now add the fields one-by-one
        _write_fields_to_gdf(ds, f, fields, particle_type_name)
def save_field(ds, fields, field_parameters=None):
"""
Write a single field associated with the dataset ds to the
backup file.
Parameters
----------
ds : Dataset object
The yt dataset that the field is associated with.
fields : field of list of fields
The name(s) of the field(s) to save.
field_parameters : dictionary
A dictionary of field parameters to set.
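
    Examples
    --------
    A minimal usage sketch; ``ds`` is assumed to be an already-loaded dataset
    that provides the ``("gas", "density")`` and ``("gas", "temperature")``
    fields:

    >>> save_field(ds, ("gas", "density"))
    >>> save_field(ds, [("gas", "density"), ("gas", "temperature")])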
"""
fields = list(iter_fields(fields))
for field_name in fields:
if isinstance(field_name, tuple):
field_name = field_name[1]
field_obj = ds._get_field_info(field_name)
if field_obj.sampling_type == "particle":
print("Saving particle fields currently not supported.")
return
with _get_backup_file(ds) as f:
# now save the field
_write_fields_to_gdf(
ds,
f,
fields,
particle_type_name="dark_matter",
field_parameters=field_parameters,
)
def _write_fields_to_gdf(
ds, fhandle, fields, particle_type_name, field_parameters=None
):
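    """
    Write ``fields`` from ``ds`` into an already-structured GDF file handle
    ``fhandle``: record the field metadata under ``field_types``, create a
    dataset for each field in every grid (or particle) group, then fill in
    the data grid by grid, chunked over IO and distributed with
    ``parallel_objects`` when running in parallel.
    """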
    for field in fields:
        # add field info to field_types group
        g = fhandle["field_types"]

        # create the subgroup with the field's name
        fi = ds._get_field_info(field)
        ftype, fname = fi.name
        try:
            sg = g.create_group(fname)
        except ValueError:
            print(f"Error - File already contains field called {field}")
            sys.exit(1)

        # grab the display name and units from the field info container.
        display_name = fi.display_name
        units = fi.units

        # check that they actually contain something...
        if display_name:
            sg.attrs["field_name"] = np.bytes_(display_name)
        else:
            sg.attrs["field_name"] = np.bytes_(str(field))
        if units:
            sg.attrs["field_units"] = np.bytes_(units)
        else:
            sg.attrs["field_units"] = np.bytes_("None")
        # @todo: is this always true?
        sg.attrs["staggering"] = 0

    # first we must create the datasets on all processes.
    g = fhandle["data"]
    for grid in ds.index.grids:
        for field in fields:
            # sanitize and get the field info object
            fi = ds._get_field_info(field)
            ftype, fname = fi.name

            grid_group = g["grid_%010i" % (grid.id - grid._id_offset)]
            particles_group = grid_group["particles"]
            pt_group = particles_group[particle_type_name]

            if fi.sampling_type == "particle":  # particle data
                pt_group.create_dataset(fname, grid.ActiveDimensions, dtype="float64")
            else:  # a field
                grid_group.create_dataset(fname, grid.ActiveDimensions, dtype="float64")

    # now add the actual data, grid by grid
    g = fhandle["data"]
    data_source = ds.all_data()
    citer = data_source.chunks([], "io", local_only=True)
    for region in parallel_objects(citer):
        # is there a better way to get the grids on each chunk?
        for chunk in ds.index._chunk_io(region):
            for grid in chunk.objs:
                for field in fields:
                    # sanitize and get the field info object
                    fi = ds._get_field_info(field)
                    ftype, fname = fi.name

                    # set field parameters, if specified
                    if field_parameters is not None:
                        for k, v in field_parameters.items():
                            grid.set_field_parameter(k, v)

                    grid_group = g["grid_%010i" % (grid.id - grid._id_offset)]
                    particles_group = grid_group["particles"]
                    pt_group = particles_group[particle_type_name]

                    # add the field data to the grid group
                    # Check if this is a real field or particle data.
                    grid.get_data(field)
                    units = fhandle["field_types"][fname].attrs["field_units"]
                    if fi.sampling_type == "particle":  # particle data
                        dset = pt_group[fname]
                        dset[:] = grid[field].in_units(units)
                    else:  # a field
                        dset = grid_group[fname]
                        dset[:] = grid[field].in_units(units)


@contextmanager
def _get_backup_file(ds):
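    """
    Context manager yielding an open h5py handle to the dataset's backup
    (GDF) file, creating the file first if it does not already exist.
    Parallel HDF5 (the ``mpio`` driver) is used when more than one MPI
    process is active and h5py was built with MPI support.
    """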
    backup_filename = ds.backup_filename
    if os.path.exists(backup_filename):
        # backup file already exists, open it. We use parallel
        # h5py if it is available
        if communication_system.communicators[-1].size > 1 and h5py.get_config().mpi:
            mpi4py_communicator = communication_system.communicators[-1].comm
            f = h5py.File(
                backup_filename, mode="r+", driver="mpio", comm=mpi4py_communicator
            )
        else:
            f = h5py.File(backup_filename, mode="r+")
        yield f
        f.close()
    else:
        # backup file does not exist, create it
        with _create_new_gdf(
            ds,
            backup_filename,
            data_author=None,
            data_comment=None,
            particle_type_name="dark_matter",
        ) as f:
            yield f


@contextmanager
def _create_new_gdf(
    ds,
    gdf_path,
    data_author=None,
    data_comment=None,
    dataset_units=None,
    particle_type_name="dark_matter",
    overwrite=False,
    **kwargs,
):
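    """
    Create a new GDF file at ``gdf_path`` and yield the open h5py handle.
    All of the metadata (simulation parameters, unit definitions, field and
    particle type groups, and the grid layout) is written here; the caller
    fills in the per-grid field data before the file is closed on exit.
    """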
    # Make sure we have the absolute path to the file first
    gdf_path = os.path.abspath(gdf_path)

    # Is the file already there? If so, are we allowing
    # overwriting?
    if os.path.exists(gdf_path) and not overwrite:
        raise YTGDFAlreadyExists(gdf_path)

    ###
    # Create and open the file with h5py. We use parallel
    # h5py if it is available.
    ###
    if communication_system.communicators[-1].size > 1 and h5py.get_config().mpi:
        mpi4py_communicator = communication_system.communicators[-1].comm
        f = h5py.File(gdf_path, mode="w", driver="mpio", comm=mpi4py_communicator)
    else:
        f = h5py.File(gdf_path, mode="w")

    ###
    # "gridded_data_format" group
    ###
    g = f.create_group("gridded_data_format")
    g.attrs["data_software"] = "yt"
    g.attrs["data_software_version"] = yt_version
    if data_author is not None:
        g.attrs["data_author"] = data_author
    if data_comment is not None:
        g.attrs["data_comment"] = data_comment

    ###
    # "simulation_parameters" group
    ###
    g = f.create_group("simulation_parameters")
    g.attrs["refine_by"] = ds.refine_by
    g.attrs["dimensionality"] = ds.dimensionality
    g.attrs["domain_dimensions"] = ds.domain_dimensions
    g.attrs["current_time"] = ds.current_time
    g.attrs["domain_left_edge"] = ds.domain_left_edge
    g.attrs["domain_right_edge"] = ds.domain_right_edge
    g.attrs["unique_identifier"] = ds.unique_identifier
    g.attrs["cosmological_simulation"] = ds.cosmological_simulation
    # @todo: Where is this in the yt API?
    g.attrs["num_ghost_zones"] = 0
    # @todo: Where is this in the yt API?
    g.attrs["field_ordering"] = 0
    # @todo: not yet supported by yt.
    g.attrs["boundary_conditions"] = np.array([0, 0, 0, 0, 0, 0], "int32")

    if ds.cosmological_simulation:
        g.attrs["current_redshift"] = ds.current_redshift
        g.attrs["omega_matter"] = ds.omega_matter
        g.attrs["omega_lambda"] = ds.omega_lambda
        g.attrs["hubble_constant"] = ds.hubble_constant

    if dataset_units is None:
        dataset_units = {}

    g = f.create_group("dataset_units")
    for u in ["length", "time", "mass", "velocity", "magnetic"]:
        unit_name = u + "_unit"
        if unit_name in dataset_units:
            value, units = dataset_units[unit_name]
        else:
            attr = getattr(ds, unit_name)
            value = float(attr)
            units = str(attr.units)
        d = g.create_dataset(unit_name, data=value)
        d.attrs["unit"] = units

    ###
    # "field_types" group
    ###
    g = f.create_group("field_types")

    ###
    # "particle_types" group
    ###
    g = f.create_group("particle_types")

    # @todo: Particle type iterator
    sg = g.create_group(particle_type_name)
    sg["particle_type_name"] = np.bytes_(particle_type_name)

    ###
    # root datasets -- info about the grids
    ###
    f["grid_dimensions"] = ds.index.grid_dimensions
    f["grid_left_index"] = np.array(
        [grid.get_global_startindex() for grid in ds.index.grids]
    ).reshape(ds.index.grid_dimensions.shape[0], 3)
    f["grid_level"] = ds.index.grid_levels.flat
    # @todo: Fill with proper values
    f["grid_parent_id"] = -np.ones(ds.index.grid_dimensions.shape[0])
    f["grid_particle_count"] = ds.index.grid_particle_count

    ###
    # "data" group -- where we should spend the most time
    ###
    g = f.create_group("data")
    for grid in ds.index.grids:
        # add group for this grid
        grid_group = g.create_group("grid_%010i" % (grid.id - grid._id_offset))
        # add group for the particles on this grid
        particles_group = grid_group.create_group("particles")
        particles_group.create_group(particle_type_name)

    yield f

    # close the file when done
    f.close()