blob: 62a06f8eac43e6db1bc1d86b944498301d1f59d7 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Base class for DHCP tests. This class just sets up a little bit of plumbing,
like a virtual ethernet device with one end that looks like a real ethernet
device to shill and a DHCP test server on the end that doesn't look like a real
ethernet interface to shill. Child classes should override test_body() with the
logic of their test. The plumbing of DhcpTestBase is accessible via properties.
"""
import dbus
import logging
import socket
import struct
import time
import traceback
from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import dhcp_handling_rule
from autotest_lib.client.cros import dhcp_packet
from autotest_lib.client.cros import dhcp_test_server
from autotest_lib.client.cros import flimflam_test_path
from autotest_lib.client.cros import virtual_ethernet_pair
import flimflam
# These are keys that may be used with the DBus dictionary returned from
# DhcpTestBase.get_interface_ipconfig().
DHCPCD_KEY_NAMESERVERS = "NameServers"
DHCPCD_KEY_GATEWAY = "Gateway"
DHCPCD_KEY_BROADCAST_ADDR = "Broadcast"
DHCPCD_KEY_ADDRESS = "Address"
DHCPCD_KEY_PREFIX_LENGTH = "Prefixlen"
DHCPCD_KEY_DOMAIN_NAME = "DomainName"
DHCPCD_KEY_SEARCH_DOMAIN_LIST = "SearchDomains"
# We should be able to complete a DHCP negotiation in this amount of time.
DHCP_NEGOTIATION_TIMEOUT_SECONDS = 10
class DhcpTestBase(test.test):
version = 1
@staticmethod
def _cleanup_dbus_types(value):
"""
Removes meta information from dbus data types and returns their raw
Python equivalents.
"""
if isinstance(value, int):
return int(value)
elif isinstance(value, dbus.Double):
return float(value)
elif isinstance(value, str):
return str(value)
elif isinstance(value, unicode):
return unicode(value)
elif isinstance(value, dbus.Array):
return [DhcpTestBase._cleanup_dbus_types(v) for v in value]
elif isinstance(value, dbus.Struct):
return tuple([DhcpTestBase._cleanup_dbus_types(v) for v in value])
elif isinstance(value, dbus.Dictionary):
ret = dict()
for k,v in value.items():
converted_key = DhcpTestBase._cleanup_dbus_types(k)
converted_value = DhcpTestBase._cleanup_dbus_types(v)
ret[converted_key] = converted_value
return ret
else:
raise error.TestFail("Unhandled dbus data type found in "
"conversion: %s." % value)
@staticmethod
def rewrite_ip_suffix(subnet_mask, ip_in_subnet, ip_suffix):
"""
Create a new IPv4 address in a subnet by bitwise and'ing an existing
address |ip_in_subnet| with |subnet_mask| and bitwise or'ing in
|ip_suffix|. For safety, bitwise or the suffix with the complement of
the subnet mask.
Usage: rewrite_ip_suffix("255.255.255.0", "192.168.1.1", "0.0.0.105")
The example usage will return "192.168.1.105".
"""
mask = struct.unpack("!I", socket.inet_aton(subnet_mask))[0]
subnet = mask & struct.unpack("!I", socket.inet_aton(ip_in_subnet))[0]
suffix = ~mask & struct.unpack("!I", socket.inet_aton(ip_suffix))[0]
return socket.inet_ntoa(struct.pack("!I", (subnet | suffix)))
@staticmethod
def get_interface_ipconfig(interface_name):
"""
Returns a dbus dictionary containing settings for an |interface_name|
set via DHCP. Returns None if no such interface or setting bundle on
that interface can be found in shill.
"""
flim = flimflam.FlimFlam(dbus.SystemBus())
device = flim.FindElementByNameSubstring("Device", interface_name)
if device is None:
return None
device_properties = device.GetProperties(utf8_strings=True)
dhcp_properties = None
for property_path in device_properties["IPConfigs"]:
ipconfig = flim.GetObjectInterface("IPConfig", property_path)
ipconfig_properties = ipconfig.GetProperties(utf8_strings=True)
if "Method" not in ipconfig_properties:
logging.info("Found ipconfig object with no method field")
continue
if ipconfig_properties["Method"] != "dhcp":
logging.info("Found ipconfig object with method != dhcp")
continue
if dhcp_properties != None:
raise error.TestFail("Found multiple ipconfig objects "
"with method == dhcp")
dhcp_properties = ipconfig_properties
if dhcp_properties is None:
logging.info("Did not find IPConfig object with method == dhcp")
return None
logging.info("Got raw dhcp config dbus object: %s." % dhcp_properties)
return DhcpTestBase._cleanup_dbus_types(dhcp_properties)
def run_once(self):
self._server = None
self._server_ip = None
self._ethernet_pair = None
self._server = None
try:
self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
peer_interface_name="pseudoethernet0",
peer_interface_ip=None)
self._ethernet_pair.setup()
if not self._ethernet_pair.is_healthy:
raise error.TestFail("Could not create virtual ethernet pair.")
self._server_ip = self._ethernet_pair.interface_ip
self._server = dhcp_test_server.DhcpTestServer(
self._ethernet_pair.interface_name)
self._server.start()
if not self._server.is_healthy:
raise error.TestFail("Could not start DHCP test server.")
self._subnet_mask = self._ethernet_pair.interface_subnet_mask
self.test_body()
except Exception, e:
logging.error("Caught exception: %s.", str(e))
logging.error("Trace: %s", traceback.format_exc())
raise error.TestFail("Caught exception: %s." % str(e))
finally:
if self._server is not None:
self._server.stop()
if self._ethernet_pair is not None:
self._ethernet_pair.teardown()
def test_body(self):
"""
Override this method with the body of your test. You may safely assume
that the the properties exposed by DhcpTestBase correctly return
references to the test apparatus.
"""
raise error.TestFail("No test body implemented")
@property
def server_ip(self):
"""
Return the IP address of the side of the interface that the DHCP test
server is bound to. The server itself is bound the the broadcast
address on the interface.
"""
return self._server_ip
@property
def server(self):
"""
Returns a reference to the DHCP test server. Use this to add handlers
and run tests.
"""
return self._server
@property
def ethernet_pair(self):
"""
Returns a reference to the virtual ethernet pair created to run DHCP
tests on.
"""
return self._ethernet_pair
def negotiate_and_check_lease(self,
dhcp_options,
custom_fields={},
disable_check=False):
if dhcp_packet.OPTION_REQUESTED_IP not in dhcp_options:
raise error.TestFail("You must specify OPTION_REQUESTED_IP to "
"negotiate a DHCP lease")
intended_ip = dhcp_options[dhcp_packet.OPTION_REQUESTED_IP]
# Build up the handling rules for the server and start the test.
rules = []
rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
intended_ip,
self.server_ip,
dhcp_options,
custom_fields))
rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
intended_ip,
self.server_ip,
dhcp_options,
custom_fields))
rules[-1].is_final_handler = True
self.server.start_test(rules, DHCP_NEGOTIATION_TIMEOUT_SECONDS)
self.server.wait_for_test_to_finish()
logging.info("Server is negotiating new lease with options: %s" %
dhcp_options)
if not self.server.last_test_passed:
raise error.TestFail("Test server didn't get all the messages it "
"was told to expect during negotiation.")
if disable_check:
logging.info("Skipping check of negotiated DHCP lease parameters.")
else:
# Wait for configuration to propagate over dbus to shill.
# TODO(wiley) Make this event based. This is pretty sloppy.
time.sleep(0.1)
self.check_dhcp_config(dhcp_options)
def check_dhcp_config(self, dhcp_options):
# The config is what the interface was actually configured with, as
# opposed to dhcp_options, which is what the server expected it be
# configured with.
dhcp_config = DhcpTestBase.get_interface_ipconfig(
self.ethernet_pair.peer_interface_name)
if dhcp_config is None:
raise error.TestFail("Failed to retrieve DHCP ipconfig object "
"from shill.")
logging.debug("Got DHCP config: %s", str(dhcp_config))
expected_address = dhcp_options.get(dhcp_packet.OPTION_REQUESTED_IP)
configured_address = dhcp_config.get(DHCPCD_KEY_ADDRESS)
if expected_address != configured_address:
raise error.TestFail("Interface configured with IP address not "
"granted by the DHCP server after DHCP "
"negotiation. Expected %s but got %s." %
(expected_address, configured_address))
# While DNS related settings only propagate to the system when the
# service is marked as the default service, we can still check the
# IP address on the interface, since that is set immediately.
interface_address = self.ethernet_pair.peer_interface_ip
if expected_address != interface_address:
raise error.TestFail("shill somehow knew about the proper DHCP "
"assigned address: %s, but configured the "
"interface with something completely "
"different: %s." %
(expected_address, interface_address))
expected_dns_servers = dhcp_options.get(dhcp_packet.OPTION_DNS_SERVERS)
configured_dns_servers = dhcp_config.get(DHCPCD_KEY_NAMESERVERS)
if expected_dns_servers != configured_dns_servers:
raise error.TestFail("Expected to be configured with DNS server "
"list %s, but was configured with %s "
"instead." % (expected_dns_servers,
configured_dns_servers))
expected_domain_name = dhcp_options.get(dhcp_packet.OPTION_DOMAIN_NAME)
configured_domain_name = dhcp_config.get(DHCPCD_KEY_DOMAIN_NAME)
if expected_domain_name != configured_domain_name:
raise error.TestFail("Expected to be configured with domain "
"name %s, but got %s instead." %
(expected_domain_name, configured_domain_name))
expected_search_list = dhcp_options.get(
dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST)
configured_search_list = dhcp_config.get(DHCPCD_KEY_SEARCH_DOMAIN_LIST)
if expected_search_list != configured_search_list:
raise error.TestFail("Expected to be configured with domain "
"search list %s, but got %s instead." %
(expected_search_list, configured_search_list))
expected_routers = dhcp_options.get(dhcp_packet.OPTION_ROUTERS)
if (not expected_routers and
dhcp_options.get(dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES)):
classless_static_routes = dhcp_options[
dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES]
for prefix, destination, gateway in classless_static_routes:
if not prefix:
logging.info("Using %s as the default gateway" % gateway)
expected_routers = [ gateway ]
break
configured_router = dhcp_config.get(DHCPCD_KEY_GATEWAY)
if expected_routers and expected_routers[0] != configured_router:
raise error.TestFail("Expected to be configured with gateway %s, "
"but got %s instead." %
(expected_routers[0], configured_router))
self.server.wait_for_test_to_finish()
if not self.server.last_test_passed:
raise error.TestFail("Test server didn't get all the messages it "
"was told to expect for renewal.")