# Source-viewer header (not part of the module): blob bca51dbfabaa92cee5b32cce197d85dfa90b55c3
#!/usr/bin/python
#
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import datetime, unittest
import mox
import common
# We want to import setup_django_lite_environment first so that the database
# is set up correctly.
from autotest_lib.frontend import setup_django_lite_environment
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, rpc_interface
from django import test
from autotest_lib.server.cros import repair_utils
# See complete_failures_functional_tests.py for why we need this.
class MockDatetime(datetime.datetime):
    """
    Used to mock out parts of datetime.datetime.

    datetime.datetime is a built-in type whose attributes cannot be
    reassigned, so stubs are installed on this otherwise unchanged subclass.
    """
# We use a mock RPC client object that calls the server code directly.
class MockAFE(object):
    """Used to mock out the rpc client."""

    def run(self, func, **kwargs):
        """
        The fake run call directly contacts the server.

        The function is looked up by name on rpc_interface and called
        in-process.

        @param func: The name of the remote function that is being called.
        @param kwargs: The arguments to the remotely called function.

        @return The called function's result after utils.strip_unicode.
        """
        rpc_function = getattr(rpc_interface, func)
        return utils.strip_unicode(rpc_function(**kwargs))
class FindProblemTestTests(mox.MoxTestBase, test.TestCase):
    """
    Test that we properly find the last-run job.

    Each test saves jobs and host queue entries through the Django models,
    fixes the value returned by datetime.datetime.today() and then calls
    repair_utils._find_problem_test through a MockAFE.
    """

    def setUp(self):
        """Remember the repair_utils settings and swap in MockDatetime."""
        super(FindProblemTestTests, self).setUp()
        # Read everything that might raise before touching global state, so
        # a failure here cannot leave datetime.datetime patched.
        self._orig_cutoff = repair_utils._CUTOFF_AFTER_TIMEOUT_MINS
        self._orig_timeout = repair_utils._DEFAULT_TEST_TIMEOUT_MINS
        self.datetime = datetime.datetime
        self.mox.StubOutWithMock(MockDatetime, 'today')
        datetime.datetime = MockDatetime

    def tearDown(self):
        """Put back the repair_utils settings and the real datetime class."""
        repair_utils._DEFAULT_TEST_TIMEOUT_MINS = self._orig_timeout
        repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = self._orig_cutoff
        datetime.datetime = self.datetime
        super(FindProblemTestTests, self).tearDown()

    def _create_host(self, hostname):
        """
        Save a host with the given name.

        @param hostname: The hostname of the new host.

        @return The saved models.Host object.
        """
        host = models.Host(hostname=hostname)
        host.save()
        return host

    def _create_job_on_host(self, job_name, host, created_on, started_on):
        """
        Save a job together with a host queue entry tying it to a host.

        @param job_name: The name of the new job.
        @param host: The models.Host the job's queue entry points at.
        @param created_on: The creation time stored on the job.
        @param started_on: The start time stored on the queue entry.
        """
        job = models.Job(owner='me', name=job_name, created_on=created_on)
        job.save()
        queue_entry = models.HostQueueEntry(
                job=job, host=host, status='test', started_on=started_on)
        queue_entry.save()

    def _find_problem_test_at(self, hostname, today):
        """
        Call repair_utils._find_problem_test with the current time fixed.

        @param hostname: The hostname passed to _find_problem_test.
        @param today: What the stubbed datetime.datetime.today() returns.

        @return Whatever repair_utils._find_problem_test returns.
        """
        mock_rpc = MockAFE()
        datetime.datetime.today().AndReturn(today)
        repair_utils._DEFAULT_TEST_TIMEOUT_MINS = 1440
        repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = 60
        self.mox.ReplayAll()
        return repair_utils._find_problem_test(hostname, mock_rpc)

    def test_should_get_most_recent_job(self):
        """Test that, for a given host, we get the last job ran on that host."""
        host = self._create_host('host')
        self._create_job_on_host(
                'old_job', host,
                created_on=datetime.datetime(2012, 1, 1),
                started_on=datetime.datetime(2012, 1, 1, 1))
        self._create_job_on_host(
                'new_job', host,
                created_on=datetime.datetime(2012, 1, 1),
                started_on=datetime.datetime(2012, 1, 1, 2))

        result = self._find_problem_test_at(
                'host', datetime.datetime(2012, 1, 1))

        self.assertEqual(result['job']['name'], 'new_job')

    def test_should_get_job_for_specified_host_only(self):
        """Test that we only get a job that is for the given host."""
        self._create_job_on_host(
                'correct_job', self._create_host('correct_host'),
                created_on=datetime.datetime(2012, 1, 1),
                started_on=datetime.datetime(2012, 1, 1, 1))
        # The entry on the other host starts later, so picking it would mean
        # the hostname was ignored.
        self._create_job_on_host(
                'wrong_job', self._create_host('wrong_host'),
                created_on=datetime.datetime(2012, 1, 1),
                started_on=datetime.datetime(2012, 1, 1, 2))

        result = self._find_problem_test_at(
                'correct_host', datetime.datetime(2012, 1, 1))

        self.assertEqual(result['job']['name'], 'correct_job')

    def test_return_jobs_ran_soon_after_max_job_runtime(self):
        """Test that we get jobs that are just past the max runtime."""
        self._create_job_on_host(
                'new_job', self._create_host('host'),
                created_on=datetime.datetime(2012, 1, 1, 0, 0),
                started_on=datetime.datetime(2012, 1, 1, 2))

        # "Today" is 24h30m after the job was created; the timeout is set to
        # 1440 minutes and the cutoff to 60 minutes.
        result = self._find_problem_test_at(
                'host', datetime.datetime(2012, 1, 2, 0, 30))

        self.assertEqual(result['job']['name'], 'new_job')
if __name__ == '__main__':
    # Run the tests in this module when the file is executed as a script.
    unittest.main()