| #!/usr/bin/python |
| """ |
| Cartesian configuration format file parser. |
| |
| Filter syntax: |
| , means OR |
| .. means AND |
| . means IMMEDIATELY-FOLLOWED-BY |
| |
| Example: |
| qcow2..Fedora.14, RHEL.6..raw..boot, smp2..qcow2..migrate..ide |
| means match all dicts whose names have: |
| (qcow2 AND (Fedora IMMEDIATELY-FOLLOWED-BY 14)) OR |
| ((RHEL IMMEDIATELY-FOLLOWED-BY 6) AND raw AND boot) OR |
| (smp2 AND qcow2 AND migrate AND ide) |
| |
| Note: |
| 'qcow2..Fedora.14' is equivalent to 'Fedora.14..qcow2'. |
| 'qcow2..Fedora.14' is not equivalent to 'qcow2..14.Fedora'. |
| 'ide, scsi' is equivalent to 'scsi, ide'. |
| |
| Filters can be used in 3 ways: |
| only <filter> |
| no <filter> |
| <filter>: |
| The last one starts a conditional block. |
| |
| @copyright: Red Hat 2008-2011 |
| """ |
| |
| import re, os, sys, optparse, collections |
| |
| class ParserError: |
| def __init__(self, msg, line=None, filename=None, linenum=None): |
| self.msg = msg |
| self.line = line |
| self.filename = filename |
| self.linenum = linenum |
| |
| def __str__(self): |
| if self.line: |
| return "%s: %r (%s:%s)" % (self.msg, self.line, |
| self.filename, self.linenum) |
| else: |
| return "%s (%s:%s)" % (self.msg, self.filename, self.linenum) |
| |
| |
| num_failed_cases = 5 |
| |
| |
| class Node(object): |
| def __init__(self): |
| self.name = [] |
| self.dep = [] |
| self.content = [] |
| self.children = [] |
| self.labels = set() |
| self.append_to_shortname = False |
| self.failed_cases = collections.deque() |
| |
| |
| def _match_adjacent(block, ctx, ctx_set): |
| # TODO: explain what this function does |
| if block[0] not in ctx_set: |
| return 0 |
| if len(block) == 1: |
| return 1 |
| if block[1] not in ctx_set: |
| return int(ctx[-1] == block[0]) |
| k = 0 |
| i = ctx.index(block[0]) |
| while i < len(ctx): |
| if k > 0 and ctx[i] != block[k]: |
| i -= k - 1 |
| k = 0 |
| if ctx[i] == block[k]: |
| k += 1 |
| if k >= len(block): |
| break |
| if block[k] not in ctx_set: |
| break |
| i += 1 |
| return k |
| |
| |
| def _might_match_adjacent(block, ctx, ctx_set, descendant_labels): |
| matched = _match_adjacent(block, ctx, ctx_set) |
| for elem in block[matched:]: |
| if elem not in descendant_labels: |
| return False |
| return True |
| |
| |
| # Filter must inherit from object (otherwise type() won't work) |
| class Filter(object): |
| def __init__(self, s): |
| self.filter = [] |
| for char in s: |
| if not (char.isalnum() or char.isspace() or char in ".,_-"): |
| raise ParserError("Illegal characters in filter") |
| for word in s.replace(",", " ").split(): |
| word = [block.split(".") for block in word.split("..")] |
| for block in word: |
| for elem in block: |
| if not elem: |
| raise ParserError("Syntax error") |
| self.filter += [word] |
| |
| |
| def match(self, ctx, ctx_set): |
| for word in self.filter: |
| for block in word: |
| if _match_adjacent(block, ctx, ctx_set) != len(block): |
| break |
| else: |
| return True |
| return False |
| |
| |
| def might_match(self, ctx, ctx_set, descendant_labels): |
| for word in self.filter: |
| for block in word: |
| if not _might_match_adjacent(block, ctx, ctx_set, |
| descendant_labels): |
| break |
| else: |
| return True |
| return False |
| |
| |
| class NoOnlyFilter(Filter): |
| def __init__(self, line): |
| Filter.__init__(self, line.split(None, 1)[1]) |
| self.line = line |
| |
| |
| class OnlyFilter(NoOnlyFilter): |
| def is_irrelevant(self, ctx, ctx_set, descendant_labels): |
| return self.match(ctx, ctx_set) |
| |
| |
| def requires_action(self, ctx, ctx_set, descendant_labels): |
| return not self.might_match(ctx, ctx_set, descendant_labels) |
| |
| |
| def might_pass(self, failed_ctx, failed_ctx_set, ctx, ctx_set, |
| descendant_labels): |
| for word in self.filter: |
| for block in word: |
| if (_match_adjacent(block, ctx, ctx_set) > |
| _match_adjacent(block, failed_ctx, failed_ctx_set)): |
| return self.might_match(ctx, ctx_set, descendant_labels) |
| return False |
| |
| |
| class NoFilter(NoOnlyFilter): |
| def is_irrelevant(self, ctx, ctx_set, descendant_labels): |
| return not self.might_match(ctx, ctx_set, descendant_labels) |
| |
| |
| def requires_action(self, ctx, ctx_set, descendant_labels): |
| return self.match(ctx, ctx_set) |
| |
| |
| def might_pass(self, failed_ctx, failed_ctx_set, ctx, ctx_set, |
| descendant_labels): |
| for word in self.filter: |
| for block in word: |
| if (_match_adjacent(block, ctx, ctx_set) < |
| _match_adjacent(block, failed_ctx, failed_ctx_set)): |
| return not self.match(ctx, ctx_set) |
| return False |
| |
| |
| class Condition(NoFilter): |
| def __init__(self, line): |
| Filter.__init__(self, line.rstrip(":")) |
| self.line = line |
| self.content = [] |
| |
| |
| class NegativeCondition(OnlyFilter): |
| def __init__(self, line): |
| Filter.__init__(self, line.lstrip("!").rstrip(":")) |
| self.line = line |
| self.content = [] |
| |
| |
| class Parser(object): |
| """ |
| Parse an input file or string that follows the Cartesian Config File format |
| and generate a list of dicts that will be later used as configuration |
| parameters by autotest tests that use that format. |
| |
| @see: http://autotest.kernel.org/wiki/CartesianConfig |
| """ |
| |
| def __init__(self, filename=None, debug=False): |
| """ |
| Initialize the parser and optionally parse a file. |
| |
| @param filename: Path of the file to parse. |
| @param debug: Whether to turn on debugging output. |
| """ |
| self.node = Node() |
| self.debug = debug |
| if filename: |
| self.parse_file(filename) |
| |
| |
| def parse_file(self, filename): |
| """ |
| Parse a file. |
| |
| @param filename: Path of the configuration file. |
| """ |
| self.node = self._parse(FileReader(filename), self.node) |
| |
| |
| def parse_string(self, s): |
| """ |
| Parse a string. |
| |
| @param s: String to parse. |
| """ |
| self.node = self._parse(StrReader(s), self.node) |
| |
| |
| def get_dicts(self, node=None, ctx=[], content=[], shortname=[], dep=[]): |
| """ |
| Generate dictionaries from the code parsed so far. This should |
| be called after parsing something. |
| |
| @return: A dict generator. |
| """ |
| def process_content(content, failed_filters): |
| # 1. Check that the filters in content are OK with the current |
| # context (ctx). |
| # 2. Move the parts of content that are still relevant into |
| # new_content and unpack conditional blocks if appropriate. |
| # For example, if an 'only' statement fully matches ctx, it |
| # becomes irrelevant and is not appended to new_content. |
| # If a conditional block fully matches, its contents are |
| # unpacked into new_content. |
| # 3. Move failed filters into failed_filters, so that next time we |
| # reach this node or one of its ancestors, we'll check those |
| # filters first. |
| for t in content: |
| filename, linenum, obj = t |
| if type(obj) is Op: |
| new_content.append(t) |
| continue |
| # obj is an OnlyFilter/NoFilter/Condition/NegativeCondition |
| if obj.requires_action(ctx, ctx_set, labels): |
| # This filter requires action now |
| if type(obj) is OnlyFilter or type(obj) is NoFilter: |
| self._debug(" filter did not pass: %r (%s:%s)", |
| obj.line, filename, linenum) |
| failed_filters.append(t) |
| return False |
| else: |
| self._debug(" conditional block matches: %r (%s:%s)", |
| obj.line, filename, linenum) |
| # Check and unpack the content inside this Condition |
| # object (note: the failed filters should go into |
| # new_internal_filters because we don't expect them to |
| # come from outside this node, even if the Condition |
| # itself was external) |
| if not process_content(obj.content, |
| new_internal_filters): |
| failed_filters.append(t) |
| return False |
| continue |
| elif obj.is_irrelevant(ctx, ctx_set, labels): |
| # This filter is no longer relevant and can be removed |
| continue |
| else: |
| # Keep the filter and check it again later |
| new_content.append(t) |
| return True |
| |
| def might_pass(failed_ctx, |
| failed_ctx_set, |
| failed_external_filters, |
| failed_internal_filters): |
| for t in failed_external_filters: |
| if t not in content: |
| return True |
| filename, linenum, filter = t |
| if filter.might_pass(failed_ctx, failed_ctx_set, ctx, ctx_set, |
| labels): |
| return True |
| for t in failed_internal_filters: |
| filename, linenum, filter = t |
| if filter.might_pass(failed_ctx, failed_ctx_set, ctx, ctx_set, |
| labels): |
| return True |
| return False |
| |
| def add_failed_case(): |
| node.failed_cases.appendleft((ctx, ctx_set, |
| new_external_filters, |
| new_internal_filters)) |
| if len(node.failed_cases) > num_failed_cases: |
| node.failed_cases.pop() |
| |
| node = node or self.node |
| # Update dep |
| for d in node.dep: |
| dep = dep + [".".join(ctx + [d])] |
| # Update ctx |
| ctx = ctx + node.name |
| ctx_set = set(ctx) |
| labels = node.labels |
| # Get the current name |
| name = ".".join(ctx) |
| if node.name: |
| self._debug("checking out %r", name) |
| # Check previously failed filters |
| for i, failed_case in enumerate(node.failed_cases): |
| if not might_pass(*failed_case): |
| self._debug(" this subtree has failed before") |
| del node.failed_cases[i] |
| node.failed_cases.appendleft(failed_case) |
| return |
| # Check content and unpack it into new_content |
| new_content = [] |
| new_external_filters = [] |
| new_internal_filters = [] |
| if (not process_content(node.content, new_internal_filters) or |
| not process_content(content, new_external_filters)): |
| add_failed_case() |
| return |
| # Update shortname |
| if node.append_to_shortname: |
| shortname = shortname + node.name |
| # Recurse into children |
| count = 0 |
| for n in node.children: |
| for d in self.get_dicts(n, ctx, new_content, shortname, dep): |
| count += 1 |
| yield d |
| # Reached leaf? |
| if not node.children: |
| self._debug(" reached leaf, returning it") |
| d = {"name": name, "dep": dep, "shortname": ".".join(shortname)} |
| for filename, linenum, op in new_content: |
| op.apply_to_dict(d) |
| yield d |
| # If this node did not produce any dicts, remember the failed filters |
| # of its descendants |
| elif not count: |
| new_external_filters = [] |
| new_internal_filters = [] |
| for n in node.children: |
| (failed_ctx, |
| failed_ctx_set, |
| failed_external_filters, |
| failed_internal_filters) = n.failed_cases[0] |
| for obj in failed_internal_filters: |
| if obj not in new_internal_filters: |
| new_internal_filters.append(obj) |
| for obj in failed_external_filters: |
| if obj in content: |
| if obj not in new_external_filters: |
| new_external_filters.append(obj) |
| else: |
| if obj not in new_internal_filters: |
| new_internal_filters.append(obj) |
| add_failed_case() |
| |
| |
| def _debug(self, s, *args): |
| if self.debug: |
| s = "DEBUG: %s" % s |
| print s % args |
| |
| |
| def _warn(self, s, *args): |
| s = "WARNING: %s" % s |
| print s % args |
| |
| |
| def _parse_variants(self, cr, node, prev_indent=-1): |
| """ |
| Read and parse lines from a FileReader object until a line with an |
| indent level lower than or equal to prev_indent is encountered. |
| |
| @param cr: A FileReader/StrReader object. |
| @param node: A node to operate on. |
| @param prev_indent: The indent level of the "parent" block. |
| @return: A node object. |
| """ |
| node4 = Node() |
| |
| while True: |
| line, indent, linenum = cr.get_next_line(prev_indent) |
| if not line: |
| break |
| |
| name, dep = map(str.strip, line.lstrip("- ").split(":", 1)) |
| for char in name: |
| if not (char.isalnum() or char in "@._-"): |
| raise ParserError("Illegal characters in variant name", |
| line, cr.filename, linenum) |
| for char in dep: |
| if not (char.isalnum() or char.isspace() or char in ".,_-"): |
| raise ParserError("Illegal characters in dependencies", |
| line, cr.filename, linenum) |
| |
| node2 = Node() |
| node2.children = [node] |
| node2.labels = node.labels |
| |
| node3 = self._parse(cr, node2, prev_indent=indent) |
| node3.name = name.lstrip("@").split(".") |
| node3.dep = dep.replace(",", " ").split() |
| node3.append_to_shortname = not name.startswith("@") |
| |
| node4.children += [node3] |
| node4.labels.update(node3.labels) |
| node4.labels.update(node3.name) |
| |
| return node4 |
| |
| |
| def _parse(self, cr, node, prev_indent=-1): |
| """ |
| Read and parse lines from a StrReader object until a line with an |
| indent level lower than or equal to prev_indent is encountered. |
| |
| @param cr: A FileReader/StrReader object. |
| @param node: A Node or a Condition object to operate on. |
| @param prev_indent: The indent level of the "parent" block. |
| @return: A node object. |
| """ |
| while True: |
| line, indent, linenum = cr.get_next_line(prev_indent) |
| if not line: |
| break |
| |
| words = line.split(None, 1) |
| |
| # Parse 'variants' |
| if line == "variants:": |
| # 'variants' is not allowed inside a conditional block |
| if (isinstance(node, Condition) or |
| isinstance(node, NegativeCondition)): |
| raise ParserError("'variants' is not allowed inside a " |
| "conditional block", |
| None, cr.filename, linenum) |
| node = self._parse_variants(cr, node, prev_indent=indent) |
| continue |
| |
| # Parse 'include' statements |
| if words[0] == "include": |
| if len(words) < 2: |
| raise ParserError("Syntax error: missing parameter", |
| line, cr.filename, linenum) |
| filename = os.path.expanduser(words[1]) |
| if isinstance(cr, FileReader) and not os.path.isabs(filename): |
| filename = os.path.join(os.path.dirname(cr.filename), |
| filename) |
| if not os.path.isfile(filename): |
| self._warn("%r (%s:%s): file doesn't exist or is not a " |
| "regular file", line, cr.filename, linenum) |
| continue |
| node = self._parse(FileReader(filename), node) |
| continue |
| |
| # Parse 'only' and 'no' filters |
| if words[0] in ("only", "no"): |
| if len(words) < 2: |
| raise ParserError("Syntax error: missing parameter", |
| line, cr.filename, linenum) |
| try: |
| if words[0] == "only": |
| f = OnlyFilter(line) |
| elif words[0] == "no": |
| f = NoFilter(line) |
| except ParserError, e: |
| e.line = line |
| e.filename = cr.filename |
| e.linenum = linenum |
| raise |
| node.content += [(cr.filename, linenum, f)] |
| continue |
| |
| # Look for operators |
| op_match = _ops_exp.search(line) |
| |
| # Parse conditional blocks |
| if ":" in line: |
| index = line.index(":") |
| if not op_match or index < op_match.start(): |
| index += 1 |
| cr.set_next_line(line[index:], indent, linenum) |
| line = line[:index] |
| try: |
| if line.startswith("!"): |
| cond = NegativeCondition(line) |
| else: |
| cond = Condition(line) |
| except ParserError, e: |
| e.line = line |
| e.filename = cr.filename |
| e.linenum = linenum |
| raise |
| self._parse(cr, cond, prev_indent=indent) |
| node.content += [(cr.filename, linenum, cond)] |
| continue |
| |
| # Parse regular operators |
| if not op_match: |
| raise ParserError("Syntax error", line, cr.filename, linenum) |
| node.content += [(cr.filename, linenum, Op(line, op_match))] |
| |
| return node |
| |
| |
| # Assignment operators |
| |
| _reserved_keys = set(("name", "shortname", "dep")) |
| |
| |
| def _op_set(d, key, value): |
| if key not in _reserved_keys: |
| d[key] = value |
| |
| |
| def _op_append(d, key, value): |
| if key not in _reserved_keys: |
| d[key] = d.get(key, "") + value |
| |
| |
| def _op_prepend(d, key, value): |
| if key not in _reserved_keys: |
| d[key] = value + d.get(key, "") |
| |
| |
| def _op_regex_set(d, exp, value): |
| exp = re.compile("%s$" % exp) |
| for key in d: |
| if key not in _reserved_keys and exp.match(key): |
| d[key] = value |
| |
| |
| def _op_regex_append(d, exp, value): |
| exp = re.compile("%s$" % exp) |
| for key in d: |
| if key not in _reserved_keys and exp.match(key): |
| d[key] += value |
| |
| |
| def _op_regex_prepend(d, exp, value): |
| exp = re.compile("%s$" % exp) |
| for key in d: |
| if key not in _reserved_keys and exp.match(key): |
| d[key] = value + d[key] |
| |
| |
| def _op_regex_del(d, empty, exp): |
| exp = re.compile("%s$" % exp) |
| for key in d.keys(): |
| if key not in _reserved_keys and exp.match(key): |
| del d[key] |
| |
| |
| _ops = {"=": (r"\=", _op_set), |
| "+=": (r"\+\=", _op_append), |
| "<=": (r"\<\=", _op_prepend), |
| "?=": (r"\?\=", _op_regex_set), |
| "?+=": (r"\?\+\=", _op_regex_append), |
| "?<=": (r"\?\<\=", _op_regex_prepend), |
| "del": (r"^del\b", _op_regex_del)} |
| |
| _ops_exp = re.compile("|".join([op[0] for op in _ops.values()])) |
| |
| |
| class Op(object): |
| def __init__(self, line, m): |
| self.func = _ops[m.group()][1] |
| self.key = line[:m.start()].strip() |
| value = line[m.end():].strip() |
| if value and (value[0] == value[-1] == '"' or |
| value[0] == value[-1] == "'"): |
| value = value[1:-1] |
| self.value = value |
| |
| |
| def apply_to_dict(self, d): |
| self.func(d, self.key, self.value) |
| |
| |
| # StrReader and FileReader |
| |
| class StrReader(object): |
| """ |
| Preprocess an input string for easy reading. |
| """ |
| def __init__(self, s): |
| """ |
| Initialize the reader. |
| |
| @param s: The string to parse. |
| """ |
| self.filename = "<string>" |
| self._lines = [] |
| self._line_index = 0 |
| self._stored_line = None |
| for linenum, line in enumerate(s.splitlines()): |
| line = line.rstrip().expandtabs() |
| stripped_line = line.lstrip() |
| indent = len(line) - len(stripped_line) |
| if (not stripped_line |
| or stripped_line.startswith("#") |
| or stripped_line.startswith("//")): |
| continue |
| self._lines.append((stripped_line, indent, linenum + 1)) |
| |
| |
| def get_next_line(self, prev_indent): |
| """ |
| Get the next line in the current block. |
| |
| @param prev_indent: The indentation level of the previous block. |
| @return: (line, indent, linenum), where indent is the line's |
| indentation level. If no line is available, (None, -1, -1) is |
| returned. |
| """ |
| if self._stored_line: |
| ret = self._stored_line |
| self._stored_line = None |
| return ret |
| if self._line_index >= len(self._lines): |
| return None, -1, -1 |
| line, indent, linenum = self._lines[self._line_index] |
| if indent <= prev_indent: |
| return None, -1, -1 |
| self._line_index += 1 |
| return line, indent, linenum |
| |
| |
| def set_next_line(self, line, indent, linenum): |
| """ |
| Make the next call to get_next_line() return the given line instead of |
| the real next line. |
| """ |
| line = line.strip() |
| if line: |
| self._stored_line = line, indent, linenum |
| |
| |
| class FileReader(StrReader): |
| """ |
| Preprocess an input file for easy reading. |
| """ |
| def __init__(self, filename): |
| """ |
| Initialize the reader. |
| |
| @parse filename: The name of the input file. |
| """ |
| StrReader.__init__(self, open(filename).read()) |
| self.filename = filename |
| |
| |
| if __name__ == "__main__": |
| parser = optparse.OptionParser('usage: %prog [options] filename ' |
| '[extra code] ...\n\nExample:\n\n ' |
| '%prog tests.cfg "only my_set" "no qcow2"') |
| parser.add_option("-v", "--verbose", dest="debug", action="store_true", |
| help="include debug messages in console output") |
| parser.add_option("-f", "--fullname", dest="fullname", action="store_true", |
| help="show full dict names instead of short names") |
| parser.add_option("-c", "--contents", dest="contents", action="store_true", |
| help="show dict contents") |
| |
| options, args = parser.parse_args() |
| if not args: |
| parser.error("filename required") |
| |
| c = Parser(args[0], debug=options.debug) |
| for s in args[1:]: |
| c.parse_string(s) |
| |
| for i, d in enumerate(c.get_dicts()): |
| if options.fullname: |
| print "dict %4d: %s" % (i + 1, d["name"]) |
| else: |
| print "dict %4d: %s" % (i + 1, d["shortname"]) |
| if options.contents: |
| keys = d.keys() |
| keys.sort() |
| for key in keys: |
| print " %s = %s" % (key, d[key]) |