Source code for model

#      ConCERO - a program to automate data format conversion and the execution of economic modelling software.
#      Copyright (C) 2018  CSIRO Energy Business Unit
#
#     This program is free software: you can redistribute it and/or modify
#     it under the terms of the GNU General Public License as published by
#     the Free Software Foundation, either version 3 of the License, or
#     (at your option) any later version.
#
#     This program is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#     GNU General Public License for more details.
#
#     You should have received a copy of the GNU General Public License
#     along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
Created on Feb 05 12:28:55 2018

.. sectionauthor:: Lyle Collins <Lyle.Collins@csiro.au>
.. codeauthor:: Lyle Collins <Lyle.Collins@csiro.au>
"""

import os
import subprocess
import contextlib

import concero.conf as conf
import concero.modfuncs as modfuncs
from concero.from_cero import FromCERO
from concero.to_cero import ToCERO
from concero.cero import CERO

@contextlib.contextmanager
def _modified_environ(*remove, wd=None, **update):
    """
    Temporarily updates the ``os.environ`` dictionary in-place.

    The ``os.environ`` dictionary is updated in-place so that the modification
    is sure to work in all situations.

    :param remove: Environment variables to remove.
    :param wd: Directory to change to (before any changes to environment variables are made). By default, current working directory.
    :param update: Dictionary of environment variables and values to add/update.

    Code for this function from: ``https://stackoverflow.com/questions/2059482/python-temporarily-modify-the-current-processs-environment``
    """
    for k, v in update.items():
        if not isinstance(v, str):
            update[k] = "%s" % v

    env = os.environ
    update = update or {}
    remove = remove or []

    # List of environment variables being updated or removed.
    stomped = (set(update.keys()) | set(remove)) & set(env.keys())
    # Environment variables and values to restore on exit.
    update_after = {k: env[k] for k in stomped}
    # Environment variables and values to remove on exit.
    remove_after = frozenset(k for k in update if k not in env)

    old_dir = os.getcwd()

    if wd:
        try:
            os.chdir(wd)
        except FileNotFoundError:
            # Attempt to create directory if it doesn't exist
            os.mkdir(wd)
            os.chdir(wd)

    try:
        env.update(update)
        [env.pop(k, None) for k in remove]
        yield
    finally:
        env.update(update_after)
        [env.pop(k) for k in remove_after]
        os.chdir(old_dir)


[docs]class Model(dict): _logger = conf.setup_logger(__name__) def __init__(self, model: dict, *args, parent: "Model"=None, **kwargs): """ :param model: A `dict` containing ``Model`` options. :param args: Passed to superclass (`dict`) at initialisation. :param "Model" parent: If provided, inherits all key-value pairs from ``parent``. :param kwargs: Passed to superclass (`dict`) at initialisation. """ defaults = {"name": "default_model_name", "cmds": [], "input_conf": [], "output_conf": [], "search_paths": [], "wd": None} defaults.update(model) if parent is None: parent = {} defaults.update(parent) super().__init__(defaults, *args, **kwargs) if self["name"] == "default_model_name": Model._logger.warning("Model not named - default name '%s' assigned." % self["name"]) # Command string processing if isinstance(self["cmds"], str): self["cmds"] = [self["cmds"]] if not self["cmds"]: Model._logger.info("No commands specified for model '%s'." % defaults["name"]) if not self["search_paths"]: self["search_paths"].append(os.path.abspath(".")) if isinstance(self["input_conf"], str): self["input_conf"] = [self["input_conf"]] if isinstance(self["output_conf"], str): self["output_conf"] = [self["output_conf"]] # Locate and load configuration files... for idx, input_conf in enumerate(self["input_conf"]): self["input_conf"][idx] = self.find_file(input_conf) par_dict = {"ref_dir": os.path.abspath(os.path.dirname(self["input_conf"][idx]))} self["input_conf"][idx] = FromCERO(self["input_conf"][idx], parent=par_dict) # Locate and load configuration files... for idx, output_conf in enumerate(self["output_conf"]): self["output_conf"][idx] = self.find_file(output_conf) par_dict = {"search_paths": os.path.abspath(os.path.dirname(self["output_conf"][idx]))} self["output_conf"][idx] = ToCERO(self["output_conf"][idx], parent=par_dict)
[docs] def is_valid(self, raise_exception=True): """ Checks the validity of ``self`` as a ``Model`` object. Method does not ensure runtime issues will not occur. :param bool raise_exception: :return bool: Returns `True` if ``self`` is a valid ``Model``. """ req_keys = ["name", "cmds", "input_conf", "output_conf"] if not all([k in self for k in req_keys]): msg = ("All models must have all of the keys: %s. Attempted to create model" + " with at least one of these keys missing.") % req_keys Model._logger.error(msg) if raise_exception: raise TypeError(msg) print(msg) return False for ic in self["input_conf"]: if not FromCERO.check_config(ic, raise_exception=raise_exception, runtime=False): return False for oc in self["output_conf"]: if not ToCERO.check_config(oc, raise_exception=raise_exception, runtime=False): return False return True
[docs] def run_checks(self, raise_exception=True): """ Performs runtime checks on ``self`` to ensure it is a valid Model object. Failure of runtime checks indicates that the model is not ready to run. :param bool raise_exception: If True, an exception is raised on check failure (as opposed to returning `False`). :return bool: """ for ic in self["input_conf"]: if not FromCERO.check_config(ic, raise_exception=raise_exception, runtime=True): return False return True
def check_config(self, raise_exception=False, runtime=False): if runtime: return self.run_checks(raise_exception=raise_exception) return self.is_valid(raise_exception=raise_exception)
[docs] def run(self, cero) -> 'CERO': """ Executes all data import/export operations (defined by ``input_conf`` and ``output_conf`` respectively) and the execution of any commands. :param pandas.DataFrame cero: A CERO that contains all necessary data for conversion to input files (for \ model execution). :return pandas.DataFrame: A CERO of relevant output data ('relevant' is defined by ``output_conf``). """ for input_conf in self["input_conf"]: input_conf.exec_procedures(cero) print("Completed converting CERO to model input files (%s). Now processing commands..." % self["name"]) with _modified_environ(wd=self["wd"], **self.get("env_vars", {})): for cmdobj in self["cmds"]: cmd = {"type": "shell", "shell": True} # Default command if isinstance(cmdobj, str): # cmd is interpreted as shell command by default # cmdobj = cmdobj.split(" ") cmd.update({"args": cmdobj}) elif isinstance(cmdobj, dict): cmd.update(cmdobj) # Add user updates if "args" not in cmd: raise ValueError("'args' must be provided for command of type 'dict'.") else: raise TypeError("Invalid command object in configuration file.") # Change to command-specific directory cmd_run_dir = cmd.pop("wd", self["wd"]) if cmd_run_dir: cmd_run_dir = os.path.abspath(cmd_run_dir) cmd_type = cmd.pop("type") # Execute commands msg = "In directory '%s', executing command '%s'." % (cmd_run_dir, cmd) Model._logger.info(msg) with _modified_environ(wd=cmd_run_dir, **cmd.pop("env_vars", {})): # Depending on cmd_type, execute command in different ways... if cmd_type in ["shell"]: args = cmd.pop("args") Model._logger.info("Executing shell command: %s, with keyword args: %s." % (args, cmd)) try: cmd["output"] = subprocess.check_output(args=args, stderr=subprocess.STDOUT, universal_newlines=True, **cmd) except subprocess.CalledProcessError as e: msg = ("Command '%s' failed with returncode: %s, and message:\n" + "%s\n" + "Program logs may have more information.") % (args, e.returncode, e.output) Model._logger.error(msg) print(msg) raise e Model._logger.info(cmd["output"]) print("Command returned: \n%s" % cmd["output"], end="") elif cmd_type in ["python_method"]: try: assert("func" in cmd) except AssertionError: raise ValueError("'func' must be defined for commands of type 'python_method'.") func = getattr(modfuncs, cmd.pop("func")) cmd["output"] = func(*cmd["args"], **cmd["kwargs"]) else: raise ValueError("Unsupported command type specified.") if not self["output_conf"]: return CERO.create_empty() ceros = [] for oc in self["output_conf"]: ceros.append(oc.create_cero()) try: cero = CERO.combine_ceros(ceros, overwrite=False) except CERO.CEROIndexConflict: raise RuntimeWarning("Attempts to duplicate the export of data - i.e. one or more data series are being " + "exported more than once (which should be avoided). The last procedure will define " + "the intended data.") cero = CERO.combine_ceros(ceros) return cero
def find_file(self, filename): orig_filename = filename filename = os.path.relpath(filename) for sp in self["search_paths"]: Model._logger.debug("Model.find_file(): testing path: %s" % os.path.join(sp, filename)) if os.path.isfile(os.path.join(sp, filename)): return os.path.join(sp, filename) else: raise FileNotFoundError("File '%s' not found." % orig_filename)