| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ |
| Base class for DHCP tests. This class just sets up a little bit of plumbing, |
| like a virtual ethernet device with one end that looks like a real ethernet |
| device to shill and a DHCP test server on the end that doesn't look like a real |
| ethernet interface to shill. Child classes should override test_body() with the |
| logic of their test. The plumbing of DhcpTestBase is accessible via properties. |
| """ |
| |
| import dbus |
| import logging |
| import socket |
| import struct |
| import time |
| import traceback |
| |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros import dhcp_handling_rule |
| from autotest_lib.client.cros import dhcp_packet |
| from autotest_lib.client.cros import dhcp_test_server |
| from autotest_lib.client.cros import flimflam_test_path |
| from autotest_lib.client.cros import virtual_ethernet_pair |
| |
| import flimflam |
| |
| # Keys tests may look up in the dictionary returned by |
| # DhcpTestBase.get_interface_ipconfig() (the cleaned-up IPConfig properties). |
| DHCPCD_KEY_NAMESERVERS = "NameServers" |
| DHCPCD_KEY_GATEWAY = "Gateway" |
| DHCPCD_KEY_BROADCAST_ADDR = "Broadcast" |
| DHCPCD_KEY_ADDRESS = "Address" |
| DHCPCD_KEY_PREFIX_LENGTH = "Prefixlen" |
| DHCPCD_KEY_DOMAIN_NAME = "DomainName" |
| DHCPCD_KEY_SEARCH_DOMAIN_LIST = "SearchDomains" |
| |
| # Timeout in seconds given to the test server for one DHCP lease negotiation; a full negotiation should fit within it. |
| DHCP_NEGOTIATION_TIMEOUT_SECONDS = 10 |
| |
class DhcpTestBase(test.test):
    """
    Base class for DHCP tests.

    run_once() creates a virtual ethernet pair, starts a DHCP test server on
    one end of it, calls test_body() and finally tears everything down again.
    Child classes override test_body() and reach the test apparatus through
    the server, server_ip and ethernet_pair properties.
    """
    version = 1

    @staticmethod
    def _cleanup_dbus_types(value):
        """
        Removes meta information from dbus data types and returns their raw
        Python equivalents.

        Containers (dbus.Array, dbus.Struct, dbus.Dictionary) are converted
        recursively into list, tuple and dict respectively.

        @param value: the dbus value to convert.
        @return the plain Python equivalent of |value|.
        @raises error.TestFail if |value| is of a type not handled here.
        """
        # NOTE(review): this presumably relies on the dbus integer/boolean
        # wrappers being int subclasses and the dbus string wrappers being
        # str/unicode subclasses -- confirm against dbus-python.
        if isinstance(value, int):
            return int(value)
        elif isinstance(value, dbus.Double):
            return float(value)
        elif isinstance(value, str):
            return str(value)
        elif isinstance(value, unicode):
            return unicode(value)
        elif isinstance(value, dbus.Array):
            return [DhcpTestBase._cleanup_dbus_types(v) for v in value]
        elif isinstance(value, dbus.Struct):
            return tuple(DhcpTestBase._cleanup_dbus_types(v) for v in value)
        elif isinstance(value, dbus.Dictionary):
            return dict((DhcpTestBase._cleanup_dbus_types(k),
                         DhcpTestBase._cleanup_dbus_types(v))
                        for k, v in value.items())
        else:
            # Wrap |value| in a 1-tuple so that a tuple-typed value cannot
            # be mistaken for the argument list of the % operator.
            raise error.TestFail("Unhandled dbus data type found in "
                                 "conversion: %s." % (value,))

    @staticmethod
    def rewrite_ip_suffix(subnet_mask, ip_in_subnet, ip_suffix):
        """
        Create a new IPv4 address in a subnet by bitwise and'ing an existing
        address |ip_in_subnet| with |subnet_mask| and bitwise or'ing in
        |ip_suffix|.  For safety, bitwise and the suffix with the complement
        of the subnet mask first, so that it cannot alter the subnet bits.

        Usage: rewrite_ip_suffix("255.255.255.0", "192.168.1.1", "0.0.0.105")

        The example usage will return "192.168.1.105".

        @param subnet_mask: dotted-quad string, the mask of the subnet.
        @param ip_in_subnet: dotted-quad string, any address in the subnet.
        @param ip_suffix: dotted-quad string holding the host bits to use.
        @return dotted-quad string of the combined address.
        """
        mask = struct.unpack("!I", socket.inet_aton(subnet_mask))[0]
        subnet = mask & struct.unpack("!I", socket.inet_aton(ip_in_subnet))[0]
        suffix = ~mask & struct.unpack("!I", socket.inet_aton(ip_suffix))[0]
        return socket.inet_ntoa(struct.pack("!I", (subnet | suffix)))

    @staticmethod
    def get_interface_ipconfig(interface_name):
        """
        Returns a dictionary containing settings for an |interface_name|
        set via DHCP.  Returns None if no such interface or setting bundle on
        that interface can be found in shill.

        @param interface_name: name (or substring of the name) of the device
                to look up.
        @return dict of plain Python values converted from the properties of
                the single IPConfig object whose "Method" is "dhcp", or None.
        @raises error.TestFail if more than one IPConfig object on the device
                has "Method" == "dhcp".
        """
        flim = flimflam.FlimFlam(dbus.SystemBus())
        device = flim.FindElementByNameSubstring("Device", interface_name)
        if device is None:
            return None

        device_properties = device.GetProperties(utf8_strings=True)
        dhcp_properties = None
        for property_path in device_properties["IPConfigs"]:
            ipconfig = flim.GetObjectInterface("IPConfig", property_path)
            ipconfig_properties = ipconfig.GetProperties(utf8_strings=True)
            if "Method" not in ipconfig_properties:
                logging.info("Found ipconfig object with no method field")
                continue
            if ipconfig_properties["Method"] != "dhcp":
                logging.info("Found ipconfig object with method != dhcp")
                continue
            if dhcp_properties is not None:
                raise error.TestFail("Found multiple ipconfig objects "
                                     "with method == dhcp")
            dhcp_properties = ipconfig_properties
        if dhcp_properties is None:
            logging.info("Did not find IPConfig object with method == dhcp")
            return None
        logging.info("Got raw dhcp config dbus object: %s.", dhcp_properties)
        return DhcpTestBase._cleanup_dbus_types(dhcp_properties)

    def run_once(self):
        """
        Sets up the virtual ethernet pair and the DHCP test server, runs
        test_body() and tears the apparatus down again.

        @raises error.TestFail if the apparatus cannot be set up or if
                test_body() raises; exceptions other than error.TestFail are
                logged with their traceback and re-raised as error.TestFail.
        """
        self._server = None
        self._server_ip = None
        self._ethernet_pair = None
        self._subnet_mask = None
        try:
            self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
                    peer_interface_name="pseudoethernet0",
                    peer_interface_ip=None)
            self._ethernet_pair.setup()
            if not self._ethernet_pair.is_healthy:
                raise error.TestFail("Could not create virtual ethernet pair.")
            self._server_ip = self._ethernet_pair.interface_ip
            self._server = dhcp_test_server.DhcpTestServer(
                    self._ethernet_pair.interface_name)
            self._server.start()
            if not self._server.is_healthy:
                raise error.TestFail("Could not start DHCP test server.")
            self._subnet_mask = self._ethernet_pair.interface_subnet_mask
            self.test_body()
        except error.TestFail:
            # Pass these through without modification.
            raise
        except Exception as e:
            logging.error("Caught exception: %s.", str(e))
            logging.error("Trace: %s", traceback.format_exc())
            raise error.TestFail("Caught exception: %s." % str(e))
        finally:
            # Nested so that the ethernet pair is still torn down when
            # stopping the server raises.
            try:
                if self._server is not None:
                    self._server.stop()
            finally:
                if self._ethernet_pair is not None:
                    self._ethernet_pair.teardown()

    def test_body(self):
        """
        Override this method with the body of your test.  You may safely
        assume that the properties exposed by DhcpTestBase correctly return
        references to the test apparatus.

        @raises error.TestFail always, unless overridden.
        """
        raise error.TestFail("No test body implemented")

    @property
    def server_ip(self):
        """
        Return the IP address of the side of the interface that the DHCP test
        server is bound to.  The server itself is bound to the broadcast
        address on the interface.
        """
        return self._server_ip

    @property
    def server(self):
        """
        Returns a reference to the DHCP test server.  Use this to add handlers
        and run tests.
        """
        return self._server

    @property
    def ethernet_pair(self):
        """
        Returns a reference to the virtual ethernet pair created to run DHCP
        tests on.
        """
        return self._ethernet_pair

    def negotiate_and_check_lease(self,
                                  dhcp_options,
                                  custom_fields=None,
                                  disable_check=False):
        """
        Has the test server answer one DISCOVER and one REQUEST with
        |dhcp_options|, then (optionally) checks the resulting configuration
        with check_dhcp_config().

        @param dhcp_options: dict of DHCP options for the server to hand out;
                must contain dhcp_packet.OPTION_REQUESTED_IP, which is used
                as the address to offer.
        @param custom_fields: optional dict passed through to both handling
                rules; an empty dict is used when omitted.
        @param disable_check: when True, skip check_dhcp_config().
        @raises error.TestFail if OPTION_REQUESTED_IP is missing, if the
                server reports that the test did not pass, or if the
                configuration check fails.
        """
        if custom_fields is None:
            # A fresh dict per call; a literal {} default would be shared
            # between all calls.
            custom_fields = {}
        if dhcp_packet.OPTION_REQUESTED_IP not in dhcp_options:
            raise error.TestFail("You must specify OPTION_REQUESTED_IP to "
                                 "negotiate a DHCP lease")
        intended_ip = dhcp_options[dhcp_packet.OPTION_REQUESTED_IP]
        # Build up the handling rules for the server and start the test.
        rules = [
            dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
                    intended_ip,
                    self.server_ip,
                    dhcp_options,
                    custom_fields),
            dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
                    intended_ip,
                    self.server_ip,
                    dhcp_options,
                    custom_fields),
        ]
        rules[-1].is_final_handler = True
        # Announce the negotiation before it starts rather than after it has
        # already finished.
        logging.info("Server is negotiating new lease with options: %s",
                     dhcp_options)
        self.server.start_test(rules, DHCP_NEGOTIATION_TIMEOUT_SECONDS)
        self.server.wait_for_test_to_finish()
        if not self.server.last_test_passed:
            raise error.TestFail("Test server didn't get all the messages it "
                                 "was told to expect during negotiation.")

        if disable_check:
            logging.info("Skipping check of negotiated DHCP lease parameters.")
        else:
            # Wait for configuration to propagate over dbus to shill.
            # TODO(wiley) Make this event based.  This is pretty sloppy.
            time.sleep(0.1)
            self.check_dhcp_config(dhcp_options)

    def check_dhcp_config(self, dhcp_options):
        """
        Compares the configuration shill reports for the peer interface with
        what |dhcp_options| says the server handed out.

        Checked in order: IP address (both in the IPConfig object and on the
        interface itself), DNS servers, domain name, domain search list and
        default gateway.

        @param dhcp_options: dict of DHCP options the server was configured
                to hand out.
        @raises error.TestFail on the first mismatch, if no DHCP IPConfig can
                be retrieved, or if the server reports a failed test.
        """
        # The config is what the interface was actually configured with, as
        # opposed to dhcp_options, which is what the server expected it be
        # configured with.
        dhcp_config = DhcpTestBase.get_interface_ipconfig(
                self.ethernet_pair.peer_interface_name)
        if dhcp_config is None:
            raise error.TestFail("Failed to retrieve DHCP ipconfig object "
                                 "from shill.")

        logging.debug("Got DHCP config: %s", str(dhcp_config))
        expected_address = dhcp_options.get(dhcp_packet.OPTION_REQUESTED_IP)
        configured_address = dhcp_config.get(DHCPCD_KEY_ADDRESS)
        if expected_address != configured_address:
            raise error.TestFail("Interface configured with IP address not "
                                 "granted by the DHCP server after DHCP "
                                 "negotiation.  Expected %s but got %s." %
                                 (expected_address, configured_address))

        # While DNS related settings only propagate to the system when the
        # service is marked as the default service, we can still check the
        # IP address on the interface, since that is set immediately.
        interface_address = self.ethernet_pair.peer_interface_ip
        if expected_address != interface_address:
            raise error.TestFail("shill somehow knew about the proper DHCP "
                                 "assigned address: %s, but configured the "
                                 "interface with something completely "
                                 "different: %s." %
                                 (expected_address, interface_address))

        expected_dns_servers = dhcp_options.get(dhcp_packet.OPTION_DNS_SERVERS)
        configured_dns_servers = dhcp_config.get(DHCPCD_KEY_NAMESERVERS)
        if expected_dns_servers != configured_dns_servers:
            raise error.TestFail("Expected to be configured with DNS server "
                                 "list %s, but was configured with %s "
                                 "instead." % (expected_dns_servers,
                                               configured_dns_servers))

        expected_domain_name = dhcp_options.get(dhcp_packet.OPTION_DOMAIN_NAME)
        configured_domain_name = dhcp_config.get(DHCPCD_KEY_DOMAIN_NAME)
        if expected_domain_name != configured_domain_name:
            raise error.TestFail("Expected to be configured with domain "
                                 "name %s, but got %s instead." %
                                 (expected_domain_name, configured_domain_name))

        expected_search_list = dhcp_options.get(
                dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST)
        configured_search_list = dhcp_config.get(DHCPCD_KEY_SEARCH_DOMAIN_LIST)
        if expected_search_list != configured_search_list:
            raise error.TestFail("Expected to be configured with domain "
                                 "search list %s, but got %s instead." %
                                 (expected_search_list, configured_search_list))

        expected_routers = dhcp_options.get(dhcp_packet.OPTION_ROUTERS)
        if (not expected_routers and
            dhcp_options.get(dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES)):
            # No routers option was given: fall back to the first classless
            # static route with a falsy prefix and expect its gateway.
            classless_static_routes = dhcp_options[
                    dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES]
            for prefix, _destination, gateway in classless_static_routes:
                if not prefix:
                    logging.info("Using %s as the default gateway", gateway)
                    expected_routers = [gateway]
                    break
        configured_router = dhcp_config.get(DHCPCD_KEY_GATEWAY)
        # Only the first expected router is compared with the gateway.
        if expected_routers and expected_routers[0] != configured_router:
            raise error.TestFail("Expected to be configured with gateway %s, "
                                 "but got %s instead." %
                                 (expected_routers[0], configured_router))

        # NOTE(review): negotiate_and_check_lease() has already waited for
        # its test before calling this method; this wait and the "renewal"
        # wording look aimed at callers that started a test of their own --
        # confirm.
        self.server.wait_for_test_to_finish()
        if not self.server.last_test_passed:
            raise error.TestFail("Test server didn't get all the messages it "
                                 "was told to expect for renewal.")