| # Copyright 2012 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unit tests for cros_mark_as_stable.py.""" |
| |
| import os |
| from unittest import mock |
| |
| from chromite.lib import constants |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_test_lib |
| from chromite.lib import git |
| from chromite.lib import osutils |
| from chromite.lib import parallel |
| from chromite.lib import parallel_unittest |
| from chromite.lib import partial_mock |
| from chromite.lib import portage_util |
| from chromite.lib import repo_util |
| from chromite.scripts import cros_mark_as_stable |
| |
| |
| pytestmark = cros_test_lib.pytestmark_inside_only |
| |
| |
| class RunGitMock(partial_mock.PartialCmdMock): |
| """Partial mock for git.RunMock.""" |
| |
| TARGET = "chromite.lib.git" |
| ATTRS = ("RunGit",) |
| DEFAULT_ATTR = "RunGit" |
| |
| def RunGit(self, _git_repo, cmd, _retry=True, **kwargs): |
| return self._results["RunGit"].LookupResult( |
| (cmd,), hook_args=(cmd,), hook_kwargs=kwargs |
| ) |
| |
| |
| class NonClassTests(cros_test_lib.MockTestCase): |
| """Test the flow for pushing a change.""" |
| |
| def setUp(self) -> None: |
| self._branch = "test_branch" |
| self._target_manifest_branch = "cros/master" |
| |
| def _TestPushChange(self, bad_cls) -> None: |
| side_effect = Exception("unittest says this should not be called") |
| |
| git_log = "Marking test_one as stable\nMarking test_two as stable\n" |
| fake_description = "Marking set of ebuilds as stable\n\n%s" % git_log |
| self.PatchObject(git, "DoesCommitExistInRepo", return_value=True) |
| self.PatchObject( |
| cros_mark_as_stable, "_DoWeHaveLocalCommits", return_value=True |
| ) |
| self.PatchObject( |
| cros_mark_as_stable.GitBranch, |
| "CreateBranch", |
| side_effect=side_effect, |
| ) |
| self.PatchObject( |
| cros_mark_as_stable.GitBranch, "Exists", side_effect=side_effect |
| ) |
| |
| push_mock = self.PatchObject(git, "PushBranch") |
| self.PatchObject( |
| git, |
| "GetTrackingBranch", |
| return_value=git.RemoteRef("gerrit", "refs/remotes/gerrit/master"), |
| ) |
| sync_mock = self.PatchObject(git, "SyncPushBranch") |
| create_mock = self.PatchObject(git, "CreatePushBranch") |
| git_mock = self.StartPatcher(RunGitMock()) |
| |
| git_mock.AddCmdResult(["checkout", self._branch]) |
| |
| cmd = [ |
| "log", |
| "--format=short", |
| "--perl-regexp", |
| "--author", |
| "^(?!chrome-bot|chromeos-ci-prod|chromeos-ci-release)", |
| "refs/remotes/gerrit/master..%s" % self._branch, |
| ] |
| |
| if bad_cls: |
| push_mock.side_effect = side_effect |
| create_mock.side_effect = side_effect |
| git_mock.AddCmdResult(cmd, stdout="Found bad stuff") |
| else: |
| git_mock.AddCmdResult(cmd, stdout="\n") |
| cmd = [ |
| "log", |
| "--format=format:%s%n%n%b", |
| "refs/remotes/gerrit/master..%s" % self._branch, |
| ] |
| git_mock.AddCmdResult(cmd, stdout=git_log) |
| git_mock.AddCmdResult(["merge", "--squash", self._branch]) |
| git_mock.AddCmdResult(["commit", "-m", fake_description]) |
| git_mock.AddCmdResult(["config", "push.default", "tracking"]) |
| |
| cros_mark_as_stable.PushChange( |
| self._branch, self._target_manifest_branch, False, "." |
| ) |
| sync_mock.assert_called_with( |
| ".", "gerrit", "refs/remotes/gerrit/master" |
| ) |
| if not bad_cls: |
| push_mock.assert_called_with( |
| "merge_branch", ".", dryrun=False, staging_branch=None |
| ) |
| create_mock.assert_called_with( |
| "merge_branch", ".", remote_push_branch=mock.ANY |
| ) |
| |
| def testPushChange(self) -> None: |
| """Verify pushing changes works.""" |
| self._TestPushChange(bad_cls=False) |
| |
| def testPushChangeBadCls(self) -> None: |
| """Verify we do not push bad CLs.""" |
| self.assertRaises(AssertionError, self._TestPushChange, bad_cls=True) |
| |
| |
| class EbuildMock: |
| """Mock portage_util.Ebuild.""" |
| |
| def __init__(self, path, new_package=True) -> None: |
| self.path = path |
| self.package = "%s_package" % path |
| self.cros_workon_vars = "cros_workon_vars" |
| self._new_package = new_package |
| |
| # pylint: disable=unused-argument |
| def RevWorkOnEBuild(self, srcroot, manifest, reject_self_repo=True): |
| if self._new_package: |
| return ( |
| "%s_new_package" % self.path, |
| "%s_new_ebuild" % self.path, |
| "%s_old_ebuild" % self.path, |
| ) |
| |
| |
| # pylint: disable=protected-access |
| class MarkAsStableCMDTest(cros_test_lib.MockTempDirTestCase): |
| """Test cros_mark_as_stable commands.""" |
| |
| def setUp(self) -> None: |
| self._manifest = "manifest" |
| self._parser = cros_mark_as_stable.GetParser() |
| self._package_list = ["pkg1"] |
| |
| self._overlays = [ |
| os.path.join(self.tempdir, "overlay_%s" % i) for i in range(0, 3) |
| ] |
| |
| self._overlay_remote_ref = { |
| self._overlays[0]: git.RemoteRef("remote", "ref", "project_1"), |
| self._overlays[1]: git.RemoteRef("remote", "ref", "project_1"), |
| self._overlays[2]: git.RemoteRef("remote", "ref", "project_2"), |
| } |
| |
| self._git_project_overlays = {} |
| self._overlay_tracking_branch = {} |
| for overlay in self._overlays: |
| self._git_project_overlays.setdefault( |
| self._overlay_remote_ref[overlay], [] |
| ).append(overlay) |
| self._overlay_tracking_branch[overlay] = self._overlay_remote_ref[ |
| overlay |
| ].ref |
| |
| self.PatchObject(git, "GetTrackingBranchViaManifest") |
| # TODO: Remove explicit --buildroot after --srcroot remove and a normal |
| # default is set. |
| self._commit_options = self._parser.parse_args( |
| ["commit", "--buildroot", str(constants.SOURCE_ROOT)] |
| ) |
| self._push_options = self._parser.parse_args( |
| ["push", "--buildroot", str(constants.SOURCE_ROOT)] |
| ) |
| |
| def testWorkOnPush(self) -> None: |
| """Test _WorkOnPush.""" |
| self.PatchObject(parallel, "RunTasksInProcessPool") |
| |
| cros_mark_as_stable._WorkOnPush( |
| self._push_options, |
| self._overlay_tracking_branch, |
| self._git_project_overlays, |
| ) |
| |
| def testPushOverlays(self) -> None: |
| """Test _PushOverlays.""" |
| self.PatchObject(os.path, "isdir", return_value=True) |
| mock_push_change = self.PatchObject(cros_mark_as_stable, "PushChange") |
| |
| cros_mark_as_stable._PushOverlays( |
| self._push_options, self._overlays, self._overlay_tracking_branch |
| ) |
| self.assertEqual(mock_push_change.call_count, 3) |
| |
| def testWorkOnCommit(self) -> None: |
| """Test _WorkOnCommit.""" |
| self.PatchObject(parallel, "RunTasksInProcessPool") |
| self.PatchObject(cros_mark_as_stable, "_CommitOverlays") |
| self.PatchObject( |
| cros_mark_as_stable, "_GetOverlayToEbuildsMap", return_value={} |
| ) |
| |
| cros_mark_as_stable._WorkOnCommit( |
| self._commit_options, |
| self._overlays, |
| self._overlay_tracking_branch, |
| self._git_project_overlays, |
| self._manifest, |
| self._package_list, |
| ) |
| |
| def testGetOverlayToEbuildsMap(self) -> None: |
| """Test _GetOverlayToEbuildsMap.""" |
| self.PatchObject( |
| portage_util, "GetOverlayEBuilds", return_value=["ebuild"] |
| ) |
| |
| expected_overlay_dicts = { |
| overlay: ["ebuild"] for overlay in self._overlays |
| } |
| overlay_ebuilds = cros_mark_as_stable._GetOverlayToEbuildsMap( |
| self._commit_options, self._overlays, self._package_list |
| ) |
| self.assertCountEqual(expected_overlay_dicts, overlay_ebuilds) |
| |
| def testCommitOverlays(self) -> None: |
| """Test _CommitOverlays.""" |
| mock_run_process_pool = self.PatchObject( |
| parallel, "RunTasksInProcessPool" |
| ) |
| self.PatchObject(os.path, "isdir", return_value=True) |
| self.PatchObject( |
| git, |
| "RunGit", |
| return_value=cros_build_lib.CompletedProcess(stdout=""), |
| ) |
| self.PatchObject( |
| cros_mark_as_stable, "_WorkOnEbuild", return_value=None |
| ) |
| self.PatchObject(git, "GetGitRepoRevision") |
| self.PatchObject(cros_mark_as_stable.GitBranch, "CreateBranch") |
| self.PatchObject( |
| cros_mark_as_stable.GitBranch, "Exists", return_value=True |
| ) |
| self.PatchObject(portage_util.EBuild, "CommitChange") |
| |
| overlay_ebuilds = { |
| self._overlays[0]: ["ebuild_1_1", "ebuild_1_2"], |
| self._overlays[1]: ["ebuild_2_1"], |
| self._overlays[2]: ["ebuild_3_1", "ebuild_3_2"], |
| } |
| |
| cros_mark_as_stable._CommitOverlays( |
| self._commit_options, |
| self._manifest, |
| self._overlays, |
| self._overlay_tracking_branch, |
| overlay_ebuilds, |
| [], |
| [], |
| ) |
| self.assertEqual(3, mock_run_process_pool.call_count) |
| |
| def testWorkOnEbuildWithNewPackage(self) -> None: |
| """Test _WorkOnEbuild with new packages.""" |
| overlay = self._overlays[0] |
| ebuild = EbuildMock("ebuild") |
| |
| with parallel.Manager() as manager: |
| revved_packages = manager.list() |
| new_package_atoms = manager.list() |
| |
| messages = manager.list() |
| ebuild_paths_to_add = manager.list() |
| ebuild_paths_to_remove = manager.list() |
| |
| cros_mark_as_stable._WorkOnEbuild( |
| overlay, |
| ebuild, |
| self._manifest, |
| self._commit_options, |
| ebuild_paths_to_add, |
| ebuild_paths_to_remove, |
| messages, |
| revved_packages, |
| new_package_atoms, |
| ) |
| self.assertCountEqual(ebuild_paths_to_add, ["ebuild_new_ebuild"]) |
| self.assertCountEqual(ebuild_paths_to_remove, ["ebuild_old_ebuild"]) |
| self.assertCountEqual( |
| messages, |
| [cros_mark_as_stable._GIT_COMMIT_MESSAGE % "ebuild_package"], |
| ) |
| self.assertCountEqual(revved_packages, ["ebuild_package"]) |
| self.assertCountEqual(new_package_atoms, ["=ebuild_new_package"]) |
| |
| def testWorkOnEbuildWithoutNewPackage(self) -> None: |
| """Test _WorkOnEbuild without new packages.""" |
| ebuild = EbuildMock("ebuild", new_package=False) |
| overlay = self._overlays[0] |
| |
| with parallel.Manager() as manager: |
| revved_packages = manager.list() |
| new_package_atoms = manager.list() |
| |
| messages = manager.list() |
| ebuild_paths_to_add = manager.list() |
| ebuild_paths_to_remove = manager.list() |
| |
| cros_mark_as_stable._WorkOnEbuild( |
| overlay, |
| ebuild, |
| self._manifest, |
| self._commit_options, |
| ebuild_paths_to_add, |
| ebuild_paths_to_remove, |
| messages, |
| revved_packages, |
| new_package_atoms, |
| ) |
| self.assertEqual(list(ebuild_paths_to_add), []) |
| self.assertEqual(list(ebuild_paths_to_remove), []) |
| self.assertEqual(list(messages), []) |
| self.assertEqual(list(revved_packages), []) |
| self.assertEqual(list(new_package_atoms), []) |
| |
| |
| class MainTests( |
| cros_test_lib.RunCommandTestCase, cros_test_lib.MockTempDirTestCase |
| ): |
| """Tests for cros_mark_as_stable.main().""" |
| |
| def setUp(self) -> None: |
| self.PatchObject( |
| git.ManifestCheckout, "Cached", return_value="manifest" |
| ) |
| self.mock_work_on_push = self.PatchObject( |
| cros_mark_as_stable, "_WorkOnPush" |
| ) |
| self.mock_work_on_commit = self.PatchObject( |
| cros_mark_as_stable, "_WorkOnCommit" |
| ) |
| |
| self._overlays = [] |
| remote_refs = [] |
| self._overlay_tracking_branch = {} |
| self._git_project_overlays = {} |
| for i in range(0, 3): |
| overlay = os.path.join(self.tempdir, "overlay_%s" % i) |
| osutils.SafeMakedirs(overlay) |
| self._overlays.append(overlay) |
| |
| remote_ref = git.RemoteRef("remote", "ref", "project_%s" % i) |
| remote_refs.append(remote_ref) |
| |
| self._overlay_tracking_branch[overlay] = remote_ref.ref |
| self._git_project_overlays[remote_ref.project_name] = [overlay] |
| |
| self.PatchObject( |
| portage_util, "FindOverlays", return_value=self._overlays |
| ) |
| self.PatchObject( |
| git, "GetTrackingBranchViaManifest", side_effect=remote_refs |
| ) |
| |
| def testMainWithCommit(self) -> None: |
| """Test Main with Commit options.""" |
| cros_mark_as_stable.main( |
| ["commit", "--all", "--overlays", ":".join(self._overlays)] |
| ) |
| self.mock_work_on_commit.assert_called_once_with( |
| mock.ANY, |
| self._overlays, |
| self._overlay_tracking_branch, |
| self._git_project_overlays, |
| "manifest", |
| None, |
| ) |
| |
| def testMainWithPush(self) -> None: |
| """Test Main with Push options.""" |
| cros_mark_as_stable.main( |
| ["push", "--all", "--overlays", ":".join(self._overlays)] |
| ) |
| self.mock_work_on_push.assert_called_once_with( |
| mock.ANY, self._overlay_tracking_branch, self._git_project_overlays |
| ) |
| options = self.mock_work_on_push.call_args[0][0] |
| self.assertEqual(options.buildroot, constants.SOURCE_ROOT) |
| self.assertIsNone(options.srcroot) |
| |
| def testMainWithOverlayTypeCommit(self) -> None: |
| """Test Main with Commit options.""" |
| cros_mark_as_stable.main(["commit", "--all", "--overlay-type", "both"]) |
| self.mock_work_on_commit.assert_called_once_with( |
| mock.ANY, |
| self._overlays, |
| self._overlay_tracking_branch, |
| self._git_project_overlays, |
| "manifest", |
| None, |
| ) |
| options = self.mock_work_on_commit.call_args[0][0] |
| self.assertEqual(options.buildroot, constants.SOURCE_ROOT) |
| self.assertIsNone(options.srcroot) |
| |
| def testMainWithBuildroot(self) -> None: |
| """Test Main with Commit options.""" |
| self.PatchObject( |
| os.path, "isdir", side_effect=lambda p: p == "/buildroot" |
| ) |
| |
| cros_mark_as_stable.main( |
| [ |
| "commit", |
| "--all", |
| "--overlay-type", |
| "both", |
| "--buildroot", |
| "/buildroot", |
| ] |
| ) |
| |
| options = self.mock_work_on_commit.call_args[0][0] |
| self.assertEqual(options.buildroot, "/buildroot") |
| self.assertIsNone(options.srcroot) |
| |
| def testMainWithSrcroot(self) -> None: |
| """Test Main with Commit options.""" |
| self.PatchObject( |
| os.path, "isdir", side_effect=lambda p: p == "/buildroot" |
| ) |
| |
| cros_mark_as_stable.main( |
| [ |
| "commit", |
| "--all", |
| "--overlay-type", |
| "both", |
| "--srcroot", |
| "/buildroot/src", |
| ] |
| ) |
| |
| options = self.mock_work_on_commit.call_args[0][0] |
| self.assertEqual(options.buildroot, "/buildroot") |
| self.assertIsNone(options.srcroot) |
| |
| |
| class CleanStalePackagesTest(cros_test_lib.RunCommandTestCase): |
| """Tests for cros_mark_as_stable.CleanStalePackages.""" |
| |
| def setUp(self) -> None: |
| self.PatchObject(osutils, "FindMissingBinaries", return_value=[]) |
| |
| def testNormalClean(self) -> None: |
| """Clean up boards/packages with normal success""" |
| cros_mark_as_stable.CleanStalePackages( |
| ".", ("board1", "board2"), ["cow", "car"] |
| ) |
| |
| def testNothingToUnmerge(self) -> None: |
| """Clean up packages that don't exist (portage will exit 1)""" |
| self.rc.AddCmdResult(partial_mock.In("emerge"), returncode=1) |
| cros_mark_as_stable.CleanStalePackages(".", (), ["no/pkg"]) |
| |
| def testUnmergeError(self) -> None: |
| """Make sure random exit errors are not ignored""" |
| self.rc.AddCmdResult(partial_mock.In("emerge"), returncode=123) |
| with parallel_unittest.ParallelMock(): |
| with self.assertRaises(cros_build_lib.RunCommandError): |
| cros_mark_as_stable.CleanStalePackages(".", (), ["no/pkg"]) |
| |
| |
| class GitBranchTest(cros_test_lib.MockTestCase): |
| """Tests for cros_mark_as_stable.GitBranch.""" |
| |
| def setUp(self) -> None: |
| # Always stub RunCommmand out as we use it in every method. |
| self.git_mock = self.PatchObject(git, "RunGit") |
| self.start_mock = self.PatchObject(repo_util.Repository, "StartBranch") |
| |
| self._branch_name = "test_branch" |
| self._target_manifest_branch = "cros/test" |
| self._branch = cros_mark_as_stable.GitBranch( |
| branch_name=self._branch_name, |
| tracking_branch=self._target_manifest_branch, |
| cwd=".", |
| ) |
| |
| def testCheckoutCreate(self) -> None: |
| """Test init with no previous branch existing.""" |
| self.PatchObject(self._branch, "Exists", return_value=False) |
| cros_mark_as_stable.GitBranch.Checkout(self._branch) |
| self.start_mock.assert_called_with( |
| self._branch_name, projects=["."], cwd="." |
| ) |
| |
| def testCheckoutNoCreate(self) -> None: |
| """Test init with previous branch existing.""" |
| self.PatchObject(self._branch, "Exists", return_value=True) |
| cros_mark_as_stable.GitBranch.Checkout(self._branch) |
| self.git_mock.assert_called_with( |
| ".", ["checkout", "-f", self._branch_name] |
| ) |
| |
| def testExists(self) -> None: |
| """Test if branch exists that is created.""" |
| result = cros_build_lib.CompletedProcess( |
| stdout=self._branch_name + "\n" |
| ) |
| self.PatchObject(git, "RunGit", return_value=result) |
| self.assertTrue(self._branch.Exists()) |