Source code for yt.data_objects.particle_trajectories

import numpy as np

from yt.config import ytcfg
from yt.data_objects.field_data import YTFieldData
from yt.funcs import get_pbar, mylog
from yt.units.yt_array import array_like_field
from yt.utilities.exceptions import YTIllDefinedParticleData
from yt.utilities.lib.particle_mesh_operations import CICSample_3
from yt.utilities.on_demand_imports import _h5py as h5py
from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_root_only


[docs] class ParticleTrajectories: r"""A collection of particle trajectories in time over a series of datasets. Parameters ---------- outputs : ~yt.data_objects.time_series.DatasetSeries DatasetSeries object from which to draw the particles. indices : array_like An integer array of particle indices whose trajectories we want to track. If they are not sorted they will be sorted. fields : list of strings, optional A set of fields that is retrieved when the trajectory collection is instantiated. Default: None (will default to the fields 'particle_position_x', 'particle_position_y', 'particle_position_z') suppress_logging : boolean Suppress yt's logging when iterating over the simulation time series. Default: False ptype : str, optional Only use this particle type. Default: None, which uses all particle type. Examples -------- >>> my_fns = glob.glob("orbit_hdf5_chk_00[0-9][0-9]") >>> my_fns.sort() >>> fields = [ ... ("all", "particle_position_x"), ... ("all", "particle_position_y"), ... ("all", "particle_position_z"), ... ("all", "particle_velocity_x"), ... ("all", "particle_velocity_y"), ... ("all", "particle_velocity_z"), ... ] >>> ds = load(my_fns[0]) >>> init_sphere = ds.sphere(ds.domain_center, (0.5, "unitary")) >>> indices = init_sphere["all", "particle_index"].astype("int64") >>> ts = DatasetSeries(my_fns) >>> trajs = ts.particle_trajectories(indices, fields=fields) >>> for t in trajs: ... print( ... t["all", "particle_velocity_x"].max(), ... t["all", "particle_velocity_x"].min(), ... ) """ def __init__( self, outputs, indices, fields=None, suppress_logging=False, ptype=None ): indices.sort() # Just in case the caller wasn't careful self.field_data = YTFieldData() self.data_series = outputs self.masks = [] self.sorts = [] self.array_indices = [] self.indices = indices self.num_indices = len(indices) self.num_steps = len(outputs) self.times = [] self.suppress_logging = suppress_logging self.ptype = ptype if ptype else "all" if fields is None: fields = [] if self.suppress_logging: old_level = int(ytcfg.get("yt", "log_level")) mylog.setLevel(40) ds_first = self.data_series[0] dd_first = ds_first.all_data() fds = {} for field in ( "particle_index", "particle_position_x", "particle_position_y", "particle_position_z", ): fds[field] = dd_first._determine_fields((self.ptype, field))[0] # Note: we explicitly pass dynamic=False to prevent any change in piter from # breaking the assumption that the same processors load the same datasets my_storage = {} pbar = get_pbar("Constructing trajectory information", len(self.data_series)) for i, (sto, ds) in enumerate( self.data_series.piter(storage=my_storage, dynamic=False) ): dd = ds.all_data() newtags = dd[fds["particle_index"]].d.astype("int64") mask = np.isin(newtags, indices, assume_unique=True) sort = np.argsort(newtags[mask]) array_indices = np.where(np.isin(indices, newtags, assume_unique=True))[0] self.array_indices.append(array_indices) self.masks.append(mask) self.sorts.append(sort) pfields = {} for field in (f"particle_position_{ax}" for ax in "xyz"): pfields[field] = dd[fds[field]].ndarray_view()[mask][sort] sto.result_id = ds.parameter_filename sto.result = (ds.current_time, array_indices, pfields) pbar.update(i + 1) pbar.finish() if self.suppress_logging: mylog.setLevel(old_level) sorted_storage = sorted(my_storage.items()) _fn, (time, *_) = sorted_storage[0] time_units = time.units times = [time.to(time_units) for _fn, (time, *_) in sorted_storage] self.times = self.data_series[0].arr([time.value for time in times], time_units) self.particle_fields = [] output_field = np.empty((self.num_indices, self.num_steps)) output_field.fill(np.nan) for field in (f"particle_position_{ax}" for ax in "xyz"): for i, (_fn, (_time, indices, pfields)) in enumerate(sorted_storage): try: # This will fail if particles ids are # duplicate. This is due to the fact that the rhs # would then have a different shape as the lhs output_field[indices, i] = pfields[field] except ValueError as e: raise YTIllDefinedParticleData( "This dataset contains duplicate particle indices!" ) from e self.field_data[fds[field]] = array_like_field( dd_first, output_field.copy(), fds[field] ) self.particle_fields.append(field) # Instantiate fields the caller requested self._get_data(fields)
[docs] def has_key(self, key): return key in self.field_data
[docs] def keys(self): return self.field_data.keys()
def __getitem__(self, key): """ Get the field associated with key. """ if key == "particle_time": return self.times if key not in self.field_data: self._get_data([key]) return self.field_data[key] def __setitem__(self, key, val): """ Sets a field to be some other value. """ self.field_data[key] = val def __delitem__(self, key): """ Delete the field from the trajectory """ del self.field_data[key] def __iter__(self): """ This iterates over the trajectories for the different particles, returning dicts of fields for each trajectory """ for idx in range(self.num_indices): traj = {} traj["particle_index"] = self.indices[idx] traj["particle_time"] = self.times for field in self.field_data.keys(): traj[field] = self[field][idx, :] yield traj def __len__(self): """ The number of individual trajectories """ return self.num_indices
[docs] def add_fields(self, fields): """ Add a list of fields to an existing trajectory Parameters ---------- fields : list of strings A list of fields to be added to the current trajectory collection. Examples ________ >>> trajs = ParticleTrajectories(my_fns, indices) >>> trajs.add_fields([("all", "particle_mass"), ("all", "particle_gpot")]) """ self._get_data(fields)
def _get_data(self, fields): """ Get a list of fields to include in the trajectory collection. The trajectory collection itself is a dict of 2D numpy arrays, with shape (num_indices, num_steps) """ missing_fields = [field for field in fields if field not in self.field_data] if not missing_fields: return if self.suppress_logging: old_level = int(ytcfg.get("yt", "log_level")) mylog.setLevel(40) ds_first = self.data_series[0] dd_first = ds_first.all_data() fds = {} new_particle_fields = [] for field in missing_fields: fds[field] = dd_first._determine_fields(field)[0] if field not in self.particle_fields: ftype = fds[field][0] if ftype in ds_first.particle_types: self.particle_fields.append(field) new_particle_fields.append(field) grid_fields = [ field for field in missing_fields if field not in self.particle_fields ] step = 0 fields_str = ", ".join(str(f) for f in missing_fields) pbar = get_pbar( f"Generating [{fields_str}] fields in trajectories", self.num_steps, ) # Note: we explicitly pass dynamic=False to prevent any change in piter from # breaking the assumption that the same processors load the same datasets my_storage = {} for i, (sto, ds) in enumerate( self.data_series.piter(storage=my_storage, dynamic=False) ): mask = self.masks[i] sort = self.sorts[i] pfield = {} if new_particle_fields: # there's at least one particle field dd = ds.all_data() for field in new_particle_fields: # This is easy... just get the particle fields pfield[field] = dd[fds[field]].d[mask][sort] if grid_fields: # This is hard... must loop over grids for field in grid_fields: pfield[field] = np.zeros(self.num_indices) x = self["particle_position_x"][:, step].d y = self["particle_position_y"][:, step].d z = self["particle_position_z"][:, step].d particle_grids, particle_grid_inds = ds.index._find_points(x, y, z) # This will fail for non-grid index objects for grid in particle_grids: cube = grid.retrieve_ghost_zones(1, grid_fields) for field in grid_fields: CICSample_3( x, y, z, pfield[field], self.num_indices, cube[fds[field]], np.array(grid.LeftEdge, dtype="float64"), np.array(grid.ActiveDimensions, dtype="int32"), grid.dds[0], ) sto.result_id = ds.parameter_filename sto.result = (self.array_indices[i], pfield) pbar.update(step) step += 1 pbar.finish() output_field = np.empty((self.num_indices, self.num_steps)) output_field.fill(np.nan) for field in missing_fields: fd = fds[field] for i, (_fn, (indices, pfield)) in enumerate(sorted(my_storage.items())): output_field[indices, i] = pfield[field] self.field_data[field] = array_like_field(dd_first, output_field.copy(), fd) if self.suppress_logging: mylog.setLevel(old_level)
[docs] def trajectory_from_index(self, index): """ Retrieve a single trajectory corresponding to a specific particle index Parameters ---------- index : int This defines which particle trajectory from the ParticleTrajectories object will be returned. Returns ------- A dictionary corresponding to the particle's trajectory and the fields along that trajectory Examples -------- >>> import matplotlib.pyplot as plt >>> trajs = ParticleTrajectories(my_fns, indices) >>> traj = trajs.trajectory_from_index(indices[0]) >>> plt.plot( ... traj["all", "particle_time"], ... traj["all", "particle_position_x"], ... "-x", ... ) >>> plt.savefig("orbit") """ mask = np.isin(self.indices, (index,), assume_unique=True) if not np.any(mask): print("The particle index %d is not in the list!" % (index)) raise IndexError fields = sorted(self.field_data.keys()) traj = {} traj[self.ptype, "particle_time"] = self.times traj[self.ptype, "particle_index"] = index for field in fields: traj[field] = self[field][mask, :][0] return traj
[docs] @parallel_root_only def write_out(self, filename_base): """ Write out particle trajectories to tab-separated ASCII files (one for each trajectory) with the field names in the file header. Each file is named with a basename and the index number. Parameters ---------- filename_base : string The prefix for the outputted ASCII files. Examples -------- >>> trajs = ParticleTrajectories(my_fns, indices) >>> trajs.write_out("orbit_trajectory") """ fields = sorted(self.field_data.keys()) num_fields = len(fields) first_str = "# particle_time\t" + "\t".join(fields) + "\n" template_str = "%g\t" * num_fields + "%g\n" for ix in range(self.num_indices): outlines = [first_str] for it in range(self.num_steps): outlines.append( template_str % tuple( [self.times[it]] + [self[field][ix, it] for field in fields] ) ) fid = open(filename_base + "_%d.dat" % self.indices[ix], "w") fid.writelines(outlines) fid.close() del fid
[docs] @parallel_root_only def write_out_h5(self, filename): """ Write out all the particle trajectories to a single HDF5 file that contains the indices, the times, and the 2D array for each field individually Parameters ---------- filename : string The output filename for the HDF5 file Examples -------- >>> trajs = ParticleTrajectories(my_fns, indices) >>> trajs.write_out_h5("orbit_trajectories") """ fid = h5py.File(filename, mode="w") fid.create_dataset("particle_indices", dtype=np.int64, data=self.indices) fid.close() self.times.write_hdf5(filename, dataset_name="particle_times") fields = sorted(self.field_data.keys()) for field in fields: self[field].write_hdf5(filename, dataset_name=f"{field}")