Source code for sr.tools.inventory.inventory

"""
A set of classes and functions for working with
:doc:`The Inventory </inventory/index>`.
"""
from __future__ import print_function

import codecs
import email.utils
import hashlib
import re
import os
import subprocess
import sys

import six.moves.cPickle as pickle

import yaml

from sr.tools.inventory import assetcode
from sr.tools.environment import get_cache_dir


# Directory (as returned by get_cache_dir) in which cached_yaml_load() keeps
# its pickled copies of parsed YAML files.
CACHE_DIR = get_cache_dir('inventory')
# Matches asset names of the form "<name>-sr<code>": group 1 is the name and
# group 2 the asset code, whose characters must come from assetcode.ALPHABET.
RE_PART = re.compile("^(.+)-sr([%s]+)$" % "".join(assetcode.ALPHABET))


class NotAnInventoryError(OSError):
    """
    Error signalling that a directory does not belong to an inventory.

    :param directory: The directory that turned out not to be an inventory.
                      It is kept on the exception as the ``directory``
                      attribute.
    """

    def __init__(self, directory):
        self.directory = directory
        message = "'{directory}' is not an inventory.".format(
            directory=directory,
        )
        super(NotAnInventoryError, self).__init__(message)
class InvalidFileError(ValueError):
    """
    Raised when an invalid file is found in the inventory.

    The exception message has the form ``Invalid asset: '<path>' <comment>.``

    :param path: The path to the offending file, as supplied by the caller.
                 Also accessible as the ``path`` attribute of this class.
    :param str comment: A short description of what is wrong with the file,
                        phrased so that it reads naturally after the path.
                        Also accessible as the ``comment`` attribute of this
                        class.
    """

    def __init__(self, path, comment):
        msg = "Invalid asset: '{}' {}.".format(path, comment)
        super(InvalidFileError, self).__init__(msg)
        self.path = path
        # Keep the reason available to handlers without parsing the message.
        self.comment = comment
def find_top_level_dir(start_dir=None):
    """
    Find the top level of the inventory repo.

    The top level is the root of the Git working tree that contains
    ``start_dir``, provided that it holds a ``.meta/users`` file.

    :param str start_dir: The working directory to start the search from. If
                          this is None, the current working directory is
                          used.
    :returns: The top level directory, or None if ``start_dir`` is not inside
              an inventory (this includes ``start_dir`` not existing and Git
              not being runnable).
    :rtype: str or None
    """
    cmd = ['git', 'rev-parse', '--show-toplevel']
    try:
        gitdir = subprocess.check_output(cmd, universal_newlines=True,
                                         cwd=start_dir).strip()
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: Git ran but we are not inside a working tree.
        # OSError: start_dir does not exist or the git executable is missing.
        return None

    if not gitdir:
        # Git reported no top level; without this guard the check below
        # would be made relative to the process's current directory.
        return None

    usersfn = os.path.join(gitdir, ".meta", "users")
    if not os.path.isfile(usersfn):
        return None

    return gitdir
def get_inventory(directory=None):
    """
    Build the :class:`Inventory` that contains a directory.

    :param str directory: Any directory inside the inventory. When left as
                          None, the current working directory is used.
    :returns: An :class:`Inventory` rooted at the top level directory found
              for ``directory``.
    :rtype: :class:`Inventory`
    :raises OSError: If the directory is not an inventory (the concrete
                     exception is :class:`NotAnInventoryError`).
    """
    search_from = os.getcwd() if directory is None else directory

    top_level = find_top_level_dir(search_from)
    if top_level is not None:
        return Inventory(top_level)

    raise NotAnInventoryError(search_from)
def should_ignore(path):
    """
    Check if the path should be ignored.

    A path that is deemed ignore-worthy starts with '.' or ends with '~'.
    An empty path is never ignored.

    :param str path: The path to check.
    :returns: ``True`` if the path should be ignored, else ``False``.
    :rtype: bool
    """
    # startswith/endswith (unlike indexing) are safe on the empty string.
    return path.startswith(".") or path.endswith("~")
def cached_yaml_load(path):
    """
    Load a YAML file, using a pickled copy from the cache when possible.

    The cache entry lives in ``CACHE_DIR`` under the SHA-256 hex digest of
    the file's absolute path. It is used only if it is at least as new as
    the YAML file; otherwise the YAML file is parsed and the cache entry is
    (re)written.

    :param str path: The path to load.
    :returns: The loaded YAML file, possibly from cache.
    :rtype: dict
    """
    path = os.path.abspath(path)
    digest = hashlib.sha256(path.encode('UTF-8')).hexdigest()

    if not os.path.isdir(CACHE_DIR):
        try:
            os.makedirs(CACHE_DIR)
        except OSError:
            # Another process may have created the directory between the
            # check and the call; only a genuine failure is re-raised.
            if not os.path.isdir(CACHE_DIR):
                raise

    cache_path = os.path.join(CACHE_DIR, digest)

    if os.path.exists(cache_path):  # cache has file
        # Only trust the cache if it is not older than the source file.
        if os.path.getmtime(cache_path) >= os.path.getmtime(path):
            try:
                with open(cache_path, 'rb') as file:
                    return pickle.load(file)
            except (EOFError, pickle.UnpicklingError):
                # Cache file truncated or corrupted: drop it and rebuild.
                os.remove(cache_path)

    # safe_load: inventory files are plain data, this matches how the users
    # file is read, and it works on PyYAML versions where yaml.load()
    # requires an explicit Loader. The context manager closes the handle.
    with codecs.open(path, "r", encoding="utf-8") as file:
        y = yaml.safe_load(file)

    with open(cache_path, 'wb') as file:
        pickle.dump(y, file)

    return y
class Item(object):
    """
    A single asset in the inventory, described by one YAML file.

    The file name has to look like ``<name>-sr<part-code>``. The parsed file
    is kept in ``info`` and its mandatory properties (``labelled``,
    ``description``, ``value`` and ``condition``) are copied onto the
    instance as attributes of the same name.

    :param str path: The path to the item.
    :param parent: The item parent.
    :raises InvalidFileError: If the file name is not of the expected form.
    :raises ValueError: If one of the mandatory properties is missing.
    """

    def __init__(self, path, parent=None):
        """Create a new ``Item`` object."""
        self.path = path
        self.parent = parent

        match = RE_PART.match(os.path.basename(path))
        if match is None:
            raise InvalidFileError(path,
                                   "does not have a valid name (should be in "
                                   "the form <name>-sr<part-code>)")
        self.name, self.code = match.groups()

        # An item's description lives in the item file itself
        self.info_path = path
        self.info = cached_yaml_load(self.info_path)

        # The code recorded inside the file must agree with the file name;
        # a mismatch is reported on stderr and terminates the process.
        recorded_code = self.info["assetcode"]
        if recorded_code != self.code:
            err = sys.stderr
            print("Code in asset filename does not match contents of file:",
                  file=err)
            print("\t code in filename: '%s'" % self.code, file=err)
            print("\t code in contents: '%s'" % recorded_code, file=err)
            print("\n\tOffending file:", self.path, file=err)
            sys.exit(1)

        # Mirror every mandatory property onto the instance
        for pname in ("labelled", "description", "value", "condition"):
            try:
                value = self.info[pname]
            except KeyError:
                raise ValueError("Part sr{} is missing '{}' property."
                                 .format(self.code, pname))
            setattr(self, pname, value)
class ItemTree(object):
    """
    A tree of items in the inventory.

    After construction:

    * ``children`` maps each direct child to its key: the asset code for
      items and groups, the directory name for plain sub-trees.
    * ``parts`` maps asset code to object for everything yielded by
      :meth:`walk`.
    * ``types`` maps a name to the list of walked objects with that name.

    :param str path: The path to the tree.
    :param parent: The parent item or tree.
    """

    # File names that are not allowed in a plain tree, with the reason used
    # in the resulting InvalidFileError.
    special_fnames = {
        'info': "group 'info' files may only exist within directories "
                "which are themselves assets",
    }
    # File names skipped silently in addition to those should_ignore() drops.
    ignore_fnames = ()

    def __init__(self, path, parent=None):
        """Create a new item tree."""
        self.name = os.path.basename(path)
        self.path = path
        self.parent = parent
        self.children = {}
        self._find_children()

        self.parts = {}
        self.types = {}
        for item in self.walk():
            self.parts[item.code] = item
            self.types.setdefault(item.name, []).append(item)

    def _should_ignore(self, fname):
        """Ignore dotfiles etc."""
        return should_ignore(fname) or fname in self.ignore_fnames

    def _find_children(self):
        """Populate ``children`` from the entries of this tree's directory."""
        for fname in os.listdir(self.path):
            if self._should_ignore(fname):
                continue

            p = os.path.join(self.path, fname)

            if os.path.isfile(p):
                if fname in self.special_fnames:
                    raise InvalidFileError(p, self.special_fnames[fname])

                # it's got to be an item
                i = Item(p, parent=self)
                self.children[i.code] = i

            elif os.path.isdir(p):
                # could either be a group or a collection; decide on the
                # entry's own name (not the full path, whose leading
                # directories would otherwise satisfy the name part of the
                # pattern for a directory called just "-sr<code>")
                if RE_PART.match(fname) is not None:
                    a = ItemGroup(p, parent=self)
                    self.children[a.code] = a
                else:
                    t = ItemTree(p, parent=self)
                    self.children[t.name] = t

    def walk(self):
        """
        Walk through the item tree, yielding the children.

        Children that can themselves be walked have their contents yielded
        first; a child is then yielded itself if it has an asset code.

        :returns: An iteration of children.
        """
        for child in self.children.values():
            if hasattr(child, "walk"):
                for c in child.walk():
                    yield c

            if hasattr(child, "code"):
                yield child

    def resolve(self, path):
        """
        Resolve the given path into an object.

        The path is a ``/``-separated sequence of keys into ``children``.
        Empty components (leading, trailing or doubled slashes) are skipped,
        so an empty path or ``"/"`` resolves to this tree.

        :param str path: The path to resolve.
        :returns: The resolved items or ``self``.
        :raises KeyError: If a component of the path does not exist.
        """
        pos = self
        for n in path.split("/"):
            if not n:
                continue
            pos = pos.children[n]
        return pos
class ItemGroup(ItemTree):
    """
    A group of items in the inventory.

    A group is a directory named ``<name>-sr<part-code>`` that contains an
    ``info`` YAML file describing the group itself.

    :param str path: The path to the item group.
    :param parent: The parent item or tree.
    :raises InvalidFileError: If the directory name is not of the form
                              ``<name>-sr<part-code>``.
    """

    # The group's own 'info' file is metadata, not a child item.
    ignore_fnames = ('info',)

    def __init__(self, path, parent=None):
        """Create a new item group."""
        # Validate the name before scanning the directory so that a badly
        # named group gives a clear error instead of failing on a None match.
        m = RE_PART.match(os.path.basename(path))
        if m is None:
            raise InvalidFileError(path,
                                   "does not have a valid name (should be"
                                   " in the form <name>-sr<part-code>)")

        ItemTree.__init__(self, path, parent=parent)

        # ItemTree.__init__ set ``name`` to the whole directory name;
        # replace it with the part before "-sr".
        self.name = m.group(1)
        self.code = m.group(2)

        # Load info from 'info' file
        self.info_path = os.path.join(path, "info")
        self.info = cached_yaml_load(self.info_path)

        if self.info["assetcode"] != self.code:
            print("Code in group directory name does not match info file:",
                  file=sys.stderr)
            print("\t code in directory name: '%s'" % self.code,
                  file=sys.stderr)
            print("\t code in info: '%s'" % self.info["assetcode"],
                  file=sys.stderr)
            print("\n\tOffending group:", self.path, file=sys.stderr)
            sys.exit(1)

        self.description = self.info["description"]

        if "elements" not in self.info:
            raise Exception("Group %s lacks an elements field" % self.code)
        self.elements = self.info["elements"]
class Inventory(object):
    """
    An inventory.

    ``root`` is the :class:`ItemTree` for the whole inventory and ``users``
    maps ``(name, email)`` tuples, parsed from the keys of the
    ``.meta/users`` file, to the values stored there.

    :param str root_path: The root path to the inventory.
    """

    def __init__(self, root_path):
        """Create a new inventory."""
        self.root_path = root_path
        self.root = ItemTree(root_path)
        self._load_users()

    def _load_users(self):
        """Populate ``users`` from the ``.meta/users`` YAML file."""
        self.users = {}
        with open(os.path.join(self.root_path, '.meta', 'users')) as file:
            # An empty file parses to None; treat it as "no users".
            users = yaml.safe_load(file) or {}

        for details, user_id in users.items():
            self.users[email.utils.parseaddr(details)] = user_id

    @property
    def current_user_number(self):
        """
        Get the user ID of the currently configured Git user.

        :returns: The current user number.
        :raises KeyError: If the user doesn't exist.
        """
        user = self.get_current_git_user()
        return self.users[user]

    @staticmethod
    def get_current_git_user():
        """
        Get the currently configured Git user.

        :returns: A tuple containing the name and email address.
        :rtype: tuple
        :raises subprocess.CalledProcessError: If ``git config`` exits with
                                               a non-zero status.
        """
        name = subprocess.check_output(["git", "config", "user.name"],
                                       universal_newlines=True).strip()
        # Not called ``email`` so the ``email`` module is not shadowed.
        address = subprocess.check_output(["git", "config", "user.email"],
                                          universal_newlines=True).strip()
        return (name, address)

    @property
    def asset_codes(self):
        """
        Get all the asset codes.

        Every file or directory below the root whose name contains ``-sr``
        contributes the text after the last ``-sr``. Directories named
        ``.git`` or ``.meta`` are not searched.

        :returns: An iteration over the codes.
        """
        skipped = ('.git', '.meta')
        for dirpath, dirnames, filenames in os.walk(self.root_path):
            # Prune in place so os.walk never descends into the skipped
            # directories. Comparing directory names (rather than looking
            # for a substring of the whole path) keeps this working when
            # the inventory itself lives under a path containing ".git".
            dirnames[:] = [d for d in dirnames if d not in skipped]

            for filename in dirnames + filenames:
                if '-sr' in filename:
                    yield filename[filename.rindex('-sr') + 3:]

    def get_next_asset_code(self, user_number):
        """
        Get the next available asset code.

        :param int user_number: The user number for the asset code.
        :returns: The new asset code.
        """
        maxno = -1

        # Each decoded code is indexed as (user number, part number).
        for p in map(assetcode.code_to_num, self.asset_codes):
            if p[0] == user_number:
                maxno = max(maxno, p[1])

        return assetcode.num_to_code(user_number, maxno + 1)

    def query(self, query_str):
        """
        Run a query on the inventory.

        :param str query_str: The query to run on the inventory.
        :returns: Any items found from the query.
        :rtype: list of :class:`Item`
        :raises pyparsing.ParseError: If the query could not be parsed.
        """
        # Imported here rather than at module level: circular dependency
        from sr.tools.inventory import query_parser
        tree = query_parser.search_tree(query_str)
        return tree.match(self.root.parts.values())