blob: 1e928224aa2bf8c679f6dd039c7f88ac316ba613 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
AUTHOR = "Chrome OS Team"
NAME = "Factory"
TIME = "LONG"
TEST_CATEGORY = "Functional"
TEST_CLASS = "suite"
TEST_TYPE = "client"
DOC = """
This suite executed all of the factory tests, and incorporates an
X-windows GTK-based UI to highlight testing results and to allow
factory operators to switch between tests, and to re-run tests, all
on-demand via keyboard shortcuts.
The UI is implemented as a seperate process (see _FACTORY_UI_PATH),
which means that interprocess communication is needed between this
control, the UI, and the tests (which are forked children of this
control process). """
import imp
import os
import pprint
import signal
import traceback
imp.load_source('common', job.autodir + '/bin/common.py')
from autotest_lib.client.bin import factory
from autotest_lib.client.bin import parallel
FACTORY_UI_PATH = job.autodir + '/bin/factory_ui'
FACTORY_STATE_SERVER_PATH = job.autodir + '/bin/factory_state.py'
STATUS_FILE_PATH = job.autodir + '/results/default/status'
DEFAULT_TEST_LIST_PATH = job.autodir + '/site_tests/suite_Factory/test_list'
# Hack to grab the pid for forked tests.
from autotest_lib.client.bin.parallel import fork_waitfor as orig_fork_waitfor
def factory_fork_waitfor(tmp, pid):
factory.set_shared_data('active_test_data', (tmp, pid))
(pid, status) = os.waitpid(pid, 0)
parallel._check_for_subprocess_exception(tmp, pid)
if status == signal.SIGTERM:
raise error.TestError('Test TERMINATED (return code = %s),\n'
'Maybe caused by hitting keyboard shotcut.' %
status)
elif status:
raise error.TestError("Test FAILED with return code: %d" % (status))
parallel.fork_waitfor = factory_fork_waitfor
# These definitions are expose these classes directly into this
# namespace, so that the following exec'ed file can have cleaner
# syntax. These are done in this fashion, as opposed to "from factory
# import <class>" to work-around Python namespace wackiness -- the
# from syntax does not work, creating new classes.
OperatorTest = factory.OperatorTest
InformationScreen = factory.InformationScreen
AutomatedSequence = factory.AutomatedSequence
AutomatedSubTest = factory.AutomatedSubTest
AutomatedRebootSubTest = factory.AutomatedRebootSubTest
# Hack to work around autotest's obsession with GRUB.
job.bootloader.set_default = lambda x: None
job.bootloader.boot_once = lambda x: None
def find_test_list():
test_list = DEFAULT_TEST_LIST_PATH
if os.path.exists(test_list):
return test_list
# try platform specific list
test_list = ('%s.%s' % (DEFAULT_TEST_LIST_PATH,
os.popen("crossystem arch").read().strip()))
if os.path.exists(test_list):
factory.log('Using platform-specific test list: ' + test_list)
return test_list
factory.log('ERROR: Cannot find any test list.')
def start_state_server():
state_proc_args = [FACTORY_STATE_SERVER_PATH]
factory.log('starting state server %s' % repr(state_proc_args))
sp = subprocess.Popen(state_proc_args, stdout=subprocess.PIPE)
factory.log('waiting for state server to come up...')
factory.log('got message from server : %s' % sp.stdout.readline().strip())
return sp
def start_ui():
ui_proc_args = [FACTORY_UI_PATH, find_test_list(),
STATUS_FILE_PATH, str(os.getpid())]
factory.log('starting ui %s' % repr(ui_proc_args))
sp = subprocess.Popen(ui_proc_args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
factory.log('waiting for ui to come up...')
factory.log('got message from ui : %s' % repr(sp.stdout.readline().strip()))
# TODO(hungte) define some protocol so that we know the UI failed to start.
return sp
def stop_process(process, caption):
try:
factory.log('stopping %s process %d...' % (caption, process.pid))
process.terminate()
process.kill()
process.wait()
factory.log('%s process ended with code: %d' % (caption,
process.returncode))
except:
factory.log('WARNING: failed to stop %s process. ignored.' % caption)
def step_reboot_seq(next_subtest_info, total_iterations, i=0):
if i < total_iterations:
job.next_step_prepend([step_reboot_seq, next_subtest_info,
total_iterations, i + 1])
factory.log('rebooting (iteration %d of %d)' % (i, total_iterations))
job.reboot()
else:
step_init(next_subtest_info)
def find_next_test_to_run(status_map, test_db):
test = status_map.next_untested()
if test:
return test
# XXX When the status_map is corrupted, we may need to hack the list and
# force some test to run. Currently we assume
# factory.REVIEW_INFORMATION_TEST_UNIQUE_NAME is always runnable.
review_test = test_db.get_test_by_unique_name(
factory.REVIEW_INFORMATION_TEST_UNIQUE_NAME)
if review_test:
factory.log("HACK: Running %s" % str(review_test))
return review_test
return None
def step_init(next_subtest_info=None):
job.next_step([step_init])
factory.log('TEST_LIST:\n%s' % pprint.pformat(TEST_LIST))
ui_process = None
state_server_process = None
class FactoryUINotifyObject(object):
def __init__(self, notify_fd):
self.notify_fd = notify_fd
def notify(self, test, status):
self.notify_fd.write(' ')
self.notify_fd.flush()
try:
state_server_process = start_state_server()
ui_process = start_ui()
ui_notify_fd = ui_process.stdin
ui_notify_obj = FactoryUINotifyObject(ui_notify_fd)
main_loop(next_subtest_info, ui_notify_obj.notify)
except error.JobContinue:
# thrown by autotest framework for reboot.
factory.log('JobContinue: System Reboot')
raise
except error.JobComplete:
factory.log('JobComplete: Factory Test Complete')
raise
except:
factory.log('suite_Factory failed with exception: ' +
traceback.format_exc())
# To fix a broken status tree, we may force a step_init here:
# job.next_step([step_init])
raise
finally:
stop_process(ui_process, 'ui')
stop_process(state_server_process, 'state server')
def main_loop(next_subtest_info, notify_callback):
test_db = factory.TestDatabase(TEST_LIST)
status_map = factory.StatusMap(TEST_LIST, STATUS_FILE_PATH,
status_change_callback=notify_callback)
# initialize factory test states
state_instance = factory.get_state_instance()
# TODO(hungte) maybe we should merge this into StatusMap or ControlState
register_test_list = [test_db.get_unique_id_str(test)
for test in test_db.get_all_tests()]
if state_instance.register_tests(register_test_list, factory.UNTESTED):
factory.log('Registered new test list.')
else:
factory.log('Continue with existing test list.')
# any 'active' tests should be fail now.
for test in test_db.get_all_tests():
status = status_map.lookup_status(test)
# factory.log('test %s: %s' % (test_db.get_unique_id_str(test), status))
if status == factory.UNTESTED:
continue
elif isinstance(test, factory.InformationScreen):
status_map.update(test, factory.UNTESTED)
elif ((not isinstance(test, factory.AutomatedRebootSubTest)) and
status == factory.ACTIVE):
status_map.update(test, factory.FAILED, 'Unknown (reboot?)')
control_state = factory.ControlState(
job, TEST_LIST, test_db, status_map, STATUS_FILE_PATH,
parallel.fork_nuke_subprocess)
def run_automated_sequence(test, start_index=0):
for subtest in test.subtest_list[start_index:]:
if isinstance(subtest, factory.AutomatedRebootSubTest):
next_subtest_info = (test_db.get_tag_prefix(subtest))
step_reboot_seq(next_subtest_info, subtest.iterations)
else:
shortcut_target = control_state.run_test(subtest)
if shortcut_target:
return shortcut_target
if status_map.lookup_status(test) == factory.FAILED:
return None
if next_subtest_info:
tag_prefix = next_subtest_info
reboot_subtest = test_db.get_subtest_by_tag_prefix(tag_prefix)
control_state.run_test(reboot_subtest)
automated_seq = test_db.get_subtest_parent(reboot_subtest)
rst_index = automated_seq.subtest_list.index(reboot_subtest)
run_automated_sequence(automated_seq, start_index=rst_index + 1)
test = find_next_test_to_run(status_map, test_db)
while test is not None:
unique_id_str = test_db.get_unique_id_str(test)
factory.log('control running test %s' % unique_id_str)
if isinstance(test, factory.AutomatedSequence):
shortcut_target = run_automated_sequence(test)
else:
shortcut_target = control_state.run_test(test)
test = shortcut_target or find_next_test_to_run(status_map, test_db)
# This exec defines TEST_LIST in global scope.
execfile(find_test_list())