| # -*- coding: utf-8 -*- |
| # |
| # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
# Control-file metadata describing this suite.  These module-level names are
# presumably read by the autotest framework -- confirm against its control
# file conventions before renaming or removing any of them.
AUTHOR = "Chrome OS Team"
NAME = "Factory"
TIME = "LONG"
TEST_CATEGORY = "Functional"
TEST_CLASS = "suite"
TEST_TYPE = "client"

# Free-form description of the suite: what it runs, and why interprocess
# communication is needed between this control file, the UI and the tests.
DOC = """
This suite executed all of the factory tests, and incorporates an
X-windows GTK-based UI to highlight testing results and to allow
factory operators to switch between tests, and to re-run tests, all
on-demand via keyboard shortcuts.

The UI is implemented as a seperate process (see _FACTORY_UI_PATH),
which means that interprocess communication is needed between this
control, the UI, and the tests (which are forked children of this
control process). """
| |
| |
| import imp |
| import os |
| import pprint |
| import signal |
| import traceback |
| |
| imp.load_source('common', job.autodir + '/bin/common.py') |
| |
| from autotest_lib.client.bin import factory |
| from autotest_lib.client.bin import parallel |
| |
| |
# Filesystem locations, all rooted at the autotest directory (job.autodir).
# Executable launched by start_ui().
FACTORY_UI_PATH = job.autodir + '/bin/factory_ui'
# Executable launched by start_state_server().
FACTORY_STATE_SERVER_PATH = job.autodir + '/bin/factory_state.py'
# Status file handed to the UI, to factory.StatusMap and to
# factory.ControlState.
STATUS_FILE_PATH = job.autodir + '/results/default/status'
# Generic test list; find_test_list() falls back to
# "<this path>.<arch>" when this file does not exist.
DEFAULT_TEST_LIST_PATH = job.autodir + '/site_tests/suite_Factory/test_list'
| |
| |
| # Hack to grab the pid for forked tests. |
| |
| from autotest_lib.client.bin.parallel import fork_waitfor as orig_fork_waitfor |
| |
def factory_fork_waitfor(tmp, pid):
    """Wait for a forked test, publishing its pid while it runs.

    Installed below in place of parallel.fork_waitfor.  Before blocking on
    the child, the (tmp, pid) pair is stored under the 'active_test_data'
    shared-data key so that the pid of the forked test can be grabbed
    elsewhere.

    Args:
        tmp: passed through to parallel._check_for_subprocess_exception
            (presumably the child's scratch directory -- confirm).
        pid: process id of the forked test to wait for.

    Raises:
        error.TestError: if the child was killed by SIGTERM, or ended with
            any other non-zero wait status.  `error` is not imported in
            this file; it is expected to be supplied by the control-file
            namespace -- TODO confirm.
    """
    factory.set_shared_data('active_test_data', (tmp, pid))
    (pid, status) = os.waitpid(pid, 0)
    parallel._check_for_subprocess_exception(tmp, pid)
    # os.waitpid() returns an encoded wait status, not a signal number, so
    # decode it instead of comparing the raw value against SIGTERM.
    if os.WIFSIGNALED(status) and os.WTERMSIG(status) == signal.SIGTERM:
        raise error.TestError('Test TERMINATED (return code = %s),\n'
                              'Maybe caused by hitting keyboard shortcut.' %
                              status)
    elif status:
        raise error.TestError("Test FAILED with return code: %d" % (status))


# Monkeypatch autotest's parallel module so forked tests go through the
# pid-publishing wrapper above.
parallel.fork_waitfor = factory_fork_waitfor
| |
| |
# Expose these factory classes directly in this namespace, so that the
# exec'ed test list file (see the execfile call at the end of this file)
# can use a cleaner syntax.  Plain assignment is used, as opposed to
# "from factory import <class>", to work around Python namespace
# wackiness -- the from syntax does not work here, as it creates new
# classes.
OperatorTest = factory.OperatorTest
InformationScreen = factory.InformationScreen
AutomatedSequence = factory.AutomatedSequence
AutomatedSubTest = factory.AutomatedSubTest
AutomatedRebootSubTest = factory.AutomatedRebootSubTest
| |
| |
# Hack to work around autotest's obsession with GRUB: replace the
# bootloader's set_default and boot_once with one-argument no-ops.
job.bootloader.set_default = lambda x: None
job.bootloader.boot_once = lambda x: None
| |
| |
def find_test_list():
    """Return the path of the test list file to use.

    The generic list at DEFAULT_TEST_LIST_PATH wins when it exists.
    Otherwise a platform-specific list named
    "<DEFAULT_TEST_LIST_PATH>.<arch>" is tried, where <arch> is the
    stripped output of the "crossystem arch" command.

    Returns:
        The path of the first existing candidate, or None (after logging
        an error) when neither file exists.
    """
    test_list = DEFAULT_TEST_LIST_PATH
    if os.path.exists(test_list):
        return test_list
    # try platform specific list
    arch_pipe = os.popen("crossystem arch")
    try:
        arch = arch_pipe.read().strip()
    finally:
        # Always release the pipe to the child command.
        arch_pipe.close()
    test_list = '%s.%s' % (DEFAULT_TEST_LIST_PATH, arch)
    if os.path.exists(test_list):
        factory.log('Using platform-specific test list: ' + test_list)
        return test_list
    factory.log('ERROR: Cannot find any test list.')
    return None
| |
def start_state_server():
    """Launch the factory state server and wait for its first output line.

    Returns:
        The subprocess.Popen object of the state server, with its stdout
        connected to a pipe.
    """
    # Imported here because this file's top-level import block does not
    # import subprocess.
    import subprocess

    state_proc_args = [FACTORY_STATE_SERVER_PATH]
    factory.log('starting state server %s' % repr(state_proc_args))
    sp = subprocess.Popen(state_proc_args, stdout=subprocess.PIPE)
    factory.log('waiting for state server to come up...')
    # Blocks until the server prints one line (or closes its stdout); that
    # first line is treated as the "server is up" message.
    factory.log('got message from server : %s' % sp.stdout.readline().strip())
    return sp
| |
def start_ui():
    """Launch the factory UI process and wait for its first output line.

    The UI is started with three arguments: the test list path from
    find_test_list(), STATUS_FILE_PATH, and this process's pid.

    Returns:
        The subprocess.Popen object of the UI, with both stdin and stdout
        connected to pipes.
    """
    # Imported here because this file's top-level import block does not
    # import subprocess.
    import subprocess

    ui_proc_args = [FACTORY_UI_PATH, find_test_list(),
                    STATUS_FILE_PATH, str(os.getpid())]
    factory.log('starting ui %s' % repr(ui_proc_args))
    sp = subprocess.Popen(ui_proc_args, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE)
    factory.log('waiting for ui to come up...')
    # Blocks until the UI prints one line (or closes its stdout).
    factory.log('got message from ui : %s' % repr(sp.stdout.readline().strip()))
    # TODO(hungte) define some protocol so that we know the UI failed to start.
    return sp
| |
def stop_process(process, caption):
    """Best-effort shutdown of a helper process started by this file.

    Args:
        process: a subprocess.Popen-like object, or None when the process
            was never started (step_init's cleanup passes None if startup
            failed before the process was created).
        caption: human-readable process name used in log messages.

    Failures while stopping are logged and ignored; nothing is raised for
    ordinary errors.
    """
    if process is None:
        # Nothing was started, so there is nothing to stop; do not emit a
        # misleading "failed to stop" warning.
        return
    try:
        factory.log('stopping %s process %d...' % (caption, process.pid))
        process.terminate()
        process.kill()
        process.wait()
        factory.log('%s process ended with code: %d' % (caption,
                                                       process.returncode))
    except Exception:
        # Deliberately best-effort, but let KeyboardInterrupt/SystemExit
        # propagate instead of swallowing them.
        factory.log('WARNING: failed to stop %s process. ignored.' % caption)
def step_reboot_seq(next_subtest_info, total_iterations, i=0):
    """Reboot the machine until `total_iterations` reboots have been issued.

    Args:
        next_subtest_info: value handed to step_init() once all reboots
            are done.
        total_iterations: number of reboots to perform.
        i: zero-based count of reboots already issued.
    """
    if i >= total_iterations:
        # All reboots done: resume normal processing.
        step_init(next_subtest_info)
        return

    # Queue this same step, with the counter advanced, ahead of the other
    # pending steps before rebooting.
    resume_step = [step_reboot_seq, next_subtest_info, total_iterations,
                   i + 1]
    job.next_step_prepend(resume_step)
    factory.log('rebooting (iteration %d of %d)' % (i, total_iterations))
    job.reboot()
| |
def find_next_test_to_run(status_map, test_db):
    """Pick the next test to execute.

    Args:
        status_map: object whose next_untested() yields the next untested
            test, or a false value when there is none.
        test_db: object used to look up the fallback review test by its
            unique name.

    Returns:
        The next untested test when there is one; otherwise the test
        registered as factory.REVIEW_INFORMATION_TEST_UNIQUE_NAME; None if
        that lookup also comes back empty.
    """
    pending = status_map.next_untested()
    if not pending:
        # XXX When the status_map is corrupted, we may need to hack the list
        # and force some test to run. Currently we assume
        # factory.REVIEW_INFORMATION_TEST_UNIQUE_NAME is always runnable.
        fallback = test_db.get_test_by_unique_name(
            factory.REVIEW_INFORMATION_TEST_UNIQUE_NAME)
        if not fallback:
            return None
        factory.log("HACK: Running %s" % str(fallback))
        return fallback
    return pending
| |
def step_init(next_subtest_info=None):
    """Entry step: start the helper processes and run the main test loop.

    Args:
        next_subtest_info: forwarded unchanged to main_loop(); None on a
            normal start.

    Any exception is logged and re-raised.  The UI and state server
    processes are stopped on the way out in every case.
    """
    # Re-register this step before doing anything else.
    job.next_step([step_init])
    factory.log('TEST_LIST:\n%s' % pprint.pformat(TEST_LIST))

    state_server_process = None
    ui_process = None
    try:
        state_server_process = start_state_server()
        ui_process = start_ui()
        ui_pipe = ui_process.stdin

        def notify_ui(test, status):
            # The arguments are not forwarded; a single space is written
            # to the UI's stdin and flushed immediately.
            ui_pipe.write(' ')
            ui_pipe.flush()

        main_loop(next_subtest_info, notify_ui)
    except error.JobContinue:
        # thrown by autotest framework for reboot.
        factory.log('JobContinue: System Reboot')
        raise
    except error.JobComplete:
        factory.log('JobComplete: Factory Test Complete')
        raise
    except:
        failure_report = traceback.format_exc()
        factory.log('suite_Factory failed with exception: ' + failure_report)
        # To fix a broken status tree, we may force a step_init here:
        #   job.next_step([step_init])
        raise
    finally:
        # UI first, then the state server.
        stop_process(ui_process, 'ui')
        stop_process(state_server_process, 'state server')
| |
def main_loop(next_subtest_info, notify_callback):
    """Run factory tests until no further test is selected.

    Args:
        next_subtest_info: tag prefix of a reboot subtest to resume from,
            or a false value for a normal start.
        notify_callback: passed to factory.StatusMap as its
            status_change_callback.
    """
    test_db = factory.TestDatabase(TEST_LIST)
    status_map = factory.StatusMap(TEST_LIST, STATUS_FILE_PATH,
                                   status_change_callback=notify_callback)

    # initialize factory test states
    state_instance = factory.get_state_instance()
    # TODO(hungte) maybe we should merge this into StatusMap or ControlState
    register_test_list = [test_db.get_unique_id_str(test)
                          for test in test_db.get_all_tests()]
    if state_instance.register_tests(register_test_list, factory.UNTESTED):
        factory.log('Registered new test list.')
    else:
        factory.log('Continue with existing test list.')

    # any 'active' tests should be fail now.
    for test in test_db.get_all_tests():
        status = status_map.lookup_status(test)
        # factory.log('test %s: %s' % (test_db.get_unique_id_str(test), status))
        if status == factory.UNTESTED:
            continue
        elif isinstance(test, factory.InformationScreen):
            # Information screens that are not UNTESTED are reset to
            # UNTESTED.
            status_map.update(test, factory.UNTESTED)
        elif ((not isinstance(test, factory.AutomatedRebootSubTest)) and
              status == factory.ACTIVE):
            # A non-reboot test still marked ACTIVE at startup is recorded
            # as FAILED.
            status_map.update(test, factory.FAILED, 'Unknown (reboot?)')

    control_state = factory.ControlState(
        job, TEST_LIST, test_db, status_map, STATUS_FILE_PATH,
        parallel.fork_nuke_subprocess)

    def run_automated_sequence(test, start_index=0):
        """Run the subtests of `test`, starting at index `start_index`.

        Returns a truthy shortcut target as soon as a subtest's run_test()
        yields one; otherwise None.
        """
        for subtest in test.subtest_list[start_index:]:
            if isinstance(subtest, factory.AutomatedRebootSubTest):
                # Reboot subtests are driven by step_reboot_seq(), which is
                # given this subtest's tag prefix as the resume point.
                next_subtest_info = (test_db.get_tag_prefix(subtest))
                step_reboot_seq(next_subtest_info, subtest.iterations)
            else:
                shortcut_target = control_state.run_test(subtest)
                if shortcut_target:
                    return shortcut_target
            # NOTE(review): this check is assumed to run after every
            # subtest, reboot or not -- confirm the intended nesting.
            if status_map.lookup_status(test) == factory.FAILED:
                return None

    if next_subtest_info:
        # Resuming: run the reboot subtest named by the tag prefix, then
        # continue its parent sequence with the subtest that follows it.
        tag_prefix = next_subtest_info
        reboot_subtest = test_db.get_subtest_by_tag_prefix(tag_prefix)
        control_state.run_test(reboot_subtest)
        automated_seq = test_db.get_subtest_parent(reboot_subtest)
        rst_index = automated_seq.subtest_list.index(reboot_subtest)
        # NOTE(review): the value returned here (a possible shortcut target)
        # is discarded -- confirm that this is intended.
        run_automated_sequence(automated_seq, start_index=rst_index + 1)

    test = find_next_test_to_run(status_map, test_db)
    while test is not None:
        unique_id_str = test_db.get_unique_id_str(test)
        factory.log('control running test %s' % unique_id_str)
        if isinstance(test, factory.AutomatedSequence):
            shortcut_target = run_automated_sequence(test)
        else:
            shortcut_target = control_state.run_test(test)
        # A truthy shortcut target takes priority over the next untested
        # test.
        test = shortcut_target or find_next_test_to_run(status_map, test_db)
| |
| |
# This exec defines TEST_LIST in global scope.
# NOTE(review): find_test_list() returns None when no test list file
# exists, in which case this call receives None instead of a path.
execfile(find_test_list())