| # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import dbus |
| import json |
| import logging |
| import os.path |
| from xml.dom.minidom import parse, parseString |
| |
| from autotest_lib.client.bin import test, utils |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros import constants, login |
| |
| class security_DbusMap(test.test): |
| version = 2 |
| |
| def policy_sort_priority(self, policy): |
| """ |
| Given a DOMElement representing one <policy> block from a dbus |
| configuraiton file, return a number suitable for determining |
| the order in which this policy would be applied by dbus-daemon. |
| For example, by returning: |
| 0 for 'default' policies |
| 1 for 'group' policies |
| 2 for 'user' policies |
| ... these numbers can be used as a sort-key for sorting |
| an array of policies into the order they would be evaluated by |
| dbus-daemon. |
| """ |
| # As derived from dbus-daemon(1) manpage |
| if policy.getAttribute('context') == 'default': |
| return 0 |
| if policy.getAttribute('group') != '': |
| return 1 |
| if policy.getAttribute('user') != '': |
| return 2 |
| if policy.getAttribute('at_console') == 'true': |
| return 3 |
| if policy.getAttribute('at_console') == 'false': |
| return 4 |
| if policy.getAttribute('context') == 'mandatory': |
| return 5 |
| |
| |
| def sort_policies(self, policies): |
| """ |
| Given an array of DOMElements representing <policy> blocks, |
| return a sorted copy of the array. Sorting is determined by |
| the order in which dbus-daemon(1) would consider the rules. |
| This is a stable sort, so in cases where dbus would employ |
| "last rule wins," position in the input list will be honored. |
| """ |
| # Use decorate-sort-undecorate to minimize calls to |
| # policy_sort_priority(). See http://wiki.python.org/moin/HowTo/Sorting |
| decorated = [(self.policy_sort_priority(policy), i, policy) for |
| i, policy in enumerate(policies)] |
| decorated.sort() |
| return [policy for _,_,policy in decorated] |
| |
| |
| def check_policies(self, config_doms, dest, iface, member, |
| user='chronos', at_console=None): |
| """ |
| Given 1 or more xml.dom's representing dbus configuration |
| data, determine if the <destination, interface, member> |
| triplet specified in the arguments would be permitted for |
| the specified user. |
| |
| Returns True if permitted, False otherwise. |
| See also http://dbus.freedesktop.org/doc/busconfig.dtd |
| """ |
| # In the default case, if the caller doesn't specify |
| # "at_console," employ this cros-specific heuristic: |
| if user == 'chronos' and at_console == None: |
| at_console = True |
| |
| # Apply the policies iteratively, in the same order |
| # that dbus-daemon(1) would consider them. |
| allow = False |
| for dom in config_doms: |
| for buscfg in dom.getElementsByTagName('busconfig'): |
| policies = self.sort_policies( |
| buscfg.getElementsByTagName('policy')) |
| for policy in policies: |
| ruling = self.check_one_policy(policy, dest, iface, |
| member, user, at_console) |
| if ruling is not None: |
| allow = ruling |
| return allow |
| |
| |
| def check_one_policy(self, policy, dest, iface, member, |
| user='chronos', at_console=True): |
| """ |
| Given a DOMElement representing one <policy> block from a dbus |
| configuration file, determine if the <destination, interface, |
| member> triplet specified in the arguments would be permitted |
| for the specified user. |
| |
| Returns True if permitted, False if prohibited, or |
| None if the policy does not apply to the triplet. |
| """ |
| # While D-Bus overall is a default-deny, this individual |
| # rule may not match, and some previous rule may have already |
| # said "allow" for this interface/method. So, we work from |
| # here starting with "doesn't apply," not "deny" to avoid |
| # falsely masking any previous "allow" rules. |
| allow = None |
| |
| # TODO(jimhebert) group='...' is not currently used by any |
| # Chrome OS dbus policies but could be in the future so |
| # we should add a check for it in this if-block. |
| if ((policy.getAttribute('context') != 'default') and |
| (policy.getAttribute('user') != user) and |
| (policy.getAttribute('at_console') != 'true')): |
| # In this case, the entire <policy> block does not apply |
| return None |
| |
| # Alternatively, if this IS a at_console policy, but the |
| # situation being checked is not an "at_console" situation, |
| # then that's another way the policy would also not apply. |
| if (policy.getAttribute('at_console') == 'true' and not |
| at_console): |
| return None |
| |
| # If the <policy> applies, try to find <allow> or <deny> |
| # child nodes that apply: |
| for node in policy.childNodes: |
| if (node.nodeType == node.ELEMENT_NODE and |
| node.localName in ['allow','deny']): |
| ruling = self.check_one_node(node, dest, iface, member) |
| if ruling is not None: |
| allow = ruling |
| return allow |
| |
| |
| def check_one_node(self, node, dest, iface, member): |
| """ |
| Given a DOMElement representing one <allow> or <deny> tag from a |
| dbus configuration file, determine if the <destination, interface, |
| member> triplet specified in the arguments would be permitted. |
| |
| Returns True if permitted, False if prohibited, or |
| None if the policy does not apply to the triplet. |
| """ |
| # Require send_destination to match (if we accept missing |
| # send_destination we end up falsely processing tags like |
| # <allow own="...">). But, do not require send_interface |
| # or send_member to exist, because omitting them is used |
| # as a way of wildcarding in dbus configuration. |
| if ((node.getAttribute('send_destination') == dest) and |
| (not node.hasAttribute('send_interface') or |
| node.getAttribute('send_interface') == iface) and |
| (not node.hasAttribute('send_member') or |
| node.getAttribute('send_member') == member)): |
| # The rule applies! Return True if it's an allow rule, else false |
| logging.debug(('%s send_destination=%s send_interface=%s ' |
| 'send_member=%s applies to %s %s %s.') % |
| (node.localName, |
| node.getAttribute('send_destination'), |
| node.getAttribute('send_interface'), |
| node.getAttribute('send_member'), |
| dest, iface, member)) |
| return (node.localName == 'allow') |
| else: |
| return None |
| |
| |
| def load_dbus_config_doms(self, dbusdir='/etc/dbus-1/system.d'): |
| """ |
| Given a path to a directory containing valid dbus configuration |
| files (http://dbus.freedesktop.org/doc/busconfig.dtd), return |
| a series of parsed DOMs representing the configuration. |
| This function implements the same requirements as dbus-daemon |
| itself -- notably, that valid config files must be named |
| with a ".conf" extension. |
| Returns: a list of DOMs |
| """ |
| config_doms = [] |
| for dirent in os.listdir(dbusdir): |
| dirent = os.path.join(dbusdir, dirent) |
| if os.path.isfile(dirent) and dirent.endswith('.conf'): |
| config_doms.append(parse(dirent)) |
| return config_doms |
| |
| |
| def mutual_compare(self, dbus_list, baseline, context='all'): |
| """ |
| This is a front-end for compare_dbus_trees which handles |
| comparison in both directions, discovering not only what is |
| missing from the baseline, but what is missing from the system. |
| |
| The optional 'context' argument is (only) used to for |
| providing more detailed context in the debug-logging |
| that occurs. |
| |
| Returns: True if the two exactly match. False otherwise. |
| """ |
| self.sort_dbus_tree(dbus_list) |
| self.sort_dbus_tree(baseline) |
| |
| # Compare trees to find added API's. |
| newapis = self.compare_dbus_trees(dbus_list, baseline) |
| if (len(newapis) > 0): |
| logging.error("New (accessible to %s) API's to review:" % context) |
| logging.error(json.dumps(newapis, sort_keys=True, indent=2)) |
| |
| # Swap arguments to find missing API's. |
| missing_apis = self.compare_dbus_trees(baseline, dbus_list) |
| if (len(missing_apis) > 0): |
| logging.error("Missing API's (expected to be accessible to %s):" % |
| context) |
| logging.error(json.dumps(missing_apis, sort_keys=True, indent=2)) |
| |
| return (len(newapis) + len(missing_apis) == 0) |
| |
| |
| def add_member(self, dbus_list, dest, iface, member): |
| return self._add_surface(dbus_list, dest, iface, member, 'methods') |
| |
| |
| def add_signal(self, dbus_list, dest, iface, signal): |
| return self._add_surface(dbus_list, dest, iface, signal, 'signals') |
| |
| |
| def add_property(self, dbus_list, dest, iface, signal): |
| return self._add_surface(dbus_list, dest, iface, signal, 'properties') |
| |
| |
| def _add_surface(self, dbus_list, dest, iface, member, slot): |
| """ |
| This can add an entry for a member function to a given |
| dbus list. It behaves somewhat like "mkdir -p" in that |
| it creates any missing, necessary intermediate portions |
| of the data structure. For example, if this is the first |
| member being added for a given interface, the interface |
| will not already be mentioned in dbus_list, and this |
| function initializes the interface dictionary appropriately. |
| Returns: None |
| """ |
| # Ensure the Destination object exists in the data structure. |
| dest_idx = -1 |
| for (i, objdict) in enumerate(dbus_list): |
| if objdict['Object_name'] == dest: |
| dest_idx = i |
| if dest_idx == -1: |
| dbus_list.append({'Object_name': dest, 'interfaces': []}) |
| |
| # Ensure the Interface entry exists for that Destination object. |
| iface_idx = -1 |
| for (i, ifacedict) in enumerate(dbus_list[dest_idx]['interfaces']): |
| if ifacedict['interface'] == iface: |
| iface_idx = i |
| if iface_idx == -1: |
| dbus_list[dest_idx]['interfaces'].append({'interface': iface, |
| 'signals': [], |
| 'properties': [], |
| 'methods': []}) |
| |
| # Ensure the slot exists. |
| if not slot in dbus_list[dest_idx]['interfaces'][iface_idx]: |
| dbus_list[dest_idx]['interfaces'][iface_idx][slot] = [] |
| |
| # Add member so long as it's not a duplicate. |
| if not member in ( |
| dbus_list[dest_idx]['interfaces'][iface_idx][slot]): |
| dbus_list[dest_idx]['interfaces'][iface_idx][slot].append( |
| member) |
| |
| |
| def list_baselined_users(self): |
| """ |
| Return a list of usernames for which we keep user-specific |
| attack-surface baselines. |
| """ |
| bdir = os.path.dirname(os.path.abspath(__file__)) |
| users = [] |
| for item in os.listdir(bdir): |
| # Pick up baseline.username files but ignore emacs backups. |
| if item.startswith('baseline.') and not item.endswith('~'): |
| users.append(item.partition('.')[2]) |
| return users |
| |
| |
| def load_baseline(self, user=''): |
| """ |
| Return a list of interface names we expect to be owned |
| by chronos. |
| """ |
| # The overall baseline is 'baseline'. User-specific baselines are |
| # stored in files named 'baseline.<username>'. |
| baseline_name = 'baseline' |
| if user: |
| baseline_name = '%s.%s' % (baseline_name, user) |
| |
| # Figure out path to baseline file, by looking up our own path. |
| bpath = os.path.abspath(__file__) |
| bpath = os.path.join(os.path.dirname(bpath), baseline_name) |
| return self.load_dbus_data_from_disk(bpath) |
| |
| |
| def write_dbus_data_to_disk(self, dbus_list, file_path): |
| """Writes the given dbus data to a given path to a json file. |
| Args: |
| dbus_list: list of dbus dictionaries to write to disk. |
| file_path: the path to the file to write the data to. |
| """ |
| file_handle = open(file_path, 'w') |
| my_json = json.dumps(dbus_list, sort_keys=True, indent=2) |
| # The json dumper has a trailing whitespace problem, and lacks |
| # a final newline. Fix both here. |
| file_handle.write(my_json.replace(', \n',',\n') + '\n') |
| file_handle.close() |
| |
| |
| def load_dbus_data_from_disk(self, file_path): |
| """Loads dbus data from a given path to a json file. |
| Args: |
| file_path: path to the file as a string. |
| Returns: |
| A list of the dictionary representation of the dbus data loaded. |
| The dictionary format is the same as returned by walk_object(). |
| """ |
| file_handle = open(file_path, 'r') |
| dbus_data = json.loads(file_handle.read()) |
| file_handle.close() |
| return dbus_data |
| |
| |
| def sort_dbus_tree(self, tree): |
| """Sorts a an aray of dbus dictionaries in alphabetical order. |
| All levels of the tree are sorted. |
| Args: |
| tree: the array to sort. Modified in-place. |
| """ |
| tree.sort(key=lambda x: x['Object_name']) |
| for dbus_object in tree: |
| dbus_object['interfaces'].sort(key=lambda x: x['interface']) |
| for interface in dbus_object['interfaces']: |
| interface['methods'].sort() |
| interface['signals'].sort() |
| interface['properties'].sort() |
| |
| |
| def compare_dbus_trees(self, current, baseline): |
| """Compares two dbus dictionaries and return the delta. |
| The comparison only returns what is in the current (LHS) and not |
| in the baseline (RHS). If you want the reverse, call again |
| with the arguments reversed. |
| Args: |
| current: dbus tree you want to compare against the baseline. |
| baseline: dbus tree baseline. |
| Returns: |
| A list of dictionary representations of the additional dbus |
| objects, if there is a difference. Otherwise it returns an |
| empty list. The format of the dictionaries is the same as the |
| one returned in walk_object(). |
| """ |
| # Build the key map of what is in the baseline. |
| bl_object_names = [bl_object['Object_name'] for bl_object in baseline] |
| |
| new_items = [] |
| for dbus_object in current: |
| if dbus_object['Object_name'] in bl_object_names: |
| index = bl_object_names.index(dbus_object['Object_name']) |
| bl_object_interfaces = baseline[index]['interfaces'] |
| bl_interface_names = [name['interface'] for name in |
| bl_object_interfaces] |
| |
| # If we have a new interface/method we need to build the shell. |
| new_object = {'Object_name':dbus_object['Object_name'], |
| 'interfaces':[]} |
| |
| for interface in dbus_object['interfaces']: |
| if interface['interface'] in bl_interface_names: |
| # The interface was baselined, check everything. |
| diffslots = {} |
| for slot in ['methods', 'signals', 'properties']: |
| index = bl_interface_names.index( |
| interface['interface']) |
| bl_methods = set(bl_object_interfaces[index][slot]) |
| methods = set(interface[slot]) |
| difference = methods.difference(bl_methods) |
| diffslots[slot] = list(difference) |
| if (diffslots['methods'] or diffslots['signals'] or |
| diffslots['properties']): |
| # This is a new thing we need to track. |
| new_methods = {'interface':interface['interface'], |
| 'methods': diffslots['methods'], |
| 'signals': diffslots['signals'], |
| 'properties': diffslots['properties'] |
| } |
| new_object['interfaces'].append(new_methods) |
| new_items.append(new_object) |
| else: |
| # This is a new interface we need to track. |
| new_object['interfaces'].append(interface) |
| new_items.append(new_object) |
| else: |
| # This is a new object we need to track. |
| new_items.append(dbus_object) |
| return new_items |
| |
| |
| def walk_object(self, bus, object_name, start_path, dbus_objects): |
| """Walks the given bus and object returns a dictionary representation. |
| The formate of the dictionary is as follows: |
| { |
| Object_name: "string" |
| interfaces: |
| [ |
| interface: "string" |
| methods: |
| [ |
| "string1", |
| "string2" |
| ] |
| ] |
| } |
| Note that the decision to capitalize Object_name is just |
| a way to force it to appear above the interface-list it |
| corresponds to, when pretty-printed by the json dumper. |
| This makes it more logical for humans to read/edit. |
| Args: |
| bus: the bus to query, usually system. |
| object_name: the name of the dbus object to walk. |
| start_path: the path inside of the object in which to start walking |
| dbus_objects: current list of dbus objects in the given object |
| Returns: |
| A dictionary representation of a dbus object |
| """ |
| remote_object = bus.get_object(object_name,start_path) |
| unknown_iface = dbus.Interface(remote_object, |
| 'org.freedesktop.DBus.Introspectable') |
| # Convert the string to an xml DOM object we can walk. |
| xml = parseString(unknown_iface.Introspect()) |
| for child in xml.childNodes: |
| if ((child.nodeType == 1) and (child.localName == u'node')): |
| interfaces = child.getElementsByTagName('interface') |
| for interface in interfaces: |
| interface_name = interface.getAttribute('name') |
| # First get the methods. |
| methods = interface.getElementsByTagName('method') |
| method_list = [] |
| for method in methods: |
| method_list.append(method.getAttribute('name')) |
| # Repeat the process for signals. |
| signals = interface.getElementsByTagName('signal') |
| signal_list = [] |
| for signal in signals: |
| signal_list.append(signal.getAttribute('name')) |
| # Properties have to be discovered via API call. |
| prop_list = [] |
| try: |
| prop_iface = dbus.Interface(remote_object, |
| 'org.freedesktop.DBus.Properties') |
| prop_list = prop_iface.GetAll(interface_name).keys() |
| except dbus.exceptions.DBusException: |
| # Many daemons do not support this interface, |
| # which means they have no properties. |
| pass |
| # Create the dictionary with all the above. |
| dictionary = {'interface':interface_name, |
| 'methods':method_list, 'signals':signal_list, |
| 'properties':prop_list} |
| if dictionary not in dbus_objects: |
| dbus_objects.append(dictionary) |
| nodes = child.getElementsByTagName('node') |
| for node in nodes: |
| name = node.getAttribute('name') |
| if start_path[-1] != '/': |
| start_path = start_path + '/' |
| new_name = start_path + name |
| self.walk_object(bus, object_name, new_name, dbus_objects) |
| return {'Object_name':('%s' % object_name), 'interfaces':dbus_objects} |
| |
| |
| def mapper_main(self): |
| # Currently we only dump the SystemBus. Accessing the SessionBus says: |
| # "ExecFailed: /usr/bin/dbus-launch terminated abnormally with the |
| # following error: Autolaunch requested, but X11 support not compiled |
| # in." |
| # If this changes at a later date, add dbus.SessionBus() to the dict. |
| # We've left the code structured to support walking more than one bus |
| # for such an eventuality. |
| |
| buses = {'System Bus': dbus.SystemBus()} |
| |
| for busname in buses.keys(): |
| bus = buses[busname] |
| remote_dbus_object = bus.get_object('org.freedesktop.DBus', |
| '/org/freedesktop/DBus') |
| iface = dbus.Interface(remote_dbus_object, 'org.freedesktop.DBus') |
| dbus_list = [] |
| for i in iface.ListNames(): |
| # There are some strange listings like ":1" which appear after |
| # certain names. Ignore these since we just need the names. |
| if i.startswith(':'): |
| continue |
| dbus_list.append(self.walk_object(bus, i, '/', [])) |
| |
| # Dump the complete observed dataset to disk. In the somewhat |
| # typical case, that we will want to rev the baseline to |
| # match current reality, these files are easily copied and |
| # checked in as a new baseline. |
| self.sort_dbus_tree(dbus_list) |
| observed_data_path = os.path.join(self.outputdir, 'observed') |
| self.write_dbus_data_to_disk(dbus_list, observed_data_path) |
| |
| baseline = self.load_baseline() |
| test_pass = self.mutual_compare(dbus_list, baseline) |
| |
| # Figure out which of the observed API's are callable by specific users |
| # whose attack surface we are particularly sensitive to: |
| dbus_cfg = self.load_dbus_config_doms() |
| for user in self.list_baselined_users(): |
| user_baseline = self.load_baseline(user) |
| user_observed = [] |
| # user_observed will be a subset of dbus_list. Iterate and check |
| # against the configured dbus policies as we go: |
| for objdict in dbus_list: |
| for ifacedict in objdict['interfaces']: |
| for meth in ifacedict['methods']: |
| if (self.check_policies(dbus_cfg, |
| objdict['Object_name'], |
| ifacedict['interface'], meth, |
| user=user)): |
| self.add_member(user_observed, |
| objdict['Object_name'], |
| ifacedict['interface'], meth) |
| # We don't do permission-checking on signals because |
| # signals are allow-all by default. Just copy them over. |
| for sig in ifacedict['signals']: |
| self.add_signal(user_observed, |
| objdict['Object_name'], |
| ifacedict['interface'], sig) |
| # A property might be readable, or even writable, to |
| # a given user if they can reach the Get/Set interface |
| access = [] |
| if (self.check_policies(dbus_cfg, objdict['Object_name'], |
| 'org.freedesktop.DBus.Properties', |
| 'Set', user=user)): |
| access.append('Set') |
| if (self.check_policies(dbus_cfg, objdict['Object_name'], |
| 'org.freedesktop.DBus.Properties', |
| 'Get', user=user) or |
| self.check_policies(dbus_cfg, objdict['Object_name'], |
| 'org.freedesktop.DBus.Properties', |
| 'GetAll', user=user)): |
| access.append('Get') |
| if access: |
| access = ','.join(access) |
| for prop in ifacedict['properties']: |
| self.add_property(user_observed, |
| objdict['Object_name'], |
| ifacedict['interface'], |
| '%s (%s)' % (prop, access)) |
| |
| self.write_dbus_data_to_disk(user_observed, |
| '%s.%s' % (observed_data_path, user)) |
| test_pass = test_pass and self.mutual_compare(user_observed, |
| user_baseline, user) |
| if not test_pass: |
| raise error.TestFail('Baseline mismatch(es)') |
| |
| |
| def run_once(self): |
| """ |
| Enumerates all discoverable interfaces, methods, and signals |
| in dbus-land. Verifies that it matches an expected set. |
| """ |
| login.wait_for_browser() |
| self.mapper_main() |