blob: 50ee753eff1d7e60463d67a0a1a6330f114083c9 [file] [log] [blame]
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""ChromeOS encryption/decryption key management"""
import collections
import configparser
import logging
import os
import re
from chromite.lib import cros_build_lib
from chromite.lib import osutils
class SignerKeyError(Exception):
    """Base class for every key-related error raised by this module."""
class SignerKeyMissingError(SignerKeyError):
    """Raised when a key is missing from a subset."""
class SignerRootOfTrustKeyMissingError(SignerKeyMissingError):
    """Raised when a root_of_trust-specific key is missing."""
class VersionOverflowError(SignerKeyError):
    """Raised when incrementing a version would overflow 16 bits."""
class KeyPair:
    """Container for a key's files.

    A KeyPair contains the information about a particular public/private pair
    of keys, including file names, and version.

    Attributes:
        name: name of keypair
        keydir: location of key files
        version: version of key
        public: public key file complete path
        private: private key file complete path
        keyblock: keyblock file complete path
    """

    # On disk, we have (valid) file names like:
    # - firmware_data_key.vbprivk -- non-unified build
    # - firmware_data_key.loem1.vbprivk -- unified build
    # - kernel_data_key.vbprivk
    # - kernel_subkey.vbprivk
    # - key_ec_efs.vbprik2 (which pairs with .vbpubk2, instead of .vbpubk)
    # All the following regular expressions are used to validate names and
    # extensions. From KeyPair's perspective, a name is simply a string of 2 or
    # more word characters (alphanumerics or '_'), possibly containing a single
    # '.' somewhere in the middle, with one of two valid extensions
    # ('.vbprik2' or '.vbprivk').
    #
    # These patterns are always applied with fullmatch(): with match(), the
    # trailing '$' would also accept a value that ends in a newline.
    _name_re = re.compile(r"\w+\.?\w+$")

    # Valid key file name endings:
    # - .vbprivk (pairs with .vbpubk)
    # - .vbprik2 (pairs with .vbpubk2)
    _priv_ext_re = re.compile(r"\.vbpri(vk|k2)$")
    _pub_ext_re = re.compile(r"\.vbpubk2?$")

    # This is used by ParsePrivateKeyFilename to recognize a valid private key
    # file name. Keyset initialization only creates KeyPairs for valid private
    # keys that it finds in the directory.
    _priv_filename_re = re.compile(
        r"(?P<name>\w+\.?\w+)(?P<ext>\.vbpri(?:vk|k2))$"
    )

    # __eq__ is defined without a matching hash, so instances are unhashable.
    # Spelled out here so that the intent is visible.
    __hash__ = None

    def __init__(
        self, name, keydir, version=1, pub_ext=None, priv_ext=".vbprivk"
    ):
        """Initialize KeyPair.

        Args:
            name: name of key
            keydir: directory containing key files
            version: version of the key (forced to int; None means 1)
            pub_ext: file extension used for public key. When not given, it
                is derived from priv_ext.
            priv_ext: file extension used for private key

        Raises:
            ValueError: if name, pub_ext or priv_ext is not a legal value, or
                if version cannot be converted to an int.
        """
        self.name = name
        self.keydir = keydir
        self.version = 1 if version is None else int(version)

        # Use the matching public extension if the caller did not specify one.
        if pub_ext:
            self._pub_ext = pub_ext
        elif priv_ext == ".vbprivk":
            self._pub_ext = ".vbpubk"
        else:
            self._pub_ext = ".vbpubk2"
        self._priv_ext = priv_ext

        # Validate input parameters. fullmatch() rejects values with a
        # trailing newline, which would otherwise leak into the file paths.
        if not self._name_re.fullmatch(name):
            raise ValueError("Illegal value for name")
        if not self._pub_ext_re.fullmatch(self._pub_ext):
            raise ValueError("Illegal value for pub_ext")
        if not self._priv_ext_re.fullmatch(priv_ext):
            raise ValueError("Illegal value for priv_ext")

        self.public = os.path.join(keydir, name + self._pub_ext)
        self.private = os.path.join(keydir, name + self._priv_ext)

        # As we create the keyblock name, strip '_data_key' from any name,
        # since this is a common convention.
        keyblock_name = name.replace("_data_key", "")
        self.keyblock = os.path.join(keydir, keyblock_name + ".keyblock")

    def __eq__(self, other):
        """Compare name, version and both key paths (keyblock is ignored)."""
        return (
            isinstance(other, KeyPair)
            and self.name == other.name
            and self.version == other.version
            and self.public == other.public
            and self.private == other.private
        )

    def Copy(self):
        """Return a copy of ourselves."""
        return KeyPair(
            self.name, self.keydir, self.version, self._pub_ext, self._priv_ext
        )

    @classmethod
    def ParsePrivateKeyFilename(cls, file_name):
        """Extract the name and extension from the filename.

        Args:
            file_name: a filename that may or may not be a private key.
                Leading directories are ignored.

        Returns:
            None or re.Match with named groups 'name' and 'ext'.
        """
        basename = os.path.basename(file_name)
        return cls._priv_filename_re.fullmatch(basename)

    def Exists(self, require_public=False, require_private=False):
        """Returns True if key exists on disk.

        Args:
            require_public: if True, the public key file must exist.
            require_private: if True, the private key file must exist.

        Returns:
            False if a required file is missing; otherwise True when at least
            one of the public or private key files exists.
        """
        has_public = os.path.exists(self.public)
        has_private = os.path.exists(self.private)
        if require_public and not has_public:
            return False
        if require_private and not has_private:
            return False
        return has_public or has_private

    def KeyblockExists(self):
        """Returns True if the keyblock file exists on disk."""
        return os.path.exists(self.keyblock)

    def GetSHA1sum(self):
        """Returns the sha1sum of the public key, as reported by vbutil_key.

        Returns:
            The sha1sum string parsed from the `vbutil_key --unpack` output.

        Raises:
            SignerKeyError: if no 'Key sha1sum:' line is found in the output.
        """
        # NOTE(review): run() is called with check=False, so a failing
        # vbutil_key presumably surfaces as the SignerKeyError below rather
        # than as RunCommandError -- confirm against cros_build_lib.run.
        res = cros_build_lib.run(
            ["vbutil_key", "--unpack", self.public],
            check=False,
            encoding="utf-8",
            stdout=True,
        )

        # Match line that looks like: 'Key sha1sum: <sha1sum>'.
        match = re.search(r"Key sha1sum: +(\w+)", res.stdout)
        if not match:
            raise SignerKeyError("Unable to get sha1sum for %s" % self.public)
        return match.group(1)
class KeyVersions:
    """Manage key.versions file

    The file is a list of 'key=value' lines. Values that parse as integers
    are stored as ints; any other value is kept as a string, except that
    entries whose key ends with '_version' must be integers.

    Attributes:
        saved: True if the file on disk matches the contents of the instance.
            None of the methods save the instance to disk automatically.
    """

    def __init__(self, filename):
        """Load versions from filename, or start from defaults if missing.

        Args:
            filename: path of the key.versions file.

        Raises:
            ValueError: if an entry ending in '_version' is not an integer.
        """
        self._versions = {}
        self._path = filename
        if os.path.exists(filename):
            self.saved = True
            with open(filename, "r", encoding="utf-8") as versions_file:
                for line in versions_file:
                    # Only lines with a non-empty left-hand side are entries.
                    if line.find("=") > 0:
                        # Split on the first '=' only, so that a value may
                        # itself contain '='.
                        k, v = line.strip().split("=", 1)
                        try:
                            v = int(v)
                        except ValueError:
                            # Keys that end with "_version" must be ints.
                            if k.endswith("_version"):
                                raise
                        self._versions[k] = v

            # Make sure that 'name' is in the version dictionary.
            if "name" not in self._versions:
                logging.warning('%s lacks a name. Using "unknown".', filename)
                self._versions["name"] = "unknown"
        else:
            self.saved = False
            self._versions = {
                "name": "unknown",
                "firmware_key_version": 1,
                "firmware_version": 1,
                "kernel_key_version": 1,
                "kernel_version": 1,
            }
            # Caller is responsible for calling Save()

    def _KeyName(self, key):
        """Return the correct name to use when looking up version.

        An exact match in the table wins. Otherwise the name is normalized:
        'firmware_data_key.loem1' becomes 'firmware_version'.
        """
        # If an entry exists with the given name, then return that.
        if key in self._versions:
            return key

        # We want to be idempotent.
        if key.endswith("_version"):
            key = key[: -len("_version")]

        # Strip anything after a '.', since those are root-of-trust names.
        key = key.split(".")[0]

        # Strip off any _data_key ending.
        if key.endswith("_data_key"):
            key = key[: -len("_data_key")]

        return key + "_version"

    def Get(self, key, default=None):
        """Get a key's version, return default if unknown."""
        return self._versions.get(self._KeyName(key), default)

    def Set(self, key, version):
        """Set a key's version. Caller is responsible for calling Save().

        Args:
            key: name of the key.
            version: new value; stored as an int when it converts to one.
        """
        key = self._KeyName(key)

        # If it converts to an int, we want the int.
        try:
            version = int(version)
        except ValueError:
            pass

        self._versions[key] = version
        self.saved = False
        # Caller is responsible for calling Save()

    def Increment(self, key):
        """Increment key's version. Caller is responsible for calling Save().

        Args:
            key: name of the key.

        Returns:
            Incremented version.

        Raises:
            VersionOverflowError: version is 16 bit, and incrementing would
                cause overflow.
            KeyError: if the key has no entry in the table.
        """
        key = self._KeyName(key)
        if self._versions[key] == 0xFFFF:
            raise VersionOverflowError("%s version overflow" % key)
        self._versions[key] += 1
        self.saved = False
        return self._versions[key]
        # Caller is responsible for calling Save()

    def Save(self):
        """Save KeyVersions to disk if needed."""
        if self.saved:
            return

        # 'name' goes first; everything else follows in sorted order.
        keys = sorted(k for k in self._versions if k != "name")
        if "name" in self._versions:
            keys.insert(0, "name")

        lines = ["%s=%s" % (k, str(self._versions[k])) for k in keys]
        contents = "\n".join(lines) + "\n"
        osutils.WriteFile(self._path, contents)
        self.saved = True
class Keyset:
    """Store signer keys and keyblocks (think keychain).

    A Keyset is the collection of KeyPairs needed to work with a specific Build
    Target image.

    This includes both keys shared by the Build (self.keys):
        - installer_kernel_data_key
        - kernel_data_key
        - kernel_subkey
        - recovery
        - recovery_kernel_data_key
    as well as keys specific to the artifact (self._root_of_trust_keys):
        - root_key
        - firmware_data_key

    Attributes:
        keys: dict of keypairs, indexed on key's name
        key_dir: directory the Keyset was loaded from (may be None)
        name: name read from key.versions, or 'unknown'
        root_of_trust_map: dict of root_of_trust alias (e.g., 'loem1') to use
            for each root_of_trust name (e.g. 'ACME'). Keys in the table are
            both root_of_trust names and root_of_trust aliases, so that
            root_of_trust_map[root_of_trust_map[root_of_trust_name]] works.
    """

    # If we have a root_of_trust name, it is of the form 'name.root_of_trust',
    # so we will simply split the name on '.' to get the components. If this
    # is a unified root_of_trust, then there are per-root_of_trust keys, and
    # self._root_of_trust_key_prefixes will be set to (a copy of) this.
    _root_of_trust_key_names = {"firmware_data_key", "root_key"}

    def __init__(self, key_dir=None):
        """Initialize the Keyset from key_dir, if given.

        Note: every public key and keyblock must have an accompanying private
        key.

        Args:
            key_dir: directory from which to load Keyset. [default=None]
        """
        self.keys = {}
        self.key_dir = key_dir
        self.root_of_trust_map = {}
        self._root_of_trust_key_prefixes = set()
        self._root_of_trust_keys = collections.defaultdict(dict)
        self.name = "unknown"
        # Always define the attribute; it is only populated when key_dir
        # exists.
        self._versions = None

        if not key_dir or not os.path.exists(key_dir):
            return

        self._LoadLoemConfig(key_dir)

        versions_filename = os.path.join(key_dir, "key.versions")
        self._versions = KeyVersions(versions_filename)
        self.name = self._versions.Get("name")
        if self.name is None:
            logging.warning("key.versions does not set name.")

        # Match any private key file name
        # Ex: firmware_data_key.loem4.vbprivk, kernel_subkey.vbprivk
        for f_name in os.listdir(key_dir):
            match = KeyPair.ParsePrivateKeyFilename(f_name)
            if not match:
                continue
            key_name = match.group("name")
            if key_name in self.keys:
                continue
            logging.debug("Found new key %s", key_name)
            key = KeyPair(
                key_name,
                key_dir,
                version=self._versions.Get(key_name, 1),
                priv_ext=match.group("ext"),
            )
            # AddKey will detect whether this is a root_of_trust-specific
            # key and do the right thing.
            self.AddKey(key)

    def _LoadLoemConfig(self, key_dir):
        """Read key_dir/loem.ini (if present) into root_of_trust_map."""
        # Get all root_of_trust aliases. The legacy code base refers to
        # 'root_of_trust' as 'loem'. We need to support the on-disk
        # structures which have a table of 'XX = ALIAS', with the implied
        # name 'loemXX'.
        loem_config_filename = os.path.join(key_dir, "loem.ini")
        if not os.path.exists(loem_config_filename):
            return

        logging.debug("Reading loem.ini file")
        loem_config = configparser.ConfigParser()
        if not loem_config.read(loem_config_filename):
            logging.warning("Error reading loem.ini file")
            return
        if not loem_config.has_section("loem"):
            return

        # Copy, so that the class-level set is never shared with (and
        # mutated through) an instance.
        self._root_of_trust_key_prefixes = set(self._root_of_trust_key_names)
        for idx, loem in loem_config.items("loem"):
            alias = "loem" + idx
            logging.debug("Adding loem alias %s %s", loem, alias)
            self.root_of_trust_map[loem] = alias
            # We also want loemXX to point to loemXX, since our callers tend
            # to use both name and alias interchangeably.
            # TODO(lamontjones) evaluate whether or not we should force it
            # to be indexed by only name, instead of both.
            self.root_of_trust_map[alias] = alias

    def __eq__(self, other):
        """Compare root_of_trust_map and the shared keys."""
        return (
            isinstance(other, Keyset)
            and self.root_of_trust_map == other.root_of_trust_map
            and self.keys == other.keys
        )

    def Prune(self):
        """Check that all keys exists, else remove them."""
        # Iterate over snapshots of the names, since entries are removed.
        for k in list(self.keys):
            if not self.keys[k].Exists():
                del self.keys[k]

        for rot_keys in self._root_of_trust_keys.values():
            for k in list(rot_keys):
                if not rot_keys[k].Exists():
                    del rot_keys[k]

    def AddKey(self, key):
        """Add key to Keyset.

        Args:
            key: The KeyPair to add. key.name is checked to see if it is
                root_of_trust-specific, and the correct group is used.
        """
        key_name, dot, root_of_trust_name = key.name.partition(".")
        # Some legacy keyfiles have .vN.vprivk suffixes, even though they
        # are not root_of_trust keys. (They are backup keys for older
        # versions of the key.) Restricting the root_of_trust_keys to those
        # in _root_of_trust_key_prefixes helps with that.
        if dot and key_name in self._root_of_trust_key_prefixes:
            logging.debug(
                "Found root_of_trust %s.%s", key_name, root_of_trust_name
            )
            self.AddRootOfTrustKey(key_name, root_of_trust_name, key)
            return

        self.keys[key.name] = key

    def AddRootOfTrustKey(self, key_name, root_of_trust_alias, key):
        """Attach the root_of_trust-specific key to the base key."""
        # _root_of_trust_keys['loem2']['root_key'] = KeyPair(
        #     'root_key.loem2', ...)
        self._root_of_trust_keys[root_of_trust_alias][key_name] = key

    def KeyExists(self, key_name, require_public=False, require_private=False):
        """Returns if key is in Keyset and exists.

        If this Keyset has root_of_trust-specific keys, then
        root_of_trust-specific keys will only be found if GetBuildKeyset() has
        been called to get the root_of_trust-specific Keyset.
        """
        return key_name in self.keys and self.keys[key_name].Exists(
            require_public=require_public, require_private=require_private
        )

    def KeyblockExists(self, key_name):
        """Returns if keyblock exists

        If this Keyset has root_of_trust-specific keys, then keyblocks for
        root_of_trust-specific keys will only be found if GetBuildKeyset() has
        been called to get the root_of_trust-specific Keyset.
        """
        return key_name in self.keys and self.keys[key_name].KeyblockExists()

    def GetRootOfTrustKeys(self, key_name):
        """Get root_of_trust-specific keys by keyname.

        Args:
            key_name: name of root_of_trust-specific key. e.g., 'root_key'

        Returns:
            dict of root_of_trust_alias: key. If a shared key named key_name
            exists, it is also included under key_name itself.
        """
        ret = {
            alias: rot_keys[key_name]
            for alias, rot_keys in self._root_of_trust_keys.items()
            if key_name in rot_keys
        }
        if key_name in self.keys:
            ret[key_name] = self.keys[key_name]
        return ret

    def GetBuildKeyset(self, root_of_trust_name):
        """Get new Keyset containing keys based on the root_of_trust_name given.

        The following keys are included:
            * This root_of_trust's root_of_trust-specific keys
            * Any non-root_of_trust-specific keys.

        Args:
            root_of_trust_name: either the root_of_trust name (e.g., 'acme')
                or alias (e.g., 'loem1').

        Returns:
            A new Keyset. Each root_of_trust-specific key appears twice: under
            its full name and, as a renamed copy, under its base name.

        Raises:
            SignerRootOfTrustKeyMissingError if subkey not found
        """
        ks = Keyset()

        # Use alias if exists
        root_of_trust_alias = self.root_of_trust_map.get(root_of_trust_name, "")

        # Use .get() rather than indexing: indexing the defaultdict would
        # insert an empty entry for an unknown alias as a side effect.
        rot_keys = self._root_of_trust_keys.get(root_of_trust_alias, {})
        if not rot_keys:
            raise SignerRootOfTrustKeyMissingError(
                "Unable to find %s" % (root_of_trust_name,)
            )

        for key in self.keys.values():
            ks.AddKey(key)

        for key in rot_keys.values():
            ks.AddKey(key)
            # Also add the key as its base name.
            base_key = key.Copy()
            base_key.name = base_key.name.split(".")[0]
            ks.AddKey(base_key)

        return ks