| # Copyright 2012 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unittest for cros_test_lib (tests for tests? Who'd a thunk it).""" |
| |
| import os |
| import subprocess |
| import sys |
| import time |
| import unittest |
| from unittest import mock |
| |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_test_lib |
| from chromite.lib import osutils |
| from chromite.lib import partial_mock |
| from chromite.lib import timeout_util |
| |
| |
| # Convenience alias |
| Dir = cros_test_lib.Directory |
| |
| |
| class CrosTestCaseTest(cros_test_lib.TestCase): |
| """Test the cros_test_lib.TestCase.""" |
| |
| def testAssertStartsWith(self): |
| s = "abcdef" |
| prefix = "abc" |
| self.assertStartsWith(s, prefix) |
| prefix = "def" |
| self.assertRaises(AssertionError, self.assertStartsWith, s, prefix) |
| |
| def testAssertEndsWith(self): |
| s = "abcdef" |
| suffix = "abc" |
| self.assertRaises(AssertionError, self.assertEndsWith, s, suffix) |
| suffix = "def" |
| self.assertEndsWith(s, suffix) |
| |
| |
| class TruthTableTest(cros_test_lib.TestCase): |
| """Test TruthTable functionality.""" |
| |
| def _TestTableSmoke(self, tt, lines): |
| """Run the given truth table through basic smoke checks. |
| |
| Args: |
| tt: A TruthTable object. |
| lines: The expected input lines, in order (list of tuples). |
| """ |
| # Check that more than one iterable can be used at once. |
| iter1 = iter(tt) |
| iter2 = iter(tt) |
| self.assertEqual(lines[0], next(iter1)) |
| self.assertEqual(lines[0], next(iter2)) |
| self.assertEqual(lines[1], next(iter2)) |
| |
| # Check that iteration works again. |
| for ix, line in enumerate(tt): |
| self.assertEqual(lines[ix], line) |
| |
| # Check direct access of input lines. |
| for i in range(len(tt)): |
| self.assertEqual(lines[i], tt.GetInputs(i)) |
| |
| # Check assertions on bad input to GetInputs. |
| self.assertRaises(ValueError, tt.GetInputs, -1) |
| self.assertRaises(ValueError, tt.GetInputs, len(tt)) |
| |
| def testTwoDimensions(self): |
| """Test TruthTable behavior for two boolean inputs.""" |
| tt = cros_test_lib.TruthTable(inputs=[(True, True), (True, False)]) |
| self.assertEqual(len(tt), pow(2, 2)) |
| |
| # Check truth table output. |
| self.assertFalse(tt.GetOutput((False, False))) |
| self.assertFalse(tt.GetOutput((False, True))) |
| self.assertTrue(tt.GetOutput((True, False))) |
| self.assertTrue(tt.GetOutput((True, True))) |
| |
| # Check assertions on bad input to GetOutput. |
| self.assertRaises(TypeError, tt.GetOutput, True) |
| self.assertRaises(ValueError, tt.GetOutput, (True, True, True)) |
| |
| # Check iteration over input lines. |
| lines = list(tt) |
| self.assertEqual((False, False), lines[0]) |
| self.assertEqual((False, True), lines[1]) |
| self.assertEqual((True, False), lines[2]) |
| self.assertEqual((True, True), lines[3]) |
| |
| self._TestTableSmoke(tt, lines) |
| |
| def testFourDimensions(self): |
| """Test TruthTable behavior for four boolean inputs.""" |
| false1 = (True, True, True, False) |
| false2 = (True, False, True, False) |
| true1 = (False, True, False, True) |
| true2 = (True, True, False, False) |
| tt = cros_test_lib.TruthTable( |
| inputs=(false1, false2), input_result=False |
| ) |
| self.assertEqual(len(tt), pow(2, 4)) |
| |
| # Check truth table output. |
| self.assertFalse(tt.GetOutput(false1)) |
| self.assertFalse(tt.GetOutput(false2)) |
| self.assertTrue(tt.GetOutput(true1)) |
| self.assertTrue(tt.GetOutput(true2)) |
| |
| # Check assertions on bad input to GetOutput. |
| self.assertRaises(TypeError, tt.GetOutput, True) |
| self.assertRaises(ValueError, tt.GetOutput, (True, True, True)) |
| |
| # Check iteration over input lines. |
| lines = list(tt) |
| self.assertEqual((False, False, False, False), lines[0]) |
| self.assertEqual((False, False, False, True), lines[1]) |
| self.assertEqual((False, True, True, True), lines[7]) |
| self.assertEqual((True, True, True, True), lines[15]) |
| |
| self._TestTableSmoke(tt, lines) |
| |
| |
| class VerifyTarballTest(cros_test_lib.MockTempDirTestCase): |
| """Test tarball verification functionality.""" |
| |
| TARBALL = "fake_tarball" |
| |
| def setUp(self): |
| self.rc_mock = self.StartPatcher(cros_test_lib.RunCommandMock()) |
| |
| def _MockTarList(self, files): |
| """Mock out tarball content list call. |
| |
| Args: |
| files: A list of contents to return. |
| """ |
| self.rc_mock.AddCmdResult( |
| partial_mock.ListRegex("tar -tf"), stdout="\n".join(files) |
| ) |
| |
| def testNormPath(self): |
| """Test path normalization.""" |
| tar_contents = ["./", "./foo/", "./foo/./a", "./foo/./b"] |
| dir_struct = [Dir(".", []), Dir("foo", ["a", "b"])] |
| self._MockTarList(tar_contents) |
| cros_test_lib.VerifyTarball(self.TARBALL, dir_struct) |
| |
| def testDuplicate(self): |
| """Test duplicate detection.""" |
| tar_contents = ["a", "b", "a"] |
| dir_struct = ["a", "b"] |
| self._MockTarList(tar_contents) |
| self.assertRaises( |
| AssertionError, |
| cros_test_lib.VerifyTarball, |
| self.TARBALL, |
| dir_struct, |
| ) |
| |
| |
| class MockTestCaseTest(cros_test_lib.TestCase): |
| """Tests MockTestCase functionality.""" |
| |
| class MyMockTestCase(cros_test_lib.MockTestCase): |
| """Helper class for testing MockTestCase.""" |
| |
| def testIt(self): |
| pass |
| |
| class Mockable: |
| """Helper test class intended for having values mocked out.""" |
| |
| TO_BE_MOCKED = 0 |
| TO_BE_MOCKED2 = 10 |
| TO_BE_MOCKED3 = 20 |
| |
| def GetPatcher(self, attr, val): |
| return mock.patch( |
| "%s.MockTestCaseTest.Mockable.%s" % (__name__, attr), new=val |
| ) |
| |
| def testPatchRemovalError(self): |
| """Verify that patch removal during tearDown is robust to Exceptions.""" |
| tc = self.MyMockTestCase("testIt") |
| patcher = self.GetPatcher("TO_BE_MOCKED", -100) |
| patcher2 = self.GetPatcher("TO_BE_MOCKED2", -200) |
| patcher3 = self.GetPatcher("TO_BE_MOCKED3", -300) |
| patcher3.start() |
| tc.setUp() |
| tc.StartPatcher(patcher) |
| tc.StartPatcher(patcher2) |
| patcher.stop() |
| self.assertEqual(self.Mockable.TO_BE_MOCKED2, -200) |
| self.assertEqual(self.Mockable.TO_BE_MOCKED3, -300) |
| |
| def abort(): |
| raise RuntimeError() |
| |
| patcher.stop = abort |
| self.assertRaises(RuntimeError, tc.tearDown) |
| # Make sure that even though exception is raised for stopping 'patcher', |
| # we continue to stop 'patcher2', and run patcher.stopall(). |
| self.assertEqual(self.Mockable.TO_BE_MOCKED2, 10) |
| self.assertEqual(self.Mockable.TO_BE_MOCKED3, 20) |
| |
| |
| class TestCaseTest(unittest.TestCase): |
| """Tests TestCase functionality.""" |
| |
| def testTimeout(self): |
| """Test that test cases are interrupted when they are hanging.""" |
| |
| class TimeoutTestCase(cros_test_lib.TestCase): |
| """Raises a TimeoutError because it takes too long.""" |
| |
| TEST_CASE_TIMEOUT = 1 |
| |
| def testSleeping(self): |
| """Sleep for 2 minutes. This should raise a TimeoutError.""" |
| time.sleep(2 * 60) |
| raise AssertionError("Test case should have timed out.") |
| |
| # Run the test case, verifying it raises a TimeoutError. |
| test = TimeoutTestCase(methodName="testSleeping") |
| self.assertRaises(timeout_util.TimeoutError, test.testSleeping) |
| |
| |
| class OutputTestCaseTest( |
| cros_test_lib.OutputTestCase, cros_test_lib.TempDirTestCase |
| ): |
| """Tests OutputTestCase functionality.""" |
| |
| def testStdoutAndStderr(self): |
| """Check capturing stdout and stderr.""" |
| with self.OutputCapturer(): |
| print("foo") |
| print("bar", file=sys.stderr) |
| self.AssertOutputContainsLine("foo") |
| self.AssertOutputContainsLine( |
| "bar", check_stdout=False, check_stderr=True |
| ) |
| |
| def testStdoutReadDuringCapture(self): |
| """Check reading stdout mid-capture.""" |
| with self.OutputCapturer(): |
| print("foo") |
| self.AssertOutputContainsLine("foo") |
| print("bar") |
| self.AssertOutputContainsLine("bar") |
| self.AssertOutputContainsLine("foo") |
| self.AssertOutputContainsLine("bar") |
| |
| def testClearCaptured(self): |
| """Check writing data, clearing it, then writing more data.""" |
| with self.OutputCapturer() as cap: |
| print("foo") |
| self.AssertOutputContainsLine("foo") |
| cap.ClearCaptured() |
| self.AssertOutputContainsLine("foo", invert=True) |
| print("bar") |
| self.AssertOutputContainsLine("bar") |
| |
| @cros_test_lib.pytestmark_skip |
| def testRunCommandCapture(self): |
| """Check capturing run() subprocess output.""" |
| with self.OutputCapturer(): |
| cros_build_lib.run(["sh", "-c", "echo foo; echo bar >&2"]) |
| self.AssertOutputContainsLine("foo") |
| self.AssertOutputContainsLine( |
| "bar", check_stdout=False, check_stderr=True |
| ) |
| |
| def testCapturingStdoutAndStderrToFile(self): |
| """Check that OutputCapturer captures to a named file.""" |
| stdout_path = os.path.join(self.tempdir, "stdout") |
| stderr_path = os.path.join(self.tempdir, "stderr") |
| with self.OutputCapturer( |
| stdout_path=stdout_path, stderr_path=stderr_path |
| ): |
| print("foo") |
| print("bar", file=sys.stderr) |
| |
| # Check that output can be read by OutputCapturer. |
| self.AssertOutputContainsLine("foo") |
| self.AssertOutputContainsLine( |
| "bar", check_stdout=False, check_stderr=True |
| ) |
| # Verify that output is actually written to the correct files. |
| self.assertEqual("foo\n", osutils.ReadFile(stdout_path)) |
| self.assertEqual("bar\n", osutils.ReadFile(stderr_path)) |
| |
| |
| class RunCommandTestCase(cros_test_lib.RunCommandTestCase): |
| """Verify the test case behavior.""" |
| |
| def testPopenMockEncodingEmptyStrings(self): |
| """Verify automatic encoding in PopenMock works with default output.""" |
| self.rc.AddCmdResult(["/x"]) |
| result = cros_build_lib.run(["/x"], capture_output=True) |
| self.assertEqual(b"", result.stdout) |
| self.assertEqual(b"", result.stderr) |
| result = cros_build_lib.run( |
| ["/x"], capture_output=True, encoding="utf-8" |
| ) |
| self.assertEqual("", result.stdout) |
| self.assertEqual("", result.stderr) |
| |
| def testPopenMockBinaryData(self): |
| """Verify our automatic encoding in PopenMock works with bytes.""" |
| self.rc.AddCmdResult(["/x"], stderr=b"\xff") |
| result = cros_build_lib.run(["/x"], capture_output=True) |
| self.assertEqual(b"", result.stdout) |
| self.assertEqual(b"\xff", result.stderr) |
| with self.assertRaises(UnicodeDecodeError): |
| cros_build_lib.run(["/x"], capture_output=True, encoding="utf-8") |
| |
| def testPopenMockMixedData(self): |
| """Verify our automatic encoding in PopenMock works with mixed data.""" |
| self.rc.AddCmdResult(["/x"], stderr=b"abc\x00", stdout="Yes\u20a0") |
| result = cros_build_lib.run(["/x"], capture_output=True) |
| self.assertEqual(b"Yes\xe2\x82\xa0", result.stdout) |
| self.assertEqual(b"abc\x00", result.stderr) |
| result = cros_build_lib.run( |
| ["/x"], capture_output=True, encoding="utf-8" |
| ) |
| self.assertEqual("Yes\u20a0", result.stdout) |
| self.assertEqual("abc\x00", result.stderr) |
| |
| def testPopenMockCombiningStderr(self): |
| """Verify combining stderr into stdout works.""" |
| self.rc.AddCmdResult(["/x"], stderr="err", stdout="out") |
| result = cros_build_lib.run(["/x"], stdout=True, stderr=True) |
| self.assertEqual(b"err", result.stderr) |
| self.assertEqual(b"out", result.stdout) |
| result = cros_build_lib.run( |
| ["/x"], stdout=True, stderr=subprocess.STDOUT |
| ) |
| self.assertEqual(None, result.stderr) |
| self.assertEqual(b"outerr", result.stdout) |
| |
| def testExecutable(self): |
| """Verify executable arg is handled.""" |
| self.rc.AddCmdResult(["/x"], stderr="err", stdout="out") |
| result = cros_build_lib.run( |
| ["/x"], executable="/exe", stdout=True, stderr=True |
| ) |
| self.assertEqual(b"err", result.stderr) |
| self.assertEqual(b"out", result.stdout) |