import os
import re
import weakref
import numpy as np
from yt.data_objects.index_subobjects.grid_patch import AMRGridPatch
from yt.data_objects.static_output import Dataset
from yt.fields.field_info_container import FieldInfoContainer
from yt.funcs import mylog, setdefaultattr
from yt.geometry.api import Geometry
from yt.geometry.grid_geometry_handler import GridIndex
from yt.utilities.file_handler import HDF5FileHandler, valid_hdf5_signature
from yt.utilities.lib.misc_utilities import get_box_grids_level
from yt.utilities.on_demand_imports import _h5py as h5py
from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_root_only
from .fields import (
ChomboFieldInfo,
ChomboPICFieldInfo1D,
ChomboPICFieldInfo2D,
ChomboPICFieldInfo3D,
Orion2FieldInfo,
PlutoFieldInfo,
)
[docs]
def is_chombo_hdf5(fn):
try:
with h5py.File(fn, mode="r") as fileh:
valid = "Chombo_global" in fileh["/"]
except (KeyError, OSError, ImportError):
return False
return valid
[docs]
class ChomboGrid(AMRGridPatch):
_id_offset = 0
__slots__ = ["_level_id", "stop_index"]
def __init__(self, id, index, level, start, stop):
AMRGridPatch.__init__(self, id, filename=index.index_filename, index=index)
self._parent_id = []
self._children_ids = []
self.Level = level
self.ActiveDimensions = stop - start + 1
[docs]
def get_global_startindex(self):
"""
Return the integer starting index for each dimension at the current
level.
"""
if self.start_index is not None:
return self.start_index
if self.Parent is None:
iLE = self.LeftEdge - self.ds.domain_left_edge
start_index = iLE / self.dds
return np.rint(start_index).astype("int64").ravel()
pdx = self.Parent[0].dds
start_index = (self.Parent[0].get_global_startindex()) + np.rint(
(self.LeftEdge - self.Parent[0].LeftEdge) / pdx
)
self.start_index = (start_index * self.ds.refine_by).astype("int64").ravel()
return self.start_index
def _setup_dx(self):
# has already been read in and stored in index
self.dds = self.ds.arr(self.index.dds_list[self.Level], "code_length")
@property
def Parent(self):
if len(self._parent_id) == 0:
return None
return [self.index.grids[pid - self._id_offset] for pid in self._parent_id]
@property
def Children(self):
return [self.index.grids[cid - self._id_offset] for cid in self._children_ids]
[docs]
class ChomboHierarchy(GridIndex):
grid = ChomboGrid
_data_file = None
def __init__(self, ds, dataset_type="chombo_hdf5"):
self.domain_left_edge = ds.domain_left_edge
self.domain_right_edge = ds.domain_right_edge
self.dataset_type = dataset_type
self.field_indexes = {}
self.dataset = weakref.proxy(ds)
# for now, the index file is the dataset!
self.index_filename = os.path.abspath(self.dataset.parameter_filename)
self.directory = ds.directory
self._handle = ds._handle
self._levels = [key for key in self._handle.keys() if key.startswith("level")]
GridIndex.__init__(self, ds, dataset_type)
self._read_particles()
def _read_particles(self):
# only do anything if the dataset contains particles
if not any(f[1].startswith("particle_") for f in self.field_list):
return
self.num_particles = 0
particles_per_grid = []
for key, val in self._handle.items():
if key.startswith("level"):
level_particles = val["particles:offsets"][:]
self.num_particles += level_particles.sum()
particles_per_grid = np.concatenate(
(particles_per_grid, level_particles)
)
for i, _grid in enumerate(self.grids):
self.grids[i].NumberOfParticles = particles_per_grid[i]
self.grid_particle_count[i] = particles_per_grid[i]
assert self.num_particles == self.grid_particle_count.sum()
# Chombo datasets, by themselves, have no "known" fields. However,
# we will look for "fluid" fields by finding the string "component" in
# the output file, and "particle" fields by finding the string "particle".
def _detect_output_fields(self):
# look for fluid fields
output_fields = []
for key, val in self._handle.attrs.items():
if key.startswith("component"):
output_fields.append(val.decode("ascii"))
self.field_list = [("chombo", c) for c in output_fields]
# look for particle fields
particle_fields = []
for key, val in self._handle.attrs.items():
if key.startswith("particle"):
particle_fields.append(val.decode("ascii"))
self.field_list.extend([("io", c) for c in particle_fields])
def _count_grids(self):
self.num_grids = 0
for lev in self._levels:
d = self._handle[lev]
if "Processors" in d:
self.num_grids += d["Processors"].len()
elif "boxes" in d:
self.num_grids += d["boxes"].len()
else:
raise RuntimeError("Unknown file specification")
def _parse_index(self):
f = self._handle # shortcut
self.max_level = f.attrs["num_levels"] - 1
grids = []
self.dds_list = []
i = 0
D = self.dataset.dimensionality
for lev_index, lev in enumerate(self._levels):
level_number = int(re.match(r"level_(\d+)", lev).groups()[0])
try:
boxes = f[lev]["boxes"][()]
except KeyError:
boxes = f[lev]["particles:boxes"][()]
dx = f[lev].attrs["dx"]
self.dds_list.append(dx * np.ones(3))
if D == 1:
self.dds_list[lev_index][1] = 1.0
self.dds_list[lev_index][2] = 1.0
if D == 2:
self.dds_list[lev_index][2] = 1.0
for level_id, box in enumerate(boxes):
si = np.array([box[f"lo_{ax}"] for ax in "ijk"[:D]])
ei = np.array([box[f"hi_{ax}"] for ax in "ijk"[:D]])
if D == 1:
si = np.concatenate((si, [0.0, 0.0]))
ei = np.concatenate((ei, [0.0, 0.0]))
if D == 2:
si = np.concatenate((si, [0.0]))
ei = np.concatenate((ei, [0.0]))
pg = self.grid(len(grids), self, level=level_number, start=si, stop=ei)
grids.append(pg)
grids[-1]._level_id = level_id
self.grid_levels[i] = level_number
self.grid_left_edge[i] = self.dds_list[lev_index] * si.astype(
self.float_type
)
self.grid_right_edge[i] = self.dds_list[lev_index] * (
ei.astype(self.float_type) + 1
)
self.grid_particle_count[i] = 0
self.grid_dimensions[i] = ei - si + 1
i += 1
self.grids = np.empty(len(grids), dtype="object")
for gi, g in enumerate(grids):
self.grids[gi] = g
def _populate_grid_objects(self):
self._reconstruct_parent_child()
for g in self.grids:
g._prepare_grid()
g._setup_dx()
def _setup_derived_fields(self):
self.derived_field_list = []
def _reconstruct_parent_child(self):
mask = np.empty(len(self.grids), dtype="int32")
mylog.debug("First pass; identifying child grids")
for i, grid in enumerate(self.grids):
get_box_grids_level(
self.grid_left_edge[i, :],
self.grid_right_edge[i, :],
self.grid_levels[i].item() + 1,
self.grid_left_edge,
self.grid_right_edge,
self.grid_levels,
mask,
)
ids = np.where(mask.astype("bool")) # where is a tuple
grid._children_ids = ids[0] + grid._id_offset
mylog.debug("Second pass; identifying parents")
for i, grid in enumerate(self.grids): # Second pass
for child in grid.Children:
child._parent_id.append(i + grid._id_offset)
[docs]
class ChomboDataset(Dataset):
_load_requirements = ["h5py"]
_index_class = ChomboHierarchy
_field_info_class: type[FieldInfoContainer] = ChomboFieldInfo
def __init__(
self,
filename,
dataset_type="chombo_hdf5",
storage_filename=None,
ini_filename=None,
units_override=None,
unit_system="cgs",
default_species_fields=None,
):
self.fluid_types += ("chombo",)
self._handle = HDF5FileHandler(filename)
self.dataset_type = dataset_type
self.geometry = Geometry.CARTESIAN
self.ini_filename = ini_filename
self.fullplotdir = os.path.abspath(filename)
Dataset.__init__(
self,
filename,
self.dataset_type,
units_override=units_override,
unit_system=unit_system,
default_species_fields=default_species_fields,
)
self.storage_filename = storage_filename
self.cosmological_simulation = False
# These are parameters that I very much wish to get rid of.
self.parameters["HydroMethod"] = "chombo"
self.parameters["DualEnergyFormalism"] = 0
self.parameters["EOSType"] = -1 # default
def _set_code_unit_attributes(self):
if not hasattr(self, "length_unit"):
mylog.warning("Setting code length unit to be 1.0 cm")
if not hasattr(self, "mass_unit"):
mylog.warning("Setting code mass unit to be 1.0 g")
if not hasattr(self, "time_unit"):
mylog.warning("Setting code time unit to be 1.0 s")
setdefaultattr(self, "length_unit", self.quan(1.0, "cm"))
setdefaultattr(self, "mass_unit", self.quan(1.0, "g"))
setdefaultattr(self, "time_unit", self.quan(1.0, "s"))
setdefaultattr(self, "magnetic_unit", self.quan(np.sqrt(4.0 * np.pi), "gauss"))
setdefaultattr(self, "velocity_unit", self.length_unit / self.time_unit)
def _localize(self, f, default):
if f is None:
return os.path.join(self.directory, default)
return f
def _parse_parameter_file(self):
self.dimensionality = self._handle["Chombo_global/"].attrs["SpaceDim"]
self.domain_left_edge = self._calc_left_edge()
self.domain_right_edge = self._calc_right_edge()
self.domain_dimensions = self._calc_domain_dimensions()
# if a lower-dimensional dataset, set up pseudo-3D stuff here.
if self.dimensionality == 1:
self.domain_left_edge = np.concatenate((self.domain_left_edge, [0.0, 0.0]))
self.domain_right_edge = np.concatenate(
(self.domain_right_edge, [1.0, 1.0])
)
self.domain_dimensions = np.concatenate((self.domain_dimensions, [1, 1]))
if self.dimensionality == 2:
self.domain_left_edge = np.concatenate((self.domain_left_edge, [0.0]))
self.domain_right_edge = np.concatenate((self.domain_right_edge, [1.0]))
self.domain_dimensions = np.concatenate((self.domain_dimensions, [1]))
self.refine_by = self._handle["/level_0"].attrs["ref_ratio"]
self._determine_periodic()
self._determine_current_time()
def _determine_current_time(self):
# some datasets will not be time-dependent, and to make
# matters worse, the simulation time is not always
# stored in the same place in the hdf file! Make
# sure we handle that here.
try:
self.current_time = self._handle.attrs["time"]
except KeyError:
try:
self.current_time = self._handle["/level_0"].attrs["time"]
except KeyError:
self.current_time = 0.0
def _determine_periodic(self):
# we default to true unless the HDF5 file says otherwise
is_periodic = np.array([True, True, True])
for dir in range(self.dimensionality):
try:
is_periodic[dir] = self._handle["/level_0"].attrs[
"is_periodic_%d" % dir
]
except KeyError:
is_periodic[dir] = True
self._periodicity = tuple(is_periodic)
def _calc_left_edge(self):
fileh = self._handle
dx0 = fileh["/level_0"].attrs["dx"]
D = self.dimensionality
LE = dx0 * ((np.array(list(fileh["/level_0"].attrs["prob_domain"])))[0:D])
return LE
def _calc_right_edge(self):
fileh = self._handle
dx0 = fileh["/level_0"].attrs["dx"]
D = self.dimensionality
RE = dx0 * ((np.array(list(fileh["/level_0"].attrs["prob_domain"])))[D:] + 1)
return RE
def _calc_domain_dimensions(self):
fileh = self._handle
D = self.dimensionality
L_index = (np.array(list(fileh["/level_0"].attrs["prob_domain"])))[0:D]
R_index = (np.array(list(fileh["/level_0"].attrs["prob_domain"])))[D:] + 1
return R_index - L_index
@classmethod
def _is_valid(cls, filename: str, *args, **kwargs) -> bool:
if cls._missing_load_requirements():
return False
if not is_chombo_hdf5(filename):
return False
pluto_ini_file_exists = False
orion2_ini_file_exists = False
if isinstance(filename, str):
dir_name = os.path.dirname(os.path.abspath(filename))
pluto_ini_filename = os.path.join(dir_name, "pluto.ini")
orion2_ini_filename = os.path.join(dir_name, "orion2.ini")
pluto_ini_file_exists = os.path.isfile(pluto_ini_filename)
orion2_ini_file_exists = os.path.isfile(orion2_ini_filename)
if not (pluto_ini_file_exists or orion2_ini_file_exists):
try:
fileh = h5py.File(filename, mode="r")
valid = "Chombo_global" in fileh["/"]
# ORION2 simulations should always have this:
valid = valid and "CeilVA_mass" not in fileh.attrs.keys()
valid = valid and "Charm_global" not in fileh.keys()
fileh.close()
return valid
except Exception:
pass
return False
[docs]
@parallel_root_only
def print_key_parameters(self):
for a in [
"current_time",
"domain_dimensions",
"domain_left_edge",
"domain_right_edge",
]:
if not hasattr(self, a):
mylog.error("Missing %s in parameter file definition!", a)
continue
v = getattr(self, a)
mylog.info("Parameters: %-25s = %s", a, v)
[docs]
class PlutoHierarchy(ChomboHierarchy):
def __init__(self, ds, dataset_type="chombo_hdf5"):
ChomboHierarchy.__init__(self, ds, dataset_type)
def _parse_index(self):
f = self._handle # shortcut
self.max_level = f.attrs["num_levels"] - 1
grids = []
self.dds_list = []
i = 0
D = self.dataset.dimensionality
for lev_index, lev in enumerate(self._levels):
level_number = int(re.match(r"level_(\d+)", lev).groups()[0])
try:
boxes = f[lev]["boxes"][()]
except KeyError:
boxes = f[lev]["particles:boxes"][()]
dx = f[lev].attrs["dx"]
self.dds_list.append(dx * np.ones(3))
if D == 1:
self.dds_list[lev_index][1] = 1.0
self.dds_list[lev_index][2] = 1.0
if D == 2:
self.dds_list[lev_index][2] = 1.0
for level_id, box in enumerate(boxes):
si = np.array([box[f"lo_{ax}"] for ax in "ijk"[:D]])
ei = np.array([box[f"hi_{ax}"] for ax in "ijk"[:D]])
if D == 1:
si = np.concatenate((si, [0.0, 0.0]))
ei = np.concatenate((ei, [0.0, 0.0]))
if D == 2:
si = np.concatenate((si, [0.0]))
ei = np.concatenate((ei, [0.0]))
pg = self.grid(len(grids), self, level=level_number, start=si, stop=ei)
grids.append(pg)
grids[-1]._level_id = level_id
self.grid_levels[i] = level_number
self.grid_left_edge[i] = (
self.dds_list[lev_index] * si.astype(self.float_type)
+ self.domain_left_edge.value
)
self.grid_right_edge[i] = (
self.dds_list[lev_index] * (ei.astype(self.float_type) + 1)
+ self.domain_left_edge.value
)
self.grid_particle_count[i] = 0
self.grid_dimensions[i] = ei - si + 1
i += 1
self.grids = np.empty(len(grids), dtype="object")
for gi, g in enumerate(grids):
self.grids[gi] = g
[docs]
class PlutoDataset(ChomboDataset):
_index_class = PlutoHierarchy
_field_info_class = PlutoFieldInfo
def __init__(
self,
filename,
dataset_type="chombo_hdf5",
storage_filename=None,
ini_filename=None,
units_override=None,
unit_system="cgs",
default_species_fields=None,
):
ChomboDataset.__init__(
self,
filename,
dataset_type,
storage_filename,
ini_filename,
units_override=units_override,
unit_system=unit_system,
default_species_fields=default_species_fields,
)
def _parse_parameter_file(self):
"""
Check to see whether a 'pluto.ini' file
exists in the plot file directory. If one does, attempt to parse it.
Otherwise grab the dimensions from the hdf5 file.
"""
pluto_ini_file_exists = False
dir_name = os.path.dirname(os.path.abspath(self.fullplotdir))
pluto_ini_filename = os.path.join(dir_name, "pluto.ini")
pluto_ini_file_exists = os.path.isfile(pluto_ini_filename)
self.dimensionality = self._handle["Chombo_global/"].attrs["SpaceDim"]
self.domain_dimensions = self._calc_domain_dimensions()
self.refine_by = self._handle["/level_0"].attrs["ref_ratio"]
if pluto_ini_file_exists:
lines = [line.strip() for line in open(pluto_ini_filename)]
domain_left_edge = np.zeros(self.dimensionality)
domain_right_edge = np.zeros(self.dimensionality)
for il, ll in enumerate(
lines[
lines.index("[Grid]") + 2 : lines.index("[Grid]")
+ 2
+ self.dimensionality
]
):
domain_left_edge[il] = float(ll.split()[2])
domain_right_edge[il] = float(ll.split()[-1])
self._periodicity = [0] * 3
for il, ll in enumerate(
lines[
lines.index("[Boundary]") + 2 : lines.index("[Boundary]")
+ 2
+ 6 : 2
]
):
self.periodicity[il] = ll.split()[1] == "periodic"
self._periodicity = tuple(self.periodicity)
for ll in lines[lines.index("[Parameters]") + 2 :]:
if ll.split()[0] == "GAMMA":
self.gamma = float(ll.split()[1])
self.domain_left_edge = domain_left_edge
self.domain_right_edge = domain_right_edge
else:
self.domain_left_edge = self._calc_left_edge()
self.domain_right_edge = self._calc_right_edge()
self._periodicity = (True, True, True)
# if a lower-dimensional dataset, set up pseudo-3D stuff here.
if self.dimensionality == 1:
self.domain_left_edge = np.concatenate((self.domain_left_edge, [0.0, 0.0]))
self.domain_right_edge = np.concatenate(
(self.domain_right_edge, [1.0, 1.0])
)
self.domain_dimensions = np.concatenate((self.domain_dimensions, [1, 1]))
if self.dimensionality == 2:
self.domain_left_edge = np.concatenate((self.domain_left_edge, [0.0]))
self.domain_right_edge = np.concatenate((self.domain_right_edge, [1.0]))
self.domain_dimensions = np.concatenate((self.domain_dimensions, [1]))
self._determine_current_time()
@classmethod
def _is_valid(cls, filename: str, *args, **kwargs) -> bool:
if cls._missing_load_requirements():
return False
if not is_chombo_hdf5(filename):
return False
pluto_ini_file_exists = False
if isinstance(filename, str):
dir_name = os.path.dirname(os.path.abspath(filename))
pluto_ini_filename = os.path.join(dir_name, "pluto.ini")
pluto_ini_file_exists = os.path.isfile(pluto_ini_filename)
if pluto_ini_file_exists:
return True
return False
[docs]
class Orion2Hierarchy(ChomboHierarchy):
def __init__(self, ds, dataset_type="orion_chombo_native"):
ChomboHierarchy.__init__(self, ds, dataset_type)
def _detect_output_fields(self):
# look for fluid fields
output_fields = []
for key, val in self._handle.attrs.items():
if key.startswith("component"):
output_fields.append(val.decode("ascii"))
self.field_list = [("chombo", c) for c in output_fields]
# look for particle fields
self.particle_filename = self.index_filename[:-4] + "sink"
if not os.path.exists(self.particle_filename):
return
pfield_list = [("io", str(c)) for c in self.io.particle_field_index.keys()]
self.field_list.extend(pfield_list)
def _read_particles(self):
if not os.path.exists(self.particle_filename):
return
with open(self.particle_filename) as f:
lines = f.readlines()
self.num_stars = int(lines[0].strip().split(" ")[0])
for num, line in enumerate(lines[1:]):
particle_position_x = float(line.split(" ")[1])
particle_position_y = float(line.split(" ")[2])
particle_position_z = float(line.split(" ")[3])
coord = [particle_position_x, particle_position_y, particle_position_z]
# for each particle, determine which grids contain it
# copied from object_finding_mixin.py
mask = np.ones(self.num_grids)
for i in range(len(coord)):
np.choose(
np.greater(self.grid_left_edge.d[:, i], coord[i]),
(mask, 0),
mask,
)
np.choose(
np.greater(self.grid_right_edge.d[:, i], coord[i]),
(0, mask),
mask,
)
ind = np.where(mask == 1)
selected_grids = self.grids[ind]
# in orion, particles always live on the finest level.
# so, we want to assign the particle to the finest of
# the grids we just found
if len(selected_grids) != 0:
grid = sorted(selected_grids, key=lambda grid: grid.Level)[-1]
ind = np.where(self.grids == grid)[0][0]
self.grid_particle_count[ind] += 1
self.grids[ind].NumberOfParticles += 1
# store the position in the *.sink file for fast access.
try:
self.grids[ind]._particle_line_numbers.append(num + 1)
except AttributeError:
self.grids[ind]._particle_line_numbers = [num + 1]
[docs]
class Orion2Dataset(ChomboDataset):
_load_requirements = ["h5py"]
_index_class = Orion2Hierarchy
_field_info_class = Orion2FieldInfo
def __init__(
self,
filename,
dataset_type="orion_chombo_native",
storage_filename=None,
ini_filename=None,
units_override=None,
default_species_fields=None,
):
ChomboDataset.__init__(
self,
filename,
dataset_type,
storage_filename,
ini_filename,
units_override=units_override,
default_species_fields=default_species_fields,
)
def _parse_parameter_file(self):
"""
Check to see whether an 'orion2.ini' file
exists in the plot file directory. If one does, attempt to parse it.
Otherwise grab the dimensions from the hdf5 file.
"""
orion2_ini_file_exists = False
dir_name = os.path.dirname(os.path.abspath(self.fullplotdir))
orion2_ini_filename = os.path.join(dir_name, "orion2.ini")
orion2_ini_file_exists = os.path.isfile(orion2_ini_filename)
if orion2_ini_file_exists:
self._parse_inputs_file("orion2.ini")
self.dimensionality = 3
self.domain_left_edge = self._calc_left_edge()
self.domain_right_edge = self._calc_right_edge()
self.domain_dimensions = self._calc_domain_dimensions()
self.refine_by = self._handle["/level_0"].attrs["ref_ratio"]
self._determine_periodic()
self._determine_current_time()
def _parse_inputs_file(self, ini_filename):
self.fullplotdir = os.path.abspath(self.parameter_filename)
self.ini_filename = self._localize(self.ini_filename, ini_filename)
lines = open(self.ini_filename).readlines()
# read the file line by line, storing important parameters
for line in lines:
try:
param, sep, vals = line.partition("=")
if not sep:
# No = sign present, so split by space instead
param, sep, vals = line.partition(" ")
param = param.strip()
vals = vals.strip()
if not param: # skip blank lines
continue
if param[0] == "#": # skip comment lines
continue
if param[0] == "[": # skip stanza headers
continue
vals = vals.partition("#")[0] # strip trailing comments
try:
self.parameters[param] = np.int64(vals)
except ValueError:
try:
self.parameters[param] = np.float64(vals)
except ValueError:
self.parameters[param] = vals
except ValueError:
mylog.error("ValueError: '%s'", line)
if param == "GAMMA":
self.gamma = np.float64(vals)
@classmethod
def _is_valid(cls, filename: str, *args, **kwargs) -> bool:
if cls._missing_load_requirements():
return False
if not is_chombo_hdf5(filename):
return False
pluto_ini_file_exists = False
orion2_ini_file_exists = False
if isinstance(filename, str):
dir_name = os.path.dirname(os.path.abspath(filename))
pluto_ini_filename = os.path.join(dir_name, "pluto.ini")
orion2_ini_filename = os.path.join(dir_name, "orion2.ini")
pluto_ini_file_exists = os.path.isfile(pluto_ini_filename)
orion2_ini_file_exists = os.path.isfile(orion2_ini_filename)
if orion2_ini_file_exists:
return True
if not pluto_ini_file_exists:
try:
fileh = h5py.File(filename, mode="r")
valid = "CeilVA_mass" in fileh.attrs.keys()
valid = (
"Chombo_global" in fileh["/"] and "Charm_global" not in fileh["/"]
)
valid = valid and "CeilVA_mass" in fileh.attrs.keys()
fileh.close()
return valid
except Exception:
pass
return False
[docs]
class ChomboPICHierarchy(ChomboHierarchy):
def __init__(self, ds, dataset_type="chombo_hdf5"):
ChomboHierarchy.__init__(self, ds, dataset_type)
[docs]
class ChomboPICDataset(ChomboDataset):
_load_requirements = ["h5py"]
_index_class = ChomboPICHierarchy
_field_info_class = ChomboPICFieldInfo3D
def __init__(
self,
filename,
dataset_type="chombo_hdf5",
storage_filename=None,
ini_filename=None,
units_override=None,
):
ChomboDataset.__init__(
self,
filename,
dataset_type,
storage_filename,
ini_filename,
units_override=units_override,
)
if self.dimensionality == 1:
self._field_info_class = ChomboPICFieldInfo1D
if self.dimensionality == 2:
self._field_info_class = ChomboPICFieldInfo2D
@classmethod
def _is_valid(cls, filename: str, *args, **kwargs) -> bool:
if not valid_hdf5_signature(filename):
return False
if cls._missing_load_requirements():
return False
if not is_chombo_hdf5(filename):
return False
pluto_ini_file_exists = False
orion2_ini_file_exists = False
if isinstance(filename, str):
dir_name = os.path.dirname(os.path.abspath(filename))
pluto_ini_filename = os.path.join(dir_name, "pluto.ini")
orion2_ini_filename = os.path.join(dir_name, "orion2.ini")
pluto_ini_file_exists = os.path.isfile(pluto_ini_filename)
orion2_ini_file_exists = os.path.isfile(orion2_ini_filename)
if orion2_ini_file_exists:
return False
if pluto_ini_file_exists:
return False
try:
fileh = h5py.File(filename, mode="r")
valid = "Charm_global" in fileh["/"]
fileh.close()
return valid
except Exception:
pass
return False