| # Copyright (c) 2016 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import fcntl |
| import logging |
| import os |
| import pyudev |
| import random |
| import re |
| import socket |
| import struct |
| import subprocess |
| import sys |
| import time |
| |
| from autotest_lib.client.bin import test, utils |
| from autotest_lib.client.common_lib import error |
| |
| |
| class EthernetDongle(object): |
| """ Used for definining the desired module expect states. """ |
| |
| def __init__(self, expect_speed='100', expect_duplex='full'): |
| # Expected values for parameters. |
| self.expected_parameters = { |
| 'ifconfig_status': 0, |
| 'duplex': expect_duplex, |
| 'speed': expect_speed, |
| 'mac_address': None, |
| 'ipaddress': None, |
| } |
| |
| def GetParam(self, parameter): |
| return self.expected_parameters[parameter] |
| |
| class network_EthernetStressPlug(test.test): |
| version = 1 |
| |
| def initialize(self, interface=None): |
| """ Determines and defines the bus information and interface info. """ |
| |
| self.link_speed_failures = 0 |
| sysnet = os.path.join('/', 'sys', 'class', 'net') |
| |
| def get_ethernet_interface(interface): |
| """ Valid interface requires link and duplex status.""" |
| avail_eth_interfaces=[] |
| if interface is None: |
| # This is not the (bridged) eth dev we are looking for. |
| for x in os.listdir(sysnet): |
| sysdev = os.path.join(sysnet, x, 'device') |
| syswireless = os.path.join(sysnet, x, 'wireless') |
| if os.path.exists(sysdev) and not os.path.exists(syswireless): |
| avail_eth_interfaces.append(x) |
| else: |
| sysdev = os.path.join(sysnet, interface, 'device') |
| if os.path.exists(sysdev): |
| avail_eth_interfaces.append(interface) |
| else: |
| raise error.TestError('Network Interface %s is not a device ' % iface) |
| |
| link_status = 'unknown' |
| duplex_status = 'unknown' |
| iface = 'unknown' |
| |
| for iface in avail_eth_interfaces: |
| syslink = os.path.join(sysnet, iface, 'operstate') |
| try: |
| link_file = open(syslink) |
| link_status = link_file.readline().strip() |
| link_file.close() |
| except: |
| pass |
| |
| sysduplex = os.path.join(sysnet, iface, 'duplex') |
| try: |
| duplex_file = open(sysduplex) |
| duplex_status = duplex_file.readline().strip() |
| duplex_file.close() |
| except: |
| pass |
| |
| if link_status == 'up' and duplex_status == 'full': |
| return iface |
| |
| raise error.TestError('Network Interface %s not usable (%s, %s)' |
| % (iface, link_status, duplex_status)) |
| |
| def get_net_device_path(device=''): |
| """ Uses udev to get the path of the desired internet device. |
| Args: |
| device: look for the /sys entry for this ethX device |
| Returns: |
| /sys pathname for the found ethX device or raises an error. |
| """ |
| net_list = pyudev.Context().list_devices(subsystem='net') |
| for dev in net_list: |
| if dev.sys_path.endswith('net/%s' % device): |
| return dev.sys_path |
| |
| raise error.TestError('Could not find /sys device path for %s' |
| % device) |
| |
| self.interface = get_ethernet_interface(interface) |
| self.eth_syspath = get_net_device_path(self.interface) |
| self.eth_flagspath = os.path.join(self.eth_syspath, 'flags') |
| |
| # USB Dongles: "authorized" file will disable the USB port and |
| # in some cases powers off the port. In either case, net/eth* goes |
| # away. And thus "../../.." won't be valid to access "authorized". |
| # Build the pathname that goes directly to authpath. |
| auth_path = os.path.join(self.eth_syspath, '../../../authorized') |
| if os.path.exists(auth_path): |
| # now rebuild the path w/o use of '..' |
| auth_path = os.path.split(self.eth_syspath)[0] |
| auth_path = os.path.split(auth_path)[0] |
| auth_path = os.path.split(auth_path)[0] |
| |
| self.eth_authpath = os.path.join(auth_path,'authorized') |
| else: |
| self.eth_authpath = None |
| |
| # Stores the status of the most recently run iteration. |
| self.test_status = { |
| 'ipaddress': None, |
| 'eth_state': None, |
| 'reason': None, |
| 'last_wait': 0 |
| } |
| |
| self.secs_before_warning = 10 |
| |
| # Represents the current number of instances in which ethernet |
| # took longer than dhcp_warning_level to come up. |
| self.warning_count = 0 |
| |
| # The percentage of test warnings before we fail the test. |
| self.warning_threshold = .25 |
| |
| def GetIPAddress(self): |
| """ Obtains the ipaddress of the interface. """ |
| try: |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| return socket.inet_ntoa(fcntl.ioctl( |
| s.fileno(), 0x8915, # SIOCGIFADDR |
| struct.pack('256s', self.interface[:15]))[20:24]) |
| except: |
| return None |
| |
| def GetEthernetStatus(self): |
| """ |
| Updates self.test_status with the status of the ethernet interface. |
| |
| Returns: |
| True if the ethernet device is up. False otherwise. |
| """ |
| |
| def ReadEthVal(param): |
| """ Reads the network parameters of the interface. """ |
| eth_path = os.path.join('/', 'sys', 'class', 'net', self.interface, |
| param) |
| val = None |
| try: |
| fp = open(eth_path) |
| val = fp.readline().strip() |
| fp.close() |
| except: |
| pass |
| return val |
| |
| eth_out = self.ParseEthTool() |
| ethernet_status = { |
| 'ifconfig_status': utils.system('ifconfig %s' % self.interface, |
| ignore_status=True), |
| 'duplex': eth_out.get('Duplex'), |
| 'speed': eth_out.get('Speed'), |
| 'mac_address': ReadEthVal('address'), |
| 'ipaddress': self.GetIPAddress() |
| } |
| |
| self.test_status['ipaddress'] = ethernet_status['ipaddress'] |
| |
| for param, val in ethernet_status.iteritems(): |
| if self.dongle.GetParam(param) is None: |
| # For parameters with expected values none, we check the |
| # existence of a value. |
| if not bool(val): |
| self.test_status['eth_state'] = False |
| self.test_status['reason'] = '%s is not ready: %s == %s' \ |
| % (self.interface, param, val) |
| return False |
| else: |
| if val != self.dongle.GetParam(param): |
| self.test_status['eth_state'] = False |
| self.test_status['reason'] = '%s is not ready. (%s)\n' \ |
| " Expected: '%s'\n" \ |
| " Received: '%s'" \ |
| % (self.interface, param, |
| self.dongle.GetParam(param), |
| val) |
| return False |
| |
| self.test_status['eth_state'] = True |
| self.test_status['reason'] = None |
| return True |
| |
| def _PowerEthernet(self, power=1): |
| """ Sends command to change the power state of ethernet. |
| Args: |
| power: 0 to unplug, 1 to plug. |
| """ |
| |
| if self.eth_authpath: |
| try: |
| fp = open(self.eth_authpath, 'w') |
| fp.write('%d' % power) |
| fp.close() |
| except: |
| raise error.TestError('Could not write %d to %s' % |
| (power, self.eth_authpath)) |
| |
| # Linux can set network link state by frobbing "flags" bitfields. |
| # Bit fields are documented in include/uapi/linux/if.h. |
| # Bit 0 is IFF_UP (link up=1 or down=0). |
| elif os.path.exists(self.eth_flagspath): |
| try: |
| fp = open(self.eth_flagspath, mode='r') |
| val= int(fp.readline().strip(), 16) |
| fp.close() |
| except: |
| raise error.TestError('Could not read %s' % self.eth_flagspath) |
| |
| if power: |
| newval = val | 1 |
| else: |
| newval = val & ~1 |
| |
| if val != newval: |
| try: |
| fp = open(self.eth_flagspath, mode='w') |
| fp.write('0x%x' % newval) |
| fp.close() |
| except: |
| raise error.TestError('Could not write 0x%x to %s' % |
| (newval, self.eth_flagspath)) |
| logging.debug("eth flags: 0x%x to 0x%x" % (val, newval)) |
| |
| # else use ifconfig eth0 up/down to switch |
| else: |
| logging.warning('plug/unplug event control not found. ' |
| 'Use ifconfig %s %s instead' % |
| (self.interface, 'up' if power else 'down')) |
| result = subprocess.check_call(['ifconfig', self.interface, |
| 'up' if power else 'down']) |
| if result: |
| raise error.TestError('Fail to change the power state of %s' % |
| self.interface) |
| |
| def TestPowerEthernet(self, power=1, timeout=45): |
| """ Tests enabling or disabling the ethernet. |
| Args: |
| power: 0 to unplug, 1 to plug. |
| timeout: Indicates approximately the number of seconds to timeout |
| how long we should check for the success of the ethernet |
| state change. |
| |
| Returns: |
| The time in seconds required for device to transfer to the desired |
| state. |
| |
| Raises: |
| error.TestFail if the ethernet status is not in the desired state. |
| """ |
| |
| start_time = time.time() |
| end_time = start_time + timeout |
| |
| power_str = ['off', 'on'] |
| self._PowerEthernet(power) |
| |
| while time.time() < end_time: |
| status = self.GetEthernetStatus() |
| |
| |
| # If GetEthernetStatus() detects the wrong link rate, "bouncing" |
| # the link _should_ recover. Keep count of how many times this |
| # happens. Test should fail if happens "frequently". |
| if power and not status and 'speed' in self.test_status['reason']: |
| self._PowerEthernet(0) |
| time.sleep(1) |
| self._PowerEthernet(power) |
| self.link_speed_failures += 1 |
| logging.warning('Link Renegotiated ' + |
| self.test_status['reason']) |
| |
| # If ethernet is enabled and has an IP, OR |
| # if ethernet is disabled and does not have an IP, |
| # then we are in the desired state. |
| # Return the number of "seconds" for this to happen. |
| # (translated to an approximation of the number of seconds) |
| if (power and status and \ |
| self.test_status['ipaddress'] is not None) \ |
| or \ |
| (not power and not status and \ |
| self.test_status['ipaddress'] is None): |
| return time.time()-start_time |
| |
| time.sleep(1) |
| |
| logging.debug(self.test_status['reason']) |
| raise error.TestFail('ERROR: TIMEOUT : %s IP is %s after setting ' |
| 'power %s (last_wait = %.2f seconds)' % |
| (self.interface, self.test_status['ipaddress'], |
| power_str[power], self.test_status['last_wait'])) |
| |
| def RandSleep(self, min_sleep, max_sleep): |
| """ Sleeps for a random duration. |
| |
| Args: |
| min_sleep: Minimum sleep parameter in miliseconds. |
| max_sleep: Maximum sleep parameter in miliseconds. |
| """ |
| duration = random.randint(min_sleep, max_sleep)/1000.0 |
| self.test_status['last_wait'] = duration |
| time.sleep(duration) |
| |
| def _ParseEthTool_LinkModes(self, line): |
| """ Parses Ethtool Link Mode Entries. |
| Inputs: |
| line: Space separated string of link modes that have the format |
| (\d+)baseT/(Half|Full) (eg. 100baseT/Full). |
| |
| Outputs: |
| List of dictionaries where each dictionary has the format |
| { 'Speed': '<speed>', 'Duplex': '<duplex>' } |
| """ |
| parameters = [] |
| |
| # QCA ESS EDMA driver doesn't report "Supported link modes:" |
| if 'Not reported' in line: |
| return parameters |
| |
| for speed_to_parse in line.split(): |
| speed_duplex = speed_to_parse.split('/') |
| parameters.append( |
| { |
| 'Speed': re.search('(\d*)', speed_duplex[0]).groups()[0], |
| 'Duplex': speed_duplex[1], |
| } |
| ) |
| return parameters |
| |
| def ParseEthTool(self): |
| """ |
| Parses the output of Ethtools into a dictionary and returns |
| the dictionary with some cleanup in the below areas: |
| Speed: Remove the unit of speed. |
| Supported link modes: Construct a list of dictionaries. |
| The list is ordered (relying on ethtool) |
| and each of the dictionaries contains a Speed |
| kvp and a Duplex kvp. |
| Advertised link modes: Same as 'Supported link modes'. |
| |
| Sample Ethtool Output: |
| Supported ports: [ TP MII ] |
| Supported link modes: 10baseT/Half 10baseT/Full |
| 100baseT/Half 100baseT/Full |
| 1000baseT/Half 1000baseT/Full |
| Supports auto-negotiation: Yes |
| Advertised link modes: 10baseT/Half 10baseT/Full |
| 100baseT/Half 100baseT/Full |
| 1000baseT/Full |
| Advertised auto-negotiation: Yes |
| Speed: 1000Mb/s |
| Duplex: Full |
| Port: MII |
| PHYAD: 2 |
| Transceiver: internal |
| Auto-negotiation: on |
| Supports Wake-on: pg |
| Wake-on: d |
| Current message level: 0x00000007 (7) |
| Link detected: yes |
| |
| Returns: |
| A dictionary representation of the above ethtool output, or an empty |
| dictionary if no ethernet dongle is present. |
| Eg. |
| { |
| 'Supported ports': '[ TP MII ]', |
| 'Supported link modes': [{'Speed': '10', 'Duplex': 'Half'}, |
| {...}, |
| {'Speed': '1000', 'Duplex': 'Full'}], |
| 'Supports auto-negotiation: 'Yes', |
| 'Advertised link modes': [{'Speed': '10', 'Duplex': 'Half'}, |
| {...}, |
| {'Speed': '1000', 'Duplex': 'Full'}], |
| 'Advertised auto-negotiation': 'Yes' |
| 'Speed': '1000', |
| 'Duplex': 'Full', |
| 'Port': 'MII', |
| 'PHYAD': '2', |
| 'Transceiver': 'internal', |
| 'Auto-negotiation': 'on', |
| 'Supports Wake-on': 'pg', |
| 'Wake-on': 'd', |
| 'Current message level': '0x00000007 (7)', |
| 'Link detected': 'yes', |
| } |
| """ |
| parameters = {} |
| ethtool_out = os.popen('ethtool %s' % self.interface).read().split('\n') |
| if 'No data available' in ethtool_out: |
| return parameters |
| |
| # bridged interfaces only have two lines of ethtool output. |
| if len(ethtool_out) < 3: |
| return parameters |
| |
| # For multiline entries, keep track of the key they belong to. |
| current_key = '' |
| for line in ethtool_out: |
| current_line = line.strip().partition(':') |
| if current_line[1] == ':': |
| current_key = current_line[0] |
| |
| # Assumes speed does not span more than one line. |
| # Also assigns empty string if speed field |
| # is not available. |
| if current_key == 'Speed': |
| speed = re.search('^\s*(\d*)', current_line[2]) |
| parameters[current_key] = '' |
| if speed: |
| parameters[current_key] = speed.groups()[0] |
| elif (current_key == 'Supported link modes' or |
| current_key == 'Advertised link modes'): |
| parameters[current_key] = [] |
| parameters[current_key] += \ |
| self._ParseEthTool_LinkModes(current_line[2]) |
| else: |
| parameters[current_key] = current_line[2].strip() |
| else: |
| if (current_key == 'Supported link modes' or |
| current_key == 'Advertised link modes'): |
| parameters[current_key] += \ |
| self._ParseEthTool_LinkModes(current_line[0]) |
| else: |
| parameters[current_key]+=current_line[0].strip() |
| |
| return parameters |
| |
| def GetDongle(self): |
| """ Returns the ethernet dongle object associated with what's connected. |
| |
| Dongle uniqueness is retrieved from the 'product' file that is |
| associated with each usb dongle in |
| /sys/devices/pci.*/0000.*/usb.*/.*-.*/product. The correct |
| dongle object is determined and returned. |
| |
| Returns: |
| Object of type EthernetDongle. |
| |
| Raises: |
| error.TestFail if ethernet dongle is not found. |
| """ |
| ethtool_dict = self.ParseEthTool() |
| |
| if not ethtool_dict: |
| raise error.TestFail('Unable to parse ethtool output for %s.' % |
| self.interface) |
| |
| # Ethtool output is ordered in terms of speed so this obtains the |
| # fastest speed supported by dongle. |
| # QCA ESS EDMA driver doesn't report "Supported link modes". |
| max_link = ethtool_dict['Advertised link modes'][-1] |
| |
| return EthernetDongle(expect_speed=max_link['Speed'], |
| expect_duplex=max_link['Duplex']) |
| |
| def run_once(self, num_iterations=1): |
| try: |
| self.dongle = self.GetDongle() |
| |
| #Sleep for a random duration between .5 and 2 seconds |
| #for unplug and plug scenarios. |
| for i in range(num_iterations): |
| logging.debug('Iteration: %d start' % i) |
| linkdown_time = self.TestPowerEthernet(power=0) |
| linkdown_wait = self.test_status['last_wait'] |
| if linkdown_time > self.secs_before_warning: |
| self.warning_count+=1 |
| |
| self.RandSleep(500, 2000) |
| |
| linkup_time = self.TestPowerEthernet(power=1) |
| linkup_wait = self.test_status['last_wait'] |
| |
| if linkup_time > self.secs_before_warning: |
| self.warning_count+=1 |
| |
| self.RandSleep(500, 2000) |
| logging.debug('Iteration: %d end (down:%f/%d up:%f/%d)' % |
| (i, linkdown_wait, linkdown_time, |
| linkup_wait, linkup_time)) |
| |
| if self.warning_count > num_iterations * self.warning_threshold: |
| raise error.TestFail('ERROR: %.2f%% of total runs (%d) ' |
| 'took longer than %d seconds for ' |
| 'ethernet to come up.' % |
| (self.warning_threshold*100, |
| num_iterations, |
| self.secs_before_warning)) |
| |
| # Link speed failures are secondary. |
| # Report after all iterations complete. |
| if self.link_speed_failures > 1: |
| raise error.TestFail('ERROR: %s : Link Renegotiated %d times' |
| % (self.interface, self.link_speed_failures)) |
| |
| except Exception as e: |
| exc_info = sys.exc_info() |
| self._PowerEthernet(1) |
| raise exc_info[0], exc_info[1], exc_info[2] |