# blob: eabd4e2878e6165d3d8088c9cdc23828c5c94a29 [file] [log] [blame]
# Copyright (c) 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from autotest_lib.client.common_lib import error
import os
class FrameSender(object):
    """Context manager for sending management frames.

    Entering the context configures a monitor interface on the same phy as
    the chosen hostapd instance and starts the router's management frame
    sender on it.  Leaving the context releases that interface, kills the
    sender process and copies its log file off the router.
    """

    # Number of FrameSender objects created so far.  Each instance takes the
    # current value as its index, which keeps the collected log file names
    # distinct.
    _sender_count = 0

    def __init__(self, router, frame_type, channel, ssid_prefix=None,
                 num_bss=None, frame_count=None, delay=None, dest_addr=None,
                 probe_resp_footer=None, instance=0):
        """
        @param router: LinuxRouter object router to send frames from.
        @param frame_type: int management frame type.
        @param channel: int targeted channel.
        @param ssid_prefix: string SSID prefix for BSSes in the frames.
        @param num_bss: int number of BSSes configured for sending frames.
        @param frame_count: int number of frames to send, frame_count of 0
                implies infinite number of frames.
        @param delay: int delay in between frames in milliseconds.
        @param dest_addr: MAC address of the destination address (DA).
        @param probe_resp_footer: footer bytes for probe responses.
        @param instance: int hostapd instance on router to send frames from.

        @raises error.TestNAError if the router board is "panther".
        """
        if router.board == "panther":
            raise error.TestNAError('Panther router does not support manual '
                                    'beacon frame generation')
        self._router = router
        self._channel = channel
        self._frame_type = frame_type
        self._ssid_prefix = ssid_prefix
        self._num_bss = num_bss
        self._frame_count = frame_count
        self._delay = delay
        self._dest_addr = dest_addr
        self._probe_resp_footer = probe_resp_footer
        self._ap_interface = router.hostapd_instances[instance].interface
        # Both are populated by __enter__ and cleared again by __exit__.
        self._injection_interface = None
        self._pid = None
        self._index = FrameSender._sender_count
        FrameSender._sender_count += 1

    def __enter__(self):
        """Acquire a monitor interface and start the frame sender on it.

        @return this FrameSender object.
        """
        self._injection_interface = self._router.get_configured_interface(
                'monitor', same_phy_as=self._ap_interface)
        try:
            self._pid = self._router.send_management_frame(
                    self._injection_interface,
                    self._frame_type, self._channel,
                    ssid_prefix=self._ssid_prefix,
                    num_bss=self._num_bss, frame_count=self._frame_count,
                    delay=self._delay, dest_addr=self._dest_addr,
                    probe_resp_footer=self._probe_resp_footer)
        except Exception:
            # The with statement does not call __exit__ when __enter__
            # raises, so the interface must be handed back here or it
            # would stay claimed.
            self._router.release_interface(self._injection_interface)
            self._injection_interface = None
            raise
        return self

    def __exit__(self, exception, value, traceback):
        """Release the monitor interface, stop the sender and fetch its log.

        Returns None, so an exception raised inside the with block is not
        suppressed.

        @param exception: exception type raised in the with block, if any.
        @param value: exception instance raised in the with block, if any.
        @param traceback: traceback of that exception, if any.
        """
        try:
            if self._injection_interface:
                self._router.release_interface(self._injection_interface)
        finally:
            # Runs even when releasing the interface fails, so the sender
            # process is never left behind on the router.
            self._injection_interface = None
            if self._pid:
                # Forget the pid first so a repeated __exit__ cannot try to
                # kill the same process again.
                pid, self._pid = self._pid, None
                # Kill process and wait for termination: after the kill,
                # probe with "kill -0" up to 10 times, 0.2s apart.
                self._router.host.run(
                        'kill {pid};'
                        ' for i in $(seq 1 10); do'
                        ' kill -0 {pid} || break; sleep 0.2;'
                        ' done'.format(pid=pid), ignore_status=True)
                self._router.host.get_file(
                        os.path.join(
                                self._router.logdir,
                                self._router.MGMT_FRAME_SENDER_LOG_FILE),
                        'debug/frame_sender_%d.log' % self._index)