blob: 885c068ca6140b050f5c21369d0737a909de3e78 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import glob, logging, os, re, shutil, subprocess, sys, time
import auth_server, common, constants, cros_logging, cros_ui, cryptohome
import dns_server, login, ownership, pyauto_test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
class UITest(pyauto_test.PyAutoTest):
"""Base class for tests that drive some portion of the user interface.
By default subclasses will use the default remote credentials before
the run_once method is invoked, and will log out at the completion
of the test case even if an exception is thrown.
Subclasses can opt out of the automatic login by setting the member
variable 'auto_login' to False.
Subclasses can log in with arbitrary credentials by passing
the 'creds' parameter in their control file. See the documentation of
UITest.initialize for more details.
If your subclass overrides the initialize() or cleanup() methods, it
should make sure to invoke this class' version of those methods as well.
The standard super(...) function cannot be used for this, since the base
test class is not a 'new style' Python class.
"""
version = 1
skip_oobe = True
auto_login = True
fake_owner = True
username = None
password = None
# Processes that we know crash and are willing to ignore.
crash_blacklist = []
# ftrace-related files.
_ftrace_process_fork_event_enable_file = \
'/sys/kernel/debug/tracing/events/sched/sched_process_fork/enable'
_ftrace_process_fork_event_filter_file = \
'/sys/kernel/debug/tracing/events/sched/sched_process_fork/filter'
_ftrace_signal_generate_event_enable_file = \
'/sys/kernel/debug/tracing/events/signal/signal_generate/enable'
_ftrace_signal_generate_event_filter_file = \
'/sys/kernel/debug/tracing/events/signal/signal_generate/filter'
_ftrace_trace_file = '/sys/kernel/debug/tracing/trace'
_last_chrome_log = ''
def start_authserver(self, authenticator=None):
"""Spin up a local mock of the Google Accounts server, then spin up
a local fake DNS server and tell the networking stack to use it. This
will trick Chrome into talking to our mock when we login.
Subclasses can override this method to change this behavior.
"""
self._authServer = auth_server.GoogleAuthServer(
authenticator=authenticator)
self._authServer.run()
self._dnsServer = dns_server.LocalDns()
self._dnsServer.run()
def stop_authserver(self):
"""Tears down fake dns and fake Google Accounts server. If your
subclass does not create these objects, you will want to override this
method as well.
"""
if hasattr(self, '_authServer'):
self._authServer.stop()
del self._authServer
if hasattr(self, '_dnsServer'):
self._dnsServer.stop()
del self._dnsServer
def start_chrome_event_tracing(self):
"""Start tracing events of a chrome process being created or receiving a
signal.
"""
try:
# Clear the trace buffer.
utils.open_write_close(self._ftrace_trace_file, '')
# Trace only chrome process creation events, which we may later use
# to determine if a chrome process is killed by its parent.
utils.open_write_close(
self._ftrace_process_fork_event_filter_file,
'child_comm==chrome')
# Trace only chrome processes receiving any signal except for
# the uninteresting SIGPROF (sig 27 on x86 and arm).
utils.open_write_close(
self._ftrace_signal_generate_event_filter_file,
'comm==chrome && sig!=27')
# Enable the process_fork event tracing.
utils.open_write_close(
self._ftrace_process_fork_event_enable_file, '1')
# Enable the signal_generate event tracing.
utils.open_write_close(
self._ftrace_signal_generate_event_enable_file, '1')
except IOError as err:
logging.warning('Failed to start chrome signal tracing: %s', err)
def stop_chrome_event_tracing(self):
"""Stop tracing events of a chrome process being created or receiving a
signal.
"""
try:
# Disable the process_fork event tracing.
utils.open_write_close(
self._ftrace_process_fork_event_enable_file, '0')
# Disable the signal_generate event tracing.
utils.open_write_close(
self._ftrace_signal_generate_event_enable_file, '0')
# Clear the process_fork event filter.
utils.open_write_close(
self._ftrace_process_fork_event_filter_file, '0')
# Clear the signal_generate event filter.
utils.open_write_close(
self._ftrace_signal_generate_event_filter_file, '0')
# Dump the trace buffer to a log file.
trace_file = os.path.join(self.resultsdir, 'chrome_event_trace')
trace_data = utils.read_file(self._ftrace_trace_file)
utils.open_write_close(trace_file, trace_data)
except IOError as err:
logging.warning('Failed to stop chrome signal tracing: %s', err)
def start_tcpdump(self, iface):
"""Start tcpdump process, if not running already."""
if not hasattr(self, '_tcpdump'):
self._tcpdump = subprocess.Popen(
['tcpdump', '-i', iface, '-vv'], stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
def stop_tcpdump(self, fname_prefix):
"""Stop tcpdump process and save output to a new file."""
if hasattr(self, '_tcpdump'):
self._tcpdump.terminate()
# Save output to a new file
next_index = len(glob.glob(
os.path.join(self.resultsdir, '%s-*' % fname_prefix)))
tcpdump_file = os.path.join(
self.resultsdir, '%s-%d' % (fname_prefix, next_index))
logging.info('Saving tcpdump output to %s' % tcpdump_file)
utils.open_write_close(tcpdump_file, self._tcpdump.communicate()[0])
del self._tcpdump
def __log_all_processes(self, fname_prefix):
"""Log all processes to a file.
Args:
fname_prefix: Prefix of the log file.
"""
try:
next_index = len(glob.glob(
os.path.join(self.resultsdir, '%s-*' % fname_prefix)))
log_file = os.path.join(
self.resultsdir, '%s-%d' % (fname_prefix, next_index))
utils.open_write_close(log_file, utils.system_output('ps -eF'))
except (error.CmdError, IOError, OSError) as err:
logging.warning('Failed to log all processes: %s', err)
def __perform_ui_diagnostics(self):
"""Save diagnostic logs about UI.
This includes the output of:
$ initctl status ui
$ ps auxwww
"""
output_file = os.path.join(self.resultsdir, 'ui_diagnostics.txt')
with open(output_file, 'w') as output_fd:
print >> output_fd, time.asctime(), '\n'
cmd = 'initctl status ui'
print >> output_fd, '$ %s' % cmd
print >> output_fd, utils.system_output(cmd), '\n'
cmd = 'ps auxwww'
print >> output_fd, '$ %s' % cmd
print >> output_fd, utils.system_output(cmd), '\n'
logging.info('Saved UI diagnostics to %s' % output_file)
def __generate_coredumps(self, names):
"""Generate core dump files in results dir for given processes.
Note that the coredumps are forced via SIGBUS and the processes will be
terminated. Ideally we should use gdb gcore to create dumps
non-intrusively. However, the current dumps generated by gcore could not
be properly read back by gdb, i.e. no reasonable symbolized stack could
be generated.
Args:
names: A list of process names that need to be dumped.
"""
# Get all pids of named processes.
pids = []
for name in names:
# Get pids of given name, slice [1:] to skip ps's first line 'PID'
pids = pids + [ pid.strip() for pid in utils.system_output(
'ps -C %s -o pid' % name).splitlines()[1:]]
logging.info('Will force core dumps for the following pid: %s' %
' '.join(pids))
# Stop all processes so that forcing dump would change their state.
for pid in pids:
utils.system('kill -STOP %s' % pid)
# Force core dump.
for pid in pids:
utils.system('kill -BUS %s' % pid)
# Resume to let the core dump finish.
for pid in reversed(pids):
utils.system('kill -CONT %s' % pid)
def initialize(self, creds=None, is_creating_owner=False,
extra_chrome_flags=[], subtract_extra_chrome_flags=[],
*args, **kwargs):
"""Overridden from test.initialize() to log out and (maybe) log in.
If self.auto_login is True, this will automatically log in using the
credentials specified by 'creds' at startup, otherwise login will not
happen.
Regardless of the state of self.auto_login, the self.username and
self.password properties will be set to the credentials specified
by 'creds'.
Authentication is not performed against live servers. Instead, we spin
up a local DNS server that will lie and say that all sites resolve to
127.0.0.1. The DNS server tells flimflam via DBus that it should be
used to resolve addresses. We then spin up a local httpd that will
respond to queries at the Google Accounts endpoints. We clear the DNS
setting and tear down these servers in cleanup().
Args:
creds: String specifying the credentials for this test case. Can
be a named set of credentials as defined by
constants.CREDENTIALS, or a 'username:password' pair.
Defaults to None -- browse without signing-in.
is_creating_owner: If the test case is creating a new device owner.
extra_chrome_flags: Extra chrome flags to pass to chrome, if any.
subtract_extra_chrome_flags: Remove default flags passed to chrome
by pyauto, if any.
"""
# Mark /var/log/messages now; we'll run through all subsequent
# log messages at the end of the test and log info about processes that
# crashed.
self._log_reader = cros_logging.LogReader()
self._log_reader.set_start_by_current()
if creds:
self.start_authserver()
# Run tcpdump on 'lo' interface to investigate network
# issues in the lab during login.
self.start_tcpdump(iface='lo')
# Log all processes so that we can correlate PIDs to processes in
# the chrome signal trace.
self.__log_all_processes('processes--before-tracing')
# Start event tracing related to chrome processes.
self.start_chrome_event_tracing()
# We yearn for Chrome coredumps...
open(constants.CHROME_CORE_MAGIC_FILE, 'w').close()
# The UI must be taken down to ensure that no stale state persists.
cros_ui.stop()
(self.username, self.password) = self.__resolve_creds(creds)
# Ensure there's no stale cryptohome from previous tests.
try:
cryptohome.remove_all_vaults()
except cryptohome.ChromiumOSError as err:
logging.error(err)
# Fake ownership unless the test is explicitly testing owner creation.
if not is_creating_owner:
logging.info('Faking ownership...')
cros_ui.fake_ownership()
self.fake_owner = True
else:
logging.info('Erasing stale owner state.')
ownership.clear_ownership()
self.fake_owner = False
try:
cros_ui.start()
except:
self.__perform_ui_diagnostics()
if not login.wait_for_browser_exit('Chrome crashed during login'):
self.__generate_coredumps([constants.BROWSER])
raise
# Save name of the last chrome log before our test started.
log_files = glob.glob(constants.CHROME_LOG_DIR + '/chrome_*')
self._last_chrome_log = max(log_files) if log_files else ''
pyauto_test.PyAutoTest.initialize(
self, auto_login=False,
extra_chrome_flags=extra_chrome_flags,
subtract_extra_chrome_flags=subtract_extra_chrome_flags,
*args, **kwargs)
if self.skip_oobe or self.auto_login:
self.pyauto.SkipToLogin()
if self.auto_login:
self.login(self.username, self.password)
if is_creating_owner:
login.wait_for_ownership()
def __resolve_creds(self, creds):
"""Map credential identifier to username, password and type.
Args:
creds: credential identifier to resolve.
Returns:
A (username, password) tuple.
"""
if not creds:
return [None, None] # Browse without signing-in.
if creds[0] == '$':
if creds not in constants.CREDENTIALS:
raise error.TestFail('Unknown credentials: %s' % creds)
(name, passwd) = constants.CREDENTIALS[creds]
return [cryptohome.canonicalize(name), passwd]
(name, passwd) = creds.split(':')
return [cryptohome.canonicalize(name), passwd]
def take_screenshot(self, fname_prefix, format='png'):
"""Take screenshot and save to a new file in the results dir.
Args:
fname_prefix: prefix for the output fname
format: string indicating file format ('png', 'jpg', etc)
Returns:
the path of the saved screenshot file
"""
next_index = len(glob.glob(
os.path.join(self.resultsdir, '%s-*.%s' % (fname_prefix, format))))
screenshot_file = os.path.join(
self.resultsdir, '%s-%d.%s' % (fname_prefix, next_index, format))
logging.info('Saving screenshot to %s.' % screenshot_file)
utils.system('DISPLAY=:0.0 XAUTHORITY=/home/chronos/.Xauthority '
'/usr/local/bin/import -window root -depth 8 %s' %
screenshot_file)
return screenshot_file
def login(self, username=None, password=None):
"""Log in with a set of credentials.
This method is called from UITest.initialize(), so you won't need it
unless your testcase has cause to log in multiple times. This
DOES NOT affect self.username or self.password.
If username and self.username are not defined, logs in as guest.
Forces a log out if already logged in.
Blocks until login is complete.
Args:
username: username to log in as, defaults to self.username.
password: password to log in with, defaults to self.password.
Raises:
error.TestError, if login has an error
"""
if self.logged_in():
self.logout()
uname = username or self.username
passwd = password or self.password
try:
screenshot_name = 'login-success-screenshot'
if uname: # Regular login
login_error = self.pyauto.Login(username=uname,
password=passwd)
if login_error:
screenshot_name = 'login-error-screenshot'
raise error.TestFail(
'Error during login (%s, %s): %s. See the file named '
'%s.png in the results folder.' % (uname, passwd,
login_error, screenshot_name))
else: # Login as guest
self.pyauto.LoginAsGuest()
logging.info('Logged in as guest.')
if not self.logged_in():
screenshot_name = 'login-bizarre-fail-screenshot'
raise error.TestFail('Login was successful, but logged_in() '
'returned False. This should not happen. '
'Please check the file named %s.png '
'located in the results folder.' %
screenshot_name)
except:
# If Login() times out, update error messages.
screenshot_name = 'login-timeout-fail-screenshot'
raise error.TestFail('Login timed out. Please check the file '
'named %s.png located in the results '
'folder.' % screenshot_name)
finally:
self.take_screenshot(fname_prefix=screenshot_name)
self.stop_tcpdump(fname_prefix='tcpdump-lo--till-login')
logging.info('Logged in as %s. You can verify with the '
'file named %s.png located in the results '
'folder.' % (uname, screenshot_name))
def logged_in(self):
return self.pyauto.GetLoginInfo()['is_logged_in']
def logout(self):
"""Log out.
This method is called from UITest.cleanup(), so you won't need it
unless your testcase needs to test functionality while logged out.
"""
if not self.logged_in():
return
self._save_logs_from_cryptohome()
try:
cros_ui.restart(self.pyauto.Logout)
except:
self.__perform_ui_diagnostics()
if not login.wait_for_browser_exit('Chrome crashed during logout'):
self.__generate_coredumps([constants.BROWSER])
raise
def _save_logs_from_cryptohome(self):
"""Recover dirs from cryptohome in case another test run wipes."""
try:
for dir in constants.CRYPTOHOME_DIRS_TO_RECOVER:
dir_path = os.path.join(constants.CRYPTOHOME_MOUNT_PT, dir)
if os.path.isdir(dir_path):
target = os.path.join(self.resultsdir,
'%s-%f' % (dir, time.time()))
logging.debug('Saving %s to %s.', dir_path, target)
shutil.copytree(src=dir_path, dst=target, symlinks=True)
except (IOError, OSError, shutil.Error) as err:
logging.error(err)
def validate_basic_policy(self, basic_policy):
# Pull in protobuf definitions.
sys.path.append(self.srcdir)
from device_management_backend_pb2 import PolicyFetchResponse
from device_management_backend_pb2 import PolicyData
from chrome_device_policy_pb2 import ChromeDeviceSettingsProto
from chrome_device_policy_pb2 import UserWhitelistProto
response_proto = PolicyFetchResponse()
response_proto.ParseFromString(basic_policy)
ownership.assert_has_policy_data(response_proto)
poldata = PolicyData()
poldata.ParseFromString(response_proto.policy_data)
ownership.assert_has_device_settings(poldata)
ownership.assert_username(poldata, self.username)
polval = ChromeDeviceSettingsProto()
polval.ParseFromString(poldata.policy_value)
ownership.assert_new_users(polval, True)
ownership.assert_users_on_whitelist(polval, (self.username,))
def __log_crashed_processes(self, processes):
"""Runs through the log watched by |watcher| to see if a crash was
reported for any process names not listed in |processes|. SIGABRT
crashes in chrome or supplied-chrome during ui restart are ignored.
"""
ui_restart_begin_regex = re.compile(cros_ui.UI_RESTART_ATTEMPT_MSG)
crash_regex = re.compile(
'Received crash notification for ([-\w]+).+ (sig \d+)')
ui_restart_end_regex = re.compile(cros_ui.UI_RESTART_COMPLETE_MSG)
in_restart = False
for line in self._log_reader.get_logs().splitlines():
if ui_restart_begin_regex.search(line):
in_restart = True
elif ui_restart_end_regex.search(line):
in_restart = False
else:
match = crash_regex.search(line)
if (match and not match.group(1) in processes and
not (in_restart and
(match.group(1) == constants.BROWSER or
match.group(1) == 'supplied_chrome') and
match.group(2) == 'sig 6')):
self.job.record('INFO', self.tagged_testname,
line[match.start():])
def execute(self, iterations=None, test_length=None,
profile_only=None, _get_time=time.time,
postprocess_profiled_run=None, constraints=(), *args, **kwargs):
"""Wrapper around execute to take a screenshot for any exception."""
try:
super(UITest, self).execute(iterations=iterations,
test_length=test_length,
profile_only=profile_only,
_get_time=_get_time,
postprocess_profiled_run=
postprocess_profiled_run,
constraints=constraints,
*args, **kwargs)
except:
self.take_screenshot(fname_prefix='test-fail-screenshot')
raise
def cleanup(self):
"""Overridden from pyauto_test.cleanup() to log out and restart
session_manager when the test is complete.
"""
try:
# Save all chrome logs created during the test.
try:
for fullpath in glob.glob(
constants.CHROME_LOG_DIR + '/chrome_*'):
if os.path.isfile(fullpath) and \
not os.path.islink(fullpath) and \
fullpath > self._last_chrome_log: # ignore old logs
shutil.copy2(fullpath, self.resultsdir)
except (IOError, OSError) as err:
logging.error(err)
self._save_logs_from_cryptohome()
pyauto_test.PyAutoTest.cleanup(self)
if os.path.isfile(constants.CRYPTOHOMED_LOG):
try:
base = os.path.basename(constants.CRYPTOHOMED_LOG)
shutil.copy(constants.CRYPTOHOMED_LOG,
os.path.join(self.resultsdir, base))
except (IOError, OSError) as err:
logging.error(err)
if self.fake_owner:
logging.info('Erasing fake owner state.')
ownership.clear_ownership()
self.__log_crashed_processes(self.crash_blacklist)
if os.path.isfile(constants.CHROME_CORE_MAGIC_FILE):
os.unlink(constants.CHROME_CORE_MAGIC_FILE)
finally:
self.stop_chrome_event_tracing()
self.__log_all_processes('processes--after-tracing')
self.stop_tcpdump(fname_prefix='tcpdump-lo--till-end')
self.stop_authserver()
def get_auth_endpoint_misses(self):
if hasattr(self, '_authServer'):
return self._authServer.get_endpoint_misses()
else:
return {}