# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test the cros_build_lib module."""
from __future__ import print_function
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(
import contextlib
import datetime
import difflib
import errno
import functools
import itertools
import logging
import signal
import socket
import StringIO
import time
import __builtin__
from chromite.cbuildbot import constants
from chromite.cbuildbot import repository
from chromite.lib import cros_build_lib
from chromite.lib import git
from chromite.lib import cros_test_lib
from chromite.lib import osutils
from chromite.lib import partial_mock
from chromite.lib import retry_util
from chromite.lib import signals as cros_signals
# TODO(build): Finish test wrapper (
# Until then, this has to be after the chromite imports.
import mock
# pylint: disable=W0212,R0904
class RunCommandErrorStrTest(cros_test_lib.TestCase):
  """Test that RunCommandError __str__ works as expected."""

  def testNonUTF8Characters(self):
    """Test that non-UTF8 characters do not kill __str__"""
    # error_code_ok=True so the failing `ls` hands back a result object
    # instead of raising before the error can be constructed.
    result = cros_build_lib.RunCommand(['ls', '/does/not/exist'],
                                       error_code_ok=True)
    rce = cros_build_lib.RunCommandError('\x81', result)
    # The conversion itself is the check: it must not raise.
    str(rce)
class CmdToStrTest(cros_test_lib.TestCase):
"""Test the CmdToStr function."""
def setUp(self):
  """Create the Differ used to render expected-vs-actual diffs."""
  line_differ = difflib.Differ()
  self.differ = line_differ
def _assertEqual(self, func, test_input, test_output, result):
"""Like assertEqual but with built in diff support."""
diff = '\n'.join(list([test_output], [result])))
msg = ('Expected %s to translate %r to %r, but got %r\n%s' %
(func, test_input, test_output, result, diff))
self.assertEqual(test_output, result, msg)
def _testData(self, functor, tests, check_type=True):
"""Process a dict of test data."""
for test_output, test_input in tests.iteritems():
result = functor(test_input)
self._assertEqual(functor.__name__, test_input, test_output, result)
if check_type:
# Also make sure the result is a string, otherwise the %r output will
# include a "u" prefix and that is not good for logging.
self.assertEqual(type(test_output), str)
def testShellQuote(self):
  """Basic ShellQuote tests."""
  # Dict of expected output strings to input lists.
  tests_quote = {
      "''": '',
      'a': unicode('a'),
      "'a b c'": unicode('a b c'),
      "'a\tb'": 'a\tb',
      "'/a$file'": '/a$file',
      "'/a#file'": '/a#file',
      """'b"c'""": 'b"c',
      "'a@()b'": 'a@()b',
      'j%k': 'j%k',
      r'''"s'a\$va\\rs"''': r"s'a$va\rs",
      r'''"\\'\\\""''': r'''\'\"''',
      r'''"'\\\$"''': r"""'\$""",
  }

  # Expected input output specific to ShellUnquote. This string cannot be
  # produced by ShellQuote but is still a valid bash escaped string.
  tests_unquote = {
      r'''\$''': r'''"\\$"''',
  }

  def aux(s):
    """Quote then unquote |s|; used to check the round trip."""
    return cros_build_lib.ShellUnquote(cros_build_lib.ShellQuote(s))

  self._testData(cros_build_lib.ShellQuote, tests_quote)
  self._testData(cros_build_lib.ShellUnquote, tests_unquote)

  # Test that the operations are reversible.
  self._testData(aux, {k: k for k in tests_quote.values()}, False)
  self._testData(aux, {k: k for k in tests_quote}, False)
def testCmdToStr(self):
  """Check CmdToStr against a table of argv lists."""
  # Dict of expected output strings to input lists.
  tests = {
      r"a b": ['a', 'b'],
      r"'a b' c": ['a b', 'c'],
      r'''a "b'c"''': ['a', "b'c"],
      r'''a "/'\$b" 'a b c' "xy'z"''':
          [unicode('a'), "/'$b", 'a b c', "xy'z"],
      '': [],
  }
  self._testData(cros_build_lib.CmdToStr, tests)
class RunCommandMock(partial_mock.PartialCmdMock):
  """Provides a context where all RunCommand invocations low-level mocked."""

  TARGET = 'chromite.lib.cros_build_lib'
  ATTRS = ('RunCommand',)
  DEFAULT_ATTR = 'RunCommand'

  def RunCommand(self, cmd, *args, **kwargs):
    """Replay the result registered for |cmd| through a PopenMock.

    The registered result is looked up, loaded into a fresh PopenMock, and
    the callable stored under self.backup['RunCommand'] is then invoked
    inside that PopenMock context.
    """
    canned = self._results['RunCommand'].LookupResult(
        (cmd,), hook_args=(cmd,) + args, hook_kwargs=kwargs)

    popen_patcher = PopenMock()
    # Ignore() is used as the command key when registering the result.
    popen_patcher.AddCmdResult(
        partial_mock.Ignore(), canned.returncode, canned.output, canned.error)

    with popen_patcher:
      saved_run_command = self.backup['RunCommand']
      return saved_run_command(cmd, *args, **kwargs)
class RunCommandTestCase(cros_test_lib.MockTestCase):
  """MockTestCase that mocks out RunCommand by default."""

  def setUp(self):
    """Start a RunCommandMock and expose its assert helpers on the test."""
    patcher = self.StartPatcher(RunCommandMock())
    self.rc = patcher
    # Re-export so tests can call these directly on self.
    self.assertCommandCalled = patcher.assertCommandCalled
    self.assertCommandContains = patcher.assertCommandContains
class RunCommandTempDirTestCase(RunCommandTestCase,
                                cros_test_lib.TempDirTestCase):
  """Convenience class mixing TempDirTestCase and RunCommandTestCase"""
class PopenMock(partial_mock.PartialCmdMock):
  """Provides a context where all _Popen instances are low-level mocked."""

  TARGET = 'chromite.lib.cros_build_lib._Popen'
  ATTRS = ('__init__',)
  DEFAULT_ATTR = '__init__'

  def __init__(self):
    partial_mock.PartialCmdMock.__init__(self, create_tempdir=True)

  def _target__init__(self, inst, cmd, *args, **kwargs):
    """Swap |cmd| for a generated script that replays the registered result.

    The registered output/error are written to files in self.tempdir, and a
    bash script is generated that cats them to stdout/stderr and exits with
    the registered return code.  The saved __init__ is then called with that
    script prepended to the original command.
    """
    result = self._results['__init__'].LookupResult(
        (cmd,), hook_args=(inst, cmd,) + args, hook_kwargs=kwargs)

    # The script needs its own file name inside the temp dir; joining with
    # an empty component would just name the directory itself.
    script = os.path.join(self.tempdir, 'mock_cmd.sh')
    stdout = os.path.join(self.tempdir, 'output')
    stderr = os.path.join(self.tempdir, 'error')
    osutils.WriteFile(stdout, result.output)
    osutils.WriteFile(stderr, result.error)
    script_text = ''.join(
        ['#!/bin/bash\n', 'cat %s\n' % stdout, 'cat %s >&2\n' % stderr,
         'exit %s' % result.returncode])
    osutils.WriteFile(script, script_text)
    os.chmod(script, 0o700)
    kwargs['cwd'] = self.tempdir
    self.backup['__init__'](inst, [script, '--'] + cmd, *args, **kwargs)
class TestRunCommandNoMock(cros_test_lib.TestCase):
  """Class that tests RunCommand by not mocking subprocess.Popen"""

  def testErrorCodeNotRaisesError(self):
    """Don't raise exception when command returns non-zero exit code."""
    result = cros_build_lib.RunCommand(['ls', '/does/not/exist'],
                                       error_code_ok=True)
    self.assertTrue(result.returncode != 0)

  def testMissingCommandRaisesError(self):
    """Raise error when command is not found."""
    # A missing binary must raise regardless of the error_code_ok setting.
    for error_code_ok in (False, True):
      self.assertRaises(cros_build_lib.RunCommandError,
                        cros_build_lib.RunCommand,
                        ['/does/not/exist'], error_code_ok=error_code_ok)
def _ForceLoggingLevel(functor):
def inner(*args, **kwargs):
current = cros_build_lib.logger.getEffectiveLevel()
return functor(*args, **kwargs)
return inner
class TestRunCommand(cros_test_lib.MockTestCase):
"""Tests of RunCommand functionality."""
def setUp(self):
  """Patch _Popen and the signal module functions used by the tests."""
  # Get the original value for SIGINT so our signal() mock can return the
  # correct thing.
  self._old_sigint = signal.getsignal(signal.SIGINT)

  # Mock the return value of Popen().
  self.error = 'test error'
  self.output = 'test output'
  self.proc_mock = mock.MagicMock(
      communicate=lambda x: (self.output, self.error))
  self.popen_mock = self.PatchObject(cros_build_lib, '_Popen',
                                     return_value=self.proc_mock)

  self.signal_mock = self.PatchObject(signal, 'signal')
  self.getsignal_mock = self.PatchObject(signal, 'getsignal')
  self.PatchObject(cros_signals, 'SignalModuleUsable', return_value=True)
@contextlib.contextmanager
def _MockChecker(self, cmd, **kwargs):
  """Verify the mocks we set up.

  Used as a context manager: the signal mocks are armed, control is yielded
  so the caller can run the command, and afterwards the recorded signal and
  _Popen calls are checked.

  Args:
    cmd: The command expected as the first positional argument to _Popen.
    **kwargs: Keyword arguments expected to reach _Popen.  The special key
      'ignore_sigint' is removed first and selects which signal call
      sequence is expected.
  """
  ignore_sigint = kwargs.pop('ignore_sigint', False)

  # Make some arbitrary functors we can pretend are signal handlers.
  # Note that these are intentionally defined on the fly via lambda-
  # this is to ensure that they're unique to each run.
  sigint_suppress = lambda signum, frame: None
  sigint_suppress.__name__ = 'sig_ign_sigint'
  normal_sigint = lambda signum, frame: None
  normal_sigint.__name__ = 'sigint'
  normal_sigterm = lambda signum, frame: None
  normal_sigterm.__name__ = 'sigterm'

  # Set up complicated mock for signal.signal().
  def _SignalChecker(sig, _action):
    """Return the right signal values so we can check the calls."""
    if sig == signal.SIGINT:
      return sigint_suppress if ignore_sigint else normal_sigint
    elif sig == signal.SIGTERM:
      return normal_sigterm
    raise ValueError('unknown sig %i' % sig)
  self.signal_mock.side_effect = _SignalChecker

  # Set up complicated mock for signal.getsignal().
  def _GetsignalChecker(sig):
    """Return the right signal values so we can check the calls."""
    if sig == signal.SIGINT:
      return normal_sigint
    elif sig == signal.SIGTERM:
      return normal_sigterm
    raise ValueError('unknown sig %i' % sig)
  self.getsignal_mock.side_effect = _GetsignalChecker

  # Let the body of code run, then check the signal behavior afterwards.
  # We don't get visibility into signal ordering vs command execution,
  # but it's kind of hard to mess up that, so we won't bother.
  yield

  class RejectSigIgn(object):
    """Make sure the signal action is not SIG_IGN."""
    def __eq__(self, other):
      return other != signal.SIG_IGN

  # Verify the signals checked/setup are correct.
  # NOTE(review): the signal number of each expected call is inferred from
  # the handler that call installs (SIGINT/SIGTERM handlers above) --
  # confirm the ordering against RunCommand.
  if ignore_sigint:
    self.signal_mock.assert_has_calls([
        mock.call(signal.SIGINT, signal.SIG_IGN),
        mock.call(signal.SIGTERM, RejectSigIgn()),
        mock.call(signal.SIGINT, sigint_suppress),
        mock.call(signal.SIGTERM, normal_sigterm),
    ])
    self.assertEqual(self.getsignal_mock.call_count, 1)
  else:
    self.signal_mock.assert_has_calls([
        mock.call(signal.SIGINT, RejectSigIgn()),
        mock.call(signal.SIGTERM, RejectSigIgn()),
        mock.call(signal.SIGINT, normal_sigint),
        mock.call(signal.SIGTERM, normal_sigterm),
    ])
    self.assertEqual(self.getsignal_mock.call_count, 2)

  # Verify various args are passed down to the real command.
  pargs = self.popen_mock.call_args[0][0]
  self.assertEqual(cmd, pargs)

  # Verify various kwargs are passed down to the real command.
  pkwargs = self.popen_mock.call_args[1]
  for key in ('cwd', 'stdin', 'stdout', 'stderr'):
    kwargs.setdefault(key, None)
  kwargs.setdefault('shell', False)
  kwargs.setdefault('env', mock.ANY)
  kwargs['close_fds'] = True
  self.longMessage = True
  for key in kwargs:
    self.assertEqual(kwargs[key], pkwargs[key],
                     msg='kwargs[%s] mismatch' % key)
def _AssertCrEqual(self, expected, actual):
"""Helper method to compare two CommandResult objects.
This is needed since assertEqual does not know how to compare two
CommandResult objects.
expected: a CommandResult object, expected result.
actual: a CommandResult object, actual result.
self.assertEqual(expected.cmd, actual.cmd)
self.assertEqual(expected.error, actual.error)
self.assertEqual(expected.output, actual.output)
self.assertEqual(expected.returncode, actual.returncode)
def _TestCmd(self, cmd, real_cmd, sp_kv=None, rc_kv=None, sudo=False):
  """Factor out common setup logic for testing RunCommand().

  Args:
    cmd: a string or an array of strings that will be passed to RunCommand.
    real_cmd: the real command we expect RunCommand to call (might be
      modified to have enter_chroot).
    sp_kv: key-value pairs passed to subprocess.Popen().
    rc_kv: key-value pairs passed to RunCommand().
    sudo: use SudoRunCommand() rather than RunCommand().
  """
  if sp_kv is None:
    sp_kv = {}
  if rc_kv is None:
    rc_kv = {}

  expected_result = cros_build_lib.CommandResult()
  expected_result.cmd = real_cmd
  expected_result.error = self.error
  expected_result.output = self.output
  expected_result.returncode = self.proc_mock.returncode

  runcmd = (cros_build_lib.SudoRunCommand if sudo
            else cros_build_lib.RunCommand)
  # The expected Popen kwargs are verified by _MockChecker on exit.
  with self._MockChecker(real_cmd, ignore_sigint=rc_kv.get('ignore_sigint'),
                         **sp_kv):
    actual_result = runcmd(cmd, **rc_kv)

  self._AssertCrEqual(expected_result, actual_result)
def testReturnCodeZeroWithArrayCmd(self, ignore_sigint=False):
  """--enter_chroot=False and --cmd is an array of strings.

  Parameterized so this can also be used by some other tests w/ alternate
  params to RunCommand().

  Args:
    ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
  """
  self.proc_mock.returncode = 0
  cmd_list = ['foo', 'bar', 'roger']
  self._TestCmd(cmd_list, cmd_list,
                rc_kv=dict(ignore_sigint=ignore_sigint))
def testSignalRestoreNormalCase(self):
  """Test RunCommand() properly sets/restores sigint.  Normal case."""
  # Reuse the parameterized success-path test with sigint ignored.
  self.testReturnCodeZeroWithArrayCmd(ignore_sigint=True)
def testReturnCodeZeroWithArrayCmdEnterChroot(self):
  """--enter_chroot=True and --cmd is an array of strings."""
  self.proc_mock.returncode = 0
  cmd_list = ['foo', 'bar', 'roger']
  # Outside the chroot the command is expected to be prefixed by cros_sdk.
  if cros_build_lib.IsInsideChroot():
    expected_cmd = cmd_list
  else:
    expected_cmd = ['cros_sdk', '--'] + cmd_list
  self._TestCmd(cmd_list, expected_cmd, rc_kv=dict(enter_chroot=True))
def testCommandFailureRaisesError(self, ignore_sigint=False):
  """Verify a non-zero return code raises when error_code_ok=False.

  Parameterized so this can also be used by some other tests w/ alternate
  params to RunCommand().

  Args:
    ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
  """
  cmd = 'test cmd'
  self.proc_mock.returncode = 1
  # With shell=True the string command is expected to be wrapped in bash -c.
  with self._MockChecker(['/bin/bash', '-c', cmd],
                         ignore_sigint=ignore_sigint):
    self.assertRaises(cros_build_lib.RunCommandError,
                      cros_build_lib.RunCommand, cmd, shell=True,
                      ignore_sigint=ignore_sigint, error_code_ok=False)
def testSubprocessCommunicateExceptionRaisesError(self, ignore_sigint=False):
  """Verify error raised by communicate() is caught.

  Parameterized so this can also be used by some other tests w/ alternate
  params to RunCommand().

  Args:
    ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
  """
  cmd = ['test', 'cmd']
  self.proc_mock.communicate = mock.MagicMock(side_effect=ValueError)
  with self._MockChecker(cmd, ignore_sigint=ignore_sigint):
    self.assertRaises(ValueError, cros_build_lib.RunCommand, cmd,
                      ignore_sigint=ignore_sigint)
def testSignalRestoreExceptionCase(self):
  """Test RunCommand() properly sets/restores sigint.  Exception case."""
  # Reuse the parameterized communicate()-raises test with sigint ignored.
  self.testSubprocessCommunicateExceptionRaisesError(ignore_sigint=True)
def testEnvWorks(self):
  """Test RunCommand(..., env=xyz) works."""
  # We'll put this bogus environment together, just to make sure
  # subprocess.Popen gets passed it.
  env = {'Tom': 'Jerry', 'Itchy': 'Scratchy'}

  # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
  self.proc_mock.returncode = 0
  cmd_list = ['foo', 'bar', 'roger']

  # Run.  We expect the env= to be passed through from sp (subprocess.Popen)
  # to rc (RunCommand).
  self._TestCmd(cmd_list, cmd_list,
                sp_kv=dict(env=env),
                rc_kv=dict(env=env))
def testExtraEnvOnlyWorks(self):
  """Test RunCommand(..., extra_env=xyz) works."""
  # We'll put this bogus environment together, just to make sure
  # subprocess.Popen gets passed it.
  extra_env = {'Pinky' : 'Brain'}
  ## This is a little bit circular, since the same logic is used to compute
  ## the value inside, but at least it checks that this happens.
  total_env = os.environ.copy()
  total_env.update(extra_env)

  # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
  self.proc_mock.returncode = 0
  cmd_list = ['foo', 'bar', 'roger']

  # Run.  We expect the env= to be passed through from sp (subprocess.Popen)
  # to rc (RunCommand).
  self._TestCmd(cmd_list, cmd_list,
                sp_kv=dict(env=total_env),
                rc_kv=dict(extra_env=extra_env))
def testExtraEnvTooWorks(self):
  """Test RunCommand(..., env=xy, extra_env=z) works."""
  # We'll put this bogus environment together, just to make sure
  # subprocess.Popen gets passed it.
  env = {'Tom': 'Jerry', 'Itchy': 'Scratchy'}
  extra_env = {'Pinky': 'Brain'}
  total_env = {'Tom': 'Jerry', 'Itchy': 'Scratchy', 'Pinky': 'Brain'}

  # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
  self.proc_mock.returncode = 0
  cmd_list = ['foo', 'bar', 'roger']

  # Run.  We expect the env= to be passed through from sp (subprocess.Popen)
  # to rc (RunCommand).
  self._TestCmd(cmd_list, cmd_list,
                sp_kv=dict(env=total_env),
                rc_kv=dict(env=env, extra_env=extra_env))
@mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=False)
def testChrootExtraEnvWorks(self, _inchroot_mock):
  """Test RunCommand(..., enter_chroot=True, env=xy, extra_env=z) works."""
  # We'll put this bogus environment together, just to make sure
  # subprocess.Popen gets passed it.
  env = {'Tom': 'Jerry', 'Itchy': 'Scratchy'}
  extra_env = {'Pinky': 'Brain'}
  total_env = {'Tom': 'Jerry', 'Itchy': 'Scratchy', 'Pinky': 'Brain'}

  # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
  self.proc_mock.returncode = 0
  cmd_list = ['foo', 'bar', 'roger']

  # Run.  We expect the env= to be passed through from sp (subprocess.Popen)
  # to rc (RunCommand).
  # NOTE(review): total_env was defined but never checked; passing it as the
  # expected Popen env mirrors testExtraEnvTooWorks -- confirm.
  self._TestCmd(cmd_list, ['cros_sdk', 'Pinky=Brain', '--'] + cmd_list,
                sp_kv=dict(env=total_env),
                rc_kv=dict(env=env, extra_env=extra_env, enter_chroot=True))
def testExceptionEquality(self):
  """Verify equality methods for RunCommandError"""
  make_result = cros_build_lib.CommandResult
  make_error = cros_build_lib.RunCommandError

  base_result = make_result(cmd=['ls', 'arg'], returncode=1)
  other_cmd_result = make_result(cmd=['ls', 'arg1'], returncode=1)
  other_code_result = make_result(cmd=['ls', 'arg'], returncode=2)

  reference = make_error('Message 1', base_result)
  twin = make_error('Message 1', base_result)
  mismatches = (
      make_error('Message 2', base_result),
      make_error('Message 1', other_cmd_result),
      make_error('Message 1', other_code_result),
  )

  # Same message and same result compare equal ...
  self.assertEqual(reference, twin)
  # ... while a different message, command or return code does not.
  for different in mismatches:
    self.assertNotEqual(reference, different)
def testSudoRunCommand(self):
  """Test SudoRunCommand(...) works."""
  self.proc_mock.returncode = 0
  argv = ['foo', 'bar', 'roger']
  # The command is expected to be prefixed with `sudo --`.
  self._TestCmd(argv, ['sudo', '--'] + argv, sudo=True)
def testSudoRunCommandShell(self):
  """Test SudoRunCommand(..., shell=True) works."""
  cmd = 'foo bar roger'
  sudo_list = ['sudo', '--', '/bin/bash', '-c', cmd]
  self.proc_mock.returncode = 0
  self._TestCmd(cmd, sudo_list, sudo=True,
                rc_kv=dict(shell=True))
def testSudoRunCommandEnv(self):
  """Test SudoRunCommand(..., extra_env=z) works."""
  cmd_list = ['foo', 'bar', 'roger']
  # extra_env entries are expected on the sudo command line as KEY=VALUE.
  sudo_list = ['sudo', 'shucky=ducky', '--'] + cmd_list
  extra_env = {'shucky' : 'ducky'}
  self.proc_mock.returncode = 0
  self._TestCmd(cmd_list, sudo_list, sudo=True,
                rc_kv=dict(extra_env=extra_env))
def testSudoRunCommandUser(self):
  """Test SudoRunCommand(..., user='...') works."""
  cmd_list = ['foo', 'bar', 'roger']
  sudo_list = ['sudo', '-u', 'MMMMMonster', '--'] + cmd_list
  self.proc_mock.returncode = 0
  self._TestCmd(cmd_list, sudo_list, sudo=True,
                rc_kv=dict(user='MMMMMonster'))
def testSudoRunCommandUserShell(self):
  """Test SudoRunCommand(..., user='...', shell=True) works."""
  self.proc_mock.returncode = 0
  shell_cmd = 'foo bar roger'
  expected_argv = ['sudo', '-u', 'MMMMMonster', '--',
                   '/bin/bash', '-c', shell_cmd]
  run_kwargs = dict(user='MMMMMonster', shell=True)
  self._TestCmd(shell_cmd, expected_argv, sudo=True, rc_kv=run_kwargs)
class TestRunCommandLogging(cros_test_lib.TempDirTestCase):
  """Tests of RunCommand logging."""

  def testLogStdoutToFile(self):
    """Check log_stdout_to_file alone and combined with stderr options."""
    log_path = os.path.join(self.tempdir, 'output')

    # stdout goes to the log file; nothing is captured on the result.
    plain = cros_build_lib.RunCommand(
        ['echo', 'monkeys'], log_stdout_to_file=log_path)
    self.assertEqual(osutils.ReadFile(log_path), 'monkeys\n')
    self.assertIs(plain.output, None)
    self.assertIs(plain.error, None)

    # stderr redirected: it is captured on the result, the log stays empty.
    stderr_only = cros_build_lib.RunCommand(
        ['sh', '-c', 'echo monkeys3 >&2'],
        log_stdout_to_file=log_path, redirect_stderr=True)
    self.assertEqual(stderr_only.error, 'monkeys3\n')
    self.assertEqual(os.path.getsize(log_path), 0)

    # stdout and stderr combined: both streams end up in the log file.
    combined = cros_build_lib.RunCommand(
        ['sh', '-c', 'echo monkeys4; echo monkeys5 >&2'],
        log_stdout_to_file=log_path, combine_stdout_stderr=True)
    self.assertIs(combined.output, None)
    self.assertIs(combined.error, None)
    self.assertEqual(osutils.ReadFile(log_path), 'monkeys4\nmonkeys5\n')
class TestRetries(cros_test_lib.MockTestCase):
"""Tests of GenericRetry and relatives."""
def testGenericRetry(self):
  """Check GenericRetry with a functor that fails on its first four calls."""
  # Two identical counters: one drives the functor, the other confirms the
  # functor is invoked exactly as often as expected.
  expected_values = iter(xrange(5))
  produced_values = iter(xrange(5))

  def f():
    val = next(produced_values)
    self.assertEqual(val, next(expected_values))
    if val < 4:
      raise ValueError()
    return val

  def handler(ex):
    return isinstance(ex, ValueError)

  self.assertRaises(ValueError, retry_util.GenericRetry, handler, 3, f)
  self.assertEqual(4, retry_util.GenericRetry(handler, 1, f))
  # Both counters are now exhausted, so the functor itself raises.
  self.assertRaises(StopIteration, retry_util.GenericRetry, handler, 3, f)
def testRetryExceptionBadArgs(self):
  """Verify we reject non-classes or tuples of classes"""
  for bad_spec in ('', 123, None, [None]):
    self.assertRaises(TypeError, retry_util.RetryException, bad_spec, 3, map)
def testRetryException(self):
  """Verify we retry only when certain exceptions get thrown"""
  expected_values = iter(xrange(6))
  produced_values = iter(xrange(6))

  def f():
    val = next(produced_values)
    self.assertEqual(val, next(expected_values))
    # Calls 0-1 raise OSError, calls 2-4 raise ValueError, call 5 succeeds.
    if val < 2:
      raise OSError()
    if val < 5:
      raise ValueError()
    return val

  retry = retry_util.RetryException
  either = (OSError, ValueError)
  self.assertRaises(OSError, retry, either, 2, f)
  self.assertRaises(ValueError, retry, either, 1, f)
  self.assertEqual(5, retry(ValueError, 1, f))
  self.assertRaises(StopIteration, retry, ValueError, 3, f)
def testRetryWithBackoff(self):
  """Verify the sleep between attempts doubles on every retry."""
  sleep_history = []

  def mock_sleep(x):
    """Record the requested delay instead of sleeping."""
    sleep_history.append(x)
  self.PatchObject(time, 'sleep', new=mock_sleep)

  def always_fails():
    raise ValueError()

  handler = lambda x: True
  with self.assertRaises(ValueError):
    # NOTE(review): the keyword that doubles the delay was reconstructed as
    # backoff_factor=2 from the expected history below -- confirm against
    # retry_util.GenericRetry's signature.
    retry_util.GenericRetry(handler, 5, always_fails, sleep=1,
                            backoff_factor=2)

  self.assertEqual(sleep_history, [1, 2, 4, 8, 16])
def testBasicRetry(self):
# Runs a small counter script through RunCommand and RunCommandWithRetries
# with time.sleep patched out, comparing the script output per scenario.
# NOTE(review): this method appears to have lost lines -- the `paths` dict
# is never closed, the script text below is not handed to any writer, the
# `calls` expression is incomplete, and the final retry check has no
# assertion prefix.  Restore from history before relying on it.
# pylint: disable=E1101
path = os.path.join(self.tempdir, 'script')
paths = {
'stop': os.path.join(self.tempdir, 'stop'),
'store': os.path.join(self.tempdir, 'store'),
# Script text: reads the stored counter and the stop value, bumps the
# counter, prints the old value, and exits 0 only when they match.
"import sys\n"
"val = int(open(%(store)r).read())\n"
"stop_val = int(open(%(stop)r).read())\n"
"open(%(store)r, 'w').write(str(val + 1))\n"
"print val\n"
"sys.exit(0 if val == stop_val else 1)\n" % paths)
os.chmod(path, 0o755)
def _setup_counters(start, stop):
# Seed the counter file and the stop file read by the script.
osutils.WriteFile(paths['store'], str(start))
osutils.WriteFile(paths['stop'], str(stop))
def _check_counters(sleep, sleep_cnt):
# NOTE(review): presumably builds the expected time.sleep calls
# (sleep * 1, sleep * 2, ...) -- the expression is truncated.
calls = [ * (x + 1)) for x in range(sleep_cnt)]
sleep_mock = self.PatchObject(time, 'sleep')
_setup_counters(0, 0)
command = ['python', path]
kwargs = {'redirect_stdout': True, 'print_cmd': False}
self.assertEqual(cros_build_lib.RunCommand(command, **kwargs).output, '0\n')
_check_counters(0, 0)
func = retry_util.RunCommandWithRetries
_setup_counters(2, 2)
self.assertEqual(func(0, command, sleep=0, **kwargs).output, '2\n')
_check_counters(0, 0)
_setup_counters(0, 2)
self.assertEqual(func(2, command, sleep=1, **kwargs).output, '2\n')
_check_counters(1, 2)
_setup_counters(0, 1)
self.assertEqual(func(1, command, sleep=2, **kwargs).output, '1\n')
_check_counters(2, 1)
_setup_counters(0, 3)
func, 2, command, sleep=3, **kwargs)
_check_counters(3, 2)
class TestTimedCommand(cros_test_lib.MockTestCase):
"""Tests for TimedCommand()"""
# TODO: Would be nice to insert a hook into the logging system so we verify
# the message actually gets passed down. The logging module swallows the
# exceptions it throws internally when not all args get converted.
def setUp(self):
  """Create the named MagicMock that the tests hand to TimedCommand."""
  timed_callable = mock.MagicMock(return_value=1234)
  timed_callable.__name__ = 'name'
  self.cmd = timed_callable
def testBasic(self):
  """Make sure simple stuff works."""
  cros_build_lib.TimedCommand(self.cmd)
  self.cmd.assert_called_once_with()
def testArgs(self):
  """Verify passing of optional args to the destination function."""
  positional = ('arg', 1)
  keywords = {'kw': True, 'alist': []}
  cros_build_lib.TimedCommand(self.cmd, *positional, **keywords)
  self.cmd.assert_called_once_with(*positional, **keywords)
def testReturn(self):
  """Verify return values get passed back."""
  # 1234 is the return_value configured on self.cmd in setUp.
  self.assertEqual(cros_build_lib.TimedCommand(self.cmd), 1234)
def testCallback(self):
  """Verify log callback does the right thing."""
  def cb(lvl, msg, ret, delta):
    # Level and message must match what was passed in below; ret is the
    # wrapped callable's return value; delta is the measured duration.
    self.assertEqual((lvl, msg, ret), (10, 'msg!', 1234))
    self.assertIsInstance(delta, datetime.timedelta)

  timed_kwargs = dict(timed_log_level=10, timed_log_msg='msg!',
                      timed_log_callback=cb)
  cros_build_lib.TimedCommand(self.cmd, **timed_kwargs)
def testLog(self):
  """Verify the logger module gets called."""
  m = self.PatchObject(logging, 'log')
  cros_build_lib.TimedCommand(self.cmd, timed_log_level=logging.WARNING,
                              timed_log_msg='msg!')
  self.assertEqual(m.call_count, 1)
def testLogStraight(self):
  """Verify logging messages does the right thing."""
  # No mock here: the message goes through the real logging module, so the
  # only check is that nothing raises.
  cros_build_lib.TimedCommand(self.cmd, timed_log_level=logging.WARNING,
                              timed_log_msg='msg!')
class TestListFiles(cros_test_lib.TempDirTestCase):
"""Tests of ListFiles funciton."""
def _CreateNestedDir(self, dir_structure):
for entry in dir_structure:
full_path = os.path.join(os.path.join(self.tempdir, entry))
# ensure dirs are created
if full_path.endswith('/'):
# we only want to create directories
except OSError as err:
if err.errno == errno.EEXIST:
# we don't care if the dir already exists
# create dummy files
tmp = open(full_path, 'w')
def testTraverse(self):
  """Test that we are traversing the directory properly."""
  # NOTE(review): only these two entries of the fixture list were readable;
  # extend it if more paths are intended.
  dir_structure = ['one/two/test.txt', 'one/']
  self._CreateNestedDir(dir_structure)

  files = cros_build_lib.ListFiles(self.tempdir)
  for f in files:
    # Compare paths relative to the temp dir.
    f = f.replace(self.tempdir, '').lstrip('/')
    if f not in dir_structure:
      self.fail('%s was not found in %s' % (f, dir_structure))
def testEmptyFilePath(self):
  """Test that we return nothing when directories are empty."""
  dir_structure = ['one/', 'two/', 'one/a/']
  self._CreateNestedDir(dir_structure)
  files = cros_build_lib.ListFiles(self.tempdir)
  self.assertEqual(files, [])
def testNoSuchDir(self):
  """Listing a missing directory may only fail with ENOENT."""
  try:
    cros_build_lib.ListFiles(os.path.join(self.tempdir, 'missing'))
  except OSError as err:
    self.assertEqual(err.errno, errno.ENOENT)
class HelperMethodSimpleTests(cros_test_lib.TestCase):
"""Tests for various helper methods without using mocks."""
def _TestChromeosVersion(self, test_str, expected=None):
  """Assert that GetChromeosVersion(test_str) returns |expected|."""
  self.assertEqual(expected, cros_build_lib.GetChromeosVersion(test_str))
def testGetChromeosVersionWithValidVersionReturnsValue(self):
# NOTE(review): `test_str` is never assigned here and `expected` is empty;
# the version fixture appears to be missing -- restore it before relying
# on this test.
expected = ''
self._TestChromeosVersion(test_str, expected)
def testGetChromeosVersionWithMultipleVersionReturnsFirstMatch(self):
# NOTE(review): `test_str` is never assigned here and `expected` is empty;
# the multi-version fixture appears to be missing -- restore it before
# relying on this test.
expected = ''
self._TestChromeosVersion(test_str, expected)
def testGetChromeosVersionWithInvalidVersionReturnsDefault(self):
  """An unparsable version string yields the default (None)."""
  test_str = ' CHROMEOS_VERSION_STRING=invalid_version_string '
  self._TestChromeosVersion(test_str)
def testGetChromeosVersionWithEmptyInputReturnsDefault(self):
  """An empty string yields the default (None)."""
  self._TestChromeosVersion('')
def testGetChromeosVersionWithNoneInputReturnsDefault(self):
  """A None input yields the default (None)."""
  self._TestChromeosVersion(None)
def testUserDateTime(self):
  """Test with a raw time value."""
  expected = 'Mon, 16 Jun 1980 05:03:20 -0700 (PDT)'
  with cros_test_lib.SetTimeZone('US/Pacific'):
    timeval = 330005000
    self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval), expected)
def testUserDateTimeDateTime(self):
  """Test with a datetime object."""
  expected = 'Mon, 16 Jun 1980 00:00:00 -0700 (PDT)'
  with cros_test_lib.SetTimeZone('US/Pacific'):
    timeval = datetime.datetime(1980, 6, 16)
    self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval), expected)
def testUserDateTimeDateTimeInWinter(self):
  """Test that we correctly switch from PDT to PST."""
  expected = 'Wed, 16 Jan 1980 00:00:00 -0800 (PST)'
  with cros_test_lib.SetTimeZone('US/Pacific'):
    timeval = datetime.datetime(1980, 1, 16)
    self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval), expected)
def testUserDateTimeDateTimeInEST(self):
  """Test that we correctly switch from PDT to EST."""
  expected = 'Wed, 16 Jan 1980 00:00:00 -0500 (EST)'
  with cros_test_lib.SetTimeZone('US/Eastern'):
    timeval = datetime.datetime(1980, 1, 16)
    self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval), expected)
def testUserDateTimeCurrentTime(self):
  """Test that we can get the current time."""
  # NOTE(review): assumes UserDateTimeFormat() with no argument formats the
  # current time -- confirm against its signature.
  cros_build_lib.UserDateTimeFormat()
def testParseUserDateTimeFormat(self):
  """Formatting then parsing a timestamp should round-trip."""
  stringtime = cros_build_lib.UserDateTimeFormat(100000.0)
  # NOTE(review): parser name inferred from this test's name -- confirm.
  self.assertEqual(cros_build_lib.ParseUserDateTimeFormat(stringtime),
                   100000.0)
def testParseDurationToSeconds(self):
  """One hour, one minute and one second should parse to 3661 seconds."""
  # NOTE(review): function name and the '1:01:01' input are inferred from
  # the test name and the expected sum -- confirm.
  self.assertEqual(cros_build_lib.ParseDurationToSeconds('1:01:01'),
                   3600 + 60 + 1)
def testMachineDetails(self):
  """Verify we don't crash."""
  details = cros_build_lib.MachineDetails()
  # Must be non-empty ...
  self.assertNotEqual(details, '')
  # ... and terminated by a newline.
  last_char = details[-1]
  self.assertEqual(last_char, '\n')
class TestInput(cros_test_lib.MockOutputTestCase):
"""Tests of input gathering functionality."""
def testGetInput(self):
  """Verify GetInput() basic behavior."""
  response = 'Some response'
  # Stub out raw_input so no terminal interaction is needed.
  self.PatchObject(__builtin__, 'raw_input', return_value=response)
  self.assertEqual(response, cros_build_lib.GetInput('prompt'))
def testBooleanPrompt(self):
"""Verify BooleanPrompt() full behavior."""
# GetInput is patched so each answer below is what the prompt "reads".
# NOTE(review): no BooleanPrompt() call or assertion follows any of these
# answers, so nothing is verified as written -- the checks appear to be
# missing and should be restored.
m = self.PatchObject(cros_build_lib, 'GetInput')
m.return_value = ''
m.return_value = 'yes'
m.return_value = 'ye'
m.return_value = 'y'
m.return_value = 'no'
m.return_value = 'n'
def testBooleanShellValue(self):
  """Verify BooleanShellValue() inputs work as expected"""
  shell_value = cros_build_lib.BooleanShellValue

  # None falls back to whatever default is supplied.
  for v in (None,):
    self.assertTrue(shell_value(v, True))
    self.assertFalse(shell_value(v, False))

  # Unrecognized values raise, unless a msg is given, in which case the
  # default is returned.
  for v in (1234, '', 'akldjsf', '"'):
    self.assertRaises(ValueError, shell_value, v, True)
    self.assertTrue(shell_value(v, True, msg=''))
    self.assertFalse(shell_value(v, False, msg=''))

  truthy = ('yes', 'YES', 'YeS', 'y', 'Y', '1', 'true', 'True', 'TRUE',)
  falsy = ('no', 'NO', 'nO', 'n', 'N', '0', 'false', 'False', 'FALSE',)

  # Recognized values win over the default in both directions.
  for v in truthy:
    for default in (True, False):
      self.assertTrue(shell_value(v, default))

  for v in falsy:
    for default in (True, False):
      self.assertFalse(shell_value(v, default))
def testGetChoiceLists(self):
  """Verify GetChoice behavior w/lists."""
  input_mock = self.PatchObject(cros_build_lib, 'GetInput')
  input_mock.return_value = '1'
  options = ['a', 'b', 'c']
  self.assertEqual(cros_build_lib.GetChoice('title', options), 1)
def testGetChoiceGenerator(self):
  """Verify GetChoice behavior w/generators."""
  input_mock = self.PatchObject(cros_build_lib, 'GetInput')
  input_mock.return_value = '2'
  self.assertEqual(cros_build_lib.GetChoice('title', xrange(3)), 2)
def testGetChoiceWindow(self):
  """Verify GetChoice behavior w/group_size set."""
  input_mock = self.PatchObject(cros_build_lib, 'GetInput')

  # One-element list so the nested generator can update the count.
  yielded = [0]

  def _Gen():
    """Endless supply of options that counts how many were pulled."""
    while True:
      yielded[0] = yielded[0] + 1
      yield 'a'

  # First answer is a bare newline, second answer picks item 2.
  input_mock.side_effect = ['\n', '2']
  choice = cros_build_lib.GetChoice('title', _Gen(), group_size=2)
  self.assertEqual(choice, 2)

  # Verify we showed the correct number of times.
  self.assertEqual(yielded[0], 5)
class TestContextManagerStack(cros_test_lib.TestCase):
"""Test the ContextManagerStack class."""
def test(self):
# Builds throwaway context-manager classes via _mk_kls and pushes them on a
# ContextManagerStack, then compares the `invoked` list with 5..0.
# NOTE(review): nothing visible here ever appends to `invoked`, and only
# two managers are added while six markers are expected -- the method
# appears truncated; restore the missing lines before relying on it.
invoked = []
counter = iter(itertools.count()).next
def _mk_kls(has_exception=None, exception_kls=None, suppress=False):
# has_exception: exception type __exit__ must be handed, if any.
# exception_kls: exception type for __exit__ to raise, if any.
# suppress: when True, __exit__ returns True.
class foon(object):
"""Simple context manager which runs checks on __exit__."""
# Each generated class takes the next value from the shared counter.
marker = counter()
def __enter__(self):
return self
# pylint: disable=no-self-argument,bad-context-manager
def __exit__(obj_self, exc_type, exc, traceback):
# `self` below is the enclosing TestCase; the instance is obj_self.
if has_exception is not None:
self.assertTrue(all(x is not None
for x in (exc_type, exc, traceback)))
self.assertTrue(exc_type == has_exception)
if exception_kls:
raise exception_kls()
if suppress:
return True
return foon
with cros_build_lib.ContextManagerStack() as stack:
# Note... these tests are in reverse, since the exception
# winds its way up the stack.
stack.Add(_mk_kls(ValueError, suppress=True))
stack.Add(_mk_kls(IndexError, exception_kls=ValueError))
self.assertEqual(invoked, list(reversed(range(6))))
class TestManifestCheckout(cros_test_lib.TempDirTestCase):
"""Tests for ManifestCheckout functionality."""
def setUp(self):
# Prepares a repo checkout under self.tempdir from local copies of the
# repo tool and manifests.git, and records the active manifest path.
# NOTE(review): the `fetch` argument list below is detached from any call
# and is never closed, and `repo` is constructed but not used afterwards
# -- lines appear to be missing; restore them before relying on this.
self.manifest_dir = os.path.join(self.tempdir, '.repo', 'manifests')
# Initialize a repo instance here.
local_repo = os.path.join(constants.SOURCE_ROOT, '.repo/repo/.git')
# Create a copy of our existing manifests.git, but rewrite it so it
# looks like a remote manifests.git. This is to avoid hitting the
# network, and speeds things up in general.
local_manifests = 'file://%s/.repo/manifests.git' % constants.SOURCE_ROOT
temp_manifests = os.path.join(self.tempdir, 'manifests.git')
git.RunGit(self.tempdir, ['clone', '-n', '--bare', local_manifests])
['fetch', '-f', '-u', local_manifests,
git.RunGit(temp_manifests, ['branch', '-D', 'default'])
repo = repository.RepoRepository(
temp_manifests, self.tempdir,
repo_url='file://%s' % local_repo, repo_branch='default')
# realpath is applied to the manifest.xml path under .repo.
self.active_manifest = os.path.realpath(
os.path.join(self.tempdir, '.repo', 'manifest.xml'))
def testManifestInheritance(self):
# Writes a manifest that includes two other files, then checks
# ManifestCheckout with the includes missing, with an empty manifest, and
# with the include target present.
# NOTE(review): both manifest string literals are unterminated and the
# failure assertion / include-target write have lost their call prefixes
# -- restore the missing lines before relying on this test.
osutils.WriteFile(self.active_manifest, """
<include name="include-target.xml" />
<include name="empty.xml" />
<project name="monkeys" path="baz" remote="foon" revision="master" />
# First, verify it properly explodes if the include can't be found.
git.ManifestCheckout, self.tempdir)
# Next, verify it can read an empty manifest; this is to ensure
# that we can point Manifest at the empty manifest without exploding,
# same for ManifestCheckout; this sort of thing is primarily useful
# to ensure no step of an include assumes everything is yet assembled.
empty_path = os.path.join(self.manifest_dir, 'empty.xml')
osutils.WriteFile(empty_path, '<manifest/>')
git.ManifestCheckout(self.tempdir, manifest_path=empty_path)
# Next, verify include works.
os.path.join(self.manifest_dir, 'include-target.xml'),
<remote name="foon" fetch="http://localhost" />
manifest = git.ManifestCheckout(self.tempdir)
self.assertEqual(list(manifest.checkouts_by_name), ['monkeys'])
self.assertEqual(list(manifest.remotes), ['foon'])
# pylint: disable=E1101
def testGetManifestsBranch(self):
# Exercises ManifestCheckout._GetManifestsBranch against the manifests
# checkout while its branch/tracking configuration is changed.
# NOTE(review): reconfig() is never called, its unset/set branches have no
# visible `else`, and the first assertExcept call is unclosed -- lines
# appear to be missing; restore them before relying on this test.
func = git.ManifestCheckout._GetManifestsBranch
manifest = self.manifest_dir
repo_root = self.tempdir
# pylint: disable=W0613
def reconfig(merge='master', origin='origin'):
# Writes (or unsets, for None) branch.default.merge / branch.default.origin
# in the manifests git config.
if merge is not None:
merge = 'refs/heads/%s' % merge
for key in ('merge', 'origin'):
# Look the argument up by name so one loop handles both settings.
val = locals()[key]
key = 'branch.default.%s' % key
if val is None:
git.RunGit(manifest, ['config', '--unset', key], error_code_ok=True)
git.RunGit(manifest, ['config', key, val])
# First, verify our assumptions about a fresh repo init are correct.
self.assertEqual('default', git.GetCurrentBranch(manifest))
self.assertEqual('master', func(repo_root))
# Ensure we can handle a missing origin; this can occur jumping between
# branches, and can be worked around.
self.assertEqual('default', git.GetCurrentBranch(manifest))
self.assertEqual('master', func(repo_root))
def assertExcept(message, **kwargs):
# Expects func(repo_root) to raise OSError with errno ENOENT and a message
# matching |message|.
self.assertRaises2(OSError, func, repo_root, ex_msg=message,
check_attrs={'errno': errno.ENOENT})
# No merge target means the configuration isn't usable, period.
assertExcept("git tracking configuration for that branch is broken",
# Ensure we detect if we're on the wrong branch, even if it has
# tracking setup.
git.RunGit(manifest, ['checkout', '-t', 'origin/master', '-b', 'test'])
assertExcept("It should be checked out to 'default'")
# Ensure we handle detached HEAD w/ an appropriate exception.
git.RunGit(manifest, ['checkout', '--detach', 'test'])
assertExcept("It should be checked out to 'default'")
# Finally, ensure that if the default branch is non-existent, we still throw
# a usable exception.
git.RunGit(manifest, ['branch', '-d', 'default'])
assertExcept("It should be checked out to 'default'")
def testGitMatchBranchName(self):
"""Check git.MatchBranchName results for several patterns and namespaces."""
git_repo = os.path.join(self.tempdir, '.repo', 'manifests')
# With an empty namespace the full ref name is expected back.
branches = git.MatchBranchName(git_repo, 'default', namespace='')
self.assertEqual(branches, ['refs/heads/default'])
# With namespace 'refs/heads/' only the short branch name is expected back.
branches = git.MatchBranchName(git_repo, 'default', namespace='refs/heads/')
self.assertEqual(branches, ['default'])
branches = git.MatchBranchName(git_repo, 'origin/f.*link',
self.assertTrue('firmware-link-' in branches[0])
# Lower-case 'r23' is expected to match the 'release-R23-...' remote branch.
branches = git.MatchBranchName(git_repo, 'r23')
self.assertEqual(branches, ['refs/remotes/origin/release-R23-2913.B'])
class Test_iflatten_instance(cros_test_lib.TestCase):
  """Checks for cros_build_lib.iflatten_instance."""

  def test_it(self):
    """Compare flattened output against expected lists for several inputs.

    The cases cover a flat list, a list holding a string, a nested list, a
    call that passes int as the second argument (the expected result then has
    the string split into characters), and the empty string.
    """
    def flatten(*args):
      return list(cros_build_lib.iflatten_instance(*args))

    # Each entry is (expected flattened list, positional args for the call).
    cases = (
        ([1, 2], ([1, 2],)),
        ([1, '2a'], ([1, '2a'],)),
        ([1, 2, 'b'], ([1, [2, 'b']],)),
        ([1, 2, 'f', 'd', 'a', 's'], ([1, 2, ('fdas',)], int)),
        ([''], ('',)),
    )
    for expected, args in cases:
      self.assertEqual(expected, flatten(*args))
class TestKeyValueFiles(cros_test_lib.TempDirTestCase):
"""Tests handling of key/value files."""
def setUp(self):
"""Write a sample key/value file and record the values expected from it."""
self.contents = """# A comment !@
A = 1
AA= 2
AAA =3
AAAAA\t \t=\t 5
AAAAAA = 6 \t\t# Another comment
\t# Aerith lives!
C = 'D'
CC= 'D'
CCC ='D'
\t# monsters go boom #
E \t= "Fxxxxx" # Blargl
EE= "Faaa\taaaa"\x20
EEE ="Fk \t kkkk"\t
Q = "'q"
\tQQ ="q'"\x20
R = "r
RR = "rr
RRR = 'rrr
SSS=" ss
self.expected = {
'A': '1',
'AA': '2',
'AAA': '3',
'AAAA': '4',
'AAAAA': '5',
'AAAAAA': '6',
'C': 'D',
'CC': 'D',
'CCC': 'D',
'E': 'Fxxxxx',
'EE': 'Faaa\taaaa',
'EEE': 'Fk \t kkkk',
'Q': "'q",
'QQ': "q'",
'QQQ': '"q"',
'R': 'r\n',
'RR': 'rr\nrrr',
'RRR': 'rrr\n RRRR\n rrr\n',
'SSS': ' ss\n\'ssss\'\nss',
'T': '\nttt'
self.conf_file = os.path.join(self.tempdir, 'file.conf')
osutils.WriteFile(self.conf_file, self.contents)
def _RunAndCompare(self, test_input, multiline):
"""Load |test_input| via LoadKeyValueFile and compare to self.expected.

Args:
  test_input: Passed straight to LoadKeyValueFile; the tests hand in a
    file path, a StringIO object and an open file object.
  multiline: Forwarded as the |multiline| keyword argument.
"""
result = cros_build_lib.LoadKeyValueFile(test_input, multiline=multiline)
self.assertEqual(self.expected, result)
def testLoadFilePath(self):
"""Verify reading a simple file works"""
self._RunAndCompare(self.conf_file, True)
def testLoadStringIO(self):
"""Verify passing in StringIO object works."""
self._RunAndCompare(StringIO.StringIO(self.contents), True)
def testLoadFileObject(self):
"""Verify passing in open file object works."""
with open(self.conf_file) as f:
self._RunAndCompare(f, True)
def testNoMultlineValues(self):
"""Verify exception is thrown when multiline is disabled."""
self.assertRaises(ValueError, self._RunAndCompare, self.conf_file, False)
class SafeRunTest(cros_test_lib.TestCase):
"""Tests SafeRunTest functionality."""
def _raise_exception(self, e):
"""Raise |e|; the tests below wrap this in functools.partial."""
raise e
def testRunsSafely(self):
"""Verify that we are robust to exceptions."""
def append_val(value):
call_list = []
f_list = [functools.partial(append_val, 1),
Exception('testRunsSafely exception.')),
functools.partial(append_val, 2)]
# SafeRun is expected to raise, yet both append_val partials must have run.
self.assertRaises(Exception, cros_build_lib.SafeRun, f_list)
self.assertEquals(call_list, [1, 2])
def testRaisesFirstException(self):
"""Verify we raise the first exception when multiple are encountered."""
class E1(Exception):
"""Simple exception class."""
class E2(Exception):
"""Simple exception class."""
# One partial per exception class, in order; E1 comes first.
f_list = [functools.partial(self._raise_exception, e) for e in [E1, E2]]
self.assertRaises(E1, cros_build_lib.SafeRun, f_list)
def testCombinedRaise(self):
"""Raises a RuntimeError with exceptions combined."""
# The same partial object is repeated three times in the list.
f_list = [functools.partial(self._raise_exception, Exception())] * 3
self.assertRaises(RuntimeError, cros_build_lib.SafeRun, f_list,
class FrozenAttributesTest(cros_test_lib.TestCase):
"""Tests FrozenAttributesMixin functionality."""
class DummyClass(object):
"""Any class that does not override __setattr__."""
class SetattrClass(object):
"""Class that does override __setattr__."""
def __setattr__(self, attr, value):
"""Adjust value here to later confirm that this code ran."""
object.__setattr__(self, attr, self.SETATTR_OFFSET + value)
def _TestBasics(self, cls):
"""Set attributes on an instance of |cls| and check the frozen errors.

Args:
  cls: Class to instantiate with no arguments.
"""
# pylint: disable=W0201
# Expected stored value: |val| plus the class's SETATTR_OFFSET, or plus 0
# when the class does not define that attribute.
def _Expected(val):
return getattr(cls, 'SETATTR_OFFSET', 0) + val
obj = cls()
obj.a = 1
obj.b = 2
self.assertEquals(_Expected(1), obj.a)
self.assertEquals(_Expected(2), obj.b)
# Neither changing an existing attribute nor adding a new one may succeed.
self.assertRaises(cros_build_lib.AttributeFrozenError, setattr, obj, 'a', 3)
self.assertEquals(_Expected(1), obj.a)
self.assertRaises(cros_build_lib.AttributeFrozenError, setattr, obj, 'c', 3)
self.assertFalse(hasattr(obj, 'c'))
def testFrozenByMetaclass(self):
"""Test attribute freezing with FrozenAttributesClass."""
class DummyByMeta(self.DummyClass):
"""Class that freezes DummyClass using metaclass construct."""
__metaclass__ = cros_build_lib.FrozenAttributesClass
class SetattrByMeta(self.SetattrClass):
"""Class that freezes SetattrClass using metaclass construct."""
__metaclass__ = cros_build_lib.FrozenAttributesClass
def testFrozenByMixinFirst(self):
"""Test attribute freezing with FrozenAttributesMixin first in hierarchy."""
class Dummy(cros_build_lib.FrozenAttributesMixin, self.DummyClass):
"""Class that freezes DummyClass using mixin construct."""
class Setattr(cros_build_lib.FrozenAttributesMixin, self.SetattrClass):
"""Class that freezes SetattrClass using mixin construct."""
def testFrozenByMixinLast(self):
"""Test attribute freezing with FrozenAttributesMixin last in hierarchy."""
class Dummy(self.DummyClass, cros_build_lib.FrozenAttributesMixin):
"""Class that freezes DummyClass using mixin construct."""
class Setattr(self.SetattrClass, cros_build_lib.FrozenAttributesMixin):
"""Class that freezes SetattrClass using mixin construct."""
class TestGetIPv4Address(RunCommandTestCase):
"""Tests the GetIPv4Address function."""
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
2: eth0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc pfifo_fast state \
DOWN qlen 1000
link/ether cc:cc:cc:cc:cc:cc brd ff:ff:ff:ff:ff:ff
3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP \
qlen 1000
link/ether dd:dd:dd:dd:dd:dd brd ff:ff:ff:ff:ff:ff
inet brd scope global eth1
inet6 cdef:0:cdef:cdef:cdef:cdef:cdef:cdef/64 scope global dynamic
valid_lft 2592000sec preferred_lft 604800sec
def testGetIPv4AddressParseResult(self):
"""Verifies we can parse the output and get correct IP address."""
# Register the canned output for commands matched by partial_mock.In('ip').
self.rc.AddCmdResult(partial_mock.In('ip'), output=self.IP_GLOBAL_OUTPUT)
self.assertEqual(cros_build_lib.GetIPv4Address(), '')
def testGetIPv4Address(self):
"""Tests that correct shell command is called."""
cros_build_lib.GetIPv4Address(global_ip=False, dev='eth0')
['ip', 'addr', 'show', 'scope', 'host', 'dev', 'eth0'])
self.rc.assertCommandContains(['ip', 'addr', 'show', 'scope', 'global'])
class TestGetHostname(cros_test_lib.MockTestCase):
  """Tests GetHostName & GetHostDomain functionality."""

  def setUp(self):
    """Replace socket.gethostname and socket.gethostbyaddr with mocks."""
    fake_lookup_result = ('m!!', ('cow', 'bar',), ('',))
    self.gethostname_mock = self.PatchObject(
        socket, 'gethostname', return_value='m!!n')
    self.gethostbyaddr_mock = self.PatchObject(
        socket, 'gethostbyaddr', return_value=fake_lookup_result)

  def testGetHostNameNonQualified(self):
    """A plain GetHostName() call yields the mocked gethostname value."""
    hostname = cros_build_lib.GetHostName()
    self.assertEqual(hostname, 'm!!n')

  def testGetHostNameFullyQualified(self):
    """Verify fully qualified behavior (currently contains no assertions)."""

  def testGetHostNameBadDns(self):
    """A gaierror from gethostbyaddr must not escape GetHostName()."""
    lookup_failure = socket.gaierror('should be caught')
    self.gethostbyaddr_mock.side_effect = lookup_failure
    hostname = cros_build_lib.GetHostName()
    self.assertEqual(hostname, 'm!!n')

  def testGetHostDomain(self):
    """Check the value returned by GetHostDomain() under the mocks."""
    domain = cros_build_lib.GetHostDomain()
    self.assertEqual(domain, '')
class TestGetChrootVersion(cros_test_lib.MockTestCase):
  """Tests GetChrootVersion functionality."""

  def testSimpleBuildroot(self):
    """With |buildroot| given, '12\\n' read from disk is reported as '12'."""
    self.PatchObject(osutils, 'ReadFile', return_value='12\n')
    version = cros_build_lib.GetChrootVersion(buildroot='/build/root')
    self.assertEqual(version, '12')

  def testSimpleChroot(self):
    """With |chroot| given, the mocked file content '70' is reported."""
    self.PatchObject(osutils, 'ReadFile', return_value='70')
    version = cros_build_lib.GetChrootVersion(chroot='/ch/root')
    self.assertEqual(version, '70')

  def testNoChroot(self):
    """A chroot path that does not exist gives None instead of an error."""
    missing_chroot = '/.$om3/place/nowhere'
    version = cros_build_lib.GetChrootVersion(chroot=missing_chroot)
    self.assertEqual(version, None)
class TestChrootPathHelpers(cros_test_lib.TestCase):
"""Verify we correctly reinterpret paths to be used inside/outside chroot."""
# Every test patches cros_build_lib.IsInsideChroot to a fixed value; the mock
# object is received as an extra, unused argument.
@mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=True)
def testToChrootPathInChroot(self, _inchroot_mock):
"""Test we return the original path to be used in chroot while in chroot."""
path = '/foo/woo/bar'
self.assertEqual(cros_build_lib.ToChrootPath(path), path)
@mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=False)
def testToChrootPathOutChroot(self, _inchroot_mock):
"""Test we convert the path to be used in chroot while outside chroot."""
subpath = 'bar/haa/ooo'
path = os.path.join(constants.SOURCE_ROOT, subpath)
# git.ReinterpretPathForChroot supplies the expected converted path.
chroot_path = git.ReinterpretPathForChroot(path)
self.assertEqual(cros_build_lib.ToChrootPath(path), chroot_path)
@mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=True)
def testFromChrootInChroot(self, _inchroot_mock):
"""Test we return the original chroot path while in chroot."""
path = '/foo/woo/bar'
self.assertEqual(cros_build_lib.FromChrootPath(path), path)
@mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=False)
def testFromChrootOutChroot(self, _inchroot_mock):
"""Test we convert the chroot path to be used outside chroot."""
# Test that chroot source root has been replaced in the path.
subpath = 'foo/woo/bar'
chroot_path = os.path.join(constants.CHROOT_SOURCE_ROOT, subpath)
path = os.path.join(constants.SOURCE_ROOT, subpath)
self.assertEqual(cros_build_lib.FromChrootPath(chroot_path), path)
# Test that a chroot path has been converted.
chroot_path = '/foo/woo/bar'
path = os.path.join(constants.SOURCE_ROOT,
self.assertEqual(cros_build_lib.FromChrootPath(chroot_path), path)
class CollectionTest(cros_test_lib.TestCase):
  """Tests for Collection helper."""

  @staticmethod
  def _MakeInstance(**overrides):
    """Create a fresh 'O' Collection type and instantiate it.

    Args:
      overrides: Keyword values handed to the constructor of the new type.

    Returns:
      An instance of the freshly created Collection type.
    """
    collection_type = cros_build_lib.Collection('O', a=0, b='string', c={})
    return collection_type(**overrides)

  def testDefaults(self):
    """Members hold their declared defaults when nothing is overridden."""
    inst = self._MakeInstance()
    self.assertEqual(inst.a, 0)
    self.assertEqual(inst.b, 'string')
    self.assertEqual(inst.c, {})

  def testOverrideDefaults(self):
    """A value passed at construction replaces only that member's default."""
    inst = self._MakeInstance(a=1000)
    self.assertEqual(inst.a, 1000)
    self.assertEqual(inst.b, 'string')
    self.assertEqual(inst.c, {})

  def testSetNoNewMembers(self):
    """Assigning an undeclared member must raise AttributeError."""
    inst = self._MakeInstance()

    # The assignment lives in a function so that assertRaises can invoke it;
    # written inline it would run before assertRaises is even called.
    def _assign_unknown(target):
      target.does_not_exit = 10

    self.assertRaises(AttributeError, _assign_unknown, inst)
    self.assertRaises(AttributeError, setattr, inst, 'new_guy', 10)

  def testGetNoNewMembers(self):
    """Reading an undeclared member must raise AttributeError."""
    inst = self._MakeInstance()

    # Wrapped in a function for the same reason as in testSetNoNewMembers:
    # assertRaises needs a callable to invoke.
    def _read_unknown(target):
      return target.does_not_exit

    self.assertRaises(AttributeError, _read_unknown, inst)
    self.assertRaises(AttributeError, getattr, inst, 'foooo')

  def testNewValue(self):
    """Declared members can be reassigned, even to a different type."""
    inst = self._MakeInstance()
    inst.a = 'a string'
    inst.c = 123
    self.assertEqual(inst.a, 'a string')
    self.assertEqual(inst.b, 'string')
    self.assertEqual(inst.c, 123)

  def testString(self):
    """str() of an instance lists the type name and every member value."""
    rendered = str(self._MakeInstance())
    self.assertEqual("Collection_O(a=0, b='string', c={})", rendered)
class GetImageDiskPartitionInfoTests(RunCommandTestCase):
"""Tests the GetImageDiskPartitionInfo function."""
SAMPLE_PARTED = """/foo/chromiumos_qemu_image.bin:3360MB:file:512:512:gpt:;
start size part contents
0 1 PMBR (Boot GUID: 88FB7EB8-2B3F-B943-B933-\
1 1 Pri GPT header
2 32 Pri GPT table
1921024 2097152 1 Label: "STATE"
Type: Linux data
UUID: EEBD83BE-397E-BD44-878B-0DDDD5A5C510
20480 32768 2 Label: "KERN-A"
Type: ChromeOS kernel
UUID: 7007C2F3-08E5-AB40-A4BC-FF5B01F5460D
Attr: priority=15 tries=15 successful=1
1101824 819200 3 Label: "ROOT-A"
Type: ChromeOS rootfs
UUID: F4C5C3AD-027F-894B-80CD-3DEC57932948
53248 32768 4 Label: "KERN-B"
Type: ChromeOS kernel
UUID: C85FB478-404C-8741-ADB8-11312A35880D
Attr: priority=0 tries=0 successful=0
282624 819200 5 Label: "ROOT-B"
Type: ChromeOS rootfs
UUID: A99F4231-1EC3-C542-AC0C-DF3729F5DB07
16448 1 6 Label: "KERN-C"
Type: ChromeOS kernel
UUID: 81F0E336-FAC9-174D-A08C-864FE627B637
Attr: priority=0 tries=0 successful=0
16449 1 7 Label: "ROOT-C"
Type: ChromeOS rootfs
UUID: 9E127FCA-30C1-044E-A5F2-DF74E6932692
86016 32768 8 Label: "OEM"
Type: Linux data
UUID: 72986347-A37C-684F-9A19-4DBAF41C55A9
16450 1 9 Label: "reserved"
Type: ChromeOS reserved
UUID: BA85A0A7-1850-964D-8EF8-6707AC106C3A
16451 1 10 Label: "reserved"
Type: ChromeOS reserved
UUID: 16C9EC9B-50FA-DD46-98DC-F781360817B4
64 16384 11 Label: "RWFW"
Type: ChromeOS firmware
UUID: BE8AECB9-4F78-7C44-8F23-5A9273B7EC8F
249856 32768 12 Label: "EFI-SYSTEM"
Type: EFI System Partition
UUID: 88FB7EB8-2B3F-B943-B933-EEC571FFB6E1
4050847 32 Sec GPT table
4050879 1 Sec GPT header
def testCgpt(self):
"""Tests that we can list all partitions with `cgpt` correctly."""
self.PatchObject(cros_build_lib, 'IsInsideChroot', return_value=True)
self.rc.AddCmdResult(partial_mock.Ignore(), output=self.SAMPLE_CGPT)
partitions = cros_build_lib.GetImageDiskPartitionInfo('...', unit='B')
self.assertEqual(partitions['STATE'].start, 983564288)
self.assertEqual(partitions['STATE'].size, 1073741824)
self.assertEqual(partitions['STATE'].number, 1)
self.assertEqual(partitions['STATE'].name, 'STATE')
self.assertEqual(partitions['EFI-SYSTEM'].start, 249856 * 512)
self.assertEqual(partitions['EFI-SYSTEM'].size, 32768 * 512)
self.assertEqual(partitions['EFI-SYSTEM'].number, 12)
self.assertEqual(partitions['EFI-SYSTEM'].name, 'EFI-SYSTEM')
# Because "reserved" is duplicated, we only have 11 key-value pairs.
self.assertEqual(11, len(partitions))
def testNormalPath(self):
"""Tests listing partitions from SAMPLE_PARTED when outside the chroot."""
self.PatchObject(cros_build_lib, 'IsInsideChroot', return_value=False)
self.rc.AddCmdResult(partial_mock.Ignore(), output=self.SAMPLE_PARTED)
partitions = cros_build_lib.GetImageDiskPartitionInfo('_ignored')
# Because "reserved" is duplicated, we only have 11 key-value pairs.
self.assertEqual(11, len(partitions))
self.assertEqual(1, partitions['STATE'].number)
self.assertEqual(2147, partitions['ROOT-A'].size)
def testKeyedByNumber(self):
"""Tests that key_selector='number' keys the result by partition number."""
self.PatchObject(cros_build_lib, 'IsInsideChroot', return_value=False)
self.rc.AddCmdResult(partial_mock.Ignore(), output=self.SAMPLE_PARTED)
partitions = cros_build_lib.GetImageDiskPartitionInfo(
'_ignored', key_selector='number'
# Keyed by number, both "reserved" partitions (9 and 10) are kept.
self.assertEqual(12, len(partitions))
self.assertEqual('STATE', partitions[1].name)
self.assertEqual(2147, partitions[3].size)
self.assertEqual('reserved', partitions[9].name)
self.assertEqual('reserved', partitions[10].name)
def testChangeUnitOutsideChroot(self):
"""Tests passing a unit to GetImageDiskPartitionInfo outside the chroot."""
def changeUnit(unit):
cros_build_lib.GetImageDiskPartitionInfo('_ignored', unit)
['-m', '_ignored', 'unit', unit, 'print'],
self.PatchObject(cros_build_lib, 'IsInsideChroot', return_value=False)
self.rc.AddCmdResult(partial_mock.Ignore(), output=self.SAMPLE_PARTED)
# We must use 2-char units here because the mocked output is in 'MB'.
def testChangeUnitInsideChroot(self):
"""Tests unit conversion inside the chroot and rejection of a bad unit."""
self.PatchObject(cros_build_lib, 'IsInsideChroot', return_value=True)
self.rc.AddCmdResult(partial_mock.Ignore(), output=self.SAMPLE_CGPT)
partitions = cros_build_lib.GetImageDiskPartitionInfo('_ignored', 'B')
self.assertEqual(partitions['STATE'].start, 983564288)
self.assertEqual(partitions['STATE'].size, 1073741824)
# 'KB' and 'MB' are expected to be decimal: bytes / 1000 and bytes / 10**6.
partitions = cros_build_lib.GetImageDiskPartitionInfo('_ignored', 'KB')
self.assertEqual(partitions['STATE'].start, 983564288 / 1000.0)
self.assertEqual(partitions['STATE'].size, 1073741824 / 1000.0)
partitions = cros_build_lib.GetImageDiskPartitionInfo('_ignored', 'MB')
self.assertEqual(partitions['STATE'].start, 983564288 / 10.0**6)
self.assertEqual(partitions['STATE'].size, 1073741824 / 10.0**6)
# 'PB' is expected to be rejected with KeyError.
self.assertRaises(KeyError, cros_build_lib.GetImageDiskPartitionInfo,
'_ignored', 'PB')
if __name__ == '__main__':