# blob: 308449d176e6ecc40bdd37355e9f9a2bcca0a9e6 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import compiler, hashlib, logging, os, re, traceback, signal
import common
from autotest_lib.client.common_lib import base_job, control_data
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import control_file_getter
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server.cros.dynamic_suite import host_lock_manager, job_status
from autotest_lib.server.cros.dynamic_suite.job_status import Status
from autotest_lib.server import frontend
class Suite(object):
    """
    A suite of tests, defined by some predicate over control file variables.

    Given a place to search for control files and a predicate to match the
    desired tests, can gather tests and fire off jobs to run them, and then
    wait for results.

    @var _predicate: a function that should return True when run over a
         ControlData representation of a control file that should be in
         this Suite.
    @var _tag: a string with which to tag jobs run in this suite.
    @var _build: the build on which we're running this suite.
    @var _afe: an instance of AFE as defined in server/frontend.py.
    @var _tko: an instance of TKO as defined in server/frontend.py.
    @var _jobs: currently scheduled jobs, if any.
    @var _cf_getter: a control_file_getter.ControlFileGetter
    @var _pool: the pool of machines to schedule against, if any.
    @var _results_dir: directory where job keyvals are written, if any.
    @var _tests: the ControlData objects selected by |_predicate|.
    @var _max_runtime_mins: value passed as max_runtime_mins to every job
         created by this suite.
    """


    @staticmethod
    def create_ds_getter(build, devserver):
        """
        @param build: the build on which we're running this suite.
        @param devserver: the devserver which contains the build.
        @return a DevServerGetter instance built from |build| and
                |devserver|.
        """
        return control_file_getter.DevServerGetter(build, devserver)


    @staticmethod
    def create_fs_getter(autotest_dir):
        """
        @param autotest_dir: the place to find autotests.
        @return a FileSystemGetter instance that looks under |autotest_dir|.
        """
        # currently hard-coded places to look for tests.
        subpaths = ['server/site_tests', 'client/site_tests',
                    'server/tests', 'client/tests']
        directories = [os.path.join(autotest_dir, p) for p in subpaths]
        return control_file_getter.FileSystemGetter(directories)


    @staticmethod
    def parse_tag(tag):
        """Splits a string on ',' optionally surrounded by whitespace.

        @param tag: a comma-separated string, e.g. 'bvt, smoke'.
        @return list of the pieces of |tag| with surrounding whitespace
                stripped.
        """
        return [piece.strip() for piece in tag.split(',')]


    @staticmethod
    def name_in_tag_predicate(name):
        """Returns predicate that takes a control file and looks for |name|.

        Builds a predicate that takes in a parsed control file (a ControlData)
        and returns True if the SUITE tag is present and contains |name|.

        @param name: the suite name to base the predicate on.
        @return a callable that takes a ControlData and looks for |name| in that
                ControlData object's suite member.
        """
        def in_tag(test):
            return (hasattr(test, 'suite') and
                    name in Suite.parse_tag(test.suite))
        return in_tag


    @staticmethod
    def list_all_suites(build, devserver, cf_getter=None):
        """
        Parses all ControlData objects with a SUITE tag and extracts all
        defined suite names.

        @param build: the build on which we're running this suite.
        @param devserver: the devserver which contains the build.
        @param cf_getter: control_file_getter.ControlFileGetter. Defaults to
                          using DevServerGetter.
        @return list of suites
        """
        if cf_getter is None:
            cf_getter = Suite.create_ds_getter(build, devserver)

        suites = set()
        predicate = lambda t: hasattr(t, 'suite')
        for test in Suite.find_and_parse_tests(cf_getter, predicate,
                                               add_experimental=True):
            suites.update(Suite.parse_tag(test.suite))
        return list(suites)


    @staticmethod
    def create_from_name(name, build, devserver, cf_getter=None, afe=None,
                         tko=None, pool=None, results_dir=None,
                         max_runtime_mins=24*60):
        """
        Create a Suite using a predicate based on the SUITE control file var.

        Makes a predicate based on |name| and uses it to instantiate a Suite
        that finds tests through |cf_getter| and will schedule them using
        |afe|.  Results will be pulled from |tko| upon completion.

        @param name: a value of the SUITE control file variable to search for.
        @param build: the build on which we're running this suite.
        @param devserver: the devserver which contains the build.
        @param cf_getter: a control_file_getter.ControlFileGetter.
                          If None, default to using a DevServerGetter.
        @param afe: an instance of AFE as defined in server/frontend.py.
        @param tko: an instance of TKO as defined in server/frontend.py.
        @param pool: Specify the pool of machines to use for scheduling
                     purposes.
        @param results_dir: The directory where the job can write results to.
                            This must be set if you want job_id of sub-jobs
                            list in the job keyvals.
        @param max_runtime_mins: passed to the Suite constructor, which hands
                                 it to every job it creates.
        @return a Suite instance.
        """
        if cf_getter is None:
            cf_getter = Suite.create_ds_getter(build, devserver)

        # max_runtime_mins must be forwarded explicitly; otherwise the
        # constructor default silently overrides the caller's value.
        return Suite(Suite.name_in_tag_predicate(name),
                     name, build, cf_getter, afe, tko, pool, results_dir,
                     max_runtime_mins)


    @staticmethod
    def create_from_name_and_blacklist(name, blacklist, build, devserver,
                                       cf_getter=None, afe=None, tko=None,
                                       pool=None, results_dir=None,
                                       max_runtime_mins=24*60):
        """
        Create a Suite using a predicate based on the SUITE control file var.

        Makes a predicate based on |name| and |blacklist| and uses it to
        instantiate a Suite that finds tests through |cf_getter| and will
        schedule them using |afe|.  Results will be pulled from |tko| upon
        completion.

        @param name: a value of the SUITE control file variable to search for.
        @param blacklist: iterable of control file paths to skip.
        @param build: the build on which we're running this suite.
        @param devserver: the devserver which contains the build.
        @param cf_getter: a control_file_getter.ControlFileGetter.
                          If None, default to using a DevServerGetter.
        @param afe: an instance of AFE as defined in server/frontend.py.
        @param tko: an instance of TKO as defined in server/frontend.py.
        @param pool: Specify the pool of machines to use for scheduling
                     purposes.
        @param results_dir: The directory where the job can write results to.
                            This must be set if you want job_id of sub-jobs
                            list in the job keyvals.
        @param max_runtime_mins: passed to the Suite constructor, which hands
                                 it to every job it creates.
        @return a Suite instance.
        """
        if cf_getter is None:
            cf_getter = Suite.create_ds_getter(build, devserver)

        in_tag = Suite.name_in_tag_predicate(name)

        def in_tag_not_in_blacklist_predicate(test):
            # A test is blacklisted when any blacklist entry ends with the
            # test's control file path.
            return (in_tag(test) and
                    hasattr(test, 'path') and
                    not any(b.endswith(test.path) for b in blacklist))

        return Suite(in_tag_not_in_blacklist_predicate,
                     name, build, cf_getter, afe, tko, pool, results_dir,
                     max_runtime_mins)


    def __init__(self, predicate, tag, build, cf_getter, afe=None, tko=None,
                 pool=None, results_dir=None, max_runtime_mins=24*60):
        """
        Constructor

        Finds and parses the matching tests immediately, experimental ones
        included; whether experimental tests are run is decided at
        scheduling time.

        @param predicate: a function that should return True when run over a
               ControlData representation of a control file that should be in
               this Suite.
        @param tag: a string with which to tag jobs run in this suite.
        @param build: the build on which we're running this suite.
        @param cf_getter: a control_file_getter.ControlFileGetter
        @param afe: an instance of AFE as defined in server/frontend.py.
                    If falsy, a RetryingAFE is created.
        @param tko: an instance of TKO as defined in server/frontend.py.
                    If falsy, a RetryingTKO is created.
        @param pool: Specify the pool of machines to use for scheduling
                     purposes.
        @param results_dir: The directory where the job can write results to.
                            This must be set if you want job_id of sub-jobs
                            list in the job keyvals.
        @param max_runtime_mins: passed as max_runtime_mins to every job this
                                 suite creates.
        """
        self._predicate = predicate
        self._tag = tag
        self._build = build
        self._cf_getter = cf_getter
        self._results_dir = results_dir
        self._afe = afe or frontend_wrappers.RetryingAFE(timeout_min=30,
                                                         delay_sec=10,
                                                         debug=False)
        self._tko = tko or frontend_wrappers.RetryingTKO(timeout_min=30,
                                                         delay_sec=10,
                                                         debug=False)
        self._pool = pool
        self._jobs = []
        self._max_runtime_mins = max_runtime_mins
        self._tests = Suite.find_and_parse_tests(self._cf_getter,
                                                 self._predicate,
                                                 add_experimental=True)


    @property
    def tests(self):
        """
        A list of ControlData objects in the suite, with added |text| attr.
        """
        return self._tests


    def stable_tests(self):
        """
        |self.tests|, filtered for non-experimental tests.

        @return a new list of the tests whose |experimental| attr is falsy.
        """
        return [t for t in self.tests if not t.experimental]


    def unstable_tests(self):
        """
        |self.tests|, filtered for experimental tests.

        @return a new list of the tests whose |experimental| attr is truthy.
        """
        return [t for t in self.tests if t.experimental]


    def _create_job(self, test):
        """
        Thin wrapper around frontend.AFE.create_job().

        @param test: ControlData object for a test to run.
        @return a frontend.Job object with an added test_name member.
                test_name is used to preserve the higher level TEST_NAME
                name of the job.
        """
        job_deps = list(test.dependencies)
        build_label = constants.VERSION_PREFIX + self._build
        if self._pool:
            # Schedule against the pool and require the build label as an
            # additional dependency.
            meta_hosts = self._pool
            job_deps.append(build_label)
        else:
            # No pool specified use any machines with the following label.
            meta_hosts = build_label
        test_obj = self._afe.create_job(
            control_file=test.text,
            name='/'.join([self._build, self._tag, test.name]),
            control_type=test.test_type.capitalize(),
            meta_hosts=[meta_hosts],
            dependencies=job_deps,
            keyvals={constants.JOB_BUILD_KEY: self._build,
                     constants.JOB_SUITE_KEY: self._tag},
            max_runtime_mins=self._max_runtime_mins)

        test_obj.test_name = test.name
        return test_obj


    def run_and_wait(self, record, manager, add_experimental=True):
        """
        Synchronously run tests in |self.tests|.

        Schedules tests against a device running image |self._build|, and
        then polls for status, using |record| to print status when each
        completes.

        Tests returned by self.stable_tests() will always be run, while tests
        in self.unstable_tests() will only be run if |add_experimental| is true.

        Exceptions are not propagated: they are logged and recorded through
        |record| as a FAIL status for the suite.

        @param record: callable that records job status.
                 prototype:
                   record(base_job.status_log_entry)
        @param manager: a populated HostLockManager instance to handle
                        unlocking DUTs that we already reimaged.
        @param add_experimental: schedule experimental tests as well, or not.
        """
        logging.debug('Discovered %d stable tests.', len(self.stable_tests()))
        logging.debug('Discovered %d unstable tests.',
                      len(self.unstable_tests()))
        try:
            Status('INFO', 'Start %s' % self._tag).record_result(record)
            self.schedule(add_experimental)
            # Unlock all hosts, so test jobs can be run on them.
            manager.unlock()
            try:
                for result in job_status.wait_for_results(self._afe,
                                                          self._tko,
                                                          self._jobs):
                    result.record_all(record)
                    if (self._results_dir and
                        job_status.is_for_infrastructure_fail(result)):
                        self._remember_provided_job_id(result)
            except Exception:
                logging.error(traceback.format_exc())
                Status('FAIL', self._tag,
                       'Exception waiting for results').record_result(record)
        except Exception:
            logging.error(traceback.format_exc())
            Status('FAIL', self._tag,
                   'Exception while scheduling suite').record_result(record)


    def schedule(self, add_experimental=True):
        """
        Schedule jobs using |self._afe|.

        frontend.Job objects representing each scheduled job will be put in
        |self._jobs|.

        @param add_experimental: schedule experimental tests as well, or not.
        """
        for test in self.stable_tests():
            logging.debug('Scheduling %s', test.name)
            self._jobs.append(self._create_job(test))

        if add_experimental:
            for test in self.unstable_tests():
                logging.debug('Scheduling experimental %s', test.name)
                # NOTE: this renames the ControlData object held in
                # |self.tests| in place, so the prefix remains visible to
                # callers after scheduling.
                test.name = constants.EXPERIMENTAL_PREFIX + test.name
                self._jobs.append(self._create_job(test))
        if self._results_dir:
            self._remember_scheduled_job_ids()


    def _remember_scheduled_job_ids(self):
        """
        Record scheduled job ids as keyvals, so they can be referenced later.
        """
        for job in self._jobs:
            self._remember_provided_job_id(job)


    def _remember_provided_job_id(self, job):
        """
        Record provided job as a suite job keyval, for later referencing.

        The keyval is written to |self._results_dir|; its key is the md5 hex
        digest of the job's test_name and its value is '<id>-<owner>'.
        Nothing is written if any of id, owner or test_name is falsy.

        @param job: some representation of a job, including id, test_name
                    and owner
        """
        if job.id and job.owner and job.test_name:
            job_id_owner = '%s-%s' % (job.id, job.owner)
            logging.debug('Adding job keyval for %s=%s',
                          job.test_name, job_id_owner)
            utils.write_keyval(
                self._results_dir,
                {hashlib.md5(job.test_name).hexdigest(): job_id_owner})


    @staticmethod
    def find_and_parse_tests(cf_getter, predicate, add_experimental=False):
        """
        Function to scan through all tests and find eligible tests.

        Looks at control files returned by cf_getter.get_control_file_list()
        for tests that pass predicate().  Control files under a deps/ or
        profilers/ directory are ignored.  Files that fail to parse are
        logged and skipped.

        @param cf_getter: a control_file_getter.ControlFileGetter used to list
               and fetch the content of control files
        @param predicate: a function that should return True when run over a
               ControlData representation of a control file that should be in
               this Suite.
        @param add_experimental: add tests with experimental attribute set.

        @return list of ControlData objects that should be run, with control
                file text added in |text| attribute and the control file path
                added in |path| attribute.
        """
        tests = {}
        matcher = re.compile(r'[^/]+/(deps|profilers)/.+')
        for control_path in cf_getter.get_control_file_list():
            if matcher.match(control_path):
                continue
            logging.debug('Considering %s', control_path)
            text = cf_getter.get_control_file_contents(control_path)
            try:
                found_test = control_data.parse_control_string(
                        text, raise_warnings=True)
                if not add_experimental and found_test.experimental:
                    continue

                found_test.text = text
                found_test.path = control_path
                tests[control_path] = found_test
            except control_data.ControlVariableException as e:
                logging.warning("Skipping %s\n%s", control_path, e)
            except Exception as e:
                logging.error("Bad %s\n%s", control_path, e)

        return [test for test in tests.values() if predicate(test)]