from collections import defaultdict
import numpy as np
from yt.frontends.open_pmd.misc import get_component, is_const_component
from yt.geometry.selection_routines import GridSelector
from yt.utilities.io_handler import BaseIOHandler
class IOHandlerOpenPMDHDF5(BaseIOHandler):
_field_dtype = "float32"
_dataset_type = "openPMD"
def __init__(self, ds, *args, **kwargs):
self.ds = ds
self._handle = ds._handle
self.base_path = ds.base_path
self.meshes_path = ds.meshes_path
self.particles_path = ds.particles_path
self._array_fields = {}
self._cached_ptype = ""  # key of the species whose positions are currently cached
def _fill_cache(self, ptype, index=0, offset=None):
"""Fills the particle position cache for the ``ptype``.
Parameters
----------
ptype : str
The on-disk name of the particle species
index : int, optional
Starting index into the on-disk particle position datasets
offset : int, optional
Number of particles to cache; inferred from the position record if not given
"""
if str((ptype, index, offset)) != self._cached_ptype:
self._cached_ptype = str((ptype, index, offset))
pds = self._handle[self.base_path + self.particles_path + "/" + ptype]
axes = list(pds["position"].keys())
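# Determine the particle count from the first position axis: constant
# components only store their length in a "shape" attribute, regular
# components are actual datasets with a length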
if offset is None:
if is_const_component(pds["position/" + axes[0]]):
offset = pds["position/" + axes[0]].attrs["shape"]
else:
offset = pds["position/" + axes[0]].len()
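# Cache absolute positions as a (3, N) float64 array; openPMD stores
# them as the sum of the "position" and "positionOffset" records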
self.cache = np.empty((3, offset), dtype=np.float64)
for i in range(3):
ax = "xyz"[i]
if ax in axes:
np.add(
get_component(pds, "position/" + ax, index, offset),
get_component(pds, "positionOffset/" + ax, index, offset),
self.cache[i],
)
else:
# Pad missing axes with zeros so 1D/2D datasets stay compatible with
# the 3D selection routines; the padding has the same length as the
# existing axes, i.e. the number of particles
self.cache[i] = np.zeros(offset)
def _read_particle_selection(self, chunks, selector, fields):
"""Read particle fields for particle species masked by a selection.
Parameters
----------
chunks
A list of chunks
A chunk is a list of grids
selector
A region (inside your domain) specifying which parts of the field
you want to read. See [1] and [2]
fields : array_like
Tuples (ptype, pfield) representing a field
Returns
-------
dict
keys are tuples (ptype, pfield) representing a field
values are (N,) ndarrays with data from that field
"""
f = self._handle
bp = self.base_path
pp = self.particles_path
ds = f[bp + pp]
unions = self.ds.particle_unions
chunks = list(chunks) # chunks is a generator
rv = {}
ind = {}
particle_count = {}
ptf = defaultdict(list)  # particle type -> list of on-disk field names to read
rfm = defaultdict(list)  # (ptype, fname) read from disk -> requested field tuples to fill
for ptype, pname in fields:
pfield = (ptype, pname)
# Overestimate the size of all pfields so they include all particles
# and shrink it later
particle_count[pfield] = 0
if ptype in unions:
for pt in unions[ptype]:
particle_count[pfield] += self.ds.particle_type_counts[pt]
ptf[pt].append(pname)
rfm[pt, pname].append(pfield)
else:
particle_count[pfield] = self.ds.particle_type_counts[ptype]
ptf[ptype].append(pname)
rfm[pfield].append(pfield)
rv[pfield] = np.empty((particle_count[pfield],), dtype=np.float64)
ind[pfield] = 0
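# Walk every grid of every chunk per species, mask the cached particle
# positions with the selector and copy the selected values into the
# preallocated result arrays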
for ptype in ptf:
for chunk in chunks:
for grid in chunk.objs:
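# yt exposes the particles as "io" when only a single species exists
# on disk; map that back to the actual species name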
if str(ptype) == "io":
species = list(ds.keys())[0]
else:
species = ptype
if species not in grid.ptypes:
continue
# read particle coords into cache
self._fill_cache(species, grid.pindex, grid.poffset)
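# select_points yields a boolean mask over the cached positions, or
# None if nothing in this grid is selected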
mask = selector.select_points(
self.cache[0], self.cache[1], self.cache[2], 0.0
)
if mask is None:
continue
pds = ds[species]
for field in ptf[ptype]:
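# Translate the yt field name into the on-disk record path, e.g. a
# name like "particle_momentum_x" maps to "momentum/x";
# "positionCoarse" aliases the raw on-disk "position" record and
# "-" decodes to "_" in record names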
component = "/".join(field.split("_")[1:])
component = component.replace("positionCoarse", "position")
component = component.replace("-", "_")
data = get_component(pds, component, grid.pindex, grid.poffset)[
mask
]
for request_field in rfm[ptype, field]:
rv[request_field][
ind[request_field] : ind[request_field] + data.shape[0]
] = data
ind[request_field] += data.shape[0]
for field in fields:
rv[field] = rv[field][: ind[field]]
return rv
def _read_fluid_selection(self, chunks, selector, fields, size):
"""Reads given fields masked by a given selection.
Parameters
----------
chunks
A list of chunks
A chunk is a list of grids
selector
A region (inside your domain) specifying which parts of the field
you want to read. See [1] and [2]
fields : array_like
Tuples (ftype, fname) representing a field
size : int
Size of the data to read
Returns
-------
dict
keys are tuples (ftype, fname) representing a field
values are flat (``size``,) ndarrays with data from that field
"""
f = self._handle
bp = self.base_path
mp = self.meshes_path
ds = f[bp + mp]
chunks = list(chunks)
rv = {}
ind = {}
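# Reading through a GridSelector only makes sense for a single grid,
# delivered as one chunk with one object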
if isinstance(selector, GridSelector):
if not (len(chunks) == len(chunks[0].objs) == 1):
raise RuntimeError("GridSelector-based reads expect a single chunk containing a single grid")
if size is None:
size = sum(g.count(selector) for chunk in chunks for g in chunk.objs)
for field in fields:
rv[field] = np.empty(size, dtype=np.float64)
ind[field] = 0
for ftype, fname in fields:
field = (ftype, fname)
for chunk in chunks:
for grid in chunk.objs:
mask = grid._get_selector_mask(selector)
if mask is None:
continue
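# Decode the yt field name into the on-disk mesh record path, e.g.
# "E_x" -> "E/x"; "-" decodes to "_" in record names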
component = fname.replace("_", "/").replace("-", "_")
if component.split("/")[0] not in grid.ftypes:
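# Fields that do not exist on this grid are padded with zeros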
data = np.full(grid.ActiveDimensions, 0, dtype=np.float64)
else:
data = get_component(ds, component, grid.findex, grid.foffset)
# The following is a modified AMRGridPatch.select(...)
# Workaround: cast a 2D (x, y) array to 3D (x, y, 1)
data.shape = mask.shape
count = grid.count(selector)
rv[field][ind[field] : ind[field] + count] = data[mask]
ind[field] += count
for field in fields:
rv[field] = rv[field][: ind[field]]
rv[field] = rv[field].flatten()
return rv