"""Prototype message router driven by a tab-indented rulebook.

A rulebook is parsed into a tree: top-level lines declare bins, indented
lines declare rules (filters or references) that hang off the rule one
indentation level above them.  ``=>`` chains several rules on one line.

When run as a script, the rulebook named by ``sys.argv[1]`` is parsed and a
sample message is pushed through the bin called ``remote``.

The code is written to run unchanged on Python 2 and Python 3.
"""

import operator
import sys
import timeit
from collections import defaultdict
from itertools import groupby

try:  # Python 2
    string_types = basestring
except NameError:  # Python 3
    string_types = str

# TODO: Keep trail of message travelling through the rules

# Relations between the members of a FilterExpressionGroup.
NONE = 0
AND = 1
OR = 2

# Comparison operators usable inside a FilterExpression.
EQUALS = 3
NOT_EQUALS = 4
LESS_THAN = 5
MORE_THAN = 6
LESS_THAN_OR_EQUALS = 7
MORE_THAN_OR_EQUALS = 8
HAS = 9

# Operator spelling in a filter rule -> operator constant.
_OPERATOR_SYMBOLS = {
    "=": EQUALS,
    "==": EQUALS,
    "!=": NOT_EQUALS,
    ">": MORE_THAN,
    "<": LESS_THAN,
    ">=": MORE_THAN_OR_EQUALS,
    "<=": LESS_THAN_OR_EQUALS,
    "has": HAS,
}

# Operator constant -> two-argument comparison (HAS is handled separately).
_COMPARATORS = {
    EQUALS: operator.eq,
    NOT_EQUALS: operator.ne,
    LESS_THAN: operator.lt,
    MORE_THAN: operator.gt,
    LESS_THAN_OR_EQUALS: operator.le,
    MORE_THAN_OR_EQUALS: operator.ge,
}

# Operator constant -> label used by FilterExpression.__repr__.
_OPERATOR_NAMES = {
    EQUALS: "EQUALS",
    NOT_EQUALS: "NOT EQUALS",
    LESS_THAN: "LESS THAN",
    MORE_THAN: "MORE THAN",
    LESS_THAN_OR_EQUALS: "LESS THAN OR EQUAL",
    MORE_THAN_OR_EQUALS: "MORE THAN OR EQUAL",
    HAS: "HAS",
}

# Registry of every known bin, keyed by bin name.  Filled by parse_rulebook()
# and lazily extended by BinReference.get().
bins = {}


class Element(object):
    """Node of the rule tree; owns the list of downstream ``outputs``."""

    def __init__(self):
        self.outputs = []

    def display(self, indent):
        """Print this element and, recursively, its outputs, tab-indented."""
        print(("\t" * indent) + self.get_description())
        for output in self.outputs:
            output.display(indent + 1)

    def process(self, message):
        """Handle ``message``; the default is to pass it on unchanged."""
        self.forward(message)

    def forward(self, message):
        """Hand ``message`` to every output, in insertion order."""
        for output in self.outputs:
            output.process(message)


class Bin(Element):
    """Named entry point of a rule tree."""

    def __init__(self, name):
        Element.__init__(self)
        self.name = name

    def get_description(self):
        return "[Bin] %s" % self.name


class Rule(Element):
    """Element that remembers the element (``input_``) it hangs off."""

    def __init__(self, input_):
        Element.__init__(self)
        self.input_ = input_


class Filter(Rule):
    """Rule that forwards a message only when a boolean expression holds."""

    def __init__(self, input_, rule):
        """Parse ``rule`` into an expression tree stored in ``self.root``.

        Raises:
            Exception: if the rule is empty, has unbalanced parentheses,
                a dangling boolean operator, or a malformed expression.
        """
        Rule.__init__(self, input_)
        self.rule = rule
        self.root = _parse_filter(rule)
        #print repr(self.root)

    def get_description(self):
        return "[Filter] %s" % self.rule

    def evaluate(self, message):
        """Return the result of evaluating the expression tree on ``message``."""
        return self.root.evaluate(message)

    def process(self, message):
        """Forward ``message`` to the outputs only if it passes the filter."""
        if self.evaluate(message):
            self.forward(message)


def _is_keyword_at(rule, idx, keyword, buff):
    """Tell whether boolean ``keyword`` starts at ``rule[idx]``.

    The keyword must be preceded by a space (the last buffered character),
    followed by at least one more character, and that character must not
    continue an identifier, so that e.g. ``order`` is not read as ``or``.
    """
    end = idx + len(keyword)
    if not rule.startswith(keyword, idx) or end >= len(rule):
        return False
    if rule[end].isalnum() or rule[end] == "_":
        return False
    return buff != "" and buff[-1] == " "


def _check_operator_count(elements, operators):
    """Raise if the operators cannot sit exactly between the elements."""
    if len(operators) != len(elements) - 1:
        raise Exception("Malformed expression: dangling boolean operator")


def _parse_filter(rule):
    """Turn the text of a filter rule into an expression tree.

    Rules:
    - Boolean 'and' has precedence over 'or'
    - Enclosure in parentheses means creating a new FilterExpressionGroup
    - Having 'and' and 'or' operators in the same group means a new
      FilterExpressionGroup is created for every 'and' chain, to retain
      precedence
    - Variable accessors are prefixed with $
    - Strings are enclosed in "quotes"

    Returns the root FilterExpression or FilterExpressionGroup.
    """
    elements = defaultdict(list)   # nesting depth -> operands seen so far
    operators = defaultdict(list)  # nesting depth -> AND/OR between them
    depth = 0
    buff = ""
    in_string = False
    idx = 0
    rule_length = len(rule)

    while idx < rule_length:
        char = rule[idx]
        if char == '"':
            in_string = not in_string
            buff += char
        elif in_string:
            # Nothing inside a quoted string is syntax.
            buff += char
        elif char == "(":
            depth += 1
        elif char == ")":
            if depth == 0:
                raise Exception("Missing %d opening parenthese(s)." % 1)
            # The buffer is empty when a nested group closed right before
            # this parenthesis; there is no pending expression then.
            if buff.strip() != "":
                elements[depth].append(create_filter_expression(buff))
            _check_operator_count(elements[depth], operators[depth])
            group = create_group(elements[depth], operators[depth])
            elements[depth - 1].append(group)
            # Clear out lists to prevent working with stale data
            elements[depth] = []
            operators[depth] = []
            buff = ""
            depth -= 1
        else:
            relation = None
            for keyword, candidate in (("or", OR), ("and", AND)):
                if _is_keyword_at(rule, idx, keyword, buff):
                    relation = candidate
                    idx += len(keyword) - 1  # skip the rest of the keyword
                    break
            if relation is None:
                buff += char
            else:
                # A boolean operator ends the pending expression, if any.
                if buff.strip() != "":
                    elements[depth].append(create_filter_expression(buff))
                operators[depth].append(relation)
                buff = ""
        idx += 1

    if depth > 0:
        raise Exception("Missing %d closing parenthese(s)." % depth)

    # Anything left in the buffer is the final expression.
    if buff.strip() != "":
        elements[0].append(create_filter_expression(buff))

    root_elements = elements[0]
    if not root_elements:
        # FIXME: Proper exception
        raise Exception("No root elements?!")
    _check_operator_count(root_elements, operators[0])
    if len(root_elements) == 1:
        return root_elements[0]
    # Multiple elements, need to encapsulate in a group
    return create_group(root_elements, operators[0])


def _collapse_chain(chain):
    """Return the lone member of ``chain`` or an AND group of its members."""
    if len(chain) == 1:
        return chain[0]
    return create_group(chain, [AND] * (len(chain) - 1))


def create_group(elements, operators):
    """Combine ``elements`` into a FilterExpressionGroup.

    ``operators[i]`` is the relation (AND/OR) between ``elements[i]`` and
    ``elements[i + 1]``; surplus operators are ignored.  When both AND and
    OR occur, every run of AND-joined elements becomes its own child group
    and the resulting group is an OR, so 'and' binds tighter than 'or'.
    A single element yields an AND group, which evaluates to that element.

    Raises:
        Exception: if ``elements`` is empty or there are too few operators.
    """
    if not elements:
        raise Exception("Cannot create a group without elements")
    if len(operators) < len(elements) - 1:
        raise Exception("Malformed expression: missing boolean operator")

    group = FilterExpressionGroup()

    if len(elements) == 1:
        group.add(elements[0])
        group.relation = AND
        return group

    if len(set(operators)) == 1:
        for element in elements:
            group.add(element)
        group.relation = operators[0]
        return group

    # Mixed operators: split on OR, collapsing each AND chain into a child.
    group.relation = OR
    chain = [elements[0]]
    for relation, element in zip(operators, elements[1:]):
        if relation == AND:
            chain.append(element)
        else:
            group.add(_collapse_chain(chain))
            chain = [element]
    group.add(_collapse_chain(chain))
    return group


def _parse_operand(token):
    """Build the operand object for one side of an expression.

    ``"text"`` becomes a FilterExpressionString; ``$scope`` or
    ``$scope[name]`` becomes a FilterExpressionVariable.
    """
    if len(token) >= 2 and token[0] == '"' and token[-1] == '"':
        return FilterExpressionString(token[1:-1])
    if token[0] == "$":
        if "[" in token[1:] and token[-1] == "]":
            scope, name = token[1:-1].split("[", 1)
        else:
            scope, name = token[1:], None
        return FilterExpressionVariable(scope, name)
    # No other types supported yet...
    raise Exception("Unrecognized operand type")


def create_filter_expression(buff):
    """Parse ``<operand> <operator> <operand>`` into a FilterExpression.

    Raises:
        Exception: if there are not three parts, an operand is neither a
            quoted string nor a ``$`` variable, or the operator is unknown.
    """
    # TODO: Use shlex split because of spaces in strings?
    parts = [x.strip() for x in buff.split(None, 2)]
    if len(parts) != 3:
        raise Exception("Invalid expression: %r" % buff)
    left, symbol, right = parts

    left_obj = _parse_operand(left)
    right_obj = _parse_operand(right)

    try:
        operator_type = _OPERATOR_SYMBOLS[symbol]
    except KeyError:
        raise Exception("Invalid operator")

    return FilterExpression(left_obj, operator_type, right_obj)


class BinReference(Rule):
    """Rule that targets a bin by name (``#name`` in a rulebook)."""

    def __init__(self, input_, name):
        Rule.__init__(self, input_)
        self.bin_name = name

    def get(self):
        """Return the referenced bin, registering a new one if unknown."""
        try:
            return bins[self.bin_name]
        except KeyError:
            new_bin = Bin(self.bin_name)
            bins[self.bin_name] = new_bin
            return new_bin

    def get_description(self):
        return "[BinRef] %s" % self.bin_name

    def process(self, message):
        # TODO: Actually deposit into bin
        print("DEPOSITED INTO BIN %s" % self.bin_name)


class NodeReference(Rule):
    """Rule that targets a node by name (``*name`` in a rulebook)."""

    def __init__(self, input_, name):
        Rule.__init__(self, input_)
        self.node_name = name

    def get_description(self):
        return "[NodeRef] %s" % self.node_name

    def process(self, message):
        # TODO: Actually forward to node
        print("FORWARDED TO NODE %s" % self.node_name)


class MethodReference(Rule):
    """Rule that targets a method by name (``@name`` in a rulebook)."""

    def __init__(self, input_, name):
        Rule.__init__(self, input_)
        self.method_name = name

    def get_description(self):
        return "[MethodRef] %s" % self.method_name

    def process(self, message):
        # TODO: Actually pass to method
        print("PASSED TO METHOD %s" % self.method_name)


class DistributorReference(Rule):
    """Rule that targets a distributor (``:name(arg, ...)`` in a rulebook)."""

    def __init__(self, input_, name, args):
        Rule.__init__(self, input_)
        self.distributor_name = name
        self.args = args

    def get_description(self):
        return "[DistRef] %s (%s)" % (self.distributor_name, ", ".join(self.args))

    def process(self, message):
        # TODO: Actually distribute
        # TODO: Parse args
        pass  #print "DISTRIBUTED TO %s" % self.get_description()


class FilterExpression(object):
    """Single comparison between two operands."""

    def __init__(self, left, operator, right):
        self.left = left
        self.operator = operator
        self.right = right

    def evaluate(self, message):
        """Evaluate the comparison against ``message``.

        HAS is a substring test when the left operand yields a string and a
        membership test when it yields a list.  An unknown operator
        evaluates to False.
        """
        if self.operator == HAS:
            needle = self.right.value(message)
            if isinstance(self.left, FilterExpressionString):
                haystack = self.left.value(message)
            else:
                # values() keeps lists intact, so `in` is a membership test
                # for lists and a substring test for plain strings.
                haystack = self.left.values(message)
            return needle in haystack

        compare = _COMPARATORS.get(self.operator)
        if compare is None:
            # TODO: Log error
            return False
        return compare(self.left.value(message), self.right.value(message))

    def __repr__(self):
        opname = _OPERATOR_NAMES.get(self.operator, "?")
        return "<FilterExpression %s %s %s>" % (
            repr(self.left), opname, repr(self.right))


class FilterExpressionGroup(object):
    """Set of expressions or groups joined by one relation (AND or OR)."""

    def __init__(self):
        self.elements = []
        self.relation = NONE

    def add(self, element):
        self.elements.append(element)

    def evaluate(self, message):
        """Return True if the members satisfy the group's relation.

        A group whose relation is neither AND nor OR evaluates to False.
        """
        if self.relation == AND:
            return all(element.evaluate(message) for element in self.elements)
        if self.relation == OR:
            return any(element.evaluate(message) for element in self.elements)
        # TODO: Log error
        return False

    def __repr__(self):
        if self.relation == AND:
            relname = "AND"
        elif self.relation == OR:
            relname = "OR"
        else:
            relname = "?"
        return "<FilterExpressionGroup %s (%s)>" % (
            relname, ", ".join(repr(x) for x in self.elements))


class FilterExpressionElement(object):
    """Base class of operands; knows how to read values off a message."""

    def select_value(self, message, scope, name, multiple=False):
        """Read the value selected by ``scope`` (and ``name``) from ``message``.

        Scopes ``tags``, ``type``, ``source`` and ``chain`` read the message
        attribute of that name; ``attr`` looks ``name`` up in the payload.
        Strings are returned as they are.  For lists, the whole list is
        returned when ``multiple`` is true, otherwise the first item.

        Raises:
            Exception: on an unknown scope, or when a single value is
                requested from an empty list.
        """
        if scope == "tags":
            return_value = message.tags
        elif scope == "type":
            return_value = message.type_
        elif scope == "source":
            return_value = message.source
        elif scope == "chain":
            return_value = message.chain
        elif scope == "attr":
            return_value = self.select_attribute(message, name, multiple=multiple)
        else:
            raise Exception("Unknown scope")  # FIXME: Proper exception

        if isinstance(return_value, string_types):
            return return_value
        if not isinstance(return_value, (list, tuple)):
            # Scalars (numbers, ...) have no "first item"; hand them back.
            return return_value
        if multiple:
            return return_value
        if len(return_value) > 0:
            return return_value[0]
        raise Exception("No value found")  # FIXME: Proper exception

    def select_attribute(self, message, query, multiple=False):
        """Follow the ``/``-separated path ``query`` into ``message.data``.

        Raises:
            Exception: if ``query`` is missing or any segment cannot be
                resolved.
        """
        if query is None:
            raise Exception("Unknown attribute")  # FIXME: Proper exception

        current_object = message.data
        for segment in query.split("/"):
            try:
                current_object = current_object[segment]
            except (KeyError, IndexError, TypeError):
                raise Exception("Unknown attribute")  # FIXME: Proper exception
        return current_object


class FilterExpressionVariable(FilterExpressionElement):
    """Operand whose value is read from the message being evaluated."""

    def __init__(self, scope, name=None):
        self.scope = scope
        self.name = name
        # TODO: name path parsing

    def value(self, message):
        return self.select_value(message, self.scope, self.name, multiple=False)

    def values(self, message):
        return self.select_value(message, self.scope, self.name, multiple=True)

    def __repr__(self):
        return "<FilterExpressionVariable %s[%s]>" % (self.scope, self.name)


class FilterExpressionString(FilterExpressionElement):
    """Operand holding a literal string."""

    def __init__(self, string):
        self.string = string

    def value(self, message):
        return self.string

    def values(self, message):
        return [self.string]

    def __repr__(self):
        return '<FilterExpressionString "%s">' % self.string


def create_rule(buff, input_):
    """Create the rule described by ``buff`` and attach it to ``input_``.

    The first character selects the kind: ``*`` node reference, ``@`` method
    reference, ``#`` bin reference, ``:`` distributor (optionally with a
    parenthesised, comma-separated argument list); anything else is a filter.
    The new rule is appended to ``input_.outputs`` and returned.

    Raises:
        Exception: if ``buff`` is blank or is an invalid filter.
    """
    buff = buff.strip()
    if buff == "":
        raise Exception("Empty rule")

    marker = buff[0]
    if marker == "*":  # Node reference
        new_obj = NodeReference(input_, buff[1:])
    elif marker == "@":  # Method call
        new_obj = MethodReference(input_, buff[1:])
    elif marker == "#":  # Bin reference
        new_obj = BinReference(input_, buff[1:])
    elif marker == ":":  # Distributor
        if "(" in buff and buff[-1:] == ")":
            name, arglist = buff[1:-1].split("(", 1)
            # An empty pair of parentheses means "no arguments".
            if arglist.strip() == "":
                args = []
            else:
                args = [x.strip() for x in arglist.split(",")]
        else:
            name = buff[1:]
            args = []
        new_obj = DistributorReference(input_, name, args)
    else:  # Filter
        new_obj = Filter(input_, buff)

    input_.outputs.append(new_obj)
    return new_obj


def parse_rulebook(rulebook, registry=None):
    """Build the rule tree described by the text ``rulebook``.

    Leading tabs give the level of a line.  Level 0 lines declare a bin;
    deeper lines declare rules attached to the most recent rule one level
    up.  ``=>`` separates rules chained on the same line (each one attached
    to the previous) and is optional at the start of a line.  Carriage
    returns and blank lines are ignored.

    Bins are stored in ``registry`` (the module-level ``bins`` by default),
    which is also returned.

    Raises:
        Exception: on a line indented more than one level deeper than the
            previous one, or on an invalid rule.
    """
    if registry is None:
        registry = bins

    current_rule = {}  # level -> last element created at that level
    current_level = 0

    for raw_line in rulebook.replace("\r", "").split("\n"):
        content = raw_line.lstrip("\t")
        level = len(raw_line) - len(content)

        # '=>' is optional at the start of a line
        while content.startswith("=>"):
            content = content[2:]

        if content.strip() == "":
            continue  # Skip empty lines, we don't care about them

        if level > current_level + 1:
            raise Exception("Incorrect indentation")

        if level == 0:
            if "=>" in content:
                raise Exception("A bin declaration cannot contain '=>'")
            bin_name = content.strip()
            new_bin = Bin(bin_name)
            current_rule[0] = new_bin
            registry[bin_name] = new_bin
        else:
            target = current_rule.get(level - 1)
            if target is None:
                # Indented line before any bin was declared.
                raise Exception("Incorrect indentation")
            # Next rule, same line: each statement feeds the following one.
            for statement in content.split("=>"):
                target = create_rule(statement, target)
            current_rule[level] = target

        current_level = level

    # TODO: detect infinite loops via bins!
    return registry


class Message(object):
    """Message routed through the rule tree."""

    def __init__(self):
        self.id_ = ""
        self.type_ = "none"
        self.tags = []
        self.source = ""
        self.chain = []
        self.data = {}

    def set_data(self, data):
        """Populate the message from a dict.

        Reads the keys ``id``, ``type``, ``tags``, ``source``, ``chain`` and
        ``payload``; a missing key raises KeyError.
        """
        self.id_ = data['id']
        self.type_ = data['type']
        self.tags = data['tags']
        self.source = data['source']
        self.chain = data['chain']
        self.data = data['payload']


if __name__ == "__main__":
    with open(sys.argv[1]) as f:
        parse_rulebook(f.read())

    # Debugging aid: dump the parsed tree.
    #for bin_ in bins.values(): bin_.display(0)

    m = Message()
    m.set_data({
        "id": "qwert-yuiop-61238-10842",
        "type": "task",
        "tags": ["convert", "mpeg"],
        "source": "abcde-fghij-00000-00008",
        "chain": ["abcde-fghij-00000-00005", "abcde-fghij-00000-00006"],
        "payload": {
            "command": "convert",
            "category": "video",
            "original_filetype": "mpg"
        }
    })

    bins['remote'].process(m)
    #print(min(timeit.Timer("bins['remote'].process(m)", "from __main__ import bins, m").repeat(7, 1000)))