blob: 911881e00b4af63515de16959d5fb3d9be7ddbdd [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import socket
import sys
import time
from autotest_lib.client.cros import dhcp_handling_rule
from autotest_lib.client.cros import dhcp_packet
from autotest_lib.client.cros import dhcp_test_server
# Prefix prepended to the names of the test data files opened by the tests in
# this file. It is a relative path, so open() resolves it against the current
# working directory of the process running the tests.
TEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/"
def bin2hex(byte_str, justification=20):
    """
    Turn a binary string into a prettier string of hex bytes.

    Each byte is rendered as "x" followed by two lowercase hex digits, and the
    rendered bytes are grouped into lines |justification| bytes long.

    @param byte_str: the raw bytes to render.  Elements may be one-character
            strings (a Python 2 str) or ints (bytes/bytearray on Python 3).
    @param justification: int number of bytes rendered on each line.

    @return string of newline separated lines; empty for empty input.
    """
    # Iterating a Python 3 bytes object yields ints, while a Python 2 str
    # yields one-character strings; normalize both to the integer byte value.
    values = [c if isinstance(c, int) else ord(c) for c in byte_str]
    chars = ["x%02x" % value for value in values]
    # range() instead of xrange() so this runs on both Python 2 and Python 3.
    return "\n".join("".join(chars[i:i + justification])
                     for i in range(0, len(chars), justification))
def test_packet_serialization():
    """
    Check that a stored discovery packet survives a parse/serialize cycle.

    Reads the binary contents of dhcp_discovery.log (found under
    TEST_DATA_PATH_PREFIX), parses them into a DhcpPacket and verifies that
    serializing that packet object reproduces the original bytes exactly.

    @return True if the regenerated bytes match the file, False otherwise.
    """
    # The with block closes the file even if reading it raises.
    with open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb") as log_file:
        binary_discovery_packet = log_file.read()
    discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet)
    if not discovery_packet.is_valid:
        # Every other failure path reports a reason; do so here as well
        # rather than failing silently.
        print("Failed to parse a valid packet from the discovery log.")
        return False
    generated_string = discovery_packet.to_binary_string()
    if generated_string is None:
        print("Failed to generate string from packet object.")
        return False
    if generated_string != binary_discovery_packet:
        print("Packets didn't match: ")
        print("Generated: \n%s" % bin2hex(generated_string))
        print("Expected: \n%s" % bin2hex(binary_discovery_packet))
        return False
    print("test_packet_serialization PASSED")
    return True
def receive_packet(a_socket, timeout_seconds=1.0):
    """
    Wait for one datagram on |a_socket| and parse it as a DHCP packet.

    socket.timeout errors raised by the read are swallowed and the read is
    retried until |timeout_seconds| have elapsed.

    @param a_socket: socket object to call recvfrom() on.
    @param timeout_seconds: float seconds to keep retrying before giving up.

    @return the parsed DhcpPacket, or None if nothing was received in time or
            the received data did not parse into a valid packet.
    """
    deadline = time.time() + timeout_seconds
    payload = None
    while payload is None and deadline > time.time():
        try:
            payload, _sender = a_socket.recvfrom(1024)
        except socket.timeout:
            # Timeouts are routine while polling; just try again.
            continue
    if payload is None:
        print("Timed out before we received a response from the server.")
        return None
    print("Client received a packet of length %d from the server." %
          len(payload))
    response = dhcp_packet.DhcpPacket(byte_str=payload)
    if response.is_valid:
        return response
    print("Received an invalid response from DHCP server.")
    return None
def test_simple_server_exchange(server):
    """
    Play the client side of a DISCOVERY/OFFER/REQUEST/ACK exchange.

    Installs handling rules on |server| for a single lease negotiation, then
    acts as the DHCP client over a loopback UDP socket, checking each response
    and finally the server's own verdict on the test.

    @param server: a started DhcpTestServer; packets are sent to it at
            127.0.0.1:8067 and its replies are awaited on 127.0.0.1:8068.

    @return True if every step of the exchange went as expected and the server
            reports that the test passed, False otherwise.
    """
    intended_ip = "127.0.0.42"
    subnet_mask = "255.255.255.0"
    server_ip = "127.0.0.1"
    lease_time_seconds = 60
    test_timeout = 3.0
    mac_addr = "\x01\x02\x03\x04\x05\x06"
    # Build up our packets and have them request some default option values,
    # like the IP we're being assigned and the address of the server assigning
    # it.
    discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(mac_addr)
    discovery_message.set_option(
            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
    request_message = dhcp_packet.DhcpPacket.create_request_packet(
            discovery_message.transaction_id,
            mac_addr)
    request_message.set_option(
            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
    # This is the pool of settings the DHCP server will seem to draw from to
    # answer queries from the client.  This information is written into packets
    # through the handling rules.
    dhcp_server_config = {
            dhcp_packet.OPTION_SERVER_ID : server_ip,
            dhcp_packet.OPTION_SUBNET_MASK : subnet_mask,
            dhcp_packet.OPTION_IP_LEASE_TIME : lease_time_seconds,
            dhcp_packet.OPTION_REQUESTED_IP : intended_ip,
            }
    # Build up the handling rules for the server and start the test.
    rules = []
    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
            intended_ip,
            server_ip,
            dhcp_server_config))
    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
            intended_ip,
            server_ip,
            dhcp_server_config))
    rules[-1].is_final_handler = True
    server.start_test(rules, test_timeout)
    # Because we don't want to require root permissions to run these tests,
    # listen on the loopback device, don't broadcast, and don't use reserved
    # ports (like the actual DHCP ports).  Use 8068/8067 instead.
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        client_socket.bind(("127.0.0.1", 8068))
        client_socket.settimeout(0.1)
        if not _negotiate_lease(client_socket,
                                (server_ip, 8067),
                                discovery_message,
                                request_message,
                                intended_ip):
            return False
        print("Waiting for the server to finish.")
        server.wait_for_test_to_finish()
        print("Server agrees that the test is over.")
        if not server.last_test_passed:
            print("Server is unhappy with the test result.")
            return False
    finally:
        # Release port 8068 on every exit path, including early failures and
        # exceptions, so the socket does not stay bound after the test.
        client_socket.close()
    print("test_simple_server_exchange PASSED.")
    return True


def _negotiate_lease(client_socket, server_address, discovery_message,
                     request_message, intended_ip):
    """
    Send DISCOVERY then REQUEST over |client_socket| and vet both replies.

    @param client_socket: bound UDP socket used to send and receive packets.
    @param server_address: (host, port) tuple the packets are sent to.
    @param discovery_message: DhcpPacket sent first.
    @param request_message: DhcpPacket sent after a good OFFER; options from
            the OFFER are copied into it before it is sent.
    @param intended_ip: string IP both the OFFER and the ACK must assign.

    @return True if a matching OFFER and ACK were received, False otherwise.
    """
    client_socket.sendto(discovery_message.to_binary_string(), server_address)
    offer_packet = receive_packet(client_socket)
    if offer_packet is None:
        return False
    if (offer_packet.message_type !=
        dhcp_packet.OPTION_VALUE_DHCP_MESSAGE_TYPE_OFFER):
        print("Type of DHCP response is not offer.")
        return False
    if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
        print("Server didn't offer the IP we expected.")
        return False
    print("Offer looks good to the client, sending request.")
    # In real tests, dhcpcd formats all the DISCOVERY and REQUEST messages.  In
    # our unit test, we have to do this ourselves.
    for option in (dhcp_packet.OPTION_SERVER_ID,
                   dhcp_packet.OPTION_SUBNET_MASK,
                   dhcp_packet.OPTION_IP_LEASE_TIME,
                   dhcp_packet.OPTION_REQUESTED_IP):
        request_message.set_option(option, offer_packet.get_option(option))
    # Send the REQUEST message.
    client_socket.sendto(request_message.to_binary_string(), server_address)
    ack_packet = receive_packet(client_socket)
    if ack_packet is None:
        return False
    if (ack_packet.message_type !=
        dhcp_packet.OPTION_VALUE_DHCP_MESSAGE_TYPE_ACK):
        print("Type of DHCP response is not acknowledgement.")
        return False
    # Validate the ACK itself; checking the OFFER a second time here would
    # let an ACK carrying the wrong address slip through.
    if ack_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
        print("Server didn't give us the IP we expected.")
        return False
    return True
def test_server_dialogue():
    """
    Start a DhcpTestServer on loopback and run the simple exchange against it.

    The server is created with ingress 127.0.0.1:8067 and broadcast
    127.0.0.1:8068, started, and always sent a stop() before returning.

    @return the result of test_simple_server_exchange(), or False if the
            server did not report itself healthy after starting.
    """
    server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1",
                                             ingress_port=8067,
                                             broadcast_address="127.0.0.1",
                                             broadcast_port=8068)
    server.start()
    try:
        if not server.is_healthy:
            print("Server isn't healthy, aborting.")
            return False
        return test_simple_server_exchange(server)
    finally:
        # Stop the server on every exit path, so an exception raised by the
        # exchange cannot leave the started server running.
        print("Sending server stop() signal.")
        server.stop()
        print("Stop signal sent.")
def run_tests():
    """
    Run every test in this file and summarize the outcome.

    Attaches a DEBUG level stream handler to the "dhcp" logger (itself set to
    DEBUG) before running the tests.

    @return 0 if all tests passed, -1 otherwise.
    """
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    dhcp_logger = logging.getLogger("dhcp")
    dhcp_logger.setLevel(logging.DEBUG)
    dhcp_logger.addHandler(handler)
    # Both tests run unconditionally; a failure in the first one does not
    # skip the second.
    serialization_ok = test_packet_serialization()
    dialogue_ok = test_server_dialogue()
    if serialization_ok & dialogue_ok:
        print("All tests PASSED.")
        return 0
    print("Some tests FAILED")
    return -1
# When executed as a script, run the tests and use their result as the
# process exit status.
if __name__ == "__main__":
    exit_status = run_tests()
    sys.exit(exit_status)