blob: 62b0974d73b23eb26c7dc939d55af31d64a584ec [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Install/copy the image to the device."""
from __future__ import division
import logging
import os
import re
import shutil
from chromite.cli import device_imager
from chromite.cli.cros import cros_chrome_sdk
from chromite.lib import commandline
from chromite.lib import cros_build_lib
from chromite.lib import dev_server_wrapper as ds_wrapper
from chromite.lib import operation
from chromite.lib import osutils
from chromite.lib import path_util
from chromite.lib import remote_access
def GetDefaultBoard():
"""Look up default board.
In a chrome checkout, return $SDK_BOARD. In a chromeos checkout,
return the contents of .default_board.
if path_util.DetermineCheckout().type == path_util.CHECKOUT_TYPE_GCLIENT:
return os.environ.get(cros_chrome_sdk.SDKFetcher.SDK_BOARD_ENV)
return cros_build_lib.GetDefaultBoard()
class UsbImagerOperation(operation.ProgressBarOperation):
"""Progress bar for flashing image to operation."""
def __init__(self, image):
self._size = os.path.getsize(image)
self._transferred = 0
self._bytes = re.compile(r'(\d+) bytes')
def _GetDDPid(self):
"""Get the Pid of dd."""
pids =['pgrep', 'dd'], capture_output=True,
print_cmd=False, encoding='utf-8').stdout
for pid in pids.splitlines():
if osutils.IsChildProcess(int(pid), name='dd'):
return int(pid)
return -1
except cros_build_lib.RunCommandError:
# If dd isn't still running, then we assume that it is finished.
return -1
def _PingDD(self, dd_pid):
"""Send USR1 signal to dd to get status update."""
cmd = ['kill', '-USR1', str(dd_pid)]
cros_build_lib.sudo_run(cmd, print_cmd=False)
except cros_build_lib.RunCommandError:
# Here we assume that dd finished in the background.
def ParseOutput(self, output=None):
"""Parse the output of dd to update progress bar."""
dd_pid = self._GetDDPid()
if dd_pid == -1:
if output is None:
stdout =
stderr =
output = stdout + stderr
match =
if match:
self._transferred = int(match.groups()[0])
self.ProgressBar(self._transferred / self._size)
def _IsFilePathGPTDiskImage(file_path, require_pmbr=False):
"""Determines if a file is a valid GPT disk.
file_path: Path to the file to test.
require_pmbr: Whether to require a PMBR in LBA0.
if os.path.isfile(file_path):
with open(file_path, 'rb') as image_file:
if require_pmbr:
# Seek to the end of LBA0 and look for the PMBR boot signature.
if != b'\x55\xaa':
return False
# Current file position is start of LBA1 now.
# Seek to LBA1 where the GPT starts.
# See if there's a GPT here.
if == b'EFI PART':
return True
return False
def _ChooseImageFromDirectory(dir_path):
"""Lists all image files in |dir_path| and ask user to select one.
dir_path: Path to the directory.
images = sorted([x for x in os.listdir(dir_path) if
_IsFilePathGPTDiskImage(os.path.join(dir_path, x))])
idx = 0
if not images:
raise ValueError('No image found in %s.' % dir_path)
elif len(images) > 1:
idx = cros_build_lib.GetChoice(
'Multiple images found in %s. Please select one to continue:' % (
return os.path.join(dir_path, images[idx])
class FlashError(Exception):
"""Thrown when there is an unrecoverable error during flash."""
class USBImager(object):
"""Copy image to the target removable device."""
def __init__(self, device, board, image, version, debug=False, yes=False):
"""Initializes USBImager."""
self.device = device
self.board = board if board else GetDefaultBoard()
self.image = image
self.version = version
self.debug = debug
self.debug_level = logging.DEBUG if debug else logging.INFO
self.yes = yes
def DeviceNameToPath(self, device_name):
return '/dev/%s' % device_name
def GetRemovableDeviceDescription(self, device):
"""Returns a informational description of the removable |device|.
device: the device name (e.g. sdc).
A string describing |device| (e.g. Patriot Memory 7918 MB).
desc = [
osutils.GetDeviceInfo(device, keyword='manufacturer'),
osutils.GetDeviceInfo(device, keyword='product'),
'(%s)' % self.DeviceNameToPath(device),
return ' '.join([x for x in desc if x])
def ListAllRemovableDevices(self):
"""Returns a list of removable devices.
A list of device names (e.g. ['sdb', 'sdc']).
devices = osutils.ListBlockDevices()
removable_devices = []
for d in devices:
if d.TYPE == 'disk' and (d.RM == '1' or d.HOTPLUG == '1'):
return removable_devices
def ChooseRemovableDevice(self, devices):
"""Lists all removable devices and asks user to select/confirm.
devices: a list of device names (e.g. ['sda', 'sdb']).
The device name chosen by the user.
idx = cros_build_lib.GetChoice(
'Removable device(s) found. Please select/confirm to continue:',
[self.GetRemovableDeviceDescription(x) for x in devices])
return devices[idx]
def CopyImageToDevice(self, image, device):
"""Copies |image| to the removable |device|.
image: Path to the image to copy.
device: Device to copy to.
cmd = ['dd', 'if=%s' % image, 'of=%s' % device, 'bs=4M', 'iflag=fullblock',
'oflag=direct', 'conv=fdatasync']
if logging.getLogger().getEffectiveLevel() <= logging.NOTICE:
op = UsbImagerOperation(image)
op.Run(cros_build_lib.sudo_run, cmd, debug_level=logging.NOTICE,
encoding='utf-8', update_period=0.5)
cmd, debug_level=logging.NOTICE,
print_cmd=logging.getLogger().getEffectiveLevel() < logging.NOTICE)
# dd likely didn't put the backup GPT in the last block. sfdisk fixes this
# up for us with a 'write' command, so we have a standards-conforming GPT.
# Ignore errors because sfdisk (util-linux < v2.32) isn't always happy to
# fix GPT correctness issues.
cros_build_lib.sudo_run(['sfdisk', device], input='write\n',
cros_build_lib.sudo_run(['partx', '-u', device],
cros_build_lib.sudo_run(['sync', '-d', device],
def _GetImagePath(self):
"""Returns the image path to use."""
image_path = None
if os.path.isfile(self.image):
if not self.yes and not _IsFilePathGPTDiskImage(self.image):
# TODO(wnwen): Open the tarball and if there is just one file in it,
# use that instead. Existing code in
if cros_build_lib.BooleanPrompt(
prolog='The given image file is not a valid disk image. Perhaps '
'you forgot to untar it.',
prompt='Terminate the current flash process?'):
raise FlashError('Update terminated by user.')
image_path = self.image
elif os.path.isdir(self.image):
# Ask user which image (*.bin) in the folder to use.
image_path = _ChooseImageFromDirectory(self.image)
# Translate the xbuddy path to get the exact image to use.
_, image_path = ds_wrapper.GetImagePathWithXbuddy(
self.image, self.board, self.version)'Using image %s', image_path)
return image_path
def Run(self):
"""Image the removable device."""
devices = self.ListAllRemovableDevices()
if self.device:
# If user specified a device path, check if it exists.
if not os.path.exists(self.device):
raise FlashError('Device path %s does not exist.' % self.device)
# Then check if it is removable.
if self.device not in [self.DeviceNameToPath(x) for x in devices]:
msg = '%s is not a removable device.' % self.device
if not (self.yes or cros_build_lib.BooleanPrompt(
default=False, prolog=msg)):
raise FlashError('You can specify usb:// to choose from a list of '
'removable devices.')
target = None
if self.device:
# Get device name from path (e.g. sdc in /dev/sdc).
target = self.device.rsplit(os.path.sep, 1)[-1]
elif devices:
# Ask user to choose from the list.
target = self.ChooseRemovableDevice(devices)
raise FlashError('No removable devices detected.')
image_path = self._GetImagePath()
device = self.DeviceNameToPath(target)
device_size_bytes = osutils.GetDeviceSize(device, in_bytes=True)
image_size_bytes = os.path.getsize(image_path)
if device_size_bytes < image_size_bytes:
raise FlashError(
'Removable device %s has less storage (%d) than the image size (%d).'
% (device, device_size_bytes, image_size_bytes))
self.CopyImageToDevice(image_path, device)
except cros_build_lib.RunCommandError:
logging.error('Failed copying image to device %s',
class FileImager(USBImager):
"""Copy image to the target path."""
def Run(self):
"""Copy the image to the path specified by self.device."""
if not os.path.isdir(os.path.dirname(self.device)):
raise FlashError('Parent of path %s is not a directory.' % self.device)
image_path = self._GetImagePath()
if os.path.isdir(self.device):'Copying to %s',
os.path.join(self.device, os.path.basename(image_path)))
else:'Copying to %s', self.device)
shutil.copy(image_path, self.device)
except IOError:
logging.error('Failed to copy image %s to %s', image_path, self.device)
# TODO(b/190631159, b/196056723): Change default of no_minios_update to |False|.
def Flash(device, image, board=None, version=None,
no_rootfs_update=False, no_stateful_update=False,
no_minios_update=True, clobber_stateful=False, reboot=True,
ssh_private_key=None, ping=True, disable_rootfs_verification=False,
clear_cache=False, yes=False, force=False, debug=False,
clear_tpm_owner=False, delta=False):
"""Flashes a device, USB drive, or file with an image.
This provides functionality common to `cros flash` and `brillo flash`
so that they can parse the commandline separately but still use the
same underlying functionality.
device: commandline.Device object; None to use the default device.
image: Path (string) to the update image. Can be a local or xbuddy path;
non-existant local paths are converted to xbuddy.
board: Board to use; None to automatically detect.
no_rootfs_update: Don't update rootfs partition; SSH |device| scheme only.
no_stateful_update: Don't update stateful partition; SSH |device| scheme
no_minios_update: Don't update miniOS partition; SSH |device| scheme only.
clobber_stateful: Clobber stateful partition; SSH |device| scheme only.
clear_tpm_owner: Clear the TPM owner on reboot; SSH |device| scheme only.
reboot: Reboot device after update; SSH |device| scheme only.
ssh_private_key: Path to an SSH private key file; None to use test keys.
ping: Ping the device before attempting update; SSH |device| scheme only.
disable_rootfs_verification: Remove rootfs verification after update; SSH
|device| scheme only.
clear_cache: Clear the devserver static directory.
yes: Assume "yes" for any prompt.
force: Ignore confidence checks and prompts. Overrides |yes| if True.
debug: Print additional debugging messages.
version: Default version.
delta: Whether to use delta compression when tranferring image bytes.
FlashError: An unrecoverable error occured.
ValueError: Invalid parameter combination.
if force:
yes = True
if clear_cache:
# The user may not have specified a source image, use version as the default.
image = image or version
if not device or device.scheme == commandline.DEVICE_SCHEME_SSH:
if device:
hostname, port = device.hostname, device.port
hostname, port = None, None
with remote_access.ChromiumOSDeviceHandler(
hostname, port=port,
private_key=ssh_private_key, ping=ping) as device_p:
no_reboot=not reboot,
elif device.scheme == commandline.DEVICE_SCHEME_USB:
path = osutils.ExpandPath(device.path) if device.path else '''Preparing to image the removable device %s', path)
imager = USBImager(path,
elif device.scheme == commandline.DEVICE_SCHEME_FILE:'Preparing to copy image to %s', device.path)
imager = FileImager(device.path,