blob: c58dd24e2534e372477a0a66b2e597aac6fefe2d [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dbus
import logging
import os
import time
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
class platform_Firewall(test.test):
"""Ensure the firewall service is working correctly."""
version = 1
_PORT = 1234
_IFACE = "eth0"
_TCP_RULE = "-A INPUT -p tcp -m tcp --dport %d -j ACCEPT" % _PORT
_UDP_RULE = "-A INPUT -p udp -m udp --dport %d -j ACCEPT" % _PORT
_IFACE_RULE = "-A INPUT -i %s -p tcp -m tcp --dport %d -j ACCEPT" % (_IFACE,
_PORT)
_POLL_INTERVAL = 5
_IPTABLES_DEL_CMD = "%s -D INPUT -p %s -m %s --dport %d -j ACCEPT"
@staticmethod
def _iptables_rules(executable):
rule_output = utils.system_output("%s -S" % executable)
logging.debug(rule_output)
return [line.strip() for line in rule_output.splitlines()]
@staticmethod
def _check_rule(expected_rule, actual_rules, error_msg, executable):
if expected_rule not in actual_rules:
raise error.TestFail(error_msg % executable)
def run_once(self):
bus = dbus.SystemBus()
pb_proxy = bus.get_object('org.chromium.PermissionBroker',
'/org/chromium/PermissionBroker')
pb = dbus.Interface(pb_proxy, 'org.chromium.PermissionBroker')
self.tcp_r, self.tcp_w = os.pipe()
self.udp_r, self.udp_w = os.pipe()
self.iface_r, self.iface_w = os.pipe()
try:
tcp_lifeline = dbus.types.UnixFd(self.tcp_r)
ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT), "",
tcp_lifeline)
# |ret| is a dbus.Boolean, but compares as int.
if ret == 0:
raise error.TestFail("RequestTcpPortAccess returned false.")
udp_lifeline = dbus.types.UnixFd(self.udp_r)
ret = pb.RequestUdpPortAccess(dbus.UInt16(self._PORT), "",
udp_lifeline)
# |ret| is a dbus.Boolean, but compares as int.
if ret == 0:
raise error.TestFail("RequestUdpPortAccess returned false.")
iface_lifeline = dbus.types.UnixFd(self.iface_r)
ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT),
dbus.String(self._IFACE),
iface_lifeline)
# |ret| is a dbus.Boolean, but compares as int.
if ret == 0:
raise error.TestFail(
"RequestTcpPortAccess(port, interface) returned false.")
# Test IPv4 and IPv6.
for executable in ["iptables", "ip6tables"]:
actual_rules = self._iptables_rules(executable)
self._check_rule(self._TCP_RULE, actual_rules,
"RequestTcpPortAccess did not add %s rule.",
executable)
self._check_rule(self._UDP_RULE, actual_rules,
"RequestUdpPortAccess did not add %s rule.",
executable)
self._check_rule(self._IFACE_RULE, actual_rules,
"RequestTcpPortAccess(port, interface)"
" did not add %s rule.",
executable)
# permission_broker should plug the firewall hole
# when the requesting process exits.
# Simulate the process exiting by closing both write ends.
os.close(self.tcp_w)
os.close(self.udp_w)
os.close(self.iface_w)
# permission_broker checks every |_POLL_INTERVAL| seconds
# for processes that have exited.
# This is ugly, but it's either this or polling /var/log/messages.
time.sleep(self._POLL_INTERVAL + 1)
# Test IPv4 and IPv6.
for executable in ["iptables", "ip6tables"]:
rules = self._iptables_rules(executable)
if (self._TCP_RULE in rules or self._UDP_RULE in rules
or self._IFACE_RULE in rules):
raise error.TestFail(
"permission_broker did not remove %s rule.", executable)
except dbus.DBusException as e:
raise error.TestFail("D-Bus error: " + e.get_dbus_message())
def cleanup(self):
# File descriptors could already be closed.
try:
os.close(self.tcp_w)
os.close(self.udp_w)
os.close(self.iface_w)
except OSError:
pass
# We don't want the cleanup() method to fail, so we ignore exit codes.
# This also allows us to clean up iptables rules unconditionally.
# The command will fail if the rule has already been deleted,
# but it won't fail the test.
for executable in ["iptables", "ip6tables"]:
cmd = self._IPTABLES_DEL_CMD % (executable, "tcp", "tcp",
self._PORT)
utils.system(cmd, ignore_status=True)
cmd = self._IPTABLES_DEL_CMD % (executable, "udp", "udp",
self._PORT)
utils.system(cmd, ignore_status=True)
cmd = self._IPTABLES_DEL_CMD % (executable, "tcp", "tcp",
self._PORT)
cmd += " -i %s" % self._IFACE
utils.system(cmd, ignore_status=True)