blob: 6588c1e1b15147255937f3ad2a078fc4669014a5 [file] [log] [blame]
import logging, re, time
from autotest_lib.server import autotest, test, hosts
from autotest_lib.server.cros import stress
from autotest_lib.client.common_lib import error
# Timeout duration to wait for a DHCP response.
# It usually doesn't take this long, but just in case.
TIMEOUT = 60
class network_StressServoEthernetPlug(test.test):
ETH_MAC = 'mac'
ETH_IP = 'ipaddress'
version = 1
def initialize(self, host):
self.host = host
self.host_iface = None
self.servo_iface = None
self.servo_eth_up()
end_time = time.time() + TIMEOUT
while time.time() < end_time:
self.eth_interfaces = self.get_eth_interfaces()
if len(self.eth_interfaces) >= 2:
break
time.sleep(1)
# Assuming 2 ethernet interfaces, the interface not for host
# is that associated with servo.
for iface, eth_dict in self.eth_interfaces.iteritems():
if eth_dict[self.ETH_IP] == self.host.hostname:
self.host_iface = iface
else:
self.servo_iface = iface
if not self.servo_iface:
raise error.TestError('Cannot find servo ethernet interface')
logging.info('Servo eth: %s', self.servo_iface)
logging.info('Host eth: %s', self.host_iface)
def servo_eth_up(self):
logging.info('Bringing up ethernet')
self.host.servo.set('dut_hub_on', 'yes')
def servo_eth_down(self):
logging.info('Bringing down ethernet')
self.host.servo.set('dut_hub_on', 'no')
def get_eth_interfaces(self):
""" Gets the ethernet device object.
Returns:
A dictionary of ethernet devices.
{
'eth<x>':
{
'mac': <mac address>,
'ipaddress': <ipaddress>,
}
}
"""
results = self.host.run('ifconfig').stdout.split('\n')
eth_dict = {}
iterator = results.__iter__()
for line in iterator:
# Search for the beginning of an interface section.
iface_start = re.search('^(eth\S+)\s+Link encap:Ethernet\s+HWaddr'
'\s+(\S+)', line)
if iface_start:
(iface, hwaddr) = iface_start.groups()
line = iterator.next()
result = re.search('^\s+inet addr:(\S+)\s+', line)
ipaddress = None
if result:
ipaddress = result.groups()[0]
eth_dict[iface] = {self.ETH_MAC: hwaddr, self.ETH_IP: ipaddress}
return eth_dict
def verify_eth_status(self, up_list, timeout=TIMEOUT):
""" Verify the up_list ifaces are up (and its contrapositive). """
end_time = time.time() + timeout
interfaces = {}
while time.time() < end_time:
interfaces = self.get_eth_interfaces()
error_message = ('Expected eth status %s but instead got %s' %
(up_list, interfaces.keys()))
if set(interfaces.keys()) == set(up_list):
# Check to make sure all the interfaces are up.
for iface, eth_dict in interfaces.iteritems():
if not eth_dict[self.ETH_IP]:
error_message = ('Ethernet interface %s did not '
'receive address.' % iface)
break
else:
# All desired interfaces are up, and they have ip addresses.
break
time.sleep(1)
else:
# If the while loop terminates without interruption, we've timed out
# waiting for the interface.
raise error.TestFail(error_message)
def run_once(self, num_iterations=1):
for iteration in range(num_iterations):
logging.info('Executing iteration %d', iteration)
self.servo_eth_down()
self.verify_eth_status([self.host_iface])
self.servo_eth_up()
self.verify_eth_status([self.host_iface, self.servo_iface])