| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Install/copy the image to the device.""" |
| |
| from __future__ import division |
| |
| import logging |
| import os |
| import re |
| import shutil |
| |
| from chromite.cli import device_imager |
| from chromite.cli.cros import cros_chrome_sdk |
| from chromite.lib import commandline |
| from chromite.lib import cros_build_lib |
| from chromite.lib import dev_server_wrapper as ds_wrapper |
| from chromite.lib import operation |
| from chromite.lib import osutils |
| from chromite.lib import path_util |
| from chromite.lib import remote_access |
| |
| |
| def GetDefaultBoard(): |
| """Look up default board. |
| |
| In a chrome checkout, return $SDK_BOARD. In a chromeos checkout, |
| return the contents of .default_board. |
| """ |
| if path_util.DetermineCheckout().type == path_util.CHECKOUT_TYPE_GCLIENT: |
| return os.environ.get(cros_chrome_sdk.SDKFetcher.SDK_BOARD_ENV) |
| return cros_build_lib.GetDefaultBoard() |
| |
| |
| class UsbImagerOperation(operation.ProgressBarOperation): |
| """Progress bar for flashing image to operation.""" |
| |
| def __init__(self, image): |
| super().__init__() |
| self._size = os.path.getsize(image) |
| self._transferred = 0 |
| self._bytes = re.compile(r'(\d+) bytes') |
| |
| def _GetDDPid(self): |
| """Get the Pid of dd.""" |
| try: |
| pids = cros_build_lib.run(['pgrep', 'dd'], capture_output=True, |
| print_cmd=False, encoding='utf-8').stdout |
| for pid in pids.splitlines(): |
| if osutils.IsChildProcess(int(pid), name='dd'): |
| return int(pid) |
| return -1 |
| except cros_build_lib.RunCommandError: |
| # If dd isn't still running, then we assume that it is finished. |
| return -1 |
| |
| def _PingDD(self, dd_pid): |
| """Send USR1 signal to dd to get status update.""" |
| try: |
| cmd = ['kill', '-USR1', str(dd_pid)] |
| cros_build_lib.sudo_run(cmd, print_cmd=False) |
| except cros_build_lib.RunCommandError: |
| # Here we assume that dd finished in the background. |
| return |
| |
| def ParseOutput(self, output=None): |
| """Parse the output of dd to update progress bar.""" |
| dd_pid = self._GetDDPid() |
| if dd_pid == -1: |
| return |
| |
| self._PingDD(dd_pid) |
| |
| if output is None: |
| stdout = self._stdout.read() |
| stderr = self._stderr.read() |
| output = stdout + stderr |
| |
| match = self._bytes.search(output) |
| if match: |
| self._transferred = int(match.groups()[0]) |
| |
| self.ProgressBar(self._transferred / self._size) |
| |
| |
| def _IsFilePathGPTDiskImage(file_path, require_pmbr=False): |
| """Determines if a file is a valid GPT disk. |
| |
| Args: |
| file_path: Path to the file to test. |
| require_pmbr: Whether to require a PMBR in LBA0. |
| """ |
| if os.path.isfile(file_path): |
| with open(file_path, 'rb') as image_file: |
| if require_pmbr: |
| # Seek to the end of LBA0 and look for the PMBR boot signature. |
| image_file.seek(0x1fe) |
| if image_file.read(2) != b'\x55\xaa': |
| return False |
| # Current file position is start of LBA1 now. |
| else: |
| # Seek to LBA1 where the GPT starts. |
| image_file.seek(0x200) |
| |
| # See if there's a GPT here. |
| if image_file.read(8) == b'EFI PART': |
| return True |
| |
| return False |
| |
| |
| def _ChooseImageFromDirectory(dir_path): |
| """Lists all image files in |dir_path| and ask user to select one. |
| |
| Args: |
| dir_path: Path to the directory. |
| """ |
| images = sorted([x for x in os.listdir(dir_path) if |
| _IsFilePathGPTDiskImage(os.path.join(dir_path, x))]) |
| idx = 0 |
| if not images: |
| raise ValueError('No image found in %s.' % dir_path) |
| elif len(images) > 1: |
| idx = cros_build_lib.GetChoice( |
| 'Multiple images found in %s. Please select one to continue:' % ( |
| (dir_path,)), |
| images) |
| |
| return os.path.join(dir_path, images[idx]) |
| |
| |
| class FlashError(Exception): |
| """Thrown when there is an unrecoverable error during flash.""" |
| |
| |
| class USBImager(object): |
| """Copy image to the target removable device.""" |
| |
| def __init__(self, device, board, image, version, debug=False, yes=False): |
| """Initializes USBImager.""" |
| self.device = device |
| self.board = board if board else GetDefaultBoard() |
| self.image = image |
| self.version = version |
| self.debug = debug |
| self.debug_level = logging.DEBUG if debug else logging.INFO |
| self.yes = yes |
| |
| def DeviceNameToPath(self, device_name): |
| return '/dev/%s' % device_name |
| |
| def GetRemovableDeviceDescription(self, device): |
| """Returns a informational description of the removable |device|. |
| |
| Args: |
| device: the device name (e.g. sdc). |
| |
| Returns: |
| A string describing |device| (e.g. Patriot Memory 7918 MB). |
| """ |
| desc = [ |
| osutils.GetDeviceInfo(device, keyword='manufacturer'), |
| osutils.GetDeviceInfo(device, keyword='product'), |
| osutils.GetDeviceSize(self.DeviceNameToPath(device)), |
| '(%s)' % self.DeviceNameToPath(device), |
| ] |
| return ' '.join([x for x in desc if x]) |
| |
| def ListAllRemovableDevices(self): |
| """Returns a list of removable devices. |
| |
| Returns: |
| A list of device names (e.g. ['sdb', 'sdc']). |
| """ |
| devices = osutils.ListBlockDevices() |
| removable_devices = [] |
| for d in devices: |
| if d.TYPE == 'disk' and d.RM == '1': |
| removable_devices.append(d.NAME) |
| |
| return removable_devices |
| |
| def ChooseRemovableDevice(self, devices): |
| """Lists all removable devices and asks user to select/confirm. |
| |
| Args: |
| devices: a list of device names (e.g. ['sda', 'sdb']). |
| |
| Returns: |
| The device name chosen by the user. |
| """ |
| idx = cros_build_lib.GetChoice( |
| 'Removable device(s) found. Please select/confirm to continue:', |
| [self.GetRemovableDeviceDescription(x) for x in devices]) |
| |
| return devices[idx] |
| |
| def CopyImageToDevice(self, image, device): |
| """Copies |image| to the removable |device|. |
| |
| Args: |
| image: Path to the image to copy. |
| device: Device to copy to. |
| """ |
| cmd = ['dd', 'if=%s' % image, 'of=%s' % device, 'bs=4M', 'iflag=fullblock', |
| 'oflag=direct', 'conv=fdatasync'] |
| if logging.getLogger().getEffectiveLevel() <= logging.NOTICE: |
| op = UsbImagerOperation(image) |
| op.Run(cros_build_lib.sudo_run, cmd, debug_level=logging.NOTICE, |
| encoding='utf-8', update_period=0.5) |
| else: |
| cros_build_lib.sudo_run( |
| cmd, debug_level=logging.NOTICE, |
| print_cmd=logging.getLogger().getEffectiveLevel() < logging.NOTICE) |
| |
| # dd likely didn't put the backup GPT in the last block. sfdisk fixes this |
| # up for us with a 'write' command, so we have a standards-conforming GPT. |
| # Ignore errors because sfdisk (util-linux < v2.32) isn't always happy to |
| # fix GPT sanity issues. |
| cros_build_lib.sudo_run(['sfdisk', device], input='write\n', |
| check=False, |
| debug_level=self.debug_level) |
| |
| cros_build_lib.sudo_run(['partx', '-u', device], |
| debug_level=self.debug_level) |
| cros_build_lib.sudo_run(['sync', '-d', device], |
| debug_level=self.debug_level) |
| |
| def _GetImagePath(self): |
| """Returns the image path to use.""" |
| image_path = None |
| if os.path.isfile(self.image): |
| if not self.yes and not _IsFilePathGPTDiskImage(self.image): |
| # TODO(wnwen): Open the tarball and if there is just one file in it, |
| # use that instead. Existing code in upload_symbols.py. |
| if cros_build_lib.BooleanPrompt( |
| prolog='The given image file is not a valid disk image. Perhaps ' |
| 'you forgot to untar it.', |
| prompt='Terminate the current flash process?'): |
| raise FlashError('Update terminated by user.') |
| image_path = self.image |
| elif os.path.isdir(self.image): |
| # Ask user which image (*.bin) in the folder to use. |
| image_path = _ChooseImageFromDirectory(self.image) |
| else: |
| # Translate the xbuddy path to get the exact image to use. |
| _, image_path = ds_wrapper.GetImagePathWithXbuddy( |
| self.image, self.board, self.version) |
| |
| logging.info('Using image %s', image_path) |
| return image_path |
| |
| def Run(self): |
| """Image the removable device.""" |
| devices = self.ListAllRemovableDevices() |
| |
| if self.device: |
| # If user specified a device path, check if it exists. |
| if not os.path.exists(self.device): |
| raise FlashError('Device path %s does not exist.' % self.device) |
| |
| # Then check if it is removable. |
| if self.device not in [self.DeviceNameToPath(x) for x in devices]: |
| msg = '%s is not a removable device.' % self.device |
| if not (self.yes or cros_build_lib.BooleanPrompt( |
| default=False, prolog=msg)): |
| raise FlashError('You can specify usb:// to choose from a list of ' |
| 'removable devices.') |
| target = None |
| if self.device: |
| # Get device name from path (e.g. sdc in /dev/sdc). |
| target = self.device.rsplit(os.path.sep, 1)[-1] |
| elif devices: |
| # Ask user to choose from the list. |
| target = self.ChooseRemovableDevice(devices) |
| else: |
| raise FlashError('No removable devices detected.') |
| |
| image_path = self._GetImagePath() |
| try: |
| device = self.DeviceNameToPath(target) |
| self.CopyImageToDevice(image_path, device) |
| except cros_build_lib.RunCommandError: |
| logging.error('Failed copying image to device %s', |
| self.DeviceNameToPath(target)) |
| |
| |
| class FileImager(USBImager): |
| """Copy image to the target path.""" |
| |
| def Run(self): |
| """Copy the image to the path specified by self.device.""" |
| if not os.path.isdir(os.path.dirname(self.device)): |
| raise FlashError('Parent of path %s is not a directory.' % self.device) |
| |
| image_path = self._GetImagePath() |
| if os.path.isdir(self.device): |
| logging.info('Copying to %s', |
| os.path.join(self.device, os.path.basename(image_path))) |
| else: |
| logging.info('Copying to %s', self.device) |
| try: |
| shutil.copy(image_path, self.device) |
| except IOError: |
| logging.error('Failed to copy image %s to %s', image_path, self.device) |
| |
| |
| # TODO(b/190631159, b/196056723): Change default of no_minios_update to |False|. |
| def Flash(device, image, board=None, version=None, |
| no_rootfs_update=False, no_stateful_update=False, |
| no_minios_update=True, clobber_stateful=False, reboot=True, |
| ssh_private_key=None, ping=True, disable_rootfs_verification=False, |
| clear_cache=False, yes=False, force=False, debug=False, |
| clear_tpm_owner=False): |
| """Flashes a device, USB drive, or file with an image. |
| |
| This provides functionality common to `cros flash` and `brillo flash` |
| so that they can parse the commandline separately but still use the |
| same underlying functionality. |
| |
| Args: |
| device: commandline.Device object; None to use the default device. |
| image: Path (string) to the update image. Can be a local or xbuddy path; |
| non-existant local paths are converted to xbuddy. |
| board: Board to use; None to automatically detect. |
| no_rootfs_update: Don't update rootfs partition; SSH |device| scheme only. |
| no_stateful_update: Don't update stateful partition; SSH |device| scheme |
| only. |
| no_minios_update: Don't update miniOS partition; SSH |device| scheme only. |
| clobber_stateful: Clobber stateful partition; SSH |device| scheme only. |
| clear_tpm_owner: Clear the TPM owner on reboot; SSH |device| scheme only. |
| reboot: Reboot device after update; SSH |device| scheme only. |
| ssh_private_key: Path to an SSH private key file; None to use test keys. |
| ping: Ping the device before attempting update; SSH |device| scheme only. |
| disable_rootfs_verification: Remove rootfs verification after update; SSH |
| |device| scheme only. |
| clear_cache: Clear the devserver static directory. |
| yes: Assume "yes" for any prompt. |
| force: Ignore sanity checks and prompts. Overrides |yes| if True. |
| debug: Print additional debugging messages. |
| version: Default version. |
| |
| Raises: |
| FlashError: An unrecoverable error occured. |
| ValueError: Invalid parameter combination. |
| """ |
| if force: |
| yes = True |
| |
| if clear_cache: |
| ds_wrapper.DevServerWrapper.WipeStaticDirectory() |
| ds_wrapper.DevServerWrapper.CreateStaticDirectory() |
| |
| # The user may not have specified a source image, use version as the default. |
| image = image or version |
| if not device or device.scheme == commandline.DEVICE_SCHEME_SSH: |
| if device: |
| hostname, port = device.hostname, device.port |
| else: |
| hostname, port = None, None |
| |
| with remote_access.ChromiumOSDeviceHandler( |
| hostname, port=port, |
| private_key=ssh_private_key, ping=ping) as device_p: |
| device_imager.DeviceImager( |
| device_p, |
| image, |
| board=board, |
| version=version, |
| no_rootfs_update=no_rootfs_update, |
| no_stateful_update=no_stateful_update, |
| no_minios_update=no_minios_update, |
| no_reboot=not reboot, |
| disable_verification=disable_rootfs_verification, |
| clobber_stateful=clobber_stateful, |
| clear_tpm_owner=clear_tpm_owner).Run() |
| elif device.scheme == commandline.DEVICE_SCHEME_USB: |
| path = osutils.ExpandPath(device.path) if device.path else '' |
| logging.info('Preparing to image the removable device %s', path) |
| imager = USBImager(path, |
| board, |
| image, |
| version, |
| debug=debug, |
| yes=yes) |
| imager.Run() |
| elif device.scheme == commandline.DEVICE_SCHEME_FILE: |
| logging.info('Preparing to copy image to %s', device.path) |
| imager = FileImager(device.path, |
| board, |
| image, |
| version, |
| debug=debug, |
| yes=yes) |
| imager.Run() |