blob: 2e99f2a319016e0a74ca3e34c5d312c94b309050 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
from locale import *
from autotest_lib.client.common_lib import error
PYSHARK_LOAD_TIMEOUT = 2
FRAME_FIELD_RADIOTAP_DATARATE = 'radiotap.datarate'
FRAME_FIELD_RADIOTAP_MCS_INDEX = 'radiotap.mcs_index'
FRAME_FIELD_WLAN_FRAME_TYPE = 'wlan.fc_type_subtype'
FRAME_FIELD_WLAN_SOURCE_ADDR = 'wlan.sa'
FRAME_FIELD_WLAN_MGMT_SSID = 'wlan_mgt.ssid'
RADIOTAP_KNOWN_BAD_FCS_REJECTOR = (
'not radiotap.flags.badfcs or radiotap.flags.badfcs==0')
WLAN_BEACON_FRAME_TYPE = '0x08'
WLAN_BEACON_ACCEPTOR = 'wlan.fc.type_subtype==0x08'
WLAN_PROBE_REQ_FRAME_TYPE = '0x04'
WLAN_PROBE_REQ_ACCEPTOR = 'wlan.fc.type_subtype==0x04'
PYSHARK_BROADCAST_SSID = 'SSID: '
BROADCAST_SSID = ''
setlocale(LC_ALL, '')
class Frame(object):
"""A frame from a packet capture."""
TIME_FORMAT = "%H:%M:%S.%f"
def __init__(self, frametime, bit_rate, mcs_index, ssid, source_addr):
self._datetime = frametime
self._bit_rate = bit_rate
self._mcs_index = mcs_index
self._ssid = ssid
self._source_addr = source_addr
@property
def time_datetime(self):
"""The time of the frame, as a |datetime| object."""
return self._datetime
@property
def bit_rate(self):
"""The bitrate used to transmit the frame, as an int."""
return self._bit_rate
@property
def mcs_index(self):
"""
The MCS index used to transmit the frame, as an int.
The value may be None, if the frame was not transmitted
using 802.11n modes.
"""
return self._mcs_index
@property
def ssid(self):
"""
The SSID of the frame, as a string.
The value may be None, if the frame does not have an SSID.
"""
return self._ssid
@property
def source_addr(self):
"""The source address of the frame, as a string."""
return self._source_addr
@property
def time_string(self):
"""The time of the frame, in local time, as a string."""
return self._datetime.strftime(self.TIME_FORMAT)
def _fetch_frame_field_value(frame, field):
"""
Retrieve the value of |field| within the |frame|.
@param frame: Pyshark packet object corresponding to a captured frame.
@param field: Field for which the value needs to be extracted from |frame|.
@return Value extracted from the frame if the field exists, else None.
"""
layer_object = frame
for layer in field.split('.'):
try:
layer_object = getattr(layer_object, layer)
except AttributeError:
return None
return layer_object
def _open_capture(pcap_path, display_filter):
"""
Get pyshark packet object parsed contents of a pcap file.
@param pcap_path: string path to pcap file.
@param display_filter: string filter to apply to captured frames.
@return list of Pyshark packet objects.
"""
import pyshark
capture = pyshark.FileCapture(
input_file=pcap_path, display_filter=display_filter)
capture.load_packets(timeout=PYSHARK_LOAD_TIMEOUT)
return capture
def get_frames(local_pcap_path, display_filter, bad_fcs):
"""
Get a parsed representation of the contents of a pcap file.
@param local_pcap_path: string path to a local pcap file on the host.
@param diplay_filter: string filter to apply to captured frames.
@param bad_fcs: string 'include' or 'discard'
@return list of Frame structs.
"""
if bad_fcs == 'include':
display_filter = display_filter
elif bad_fcs == 'discard':
display_filter = '(%s) and (%s)' % (RADIOTAP_KNOWN_BAD_FCS_REJECTOR,
display_filter)
else:
raise error.TestError('Invalid value for bad_fcs arg: %s.' % bad_fcs)
logging.debug('Capture: %s, Filter: %s', local_pcap_path, display_filter)
capture_frames = _open_capture(local_pcap_path, display_filter)
frames = []
logging.info('Parsing frames')
for frame in capture_frames:
rate = _fetch_frame_field_value(frame, FRAME_FIELD_RADIOTAP_DATARATE)
if rate:
rate = atof(rate)
else:
logging.debug('Found bad capture frame: %s', frame)
continue
frametime = frame.sniff_time
mcs_index = _fetch_frame_field_value(
frame, FRAME_FIELD_RADIOTAP_MCS_INDEX)
if mcs_index:
mcs_index = int(mcs_index)
source_addr = _fetch_frame_field_value(
frame, FRAME_FIELD_WLAN_SOURCE_ADDR)
# Get the SSID for any probe requests
frame_type = _fetch_frame_field_value(
frame, FRAME_FIELD_WLAN_FRAME_TYPE)
if (frame_type in [WLAN_BEACON_FRAME_TYPE, WLAN_PROBE_REQ_FRAME_TYPE]):
ssid = _fetch_frame_field_value(frame, FRAME_FIELD_WLAN_MGMT_SSID)
# Since the SSID name is a variable length field, there seems to be
# a bug in the pyshark parsing, it returns 'SSID: ' instead of ''
# for broadcast SSID's.
if ssid == PYSHARK_BROADCAST_SSID:
ssid = BROADCAST_SSID
else:
ssid = None
frames.append(Frame(frametime, rate, mcs_index, ssid, source_addr))
return frames
def get_probe_ssids(local_pcap_path, probe_sender=None):
"""
Get the SSIDs that were named in 802.11 probe requests frames.
Parse a pcap, returning all the SSIDs named in 802.11 probe
request frames. If |probe_sender| is specified, only probes
from that MAC address will be considered.
@param pcap_path: string path to a local pcap file on the host.
@param remote_host: Host object (if the file is remote).
@param probe_sender: MAC address of the device sending probes.
@return: A frozenset of the SSIDs that were probed.
"""
if probe_sender:
diplay_filter = '%s and wlan.addr==%s' % (
WLAN_PROBE_REQ_ACCEPTOR, probe_sender)
else:
diplay_filter = WLAN_PROBE_REQ_ACCEPTOR
frames = get_frames(local_pcap_path, diplay_filter, bad_fcs='discard')
return frozenset(
[frame.ssid for frame in frames if frame.ssid is not None])