| #!/usr/bin/python |
| # |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unit tests for server/cros/host_lock_manager.py.""" |
| |
| import mox |
| import unittest |
| import common |
| |
| from autotest_lib.server import frontend |
| from autotest_lib.server.cros import host_lock_manager |
| |
| class HostLockManagerTest(mox.MoxTestBase): |
| """Unit tests for host_lock_manager.HostLockManager. |
| |
| @attribute HOST1: a string, fake host. |
| @attribute HOST2: a string, fake host. |
| @attribute HOST3: a string, fake host. |
| """ |
| |
| HOST1 = 'host1' |
| HOST2 = 'host2' |
| HOST3 = 'host3' |
| |
| |
| class FakeHost(object): |
| """Fake version of Host object defiend in server/frontend.py. |
| |
| @attribute locked: a boolean, True == host is locked. |
| @attribute locked_by: a string, fake user. |
| @attribute lock_time: a string, fake timestamp. |
| """ |
| |
| def __init__(self, locked=False): |
| """Initialize. |
| |
| @param locked: a boolean, True == host is locked. |
| """ |
| self.locked = locked |
| self.locked_by = 'fake_user' |
| self.lock_time = 'fake time' |
| |
| |
| class MockHostLockManager(host_lock_manager.HostLockManager): |
| """Mock out _host_modifier() in HostLockManager class.. |
| |
| @attribute locked: a boolean, True == host is locked. |
| @attribute locked_by: a string, fake user. |
| @attribute lock_time: a string, fake timestamp. |
| """ |
| |
| def _host_modifier(self, hosts, operation, lock_reason=''): |
| """Overwrites original _host_modifier(). |
| |
| Add hosts to self.locked_hosts for LOCK and remove hosts from |
| self.locked_hosts for UNLOCK. |
| |
| @param a set of strings, host names. |
| @param operation: a string, LOCK or UNLOCK. |
| @param lock_reason: a string, a reason for locking the hosts |
| """ |
| if operation == self.LOCK: |
| assert lock_reason |
| self.locked_hosts = self.locked_hosts.union(hosts) |
| elif operation == self.UNLOCK: |
| self.locked_hosts = self.locked_hosts.difference(hosts) |
| |
| |
| def setUp(self): |
| super(HostLockManagerTest, self).setUp() |
| self.afe = self.mox.CreateMock(frontend.AFE) |
| self.manager = host_lock_manager.HostLockManager(self.afe) |
| |
| |
| def testCheckHost_SkipsUnknownHost(self): |
| """Test that host unknown to AFE is skipped.""" |
| self.afe.get_hosts(hostname=self.HOST1).AndReturn(None) |
| self.mox.ReplayAll() |
| actual = self.manager._check_host(self.HOST1, None) |
| self.assertEquals(None, actual) |
| |
| |
| def testCheckHost_DetectsLockedHost(self): |
| """Test that a host which is already locked is skipped.""" |
| host_info = [self.FakeHost(locked=True)] |
| self.afe.get_hosts(hostname=self.HOST1).AndReturn(host_info) |
| self.mox.ReplayAll() |
| actual = self.manager._check_host(self.HOST1, self.manager.LOCK) |
| self.assertEquals(None, actual) |
| |
| |
| def testCheckHost_DetectsUnlockedHost(self): |
| """Test that a host which is already unlocked is skipped.""" |
| host_info = [self.FakeHost()] |
| self.afe.get_hosts(hostname=self.HOST1).AndReturn(host_info) |
| self.mox.ReplayAll() |
| actual = self.manager._check_host(self.HOST1, self.manager.UNLOCK) |
| self.assertEquals(None, actual) |
| |
| |
| def testCheckHost_ReturnsHostToLock(self): |
| """Test that a host which can be locked is returned.""" |
| host_info = [self.FakeHost()] |
| self.afe.get_hosts(hostname=self.HOST1).AndReturn(host_info) |
| self.mox.ReplayAll() |
| host_with_dot = '.'.join([self.HOST1, 'cros']) |
| actual = self.manager._check_host(host_with_dot, self.manager.LOCK) |
| self.assertEquals(self.HOST1, actual) |
| |
| |
| def testCheckHost_ReturnsHostToUnlock(self): |
| """Test that a host which can be unlocked is returned.""" |
| host_info = [self.FakeHost(locked=True)] |
| self.afe.get_hosts(hostname=self.HOST1).AndReturn(host_info) |
| self.mox.ReplayAll() |
| host_with_dot = '.'.join([self.HOST1, 'cros']) |
| actual = self.manager._check_host(host_with_dot, self.manager.UNLOCK) |
| self.assertEquals(self.HOST1, actual) |
| |
| |
| def testLock_WithNonOverlappingHosts(self): |
| """Tests host locking, all hosts not in self.locked_hosts.""" |
| hosts = [self.HOST2] |
| manager = self.MockHostLockManager(self.afe) |
| manager.locked_hosts = set([self.HOST1]) |
| manager.lock(hosts, lock_reason='Locking for test') |
| self.assertEquals(set([self.HOST1, self.HOST2]), manager.locked_hosts) |
| |
| |
| def testLock_WithPartialOverlappingHosts(self): |
| """Tests host locking, some hosts not in self.locked_hosts.""" |
| hosts = [self.HOST1, self.HOST2] |
| manager = self.MockHostLockManager(self.afe) |
| manager.locked_hosts = set([self.HOST1, self.HOST3]) |
| manager.lock(hosts, lock_reason='Locking for test') |
| self.assertEquals(set([self.HOST1, self.HOST2, self.HOST3]), |
| manager.locked_hosts) |
| |
| |
| def testLock_WithFullyOverlappingHosts(self): |
| """Tests host locking, all hosts in self.locked_hosts.""" |
| hosts = [self.HOST1, self.HOST2] |
| self.manager.locked_hosts = set(hosts) |
| self.manager.lock(hosts) |
| self.assertEquals(set(hosts), self.manager.locked_hosts) |
| |
| |
| def testUnlock_WithNonOverlappingHosts(self): |
| """Tests host unlocking, all hosts not in self.locked_hosts.""" |
| hosts = [self.HOST2] |
| self.manager.locked_hosts = set([self.HOST1]) |
| self.manager.unlock(hosts) |
| self.assertEquals(set([self.HOST1]), self.manager.locked_hosts) |
| |
| |
| def testUnlock_WithPartialOverlappingHosts(self): |
| """Tests host locking, some hosts not in self.locked_hosts.""" |
| hosts = [self.HOST1, self.HOST2] |
| manager = self.MockHostLockManager(self.afe) |
| manager.locked_hosts = set([self.HOST1, self.HOST3]) |
| manager.unlock(hosts) |
| self.assertEquals(set([self.HOST3]), manager.locked_hosts) |
| |
| |
| def testUnlock_WithFullyOverlappingHosts(self): |
| """Tests host locking, all hosts in self.locked_hosts.""" |
| hosts = [self.HOST1, self.HOST2] |
| manager = self.MockHostLockManager(self.afe) |
| manager.locked_hosts = set([self.HOST1, self.HOST2, self.HOST3]) |
| manager.unlock(hosts) |
| self.assertEquals(set([self.HOST3]), manager.locked_hosts) |
| |
| |
| def testHostModifier_WithHostsToLock(self): |
| """Test host locking.""" |
| hosts = set([self.HOST1]) |
| self.manager.locked_hosts = set([self.HOST2]) |
| self.mox.StubOutWithMock(self.manager, '_check_host') |
| self.manager._check_host(self.HOST1, |
| self.manager.LOCK).AndReturn(self.HOST1) |
| self.afe.run('modify_hosts', |
| host_filter_data={'hostname__in': [self.HOST1]}, |
| update_data={'locked': True, 'lock_reason': 'Test'}) |
| self.mox.ReplayAll() |
| self.manager._host_modifier(hosts, self.manager.LOCK, |
| lock_reason='Test') |
| self.assertEquals(set([self.HOST1, self.HOST2]), |
| self.manager.locked_hosts) |
| |
| |
| def testHostModifier_WithHostsToUnlock(self): |
| """Test host unlocking.""" |
| hosts = set([self.HOST1]) |
| self.manager.locked_hosts = set([self.HOST1, self.HOST2]) |
| self.mox.StubOutWithMock(self.manager, '_check_host') |
| self.manager._check_host(self.HOST1, |
| self.manager.UNLOCK).AndReturn(self.HOST1) |
| self.afe.run('modify_hosts', |
| host_filter_data={'hostname__in': [self.HOST1]}, |
| update_data={'locked': False}) |
| self.mox.ReplayAll() |
| self.manager._host_modifier(hosts, self.manager.UNLOCK) |
| self.assertEquals(set([self.HOST2]), self.manager.locked_hosts) |
| |
| |
| def testHostModifier_WithoutLockReason(self): |
| """Test host locking without providing a lock reason.""" |
| hosts = set([self.HOST1]) |
| self.manager.locked_hosts = set([self.HOST2]) |
| self.mox.StubOutWithMock(self.manager, '_check_host') |
| self.manager._check_host(self.HOST1, |
| self.manager.LOCK).AndReturn(self.HOST1) |
| self.afe.run('modify_hosts', |
| host_filter_data={'hostname__in': [self.HOST1]}, |
| update_data={'locked': True, |
| 'lock_reason': None}) |
| self.mox.ReplayAll() |
| self.manager._host_modifier(hosts, self.manager.LOCK) |
| self.assertEquals(set([self.HOST2]), self.manager.locked_hosts) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |