| # Copyright 2012 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unittests for chromite.lib.patch.""" |
| |
| import copy |
| import functools |
| import itertools |
| import os |
| import shutil |
| import tempfile |
| import time |
| from unittest import mock |
| |
| from chromite.lib import config_lib |
| from chromite.lib import constants |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_test_lib |
| from chromite.lib import gerrit |
| from chromite.lib import git |
| from chromite.lib import osutils |
| from chromite.lib import patch as cros_patch |
| |
| |
| _GetNumber = functools.partial(next, itertools.count()) |
| |
| # Change-ID of a known open change in public gerrit. |
| GERRIT_OPEN_CHANGEID = "8366" |
| GERRIT_MERGED_CHANGEID = "3" |
| GERRIT_ABANDONED_CHANGEID = "2" |
| |
| FAKE_PATCH_JSON = { |
| "project": "tacos/chromite", |
| "branch": "main", |
| "id": "Iee5c89d929f1850d7d4e1a4ff5f21adda800025f", |
| "currentPatchSet": { |
| "number": "2", |
| "ref": gerrit.GetChangeRef(1112, 2), |
| "revision": "ff10979dd360e75ff21f5cf53b7f8647578785ef", |
| }, |
| "number": "1112", |
| "subject": "chromite commit", |
| "owner": { |
| "name": "Chromite main", |
| "email": "chromite@chromium.org", |
| }, |
| "url": "https://chromium-review.googlesource.com/1112", |
| "lastUpdated": 1311024529, |
| "sortKey": "00166e8700001052", |
| "open": True, |
| "status": "NEW", |
| } |
| |
| # List of labels as seen in the top level desc by normal CrOS devs. |
| FAKE_LABELS_JSON = { |
| "Code-Review": { |
| "default_value": 0, |
| "values": { |
| " 0": "No score", |
| "+1": "Looks good to me, but someone else must approve", |
| "+2": "Looks good to me, approved", |
| "-1": "I would prefer that you did not submit this", |
| "-2": "Do not submit", |
| }, |
| }, |
| "Commit-Queue": { |
| "default_value": 0, |
| "optional": True, |
| "values": { |
| " 0": "Not Ready", |
| "+1": "Try ready", |
| "+2": "Ready", |
| "+3": "Ignore this label", |
| }, |
| }, |
| "Verified": { |
| "default_value": 0, |
| "values": { |
| " 0": "No score", |
| "+1": "Verified", |
| "-1": "Fails", |
| }, |
| }, |
| } |
| |
| # List of label values as seen in the permitted_labels field. |
| # Note: The space before the 0 is intentional -- it's what gerrit returns. |
| FAKE_PERMITTED_LABELS_JSON = { |
| "Code-Review": ["-2", "-1", " 0", "+1", "+2"], |
| "Commit-Queue": [" 0", "+1", "+2"], |
| "Verified": ["-1", " 0", "+1"], |
| } |
| |
| # An account structure as seen in owners or reviewers lists. |
| FAKE_GERRIT_ACCT_JSON = { |
| "_account_id": 991919291, |
| "avatars": [ |
| {"height": 26, "url": "https://example.com/26/photo.jpg"}, |
| {"height": 32, "url": "https://example.com/32/photo.jpg"}, |
| {"height": 100, "url": "https://example.com/100/photo.jpg"}, |
| ], |
| "email": "happy-funky-duck@chromium.org", |
| "name": "Duckworth Duck", |
| } |
| |
| # A valid change json result as returned by gerrit. |
| FAKE_CHANGE_JSON = { |
| "_number": 8366, |
| "branch": "main", |
| "change_id": "I3a753d6bacfbe76e5675a6f2f7941fe520c095e5", |
| "created": "2016-09-14 23:02:46.000000000", |
| "current_revision": "be54f9935bedb157b078eefa26fc1885b8264da6", |
| "deletions": 1, |
| "hashtags": [], |
| "id": "example%2Frepo~main~I3a753d6bacfbe76e5675a6f2f7941fe520c095e5", |
| "insertions": 0, |
| "labels": FAKE_LABELS_JSON, |
| "mergeable": True, |
| "owner": FAKE_GERRIT_ACCT_JSON, |
| "permitted_labels": FAKE_PERMITTED_LABELS_JSON, |
| "project": "example/repo", |
| "removable_reviewers": [], |
| "reviewers": {}, |
| "revisions": { |
| "be54f9935bedb157b078eefa26fc1885b8264da6": { |
| "_number": 1, |
| "commit": { |
| "author": { |
| "date": "2016-09-14 22:54:07.000000000", |
| "email": "vapier@chromium.org", |
| "name": "Mike Frysinger", |
| "tz": -240, |
| }, |
| "committer": { |
| "date": "2016-09-14 23:02:03.000000000", |
| "email": "vapier@chromium.org", |
| "name": "Mike Frysinger", |
| "tz": -240, |
| }, |
| "message": ( |
| "my super great message\n\nChange-Id: " |
| "I3a753d6bacfbe76e5675a6f2f7941fe520c095e5\n" |
| ), |
| "parents": [ |
| { |
| "commit": "3d54362a9b010330bae2dde973fc5c3efc4e5f44", |
| "subject": "some parent commit", |
| }, |
| ], |
| "subject": "my super great message", |
| }, |
| "created": "2016-09-14 23:02:46.000000000", |
| "fetch": { |
| "http": { |
| "ref": "refs/changes/15/8366/1", |
| "url": "https://chromium.googlesource.com/example/repo", |
| }, |
| "repo": { |
| "ref": "refs/changes/15/8366/1", |
| "url": "example/repo", |
| }, |
| "rpc": { |
| "ref": "refs/changes/15/8366/1", |
| "url": "rpc://chromium/example/repo", |
| }, |
| "sso": { |
| "ref": "refs/changes/15/8366/1", |
| "url": "sso://chromium/example/repo", |
| }, |
| }, |
| "kind": "REWORK", |
| "ref": "refs/changes/15/8366/1", |
| "uploader": FAKE_GERRIT_ACCT_JSON, |
| }, |
| }, |
| "status": "NEW", |
| "subject": "my super great message", |
| "submit_type": "CHERRY_PICK", |
| "topic": "some-topic", |
| "updated": "2016-09-14 23:02:46.000000000", |
| } |
| |
| |
| class GitRepoPatchTestCase(cros_test_lib.TempDirTestCase): |
| """Helper TestCase class for writing test cases.""" |
| |
| # No mock bits are to be used in this class's tests. |
| # This needs to actually validate git output, and git behaviour, rather |
| # than test our assumptions about git's behaviour/output. |
| |
| patch_kls = cros_patch.GitRepoPatch |
| |
| COMMIT_TEMPLATE = """\ |
| commit abcdefgh |
| |
| Author: Fake person |
| Date: Tue Oct 99 |
| |
| I am the first commit. |
| |
| %(extra)s |
| |
| %(change-id)s |
| """ |
| |
| # Boolean controlling whether the target class natively knows its |
| # ChangeId; only GerritPatches do. |
| has_native_change_id = False |
| |
| DEFAULT_TRACKING = ( |
| "refs/remotes/%s/main" % config_lib.GetSiteParams().EXTERNAL_REMOTE |
| ) |
| |
| @staticmethod |
| def _CreateSourceRepo(tmp_path): |
| """Generate a new repo with a single commit.""" |
| bare_path = os.path.join(tmp_path, "bare.git") |
| checkout_path = os.path.join(tmp_path, "checkout") |
| cros_build_lib.run( |
| [ |
| "git", |
| "init", |
| "--separate-git-dir", |
| bare_path, |
| "--initial-branch", |
| "main", |
| checkout_path, |
| ], |
| cwd=tmp_path, |
| print_cmd=False, |
| capture_output=True, |
| ) |
| |
| # Nerf any hooks the OS might have installed on us as they aren't going |
| # to be useful to us, just slow things down. |
| shutil.rmtree(os.path.join(bare_path, "hooks")) |
| |
| # Add an initial commit then wipe the working tree. |
| cros_build_lib.run( |
| ["git", "commit", "--allow-empty", "-m", "initial commit"], |
| cwd=checkout_path, |
| print_cmd=False, |
| capture_output=True, |
| ) |
| |
| return bare_path |
| |
| @classmethod |
| def setUpClass(cls) -> None: |
| # Generate the same git tree once as it's a bit expensive. |
| cls._cache_root = tempfile.mkdtemp(prefix="chromite-patch-unittest") |
| cls._cache_bare = cls._CreateSourceRepo(cls._cache_root) |
| |
| @classmethod |
| def tearDownClass(cls) -> None: |
| shutil.rmtree(cls._cache_root) |
| |
| def setUp(self) -> None: |
| # Create an empty repo to work from. |
| self.source = self._cache_bare |
| self.default_cwd = os.path.join(self.tempdir, "unwritable") |
| # Disallow write so as to smoke out any invalid writes to cwd. |
| os.mkdir(self.default_cwd, 0o500) |
| os.chdir(self.default_cwd) |
| |
| def _MkPatch(self, source, sha1, ref="refs/heads/main", **kwargs): |
| # This arg is used by inherited versions of _MkPatch. Pop it to make |
| # this _MkPatch compatible with them. |
| site_params = config_lib.GetSiteParams() |
| kwargs.pop("suppress_branch", None) |
| return self.patch_kls( |
| source, |
| "chromiumos/chromite", |
| ref, |
| "%s/main" % site_params.EXTERNAL_REMOTE, |
| kwargs.pop("remote", site_params.EXTERNAL_REMOTE), |
| sha1=sha1, |
| **kwargs, |
| ) |
| |
| def _run(self, cmd, cwd=None, **kwargs): |
| # This is to match git.RunGit behavior. |
| kwargs.setdefault("print_cmd", False) |
| if "capture_output" not in kwargs: |
| kwargs.setdefault("stdout", True) |
| kwargs.setdefault("stderr", True) |
| kwargs.setdefault("encoding", "utf-8") |
| |
| # Note that cwd is intentionally set to a location the user can't write |
| # to; this flushes out any bad usage in the tests that would work by |
| # fluke of being invoked from w/in a git repo. |
| if cwd is None: |
| cwd = self.default_cwd |
| |
| return cros_build_lib.run(cmd, cwd=cwd, **kwargs).stdout.strip() |
| |
| def _GetSha1(self, cwd, refspec): |
| return self._run(["git", "rev-list", "-n1", refspec], cwd=cwd) |
| |
| def _MakeRepo(self, name, clone, remote=None, alternates=True): |
| path = os.path.join(self.tempdir, name) |
| cmd = ["git", "clone", clone, path] |
| if alternates: |
| cmd += ["--reference", clone] |
| if remote is None: |
| remote = config_lib.GetSiteParams().EXTERNAL_REMOTE |
| cmd += ["--origin", remote] |
| self._run(cmd) |
| # Nerf any hooks the OS might have installed on us as they aren't going |
| # to be useful to us, just slow things down. |
| shutil.rmtree(os.path.join(path, ".git", "hooks")) |
| return path |
| |
| def _MakeCommit(self, repo, commit=None): |
| if commit is None: |
| commit = "commit at %s" % (time.time(),) |
| self._run(["git", "commit", "-a", "-m", commit], repo) |
| return self._GetSha1(repo, "HEAD") |
| |
| def CommitFile(self, repo, filename, content, commit=None, **kwargs): |
| osutils.WriteFile(os.path.join(repo, filename), content) |
| self._run(["git", "add", filename], repo) |
| sha1 = self._MakeCommit(repo, commit=commit) |
| if not self.has_native_change_id: |
| kwargs.pop("ChangeId", None) |
| patch = self._MkPatch(repo, sha1, **kwargs) |
| self.assertEqual(patch.sha1, sha1) |
| return patch |
| |
| def _CommonGitSetup(self): |
| git1 = self._MakeRepo("git1", self.source) |
| git2 = os.path.join(self.tempdir, "git2") |
| # Avoid another git clone as it's a bit expensive. |
| shutil.copytree(git1, git2) |
| patch = self.CommitFile(git1, "monkeys", "foon") |
| return git1, git2, patch |
| |
| def MakeChangeId(self, how_many=1): |
| l = [cros_patch.MakeChangeId() for _ in range(how_many)] |
| if how_many == 1: |
| return l[0] |
| return l |
| |
| def CommitChangeIdFile( |
| self, |
| repo, |
| changeid=None, |
| extra=None, |
| filename="monkeys", |
| content="flinging", |
| raw_changeid_text=None, |
| **kwargs, |
| ): |
| template = self.COMMIT_TEMPLATE |
| if changeid is None: |
| changeid = self.MakeChangeId() |
| if raw_changeid_text is None: |
| raw_changeid_text = "Change-Id: %s" % (changeid,) |
| if extra is None: |
| extra = "" |
| commit = template % {"change-id": raw_changeid_text, "extra": extra} |
| |
| return self.CommitFile( |
| repo, filename, content, commit=commit, ChangeId=changeid, **kwargs |
| ) |
| |
| |
| # pylint: disable=protected-access |
| class TestGitRepoPatch(GitRepoPatchTestCase): |
| """Unittests for git patch related methods.""" |
| |
| def testGetDiffStatus(self) -> None: |
| git1, _, patch1 = self._CommonGitSetup() |
| # Ensure that it can work on the first commit, even if it |
| # doesn't report anything (no delta; it's the first files). |
| patch1 = self._MkPatch(git1, self._GetSha1(git1, self.DEFAULT_TRACKING)) |
| self.assertEqual({}, patch1.GetDiffStatus(git1)) |
| patch2 = self.CommitFile(git1, "monkeys", "blah") |
| self.assertEqual({"monkeys": "M"}, patch2.GetDiffStatus(git1)) |
| git.RunGit(git1, ["mv", "monkeys", "monkeys2"]) |
| patch3 = self._MkPatch(git1, self._MakeCommit(git1, commit="mv")) |
| self.assertEqual( |
| {"monkeys": "D", "monkeys2": "A"}, patch3.GetDiffStatus(git1) |
| ) |
| patch4 = self.CommitFile(git1, "monkey2", "blah") |
| self.assertEqual({"monkey2": "A"}, patch4.GetDiffStatus(git1)) |
| |
| def testFetch(self) -> None: |
| _, git2, patch = self._CommonGitSetup() |
| patch.Fetch(git2) |
| self.assertEqual(patch.sha1, self._GetSha1(git2, "FETCH_HEAD")) |
| # Verify reuse; specifically that Fetch doesn't actually run since |
| # the rev is already available locally via alternates. |
| patch.project_url = "/dev/null" |
| git3 = self._MakeRepo("git3", git2) |
| patch.Fetch(git3) |
| self.assertEqual(patch.sha1, self._GetSha1(git3, patch.sha1)) |
| |
| def testFetchFirstPatchInSeries(self) -> None: |
| git1, git2, patch = self._CommonGitSetup() |
| self.CommitFile(git1, "monkeys", "foon2") |
| patch.Fetch(git2) |
| |
| def testFetchWithoutSha1(self) -> None: |
| git1, git2, _ = self._CommonGitSetup() |
| patch2 = self.CommitFile(git1, "monkeys", "foon2") |
| sha1, patch2.sha1 = patch2.sha1, None |
| patch2.Fetch(git2) |
| self.assertEqual(sha1, patch2.sha1) |
| |
| def testParentless(self) -> None: |
| git1 = self._MakeRepo("git1", self.source) |
| patch1 = self._MkPatch(git1, self._GetSha1(git1, "HEAD")) |
| self.assertRaises2( |
| cros_patch.PatchNoParents, |
| patch1.Apply, |
| git1, |
| self.DEFAULT_TRACKING, |
| check_attrs={"inflight": False}, |
| ) |
| |
| def testNoParentOrAlreadyApplied(self) -> None: |
| git1 = self._MakeRepo("git1", self.source) |
| patch1 = self._MkPatch(git1, self._GetSha1(git1, "HEAD")) |
| self.assertRaises2( |
| cros_patch.PatchNoParents, |
| patch1.Apply, |
| git1, |
| self.DEFAULT_TRACKING, |
| check_attrs={"inflight": False}, |
| ) |
| patch2 = self.CommitFile(git1, "monkeys", "rule") |
| self.assertRaises2( |
| cros_patch.PatchIsEmpty, |
| patch2.Apply, |
| git1, |
| self.DEFAULT_TRACKING, |
| check_attrs={"inflight": True}, |
| ) |
| |
| def testGetNoParents(self) -> None: |
| git1 = self._MakeRepo("git1", self.source) |
| sha1 = self._GetSha1(git1, "HEAD") |
| patch = self._MkPatch(self.source, sha1) |
| self.assertEqual(patch._GetParents(git1), []) |
| |
| def testGet1Parent(self) -> None: |
| git1 = self._MakeRepo("git1", self.source) |
| patch1 = self.CommitFile(git1, "foo", "foo") |
| patch2 = self.CommitFile(git1, "bar", "bar") |
| self.assertEqual(patch2._GetParents(git1), [patch1.sha1]) |
| |
| def testGet2Parents(self) -> None: |
| # Prepare a merge commit, then test that its two parents are correctly |
| # calculated. |
| git1 = self._MakeRepo("git1", self.source) |
| patch_common = self.CommitFile(git1, "foo", "foo") |
| |
| patch_right = self.CommitFile(git1, "bar", "bar") |
| |
| git.RunGit(git1, ["reset", "--hard", patch_common.sha1]) |
| patch_left = self.CommitFile(git1, "baz", "baz") |
| |
| git.RunGit(git1, ["merge", patch_right.sha1]) |
| sha1 = self._GetSha1(git1, "HEAD") |
| patch_merge = self._MkPatch(self.source, sha1, suppress_branch=True) |
| |
| self.assertEqual( |
| patch_merge._GetParents(git1), [patch_left.sha1, patch_right.sha1] |
| ) |
| |
| def testIsAncestor(self) -> None: |
| git1 = self._MakeRepo("git1", self.source) |
| patch1 = self.CommitFile(git1, "foo", "foo") |
| patch2 = self.CommitFile(git1, "bar", "bar") |
| self.assertTrue(patch1._IsAncestorOf(git1, patch1)) |
| self.assertTrue(patch1._IsAncestorOf(git1, patch2)) |
| self.assertFalse(patch2._IsAncestorOf(git1, patch1)) |
| |
| def testFromSha1(self) -> None: |
| git1 = self._MakeRepo("git1", self.source) |
| patch1 = self.CommitFile(git1, "foo", "foo") |
| patch2 = self.CommitFile(git1, "bar", "bar") |
| patch2_from_sha1 = patch1._FromSha1(patch2.sha1) |
| patch2_from_sha1.Fetch(git1) |
| patch2.Fetch(git1) |
| self.assertEqual(patch2.tree_hash, patch2_from_sha1.tree_hash) |
| |
| def testValidateMerge(self) -> None: |
| git1 = self._MakeRepo("git1", self.source) |
| |
| # Prepare history like this: |
| # * E (upstream) |
| # * | D (merge being handled) |
| # |\| |
| # * | C |
| # | * B |
| # |/ |
| # * A |
| # |
| # D is valid to merge into E. |
| A = self.CommitFile(git1, "A", "A") |
| B = self.CommitFile(git1, "B", "B") |
| E = self.CommitFile(git1, "E", "E") |
| git.RunGit(git1, ["reset", "--hard", A.sha1]) |
| C = self.CommitFile(git1, "C", "C") |
| git.RunGit(git1, ["merge", B.sha1]) |
| sha1 = self._GetSha1(git1, "HEAD") |
| D = self._MkPatch(self.source, sha1, suppress_branch=True) |
| |
| D._ValidateMergeCommit(git1, E.sha1, [C.sha1, B.sha1]) |
| |
| def testValidateMergeFailure(self) -> None: |
| git1 = self._MakeRepo("git1", self.source) |
| # * F (merge being handled) |
| # |\ |
| # | * E |
| # * | D |
| # | | * C (upstream) |
| # | |/ |
| # | * B |
| # |/ |
| # * A |
| # |
| # F is not valid to merge into E. |
| A = self.CommitFile(git1, "A", "A") |
| B = self.CommitFile(git1, "B", "B") |
| C = self.CommitFile(git1, "C", "C") |
| git.RunGit(git1, ["reset", "--hard", B.sha1]) |
| E = self.CommitFile(git1, "E", "E") |
| git.RunGit(git1, ["reset", "--hard", A.sha1]) |
| D = self.CommitFile(git1, "D", "D") |
| git.RunGit(git1, ["merge", E.sha1]) |
| sha1 = self._GetSha1(git1, "HEAD") |
| F = self._MkPatch(self.source, sha1, suppress_branch=True) |
| |
| with self.assertRaises(cros_patch.NonMainlineMerge): |
| F._ValidateMergeCommit(git1, C.sha1, [D.sha1, E.sha1]) |
| |
| def testDeleteEbuildTwice(self) -> None: |
| """Test that double-deletes of ebuilds are flagged as conflicts.""" |
| # Create monkeys.ebuild for testing. |
| git1 = self._MakeRepo("git1", self.source) |
| patch1 = self.CommitFile(git1, "monkeys.ebuild", "rule") |
| git.RunGit(git1, ["rm", "monkeys.ebuild"]) |
| patch2 = self._MkPatch(git1, self._MakeCommit(git1, commit="rm")) |
| |
| # Delete an ebuild that does not exist in TOT. |
| check_attrs = {"inflight": False, "files": ("monkeys.ebuild",)} |
| self.assertRaises2( |
| cros_patch.EbuildConflict, |
| patch2.Apply, |
| git1, |
| self.DEFAULT_TRACKING, |
| check_attrs=check_attrs, |
| ) |
| |
| # Delete an ebuild that exists in TOT, but does not exist in the current |
| # patch series. |
| check_attrs["inflight"] = True |
| self.assertRaises2( |
| cros_patch.EbuildConflict, |
| patch2.Apply, |
| git1, |
| patch1.sha1, |
| check_attrs=check_attrs, |
| ) |
| |
| def testCleanlyApply(self) -> None: |
| _, git2, patch = self._CommonGitSetup() |
| # Clone git3 before we modify git2; else we'll just wind up |
| # cloning its default branch. |
| git3 = self._MakeRepo("git3", git2) |
| patch.Apply(git2, self.DEFAULT_TRACKING) |
| # Verify reuse; specifically that Fetch doesn't actually run since |
| # the object is available in alternates. testFetch partially |
| # validates this; the Apply usage here fully validates it via |
| # ensuring that the attempted Apply goes boom if it can't get the |
| # required sha1. |
| patch.project_url = "/dev/null" |
| patch.Apply(git3, self.DEFAULT_TRACKING) |
| |
| def testFailsApply(self) -> None: |
| _, git2, patch1 = self._CommonGitSetup() |
| patch2 = self.CommitFile(git2, "monkeys", "not foon") |
| # Note that Apply creates its own branch, resetting to the default, |
| # thus we have to re-apply (even if it looks wrong, it's right). |
| patch2.Apply(git2, self.DEFAULT_TRACKING) |
| self.assertRaises2( |
| cros_patch.ApplyPatchException, |
| patch1.Apply, |
| git2, |
| self.DEFAULT_TRACKING, |
| exact_kls=True, |
| check_attrs={"inflight": True}, |
| ) |
| |
| def testTrivial(self) -> None: |
| _, git2, patch1 = self._CommonGitSetup() |
| # Throw in a bunch of newlines so that content-merging would work. |
| content = "not foon%s" % ("\n" * 100) |
| patch1 = self._MkPatch(git2, self._GetSha1(git2, "HEAD")) |
| patch1 = self.CommitFile(git2, "monkeys", content) |
| git.RunGit(git2, ["update-ref", self.DEFAULT_TRACKING, patch1.sha1]) |
| patch2 = self.CommitFile(git2, "monkeys", "%sblah" % content) |
| patch3 = self.CommitFile(git2, "monkeys", "%sblahblah" % content) |
| # Get us a back to the basic, then derive from there; this is used to |
| # verify that even if content merging works, trivial is flagged. |
| self.CommitFile(git2, "monkeys", "foon") |
| patch4 = self.CommitFile(git2, "monkeys", content) |
| patch5 = self.CommitFile(git2, "monkeys", "%sfoon" % content) |
| # Reset so we derive the next changes from patch1. |
| git.RunGit(git2, ["reset", "--hard", patch1.sha1]) |
| patch6 = self.CommitFile(git2, "blah", "some-other-file") |
| self.CommitFile( |
| git2, "monkeys", "%sblah" % content.replace("not", "bot") |
| ) |
| |
| self.assertRaises2( |
| cros_patch.PatchIsEmpty, |
| patch1.Apply, |
| git2, |
| self.DEFAULT_TRACKING, |
| trivial=True, |
| check_attrs={"inflight": False, "trivial": False}, |
| ) |
| |
| # Now test conflicts since we're still at ToT; note that this is an |
| # actual conflict because the fuzz anchors have changed. |
| self.assertRaises2( |
| cros_patch.ApplyPatchException, |
| patch3.Apply, |
| git2, |
| self.DEFAULT_TRACKING, |
| trivial=True, |
| check_attrs={"inflight": False, "trivial": False}, |
| exact_kls=True, |
| ) |
| |
| # Now test trivial conflict; this would've merged fine were it not for |
| # trivial. |
| self.assertRaises2( |
| cros_patch.PatchIsEmpty, |
| patch4.Apply, |
| git2, |
| self.DEFAULT_TRACKING, |
| trivial=True, |
| check_attrs={"inflight": False, "trivial": False}, |
| exact_kls=True, |
| ) |
| |
| # Move us into inflight testing. |
| patch2.Apply(git2, self.DEFAULT_TRACKING, trivial=True) |
| |
| # Repeat the tests from above; should still be the same. |
| self.assertRaises2( |
| cros_patch.PatchIsEmpty, |
| patch4.Apply, |
| git2, |
| self.DEFAULT_TRACKING, |
| trivial=True, |
| check_attrs={"inflight": False, "trivial": False}, |
| ) |
| |
| # Actual conflict merge conflict due to inflight; non trivial induced. |
| self.assertRaises2( |
| cros_patch.ApplyPatchException, |
| patch5.Apply, |
| git2, |
| self.DEFAULT_TRACKING, |
| trivial=True, |
| check_attrs={"inflight": True, "trivial": False}, |
| exact_kls=True, |
| ) |
| |
| self.assertRaises2( |
| cros_patch.PatchIsEmpty, |
| patch1.Apply, |
| git2, |
| self.DEFAULT_TRACKING, |
| trivial=True, |
| check_attrs={"inflight": False}, |
| ) |
| |
| self.assertRaises2( |
| cros_patch.ApplyPatchException, |
| patch5.Apply, |
| git2, |
| self.DEFAULT_TRACKING, |
| trivial=True, |
| check_attrs={"inflight": True, "trivial": False}, |
| exact_kls=True, |
| ) |
| |
| # And this should apply without issue, despite the differing history. |
| patch6.Apply(git2, self.DEFAULT_TRACKING, trivial=True) |
| |
| def _assertLookupAliases(self, remote) -> None: |
| git1 = self._MakeRepo("git1", self.source) |
| patch = self.CommitChangeIdFile(git1, remote=remote) |
| prefix = "chrome-internal:" if patch.internal else "chromium:" |
| vals = [ |
| patch.sha1, |
| getattr(patch, "gerrit_number", None), |
| getattr(patch, "original_sha1", None), |
| ] |
| # Append full Change-ID if it exists. |
| if patch.project and patch.tracking_branch and patch.change_id: |
| vals.append( |
| "%s~%s~%s" |
| % (patch.project, patch.tracking_branch, patch.change_id) |
| ) |
| vals = [x for x in vals if x is not None] |
| self.assertEqual( |
| set(prefix + x for x in vals), set(patch.LookupAliases()) |
| ) |
| |
| def testExternalLookupAliases(self) -> None: |
| self._assertLookupAliases(config_lib.GetSiteParams().EXTERNAL_REMOTE) |
| |
| def testInternalLookupAliases(self) -> None: |
| self._assertLookupAliases(config_lib.GetSiteParams().INTERNAL_REMOTE) |
| |
| def testChangeIdMetadata(self) -> None: |
| """Verify Change-Id is set in git metadata.""" |
| git1, git2, _ = self._CommonGitSetup() |
| changeid = "I%s" % ("1".rjust(40, "0")) |
| patch = self.CommitChangeIdFile( |
| git1, changeid=changeid, change_id=changeid, raw_changeid_text="" |
| ) |
| patch.change_id = changeid |
| patch.Fetch(git1) |
| self.assertIn("Change-Id: %s\n" % changeid, patch.commit_message) |
| patch = self.CommitChangeIdFile( |
| git2, changeid=changeid, change_id=changeid |
| ) |
| patch.Fetch(git2) |
| self.assertEqual(patch.change_id, changeid) |
| self.assertIn("Change-Id: %s\n" % changeid, patch.commit_message) |
| |
| |
| class TestGerritFetchOnlyPatch(cros_test_lib.MockTestCase): |
| """Test of GerritFetchOnlyPatch.""" |
| |
| def testFromAttrDict(self) -> None: |
| """Test whether FromAttrDict can handle with commit message.""" |
| attr_dict_without_msg = { |
| cros_patch.ATTR_PROJECT_URL: "https://host/chromite/tacos", |
| cros_patch.ATTR_PROJECT: "chromite/tacos", |
| cros_patch.ATTR_REF: "refs/changes/11/12345/4", |
| cros_patch.ATTR_BRANCH: "main", |
| cros_patch.ATTR_REMOTE: "cros-internal", |
| cros_patch.ATTR_COMMIT: "7181e4b5e182b6f7d68461b04253de095bad74f9", |
| cros_patch.ATTR_CHANGE_ID: ( |
| "I47ea30385af60ae4cc2acc5d1a283a46423bc6e1" |
| ), |
| cros_patch.ATTR_GERRIT_NUMBER: "12345", |
| cros_patch.ATTR_PATCH_NUMBER: "4", |
| cros_patch.ATTR_OWNER_EMAIL: "foo@chromium.org", |
| cros_patch.ATTR_FAIL_COUNT: 1, |
| cros_patch.ATTR_PASS_COUNT: 1, |
| cros_patch.ATTR_TOTAL_FAIL_COUNT: 3, |
| } |
| |
| attr_dict_with_msg = { |
| cros_patch.ATTR_PROJECT_URL: "https://host/chromite/tacos", |
| cros_patch.ATTR_PROJECT: "chromite/tacos", |
| cros_patch.ATTR_REF: "refs/changes/11/12345/4", |
| cros_patch.ATTR_BRANCH: "main", |
| cros_patch.ATTR_REMOTE: "cros-internal", |
| cros_patch.ATTR_COMMIT: "7181e4b5e182b6f7d68461b04253de095bad74f9", |
| cros_patch.ATTR_CHANGE_ID: ( |
| "I47ea30385af60ae4cc2acc5d1a283a46423bc6e1" |
| ), |
| cros_patch.ATTR_GERRIT_NUMBER: "12345", |
| cros_patch.ATTR_PATCH_NUMBER: "4", |
| cros_patch.ATTR_OWNER_EMAIL: "foo@chromium.org", |
| cros_patch.ATTR_FAIL_COUNT: 1, |
| cros_patch.ATTR_PASS_COUNT: 1, |
| cros_patch.ATTR_TOTAL_FAIL_COUNT: 3, |
| cros_patch.ATTR_COMMIT_MESSAGE: "commit message", |
| } |
| |
| self.PatchObject( |
| cros_patch.GitRepoPatch, |
| "_AddFooters", |
| return_value="commit message", |
| ) |
| |
| result_1 = cros_patch.GerritFetchOnlyPatch.FromAttrDict( |
| attr_dict_without_msg |
| ).commit_message |
| result_2 = cros_patch.GerritFetchOnlyPatch.FromAttrDict( |
| attr_dict_with_msg |
| ).commit_message |
| self.assertEqual(None, result_1) |
| self.assertEqual("commit message", result_2) |
| |
| def testGetAttributeDict(self) -> None: |
| """Test Whether GetAttributeDict can get the commit message properly.""" |
| change = cros_patch.GerritFetchOnlyPatch( |
| "https://host/chromite/tacos", |
| "chromite/tacos", |
| "refs/changes/11/12345/4", |
| "main", |
| "cros-internal", |
| "7181e4b5e182b6f7d68461b04253de095bad74f9", |
| "I47ea30385af60ae4cc2acc5d1a283a46423bc6e1", |
| "12345", |
| "4", |
| "foo@chromium.org", |
| 1, |
| 1, |
| 3, |
| ) |
| |
| expected = { |
| cros_patch.ATTR_PROJECT_URL: "https://host/chromite/tacos", |
| cros_patch.ATTR_PROJECT: "chromite/tacos", |
| cros_patch.ATTR_REF: "refs/changes/11/12345/4", |
| cros_patch.ATTR_BRANCH: "main", |
| cros_patch.ATTR_REMOTE: "cros-internal", |
| cros_patch.ATTR_COMMIT: "7181e4b5e182b6f7d68461b04253de095bad74f9", |
| cros_patch.ATTR_CHANGE_ID: ( |
| "I47ea30385af60ae4cc2acc5d1a283a46423bc6e1" |
| ), |
| cros_patch.ATTR_GERRIT_NUMBER: "12345", |
| cros_patch.ATTR_PATCH_NUMBER: "4", |
| cros_patch.ATTR_OWNER_EMAIL: "foo@chromium.org", |
| cros_patch.ATTR_FAIL_COUNT: "1", |
| cros_patch.ATTR_PASS_COUNT: "1", |
| cros_patch.ATTR_TOTAL_FAIL_COUNT: "3", |
| cros_patch.ATTR_COMMIT_MESSAGE: None, |
| } |
| self.assertEqual(change.GetAttributeDict(), expected) |
| |
| self.PatchObject( |
| cros_patch.GitRepoPatch, |
| "_AddFooters", |
| return_value="commit message", |
| ) |
| change.commit_message = "commit message" |
| expected[cros_patch.ATTR_COMMIT_MESSAGE] = "commit message" |
| self.assertEqual(change.GetAttributeDict(), expected) |
| |
| |
| class TestGetOptionLinesFromCommitMessage(cros_test_lib.TestCase): |
| """Tests of GetOptionFromCommitMessage.""" |
| |
| _M1 = """jabberwocky: by Lewis Carroll |
| |
| 'Twas brillig, and the slithy toves |
| did gyre and gimble in the wabe. |
| """ |
| |
| _M2 = """jabberwocky: by Lewis Carroll |
| |
| All mimsy were the borogroves, |
| And the mome wraths outgrabe. |
| jabberwocky: Charles Lutwidge Dodgson |
| """ |
| |
| _M3 = """jabberwocky: by Lewis Carroll |
| |
| He took his vorpal sword in hand: |
| Long time the manxome foe he sought |
| jabberwocky: |
| """ |
| |
| _M4 = """the poem continues... |
| |
| jabberwocky: O frabjuous day! |
| jabberwocky: Calloh! Callay! |
| """ |
| |
| def testNoMessage(self) -> None: |
| o = cros_patch.GetOptionLinesFromCommitMessage("", "jabberwocky:") |
| self.assertEqual(None, o) |
| |
| def testNoOption(self) -> None: |
| o = cros_patch.GetOptionLinesFromCommitMessage(self._M1, "jabberwocky:") |
| self.assertEqual(None, o) |
| |
| def testYesOption(self) -> None: |
| o = cros_patch.GetOptionLinesFromCommitMessage(self._M2, "jabberwocky:") |
| self.assertEqual(["Charles Lutwidge Dodgson"], o) |
| |
| def testEmptyOption(self) -> None: |
| o = cros_patch.GetOptionLinesFromCommitMessage(self._M3, "jabberwocky:") |
| self.assertEqual([], o) |
| |
| def testMultiOption(self) -> None: |
| o = cros_patch.GetOptionLinesFromCommitMessage(self._M4, "jabberwocky:") |
| self.assertEqual(["O frabjuous day!", "Calloh! Callay!"], o) |
| |
| |
| class TestApplyAgainstManifest( |
| GitRepoPatchTestCase, cros_test_lib.MockTestCase |
| ): |
| """Test applying a patch against a manifest""" |
| |
| MANIFEST_TEMPLATE = """\ |
| <?xml version="1.0" encoding="UTF-8"?> |
| <manifest> |
| <remote name="cros" /> |
| <default revision="refs/heads/main" remote="cros" /> |
| %(projects)s |
| </manifest> |
| """ |
| |
| def _CommonRepoSetup(self, *projects): |
| basedir = self.tempdir |
| repodir = os.path.join(basedir, ".repo") |
| manifest_file = os.path.join(repodir, "manifest.xml") |
| proj_pieces = [] |
| for project in projects: |
| proj_pieces.append("<project") |
| for key, val in project.items(): |
| if key == "path": |
| val = os.path.relpath( |
| os.path.realpath(val), os.path.realpath(self.tempdir) |
| ) |
| proj_pieces.append(' %s="%s"' % (key, val)) |
| proj_pieces.append(" />\n ") |
| proj_str = "".join(proj_pieces) |
| content = self.MANIFEST_TEMPLATE % {"projects": proj_str} |
| os.mkdir(repodir) |
| osutils.WriteFile(manifest_file, content) |
| return basedir |
| |
| def testApplyAgainstManifest(self) -> None: |
| git1, git2, _ = self._CommonGitSetup() |
| |
| readme_text = "Stub README text." |
| readme1 = self.CommitFile(git1, "README", readme_text) |
| readme_text += " Even more stub README text." |
| readme2 = self.CommitFile(git1, "README", readme_text) |
| readme_text += " Even more README text." |
| readme3 = self.CommitFile(git1, "README", readme_text) |
| |
| git1_proj = { |
| "path": git1, |
| "name": "chromiumos/chromite", |
| "revision": str(readme1.sha1), |
| "upstream": "refs/heads/main", |
| } |
| git2_proj = { |
| "path": git2, |
| "name": "git2", |
| } |
| basedir = self._CommonRepoSetup(git1_proj, git2_proj) |
| |
| self.PatchObject( |
| git.ManifestCheckout, "_GetManifestsBranch", return_value=None |
| ) |
| manifest = git.ManifestCheckout(basedir) |
| |
| readme2.ApplyAgainstManifest(manifest) |
| readme3.ApplyAgainstManifest(manifest) |
| |
| # Verify that both readme2 and readme3 are on the patch branch. |
| cmd = [ |
| "git", |
| "log", |
| "--format=%T", |
| "%s..%s" % (readme1.sha1, constants.PATCH_BRANCH), |
| ] |
| trees = self._run(cmd, git1).splitlines() |
| self.assertEqual( |
| trees, [str(readme3.tree_hash), str(readme2.tree_hash)] |
| ) |
| |
| |
| class TestLocalPatchGit(GitRepoPatchTestCase): |
| """Test Local patch handling.""" |
| |
| patch_kls = cros_patch.LocalPatch |
| |
| def setUp(self) -> None: |
| self.sourceroot = os.path.join(self.tempdir, "sourceroot") |
| |
| def _MkPatch(self, source, sha1, ref="refs/heads/main", **kwargs): |
| remote = kwargs.pop( |
| "remote", config_lib.GetSiteParams().EXTERNAL_REMOTE |
| ) |
| return self.patch_kls( |
| source, |
| "chromiumos/chromite", |
| ref, |
| "%s/main" % remote, |
| remote, |
| sha1, |
| **kwargs, |
| ) |
| |
| def testUpload(self) -> None: |
| def ProjectDirMock(_sourceroot): |
| return git1 |
| |
| git1, git2, patch = self._CommonGitSetup() |
| |
| git2_sha1 = self._GetSha1(git2, "HEAD") |
| |
| patch.ProjectDir = ProjectDirMock |
| # First suppress carbon copy behaviour so we verify pushing plain works. |
| sha1 = patch.sha1 |
| patch._GetCarbonCopy = lambda: sha1 # pylint: disable=protected-access |
| patch.Upload(git2, "refs/testing/test1") |
| self.assertEqual(self._GetSha1(git2, "refs/testing/test1"), patch.sha1) |
| |
| # Enable CarbonCopy behaviour; verify it lands a different |
| # sha1. Additionally verify it didn't corrupt the patch's sha1 locally. |
| del patch._GetCarbonCopy |
| patch.Upload(git2, "refs/testing/test2") |
| self.assertNotEqual( |
| self._GetSha1(git2, "refs/testing/test2"), patch.sha1 |
| ) |
| self.assertEqual(patch.sha1, sha1) |
| # Ensure the carbon creation didn't damage the target repo. |
| self.assertEqual(self._GetSha1(git1, "HEAD"), sha1) |
| |
| # Ensure we didn't damage the target repo's state at all. |
| self.assertEqual(git2_sha1, self._GetSha1(git2, "HEAD")) |
| # Ensure the content is the same. |
| base = ["git", "show"] |
| self.assertEqual( |
| self._run(base + ["refs/testing/test1:monkeys"], git2), |
| self._run(base + ["refs/testing/test2:monkeys"], git2), |
| ) |
| base = ["git", "log", "--format=%B", "-n1"] |
| self.assertEqual( |
| self._run(base + ["refs/testing/test1"], git2), |
| self._run(base + ["refs/testing/test2"], git2), |
| ) |
| |
| |
| class UploadedLocalPatchTestCase(GitRepoPatchTestCase): |
| """Test uploading of local git patches.""" |
| |
| PROJECT = "chromiumos/chromite" |
| ORIGINAL_BRANCH = "original_branch" |
| ORIGINAL_SHA1 = "ffffffff".ljust(40, "0") |
| |
| patch_kls = cros_patch.UploadedLocalPatch |
| |
| def _MkPatch(self, source, sha1, ref="refs/heads/main", **kwargs): |
| site_params = config_lib.GetSiteParams() |
| return self.patch_kls( |
| source, |
| self.PROJECT, |
| ref, |
| "%s/main" % site_params.EXTERNAL_REMOTE, |
| self.ORIGINAL_BRANCH, |
| kwargs.pop("original_sha1", self.ORIGINAL_SHA1), |
| kwargs.pop("remote", site_params.EXTERNAL_REMOTE), |
| carbon_copy_sha1=sha1, |
| **kwargs, |
| ) |
| |
| |
| class TestUploadedLocalPatch(UploadedLocalPatchTestCase): |
| """Test uploading of local git patches.""" |
| |
| def testStringRepresentation(self) -> None: |
| _, _, patch = self._CommonGitSetup() |
| str_rep = str(patch).split(":") |
| for element in [ |
| self.PROJECT, |
| self.ORIGINAL_BRANCH, |
| self.ORIGINAL_SHA1[:8], |
| ]: |
| self.assertTrue( |
| element in str_rep, |
| msg="Couldn't find %s in %s" % (element, str_rep), |
| ) |
| |
| |
| class TestGerritPatch(TestGitRepoPatch): |
| """Test Gerrit patch handling.""" |
| |
| has_native_change_id = True |
| |
| class patch_kls(cros_patch.GerritPatch): |
| """Test helper class to suppress pointing to actual gerrit.""" |
| |
| # Suppress the behaviour pointing the project url at actual gerrit, |
| # instead slaving it back to a local repo for tests. |
| def __init__(self, *args, **kwargs) -> None: |
| cros_patch.GerritPatch.__init__(self, *args, **kwargs) |
| assert hasattr(self, "patch_dict") |
| self.project_url = self.patch_dict["_unittest_url_bypass"] |
| |
| @property |
| def test_json(self): |
| return copy.deepcopy(FAKE_PATCH_JSON) |
| |
| def _MkPatch(self, source, sha1, ref="refs/heads/main", **kwargs): |
| site_params = config_lib.GetSiteParams() |
| json = self.test_json |
| remote = kwargs.pop("remote", site_params.EXTERNAL_REMOTE) |
| url_prefix = kwargs.pop("url_prefix", site_params.EXTERNAL_GERRIT_URL) |
| suppress_branch = kwargs.pop("suppress_branch", False) |
| change_id = kwargs.pop("ChangeId", None) |
| if change_id is None: |
| change_id = self.MakeChangeId() |
| json.update(kwargs) |
| change_num, patch_num = _GetNumber(), _GetNumber() |
| # Note we intentionally use a gerrit like refspec here; we want to |
| # ensure that none of our common code pathways puke on a non head/tag. |
| refspec = gerrit.GetChangeRef(change_num + 1000, patch_num) |
| json["currentPatchSet"].update( |
| dict(number=patch_num, ref=refspec, revision=sha1) |
| ) |
| json["branch"] = os.path.basename(ref) |
| json["_unittest_url_bypass"] = source |
| json["id"] = change_id |
| |
| obj = self.patch_kls(json.copy(), remote, url_prefix) |
| self.assertEqual(obj.patch_dict, json) |
| self.assertEqual(obj.remote, remote) |
| self.assertEqual(obj.url_prefix, url_prefix) |
| self.assertEqual(obj.project, json["project"]) |
| self.assertEqual(obj.ref, refspec) |
| self.assertEqual(obj.change_id, change_id) |
| self.assertEqual( |
| obj.id, |
| "%s%s~%s~%s" |
| % ( |
| site_params.CHANGE_PREFIX[remote], |
| json["project"], |
| json["branch"], |
| change_id, |
| ), |
| ) |
| |
| # Now make the fetching actually work, if desired. |
| if not suppress_branch: |
| # Note that a push is needed here, rather than a branch; branch |
| # will just make it under refs/heads, we want it literally in |
| # refs/changes/ |
| self._run( |
| ["git", "push", source, "%s:%s" % (sha1, refspec)], source |
| ) |
| return obj |
| |
| def testApprovalTimestamp(self) -> None: |
| """Test that the approval timestamp is correctly extracted from JSON.""" |
| repo = self._MakeRepo("git", self.source) |
| for approvals, expected in [ |
| (None, 0), |
| ([], 0), |
| ([1], 1), |
| ([1, 3, 2], 3), |
| ]: |
| currentPatchSet = copy.deepcopy(FAKE_PATCH_JSON["currentPatchSet"]) |
| if approvals is not None: |
| currentPatchSet["approvals"] = [ |
| {"grantedOn": x} for x in approvals |
| ] |
| patch = self._MkPatch( |
| repo, |
| self._GetSha1(repo, self.DEFAULT_TRACKING), |
| currentPatchSet=currentPatchSet, |
| ) |
| msg = "Expected %r, but got %r (approvals=%r)" % ( |
| expected, |
| patch.approval_timestamp, |
| approvals, |
| ) |
| self.assertEqual(patch.approval_timestamp, expected, msg) |
| |
| def _assertGerritDependencies(self, remote=None) -> None: |
| if remote is None: |
| remote = config_lib.GetSiteParams().EXTERNAL_REMOTE |
| |
| convert = str |
| if remote == config_lib.GetSiteParams().INTERNAL_REMOTE: |
| convert = lambda val: "chrome-internal:%s" % (val,) |
| elif remote == config_lib.GetSiteParams().EXTERNAL_REMOTE: |
| convert = lambda val: "chromium:%s" % (val,) |
| git1 = self._MakeRepo("git1", self.source, remote=remote) |
| patch = self._MkPatch(git1, self._GetSha1(git1, "HEAD"), remote=remote) |
| cid1, cid2 = "1", "2" |
| |
| # Test cases with no dependencies, 1 dependency, and 2 dependencies. |
| self.assertEqual(patch.GerritDependencies(), []) |
| patch.patch_dict["dependsOn"] = [{"number": cid1}] |
| self.assertEqual( |
| [ |
| cros_patch.AddPrefix(x, x.gerrit_number) |
| for x in patch.GerritDependencies() |
| ], |
| [convert(cid1)], |
| ) |
| patch.patch_dict["dependsOn"].append({"number": cid2}) |
| self.assertEqual( |
| [ |
| cros_patch.AddPrefix(x, x.gerrit_number) |
| for x in patch.GerritDependencies() |
| ], |
| [convert(cid1), convert(cid2)], |
| ) |
| |
| def testExternalGerritDependencies(self) -> None: |
| self._assertGerritDependencies() |
| |
| def testInternalGerritDependencies(self) -> None: |
| self._assertGerritDependencies( |
| config_lib.GetSiteParams().INTERNAL_REMOTE |
| ) |
| |
| def testReviewedOnMetadata(self) -> None: |
| """Verify Change-Id and Reviewed-On are set in git metadata.""" |
| git1, _, patch = self._CommonGitSetup() |
| patch.Apply(git1, self.DEFAULT_TRACKING) |
| reviewed_on = "/".join( |
| [ |
| config_lib.GetSiteParams().EXTERNAL_GERRIT_URL, |
| patch.gerrit_number, |
| ] |
| ) |
| self.assertIn("Reviewed-on: %s\n" % reviewed_on, patch.commit_message) |
| |
| def _MakeFooters(self): |
| return ( |
| (), |
| (("Footer-1", "foo"),), |
| (("Change-id", "42"),), |
| (("Footer-1", "foo"), ("Change-id", "42")), |
| ) |
| |
| def _MakeCommitMessages(self): |
| headers = ( |
| "A standard commit message header", |
| "", |
| "Footer-1: foo", |
| "Change-id: 42", |
| ) |
| |
| bodies = ("", "\n", "Lots of comments\n about the commit\n" * 100) |
| |
| for header, body, preexisting in itertools.product( |
| headers, bodies, self._MakeFooters() |
| ): |
| yield "\n".join( |
| ( |
| header, |
| body, |
| "\n".join("%s: %s" for tag, ident in preexisting), |
| ) |
| ) |
| |
| def testAddFooters(self) -> None: |
| repo = self._MakeRepo("git", self.source) |
| patch = self._MkPatch(repo, self._GetSha1(repo, "HEAD")) |
| approval = {"type": "VRIF", "value": "1", "grantedOn": 1391733002} |
| |
| for msg in self._MakeCommitMessages(): |
| for footers in self._MakeFooters(): |
| with mock.patch( |
| "chromite.lib.patch.FooterForApproval", |
| new=mock.Mock(side_effect=itertools.cycle(footers)), |
| ), mock.patch.object( |
| patch, "_approvals", new=[approval] * len(footers) |
| ): |
| patch._commit_message = msg |
| |
| # Idempotence |
| self.assertEqual( |
| patch._AddFooters(msg), |
| patch._AddFooters(patch._AddFooters(msg)), |
| ) |
| |
| # there may be pre-existing footers. This asserts that we |
| # can Get all the footers after we Set them. |
| self.assertFalse( |
| bool( |
| set(footers) |
| - set(patch._GetFooters(patch._AddFooters(msg))) |
| ) |
| ) |
| |
| if set(footers) - set(patch._GetFooters(msg)): |
| self.assertNotEqual(msg, patch._AddFooters(msg)) |
| |
| def testConvertQueryResults(self) -> None: |
| """Verify basic ConvertQueryResults behavior.""" |
| j = FAKE_CHANGE_JSON |
| exp = { |
| "project": j["project"], |
| "url": "https://host/#/c/8366/", |
| "status": j["status"], |
| "branch": j["branch"], |
| "owner": { |
| "username": "Duckworth Duck", |
| "name": "Duckworth Duck", |
| "email": "happy-funky-duck@chromium.org", |
| }, |
| "createdOn": 1473894166, |
| "commitMessage": ( |
| "my super great message\n\nChange-Id: " |
| "I3a753d6bacfbe76e5675a6f2f7941fe520c095e5\n" |
| ), |
| "currentPatchSet": { |
| "approvals": [], |
| "date": 1473894123, |
| "draft": False, |
| "number": "1", |
| "ref": "refs/changes/15/8366/1", |
| "revision": j["current_revision"], |
| }, |
| "dependsOn": [ |
| {"revision": "3d54362a9b010330bae2dde973fc5c3efc4e5f44"} |
| ], |
| "subject": j["subject"], |
| "id": j["change_id"], |
| "lastUpdated": 1473894166, |
| "number": str(j["_number"]), |
| "private": False, |
| "topic": "some-topic", |
| } |
| ret = cros_patch.GerritPatch.ConvertQueryResults(j, "host") |
| self.assertEqual(ret, exp) |
| |
| def testConvertQueryResultsProtoNoHttp(self) -> None: |
| """Verify ConvertQueryResults handling of non-http protos.""" |
| j = copy.deepcopy(FAKE_CHANGE_JSON) |
| fetch = j["revisions"][j["current_revision"]]["fetch"] |
| fetch.pop("http", None) |
| fetch.pop("https", None) |
| # Mostly just verifying it still parses. |
| ret = cros_patch.GerritPatch.ConvertQueryResults(j, "host") |
| self.assertNotEqual(ret, None) |
| |
| |
| class PrepareRemotePatchesTest(cros_test_lib.TestCase): |
| """Test preparing remote patches.""" |
| |
| def MkRemote( |
| self, |
| project="my/project", |
| original_branch="my-local", |
| ref="refs/tryjobs/elmer/patches", |
| tracking_branch="main", |
| internal=False, |
| ): |
| l = [ |
| project, |
| original_branch, |
| ref, |
| tracking_branch, |
| getattr( |
| constants, |
| ("%s_PATCH_TAG" % ("INTERNAL" if internal else "EXTERNAL")), |
| ), |
| ] |
| return ":".join(l) |
| |
| def assertRemote( |
| self, |
| patch, |
| project="my/project", |
| original_branch="my-local", |
| ref="refs/tryjobs/elmer/patches", |
| tracking_branch="main", |
| internal=False, |
| ) -> None: |
| self.assertEqual(patch.project, project) |
| self.assertEqual(patch.original_branch, original_branch) |
| self.assertEqual(patch.ref, ref) |
| self.assertEqual(patch.tracking_branch, tracking_branch) |
| self.assertEqual(patch.internal, internal) |
| |
| def test(self) -> None: |
| # Check handling of a single patch... |
| patches = cros_patch.PrepareRemotePatches([self.MkRemote()]) |
| self.assertEqual(len(patches), 1) |
| self.assertRemote(patches[0]) |
| |
| # Check handling of a multiple... |
| patches = cros_patch.PrepareRemotePatches( |
| [self.MkRemote(), self.MkRemote(project="foon")] |
| ) |
| self.assertEqual(len(patches), 2) |
| self.assertRemote(patches[0]) |
| self.assertRemote(patches[1], project="foon") |
| |
| # Ensure basic validation occurs: |
| chunks = self.MkRemote().split(":") |
| self.assertRaises( |
| ValueError, cros_patch.PrepareRemotePatches, ":".join(chunks[:-1]) |
| ) |
| self.assertRaises( |
| ValueError, |
| cros_patch.PrepareRemotePatches, |
| ":".join(chunks[:-1] + ["monkeys"]), |
| ) |
| self.assertRaises( |
| ValueError, |
| cros_patch.PrepareRemotePatches, |
| ":".join(chunks + [":"]), |
| ) |
| |
| |
| class PrepareLocalPatchesTests(cros_test_lib.RunCommandTestCase): |
| """Test preparing local patches.""" |
| |
| def setUp(self) -> None: |
| self.path, self.project, self.branch = "mydir", "my/project", "mybranch" |
| self.tracking_branch = "kernel" |
| self.patches = ["%s:%s" % (self.project, self.branch)] |
| self.manifest = mock.MagicMock() |
| attrs = dict( |
| tracking_branch=self.tracking_branch, |
| local_path=self.path, |
| remote="cros", |
| ) |
| checkout = git.ProjectCheckout(attrs) |
| self.PatchObject( |
| self.manifest, "FindCheckouts", return_value=[checkout] |
| ) |
| |
| def PrepareLocalPatches(self, output) -> None: |
| """Check the returned GitRepoPatchInfo against golden values.""" |
| output_obj = mock.MagicMock() |
| output_obj.stdout = output |
| self.PatchObject( |
| cros_patch.LocalPatch, "Fetch", return_value=output_obj |
| ) |
| self.PatchObject(git, "RunGit", return_value=output_obj) |
| patch_info = cros_patch.PrepareLocalPatches( |
| self.manifest, self.patches |
| )[0] |
| self.assertEqual(patch_info.project, self.project) |
| self.assertEqual(patch_info.ref, self.branch) |
| self.assertEqual(patch_info.tracking_branch, self.tracking_branch) |
| |
| def testBranchSpecifiedSuccessRun(self) -> None: |
| """Test success with branch specified by user.""" |
| self.PrepareLocalPatches("12345".rjust(40, "0")) |
| |
| def testBranchSpecifiedNoChanges(self) -> None: |
| """Test when no changes on the branch specified by user.""" |
| self.assertRaises(SystemExit, self.PrepareLocalPatches, "") |
| |
| |
| class TestFormatting(cros_test_lib.TestCase): |
| """Test formatting of output.""" |
| |
| VALID_CHANGE_ID = "I47ea30385af60ae4cc2acc5d1a283a46423bc6e1" |
| |
| def _assertResult( |
| self, functor, value, expected=None, raises=False, **kwargs |
| ) -> None: |
| if raises: |
| self.assertRaises2( |
| ValueError, |
| functor, |
| value, |
| msg="%s(%r) did not throw a ValueError" |
| % (functor.__name__, value), |
| **kwargs, |
| ) |
| else: |
| self.assertEqual( |
| functor(value, **kwargs), |
| expected, |
| msg="failed: %s(%r) != %r" |
| % (functor.__name__, value, expected), |
| ) |
| |
| def _assertBad(self, functor, values, **kwargs) -> None: |
| for value in values: |
| self._assertResult(functor, value, raises=True, **kwargs) |
| |
| def _assertGood(self, functor, values, **kwargs) -> None: |
| for value, expected in values: |
| self._assertResult(functor, value, expected, **kwargs) |
| |
| def testGerritNumber(self) -> None: |
| """Tests that we can pasre a Gerrit number.""" |
| self._assertGood( |
| cros_patch.ParseGerritNumber, |
| [("12345",) * 2, ("12",) * 2, ("123",) * 2], |
| ) |
| |
| self._assertBad( |
| cros_patch.ParseGerritNumber, |
| ["is", "i1325", "01234567", "012345a", "**12345", "+123", "/0123"], |
| error_ok=False, |
| ) |
| |
| def testChangeID(self) -> None: |
| """Tests that we can parse a change-ID.""" |
| self._assertGood( |
| cros_patch.ParseChangeID, [(self.VALID_CHANGE_ID,) * 2] |
| ) |
| |
| # Change-IDs too short/long, with unexpected characters in it. |
| self._assertBad( |
| cros_patch.ParseChangeID, |
| [ |
| "is", |
| "**i1325", |
| "i134".ljust(41, "0"), |
| "I1234+".ljust(41, "0"), |
| "I123".ljust(42, "0"), |
| ], |
| error_ok=False, |
| ) |
| |
| def testSHA1(self) -> None: |
| """Tests that we can parse a SHA1 hash.""" |
| self._assertGood( |
| cros_patch.ParseSHA1, |
| [("1" * 40,) * 2, ("a" * 40,) * 2, ("1a7e034".ljust(40, "0"),) * 2], |
| ) |
| |
| self._assertBad( |
| cros_patch.ParseSHA1, |
| ["0abcg", "Z", "**a", "+123", "1234ab" * 10], |
| error_ok=False, |
| ) |
| |
| def testFullChangeID(self) -> None: |
| """Tests that we can parse a full change-ID.""" |
| change_id = self.VALID_CHANGE_ID |
| self._assertGood( |
| cros_patch.ParseFullChangeID, |
| ( |
| ( |
| "foo~bar~%s" % change_id, |
| cros_patch.FullChangeId("foo", "bar", change_id), |
| ), |
| ( |
| "foo/bar/baz~refs/heads/_my-branch_~%s" % change_id, |
| cros_patch.FullChangeId( |
| "foo/bar/baz", "refs/heads/_my-branch_", change_id |
| ), |
| ), |
| ), |
| ) |
| |
| def testInvalidFullChangeID(self) -> None: |
| """Should throw an error on bad inputs.""" |
| change_id = self.VALID_CHANGE_ID |
| self._assertBad( |
| cros_patch.ParseFullChangeID, |
| ["foo", "foo~bar", "foo~bar~baz", "foo~refs/bar~%s" % change_id], |
| error_ok=False, |
| ) |
| |
| def testParsePatchDeps(self) -> None: |
| """Tests that we can parse the dependency specified by the user.""" |
| change_id = self.VALID_CHANGE_ID |
| vals = [ |
| "CL:12345", |
| "project~branch~%s" % change_id, |
| change_id, |
| change_id[1:], |
| ] |
| for val in vals: |
| self.assertTrue(cros_patch.ParsePatchDep(val) is not None) |
| |
| self._assertBad( |
| cros_patch.ParsePatchDep, |
| ["145462399", "I47ea3", "i47ea3".ljust(41, "0")], |
| ) |
| |
| |
| class MockPatchFactory: |
| """Helper class to create patches or series of them, for unit tests.""" |
| |
| def __init__(self, patch_mock=None) -> None: |
| """Constructor for factory. |
| |
| patch_mock: Optional PatchMock instance. |
| """ |
| self.patch_mock = patch_mock |
| self._patch_counter = functools.partial(next, itertools.count(1)) |
| |
| def MockPatch( |
| self, |
| change_id=None, |
| patch_number=None, |
| is_merged=False, |
| project="chromiumos/chromite", |
| remote=config_lib.GetSiteParams().EXTERNAL_REMOTE, |
| tracking_branch="refs/heads/main", |
| is_draft=False, |
| approvals=(), |
| commit_message=None, |
| ): |
| """Helper function to create mock GerritPatch objects.""" |
| if change_id is None: |
| # TODO(b/236161656): Fix. |
| # pylint: disable-next=assignment-from-no-return |
| change_id = self._patch_counter() |
| subject = f"PatchSet: {change_id}" |
| gerrit_number = str(change_id) |
| change_id = hex(change_id)[2:].rstrip("L").lower() |
| change_id = "I%s" % change_id.rjust(40, "0") |
| sha1 = hex(_GetNumber())[2:].rstrip("L").lower().rjust(40, "0") |
| if patch_number is None: |
| # TODO(b/236161656): Fix. |
| # pylint: disable-next=assignment-from-no-return |
| patch_number = _GetNumber() |
| fake_url = "http://foo/bar" |
| if not approvals: |
| approvals = [ |
| {"type": "VRIF", "value": "1", "grantedOn": 1391733002}, |
| {"type": "CRVW", "value": "2", "grantedOn": 1391733002}, |
| {"type": "COMR", "value": "2", "grantedOn": 1391733002}, |
| ] |
| |
| current_patch_set = { |
| "number": patch_number, |
| "revision": sha1, |
| "draft": is_draft, |
| "approvals": approvals, |
| } |
| patch_dict = { |
| "currentPatchSet": current_patch_set, |
| "id": change_id, |
| "number": gerrit_number, |
| "project": project, |
| "branch": tracking_branch, |
| "subject": subject, |
| "owner": {"email": "elmer.fudd@chromium.org"}, |
| "remote": remote, |
| "status": "MERGED" if is_merged else "NEW", |
| "url": "%s/%s" % (fake_url, change_id), |
| } |
| |
| patch = cros_patch.GerritPatch(patch_dict, remote, fake_url) |
| patch.pass_count = 0 |
| patch.fail_count = 1 |
| patch.total_fail_count = 3 |
| patch.commit_message = commit_message |
| |
| return patch |
| |
| def GetPatches(self, how_many=1, always_use_list=False, **kwargs): |
| """Get a sequential list of patches. |
| |
| Args: |
| how_many: How many patches to return. |
| always_use_list: Whether to use a list for a single item list. |
| **kwargs: Keyword arguments for self.MockPatch. |
| """ |
| patches = [self.MockPatch(**kwargs) for _ in range(how_many)] |
| if self.patch_mock: |
| for i, patch in enumerate(patches): |
| self.patch_mock.SetGerritDependencies(patch, patches[: i + 1]) |
| if how_many == 1 and not always_use_list: |
| return patches[0] |
| return patches |
| |
| |
| class DependencyErrorTests(cros_test_lib.MockTestCase): |
| """Tests for DependencyError.""" |
| |
| def testGetRootError(self) -> None: |
| """Test GetRootError on nested DependencyError.""" |
| p_1, p_2, p_3 = MockPatchFactory().GetPatches(how_many=3) |
| ex_1 = cros_patch.ApplyPatchException(p_1) |
| ex_2 = cros_patch.DependencyError(p_2, ex_1) |
| ex_3 = cros_patch.DependencyError(p_3, ex_2) |
| |
| self.assertEqual(ex_3.GetRootError(), ex_1) |
| |
| def testGetRootErrorOnCircurlarError(self) -> None: |
| """Test GetRootError on circular.""" |
| p_1, p_2, p_3 = MockPatchFactory().GetPatches(how_many=3) |
| ex_1 = cros_patch.DependencyError( |
| p_2, cros_patch.ApplyPatchException(p_1) |
| ) |
| ex_2 = cros_patch.DependencyError(p_2, ex_1) |
| ex_3 = cros_patch.DependencyError(p_3, ex_2) |
| ex_1.error = ex_3 |
| |
| self.assertIsNone(ex_3.GetRootError()) |