blob: e4c304c7524db7489b6385bd552dfca2c16e7f4e [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""ChromeOS encryption/decryption key management"""
from __future__ import print_function
import ConfigParser
import os
import re
from chromite.lib import cros_logging as logging
class SignerKeyError(Exception):
"""Raise when there in an error related to keys"""
class SignerKeyMissingError(SignerKeyError):
"""Raise if key is missing from subset"""
class SignerSubkeyMissingError(SignerKeyMissingError):
"""Raise if a subkey is missing"""
class KeyPair(object):
"""Container for a key's files.
Attributes:
name: name of keypair
keydir: location of key files
version: version of key
subkeys: dictionary of sub keys
public: public key file complete path
private: private key file complete path
keyblock: keyblock file complete path
"""
def __init__(self, name, keydir, version=1, subkeys=(),
pub_ext='.vbpubk', priv_ext='.vbprivk'):
"""Initialize KeyPair.
Args:
name: name of key
keydir: directory containing key files
version: version of the key
subkeys: list of subkeys to create
pub_ext: file extension used for public key
priv_ext: file extension used for private key
"""
self.name = name
self.keydir = keydir
self.version = version
self._pub_ext = pub_ext
self._priv_ext = priv_ext
self.subkeys = {}
for subkey in subkeys:
self.AddSubkey(subkey)
self.public = os.path.join(keydir, name + pub_ext)
self.private = os.path.join(keydir, name + priv_ext)
#strip '_data_key' from any name, since this is a common convention
keyblock_name = ''.join(name.split('_data_key'))
self.keyblock = os.path.join(keydir, keyblock_name + '.keyblock')
def __eq__(self, other):
return (isinstance(other, KeyPair)
and self.name == other.name
and self.version == other.version
and self.public == other.public
and self.private == other.private
and self.subkeys == other.subkeys)
def AddSubkey(self, sub_id):
"""Add new Subkey with the given name."""
if sub_id in self.subkeys:
return
self.subkeys[sub_id] = KeyPair(self.name + '.' + sub_id,
self.keydir,
version=self.version,
pub_ext=self._pub_ext,
priv_ext=self._priv_ext)
def Exists(self, require_public=False, require_private=False):
"""Returns True if ether key or subkeys exists on disk."""
if self.subkeys:
for sub_key in self.subkeys.values():
if not sub_key.Exists(require_public=require_public,
require_private=require_private):
return False
return True
has_public = os.path.exists(self.public)
has_private = os.path.exists(self.private)
if require_public and not has_public:
return False
if require_private and not has_private:
return False
return has_public or has_private
def KeyblockExists(self):
"""Returns if keyblocks or subkeyblocks exist."""
if self.subkeys:
for sub_key in self.subkeys.values():
if not sub_key.KeyblockExists():
return False
return True
else:
return os.path.exists(self.keyblock)
class Keyset(object):
"""Store signer keys and keyblocks (think keychain).
Attributes:
keys: dict of keypairs, indexed on key's name
subkey_aliases: dict of alias to use for subkeys (i.e. loem -> board)
"""
def __init__(self):
self.keys = {}
self.subkey_aliases = {}
def __eq__(self, other):
return (isinstance(other, Keyset)
and self.subkey_aliases == other.subkey_aliases
and self.keys == other.keys)
def Prune(self):
"""Check that all keys exists, else remove them."""
for k, key in self.keys.items():
if not key.Exists():
self.keys.pop(k)
def AddKey(self, key, key_name=None):
"""Add the given key to keyset, using key.name if key_name is None."""
key_name = key_name if key_name else key.name
self.keys[key_name] = key
def AddSubkey(self, key_name, subkey):
"""Add subkey to key_name, using its alias if available."""
subkey = (self.subkey_aliases[subkey] if subkey in self.subkey_aliases
else subkey)
self.keys[key_name].AddSubkey(subkey)
def KeyExists(self, key_name, require_public=False, require_private=False):
"""Returns if key is in keyset and exists."""
return (key_name in self.keys and
self.keys[key_name].Exists(require_public=require_public,
require_private=require_private))
def KeyblockExists(self, key_name):
"""Returns if keyblock exists"""
return (key_name in self.keys and
self.keys[key_name].KeyblockExists())
def GetSubKeyset(self, subkey_name):
"""Returns new keyset containing keys based on the subkey_name given.
Keys are added based on the following rules:
* Keys does not have a subkey of the given name
* Subkey exists for the given name, or alias. Key will be indexed under
it's parent key's name. ex: 'firmware.loem1' -> 'firmware'
Raises SubkeyMissingError if subkey not found
"""
ks = Keyset()
found = False
# Use alias if exists
subkey_alias = self.subkey_aliases.get(subkey_name, '')
for key_name, key in self.keys.items():
if subkey_alias in key.subkeys:
found = True
ks.AddKey(key.subkeys[subkey_alias], key_name=key_name)
elif subkey_name in key.subkeys:
found = True
ks.AddKey(key.subkeys[subkey_name], key_name=key_name)
else:
ks.AddKey(key, key_name=key_name)
if not found:
raise SignerSubkeyMissingError("Unable to find %s", subkey_name)
return ks
def KeysetFromDir(key_dir):
"""Returns a populated keyset generated from given directory.
Note: every public key and keyblock must have an accompanying private key
"""
ks = Keyset()
# Get all loem subkey mappings
loem_config_filename = os.path.join(key_dir, 'loem.ini')
if os.path.exists(loem_config_filename):
logging.info("Reading loem.ini file")
loem_config = ConfigParser.ConfigParser()
if loem_config.read(loem_config_filename):
if loem_config.has_section('loem'):
for loem_id in loem_config.options('loem'):
loem_board = loem_config.get('loem', loem_id)
loem_alias = 'loem' + loem_id
logging.debug('Adding key alias %s %s', loem_board, loem_alias)
ks.subkey_aliases[loem_board] = 'loem' + loem_id
else:
logging.warning("Error reading loem.ini file")
# TODO (chingcodes): add versions from file - needed for keygen
# Match any private key file name
# Ex: firmware_data_key.loem4.vbprivk, kernel_subkey.vbprivk
keypair_re = re.compile(r'(?P<name>\w+)(\.(?P<subkey>\w+))?\.vbprivk')
for f_name in os.listdir(key_dir):
key_match = keypair_re.match(f_name)
if key_match:
key_name = key_match.group('name')
if key_name not in ks.keys:
logging.debug('Found new key %s', key_name)
ks.AddKey(KeyPair(key_name, key_dir, version=1))
subkey = key_match.group('subkey')
if subkey:
logging.debug('Found subkey %s.%s', key_name, subkey)
ks.AddSubkey(key_name, subkey)
elif f_name == 'key_ec_efs.vbprik2':
ks.AddKey(KeyPair('key_ec_efs', key_dir,
pub_ext='.vbpubk2', priv_ext='.vbprivk2'))
return ks