blob: a421eefdec5ab3e6989d144d5f97fa768392cc85 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""ChromeOS encryption/decryption key management"""
from __future__ import print_function
import collections
import os
import re
from six.moves import configparser
from chromite.lib import cros_build_lib
from chromite.lib import cros_logging as logging
from chromite.lib import osutils
class SignerKeyError(Exception):
"""Raise when there in an error related to keys"""
class SignerKeyMissingError(SignerKeyError):
"""Raise if key is missing from subset"""
class SignerRootOfTrustKeyMissingError(SignerKeyMissingError):
"""Raise if a root_of_trust-specific key is missing"""
class VersionOverflowError(SignerKeyError):
"""Raise if incrementing a version would overflow 16 bits"""
class KeyPair(object):
"""Container for a key's files.
A KeyPair contains the information about a particular public/private pair of
keys, including file names, and version.
Attributes:
name: name of keypair
keydir: location of key files
version: version of key
public: public key file complete path
private: private key file complete path
keyblock: keyblock file complete path
"""
# On disk, we have (valid) file names like:
# - firmware_data_key.vbprivk -- non-unified build
# - firmware_data_key.loem1.vbprivk -- unified build
# - kernel_data_key.vbprivk
# - kernel_subkey.vbprivk
# - key_ec_efs.vbprik2 (which pairs with .vbpubk2, instead of .vbpubk)
# All of the following regular expressions are used to validate names and
# extensions. From KeyPair's perspective, a name is simply a string of 2 or
# more alphanumeric characters, possibly containing a single '.' somewhere in
# the middle, with one of two valid extensions ('.vbprik2' or '.vbprivk').
_name_re = re.compile(r'\w+\.?\w+$')
# Valid key file name endings:
# - .vbprivk (pairs with .vbpubk)
# - .vbprik2 (pairs with .vbpubk2)
_priv_ext_re = re.compile(r'\.vbpri(vk|k2)$')
_pub_ext_re = re.compile(r'\.vbpubk2?$')
# This is used by ParsePrivateKeyFilename to recognize a valid private key
# file name. Keyset initialization only creates KeyPairs for valid private
# keys that it finds in the directory.
_priv_filename_re = re.compile(
r'(?P<name>\w+\.?\w+)(?P<ext>\.vbpri(?:vk|k2))$')
def __init__(self, name, keydir, version=1,
pub_ext=None, priv_ext='.vbprivk'):
"""Initialize KeyPair.
Args:
name: name of key
keydir: directory containing key files
version: version of the key (forced to int)
pub_ext: file extension used for public key
priv_ext: file extension used for private key
"""
self.name = name
self.keydir = keydir
if version is None:
version = 1
self.version = int(version)
# Use the correct version for pub_ext if they did not specify.
self._pub_ext = (pub_ext if pub_ext else
'.vbpubk' if priv_ext == '.vbprivk' else
'.vbpubk2')
self._priv_ext = priv_ext
# Validate input parameters.
if not self._name_re.match(name):
raise ValueError('Illegal value for name')
if not self._pub_ext_re.match(self._pub_ext):
raise ValueError('Illegal value for pub_ext')
if not self._priv_ext_re.match(priv_ext):
raise ValueError('Illegal value for priv_ext')
self.public = os.path.join(keydir, name + self._pub_ext)
self.private = os.path.join(keydir, name + self._priv_ext)
# As we create the keyblock name, strip '_data_key' from any name, since
# this is a common convention.
keyblock_name = ''.join(name.split('_data_key'))
self.keyblock = os.path.join(keydir, keyblock_name + '.keyblock')
def __eq__(self, other):
return (isinstance(other, KeyPair)
and self.name == other.name
and self.version == other.version
and self.public == other.public
and self.private == other.private)
def Copy(self):
"""Return a copy of ourselves."""
return KeyPair(
self.name, self.keydir, self.version, self._pub_ext, self._priv_ext)
@classmethod
def ParsePrivateKeyFilename(cls, file_name):
"""Extract the name and extension from the filename.
Args:
file_name: a filename that may or may not be a private key. Leading
directories are ignored.
Returns:
None or re.MatchObject with named groups 'name' and 'ext'.
"""
basename = file_name.split('/')[-1]
return cls._priv_filename_re.match(basename)
def Exists(self, require_public=False, require_private=False):
"""Returns True if key exists on disk."""
has_public = os.path.exists(self.public)
has_private = os.path.exists(self.private)
if require_public and not has_public:
return False
if require_private and not has_private:
return False
return has_public or has_private
def KeyblockExists(self):
"""Returns if keyblock exist."""
return os.path.exists(self.keyblock)
def GetSHA1sum(self):
"""Returns key's sha1sum returns.
Raises:
SignerKeyError: If error getting sha1sum from public key
RunCommandError: if vbutil_key fails
"""
res = cros_build_lib.run(['vbutil_key', '--unpack', self.public],
check=False, encoding='utf-8', stdout=True)
# Match line that looks like: 'Key sha1sum: <sha1sum>'.
match = re.search(r'Key sha1sum: +(\w+)', res.output)
if match:
return match.group(1)
else:
raise SignerKeyError('Unable to get sha1sum for %s' % self.public)
class KeyVersions(object):
"""Manage key.versions file
Attributes:
saved: True if the file on disk matches the contents of the instance. None
of the methods save the instance to disk automatically.
"""
def __init__(self, filename):
self._versions = {}
self._path = filename
if os.path.exists(filename):
self.saved = True
for line in open(filename, 'r').readlines():
if line.find('=') > 0:
k, v = line.strip().split('=')
try:
v = int(v)
except ValueError:
# Keys that end with "_version" must be ints.
if k.endswith('_version'):
raise
self._versions[k] = v
# Make sure that 'name' is in the version dictionary.
if 'name' not in self._versions:
logging.warning('%s lacks a name. Using "unknown".', filename)
self._versions['name'] = 'unknown'
else:
self.saved = False
self._versions = {
'name': 'unknown',
'firmware_key_version': 1,
'firmware_version': 1,
'kernel_key_version': 1,
'kernel_version': 1}
# Caller is resonsible for calling Save()
def _KeyName(self, key):
"""return the correct name to use when looking up version."""
# If an entry exists with the given name, then return that.
if key in self._versions:
return key
# We want to be idempotent.
if key.endswith('_version'):
key = key[:-8]
# Strip anything after a '.', since those are root-of-trust names.
key = key.split('.')[0]
# Strip off any _data_key ending.
if key.endswith('_data_key'):
key = key[:-9]
return key + '_version'
def Get(self, key, default=None):
"""Get a key's version, return default if unknown."""
return self._versions.get(self._KeyName(key), default)
def Set(self, key, version):
"""Set a key's version. Caller is responsible for calling Save()."""
key = self._KeyName(key)
# If it converts to an int, we want the int.
try:
version = int(version)
except ValueError:
pass
self._versions[key] = version
self.saved = False
# Caller is resonsible for calling Save()
def Increment(self, key):
"""Increment key's version. Caller is responsible for calling Save().
Args:
key: name of the key.
Returns:
Incremented version.
Raises:
VersionOverflowError: version is 16 bit, and incrementing would cause
overflow.
"""
key = self._KeyName(key)
if self._versions[key] == 0xffff:
raise VersionOverflowError('%s version overflow' % key)
self._versions[key] += 1
self.saved = False
return self._versions[key]
# Caller is resonsible for calling Save()
def Save(self):
"""Save KeyVersions to disk if needed."""
if self.saved:
return
keys = sorted(self._versions.keys())
if 'name' in keys:
keys.remove('name')
keys.insert(0, 'name')
lines = ['%s=%s' % (k, str(self._versions[k])) for k in keys]
contents = '\n'.join(lines) + '\n'
osutils.WriteFile(self._path, contents)
self.saved = True
class Keyset(object):
"""Store signer keys and keyblocks (think keychain).
A Keyset is the collection of KeyPairs needed to work with a specific Build
Target image.
This includes both keys shared by the Build (self.keys):
- installer_kernel_data_key
- kernel_data_key
- kernel_subkey
- recovery
- recovery_kernel_data_key
as well as keys specific to the artifact (self._root_of_trust_keys):
- root_key
- firmware_data_key
Attributes:
keys: dict of keypairs, indexed on key's name
root_of_trust_map: dict of root_of_trust alias (e.g., 'loem1') to use for
each root_of_trust name (e.g. 'ACME'). Keys in the table are both
root_of_trust names and root_of_trust aliases, so that
root_of_trust_map[root_of_trust_map[root_of_trust_name]] works.
"""
# If we have a root_of_trust name, it is of the form 'name.root_of_trust', so
# we will simply split('.') the name to get the components. If this is a
# unified root_of_trust, then there are per-root_of_trust keys, and
# self.root_of_trust_key_prefixes will be set to this.
_root_of_trust_key_names = set(('firmware_data_key', 'root_key'))
def __init__(self, key_dir=None):
"""Initialize the Keyset from key_dir, if given.
Args:
key_dir: directory from which to load Keyset. [default=None]
Note: every public key and keyblock must have an accompanying private key
"""
self.keys = {}
self.key_dir = key_dir
self.root_of_trust_map = {}
self._root_of_trust_key_prefixes = set()
self._root_of_trust_keys = collections.defaultdict(dict)
self.name = 'unknown'
if key_dir and os.path.exists(key_dir):
# Get all root_of_trust aliases. The legacy code base refers to
# 'root_of_trust' as 'loem'. We need to support the on-disk structures
# which have a table of 'XX = ALIAS', with the implied name 'loemXX'.
loem_config_filename = os.path.join(key_dir, 'loem.ini')
if os.path.exists(loem_config_filename):
logging.debug('Reading loem.ini file')
loem_config = configparser.ConfigParser()
if loem_config.read(loem_config_filename):
if loem_config.has_section('loem'):
self._root_of_trust_key_prefixes = self._root_of_trust_key_names
for idx, loem in loem_config.items('loem'):
alias = 'loem' + idx
logging.debug('Adding loem alias %s %s', loem, alias)
self.root_of_trust_map[loem] = alias
# We also want loemXX to point to loemXX, since our callers tend
# to use both name and alias interchangably.
# TODO(lamontjones) evaluate whether or not we should force it to
# be indexed by only name, instead of both.
self.root_of_trust_map[alias] = alias
else:
logging.warning('Error reading loem.ini file')
versions_filename = os.path.join(key_dir, 'key.versions')
self._versions = KeyVersions(versions_filename)
self.name = self._versions.Get('name')
if self.name is None:
logging.warning('key.versions does not set name.')
# Match any private key file name
# Ex: firmware_data_key.loem4.vbprivk, kernel_subkey.vbprivk
for f_name in os.listdir(key_dir):
match = KeyPair.ParsePrivateKeyFilename(f_name)
if match:
key_name = match.group('name')
if key_name not in self.keys:
logging.debug('Found new key %s', key_name)
key = KeyPair(
key_name,
key_dir,
version=self._versions.Get(key_name, 1),
priv_ext=match.group('ext'))
# AddKey will detect whether or not this is a root_of_trust-specific
# key and do the right thing.
self.AddKey(key)
def __eq__(self, other):
return (isinstance(other, Keyset)
and self.root_of_trust_map == other.root_of_trust_map
and self.keys == other.keys)
def Prune(self):
"""Check that all keys exists, else remove them."""
for k in list(self.keys):
if not self.keys[k].Exists():
self.keys.pop(k)
for root_of_trust in list(self._root_of_trust_keys):
for k in list(self._root_of_trust_keys[root_of_trust]):
if not self._root_of_trust_keys[root_of_trust][k].Exists():
self._root_of_trust_keys[root_of_trust].pop(k)
def AddKey(self, key):
"""Add key to Keyset.
Args:
key: The KeyPair to add. key.name is checked to see if it is
root_of_trust-specific, and the correct group is used.
"""
if '.' in key.name:
key_name, root_of_trust_name = key.name.split('.')
# Some of the legacy keyfiles have .vN.vprivk suffixes, even though they
# are not root_of_trust keys. (They are backup keys for older versions of
# the key.) Restricting the root_of_trust_keys to those in
# _root_of_trust_key_prefixes helps with that.
if key_name in self._root_of_trust_key_prefixes:
logging.debug('Found root_of_trust %s.%s', key_name, root_of_trust_name)
self.AddRootOfTrustKey(key_name, root_of_trust_name, key)
return
self.keys[key.name] = key
def AddRootOfTrustKey(self, key_name, root_of_trust_alias, key):
"""Attach the root_of_trust-specific key to the base key."""
# _root_of_trust_keys['loem2']['root_key'] = KeyPair('root_key.loem2', ...)
self._root_of_trust_keys[root_of_trust_alias][key_name] = key
def KeyExists(self, key_name, require_public=False, require_private=False):
"""Returns if key is in Keyset and exists.
If this Keyset has root_of_trust-specific keys, then root_of_trust-specific
keys will only be found if GetBuildKeyset() has been called to get the
root_of_trust-specific Keyset.
"""
return (key_name in self.keys and
self.keys[key_name].Exists(require_public=require_public,
require_private=require_private))
def KeyblockExists(self, key_name):
"""Returns if keyblock exists
If this Keyset has root_of_trust-specific keys, then keyblocks for
root_of_trust-specific keys will only be found if GetBuildKeyset() has been
called to get the root_of_trust-specific Keyset.
"""
return (key_name in self.keys and
self.keys[key_name].KeyblockExists())
def GetRootOfTrustKeys(self, key_name):
"""Get root_of_trust-specific keys by keyname.
Args:
key_name: name of root_of_trust-specific key. e.g., 'root_key'
Returns:
dict of root_of_trust_alias: key
"""
ret = {}
for k, v in self._root_of_trust_keys.items():
if key_name in v:
ret[k] = v[key_name]
if key_name in self.keys:
ret[key_name] = self.keys[key_name]
return ret
def GetBuildKeyset(self, root_of_trust_name):
"""Returns new Keyset containing keys based on the root_of_trust_name given.
The following keys are included:
* This root_of_trust's root_of_trust-specific keys
* Any non-root_of_trust-specific keys.
Args:
root_of_trust_name: either the root_of_trust name (e.g., 'acme') or alias
(e.g., 'loem1').
Raises SignerRootOfTrustKeyMissingError if subkey not found
"""
ks = Keyset()
found = False
# Use alias if exists
root_of_trust_alias = self.root_of_trust_map.get(root_of_trust_name, '')
for key in self.keys.values():
ks.AddKey(key)
for key in self._root_of_trust_keys[root_of_trust_alias].values():
found = True
ks.AddKey(key)
# Also add the key as its base name.
key = key.Copy()
key.name = key.name.split('.')[0]
ks.AddKey(key)
if not found:
raise SignerRootOfTrustKeyMissingError(
'Unable to find %s' % (root_of_trust_name,))
return ks