"""File containing class to build all available ap_configurators."""
import logging
import common
from import ap_constants
from autotest_lib.server import site_utils
from autotest_lib.server.cros import ap_config
from autotest_lib.server.cros.ap_configurators import ap_cartridge
from autotest_lib.server.cros.ap_configurators import ap_spec
from autotest_lib.server.cros.ap_configurators import static_ap_configurator
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
class APConfiguratorFactory(object):
"""Class that instantiates all available APConfigurators.
@attribute CONFIGURATOR_MAP: a dict of strings, mapping to model-specific
APConfigurator objects.
@attribute BANDS: a string, bands supported by an AP.
@attribute MODES: a string, 802.11 modes supported by an AP.
@attribute SECURITIES: a string, security methods supported by an AP.
@attribute HOSTNAMES: a string, AP hostname.
@attribute ap_list: a list of APConfigurator objects.
@attribute ap_config: an APConfiguratorConfig object.
BANDS = 'bands'
MODES = 'modes'
SECURITIES = 'securities'
HOSTNAMES = 'hostnames'
def __init__(self, ap_test_type, spec=None):
webdriver_ready = False
self.ap_list = []
self.test_type = ap_test_type
for ap in ap_config.get_ap_list():
def _get_aps_by_visibility(self, visible=True):
"""Returns all configurators that support setting visibility.
@param visibility = True if SSID should be visible; False otherwise.
@returns aps: a set of APConfigurators"""
if visible:
return set(self.ap_list)
return set(filter(lambda ap: ap.is_visibility_supported(),
def _get_aps_by_mode(self, band, mode):
"""Returns all configurators that support a given 802.11 mode.
@param band: an 802.11 band.
@param mode: an 802.11 modes.
@returns aps: a set of APConfigurators.
if not mode:
return set(self.ap_list)
aps = []
for ap in self.ap_list:
modes = ap.get_supported_modes()
for d in modes:
if d['band'] == band and mode in d['modes']:
return set(aps)
def _get_aps_by_security(self, security):
"""Returns all configurators that support a given security mode.
@param security: the security type
@returns aps: a set of APConfigurators.
if not security:
return set(self.ap_list)
aps = []
for ap in self.ap_list:
if ap.is_security_mode_supported(security):
return set(aps)
def _get_aps_by_band(self, band, channel=None):
"""Returns all APs that support a given band.
@param band: the band desired.
@returns aps: a set of APConfigurators.
if not band:
return set(self.ap_list)
aps = []
for ap in self.ap_list:
bands_and_channels = ap.get_supported_bands()
for d in bands_and_channels:
if channel:
if d['band'] == band and channel in d['channels']:
elif d['band'] == band:
return set(aps)
def get_aps_by_hostnames(self, hostnames, ap_list=None):
"""Returns specific APs by host name.
@param hostnames: a list of strings, AP's wan_hostname defined in the AP
configuration file.
@param ap_list: a list of APConfigurator objects.
@return a list of APConfigurators.
if ap_list == None:
ap_list = self.ap_list
aps = []
for ap in ap_list:
if ap.host_name in hostnames:'Found AP by hostname %s', ap.host_name)
return aps
def _get_aps_by_configurator_type(self, configurator_type, ap_list):
"""Returns APs that match the given configurator type.
@param configurator_type: the type of configurtor to return.
@param ap_list: a list of APConfigurator objects.
@return a list of APConfigurators.
aps = []
for ap in ap_list:
if ap.configurator_type == configurator_type:
return aps
def _get_aps_by_lab_location(self, want_chamber_aps, ap_list):
"""Returns APs that are inside or outside of the chaos/clique lab.
@param want_chamber_aps: True to select only APs in the chaos/clique
chamber. False to select APs outside of the chaos/clique chamber.
@param ap_list: a list of APConfigurator objects.
@return a list of APConfigurators
aps = []
afe = frontend_wrappers.RetryingAFE(
timeout_min=10, delay_sec=5, server=site_utils.get_global_afe_hostname())
if self.test_type == ap_constants.AP_TEST_TYPE_CHAOS:
ap_label = 'chaos_ap'
lab_label = 'chaos_chamber'
elif self.test_type == ap_constants.AP_TEST_TYPE_CLIQUE:
ap_label = 'clique_ap'
lab_label = 'clique_chamber'
elif self.test_type == ap_constants.AP_TEST_TYPE_CASEY5:
ap_label = 'casey_ap5'
lab_label = 'casey_chamber5'
elif self.test_type == ap_constants.AP_TEST_TYPE_CASEY7:
ap_label = 'casey_ap7'
lab_label = 'casey_chamber7'
return None
all_aps = set(afe.get_hostnames(label=ap_label))
chamber_devices = set(afe.get_hostnames(label=lab_label))
chamber_aps = all_aps.intersection(chamber_devices)
for ap in ap_list:
if want_chamber_aps and ap.host_name in chamber_aps:
if not want_chamber_aps and ap.host_name not in chamber_aps:
return aps
def get_ap_configurators_by_spec(self, spec=None, pre_configure=False):
"""Returns available configurators meeting spec.
@param spec: a validated ap_spec object
@param pre_configure: boolean, True to set all of the configuration
options for the APConfigurator object using the
given ap_spec; False otherwise. An ap_spec must
be passed for this to have any effect.
@returns aps: a list of APConfigurator objects
if not spec:
return self.ap_list
# APSpec matching is exact. With the exception of lab location, even
# if a hostname is passed the capabilities of a given configurator
# much match everything in the APSpec. This helps to prevent failures
# during the pre-scan phase.
aps = self._get_aps_by_band(,
aps &= self._get_aps_by_mode(, spec.mode)
aps &= self._get_aps_by_security(
aps &= self._get_aps_by_visibility(spec.visible)
matching_aps = list(aps)
# If APs hostnames are provided, assume the tester knows the location
# of the AP and skip AFE calls.
if spec.hostnames is None:
matching_aps = self._get_aps_by_lab_location(spec.lab_ap,
if spec.configurator_type != ap_spec.CONFIGURATOR_ANY:
matching_aps = self._get_aps_by_configurator_type(
spec.configurator_type, matching_aps)
if spec.hostnames is not None:
matching_aps = self.get_aps_by_hostnames(spec.hostnames,
if pre_configure:
for ap in matching_aps:
return matching_aps
def turn_off_all_routers(self, broken_pdus):
"""Powers down all of the routers.
@param broken_pdus: list of bad/offline PDUs.
ap_power_cartridge = ap_cartridge.APCartridge()
for ap in self.ap_list: