blob: 34c949e88372387e21020e9ea5bb20ddac56b68d [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test the cros_build_lib module."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(
import contextlib
import datetime
import difflib
import errno
import functools
import itertools
import logging
import mox
import signal
import StringIO
import time
import __builtin__
from chromite.buildbot import constants
from chromite.lib import cros_build_lib
from chromite.lib import git
from chromite.lib import cros_test_lib
from chromite.lib import osutils
from chromite.lib import partial_mock
from chromite.lib import retry_util
from chromite.lib import signals as cros_signals
# TODO(build): Finish test wrapper (
# Until then, this has to be after the chromite imports.
import mock
# pylint: disable=W0212,R0904
class CmdToStrTest(cros_test_lib.TestCase):
"""Test the CmdToStr function."""
def setUp(self):
self.differ = difflib.Differ()
def _assertEqual(self, func, test_input, test_output, result):
"""Like assertEqual but with built in diff support."""
diff = '\n'.join(list([test_output], [result])))
msg = ('Expected %s to translate %r to %r, but got %r\n%s' %
(func, test_input, test_output, result, diff))
self.assertEqual(test_output, result, msg)
# Also make sure the result is a string, otherwise the %r output will
# include a "u" prefix and that is not good for logging.
self.assertEqual(type(test_output), str)
def _testData(self, functor, tests):
"""Process a dict of test data."""
for test_output, test_input in tests.iteritems():
result = functor(test_input)
self._assertEqual(functor.__name__, test_input, test_output, result)
def testShellQuote(self):
"""Basic ShellQuote tests."""
# Dict of expected output strings to input lists.
tests = {
"''": '',
'a': unicode('a'),
"'a b c'": unicode('a b c'),
"'a\tb'": 'a\tb',
"'/a$file'": '/a$file',
"'/a#file'": '/a#file',
"""'b"c'""": 'b"c',
"'a@()b'": 'a@()b',
'j%k': 'j%k',
r'''"'a\$va\r"''': r"'a$va\r",
self._testData(cros_build_lib.ShellQuote, tests)
def testCmdToStr(self):
# Dict of expected output strings to input lists.
tests = {
r"a b": ['a', 'b'],
r"'a b' c": ['a b', 'c'],
r'''a "b'c"''': ['a', "b'c"],
r'''a "/'\$b" 'a b c' "xy'z"''':
[unicode('a'), "/'$b", 'a b c', "xy'z"],
'': [],
self._testData(cros_build_lib.CmdToStr, tests)
class RunCommandMock(partial_mock.PartialCmdMock):
"""Provides a context where all RunCommand invocations low-level mocked."""
TARGET = 'chromite.lib.cros_build_lib'
ATTRS = ('RunCommand',)
DEFAULT_ATTR = 'RunCommand'
def RunCommand(self, cmd, *args, **kwargs):
result = self._results['RunCommand'].LookupResult(
(cmd,), hook_args=(cmd,) + args, hook_kwargs=kwargs)
popen_mock = PopenMock()
popen_mock.AddCmdResult(partial_mock.Ignore(), result.returncode,
result.output, result.error)
with popen_mock:
return self.backup['RunCommand'](cmd, *args, **kwargs)
class RunCommandTestCase(cros_test_lib.MockTestCase):
"""MockTestCase that mocks out RunCommand by default."""
def setUp(self):
self.rc = self.StartPatcher(RunCommandMock())
self.assertCommandCalled = self.rc.assertCommandCalled
self.assertCommandContains = self.rc.assertCommandContains
class RunCommandTempDirTestCase(RunCommandTestCase,
"""Convenience class mixing TempDirTestCase and RunCommandTestCase"""
class PopenMock(partial_mock.PartialCmdMock):
"""Provides a context where all _Popen instances are low-level mocked."""
TARGET = 'chromite.lib.cros_build_lib._Popen'
ATTRS = ('__init__',)
DEFAULT_ATTR = '__init__'
def __init__(self):
partial_mock.PartialCmdMock.__init__(self, create_tempdir=True)
def _target__init__(self, inst, cmd, *args, **kwargs):
result = self._results['__init__'].LookupResult(
(cmd,), hook_args=(inst, cmd,) + args, hook_kwargs=kwargs)
script = os.path.join(self.tempdir, '')
stdout = os.path.join(self.tempdir, 'output')
stderr = os.path.join(self.tempdir, 'error')
osutils.WriteFile(stdout, result.output)
osutils.WriteFile(stderr, result.error)
['#!/bin/bash\n', 'cat %s\n' % stdout, 'cat %s >&2\n' % stderr,
'exit %s' % result.returncode])
os.chmod(script, 0o700)
kwargs['cwd'] = self.tempdir
self.backup['__init__'](inst, [script, '--'] + cmd, *args, **kwargs)
class TestRunCommandNoMock(cros_test_lib.TestCase):
"""Class that tests RunCommand by not mocking subprocess.Popen"""
def testErrorCodeNotRaisesError(self):
"""Don't raise exception when command returns non-zero exit code."""
result = cros_build_lib.RunCommand(['ls', '/does/not/exist'],
self.assertTrue(result.returncode != 0)
def testReturnCodeNotZeroErrorOkNotRaisesError(self):
"""Raise error when proc.communicate() returns non-zero."""
self.assertRaises(cros_build_lib.RunCommandError, cros_build_lib.RunCommand,
def _ForceLoggingLevel(functor):
def inner(*args, **kwargs):
current = cros_build_lib.logger.getEffectiveLevel()
return functor(*args, **kwargs)
return inner
class TestRunCommand(cros_test_lib.MoxTestCase):
"""Tests of RunCommand functionality."""
def setUp(self):
# Get the original value for SIGINT so our signal() mock can return the
# correct thing.
self._old_sigint = signal.getsignal(signal.SIGINT)
self.mox.StubOutWithMock(cros_build_lib, '_Popen', use_mock_anything=True)
self.mox.StubOutWithMock(signal, 'signal')
self.mox.StubOutWithMock(signal, 'getsignal')
self.mox.StubOutWithMock(cros_signals, 'SignalModuleUsable')
self.proc_mock = self.mox.CreateMockAnything()
self.error = 'test error'
self.output = 'test output'
def _SetupPopen(self, cmd, **kwargs):
ignore_sigint = kwargs.pop('ignore_sigint', False)
for val in ('cwd', 'stdin', 'stdout', 'stderr'):
kwargs.setdefault(val, None)
kwargs.setdefault('shell', False)
kwargs.setdefault('env', mox.IgnoreArg())
kwargs['close_fds'] = True
# Make some arbitrary functors we can pretend are signal handlers.
# Note that these are intentionally defined on the fly via lambda-
# this is to ensure that they're unique to each run.
sigint_suppress = lambda signum, frame:None
sigint_suppress.__name__ = 'sig_ign_sigint'
normal_sigint = lambda signum, frame:None
normal_sigint.__name__ = 'sigint'
normal_sigterm = lambda signum, frame:None
normal_sigterm.__name__ = 'sigterm'
# If requested, RunCommand will ignore sigints; record that.
if ignore_sigint:
signal.signal(signal.SIGINT, signal.SIG_IGN).AndReturn(sigint_suppress)
signal.signal(signal.SIGINT, mox.IgnoreArg()).AndReturn(normal_sigint)
signal.signal(signal.SIGTERM, mox.IgnoreArg()).AndReturn(normal_sigterm)
cros_build_lib._Popen(cmd, **kwargs).AndReturn(self.proc_mock)
yield self.proc_mock
# If it ignored them, RunCommand will restore sigints; record that.
if ignore_sigint:
signal.signal(signal.SIGINT, sigint_suppress).AndReturn(signal.SIG_IGN)
signal.signal(signal.SIGINT, normal_sigint).AndReturn(None)
signal.signal(signal.SIGTERM, normal_sigterm).AndReturn(None)
def _AssertCrEqual(self, expected, actual):
"""Helper method to compare two CommandResult objects.
This is needed since assertEqual does not know how to compare two
CommandResult objects.
expected: a CommandResult object, expected result.
actual: a CommandResult object, actual result.
self.assertEqual(expected.cmd, actual.cmd)
self.assertEqual(expected.error, actual.error)
self.assertEqual(expected.output, actual.output)
self.assertEqual(expected.returncode, actual.returncode)
def _TestCmd(self, cmd, real_cmd, sp_kv=dict(), rc_kv=dict(), sudo=False):
"""Factor out common setup logic for testing RunCommand().
cmd: a string or an array of strings that will be passed to RunCommand.
real_cmd: the real command we expect RunCommand to call (might be
modified to have enter_chroot).
sp_kv: key-value pairs passed to subprocess.Popen().
rc_kv: key-value pairs passed to RunCommand().
sudo: use SudoRunCommand() rather than RunCommand().
expected_result = cros_build_lib.CommandResult()
expected_result.cmd = real_cmd
expected_result.error = self.error
expected_result.output = self.output
expected_result.returncode = self.proc_mock.returncode
arg_dict = dict()
for attr in 'close_fds cwd env stdin stdout stderr shell'.split():
if attr in sp_kv:
arg_dict[attr] = sp_kv[attr]
if attr == 'close_fds':
arg_dict[attr] = True
elif attr == 'shell':
arg_dict[attr] = False
arg_dict[attr] = None
with self._SetupPopen(real_cmd,
**sp_kv) as proc:
proc.communicate(None).AndReturn((self.output, self.error))
if sudo:
actual_result = cros_build_lib.SudoRunCommand(cmd, **rc_kv)
actual_result = cros_build_lib.RunCommand(cmd, **rc_kv)
self._AssertCrEqual(expected_result, actual_result)
def testReturnCodeZeroWithArrayCmd(self, ignore_sigint=False):
"""--enter_chroot=False and --cmd is an array of strings.
Parameterized so this can also be used by some other tests w/ alternate
params to RunCommand().
ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
self.proc_mock.returncode = 0
cmd_list = ['foo', 'bar', 'roger']
self._TestCmd(cmd_list, cmd_list,
def testSignalRestoreNormalCase(self):
"""Test RunCommand() properly sets/restores sigint. Normal case."""
def testReturnCodeZeroWithArrayCmdEnterChroot(self):
"""--enter_chroot=True and --cmd is an array of strings."""
self.proc_mock.returncode = 0
cmd_list = ['foo', 'bar', 'roger']
real_cmd = cmd_list
if not cros_build_lib.IsInsideChroot():
real_cmd = ['cros_sdk', '--'] + cmd_list
self._TestCmd(cmd_list, real_cmd, rc_kv=dict(enter_chroot=True))
def testCommandFailureRaisesError(self, ignore_sigint=False):
"""Verify error raised by communicate() is caught.
Parameterized so this can also be used by some other tests w/ alternate
params to RunCommand().
ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
cmd = 'test cmd'
with self._SetupPopen(['/bin/bash', '-c', cmd],
ignore_sigint=ignore_sigint) as proc:
proc.communicate(None).AndReturn((self.output, self.error))
cros_build_lib.RunCommand, cmd, shell=True,
ignore_sigint=ignore_sigint, error_code_ok=False)
def testSubprocessCommunicateExceptionRaisesError(self, ignore_sigint=False):
"""Verify error raised by communicate() is caught.
Parameterized so this can also be used by some other tests w/ alternate
params to RunCommand().
ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
cmd = ['test', 'cmd']
with self._SetupPopen(cmd, ignore_sigint=ignore_sigint) as proc:
self.assertRaises(ValueError, cros_build_lib.RunCommand, cmd,
def testSignalRestoreExceptionCase(self):
"""Test RunCommand() properly sets/restores sigint. Exception case."""
def testEnvWorks(self):
"""Test RunCommand(..., env=xyz) works."""
# We'll put this bogus environment together, just to make sure
# subprocess.Popen gets passed it.
env = {'Tom': 'Jerry', 'Itchy': 'Scratchy'}
# This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
self.proc_mock.returncode = 0
cmd_list = ['foo', 'bar', 'roger']
# Run. We expect the env= to be passed through from sp (subprocess.Popen)
# to rc (RunCommand).
self._TestCmd(cmd_list, cmd_list,
def testExtraEnvOnlyWorks(self):
"""Test RunCommand(..., extra_env=xyz) works."""
# We'll put this bogus environment together, just to make sure
# subprocess.Popen gets passed it.
extra_env = {'Pinky' : 'Brain'}
## This is a little bit circular, since the same logic is used to compute
## the value inside, but at least it checks that this happens.
total_env = os.environ.copy()
# This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
self.proc_mock.returncode = 0
cmd_list = ['foo', 'bar', 'roger']
# Run. We expect the env= to be passed through from sp (subprocess.Popen)
# to rc (RunCommand).
self._TestCmd(cmd_list, cmd_list,
def testExtraEnvTooWorks(self):
"""Test RunCommand(..., env=xy, extra_env=z) works."""
# We'll put this bogus environment together, just to make sure
# subprocess.Popen gets passed it.
env = {'Tom': 'Jerry', 'Itchy': 'Scratchy'}
extra_env = {'Pinky': 'Brain'}
total_env = {'Tom': 'Jerry', 'Itchy': 'Scratchy', 'Pinky': 'Brain'}
# This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
self.proc_mock.returncode = 0
cmd_list = ['foo', 'bar', 'roger']
# Run. We expect the env= to be passed through from sp (subprocess.Popen)
# to rc (RunCommand).
self._TestCmd(cmd_list, cmd_list,
rc_kv=dict(env=env, extra_env=extra_env))
def testExceptionEquality(self):
"""Verify equality methods for RunCommandError"""
c1 = cros_build_lib.CommandResult(cmd=['ls', 'arg'], returncode=1)
c2 = cros_build_lib.CommandResult(cmd=['ls', 'arg1'], returncode=1)
c3 = cros_build_lib.CommandResult(cmd=['ls', 'arg'], returncode=2)
e1 = cros_build_lib.RunCommandError('Message 1', c1)
e2 = cros_build_lib.RunCommandError('Message 1', c1)
e_diff_msg = cros_build_lib.RunCommandError('Message 2', c1)
e_diff_cmd = cros_build_lib.RunCommandError('Message 1', c2)
e_diff_code = cros_build_lib.RunCommandError('Message 1', c3)
self.assertEqual(e1, e2)
self.assertNotEqual(e1, e_diff_msg)
self.assertNotEqual(e1, e_diff_cmd)
self.assertNotEqual(e1, e_diff_code)
def testSudoRunCommand(self):
"""Test SudoRunCommand(...) works."""
cmd_list = ['foo', 'bar', 'roger']
sudo_list = ['sudo', '--'] + cmd_list
self.proc_mock.returncode = 0
self._TestCmd(cmd_list, sudo_list, sudo=True)
def testSudoRunCommandShell(self):
"""Test SudoRunCommand(..., shell=True) works."""
cmd = 'foo bar roger'
sudo_list = ['sudo', '--', '/bin/bash', '-c', cmd]
self.proc_mock.returncode = 0
self._TestCmd(cmd, sudo_list, sudo=True,
def testSudoRunCommandEnv(self):
"""Test SudoRunCommand(..., extra_env=z) works."""
cmd_list = ['foo', 'bar', 'roger']
sudo_list = ['sudo', 'shucky=ducky', '--'] + cmd_list
extra_env = {'shucky' : 'ducky'}
self.proc_mock.returncode = 0
self._TestCmd(cmd_list, sudo_list, sudo=True,
def testSudoRunCommandUser(self):
"""Test SudoRunCommand(..., user='...') works."""
cmd_list = ['foo', 'bar', 'roger']
sudo_list = ['sudo', '-u', 'MMMMMonster', '--'] + cmd_list
self.proc_mock.returncode = 0
self._TestCmd(cmd_list, sudo_list, sudo=True,
def testSudoRunCommandUserShell(self):
"""Test SudoRunCommand(..., user='...', shell=True) works."""
cmd = 'foo bar roger'
sudo_list = ['sudo', '-u', 'MMMMMonster', '--', '/bin/bash', '-c', cmd]
self.proc_mock.returncode = 0
self._TestCmd(cmd, sudo_list, sudo=True,
rc_kv=dict(user='MMMMMonster', shell=True))
class TestRunCommandLogging(cros_test_lib.TempDirTestCase):
"""Tests of RunCommand logging."""
def testLogStdoutToFile(self):
# Make mox happy.
log = os.path.join(self.tempdir, 'output')
ret = cros_build_lib.RunCommand(
['python', '-c', 'print "monkeys"'], log_stdout_to_file=log)
self.assertEqual(osutils.ReadFile(log), 'monkeys\n')
self.assertTrue(ret.output is None)
self.assertTrue(ret.error is None)
# Validate dumb api usage.
ret = cros_build_lib.RunCommand(
['python', '-c', 'import sys;print "monkeys2"'],
log_stdout_to_file=log, redirect_stdout=True)
self.assertTrue(ret.output is None)
self.assertEqual(osutils.ReadFile(log), 'monkeys2\n')
ret = cros_build_lib.RunCommand(
['python', '-c', 'import sys;print >> sys.stderr, "monkeys3"'],
log_stdout_to_file=log, redirect_stderr=True)
self.assertEqual(ret.error, 'monkeys3\n')
self.assertEqual(os.stat(log).st_size, 0)
ret = cros_build_lib.RunCommand(
['python', '-u', '-c',
'import sys;print "monkeys4"\nprint >> sys.stderr, "monkeys5"\n'],
log_stdout_to_file=log, combine_stdout_stderr=True)
self.assertTrue(ret.output is None)
self.assertTrue(ret.error is None)
self.assertEqual(osutils.ReadFile(log), 'monkeys4\nmonkeys5\n')
class TestRetries(cros_test_lib.MoxTestCase):
"""Tests of GenericRetry and relatives."""
def testGenericRetry(self):
source, source2 = iter(xrange(5)).next, iter(xrange(5)).next
def f():
val = source2()
self.assertEqual(val, source())
if val < 4:
raise ValueError()
return val
handler = lambda ex: isinstance(ex, ValueError)
self.assertRaises(ValueError, retry_util.GenericRetry, handler, 3, f)
self.assertEqual(4, retry_util.GenericRetry(handler, 1, f))
self.assertRaises(StopIteration, retry_util.GenericRetry, handler, 3, f)
def testRetryExceptionBadArgs(self):
"""Verify we reject non-classes or tuples of classes"""
self.assertRaises(TypeError, retry_util.RetryException, '', 3, map)
self.assertRaises(TypeError, retry_util.RetryException, 123, 3, map)
self.assertRaises(TypeError, retry_util.RetryException, None, 3, map)
self.assertRaises(TypeError, retry_util.RetryException, [None], 3, map)
def testRetryException(self):
"""Verify we retry only when certain exceptions get thrown"""
source, source2 = iter(xrange(6)).next, iter(xrange(6)).next
def f():
val = source2()
self.assertEqual(val, source())
if val < 2:
raise OSError()
if val < 5:
raise ValueError()
return val
self.assertRaises(OSError, retry_util.RetryException,
(OSError, ValueError), 2, f)
self.assertRaises(ValueError, retry_util.RetryException,
(OSError, ValueError), 1, f)
self.assertEqual(5, retry_util.RetryException(ValueError, 1, f))
self.assertRaises(StopIteration, retry_util.RetryException,
ValueError, 3, f)
def testBasicRetry(self):
# pylint: disable=E1101
path = os.path.join(self.tempdir, 'script')
paths = {
'stop': os.path.join(self.tempdir, 'stop'),
'store': os.path.join(self.tempdir, 'store')
"import sys\n"
"val = int(open(%(store)r).read())\n"
"stop_val = int(open(%(stop)r).read())\n"
"open(%(store)r, 'w').write(str(val + 1))\n"
"print val\n"
"sys.exit(0 if val == stop_val else 1)\n" % paths)
os.chmod(path, 0o755)
def _setup_counters(start, stop, sleep, sleep_cnt):
for i in xrange(sleep_cnt):
time.sleep(sleep * (i + 1))
osutils.WriteFile(paths['store'], str(start))
osutils.WriteFile(paths['stop'], str(stop))
self.mox.StubOutWithMock(time, 'sleep')
_setup_counters(0, 0, 0, 0)
command = ['python', path]
kwargs = {'redirect_stdout': True, 'print_cmd': False}
self.assertEqual(cros_build_lib.RunCommand(command, **kwargs).output, '0\n')
func = retry_util.RunCommandWithRetries
_setup_counters(2, 2, 0, 0)
self.assertEqual(func(0, command, sleep=0, **kwargs).output, '2\n')
_setup_counters(0, 2, 1, 2)
self.assertEqual(func(2, command, sleep=1, **kwargs).output, '2\n')
_setup_counters(0, 1, 2, 1)
self.assertEqual(func(1, command, sleep=2, **kwargs).output, '1\n')
_setup_counters(0, 3, 3, 2)
func, 2, command, sleep=3, **kwargs)
class TestTimedCommand(cros_test_lib.MoxTestCase):
"""Tests for TimedCommand()"""
def setUp(self):
self.mox.StubOutWithMock(cros_build_lib, 'RunCommand')
def testBasic(self):
"""Make sure simple stuff works."""
cros_build_lib.RunCommand(['true', 'foo'])
cros_build_lib.TimedCommand(cros_build_lib.RunCommand, ['true', 'foo'])
def testArgs(self):
"""Verify passing of optional args to the destination function."""
cros_build_lib.RunCommand(':', shell=True, print_cmd=False,
cros_build_lib.TimedCommand(cros_build_lib.RunCommand, ':', shell=True,
print_cmd=False, error_code_ok=False)
def testLog(self):
"""Verify logging does the right thing."""
self.mox.StubOutWithMock(cros_build_lib.logger, 'log')
cros_build_lib.RunCommand('fun', shell=True)
cros_build_lib.logger.log(mox.IgnoreArg(), 'msg! %s', mox.IgnoreArg())
cros_build_lib.TimedCommand(cros_build_lib.RunCommand, 'fun',
timed_log_msg='msg! %s', shell=True)
class TestListFiles(cros_test_lib.TempDirTestCase):
"""Tests of ListFiles funciton."""
def _CreateNestedDir(self, dir_structure):
for entry in dir_structure:
full_path = os.path.join(os.path.join(self.tempdir, entry))
# ensure dirs are created
if full_path.endswith('/'):
# we only want to create directories
except OSError as err:
if err.errno == errno.EEXIST:
# we don't care if the dir already exists
# create dummy files
tmp = open(full_path, 'w')
def testTraverse(self):
"""Test that we are traversing the directory properly."""
dir_structure = ['one/two/test.txt', 'one/',
files = cros_build_lib.ListFiles(self.tempdir)
for f in files:
f = f.replace(self.tempdir, '').lstrip('/')
if f not in dir_structure:'%s was not found in %s' % (f, dir_structure))
def testEmptyFilePath(self):
"""Test that we return nothing when directories are empty."""
dir_structure = ['one/', 'two/', 'one/a/']
files = cros_build_lib.ListFiles(self.tempdir)
self.assertEqual(files, [])
def testNoSuchDir(self):
cros_build_lib.ListFiles(os.path.join(self.tempdir, 'missing'))
except OSError as err:
self.assertEqual(err.errno, errno.ENOENT)
class HelperMethodSimpleTests(cros_test_lib.TestCase):
"""Tests for various helper methods without using mox."""
def _TestChromeosVersion(self, test_str, expected=None):
actual = cros_build_lib.GetChromeosVersion(test_str)
self.assertEqual(expected, actual)
def testGetChromeosVersionWithValidVersionReturnsValue(self):
expected = ''
self._TestChromeosVersion(test_str, expected)
def testGetChromeosVersionWithMultipleVersionReturnsFirstMatch(self):
expected = ''
self._TestChromeosVersion(test_str, expected)
def testGetChromeosVersionWithInvalidVersionReturnsDefault(self):
test_str = ' CHROMEOS_VERSION_STRING=invalid_version_string '
def testGetChromeosVersionWithEmptyInputReturnsDefault(self):
def testGetChromeosVersionWithNoneInputReturnsDefault(self):
def testUserDateTime(self):
# cros_test_lib.TestCase takes care of saving/restoring the environ.
os.environ['TZ'] = '0'
timeval = 330005000
expected = 'Mon, 16 Jun 1980 12:03:20 +0000 ()'
def testUserDateTimeDateTime(self):
# cros_test_lib.TestCase takes care of saving/restoring the environ.
os.environ['TZ'] = '0'
timeval = datetime.datetime(1980, 6, 16)
expected = 'Mon, 16 Jun 1980 00:00:00 +0000 ()'
class YNInteraction():
"""Class to hold a list of responses and expected reault of YN prompt."""
def __init__(self, responses, expected_result):
self.responses = responses
self.expected_result = expected_result
class TestInput(cros_test_lib.MoxOutputTestCase):
"""Tests of input gathering functionality."""
def testGetInput(self):
self.mox.StubOutWithMock(__builtin__, 'raw_input')
prompt = 'Some prompt'
response = 'Some response'
self.assertEquals(response, cros_build_lib.GetInput(prompt))
def testBooleanPrompt(self):
self.mox.StubOutWithMock(cros_build_lib, 'GetInput')
for value in ('', '', 'yes', 'ye', 'y', 'no', 'n'):
def testBooleanShellValue(self):
"""Verify BooleanShellValue() inputs work as expected"""
for v in (None,):
self.assertTrue(cros_build_lib.BooleanShellValue(v, True))
self.assertFalse(cros_build_lib.BooleanShellValue(v, False))
for v in (1234, '', 'akldjsf', '"'):
self.assertRaises(ValueError, cros_build_lib.BooleanShellValue, v, True)
self.assertTrue(cros_build_lib.BooleanShellValue(v, True, msg=''))
self.assertFalse(cros_build_lib.BooleanShellValue(v, False, msg=''))
for v in ('yes', 'YES', 'YeS', 'y', 'Y', '1', 'true', 'True', 'TRUE',):
self.assertTrue(cros_build_lib.BooleanShellValue(v, True))
self.assertTrue(cros_build_lib.BooleanShellValue(v, False))
for v in ('no', 'NO', 'nO', 'n', 'N', '0', 'false', 'False', 'FALSE',):
self.assertFalse(cros_build_lib.BooleanShellValue(v, True))
self.assertFalse(cros_build_lib.BooleanShellValue(v, False))
class TestContextManagerStack(cros_test_lib.TestCase):
"""Test the ContextManagerStack class."""
def test(self):
invoked = []
counter = iter(itertools.count()).next
def _mk_kls(has_exception=None, exception_kls=None, suppress=False):
class foon(object):
"""Simple context manager which runs checks on __exit__."""
marker = counter()
def __enter__(self):
return self
# pylint: disable=E0213
def __exit__(obj_self, exc_type, exc, traceback):
if has_exception is not None:
self.assertTrue(all(x is not None
for x in (exc_type, exc, traceback)))
self.assertTrue(exc_type == has_exception)
if exception_kls:
raise exception_kls()
if suppress:
return True
return foon
with cros_build_lib.ContextManagerStack() as stack:
# Note... these tests are in reverse, since the exception
# winds its way up the stack.
stack.Add(_mk_kls(ValueError, suppress=True))
stack.Add(_mk_kls(IndexError, exception_kls=ValueError))
self.assertEqual(invoked, list(reversed(range(6))))
class TestManifestCheckout(cros_test_lib.TempDirTestCase):
"""Tests for ManifestCheckout functionality."""
def setUp(self):
self.manifest_dir = os.path.join(self.tempdir, '.repo', 'manifests')
# Initialize a repo intance here.
# TODO(vapier, ferringb): mangle this so it inits from a local
# checkout if one is available, same for the git-repo fetch.
cmd = ['repo', 'init', '-u', constants.MANIFEST_URL]
cros_build_lib.RunCommand(cmd, cwd=self.tempdir, input='',
self.active_manifest = os.path.realpath(
os.path.join(self.tempdir, '.repo', 'manifest.xml'))
def testManifestInheritance(self):
osutils.WriteFile(self.active_manifest, """
<include name="include-target.xml" />
<include name="empty.xml" />
<project name="monkeys" path="baz" remote="foon" revision="master" />
# First, verify it properly explodes if the include can't be found.
git.ManifestCheckout, self.tempdir)
# Next, verify it can read an empty manifest; this is to ensure
# that we can point Manifest at the empty manifest without exploding,
# same for ManifestCheckout; this sort of thing is primarily useful
# to ensure no step of an include assumes everything is yet assembled.
empty_path = os.path.join(self.manifest_dir, 'empty.xml')
osutils.WriteFile(empty_path, '<manifest/>')
git.ManifestCheckout(self.tempdir, manifest_path=empty_path)
# Next, verify include works.
osutils.WriteFile(os.path.join(self.manifest_dir, 'include-target.xml'),
<remote name="foon" fetch="http://localhost" />
manifest = git.ManifestCheckout(self.tempdir)
self.assertEqual(list(manifest.checkouts_by_name), ['monkeys'])
self.assertEqual(list(manifest.remotes), ['foon'])
# pylint: disable=E1101
def testGetManifestsBranch(self):
func = git.ManifestCheckout._GetManifestsBranch
manifest = self.manifest_dir
repo_root = self.tempdir
# pylint: disable=W0613
def reconfig(merge='master', origin='origin'):
if merge is not None:
merge = 'refs/heads/%s' % merge
for key in ('merge', 'origin'):
val = locals()[key]
key = 'branch.default.%s' % key
if val is None:
git.RunGit(manifest, ['config', '--unset', key], error_code_ok=True)
git.RunGit(manifest, ['config', key, val])
# First, verify our assumptions about a fresh repo init are correct.
self.assertEqual('default', git.GetCurrentBranch(manifest))
self.assertEqual('master', func(repo_root))
# Ensure we can handle a missing origin; this can occur jumping between
# branches, and can be worked around.
self.assertEqual('default', git.GetCurrentBranch(manifest))
self.assertEqual('master', func(repo_root))
# TODO(ferringb): convert this over to assertRaises2
def assertExcept(message, **kwargs):
assert "Testing for %s, an exception wasn't thrown." % (message,)
except OSError as e:
self.assertEqual(e.errno, errno.ENOENT)
self.assertTrue(message in str(e),
msg="Couldn't find string %r in error message %r"
% (message, str(e)))
# No merge target means the configuration isn't usable, period.
assertExcept("git tracking configuration for that branch is broken",
# Ensure we detect if we're on the wrong branch, even if it has
# tracking setup.
git.RunGit(manifest, ['checkout', '-t', 'origin/master', '-b', 'test'])
assertExcept("It should be checked out to 'default'")
# Ensure we handle detached HEAD w/ an appropriate exception.
git.RunGit(manifest, ['checkout', '--detach', 'test'])
assertExcept("It should be checked out to 'default'")
# Finally, ensure that if the default branch is non-existant, we still throw
# a usable exception.
git.RunGit(manifest, ['branch', '-d', 'default'])
assertExcept("It should be checked out to 'default'")
def testGitMatchBranchName(self):
git_repo = os.path.join(self.tempdir, '.repo', 'manifests')
branches = git.MatchBranchName(git_repo, 'default', namespace='')
self.assertEqual(branches, ['refs/heads/default'])
branches = git.MatchBranchName(git_repo, 'default', namespace='refs/heads/')
self.assertEqual(branches, ['default'])
branches = git.MatchBranchName(git_repo, 'origin/f.*link',
self.assertTrue('firmware-link-' in branches[0])
branches = git.MatchBranchName(git_repo, 'r23')
self.assertEqual(branches, ['refs/remotes/origin/release-R23-2913.B'])
class Test_iflatten_instance(cros_test_lib.TestCase):
"""Test iflatten_instance function."""
def test_it(self):
f = lambda *a: list(cros_build_lib.iflatten_instance(*a))
self.assertEqual([1, 2], f([1, 2]))
self.assertEqual([1, '2a'], f([1, '2a']))
self.assertEqual([1, 2, 'b'], f([1, [2, 'b']]))
self.assertEqual([1, 2, 'f', 'd', 'a', 's'], f([1, 2, ('fdas',)], int))
self.assertEqual([''], f(''))
class TestKeyValueFiles(cros_test_lib.TempDirTestCase):
"""Tests handling of key/value files."""
def setUp(self):
self.contents = """# A comment !@
A = 1
AA= 2
AAA =3
AAAAA\t \t=\t 5
AAAAAA = 6 \t\t# Another comment
\t# Aerith lives!
C = 'D'
CC= 'D'
CCC ='D'
\t# monsters go boom #
E \t= "Fxxxxx" # Blargl
EE= "Faaa\taaaa"\x20
EEE ="Fk \t kkkk"\t
Q = "'q"
\tQQ ="q'"\x20
R = "r
RR = "rr
RRR = 'rrr
SSS=" ss
self.expected = {
'A': '1',
'AA': '2',
'AAA': '3',
'AAAA': '4',
'AAAAA': '5',
'AAAAAA': '6',
'C': 'D',
'CC': 'D',
'CCC': 'D',
'E': 'Fxxxxx',
'EE': 'Faaa\taaaa',
'EEE': 'Fk \t kkkk',
'Q': "'q",
'QQ': "q'",
'QQQ': '"q"',
'R': 'r\n',
'RR': 'rr\nrrr',
'RRR': 'rrr\n RRRR\n rrr\n',
'SSS': ' ss\n\'ssss\'\nss',
'T': '\nttt'
self.conf_file = os.path.join(self.tempdir, 'file.conf')
osutils.WriteFile(self.conf_file, self.contents)
def _RunAndCompare(self, test_input, multiline):
result = cros_build_lib.LoadKeyValueFile(test_input, multiline=multiline)
self.assertEqual(self.expected, result)
def testLoadFilePath(self):
"""Verify reading a simple file works"""
self._RunAndCompare(self.conf_file, True)
def testLoadStringIO(self):
"""Verify passing in StringIO object works."""
self._RunAndCompare(StringIO.StringIO(self.contents), True)
def testLoadFileObject(self):
"""Verify passing in open file object works."""
with open(self.conf_file) as f:
self._RunAndCompare(f, True)
def testNoMultlineValues(self):
"""Verify exception is thrown when multiline is disabled."""
self.assertRaises(ValueError, self._RunAndCompare, self.conf_file, False)
class SafeRunTest(cros_test_lib.TestCase):
"""Tests SafeRunTest functionality."""
def _raise_exception(self, e):
raise e
def testRunsSafely(self):
"""Verify that we are robust to exceptions."""
def append_val(value):
call_list = []
f_list = [functools.partial(append_val, 1),
Exception('testRunsSafely exception.')),
functools.partial(append_val, 2)]
self.assertRaises(Exception, cros_build_lib.SafeRun, f_list)
self.assertEquals(call_list, [1, 2])
def testRaisesFirstException(self):
"""Verify we raise the first exception when multiple are encountered."""
class E1(Exception):
"""Simple exception class."""
class E2(Exception):
"""Simple exception class."""
f_list = [functools.partial(self._raise_exception, e) for e in [E1, E2]]
self.assertRaises(E1, cros_build_lib.SafeRun, f_list)
def testCombinedRaise(self):
"""Raises a RuntimeError with exceptions combined."""
f_list = [functools.partial(self._raise_exception, Exception())] * 3
self.assertRaises(RuntimeError, cros_build_lib.SafeRun, f_list,
class FrozenAttributesTest(cros_test_lib.TestCase):
"""Tests FrozenAttributesMixin functionality."""
class DummyClass(object):
"""Any class that does not override __setattr__."""
class SetattrClass(object):
"""Class that does override __setattr__."""
def __setattr__(self, attr, value):
"""Adjust value here to later confirm that this code ran."""
object.__setattr__(self, attr, self.SETATTR_OFFSET + value)
def _TestBasics(self, cls):
# pylint: disable=W0201
def _Expected(val):
return getattr(cls, 'SETATTR_OFFSET', 0) + val
obj = cls()
obj.a = 1
obj.b = 2
self.assertEquals(_Expected(1), obj.a)
self.assertEquals(_Expected(2), obj.b)
self.assertRaises(cros_build_lib.AttributeFrozenError, setattr, obj, 'a', 3)
self.assertEquals(_Expected(1), obj.a)
self.assertRaises(cros_build_lib.AttributeFrozenError, setattr, obj, 'c', 3)
self.assertFalse(hasattr(obj, 'c'))
def testFrozenByMetaclass(self):
"""Test attribute freezing with FrozenAttributesClass."""
class DummyByMeta(self.DummyClass):
"""Class that freezes DummyClass using metaclass construct."""
__metaclass__ = cros_build_lib.FrozenAttributesClass
class SetattrByMeta(self.SetattrClass):
"""Class that freezes SetattrClass using metaclass construct."""
__metaclass__ = cros_build_lib.FrozenAttributesClass
def testFrozenByMixinFirst(self):
"""Test attribute freezing with FrozenAttributesMixin first in hierarchy."""
class Dummy(cros_build_lib.FrozenAttributesMixin, self.DummyClass):
"""Class that freezes DummyClass using mixin construct."""
class Setattr(cros_build_lib.FrozenAttributesMixin, self.SetattrClass):
"""Class that freezes SetattrClass using mixin construct."""
def testFrozenByMixinLast(self):
"""Test attribute freezing with FrozenAttributesMixin last in hierarchy."""
class Dummy(self.DummyClass, cros_build_lib.FrozenAttributesMixin):
"""Class that freezes DummyClass using mixin construct."""
class Setattr(self.SetattrClass, cros_build_lib.FrozenAttributesMixin):
"""Class that freezes SetattrClass using mixin construct."""
class TestGetIPv4Address(RunCommandTestCase):
"""Tests the GetIPv4Address function."""
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
2: eth0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc pfifo_fast state \
DOWN qlen 1000
link/ether cc:cc:cc:cc:cc:cc brd ff:ff:ff:ff:ff:ff
3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP \
qlen 1000
link/ether dd:dd:dd:dd:dd:dd brd ff:ff:ff:ff:ff:ff
inet brd scope global eth1
inet6 cdef:0:cdef:cdef:cdef:cdef:cdef:cdef/64 scope global dynamic
valid_lft 2592000sec preferred_lft 604800sec
def testGetIPv4AddressParseResult(self):
"""Verifies we can parse the output and get correct IP address."""
self.rc.AddCmdResult(partial_mock.In('ip'), output=self.IP_GLOBAL_OUTPUT)
self.assertEqual(cros_build_lib.GetIPv4Address(), '')
def testGetIPv4Address(self):
"""Tests that correct shell commmand is called."""
cros_build_lib.GetIPv4Address(global_ip=False, dev='eth0')
['ip', 'addr', 'show', 'scope', 'host', 'dev', 'eth0'])
self.rc.assertCommandContains(['ip', 'addr', 'show', 'scope', 'global'])
class TestGetChrootVersion(cros_test_lib.MockTestCase):
"""Tests GetChrootVersion functionality."""
def testSimpleBuildroot(self):
"""Verify buildroot arg works"""
read_mock = self.PatchObject(osutils, 'ReadFile', return_value='12\n')
ret = cros_build_lib.GetChrootVersion(buildroot='/build/root')
self.assertEqual(ret, '12')
def testSimpleChroot(self):
"""Verify chroot arg works"""
read_mock = self.PatchObject(osutils, 'ReadFile', return_value='70')
ret = cros_build_lib.GetChrootVersion(chroot='/ch/root')
self.assertEqual(ret, '70')
def testNoChroot(self):
"""Verify we don't blow up when there is no chroot yet"""
ret = cros_build_lib.GetChrootVersion(chroot='/.$om3/place/nowhere')
self.assertEqual(ret, None)
class TestChrootPathHelpers(cros_test_lib.TestCase):
"""Verify we correctly reinterpret paths to be used inside/outside chroot."""
@mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=True)
def testToChrootPathInChroot(self, _inchroot_mock):
"""Test we return the original path to be used in chroot while in chroot."""
path = '/foo/woo/bar'
self.assertEqual(cros_build_lib.ToChrootPath(path), path)
@mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=False)
def testToChrootPathOutChroot(self, _inchroot_mock):
"""Test we convert the path to be used in chroot while outside chroot."""
subpath = 'bar/haa/ooo'
path = os.path.join(constants.SOURCE_ROOT, subpath)
chroot_path = git.ReinterpretPathForChroot(path)
self.assertEqual(cros_build_lib.ToChrootPath(path), chroot_path)
@mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=True)
def testFromChrootInChroot(self, _inchroot_mock):
"""Test we return the original chroot path while in chroot."""
path = '/foo/woo/bar'
self.assertEqual(cros_build_lib.FromChrootPath(path), path)
@mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=False)
def testFromChrootOutChroot(self, _inchroot_mock):
"""Test we convert the chroot path to be used outside chroot."""
# Test that chroot source root has been replaced in the path.
subpath = 'foo/woo/bar'
chroot_path = os.path.join(constants.CHROOT_SOURCE_ROOT, subpath)
path = os.path.join(constants.SOURCE_ROOT, subpath)
self.assertEqual(cros_build_lib.FromChrootPath(chroot_path), path)
# Test that a chroot path has been converted.
chroot_path = '/foo/woo/bar'
path = os.path.join(constants.SOURCE_ROOT,
self.assertEqual(cros_build_lib.FromChrootPath(chroot_path), path)
class CollectionTest(cros_test_lib.TestCase):
"""Tests for Collection helper."""
def testDefaults(self):
"""Verify default values kick in."""
O = cros_build_lib.Collection('O', a=0, b='string', c={})
o = O()
self.assertEqual(o.a, 0)
self.assertEqual(o.b, 'string')
self.assertEqual(o.c, {})
def testOverrideDefaults(self):
"""Verify we can set custom values at instantiation time."""
O = cros_build_lib.Collection('O', a=0, b='string', c={})
o = O(a=1000)
self.assertEqual(o.a, 1000)
self.assertEqual(o.b, 'string')
self.assertEqual(o.c, {})
def testSetNoNewMembers(self):
"""Verify we cannot add new members after the fact."""
O = cros_build_lib.Collection('O', a=0, b='string', c={})
o = O()
# Need the func since self.assertRaises evaluates the args in this scope.
def _setit(collection):
collection.does_not_exit = 10
self.assertRaises(AttributeError, _setit, o)
self.assertRaises(AttributeError, setattr, o, 'new_guy', 10)
def testGetNoNewMembers(self):
"""Verify we cannot get new members after the fact."""
O = cros_build_lib.Collection('O', a=0, b='string', c={})
o = O()
# Need the func since self.assertRaises evaluates the args in this scope.
def _getit(collection):
return collection.does_not_exit
self.assertRaises(AttributeError, _getit, o)
self.assertRaises(AttributeError, getattr, o, 'foooo')
def testNewValue(self):
"""Verify we change members correctly."""
O = cros_build_lib.Collection('O', a=0, b='string', c={})
o = O()
o.a = 'a string'
o.c = 123
self.assertEqual(o.a, 'a string')
self.assertEqual(o.b, 'string')
self.assertEqual(o.c, 123)
def testString(self):
"""Make sure the string representation is readable by da hue mans."""
O = cros_build_lib.Collection('O', a=0, b='string', c={})
o = O()
self.assertEqual("Collection_O(a=0, b='string', c={})", str(o))
if __name__ == '__main__':