gce_au_worker: Attempt to delete resources only when they exist

Recent trybot runs have failed because of a second attempt to delete a GCE
resource that was already gone. GCEAUWorker creates three types of resources:
GCS objects, GCE images, and GCE instances. Check that each resource exists
before making the remote call to delete it.

BUG=None
TEST=gce_au_worker_unittest.py and trybot against lakitu-pre-cq

Change-Id: I8c52e7fd5a751fe80d6c77c43d509e1e31567443
Reviewed-on: https://chromium-review.googlesource.com/310077
Commit-Ready: Daniel Wang <wonderfly@google.com>
Tested-by: Daniel Wang <wonderfly@google.com>
Reviewed-by: Simran Basi <sbasi@chromium.org>
diff --git a/au_test_harness/gce_au_worker.py b/au_test_harness/gce_au_worker.py
index f2441cd..678cdb7 100644
--- a/au_test_harness/gce_au_worker.py
+++ b/au_test_harness/gce_au_worker.py
@@ -132,18 +132,10 @@
 
   def CleanUp(self):
     """Deletes throw-away instances and images."""
-    logging.info('Waiting for all instances and images to be deleted.')
-
-    def _WaitForBackgroundDeleteProcesses():
-      for p in self.bg_delete_processes:
-        p.join()
-      self.bg_delete_processes = []
-
-    _WaitForBackgroundDeleteProcesses()
-    # Delete the instance/image created by the last call to UpdateImage.
-    self._DeleteInstancesIfExist()
-    _WaitForBackgroundDeleteProcesses()
-    logging.info('All instances/images are deleted.')
+    logging.info('Waiting for GCP resources to be deleted.')
+    self._WaitForBackgroundDeleteProcesses()
+    self._DeleteExistingResources()
+    logging.info('All resources are deleted.')
 
   def PrepareBase(self, image_path, signed_base=False):
     """Auto-update to base image to prepare for test."""
@@ -155,7 +147,16 @@
 
     There may be multiple instances created with different gcloud flags that
     will be used by different tests or suites.
+
+    Unlike vm_au_worker or real_au_worker, UpdateImage always creates a new
+    image and a new instance.
     """
+    # Delete existing resources in the background if any.
+    bg_delete = Process(target=self._DeleteExistingResources)
+    bg_delete.start()
+    self.bg_delete_processes.append(bg_delete)
+
+    # Creates an image and instances.
     self._CreateImage(image_path)
     self._CreateInstances()
 
@@ -336,15 +337,14 @@
 
   def _CreateImage(self, image_path):
     """Uploads the gce tarball and creates an image with it."""
-    self.tarball_local = image_path
     log_directory, fail_directory = self.GetNextResultsPath('update')
-    self._DeleteInstancesIfExist()
+
     ts = datetime.datetime.fromtimestamp(time.time()).strftime(
         '%Y-%m-%d-%H-%M-%S')
-    image = '%s%s' % (self.IMAGE_PREFIX, ts)
-    gs_directory = ('gs://%s/%s' % (self.gcs_bucket, ts))
 
     # Upload the GCE tarball to Google Cloud Storage.
+    self.tarball_local = image_path
+    gs_directory = ('gs://%s/%s' % (self.gcs_bucket, ts))
     try:
       self.gscontext.CopyInto(self.tarball_local, gs_directory)
       self.tarball_remote = '%s/%s' % (gs_directory,
@@ -355,6 +355,7 @@
           'Error: %s' % e)
 
     # Create an image from |image_path| and an instance from the image.
+    image = '%s%s' % (self.IMAGE_PREFIX, ts)
     try:
       self.image_link = self.gce_context.CreateImage(
           image, self._GsPathToUrl(self.tarball_remote))
@@ -377,22 +378,60 @@
       self.instances[test['name']] = instance
     parallel.RunParallelSteps(steps)
 
-  def _DeleteInstancesIfExist(self):
-    """Deletes existing instances if any."""
-    def _DeleteInstancesAndImage():
-      steps = [
-          lambda: self.gscontext.DoCommand(['rm', self.tarball_remote]),
-          lambda: self.gce_context.DeleteImage(self.image),
-      ]
-      for instance in self.instances.values():
-        steps.append(partial(self.gce_context.DeleteInstance, instance))
-      parallel.RunParallelSteps(steps)
+  def _DeleteExistingResouce(self, resource, existence_checker, deletor):
+    """Deletes a resource if it exists.
 
-    if self.instances:
-      logging.info('Deleting instances...')
-      bg_delete = Process(target=_DeleteInstancesAndImage)
-      bg_delete.start()
-      self.bg_delete_processes.append(bg_delete)
+    This method checks the existence of a resource using |existence_checker|,
+    and deletes it on true.
+
+    Args:
+      resource: (str) The resource name/url to delete.
+      existence_checker:
+        (callable) The callable to check existence. This callable should take
+        |resource| as its first argument.
+      deletor:
+        (callable) The callable to perform the deletion. This callable should
+        take |resource| as its first argument.
+
+    Raises:
+      ValueError if existence_checker or deletor is not callable.
+    """
+    if not hasattr(existence_checker, '__call__'):
+      raise ValueError('existence_checker must be a callable')
+    if not hasattr(deletor, '__call__'):
+      raise ValueError('deletor must be a callable')
+
+    if existence_checker(resource):
+      deletor(resource)
+
+  def _DeleteExistingResources(self):
+    """Delete instances, image and the tarball on GCS if they exist."""
+    steps = []
+
+    if self.tarball_remote:
+      steps.append(partial(self.gscontext.DoCommand,
+                           ['rm', self.tarball_remote]))
+    if self.image:
+      steps.append(partial(self.gce_context.DeleteImage, self.image))
+
+    for instance in self.instances.values():
+      steps.append(partial(
+          self._DeleteExistingResouce,
+          resource=instance,
+          existence_checker=self.gce_context.InstanceExists,
+          deletor=self.gce_context.DeleteInstance))
+
+    # Delete all resources in parallel.
+    try:
+      parallel.RunParallelSteps(steps)
+    except Exception as e:
+      logging.warn('Infrastructure failure. Error: %r' % e)
+
+    # Reset variables.
+    self.tarball_remote = None
+    self.image = None
+    self.image_link = None
+    self.instances = {}
 
   def _HandleFail(self, log_directory, fail_directory):
     """Handles test failures.
@@ -424,7 +463,7 @@
     except shutil.Error as e:
       logging.warning('Ignoring errors while copying GCE tarball: %s', e)
 
-    self._DeleteInstancesIfExist()
+    self._DeleteExistingResources()
 
   def _GsPathToUrl(self, gs_path):
     """Converts a gs:// path to a URL.
@@ -444,3 +483,9 @@
       raise ValueError('Invalid GCS path: %s' % gs_path)
     return gs_path.replace(self.GS_PATH_COMMON_PREFIX,
                            self.GS_URL_COMMON_PREFIX, 1)
+
+  def _WaitForBackgroundDeleteProcesses(self):
+    """Waits for all background processes to finish."""
+    for p in self.bg_delete_processes:
+      p.join()
+    self.bg_delete_processes = []
diff --git a/au_test_harness/gce_au_worker_unittest.py b/au_test_harness/gce_au_worker_unittest.py
index 0ea7531..21825d0 100755
--- a/au_test_harness/gce_au_worker_unittest.py
+++ b/au_test_harness/gce_au_worker_unittest.py
@@ -245,44 +245,94 @@
       self.assertIn(test, actual_tests_run)
 
   def testCleanUp(self):
-    """Tests that CleanUp deletes all instances and doesn't leak processes."""
+    """Tests that CleanUp deletes all GCS/GCE resources."""
     worker = GCEAUWorker(self.options, self.test_results_root,
                          project=self.PROJECT, zone=self.ZONE,
                          network=self.NETWORK, gcs_bucket=self.BUCKET,
                          json_key_file=self.json_key_file)
-    for cmd in ['CopyInto', 'DoCommand']:
-      self.PatchObject(worker.gscontext, cmd, autospec=True)
+    worker.instances = dict(smoke='fake-instance')
+    worker.image = 'fake-image'
+    worker.tarball_remote = 'gs://fake-tarball'
 
-    self.PatchObject(worker.gce_context, 'DeleteInstance', autospec=True)
+    # Fake resource existence.
+    self.PatchObject(worker.gce_context, 'InstanceExists', autospec=True,
+                     return_value=True)
 
-    for _ in range(3):
-      worker.UpdateImage(self.image_path)
-    self.assertEqual(len(worker.bg_delete_processes), 2)
+    # Fake resource deletors.
+    for cmd in ['DeleteImage', 'DeleteInstance']:
+      self.PatchObject(worker.gce_context, cmd, autospec=True)
+    self.PatchObject(worker.gscontext, 'DoCommand', autospec=True)
 
     worker.CleanUp()
-    self.assertEqual(len(worker.bg_delete_processes), 0)
+
+    # Assert that existing resources are cleaned up.
+    worker.gce_context.DeleteImage.assert_called_once_with('fake-image')
+    worker.gce_context.DeleteInstance.assert_called_once_with('fake-instance')
+    self.assertDictEqual({}, worker.instances)
+    self.assertIsNone(worker.image)
+    self.assertIsNone(worker.tarball_remote)
 
   def testHandleFail(self):
-    """Tests that _HandleFail copies necessary files for repro."""
+    """Tests that _HandleFail copies necessary files for repro.
+
+    In case of a test failure, _HandleFail should copy necessary files to a
+    debug location, and delete existing GCS/GCE resources. And because of the
+    latter, a final call to CleanUp will not make an additional attempt to
+    delete those resources.
+    """
     worker = GCEAUWorker(self.options, self.test_results_root,
                          project=self.PROJECT, zone=self.ZONE,
                          network=self.NETWORK, gcs_bucket=self.BUCKET,
                          json_key_file=self.json_key_file)
+
+    worker.instances = dict(smoke='fake-instance')
+    worker.image = 'fake-image'
+    worker.tarball_local = self.image_path
+    worker.tarball_remote = 'gs://fake-tarball'
+    worker.tests = [dict(name='smoke', flags=dict())]
+
+    # Fake general commands.
     for cmd in ['CopyInto', 'DoCommand']:
       self.PatchObject(worker.gscontext, cmd, autospec=True)
     self.PatchObject(cros_build_lib, 'RunCommand', autospec=True)
-    self.PatchObject(worker, '_DeleteInstancesIfExist', autospec=True)
     self.PatchObject(path_util, 'ToChrootPath', autospec=True,
                      return_value='x/y/z')
+
+    # Make _RunTest return a 0% pass rate.
     self.PatchObject(worker, '_RunTest', autospec=True,
                      return_value=(0, None, None))
-    worker.UpdateImage(self.image_path)
+
+    # Fake resource existence.
+    remote_instance = 'fake-remote-instance'
+    self.PatchObject(worker.gce_context, 'InstanceExists', autospec=True,
+                     side_effect=lambda instance: remote_instance is not None)
+
+    # Fake resource deletors.
+    self.PatchObject(worker.gce_context, 'DeleteImage', autospec=True)
+    # Make DeleteInstance delete the remote resource.
+    def _OverrideDeleteInstance(_):
+      remote_instance = None
+    self.PatchObject(worker.gce_context, 'DeleteInstance', autospec=True,
+                     side_effect=_OverrideDeleteInstance)
+
+    # VerifyImage should fail and _HandleFail should be called.
     worker.VerifyImage(None)
+
+    # Assert that required files are retained at debug location.
     self.assertExists(os.path.join(self.test_results_failed, self.GCE_TARBALL))
     self.assertExists(os.path.join(
         self.test_results_failed,
         os.path.basename(self.options.ssh_private_key)))
 
+    # CleanUp will not attempt to delete resources because they should have been
+    # deleted by _HandleFail.
+    worker.instances = dict(smoke='fake-instance')
+    worker.CleanUp()
+
+    # Assert that one and only one attempt was made to delete existing
+    # instances.
+    worker.gce_context.DeleteInstance.assert_called_once_with(mock.ANY)
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/lib/gce.py b/lib/gce.py
index 5955319..b5fa0b6 100644
--- a/lib/gce.py
+++ b/lib/gce.py
@@ -264,21 +264,81 @@
     except KeyError:
       return []
 
+  def GetInstance(self, instance, zone=None):
+    """Gets an Instance Resource by name and zone.
+
+    Args:
+      instance: Name of the instance.
+      zone: Zone where the instance is in. Default zone will be used if omitted.
+
+    Returns:
+      An Instance Resource.
+    """
+    result = self.gce_client.instances().get(project=self.project,
+                                             zone=zone or self.zone,
+                                             instance=instance).execute()
+    return result
+
   def GetInstanceIP(self, instance, zone=None):
     """Gets the external IP of an instance.
 
     Args:
       instance: Name of the instance to get IP for.
       zone: Zone where the instance is in. Default zone will be used if omitted.
+
+    Returns:
+      External IP address of the instance.
+
+    Raises:
+      gce.Error on failures.
     """
-    result = self.gce_client.instances().get(project=self.project,
-                                             zone=zone or self.zone,
-                                             instance=instance).execute()
+    result = self.GetInstance(instance, zone)
+    if not result:
+      raise Error('Instance %s does not exist in zone %s.'
+                  % (instance, zone or self.zone))
     try:
       return result['networkInterfaces'][0]['accessConfigs'][0]['natIP']
     except (KeyError, IndexError):
       raise Error('Failed to get IP address for instance %s' % instance)
 
+  def GetImage(self, image):
+    """Gets an Image Resource by name.
+
+    Args:
+      image: Name of the image to look for.
+
+    Returns:
+      An Image Resource.
+    """
+    result = self.gce_client.images().get(project=self.project,
+                                          image=image).execute()
+    return result
+
+  def InstanceExists(self, instance, zone=None):
+    """Checks if an instance exists in the current project.
+
+    Args:
+      instance: Name of the instance to check existence of.
+      zone: Zone where the instance is in. Default zone will be used if omitted.
+
+    Returns:
+      True if the instance exists or False otherwise.
+    """
+    result = self.GetInstance(instance, zone)
+    return (result is not None)
+
+  def ImageExists(self, image):
+    """Checks if an image exists in the current project.
+
+    Args:
+      image: Name of the image to check existence of.
+
+    Returns:
+      True if the image exists or False otherwise.
+    """
+    result = self.GetImage(image)
+    return (result is not None)
+
   def _WaitForZoneOperation(self, operation, zone=None, timeout_handler=None):
     get_request = self.gce_client.zoneOperations().get(
         project=self.project, zone=zone or self.zone, operation=operation)