blob: 8302bd777696120b77f1cfc7c0b9c1b542e51472 [file] [log] [blame]
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# DESCRIPTION :
#
# This library provides common types and routines for the factory ui
# infrastructure. This library explicitly does not import gtk, to
# allow its use by the autotest control process.
import signal
import subprocess
import sys
import time
import factory_state
ACTIVE = 'ACTIVE'
PASSED = 'PASS'
FAILED = 'FAIL'
UNTESTED = 'UNTESTED'
LOG_PATH = '/var/log/factory.log'
CONSOLE_LOG_PATH = '/tmp/factory_console.log'
DATA_PREFIX = 'FACTORY_DATA:'
LAST_PROBED_HWID_NAME = 'last_probed_hwid'
REVIEW_INFORMATION_TEST_UNIQUE_NAME = 'ReviewInformation'
_state_instance = None
def log(s):
print >> sys.stderr, 'FACTORY: ' + s
def get_state_instance():
''' A quick way to get a (cached) instance for factory_state '''
global _state_instance
if _state_instance is None:
_state_instance = factory_state.get_instance()
return _state_instance
def get_shared_data(key):
return get_state_instance().get_shared(key)
def set_shared_data(key, value):
return get_state_instance().set_shared(key, value)
class FactoryTest:
def __repr__(self):
d = ['%s=%s' % (l, repr(v))
for l, v in self.__dict__.items()
if l != 'self']
c = ('%s' % self.__class__).rpartition('.')[2]
return '%s(%s)' % (c, ','.join(d))
class FactoryAutotestTest(FactoryTest):
# Placeholder parent for tests with autotest_name fields.
pass
class OperatorTest(FactoryAutotestTest):
def __init__(self, label_en='', label_zw='', autotest_name=None,
kbd_shortcut=None, dargs={}, drop_caches=False,
unique_name=None):
self.__dict__.update(vars())
class InformationScreen(OperatorTest):
# These tests never pass or fail, just return to untested state.
pass
class AutomatedSequence(FactoryTest):
def __init__(self, label_en='', label_zw='', subtest_tag_prefix=None,
kbd_shortcut=None, subtest_list=[], unique_name=None):
self.__dict__.update(vars())
class AutomatedSubTest(FactoryAutotestTest):
def __init__(self, label_en='', label_zw='', autotest_name=None,
dargs={}, drop_caches=False, unique_name=None):
self.__dict__.update(vars())
class AutomatedRebootSubTest(AutomatedSubTest):
def __init__(self, label_en='', label_zw='', iterations=None,
autotest_name='factory_RebootStub', dargs={},
drop_caches=False, unique_name=None):
self.__dict__.update(vars())
class KbdShortcutDatabase:
'''Track the bindings between keyboard shortcuts and tests.'''
def __init__(self, test_list, test_db):
self._kbd_shortcut_map = dict(
(test.kbd_shortcut, test) for test in test_list)
# Validate keyboard shortcut uniqueness.
assert(None not in self._kbd_shortcut_map)
delta = set(test_list) - set(self._kbd_shortcut_map.values())
for test in delta:
collision = self._kbd_shortcut_map[test.kbd_shortcut]
log('ERROR: tests %s and %s both have kbd_shortcut %s' %
(test_db.get_unique_id_str(test),
test_db.get_unique_id_str(collision),
test.kbd_shortcut))
assert not delta
def get_shortcut_keys(self):
return set(self._kbd_shortcut_map)
def lookup_test(self, kbd_shortcut):
return self._kbd_shortcut_map.get(kbd_shortcut)
class TestDatabase:
'''This class parses a test_list and allows searching for tests or
their attributes. It also generates tag_prefix values for each
runnable test. Autotest allows tests with the same name to be
uniquely identified by the "tag" parameter to the job.run_test()
function. The factory system generates this value as the
combination of a tag_prefix value and a counter that tracks how
many times a test has run. Tests are identified uniquely in the
status log by both the tag_prefix and their autotest name. These
values are referred to here as the unique_details for the test.'''
def __init__(self, test_list):
self._test_list = test_list
self._subtest_set = set()
self._subtest_parent_map = {}
self._tag_prefix_map = {}
for test in test_list:
if not isinstance(test, AutomatedSequence):
self._tag_prefix_map[test] = test.kbd_shortcut
continue
step_count = 1
for subtest in test.subtest_list:
self._subtest_parent_map[subtest] = test
self._subtest_set.add(subtest)
if not isinstance(subtest, FactoryAutotestTest):
continue
tag_prefix = ('%s_s%d' % (test.subtest_tag_prefix, step_count))
self._tag_prefix_map[subtest] = tag_prefix
step_count += 1
self._tag_prefix_to_subtest_map = dict(
(self._tag_prefix_map[st], st) for st in self._subtest_set)
self._unique_details_map = dict(
(self.get_unique_details(t), t) for t in self.get_all_tests()
if isinstance(t, FactoryAutotestTest))
self._unique_name_map = dict(
(t.unique_name, t) for t in self.get_all_tests()
if getattr(t, 'unique_name', None) is not None)
self._unique_id_str_map = dict(
(self.get_unique_id_str(t), t) for t in self.get_all_tests())
def get_test_by_unique_details(self, autotest_name, tag_prefix):
return self._unique_details_map.get((autotest_name, tag_prefix))
def get_tag_prefix(self, test):
return self._tag_prefix_map[test]
def get_unique_details(self, test):
return (test.autotest_name, self.get_tag_prefix(test))
def get_test_by_unique_name(self, unique_name):
return self._unique_name_map.get(unique_name)
def get_test_by_unique_id_str(self, unique_id_str):
return self._unique_id_str_map.get(unique_id_str)
def get_subtest_parent(self, test):
return self._subtest_parent_map.get(test)
def get_subtest_by_tag_prefix(self, tag_prefix):
return self._tag_prefix_to_subtest_map.get(tag_prefix)
def get_automated_sequences(self):
return [test for test in self._test_list
if isinstance(test, AutomatedSequence)]
def get_all_tests(self):
return set(self._test_list) | self._subtest_set
def get_unique_id_str(self, test):
'''Intended primarily to identify tests for debugging.'''
if test is None:
return None
if isinstance(test, AutomatedSequence):
return test.subtest_tag_prefix
return '%s.%s' % self.get_unique_details(test)
class StatusMap:
'''
Manipulates the status of factory tests by providing query and set
operations to the status, counter, and error messages associated with tests
in test database, by wrapping calls to factory_state.
'''
def __init__(self, test_list, status_file_path, test_db=None,
status_change_callback=None):
self._state = get_state_instance()
self._test_list = [test for test in test_list]
self._test_db = test_db if test_db else TestDatabase(test_list)
self._status_change_callback = status_change_callback
def lookup_status(self, test):
return self._state.lookup_test_status(
self._test_db.get_unique_id_str(test))
def lookup_count(self, test):
return self._state.lookup_test_count(
self._test_db.get_unique_id_str(test))
def increase_count(self, test):
return self._state.increase_test_count(
self._test_db.get_unique_id_str(test))
def lookup_error_msg(self, test):
return self._state.lookup_test_error_msg(
self._test_db.get_unique_id_str(test))
def filter_by_status(self, target_status):
# TODO(hungte) use self._state.filter_by_status?
comp = (isinstance(target_status, list) and
(lambda s: s in target_status) or
(lambda s: s == target_status))
return [test for test in self._test_db.get_all_tests()
if comp(self.lookup_status(test))]
def next_untested(self):
for test in self._test_list:
if self.lookup_status(test) == UNTESTED:
return test
return None
def get_active_top_level_test(self):
assert False, "not supported yet"
def _do_update(self, test, status, error_msg=None):
if isinstance(test, InformationScreen) and (status in [PASSED, FAILED]):
status = UNTESTED
unique_id_str = self._test_db.get_unique_id_str(test)
last_status = self.lookup_status(test)
if status != last_status:
parent_as = self._test_db.get_subtest_parent(test)
log('status change for %s : %s -> %s (as = %s)' %
(unique_id_str, last_status, status,
self._test_db.get_unique_id_str(parent_as)))
self._state.update_test_status(unique_id_str, status, error_msg)
if self._status_change_callback is not None:
self._status_change_callback(test, status)
def update(self, test, status, error_msg=None):
''' Updates the status information of a factory test.
If parameter 'test' is a AutomatedSubTest, also update its parent. '''
parent_as = self._test_db.get_subtest_parent(test)
self._do_update(test, status, error_msg)
if isinstance(parent_as, AutomatedSequence):
self.update_automated_sequence(parent_as, test)
def update_automated_sequence(self, test, sub_test):
rst_index = test.subtest_list.index(sub_test) + 1
for sub_test in test.subtest_list[rst_index:]:
if self.lookup_status(sub_test) != UNTESTED:
self._do_update(sub_test, UNTESTED)
subtest_status_set = set(self.lookup_status(subtest)
for subtest in test.subtest_list)
log('automated sequence %s status set = %s' % (
self._test_db.get_unique_id_str(test), repr(subtest_status_set)))
if len(subtest_status_set) == 1:
status = subtest_status_set.pop()
elif FAILED not in subtest_status_set:
status = ACTIVE
else:
status = FAILED
self._do_update(test, status)
class ControlState:
'''Track the state needed to run and terminate factory tests. The
shared data written to the factory log records the pid information
for each test as it gets run. If the factory UI sees a keyboard
shortcut event, it sends a SIGUSR1 event to the control process,
which then uses the log information to terminate the running
test.'''
def __init__(self, job, test_list, test_db, status_map,
status_file_path, nuke_fn):
self._job = job
self._status_map = status_map
self._kbd_shortcut_db = KbdShortcutDatabase(test_list, test_db)
self._test_db = test_db
self._std_dargs = {
'status_file_path' : status_file_path,
'test_list': test_list}
self._nuke_fn = nuke_fn
signal.signal(signal.SIGUSR1, self.kill_current_test_callback)
def kill_current_test_callback(self, signum, frame):
active_test_data = get_shared_data('active_test_data')
if active_test_data is not None:
log('SIGUSR1 ... KILLING active test %s' % repr(active_test_data))
self._nuke_fn(*active_test_data)
else:
log('SIGUSR1 ... KILLING NOTHING, no active test')
def process_fail_or_kbd_shortcut_activation(self, last_status=None):
kbd_shortcut = get_shared_data('activated_kbd_shortcut')
if kbd_shortcut is None:
if last_status != FAILED:
return None
log('last test failed, routing to review screen...')
return self._test_db.get_test_by_unique_name(
REVIEW_INFORMATION_TEST_UNIQUE_NAME)
target_test = self._kbd_shortcut_db.lookup_test(kbd_shortcut)
log('activated kbd_shortcut %s -> %s' % (
kbd_shortcut, self._test_db.get_unique_id_str(target_test)))
set_shared_data('activated_kbd_shortcut', None)
return target_test
def run_test(self, test, count=None):
if count == None:
count = self._status_map.lookup_count(test)
self._status_map.increase_count(test)
test_tag = '%s_%s' % (self._test_db.get_tag_prefix(test), count)
dargs = {}
dargs.update(test.dargs)
dargs.update(self._std_dargs)
dargs.update({'tag': test_tag,
'subtest_tag': test_tag})
self._job.drop_caches_between_iterations = test.drop_caches
self._status_map.update(test, ACTIVE, None)
status = FAILED
error_msg = None
if self._job.run_test(test.autotest_name, **dargs):
status = PASSED
elif hasattr(self._job, 'last_error'):
error_msg = str(self._job.last_error)
self._status_map.update(test, status, error_msg)
self._job.drop_caches_between_iterations = False
return self.process_fail_or_kbd_shortcut_activation(status)
def lookup_status_by_unique_name(unique_name, test_list, _=None):
"""Determine the status of given test. Somewhat heavyweight,
since it parses the status file."""
# TODO(hungte) we should deprecate this function and use StatusMap or
# factory_state to query directly.
test_db = TestDatabase(test_list)
test = test_db.get_test_by_unique_name(unique_name)
unique_id = test_db.get_unique_id_str(test)
return get_state_instance().lookup_test_status(unique_id)