blob: 4863e40f78f9230699a83b4010cab4a3b9ae9425 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Install/copy the image to the device."""
from __future__ import print_function
import cStringIO
import os
import re
import shutil
import tempfile
import time
from chromite.cbuildbot import constants
from chromite.cli import command
from chromite.lib import commandline
from chromite.lib import cros_build_lib
from chromite.lib import cros_logging as logging
from chromite.lib import dev_server_wrapper as ds_wrapper
from chromite.lib import operation
from chromite.lib import osutils
from chromite.lib import path_util
from chromite.lib import remote_access
# Devserver static directory shared by every flash path in this module:
# <CHROOT_SOURCE_ROOT>/devserver/static passed through
# path_util.FromChrootPath (presumably mapping the in-chroot path to the
# path usable from the current environment -- confirm against path_util).
DEVSERVER_STATIC_DIR = path_util.FromChrootPath(
os.path.join(constants.CHROOT_SOURCE_ROOT, 'devserver', 'static'))
class UsbImagerOperation(operation.ProgressBarOperation):
  """Progress bar for flashing an image to a removable device with dd.

  Progress is obtained by sending USR1 to the running dd process so that it
  reports its status, then parsing the transferred byte count out of the
  captured output and comparing it with the size of the image file.
  """

  def __init__(self, image):
    """Initializes the operation.

    Args:
      image: Path to the image being written; its size on disk is used as
        the total for the progress computation.
    """
    super(UsbImagerOperation, self).__init__()
    self._size = os.path.getsize(image)
    self._transferred = 0.
    self._bytes = re.compile(r'(\d+) bytes')

  def _GetDDPid(self):
    """Get the Pid of dd.

    Returns:
      The pid of the first process listed by `pgrep dd` that
      osutils.IsChildProcess accepts for the name 'dd', or -1 when there is
      no such process or pgrep itself fails.
    """
    try:
      pids = cros_build_lib.RunCommand(['pgrep', 'dd'], capture_output=True,
                                       print_cmd=False).output
      for pid in pids.splitlines():
        if osutils.IsChildProcess(int(pid), name='dd'):
          return int(pid)
      return -1
    except cros_build_lib.RunCommandError:
      # If dd isn't still running, then we assume that it is finished.
      return -1

  def _PingDD(self, dd_pid):
    """Send USR1 signal to dd to get status update.

    Args:
      dd_pid: Pid of the dd process to signal.
    """
    try:
      cmd = ['kill', '-USR1', str(dd_pid)]
      cros_build_lib.SudoRunCommand(cmd, print_cmd=False)
    except cros_build_lib.RunCommandError:
      # Here we assume that dd finished in the background.
      return

  def ParseOutput(self, output=None):
    """Parse the output of dd to update progress bar.

    Args:
      output: Text to parse. When None, whatever is currently available on
        the captured stdout and stderr is read and parsed instead.
    """
    dd_pid = self._GetDDPid()
    if dd_pid == -1:
      return

    self._PingDD(dd_pid)

    if output is None:
      stdout = self._stdout.read()
      stderr = self._stderr.read()
      output = stdout + stderr

    # Several status reports can pile up between two reads; the last one
    # carries the most recent byte count, so prefer it over the first.
    byte_counts = self._bytes.findall(output)
    if byte_counts:
      self._transferred = float(byte_counts[-1])

    # An empty image has nothing to report progress on; skipping the update
    # avoids dividing by zero.
    if self._size:
      self.ProgressBar(self._transferred / self._size)
def _IsFilePathGPTDiskImage(file_path):
  """Checks whether |file_path| carries the GPT disk image signature.

  The file qualifies when the 10 bytes found at offset 0x1fe are the two
  bytes 0x55 0xAA immediately followed by the ASCII text 'EFI PART'.

  Args:
    file_path: Path to the file to test.

  Returns:
    True if |file_path| is a regular file with that signature, False for
    anything else (including paths that are not regular files).
  """
  if not os.path.isfile(file_path):
    return False

  with cros_build_lib.Open(file_path) as image_file:
    image_file.seek(0x1fe)
    signature = image_file.read(10)

  return signature == '\x55\xaaEFI PART'
def _ChooseImageFromDirectory(dir_path):
  """Picks one GPT disk image out of |dir_path|, asking the user if needed.

  Every entry of |dir_path| accepted by _IsFilePathGPTDiskImage is a
  candidate. A single candidate is used directly; with several, the user is
  asked to choose through cros_build_lib.GetChoice.

  Args:
    dir_path: Path to the directory.

  Returns:
    The path (|dir_path| joined with the file name) of the selected image.

  Raises:
    ValueError: |dir_path| contains no image.
  """
  candidates = []
  for entry in os.listdir(dir_path):
    if _IsFilePathGPTDiskImage(os.path.join(dir_path, entry)):
      candidates.append(entry)
  candidates.sort()

  if not candidates:
    raise ValueError('No image found in %s.' % dir_path)

  selected = 0
  if len(candidates) > 1:
    selected = cros_build_lib.GetChoice(
        'Multiple images found in %s. Please select one to continue:' % (
            dir_path,),
        candidates)

  return os.path.join(dir_path, candidates[selected])
class FlashError(Exception):
  """Raised when flashing hits an error it cannot recover from.

  Used throughout this module for conditions such as a missing device path,
  a missing payload, a user aborting at a prompt, or a device that did not
  end up in the expected state after an update.
  """
class USBImager(object):
  """Writes an image onto a removable block device."""

  def __init__(self, device, board, image, debug=False, install=False,
               yes=False):
    """Initializes USBImager.

    Args:
      device: Path of the target device (e.g. /dev/sdc); empty to let the
        user pick one of the detected removable devices.
      board: Board name; when empty, cros_build_lib.GetDefaultBoard() is used.
      image: Image file, directory holding images, or xbuddy path.
      debug: Use DEBUG rather than INFO as the level for the final sync.
      install: Run chromeos-install instead of a raw dd copy.
      yes: Skip the confirmation prompts.
    """
    self.device = device
    self.board = board or cros_build_lib.GetDefaultBoard()
    self.image = image
    self.debug = debug
    if debug:
      self.debug_level = logging.DEBUG
    else:
      self.debug_level = logging.INFO
    self.install = install
    self.yes = yes

  def DeviceNameToPath(self, device_name):
    """Returns the /dev path for |device_name| (e.g. sdc -> /dev/sdc)."""
    return '/dev/%s' % device_name

  def GetRemovableDeviceDescription(self, device):
    """Builds a human readable description of the removable |device|.

    Args:
      device: the device name (e.g. sdc).

    Returns:
      The manufacturer, product, size and device path joined with spaces,
      leaving out whichever of them is empty (e.g. Patriot Memory 7918 MB).
    """
    dev_path = self.DeviceNameToPath(device)
    parts = (
        osutils.GetDeviceInfo(device, keyword='manufacturer'),
        osutils.GetDeviceInfo(device, keyword='product'),
        osutils.GetDeviceSize(dev_path),
        '(%s)' % dev_path,
    )
    return ' '.join(part for part in parts if part)

  def ListAllRemovableDevices(self):
    """Lists the removable disks known to the system.

    Returns:
      The names (e.g. ['sdb', 'sdc']) of all block devices reported by
      osutils.ListBlockDevices() whose TYPE is 'disk' and whose RM is '1'.
    """
    return [blk.NAME for blk in osutils.ListBlockDevices()
            if blk.TYPE == 'disk' and blk.RM == '1']

  def ChooseRemovableDevice(self, devices):
    """Asks the user to select/confirm one of the removable |devices|.

    Args:
      devices: a list of device names (e.g. ['sda', 'sdb']).

    Returns:
      The device name chosen by the user.
    """
    descriptions = [self.GetRemovableDeviceDescription(name)
                    for name in devices]
    choice = cros_build_lib.GetChoice(
        'Removable device(s) found. Please select/confirm to continue:',
        descriptions)
    return devices[choice]

  def InstallImageToDevice(self, image, device):
    """Installs |image| to the removable |device| with chromeos-install.

    Args:
      image: Path to the image to copy.
      device: Device to copy to.
    """
    install_cmd = ['chromeos-install', '--yes', '--skip_src_removable',
                   '--skip_dst_removable']
    install_cmd.append('--payload_image=%s' % image)
    install_cmd.append('--dst=%s' % device)
    install_cmd.append('--skip_postinstall')
    cros_build_lib.SudoRunCommand(install_cmd)

  def CopyImageToDevice(self, image, device):
    """Copies |image| to the removable |device| with dd, then syncs.

    Args:
      image: Path to the image to copy.
      device: Device to copy to.
    """
    dd_cmd = ['dd', 'if=%s' % image, 'of=%s' % device, 'bs=4M',
              'iflag=fullblock', 'oflag=sync']
    log_level = logging.getLogger().getEffectiveLevel()
    if log_level <= logging.NOTICE:
      # Verbose enough to be worth showing a progress bar while dd runs.
      progress = UsbImagerOperation(image)
      progress.Run(cros_build_lib.SudoRunCommand, dd_cmd,
                   debug_level=logging.NOTICE, update_period=0.5)
    else:
      cros_build_lib.SudoRunCommand(
          dd_cmd, debug_level=logging.NOTICE,
          print_cmd=log_level < logging.NOTICE)

    cros_build_lib.SudoRunCommand(['sync'], debug_level=self.debug_level)

  def _GetImagePath(self):
    """Resolves self.image to the local path of the image to use.

    Returns:
      self.image itself when it is a file, the image picked from it when it
      is a directory, and otherwise the local path of the image the xbuddy
      path translates to.

    Raises:
      FlashError: The user chose to stop after being told that the given
        file is not a valid disk image.
    """
    translated_path = None
    if os.path.isfile(self.image):
      needs_confirmation = not (self.yes or
                                _IsFilePathGPTDiskImage(self.image))
      # TODO(wnwen): Open the tarball and if there is just one file in it,
      #     use that instead. Existing code in upload_symbols.py.
      if needs_confirmation and cros_build_lib.BooleanPrompt(
          prolog='The given image file is not a valid disk image. Perhaps '
                 'you forgot to untar it.',
          prompt='Terminate the current flash process?'):
        raise FlashError('Update terminated by user.')
      image_path = self.image
    elif os.path.isdir(self.image):
      # Ask user which image (*.bin) in the folder to use.
      image_path = _ChooseImageFromDirectory(self.image)
    else:
      # Translate the xbuddy path to get the exact image to use.
      translated_path, _ = ds_wrapper.GetImagePathWithXbuddy(
          self.image, self.board, static_dir=DEVSERVER_STATIC_DIR)
      image_path = ds_wrapper.TranslatedPathToLocalPath(
          translated_path, DEVSERVER_STATIC_DIR)

    logging.info('Using image %s', translated_path or image_path)
    return image_path

  def Run(self):
    """Images the removable device.

    Raises:
      FlashError: The device path does not exist, the user declined to write
        to a non-removable device, or no removable device was detected.
    """
    devices = self.ListAllRemovableDevices()

    if self.device:
      # If user specified a device path, check if it exists.
      if not os.path.exists(self.device):
        raise FlashError('Device path %s does not exist.' % self.device)

      # Then check if it is removable.
      removable_paths = [self.DeviceNameToPath(name) for name in devices]
      if self.device not in removable_paths:
        msg = '%s is not a removable device.' % self.device
        confirmed = self.yes or cros_build_lib.BooleanPrompt(
            default=False, prolog=msg)
        if not confirmed:
          raise FlashError('You can specify usb:// to choose from a list of '
                           'removable devices.')

      # Get device name from path (e.g. sdc in /dev/sdc).
      target = self.device.rsplit(os.path.sep, 1)[-1]
    elif devices:
      # Ask user to choose from the list.
      target = self.ChooseRemovableDevice(devices)
    else:
      raise FlashError('No removable devices detected.')

    image_path = self._GetImagePath()
    try:
      device = self.DeviceNameToPath(target)
      if self.install:
        write_image = self.InstallImageToDevice
      else:
        write_image = self.CopyImageToDevice
      write_image(image_path, device)
    except cros_build_lib.RunCommandError:
      logging.error('Failed copying image to device %s',
                    self.DeviceNameToPath(target))
class FileImager(USBImager):
  """Copies an image to a path on the local filesystem."""

  def Run(self):
    """Copies the resolved image to the path held in self.device.

    A copy failure (IOError) is logged rather than raised.

    Raises:
      FlashError: The destination path does not exist.
    """
    destination = self.device
    if not os.path.exists(destination):
      raise FlashError('Path %s does not exist.' % destination)

    image_path = self._GetImagePath()

    # When copying into a directory, report the full name the file will get.
    if os.path.isdir(destination):
      reported_path = os.path.join(destination, os.path.basename(image_path))
    else:
      reported_path = destination
    logging.info('Copying to %s', reported_path)

    try:
      shutil.copy(image_path, destination)
    except IOError:
      logging.error('Failed to copy image %s to %s', image_path, destination)
class RemoteDeviceUpdater(object):
  """Performs update on a remote device.

  The update payloads are taken from a directory, generated from a local
  image, or generated from an image located through an xbuddy path. They are
  then applied to the rootfs and/or stateful partition of the device.
  """

  # File name of the devserver entry point inside the copied package.
  DEVSERVER_FILENAME = 'devserver.py'
  # Script copied to the device to apply the stateful payload.
  STATEFUL_UPDATE_BIN = '/usr/bin/stateful_update'
  # Client used on the device to drive and query the update engine.
  UPDATE_ENGINE_BIN = 'update_engine_client'
  # Root working directory on the device. This directory is in the
  # stateful partition and thus has enough space to store the payloads.
  DEVICE_BASE_DIR = '/mnt/stateful_partition/cros-flash'
  # Seconds between two update status checks, with and without progress bar.
  UPDATE_CHECK_INTERVAL_PROGRESSBAR = 0.5
  UPDATE_CHECK_INTERVAL_NORMAL = 10

  def __init__(self, ssh_hostname, ssh_port, image, stateful_update=True,
               rootfs_update=True, clobber_stateful=False, reboot=True,
               board=None, src_image_to_delta=None, wipe=True, debug=False,
               yes=False, force=False, ping=True,
               disable_verification=False):
    """Initializes RemoteDeviceUpdater.

    Creates the local temporary directory used for payloads and logs.

    Args:
      ssh_hostname: Hostname used to connect to the device.
      ssh_port: Port used to connect to the device.
      image: Directory with payloads, image file, or xbuddy path.
      stateful_update: Update the stateful partition.
      rootfs_update: Update the rootfs partition.
      clobber_stateful: Clobber the stateful partition when updating it.
      reboot: Reboot the device once the update is applied.
      board: Board to use when the image is an xbuddy path.
      src_image_to_delta: Source image forwarded to payload generation.
      wipe: Remove the temporary directory in Cleanup(); ignored (treated
        as False) when |debug| is set.
      debug: Debug mode; keeps the temporary directory.
      yes: Assume "yes" for any prompt.
      force: Skip the board compatibility check.
      ping: Forwarded to the device handler.
      disable_verification: Disable rootfs verification after the update.

    Raises:
      ValueError: Neither |stateful_update| nor |rootfs_update| is set.
    """
    if not stateful_update and not rootfs_update:
      raise ValueError('No update operation to perform; either stateful or'
                       ' rootfs partitions must be updated.')
    self.tempdir = tempfile.mkdtemp(prefix='cros-flash')
    self.ssh_hostname = ssh_hostname
    self.ssh_port = ssh_port
    self.image = image
    self.board = board
    self.src_image_to_delta = src_image_to_delta
    self.do_stateful_update = stateful_update
    self.do_rootfs_update = rootfs_update
    self.disable_verification = disable_verification
    self.clobber_stateful = clobber_stateful
    self.reboot = reboot
    self.debug = debug
    self.ping = ping
    # Do not wipe if debug is set.
    self.wipe = wipe and not debug
    self.yes = yes
    self.force = force

  # pylint: disable=unbalanced-tuple-unpacking
  @classmethod
  def GetUpdateStatus(cls, device, keys=None):
    """Returns the status of the update engine on the |device|.

    Retrieves the status from update engine and confirms all keys are
    in the status.

    Args:
      device: A ChromiumOSDevice object.
      keys: the keys to look for in the status result (defaults to
        ['CURRENT_OP']).

    Returns:
      A list of values in the order of |keys|.

    Raises:
      Exception: The status command produced no output.
      ValueError: The status cannot be parsed or lacks one of |keys|.
    """
    keys = ['CURRENT_OP'] if not keys else keys
    result = device.RunCommand([cls.UPDATE_ENGINE_BIN, '--status'],
                               capture_output=True)
    if not result.output:
      raise Exception('Cannot get update status')

    try:
      status = cros_build_lib.LoadKeyValueFile(
          cStringIO.StringIO(result.output))
    except ValueError:
      raise ValueError('Cannot parse update status')

    values = []
    for key in keys:
      if key not in status:
        # Name the missing key; the message used to keep a literal '%s'.
        raise ValueError('Missing %s in the update engine status' % key)

      values.append(status.get(key))

    return values

  def UpdateStateful(self, device, payload, clobber=False):
    """Update the stateful partition of the device.

    A failure of the update command on the device is logged, not raised.

    Args:
      device: The ChromiumOSDevice object to update.
      payload: The path to the update payload.
      clobber: Clobber stateful partition (defaults to False).
    """
    # Copy latest stateful_update to device.
    stateful_update_bin = path_util.FromChrootPath(self.STATEFUL_UPDATE_BIN)
    device.CopyToWorkDir(stateful_update_bin)
    msg = 'Updating stateful partition'
    logging.info('Copying stateful payload to device...')
    device.CopyToWorkDir(payload)
    cmd = ['sh',
           os.path.join(device.work_dir,
                        os.path.basename(self.STATEFUL_UPDATE_BIN)),
           os.path.join(device.work_dir, os.path.basename(payload))]

    if clobber:
      cmd.append('--stateful_change=clean')
      msg += ' with clobber enabled'

    logging.info('%s...', msg)
    try:
      device.RunCommand(cmd)
    except cros_build_lib.RunCommandError:
      logging.error('Faild to perform stateful partition update.')

  def _CopyDevServerPackage(self, device, tempdir):
    """Copy devserver package to work directory of device.

    Args:
      device: The ChromiumOSDevice object to copy the package to.
      tempdir: The directory to temporarily store devserver package.

    Returns:
      The path of the copied package inside the work directory of |device|.
    """
    logging.info('Copying devserver package to device...')
    src_dir = os.path.join(tempdir, 'src')
    # Start from a clean staging copy; copytree needs a missing target.
    osutils.RmDir(src_dir, ignore_missing=True)
    shutil.copytree(
        ds_wrapper.DEVSERVER_PKG_DIR, src_dir,
        ignore=shutil.ignore_patterns('*.pyc', 'tmp*', '.*', 'static', '*~'))
    device.CopyToWorkDir(src_dir)
    return os.path.join(device.work_dir, os.path.basename(src_dir))

  def SetupRootfsUpdate(self, device):
    """Makes sure |device| is ready for rootfs update.

    Reboots the device first when an earlier update is waiting for a reboot.

    Args:
      device: The ChromiumOSDevice object to prepare.

    Raises:
      FlashError: The update engine is not idle.
    """
    logging.info('Checking if update engine is idle...')
    status, = self.GetUpdateStatus(device)
    if status == 'UPDATE_STATUS_UPDATED_NEED_REBOOT':
      logging.info('Device needs to reboot before updating...')
      device.Reboot()
      status, = self.GetUpdateStatus(device)

    if status != 'UPDATE_STATUS_IDLE':
      raise FlashError('Update engine is not idle. Status: %s' % status)

  def UpdateRootfs(self, device, payload, tempdir):
    """Update the rootfs partition of the device.

    Starts a devserver on the device, points the update engine at it and
    polls the update status until the update completes. The devserver and
    update engine logs are copied to |tempdir| whatever the outcome.

    Args:
      device: The ChromiumOSDevice object to update.
      payload: The path to the update payload.
      tempdir: The directory to store temporary files.

    Raises:
      FlashError: The update engine went back to idle before completing.
    """
    # Setup devserver and payload on the target device.
    static_dir = os.path.join(device.work_dir, 'static')
    payload_dir = os.path.join(static_dir, 'pregenerated')
    src_dir = self._CopyDevServerPackage(device, tempdir)
    device.RunCommand(['mkdir', '-p', payload_dir])
    logging.info('Copying rootfs payload to device...')
    device.CopyToDevice(payload, payload_dir)
    devserver_bin = os.path.join(src_dir, self.DEVSERVER_FILENAME)
    ds = ds_wrapper.RemoteDevServerWrapper(
        device, devserver_bin, static_dir=static_dir, log_dir=device.work_dir)

    logging.info('Updating rootfs partition')
    try:
      ds.Start()
      # Use the localhost IP address to ensure that update engine
      # client can connect to the devserver.
      omaha_url = ds.GetDevServerURL(
          ip='127.0.0.1', port=ds.port, sub_dir='update/pregenerated')
      cmd = [self.UPDATE_ENGINE_BIN, '-check_for_update',
             '-omaha_url=%s' % omaha_url]
      device.RunCommand(cmd)

      # If we are using a progress bar, update it every 0.5s instead of 10s.
      if command.UseProgressBar():
        update_check_interval = self.UPDATE_CHECK_INTERVAL_PROGRESSBAR
        oper = operation.ProgressBarOperation()
      else:
        update_check_interval = self.UPDATE_CHECK_INTERVAL_NORMAL
        oper = None
      end_message_not_printed = True

      # Loop until update is complete.
      while True:
        op, progress = self.GetUpdateStatus(device, ['CURRENT_OP', 'PROGRESS'])
        logging.info('Waiting for update...status: %s at progress %s',
                     op, progress)

        if op == 'UPDATE_STATUS_UPDATED_NEED_REBOOT':
          logging.notice('Update completed.')
          break

        if op == 'UPDATE_STATUS_IDLE':
          raise FlashError(
              'Update failed with unexpected update status: %s' % op)

        if oper is not None:
          if op == 'UPDATE_STATUS_DOWNLOADING':
            oper.ProgressBar(float(progress))
          elif end_message_not_printed and op == 'UPDATE_STATUS_FINALIZING':
            # Close the progress bar once, when finalizing is first seen.
            oper.Cleanup()
            logging.notice('Finalizing image.')
            end_message_not_printed = False

        time.sleep(update_check_interval)

      ds.Stop()
    except Exception:
      logging.error('Rootfs update failed.')
      logging.warning(ds.TailLog() or 'No devserver log is available.')
      raise
    finally:
      ds.Stop()
      device.CopyFromDevice(ds.log_file,
                            os.path.join(tempdir, 'target_devserver.log'),
                            error_code_ok=True)
      device.CopyFromDevice('/var/log/update_engine.log', tempdir,
                            follow_symlinks=True,
                            error_code_ok=True)

  def _CheckPayloads(self, payload_dir):
    """Checks that all update payloads exists in |payload_dir|.

    Only the payloads of the requested updates (rootfs and/or stateful) are
    required.

    Args:
      payload_dir: Directory expected to hold the payloads.

    Raises:
      FlashError: A required payload is missing.
    """
    filenames = []
    filenames += [ds_wrapper.ROOTFS_FILENAME] if self.do_rootfs_update else []
    if self.do_stateful_update:
      filenames += [ds_wrapper.STATEFUL_FILENAME]
    for fname in filenames:
      payload = os.path.join(payload_dir, fname)
      if not os.path.exists(payload):
        raise FlashError('Payload %s does not exist!' % payload)

  def Verify(self, old_root_dev, new_root_dev):
    """Verifies that the root device changed after reboot.

    Args:
      old_root_dev: Root device recorded before the update.
      new_root_dev: Root device read after the reboot.

    Raises:
      FlashError: The root device is the same as before the update.
    """
    assert new_root_dev and old_root_dev
    if new_root_dev == old_root_dev:
      raise FlashError(
          'Failed to boot into the new version. Possibly there was a '
          'signing problem, or an automated rollback occurred because '
          'your new image failed to boot.')

  @classmethod
  def GetRootDev(cls, device):
    """Get the current root device on |device|.

    Args:
      device: A ChromiumOSDevice object.

    Returns:
      The stripped output of `rootdev -s` run on |device|.
    """
    rootdev = device.RunCommand(
        ['rootdev', '-s'], capture_output=True).output.strip()
    logging.debug('Current root device is %s', rootdev)
    return rootdev

  def Cleanup(self):
    """Cleans up the temporary directory, or reports where it was kept."""
    if self.wipe:
      logging.info('Cleaning up temporary working directory...')
      osutils.RmDir(self.tempdir)
    else:
      logging.info('You can find the log files and/or payloads in %s',
                   self.tempdir)

  def _CanRunDevserver(self, device, tempdir):
    """We can run devserver on |device|.

    If the stateful partition is corrupted, Python or other packages
    (e.g. cherrypy) needed for rootfs update may be missing on |device|.

    This will also use `ldconfig` to update library paths on the target
    device if it looks like that's causing problems, which is necessary
    for base images.

    Args:
      device: A ChromiumOSDevice object.
      tempdir: A temporary directory to store files.

    Returns:
      True if we can start devserver; False otherwise.
    """
    logging.info('Checking if we can run devserver on the device.')
    src_dir = self._CopyDevServerPackage(device, tempdir)
    devserver_bin = os.path.join(src_dir, self.DEVSERVER_FILENAME)
    devserver_check_command = ['python', devserver_bin, '--help']
    try:
      device.RunCommand(devserver_check_command)
    except cros_build_lib.RunCommandError as e:
      logging.warning('Cannot start devserver: %s', e)
      if 'python: error while loading shared libraries' in str(e):
        logging.info('Attempting to correct device library paths...')
        try:
          device.RunCommand(['ldconfig', '-r', '/'])
          device.RunCommand(devserver_check_command)
          logging.info('Library path correction successful.')
          return True
        except cros_build_lib.RunCommandError as e2:
          logging.warning('Library path correction failed: %s', e2)

      return False

    return True

  def Run(self):
    """Performs remote device update.

    Connects to the device, obtains the payloads, restores the stateful
    partition if that is needed to run the devserver, applies the requested
    updates, then optionally reboots and verifies the result. Cleanup() is
    always called at the end.

    Raises:
      FlashError: The update cannot be performed or did not take effect.
    """
    old_root_dev, new_root_dev = None, None
    try:
      # Tells the error handler below whether |device| was ever bound.
      device_connected = False
      with remote_access.ChromiumOSDeviceHandler(
          self.ssh_hostname, port=self.ssh_port,
          base_dir=self.DEVICE_BASE_DIR, ping=self.ping) as device:
        device_connected = True

        payload_dir = self.tempdir
        if os.path.isdir(self.image):
          # If the given path is a directory, we use the provided update
          # payload(s) in the directory.
          payload_dir = self.image
          logging.info('Using provided payloads in %s', payload_dir)
        elif os.path.isfile(self.image):
          # If the given path is an image, make sure devserver can access it
          # and generate payloads.
          logging.info('Using image %s', self.image)
          ds_wrapper.GetUpdatePayloadsFromLocalPath(
              self.image, payload_dir,
              src_image_to_delta=self.src_image_to_delta,
              static_dir=DEVSERVER_STATIC_DIR)
        else:
          self.board = cros_build_lib.GetBoard(device_board=device.board,
                                               override_board=self.board,
                                               force=self.yes)
          if not self.board:
            raise FlashError('No board identified')

          if not self.force and self.board != device.board:
            # If a board was specified, it must be compatible with the device.
            # Interpolate here: FlashError does not format extra arguments.
            raise FlashError('Device (%s) is incompatible with board %s' %
                             (device.board, self.board))

          logging.info('Board is %s', self.board)

          # Translate the xbuddy path to get the exact image to use.
          translated_path, resolved_path = ds_wrapper.GetImagePathWithXbuddy(
              self.image, self.board, static_dir=DEVSERVER_STATIC_DIR,
              lookup_only=True)
          logging.info('Using image %s', translated_path)
          # Convert the translated path to be used in the update request.
          image_path = ds_wrapper.ConvertTranslatedPath(resolved_path,
                                                        translated_path)

          # Launch a local devserver to generate/serve update payloads.
          ds_wrapper.GetUpdatePayloads(
              image_path, payload_dir, board=self.board,
              src_image_to_delta=self.src_image_to_delta,
              static_dir=DEVSERVER_STATIC_DIR)

        # Verify that all required payloads are in the payload directory.
        self._CheckPayloads(payload_dir)

        restore_stateful = False
        if (not self._CanRunDevserver(device, self.tempdir) and
            self.do_rootfs_update):
          msg = ('Cannot start devserver! The stateful partition may be '
                 'corrupted.')
          prompt = 'Attempt to restore the stateful partition?'
          restore_stateful = self.yes or cros_build_lib.BooleanPrompt(
              prompt=prompt, default=False, prolog=msg)
          if not restore_stateful:
            raise FlashError('Cannot continue to perform rootfs update!')

        if restore_stateful:
          logging.warning('Restoring the stateful partition...')
          payload = os.path.join(payload_dir, ds_wrapper.STATEFUL_FILENAME)
          self.UpdateStateful(device, payload, clobber=self.clobber_stateful)
          device.Reboot()
          if self._CanRunDevserver(device, self.tempdir):
            logging.info('Stateful partition restored.')
          else:
            raise FlashError('Unable to restore stateful partition.')

        # Perform device updates.
        if self.do_rootfs_update:
          self.SetupRootfsUpdate(device)
          # Record the current root device. This must be done after
          # SetupRootfsUpdate because SetupRootfsUpdate may reboot the
          # device if there is a pending update, which changes the
          # root device.
          old_root_dev = self.GetRootDev(device)
          payload = os.path.join(payload_dir, ds_wrapper.ROOTFS_FILENAME)
          self.UpdateRootfs(device, payload, self.tempdir)
          logging.info('Rootfs update completed.')

        # The stateful partition was already updated if it was restored.
        if self.do_stateful_update and not restore_stateful:
          payload = os.path.join(payload_dir, ds_wrapper.STATEFUL_FILENAME)
          self.UpdateStateful(device, payload, clobber=self.clobber_stateful)
          logging.info('Stateful update completed.')

        if self.reboot:
          logging.notice('Rebooting device...')
          device.Reboot()
          if self.clobber_stateful:
            # --clobber-stateful wipes the stateful partition and the
            # working directory on the device no longer exists. To
            # remedy this, we recreate the working directory here.
            device.BaseRunCommand(['mkdir', '-p', device.work_dir])

        if self.do_rootfs_update and self.reboot:
          logging.notice('Verifying that the device has been updated...')
          new_root_dev = self.GetRootDev(device)
          self.Verify(old_root_dev, new_root_dev)

        if self.disable_verification:
          logging.info('Disabling rootfs verification on the device...')
          device.DisableRootfsVerification()

    except Exception:
      logging.error('Device update failed.')
      if device_connected and device.lsb_release:
        lsb_entries = sorted(device.lsb_release.items())
        logging.info('Following are the LSB version details of the device:\n%s',
                     '\n'.join('%s=%s' % (k, v) for k, v in lsb_entries))
      raise
    else:
      logging.notice('Update performed successfully.')
    finally:
      self.Cleanup()
def Flash(device, image, board=None, install=False, src_image_to_delta=None,
          rootfs_update=True, stateful_update=True, clobber_stateful=False,
          reboot=True, wipe=True, ping=True, disable_rootfs_verification=False,
          clear_cache=False, yes=False, force=False, debug=False):
  """Flashes a device, USB drive, or file with an image.

  Shared entry point for `cros flash` and `brillo flash`: each command parses
  its own commandline and then delegates here, so both rely on the same
  underlying functionality.

  Args:
    device: commandline.Device object; None to use the default device.
    image: Path (string) to the update image. Can be a local or xbuddy path;
      non-existent local paths are converted to xbuddy.
    board: Board to use; None to automatically detect.
    install: Install to USB using base disk layout; USB |device| scheme only.
    src_image_to_delta: Local path to an image to be used as the base to
      generate delta payloads; SSH |device| scheme only.
    rootfs_update: Update rootfs partition; SSH |device| scheme only.
    stateful_update: Update stateful partition; SSH |device| scheme only.
    clobber_stateful: Clobber stateful partition; SSH |device| scheme only.
    reboot: Reboot device after update; SSH |device| scheme only.
    wipe: Wipe temporary working directory; SSH |device| scheme only.
    ping: Ping the device before attempting update; SSH |device| scheme only.
    disable_rootfs_verification: Remove rootfs verification after update; SSH
      |device| scheme only.
    clear_cache: Clear the devserver static directory.
    yes: Assume "yes" for any prompt.
    force: Ignore sanity checks and prompts. Overrides |yes| if True.
    debug: Print additional debugging messages.

  Raises:
    FlashError: An unrecoverable error occurred.
    ValueError: Invalid parameter combination.
  """
  # |force| implies |yes|.
  yes = True if force else yes

  if clear_cache:
    logging.info('Clearing the cache...')
    ds_wrapper.DevServerWrapper.WipeStaticDirectory(DEVSERVER_STATIC_DIR)

  try:
    osutils.SafeMakedirsNonRoot(DEVSERVER_STATIC_DIR)
  except OSError:
    logging.error('Failed to create %s', DEVSERVER_STATIC_DIR)

  if install:
    if not device or device.scheme != commandline.DEVICE_SCHEME_USB:
      raise ValueError(
          '--install can only be used when writing to a USB device')
    if not cros_build_lib.IsInsideChroot():
      raise ValueError('--install can only be used inside the chroot')

  # No device at all means the default device, reached like an SSH one.
  if not device or device.scheme == commandline.DEVICE_SCHEME_SSH:
    if device:
      ssh_host, ssh_port = device.hostname, device.port
    else:
      ssh_host, ssh_port = None, None
    logging.notice('Preparing to update the remote device %s', ssh_host)
    RemoteDeviceUpdater(
        ssh_host,
        ssh_port,
        image,
        board=board,
        src_image_to_delta=src_image_to_delta,
        rootfs_update=rootfs_update,
        stateful_update=stateful_update,
        clobber_stateful=clobber_stateful,
        reboot=reboot,
        wipe=wipe,
        debug=debug,
        yes=yes,
        force=force,
        ping=ping,
        disable_verification=disable_rootfs_verification).Run()
    return

  if device.scheme == commandline.DEVICE_SCHEME_USB:
    if device.path:
      usb_path = osutils.ExpandPath(device.path)
    else:
      usb_path = ''
    logging.info('Preparing to image the removable device %s', usb_path)
    USBImager(usb_path, board, image, debug=debug, install=install,
              yes=yes).Run()
    return

  if device.scheme == commandline.DEVICE_SCHEME_FILE:
    logging.info('Preparing to copy image to %s', device.path)
    FileImager(device.path, board, image, debug=debug, yes=yes).Run()