gce_au_worker: Revise gce_au_worker based on the new GCE util

All changes include:

* Update gce_au_worker.py with the lately added GCE Python API based utility,
  i.e., gce.py, see CL:300202.
* Use locally built GCE tarball instead of depending on its existence in the
  archive GCS bucket, see CL:300315.
* Update lib/constants with the credentials of the GCE project used for testing.
  Background: brbug.com/1199 and crbug.com/526292.
* Handle test failures by archiving GCE tarball and test private key for repro.
* Add a unit test for gce_au_worker.py.

BUG=brillo:1196
TEST=Trybot run with a config that enables gce tests.

Change-Id: Ide8ed380f23e1c1f015094413f5bc93cbc60760f
Reviewed-on: https://chromium-review.googlesource.com/301741
Commit-Ready: Daniel Wang <wonderfly@google.com>
Tested-by: Daniel Wang <wonderfly@google.com>
Reviewed-by: Simran Basi <sbasi@chromium.org>
diff --git a/au_test_harness/cros_au_test_harness.py b/au_test_harness/cros_au_test_harness.py
index 8dd3ce3..8f5c388 100755
--- a/au_test_harness/cros_au_test_harness.py
+++ b/au_test_harness/cros_au_test_harness.py
@@ -28,7 +28,6 @@
 from chromite.lib import cros_build_lib
 from chromite.lib import cros_logging as logging
 from chromite.lib import dev_server_wrapper
-from chromite.lib import gs
 from chromite.lib import parallel
 from chromite.lib import sudo
 from chromite.lib import timeout_util
@@ -125,26 +124,18 @@
     leftover_args: Args left after parsing.
   """
 
-  _IMAGE_PATH_REQUIREMENT = ('For vm and real types, the image must be a local '
-                             'file. For gce, the image path has to be a valid '
-                             'Google Cloud Storage URI.')
-
   if leftover_args: parser.error('Found unsupported flags ' + leftover_args)
   if not options.type in ['real', 'vm', 'gce']:
     parser.error('Failed to specify valid test type.')
 
   def _IsValidImage(image):
     """Asserts that |image_path| is a valid image file for |options.type|."""
-    if not image:
-      return False
-    return (gs.PathIsGs(image) if options.type == 'gce' else
-            os.path.isfile(image))
+    return (image is not None) and os.path.isfile(image)
 
   if not _IsValidImage(options.target_image):
     parser.error('Testing requires a valid target image.\n'
-                 '%s\n'
                  'Given: type=%s, target_image=%s.' %
-                 (_IMAGE_PATH_REQUIREMENT, options.type, options.target_image))
+                 (options.type, options.target_image))
 
   if not options.base_image:
     logging.info('No base image supplied.  Using target as base image.')
@@ -152,9 +143,8 @@
 
   if not _IsValidImage(options.base_image):
     parser.error('Testing requires a valid base image.\n'
-                 '%s\n'
                  'Given: type=%s, base_image=%s.' %
-                 (_IMAGE_PATH_REQUIREMENT, options.type, options.base_image))
+                 (options.type, options.base_image))
 
   if (options.payload_signing_key and not
       os.path.isfile(options.payload_signing_key)):
diff --git a/au_test_harness/cros_au_test_harness_unittest.py b/au_test_harness/cros_au_test_harness_unittest.py
index c16bcd0..ef9555f 100755
--- a/au_test_harness/cros_au_test_harness_unittest.py
+++ b/au_test_harness/cros_au_test_harness_unittest.py
@@ -15,7 +15,6 @@
 import os
 import sys
 import unittest
-import uuid
 
 import constants
 sys.path.append(constants.SOURCE_ROOT)
@@ -51,48 +50,6 @@
 
     self.assertNotIn(self.INVALID_TYPE_ERROR, cm.exception.result.error)
 
-  def testCheckOptionsRequiresGSPathForGCETests(self):
-    """Tests that CheckOptions requires a valid GS path for GCE tests."""
-    local_path = '/tmp/foo/bar'
-    gs_path = 'gs://foo-bucket/bar.tar.gz'
-    cmd = [os.path.join(constants.CROSUTILS_DIR, 'bin', 'cros_au_test_harness'),
-           '--type=gce',
-           '--target_image=%s' % local_path
-          ]
-    with self.assertRaises(cros_build_lib.RunCommandError) as cm:
-      cros_build_lib.RunCommand(cmd)
-    self.assertIn(self.INVALID_IMAGE_PATH, cm.exception.result.error)
-
-    cmd = [os.path.join(constants.CROSUTILS_DIR, 'bin', 'cros_au_test_harness'),
-           '--type=gce',
-           '--target_image=%s' % gs_path
-          ]
-    with self.assertRaises(cros_build_lib.RunCommandError) as cm:
-      cros_build_lib.RunCommand(cmd)
-    self.assertNotIn(self.INVALID_IMAGE_PATH, cm.exception.result.error)
-
-  @unittest.skip('This test runs but only for demo purposes. Do not check it '
-                 'in as is')
-  def testSimpleTestsOnGCE(self):
-    """Tests that cros_au_test_harness is able to run simple tests on GCE.
-
-    Explicitly triggers SimpleTestVerify and SimpleTestUpdateAndVerify via
-    '--test_prefix'.
-    """
-    board = 'lakitu'
-    gs_path = 'gs://test-images/chromiumos_test_image.tar.gz'
-    test_results_dir = 'chroot/tmp/test_results_%s' % str(uuid.uuid4())
-    cmd = [os.path.join(constants.CROSUTILS_DIR, 'bin', 'cros_au_test_harness'),
-           '--type=gce',
-           '--target_image=%s' % gs_path,
-           '--board=%s' % board,
-           '--test_results=%s' % test_results_dir,
-           '--test_prefix=Simple',
-           '--verify_suite_name=smoke',
-           '--parallel'
-          ]
-    cros_build_lib.RunCommand(cmd)
-
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/au_test_harness/gce_au_worker.py b/au_test_harness/gce_au_worker.py
index fe62db2..ed9c82d 100644
--- a/au_test_harness/gce_au_worker.py
+++ b/au_test_harness/gce_au_worker.py
@@ -7,35 +7,58 @@
 from __future__ import print_function
 
 import datetime
+import os
+import shutil
 import time
 
 from multiprocessing import Process
 
-from chromite.compute import gcloud
 from chromite.lib import cros_build_lib
 from chromite.lib import cros_logging as logging
+from chromite.lib import gs
 from chromite.lib import path_util
 from crostestutils.au_test_harness import au_worker
 from crostestutils.au_test_harness import constants
 from crostestutils.au_test_harness import update_exception
+from crostestutils.lib import gce
 
 
 class GCEAUWorker(au_worker.AUWorker):
-  """Test harness for updating GCE instances."""
+  """Test harness for updating GCE instances.
 
-  _INSTANCE_PREFIX = 'test-instance-'
-  _IMAGE_PREFIX = 'test-image-'
+  Attributes:
+    gce_context: An utility for GCE operations.
+    gscontext: An utility for GCS operations.
+    gcs_bucket: The GCS bucket to upload image tarballs to.
+    instance: A single VM instance associated with a worker.
+    image: A single GCE image associated with a worker.
+    tarball_local: Local path to the tarball of test image.
+    tarball_remote: GCS path to the tarball of test image.
+    bg_delete_processes:
+      Background processes that delete stale instances and images.
+  """
 
-  def __init__(self, options, test_results_root, project=constants.GCE_PROJECT,
-               zone=constants.GCE_ZONE):
+  INSTANCE_PREFIX = 'test-instance-'
+  IMAGE_PREFIX = 'test-image-'
+  GS_PATH_COMMON_PREFIX = 'gs://'
+  GS_URL_COMMON_PREFIX = 'https://storage.googleapis.com/'
+
+  def __init__(self, options, test_results_root,
+               project=constants.GCE_PROJECT,
+               zone=constants.GCE_DEFAULT_ZONE,
+               network=constants.GCE_DEFAULT_NETWORK,
+               json_key_file=constants.GCE_JSON_KEY,
+               gcs_bucket=constants.GCS_BUCKET):
     """Processes GCE-specific options."""
     super(GCEAUWorker, self).__init__(options, test_results_root)
-
-    # Google Cloud project and zone, in which to create the test instance.
-    self.gccontext = gcloud.GCContext(project, zone)
-    self.image = ''
-    self.instance = ''
-    self.instance_ip = ''
+    self.gce_context = gce.GceContext.ForServiceAccount(
+        project, zone, network, json_key_file=json_key_file)
+    self.gscontext = gs.GSContext()
+    self.gcs_bucket = gcs_bucket
+    self.tarball_local = None
+    self.tarball_remote = None
+    self.instance = None
+    self.image = None
 
     # Background processes that delete throw-away instances.
     self.bg_delete_processes = []
@@ -47,36 +70,26 @@
     def _WaitForBackgroundDeleteProcesses():
       for p in self.bg_delete_processes:
         p.join()
+      self.bg_delete_processes = []
 
     _WaitForBackgroundDeleteProcesses()
     # Delete the instance/image created by the last call to UpdateImage.
-    self._DeleteExistingInstanceInBackground()
+    self._DeleteInstanceIfExists()
     _WaitForBackgroundDeleteProcesses()
     logging.info('All instances/images are deleted.')
 
-  def _DeleteExistingInstanceInBackground(self):
+  def _DeleteInstanceIfExists(self):
     """Deletes existing instances if any."""
-
-    def _DeleteInstance():
-      bg_delete = Process(target=self.gccontext.DeleteInstance,
-                          args=(self.instance,), kwargs=dict(quiet=True))
-      bg_delete.start()
-      self.bg_delete_processes.append(bg_delete)
-      self.instance = ''
-      self.instance_ip = ''
-
-    def _DeleteImage():
-      bg_delete = Process(target=self.gccontext.DeleteImage,
-                          args=(self.image,), kwargs=dict(quiet=True))
-      bg_delete.start()
-      self.bg_delete_processes.append(bg_delete)
-      self.image = ''
+    def _DeleteInstanceAndImage():
+      self.gscontext.DoCommand(['rm', self.tarball_remote])
+      self.gce_context.DeleteInstance(self.instance)
+      self.gce_context.DeleteImage(self.image)
 
     if self.instance:
       logging.info('Existing instance %s found. Deleting...', self.instance)
-      _DeleteInstance()
-      if self.image:
-        _DeleteImage()
+      bg_delete = Process(target=_DeleteInstanceAndImage)
+      bg_delete.start()
+      self.bg_delete_processes.append(bg_delete)
 
   def PrepareBase(self, image_path, signed_base=False):
     """Auto-update to base image to prepare for test."""
@@ -86,37 +99,50 @@
                   proxy_port=None, payload_signing_key=None):
     """Updates the image on a GCE instance.
 
-    Unlike real_au_worker, this method always creates a new instance. Note that
-    |image_path| has to be a valid Google Cloud Storage url, e.g.,
-    gs://foo-bucket/bar.tar.gz.
+    Unlike real_au_worker, this method always creates a new instance.
     """
-    self._DeleteExistingInstanceInBackground()
+    self.tarball_local = image_path
+    log_directory, fail_directory = self.GetNextResultsPath('update')
+    self._DeleteInstanceIfExists()
     ts = datetime.datetime.fromtimestamp(time.time()).strftime(
         '%Y-%m-%d-%H-%M-%S')
-    image = '%s%s' % (self._IMAGE_PREFIX, ts)
-    instance = '%s%s' % (self._INSTANCE_PREFIX, ts)
+    image = '%s%s' % (self.IMAGE_PREFIX, ts)
+    instance = '%s%s' % (self.INSTANCE_PREFIX, ts)
+    gs_directory = ('gs://%s/%s' % (self.gcs_bucket, ts))
+
+    # Upload the GCE tarball to Google Cloud Storage.
+    try:
+      logging.info('Uploading GCE tarball %s to %s ...' , self.tarball_local,
+                   gs_directory)
+      self.gscontext.CopyInto(self.tarball_local, gs_directory)
+      self.tarball_remote = '%s/%s' % (gs_directory,
+                                       os.path.basename(self.tarball_local))
+    except Exception as e:
+      raise update_exception.UpdateException(
+          1, 'Update failed. Unable to upload test image GCE tarball to GCS. '
+          'Error: %s' % e)
 
     # Create an image from |image_path| and an instance from the image.
     try:
-      self.gccontext.CreateImage(image, image_path)
-      self.gccontext.CreateInstance(instance, image)
-    except gcloud.GCCommandError as e:
-      raise update_exception.UpdateException(
-          1, 'Update failed. Error: %s' % e.message)
-    self.instance_ip = self.gccontext.GetInstanceIP(instance)
+      image_link = self.gce_context.CreateImage(
+          image, self._GsPathToUrl(self.tarball_remote))
+      self.gce_context.CreateInstance(instance, image_link)
+    except gce.Error as e:
+      self._HandleFail(log_directory, fail_directory)
+      raise update_exception.UpdateException(1, 'Update failed. Error: %s' % e)
     self.instance = instance
     self.image = image
 
   def VerifyImage(self, unittest, percent_required_to_pass=100, test=''):
     """Verifies an image using test_that with verification suite."""
-    test_directory, _ = self.GetNextResultsPath('autotest_tests')
-    if not test:
-      test = self.verify_suite
+    log_directory, fail_directory = self.GetNextResultsPath('autotest_tests')
+    log_directory_in_chroot = log_directory.rpartition('chroot')[2]
+    instance_ip = self.gce_context.GetInstanceIP(self.instance)
+    test_suite = test or self.verify_suite
 
-    self.TestInfo('Running test %s to verify image.' % test)
-
-    cmd = ['test_that', '--no-quickmerge', '--results_dir=%s' % test_directory,
-           self.instance_ip, test]
+    cmd = ['test_that', '-b', self.board, '--no-quickmerge',
+           '--results_dir=%s' % log_directory_in_chroot, instance_ip,
+           test_suite]
     if self.ssh_private_key is not None:
       cmd.append('--ssh_private_key=%s' %
                  path_util.ToChrootPath(self.ssh_private_key))
@@ -127,6 +153,57 @@
     ret = self.AssertEnoughTestsPassed(unittest, result.output,
                                        percent_required_to_pass)
     if not ret:
-      self._DeleteExistingInstanceInBackground()
+      self._HandleFail(log_directory, fail_directory)
 
     return ret
+
+  def _HandleFail(self, log_directory, fail_directory):
+    """Handles test failures.
+
+    In case of a test failure, copy necessary files, i.e., the GCE tarball and
+    ssh private key, to |fail_directory|, which will be later archived and
+    uploaded to a GCS bucket by chromite.
+
+    Args:
+      log_directory: The root directory where test logs are stored.
+      fail_directory: The directory to copy files to.
+    """
+    parent_dir = os.path.dirname(fail_directory)
+    if not os.path.isdir(parent_dir):
+      os.makedirs(parent_dir)
+
+    # Copy logs. Must be done before moving image, as this creates
+    # |fail_directory|.
+    try:
+      shutil.copytree(log_directory, fail_directory)
+    except shutil.Error as e:
+      logging.warning('Ignoring errors while copying logs: %s', e)
+
+    # Copy GCE tarball and ssh private key for debugging.
+    try:
+      shutil.copy(self.tarball_local, fail_directory)
+      if self.ssh_private_key is not None:
+        shutil.copy(self.ssh_private_key, fail_directory)
+    except shutil.Error as e:
+      logging.warning('Ignoring errors while copying GCE tarball: %s', e)
+
+    self._DeleteInstanceIfExists()
+
+  def _GsPathToUrl(self, gs_path):
+    """Converts a gs:// path to a URL.
+
+    A formal URL is needed when creating an image from a GCS object.
+
+    Args:
+      gs_path: A GS path, e.g., gs://foo-bucket/bar.tar.gz.
+
+    Returns:
+      A GCS URL to the same object.
+
+    Raises:
+      ValueError if |gs_path| is not a valid GS path.
+    """
+    if not gs_path.startswith(self.GS_PATH_COMMON_PREFIX):
+      raise ValueError('Invalid GCS path: %s' % gs_path)
+    return gs_path.replace(self.GS_PATH_COMMON_PREFIX,
+                           self.GS_URL_COMMON_PREFIX, 1)
diff --git a/au_test_harness/gce_au_worker_unittest.py b/au_test_harness/gce_au_worker_unittest.py
new file mode 100755
index 0000000..76a7280
--- /dev/null
+++ b/au_test_harness/gce_au_worker_unittest.py
@@ -0,0 +1,152 @@
+#!/usr/bin/python2
+#
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for gce_au_worker."""
+
+from __future__ import print_function
+
+import os
+import sys
+import unittest
+
+import constants
+sys.path.append(constants.CROS_PLATFORM_ROOT)
+sys.path.append(constants.SOURCE_ROOT)
+
+from chromite.lib import cros_build_lib
+from chromite.lib import cros_test_lib
+from chromite.lib import osutils
+from chromite.lib import path_util
+from crostestutils.au_test_harness.gce_au_worker import GCEAUWorker
+from crostestutils.lib.gce import GceContext
+
+class Options(object):
+  """A fake class to hold command line options."""
+
+  def __init__(self):
+    self.board = 'fake-board'
+    self.delta = False
+    self.verbose = False
+    self.quick_test = False
+    self.verify_suite_name = 'smoke'
+
+
+class GceAuWorkerTest(cros_test_lib.MockTempDirTestCase):
+  """Test suite for GCEAUWorker."""
+
+  PROJECT = 'test-project'
+  ZONE = 'test-zone'
+  NETWORK = 'default'
+  BUCKET = 'foo-bucket'
+  CLIENT_EMAIL = 'test-account@testdomain.com'
+  GCE_TARBALL = 'chromiumos_test_image_gce_tar.gz'
+
+  def setUp(self):
+    # Fake out environment.
+    options = Options()
+    options.ssh_private_key = os.path.join(self.tempdir, 'ssh-private-key')
+    self.ssh_private_key = options.ssh_private_key
+    osutils.Touch(self.ssh_private_key)
+
+    test_results_root = os.path.join(self.tempdir, 'test-results')
+    self.test_results_all = os.path.join(test_results_root, 'all')
+    self.test_results_failed = os.path.join(test_results_root, 'failed')
+    osutils.SafeMakedirs(self.test_results_all)
+
+    self.json_key_file = os.path.join(self.tempdir, 'service_account.json')
+    osutils.Touch(self.json_key_file)
+
+    self.image_path = os.path.join(self.tempdir, self.GCE_TARBALL)
+    osutils.Touch(self.image_path)
+
+    self.PatchObject(GceContext, 'ForServiceAccount', autospec=True)
+    self.worker = GCEAUWorker(options, test_results_root, project=self.PROJECT,
+                              zone=self.ZONE, network=self.NETWORK,
+                              gcs_bucket=self.BUCKET,
+                              json_key_file=self.json_key_file)
+
+    # Mock out methods.
+    for cmd in ['CreateInstance', 'CreateImage', 'GetInstanceIP',
+                'DeleteInstance', 'DeleteImage', 'ListInstances', 'ListImages']:
+      self.PatchObject(self.worker.gce_context, cmd, autospec=True)
+
+    for cmd in ['CopyInto', 'DoCommand']:
+      self.PatchObject(self.worker.gscontext, cmd, autospec=True)
+
+    self.PatchObject(self.worker, 'GetNextResultsPath', autospec=True,
+                     return_value=(self.test_results_all,
+                                   self.test_results_failed))
+
+  def testUpdateImage(self):
+    """Tests that UpdateImage creates a GCE VM using the given tarball."""
+
+    def _CopyInto(src, _):
+      self.assertEqual(self.image_path, src)
+
+    self.PatchObject(self.worker.gscontext, 'CopyInto', autospec=True,
+                     side_effect=_CopyInto)
+    self.PatchObject(self.worker, '_DeleteInstanceIfExists', autospec=True)
+    self.PatchObject(self.worker, 'GetNextResultsPath', autospec=True,
+                     return_value=('test-resultsi-all', 'test-results-failed'))
+    self.worker.UpdateImage(self.image_path)
+
+    #pylint: disable=protected-access
+    self.worker._DeleteInstanceIfExists.assert_called_once_with()
+    #pylint: enable=protected-access
+    self.assertNotEqual(self.worker.instance, '')
+    self.assertNotEqual(self.worker.image, '')
+    self.assertTrue(self.worker.gscontext.CopyInto.called)
+
+  def testCleanUp(self):
+    """Tests that CleanUp deletes all instances and doesn't leak processes."""
+    for _ in range(3):
+      self.worker.UpdateImage(self.image_path)
+    self.assertEqual(len(self.worker.bg_delete_processes), 2)
+
+    self.worker.CleanUp()
+    self.assertEqual(len(self.worker.bg_delete_processes), 0)
+
+  def testVerifyImage(self):
+    """Tests that VerifyImage calls out to test_that with correct args."""
+
+    def _RunCommand(cmd, *args, **kwargs):
+      expected_cmd = ['test_that', '-b', 'fake-board', '--no-quickmerge',
+                      '--results_dir=%s' % self.test_results_all, '1.2.3.4',
+                      'suite:smoke']
+      for i, arg in enumerate(expected_cmd):
+        self.assertEqual(arg, cmd[i])
+
+      return cros_build_lib.CommandResult()
+
+    self.PatchObject(cros_build_lib, 'RunCommand', autospec=True,
+                     side_effect=_RunCommand)
+    self.PatchObject(self.worker, 'AssertEnoughTestsPassed', autospec=True)
+    self.PatchObject(self.worker, '_DeleteInstanceIfExists', autospec=True)
+    self.PatchObject(self.worker.gce_context, 'GetInstanceIP', autospec=True,
+                     return_value='1.2.3.4')
+    self.PatchObject(path_util, 'ToChrootPath', autospec=True,
+                     return_value='x/y/z')
+    self.worker.UpdateImage(self.image_path)
+    self.worker.VerifyImage(None)
+    self.assertTrue(cros_build_lib.RunCommand.called)
+
+  def testHandleFail(self):
+    """Tests that _HandleFail copies necessary files for repro."""
+    self.PatchObject(cros_build_lib, 'RunCommand', autospec=True)
+    self.PatchObject(self.worker, '_DeleteInstanceIfExists', autospec=True)
+    self.PatchObject(path_util, 'ToChrootPath', autospec=True,
+                     return_value='x/y/z')
+    self.PatchObject(self.worker, 'AssertEnoughTestsPassed', autospec=True,
+                     return_value=False)
+    self.worker.UpdateImage(self.image_path)
+    self.worker.VerifyImage(None)
+    self.assertExists(os.path.join(self.test_results_failed, self.GCE_TARBALL))
+    self.assertExists(os.path.join(self.test_results_failed,
+                                   os.path.basename(self.ssh_private_key)))
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/lib/constants.py b/lib/constants.py
index 9283902..66e9ac7 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -19,5 +19,9 @@
 
 MAX_TIMEOUT_SECONDS = 4800
 
-GCE_PROJECT = 'chromiumos-gce-testlab'
-GCE_ZONE = 'us-central1-a'
+# Information of the GCE autotest bots.
+GCE_PROJECT = 'cros-autotest-bots'
+GCE_DEFAULT_ZONE = 'us-central1-a'
+GCE_DEFAULT_NETWORK = 'network-prod'
+GCE_JSON_KEY = '/creds/service_accounts/service-account-cros-autotest-bots.json'
+GCS_BUCKET = 'chromeos-test-gce-tarballs'