| # Copyright 2012 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unittests for parallel library.""" |
| |
| import contextlib |
| import logging |
| import multiprocessing |
| import numbers |
| import os |
| import pickle |
| import queue as Queue |
| import signal |
| import sys |
| import tempfile |
| import threading |
| import time |
| import unittest |
| from unittest import mock |
| |
| from chromite.lib import cros_test_lib |
| from chromite.lib import osutils |
| from chromite.lib import parallel |
| from chromite.lib import partial_mock |
| from chromite.lib import timeout_util |
| |
| |
| # pylint: disable=protected-access |
| |
| |
| _BUFSIZE = 10**4 |
| _EXIT_TIMEOUT = 30 |
| _NUM_WRITES = 100 |
| _NUM_THREADS = 50 |
| _TOTAL_BYTES = _NUM_THREADS * _NUM_WRITES * _BUFSIZE |
| _GREETING = "hello world" |
| # Include x80 in the binary greeting since that can't be decoded |
| # as utf-8 (see https://docs.python.org/3/howto/unicode.html). |
| # We use _BINARY_GREETING_START to begin writing the message without |
| # a continuation, then use _BINARY_GREETING have the full message with |
| # the continuation character. |
| _BINARY_GREETING_START = b"hello" |
| _BINARY_GREETING = b"hello world\x80" |
| _ESCAPED_GREETING = "hello world\\x80" |
| _SKIP_FLAKY_TESTS = True |
| |
| |
| class FakeMultiprocessManager: |
| """A fake implementation of the multiprocess manager. |
| |
| This is only intended for use with ParallelMock. |
| """ |
| |
| def __enter__(self, *args, **kwargs): |
| return self |
| |
| def __exit__(self, *args, **kwargs): |
| return None |
| |
| def Queue(self): |
| return Queue.Queue() |
| |
| def RLock(self): |
| return threading.RLock() |
| |
| def dict(self, *args, **kwargs): |
| return dict(*args, **kwargs) |
| |
| def list(self, *args, **kwargs): |
| return list(*args, **kwargs) |
| |
| |
| class ParallelMock(partial_mock.PartialMock): |
| """Run parallel steps in sequence for testing purposes. |
| |
| This class updates chromite.lib.parallel to just run processes in |
| sequence instead of running them in parallel. This is useful for |
| testing. |
| """ |
| |
| TARGET = "chromite.lib.parallel._BackgroundTask" |
| ATTRS = ("ParallelTasks", "TaskRunner") |
| |
| def PreStart(self): |
| self.PatchObject( |
| parallel, "Manager", side_effect=FakeMultiprocessManager |
| ) |
| |
| @classmethod |
| @contextlib.contextmanager |
| def ParallelTasks(cls, steps, max_parallel=None, halt_on_error=False): |
| assert max_parallel is None or isinstance( |
| max_parallel, numbers.Integral |
| ) |
| assert isinstance(halt_on_error, bool) |
| try: |
| yield |
| finally: |
| for step in steps: |
| step() |
| |
| def TaskRunner( |
| self, queue, task, onexit=None, task_args=None, task_kwargs=None |
| ): |
| # Setup of these matches the original code. |
| if task_args is None: |
| task_args = [] |
| elif not isinstance(task_args, list): |
| task_args = list(task_args) |
| if task_kwargs is None: |
| task_kwargs = {} |
| |
| try: |
| while True: |
| # Wait for a new item to show up on the queue. This is a |
| # blocking wait, so if there's nothing to do, we just sit here. |
| x = queue.get() |
| if isinstance(x, parallel._AllTasksComplete): |
| # All tasks are complete, so we should exit. |
| break |
| x = task_args + list(x) |
| task(*x, **task_kwargs) |
| finally: |
| if onexit: |
| onexit() |
| |
| |
| class BackgroundTaskVerifier(partial_mock.PartialMock): |
| """Verify that queues are empty after BackgroundTaskRunner runs. |
| |
| BackgroundTaskRunner should always empty its input queues, even if an |
| exception occurs. This is important for preventing a deadlock in the case |
| where a thread fails partway through (e.g. user presses Ctrl-C before all |
| input can be processed). |
| """ |
| |
| TARGET = "chromite.lib.parallel" |
| ATTRS = ("BackgroundTaskRunner",) |
| |
| @contextlib.contextmanager |
| def BackgroundTaskRunner(self, task, *args, **kwargs): |
| queue = kwargs.setdefault("queue", multiprocessing.Queue()) |
| args = [task] + list(args) |
| try: |
| with self.backup["BackgroundTaskRunner"](*args, **kwargs): |
| yield queue |
| finally: |
| try: |
| queue.get(False) |
| except Queue.Empty: |
| pass |
| else: |
| raise AssertionError( |
| "Expected empty queue after BackgroundTaskRunner" |
| ) |
| |
| |
| class TestManager(cros_test_lib.TestCase): |
| """Test parallel.Manager().""" |
| |
| def testSigint(self): |
| """Tests that parallel.Manager() ignores SIGINT.""" |
| with parallel.Manager() as manager: |
| queue = manager.Queue() |
| os.kill(manager._process.pid, signal.SIGINT) |
| with self.assertRaises(Queue.Empty): |
| queue.get(block=False) |
| |
| def testSigterm(self): |
| """Tests that parallel.Manager() ignores SIGTERM.""" |
| with parallel.Manager() as manager: |
| queue = manager.Queue() |
| os.kill(manager._process.pid, signal.SIGTERM) |
| with self.assertRaises(Queue.Empty): |
| queue.get(block=False) |
| |
| |
| class TestBackgroundWrapper(cros_test_lib.TestCase): |
| """Unittests for background wrapper.""" |
| |
| def tearDown(self): |
| # Wait for children to exit. |
| try: |
| timeout_util.WaitForReturnValue( |
| [[]], multiprocessing.active_children, timeout=_EXIT_TIMEOUT |
| ) |
| except timeout_util.TimeoutError: |
| pass |
| |
| # Complain if there are any children left over. |
| active_children = multiprocessing.active_children() |
| for child in active_children: |
| if hasattr(child, "Kill"): |
| child.Kill(signal.SIGKILL, log_level=logging.WARNING) |
| child.join() |
| self.assertEqual(multiprocessing.active_children(), []) |
| self.assertEqual(active_children, []) |
| |
| def wrapOutputTest(self, func): |
| # Set _PRINT_INTERVAL to a smaller number to make it easier to |
| # reproduce bugs. |
| with mock.patch.multiple(parallel._BackgroundTask, PRINT_INTERVAL=0.01): |
| with tempfile.NamedTemporaryFile() as temp: |
| with open(temp.name, "w", encoding="utf-8") as output: |
| with mock.patch.multiple(sys, stdout=output): |
| func() |
| return osutils.ReadFile(temp.name) |
| |
| |
| class TestUnicodeContinuation(TestBackgroundWrapper): |
| """Test handling unicode continuation output from background envs.""" |
| |
| def setUp(self): |
| self.printed_message = multiprocessing.Event() |
| |
| def _WriteMessageToStdout(self): |
| """Write binary message to stdout.""" |
| # Python3 has sys.stdout.buffer to handle binary output. |
| sys.stdout.buffer.write(_BINARY_GREETING_START) |
| sys.stdout.flush() |
| sys.stdout.seek(0) |
| self.printed_message.set() |
| |
| # Wait for the parent process to read the output. Once the output |
| # has been read, try writing the full message again, including the |
| # continuation character, to be sure that rewritten output is not read |
| # twice and that the continuation(0x80) is handled. |
| time.sleep(parallel._BackgroundTask.PRINT_INTERVAL * 10) |
| sys.stdout.buffer.write(_BINARY_GREETING) |
| sys.stdout.flush() |
| |
| def _ParallelWriteMessage(self): |
| """Write binary message to stdout using multiple processes.""" |
| with parallel.Manager() as manager: |
| queue = manager.Queue() |
| with parallel.BackgroundTaskRunner( |
| self._WriteMessageToStdout, queue=queue |
| ): |
| queue.put([]) |
| self.printed_message.wait() |
| |
| def testParallelHelloWorld(self): |
| """Test that output is not written multiple times when seeking.""" |
| out = self.wrapOutputTest(self._ParallelWriteMessage) |
| # Verify that the _BINARY_GREETING gets read (because it's decoded) |
| # as _ESCAPED_GREETING. |
| self.assertEqual(out, _ESCAPED_GREETING) |
| |
| |
| class BackgroundHelloWorldTests(TestBackgroundWrapper): |
| """Test HelloWorld output in various background environments.""" |
| |
| def setUp(self): |
| self.printed_hello = multiprocessing.Event() |
| |
| def _HelloWorld(self): |
| """Write 'hello world' to stdout.""" |
| sys.stdout.write("hello") |
| sys.stdout.flush() |
| sys.stdout.seek(0) |
| self.printed_hello.set() |
| |
| # Wait for the parent process to read the output. Once the output |
| # has been read, try writing 'hello world' again, to be sure that |
| # rewritten output is not read twice. |
| time.sleep(parallel._BackgroundTask.PRINT_INTERVAL * 10) |
| sys.stdout.write(_GREETING) |
| sys.stdout.flush() |
| |
| def _ParallelHelloWorld(self): |
| """Write 'hello world' to stdout using multiple processes.""" |
| with parallel.Manager() as manager: |
| queue = manager.Queue() |
| with parallel.BackgroundTaskRunner(self._HelloWorld, queue=queue): |
| queue.put([]) |
| self.printed_hello.wait() |
| |
| def VerifyDefaultQueue(self): |
| """Verify that BackgroundTaskRunner will create a queue on it's own.""" |
| with parallel.BackgroundTaskRunner(self._HelloWorld) as queue: |
| queue.put([]) |
| self.printed_hello.wait() |
| |
| def testParallelHelloWorld(self): |
| """Test that output is not written multiple times when seeking.""" |
| out = self.wrapOutputTest(self._ParallelHelloWorld) |
| self.assertEqual(out, _GREETING) |
| |
| def testMultipleHelloWorlds(self): |
| """Test that multiple threads can be created.""" |
| parallel.RunParallelSteps([self.testParallelHelloWorld] * 2) |
| |
| def testLongTempDirectory(self): |
| """Test that we can handle a long temporary directory.""" |
| with osutils.TempDir() as tempdir: |
| new_tempdir = os.path.join(tempdir, "xxx/" * 100) |
| osutils.SafeMakedirs(new_tempdir) |
| old_tempdir, old_tempdir_env = osutils.SetGlobalTempDir(new_tempdir) |
| try: |
| self.testParallelHelloWorld() |
| finally: |
| osutils.SetGlobalTempDir(old_tempdir, old_tempdir_env) |
| |
| |
| def _BackgroundTaskRunnerArgs(results, arg1, arg2, kwarg1=None, kwarg2=None): |
| """Helper for TestBackgroundTaskRunnerArgs |
| |
| We specifically want a module function to test against and not a class |
| member. |
| """ |
| results.put((arg1, arg2, kwarg1, kwarg2)) |
| |
| |
| class TestBackgroundTaskRunnerArgs(TestBackgroundWrapper): |
| """Unittests for BackgroundTaskRunner argument handling.""" |
| |
| def testArgs(self): |
| """Test that we can pass args down to the task.""" |
| with parallel.Manager() as manager: |
| results = manager.Queue() |
| arg2s = set((1, 2, 3)) |
| with parallel.BackgroundTaskRunner( |
| _BackgroundTaskRunnerArgs, results, "arg1", kwarg1="kwarg1" |
| ) as queue: |
| for arg2 in arg2s: |
| queue.put((arg2,)) |
| |
| # Since the queue is unordered, need to handle arg2 specially. |
| result_arg2s = set() |
| for _ in range(3): |
| result = results.get() |
| self.assertEqual(result[0], "arg1") |
| result_arg2s.add(result[1]) |
| self.assertEqual(result[2], "kwarg1") |
| self.assertEqual(result[3], None) |
| self.assertEqual(arg2s, result_arg2s) |
| self.assertEqual(results.empty(), True) |
| |
| |
| class TestFastPrinting(TestBackgroundWrapper): |
| """Stress tests for background sys.stdout handling.""" |
| |
| def _FastPrinter(self): |
| # Writing lots of output quickly often reproduces bugs in this module |
| # because it can trigger race conditions. |
| for _ in range(_NUM_WRITES - 1): |
| sys.stdout.write("x" * _BUFSIZE) |
| sys.stderr.write("x" * (_BUFSIZE - 1) + "\n") |
| |
| def _ParallelPrinter(self): |
| parallel.RunParallelSteps([self._FastPrinter] * _NUM_THREADS) |
| |
| def _NestedParallelPrinter(self): |
| parallel.RunParallelSteps([self._ParallelPrinter]) |
| |
| def testSimpleParallelPrinter(self): |
| out = self.wrapOutputTest(self._ParallelPrinter) |
| self.assertEqual(len(out), _TOTAL_BYTES) |
| |
| def testNestedParallelPrinter(self): |
| """Verify that no output is lost when lots of output is written.""" |
| out = self.wrapOutputTest(self._NestedParallelPrinter) |
| self.assertEqual(len(out), _TOTAL_BYTES) |
| |
| |
| class TestRunParallelSteps(cros_test_lib.TestCase): |
| """Tests for RunParallelSteps.""" |
| |
| def testReturnValues(self): |
| """Test that we pass return values through when requested.""" |
| |
| def f1(): |
| return 1 |
| |
| def f2(): |
| return 2 |
| |
| def f3(): |
| pass |
| |
| return_values = parallel.RunParallelSteps( |
| [f1, f2, f3], return_values=True |
| ) |
| self.assertEqual(return_values, [1, 2, None]) |
| |
| def testLargeReturnValues(self): |
| """Verify the managed queue prevents hanging on large return values.""" |
| |
| def f1(): |
| return ret_value |
| |
| ret_value = "" |
| for _ in range(10000): |
| ret_value += "This will be repeated many times.\n" |
| |
| return_values = parallel.RunParallelSteps([f1], return_values=True) |
| self.assertEqual(return_values, [ret_value]) |
| |
| |
| class TestParallelMock(TestBackgroundWrapper): |
| """Test the ParallelMock class.""" |
| |
| def setUp(self): |
| self._calls = 0 |
| |
| def _Callback(self): |
| self._calls += 1 |
| return self._calls |
| |
| def testRunParallelSteps(self): |
| """Make sure RunParallelSteps is mocked out.""" |
| with ParallelMock(): |
| parallel.RunParallelSteps([self._Callback]) |
| self.assertEqual(1, self._calls) |
| |
| def testBackgroundTaskRunner(self): |
| """Make sure BackgroundTaskRunner is mocked out.""" |
| with ParallelMock(): |
| parallel.RunTasksInProcessPool(self._Callback, []) |
| self.assertEqual(0, self._calls) |
| result = parallel.RunTasksInProcessPool(self._Callback, [[]]) |
| self.assertEqual(1, self._calls) |
| self.assertEqual([1], result) |
| result = parallel.RunTasksInProcessPool( |
| self._Callback, [], processes=9, onexit=self._Callback |
| ) |
| self.assertEqual(10, self._calls) |
| self.assertEqual([], result) |
| result = parallel.RunTasksInProcessPool(self._Callback, [[]] * 10) |
| self.assertEqual(list(range(11, 21)), result) |
| |
| |
| class TestExceptions(cros_test_lib.MockOutputTestCase): |
| """Test cases where child processes raise exceptions.""" |
| |
| def _SystemExit(self): |
| sys.stdout.write(_GREETING) |
| sys.exit(1) |
| |
| def _KeyboardInterrupt(self): |
| sys.stdout.write(_GREETING) |
| raise KeyboardInterrupt() |
| |
| def _BadPickler(self): |
| return self._BadPickler |
| |
| class _TestException(Exception): |
| """Custom exception for testing.""" |
| |
| def _VerifyExceptionRaised(self, fn, exc_type): |
| """A helper function to verify the correct |exc_type| is raised.""" |
| for task in ( |
| lambda: parallel.RunTasksInProcessPool(fn, [[]]), |
| lambda: parallel.RunParallelSteps([fn]), |
| ): |
| output_str = ex_str = ex = None |
| with self.OutputCapturer() as capture: |
| with self.assertRaises(parallel.BackgroundFailure) as ex: |
| task() |
| output_str = capture.GetStdout() |
| ex_str = str(ex.exception) |
| |
| self.assertTrue( |
| exc_type in [x.type for x in ex.exception.exc_infos] |
| ) |
| self.assertEqual(output_str, _GREETING) |
| self.assertTrue(str(exc_type) in ex_str) |
| |
| def testExceptionRaising(self): |
| """Tests the exceptions are raised correctly.""" |
| self.StartPatcher(BackgroundTaskVerifier()) |
| self._VerifyExceptionRaised(self._KeyboardInterrupt, KeyboardInterrupt) |
| self._VerifyExceptionRaised(self._SystemExit, SystemExit) |
| |
| def testExceptionPriority(self): |
| """Tests that foreground exceptions take priority over background.""" |
| self.StartPatcher(BackgroundTaskVerifier()) |
| with self.assertRaises(self._TestException): |
| with parallel.BackgroundTaskRunner( |
| self._KeyboardInterrupt, processes=1 |
| ) as queue: |
| queue.put([]) |
| raise self._TestException() |
| |
| # We can't test for PickleError with Python 3.5+ due to bugs in Python. |
| @unittest.skipIf( |
| sys.version_info >= (3, 5), "https://bugs.python.org/issue29187" |
| ) |
| def testFailedPickle(self): |
| """PicklingError should be thrown when an argument fails to pickle.""" |
| with self.assertRaises(pickle.PickleError): |
| parallel.RunTasksInProcessPool( |
| self._SystemExit, [[self._SystemExit]] |
| ) |
| |
| def testFailedPickleOnReturn(self): |
| """Verify PicklingError thrown when a return value fails to pickle.""" |
| with self.assertRaises(parallel.BackgroundFailure): |
| parallel.RunParallelSteps([self._BadPickler], return_values=True) |
| |
| |
| class _TestForegroundException(Exception): |
| """An exception to be raised by the foreground process.""" |
| |
| |
| class TestHalting(cros_test_lib.MockOutputTestCase, TestBackgroundWrapper): |
| """Test that child processes are halted when exceptions occur.""" |
| |
| def setUp(self): |
| self.failed = multiprocessing.Event() |
| self.passed = multiprocessing.Event() |
| |
| def _GetKillChildrenTimeout(self): |
| """Return a timeout that triggers _BackgroundTask._KillChildren. |
| |
| This unittest is not meant to restrict which signal succeeds in killing |
| the background process, so use a long enough timeout whenever asserting |
| that the background process is killed, keeping buffer for slow builders. |
| """ |
| return ( |
| parallel._BackgroundTask.SIGTERM_TIMEOUT |
| + parallel._BackgroundTask.SIGKILL_TIMEOUT |
| ) + 30 |
| |
| def _Pass(self): |
| self.passed.set() |
| sys.stdout.write(_GREETING) |
| |
| def _Exit(self): |
| sys.stdout.write(_GREETING) |
| self.passed.wait() |
| sys.exit(1) |
| |
| def _Fail(self): |
| self.failed.wait(self._GetKillChildrenTimeout()) |
| self.failed.set() |
| |
| def _PassEventually(self): |
| self.passed.wait(self._GetKillChildrenTimeout()) |
| self.passed.set() |
| |
| @unittest.skipIf(_SKIP_FLAKY_TESTS, "Occasionally fails.") |
| def testExceptionRaising(self): |
| """Test that exceptions halt all running steps.""" |
| steps = [self._Exit, self._Fail, self._Pass, self._Fail] |
| output_str, ex_str = None, None |
| with self.OutputCapturer() as capture: |
| try: |
| parallel.RunParallelSteps(steps, halt_on_error=True) |
| except parallel.BackgroundFailure as ex: |
| output_str = capture.GetStdout() |
| ex_str = str(ex) |
| logging.debug(ex_str) |
| self.assertTrue("Traceback" in ex_str) |
| self.assertTrue(self.passed.is_set()) |
| self.assertEqual(output_str, _GREETING) |
| self.assertFalse(self.failed.is_set()) |
| |
| def testForegroundExceptionRaising(self): |
| """Verify BackgroundTaskRunner halts tasks on a foreground exception.""" |
| with self.assertRaises(_TestForegroundException): |
| with mock.patch.multiple( |
| parallel._BackgroundTask, SIGTERM_TIMEOUT=5 |
| ): |
| with parallel.BackgroundTaskRunner( |
| self._PassEventually, processes=1, halt_on_error=True |
| ) as queue: |
| queue.put([]) |
| raise _TestForegroundException() |
| self.assertFalse(self.passed.is_set()) |
| |
| @unittest.skipIf(_SKIP_FLAKY_TESTS, "Occasionally fails.") |
| def testTempFileCleanup(self): |
| """Test that all temp files are cleaned up.""" |
| with osutils.TempDir() as tempdir: |
| self.assertEqual(os.listdir(tempdir), []) |
| self.testExceptionRaising() |
| self.assertEqual(os.listdir(tempdir), []) |
| |
| def testKillQuiet(self, steps=None, **kwargs): |
| """Test that processes do get killed if they're silent for too long.""" |
| if steps is None: |
| steps = [self._Fail] * 2 |
| kwargs.setdefault("SILENT_TIMEOUT", 0.1) |
| kwargs.setdefault("MINIMUM_SILENT_TIMEOUT", 0.01) |
| kwargs.setdefault("SILENT_TIMEOUT_STEP", 0) |
| kwargs.setdefault("SIGTERM_TIMEOUT", 0.1) |
| kwargs.setdefault("PRINT_INTERVAL", 0.01) |
| kwargs.setdefault("GDB_COMMANDS", ("detach",)) |
| |
| ex_str = None |
| error_str = None |
| with mock.patch.multiple(parallel._BackgroundTask, **kwargs): |
| with self.OutputCapturer() as capture: |
| try: |
| with cros_test_lib.LoggingCapturer(): |
| parallel.RunParallelSteps(steps) |
| except parallel.BackgroundFailure as ex: |
| ex_str = str(ex) |
| error_str = capture.GetStderr() |
| self.assertTrue("parallel_unittest.py" in error_str) |
| self.assertTrue(ex_str) |
| |
| |
| class TestConstants(cros_test_lib.TestCase): |
| """Test values of constants.""" |
| |
| def testSilentTimeout(self): |
| """Verify the silent timeout is small enough.""" |
| # Enforce that the default timeout is less than 9000, the default |
| # timeout set in |
| # build/scripts/master/factory/chromeos_factory.py:ChromiteFactory in |
| # the Chrome buildbot source code. |
| self.assertLess( |
| parallel._BackgroundTask.SILENT_TIMEOUT, |
| 9000, |
| "Do not increase this timeout. Instead, print regular progress " |
| "updates, so that buildbot (and cbuildbot) will will know that " |
| "your program has not hung.", |
| ) |
| |
| |
| class TestExitWithParent(cros_test_lib.TestCase): |
| """Tests ExitWithParent.""" |
| |
| def testChildExits(self): |
| """Create a child and a grandchild. |
| |
| The child should die with the parent. |
| """ |
| |
| def GrandChild(): |
| parallel.ExitWithParent() |
| time.sleep(9) |
| |
| def Child(queue): |
| grand_child = multiprocessing.Process(target=GrandChild) |
| grand_child.start() |
| queue.put(grand_child.pid) |
| time.sleep(9) |
| |
| with parallel.Manager() as manager: |
| q = manager.Queue() |
| child = multiprocessing.Process(target=lambda: Child(q)) |
| child.start() |
| grand_child_pid = q.get(timeout=1) |
| |
| # Before we kill the child, the grandchild should be running: |
| self.assertTrue(os.path.isdir("/proc/%d" % grand_child_pid)) |
| os.kill(child.pid, signal.SIGKILL) |
| |
| # (shortly) after we kill the child, the grandchild should kill itself. |
| # We can't use os.waitpid because the grandchild process is not a child |
| # process of ours. Just wait 20 seconds - this should be enough even if |
| # the machine is under load. |
| timeout_util.WaitForReturnTrue( |
| lambda: not os.path.isdir("/proc/%d" % grand_child_pid), |
| 20, |
| period=0.05, |
| ) |