"""Library for accessing the budget files."""
from __future__ import print_function
import collections
from decimal import Decimal as D, ROUND_CEILING, ROUND_FLOOR, ROUND_UP
import runpy
import os
import six
from subprocess import check_call, check_output, CalledProcessError
import sys
import tempfile
from tempfile import NamedTemporaryFile
import tokenize
from six.moves import cStringIO as StringIO
import yaml
try:
from yaml import CLoader as YAML_Loader
except ImportError:
from yaml import Loader as YAML_Loader
# Spending against a budget line is allowed to go over its value by this factor
FUDGE_FACTOR = D("1.1")
[docs]def dict_constructor(loader, node):
"""Constructor for libyaml to use ordered dicts instead of dicts."""
return collections.OrderedDict(loader.construct_pairs(node))
[docs]def num_constructor(loader, node):
"""Constructor for libyaml to translate numeric literals to Decimals."""
return D(node.value)
# Give me ordered dictionaries back
YAML_Loader.add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
dict_constructor)
# Parse floats as decimals
YAML_Loader.add_constructor("tag:yaml.org,2002:float", num_constructor)
[docs]def dec_ceil(d):
"""Get the ceiling of a decimal."""
return d.to_integral_exact(ROUND_CEILING)
[docs]def dec_floor(d):
"""Get the floor of a decimal."""
return d.to_integral_exact(ROUND_FLOOR)
[docs]def py_translate_to_decimals(s):
"""Translate any literal floats in the given source into decimals."""
# Parse numbers in the string as Decimals
# based on example from http://docs.python.org/2.7/library/tokenize.html
result = []
g = tokenize.generate_tokens(StringIO(str(s)).readline)
for toknum, tokval, _, _, _ in g:
if toknum == tokenize.NUMBER and '.' in tokval:
result.extend([
(tokenize.NAME, 'Decimal'),
(tokenize.OP, '('),
(tokenize.STRING, repr(tokval)),
(tokenize.OP, ')')
])
else:
result.append((toknum, tokval))
# Turn it back into python
return tokenize.untokenize(result)
[docs]class BudgetItem(object):
"""
A budget item.
:param str name: The name of the item.
:param str fname: The filename of the item.
:param conf: The configuration."""
def __init__(self, name, fname, conf):
"""Create a new budget item."""
self.fname = fname
self.conf = conf
y = yaml.load(open(fname, "r"), Loader=YAML_Loader)
if False in [x in y for x in ["cost", "summary", "description"]]:
print("Error: %s does not match schema." % fname, file=sys.stderr)
sys.exit(1)
self.name = name
self.summary = y["summary"]
self.description = y["description"]
if "closed" in y:
self.closed = y["closed"]
else:
self.closed = False
if self.closed:
# Lines that are closed have no uncertainty
self.uncertainty = 0
elif "uncertainty" in y:
self.uncertainty = D(y["uncertainty"])
else:
self.uncertainty = FUDGE_FACTOR - 1
if "consumable" in y:
self.consumable = y["consumable"]
else:
self.consumable = None
self.cost = self._parse_cost(y["cost"], conf)
def _parse_cost(self, s, conf):
"""Parse the cost string."""
s = py_translate_to_decimals(s)
cost = eval(s,
{"Decimal": D,
"ceil": dec_ceil,
"ceiling": dec_ceil,
"floor": dec_floor},
conf.vars)
if isinstance(cost, int):
cost = D(cost)
# Round the result up to the nearest penny
cost = cost.quantize(D("0.01"), rounding=ROUND_UP)
return cost
[docs]class InvalidPath(Exception):
"""An exception representing an invalid budget path."""
pass
[docs]class BudgetTree(object):
"""
Container for the BudgetItems and BudgetTrees below a certain point.
:param str name: The name of the tree.
"""
def __init__(self, name):
"""Create the budget tree."""
self.children = {}
self.name = name
[docs] def add_child(self, child):
"""
Add a child to the tree.
:param child: The child to add.
:type child: ``BudgetTree`` or ``BudgetItem``.
:raises ValueError: If the child is not a valid type.
"""
if isinstance(child, BudgetTree):
self.children[child.name] = child
elif isinstance(child, BudgetItem):
self.children[os.path.basename(child.name)] = child
else:
raise ValueError("Attempted to add unsupported object type to "
"BudgetTree")
[docs] def total(self):
"""
Sum all children.
:returns: The total cost of all the children.
:rtype: decimal.Decimal
"""
t = D(0)
for ent in self.children.values():
if isinstance(ent, BudgetTree):
t += ent.total()
else:
t += ent.cost
return t
[docs] def walk(self):
"""
Walk through all the BudgetItems of the this tree.
:returns: An iterator over the items in the tree.
"""
for c in self.children.values():
if isinstance(c, BudgetItem):
yield c
elif isinstance(c, BudgetTree):
for e in c.walk():
yield e
[docs] def path(self, path):
"""
Get the object at the given path relative to this one.
:returns: The object at the path.
:raises InvalidPath: If the path is invalid.
"""
pos = self
for s in path.split("/"):
try:
pos = pos.children[s]
except KeyError:
raise InvalidPath("'{}' has no child node '{}'"
.format(pos.name, s))
return pos
[docs] def draw(self, fd=sys.stdout, space=" ", prefix=""):
"""
Draw a text-representation of the tree.
:param fd: Where to print the result.
:param space: The string representing a space.
:param prefix: The prefix for each line in the tree.
"""
format_string = '{prefix}--{name} ({cost})'
print(format_string.format(prefix=prefix,
name=os.path.basename(self.name),
cost=self.total()), file=fd)
for n, c in enumerate(self.children.values()):
child_prefix = prefix + space
if isinstance(c, BudgetItem):
if n == len(self.children) - 1:
child_prefix += "+"
else:
child_prefix += "|"
print(format_string.format(prefix=child_prefix,
name=os.path.basename(c.name),
cost=c.cost), file=fd)
elif isinstance(c, BudgetTree):
child_prefix = prefix + space
if n == len(self.children) - 1:
child_prefix += " "
else:
child_prefix += "|"
c.draw(fd=fd, prefix=child_prefix)
[docs]class NoBudgetConfig(Exception):
"""
An exception representing that no config file for the budget has been
found.
"""
def __init__(self):
super(NoBudgetConfig, self).__init__("No config file found")
[docs]class BudgetConfig(object):
"""
A class representing a budget config.
:param str root: The root path to the budget.
"""
def __init__(self, root):
"""Create a new budget config."""
pypath = os.path.join(root, "config.py")
yamlpath = os.path.join(root, "config.yaml")
if os.path.exists(pypath):
self._load_from_py(pypath)
self.path = pypath
elif os.path.exists(yamlpath):
self._load_from_yaml(yamlpath)
self.path = yamlpath
else:
raise NoBudgetConfig()
def _load_from_py(self, fname):
with open(fname, "r") as in_file:
in_src = in_file.read()
if six.PY3:
tempfile = NamedTemporaryFile('w', encoding='utf-8')
else:
tempfile = NamedTemporaryFile('w')
with tempfile as f:
trans_src = py_translate_to_decimals(in_src)
f.write(trans_src)
f.flush()
conf = runpy.run_path(f.name,
init_globals={"Decimal": D,
"ceil": dec_ceil,
"ceiling": dec_ceil,
"floor": dec_floor})
# Variables that are part of the normal running environment
nullset = set(runpy.run_path("/dev/null").keys())
# Remove vars that are part of the normal running env
for name in nullset:
if name in conf:
conf.pop(name)
self.vars = conf
for vname in list(self.vars.keys()):
val = self.vars[vname]
if type(val) not in [int, D, float]:
self.vars.pop(vname)
def _load_from_yaml(self, fname):
"""Munge the old yaml file into a python file."""
# Use the python loader to make ordered dicts work
y = yaml.load(open(fname, "r"), Loader=YAML_Loader)
if six.PY3:
tempfile = NamedTemporaryFile('w', encoding='utf-8')
else:
tempfile = NamedTemporaryFile('w')
with tempfile as f:
for vname, val in y["vars"].items():
print("{0} = {1}".format(vname, val), file=f)
f.flush()
self._load_from_py(f.name)
[docs]def load_budget(root):
"""
Load the budget from a budget root.
:param root: The root path to the budget.
"""
root = os.path.abspath(root)
funds_in_path = os.path.join(root, "funds-in.yaml")
conf = BudgetConfig(root)
tree = BudgetTree("sr")
for dirpath, dirnames, filenames in os.walk(root):
for d in [".git", ".meta"]:
try:
dirnames.remove(d)
except ValueError:
# those directories will not always be there
pass
for fname in filenames:
fullp = os.path.abspath(os.path.join(dirpath, fname))
if fullp in [conf.path, funds_in_path]:
# these files are yaml files, but not budget items
continue
if fname[-5:] != ".yaml":
continue
name = fullp[len(root) + 1:-5]
r = tree
for d in name.split("/")[:-1]:
if d not in r.children:
r.add_child(BudgetTree(d))
r = r.children[d]
r.add_child(BudgetItem(name, fullp, conf))
return tree
[docs]class TmpBudgetExport(object):
"""
A class for temporarily exporting a budget.
:param root: The root path to the budget.
:param rev: The revision to export.
"""
def __init__(self, root, rev):
"""Create a new temporary budget export."""
self.rev = rev
self.tmpdir = tempfile.mkdtemp()
self._export(root, rev, self.tmpdir)
self.btree = load_budget(self.tmpdir)
def _export(self, root, rev, path):
check_call("git archive {0} | tar -x -C {1}".format(rev, path),
cwd=root, shell=True)
[docs]def load_budget_rev(root, rev):
"""
Load a specific revision of the budget.
:param str root: The root of the budget.
:param str rev: The revision to get.
"""
t = TmpBudgetExport(root, rev)
return t.btree
[docs]class NotBudgetRepo(Exception):
"""An exception raised if the repo is not a budget."""
pass
[docs]def find_root(path=None):
"""
Find the root directory of the budget repository.
Checks that the repository is budget.git too.
:param path: if provided is a path within the budget.git repository
(defaults to working directory)
"""
if path is None:
path = os.getcwd()
try:
# check that we're in budget.git
with open("/dev/null", "w") as n:
check_call(["git", "rev-list",
# This is the first commit of spending.git
"c7e8a3bdc82ad244ed302bf9a7f4934e0ca83292"],
cwd=path,
stdout=n,
stderr=n)
except CalledProcessError:
# it's not the spending repository
raise NotBudgetRepo()
root = check_output(["git", "rev-parse", "--show-toplevel"],
cwd=path)
return root.strip().decode('utf-8')
[docs]class AddedItem(object):
"""An item that has been added to the budget."""
def __init__(self, a):
self.a = a
[docs]class RemovedItem(object):
"""An item that has been removed from the budget."""
def __init__(self, a):
self.a = a
[docs]class ChangedItem(object):
"""An item in the budget that has been changed."""
def __init__(self, a, b):
self.a = a
self.b = b
def _item_dict(tree):
items = {}
for i in tree.walk():
items[i.name] = i
return items
[docs]def diff_trees(a, b):
"""
Get the difference betwen two budget trees.
:returns: A sorted list of changes containing either ``AddedItem``,
``RemovedItem`` or ``ChangedItem`` objects.
:rtype: list of ``AddedItem``, ``RemovedItem`` and ``ChangedItem``
"""
a_items = _item_dict(a)
b_items = _item_dict(b)
# Remove items from this list as we find them
# it'll eventually have things in it that aren't in A
added = list(b_items.values())
changes = []
for ai in a_items.values():
if ai.name in b_items:
# Features in both trees
bi = b_items[ai.name]
added.remove(bi)
if bi.cost != ai.cost:
# changed value
changes.append(ChangedItem(ai, bi))
else:
# it's been removed
changes.append(RemovedItem(ai))
for i in added:
changes.append(AddedItem(i))
return sorted(changes, key=lambda change: change.a.name)
[docs]def changes_to_tree(changes):
"""
Convert a list of changes into a tree.
:param changes: A list of changes in the budget.
:raises ValueError: If the list of changes contains non-change objects.
:returns: A new budget tree.
:rtype: BudgetTree
"""
tree = BudgetTree("sr")
for c in changes:
item = None
if isinstance(c, AddedItem):
item = c.a
elif isinstance(c, ChangedItem):
# Craft a new item that describes the change in cost
item = BudgetItem(c.a.name, c.a.fname, c.a.conf)
item.cost = c.b.cost - c.a.cost
elif isinstance(c, RemovedItem):
# Craft a new item that describes the reduction in cost
item = BudgetItem(c.a.name, c.a.fname, c.a.conf)
item.cost *= -1
else:
raise ValueError("Unsupported object type in change list")
def fudge_parent(parents, name):
"""Fudge a directory name in because there were two..."""
name = "{0}.d".format(name)
if name in parents[-1].children:
res = parents[-1].children[name]
else:
res = BudgetTree(name)
parents[-1].add_child(res)
return res
r = tree
parents = []
for d in item.name.split("/")[:-1]:
if not hasattr(r, "children"):
# It's not a directory:
# Add a directory at the same level with '.d' on the end.
r = fudge_parent(parents, d)
if d not in r.children:
r.add_child(BudgetTree(d))
parents.append(r)
r = r.children[d]
if not isinstance(r, BudgetTree):
# need to fudge a directory in
r = fudge_parent(parents, os.path.basename(r.name))
if os.path.basename(item.name) in r.children:
raise ValueError("Unsupported situation!")
r.add_child(item)
return tree