Source code for COT.file_reference

#!/usr/bin/env python
#
# file_reference.py - APIs abstracting away various ways to refer to a file.
#
# August 2015, Glenn F. Matthews
# Copyright (c) 2015-2017 the COT project developers.
# See the COPYRIGHT.txt file at the top-level directory of this distribution
# and at https://github.com/glennmatthews/cot/blob/master/COPYRIGHT.txt.
#
# This file is part of the Common OVF Tool (COT) project.
# It is subject to the license terms in the LICENSE.txt file found in the
# top-level directory of this distribution and at
# https://github.com/glennmatthews/cot/blob/master/LICENSE.txt. No part
# of COT, including this file, may be copied, modified, propagated, or
# distributed except according to the terms contained in the LICENSE.txt file.

"""Wrapper classes to abstract away differences between file sources.

**Classes**

.. autosummary::
  :nosignatures:

  FileReference
  FileOnDisk
  FileInTAR
"""

import logging
import os
import shutil
import tarfile

from contextlib import contextmanager, closing

from COT.data_validation import file_checksum

logger = logging.getLogger(__name__)


class FileReference(object):
    """Semi-abstract base class for file references.

    Subclasses must implement :attr:`exists`, :attr:`size` and :meth:`open`;
    this class supplies the shared validation, checksum caching and
    :meth:`refresh` logic on top of those primitives.
    """

    @staticmethod
    def _absolute_path(path):
        """Return ``path`` if it is absolute, else warn and absolutize it.

        Args:
          path (str): Path to a container.

        Returns:
          str: ``path`` itself, or ``os.path.abspath(path)`` if it was
          relative.
        """
        if os.path.isabs(path):
            return path
        logger.warning("Only absolute paths are accepted, but "
                       'got apparent relative path "%s".'
                       "\nAttempting to convert it to an absolute path.",
                       path)
        return os.path.abspath(path)

    @classmethod
    def create(cls, container_path, filename, **kwargs):
        """Create a reference to a file in a container of some sort.

        Args:
          container_path (str): Absolute path to a container such as a
            directory or a TAR file.
          filename (str): Name of file within the container in question.
          **kwargs: See :meth:`__init__`

        Returns:
          FileReference: instance of appropriate subclass

        Raises:
          IOError: if ``container_path`` does not exist.
          NotImplementedError: if ``container_path`` is neither a directory
            nor a TAR file.
        """
        container_path = cls._absolute_path(container_path)
        if not os.path.exists(container_path):
            raise IOError("Container path '{0}' does not exist"
                          .format(container_path))
        if os.path.isdir(container_path):
            return FileOnDisk(container_path, filename, **kwargs)
        elif tarfile.is_tarfile(container_path):
            return FileInTAR(container_path, filename, **kwargs)
        else:
            raise NotImplementedError("Don't know how to open container {0}!"
                                      .format(container_path))

    def __init__(self, container_path, filename,
                 checksum_algorithm=None, expected_checksum=None,
                 expected_size=None):
        """Common initialization and validation logic.

        A checksum or size mismatch is only logged (as an error and a
        warning respectively); it does not raise.

        Args:
          container_path (str): Path to container (directory, TAR file, etc.)
          filename (str): Relative path within the container to the file
            itself
          checksum_algorithm (str): 'sha1', 'sha256', etc.
          expected_checksum (str): Expected checksum of the file, if any.
          expected_size (int): Expected size of the file, in bytes, if any.

        Raises:
          IOError: if the file does not actually exist or is not readable.
        """
        self.container_path = self._absolute_path(container_path)
        self.filename = os.path.normpath(filename)
        self.checksum_algorithm = checksum_algorithm
        # Lazily populated caches behind the checksum and size properties.
        self._checksum = None
        self._size = None
        # While True, the cached values above are bypassed and recomputed.
        self.force_refresh = False

        logger.spam("Initing for file %s, expected_size %s,"
                    " expected_checksum %s",
                    self.filename, expected_size, expected_checksum)

        if not self.exists:
            raise IOError("File '{0}' does not exist in {1}"
                          .format(self.filename, self.container_path))

        if expected_checksum is not None and (self.checksum !=
                                              expected_checksum):
            logger.error("The %s checksum for file '%s' is expected to be:"
                         "\n%s\nbut is actually:\n%s\n"
                         "This file may have been tampered with!",
                         self.checksum_algorithm, self.filename,
                         expected_checksum, self.checksum)

        if expected_size is not None and self.size != int(expected_size):
            logger.warning("The size of file '%s' is expected to be %s bytes,"
                           " but is actually %s bytes.",
                           self.filename, expected_size, self.size)

        # Should never fail this:
        assert self.exists

    @property
    def checksum(self):
        """Checksum of the referenced file.

        ``None`` if no :attr:`checksum_algorithm` was given; otherwise the
        value is computed on first access (or whenever
        :attr:`force_refresh` is set) and cached.
        """
        if self.checksum_algorithm is None:
            return None
        if self._checksum is None or self.force_refresh:
            with self.open('rb') as file_obj:
                self._checksum = file_checksum(file_obj,
                                               self.checksum_algorithm)
        return self._checksum

    @property
    def exists(self):
        """Report whether this file actually exists."""
        raise NotImplementedError

    @property
    def file_path(self):
        """Actual path to a real file, if any."""
        return None

    @property
    def size(self):
        """Size of the referenced file, in bytes."""
        raise NotImplementedError

    @contextmanager
    def open(self, mode):
        """Open the file and yield a reference to the file object.

        Automatically closes the file when done.
        Some subclasses may not support all modes.

        Args:
          mode (str): Mode such as 'r', 'w', 'a', 'w+', etc.

        Yields:
          file: File object
        """
        raise NotImplementedError

    def refresh(self):
        """Make sure all information in this reference is still valid.

        Compares the previously cached size and checksum against freshly
        read values and logs any discrepancy.

        Returns:
          bool: True if the file still exists and its size and checksum
          are unchanged, False otherwise.
        """
        # Cache the previously known values
        exp_size = self.size
        exp_checksum = self.checksum
        logger.spam("Refreshing FileReference for '%s', "
                    "expected size %s, cksum %s",
                    self.filename, exp_size, exp_checksum)

        self.force_refresh = True
        if not self.exists:
            logger.error("File '%s' no longer exists!", self.filename)
            # keep force_refresh as True since we're in a bad state
            return False

        # Re-read each attribute exactly once while refresh is forced, then
        # drop the flag so that the new values are served from cache.
        # Leaving the flag set would make the next refresh() read its
        # "previously known" values fresh from the file and therefore never
        # notice a change.
        new_size = self.size
        new_checksum = self.checksum
        self.force_refresh = False

        result = True
        if exp_size is not None and new_size != exp_size:
            logger.warning("Size of file '%s' has changed"
                           " from %s bytes to %s bytes.",
                           self.filename, exp_size, new_size)
            result = False

        if exp_checksum is not None and new_checksum != exp_checksum:
            logger.error("The %s checksum of file '%s' has changed"
                         " from\n%s\nto\n%s\n"
                         "This file may have been tampered with!",
                         self.checksum_algorithm, self.filename,
                         exp_checksum, new_checksum)
            result = False

        return result
class FileOnDisk(FileReference):
    """Reference to an ordinary file stored in a directory on disk."""

    @property
    def file_path(self):
        """Container directory joined with the file name."""
        return os.path.join(self.container_path, self.filename)

    @property
    def exists(self):
        """Whether :attr:`file_path` currently exists on disk."""
        on_disk = os.path.exists(self.file_path)
        return on_disk

    @property
    def size(self):
        """File size in bytes, cached unless a refresh is being forced."""
        if self.force_refresh or self._size is None:
            self._size = os.path.getsize(self.file_path)
        return self._size

    @contextmanager
    def open(self, mode):
        """Open the file on disk and yield the resulting file object.

        The file is closed when the ``with`` block exits.

        Args:
          mode (str): Mode such as 'r', 'w', 'a', 'w+', etc.

        Yields:
          file: File object
        """
        with open(self.file_path, mode) as handle:
            yield handle

    def copy_to(self, dest_dir):
        """Copy this file to the given destination.

        Nothing is done if the file already lives at ``dest_dir`` under its
        own name.

        Args:
          dest_dir (str): Destination directory or filename.
        """
        source = self.file_path
        if source != os.path.join(dest_dir, self.filename):
            logger.debug("Copying %s to %s", source, dest_dir)
            shutil.copy(source, dest_dir)

    def add_to_archive(self, tarf):
        """Add this file to an open TAR archive under its relative name.

        Args:
          tarf (tarfile.TarFile): Add this file to that archive.
        """
        source = self.file_path
        logger.debug("Adding %s to TAR file as %s", source, self.filename)
        tarf.add(source, self.filename)
class FileInTAR(FileReference):
    """Wrapper for a file inside a TAR archive or OVA."""

    def __init__(self, tarfile_path, filename, **kwargs):
        """Create a reference to a file contained in a TAR archive.

        Args:
          tarfile_path (str): Path to TAR archive to read
          filename (str): File name in the TAR archive.
          **kwargs: Passed through to :meth:`FileReference.__init__`.

        Raises:
          IOError: if ``tarfile_path`` doesn't reference a TAR file,
            or the TAR file does not contain ``filename``.
        """
        if not os.path.isabs(tarfile_path):
            logger.warning("Only absolute paths are accepted, but "
                           'got apparent relative path "%s".'
                           "\nAttempting to convert it to an absolute path.",
                           tarfile_path)
            tarfile_path = os.path.abspath(tarfile_path)
        if not tarfile.is_tarfile(tarfile_path):
            raise IOError("{0} is not a valid TAR file.".format(tarfile_path))
        # Handle to the open archive; only set while inside self.open().
        self.tarf = None
        super(FileInTAR, self).__init__(tarfile_path, filename, **kwargs)

    @property
    def exists(self):
        """True if the file exists in the TAR archive, else False.

        If the member is only found after path normalization,
        :attr:`filename` is updated to the exact name stored in the archive.
        """
        with tarfile.open(self.container_path, 'r') as tarf:
            try:
                tarf.getmember(self.filename)
                return True
            except KeyError:
                # Perhaps an issue with 'foo.txt' versus './foo.txt'?
                for mem in tarf.getmembers():
                    if os.path.normpath(mem.name) == self.filename:
                        logger.debug("Found %s at %s in TAR file",
                                     self.filename, mem.name)
                        self.filename = mem.name
                        return True
                return False

    @property
    def size(self):
        """The size of this file in bytes."""
        if self._size is None or self.force_refresh:
            with tarfile.open(self.container_path, 'r') as tarf:
                self._size = tarf.getmember(self.filename).size
        return self._size

    @contextmanager
    def open(self, mode):
        """Open the TAR and return a reference to the relevant file object.

        While the context is active, :attr:`tarf` refers to the open
        archive; it is reset to ``None`` on exit, including when the body
        of the ``with`` block raises.

        Args:
          mode (str): Only 'r' and 'rb' modes are supported.

        Yields:
          file: File object

        Raises:
          ValueError: if ``mode`` is not valid.
        """
        # We can only extract a file object from a TAR file in read mode.
        if mode not in ('r', 'rb'):
            raise ValueError("FileInTar.open() only supports 'r'/'rb' mode")
        # actually tarf.extractfile is always a binary object...
        with tarfile.open(self.container_path, 'r') as tarf:
            self.tarf = tarf
            try:
                with closing(tarf.extractfile(self.filename)) as obj:
                    yield obj
            finally:
                # Never leave a handle to a closed archive behind, even if
                # the caller's block raised.
                self.tarf = None

    def copy_to(self, dest_dir):
        """Extract this file to the given destination directory.

        Args:
          dest_dir (str): Destination directory or filename.
        """
        # NOTE(review): the member is extracted under the path stored in the
        # archive; consider validating that path if archives may come from
        # untrusted sources.
        with tarfile.open(self.container_path, 'r') as tarf:
            logger.debug("Extracting %s from %s to %s",
                         self.filename, self.container_path, dest_dir)
            tarf.extract(self.filename, dest_dir)

    def add_to_archive(self, tarf):
        """Copy this file into the given tarfile object.

        Args:
          tarf (tarfile.TarFile): Add this file to that archive.
        """
        with self.open('r') as obj:
            logger.debug("Copying %s directly from %s to TAR file",
                         self.filename, self.container_path)
            # self.tarf is the source archive, held open by self.open().
            tarf.addfile(self.tarf.getmember(self.filename), obj)