Source code for yt.frontends.athena_pp.io

from itertools import groupby

import numpy as np

from yt.utilities.io_handler import BaseIOHandler
from yt.utilities.logger import ytLogger as mylog


# http://stackoverflow.com/questions/2361945/detecting-consecutive-integers-in-a-list
[docs] def grid_sequences(grids): g_iter = sorted(grids, key=lambda g: g.id) for _, g in groupby(enumerate(g_iter), lambda i_x1: i_x1[0] - i_x1[1].id): seq = [v[1] for v in g] yield seq
[docs] class IOHandlerAthenaPP(BaseIOHandler): _particle_reader = False _dataset_type = "athena_pp" def __init__(self, ds): super().__init__(ds) self._handle = ds._handle def _read_particles( self, fields_to_read, type, args, grid_list, count_list, conv_factors ): pass def _read_fluid_selection(self, chunks, selector, fields, size): chunks = list(chunks) if any((ftype != "athena_pp" for ftype, fname in fields)): raise NotImplementedError f = self._handle rv = {} for field in fields: # Always use *native* 64-bit float. rv[field] = np.empty(size, dtype="=f8") ng = sum(len(c.objs) for c in chunks) mylog.debug( "Reading %s cells of %s fields in %s blocks", size, [f2 for f1, f2 in fields], ng, ) last_dname = None for field in fields: ftype, fname = field dname, fdi = self.ds._field_map[fname] if dname != last_dname: ds = f[f"/{dname}"] ind = 0 for chunk in chunks: for gs in grid_sequences(chunk.objs): start = gs[0].id - gs[0]._id_offset end = gs[-1].id - gs[-1]._id_offset + 1 data = ds[fdi, start:end, :, :, :].transpose() for i, g in enumerate(gs): ind += g.select(selector, data[..., i], rv[field], ind) last_dname = dname return rv def _read_chunk_data(self, chunk, fields): f = self._handle rv = {} for g in chunk.objs: rv[g.id] = {} if len(fields) == 0: return rv for field in fields: ftype, fname = field dname, fdi = self.ds._field_map[fname] ds = f[f"/{dname}"] for gs in grid_sequences(chunk.objs): start = gs[0].id - gs[0]._id_offset end = gs[-1].id - gs[-1]._id_offset + 1 data = ds[fdi, start:end, :, :, :].transpose() for i, g in enumerate(gs): rv[g.id][field] = np.asarray(data[..., i], "=f8") return rv