blob: ac04c24c3cec8f742a04835c2a3b69ffa951603f [file] [log] [blame] [edit]
#!/usr/bin/python
"""
Cartesian configuration format file parser.
Filter syntax:
, means OR
.. means AND
. means IMMEDIATELY-FOLLOWED-BY
Example:
qcow2..Fedora.14, RHEL.6..raw..boot, smp2..qcow2..migrate..ide
means match all dicts whose names have:
(qcow2 AND (Fedora IMMEDIATELY-FOLLOWED-BY 14)) OR
((RHEL IMMEDIATELY-FOLLOWED-BY 6) AND raw AND boot) OR
(smp2 AND qcow2 AND migrate AND ide)
Note:
'qcow2..Fedora.14' is equivalent to 'Fedora.14..qcow2'.
'qcow2..Fedora.14' is not equivalent to 'qcow2..14.Fedora'.
'ide, scsi' is equivalent to 'scsi, ide'.
Filters can be used in 3 ways:
only <filter>
no <filter>
<filter>:
The last one starts a conditional block.
@copyright: Red Hat 2008-2011
"""
import re, os, sys, optparse, collections
class ParserError:
def __init__(self, msg, line=None, filename=None, linenum=None):
self.msg = msg
self.line = line
self.filename = filename
self.linenum = linenum
def __str__(self):
if self.line:
return "%s: %r (%s:%s)" % (self.msg, self.line,
self.filename, self.linenum)
else:
return "%s (%s:%s)" % (self.msg, self.filename, self.linenum)
num_failed_cases = 5
class Node(object):
def __init__(self):
self.name = []
self.dep = []
self.content = []
self.children = []
self.labels = set()
self.append_to_shortname = False
self.failed_cases = collections.deque()
def _match_adjacent(block, ctx, ctx_set):
# TODO: explain what this function does
if block[0] not in ctx_set:
return 0
if len(block) == 1:
return 1
if block[1] not in ctx_set:
return int(ctx[-1] == block[0])
k = 0
i = ctx.index(block[0])
while i < len(ctx):
if k > 0 and ctx[i] != block[k]:
i -= k - 1
k = 0
if ctx[i] == block[k]:
k += 1
if k >= len(block):
break
if block[k] not in ctx_set:
break
i += 1
return k
def _might_match_adjacent(block, ctx, ctx_set, descendant_labels):
matched = _match_adjacent(block, ctx, ctx_set)
for elem in block[matched:]:
if elem not in descendant_labels:
return False
return True
# Filter must inherit from object (otherwise type() won't work)
class Filter(object):
def __init__(self, s):
self.filter = []
for char in s:
if not (char.isalnum() or char.isspace() or char in ".,_-"):
raise ParserError("Illegal characters in filter")
for word in s.replace(",", " ").split():
word = [block.split(".") for block in word.split("..")]
for block in word:
for elem in block:
if not elem:
raise ParserError("Syntax error")
self.filter += [word]
def match(self, ctx, ctx_set):
for word in self.filter:
for block in word:
if _match_adjacent(block, ctx, ctx_set) != len(block):
break
else:
return True
return False
def might_match(self, ctx, ctx_set, descendant_labels):
for word in self.filter:
for block in word:
if not _might_match_adjacent(block, ctx, ctx_set,
descendant_labels):
break
else:
return True
return False
class NoOnlyFilter(Filter):
def __init__(self, line):
Filter.__init__(self, line.split(None, 1)[1])
self.line = line
class OnlyFilter(NoOnlyFilter):
def is_irrelevant(self, ctx, ctx_set, descendant_labels):
return self.match(ctx, ctx_set)
def requires_action(self, ctx, ctx_set, descendant_labels):
return not self.might_match(ctx, ctx_set, descendant_labels)
def might_pass(self, failed_ctx, failed_ctx_set, ctx, ctx_set,
descendant_labels):
for word in self.filter:
for block in word:
if (_match_adjacent(block, ctx, ctx_set) >
_match_adjacent(block, failed_ctx, failed_ctx_set)):
return self.might_match(ctx, ctx_set, descendant_labels)
return False
class NoFilter(NoOnlyFilter):
def is_irrelevant(self, ctx, ctx_set, descendant_labels):
return not self.might_match(ctx, ctx_set, descendant_labels)
def requires_action(self, ctx, ctx_set, descendant_labels):
return self.match(ctx, ctx_set)
def might_pass(self, failed_ctx, failed_ctx_set, ctx, ctx_set,
descendant_labels):
for word in self.filter:
for block in word:
if (_match_adjacent(block, ctx, ctx_set) <
_match_adjacent(block, failed_ctx, failed_ctx_set)):
return not self.match(ctx, ctx_set)
return False
class Condition(NoFilter):
def __init__(self, line):
Filter.__init__(self, line.rstrip(":"))
self.line = line
self.content = []
class NegativeCondition(OnlyFilter):
def __init__(self, line):
Filter.__init__(self, line.lstrip("!").rstrip(":"))
self.line = line
self.content = []
class Parser(object):
"""
Parse an input file or string that follows the Cartesian Config File format
and generate a list of dicts that will be later used as configuration
parameters by autotest tests that use that format.
@see: http://autotest.kernel.org/wiki/CartesianConfig
"""
def __init__(self, filename=None, debug=False):
"""
Initialize the parser and optionally parse a file.
@param filename: Path of the file to parse.
@param debug: Whether to turn on debugging output.
"""
self.node = Node()
self.debug = debug
if filename:
self.parse_file(filename)
def parse_file(self, filename):
"""
Parse a file.
@param filename: Path of the configuration file.
"""
self.node = self._parse(FileReader(filename), self.node)
def parse_string(self, s):
"""
Parse a string.
@param s: String to parse.
"""
self.node = self._parse(StrReader(s), self.node)
def get_dicts(self, node=None, ctx=[], content=[], shortname=[], dep=[]):
"""
Generate dictionaries from the code parsed so far. This should
be called after parsing something.
@return: A dict generator.
"""
def process_content(content, failed_filters):
# 1. Check that the filters in content are OK with the current
# context (ctx).
# 2. Move the parts of content that are still relevant into
# new_content and unpack conditional blocks if appropriate.
# For example, if an 'only' statement fully matches ctx, it
# becomes irrelevant and is not appended to new_content.
# If a conditional block fully matches, its contents are
# unpacked into new_content.
# 3. Move failed filters into failed_filters, so that next time we
# reach this node or one of its ancestors, we'll check those
# filters first.
for t in content:
filename, linenum, obj = t
if type(obj) is Op:
new_content.append(t)
continue
# obj is an OnlyFilter/NoFilter/Condition/NegativeCondition
if obj.requires_action(ctx, ctx_set, labels):
# This filter requires action now
if type(obj) is OnlyFilter or type(obj) is NoFilter:
self._debug(" filter did not pass: %r (%s:%s)",
obj.line, filename, linenum)
failed_filters.append(t)
return False
else:
self._debug(" conditional block matches: %r (%s:%s)",
obj.line, filename, linenum)
# Check and unpack the content inside this Condition
# object (note: the failed filters should go into
# new_internal_filters because we don't expect them to
# come from outside this node, even if the Condition
# itself was external)
if not process_content(obj.content,
new_internal_filters):
failed_filters.append(t)
return False
continue
elif obj.is_irrelevant(ctx, ctx_set, labels):
# This filter is no longer relevant and can be removed
continue
else:
# Keep the filter and check it again later
new_content.append(t)
return True
def might_pass(failed_ctx,
failed_ctx_set,
failed_external_filters,
failed_internal_filters):
for t in failed_external_filters:
if t not in content:
return True
filename, linenum, filter = t
if filter.might_pass(failed_ctx, failed_ctx_set, ctx, ctx_set,
labels):
return True
for t in failed_internal_filters:
filename, linenum, filter = t
if filter.might_pass(failed_ctx, failed_ctx_set, ctx, ctx_set,
labels):
return True
return False
def add_failed_case():
node.failed_cases.appendleft((ctx, ctx_set,
new_external_filters,
new_internal_filters))
if len(node.failed_cases) > num_failed_cases:
node.failed_cases.pop()
node = node or self.node
# Update dep
for d in node.dep:
dep = dep + [".".join(ctx + [d])]
# Update ctx
ctx = ctx + node.name
ctx_set = set(ctx)
labels = node.labels
# Get the current name
name = ".".join(ctx)
if node.name:
self._debug("checking out %r", name)
# Check previously failed filters
for i, failed_case in enumerate(node.failed_cases):
if not might_pass(*failed_case):
self._debug(" this subtree has failed before")
del node.failed_cases[i]
node.failed_cases.appendleft(failed_case)
return
# Check content and unpack it into new_content
new_content = []
new_external_filters = []
new_internal_filters = []
if (not process_content(node.content, new_internal_filters) or
not process_content(content, new_external_filters)):
add_failed_case()
return
# Update shortname
if node.append_to_shortname:
shortname = shortname + node.name
# Recurse into children
count = 0
for n in node.children:
for d in self.get_dicts(n, ctx, new_content, shortname, dep):
count += 1
yield d
# Reached leaf?
if not node.children:
self._debug(" reached leaf, returning it")
d = {"name": name, "dep": dep, "shortname": ".".join(shortname)}
for filename, linenum, op in new_content:
op.apply_to_dict(d)
yield d
# If this node did not produce any dicts, remember the failed filters
# of its descendants
elif not count:
new_external_filters = []
new_internal_filters = []
for n in node.children:
(failed_ctx,
failed_ctx_set,
failed_external_filters,
failed_internal_filters) = n.failed_cases[0]
for obj in failed_internal_filters:
if obj not in new_internal_filters:
new_internal_filters.append(obj)
for obj in failed_external_filters:
if obj in content:
if obj not in new_external_filters:
new_external_filters.append(obj)
else:
if obj not in new_internal_filters:
new_internal_filters.append(obj)
add_failed_case()
def _debug(self, s, *args):
if self.debug:
s = "DEBUG: %s" % s
print s % args
def _warn(self, s, *args):
s = "WARNING: %s" % s
print s % args
def _parse_variants(self, cr, node, prev_indent=-1):
"""
Read and parse lines from a FileReader object until a line with an
indent level lower than or equal to prev_indent is encountered.
@param cr: A FileReader/StrReader object.
@param node: A node to operate on.
@param prev_indent: The indent level of the "parent" block.
@return: A node object.
"""
node4 = Node()
while True:
line, indent, linenum = cr.get_next_line(prev_indent)
if not line:
break
name, dep = map(str.strip, line.lstrip("- ").split(":", 1))
for char in name:
if not (char.isalnum() or char in "@._-"):
raise ParserError("Illegal characters in variant name",
line, cr.filename, linenum)
for char in dep:
if not (char.isalnum() or char.isspace() or char in ".,_-"):
raise ParserError("Illegal characters in dependencies",
line, cr.filename, linenum)
node2 = Node()
node2.children = [node]
node2.labels = node.labels
node3 = self._parse(cr, node2, prev_indent=indent)
node3.name = name.lstrip("@").split(".")
node3.dep = dep.replace(",", " ").split()
node3.append_to_shortname = not name.startswith("@")
node4.children += [node3]
node4.labels.update(node3.labels)
node4.labels.update(node3.name)
return node4
def _parse(self, cr, node, prev_indent=-1):
"""
Read and parse lines from a StrReader object until a line with an
indent level lower than or equal to prev_indent is encountered.
@param cr: A FileReader/StrReader object.
@param node: A Node or a Condition object to operate on.
@param prev_indent: The indent level of the "parent" block.
@return: A node object.
"""
while True:
line, indent, linenum = cr.get_next_line(prev_indent)
if not line:
break
words = line.split(None, 1)
# Parse 'variants'
if line == "variants:":
# 'variants' is not allowed inside a conditional block
if (isinstance(node, Condition) or
isinstance(node, NegativeCondition)):
raise ParserError("'variants' is not allowed inside a "
"conditional block",
None, cr.filename, linenum)
node = self._parse_variants(cr, node, prev_indent=indent)
continue
# Parse 'include' statements
if words[0] == "include":
if len(words) < 2:
raise ParserError("Syntax error: missing parameter",
line, cr.filename, linenum)
filename = os.path.expanduser(words[1])
if isinstance(cr, FileReader) and not os.path.isabs(filename):
filename = os.path.join(os.path.dirname(cr.filename),
filename)
if not os.path.isfile(filename):
self._warn("%r (%s:%s): file doesn't exist or is not a "
"regular file", line, cr.filename, linenum)
continue
node = self._parse(FileReader(filename), node)
continue
# Parse 'only' and 'no' filters
if words[0] in ("only", "no"):
if len(words) < 2:
raise ParserError("Syntax error: missing parameter",
line, cr.filename, linenum)
try:
if words[0] == "only":
f = OnlyFilter(line)
elif words[0] == "no":
f = NoFilter(line)
except ParserError, e:
e.line = line
e.filename = cr.filename
e.linenum = linenum
raise
node.content += [(cr.filename, linenum, f)]
continue
# Look for operators
op_match = _ops_exp.search(line)
# Parse conditional blocks
if ":" in line:
index = line.index(":")
if not op_match or index < op_match.start():
index += 1
cr.set_next_line(line[index:], indent, linenum)
line = line[:index]
try:
if line.startswith("!"):
cond = NegativeCondition(line)
else:
cond = Condition(line)
except ParserError, e:
e.line = line
e.filename = cr.filename
e.linenum = linenum
raise
self._parse(cr, cond, prev_indent=indent)
node.content += [(cr.filename, linenum, cond)]
continue
# Parse regular operators
if not op_match:
raise ParserError("Syntax error", line, cr.filename, linenum)
node.content += [(cr.filename, linenum, Op(line, op_match))]
return node
# Assignment operators
_reserved_keys = set(("name", "shortname", "dep"))
def _op_set(d, key, value):
if key not in _reserved_keys:
d[key] = value
def _op_append(d, key, value):
if key not in _reserved_keys:
d[key] = d.get(key, "") + value
def _op_prepend(d, key, value):
if key not in _reserved_keys:
d[key] = value + d.get(key, "")
def _op_regex_set(d, exp, value):
exp = re.compile("%s$" % exp)
for key in d:
if key not in _reserved_keys and exp.match(key):
d[key] = value
def _op_regex_append(d, exp, value):
exp = re.compile("%s$" % exp)
for key in d:
if key not in _reserved_keys and exp.match(key):
d[key] += value
def _op_regex_prepend(d, exp, value):
exp = re.compile("%s$" % exp)
for key in d:
if key not in _reserved_keys and exp.match(key):
d[key] = value + d[key]
def _op_regex_del(d, empty, exp):
exp = re.compile("%s$" % exp)
for key in d.keys():
if key not in _reserved_keys and exp.match(key):
del d[key]
_ops = {"=": (r"\=", _op_set),
"+=": (r"\+\=", _op_append),
"<=": (r"\<\=", _op_prepend),
"?=": (r"\?\=", _op_regex_set),
"?+=": (r"\?\+\=", _op_regex_append),
"?<=": (r"\?\<\=", _op_regex_prepend),
"del": (r"^del\b", _op_regex_del)}
_ops_exp = re.compile("|".join([op[0] for op in _ops.values()]))
class Op(object):
def __init__(self, line, m):
self.func = _ops[m.group()][1]
self.key = line[:m.start()].strip()
value = line[m.end():].strip()
if value and (value[0] == value[-1] == '"' or
value[0] == value[-1] == "'"):
value = value[1:-1]
self.value = value
def apply_to_dict(self, d):
self.func(d, self.key, self.value)
# StrReader and FileReader
class StrReader(object):
"""
Preprocess an input string for easy reading.
"""
def __init__(self, s):
"""
Initialize the reader.
@param s: The string to parse.
"""
self.filename = "<string>"
self._lines = []
self._line_index = 0
self._stored_line = None
for linenum, line in enumerate(s.splitlines()):
line = line.rstrip().expandtabs()
stripped_line = line.lstrip()
indent = len(line) - len(stripped_line)
if (not stripped_line
or stripped_line.startswith("#")
or stripped_line.startswith("//")):
continue
self._lines.append((stripped_line, indent, linenum + 1))
def get_next_line(self, prev_indent):
"""
Get the next line in the current block.
@param prev_indent: The indentation level of the previous block.
@return: (line, indent, linenum), where indent is the line's
indentation level. If no line is available, (None, -1, -1) is
returned.
"""
if self._stored_line:
ret = self._stored_line
self._stored_line = None
return ret
if self._line_index >= len(self._lines):
return None, -1, -1
line, indent, linenum = self._lines[self._line_index]
if indent <= prev_indent:
return None, -1, -1
self._line_index += 1
return line, indent, linenum
def set_next_line(self, line, indent, linenum):
"""
Make the next call to get_next_line() return the given line instead of
the real next line.
"""
line = line.strip()
if line:
self._stored_line = line, indent, linenum
class FileReader(StrReader):
"""
Preprocess an input file for easy reading.
"""
def __init__(self, filename):
"""
Initialize the reader.
@parse filename: The name of the input file.
"""
StrReader.__init__(self, open(filename).read())
self.filename = filename
if __name__ == "__main__":
parser = optparse.OptionParser('usage: %prog [options] filename '
'[extra code] ...\n\nExample:\n\n '
'%prog tests.cfg "only my_set" "no qcow2"')
parser.add_option("-v", "--verbose", dest="debug", action="store_true",
help="include debug messages in console output")
parser.add_option("-f", "--fullname", dest="fullname", action="store_true",
help="show full dict names instead of short names")
parser.add_option("-c", "--contents", dest="contents", action="store_true",
help="show dict contents")
options, args = parser.parse_args()
if not args:
parser.error("filename required")
c = Parser(args[0], debug=options.debug)
for s in args[1:]:
c.parse_string(s)
for i, d in enumerate(c.get_dicts()):
if options.fullname:
print "dict %4d: %s" % (i + 1, d["name"])
else:
print "dict %4d: %s" % (i + 1, d["shortname"])
if options.contents:
keys = d.keys()
keys.sort()
for key in keys:
print " %s = %s" % (key, d[key])