blob: 64358a33621544bb9edf1f37ffb640268b33e0d7 [file] [log] [blame] [edit]
#!/usr/bin/env python
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Command-line tool for checking and applying Chrome OS update payloads."""
# pylint: disable=import-error
import argparse
import filecmp
import os
import sys
import tempfile
from update_payload import error
lib_dir = os.path.join(os.path.dirname(__file__), "lib")
if os.path.exists(lib_dir) and os.path.isdir(lib_dir):
sys.path.insert(1, lib_dir)
import update_payload # pylint: disable=wrong-import-position
_TYPE_FULL = "full"
_TYPE_DELTA = "delta"
def CheckApplyPayload(args):
"""Whether to check the result after applying the payload.
Args:
args: Parsed command arguments (the return value of
ArgumentParser.parse_args).
Returns:
Boolean value whether to check.
"""
return args.dst_part_paths is not None
def ApplyPayload(args):
"""Whether to apply the payload.
Args:
args: Parsed command arguments (the return value of
ArgumentParser.parse_args).
Returns:
Boolean value whether to apply the payload.
"""
return CheckApplyPayload(args) or args.out_dst_part_paths is not None
def ParseArguments(argv):
"""Parse and validate command-line arguments.
Args:
argv: command-line arguments to parse (excluding the program name)
Returns:
Returns the arguments returned by the argument parser.
"""
parser = argparse.ArgumentParser(
description=(
"Applies a Chrome OS update PAYLOAD to src_part_paths"
"emitting dst_part_paths, respectively. "
"src_part_paths are only needed for delta payloads. "
"When no partitions are provided, verifies the payload "
"integrity."
),
epilog=(
"Note: a payload may verify correctly but fail to apply, and "
"vice versa; this is by design and can be thought of as static "
"vs dynamic correctness. A payload that both verifies and "
"applies correctly should be safe for use by the Chrome OS "
"Update Engine. Use --check to verify a payload prior to "
"applying it."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
check_args = parser.add_argument_group("Checking payload integrity")
check_args.add_argument(
"-c",
"--check",
action="store_true",
default=False,
help=("force payload integrity check (e.g. before " "applying)"),
)
check_args.add_argument(
"-r",
"--report",
metavar="FILE",
help="dump payload report (`-' for stdout)",
)
check_args.add_argument(
"-t",
"--type",
dest="assert_type",
help="assert the payload type",
choices=[_TYPE_FULL, _TYPE_DELTA],
)
check_args.add_argument(
"-z",
"--block-size",
metavar="NUM",
default=0,
type=int,
help="assert a non-default (4096) payload block size",
)
check_args.add_argument(
"-u",
"--allow-unhashed",
action="store_true",
default=False,
help="allow unhashed operations",
)
check_args.add_argument(
"-d",
"--disabled_tests",
default=(),
metavar="",
help=(
"space separated list of tests to disable. "
"allowed options include: "
+ ", ".join(update_payload.CHECKS_TO_DISABLE)
),
choices=update_payload.CHECKS_TO_DISABLE,
)
check_args.add_argument(
"-k",
"--key",
metavar="FILE",
help=("override standard key used for signature " "validation"),
)
check_args.add_argument(
"-m",
"--meta-sig",
metavar="FILE",
help="verify metadata against its signature",
)
check_args.add_argument(
"-s",
"--metadata-size",
metavar="NUM",
default=0,
help="the metadata size to verify with the one in" " payload",
)
check_args.add_argument(
"--part_sizes",
metavar="NUM",
nargs="+",
type=int,
help="override partition size auto-inference",
)
apply_args = parser.add_argument_group("Applying payload")
# TODO(ahassani): Extent extract-bsdiff to puffdiff too.
apply_args.add_argument(
"-x",
"--extract-bsdiff",
action="store_true",
default=False,
help=(
"use temp input/output files with BSDIFF "
"operations (not in-place)"
),
)
apply_args.add_argument(
"--bspatch-path",
metavar="FILE",
help="use the specified bspatch binary",
)
apply_args.add_argument(
"--puffpatch-path",
metavar="FILE",
help="use the specified puffpatch binary",
)
apply_args.add_argument(
"--src_part_paths",
metavar="FILE",
nargs="+",
help="source partitition files",
)
apply_args.add_argument(
"--dst_part_paths",
metavar="FILE",
nargs="+",
help="destination partition files",
)
apply_args.add_argument(
"--out_dst_part_paths",
metavar="FILE",
nargs="+",
help="created destination partition files",
)
parser.add_argument("payload", metavar="PAYLOAD", help="the payload file")
parser.add_argument(
"--part_names", metavar="NAME", nargs="+", help="names of partitions"
)
# Parse command-line arguments.
args = parser.parse_args(argv)
# There are several options that imply --check.
args.check = (
args.check
or args.report
or args.assert_type
or args.block_size
or args.allow_unhashed
or args.disabled_tests
or args.meta_sig
or args.key
or args.part_sizes is not None
or args.metadata_size
)
# Makes sure the following arguments have the same length as |part_names| if
# set.
for arg in [
"part_sizes",
"src_part_paths",
"dst_part_paths",
"out_dst_part_paths",
]:
if getattr(args, arg) is None:
# Parameter is not set.
continue
if len(args.part_names) != len(getattr(args, arg, [])):
parser.error("partitions in --%s do not match --part_names" % arg)
def _IsSrcPartPathsProvided(args):
return args.src_part_paths is not None
# Makes sure parameters are coherent with payload type.
if ApplyPayload(args):
if _IsSrcPartPathsProvided(args):
if args.assert_type == _TYPE_FULL:
parser.error(
"%s payload does not accept source partition arguments"
% _TYPE_FULL
)
else:
args.assert_type = _TYPE_DELTA
else:
if args.assert_type == _TYPE_DELTA:
parser.error(
"%s payload requires source partitions arguments"
% _TYPE_DELTA
)
else:
args.assert_type = _TYPE_FULL
else:
# Not applying payload.
if args.extract_bsdiff:
parser.error(
"--extract-bsdiff can only be used when applying payloads"
)
if args.bspatch_path:
parser.error(
"--bspatch-path can only be used when applying payloads"
)
if args.puffpatch_path:
parser.error(
"--puffpatch-path can only be used when applying payloads"
)
# By default, look for a metadata-signature file with a name based on the name
# of the payload we are checking. We only do it if check was triggered.
if args.check and not args.meta_sig:
default_meta_sig = args.payload + ".metadata-signature"
if os.path.isfile(default_meta_sig):
args.meta_sig = default_meta_sig
print(
"Using default metadata signature",
args.meta_sig,
file=sys.stderr,
)
return args
def main(argv):
# Parse and validate arguments.
args = ParseArguments(argv[1:])
with open(args.payload, "rb") as payload_file:
payload = update_payload.Payload(payload_file)
try:
# Initialize payload.
payload.Init()
# Perform payload integrity checks.
if args.check:
report_file = None
do_close_report_file = False
metadata_sig_file = None
try:
if args.report:
if args.report == "-":
report_file = sys.stdout
else:
report_file = open(args.report, "w")
do_close_report_file = True
part_sizes = args.part_sizes and dict(
zip(args.part_names, args.part_sizes)
)
metadata_sig_file = args.meta_sig and open(
args.meta_sig, "rb"
)
payload.Check(
pubkey_file_name=args.key,
metadata_sig_file=metadata_sig_file,
metadata_size=int(args.metadata_size),
report_out_file=report_file,
assert_type=args.assert_type,
block_size=int(args.block_size),
part_sizes=part_sizes,
allow_unhashed=args.allow_unhashed,
disabled_tests=args.disabled_tests,
)
finally:
if metadata_sig_file:
metadata_sig_file.close()
if do_close_report_file:
report_file.close()
# Apply payload.
if ApplyPayload(args):
dargs = {"bsdiff_in_place": not args.extract_bsdiff}
if args.bspatch_path:
dargs["bspatch_path"] = args.bspatch_path
if args.puffpatch_path:
dargs["puffpatch_path"] = args.puffpatch_path
if args.assert_type == _TYPE_DELTA:
dargs["old_parts"] = dict(
zip(args.part_names, args.src_part_paths)
)
out_dst_parts = {}
file_handles = []
if args.out_dst_part_paths is not None:
for name, path in zip(
args.part_names, args.out_dst_part_paths
):
handle = open(path, "wb+")
file_handles.append(handle)
out_dst_parts[name] = handle.name
else:
for name in args.part_names:
handle = tempfile.NamedTemporaryFile()
file_handles.append(handle)
out_dst_parts[name] = handle.name
payload.Apply(out_dst_parts, **dargs)
# If destination kernel and rootfs partitions are not given, then this
# just becomes an apply operation with no check.
if CheckApplyPayload(args):
# Prior to comparing, add the unused space past the filesystem
# boundary in the new target partitions to become the same size as
# the given partitions. This will truncate to larger size.
for part_name, out_dst_part, dst_part in zip(
args.part_names, file_handles, args.dst_part_paths
):
out_dst_part.truncate(os.path.getsize(dst_part))
# Compare resulting partitions with the ones from the target image.
if not filecmp.cmp(out_dst_part.name, dst_part):
raise error.PayloadError(
"Resulting %s partition corrupted." % part_name
)
# Close the output files. If args.out_dst_* was not given, then these
# files are created as temp files and will be deleted upon close().
for handle in file_handles:
handle.close()
except error.PayloadError as e:
sys.stderr.write("Error: %s\n" % e)
return 1
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv))