# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing a class that implements an au_worker for GCE instances.
By default GCEAUWorker creates a GCE instance with 'Default Instance Properties'
(detailed below), and runs the gce-smoke suite to verify an image. However it
allows customized test/suite list and instance properties, through an overlay
specific JSON file.
Default Instance Properties:
project: constants.GCE_PROJECT
zone: constants.GCE_DEFAULT_ZONE
machine_type: n1-standard-8
network: constants.GCE_DEFAULT_NETWORK
other properties: GCE default.
https://cloud.google.com/compute/docs/reference/latest/instances/insert
To run tests/suites other than the gce-smoke suite, and to specify instance
properties, add gce_tests.json under <overlay>/scripts. Refer to _LoadTests for
the exact requirements on this file, but here is a short example:
{
"tests": [
{
"name": "suite:suite1",
"flags": {
"metadata": {
"items": [
{
"key": "key1",
"value": "value1"
}
]
}
}
},
{
"name": "foo_Test",
"flags": {}
}
]
}
"flags" must strictly follow the schema of the Instance Resource
(https://cloud.google.com/compute/docs/reference/latest/instances#resource).
GCEAUWorker respects most of these properties, except for instance name,
boot_disk, network, and zone. The enforced values of these special properties
are:
instance_name: managed name
boot_disk: a disk with the image being verified
network: the network that has required firewall set up
zone: project selected default zone
Some of the properties of the Instance Resource are set by the GCE backend,
so trying to set them at the client may result in no-ops or GCE errors, which
will be wrapped into an UpdateException.
Note that some properties like 'disks' that depend on the existence of other
resources are not supported yet.
"""
from __future__ import print_function
import datetime
import json
import os
import shutil
import time
from functools import partial
from multiprocessing import Process
from chromite.lib import cros_build_lib
from chromite.lib import cros_logging as logging
from chromite.lib import gs
from chromite.lib import parallel
from chromite.lib import path_util
from chromite.lib import portage_util
from crostestutils.au_test_harness import au_worker
from crostestutils.au_test_harness import constants
from crostestutils.au_test_harness import update_exception
from crostestutils.lib import gce
class GCEAUWorker(au_worker.AUWorker):
"""Test harness for updating GCE instances.
Attributes:
    gce_context: A utility for GCE operations.
    gscontext: A utility for GCS operations.
gcs_bucket: The GCS bucket to upload image tarballs to.
tarball_local: Local path to the tarball of test image.
tarball_remote: GCS path to the tarball of test image.
image: A single GCE image associated with a worker.
image_link: The URL to the image created.
instances: GCE VM instances associated with a worker.
bg_delete_processes:
Background processes that delete stale instances and images.
"""
GS_PATH_COMMON_PREFIX = 'gs://'
GS_URL_COMMON_PREFIX = 'https://storage.googleapis.com/'
IMAGE_PREFIX = 'test-image-'
INSTANCE_PREFIX = 'test-instance-'
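  # Image and instance names are formed by appending a timestamp to these
  # prefixes, e.g., test-image-2015-06-01-12-00-00 (timestamp illustrative);
  # see _CreateImage and _CreateInstances.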
def __init__(self, options, test_results_root,
project=constants.GCE_PROJECT,
zone=constants.GCE_DEFAULT_ZONE,
network=constants.GCE_DEFAULT_NETWORK,
machine_type=constants.GCE_DEFAULT_MACHINE_TYPE,
json_key_file=constants.GCE_JSON_KEY,
gcs_bucket=constants.GCS_BUCKET):
"""Processes GCE-specific options."""
super(GCEAUWorker, self).__init__(options, test_results_root)
self.gce_context = gce.GceContext.ForServiceAccountThreadSafe(
project, zone, network, machine_type, json_key_file=json_key_file)
self.gscontext = gs.GSContext()
self.gcs_bucket = gcs_bucket
self.tarball_local = None
self.tarball_remote = None
self.image = None
self.image_link = None
# One instance per test.
self.instances = {}
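    # E.g., {'suite:gce-smoke': 'test-instance-<timestamp>'}; populated by
    # _CreateInstances.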
# Background processes that delete throw-away instances.
self.bg_delete_processes = []
# Load test specifications from <overlay>/scripts/gce_tests.json, if any.
self._LoadTests()
def CleanUp(self):
"""Deletes throw-away instances and images."""
logging.info('Waiting for GCP resources to be deleted.')
self._WaitForBackgroundDeleteProcesses()
self._DeleteExistingResources()
logging.info('All resources are deleted.')
def PrepareBase(self, image_path, signed_base=False):
"""Auto-update to base image to prepare for test."""
return self.PrepareRealBase(image_path, signed_base)
def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None, payload_signing_key=None):
"""Updates the image on all GCE instances.
There may be multiple instances created with different gcloud flags that
will be used by different tests or suites.
Unlike vm_au_worker or real_au_worker, UpdateImage always creates a new
image and a new instance.
"""
# Delete existing resources in the background if any.
bg_delete = Process(target=self._DeleteExistingResources)
bg_delete.start()
self.bg_delete_processes.append(bg_delete)
    # Create an image and instances.
self._CreateImage(image_path)
self._CreateInstances()
def VerifyImage(self, unittest, percent_required_to_pass=100, test=''):
"""Verifies the image by running all the required tests.
Run the test targets as specified in <overlay>/scripts/gce_gce_tests.json or
the default 'gce-smoke' suite if none. Multiple test targets are run in
parallel. Test results are joined and printed after all tests finish. Note
that a dedicated instance has been created for each test target.
Args:
unittest: (unittest.TestCase) The test case to report results back to.
percent_required_to_pass: (int) The required minimum pass rate. Not used.
test: (str) The specific test to run. Not used.
Returns:
True if all tests pass, or False otherwise.
"""
log_directory_base, fail_directory_base = self.GetNextResultsPath(
'autotest_tests')
steps = []
for test in self.tests:
remote = self.gce_context.GetInstanceIP(self.instances[test['name']])
# Prefer partial to lambda because of Python's late binding.
steps.append(partial(self._RunTest, test['name'], remote,
log_directory_base, fail_directory_base))
return_values = parallel.RunParallelSteps(steps, return_values=True)
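    # Each return value is the (test, percent_passed, output) tuple that
    # _RunTest returns.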
passed = True
outputs = {}
for test, percent_passed, output in return_values:
passed &= (percent_passed == 100)
outputs[test] = output
if not passed:
self._HandleFail(log_directory_base, fail_directory_base)
if unittest is not None:
unittest.fail('Not all tests passed')
    for test, output in outputs.iteritems():
      print('\nTest: %s\n' % test)
      print(output)
return passed
# --- PRIVATE HELPER FUNCTIONS ---
def _RunTest(self, test, remote, log_directory_base, fail_directory_base):
"""Runs a test or a suite of tests on a given remote.
Runs a test target, whether an individual test or a suite of tests, with
'test_that'.
Args:
test: (str) The test or suite to run.
remote: (str) The hostname of the remote DUT.
      log_directory_base:
        (str) The base directory to store test logs. A subdirectory specific
        to this test will be created there.
fail_directory_base:
(str) The base directory to store test logs in case of a test failure.
Returns:
test:
(str) Same as |test|. This is useful when the caller wants to
correlate results to the test name.
percent_passed: (int) Pass rate.
      output: (str) Original test output.
"""
log_directory, _ = self._GetResultsDirectoryForTest(
test, log_directory_base, fail_directory_base)
log_directory_in_chroot = log_directory.rpartition('chroot')[2]
cmd = ['test_that', '-b', self.board, '--no-quickmerge',
'--results_dir=%s' % log_directory_in_chroot, remote, test]
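    # With illustrative values filled in, the assembled command resembles:
    #   test_that -b <board> --no-quickmerge \
    #       --results_dir=<log dir inside chroot> <instance ip> suite:gce-smoke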
if self.ssh_private_key is not None:
cmd.append('--ssh_private_key=%s' %
path_util.ToChrootPath(self.ssh_private_key))
result = cros_build_lib.RunCommand(cmd, error_code_ok=True,
enter_chroot=True,
redirect_stdout=True,
cwd=constants.CROSUTILS_DIR)
percent_passed = self.ParseGeneratedTestOutput(result.output)
return test, percent_passed, result.output
def _GetResultsDirectoryForTest(self, test, log_directory_base,
fail_directory_base):
"""Gets the log and fail directories for a particular test.
Args:
test: (str) The test or suite to get directories for.
log_directory_base:
(str) The base directory where all test results are saved.
fail_directory_base:
(str) The base directory where all test failures are recorded.
"""
log_directory = os.path.join(log_directory_base, test)
fail_directory = os.path.join(fail_directory_base, test)
if not os.path.exists(log_directory):
os.makedirs(log_directory)
return log_directory, fail_directory
def _LoadTests(self):
"""Loads the tests to run from <overlay>/scripts/gce_tests.json.
    If the JSON file exists, loads the tests from it, along with the flags used
    to create an instance for each test. The JSON file should contain a "tests"
    object, which is an array of objects, each of which has only two keys:
    "name" and "flags".

    "name" can be any valid Autotest test name, or a suite name in the form of
    "suite:<suite_name>", e.g., "suite:gce-smoke".
"flags" is a JSON object whose members must be valid proterties of the GCE
Instance Resource, as specificed at:
https://cloud.google.com/compute/docs/reference/latest/instances#resource.
These flags will be used to create instances. Each flag must strictly follow
the property schema as defined in the Instance Resource. Failure to do so
will result in instance creation failures.
Note that a dedicated instance will be created for every test object
specified in scripts/gce_tests.json. So group test cases that require
similar instance properties together as suites whenever possible.
An example scripts/gce_tests.json may look like:
{
"tests": [
{
"name": "suite:gce-smoke",
"flags": []
},
{
"name": "suite:cloud-init",
"flags": {
"description": "Test instance",
"metadata": {
"items": [
{
"key": "fake_key",
"value": "fake_value"
}
]
}
}
}
]
}
If the JSON file does not exist, the 'gce-smoke' suite will be used to
verify the image.
"""
    # Default to running the gce-smoke suite if no custom tests are given.
tests = [dict(name="suite:gce-smoke", flags=dict())]
custom_tests = None
try:
custom_tests = portage_util.ReadOverlayFile(
'scripts/gce_tests.json', board=self.board)
except portage_util.MissingOverlayException as e:
logging.warn('Board overlay not found. Error: %r', e)
if custom_tests is not None:
if self.board not in constants.TRUSTED_BOARDS:
logging.warn('Custom tests and flags are not allowed for this board '
'(%s)!', self.board)
else:
# Read the list of tests.
try:
json_file = json.loads(custom_tests)
tests = json_file.get('tests')
except ValueError as e:
logging.warn('scripts/gce_tests.json contains invalid JSON content. '
'Default tests will be run and default flags will be '
'used to create instances. Error: %r', e)
self.tests = tests
def _CreateImage(self, image_path):
"""Uploads the gce tarball and creates an image with it."""
log_directory, fail_directory = self.GetNextResultsPath('update')
ts = datetime.datetime.fromtimestamp(time.time()).strftime(
'%Y-%m-%d-%H-%M-%S')
# Upload the GCE tarball to Google Cloud Storage.
self.tarball_local = image_path
gs_directory = ('gs://%s/%s' % (self.gcs_bucket, ts))
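    # E.g., gs://<gcs_bucket>/2015-06-01-12-00-00 (timestamp illustrative).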
try:
self.gscontext.CopyInto(self.tarball_local, gs_directory)
self.tarball_remote = '%s/%s' % (gs_directory,
os.path.basename(self.tarball_local))
except Exception as e:
raise update_exception.UpdateException(
1, 'Update failed. Unable to upload test image GCE tarball to GCS. '
'Error: %s' % e)
    # Create an image from the tarball uploaded to GCS.
image = '%s%s' % (self.IMAGE_PREFIX, ts)
try:
self.image_link = self.gce_context.CreateImage(
image, self._GsPathToUrl(self.tarball_remote))
self.image = image
except gce.Error as e:
self._HandleFail(log_directory, fail_directory)
raise update_exception.UpdateException(1, 'Update failed. Error: %r' % e)
def _CreateInstances(self):
"""Creates instances with custom flags as specificed in |self.tests|."""
steps = []
for test in self.tests:
ts = datetime.datetime.fromtimestamp(time.time()).strftime(
'%Y-%m-%d-%H-%M-%S')
instance = '%s%s' % (self.INSTANCE_PREFIX, ts)
kwargs = test['flags'].copy()
kwargs['description'] = 'For test %s' % test['name']
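      # Note: this overwrites any user-supplied 'description' flag; e.g., for
      # the 'suite:cloud-init' example in _LoadTests' docstring, kwargs becomes
      # {'description': 'For test suite:cloud-init', 'metadata': {...}}.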
steps.append(partial(self.gce_context.CreateInstance, instance,
self.image_link, **kwargs))
self.instances[test['name']] = instance
parallel.RunParallelSteps(steps)
  def _DeleteExistingResource(self, resource, existence_checker, deletor):
    """Deletes a resource if it exists.

    This method checks the existence of a resource using |existence_checker|,
    and deletes it if the check returns true.

Args:
resource: (str) The resource name/url to delete.
existence_checker:
(callable) The callable to check existence. This callable should take
|resource| as its first argument.
deletor:
(callable) The callable to perform the deletion. This callable should
take |resource| as its first argument.
Raises:
ValueError if existence_checker or deletor is not callable.
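
    Example (mirroring the call in _DeleteExistingResources):
      self._DeleteExistingResource(
          instance_name,
          existence_checker=self.gce_context.InstanceExists,
          deletor=self.gce_context.DeleteInstance)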
"""
    if not callable(existence_checker):
      raise ValueError('existence_checker must be a callable')
    if not callable(deletor):
      raise ValueError('deletor must be a callable')
    if existence_checker(resource):
      deletor(resource)
def _DeleteExistingResources(self):
"""Delete instances, image and the tarball on GCS if they exist."""
steps = []
if self.tarball_remote:
steps.append(partial(self.gscontext.DoCommand,
['rm', self.tarball_remote]))
if self.image:
steps.append(partial(self.gce_context.DeleteImage, self.image))
for instance in self.instances.values():
      steps.append(partial(
          self._DeleteExistingResource,
          resource=instance,
          existence_checker=self.gce_context.InstanceExists,
          deletor=self.gce_context.DeleteInstance))
# Delete all resources in parallel.
try:
parallel.RunParallelSteps(steps)
except Exception as e:
      logging.warn('Infrastructure failure. Error: %r', e)
# Reset variables.
self.tarball_remote = None
self.image = None
self.image_link = None
self.instances = {}
def _HandleFail(self, log_directory, fail_directory):
"""Handles test failures.
    In case of a test failure, copies the necessary files, i.e., the GCE
    tarball and the SSH private key, to |fail_directory|, which will later be
    archived and uploaded to a GCS bucket by chromite.

Args:
log_directory: The root directory where test logs are stored.
fail_directory: The directory to copy files to.
"""
parent_dir = os.path.dirname(fail_directory)
if not os.path.isdir(parent_dir):
os.makedirs(parent_dir)
    # Copy logs. Must be done before copying the tarball, as copytree creates
    # |fail_directory|.
try:
shutil.copytree(log_directory, fail_directory)
except shutil.Error as e:
logging.warning('Ignoring errors while copying logs: %s', e)
# Copy GCE tarball and ssh private key for debugging.
try:
shutil.copy(self.tarball_local, fail_directory)
if self.ssh_private_key is not None:
shutil.copy(self.ssh_private_key, fail_directory)
except shutil.Error as e:
logging.warning('Ignoring errors while copying GCE tarball: %s', e)
self._DeleteExistingResources()
def _GsPathToUrl(self, gs_path):
"""Converts a gs:// path to a URL.
A formal URL is needed when creating an image from a GCS object.
Args:
gs_path: A GS path, e.g., gs://foo-bucket/bar.tar.gz.
Returns:
A GCS URL to the same object.
Raises:
ValueError if |gs_path| is not a valid GS path.
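
    Example:
      _GsPathToUrl('gs://foo-bucket/bar.tar.gz') returns
      'https://storage.googleapis.com/foo-bucket/bar.tar.gz'.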
"""
if not gs_path.startswith(self.GS_PATH_COMMON_PREFIX):
raise ValueError('Invalid GCS path: %s' % gs_path)
return gs_path.replace(self.GS_PATH_COMMON_PREFIX,
self.GS_URL_COMMON_PREFIX, 1)
def _WaitForBackgroundDeleteProcesses(self):
"""Waits for all background proecesses to finish."""
for p in self.bg_delete_processes:
p.join()
self.bg_delete_processes = []