blob: 154865e8978b4784bad66f84eb4752a1909386f3 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Import keys from legacy signer config."""
from __future__ import print_function
import os
from six.moves import configparser
import yaml # pylint: disable=import-error
from chromite.lib import commandline
from chromite.lib import cros_logging as logging
from chromite.lib import osutils
from chromite.signing.lib import keys
METADATA_VERSION = 0
PROD_DIR = '/cros'
BASE_DIR = os.path.join(PROD_DIR, 'v2-keys')
AU_KEYDIR = 'update-payload-key'
class Error(Exception):
"""Base error class."""
class KeyimportError(Error):
"""Key Import Errors."""
class KeyringData(object):
"""A collection of keyset information."""
def __init__(self, prod_dir, base_dir, options, signer_config):
self.prod_dir = prod_dir
self.base_dir = base_dir
self.options = options
self.signer_config = signer_config
self.prod_keys = os.path.join(prod_dir, 'keys')
self.public = os.path.join(base_dir, 'public')
self.private = '../private'
self.configs = '../config'
self.contents_yaml = os.path.join(self.base_dir, 'contents.yaml')
self.keysets = self._ReadRepoYaml()
def _ReadRepoYaml(self):
"""Read the directory of active keysets in the repo."""
default = {'metadata-version': METADATA_VERSION}
if os.path.exists(self.contents_yaml):
with open(self.contents_yaml) as fp:
ret = yaml.load(fp)
if not ret or 'metadata-version' not in ret:
logging.error('%s: no metadata-version.', self.contents_yaml)
raise KeyimportError('metadata-version missing.')
if ret['metadata-version'] != METADATA_VERSION:
logging.error('%s: metadata-version %s', self.contents_yaml,
ret['metadata-version'])
raise KeyimportError('metadata-version bad.')
return ret
else:
return default
def WriteRepoYaml(self):
"""Write repo.yaml."""
self._WriteConfig(self.contents_yaml, self.keysets)
def _WriteConfig(self, filename, config):
"""Write the config
Args:
filename: path to file.
config: keyset dictionary.
"""
content = yaml.dump(config, default_flow_style=False)
osutils.WriteFile(filename, content)
def ImportKeyset(self, setname, directory):
"""Import a Keyset.
Args:
setname: Friendly setname (e.g., sarien-mp-v3)
directory: Directory where keyset is stored. (e.g., SarienMPKeys-v3)
Returns:
True if all files were processed.
"""
config_file = os.path.join(self.configs, '%s.yaml' % setname)
if os.path.exists(config_file):
if self.options.new_only:
logging.debug('Skipping existing keyset %s (%s).', setname, directory)
return False
if self.options.update:
osutils.SafeUnlink(config_file)
else:
raise KeyimportError('%s.yaml already exists' % setname)
logging.info('Processing keyset %s (%s):', setname, directory)
set_config = {
'name': setname,
'metadata-version': METADATA_VERSION,
'directory': directory,
}
if setname.startswith('test-keys-'):
group = 'test-keysets'
elif '-premp' in setname:
group = 'premp-keysets'
elif '-mp' in setname:
group = 'mp-keysets'
# These two are special case.
elif setname in ('update_signer', 'oci_containers_signer'):
group = 'mp-keysets'
else:
# This will cause the schema validation to fail, notifying the user.
logging.warning('Bad keyset setname %s', setname)
group = 'bad-keysets'
self.keysets.setdefault(group, []).append(setname)
return self._WriteConfig(config_file, set_config)
def ParseArgs(argv):
"""Parse the commandline arguments."""
parser = commandline.ArgumentParser(description=__doc__)
parser.add_argument('-P', '--prefix', type='path', default='/')
parser.add_argument('-d', '--directory', default=BASE_DIR)
parser.add_argument('-D', '--discover', action='store_true',
help='Discover keysets')
parser.add_argument('-s', '-p', '--production', default=PROD_DIR)
parser.add_argument('-N', '--new-only', action='store_true',
help='Only process new keysets')
parser.add_argument('-u', '--update', action='store_true')
parser.add_argument('keysets', nargs='*')
options = parser.parse_args(argv)
options.Freeze()
return options
def ParseSignerConfig(prod_path):
"""Return the parsed signer config.
Args:
prod_path: Path to production checkout. Typically '/cros'.
Returns:
Parsed signer config.
"""
config = configparser.ConfigParser()
config_path = os.path.join(prod_path, 'signer/configs/cros_common.config')
if not os.path.exists(config_path):
logging.warning('%s not found, using production config.', config_path)
config_path = '/cros/signer/configs/cros_common.config'
config.read(config_path)
return config
def DiscoverKeysets(keysets_dir):
"""Discover keysets.
Args:
keysets_dir: directory where the keysets live. Typically /cros/keys.
Returns:
A sorted list of (setname: directory) tuples.
"""
_, dirs, _ = next(os.walk(keysets_dir))
ret = {}
for src in sorted(dirs):
path = os.path.join(keysets_dir, src)
keyset = keys.Keyset(path)
if keyset.name != 'unknown':
if keyset.name in ret:
logging.warning('Ignoring %s because name is duplicate of %s',
src, ret[keyset.name])
else:
logging.info('Discovered %s in %s', keyset.name, src)
ret[keyset.name] = src
return sorted(ret.items())
def main(argv):
"""Import the keys."""
options = ParseArgs(argv)
# Always prepend the prefix directory.
production = os.path.join(
options.prefix, options.production.lstrip('/'))
directory = os.path.join(
options.prefix, options.directory.lstrip('/'))
config = ParseSignerConfig(production)
keydata = KeyringData(production, directory, options, config)
if osutils.SafeMakedirs(keydata.public):
os.chdir(keydata.public)
osutils.SafeMakedirs(keydata.private)
osutils.SafeMakedirs(keydata.configs)
else:
os.chdir(keydata.public)
if not options.discover and config.has_section('keysets'):
config_keysets = config.items('keysets')
else:
logging.notice('Discovering keysets in %s' % options.production)
config_keysets = DiscoverKeysets(os.path.join(options.production, 'keys'))
# Handle autoupdate keysets first.
if not options.keysets:
keysets = config_keysets
else:
# Allow '<setname>:<directory>' as well as '<setname>'.
keysets = []
keyset_dict = dict(config_keysets)
for opt in options.keysets:
if ':' in opt:
keysets.append(opt.split(':', 1))
elif opt in keyset_dict:
keysets.append((opt, keyset_dict[opt]))
# If the auto-update keyset is present, process it first.
au_keysets = [x for x in keysets if x[1] == AU_KEYDIR]
for keyset in au_keysets:
keysets.remove(keyset)
keysets = au_keysets + keysets
logging.notice('Processing: %s', ' '.join(x[0] for x in keysets))
for setname, directory in keysets:
# TODO(lamontjones): consider flattening the namespace by combining multiple
# directories/sets into a common directory. That would require adding
# version numbers to colliding files.
keydata.ImportKeyset(setname, directory)
keydata.WriteRepoYaml()
logging.notice('Done')