# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module that contains the interface for au_test_harness workers.
An au test harnss worker is a class that contains the logic for performing
and validating updates on a target. This should be subclassed to handle
various types of target. Types of targets include VM's, real devices, etc.
import inspect
import os
from chromite.lib import cros_build_lib
from chromite.lib import cros_logging as logging
from chromite.lib import dev_server_wrapper
from chromite.lib import path_util
from crostestutils.au_test_harness import update_exception
class AUWorker(object):
"""Interface for a worker that updates and verifies images."""
# Mapping between cached payloads to directory locations.
update_cache = None
# --- INTERFACE ---
def __init__(self, options, test_results_root):
"""Processes options for the specific-type of worker."""
self.board = options.board
self.test_results_root = test_results_root
self.all_results_root = os.path.join(test_results_root, 'all')
self.fail_results_root = os.path.join(test_results_root, 'failed')
self.use_delta_updates =
self.verbose = options.verbose
self.vm_image_path = None
if options.quick_test:
self.verify_suite = 'build_RootFilesystemSize'
self.verify_suite = 'suite:%s' % (options.verify_suite_name or 'smoke')
self.ssh_private_key = options.ssh_private_key
def CleanUp(self):
"""Called at the end of every test."""
def GetUpdateMessage(self, update_target, update_base, from_vm, proxy):
"""Returns the update message that should be printed out for this update."""
if update_base:
msg = 'Performing a delta update from %s to %s' % (
update_base, update_target)
msg = 'Performing a full update to %s' % update_target
if from_vm: msg += ' in a VM'
if proxy: msg += ' using a proxy on port ' + str(proxy)
return msg
def PrepareBase(self, image_path, signed_base):
"""Method to be called to prepare target for testing this test.
Subclasses must override this method with the correct procedure for
preparing the test target.
Returns the path to the base image (might have changed for vm's).
` Args:
image_path: The image that should reside on the target before the test.
signed_base: If True, use the signed image rather than the actual image.
def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None, payload_signing_key=None):
"""Implementation of an actual update.
Subclasses must override this method with the correct update procedure for
the class.
See PerformUpdate for description of args.
def UpdateUsingPayload(self, update_path, stateful_change='old',
"""Updates target with the pre-generated update stored in update_path.
Subclasses must override this method with the correct update procedure for
the class.
update_path: Path to the image to update with. This directory should
contain both update.gz, and stateful.image.gz
stateful_change: How to perform the stateful update.
proxy_port: Port to have the client connect to. For use with
def VerifyImage(self, unittest, percent_required_to_pass=100, test=''):
"""Verifies the image with tests.
Verifies that the test images passes the percent required. Subclasses must
override this method with the correct update procedure for the class.
unittest: pointer to a unittest to fail if we cannot verify the image.
percent_required_to_pass: percentage required to pass. This should be
fall between 0-100.
test: test that will be used to verify the image. If omitted or equal to
the empty string the code will use self.verify_suite.
Returns the percent that passed.
def PerformUpdate(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None, payload_signing_key=None):
"""Performs an update using _UpdateImage and reports any error.
Subclasses should not override this method but override _UpdateImage
image_path: Path to the image to update with. This image must be a test
src_image_path: Optional. If set, perform a delta update using the
image specified by the path as the source image.
stateful_change: How to modify the stateful partition. Values are:
'old': Don't modify stateful partition. Just update normally.
'clean': Uses clobber-state to wipe the stateful partition with the
exception of code needed for ssh.
proxy_port: Port to have the client connect to. For use with
payload_signing_key: Path to the private key to use to sign payloads.
Raises an update_exception.UpdateException if _UpdateImage returns an error.
if not self.use_delta_updates: src_image_path = ''
key_to_use = payload_signing_key
self.UpdateImage(image_path, src_image_path, stateful_change, proxy_port,
def SetUpdateCache(cls, update_cache):
"""Sets the global update cache for getting paths to devserver payloads."""
cls.update_cache = update_cache
def PrepareRealBase(self, image_path, signed_base):
"""Prepares a remote device for worker test by updating it to the image."""
real_image_path = image_path
if not signed_base:
real_image_path = real_image_path + '.signed'
return real_image_path
def PrepareVMBase(self, image_path, signed_base):
"""Prepares a VM image for worker test."""
# Tells the VM tests to use the Qemu image as the start point.
self.vm_image_path = os.path.join(os.path.dirname(image_path),
if signed_base:
self.vm_image_path = self.vm_image_path + '.signed'
return self.vm_image_path
def GetStatefulChangeFlag(self, stateful_change):
"""Returns the flag to pass to image_to_vm for the stateful change."""
stateful_change_flag = ''
if stateful_change:
stateful_change_flag = '--stateful_update_flag=%s' % stateful_change
return stateful_change_flag
def AppendUpdateFlags(self, cmd, image_path, src_image_path, proxy_port,
payload_signing_key, for_vm=False):
"""Appends common args to an update cmd defined by an array.
Modifies cmd in places by appending appropriate items given args.
See PerformUpdate for description of args.
for_vm: Additional optional argument to say that the payload is intended
for vm usage (so we don't patch the kernel).
if proxy_port: cmd.append('--proxy_port=%s' % proxy_port)
update_id = dev_server_wrapper.GenerateUpdateId(
image_path, src_image_path, payload_signing_key,
cache_path = self.update_cache.get(update_id)
if cache_path:
update_url = dev_server_wrapper.DevServerWrapper.GetDevServerURL(
port=proxy_port, sub_dir=cache_path)
cmd.append('--update_url=%s' % update_url)
raise update_exception.UpdateException(
1, 'No payload found for %s' % update_id)
def RunUpdateCmd(self, cmd, log_directory=None):
"""Runs the given update cmd given verbose options.
Raises an update_exception.UpdateException if the update fails.
cmd: The shell cmd to run.
log_directory: Where to store the logs for this cmd.
kwds = dict(print_cmd=False, combine_stdout_stderr=True, error_code_ok=True)
if not self.verbose:
kwds['redirect_stdout'] = kwds['redirect_stderr'] = True
if log_directory:
kwds['log_stdout_to_file'] = os.path.join(log_directory, 'update.log')
result = cros_build_lib.RunCommand(cmd, **kwds)
if result.returncode != 0:
raise update_exception.UpdateException(result.returncode, 'Update failed')
def AssertEnoughTestsPassed(self, unittest, output, percent_required_to_pass):
"""Helper function that asserts a sufficient number of tests passed.
unittest: the unittest object running this test.
output: stdout from a test run.
percent_required_to_pass: percentage required to pass. This should be
fall between 0-100.
percent that passed.
percent_passed = self.ParseGeneratedTestOutput(output)
self.TestInfo('Percent passed: %d vs. Percent required: %d' % (
percent_passed, percent_required_to_pass))
if percent_passed < percent_required_to_pass:
print output'%d percent of tests are required to pass' %
return percent_passed
def TestInfo(self, message):'%s: %s', self.test_name, message)
def Initialize(self, port):
"""Initializes test specific variables for each test.
Each test needs to specify a unique ssh port.
port: Unique port for ssh access.
# Initialize port vars.
self._ssh_port = port
self._kvm_pid_file = '/tmp/kvm.%d' % port
# Initialize test results directory.
self.test_name = inspect.stack()[1][3]
self.all_results_directory = os.path.join(self.all_results_root,
self.fail_results_directory = os.path.join(self.fail_results_root,
self.results_count = 0
def GetNextResultsPath(self, label):
"""Returns a tuple results directories to use for this label.
Prefixes directory returned for worker with time called i.e. 1_label,
2_label, etc. The directory returned is outside the chroot so if passing
to an script that is called with enther_chroot, make sure to use
path_util.ToChrootPath(). The first dir returned is the one where results
should be stored. The second is one where failed test results should be
stored. Only the former is created as the latter should only be created if
the test fails.
label: The label used to describe this test phase.
Returns a path for the results directory to use for this label.
self.results_count += 1
results_dir = os.path.join(self.all_results_directory, '%s_%s' % (
self.results_count, label))
fail_dir = os.path.join(self.fail_results_directory, '%s_%s' % (
self.results_count, label))
if not os.path.exists(results_dir):
return results_dir, fail_dir
def ParseGeneratedTestOutput(self, output):
"""Returns the percentage of tests that passed based on output.
output: Output string for
The percentage of tests that passed.
percent_passed = 0
lines = output.split('\n')
for line in lines:
if line.startswith('Total PASS:'):
# FORMAT: ^TOTAL PASS: num_passed/num_total (percent%)$
percent_passed = line.split()[3].strip('()%')
return int(percent_passed)