blob: 0b591c0f2b7f1884f4821645ac9991fa6d150ecb [file] [log] [blame] [edit]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Tools for serializing and deserializing DHCP packets.
DhcpPacket is a class that represents a single DHCP packet and contains some
logic to create and parse binary strings containing on the wire DHCP packets.
While you could call the constructor explicitly, most users should use the
static factories to construct packets with reasonable default values in most of
the fields, even if those values are zeros.
For example:
packet = dhcp_packet.create_offer_packet(transaction_id,
hwmac_addr,
offer_ip,
server_ip)
socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Sending to the broadcast address needs special permissions.
socket.sendto(response_packet.to_binary_string(),
("255.255.255.255", 68))
Note that if you make changes, make sure that the tests in the bottom of this
file still pass.
"""
import collections
import logging
import random
import socket
import struct
def CreatePacketPieceClass(super_class, field_format):
class PacketPiece(super_class):
@staticmethod
def pack(value):
return struct.pack(field_format, value)
@staticmethod
def unpack(byte_string):
return struct.unpack(field_format, byte_string)[0]
return PacketPiece
"""
Represents an option in a DHCP packet. Options may or may not be present in any
given packet, depending on the configurations of the client and the server.
Using namedtuples as super classes gets us the comparison operators we want to
use these Options in dictionaries as keys. Below, we'll subclass Option to
reflect that different kinds of options serialize to on the wire formats in
different ways.
|name|
A human readable name for this option.
|number|
Every DHCP option has a number that goes into the packet to indicate
which particular option is being encoded in the next few bytes. This
property returns that number for each option.
"""
Option = collections.namedtuple("Option", ["name", "number"])
ByteOption = CreatePacketPieceClass(Option, "!B")
ShortOption = CreatePacketPieceClass(Option, "!H")
IntOption = CreatePacketPieceClass(Option, "!I")
class IpAddressOption(Option):
@staticmethod
def pack(value):
return socket.inet_aton(value)
@staticmethod
def unpack(byte_string):
return socket.inet_ntoa(byte_string)
class IpListOption(Option):
@staticmethod
def pack(value):
return "".join([socket.inet_aton(addr) for addr in value])
@staticmethod
def unpack(byte_string):
return [socket.inet_ntoa(byte_string[idx:idx+4])
for idx in range(0, len(byte_string), 4)]
class RawOption(Option):
@staticmethod
def pack(value):
return value
@staticmethod
def unpack(byte_string):
return byte_string
class ByteListOption(Option):
@staticmethod
def pack(value):
return "".join(chr(v) for v in value)
@staticmethod
def unpack(byte_string):
return [ord(c) for c in byte_string]
class ClasslessStaticRoutesOption(Option):
"""
This is a RFC 3442 compliant classless static route option parser and
serializer. The symbolic "value" packed and unpacked from this class
is a list (prefix_size, destination, router) tuples.
"""
@staticmethod
def pack(value):
route_list = value
byte_string = ""
for prefix_size, destination, router in route_list:
byte_string += chr(prefix_size)
# Encode only the significant octets of the destination
# that fall within the prefix.
destination_address_count = (prefix_size + 7) / 8
destination_address = socket.inet_aton(destination)
byte_string += destination_address[:destination_address_count]
byte_string += socket.inet_aton(router)
return byte_string
@staticmethod
def unpack(byte_string):
route_list = []
offset = 0
while offset < len(byte_string):
prefix_size = ord(byte_string[offset])
destination_address_count = (prefix_size + 7) / 8
entry_end = offset + 1 + destination_address_count + 4
if entry_end > len(byte_string):
raise Exception("Classless domain list is corrupted.")
offset += 1
destination_address_end = offset + destination_address_count
destination_address = byte_string[offset:destination_address_end]
# Pad the destination address bytes with zero byte octets to
# fill out an IPv4 address.
destination_address += '\x00' * (4 - destination_address_count)
router_address = byte_string[destination_address_end:entry_end]
route_list.append((prefix_size,
socket.inet_ntoa(destination_address),
socket.inet_ntoa(router_address)))
offset = entry_end
return route_list
class DomainListOption(Option):
"""
This is a RFC 1035 compliant domain list option parser and serializer.
There are some clever compression optimizations that it does not implement
for serialization, but correctly parses. This should be sufficient for
testing.
"""
# Various RFC's let you finish a domain name by pointing to an existing
# domain name rather than repeating the same suffix. All such pointers are
# two bytes long, specify the offset in the byte string, and begin with
# |POINTER_PREFIX| to distinguish them from normal characters.
POINTER_PREFIX = ord("\xC0")
@staticmethod
def pack(value):
domain_list = value
byte_string = ""
for domain in domain_list:
for part in domain.split("."):
byte_string += chr(len(part))
byte_string += part
byte_string += "\x00"
return byte_string
@staticmethod
def unpack(byte_string):
domain_list = []
offset = 0
try:
while offset < len(byte_string):
(new_offset, domain_parts) = DomainListOption._read_domain_name(
byte_string,
offset)
domain_name = ".".join(domain_parts)
domain_list.append(domain_name)
if new_offset <= offset:
raise Exception("Parsing logic error is letting domain "
"list parsing go on forever.")
offset = new_offset
except ValueError:
# Badly formatted packets are not necessarily test errors.
logging.warning("Found badly formatted DHCP domain search list")
return None
return domain_list
@staticmethod
def _read_domain_name(byte_string, offset):
"""
Recursively parse a domain name from a domain name list.
"""
parts = []
while True:
if offset >= len(byte_string):
raise ValueError("Domain list ended without a NULL byte.")
maybe_part_len = ord(byte_string[offset])
offset += 1
if maybe_part_len == 0:
# Domains are terminated with either a 0 or a pointer to a
# domain suffix within |byte_string|.
return (offset, parts)
elif ((maybe_part_len & DomainListOption.POINTER_PREFIX) ==
DomainListOption.POINTER_PREFIX):
if offset >= len(byte_string):
raise ValueError("Missing second byte of domain suffix "
"pointer.")
maybe_part_len &= ~DomainListOption.POINTER_PREFIX
pointer_offset = ((maybe_part_len << 8) +
ord(byte_string[offset]))
offset += 1
(_, more_parts) = DomainListOption._read_domain_name(
byte_string,
pointer_offset)
parts.extend(more_parts)
return (offset, parts)
else:
# That byte was actually the length of the next part, not a
# pointer back into the data.
part_len = maybe_part_len
if offset + part_len >= len(byte_string):
raise ValueError("Part of a domain goes beyond data "
"length.")
parts.append(byte_string[offset : offset + part_len])
offset += part_len
"""
Represents a required field in a DHCP packet. Similar to Option, we'll
subclass Field to reflect that different fields serialize to on the wire formats
in different ways.
|name|
A human readable name for this field.
|offset|
The |offset| for a field defines the starting byte of the field in the
binary packet string. |offset| is used during parsing, along with
|size| to extract the byte string of a field.
|size|
Fields in DHCP packets have a fixed size that must be respected. This
size property is used in parsing to indicate that |self._size| number of
bytes make up this field.
"""
Field = collections.namedtuple("Field", ["name", "offset", "size"])
ByteField = CreatePacketPieceClass(Field, "!B")
ShortField = CreatePacketPieceClass(Field, "!H")
IntField = CreatePacketPieceClass(Field, "!I")
HwAddrField = CreatePacketPieceClass(Field, "!16s")
ServerNameField = CreatePacketPieceClass(Field, "!64s")
BootFileField = CreatePacketPieceClass(Field, "!128s")
class IpAddressField(Field):
@staticmethod
def pack(value):
return socket.inet_aton(value)
@staticmethod
def unpack(byte_string):
return socket.inet_ntoa(byte_string)
# This is per RFC 2131. The wording doesn't seem to say that the packets must
# be this big, but that has been the historic assumption in implementations.
DHCP_MIN_PACKET_SIZE = 300
IPV4_NULL_ADDRESS = "0.0.0.0"
# These are required in every DHCP packet. Without these fields, the
# packet will not even pass DhcpPacket.is_valid
FIELD_OP = ByteField("op", 0, 1)
FIELD_HWTYPE = ByteField("htype", 1, 1)
FIELD_HWADDR_LEN = ByteField("hlen", 2, 1)
FIELD_RELAY_HOPS = ByteField("hops", 3, 1)
FIELD_TRANSACTION_ID = IntField("xid", 4, 4)
FIELD_TIME_SINCE_START = ShortField("secs", 8, 2)
FIELD_FLAGS = ShortField("flags", 10, 2)
FIELD_CLIENT_IP = IpAddressField("ciaddr", 12, 4)
FIELD_YOUR_IP = IpAddressField("yiaddr", 16, 4)
FIELD_SERVER_IP = IpAddressField("siaddr", 20, 4)
FIELD_GATEWAY_IP = IpAddressField("giaddr", 24, 4)
FIELD_CLIENT_HWADDR = HwAddrField("chaddr", 28, 16)
# The following two fields are considered "legacy BOOTP" fields but may
# sometimes be used by DHCP clients.
FIELD_LEGACY_SERVER_NAME = ServerNameField("servername", 44, 64);
FIELD_LEGACY_BOOT_FILE = BootFileField("bootfile", 108, 128);
FIELD_MAGIC_COOKIE = IntField("magic_cookie", 236, 4)
OPTION_TIME_OFFSET = IntOption("time_offset", 2)
OPTION_ROUTERS = IpListOption("routers", 3)
OPTION_SUBNET_MASK = IpAddressOption("subnet_mask", 1)
OPTION_TIME_SERVERS = IpListOption("time_servers", 4)
OPTION_NAME_SERVERS = IpListOption("name_servers", 5)
OPTION_DNS_SERVERS = IpListOption("dns_servers", 6)
OPTION_LOG_SERVERS = IpListOption("log_servers", 7)
OPTION_COOKIE_SERVERS = IpListOption("cookie_servers", 8)
OPTION_LPR_SERVERS = IpListOption("lpr_servers", 9)
OPTION_IMPRESS_SERVERS = IpListOption("impress_servers", 10)
OPTION_RESOURCE_LOC_SERVERS = IpListOption("resource_loc_servers", 11)
OPTION_HOST_NAME = RawOption("host_name", 12)
OPTION_BOOT_FILE_SIZE = ShortOption("boot_file_size", 13)
OPTION_MERIT_DUMP_FILE = RawOption("merit_dump_file", 14)
OPTION_DOMAIN_NAME = RawOption("domain_name", 15)
OPTION_SWAP_SERVER = IpAddressOption("swap_server", 16)
OPTION_ROOT_PATH = RawOption("root_path", 17)
OPTION_EXTENSIONS = RawOption("extensions", 18)
OPTION_INTERFACE_MTU = ShortOption("interface_mtu", 26)
OPTION_VENDOR_ENCAPSULATED_OPTIONS = RawOption(
"vendor_encapsulated_options", 43)
OPTION_REQUESTED_IP = IpAddressOption("requested_ip", 50)
OPTION_IP_LEASE_TIME = IntOption("ip_lease_time", 51)
OPTION_OPTION_OVERLOAD = ByteOption("option_overload", 52)
OPTION_DHCP_MESSAGE_TYPE = ByteOption("dhcp_message_type", 53)
OPTION_SERVER_ID = IpAddressOption("server_id", 54)
OPTION_PARAMETER_REQUEST_LIST = ByteListOption("parameter_request_list", 55)
OPTION_MESSAGE = RawOption("message", 56)
OPTION_MAX_DHCP_MESSAGE_SIZE = ShortOption("max_dhcp_message_size", 57)
OPTION_RENEWAL_T1_TIME_VALUE = IntOption("renewal_t1_time_value", 58)
OPTION_REBINDING_T2_TIME_VALUE = IntOption("rebinding_t2_time_value", 59)
OPTION_VENDOR_ID = RawOption("vendor_id", 60)
OPTION_CLIENT_ID = RawOption("client_id", 61)
OPTION_TFTP_SERVER_NAME = RawOption("tftp_server_name", 66)
OPTION_BOOTFILE_NAME = RawOption("bootfile_name", 67)
OPTION_FULLY_QUALIFIED_DOMAIN_NAME = RawOption("fqdn", 81)
OPTION_DNS_DOMAIN_SEARCH_LIST = DomainListOption("domain_search_list", 119)
OPTION_CLASSLESS_STATIC_ROUTES = ClasslessStaticRoutesOption(
"classless_static_routes", 121)
OPTION_WEB_PROXY_AUTO_DISCOVERY = RawOption("wpad", 252)
# Unlike every other option, which are tuples like:
# <number, length in bytes, data>, the pad and end options are just
# single bytes "\x00" and "\xff" (without length or data fields).
OPTION_PAD = 0
OPTION_END = 255
DHCP_COMMON_FIELDS = [
FIELD_OP,
FIELD_HWTYPE,
FIELD_HWADDR_LEN,
FIELD_RELAY_HOPS,
FIELD_TRANSACTION_ID,
FIELD_TIME_SINCE_START,
FIELD_FLAGS,
FIELD_CLIENT_IP,
FIELD_YOUR_IP,
FIELD_SERVER_IP,
FIELD_GATEWAY_IP,
FIELD_CLIENT_HWADDR,
]
DHCP_REQUIRED_FIELDS = DHCP_COMMON_FIELDS + [
FIELD_MAGIC_COOKIE,
]
DHCP_ALL_FIELDS = DHCP_COMMON_FIELDS + [
FIELD_LEGACY_SERVER_NAME,
FIELD_LEGACY_BOOT_FILE,
FIELD_MAGIC_COOKIE,
]
# The op field in an ipv4 packet is either 1 or 2 depending on
# whether the packet is from a server or from a client.
FIELD_VALUE_OP_CLIENT_REQUEST = 1
FIELD_VALUE_OP_SERVER_RESPONSE = 2
# 1 == 10mb ethernet hardware address type (aka MAC).
FIELD_VALUE_HWTYPE_10MB_ETH = 1
# MAC addresses are still 6 bytes long.
FIELD_VALUE_HWADDR_LEN_10MB_ETH = 6
FIELD_VALUE_MAGIC_COOKIE = 0x63825363
OPTIONS_START_OFFSET = 240
MessageType = collections.namedtuple('MessageType', 'name option_value')
# From RFC2132, the valid DHCP message types are:
MESSAGE_TYPE_UNKNOWN = MessageType('UNKNOWN', 0)
MESSAGE_TYPE_DISCOVERY = MessageType('DISCOVERY', 1)
MESSAGE_TYPE_OFFER = MessageType('OFFER', 2)
MESSAGE_TYPE_REQUEST = MessageType('REQUEST', 3)
MESSAGE_TYPE_DECLINE = MessageType('DECLINE', 4)
MESSAGE_TYPE_ACK = MessageType('ACK', 5)
MESSAGE_TYPE_NAK = MessageType('NAK', 6)
MESSAGE_TYPE_RELEASE = MessageType('RELEASE', 7)
MESSAGE_TYPE_INFORM = MessageType('INFORM', 8)
MESSAGE_TYPE_BY_NUM = [
None,
MESSAGE_TYPE_DISCOVERY,
MESSAGE_TYPE_OFFER,
MESSAGE_TYPE_REQUEST,
MESSAGE_TYPE_DECLINE,
MESSAGE_TYPE_ACK,
MESSAGE_TYPE_NAK,
MESSAGE_TYPE_RELEASE,
MESSAGE_TYPE_INFORM
]
OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT = [
OPTION_REQUESTED_IP.number,
OPTION_IP_LEASE_TIME.number,
OPTION_SERVER_ID.number,
OPTION_SUBNET_MASK.number,
OPTION_ROUTERS.number,
OPTION_DNS_SERVERS.number,
OPTION_HOST_NAME.number,
]
# These are possible options that may not be in every packet.
# Frequently, the client can include a bunch of options that indicate
# that it would like to receive information about time servers, routers,
# lpr servers, and much more, but the DHCP server can usually ignore
# those requests.
#
# Eventually, each option is encoded as:
# <option.number, option.size, [array of option.size bytes]>
# Unlike fields, which make up a fixed packet format, options can be in
# any order, except where they cannot. For instance, option 1 must
# follow option 3 if both are supplied. For this reason, potential
# options are in this list, and added to the packet in this order every
# time.
#
# size < 0 indicates that this is variable length field of at least
# abs(length) bytes in size.
DHCP_PACKET_OPTIONS = [
OPTION_TIME_OFFSET,
OPTION_ROUTERS,
OPTION_SUBNET_MASK,
OPTION_TIME_SERVERS,
OPTION_NAME_SERVERS,
OPTION_DNS_SERVERS,
OPTION_LOG_SERVERS,
OPTION_COOKIE_SERVERS,
OPTION_LPR_SERVERS,
OPTION_IMPRESS_SERVERS,
OPTION_RESOURCE_LOC_SERVERS,
OPTION_HOST_NAME,
OPTION_BOOT_FILE_SIZE,
OPTION_MERIT_DUMP_FILE,
OPTION_SWAP_SERVER,
OPTION_DOMAIN_NAME,
OPTION_ROOT_PATH,
OPTION_EXTENSIONS,
OPTION_INTERFACE_MTU,
OPTION_VENDOR_ENCAPSULATED_OPTIONS,
OPTION_REQUESTED_IP,
OPTION_IP_LEASE_TIME,
OPTION_OPTION_OVERLOAD,
OPTION_DHCP_MESSAGE_TYPE,
OPTION_SERVER_ID,
OPTION_PARAMETER_REQUEST_LIST,
OPTION_MESSAGE,
OPTION_MAX_DHCP_MESSAGE_SIZE,
OPTION_RENEWAL_T1_TIME_VALUE,
OPTION_REBINDING_T2_TIME_VALUE,
OPTION_VENDOR_ID,
OPTION_CLIENT_ID,
OPTION_TFTP_SERVER_NAME,
OPTION_BOOTFILE_NAME,
OPTION_FULLY_QUALIFIED_DOMAIN_NAME,
OPTION_DNS_DOMAIN_SEARCH_LIST,
OPTION_CLASSLESS_STATIC_ROUTES,
OPTION_WEB_PROXY_AUTO_DISCOVERY,
]
def get_dhcp_option_by_number(number):
for option in DHCP_PACKET_OPTIONS:
if option.number == number:
return option
return None
class DhcpPacket(object):
@staticmethod
def create_discovery_packet(hwmac_addr):
"""
Create a discovery packet.
Fill in fields of a DHCP packet as if it were being sent from
|hwmac_addr|. Requests subnet masks, broadcast addresses, router
addresses, dns addresses, domain search lists, client host name, and NTP
server addresses. Note that the offer packet received in response to
this packet will probably not contain all of that information.
"""
# MAC addresses are actually only 6 bytes long, however, for whatever
# reason, DHCP allocated 12 bytes to this field. Ease the burden on
# developers and hide this detail.
while len(hwmac_addr) < 12:
hwmac_addr += chr(OPTION_PAD)
packet = DhcpPacket()
packet.set_field(FIELD_OP, FIELD_VALUE_OP_CLIENT_REQUEST)
packet.set_field(FIELD_HWTYPE, FIELD_VALUE_HWTYPE_10MB_ETH)
packet.set_field(FIELD_HWADDR_LEN, FIELD_VALUE_HWADDR_LEN_10MB_ETH)
packet.set_field(FIELD_RELAY_HOPS, 0)
packet.set_field(FIELD_TRANSACTION_ID, random.getrandbits(32))
packet.set_field(FIELD_TIME_SINCE_START, 0)
packet.set_field(FIELD_FLAGS, 0)
packet.set_field(FIELD_CLIENT_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_YOUR_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_SERVER_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_GATEWAY_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_CLIENT_HWADDR, hwmac_addr)
packet.set_field(FIELD_MAGIC_COOKIE, FIELD_VALUE_MAGIC_COOKIE)
packet.set_option(OPTION_DHCP_MESSAGE_TYPE,
MESSAGE_TYPE_DISCOVERY.option_value)
return packet
@staticmethod
def create_offer_packet(transaction_id,
hwmac_addr,
offer_ip,
server_ip):
"""
Create an offer packet, given some fields that tie the packet to a
particular offer.
"""
packet = DhcpPacket()
packet.set_field(FIELD_OP, FIELD_VALUE_OP_SERVER_RESPONSE)
packet.set_field(FIELD_HWTYPE, FIELD_VALUE_HWTYPE_10MB_ETH)
packet.set_field(FIELD_HWADDR_LEN, FIELD_VALUE_HWADDR_LEN_10MB_ETH)
# This has something to do with relay agents
packet.set_field(FIELD_RELAY_HOPS, 0)
packet.set_field(FIELD_TRANSACTION_ID, transaction_id)
packet.set_field(FIELD_TIME_SINCE_START, 0)
packet.set_field(FIELD_FLAGS, 0)
packet.set_field(FIELD_CLIENT_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_YOUR_IP, offer_ip)
packet.set_field(FIELD_SERVER_IP, server_ip)
packet.set_field(FIELD_GATEWAY_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_CLIENT_HWADDR, hwmac_addr)
packet.set_field(FIELD_MAGIC_COOKIE, FIELD_VALUE_MAGIC_COOKIE)
packet.set_option(OPTION_DHCP_MESSAGE_TYPE,
MESSAGE_TYPE_OFFER.option_value)
return packet
@staticmethod
def create_request_packet(transaction_id,
hwmac_addr):
packet = DhcpPacket()
packet.set_field(FIELD_OP, FIELD_VALUE_OP_CLIENT_REQUEST)
packet.set_field(FIELD_HWTYPE, FIELD_VALUE_HWTYPE_10MB_ETH)
packet.set_field(FIELD_HWADDR_LEN, FIELD_VALUE_HWADDR_LEN_10MB_ETH)
# This has something to do with relay agents
packet.set_field(FIELD_RELAY_HOPS, 0)
packet.set_field(FIELD_TRANSACTION_ID, transaction_id)
packet.set_field(FIELD_TIME_SINCE_START, 0)
packet.set_field(FIELD_FLAGS, 0)
packet.set_field(FIELD_CLIENT_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_YOUR_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_SERVER_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_GATEWAY_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_CLIENT_HWADDR, hwmac_addr)
packet.set_field(FIELD_MAGIC_COOKIE, FIELD_VALUE_MAGIC_COOKIE)
packet.set_option(OPTION_DHCP_MESSAGE_TYPE,
MESSAGE_TYPE_REQUEST.option_value)
return packet
@staticmethod
def create_acknowledgement_packet(transaction_id,
hwmac_addr,
granted_ip,
server_ip):
packet = DhcpPacket()
packet.set_field(FIELD_OP, FIELD_VALUE_OP_SERVER_RESPONSE)
packet.set_field(FIELD_HWTYPE, FIELD_VALUE_HWTYPE_10MB_ETH)
packet.set_field(FIELD_HWADDR_LEN, FIELD_VALUE_HWADDR_LEN_10MB_ETH)
# This has something to do with relay agents
packet.set_field(FIELD_RELAY_HOPS, 0)
packet.set_field(FIELD_TRANSACTION_ID, transaction_id)
packet.set_field(FIELD_TIME_SINCE_START, 0)
packet.set_field(FIELD_FLAGS, 0)
packet.set_field(FIELD_CLIENT_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_YOUR_IP, granted_ip)
packet.set_field(FIELD_SERVER_IP, server_ip)
packet.set_field(FIELD_GATEWAY_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_CLIENT_HWADDR, hwmac_addr)
packet.set_field(FIELD_MAGIC_COOKIE, FIELD_VALUE_MAGIC_COOKIE)
packet.set_option(OPTION_DHCP_MESSAGE_TYPE,
MESSAGE_TYPE_ACK.option_value)
return packet
@staticmethod
def create_nak_packet(transaction_id, hwmac_addr):
"""
Create a negative acknowledge packet.
@param transaction_id: The DHCP transaction ID.
@param hwmac_addr: The client's MAC address.
"""
packet = DhcpPacket()
packet.set_field(FIELD_OP, FIELD_VALUE_OP_SERVER_RESPONSE)
packet.set_field(FIELD_HWTYPE, FIELD_VALUE_HWTYPE_10MB_ETH)
packet.set_field(FIELD_HWADDR_LEN, FIELD_VALUE_HWADDR_LEN_10MB_ETH)
# This has something to do with relay agents
packet.set_field(FIELD_RELAY_HOPS, 0)
packet.set_field(FIELD_TRANSACTION_ID, transaction_id)
packet.set_field(FIELD_TIME_SINCE_START, 0)
packet.set_field(FIELD_FLAGS, 0)
packet.set_field(FIELD_CLIENT_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_YOUR_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_SERVER_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_GATEWAY_IP, IPV4_NULL_ADDRESS)
packet.set_field(FIELD_CLIENT_HWADDR, hwmac_addr)
packet.set_field(FIELD_MAGIC_COOKIE, FIELD_VALUE_MAGIC_COOKIE)
packet.set_option(OPTION_DHCP_MESSAGE_TYPE,
MESSAGE_TYPE_NAK.option_value)
return packet
def __init__(self, byte_str=None):
"""
Create a DhcpPacket, filling in fields from a byte string if given.
Assumes that the packet starts at offset 0 in the binary string. This
includes the fields and options. Fields are different from options in
that we bother to decode these into more usable data types like
integers rather than keeping them as raw byte strings. Fields are also
required to exist, unlike options which may not.
Each option is encoded as a tuple <option number, length, data> where
option number is a byte indicating the type of option, length indicates
the number of bytes in the data for option, and data is a length array
of bytes. The only exceptions to this rule are the 0 and 255 options,
which have 0 data length, and no length byte. These tuples are then
simply appended to each other. This encoding is the same as the BOOTP
vendor extention field encoding.
"""
super(DhcpPacket, self).__init__()
self._options = {}
self._fields = {}
if byte_str is None:
return
if len(byte_str) < OPTIONS_START_OFFSET + 1:
logging.error("Invalid byte string for packet.")
return
for field in DHCP_ALL_FIELDS:
self._fields[field] = field.unpack(byte_str[field.offset :
field.offset +
field.size])
offset = OPTIONS_START_OFFSET
domain_search_list_byte_string = ""
while offset < len(byte_str) and ord(byte_str[offset]) != OPTION_END:
data_type = ord(byte_str[offset])
offset += 1
if data_type == OPTION_PAD:
continue
data_length = ord(byte_str[offset])
offset += 1
data = byte_str[offset: offset + data_length]
offset += data_length
option = get_dhcp_option_by_number(data_type)
if option is None:
logging.warning("Unsupported DHCP option found. "
"Option number: %d", data_type)
continue
if option == OPTION_DNS_DOMAIN_SEARCH_LIST:
# In a cruel twist of fate, the server is allowed to give
# multiple options with this number. The client is expected to
# concatenate the byte strings together and use it as a single
# value.
domain_search_list_byte_string += data
continue
option_value = option.unpack(data)
if option == OPTION_PARAMETER_REQUEST_LIST:
logging.info("Requested options: %s", str(option_value))
self._options[option] = option_value
if domain_search_list_byte_string:
self._options[OPTION_DNS_DOMAIN_SEARCH_LIST] = option_value
@property
def client_hw_address(self):
return self._fields.get(FIELD_CLIENT_HWADDR)
@property
def is_valid(self):
"""
Checks that we have (at a minimum) values for all the required fields,
and that the magic cookie is set correctly.
"""
for field in DHCP_REQUIRED_FIELDS:
if self._fields.get(field) is None:
logging.warning("Missing field %s in packet.", field)
return False
if self._fields[FIELD_MAGIC_COOKIE] != FIELD_VALUE_MAGIC_COOKIE:
return False
return True
@property
def message_type(self):
"""
Gets the value of the DHCP Message Type option in this packet.
If the option is not present, or the value of the option is not
recognized, returns MESSAGE_TYPE_UNKNOWN.
@returns The MessageType for this packet, or MESSAGE_TYPE_UNKNOWN.
"""
if (self._options.has_key(OPTION_DHCP_MESSAGE_TYPE) and
self._options[OPTION_DHCP_MESSAGE_TYPE] > 0 and
self._options[OPTION_DHCP_MESSAGE_TYPE] < len(MESSAGE_TYPE_BY_NUM)):
return MESSAGE_TYPE_BY_NUM[self._options[OPTION_DHCP_MESSAGE_TYPE]]
else:
return MESSAGE_TYPE_UNKNOWN
@property
def transaction_id(self):
return self._fields.get(FIELD_TRANSACTION_ID)
def get_field(self, field):
return self._fields.get(field)
def get_option(self, option):
return self._options.get(option)
def set_field(self, field, field_value):
self._fields[field] = field_value
def set_option(self, option, option_value):
self._options[option] = option_value
def to_binary_string(self):
if not self.is_valid:
return None
# A list of byte strings to be joined into a single string at the end.
data = []
offset = 0
for field in DHCP_ALL_FIELDS:
if field not in self._fields:
continue
field_data = field.pack(self._fields[field])
while offset < field.offset:
# This should only happen when we're padding the fields because
# we're not filling in legacy BOOTP stuff.
data.append("\x00")
offset += 1
data.append(field_data)
offset += field.size
# Last field processed is the magic cookie, so we're ready for options.
# Have to process options
for option in DHCP_PACKET_OPTIONS:
option_value = self._options.get(option)
if option_value is None:
continue
serialized_value = option.pack(option_value)
data.append(struct.pack("BB",
option.number,
len(serialized_value)))
offset += 2
data.append(serialized_value)
offset += len(serialized_value)
data.append(chr(OPTION_END))
offset += 1
while offset < DHCP_MIN_PACKET_SIZE:
data.append(chr(OPTION_PAD))
offset += 1
return "".join(data)
def __str__(self):
options = [k.name + "=" + str(v) for k, v in self._options.items()]
fields = [k.name + "=" + str(v) for k, v in self._fields.items()]
return "<DhcpPacket fields=%s, options=%s>" % (fields, options)