blob: ef1f755d935c5003df9ddda0bc370ac7765c0aa0 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test the cros_build_lib module."""
from __future__ import print_function
import contextlib
import datetime
import difflib
import errno
import functools
import itertools
import os
import signal
import socket
import sys
import mock
import six
from six.moves import builtins
from chromite.lib import constants
from chromite.cbuildbot import repository
from chromite.lib import cros_build_lib
from chromite.lib import cros_logging as logging
from chromite.lib import cros_test_lib
from chromite.lib import git
from chromite.lib import osutils
from chromite.lib import partial_mock
from chromite.lib import signals as cros_signals
class RunCommandErrorStrTest(cros_test_lib.TestCase):
"""Test that RunCommandError __str__ works as expected."""
def testNonUTF8Characters(self):
"""Test that non-UTF8 characters do not kill __str__"""
result = cros_build_lib.RunCommand(['ls', '/does/not/exist'],
error_code_ok=True)
rce = cros_build_lib.RunCommandError('\x81', result)
str(rce)
class TruncateStringTest(cros_test_lib.TestCase):
"""Test the TruncateStringToLine function."""
def testTruncate(self):
self.assertEqual(cros_build_lib.TruncateStringToLine('1234567', 5),
'12...')
def testNoTruncate(self):
self.assertEqual(cros_build_lib.TruncateStringToLine('1234567', 7),
'1234567')
def testNoTruncateMultiline(self):
self.assertEqual(cros_build_lib.TruncateStringToLine('1234567\nasdf', 7),
'1234567')
class CmdToStrTest(cros_test_lib.TestCase):
"""Test the CmdToStr function."""
def setUp(self):
self.differ = difflib.Differ()
def _assertEqual(self, func, test_input, test_output, result):
"""Like assertEqual but with built in diff support."""
diff = '\n'.join(list(self.differ.compare([test_output], [result])))
msg = ('Expected %s to translate %r to %r, but got %r\n%s' %
(func, test_input, test_output, result, diff))
self.assertEqual(test_output, result, msg)
def _testData(self, functor, tests, check_type=True):
"""Process a dict of test data."""
for test_output, test_input in tests.items():
result = functor(test_input)
self._assertEqual(functor.__name__, test_input, test_output, result)
if check_type:
# Also make sure the result is a string, otherwise the %r output will
# include a "u" prefix and that is not good for logging.
self.assertEqual(type(test_output), str)
def testShellQuote(self):
"""Basic ShellQuote tests."""
# Dict of expected output strings to input lists.
tests_quote = {
"''": '',
'a': u'a',
"'a b c'": u'a b c',
"'a\tb'": 'a\tb',
"'/a$file'": '/a$file',
"'/a#file'": '/a#file',
"""'b"c'""": 'b"c',
"'a@()b'": 'a@()b',
'j%k': 'j%k',
# pylint: disable=invalid-triple-quote
# https://github.com/edaniszewski/pylint-quotes/issues/20
r'''"s'a\$va\\rs"''': r"s'a$va\rs",
r'''"\\'\\\""''': r'''\'\"''',
r'''"'\\\$"''': r"""'\$""",
}
# Expected input output specific to ShellUnquote. This string cannot be
# produced by ShellQuote but is still a valid bash escaped string.
tests_unquote = {
# pylint: disable=invalid-triple-quote
# https://github.com/edaniszewski/pylint-quotes/issues/20
r'''\$''': r'''"\\$"''',
}
def aux(s):
return cros_build_lib.ShellUnquote(cros_build_lib.ShellQuote(s))
self._testData(cros_build_lib.ShellQuote, tests_quote)
self._testData(cros_build_lib.ShellUnquote, tests_unquote)
# Test that the operations are reversible.
self._testData(aux, {k: k for k in tests_quote.values()}, False)
self._testData(aux, {k: k for k in tests_quote.keys()}, False)
def testCmdToStr(self):
# Dict of expected output strings to input lists.
tests = {
r'a b': ['a', 'b'],
r"'a b' c": ['a b', 'c'],
# pylint: disable=invalid-triple-quote
# https://github.com/edaniszewski/pylint-quotes/issues/20
r'''a "b'c"''': ['a', "b'c"],
r'''a "/'\$b" 'a b c' "xy'z"''':
[u'a', "/'$b", 'a b c', "xy'z"],
'': [],
}
self._testData(cros_build_lib.CmdToStr, tests)
class TestRunCommandNoMock(cros_test_lib.TestCase):
"""Class that tests RunCommand by not mocking subprocess.Popen"""
def testErrorCodeNotRaisesError(self):
"""Don't raise exception when command returns non-zero exit code."""
result = cros_build_lib.RunCommand(['ls', '/does/not/exist'],
error_code_ok=True)
self.assertTrue(result.returncode != 0)
def testMissingCommandRaisesError(self):
"""Raise error when command is not found."""
self.assertRaises(cros_build_lib.RunCommandError, cros_build_lib.RunCommand,
['/does/not/exist'], error_code_ok=False)
self.assertRaises(cros_build_lib.RunCommandError, cros_build_lib.RunCommand,
['/does/not/exist'], error_code_ok=True)
def testInputString(self):
"""Verify input argument when it is a string."""
for data in ('', 'foo', 'bar\nhigh'):
result = cros_build_lib.RunCommand(['cat'], input=data)
self.assertEqual(result.output, data)
def testInputFileObject(self):
"""Verify input argument when it is a file object."""
result = cros_build_lib.RunCommand(['cat'], input=open('/dev/null'))
self.assertEqual(result.output, '')
result = cros_build_lib.RunCommand(['cat'], input=open(__file__))
self.assertEqual(result.output, osutils.ReadFile(__file__))
def testInputFileDescriptor(self):
"""Verify input argument when it is a file descriptor."""
with open('/dev/null') as f:
result = cros_build_lib.RunCommand(['cat'], input=f.fileno())
self.assertEqual(result.output, '')
with open(__file__) as f:
result = cros_build_lib.RunCommand(['cat'], input=f.fileno())
self.assertEqual(result.output, osutils.ReadFile(__file__))
def _ForceLoggingLevel(functor):
def inner(*args, **kwargs):
logger = logging.getLogger()
current = logger.getEffectiveLevel()
try:
logger.setLevel(logging.INFO)
return functor(*args, **kwargs)
finally:
logger.setLevel(current)
return inner
class TestRunCommand(cros_test_lib.MockTestCase):
"""Tests of RunCommand functionality."""
def setUp(self):
# These ENV variables affect RunCommand behavior, hide them.
self._old_envs = {e: os.environ.pop(e) for e in constants.ENV_PASSTHRU
if e in os.environ}
# Get the original value for SIGINT so our signal() mock can return the
# correct thing.
self._old_sigint = signal.getsignal(signal.SIGINT)
# Mock the return value of Popen().
self.error = 'test error'
self.output = 'test output'
self.proc_mock = mock.MagicMock(
returncode=0,
communicate=lambda x: (self.output, self.error))
self.popen_mock = self.PatchObject(cros_build_lib, '_Popen',
return_value=self.proc_mock)
self.signal_mock = self.PatchObject(signal, 'signal')
self.getsignal_mock = self.PatchObject(signal, 'getsignal')
self.PatchObject(cros_signals, 'SignalModuleUsable', return_value=True)
def tearDown(self):
# Restore hidden ENVs.
os.environ.update(self._old_envs)
@contextlib.contextmanager
def _MockChecker(self, cmd, **kwargs):
"""Verify the mocks we set up"""
ignore_sigint = kwargs.pop('ignore_sigint', False)
# Make some arbitrary functors we can pretend are signal handlers.
# Note that these are intentionally defined on the fly via lambda-
# this is to ensure that they're unique to each run.
sigint_suppress = lambda signum, frame: None
sigint_suppress.__name__ = 'sig_ign_sigint'
normal_sigint = lambda signum, frame: None
normal_sigint.__name__ = 'sigint'
normal_sigterm = lambda signum, frame: None
normal_sigterm.__name__ = 'sigterm'
# Set up complicated mock for signal.signal().
def _SignalChecker(sig, _action):
"""Return the right signal values so we can check the calls."""
if sig == signal.SIGINT:
return sigint_suppress if ignore_sigint else normal_sigint
elif sig == signal.SIGTERM:
return normal_sigterm
else:
raise ValueError('unknown sig %i' % sig)
self.signal_mock.side_effect = _SignalChecker
# Set up complicated mock for signal.getsignal().
def _GetsignalChecker(sig):
"""Return the right signal values so we can check the calls."""
if sig == signal.SIGINT:
self.assertFalse(ignore_sigint)
return normal_sigint
elif sig == signal.SIGTERM:
return normal_sigterm
else:
raise ValueError('unknown sig %i' % sig)
self.getsignal_mock.side_effect = _GetsignalChecker
# Let the body of code run, then check the signal behavior afterwards.
# We don't get visibility into signal ordering vs command execution,
# but it's kind of hard to mess up that, so we won't bother.
yield
class RejectSigIgn(object):
"""Make sure the signal action is not SIG_IGN."""
def __eq__(self, other):
return other != signal.SIG_IGN
# Verify the signals checked/setup are correct.
if ignore_sigint:
self.signal_mock.assert_has_calls([
mock.call(signal.SIGINT, signal.SIG_IGN),
mock.call(signal.SIGTERM, RejectSigIgn()),
mock.call(signal.SIGINT, sigint_suppress),
mock.call(signal.SIGTERM, normal_sigterm),
])
self.assertEqual(self.getsignal_mock.call_count, 1)
else:
self.signal_mock.assert_has_calls([
mock.call(signal.SIGINT, RejectSigIgn()),
mock.call(signal.SIGTERM, RejectSigIgn()),
mock.call(signal.SIGINT, normal_sigint),
mock.call(signal.SIGTERM, normal_sigterm),
])
self.assertEqual(self.getsignal_mock.call_count, 2)
# Verify various args are passed down to the real command.
pargs = self.popen_mock.call_args[0][0]
self.assertEqual(cmd, pargs)
# Verify various kwargs are passed down to the real command.
pkwargs = self.popen_mock.call_args[1]
for key in ('cwd', 'stdin', 'stdout', 'stderr'):
kwargs.setdefault(key, None)
kwargs.setdefault('shell', False)
kwargs.setdefault('env', mock.ANY)
kwargs['close_fds'] = True
self.longMessage = True
for key in kwargs.keys():
self.assertEqual(kwargs[key], pkwargs[key],
msg='kwargs[%s] mismatch' % key)
def _AssertCrEqual(self, expected, actual):
"""Helper method to compare two CommandResult objects.
This is needed since assertEqual does not know how to compare two
CommandResult objects.
Args:
expected: a CommandResult object, expected result.
actual: a CommandResult object, actual result.
"""
self.assertEqual(expected.cmd, actual.cmd)
self.assertEqual(expected.error, actual.error)
self.assertEqual(expected.output, actual.output)
self.assertEqual(expected.returncode, actual.returncode)
@_ForceLoggingLevel
def _TestCmd(self, cmd, real_cmd, sp_kv=None, rc_kv=None, sudo=False):
"""Factor out common setup logic for testing RunCommand().
Args:
cmd: a string or an array of strings that will be passed to RunCommand.
real_cmd: the real command we expect RunCommand to call (might be
modified to have enter_chroot).
sp_kv: key-value pairs passed to subprocess.Popen().
rc_kv: key-value pairs passed to RunCommand().
sudo: use SudoRunCommand() rather than RunCommand().
"""
if sp_kv is None:
sp_kv = {}
if rc_kv is None:
rc_kv = {}
expected_result = cros_build_lib.CommandResult()
expected_result.cmd = real_cmd
expected_result.error = self.error
expected_result.output = self.output
expected_result.returncode = self.proc_mock.returncode
arg_dict = dict()
for attr in ('close_fds', 'cwd', 'env', 'stdin', 'stdout', 'stderr',
'shell'):
if attr in sp_kv:
arg_dict[attr] = sp_kv[attr]
else:
if attr == 'close_fds':
arg_dict[attr] = True
elif attr == 'shell':
arg_dict[attr] = False
else:
arg_dict[attr] = None
if sudo:
runcmd = cros_build_lib.SudoRunCommand
else:
runcmd = cros_build_lib.RunCommand
with self._MockChecker(real_cmd, ignore_sigint=rc_kv.get('ignore_sigint'),
**sp_kv):
actual_result = runcmd(cmd, **rc_kv)
self._AssertCrEqual(expected_result, actual_result)
def testReturnCodeZeroWithArrayCmd(self, ignore_sigint=False):
"""--enter_chroot=False and --cmd is an array of strings.
Parameterized so this can also be used by some other tests w/ alternate
params to RunCommand().
Args:
ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
"""
self.proc_mock.returncode = 0
cmd_list = ['foo', 'bar', 'roger']
self._TestCmd(cmd_list, cmd_list,
rc_kv=dict(ignore_sigint=ignore_sigint))
def testSignalRestoreNormalCase(self):
"""Test RunCommand() properly sets/restores sigint. Normal case."""
self.testReturnCodeZeroWithArrayCmd(ignore_sigint=True)
def testReturnCodeZeroWithArrayCmdEnterChroot(self):
"""--enter_chroot=True and --cmd is an array of strings."""
self.proc_mock.returncode = 0
cmd_list = ['foo', 'bar', 'roger']
real_cmd = cmd_list
if not cros_build_lib.IsInsideChroot():
real_cmd = ['cros_sdk', '--'] + cmd_list
self._TestCmd(cmd_list, real_cmd, rc_kv=dict(enter_chroot=True))
@_ForceLoggingLevel
def testCommandFailureRaisesError(self, ignore_sigint=False):
"""Verify error raised by communicate() is caught.
Parameterized so this can also be used by some other tests w/ alternate
params to RunCommand().
Args:
ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
"""
cmd = 'test cmd'
self.proc_mock.returncode = 1
with self._MockChecker(['/bin/bash', '-c', cmd],
ignore_sigint=ignore_sigint):
self.assertRaises(cros_build_lib.RunCommandError,
cros_build_lib.RunCommand, cmd, shell=True,
ignore_sigint=ignore_sigint, error_code_ok=False)
@_ForceLoggingLevel
def testSubprocessCommunicateExceptionRaisesError(self, ignore_sigint=False):
"""Verify error raised by communicate() is caught.
Parameterized so this can also be used by some other tests w/ alternate
params to RunCommand().
Args:
ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
"""
cmd = ['test', 'cmd']
self.proc_mock.communicate = mock.MagicMock(side_effect=ValueError)
with self._MockChecker(cmd, ignore_sigint=ignore_sigint):
self.assertRaises(ValueError, cros_build_lib.RunCommand, cmd,
ignore_sigint=ignore_sigint)
def testSignalRestoreExceptionCase(self):
"""Test RunCommand() properly sets/restores sigint. Exception case."""
self.testSubprocessCommunicateExceptionRaisesError(ignore_sigint=True)
def testEnvWorks(self):
"""Test RunCommand(..., env=xyz) works."""
# We'll put this bogus environment together, just to make sure
# subprocess.Popen gets passed it.
rc_env = {'Tom': 'Jerry', 'Itchy': 'Scratchy'}
sp_env = dict(rc_env, LC_MESSAGES='C')
# This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
self.proc_mock.returncode = 0
cmd_list = ['foo', 'bar', 'roger']
# Run. We expect the env= to be passed through from sp (subprocess.Popen)
# to rc (RunCommand).
self._TestCmd(cmd_list, cmd_list,
sp_kv=dict(env=sp_env),
rc_kv=dict(env=rc_env))
def testExtraEnvOnlyWorks(self):
"""Test RunCommand(..., extra_env=xyz) works."""
# We'll put this bogus environment together, just to make sure
# subprocess.Popen gets passed it.
extra_env = {'Pinky' : 'Brain'}
## This is a little bit circular, since the same logic is used to compute
## the value inside, but at least it checks that this happens.
total_env = os.environ.copy()
# The core RunCommand code forces this too.
total_env['LC_MESSAGES'] = 'C'
total_env.update(extra_env)
# This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
self.proc_mock.returncode = 0
cmd_list = ['foo', 'bar', 'roger']
# Run. We expect the env= to be passed through from sp (subprocess.Popen)
# to rc (RunCommand).
self._TestCmd(cmd_list, cmd_list,
sp_kv=dict(env=total_env),
rc_kv=dict(extra_env=extra_env))
def testExtraEnvTooWorks(self):
"""Test RunCommand(..., env=xy, extra_env=z) works."""
# We'll put this bogus environment together, just to make sure
# subprocess.Popen gets passed it.
env = {'Tom': 'Jerry', 'Itchy': 'Scratchy'}
extra_env = {'Pinky': 'Brain'}
total_env = {
'Tom': 'Jerry',
'Itchy': 'Scratchy',
'Pinky': 'Brain',
'LC_MESSAGES': 'C'
}
# This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
self.proc_mock.returncode = 0
cmd_list = ['foo', 'bar', 'roger']
# Run. We expect the env= to be passed through from sp (subprocess.Popen)
# to rc (RunCommand).
self._TestCmd(cmd_list, cmd_list,
sp_kv=dict(env=total_env),
rc_kv=dict(env=env, extra_env=extra_env))
@mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=False)
def testChrootExtraEnvWorks(self, _inchroot_mock):
"""Test RunCommand(..., enter_chroot=True, env=xy, extra_env=z) works."""
# We'll put this bogus environment together, just to make sure
# subprocess.Popen gets passed it.
env = {'Tom': 'Jerry', 'Itchy': 'Scratchy'}
extra_env = {'Pinky': 'Brain'}
total_env = {
'Tom': 'Jerry',
'Itchy': 'Scratchy',
'Pinky': 'Brain',
'LC_MESSAGES': 'C'
}
# This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
self.proc_mock.returncode = 0
cmd_list = ['foo', 'bar', 'roger']
# Run. We expect the env= to be passed through from sp (subprocess.Popen)
# to rc (RunCommand).
self._TestCmd(cmd_list, ['cros_sdk', 'Pinky=Brain', '--'] + cmd_list,
sp_kv=dict(env=total_env),
rc_kv=dict(env=env, extra_env=extra_env, enter_chroot=True))
def testExceptionEquality(self):
"""Verify equality methods for RunCommandError"""
c1 = cros_build_lib.CommandResult(cmd=['ls', 'arg'], returncode=1)
c2 = cros_build_lib.CommandResult(cmd=['ls', 'arg1'], returncode=1)
c3 = cros_build_lib.CommandResult(cmd=['ls', 'arg'], returncode=2)
e1 = cros_build_lib.RunCommandError('Message 1', c1)
e2 = cros_build_lib.RunCommandError('Message 1', c1)
e_diff_msg = cros_build_lib.RunCommandError('Message 2', c1)
e_diff_cmd = cros_build_lib.RunCommandError('Message 1', c2)
e_diff_code = cros_build_lib.RunCommandError('Message 1', c3)
self.assertEqual(e1, e2)
self.assertNotEqual(e1, e_diff_msg)
self.assertNotEqual(e1, e_diff_cmd)
self.assertNotEqual(e1, e_diff_code)
def testSudoRunCommand(self):
"""Test SudoRunCommand(...) works."""
cmd_list = ['foo', 'bar', 'roger']
sudo_list = ['sudo', '--'] + cmd_list
self.proc_mock.returncode = 0
self._TestCmd(cmd_list, sudo_list, sudo=True)
def testSudoRunCommandShell(self):
"""Test SudoRunCommand(..., shell=True) works."""
cmd = 'foo bar roger'
sudo_list = ['sudo', '--', '/bin/bash', '-c', cmd]
self.proc_mock.returncode = 0
self._TestCmd(cmd, sudo_list, sudo=True,
rc_kv=dict(shell=True))
def testSudoRunCommandEnv(self):
"""Test SudoRunCommand(..., extra_env=z) works."""
cmd_list = ['foo', 'bar', 'roger']
sudo_list = ['sudo', 'shucky=ducky', '--'] + cmd_list
extra_env = {'shucky' : 'ducky'}
self.proc_mock.returncode = 0
self._TestCmd(cmd_list, sudo_list, sudo=True,
rc_kv=dict(extra_env=extra_env))
def testSudoRunCommandUser(self):
"""Test SudoRunCommand(..., user='...') works."""
cmd_list = ['foo', 'bar', 'roger']
sudo_list = ['sudo', '-u', 'MMMMMonster', '--'] + cmd_list
self.proc_mock.returncode = 0
self._TestCmd(cmd_list, sudo_list, sudo=True,
rc_kv=dict(user='MMMMMonster'))
def testSudoRunCommandUserShell(self):
"""Test SudoRunCommand(..., user='...', shell=True) works."""
cmd = 'foo bar roger'
sudo_list = ['sudo', '-u', 'MMMMMonster', '--', '/bin/bash', '-c', cmd]
self.proc_mock.returncode = 0
self._TestCmd(cmd, sudo_list, sudo=True,
rc_kv=dict(user='MMMMMonster', shell=True))
class TestRunCommandOutput(cros_test_lib.TempDirTestCase,
cros_test_lib.OutputTestCase):
"""Tests of RunCommand output options."""
@_ForceLoggingLevel
def testLogStdoutToFile(self):
log = os.path.join(self.tempdir, 'output')
ret = cros_build_lib.RunCommand(
['echo', 'monkeys'], log_stdout_to_file=log)
self.assertEqual(osutils.ReadFile(log), 'monkeys\n')
self.assertIs(ret.output, None)
self.assertIs(ret.error, None)
os.unlink(log)
ret = cros_build_lib.RunCommand(
['sh', '-c', 'echo monkeys3 >&2'],
log_stdout_to_file=log, redirect_stderr=True)
self.assertEqual(ret.error, 'monkeys3\n')
self.assertExists(log)
self.assertEqual(os.path.getsize(log), 0)
os.unlink(log)
ret = cros_build_lib.RunCommand(
['sh', '-c', 'echo monkeys4; echo monkeys5 >&2'],
log_stdout_to_file=log, combine_stdout_stderr=True)
self.assertIs(ret.output, None)
self.assertIs(ret.error, None)
self.assertEqual(osutils.ReadFile(log), 'monkeys4\nmonkeys5\n')
@_ForceLoggingLevel
def testLogStdoutToFileWithOrWithoutAppend(self):
log = os.path.join(self.tempdir, 'output')
ret = cros_build_lib.RunCommand(
['echo', 'monkeys'], log_stdout_to_file=log)
self.assertEqual(osutils.ReadFile(log), 'monkeys\n')
self.assertIs(ret.output, None)
self.assertIs(ret.error, None)
# Without append
ret = cros_build_lib.RunCommand(
['echo', 'monkeys2'], log_stdout_to_file=log)
self.assertEqual(osutils.ReadFile(log), 'monkeys2\n')
self.assertIs(ret.output, None)
self.assertIs(ret.error, None)
# With append
ret = cros_build_lib.RunCommand(
['echo', 'monkeys3'], append_to_file=True, log_stdout_to_file=log)
self.assertEqual(osutils.ReadFile(log), 'monkeys2\nmonkeys3\n')
self.assertIs(ret.output, None)
self.assertIs(ret.error, None)
def _CaptureRunCommand(self, command, mute_output):
"""Capture a RunCommand() output with the specified |mute_output|.
Args:
command: command to send to RunCommand().
mute_output: RunCommand() |mute_output| parameter.
Returns:
A (stdout, stderr) pair of captured output.
"""
with self.OutputCapturer() as output:
cros_build_lib.RunCommand(command,
debug_level=logging.DEBUG,
mute_output=mute_output)
return (output.GetStdout(), output.GetStderr())
@_ForceLoggingLevel
def testSubprocessMuteOutput(self):
"""Test RunCommand |mute_output| parameter."""
command = ['sh', '-c', 'echo foo; echo bar >&2']
# Always mute: we shouldn't get any output.
self.assertEqual(self._CaptureRunCommand(command, mute_output=True),
('', ''))
# Mute based on |debug_level|: we should't get any output.
self.assertEqual(self._CaptureRunCommand(command, mute_output=None),
('', ''))
# Never mute: we should get 'foo\n' and 'bar\n'.
self.assertEqual(self._CaptureRunCommand(command, mute_output=False),
('foo\n', 'bar\n'))
def testRunCommandAtNoticeLevel(self):
"""Ensure that RunCommand prints output when mute_output is False."""
# Needed by cros_sdk and brillo/cros chroot.
with self.OutputCapturer():
cros_build_lib.RunCommand(['echo', 'foo'], mute_output=False,
error_code_ok=True, print_cmd=False,
debug_level=logging.NOTICE)
self.AssertOutputContainsLine('foo')
def testRunCommandRedirectStdoutStderrOnCommandError(self):
"""Tests that stderr is captured when RunCommand raises."""
with self.assertRaises(cros_build_lib.RunCommandError) as cm:
cros_build_lib.RunCommand(['cat', '/'], redirect_stderr=True)
self.assertIsNotNone(cm.exception.result.error)
self.assertNotEqual('', cm.exception.result.error)
def _CaptureLogOutput(self, cmd, **kwargs):
"""Capture logging output of RunCommand."""
log = os.path.join(self.tempdir, 'output')
fh = logging.FileHandler(log)
fh.setLevel(logging.DEBUG)
logging.getLogger().addHandler(fh)
cros_build_lib.RunCommand(cmd, **kwargs)
logging.getLogger().removeHandler(fh)
return osutils.ReadFile(log)
@_ForceLoggingLevel
def testLogOutput(self):
"""Normal log_output, stdout followed by stderr."""
cmd = 'echo Greece; echo Italy >&2; echo Spain'
log_output = ('RunCommand: /bin/bash -c '
"'echo Greece; echo Italy >&2; echo Spain'\n"
'(stdout):\nGreece\nSpain\n\n(stderr):\nItaly\n\n')
self.assertEquals(self._CaptureLogOutput(cmd, shell=True, log_output=True),
log_output)
class TestTimedSection(cros_test_lib.TestCase):
"""Tests for TimedSection context manager."""
def testTimerValues(self):
"""Make sure simple stuff works."""
with cros_build_lib.TimedSection() as timer:
# While running, we have access to the start time.
self.assertIsInstance(timer.start, datetime.datetime)
self.assertIsNone(timer.finish)
self.assertIsNone(timer.delta)
# After finishing, all values should be set.
self.assertIsInstance(timer.start, datetime.datetime)
self.assertIsInstance(timer.finish, datetime.datetime)
self.assertIsInstance(timer.delta, datetime.timedelta)
class TestListFiles(cros_test_lib.TempDirTestCase):
"""Tests of ListFiles funciton."""
def _CreateNestedDir(self, dir_structure):
for entry in dir_structure:
full_path = os.path.join(os.path.join(self.tempdir, entry))
# ensure dirs are created
try:
os.makedirs(os.path.dirname(full_path))
if full_path.endswith('/'):
# we only want to create directories
return
except OSError as err:
if err.errno == errno.EEXIST:
# we don't care if the dir already exists
pass
else:
raise
# create dummy files
tmp = open(full_path, 'w')
tmp.close()
def testTraverse(self):
"""Test that we are traversing the directory properly."""
dir_structure = ['one/two/test.txt', 'one/blah.py',
'three/extra.conf']
self._CreateNestedDir(dir_structure)
files = cros_build_lib.ListFiles(self.tempdir)
for f in files:
f = f.replace(self.tempdir, '').lstrip('/')
if f not in dir_structure:
self.fail('%s was not found in %s' % (f, dir_structure))
def testEmptyFilePath(self):
"""Test that we return nothing when directories are empty."""
dir_structure = ['one/', 'two/', 'one/a/']
self._CreateNestedDir(dir_structure)
files = cros_build_lib.ListFiles(self.tempdir)
self.assertEqual(files, [])
def testNoSuchDir(self):
try:
cros_build_lib.ListFiles(os.path.join(self.tempdir, 'missing'))
except OSError as err:
self.assertEqual(err.errno, errno.ENOENT)
class HelperMethodSimpleTests(cros_test_lib.OutputTestCase):
"""Tests for various helper methods without using mocks."""
def _TestChromeosVersion(self, test_str, expected=None):
actual = cros_build_lib.GetChromeosVersion(test_str)
self.assertEqual(expected, actual)
def testGetChromeosVersionWithValidVersionReturnsValue(self):
expected = '0.8.71.2010_09_10_1530'
test_str = ' CHROMEOS_VERSION_STRING=0.8.71.2010_09_10_1530 '
self._TestChromeosVersion(test_str, expected)
def testGetChromeosVersionWithMultipleVersionReturnsFirstMatch(self):
expected = '0.8.71.2010_09_10_1530'
test_str = (' CHROMEOS_VERSION_STRING=0.8.71.2010_09_10_1530 '
' CHROMEOS_VERSION_STRING=10_1530 ')
self._TestChromeosVersion(test_str, expected)
def testGetChromeosVersionWithInvalidVersionReturnsDefault(self):
test_str = ' CHROMEOS_VERSION_STRING=invalid_version_string '
self._TestChromeosVersion(test_str)
def testGetChromeosVersionWithEmptyInputReturnsDefault(self):
self._TestChromeosVersion('')
def testGetChromeosVersionWithNoneInputReturnsDefault(self):
self._TestChromeosVersion(None)
def testUserDateTime(self):
"""Test with a raw time value."""
expected = 'Mon, 16 Jun 1980 05:03:20 -0700 (PDT)'
with cros_test_lib.SetTimeZone('US/Pacific'):
timeval = 330005000
self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval=timeval),
expected)
def testUserDateTimeDateTime(self):
"""Test with a datetime object."""
expected = 'Mon, 16 Jun 1980 00:00:00 -0700 (PDT)'
with cros_test_lib.SetTimeZone('US/Pacific'):
timeval = datetime.datetime(1980, 6, 16)
self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval=timeval),
expected)
def testUserDateTimeDateTimeInWinter(self):
"""Test that we correctly switch from PDT to PST."""
expected = 'Wed, 16 Jan 1980 00:00:00 -0800 (PST)'
with cros_test_lib.SetTimeZone('US/Pacific'):
timeval = datetime.datetime(1980, 1, 16)
self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval=timeval),
expected)
def testUserDateTimeDateTimeInEST(self):
"""Test that we correctly switch from PDT to EST."""
expected = 'Wed, 16 Jan 1980 00:00:00 -0500 (EST)'
with cros_test_lib.SetTimeZone('US/Eastern'):
timeval = datetime.datetime(1980, 1, 16)
self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval=timeval),
expected)
def testUserDateTimeCurrentTime(self):
"""Test that we can get the current time."""
cros_build_lib.UserDateTimeFormat()
def testParseUserDateTimeFormat(self):
stringtime = cros_build_lib.UserDateTimeFormat(100000.0)
self.assertEqual(cros_build_lib.ParseUserDateTimeFormat(stringtime),
100000.0)
def testMachineDetails(self):
"""Verify we don't crash."""
contents = cros_build_lib.MachineDetails()
self.assertNotEqual(contents, '')
self.assertEqual(contents[-1], '\n')
def testGetCommonPathPrefix(self):
"""Test helper function correctness."""
self.assertEqual('/a', cros_build_lib.GetCommonPathPrefix(['/a/b']))
self.assertEqual('/a', cros_build_lib.GetCommonPathPrefix(['/a/']))
self.assertEqual('/', cros_build_lib.GetCommonPathPrefix(['/a']))
self.assertEqual(
'/a', cros_build_lib.GetCommonPathPrefix(['/a/b', '/a/c']))
self.assertEqual(
'/a/b', cros_build_lib.GetCommonPathPrefix(['/a/b/c', '/a/b/d']))
self.assertEqual('/', cros_build_lib.GetCommonPathPrefix(['/a/b', '/c/d']))
self.assertEqual(
'/', cros_build_lib.GetCommonPathPrefix(['/a/b', '/aa/b']))
def testFormatDetailedTraceback(self):
"""Verify various aspects of the traceback"""
# When there is no active exception, should output nothing.
data = cros_build_lib.FormatDetailedTraceback()
self.assertEqual(data, '')
# Generate a local exception and test it.
try:
varint = 12345
varstr = 'vaaars'
raise Exception('fooood')
except Exception:
lines = cros_build_lib.FormatDetailedTraceback().splitlines()
# Check basic start/finish lines.
self.assertIn('Traceback ', lines[0])
self.assertIn('Exception: fooood', lines[-1])
# Verify some local vars get correctly decoded.
for line in lines:
if 'varint' in line:
self.assertIn('int', line)
self.assertIn(str(varint), line)
break
else:
raise AssertionError('could not find local "varint" in output:\n\n%s' %
''.join(lines))
for line in lines:
if 'varstr' in line:
self.assertIn('str', line)
self.assertIn(varstr, line)
break
else:
raise AssertionError('could not find local "varstr" in output:\n\n%s' %
''.join(lines))
def _testPrintDetailedTraceback(self, check_stdout):
"""Helper method for testing PrintDetailedTraceback."""
try:
varint = 12345
varstr = 'vaaars'
raise Exception('fooood')
except Exception:
with self.OutputCapturer() as output:
if check_stdout is None:
stream = None
elif check_stdout:
stream = sys.stdout
else:
stream = sys.stderr
cros_build_lib.PrintDetailedTraceback(file=stream)
# The non-selected stream shouldn't have anything.
data = output.GetStderr() if check_stdout else output.GetStdout()
self.assertEqual(data, '')
kwargs = {
'check_stdout': check_stdout,
'check_stderr': not check_stdout,
}
self.AssertOutputContainsLine(r'Traceback ', **kwargs)
self.AssertOutputContainsLine(r'Exception: fooood', **kwargs)
self.AssertOutputContainsLine(r'varint.*int.*%s' % varint, **kwargs)
self.AssertOutputContainsLine(r'varstr.*str.*%s' % varstr, **kwargs)
def testPrintDetailedTracebackStderrDefault(self):
"""Verify default (stderr) handling"""
self._testPrintDetailedTraceback(None)
def testPrintDetailedTracebackStderr(self):
"""Verify stderr handling"""
self._testPrintDetailedTraceback(False)
def testPrintDetailedTracebackStdout(self):
"""Verify stdout handling"""
self._testPrintDetailedTraceback(True)
class TestInput(cros_test_lib.MockOutputTestCase):
"""Tests of input gathering functionality."""
def testGetInput(self):
"""Verify GetInput() basic behavior."""
response = 'Some response'
if sys.version_info.major < 3:
self.PatchObject(builtins, 'raw_input', return_value=response)
self.PatchObject(builtins, 'input', return_value=response)
self.assertEquals(response, cros_build_lib.GetInput('prompt'))
def testBooleanPrompt(self):
"""Verify BooleanPrompt() full behavior."""
m = self.PatchObject(cros_build_lib, 'GetInput')
m.return_value = ''
self.assertTrue(cros_build_lib.BooleanPrompt())
self.assertFalse(cros_build_lib.BooleanPrompt(default=False))
m.return_value = 'yes'
self.assertTrue(cros_build_lib.BooleanPrompt())
m.return_value = 'ye'
self.assertTrue(cros_build_lib.BooleanPrompt())
m.return_value = 'y'
self.assertTrue(cros_build_lib.BooleanPrompt())
m.return_value = 'no'
self.assertFalse(cros_build_lib.BooleanPrompt())
m.return_value = 'n'
self.assertFalse(cros_build_lib.BooleanPrompt())
def testBooleanShellValue(self):
"""Verify BooleanShellValue() inputs work as expected"""
for v in (None,):
self.assertTrue(cros_build_lib.BooleanShellValue(v, True))
self.assertFalse(cros_build_lib.BooleanShellValue(v, False))
for v in (1234, '', 'akldjsf', '"'):
self.assertRaises(ValueError, cros_build_lib.BooleanShellValue, v, True)
self.assertTrue(cros_build_lib.BooleanShellValue(v, True, msg=''))
self.assertFalse(cros_build_lib.BooleanShellValue(v, False, msg=''))
for v in ('yes', 'YES', 'YeS', 'y', 'Y', '1', 'true', 'True', 'TRUE',):
self.assertTrue(cros_build_lib.BooleanShellValue(v, True))
self.assertTrue(cros_build_lib.BooleanShellValue(v, False))
for v in ('no', 'NO', 'nO', 'n', 'N', '0', 'false', 'False', 'FALSE',):
self.assertFalse(cros_build_lib.BooleanShellValue(v, True))
self.assertFalse(cros_build_lib.BooleanShellValue(v, False))
def testGetChoiceLists(self):
"""Verify GetChoice behavior w/lists."""
m = self.PatchObject(cros_build_lib, 'GetInput')
m.return_value = '1'
ret = cros_build_lib.GetChoice('title', ['a', 'b', 'c'])
self.assertEqual(ret, 1)
def testGetChoiceGenerator(self):
"""Verify GetChoice behavior w/generators."""
m = self.PatchObject(cros_build_lib, 'GetInput')
m.return_value = '2'
ret = cros_build_lib.GetChoice('title', list(range(3)))
self.assertEqual(ret, 2)
def testGetChoiceWindow(self):
"""Verify GetChoice behavior w/group_size set."""
m = self.PatchObject(cros_build_lib, 'GetInput')
cnt = [0]
def _Gen():
while True:
cnt[0] += 1
yield 'a'
m.side_effect = ['\n', '2']
ret = cros_build_lib.GetChoice('title', _Gen(), group_size=2)
self.assertEqual(ret, 2)
# Verify we showed the correct number of times.
self.assertEqual(cnt[0], 5)
class TestContextManagerStack(cros_test_lib.TestCase):
"""Test the ContextManagerStack class."""
def test(self):
invoked = []
counter = iter(itertools.count()).next
def _mk_kls(has_exception=None, exception_kls=None, suppress=False):
class foon(object):
"""Simple context manager which runs checks on __exit__."""
marker = counter()
def __enter__(self):
return self
# pylint: disable=no-self-argument,bad-context-manager
def __exit__(obj_self, exc_type, exc, traceback):
invoked.append(obj_self.marker)
if has_exception is not None:
self.assertTrue(all(x is not None
for x in (exc_type, exc, traceback)))
self.assertTrue(exc_type == has_exception)
if exception_kls:
raise exception_kls()
if suppress:
return True
return foon
with cros_build_lib.ContextManagerStack() as stack:
# Note... these tests are in reverse, since the exception
# winds its way up the stack.
stack.Add(_mk_kls())
stack.Add(_mk_kls(ValueError, suppress=True))
stack.Add(_mk_kls(IndexError, exception_kls=ValueError))
stack.Add(_mk_kls(IndexError))
stack.Add(_mk_kls(exception_kls=IndexError))
stack.Add(_mk_kls())
self.assertEqual(invoked, list(range(5, -1, -1)))
class TestManifestCheckout(cros_test_lib.TempDirTestCase):
"""Tests for ManifestCheckout functionality."""
def setUp(self):
self.manifest_dir = os.path.join(self.tempdir, '.repo', 'manifests')
# Initialize a repo instance here.
local_repo = os.path.join(constants.SOURCE_ROOT, '.repo/repo/.git')
# TODO(evanhernandez): This is a hack. Find a way to simplify this test.
# We used to use the current checkout's manifests.git, but that caused
# problems in production environemnts.
remote_manifests = os.path.join(self.tempdir, 'remote', 'manifests.git')
osutils.SafeMakedirs(remote_manifests)
git.Init(remote_manifests)
default_manifest = os.path.join(remote_manifests, 'default.xml')
osutils.WriteFile(
default_manifest,
'<?xml version="1.0" encoding="UTF-8"?><manifest></manifest>')
git.AddPath(default_manifest)
git.Commit(remote_manifests, 'dummy commit', allow_empty=True)
git.CreateBranch(remote_manifests, 'default')
git.CreateBranch(remote_manifests, 'release-R23-2913.B')
git.CreateBranch(remote_manifests, 'firmware-link-')
# Create a copy of our existing manifests.git, but rewrite it so it
# looks like a remote manifests.git. This is to avoid hitting the
# network, and speeds things up in general.
local_manifests = 'file://%s' % remote_manifests
temp_manifests = os.path.join(self.tempdir, 'manifests.git')
git.RunGit(self.tempdir, ['clone', '-n', '--bare', local_manifests])
git.RunGit(temp_manifests,
['fetch', '-f', '-u', local_manifests,
'refs/remotes/origin/*:refs/heads/*'])
git.RunGit(temp_manifests, ['branch', '-D', 'default'])
repo = repository.RepoRepository(
temp_manifests, self.tempdir,
repo_url='file://%s' % local_repo, repo_branch='default')
repo.Initialize()
self.active_manifest = os.path.realpath(
os.path.join(self.tempdir, '.repo', 'manifest.xml'))
def testManifestInheritance(self):
osutils.WriteFile(self.active_manifest, """
<manifest>
<include name="include-target.xml" />
<include name="empty.xml" />
<project name="monkeys" path="baz" remote="foon" revision="master" />
</manifest>""")
# First, verify it properly explodes if the include can't be found.
self.assertRaises(EnvironmentError,
git.ManifestCheckout, self.tempdir)
# Next, verify it can read an empty manifest; this is to ensure
# that we can point Manifest at the empty manifest without exploding,
# same for ManifestCheckout; this sort of thing is primarily useful
# to ensure no step of an include assumes everything is yet assembled.
empty_path = os.path.join(self.manifest_dir, 'empty.xml')
osutils.WriteFile(empty_path, '<manifest/>')
git.Manifest(empty_path)
git.ManifestCheckout(self.tempdir, manifest_path=empty_path)
# Next, verify include works.
osutils.WriteFile(
os.path.join(self.manifest_dir, 'include-target.xml'),
"""
<manifest>
<remote name="foon" fetch="http://localhost" />
</manifest>""")
manifest = git.ManifestCheckout(self.tempdir)
self.assertEqual(list(manifest.checkouts_by_name), ['monkeys'])
self.assertEqual(list(manifest.remotes), ['foon'])
def testGetManifestsBranch(self):
# pylint: disable=protected-access
func = git.ManifestCheckout._GetManifestsBranch
manifest = self.manifest_dir
repo_root = self.tempdir
# pylint: disable=unused-argument
def reconfig(merge='master', origin='origin'):
if merge is not None:
merge = 'refs/heads/%s' % merge
for key in ('merge', 'origin'):
val = locals()[key]
key = 'branch.default.%s' % key
if val is None:
git.RunGit(manifest, ['config', '--unset', key], error_code_ok=True)
else:
git.RunGit(manifest, ['config', key, val])
# First, verify our assumptions about a fresh repo init are correct.
self.assertEqual('default', git.GetCurrentBranch(manifest))
self.assertEqual('master', func(repo_root))
# Ensure we can handle a missing origin; this can occur jumping between
# branches, and can be worked around.
reconfig(origin=None)
self.assertEqual('default', git.GetCurrentBranch(manifest))
self.assertEqual('master', func(repo_root))
def assertExcept(message, **kwargs):
reconfig(**kwargs)
self.assertRaises2(OSError, func, repo_root, ex_msg=message,
check_attrs={'errno': errno.ENOENT})
# No merge target means the configuration isn't usable, period.
assertExcept('git tracking configuration for that branch is broken',
merge=None)
# Ensure we detect if we're on the wrong branch, even if it has
# tracking setup.
git.RunGit(manifest, ['checkout', '-t', 'origin/master', '-b', 'test'])
assertExcept("It should be checked out to 'default'")
# Ensure we handle detached HEAD w/ an appropriate exception.
git.RunGit(manifest, ['checkout', '--detach', 'test'])
assertExcept("It should be checked out to 'default'")
# Finally, ensure that if the default branch is non-existant, we still throw
# a usable exception.
git.RunGit(manifest, ['branch', '-d', 'default'])
assertExcept("It should be checked out to 'default'")
def testGitMatchBranchName(self):
git_repo = os.path.join(self.tempdir, '.repo', 'manifests')
branches = git.MatchBranchName(git_repo, 'default', namespace='')
self.assertEqual(branches, ['refs/heads/default'])
branches = git.MatchBranchName(git_repo, 'default', namespace='refs/heads/')
self.assertEqual(branches, ['default'])
branches = git.MatchBranchName(git_repo, 'origin/f.*link',
namespace='refs/remotes/')
self.assertTrue('firmware-link-' in branches[0])
branches = git.MatchBranchName(git_repo, 'r23')
self.assertEqual(branches, ['refs/remotes/origin/release-R23-2913.B'])
class Test_iflatten_instance(cros_test_lib.TestCase):
"""Test iflatten_instance function."""
def test_it(self):
f = lambda x, **kwargs: list(cros_build_lib.iflatten_instance(x, **kwargs))
self.assertEqual([1, 2], f([1, 2]))
self.assertEqual([1, '2a'], f([1, '2a']))
self.assertEqual([1, 2, 'b'], f([1, [2, 'b']]))
self.assertEqual([1, 2, 'f', 'd', 'a', 's'],
f([1, 2, ('fdas',)], terminate_on_kls=int))
self.assertEqual([''], f(''))
class SafeRunTest(cros_test_lib.TestCase):
"""Tests SafeRunTest functionality."""
def _raise_exception(self, e):
raise e
def testRunsSafely(self):
"""Verify that we are robust to exceptions."""
def append_val(value):
call_list.append(value)
call_list = []
f_list = [functools.partial(append_val, 1),
functools.partial(self._raise_exception,
Exception('testRunsSafely exception.')),
functools.partial(append_val, 2)]
self.assertRaises(Exception, cros_build_lib.SafeRun, f_list)
self.assertEquals(call_list, [1, 2])
def testRaisesFirstException(self):
"""Verify we raise the first exception when multiple are encountered."""
class E1(Exception):
"""Simple exception class."""
pass
class E2(Exception):
"""Simple exception class."""
pass
f_list = [functools.partial(self._raise_exception, e) for e in [E1, E2]]
self.assertRaises(E1, cros_build_lib.SafeRun, f_list)
def testCombinedRaise(self):
"""Raises a RuntimeError with exceptions combined."""
f_list = [functools.partial(self._raise_exception, Exception())] * 3
self.assertRaises(RuntimeError, cros_build_lib.SafeRun, f_list,
combine_exceptions=True)
class FrozenAttributesTest(cros_test_lib.TestCase):
"""Tests FrozenAttributesMixin functionality."""
class DummyClass(object):
"""Any class that does not override __setattr__."""
class SetattrClass(object):
"""Class that does override __setattr__."""
SETATTR_OFFSET = 10
def __setattr__(self, attr, value):
"""Adjust value here to later confirm that this code ran."""
object.__setattr__(self, attr, self.SETATTR_OFFSET + value)
def _TestBasics(self, cls):
# pylint: disable=attribute-defined-outside-init
def _Expected(val):
return getattr(cls, 'SETATTR_OFFSET', 0) + val
obj = cls()
obj.a = 1
obj.b = 2
self.assertEquals(_Expected(1), obj.a)
self.assertEquals(_Expected(2), obj.b)
obj.Freeze()
self.assertRaises(cros_build_lib.AttributeFrozenError, setattr, obj, 'a', 3)
self.assertEquals(_Expected(1), obj.a)
self.assertRaises(cros_build_lib.AttributeFrozenError, setattr, obj, 'c', 3)
self.assertFalse(hasattr(obj, 'c'))
def testFrozenByMetaclass(self):
"""Test attribute freezing with FrozenAttributesClass."""
@six.add_metaclass(cros_build_lib.FrozenAttributesClass)
class DummyByMeta(self.DummyClass):
"""Class that freezes DummyClass using metaclass construct."""
self._TestBasics(DummyByMeta)
@six.add_metaclass(cros_build_lib.FrozenAttributesClass)
class SetattrByMeta(self.SetattrClass):
"""Class that freezes SetattrClass using metaclass construct."""
self._TestBasics(SetattrByMeta)
def testFrozenByMixinFirst(self):
"""Test attribute freezing with FrozenAttributesMixin first in hierarchy."""
class Dummy(cros_build_lib.FrozenAttributesMixin, self.DummyClass):
"""Class that freezes DummyClass using mixin construct."""
self._TestBasics(Dummy)
class Setattr(cros_build_lib.FrozenAttributesMixin, self.SetattrClass):
"""Class that freezes SetattrClass using mixin construct."""
self._TestBasics(Setattr)
def testFrozenByMixinLast(self):
"""Test attribute freezing with FrozenAttributesMixin last in hierarchy."""
class Dummy(self.DummyClass, cros_build_lib.FrozenAttributesMixin):
"""Class that freezes DummyClass using mixin construct."""
self._TestBasics(Dummy)
class Setattr(self.SetattrClass, cros_build_lib.FrozenAttributesMixin):
"""Class that freezes SetattrClass using mixin construct."""
self._TestBasics(Setattr)
class TestGetIPv4Address(cros_test_lib.RunCommandTestCase):
"""Tests the GetIPv4Address function."""
IP_GLOBAL_OUTPUT = """
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
2: eth0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc pfifo_fast state \
DOWN qlen 1000
link/ether cc:cc:cc:cc:cc:cc brd ff:ff:ff:ff:ff:ff
3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP \
qlen 1000
link/ether dd:dd:dd:dd:dd:dd brd ff:ff:ff:ff:ff:ff
inet 111.11.11.111/22 brd 111.11.11.255 scope global eth1
inet6 cdef:0:cdef:cdef:cdef:cdef:cdef:cdef/64 scope global dynamic
valid_lft 2592000sec preferred_lft 604800sec
"""
def testGetIPv4AddressParseResult(self):
"""Verifies we can parse the output and get correct IP address."""
self.rc.AddCmdResult(partial_mock.In('ip'), output=self.IP_GLOBAL_OUTPUT)
self.assertEqual(cros_build_lib.GetIPv4Address(), '111.11.11.111')
def testGetIPv4Address(self):
"""Tests that correct shell commmand is called."""
cros_build_lib.GetIPv4Address(global_ip=False, dev='eth0')
self.rc.assertCommandContains(
['ip', 'addr', 'show', 'scope', 'host', 'dev', 'eth0'])
cros_build_lib.GetIPv4Address(global_ip=True)
self.rc.assertCommandContains(['ip', 'addr', 'show', 'scope', 'global'])
class TestGetHostname(cros_test_lib.MockTestCase):
"""Tests GetHostName & GetHostDomain functionality."""
def setUp(self):
self.gethostname_mock = self.PatchObject(
socket, 'gethostname', return_value='m!!n')
self.gethostbyaddr_mock = self.PatchObject(
socket, 'gethostbyaddr', return_value=(
'm!!n.google.com', ('cow', 'bar',), ('127.0.0.1.a',)))
def testGetHostNameNonQualified(self):
"""Verify non-qualified behavior"""
self.assertEqual(cros_build_lib.GetHostName(), 'm!!n')
def testGetHostNameFullyQualified(self):
"""Verify fully qualified behavior"""
self.assertEqual(cros_build_lib.GetHostName(fully_qualified=True),
'm!!n.google.com')
def testGetHostNameBadDns(self):
"""Do not fail when the user's dns is bad"""
self.gethostbyaddr_mock.side_effect = socket.gaierror('should be caught')
self.assertEqual(cros_build_lib.GetHostName(), 'm!!n')
def testGetHostDomain(self):
"""Verify basic behavior"""
self.assertEqual(cros_build_lib.GetHostDomain(), 'google.com')
def testHostIsCIBuilder(self):
"""Test HostIsCIBuilder."""
fq_hostname_golo = 'test.golo.chromium.org'
fq_hostname_gce_1 = 'test.chromeos-bot.internal'
fq_hostname_gce_2 = 'test.chrome.corp.google.com'
fq_hostname_invalid = 'test'
self.assertTrue(cros_build_lib.HostIsCIBuilder(fq_hostname_golo))
self.assertTrue(cros_build_lib.HostIsCIBuilder(fq_hostname_gce_1))
self.assertTrue(cros_build_lib.HostIsCIBuilder(fq_hostname_gce_2))
self.assertFalse(cros_build_lib.HostIsCIBuilder(fq_hostname_invalid))
self.assertFalse(cros_build_lib.HostIsCIBuilder(
fq_hostname=fq_hostname_golo, gce_only=True))
self.assertFalse(cros_build_lib.HostIsCIBuilder(
fq_hostname=fq_hostname_gce_1, golo_only=True))
class GetImageDiskPartitionInfoTests(cros_test_lib.RunCommandTestCase):
"""Tests the GetImageDiskPartitionInfo function."""
SAMPLE_PARTED = """/foo/chromiumos_qemu_image.bin:\
2271240192B:file:512:512:gpt::;
11:32768B:8421375B:8388608B::RWFW:;
6:8421376B:8421887B:512B::KERN-C:;
7:8421888B:8422399B:512B::ROOT-C:;
9:8422400B:8422911B:512B::reserved:;
10:8422912B:8423423B:512B::reserved:;
2:10485760B:27262975B:16777216B::KERN-A:;
4:27262976B:44040191B:16777216B::KERN-B:;
8:44040192B:60817407B:16777216B:ext4:OEM:msftdata;
12:127926272B:161480703B:33554432B:fat16:EFI-SYSTEM:boot, esp;
5:161480704B:163577855B:2097152B::ROOT-B:;
3:163577856B:2260729855B:2097152000B:ext2:ROOT-A:;
1:2260729856B:2271215615B:10485760B:ext2:STATE:msftdata;
"""
SAMPLE_CGPT = """
start size part contents
0 1 PMBR (Boot GUID: 88FB7EB8-2B3F-B943-B933-\
EEC571FFB6E1)
1 1 Pri GPT header
2 32 Pri GPT table
1921024 2097152 1 Label: "STATE"
Type: Linux data
UUID: EEBD83BE-397E-BD44-878B-0DDDD5A5C510
20480 32768 2 Label: "KERN-A"
Type: ChromeOS kernel
UUID: 7007C2F3-08E5-AB40-A4BC-FF5B01F5460D
Attr: priority=15 tries=15 successful=1
1101824 819200 3 Label: "ROOT-A"
Type: ChromeOS rootfs
UUID: F4C5C3AD-027F-894B-80CD-3DEC57932948
53248 32768 4 Label: "KERN-B"
Type: ChromeOS kernel
UUID: C85FB478-404C-8741-ADB8-11312A35880D
Attr: priority=0 tries=0 successful=0
282624 819200 5 Label: "ROOT-B"
Type: ChromeOS rootfs
UUID: A99F4231-1EC3-C542-AC0C-DF3729F5DB07
16448 1 6 Label: "KERN-C"
Type: ChromeOS kernel
UUID: 81F0E336-FAC9-174D-A08C-864FE627B637
Attr: priority=0 tries=0 successful=0
16449 1 7 Label: "ROOT-C"
Type: ChromeOS rootfs
UUID: 9E127FCA-30C1-044E-A5F2-DF74E6932692
86016 32768 8 Label: "OEM"
Type: Linux data
UUID: 72986347-A37C-684F-9A19-4DBAF41C55A9
16450 1 9 Label: "reserved"
Type: ChromeOS reserved
UUID: BA85A0A7-1850-964D-8EF8-6707AC106C3A
16451 1 10 Label: "reserved"
Type: ChromeOS reserved
UUID: 16C9EC9B-50FA-DD46-98DC-F781360817B4
64 16384 11 Label: "RWFW"
Type: ChromeOS firmware
UUID: BE8AECB9-4F78-7C44-8F23-5A9273B7EC8F
249856 32768 12 Label: "EFI-SYSTEM"
Type: EFI System Partition
UUID: 88FB7EB8-2B3F-B943-B933-EEC571FFB6E1
4050847 32 Sec GPT table
4050879 1 Sec GPT header
"""
def testCgpt(self):
"""Tests that we can list all partitions with `cgpt` correctly."""
self.PatchObject(cros_build_lib, 'IsInsideChroot', return_value=True)
self.rc.AddCmdResult(partial_mock.Ignore(), output=self.SAMPLE_CGPT)
partitions = cros_build_lib.GetImageDiskPartitionInfo('...')
part_dict = {p.name: p for p in partitions}
self.assertEqual(part_dict['STATE'].start, 983564288)
self.assertEqual(part_dict['STATE'].size, 1073741824)
self.assertEqual(part_dict['STATE'].number, 1)
self.assertEqual(part_dict['STATE'].name, 'STATE')
self.assertEqual(part_dict['EFI-SYSTEM'].start, 249856 * 512)
self.assertEqual(part_dict['EFI-SYSTEM'].size, 32768 * 512)
self.assertEqual(part_dict['EFI-SYSTEM'].number, 12)
self.assertEqual(part_dict['EFI-SYSTEM'].name, 'EFI-SYSTEM')
self.assertEqual(12, len(partitions))
def testNormalPath(self):
self.PatchObject(cros_build_lib, 'IsInsideChroot', return_value=False)
self.rc.AddCmdResult(partial_mock.Ignore(), output=self.SAMPLE_PARTED)
partitions = cros_build_lib.GetImageDiskPartitionInfo('_ignored')
part_dict = {p.name: p for p in partitions}
self.assertEqual(12, len(partitions))
self.assertEqual(1, part_dict['STATE'].number)
self.assertEqual(2097152000, part_dict['ROOT-A'].size)
def testKeyedByNumber(self):
self.PatchObject(cros_build_lib, 'IsInsideChroot', return_value=False)
self.rc.AddCmdResult(partial_mock.Ignore(), output=self.SAMPLE_PARTED)
partitions = cros_build_lib.GetImageDiskPartitionInfo(
'_ignored'
)
part_dict = {p.number: p for p in partitions}
self.assertEqual(12, len(part_dict))
self.assertEqual('STATE', part_dict[1].name)
self.assertEqual(2097152000, part_dict[3].size)
self.assertEqual('reserved', part_dict[9].name)
self.assertEqual('reserved', part_dict[10].name)
def testChangeUnitInsideChroot(self):
self.PatchObject(cros_build_lib, 'IsInsideChroot', return_value=True)
self.rc.AddCmdResult(partial_mock.Ignore(), output=self.SAMPLE_CGPT)
partitions = cros_build_lib.GetImageDiskPartitionInfo('_ignored')
part_dict = {p.name: p for p in partitions}
self.assertEqual(part_dict['STATE'].start, 983564288)
self.assertEqual(part_dict['STATE'].size, 1073741824)
class DummyOutput(object):
"""Object with a component called output."""
def __init__(self, output):
self.output = output
class CreateTarballTests(cros_test_lib.TempDirTestCase):
"""Test the CreateTarball function."""
def setUp(self):
"""Create files/dirs needed for tar test."""
self.target = os.path.join(self.tempdir, 'test.tar.xz')
self.inputDir = os.path.join(self.tempdir, 'inputs')
self.inputs = [
'inputA',
'inputB',
'sub/subfile',
'sub2/subfile',
]
self.inputsWithDirs = [
'inputA',
'inputB',
'sub',
'sub2',
]
# Create the input files.
for i in self.inputs:
osutils.WriteFile(os.path.join(self.inputDir, i), i, makedirs=True)
def testSuccess(self):
"""Create a tarfile."""
cros_build_lib.CreateTarball(self.target, self.inputDir,
inputs=self.inputs)
def testSuccessWithDirs(self):
"""Create a tarfile."""
cros_build_lib.CreateTarball(self.target, self.inputDir,
inputs=self.inputsWithDirs)
def testSuccessWithTooManyFiles(self):
"""Test a tarfile creation with -T /dev/stdin."""
# pylint: disable=protected-access
num_inputs = cros_build_lib._THRESHOLD_TO_USE_T_FOR_TAR + 1
inputs = ['input%s' % x for x in range(num_inputs)]
largeInputDir = os.path.join(self.tempdir, 'largeinputs')
for i in inputs:
osutils.WriteFile(os.path.join(largeInputDir, i), i, makedirs=True)
cros_build_lib.CreateTarball(self.target, largeInputDir, inputs=inputs)
# Tests for tar failure retry logic.
class FailedCreateTarballTests(cros_test_lib.MockTestCase):
"""Tests special case error handling for CreateTarBall."""
def setUp(self):
"""Mock RunCommand mock."""
# Each test can change this value as needed. Each element is the return
# code in the CommandResult for subsequent calls to RunCommand().
self.tarResults = []
def Result(*_args, **_kwargs):
"""Creates CommandResult objects for each tarResults value in turn."""
return cros_build_lib.CommandResult(returncode=self.tarResults.pop(0))
self.mockRun = self.PatchObject(cros_build_lib, 'RunCommand',
autospec=True,
side_effect=Result)
def testSuccess(self):
"""CreateTarball works the first time."""
self.tarResults = [0]
cros_build_lib.CreateTarball('foo', 'bar', inputs=['a', 'b'])
self.assertEqual(self.mockRun.call_count, 1)
def testFailedOnceSoft(self):
"""Force a single retry for CreateTarball."""
self.tarResults = [1, 0]
cros_build_lib.CreateTarball('foo', 'bar', inputs=['a', 'b'], timeout=0)
self.assertEqual(self.mockRun.call_count, 2)
def testFailedOnceHard(self):
"""Test unrecoverable error."""
self.tarResults = [2]
with self.assertRaises(cros_build_lib.RunCommandError) as cm:
cros_build_lib.CreateTarball('foo', 'bar', inputs=['a', 'b'])
self.assertEqual(self.mockRun.call_count, 1)
self.assertEqual(cm.exception.args[1].returncode, 2)
def testFailedThriceSoft(self):
"""Exhaust retries for recoverable errors."""
self.tarResults = [1, 1, 1]
with self.assertRaises(cros_build_lib.RunCommandError) as cm:
cros_build_lib.CreateTarball('foo', 'bar', inputs=['a', 'b'], timeout=0)
self.assertEqual(self.mockRun.call_count, 3)
self.assertEqual(cm.exception.args[1].returncode, 1)