gce_au_worker: Release allocated resources before terminating

Recent GCP resource leaks (e.g., chromium:579811) show that when
cbuildbot.stages.test_stages.GCETestStage times out, the subprocess that
calls out to ctest (and thus this module) is aborted without getting a
chance to clean up, leaving GCP resources leaked and various quotas
exhausted.

This CL patches gce_au_worker to intercept SIGINT and SIGTERM and to release
allocated resources before the test process terminates. It also updates
`ctest` to wait a fixed amount of time (10 minutes) before sending SIGKILL to
the test subprocess. Similarly, the cbuildbot code has been updated to delay
termination of the `ctest` subprocess.
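
Conceptually, the interception follows the pattern sketched below (a minimal
sketch with placeholder names; the actual au_test.py change below additionally
relays via chromite's signals.RelaySignal and restores the original handlers
in tearDown):

    import signal
    import sys
    from functools import partial

    def _handle(original_handler, cleanup, signum, frame):
      # Ignore both signals so the cleanup below cannot be interrupted.
      signal.signal(signal.SIGINT, signal.SIG_IGN)
      signal.signal(signal.SIGTERM, signal.SIG_IGN)
      cleanup()
      # Relay the signal to the original handler, or exit if there is none.
      if callable(original_handler):
        original_handler(signum, frame)
      else:
        sys.exit('Received signal %d.' % signum)

    def InstallHandlers(cleanup):
      for sig in (signal.SIGINT, signal.SIGTERM):
        old = signal.getsignal(sig)
        signal.signal(sig, partial(_handle, old, cleanup))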

BUG=b:26489739
TEST=gce_au_worker_unittest and a manual ctest run pass; also ran a trybot
    against lakitu-pre-cq

Change-Id: I1a174e7ce16f7f52c2cd1b80e1f936d83b2ac966
Reviewed-on: https://chromium-review.googlesource.com/323867
Commit-Ready: Daniel Wang <wonderfly@google.com>
Tested-by: Daniel Wang <wonderfly@google.com>
Reviewed-by: Simran Basi <sbasi@chromium.org>
Reviewed-by: Don Garrett <dgarrett@chromium.org>
diff --git a/au_test_harness/au_test.py b/au_test_harness/au_test.py
index c0f0de1..d78bff2 100644
--- a/au_test_harness/au_test.py
+++ b/au_test_harness/au_test.py
@@ -8,12 +8,17 @@
 
 import os
 import re
+import signal
+import sys
 import time
 import unittest
 import urllib
 
+from functools import partial
+
 from chromite.lib import cros_logging as logging
 from chromite.lib import dev_server_wrapper
+from chromite.lib import signals
 from crostestutils.au_test_harness import cros_test_proxy
 from crostestutils.au_test_harness import gce_au_worker
 from crostestutils.au_test_harness import real_au_worker
@@ -29,6 +34,18 @@
   See documentation for au_worker for more information.
   """
 
+  def __init__(self, *_args, **_kwargs):
+    super(AUTest, self).__init__(*_args, **_kwargs)
+
+    # Original signal handlers.
+    self._old_sigint = None
+    self._old_sigterm = None
+
+    # Verify that the signals module is actually usable and won't segfault
+    # upon invocation of getsignal. See signals.SignalModuleUsable for
+    # details and the upstream Python bug.
+    self._use_signals = signals.SignalModuleUsable()
+
   @classmethod
   def ProcessOptions(cls, options):
     """Processes options for the test suite and sets up the worker class.
@@ -83,13 +100,20 @@
       proxy.shutdown()
 
   # --- UNITTEST SPECIFIC METHODS ---
-
   def setUp(self):
     """Overrides unittest.TestCase.setUp and called before every test.
 
     Sets instance specific variables and initializes worker.
     """
     super(AUTest, self).setUp()
+    # Install custom signal handlers that call worker.CleanUp upon receipt of
+    # SIGINT or SIGTERM. This is particularly useful when the worker allocates
+    # resources in the prepare stage but takes too long in the test stage, and
+    # the caller decides to terminate it (by sending SIGTERM). The default
+    # action of SIGTERM is to terminate the process, which leaves expensive
+    # resources leaked and/or the local environment tainted.
+    if self._use_signals:
+      self._InstallHandlers()
     self.worker = self.worker_class(self.options, AUTest.test_results_root)
     self.download_folder = os.path.join(os.path.realpath(os.path.curdir),
                                         'latest_download')
@@ -97,6 +121,9 @@
   def tearDown(self):
     """Overrides unittest.TestCase.tearDown and called after every test."""
     self.worker.CleanUp()
+    # Restore signal handlers.
+    if self._use_signals:
+      self._RestoreHandlers()
 
   def testUpdateKeepStateful(self):
     """Tests if we can update normally.
@@ -283,3 +310,47 @@
     # This update is expected to fail...
     expected_msg = 'zlib inflate() error:-3'
     self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg)
+
+  # --- PRIVATE HELPER FUNCTIONS ---
+  def _InstallHandlers(self):
+    """Installs signal handlers for SIGINT and SIGTERM."""
+    self._old_sigint = signal.getsignal(signal.SIGINT)
+    self._old_sigterm = signal.getsignal(signal.SIGTERM)
+    signal.signal(signal.SIGINT, partial(self._SigintAndSigtermHandler,
+                                         self._old_sigint))
+    signal.signal(signal.SIGTERM, partial(self._SigintAndSigtermHandler,
+                                          self._old_sigterm))
+
+  def _RestoreHandlers(self):
+    """Restores signal handlers for SIGINT and SIGTERM."""
+    signal.signal(signal.SIGINT, self._old_sigint)
+    signal.signal(signal.SIGTERM, self._old_sigterm)
+
+  def _SigintAndSigtermHandler(self, original_handler, signum, frame):
+    """Common signal handler for SIGINT and SIGTERM.
+
+    It tries to clean up allocated resources, and relays the signal to the
+    original handler.
+
+    Args:
+      original_handler: The original signal handler.
+      signum: The number of the signal to handle.
+      frame: Current stack frame. See signal.signal for details on |signum| and
+          |frame|.
+    """
+    logging.warning('Received signal %d', signum)
+    if signum:
+      # If we've been invoked because of a signal, ignore further delivery of
+      # that signal from this point forward; the invoking context restores
+      # signal delivery to what it was before. We suppress delivery because
+      # this code handles SIGINT/SIGTERM fully, including relaying the signal
+      # to the original handler on the way out.
+      #
+      # Mask both SIGINT and SIGTERM so that the cleanup below cannot be
+      # interrupted; the original handlers are restored in _RestoreHandlers.
+      signal.signal(signal.SIGINT, signal.SIG_IGN)
+      signal.signal(signal.SIGTERM, signal.SIG_IGN)
+    self.worker.CleanUp()
+    if not signals.RelaySignal(original_handler, signum, frame):
+      logging.warning('Failed to relay signal %d to original handler.', signum)
+      sys.exit('Received signal %d.' % signum)
diff --git a/au_test_harness/au_test_test.py b/au_test_harness/au_test_test.py
new file mode 100755
index 0000000..a42448b
--- /dev/null
+++ b/au_test_harness/au_test_test.py
@@ -0,0 +1,71 @@
+#!/usr/bin/python2
+#
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for au_test."""
+
+from __future__ import print_function
+
+import os
+import signal
+import sys
+import time
+import unittest
+import mock
+
+from multiprocessing import Process, Value
+
+import constants
+sys.path.append(constants.CROS_PLATFORM_ROOT)
+sys.path.append(constants.SOURCE_ROOT)
+
+from crostestutils.au_test_harness.au_test import AUTest
+
+
+class AuTestTest(unittest.TestCase):
+  """Test for autest.AUTest."""
+
+  def testSigtermAndSigintHandled(self):
+    """Tests that worker.CleanUp is always called."""
+    def _ChildProcess(shared_integer):
+      class _FakeWorker(object):
+        """A fake worker class to use as a mock."""
+        def __init__(self, options, test_results_root):
+          pass
+
+        def CleanUp(self):
+          shared_integer.value = 1
+
+      with mock.patch.object(AUTest, 'SimpleTestVerify', autospec=True,
+                             side_effect=lambda _: time.sleep(60)):
+        AUTest.worker_class = _FakeWorker
+        AUTest.options = None
+        AUTest.test_results_root = None
+        test_suite = unittest.TestSuite()
+        test_suite.addTest(AUTest('SimpleTestVerify'))
+        test_runner = unittest.TextTestRunner()
+        # This blocks (for up to 60 seconds) until the parent sends a signal.
+        test_runner.run(test_suite)
+
+    for signum in (signal.SIGINT, signal.SIGTERM):
+      # Start a child process to run the SimpleTestVerify test.
+      shared_integer = Value('i', 0)
+      child = Process(target=_ChildProcess, args=(shared_integer,))
+      child.start()
+
+      # Wait till it gets into SimpleTestVerify.
+      time.sleep(2)
+      # Send signal to the child process - test should abort, and if signal is
+      # not caught and handled, the child will terminate without calling
+      # worker.CleanUp.
+      os.kill(child.pid, signum)
+      child.join()
+      # Assert that worker.CleanUp is called.
+      self.assertEqual(1, shared_integer.value)
+
+if __name__ == '__main__':
+  suite = unittest.TestSuite()
+  suite.addTest(AuTestTest('testSigtermAndSigintHandled'))
+  unittest.TextTestRunner().run(suite)
diff --git a/au_test_harness/gce_au_worker.py b/au_test_harness/gce_au_worker.py
index cf25b66..0c8bbde 100644
--- a/au_test_harness/gce_au_worker.py
+++ b/au_test_harness/gce_au_worker.py
@@ -81,7 +81,6 @@
 from chromite.lib import portage_util
 from crostestutils.au_test_harness import au_worker
 from crostestutils.au_test_harness import constants
-from crostestutils.au_test_harness import update_exception
 
 
 class GCEAUWorker(au_worker.AUWorker):
@@ -98,10 +97,7 @@
     image: A single GCE image associated with a worker.
     image_link: The URL to the image created.
     instances: GCE VM instances associated with a worker.
-    bg_delete_processes:
-      Background processes that delete stale instances and images.
   """
-
   _GS_PATH_COMMON_PREFIX = 'gs://'
   _GS_URL_COMMON_PREFIX = 'https://storage.googleapis.com/'
   _IMAGE_PREFIX = 'test-image-'
@@ -132,7 +128,7 @@
     self.instances = {}
 
     # Background processes that delete throw-away instances.
-    self.bg_delete_processes = []
+    self._bg_delete_processes = []
 
     # Load test specifications from <overlay>/scripts/gce_tests.json, if any.
     self._LoadTests()
@@ -161,11 +157,16 @@
     # Delete existing resources in the background if any.
     bg_delete = Process(target=self._DeleteExistingResources)
     bg_delete.start()
-    self.bg_delete_processes.append(bg_delete)
+    self._bg_delete_processes.append(bg_delete)
 
-    # Creates an image and instances.
-    self._CreateImage(image_path)
-    self._CreateInstances()
+    log_directory, fail_directory = self.GetNextResultsPath('update')
+    # Create an image and instances.
+    try:
+      self._CreateImage(image_path)
+      self._CreateInstances()
+    except:
+      self._HandleFail(log_directory, fail_directory)
+      raise
 
   def VerifyImage(self, unittest, percent_required_to_pass=100, test=''):
     """Verifies the image by running all the required tests.
@@ -185,14 +186,17 @@
     """
     log_directory_base, fail_directory_base = self.GetNextResultsPath(
         'autotest_tests')
-
     steps = []
     for test in self.tests:
       remote = self.gce_context.GetInstanceIP(self.instances[test['name']])
       # Prefer partial to lambda because of Python's late binding.
       steps.append(partial(self._RunTest, test['name'], remote,
                            log_directory_base, fail_directory_base))
-    return_values = parallel.RunParallelSteps(steps, return_values=True)
+    try:
+      return_values = parallel.RunParallelSteps(steps, return_values=True)
+    except:
+      self._HandleFail(log_directory_base, fail_directory_base)
+      raise
 
     passed = True
     test_reports = {}
@@ -210,7 +214,6 @@
     return passed
 
   # --- PRIVATE HELPER FUNCTIONS ---
-
   def _RunTest(self, test, remote, log_directory_base, fail_directory_base):
     """Runs a test or a suite of tests on a given remote.
 
@@ -357,51 +360,40 @@
       custom_tests = portage_util.ReadOverlayFile(
           'scripts/gce_tests.json', board=self.board)
     except portage_util.MissingOverlayException as e:
-      logging.warn('Board overlay not found. Error: %r', e)
+      logging.warning('Board overlay not found. Error: %s', e)
 
     if custom_tests is not None:
       if self.board not in constants.TRUSTED_BOARDS:
-        logging.warn('Custom tests and flags are not allowed for this board '
-                     '(%s)!', self.board)
+        logging.warning('Custom tests and flags are not allowed for this board '
+                        '(%s)!', self.board)
       else:
         # Read the list of tests.
         try:
           json_file = json.loads(custom_tests)
           tests = json_file.get('tests')
         except ValueError as e:
-          logging.warn('scripts/gce_tests.json contains invalid JSON content. '
-                       'Default tests will be run and default flags will be '
-                       'used to create instances. Error: %r', e)
+          logging.warning('scripts/gce_tests.json contains invalid JSON '
+                          'content. Default tests will be run and default '
+                          'flags will be used to create instances. Error: %s',
+                          e)
     self.tests = tests
 
   def _CreateImage(self, image_path):
     """Uploads the gce tarball and creates an image with it."""
-    log_directory, fail_directory = self.GetNextResultsPath('update')
-
     ts = datetime.datetime.fromtimestamp(time.time()).strftime(
         '%Y-%m-%d-%H-%M-%S')
 
     # Upload the GCE tarball to Google Cloud Storage.
     self.tarball_local = image_path
     gs_directory = ('gs://%s/%s' % (self.gcs_bucket, ts))
-    try:
-      self.gscontext.CopyInto(self.tarball_local, gs_directory)
-      self.tarball_remote = '%s/%s' % (gs_directory,
-                                       os.path.basename(self.tarball_local))
-    except Exception as e:
-      raise update_exception.UpdateException(
-          1, 'Update failed. Unable to upload test image GCE tarball to GCS. '
-          'Error: %s' % e)
+    self.tarball_remote = '%s/%s' % (gs_directory,
+                                     os.path.basename(self.tarball_local))
+    self.gscontext.CopyInto(self.tarball_local, gs_directory)
 
-    # Create an image from |image_path| and an instance from the image.
-    image = '%s%s' % (self._IMAGE_PREFIX, ts)
-    try:
-      self.image_link = self.gce_context.CreateImage(
-          image, self._GsPathToUrl(self.tarball_remote))
-      self.image = image
-    except gce.Error as e:
-      self._HandleFail(log_directory, fail_directory)
-      raise update_exception.UpdateException(1, 'Update failed. Error: %r' % e)
+    # Create an image from |image_path|.
+    self.image = self._IMAGE_PREFIX + ts
+    self.image_link = self.gce_context.CreateImage(
+        self.image, self._GsPathToUrl(self.tarball_remote))
 
   def _CreateInstance(self, name, image, **kwargs):
     """Creates a single VM instance with a static IP address."""
@@ -424,64 +416,51 @@
       self.instances[test['name']] = instance
     parallel.RunParallelSteps(steps)
 
-  def _DeleteExistingResouce(self, resource, existence_checker, deletor):
-    """Deletes a resource if it exists.
-
-    This method checks the existence of a resource using |existence_checker|,
-    and deletes it on true.
-
-    Args:
-      resource: The resource name/url to delete.
-      existence_checker: A callable to check existence. This callable should
-          take |resource| as its first argument.
-      deletor: A callable to perform the deletion. This callable should take
-          |resource| as its first argument.
-
-    Raises:
-      ValueError if existence_checker or deletor is not callable.
-    """
-    if not hasattr(existence_checker, '__call__'):
-      raise ValueError('existence_checker must be a callable')
-    if not hasattr(deletor, '__call__'):
-      raise ValueError('deletor must be a callable')
-
-    if existence_checker(resource):
-      deletor(resource)
-
-  def _DeleteInstance(self, name):
-    """Deletes a VM instance and its IP address."""
-    self.gce_context.DeleteInstance(name)
-    self.gce_context.DeleteAddress(name)
-
   def _DeleteExistingResources(self):
-    """Deletes instances, image and the tarball on GCS if they exist."""
-    steps = []
+    """Deletes all allocated GCP resources."""
+    # There are cases where resources are created at the backend but the
+    # resource creation calls fail, for example due to network errors that
+    # happen when the response is being delivered. So we always make sure to
+    # delete all allocated resources (images, instances, addresses) regardless
+    # of whether the corresponding Create operation succeeded.
 
-    if self.tarball_remote:
-      steps.append(partial(self.gscontext.DoCommand,
-                           ['rm', self.tarball_remote]))
-    if self.image:
-      steps.append(partial(self.gce_context.DeleteImage, self.image))
+    # Delete the GCE instances.
+    steps = [partial(self.gce_context.DeleteInstance, instance) for instance in
+             self.instances.values()]
 
-    for instance in self.instances.values():
-      steps.append(partial(
-          self._DeleteExistingResouce,
-          resource=instance,
-          existence_checker=self.gce_context.InstanceExists,
-          deletor=self._DeleteInstance))
+    def _RunParallelIgnoreErrors(funcs):
+      try:
+        parallel.RunParallelSteps(funcs)
+      except parallel.BackgroundFailure as e:
+        # We don't want to halt the test stage (and thus block commits) for
+        # cleanup errors. Leaked resources will be cleaned up externally.
+        logging.warning(
+            'Ignoring BackgroundFailure while deleting resources: %s', e)
+    _RunParallelIgnoreErrors(steps)
 
-    # Delete all resources in parallel.
-    try:
-      parallel.RunParallelSteps(steps)
-    except Exception as e:
-      logging.warn('Infrastructure failure. Error: %r' % e)
+    # Delete static IP addresses.
+    steps = [partial(self.gce_context.DeleteAddress, instance) for instance in
+             self.instances.values()]
+    _RunParallelIgnoreErrors(steps)
 
-    # Reset variables.
-    self.tarball_remote = None
-    self.image = None
-    self.image_link = None
     self.instances = {}
 
+    # Delete the GCE image.
+    # The image has to be deleted after all instances are deleted because if
+    # it is still being used to create an instance (e.g., the current process
+    # is asked to terminate during instance creation), it cannot be deleted
+    # until that instance creation finishes.
+    if self.image:
+      self.gce_context.DeleteImage(self.image)
+    self.image = self.image_link = None
+
+    # Delete the tarball uploaded to GCS.
+    # For the same reason, it's safer to delete the tarball after the image is
+    # deleted.
+    if self.tarball_remote:
+      self.gscontext.DoCommand(['rm', self.tarball_remote])
+    self.tarball_remote = None
+
   def _HandleFail(self, log_directory, fail_directory):
     """Handles test failures.
 
@@ -497,22 +476,17 @@
     if not os.path.isdir(parent_dir):
       os.makedirs(parent_dir)
 
-    # Copy logs. Must be done before moving image, as this creates
-    # |fail_directory|.
     try:
+      # Copy logs. Must be done before moving image, as this creates
+      # |fail_directory|.
       shutil.copytree(log_directory, fail_directory)
-    except shutil.Error as e:
-      logging.warning('Ignoring errors while copying logs: %s', e)
 
-    # Copy GCE tarball and ssh private key for debugging.
-    try:
+      # Copy GCE tarball and ssh private key for debugging.
       shutil.copy(self.tarball_local, fail_directory)
       if self.ssh_private_key is not None:
         shutil.copy(self.ssh_private_key, fail_directory)
-    except shutil.Error as e:
-      logging.warning('Ignoring errors while copying GCE tarball: %s', e)
-
-    self._DeleteExistingResources()
+    except (shutil.Error, OSError, IOError) as e:
+      logging.warning('Ignoring error while copying logs: %s', e)
 
   def _GsPathToUrl(self, gs_path):
     """Converts a gs:// path to a URL.
@@ -535,6 +509,6 @@
 
   def _WaitForBackgroundDeleteProcesses(self):
     """Waits for all background proecesses to finish."""
-    for p in self.bg_delete_processes:
+    for p in self._bg_delete_processes:
       p.join()
-    self.bg_delete_processes = []
+    self._bg_delete_processes = []
diff --git a/au_test_harness/gce_au_worker_unittest.py b/au_test_harness/gce_au_worker_unittest.py
index 20ba25b..be129a0 100755
--- a/au_test_harness/gce_au_worker_unittest.py
+++ b/au_test_harness/gce_au_worker_unittest.py
@@ -279,9 +279,7 @@
     """Tests that _HandleFail copies necessary files for repro.
 
     In case of a test failure, _HandleFail should copy necessary files to a
-    debug location, and delete existing GCS/GCE resources. And because of the
-    latter, a final call to Cleanup will not make an additional attempt to
-    delete those resources.
+    debug location.
     """
     worker = GCEAUWorker(self.options, self.test_results_root,
                          project=self.PROJECT, zone=self.ZONE,
@@ -305,20 +303,6 @@
     self.PatchObject(worker, '_RunTest', autospec=True,
                      return_value=(0, None, None))
 
-    # Fake resource existance.
-    remote_instances = ('fake-remote-instance',)
-    self.PatchObject(
-        worker.gce_context, 'InstanceExists', autospec=True,
-        side_effect=lambda instance: instance in remote_instances)
-
-    # Fake resource deletors.
-    self.PatchObject(worker.gce_context, 'DeleteImage', autospec=True)
-    # Make DeleteInstance delete the remote resource.
-    def _OverrideDeleteInstance(instance):
-      remote_instances.remove(instance)
-    self.PatchObject(worker.gce_context, 'DeleteInstance', autospec=True,
-                     side_effect=_OverrideDeleteInstance)
-
     # VerifyImage should fail and _HandleFail should be called.
     worker.VerifyImage(None)
 
@@ -328,15 +312,5 @@
         self.test_results_failed,
         os.path.basename(self.options.ssh_private_key)))
 
-    # CleanUp will not attempt to delete resources because they should have been
-    # deleted by _HandleFail.
-    worker.instances = dict(smoke='fake-instance')
-    worker.CleanUp()
-
-    # Assert that one and only one attempt was made to delete existing
-    # instances.
-    worker.gce_context.DeleteInstance.assert_called_once_with(mock.ANY)
-
-
 if __name__ == '__main__':
   unittest.main()
diff --git a/ctest/ctest.py b/ctest/ctest.py
index 7cc0675..b15f016 100755
--- a/ctest/ctest.py
+++ b/ctest/ctest.py
@@ -205,8 +205,9 @@
     if not quick_update and self.sign_payloads:
       cmd.append('--payload_signing_key=%s' % self.payload_signing_key)
 
+    # Give the test process 10 minutes to clean up before sending SIGKILL.
     res = cros_build_lib.RunCommand(cmd, cwd=self.crosutils_root,
-                                    error_code_ok=True)
+                                    error_code_ok=True, kill_timeout=10 * 60)
     if res.returncode != 0:
       raise TestException('%s exited with code %d: %s' % (' '.join(res.cmd),
                                                           res.returncode,