# blob: eb8c61fb900908cc6839be325504eaa38030529a [file] [log] [blame]
#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for site_utils/deduping_scheduler.py."""
import logging
import mox
import unittest
import deduping_scheduler
from autotest_lib.server import frontend
class DedupingSchedulerTest(mox.MoxTestBase):
    """Unit tests for DedupingScheduler.

    Each test records the calls it expects on a mocked AFE frontend, puts
    the mocks into replay mode, and then drives ScheduleSuite().

    @var _BUILD: fake build
    @var _BOARD: fake board to reimage
    @var _SUITE: fake suite name
    @var _POOL: fake machine pool name
    @var _NUM: fake value handed to ScheduleSuite() as its 'num' argument
    """

    _BUILD = 'build'
    _BOARD = 'board'
    _SUITE = 'suite'
    _POOL = 'pool'
    _NUM = 2


    def setUp(self):
        """Build a DedupingScheduler wired to a mock AFE."""
        super(DedupingSchedulerTest, self).setUp()
        self.afe = self.mox.CreateMock(frontend.AFE)
        self.scheduler = deduping_scheduler.DedupingScheduler(afe=self.afe)


    def _ExpectDedupQuery(self):
        """Record the expected AFE lookup for an already-scheduled suite.

        @return the recorded mock call, so the caller can chain AndReturn()
                or AndRaise() onto it.
        """
        control_suffix = 'control.' + self._SUITE
        return self.afe.get_jobs(name__startswith=self._BUILD,
                                 name__endswith=control_suffix)


    def _ExpectCreateSuiteJob(self, pool, num):
        """Record the expected 'create_suite_job' call on the AFE.

        @param pool: the pool value the call is expected to carry.
        @param num: the num value the call is expected to carry.
        @return the recorded mock call, so the caller can chain AndReturn()
                or AndRaise() onto it.
        """
        return self.afe.run('create_suite_job',
                            suite_name=self._SUITE,
                            board=self._BOARD,
                            build=self._BUILD,
                            check_hosts=False,
                            pool=pool,
                            num=num)


    def testScheduleSuite(self):
        """Test a successful de-dup and suite schedule."""
        # The de-dup lookup finds nothing, so scheduling goes ahead and
        # is allowed to succeed.
        self._ExpectDedupQuery().AndReturn([])
        self._ExpectCreateSuiteJob(self._POOL, self._NUM).AndReturn(7)
        self.mox.ReplayAll()

        scheduled = self.scheduler.ScheduleSuite(
                self._SUITE, self._BOARD, self._BUILD, self._POOL, self._NUM)
        self.assertTrue(scheduled)


    def testShouldNotScheduleSuite(self):
        """Test a successful de-dup and avoiding scheduling the suite."""
        # The de-dup lookup reports an existing job; no create_suite_job
        # call is recorded as expected.
        self._ExpectDedupQuery().AndReturn(['42'])
        self.mox.ReplayAll()

        scheduled = self.scheduler.ScheduleSuite(
                self._SUITE, self._BOARD, self._BUILD, self._POOL, None)
        self.assertFalse(scheduled)


    def testForceScheduleSuite(self):
        """Test forcing the suite to be scheduled.

        No de-dup lookup is recorded here; only the scheduling call is
        expected.
        """
        self._ExpectCreateSuiteJob(self._POOL, None).AndReturn(7)
        self.mox.ReplayAll()

        scheduled = self.scheduler.ScheduleSuite(
                self._SUITE, self._BOARD, self._BUILD, self._POOL, None,
                force=True)
        self.assertTrue(scheduled)


    def testShouldScheduleSuiteExplodes(self):
        """Test a failure to de-dup."""
        # The de-dup lookup itself blows up.
        self._ExpectDedupQuery().AndRaise(Exception())
        self.mox.ReplayAll()

        suite_args = (self._SUITE, self._BOARD, self._BUILD, self._POOL,
                      self._NUM)
        self.assertRaises(deduping_scheduler.DedupException,
                          self.scheduler.ScheduleSuite,
                          *suite_args)


    def testScheduleFail(self):
        """Test a successful de-dup and failure to schedule the suite."""
        # Nothing similar is scheduled yet, but job creation hands back
        # None instead of a job id.
        self._ExpectDedupQuery().AndReturn([])
        self._ExpectCreateSuiteJob(None, None).AndReturn(None)
        self.mox.ReplayAll()

        suite_args = (self._SUITE, self._BOARD, self._BUILD, None, None)
        self.assertRaises(deduping_scheduler.ScheduleException,
                          self.scheduler.ScheduleSuite,
                          *suite_args)


    def testScheduleExplodes(self):
        """Test a successful de-dup and barf while scheduling the suite."""
        # Nothing similar is scheduled yet, but job creation raises.
        self._ExpectDedupQuery().AndReturn([])
        self._ExpectCreateSuiteJob(None, None).AndRaise(Exception())
        self.mox.ReplayAll()

        suite_args = (self._SUITE, self._BOARD, self._BUILD, None, None)
        self.assertRaises(deduping_scheduler.ScheduleException,
                          self.scheduler.ScheduleSuite,
                          *suite_args)


# Run every test case in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()