| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # |
| # Copyright 2019 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Script to image a ChromeOS device. |
| |
| This script images a remote ChromeOS device with a specific image." |
| """ |
| |
| from __future__ import print_function |
| |
| __author__ = 'asharif@google.com (Ahmad Sharif)' |
| |
| import argparse |
| import filecmp |
| import getpass |
| import glob |
| import os |
| import re |
| import shutil |
| import sys |
| import tempfile |
| import time |
| |
| from cros_utils import command_executer |
| from cros_utils import locks |
| from cros_utils import logger |
| from cros_utils import misc |
| from cros_utils.file_utils import FileUtils |
| |
| checksum_file = '/usr/local/osimage_checksum_file' |
| lock_file = '/tmp/image_chromeos_lock/image_chromeos_lock' |
| |
| |
| def Usage(parser, message): |
| print('ERROR: %s' % message) |
| parser.print_help() |
| sys.exit(0) |
| |
| |
| def CheckForCrosFlash(chromeos_root, remote, log_level): |
| cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) |
| |
| # Check to see if remote machine has cherrypy, ctypes |
| command = "python -c 'import cherrypy, ctypes'" |
| ret = cmd_executer.CrosRunCommand( |
| command, chromeos_root=chromeos_root, machine=remote) |
| logger.GetLogger().LogFatalIf( |
| ret == 255, 'Failed ssh to %s (for checking cherrypy)' % remote) |
| logger.GetLogger().LogFatalIf( |
| ret != 0, "Failed to find cherrypy or ctypes on remote '{}', " |
| 'cros flash cannot work.'.format(remote)) |
| |
| |
| def DisableCrosBeeps(chromeos_root, remote, log_level): |
| """Disable annoying chromebooks beeps after reboots.""" |
| cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) |
| |
| command = '/usr/share/vboot/bin/set_gbb_flags.sh 0x1' |
| logger.GetLogger().LogOutput('Trying to disable beeping.') |
| |
| ret, o, _ = cmd_executer.CrosRunCommandWOutput( |
| command, chromeos_root=chromeos_root, machine=remote) |
| if ret != 0: |
| logger.GetLogger().LogOutput(o) |
| logger.GetLogger().LogOutput('Failed to disable beeps.') |
| |
| |
| def FindChromeOSImage(image_file, chromeos_root): |
| """Find path for ChromeOS image inside chroot. |
| |
| This function could be called with image paths that are either inside |
| or outside the chroot. In either case the path needs to be translated |
| to an real/absolute path inside the chroot. |
| Example input paths: |
| /usr/local/google/home/uname/chromeos/chroot/tmp/my-test-images/image |
| ~/trunk/src/build/images/board/latest/image |
| /tmp/peppy-release/R67-1235.0.0/image |
| |
| Corresponding example output paths: |
| /tmp/my-test-images/image |
| /home/uname/trunk/src/build/images/board/latest/image |
| /tmp/peppy-release/R67-1235.0,0/image |
| """ |
| |
| # Get the name of the user, for "/home/<user>" part of the path. |
| whoami = getpass.getuser() |
| # Get the full path for the chroot dir, including 'chroot' |
| real_chroot_dir = os.path.join(os.path.realpath(chromeos_root), 'chroot') |
| # Get the full path for the chromeos root, excluding 'chroot' |
| real_chromeos_root = os.path.realpath(chromeos_root) |
| |
| # If path name starts with real_chroot_dir, remove that piece, but assume |
| # the rest of the path is correct. |
| if image_file.find(real_chroot_dir) != -1: |
| chroot_image = image_file[len(real_chroot_dir):] |
| # If path name starts with chromeos_root, excluding 'chroot', replace the |
| # chromeos_root with the prefix: '/home/<username>/trunk'. |
| elif image_file.find(real_chromeos_root) != -1: |
| chroot_image = image_file[len(real_chromeos_root):] |
| chroot_image = '/home/%s/trunk%s' % (whoami, chroot_image) |
| # Else assume the path is already internal, so leave it alone. |
| else: |
| chroot_image = image_file |
| |
| return chroot_image |
| |
| |
| def DoImage(argv): |
| """Image ChromeOS.""" |
| |
| parser = argparse.ArgumentParser() |
| parser.add_argument( |
| '-c', |
| '--chromeos_root', |
| dest='chromeos_root', |
| help='Target directory for ChromeOS installation.') |
| parser.add_argument('-r', '--remote', dest='remote', help='Target device.') |
| parser.add_argument('-i', '--image', dest='image', help='Image binary file.') |
| parser.add_argument( |
| '-b', '--board', dest='board', help='Target board override.') |
| parser.add_argument( |
| '-f', |
| '--force', |
| dest='force', |
| action='store_true', |
| default=False, |
| help='Force an image even if it is non-test.') |
| parser.add_argument( |
| '-n', |
| '--no_lock', |
| dest='no_lock', |
| default=False, |
| action='store_true', |
| help='Do not attempt to lock remote before imaging. ' |
| 'This option should only be used in cases where the ' |
| 'exclusive lock has already been acquired (e.g. in ' |
| 'a script that calls this one).') |
| parser.add_argument( |
| '-l', |
| '--logging_level', |
| dest='log_level', |
| default='verbose', |
| help='Amount of logging to be used. Valid levels are ' |
| "'quiet', 'average', and 'verbose'.") |
| parser.add_argument('-a', '--image_args', dest='image_args') |
| |
| options = parser.parse_args(argv[1:]) |
| |
| if not options.log_level in command_executer.LOG_LEVEL: |
| Usage(parser, "--logging_level must be 'quiet', 'average' or 'verbose'") |
| else: |
| log_level = options.log_level |
| |
| # Common initializations |
| cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) |
| l = logger.GetLogger() |
| |
| if options.chromeos_root is None: |
| Usage(parser, '--chromeos_root must be set') |
| |
| if options.remote is None: |
| Usage(parser, '--remote must be set') |
| |
| options.chromeos_root = os.path.expanduser(options.chromeos_root) |
| |
| if options.board is None: |
| board = cmd_executer.CrosLearnBoard(options.chromeos_root, options.remote) |
| else: |
| board = options.board |
| |
| if options.image is None: |
| images_dir = misc.GetImageDir(options.chromeos_root, board) |
| image = os.path.join(images_dir, 'latest', 'chromiumos_test_image.bin') |
| if not os.path.exists(image): |
| image = os.path.join(images_dir, 'latest', 'chromiumos_image.bin') |
| is_xbuddy_image = False |
| else: |
| image = options.image |
| is_xbuddy_image = image.startswith('xbuddy://') |
| if not is_xbuddy_image: |
| image = os.path.expanduser(image) |
| |
| if not is_xbuddy_image: |
| image = os.path.realpath(image) |
| |
| if not os.path.exists(image) and not is_xbuddy_image: |
| Usage(parser, 'Image file: ' + image + ' does not exist!') |
| |
| try: |
| should_unlock = False |
| if not options.no_lock: |
| try: |
| _ = locks.AcquireLock( |
| list(options.remote.split()), options.chromeos_root) |
| should_unlock = True |
| except Exception as e: |
| raise RuntimeError('Error acquiring machine: %s' % str(e)) |
| |
| reimage = False |
| local_image = False |
| if not is_xbuddy_image: |
| local_image = True |
| image_checksum = FileUtils().Md5File(image, log_level=log_level) |
| |
| command = 'cat ' + checksum_file |
| ret, device_checksum, _ = cmd_executer.CrosRunCommandWOutput( |
| command, chromeos_root=options.chromeos_root, machine=options.remote) |
| |
| device_checksum = device_checksum.strip() |
| image_checksum = str(image_checksum) |
| |
| l.LogOutput('Image checksum: ' + image_checksum) |
| l.LogOutput('Device checksum: ' + device_checksum) |
| |
| if image_checksum != device_checksum: |
| [found, located_image] = LocateOrCopyImage( |
| options.chromeos_root, image, board=board) |
| |
| reimage = True |
| l.LogOutput('Checksums do not match. Re-imaging...') |
| |
| chroot_image = FindChromeOSImage(located_image, options.chromeos_root) |
| |
| is_test_image = IsImageModdedForTest(options.chromeos_root, |
| chroot_image, log_level) |
| |
| if not is_test_image and not options.force: |
| logger.GetLogger().LogFatal('Have to pass --force to image a ' |
| 'non-test image!') |
| else: |
| reimage = True |
| found = True |
| l.LogOutput('Using non-local image; Re-imaging...') |
| |
| if reimage: |
| # If the device has /tmp mounted as noexec, image_to_live.sh can fail. |
| command = 'mount -o remount,rw,exec /tmp' |
| cmd_executer.CrosRunCommand( |
| command, chromeos_root=options.chromeos_root, machine=options.remote) |
| |
| # Check to see if cros flash will work for the remote machine. |
| CheckForCrosFlash(options.chromeos_root, options.remote, log_level) |
| |
| # Disable the annoying chromebook beeps after reboot. |
| DisableCrosBeeps(options.chromeos_root, options.remote, log_level) |
| |
| cros_flash_args = [ |
| 'cros', 'flash', |
| '--board=%s' % board, '--clobber-stateful', options.remote |
| ] |
| if local_image: |
| cros_flash_args.append(chroot_image) |
| else: |
| cros_flash_args.append(image) |
| |
| command = ' '.join(cros_flash_args) |
| |
| # Workaround for crosbug.com/35684. |
| os.chmod(misc.GetChromeOSKeyFile(options.chromeos_root), 0o600) |
| |
| if log_level == 'average': |
| cmd_executer.SetLogLevel('verbose') |
| retries = 0 |
| while True: |
| if log_level == 'quiet': |
| l.LogOutput('CMD : %s' % command) |
| ret = cmd_executer.ChrootRunCommand( |
| options.chromeos_root, command, command_timeout=1800) |
| if ret == 0 or retries >= 2: |
| break |
| retries += 1 |
| if log_level == 'quiet': |
| l.LogOutput('Imaging failed. Retry # %d.' % retries) |
| |
| if log_level == 'average': |
| cmd_executer.SetLogLevel(log_level) |
| |
| logger.GetLogger().LogFatalIf(ret, 'Image command failed') |
| |
| # Unfortunately cros_image_to_target.py sometimes returns early when the |
| # machine isn't fully up yet. |
| ret = EnsureMachineUp(options.chromeos_root, options.remote, log_level) |
| |
| # If this is a non-local image, then the ret returned from |
| # EnsureMachineUp is the one that will be returned by this function; |
| # in that case, make sure the value in 'ret' is appropriate. |
| if not local_image and ret: |
| ret = 0 |
| else: |
| ret = 1 |
| |
| if local_image: |
| if log_level == 'average': |
| l.LogOutput('Verifying image.') |
| command = 'echo %s > %s && chmod -w %s' % (image_checksum, |
| checksum_file, checksum_file) |
| ret = cmd_executer.CrosRunCommand( |
| command, |
| chromeos_root=options.chromeos_root, |
| machine=options.remote) |
| logger.GetLogger().LogFatalIf(ret, 'Writing checksum failed.') |
| |
| successfully_imaged = VerifyChromeChecksum( |
| options.chromeos_root, chroot_image, options.remote, log_level) |
| logger.GetLogger().LogFatalIf(not successfully_imaged, |
| 'Image verification failed!') |
| TryRemountPartitionAsRW(options.chromeos_root, options.remote, |
| log_level) |
| |
| if not found: |
| temp_dir = os.path.dirname(located_image) |
| l.LogOutput('Deleting temp image dir: %s' % temp_dir) |
| shutil.rmtree(temp_dir) |
| l.LogOutput('Image updated.') |
| else: |
| l.LogOutput('Checksums match, skip image update and reboot.') |
| command = 'reboot && exit' |
| _ = cmd_executer.CrosRunCommand( |
| command, chromeos_root=options.chromeos_root, machine=options.remote) |
| # Wait 30s after reboot. |
| time.sleep(30) |
| |
| finally: |
| if should_unlock: |
| locks.ReleaseLock(list(options.remote.split()), options.chromeos_root) |
| |
| return ret |
| |
| |
| def LocateOrCopyImage(chromeos_root, image, board=None): |
| l = logger.GetLogger() |
| if board is None: |
| board_glob = '*' |
| else: |
| board_glob = board |
| |
| chromeos_root_realpath = os.path.realpath(chromeos_root) |
| image = os.path.realpath(image) |
| |
| if image.startswith('%s/' % chromeos_root_realpath): |
| return [True, image] |
| |
| # First search within the existing build dirs for any matching files. |
| images_glob = ( |
| '%s/src/build/images/%s/*/*.bin' % (chromeos_root_realpath, board_glob)) |
| images_list = glob.glob(images_glob) |
| for potential_image in images_list: |
| if filecmp.cmp(potential_image, image): |
| l.LogOutput('Found matching image %s in chromeos_root.' % potential_image) |
| return [True, potential_image] |
| # We did not find an image. Copy it in the src dir and return the copied |
| # file. |
| if board is None: |
| board = '' |
| base_dir = ('%s/src/build/images/%s' % (chromeos_root_realpath, board)) |
| if not os.path.isdir(base_dir): |
| os.makedirs(base_dir) |
| temp_dir = tempfile.mkdtemp(prefix='%s/tmp' % base_dir) |
| new_image = '%s/%s' % (temp_dir, os.path.basename(image)) |
| l.LogOutput('No matching image found. Copying %s to %s' % (image, new_image)) |
| shutil.copyfile(image, new_image) |
| return [False, new_image] |
| |
| |
| def GetImageMountCommand(image, rootfs_mp, stateful_mp): |
| image_dir = os.path.dirname(image) |
| image_file = os.path.basename(image) |
| mount_command = ('cd /mnt/host/source/src/scripts &&' |
| './mount_gpt_image.sh --from=%s --image=%s' |
| ' --safe --read_only' |
| ' --rootfs_mountpt=%s' |
| ' --stateful_mountpt=%s' % (image_dir, image_file, rootfs_mp, |
| stateful_mp)) |
| return mount_command |
| |
| |
| def MountImage(chromeos_root, |
| image, |
| rootfs_mp, |
| stateful_mp, |
| log_level, |
| unmount=False, |
| extra_commands=''): |
| cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) |
| command = GetImageMountCommand(image, rootfs_mp, stateful_mp) |
| if unmount: |
| command = '%s --unmount' % command |
| if extra_commands: |
| command = '%s ; %s' % (command, extra_commands) |
| ret, out, _ = cmd_executer.ChrootRunCommandWOutput(chromeos_root, command) |
| logger.GetLogger().LogFatalIf(ret, 'Mount/unmount command failed!') |
| return out |
| |
| |
| def IsImageModdedForTest(chromeos_root, image, log_level): |
| if log_level != 'verbose': |
| log_level = 'quiet' |
| command = 'mktemp -d' |
| cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) |
| _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput(chromeos_root, command) |
| _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput( |
| chromeos_root, command) |
| rootfs_mp = rootfs_mp.strip() |
| stateful_mp = stateful_mp.strip() |
| lsb_release_file = os.path.join(rootfs_mp, 'etc/lsb-release') |
| extra = ('grep CHROMEOS_RELEASE_TRACK %s | grep -i test' % lsb_release_file) |
| output = MountImage( |
| chromeos_root, |
| image, |
| rootfs_mp, |
| stateful_mp, |
| log_level, |
| extra_commands=extra) |
| is_test_image = re.search('test', output, re.IGNORECASE) |
| MountImage( |
| chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True) |
| return is_test_image |
| |
| |
| def VerifyChromeChecksum(chromeos_root, image, remote, log_level): |
| command = 'mktemp -d' |
| cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) |
| _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput(chromeos_root, command) |
| _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput( |
| chromeos_root, command) |
| rootfs_mp = rootfs_mp.strip() |
| stateful_mp = stateful_mp.strip() |
| chrome_file = '%s/opt/google/chrome/chrome' % rootfs_mp |
| extra = 'md5sum %s' % chrome_file |
| out = MountImage( |
| chromeos_root, |
| image, |
| rootfs_mp, |
| stateful_mp, |
| log_level, |
| extra_commands=extra) |
| image_chrome_checksum = out.strip().split()[0] |
| MountImage( |
| chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True) |
| |
| command = 'md5sum /opt/google/chrome/chrome' |
| [_, o, _] = cmd_executer.CrosRunCommandWOutput( |
| command, chromeos_root=chromeos_root, machine=remote) |
| device_chrome_checksum = o.split()[0] |
| return image_chrome_checksum.strip() == device_chrome_checksum.strip() |
| |
| |
| # Remount partition as writable. |
| # TODO: auto-detect if an image is built using --noenable_rootfs_verification. |
| def TryRemountPartitionAsRW(chromeos_root, remote, log_level): |
| l = logger.GetLogger() |
| cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) |
| command = 'sudo mount -o remount,rw /' |
| ret = cmd_executer.CrosRunCommand(\ |
| command, chromeos_root=chromeos_root, machine=remote, |
| terminated_timeout=10) |
| if ret: |
| ## Safely ignore. |
| l.LogWarning('Failed to remount partition as rw, ' |
| 'probably the image was not built with ' |
| '"--noenable_rootfs_verification", ' |
| 'you can safely ignore this.') |
| else: |
| l.LogOutput('Re-mounted partition as writable.') |
| |
| |
| def EnsureMachineUp(chromeos_root, remote, log_level): |
| l = logger.GetLogger() |
| cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) |
| timeout = 600 |
| magic = 'abcdefghijklmnopqrstuvwxyz' |
| command = 'echo %s' % magic |
| start_time = time.time() |
| while True: |
| current_time = time.time() |
| if current_time - start_time > timeout: |
| l.LogError( |
| 'Timeout of %ss reached. Machine still not up. Aborting.' % timeout) |
| return False |
| ret = cmd_executer.CrosRunCommand( |
| command, chromeos_root=chromeos_root, machine=remote) |
| if not ret: |
| return True |
| |
| |
| if __name__ == '__main__': |
| retval = DoImage(sys.argv) |
| sys.exit(retval) |