blob: d2ee2a25096f202ad27e5773d640048a49ee39db [file] [log] [blame]
#!/usr/bin/python
import os, unittest
import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.scheduler import drone_manager, drone_utility, drones
from autotest_lib.scheduler import scheduler_config, site_drone_manager
class MockDrone(drones._AbstractDrone):
def __init__(self, name, active_processes=0, max_processes=10,
allowed_users=None):
super(MockDrone, self).__init__()
self.name = name
self.hostname = name
self.active_processes = active_processes
self.max_processes = max_processes
self.allowed_users = allowed_users
# maps method names list of tuples containing method arguments
self._recorded_calls = {'queue_call': [],
'send_file_to': []}
def queue_call(self, method, *args, **kwargs):
self._recorded_calls['queue_call'].append((method, args, kwargs))
def call(self, method, *args, **kwargs):
# don't bother differentiating between call() and queue_call()
return self.queue_call(method, *args, **kwargs)
def send_file_to(self, drone, source_path, destination_path,
can_fail=False):
self._recorded_calls['send_file_to'].append(
(drone, source_path, destination_path))
# method for use by tests
def _check_for_recorded_call(self, method_name, arguments):
recorded_arg_list = self._recorded_calls[method_name]
was_called = arguments in recorded_arg_list
if not was_called:
print 'Recorded args:', recorded_arg_list
print 'Expected:', arguments
return was_called
def was_call_queued(self, method, *args, **kwargs):
return self._check_for_recorded_call('queue_call',
(method, args, kwargs))
def was_file_sent(self, drone, source_path, destination_path):
return self._check_for_recorded_call('send_file_to',
(drone, source_path,
destination_path))
class DroneManager(unittest.TestCase):
_DRONE_INSTALL_DIR = '/drone/install/dir'
_DRONE_RESULTS_DIR = os.path.join(_DRONE_INSTALL_DIR, 'results')
_RESULTS_DIR = '/results/dir'
_SOURCE_PATH = 'source/path'
_DESTINATION_PATH = 'destination/path'
_WORKING_DIRECTORY = 'working/directory'
_USERNAME = 'my_user'
def setUp(self):
self.god = mock.mock_god()
self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
self._DRONE_INSTALL_DIR)
self.manager = drone_manager.DroneManager()
self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
# we don't want this to ever actually get called
self.god.stub_function(drones, 'get_drone')
# we don't want the DroneManager to go messing with global config
def do_nothing():
pass
self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
# set up some dummy drones
self.mock_drone = MockDrone('mock_drone')
self.manager._drones[self.mock_drone.name] = self.mock_drone
self.results_drone = MockDrone('results_drone', 0, 10)
self.manager._results_drone = self.results_drone
self.mock_drone_process = drone_manager.Process(self.mock_drone.name, 0)
def tearDown(self):
self.god.unstub_all()
def _test_choose_drone_for_execution_helper(self, processes_info_list,
requested_processes):
for index, process_info in enumerate(processes_info_list):
active_processes, max_processes = process_info
self.manager._enqueue_drone(MockDrone(index, active_processes,
max_processes))
return self.manager._choose_drone_for_execution(requested_processes,
self._USERNAME, None)
def test_choose_drone_for_execution(self):
drone = self._test_choose_drone_for_execution_helper([(1, 2), (0, 2)],
1)
self.assertEquals(drone.name, 1)
def test_choose_drone_for_execution_some_full(self):
drone = self._test_choose_drone_for_execution_helper([(0, 1), (1, 3)],
2)
self.assertEquals(drone.name, 1)
def test_choose_drone_for_execution_all_full(self):
drone = self._test_choose_drone_for_execution_helper([(2, 1), (3, 2)],
1)
self.assertEquals(drone.name, 1)
def test_choose_drone_for_execution_all_full_same_percentage_capacity(self):
drone = self._test_choose_drone_for_execution_helper([(5, 3), (10, 6)],
1)
self.assertEquals(drone.name, 1)
def test_user_restrictions(self):
# this drone is restricted to a different user
self.manager._enqueue_drone(MockDrone(1, max_processes=10,
allowed_users=['fakeuser']))
# this drone is allowed but has lower capacity
self.manager._enqueue_drone(MockDrone(2, max_processes=2,
allowed_users=[self._USERNAME]))
self.assertEquals(2,
self.manager.max_runnable_processes(self._USERNAME,
None))
drone = self.manager._choose_drone_for_execution(
1, username=self._USERNAME, drone_hostnames_allowed=None)
self.assertEquals(drone.name, 2)
def test_user_restrictions_with_full_drone(self):
# this drone is restricted to a different user
self.manager._enqueue_drone(MockDrone(1, max_processes=10,
allowed_users=['fakeuser']))
# this drone is allowed but is full
self.manager._enqueue_drone(MockDrone(2, active_processes=3,
max_processes=2,
allowed_users=[self._USERNAME]))
self.assertEquals(0,
self.manager.max_runnable_processes(self._USERNAME,
None))
drone = self.manager._choose_drone_for_execution(
1, username=self._USERNAME, drone_hostnames_allowed=None)
self.assertEquals(drone.name, 2)
def _setup_test_drone_restrictions(self, active_processes=0):
self.manager._enqueue_drone(MockDrone(
1, active_processes=active_processes, max_processes=10))
self.manager._enqueue_drone(MockDrone(
2, active_processes=active_processes, max_processes=5))
self.manager._enqueue_drone(MockDrone(
3, active_processes=active_processes, max_processes=2))
def test_drone_restrictions_allow_any(self):
self._setup_test_drone_restrictions()
self.assertEquals(10,
self.manager.max_runnable_processes(self._USERNAME,
None))
drone = self.manager._choose_drone_for_execution(
1, username=self._USERNAME, drone_hostnames_allowed=None)
self.assertEqual(drone.name, 1)
def test_drone_restrictions_under_capacity(self):
self._setup_test_drone_restrictions()
drone_hostnames_allowed = (2, 3)
self.assertEquals(
5, self.manager.max_runnable_processes(self._USERNAME,
drone_hostnames_allowed))
drone = self.manager._choose_drone_for_execution(
1, username=self._USERNAME,
drone_hostnames_allowed=drone_hostnames_allowed)
self.assertEqual(drone.name, 2)
def test_drone_restrictions_over_capacity(self):
self._setup_test_drone_restrictions(active_processes=6)
drone_hostnames_allowed = (2, 3)
self.assertEquals(
0, self.manager.max_runnable_processes(self._USERNAME,
drone_hostnames_allowed))
drone = self.manager._choose_drone_for_execution(
7, username=self._USERNAME,
drone_hostnames_allowed=drone_hostnames_allowed)
self.assertEqual(drone.name, 2)
def test_drone_restrictions_allow_none(self):
self._setup_test_drone_restrictions()
drone_hostnames_allowed = ()
self.assertEquals(
0, self.manager.max_runnable_processes(self._USERNAME,
drone_hostnames_allowed))
drone = self.manager._choose_drone_for_execution(
1, username=self._USERNAME,
drone_hostnames_allowed=drone_hostnames_allowed)
self.assertEqual(drone, None)
def test_initialize(self):
results_hostname = 'results_repo'
results_install_dir = '/results/install'
global_config.global_config.override_config_value(
scheduler_config.CONFIG_SECTION,
'results_host_installation_directory', results_install_dir)
(drones.get_drone.expect_call(self.mock_drone.name)
.and_return(self.mock_drone))
results_drone = MockDrone('results_drone')
self.god.stub_function(results_drone, 'set_autotest_install_dir')
drones.get_drone.expect_call(results_hostname).and_return(results_drone)
results_drone.set_autotest_install_dir.expect_call(results_install_dir)
self.manager.initialize(base_results_dir=self._RESULTS_DIR,
drone_hostnames=[self.mock_drone.name],
results_repository_hostname=results_hostname)
self.assert_(self.mock_drone.was_call_queued(
'initialize', self._DRONE_RESULTS_DIR + '/'))
self.god.check_playback()
def test_execute_command(self):
self.manager._enqueue_drone(self.mock_drone)
pidfile_name = 'my_pidfile'
log_file = 'log_file'
pidfile_id = self.manager.execute_command(
command=['test', drone_manager.WORKING_DIRECTORY],
working_directory=self._WORKING_DIRECTORY,
pidfile_name=pidfile_name,
num_processes=1,
log_file=log_file)
full_working_directory = os.path.join(self._DRONE_RESULTS_DIR,
self._WORKING_DIRECTORY)
self.assertEquals(pidfile_id.path,
os.path.join(full_working_directory, pidfile_name))
self.assert_(self.mock_drone.was_call_queued(
'execute_command', ['test', full_working_directory],
full_working_directory,
os.path.join(self._DRONE_RESULTS_DIR, log_file), pidfile_name))
def test_attach_file_to_execution(self):
self.manager._enqueue_drone(self.mock_drone)
contents = 'my\ncontents'
attached_path = self.manager.attach_file_to_execution(
self._WORKING_DIRECTORY, contents)
self.manager.execute_command(command=['test'],
working_directory=self._WORKING_DIRECTORY,
pidfile_name='mypidfile',
num_processes=1,
drone_hostnames_allowed=None)
self.assert_(self.mock_drone.was_call_queued(
'write_to_file',
os.path.join(self._DRONE_RESULTS_DIR, attached_path),
contents))
def test_copy_results_on_drone(self):
self.manager.copy_results_on_drone(self.mock_drone_process,
self._SOURCE_PATH,
self._DESTINATION_PATH)
self.assert_(self.mock_drone.was_call_queued(
'copy_file_or_directory',
os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
os.path.join(self._DRONE_RESULTS_DIR, self._DESTINATION_PATH)))
def test_copy_to_results_repository(self):
site_drone_manager.ENABLE_ARCHIVING = True
self.manager.copy_to_results_repository(self.mock_drone_process,
self._SOURCE_PATH)
self.assert_(self.mock_drone.was_file_sent(
self.results_drone,
os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
os.path.join(self._RESULTS_DIR, self._SOURCE_PATH)))
def test_write_lines_to_file(self):
file_path = 'file/path'
lines = ['line1', 'line2']
written_data = 'line1\nline2\n'
# write to results repository
self.manager.write_lines_to_file(file_path, lines)
self.assert_(self.results_drone.was_call_queued(
'write_to_file', os.path.join(self._RESULTS_DIR, file_path),
written_data))
# write to a drone
self.manager.write_lines_to_file(
file_path, lines, paired_with_process=self.mock_drone_process)
self.assert_(self.mock_drone.was_call_queued(
'write_to_file',
os.path.join(self._DRONE_RESULTS_DIR, file_path), written_data))
def test_pidfile_expiration(self):
self.god.stub_with(self.manager, '_get_max_pidfile_refreshes',
lambda: 0)
pidfile_id = self.manager.get_pidfile_id_from('tag', 'name')
self.manager.register_pidfile(pidfile_id)
self.manager._drop_old_pidfiles()
self.manager._drop_old_pidfiles()
self.assertFalse(self.manager._registered_pidfile_info)
if __name__ == '__main__':
unittest.main()