blob: b79aba374c305ff2b72bb0ee0c985caee53e9748 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test Utils library."""
from __future__ import print_function
import datetime
import operator
import os
from chromite.lib import cros_test_lib
from chromite.lib.paygen import utils
class TestUtils(cros_test_lib.TempDirTestCase):
"""Test utils methods."""
def testCreateTmpInvalidPath(self):
"""Test that we create a tmp eventually even with invalid paths."""
tmps = ['/usr/local/nope', '/tmp']
tmp = utils.CreateTmpDir(tmps=tmps)
self.assertTrue(tmp.startswith('/tmp'))
os.rmdir(tmp)
def testCreateTmpRaiseException(self):
"""Test that we raise an exception when we do not have enough space."""
self.assertRaises(utils.UnableToCreateTmpDir, utils.CreateTmpDir,
minimum_size=2 ** 50)
def testCreateTempFileWithContents(self):
"""Verify that we create a temp file with the right message in it."""
message = 'Test Message With Rocks In'
# Create the temp file.
with utils.CreateTempFileWithContents(message) as temp_file:
temp_name = temp_file.name
# Verify the name is valid.
self.assertTrue(os.path.exists(temp_name))
# Verify it has the right contents
with open(temp_name, 'r') as f:
contents = f.readlines()
self.assertEqual([message], contents)
# Verify the temp file goes away when we close it.
self.assertFalse(os.path.exists(temp_name))
# pylint: disable=E1101
def testListdirFullpath(self):
file_a = os.path.join(self.tempdir, 'a')
file_b = os.path.join(self.tempdir, 'b')
with file(file_a, 'w+'):
pass
with file(file_b, 'w+'):
pass
self.assertEqual(sorted(utils.ListdirFullpath(self.tempdir)),
[file_a, file_b])
def testThreadedMapNormal(self):
args = [2, 5, 7]
results = utils.ThreadedMap((lambda x: x + 1), args)
self.assertEqual(results, [3, 6, 8])
def testThreadedMapStar(self):
args = [(2, 3), (2, 5), (7, 10)]
results = utils.ThreadedMap((lambda x, y: x * y), args, star=True)
self.assertEqual(results, [6, 10, 70])
def testThreadedMapException(self):
args = [(6, 2), (1, 0), (9, 3)]
self.assertRaises(utils.ThreadedMapError, utils.ThreadedMap,
(lambda x, y: x / y), args, star=True)
def testGroup(self):
items = [(1, 'a'), (2, 'b'), (1, 'c')]
self.assertEquals(utils.Group(items, operator.itemgetter(0)),
[(1, [(1, 'a')]), (2, [(2, 'b')]), (1, [(1, 'c')])])
items = [(1, 'c'), (2, 'b'), (1, 'a')]
self.assertEquals(utils.Group(items, operator.itemgetter(0)),
[(1, [(1, 'c')]), (2, [(2, 'b')]), (1, [(1, 'a')])])
items = [(1, 'a'), (1, 'c'), (2, 'b')]
self.assertEquals(utils.Group(items, operator.itemgetter(0)),
[(1, [(1, 'a'), (1, 'c')]), (2, [(2, 'b')])])
items = [(2, 'b'), (1, 'c'), (1, 'a')]
self.assertEquals(utils.Group(items, operator.itemgetter(0)),
[(2, [(2, 'b')]), (1, [(1, 'c'), (1, 'a')])])
# Special case: an empty input.
self.assertEquals(utils.Group([], operator.itemgetter(0)), [])
def testLinear(self):
# Check basic linear growth.
self.assertEquals([utils.Linear(x, 0, 5, 10, 20) for x in range(0, 6)],
range(10, 21, 2))
# Check linear decay.
self.assertEquals([utils.Linear(x, 0, 5, 20, 10) for x in range(0, 6)],
range(20, 9, -2))
# Check threashold enforcement.
self.assertEquals(utils.Linear(-2, 0, 5, 10, 20), 10)
self.assertEquals(utils.Linear(7, 0, 5, 10, 20), 20)
def testTimeDeltaToString(self):
# Shorthand notation.
C = datetime.timedelta
c = C(days=5, hours=3, minutes=15, seconds=33, microseconds=12037)
# Test with default formatting.
self.assertEquals(utils.TimeDeltaToString(C(5)), '5d')
self.assertEquals(utils.TimeDeltaToString(C(hours=3)), '3h')
self.assertEquals(utils.TimeDeltaToString(C(minutes=15)), '15m')
self.assertEquals(utils.TimeDeltaToString(C(seconds=33)), '33s')
self.assertEquals(utils.TimeDeltaToString(C(microseconds=12037)), '0s')
self.assertEquals(utils.TimeDeltaToString(c), '5d3h15m')
# Test with forced seconds and altered precision.
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True), '5d3h15m33s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=1),
'5d3h15m33s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=2),
'5d3h15m33.01s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=3),
'5d3h15m33.012s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=4),
'5d3h15m33.012s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=5),
'5d3h15m33.01203s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=6),
'5d3h15m33.012037s')
self.assertEquals(
utils.TimeDeltaToString(c, force_seconds=True, subsecond_precision=7),
'5d3h15m33.012037s')