# Source code for COT.edit_hardware

#!/usr/bin/env python
#
# edit_hardware.py - Implements "edit-hardware" sub-command
#
# September 2013, Glenn F. Matthews
# Copyright (c) 2013-2016 the COT project developers.
# See the COPYRIGHT.txt file at the top-level directory of this distribution
# and at https://github.com/glennmatthews/cot/blob/master/COPYRIGHT.txt.
#
# This file is part of the Common OVF Tool (COT) project.
# It is subject to the license terms in the LICENSE.txt file found in the
# top-level directory of this distribution and at
# https://github.com/glennmatthews/cot/blob/master/LICENSE.txt. No part
# of COT, including this file, may be copied, modified, propagated, or
# distributed except according to the terms contained in the LICENSE.txt file.

"""Module for editing hardware details of a VM.

**Functions**

.. autosummary::
  :nosignatures:

  expand_list_wildcard

**Classes**

.. autosummary::
  :nosignatures:

  COTEditHardware
"""

import argparse
import logging
import re
import textwrap
import warnings

from .data_validation import canonicalize_ide_subtype, canonicalize_nic_subtype
from .data_validation import canonicalize_scsi_subtype
from .data_validation import no_whitespace, mac_address
from .data_validation import non_negative_int, positive_int, InvalidInputError
from .submodule import COTSubmodule

logger = logging.getLogger(__name__)


class COTEditHardware(COTSubmodule):
    """Edit hardware information (CPUs, RAM, NICs, etc.).

    Inherited attributes:
    :attr:`~COTGenericSubmodule.UI`,
    :attr:`~COTSubmodule.package`,
    :attr:`~COTSubmodule.output`

    Attributes:
    :attr:`profiles`,
    :attr:`delete_all_other_profiles`,
    :attr:`cpus`,
    :attr:`memory`,
    :attr:`nics`,
    :attr:`nic_types`,
    :attr:`mac_addresses_list`,
    :attr:`nic_networks`,
    :attr:`nic_names`,
    :attr:`network_descriptions`,
    :attr:`serial_ports`,
    :attr:`serial_connectivity`,
    :attr:`scsi_subtypes`,
    :attr:`ide_subtypes`,
    :attr:`virtual_system_type`
    """

    # We like to see memory input in the form "4096M" or "4 GB" or "2 GiB"
    MEMORY_REGEXP = r"^\s*(\d+)\s*([mMgG])?i?[bB]?\s*$"

    def __init__(self, ui):
        """Instantiate this submodule with the given UI.

        :param ui: User interface object, passed through to the parent class.
        """
        super(COTEditHardware, self).__init__(ui)
        self.profiles = None
        """Configuration profile(s) to edit."""
        self.delete_all_other_profiles = False
        """Delete all profiles other than those set in :attr:`profiles`."""
        # Backing fields for the validated properties defined below.
        self._cpus = None
        self._memory = None
        self._nics = None
        self._nic_types = None
        self.mac_addresses_list = None
        """List of MAC addresses to set."""
        self.nic_networks = None
        """List of NIC-to-network mappings.

        Can use wildcards as described in :func:`expand_list_wildcard`.
        """
        self.nic_names = None
        """List of NIC name strings.

        Can use wildcards as described in :func:`expand_list_wildcard`.
        """
        self.network_descriptions = None
        """List of network description strings.

        Can use wildcards as described in :func:`expand_list_wildcard`.
        """
        self._serial_ports = None
        self.serial_connectivity = None
        """List of serial connection strings."""
        self._scsi_subtypes = None
        self._ide_subtypes = None
        self.virtual_system_type = None
        """Virtual system type"""

    @staticmethod
    def _to_int(value, name):
        """Convert ``value`` to an integer or reject it as invalid input.

        :param value: Value to convert (anything :func:`int` accepts).
        :param str name: Attribute name to use in the error message.
        :return: ``value`` as an :class:`int`
        :raises InvalidInputError: if ``value`` cannot be converted. This
          includes non-string, non-numeric input such as ``None``, which
          makes :func:`int` raise :exc:`TypeError` rather than
          :exc:`ValueError`.
        """
        try:
            return int(value)
        except (TypeError, ValueError):
            raise InvalidInputError("{0} value must be an integer"
                                    .format(name))

    @property
    def cpus(self):
        """Number of CPUs to set."""
        return self._cpus

    @cpus.setter
    def cpus(self, value):
        # Reject non-integers and counts below one before handing the value
        # to the platform's own validation.
        value = self._to_int(value, "cpus")
        if value < 1:
            raise InvalidInputError("CPU count must be at least 1")
        self.vm.platform.validate_cpu_count(value)
        self._cpus = value

    @property
    def memory(self):
        """Amount of RAM (in megabytes) to set."""
        return self._memory

    @memory.setter
    def memory(self, value):
        # Parse strings such as "4096M" or "4 GB" (see MEMORY_REGEXP) and
        # normalize the amount to megabytes.
        value = str(value)
        match = re.match(self.MEMORY_REGEXP, value)
        if not match:
            raise InvalidInputError("Could not parse memory string '{0}'"
                                    .format(value))
        mem_value = int(match.group(1))
        if mem_value <= 0:
            raise InvalidInputError("Memory must be greater than zero")
        units = match.group(2)  # None when no unit letter was given
        if units in ('M', 'm'):
            # default
            logger.debug("Memory specified in megabytes")
        elif units in ('G', 'g'):
            logger.debug("Memory specified in gigabytes - "
                         "converting to megabytes")
            mem_value *= 1024
        elif mem_value <= 64:
            # Try to be clever and guess the units
            logger.warning("Memory units not specified, "
                           "guessing '%s' means '%s GiB'",
                           mem_value, mem_value)
            mem_value *= 1024
        else:
            logger.warning("Memory units not specified, "
                           "guessing '%s' means '%s MiB'",
                           mem_value, mem_value)
        self.vm.platform.validate_memory_amount(mem_value)
        self._memory = mem_value

    @property
    def nics(self):
        """Number of NICs to set."""
        return self._nics

    @nics.setter
    def nics(self, value):
        value = self._to_int(value, "nics")
        self.vm.platform.validate_nic_count(value)
        self._nics = value

    @property
    def nic_type(self):
        """NIC type string to set.

        .. deprecated:: 1.5
           Use :attr:`nic_types` instead.

        :raises TypeError: if :attr:`nic_types` holds more than one entry
        """
        warnings.warn("Use nic_types instead", DeprecationWarning)
        if not self.nic_types:
            return None
        if len(self.nic_types) > 1:
            raise TypeError("nic_types has more than one element ({0}). "
                            "Use nic_types instead of nic_type."
                            .format(self.nic_types))
        return self.nic_types[0]

    @nic_type.setter
    def nic_type(self, value):
        warnings.warn("Use nic_types instead", DeprecationWarning)
        self.nic_types = [value]

    @property
    def nic_types(self):
        """List of NIC type strings to set."""
        return self._nic_types

    @nic_types.setter
    def nic_types(self, value):
        # Canonicalize every entry, then let the platform validate the list.
        value = [canonicalize_nic_subtype(v) for v in value]
        self.vm.platform.validate_nic_types(value)
        self._nic_types = value

    @property
    def serial_ports(self):
        """Serial port count to set."""
        return self._serial_ports

    @serial_ports.setter
    def serial_ports(self, value):
        value = self._to_int(value, "serial_ports")
        self.vm.platform.validate_serial_count(value)
        self._serial_ports = value

    @property
    def scsi_subtype(self):
        """SCSI controller subtype string to set.

        .. deprecated:: 1.5
           Use :attr:`scsi_subtypes` instead.

        :raises TypeError: if :attr:`scsi_subtypes` holds more than one entry
        """
        warnings.warn("Use scsi_subtypes instead", DeprecationWarning)
        if not self.scsi_subtypes:
            return None
        if len(self.scsi_subtypes) > 1:
            raise TypeError("scsi_subtypes has more than one element ({0}). "
                            "Use scsi_subtypes instead of scsi_subtype."
                            .format(self.scsi_subtypes))
        return self.scsi_subtypes[0]

    @scsi_subtype.setter
    def scsi_subtype(self, value):
        warnings.warn("Use scsi_subtypes instead", DeprecationWarning)
        self.scsi_subtypes = [value]

    @property
    def scsi_subtypes(self):
        """SCSI controller subtype string(s) to set."""
        return self._scsi_subtypes

    @scsi_subtypes.setter
    def scsi_subtypes(self, value):
        # Canonicalize each entry and drop any that come back falsy.
        value = [canonicalize_scsi_subtype(v) for v in value]
        value = [v for v in value if v]
        # TODO: self.vm.platform.validate_scsi_types(value)
        self._scsi_subtypes = value

    @property
    def ide_subtype(self):
        """IDE controller subtype string to set.

        .. deprecated:: 1.5
           Use :attr:`ide_subtypes` instead.

        :raises TypeError: if :attr:`ide_subtypes` holds more than one entry
        """
        warnings.warn("Use ide_subtypes instead", DeprecationWarning)
        if not self.ide_subtypes:
            return None
        if len(self.ide_subtypes) > 1:
            raise TypeError("ide_subtypes has more than one element ({0}). "
                            "Use ide_subtypes instead of ide_subtype."
                            .format(self.ide_subtypes))
        return self.ide_subtypes[0]

    @ide_subtype.setter
    def ide_subtype(self, value):
        warnings.warn("Use ide_subtypes instead", DeprecationWarning)
        self.ide_subtypes = [value]

    @property
    def ide_subtypes(self):
        """IDE controller subtype string(s) to set."""
        return self._ide_subtypes

    @ide_subtypes.setter
    def ide_subtypes(self, value):
        # Canonicalize each entry and drop any that come back falsy.
        value = [canonicalize_ide_subtype(v) for v in value]
        value = [v for v in value if v]
        # TODO: self.vm.platform.validate_ide_types(value)
        self._ide_subtypes = value

    def ready_to_run(self):
        """Check whether the module is ready to :meth:`run`.

        :returns: ``(True, ready_message)`` or ``(False, reason_why_not)``
        """
        # Need some work to do! An option counts as "requested" when it is
        # neither None (unset) nor False (flag left off).
        requested = [
            self.profiles,
            self.delete_all_other_profiles,
            self.cpus,
            self.memory,
            self.nics,
            self.nic_types,
            self.mac_addresses_list,
            self.nic_networks,
            self.nic_names,
            self.network_descriptions,
            self.serial_ports,
            self.serial_connectivity,
            self.scsi_subtypes,
            self.ide_subtypes,
            self.virtual_system_type,
        ]
        if not any(x is not None and x is not False for x in requested):
            return (False, "No work requested! Please specify at least "
                    "one hardware change")
        return super(COTEditHardware, self).ready_to_run()

    def _run_create_new_profiles(self):
        """Create new profiles as needed, with user input.

        Helper for :meth:`_run_update_profiles`.
        """
        profile_list = self.vm.config_profiles
        for profile in self.profiles:  # pylint: disable=not-an-iterable
            if profile in profile_list:
                continue
            self.UI.confirm_or_die(
                "Profile '{0}' does not exist. Create it?"
                .format(profile))
            label = self.UI.get_input(
                "Please enter a label for this configuration profile",
                profile)
            desc = self.UI.get_input(
                "Please enter a description for this "
                "configuration profile", label)
            self.vm.create_configuration_profile(profile, label=label,
                                                 description=desc)

    def _run_delete_other_profiles(self):
        """Delete all profiles except those requested.

        Helper for :meth:`_run_update_profiles`.
        """
        if self.profiles is None:
            self.UI.confirm_or_die(
                "--delete-all-other-profiles was specified but no "
                "--profiles was specified. Really proceed to delete ALL "
                "configuration profiles?")
            # Iterate over a snapshot, since profiles are deleted inside the
            # loop and the VM may hand back its live list.
            profiles_to_delete = list(self.vm.config_profiles)
        else:
            profiles_to_delete = list(set(self.vm.config_profiles) -
                                      set(self.profiles))
        for profile in profiles_to_delete:
            if self.profiles is not None:
                if not self.UI.confirm("Delete profile {0}?".format(profile)):
                    logger.verbose("Skipping deletion of profile %s", profile)
                    continue
            # else (profiles == None) we already confirmed earlier
            self.vm.delete_configuration_profile(profile)

    def _run_update_profiles(self):
        """Handle profile changes. Helper for :meth:`run`."""
        if self.profiles is not None:
            # Warn user about non-profile-aware properties
            if self.virtual_system_type is not None:
                self.UI.confirm_or_die(
                    "VirtualSystemType is not filtered by configuration"
                    " profile. Requested system type(s) '{0}' will be set for"
                    " ALL profiles, not just profile(s) {1}. Continue?"
                    .format(" ".join(self.virtual_system_type),
                            self.profiles))
            if self.network_descriptions is not None:
                self.UI.confirm_or_die(
                    "Network descriptions are not filtered by configuration"
                    " profile. Requested network descriptions will be set for"
                    " networks across ALL profiles, not just profile(s) {0}."
                    " Continue?".format(self.profiles))
            self._run_create_new_profiles()

        if self.delete_all_other_profiles:
            self._run_delete_other_profiles()

    def _run_update_nics(self):
        """Handle NIC changes. Helper for :meth:`run`."""
        vm = self.vm

        nics_dict = vm.get_nic_count(self.profiles)
        if self.nics is not None:
            # Ask before shrinking the NIC count of any affected profile.
            for (profile, count) in nics_dict.items():
                if self.nics < count:
                    self.UI.confirm_or_die(
                        "Profile {0} currently has {1} NIC(s). "
                        "Delete {2} NIC(s) to reduce to {3} total?"
                        .format(profile, count,
                                (count - self.nics), self.nics))
            vm.set_nic_count(self.nics, self.profiles)

        if self.nic_types is not None:
            vm.set_nic_types(self.nic_types, self.profiles)

        # Re-read the counts, since set_nic_count may have changed them.
        nics_dict = vm.get_nic_count(self.profiles)
        max_nics = max(nics_dict.values())

        if self.mac_addresses_list is not None:
            vm.set_nic_mac_addresses(self.mac_addresses_list, self.profiles)

        if self.nic_names is not None:
            names = expand_list_wildcard(self.nic_names, max_nics)
            vm.set_nic_names(names, self.profiles)

    def _run_update_networks(self):
        """Handle network changes. Helper for :meth:`run`."""
        vm = self.vm

        nics_dict = vm.get_nic_count(self.profiles)
        max_nics = max(nics_dict.values())

        if self.network_descriptions is None:
            new_descs = []
        else:
            # Take a private copy: expand_list_wildcard can return the very
            # list it was given, and the descriptions are consumed below.
            # Without the copy, running would empty
            # self.network_descriptions as a side effect.
            new_descs = list(expand_list_wildcard(self.network_descriptions,
                                                  max_nics))

        if self.nic_networks is None:
            # Just rename existing networks, instead of making new ones
            for network, desc in zip(vm.networks, new_descs):
                # Despite the name, create_network can also be used to
                # update an existing network.
                vm.create_network(network, desc)
            return

        existing_networks = vm.networks
        new_networks = expand_list_wildcard(self.nic_networks, max_nics)
        # Hand out descriptions in order; yields None once they run out.
        remaining_descs = iter(new_descs)
        for network in new_networks:
            new_desc = next(remaining_descs, None)
            if network not in existing_networks:
                self.UI.confirm_or_die(
                    "Network {0} is not currently defined. "
                    "Create it?".format(network))
                if not new_desc:
                    new_desc = self.UI.get_input(
                        "Please enter a description for this network",
                        network)
            # create or update
            vm.create_network(network, new_desc)
        vm.set_nic_networks(new_networks, self.profiles)

    def _run_update_serial(self):
        """Handle serial port changes. Helper for :meth:`run`."""
        if self.serial_ports is not None:
            serial_dict = self.vm.get_serial_count(self.profiles)
            # Ask before shrinking the port count of any affected profile.
            for (profile, count) in serial_dict.items():
                if self.serial_ports < count:
                    self.UI.confirm_or_die(
                        "Profile {0} currently has {1} serial port(s). "
                        "Delete {2} port(s) to reduce to {3} total?"
                        .format(profile, count,
                                (count - self.serial_ports),
                                self.serial_ports))
            self.vm.set_serial_count(self.serial_ports, self.profiles)

        if self.serial_connectivity is not None:
            serial_dict = self.vm.get_serial_count(self.profiles)
            for (profile, count) in serial_dict.items():
                if len(self.serial_connectivity) < count:
                    self.UI.confirm_or_die(
                        "There are {0} serial port(s) under profile {1}, but "
                        "you have specified connectivity information for only "
                        "{2}. "
                        "\nThe remaining ports will be unreachable. Continue?"
                        .format(count, profile,
                                len(self.serial_connectivity)))
            self.vm.set_serial_connectivity(self.serial_connectivity,
                                            self.profiles)

    def run(self):
        """Do the actual work of this submodule.

        :raises InvalidInputError: if :func:`ready_to_run` reports ``False``
        """
        super(COTEditHardware, self).run()

        # Profiles first, so that later changes can target new profiles.
        self._run_update_profiles()

        vm = self.vm

        if self.virtual_system_type is not None:
            vm.system_types = self.virtual_system_type

        if self.cpus is not None:
            vm.set_cpu_count(self.cpus, self.profiles)

        if self.memory is not None:
            vm.set_memory(self.memory, self.profiles)

        self._run_update_nics()

        self._run_update_networks()

        self._run_update_serial()

        if self.scsi_subtypes is not None:
            vm.set_scsi_subtypes(self.scsi_subtypes, self.profiles)

        if self.ide_subtypes is not None:
            vm.set_ide_subtypes(self.ide_subtypes, self.profiles)

    def create_subparser(self):
        """Create 'edit-hardware' CLI subparser."""
        # NOTE(review): the indent strings below are reproduced from a
        # whitespace-collapsed copy of this file - confirm their width.
        wrapper = textwrap.TextWrapper(width=self.UI.terminal_width - 1,
                                       initial_indent=' ',
                                       subsequent_indent=' ')
        p = self.UI.add_subparser(
            'edit-hardware',
            add_help=False,
            formatter_class=argparse.RawDescriptionHelpFormatter,
            usage=self.UI.fill_usage("edit-hardware", [
                "PACKAGE [-o OUTPUT] -v TYPE [TYPE2 ...]",
                "PACKAGE [-o OUTPUT] "
                "[-p PROFILE [PROFILE2 ...] [--delete-all-other-profiles]] "
                "[-c CPUS] "
                "[-m MEMORY] [-n NICS] [--nic-types TYPE [TYPE2 ...]] "
                "[-N NETWORK [NETWORK2 ...]] [-M MAC1 [MAC2 ...]] "
                "[--nic-names NAME1 [NAME2 ...]] "
                "[--network-descriptions NAME1 [NAME2 ...]] "
                "[-s SERIAL_PORTS] [-S URI1 [URI2 ...]] "
                "[--scsi-subtypes TYPE [TYPE2 ...]] "
                "[--ide-subtypes TYPE [TYPE2 ...]]",
            ]),
            help="Edit virtual machine hardware properties of an OVF",
            description="Edit hardware properties of the specified OVF or OVA",
            epilog=("Notes:\n" + wrapper.fill(
                "The --nic-names, --nic-networks, and --network-descriptions"
                " options support the use of a wildcard value to"
                " automatically generate a series of consecutively numbered"
                " strings. The syntax for the wildcard option is '{' followed"
                " by a number to start incrementing from, followed by '}'."
                " See examples below."
            ) + "\n\n" + self.UI.fill_examples([
                ('Create a new profile named "1CPU-8GB" with 1 CPU and 8'
                 ' gigabytes of RAM',
                 'cot edit-hardware csr1000v.ova --output csr1000v_custom.ova'
                 ' --profile 1CPU-8GB --cpus 1 --memory 8GB'),
                ("Wildcard example - without caring about how many NICs are"
                 " defined in the input OVA, rename all of the NICs in the"
                 " output OVA as 'Ethernet0/10', 'Ethernet0/11',"
                 " 'Ethernet0/12', etc., and map them to networks"
                 " 'Ethernet0_10', 'Ethernet0_11', 'Ethernet0_12', etc.,"
                 " which are described as 'Data network 1', 'Data network 2',"
                 " etc.",
                 'cot edit-hardware input.ova -o output.ova'
                 ' --nic-names "Ethernet0/{10}"'
                 ' --nic-networks "Ethernet0_{10}"'
                 ' --network-descriptions "Data network {1}"'),
                ("Combination of fixed and wildcarded names - rename the NICs"
                 " in the output OVA as 'mgmt', 'eth0', 'eth1', 'eth2'...",
                 'cot edit-hardware input.ova -o output.ova'
                 ' --nic-names "mgmt" "eth{0}"'),
            ])))

        g = p.add_argument_group("general options")

        g.add_argument('-h', '--help', action='help',
                       help="Show this help message and exit")
        g.add_argument('-o', '--output',
                       help="Name/path of new OVF/OVA package to create "
                       "instead of updating the existing OVF")
        g.add_argument('-v', '--virtual-system-type', nargs='+',
                       type=no_whitespace, metavar=('TYPE', 'TYPE2'),
                       help="Change virtual system type(s) supported by "
                       "this OVF/OVA package.")
        g.add_argument('-p', '--profiles', nargs='+', type=no_whitespace,
                       metavar=('PROFILE', 'PROFILE2'),
                       help="Make hardware changes only under the given "
                       "configuration profile(s). (default: changes apply "
                       "to all profiles)")
        g.add_argument('--delete-all-other-profiles', action='store_true',
                       help="Delete all configuration profiles other than"
                       " those specified with the --profiles option")

        g = p.add_argument_group("computational hardware options")

        g.add_argument('-c', '--cpus', type=positive_int,
                       help="Set the number of CPUs.")
        g.add_argument('-m', '--memory',
                       help="Set the amount of RAM. "
                       '(Examples: "4096M", "4 GiB")')

        g = p.add_argument_group("network interface options")

        g.add_argument('-n', '--nics', type=non_negative_int,
                       help="Set the number of NICs.")
        g.add_argument('--nic-types', nargs='+',
                       metavar=('TYPE', 'TYPE2'),
                       help="Set the hardware type(s) for all NICs. "
                       "(default: do not change existing NICs, and new "
                       "NICs added will match the existing type(s).)")
        g.add_argument('--nic-names', action='append', nargs='+',
                       metavar=('NAME1', 'NAME2'),
                       help="Specify a list of one or more NIC names or "
                       "patterns to apply to NIC devices. See Notes.")
        g.add_argument('-N', '--nic-networks', nargs='+',
                       metavar=('NETWORK', 'NETWORK2'),
                       help="Specify a series of one or more network names "
                       "or patterns to map NICs to. See Notes.")
        g.add_argument('--network-descriptions', nargs='+',
                       metavar=('NAME1', 'NAME2'),
                       help="Specify a list of one or more network "
                       "descriptions or patterns to apply to the networks. "
                       "See Notes.")
        g.add_argument('-M', '--mac-addresses-list', type=mac_address,
                       metavar=('MAC1', 'MAC2'), action='append', nargs='+',
                       help="Specify a list of MAC addresses for the NICs. "
                       "If N MACs are specified, the first (N-1) NICs "
                       "will receive the first (N-1) MACs, and all "
                       "remaining NICs will receive the Nth MAC")

        g = p.add_argument_group("serial port options")

        g.add_argument('-s', '--serial-ports', type=non_negative_int,
                       help="Set the number of serial ports.")
        g.add_argument('-S', '--serial-connectivity',
                       metavar=('URI1', 'URI2'), action='append', nargs='+',
                       help="Specify a series of connectivity strings "
                       '(URIs such as "telnet://localhost:9101") to map '
                       "serial ports to. If fewer URIs than serial ports "
                       "are specified, the remaining ports will be "
                       "unmapped.")

        g = p.add_argument_group("disk and disk controller options")

        g.add_argument('--scsi-subtypes', action='append', nargs='+',
                       metavar=('TYPE', 'TYPE2'),
                       help='Set resource subtype(s) (such as "lsilogic" or '
                       '"virtio") for all SCSI controllers. If an empty '
                       "string is provided, any existing subtype will be "
                       "removed.")
        g.add_argument('--ide-subtypes', action='append', nargs='+',
                       metavar=('TYPE', 'TYPE2'),
                       help='Set resource subtype(s) (such as "virtio") for '
                       "all IDE controllers. If an empty string is "
                       "provided, any existing subtype will be removed.")

        p.add_argument('PACKAGE',
                       help="OVF descriptor or OVA file to edit")
        p.set_defaults(instance=self)
def expand_list_wildcard(name_list, length):
    """Expand a list containing a wildcard to the desired length.

    Since various items (NIC names, network names, etc.) are often
    named or numbered sequentially, we provide this API to allow the
    user to specify a wildcard value to permit automatically
    expanding a list of input strings to the desired length.
    The syntax for the wildcard option is ``{`` followed by a number
    (indicating the starting index for the name) followed by ``}``.
    Examples:

    ``["eth{0}"]``
        ``Expands to ["eth0", "eth1", "eth2", ...]``
    ``["mgmt0", "eth{10}"]``
        ``Expands to ["mgmt0", "eth10", "eth11", "eth12", ...]``

    Only the last entry of the list is treated as the pattern. If it holds
    no wildcard, it is simply repeated. A list that is empty, or already at
    least ``length`` entries long, is returned unchanged (never truncated).

    :param list name_list: List of names to assign. It is never modified.
    :param int length: Length to expand to
    :return: Expanded list (always a new list object)
    """
    # Work on a copy so the caller's list is neither aliased nor mutated.
    expanded = list(name_list)
    # Nothing to do when there is no pattern to expand from, or when the
    # list is already long enough.
    if not expanded or len(expanded) >= length:
        return expanded

    logger.info("Expanding list %s to %d entries", name_list, length)
    # Extract the pattern and remove it from the list
    pattern = expanded.pop()
    # Look for the magic string in the pattern
    match = re.search(r"{(\d+)}", pattern)
    index = int(match.group(1)) if match else 0
    while len(expanded) < length:
        expanded.append(re.sub(r"{\d+}", str(index), pattern))
        index += 1
    logger.info("New list is %s", expanded)

    return expanded