blob: a959aacba7ee0072a777263952c71d48f0d04d00 [file] [log] [blame]
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dbus, logging, os, re, shutil, socket, stat, sys, time
import auth_server, constants, cryptohome, dns_server
import cros_logging, cros_ui, login, ownership, pyauto_test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from dbus.mainloop.glib import DBusGMainLoop
from autotest_lib.client.cros import flimflam_test_path
import flimflam
class UITest(pyauto_test.PyAutoTest):
"""Base class for tests that drive some portion of the user interface.
By default subclasses will use the default remote credentials before
the run_once method is invoked, and will log out at the completion
of the test case even if an exception is thrown.
Subclasses can opt out of the automatic login by setting the member
variable 'auto_login' to False.
Subclasses can log in with arbitrary credentials by passing
the 'creds' parameter in their control file. See the documentation of
UITest.initialize for more details.
If your subclass overrides the initialize() or cleanup() methods, it
should make sure to invoke this class' version of those methods as well.
The standard super(...) function cannot be used for this, since the base
test class is not a 'new style' Python class.
"""
version = 1
auto_login = True
fake_owner = True
username = None
password = None
# Processes that we know crash and are willing to ignore.
crash_blacklist = []
# This is a symlink. We look up the real path at runtime by following it.
_resolv_test_file = 'resolv.conf.test'
_resolv_bak_file = 'resolv.conf.bak'
def __init__(self, job, bindir, outputdir):
pyauto_test.PyAutoTest.__init__(self, job, bindir, outputdir)
def xsystem(self, cmd, timeout=None, ignore_status=False):
"""Convenience wrapper around cros_ui.xsystem, to save you an import.
"""
return cros_ui.xsystem(cmd, timeout, ignore_status)
def listen_to_signal(self, callback, signal, interface):
"""Listens to the given |signal| that is sent to power manager.
"""
self._system_bus.add_signal_receiver(
handler_function=callback,
signal_name=signal,
dbus_interface=interface,
bus_name=None,
path='/')
def __connect_to_flimflam(self):
"""Connect to the network manager via DBus.
Stores dbus connection in self._flim upon success, throws on failure.
"""
self._bus_loop = DBusGMainLoop(set_as_default=True)
self._system_bus = dbus.SystemBus(mainloop=self._bus_loop)
self._flim = flimflam.FlimFlam(self._system_bus)
def __get_host_by_name(self, hostname):
"""Resolve the dotted-quad IPv4 address of |hostname|
This used to use suave python code, like this:
hosts = socket.getaddrinfo(hostname, 80, socket.AF_INET)
(fam, socktype, proto, canonname, (host, port)) = hosts[0]
return host
But that hangs sometimes, and we don't understand why. So, use
a subprocess with a timeout.
"""
try:
host = utils.system_output('%s -c "import socket; '
'print socket.gethostbyname(\'%s\')"' % (
sys.executable, hostname),
ignore_status=True, timeout=2)
except Exception as e:
logging.warning(e)
return None
return host or None
def __attempt_resolve(self, hostname, ip, expected=True):
logging.debug('Attempting to resolve %s to %s' % (hostname, ip))
try:
host = self.__get_host_by_name(hostname)
logging.debug('Resolve attempt for %s got %s' % (hostname, host))
return host and (host == ip) == expected
except socket.gaierror as err:
logging.error(err)
def use_local_dns(self, dns_port=53):
"""Set all devices to use our in-process mock DNS server.
"""
self._dnsServer = dns_server.LocalDns(fake_ip='127.0.0.1',
local_port=dns_port)
self._dnsServer.run()
# Turn off captive portal checking, until we fix
# http://code.google.com/p/chromium-os/issues/detail?id=19640
self.check_portal_list = self._flim.GetCheckPortalList()
self._flim.SetCheckPortalList('')
# Set all devices to use locally-running DNS server.
try:
# Follow resolv.conf symlink.
resolv = os.path.realpath(constants.RESOLV_CONF_FILE)
# Grab path to the real file, do following work in that directory.
resolv_dir = os.path.dirname(resolv)
resolv_test = os.path.join(resolv_dir, self._resolv_test_file)
resolv_bak = os.path.join(resolv_dir, self._resolv_bak_file)
resolv_contents = 'nameserver 127.0.0.1'
# Back up the current resolv.conf.
os.rename(resolv, resolv_bak)
# To stop flimflam from editing resolv.conf while we're working
# with it, we want to make the directory -r-x-r-x-r-x. Open an
# fd to the file first, so that we'll retain the ability to
# alter it.
resolv_fd = open(resolv, 'w')
self._resolv_dir_mode = os.stat(resolv_dir).st_mode
os.chmod(resolv_dir, (stat.S_IRUSR | stat.S_IXUSR |
stat.S_IRGRP | stat.S_IXGRP |
stat.S_IROTH | stat.S_IXOTH))
resolv_fd.write(resolv_contents)
resolv_fd.close()
assert utils.read_one_line(resolv) == resolv_contents
except Exception as e:
logging.error(str(e))
raise e
utils.poll_for_condition(
lambda: self.__attempt_resolve('www.google.com.', '127.0.0.1'),
utils.TimeoutError('Timed out waiting for DNS changes.'),
timeout=10)
def revert_dns(self):
"""Clear the custom DNS setting for all devices and force them to use
DHCP to pull the network's real settings again.
"""
try:
# Follow resolv.conf symlink.
resolv = os.path.realpath(constants.RESOLV_CONF_FILE)
# Grab path to the real file, do following work in that directory.
resolv_dir = os.path.dirname(resolv)
resolv_bak = os.path.join(resolv_dir, self._resolv_bak_file)
os.chmod(resolv_dir, self._resolv_dir_mode)
os.rename(resolv_bak, resolv)
utils.poll_for_condition(
lambda: self.__attempt_resolve('www.google.com.',
'127.0.0.1',
expected=False),
utils.TimeoutError('Timed out waiting to revert DNS.'),
timeout=10)
finally:
# Set captive portal checking to whatever it was at the start.
self._flim.SetCheckPortalList(self.check_portal_list)
def start_authserver(self):
"""Spin up a local mock of the Google Accounts server, then spin up
a local fake DNS server and tell the networking stack to use it. This
will trick Chrome into talking to our mock when we login.
Subclasses can override this method to change this behavior.
"""
self._authServer = auth_server.GoogleAuthServer()
self._authServer.run()
self.use_local_dns()
def stop_authserver(self):
"""Tears down fake dns and fake Google Accounts server. If your
subclass does not create these objects, you will want to override this
method as well.
"""
if hasattr(self, '_authServer'):
self.revert_dns()
self._authServer.stop()
self._dnsServer.stop()
def initialize(self, creds=None, is_creating_owner=False,
extra_chrome_flags=[]):
"""Overridden from test.initialize() to log out and (maybe) log in.
If self.auto_login is True, this will automatically log in using the
credentials specified by 'creds' at startup, otherwise login will not
happen.
Regardless of the state of self.auto_login, the self.username and
self.password properties will be set to the credentials specified
by 'creds'.
Authentication is not performed against live servers. Instead, we spin
up a local DNS server that will lie and say that all sites resolve to
127.0.0.1. We use DBus to tell flimflam to use this DNS server to
resolve addresses. We then spin up a local httpd that will respond
to queries at the Google Accounts endpoints. We clear the DNS setting
and tear down these servers in cleanup().
Args:
creds: String specifying the credentials for this test case. Can
be a named set of credentials as defined by
constants.CREDENTIALS, or a 'username:password' pair.
Defaults to None -- browse without signing-in.
is_creating_owner: If the test case is creating a new device owner.
extra_chrome_flags: Extra chrome flags to pass to chrome, if any.
"""
# Mark /var/log/messages now; we'll run through all subsequent
# log messages at the end of the test and log info about processes that
# crashed.
self._log_reader = cros_logging.LogReader()
self._log_reader.set_start_by_current()
self.__connect_to_flimflam()
if creds:
self.start_authserver()
# We yearn for Chrome coredumps...
open(constants.CHROME_CORE_MAGIC_FILE, 'w').close()
# The UI must be taken down to ensure that no stale state persists.
cros_ui.stop()
(self.username, self.password) = self.__resolve_creds(creds)
# Ensure there's no stale cryptohome from previous tests.
try:
cryptohome.remove_vault(self.username)
except cryptohome.ChromiumOSError as err:
logging.error(err)
# Fake ownership unless the test is explicitly testing owner creation.
if not is_creating_owner:
logging.info('Faking ownership...')
self.__fake_ownership()
self.fake_owner = True
else:
logging.info('Erasing stale owner state.')
ownership.clear_ownership()
self.fake_owner = False
cros_ui.start()
pyauto_test.PyAutoTest.initialize(self, auto_login=False,
extra_chrome_flags=extra_chrome_flags)
if self.auto_login:
self.login(self.username, self.password)
if is_creating_owner:
login.wait_for_ownership()
def __fake_ownership(self):
"""Fake ownership by generating the necessary magic files."""
# Determine the module directory.
dirname = os.path.dirname(__file__)
mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT)
mock_signedpolicyfile = os.path.join(dirname,
constants.MOCK_OWNER_POLICY)
utils.open_write_close(
constants.OWNER_KEY_FILE,
ownership.cert_extract_pubkey_der(mock_certfile))
shutil.copy(mock_signedpolicyfile,
constants.SIGNED_POLICY_FILE)
def __resolve_creds(self, creds):
"""Map credential identifier to username, password and type.
Args:
creds: credential identifier to resolve.
Returns:
A (username, password) tuple.
"""
if not creds:
return [None, None] # Browse without signing-in.
if creds[0] == '$':
if creds not in constants.CREDENTIALS:
raise error.TestFail('Unknown credentials: %s' % creds)
(name, passwd) = constants.CREDENTIALS[creds]
return [cryptohome.canonicalize(name), passwd]
(name, passwd) = creds.split(':')
return [cryptohome.canonicalize(name), passwd]
def login(self, username=None, password=None):
"""Log in with a set of credentials.
This method is called from UITest.initialize(), so you won't need it
unless your testcase has cause to log in multiple times. This
DOES NOT affect self.username or self.password.
If username and self.username are not defined, logs in as guest.
Forces a log out if already logged in.
Blocks until login is complete.
TODO(nirnimesh): Does NOT work with webui login
crosbug.com/18271
Args:
username: username to log in as, defaults to self.username.
password: password to log in with, defaults to self.password.
Raises:
error.TestError, if login has an error
"""
if self.logged_in():
self.logout()
uname = username or self.username
passwd = password or self.password
if uname: # Regular login
login_error = self.pyauto.Login(username=uname, password=passwd)
if login_error:
raise error.TestError('Error during login (%s, %s): %s.' % (
uname, passwd, login_error))
logging.info('Logged in as %s.' % uname)
else: # Login as guest
self.pyauto.LoginAsGuest()
logging.info('Logged in as guest.')
if not self.logged_in():
raise error.TestError('Not logged in')
def logged_in(self):
return self.pyauto.GetLoginInfo()['is_logged_in']
def logout(self):
"""Log out.
This method is called from UITest.cleanup(), so you won't need it
unless your testcase needs to test functionality while logged out.
"""
if not self.logged_in():
return
self._save_logs_from_cryptohome()
cros_ui.restart(self.pyauto.Logout)
def _save_logs_from_cryptohome(self):
"""Recover dirs from cryptohome in case another test run wipes."""
try:
for dir in constants.CRYPTOHOME_DIRS_TO_RECOVER:
dir_path = os.path.join(constants.CRYPTOHOME_MOUNT_PT, dir)
if os.path.isdir(dir_path):
target = os.path.join(self.resultsdir,
'%s-%f' % (dir, time.time()))
logging.debug('Saving %s to %s.', dir_path, target)
shutil.copytree(src=dir_path, dst=target, symlinks=True)
except (IOError, OSError, shutil.Error) as err:
logging.error(err)
def validate_basic_policy(self, basic_policy):
# Pull in protobuf definitions.
sys.path.append(self.srcdir)
from device_management_backend_pb2 import PolicyFetchResponse
from device_management_backend_pb2 import PolicyData
from chrome_device_policy_pb2 import ChromeDeviceSettingsProto
from chrome_device_policy_pb2 import UserWhitelistProto
response_proto = PolicyFetchResponse()
response_proto.ParseFromString(basic_policy)
ownership.assert_has_policy_data(response_proto)
poldata = PolicyData()
poldata.ParseFromString(response_proto.policy_data)
ownership.assert_has_device_settings(poldata)
ownership.assert_username(poldata, self.username)
polval = ChromeDeviceSettingsProto()
polval.ParseFromString(poldata.policy_value)
ownership.assert_new_users(polval, True)
ownership.assert_users_on_whitelist(polval, (self.username,))
def __log_crashed_processes(self, processes):
"""Runs through the log watched by |watcher| to see if a crash was
reported for any process names listed in |processes|. SIGABRT crashes in
chrome or supplied-chrome during ui restart are ignored.
"""
ui_restart_begin_regex = re.compile(cros_ui.UI_RESTART_ATTEMPT_MSG)
crash_regex = re.compile(
'Received crash notification for ([-\w]+).+ (sig \d+)')
ui_restart_end_regex = re.compile(cros_ui.UI_RESTART_COMPLETE_MSG)
in_restart = False
for line in self._log_reader.get_logs().splitlines():
if ui_restart_begin_regex.search(line):
in_restart = True
elif ui_restart_end_regex.search(line):
in_restart = False
else:
match = crash_regex.search(line)
if (match and not match.group(1) in processes and
not (in_restart and
(match.group(1) == constants.BROWSER or
match.group(1) == 'supplied_chrome') and
match.group(2) == 'sig 6')):
self.job.record('INFO', self.tagged_testname,
line[match.start():])
def cleanup(self):
"""Overridden from pyauto_test.cleanup() to log out and restart
session_manager when the test is complete.
"""
try:
logpath = constants.CHROME_LOG_DIR
try:
for filename in os.listdir(logpath):
fullpath = os.path.join(logpath, filename)
if os.path.isfile(fullpath):
shutil.copy(fullpath, os.path.join(self.resultsdir,
filename))
except (IOError, OSError) as err:
logging.error(err)
self._save_logs_from_cryptohome()
pyauto_test.PyAutoTest.cleanup(self)
if os.path.isfile(constants.CRYPTOHOMED_LOG):
try:
base = os.path.basename(constants.CRYPTOHOMED_LOG)
shutil.copy(constants.CRYPTOHOMED_LOG,
os.path.join(self.resultsdir, base))
except (IOError, OSError) as err:
logging.error(err)
if self.fake_owner:
logging.info('Erasing fake owner state.')
ownership.clear_ownership()
self.__log_crashed_processes(self.crash_blacklist)
if os.path.isfile(constants.CHROME_CORE_MAGIC_FILE):
os.unlink(constants.CHROME_CORE_MAGIC_FILE)
finally:
self.stop_authserver()
def get_auth_endpoint_misses(self):
if hasattr(self, '_authServer'):
return self._authServer.get_endpoint_misses()
else:
return {}