# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""ChromeOS encryption/decryption key management"""

import collections
import configparser
import logging
import os
import re
from typing import Any

from chromite.lib import cros_build_lib
from chromite.lib import osutils


class SignerKeyError(Exception):
    """Raise when there in an error related to keys"""


class SignerKeyMissingError(SignerKeyError):
    """Raise if key is missing from subset"""


class SignerRootOfTrustKeyMissingError(SignerKeyMissingError):
    """Raise if a root_of_trust-specific key is missing"""


class VersionOverflowError(SignerKeyError):
    """Raise if incrementing a version would overflow 16 bits"""


class KeyPair:
    """Container for a key's files.

    A KeyPair contains the information about a particular public/private pair of
    keys, including file names, and version.

    Attributes:
        name: name of keypair
        keydir: location of key files
        version: version of key
        public: public key file complete path
        private: private key file complete path
        keyblock: keyblock file complete path
    """

    # On disk, we have (valid) file names like:
    # - firmware_data_key.vbprivk  -- non-unified build
    # - firmware_data_key.loem1.vbprivk -- unified build
    # - kernel_data_key.vbprivk
    # - kernel_subkey.vbprivk
    # - key_ec_efs.vbprik2 (which pairs with .vbpubk2, instead of .vbpubk)

    # All the following regular expressions are used to validate names and
    # extensions. From KeyPair's perspective, a name is simply a string of 2 or
    # more alphanumeric characters, possibly containing a single '.' somewhere
    # in the middle, with one of two valid extensions ('.vbprik2' or
    # '.vbprivk').
    _name_re = re.compile(r"\w+\.?\w+$")

    # Valid key file name endings:
    # - .vbprivk (pairs with .vbpubk)
    # - .vbprik2 (pairs with .vbpubk2)
    _priv_ext_re = re.compile(r"\.vbpri(vk|k2)$")
    _pub_ext_re = re.compile(r"\.vbpubk2?$")

    # This is used by ParsePrivateKeyFilename to recognize a valid private key
    # file name.  Keyset initialization only creates KeyPairs for valid private
    # keys that it finds in the directory.
    _priv_filename_re = re.compile(
        r"(?P<name>\w+\.?\w+)(?P<ext>\.vbpri(?:vk|k2))$"
    )

    def __init__(
        self, name, keydir, version=1, pub_ext=None, priv_ext=".vbprivk"
    ) -> None:
        """Initialize KeyPair.

        Args:
            name: name of key
            keydir: directory containing key files
            version: version of the key (forced to int)
            pub_ext: file extension used for public key
            priv_ext: file extension used for private key
        """
        self.name = name
        self.keydir = keydir
        if version is None:
            version = 1
        self.version = int(version)
        # Use the correct version for pub_ext if they did not specify.
        self._pub_ext = (
            pub_ext
            if pub_ext
            else ".vbpubk"
            if priv_ext == ".vbprivk"
            else ".vbpubk2"
        )
        self._priv_ext = priv_ext

        # Validate input parameters.
        if not self._name_re.match(name):
            raise ValueError("Illegal value for name")
        if not self._pub_ext_re.match(self._pub_ext):
            raise ValueError("Illegal value for pub_ext")
        if not self._priv_ext_re.match(priv_ext):
            raise ValueError("Illegal value for priv_ext")

        self.public = os.path.join(keydir, name + self._pub_ext)
        self.private = os.path.join(keydir, name + self._priv_ext)

        # As we create the keyblock name, strip '_data_key' from any name, since
        # this is a common convention.
        keyblock_name = "".join(name.split("_data_key"))
        self.keyblock = os.path.join(keydir, keyblock_name + ".keyblock")

    def __eq__(self, other: Any) -> bool:
        return (
            isinstance(other, KeyPair)
            and self.name == other.name
            and self.version == other.version
            and self.public == other.public
            and self.private == other.private
        )

    def Copy(self):
        """Return a copy of ourselves."""
        return KeyPair(
            self.name, self.keydir, self.version, self._pub_ext, self._priv_ext
        )

    @classmethod
    def ParsePrivateKeyFilename(cls, file_name):
        """Extract the name and extension from the filename.

        Args:
            file_name: a filename that may or may not be a private key.  Leading
                directories are ignored.

        Returns:
            None or re.MatchObject with named groups 'name' and 'ext'.
        """
        basename = file_name.split("/")[-1]
        return cls._priv_filename_re.match(basename)

    def Exists(self, require_public=False, require_private=False):
        """Returns True if key exists on disk."""

        has_public = os.path.exists(self.public)
        has_private = os.path.exists(self.private)

        if require_public and not has_public:
            return False
        if require_private and not has_private:
            return False
        return has_public or has_private

    def KeyblockExists(self):
        """Returns if keyblock exist."""
        return os.path.exists(self.keyblock)

    def GetSHA1sum(self):
        """Returns key's sha1sum returns.

        Raises:
            SignerKeyError: If error getting sha1sum from public key
            RunCommandError: if vbutil_key fails
        """
        res = cros_build_lib.run(
            ["vbutil_key", "--unpack", self.public],
            check=False,
            encoding="utf-8",
            stdout=True,
        )

        # Match line that looks like: 'Key sha1sum: <sha1sum>'.
        match = re.search(r"Key sha1sum: +(\w+)", res.stdout)
        if match:
            return match.group(1)
        else:
            raise SignerKeyError("Unable to get sha1sum for %s" % self.public)


class KeyVersions:
    """Manage key.versions file

    Attributes:
        saved: True if the file on disk matches the contents of the instance.
            None of the methods save the instance to disk automatically.
    """

    def __init__(self, filename) -> None:
        self._versions = {}
        self._path = filename
        if os.path.exists(filename):
            self.saved = True
            # TODO(b/236161656): Fix.
            # pylint: disable-next=consider-using-with
            for line in open(filename, "r", encoding="utf-8").readlines():
                if line.find("=") > 0:
                    k, v = line.strip().split("=")
                    try:
                        v = int(v)
                    except ValueError:
                        # Keys that end with "_version" must be ints.
                        if k.endswith("_version"):
                            raise
                    self._versions[k] = v
            # Make sure that 'name' is in the version dictionary.
            if "name" not in self._versions:
                logging.warning('%s lacks a name.  Using "unknown".', filename)
                self._versions["name"] = "unknown"
        else:
            self.saved = False
            self._versions = {
                "name": "unknown",
                "firmware_key_version": 1,
                "firmware_version": 1,
                "kernel_key_version": 1,
                "kernel_version": 1,
            }
        # Caller is responsible for calling Save()

    def _KeyName(self, key):
        """return the correct name to use when looking up version."""
        # If an entry exists with the given name, then return that.
        if key in self._versions:
            return key
        # We want to be idempotent.
        if key.endswith("_version"):
            key = key[:-8]
        # Strip anything after a '.', since those are root-of-trust names.
        key = key.split(".")[0]
        # Strip off any _data_key ending.
        if key.endswith("_data_key"):
            key = key[:-9]
        return key + "_version"

    def Get(self, key, default=None):
        """Get a key's version, return default if unknown."""
        return self._versions.get(self._KeyName(key), default)

    def Set(self, key, version) -> None:
        """Set a key's version.  Caller is responsible for calling Save()."""
        key = self._KeyName(key)
        # If it converts to an int, we want the int.
        try:
            version = int(version)
        except ValueError:
            pass
        self._versions[key] = version
        self.saved = False
        # Caller is responsible for calling Save()

    def Increment(self, key):
        """Increment key's version.  Caller is responsible for calling Save().

        Args:
            key: name of the key.

        Returns:
            Incremented version.

        Raises:
            VersionOverflowError: version is 16 bit, and incrementing would
                cause overflow.
        """
        key = self._KeyName(key)
        if self._versions[key] == 0xFFFF:
            raise VersionOverflowError("%s version overflow" % key)
        self._versions[key] += 1
        self.saved = False
        return self._versions[key]
        # Caller is responsible for calling Save()

    def Save(self) -> None:
        """Save KeyVersions to disk if needed."""
        if self.saved:
            return
        keys = sorted(self._versions.keys())
        if "name" in keys:
            keys.remove("name")
            keys.insert(0, "name")
        lines = ["%s=%s" % (k, str(self._versions[k])) for k in keys]
        contents = "\n".join(lines) + "\n"
        osutils.WriteFile(self._path, contents)
        self.saved = True


class Keyset:
    """Store signer keys and keyblocks (think keychain).

    A Keyset is the collection of KeyPairs needed to work with a specific Build
    Target image.
    This includes both keys shared by the Build (self.keys):
        - installer_kernel_data_key
        - kernel_data_key
        - kernel_subkey
        - recovery
        - recovery_kernel_data_key
    as well as keys specific to the artifact (self._root_of_trust_keys):
        - root_key
        - firmware_data_key

    Attributes:
        keys: dict of keypairs, indexed on key's name
        root_of_trust_map: dict of root_of_trust alias (e.g., 'loem1') to use
            for each root_of_trust name (e.g.  'ACME').  Keys in the table are
            both root_of_trust names and root_of_trust aliases, so that
            root_of_trust_map[root_of_trust_map[root_of_trust_name]] works.
    """

    # If we have a root_of_trust name, it is of the form 'name.root_of_trust',
    # so we will simply split('.') the name to get the components.  If this is a
    # unified root_of_trust, then there are per-root_of_trust keys, and
    # self.root_of_trust_key_prefixes will be set to this.
    _root_of_trust_key_names = set(("firmware_data_key", "root_key"))

    def __init__(self, key_dir=None) -> None:
        """Initialize the Keyset from key_dir, if given.

        Note: every public key and keyblock must have an accompanying private
            key.

        Args:
            key_dir: directory from which to load Keyset.  [default=None]
        """
        self.keys = {}
        self.key_dir = key_dir
        self.root_of_trust_map = {}
        self._root_of_trust_key_prefixes = set()
        self._root_of_trust_keys = collections.defaultdict(dict)
        self.name = "unknown"
        if key_dir and os.path.exists(key_dir):
            # Get all root_of_trust aliases.  The legacy code base refers to
            # 'root_of_trust' as 'loem'. We need to support the on-disk
            # structures which have a table of 'XX = ALIAS', with the implied
            # name 'loemXX'.
            loem_config_filename = os.path.join(key_dir, "loem.ini")
            if os.path.exists(loem_config_filename):
                logging.debug("Reading loem.ini file")
                loem_config = configparser.ConfigParser()
                if loem_config.read(loem_config_filename):
                    if loem_config.has_section("loem"):
                        self._root_of_trust_key_prefixes = (
                            self._root_of_trust_key_names
                        )
                        for idx, loem in loem_config.items("loem"):
                            alias = "loem" + idx
                            logging.debug(
                                "Adding loem alias %s %s", loem, alias
                            )
                            self.root_of_trust_map[loem] = alias
                            # We also want loemXX to point to loemXX, since our
                            # callers tend to use both name and alias
                            # interchangeably.
                            # TODO(lamontjones) evaluate whether or not we
                            #   should force it to be indexed by only name,
                            #   instead of both.
                            self.root_of_trust_map[alias] = alias
                else:
                    logging.warning("Error reading loem.ini file")

            versions_filename = os.path.join(key_dir, "key.versions")
            self._versions = KeyVersions(versions_filename)
            self.name = self._versions.Get("name")
            if self.name is None:
                logging.warning("key.versions does not set name.")

            # Match any private key file name
            # Ex: firmware_data_key.loem4.vbprivk, kernel_subkey.vbprivk
            for f_name in os.listdir(key_dir):
                match = KeyPair.ParsePrivateKeyFilename(f_name)
                if match:
                    key_name = match.group("name")
                    if key_name not in self.keys:
                        logging.debug("Found new key %s", key_name)
                        key = KeyPair(
                            key_name,
                            key_dir,
                            version=self._versions.Get(key_name, 1),
                            priv_ext=match.group("ext"),
                        )
                        # AddKey will detect whether this is a
                        # root_of_trust-specific key and do the right thing.
                        self.AddKey(key)

    def __eq__(self, other: Any) -> bool:
        return (
            isinstance(other, Keyset)
            and self.root_of_trust_map == other.root_of_trust_map
            and self.keys == other.keys
        )

    def Prune(self) -> None:
        """Check that all keys exists, else remove them."""
        for k in list(self.keys):
            if not self.keys[k].Exists():
                self.keys.pop(k)
        for root_of_trust in list(self._root_of_trust_keys):
            for k in list(self._root_of_trust_keys[root_of_trust]):
                if not self._root_of_trust_keys[root_of_trust][k].Exists():
                    self._root_of_trust_keys[root_of_trust].pop(k)

    def AddKey(self, key) -> None:
        """Add key to Keyset.

        Args:
            key: The KeyPair to add.  key.name is checked to see if it is
                root_of_trust-specific, and the correct group is used.
        """
        if "." in key.name:
            key_name, root_of_trust_name = key.name.split(".")
            # Some legacy keyfiles have .vN.vprivk suffixes, even though they
            # are not root_of_trust keys. (They are backup keys for older
            # versions of the key.) Restricting the root_of_trust_keys to those
            # in _root_of_trust_key_prefixes helps with that.
            if key_name in self._root_of_trust_key_prefixes:
                logging.debug(
                    "Found root_of_trust %s.%s", key_name, root_of_trust_name
                )
                self.AddRootOfTrustKey(key_name, root_of_trust_name, key)
                return
        self.keys[key.name] = key

    def AddRootOfTrustKey(self, key_name, root_of_trust_alias, key) -> None:
        """Attach the root_of_trust-specific key to the base key."""
        # _root_of_trust_keys['loem2']['root_key'] = KeyPair(
        #   'root_key.loem2', ...)
        self._root_of_trust_keys[root_of_trust_alias][key_name] = key

    def KeyExists(self, key_name, require_public=False, require_private=False):
        """Returns if key is in Keyset and exists.

        If this Keyset has root_of_trust-specific keys, then
        root_of_trust-specific keys will only be found if GetBuildKeyset() has
        been called to get the root_of_trust-specific Keyset.
        """
        return key_name in self.keys and self.keys[key_name].Exists(
            require_public=require_public, require_private=require_private
        )

    def KeyblockExists(self, key_name):
        """Returns if keyblock exists

        If this Keyset has root_of_trust-specific keys, then keyblocks for
        root_of_trust-specific keys will only be found if GetBuildKeyset() has
        been called to get the root_of_trust-specific Keyset.
        """
        return key_name in self.keys and self.keys[key_name].KeyblockExists()

    def GetRootOfTrustKeys(self, key_name):
        """Get root_of_trust-specific keys by keyname.

        Args:
            key_name: name of root_of_trust-specific key.  e.g., 'root_key'

        Returns:
            dict of root_of_trust_alias: key
        """
        ret = {}
        for k, v in self._root_of_trust_keys.items():
            if key_name in v:
                ret[k] = v[key_name]
        if key_name in self.keys:
            ret[key_name] = self.keys[key_name]
        return ret

    def GetBuildKeyset(self, root_of_trust_name):
        """Get new Keyset containing keys based on the root_of_trust_name given.

        The following keys are included:
            * This root_of_trust's root_of_trust-specific keys
            * Any non-root_of_trust-specific keys.

        Args:
            root_of_trust_name: either the root_of_trust name (e.g., 'acme') or
                alias (e.g., 'loem1').

        Raises:
            SignerRootOfTrustKeyMissingError if subkey not found
        """
        ks = Keyset()

        found = False

        # Use alias if exists
        root_of_trust_alias = self.root_of_trust_map.get(root_of_trust_name, "")

        for key in self.keys.values():
            ks.AddKey(key)
        for key in self._root_of_trust_keys[root_of_trust_alias].values():
            found = True
            ks.AddKey(key)
            # Also add the key as its base name.
            key = key.Copy()
            key.name = key.name.split(".")[0]
            ks.AddKey(key)

        if not found:
            raise SignerRootOfTrustKeyMissingError(
                "Unable to find %s" % (root_of_trust_name,)
            )

        return ks
