Source code for yt.data_objects.level_sets.clump_validators

import numpy as np

from yt.utilities.lib.misc_utilities import gravitational_binding_energy
from yt.utilities.operator_registry import OperatorRegistry
from yt.utilities.physical_constants import gravitational_constant_cgs as G

clump_validator_registry = OperatorRegistry()


[docs] def add_validator(name, function): clump_validator_registry[name] = ClumpValidator(function)
[docs] class ClumpValidator: r""" A ClumpValidator is a function that takes a clump and returns True or False as to whether the clump is valid and shall be kept. """ def __init__(self, function, args=None, kwargs=None): self.function = function self.args = args if self.args is None: self.args = [] self.kwargs = kwargs if self.kwargs is None: self.kwargs = {} def __call__(self, clump): return self.function(clump, *self.args, **self.kwargs)
def _gravitationally_bound( clump, use_thermal_energy=True, use_particles=True, truncate=True, num_threads=0 ): "True if clump is gravitationally bound." use_particles &= ("all", "particle_mass") in clump.data.ds.field_info bulk_velocity = clump.quantities.bulk_velocity(use_particles=use_particles) kinetic = ( 0.5 * ( clump["gas", "mass"] * ( (bulk_velocity[0] - clump["gas", "velocity_x"]) ** 2 + (bulk_velocity[1] - clump["gas", "velocity_y"]) ** 2 + (bulk_velocity[2] - clump["gas", "velocity_z"]) ** 2 ) ).sum() ) if use_thermal_energy: kinetic += ( clump["gas", "mass"] * clump["gas", "specific_thermal_energy"] ).sum() if use_particles: kinetic += ( 0.5 * ( clump["all", "particle_mass"] * ( (bulk_velocity[0] - clump["all", "particle_velocity_x"]) ** 2 + (bulk_velocity[1] - clump["all", "particle_velocity_y"]) ** 2 + (bulk_velocity[2] - clump["all", "particle_velocity_z"]) ** 2 ) ).sum() ) if use_particles: m = np.concatenate( [clump["gas", "mass"].in_cgs(), clump["all", "particle_mass"].in_cgs()] ) px = np.concatenate( [clump["index", "x"].in_cgs(), clump["all", "particle_position_x"].in_cgs()] ) py = np.concatenate( [clump["index", "y"].in_cgs(), clump["all", "particle_position_y"].in_cgs()] ) pz = np.concatenate( [clump["index", "z"].in_cgs(), clump["all", "particle_position_z"].in_cgs()] ) else: m = clump["gas", "mass"].in_cgs() px = clump["index", "x"].in_cgs() py = clump["index", "y"].in_cgs() pz = clump["index", "z"].in_cgs() potential = clump.data.ds.quan( G * gravitational_binding_energy( m, px, py, pz, truncate, (kinetic / G).in_cgs(), num_threads=num_threads ), kinetic.in_cgs().units, ) if truncate and potential >= kinetic: return True return potential >= kinetic add_validator("gravitationally_bound", _gravitationally_bound) def _min_cells(clump, n_cells): "True if clump has a minimum number of cells." return clump["index", "ones"].size >= n_cells add_validator("min_cells", _min_cells)