import fnmatch
from functools import reduce
from sr.tools.inventory import assetcode, inventory
class ASTNode(object):
    """Base class of every abstract syntax tree node."""

    def sexpr(self):
        """
        Get a string symbolic expression of the node.

        :returns: An empty string; this base implementation has nothing to
                  describe.
        """
        return str()
class NonTerminal(ASTNode):
    """A non-terminal AST node."""

    def match(self, inv_nodes):
        """
        Check which inventory nodes match the operation defined in this
        AST node.

        This implementation is a placeholder that subclasses must replace.

        :param inv_nodes: The inventory nodes to check.
        :returns: A list of matching nodes.
        :raises NotImplementedError: Always, naming the concrete class.
        """
        message = "match(...) not implemented for {}".format(self.__class__)
        raise NotImplementedError(message)
class Terminal(ASTNode):
    """A terminal AST node."""

    def match_single(self, inv_node):
        """
        Check whether one inventory node matches the operation defined in
        this AST node.

        This implementation is a placeholder that subclasses must replace.

        :param inv_node: The inventory node to check.
        :returns: True or False depending on whether the node matches.
        :raises NotImplementedError: Always, naming the concrete class.
        """
        message = ("match_single(...) not implemented for {}"
                   .format(self.__class__))
        raise NotImplementedError(message)

    def match(self, inv_nodes):
        """
        Check which inventory nodes match the operation defined in this
        AST node.

        :param inv_nodes: The inventory nodes to check.
        :returns: A list of the nodes for which :meth:`match_single` gave a
                  truthy answer, in their original order.
        """
        return [candidate for candidate in inv_nodes
                if self.match_single(candidate)]
class Not(NonTerminal):
    """
    An AST node representing a 'not' operation.

    :param node: The node to be not-ed.
    """

    def __init__(self, node):
        """Initialise a 'not' operation."""
        super(Not, self).__init__()
        self.node = node

    def match(self, inv_nodes):
        """
        Get the inventory nodes that the wrapped node does *not* match.

        :param inv_nodes: The inventory nodes to check.
        :returns: A duplicate-free list of the nodes from ``inv_nodes`` that
                  are absent from the wrapped node's matches. The order of
                  the list is unspecified because it is built from a set.
        """
        # Build the exclusion set once so each membership test is a hash
        # lookup rather than a scan of the whole match list.
        excluded = set(self.node.match(inv_nodes))
        return list({x for x in inv_nodes if x not in excluded})

    def sexpr(self):
        """Get the symbolic expression ``(NOT <node>)``."""
        return "(NOT {0})".format(self.node.sexpr())
class And(NonTerminal):
    """
    An AST node representing an 'and'.

    :param left: The left side of the operation.
    :param right: The right side of the operation.
    """

    def __init__(self, left, right):
        """Initialise an 'and' operation."""
        super(And, self).__init__()
        self.left = left
        self.right = right

    def match(self, inv_nodes):
        """
        Get the inventory nodes matched by both sides of the operation.

        :param inv_nodes: The inventory nodes to check.
        :returns: A duplicate-free list of the nodes from ``inv_nodes`` that
                  appear in both the left and the right matches. The order
                  of the list is unspecified because it is built from a set.
        """
        # Sets give constant-time membership tests; the original lists were
        # rescanned for every candidate node.
        left_matches = set(self.left.match(inv_nodes))
        right_matches = set(self.right.match(inv_nodes))
        # Still filter against inv_nodes: a side may yield nodes that were
        # never in the input, and those must not leak into the result.
        return list({x for x in inv_nodes
                     if x in left_matches and x in right_matches})

    def sexpr(self):
        """Get the symbolic expression ``(AND <left> <right>)``."""
        return "(AND {0} {1})".format(self.left.sexpr(), self.right.sexpr())
class Or(NonTerminal):
    """
    An AST node representing an 'or' operation.

    :param left: The left side of the operation.
    :param right: The right side of the operation.
    """

    def __init__(self, left, right):
        """Initialise an 'or' operation."""
        super(Or, self).__init__()
        self.left = left
        self.right = right

    def match(self, inv_nodes):
        """
        Get the inventory nodes matched by either side of the operation.

        :param inv_nodes: The inventory nodes to check.
        :returns: A duplicate-free list of the nodes from ``inv_nodes`` that
                  appear in the left or the right matches. The order of the
                  list is unspecified because it is built from a set.
        """
        # Merge both sides into one set so each candidate costs a single
        # hash lookup instead of two list scans.
        either_side = set(self.left.match(inv_nodes))
        either_side.update(self.right.match(inv_nodes))
        # Still filter against inv_nodes: a side may yield nodes that were
        # never in the input, and those must not leak into the result.
        return list({x for x in inv_nodes if x in either_side})

    def sexpr(self):
        """Get the symbolic expression ``(OR <left> <right>)``."""
        return "(OR {0} {1})".format(self.left.sexpr(), self.right.sexpr())
class Condition(Terminal):
    """
    An AST node representing a 'condition' check operation.

    :param conditions: A list of conditions that should be matched.
    :type conditions: list of strs
    """

    def __init__(self, *conditions):
        """Initialise the node with conditions."""
        super(Condition, self).__init__()
        self.conditions = set(conditions)

    def _flatten(self, l):
        """
        Flatten arbitrarily nested lists/tuples into one flat list.

        :param l: The value to flatten.
        :returns: ``l`` itself when it is not exactly a list or tuple,
                  otherwise a new flat list of its leaves.
        """
        if type(l) not in (list, tuple):
            return l
        flat = []
        for entry in l:
            if type(entry) in (list, tuple):
                flat.extend(self._flatten(entry))
            else:
                flat.append(entry)
        return flat

    def _state(self, inv_node):
        """
        Work out the condition of an inventory node.

        An ``inventory.Item`` reports its own ``condition``. An
        ``inventory.ItemGroup`` is judged from its members: every entry of
        ``inv_node.elements`` names a type (optionally as a one-entry
        ``{name: count}`` dict, count defaulting to 1) that has to be
        present in ``inv_node.types`` the requested number of times.

        :param inv_node: The inventory node to inspect.
        :returns: The item's condition; for a group ``'working'`` when it
                  has no ``elements``, the single state shared by all
                  members, ``'broken'`` if anything is broken or missing,
                  else ``'unknown'``. ``None`` for any other node type.
        """
        if isinstance(inv_node, inventory.Item):
            return inv_node.condition
        if not isinstance(inv_node, inventory.ItemGroup):
            return None

        elements = inv_node.elements
        if elements is None:
            return 'working'

        states = []
        for element in elements:
            name, wanted = element, 1
            if isinstance(element, dict):
                # Only the first (name, count) pair of the dict is used.
                name, wanted = list(element.items())[0]

            if name not in inv_node.types:
                states.append('broken')
                continue

            # Collect the state of at most `wanted` members of this type.
            found = 0
            for member in inv_node.types[name]:
                if found == wanted:
                    break
                states.append(self._state(member))
                found += 1
            if found != wanted:
                # Fewer members present than the group asks for.
                states.append('broken')

        distinct = set(self._flatten(states))
        if len(distinct) == 1:
            return distinct.pop()
        if 'broken' in distinct:
            return 'broken'
        return 'unknown'

    def match_single(self, inv_node):
        """
        Check whether the node's state is one of the wanted conditions.

        :param inv_node: The inventory node to check.
        :returns: True if the node's state is in ``self.conditions``.
        """
        state = self._state(inv_node)
        return state in self.conditions

    def sexpr(self):
        """Get the symbolic expression ``(Condition [<conditions>])``."""
        wanted = list(self.conditions)
        return "(Condition {0})".format(wanted)
class Type(Terminal):
    """
    An AST node representing a 'type' check.

    :param types: A list of types that should be checked.
    """

    def __init__(self, *types):
        """Initialise the node."""
        super(Type, self).__init__()
        self.types = types

    def match_single(self, inv_node):
        """
        Check whether the node's name matches any of the type patterns.

        The patterns are compared with :func:`fnmatch.fnmatch`, so
        shell-style wildcards are accepted.

        :param inv_node: The inventory node to check.
        :returns: True if the node has a ``name`` matching at least one
                  pattern, False otherwise (including nodes without a name).
        """
        if not hasattr(inv_node, 'name'):
            return False
        return any(fnmatch.fnmatch(inv_node.name, pattern)
                   for pattern in self.types)

    def sexpr(self):
        """Get the symbolic expression ``(Type [<types>])``."""
        patterns = list(self.types)
        return "(Type {0})".format(patterns)
class Labelled(Terminal):
    """
    An AST node representing a 'labelled' check.

    :param labelled: Whether or not the asset is labelled.
    :type labelled: A string representation of a bool ('true', '1', etc)
    """

    def __init__(self, labelled):
        """Initialise the 'labelled' check node."""
        super(Labelled, self).__init__()
        # Anything outside this set of spellings counts as False.
        truthy_spellings = ('true', '1', 'yes', 't')
        self.labelled = labelled.lower() in truthy_spellings

    def match_single(self, inv_node):
        """
        Check whether the node's ``labelled`` flag equals the wanted value.

        :param inv_node: The inventory node to check.
        :returns: The comparison result, or False for nodes that have no
                  ``labelled`` attribute.
        """
        if not hasattr(inv_node, 'labelled'):
            return False
        return inv_node.labelled == self.labelled

    def sexpr(self):
        """Get the symbolic expression ``(Labelled <bool>)``."""
        return "(Labelled {0})".format(self.labelled)
class Assy(Terminal):
    """
    An AST node representing an 'assy' check.

    :param assy: Whether or not the asset has an assembly.
    :type assy: A string representation of a bool ('true', '1', etc)
    """

    def __init__(self, assy):
        """Initialise the 'assy' check node."""
        super(Assy, self).__init__()
        # Anything outside this set of spellings counts as False.
        truthy_spellings = ('true', '1', 'yes', 't')
        self.assy = assy.lower() in truthy_spellings

    def match_single(self, inv_node):
        """
        Check whether the node's assembly status equals the wanted value.

        A node is treated as an assembly when it has both a ``code`` and a
        ``children`` attribute.

        :param inv_node: The inventory node to check.
        :returns: True if the node's assembly status equals ``self.assy``.
        """
        is_assembly = (hasattr(inv_node, 'code')
                       and hasattr(inv_node, 'children'))
        return self.assy == is_assembly

    def sexpr(self):
        """Get the symbolic expression ``(Assy <bool>)``."""
        return "(Assy {0})".format(self.assy)
class TriState(Terminal):
    """
    An AST node representing a 'tri' check.

    :param str key: The key to check.
    :param str desired_val: One of 'unset', 'true', 'false'.
    """

    def __init__(self, key, desired_val):
        """Initialise a 'tri' check node."""
        super(TriState, self).__init__()
        self.key = key
        self.desired_val = desired_val.lower()

    def match_single(self, inv_node):
        """
        Check the node's ``info`` entry for ``self.key`` against the wanted
        state.

        * ``'unset'`` matches when the key is absent from ``inv_node.info``.
        * ``'true'`` matches when the key is present with a truthy value.
        * Any other wanted value matches only when the key is present and
          its value is exactly ``False``.

        :param inv_node: The inventory node to check; must have ``info``.
        :returns: True or False depending on whether the node matches.
        """
        info = inv_node.info
        if self.desired_val == 'unset':
            return self.key not in info
        if self.key not in info:
            return False

        value = info[self.key]
        if self.desired_val == "true":
            # Coerce so callers always get a real bool, not the raw value.
            return bool(value)
        return value is False

    def sexpr(self):
        """Get the symbolic expression ``(TriState <key>: <value>)``."""
        return "(TriState {0}: {1})".format(self.key, self.desired_val)
class Path(Terminal):
    """
    An AST node representing a 'path' check.

    :param paths: A list of paths that are deemed valid.
    :type paths: list of str
    """

    def __init__(self, *paths):
        """
        Initialise the 'path' check node.

        One leading ``/`` is stripped from each path and a trailing ``*`` is
        appended, so every path acts as a prefix pattern. An empty path
        therefore becomes ``*``.
        """
        super(Path, self).__init__()
        # startswith() is safe on an empty string, unlike indexing path[0].
        self.paths = [(path[1:] if path.startswith('/') else path) + '*'
                      for path in paths]

    def _root_path(self, inv_node):
        """
        Follow ``parent`` links upwards and get the path of the top node.

        :param inv_node: The inventory node to start from.
        :returns: The ``path`` of the first ancestor (or the node itself)
                  whose ``parent`` is None; None if ``inv_node`` is None.
        """
        node = inv_node
        while node is not None:
            if node.parent is None:
                return node.path
            node = node.parent

    def match_single(self, inv_node):
        """
        Check whether the node's path, relative to the root of its tree,
        matches any of the path patterns.

        :param inv_node: The inventory node to check.
        :returns: True if the relative path matches at least one pattern,
                  False otherwise (including nodes without a ``path``).
        """
        # Check the attribute first so nodes without a path are rejected
        # before any tree walking happens.
        if not hasattr(inv_node, 'path'):
            return False

        root_path = self._root_path(inv_node)
        # Drop the root prefix plus the separator that follows it; computed
        # once because it is the same for every pattern.
        relative_path = inv_node.path[len(root_path) + 1:]
        return any(fnmatch.fnmatch(relative_path, pattern)
                   for pattern in self.paths)

    def sexpr(self):
        """Get the symbolic expression ``(Path [<patterns>])``."""
        return "(Path {0})".format(list(self.paths))
class Code(Terminal):
    """
    An AST node representing a 'code' check.

    :param codes: A list of valid codes.
    :type codes: list of str
    """

    def __init__(self, *codes):
        """
        Initialise the 'code' check node.

        Every code is passed through ``assetcode.normalise`` before being
        stored, so node codes are compared against the normalised form.
        """
        super(Code, self).__init__()
        self.codes = set(map(assetcode.normalise, codes))

    def match_single(self, inv_node):
        """
        Check whether the node's code is one of the valid codes.

        :param inv_node: The inventory node to check.
        :returns: True if the node has a ``code`` found in ``self.codes``,
                  False otherwise (including nodes without a code).
        """
        # Guard like the other attribute checks in this module so that a
        # node without a code simply fails to match instead of raising.
        if not hasattr(inv_node, 'code'):
            return False
        return inv_node.code in self.codes

    def sexpr(self):
        """Get the symbolic expression ``(Code [<codes>])``."""
        return "(Code {0})".format(list(self.codes))
class Serial(Terminal):
    """
    An AST node representing a 'serial' check.

    :param str serials: A list of valid serials.
    """

    def __init__(self, *serials):
        """Initialise the 'serial' check node."""
        super(Serial, self).__init__()
        self.serials = set(serials)

    def match_single(self, inv_node):
        """
        Check whether the node's serial is one of the valid serials.

        The serial is read from the ``"serial"`` entry of ``inv_node.info``;
        a missing entry is treated as None.

        :param inv_node: The inventory node to check; must have ``info``.
        :returns: True if the serial is in ``self.serials``.
        """
        serial = inv_node.info.get("serial")
        return serial in self.serials

    def sexpr(self):
        """Get the symbolic expression ``(Serial [<serials>])``."""
        return "(Serial {0})".format(list(self.serials))
class Function(NonTerminal):
    """
    An AST node representing a function.

    :param str func_name: The name of the function.
    :param node: The node to run the functions on.
    """

    # Registry shared by all instances: function name -> callable taking one
    # inventory node and returning an iterable of inventory nodes.
    _functions = {}

    @classmethod
    def register(cls, name):
        """
        Get a decorator that registers a callable under ``name``.

        :param str name: The name the function is looked up by.
        :returns: A decorator that stores its argument in the registry and
                  hands it back unchanged.
        """
        def wrap(f):
            cls._functions[name] = f
            return f
        return wrap

    @classmethod
    def registered_names(cls):
        """Get the names of all registered functions."""
        return cls._functions.keys()

    def __init__(self, func_name, node):
        """Initialise the function node."""
        super(Function, self).__init__()
        self.func_name = func_name
        self.node = node

    def match(self, inv_nodes):
        """
        Apply the function to every node matched by the wrapped node.

        :param inv_nodes: The inventory nodes to check.
        :returns: A duplicate-free list of everything the function returned
                  across all matched nodes. The order of the list is
                  unspecified because it is built from a set.
        :raises KeyError: If ``self.func_name`` was never registered.
        """
        # Plain dict lookup on the registry, so the callable is not bound
        # to this instance.
        func = self._functions[self.func_name]
        # Feed the results straight into a set instead of concatenating
        # ever-growing intermediate lists.
        return list({result
                     for matched in self.node.match(inv_nodes)
                     for result in func(matched)})

    def sexpr(self):
        """Get the symbolic expression ``(Function '<name>' <node>)``."""
        return "(Function '{0}' {1})".format(self.func_name,
                                             self.node.sexpr())
@Function.register('parent')
def _parent(inv_node):
    """
    A function which returns the parent of a node.

    :param inv_node: The inventory node whose parent is wanted.
    :returns: A one-element list holding the parent, or an empty list when
              the node's ``parent`` is None.
    """
    parent = inv_node.parent
    # A node without a parent contributes nothing, rather than a None entry
    # that would end up among the matched inventory nodes.
    if parent is None:
        return []
    return [parent]
@Function.register('children')
def _children(inv_node):
    """
    A function which returns the children of a node.

    :param inv_node: The inventory node whose children are wanted.
    :returns: A list of the values of ``inv_node.children``; an empty list
              for nodes without a ``children`` attribute.
    """
    if not hasattr(inv_node, 'children'):
        return []
    # Copy into a list so both branches return the same type and the caller
    # does not hold a live view of the node's own mapping.
    return list(inv_node.children.values())
@Function.register('siblings')
def _siblings(inv_node):
    """
    A function which returns the siblings of a node.

    :param inv_node: The inventory node whose siblings are wanted.
    :returns: A list of the other values of the parent's ``children``; an
              empty list when the node's ``parent`` is None.
    """
    parent = inv_node.parent
    # A node without a parent has no siblings to enumerate.
    if parent is None:
        return []
    return [x for x in parent.children.values() if x is not inv_node]
@Function.register('descendants')
def _descendants(inv_node):
    """
    A function which returns the descendants of a node.

    The tree is walked depth-first, each child being listed before its own
    descendants. Nodes without a ``children`` attribute are leaves.

    :param inv_node: The inventory node whose descendants are wanted.
    :returns: A list of every node below ``inv_node``, excluding
              ``inv_node`` itself.
    """
    found = []

    def collect(node):
        # Append into one shared list rather than concatenating a new list
        # at every level of the tree.
        for child in getattr(node, 'children', {}).values():
            found.append(child)
            collect(child)

    collect(inv_node)
    return found