"""
AMRVAC-specific IO functions
"""
import os
import numpy as np
from more_itertools import always_iterable
from yt.geometry.selection_routines import GridSelector
from yt.utilities.io_handler import BaseIOHandler
from yt.utilities.on_demand_imports import _f90nml as f90nml
[docs]
def read_amrvac_namelist(parfiles):
"""Read one or more parfiles, and return a unified f90nml.Namelist object.
This function replicates the patching logic of MPI-AMRVAC where redundant parameters
only retain last-in-line values, with the exception of `&filelist:base_filename`,
which is accumulated. When passed a single file, this function acts as a mere
wrapper of f90nml.read().
Parameters
----------
parfiles : str, os.Pathlike, byte, or an iterable returning those types
A file path, or a list of file paths to MPI-AMRVAC configuration parfiles.
Returns
-------
unified_namelist : f90nml.Namelist
A single namelist object. The class inherits from ordereddict.
"""
parfiles = (os.path.expanduser(pf) for pf in always_iterable(parfiles))
# first merge the namelists
namelists = [f90nml.read(parfile) for parfile in parfiles]
unified_namelist = f90nml.Namelist()
for nml in namelists:
unified_namelist.patch(nml)
if "filelist" not in unified_namelist:
return unified_namelist
# accumulate `&filelist:base_filename`
base_filename = "".join(
nml.get("filelist", {}).get("base_filename", "") for nml in namelists
)
unified_namelist["filelist"]["base_filename"] = base_filename
return unified_namelist
[docs]
class AMRVACIOHandler(BaseIOHandler):
_particle_reader = False
_dataset_type = "amrvac"
def __init__(self, ds):
BaseIOHandler.__init__(self, ds)
self.ds = ds
self.datfile = ds.parameter_filename
header = self.ds.parameters
self.block_shape = np.append(header["block_nx"], header["nw"])
def _read_particle_coords(self, chunks, ptf):
"""Not implemented yet."""
# This needs to *yield* a series of tuples of (ptype, (x, y, z)).
# chunks is a list of chunks, and ptf is a dict where the keys are
# ptypes and the values are lists of fields.
raise NotImplementedError
def _read_particle_fields(self, chunks, ptf, selector):
"""Not implemented yet."""
# This gets called after the arrays have been allocated. It needs to
# yield ((ptype, field), data) where data is the masked results of
# reading ptype, field and applying the selector to the data read in.
# Selector objects have a .select_points(x,y,z) that returns a mask, so
# you need to do your masking here.
raise NotImplementedError
def _read_data(self, fid, grid, field):
"""Retrieve field data from a grid.
Parameters
----------
fid: file descriptor (open binary file with read access)
grid : yt.frontends.amrvac.data_structures.AMRVACGrid
The grid from which data is to be read.
field : str
A field name.
Returns
-------
data : np.ndarray
A 3D array of float64 type representing grid data.
"""
ileaf = grid.id
offset = grid._index.block_offsets[ileaf]
field_idx = self.ds.parameters["w_names"].index(field)
field_shape = self.block_shape[:-1]
count = np.prod(field_shape)
byte_size_field = count * 8 # size of a double
fid.seek(offset + byte_size_field * field_idx)
data = np.fromfile(fid, "=f8", count=count)
data.shape = field_shape[::-1]
data = data.T
# Always convert data to 3D, as grid.ActiveDimensions is always 3D
while len(data.shape) < 3:
data = data[..., np.newaxis]
return data
def _read_fluid_selection(self, chunks, selector, fields, size):
"""Retrieve field(s) data in a selected region of space.
Parameters
----------
chunks : generator
A generator for multiple chunks, each of which contains a list of grids.
selector : yt.geometry.selection_routines.SelectorObject
A spatial region selector.
fields : list
A list of tuples (ftype, fname).
size : np.int64
The cumulative number of objs contained in all chunks.
Returns
-------
data_dict : dict
keys are the (ftype, fname) tuples, values are arrays that have been masked
using whatever selector method is appropriate. Arrays have dtype float64.
"""
# @Notes from Niels:
# The chunks list has YTDataChunk objects containing the different grids.
# The list of grids can be obtained by doing eg.
# grids_list = chunks[0].objs or chunks[1].objs etc.
# Every element in "grids_list" is then an AMRVACGrid object,
# and has hence all attributes of a grid :
# (Level, ActiveDimensions, LeftEdge, etc.)
chunks = list(chunks)
data_dict = {} # <- return variable
if isinstance(selector, GridSelector):
if not len(chunks) == len(chunks[0].objs) == 1:
raise RuntimeError
grid = chunks[0].objs[0]
with open(self.datfile, "rb") as fh:
for ftype, fname in fields:
data_dict[ftype, fname] = self._read_data(fh, grid, fname)
else:
if size is None:
size = sum(g.count(selector) for chunk in chunks for g in chunk.objs)
for field in fields:
data_dict[field] = np.empty(size, dtype="float64")
# nb_grids = sum(len(chunk.objs) for chunk in chunks)
with open(self.datfile, "rb") as fh:
ind = 0
for chunk in chunks:
for grid in chunk.objs:
nd = 0
for field in fields:
ftype, fname = field
data = self._read_data(fh, grid, fname)
nd = grid.select(selector, data, data_dict[field], ind)
ind += nd
return data_dict
def _read_chunk_data(self, chunk, fields):
"""Not implemented yet."""
# This reads the data from a single chunk without doing any selection,
# and is only used for caching data that might be used by multiple
# different selectors later. For instance, this can speed up ghost zone
# computation.
# it should be used by _read_fluid_selection instead of _read_data
raise NotImplementedError