# blob: 85f11587b3a2c3cb355bd9dfdb47add3952318f1 [file] [log] [blame]
# Lint as: python2, python3
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import os
import re
import time
import common
from autotest_lib.client.common_lib import error, global_config
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server.hosts import cros_host
from autotest_lib.server.hosts import cros_repair
from chromite.lib import timeout_util
import six
# Installation directory of autotest on the drone, read from the SCHEDULER
# section of the global config.
AUTOTEST_INSTALL_DIR = global_config.global_config.get_config_value(
        'SCHEDULER', 'drone_installation_directory')

# NOTE(review): ATEST_PATH, MOBLAB_BOTO_LOCATION, MOBLAB_PORT,
# DUT_VERIFY_SLEEP_SECS and DUT_VERIFY_TIMEOUT are used by the code below but
# are not defined in the visible part of this file -- confirm they exist.

# Sample output of fping that we are matching against, the fping command
# will return 10 lines but they will be one of these two formats.
# We want to get the IP address for the first line and not match the
# second line that has a non 0 %loss.
# <ip> : xmt/rcv/%loss = 10/10/0%, min/avg/max = 0.68/0.88/1.13
# <ip> : xmt/rcv/%loss = 10/0/100%
#
# NOTE(review): the 'ip' group below only matches two characters. The pattern
# is applied with re.match(), which anchors at the start of the line, so a
# full dotted address will never match. The leading part of the address
# pattern looks like it was lost -- restore it from version control.
#
# Both halves are raw strings so that '\/' is not treated as an (invalid)
# string escape sequence; the resulting pattern text is unchanged.
SUBNET_DUT_SEARCH_RE = (r'(?P<ip>[0-1][0-9]) : '
                        r'xmt\/rcv\/%loss = [0-9]+\/[0-9]+\/0%')

MOBLAB_HOME = '/home/moblab'
MOBLAB_AUTODIR = '/usr/local/autodir'
DHCPD_LEASE_FILE = '/var/lib/dhcp/dhcpd.leases'
# Upstart services that must be running on a healthy moblab. The first entry
# is the one that is given the long start-up timeout.
# NOTE(review): the list was left unterminated and has only one visible
# entry; further services may be missing -- confirm.
MOBLAB_SERVICES = ['moblab-scheduler-init']
# Processes that must be found by pgrep on a healthy moblab.
MOBLAB_PROCESSES = ['apache2', 'dhcpd']
MOBLAB_TMP_DIR = '/mnt/moblab/tmp'
class UpstartServiceNotRunning(error.AutoservError):
    """An expected upstart service was not in the expected state."""

    def __init__(self, service_name):
        """Build the error message for a service that is not running.

        @param service_name: Name of the upstart service that was in the
                             wrong state.
        """
        super(UpstartServiceNotRunning, self).__init__(
                'Upstart service %s not in running state. Most likely this '
                'means moblab did not boot correctly, check the boot logs '
                'for detailed error messages as to see why this service was '
                'not started.' % service_name)
class MoblabHost(cros_host.CrosHost):
"""Moblab specific host class."""
def _initialize_frontend_rpcs(self, timeout_min):
    """Initialize frontends for AFE and TKO for a moblab host.

    We tunnel all communication to the frontends through an SSH tunnel as
    many testing environments block everything except SSH access to the
    moblab DUT.

    @param timeout_min: The timeout in minutes for AFE services.
    """
    web_address = self.rpc_server_tracker.tunnel_connect(MOBLAB_PORT)
    # NOTE(review): the continuation lines of both constructor calls were
    # missing. The user/server arguments below are reconstructed (the tunnel
    # address is otherwise unused) -- confirm against version control.
    # Pass timeout_min to self.afe
    self.afe = frontend_wrappers.RetryingAFE(timeout_min=timeout_min,
                                             user='moblab',
                                             server=web_address)
    # Use default timeout_min of MoblabHost for self.tko
    self.tko = frontend_wrappers.RetryingTKO(timeout_min=self.timeout_min,
                                             user='moblab',
                                             server=web_address)
def _initialize(self, *args, **dargs):
    """Run the superclass initialization, then apply moblab overrides.

    @param args: Positional arguments forwarded to the superclass.
    @param dargs: Keyword arguments forwarded to the superclass. The
                  optional 'rpc_timeout_min' entry is stored as
                  self.timeout_min and defaults to 1.
    """
    super(MoblabHost, self)._initialize(*args, **dargs)
    # TODO(jrbarnette): Our superclass already initialized
    # _repair_strategy, and now we're re-initializing it here.
    # That's awkward, if not actually wrong.
    moblab_strategy = cros_repair.create_moblab_repair_strategy()
    self._repair_strategy = moblab_strategy
    rpc_timeout = dargs.get('rpc_timeout_min', 1)
    self.timeout_min = rpc_timeout
@staticmethod
def check_host(host, timeout=10):
    """Check if the given host is a moblab host.

    The host is considered a moblab when /etc/lsb-release mentions
    'moblab'.

    @param host: An ssh host representing a device.
    @param timeout: The timeout for the run command.

    @return: True if the host is a moblab; False otherwise, including when
             the command fails with AutoservRunError or the ssh connection
             times out (AutoservSSHTimeout).
    """
    try:
        result = host.run(
                'grep -q moblab /etc/lsb-release',
                ignore_status=True, timeout=timeout)
    except (error.AutoservRunError, error.AutoservSSHTimeout):
        return False
    return result.exit_status == 0
def install_boto_file(self, boto_path=''):
    """Install a boto file on the Moblab device.

    @param boto_path: Path to the boto file to install. If empty or None,
                      sends the .boto file in the current HOME directory.

    @raises error.TestError if the boto file does not exist.
    """
    if not boto_path:
        home = os.getenv('HOME')
        if home is None:
            # HOME is not set; let expanduser() work out the home directory
            # instead of crashing in os.path.join(None, ...).
            home = os.path.expanduser('~')
        boto_path = os.path.join(home, '.boto')
    if not os.path.exists(boto_path):
        raise error.TestError('Boto File:%s does not exist.' % boto_path)
    self.send_file(boto_path, MOBLAB_BOTO_LOCATION)
    # The file is copied over as the ssh user; hand it to the moblab user.
    self.run('chown moblab:moblab %s' % MOBLAB_BOTO_LOCATION)
def get_autodir(self):
    """Return the directory to install autotest for client side tests.

    @return self.autodir when it is set to a truthy value, otherwise
            MOBLAB_AUTODIR.
    """
    configured = self.autodir
    if configured:
        return configured
    return MOBLAB_AUTODIR
def run_as_moblab(self, command, **kwargs):
    """Moblab commands should be ran as the moblab user not root.

    The command is wrapped in single quotes and handed to 'su - moblab -c',
    so it must not itself contain unescaped single quotes.

    @param command: Command to run as user moblab.
    @param kwargs: Extra keyword arguments passed through to self.run().

    @return Whatever self.run() returns for the wrapped command.
    """
    command = "su - moblab -c '%s'" % command
    return self.run(command, **kwargs)
def wait_afe_up(self, timeout_min=5):
    """Wait till the AFE is up and loaded.

    Attempt to reach the Moblab's AFE and database through its RPC
    interface.

    @param timeout_min: Minutes to wait for the AFE to respond. Default is
                        5 minutes.

    @raises urllib2.HTTPError if AFE does not respond within the timeout.
    """
    # NOTE(review): only the comments of this body were present; the three
    # statements below are reconstructed from them -- confirm against
    # version control, in particular the set_timeout() call.
    # Use moblabhost's own AFE object with a longer timeout to wait for the
    # AFE to load. Also re-create the ssh tunnel for connections to moblab.
    # Set the timeout_min to be longer than self.timeout_min for rebooting.
    self._initialize_frontend_rpcs(timeout_min)
    # Verify the AFE can handle a simple request.
    self._check_afe()
    # Reset the timeout_min after rebooting checks for afe services.
    self.afe.set_timeout(self.timeout_min)
def add_dut(self, hostname):
    """Add a DUT hostname to the AFE.

    Runs 'atest host create' as the moblab user and logs its output.

    @param hostname: DUT hostname to add.
    """
    result = self.run_as_moblab('%s host create %s' % (ATEST_PATH,
                                                       hostname))
    logging.debug('atest host create output for host %s:\n%s',
                  hostname, result.stdout)
def find_and_add_duts(self):
    """Discover DUTs on the testing subnet and add them to the AFE.

    Pings the range of IP's a DUT might be assigned by moblab, then
    parses the output to discover connected DUTs, connected means
    they have 0% dropped pings.
    If they are not already in the AFE, adds them to AFE.
    """
    existing_hosts = set(host.hostname for host in self.afe.get_hosts())
    # NOTE(review): the target address range that should follow 'fping -g'
    # is missing from the command string; as written fping is given no
    # targets. Restore the range from version control.
    fping_result = self.run('fping -g '
                            '-a -c 10 -p 30 -q', ignore_status=True)
    # The per-target statistics are parsed from stderr.
    for line in fping_result.stderr.splitlines():
        match = re.match(SUBNET_DUT_SEARCH_RE, line)
        if not match:
            continue
        dut_ip = match.group('ip')
        if dut_ip in existing_hosts:
            # Already known to the AFE; nothing to add.
            # NOTE(review): the body of this branch was missing; skipping
            # to the next line is assumed -- confirm.
            continue
        if self._check_dut_ssh(dut_ip):
            self.add_dut(dut_ip)
            existing_hosts.add(dut_ip)
def _check_dut_ssh(self, dut_ip):
is_sshable = False
count = 0
while not is_sshable and count < 10:
cmd = ('ssh -o ConnectTimeout=30 -o ConnectionAttempts=30'
' root@%s echo Testing' % dut_ip)
result =
is_sshable = 'Testing' in result.stdout
count += 1
return is_sshable
def verify_software(self):
    """Create the autodir then do standard verify."""
    # In case cleanup or powerwash wiped the autodir, create an empty
    # directory.
    # Removing this mkdir command will result in the disk size check
    # not being performed.
    self.run('mkdir -p %s' % MOBLAB_AUTODIR)
    super(MoblabHost, self).verify_software()
def _verify_upstart_service(self, service, timeout_m):
    """Verify that the given moblab service is running.

    The check is retried every 10 seconds for up to timeout_m minutes.

    @param service: The upstart service to check for.
    @param timeout_m: Timeout (in minutes) before giving up.

    @raises TimeoutException or UpstartServiceNotRunning if service isn't
            running.
    """
    @retry.retry(error.AutoservError, timeout_min=timeout_m, delay_sec=10)
    def _verify():
        if not self.upstart_status(service):
            raise UpstartServiceNotRunning(service)

    # Defining the retried check is not enough; it has to be invoked.
    _verify()
def verify_moblab_services(self, timeout_m):
    """Verify the required Moblab services are up and running.

    Checks every upstart service in MOBLAB_SERVICES, then checks with pgrep
    that every process in MOBLAB_PROCESSES exists.

    @param timeout_m: Timeout (in minutes) for how long to wait for services
            to start. Actual time taken may be slightly more than this.

    @raises AutoservError if any moblab service is not running.
    """
    service = MOBLAB_SERVICES[0]
    try:
        # First service can take a long time to start, especially on first
        # boot where container setup can take 5-10 minutes, depending on the
        # device.
        self._verify_upstart_service(service, timeout_m)
    except error.TimeoutException:
        raise UpstartServiceNotRunning(service)

    for service in MOBLAB_SERVICES[1:]:
        try:
            # Follow up services should come up quickly.
            self._verify_upstart_service(service, 0.5)
        except error.TimeoutException:
            raise UpstartServiceNotRunning(service)

    for process in MOBLAB_PROCESSES:
        try:
            self.run('pgrep %s' % process)
        except error.AutoservRunError:
            raise error.AutoservError('Moblab process: %s is not running.'
                                      % process)
def _check_afe(self):
"""Verify whether afe of moblab works before verifying its DUTs.
Verifying moblab sometimes happens after a successful provision, in
which case moblab is restarted but tunnel of afe is not re-connected.
This func is used to check whether afe is working now.
@return True if afe works.
@raises error.AutoservError if AFE is down; other exceptions are passed
except (error.TimeoutException, timeout_util.TimeoutError) as e:
raise error.AutoservError('Moblab AFE is not responding: %s' %
except Exception as e:
logging.error('Unknown exception when checking moblab AFE: %s', e)
return True
def verify_duts(self):
    """Schedule reverification of the Moblab DUTs.

    Asks the AFE to reverify its hosts and logs what it returned. This
    method itself does not wait for the result and does not check how many
    DUTs end up in the Ready state.
    """
    hosts = self.afe.reverify_hosts()
    logging.debug('DUTs scheduled for reverification: %s', hosts)
def verify_special_tasks_complete(self):
    """Wait till the special tasks on the moblab host are complete.

    Polls the AFE for incomplete special tasks every DUT_VERIFY_SLEEP_SECS
    seconds until none are left or DUT_VERIFY_TIMEOUT seconds have been
    spent waiting, then checks that at least one DUT is Ready.

    @raises AutoservError if no DUTs are in the Ready State.
    """
    total_time = 0
    while (self.afe.get_special_tasks(is_complete=False) and
           total_time < DUT_VERIFY_TIMEOUT):
        total_time = total_time + DUT_VERIFY_SLEEP_SECS
        # Actually wait between polls; without the sleep the loop hammers
        # the AFE and total_time does not measure elapsed time.
        time.sleep(DUT_VERIFY_SLEEP_SECS)
    if not self.afe.get_hosts(status='Ready'):
        for host in self.afe.get_hosts():
            logging.error('DUT: %s Status: %s', host, host.status)
        raise error.AutoservError('Moblab has 0 Ready DUTs')
def get_platform(self):
    """Determine the correct platform label for this host.

    For Moblab devices '_moblab' is appended to the platform reported by
    the superclass.

    @returns a string representing this host's platform.
    """
    return super(MoblabHost, self).get_platform() + '_moblab'
def make_tmp_dir(self, base=MOBLAB_TMP_DIR):
    """Creates a temporary directory.

    @param base: The directory where it should be created.

    @return Path to a newly created temporary directory.
    """
    # Make sure the parent exists before asking mktemp to create inside it.
    self.run('mkdir -p %s' % base)
    return self.run('mktemp -d -p %s' % base).stdout.strip()
def get_os_type(self):
    """Return the OS type string for this host, which is always 'moblab'."""
    os_type = 'moblab'
    return os_type