| # Copyright 2019 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Import keys from legacy signer config.""" |
| |
| import configparser |
| import logging |
| import os |
| |
| import yaml # pylint: disable=import-error |
| |
| from chromite.lib import commandline |
| from chromite.lib import osutils |
| from chromite.signing.lib import keys |
| |
| |
| METADATA_VERSION = 0 |
| PROD_DIR = "/cros" |
| BASE_DIR = os.path.join(PROD_DIR, "v2-keys") |
| AU_KEYDIR = "update-payload-key" |
| |
| |
| class Error(Exception): |
| """Base error class.""" |
| |
| |
| class KeyimportError(Error): |
| """Key Import Errors.""" |
| |
| |
| class KeyringData: |
| """A collection of keyset information.""" |
| |
| def __init__(self, prod_dir, base_dir, options, signer_config): |
| self.prod_dir = prod_dir |
| self.base_dir = base_dir |
| self.options = options |
| self.signer_config = signer_config |
| self.prod_keys = os.path.join(prod_dir, "keys") |
| self.public = os.path.join(base_dir, "public") |
| self.private = "../private" |
| self.configs = "../config" |
| self.contents_yaml = os.path.join(self.base_dir, "contents.yaml") |
| self.keysets = self._ReadRepoYaml() |
| |
| def _ReadRepoYaml(self): |
| """Read the directory of active keysets in the repo.""" |
| default = {"metadata-version": METADATA_VERSION} |
| if os.path.exists(self.contents_yaml): |
| with open(self.contents_yaml, encoding="utf-8") as fp: |
| ret = yaml.safe_load(fp) |
| if not ret or "metadata-version" not in ret: |
| logging.error("%s: no metadata-version.", self.contents_yaml) |
| raise KeyimportError("metadata-version missing.") |
| if ret["metadata-version"] != METADATA_VERSION: |
| logging.error( |
| "%s: metadata-version %s", |
| self.contents_yaml, |
| ret["metadata-version"], |
| ) |
| raise KeyimportError("metadata-version bad.") |
| return ret |
| else: |
| return default |
| |
| def WriteRepoYaml(self): |
| """Write repo.yaml.""" |
| self._WriteConfig(self.contents_yaml, self.keysets) |
| |
| def _WriteConfig(self, filename, config): |
| """Write the config |
| |
| Args: |
| filename: path to file. |
| config: keyset dictionary. |
| """ |
| content = yaml.dump(config, default_flow_style=False) |
| osutils.WriteFile(filename, content) |
| |
| def ImportKeyset(self, setname, directory): |
| """Import a Keyset. |
| |
| Args: |
| setname: Friendly setname (e.g., sarien-mp-v3) |
| directory: Directory where keyset is stored. (e.g., SarienMPKeys-v3) |
| |
| Returns: |
| True if all files were processed. |
| """ |
| config_file = os.path.join(self.configs, "%s.yaml" % setname) |
| if os.path.exists(config_file): |
| if self.options.new_only: |
| logging.debug( |
| "Skipping existing keyset %s (%s).", setname, directory |
| ) |
| return False |
| if self.options.update: |
| osutils.SafeUnlink(config_file) |
| else: |
| raise KeyimportError("%s.yaml already exists" % setname) |
| |
| logging.info("Processing keyset %s (%s):", setname, directory) |
| |
| set_config = { |
| "name": setname, |
| "metadata-version": METADATA_VERSION, |
| "directory": directory, |
| } |
| |
| if setname.startswith("test-keys-"): |
| group = "test-keysets" |
| elif "-premp" in setname: |
| group = "premp-keysets" |
| elif "-mp" in setname: |
| group = "mp-keysets" |
| # These two are special case. |
| elif setname in ("update_signer", "oci_containers_signer"): |
| group = "mp-keysets" |
| else: |
| # This will cause the schema validation to fail, notifying the user. |
| logging.warning("Bad keyset setname %s", setname) |
| group = "bad-keysets" |
| self.keysets.setdefault(group, []).append(setname) |
| self.keysets[group].sort() |
| return self._WriteConfig(config_file, set_config) |
| |
| |
| def ParseArgs(argv): |
| """Parse the commandline arguments.""" |
| parser = commandline.ArgumentParser(description=__doc__) |
| parser.add_argument("-P", "--prefix", type="path", default="/") |
| parser.add_argument("-d", "--directory", default=BASE_DIR) |
| parser.add_argument( |
| "-D", "--discover", action="store_true", help="Discover keysets" |
| ) |
| parser.add_argument("-s", "-p", "--production", default=PROD_DIR) |
| parser.add_argument( |
| "-N", "--new-only", action="store_true", help="Only process new keysets" |
| ) |
| parser.add_argument("-u", "--update", action="store_true") |
| parser.add_argument("keysets", nargs="*") |
| |
| options = parser.parse_args(argv) |
| options.Freeze() |
| return options |
| |
| |
| def ParseSignerConfig(prod_path): |
| """Return the parsed signer config. |
| |
| Args: |
| prod_path: Path to production checkout. Typically '/cros'. |
| |
| Returns: |
| Parsed signer config. |
| """ |
| config = configparser.ConfigParser() |
| config_path = os.path.join(prod_path, "signer/configs/cros_common.config") |
| if not os.path.exists(config_path): |
| logging.warning("%s not found, using production config.", config_path) |
| config_path = "/cros/signer/configs/cros_common.config" |
| config.read(config_path) |
| return config |
| |
| |
| def DiscoverKeysets(keysets_dir): |
| """Discover keysets. |
| |
| Args: |
| keysets_dir: directory where the keysets live. Typically /cros/keys. |
| |
| Returns: |
| A sorted list of (setname: directory) tuples. |
| """ |
| _, dirs, _ = next(os.walk(keysets_dir)) |
| ret = {} |
| for src in sorted(dirs): |
| path = os.path.join(keysets_dir, src) |
| keyset = keys.Keyset(path) |
| if keyset.name != "unknown": |
| if keyset.name in ret: |
| logging.warning( |
| "Ignoring %s because name is duplicate of %s", |
| src, |
| ret[keyset.name], |
| ) |
| else: |
| logging.info("Discovered %s in %s", keyset.name, src) |
| ret[keyset.name] = src |
| return sorted(ret.items()) |
| |
| |
| def main(argv): |
| """Import the keys.""" |
| options = ParseArgs(argv) |
| # Always prepend the prefix directory. |
| production = os.path.join(options.prefix, options.production.lstrip("/")) |
| directory = os.path.join(options.prefix, options.directory.lstrip("/")) |
| config = ParseSignerConfig(production) |
| |
| keydata = KeyringData(production, directory, options, config) |
| |
| if osutils.SafeMakedirs(keydata.public): |
| os.chdir(keydata.public) |
| osutils.SafeMakedirs(keydata.private) |
| osutils.SafeMakedirs(keydata.configs) |
| else: |
| os.chdir(keydata.public) |
| |
| if not options.discover and config.has_section("keysets"): |
| config_keysets = config.items("keysets") |
| else: |
| logging.notice("Discovering keysets in %s" % options.production) |
| config_keysets = DiscoverKeysets( |
| os.path.join(options.production, "keys") |
| ) |
| |
| # Handle autoupdate keysets first. |
| if not options.keysets: |
| keysets = config_keysets |
| else: |
| # Allow '<setname>:<directory>' as well as '<setname>'. |
| keysets = [] |
| keyset_dict = dict(config_keysets) |
| for opt in options.keysets: |
| if ":" in opt: |
| keysets.append(opt.split(":", 1)) |
| elif opt in keyset_dict: |
| keysets.append((opt, keyset_dict[opt])) |
| |
| # If the auto-update keyset is present, process it first. |
| au_keysets = [x for x in keysets if x[1] == AU_KEYDIR] |
| for keyset in au_keysets: |
| keysets.remove(keyset) |
| keysets = au_keysets + keysets |
| logging.notice("Processing: %s", " ".join(x[0] for x in keysets)) |
| |
| for setname, directory in keysets: |
| # TODO(lamontjones): consider flattening the namespace by combining |
| # multiple directories/sets into a common directory. That would require |
| # adding version numbers to colliding files. |
| keydata.ImportKeyset(setname, directory) |
| |
| keydata.WriteRepoYaml() |
| logging.notice("Done") |