Source code for yt.frontends.exodus_ii.simulation_handling

import glob

from yt.data_objects.time_series import DatasetSeries
from yt.funcs import only_on_root
from yt.loaders import load
from yt.utilities.exceptions import YTUnidentifiedDataType
from yt.utilities.logger import ytLogger as mylog
from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_objects


[docs] class ExodusIISimulation(DatasetSeries): r""" Initialize an ExodusII Simulation object. Upon creation, the input directory is searched for valid ExodusIIDatasets. The get_time_series can be used to generate a DatasetSeries object. simulation_directory : str The directory that contain the simulation data. Examples -------- >>> import yt >>> sim = yt.load_simulation("demo_second", "ExodusII") >>> sim.get_time_series() >>> for ds in sim: ... print(ds.current_time) """ def __init__(self, simulation_directory, find_outputs=False): self.simulation_directory = simulation_directory fn_pattern = f"{self.simulation_directory}/*" potential_outputs = glob.glob(fn_pattern) self.all_outputs = self._check_for_outputs(potential_outputs) self.all_outputs.sort(key=lambda obj: obj["filename"]) def __iter__(self): for o in self._pre_outputs: fn, step = o ds = load(fn, step=step) self._setup_function(ds) yield ds def __getitem__(self, key): if isinstance(key, slice): if isinstance(key.start, float): return self.get_range(key.start, key.stop) # This will return a sliced up object! return DatasetSeries(self._pre_outputs[key], self.parallel) o = self._pre_outputs[key] fn, step = o o = load(fn, step=step) self._setup_function(o) return o
[docs] def get_time_series(self, parallel=False, setup_function=None): r""" Instantiate a DatasetSeries object for a set of outputs. If no additional keywords given, a DatasetSeries object will be created with all potential datasets created by the simulation. Fine-level filtering is currently not implemented. """ all_outputs = self.all_outputs ds_list = [] for output in all_outputs: num_steps = output["num_steps"] fn = output["filename"] for step in range(num_steps): ds_list.append((fn, step)) super().__init__(ds_list, parallel=parallel, setup_function=setup_function)
def _check_for_outputs(self, potential_outputs): r""" Check a list of files to see if they are valid datasets. """ only_on_root( mylog.info, "Checking %d potential outputs.", len(potential_outputs) ) my_outputs = {} for my_storage, output in parallel_objects( potential_outputs, storage=my_outputs ): try: ds = load(output) except (FileNotFoundError, YTUnidentifiedDataType): mylog.error("Failed to load %s", output) continue my_storage.result = {"filename": output, "num_steps": ds.num_steps} my_outputs = [ my_output for my_output in my_outputs.values() if my_output is not None ] return my_outputs