blob: 8599f4a95e8c8e5f36083aebc36e59e6123de8da [file] [log] [blame]
# Copyright 2012 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test the cros_build_lib module."""
import builtins
import contextlib
import datetime
import difflib
import functools
import logging
import os
from pathlib import Path
import shutil
import signal
import subprocess
from typing import Any, List
from unittest import mock

from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import osutils
class RunCommandErrorStrTest(cros_test_lib.TestCase):
    """Check that stringifying a RunCommandError does not blow up."""

    def testNonUTF8Characters(self) -> None:
        """A message holding a non-UTF8 character must not break __str__."""
        failing_cmd = ["ls", "/does/not/exist"]
        completed = cros_build_lib.run(failing_cmd, check=False)
        error = cros_build_lib.RunCommandError("\x81", completed)
        # The only expectation is that the conversion itself does not raise.
        str(error)
class CmdToStrTest(cros_test_lib.TestCase):
    """Test the CmdToStr function."""

    def setUp(self) -> None:
        self.differ = difflib.Differ()

    def _assertEqual(self, func, test_input, test_output, result) -> None:
        """Like assertEqual, but with a message describing the translation.

        Args:
            func: Name of the function under test (only used in the message).
            test_input: The value that was passed to the function.
            test_output: The value the function was expected to return.
            result: The value the function actually returned.
        """
        msg = "Expected %s to translate %r to %r, but got %r" % (
            func,
            test_input,
            test_output,
            result,
        )
        self.assertEqual(test_output, result, msg)

    def _testData(self, functor, tests, check_type=True) -> None:
        """Process an iterable of test data.

        Args:
            functor: The callable under test; it is given each input.
            tests: Iterable of (expected output, input) pairs.
            check_type: Whether to also require the returned value to be str.
        """
        for test_output, test_input in tests:
            result = functor(test_input)
            self._assertEqual(functor.__name__, test_input, test_output, result)

            if check_type:
                # Also make sure the result is a string, otherwise the %r output
                # will include a "u" prefix and that is not good for logging.
                # This must inspect |result|: checking the expected value would
                # only validate the test data, never the function under test.
                self.assertEqual(type(result), str)

    def testShellQuote(self) -> None:
        """Basic ShellQuote tests."""
        # Tuples of (expected output string, input data).
        tests_quote = (
            ("''", ""),
            ("a", "a"),
            ("'a b c'", "a b c"),
            ("'a\tb'", "a\tb"),
            ("'a\nb'", "a\nb"),
            ("'/a$file'", "/a$file"),
            ("'/a#file'", "/a#file"),
            ("""'b"c'""", 'b"c'),
            ("'a@()b'", "a@()b"),
            ("j%k", "j%k"),
            (r'''"s'a\$va\\rs"''', r"s'a$va\rs"),
            (r'''"\\'\\\""''', r'''\'\"'''),
            (r'''"'\\\$"''', r"""'\$"""),
        )

        bytes_quote = (
            # Since we allow passing bytes down, quote them too.
            ("bytes", b"bytes"),
            ("'by tes'", b"by tes"),
            ("bytes", "bytes"),
            ("'by tes'", "by tes"),
        )

        # Expected input output specific to ShellUnquote. This string cannot be
        # produced by ShellQuote but is still a valid bash escaped string.
        tests_unquote = ((r"""\$""", r'''"\\$"'''),)

        def aux(s):
            return cros_build_lib.ShellUnquote(cros_build_lib.ShellQuote(s))

        # We can only go one way bytes->string.
        self._testData(cros_build_lib.ShellQuote, bytes_quote)
        self._testData(aux, [(x, x) for x, _ in bytes_quote], False)

        self._testData(cros_build_lib.ShellQuote, tests_quote)
        self._testData(cros_build_lib.ShellUnquote, tests_unquote)

        # Test that the operations are reversible.
        self._testData(aux, [(x, x) for x, _ in tests_quote], False)
        self._testData(aux, [(x, x) for _, x in tests_quote], False)

    def testShellQuoteOjbects(self) -> None:
        """Test objects passed to ShellQuote."""
        self.assertEqual("/", cros_build_lib.ShellQuote(Path("/")))
        self.assertEqual("None", cros_build_lib.ShellQuote(None))
        # Actually call ShellQuote: comparing "" against the function object
        # itself can never fail and so verified nothing.
        self.assertNotEqual("", cros_build_lib.ShellQuote(object()))

    def testCmdToStr(self) -> None:
        """Basic CmdToStr tests."""
        # Tuples of (expected output string, input list).
        tests = (
            (r"a b", ["a", "b"]),
            (r"'a b' c", ["a b", "c"]),
            (r'''a "b'c"''', ["a", "b'c"]),
            (r'''a "/'\$b" 'a b c' "xy'z"''', ["a", "/'$b", "a b c", "xy'z"]),
            ("", []),
            ("a b c", [b"a", "b", "c"]),
            ("bad None cmd", ["bad", None, "cmd"]),
        )
        self._testData(cros_build_lib.CmdToStr, tests)
class TestCalledProcessError(cros_test_lib.TestCase):
    """Exercise the attribute API of CalledProcessError."""

    def testOutputStdout(self) -> None:
        """Check .output is unavailable while .stdout/.stderr read and write."""
        exc = cros_build_lib.CalledProcessError(
            0, ["true"], stdout="STDOUT", stderr="STDERR"
        )

        def check(want_stdout: str, want_stderr: str) -> None:
            # Reading .output is expected to raise AttributeError.
            with self.assertRaises(AttributeError):
                assert exc.output is None
            assert exc.stdout == want_stdout
            assert exc.stderr == want_stderr

        # Values supplied to the constructor are readable ...
        check("STDOUT", "STDERR")

        # ... and so are values assigned afterwards.
        exc.stdout = "STDout"
        exc.stderr = "STDerr"
        check("STDout", "STDerr")
class TestRunCommandNoMock(cros_test_lib.TestCase):
    """Class that tests run by not mocking subprocess.Popen"""

    def testErrorCodeNotRaisesError(self) -> None:
        """Don't raise exception when command returns non-zero exit code."""
        result = cros_build_lib.run(["ls", "/does/not/exist"], check=False)
        self.assertNotEqual(result.returncode, 0)

    def testMissingCommandRaisesError(self) -> None:
        """Raise error when command is not found."""
        # A missing program is an error regardless of the check= setting.
        for check in (True, False):
            with self.assertRaises(cros_build_lib.RunCommandError):
                cros_build_lib.run(["/does/not/exist"], check=check)

    def testDryRun(self) -> None:
        """Verify dryrun doesn't run the real command."""
        # Check exit & output when not captured.
        result = cros_build_lib.run(["false"], dryrun=True)
        self.assertEqual(0, result.returncode)
        self.assertIsNone(result.stdout)
        self.assertIsNone(result.stderr)

        # Check captured binary output.
        result = cros_build_lib.run(
            ["echo", "hi"], dryrun=True, capture_output=True
        )
        self.assertEqual(0, result.returncode)
        self.assertEqual(b"", result.stdout)
        self.assertEqual(b"", result.stderr)

        # Check captured text output.
        result = cros_build_lib.run(
            ["echo", "hi"], dryrun=True, capture_output=True, encoding="utf-8"
        )
        self.assertEqual(0, result.returncode)
        self.assertEqual("", result.stdout)
        self.assertEqual("", result.stderr)

        # Check captured merged output.
        result = cros_build_lib.run(
            ["echo", "hi"], dryrun=True, stdout=True, stderr=subprocess.STDOUT
        )
        self.assertEqual(0, result.returncode)
        self.assertEqual(b"", result.stdout)
        self.assertIsNone(result.stderr)

    def testInputBytes(self) -> None:
        """Verify input argument when it is bytes."""
        for data in (b"", b"foo", b"bar\nhigh"):
            result = cros_build_lib.run(
                ["cat"], input=data, capture_output=True
            )
            self.assertEqual(result.stdout, data)

    def testInputBytesEncoding(self) -> None:
        """Verify bytes input argument when encoding is set."""
        for data in (b"", b"foo", b"bar\nhigh"):
            result = cros_build_lib.run(
                ["cat"], input=data, encoding="utf-8", capture_output=True
            )
            self.assertEqual(result.stdout, data.decode("utf-8"))

    def testInputString(self) -> None:
        """Verify input argument when it is a string."""
        for data in ("", "foo", "bar\nhigh"):
            result = cros_build_lib.run(
                ["cat"], input=data, capture_output=True
            )
            self.assertEqual(result.stdout, data.encode("utf-8"))

    def testInputStringEncoding(self) -> None:
        """Verify string input argument when encoding is set."""
        for data in ("", "foo", "bar\nhigh"):
            result = cros_build_lib.run(
                ["cat"], input=data, encoding="utf-8", capture_output=True
            )
            self.assertEqual(result.stdout, data)

    def testInputFileObject(self) -> None:
        """Verify input argument when it is a file object."""
        # Use a context manager so the handle is closed even when run() or the
        # assertion fails.
        with open("/dev/null", encoding="utf-8") as f:
            result = cros_build_lib.run(["cat"], input=f, capture_output=True)
        self.assertEqual(result.stdout, b"")

        with open(__file__, encoding="utf-8") as f:
            result = cros_build_lib.run(["cat"], input=f, capture_output=True)
        self.assertEqual(
            result.stdout, osutils.ReadFile(__file__, mode="rb")
        )

    def testInputFileDescriptor(self) -> None:
        """Verify input argument when it is a file descriptor."""
        with open("/dev/null", encoding="utf-8") as f:
            result = cros_build_lib.run(
                ["cat"], input=f.fileno(), capture_output=True
            )
            self.assertEqual(result.stdout, b"")

        with open(__file__, encoding="utf-8") as f:
            result = cros_build_lib.run(
                ["cat"], input=f.fileno(), capture_output=True
            )
            self.assertEqual(
                result.stdout, osutils.ReadFile(__file__, mode="rb")
            )

    def testMixedEncodingCommand(self) -> None:
        """Verify cmd can mix bytes & strings."""
        result = cros_build_lib.run(
            [b"echo", "hi", "ß"], capture_output=True, encoding="utf-8"
        )
        self.assertEqual(result.stdout, "hi ß\n")

    def testEncodingBinaryOutput(self) -> None:
        """Verify encoding=None output handling."""
        result = cros_build_lib.run(
            b"echo o\xff ut; echo e\xff rr >&2", shell=True, capture_output=True
        )
        self.assertEqual(result.stdout, b"o\xff ut\n")
        self.assertEqual(result.stderr, b"e\xff rr\n")

    def testEncodingUtf8Output(self) -> None:
        """Verify encoding='utf-8' output handling."""
        result = cros_build_lib.run(
            ["echo", "ß"], capture_output=True, encoding="utf-8"
        )
        self.assertEqual(result.stdout, "ß\n")

    def testEncodingStrictInvalidUtf8Output(self) -> None:
        """Verify encoding='utf-8' output with invalid content."""
        # Strict decoding applies when errors= is left unset ...
        with self.assertRaises(UnicodeDecodeError):
            cros_build_lib.run(
                ["echo", b"\xff"], capture_output=True, encoding="utf-8"
            )
        # ... and when it is requested explicitly.
        with self.assertRaises(UnicodeDecodeError):
            cros_build_lib.run(
                ["echo", b"\xff"],
                capture_output=True,
                encoding="utf-8",
                errors="strict",
            )

    def testEncodingReplaceInvalidUtf8Output(self) -> None:
        """Verify invalid content's encoding='utf-8' errors='replace' output."""
        result = cros_build_lib.run(
            ["echo", b"S\xffE"],
            capture_output=True,
            encoding="utf-8",
            errors="replace",
        )
        self.assertEqual(result.stdout, "S\ufffdE\n")

    def testCommandArgsValidTypes(self) -> None:
        """Verify command args can be of known types."""
        # Support bytes, strings, and Path objects.
        result = cros_build_lib.run(
            ["echo", b"bytes", Path("path")], capture_output=True
        )
        self.assertEqual(result.stdout, b"bytes path\n")

    def testCommandArgsInvalidTypes(self) -> None:
        """Verify command args with invalid types are rejected."""
        with self.assertRaises(TypeError):
            cros_build_lib.run(["echo", 1234], capture_output=True)

    def testExecutable(self) -> None:
        """Verify executable arg is handled correctly."""
        # This should run the echo program.
        result = cros_build_lib.run(
            ["asdf", "1234"],
            executable=shutil.which("echo"),
            capture_output=True,
            encoding="utf-8",
        )
        self.assertEqual(result.stdout, "1234\n")

        # The shell should see the custom argv[0].
        result = cros_build_lib.run(
            ["-asdf", "-c", 'printf %s "$0"'],
            executable=shutil.which("sh"),
            capture_output=True,
            encoding="utf-8",
        )
        self.assertEqual(result.stdout, "-asdf")

    def testCmdTimeout(self) -> None:
        """Verify cmd_timeout arg is handled correctly."""
        m = mock.Mock()

        # The command successfully runs without hitting the timeout.
        result = cros_build_lib.run(
            ["sleep", "0"], cmd_timeout=1, pre_timeout_hook=m.pre_timeout_hook
        )
        self.assertEqual(0, result.returncode)
        m.pre_timeout_hook.assert_not_called()

        # Raises an exception and runs the hook when hitting the timeout.
        self.assertRaises(
            cros_build_lib.RunCommandError,
            cros_build_lib.run,
            ["sleep", "1"],
            cmd_timeout=0,
            pre_timeout_hook=m.pre_timeout_hook,
        )
        m.pre_timeout_hook.assert_called_once()
def _ForceLoggingLevel(functor):
    """Decorator that runs |functor| with the root logger set to INFO.

    The level the root logger had before the call is restored afterwards, even
    when |functor| raises.

    Args:
        functor: The callable to wrap.

    Returns:
        A wrapper that forwards all arguments to |functor| and returns its
        result.
    """

    # Preserve the wrapped callable's name and docstring so decorated tests
    # still report their own metadata rather than that of "inner".
    @functools.wraps(functor)
    def inner(*args, **kwargs):
        logger = logging.getLogger()
        current = logger.getEffectiveLevel()
        try:
            logger.setLevel(logging.INFO)
            return functor(*args, **kwargs)
        finally:
            logger.setLevel(current)

    return inner
class TestRunCommand(cros_test_lib.MockTestCase):
    """Tests of run functionality."""

    def setUp(self) -> None:
        # These ENV variables affect run behavior, hide them.
        self._old_envs = {
            e: os.environ.pop(e)
            for e in constants.ENV_PASSTHRU
            if e in os.environ
        }

        # Get the original value for SIGINT so our signal() mock can return the
        # correct thing.
        self._old_sigint = signal.getsignal(signal.SIGINT)

        # Mock the return value of Popen().
        self.stdin = None
        self.stderr = b"test error"
        self.stdout = b"test output"
        self.proc_mock = mock.MagicMock(
            returncode=0, communicate=self._Communicate
        )
        self.popen_mock = self.PatchObject(
            cros_build_lib, "_Popen", return_value=self.proc_mock
        )

        self.signal_mock = self.PatchObject(signal, "signal")
        self.getsignal_mock = self.PatchObject(signal, "getsignal")

    def tearDown(self) -> None:
        # Restore hidden ENVs.
        os.environ.update(self._old_envs)

    def _Communicate(self, stdin, timeout=None):
        """Used by mocked _Popen for communicate method.

        This allows us to capture what was passed on input.

        Args:
            stdin: The data handed to communicate(); recorded on self.stdin.
            timeout: Accepted for signature compatibility and ignored.

        Returns:
            The (stdout, stderr) pair configured on this test case.
        """
        del timeout  # To suppress the unused argument lint error.
        self.stdin = stdin
        return self.stdout, self.stderr

    @contextlib.contextmanager
    def _MockChecker(self, cmd, **kwargs):
        """Verify the mocks we set up.

        Installs side effects on the signal mocks, yields so the caller can
        execute the command, and afterwards checks the recorded signal and
        _Popen calls.

        Args:
            cmd: The command expected as the first positional _Popen argument.
            **kwargs: Keyword arguments expected to be passed to _Popen. The
                special key "ignore_sigint" is consumed here and selects which
                signal call sequence is expected.
        """
        ignore_sigint = kwargs.pop("ignore_sigint", False)

        # Make some arbitrary functors we can pretend are signal handlers.
        # Note that these are intentionally defined on the fly via lambda-
        # this is to ensure that they're unique to each run.
        sigint_suppress = lambda signum, frame: None
        sigint_suppress.__name__ = "sig_ign_sigint"
        normal_sigint = lambda signum, frame: None
        normal_sigint.__name__ = "sigint"
        normal_sigterm = lambda signum, frame: None
        normal_sigterm.__name__ = "sigterm"

        def _ExpectedHandler(sig):
            """Return the right signal values so we can check the calls."""
            if sig == signal.SIGINT:
                return sigint_suppress if ignore_sigint else normal_sigint
            elif sig == signal.SIGTERM:
                return normal_sigterm
            else:
                raise ValueError("unknown sig %i" % sig)

        # Set up complicated mock for signal.signal().
        def _SignalChecker(sig, _action):
            """Stand-in for signal.signal(); the requested action is ignored."""
            return _ExpectedHandler(sig)

        self.signal_mock.side_effect = _SignalChecker

        # Set up complicated mock for signal.getsignal().
        self.getsignal_mock.side_effect = _ExpectedHandler

        # Let the body of code run, then check the signal behavior afterwards.
        # We don't get visibility into signal ordering vs command execution,
        # but it's kind of hard to mess up that, so we won't bother.
        yield

        class RejectSigIgn:
            """Make sure the signal action is not SIG_IGN."""

            def __eq__(self, other: Any) -> bool:
                return other != signal.SIG_IGN

        # Verify the signals checked/setup are correct.
        if ignore_sigint:
            self.signal_mock.assert_has_calls(
                [
                    mock.call(signal.SIGINT, signal.SIG_IGN),
                    mock.call(signal.SIGTERM, RejectSigIgn()),
                    mock.call(signal.SIGINT, sigint_suppress),
                    mock.call(signal.SIGTERM, normal_sigterm),
                ]
            )
        else:
            self.signal_mock.assert_has_calls(
                [
                    mock.call(signal.SIGINT, RejectSigIgn()),
                    mock.call(signal.SIGTERM, RejectSigIgn()),
                    mock.call(signal.SIGINT, normal_sigint),
                    mock.call(signal.SIGTERM, normal_sigterm),
                ]
            )
        self.assertEqual(self.getsignal_mock.call_count, 2)

        # Verify various args are passed down to the real command.
        pargs = self.popen_mock.call_args[0][0]
        self.assertEqual(cmd, pargs)

        # Verify various kwargs are passed down to the real command.
        pkwargs = self.popen_mock.call_args[1]
        for key in ("cwd", "stdin", "stdout", "stderr"):
            kwargs.setdefault(key, None)
        kwargs.setdefault("shell", False)
        kwargs.setdefault("env", mock.ANY)
        kwargs["close_fds"] = True
        self.longMessage = True
        for key, expected in kwargs.items():
            self.assertEqual(
                expected, pkwargs[key], msg="kwargs[%s] mismatch" % key
            )

    def _AssertCrEqual(self, expected, actual) -> None:
        """Helper method to compare two CompletedProcess objects.

        This is needed since assertEqual does not know how to compare two
        CompletedProcess objects.

        Args:
            expected: a CompletedProcess object, expected result.
            actual: a CompletedProcess object, actual result.
        """
        self.assertEqual(expected.args, actual.args)
        self.assertEqual(expected.stderr, actual.stderr)
        self.assertEqual(expected.stdout, actual.stdout)
        self.assertEqual(expected.returncode, actual.returncode)

    @_ForceLoggingLevel
    def _TestCmd(
        self, cmd, real_cmd, sp_kv=None, rc_kv=None, sudo=False
    ) -> None:
        """Factor out common setup logic for testing run().

        Args:
            cmd: a string or an array of strings that will be passed to run.
            real_cmd: the real command we expect run to call (might be modified
                to have enter_chroot).
            sp_kv: key-value pairs passed to subprocess.Popen().
            rc_kv: key-value pairs passed to run().
            sudo: use sudo_run() rather than run().
        """
        if sp_kv is None:
            sp_kv = {}
        if rc_kv is None:
            rc_kv = {}

        # Output is only expected back on the result when it was requested.
        stdout = None
        stderr = None
        if rc_kv.get("stdout") or rc_kv.get("capture_output"):
            stdout = self.stdout
        if rc_kv.get("stderr") or rc_kv.get("capture_output"):
            stderr = self.stderr

        expected_result = cros_build_lib.CompletedProcess(
            args=real_cmd,
            stdout=stdout,
            stderr=stderr,
            returncode=self.proc_mock.returncode,
        )

        runcmd = cros_build_lib.sudo_run if sudo else cros_build_lib.run

        with self._MockChecker(
            real_cmd, ignore_sigint=rc_kv.get("ignore_sigint"), **sp_kv
        ):
            actual_result = runcmd(cmd, **rc_kv)

        # If run was called with encoding, we need to encode the result
        # before making a comparison below, as the underlying data we are
        # passing to _Popen is stored as bytes.
        if "encoding" in rc_kv:
            encoding = rc_kv["encoding"]
            if actual_result.stdout is not None:
                actual_result.stdout = actual_result.stdout.encode(encoding)
            if actual_result.stderr is not None:
                actual_result.stderr = actual_result.stderr.encode(encoding)

        self._AssertCrEqual(expected_result, actual_result)

    def testReturnCodeZeroWithArrayCmd(self, ignore_sigint=False) -> None:
        """--enter_chroot=False and --cmd is an array of strings.

        Parameterized so this can also be used by some other tests w/ alternate
        params to run().

        Args:
            ignore_sigint: If True, we'll tell run to ignore sigint.
        """
        self.proc_mock.returncode = 0
        cmd_list = ["foo", "bar", "roger"]
        self._TestCmd(
            cmd_list, cmd_list, rc_kv=dict(ignore_sigint=ignore_sigint)
        )

    def testSignalRestoreNormalCase(self) -> None:
        """Test run() properly sets/restores sigint. Normal case."""
        self.testReturnCodeZeroWithArrayCmd(ignore_sigint=True)

    def testReturnCodeZeroWithArrayCmdEnterChroot(self) -> None:
        """--enter_chroot=True and --cmd is an array of strings."""
        self.proc_mock.returncode = 0
        cmd_list = ["foo", "bar", "roger"]
        real_cmd = cmd_list
        if not cros_build_lib.IsInsideChroot():
            real_cmd = ["cros_sdk", "--"] + cmd_list

        self._TestCmd(cmd_list, real_cmd, rc_kv=dict(enter_chroot=True))

    @_ForceLoggingLevel
    def testCommandFailureRaisesError(self, ignore_sigint=False) -> None:
        """Verify a non-zero exit status raises RunCommandError with check=True.

        Parameterized so this can also be used by some other tests w/ alternate
        params to run().

        Args:
            ignore_sigint: If True, we'll tell run to ignore sigint.
        """
        cmd = "test cmd"
        self.proc_mock.returncode = 1
        with self._MockChecker(
            ["/bin/bash", "-c", cmd], ignore_sigint=ignore_sigint
        ):
            self.assertRaises(
                cros_build_lib.RunCommandError,
                cros_build_lib.run,
                cmd,
                shell=True,
                ignore_sigint=ignore_sigint,
                check=True,
            )

    @_ForceLoggingLevel
    def testSubprocessCommunicateExceptionRaisesError(
        self, ignore_sigint=False
    ) -> None:
        """Verify an error raised by communicate() propagates out of run().

        Parameterized so this can also be used by some other tests w/ alternate
        params to run().

        Args:
            ignore_sigint: If True, we'll tell run to ignore sigint.
        """
        cmd = ["test", "cmd"]
        self.proc_mock.communicate = mock.MagicMock(side_effect=ValueError)
        with self._MockChecker(cmd, ignore_sigint=ignore_sigint):
            self.assertRaises(
                ValueError, cros_build_lib.run, cmd, ignore_sigint=ignore_sigint
            )

    def testSignalRestoreExceptionCase(self) -> None:
        """Test run() properly sets/restores sigint. Exception case."""
        self.testSubprocessCommunicateExceptionRaisesError(ignore_sigint=True)

    @mock.patch(
        "chromite.lib.cros_build_lib._get_trace_context", return_value={}
    )
    def testEnvWorks(self, _trace_context_mock) -> None:
        """Test run(..., env=xyz) works."""
        # We'll put this bogus environment together, just to make sure
        # subprocess.Popen gets passed it.
        rc_env = {"Tom": "Jerry", "Itchy": "Scratchy"}
        sp_env = dict(rc_env, LC_MESSAGES="C")

        # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
        self.proc_mock.returncode = 0
        cmd_list = ["foo", "bar", "roger"]

        # Run.  We expect the env= to be passed through from sp
        # (subprocess.Popen) to rc (run).
        self._TestCmd(
            cmd_list, cmd_list, sp_kv=dict(env=sp_env), rc_kv=dict(env=rc_env)
        )

    @mock.patch(
        "chromite.lib.cros_build_lib._get_trace_context", return_value={}
    )
    def testExtraEnvOnlyWorks(self, _trace_context_mock) -> None:
        """Test run(..., extra_env=xyz) works."""
        # We'll put this bogus environment together, just to make sure
        # subprocess.Popen gets passed it.
        extra_env = {"Pinky": "Brain"}
        ## This is a little bit circular, since the same logic is used to
        ## compute the value inside, but at least it checks that this happens.
        total_env = os.environ.copy()
        # The core run code forces this too.
        total_env["LC_MESSAGES"] = "C"
        total_env.update(extra_env)

        # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
        self.proc_mock.returncode = 0
        cmd_list = ["foo", "bar", "roger"]

        # Run.  We expect the env= to be passed through from sp
        # (subprocess.Popen) to rc (run).
        self._TestCmd(
            cmd_list,
            cmd_list,
            sp_kv=dict(env=total_env),
            rc_kv=dict(extra_env=extra_env),
        )

    @mock.patch(
        "chromite.lib.cros_build_lib._get_trace_context", return_value={}
    )
    def testClearEnvWorks(self, _trace_context_mock) -> None:
        """Test run(..., clear_env=["PATH"]) works."""
        ## This is a little bit circular, since the same logic is used to
        ## compute the value inside, but at least it checks that this happens.
        total_env = os.environ.copy()
        # The core run code forces this too.
        total_env["LC_MESSAGES"] = "C"
        # PATH should always be in our test environment, so we use that as our
        # test variable.
        total_env.pop("PATH")

        # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
        self.proc_mock.returncode = 0
        cmd_list = ["foo", "bar", "roger"]

        # Run.  We expect the env= to be passed through from sp
        # (subprocess.Popen) to rc (run).
        self._TestCmd(
            cmd_list,
            cmd_list,
            sp_kv=dict(env=total_env),
            rc_kv=dict(clear_env=["PATH"]),
        )

    # NB: Stacked mock.patch decorators are applied bottom-up, so the mock for
    # the decorator closest to the function is the first extra argument.
    @mock.patch(
        "chromite.lib.cros_build_lib.IsInsideChroot", return_value=False
    )
    @mock.patch(
        "chromite.lib.cros_build_lib._get_trace_context", return_value={}
    )
    def testChrootClearEnvWorks(
        self, _trace_context_mock, _inchroot_mock
    ) -> None:
        """Test run(..., enter_chroot=True, clear_env=a) works."""
        # We'll put this bogus environment together, just to make sure
        # subprocess.Popen gets passed it.
        env = {"Tom": "Jerry", "Itchy": "Scratchy"}
        extra_env = {
            "Pinky": "Brain",
            "Yakko": "Dot",
        }
        total_env = {
            "Tom": "Jerry",
            "Itchy": "Scratchy",
            "Pinky": "Brain",
            "LC_MESSAGES": "C",
        }

        # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
        self.proc_mock.returncode = 0
        cmd_list = ["foo", "bar", "roger"]

        # Run.  We expect the env= to be passed through from sp
        # (subprocess.Popen) to rc (run).
        self._TestCmd(
            cmd_list,
            ["cros_sdk", "Pinky=Brain", "--"] + cmd_list,
            sp_kv=dict(env=total_env),
            rc_kv=dict(
                env=env,
                extra_env=extra_env,
                enter_chroot=True,
                clear_env=["Yakko"],
            ),
        )

    @mock.patch(
        "chromite.lib.cros_build_lib._get_trace_context", return_value={}
    )
    def testExtraEnvTooWorks(self, _trace_context_mock) -> None:
        """Test run(..., env=xy, extra_env=z) works."""
        # We'll put this bogus environment together, just to make sure
        # subprocess.Popen gets passed it.
        env = {"Tom": "Jerry", "Itchy": "Scratchy"}
        extra_env = {"Pinky": "Brain"}
        total_env = {
            "Tom": "Jerry",
            "Itchy": "Scratchy",
            "Pinky": "Brain",
            "LC_MESSAGES": "C",
        }

        # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
        self.proc_mock.returncode = 0
        cmd_list = ["foo", "bar", "roger"]

        # Run.  We expect the env= to be passed through from sp
        # (subprocess.Popen) to rc (run).
        self._TestCmd(
            cmd_list,
            cmd_list,
            sp_kv=dict(env=total_env),
            rc_kv=dict(env=env, extra_env=extra_env),
        )

    # NB: Stacked mock.patch decorators are applied bottom-up, so the mock for
    # the decorator closest to the function is the first extra argument.
    @mock.patch(
        "chromite.lib.cros_build_lib.IsInsideChroot", return_value=False
    )
    @mock.patch(
        "chromite.lib.cros_build_lib._get_trace_context", return_value={}
    )
    def testChrootExtraEnvWorks(
        self, _trace_context_mock, _inchroot_mock
    ) -> None:
        """Test run(..., enter_chroot=True, env=xy, extra_env=z) works."""
        # We'll put this bogus environment together, just to make sure
        # subprocess.Popen gets passed it.
        env = {"Tom": "Jerry", "Itchy": "Scratchy"}
        extra_env = {"Pinky": "Brain"}
        total_env = {
            "Tom": "Jerry",
            "Itchy": "Scratchy",
            "Pinky": "Brain",
            "LC_MESSAGES": "C",
        }

        # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
        self.proc_mock.returncode = 0
        cmd_list = ["foo", "bar", "roger"]

        # Run.  We expect the env= to be passed through from sp
        # (subprocess.Popen) to rc (run).
        self._TestCmd(
            cmd_list,
            ["cros_sdk", "Pinky=Brain", "--"] + cmd_list,
            sp_kv=dict(env=total_env),
            rc_kv=dict(env=env, extra_env=extra_env, enter_chroot=True),
        )

    def testExceptionEquality(self) -> None:
        """Verify equality methods for RunCommandError"""
        c1 = cros_build_lib.CompletedProcess(["ls", "arg"], returncode=1)
        c2 = cros_build_lib.CompletedProcess(["ls", "arg1"], returncode=1)
        c3 = cros_build_lib.CompletedProcess(["ls", "arg"], returncode=2)
        e1 = cros_build_lib.RunCommandError("Message 1", c1)
        e2 = cros_build_lib.RunCommandError("Message 1", c1)
        e_diff_msg = cros_build_lib.RunCommandError("Message 2", c1)
        e_diff_cmd = cros_build_lib.RunCommandError("Message 1", c2)
        e_diff_code = cros_build_lib.RunCommandError("Message 1", c3)

        self.assertEqual(e1, e2)
        self.assertNotEqual(e1, e_diff_msg)
        self.assertNotEqual(e1, e_diff_cmd)
        self.assertNotEqual(e1, e_diff_code)

    def testSudoRunCommand(self) -> None:
        """Test sudo_run(...) works."""
        cmd_list = ["foo", "bar", "roger"]
        sudo_list = ["sudo", "--"] + cmd_list
        self.proc_mock.returncode = 0
        self._TestCmd(cmd_list, sudo_list, sudo=True)

    def testSudoRunCommandShell(self) -> None:
        """Test sudo_run(..., shell=True) works."""
        cmd = "foo bar roger"
        sudo_list = ["sudo", "--", "/bin/bash", "-c", cmd]
        self.proc_mock.returncode = 0
        self._TestCmd(cmd, sudo_list, sudo=True, rc_kv=dict(shell=True))

    def testSudoRunCommandEnv(self) -> None:
        """Test sudo_run(..., extra_env=z) works."""
        cmd_list = ["foo", "bar", "roger"]
        sudo_list = ["sudo", "shucky=ducky", "--"] + cmd_list
        extra_env = {"shucky": "ducky"}
        self.proc_mock.returncode = 0
        self._TestCmd(
            cmd_list, sudo_list, sudo=True, rc_kv=dict(extra_env=extra_env)
        )

    def testSudoRunCommandUser(self) -> None:
        """Test sudo_run(..., user='...') works."""
        cmd_list = ["foo", "bar", "roger"]
        sudo_list = ["sudo", "-u", "MMMMMonster", "--"] + cmd_list
        self.proc_mock.returncode = 0
        self._TestCmd(
            cmd_list, sudo_list, sudo=True, rc_kv=dict(user="MMMMMonster")
        )

    def testSudoRunCommandUserShell(self) -> None:
        """Test sudo_run(..., user='...', shell=True) works."""
        cmd = "foo bar roger"
        sudo_list = ["sudo", "-u", "MMMMMonster", "--", "/bin/bash", "-c", cmd]
        self.proc_mock.returncode = 0
        self._TestCmd(
            cmd,
            sudo_list,
            sudo=True,
            rc_kv=dict(user="MMMMMonster", shell=True),
        )

    def testInputBytes(self) -> None:
        """Test that we can always pass non-UTF-8 bytes as input."""
        cmd_list = ["foo", "bar", "roger"]
        bytes_input = b"\xff"
        self.proc_mock.returncode = 0
        self._TestCmd(
            cmd_list,
            cmd_list,
            sp_kv={"stdin": subprocess.PIPE},
            rc_kv={"input": bytes_input, "encoding": "utf-8"},
        )
        self.assertEqual(self.stdin, bytes_input)

    def testInputString(self) -> None:
        """Test that we encode UTF-8 strings passed on input."""
        cmd_list = ["foo", "bar", "roger"]
        unicode_input = "💩"
        self.proc_mock.returncode = 0
        self._TestCmd(
            cmd_list,
            cmd_list,
            sp_kv={"stdin": subprocess.PIPE},
            rc_kv={"input": unicode_input, "encoding": "utf-8"},
        )
        self.assertEqual(self.stdin, unicode_input.encode("utf-8"))

    def testInputStringNoEncoding(self) -> None:
        """Verify we encode UTF-8 input strings w/out passing encoding."""
        cmd_list = ["foo", "bar", "roger"]
        unicode_input = "💩"
        self.proc_mock.returncode = 0
        self._TestCmd(
            cmd_list,
            cmd_list,
            sp_kv={"stdin": subprocess.PIPE},
            rc_kv={"input": unicode_input},
        )
        self.assertEqual(self.stdin, unicode_input.encode("utf-8"))
# TODO(crbug.com/1072139): Migrate tests to use 'legacy_capture_output' fixture
# once this module is Python 3-only.
class TestRunCommandOutput(
    cros_test_lib.TempDirTestCase, cros_test_lib.OutputTestCase
):
    """Tests of run output options."""

    @_ForceLoggingLevel
    def testLogStdoutToFile(self) -> None:
        """Verify stdout=<path> writes to the file alone or with stderr."""
        log = os.path.join(self.tempdir, "output")
        ret = cros_build_lib.run(["echo", "monkeys"], stdout=log)
        self.assertEqual(osutils.ReadFile(log), "monkeys\n")
        self.assertIs(ret.stdout, None)
        self.assertIs(ret.stderr, None)

        os.unlink(log)
        ret = cros_build_lib.run(
            ["sh", "-c", "echo monkeys3 >&2"], stdout=log, stderr=True
        )
        self.assertEqual(ret.stderr, b"monkeys3\n")
        self.assertExists(log)
        self.assertEqual(os.path.getsize(log), 0)

        os.unlink(log)
        ret = cros_build_lib.run(
            ["sh", "-c", "echo monkeys4; echo monkeys5 >&2"],
            stdout=log,
            stderr=subprocess.STDOUT,
        )
        self.assertIs(ret.stdout, None)
        self.assertIs(ret.stderr, None)
        self.assertEqual(osutils.ReadFile(log), "monkeys4\nmonkeys5\n")

    @_ForceLoggingLevel
    def testLogStderrToFile(self) -> None:
        """Verify stderr=<path> writes stderr to the file."""
        log = os.path.join(self.tempdir, "output")
        ret = cros_build_lib.run(["sh", "-c", "echo monkeys >&2"], stderr=log)
        self.assertEqual(osutils.ReadFile(log), "monkeys\n")
        self.assertIs(ret.stdout, None)
        self.assertIs(ret.stderr, None)

    @_ForceLoggingLevel
    def testLogStdoutToFileWithOrWithoutAppend(self) -> None:
        """Verify a second run with stdout=<path> replaces the file content."""
        log = os.path.join(self.tempdir, "output")
        ret = cros_build_lib.run(["echo", "monkeys"], stdout=log)
        self.assertEqual(osutils.ReadFile(log), "monkeys\n")
        self.assertIs(ret.stdout, None)
        self.assertIs(ret.stderr, None)

        # Without append.
        ret = cros_build_lib.run(["echo", "monkeys2"], stdout=log)
        self.assertEqual(osutils.ReadFile(log), "monkeys2\n")
        self.assertIs(ret.stdout, None)
        self.assertIs(ret.stderr, None)

    def testOutputPath(self) -> None:
        """Check stdout=/stderr= works with Path objects."""
        stdout = self.tempdir / "stdout"
        stderr = self.tempdir / "stderr"
        cros_build_lib.run(
            ["sh", "-c", "echo out; echo err >&2"],
            stdout=stdout,
            stderr=stderr,
        )
        self.assertEqual(stdout.read_bytes(), b"out\n")
        self.assertEqual(stderr.read_bytes(), b"err\n")

    def testOutputFileHandle(self) -> None:
        """Verify writing to existing file handles."""
        stdout = os.path.join(self.tempdir, "stdout")
        stderr = os.path.join(self.tempdir, "stderr")
        with open(stdout, "wb") as outfp:
            with open(stderr, "wb") as errfp:
                cros_build_lib.run(
                    ["sh", "-c", "echo out; echo err >&2"],
                    stdout=outfp,
                    stderr=errfp,
                )
        self.assertEqual("out\n", osutils.ReadFile(stdout))
        self.assertEqual("err\n", osutils.ReadFile(stderr))

    # TODO(crbug.com/1072139): Re-enable this test and migrate away from using
    # OutputCapturer once this module is Python 3 only.
    @cros_test_lib.pytestmark_skip
    def testRunCommandAtNoticeLevel(self) -> None:
        """Ensure that run prints output when mute_output is False."""
        # Needed by cros_sdk and brillo/cros chroot.
        with self.OutputCapturer():
            cros_build_lib.run(
                ["echo", "foo"],
                check=False,
                print_cmd=False,
                debug_level=logging.NOTICE,
            )
        self.AssertOutputContainsLine("foo")

    def testRunCommandRedirectStdoutStderrOnCommandError(self) -> None:
        """Tests that stderr is captured when run raises."""
        with self.assertRaises(cros_build_lib.RunCommandError) as cm:
            cros_build_lib.run(["cat", "/"], stderr=True)
        self.assertIsNotNone(cm.exception.stderr)
        # No encoding is requested, so the captured stderr is bytes; comparing
        # it against "" could never fail.  Check for non-empty content instead.
        self.assertTrue(cm.exception.stderr)

    def _CaptureLogOutput(self, cmd, **kwargs):
        """Capture logging output of run.

        Args:
            cmd: The command passed to run().
            **kwargs: Extra keyword arguments passed to run().

        Returns:
            Everything logged to the root logger while the command ran.
        """
        log = os.path.join(self.tempdir, "output")
        fh = logging.FileHandler(log)
        fh.setLevel(logging.DEBUG)
        root_logger = logging.getLogger()
        root_logger.addHandler(fh)
        try:
            cros_build_lib.run(cmd, **kwargs)
        finally:
            # Always detach & close the handler so a failing command does not
            # leave it installed on the root logger for later tests.
            root_logger.removeHandler(fh)
            fh.close()
        return osutils.ReadFile(log)

    @_ForceLoggingLevel
    def testLogOutput(self) -> None:
        """Normal log_output, stdout followed by stderr."""
        cmd = "echo Greece; echo Italy >&2; echo Spain"
        log_output = (
            "run: /bin/bash -c "
            "'echo Greece; echo Italy >&2; echo Spain'\n"
            "(stdout):\nGreece\nSpain\n\n(stderr):\nItaly\n\n"
        )
        output = self._CaptureLogOutput(
            cmd, shell=True, log_output=True, encoding="utf-8"
        )
        self.assertEqual(output, log_output)
class HelperMethodSimpleTests(cros_test_lib.OutputTestCase):
    """Mock-free checks of assorted helper functions."""

    def testUserDateTime(self) -> None:
        """Format a raw numeric timestamp."""
        want = "Mon, 16 Jun 1980 05:03:20 -0700 (PDT)"
        with cros_test_lib.SetTimeZone("US/Pacific"):
            got = cros_build_lib.UserDateTimeFormat(timeval=330005000)
            self.assertEqual(got, want)

    def testUserDateTimeDateTime(self) -> None:
        """Format a datetime object."""
        want = "Mon, 16 Jun 1980 00:00:00 -0700 (PDT)"
        with cros_test_lib.SetTimeZone("US/Pacific"):
            when = datetime.datetime(1980, 6, 16)
            got = cros_build_lib.UserDateTimeFormat(timeval=when)
            self.assertEqual(got, want)

    def testUserDateTimeDateTimeInWinter(self) -> None:
        """A January date in US/Pacific is reported as PST, not PDT."""
        want = "Wed, 16 Jan 1980 00:00:00 -0800 (PST)"
        with cros_test_lib.SetTimeZone("US/Pacific"):
            when = datetime.datetime(1980, 1, 16)
            got = cros_build_lib.UserDateTimeFormat(timeval=when)
            self.assertEqual(got, want)

    def testUserDateTimeDateTimeInEST(self) -> None:
        """A January date in US/Eastern is reported as EST."""
        want = "Wed, 16 Jan 1980 00:00:00 -0500 (EST)"
        with cros_test_lib.SetTimeZone("US/Eastern"):
            when = datetime.datetime(1980, 1, 16)
            got = cros_build_lib.UserDateTimeFormat(timeval=when)
            self.assertEqual(got, want)

    def testUserDateTimeCurrentTime(self) -> None:
        """Calling without a time value must simply not raise."""
        cros_build_lib.UserDateTimeFormat()

    def testParseUserDateTimeFormat(self) -> None:
        """A formatted timestamp parses back to the original value."""
        formatted = cros_build_lib.UserDateTimeFormat(100000.0)
        parsed = cros_build_lib.ParseUserDateTimeFormat(formatted)
        self.assertEqual(parsed, 100000.0)

    def testGetRandomString(self) -> None:
        """The random string is 32 lowercase alphanumeric characters."""
        token = cros_build_lib.GetRandomString()
        self.assertRegex(token, r"^[a-z0-9]+$")
        self.assertEqual(32, len(token))

    def testMachineDetails(self) -> None:
        """The details are non-empty and end with a newline."""
        details = cros_build_lib.MachineDetails()
        self.assertNotEqual(details, "")
        self.assertEqual(details[-1], "\n")
class TestInput(cros_test_lib.MockOutputTestCase):
    """Tests for the interactive input helpers."""

    def testBooleanPrompt(self) -> None:
        """Exercise BooleanPrompt() with empty, affirmative, negative input."""
        input_mock = self.PatchObject(builtins, "input")

        # An empty reply falls back to the default.
        input_mock.return_value = ""
        self.assertTrue(cros_build_lib.BooleanPrompt())
        self.assertFalse(cros_build_lib.BooleanPrompt(default=False))

        # Any prefix of "yes" is accepted as affirmative.
        for reply in ("yes", "ye", "y"):
            input_mock.return_value = reply
            self.assertTrue(cros_build_lib.BooleanPrompt())

        # Likewise for prefixes of "no".
        for reply in ("no", "n"):
            input_mock.return_value = reply
            self.assertFalse(cros_build_lib.BooleanPrompt())

    def testBooleanShellValue(self) -> None:
        """Exercise BooleanShellValue() with None, junk, and valid inputs."""
        to_bool = cros_build_lib.BooleanShellValue

        # None yields whichever default was supplied.
        self.assertTrue(to_bool(None, True))
        self.assertFalse(to_bool(None, False))

        # Unrecognized values raise ValueError, but return the default when
        # a |msg| is passed.
        for junk in (1234, "", "akldjsf", '"'):
            with self.assertRaises(ValueError):
                to_bool(junk, True)
            self.assertTrue(to_bool(junk, True, msg=""))
            self.assertFalse(to_bool(junk, False, msg=""))

        # Recognized spellings win over the default in either direction.
        truthy = ("yes", "YES", "YeS", "y", "Y", "1", "true", "True", "TRUE")
        for text in truthy:
            for default in (True, False):
                self.assertTrue(to_bool(text, default))

        falsy = ("no", "NO", "nO", "n", "N", "0", "false", "False", "FALSE")
        for text in falsy:
            for default in (True, False):
                self.assertFalse(to_bool(text, default))

    def testGetChoiceLists(self) -> None:
        """GetChoice returns the chosen index for a list of options."""
        input_mock = self.PatchObject(builtins, "input")
        input_mock.return_value = "1"
        options = ["a", "b", "c"]
        self.assertEqual(cros_build_lib.GetChoice("title", options), 1)

    def testGetChoiceGenerator(self) -> None:
        """GetChoice returns the chosen index for options built from range."""
        input_mock = self.PatchObject(builtins, "input")
        input_mock.return_value = "2"
        options = list(range(3))
        self.assertEqual(cros_build_lib.GetChoice("title", options), 2)

    def testGetChoiceWindow(self) -> None:
        """GetChoice pulls options lazily when group_size is set."""
        input_mock = self.PatchObject(builtins, "input")
        pulled = 0

        def _Endless():
            """Yield "a" forever, counting how many items were requested."""
            nonlocal pulled
            while True:
                pulled += 1
                yield "a"

        # First reply asks for more options, the second picks index 2.
        input_mock.side_effect = ["\n", "2"]
        picked = cros_build_lib.GetChoice("title", _Endless(), group_size=2)
        self.assertEqual(picked, 2)

        # Verify we showed the correct number of times.
        self.assertEqual(pulled, 5)
class Test_iflatten_instance(cros_test_lib.TestCase):
    """Tests for cros_build_lib.iflatten_instance."""

    def test_it(self) -> None:
        """Flatten assorted inputs and compare against the expected lists."""

        def flatten(value, **kwargs):
            """Materialize the flattened iterator so it can be compared."""
            return list(cros_build_lib.iflatten_instance(value, **kwargs))

        # Each entry is (expected result, input, extra keyword arguments).
        cases = (
            ([1, 2], [1, 2], {}),
            ([1, "2a"], [1, "2a"], {}),
            ([1, 2, "b"], [1, [2, "b"]], {}),
            (
                [1, 2, "f", "d", "a", "s"],
                [1, 2, ("fdas",)],
                {"terminate_on_kls": int},
            ),
            ([""], "", {}),
            ([b""], b"", {}),
            ([b"1234"], b"1234", {}),
            ([b"12", b"34"], [b"12", b"34"], {}),
        )
        for expected, value, extra in cases:
            self.assertEqual(expected, flatten(value, **extra))
class TestAssertRootUserCheck(cros_test_lib.MockTestCase):
    """Root/non-root assertions while os.geteuid() is patched to return 0."""

    def setUp(self) -> None:
        """Patch os.geteuid to return 0."""
        self.geteuid_mock = self.PatchObject(os, "geteuid", return_value=0)

    def testAssertNonRootUserForRoot(self) -> None:
        """AssertNonRootUser must die when the euid is 0."""
        with self.assertRaises(cros_build_lib.DieSystemExit):
            cros_build_lib.AssertNonRootUser()

    def testAssertRootUserForRoot(self) -> None:
        """AssertRootUser must return quietly when the euid is 0."""
        cros_build_lib.AssertRootUser()
class TestAssertNonRootUserCheck(cros_test_lib.MockTestCase):
    """Root/non-root assertions while os.geteuid() is patched to return 20."""

    def setUp(self) -> None:
        """Patch os.geteuid to return a non-zero id."""
        self.geteuid_mock = self.PatchObject(os, "geteuid", return_value=20)

    def testAssertNonRootUserforNonRoot(self) -> None:
        """AssertNonRootUser must return quietly for a non-zero euid."""
        cros_build_lib.AssertNonRootUser()

    def testAssertRootUserforNonRoot(self) -> None:
        """AssertRootUser must die for a non-zero euid."""
        with self.assertRaises(cros_build_lib.DieSystemExit):
            cros_build_lib.AssertRootUser()
class TarballTests(cros_test_lib.TempDirTestCase):
    """Test tarball handling functions."""

    def setUp(self) -> None:
        """Create files/dirs needed for tar test."""
        self.tarball_path = os.path.join(self.tempdir, "test.tar.xz")
        self.inputDir = os.path.join(self.tempdir, "inputs")
        # Every file individually, including ones nested in subdirectories.
        self.inputs = [
            "inputA",
            "inputB",
            "sub/subfile",
            "sub2/subfile",
        ]
        # The same tree named by its top-level entries, so the
        # subdirectories are passed as inputs instead of the files in them.
        self.inputsWithDirs = [
            "inputA",
            "inputB",
            "sub",
            "sub2",
        ]

        # Create the input files; each file's content is its relative path.
        for i in self.inputs:
            osutils.WriteFile(os.path.join(self.inputDir, i), i, makedirs=True)

    def testCreateSuccess(self) -> None:
        """Create a tarfile from an explicit list of files."""
        cros_build_lib.CreateTarball(
            self.tarball_path, self.inputDir, inputs=self.inputs
        )

    def testCreateSuccessWithDirs(self) -> None:
        """Create a tarfile when some of the inputs are directories."""
        cros_build_lib.CreateTarball(
            self.tarball_path, self.inputDir, inputs=self.inputsWithDirs
        )

    def testCreateSuccessWithTooManyFiles(self) -> None:
        """Test a tarfile creation with -T /dev/stdin."""
        # One more input than the threshold, to exercise the large-list path.
        # pylint: disable=protected-access
        num_inputs = cros_build_lib._THRESHOLD_TO_USE_T_FOR_TAR + 1
        inputs = [f"input{x}" for x in range(num_inputs)]
        largeInputDir = os.path.join(self.tempdir, "largeinputs")
        for i in inputs:
            osutils.WriteFile(os.path.join(largeInputDir, i), i, makedirs=True)
        cros_build_lib.CreateTarball(
            self.tarball_path, largeInputDir, inputs=inputs
        )

    def testCreateExtractSuccessWithNoCompressionProgram(self) -> None:
        """Create a tarfile without any compression, then extract it."""
        path = os.path.join(self.tempdir, "test.tar")
        cros_build_lib.CreateTarball(path, self.inputDir, inputs=self.inputs)
        cros_build_lib.ExtractTarball(path, self.tempdir)

        # Again, but using Path instead of str paths.
        path = Path(path)
        cros_build_lib.CreateTarball(
            path, Path(self.inputDir), inputs=self.inputs
        )
        cros_build_lib.ExtractTarball(path, self.tempdir)

    def testCreateExtractSuccessWithCompressionProgram(self) -> None:
        """Create a tarfile with compression, then verify its contents."""
        tar_files = [
            "test.tar.gz",
            "test.tar.bz2",
            "test.tar.xz",
            "test.tar.zst",
        ]

        dir_path = self.tempdir / "dir"
        dir_path.mkdir()

        D = cros_test_lib.Directory
        dir_structure = [
            D(".", []),
            D("test", ["file1.txt"]),
            D("foo", ["file1.txt"]),
            D("bar", ["file1.txt", "file2.c"]),
        ]
        cros_test_lib.CreateOnDiskHierarchy(dir_path, dir_structure)

        # The compression type is derived from each tarball's extension.
        for tar_file in tar_files:
            tar_file_path = self.tempdir / tar_file
            comp = cros_build_lib.CompressionExtToType(tar_file)
            cros_build_lib.CreateTarball(
                tar_file_path, dir_path, compression=comp
            )
            cros_test_lib.VerifyTarball(tar_file_path, dir_structure)

    def testExtractFailureWithMissingFile(self) -> None:
        """Verify that tar's stderr is in the error if it encounters one."""
        tarball = "a-tarball-which-does-not-exist.tar.gz"

        # assertRaises fails the test when no TarballError is raised at all;
        # a bare try/except would pass silently in that case.
        with self.assertRaises(cros_build_lib.TarballError) as cm:
            cros_build_lib.ExtractTarball(tarball, self.tempdir)

        # Check to see that tar's error message is printed in the exception.
        self.assertIn("No such file or directory", cm.exception.args[0])

    def test_IsTarball(self) -> None:
        """Test IsTarball helper function."""
        self.assertTrue(cros_build_lib.IsTarball("file.tar"))
        self.assertTrue(cros_build_lib.IsTarball("file.tar.bz2"))
        self.assertTrue(cros_build_lib.IsTarball("file.tar.gz"))
        self.assertTrue(cros_build_lib.IsTarball("file.tar.xz"))
        self.assertTrue(cros_build_lib.IsTarball("file.tar.zst"))
        self.assertTrue(cros_build_lib.IsTarball("file.tbz"))
        self.assertTrue(cros_build_lib.IsTarball("file.txz"))
        # Near-miss extensions must not be treated as tarballs.
        self.assertFalse(cros_build_lib.IsTarball("file.txt"))
        self.assertFalse(cros_build_lib.IsTarball("file.tart"))
        self.assertFalse(cros_build_lib.IsTarball("file.bz2"))
# Tests for tar exceptions.
class FailedCreateTarballExceptionTests(
    cros_test_lib.TempDirTestCase, cros_test_lib.LoggingTestCase
):
    """CreateTarball behavior when its target or working dir is missing."""

    def setUp(self) -> None:
        """Record the path of an input directory under the temp dir."""
        self.inputDir = os.path.join(self.tempdir, "BadInputDirectory")

    def testSuccess(self) -> None:
        """The tarball is created when both cwd and target dir exist."""
        dest_dir = os.path.join(self.tempdir, "target_dir")
        dest_tarball = os.path.join(dest_dir, "stuff.tar")
        osutils.SafeMakedirs(dest_dir)

        src_dir = os.path.join(self.tempdir, "working_dir")
        osutils.SafeMakedirs(src_dir)
        for name, content in (("file1.txt", "file1"), ("file2.txt", "file2")):
            osutils.WriteFile(os.path.join(src_dir, name), content)

        cros_build_lib.CreateTarball(dest_tarball, src_dir)

        # The tarball must be the only entry in the target directory.
        self.assertEqual(os.listdir(dest_dir), ["stuff.tar"])

    def testFailureBadTarget(self) -> None:
        """A missing target directory raises TarballError and is logged."""
        # The target directory is deliberately never created.
        dest_tarball = os.path.join(self.tempdir, "target_dir", "stuff.tar")
        src_dir = os.path.join(self.tempdir, "working_dir")
        osutils.SafeMakedirs(src_dir)

        with cros_test_lib.LoggingCapturer() as captured:
            with self.assertRaises(cros_build_lib.TarballError):
                cros_build_lib.CreateTarball(dest_tarball, src_dir)
            self.AssertLogsContain(captured, "CreateTarball failed creating")

    def testFailureBadWorkingDir(self) -> None:
        """A missing working directory raises RunCommandError and is logged."""
        dest_dir = os.path.join(self.tempdir, "target_dir")
        osutils.SafeMakedirs(dest_dir)
        dest_tarball = os.path.join(dest_dir, "stuff.tar")
        # The working directory is deliberately never created.
        src_dir = os.path.join(self.tempdir, "working_dir")

        with cros_test_lib.LoggingCapturer() as captured:
            with self.assertRaises(cros_build_lib.RunCommandError):
                cros_build_lib.CreateTarball(dest_tarball, src_dir)
            self.AssertLogsContain(
                captured, "CreateTarball unable to run tar for"
            )
# Tests for tar failure retry logic.
class FailedCreateTarballTests(cros_test_lib.RunCommandTestCase):
    """CreateTarball handling of tar exit codes, using a mocked run()."""

    def setUp(self) -> None:
        """Make the mocked run() return scripted exit codes."""
        # Tests fill this in. Each entry is the returncode of the
        # CompletedProcess handed back by the next call to run().
        self.tarResults = []

        def _NextResult(*_args, **_kwargs):
            """Build a CompletedProcess from the next queued return code."""
            code = self.tarResults.pop(0)
            return cros_build_lib.CompletedProcess(
                stdout="", stderr="", returncode=code
            )

        self.rc.SetDefaultCmdResult(side_effect=_NextResult)

    def testSuccess(self) -> None:
        """A zero exit code means run() is called exactly once."""
        self.tarResults = [0]
        cros_build_lib.CreateTarball("foo", "bar", inputs=["a", "b"])

        self.assertEqual(self.rc.call_count, 1)

    def testFailedOnceSoft(self) -> None:
        """Exit code 1 followed by 0 results in a single retry."""
        self.tarResults = [1, 0]
        cros_build_lib.CreateTarball("foo", "bar", inputs=["a", "b"], timeout=0)

        self.assertEqual(self.rc.call_count, 2)

    def testFailedOnceHard(self) -> None:
        """Exit code 2 raises immediately without a retry."""
        self.tarResults = [2]
        with self.assertRaises(cros_build_lib.RunCommandError) as ctx:
            cros_build_lib.CreateTarball("foo", "bar", inputs=["a", "b"])

        self.assertEqual(self.rc.call_count, 1)
        failed_result = ctx.exception.args[1]
        self.assertEqual(failed_result.returncode, 2)

    def testFailedThriceSoft(self) -> None:
        """Three exit-code-1 results in a row exhaust the retries."""
        self.tarResults = [1, 1, 1]
        with self.assertRaises(cros_build_lib.RunCommandError) as ctx:
            cros_build_lib.CreateTarball(
                "foo", "bar", inputs=["a", "b"], timeout=0
            )

        self.assertEqual(self.rc.call_count, 3)
        failed_result = ctx.exception.args[1]
        self.assertEqual(failed_result.returncode, 1)
class ClearShadowLocksTests(
    cros_test_lib.TempDirTestCase, cros_test_lib.LoggingTestCase
):
    """Tests for ClearShadowLocks against a fake sysroot."""

    def setUp(self) -> None:
        """Populate etc/ in the temp dir with lock and non-lock files."""
        etc_files = (
            "test.lock",
            "testfile.txt",
            "passwd.lock",
            "group.lock",
            "shadow.lock",
            "shadow.lockfile",
            "gshadow.lock",
        )
        layout = (cros_test_lib.Directory("etc", etc_files),)
        cros_test_lib.CreateOnDiskHierarchy(self.tempdir, layout)

    def testClearShadowLocksSuccess(self) -> None:
        """Unrelated files survive; the shadow-utils lock files are gone."""
        cros_build_lib.ClearShadowLocks(self.tempdir)

        for kept in ("test.lock", "testfile.txt"):
            self.assertTrue(os.path.exists(f"{self.tempdir}/etc/{kept}"))

        removed_names = (
            "passwd.lock",
            "group.lock",
            "shadow.lock",
            "shadow.lockfile",
            "gshadow.lock",
        )
        for removed in removed_names:
            self.assertFalse(os.path.exists(f"{self.tempdir}/etc/{removed}"))

    def testClearShadowLocksPathDoesNotExist(self) -> None:
        """A nonexistent sysroot path is reported through the logs."""
        missing_root = Path("fake/path/does/not/exist")
        expected_msg = (
            "Unable to clear shadow-utils lockfiles, path does not exist"
        )
        with cros_test_lib.LoggingCapturer() as captured:
            cros_build_lib.ClearShadowLocks(missing_root)
            self.AssertLogsContain(captured, expected_msg)
class FindCompressorTests(cros_test_lib.TempDirTestCase):
    """Tests for `FindCompressor`."""

    def _test_comp(
        self, comps: List[str], comp_type: cros_build_lib.CompressionType
    ) -> None:
        """Check each program in |comps| is found when installed in a root.

        Every program gets its own root under the temp dir holding an
        executable <root>/bin/<program>.
        """
        for program in comps:
            fake_root = self.tempdir / program
            fake_binary = fake_root / "bin" / program
            osutils.Touch(fake_binary, makedirs=True, mode=0o755)

            found = cros_build_lib.FindCompressor(comp_type, root=fake_root)
            self.assertEqual(fake_binary, Path(found))

    def testFindCompressorXz(self) -> None:
        """XZ resolves to the xz_auto script in the chromite scripts dir."""
        expected = str(constants.CHROMITE_SCRIPTS_DIR / "xz_auto")
        found = cros_build_lib.FindCompressor(cros_build_lib.CompressionType.XZ)
        self.assertEqual(expected, found)

    def testFindCompressorGzip(self) -> None:
        """Both pigz and gzip are located for GZIP."""
        self._test_comp(("pigz", "gzip"), cros_build_lib.CompressionType.GZIP)

    def testFindCompressorGzipNotFound(self) -> None:
        """GZIP falls back to plain "gzip" when the root has no gzip."""
        found = cros_build_lib.FindCompressor(
            cros_build_lib.CompressionType.GZIP, root=self.tempdir
        )
        self.assertEqual("gzip", found)

    def testFindCompressorBzip2(self) -> None:
        """lbzip2, pbzip2 and bzip2 are all located for BZIP2."""
        self._test_comp(
            ("lbzip2", "pbzip2", "bzip2"),
            cros_build_lib.CompressionType.BZIP2,
        )

    def testFindCompressorBzip2NotFound(self) -> None:
        """BZIP2 falls back to plain "bzip2" when the root has no bzip2."""
        found = cros_build_lib.FindCompressor(
            cros_build_lib.CompressionType.BZIP2, root=self.tempdir
        )
        self.assertEqual("bzip2", found)

    def testFindCompressorZstd(self) -> None:
        """Both zstdmt and zstd are located for ZSTD."""
        self._test_comp(("zstdmt", "zstd"), cros_build_lib.CompressionType.ZSTD)

    def testFindCompressorZstdNotFound(self) -> None:
        """ZSTD falls back to plain "zstd" when the root has no zstd."""
        found = cros_build_lib.FindCompressor(
            cros_build_lib.CompressionType.ZSTD, root=self.tempdir
        )
        self.assertEqual("zstd", found)

    def testFindCompressorNone(self) -> None:
        """The NONE compression type maps to "cat"."""
        found = cros_build_lib.FindCompressor(
            cros_build_lib.CompressionType.NONE
        )
        self.assertEqual("cat", found)

    def testFindCompressorInvalid(self) -> None:
        """An unknown compression type raises ValueError."""
        with self.assertRaises(ValueError):
            cros_build_lib.FindCompressor(888)
class CompressFileTests(
cros_test_lib.TempDirTestCase, cros_test_lib.RunCommandTestCase
):
"""Tests for `CompressFile`."""
def testCompressFile(self) -> None:
    """CompressFile runs the compressor as `<comp> -c -- <infile>`."""
    # Pin the detected type and the compressor binary so that only the
    # command line built by CompressFile is under test.
    self.PatchObject(
        cros_build_lib,
        "CompressionExtToType",
        return_value=cros_build_lib.CompressionType.ZSTD,
    )
    self.PatchObject(
        cros_build_lib, "FindCompressor", return_value="<foobar comp>"
    )
    src = self.tempdir / "infile"
    dst = self.tempdir / "outfile"

    # Run test.
    cros_build_lib.CompressFile(src, dst)

    # Verify.
    expected_cmd = ["<foobar comp>", "-c", "--", src]
    self.rc.assertCommandCalled(expected_cmd, stdout=dst)
def testCompressFileWithCompressionLevel(self) -> None:
    """A compression level is passed to the compressor as `-<level>`."""
    # Pin the detected type and the compressor binary so that only the
    # command line built by CompressFile is under test.
    self.PatchObject(
        cros_build_lib,
        "CompressionExtToType",
        return_value=cros_build_lib.CompressionType.ZSTD,
    )
    self.PatchObject(
        cros_build_lib, "FindCompressor", return_value="<foobar comp>"
    )
    src = self.tempdir / "infile"
    dst = self.tempdir / "outfile"

    # Run test.
    cros_build_lib.CompressFile(src, dst, compression_level=123)

    # Verify.
    expected_cmd = ["<foobar comp>", "-c", "-123", "--", src]
    self.rc.assertCommandCalled(expected_cmd, stdout=dst)
def testCompressFileWithZeroCompressionLevel(self) -> None:
    """A compression level of 0 still produces a `-0` argument."""
    # Pin the detected type and the compressor binary so that only the
    # command line built by CompressFile is under test.
    self.PatchObject(
        cros_build_lib,
        "CompressionExtToType",
        return_value=cros_build_lib.CompressionType.ZSTD,
    )
    self.PatchObject(
        cros_build_lib, "FindCompressor", return_value="<foobar comp>"
    )
    src = self.tempdir / "infile"
    dst = self.tempdir / "outfile"

    # Run test.
    cros_build_lib.CompressFile(src, dst, compression_level=0)

    # Verify.
    expected_cmd = ["<foobar comp>", "-c", "-0", "--", src]
    self.rc.assertCommandContains(expected_cmd, stdout=dst)