# blob: 158cd16269a2f6ed519ae27a37449cf449245bef [file] [log] [blame] [edit]
# Copyright (c) 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import fcntl
import logging
import os
import pyudev
import random
import re
import socket
import struct
import subprocess
import sys
import time
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
class EthernetDongle(object):
    """Used for defining the states a module is expected to report."""

    def __init__(self, expect_speed='100', expect_duplex='full'):
        # Expected value for each checked parameter, keyed by name.
        # GetEthernetStatus treats an expectation of None as "any
        # non-empty value is acceptable".
        expectations = {}
        expectations['ifconfig_status'] = 0
        expectations['duplex'] = expect_duplex
        expectations['speed'] = expect_speed
        expectations['mac_address'] = None
        expectations['ipaddress'] = None
        self.expected_parameters = expectations

    def GetParam(self, parameter):
        """Returns the expected value stored for |parameter|.

        Raises:
            KeyError: if |parameter| is not one of the tracked names.
        """
        wanted = self.expected_parameters
        return wanted[parameter]
class network_EthernetStressPlug(test.test):
version = 1
def initialize(self, interface=None):
    """Determines and defines the bus information and interface info.

    Args:
        interface: name of the network interface to test. If None, every
            entry in /sys/class/net that has a 'device' node and no
            'wireless' node is considered a candidate.

    Raises:
        error.TestError: if |interface| has no 'device' node, if no
            candidate reports operstate 'up' and duplex 'full', or if udev
            lists no net device whose /sys path matches the interface.
    """
    self.link_speed_failures = 0
    sysnet = os.path.join('/', 'sys', 'class', 'net')

    def read_sysfs_line(path, default):
        """Returns the stripped first line of |path|, or |default|."""
        try:
            with open(path) as sysfs_file:
                return sysfs_file.readline().strip()
        except (IOError, OSError):
            # Best effort: an unreadable attribute is reported as |default|.
            return default

    def get_ethernet_interface(interface):
        """Valid interface requires link and duplex status."""
        avail_eth_interfaces = []
        if interface is None:
            # Skip entries with no 'device' node (e.g. bridged devices)
            # and wireless interfaces; they are not the eth dev we are
            # looking for.
            for name in os.listdir(sysnet):
                sysdev = os.path.join(sysnet, name, 'device')
                syswireless = os.path.join(sysnet, name, 'wireless')
                if (os.path.exists(sysdev)
                        and not os.path.exists(syswireless)):
                    avail_eth_interfaces.append(name)
        else:
            sysdev = os.path.join(sysnet, interface, 'device')
            if not os.path.exists(sysdev):
                # Report the requested name; no candidate has been
                # selected at this point.
                raise error.TestError(
                        'Network Interface %s is not a device ' % interface)
            avail_eth_interfaces.append(interface)

        link_status = 'unknown'
        duplex_status = 'unknown'
        iface = 'unknown'
        for iface in avail_eth_interfaces:
            # Read both values afresh for every candidate so that a failed
            # read never inherits the previous candidate's status.
            link_status = read_sysfs_line(
                    os.path.join(sysnet, iface, 'operstate'), 'unknown')
            duplex_status = read_sysfs_line(
                    os.path.join(sysnet, iface, 'duplex'), 'unknown')
            if link_status == 'up' and duplex_status == 'full':
                return iface

        raise error.TestError('Network Interface %s not usable (%s, %s)'
                              % (iface, link_status, duplex_status))

    def get_net_device_path(device=''):
        """ Uses udev to get the path of the desired internet device.

        Args:
            device: look for the /sys entry for this ethX device
        Returns:
            /sys pathname for the found ethX device or raises an error.
        """
        net_list = pyudev.Context().list_devices(subsystem='net')
        for dev in net_list:
            if dev.sys_path.endswith('net/%s' % device):
                return dev.sys_path
        raise error.TestError('Could not find /sys device path for %s'
                              % device)

    self.interface = get_ethernet_interface(interface)
    self.eth_syspath = get_net_device_path(self.interface)
    self.eth_flagspath = os.path.join(self.eth_syspath, 'flags')

    # USB Dongles: "authorized" file will disable the USB port and
    # in some cases powers off the port. In either case, net/eth* goes
    # away. And thus "../../.." won't be valid to access "authorized".
    # Build the pathname that goes directly to authpath.
    auth_path = os.path.join(self.eth_syspath, '../../../authorized')
    if os.path.exists(auth_path):
        # now rebuild the path w/o use of '..'
        auth_dir = self.eth_syspath
        for _ in range(3):
            auth_dir = os.path.split(auth_dir)[0]
        self.eth_authpath = os.path.join(auth_dir, 'authorized')
    else:
        self.eth_authpath = None

    # Stores the status of the most recently run iteration.
    self.test_status = {
        'ipaddress': None,
        'eth_state': None,
        'reason': None,
        'last_wait': 0
    }

    # Transitions slower than this many seconds count as a warning.
    self.secs_before_warning = 10

    # Represents the current number of instances in which ethernet
    # took longer than secs_before_warning to come up.
    self.warning_count = 0

    # The percentage of test warnings before we fail the test.
    self.warning_threshold = .25
def GetIPAddress(self):
    """Obtains the ipaddress of the interface.

    Returns:
        The address reported for self.interface as a dotted-quad string,
        or None if it could not be obtained for any reason.
    """
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # The name is truncated to 15 characters, as in the original call.
        ifname = self.interface[:15]
        if not isinstance(ifname, bytes):
            # struct.pack('256s', ...) needs a byte string.
            ifname = ifname.encode('utf-8')
        reply = fcntl.ioctl(
                sock.fileno(), 0x8915,  # SIOCGIFADDR
                struct.pack('256s', ifname))
        # Bytes 20..23 of the reply hold the packed address handed to
        # inet_ntoa (presumably the sockaddr_in address field).
        return socket.inet_ntoa(reply[20:24])
    except Exception:
        # Best effort: a missing interface or address is reported as None.
        return None
    finally:
        if sock is not None:
            sock.close()
def GetEthernetStatus(self):
    """
    Updates self.test_status with the status of the ethernet interface.

    Every gathered parameter is compared with the expectation held by
    self.dongle; on the first mismatch 'eth_state' is set to False and
    'reason' describes the offending parameter.

    Returns:
        True if every parameter meets its expectation (the ethernet device
        is up). False otherwise.
    """

    def ReadEthVal(param):
        """ Reads the network parameters of the interface. """
        eth_path = os.path.join('/', 'sys', 'class', 'net', self.interface,
                                param)
        try:
            with open(eth_path) as fp:
                return fp.readline().strip()
        except (IOError, OSError):
            # An unreadable attribute is reported as "no value".
            return None

    eth_out = self.ParseEthTool()
    ethernet_status = {
        'ifconfig_status': utils.system('ifconfig %s' % self.interface,
                                        ignore_status=True),
        'duplex': eth_out.get('Duplex'),
        'speed': eth_out.get('Speed'),
        'mac_address': ReadEthVal('address'),
        'ipaddress': self.GetIPAddress()
    }
    self.test_status['ipaddress'] = ethernet_status['ipaddress']

    for param, val in ethernet_status.items():
        expected = self.dongle.GetParam(param)
        if expected is None:
            # For parameters with expected values none, we check the
            # existence of a value.
            if not bool(val):
                self.test_status['eth_state'] = False
                self.test_status['reason'] = '%s is not ready: %s == %s' \
                                             % (self.interface, param, val)
                return False
        elif val != expected:
            self.test_status['eth_state'] = False
            self.test_status['reason'] = '%s is not ready. (%s)\n' \
                                         " Expected: '%s'\n" \
                                         " Received: '%s'" \
                                         % (self.interface, param,
                                            expected, val)
            return False

    self.test_status['eth_state'] = True
    self.test_status['reason'] = None
    return True
def _PowerEthernet(self, power=1):
""" Sends command to change the power state of ethernet.
Args:
power: 0 to unplug, 1 to plug.
"""
if self.eth_authpath:
try:
fp = open(self.eth_authpath, 'w')
fp.write('%d' % power)
fp.close()
except:
raise error.TestError('Could not write %d to %s' %
(power, self.eth_authpath))
# Linux can set network link state by frobbing "flags" bitfields.
# Bit fields are documented in include/uapi/linux/if.h.
# Bit 0 is IFF_UP (link up=1 or down=0).
elif os.path.exists(self.eth_flagspath):
try:
fp = open(self.eth_flagspath, mode='r')
val= int(fp.readline().strip(), 16)
fp.close()
except:
raise error.TestError('Could not read %s' % self.eth_flagspath)
if power:
newval = val | 1
else:
newval = val & ~1
if val != newval:
try:
fp = open(self.eth_flagspath, mode='w')
fp.write('0x%x' % newval)
fp.close()
except:
raise error.TestError('Could not write 0x%x to %s' %
(newval, self.eth_flagspath))
logging.debug("eth flags: 0x%x to 0x%x" % (val, newval))
# else use ifconfig eth0 up/down to switch
else:
logging.warning('plug/unplug event control not found. '
'Use ifconfig %s %s instead' %
(self.interface, 'up' if power else 'down'))
result = subprocess.check_call(['ifconfig', self.interface,
'up' if power else 'down'])
if result:
raise error.TestError('Fail to change the power state of %s' %
self.interface)
def TestPowerEthernet(self, power=1, timeout=45):
    """ Switches the ethernet on or off and waits for it to settle.

    Args:
        power: 0 to unplug, 1 to plug.
        timeout: Approximate number of seconds to keep polling for the
                 requested state before giving up.

    Returns:
        The time in seconds the device needed to reach the requested
        state.

    Raises:
        error.TestFail if the requested state is not reached in time.
    """
    began = time.time()
    deadline = began + timeout
    power_str = ['off', 'on']

    self._PowerEthernet(power)

    while time.time() < deadline:
        link_ok = self.GetEthernetStatus()

        # A wrong link rate reported by GetEthernetStatus() _should_ be
        # cured by "bouncing" the link. Each bounce is counted so the
        # test can fail if it happens "frequently".
        if power and not link_ok and 'speed' in self.test_status['reason']:
            self._PowerEthernet(0)
            time.sleep(1)
            self._PowerEthernet(power)
            self.link_speed_failures += 1
            logging.warning('Link Renegotiated ' +
                            self.test_status['reason'])

        # Desired state: enabled with an IP address, or disabled without
        # one. Report how long (approximately, in seconds) it took.
        has_ip = self.test_status['ipaddress'] is not None
        if power:
            settled = link_ok and has_ip
        else:
            settled = not link_ok and not has_ip
        if settled:
            return time.time() - began

        time.sleep(1)

    logging.debug(self.test_status['reason'])
    failure = ('ERROR: TIMEOUT : %s IP is %s after setting '
               'power %s (last_wait = %.2f seconds)' %
               (self.interface, self.test_status['ipaddress'],
                power_str[power], self.test_status['last_wait']))
    raise error.TestFail(failure)
def RandSleep(self, min_sleep, max_sleep):
    """ Pauses for a randomly chosen time, recorded as 'last_wait'.

    Args:
        min_sleep: Shortest allowed pause, in milliseconds.
        max_sleep: Longest allowed pause, in milliseconds.
    """
    millis = random.randint(min_sleep, max_sleep)
    seconds = millis / 1000.0
    # Store the pause (in seconds) before sleeping.
    self.test_status['last_wait'] = seconds
    time.sleep(seconds)
def _ParseEthTool_LinkModes(self, line):
""" Parses Ethtool Link Mode Entries.
Inputs:
line: Space separated string of link modes that have the format
(\d+)baseT/(Half|Full) (eg. 100baseT/Full).
Outputs:
List of dictionaries where each dictionary has the format
{ 'Speed': '<speed>', 'Duplex': '<duplex>' }
"""
parameters = []
# QCA ESS EDMA driver doesn't report "Supported link modes:"
if 'Not reported' in line:
return parameters
for speed_to_parse in line.split():
speed_duplex = speed_to_parse.split('/')
parameters.append(
{
'Speed': re.search('(\d*)', speed_duplex[0]).groups()[0],
'Duplex': speed_duplex[1],
}
)
return parameters
def ParseEthTool(self):
    """
    Parses the output of Ethtools into a dictionary and returns
    the dictionary with some cleanup in the below areas:
        Speed: Remove the unit of speed.
        Supported link modes: Construct a list of dictionaries.
                              The list is ordered (relying on ethtool)
                              and each of the dictionaries contains a Speed
                              kvp and a Duplex kvp.
        Advertised link modes: Same as 'Supported link modes'.

    Sample Ethtool Output:
        Supported ports: [ TP MII ]
        Supported link modes:   10baseT/Half 10baseT/Full
                                100baseT/Half 100baseT/Full
                                1000baseT/Half 1000baseT/Full
        Supports auto-negotiation: Yes
        Advertised link modes:  10baseT/Half 10baseT/Full
                                100baseT/Half 100baseT/Full
                                1000baseT/Full
        Advertised auto-negotiation: Yes
        Speed: 1000Mb/s
        Duplex: Full
        Port: MII
        PHYAD: 2
        Transceiver: internal
        Auto-negotiation: on
        Supports Wake-on: pg
        Wake-on: d
        Current message level: 0x00000007 (7)
        Link detected: yes

    Returns:
        A dictionary representation of the above ethtool output, or an empty
        dictionary if no ethernet dongle is present.
        Eg.
            {
              'Supported ports': '[ TP MII ]',
              'Supported link modes': [{'Speed': '10', 'Duplex': 'Half'},
                                       {...},
                                       {'Speed': '1000', 'Duplex': 'Full'}],
              'Supports auto-negotiation': 'Yes',
              'Advertised link modes': [{'Speed': '10', 'Duplex': 'Half'},
                                        {...},
                                        {'Speed': '1000', 'Duplex': 'Full'}],
              'Advertised auto-negotiation': 'Yes',
              'Speed': '1000',
              'Duplex': 'Full',
              'Port': 'MII',
              'PHYAD': '2',
              'Transceiver': 'internal',
              'Auto-negotiation': 'on',
              'Supports Wake-on': 'pg',
              'Wake-on': 'd',
              'Current message level': '0x00000007 (7)',
              'Link detected': 'yes',
            }
    """
    parameters = {}
    pipe = os.popen('ethtool %s' % self.interface)
    try:
        ethtool_out = pipe.read().split('\n')
    finally:
        pipe.close()

    # Matches only an output line that reads exactly 'No data available'.
    if 'No data available' in ethtool_out:
        return parameters

    # bridged interfaces only have two lines of ethtool output.
    if len(ethtool_out) < 3:
        return parameters

    link_mode_keys = ('Supported link modes', 'Advertised link modes')

    # For multiline entries, keep track of the key they belong to.
    current_key = ''
    for line in ethtool_out:
        head, colon, tail = line.strip().partition(':')
        if colon == ':':
            current_key = head

            # Assumes speed does not span more than one line.
            # Also assigns empty string if speed field
            # is not available.
            if current_key == 'Speed':
                # The pattern can match the empty string, so it always
                # matches.
                parameters[current_key] = \
                        re.search(r'^\s*(\d*)', tail).groups()[0]
            elif current_key in link_mode_keys:
                parameters[current_key] = \
                        self._ParseEthTool_LinkModes(tail)
            else:
                parameters[current_key] = tail.strip()
        elif current_key in parameters:
            # Continuation of the previous entry. Lines seen before any
            # "key:" line have nothing to extend and are ignored.
            if current_key in link_mode_keys:
                parameters[current_key] += \
                        self._ParseEthTool_LinkModes(head)
            else:
                parameters[current_key] += head.strip()

    return parameters
def GetDongle(self):
    """ Returns the ethernet dongle object associated with what's connected.

    The expected speed and duplex are taken from the last entry of the
    'Advertised link modes' that ethtool reports for self.interface.

    Returns:
        Object of type EthernetDongle.
    Raises:
        error.TestFail if ethtool output cannot be parsed or lists no
        advertised link modes.
    """
    ethtool_dict = self.ParseEthTool()

    if not ethtool_dict:
        raise error.TestFail('Unable to parse ethtool output for %s.' %
                             self.interface)

    # Ethtool output is ordered in terms of speed so this obtains the
    # fastest speed supported by dongle.
    # QCA ESS EDMA driver doesn't report "Supported link modes".
    advertised = ethtool_dict.get('Advertised link modes')
    if not advertised:
        # Missing or empty list: fail the test instead of letting a
        # KeyError/IndexError escape.
        raise error.TestFail('No advertised link modes reported for %s.' %
                             self.interface)
    max_link = advertised[-1]

    return EthernetDongle(expect_speed=max_link['Speed'],
                          expect_duplex=max_link['Duplex'])
def run_once(self, num_iterations=1):
    """ Repeatedly unplugs and replugs the ethernet, timing each change.

    Args:
        num_iterations: number of unplug/plug cycles to run.

    Raises:
        error.TestFail if more than warning_threshold of the iterations
        produced a slow transition, or if the link had to be renegotiated
        more than once. Any exception raised by a step is re-raised after
        the ethernet has been powered back on.
    """
    try:
        self.dongle = self.GetDongle()

        # Sleep for a random duration between .5 and 2 seconds
        # for unplug and plug scenarios.
        for i in range(num_iterations):
            logging.debug('Iteration: %d start' % i)
            linkdown_time = self.TestPowerEthernet(power=0)
            linkdown_wait = self.test_status['last_wait']
            if linkdown_time > self.secs_before_warning:
                self.warning_count += 1

            self.RandSleep(500, 2000)

            linkup_time = self.TestPowerEthernet(power=1)
            linkup_wait = self.test_status['last_wait']
            if linkup_time > self.secs_before_warning:
                self.warning_count += 1

            self.RandSleep(500, 2000)
            logging.debug('Iteration: %d end (down:%f/%d up:%f/%d)' %
                          (i, linkdown_wait, linkdown_time,
                           linkup_wait, linkup_time))

            if self.warning_count > num_iterations * self.warning_threshold:
                raise error.TestFail('ERROR: %.2f%% of total runs (%d) '
                                     'took longer than %d seconds for '
                                     'ethernet to come up.' %
                                     (self.warning_threshold*100,
                                      num_iterations,
                                      self.secs_before_warning))

        # Link speed failures are secondary.
        # Report after all iterations complete.
        if self.link_speed_failures > 1:
            raise error.TestFail('ERROR: %s : Link Renegotiated %d times'
                                 % (self.interface, self.link_speed_failures))

    except Exception:
        # Leave the ethernet powered on, then let the original exception
        # continue. The bare raise runs before the cleanup, so exceptions
        # handled inside _PowerEthernet cannot replace it.
        try:
            raise
        finally:
            self._PowerEthernet(1)