| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Module containing a class that implements an au_worker for GCE instances. |
| |
| By default GCEAUWorker creates a GCE instance with 'Default Instance Properties' |
| (detailed below), and runs the gce-smoke suite to verify an image. However it |
| allows customized test/suite list and instance properties, through an overlay |
| specific JSON file. |
| |
| Default Instance Properties: |
| project: constants.GCE_PROJECT |
| zone: constants.GCE_DEFAULT_ZONE |
| machine_type: n1-standard-8 |
| network: constants.GCE_DEFAULT_NETWORK |
| other properties: GCE default. |
| https://cloud.google.com/compute/docs/reference/latest/instances/insert |
| |
| To run tests/suites other than the gce-smoke suite, and to specify the instance |
| properties, add gce_tests.json under <overlay>/scripts. Refer to _LoadTests for |
| the exact requirement of this file, but here is a short example: |
| { |
| "tests": [ |
| { |
| "name": "suite:suite1", |
| "flags": { |
| "metadata": { |
| "items": [ |
| { |
| "key": "key1", |
| "value": "value1" |
| } |
| ] |
| } |
| } |
| }, |
| { |
| "name": "foo_Test", |
| "flags": {} |
| } |
| ] |
| } |
| |
| "flags" must strictly follow the schema of the Instance Resource |
| (https://cloud.google.com/compute/docs/reference/latest/instances#resource). |
| |
| GCEAUWorker respects most of the properties except instance name, boot_disk, |
| network and zone. The enforced values of these special properties are: |
| instance_name: managed name |
| boot_disk: a disk with the image being verified |
| network: the network that has required firewall set up |
| zone: project selected default zone |
| |
| Some of the properties of the Instance Resource are set by the GCE |
| backend so trying to set them at the client may result in noops or GCE errors, |
| which will be wrapped into an UpdateException. |
| |
| Note that some properties like 'disks' that depend on the existence of other |
| resources are not supported yet. |
| """ |
| |
| from __future__ import print_function |
| |
| import datetime |
| import json |
| import os |
| import shutil |
| import tempfile |
| import time |
| |
| from functools import partial |
| from multiprocessing import Process |
| |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_logging as logging |
| from chromite.lib import gce |
| from chromite.lib import gs |
| from chromite.lib import parallel |
| from chromite.lib import path_util |
| from chromite.lib import portage_util |
| from crostestutils.au_test_harness import au_worker |
| from crostestutils.au_test_harness import constants |
| |
| |
class GCEAUWorker(au_worker.AUWorker):
  """Test harness for updating GCE instances.

  Attributes:
    gce_context: A utility for GCE operations.
    gscontext: A utility for GCS operations.
    network: Default network to create instances in.
    machine_type: Default machine type to create instances with.
    gcs_bucket: The GCS bucket to upload image tarballs to.
    tarball_local: Local path to the tarball of test image.
    tarball_remote: GCS path to the tarball of test image.
    image: A single GCE image associated with a worker.
    image_link: The URL to the image created.
    instances: GCE VM instances associated with a worker, keyed by test name.
  """
  _GS_PATH_COMMON_PREFIX = 'gs://'
  _GS_URL_COMMON_PREFIX = 'https://storage.googleapis.com/'
  _IMAGE_PREFIX = 'test-'
  _INSTANCE_PREFIX = 'test-'
  _TEST_REPORT_FILENAME = 'test_report.log'

  def __init__(self, options, test_results_root,
               project=constants.GCE_PROJECT,
               zone=constants.GCE_DEFAULT_ZONE,
               network=constants.GCE_DEFAULT_NETWORK,
               machine_type=constants.GCE_DEFAULT_MACHINE_TYPE,
               json_key_file=constants.GCE_JSON_KEY,
               gcs_bucket=constants.GCS_BUCKET):
    """Processes GCE-specific options.

    Args:
      options: Parsed command line options (see au_worker.AUWorker).
      test_results_root: Root directory to store test results in.
      project: GCP project to create images and instances in.
      zone: Default zone to create instances in.
      network: Default network to create instances in.
      machine_type: Default machine type to create instances with.
      json_key_file: Path to the service account JSON key file.
      gcs_bucket: GCS bucket to upload image tarballs to.
    """
    super(GCEAUWorker, self).__init__(options, test_results_root)
    self.gce_context = gce.GceContext.ForServiceAccountThreadSafe(
        project, zone, json_key_file=json_key_file)
    self.json_key_file = json_key_file
    self.gscontext = gs.GSContext()
    self.network = network
    self.machine_type = machine_type
    self.gcs_bucket = gcs_bucket
    self.tarball_local = None
    self.tarball_remote = None
    self.image = None
    self.image_link = None
    # One instance per test, keyed by test name.
    self.instances = {}

    # Background processes that delete throw-away instances.
    self._bg_delete_processes = []

    # Load test specifications from <overlay>/scripts/gce_tests.json, if any.
    self._LoadTests()

  def CleanUp(self):
    """Deletes throw-away instances and images."""
    logging.info('Waiting for GCP resources to be deleted.')
    self._WaitForBackgroundDeleteProcesses()
    self._DeleteExistingResources()
    logging.info('All resources are deleted.')

  def PrepareBase(self, image_path, signed_base=False):
    """Auto-update to base image to prepare for test.

    Args:
      image_path: Path to the base image.
      signed_base: Whether the base image is signed.

    Returns:
      Whatever PrepareRealBase returns.
    """
    return self.PrepareRealBase(image_path, signed_base)

  def UpdateImage(self, image_path, src_image_path='', stateful_change='old',
                  proxy_port=None, payload_signing_key=None):
    """Updates the image on all GCE instances.

    There may be multiple instances created with different gcloud flags that
    will be used by different tests or suites.

    Unlike vm_au_worker or real_au_worker, UpdateImage always creates a new
    image and a new instance.

    Args:
      image_path: Path to the image tarball to test.
      src_image_path: Unused; kept for interface compatibility.
      stateful_change: Unused; kept for interface compatibility.
      proxy_port: Unused; kept for interface compatibility.
      payload_signing_key: Unused; kept for interface compatibility.
    """
    # Delete existing resources in the background if any.
    bg_delete = Process(target=self._DeleteExistingResources)
    bg_delete.start()
    self._bg_delete_processes.append(bg_delete)

    log_directory, fail_directory = self.GetNextResultsPath('update')
    # Create an image and instances. On any failure, collect debugging
    # artifacts before re-raising so the build output links to them.
    try:
      self._CreateImage(image_path)
      self._CreateInstances()
    except:
      self._HandleFail(log_directory, fail_directory)
      raise

  def VerifyImage(self, unittest, percent_required_to_pass=100, test=''):
    """Verifies the image by running all the required tests.

    Run the test targets as specified in <overlay>/scripts/gce_tests.json or
    the default 'gce-smoke' suite if none. Multiple test targets are run in
    parallel. Test results are joined and printed after all tests finish. Note
    that a dedicated instance has been created for each test target.

    Args:
      unittest: (unittest.TestCase) The test case to report results back to.
      percent_required_to_pass: (int) The required minimum pass rate. Not used.
      test: (str) The specific test to run. Not used.

    Returns:
      True if all tests pass, or False otherwise.
    """
    log_directory_base, fail_directory_base = self.GetNextResultsPath(
        'autotest_tests')
    steps = []
    for test in self.tests:
      remote = self.gce_context.GetInstanceIP(self.instances[test['name']])
      # Prefer partial to lambda because of Python's late binding.
      steps.append(partial(self._RunTest, test['name'], remote,
                           log_directory_base, fail_directory_base))
    try:
      return_values = parallel.RunParallelSteps(steps, return_values=True)
    except:
      self._HandleFail(log_directory_base, fail_directory_base)
      raise

    passed = True
    test_reports = {}
    for test, percent_passed, report in return_values:
      passed &= (percent_passed == 100)
      test_reports[test] = report

    if not passed:
      self._HandleFail(log_directory_base, fail_directory_base)
      print('\nSome test(s) failed. Test reports:')
      # items() instead of iteritems() keeps this Python 3 compatible.
      for test, report in test_reports.items():
        print('\nTest: %s\n%s' % (test, report or ''))
      if unittest is not None:
        unittest.fail('Not all tests passed.')
    return passed

  # --- PRIVATE HELPER FUNCTIONS ---
  def _RunTest(self, test, remote, log_directory_base, fail_directory_base):
    """Runs a test or a suite of tests on a given remote.

    Runs a test target, whether an individual test or a suite of tests, with
    'test_that'.

    Args:
      test: The test or suite to run.
      remote: The hostname of the remote DUT.
      log_directory_base: The base directory to store test logs. A sub directory
        specific to this test will be created there.
      fail_directory_base: The base directory to store test logs in case of a
        test failure.

    Returns:
      test: Same as |test|. This is useful when the caller wants to correlate
        results to the test name.
      percent_passed: Pass rate.
      test_report: Content of the test report generated by test_that.
    """
    log_directory, _ = self._GetResultsDirectoryForTest(
        test, log_directory_base, fail_directory_base)
    log_directory_in_chroot = log_directory.rpartition('chroot')[2]

    # Copy GCE key file in a temporary file inside the chroot and
    # make sure to remove it before return (NamedTemporaryFile deletes the
    # file when the context manager exits).
    with tempfile.NamedTemporaryFile(
        dir=path_util.FromChrootPath('/tmp')) as gce_key_copy:
      shutil.copy(self.json_key_file, gce_key_copy.name)

      args = 'gce_project=%s gce_zone=%s gce_instance=%s gce_key_file=%s' % (
          self.gce_context.project, self.gce_context.zone, self.instances[test],
          path_util.ToChrootPath(gce_key_copy.name))

      cmd = ['test_that', '-b', self.board, '--no-quickmerge',
             '--results_dir=%s' % log_directory_in_chroot, remote, test,
             '--args=%s' % args]
      if self.ssh_private_key is not None:
        cmd.append('--ssh_private_key=%s' %
                   path_util.ToChrootPath(self.ssh_private_key))

      result = cros_build_lib.RunCommand(cmd, error_code_ok=True,
                                         enter_chroot=True,
                                         redirect_stdout=True,
                                         cwd=constants.CROSUTILS_DIR)
      percent_passed = self.ParseGeneratedTestOutput(result.output)
      test_report = self._GetTestReport(log_directory)

      # Returns the summarized test_report as it is more useful than the full
      # output, plus the entire log will always be linked in the failure report.
      return test, percent_passed, test_report

  def _GetTestReport(self, results_path):
    """Returns the content of test_report.log created by test_that.

    Args:
      results_path: Path to the directory where results are saved.

    Returns:
      Content of test_report.log, or None if report is not found.
    """
    report_path = os.path.join(results_path, self._TEST_REPORT_FILENAME)
    if os.path.isfile(report_path):
      with open(report_path) as f:
        return f.read()
    logging.warning('Test log not found in %s', results_path)
    return None

  def _GetResultsDirectoryForTest(self, test, log_directory_base,
                                  fail_directory_base):
    """Gets the log and fail directories for a particular test.

    Args:
      test: The test or suite to get directories for.
      log_directory_base: The base directory where all test results are saved.
      fail_directory_base: The base directory where all test failures are
        recorded.

    Returns:
      log_directory: The (created) log directory for |test|.
      fail_directory: The fail directory for |test|. Not created here; it is
        created by _HandleFail only when a failure actually happens.
    """
    # Avoid using colon in file names. Not that it's not allowed, but it causes
    # confusions and inconvenience as it is used as a separator in many cases,
    # e.g., $PATH and url.
    sanitized_test_name = test.replace(':', '_')
    log_directory = os.path.join(log_directory_base, sanitized_test_name)
    fail_directory = os.path.join(fail_directory_base, sanitized_test_name)

    if not os.path.exists(log_directory):
      os.makedirs(log_directory)
    return log_directory, fail_directory

  def _LoadTests(self):
    """Loads the tests to run from <overlay>/scripts/gce_tests.json.

    If the JSON file exists, loads the tests and flags to create instance for
    each test with. The JSON file should contain a "tests" object, which is an
    array of objects, each of which has only two keys: "name" and "flags".

    "name" could be any valid Autotest test name, or a suite name, in the form
    of "suite:<suite_name>", e.g., "suite:gce-smoke".

    "flags" is a JSON object whose members must be valid properties of the GCE
    Instance Resource, as specified at:
    https://cloud.google.com/compute/docs/reference/latest/instances#resource.

    These flags will be used to create instances. Each flag must strictly follow
    the property schema as defined in the Instance Resource. Failure to do so
    will result in instance creation failures.

    Note that a dedicated instance will be created for every test object
    specified in scripts/gce_tests.json. So group test cases that require
    similar instance properties together as suites whenever possible.

    An example scripts/gce_tests.json may look like:
    {
      "tests": [
        {
          "name": "suite:gce-smoke",
          "flags": []
        },
        {
          "name": "suite:cloud-init",
          "flags": {
            "description": "Test instance",
            "metadata": {
              "items": [
                {
                  "key": "fake_key",
                  "value": "fake_value"
                }
              ]
            }
          }
        }
      ]
    }

    If the JSON file does not exist, the 'gce-smoke' suite will be used to
    verify the image.
    """
    # Defaults to run the gce-smoke suite if no custom tests are given.
    tests = [dict(name="suite:gce-smoke", flags=dict())]

    custom_tests = None
    try:
      custom_tests = portage_util.ReadOverlayFile(
          'scripts/gce_tests.json', board=self.board)
    except portage_util.MissingOverlayException as e:
      logging.warning('Board overlay not found. Error: %s', e)

    if custom_tests is not None:
      if self.board not in constants.TRUSTED_BOARDS:
        logging.warning('Custom tests and flags are not allowed for this board '
                        '(%s)!', self.board)
      else:
        # Read the list of tests.
        try:
          json_file = json.loads(custom_tests)
          custom = json_file.get('tests')
          # Fall back to the default suite when "tests" is missing or empty;
          # otherwise self.tests would be None and crash later on.
          if custom:
            tests = custom
          else:
            logging.warning('scripts/gce_tests.json does not specify any '
                            'tests. Default tests will be run and default '
                            'flags will be used to create instances.')
        except ValueError as e:
          logging.warning('scripts/gce_tests.json contains invalid JSON '
                          'content. Default tests will be run and default '
                          'flags will be used to create instances. Error: %s',
                          e)
    self.tests = tests

  def _CreateImage(self, image_path):
    """Uploads the gce tarball and creates an image with it.

    Args:
      image_path: Local path to the GCE image tarball.
    """
    # Timestamp makes the GCS object path and the image name unique.
    ts = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

    # Upload the GCE tarball to Google Cloud Storage.
    self.tarball_local = image_path
    gs_directory = ('gs://%s/%s' % (self.gcs_bucket, ts))
    self.tarball_remote = '%s/%s' % (gs_directory,
                                     os.path.basename(self.tarball_local))
    self.gscontext.CopyInto(self.tarball_local, gs_directory)

    # Create an image from |image_path|.
    self.image = self._IMAGE_PREFIX + ts
    self.image_link = self.gce_context.CreateImage(
        self.image, self._GsPathToUrl(self.tarball_remote))

  def _CreateInstance(self, name, image, **kwargs):
    """Creates a single VM instance with a static IP address.

    Args:
      name: Name of the instance (also used for the static address).
      image: Link to the image to boot the instance from.
      **kwargs: Additional instance properties passed through to GCE.

    Returns:
      Whatever GceContext.CreateInstance returns.
    """
    address = self.gce_context.CreateAddress(name)
    return self.gce_context.CreateInstance(name, image, static_address=address,
                                           **kwargs)

  def _CreateInstances(self):
    """Creates instances with custom flags as specified in |self.tests|."""
    steps = []
    # A single timestamp plus the per-test index keeps instance names unique.
    ts = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    for i, test in enumerate(self.tests):
      instance = '%s%s-%d' % (self._INSTANCE_PREFIX, ts, i)
      kwargs = test['flags'].copy()
      kwargs['description'] = 'For test %s' % test['name']
      steps.append(partial(self._CreateInstance, instance,
                           self.image_link, network=self.network,
                           machine_type=self.machine_type, **kwargs))
      self.instances[test['name']] = instance
    parallel.RunParallelSteps(steps)

  def _DeleteExistingResources(self):
    """Deletes all allocated GCP resources."""
    # There are cases where resources are created at the backend but the
    # resource creation calls fail, for example due to network errors that
    # happen when the response is being delivered. So we always make sure to
    # delete all allocated resources (images, instances, addresses) regardless
    # of whether the corresponding Create operation succeeded.

    # Delete the GCE instances.
    steps = [partial(self.gce_context.DeleteInstance, instance) for instance in
             self.instances.values()]

    def _RunParallelIgnoreErrors(funcs):
      try:
        parallel.RunParallelSteps(funcs)
      except parallel.BackgroundFailure as e:
        # We don't want to halt the test stage (and thus block commits) for
        # cleanup errors. Leaked resources will be cleaned up externally.
        logging.warning(
            'Ignoring BackgroundFailure while deleting resources: %s', e)
    _RunParallelIgnoreErrors(steps)

    # Delete static IP addresses (addresses share the instance names).
    steps = [partial(self.gce_context.DeleteAddress, instance) for instance in
             self.instances.values()]
    _RunParallelIgnoreErrors(steps)

    self.instances = {}

    # Delete the GCE image.
    # Have to delete the image after all instances are deleted because if the
    # image is being used to create an instance (e.g., current process is asked
    # to terminate during instance creation), it cannot be deleted until the
    # instance creation ends.
    if self.image:
      self.gce_context.DeleteImage(self.image)
      self.image = self.image_link = None

    # Delete the tarball uploaded GCS.
    # For the same reason, it's safer to delete the tarball after the image is
    # deleted.
    if self.tarball_remote:
      self.gscontext.DoCommand(['rm', self.tarball_remote])
      self.tarball_remote = None

  def _HandleFail(self, log_directory, fail_directory):
    """Handles test failures.

    In case of a test failure, copy necessary files, i.e., the GCE tarball and
    ssh private key, to |fail_directory|, which will be later archived and
    uploaded to a GCS bucket by chromite.

    Args:
      log_directory: The root directory where test logs are stored.
      fail_directory: The directory to copy files to.
    """
    parent_dir = os.path.dirname(fail_directory)
    if not os.path.isdir(parent_dir):
      os.makedirs(parent_dir)

    try:
      # Copy logs. Must be done before moving image, as this creates
      # |fail_directory|.
      shutil.copytree(log_directory, fail_directory)

      # Copy GCE tarball and ssh private key for debugging. The tarball path
      # is unset if the failure happened before _CreateImage ran.
      if self.tarball_local is not None:
        shutil.copy(self.tarball_local, fail_directory)
      if self.ssh_private_key is not None:
        shutil.copy(self.ssh_private_key, fail_directory)
    except (shutil.Error, OSError, IOError) as e:
      logging.warning('Ignoring error while copying logs: %s', e)

  def _GsPathToUrl(self, gs_path):
    """Converts a gs:// path to a URL.

    A formal URL is needed when creating an image from a GCS object.

    Args:
      gs_path: A GS path, e.g., gs://foo-bucket/bar.tar.gz.

    Returns:
      A GCS URL to the same object.

    Raises:
      ValueError if |gs_path| is not a valid GS path.
    """
    if not gs_path.startswith(self._GS_PATH_COMMON_PREFIX):
      raise ValueError('Invalid GCS path: %s' % gs_path)
    return gs_path.replace(self._GS_PATH_COMMON_PREFIX,
                           self._GS_URL_COMMON_PREFIX, 1)

  def _WaitForBackgroundDeleteProcesses(self):
    """Waits for all background processes to finish."""
    for p in self._bg_delete_processes:
      p.join()
    self._bg_delete_processes = []