Source code for COT.helpers.helper

#!/usr/bin/env python
#
# helper.py - Abstract provider of a non-Python helper program.
#
# February 2015, Glenn F. Matthews
# Copyright (c) 2015-2016 the COT project developers.
# See the COPYRIGHT.txt file at the top-level directory of this distribution
# and at https://github.com/glennmatthews/cot/blob/master/COPYRIGHT.txt.
#
# This file is part of the Common OVF Tool (COT) project.
# It is subject to the license terms in the LICENSE.txt file found in the
# top-level directory of this distribution and at
# https://github.com/glennmatthews/cot/blob/master/LICENSE.txt. No part
# of COT, including this file, may be copied, modified, propagated, or
# distributed except according to the terms contained in the LICENSE.txt file.

"""Common interface for providers of non-Python helper programs.

Provides the ability to install the program if not already present,
and the ability to run the program as well.

**Classes**

.. autosummary::
  :nosignatures:

  HelperNotFoundError
  HelperError
  Helper
  PackageManager

**Attributes**

.. autosummary::
  :nosignatures:

  helpers

**Functions**

.. autosummary::
  :nosignatures:

  check_call
  check_output
"""

import logging

import os
import os.path
import contextlib
import errno
import re
import shutil
import subprocess

try:
    from subprocess import check_output as _check_output
except ImportError:
    # Python 2.6 doesn't have subprocess.check_output.
    # Implement it ourselves:
    def _check_output(args, **kwargs):
        process = subprocess.Popen(args,
                                   stdout=subprocess.PIPE,
                                   **kwargs)
        stdout, _ = process.communicate()
        retcode = process.poll()
        if retcode:
            e = subprocess.CalledProcessError(retcode, " ".join(args))
            e.output = stdout
            raise e
        return stdout

import tarfile
import distutils.spawn
from distutils.version import StrictVersion
import requests

from verboselogs import VerboseLogger

logging.setLoggerClass(VerboseLogger)
logger = logging.getLogger(__name__)

try:
    # Python 3.x
    from tempfile import TemporaryDirectory
except ImportError:
    # Python 2.x
    import tempfile

    @contextlib.contextmanager
    def TemporaryDirectory(suffix='',   # noqa: N802
                           prefix='tmp',
                           dirpath=None):
        """Create a temporary directory and make sure it's deleted later.

        Reimplementation of Python 3's ``tempfile.TemporaryDirectory``.
        For the parameters, see :class:`tempfile.TemporaryDirectory`.

        Yields:
          str: Path to temporary directory
        """
        tempdir = tempfile.mkdtemp(suffix, prefix, dirpath)
        try:
            yield tempdir
        finally:
            shutil.rmtree(tempdir)


[docs]class HelperNotFoundError(OSError): """A helper program cannot be located."""
[docs]class HelperError(EnvironmentError): """A helper program exited with non-zero return code."""
[docs]class HelperDict(dict): """Dictionary of Helper objects by name. Similar to :class:`collections.defaultdict` but takes the key as a parameter to the factory. """
[docs] def __init__(self, factory, *args, **kwargs): """Create the given dictionary with the given factory class/method. Args: factory (object): Factory class or method to be called to populate a new entry in response to :meth:`__missing__`. For the other parameters, see :class:`dict`. """ super(HelperDict, self).__init__(*args, **kwargs) self.factory = factory
def __missing__(self, key): """Method called when accessing a non-existent key. Automatically populate the given key with an instance of the factory. Args: key (object): Key that was not yet defined in this dictionary. Returns: object: Result of calling ``self.factory(key)`` """ self[key] = self.factory(key) return self[key]
[docs]class Helper(object): """A provider of a non-Python helper program. **Static Methods** .. autosummary:: :nosignatures: cp download_and_expand_tgz mkdir **Instance Properties** .. autosummary:: name info_uri installable installed path version **Instance Methods** .. autosummary:: :nosignatures: call install unsure_how_to_install """
[docs] def __init__(self, name, info_uri=None, version_args=None, version_regexp="([0-9.]+)"): """Initializer. Args: name (str): Name of helper executable info_uri (str): URI to refer to for more info about this helper. version_args (list): Args to pass to the helper to get its version. Defaults to ``['--version']`` if unset. version_regexp (str): Regexp to get the version number from the output of the command. """ self._name = name self._info_uri = info_uri self._path = None self._installed = None self._version = None if not version_args: version_args = ['--version'] self._version_args = version_args self._version_regexp = version_regexp
def __bool__(self): """A helper is True if installed and False if not installed.""" return self.installed # For Python 2.x compatibility: __nonzero__ = __bool__ _provider_package = {} """Mapping of package manager name to package name to install with it.""" USER_INTERFACE = None """User interface (if any) available to helpers.""" @property def name(self): """Name of the helper program.""" return self._name @property def info_uri(self): """URI for more information about this helper.""" return self._info_uri @property def path(self): """Discovered path to the helper.""" if not self._path: logger.verbose("Checking for helper executable %s", self.name) self._path = distutils.spawn.find_executable(self.name) if self._path: logger.verbose("%s is at %s", self.name, self.path) self._installed = True else: logger.verbose("No path to %s found", self.name) return self._path @property def installed(self): """Whether this helper program is installed and available to run.""" if self._installed is None: self._installed = (self.path is not None) return self._installed @property def installable(self): """Whether COT is capable of installing this program on this system.""" for pm_name in self._provider_package: if helpers[pm_name]: return True return False @property def version(self): """Release version of the associated helper program.""" if self.installed and not self._version: output = self.call(self._version_args, require_success=False) match = re.search(self._version_regexp, output) if not match: raise RuntimeError("Unable to find version number in output:" "\n{0}".format(output)) self._version = StrictVersion(match.group(1)) return self._version
[docs] def call(self, args, capture_output=True, **kwargs): """Call the helper program with the given arguments. Args: args (list): List of arguments to the helper program. capture_output (boolean): If ``True``, stdout/stderr will be redirected to a buffer and returned, instead of being displayed to the user. (I.e., :func:`check_output` will be invoked instead of :func:`check_call`) Returns: str: Captured stdout/stderr if :attr:`capture_output` is True, else ``None``. For the other parameters, see :func:`check_call` and :func:`check_output`. Raises: HelperNotFoundError: if the helper was not previously installed, and the user declines to install it at this time. """ if not self.path: if self.USER_INTERFACE and not self.USER_INTERFACE.confirm( "{0} does not appear to be installed.\nTry to install it?" .format(self.name)): raise HelperNotFoundError( 1, "Unable to proceed without helper program '{0}'. " "Please install it and/or check your $PATH." .format(self.name)) self.install() args.insert(0, self.name) if capture_output: return check_output(args, **kwargs) else: check_call(args, **kwargs) return None
[docs] def install(self): """Install the helper program. Raises: NotImplementedError: if not :attr:`installable` HelperError: if installation is attempted but fails. Subclasses should not override this method but instead should provide an appropriate implementation of the :meth:`_install` method. """ if self.installed: return if not self.installable: self.unsure_how_to_install() logger.info("Installing '%s'...", self.name) # Call the subclass implementation self._install() # Make sure it actually performed as promised if not self.path: raise HelperNotFoundError( 1, "Installation did not raise an exception, but afterward, " "unable to locate {0}!".format(self.name)) logger.info("Successfully installed '%s'", self.name)
[docs] def unsure_how_to_install(self): """Raise a NotImplementedError about missing install logic.""" msg = "Unsure how to install {0}.".format(self.name) if self.info_uri: msg += "\nRefer to {0} for information".format(self.info_uri) raise NotImplementedError(msg)
def _install(self): """Subclass-specific implementation of installation logic.""" # Default implementation for pm_name, package in self._provider_package.items(): if helpers[pm_name]: helpers[pm_name].install_package(package) return # We shouldn't get here under normal call flow and logic. self.unsure_how_to_install() @staticmethod @contextlib.contextmanager
[docs] def download_and_expand_tgz(url): """Context manager for downloading and expanding a .tar.gz file. Creates a temporary directory, downloads the specified URL into the directory, unzips and untars the file into this directory, then yields to the given block. When the block exits, the temporary directory and its contents are deleted. :: with download_and_expand_tgz("http://example.com/foo.tgz") as d: # archive contents have been extracted to 'd' ... # d is automatically cleaned up. Args: url (str): URL of a .tgz or .tar.gz file to download. Yields: str: Temporary directory path where the archive has been extracted. """ with TemporaryDirectory(prefix="cot_helper") as d: logger.debug("Temporary directory is %s", d) logger.verbose("Downloading and extracting %s", url) response = requests.get(url, stream=True) tgz = os.path.join(d, 'helper.tgz') with open(tgz, 'wb') as f: shutil.copyfileobj(response.raw, f) del response logger.debug("Extracting %s", tgz) # the "with tarfile.open()..." construct isn't supported in 2.6 tarf = tarfile.open(tgz, "r:gz") try: tarf.extractall(path=d) finally: tarf.close() try: yield d finally: logger.debug("Cleaning up temporary directory %s", d)
@staticmethod
[docs] def mkdir(directory, permissions=493): # 493 == 0o755 """Check whether the given target directory exists, and create if not. Args: directory (str): Directory to check/create. permissions (int): Permission mask to set when creating a directory. Default is ``0o755``. """ if os.path.isdir(directory): # TODO: permissions check, update permissions if needed return True elif os.path.exists(directory): raise RuntimeError("Path {0} exists but is not a directory!" .format(directory)) try: logger.verbose("Creating directory " + directory) os.makedirs(directory, permissions) return True except OSError as e: logger.verbose("Directory %s creation failed, trying sudo", directory) try: check_call(['sudo', 'mkdir', '-p', '--mode=%o' % permissions, directory]) except HelperError: # That failed too - re-raise the original exception raise e return True
@staticmethod
[docs] def cp(src, dest): """Copy the given src to the given dest, using sudo if needed. Args: src (str): Source path. dest (str): Destination path. Returns: bool: True Raises: HelperError: if file copying fails """ logger.verbose("Copying %s to %s", src, dest) try: shutil.copy(src, dest) except (OSError, IOError) as e: logger.verbose('Installation error, trying sudo.') try: check_call(['sudo', 'cp', src, dest]) except HelperError: # That failed too - re-raise the original exception raise e return True
helpers = HelperDict(Helper) """Dictionary of concrete Helper subclasses to be populated at load time."""
[docs]class PackageManager(Helper): """Helper program with additional API method install_package()."""
[docs] def install_package(self, package): """Install the requested package if needed. Args: package (str): Name of the package to install. """ raise NotImplementedError("install_package not implemented!")
[docs]def check_call(args, require_success=True, retry_with_sudo=False, **kwargs): """Wrapper for :func:`subprocess.check_call`. Unlike :meth:`check_output` below, this does not redirect stdout or stderr; all output from the subprocess will be sent to the system stdout/stderr as normal. Args: args (list): Command to invoke and its associated args require_success (boolean): If ``False``, do not raise an error when the command exits with a return code other than 0 retry_with_sudo (boolean): If ``True``, if the command gets an exception, prepend ``sudo`` to the command and try again. For the other parameters, see :func:`subprocess.check_call`. Raises: HelperNotFoundError: if the command doesn't exist (instead of a :class:`OSError`) HelperError: if :attr:`require_success` is not ``False`` and the command returns a value other than 0 (instead of a :class:`subprocess.CalledProcessError`). OSError: as :func:`subprocess.check_call`. """ cmd = args[0] logger.info("Calling '%s'...", " ".join(args)) try: subprocess.check_call(args, **kwargs) except OSError as e: if retry_with_sudo and (e.errno == errno.EPERM or e.errno == errno.EACCES): check_call(['sudo'] + args, require_success=require_success, retry_with_sudo=False, **kwargs) return if e.errno != errno.ENOENT: raise raise HelperNotFoundError(e.errno, "Unable to locate helper program '{0}'. " "Please check your $PATH.".format(cmd)) except subprocess.CalledProcessError as e: if require_success: if retry_with_sudo: check_call(['sudo'] + args, require_success=require_success, retry_with_sudo=False, **kwargs) return raise HelperError(e.returncode, "Helper program '{0}' exited with error {1}" .format(cmd, e.returncode)) logger.info("...done") logger.debug("%s exited successfully", cmd)
[docs]def check_output(args, require_success=True, retry_with_sudo=False, **kwargs): """Wrapper for :func:`subprocess.check_output`. Automatically redirects stderr to stdout, captures both to a buffer, and generates a debug message with the stdout contents. Args: args (list): Command to invoke and its associated args require_success (boolean): If ``False``, do not raise an error when the command exits with a return code other than 0 retry_with_sudo (boolean): If ``True``, if the command gets an exception, prepend ``sudo`` to the command and try again. For the other parameters, see :func:`subprocess.check_output`. Returns: str: Captured stdout/stderr from the command Raises: HelperNotFoundError: if the command doesn't exist (instead of a :class:`OSError`) HelperError: if :attr:`require_success` is not ``False`` and the command returns a value other than 0 (instead of a :class:`subprocess.CalledProcessError`). OSError: as :func:`subprocess.check_output`. """ cmd = args[0] logger.info("Calling '%s' and capturing its output...", " ".join(args)) try: stdout = _check_output(args, stderr=subprocess.STDOUT, **kwargs).decode('ascii', 'ignore') except OSError as e: if e.errno != errno.ENOENT: raise raise HelperNotFoundError(e.errno, "Unable to locate helper program '{0}'. " "Please check your $PATH.".format(cmd)) except subprocess.CalledProcessError as e: stdout = e.output.decode() if require_success: if retry_with_sudo: return check_output(['sudo'] + args, require_success=require_success, retry_with_sudo=False, **kwargs) raise HelperError(e.returncode, "Helper program '{0}' exited with error {1}:" "\n> {2}\n{3}".format(cmd, e.returncode, " ".join(args), stdout)) logger.info("...done") logger.verbose("%s output:\n%s", cmd, stdout) return stdout
[docs]def helper_select(choices): """Select the first helper that is available from the given list. If no helper in the list is currently installed, will install the first installable helper from the list. Raises: HelperNotFoundError: if no valid helper is available or installable. Args: choices (list): List of helpers, in order from most preferred to least preferred. Each choice in this list can be either: * a string (the helper name, such as "mkisofs") * a tuple of (name, minimum version) such as ("qemu-img", "2.1.0"). Returns: Helper: The selected helper class instance. """ for choice in choices: if isinstance(choice, str): # Helper name only, no version constraints name = choice min_version = None else: # Tuple of (name, version) (name, vers) = choice min_version = StrictVersion(vers) if helpers[name]: if min_version is None or helpers[name].version >= min_version: return helpers[name] # OK, nothing yet installed. So what can we install? for choice in choices: if isinstance(choice, str): name = choice min_version = None else: (name, vers) = choice min_version = StrictVersion(vers) if helpers[name].installable: helpers[name].install() if min_version is None or helpers[name].version >= min_version: return helpers[name] raise HelperNotFoundError("No helper available or installable!")