blob: 34a6ee5d8c188d9f3120ffa3f89029dfcb066f5d [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import base64
import logging
import os
import re
from autotest_lib.client.common_lib import error
def get_server_addr_in_lab(hostname):
"""
If we are in the lab use the names for the server, AKA rspro, and the
router as defined in: go/chromeos-lab-hostname-convention
@param hostname String machine name in wifi cell
(e.g. chromeos1-shelf1-host1.cros)
@return String server name in cell
(e.g. chromeos1-shelf1-host1-server.cros)
"""
domain = ''
machine = hostname
if hostname.find('.') > 0:
domain_start = hostname.find('.')
domain = hostname[domain_start:]
machine = hostname[0:domain_start]
return '%s-server%s' % (machine, domain)
def get_router_addr_in_lab(hostname):
"""
If we are in the lab use the names for the server, AKA rspro, and the
router as defined in: go/chromeos-lab-hostname-convention
@param hostname String machine name in wifi cell
(e.g. chromeos1-shelf1-host1.cros)
@return String router name in cell
(e.g. chromeos1-shelf1-host1-router.cros)
"""
domain = ''
machine = hostname
if hostname.find('.') > 0:
domain_start = hostname.find('.')
domain = hostname[domain_start:]
machine = hostname[0:domain_start]
return '%s-router%s' % (machine, domain)
def is_installed(host, filename):
"""
Checks if a file exists on a remote machine.
@param host Host object representing the remote machine.
@param filename String path of the file to check for existence.
@return True if filename is installed on host; False otherwise.
"""
result = host.run("ls %s" % filename, ignore_status=True)
m = re.search(filename, result.stdout)
return m is not None
def get_install_path(host, filename, paths):
"""
Checks if a file exists on a remote machine in one of several paths.
@param host Host object representing the remote machine.
@param filename String name of the file to check for existence.
@param paths List of paths to check for filename in.
@return String full path of installed file, or None if not found.
"""
if not paths:
return None
# A single path entry is the same as testing is_installed().
if len(paths) == 1:
install_path = os.path.join(paths, filename)
if is_installed(host, install_path):
return install_path
return None
result = host.run("ls {%s}/%s" % (','.join(paths), filename),
ignore_status=True)
found_path = result.stdout.split('\n')[0]
return found_path or None
def must_be_installed(host, cmd):
"""
Asserts that cmd is installed on a remote machine at some path and raises
an exception if this is not the case.
@param host Host object representing the remote machine.
@param cmd String name of the command to check for existence.
@return String full path of cmd on success. Error raised on failure.
"""
if is_installed(host, cmd):
return cmd
# Hunt for the equivalent file in a bunch of places.
cmd_base = os.path.basename(cmd)
local_paths = ['/bin',
'/sbin',
'/usr/bin',
'/usr/sbin',
'/usr/local/bin',
'/usr/local/sbin']
alternate_path = get_install_path(host, cmd_base, local_paths)
if alternate_path:
return alternate_path
raise error.TestFail('Unable to find %s on %s' % (cmd, host.ip))
def get_default_ssid(test_name, ipaddr, host):
"""
Calculate ssid based on test name.
This lets us track progress by watching beacon frames. Generate a unique
suffix for this SSID based either a unique MAC address on the AP, or
failing this, the IP address of the AP.
@param test_name String name of this test (e.g. network_WiFiMatFunc).
@param ipaddr String IP address of the AP in this test.
@param host Host object representing the router.
@return String 32 character SSID.
"""
if test_name.find('network_') == 0:
# Many of our tests start with this very uninteresting prefix.
# Remove it so we can have more unique substring bytes.
test_name = test_name[len('network_'):]
address_lines = []
if host:
address_lines = host.run('/usr/sbin/ip addr show',
ignore_status=True).stdout.splitlines()
mac_re = re.compile('link/ether (?P<mac>(([0-9a-f]{2}):?){6}) ',
flags=re.IGNORECASE)
for line in address_lines:
mac_match = mac_re.search(line)
if mac_match:
mac_string = mac_match.group('mac')
if mac_string not in ['00:00:00:00:00:00', 'ff:ff:ff:ff:ff:ff']:
mac_bytes = ''
for octet in mac_string.split(':'):
mac_bytes += chr(int(octet, 16))
unique_name = base64.b64encode(mac_bytes)
break
else:
unique_name = ipaddr
return re.sub('[^a-zA-Z0-9_]', '_', "%s_%s" %
(test_name, unique_name))[0:32]
def ping_args(params):
"""
Builds up an argument string for the ping command from a dict of settings.
@param params dict of settings for ping.
@return String of arguments that ping will understand.
"""
args = ""
if 'count' in params:
args += " -c %s" % params['count']
if 'size' in params:
args += " -s %s" % params['size']
if 'bcast' in params:
args += " -b"
if 'flood' in params:
args += " -f"
if 'interval' in params:
args += " -i %s" % params['interval']
if 'interface' in params:
args += " -I %s" % params['interface']
if 'qos' in params:
ac = string.lower(params['qos'])
if ac == 'be':
args += " -Q 0x04"
elif ac == 'bk':
args += " -Q 0x02"
elif ac == 'vi':
args += " -Q 0x08"
elif ac == 'vo':
args += " -Q 0x10"
else:
args += " -Q %s" % ac
return args
def parse_ping_output(ping_output):
"""
Extract a dictionary of statistics from the output of the ping command.
On error, some statistics may be missing entirely from the output.
@param ping_output String output of ping.
@return dict of relevant statistics on success.
"""
stats = {}
for k in ('xmit', 'recv', 'loss', 'min', 'avg', 'max', 'dev'):
stats[k] = '???'
m = re.search('([0-9]*) packets transmitted,[ ]*([0-9]*)[ ]'
'(packets |)received, ([0-9]*)', ping_output)
if m is not None:
stats['xmit'] = m.group(1)
stats['recv'] = m.group(2)
stats['loss'] = m.group(4)
m = re.search('(round-trip|rtt) min[^=]*= '
'([0-9.]*)/([0-9.]*)/([0-9.]*)/([0-9.]*)', ping_output)
if m is not None:
stats['min'] = m.group(2)
stats['avg'] = m.group(3)
stats['max'] = m.group(4)
stats['dev'] = m.group(5)
return stats
def get_wlan_devs(host, command_iw):
"""Get a list of WiFi devices.
@param host host object representing a remote machine.
@param command_iw string path to 'iw' executable on host
@return list of string wifi device names. (e.g. ['mlan0']).
"""
ret = []
result = host.run('%s dev' % command_iw)
current_if = None
for line in result.stdout.splitlines():
ifmatch = re.search('Interface (\S*)', line)
if ifmatch is not None:
current_if = ifmatch.group(1)
elif ('type managed' in line or 'type IBSS' in line) and current_if:
ret.append(current_if)
logging.info('Found wireless interfaces %r', ret)
return ret
def get_interface_mac(host, ifname, command_ip):
"""Get the MAC address of a given interface on host.
@param host host object representing remote machine.
@param ifname string interface name.
@param command_ip string ip command on host.
@return string MAC address for interface on host.
"""
result = host.run("%s link show %s" % (command_ip, ifname))
macmatch = re.search("link/ether (\S*)", result.stdout)
if macmatch is not None:
return macmatch.group(1)
return None