blob: ceb41dd4a6a963890c20a2dc216a1ed00d74292e [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for the deploy module."""
from __future__ import print_function
import json
import multiprocessing
import os
from chromite.cli import command
from chromite.cli import deploy
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import portage_util
from chromite.lib import remote_access
try:
import portage
except ImportError:
if cros_build_lib.IsInsideChroot():
raise
# pylint: disable=protected-access
class ChromiumOSDeviceFake(object):
"""Fake for device."""
def __init__(self):
self.board = 'board'
self.hostname = None
self.username = None
self.port = None
self.lsb_release = None
def IsDirWritable(self, _):
return True
class ChromiumOSDeviceHandlerFake(object):
"""Fake for chromite.lib.remote_access.ChomiumOSDeviceHandler."""
class RemoteAccessFake(object):
"""Fake for chromite.lib.remote_access.RemoteAccess."""
def __init__(self):
self.remote_sh_output = None
def RemoteSh(self, *_args, **_kwargs):
return cros_build_lib.CommandResult(output=self.remote_sh_output)
def __init__(self, *_args, **_kwargs):
self._agent = self.RemoteAccessFake()
self.device = ChromiumOSDeviceFake()
# TODO(dpursell): Mock remote access object in cros_test_lib (brbug.com/986).
def GetAgent(self):
return self._agent
def __exit__(self, _type, _value, _traceback):
pass
def __enter__(self):
return ChromiumOSDeviceFake()
class BrilloDeployOperationFake(deploy.BrilloDeployOperation):
"""Fake for deploy.BrilloDeployOperation."""
def __init__(self, pkg_count, emerge, queue):
super(BrilloDeployOperationFake, self).__init__(pkg_count, emerge)
self._queue = queue
def ParseOutput(self, output=None):
super(BrilloDeployOperationFake, self).ParseOutput(output)
self._queue.put('advance')
class DbApiFake(object):
"""Fake for Portage dbapi."""
def __init__(self, pkgs):
self.pkg_db = {}
for cpv, slot, rdeps_raw, build_time in pkgs:
self.pkg_db[cpv] = {
'SLOT': slot, 'RDEPEND': rdeps_raw, 'BUILD_TIME': build_time}
def cpv_all(self):
return self.pkg_db.keys()
def aux_get(self, cpv, keys):
pkg_info = self.pkg_db[cpv]
return [pkg_info[key] for key in keys]
class PackageScannerFake(object):
"""Fake for PackageScanner."""
def __init__(self, packages):
self.pkgs = packages
self.listed = []
self.num_updates = None
def Run(self, _device, _root, _packages, _update, _deep, _deep_rev):
return self.pkgs, self.listed, self.num_updates
class PortageTreeFake(object):
"""Fake for Portage tree."""
def __init__(self, dbapi):
self.dbapi = dbapi
class TestInstallPackageScanner(cros_test_lib.MockOutputTestCase):
"""Test the update package scanner."""
_BOARD = 'foo_board'
_BUILD_ROOT = '/build/%s' % _BOARD
_VARTREE = [
('foo/app1-1.2.3-r4', '0', 'foo/app2 !foo/app3', '1413309336'),
('foo/app2-4.5.6-r7', '0', '', '1413309336'),
('foo/app4-2.0.0-r1', '0', 'foo/app1 foo/app5', '1413309336'),
('foo/app5-3.0.7-r3', '0', '', '1413309336'),
]
def setUp(self):
"""Patch imported modules."""
self.PatchObject(cros_build_lib, 'GetChoice', return_value=0)
self.device = ChromiumOSDeviceHandlerFake()
self.scanner = deploy._InstallPackageScanner(self._BUILD_ROOT)
def SetupVartree(self, vartree_pkgs):
self.device.GetAgent().remote_sh_output = json.dumps(vartree_pkgs)
def SetupBintree(self, bintree_pkgs):
bintree = PortageTreeFake(DbApiFake(bintree_pkgs))
build_root = os.path.join(self._BUILD_ROOT, '')
portage_db = {build_root: {'bintree': bintree}}
self.PatchObject(portage, 'create_trees', return_value=portage_db)
def ValidatePkgs(self, actual, expected, constraints=None):
# Containing exactly the same packages.
self.assertEquals(sorted(expected), sorted(actual))
# Packages appear in the right order.
if constraints is not None:
for needs, needed in constraints:
self.assertGreater(actual.index(needs), actual.index(needed))
def testRunUpdatedVersion(self):
self.SetupVartree(self._VARTREE)
app1 = 'foo/app1-1.2.5-r4'
self.SetupBintree([
(app1, '0', 'foo/app2 !foo/app3', '1413309336'),
('foo/app2-4.5.6-r7', '0', '', '1413309336'),
])
installs, listed, num_updates = self.scanner.Run(
self.device, '/', ['app1'], True, True, True)
self.ValidatePkgs(installs, [app1])
self.ValidatePkgs(listed, [app1])
self.assertEquals(num_updates, 1)
def testRunUpdatedBuildTime(self):
self.SetupVartree(self._VARTREE)
app1 = 'foo/app1-1.2.3-r4'
self.SetupBintree([
(app1, '0', 'foo/app2 !foo/app3', '1413309350'),
('foo/app2-4.5.6-r7', '0', '', '1413309336'),
])
installs, listed, num_updates = self.scanner.Run(
self.device, '/', ['app1'], True, True, True)
self.ValidatePkgs(installs, [app1])
self.ValidatePkgs(listed, [app1])
self.assertEquals(num_updates, 1)
def testRunExistingDepUpdated(self):
self.SetupVartree(self._VARTREE)
app1 = 'foo/app1-1.2.5-r2'
app2 = 'foo/app2-4.5.8-r3'
self.SetupBintree([
(app1, '0', 'foo/app2 !foo/app3', '1413309350'),
(app2, '0', '', '1413309350'),
])
installs, listed, num_updates = self.scanner.Run(
self.device, '/', ['app1'], True, True, True)
self.ValidatePkgs(installs, [app1, app2], constraints=[(app1, app2)])
self.ValidatePkgs(listed, [app1])
self.assertEquals(num_updates, 2)
def testRunMissingDepUpdated(self):
self.SetupVartree(self._VARTREE)
app1 = 'foo/app1-1.2.5-r2'
app6 = 'foo/app6-1.0.0-r1'
self.SetupBintree([
(app1, '0', 'foo/app2 !foo/app3 foo/app6', '1413309350'),
('foo/app2-4.5.6-r7', '0', '', '1413309336'),
(app6, '0', '', '1413309350'),
])
installs, listed, num_updates = self.scanner.Run(
self.device, '/', ['app1'], True, True, True)
self.ValidatePkgs(installs, [app1, app6], constraints=[(app1, app6)])
self.ValidatePkgs(listed, [app1])
self.assertEquals(num_updates, 1)
def testRunExistingRevDepUpdated(self):
self.SetupVartree(self._VARTREE)
app1 = 'foo/app1-1.2.5-r2'
app4 = 'foo/app4-2.0.1-r3'
self.SetupBintree([
(app1, '0', 'foo/app2 !foo/app3', '1413309350'),
(app4, '0', 'foo/app1 foo/app5', '1413309350'),
('foo/app5-3.0.7-r3', '0', '', '1413309336'),
])
installs, listed, num_updates = self.scanner.Run(
self.device, '/', ['app1'], True, True, True)
self.ValidatePkgs(installs, [app1, app4], constraints=[(app4, app1)])
self.ValidatePkgs(listed, [app1])
self.assertEquals(num_updates, 2)
def testRunMissingRevDepNotUpdated(self):
self.SetupVartree(self._VARTREE)
app1 = 'foo/app1-1.2.5-r2'
app6 = 'foo/app6-1.0.0-r1'
self.SetupBintree([
(app1, '0', 'foo/app2 !foo/app3', '1413309350'),
(app6, '0', 'foo/app1', '1413309350'),
])
installs, listed, num_updates = self.scanner.Run(
self.device, '/', ['app1'], True, True, True)
self.ValidatePkgs(installs, [app1])
self.ValidatePkgs(listed, [app1])
self.assertEquals(num_updates, 1)
def testRunTransitiveDepsUpdated(self):
self.SetupVartree(self._VARTREE)
app1 = 'foo/app1-1.2.5-r2'
app2 = 'foo/app2-4.5.8-r3'
app4 = 'foo/app4-2.0.0-r1'
app5 = 'foo/app5-3.0.8-r2'
self.SetupBintree([
(app1, '0', 'foo/app2 !foo/app3', '1413309350'),
(app2, '0', '', '1413309350'),
(app4, '0', 'foo/app1 foo/app5', '1413309350'),
(app5, '0', '', '1413309350'),
])
installs, listed, num_updates = self.scanner.Run(
self.device, '/', ['app1'], True, True, True)
self.ValidatePkgs(installs, [app1, app2, app4, app5],
constraints=[(app1, app2), (app4, app1), (app4, app5)])
self.ValidatePkgs(listed, [app1])
self.assertEquals(num_updates, 4)
def testRunDisjunctiveDepsExistingUpdated(self):
self.SetupVartree(self._VARTREE)
app1 = 'foo/app1-1.2.5-r2'
self.SetupBintree([
(app1, '0', '|| ( foo/app6 foo/app2 ) !foo/app3', '1413309350'),
('foo/app2-4.5.6-r7', '0', '', '1413309336'),
])
installs, listed, num_updates = self.scanner.Run(
self.device, '/', ['app1'], True, True, True)
self.ValidatePkgs(installs, [app1])
self.ValidatePkgs(listed, [app1])
self.assertEquals(num_updates, 1)
def testRunDisjunctiveDepsDefaultUpdated(self):
self.SetupVartree(self._VARTREE)
app1 = 'foo/app1-1.2.5-r2'
app7 = 'foo/app7-1.0.0-r1'
self.SetupBintree([
(app1, '0', '|| ( foo/app6 foo/app7 ) !foo/app3', '1413309350'),
(app7, '0', '', '1413309350'),
])
installs, listed, num_updates = self.scanner.Run(
self.device, '/', ['app1'], True, True, True)
self.ValidatePkgs(installs, [app1, app7], constraints=[(app1, app7)])
self.ValidatePkgs(listed, [app1])
self.assertEquals(num_updates, 1)
class TestDeploy(cros_test_lib.ProgressBarTestCase):
"""Test deploy.Deploy."""
@staticmethod
def FakeGetPackagesByCPV(cpvs, _strip, _sysroot):
return ['/path/to/%s.tbz2' % cpv.pv for cpv in cpvs]
def setUp(self):
self.PatchObject(remote_access, 'ChromiumOSDeviceHandler',
side_effect=ChromiumOSDeviceHandlerFake)
self.PatchObject(cros_build_lib, 'GetBoard', return_value=None)
self.PatchObject(cros_build_lib, 'GetSysroot', return_value='sysroot')
self.package_scanner = self.PatchObject(deploy, '_InstallPackageScanner')
self.get_packages_paths = self.PatchObject(
deploy, '_GetPackagesByCPV', side_effect=self.FakeGetPackagesByCPV)
self.emerge = self.PatchObject(deploy, '_Emerge', return_value=None)
self.unmerge = self.PatchObject(deploy, '_Unmerge', return_value=None)
def testDeployEmerge(self):
"""Test that deploy._Emerge is called for each package."""
_BINPKG = '/path/to/bar-1.2.5.tbz2'
def FakeIsFile(fname):
return fname == _BINPKG
packages = ['some/foo-1.2.3', _BINPKG, 'some/foobar-2.0']
self.package_scanner.return_value = PackageScannerFake(packages)
self.PatchObject(os.path, 'isfile', side_effect=FakeIsFile)
deploy.Deploy(None, ['package'], force=True, clean_binpkg=False)
# Check that package names were correctly resolved into binary packages.
self.get_packages_paths.assert_called_once_with(
[portage_util.SplitCPV(p) for p in packages if p != _BINPKG],
True, 'sysroot')
# Check that deploy._Emerge is called the right number of times.
self.assertEqual(self.emerge.call_count, len(packages))
self.assertEqual(self.unmerge.call_count, 0)
def testDeployUnmerge(self):
"""Test that deploy._Unmerge is called for each package."""
packages = ['foo', 'bar', 'foobar']
self.package_scanner.return_value = PackageScannerFake(packages)
deploy.Deploy(None, ['package'], force=True, clean_binpkg=False,
emerge=False)
# Check that deploy._Unmerge is called the right number of times.
self.assertEqual(self.emerge.call_count, 0)
self.assertEqual(self.unmerge.call_count, len(packages))
def testDeployMergeWithProgressBar(self):
"""Test that BrilloDeployOperation.Run() is called for merge."""
packages = ['foo', 'bar', 'foobar']
self.package_scanner.return_value = PackageScannerFake(packages)
run = self.PatchObject(deploy.BrilloDeployOperation, 'Run',
return_value=None)
self.PatchObject(command, 'UseProgressBar', return_value=True)
deploy.Deploy(None, ['package'], force=True, clean_binpkg=False)
# Check that BrilloDeployOperation.Run was called.
self.assertTrue(run.called)
def testDeployUnmergeWithProgressBar(self):
"""Test that BrilloDeployOperation.Run() is called for unmerge."""
packages = ['foo', 'bar', 'foobar']
self.package_scanner.return_value = PackageScannerFake(packages)
run = self.PatchObject(deploy.BrilloDeployOperation, 'Run',
return_value=None)
self.PatchObject(command, 'UseProgressBar', return_value=True)
deploy.Deploy(None, ['package'], force=True, clean_binpkg=False,
emerge=False)
# Check that BrilloDeployOperation.Run was called.
self.assertTrue(run.called)
def testBrilloDeployMergeOperation(self):
"""Test that BrilloDeployOperation works for merge."""
def func(queue):
for event in op.MERGE_EVENTS:
queue.get()
print(event)
queue = multiprocessing.Queue()
# Emerge one package.
op = BrilloDeployOperationFake(1, True, queue)
with self.OutputCapturer():
op.Run(func, queue)
# Check that the progress bar prints correctly.
self.AssertProgressBarAllEvents(len(op.MERGE_EVENTS))
def testBrilloDeployUnmergeOperation(self):
"""Test that BrilloDeployOperation works for unmerge."""
def func(queue):
for event in op.UNMERGE_EVENTS:
queue.get()
print(event)
queue = multiprocessing.Queue()
# Unmerge one package.
op = BrilloDeployOperationFake(1, False, queue)
with self.OutputCapturer():
op.Run(func, queue)
# Check that the progress bar prints correctly.
self.AssertProgressBarAllEvents(len(op.UNMERGE_EVENTS))