blob: a882ef9a22865f9347c00d3e94d3151c91bfc662 [file] [log] [blame]
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""ChromeOS firmware Signers"""
import csv
import glob
import logging
import os
import re
import shutil
import tempfile
from chromite.lib import cros_build_lib
from chromite.lib import image_lib
from chromite.lib import osutils
from chromite.signing.lib import signer
class BiosSigner(signer.FutilitySigner):
"""Sign bios.bin file using futility."""
required_keys_private = ("firmware_data_key",)
required_keys_public = ("kernel_subkey",)
required_keyblocks = ("firmware_data_key",)
def __init__(self, sig_id="", sig_dir="", preamble_flags=None) -> None:
"""Init BiosSigner
Args:
sig_id: Signature ID (aka loem id)
sig_dir: Signature Output Directory (i.e shellball/keyset)
preamble_flags: preamble flags passed to futility
"""
self.sig_id = sig_id
self.sig_dir = sig_dir
self.preamble_flags = preamble_flags
def GetFutilityArgs(self, keyset, input_name, output_name):
"""Returns futility arguments for signing bios
Args:
keyset: keyset used for signing
input_name: bios image
output_name: output firmware file
"""
fw_key = keyset.keys["firmware_data_key"]
kernel_key = keyset.keys["kernel_subkey"]
args = [
"sign",
"--type",
"bios",
"--signprivate",
fw_key.private,
"--keyblock",
fw_key.keyblock,
"--kernelkey",
kernel_key.public,
"--version",
str(fw_key.version),
]
if self.preamble_flags is not None:
args += ["--flags", str(self.preamble_flags)]
# Add loem related arguments
if self.sig_id and self.sig_dir:
args += ["--loemdir", self.sig_dir, "--loemid", self.sig_id]
# Add final input/output arguments
args += [input_name, output_name]
return args
class ECSigner(signer.BaseSigner):
"""Sign EC bin file."""
required_keys_private = ("key_ec_efs",)
def IsROSigned(self, ec_image):
"""Returns True if the given ec.bin is RO signed"""
# Check fmap for KEY_RO
fmap = cros_build_lib.run(
["futility", "dump_fmap", "-p", ec_image], capture_output=True
)
return b"KEY_RO" in fmap.stdout
def Sign(self, keyset, input_name, output_name) -> None:
"""Sign EC image
Args:
keyset: keyset used for this signing step
input_name: ec image path to be signed (i.e. to ec.bin)
output_name: bios image path to be updated with new hashes
Raises:
SigningFailedError: if a signing fails
"""
# Use absolute paths since we use a temp directory
ec_path = os.path.abspath(input_name)
bios_path = os.path.abspath(output_name)
if self.IsROSigned(ec_path):
# Only sign if not read-only, nothing to do
return
logging.info("Signing EC %s", ec_path)
# Run futility in temp_dir to avoid cwd artifacts
with osutils.TempDir() as temp_dir:
ec_rw_bin = os.path.join(temp_dir, "EC_RW.bin")
ec_rw_hash = os.path.join(temp_dir, "EC_RW.hash")
try:
cros_build_lib.run(
[
"futility",
"sign",
"--type",
"rwsig",
"--prikey",
keyset.keys["key_ec_efs"].private,
"--ecrw_out",
ec_rw_bin,
ec_path,
],
cwd=temp_dir,
)
cros_build_lib.run(
["openssl", "dgst", "-sha256", "-binary", ec_rw_bin],
stdout=ec_rw_hash,
cwd=temp_dir,
)
cros_build_lib.run(
["store_file_in_cbfs", bios_path, ec_rw_bin, "ecrw"]
)
cros_build_lib.run(
["store_file_in_cbfs", bios_path, ec_rw_hash, "ecrw.hash"]
)
except cros_build_lib.RunCommandError as err:
logging.warning("Signing EC failed: %s", str(err))
raise signer.SigningFailedError("Signing EC failed")
class GBBSigner(signer.FutilitySigner):
"""Sign GBB"""
required_keys_public = ("recovery_key", "root_key")
required_keys_private = ()
def GetFutilityArgs(self, keyset, input_name, output_name):
"""Return args for signing GBB
Args:
keyset: Keyset used for signing
input_name: Firmware image
output_name: Bios path (i.e. tobios.bin)
"""
return [
"gbb",
"--set",
"--recoverykey=" + keyset.keys["recovery_key"].public,
"--rootkey=" + keyset.keys["root_key"].public,
input_name,
output_name,
]
class FirmwareSigner(signer.BaseSigner):
"""Signs all firmware related to the given configuration."""
required_keys_private = (
BiosSigner.required_keys_private + GBBSigner.required_keys_private
)
required_keys_public = (
BiosSigner.required_keys_public + GBBSigner.required_keys_public
)
required_keyblocks = (
BiosSigner.required_keyblocks + GBBSigner.required_keyblocks
)
def SignOne(
self,
keyset,
shellball_dir,
bios_image,
ec_image="",
model_name="",
key_id="",
keyset_out_dir="keyset",
) -> None:
"""Perform one signing based on the given args.
Args:
keyset: keyset directory used for signing,
shellball_dir: location of extracted shellball
bios_image: relative path of bios.bin in shellball
ec_image: relative path of ec.bin in shellball
model_name: name of target's model_name as define in
signer_config.csv
key_id: subkey id to be used for signing
keyset_out_dir: relative path of keyset output dir in shellball
Raises:
SigningFailedError: if a signing fails
"""
if key_id:
keyset = keyset.GetBuildKeyset(key_id)
shellball_keydir = os.path.join(shellball_dir, keyset_out_dir)
osutils.SafeMakedirs(shellball_keydir)
if model_name:
shutil.copy(
keyset.keys["root_key"].public,
os.path.join(shellball_dir, "rootkey." + model_name),
)
bios_path = os.path.join(shellball_dir, bios_image)
if ec_image:
ec_path = os.path.join(shellball_dir, ec_image)
logging.info("Signing EC: %s", ec_path)
ECSigner().Sign(keyset, ec_path, bios_path)
logging.info("Signing BIOS: %s", bios_path)
with tempfile.NamedTemporaryFile() as temp_fw:
bios_signer = BiosSigner(
sig_id=model_name, sig_dir=shellball_keydir
)
bios_signer.Sign(keyset, bios_path, temp_fw.name)
GBBSigner().Sign(keyset, temp_fw.name, bios_path)
def Sign(self, keyset, input_name, output_name) -> None:
"""Sign Firmware shellball.
Signing is based on if 'signer_config.csv', then all rows defined in
file are signed. Else all bios*.bin in shellball will be signed.
Args:
keyset: keyset directory, with subkeys[key_id] if defined
input_name: location of extracted shellball
output_name: unused
Raises:
SigningFailedError: if a signing step fails
"""
shellball_dir = input_name
signerconfig_csv = os.path.join(shellball_dir, "signer_config.csv")
if os.path.exists(signerconfig_csv):
with open(signerconfig_csv, encoding="utf-8") as csv_file:
signerconfigs = SignerConfigsFromCSV(csv_file)
for signerconfig in signerconfigs:
self.SignOne(
keyset,
shellball_dir,
signerconfig["firmware_image"],
ec_image=signerconfig["ec_image"],
model_name=signerconfig["model_name"],
key_id=signerconfig["key_id"],
)
else:
# Sign all ./bios*.bin
for bios_path in glob.glob(os.path.join(input_name, "bios*.bin")):
key_id_match = re.match(r".*bios\.(\w+)\.bin", bios_path)
key_id = key_id_match.group(1) if key_id_match else ""
keyset_out_dir = "keyset." + key_id if key_id else "keyset"
self.SignOne(
keyset,
shellball_dir,
bios_path,
model_name=key_id,
key_id=key_id,
keyset_out_dir=keyset_out_dir,
)
class ShellballError(Exception):
"""Error occurred with firmware shellball"""
class ShellballExtractError(ShellballError):
"""Raised when extracting fails."""
class ShellballRepackError(ShellballError):
"""Raised when repacking fails."""
class Shellball:
"""Firmware shellball image created from pack_firmware.
Can be called as a Context Manager which will extract itself to a temp
directory and repack itself on exit.
https://sites.google.com/a/google.com/chromeos-partner/platforms/creating-a-firmware-updater
"""
def __init__(self, filename) -> None:
"""Initial Shellball, no disk changes.
Args:
filename: filename of shellball
"""
self.filename = filename
self._extract_dir = None
def __enter__(self):
"""Extract the shellball to a temp directory, returns directory."""
self._extract_dir = osutils.TempDir()
self.Extract(self._extract_dir.tempdir)
return self._extract_dir.tempdir
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Repack shellball and delete temp directory."""
try:
if exc_type is None:
self.Repack(self._extract_dir.tempdir)
finally:
if self._extract_dir:
# Always clear up temp directory
self._extract_dir.Cleanup()
def Extract(self, out_dir) -> None:
"""Extract self to given directory, raises ExtractFail on fail"""
try:
self._Run("--sb_extract", out_dir)
except cros_build_lib.RunCommandError as err:
logging.error("Extracting firmware shellball failed")
raise ShellballExtractError(err.msg)
def Repack(self, src_dir) -> None:
"""Repack shellball with |src_dir|, raises RepackFailed on fail.
Only supports shellballs that honor '--sb_repack' which should include
everything that has been signed since 2014
"""
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
orig_file = self.filename
self.filename = tmp_file.name
try:
shutil.copy(orig_file, tmp_file.name)
self._Run("--sb_repack", src_dir)
shutil.move(tmp_file.name, orig_file)
except cros_build_lib.RunCommandError as err:
logging.error("Repacking firmware shellball failed")
raise ShellballRepackError(err.msg)
finally:
self.filename = orig_file
# Clean up file if still exists
if os.path.exists(tmp_file.name):
os.remove(tmp_file.name)
def _Run(self, *args) -> None:
"""Execute shellball with given arguments."""
cmd = [os.path.realpath(self.filename)]
cmd += args
cros_build_lib.run(cmd)
# TODO(vapier): Should switch this to image_lib.LoopbackPartitions or
# signing.lib.imagefile as makes sense.
def _MountImagePartition(
image_file,
part_id,
destination,
gpt_table=None,
sudo=True,
makedirs=True,
mount_opts=("ro",),
skip_mtab=False,
) -> None:
"""Mount a |partition| from |image_file| to |destination|.
If there is a GPT table (GetImageDiskPartitionInfo), it will be used for
start offset and size of the selected partition. Otherwise, the GPT will
be read again from |image_file|.
The mount option will be:
-o offset=XXX,sizelimit=YYY,(*mount_opts)
Args:
image_file: A path to the image file (chromiumos_base_image.bin).
part_id: A partition name or number.
destination: A path to the mount point.
gpt_table: A list of PartitionInfo objects. See
image_lib.GetImageDiskPartitionInfo.
sudo: Same as MountDir.
makedirs: Same as MountDir.
mount_opts: Same as MountDir.
skip_mtab: Same as MountDir.
"""
if gpt_table is None:
gpt_table = image_lib.GetImageDiskPartitionInfo(image_file)
for part in gpt_table:
if part_id in (part.name, part.number):
break
else:
part = None
raise ValueError(
"Partition number %s not found in the GPT %r."
% (part_id, gpt_table)
)
opts = ["loop", "offset=%d" % part.start, "sizelimit=%d" % part.size]
opts += mount_opts
osutils.MountDir(
image_file,
destination,
sudo=sudo,
makedirs=makedirs,
mount_opts=opts,
skip_mtab=skip_mtab,
)
def ResignImageFirmware(image_file, keyset) -> None:
"""Resign the given firmware image.
Args:
image_file: string path to image.
keyset: Keyset to use for signing.
Raises SignerFailedError
"""
with osutils.TempDir() as rootfs_dir:
# TODO(b/236161656): Fix.
# pylint: disable-next=not-context-manager
with _MountImagePartition(image_file, "ROOT-A", rootfs_dir):
sb_file = os.path.join(rootfs_dir, "usr/sbin/chromeos-firmware")
if os.path.exists(sb_file):
logging.info("Found firmware, signing")
with Shellball(sb_file) as sb_dir:
fw_signer = FirmwareSigner()
if not fw_signer.Sign(keyset, sb_dir, None):
raise signer.SigningFailedError(
"Signing Firmware Image Failed: %s" % sb_file
)
version_signer_path = os.path.join(sb_dir, "VERSION.signer")
with open(
version_signer_path, "w", encoding="utf-8"
) as version_signer:
WriteSignerNotes(keyset, version_signer)
else:
logging.warning(
"No firmware found in image. Not signing firmware"
)
def SignerConfigsFromCSV(signer_config_file):
"""Returns list of SignerConfigs from a signer_config.csv file
CSV should have a header with fields model_name, firmware_image, key_id, and
ec_image
go/cros-unibuild-signing
Args:
signer_config_file: File descriptor for signer_configs.csv.
Returns:
List of dicts in the signer_configs file.
"""
csv_reader = csv.DictReader(signer_config_file)
for field in ("model_name", "firmware_image", "key_id", "ec_image"):
if field not in csv_reader.fieldnames:
raise csv.Error("Missing field: " + field)
return list(csv_reader)
def WriteSignerNotes(keyset, outfile) -> None:
"""Writes signer notes (a.k.a. VERSION.signer) to file.
Args:
keyset: keyset used for generating signer file.
outfile: file object that signer notes are written to.
"""
recovery_key = keyset.keys["recovery_key"]
outfile.write("Signed with keyset in %s\n" % recovery_key.keydir)
outfile.write("recovery: %s\n" % recovery_key.GetSHA1sum())
root_keys = keyset.GetRootOfTrustKeys("root_key")
if "root_key" in root_keys and len(root_keys) == 1:
outfile.write("root: %s\n" % (root_keys["root_key"].GetSHA1sum()))
else:
outfile.write("List sha1sum of all loem/model's signatures:\n")
for key_id, key in root_keys.items():
outfile.write("%s: %s\n" % (key_id, key.GetSHA1sum()))