blob: f763c0ebb4978781d424c88dfa39d5c89f7de306 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
'''
This library provides common types and routines for the factory test
infrastructure. This library explicitly does not import gtk, to
allow its use by the autotest control process.
To log to the factory console, use:
from autotest_lib.client.cros import factory
factory.console.info('...') # Or warn, or error
'''
import getpass
import logging
import os
import sys
SCRIPT_PATH = os.path.realpath(__file__)
CROS_FACTORY_LIB_PATH = os.path.dirname(SCRIPT_PATH)
CLIENT_PATH = os.path.realpath(os.path.join(CROS_FACTORY_LIB_PATH, '..', '..'))
FACTORY_STATE_VERSION = 2
class TestListError(Exception):
pass
def in_chroot():
'''Returns True if currently in the chroot.'''
return 'CROS_WORKON_SRCROOT' in os.environ
def get_log_root():
'''Returns the root for logging and state.
This is usually /var/log, or /tmp/factory.$USER if in the chroot, but may be
overridden by the CROS_FACTORY_LOG_ROOT environment variable.
'''
ret = os.environ.get('CROS_FACTORY_LOG_ROOT')
if ret:
return ret
if in_chroot():
return '/tmp/factory.%s' % getpass.getuser()
return '/var/log'
def get_state_root():
'''Returns the root for all factory state.'''
return os.path.join(
get_log_root(), 'factory_state.v%d' % FACTORY_STATE_VERSION)
CONSOLE_LOG_PATH = os.path.join(get_log_root(), 'console.log')
_state_instance = None
def get_current_test_path():
# Returns the path of the currently executing test, if any.
return os.environ.get("CROS_FACTORY_TEST_PATH")
def get_lsb_data():
"""Reads all key-value pairs from system lsb-* configuration files."""
# TODO(hungte) Re-implement using regex.
# lsb-* file format:
# [#]KEY="VALUE DATA"
lsb_files = ('/etc/lsb-release',
'/usr/local/etc/lsb-release',
'/usr/local/etc/lsb-factory')
def unquote(entry):
for c in ('"', "'"):
if entry.startswith(c) and entry.endswith(c):
return entry[1:-1]
return entry
data = dict()
for lsb_file in lsb_files:
if not os.path.exists(lsb_file):
continue
with open(lsb_file, "rt") as lsb_handle:
for line in lsb_handle.readlines():
line = line.strip()
if ('=' not in line) or line.startswith('#'):
continue
(key, value) = line.split('=', 1)
data[unquote(key)] = unquote(value)
return data
def _init_console_log():
handler = logging.FileHandler(CONSOLE_LOG_PATH, "a", delay=True)
log_format = '[%(levelname)s] %(message)s'
test_path = get_current_test_path()
if test_path:
log_format = test_path + ': ' + log_format
handler.setFormatter(logging.Formatter(log_format))
ret = logging.getLogger("console")
ret.addHandler(handler)
ret.setLevel(logging.INFO)
return ret
console = _init_console_log()
def std_repr(obj, extra=[], excluded_keys=[], true_only=False):
'''
Returns the representation of an object including its properties.
@param extra: Extra items to include in the representation.
@param excluded_keys: Keys not to include in the representation.
@param true_only: Whether to include only values that evaluate to
true.
'''
# pylint: disable=W0102
return (obj.__class__.__name__ + '(' +
', '.join(extra +
['%s=%s' % (k, repr(getattr(obj, k)))
for k in sorted(obj.__dict__.keys())
if k[0] != '_' and k not in excluded_keys and
(not true_only or getattr(obj, k))])
+ ')')
def log(s):
'''
Logs a message to the console. Deprecated; use the 'console'
property instead.
TODO(jsalz): Remove references throughout factory tests.
'''
console.info(s)
def get_state_instance():
'''
Returns a cached factory state client instance.
'''
# Delay loading modules to prevent circular dependency.
import factory_common
from autotest_lib.client.cros.factory import state
global _state_instance # pylint: disable=W0603
if _state_instance is None:
_state_instance = state.get_instance()
return _state_instance
def get_shared_data(key, default=None):
if not get_state_instance().has_shared_data(key):
return default
return get_state_instance().get_shared_data(key)
def set_shared_data(*key_value_pairs):
return get_state_instance().set_shared_data(*key_value_pairs)
def has_shared_data(key):
return get_state_instance().has_shared_data(key)
def del_shared_data(key):
return get_state_instance().del_shared_data(key)
def read_test_list(path=None, state_instance=None, text=None):
if len([x for x in [path, text] if x]) != 1:
raise TestListError('Exactly one of path and text must be set')
test_list_locals = {}
# Import test classes into the evaluation namespace
for (k, v) in dict(globals()).iteritems():
if type(v) == type and issubclass(v, FactoryTest):
test_list_locals[k] = v
options = Options()
test_list_locals['options'] = options
if path:
execfile(path, {}, test_list_locals)
else:
exec text in {}, test_list_locals
assert 'TEST_LIST' in test_list_locals, (
'Test list %s does not define TEST_LIST' % (path or '<text>'))
options.check_valid()
return FactoryTestList(test_list_locals['TEST_LIST'],
state_instance or get_state_instance(),
options)
_inited_logging = False
def init_logging(prefix=None, verbose=False):
'''
Initializes logging.
@param prefix: A prefix to display for each log line, e.g., the program
name.
@param verbose: True for debug logging, false for info logging.
'''
global _inited_logging # pylint: disable=W0603
assert not _inited_logging, "May only call init_logging once"
_inited_logging = True
if not prefix:
prefix = os.path.basename(sys.argv[0])
# Make sure that nothing else has initialized logging yet (e.g.,
# autotest, whose logging_config does basicConfig).
assert not logging.getLogger().handlers, (
"Logging has already been initialized")
logging.basicConfig(
format=('[%(levelname)s] ' + prefix +
' %(filename)s:%(lineno)d %(asctime)s.%(msecs)03d %(message)s'),
level=logging.DEBUG if verbose else logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
logging.debug('Initialized logging')
class Options(object):
'''Test list options.
These may be set by assigning to the options variable in a test list (e.g.,
'options.auto_run_on_start = False').
'''
# Perform an implicit auto-run when the test driver starts up?
auto_run_on_start = True
# Perform an implicit auto-run when the user switches to any test?
auto_run_on_keypress = False
# Default UI language
ui_lang = 'en'
def check_valid(self):
'''Throws a TestListError if there are any invalid options.'''
# Make sure no errant options, or options with weird types,
# were set.
default_options = Options()
for key in sorted(self.__dict__):
if key.startswith('_'):
continue
if not hasattr(default_options, key):
raise TestListError('Unknown option %s' % key)
value = getattr(self, key)
default_value = getattr(default_options, key)
if type(value) != type(default_value):
raise TestListError(
'Option %s has unexpected type %s (should be %s)' % (
key, type(value), type(default_value)))
class TestState(object):
'''
The complete state of a test.
@property status: The status of the test (one of ACTIVE, PASSED,
FAILED, or UNTESTED).
@property count: The number of times the test has been run.
@property error_msg: The last error message that caused a test failure.
@property shutdown_count: The next of times the test has caused a shutdown.
@property visible: Whether the test is the currently visible test.
'''
ACTIVE = 'ACTIVE'
PASSED = 'PASSED'
FAILED = 'FAILED'
UNTESTED = 'UNTESTED'
def __init__(self, status=UNTESTED, count=0, visible=False, error_msg=None,
shutdown_count=0):
self.status = status
self.count = count
self.visible = visible
self.error_msg = error_msg
self.shutdown_count = shutdown_count
def __repr__(self):
return std_repr(self)
def update(self, status=None, increment_count=0, error_msg=None,
shutdown_count=None, increment_shutdown_count=0, visible=None):
'''
Updates the state of a test.
@param status: The new status of the test.
@param increment_count: An amount by which to increment count.
@param error_msg: If non-None, the new error message for the test.
@param shutdown_count: If non-None, the new shutdown count.
@param increment_shutdown_count: An amount by which to increment
shutdown_count.
@param visible: If non-None, whether the test should become visible.
Returns True if anything was changed.
'''
old_dict = dict(self.__dict__)
if status:
self.status = status
if error_msg is not None:
self.error_msg = error_msg
if shutdown_count is not None:
self.shutdown_count = shutdown_count
if visible is not None:
self.visible = visible
self.count += increment_count
self.shutdown_count += increment_shutdown_count
return self.__dict__ != old_dict
@classmethod
def from_dict_or_object(cls, obj):
if type(obj) == dict:
return TestState(**obj)
else:
assert type(obj) == TestState, type(obj)
return obj
class FactoryTest(object):
'''
A factory test object.
Factory tests are stored in a tree. Each node has an id (unique
among its siblings). Each node also has a path (unique throughout the
tree), constructed by joining the IDs of all the test's ancestors
with a '.' delimiter.
'''
# If True, the test never fails, but only returns to an untested state.
never_fails = False
# If True, the test has a UI, so if it is active factory_ui will not
# display the summary of running tests.
has_ui = False
REPR_FIELDS = ['id', 'autotest_name', 'pytest_name', 'dargs',
'backgroundable', 'never_fails']
def __init__(self,
label_en='',
label_zh='',
autotest_name=None,
pytest_name=None,
kbd_shortcut=None,
dargs=None,
backgroundable=False,
subtests=None,
id=None, # pylint: disable=W0622
has_ui=None,
never_fails=None,
_root=None):
'''
Constructor.
@param label_en: An English label.
@param label_zh: A Chinese label.
@param autotest_name: The name of the autotest to run.
@param pytest_name: The name of the pytest to run (relative to
autotest_lib.client.cros.factory.tests).
@param kbd_shortcut: The keyboard shortcut for the test.
@param dargs: Autotest arguments.
@param backgroundable: Whether the test may run in the background.
@param subtests: A list of tests to run inside this test.
@param id: A unique ID for the test (defaults to the autotest name).
@param has_ui: True if the test has a UI. (This defaults to True for
OperatorTest.) If has_ui is not True, then when the test is
running, the statuses of the test and its siblings will be shown in
the test UI area instead.
@param never_fails: True if the test never fails, but only returns to an
untested state.
@param _root: True only if this is the root node (for internal use
only).
'''
self.label_en = label_en
self.label_zh = label_zh
self.autotest_name = autotest_name
self.pytest_name = pytest_name
self.kbd_shortcut = kbd_shortcut.lower() if kbd_shortcut else None
self.dargs = dargs or {}
self.backgroundable = backgroundable
self.subtests = subtests or []
self.path = ''
self.parent = None
self.root = None
assert not (autotest_name and pytest_name), (
'No more than one of autotest_name, pytest_name must be specified')
if _root:
self.id = None
else:
self.id = id or autotest_name or pytest_name.rpartition('.')[2]
assert self.id, (
'Tests require either an id or autotest name: %r' % self)
assert '.' not in self.id, (
'id cannot contain a period: %r' % self)
# Note that we check ID uniqueness in _init.
if has_ui is not None:
self.has_ui = has_ui
if never_fails is not None:
self.never_fails = never_fails
# Auto-assign label text.
if not self.label_en:
if self.id and (self.id != self.autotest_name):
self.label_en = self.id
elif self.autotest_name:
# autotest_name is type_NameInCamelCase.
self.label_en = self.autotest_name.partition('_')[2]
assert not ((autotest_name or pytest_name) and self.subtests), (
'Test %s may not have both an autotest and subtests' % self.id)
def to_struct(self):
'''Returns the node as a struct suitable for JSONification.'''
ret = dict(
(k, getattr(self, k))
for k in ['id', 'path', 'label_en', 'label_zh',
'kbd_shortcut', 'backgroundable'])
ret['subtests'] = [subtest.to_struct() for subtest in self.subtests]
return ret
def __repr__(self, recursive=False):
attrs = ['%s=%s' % (k, repr(getattr(self, k)))
for k in sorted(self.__dict__.keys())
if k in FactoryTest.REPR_FIELDS and getattr(self, k)]
if recursive and self.subtests:
indent = ' ' * (1 + self.path.count('.'))
attrs.append(
'subtests=[' +
('\n' +
',\n'.join([subtest.__repr__(recursive)
for subtest in self.subtests]
)).replace('\n', '\n' + indent)
+ '\n]')
return '%s(%s)' % (self.__class__.__name__, ', '.join(attrs))
def _init(self, prefix, path_map):
'''
Recursively assigns paths to this node and its children.
Also adds this node to the root's path_map.
'''
if self.parent:
self.root = self.parent.root
self.path = prefix + (self.id or '')
assert self.path not in path_map, 'Duplicate test path %s' % (self.path)
path_map[self.path] = self
for subtest in self.subtests:
subtest.parent = self
subtest._init((self.path + '.' if len(self.path) else ''), path_map)
def depth(self):
'''
Returns the depth of the node (0 for the root).
'''
return self.path.count('.') + (self.parent is not None)
def is_leaf(self):
'''
Returns true if this is a leaf node.
'''
return not self.subtests
def get_ancestors(self):
'''
Returns list of ancestors, ordered by seniority.
'''
if self.parent is not None:
return self.parent.get_ancestors() + [self.parent]
return []
def get_ancestor_groups(self):
'''
Returns list of ancestors that are groups, ordered by seniority.
'''
return [node for node in self.get_ancestors() if node.is_group()]
def get_state(self):
'''
Returns the current test state from the state instance.
'''
return TestState.from_dict_or_object(
self.root.state_instance.get_test_state(self.path))
def update_state(self, update_parent=True, status=None, **kw):
'''
Updates the test state.
See TestState.update for allowable kw arguments.
'''
if self.never_fails and status == TestState.FAILED:
status = TestState.UNTESTED
ret = TestState.from_dict_or_object(
self.root._update_test_state( # pylint: disable=W0212
self.path, status=status, **kw))
if update_parent and self.parent:
self.parent.update_status_from_children()
return ret
def update_status_from_children(self):
'''
Updates the status based on children's status.
A test is active if any children are active; else failed if
any children are failed; else untested if any children are
untested; else passed.
'''
if not self.subtests:
return
statuses = set([x.get_state().status for x in self.subtests])
# If there are any active tests, consider it active; if any failed,
# consider it failed, etc. The order is important!
# pylint: disable=W0631
for status in [TestState.ACTIVE, TestState.FAILED,
TestState.UNTESTED, TestState.PASSED]:
if status in statuses:
break
if status != self.get_state().status:
self.update_state(status=status)
def walk(self, in_order=False):
'''
Yields this test and each sub-test.
@param in_order: Whether to walk in-order. If False, walks depth-first.
'''
if in_order:
# Walking in order - yield self first.
yield self
for subtest in self.subtests:
for f in subtest.walk(in_order):
yield f
if not in_order:
# Walking depth first - yield self last.
yield self
def is_group(self):
'''
Returns true if this node is a test group.
'''
return isinstance(self, TestGroup)
def is_top_level_test(self):
'''
Returns true if this node is a top-level test.
A 'top-level test' is a test directly underneath the root or a
TestGroup, e.g., a node under which all tests must be run
together to be meaningful.
'''
return ((not self.is_group()) and
self.parent and
(self.parent == self.root or self.parent.is_group()))
def get_top_level_parent_or_group(self):
if self.is_group() or self.is_top_level_test() or not self.parent:
return self
return self.parent.get_top_level_parent_or_group()
def get_top_level_tests(self):
'''
Returns a list of top-level tests.
'''
return [node for node in self.walk()
if node.is_top_level_test()]
class FactoryTestList(FactoryTest):
'''
The root node for factory tests.
Properties:
path_map: A map from test paths to FactoryTest objects.
'''
def __init__(self, subtests, state_instance, options):
super(FactoryTestList, self).__init__(_root=True, subtests=subtests)
self.state_instance = state_instance
self.subtests = subtests
self.path_map = {}
self.root = self
self.state_change_callback = None
self.options = options
self._init('', self.path_map)
def get_all_tests(self):
'''
Returns all FactoryTest objects.
'''
return self.path_map.values()
def get_state_map(self):
'''
Returns a map of all FactoryTest objects to their TestStates.
'''
# The state instance may return a dict (for the XML/RPC proxy)
# or the TestState object itself. Convert accordingly.
return dict(
(self.lookup_path(k), TestState.from_dict_or_object(v))
for k, v in self.state_instance.get_test_states().iteritems())
def lookup_path(self, path):
'''
Looks up a test from its path.
'''
return self.path_map.get(path, None)
def _update_test_state(self, path, **kw):
'''
Updates a test state, invoking the state_change_callback if any.
Internal-only; clients should call update_state directly on the
appropriate TestState object.
'''
ret, changed = self.state_instance.update_test_state(path, **kw)
if changed and self.state_change_callback:
self.state_change_callback( # pylint: disable=E1102
self.lookup_path(path), ret)
return ret
class TestGroup(FactoryTest):
'''
A collection of related tests, shown together in RHS panel if one is active.
'''
pass
class FactoryAutotestTest(FactoryTest):
pass
class OperatorTest(FactoryAutotestTest):
has_ui = True
AutomatedSequence = FactoryTest
AutomatedSubTest = FactoryAutotestTest
class ShutdownStep(AutomatedSubTest):
'''A shutdown (halt or reboot) step.
Properties:
iterations: The number of times to reboot.
operation: The command to run to perform the shutdown
(REBOOT or HALT).
'''
REBOOT = 'reboot'
HALT = 'halt'
def __init__(self, operation, iterations=1, **kw):
kw.setdefault('id', operation)
super(ShutdownStep, self).__init__(**kw)
assert not self.autotest_name, (
'Reboot/halt steps may not have an autotest')
assert not self.subtests, 'Reboot/halt steps may not have subtests'
assert not self.backgroundable, (
'Reboot/halt steps may not be backgroundable')
assert iterations > 0
self.iterations = iterations
assert operation in [self.REBOOT, self.HALT]
self.operation = operation
class HaltStep(ShutdownStep):
'''Halts the machine.'''
def __init__(self, **kw):
super(HaltStep, self).__init__(operation=ShutdownStep.HALT, **kw)
class RebootStep(ShutdownStep):
'''Reboots the machine.'''
def __init__(self, **kw):
super(RebootStep, self).__init__(operation=ShutdownStep.REBOOT, **kw)
AutomatedRebootSubTest = RebootStep