| # Copyright 2018 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ChromeOS encryption/decryption key management""" |
| |
| import collections |
| import configparser |
| import logging |
| import os |
| import re |
| from typing import Any |
| |
| from chromite.lib import cros_build_lib |
| from chromite.lib import osutils |
| |
| |
| class SignerKeyError(Exception): |
| """Raise when there in an error related to keys""" |
| |
| |
| class SignerKeyMissingError(SignerKeyError): |
| """Raise if key is missing from subset""" |
| |
| |
| class SignerRootOfTrustKeyMissingError(SignerKeyMissingError): |
| """Raise if a root_of_trust-specific key is missing""" |
| |
| |
| class VersionOverflowError(SignerKeyError): |
| """Raise if incrementing a version would overflow 16 bits""" |
| |
| |
| class KeyPair: |
| """Container for a key's files. |
| |
| A KeyPair contains the information about a particular public/private pair of |
| keys, including file names, and version. |
| |
| Attributes: |
| name: name of keypair |
| keydir: location of key files |
| version: version of key |
| public: public key file complete path |
| private: private key file complete path |
| keyblock: keyblock file complete path |
| """ |
| |
| # On disk, we have (valid) file names like: |
| # - firmware_data_key.vbprivk -- non-unified build |
| # - firmware_data_key.loem1.vbprivk -- unified build |
| # - kernel_data_key.vbprivk |
| # - kernel_subkey.vbprivk |
| # - key_ec_efs.vbprik2 (which pairs with .vbpubk2, instead of .vbpubk) |
| |
| # All the following regular expressions are used to validate names and |
| # extensions. From KeyPair's perspective, a name is simply a string of 2 or |
| # more alphanumeric characters, possibly containing a single '.' somewhere |
| # in the middle, with one of two valid extensions ('.vbprik2' or |
| # '.vbprivk'). |
| _name_re = re.compile(r"\w+\.?\w+$") |
| |
| # Valid key file name endings: |
| # - .vbprivk (pairs with .vbpubk) |
| # - .vbprik2 (pairs with .vbpubk2) |
| _priv_ext_re = re.compile(r"\.vbpri(vk|k2)$") |
| _pub_ext_re = re.compile(r"\.vbpubk2?$") |
| |
| # This is used by ParsePrivateKeyFilename to recognize a valid private key |
| # file name. Keyset initialization only creates KeyPairs for valid private |
| # keys that it finds in the directory. |
| _priv_filename_re = re.compile( |
| r"(?P<name>\w+\.?\w+)(?P<ext>\.vbpri(?:vk|k2))$" |
| ) |
| |
| def __init__( |
| self, name, keydir, version=1, pub_ext=None, priv_ext=".vbprivk" |
| ) -> None: |
| """Initialize KeyPair. |
| |
| Args: |
| name: name of key |
| keydir: directory containing key files |
| version: version of the key (forced to int) |
| pub_ext: file extension used for public key |
| priv_ext: file extension used for private key |
| """ |
| self.name = name |
| self.keydir = keydir |
| if version is None: |
| version = 1 |
| self.version = int(version) |
| # Use the correct version for pub_ext if they did not specify. |
| self._pub_ext = ( |
| pub_ext |
| if pub_ext |
| else ".vbpubk" |
| if priv_ext == ".vbprivk" |
| else ".vbpubk2" |
| ) |
| self._priv_ext = priv_ext |
| |
| # Validate input parameters. |
| if not self._name_re.match(name): |
| raise ValueError("Illegal value for name") |
| if not self._pub_ext_re.match(self._pub_ext): |
| raise ValueError("Illegal value for pub_ext") |
| if not self._priv_ext_re.match(priv_ext): |
| raise ValueError("Illegal value for priv_ext") |
| |
| self.public = os.path.join(keydir, name + self._pub_ext) |
| self.private = os.path.join(keydir, name + self._priv_ext) |
| |
| # As we create the keyblock name, strip '_data_key' from any name, since |
| # this is a common convention. |
| keyblock_name = "".join(name.split("_data_key")) |
| self.keyblock = os.path.join(keydir, keyblock_name + ".keyblock") |
| |
| def __eq__(self, other: Any) -> bool: |
| return ( |
| isinstance(other, KeyPair) |
| and self.name == other.name |
| and self.version == other.version |
| and self.public == other.public |
| and self.private == other.private |
| ) |
| |
| def Copy(self): |
| """Return a copy of ourselves.""" |
| return KeyPair( |
| self.name, self.keydir, self.version, self._pub_ext, self._priv_ext |
| ) |
| |
| @classmethod |
| def ParsePrivateKeyFilename(cls, file_name): |
| """Extract the name and extension from the filename. |
| |
| Args: |
| file_name: a filename that may or may not be a private key. Leading |
| directories are ignored. |
| |
| Returns: |
| None or re.MatchObject with named groups 'name' and 'ext'. |
| """ |
| basename = file_name.split("/")[-1] |
| return cls._priv_filename_re.match(basename) |
| |
| def Exists(self, require_public=False, require_private=False): |
| """Returns True if key exists on disk.""" |
| |
| has_public = os.path.exists(self.public) |
| has_private = os.path.exists(self.private) |
| |
| if require_public and not has_public: |
| return False |
| if require_private and not has_private: |
| return False |
| return has_public or has_private |
| |
| def KeyblockExists(self): |
| """Returns if keyblock exist.""" |
| return os.path.exists(self.keyblock) |
| |
| def GetSHA1sum(self): |
| """Returns key's sha1sum returns. |
| |
| Raises: |
| SignerKeyError: If error getting sha1sum from public key |
| RunCommandError: if vbutil_key fails |
| """ |
| res = cros_build_lib.run( |
| ["vbutil_key", "--unpack", self.public], |
| check=False, |
| encoding="utf-8", |
| stdout=True, |
| ) |
| |
| # Match line that looks like: 'Key sha1sum: <sha1sum>'. |
| match = re.search(r"Key sha1sum: +(\w+)", res.stdout) |
| if match: |
| return match.group(1) |
| else: |
| raise SignerKeyError("Unable to get sha1sum for %s" % self.public) |
| |
| |
| class KeyVersions: |
| """Manage key.versions file |
| |
| Attributes: |
| saved: True if the file on disk matches the contents of the instance. |
| None of the methods save the instance to disk automatically. |
| """ |
| |
| def __init__(self, filename) -> None: |
| self._versions = {} |
| self._path = filename |
| if os.path.exists(filename): |
| self.saved = True |
| # TODO(b/236161656): Fix. |
| # pylint: disable-next=consider-using-with |
| for line in open(filename, "r", encoding="utf-8").readlines(): |
| if line.find("=") > 0: |
| k, v = line.strip().split("=") |
| try: |
| v = int(v) |
| except ValueError: |
| # Keys that end with "_version" must be ints. |
| if k.endswith("_version"): |
| raise |
| self._versions[k] = v |
| # Make sure that 'name' is in the version dictionary. |
| if "name" not in self._versions: |
| logging.warning('%s lacks a name. Using "unknown".', filename) |
| self._versions["name"] = "unknown" |
| else: |
| self.saved = False |
| self._versions = { |
| "name": "unknown", |
| "firmware_key_version": 1, |
| "firmware_version": 1, |
| "kernel_key_version": 1, |
| "kernel_version": 1, |
| } |
| # Caller is responsible for calling Save() |
| |
| def _KeyName(self, key): |
| """return the correct name to use when looking up version.""" |
| # If an entry exists with the given name, then return that. |
| if key in self._versions: |
| return key |
| # We want to be idempotent. |
| if key.endswith("_version"): |
| key = key[:-8] |
| # Strip anything after a '.', since those are root-of-trust names. |
| key = key.split(".")[0] |
| # Strip off any _data_key ending. |
| if key.endswith("_data_key"): |
| key = key[:-9] |
| return key + "_version" |
| |
| def Get(self, key, default=None): |
| """Get a key's version, return default if unknown.""" |
| return self._versions.get(self._KeyName(key), default) |
| |
| def Set(self, key, version) -> None: |
| """Set a key's version. Caller is responsible for calling Save().""" |
| key = self._KeyName(key) |
| # If it converts to an int, we want the int. |
| try: |
| version = int(version) |
| except ValueError: |
| pass |
| self._versions[key] = version |
| self.saved = False |
| # Caller is responsible for calling Save() |
| |
| def Increment(self, key): |
| """Increment key's version. Caller is responsible for calling Save(). |
| |
| Args: |
| key: name of the key. |
| |
| Returns: |
| Incremented version. |
| |
| Raises: |
| VersionOverflowError: version is 16 bit, and incrementing would |
| cause overflow. |
| """ |
| key = self._KeyName(key) |
| if self._versions[key] == 0xFFFF: |
| raise VersionOverflowError("%s version overflow" % key) |
| self._versions[key] += 1 |
| self.saved = False |
| return self._versions[key] |
| # Caller is responsible for calling Save() |
| |
| def Save(self) -> None: |
| """Save KeyVersions to disk if needed.""" |
| if self.saved: |
| return |
| keys = sorted(self._versions.keys()) |
| if "name" in keys: |
| keys.remove("name") |
| keys.insert(0, "name") |
| lines = ["%s=%s" % (k, str(self._versions[k])) for k in keys] |
| contents = "\n".join(lines) + "\n" |
| osutils.WriteFile(self._path, contents) |
| self.saved = True |
| |
| |
| class Keyset: |
| """Store signer keys and keyblocks (think keychain). |
| |
| A Keyset is the collection of KeyPairs needed to work with a specific Build |
| Target image. |
| This includes both keys shared by the Build (self.keys): |
| - installer_kernel_data_key |
| - kernel_data_key |
| - kernel_subkey |
| - recovery |
| - recovery_kernel_data_key |
| as well as keys specific to the artifact (self._root_of_trust_keys): |
| - root_key |
| - firmware_data_key |
| |
| Attributes: |
| keys: dict of keypairs, indexed on key's name |
| root_of_trust_map: dict of root_of_trust alias (e.g., 'loem1') to use |
| for each root_of_trust name (e.g. 'ACME'). Keys in the table are |
| both root_of_trust names and root_of_trust aliases, so that |
| root_of_trust_map[root_of_trust_map[root_of_trust_name]] works. |
| """ |
| |
| # If we have a root_of_trust name, it is of the form 'name.root_of_trust', |
| # so we will simply split('.') the name to get the components. If this is a |
| # unified root_of_trust, then there are per-root_of_trust keys, and |
| # self.root_of_trust_key_prefixes will be set to this. |
| _root_of_trust_key_names = set(("firmware_data_key", "root_key")) |
| |
| def __init__(self, key_dir=None) -> None: |
| """Initialize the Keyset from key_dir, if given. |
| |
| Note: every public key and keyblock must have an accompanying private |
| key. |
| |
| Args: |
| key_dir: directory from which to load Keyset. [default=None] |
| """ |
| self.keys = {} |
| self.key_dir = key_dir |
| self.root_of_trust_map = {} |
| self._root_of_trust_key_prefixes = set() |
| self._root_of_trust_keys = collections.defaultdict(dict) |
| self.name = "unknown" |
| if key_dir and os.path.exists(key_dir): |
| # Get all root_of_trust aliases. The legacy code base refers to |
| # 'root_of_trust' as 'loem'. We need to support the on-disk |
| # structures which have a table of 'XX = ALIAS', with the implied |
| # name 'loemXX'. |
| loem_config_filename = os.path.join(key_dir, "loem.ini") |
| if os.path.exists(loem_config_filename): |
| logging.debug("Reading loem.ini file") |
| loem_config = configparser.ConfigParser() |
| if loem_config.read(loem_config_filename): |
| if loem_config.has_section("loem"): |
| self._root_of_trust_key_prefixes = ( |
| self._root_of_trust_key_names |
| ) |
| for idx, loem in loem_config.items("loem"): |
| alias = "loem" + idx |
| logging.debug( |
| "Adding loem alias %s %s", loem, alias |
| ) |
| self.root_of_trust_map[loem] = alias |
| # We also want loemXX to point to loemXX, since our |
| # callers tend to use both name and alias |
| # interchangeably. |
| # TODO(lamontjones) evaluate whether or not we |
| # should force it to be indexed by only name, |
| # instead of both. |
| self.root_of_trust_map[alias] = alias |
| else: |
| logging.warning("Error reading loem.ini file") |
| |
| versions_filename = os.path.join(key_dir, "key.versions") |
| self._versions = KeyVersions(versions_filename) |
| self.name = self._versions.Get("name") |
| if self.name is None: |
| logging.warning("key.versions does not set name.") |
| |
| # Match any private key file name |
| # Ex: firmware_data_key.loem4.vbprivk, kernel_subkey.vbprivk |
| for f_name in os.listdir(key_dir): |
| match = KeyPair.ParsePrivateKeyFilename(f_name) |
| if match: |
| key_name = match.group("name") |
| if key_name not in self.keys: |
| logging.debug("Found new key %s", key_name) |
| key = KeyPair( |
| key_name, |
| key_dir, |
| version=self._versions.Get(key_name, 1), |
| priv_ext=match.group("ext"), |
| ) |
| # AddKey will detect whether this is a |
| # root_of_trust-specific key and do the right thing. |
| self.AddKey(key) |
| |
| def __eq__(self, other: Any) -> bool: |
| return ( |
| isinstance(other, Keyset) |
| and self.root_of_trust_map == other.root_of_trust_map |
| and self.keys == other.keys |
| ) |
| |
| def Prune(self) -> None: |
| """Check that all keys exists, else remove them.""" |
| for k in list(self.keys): |
| if not self.keys[k].Exists(): |
| self.keys.pop(k) |
| for root_of_trust in list(self._root_of_trust_keys): |
| for k in list(self._root_of_trust_keys[root_of_trust]): |
| if not self._root_of_trust_keys[root_of_trust][k].Exists(): |
| self._root_of_trust_keys[root_of_trust].pop(k) |
| |
| def AddKey(self, key) -> None: |
| """Add key to Keyset. |
| |
| Args: |
| key: The KeyPair to add. key.name is checked to see if it is |
| root_of_trust-specific, and the correct group is used. |
| """ |
| if "." in key.name: |
| key_name, root_of_trust_name = key.name.split(".") |
| # Some legacy keyfiles have .vN.vprivk suffixes, even though they |
| # are not root_of_trust keys. (They are backup keys for older |
| # versions of the key.) Restricting the root_of_trust_keys to those |
| # in _root_of_trust_key_prefixes helps with that. |
| if key_name in self._root_of_trust_key_prefixes: |
| logging.debug( |
| "Found root_of_trust %s.%s", key_name, root_of_trust_name |
| ) |
| self.AddRootOfTrustKey(key_name, root_of_trust_name, key) |
| return |
| self.keys[key.name] = key |
| |
| def AddRootOfTrustKey(self, key_name, root_of_trust_alias, key) -> None: |
| """Attach the root_of_trust-specific key to the base key.""" |
| # _root_of_trust_keys['loem2']['root_key'] = KeyPair( |
| # 'root_key.loem2', ...) |
| self._root_of_trust_keys[root_of_trust_alias][key_name] = key |
| |
| def KeyExists(self, key_name, require_public=False, require_private=False): |
| """Returns if key is in Keyset and exists. |
| |
| If this Keyset has root_of_trust-specific keys, then |
| root_of_trust-specific keys will only be found if GetBuildKeyset() has |
| been called to get the root_of_trust-specific Keyset. |
| """ |
| return key_name in self.keys and self.keys[key_name].Exists( |
| require_public=require_public, require_private=require_private |
| ) |
| |
| def KeyblockExists(self, key_name): |
| """Returns if keyblock exists |
| |
| If this Keyset has root_of_trust-specific keys, then keyblocks for |
| root_of_trust-specific keys will only be found if GetBuildKeyset() has |
| been called to get the root_of_trust-specific Keyset. |
| """ |
| return key_name in self.keys and self.keys[key_name].KeyblockExists() |
| |
| def GetRootOfTrustKeys(self, key_name): |
| """Get root_of_trust-specific keys by keyname. |
| |
| Args: |
| key_name: name of root_of_trust-specific key. e.g., 'root_key' |
| |
| Returns: |
| dict of root_of_trust_alias: key |
| """ |
| ret = {} |
| for k, v in self._root_of_trust_keys.items(): |
| if key_name in v: |
| ret[k] = v[key_name] |
| if key_name in self.keys: |
| ret[key_name] = self.keys[key_name] |
| return ret |
| |
| def GetBuildKeyset(self, root_of_trust_name): |
| """Get new Keyset containing keys based on the root_of_trust_name given. |
| |
| The following keys are included: |
| * This root_of_trust's root_of_trust-specific keys |
| * Any non-root_of_trust-specific keys. |
| |
| Args: |
| root_of_trust_name: either the root_of_trust name (e.g., 'acme') or |
| alias (e.g., 'loem1'). |
| |
| Raises: |
| SignerRootOfTrustKeyMissingError if subkey not found |
| """ |
| ks = Keyset() |
| |
| found = False |
| |
| # Use alias if exists |
| root_of_trust_alias = self.root_of_trust_map.get(root_of_trust_name, "") |
| |
| for key in self.keys.values(): |
| ks.AddKey(key) |
| for key in self._root_of_trust_keys[root_of_trust_alias].values(): |
| found = True |
| ks.AddKey(key) |
| # Also add the key as its base name. |
| key = key.Copy() |
| key.name = key.name.split(".")[0] |
| ks.AddKey(key) |
| |
| if not found: |
| raise SignerRootOfTrustKeyMissingError( |
| "Unable to find %s" % (root_of_trust_name,) |
| ) |
| |
| return ks |