blob: f5c79582f5d016e7c3830acaa8d264998931e01c [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Style note: dbusfuzz.py is a stand-alone tool independent of autotest.
# It should adhere to PEP-8, rather than local autotest style.
"""Perform API Fuzzing of DBus interfaces."""
import dbus
import logging
import optparse
import re
import subprocess
import sys
import yaml
class LoggedDbus(object):
"""Convenience wrapper around dbus, adds pretend-mode and logging."""
def __init__(self, bus=dbus.SystemBus(), logger=logging.debug,
pretend=False):
self.bus = bus
self.logger = logger
self.pretend = pretend
self.runcount = 0
def _GetBareMethod(self, dest, path, iface, method):
"""Get a reference to the specified DBus method."""
remote_object = self.bus.get_object(dest, path)
method_ref = remote_object.get_dbus_method(method, dbus_interface=iface)
return method_ref
def _LogCall(self, dest, path, iface, method, *args):
self.logger('TEST #%d: %s %s %s->%s with %s',
self.runcount, dest, path, iface, method, repr(args))
def GetMethod(self, dest, path, iface, method):
"""Return a (wrapped, for logging) reference to the a DBus method."""
if not self.pretend:
method_ref = self._GetBareMethod(dest, path, iface, method)
# Work around python's lack of multi-line lambdas.
wrapped_method = lambda *args: [
self._LogCall(dest, path, iface, method, args),
None if self.pretend else apply(method_ref, args)
][1]
return wrapped_method
class ProcessMonitor(object):
"""Watch for target process to disappear."""
# TODO(jimhebert) Determine if we would be better served with
# a deeper process monitoring strategy, e.g. one based on
# python-ptrace. If so, borrow or write one as needed.
def __init__(self, process_name):
self.process_name = process_name
self.pids = self._Pids()
if not self.pids:
raise Exception('No processes by that name, nothing to monitor')
logging.debug('Found pids for %s: %s' %
(self.process_name, ', '.join(self.pids)))
def _Pids(self):
# Look up (potentially multiple) pid(s). This helps when fuzzing
# e.g. udisks-daemon, which runs 2 processes, and we don't want to
# have to guess which one to care about. Would not work very well
# if we were fuzzing something like Apache where child processes
# come and go naturally.
try:
return set(subprocess.Popen(['pgrep', self.process_name],
stdout=subprocess.PIPE
).communicate()[0].splitlines())
except subprocess.CalledProcessError:
return set([])
def Fault(self):
"""Returns a boolean indicating if your monitored process
has died since you began monitoring. Returns True if so,
even if the process was restarted by e.g. init.
"""
return self.pids != self._Pids()
class BaseMutator(object):
"""Base class for Mutators. The @classmethods are exposed for
direct use. Other than that, it is intended as a base and not
meant to be instantiated directly.
"""
def __init__(self, orig_val, config={}):
self.orig_val = orig_val
self.config = config
def Mutations(self):
"""Stub. Implement datatype-specific mutations in your sub-classes.
Callers of Mutations depend on the original value being yielded
first.
"""
yield self.orig_val
@classmethod
def GetMutators(cls):
"""Returns a dictionary of all known Mutator classes, mapping the
class name (as a string) to a reference to the actual class.
"""
mutators = {}
for subclass in cls.__subclasses__():
mutators[subclass.__name__] = subclass
mutators.update(subclass.GetMutators())
return mutators
class UInt32Mutator(BaseMutator):
"""Mutator for the DBus 'UInt32' type."""
BITCT = 32
def Mutations(self):
yield self.orig_val
# This Mutator tends to produce redundant test cases, especially
# between the bit-flip round and the boundary-case round.
# Accumulate all of the values and enforce uniqueness before
# returning any of them.
mutations = set([])
mutations.add(0)
# Flip individual bits.
for i in range(self.BITCT):
mutations.add(self.orig_val ^ (1 << i))
# Look for boundary cases.
for i in range(self.BITCT):
val = 1 << i
mutations.add(val-1)
mutations.add(val)
mutations.add(val+1)
# Now yield all the mutations. Ensure we do not repeat the orig_val.
if self.orig_val in mutations:
mutations.remove(self.orig_val)
for m in sorted(mutations):
if m < (1 << (self.BITCT - 1)):
yield m
class BoolMutator(BaseMutator):
"""Included for the sake of completeness."""
def Mutations(self):
"""Returns a generator, yielding both boolean values. In order to
remain consistent with other Mutators, returns the original
value first, as a 'good' value.
"""
# With the current "change 1 variable at a time" mutation strategy,
# this is of questionable value, but later if we enable the fuzzing
# engine to do, say, every possible combination of (A x B) mutations,
# then this could help increase the coverage gained from those mutated
# values, by exposing them to some "True" and "False" codepaths.
yield self.orig_val
yield (not self.orig_val)
class StringMutator(BaseMutator):
"""Basic string mutator to generate fuzzy inputs."""
# TODO(jimhebert) continue to look for an existing string
# mutator from some other package that is loosely-coupled
# enough to be re-used here. Or give up and build this
# one out, since this is basically no more than a dumb
# placeholder right now.
LENGTH_FACTOR = 17
def Mutations(self):
"""Returns a generator, yielding a series of mutated test strings."""
yield self.orig_val
for i in range(self.LENGTH_FACTOR):
yield (self.orig_val + ('A' * (1 << i)))
class DbusFuzzer(object):
"""Main class encapsulating the DBus Fuzzer."""
# Some constants returned by FuzzerMain
PASS = 'PASS'
FAIL = 'FAIL'
DONE = 'DONE'
_mutators = BaseMutator.GetMutators()
def __init__(self, fuzzplan, start_at=0, stop_at=None, pretend=False):
self.LoadFuzzPlan(fuzzplan)
if self.fuzzplan['bus'] == "SystemBus":
rawbus = dbus.SystemBus()
elif self.fuzzplan['bus'] == "SessionBus":
rawbus = dbus.SessionBus()
else:
rawbus = dbus.bus.BusConnection(self.fuzzplan['bus'])
self.bus = LoggedDbus(bus=rawbus, pretend=pretend)
self.start_at = start_at
self.stop_at = stop_at
self.pretend = pretend
def MakeMutator(self, cfg):
"""Given a dictionary like:
{'mutator': 'StringMutator', 'value': 'anything',
'arbitrary': 'other', 'stuff': 'also'}
Constructs an instance of the specified Mutator class,
initialized with a starting good value as specified
in the 'value' key. Any additional key/value pairs are
preserved, permitting Mutator-specific configuration
options.
"""
mutclass = cfg['mutator']
val = cfg['value']
del cfg['mutator']
del cfg['value']
# Resolve the Mutator class name (as a string) to a reference
# to the actual class, without using 'eval'.
if not mutclass in self._mutators:
raise Exception('Unrecognized Mutator class', mutclass)
return self._mutators[mutclass](val, config=cfg).Mutations()
def ArglistPermutations(self, args):
"""Given a list of arguments, this function returns a generator
that you can iterate to get mutations of the argument list.
Items from the argument list which, themselves, are generators,
are themselves iterated, one at a time. Generator arguments should
yield some "good" value as their first yield, since it will be used
repeatedly. The "bad" values -- the rest of them -- will be used once.
The permutations are generated with a "change only 1 variable at a
time" strategy. E.g. in a 3-item arglist, you will get permutations
like:
[good1, good2, good3]
[mutation1, good2, good3]
[mutation2, good2, good3]
... until the first argument's generator is exhausted. Then:
[good1, mutation1, good3]
[good1, mutation2, good3]
... and so on.
Items which are not generators are taken to be "fixed," non-mutating
arguments.
"""
# Save off the first yields from each of the args, since we'll
# need them repeatedly and only have one chance to save them.
base_args = []
# Can't use "for arg in args" because we need to assign-overtop
# items which represent requested Mutators.
for i in range(len(args)):
arg = args[i]
# If arg is one of the {mutator: SomeMutator...} dicts from
# the fuzzplan, we need to swap in the appropriate object.
if (hasattr(arg, 'keys') and 'mutator' in arg.keys() and
'value' in arg.keys()):
base_args.append(arg['value'])
args[i] = self.MakeMutator(arg)
else: # for fixed, non-mutation-generator args.
base_args.append(arg)
# Emit this case, since this is a valid permutation, before moving
# on to further mutations.
yield base_args
# Now we need to target each slot in the arglist, one at a time,
# yielding a series of permutations as described in the docstring.
slotnum = 0
while slotnum < len(args):
mutated_args = base_args[:]
if hasattr(args[slotnum], 'next'):
for mutation in args[slotnum]:
mutated_args[slotnum] = mutation
yield mutated_args
# Else, the non-mutated form was already emitted by the first yield
# above, so nothing to do in that case.
slotnum += 1
def LoadFuzzPlan(self, planfile):
"""Given a path to a YAML-encoded fuzzplan file, deserializes
and sanity-checks that configuration.
"""
self.fuzzplan = yaml.load(file(planfile))
# Basic sanity check that what we loaded looks like a fuzzplan.
if self.fuzzplan['fuzzplan_version']:
return True
return False
def FuzzerMain(self):
"""Main entry point for the fuzzer.
Bails at the first fault detected.
Returns the highest test # executed, so you can restart at n+1
later if you wish.
"""
for target in self.fuzzplan['targets']:
dest = target['dbus_destination']
monitor = ProcessMonitor(target['daemon_process'])
for pathrec in target['dbus_paths']:
path = pathrec['path']
for ifacerec in pathrec['interfaces']:
interface = ifacerec['interface']
for methodcall in ifacerec['methodcalls']:
method = methodcall[0]
arglist = methodcall[1:]
meth = self.bus.GetMethod(dest, path, interface,
method)
# Iterate all the mutations.
for testargs in self.ArglistPermutations(arglist):
# Fast forward over this if start_at demands it.
if self.bus.runcount < self.start_at:
self.bus.runcount += 1
continue
# Work starts here.
try:
apply(meth, testargs)
except dbus.DBusException as e:
# Might be useful to log this?
pass
if monitor.Fault():
logging.error("%s fault after test #%d." %
(process, self.bus.runcount))
return (self.FAIL, self.bus.runcount)
if (self.stop_at != None and
self.bus.runcount >= self.stop_at):
return (self.PASS, self.bus.runcount)
self.bus.runcount += 1
return (self.DONE, self.bus.runcount - 1)
def main():
programhelp = '%prog - Fuzz dbus interfaces according to a specified plan.'
parser = optparse.OptionParser(usage='usage: %prog [options] fuzzplan',
description=programhelp)
parser.add_option('-s', '--start_at', default=0, type='int',
help='Testcase number to start on, e.g. 0')
parser.add_option('-e', '--stop_at', default=None, type='int',
help='Testcase number to stop on.')
parser.add_option('-p', '--pretend', default=False,
action='store_true',
help='Pretend mode. Do everything except the dbus calls.')
(options, inputs) = parser.parse_args()
if len(inputs) != 1:
parser.print_usage()
sys.exit(1)
# TODO(jimhebert) add a command line logging option that takes a filename.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
fuzzer = DbusFuzzer(inputs[0], start_at=options.start_at,
stop_at=options.stop_at, pretend=options.pretend)
print "%s:%s" % fuzzer.FuzzerMain()
if __name__ == '__main__':
main()