Source code for yt.utilities.command_line

import argparse
import base64
import json
import os
import pprint
import sys
import textwrap
from typing import Any

import numpy as np
from more_itertools import always_iterable

from yt.config import ytcfg
from yt.funcs import (
    download_file,
    enable_plugins,
    ensure_dir,
    ensure_dir_exists,
    get_git_version,
    mylog,
    update_git,
)
from yt.loaders import load
from yt.utilities.exceptions import YTFieldNotParseable, YTUnidentifiedDataType
from yt.utilities.metadata import get_metadata
from yt.visualization.plot_window import ProjectionPlot, SlicePlot

# isort: off
# This needs to be set before importing startup_tasks
ytcfg["yt", "internals", "command_line"] = True  # isort: skip
from yt.startup_tasks import parser, subparsers  # isort: skip # noqa: E402

# isort: on

# loading field plugins for backward compatibility, since this module
# used to do "from yt.mods import *"

try:
    enable_plugins()
except FileNotFoundError:
    pass

_default_colormap = ytcfg.get("yt", "default_colormap")


def _fix_ds(arg, *args, **kwargs):
    if os.path.isdir(f"{arg}") and os.path.exists(f"{arg}/{arg}"):
        ds = load(f"{arg}/{arg}", *args, **kwargs)
    elif os.path.isdir(f"{arg}.dir") and os.path.exists(f"{arg}.dir/{arg}"):
        ds = load(f"{arg}.dir/{arg}", *args, **kwargs)
    elif arg.endswith(".index"):
        ds = load(arg[:-10], *args, **kwargs)
    else:
        ds = load(arg, *args, **kwargs)
    return ds


def _add_arg(sc, arg):
    if isinstance(arg, str):
        arg = _common_options[arg].copy()
    elif isinstance(arg, tuple):
        exclusive, *args = arg
        if exclusive:
            grp = sc.add_mutually_exclusive_group()
        else:
            grp = sc.add_argument_group()
        for arg in args:
            _add_arg(grp, arg)
        return
    argc = dict(arg.items())
    argnames = []
    if "short" in argc:
        argnames.append(argc.pop("short"))
    if "longname" in argc:
        argnames.append(argc.pop("longname"))
    sc.add_argument(*argnames, **argc)


def _print_failed_source_update(reinstall=False):
    print()
    print("The yt package is not installed from a git repository,")
    print("so you must update this installation manually.")
    if "Continuum Analytics" in sys.version or "Anaconda" in sys.version:
        # see http://stackoverflow.com/a/21318941/1382869 for why we need
        # to check both Continuum *and* Anaconda
        print()
        print("Since it looks like you are using a python installation")
        print("that is managed by conda, you may want to do:")
        print()
        print("    $ conda update yt")
        print()
        print("to update your yt installation.")
        if reinstall:
            print()
            print("To update all of your packages, you can do:")
            print()
            print("    $ conda update --all")
    else:
        print("If you manage your python dependencies with pip, you may")
        print("want to do:")
        print()
        print("    $ python -m pip install -U yt")
        print()
        print("to update your yt installation.")


def _print_installation_information(path):
    import yt

    print()
    print("yt module located at:")
    print(f"    {path}")
    if "YT_DEST" in os.environ:
        spath = os.path.join(os.environ["YT_DEST"], "src", "yt-supplemental")
        if os.path.isdir(spath):
            print("The supplemental repositories are located at:")
            print(f"    {spath}")
    print()
    print("The current version of yt is:")
    print()
    print("---")
    print(f"Version = {yt.__version__}")
    vstring = get_git_version(path)
    if vstring is not None:
        print(f"Changeset = {vstring.strip()}")
    print("---")
    return vstring


[docs] class FileStreamer: final_size = None next_sent = 0 chunksize = 100 * 1024 def __init__(self, f, final_size=None): location = f.tell() f.seek(0, os.SEEK_END) self.final_size = f.tell() - location f.seek(location) self.f = f def __iter__(self): from tqdm import tqdm with tqdm( total=self.final_size, desc="Uploading file", unit="B", unit_scale=True ) as pbar: while self.f.tell() < self.final_size: yield self.f.read(self.chunksize) pbar.update(self.chunksize)
_subparsers = {None: subparsers} _subparsers_description = { "config": "Get and set configuration values for yt", }
[docs] class YTCommandSubtype(type): def __init__(cls, name, b, d): type.__init__(cls, name, b, d) if cls.name is None: return if cls.subparser not in _subparsers: try: description = _subparsers_description[cls.subparser] except KeyError: description = cls.subparser parent_parser = argparse.ArgumentParser(add_help=False) p = subparsers.add_parser( cls.subparser, help=description, description=description, parents=[parent_parser], ) _subparsers[cls.subparser] = p.add_subparsers( title=cls.subparser, dest=cls.subparser ) sp = _subparsers[cls.subparser] for name in always_iterable(cls.name): sc = sp.add_parser(name, description=cls.description, help=cls.description) sc.set_defaults(func=cls.run) for arg in cls.args: _add_arg(sc, arg)
[docs] class YTCommand(metaclass=YTCommandSubtype): args: tuple[str | dict[str, Any], ...] = () name: str | list[str] | None = None description: str = "" aliases = () ndatasets: int = 1 subparser: str | None = None
[docs] @classmethod def run(cls, args): self = cls() # Check for some things we know; for instance, comma separated # field names should be parsed as tuples. if getattr(args, "field", None) is not None and "," in args.field: if args.field.count(",") > 1: raise YTFieldNotParseable(args.field) args.field = tuple(_.strip() for _ in args.field.split(",")) if getattr(args, "weight", None) is not None and "," in args.weight: if args.weight.count(",") > 1: raise YTFieldNotParseable(args.weight) args.weight = tuple(_.strip() for _ in args.weight.split(",")) # Some commands need to be run repeatedly on datasets # In fact, this is the rule and the opposite is the exception # BUT, we only want to parse the arguments once. if cls.ndatasets > 1: self(args) else: ds_args = getattr(args, "ds", []) if len(ds_args) > 1: datasets = args.ds for ds in datasets: args.ds = ds self(args) elif len(ds_args) == 0: datasets = [] self(args) else: args.ds = getattr(args, "ds", [None])[0] self(args)
[docs] class GetParameterFiles(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): if len(values) == 1: datasets = values elif len(values) == 2 and namespace.basename is not None: datasets = [ "%s%04i" % (namespace.basename, r) for r in range(int(values[0]), int(values[1]), namespace.skip) ] else: datasets = values namespace.ds = [_fix_ds(ds) for ds in datasets]
_common_options = { "all": { "longname": "--all", "dest": "reinstall", "default": False, "action": "store_true", "help": ( "Reinstall the full yt stack in the current location. " "This option has been deprecated and will not have any " "effect." ), }, "ds": { "short": "ds", "action": GetParameterFiles, "nargs": "+", "help": "datasets to run on", }, "ods": { "action": GetParameterFiles, "dest": "ds", "nargs": "*", "help": "(Optional) datasets to run on", }, "axis": { "short": "-a", "longname": "--axis", "action": "store", "type": int, "dest": "axis", "default": 4, "help": "Axis (4 for all three)", }, "log": { "short": "-l", "longname": "--log", "action": "store_true", "dest": "takelog", "default": True, "help": "Use logarithmic scale for image", }, "linear": { "longname": "--linear", "action": "store_false", "dest": "takelog", "help": "Use linear scale for image", }, "text": { "short": "-t", "longname": "--text", "action": "store", "type": str, "dest": "text", "default": None, "help": "Textual annotation", }, "field": { "short": "-f", "longname": "--field", "action": "store", "type": str, "dest": "field", "default": "density", "help": ("Field to color by, use a comma to separate field tuple values"), }, "weight": { "short": "-g", "longname": "--weight", "action": "store", "type": str, "dest": "weight", "default": None, "help": ( "Field to weight projections with, " "use a comma to separate field tuple values" ), }, "cmap": { "longname": "--colormap", "action": "store", "type": str, "dest": "cmap", "default": _default_colormap, "help": "Colormap name", }, "zlim": { "short": "-z", "longname": "--zlim", "action": "store", "type": float, "dest": "zlim", "default": None, "nargs": 2, "help": "Color limits (min, max)", }, "dex": { "longname": "--dex", "action": "store", "type": float, "dest": "dex", "default": None, "nargs": 1, "help": "Number of dex above min to display", }, "width": { "short": "-w", "longname": "--width", "action": "store", "type": float, "dest": "width", "default": None, "help": "Width in specified units", }, "unit": { "short": "-u", "longname": "--unit", "action": "store", "type": str, "dest": "unit", "default": "1", "help": "Desired axes units", }, "center": { "short": "-c", "longname": "--center", "action": "store", "type": float, "dest": "center", "default": None, "nargs": 3, "help": "Center, space separated (-1 -1 -1 for max)", }, "max": { "short": "-m", "longname": "--max", "action": "store_true", "dest": "max", "default": False, "help": "Center the plot on the density maximum", }, "bn": { "short": "-b", "longname": "--basename", "action": "store", "type": str, "dest": "basename", "default": None, "help": "Basename of datasets", }, "output": { "short": "-o", "longname": "--output", "action": "store", "type": str, "dest": "output", "default": "frames/", "help": "Folder in which to place output images", }, "outputfn": { "short": "-o", "longname": "--output", "action": "store", "type": str, "dest": "output", "default": None, "help": "File in which to place output", }, "skip": { "short": "-s", "longname": "--skip", "action": "store", "type": int, "dest": "skip", "default": 1, "help": "Skip factor for outputs", }, "proj": { "short": "-p", "longname": "--projection", "action": "store_true", "dest": "projection", "default": False, "help": "Use a projection rather than a slice", }, "maxw": { "longname": "--max-width", "action": "store", "type": float, "dest": "max_width", "default": 1.0, "help": "Maximum width in code units", }, "minw": { "longname": "--min-width", "action": "store", "type": float, "dest": "min_width", "default": 50, "help": "Minimum width in units of smallest dx (default: 50)", }, "nframes": { "short": "-n", "longname": "--nframes", "action": "store", "type": int, "dest": "nframes", "default": 100, "help": "Number of frames to generate", }, "slabw": { "longname": "--slab-width", "action": "store", "type": float, "dest": "slab_width", "default": 1.0, "help": "Slab width in specified units", }, "slabu": { "short": "-g", "longname": "--slab-unit", "action": "store", "type": str, "dest": "slab_unit", "default": "1", "help": "Desired units for the slab", }, "ptype": { "longname": "--particle-type", "action": "store", "type": int, "dest": "ptype", "default": 2, "help": "Particle type to select", }, "agecut": { "longname": "--age-cut", "action": "store", "type": float, "dest": "age_filter", "default": None, "nargs": 2, "help": "Bounds for the field to select", }, "uboxes": { "longname": "--unit-boxes", "action": "store_true", "dest": "unit_boxes", "help": "Display heldsul unit boxes", }, "thresh": { "longname": "--threshold", "action": "store", "type": float, "dest": "threshold", "default": None, "help": "Density threshold", }, "dm_only": { "longname": "--all-particles", "action": "store_false", "dest": "dm_only", "default": True, "help": "Use all particles", }, "grids": { "longname": "--show-grids", "action": "store_true", "dest": "grids", "default": False, "help": "Show the grid boundaries", }, "time": { "longname": "--time", "action": "store_true", "dest": "time", "default": False, "help": "Print time in years on image", }, "contours": { "longname": "--contours", "action": "store", "type": int, "dest": "contours", "default": None, "help": "Number of Contours for Rendering", }, "contour_width": { "longname": "--contour_width", "action": "store", "type": float, "dest": "contour_width", "default": None, "help": "Width of gaussians used for rendering.", }, "enhance": { "longname": "--enhance", "action": "store_true", "dest": "enhance", "default": False, "help": "Enhance!", }, "valrange": { "short": "-r", "longname": "--range", "action": "store", "type": float, "dest": "valrange", "default": None, "nargs": 2, "help": "Range, space separated", }, "up": { "longname": "--up", "action": "store", "type": float, "dest": "up", "default": None, "nargs": 3, "help": "Up, space separated", }, "viewpoint": { "longname": "--viewpoint", "action": "store", "type": float, "dest": "viewpoint", "default": [1.0, 1.0, 1.0], "nargs": 3, "help": "Viewpoint, space separated", }, "pixels": { "longname": "--pixels", "action": "store", "type": int, "dest": "pixels", "default": None, "help": "Number of Pixels for Rendering", }, "halos": { "longname": "--halos", "action": "store", "type": str, "dest": "halos", "default": "multiple", "help": "Run halo profiler on a 'single' halo or 'multiple' halos.", }, "halo_radius": { "longname": "--halo_radius", "action": "store", "type": float, "dest": "halo_radius", "default": 0.1, "help": "Constant radius for profiling halos if using hop output files with no " + "radius entry. Default: 0.1.", }, "halo_radius_units": { "longname": "--halo_radius_units", "action": "store", "type": str, "dest": "halo_radius_units", "default": "1", "help": "Units for radius used with --halo_radius flag. " + "Default: '1' (code units).", }, "halo_hop_style": { "longname": "--halo_hop_style", "action": "store", "type": str, "dest": "halo_hop_style", "default": "new", "help": "Style of hop output file. " + "'new' for yt_hop files and 'old' for enzo_hop files.", }, "halo_dataset": { "longname": "--halo_dataset", "action": "store", "type": str, "dest": "halo_dataset", "default": None, "help": "HaloProfiler dataset.", }, "make_profiles": { "longname": "--make_profiles", "action": "store_true", "default": False, "help": "Make profiles with halo profiler.", }, "make_projections": { "longname": "--make_projections", "action": "store_true", "default": False, "help": "Make projections with halo profiler.", }, }
[docs] class YTInstInfoCmd(YTCommand): name = ["instinfo", "version"] args = ( { "short": "-u", "longname": "--update-source", "action": "store_true", "default": False, "help": "Update the yt installation, if able", }, { "short": "-o", "longname": "--output-version", "action": "store", "default": None, "dest": "outputfile", "help": "File into which the current revision number will be stored", }, ) description = """ Get some information about the yt installation """ def __call__(self, opts): import importlib.resources as importlib_resources path = os.path.dirname(importlib_resources.files("yt")) vstring = _print_installation_information(path) if vstring is not None: print("This installation CAN be automatically updated.") if opts.update_source: update_git(path) elif opts.update_source: _print_failed_source_update() if vstring is not None and opts.outputfile is not None: open(opts.outputfile, "w").write(vstring)
[docs] class YTLoadCmd(YTCommand): name = "load" description = """ Load a single dataset into an IPython instance """ args = ("ds",) def __call__(self, args): if args.ds is None: print("Could not load file.") sys.exit() import IPython import yt local_ns = {} local_ns["ds"] = args.ds local_ns["pf"] = args.ds local_ns["yt"] = yt try: from traitlets.config.loader import Config except ImportError: from IPython.config.loader import Config cfg = Config() # prepend sys.path with current working directory sys.path.insert(0, "") IPython.embed(config=cfg, user_ns=local_ns)
[docs] class YTMapserverCmd(YTCommand): args = ( "proj", "field", "weight", "linear", "center", "width", "cmap", { "short": "-a", "longname": "--axis", "action": "store", "type": int, "dest": "axis", "default": 0, "help": "Axis", }, { "short": "-o", "longname": "--host", "action": "store", "type": str, "dest": "host", "default": None, "help": "IP Address to bind on", }, {"short": "ds", "nargs": 1, "type": str, "help": "The dataset to load."}, ) name = "mapserver" description = """ Serve a plot in a GMaps-style interface """ def __call__(self, args): from yt.frontends.ramses.data_structures import RAMSESDataset from yt.visualization.mapserver.pannable_map import PannableMapServer # For RAMSES datasets, use the bbox feature to make the dataset load faster if RAMSESDataset._is_valid(args.ds) and args.center and args.width: kwa = { "bbox": [ [c - args.width / 2 for c in args.center], [c + args.width / 2 for c in args.center], ] } else: kwa = {} ds = _fix_ds(args.ds, **kwa) if args.center and args.width: center = args.center width = args.width ad = ds.box( left_edge=[c - args.width / 2 for c in args.center], right_edge=[c + args.width / 2 for c in args.center], ) else: center = [0.5] * 3 width = 1.0 ad = ds.all_data() if args.axis >= 4: print("Doesn't work with multiple axes!") return if args.projection: p = ProjectionPlot( ds, args.axis, args.field, weight_field=args.weight, data_source=ad, center=center, width=width, ) else: p = SlicePlot( ds, args.axis, args.field, data_source=ad, center=center, width=width ) p.set_log("all", args.takelog) p.set_cmap("all", args.cmap) PannableMapServer(p.data_source, args.field, args.takelog, args.cmap) try: import bottle except ImportError as e: raise ImportError( "The mapserver functionality requires the bottle " "package to be installed. Please install using `pip " "install bottle`." ) from e bottle.debug(True) if args.host is not None: colonpl = args.host.find(":") if colonpl >= 0: port = int(args.host.split(":")[-1]) args.host = args.host[:colonpl] else: port = 8080 bottle.run(server="auto", host=args.host, port=port) else: bottle.run(server="auto")
[docs] class YTPastebinCmd(YTCommand): name = "pastebin" args = ( { "short": "-l", "longname": "--language", "action": "store", "default": None, "dest": "language", "help": "Use syntax highlighter for the file in language", }, { "short": "-L", "longname": "--languages", "action": "store_true", "default": False, "dest": "languages", "help": "Retrieve a list of supported languages", }, { "short": "-e", "longname": "--encoding", "action": "store", "default": "utf-8", "dest": "encoding", "help": "Specify the encoding of a file (default is " "utf-8 or guessing if available)", }, { "short": "-b", "longname": "--open-browser", "action": "store_true", "default": False, "dest": "open_browser", "help": "Open the paste in a web browser", }, { "short": "-p", "longname": "--private", "action": "store_true", "default": False, "dest": "private", "help": "Paste as private", }, { "short": "-c", "longname": "--clipboard", "action": "store_true", "default": False, "dest": "clipboard", "help": "File to output to; else, print.", }, {"short": "file", "type": str}, ) description = """ Post a script to an anonymous pastebin """ def __call__(self, args): from yt.utilities import lodgeit as lo lo.main( args.file, languages=args.languages, language=args.language, encoding=args.encoding, open_browser=args.open_browser, private=args.private, clipboard=args.clipboard, )
[docs] class YTPastebinGrabCmd(YTCommand): args = ({"short": "number", "type": str},) name = "pastebin_grab" description = """ Print an online pastebin to STDOUT for local use. """ def __call__(self, args): from yt.utilities import lodgeit as lo lo.main(None, download=args.number)
[docs] class YTPlotCmd(YTCommand): args = ( "width", "unit", "bn", "proj", "center", "zlim", "axis", "field", "weight", "skip", "cmap", "output", "grids", "time", "ds", "max", "log", "linear", { "short": "-fu", "longname": "--field-unit", "action": "store", "type": str, "dest": "field_unit", "default": None, "help": "Desired field units", }, { "longname": "--show-scale-bar", "action": "store_true", "help": "Annotate the plot with the scale", }, ) name = "plot" description = """ Create a set of images """ def __call__(self, args): ds = args.ds center = args.center if args.center == (-1, -1, -1): mylog.info("No center fed in; seeking.") v, center = ds.find_max("density") if args.max: v, center = ds.find_max("density") elif args.center is None: center = 0.5 * (ds.domain_left_edge + ds.domain_right_edge) center = np.array(center) if ds.dimensionality < 3: dummy_dimensions = np.nonzero(ds.index.grids[0].ActiveDimensions <= 1) axes = dummy_dimensions[0][0] elif args.axis == 4: axes = range(3) else: axes = args.axis unit = args.unit if unit is None: unit = "unitary" if args.width is None: width = None else: width = (args.width, args.unit) for ax in always_iterable(axes): mylog.info("Adding plot for axis %i", ax) if args.projection: plt = ProjectionPlot( ds, ax, args.field, center=center, width=width, weight_field=args.weight, ) else: plt = SlicePlot(ds, ax, args.field, center=center, width=width) if args.grids: plt.annotate_grids() if args.time: plt.annotate_timestamp() if args.show_scale_bar: plt.annotate_scale() if args.field_unit: plt.set_unit(args.field, args.field_unit) plt.set_cmap(args.field, args.cmap) plt.set_log(args.field, args.takelog) if args.zlim: plt.set_zlim(args.field, *args.zlim) ensure_dir_exists(args.output) plt.save(os.path.join(args.output, f"{ds}"))
[docs] class YTRPDBCmd(YTCommand): name = "rpdb" description = """ Connect to a currently running (on localhost) rpd session. Commands run with --rpdb will trigger an rpdb session with any uncaught exceptions. """ args = ( { "short": "-t", "longname": "--task", "action": "store", "default": 0, "dest": "task", "help": "Open a web browser.", }, ) def __call__(self, args): from . import rpdb rpdb.run_rpdb(int(args.task))
[docs] class YTStatsCmd(YTCommand): args = ( "outputfn", "bn", "skip", "ds", "field", { "longname": "--max", "action": "store_true", "default": False, "dest": "max", "help": "Display maximum of field requested through -f option.", }, { "longname": "--min", "action": "store_true", "default": False, "dest": "min", "help": "Display minimum of field requested through -f option.", }, ) name = "stats" description = """ Print stats and max/min value of a given field (if requested), for one or more datasets (default field is density) """ def __call__(self, args): ds = args.ds ds.print_stats() vals = {} field = ds._get_field_info(args.field) if args.max: vals["max"] = ds.find_max(field) print(f"Maximum {field.name}: {vals['max'][0]:0.5e} at {vals['max'][1]}") if args.min: vals["min"] = ds.find_min(field) print(f"Minimum {field.name}: {vals['min'][0]:0.5e} at {vals['min'][1]}") if args.output is not None: t = ds.current_time.to("yr") with open(args.output, "a") as f: f.write(f"{ds} ({t:0.5e})\n") if "min" in vals: f.write( f"Minimum {field.name} is {vals['min'][0]:0.5e} at {vals['min'][1]}\n" ) if "max" in vals: f.write( f"Maximum {field.name} is {vals['max'][0]:0.5e} at {vals['max'][1]}\n" )
[docs] class YTUpdateCmd(YTCommand): args = ("all",) name = "update" description = """ Update the yt installation to the most recent version """ def __call__(self, opts): import importlib.resources as importlib_resources path = os.path.dirname(importlib_resources.files("yt")) vstring = _print_installation_information(path) if vstring is not None: print() print("This installation CAN be automatically updated.") update_git(path) else: _print_failed_source_update(opts.reinstall)
[docs] class YTDeleteImageCmd(YTCommand): args = ({"short": "delete_hash", "type": str},) description = """ Delete image from imgur.com. """ name = "delete_image" def __call__(self, args): import urllib.error import urllib.request headers = {"Authorization": f"Client-ID {ytcfg.get('yt', 'imagebin_api_key')}"} delete_url = ytcfg.get("yt", "imagebin_delete_url") req = urllib.request.Request( delete_url.format(delete_hash=args.delete_hash), headers=headers, method="DELETE", ) try: response = urllib.request.urlopen(req).read().decode() except urllib.error.HTTPError as e: print("ERROR", e) return {"deleted": False} rv = json.loads(response) if "success" in rv and rv["success"]: print("\nImage successfully deleted!\n") else: print() print("Something has gone wrong! Here is the server response:") print() pprint.pprint(rv)
[docs] class YTUploadImageCmd(YTCommand): args = ({"short": "file", "type": str},) description = """ Upload an image to imgur.com. Must be PNG. """ name = "upload_image" def __call__(self, args): import urllib.error import urllib.parse import urllib.request filename = args.file if not filename.endswith(".png"): print("File must be a PNG file!") return 1 headers = {"Authorization": f"Client-ID {ytcfg.get('yt', 'imagebin_api_key')}"} image_data = base64.b64encode(open(filename, "rb").read()) parameters = { "image": image_data, type: "base64", "name": filename, "title": f"{filename} uploaded by yt", } data = urllib.parse.urlencode(parameters).encode("utf-8") req = urllib.request.Request( ytcfg.get("yt", "imagebin_upload_url"), data=data, headers=headers ) try: response = urllib.request.urlopen(req).read().decode() except urllib.error.HTTPError as e: print("ERROR", e) return {"uploaded": False} rv = json.loads(response) if "data" in rv and "link" in rv["data"]: print() print("Image successfully uploaded! You can find it at:") print(f" {rv['data']['link']}") print() print("If you'd like to delete it, use the following") print(f" yt delete_image {rv['data']['deletehash']}") print() else: print() print("Something has gone wrong! Here is the server response:") print() pprint.pprint(rv)
[docs] class YTUploadFileCmd(YTCommand): args = ({"short": "file", "type": str},) description = """ Upload a file to yt's curldrop. """ name = "upload" def __call__(self, args): from yt.utilities.on_demand_imports import _requests as requests fs = iter(FileStreamer(open(args.file, "rb"))) upload_url = ytcfg.get("yt", "curldrop_upload_url") r = requests.put(upload_url + "/" + os.path.basename(args.file), data=fs) print() print(r.text)
[docs] class YTConfigLocalConfigHandler:
[docs] def load_config(self, args) -> None: import os from yt.config import YTConfig from yt.utilities.configure import CONFIG local_config_file = YTConfig.get_local_config_file() global_config_file = YTConfig.get_global_config_file() local_exists = os.path.exists(local_config_file) global_exists = os.path.exists(global_config_file) local_arg_exists = hasattr(args, "local") global_arg_exists = hasattr(args, "global") config_file: str | None = None if getattr(args, "local", False): config_file = local_config_file elif getattr(args, "global", False): config_file = global_config_file else: if local_exists and global_exists: s = ( "Yt detected a local and a global configuration file, refusing " "to proceed.\n" f"Local config file: {local_config_file}\n" f"Global config file: {global_config_file}" ) # Only print the info about "--global" and "--local" if they exist if local_arg_exists and global_arg_exists: s += ( "\n" # missing eol from previous string "Specify which one you want to use using the `--local` or the " "`--global` flags." ) sys.exit(s) elif local_exists: config_file = local_config_file elif global_exists: config_file = global_config_file if config_file is None: print("WARNING: no configuration file installed.", file=sys.stderr) else: print( f"INFO: reading configuration file: {config_file}", file=sys.stderr ) CONFIG.read(config_file) self.config_file = config_file
_global_local_args = ( { "short": "--local", "action": "store_true", "help": "Store the configuration in the local configuration file.", }, { "short": "--global", "action": "store_true", "help": "Store the configuration in the global configuration file.", }, )
[docs] class YTConfigGetCmd(YTCommand, YTConfigLocalConfigHandler): subparser = "config" name = "get" description = "get a config value" args = ( {"short": "section", "help": "The section containing the option."}, {"short": "option", "help": "The option to retrieve."}, *_global_local_args, ) def __call__(self, args): from yt.utilities.configure import get_config self.load_config(args) print(get_config(args.section, args.option))
[docs] class YTConfigSetCmd(YTCommand, YTConfigLocalConfigHandler): subparser = "config" name = "set" description = "set a config value" args = ( {"short": "section", "help": "The section containing the option."}, {"short": "option", "help": "The option to set."}, {"short": "value", "help": "The value to set the option to."}, *_global_local_args, ) def __call__(self, args): from yt.utilities.configure import set_config self.load_config(args) if self.config_file is None: self.config_file = os.path.join(os.getcwd(), "yt.toml") print( f"INFO: configuration will be written to {self.config_file}", file=sys.stderr, ) set_config(args.section, args.option, args.value, self.config_file)
[docs] class YTConfigRemoveCmd(YTCommand, YTConfigLocalConfigHandler): subparser = "config" name = "rm" description = "remove a config option" args = ( {"short": "section", "help": "The section containing the option."}, {"short": "option", "help": "The option to remove."}, *_global_local_args, ) def __call__(self, args): from yt.utilities.configure import rm_config self.load_config(args) rm_config(args.section, args.option, self.config_file)
[docs] class YTConfigListCmd(YTCommand, YTConfigLocalConfigHandler): subparser = "config" name = "list" description = "show the config content" args = _global_local_args def __call__(self, args): from yt.utilities.configure import write_config self.load_config(args) write_config(sys.stdout)
[docs] class YTConfigPrintPath(YTCommand, YTConfigLocalConfigHandler): subparser = "config" name = "print-path" description = "show path to the config file" args = _global_local_args def __call__(self, args): self.load_config(args) print(self.config_file)
[docs] class YTSearchCmd(YTCommand): args = ( { "short": "-o", "longname": "--output", "action": "store", "type": str, "dest": "output", "default": "yt_index.json", "help": "File in which to place output", }, { "longname": "--check-all", "short": "-a", "help": "Attempt to load every file", "action": "store_true", "default": False, "dest": "check_all", }, { "longname": "--full", "short": "-f", "help": "Output full contents of parameter file", "action": "store_true", "default": False, "dest": "full_output", }, ) description = """ Attempt to find outputs that yt can recognize in directories. """ name = "search" def __call__(self, args): from yt.utilities.object_registries import output_type_registry candidates = [] for base, dirs, files in os.walk(".", followlinks=True): print("(% 10i candidates) Examining %s" % (len(candidates), base)) recurse = [] if args.check_all: candidates.extend([os.path.join(base, _) for _ in files]) for _, otr in sorted(output_type_registry.items()): c, r = otr._guess_candidates(base, dirs, files) candidates.extend([os.path.join(base, _) for _ in c]) recurse.append(r) if len(recurse) > 0 and not all(recurse): del dirs[:] # Now we have a ton of candidates. We're going to do something crazy # and try to load each one. records = [] for i, c in enumerate(sorted(candidates)): print("(% 10i/% 10i) Evaluating %s" % (i, len(candidates), c)) try: record = get_metadata(c, args.full_output) except YTUnidentifiedDataType: continue records.append(record) with open(args.output, "w") as f: json.dump(records, f, indent=4) print(f"Identified {len(records)} records output to {args.output}")
[docs] class YTDownloadData(YTCommand): args = ( { "short": "filename", "action": "store", "type": str, "help": "The name of the file to download", "nargs": "?", "default": "", }, { "short": "location", "action": "store", "type": str, "nargs": "?", "help": "The location in which to place the file, can be " '"supp_data_dir", "test_data_dir", or any valid ' "path on disk. ", "default": "", }, { "longname": "--overwrite", "short": "-c", "help": "Overwrite existing file.", "action": "store_true", "default": False, }, { "longname": "--list", "short": "-l", "help": "Display all available files.", "action": "store_true", "default": False, }, ) description = """ Download a file from http://yt-project.org/data and save it to a particular location. Files can be saved to the locations provided by the "test_data_dir" or "supp_data_dir" configuration entries, or any valid path to a location on disk. """ name = "download" def __call__(self, args): if args.list: self.get_list() return if not args.filename: raise RuntimeError( "You need to provide a filename. See --help " "for details or use --list to get available " "datasets." ) elif not args.location: raise RuntimeError( "You need to specify download location. See --help for details." ) data_url = f"http://yt-project.org/data/{args.filename}" if args.location in ["test_data_dir", "supp_data_dir"]: data_dir = ytcfg.get("yt", args.location) if data_dir == "/does/not/exist": raise RuntimeError(f"'{args.location}' is not configured!") else: data_dir = args.location if not os.path.exists(data_dir): print(f"The directory '{data_dir}' does not exist. Creating...") ensure_dir(data_dir) data_file = os.path.join(data_dir, args.filename) if os.path.exists(data_file) and not args.overwrite: raise OSError(f"File '{data_file}' exists and overwrite=False!") print(f"Attempting to download file: {args.filename}") fn = download_file(data_url, data_file) if not os.path.exists(fn): raise OSError(f"The file '{args.filename}' did not download!!") print(f"File: {args.filename} downloaded successfully to {data_file}")
[docs] def get_list(self): import urllib.request data = ( urllib.request.urlopen("http://yt-project.org/data/datafiles.json") .read() .decode("utf8") ) data = json.loads(data) for key in data: for ds in data[key]: ds["fullname"] = ds["url"].replace("http://yt-project.org/data/", "") print("{fullname} ({size}) type: {code}".format(**ds)) for line in textwrap.wrap(ds["description"]): print("\t", line)
[docs] def run_main(): args = parser.parse_args() # The following is a workaround for a nasty Python 3 bug: # http://bugs.python.org/issue16308 # http://bugs.python.org/issue9253 try: args.func except AttributeError: parser.print_help() sys.exit(0) args.func(args)
if __name__ == "__main__": run_main()