blob: 308449d176e6ecc40bdd37355e9f9a2bcca0a9e6 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import compiler, hashlib, logging, os, re, traceback, signal
import common
from autotest_lib.client.common_lib import base_job, control_data
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import control_file_getter
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server.cros.dynamic_suite import host_lock_manager, job_status
from autotest_lib.server.cros.dynamic_suite.job_status import Status
from autotest_lib.server import frontend
class Suite(object):
A suite of tests, defined by some predicate over control file variables.
Given a place to search for control files a predicate to match the desired
tests, can gather tests and fire off jobs to run them, and then wait for
@var _predicate: a function that should return True when run over a
ControlData representation of a control file that should be in
this Suite.
@var _tag: a string with which to tag jobs run in this suite.
@var _build: the build on which we're running this suite.
@var _afe: an instance of AFE as defined in server/
@var _tko: an instance of TKO as defined in server/
@var _jobs: currently scheduled jobs, if any.
@var _cf_getter: a control_file_getter.ControlFileGetter
def create_ds_getter(build, devserver):
@param build: the build on which we're running this suite.
@param devserver: the devserver which contains the build.
@return a FileSystemGetter instance that looks under |autotest_dir|.
return control_file_getter.DevServerGetter(build, devserver)
def create_fs_getter(autotest_dir):
@param autotest_dir: the place to find autotests.
@return a FileSystemGetter instance that looks under |autotest_dir|.
# currently hard-coded places to look for tests.
subpaths = ['server/site_tests', 'client/site_tests',
'server/tests', 'client/tests']
directories = [os.path.join(autotest_dir, p) for p in subpaths]
return control_file_getter.FileSystemGetter(directories)
def parse_tag(tag):
"""Splits a string on ',' optionally surrounded by whitespace."""
return map(lambda x: x.strip(), tag.split(','))
def name_in_tag_predicate(name):
"""Returns predicate that takes a control file and looks for |name|.
Builds a predicate that takes in a parsed control file (a ControlData)
and returns True if the SUITE tag is present and contains |name|.
@param name: the suite name to base the predicate on.
@return a callable that takes a ControlData and looks for |name| in that
ControlData object's suite member.
return lambda t: hasattr(t, 'suite') and \
name in Suite.parse_tag(t.suite)
def list_all_suites(build, devserver, cf_getter=None):
Parses all ControlData objects with a SUITE tag and extracts all
defined suite names.
@param build: the build on which we're running this suite.
@param devserver: the devserver which contains the build.
@param cf_getter: control_file_getter.ControlFileGetter. Defaults to
using DevServerGetter.
@return list of suites
if cf_getter is None:
cf_getter = Suite.create_ds_getter(build, devserver)
suites = set()
predicate = lambda t: hasattr(t, 'suite')
for test in Suite.find_and_parse_tests(cf_getter, predicate,
return list(suites)
def create_from_name(name, build, devserver, cf_getter=None, afe=None,
tko=None, pool=None, results_dir=None,
Create a Suite using a predicate based on the SUITE control file var.
Makes a predicate based on |name| and uses it to instantiate a Suite
that looks for tests in |autotest_dir| and will schedule them using
|afe|. Pulls control files from the default dev server.
Results will be pulled from |tko| upon completion.
@param name: a value of the SUITE control file variable to search for.
@param build: the build on which we're running this suite.
@param devserver: the devserver which contains the build.
@param cf_getter: a control_file_getter.ControlFileGetter.
If None, default to using a DevServerGetter.
@param afe: an instance of AFE as defined in server/
@param tko: an instance of TKO as defined in server/
@param pool: Specify the pool of machines to use for scheduling
@param results_dir: The directory where the job can write results to.
This must be set if you want job_id of sub-jobs
list in the job keyvals.
@return a Suite instance.
if cf_getter is None:
cf_getter = Suite.create_ds_getter(build, devserver)
return Suite(Suite.name_in_tag_predicate(name),
name, build, cf_getter, afe, tko, pool, results_dir)
def create_from_name_and_blacklist(name, blacklist, build, devserver,
cf_getter=None, afe=None, tko=None,
pool=None, results_dir=None,
Create a Suite using a predicate based on the SUITE control file var.
Makes a predicate based on |name| and uses it to instantiate a Suite
that looks for tests in |autotest_dir| and will schedule them using
|afe|. Pulls control files from the default dev server.
Results will be pulled from |tko| upon completion.
@param name: a value of the SUITE control file variable to search for.
@param blacklist: iterable of control file paths to skip.
@param build: the build on which we're running this suite.
@param devserver: the devserver which contains the build.
@param cf_getter: a control_file_getter.ControlFileGetter.
If None, default to using a DevServerGetter.
@param afe: an instance of AFE as defined in server/
@param tko: an instance of TKO as defined in server/
@param pool: Specify the pool of machines to use for scheduling
@param results_dir: The directory where the job can write results to.
This must be set if you want job_id of sub-jobs
list in the job keyvals.
@return a Suite instance.
if cf_getter is None:
cf_getter = Suite.create_ds_getter(build, devserver)
def in_tag_not_in_blacklist_predicate(test):
return (Suite.name_in_tag_predicate(name)(test) and
hasattr(test, 'path') and
True not in [b.endswith(test.path) for b in blacklist])
return Suite(in_tag_not_in_blacklist_predicate,
name, build, cf_getter, afe, tko, pool, results_dir,
def __init__(self, predicate, tag, build, cf_getter, afe=None, tko=None,
pool=None, results_dir=None, max_runtime_mins=24*60):
@param predicate: a function that should return True when run over a
ControlData representation of a control file that should be in
this Suite.
@param tag: a string with which to tag jobs run in this suite.
@param build: the build on which we're running this suite.
@param cf_getter: a control_file_getter.ControlFileGetter
@param afe: an instance of AFE as defined in server/
@param tko: an instance of TKO as defined in server/
@param pool: Specify the pool of machines to use for scheduling
@param results_dir: The directory where the job can write results to.
This must be set if you want job_id of sub-jobs
list in the job keyvals.
self._predicate = predicate
self._tag = tag
self._build = build
self._cf_getter = cf_getter
self._results_dir = results_dir
self._afe = afe or frontend_wrappers.RetryingAFE(timeout_min=30,
self._tko = tko or frontend_wrappers.RetryingTKO(timeout_min=30,
self._pool = pool
self._jobs = []
self._tests = Suite.find_and_parse_tests(self._cf_getter,
self._max_runtime_mins = max_runtime_mins
def tests(self):
A list of ControlData objects in the suite, with added |text| attr.
return self._tests
def stable_tests(self):
|self.tests|, filtered for non-experimental tests.
return filter(lambda t: not t.experimental, self.tests)
def unstable_tests(self):
|self.tests|, filtered for experimental tests.
return filter(lambda t: t.experimental, self.tests)
def _create_job(self, test):
Thin wrapper around frontend.AFE.create_job().
@param test: ControlData object for a test to run.
@return a frontend.Job object with an added test_name member.
test_name is used to preserve the higher level TEST_NAME
name of the job.
job_deps = list(test.dependencies)
if self._pool:
meta_hosts = self._pool
cros_label = constants.VERSION_PREFIX + self._build
# No pool specified use any machines with the following label.
meta_hosts = constants.VERSION_PREFIX + self._build
test_obj = self._afe.create_job(
name='/'.join([self._build, self._tag,]),
keyvals={constants.JOB_BUILD_KEY: self._build,
constants.JOB_SUITE_KEY: self._tag},
setattr(test_obj, 'test_name',
return test_obj
def run_and_wait(self, record, manager, add_experimental=True):
Synchronously run tests in |self.tests|.
Schedules tests against a device running image |self._build|, and
then polls for status, using |record| to print status when each
Tests returned by self.stable_tests() will always be run, while tests
in self.unstable_tests() will only be run if |add_experimental| is true.
@param record: callable that records job status.
@param manager: a populated HostLockManager instance to handle
unlocking DUTs that we already reimaged.
@param add_experimental: schedule experimental tests as well, or not.
logging.debug('Discovered %d stable tests.', len(self.stable_tests()))
logging.debug('Discovered %d unstable tests.',
Status('INFO', 'Start %s' % self._tag).record_result(record)
# Unlock all hosts, so test jobs can be run on them.
for result in job_status.wait_for_results(self._afe,
if (self._results_dir and
except Exception as e:
Status('FAIL', self._tag,
'Exception waiting for results').record_result(record)
except Exception as e:
Status('FAIL', self._tag,
'Exception while scheduling suite').record_result(record)
def schedule(self, add_experimental=True):
Schedule jobs using |self._afe|.
frontend.Job objects representing each scheduled job will be put in
@param add_experimental: schedule experimental tests as well, or not.
for test in self.stable_tests():
logging.debug('Scheduling %s',
if add_experimental:
for test in self.unstable_tests():
logging.debug('Scheduling experimental %s', = constants.EXPERIMENTAL_PREFIX +
if self._results_dir:
def _remember_scheduled_job_ids(self):
Record scheduled job ids as keyvals, so they can be referenced later.
for job in self._jobs:
def _remember_provided_job_id(self, job):
Record provided job as a suite job keyval, for later referencing.
@param job: some representation of a job, including id, test_name
and owner
if and job.owner and job.test_name:
job_id_owner = '%s-%s' % (, job.owner)
logging.debug('Adding job keyval for %s=%s',
job.test_name, job_id_owner)
{hashlib.md5(job.test_name).hexdigest(): job_id_owner})
def find_and_parse_tests(cf_getter, predicate, add_experimental=False):
Function to scan through all tests and find eligible tests.
Looks at control files returned by _cf_getter.get_control_file_list()
for tests that pass self._predicate().
@param cf_getter: a control_file_getter.ControlFileGetter used to list
and fetch the content of control files
@param predicate: a function that should return True when run over a
ControlData representation of a control file that should be in
this Suite.
@param add_experimental: add tests with experimental attribute set.
@return list of ControlData objects that should be run, with control
file text added in |text| attribute.
tests = {}
files = cf_getter.get_control_file_list()
matcher = re.compile(r'[^/]+/(deps|profilers)/.+')
for file in filter(lambda f: not matcher.match(f), files):
logging.debug('Considering %s', file)
text = cf_getter.get_control_file_contents(file)
found_test = control_data.parse_control_string(
text, raise_warnings=True)
if not add_experimental and found_test.experimental:
found_test.text = text
found_test.path = file
tests[file] = found_test
except control_data.ControlVariableException, e:
logging.warn("Skipping %s\n%s", file, e)
except Exception, e:
logging.error("Bad %s\n%s", file, e)
return [test for test in tests.itervalues() if predicate(test)]