# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing a class that implements an au_worker for GCE instances.
By default GCEAUWorker creates a GCE instance with 'Default Instance Properties'
(detailed below), and runs the gce-smoke suite to verify an image. However it
allows customized test/suite list and instance properties, through an overlay
specific JSON file.
Default Instance Properties:
project: constants.GCE_PROJECT
zone: constants.GCE_DEFAULT_ZONE
machine_type: n1-standard-8
network: constants.GCE_DEFAULT_NETWORK
other properties: GCE default.
https://cloud.google.com/compute/docs/reference/latest/instances/insert
To run tests/suites other than the gce-smoke suite, and to specify the instance
properties, add gce_tests.json under <overlay>/scripts. Refer to _LoadTests for
the exact requirements of this file, but here is a short example:
{
"tests": [
{
"name": "suite:suite1",
"flags": {
"metadata": {
"items": [
{
"key": "key1",
"value": "value1"
}
]
}
}
},
{
"name": "foo_Test",
"flags": {}
}
]
}
"flags" must strictly follow the schema of the Instance Resource
(https://cloud.google.com/compute/docs/reference/latest/instances#resource).
GCEAUWorker respects most of the properties except instance name, boot_disk,
network and zone. The enforced values of these special properties are:
instance_name: managed name
boot_disk: a disk with the image being verified
network: the network that has required firewall set up
zone: project selected default zone
Some of the properties of the Instance Resource are set by the GCE backend,
so trying to set them at the client may result in no-ops or GCE errors, which
will be wrapped into an UpdateException.
Note that some properties like 'disks' that depend on the existence of other
resources are not supported yet.
"""
from __future__ import print_function
import datetime
import json
import os
import shutil
import tempfile
import time
from functools import partial
from multiprocessing import Process
from chromite.lib import cros_build_lib
from chromite.lib import cros_logging as logging
from chromite.lib import gce
from chromite.lib import gs
from chromite.lib import parallel
from chromite.lib import path_util
from chromite.lib import portage_util
from crostestutils.au_test_harness import au_worker
from crostestutils.au_test_harness import constants
class GCEAUWorker(au_worker.AUWorker):
"""Test harness for updating GCE instances.
Attributes:
gce_context: A utility for GCE operations.
gscontext: A utility for GCS operations.
network: Default network to create instances in.
machine_type: Default machine type to create instances with.
gcs_bucket: The GCS bucket to upload image tarballs to.
tarball_local: Local path to the tarball of test image.
tarball_remote: GCS path to the tarball of test image.
image: A single GCE image associated with a worker.
image_link: The URL to the image created.
instances: GCE VM instances associated with a worker.
"""
_GS_PATH_COMMON_PREFIX = 'gs://'
_GS_URL_COMMON_PREFIX = 'https://storage.googleapis.com/'
_IMAGE_PREFIX = 'test-'
_INSTANCE_PREFIX = 'test-'
_TEST_REPORT_FILENAME = 'test_report.log'
def __init__(self, options, test_results_root,
project=constants.GCE_PROJECT,
zone=constants.GCE_DEFAULT_ZONE,
network=constants.GCE_DEFAULT_NETWORK,
machine_type=constants.GCE_DEFAULT_MACHINE_TYPE,
json_key_file=constants.GCE_JSON_KEY,
gcs_bucket=constants.GCS_BUCKET):
"""Processes GCE-specific options."""
super(GCEAUWorker, self).__init__(options, test_results_root)
self.gce_context = gce.GceContext.ForServiceAccountThreadSafe(
project, zone, json_key_file=json_key_file)
self.json_key_file = json_key_file
self.gscontext = gs.GSContext()
self.network = network
self.machine_type = machine_type
self.gcs_bucket = gcs_bucket
self.tarball_local = None
self.tarball_remote = None
self.image = None
self.image_link = None
# One instance per test.
self.instances = {}
# Background processes that delete throw-away instances.
self._bg_delete_processes = []
# Load test specifications from <overlay>/scripts/gce_tests.json, if any.
self._LoadTests()
def CleanUp(self):
"""Deletes throw-away instances and images."""
logging.info('Waiting for GCP resources to be deleted.')
self._WaitForBackgroundDeleteProcesses()
self._DeleteExistingResources()
logging.info('All resources are deleted.')
def PrepareBase(self, image_path, signed_base=False):
"""Auto-update to base image to prepare for test."""
return self.PrepareRealBase(image_path, signed_base)
def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None, payload_signing_key=None):
"""Updates the image on all GCE instances.
There may be multiple instances created with different gcloud flags that
will be used by different tests or suites.
Unlike vm_au_worker or real_au_worker, UpdateImage always creates a new
image and a new instance.
"""
# Delete existing resources in the background if any.
bg_delete = Process(target=self._DeleteExistingResources)
bg_delete.start()
self._bg_delete_processes.append(bg_delete)
log_directory, fail_directory = self.GetNextResultsPath('update')
# Create an image and instances.
try:
self._CreateImage(image_path)
self._CreateInstances()
except:
self._HandleFail(log_directory, fail_directory)
raise
def VerifyImage(self, unittest, percent_required_to_pass=100, test=''):
"""Verifies the image by running all the required tests.
Runs the test targets as specified in <overlay>/scripts/gce_tests.json, or
the default 'gce-smoke' suite if none. Multiple test targets are run in
parallel. Test results are joined and printed after all tests finish. Note
that a dedicated instance has been created for each test target.
Args:
unittest: (unittest.TestCase) The test case to report results back to.
percent_required_to_pass: (int) The required minimum pass rate. Not used.
test: (str) The specific test to run. Not used.
Returns:
True if all tests pass, or False otherwise.
"""
log_directory_base, fail_directory_base = self.GetNextResultsPath(
'autotest_tests')
steps = []
for test in self.tests:
remote = self.gce_context.GetInstanceIP(self.instances[test['name']])
# Prefer partial to lambda because of Python's late binding.
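# (A lambda would capture the loop variables |test| and |remote| by
# reference, so every step would run the last test; partial binds the
# current values immediately.)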
steps.append(partial(self._RunTest, test['name'], remote,
log_directory_base, fail_directory_base))
try:
return_values = parallel.RunParallelSteps(steps, return_values=True)
except:
self._HandleFail(log_directory_base, fail_directory_base)
raise
passed = True
test_reports = {}
for test, percent_passed, report in return_values:
passed &= (percent_passed == 100)
test_reports[test] = report
if not passed:
self._HandleFail(log_directory_base, fail_directory_base)
print('\nSome test(s) failed. Test reports:')
for test, report in test_reports.iteritems():
print('\nTest: %s\n%s' % (test, report or ''))
if unittest is not None:
unittest.fail('Not all tests passed.')
return passed
# --- PRIVATE HELPER FUNCTIONS ---
def _RunTest(self, test, remote, log_directory_base, fail_directory_base):
"""Runs a test or a suite of tests on a given remote.
Runs a test target, whether an individual test or a suite of tests, with
'test_that'.
Args:
test: The test or suite to run.
remote: The hostname of the remote DUT.
log_directory_base: The base directory to store test logs. A subdirectory
specific to this test will be created there.
fail_directory_base: The base directory to store test logs in case of a
test failure.
Returns:
test: Same as |test|. This is useful when the caller wants to correlate
results to the test name.
percent_passed: Pass rate.
test_report: Content of the test report generated by test_that.
"""
log_directory, _ = self._GetResultsDirectoryForTest(
test, log_directory_base, fail_directory_base)
log_directory_in_chroot = log_directory.rpartition('chroot')[2]
# Copy the GCE key file to a temporary file inside the chroot, and make
# sure it is removed before returning.
with tempfile.NamedTemporaryFile(
dir=path_util.FromChrootPath('/tmp')) as gce_key_copy:
shutil.copy(self.json_key_file, gce_key_copy.name)
args = 'gce_project=%s gce_zone=%s gce_instance=%s gce_key_file=%s' % (
self.gce_context.project, self.gce_context.zone, self.instances[test],
path_util.ToChrootPath(gce_key_copy.name))
cmd = ['test_that', '-b', self.board, '--no-quickmerge',
'--results_dir=%s' % log_directory_in_chroot, remote, test,
'--args=%s' % args]
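# The assembled command looks roughly like this (values are illustrative):
#   test_that -b <board> --no-quickmerge --results_dir=<results_dir> \
#       <remote_ip> suite:gce-smoke --args='gce_project=... gce_zone=...'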
if self.ssh_private_key is not None:
cmd.append('--ssh_private_key=%s' %
path_util.ToChrootPath(self.ssh_private_key))
result = cros_build_lib.RunCommand(cmd, error_code_ok=True,
enter_chroot=True,
redirect_stdout=True,
cwd=constants.CROSUTILS_DIR)
percent_passed = self.ParseGeneratedTestOutput(result.output)
test_report = self._GetTestReport(log_directory)
# Return the summarized test_report, as it is more useful than the full
# output; the entire log will always be linked in the failure report.
return test, percent_passed, test_report
def _GetTestReport(self, results_path):
"""Returns the content of test_report.log created by test_that.
Args:
results_path: Path to the directory where results are saved.
Returns:
Content of test_report.log, or None if report is not found.
"""
report_path = os.path.join(results_path, self._TEST_REPORT_FILENAME)
if os.path.isfile(report_path):
with open(report_path) as f:
return f.read()
logging.warning('Test report not found in %s', results_path)
return None
def _GetResultsDirectoryForTest(self, test, log_directory_base,
fail_directory_base):
"""Gets the log and fail directories for a particular test.
Args:
test: The test or suite to get directories for.
log_directory_base: The base directory where all test results are saved.
fail_directory_base: The base directory where all test failures are
recorded.
"""
# Avoid using colons in file names. Not that they are not allowed, but they
# cause confusion and inconvenience, as the colon is used as a separator in
# many places, e.g., in $PATH and in URLs.
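# E.g., 'suite:gce-smoke' becomes 'suite_gce-smoke'.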
sanitized_test_name = test.replace(':', '_')
log_directory = os.path.join(log_directory_base, sanitized_test_name)
fail_directory = os.path.join(fail_directory_base, sanitized_test_name)
if not os.path.exists(log_directory):
os.makedirs(log_directory)
return log_directory, fail_directory
def _LoadTests(self):
"""Loads the tests to run from <overlay>/scripts/gce_tests.json.
If the JSON file exists, loads the tests, and the flags with which to create
an instance for each test. The JSON file should contain a "tests" object,
which is an array of objects, each of which has only two keys: "name" and
"flags".
"name" can be any valid Autotest test name, or a suite name, in the form
of "suite:<suite_name>", e.g., "suite:gce-smoke".
"flags" is a JSON object whose members must be valid proterties of the GCE
Instance Resource, as specificed at:
https://cloud.google.com/compute/docs/reference/latest/instances#resource.
These flags will be used to create instances. Each flag must strictly follow
the property schema as defined in the Instance Resource. Failure to do so
will result in instance creation failures.
Note that a dedicated instance will be created for every test object
specified in scripts/gce_tests.json. So group test cases that require
similar instance properties together as suites whenever possible.
An example scripts/gce_tests.json may look like:
{
"tests": [
{
"name": "suite:gce-smoke",
"flags": []
},
{
"name": "suite:cloud-init",
"flags": {
"description": "Test instance",
"metadata": {
"items": [
{
"key": "fake_key",
"value": "fake_value"
}
]
}
}
}
]
}
If the JSON file does not exist, the 'gce-smoke' suite will be used to
verify the image.
"""
# Default to running the gce-smoke suite if no custom tests are given.
tests = [dict(name="suite:gce-smoke", flags=dict())]
custom_tests = None
try:
custom_tests = portage_util.ReadOverlayFile(
'scripts/gce_tests.json', board=self.board)
except portage_util.MissingOverlayException as e:
logging.warning('Board overlay not found. Error: %s', e)
if custom_tests is not None:
if self.board not in constants.TRUSTED_BOARDS:
logging.warning('Custom tests and flags are not allowed for this board '
'(%s)!', self.board)
else:
# Read the list of tests.
try:
json_file = json.loads(custom_tests)
tests = json_file.get('tests')
except ValueError as e:
logging.warning('scripts/gce_tests.json contains invalid JSON '
'content. Default tests will be run and default '
'flags will be used to create instances. Error: %s',
e)
self.tests = tests
def _CreateImage(self, image_path):
"""Uploads the gce tarball and creates an image with it."""
ts = datetime.datetime.fromtimestamp(time.time()).strftime(
'%Y-%m-%d-%H-%M-%S')
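# |ts| looks like '2015-10-21-16-29-00'; it timestamps the image name and
# the GCS object path so they do not collide across runs.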
# Upload the GCE tarball to Google Cloud Storage.
self.tarball_local = image_path
gs_directory = ('gs://%s/%s' % (self.gcs_bucket, ts))
self.tarball_remote = '%s/%s' % (gs_directory,
os.path.basename(self.tarball_local))
self.gscontext.CopyInto(self.tarball_local, gs_directory)
# Create an image from |image_path|.
self.image = self._IMAGE_PREFIX + ts
self.image_link = self.gce_context.CreateImage(
self.image, self._GsPathToUrl(self.tarball_remote))
def _CreateInstance(self, name, image, **kwargs):
"""Creates a single VM instance with a static IP address."""
address = self.gce_context.CreateAddress(name)
return self.gce_context.CreateInstance(name, image, static_address=address,
**kwargs)
def _CreateInstances(self):
"""Creates instances with custom flags as specificed in |self.tests|."""
steps = []
for i, test in enumerate(self.tests):
ts = datetime.datetime.fromtimestamp(time.time()).strftime(
'%Y-%m-%d-%H-%M-%S')
instance = '%s%s-%d' % (self._INSTANCE_PREFIX, ts, i)
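# The resulting name looks like 'test-2015-10-21-16-29-00-0'. The test's
# custom flags are forwarded to CreateInstance as keyword arguments, e.g.,
# a 'metadata' flag is passed through as metadata={...}.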
kwargs = test['flags'].copy()
kwargs['description'] = 'For test %s' % test['name']
steps.append(partial(self._CreateInstance, instance,
self.image_link, network=self.network,
machine_type=self.machine_type, **kwargs))
self.instances[test['name']] = instance
parallel.RunParallelSteps(steps)
def _DeleteExistingResources(self):
"""Deletes all allocated GCP resources."""
# There are cases where resources are created at the backend but the
# resource creation calls fail, for example due to network errors that
# happen when the response is being delivered. So we always make sure to
# delete all allocated resources (images, instances, addresses) regardless
# of whether the corresponding Create operation succeeded.
# Delete the GCE instances.
steps = [partial(self.gce_context.DeleteInstance, instance) for instance in
self.instances.values()]
def _RunParallelIgnoreErrors(funcs):
try:
parallel.RunParallelSteps(funcs)
except parallel.BackgroundFailure as e:
# We don't want to halt the test stage (and thus block commits) for
# cleanup errors. Leaked resources will be cleaned up externally.
logging.warning(
'Ignoring BackgroundFailure while deleting resources: %s', e)
_RunParallelIgnoreErrors(steps)
# Delete static IP addresses.
steps = [partial(self.gce_context.DeleteAddress, instance) for instance in
self.instances.values()]
_RunParallelIgnoreErrors(steps)
self.instances = {}
# Delete the GCE image.
# Have to delete the image after all instances are deleted because if the
# image is being used to create an instance (e.g., current process is asked
# to terminate during instance creation), it cannot be deleted until the
# instance creation ends.
if self.image:
self.gce_context.DeleteImage(self.image)
self.image = self.image_link = None
# Delete the tarball uploaded to GCS.
# For the same reason, it's safer to delete the tarball after the image is
# deleted.
if self.tarball_remote:
self.gscontext.DoCommand(['rm', self.tarball_remote])
self.tarball_remote = None
def _HandleFail(self, log_directory, fail_directory):
"""Handles test failures.
In case of a test failure, copy necessary files, i.e., the GCE tarball and
ssh private key, to |fail_directory|, which will be later archived and
uploaded to a GCS bucket by chromite.
Args:
log_directory: The root directory where test logs are stored.
fail_directory: The directory to copy files to.
"""
parent_dir = os.path.dirname(fail_directory)
if not os.path.isdir(parent_dir):
os.makedirs(parent_dir)
try:
# Copy logs. Must be done before copying the tarball, as copytree creates
# |fail_directory|.
shutil.copytree(log_directory, fail_directory)
# Copy GCE tarball and ssh private key for debugging.
shutil.copy(self.tarball_local, fail_directory)
if self.ssh_private_key is not None:
shutil.copy(self.ssh_private_key, fail_directory)
except (shutil.Error, OSError, IOError) as e:
logging.warning('Ignoring error while copying logs: %s', e)
def _GsPathToUrl(self, gs_path):
"""Converts a gs:// path to a URL.
A formal URL is needed when creating an image from a GCS object.
Args:
gs_path: A GS path, e.g., gs://foo-bucket/bar.tar.gz.
Returns:
A GCS URL to the same object.
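For example, gs://foo-bucket/bar.tar.gz is converted to
https://storage.googleapis.com/foo-bucket/bar.tar.gz.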
Raises:
ValueError if |gs_path| is not a valid GS path.
"""
if not gs_path.startswith(self._GS_PATH_COMMON_PREFIX):
raise ValueError('Invalid GCS path: %s' % gs_path)
return gs_path.replace(self._GS_PATH_COMMON_PREFIX,
self._GS_URL_COMMON_PREFIX, 1)
def _WaitForBackgroundDeleteProcesses(self):
"""Waits for all background proecesses to finish."""
for p in self._bg_delete_processes:
p.join()
self._bg_delete_processes = []