blob: bee54753cc6889152cb2ab2b7c2707a19201136e [file] [log] [blame]
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import datetime
import json
import logging
import os
import re
import requests
import shutil
import time
import urlparse
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
from autotest_lib.client.cros.update_engine import update_engine_event
_DEFAULT_COPY = shutil.copy
class UpdateEngineUtil(object):
"""Utility code shared between client and server update_engine autotests"""
# Update engine status lines.
# Update engine statuses.
# Files used by the tests.
_UPDATE_ENGINE_LOG = '/var/log/update_engine.log'
_UPDATE_ENGINE_LOG_DIR = '/var/log/update_engine/'
_CUSTOM_LSB_RELEASE = '/mnt/stateful_partition/etc/lsb-release'
_UPDATE_ENGINE_PREFS_DIR = '/var/lib/update_engine/prefs/'
# Update engine prefs
_UPDATE_CHECK_RESPONSE_HASH = 'update-check-response-hash'
# Public key used to force update_engine to verify omaha response data on
# test images.
def __init__(self, run_func=_DEFAULT_RUN, get_file=_DEFAULT_COPY):
Initialize this class.
@param run_func: the function to use to run commands on the client.
Defaults for use by client tests, but can be
overwritten to run remotely from a server test.
@param get_file: the function to use when copying a file. Defaults
for use by client tests. Called with
(file, destination) syntax.
self._create_update_engine_variables(run_func, get_file)
def _create_update_engine_variables(self, run_func=_DEFAULT_RUN,
"""See __init__()."""
self._run = run_func
self._get_file = get_file
def _wait_for_progress(self, progress):
Waits until we reach the percentage passed as the input.
@param progress: The progress [0.0 .. 1.0] we want to wait for.
while True:
completed = self._get_update_progress()
logging.debug('Checking if %s is greater than %s', completed,
if completed >= progress:
def _is_update_started(self):
"""Checks if the update has started."""
status = self._get_update_engine_status()
if status is None:
return False
return status[self._CURRENT_OP] in (
def _has_progress_stopped(self):
"""Checks that the update_engine progress has stopped moving."""
before = self._get_update_engine_status()[self._PROGRESS]
for i in range(0, 10):
if before != self._get_update_engine_status()[self._PROGRESS]:
return False
return True
def _get_update_progress(self):
"""Returns the current payload downloaded progress."""
while True:
status = self._get_update_engine_status()
if not status:
if self._is_update_engine_idle(status):
err_str = self._get_last_error_string()
raise error.TestFail('Update status was idle while trying to '
'get download status. Last error: %s' %
if self._is_update_engine_reporting_error(status):
err_str = self._get_last_error_string()
raise error.TestFail('Update status reported error: %s' %
# When the update has moved to the final stages, update engine
# displays progress as 0.0 but for our needs we will return 1.0
if self._is_update_finished_downloading(status):
return 1.0
# If we call this right after reboot it may not be downloading yet.
return float(status[self._PROGRESS])
def _wait_for_update_to_fail(self):
"""Waits for the update to retry until failure."""
timeout_minutes = 20
timeout = time.time() + 60 * timeout_minutes
while True:
if self._check_update_engine_log_for_entry('Reached max attempts ',
logging.debug('Found log entry for failed update.')
if self._is_update_engine_idle():
if time.time() > timeout:
raise error.TestFail('Update did not fail as expected. Timeout'
': %d minutes.' % timeout_minutes)
def _wait_for_update_to_complete(self, finalizing_ok=False):
Checks if the update has completed.
@param finalizing_ok: FINALIZING status counts as complete.
if finalizing_ok:
while True:
status = self._get_update_engine_status()
# During reboot, status will be None
if status is not None:
if self._is_update_engine_reporting_error(status):
err_str = self._get_last_error_string()
raise error.TestFail('Update status reported error: %s' %
if self._is_update_engine_idle(status):
err_str = self._get_last_error_string()
raise error.TestFail('Update status was unexpectedly '
'IDLE when we were waiting for the '
'update to complete: %s' % err_str)
if status[self._CURRENT_OP] in statuses:
def _get_update_engine_status(self, timeout=3600, ignore_timeout=True):
Gets a dictionary version of update_engine_client --status.
@param timeout: How long to wait for the status to return.
@param ignore_timeout: True to throw an exception if timeout occurs.
@return Dictionary of values within update_engine_client --status.
@raise: error.AutoservError if command times out
status = self._run('update_engine_client --status', timeout=timeout,
ignore_status=True, ignore_timeout=ignore_timeout)
if status is None:
return None
if status.exit_status != 0:
return None
status_dict = {}
for line in status.stdout.splitlines():
entry = line.partition('=')
status_dict[entry[0]] = entry[2]
return status_dict
def _check_update_engine_log_for_entry(self, entry, raise_error=False,
Checks for entries in the update_engine log.
@param entry: The line to search for.
@param raise_error: Fails tests if log doesn't contain entry.
@param err_str: The error string to raise if we cannot find entry.
@param update_engine_log: Update engine log string you want to
search. If None, we will read from the
current update engine log.
@return Boolean if the update engine log contains the entry.
if isinstance(entry, str):
# Create a tuple of strings so we can itarete over it.
entry = (entry,)
if not update_engine_log:
update_engine_log = self._run(
'cat %s' % self._UPDATE_ENGINE_LOG).stdout
if all(msg in update_engine_log for msg in entry):
return True
if not raise_error:
return False
error_str = ('Did not find expected string(s) in update_engine log: '
'%s' % entry)
raise error.TestFail(err_str if err_str else error_str)
def _is_update_finished_downloading(self, status=None):
Checks if the update has moved to the final stages.
@param status: Output of _get_update_engine_status(). If None that
function will be called here first.
if status is None:
status = self._get_update_engine_status()
return status[self._CURRENT_OP] in [
def _is_update_engine_idle(self, status=None):
Checks if the update engine is idle.
@param status: Output of _get_update_engine_status(). If None that
function will be called here first.
if status is None:
status = self._get_update_engine_status()
return status[self._CURRENT_OP] == self._UPDATE_STATUS_IDLE
def _is_checking_for_update(self, status=None):
Checks if the update status is still checking for an update.
@param status: Output of _get_update_engine_status(). If None that
function will be called here first.
if status is None:
status = self._get_update_engine_status()
return status[self._CURRENT_OP] in (
def _is_update_engine_reporting_error(self, status=None):
Checks if the update engine status reported an error.
@param status: Output of _get_update_engine_status(). If None that
function will be called here first.
if status is None:
status = self._get_update_engine_status()
return (status[self._CURRENT_OP] ==
def _update_continued_where_it_left_off(self, progress,
Checks that the update did not restart after an interruption.
When testing a reboot interrupt we can do additional checks on the
logs before and after reboot to see if the update resumed.
@param progress: The progress the last time we checked.
@param reboot_interrupt: True if we are doing a reboot interrupt test.
@returns True if update continued. False if update restarted.
completed = self._get_update_progress()'New value: %f, old value: %f', completed, progress)
if completed >= progress:
return True
# Sometimes update_engine will continue an update but the first reported
# progress won't be correct. So check the logs for resume info.
if not reboot_interrupt or not self._check_update_engine_log_for_entry(
'Resuming an update that was previously started'):
return False
# Find the reported Completed and Resumed progress.
pattern = ('(.*)/(.*) operations \((.*)%\), (.*)/(.*) bytes downloaded'
' \((.*)%\), overall progress (.*)%')
before_pattern = 'Completed %s' % pattern
before_log = self._get_update_engine_log(r_index=1)
before_match = re.findall(before_pattern, before_log)[-1]
after_pattern = 'Resuming after %s' % pattern
after_log = self._get_update_engine_log(r_index=0)
after_match = re.findall(after_pattern, after_log)[0]
logging.debug('Progress before interrupt: %s', before_match)
logging.debug('Progress after interrupt: %s', after_match)
# Check the Resuming progress is greater than Completed progress.
for i in range(0, len(before_match)):
logging.debug('Comparing %d and %d', int(before_match[i]),
if int(before_match[i]) > int(after_match[i]):
return False
return True
def _get_payload_properties_file(self, payload_url, target_dir, **kwargs):
Downloads the payload properties file into a directory.
@param payload_url: The URL to the update payload file.
@param target_dir: The directory to download the file into.
@param kwargs: A dictionary of key/values that needs to be overridden on
the payload properties file.
payload_props_url = payload_url + '.json'
_, _, file_name = payload_props_url.rpartition('/')
response = json.loads(requests.get(payload_props_url).text)
# Override existing keys if any.
for k, v in kwargs.iteritems():
# Don't set default None values. We don't want to override good
# values to None.
if v is not None:
response[k] = v
with open(os.path.join(target_dir, file_name), 'w') as fp:
json.dump(response, fp)
except (requests.exceptions.RequestException,
ValueError) as err:
raise error.TestError(
'Failed to get update payload properties: %s with error: %s' %
(payload_props_url, err))
def _append_query_to_url(self, url, query_dict):
Appends the dictionary kwargs to the URL url as query string.
This function will replace the already existing query strings in url
with the ones in the input dictionary. I also removes keys that have
a None value.
@param url: The given input URL.
@param query_dicl: A dictionary of key/values to be converted to query
@return: The same input URL url but with new query string items added.
# TODO(ahassani): This doesn't work (or maybe should not) for queries
# with multiple values for a specific key.
parsed_url = list(urlparse.urlsplit(url))
parsed_query = urlparse.parse_qs(parsed_url[3])
for k, v in query_dict.items():
parsed_query[k] = [v]
parsed_url[3] = '&'.join(
'%s=%s' % (k, v[0]) for k, v in parsed_query.items()
if v[0] is not None)
return urlparse.urlunsplit(parsed_url)
def _check_for_update(self, update_url, interactive=True,
ignore_status=False, wait_for_completion=False,
Starts a background update check.
@param update_url: The URL to get an update from.
@param interactive: True if we are doing an interactive update.
@param ignore_status: True if we should ignore exceptions thrown.
@param wait_for_completion: True for --update, False for
@param kwargs: The dictionary to be converted to a query string and
appended to the end of the update URL. e.g:
{'critical_update': True, 'foo': 'bar'} ->
'http:/' Look
at or for the list of accepted
values. If there was already query string in update_url, it will
append the new values and override the old ones.
update_url = self._append_query_to_url(update_url, kwargs)
cmd = ['update_engine_client',
'--update' if wait_for_completion else '--check_for_update',
'--omaha_url=%s' % update_url]
if not interactive:
self._run(cmd, ignore_status=ignore_status)
def _save_extra_update_engine_logs(self, number_of_logs):
Get the last X number of update_engine logs on the DUT.
@param number_of_logs: The number of logs to save.
files = self._get_update_engine_logs()
for i in range(number_of_logs if number_of_logs <= len(files) else
file = os.path.join(self._UPDATE_ENGINE_LOG_DIR, files[i])
self._get_file(file, self.resultsdir)
def _get_update_engine_logs(self, timeout=3600, ignore_timeout=True):
Helper function to return the list of files in /var/log/update_engine/.
@param timeout: How many seconds to wait for command to complete.
@param ignore_timeout: True if we should not throw an error on timeout.
cmd = ['ls', '-t', '-1', self._UPDATE_ENGINE_LOG_DIR]
return self._run(cmd, timeout=timeout,
def _get_update_engine_log(self, r_index=0, timeout=3600,
Returns the last r_index'th update_engine log.
@param r_index: The index of the last r_index'th update_engine log
in order they were created. For example:
0 -> last one.
1 -> second to last one.
@param timeout: How many seconds to wait for command to complete.
@param ignore_timeout: True if we should not throw an error on timeout.
files = self._get_update_engine_logs()
return self._run('cat %s' % os.path.join(self._UPDATE_ENGINE_LOG_DIR,
def _create_custom_lsb_release(self, update_url, build='', **kwargs):
Create a custom lsb-release file.
In order to tell OOBE to ping a different update server than the
default we need to create our own lsb-release. We will include a
deserver update URL.
@param update_url: String of url to use for update check.
@param build: String of the build number to use. Represents the
Chrome OS build this device thinks it is on.
@param kwargs: A dictionary of key/values to be made into a query string
and appended to the update_url
update_url = self._append_query_to_url(update_url, kwargs)
self._run('mkdir %s' % os.path.dirname(self._CUSTOM_LSB_RELEASE),
self._run('touch %s' % self._CUSTOM_LSB_RELEASE)
self._run('echo "CHROMEOS_RELEASE_VERSION=%s" > %s' %
(build, self._CUSTOM_LSB_RELEASE))
self._run('echo "CHROMEOS_AUSERVER=%s" >> %s' %
(update_url, self._CUSTOM_LSB_RELEASE))
def _clear_custom_lsb_release(self):
Delete the custom release file, if any.
Intended to clear work done by _create_custom_lsb_release().
self._run('rm %s' % self._CUSTOM_LSB_RELEASE, ignore_status=True)
def _get_update_requests(self):
Get the contents of all the update requests from the most recent log.
@returns: a sequential list of <request> xml blocks or None if none.
update_log = self._get_update_engine_log()
# Matches <request ... /request>. The match can be on multiple
# lines and the search is not greedy so it only matches one block.
return re.findall(r'<request.*?/request>', update_log, re.DOTALL)
def _get_time_of_last_update_request(self):
Get the time of the last update request from most recent logfile.
@returns: seconds since epoch of when last update request happened
(second accuracy), or None if no such timestamp exists.
update_log = self._get_update_engine_log()
# Matches any single line with "MMDD/HHMMSS ... Request ... xml", e.g.
# "[0723/] Request: <?xml".
result = re.findall(r'([0-9]{4}/[0-9]{6}).*Request.*xml', update_log)
if not result:
return None
match = result[-1]
# The log does not include the year, so set it as this year.
# This assumption could cause incorrect behavior, but is unlikely to.
current_year =
log_datetime = datetime.datetime.strptime(match, LOG_TIMESTAMP_FORMAT)
log_datetime = log_datetime.replace(year=current_year)
return time.mktime(log_datetime.timetuple())
def _take_screenshot(self, filename):
Take a screenshot and save in resultsdir.
@param filename: The name of the file to save
file_location = os.path.join('/tmp', filename)
self._run('screenshot %s' % file_location)
self._get_file(file_location, self.resultsdir)
except (error.AutoservRunError, error.CmdError):
logging.exception('Failed to take screenshot.')
def _get_last_error_string(self):
Gets the last error message in the update engine log.
@returns: The error message.
err_str = 'Updating payload state for error code: '
log = self._get_update_engine_log().splitlines()
targets = [line for line in log if err_str in line]
logging.debug('Error lines found: %s', targets)
if not targets:
return None
return targets[-1].rpartition(err_str)[2]
def _get_latest_initial_request(self):
Return the most recent initial update request.
AU requests occur in a chain of messages back and forth, e.g. the
initial request for an update -> the reply with the update -> the
report that install has started -> report that install has finished,
etc. This function finds the first request in the latest such chain.
This message has no eventtype listed, or is rebooted_after_update
type (as an artifact from a previous update since this one).
Subsequent messages in the chain have different eventtype values.
@returns: string of the entire update request or None.
requests = self._get_update_requests()
if not requests:
return None
MATCH_STR = r'eventtype="(.*?)"'
for i in xrange(len(requests) - 1, -1, -1):
search =, requests[i])
if (not search or
( ==
return requests[i]
return None