| # -*- coding: utf-8 -*- |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unittests for parallel library.""" |
| |
| from __future__ import print_function |
| |
| import contextlib |
| import multiprocessing |
| import numbers |
| import os |
| import signal |
| import sys |
| import tempfile |
| import threading |
| import time |
| import unittest |
| |
| import mock |
| from six.moves import cPickle as pickle |
| from six.moves import queue as Queue |
| |
| from chromite.lib import cros_logging as logging |
| from chromite.lib import cros_test_lib |
| from chromite.lib import osutils |
| from chromite.lib import parallel |
| from chromite.lib import partial_mock |
| from chromite.lib import timeout_util |
| |
| |
| # pylint: disable=protected-access |
| |
| |
| _BUFSIZE = 10 ** 4 |
| _EXIT_TIMEOUT = 30 |
| _NUM_WRITES = 100 |
| _NUM_THREADS = 50 |
| _TOTAL_BYTES = _NUM_THREADS * _NUM_WRITES * _BUFSIZE |
| _GREETING = 'hello world' |
| # Include x80 in the binary greeting since that can't be decoded |
| # as utf-8 (see https://docs.python.org/3/howto/unicode.html). |
| # We use _BINARY_GREETING_START to begin writing the message without |
| # a continuation, then use _BINARY_GREETING have the full message with |
| # the continuation character. |
| _BINARY_GREETING_START = b'hello' |
| _BINARY_GREETING = b'hello world\x80' |
| _ESCAPED_GREETING = u'hello world\\x80' |
| _SKIP_FLAKY_TESTS = True |
| |
| |
| class FakeMultiprocessManager(object): |
| """A fake implementation of the multiprocess manager. |
| |
| This is only intended for use with ParallelMock. |
| """ |
| |
| def __enter__(self, *args, **kwargs): |
| return self |
| |
| def __exit__(self, *args, **kwargs): |
| return None |
| |
| def Queue(self): |
| return Queue.Queue() |
| |
| def RLock(self): |
| return threading.RLock() |
| |
| def dict(self, *args, **kwargs): |
| return dict(*args, **kwargs) |
| |
| def list(self, *args, **kwargs): |
| return list(*args, **kwargs) |
| |
| |
| class ParallelMock(partial_mock.PartialMock): |
| """Run parallel steps in sequence for testing purposes. |
| |
| This class updates chromite.lib.parallel to just run processes in |
| sequence instead of running them in parallel. This is useful for |
| testing. |
| """ |
| |
| TARGET = 'chromite.lib.parallel._BackgroundTask' |
| ATTRS = ('ParallelTasks', 'TaskRunner') |
| |
| def PreStart(self): |
| self.PatchObject(parallel, 'Manager', side_effect=FakeMultiprocessManager) |
| partial_mock.PartialMock.PreStart(self) |
| |
| @contextlib.contextmanager |
| def ParallelTasks(self, steps, max_parallel=None, halt_on_error=False): |
| assert max_parallel is None or isinstance(max_parallel, numbers.Integral) |
| assert isinstance(halt_on_error, bool) |
| try: |
| yield |
| finally: |
| for step in steps: |
| step() |
| |
| def TaskRunner(self, queue, task, onexit=None, task_args=None, |
| task_kwargs=None): |
| # Setup of these matches the original code. |
| if task_args is None: |
| task_args = [] |
| elif not isinstance(task_args, list): |
| task_args = list(task_args) |
| if task_kwargs is None: |
| task_kwargs = {} |
| |
| try: |
| while True: |
| # Wait for a new item to show up on the queue. This is a blocking wait, |
| # so if there's nothing to do, we just sit here. |
| x = queue.get() |
| if isinstance(x, parallel._AllTasksComplete): |
| # All tasks are complete, so we should exit. |
| break |
| x = task_args + list(x) |
| task(*x, **task_kwargs) |
| finally: |
| if onexit: |
| onexit() |
| |
| |
| class BackgroundTaskVerifier(partial_mock.PartialMock): |
| """Verify that queues are empty after BackgroundTaskRunner runs. |
| |
| BackgroundTaskRunner should always empty its input queues, even if an |
| exception occurs. This is important for preventing a deadlock in the case |
| where a thread fails partway through (e.g. user presses Ctrl-C before all |
| input can be processed). |
| """ |
| |
| TARGET = 'chromite.lib.parallel' |
| ATTRS = ('BackgroundTaskRunner',) |
| |
| @contextlib.contextmanager |
| def BackgroundTaskRunner(self, task, *args, **kwargs): |
| queue = kwargs.setdefault('queue', multiprocessing.Queue()) |
| args = [task] + list(args) |
| try: |
| with self.backup['BackgroundTaskRunner'](*args, **kwargs): |
| yield queue |
| finally: |
| try: |
| queue.get(False) |
| except Queue.Empty: |
| pass |
| else: |
| raise AssertionError('Expected empty queue after BackgroundTaskRunner') |
| |
| |
| class TestManager(cros_test_lib.TestCase): |
| """Test parallel.Manager().""" |
| |
| def testSigint(self): |
| """Tests that parallel.Manager() ignores SIGINT.""" |
| with parallel.Manager() as manager: |
| queue = manager.Queue() |
| os.kill(manager._process.pid, signal.SIGINT) |
| with self.assertRaises(Queue.Empty): |
| queue.get(block=False) |
| |
| def testSigterm(self): |
| """Tests that parallel.Manager() ignores SIGTERM.""" |
| with parallel.Manager() as manager: |
| queue = manager.Queue() |
| os.kill(manager._process.pid, signal.SIGTERM) |
| with self.assertRaises(Queue.Empty): |
| queue.get(block=False) |
| |
| |
| class TestBackgroundWrapper(cros_test_lib.TestCase): |
| """Unittests for background wrapper.""" |
| |
| def tearDown(self): |
| # Wait for children to exit. |
| try: |
| timeout_util.WaitForReturnValue([[]], multiprocessing.active_children, |
| timeout=_EXIT_TIMEOUT) |
| except timeout_util.TimeoutError: |
| pass |
| |
| # Complain if there are any children left over. |
| active_children = multiprocessing.active_children() |
| for child in active_children: |
| if hasattr(child, 'Kill'): |
| child.Kill(signal.SIGKILL, log_level=logging.WARNING) |
| child.join() |
| self.assertEqual(multiprocessing.active_children(), []) |
| self.assertEqual(active_children, []) |
| |
| def wrapOutputTest(self, func): |
| # Set _PRINT_INTERVAL to a smaller number to make it easier to |
| # reproduce bugs. |
| with mock.patch.multiple(parallel._BackgroundTask, PRINT_INTERVAL=0.01): |
| with tempfile.NamedTemporaryFile() as temp: |
| with open(temp.name, 'w') as output: |
| with mock.patch.multiple(sys, stdout=output): |
| func() |
| return osutils.ReadFile(temp.name) |
| |
| |
| class TestUnicodeContinuation(TestBackgroundWrapper): |
| """Test handling unicode continuation output from background environments.""" |
| |
| def setUp(self): |
| self.printed_message = multiprocessing.Event() |
| |
| def _WriteMessageToStdout(self): |
| """Write binary message to stdout.""" |
| if sys.version_info > (3,0): |
| # Python3 has sys.stdout.buffer to handle binary output. |
| sys.stdout.buffer.write(_BINARY_GREETING_START) |
| else: |
| # Python2 does not have sys.stdout.buffer, but sys.stdout.write can handle |
| # binary strings. |
| sys.stdout.write(_BINARY_GREETING_START) |
| sys.stdout.flush() |
| sys.stdout.seek(0) |
| self.printed_message.set() |
| |
| # Wait for the parent process to read the output. Once the output |
| # has been read, try writing the full message again, including the |
| # continuation character, to be sure that rewritten output is not read |
| # twice and that the continuation(0x80) is handled. |
| time.sleep(parallel._BackgroundTask.PRINT_INTERVAL * 10) |
| # Like above, handle writing binary in Python2 or Python3. |
| if sys.version_info > (3,0): |
| sys.stdout.buffer.write(_BINARY_GREETING) |
| else: |
| sys.stdout.write(_BINARY_GREETING) |
| sys.stdout.flush() |
| |
| def _ParallelWriteMessage(self): |
| """Write binary message to stdout using multiple processes.""" |
| with parallel.Manager() as manager: |
| queue = manager.Queue() |
| with parallel.BackgroundTaskRunner( |
| self._WriteMessageToStdout, queue=queue): |
| queue.put([]) |
| self.printed_message.wait() |
| |
| def testParallelHelloWorld(self): |
| """Test that output is not written multiple times when seeking.""" |
| out = self.wrapOutputTest(self._ParallelWriteMessage) |
| # Verify that the _BINARY_GREETING gets read (because it's decoded) |
| # as _ESCAPED_GREETING. |
| self.assertEqual(out, _ESCAPED_GREETING) |
| |
| |
| class BackgroundHelloWorldTests(TestBackgroundWrapper): |
| """Test HelloWorld output in various background environments.""" |
| |
| def setUp(self): |
| self.printed_hello = multiprocessing.Event() |
| |
| def _HelloWorld(self): |
| """Write 'hello world' to stdout.""" |
| sys.stdout.write('hello') |
| sys.stdout.flush() |
| sys.stdout.seek(0) |
| self.printed_hello.set() |
| |
| # Wait for the parent process to read the output. Once the output |
| # has been read, try writing 'hello world' again, to be sure that |
| # rewritten output is not read twice. |
| time.sleep(parallel._BackgroundTask.PRINT_INTERVAL * 10) |
| sys.stdout.write(_GREETING) |
| sys.stdout.flush() |
| |
| def _ParallelHelloWorld(self): |
| """Write 'hello world' to stdout using multiple processes.""" |
| with parallel.Manager() as manager: |
| queue = manager.Queue() |
| with parallel.BackgroundTaskRunner(self._HelloWorld, queue=queue): |
| queue.put([]) |
| self.printed_hello.wait() |
| |
| def VerifyDefaultQueue(self): |
| """Verify that BackgroundTaskRunner will create a queue on it's own.""" |
| with parallel.BackgroundTaskRunner(self._HelloWorld) as queue: |
| queue.put([]) |
| self.printed_hello.wait() |
| |
| def testParallelHelloWorld(self): |
| """Test that output is not written multiple times when seeking.""" |
| out = self.wrapOutputTest(self._ParallelHelloWorld) |
| self.assertEqual(out, _GREETING) |
| |
| def testMultipleHelloWorlds(self): |
| """Test that multiple threads can be created.""" |
| parallel.RunParallelSteps([self.testParallelHelloWorld] * 2) |
| |
| def testLongTempDirectory(self): |
| """Test that we can handle a long temporary directory.""" |
| with osutils.TempDir() as tempdir: |
| new_tempdir = os.path.join(tempdir, 'xxx/' * 100) |
| osutils.SafeMakedirs(new_tempdir) |
| old_tempdir, old_tempdir_env = osutils.SetGlobalTempDir(new_tempdir) |
| try: |
| self.testParallelHelloWorld() |
| finally: |
| osutils.SetGlobalTempDir(old_tempdir, old_tempdir_env) |
| |
| |
| def _BackgroundTaskRunnerArgs(results, arg1, arg2, kwarg1=None, kwarg2=None): |
| """Helper for TestBackgroundTaskRunnerArgs |
| |
| We specifically want a module function to test against and not a class member. |
| """ |
| results.put((arg1, arg2, kwarg1, kwarg2)) |
| |
| |
| class TestBackgroundTaskRunnerArgs(TestBackgroundWrapper): |
| """Unittests for BackgroundTaskRunner argument handling.""" |
| |
| def testArgs(self): |
| """Test that we can pass args down to the task.""" |
| with parallel.Manager() as manager: |
| results = manager.Queue() |
| arg2s = set((1, 2, 3)) |
| with parallel.BackgroundTaskRunner(_BackgroundTaskRunnerArgs, results, |
| 'arg1', kwarg1='kwarg1') as queue: |
| for arg2 in arg2s: |
| queue.put((arg2,)) |
| |
| # Since the queue is unordered, need to handle arg2 specially. |
| result_arg2s = set() |
| for _ in range(3): |
| result = results.get() |
| self.assertEqual(result[0], 'arg1') |
| result_arg2s.add(result[1]) |
| self.assertEqual(result[2], 'kwarg1') |
| self.assertEqual(result[3], None) |
| self.assertEqual(arg2s, result_arg2s) |
| self.assertEqual(results.empty(), True) |
| |
| |
| class TestFastPrinting(TestBackgroundWrapper): |
| """Stress tests for background sys.stdout handling.""" |
| |
| def _FastPrinter(self): |
| # Writing lots of output quickly often reproduces bugs in this module |
| # because it can trigger race conditions. |
| for _ in range(_NUM_WRITES - 1): |
| sys.stdout.write('x' * _BUFSIZE) |
| sys.stderr.write('x' * (_BUFSIZE - 1) + '\n') |
| |
| def _ParallelPrinter(self): |
| parallel.RunParallelSteps([self._FastPrinter] * _NUM_THREADS) |
| |
| def _NestedParallelPrinter(self): |
| parallel.RunParallelSteps([self._ParallelPrinter]) |
| |
| def testSimpleParallelPrinter(self): |
| out = self.wrapOutputTest(self._ParallelPrinter) |
| self.assertEqual(len(out), _TOTAL_BYTES) |
| |
| def testNestedParallelPrinter(self): |
| """Verify that no output is lost when lots of output is written.""" |
| out = self.wrapOutputTest(self._NestedParallelPrinter) |
| self.assertEqual(len(out), _TOTAL_BYTES) |
| |
| |
| class TestRunParallelSteps(cros_test_lib.TestCase): |
| """Tests for RunParallelSteps.""" |
| |
| def testReturnValues(self): |
| """Test that we pass return values through when requested.""" |
| def f1(): |
| return 1 |
| def f2(): |
| return 2 |
| def f3(): |
| pass |
| |
| return_values = parallel.RunParallelSteps([f1, f2, f3], return_values=True) |
| self.assertEqual(return_values, [1, 2, None]) |
| |
| def testLargeReturnValues(self): |
| """Test that the managed queue prevents hanging on large return values.""" |
| def f1(): |
| return ret_value |
| |
| ret_value = '' |
| for _ in range(10000): |
| ret_value += 'This will be repeated many times.\n' |
| |
| return_values = parallel.RunParallelSteps([f1], return_values=True) |
| self.assertEqual(return_values, [ret_value]) |
| |
| |
| class TestParallelMock(TestBackgroundWrapper): |
| """Test the ParallelMock class.""" |
| |
| def setUp(self): |
| self._calls = 0 |
| |
| def _Callback(self): |
| self._calls += 1 |
| return self._calls |
| |
| def testRunParallelSteps(self): |
| """Make sure RunParallelSteps is mocked out.""" |
| with ParallelMock(): |
| parallel.RunParallelSteps([self._Callback]) |
| self.assertEqual(1, self._calls) |
| |
| def testBackgroundTaskRunner(self): |
| """Make sure BackgroundTaskRunner is mocked out.""" |
| with ParallelMock(): |
| parallel.RunTasksInProcessPool(self._Callback, []) |
| self.assertEqual(0, self._calls) |
| result = parallel.RunTasksInProcessPool(self._Callback, [[]]) |
| self.assertEqual(1, self._calls) |
| self.assertEqual([1], result) |
| result = parallel.RunTasksInProcessPool(self._Callback, [], processes=9, |
| onexit=self._Callback) |
| self.assertEqual(10, self._calls) |
| self.assertEqual([], result) |
| result = parallel.RunTasksInProcessPool(self._Callback, [[]] * 10) |
| self.assertEqual(list(range(11, 21)), result) |
| |
| |
| class TestExceptions(cros_test_lib.MockOutputTestCase): |
| """Test cases where child processes raise exceptions.""" |
| |
| def _SystemExit(self): |
| sys.stdout.write(_GREETING) |
| sys.exit(1) |
| |
| def _KeyboardInterrupt(self): |
| sys.stdout.write(_GREETING) |
| raise KeyboardInterrupt() |
| |
| def _BadPickler(self): |
| return self._BadPickler |
| |
| class _TestException(Exception): |
| """Custom exception for testing.""" |
| |
| def _VerifyExceptionRaised(self, fn, exc_type): |
| """A helper function to verify the correct |exc_type| is raised.""" |
| for task in (lambda: parallel.RunTasksInProcessPool(fn, [[]]), |
| lambda: parallel.RunParallelSteps([fn])): |
| output_str = ex_str = ex = None |
| with self.OutputCapturer() as capture: |
| with self.assertRaises(parallel.BackgroundFailure) as ex: |
| task() |
| output_str = capture.GetStdout() |
| ex_str = str(ex.exception) |
| |
| self.assertTrue(exc_type in [x.type for x in ex.exception.exc_infos]) |
| self.assertEqual(output_str, _GREETING) |
| self.assertTrue(str(exc_type) in ex_str) |
| |
| def testExceptionRaising(self): |
| """Tests the exceptions are raised correctly.""" |
| self.StartPatcher(BackgroundTaskVerifier()) |
| self._VerifyExceptionRaised(self._KeyboardInterrupt, KeyboardInterrupt) |
| self._VerifyExceptionRaised(self._SystemExit, SystemExit) |
| |
| def testExceptionPriority(self): |
| """Tests that foreground exceptions take priority over background.""" |
| self.StartPatcher(BackgroundTaskVerifier()) |
| with self.assertRaises(self._TestException): |
| with parallel.BackgroundTaskRunner(self._KeyboardInterrupt, |
| processes=1) as queue: |
| queue.put([]) |
| raise self._TestException() |
| |
| # We can't test for PickleError with Python 3.5+ due to bugs in Python. |
| @unittest.skipIf(sys.version_info >= (3, 5), |
| 'https://bugs.python.org/issue29187') |
| def testFailedPickle(self): |
| """PicklingError should be thrown when an argument fails to pickle.""" |
| with self.assertRaises(pickle.PickleError): |
| parallel.RunTasksInProcessPool(self._SystemExit, [[self._SystemExit]]) |
| |
| def testFailedPickleOnReturn(self): |
| """PicklingError should be thrown when a return value fails to pickle.""" |
| with self.assertRaises(parallel.BackgroundFailure): |
| parallel.RunParallelSteps([self._BadPickler], return_values=True) |
| |
| |
| class _TestForegroundException(Exception): |
| """An exception to be raised by the foreground process.""" |
| |
| |
| class TestHalting(cros_test_lib.MockOutputTestCase, TestBackgroundWrapper): |
| """Test that child processes are halted when exceptions occur.""" |
| |
| def setUp(self): |
| self.failed = multiprocessing.Event() |
| self.passed = multiprocessing.Event() |
| |
| def _GetKillChildrenTimeout(self): |
| """Return a timeout that is long enough for _BackgroundTask._KillChildren. |
| |
| This unittest is not meant to restrict which signal succeeds in killing the |
| background process, so use a long enough timeout whenever asserting that the |
| background process is killed, keeping buffer for slow builders. |
| """ |
| return (parallel._BackgroundTask.SIGTERM_TIMEOUT + |
| parallel._BackgroundTask.SIGKILL_TIMEOUT) + 30 |
| |
| def _Pass(self): |
| self.passed.set() |
| sys.stdout.write(_GREETING) |
| |
| def _Exit(self): |
| sys.stdout.write(_GREETING) |
| self.passed.wait() |
| sys.exit(1) |
| |
| def _Fail(self): |
| self.failed.wait(self._GetKillChildrenTimeout()) |
| self.failed.set() |
| |
| def _PassEventually(self): |
| self.passed.wait(self._GetKillChildrenTimeout()) |
| self.passed.set() |
| |
| @unittest.skipIf(_SKIP_FLAKY_TESTS, 'Occasionally fails.') |
| def testExceptionRaising(self): |
| """Test that exceptions halt all running steps.""" |
| steps = [self._Exit, self._Fail, self._Pass, self._Fail] |
| output_str, ex_str = None, None |
| with self.OutputCapturer() as capture: |
| try: |
| parallel.RunParallelSteps(steps, halt_on_error=True) |
| except parallel.BackgroundFailure as ex: |
| output_str = capture.GetStdout() |
| ex_str = str(ex) |
| logging.debug(ex_str) |
| self.assertTrue('Traceback' in ex_str) |
| self.assertTrue(self.passed.is_set()) |
| self.assertEqual(output_str, _GREETING) |
| self.assertFalse(self.failed.is_set()) |
| |
| def testForegroundExceptionRaising(self): |
| """Test that BackgroundTaskRunner halts tasks on a foreground exception.""" |
| with self.assertRaises(_TestForegroundException): |
| with parallel.BackgroundTaskRunner(self._PassEventually, |
| processes=1, |
| halt_on_error=True) as queue: |
| queue.put([]) |
| raise _TestForegroundException() |
| self.assertFalse(self.passed.is_set()) |
| |
| @unittest.skipIf(_SKIP_FLAKY_TESTS, 'Occasionally fails.') |
| def testTempFileCleanup(self): |
| """Test that all temp files are cleaned up.""" |
| with osutils.TempDir() as tempdir: |
| self.assertEqual(os.listdir(tempdir), []) |
| self.testExceptionRaising() |
| self.assertEqual(os.listdir(tempdir), []) |
| |
| def testKillQuiet(self, steps=None, **kwargs): |
| """Test that processes do get killed if they're silent for too long.""" |
| if steps is None: |
| steps = [self._Fail] * 2 |
| kwargs.setdefault('SILENT_TIMEOUT', 0.1) |
| kwargs.setdefault('MINIMUM_SILENT_TIMEOUT', 0.01) |
| kwargs.setdefault('SILENT_TIMEOUT_STEP', 0) |
| kwargs.setdefault('SIGTERM_TIMEOUT', 0.1) |
| kwargs.setdefault('PRINT_INTERVAL', 0.01) |
| kwargs.setdefault('GDB_COMMANDS', ('detach',)) |
| |
| ex_str = None |
| with mock.patch.multiple(parallel._BackgroundTask, **kwargs): |
| with self.OutputCapturer() as capture: |
| try: |
| with cros_test_lib.LoggingCapturer(): |
| parallel.RunParallelSteps(steps) |
| except parallel.BackgroundFailure as ex: |
| ex_str = str(ex) |
| error_str = capture.GetStderr() |
| self.assertTrue('parallel_unittest.py' in error_str) |
| self.assertTrue(ex_str) |
| |
| |
| class TestConstants(cros_test_lib.TestCase): |
| """Test values of constants.""" |
| |
| def testSilentTimeout(self): |
| """Verify the silent timeout is small enough.""" |
| # Enforce that the default timeout is less than 9000, the default timeout |
| # set in build/scripts/master/factory/chromeos_factory.py:ChromiteFactory |
| # in the Chrome buildbot source code. |
| self.assertLess( |
| parallel._BackgroundTask.SILENT_TIMEOUT, 9000, |
| 'Do not increase this timeout. Instead, print regular progress ' |
| 'updates, so that buildbot (and cbuildbot) will will know that your ' |
| 'program has not hung.') |
| |
| |
| class TestExitWithParent(cros_test_lib.TestCase): |
| """Tests ExitWithParent.""" |
| |
| def testChildExits(self): |
| """Create a child and a grandchild. The child should die with the parent.""" |
| def GrandChild(): |
| parallel.ExitWithParent() |
| time.sleep(9) |
| |
| def Child(queue): |
| grand_child = multiprocessing.Process(target=GrandChild) |
| grand_child.start() |
| queue.put(grand_child.pid) |
| time.sleep(9) |
| |
| with parallel.Manager() as manager: |
| q = manager.Queue() |
| child = multiprocessing.Process(target=lambda: Child(q)) |
| child.start() |
| grand_child_pid = q.get(timeout=1) |
| |
| # Before we kill the child, the grandchild should be running: |
| self.assertTrue(os.path.isdir('/proc/%d' % grand_child_pid)) |
| os.kill(child.pid, signal.SIGKILL) |
| |
| # (shortly) after we kill the child, the grandchild should kill itself. |
| # We can't use os.waitpid because the grandchild process is not a child |
| # process of ours. Just wait 20 seconds - this should be enough even if the |
| # machine is under load. |
| timeout_util.WaitForReturnTrue( |
| lambda: not os.path.isdir('/proc/%d' % grand_child_pid), |
| 20, |
| period=0.05) |
| |
| |
| def main(_argv): |
| cros_test_lib.main(level='info', module=__name__) |