blob: 3e2775f01a90d21958a26ed9c9eec5186e372634 [file] [log] [blame] [edit]
#!/usr/bin/env python
# Copyright 2015 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""payload_info: Show information about an update payload."""
import argparse
import sys
import textwrap
import update_payload
# Major payload format version number labelled "Brillo" by its name.
# NOTE(review): nothing in the visible part of this file reads this constant;
# presumably it is kept for external importers -- confirm before removing.
MAJOR_PAYLOAD_VERSION_BRILLO = 2
def DisplayValue(key, value):
    """Print a "key: value" line with the label padded to a fixed width.

    Args:
        key: Label text; a trailing colon is appended before printing.
        value: Value to show after the label. Any value other than None is
            accepted, including falsy ones such as 0 or "".

    Raises:
        ValueError: If value is None.
    """
    if value is None:
        raise ValueError("Cannot display an empty value.")

    # The label (including its colon) is left-justified in this many columns
    # so that the values of consecutive lines start in the same column.
    label_width = 28
    label = key + ":"
    print("%-*s %s" % (label_width, label, value))
def DisplayHexData(data, indent=0):
    """Print binary data as a hex dump, 16 bytes per row.

    Each row consists of `indent` spaces, the row's bytes as two-digit
    lowercase hex values separated by single spaces, a " | " separator and
    the same bytes rendered as ASCII, with every byte outside the printable
    range 32..126 shown as ".". Nothing is printed for empty data.

    Args:
        data: Bytes-like object to dump.
        indent: Number of spaces printed in front of every row.
    """
    bytes_per_row = 16
    for off in range(0, len(data), bytes_per_row):
        chunk = bytearray(data[off : off + bytes_per_row])
        hex_part = " ".join("%.2x" % c for c in chunk)
        # Every byte occupies three columns in the hex area (two digits plus
        # a separator), so a short final row needs three spaces per missing
        # byte to keep the "|" column aligned with the full rows above it.
        padding = "   " * (bytes_per_row - len(chunk))
        ascii_part = "".join(chr(c) if 32 <= c < 127 else "." for c in chunk)
        print(" " * indent + hex_part + padding + " | " + ascii_part)
class PayloadCommand:
    """Command object that prints a summary of an update payload.

    Run() opens and initialises the payload named by the options, always
    prints the header and manifest summaries, and prints the signatures, the
    I/O statistics and the per-partition operation lists when the matching
    option is set.
    """

    def __init__(self, options):
        """Remember the parsed options; the payload is loaded by Run()."""
        self.payload = None
        self.options = options

    def _DisplayHeader(self):
        """Print the version and manifest length from the payload header."""
        hdr = self.payload.header
        for label, attr in (
            ("Payload version", "version"),
            ("Manifest length", "manifest_len"),
        ):
            DisplayValue(label, getattr(hdr, attr))

    def _DisplayManifest(self):
        """Print partition counts, timestamps and sizes from the manifest."""
        manifest = self.payload.manifest
        partitions = manifest.partitions

        DisplayValue("Number of partitions", len(partitions))
        # First pass: how many install operations each partition carries.
        for part in partitions:
            ops_label = ' Number of "%s" ops' % part.partition_name
            DisplayValue(ops_label, len(part.operations))
        # Second pass: the version field of each partition, shown under a
        # "Timestamp for <name>" label.
        for part in partitions:
            DisplayValue("Timestamp for " + part.partition_name, part.version)

        DisplayValue("Block size", manifest.block_size)
        DisplayValue("Minor version", manifest.minor_version)

    def _DisplaySignatures(self):
        """Print the metadata and payload signature blobs, when present."""
        header = self.payload.header
        metadata_sig_len = header.metadata_signature_len
        if not metadata_sig_len:
            print("No metadata signatures stored in the payload")
        else:
            file_offset = header.size + header.manifest_len
            DisplayValue(
                "Metadata signatures blob",
                "file_offset=%d (%d bytes)" % (file_offset, metadata_sig_len),
            )
            # The blob is requested at a negative offset equal to its length;
            # presumably the metadata signature sits immediately before the
            # data blob that ReadDataBlob offsets are relative to -- TODO
            # confirm against update_payload.Payload.ReadDataBlob.
            # pylint: disable=invalid-unary-operand-type
            metadata_blob = self.payload.ReadDataBlob(
                -metadata_sig_len, metadata_sig_len
            )
            self._DisplaySignaturesBlob("Metadata", metadata_blob)

        manifest = self.payload.manifest
        if not manifest.HasField("signatures_offset"):
            print("No payload signatures stored in the payload")
            return

        description = "blob_offset=%d" % manifest.signatures_offset
        if manifest.signatures_size:
            description += " (%d bytes)" % manifest.signatures_size
        DisplayValue("Payload signatures blob", description)
        payload_blob = self.payload.ReadDataBlob(
            manifest.signatures_offset, manifest.signatures_size
        )
        self._DisplaySignaturesBlob("Payload", payload_blob)

    @staticmethod
    def _DisplaySignaturesBlob(signature_name, signatures_blob):
        """Parse a serialized Signatures message and hex-dump each entry.

        Args:
            signature_name: Label printed in front of the entry count.
            signatures_blob: Serialized Signatures protobuf message.
        """
        parsed = update_payload.update_metadata_pb2.Signatures()
        parsed.ParseFromString(signatures_blob)
        entries = parsed.signatures

        print("%s signatures: (%d entries)" % (signature_name, len(entries)))
        for entry in entries:
            # An entry without an explicit version field is shown as "None".
            version = entry.version if entry.HasField("version") else None
            print(
                " version=%s, hex_data: (%d bytes)"
                % (version, len(entry.data))
            )
            DisplayHexData(entry.data, indent=4)

    def _DisplayOps(self, name, operations):
        """Print a listing of install operations.

        For every operation the index and type name are printed, followed by
        the data offset and data length when those fields are set, and the
        source and destination extents when they are non-empty.

        Args:
            name: Heading printed above the listing.
            operations: Iterable of install operations to list.
        """

        def _DisplayExtents(extents, name):
            """Print a one-line summary of extents, then the extent list."""
            total_blocks = sum(ext.num_blocks for ext in extents)
            pairs = " ".join(
                "(%s,%s)" % (ext.start_block, ext.num_blocks)
                for ext in extents
            )
            # Wrap at 74 columns so rows stay within an 80-column display.
            wrapped = "\n ".join(textwrap.wrap(pairs, 74))
            extent_suffix = "" if len(extents) <= 1 else "s"
            block_suffix = "" if total_blocks <= 1 else "s"
            print(
                " %s: %d extent%s (%d block%s)"
                % (
                    name,
                    len(extents),
                    extent_suffix,
                    total_blocks,
                    block_suffix,
                )
            )
            print(" %s" % wrapped)

        type_names = update_payload.common.OpType.NAMES
        print("%s:" % name)
        for index, op in enumerate(operations):
            print(" %d: %s" % (index, type_names[op.type]))
            if op.HasField("data_offset"):
                print(" Data offset: %s" % op.data_offset)
            if op.HasField("data_length"):
                print(" Data length: %s" % op.data_length)
            for attr, label in (
                ("src_extents", "Source"),
                ("dst_extents", "Destination"),
            ):
                extents = getattr(op, attr)
                if extents:
                    _DisplayExtents(extents, label)

    def _GetStats(self, manifest):
        """Compute block I/O statistics for a payload manifest.

        Args:
            manifest: Manifest whose partitions and operations are counted.

        Returns:
            A dict with three integer entries:
              "read_blocks": blocks in all source extents, plus the old and
                  new size of every partition expressed in whole blocks.
              "written_blocks": blocks in all destination extents.
              "num_write_seeks": destination extents that do not begin where
                  the previous destination extent of the same partition
                  ended.
        """
        blocks_read = 0
        blocks_written = 0
        write_seeks = 0

        for part in manifest.partitions:
            # Contiguity is tracked per partition, so start afresh here.
            prev_ext = None
            for op in part.operations:
                blocks_read += sum(e.num_blocks for e in op.src_extents)
                blocks_written += sum(e.num_blocks for e in op.dst_extents)
                for ext in op.dst_extents:
                    # A write that does not continue directly after the
                    # previous extent counts as a seek.
                    if prev_ext:
                        expected_start = (
                            prev_ext.start_block + prev_ext.num_blocks
                        )
                        if ext.start_block != expected_start:
                            write_seeks += 1
                    prev_ext = ext

            # Old and new partitions are read once during verification.
            for info in (part.old_partition_info, part.new_partition_info):
                blocks_read += info.size // manifest.block_size

        return {
            "read_blocks": blocks_read,
            "written_blocks": blocks_written,
            "num_write_seeks": write_seeks,
        }

    def _DisplayStats(self, manifest):
        """Print the statistics computed by _GetStats for the manifest."""
        stats = self._GetStats(manifest)
        for label, stat_key in (
            ("Blocks read", "read_blocks"),
            ("Blocks written", "written_blocks"),
            ("Seeks when writing", "num_write_seeks"),
        ):
            DisplayValue(label, stats[stat_key])

    def Run(self):
        """Load the payload and print every section the options ask for."""
        opts = self.options
        self.payload = update_payload.Payload(
            opts.payload_file, is_zip=opts.zipfile
        )
        self.payload.Init()

        self._DisplayHeader()
        self._DisplayManifest()
        if opts.signatures:
            self._DisplaySignatures()
        if opts.stats:
            self._DisplayStats(self.payload.manifest)
        if not opts.list_ops:
            return

        # A blank line separates the summary from the operation listings.
        print()
        for part in self.payload.manifest.partitions:
            heading = "%s install operations" % part.partition_name
            self._DisplayOps(heading, part.operations)
def main(argv=None):
    """Parse the command line and print the requested payload information.

    Args:
        argv: Optional list of argument strings to parse. When None (the
            default) argparse reads the arguments from sys.argv[1:], which
            is what a plain main() call has always done.

    Returns:
        None, so that sys.exit(main()) exits with status 0 on success.
    """
    parser = argparse.ArgumentParser(
        description="Show information about an update payload."
    )
    parser.add_argument(
        "payload_file",
        type=argparse.FileType("rb"),
        help="The update payload file.",
    )
    # All remaining options are simple on/off switches that default to off.
    for flag, help_text in (
        ("--list_ops", "List the install operations and their extents."),
        ("--stats", "Show information about overall input/output."),
        ("--signatures", "Show signatures stored in the payload."),
        ("--zipfile", "Extract the payload from a zipfile."),
    ):
        parser.add_argument(
            flag, default=False, action="store_true", help=help_text
        )

    args = parser.parse_args(argv)
    # argparse.FileType opened the payload while parsing; make sure the
    # handle is released even when displaying the payload fails.
    try:
        PayloadCommand(args).Run()
    finally:
        args.payload_file.close()
if __name__ == "__main__":
    # Hand main()'s return value to sys.exit() as the process exit status.
    exit_status = main()
    sys.exit(exit_status)