blob: f60bb56927aff852c543ae64836e91b51891360c [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test Utils library."""
from __future__ import print_function
import datetime
import operator
import os
import __builtin__
import fixup_path
fixup_path.FixupPath()
from chromite.lib import cros_test_lib
from chromite.lib import osutils
from chromite.lib.paygen import unittest_lib
from chromite.lib.paygen import utils
class TestUtils(unittest_lib.MoxTestCase):
"""Test utils methods."""
def testCreateTmpInvalidPath(self):
"""Test that we create a tmp eventually even with invalid paths."""
tmps = ['/usr/local/nope', '/tmp']
tmp = utils.CreateTmpDir(tmps=tmps)
self.assertTrue(tmp.startswith('/tmp'))
os.rmdir(tmp)
def testCreateTmpRaiseException(self):
"""Test that we raise an exception when we do not have enough space."""
self.assertRaises(utils.UnableToCreateTmpDir, utils.CreateTmpDir,
minimum_size=2**50)
def testCreateTempFileWithContents(self):
"""Verify that we create a temp file with the right message in it."""
message = "Test Message With Rocks In"
# Create the temp file.
with utils.CreateTempFileWithContents(message) as temp_file:
temp_name = temp_file.name
# Verify the name is valid.
self.assertTrue(os.path.exists(temp_name))
# Verify it has the right contents
with open(temp_name, 'r') as f:
contents = f.readlines()
self.assertEqual([message], contents)
# Verify the temp file goes away when we close it.
self.assertFalse(os.path.exists(temp_name))
#pylint: disable-msg=E1101
@osutils.TempDirDecorator
def testListdirFullpath(self):
file_a = os.path.join(self.tempdir, 'a')
file_b = os.path.join(self.tempdir, 'b')
with file(file_a, 'w+'):
pass
with file(file_b, 'w+'):
pass
self.assertEqual(sorted(utils.ListdirFullpath(self.tempdir)),
[file_a, file_b])
def testThreadedMapNormal(self):
args = [2, 5, 7]
results = utils.ThreadedMap((lambda x: x + 1), args)
self.assertEqual(results, [3, 6, 8])
def testThreadedMapStar(self):
args = [(2, 3), (2, 5), (7, 10)]
results = utils.ThreadedMap((lambda x, y: x * y), args, star=True)
self.assertEqual(results, [6, 10, 70])
def testThreadedMapException(self):
args = [(6, 2), (1, 0), (9, 3)]
self.assertRaises(utils.ThreadedMapError, utils.ThreadedMap,
(lambda x, y: x / y), args, star=True)
def testGroup(self):
items = [(1, 'a'), (2, 'b'), (1, 'c')]
self.assertEquals(utils.Group(items, operator.itemgetter(0)),
[(1, [(1, 'a')]), (2, [(2, 'b')]), (1, [(1, 'c')])])
items = [(1, 'c'), (2, 'b'), (1, 'a')]
self.assertEquals(utils.Group(items, operator.itemgetter(0)),
[(1, [(1, 'c')]), (2, [(2, 'b')]), (1, [(1, 'a')])])
items = [(1, 'a'), (1, 'c'), (2, 'b')]
self.assertEquals(utils.Group(items, operator.itemgetter(0)),
[(1, [(1, 'a'), (1, 'c')]), (2, [(2, 'b')])])
items = [(2, 'b'), (1, 'c'), (1, 'a')]
self.assertEquals(utils.Group(items, operator.itemgetter(0)),
[(2, [(2, 'b')]), (1, [(1, 'c'), (1, 'a')])])
# Special case: an empty input.
self.assertEquals(utils.Group([], operator.itemgetter(0)), [])
def testLinear(self):
# Check basic linear growth.
self.assertEquals([utils.Linear(x, 0, 5, 10, 20) for x in range(0, 6)],
range(10, 21, 2))
# Check linear decay.
self.assertEquals([utils.Linear(x, 0, 5, 20, 10) for x in range(0, 6)],
range(20, 9, -2))
# Check threashold enforcement.
self.assertEquals(utils.Linear(-2, 0, 5, 10, 20), 10)
self.assertEquals(utils.Linear(7, 0, 5, 10, 20), 20)
def testTimeDeltaToString(self):
# Shorthand notation.
C = datetime.timedelta
c = C(days=5, hours=3, minutes=15, seconds=33, microseconds=12037)
# Test with default formatting.
self.assertEquals(utils.TimeDeltaToString(C(5)), '5d')
self.assertEquals(utils.TimeDeltaToString(C(hours=3)), '3h')
self.assertEquals(utils.TimeDeltaToString(C(minutes=15)), '15m')
self.assertEquals(utils.TimeDeltaToString(C(seconds=33)), '33s')
self.assertEquals(utils.TimeDeltaToString(C(microseconds=12037)), '0s')
self.assertEquals(utils.TimeDeltaToString(c), '5d3h15m')
# Test with forced seconds and altered precision.
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True), '5d3h15m33s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=1),
'5d3h15m33s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=2),
'5d3h15m33.01s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=3),
'5d3h15m33.012s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=4),
'5d3h15m33.012s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=5),
'5d3h15m33.01203s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=6),
'5d3h15m33.012037s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=7),
'5d3h15m33.012037s')
class YNInteraction():
"""Class to hold a list of responses and expected reault of YN prompt."""
def __init__(self, responses, expected_result):
self.responses = responses
self.expected_result = expected_result
class InputTest(unittest_lib.MoxTestCase):
"""Test cases for utils interactive user input."""
def testGetInput(self):
self.mox.StubOutWithMock(__builtin__, 'raw_input')
prompt = 'Some prompt'
response = 'Some response'
__builtin__.raw_input(prompt).AndReturn(response)
self.mox.ReplayAll()
self.assertEquals(response, utils.GetInput(prompt))
self.mox.VerifyAll()
def _TestYesNoPrompt(self, interactions, prompt_suffix, default, full):
self.mox.StubOutWithMock(__builtin__, 'raw_input')
prompt = 'Continue' # Not important
actual_prompt = '\n%s %s? ' % (prompt, prompt_suffix)
for interaction in interactions:
for response in interaction.responses:
__builtin__.raw_input(actual_prompt).AndReturn(response)
self.mox.ReplayAll()
for interaction in interactions:
retval = utils.YesNoPrompt(default, prompt=prompt, full=full)
msg = ('A %s prompt with %r responses expected result %r, but got %r' %
(prompt_suffix, interaction.responses,
interaction.expected_result, retval))
self.assertEquals(interaction.expected_result, retval, msg=msg)
self.mox.VerifyAll()
def testYesNoDefaultNo(self):
tests = [YNInteraction([''], utils.NO),
YNInteraction(['n'], utils.NO),
YNInteraction(['no'], utils.NO),
YNInteraction(['foo', ''], utils.NO),
YNInteraction(['foo', 'no'], utils.NO),
YNInteraction(['y'], utils.YES),
YNInteraction(['ye'], utils.YES),
YNInteraction(['yes'], utils.YES),
YNInteraction(['foo', 'yes'], utils.YES),
YNInteraction(['foo', 'bar', 'x', 'y'], utils.YES),
]
self._TestYesNoPrompt(tests, '(y/N)', utils.NO, False)
def testYesNoDefaultYes(self):
tests = [YNInteraction([''], utils.YES),
YNInteraction(['n'], utils.NO),
YNInteraction(['no'], utils.NO),
YNInteraction(['foo', ''], utils.YES),
YNInteraction(['foo', 'no'], utils.NO),
YNInteraction(['y'], utils.YES),
YNInteraction(['ye'], utils.YES),
YNInteraction(['yes'], utils.YES),
YNInteraction(['foo', 'yes'], utils.YES),
YNInteraction(['foo', 'bar', 'x', 'y'], utils.YES),
]
self._TestYesNoPrompt(tests, '(Y/n)', utils.YES, False)
def testYesNoDefaultNoFull(self):
tests = [YNInteraction([''], utils.NO),
YNInteraction(['n', ''], utils.NO),
YNInteraction(['no'], utils.NO),
YNInteraction(['foo', ''], utils.NO),
YNInteraction(['foo', 'no'], utils.NO),
YNInteraction(['y', 'yes'], utils.YES),
YNInteraction(['ye', ''], utils.NO),
YNInteraction(['yes'], utils.YES),
YNInteraction(['foo', 'yes'], utils.YES),
YNInteraction(['foo', 'bar', 'y', ''], utils.NO),
YNInteraction(['n', 'y', 'no'], utils.NO),
]
self._TestYesNoPrompt(tests, '(yes/No)', utils.NO, True)
def testYesNoDefaultYesFull(self):
tests = [YNInteraction([''], utils.YES),
YNInteraction(['n', ''], utils.YES),
YNInteraction(['no'], utils.NO),
YNInteraction(['foo', ''], utils.YES),
YNInteraction(['foo', 'no'], utils.NO),
YNInteraction(['y', 'yes'], utils.YES),
YNInteraction(['n', ''], utils.YES),
YNInteraction(['yes'], utils.YES),
YNInteraction(['foo', 'yes'], utils.YES),
YNInteraction(['foo', 'bar', 'n', ''], utils.YES),
YNInteraction(['n', 'y', 'no'], utils.NO),
]
self._TestYesNoPrompt(tests, '(Yes/no)', utils.YES, True)
if __name__ == '__main__':
cros_test_lib.main()