| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import dbus |
| import logging |
| import os |
| import time |
| |
| from autotest_lib.client.bin import test, utils |
| from autotest_lib.client.common_lib import error |
| |
| |
| class platform_Firewall(test.test): |
| """Ensure the firewall service is working correctly.""" |
| |
| version = 1 |
| |
| _PORT = 1234 |
| _IFACE = "eth0" |
| |
| _TCP_RULE = "-A INPUT -p tcp -m tcp --dport %d -j ACCEPT" % _PORT |
| _UDP_RULE = "-A INPUT -p udp -m udp --dport %d -j ACCEPT" % _PORT |
| _IFACE_RULE = "-A INPUT -i %s -p tcp -m tcp --dport %d -j ACCEPT" % (_IFACE, |
| _PORT) |
| |
| _POLL_INTERVAL = 5 |
| |
| _IPTABLES_DEL_CMD = "%s -D INPUT -p %s -m %s --dport %d -j ACCEPT" |
| |
| @staticmethod |
| def _iptables_rules(executable): |
| rule_output = utils.system_output("%s -S" % executable) |
| logging.debug(rule_output) |
| return [line.strip() for line in rule_output.splitlines()] |
| |
| |
| @staticmethod |
| def _check(expected_rule, actual_rules, error_msg, executable, check): |
| # If check() returns false, fail the test. |
| if not check(expected_rule, actual_rules): |
| raise error.TestFail(error_msg % executable) |
| |
| |
| @staticmethod |
| def _check_included(expected_rule, actual_rules, error_msg, executable): |
| # Test whether the rule is included, fail if it's not. |
| platform_Firewall._check( |
| expected_rule, actual_rules, error_msg, executable, |
| lambda e, a: e in a) |
| |
| |
| @staticmethod |
| def _check_not_included(expected_rule, actual_rules, error_msg, executable): |
| # Test whether the rule is not included, fail if it is. |
| platform_Firewall._check( |
| expected_rule, actual_rules, error_msg, executable, |
| lambda e, a: e not in a) |
| |
| |
| def run_once(self): |
| # Create lifeline file descriptors. |
| self.tcp_r, self.tcp_w = os.pipe() |
| self.udp_r, self.udp_w = os.pipe() |
| self.iface_r, self.iface_w = os.pipe() |
| |
| try: |
| bus = dbus.SystemBus() |
| pb_proxy = bus.get_object('org.chromium.PermissionBroker', |
| '/org/chromium/PermissionBroker') |
| pb = dbus.Interface(pb_proxy, 'org.chromium.PermissionBroker') |
| |
| tcp_lifeline = dbus.types.UnixFd(self.tcp_r) |
| ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT), "", |
| tcp_lifeline) |
| # |ret| is a dbus.Boolean, but compares as int. |
| if ret == 0: |
| raise error.TestFail("RequestTcpPortAccess returned false.") |
| |
| udp_lifeline = dbus.types.UnixFd(self.udp_r) |
| ret = pb.RequestUdpPortAccess(dbus.UInt16(self._PORT), "", |
| udp_lifeline) |
| # |ret| is a dbus.Boolean, but compares as int. |
| if ret == 0: |
| raise error.TestFail("RequestUdpPortAccess returned false.") |
| |
| iface_lifeline = dbus.types.UnixFd(self.iface_r) |
| ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT), |
| dbus.String(self._IFACE), |
| iface_lifeline) |
| # |ret| is a dbus.Boolean, but compares as int. |
| if ret == 0: |
| raise error.TestFail( |
| "RequestTcpPortAccess(port, interface) returned false.") |
| |
| # Test IPv4 and IPv6. |
| for executable in ["iptables", "ip6tables"]: |
| actual_rules = self._iptables_rules(executable) |
| self._check_included( |
| self._TCP_RULE, actual_rules, |
| "RequestTcpPortAccess did not add %s rule.", |
| executable) |
| self._check_included( |
| self._UDP_RULE, actual_rules, |
| "RequestUdpPortAccess did not add %s rule.", |
| executable) |
| self._check_included( |
| self._IFACE_RULE, actual_rules, |
| "RequestTcpPortAccess(port, interface)" |
| " did not add %s rule.", |
| executable) |
| |
| ret = pb.ReleaseTcpPort(dbus.UInt16(self._PORT), "") |
| # |ret| is a dbus.Boolean, but compares as int. |
| if ret == 0: |
| raise error.TestFail("ReleaseTcpPort returned false.") |
| |
| ret = pb.ReleaseUdpPort(dbus.UInt16(self._PORT), "") |
| # |ret| is a dbus.Boolean, but compares as int. |
| if ret == 0: |
| raise error.TestFail("ReleaseUdpPort returned false.") |
| |
| # Test IPv4 and IPv6. |
| for executable in ["iptables", "ip6tables"]: |
| rules = self._iptables_rules(executable) |
| self._check_not_included( |
| self._TCP_RULE, rules, |
| "ReleaseTcpPortAccess did not remove %s rule.", |
| executable) |
| self._check_not_included( |
| self._UDP_RULE, rules, |
| "ReleaseUdpPortAccess did not remove %s rule.", |
| executable) |
| |
| # permission_broker should plug the firewall hole |
| # when the requesting process exits. |
| # Simulate the process exiting by closing |iface_w|. |
| os.close(self.iface_w) |
| |
| # permission_broker checks every |_POLL_INTERVAL| seconds |
| # for processes that have exited. |
| # This is ugly, but it's either this or polling /var/log/messages. |
| time.sleep(self._POLL_INTERVAL + 1) |
| # Test IPv4 and IPv6. |
| for executable in ["iptables", "ip6tables"]: |
| rules = self._iptables_rules(executable) |
| self._check_not_included( |
| self._IFACE_RULE, rules, |
| "permission_broker did not remove %s rule.", |
| executable) |
| |
| except dbus.DBusException as e: |
| raise error.TestFail("D-Bus error: " + e.get_dbus_message()) |
| |
| |
| def cleanup(self): |
| # File descriptors could already be closed. |
| try: |
| os.close(self.tcp_w) |
| os.close(self.udp_w) |
| os.close(self.iface_w) |
| except OSError: |
| pass |
| |
| # We don't want the cleanup() method to fail, so we ignore exit codes. |
| # This also allows us to clean up iptables rules unconditionally. |
| # The command will fail if the rule has already been deleted, |
| # but it won't fail the test. |
| for executable in ["iptables", "ip6tables"]: |
| cmd = self._IPTABLES_DEL_CMD % (executable, "tcp", "tcp", |
| self._PORT) |
| utils.system(cmd, ignore_status=True) |
| cmd = self._IPTABLES_DEL_CMD % (executable, "udp", "udp", |
| self._PORT) |
| utils.system(cmd, ignore_status=True) |
| cmd = self._IPTABLES_DEL_CMD % (executable, "tcp", "tcp", |
| self._PORT) |
| cmd += " -i %s" % self._IFACE |
| utils.system(cmd, ignore_status=True) |