# blob: 4a4a2a2ee69ff6807872d7ace61d0c49245ee775 [file] [log] [blame] [edit]
#!/usr/bin/python
import cPickle
import os, unittest
import common
from autotest_lib.client.bin import local_host
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.frontend import setup_django_lite_environment
from autotest_lib.scheduler import drone_manager, drone_utility, drones
from autotest_lib.scheduler import scheduler_config, drone_manager
from autotest_lib.scheduler import thread_lib
from autotest_lib.scheduler import pidfile_monitor
from autotest_lib.server.hosts import ssh_host
class MockDrone(drones._AbstractDrone):
    """Drone stand-in that records calls instead of executing them.

    Tests query the recorded calls through was_call_queued() and
    was_file_sent().
    """

    def __init__(self, name, active_processes=0, max_processes=10,
                 allowed_users=None):
        """Initialize the mock drone.

        @param name: Stored as both self.name and self.hostname.
        @param active_processes: Stored as self.active_processes.
        @param max_processes: Stored as self.max_processes.
        @param allowed_users: Stored as self.allowed_users.
        """
        super(MockDrone, self).__init__()
        self.name = name
        self.hostname = name
        self.active_processes = active_processes
        self.max_processes = max_processes
        self.allowed_users = allowed_users
        self._host = 'mock_drone'
        # maps method names to lists of tuples containing method arguments
        self._recorded_calls = {'queue_call': [],
                                'send_file_to': []}

    def queue_call(self, method, *args, **kwargs):
        """Record a (method, args, kwargs) tuple; nothing is executed."""
        self._recorded_calls['queue_call'].append((method, args, kwargs))

    def call(self, method, *args, **kwargs):
        """Record the call exactly like queue_call() does."""
        # don't bother differentiating between call() and queue_call()
        return self.queue_call(method, *args, **kwargs)

    def send_file_to(self, drone, source_path, destination_path,
                     can_fail=False):
        """Record a (drone, source_path, destination_path) tuple.

        can_fail is accepted but not recorded.
        """
        self._recorded_calls['send_file_to'].append(
                (drone, source_path, destination_path))

    # method for use by tests
    def _check_for_recorded_call(self, method_name, arguments):
        """Return whether `arguments` was recorded under `method_name`.

        On a miss, the recorded and expected arguments are printed to help
        debug the failing test.

        @param method_name: Key into self._recorded_calls.
        @param arguments: Tuple to look for in the recorded list.
        """
        recorded_arg_list = self._recorded_calls[method_name]
        was_called = arguments in recorded_arg_list
        if not was_called:
            # Single-argument print(...) emits the same text under the
            # Python 2 print statement and the Python 3 print function.
            print('Recorded args: %s' % (recorded_arg_list,))
            print('Expected: %s' % (arguments,))
        return was_called

    def was_call_queued(self, method, *args, **kwargs):
        """Return True if queue_call()/call() saw exactly these arguments."""
        return self._check_for_recorded_call('queue_call',
                                             (method, args, kwargs))

    def was_file_sent(self, drone, source_path, destination_path):
        """Return True if send_file_to() saw exactly these arguments."""
        return self._check_for_recorded_call('send_file_to',
                                             (drone, source_path,
                                              destination_path))
class DroneManager(unittest.TestCase):
    """Tests for drone_manager.DroneManager driven through MockDrone."""

    _DRONE_INSTALL_DIR = '/drone/install/dir'
    _DRONE_RESULTS_DIR = os.path.join(_DRONE_INSTALL_DIR, 'results')
    _RESULTS_DIR = '/results/dir'
    _SOURCE_PATH = 'source/path'
    _DESTINATION_PATH = 'destination/path'
    _WORKING_DIRECTORY = 'working/directory'
    _USERNAME = 'my_user'

    def setUp(self):
        """Build a DroneManager with one mock drone and a mock results drone."""
        self.god = mock.mock_god()
        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
                           self._DRONE_INSTALL_DIR)
        self.manager = drone_manager.DroneManager()
        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)

        # we don't want this to ever actually get called
        self.god.stub_function(drones, 'get_drone')

        # we don't want the DroneManager to go messing with global config
        def do_nothing():
            pass
        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)

        # set up some dummy drones
        self.mock_drone = MockDrone('mock_drone')
        self.manager._drones[self.mock_drone.name] = self.mock_drone
        self.results_drone = MockDrone('results_drone', 0, 10)
        self.manager._results_drone = self.results_drone

        self.mock_drone_process = drone_manager.Process(self.mock_drone.name, 0)

    def tearDown(self):
        """Undo every stub installed through self.god."""
        self.god.unstub_all()

    def _test_choose_drone_for_execution_helper(self, processes_info_list,
                                                requested_processes):
        """Enqueue one MockDrone per entry and return the drone chosen.

        @param processes_info_list: List of (active_processes, max_processes)
                pairs; each drone is named by its index in this list.
        @param requested_processes: Process count passed to
                _choose_drone_for_execution.
        @return: Whatever _choose_drone_for_execution returns for _USERNAME
                with no hostname restriction.
        """
        for index, process_info in enumerate(processes_info_list):
            active_processes, max_processes = process_info
            self.manager._enqueue_drone(
                    MockDrone(index, active_processes, max_processes,
                              allowed_users=None))
        return self.manager._choose_drone_for_execution(
                requested_processes, self._USERNAME, None)

    def test_choose_drone_for_execution(self):
        """Drones at 1/2 and 0/2: a 1-process request picks drone 1."""
        drone = self._test_choose_drone_for_execution_helper([(1, 2), (0, 2)],
                                                             1)
        self.assertEqual(drone.name, 1)

    def test_choose_drone_for_execution_some_full(self):
        """Drones at 0/1 and 1/3: a 2-process request picks drone 1."""
        drone = self._test_choose_drone_for_execution_helper([(0, 1), (1, 3)],
                                                             2)
        self.assertEqual(drone.name, 1)

    def test_choose_drone_for_execution_all_full(self):
        """Drones at 2/1 and 3/2: a 1-process request picks drone 1."""
        drone = self._test_choose_drone_for_execution_helper([(2, 1), (3, 2)],
                                                             1)
        self.assertEqual(drone.name, 1)

    def test_choose_drone_for_execution_all_full_same_percentage_capacity(self):
        """Drones at 5/3 and 10/6: a 1-process request picks drone 1."""
        drone = self._test_choose_drone_for_execution_helper([(5, 3), (10, 6)],
                                                             1)
        self.assertEqual(drone.name, 1)

    def test_user_restrictions(self):
        """Only the drone that allows _USERNAME counts and gets chosen."""
        # this drone is restricted to a different user
        self.manager._enqueue_drone(MockDrone(1, max_processes=10,
                                              allowed_users=['fakeuser']))
        # this drone is allowed but has lower capacity
        self.manager._enqueue_drone(MockDrone(2, max_processes=2,
                                              allowed_users=[self._USERNAME]))

        self.assertEqual(2,
                         self.manager.max_runnable_processes(self._USERNAME,
                                                             None))
        drone = self.manager._choose_drone_for_execution(
                1, username=self._USERNAME, drone_hostnames_allowed=None)
        self.assertEqual(drone.name, 2)

    def test_user_restrictions_with_full_drone(self):
        """A full but allowed drone is still chosen over a disallowed one."""
        # this drone is restricted to a different user
        self.manager._enqueue_drone(MockDrone(1, max_processes=10,
                                              allowed_users=['fakeuser']))
        # this drone is allowed but is full
        self.manager._enqueue_drone(MockDrone(2, active_processes=3,
                                              max_processes=2,
                                              allowed_users=[self._USERNAME]))

        self.assertEqual(0,
                         self.manager.max_runnable_processes(self._USERNAME,
                                                             None))
        drone = self.manager._choose_drone_for_execution(
                1, username=self._USERNAME, drone_hostnames_allowed=None)
        self.assertEqual(drone.name, 2)

    def _setup_test_drone_restrictions(self, active_processes=0):
        """Enqueue drones 1, 2 and 3 with max_processes 10, 5 and 2.

        @param active_processes: active_processes given to all three drones.
        """
        self.manager._enqueue_drone(MockDrone(
                1, active_processes=active_processes, max_processes=10))
        self.manager._enqueue_drone(MockDrone(
                2, active_processes=active_processes, max_processes=5))
        self.manager._enqueue_drone(MockDrone(
                3, active_processes=active_processes, max_processes=2))

    def test_drone_restrictions_allow_any(self):
        """With no hostname restriction, drone 1 (capacity 10) is used."""
        self._setup_test_drone_restrictions()
        self.assertEqual(10,
                         self.manager.max_runnable_processes(self._USERNAME,
                                                             None))
        drone = self.manager._choose_drone_for_execution(
                1, username=self._USERNAME, drone_hostnames_allowed=None)
        self.assertEqual(drone.name, 1)

    def test_drone_restrictions_under_capacity(self):
        """Restricted to drones 2 and 3, drone 2 (capacity 5) is used."""
        self._setup_test_drone_restrictions()
        drone_hostnames_allowed = (2, 3)
        self.assertEqual(
                5, self.manager.max_runnable_processes(self._USERNAME,
                                                       drone_hostnames_allowed))
        drone = self.manager._choose_drone_for_execution(
                1, username=self._USERNAME,
                drone_hostnames_allowed=drone_hostnames_allowed)
        self.assertEqual(drone.name, 2)

    def test_drone_restrictions_over_capacity(self):
        """Allowed drones are over capacity; drone 2 is still chosen."""
        self._setup_test_drone_restrictions(active_processes=6)
        drone_hostnames_allowed = (2, 3)
        self.assertEqual(
                0, self.manager.max_runnable_processes(self._USERNAME,
                                                       drone_hostnames_allowed))
        drone = self.manager._choose_drone_for_execution(
                7, username=self._USERNAME,
                drone_hostnames_allowed=drone_hostnames_allowed)
        self.assertEqual(drone.name, 2)

    def test_drone_restrictions_allow_none(self):
        """An empty allowed-hostnames tuple yields no drone at all."""
        self._setup_test_drone_restrictions()
        drone_hostnames_allowed = ()
        self.assertEqual(
                0, self.manager.max_runnable_processes(self._USERNAME,
                                                       drone_hostnames_allowed))
        drone = self.manager._choose_drone_for_execution(
                1, username=self._USERNAME,
                drone_hostnames_allowed=drone_hostnames_allowed)
        self.assertEqual(drone, None)

    def test_initialize(self):
        """initialize() fetches the drones and queues 'initialize' on them."""
        results_hostname = 'results_repo'
        results_install_dir = '/results/install'
        # NOTE(review): this override is not visibly undone by tearDown();
        # confirm that later tests do not depend on the original value.
        global_config.global_config.override_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', results_install_dir)

        (drones.get_drone.expect_call(self.mock_drone.name)
         .and_return(self.mock_drone))

        results_drone = MockDrone('results_drone')
        self.god.stub_function(results_drone, 'set_autotest_install_dir')
        drones.get_drone.expect_call(results_hostname).and_return(results_drone)
        results_drone.set_autotest_install_dir.expect_call(results_install_dir)

        self.manager.initialize(base_results_dir=self._RESULTS_DIR,
                                drone_hostnames=[self.mock_drone.name],
                                results_repository_hostname=results_hostname)

        self.assertTrue(self.mock_drone.was_call_queued(
                'initialize', self._DRONE_RESULTS_DIR + '/'))
        self.god.check_playback()

    def test_execute_command(self):
        """execute_command() queues the call with drone-absolute paths."""
        self.manager._enqueue_drone(self.mock_drone)

        pidfile_name = 'my_pidfile'
        log_file = 'log_file'

        pidfile_id = self.manager.execute_command(
                command=['test', drone_manager.WORKING_DIRECTORY],
                working_directory=self._WORKING_DIRECTORY,
                pidfile_name=pidfile_name,
                num_processes=1,
                log_file=log_file)

        full_working_directory = os.path.join(self._DRONE_RESULTS_DIR,
                                              self._WORKING_DIRECTORY)
        self.assertEqual(pidfile_id.path,
                         os.path.join(full_working_directory, pidfile_name))
        self.assertTrue(self.mock_drone.was_call_queued(
                'execute_command', ['test', full_working_directory],
                full_working_directory,
                os.path.join(self._DRONE_RESULTS_DIR, log_file), pidfile_name))

    def test_attach_file_to_execution(self):
        """An attached file is written to the drone once a command runs."""
        self.manager._enqueue_drone(self.mock_drone)

        contents = 'my\ncontents'
        attached_path = self.manager.attach_file_to_execution(
                self._WORKING_DIRECTORY, contents)
        self.manager.execute_command(command=['test'],
                                     working_directory=self._WORKING_DIRECTORY,
                                     pidfile_name='mypidfile',
                                     num_processes=1,
                                     drone_hostnames_allowed=None)

        self.assertTrue(self.mock_drone.was_call_queued(
                'write_to_file',
                os.path.join(self._DRONE_RESULTS_DIR, attached_path),
                contents))

    def test_copy_results_on_drone(self):
        """copy_results_on_drone() queues a copy between drone paths."""
        self.manager.copy_results_on_drone(self.mock_drone_process,
                                           self._SOURCE_PATH,
                                           self._DESTINATION_PATH)
        self.assertTrue(self.mock_drone.was_call_queued(
                'copy_file_or_directory',
                os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
                os.path.join(self._DRONE_RESULTS_DIR, self._DESTINATION_PATH)))

    def test_copy_to_results_repository(self):
        """_copy_to_results_repository() sends the file to the results drone."""
        # Stub rather than assign so tearDown()'s unstub_all() restores the
        # module-level flag and the change cannot leak into other tests.
        self.god.stub_with(drone_manager, 'ENABLE_ARCHIVING', True)
        self.manager._copy_to_results_repository(self.mock_drone_process,
                                                 self._SOURCE_PATH)
        self.assertTrue(self.mock_drone.was_file_sent(
                self.results_drone,
                os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
                os.path.join(self._RESULTS_DIR, self._SOURCE_PATH)))

    def test_write_lines_to_file(self):
        """write_lines_to_file() targets the results drone or a paired drone."""
        file_path = 'file/path'
        lines = ['line1', 'line2']
        written_data = 'line1\nline2\n'

        # write to results repository
        self.manager.write_lines_to_file(file_path, lines)
        self.assertTrue(self.results_drone.was_call_queued(
                'write_to_file', os.path.join(self._RESULTS_DIR, file_path),
                written_data))

        # write to a drone
        self.manager.write_lines_to_file(
                file_path, lines, paired_with_process=self.mock_drone_process)
        self.assertTrue(self.mock_drone.was_call_queued(
                'write_to_file',
                os.path.join(self._DRONE_RESULTS_DIR, file_path), written_data))

    def test_pidfile_expiration(self):
        """With zero allowed refreshes, two drop passes unregister a pidfile."""
        self.god.stub_with(self.manager, '_get_max_pidfile_refreshes',
                           lambda: 0)
        pidfile_id = self.manager.get_pidfile_id_from('tag', 'name')
        self.manager.register_pidfile(pidfile_id)
        self.manager._drop_old_pidfiles()
        self.manager._drop_old_pidfiles()
        self.assertFalse(self.manager._registered_pidfile_info)
class ThreadedDroneTest(unittest.TestCase):
    """Tests DroneManager refresh handling against a mocked remote drone.

    Subclasses may override _DRONE_CLASS, _DRONE_HOST and create_drone() to
    rerun the same tests against a different drone type.
    """

    _DRONE_INSTALL_DIR = '/drone/install/dir'
    _RESULTS_DIR = '/results/dir'
    _DRONE_CLASS = drones._RemoteDrone
    _DRONE_HOST = ssh_host.SSHHost

    def create_drone(self, drone_hostname, mock_hostname,
                     timestamp_remote_calls=False):
        """Create and initialize a Remote Drone.

        @param drone_hostname: Hostname passed to the drone constructor and
                expected by the stubbed drone_utility.create_host.
        @param mock_hostname: Name given to the mock host object.
        @param timestamp_remote_calls: Forwarded to the drone constructor.

        @return: A remote drone instance.
        """
        mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
        self.god.stub_function(drones.drone_utility, 'create_host')
        drones.drone_utility.create_host.expect_call(drone_hostname).and_return(
                mock_host)
        mock_host.is_up.expect_call().and_return(True)
        return self._DRONE_CLASS(drone_hostname,
                                 timestamp_remote_calls=timestamp_remote_calls)

    def create_fake_pidfile_info(self, tag='tag', name='name'):
        """Register a pidfile and return the manager's registration dict.

        @param tag: Passed to get_pidfile_id_from.
        @param name: Passed to get_pidfile_id_from.

        @return: self.manager._registered_pidfile_info (the same object on
                every call, not a copy).
        """
        pidfile_id = self.manager.get_pidfile_id_from(tag, name)
        self.manager.register_pidfile(pidfile_id)
        return self.manager._registered_pidfile_info

    def setUp(self):
        """Build a DroneManager with a mock results drone and no drones."""
        self.god = mock.mock_god()
        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
                           self._DRONE_INSTALL_DIR)
        self.manager = drone_manager.DroneManager()
        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)

        # we don't want this to ever actually get called
        self.god.stub_function(drones, 'get_drone')

        # we don't want the DroneManager to go messing with global config
        def do_nothing():
            pass
        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)

        self.results_drone = MockDrone('results_drone', 0, 10)
        self.manager._results_drone = self.results_drone
        self.drone_utility_path = 'mock-drone-utility-path'
        self.mock_return = {'results': ['mock results'],
                            'warnings': []}

    def tearDown(self):
        """Undo every stub installed through self.god."""
        self.god.unstub_all()

    def test_trigger_refresh(self):
        """Test drone manager trigger refresh."""
        self.god.stub_with(self._DRONE_CLASS, '_drone_utility_path',
                           self.drone_utility_path)
        mock_drone = self.create_drone('fakedrone1', 'fakehost1')
        self.manager._drones[mock_drone.hostname] = mock_drone

        # Create a fake pidfile and confirm that a refresh call is
        # executed on each drone host, with the same pidfile paths. Then
        # check that each drone gets a key in the returned results dictionary.
        for i in range(0, 1):
            pidfile_info = self.create_fake_pidfile_info(
                    'tag%s' % i, 'name%s' % i)
        pidfile_paths = [pidfile.path for pidfile in pidfile_info]
        refresh_call = drone_utility.call('refresh', pidfile_paths)
        expected_results = {}
        mock_result = utils.CmdResult(
                stdout=cPickle.dumps(self.mock_return))
        for drone in self.manager.get_drones():
            drone._host.run.expect_call(
                    'python %s' % self.drone_utility_path,
                    stdin=cPickle.dumps([refresh_call]), stdout_tee=None,
                    connect_timeout=mock.is_instance_comparator(int)
            ).and_return(mock_result)
            expected_results[drone] = self.mock_return['results']
        self.manager.trigger_refresh()
        # assertEqual (rather than assertTrue(a == b)) so a failure shows
        # both dictionaries.
        self.assertEqual(self.manager._refresh_task_queue.get_results(),
                         expected_results)
        self.god.check_playback()

    def test_sync_refresh(self):
        """Test drone manager sync refresh."""
        mock_drone = self.create_drone('fakedrone1', 'fakehost1')
        self.manager._drones[mock_drone.hostname] = mock_drone

        # Insert some drone_utility results into the results queue, then
        # check that get_results returns it in the right format, and that
        # the rest of sync_refresh populates the right datastructures for
        # correct handling of agents. Also confirm that this method of
        # syncing is sufficient for the monitor to pick up the exit status
        # of the process in the same way it would in handle_agents.
        pidfile_path = 'results/hosts/host_id/job_id-name/.autoserv_execute'
        pidfiles = {pidfile_path: '123\n12\n0\n'}
        drone_utility_results = {
                'pidfiles': pidfiles,
                'autoserv_processes': {},
                'all_processes': {},
                'parse_processes': {},
                'pidfiles_second_read': pidfiles,
        }
        # Our manager instance isn't the drone manager singleton that the
        # pidfile_monitor will use by default, because setUp doesn't call
        # drone_manager.instance().
        self.god.stub_with(drone_manager, '_the_instance', self.manager)
        monitor = pidfile_monitor.PidfileRunMonitor()
        monitor.pidfile_id = drone_manager.PidfileId(pidfile_path)
        self.manager.register_pidfile(monitor.pidfile_id)
        self.assertEqual(monitor._state.exit_status, None)

        self.manager._refresh_task_queue.results_queue.put(
                thread_lib.ThreadedTaskQueue.result(
                    mock_drone, [drone_utility_results]))
        self.manager.sync_refresh()
        pidfiles = self.manager._pidfiles
        # next(iter(...)) picks the same first key as the Python-2-only
        # pidfiles.keys()[0].
        pidfile_id = next(iter(pidfiles))
        pidfile_contents = pidfiles[pidfile_id]

        # One assertion per field so a failure names the offending value.
        self.assertEqual(pidfile_id.path, pidfile_path)
        self.assertEqual(pidfile_contents.process.pid, 123)
        self.assertEqual(pidfile_contents.process.hostname,
                         mock_drone.hostname)
        self.assertEqual(pidfile_contents.exit_status, 12)
        self.assertEqual(pidfile_contents.num_tests_failed, 0)
        self.assertEqual(monitor.exit_code(), 12)
        self.god.check_playback()
class ThreadedLocalhostDroneTest(ThreadedDroneTest):
    """Reruns the ThreadedDroneTest tests against a local drone."""

    _DRONE_CLASS = drones._LocalDrone
    _DRONE_HOST = local_host.LocalHost

    def create_drone(self, drone_hostname, mock_hostname,
                     timestamp_remote_calls=False):
        """Create a local drone whose _host is replaced by a mock host.

        @param drone_hostname: Unused here; accepted so the signature matches
                ThreadedDroneTest.create_drone.
        @param mock_hostname: Name given to the mock host object.
        @param timestamp_remote_calls: Forwarded to the drone constructor.

        @return: A local drone instance.
        """
        mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
        # create_host is stubbed with no expectations recorded on it.
        self.god.stub_function(drones.drone_utility, 'create_host')
        local_drone = self._DRONE_CLASS(
                timestamp_remote_calls=timestamp_remote_calls)
        self.god.stub_with(local_drone, '_host', mock_host)
        return local_drone
if __name__ == '__main__':
    # Run every TestCase defined in this module when executed as a script.
    unittest.main()