gce_au_worker: Release allocated resources before terminating
Recent GCP resource leaks(e.g., chromium:579811) show that when
cbuildbot.stages.test_stages.GCETestStage times out, the subprocess that calls
out to ctest and thus this module is aborted, without getting a chance to clean
things up, resulting in leaked GCP resources and various quotas being hit.
This CL patches gce_au_worker to intercept SIGINT and SIGTERM and release
allocated resources before terminating the test process. It also updates `ctest`
to wait for a certain amount of time (10 minutes) before sending SIGKILL to the
test subprocess. Similarly, the cbuildbot code has been updated to suspend the
termination of the `ctest` subprocess.
BUG=b:26489739
TEST=gce_au_worker_unittest, manual ctest run passes, and ran a trybot against
lakitu-pre-cq
Change-Id: I1a174e7ce16f7f52c2cd1b80e1f936d83b2ac966
Reviewed-on: https://chromium-review.googlesource.com/323867
Commit-Ready: Daniel Wang <wonderfly@google.com>
Tested-by: Daniel Wang <wonderfly@google.com>
Reviewed-by: Simran Basi <sbasi@chromium.org>
Reviewed-by: Don Garrett <dgarrett@chromium.org>
diff --git a/au_test_harness/au_test.py b/au_test_harness/au_test.py
index c0f0de1..d78bff2 100644
--- a/au_test_harness/au_test.py
+++ b/au_test_harness/au_test.py
@@ -8,12 +8,17 @@
import os
import re
+import signal
+import sys
import time
import unittest
import urllib
+from functools import partial
+
from chromite.lib import cros_logging as logging
from chromite.lib import dev_server_wrapper
+from chromite.lib import signals
from crostestutils.au_test_harness import cros_test_proxy
from crostestutils.au_test_harness import gce_au_worker
from crostestutils.au_test_harness import real_au_worker
@@ -29,6 +34,18 @@
See documentation for au_worker for more information.
"""
+ def __init__(self, *_args, **_kwargs):
+ super(AUTest, self).__init__(*_args, **_kwargs)
+
+ # Original signal handlers.
+ self._old_sigint = None
+ self._old_sigterm = None
+
+ # Verify that the signals modules is actually usable, and won't segfault
+ # upon invocation of getsignal. See signals.SignalModuleUsable for the
+ # details and upstream python bug.
+ self._use_signals = signals.SignalModuleUsable()
+
@classmethod
def ProcessOptions(cls, options):
"""Processes options for the test suite and sets up the worker class.
@@ -83,13 +100,20 @@
proxy.shutdown()
# --- UNITTEST SPECIFIC METHODS ---
-
def setUp(self):
"""Overrides unittest.TestCase.setUp and called before every test.
Sets instance specific variables and initializes worker.
"""
super(AUTest, self).setUp()
+ # Install custom signal handlers that call worker.CleanUp on the receipt of
+ # SIGINT and SIGTERM. This is particularly useful in cases where the worker
+ # allocates resources in prepare stage, but is taking too long at test stage
+ # and the caller decides to terminate it (by sending a SIGTERM). The default
+ # action of SIGTERM is terminate, which leaves expensive resources leaked
+ # and/or local environment tainted.
+ if self._use_signals:
+ self._InstallHandlers()
self.worker = self.worker_class(self.options, AUTest.test_results_root)
self.download_folder = os.path.join(os.path.realpath(os.path.curdir),
'latest_download')
@@ -97,6 +121,9 @@
def tearDown(self):
"""Overrides unittest.TestCase.tearDown and called after every test."""
self.worker.CleanUp()
+ # Restore signal handlers.
+ if self._use_signals:
+ self._RestoreHandlers()
def testUpdateKeepStateful(self):
"""Tests if we can update normally.
@@ -283,3 +310,47 @@
# This update is expected to fail...
expected_msg = 'zlib inflate() error:-3'
self.AttemptUpdateWithPayloadExpectedFailure(payload, expected_msg)
+
+ # --- PRIVATE HELPER FUNCTIONS ---
+ def _InstallHandlers(self):
+ """Installs signal handlers for SIGINT and SIGTERM."""
+ self._old_sigint = signal.getsignal(signal.SIGINT)
+ self._old_sigterm = signal.getsignal(signal.SIGTERM)
+ signal.signal(signal.SIGINT, partial(self._SigintAndSigtermHandler,
+ self._old_sigint))
+ signal.signal(signal.SIGTERM, partial(self._SigintAndSigtermHandler,
+ self._old_sigterm))
+
+ def _RestoreHandlers(self):
+ """Restores signal handlers for SIGINT and SIGTERM."""
+ signal.signal(signal.SIGINT, self._old_sigint)
+ signal.signal(signal.SIGTERM, self._old_sigterm)
+
+ def _SigintAndSigtermHandler(self, original_handler, signum, frame):
+ """Common signal handler for SIGINT and SIGTERM.
+
+ It tries to clean up allocated resources, and relays the signal to the
+ original handler.
+
+ Args:
+ original_handler: The original signal handler.
+ signum: The number of the signal to handle.
+ frame: Current stack frame. See signal.signal for details on |signum| and
+ |frame|.
+ """
+ logging.warning('Received signal %d' % signum)
+ if signum:
+ # If we've been invoked because of a signal, ignore delivery of that
+ # signal from this point forward. The invoking context of this method
+ # restores signal delivery to what it was prior; we suppress future
+ # delivery till then since this code handles SIGINT/SIGTERM fully
+ # including delivering the signal to the original handler on the way out.
+ #
+ # Mask both SIGINT and SIGTERM so that the cleanup won't be interrupted.
+ # They will be turned back on once cleanup finishes.
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ self.worker.CleanUp()
+ if not signals.RelaySignal(original_handler, signum, frame):
+ logging.warning('Failed to relay signal %d to original handler.' % signum)
+ sys.exit('Received signal %d.' % signum)
diff --git a/au_test_harness/au_test_test.py b/au_test_harness/au_test_test.py
new file mode 100755
index 0000000..a42448b
--- /dev/null
+++ b/au_test_harness/au_test_test.py
@@ -0,0 +1,71 @@
+#!/usr/bin/python2
+#
+# Copyright 2016 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tests for au_test."""
+
+from __future__ import print_function
+
+import os
+import signal
+import sys
+import time
+import unittest
+import mock
+
+from multiprocessing import Process, Value
+
+import constants
+sys.path.append(constants.CROS_PLATFORM_ROOT)
+sys.path.append(constants.SOURCE_ROOT)
+
+from crostestutils.au_test_harness.au_test import AUTest
+
+
+class AuTestTest(unittest.TestCase):
+ """Test for autest.AUTest."""
+
+ def testSigtermAndSigintHandled(self):
+ """Tests that worker.CleanUp is always called."""
+ def _ChildProcess(shared_integer):
+ class _FakeWorker(object):
+ """A fake worker class to use as a mock."""
+ def __init__(self, options, test_results_root):
+ pass
+
+ def CleanUp(self):
+ shared_integer.value = 1
+
+ with mock.patch.object(AUTest, 'SimpleTestVerify', autospec=True,
+ side_effect=lambda _: time.sleep(60)):
+ AUTest.worker_class = _FakeWorker
+ AUTest.options = None
+ AUTest.test_results_root = None
+ test_suite = unittest.TestSuite()
+ test_suite.addTest(AUTest('SimpleTestVerify'))
+ test_runner = unittest.TextTestRunner()
+ # This is going to block for 60 seconds.
+ test_runner.run(test_suite)
+
+ for signum in (signal.SIGINT, signal.SIGTERM):
+ # Start a child process to run VerifyImage.
+ shared_integer = Value('i', 0)
+ child = Process(target=_ChildProcess, args=(shared_integer,))
+ child.start()
+
+ # Wait till it gets into SimpleTestVerify.
+ time.sleep(2)
+ # Send signal to the child process - test should abort, and if signal is
+ # not caught and handled, the child will terminate without calling
+ # worker.CleanUp.
+ os.kill(child.pid, signum)
+ child.join()
+ # Assert that worker.CleanUp is called.
+ self.assertEqual(1, shared_integer.value)
+
+if __name__ == '__main__':
+ suite = unittest.TestSuite()
+ suite.addTest(AuTestTest('testSigtermAndSigintHandled'))
+ unittest.TextTestRunner().run(suite)
diff --git a/au_test_harness/gce_au_worker.py b/au_test_harness/gce_au_worker.py
index cf25b66..0c8bbde 100644
--- a/au_test_harness/gce_au_worker.py
+++ b/au_test_harness/gce_au_worker.py
@@ -81,7 +81,6 @@
from chromite.lib import portage_util
from crostestutils.au_test_harness import au_worker
from crostestutils.au_test_harness import constants
-from crostestutils.au_test_harness import update_exception
class GCEAUWorker(au_worker.AUWorker):
@@ -98,10 +97,7 @@
image: A single GCE image associated with a worker.
image_link: The URL to the image created.
instances: GCE VM instances associated with a worker.
- bg_delete_processes:
- Background processes that delete stale instances and images.
"""
-
_GS_PATH_COMMON_PREFIX = 'gs://'
_GS_URL_COMMON_PREFIX = 'https://storage.googleapis.com/'
_IMAGE_PREFIX = 'test-image-'
@@ -132,7 +128,7 @@
self.instances = {}
# Background processes that delete throw-away instances.
- self.bg_delete_processes = []
+ self._bg_delete_processes = []
# Load test specifications from <overlay>/scripts/gce_tests.json, if any.
self._LoadTests()
@@ -161,11 +157,16 @@
# Delete existing resources in the background if any.
bg_delete = Process(target=self._DeleteExistingResources)
bg_delete.start()
- self.bg_delete_processes.append(bg_delete)
+ self._bg_delete_processes.append(bg_delete)
- # Creates an image and instances.
- self._CreateImage(image_path)
- self._CreateInstances()
+ log_directory, fail_directory = self.GetNextResultsPath('update')
+ # Create an image and instances.
+ try:
+ self._CreateImage(image_path)
+ self._CreateInstances()
+ except:
+ self._HandleFail(log_directory, fail_directory)
+ raise
def VerifyImage(self, unittest, percent_required_to_pass=100, test=''):
"""Verifies the image by running all the required tests.
@@ -185,14 +186,17 @@
"""
log_directory_base, fail_directory_base = self.GetNextResultsPath(
'autotest_tests')
-
steps = []
for test in self.tests:
remote = self.gce_context.GetInstanceIP(self.instances[test['name']])
# Prefer partial to lambda because of Python's late binding.
steps.append(partial(self._RunTest, test['name'], remote,
log_directory_base, fail_directory_base))
- return_values = parallel.RunParallelSteps(steps, return_values=True)
+ try:
+ return_values = parallel.RunParallelSteps(steps, return_values=True)
+ except:
+ self._HandleFail(log_directory_base, fail_directory_base)
+ raise
passed = True
test_reports = {}
@@ -210,7 +214,6 @@
return passed
# --- PRIVATE HELPER FUNCTIONS ---
-
def _RunTest(self, test, remote, log_directory_base, fail_directory_base):
"""Runs a test or a suite of tests on a given remote.
@@ -357,51 +360,40 @@
custom_tests = portage_util.ReadOverlayFile(
'scripts/gce_tests.json', board=self.board)
except portage_util.MissingOverlayException as e:
- logging.warn('Board overlay not found. Error: %r', e)
+ logging.warning('Board overlay not found. Error: %s', e)
if custom_tests is not None:
if self.board not in constants.TRUSTED_BOARDS:
- logging.warn('Custom tests and flags are not allowed for this board '
- '(%s)!', self.board)
+ logging.warning('Custom tests and flags are not allowed for this board '
+ '(%s)!', self.board)
else:
# Read the list of tests.
try:
json_file = json.loads(custom_tests)
tests = json_file.get('tests')
except ValueError as e:
- logging.warn('scripts/gce_tests.json contains invalid JSON content. '
- 'Default tests will be run and default flags will be '
- 'used to create instances. Error: %r', e)
+ logging.warning('scripts/gce_tests.json contains invalid JSON '
+ 'content. Default tests will be run and default '
+ 'flags will be used to create instances. Error: %s',
+ e)
self.tests = tests
def _CreateImage(self, image_path):
"""Uploads the gce tarball and creates an image with it."""
- log_directory, fail_directory = self.GetNextResultsPath('update')
-
ts = datetime.datetime.fromtimestamp(time.time()).strftime(
'%Y-%m-%d-%H-%M-%S')
# Upload the GCE tarball to Google Cloud Storage.
self.tarball_local = image_path
gs_directory = ('gs://%s/%s' % (self.gcs_bucket, ts))
- try:
- self.gscontext.CopyInto(self.tarball_local, gs_directory)
- self.tarball_remote = '%s/%s' % (gs_directory,
- os.path.basename(self.tarball_local))
- except Exception as e:
- raise update_exception.UpdateException(
- 1, 'Update failed. Unable to upload test image GCE tarball to GCS. '
- 'Error: %s' % e)
+ self.tarball_remote = '%s/%s' % (gs_directory,
+ os.path.basename(self.tarball_local))
+ self.gscontext.CopyInto(self.tarball_local, gs_directory)
- # Create an image from |image_path| and an instance from the image.
- image = '%s%s' % (self._IMAGE_PREFIX, ts)
- try:
- self.image_link = self.gce_context.CreateImage(
- image, self._GsPathToUrl(self.tarball_remote))
- self.image = image
- except gce.Error as e:
- self._HandleFail(log_directory, fail_directory)
- raise update_exception.UpdateException(1, 'Update failed. Error: %r' % e)
+ # Create an image from |image_path|.
+ self.image = self._IMAGE_PREFIX + ts
+ self.image_link = self.gce_context.CreateImage(
+ self.image, self._GsPathToUrl(self.tarball_remote))
def _CreateInstance(self, name, image, **kwargs):
"""Creates a single VM instance with a static IP address."""
@@ -424,64 +416,51 @@
self.instances[test['name']] = instance
parallel.RunParallelSteps(steps)
- def _DeleteExistingResouce(self, resource, existence_checker, deletor):
- """Deletes a resource if it exists.
-
- This method checks the existence of a resource using |existence_checker|,
- and deletes it on true.
-
- Args:
- resource: The resource name/url to delete.
- existence_checker: A callable to check existence. This callable should
- take |resource| as its first argument.
- deletor: A callable to perform the deletion. This callable should take
- |resource| as its first argument.
-
- Raises:
- ValueError if existence_checker or deletor is not callable.
- """
- if not hasattr(existence_checker, '__call__'):
- raise ValueError('existence_checker must be a callable')
- if not hasattr(deletor, '__call__'):
- raise ValueError('deletor must be a callable')
-
- if existence_checker(resource):
- deletor(resource)
-
- def _DeleteInstance(self, name):
- """Deletes a VM instance and its IP address."""
- self.gce_context.DeleteInstance(name)
- self.gce_context.DeleteAddress(name)
-
def _DeleteExistingResources(self):
- """Deletes instances, image and the tarball on GCS if they exist."""
- steps = []
+ """Deletes all allocated GCP resources."""
+ # There are cases where resources are created at the backend but the
+ # resource creation calls fail, for example due to network errors that
+ # happen when the response is being delivered. So we always make sure to
+ # delete all allocated resources (images, instances, addresses) regardless
+ # of whether the corresponding Create operation succeeded.
- if self.tarball_remote:
- steps.append(partial(self.gscontext.DoCommand,
- ['rm', self.tarball_remote]))
- if self.image:
- steps.append(partial(self.gce_context.DeleteImage, self.image))
+ # Delete the GCE instances.
+ steps = [partial(self.gce_context.DeleteInstance, instance) for instance in
+ self.instances.values()]
- for instance in self.instances.values():
- steps.append(partial(
- self._DeleteExistingResouce,
- resource=instance,
- existence_checker=self.gce_context.InstanceExists,
- deletor=self._DeleteInstance))
+ def _RunParallelIgnoreErrors(funcs):
+ try:
+ parallel.RunParallelSteps(funcs)
+ except parallel.BackgroundFailure as e:
+ # We don't want to halt the test stage (and thus block commits) for
+ # cleanup errors. Leaked resources will be cleaned up externally.
+ logging.warning(
+ 'Ignoring BackgroundFailure while deleting resources: %s', e)
+ _RunParallelIgnoreErrors(steps)
- # Delete all resources in parallel.
- try:
- parallel.RunParallelSteps(steps)
- except Exception as e:
- logging.warn('Infrastructure failure. Error: %r' % e)
+ # Delete static IP addresses.
+ steps = [partial(self.gce_context.DeleteAddress, instance) for instance in
+ self.instances.values()]
+ _RunParallelIgnoreErrors(steps)
- # Reset variables.
- self.tarball_remote = None
- self.image = None
- self.image_link = None
self.instances = {}
+ # Delete the GCE image.
+ # Have to delete the image after all instances are deleted because if the
+ # image is being used to create an instance (e.g., current process is asked
+ # to terminate during instance creation), it cannot be deleted until the
+ # instance creation ends.
+ if self.image:
+ self.gce_context.DeleteImage(self.image)
+ self.image = self.image_link = None
+
+ # Delete the tarball uploaded GCS.
+ # For the same reason, it's safer to delete the tarball after the image is
+ # deleted.
+ if self.tarball_remote:
+ self.gscontext.DoCommand(['rm', self.tarball_remote])
+ self.tarball_remote = None
+
def _HandleFail(self, log_directory, fail_directory):
"""Handles test failures.
@@ -497,22 +476,17 @@
if not os.path.isdir(parent_dir):
os.makedirs(parent_dir)
- # Copy logs. Must be done before moving image, as this creates
- # |fail_directory|.
try:
+ # Copy logs. Must be done before moving image, as this creates
+ # |fail_directory|.
shutil.copytree(log_directory, fail_directory)
- except shutil.Error as e:
- logging.warning('Ignoring errors while copying logs: %s', e)
- # Copy GCE tarball and ssh private key for debugging.
- try:
+ # Copy GCE tarball and ssh private key for debugging.
shutil.copy(self.tarball_local, fail_directory)
if self.ssh_private_key is not None:
shutil.copy(self.ssh_private_key, fail_directory)
- except shutil.Error as e:
- logging.warning('Ignoring errors while copying GCE tarball: %s', e)
-
- self._DeleteExistingResources()
+ except (shutil.Error, OSError, IOError) as e:
+ logging.warning('Ignoring error while copying logs: %s', e)
def _GsPathToUrl(self, gs_path):
"""Converts a gs:// path to a URL.
@@ -535,6 +509,6 @@
def _WaitForBackgroundDeleteProcesses(self):
"""Waits for all background proecesses to finish."""
- for p in self.bg_delete_processes:
+ for p in self._bg_delete_processes:
p.join()
- self.bg_delete_processes = []
+ self._bg_delete_processes = []
diff --git a/au_test_harness/gce_au_worker_unittest.py b/au_test_harness/gce_au_worker_unittest.py
index 20ba25b..be129a0 100755
--- a/au_test_harness/gce_au_worker_unittest.py
+++ b/au_test_harness/gce_au_worker_unittest.py
@@ -279,9 +279,7 @@
"""Tests that _HandleFail copies necessary files for repro.
In case of a test failure, _HandleFail should copy necessary files to a
- debug location, and delete existing GCS/GCE resources. And because of the
- latter, a final call to Cleanup will not make an additional attempt to
- delete those resources.
+ debug location.
"""
worker = GCEAUWorker(self.options, self.test_results_root,
project=self.PROJECT, zone=self.ZONE,
@@ -305,20 +303,6 @@
self.PatchObject(worker, '_RunTest', autospec=True,
return_value=(0, None, None))
- # Fake resource existance.
- remote_instances = ('fake-remote-instance',)
- self.PatchObject(
- worker.gce_context, 'InstanceExists', autospec=True,
- side_effect=lambda instance: instance in remote_instances)
-
- # Fake resource deletors.
- self.PatchObject(worker.gce_context, 'DeleteImage', autospec=True)
- # Make DeleteInstance delete the remote resource.
- def _OverrideDeleteInstance(instance):
- remote_instances.remove(instance)
- self.PatchObject(worker.gce_context, 'DeleteInstance', autospec=True,
- side_effect=_OverrideDeleteInstance)
-
# VerifyImage should fail and _HandleFail should be called.
worker.VerifyImage(None)
@@ -328,15 +312,5 @@
self.test_results_failed,
os.path.basename(self.options.ssh_private_key)))
- # CleanUp will not attempt to delete resources because they should have been
- # deleted by _HandleFail.
- worker.instances = dict(smoke='fake-instance')
- worker.CleanUp()
-
- # Assert that one and only one attempt was made to delete existing
- # instances.
- worker.gce_context.DeleteInstance.assert_called_once_with(mock.ANY)
-
-
if __name__ == '__main__':
unittest.main()
diff --git a/ctest/ctest.py b/ctest/ctest.py
index 7cc0675..b15f016 100755
--- a/ctest/ctest.py
+++ b/ctest/ctest.py
@@ -205,8 +205,9 @@
if not quick_update and self.sign_payloads:
cmd.append('--payload_signing_key=%s' % self.payload_signing_key)
+ # Give tests 10 minutes to clean up before shut down.
res = cros_build_lib.RunCommand(cmd, cwd=self.crosutils_root,
- error_code_ok=True)
+ error_code_ok=True, kill_timeout=10 * 60)
if res.returncode != 0:
raise TestException('%s exited with code %d: %s' % (' '.join(res.cmd),
res.returncode,