blob: d1a1e83360a3c3f8f50ce50d6a4c1904dd63fd1a [file] [log] [blame]
#!/usr/bin/python2
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for server/cros/dynamic_suite/dynamic_suite.py."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
from collections import OrderedDict
import mock
import mox
import os
import six
from six.moves import range
from six.moves import zip
import shutil
import tempfile
import unittest
import common
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.server import frontend
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import control_file_getter
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import job_status
from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase
from autotest_lib.server.cros.dynamic_suite import suite_common
from autotest_lib.server.cros.dynamic_suite.comparators import StatusContains
from autotest_lib.server.cros.dynamic_suite.fakes import FakeControlData
from autotest_lib.server.cros.dynamic_suite.fakes import FakeJob
from autotest_lib.server.cros.dynamic_suite.fakes import FakeMultiprocessingPool
from autotest_lib.server.cros.dynamic_suite.suite import RetryHandler
from autotest_lib.server.cros.dynamic_suite.suite import Suite
class SuiteTest(mox.MoxTestBase):
    """Unit tests for dynamic_suite Suite class.

    @var _BOARD: fake board label, used as meta host and as a dependency.
    @var _BUILDS: fake build
    @var _TAG: fake suite tag
    @var _ATTR: fake attribute set given to most of the fake control files.
    @var _DEVSERVER_HOST: fake devserver URL.
    @var _FAKE_JOB_ID: job id used by the retry tests.
    """

    _BOARD = 'board:board'
    _BUILDS = {provision.CROS_VERSION_PREFIX: 'build_1',
               provision.FW_RW_VERSION_PREFIX: 'fwrw_build_1'}
    _TAG = 'au'
    _ATTR = {'attr:attr'}
    _DEVSERVER_HOST = 'http://dontcare:8080'
    _FAKE_JOB_ID = 10

    def setUp(self):
        """Setup."""
        super(SuiteTest, self).setUp()
        self.maxDiff = None
        # Remember the real flag value so tearDown() can put it back; tests
        # run with batch mode off unless they switch it on themselves.
        self.use_batch = suite_common.ENABLE_CONTROLS_IN_BATCH
        suite_common.ENABLE_CONTROLS_IN_BATCH = False
        self.afe = self.mox.CreateMock(frontend.AFE)
        self.tko = self.mox.CreateMock(frontend.TKO)

        self.tmpdir = tempfile.mkdtemp(suffix=type(self).__name__)

        self.getter = self.mox.CreateMock(control_file_getter.ControlFileGetter)
        self.devserver = dev_server.ImageServer(self._DEVSERVER_HOST)

        self.files = OrderedDict(
                [('one', FakeControlData(self._TAG, self._ATTR, 'data_one',
                                         'FAST', job_retries=None)),
                 ('two', FakeControlData(self._TAG, self._ATTR, 'data_two',
                                         'SHORT', dependencies=['feta'])),
                 ('three', FakeControlData(self._TAG, self._ATTR,
                                           'data_three', 'MEDIUM')),
                 ('four', FakeControlData('other', self._ATTR, 'data_four',
                                          'LONG', dependencies=['arugula'])),
                 ('five', FakeControlData(self._TAG, {'other'}, 'data_five',
                                          'LONG', dependencies=['arugula',
                                                                'caligula'])),
                 ('six', FakeControlData(self._TAG, self._ATTR, 'data_six',
                                         'LENGTHY')),
                 ('seven', FakeControlData(self._TAG, self._ATTR,
                                           'data_seven', 'FAST',
                                           job_retries=1))])

        self.files_to_filter = {
                'with/deps/...': FakeControlData(self._TAG, self._ATTR,
                                                 'gets filtered'),
                'with/profilers/...': FakeControlData(self._TAG, self._ATTR,
                                                      'gets filtered')}

    def tearDown(self):
        """Teardown."""
        suite_common.ENABLE_CONTROLS_IN_BATCH = self.use_batch
        super(SuiteTest, self).tearDown()
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def expect_control_file_parsing(self, suite_name=_TAG):
        """Expect an attempt to parse the 'control files' in |self.files|.

        @param suite_name: The suite name to parse control files for.
        """
        all_files = list(self.files.keys()) + list(self.files_to_filter.keys())
        self._set_control_file_parsing_expectations(False, all_files,
                                                    self.files, suite_name)

    def _set_control_file_parsing_expectations(self, already_stubbed,
                                               file_list, files_to_parse,
                                               suite_name):
        """Expect an attempt to parse the 'control files' in |files|.

        @param already_stubbed: parse_control_string already stubbed out.
        @param file_list: the files the dev server returns
        @param files_to_parse: the {'name': FakeControlData} dict of files we
                               expect to get parsed.
        @param suite_name: The suite name passed to get_control_file_list.
        """
        if not already_stubbed:
            self.mox.StubOutWithMock(control_data, 'parse_control_string')
            self.mox.StubOutWithMock(suite_common.multiprocessing, 'Pool')
        suite_common.multiprocessing.Pool(
                processes=suite_common.get_process_limit()).AndReturn(
                        FakeMultiprocessingPool())

        self.getter.get_control_file_list(
                suite_name=suite_name).AndReturn(file_list)
        for path, data in six.iteritems(files_to_parse):
            self.getter.get_control_file_contents(
                    path).InAnyOrder().AndReturn(data.string)
            control_data.parse_control_string(
                    data.string,
                    raise_warnings=True,
                    path=path).InAnyOrder().AndReturn(data)

    def expect_control_file_parsing_in_batch(self, suite_name=_TAG):
        """Expect an attempt to parse the contents of all control files in
        |self.files| and |self.files_to_filter|, form them to a dict.

        @param suite_name: The suite name to parse control files for.
        """
        self.getter = self.mox.CreateMock(control_file_getter.DevServerGetter)
        self.mox.StubOutWithMock(control_data, 'parse_control_string')
        self.mox.StubOutWithMock(suite_common.multiprocessing, 'Pool')
        suite_common.multiprocessing.Pool(
                processes=suite_common.get_process_limit()).AndReturn(
                        FakeMultiprocessingPool())
        suite_info = {}
        for k, v in six.iteritems(self.files):
            suite_info[k] = v.string
            control_data.parse_control_string(
                    v.string,
                    raise_warnings=True,
                    path=k).InAnyOrder().AndReturn(v)
        # The filtered files are returned by the getter but no parse call is
        # expected for them.
        for k, v in six.iteritems(self.files_to_filter):
            suite_info[k] = v.string
        self.getter._dev_server = self._DEVSERVER_HOST
        self.getter.get_suite_info(
                suite_name=suite_name).AndReturn(suite_info)

    def testFindAllTestInBatch(self):
        """Test switch on enable_getting_controls_in_batch for function
        find_all_test."""
        # setUp() already saved the original flag value in self.use_batch and
        # tearDown() restores it, so the flag only needs to be flipped here.
        self.expect_control_file_parsing_in_batch()
        suite_common.ENABLE_CONTROLS_IN_BATCH = True

        self.mox.ReplayAll()

        predicate = lambda d: d.suite == self._TAG
        tests = SuiteBase.find_and_parse_tests(self.getter,
                                               predicate,
                                               self._TAG)
        self.assertEqual(len(tests), 6)
        for key in ('one', 'two', 'three', 'five', 'six', 'seven'):
            self.assertIn(self.files[key], tests)

    def testFindAndParseStableTests(self):
        """Should find only tests that match a predicate."""
        self.expect_control_file_parsing()
        self.mox.ReplayAll()

        predicate = lambda d: d.text == self.files['two'].string
        tests = SuiteBase.find_and_parse_tests(self.getter,
                                               predicate,
                                               self._TAG)
        self.assertEqual(len(tests), 1)
        self.assertEqual(tests[0], self.files['two'])

    def testFindSuiteSyntaxErrors(self):
        """Check all control files for syntax errors.

        This test actually parses all control files in the autotest directory
        for syntax errors, by using the un-forgiving parser and pretending to
        look for all control files with the suite attribute.
        """
        autodir = os.path.abspath(
                os.path.join(os.path.dirname(__file__), '..', '..', '..'))
        fs_getter = SuiteBase.create_fs_getter(autodir)
        predicate = lambda t: hasattr(t, 'suite')
        SuiteBase.find_and_parse_tests(fs_getter, predicate,
                                       forgiving_parser=False)

    def testFindAndParseTestsSuite(self):
        """Should find all tests that match a predicate."""
        self.expect_control_file_parsing()
        self.mox.ReplayAll()

        predicate = lambda d: d.suite == self._TAG
        tests = SuiteBase.find_and_parse_tests(self.getter,
                                               predicate,
                                               self._TAG)
        self.assertEqual(len(tests), 6)
        for key in ('one', 'two', 'three', 'five', 'six', 'seven'):
            self.assertIn(self.files[key], tests)

    def testFindAndParseTestsAttr(self):
        """Should find all tests that match a predicate."""
        self.expect_control_file_parsing()
        self.mox.ReplayAll()

        predicate = SuiteBase.matches_attribute_expression_predicate(
                'attr:attr')
        tests = SuiteBase.find_and_parse_tests(self.getter,
                                               predicate,
                                               self._TAG)
        self.assertEqual(len(tests), 6)
        for key in ('one', 'two', 'three', 'four', 'six', 'seven'):
            self.assertIn(self.files[key], tests)

    def testAdHocSuiteCreation(self):
        """Should be able to schedule an ad-hoc suite by specifying
        a single test name."""
        self.expect_control_file_parsing(suite_name='ad_hoc_suite')
        self.mox.ReplayAll()
        predicate = SuiteBase.test_name_equals_predicate('name-data_five')
        suite = Suite.create_from_predicates([predicate], self._BUILDS,
                                             self._BOARD, devserver=None,
                                             cf_getter=self.getter,
                                             afe=self.afe, tko=self.tko)

        self.assertNotIn(self.files['one'], suite.tests)
        self.assertNotIn(self.files['two'], suite.tests)
        self.assertNotIn(self.files['four'], suite.tests)
        self.assertIn(self.files['five'], suite.tests)

    def mock_control_file_parsing(self):
        """Fake out find_and_parse_tests(), returning content from |self.files|.
        """
        for test in self.files.values():
            test.text = test.string  # mimic parsing.
        self.mox.StubOutWithMock(SuiteBase, 'find_and_parse_tests')
        SuiteBase.find_and_parse_tests(
                mox.IgnoreArg(),
                mox.IgnoreArg(),
                mox.IgnoreArg(),
                forgiving_parser=True,
                run_prod_code=False,
                test_args=None).AndReturn(list(self.files.values()))

    def expect_job_scheduling(self, recorder,
                              tests_to_skip=(), ignore_deps=False,
                              raises=False, suite_deps=(), suite=None,
                              extra_keyvals=None):
        """Expect jobs to be scheduled for 'tests' in |self.files|.

        @param recorder: object with a record_entry to be used to record test
                         results.
        @param tests_to_skip: [list, of, test, names] that we expect to skip.
        @param ignore_deps: If true, ignore tests' dependencies.
        @param raises: If True, expect exceptions.
        @param suite_deps: Suite level dependencies to add to every test.
        @param suite: Optional suite object; when it has a results dir, its
                      _remember_job_keyval is stubbed and expected once per
                      scheduled job.
        @param extra_keyvals: Extra keyvals set to tests.
        """
        if extra_keyvals is None:
            extra_keyvals = {}
        record_job_id = suite and suite._results_dir
        if record_job_id:
            self.mox.StubOutWithMock(suite, '_remember_job_keyval')
        recorder.record_entry(
                StatusContains.CreateFromStrings('INFO',
                                                 'Start %s' % self._TAG),
                log_in_subdir=False)
        tests = list(self.files.values())
        n = 1
        for test in tests:
            if test.name in tests_to_skip:
                continue
            dependencies = []
            if not ignore_deps:
                dependencies.extend(test.dependencies)
            if suite_deps:
                dependencies.extend(suite_deps)
            dependencies.append(self._BOARD)
            build = self._BUILDS[provision.CROS_VERSION_PREFIX]
            keyvals = {
                    'build': build,
                    'suite': self._TAG,
                    'builds': SuiteTest._BUILDS,
                    'experimental': test.experimental,
            }
            keyvals.update(extra_keyvals)
            job_mock = self.afe.create_job(
                    control_file=test.text,
                    name=mox.And(mox.StrContains(build),
                                 mox.StrContains(test.name)),
                    control_type=mox.IgnoreArg(),
                    meta_hosts=[self._BOARD],
                    dependencies=dependencies,
                    keyvals=keyvals,
                    max_runtime_mins=24*60,
                    timeout_mins=1440,
                    parent_job_id=None,
                    reboot_before=mox.IgnoreArg(),
                    run_reset=mox.IgnoreArg(),
                    priority=priorities.Priority.DEFAULT,
                    synch_count=test.sync_count,
                    require_ssp=test.require_ssp
            )
            if raises:
                # A job that cannot be scheduled is recorded as TEST_NA.
                job_mock.AndRaise(error.NoEligibleHostException())
                recorder.record_entry(
                        StatusContains.CreateFromStrings('START', test.name),
                        log_in_subdir=False)
                recorder.record_entry(
                        StatusContains.CreateFromStrings('TEST_NA',
                                                         test.name),
                        log_in_subdir=False)
                recorder.record_entry(
                        StatusContains.CreateFromStrings('END', test.name),
                        log_in_subdir=False)
            else:
                fake_job = FakeJob(id=n)
                job_mock.AndReturn(fake_job)
                if record_job_id:
                    suite._remember_job_keyval(fake_job)
                n += 1

    def testScheduleTestsAndRecord(self):
        """Should schedule stable and experimental tests with the AFE."""
        name_list = ['name-data_one', 'name-data_two', 'name-data_three',
                     'name-data_four', 'name-data_five', 'name-data_six',
                     'name-data_seven']
        keyval_dict = {constants.SCHEDULED_TEST_COUNT_KEY: len(name_list),
                       constants.SCHEDULED_TEST_NAMES_KEY: repr(name_list)}

        self.mock_control_file_parsing()
        self.mox.ReplayAll()
        suite = Suite.create_from_name(self._TAG, self._BUILDS, self._BOARD,
                                       self.devserver,
                                       afe=self.afe, tko=self.tko,
                                       results_dir=self.tmpdir)
        self.mox.ResetAll()
        recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(recorder, suite=suite)

        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(self.tmpdir, keyval_dict)
        self.mox.ReplayAll()
        suite.schedule(recorder.record_entry)
        for job in suite._jobs:
            self.assertTrue(hasattr(job, 'test_name'))

    def testScheduleTests(self):
        """Should schedule tests with the AFE."""
        name_list = ['name-data_one', 'name-data_two', 'name-data_three',
                     'name-data_four', 'name-data_five', 'name-data_six',
                     'name-data_seven']
        keyval_dict = {constants.SCHEDULED_TEST_COUNT_KEY: len(name_list),
                       constants.SCHEDULED_TEST_NAMES_KEY: repr(name_list)}

        self.mock_control_file_parsing()
        recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(recorder)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, keyval_dict)

        self.mox.ReplayAll()
        suite = Suite.create_from_name(self._TAG, self._BUILDS, self._BOARD,
                                       self.devserver,
                                       afe=self.afe, tko=self.tko)
        suite.schedule(recorder.record_entry)

    def testScheduleTestsIgnoreDeps(self):
        """Test scheduling tests ignoring deps."""
        name_list = ['name-data_one', 'name-data_two', 'name-data_three',
                     'name-data_four', 'name-data_five', 'name-data_six',
                     'name-data_seven']
        keyval_dict = {constants.SCHEDULED_TEST_COUNT_KEY: len(name_list),
                       constants.SCHEDULED_TEST_NAMES_KEY: repr(name_list)}

        self.mock_control_file_parsing()
        recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(recorder, ignore_deps=True)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, keyval_dict)

        self.mox.ReplayAll()
        suite = Suite.create_from_name(self._TAG, self._BUILDS, self._BOARD,
                                       self.devserver,
                                       afe=self.afe, tko=self.tko,
                                       ignore_deps=True)
        suite.schedule(recorder.record_entry)

    def testScheduleUnrunnableTestsTESTNA(self):
        """Tests which fail to schedule should be TEST_NA."""
        # Since all tests will be fail to schedule, the num of scheduled tests
        # will be zero.
        name_list = []
        keyval_dict = {constants.SCHEDULED_TEST_COUNT_KEY: 0,
                       constants.SCHEDULED_TEST_NAMES_KEY: repr(name_list)}

        self.mock_control_file_parsing()
        recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(recorder, raises=True)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, keyval_dict)
        self.mox.ReplayAll()
        suite = Suite.create_from_name(self._TAG, self._BUILDS, self._BOARD,
                                       self.devserver,
                                       afe=self.afe, tko=self.tko)
        suite.schedule(recorder.record_entry)

    def testRetryMapAfterScheduling(self):
        """Test job-test and test-job mapping are correctly updated."""
        name_list = ['name-data_one', 'name-data_two', 'name-data_three',
                     'name-data_four', 'name-data_five', 'name-data_six',
                     'name-data_seven']
        keyval_dict = {constants.SCHEDULED_TEST_COUNT_KEY: len(name_list),
                       constants.SCHEDULED_TEST_NAMES_KEY: repr(name_list)}

        self.mock_control_file_parsing()
        recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(recorder)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, keyval_dict)

        # expect_job_scheduling() hands out job ids 1..N in the iteration
        # order of self.files, so walk the tests in that same order.
        expected_retry_map = {}
        for job_id, test in enumerate(self.files.values(), start=1):
            job_retries = 1 if test.job_retries is None else test.job_retries
            if job_retries > 0:
                expected_retry_map[job_id] = {
                        'state': RetryHandler.States.NOT_ATTEMPTED,
                        'retry_max': job_retries}

        self.mox.ReplayAll()
        suite = Suite.create_from_name(self._TAG, self._BUILDS, self._BOARD,
                                       self.devserver,
                                       afe=self.afe, tko=self.tko,
                                       job_retry=True)
        suite.schedule(recorder.record_entry)

        self.assertEqual(expected_retry_map, suite._retry_handler._retry_map)

    def testSuiteMaxRetries(self):
        """Test suite max retries."""
        name_list = ['name-data_one', 'name-data_two', 'name-data_three',
                     'name-data_four', 'name-data_five',
                     'name-data_six', 'name-data_seven']
        keyval_dict = {constants.SCHEDULED_TEST_COUNT_KEY: len(name_list),
                       constants.SCHEDULED_TEST_NAMES_KEY: repr(name_list)}

        self.mock_control_file_parsing()
        recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(recorder)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, keyval_dict)
        self.mox.ReplayAll()
        suite = Suite.create_from_name(self._TAG, self._BUILDS, self._BOARD,
                                       self.devserver,
                                       afe=self.afe, tko=self.tko,
                                       job_retry=True, max_retries=1)
        suite.schedule(recorder.record_entry)
        self.assertEqual(suite._retry_handler._max_retries, 1)
        # Find the job_id of the test that allows retry
        job_id = next(six.iterkeys(suite._retry_handler._retry_map))
        suite._retry_handler.add_retry(old_job_id=job_id, new_job_id=10)
        self.assertEqual(suite._retry_handler._max_retries, 0)

    def testSuiteDependencies(self):
        """Should add suite dependencies to tests scheduled."""
        name_list = ['name-data_one', 'name-data_two', 'name-data_three',
                     'name-data_four', 'name-data_five', 'name-data_six',
                     'name-data_seven']
        keyval_dict = {constants.SCHEDULED_TEST_COUNT_KEY: len(name_list),
                       constants.SCHEDULED_TEST_NAMES_KEY: repr(name_list)}

        self.mock_control_file_parsing()
        recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(recorder, suite_deps=['extra'])
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, keyval_dict)

        self.mox.ReplayAll()
        suite = Suite.create_from_name(self._TAG, self._BUILDS, self._BOARD,
                                       self.devserver, extra_deps=['extra'],
                                       afe=self.afe, tko=self.tko)
        suite.schedule(recorder.record_entry)

    def testInheritedKeyvals(self):
        """Tests should inherit some allowlisted job keyvals."""
        # Only keyvals in constants.INHERITED_KEYVALS are inherited to tests.
        job_keyvals = {
                constants.KEYVAL_CIDB_BUILD_ID: '111',
                constants.KEYVAL_CIDB_BUILD_STAGE_ID: '222',
                constants.KEYVAL_BRANCH: 'dummy_branch',
                constants.KEYVAL_BUILDER_NAME: 'model-dummy',
                constants.KEYVAL_MAIN_BUILDER_NAME: 'main-dummy',
                'your': 'name',
        }
        test_keyvals = {
                constants.KEYVAL_CIDB_BUILD_ID: '111',
                constants.KEYVAL_CIDB_BUILD_STAGE_ID: '222',
                constants.KEYVAL_BRANCH: 'dummy_branch',
                constants.KEYVAL_BUILDER_NAME: 'model-dummy',
                constants.KEYVAL_MAIN_BUILDER_NAME: 'main-dummy',
        }

        self.mock_control_file_parsing()
        recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(
                recorder,
                extra_keyvals=test_keyvals)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, job_keyvals)
        utils.write_keyval(None, mox.IgnoreArg())

        self.mox.ReplayAll()
        suite = Suite.create_from_name(self._TAG, self._BUILDS, self._BOARD,
                                       self.devserver,
                                       afe=self.afe, tko=self.tko,
                                       job_keyvals=job_keyvals)
        suite.schedule(recorder.record_entry)

    def _createSuiteWithMockedTestsAndControlFiles(self, file_bugs=False):
        """Create a Suite, using mocked tests and control file contents.

        @param file_bugs: Passed through to Suite.create_from_name.
        @return Suite object, after mocking out behavior needed to create it.
        """
        self.result_reporter = _MemoryResultReporter()
        self.expect_control_file_parsing()
        self.mox.ReplayAll()
        suite = Suite.create_from_name(
                self._TAG,
                self._BUILDS,
                self._BOARD,
                self.devserver,
                self.getter,
                afe=self.afe,
                tko=self.tko,
                file_bugs=file_bugs,
                job_retry=True,
                result_reporter=self.result_reporter,
        )
        self.mox.ResetAll()
        return suite

    def _createSuiteMockResults(self, results_dir=None, result_status='FAIL'):
        """Create a suite, returned a set of mocked results to expect.

        @param results_dir: A mock results directory.
        @param result_status: A desired result status, e.g. 'FAIL', 'WARN'.

        @return List of mocked results to wait on.
        """
        self.suite = self._createSuiteWithMockedTestsAndControlFiles(
                file_bugs=True)
        self.suite._results_dir = results_dir
        test_report = self._get_bad_test_report(result_status)
        test_predicates = test_report.predicates
        test_fallout = test_report.fallout

        self.recorder = self.mox.CreateMock(base_job.base_job)
        self.recorder.record_entry = self.mox.CreateMock(
                base_job.base_job.record_entry)
        self._mock_recorder_with_results([test_predicates], self.recorder)
        return [test_predicates, test_fallout]

    def _mock_recorder_with_results(self, results, recorder):
        """
        Checks that results are recoded in order, eg:
        START, (status, name, reason) END

        @param results: list of results
        @param recorder: status recorder
        """
        for result in results:
            status = result[0]
            test_name = result[1]
            recorder.record_entry(
                    StatusContains.CreateFromStrings('START', test_name),
                    log_in_subdir=False)
            recorder.record_entry(
                    StatusContains.CreateFromStrings(*result),
                    log_in_subdir=False).InAnyOrder('results')
            recorder.record_entry(
                    StatusContains.CreateFromStrings('END %s' % status,
                                                     test_name),
                    log_in_subdir=False)

    def schedule_and_expect_these_results(self, suite, results, recorder):
        """Create mox stubs for call to suite.schedule and
        job_status.wait_for_results

        @param suite: suite object for which to stub out schedule(...)
        @param results: results object to be returned from
                        job_stats_wait_for_results(...)
        @param recorder: mocked recorder object to replay status messages
        """
        def result_generator(results):
            """A simple generator which generates results as Status objects.

            This generator handles 'send' by simply ignoring it.

            @param results: results object to be returned from
                            job_stats_wait_for_results(...)
            @yield: job_status.Status objects.
            """
            results = [job_status.Status(*r) for r in results]
            for r in results:
                new_input = (yield r)
                if new_input:
                    yield None

        self.mox.StubOutWithMock(suite, 'schedule')
        suite.schedule(recorder.record_entry)
        suite._retry_handler = RetryHandler({})

        waiter_patch = mock.patch.object(
                job_status.JobResultWaiter, 'wait_for_results', autospec=True)
        waiter_mock = waiter_patch.start()
        waiter_mock.return_value = result_generator(results)
        self.addCleanup(waiter_patch.stop)

    def testRunAndWaitSuccess(self):
        """Should record successful results."""
        suite = self._createSuiteWithMockedTestsAndControlFiles()

        recorder = self.mox.CreateMock(base_job.base_job)

        results = [('GOOD', 'good'), ('FAIL', 'bad', 'reason')]
        self._mock_recorder_with_results(results, recorder)
        self.schedule_and_expect_these_results(suite, results, recorder)
        self.mox.ReplayAll()

        suite.schedule(recorder.record_entry)
        suite.wait(recorder.record_entry)

    def testRunAndWaitFailure(self):
        """Should record failure to gather results."""
        suite = self._createSuiteWithMockedTestsAndControlFiles()

        recorder = self.mox.CreateMock(base_job.base_job)
        recorder.record_entry(
                StatusContains.CreateFromStrings('FAIL', self._TAG,
                                                 'waiting'),
                log_in_subdir=False)

        self.mox.StubOutWithMock(suite, 'schedule')
        suite.schedule(recorder.record_entry)
        self.mox.ReplayAll()

        with mock.patch.object(
                job_status.JobResultWaiter, 'wait_for_results',
                autospec=True) as wait_mock:
            wait_mock.side_effect = Exception
            suite.schedule(recorder.record_entry)
            suite.wait(recorder.record_entry)

    def testRunAndWaitScheduleFailure(self):
        """Should record failure to schedule jobs."""
        suite = self._createSuiteWithMockedTestsAndControlFiles()

        recorder = self.mox.CreateMock(base_job.base_job)
        recorder.record_entry(
                StatusContains.CreateFromStrings('INFO',
                                                 'Start %s' % self._TAG),
                log_in_subdir=False)

        recorder.record_entry(
                StatusContains.CreateFromStrings('FAIL', self._TAG,
                                                 'scheduling'),
                log_in_subdir=False)

        self.mox.StubOutWithMock(suite._job_creator, 'create_job')
        suite._job_creator.create_job(
                mox.IgnoreArg(), retry_for=mox.IgnoreArg()).AndRaise(
                        Exception('Expected during test.'))
        self.mox.ReplayAll()

        suite.schedule(recorder.record_entry)
        suite.wait(recorder.record_entry)

    def testGetTestsSortedByTime(self):
        """Should find all tests and sorted by TIME setting."""
        self.expect_control_file_parsing()
        self.mox.ReplayAll()
        # Get all tests.
        tests = SuiteBase.find_and_parse_tests(self.getter,
                                               lambda d: True,
                                               self._TAG)
        self.assertEqual(len(tests), 7)
        times = [control_data.ControlData.get_test_time_index(test.time)
                 for test in tests]
        self.assertTrue(all(x >= y for x, y in zip(times, times[1:])),
                        'Tests are not ordered correctly.')

    def _get_bad_test_report(self, result_status='FAIL'):
        """
        Fetch the predicates of a failing test, and the parameters
        that are a fallout of this test failing.

        @param result_status: status string placed in the predicates.
        @return A test_report namedtuple with fields predicates and fallout.
        """
        predicates = collections.namedtuple('predicates',
                                            'status, testname, reason')
        fallout = collections.namedtuple('fallout',
                                         ('time_start, time_end, job_id,'
                                          'username, hostname'))
        test_report = collections.namedtuple('test_report',
                                             'predicates, fallout')
        return test_report(predicates(result_status, 'bad_test',
                                      'dreadful_reason'),
                           fallout('2014-01-01 01:01:01', 'None',
                                   self._FAKE_JOB_ID, 'user', 'myhost'))

    def testJobRetryTestFail(self):
        """Test retry works."""
        test_to_retry = self.files['seven']
        fake_new_job_id = self._FAKE_JOB_ID + 1
        fake_new_job = FakeJob(id=fake_new_job_id)

        test_results = self._createSuiteMockResults()
        self.schedule_and_expect_these_results(
                self.suite,
                [test_results[0] + test_results[1]],
                self.recorder)
        self.mox.StubOutWithMock(self.suite._job_creator, 'create_job')
        self.suite._job_creator.create_job(
                test_to_retry,
                retry_for=self._FAKE_JOB_ID).AndReturn(fake_new_job)
        self.mox.ReplayAll()
        self.suite.schedule(self.recorder.record_entry)
        self.suite._retry_handler._retry_map = {
                self._FAKE_JOB_ID: {
                        'state': RetryHandler.States.NOT_ATTEMPTED,
                        'retry_max': 1}
        }
        self.suite._jobs_to_tests[self._FAKE_JOB_ID] = test_to_retry
        self.suite.wait(self.recorder.record_entry)
        expected_retry_map = {
                self._FAKE_JOB_ID: {
                        'state': RetryHandler.States.RETRIED,
                        'retry_max': 1},
                fake_new_job_id: {
                        'state': RetryHandler.States.NOT_ATTEMPTED,
                        'retry_max': 0}
        }
        # Check retry map is correctly updated
        self.assertEqual(self.suite._retry_handler._retry_map,
                         expected_retry_map)
        # Check _jobs_to_tests is correctly updated
        self.assertEqual(self.suite._jobs_to_tests[fake_new_job_id],
                         test_to_retry)

    def testJobRetryTestWarn(self):
        """Test that no retry is scheduled if test warns."""
        test_to_retry = self.files['seven']
        test_results = self._createSuiteMockResults(result_status='WARN')
        self.schedule_and_expect_these_results(
                self.suite,
                [test_results[0] + test_results[1]],
                self.recorder)
        self.mox.ReplayAll()
        self.suite.schedule(self.recorder.record_entry)
        self.suite._retry_handler._retry_map = {
                self._FAKE_JOB_ID: {
                        'state': RetryHandler.States.NOT_ATTEMPTED,
                        'retry_max': 1}
        }
        self.suite._jobs_to_tests[self._FAKE_JOB_ID] = test_to_retry
        expected_jobs_to_tests = self.suite._jobs_to_tests.copy()
        expected_retry_map = self.suite._retry_handler._retry_map.copy()
        self.suite.wait(self.recorder.record_entry)
        self.assertTrue(self.result_reporter.results)
        # Check retry map and _jobs_to_tests, ensure no retry was scheduled.
        self.assertEqual(self.suite._retry_handler._retry_map,
                         expected_retry_map)
        self.assertEqual(self.suite._jobs_to_tests, expected_jobs_to_tests)

    def testFailedJobRetry(self):
        """Make sure the suite survives even if the retry failed."""
        test_to_retry = self.files['seven']

        test_results = self._createSuiteMockResults()
        self.schedule_and_expect_these_results(
                self.suite,
                [test_results[0] + test_results[1]],
                self.recorder)
        self.mox.StubOutWithMock(self.suite._job_creator, 'create_job')
        self.suite._job_creator.create_job(
                test_to_retry, retry_for=self._FAKE_JOB_ID).AndRaise(
                        error.RPCException('Expected during test'))
        # Do not file a bug.
        self.mox.StubOutWithMock(self.suite, '_should_report')
        self.suite._should_report(mox.IgnoreArg()).AndReturn(False)

        self.mox.ReplayAll()

        self.suite.schedule(self.recorder.record_entry)
        self.suite._retry_handler._retry_map = {
                self._FAKE_JOB_ID: {
                        'state': RetryHandler.States.NOT_ATTEMPTED,
                        'retry_max': 1}}
        self.suite._jobs_to_tests[self._FAKE_JOB_ID] = test_to_retry
        # Snapshot the mapping before wait() so the comparison below can
        # actually detect a change made while handling the failed retry.
        expected_jobs_to_tests = self.suite._jobs_to_tests.copy()
        self.suite.wait(self.recorder.record_entry)
        expected_retry_map = {
                self._FAKE_JOB_ID: {
                        'state': RetryHandler.States.ATTEMPTED,
                        'retry_max': 1}}
        self.assertEqual(self.suite._retry_handler._retry_map,
                         expected_retry_map)
        self.assertEqual(self.suite._jobs_to_tests, expected_jobs_to_tests)
class _MemoryResultReporter(SuiteBase._ResultReporter):
    """Result reporter for tests that keeps every result in memory.

    Reported results accumulate, in call order, in the |results| list.
    """

    def __init__(self):
        self.results = list()

    def report(self, result):
        """Record |result| by adding it to the end of |self.results|.

        @param result: the result object being reported.
        """
        self.results += [result]
# Run every test case in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()