blob: 1afe142f98094f515898630cca30e11f6d2a0f71 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Install/copy the image to the device."""
from __future__ import division
import logging
import os
import re
import shutil
from chromite.cli import device_imager
from chromite.cli.cros import cros_chrome_sdk
from chromite.lib import commandline
from chromite.lib import cros_build_lib
from chromite.lib import dev_server_wrapper as ds_wrapper
from chromite.lib import operation
from chromite.lib import osutils
from chromite.lib import path_util
from chromite.lib import remote_access
def GetDefaultBoard():
"""Look up default board.
In a chrome checkout, return $SDK_BOARD. In a chromeos checkout,
return the contents of .default_board.
"""
if path_util.DetermineCheckout().type == path_util.CHECKOUT_TYPE_GCLIENT:
return os.environ.get(cros_chrome_sdk.SDKFetcher.SDK_BOARD_ENV)
return cros_build_lib.GetDefaultBoard()
class UsbImagerOperation(operation.ProgressBarOperation):
"""Progress bar for flashing image to operation."""
def __init__(self, image):
super().__init__()
self._size = os.path.getsize(image)
self._transferred = 0
self._bytes = re.compile(r'(\d+) bytes')
def _GetDDPid(self):
"""Get the Pid of dd."""
try:
pids = cros_build_lib.run(['pgrep', 'dd'], capture_output=True,
print_cmd=False, encoding='utf-8').stdout
for pid in pids.splitlines():
if osutils.IsChildProcess(int(pid), name='dd'):
return int(pid)
return -1
except cros_build_lib.RunCommandError:
# If dd isn't still running, then we assume that it is finished.
return -1
def _PingDD(self, dd_pid):
"""Send USR1 signal to dd to get status update."""
try:
cmd = ['kill', '-USR1', str(dd_pid)]
cros_build_lib.sudo_run(cmd, print_cmd=False)
except cros_build_lib.RunCommandError:
# Here we assume that dd finished in the background.
return
def ParseOutput(self, output=None):
"""Parse the output of dd to update progress bar."""
dd_pid = self._GetDDPid()
if dd_pid == -1:
return
self._PingDD(dd_pid)
if output is None:
stdout = self._stdout.read()
stderr = self._stderr.read()
output = stdout + stderr
match = self._bytes.search(output)
if match:
self._transferred = int(match.groups()[0])
self.ProgressBar(self._transferred / self._size)
def _IsFilePathGPTDiskImage(file_path, require_pmbr=False):
"""Determines if a file is a valid GPT disk.
Args:
file_path: Path to the file to test.
require_pmbr: Whether to require a PMBR in LBA0.
"""
if os.path.isfile(file_path):
with open(file_path, 'rb') as image_file:
if require_pmbr:
# Seek to the end of LBA0 and look for the PMBR boot signature.
image_file.seek(0x1fe)
if image_file.read(2) != b'\x55\xaa':
return False
# Current file position is start of LBA1 now.
else:
# Seek to LBA1 where the GPT starts.
image_file.seek(0x200)
# See if there's a GPT here.
if image_file.read(8) == b'EFI PART':
return True
return False
def _ChooseImageFromDirectory(dir_path):
"""Lists all image files in |dir_path| and ask user to select one.
Args:
dir_path: Path to the directory.
"""
images = sorted([x for x in os.listdir(dir_path) if
_IsFilePathGPTDiskImage(os.path.join(dir_path, x))])
idx = 0
if not images:
raise ValueError('No image found in %s.' % dir_path)
elif len(images) > 1:
idx = cros_build_lib.GetChoice(
'Multiple images found in %s. Please select one to continue:' % (
(dir_path,)),
images)
return os.path.join(dir_path, images[idx])
class FlashError(Exception):
"""Thrown when there is an unrecoverable error during flash."""
class USBImager(object):
"""Copy image to the target removable device."""
def __init__(self, device, board, image, version, debug=False, yes=False):
"""Initializes USBImager."""
self.device = device
self.board = board if board else GetDefaultBoard()
self.image = image
self.version = version
self.debug = debug
self.debug_level = logging.DEBUG if debug else logging.INFO
self.yes = yes
def DeviceNameToPath(self, device_name):
return '/dev/%s' % device_name
def GetRemovableDeviceDescription(self, device):
"""Returns a informational description of the removable |device|.
Args:
device: the device name (e.g. sdc).
Returns:
A string describing |device| (e.g. Patriot Memory 7918 MB).
"""
desc = [
osutils.GetDeviceInfo(device, keyword='manufacturer'),
osutils.GetDeviceInfo(device, keyword='product'),
osutils.GetDeviceSize(self.DeviceNameToPath(device)),
'(%s)' % self.DeviceNameToPath(device),
]
return ' '.join([x for x in desc if x])
def ListAllRemovableDevices(self):
"""Returns a list of removable devices.
Returns:
A list of device names (e.g. ['sdb', 'sdc']).
"""
devices = osutils.ListBlockDevices()
removable_devices = []
for d in devices:
if d.TYPE == 'disk' and d.RM == '1':
removable_devices.append(d.NAME)
return removable_devices
def ChooseRemovableDevice(self, devices):
"""Lists all removable devices and asks user to select/confirm.
Args:
devices: a list of device names (e.g. ['sda', 'sdb']).
Returns:
The device name chosen by the user.
"""
idx = cros_build_lib.GetChoice(
'Removable device(s) found. Please select/confirm to continue:',
[self.GetRemovableDeviceDescription(x) for x in devices])
return devices[idx]
def CopyImageToDevice(self, image, device):
"""Copies |image| to the removable |device|.
Args:
image: Path to the image to copy.
device: Device to copy to.
"""
cmd = ['dd', 'if=%s' % image, 'of=%s' % device, 'bs=4M', 'iflag=fullblock',
'oflag=direct', 'conv=fdatasync']
if logging.getLogger().getEffectiveLevel() <= logging.NOTICE:
op = UsbImagerOperation(image)
op.Run(cros_build_lib.sudo_run, cmd, debug_level=logging.NOTICE,
encoding='utf-8', update_period=0.5)
else:
cros_build_lib.sudo_run(
cmd, debug_level=logging.NOTICE,
print_cmd=logging.getLogger().getEffectiveLevel() < logging.NOTICE)
# dd likely didn't put the backup GPT in the last block. sfdisk fixes this
# up for us with a 'write' command, so we have a standards-conforming GPT.
# Ignore errors because sfdisk (util-linux < v2.32) isn't always happy to
# fix GPT sanity issues.
cros_build_lib.sudo_run(['sfdisk', device], input='write\n',
check=False,
debug_level=self.debug_level)
cros_build_lib.sudo_run(['partx', '-u', device],
debug_level=self.debug_level)
cros_build_lib.sudo_run(['sync', '-d', device],
debug_level=self.debug_level)
def _GetImagePath(self):
"""Returns the image path to use."""
image_path = None
if os.path.isfile(self.image):
if not self.yes and not _IsFilePathGPTDiskImage(self.image):
# TODO(wnwen): Open the tarball and if there is just one file in it,
# use that instead. Existing code in upload_symbols.py.
if cros_build_lib.BooleanPrompt(
prolog='The given image file is not a valid disk image. Perhaps '
'you forgot to untar it.',
prompt='Terminate the current flash process?'):
raise FlashError('Update terminated by user.')
image_path = self.image
elif os.path.isdir(self.image):
# Ask user which image (*.bin) in the folder to use.
image_path = _ChooseImageFromDirectory(self.image)
else:
# Translate the xbuddy path to get the exact image to use.
_, image_path = ds_wrapper.GetImagePathWithXbuddy(
self.image, self.board, self.version)
logging.info('Using image %s', image_path)
return image_path
def Run(self):
"""Image the removable device."""
devices = self.ListAllRemovableDevices()
if self.device:
# If user specified a device path, check if it exists.
if not os.path.exists(self.device):
raise FlashError('Device path %s does not exist.' % self.device)
# Then check if it is removable.
if self.device not in [self.DeviceNameToPath(x) for x in devices]:
msg = '%s is not a removable device.' % self.device
if not (self.yes or cros_build_lib.BooleanPrompt(
default=False, prolog=msg)):
raise FlashError('You can specify usb:// to choose from a list of '
'removable devices.')
target = None
if self.device:
# Get device name from path (e.g. sdc in /dev/sdc).
target = self.device.rsplit(os.path.sep, 1)[-1]
elif devices:
# Ask user to choose from the list.
target = self.ChooseRemovableDevice(devices)
else:
raise FlashError('No removable devices detected.')
image_path = self._GetImagePath()
try:
device = self.DeviceNameToPath(target)
self.CopyImageToDevice(image_path, device)
except cros_build_lib.RunCommandError:
logging.error('Failed copying image to device %s',
self.DeviceNameToPath(target))
class FileImager(USBImager):
"""Copy image to the target path."""
def Run(self):
"""Copy the image to the path specified by self.device."""
if not os.path.isdir(os.path.dirname(self.device)):
raise FlashError('Parent of path %s is not a directory.' % self.device)
image_path = self._GetImagePath()
if os.path.isdir(self.device):
logging.info('Copying to %s',
os.path.join(self.device, os.path.basename(image_path)))
else:
logging.info('Copying to %s', self.device)
try:
shutil.copy(image_path, self.device)
except IOError:
logging.error('Failed to copy image %s to %s', image_path, self.device)
# TODO(b/190631159, b/196056723): Change default of no_minios_update to |False|.
def Flash(device, image, board=None, version=None,
no_rootfs_update=False, no_stateful_update=False,
no_minios_update=True, clobber_stateful=False, reboot=True,
ssh_private_key=None, ping=True, disable_rootfs_verification=False,
clear_cache=False, yes=False, force=False, debug=False,
clear_tpm_owner=False):
"""Flashes a device, USB drive, or file with an image.
This provides functionality common to `cros flash` and `brillo flash`
so that they can parse the commandline separately but still use the
same underlying functionality.
Args:
device: commandline.Device object; None to use the default device.
image: Path (string) to the update image. Can be a local or xbuddy path;
non-existant local paths are converted to xbuddy.
board: Board to use; None to automatically detect.
no_rootfs_update: Don't update rootfs partition; SSH |device| scheme only.
no_stateful_update: Don't update stateful partition; SSH |device| scheme
only.
no_minios_update: Don't update miniOS partition; SSH |device| scheme only.
clobber_stateful: Clobber stateful partition; SSH |device| scheme only.
clear_tpm_owner: Clear the TPM owner on reboot; SSH |device| scheme only.
reboot: Reboot device after update; SSH |device| scheme only.
ssh_private_key: Path to an SSH private key file; None to use test keys.
ping: Ping the device before attempting update; SSH |device| scheme only.
disable_rootfs_verification: Remove rootfs verification after update; SSH
|device| scheme only.
clear_cache: Clear the devserver static directory.
yes: Assume "yes" for any prompt.
force: Ignore sanity checks and prompts. Overrides |yes| if True.
debug: Print additional debugging messages.
version: Default version.
Raises:
FlashError: An unrecoverable error occured.
ValueError: Invalid parameter combination.
"""
if force:
yes = True
if clear_cache:
ds_wrapper.DevServerWrapper.WipeStaticDirectory()
ds_wrapper.DevServerWrapper.CreateStaticDirectory()
# The user may not have specified a source image, use version as the default.
image = image or version
if not device or device.scheme == commandline.DEVICE_SCHEME_SSH:
if device:
hostname, port = device.hostname, device.port
else:
hostname, port = None, None
with remote_access.ChromiumOSDeviceHandler(
hostname, port=port,
private_key=ssh_private_key, ping=ping) as device_p:
device_imager.DeviceImager(
device_p,
image,
board=board,
version=version,
no_rootfs_update=no_rootfs_update,
no_stateful_update=no_stateful_update,
no_minios_update=no_minios_update,
no_reboot=not reboot,
disable_verification=disable_rootfs_verification,
clobber_stateful=clobber_stateful,
clear_tpm_owner=clear_tpm_owner).Run()
elif device.scheme == commandline.DEVICE_SCHEME_USB:
path = osutils.ExpandPath(device.path) if device.path else ''
logging.info('Preparing to image the removable device %s', path)
imager = USBImager(path,
board,
image,
version,
debug=debug,
yes=yes)
imager.Run()
elif device.scheme == commandline.DEVICE_SCHEME_FILE:
logging.info('Preparing to copy image to %s', device.path)
imager = FileImager(device.path,
board,
image,
version,
debug=debug,
yes=yes)
imager.Run()