# Copyright 2012 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for cros_portage_upgrade.py."""

import filecmp
import logging
import os
import re
import subprocess
from typing import Any
import unittest
from unittest import mock

import pytest  # pylint: disable=import-error

from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import depgraph
from chromite.lib import osutils
from chromite.lib import terminal
from chromite.lib import upgrade_table as utable
from chromite.lib.parser import package_info
from chromite.scripts import cros_portage_upgrade as cpu


pytestmark = cros_test_lib.pytestmark_inside_only


# This import is left in, but disabled, until we can get
# GetCurrentVersionsTest working again.
# pylint: disable=import-error
# from portage.package.ebuild import config as portcfg


# This unittest module pokes a lot of internal cros_portage_upgrade state
# that we don't want exported to anyone else.
# pylint: disable=protected-access


# Enable color invariably. Since we rely on color for error/warn message
# recognition, leaving this to be decided based on stdout being a tty
# will make the tests fail/succeed based on how they are run.
# NOTE: This runs at import time and mutates state on the |oper| object of
# the imported cros_portage_upgrade module.
cpu.oper._color._enabled = True

# Configuration for generating a temporary valid ebuild hierarchy.
# ResolverPlayground sets up a default profile with ARCH=x86, so
# other architectures are irrelevant for now.
DEFAULT_ARCH = "x86"
# Maps "category/package-version" strings to the metadata overrides for that
# ebuild.  CpuTestBase._SetUpPlayground supplies EAPI, KEYWORDS and SLOT
# defaults for any of those keys an entry leaves out.
EBUILDS = {
    # A -> B -> C runtime dependency chain, two versions of each.
    "dev-libs/A-1": {"RDEPEND": "dev-libs/B"},
    "dev-libs/A-2": {"RDEPEND": "dev-libs/B"},
    "dev-libs/B-1": {"RDEPEND": "dev-libs/C"},
    "dev-libs/B-2": {"RDEPEND": "dev-libs/C"},
    "dev-libs/C-1": {},
    "dev-libs/C-2": {},
    # D and E: the oldest version of each declares a blocker on the other.
    "dev-libs/D-1": {"RDEPEND": "!dev-libs/E"},
    "dev-libs/D-2": {},
    "dev-libs/D-3": {},
    "dev-libs/E-2": {"RDEPEND": "!dev-libs/D"},
    "dev-libs/E-3": {},
    # F: versions spread over two slots; F-2-r1 only has "~" keywords.
    "dev-libs/F-1": {"SLOT": "1"},
    "dev-libs/F-2": {"SLOT": "2"},
    "dev-libs/F-2-r1": {
        "SLOT": "2",
        "KEYWORDS": "~amd64 ~x86 ~arm",
    },
    # X and Y pin exact, different versions of dev-libs/C.
    "dev-apps/X-1": {
        "EAPI": "3",
        "SLOT": "0",
        "KEYWORDS": "amd64 arm x86",
        "RDEPEND": "=dev-libs/C-1",
    },
    "dev-apps/Y-2": {
        "EAPI": "3",
        "SLOT": "0",
        "KEYWORDS": "amd64 arm x86",
        "RDEPEND": "=dev-libs/C-2",
    },
    # chromeos-base packages; flimflam-0.0.2-r123 only has "~" keywords.
    "chromeos-base/flimflam-0.0.1-r228": {
        "EAPI": "2",
        "SLOT": "0",
        "KEYWORDS": "amd64 x86 arm",
        "RDEPEND": ">=dev-libs/D-2",
    },
    "chromeos-base/flimflam-0.0.2-r123": {
        "EAPI": "2",
        "SLOT": "0",
        "KEYWORDS": "~amd64 ~x86 ~arm",
        "RDEPEND": ">=dev-libs/D-3",
    },
    "chromeos-base/libchrome-57098-r4": {
        "EAPI": "2",
        "SLOT": "0",
        "KEYWORDS": "amd64 x86 arm",
        "RDEPEND": ">=dev-libs/E-2",
    },
    "chromeos-base/libcros-1": {
        "EAPI": "2",
        "SLOT": "0",
        "KEYWORDS": "amd64 x86 arm",
        "RDEPEND": "dev-libs/B dev-libs/C chromeos-base/flimflam",
        "DEPEND": (
            "dev-libs/B dev-libs/C chromeos-base/flimflam "
            "chromeos-base/libchrome"
        ),
    },
    # libusb virtuals (slots 0 and 1) and the packages they can resolve to.
    "virtual/libusb-0": {
        "EAPI": "2",
        "SLOT": "0",
        "RDEPEND": (
            "|| ( >=dev-libs/libusb-0.1.12-r1:0 dev-libs/libusb-compat "
            ">=sys-freebsd/freebsd-lib-8.0[usb] )"
        ),
    },
    "virtual/libusb-1": {
        "EAPI": "2",
        "SLOT": "1",
        "RDEPEND": ">=dev-libs/libusb-1.0.4:1",
    },
    "dev-libs/libusb-0.1.13": {},
    "dev-libs/libusb-1.0.5": {"SLOT": "1"},
    "dev-libs/libusb-compat-1": {},
    "sys-freebsd/freebsd-lib-8": {"IUSE": "+usb"},
    "sys-fs/udev-164": {"EAPI": "1", "RDEPEND": "virtual/libusb:0"},
    # Java virtuals (jre/jdk in slots 1.5 and 1.6) and their providers.
    "virtual/jre-1.5.0": {
        "SLOT": "1.5",
        "RDEPEND": "|| ( =dev-java/sun-jre-bin-1.5.0* =virtual/jdk-1.5.0* )",
    },
    "virtual/jre-1.5.0-r1": {
        "SLOT": "1.5",
        "RDEPEND": "|| ( =dev-java/sun-jre-bin-1.5.0* =virtual/jdk-1.5.0* )",
    },
    "virtual/jre-1.6.0": {
        "SLOT": "1.6",
        "RDEPEND": "|| ( =dev-java/sun-jre-bin-1.6.0* =virtual/jdk-1.6.0* )",
    },
    "virtual/jre-1.6.0-r1": {
        "SLOT": "1.6",
        "RDEPEND": "|| ( =dev-java/sun-jre-bin-1.6.0* =virtual/jdk-1.6.0* )",
    },
    "virtual/jdk-1.5.0": {
        "SLOT": "1.5",
        "RDEPEND": "|| ( =dev-java/sun-jdk-1.5.0* dev-java/gcj-jdk )",
    },
    "virtual/jdk-1.5.0-r1": {
        "SLOT": "1.5",
        "RDEPEND": "|| ( =dev-java/sun-jdk-1.5.0* dev-java/gcj-jdk )",
    },
    "virtual/jdk-1.6.0": {
        "SLOT": "1.6",
        "RDEPEND": "|| ( =dev-java/icedtea-6* =dev-java/sun-jdk-1.6.0* )",
    },
    "virtual/jdk-1.6.0-r1": {
        "SLOT": "1.6",
        "RDEPEND": "|| ( =dev-java/icedtea-6* =dev-java/sun-jdk-1.6.0* )",
    },
    "dev-java/gcj-jdk-4.5": {},
    "dev-java/gcj-jdk-4.5-r1": {},
    "dev-java/icedtea-6.1": {},
    "dev-java/icedtea-6.1-r1": {},
    "dev-java/sun-jdk-1.5": {"SLOT": "1.5"},
    "dev-java/sun-jdk-1.6": {"SLOT": "1.6"},
    "dev-java/sun-jre-bin-1.5": {"SLOT": "1.5"},
    "dev-java/sun-jre-bin-1.6": {"SLOT": "1.6"},
    "dev-java/ant-core-1.8": {"DEPEND": ">=virtual/jdk-1.4"},
    "dev-db/hsqldb-1.8": {"RDEPEND": ">=virtual/jre-1.6"},
}

# Default contents of the simulated world file.  CpuTestBase._SetUpPlayground
# writes these entries out one per line.
WORLD = [
    "dev-libs/A",
    "dev-libs/D",
    "virtual/jre",
]

# Default set of packages simulated as already installed.  Same shape as
# EBUILDS: "category/package-version" -> metadata overrides, with missing
# EAPI/KEYWORDS/SLOT keys defaulted by CpuTestBase._SetUpPlayground.
INSTALLED = {
    "dev-libs/A-1": {},
    "dev-libs/B-1": {},
    "dev-libs/C-1": {},
    "dev-libs/D-1": {},
    "virtual/jre-1.5.0": {
        "SLOT": "1.5",
        "RDEPEND": "|| ( =virtual/jdk-1.5.0* =dev-java/sun-jre-bin-1.5.0* )",
    },
    "virtual/jre-1.6.0": {
        "SLOT": "1.6",
        "RDEPEND": "|| ( =virtual/jdk-1.6.0* =dev-java/sun-jre-bin-1.6.0* )",
    },
    "virtual/jdk-1.5.0": {
        "SLOT": "1.5",
        "RDEPEND": "|| ( =dev-java/sun-jdk-1.5.0* dev-java/gcj-jdk )",
    },
    "virtual/jdk-1.6.0": {
        "SLOT": "1.6",
        "RDEPEND": "|| ( =dev-java/icedtea-6* =dev-java/sun-jdk-1.6.0* )",
    },
    "dev-java/gcj-jdk-4.5": {},
    "dev-java/icedtea-6.1": {},
    "virtual/libusb-0": {
        "EAPI": "2",
        "SLOT": "0",
        "RDEPEND": (
            "|| ( >=dev-libs/libusb-0.1.12-r1:0 dev-libs/libusb-compat "
            ">=sys-freebsd/freebsd-lib-8.0[usb] )"
        ),
    },
}

# For verifying dependency graph results.
# Keyed by "category/package-version".  Each entry lists the keys that
# _VerifyDepsGraphOnePkg compares against the same-named keys of the actual
# graph node: "needs" maps a dependency to its dependency-type string, and
# "action" (only given for dev-libs/A-2) is the expected action.
GOLDEN_DEP_GRAPHS = {
    "dev-libs/A-2": {
        "needs": {"dev-libs/B-2": "runtime"},
        "action": "merge",
    },
    "dev-libs/B-2": {"needs": {"dev-libs/C-2": "runtime"}},
    "dev-libs/C-2": {"needs": {}},
    "dev-libs/D-3": {"needs": {}},
    # TODO(mtennant): Bug in parallel_emerge deps graph makes blocker show up
    # for E-3, rather than in just E-2 where it belongs.
    # See https://crbug.com/205225.
    # To repeat bug, swap the commented status of next two lines.
    # 'dev-libs/E-3': {'needs': {}},
    "dev-libs/E-3": {"needs": {"dev-libs/D-3": "blocker"}},
    "chromeos-base/libcros-1": {
        "needs": {
            "dev-libs/B-2": "runtime/buildtime",
            "dev-libs/C-2": "runtime/buildtime",
            "chromeos-base/libchrome-57098-r4": "buildtime",
            "chromeos-base/flimflam-0.0.1-r228": "runtime/buildtime",
        }
    },
    "chromeos-base/flimflam-0.0.1-r228": {
        "needs": {"dev-libs/D-3": "runtime"},
    },
    "chromeos-base/libchrome-57098-r4": {
        "needs": {"dev-libs/E-3": "runtime"},
    },
}

# For verifying dependency set results.
# Keyed by "category/package" (no version); each value is the expected set of
# versioned packages for that key.
GOLDEN_DEP_SETS = {
    "dev-libs/A": {"dev-libs/A-2", "dev-libs/B-2", "dev-libs/C-2"},
    "dev-libs/B": {"dev-libs/B-2", "dev-libs/C-2"},
    "dev-libs/C": {"dev-libs/C-2"},
    "dev-libs/D": {"dev-libs/D-3"},
    "virtual/libusb": {"virtual/libusb-1", "dev-libs/libusb-1.0.5"},
    "chromeos-base/libcros": {
        "chromeos-base/libcros-1",
        "dev-libs/B-2",
        "chromeos-base/libchrome-57098-r4",
        "dev-libs/E-3",
        "chromeos-base/flimflam-0.0.1-r228",
        "dev-libs/D-3",
        "dev-libs/C-2",
    },
}


def _GetGoldenDepsSet(pkg):
    """Look up the expected dependency set for |pkg|.

    Returns:
        The value stored under |pkg| in GOLDEN_DEP_SETS, or None when there
        is no entry for it.
    """
    try:
        return GOLDEN_DEP_SETS[pkg]
    except KeyError:
        return None


def _VerifyDepsGraph(deps_graph, pkgs):
    """Verify |deps_graph| once for each package in |pkgs|.

    Checking stops at the first package whose verification fails.

    Returns:
        True if every package verified (or |pkgs| is empty), else False.
    """
    return all(_VerifyDepsGraphOnePkg(deps_graph, pkg) for pkg in pkgs)


def _VerifyDepsGraphOnePkg(deps_graph, pkg):
    """Verification function to validate deps graph for |pkg|.

    Every node in |deps_graph| is compared against its GOLDEN_DEP_GRAPHS
    entry: each key the golden entry defines must be present in the node with
    an equal value.  Problems are printed instead of raised so that a single
    run reports every mismatch.

    Note that the golden dependency set for |pkg| is only required to exist;
    its contents are not compared against the graph here.

    Args:
        deps_graph: Dict mapping each package in the graph to its node info.
        pkg: Package that must have an entry in GOLDEN_DEP_SETS.

    Returns:
        True if the graph matched the golden data, False otherwise.
    """
    if deps_graph is None:
        print("Error: no dependency graph passed into _GetPreOrderDepGraph")
        return False

    if not isinstance(deps_graph, dict):
        # Wrap in a 1-tuple: a bare tuple operand would be unpacked by "%".
        print(
            "Error: dependency graph is expected to be a dict.  Instead:\n%r"
            % (deps_graph,)
        )
        return False

    validated = True

    if _GetGoldenDepsSet(pkg) is None:
        print(
            "Error: golden dependency list not configured for %s package" % pkg
        )
        validated = False

    # Verify dependencies by comparing them to GOLDEN_DEP_GRAPHS.
    for p, pkg_info in deps_graph.items():
        try:
            golden_pkg_info = GOLDEN_DEP_GRAPHS[p]
        except KeyError:
            print(
                "Error: golden dependency graph not configured for %s package"
                % p
            )
            validated = False
            continue

        for key, golden_value in golden_pkg_info.items():
            # A key missing from the actual node is a verification failure,
            # not a reason to abort the whole check with a KeyError.
            try:
                value = pkg_info[key]
            except KeyError:
                print(
                    'Error: while verifying "%s" value for %s package,'
                    " expected:\n%r\nBut the key is missing from the graph"
                    % (key, p, golden_value)
                )
                validated = False
                continue

            if value != golden_value:
                print(
                    'Error: while verifying "%s" value for %s package,'
                    " expected:\n%r\nBut instead found:\n%r"
                    % (key, p, golden_value, value)
                )
                validated = False

    if not validated:
        print(
            "Error: dependency graph for %s is not as expected.  Instead:\n%r"
            % (pkg, deps_graph)
        )

    return validated


def _GenDepsGraphVerifier(pkgs):
    """Build a verifier callable bound to |pkgs|.

    Returns:
        A function taking a deps graph and returning the result of
        _VerifyDepsGraph for that graph and |pkgs|.
    """

    def _Verify(deps_graph):
        return _VerifyDepsGraph(deps_graph, pkgs)

    return _Verify


class ManifestLine:
    """A single entry of a Manifest file, split into its fields.

    The textual form handled here is:
        <type> <file> <size> RMD160 <hash> SHA1 <hash> SHA256 <hash>
    """

    __slots__ = (
        "type",  # DIST, EBUILD, etc.
        "file",
        "size",
        "RMD160",
        "SHA1",
        "SHA256",
    )

    __attrlist__ = __slots__

    def __init__(self, line=None, **kwargs) -> None:
        """Populate the fields from |line| and/or keyword overrides.

        Args:
            line: Optional Manifest line to parse (whitespace separated).
            **kwargs: Field values keyed by attribute name; these win over
                whatever was parsed from |line|.  Fields given by neither
                source are set to None.
        """
        if line:
            fields = line.split()

            # Token positions of the values, in __attrlist__ order.
            for name, index in zip(self.__attrlist__, (0, 1, 2, 4, 6, 8)):
                setattr(self, name, fields[index])

            # The tokens in between must be the hash labels.
            for index, label in ((3, "RMD160"), (5, "SHA1"), (7, "SHA256")):
                assert fields[index] == label

        # Entries in kwargs are overwrites.
        for name in self.__attrlist__:
            if name in kwargs:
                setattr(self, name, kwargs[name])
            elif not hasattr(self, name):
                setattr(self, name, None)

    def __str__(self) -> str:
        """Render the entry back into Manifest line form."""
        pieces = (
            self.type,
            self.file,
            self.size,
            "RMD160",
            self.RMD160,
            "SHA1",
            self.SHA1,
            "SHA256",
            self.SHA256,
        )
        return " ".join(str(piece) for piece in pieces)

    def __eq__(self, other: Any) -> bool:
        """Equal when |other| is the same kind and all fields match."""
        if not isinstance(other, type(self)):
            return False

        unset = object()
        return not any(
            getattr(self, name, unset) != getattr(other, name, unset)
            for name in self.__attrlist__
        )

    def __ne__(self, other: Any) -> bool:
        """Negation of __eq__, spelled out for completeness."""
        return not (self == other)


class PInfoTest(cros_test_lib.TestCase):
    """Tests for the PInfo class."""

    def _AssertEquivalent(self, lhs, rhs) -> None:
        """Assert |lhs| and |rhs| compare equal via every operator we use."""
        # We do redundant tests because PInfo implements the comparison
        # methods itself, and we want to make sure they all work.
        self.assertEqual(lhs, rhs)
        self.assertTrue(lhs == rhs)
        self.assertFalse(lhs != rhs)

    def _AssertDistinct(self, lhs, rhs) -> None:
        """Assert |lhs| and |rhs| compare unequal via every operator we use."""
        self.assertNotEqual(lhs, rhs)
        self.assertFalse(lhs == rhs)
        self.assertTrue(lhs != rhs)

    def testInit(self) -> None:
        """Given fields are stored; cpv/overlay read as None when omitted."""
        pinfo = cpu.PInfo(category="SomeCat", user_arg="SomeArg")

        self.assertEqual("SomeCat", pinfo.category)
        self.assertEqual("SomeArg", pinfo.user_arg)

        self.assertEqual(None, pinfo.cpv)
        self.assertEqual(None, pinfo.overlay)

        with self.assertRaises(AttributeError):
            getattr(pinfo, "foobar")

    def testEqAndNe(self) -> None:
        """Check == and != for identical, equal and differing PInfos."""
        reference = cpu.PInfo(category="SomeCat", user_arg="SomeArg")

        # An object is equal to itself.
        self._AssertEquivalent(reference, reference)

        # Same field values in a separate object.
        twin = cpu.PInfo(category="SomeCat", user_arg="SomeArg")
        self._AssertEquivalent(reference, twin)

        # Same fields set, one value differs.
        other_arg = cpu.PInfo(category="SomeCat", user_arg="SomeOtherArg")
        self._AssertDistinct(reference, other_arg)

        # A different field is set.
        other_field = cpu.PInfo(category="SomeCat", slot="SomeSlot")
        self._AssertDistinct(reference, other_field)


class CpuTestBase(cros_test_lib.MockTempDirTestCase):
    """Base class for all test classes in this file."""

    def setUp(self) -> None:
        """Compute the per-test paths; nothing is created on disk here."""
        base = self.tempdir

        # Mock overlay we can run tests against.  Tests have to manually call
        # _SetUpPlayground first to initialize it.
        self.portage_stable = os.path.join(base, "portage-stable")

        # A mock system where we have packages installed.
        self.eroot = os.path.join(base, "root")

        # Where we clone the upstream git repo to.
        self.upstream_tmp_repo = os.path.join(base, "git-checkout")

    def _SetUpPlayground(
        self, ebuilds=None, installed=None, world=None
    ) -> None:
        """Prepare the temporary ebuild playground.

        This used to leverage portage.tests.resolver.ResolverPlayground, but
        that module isn't installed anymore.  So we do some of our own ad-hoc
        setup in place of the portage code.  This works for some unittests, but
        needs to fill out the playground more for others.  See the unittest.skip
        tests for the ones we've disabled for now.

        Args:
            ebuilds: Dict mapping "category/package-version" to metadata
                overrides for each ebuild to create in the upstream repo
                directory.  Defaults to EBUILDS.
            installed: Dict of the same shape describing packages to record
                as already installed.  Defaults to INSTALLED.
            world: A list of lines to simulate in the world file.  Defaults
                to WORLD.
        """
        # It's safe to use these globals as we treat them as read-only.
        ebuilds = EBUILDS if ebuilds is None else ebuilds
        installed = INSTALLED if installed is None else installed
        world = WORLD if world is None else world

        osutils.SafeMakedirs(self.portage_stable)

        # Default keys that we often look at for packages.
        default_metadata = {
            "EAPI": "5",
            "KEYWORDS": DEFAULT_ARCH,
            "SLOT": "0",
        }

        portage_state = os.path.join(self.eroot, "var", "lib", "portage")
        world_file = os.path.join(portage_state, "world")
        osutils.WriteFile(world_file, "\n".join(world), makedirs=True)

        # One directory per installed package, one file per metadata key.
        vdb_root = os.path.join(portage_state, "pkgs")
        for cpv, overrides in installed.items():
            parsed = package_info.parse(cpv)
            pkg_dir = os.path.join(vdb_root, parsed.cpvr)
            osutils.SafeMakedirs(pkg_dir)
            for key, value in {**default_metadata, **overrides}.items():
                osutils.WriteFile(os.path.join(pkg_dir, key), value)

        # Each ebuild is an EAPI line followed by KEY="value" assignments.
        for cpv, overrides in ebuilds.items():
            parsed = package_info.parse(cpv)
            metadata = {**default_metadata, **overrides}
            eapi_line = "EAPI=%s\n" % metadata.pop("EAPI")
            assignments = ['%s="%s"' % item for item in metadata.items()]
            ebuild_path = os.path.join(
                self.upstream_tmp_repo, parsed.relative_path
            )
            osutils.WriteFile(
                ebuild_path,
                eapi_line + "\n".join(assignments),
                makedirs=True,
            )

        # Set all envvars needed by parallel_emerge, since parallel_emerge
        # normally does that when --board is given.
        portage_env = {
            "PORTAGE_CONFIGROOT": self.eroot,
            "ROOT": self.eroot,
            "PORTDIR": self.portage_stable,
            # See _GenPortageEnvvars for more info on this setting.
            "PORTDIR_OVERLAY": self.portage_stable,
        }
        os.environ.update(portage_env)

    def _MockUpgrader(self, cmdargs=None, **kwargs):
        """Set up a mocked Upgrader object with the given args.

        Args:
            cmdargs: Optional command line argument list.  When not None it
                is parsed and the real Upgrader.__init__ is run on the mock
                with the resulting options.
            **kwargs: Upgrader slot values to force after initialization;
                every key must be one of Upgrader.__slots__.

        Returns:
            The autospec'ed Upgrader mock.
        """
        slot_defaults = {
            "_curr_arch": DEFAULT_ARCH,
            "_curr_board": "some_board",
            "_unstable_ok": False,
            "_verbose": False,
        }

        # The mock autospec will mock out class constants too.  Copy over the
        # members by hand (assuming everything uppercase is a constant).
        upgrader = mock.create_autospec(cpu.Upgrader)
        constants = [m for m in dir(cpu.Upgrader) if m.upper() == m]
        for name in constants:
            setattr(upgrader, name, getattr(cpu.Upgrader, name))

        # Initialize each attribute with default value.
        for slot in cpu.Upgrader.__slots__:
            setattr(upgrader, slot, slot_defaults.get(slot))

        # Initialize with command line if given.
        if cmdargs is not None:
            options = cpu._CreateParser().parse_args(cmdargs)
            cpu.Upgrader.__init__(upgrader, options)

        # Override Upgrader attributes if requested.
        for key, value in kwargs.items():
            self.assertIn(key, cpu.Upgrader.__slots__)
            setattr(upgrader, key, value)

        # Copy over static methods.
        for name in (
            "_CreateCommitMessage",
            "_ExtractUpgradedPkgs",
            "_GenPortageEnvvars",
            "_GetCatPkgFromCpv",
            "_SplitEBuildPath",
            "_EqueryWhich",
        ):
            setattr(upgrader, name, getattr(cpu.Upgrader, name))

        # Point to our tempdir to avoid clobbering /tmp/portage by accident.
        upgrader.UPSTREAM_TMP_REPO = self.upstream_tmp_repo
        upgrader._upstream = self.upstream_tmp_repo
        upgrader._stable_repo = self.portage_stable

        return upgrader


class CopyUpstreamTest(CpuTestBase):
    """Test Upgrader._CopyUpstreamPackage, _CopyUpstreamEclass"""

    def _AddEclassToPlayground(
        self, eclass, content="", ebuilds=None, missing=False
    ) -> None:
        """Hack to insert an eclass into the playground source.

        Each listed ebuild gets an "inherit <eclass>" line added after every
        line matching "EAPI...", and has its Manifest regenerated with the
        "ebuild" utility.

        Args:
            eclass: Name of eclass to create (without .eclass suffix).  Will be
                created as an empty file unless |content| is specified.
            content: Text to put into created eclass.
            ebuilds: List of ebuilds to put inherit line into.  Should be path
                to ebuild from playground portdir.
            missing: If True, the eclass file is deleted again at the end.
                Only makes sense if |ebuilds| is non-empty, presumably to test
                inherit failure.
        """
        ebuild_list = [] if ebuilds is None else ebuilds

        upstream_root = self.upstream_tmp_repo
        eclass_path = os.path.join(
            upstream_root, "eclass", "%s.eclass" % eclass
        )

        # Create the eclass file.
        osutils.WriteFile(eclass_path, content, makedirs=True)

        # Insert the inherit line into the ebuild file, if requested.
        for ebuild in ebuild_list:
            ebuild_path = os.path.join(upstream_root, ebuild)

            patched = re.sub(
                r"(EAPI.*)",
                lambda match: match.group(1) + "\ninherit " + eclass,
                osutils.ReadFile(ebuild_path),
            )
            osutils.WriteFile(ebuild_path, patched)

            # Remove the Manifest file.
            manifest_path = os.path.join(
                os.path.dirname(ebuild_path), "Manifest"
            )
            osutils.SafeUnlink(manifest_path)

            # Recreate the Manifests using the ebuild utility.
            cros_build_lib.run(
                ["ebuild", ebuild_path, "manifest"],
                print_cmd=False,
                stdout=True,
                stderr=subprocess.STDOUT,
            )

        # If requested, remove the eclass.
        if missing:
            os.remove(eclass_path)

    #
    # _IdentifyNeededEclass
    #

    def _TestIdentifyNeededEclass(self, cpv, ebuild, eclass, create_eclass):
        """Run Upgrader._IdentifyNeededEclass for |cpv| in a fresh playground.

        Args:
            cpv: Package passed to _IdentifyNeededEclass.
            ebuild: Path (relative to the upstream repo) of the ebuild that
                should inherit |eclass|.
            eclass: Eclass name, without the .eclass suffix.
            create_eclass: Whether the eclass file should exist when
                _IdentifyNeededEclass runs.

        Returns:
            Whatever _IdentifyNeededEclass returned.
        """
        self._SetUpPlayground()
        upgrader = self._MockUpgrader(cmdargs=[])

        eclass_absent = not create_eclass
        self._AddEclassToPlayground(
            eclass, ebuilds=[ebuild], missing=eclass_absent
        )

        # Replay script.
        upgrader._GetBoardCmd.return_value = "equery"

        # Verify.
        return cpu.Upgrader._IdentifyNeededEclass(upgrader, cpv)

    @unittest.skip("playground setup needs more work")
    def testIdentifyNeededEclassMissing(self) -> None:
        """An inherited eclass that does not exist is reported by file name."""
        needed = self._TestIdentifyNeededEclass(
            cpv="dev-libs/A-2",
            ebuild="dev-libs/A/A-2.ebuild",
            eclass="inheritme",
            create_eclass=False,
        )
        self.assertEqual("inheritme.eclass", needed)

    def testIdentifyNeededEclassOK(self) -> None:
        """Nothing is reported when the inherited eclass exists."""
        needed = self._TestIdentifyNeededEclass(
            cpv="dev-libs/A-2",
            ebuild="dev-libs/A/A-2.ebuild",
            eclass="inheritme",
            create_eclass=True,
        )
        self.assertIsNone(needed)

    #
    # _CopyUpstreamEclass
    #

    def _TestCopyUpstreamEclass(
        self, eclass, local_content=None, upstream_content=None
    ) -> None:
        """Test Upgrader._CopyUpstreamEclass

        Args:
            eclass: Eclass name, without the .eclass suffix.
            local_content: Text of the pre-existing portage-stable copy; the
                file is not created when this is empty or None.
            upstream_content: Text of the upstream copy; the file is not
                created when this is empty or None.  When None, the copy is
                expected to raise RuntimeError.
        """
        self._SetUpPlayground()
        upgrader = self._MockUpgrader(_curr_board=None)

        filename = "%s.eclass" % eclass
        relpath = os.path.join("eclass", filename)
        stable_copy = os.path.join(self.portage_stable, relpath)
        upstream_copy = os.path.join(self.upstream_tmp_repo, relpath)

        # Seed whichever side(s) the caller provided content for.
        for path, text in (
            (stable_copy, local_content),
            (upstream_copy, upstream_content),
        ):
            if text:
                osutils.WriteFile(path, text, makedirs=True)

        # Verify.
        if upstream_content is None:
            copied = None
            with self.assertRaises(RuntimeError):
                cpu.Upgrader._CopyUpstreamEclass(upgrader, filename)
        else:
            copied = cpu.Upgrader._CopyUpstreamEclass(upgrader, filename)

        # A copy is only expected when upstream exists and differs from local.
        copy_expected = upstream_content and local_content != upstream_content
        if not copy_expected:
            self.assertFalse(copied)
            return

        upgrader._RunGit.assert_called_once_with(
            upgrader._stable_repo, ["add", relpath]
        )

        self.assertTrue(copied)
        # Verify that eclass has been copied into portage-stable.
        self.assertExists(stable_copy)
        # Verify that eclass contents are correct.
        self.assertTrue(filecmp.cmp(upstream_copy, stable_copy))

    def testCopyUpstreamEclassCopyBecauseMissing(self) -> None:
        """Upstream eclass exists and there is no local copy."""
        self._TestCopyUpstreamEclass(
            "inheritme", local_content=None, upstream_content="# Up"
        )

    def testCopyUpstreamEclassCopyBecauseDifferent(self) -> None:
        """Upstream eclass exists and the local copy has other contents."""
        local_text = "# Local"
        upstream_text = "# Up"
        self._TestCopyUpstreamEclass(
            "inheritme",
            local_content=local_text,
            upstream_content=upstream_text,
        )

    def testCopyUpstreamEclassNoCopyBecauseIdentical(self) -> None:
        """Local and upstream eclass have the same contents."""
        shared_text = "# Bar"
        self._TestCopyUpstreamEclass(
            "inheritme",
            local_content=shared_text,
            upstream_content=shared_text,
        )

    def testCopyUpstreamEclassNoCopyBecauseUpstreamMissing(self) -> None:
        """Only a local copy exists; the helper expects RuntimeError."""
        self._TestCopyUpstreamEclass(
            "inheritme", local_content="# Local", upstream_content=None
        )

    #
    # _CopyUpstreamPackage
    #

    def _TestCopyUpstreamPackage(
        self,
        catpkg,
        verrev,
        success,
        existing_files,
        extra_upstream_files,
        error=None,
    ) -> None:
        """Test Upgrader._CopyUpstreamPackage

        Args:
            catpkg: "category/package" of the package to copy.
            verrev: Version (and optional revision) string; joined to
                |catpkg| with "-" to form the cpv passed to the Upgrader.
            success: Whether the copy is expected to succeed and return the
                upstream cpv.
            existing_files: File names to pre-create in the package's
                portage-stable directory.  On success these are expected to
                be handed to a "git rm -rf" call.
            extra_upstream_files: File names to touch in the upstream package
                directory (only if that directory exists); on success they
                are expected to show up in portage-stable.
            error: Exception class the copy is expected to raise, if any.
        """
        upstream_cpv = "%s-%s" % (catpkg, verrev)
        ebuild = "%s-%s.ebuild" % (catpkg.split("/")[-1], verrev)

        self._SetUpPlayground()

        # Simulate extra files in upstream package dir.
        if extra_upstream_files:
            pkg_dir = os.path.join(self.upstream_tmp_repo, catpkg)
            if os.path.exists(pkg_dir):
                for extra_file in extra_upstream_files:
                    osutils.Touch(os.path.join(pkg_dir, extra_file))

        # Prepare stub portage-stable dir, with extra previously
        # existing files simulated if requested.
        if existing_files:
            pkg_dir = os.path.join(self.portage_stable, catpkg)
            for existing_file in existing_files:
                osutils.Touch(
                    os.path.join(pkg_dir, existing_file), makedirs=True
                )

        mocked_upgrader = self._MockUpgrader(_curr_board=None)

        # Replay script.
        if success:
            if existing_files:

                # Stand-in for Upgrader._RunGit.  It reads |rm_list|, which is
                # assigned below, before the mock can ever be invoked.
                def git_rm(cwd, cmd, **_kwargs) -> None:
                    # Identify file that pseudo-git is to remove, then remove
                    # it.
                    self.assertEqual("rm", cmd[0])
                    self.assertEqual("-rf", cmd[1])

                    # Remove the files after the options.
                    paths = cmd[1:]
                    while paths[0].startswith("-"):
                        paths.pop(0)
                    self.assertEqual(sorted(rm_list), sorted(paths))
                    for path in paths:
                        os.remove(os.path.join(cwd, path))

                    # As with real "git rm", if the dir is then empty remove
                    # that.
                    # os.rmdir only succeeds on an empty directory; a failure
                    # here is expected and deliberately ignored.
                    try:
                        os.rmdir(cwd)
                        os.rmdir(os.path.dirname(cwd))
                    except OSError:
                        pass

                rm_list = [os.path.join(catpkg, f) for f in existing_files]
                mocked_upgrader._RunGit.side_effect = git_rm

            mocked_upgrader._IdentifyNeededEclass.return_value = None

        # Verify.
        result = None
        if error:
            self.assertRaises(
                error,
                cpu.Upgrader._CopyUpstreamPackage,
                mocked_upgrader,
                upstream_cpv,
            )
        else:
            result = cpu.Upgrader._CopyUpstreamPackage(
                mocked_upgrader, upstream_cpv
            )

        if success:
            self.assertEqual(result, upstream_cpv)

            mocked_upgrader._IdentifyNeededEclass.assert_called_once_with(
                upstream_cpv
            )

            # Verify that ebuild has been copied into portage-stable.
            ebuild_path = os.path.join(self.portage_stable, catpkg, ebuild)
            self.assertExists(
                ebuild_path,
                msg="Missing expected ebuild after copy from upstream",
            )

            # Verify that any extra files upstream are also copied.
            for extra_file in extra_upstream_files:
                file_path = os.path.join(
                    self.portage_stable, catpkg, extra_file
                )
                msg = (
                    "Missing expected extra file %s after copy from upstream"
                    % extra_file
                )
                self.assertExists(file_path, msg=msg)
        else:
            # Either the call raised (result was never assigned) or it
            # returned None.
            self.assertIsNone(result)

    def testCopyUpstreamPackageEmptyStable(self) -> None:
        """Should pass, nothing pre-exists in portage-stable."""
        self._TestCopyUpstreamPackage(
            "dev-libs/D",
            "2",
            True,
            existing_files=[],
            extra_upstream_files=[],
        )

    def testCopyUpstreamPackageClutteredStable(self) -> None:
        """Should pass, portage-stable already holds files for the package."""
        stale_files = ["foo", "bar", "foobar.ebuild", "D-1.ebuild"]
        self._TestCopyUpstreamPackage(
            "dev-libs/D",
            "2",
            True,
            existing_files=stale_files,
            extra_upstream_files=[],
        )

    def testCopyUpstreamPackageVersionNotAvailable(self) -> None:
        """Should fail, dev-libs/D version 5 does not exist 'upstream'"""
        self._TestCopyUpstreamPackage(
            "dev-libs/D",
            "5",
            False,
            existing_files=[],
            extra_upstream_files=[],
            error=RuntimeError,
        )

    def testCopyUpstreamPackagePackageNotAvailable(self) -> None:
        """Should fail, a-b-c/D does not exist 'upstream' in any version"""
        self._TestCopyUpstreamPackage(
            "a-b-c/D",
            "5",
            False,
            existing_files=[],
            extra_upstream_files=[],
            error=RuntimeError,
        )

    def testCopyUpstreamPackageExtraUpstreamFiles(self) -> None:
        """Should pass, non-ebuild upstream files are copied along."""
        self._TestCopyUpstreamPackage(
            "dev-libs/F",
            "2-r1",
            True,
            existing_files=["foo", "bar"],
            extra_upstream_files=["keepme", "andme"],
        )


class GetPackageUpgradeStateTest(CpuTestBase):
    """Test Upgrader._GetPackageUpgradeState"""

    @staticmethod
    def _MakePInfo(overlay, cpv_cmp_upstream, latest_upstream_cpv):
        """Build the foo/bar-2 PInfo that every test here starts from."""
        return cpu.PInfo(
            cpv="foo/bar-2",
            overlay=overlay,
            cpv_cmp_upstream=cpv_cmp_upstream,
            latest_upstream_cpv=latest_upstream_cpv,
        )

    def _TestGetPackageUpgradeState(self, pinfo, exists_upstream):
        """Run _GetPackageUpgradeState on |pinfo| with a stubbed upstream.

        Args:
            pinfo: PInfo handed to _GetPackageUpgradeState.
            exists_upstream: Value the stubbed _FindUpstreamCPV returns.

        Returns:
            The state computed by _GetPackageUpgradeState.
        """
        upgrader = self._MockUpgrader(cmdargs=[])

        def fake_find_upstream(pkg, unstable_ok=False):
            # The lookup must be for this package, with unstable allowed.
            self.assertEqual(pinfo.cpv, pkg)
            self.assertTrue(unstable_ok)
            return exists_upstream

        upgrader._FindUpstreamCPV.side_effect = fake_find_upstream

        # Verify.
        return cpu.Upgrader._GetPackageUpgradeState(upgrader, pinfo)

    def testGetPackageUpgradeStateLocalOnly(self) -> None:
        """chromiumos-overlay package with no upstream data at all."""
        pinfo = self._MakePInfo("chromiumos-overlay", None, None)
        state = self._TestGetPackageUpgradeState(pinfo, exists_upstream=False)
        self.assertEqual(state, utable.UpgradeTable.STATE_LOCAL_ONLY)

    def testGetPackageUpgradeStateUnknown(self) -> None:
        """Package from the "portage" overlay with no upstream data."""
        pinfo = self._MakePInfo("portage", None, None)
        state = self._TestGetPackageUpgradeState(pinfo, exists_upstream=False)
        self.assertEqual(state, utable.UpgradeTable.STATE_UNKNOWN)

    def testGetPackageUpgradeStateUpgradeAndDuplicated(self) -> None:
        """Outdated chromiumos-overlay package that also exists upstream."""
        pinfo = self._MakePInfo(
            "chromiumos-overlay",
            1,  # outdated
            "not important",
        )
        state = self._TestGetPackageUpgradeState(pinfo, exists_upstream=True)
        self.assertEqual(
            state, utable.UpgradeTable.STATE_NEEDS_UPGRADE_AND_DUPLICATED
        )

    def testGetPackageUpgradeStateUpgradeAndPatched(self) -> None:
        """Outdated chromiumos-overlay package that is not found upstream."""
        pinfo = self._MakePInfo(
            "chromiumos-overlay",
            1,  # outdated
            "not important",
        )
        state = self._TestGetPackageUpgradeState(pinfo, exists_upstream=False)
        self.assertEqual(
            state, utable.UpgradeTable.STATE_NEEDS_UPGRADE_AND_PATCHED
        )

    def testGetPackageUpgradeStateUpgrade(self) -> None:
        """Outdated portage-stable package."""
        pinfo = self._MakePInfo(
            "portage-stable",
            1,  # outdated
            "not important",
        )
        state = self._TestGetPackageUpgradeState(pinfo, exists_upstream=False)
        self.assertEqual(state, utable.UpgradeTable.STATE_NEEDS_UPGRADE)

    def testGetPackageUpgradeStateDuplicated(self) -> None:
        """Current chromiumos-overlay package that also exists upstream."""
        pinfo = self._MakePInfo(
            "chromiumos-overlay",
            0,  # current
            "not important",
        )
        state = self._TestGetPackageUpgradeState(pinfo, exists_upstream=True)
        self.assertEqual(state, utable.UpgradeTable.STATE_DUPLICATED)

    def testGetPackageUpgradeStatePatched(self) -> None:
        """Current chromiumos-overlay package that is not found upstream."""
        pinfo = self._MakePInfo(
            "chromiumos-overlay",
            0,  # current
            "not important",
        )
        state = self._TestGetPackageUpgradeState(pinfo, exists_upstream=False)
        self.assertEqual(state, utable.UpgradeTable.STATE_PATCHED)

    def testGetPackageUpgradeStateCurrent(self) -> None:
        """Current portage-stable package."""
        pinfo = self._MakePInfo(
            "portage-stable",
            0,  # current
            "not important",
        )
        state = self._TestGetPackageUpgradeState(pinfo, exists_upstream=False)
        self.assertEqual(state, utable.UpgradeTable.STATE_CURRENT)


@unittest.skip("playground setup needs more work")
class EmergeableTest(CpuTestBase):
    """Exercise Upgrader._AreEmergeable using the test playground."""

    def _TestAreEmergeable(self, cpvlist, expect, world=None) -> None:
        """Run Upgrader._AreEmergeable and compare its success flag.

        Args:
            cpvlist: Passed to _AreEmergeable.
            expect: Expected first element (success flag) of the
                _AreEmergeable result.
            world: List of lines to override default world contents.
        """
        # The module-level default is only ever read here, so sharing it
        # between tests is safe.
        world_lines = WORLD if world is None else world

        upgrader = self._MockUpgrader(cmdargs=["--upgrade"] + cpvlist)
        self._SetUpPlayground(world=world_lines)

        # Canned answer for the mocked board command lookup.
        upgrader._GetBoardCmd.return_value = "emerge"

        # Run the real method against the mock and unpack its triple.
        success, _cmd, emerge_output = cpu.Upgrader._AreEmergeable(
            upgrader, cpvlist
        )
        logging.debug(
            "Test ended with success==%r (expected==%r)", success, expect
        )
        logging.debug("Emerge output:\n%s", emerge_output)

        self.assertEqual(success, expect)

    def testAreEmergeableOnePkg(self) -> None:
        """Should pass, one cpv target."""
        self._TestAreEmergeable(["dev-libs/A-1"], True)

    def testAreEmergeableTwoPkgs(self) -> None:
        """Should pass, two cpv targets."""
        self._TestAreEmergeable(["dev-libs/A-1", "dev-libs/B-1"], True)

    def testAreEmergeableOnePkgTwoVersions(self) -> None:
        """Should fail, targets two versions of same package."""
        self._TestAreEmergeable(["dev-libs/A-1", "dev-libs/A-2"], False)

    def testAreEmergeableStableFlimFlam(self) -> None:
        """Should pass, target stable version of pkg."""
        self._TestAreEmergeable(["chromeos-base/flimflam-0.0.1-r228"], True)

    def testAreEmergeableUnstableFlimFlam(self) -> None:
        """Should fail, target unstable version of pkg."""
        self._TestAreEmergeable(["chromeos-base/flimflam-0.0.2-r123"], False)

    def testAreEmergeableBlockedPackages(self) -> None:
        """Should fail, targets have blocking deps on each other."""
        self._TestAreEmergeable(["dev-libs/D-1", "dev-libs/E-2"], False)

    def testAreEmergeableBlockedByInstalledPkg(self) -> None:
        """Should fail because of installed D-1 pkg."""
        self._TestAreEmergeable(["dev-libs/E-2"], False)

    def testAreEmergeableNotBlockedByInstalledPkgNotInWorld(self) -> None:
        """Should pass because installed D-1 pkg not in world."""
        self._TestAreEmergeable(["dev-libs/E-2"], True, world=[])

    def testAreEmergeableSamePkgDiffSlots(self) -> None:
        """Should pass, same package but different slots."""
        self._TestAreEmergeable(["dev-libs/F-1", "dev-libs/F-2"], True)

    def testAreEmergeableTwoPackagesIncompatibleDeps(self) -> None:
        """Should fail, targets depend on two versions of same pkg."""
        self._TestAreEmergeable(["dev-apps/X-1", "dev-apps/Y-2"], False)


class CPVUtilTest(cros_test_lib.TestCase):
    """Test various CPV utilities in Upgrader"""

    def _TestCmpCpv(self, cpv1, cpv2):
        """Return the result of Upgrader._CmpCpv for the two given cpvs."""
        return cpu.Upgrader._CmpCpv(cpv1, cpv2)

    def testCmpCpv(self) -> None:
        """Check equal, ordered and non-comparable cpv pairs."""
        # cpvs to compare.
        equal = [
            ("foo/bar-1", "foo/bar-1"),
            ("a-b-c/x-y-z-1.2.3-r1", "a-b-c/x-y-z-1.2.3-r1"),
            ("foo/bar-1", "foo/bar-1-r0"),
            (None, None),
        ]
        for cpv1, cpv2 in equal:
            # Name the pair in the message so a failure identifies the row.
            self.assertEqual(
                0,
                self._TestCmpCpv(cpv1, cpv2),
                msg="comparing %r with %r" % (cpv1, cpv2),
            )

        lessthan = [
            (None, "foo/bar-1"),
            ("foo/bar-1", "foo/bar-2"),
            ("foo/bar-1", "foo/bar-1-r1"),
            ("foo/bar-1-r1", "foo/bar-1-r2"),
            ("foo/bar-1.2.3", "foo/bar-1.2.4"),
            ("foo/bar-5a", "foo/bar-5b"),
        ]
        for cpv1, cpv2 in lessthan:
            # The ordering must hold in both directions.
            # pylint: disable=bad-option-value,arguments-out-of-order
            self.assertLess(
                self._TestCmpCpv(cpv1, cpv2),
                0,
                msg="comparing %r with %r" % (cpv1, cpv2),
            )
            self.assertGreater(
                self._TestCmpCpv(cpv2, cpv1),
                0,
                msg="comparing %r with %r" % (cpv2, cpv1),
            )

        not_comparable = [("foo/bar-1", "bar/foo-1")]
        for cpv1, cpv2 in not_comparable:
            self.assertIsNone(
                self._TestCmpCpv(cpv1, cpv2),
                msg="comparing %r with %r" % (cpv1, cpv2),
            )

    def _TestGetCatPkgFromCpv(self, cpv):
        """Return the result of Upgrader._GetCatPkgFromCpv for |cpv|."""
        return cpu.Upgrader._GetCatPkgFromCpv(cpv)

    def testGetCatPkgFromCpv(self) -> None:
        """Check the category/package portion extracted from each cpv."""
        # (input, output) tuples.
        data = [
            ("foo/bar-1", "foo/bar"),
            ("a-b-c/x-y-z-1", "a-b-c/x-y-z"),
            ("a-b-c/x-y-z-1.2.3-r3", "a-b-c/x-y-z"),
            ("bar-1", "bar"),
            ("bar", None),
        ]

        for cpv, catpn in data:
            result = self._TestGetCatPkgFromCpv(cpv)
            self.assertEqual(catpn, result, msg="cpv=%r" % (cpv,))

    def _TestGetVerRevFromCpv(self, cpv):
        """Return the result of Upgrader._GetVerRevFromCpv for |cpv|."""
        return cpu.Upgrader._GetVerRevFromCpv(cpv)

    def testGetVerRevFromCpv(self) -> None:
        """Check the version-revision portion extracted from each cpv."""
        # (input, output) tuples.
        data = [
            ("foo/bar-1", "1"),
            ("a-b-c/x-y-z-1", "1"),
            ("a-b-c/x-y-z-1.2.3-r3", "1.2.3-r3"),
            ("foo/bar-3.222-r0", "3.222"),
            ("bar-1", "1"),
            ("bar", None),
        ]

        for cpv, verrev in data:
            result = self._TestGetVerRevFromCpv(cpv)
            self.assertEqual(verrev, result, msg="cpv=%r" % (cpv,))

    def _TestGetEbuildPathFromCpv(self, cpv):
        """Return the result of Upgrader._GetEbuildPathFromCpv for |cpv|."""
        return cpu.Upgrader._GetEbuildPathFromCpv(cpv)

    def testGetEbuildPathFromCpv(self) -> None:
        """Check the relative ebuild path derived from each cpv."""
        # (input, output) tuples.
        data = [
            ("foo/bar-1", "foo/bar/bar-1.ebuild"),
            ("a-b-c/x-y-z-1", "a-b-c/x-y-z/x-y-z-1.ebuild"),
            ("a-b-c/x-y-z-1.2.3-r3", "a-b-c/x-y-z/x-y-z-1.2.3-r3.ebuild"),
            ("foo/bar-3.222-r0", "foo/bar/bar-3.222-r0.ebuild"),
        ]

        for cpv, ebuild_path in data:
            result = self._TestGetEbuildPathFromCpv(cpv)
            self.assertEqual(ebuild_path, result, msg="cpv=%r" % (cpv,))


class PortageStableTest(CpuTestBase):
    """Test the Upgrader helpers that track and stash stable repo changes.

    Covers _CheckStableRepoOnBranch, _SaveStatusOnStableRepo,
    _AnyChangesStaged, _StashChanges, _UnstashAnyChanges and
    _DropAnyStashedChanges.
    """

    STATUS_MIX = {
        "path1/file1": "M",
        "path1/path2/file2": "A",
        "a/b/.x/y~": "D",
        "foo/bar": "C",
        "/bar/foo": "U",
        "unknown/file": "??",
    }
    STATUS_UNKNOWN = {
        "foo/bar": "??",
        "a/b/c": "??",
    }
    STATUS_EMPTY = {}

    #
    # _CheckStableRepoOnBranch
    #

    def _TestCheckStableRepoOnBranch(self, run_result, expect_err) -> None:
        """Run _CheckStableRepoOnBranch against a canned 'git branch' result.

        Args:
            run_result: Value the mocked _RunGit call returns.
            expect_err: Whether a RuntimeError is expected.
        """
        upgrader = self._MockUpgrader(cmdargs=[])
        upgrader._RunGit.return_value = run_result

        try:
            cpu.Upgrader._CheckStableRepoOnBranch(upgrader)
        except RuntimeError as ex:
            self.assertTrue(expect_err, "Unexpected RuntimeError: %s" % str(ex))
        else:
            self.assertFalse(
                expect_err, "Expected RuntimeError, but none raised."
            )

        # Either way, git must have been queried exactly once.
        upgrader._RunGit.assert_called_once_with(
            upgrader._stable_repo, ["branch"], stdout=True
        )

    def testCheckStableRepoOnBranchNoBranch(self) -> None:
        """Should fail due to 'git branch' saying 'no branch'"""
        self._TestCheckStableRepoOnBranch(
            cros_build_lib.CompletedProcess(
                returncode=0,
                stdout="* (no branch)\n  somebranch\n  otherbranch\n",
            ),
            True,
        )

    def testCheckStableRepoOnBranchOK1(self) -> None:
        """Should pass as 'git branch' indicates a branch"""
        self._TestCheckStableRepoOnBranch(
            cros_build_lib.CompletedProcess(
                returncode=0, stdout="* somebranch\n  otherbranch\n"
            ),
            False,
        )

    def testCheckStableRepoOnBranchOK2(self) -> None:
        """Should pass as 'git branch' indicates a branch"""
        self._TestCheckStableRepoOnBranch(
            cros_build_lib.CompletedProcess(
                returncode=0, stdout="  somebranch\n* otherbranch\n"
            ),
            False,
        )

    def testCheckStableRepoOnBranchFail(self) -> None:
        """Should fail as 'git branch' failed"""
        self._TestCheckStableRepoOnBranch(
            cros_build_lib.CompletedProcess(
                returncode=1, stdout="does not matter"
            ),
            True,
        )

    #
    # _SaveStatusOnStableRepo
    #

    def _TestSaveStatusOnStableRepo(self, run_result):
        """Run _SaveStatusOnStableRepo on a canned 'git status -s' result.

        Args:
            run_result: Value the mocked _RunGit call returns.

        Returns:
            The _stable_repo_status attribute left on the mocked upgrader.
        """
        upgrader = self._MockUpgrader(cmdargs=[])
        upgrader._RunGit.return_value = run_result

        cpu.Upgrader._SaveStatusOnStableRepo(upgrader)

        # Exactly one status query is expected, and the stashed flag must
        # be false afterwards.
        upgrader._RunGit.assert_called_once_with(
            upgrader._stable_repo, ["status", "-s"], stdout=True
        )
        self.assertFalse(upgrader._stable_repo_stashed)

        return upgrader._stable_repo_status

    def testSaveStatusOnStableRepoFailed(self) -> None:
        """Test case where 'git status -s' fails, should raise RuntimeError"""
        failed_run = cros_build_lib.CompletedProcess(returncode=1)

        with self.assertRaises(RuntimeError):
            self._TestSaveStatusOnStableRepo(failed_run)

    def testSaveStatusOnStableRepoAllKinds(self) -> None:
        """Test where 'git status -s' returns all status kinds"""
        # Render one "<code> <path>" line per STATUS_MIX entry.
        status_output = "\n".join(
            "%2s %s" % (code, path) for path, code in self.STATUS_MIX.items()
        )
        saved = self._TestSaveStatusOnStableRepo(
            cros_build_lib.CompletedProcess(returncode=0, stdout=status_output)
        )
        self.assertEqual(saved, self.STATUS_MIX)

    def testSaveStatusOnStableRepoRename(self) -> None:
        """Test where 'git status -s' shows a file rename"""
        old, new = "path/foo-1", "path/foo-2"
        status_output = " R %s --> %s" % (old, new)
        saved = self._TestSaveStatusOnStableRepo(
            cros_build_lib.CompletedProcess(returncode=0, stdout=status_output)
        )
        # The rename is expected to be recorded as a delete plus an add.
        self.assertEqual(saved, {old: "D", new: "A"})

    def testSaveStatusOnStableRepoEmpty(self) -> None:
        """Test empty response from 'git status -s'"""
        saved = self._TestSaveStatusOnStableRepo(
            cros_build_lib.CompletedProcess(returncode=0, stdout="")
        )
        self.assertEqual(saved, {})

    #
    # _AnyChangesStaged
    #

    def _TestAnyChangesStaged(self, status_dict):
        """Return _AnyChangesStaged's answer for the given repo status."""
        upgrader = self._MockUpgrader(_stable_repo_status=status_dict)
        return cpu.Upgrader._AnyChangesStaged(upgrader)

    def testAnyChangesStagedMix(self) -> None:
        """Should return True"""
        staged = self._TestAnyChangesStaged(self.STATUS_MIX)
        self.assertTrue(staged, "Failed to notice files with changed status.")

    def testAnyChangesStagedUnknown(self) -> None:
        """Should return False, only files with '??' status"""
        staged = self._TestAnyChangesStaged(self.STATUS_UNKNOWN)
        self.assertFalse(staged, 'Should not consider files with "??" status.')

    def testAnyChangesStagedEmpty(self) -> None:
        """Should return False, no file statuses"""
        staged = self._TestAnyChangesStaged(self.STATUS_EMPTY)
        self.assertFalse(staged, "No files should mean no changes staged.")

    #
    # _StashChanges
    #

    def testStashChanges(self) -> None:
        """Test Upgrader._StashChanges"""
        upgrader = self._MockUpgrader(cmdargs=[], _stable_repo_stashed=False)
        self.assertFalse(upgrader._stable_repo_stashed)

        cpu.Upgrader._StashChanges(upgrader)

        # One 'git stash save' call, and the stashed flag is now set.
        upgrader._RunGit.assert_called_once_with(
            upgrader._stable_repo,
            ["stash", "save"],
            stdout=True,
            stderr=subprocess.STDOUT,
        )
        self.assertTrue(upgrader._stable_repo_stashed)

    #
    # _UnstashAnyChanges
    #

    def _TestUnstashAnyChanges(self, stashed) -> None:
        """Run _UnstashAnyChanges starting from the given stashed flag."""
        upgrader = self._MockUpgrader(cmdargs=[], _stable_repo_stashed=stashed)
        self.assertEqual(stashed, upgrader._stable_repo_stashed)

        cpu.Upgrader._UnstashAnyChanges(upgrader)

        # The git call is only checked when something was stashed.
        if stashed:
            upgrader._RunGit.assert_called_once_with(
                upgrader._stable_repo,
                ["stash", "pop", "--index"],
                stdout=True,
                stderr=subprocess.STDOUT,
            )

        self.assertFalse(upgrader._stable_repo_stashed)

    def testUnstashAnyChanges(self) -> None:
        """Cover both the stashed and the not-stashed starting state."""
        for stashed in (True, False):
            self._TestUnstashAnyChanges(stashed)

    #
    # _DropAnyStashedChanges
    #

    def _TestDropAnyStashedChanges(self, stashed) -> None:
        """Run _DropAnyStashedChanges starting from the given stashed flag."""
        upgrader = self._MockUpgrader(cmdargs=[], _stable_repo_stashed=stashed)
        self.assertEqual(stashed, upgrader._stable_repo_stashed)

        cpu.Upgrader._DropAnyStashedChanges(upgrader)

        # The git call is only checked when something was stashed.
        if stashed:
            upgrader._RunGit.assert_called_once_with(
                upgrader._stable_repo,
                ["stash", "drop"],
                stdout=True,
                stderr=subprocess.STDOUT,
            )

        self.assertFalse(upgrader._stable_repo_stashed)

    def testDropAnyStashedChanges(self) -> None:
        """Cover both the stashed and the not-stashed starting state."""
        for stashed in (True, False):
            self._TestDropAnyStashedChanges(stashed)


class UtilityTest(CpuTestBase):
    """Test several Upgrader utility methods.

    Test these Upgrader methods: _IsInUpgradeMode, _GetBoardCmd,
    _GenPortageEnvvars, _SplitEBuildPath
    """

    #
    # _IsInUpgradeMode
    #

    def _TestIsInUpgradeMode(self, cmdargs):
        """Test Upgrader._IsInUpgradeMode.  Pretty simple."""
        mocked_upgrader = self._MockUpgrader(cmdargs=cmdargs)
        return cpu.Upgrader._IsInUpgradeMode(mocked_upgrader)

    def testIsInUpgradeModeNoOpts(self) -> None:
        """Should not be in upgrade mode with no options."""
        result = self._TestIsInUpgradeMode([])
        self.assertFalse(result)

    def testIsInUpgradeModeUpgrade(self) -> None:
        """Should be in upgrade mode with --upgrade."""
        result = self._TestIsInUpgradeMode(["--upgrade"])
        self.assertTrue(result)

    def testIsInUpgradeModeUpgradeDeep(self) -> None:
        """Should be in upgrade mode with --upgrade-deep."""
        result = self._TestIsInUpgradeMode(["--upgrade-deep"])
        self.assertTrue(result)

    #
    # _GetBoardCmd
    #

    def _TestGetBoardCmd(self, cmd, board):
        """Test Upgrader._GetBoardCmd."""
        mocked_upgrader = self._MockUpgrader(_curr_board=board)
        return cpu.Upgrader._GetBoardCmd(mocked_upgrader, cmd)

    def testGetBoardCmdKnownCmds(self) -> None:
        """Known commands get a -<board> suffix, except for the host board."""
        board = "x86-alex"
        for cmd in ["emerge", "equery", "portageq"]:
            result = self._TestGetBoardCmd(cmd, cpu.Upgrader.HOST_BOARD)
            self.assertEqual(result, cmd)
            result = self._TestGetBoardCmd(cmd, board)
            self.assertEqual(result, "%s-%s" % (cmd, board))

    def testGetBoardCmdUnknownCmd(self) -> None:
        """Unknown commands are returned unchanged for any board."""
        board = "x86-alex"
        cmd = "foo"
        result = self._TestGetBoardCmd(cmd, cpu.Upgrader.HOST_BOARD)
        self.assertEqual(result, cmd)
        result = self._TestGetBoardCmd(cmd, board)
        self.assertEqual(result, cmd)

    #
    # _GenPortageEnvvars
    #

    def _TestGenPortageEnvvars(
        self, arch, unstable_ok, portdir=None, portage_configroot=None
    ) -> None:
        """Testing the behavior of the Upgrader._GenPortageEnvvars method.

        Args:
            arch: Architecture passed through to _GenPortageEnvvars.
            unstable_ok: Whether "~<arch>" is expected in ACCEPT_KEYWORDS.
            portdir: Expected PORTDIR value; None means the key must be
                absent from the result.
            portage_configroot: Expected PORTAGE_CONFIGROOT value; None means
                the key must be absent from the result.
        """
        result = cpu.Upgrader._GenPortageEnvvars(
            arch, unstable_ok, portdir, portage_configroot
        )

        keyw = arch
        if unstable_ok:
            keyw = arch + " ~" + arch

        self.assertEqual(result["ACCEPT_KEYWORDS"], keyw)
        if portdir is None:
            self.assertNotIn("PORTDIR", result)
        else:
            self.assertEqual(result["PORTDIR"], portdir)
        if portage_configroot is None:
            self.assertNotIn("PORTAGE_CONFIGROOT", result)
        else:
            self.assertEqual(result["PORTAGE_CONFIGROOT"], portage_configroot)

    def testGenPortageEnvvars1(self) -> None:
        """Stable-only keywords, no PORTDIR or PORTAGE_CONFIGROOT."""
        self._TestGenPortageEnvvars("arm", False)

    def testGenPortageEnvvars2(self) -> None:
        """Unstable keywords allowed, no PORTDIR or PORTAGE_CONFIGROOT."""
        self._TestGenPortageEnvvars("x86", True)

    def testGenPortageEnvvars3(self) -> None:
        """Unstable keywords allowed, both directories supplied."""
        self._TestGenPortageEnvvars(
            "x86", True, portdir="/foo/bar", portage_configroot="/bar/foo"
        )

    #
    # _SplitEBuildPath
    #

    def _TestSplitEBuildPath(self, ebuild_path, golden_result) -> None:
        """Test the behavior of the Upgrader._SplitEBuildPath method."""
        result = cpu.Upgrader._SplitEBuildPath(ebuild_path)
        self.assertEqual(result, golden_result)

    def testSplitEBuildPath1(self) -> None:
        """Split a simple path whose ebuild has a plain version."""
        self._TestSplitEBuildPath(
            "/foo/bar/portage/dev-libs/A/A-2.ebuild",
            ("portage", "dev-libs", "A", "A-2"),
        )

    def testSplitEBuildPath2(self) -> None:
        """Split a path whose ebuild has a dotted version and a revision."""
        self._TestSplitEBuildPath(
            "/foo/ooo/ccc/ppp/ppp-1.2.3-r123.ebuild",
            ("ooo", "ccc", "ppp", "ppp-1.2.3-r123"),
        )


@unittest.skip("playground setup needs more work")
class TreeInspectTest(CpuTestBase):
    """Exercise Upgrader._FindCurrentCPV and Upgrader._FindUpstreamCPV."""

    def _GenerateTestInput(self, category, pkg_name, ver_rev):
        """Return tuple (ebuild_path, cpv, cp).

        ebuild_path and cpv are both None when |ver_rev| is falsy.
        """
        cp = "%s/%s" % (category, pkg_name)
        if not ver_rev:
            return (None, None, cp)

        ebuild_path = "%s/%s/%s/%s-%s.ebuild" % (
            self.portage_stable,
            category,
            pkg_name,
            pkg_name,
            ver_rev,
        )
        cpv = "%s/%s-%s" % (category, pkg_name, ver_rev)
        return (ebuild_path, cpv, cp)

    #
    # _FindUpstreamCPV
    #

    def _TestFindUpstreamCPV(self, pkg_arg, ebuild_expect, unstable_ok=False):
        """Run Upgrader._FindUpstreamCPV with the playground set up.

        Args:
            pkg_arg: Package argument passed to _FindUpstreamCPV.
            ebuild_expect: Ebuild path (relative to self.eroot) expected to
                be split, or a falsy value when no result is expected.
            unstable_ok: Passed through to _FindUpstreamCPV.

        Returns:
            Whatever _FindUpstreamCPV returned.
        """
        self._SetUpPlayground()
        upgrader = self._MockUpgrader(_curr_board=None)

        # Let the mocked _SplitEBuildPath answer with the real split of the
        # expected path.
        if ebuild_expect:
            ebuild_path = self.eroot + ebuild_expect
            upgrader._SplitEBuildPath.return_value = (
                cpu.Upgrader._SplitEBuildPath(ebuild_path)
            )

        found = cpu.Upgrader._FindUpstreamCPV(upgrader, pkg_arg, unstable_ok)

        if ebuild_expect:
            upgrader._SplitEBuildPath.assert_called_once_with(ebuild_path)

        # A result is expected exactly when an ebuild was expected.
        self.assertEqual(bool(ebuild_expect), bool(found))

        return found

    def testFindUpstreamA2(self) -> None:
        """Look up dev-libs/A and expect version 2."""
        ebuild, cpv, cp = self._GenerateTestInput(
            category="dev-libs", pkg_name="A", ver_rev="2"
        )
        self.assertEqual(self._TestFindUpstreamCPV(cp, ebuild), cpv)

    def testFindUpstreamAAA(self) -> None:
        """Look up dev-apps/AAA and expect no result."""
        ebuild, cpv, cp = self._GenerateTestInput(
            category="dev-apps", pkg_name="AAA", ver_rev=None
        )
        self.assertEqual(self._TestFindUpstreamCPV(cp, ebuild), cpv)

    def testFindUpstreamF(self) -> None:
        """Look up dev-libs/F and expect version 2."""
        ebuild, cpv, cp = self._GenerateTestInput(
            category="dev-libs", pkg_name="F", ver_rev="2"
        )
        self.assertEqual(self._TestFindUpstreamCPV(cp, ebuild), cpv)

    def testFindUpstreamFlimflam(self) -> None:
        """Should find 0.0.1-r228 because more recent flimflam unstable."""
        ebuild, cpv, cp = self._GenerateTestInput(
            category="chromeos-base", pkg_name="flimflam", ver_rev="0.0.1-r228"
        )
        self.assertEqual(self._TestFindUpstreamCPV(cp, ebuild), cpv)

    def testFindUpstreamFlimflamUnstable(self) -> None:
        """Should find 0.0.2-r123 because of unstable_ok."""
        ebuild, cpv, cp = self._GenerateTestInput(
            category="chromeos-base", pkg_name="flimflam", ver_rev="0.0.2-r123"
        )
        self.assertEqual(
            self._TestFindUpstreamCPV(cp, ebuild, unstable_ok=True), cpv
        )

    #
    # _FindCurrentCPV
    #

    def _TestFindCurrentCPV(self, pkg_arg, ebuild_expect):
        """Run Upgrader._FindCurrentCPV with the playground set up.

        Args:
            pkg_arg: Package argument passed to _FindCurrentCPV.
            ebuild_expect: Ebuild path (relative to self.eroot) expected to
                be split, or a falsy value when none is expected.

        Returns:
            Whatever _FindCurrentCPV returned.
        """
        upgrader = self._MockUpgrader(_curr_board=None)
        self._SetUpPlayground()

        # Canned answer for the mocked board command lookup.
        upgrader._GetBoardCmd.return_value = "equery"

        if ebuild_expect:
            ebuild_path = self.eroot + ebuild_expect
            upgrader._SplitEBuildPath.return_value = (
                cpu.Upgrader._SplitEBuildPath(ebuild_path)
            )

        found = cpu.Upgrader._FindCurrentCPV(upgrader, pkg_arg)

        if ebuild_expect:
            upgrader._SplitEBuildPath.assert_called_once_with(ebuild_path)

        return found

    def testFindCurrentA(self) -> None:
        """Should find dev-libs/A-2."""
        ebuild, cpv, cp = self._GenerateTestInput(
            category="dev-libs", pkg_name="A", ver_rev="2"
        )
        self.assertEqual(self._TestFindCurrentCPV(cp, ebuild), cpv)

    def testFindCurrentAAA(self) -> None:
        """Should find None, because dev-libs/AAA does not exist in tree."""
        ebuild, cpv, cp = self._GenerateTestInput(
            category="dev-libs", pkg_name="AAA", ver_rev=None
        )
        self.assertEqual(self._TestFindCurrentCPV(cp, ebuild), cpv)

    def testFindCurrentF(self) -> None:
        """Should find dev-libs/F-2."""
        ebuild, cpv, cp = self._GenerateTestInput(
            category="dev-libs", pkg_name="F", ver_rev="2"
        )
        self.assertEqual(self._TestFindCurrentCPV(cp, ebuild), cpv)

    def testFindCurrentFlimflam(self) -> None:
        """Should find 0.0.1-r228 because more recent flimflam unstable."""
        ebuild, cpv, cp = self._GenerateTestInput(
            category="chromeos-base", pkg_name="flimflam", ver_rev="0.0.1-r228"
        )
        self.assertEqual(self._TestFindCurrentCPV(cp, ebuild), cpv)


class RunBoardTest(CpuTestBase):
    """Exercise Upgrader.RunBoard, PrepareToRun and RunCompleted."""

    def testRunCompletedSpecified(self) -> None:
        """RunCompleted should run cleanly when --upstream is given."""
        upgrader = self._MockUpgrader(
            cmdargs=["--upstream=/some/dir"], _curr_board=None
        )
        cpu.Upgrader.RunCompleted(upgrader)

    def testRunCompletedRemoveCache(self) -> None:
        """RunCompleted should run cleanly with --no-upstream-cache."""
        # TODO: Create cache and check it's cleaned up.
        upgrader = self._MockUpgrader(
            cmdargs=["--no-upstream-cache"], _curr_board=None
        )
        cpu.Upgrader.RunCompleted(upgrader)

    def testRunCompletedKeepCache(self) -> None:
        """RunCompleted should run cleanly with no options."""
        # TODO: Create cache and check it's left behind.
        upgrader = self._MockUpgrader(cmdargs=[], _curr_board=None)
        cpu.Upgrader.RunCompleted(upgrader)

    def testPrepareToRunUpstreamRepoExists(self) -> None:
        """PrepareToRun should update a repo that already has .git/shallow."""
        marker = os.path.join(self.upstream_tmp_repo, ".git", "shallow")
        osutils.Touch(marker, makedirs=True)

        upgrader = self._MockUpgrader(cmdargs=[], _curr_board=None)

        cpu.Upgrader.PrepareToRun(upgrader)

        # Git commands expected, in order, against the upstream repo.
        repo = self.upstream_tmp_repo
        expected_calls = [
            mock.call(
                repo,
                ["remote", "set-url", "origin", cpu.Upgrader.PORTAGE_GIT_URL],
            ),
            mock.call(
                repo,
                [
                    "config",
                    "remote.origin.fetch",
                    "+refs/heads/master:refs/remotes/origin/master",
                ],
            ),
            mock.call(repo, ["remote", "update"]),
            mock.call(
                repo,
                ["checkout", "-f", "origin/master"],
                stderr=subprocess.STDOUT,
                stdout=True,
            ),
        ]
        upgrader._RunGit.assert_has_calls(expected_calls)

    def testPrepareToRunUpstreamRepoNew(self) -> None:
        """PrepareToRun should clone when no upstream repo was prepared."""
        upgrader = self._MockUpgrader(cmdargs=[], _curr_board=None)

        cpu.Upgrader.PrepareToRun(upgrader)

        self.assertExists(self.upstream_tmp_repo + "-README")

        # A single shallow clone of master into the temp directory.
        clone_args = [
            "clone",
            "--branch",
            "master",
            "--depth",
            "1",
            cpu.Upgrader.PORTAGE_GIT_URL,
            os.path.basename(self.upstream_tmp_repo),
        ]
        upgrader._RunGit.assert_called_once_with(str(self.tempdir), clone_args)

    def _TestRunBoard(
        self, pinfolist, upgrade=False, staged_changes=False
    ) -> None:
        """Drive Upgrader.RunBoard on a mocked upgrader and check its calls.

        Args:
            pinfolist: PInfo objects the mocked _ResolveAndVerifyArgs
                returns; their user_arg values form the command line.
            upgrade: Whether to put "--upgrade" on the command line.
            staged_changes: Value the mocked _AnyChangesStaged returns.
        """
        user_args = [pinfo.user_arg for pinfo in pinfolist]
        no_local_cpv = [pinfo for pinfo in pinfolist if not pinfo.cpv]

        cmdargs = ["--upgrade"] + user_args if upgrade else user_args
        upgrader = self._MockUpgrader(cmdargs=cmdargs, _curr_board=None)
        board = "runboard_testboard"

        self.PatchObject(
            cpu.Upgrader, "_FindBoardArch", return_value=DEFAULT_ARCH
        )

        # Ask the real implementation for the mode, then feed the answer
        # back through the mock.
        upgrade_mode = cpu.Upgrader._IsInUpgradeMode(upgrader)
        upgrader._IsInUpgradeMode.return_value = upgrade_mode
        upgrader._AnyChangesStaged.return_value = staged_changes
        if staged_changes:
            # NOTE(review): this only records a call on the mock and nothing
            # asserts on it afterwards; kept as-is.
            upgrader._StashChanges()
        upgrader._ResolveAndVerifyArgs.return_value = pinfolist

        if not upgrade:
            upgrader._GetCurrentVersions.return_value = pinfolist
            upgrader._FinalizeLocalPInfolist.return_value = []
        if upgrade or upgrade_mode:
            upgrader._FinalizeUpstreamPInfolist.return_value = []

        cpu.Upgrader.RunBoard(upgrader, board)

        upgrader._ResolveAndVerifyArgs.assert_called_once_with(
            user_args, upgrade_mode
        )
        # NOTE(review): upgrade_mode is derived from the same cmdargs, so
        # this branch looks unreachable from the current callers -- confirm.
        if upgrade_mode and not upgrade:
            upgrader._FinalizeUpstreamPInfolist.assert_called_once_with(
                no_local_cpv
            )

    def testRunBoard1(self) -> None:
        """Run one locally known package without --upgrade."""
        pinfo_a = cpu.PInfo(
            user_arg="dev-libs/A",
            cpv="dev-libs/A-1",
            upstream_cpv="dev-libs/A-2",
        )
        self._TestRunBoard([pinfo_a])

    def testRunBoard2(self) -> None:
        """Run one locally known package with --upgrade."""
        pinfo_a = cpu.PInfo(
            user_arg="dev-libs/A",
            cpv="dev-libs/A-1",
            upstream_cpv="dev-libs/A-2",
        )
        self._TestRunBoard([pinfo_a], upgrade=True)

    def testRunBoard3(self) -> None:
        """Run with --upgrade while staged changes are reported."""
        pinfo_a = cpu.PInfo(
            user_arg="dev-libs/A",
            cpv="dev-libs/A-1",
            upstream_cpv="dev-libs/A-2",
        )
        self._TestRunBoard([pinfo_a], upgrade=True, staged_changes=True)

    def testRunBoardUpstreamOnlyStatusMode(self) -> None:
        """Status mode with package that is only upstream should error."""
        upstream_only = cpu.PInfo(
            user_arg="dev-libs/M", cpv=None, upstream_cpv="dev-libs/M-2"
        )
        pinfolist = [upstream_only]
        user_args = [pinfo.user_arg for pinfo in pinfolist]

        upgrader = self._MockUpgrader(cmdargs=["dev-libs/M"], _curr_board=None)
        board = "runboard_testboard"

        self.PatchObject(
            cpu.Upgrader, "_FindBoardArch", return_value=DEFAULT_ARCH
        )
        upgrade_mode = cpu.Upgrader._IsInUpgradeMode(upgrader)
        upgrader._IsInUpgradeMode.return_value = upgrade_mode
        upgrader._AnyChangesStaged.return_value = False
        upgrader._ResolveAndVerifyArgs.return_value = pinfolist

        with self.assertRaises(RuntimeError):
            cpu.Upgrader.RunBoard(upgrader, board)

        upgrader._ResolveAndVerifyArgs.assert_called_once_with(
            user_args, upgrade_mode
        )


class GiveEmergeResultsTest(CpuTestBase):
    """Exercise Upgrader._GiveEmergeResults."""

    def _TestGiveEmergeResultsOK(self, pinfolist, ok, error=None) -> None:
        """Run _GiveEmergeResults with a mocked _AreEmergeable outcome.

        Args:
            pinfolist: Passed to _GiveEmergeResults.
            ok: Success flag the mocked _AreEmergeable reports.
            error: Exception type expected to be raised, if any.
        """
        upgrader = self._MockUpgrader(cmdargs=[])
        upgrader._AreEmergeable.return_value = (ok, None, None)

        if not error:
            cpu.Upgrader._GiveEmergeResults(upgrader, pinfolist)
            return

        with self.assertRaises(error):
            cpu.Upgrader._GiveEmergeResults(upgrader, pinfolist)

    def testGiveEmergeResultsUnmaskedOK(self) -> None:
        """Unmasked upgrades that are emergeable should not raise."""
        unmasked = [
            cpu.PInfo(upgraded_cpv=cpv, upgraded_unmasked=True)
            for cpv in ("abc/def-4", "bcd/efg-8")
        ]
        self._TestGiveEmergeResultsOK(unmasked, True)

    def testGiveEmergeResultsUnmaskedNotOK(self) -> None:
        """Unmasked upgrades that are not emergeable should raise."""
        unmasked = [
            cpu.PInfo(upgraded_cpv=cpv, upgraded_unmasked=True)
            for cpv in ("abc/def-4", "bcd/efg-8")
        ]
        self._TestGiveEmergeResultsOK(unmasked, False, error=RuntimeError)

    def _TestGiveEmergeResultsMasked(
        self, pinfolist, ok, masked_cpvs, error=None
    ) -> None:
        """Run _GiveEmergeResults and check the masked-error reporting.

        Args:
            pinfolist: Passed to _GiveEmergeResults.
            ok: Success flag the mocked _AreEmergeable reports.
            masked_cpvs: cpvs expected to be handed to _GiveMaskedError
                when |ok| is false.
            error: Exception type expected to be raised, if any.
        """
        upgrader = self._MockUpgrader(cmdargs=[])
        upgrader._AreEmergeable.return_value = (ok, "some-cmd", "some-output")

        if error:
            with self.assertRaises(error):
                cpu.Upgrader._GiveEmergeResults(upgrader, pinfolist)
        else:
            cpu.Upgrader._GiveEmergeResults(upgrader, pinfolist)

        if ok:
            return

        # Compare order-independently: one masked-error call per cpv, each
        # carrying the canned emerge output.
        self.assertEqual(
            sorted(upgrader._GiveMaskedError.call_args_list),
            sorted(mock.call(x, "some-output") for x in masked_cpvs),
        )

    def testGiveEmergeResultsMaskedOK(self) -> None:
        """Masked upgrades should raise even when emerge reports success."""
        masked_cpvs = ["abc/def-4", "bcd/efg-8"]
        masked = [
            cpu.PInfo(upgraded_cpv="abc/def-4", upgraded_unmasked=False),
            cpu.PInfo(upgraded_cpv="bcd/efg-8", upgraded_unmasked=False),
        ]
        self._TestGiveEmergeResultsMasked(
            masked, True, masked_cpvs, error=RuntimeError
        )

    def testGiveEmergeResultsMaskedNotOK(self) -> None:
        """Masked upgrades that fail to emerge should raise and be reported."""
        masked_cpvs = ["abc/def-4", "bcd/efg-8"]
        masked = [
            cpu.PInfo(upgraded_cpv="abc/def-4", upgraded_unmasked=False),
            cpu.PInfo(upgraded_cpv="bcd/efg-8", upgraded_unmasked=False),
        ]
        self._TestGiveEmergeResultsMasked(
            masked, False, masked_cpvs, error=RuntimeError
        )


class CheckStagedUpgradesTest(CpuTestBase):
    """Test Upgrader._CheckStagedUpgrades"""

    @staticmethod
    def _StagedStatus():
        """Return a repo status with two staged ebuilds and one stray file."""
        staged_paths = (
            "a/b/foo/bar/bar-1.ebuild",
            "a/b/foo/garbage",
            "x/y/bar/foo/foo-3.ebuild",
        )
        return {path: "A" for path in staged_paths}

    def testCheckStagedUpgradesTwoStaged(self) -> None:
        """Both packages with staged ebuilds are targeted: no exception."""
        upgrader = self._MockUpgrader(
            cmdargs=[], _stable_repo_status=self._StagedStatus()
        )
        targets = [cpu.PInfo(package=pkg) for pkg in ("foo/bar", "bar/foo")]

        # Verify.  Returning without raising is the success condition.
        cpu.Upgrader._CheckStagedUpgrades(upgrader, targets)

    def testCheckStagedUpgradesTwoStagedOneUnexpected(self) -> None:
        """A staged ebuild for an untargeted package raises RuntimeError."""
        upgrader = self._MockUpgrader(
            cmdargs=[], _stable_repo_status=self._StagedStatus()
        )

        # foo/bar is deliberately left out of the targets, so the check
        # should complain about that package having staged changes.
        targets = [cpu.PInfo(package="bar/foo")]

        # Verify.
        with self.assertRaises(RuntimeError):
            cpu.Upgrader._CheckStagedUpgrades(upgrader, targets)

    def testCheckStagedUpgradesNoneStaged(self) -> None:
        """With no repo status at all the check returns without raising."""
        upgrader = self._MockUpgrader(cmdargs=[], _stable_repo_status=None)
        targets = [
            cpu.PInfo(package=pkg) for pkg in ("foo/bar-1", "bar/foo-3")
        ]

        # Verify.
        cpu.Upgrader._CheckStagedUpgrades(upgrader, targets)


class UpgradePackagesTest(CpuTestBase):
    """Test Upgrader._UpgradePackages"""

    def _TestUpgradePackages(self, pinfolist, upgrade) -> None:
        """Run Upgrader._UpgradePackages and check the per-package calls.

        Args:
            pinfolist: PInfo objects to process.  A truthy upgraded_cpv marks
                a package whose mocked _UpgradePackage call reports True.
            upgrade: Whether to pass --upgrade on the command line.
        """
        upgrader = self._MockUpgrader(
            cmdargs=["--upgrade"] if upgrade else []
        )

        # Replay script.
        per_pinfo_calls = [mock.call(pinfo) for pinfo in pinfolist]
        verify_calls = [
            mock.call(pinfo) for pinfo in pinfolist if pinfo.upgraded_cpv
        ]
        upgrade_results = [bool(pinfo.upgraded_cpv) for pinfo in pinfolist]
        upgrader._UpgradePackage.side_effect = upgrade_results

        # Make the mocked mode query answer what the real method computes.
        upgrader._IsInUpgradeMode.return_value = cpu.Upgrader._IsInUpgradeMode(
            upgrader
        )

        # Verify.
        cpu.Upgrader._UpgradePackages(upgrader, pinfolist)

        if any(upgrade_results):
            self.assertEqual(1, upgrader._GiveEmergeResults.call_count)

        upgrader._UpgradePackage.assert_has_calls(per_pinfo_calls)
        upgrader._VerifyPackageUpgrade.assert_has_calls(verify_calls)
        upgrader._PackageReport.assert_has_calls(per_pinfo_calls)

    def testUpgradePackagesUpgradeModeWithUpgrades(self) -> None:
        """Upgrade mode: two packages upgraded, two left alone."""
        upgraded_cpvs = ("abc/def-4", "bcd/efg-8", None, None)
        pinfolist = [cpu.PInfo(upgraded_cpv=cpv) for cpv in upgraded_cpvs]
        self._TestUpgradePackages(pinfolist, True)

    def testUpgradePackagesUpgradeModeNoUpgrades(self) -> None:
        """Upgrade mode: no package ends up upgraded."""
        pinfolist = [cpu.PInfo(upgraded_cpv=None) for _ in range(2)]
        self._TestUpgradePackages(pinfolist, True)

    def testUpgradePackagesStatusModeNoUpgrades(self) -> None:
        """No --upgrade flag: no package ends up upgraded."""
        pinfolist = [cpu.PInfo(upgraded_cpv=None) for _ in range(2)]
        self._TestUpgradePackages(pinfolist, False)


class CategoriesRoundtripTest(cros_test_lib.MockTempDirTestCase):
    """Tests for full "round trip" runs."""

    def _TestCategoriesRoundtrip(self, categories) -> None:
        """Write |categories| to a temp stable repo, reload, and compare.

        Args:
            categories: Iterable of category names to store and read back.
        """
        repo_root = self.tempdir
        categories_relpath = cpu.Upgrader.CATEGORIES_FILE

        # Git is stubbed out; only the call made for the write is checked.
        run_git = self.PatchObject(cpu.Upgrader, "_RunGit")

        upgrader = cpu.Upgrader(
            options=cros_test_lib.EasyAttr(
                srcroot="foobar", upstream=None, packages=""
            )
        )
        upgrader._stable_repo = repo_root

        # The directory holding the categories file must exist before writing.
        os.makedirs(
            os.path.join(repo_root, os.path.dirname(categories_relpath))
        )

        # Verification phase.  Write then load categories.
        upgrader._stable_repo_categories = set(categories)
        upgrader._WriteStableRepoCategories()
        # Clear the in-memory copy so the load below has to repopulate it.
        upgrader._stable_repo_categories = None
        upgrader._LoadStableRepoCategories()

        run_git.assert_called_once_with(
            repo_root, ["add", categories_relpath]
        )
        reloaded = sorted(upgrader._stable_repo_categories)
        self.assertEqual(sorted(categories), reloaded)

    def test1(self) -> None:
        """Round trip three hyphenated category names."""
        self._TestCategoriesRoundtrip(
            ["alpha-omega", "omega-beta", "beta-chi"]
        )

    def test2(self) -> None:
        """Round trip an empty category list."""
        self._TestCategoriesRoundtrip([])

    def test3(self) -> None:
        """Round trip names where one is a prefix of another."""
        self._TestCategoriesRoundtrip(
            ["virtual", "happy-days", "virtually-there"]
        )


class UpgradePackageTest(CpuTestBase):
    """Test Upgrader._UpgradePackage"""

    def _TestUpgradePackage(
        self,
        pinfo,
        upstream_cpv,
        upstream_cmp,
        stable_up,
        latest_up,
        upgrade_requested,
        upgrade_staged,
        unstable_ok,
        force,
    ):
        """Run Upgrader._UpgradePackage on |pinfo| and check the outcome.

        Args:
            pinfo: cpu.PInfo passed to _UpgradePackage and inspected afterwards.
            upstream_cpv: Value pinfo.upstream_cpv must hold afterwards; falsy
                when no upstream version is expected.
            upstream_cmp: Comparison result the scenario assumes between the
                upstream and current versions (callers pass 1 for outdated
                and 0 for current).
            stable_up: cpv the mocked _FindUpstreamCPV returns for the default
                (stable) lookup.
            latest_up: cpv the mocked _FindUpstreamCPV returns when called
                with unstable_ok=True.
            upgrade_requested: Return value of the mocked _PkgUpgradeRequested.
            upgrade_staged: Return value of the mocked _PkgUpgradeStaged.
            unstable_ok: Whether to pass --unstable-ok on the command line.
            force: Whether to pass --force on the command line.

        Returns:
            Whatever Upgrader._UpgradePackage returned.
        """
        cmdargs = []
        if unstable_ok:
            cmdargs.append("--unstable-ok")
        if force:
            cmdargs.append("--force")
        mocked_upgrader = self._MockUpgrader(cmdargs=cmdargs)

        # Replay script.
        def FindUpstreamCPV(pkg, unstable_ok=False):
            self.assertEqual(pinfo.package, pkg)
            return latest_up if unstable_ok else stable_up

        mocked_upgrader._FindUpstreamCPV.side_effect = FindUpstreamCPV

        # True when this scenario should end with an upgraded (staged) ebuild,
        # either because it was already staged or because it gets copied.
        expect_upgrade = False
        if upstream_cpv:
            mocked_upgrader._PkgUpgradeRequested.return_value = (
                upgrade_requested
            )

            if upgrade_requested:
                mocked_upgrader._PkgUpgradeStaged.return_value = upgrade_staged
                needs_copy = not upgrade_staged and (
                    upstream_cmp > 0 or (upstream_cmp == 0 and force)
                )
                if needs_copy:
                    mocked_upgrader._CopyUpstreamPackage.return_value = (
                        upstream_cpv
                    )
                expect_upgrade = upgrade_staged or needs_copy

        git_calls = []
        if expect_upgrade:
            git_calls.append(
                mock.call(mocked_upgrader._stable_repo, ["add", pinfo.package])
            )

        # Verify.
        result = cpu.Upgrader._UpgradePackage(mocked_upgrader, pinfo)

        mocked_upgrader._RunGit.assert_has_calls(git_calls)
        if expect_upgrade:
            # With unittest.mock, expectations must be asserted after the run;
            # calling the mock beforehand would only pollute its call history.
            mocked_upgrader._SetUpgradedMaskBits.assert_called_once_with(pinfo)

        if upstream_cpv:
            mocked_upgrader._PkgUpgradeRequested.assert_called_once_with(pinfo)
            self.assertEqual(upstream_cpv, pinfo.upstream_cpv)

            if upgrade_requested:
                mocked_upgrader._PkgUpgradeStaged.assert_called_once_with(
                    upstream_cpv
                )

            if upgrade_requested and (upstream_cpv != pinfo.cpv or force):
                self.assertEqual(upstream_cpv, pinfo.upgraded_cpv)
            else:
                self.assertIsNone(pinfo.upgraded_cpv)
        else:
            self.assertIsNone(pinfo.upstream_cpv)
            self.assertIsNone(pinfo.upgraded_cpv)
        self.assertEqual(stable_up, pinfo.stable_upstream_cpv)
        self.assertEqual(latest_up, pinfo.latest_upstream_cpv)

        return result

    # Dimensions to vary:
    # 1) Upgrade for this package requested or not.
    # 2) Upgrade can be stable or not.
    # 3) Specific version to upgrade to specified.
    # 4) Upgrade already staged or not.
    # 5) Upgrade needed or not (current).

    def testUpgradePackageOutdatedRequestedStable(self) -> None:
        """Outdated package, upgrade requested, stable upstream chosen."""
        pinfo = cpu.PInfo(cpv="foo/bar-2", package="foo/bar", upstream_cpv=None)
        result = self._TestUpgradePackage(
            pinfo,
            upstream_cpv="foo/bar-3",
            upstream_cmp=1,  # outdated
            stable_up="foo/bar-3",
            latest_up="foo/bar-5",
            upgrade_requested=True,
            upgrade_staged=False,
            unstable_ok=False,
            force=False,
        )
        self.assertTrue(result)

    def testUpgradePackageOutdatedRequestedUnstable(self) -> None:
        """Outdated package, upgrade requested, --unstable-ok picks latest."""
        pinfo = cpu.PInfo(cpv="foo/bar-2", package="foo/bar", upstream_cpv=None)
        result = self._TestUpgradePackage(
            pinfo,
            upstream_cpv="foo/bar-5",
            upstream_cmp=1,  # outdated
            stable_up="foo/bar-3",
            latest_up="foo/bar-5",
            upgrade_requested=True,
            upgrade_staged=False,
            unstable_ok=True,
            force=False,
        )
        self.assertTrue(result)

    def testUpgradePackageOutdatedRequestedStableSpecified(self) -> None:
        """Outdated package with an upstream version preset on the pinfo."""
        pinfo = cpu.PInfo(
            cpv="foo/bar-2", package="foo/bar", upstream_cpv="foo/bar-4"
        )
        result = self._TestUpgradePackage(
            pinfo,
            upstream_cpv="foo/bar-4",
            upstream_cmp=1,  # outdated
            stable_up="foo/bar-3",
            latest_up="foo/bar-5",
            upgrade_requested=True,
            upgrade_staged=False,
            unstable_ok=False,  # not important
            force=False,
        )
        self.assertTrue(result)

    def testUpgradePackageCurrentRequestedStable(self) -> None:
        """Current package, upgrade requested without --force: no upgrade."""
        pinfo = cpu.PInfo(cpv="foo/bar-3", package="foo/bar", upstream_cpv=None)
        result = self._TestUpgradePackage(
            pinfo,
            upstream_cpv="foo/bar-3",
            upstream_cmp=0,  # current
            stable_up="foo/bar-3",
            latest_up="foo/bar-5",
            upgrade_requested=True,
            upgrade_staged=False,
            unstable_ok=False,
            force=False,
        )
        self.assertFalse(result)

    def testUpgradePackageCurrentRequestedStableForce(self) -> None:
        """Current package, upgrade requested with --force: upgraded."""
        pinfo = cpu.PInfo(
            cpv="foo/bar-3", package="foo/bar", upstream_cpv="foo/bar-3"
        )
        result = self._TestUpgradePackage(
            pinfo,
            upstream_cpv="foo/bar-3",
            upstream_cmp=0,  # current
            stable_up="foo/bar-3",
            latest_up="foo/bar-5",
            upgrade_requested=True,
            upgrade_staged=False,
            unstable_ok=False,
            force=True,
        )
        self.assertTrue(result)

    def testUpgradePackageOutdatedStable(self) -> None:
        """Outdated package but upgrade not requested: no upgrade."""
        pinfo = cpu.PInfo(cpv="foo/bar-2", package="foo/bar", upstream_cpv=None)
        result = self._TestUpgradePackage(
            pinfo,
            upstream_cpv="foo/bar-3",
            upstream_cmp=1,  # outdated
            stable_up="foo/bar-3",
            latest_up="foo/bar-5",
            upgrade_requested=False,
            upgrade_staged=False,
            unstable_ok=False,
            force=False,
        )
        self.assertFalse(result)

    def testUpgradePackageOutdatedRequestedStableStaged(self) -> None:
        """Outdated package whose stable upgrade is already staged."""
        pinfo = cpu.PInfo(cpv="foo/bar-2", package="foo/bar", upstream_cpv=None)
        result = self._TestUpgradePackage(
            pinfo,
            upstream_cpv="foo/bar-3",
            upstream_cmp=1,  # outdated
            stable_up="foo/bar-3",
            latest_up="foo/bar-5",
            upgrade_requested=True,
            upgrade_staged=True,
            unstable_ok=False,
            force=False,
        )
        self.assertTrue(result)

    def testUpgradePackageOutdatedRequestedUnstableStaged(self) -> None:
        """Outdated package whose unstable upgrade is already staged."""
        pinfo = cpu.PInfo(
            cpv="foo/bar-2", package="foo/bar", upstream_cpv="foo/bar-5"
        )
        result = self._TestUpgradePackage(
            pinfo,
            upstream_cpv="foo/bar-5",
            upstream_cmp=1,  # outdated
            stable_up="foo/bar-3",
            latest_up="foo/bar-5",
            upgrade_requested=True,
            upgrade_staged=True,
            unstable_ok=True,
            force=False,
        )
        self.assertTrue(result)


class VerifyPackageTest(CpuTestBase):
    """Tests for _VerifyPackageUpgrade()."""

    def _TestVerifyPackageUpgrade(self, pinfo) -> None:
        """Check _VerifyPackageUpgrade forwards to _VerifyEbuildOverlay.

        Args:
            pinfo: PInfo whose upgraded_cpv and cpv_cmp_upstream are read.
        """
        upgrader = self._MockUpgrader(cmdargs=[])
        # A comparison result of 0 is what this test treats as an overwrite.
        expect_overwrite = pinfo.cpv_cmp_upstream == 0

        # Verify.
        cpu.Upgrader._VerifyPackageUpgrade(upgrader, pinfo)

        upgrader._VerifyEbuildOverlay.assert_called_once_with(
            pinfo.upgraded_cpv, "portage-stable", expect_overwrite
        )

    def testVerifyPackageUpgrade(self) -> None:
        """Cover both the overwrite (0) and non-overwrite (1) comparisons."""
        pinfo = cpu.PInfo(upgraded_cpv="foo/bar-3")

        for comparison in (0, 1):
            pinfo.cpv_cmp_upstream = comparison
            self._TestVerifyPackageUpgrade(pinfo)

    def _TestVerifyEbuildOverlay(
        self, cpv, overlay, ebuild_path, was_overwrite
    ) -> None:
        """Test Upgrader._VerifyEbuildOverlay

        Args:
            cpv: Package version passed to _VerifyEbuildOverlay.
            overlay: Overlay name passed to _VerifyEbuildOverlay.
            ebuild_path: Text the mocked equery command prints on stdout.
            was_overwrite: Overwrite flag passed to _VerifyEbuildOverlay.
        """
        upgrader = self._MockUpgrader(cmdargs=[])
        run_mock = self.StartPatcher(cros_test_lib.RunCommandMock())

        # Replay script.  Route the board-aware lookup to the real
        # _EqueryWhich so that the mocked command runner is what gets hit.
        upgrader._EqueryBoardWhich.side_effect = cpu.Upgrader._EqueryWhich
        run_mock.SetDefaultCmdResult(stdout=ebuild_path)

        # Verify.
        cpu.Upgrader._VerifyEbuildOverlay(
            upgrader, cpv, overlay, was_overwrite
        )

        expected_cmd = [
            "equery",
            "--no-color",
            "--no-pipe",
            "which",
            "--include-masked",
            cpv,
        ]
        run_mock.assertCommandCalled(
            expected_cmd,
            check=False,
            extra_env={"ACCEPT_KEYWORDS": DEFAULT_ARCH},
            stdout=True,
            stderr=subprocess.STDOUT,
            encoding="utf-8",
        )

    def testVerifyEbuildOverlayGood(self) -> None:
        """An ebuild path inside the expected overlay passes."""
        overlay = "some-overlay"
        path_in_overlay = "/some/path/%s/foo/bar/bar-2.ebuild" % overlay

        self._TestVerifyEbuildOverlay(
            "foo/bar-2", overlay, path_in_overlay, False
        )

    def testVerifyEbuildOverlayEvilNonOverwrite(self) -> None:
        """A path outside the overlay raises when not an overwrite."""
        path_elsewhere = "/some/path/spam/foo/bar/bar-2.ebuild"

        with self.assertRaises(RuntimeError):
            self._TestVerifyEbuildOverlay(
                "foo/bar-2", "some-overlay", path_elsewhere, False
            )

    def testVerifyEbuildOverlayEvilOverwrite(self) -> None:
        """A path outside the overlay raises when it was an overwrite."""
        path_elsewhere = "/some/path/spam/foo/bar/bar-2.ebuild"

        with self.assertRaises(RuntimeError):
            self._TestVerifyEbuildOverlay(
                "foo/bar-2", "some-overlay", path_elsewhere, True
            )

    def _TestSetUpgradedMaskBits(self, pinfo, output) -> None:
        """Run Upgrader._SetUpgradedMaskBits with canned equery output.

        Args:
            pinfo: PInfo whose upgraded_cpv is queried; the caller inspects
                its upgraded_unmasked attribute afterwards.
            output: Text the mocked equery command prints on stdout.
        """
        upgrader = self._MockUpgrader(cmdargs=[])
        run_mock = self.StartPatcher(cros_test_lib.RunCommandMock())
        queried_cpv = pinfo.upgraded_cpv

        # Replay script.
        upgrader._GenPortageEnvvars.return_value = {"env": "vars"}
        upgrader._GetBoardCmd.return_value = "equery"
        run_mock.SetDefaultCmdResult(stdout=output)

        # Verify.
        cpu.Upgrader._SetUpgradedMaskBits(upgrader, pinfo)

        expected_cmd = [
            "equery",
            "-qCN",
            "list",
            "-F",
            "$mask|$cpv:$slot",
            "-op",
            queried_cpv,
        ]
        run_mock.assertCommandCalled(
            expected_cmd,
            check=False,
            extra_env={"ACCEPT_KEYWORDS": DEFAULT_ARCH},
            stdout=True,
            stderr=subprocess.STDOUT,
            encoding="utf-8",
        )

    def testGetMaskBitsUnmaskedStable(self) -> None:
        """A blank mask field yields upgraded_unmasked=True."""
        pinfo = cpu.PInfo(upgraded_cpv="foo/bar-2.7.0")
        self._TestSetUpgradedMaskBits(pinfo, "  |foo/bar-2.7.0:0")
        self.assertTrue(pinfo.upgraded_unmasked)

    def testGetMaskBitsUnmaskedUnstable(self) -> None:
        """A mask field holding only "~" yields upgraded_unmasked=True."""
        pinfo = cpu.PInfo(upgraded_cpv="foo/bar-2.7.3")
        self._TestSetUpgradedMaskBits(pinfo, " ~|foo/bar-2.7.3:0")
        self.assertTrue(pinfo.upgraded_unmasked)

    def testGetMaskBitsMaskedStable(self) -> None:
        """A mask field holding "M" yields upgraded_unmasked=False."""
        pinfo = cpu.PInfo(upgraded_cpv="foo/bar-2.7.4")
        self._TestSetUpgradedMaskBits(pinfo, "M |foo/bar-2.7.4:0")
        self.assertFalse(pinfo.upgraded_unmasked)

    def testGetMaskBitsMaskedUnstable(self) -> None:
        """A mask field holding "M~" yields upgraded_unmasked=False."""
        pinfo = cpu.PInfo(upgraded_cpv="foo/bar-2.7.4-r1")
        self._TestSetUpgradedMaskBits(pinfo, "M~|foo/bar-2.7.4-r1:0")
        self.assertFalse(pinfo.upgraded_unmasked)


class CommitTest(CpuTestBase):
    """Test various commit-related Upgrader methods"""

    #
    # _ExtractUpgradedPkgs
    #

    def _TestExtractUpgradedPkgs(self, upgrade_lines):
        """Test Upgrader._ExtractUpgradedPkgs

        Args:
            upgrade_lines: Commit-body lines to extract package names from.

        Returns:
            Whatever Upgrader._ExtractUpgradedPkgs returned.
        """
        return cpu.Upgrader._ExtractUpgradedPkgs(upgrade_lines)

    def testExtractUpgradedPkgs(self) -> None:
        """Package names come back de-duplicated and sorted."""
        upgrade_lines = [
            "Upgraded abc/efg to version 1.2.3 on amd64, arm, x86",
            "Upgraded xyz/uvw to version 1.2.3 on amd64",
            "Upgraded xyz/uvw to version 3.2.1 on arm, x86",
            "Upgraded mno/pqr to version 12345 on x86",
        ]
        result = self._TestExtractUpgradedPkgs(upgrade_lines)
        self.assertEqual(result, ["efg", "pqr", "uvw"])

    #
    # _AmendCommitMessage
    #

    def _TestAmendCommitMessage(
        self, new_upgrade_lines, old_upgrade_lines, remaining_lines, git_show
    ) -> None:
        """Test Upgrader._AmendCommitMessage

        Args:
            new_upgrade_lines: Upgrade lines passed to _AmendCommitMessage.
            old_upgrade_lines: Upgrade lines expected to be recovered from
                the existing commit body.
            remaining_lines: Lines expected to be passed through as the rest
                of the commit body.
            git_show: Text the mocked git command returns as stdout.
        """
        mocked_upgrader = self._MockUpgrader()

        # Snapshot the expectation in a new list up front, so it does not
        # alias |new_upgrade_lines| should the code under test modify it.
        gold_lines = new_upgrade_lines + old_upgrade_lines

        # Replay script.
        def RunGit(cwd, _cmd, **_kwargs):
            self.assertEqual(mocked_upgrader._stable_repo, cwd)
            return cros_build_lib.CompletedProcess(
                returncode=0, stdout=git_show
            )

        mocked_upgrader._RunGit.side_effect = RunGit

        def CreateCommit(mock_upgrade_lines, mock_remaining_lines) -> None:
            self.assertEqual(gold_lines, mock_upgrade_lines)
            self.assertEqual(remaining_lines, mock_remaining_lines)

        # Wrap the checker in a Mock so the call itself can be verified;
        # otherwise the test passes vacuously if it is never invoked.
        create_commit_mock = mock.Mock(side_effect=CreateCommit)
        mocked_upgrader._CreateCommitMessage = create_commit_mock

        # Verify.
        cpu.Upgrader._AmendCommitMessage(mocked_upgrader, new_upgrade_lines)
        self.assertEqual(1, create_commit_mock.call_count)

    def testOldAndNew(self) -> None:
        """Old upgrade lines are appended after the new ones."""
        new_upgrade_lines = [
            "Upgraded abc/efg to version 1.2.3 on amd64, arm, x86",
            "Upgraded mno/pqr to version 4.5-r1 on x86",
        ]
        old_upgrade_lines = [
            "Upgraded xyz/uvw to version 3.2.1 on arm, x86",
            "Upgraded mno/pqr to version 12345 on x86",
        ]
        remaining_lines = [
            "Extraneous extra comments in commit body.",
            "",
            "BUG=chromium-os:12345",
            "TEST=test everything",
            "again and again",
        ]
        git_show_output = (
            "\n".join(old_upgrade_lines)
            + "\n"
            + "\n"
            + "\n".join(remaining_lines)
        )
        self._TestAmendCommitMessage(
            new_upgrade_lines,
            old_upgrade_lines,
            remaining_lines,
            git_show_output,
        )

    def testOldOnly(self) -> None:
        """Only pre-existing upgrade lines, nothing new."""
        old_upgrade_lines = [
            "Upgraded xyz/uvw to version 3.2.1 on arm, x86",
            "Upgraded mno/pqr to version 12345 on x86",
        ]
        git_show_output = "\n".join(old_upgrade_lines)
        self._TestAmendCommitMessage([], old_upgrade_lines, [], git_show_output)

    def testNewOnly(self) -> None:
        """Only new upgrade lines; the existing commit body is empty."""
        new_upgrade_lines = [
            "Upgraded abc/efg to version 1.2.3 on amd64, arm, x86",
            "Upgraded mno/pqr to version 4.5-r1 on x86",
        ]
        git_show_output = ""
        self._TestAmendCommitMessage(new_upgrade_lines, [], [], git_show_output)

    def testOldEditedAndNew(self) -> None:
        """Hand-edited old lines are kept but not treated as upgrade lines."""
        new_upgrade_lines = [
            "Upgraded abc/efg to version 1.2.3 on amd64, arm, x86",
            "Upgraded mno/pqr to version 4.5-r1 on x86",
        ]
        old_upgrade_lines = [
            "So I upgraded xyz/uvw to version 3.2.1 on arm, x86",
            "Then I Upgraded mno/pqr to version 12345 on x86",
        ]
        remaining_lines = [
            "Extraneous extra comments in commit body.",
            "",
            "BUG=chromium-os:12345",
            "TEST=test everything",
            "again and again",
        ]
        git_show_output = (
            "\n".join(old_upgrade_lines)
            + "\n"
            + "\n"
            + "\n".join(remaining_lines)
        )

        # In this test, it should not recognize the existing old_upgrade_lines
        # as a previous commit message from this script.  So it should give a
        # warning and push those lines to the end (grouped with
        # remaining_lines).
        remaining_lines = old_upgrade_lines + [""] + remaining_lines
        self._TestAmendCommitMessage(
            new_upgrade_lines, [], remaining_lines, git_show_output
        )

    #
    # _CreateCommitMessage
    #

    def _TestCreateCommitMessage(self, upgrade_lines):
        """Test Upgrader._CreateCommitMessage

        Args:
            upgrade_lines: Upgrade lines to build the commit message from.

        Returns:
            The commit message produced by Upgrader._CreateCommitMessage.
        """
        result = cpu.Upgrader._CreateCommitMessage(upgrade_lines)

        self.assertTrue(
            ": upgraded package" in result or "Upgraded the following" in result
        )
        return result

    @staticmethod
    def _UpgradeLinesRegex(upgrade_lines):
        r"""Return a regex fragment matching |upgrade_lines| as a body.

        Whitespace runs become a literal "\s" because the fragment is
        embedded in re.VERBOSE patterns, where bare whitespace is ignored.
        """
        return r"\n".join(
            re.sub(r"\s+", r"\\s", line) for line in upgrade_lines
        )

    def testCreateCommitMessageOnePkg(self) -> None:
        """A single upgraded package gets a singular summary line."""
        upgrade_lines = ["Upgraded abc/efg to version 1.2.3 on amd64, arm, x86"]
        result = self._TestCreateCommitMessage(upgrade_lines)

        # Commit message should have:
        # - Summary that mentions 'efg' and ends in "package" (singular).
        # - Body corresponding to upgrade_lines.
        # - BUG= line (with space after '=' to invalidate it).
        # - TEST= line (with space after '=' to invalidate it).
        body = self._UpgradeLinesRegex(upgrade_lines)
        regexp = re.compile(
            r"""^efg:\supgraded\spackage\sto\supstream\n # Summary
                            ^\s*\n                            # Blank line
                            %s\n                              # Body
                            ^\s*\n                            # Blank line
                            ^BUG=\s.+\n                       # BUG line
                            ^TEST=\s                          # TEST line
                            """
            % body,
            re.VERBOSE | re.MULTILINE,
        )
        self.assertRegex(result, regexp)

    def testCreateCommitMessageThreePkgs(self) -> None:
        """Three distinct packages are all named in a plural summary."""
        upgrade_lines = [
            "Upgraded abc/efg to version 1.2.3 on amd64, arm, x86",
            "Upgraded xyz/uvw to version 1.2.3 on amd64",
            "Upgraded xyz/uvw to version 3.2.1 on arm, x86",
            "Upgraded mno/pqr to version 12345 on x86",
        ]
        result = self._TestCreateCommitMessage(upgrade_lines)

        # Commit message should have:
        # - Summary that mentions 'efg, pqr, uvw' and ends in "packages"
        #   (plural).
        # - Body corresponding to upgrade_lines.
        # - BUG= line (with space after '=' to invalidate it).
        # - TEST= line (with space after '=' to invalidate it).
        body = self._UpgradeLinesRegex(upgrade_lines)
        regexp = re.compile(
            r"""^efg,\spqr,\suvw:\supgraded\spackages.*\n # Summary
                            ^\s*\n                            # Blank line
                            %s\n                              # Body
                            ^\s*\n                            # Blank line
                            ^BUG=\s.+\n                       # BUG line
                            ^TEST=\s                          # TEST line
                            """
            % body,
            re.VERBOSE | re.MULTILINE,
        )
        self.assertRegex(result, regexp)

    def testCreateCommitMessageTenPkgs(self) -> None:
        """Ten packages are summarized by count rather than by name."""
        upgrade_lines = [
            "Upgraded abc/efg to version 1.2.3 on amd64, arm, x86",
            "Upgraded bcd/fgh to version 1.2.3 on amd64",
            "Upgraded cde/ghi to version 3.2.1 on arm, x86",
            "Upgraded def/hij to version 12345 on x86",
            "Upgraded efg/ijk to version 1.2.3 on amd64",
            "Upgraded fgh/jkl to version 3.2.1 on arm, x86",
            "Upgraded ghi/klm to version 12345 on x86",
            "Upgraded hij/lmn to version 1.2.3 on amd64",
            "Upgraded ijk/mno to version 3.2.1 on arm, x86",
            "Upgraded jkl/nop to version 12345 on x86",
        ]
        result = self._TestCreateCommitMessage(upgrade_lines)

        # Commit message should have:
        # - Summary that mentions '10' and ends in "packages" (plural).
        # - Body corresponding to upgrade_lines.
        # - BUG= line (with space after '=' to invalidate it).
        # - TEST= line (with space after '=' to invalidate it).
        body = self._UpgradeLinesRegex(upgrade_lines)
        regexp = re.compile(
            r"""^Upgraded\s.*10.*\spackages\n     # Summary
                            ^\s*\n                            # Blank line
                            %s\n                              # Body
                            ^\s*\n                            # Blank line
                            ^BUG=\s.+\n                       # BUG line
                            ^TEST=\s                          # TEST line
                            """
            % body,
            re.VERBOSE | re.MULTILINE,
        )
        self.assertRegex(result, regexp)


@unittest.skip("playground setup needs more work")
class GetCurrentVersionsTest(CpuTestBase):
    """Test Upgrader._GetCurrentVersions"""

    def _RunGetCurrentVersions(self, target_pinfolist, emerge_targets) -> None:
        """Run _GetCurrentVersions and check the emerge argv request.

        Args:
            target_pinfolist: PInfo objects passed to _GetCurrentVersions.
            emerge_targets: Targets expected to be handed to
                _GenParallelEmergeArgv.
        """
        upgrader = self._MockUpgrader(cmdargs=[], _curr_board=None)
        self._SetUpPlayground()

        # Add test-specific mocks/stubs.
        self.PatchObject(cpu.Upgrader, "_GetPreOrderDepGraph", return_value=[])

        # Replay script.  Build the argv with the real generator, then have
        # the mock hand back that argv with --root-deps appended.
        emerge_argv = cpu.Upgrader._GenParallelEmergeArgv(
            upgrader, emerge_targets
        )
        emerge_argv.append("--root-deps")
        upgrader._GenParallelEmergeArgv.return_value = emerge_argv

        # Verify.
        cpu.Upgrader._GetCurrentVersions(upgrader, target_pinfolist)

        upgrader._GenParallelEmergeArgv.assert_called_once_with(emerge_targets)
        # If we can get this unittest passing again, this is the old mox way of
        # checking the test worked.  We need a fuller sandbox first.
        # mocked_upgrader._SetPortTree(mox.IsA(portcfg.config), mox.IsA(dict))

    def _TestGetCurrentVersionsLocalCpv(self, target_pinfolist) -> None:
        """Run the check with each target given as an exact "=cpv" atom."""
        exact_atoms = ["=" + pinfo.cpv for pinfo in target_pinfolist]
        self._RunGetCurrentVersions(target_pinfolist, exact_atoms)
        # The old mox version additionally verified the dependency graph:
        # packages = [pinfo.package for pinfo in target_pinfolist]
        # verifier = _GenDepsGraphVerifier(packages)
        # depgraph_mock.assert_called_once_with(mox.Func(verifier))

    def testGetCurrentVersionsTwoPkgs(self) -> None:
        """Two packages, each pinned to a cpv."""
        self._TestGetCurrentVersionsLocalCpv(
            [
                cpu.PInfo(package="dev-libs/A", cpv="dev-libs/A-2"),
                cpu.PInfo(package="dev-libs/D", cpv="dev-libs/D-3"),
            ]
        )

    def testGetCurrentVersionsOnePkgB(self) -> None:
        """A single package pinned to a cpv."""
        self._TestGetCurrentVersionsLocalCpv(
            [cpu.PInfo(package="dev-libs/B", cpv="dev-libs/B-2")]
        )

    def testGetCurrentVersionsOnePkgLibcros(self) -> None:
        """A single chromeos-base package pinned to a cpv."""
        libcros = cpu.PInfo(
            package="chromeos-base/libcros", cpv="chromeos-base/libcros-1"
        )
        self._TestGetCurrentVersionsLocalCpv([libcros])

    def _TestGetCurrentVersionsPackageOnly(self, target_pinfolist) -> None:
        """Run the check with each target given by its package name only."""
        package_names = [pinfo.package for pinfo in target_pinfolist]
        self._RunGetCurrentVersions(target_pinfolist, package_names)

    def testGetCurrentVersionsWorld(self) -> None:
        """The special "world" target is passed by name."""
        self._TestGetCurrentVersionsPackageOnly(
            [cpu.PInfo(package="world", cpv="world")]
        )

    def testGetCurrentVersionsLocalOnlyB(self) -> None:
        """A package with no cpv set is passed by name."""
        self._TestGetCurrentVersionsPackageOnly(
            [cpu.PInfo(package="dev-libs/B", cpv=None)]
        )


class ResolveAndVerifyArgsTest(CpuTestBase):
    """Test Upgrader._ResolveAndVerifyArgs"""

    @staticmethod
    def _CheckStartsWithUnableToFind(exception):
        """Check that the exception text starts with 'Unable to find'.

        Args:
            exception: The exception raised by _ResolveAndVerifyArgs.

        Returns:
            A (passed, failure message) tuple.
        """
        text = str(exception)
        phrase = "Unable to find"
        msg = 'Error message expected to start with "%s": %s' % (
            phrase,
            text,
        )
        return (text.startswith(phrase), msg)

    def _TestResolveAndVerifyArgsWorld(self, upgrade_mode) -> None:
        """Check that the "world" argument resolves to the world PInfo.

        Args:
            upgrade_mode: Value passed through as |upgrade_mode|.
        """
        args = ["world"]
        cmdargs = []
        mocked_upgrader = self._MockUpgrader(cmdargs=cmdargs, _curr_board=None)

        # Verify.
        result = cpu.Upgrader._ResolveAndVerifyArgs(
            mocked_upgrader, args, upgrade_mode=upgrade_mode
        )

        self.assertEqual(
            result,
            [
                cpu.PInfo(
                    user_arg="world",
                    package="world",
                    package_name="world",
                    category=None,
                    cpv="world",
                )
            ],
        )

    def testResolveAndVerifyArgsWorldUpgradeMode(self) -> None:
        """The "world" target resolves in upgrade mode."""
        self._TestResolveAndVerifyArgsWorld(True)

    def testResolveAndVerifyArgsWorldStatusMode(self) -> None:
        """The "world" target resolves in status (non-upgrade) mode."""
        self._TestResolveAndVerifyArgsWorld(False)

    def _TestResolveAndVerifyArgsNonWorld(
        self, pinfolist, cmdargs=None, error=None, error_checker=None
    ):
        """Run _ResolveAndVerifyArgs against mocked CPV lookups.

        Args:
            pinfolist: PInfo objects.  For each one, |user_arg| is used as
                the command line argument, |cpv| as the value the mocked
                _FindCurrentCPV returns and |upstream_cpv| as the value the
                mocked _FindUpstreamCPV returns.
            cmdargs: Command line args used to build the mocked upgrader.
            error: Exception type expected from _ResolveAndVerifyArgs, if any.
            error_checker: Optional callable that takes the raised exception
                and returns a (passed, failure message) tuple.  Only used
                when |error| is given.

        Returns:
            The result of _ResolveAndVerifyArgs, or None when |error| is set.
        """
        if cmdargs is None:
            cmdargs = []
        mocked_upgrader = self._MockUpgrader(cmdargs=cmdargs, _curr_board=None)
        upgrade_mode = cpu.Upgrader._IsInUpgradeMode(mocked_upgrader)

        # Replay script.
        calls_FindCurrentCPV = []
        calls_FindUpstreamCPV = []
        side_effect_FindCurrentCPV = []
        side_effect_FindUpstreamCPV = []
        args = []
        for pinfo in pinfolist:
            arg = pinfo.user_arg
            local_cpv = pinfo.cpv
            upstream_cpv = pinfo.upstream_cpv
            args.append(arg)

            # The local lookup is expected to use the category/package part
            # of the argument when one can be extracted from it.
            catpkg = cpu.Upgrader._GetCatPkgFromCpv(arg)
            local_arg = catpkg if catpkg else arg

            calls_FindCurrentCPV.append(mock.call(local_arg))
            side_effect_FindCurrentCPV.append(local_cpv)

            calls_FindUpstreamCPV.append(
                mock.call(arg, mocked_upgrader._unstable_ok)
            )
            side_effect_FindUpstreamCPV.append(upstream_cpv)

            if not upstream_cpv and upgrade_mode:
                # Real method raises an exception here.
                if not mocked_upgrader._unstable_ok:
                    calls_FindUpstreamCPV.append(mock.call(arg, True))
                    side_effect_FindUpstreamCPV.append(arg)
                break

        mocked_upgrader._FindCurrentCPV.side_effect = side_effect_FindCurrentCPV
        mocked_upgrader._FindUpstreamCPV.side_effect = (
            side_effect_FindUpstreamCPV
        )

        # Verify.
        result = None
        if error:
            with pytest.raises(error) as exc:
                cpu.Upgrader._ResolveAndVerifyArgs(
                    mocked_upgrader, args, upgrade_mode
                )
            # This must run after the with-block: statements placed after the
            # raising call inside the block are never executed.
            if error_checker:
                passed, failure_msg = error_checker(exc.value)
                self.assertTrue(passed, msg=failure_msg)
        else:
            result = cpu.Upgrader._ResolveAndVerifyArgs(
                mocked_upgrader, args, upgrade_mode
            )

        mocked_upgrader._FindCurrentCPV.assert_has_calls(calls_FindCurrentCPV)
        mocked_upgrader._FindUpstreamCPV.assert_has_calls(calls_FindUpstreamCPV)

        return result

    def testResolveAndVerifyArgsNonWorldUpgrade(self) -> None:
        """Package found locally and upstream resolves to the same PInfo."""
        pinfolist = [
            cpu.PInfo(
                user_arg="dev-libs/B",
                cpv="dev-libs/B-1",
                upstream_cpv="dev-libs/B-2",
            )
        ]
        cmdargs = ["--upgrade", "--unstable-ok"]
        result = self._TestResolveAndVerifyArgsNonWorld(pinfolist, cmdargs)
        self.assertEqual(result, pinfolist)

    def testResolveAndVerifyArgsNonWorldUpgradeSpecificVer(self) -> None:
        """Versioned arg found locally and upstream resolves cleanly."""
        pinfolist = [
            cpu.PInfo(
                user_arg="dev-libs/B-2",
                cpv="dev-libs/B-1",
                upstream_cpv="dev-libs/B-2",
            )
        ]
        cmdargs = ["--upgrade", "--unstable-ok"]
        result = self._TestResolveAndVerifyArgsNonWorld(pinfolist, cmdargs)
        self.assertEqual(result, pinfolist)

    def testResolveAndVerifyArgsNonWorldUpgradeSpecificVerNotFoundStable(
        self,
    ) -> None:
        """Without --unstable-ok, expect an 'is unstable' RuntimeError."""
        pinfolist = [cpu.PInfo(user_arg="dev-libs/B-2", cpv="dev-libs/B-1")]
        cmdargs = ["--upgrade"]

        def _error_checker(exception):
            # RuntimeError text should mention 'is unstable'.
            text = str(exception)
            phrase = "is unstable"
            msg = 'No mention of "%s" in error message: %s' % (phrase, text)
            return (phrase in text, msg)

        self._TestResolveAndVerifyArgsNonWorld(
            pinfolist, cmdargs, error=RuntimeError, error_checker=_error_checker
        )

    def testResolveAndVerifyArgsNonWorldUpgradeSpecificVerNotFoundUnstable(
        self,
    ) -> None:
        """With --unstable-ok, expect an 'Unable to find' RuntimeError."""
        pinfolist = [cpu.PInfo(user_arg="dev-libs/B-2", cpv="dev-libs/B-1")]
        cmdargs = ["--upgrade", "--unstable-ok"]

        self._TestResolveAndVerifyArgsNonWorld(
            pinfolist,
            cmdargs,
            error=RuntimeError,
            error_checker=self._CheckStartsWithUnableToFind,
        )

    def testResolveAndVerifyArgsNonWorldLocalOnly(self) -> None:
        """Package with no upstream cpv raises 'Unable to find'."""
        pinfolist = [cpu.PInfo(user_arg="dev-libs/B", cpv="dev-libs/B-1")]
        cmdargs = ["--upgrade", "--unstable-ok"]

        self._TestResolveAndVerifyArgsNonWorld(
            pinfolist,
            cmdargs,
            error=RuntimeError,
            error_checker=self._CheckStartsWithUnableToFind,
        )

    def testResolveAndVerifyArgsNonWorldUpstreamOnly(self) -> None:
        """Package with only an upstream cpv resolves to the same PInfo."""
        pinfolist = [
            cpu.PInfo(user_arg="dev-libs/B", upstream_cpv="dev-libs/B-2")
        ]
        cmdargs = ["--upgrade", "--unstable-ok"]
        result = self._TestResolveAndVerifyArgsNonWorld(pinfolist, cmdargs)
        self.assertEqual(result, pinfolist)

    def testResolveAndVerifyArgsNonWorldNeither(self) -> None:
        """Package with neither local nor upstream cpv raises RuntimeError."""
        pinfolist = [cpu.PInfo(user_arg="dev-libs/B")]
        cmdargs = ["--upgrade", "--unstable-ok"]
        self._TestResolveAndVerifyArgsNonWorld(
            pinfolist, cmdargs, error=RuntimeError
        )

    def testResolveAndVerifyArgsNonWorldStatusSpecificVer(self) -> None:
        """Exception because specific cpv arg not allowed without --upgrade."""
        cmdargs = ["--unstable-ok"]
        mocked_upgrader = self._MockUpgrader(cmdargs=cmdargs, _curr_board=None)
        upgrade_mode = cpu.Upgrader._IsInUpgradeMode(mocked_upgrader)

        # Verify.
        self.assertRaises(
            RuntimeError,
            cpu.Upgrader._ResolveAndVerifyArgs,
            mocked_upgrader,
            ["dev-libs/B-2"],
            upgrade_mode,
        )


class StabilizeEbuildTest(CpuTestBase):
    """Tests for _StabilizeEbuild()."""

    # Filler lines placed before the KEYWORDS line(s) under test.
    PREFIX_LINES = [
        "Garbletygook nonsense unimportant",
        "Some other nonsense with KEYWORDS mention",
    ]
    # Filler lines placed after the KEYWORDS line(s) under test.
    POSTFIX_LINES = [
        "Some mention of KEYWORDS in a line",
        "And other nonsense",
    ]

    def _TestStabilizeEbuild(self, ebuild_path, arch) -> None:
        """Invoke _StabilizeEbuild on |ebuild_path| with a mocked upgrader.

        Args:
            ebuild_path: Path of the ebuild file handed to _StabilizeEbuild.
            arch: Value given to the mocked upgrader as _curr_arch.
        """
        upgrader = self._MockUpgrader(cmdargs=[], _curr_arch=arch)
        cpu.Upgrader._StabilizeEbuild(upgrader, ebuild_path)

    def _AssertEqualsExcludingComments(self, lines1, lines2) -> None:
        """Assert two line lists match once "#"-prefixed lines are dropped."""

        def _without_comments(lines):
            return [text for text in lines if not text.startswith("#")]

        self.assertEqual(_without_comments(lines1), _without_comments(lines2))

    def _TestStabilizeEbuildWrapper(
        self, ebuild_path, arch, keyword_line, gold_keyword_line
    ) -> None:
        """Write an ebuild, stabilize it and compare with the expectation.

        Args:
            ebuild_path: Where the temporary ebuild content is written.
            arch: Architecture passed on to _TestStabilizeEbuild.
            keyword_line: Input KEYWORDS line, or a list of such lines.
            gold_keyword_line: Expected KEYWORDS line, or a list of lines.
        """
        keyword_lines = (
            keyword_line if isinstance(keyword_line, list) else [keyword_line]
        )
        gold_lines = (
            gold_keyword_line
            if isinstance(gold_keyword_line, list)
            else [gold_keyword_line]
        )

        original = [*self.PREFIX_LINES, *keyword_lines, *self.POSTFIX_LINES]
        expected = [*self.PREFIX_LINES, *gold_lines, *self.POSTFIX_LINES]

        # Seed the file, run the code under test, then read the file back.
        osutils.WriteFile(ebuild_path, "\n".join(original))
        self._TestStabilizeEbuild(ebuild_path, arch)
        actual = osutils.ReadFile(ebuild_path).splitlines()

        self._AssertEqualsExcludingComments(expected, actual)

    def testNothingToDo(self) -> None:
        """arm is already listed as stable; expect KEYWORDS="*" as output."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="arm",
            keyword_line='KEYWORDS="amd64 arm mips x86"',
            gold_keyword_line='KEYWORDS="*"',
        )

    def testNothingToDoFbsd(self) -> None:
        """x86 is stable next to ~x86-fbsd; expect KEYWORDS="*" as output."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="x86",
            keyword_line='KEYWORDS="amd64 arm ~mips x86 ~x86-fbsd"',
            gold_keyword_line='KEYWORDS="*"',
        )

    def testSimpleMiddleOfLine(self) -> None:
        """~arm sits mid-line; expect KEYWORDS="*" as output."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="arm",
            keyword_line='KEYWORDS="amd64 ~arm ~mips x86"',
            gold_keyword_line='KEYWORDS="*"',
        )

    def testSimpleMiddleOfLineSpacePrefix(self) -> None:
        """Indented KEYWORDS line; expect the indent to be kept."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="arm",
            keyword_line='    KEYWORDS="amd64 ~arm ~mips x86"',
            gold_keyword_line='    KEYWORDS="*"',
        )

    def testSimpleStartOfLine(self) -> None:
        """~arm is the first keyword; expect KEYWORDS="*" as output."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="arm",
            keyword_line='KEYWORDS="~arm amd64 ~mips x86"',
            gold_keyword_line='KEYWORDS="*"',
        )

    def testSimpleEndOfLine(self) -> None:
        """~arm is the last keyword; expect KEYWORDS="*" as output."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="arm",
            keyword_line='KEYWORDS="amd64 ~mips x86 ~arm"',
            gold_keyword_line='KEYWORDS="*"',
        )

    def testPreFbsd(self) -> None:
        """~x86 precedes ~x86-fbsd; expect KEYWORDS="*" as output."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="x86",
            keyword_line='KEYWORDS="amd64 ~arm ~mips ~x86 ~x86-fbsd"',
            gold_keyword_line='KEYWORDS="*"',
        )

    def testPostFbsd(self) -> None:
        """~x86 follows ~x86-fbsd; expect KEYWORDS="*" as output."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="x86",
            keyword_line='KEYWORDS="amd64 ~arm ~mips ~x86-fbsd ~x86"',
            gold_keyword_line='KEYWORDS="*"',
        )

    def testMultilineKeywordsMiddle(self) -> None:
        """KEYWORDS spans several lines with ~arm on an inner line."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="arm",
            keyword_line=['KEYWORDS="amd64', "  ~arm", "  ~mips", '  x86"'],
            gold_keyword_line=['KEYWORDS="*"'],
        )

    def testMultilineKeywordsStart(self) -> None:
        """KEYWORDS spans several lines with ~amd64 on the first line."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="amd64",
            keyword_line=['KEYWORDS="~amd64', "  arm", "  ~mips", '  x86"'],
            gold_keyword_line=['KEYWORDS="*"'],
        )

    def testMultilineKeywordsEnd(self) -> None:
        """KEYWORDS spans several lines with ~x86 on the last line."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="x86",
            keyword_line=['KEYWORDS="amd64', "  arm", "  ~mips", '  ~x86"'],
            gold_keyword_line=['KEYWORDS="*"'],
        )

    def testMultipleKeywordLinesOneChange(self) -> None:
        """Two KEYWORDS lines, arm unstable only in the second."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="arm",
            keyword_line=[
                'KEYWORDS="amd64 arm mips x86"',
                'KEYWORDS="~amd64 ~arm ~mips ~x86"',
            ],
            gold_keyword_line=['KEYWORDS="*"', 'KEYWORDS="*"'],
        )

    def testMultipleKeywordLinesMultipleChanges(self) -> None:
        """Two KEYWORDS lines, arm unstable in both."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="arm",
            keyword_line=[
                'KEYWORDS="amd64 ~arm mips x86"',
                'KEYWORDS="~amd64 ~arm ~mips ~x86"',
            ],
            gold_keyword_line=['KEYWORDS="*"', 'KEYWORDS="*"'],
        )

    def testMultipleKeywordLinesMultipleChangesSpacePrefix(self) -> None:
        """Two indented KEYWORDS lines; expect the indent to be kept."""
        self._TestStabilizeEbuildWrapper(
            self.tempdir / "ebuild",
            arch="arm",
            keyword_line=[
                '     KEYWORDS="amd64 ~arm mips x86"',
                '     KEYWORDS="~amd64 ~arm ~mips ~x86"',
            ],
            gold_keyword_line=['     KEYWORDS="*"', '     KEYWORDS="*"'],
        )


@unittest.skip("playground setup needs more work")
class GetPreOrderDepGraphTest(CpuTestBase):
    """Tests for Upgrader._GetPreOrderDepGraph (currently skipped)."""

    #
    # _GetPreOrderDepGraph (defunct - to be replaced)
    #

    def _TestGetPreOrderDepGraph(self, pkg) -> None:
        """Compare _GetPreOrderDepGraph output for |pkg| with the golden set.

        Args:
            pkg: Package name used to build the emerge argv and to look up
                the golden dependency set.
        """
        upgrader = self._MockUpgrader(cmdargs=[], _curr_board=None)
        self._SetUpPlayground()

        # Build a dependency graph for the package via DepGraphGenerator.
        emerge_argv = cpu.Upgrader._GenParallelEmergeArgv(upgrader, [pkg])
        emerge_argv.append("--root-deps")
        generator = depgraph.DepGraphGenerator()
        generator.Initialize(emerge_argv)
        tree, info, _ = generator.GenDependencyTree()
        graph = generator.GenDependencyGraph(tree, info)

        # Order is not checked here; only set membership is compared.
        actual_deps = set(cpu.Upgrader._GetPreOrderDepGraph(graph))
        expected_deps = _GetGoldenDepsSet(pkg)
        self.assertEqual(actual_deps, expected_deps)

    def testGetPreOrderDepGraphDevLibsA(self):
        """Check the dependency graph of dev-libs/A."""
        return self._TestGetPreOrderDepGraph("dev-libs/A")

    def testGetPreOrderDepGraphDevLibsC(self):
        """Check the dependency graph of dev-libs/C."""
        return self._TestGetPreOrderDepGraph("dev-libs/C")

    def testGetPreOrderDepGraphVirtualLibusb(self):
        """Check the dependency graph of virtual/libusb."""
        return self._TestGetPreOrderDepGraph("virtual/libusb")

    def testGetPreOrderDepGraphCrosbaseLibcros(self):
        """Check the dependency graph of chromeos-base/libcros."""
        return self._TestGetPreOrderDepGraph("chromeos-base/libcros")


@pytest.mark.usefixtures("legacy_capture_output")
class MainTest(CpuTestBase):
    """Test argument handling at the main method level."""

    def _AssertCPUMain(self, args, expect_zero) -> None:
        """Run cpu.main() and assert exit value is expected.

        A normal return from main() (no SystemExit raised) is treated the
        same as exit value 0, so a caller expecting a failure does not pass
        silently when main() never exits.

        Args:
            args: Command line arguments passed to cpu.main().
            expect_zero: If True, assert exit value == 0.  If False, assert
                exit value != 0.
        """
        try:
            cpu.main(args)
        except SystemExit as e:
            # SystemExit.code is None for a bare sys.exit(), which the
            # interpreter reports as a successful (zero) exit status.
            exit_code = 0 if e.code is None else e.code
        else:
            exit_code = 0

        if expect_zero:
            # %r rather than %d: the exit code may legitimately be a string.
            self.assertEqual(
                exit_code,
                0,
                msg="expected call to main() to exit cleanly, "
                "but it exited with code %r" % (exit_code,),
            )
        else:
            self.assertNotEqual(
                exit_code,
                0,
                msg="expected call to main() to exit with "
                "failure code, but exited with code 0 instead.",
            )

    def AssertOutputEndsInError(self) -> None:
        """Assert captured stdout of the current test ends in an error line.

        The last line read from self.capfd's captured stdout must be wrapped
        in the "\\033[1;<30 + RED>m" ... "\\033[0m" color escape sequences.
        """

        ERROR_MSG_RE = re.compile(
            r"^\033\[1;%dm(.+?)(?:\033\[0m)+$" % (30 + terminal.Color.RED,),
            re.DOTALL,
        )

        res = self.capfd.readouterr()
        lines = res.out.splitlines()

        # Fail with a readable message instead of an IndexError when the
        # test produced no output at all.
        assert lines, "expected an error message on stdout, but got no output"
        assert ERROR_MSG_RE.search(
            lines[-1]
        ), "last line of stdout is not an error message: %r" % (lines[-1],)

    def testHelp(self) -> None:
        """Test that --help is functioning"""

        # Running with --help should exit with code==0.  Use the shared
        # helper so the exit code is checked like in the other tests.
        self._AssertCPUMain(["--help"], expect_zero=True)

        # Verify that a message beginning with "usage: " was printed.
        stdout = self.capfd.readouterr().out
        assert stdout.startswith("usage: ")

    def testMissingBoard(self) -> None:
        """Test that running without --board exits with an error."""
        # Running without --board should exit with code!=0.  Use the shared
        # helper so the exit code is checked like in the other tests.
        self._AssertCPUMain([], expect_zero=False)

        # Verify that an error message was printed.
        self.AssertOutputEndsInError()

    def testBoardWithoutPackage(self) -> None:
        """A --board run with no package argument must exit with an error."""
        argv = ["--board=any-board"]

        # No package was given, so a non-zero exit is expected.
        self._AssertCPUMain(argv, expect_zero=False)

        # The output must finish with an error message.
        self.AssertOutputEndsInError()

    def testHostWithoutPackage(self) -> None:
        """A --host run with no package argument must exit with an error."""
        argv = ["--host"]

        # No package was given, so a non-zero exit is expected.
        self._AssertCPUMain(argv, expect_zero=False)

        # The output must finish with an error message.
        self.AssertOutputEndsInError()

    def testUpgradeAndUpgradeDeep(self) -> None:
        """Passing both --upgrade and --upgrade-deep must exit with an error."""
        argv = ["--host", "--upgrade", "--upgrade-deep", "any-package"]

        # A non-zero exit is expected.
        self._AssertCPUMain(argv, expect_zero=False)

        # The output must finish with an error message.
        self.AssertOutputEndsInError()

    def testForceWithoutUpgrade(self) -> None:
        """--force without --upgrade or --upgrade-deep must exit with error."""
        argv = ["--host", "--force", "any-package"]

        # A non-zero exit is expected.
        self._AssertCPUMain(argv, expect_zero=False)

        # The output must finish with an error message.
        self.AssertOutputEndsInError()

    def testFlowStatusReportOneBoard(self) -> None:
        """Main flow of a status report for one board exits cleanly."""
        # Stub out the Upgrader steps and report the board as set up.
        stubbed = ("PreRunChecks", "PrepareToRun", "RunBoard", "RunCompleted")
        for method_name in stubbed:
            self.PatchObject(cpu.Upgrader, method_name)
        self.PatchObject(cpu, "_BoardIsSetUp", return_value=True)

        argv = ["--board=any-board", "any-package"]
        self._AssertCPUMain(argv, expect_zero=True)

    def testFlowStatusReportOneBoardNotSetUp(self) -> None:
        """Status report for a board that is not set up exits with an error."""
        self.PatchObject(cpu, "_BoardIsSetUp", return_value=False)
        self.PatchObject(cpu.Upgrader, "PreRunChecks")

        # The board is reported as not set up, so expect a non-zero exit.
        argv = ["--board=any-board", "any-package"]
        self._AssertCPUMain(argv, expect_zero=False)

        # The output must finish with an error message.
        self.AssertOutputEndsInError()

    def testFlowStatusReportTwoBoards(self) -> None:
        """Main flow of a status report for two boards exits cleanly."""
        # Stub out the Upgrader steps and report the boards as set up.
        stubbed = ("PreRunChecks", "PrepareToRun", "RunBoard", "RunCompleted")
        for method_name in stubbed:
            self.PatchObject(cpu.Upgrader, method_name)
        self.PatchObject(cpu, "_BoardIsSetUp", return_value=True)

        argv = ["--board=board1:board2", "any-package"]
        self._AssertCPUMain(argv, expect_zero=True)

    def testFlowUpgradeOneBoard(self) -> None:
        """Main flow of an upgrade for one board exits cleanly."""
        # Stub out the Upgrader steps and report the board as set up.
        stubbed = ("PreRunChecks", "PrepareToRun", "RunBoard", "RunCompleted")
        for method_name in stubbed:
            self.PatchObject(cpu.Upgrader, method_name)
        self.PatchObject(cpu, "_BoardIsSetUp", return_value=True)

        argv = ["--upgrade", "--board=any-board", "any-package"]
        self._AssertCPUMain(argv, expect_zero=True)

    def testFlowUpgradeTwoBoards(self) -> None:
        """Main flow of an upgrade for two boards exits cleanly."""
        # Stub out the Upgrader steps and report the boards as set up.
        stubbed = ("PreRunChecks", "PrepareToRun", "RunBoard", "RunCompleted")
        for method_name in stubbed:
            self.PatchObject(cpu.Upgrader, method_name)
        self.PatchObject(cpu, "_BoardIsSetUp", return_value=True)

        argv = ["--upgrade", "--board=board1:board2", "any-package"]
        self._AssertCPUMain(argv, expect_zero=True)

    def testFlowUpgradeTwoBoardsAndHost(self) -> None:
        """Main flow of an upgrade for two boards plus host exits cleanly."""
        # Stub out the Upgrader steps and report the boards as set up.
        stubbed = ("PreRunChecks", "PrepareToRun", "RunBoard", "RunCompleted")
        for method_name in stubbed:
            self.PatchObject(cpu.Upgrader, method_name)
        self.PatchObject(cpu, "_BoardIsSetUp", return_value=True)

        argv = [
            "--upgrade",
            "--host",
            "--board=board1:host:board2",
            "any-package",
        ]
        self._AssertCPUMain(argv, expect_zero=True)
