# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from datetime import datetime, timedelta
import collections
import json
import logging
import os
import time
import urlparse
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.server import autotest, test
from autotest_lib.server.cros.dynamic_suite import tools
def snippet(text):
"""Returns the text with start/end snip markers around it.
@param text: The snippet text.
@return The text with start/end snip markers around it.
"""
snip = '---8<---' * 10
start = '-- START -'
end = '-- END -'
return ('%s%s\n%s\n%s%s' %
(start, snip[len(start):], text, end, snip[len(end):]))
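# A minimal illustration (assumed call, not executed anywhere): snippet('foo')
# returns three lines -- '-- START -' padded to 80 characters with the
# repeated '---8<---' pattern, then 'foo', then '-- END -' padded the same
# way -- so quoted log excerpts stand out in the test output.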
# Update event types.
EVENT_TYPE_DOWNLOAD_COMPLETE = '1'
EVENT_TYPE_INSTALL_COMPLETE = '2'
EVENT_TYPE_UPDATE_COMPLETE = '3'
EVENT_TYPE_DOWNLOAD_STARTED = '13'
EVENT_TYPE_DOWNLOAD_FINISHED = '14'
EVENT_TYPE_REBOOTED_AFTER_UPDATE = '54'
# Update event results.
EVENT_RESULT_ERROR = '0'
EVENT_RESULT_SUCCESS = '1'
EVENT_RESULT_SUCCESS_REBOOT = '2'
EVENT_RESULT_UPDATE_DEFERRED = '9'
# Omaha event types/results, from update_engine/omaha_request_action.h
# These are stored in dict form so that event codes can be printed with
# readable names.
EVENT_TYPE_DICT = {
EVENT_TYPE_DOWNLOAD_COMPLETE: 'download_complete',
EVENT_TYPE_INSTALL_COMPLETE: 'install_complete',
EVENT_TYPE_UPDATE_COMPLETE: 'update_complete',
EVENT_TYPE_DOWNLOAD_STARTED: 'download_started',
EVENT_TYPE_DOWNLOAD_FINISHED: 'download_finished',
EVENT_TYPE_REBOOTED_AFTER_UPDATE: 'rebooted_after_update'
}
EVENT_RESULT_DICT = {
EVENT_RESULT_ERROR: 'error',
EVENT_RESULT_SUCCESS: 'success',
EVENT_RESULT_SUCCESS_REBOOT: 'success_reboot',
EVENT_RESULT_UPDATE_DEFERRED: 'update_deferred'
}
class ExpectedUpdateEventChainFailed(error.TestFail):
"""Raised if we fail to receive an expected event in a chain."""
class ExpectedUpdateEvent(object):
"""Defines an expected event in an update process."""
_ATTR_NAME_DICT_MAP = {
'event_type': EVENT_TYPE_DICT,
'event_result': EVENT_RESULT_DICT,
}
_VALID_TYPES = set(EVENT_TYPE_DICT.keys())
_VALID_RESULTS = set(EVENT_RESULT_DICT.keys())
def __init__(self, event_type=None, event_result=None, version=None,
previous_version=None, on_error=None):
"""Initializes an event expectation.
@param event_type: Expected event type.
@param event_result: Expected event result code.
@param version: Expected reported image version.
@param previous_version: Expected reported previous image version.
@param on_error: This is either an object to be returned when a received
event mismatches the expectation, or a callable used
for generating one. In the latter case, takes as
input two attribute dictionaries (expected and actual)
and an iterable of mismatched keys. If None, a generic
message is returned.
"""
if event_type and event_type not in self._VALID_TYPES:
raise ValueError('event_type %s is not valid.' % event_type)
if event_result and event_result not in self._VALID_RESULTS:
raise ValueError('event_result %s is not valid.' % event_result)
self._expected_attrs = {
'event_type': event_type,
'event_result': event_result,
'version': version,
'previous_version': previous_version,
}
self._on_error = on_error
@staticmethod
def _attr_val_str(attr_val, helper_dict, default=None):
"""Returns an enriched attribute value string, or default."""
if not attr_val:
return default
s = str(attr_val)
if helper_dict:
s += ':%s' % helper_dict.get(attr_val, 'unknown')
return s
def _attr_name_and_values(self, attr_name, expected_attr_val,
actual_attr_val=None):
"""Returns an attribute name, expected and actual value strings.
This will return (name, expected, actual); the returned value for
actual will be None if its respective input is None/empty.
"""
helper_dict = self._ATTR_NAME_DICT_MAP.get(attr_name)
expected_attr_val_str = self._attr_val_str(expected_attr_val,
helper_dict,
default='any')
actual_attr_val_str = self._attr_val_str(actual_attr_val, helper_dict)
return attr_name, expected_attr_val_str, actual_attr_val_str
def _attrs_to_str(self, attrs_dict):
return ' '.join(['%s=%s' %
self._attr_name_and_values(attr_name, attr_val)[0:2]
for attr_name, attr_val in attrs_dict.iteritems()])
def __str__(self):
return self._attrs_to_str(self._expected_attrs)
def verify(self, actual_event):
"""Verify the attributes of an actual event.
@param actual_event: a dictionary containing event attributes
@return An error message, or None if all attributes are as expected.
"""
mismatched_attrs = [
attr_name for attr_name, expected_attr_val
in self._expected_attrs.iteritems()
if (expected_attr_val and
not self._verify_attr(attr_name, expected_attr_val,
actual_event.get(attr_name)))]
if not mismatched_attrs:
return None
if callable(self._on_error):
return self._on_error(self._expected_attrs, actual_event,
mismatched_attrs)
if self._on_error is None:
return ('Received event (%s) does not match expectation (%s)' %
(self._attrs_to_str(actual_event), self))
return self._on_error
def _verify_attr(self, attr_name, expected_attr_val, actual_attr_val):
"""Verifies that an actual log event attributes matches expected on.
@param attr_name: name of the attribute to verify
@param expected_attr_val: expected attribute value
@param actual_attr_val: actual attribute value
@return True if actual value is present and matches, False otherwise.
"""
# None values are assumed to be missing and non-matching.
if actual_attr_val is None:
logging.error('No value found for %s (expected %s)',
*self._attr_name_and_values(attr_name,
expected_attr_val)[0:2])
return False
# We allow expected version numbers (e.g. 2940.0.0) to be contained in
# actual values (2940.0.0-a1); this is necessary for the test to pass
# with developer / non-release images.
if (actual_attr_val == expected_attr_val or
('version' in attr_name and expected_attr_val in actual_attr_val)):
return True
return False
def get_attrs(self):
"""Returns a dictionary of expected attributes."""
return dict(self._expected_attrs)
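# A minimal usage sketch (hypothetical values, not executed by the test):
#
#   expected = ExpectedUpdateEvent(
#       event_type=EVENT_TYPE_DOWNLOAD_STARTED,
#       event_result=EVENT_RESULT_SUCCESS,
#       version='9999.0.0')
#   # verify() returns None on a match, or an error string otherwise. Note
#   # that expected versions only need to be contained in the reported ones,
#   # so a dev image version such as '9999.0.0-a1' still matches.
#   err = expected.verify({'event_type': '13', 'event_result': '1',
#                          'version': '9999.0.0-a1'})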
class ExpectedUpdateEventChain(object):
"""Defines a chain of expected update events."""
def __init__(self):
self._expected_events_chain = []
self._current_timestamp = None
def add_event(self, expected_events, timeout, on_timeout=None):
"""Adds an expected event to the chain.
@param expected_events: The ExpectedEvent, or a list thereof, to wait
for. If a list is passed, it will wait for *any*
of the provided events, but only one of those.
@param timeout: A timeout (in seconds) to wait for the event.
@param on_timeout: An error string to use if the event times out. If
None, a generic message is used.
"""
if isinstance(expected_events, ExpectedUpdateEvent):
expected_events = [expected_events]
self._expected_events_chain.append(
(expected_events, timeout, on_timeout))
@staticmethod
def _format_event_with_timeout(expected_events, timeout):
"""Returns a string representation of the event, with timeout."""
until = 'within %s seconds' % timeout if timeout else 'indefinitely'
return '%s, %s' % (' OR '.join(map(str, expected_events)), until)
def __str__(self):
return ('[%s]' %
', '.join(
[self._format_event_with_timeout(expected_events, timeout)
for expected_events, timeout, _
in self._expected_events_chain]))
def __repr__(self):
return str(self._expected_events_chain)
def verify(self, get_next_event):
"""Verifies that an actual stream of events complies.
@param get_next_event: a function returning the next event
@raises ExpectedUpdateEventChainFailed if we failed to verify an event.
"""
for expected_events, timeout, on_timeout in self._expected_events_chain:
logging.info('Expecting %s',
self._format_event_with_timeout(expected_events,
timeout))
err_msg = self._verify_event_with_timeout(
expected_events, timeout, on_timeout, get_next_event)
if err_msg is not None:
logging.error('Failed expected event: %s', err_msg)
raise ExpectedUpdateEventChainFailed(err_msg)
def _verify_event_with_timeout(self, expected_events, timeout, on_timeout,
get_next_event):
"""Verify an expected event occurs within a given timeout.
@param expected_events: the list of possible events expected next
@param timeout: specified in seconds
@param on_timeout: A string to return if timeout occurs, or None.
@param get_next_event: function returning the next event in a stream
@return None if event complies, an error string otherwise.
"""
new_event = get_next_event()
if new_event:
# If this is the first event, use its timestamp as the baseline time.
if self._current_timestamp is None:
self._current_timestamp = datetime.strptime(new_event[
'timestamp'],
'%Y-%m-%d %H:%M:%S')
# Get the time stamp for the current event and convert to datetime
timestamp = new_event['timestamp']
event_timestamp = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
# Add the timeout onto the timestamp to get its expiry
event_timeout = self._current_timestamp + timedelta(seconds=timeout)
# If the event happened before the timeout
if event_timestamp < event_timeout:
difference = event_timestamp - self._current_timestamp
logging.info('Event took %s seconds to fire during the '
'update', difference.seconds)
results = [event.verify(new_event) for event in expected_events]
self._current_timestamp = event_timestamp
return None if None in results else ' AND '.join(results)
logging.error('Timeout expired')
if on_timeout is None:
return ('Waiting for event %s timed out after %d seconds' %
(' OR '.join(map(str, expected_events)), timeout))
return on_timeout
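# A minimal usage sketch (hypothetical, mirroring how run_update_test() below
# builds its chains): events are appended in the order they must occur, each
# with its own timeout, and verify() consumes them from an event stream.
#
#   chain = ExpectedUpdateEventChain()
#   chain.add_event(
#       ExpectedUpdateEvent(event_type=EVENT_TYPE_DOWNLOAD_STARTED,
#                           event_result=EVENT_RESULT_SUCCESS),
#       timeout=4 * 60)
#   chain.verify(get_next_event)  # any callable returning event dicts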
class UpdateEventLogVerifier(object):
"""Verifies update event chains on a devserver update log."""
def __init__(self, event_log_filename):
self._event_log_filename = event_log_filename
self._event_log = []
self._num_consumed_events = 0
def verify_expected_events_chain(self, expected_event_chain):
"""Verify a given event chain.
@param expected_event_chain: instance of expected event chain.
@raises ExpectedUpdateEventChainFailed if we failed to verify an
event.
"""
expected_event_chain.verify(self._get_next_log_event)
def _get_next_log_event(self):
"""Returns the next event in an event log.
Uses the filename handed to it during initialization to read the
host log from the devserver used during the update.
@return The next new event in the host log, as reported by devserver;
None if no such event was found or an error occurred.
"""
# (Re)read event log from hostlog file, if necessary.
if len(self._event_log) <= self._num_consumed_events:
try:
with open(self._event_log_filename, 'r') as out_log:
self._event_log = json.loads(out_log.read())
except Exception as e:
raise error.TestFail('Error while reading the hostlogs '
'from devserver: %s' % e)
# Return next new event, if one is found.
if len(self._event_log) > self._num_consumed_events:
new_event = {
key: str(val) for key, val
in self._event_log[self._num_consumed_events].iteritems()
}
self._num_consumed_events += 1
logging.info('Consumed new event: %s', new_event)
return new_event
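# For reference, the hostlog consumed above is a JSON list of event
# dictionaries written by the devserver. An illustrative (hypothetical) entry,
# limited to the fields this module actually reads, looks roughly like:
#
#   {"event_type": 13, "event_result": 1, "version": "9999.0.0",
#    "previous_version": "", "timestamp": "2017-01-01 12:00:00"}
#
# Values are stringified by _get_next_log_event() before being matched against
# an ExpectedUpdateEvent, and the timestamp must use the
# '%Y-%m-%d %H:%M:%S' format expected by the chain's timeout bookkeeping.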
class TestPlatform(object):
"""An interface and factory for platform-dependent functionality."""
# Named tuple containing the staged URLs needed for the test.
# source_url: url to find the update payload for the source image.
# source_stateful_url: url to find the stateful payload for the source
# image.
# target_url: url to find the update payload for the target image.
# target_stateful_url: url to find the stateful payload for the target
# image.
StagedURLs = collections.namedtuple(
'StagedURLs',
['source_url', 'source_stateful_url', 'target_url',
'target_stateful_url'])
def __init__(self):
assert False, 'Cannot instantiate this interface'
@staticmethod
def create(host):
"""Returns a TestPlatform implementation based on the host type.
*DO NOT* override this method.
@param host: a host object representing the DUT
@return A TestPlatform implementation.
"""
os_type = host.get_os_type()
if os_type in ('cros', 'moblab'):
return ChromiumOSTestPlatform(host)
raise error.TestError('Unknown OS type reported by host: %s' % os_type)
def initialize(self, autotest_devserver, results_dir):
"""Initialize the object.
@param autotest_devserver: Instance of client.common_lib.dev_server to
use to reach the devserver instance for this
build.
@param results_dir: Directory for storing results and logs from the run.
"""
raise NotImplementedError
def prep_artifacts(self, test_conf):
"""Prepares update artifacts for the test.
The test config must include 'source_payload_uri' and
'target_payload_uri'. In addition, it may include platform-specific
values as determined by the test control file.
@param test_conf: Dictionary containing the test configuration.
@return A tuple of staged URLs.
@raise error.TestError on failure.
"""
raise NotImplementedError
def reboot_device(self):
"""Reboots the device."""
raise NotImplementedError
def prep_device_for_update(self, source_payload_uri):
"""Prepares the device for update.
@param source_payload_uri: Source payload GS URI to install.
@raise error.TestError on failure.
"""
raise NotImplementedError
def get_active_slot(self):
"""Returns the active boot slot of the device."""
raise NotImplementedError
def start_update_perf(self, bindir):
"""Starts performance monitoring (if available).
@param bindir: Directory containing test binary files.
"""
raise NotImplementedError
def stop_update_perf(self):
"""Stops performance monitoring and returns data (if available).
@return Dictionary containing performance attributes.
"""
raise NotImplementedError
def trigger_update(self, target_payload_uri):
"""Kicks off an update.
@param target_payload_uri: The GS URI to use for the update.
"""
raise NotImplementedError
def finalize_update(self):
"""Performs post-update procedures."""
raise NotImplementedError
def get_update_log(self, num_lines):
"""Returns the update log.
@param num_lines: Number of log lines to return (tail), zero for all.
@return String containing the last |num_lines| from the update log.
"""
raise NotImplementedError
def check_device_after_update(self):
"""Runs final sanity checks.
@raise error.TestError on failure.
"""
raise NotImplementedError
def oobe_triggers_update(self):
"""Returns True if this host has an OOBE flow during which
it will perform an update check and perhaps an update.
One example of such a flow is Hands-Off Zero-Touch Enrollment.
@return Boolean indicating whether the DUT's OOBE triggers an update.
"""
raise NotImplementedError
def verify_version(self, version):
"""Compares the OS version on the DUT with the provided version.
@param version: The version to compare with (string).
@raise error.TestFail if the versions differ.
"""
actual_version = self._host.get_release_version()
if actual_version != version:
err_msg = 'Failed to verify OS version. Expected %s, was %s' % (
version, actual_version)
logging.error(err_msg)
raise error.TestFail(err_msg)
class ChromiumOSTestPlatform(TestPlatform):
"""A TestPlatform implementation for Chromium OS."""
_STATEFUL_UPDATE_FILENAME = 'stateful.tgz'
def __init__(self, host):
self._host = host
self._autotest_devserver = None
self._staged_urls = None
self._perf_mon_pid = None
def _stage_payload(self, build_name, filename, archive_url=None):
"""Stage the given payload onto the devserver.
Works for either a stateful or a full/delta test payload. Expects a
build_name + filename combination, and optionally an explicit archive_url
(otherwise the devserver's default GS archive location is assumed).
@param build_name: The build name e.g. x86-mario-release/<version>.
If set, assumes default gs archive bucket and
requires filename to be specified.
@param filename: In conjunction with build_name, this is the file you
are downloading.
@param archive_url: An optional GS archive location, if not using the
devserver's default.
@return URL of the staged payload on the server.
@raise error.TestError if there's a problem with staging.
"""
try:
self._autotest_devserver.stage_artifacts(image=build_name,
files=[filename],
archive_url=archive_url)
return self._autotest_devserver.get_staged_file_url(filename,
build_name)
except dev_server.DevServerException as e:
raise error.TestError('Failed to stage payload: %s' % e)
def _stage_payload_by_uri(self, payload_uri):
"""Stage a payload based on its GS URI.
This infers the build's label, filename and GS archive from the
provided GS URI.
@param payload_uri: The full GS URI of the payload.
@return URL of the staged payload on the server.
@raise error.TestError if there's a problem with staging.
"""
archive_url, _, filename = payload_uri.rpartition('/')
build_name = urlparse.urlsplit(archive_url).path.strip('/')
return self._stage_payload(build_name, filename,
archive_url=archive_url)
def _get_stateful_uri(self, build_uri):
"""Returns a complete GS URI of a stateful update given a build path."""
return '/'.join([build_uri.rstrip('/'), self._STATEFUL_UPDATE_FILENAME])
def _payload_to_stateful_uri(self, payload_uri):
"""Given a payload GS URI, returns the corresponding stateful URI."""
build_uri = payload_uri.rpartition('/')[0]
if build_uri.endswith('payloads'):
build_uri = build_uri.rpartition('/')[0]
return self._get_stateful_uri(build_uri)
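# An illustrative mapping (hypothetical URIs): both payload layouts resolve to
# the stateful archive sitting next to the build:
#   gs://bucket/board-release/R60-9583.0.0/payload.bin
#       -> gs://bucket/board-release/R60-9583.0.0/stateful.tgz
#   gs://bucket/dev-channel/board/9334.0.0/payloads/payload.bin
#       -> gs://bucket/dev-channel/board/9334.0.0/stateful.tgz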
@staticmethod
def _get_update_parameters_from_uri(payload_uri):
"""Extract the two vars needed for cros_au from the Google Storage URI.
dev_server.auto_update needs two values from this test:
(1) A build_name string, e.g. samus-release/R60-9583.0.0
(2) A filename of the exact payload file to use for the update. This
payload needs to have already been staged on the devserver.
This function extracts those two values from a Google Storage URI.
@param payload_uri: Google Storage URI to extract values from.
@return Tuple of (build_name, payload_file).
"""
archive_url, _, payload_file = payload_uri.rpartition('/')
build_name = urlparse.urlsplit(archive_url).path.strip('/')
# This test supports payload uris from two Google Storage buckets.
# They store their payloads slightly differently. One stores them in
# a separate payloads directory. E.g
# gs://chromeos-image-archive/samus-release/R60-9583.0.0/blah.bin
# gs://chromeos-releases/dev-channel/samus/9334.0.0/payloads/blah.bin
if build_name.endswith('payloads'):
build_name = build_name.rpartition('/')[0]
payload_file = 'payloads/' + payload_file
logging.debug('Extracted build_name: %s, payload_file: %s from %s.',
build_name, payload_file, payload_uri)
return build_name, payload_file
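# For the two bucket layouts illustrated above, the extracted values would be
# roughly:
#   gs://chromeos-image-archive/samus-release/R60-9583.0.0/blah.bin
#       -> ('samus-release/R60-9583.0.0', 'blah.bin')
#   gs://chromeos-releases/dev-channel/samus/9334.0.0/payloads/blah.bin
#       -> ('dev-channel/samus/9334.0.0', 'payloads/blah.bin')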
def _install_version(self, payload_uri, clobber_stateful=False):
"""Install the specified host with the image given by the url.
@param payload_uri: GS URI used to compute values for devserver cros_au
@param clobber_stateful: force a reinstall of the stateful image.
"""
build_name, payload_file = self._get_update_parameters_from_uri(
payload_uri)
logging.info('Installing image %s on the DUT', payload_uri)
try:
ds = self._autotest_devserver
_, pid = ds.auto_update(host_name=self._host.hostname,
build_name=build_name,
force_update=True,
full_update=True,
log_dir=self._results_dir,
payload_filename=payload_file,
clobber_stateful=clobber_stateful)
except:
logging.fatal('ERROR: Failed to install image on the DUT.')
raise
return pid
def _stage_artifacts_onto_devserver(self, test_conf):
"""Stages artifacts that will be used by the test onto the devserver.
@param test_conf: a dictionary containing test configuration values
@return a StagedURLs tuple containing the staged urls.
"""
logging.info('Staging images onto autotest devserver (%s)',
self._autotest_devserver.url())
staged_source_url = None
source_payload_uri = test_conf['source_payload_uri']
if source_payload_uri:
staged_source_url = self._stage_payload_by_uri(source_payload_uri)
# In order to properly install the source image using a full
# payload we'll also need the stateful update that comes with it.
# In general, tests may have their source artifacts in a different
# location than their payloads. This is determined by whether or
# not the source_archive_uri attribute is set; if it isn't set,
# then we derive it from the dirname of the source payload.
source_archive_uri = test_conf.get('source_archive_uri')
if source_archive_uri:
source_stateful_uri = self._get_stateful_uri(source_archive_uri)
else:
source_stateful_uri = self._payload_to_stateful_uri(
source_payload_uri)
staged_source_stateful_url = self._stage_payload_by_uri(
source_stateful_uri)
# Log source image URLs.
logging.info('Source full payload from %s staged at %s',
source_payload_uri, staged_source_url)
if staged_source_stateful_url:
logging.info('Source stateful update from %s staged at %s',
source_stateful_uri, staged_source_stateful_url)
target_payload_uri = test_conf['target_payload_uri']
staged_target_url = self._stage_payload_by_uri(target_payload_uri)
target_stateful_uri = None
staged_target_stateful_url = None
target_archive_uri = test_conf.get('target_archive_uri')
if target_archive_uri:
target_stateful_uri = self._get_stateful_uri(target_archive_uri)
else:
target_stateful_uri = self._payload_to_stateful_uri(
target_payload_uri)
if not staged_target_stateful_url and target_stateful_uri:
staged_target_stateful_url = self._stage_payload_by_uri(
target_stateful_uri)
# Log target payload URLs.
logging.info('%s test payload from %s staged at %s',
test_conf['update_type'], target_payload_uri,
staged_target_url)
logging.info('Target stateful update from %s staged at %s',
target_stateful_uri, staged_target_stateful_url)
return self.StagedURLs(staged_source_url, staged_source_stateful_url,
staged_target_url, staged_target_stateful_url)
def _run_login_test(self, tag):
"""Runs login_LoginSuccess test on the DUT."""
client_at = autotest.Autotest(self._host)
client_at.run_test('login_LoginSuccess', tag=tag)
def _start_perf_mon(self, bindir):
"""Starts monitoring performance and resource usage on a DUT.
Call _stop_perf_mon() with the returned PID to stop monitoring
and collect the results.
@param bindir: Directory containing the monitoring script.
@return The PID of the newly created DUT monitoring process.
"""
# We can't assume much about the source image so we copy the
# performance monitoring script to the DUT directly.
path = os.path.join(bindir, 'update_engine_performance_monitor.py')
self._host.send_file(path, '/tmp')
cmd = 'python /tmp/update_engine_performance_monitor.py --start-bg'
return int(self._host.run(cmd).stdout)
def _stop_perf_mon(self, perf_mon_pid):
"""Stops monitoring performance and resource usage on a DUT.
@param perf_mon_pid: the PID returned from _start_perf_mon().
@return Dictionary containing performance attributes, or None if
unavailable.
"""
# Gracefully handle problems with performance monitoring by
# just returning None.
try:
cmd = ('python /tmp/update_engine_performance_monitor.py '
'--stop-bg=%d') % perf_mon_pid
perf_json_txt = self._host.run(cmd).stdout
return json.loads(perf_json_txt)
except Exception as e:
logging.warning('Failed to parse output from '
'update_engine_performance_monitor.py: %s', e)
return None
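# The monitor's JSON output is expected to be a flat dictionary of resource
# metrics; the only key this test currently consumes is 'rss_peak' (peak
# memory/RSS usage in bytes), e.g. (hypothetical figure):
#
#   {"rss_peak": 26214400}
#
# _report_perf_data() below converts it to KiB before reporting.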
# Interface overrides.
#
def initialize(self, autotest_devserver, results_dir):
self._autotest_devserver = autotest_devserver
self._results_dir = results_dir
def reboot_device(self):
self._host.reboot()
def prep_artifacts(self, test_conf):
self._staged_urls = self._stage_artifacts_onto_devserver(test_conf)
return self._staged_urls
def prep_device_for_update(self, source_payload_uri):
# Install the source version onto the DUT.
if self._staged_urls.source_url:
self._install_version(source_payload_uri, clobber_stateful=True)
# Make sure we can login before the target update.
self._run_login_test('source_update')
def get_active_slot(self):
return self._host.run('rootdev -s').stdout.strip()
def start_update_perf(self, bindir):
if self._perf_mon_pid is None:
self._perf_mon_pid = self._start_perf_mon(bindir)
def stop_update_perf(self):
perf_data = None
if self._perf_mon_pid is not None:
perf_data = self._stop_perf_mon(self._perf_mon_pid)
self._perf_mon_pid = None
return perf_data
def trigger_update(self, target_payload_uri):
logging.info('Updating device to target image.')
return self._install_version(target_payload_uri)
def finalize_update(self):
# Stateful update is controlled by cros_au
pass
def get_update_log(self, num_lines):
return self._host.run_output(
'tail -n %d /var/log/update_engine.log' % num_lines,
stdout_tee=None)
def check_device_after_update(self):
# Make sure we can login after update.
self._run_login_test('target_update')
def oobe_triggers_update(self):
return self._host.oobe_triggers_update()
class autoupdate_EndToEndTest(test.test):
"""Complete update test between two Chrome OS releases.
Performs an end-to-end test of updating a ChromeOS device from one version
to another. The test performs the following steps:
1. Stages the source (full) and target update payload on the central
devserver.
2. Installs a source image on the DUT (if provided) and reboots to it.
3. Then starts the target update by calling cros_au RPC on the devserver.
4. This copies the devserver code and all payloads to the DUT.
5. Starts a devserver on the DUT.
6. Starts an update pointing to this local devserver.
7. Watches as the DUT applies the update to rootfs and stateful.
8. Reboots and repeats steps 5-6, ensuring that the next update check
shows the new image version.
9. Returns the hostlogs collected during each update check for
verification against expected update events.
Some notes on naming:
devserver: Refers to a machine running the Chrome OS Update Devserver.
autotest_devserver: An autotest wrapper to interact with a devserver.
Can be used to stage artifacts to a devserver. While
this can also be used to update a machine, we do not
use it for that purpose in this test as we manage
updates with our own devserver instances (see below).
*staged_url's: In this case staged refers to the fact that these items
are available to be downloaded statically from these urls
e.g. 'localhost:8080/static/my_file.gz'. These are usually
given after staging an artifact using an autotest_devserver
though they can be re-created given enough assumptions.
*update_url's: URLs referring to the update RPC on a given omaha devserver.
Since we always use an instantiated omaha devserver to run
updates, these will always reference an existing instance
of an omaha devserver that we just created for the purposes
of updating.
"""
version = 1
# Timeout periods, given in seconds.
_WAIT_FOR_INITIAL_UPDATE_CHECK_SECONDS = 12 * 60
# TODO(sosa): Investigate why this needs to be so long (this used to be
# 120 and regressed).
_WAIT_FOR_DOWNLOAD_STARTED_SECONDS = 4 * 60
# See https://crbug.com/731214 before changing WAIT_FOR_DOWNLOAD
_WAIT_FOR_DOWNLOAD_COMPLETED_SECONDS = 20 * 60
_WAIT_FOR_UPDATE_COMPLETED_SECONDS = 4 * 60
_WAIT_FOR_UPDATE_CHECK_AFTER_REBOOT_SECONDS = 15 * 60
# Logs and their whereabouts.
_WHERE_UPDATE_LOG = ('update_engine log (in sysinfo or on the DUT, also '
'included in the test log)')
_WHERE_OMAHA_LOG = 'Omaha-devserver log (included in the test log)'
def initialize(self):
"""Sets up variables that will be used by test."""
self._host = None
self._omaha_devserver = None
self._source_image_installed = False
def cleanup(self):
"""Kill the omaha devserver if it's still around."""
if self._omaha_devserver:
self._omaha_devserver.stop_devserver()
self._omaha_devserver = None
def _get_hostlog_file(self, filename, pid):
"""Return the hostlog file location.
@param filename: The partial filename to look for.
@param pid: The pid of the update.
@return Path to the hostlog file.
@raise error.TestFail if the hostlog file cannot be found.
"""
hosts = [self._host.hostname, self._host.ip]
for host in hosts:
hostlog = '%s_%s_%s' % (filename, host, pid)
file_url = os.path.join(self.job.resultdir,
dev_server.AUTO_UPDATE_LOG_DIR,
hostlog)
if os.path.exists(file_url):
return file_url
raise error.TestFail('Could not find %s for pid %s' % (filename, pid))
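# Illustrative example (hypothetical host and pid): for the rootfs update this
# resolves to a file named
#   devserver_hostlog_rootfs_<hostname or ip>_<pid>
# under <job resultdir>/<dev_server.AUTO_UPDATE_LOG_DIR>/.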
def _dump_update_engine_log(self, test_platform):
"""Dumps relevant AU error log."""
try:
error_log = test_platform.get_update_log(80)
logging.error('Dumping snippet of update_engine log:\n%s',
snippet(error_log))
except Exception:
# Mute any exceptions we get printing debug logs.
pass
def _report_perf_data(self, perf_data):
"""Reports performance and resource data.
Currently, performance attributes are expected to include 'rss_peak'
(peak memory usage in bytes).
@param perf_data: A dictionary containing performance attributes.
"""
rss_peak = perf_data.get('rss_peak')
if rss_peak:
rss_peak_kib = rss_peak / 1024
logging.info('Peak memory (RSS) usage on DUT: %d KiB', rss_peak_kib)
self.output_perf_value(description='mem_usage_peak',
value=int(rss_peak_kib),
units='KiB',
higher_is_better=False)
else:
logging.warning('No rss_peak key in JSON returned by '
'update_engine_performance_monitor.py')
def _error_initial_check(self, expected, actual, mismatched_attrs):
if 'version' in mismatched_attrs:
err_msg = ('Initial update check was received but the reported '
'version is different from what was expected.')
if self._source_image_installed:
err_msg += (' The source payload we installed was probably '
'incorrect or corrupt.')
else:
err_msg += (' The DUT is probably not running the correct '
'source image.')
return err_msg
return 'A test bug occurred; inspect the test log.'
def _error_intermediate(self, expected, actual, mismatched_attrs, action,
problem):
if 'event_result' in mismatched_attrs:
event_result = actual.get('event_result')
reported = (('different than expected (%s)' %
EVENT_RESULT_DICT[event_result])
if event_result else 'missing')
return ('The updater reported result code is %s. This could be an '
'updater bug or a connectivity problem; check the %s. For '
'a detailed log of update events, check the %s.' %
(reported, self._WHERE_UPDATE_LOG, self._WHERE_OMAHA_LOG))
if 'event_type' in mismatched_attrs:
event_type = actual.get('event_type')
reported = ('different (%s)' % EVENT_TYPE_DICT[event_type]
if event_type else 'missing')
return ('Expected the updater to %s (%s) but received event type '
'is %s. This could be an updater %s; check the '
'%s. For a detailed log of update events, check the %s.' %
(action, EVENT_TYPE_DICT[expected['event_type']], reported,
problem, self._WHERE_UPDATE_LOG, self._WHERE_OMAHA_LOG))
if 'version' in mismatched_attrs:
return ('The updater reported an unexpected version despite '
'previously reporting the correct one. This is most likely '
'a bug in update engine; check the %s.' %
self._WHERE_UPDATE_LOG)
return 'A test bug occurred; inspect the test log.'
def _error_download_started(self, expected, actual, mismatched_attrs):
return self._error_intermediate(expected, actual, mismatched_attrs,
'begin downloading',
'bug, crash or provisioning error')
def _error_download_finished(self, expected, actual, mismatched_attrs):
return self._error_intermediate(expected, actual, mismatched_attrs,
'finish downloading', 'bug or crash')
def _error_update_complete(self, expected, actual, mismatched_attrs):
return self._error_intermediate(expected, actual, mismatched_attrs,
'complete the update', 'bug or crash')
def _error_reboot_after_update(self, expected, actual, mismatched_attrs):
if 'event_result' in mismatched_attrs:
event_result = actual.get('event_result')
reported = ('different (%s)' % EVENT_RESULT_DICT[event_result]
if event_result else 'missing')
return ('The updater was expected to reboot (%s) but reported '
'result code is %s. This could be a failure to reboot, an '
'updater bug or a connectivity problem; check the %s and '
'the system log. For a detailed log of update events, '
'check the %s.' %
(EVENT_RESULT_DICT[expected['event_result']], reported,
self._WHERE_UPDATE_LOG, self._WHERE_OMAHA_LOG))
if 'event_type' in mismatched_attrs:
event_type = actual.get('event_type')
reported = ('different (%s)' % EVENT_TYPE_DICT[event_type]
if event_type else 'missing')
return ('Expected to successfully reboot into the new image (%s) '
'but received event type is %s. This probably means that '
'the new image failed to verify after reboot, possibly '
'because the payload is corrupt. This might also be an '
'updater bug or crash; check the %s. For a detailed log of '
'update events, check the %s.' %
(EVENT_TYPE_DICT[expected['event_type']], reported,
self._WHERE_UPDATE_LOG, self._WHERE_OMAHA_LOG))
if 'version' in mismatched_attrs:
return ('The DUT rebooted after the update but reports a different '
'image version than the one expected. This probably means '
'that the payload we applied was incorrect or corrupt.')
if 'previous_version' in mismatched_attrs:
return ('The DUT rebooted after the update and reports the '
'expected version. However, it reports a previous version '
'that is different from the one previously reported. This '
'is most likely a bug in update engine; check the %s.' %
self._WHERE_UPDATE_LOG)
return 'A test bug occurred; inspect the test log.'
def _timeout_err(self, desc, timeout, event_type=None):
if event_type is not None:
desc += ' (%s)' % EVENT_TYPE_DICT[event_type]
return ('Failed to receive %s within %d seconds. This could be a '
'problem with the updater or a connectivity issue. For more '
'details, check the %s.' %
(desc, timeout, self._WHERE_UPDATE_LOG))
def run_update_test(self, test_platform, test_conf):
"""Runs the actual update test once preconditions are met.
@param test_platform: TestPlatform implementation.
@param test_conf: A dictionary containing test configuration values
@raises ExpectedUpdateEventChainFailed if we failed to verify an update
event.
"""
# Record the active root partition.
source_active_slot = test_platform.get_active_slot()
logging.info('Source active slot: %s', source_active_slot)
source_release = test_conf['source_release']
target_release = test_conf['target_release']
# TODO(dhaddock): Reuse update_engine_performance_monitor
# script with chromite autoupdater. Can't use it here anymore because
# the DUT is restarted a bunch of times during the update and the
# process is killed before we can get the results back.
try:
# Update the DUT to the target image.
pid = test_platform.trigger_update(test_conf['target_payload_uri'])
# Verify the host log that was returned from the update.
file_url = self._get_hostlog_file('devserver_hostlog_rootfs', pid)
logging.info('Checking update steps with devserver hostlog file: %s',
file_url)
log_verifier = UpdateEventLogVerifier(file_url)
# Verify chain of events in a successful update process.
chain = ExpectedUpdateEventChain()
chain.add_event(
ExpectedUpdateEvent(
version=source_release,
on_error=self._error_initial_check),
self._WAIT_FOR_INITIAL_UPDATE_CHECK_SECONDS,
on_timeout=self._timeout_err(
'an initial update check',
self._WAIT_FOR_INITIAL_UPDATE_CHECK_SECONDS))
chain.add_event(
ExpectedUpdateEvent(
event_type=EVENT_TYPE_DOWNLOAD_STARTED,
event_result=EVENT_RESULT_SUCCESS,
version=source_release,
on_error=self._error_download_started),
self._WAIT_FOR_DOWNLOAD_STARTED_SECONDS,
on_timeout=self._timeout_err(
'a download started notification',
self._WAIT_FOR_DOWNLOAD_STARTED_SECONDS,
event_type=EVENT_TYPE_DOWNLOAD_STARTED))
chain.add_event(
ExpectedUpdateEvent(
event_type=EVENT_TYPE_DOWNLOAD_FINISHED,
event_result=EVENT_RESULT_SUCCESS,
version=source_release,
on_error=self._error_download_finished),
self._WAIT_FOR_DOWNLOAD_COMPLETED_SECONDS,
on_timeout=self._timeout_err(
'a download finished notification',
self._WAIT_FOR_DOWNLOAD_COMPLETED_SECONDS,
event_type=EVENT_TYPE_DOWNLOAD_FINISHED))
chain.add_event(
ExpectedUpdateEvent(
event_type=EVENT_TYPE_UPDATE_COMPLETE,
event_result=EVENT_RESULT_SUCCESS,
version=source_release,
on_error=self._error_update_complete),
self._WAIT_FOR_UPDATE_COMPLETED_SECONDS,
on_timeout=self._timeout_err(
'an update complete notification',
self._WAIT_FOR_UPDATE_COMPLETED_SECONDS,
event_type=EVENT_TYPE_UPDATE_COMPLETE))
log_verifier.verify_expected_events_chain(chain)
except:
logging.fatal('ERROR: Failure occurred during the target update.')
raise
if test_platform.oobe_triggers_update():
# If DUT automatically checks for update during OOBE,
# checking the post-update CrOS version and slot is sufficient.
# This command checks the OS version.
# The slot is checked a little later, after the else block.
logging.info('Skipping post reboot update check.')
test_platform.verify_version(target_release)
else:
# Observe post-reboot update check, which should indicate that the
# image version has been updated.
# Verify the host log that was returned from the update.
file_url = self._get_hostlog_file('devserver_hostlog_reboot', pid)
logging.info('Checking post-reboot devserver hostlogs: %s',
file_url)
log_verifier = UpdateEventLogVerifier(file_url)
chain = ExpectedUpdateEventChain()
expected_events = [
ExpectedUpdateEvent(
event_type=EVENT_TYPE_UPDATE_COMPLETE,
event_result=EVENT_RESULT_SUCCESS_REBOOT,
version=target_release,
previous_version=source_release,
on_error=self._error_reboot_after_update),
# Newer versions send a "rebooted_after_update" message
# after reboot with the previous version instead of another
# "update_complete".
ExpectedUpdateEvent(
event_type=EVENT_TYPE_REBOOTED_AFTER_UPDATE,
event_result=EVENT_RESULT_SUCCESS,
version=target_release,
previous_version=source_release,
on_error=self._error_reboot_after_update),
]
chain.add_event(
expected_events,
self._WAIT_FOR_UPDATE_CHECK_AFTER_REBOOT_SECONDS,
on_timeout=self._timeout_err(
'a successful reboot notification',
self._WAIT_FOR_UPDATE_CHECK_AFTER_REBOOT_SECONDS,
event_type=EVENT_TYPE_UPDATE_COMPLETE))
log_verifier.verify_expected_events_chain(chain)
# Make sure we're using a different slot after the update.
target_active_slot = test_platform.get_active_slot()
if target_active_slot == source_active_slot:
err_msg = 'The active image slot did not change after the update.'
if None in (source_release, target_release):
err_msg += (' The DUT likely rebooted into the old image, which '
'probably means that the payload we applied was '
'corrupt. But since we did not check the source '
'and/or target version we cannot say for sure.')
elif source_release == target_release:
err_msg += (' Given that the source and target versions are '
'identical, the DUT likely rebooted into the old '
'image. This probably means that the payload we '
'applied was corrupt.')
else:
err_msg += (' This is strange since the DUT reported the '
'correct target version. This is probably a system '
'bug; check the DUT system log.')
raise error.TestFail(err_msg)
logging.info('Target active slot changed as expected: %s',
target_active_slot)
logging.info('Update successful, test completed')
def run_once(self, host, test_conf):
"""Performs a complete auto update test.
@param host: a host object representing the DUT
@param test_conf: a dictionary containing test configuration values
@raise error.TestError if anything went wrong with setting up the test;
error.TestFail if any part of the test has failed.
"""
self._host = host
logging.debug('The test configuration supplied: %s', test_conf)
# Find a devserver to use. We first try to pick a devserver with the
# least load. If all devservers' loads are higher than the threshold,
# fall back to the old behavior by picking a devserver based on the
# payload URI, with which ImageServer.resolve will return a random
# devserver based on the hash of the URI.
# The picked devserver needs to respect the location of the host if
# 'prefer_local_devserver' is set to True or 'restricted_subnets' is
# set.
hostname = self._host.hostname if self._host else None
least_loaded_devserver = dev_server.get_least_loaded_devserver(
hostname=hostname)
if least_loaded_devserver:
logging.debug('Choosing the least loaded devserver: %s',
least_loaded_devserver)
autotest_devserver = dev_server.ImageServer(least_loaded_devserver)
else:
logging.warning('No devserver meets the maximum load requirement. '
'Picking a random devserver to use.')
autotest_devserver = dev_server.ImageServer.resolve(
test_conf['target_payload_uri'], host.hostname)
devserver_hostname = urlparse.urlparse(
autotest_devserver.url()).hostname
logging.info('Devserver chosen for this run: %s', devserver_hostname)
# Obtain a test platform implementation.
test_platform = TestPlatform.create(host)
test_platform.initialize(autotest_devserver, self.job.resultdir)
# Stage source images and update payloads onto the devserver.
staged_urls = test_platform.prep_artifacts(test_conf)
self._source_image_installed = bool(staged_urls.source_url)
# Prepare the DUT (install source version etc).
test_platform.prep_device_for_update(test_conf['source_payload_uri'])
# Start the update.
try:
self.run_update_test(test_platform, test_conf)
except ExpectedUpdateEventChainFailed:
self._dump_update_engine_log(test_platform)
raise
test_platform.check_device_after_update()