blob: 1a74d54ec9e388798b93b1640506767fdc7008ea [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Script to download ChromeOS images or test folders from google storage and
# output download location or directly copy (image) to a usb stick or ip.
# In order to install anything, file needs to be run from a place such that
# ./REL_SRC_DIR reaches chromiumos/src, or the src_dir input must be
# specified.
# Downloads image via gsutil to chromiumos/src/REL_DL_DIR or to given folder.
"""Download and output or run cros flash command to copy image to usb."""
import argparse
from multiprocessing import Manager
from multiprocessing import Process
import os
import shutil
import subprocess
# Relative path from this file to the chromiumos/src folder
REL_SRC_DIR = '../../../'
# Path to default download directory relative to chromiumos/src folder,
# i.e. REL_SRC_DIR+REL_DIR_DIR is a direct path from here to the directory
REL_DL_DIR = 'build/crosdl/'
# Conversions from common simplified/misspelled names of boards
PLATFORM_CONVERT = {'spring': 'daisy-spring', 'alex': 'x86-alex',
'alex-he': 'x86-alex-he', 'mario': 'x86-mario',
'zgb': 'x86-zgb', 'zgb-he': 'x86-zgb-he',
'pit': 'peach-pit', 'pi': 'peach-pi',
'snow': 'daisy', 'lucas': 'daisy', 'big': 'nyan-big',
'skate': 'daisy-skate', 'blaze': 'nyan-blaze',
'paine': 'auron-paine', 'yuna': 'auron-yuna',
'pinky': 'veyron-pinky', 'jerry': 'veyron-jerry',
'mighty': 'veyron-mighty', 'speedy': 'veyron-speedy',
'minnie': 'veyron-minnie', 'gus': 'veyron-gus',
'jaq': 'veyron-jaq'}
# Download types
RECOVERY = 0
TEST = 1
AUTOTEST = 2
FACTORY = 3
def _GenerateGstorageLink(c, p, b):
"""Generate Google storage link given channel, platform, and build."""
return 'gs://chromeos-releases/%s-channel/%s/%s/' % (c, p, b)
def _GenerateFolderName(download_type, b, p, c, mp):
"""Generate a folder name unique to the download."""
if download_type == TEST:
type_string = 'Test'
elif download_type == AUTOTEST:
type_string = 'Autotest'
elif download_type == FACTORY:
type_string = 'Factory'
else:
type_string = 'Recovery'
return '%s_%s_%s_%s%s' % (p, b, c, type_string, mp)
def main():
"""Download and output or run cros flash command."""
parser = argparse.ArgumentParser(
description=('Download a testing resource: recovery image, test image, '
'autotest folder, or factory\nbundle. Optionally make usb '
'sticks from these downloaded images.\n\n'
'e.g. ./crosdl.py -c dev -b 4996.0.0 -p '
'link daisy --tostick /dev/sdc /dev/sda.\n\nDefault '
'download location is src/%s.' % REL_DL_DIR),
formatter_class=argparse.RawDescriptionHelpFormatter)
group_type = parser.add_mutually_exclusive_group()
group_type.add_argument('-r', '--recovery', dest='recovery',
action='store_true', help='Recovery image (default).')
group_type.add_argument('-t', '--test', dest='test', action='store_true',
help='Test image.')
group_type.add_argument('-a', '--autotest', dest='autotest',
action='store_true', help='Autotest folder.')
group_type.add_argument('-f', '--factory', dest='factory',
action='store_true', help='Factory bundle.')
parser.add_argument('-c', '--channel', dest='channel', default='dev',
choices=['canary', 'dev', 'beta', 'stable'],
help='Channel (dev default).')
parser.add_argument('-b', '--build', dest='build',
help='Build number, e.g. 4996.0.0.')
parser.add_argument('-p', '--platform', dest='board', nargs='+',
help='Platform(s) to download, e.g. link daisy.')
parser.add_argument('--premp', action='store_true',
help='PreMP image instead of MP.')
parser.add_argument('--force', dest='force', action='store_true',
help=('Force new download of builds, even if files '
'already exist.'))
group_usb = parser.add_mutually_exclusive_group()
group_usb.add_argument('--to_stick', dest='to_stick', nargs='*',
help=('Copy to usb stick after download in a one-'
'to-one way. Can either specify drive(s) '
'(e.g. /dev/sdc) at which to install or leave '
'blank for interactive dialog later. For '
'multiple boards, list out all drives (e.g. '
'/dev/sdc /dev/sda /dev/sdd). Script will '
'match this list directly to the list of input '
'boards (in -p).'))
group_usb.add_argument('--one_to_multiple_sticks', dest='to_many', nargs='+',
help=('Copy one image to the listed multiple usb '
'sticks after download (e.g. /dev/sdc /dev/sdd)'
'. Must specify only one board at a time.'))
group_usb.add_argument('--to_ip', dest='to_ip', nargs=1,
help=('Flash to given ip address (using cros flash).'
' Only works when specifying one board. '
'Can list multiple ip addresses.'))
parser.add_argument('--folder', dest='folder',
help=('Specify a new download folder (default is src/'
'%s).' % REL_DL_DIR))
parser.add_argument('--delete_files', dest='delete', action='store_true',
help=('Delete any files downloaded from this command'
'when finished with copying to usb stick. '
'Applicable only when using --to_stick or '
'--one_to_multiple_sticks arguments.'))
parser.add_argument('--clear_folder', dest='clear', action='store_true',
help=('Delete all the sub-folders in the download '
'location (useful if you have filled up your '
'harddrive).'))
arguments = parser.parse_args()
# Ensure gsutil is installed.
try:
subprocess.check_output(['which', 'gsutil'])
except subprocess.CalledProcessError:
print('Error finding gsutil! Please make sure it is installed.')
return
# Find src/ dir.
script_file_dir = os.path.dirname(os.path.realpath(__file__))
src_dir = os.path.join(script_file_dir, REL_SRC_DIR)
src_dir = os.path.abspath(src_dir)
if os.path.basename(src_dir) != 'src':
print('Could not find src/ directory! Has this script been moved?')
return
# Set download folder as user defined or default.
user_folder = arguments.folder
if user_folder:
download_folder = user_folder
else:
download_folder = os.path.join(src_dir, REL_DL_DIR)
# Delete download folder contents if clearfolder flag present.
if arguments.clear:
if os.path.exists(download_folder):
print('Deleting sub-folder contents of %s.' % download_folder)
for item in os.listdir(download_folder):
item_path = os.path.join(download_folder, item)
if os.path.isdir(item_path):
shutil.rmtree(item_path)
else:
print('Download folder %s did not exist. Exiting.' % download_folder)
return
# Require board and platform arguments if not clearing downloads.
if not (arguments.board and arguments.build):
print('Must provide build number and platform(s). See crosdl.py -h for '
'usage description.')
return
# Require --deletefiles flag to be used only with --tostick flag.
if arguments.delete and (arguments.to_stick or arguments.to_many):
print('Will delete all newly downloaded files once finished.')
elif arguments.delete:
print('This command will download and immediately delete all files. '
'You probably meant to use the --tostick flag as well.')
return
# Deal with board name(s).
dupe_boards = []
boards = arguments.board
for i in range(len(boards)):
boards[i] = boards[i].lower().replace('_', '-')
if boards[i] in PLATFORM_CONVERT:
boards[i] = PLATFORM_CONVERT[boards[i]]
# Disallow duplicates.
if boards[i] in dupe_boards:
print('%s is listed twice in the boards list!' % boards[i])
return 1
dupe_boards.append(boards[i])
# Set download_type based on input flags. to_ip flag is TEST by default.
if arguments.test or arguments.to_ip:
print('Downloading test image(s).')
download_type = TEST
is_image = True
elif arguments.autotest:
print('Downloading autotest folder(s).')
download_type = AUTOTEST
is_image = False
elif arguments.factory:
print('Downloading factory folder(s).')
download_type = FACTORY
is_image = False
else: # RECOVERY
print('Downloading recovery image(s).')
download_type = RECOVERY
is_image = True
# String to identify premp/mp images.
mp_str = ''
if download_type == RECOVERY:
mp_str = '_premp' if arguments.premp else '_mp'
# Disallow listing multiple boards for 'to_many' or 'to_ip' flags.
installing_ip = type(arguments.to_ip) == list
installing_many = type(arguments.to_many) == list
if len(boards) > 1 and (installing_many or installing_ip):
print('Please only specify one board if using --one_to_multiple_'
'sticks or --to_ip option. See help menu.')
return 1
# If multiple boards to usb, must provide same number of drive names.
installing_one = type(arguments.to_stick) == list
if installing_one and (len(boards) > 1 or len(arguments.to_stick) > 1):
if not arguments.to_stick:
print('Error: To make sticks for multiple boards, please provide drive '
'names (e.g. /dev/sdc /dev/sdd). See -h for help.')
return 1
if len(arguments.to_stick) != len(boards):
print('Error: Was given %d boards but %d usb drive locations.'
% (len(boards), len(arguments.to_stick)))
return 1
for drive in arguments.to_stick:
if not os.path.exists(drive):
print('%s does not exist!' % drive)
# Disallow installing anything that isn't an image.
installing = installing_one or installing_many or installing_ip
if not is_image and installing:
print('Can only copy to usb or ip if downloading an image. See help.')
return 1
# Request sudo permissions if installing later.
if installing:
subprocess.call(['sudo', '-v'])
# Subroutine to download a file for a single board.
channel = arguments.channel
build = arguments.build
def _DownloadBoard(board, output_str, dl_error, dl_folder):
"""Download the file for a single board."""
# Assume error happened unless changed below.
dl_error[board] = True
# See if file already exists locally.
folder_name = _GenerateFolderName(download_type=download_type, p=board,
b=build, c=channel, mp=mp_str)
folder_path = os.path.join(download_folder, folder_name)
dl_folder[board] = folder_path
if download_type == TEST:
target_name = 'chromiumos_test_image.bin'
elif download_type == AUTOTEST:
target_name = ''
elif download_type == FACTORY:
target_name = ''
else: # RECOVERY
target_name = 'recovery_image.bin'
target_path = os.path.join(folder_path, target_name)
# Skip for already present files, else download new file.
if os.path.exists(target_path) and not arguments.force:
print('%s: Found file locally. Skipping download.' % board)
else:
# Make folder if needed.
if not os.path.exists(folder_path):
subprocess.call(['mkdir', '-p', folder_path])
# Generate search terms.
folder = _GenerateGstorageLink(c=channel, p=board, b=build)
if download_type == TEST:
file_search = '%s*test*.tar.xz' % folder
elif download_type == AUTOTEST:
file_search = '%s*hwqual*.tar.bz2' % folder
elif download_type == FACTORY:
file_search = '%s*factory*.zip' % folder
else: # RECOVERY
file_search = '%s*recovery*.bin' % folder
# Output error if no files found
def _no_file_error(message):
"""Actions to take if file is not found."""
print('%s: %s' % (board, message))
output_str[board] = '%s: Could not find file.' % board
# Look for folder while file belongs.
try:
possible_files = subprocess.check_output(['gsutil', 'ls', folder])
except subprocess.CalledProcessError:
_no_file_error('Could not find folder %s where this file is supposed '
'to be. Please check input values.' % folder)
return 1
# Look for file in folder.
try:
possible_files = subprocess.check_output(['gsutil', 'ls', file_search])
except subprocess.CalledProcessError:
_no_file_error('Could not find file but found folder.')
return 1
# Locate exact file_name.
possible_files = possible_files.splitlines()
if len(possible_files) != 1:
_no_file_error('Found %d possible files, not 1.' % len(possible_files))
return 1
gsfile_path = possible_files[0]
file_name = os.path.basename(gsfile_path)
# Download file to local machine.
try:
subprocess.call(['gsutil', 'cp', gsfile_path, folder_path])
except subprocess.CalledProcessError:
print('gsutil error. Try running this command outside of chroot?')
output_str[board] = '%s: Could not run gsutil command.' % board
return 1
# Untar/rename files as needed.
file_path = os.path.join(folder_path, file_name.decode('utf-8'))
if download_type == TEST:
subprocess.call(['tar', '-xf', file_path, '-C', folder_path])
os.remove(file_path)
elif download_type == AUTOTEST:
print('%s: running tar -xf command' % board)
subprocess.call(['tar', '-xf', file_path, '-C', folder_path])
target_name = 'autotest'
target_path = os.path.join(folder_path, file_name[:-len('.tar.bz2')],
target_name)
os.remove(file_path)
elif download_type == FACTORY:
print('trying to unzip %s to %s' % (file_path, folder_path))
subprocess.call(['unzip', '-q', file_path, '-d', folder_path])
os.remove(file_path)
else: # RECOVERY
os.rename(file_path, os.path.join(folder_path, target_name))
# Report successful download and return path to downloaded thing
output_str[board] = target_path
dl_error[board] = False
print('%s: DONE' % board)
# For each board, download file.
manager = Manager()
output_str = manager.dict()
dl_error = manager.dict()
dl_folder = manager.dict()
jobs = []
for board in boards:
# Run download in separate process.
proc = Process(target=_DownloadBoard, args=(board, output_str, dl_error,
dl_folder,))
jobs.append(proc)
proc.start()
# Wait for all downloads to finish.
for job in jobs:
job.join()
# Print output or run cros flash command.
errors = ''
if installing:
# Move to src/ dir to access 'cros' command.
starting_dir = os.getcwd()
os.chdir(src_dir)
# Use 'cros flash' to copy images to usb or ip.
jobs = []
if installing_one:
to_list = arguments.to_stick if arguments.to_stick else ['']
from_list = boards
destination_str = 'usb://%s'
elif installing_many:
to_list = arguments.to_many
from_list = [boards[0]] * len(to_list)
destination_str = 'usb://%s'
else: # installing_ip
to_list = arguments.to_ip
from_list = [boards[0]] * len(to_list)
destination_str = '%s'
for i in range(len(from_list)):
board = from_list[i]
if not dl_error[board]:
destination = destination_str % to_list[i]
print('Copying %s to %s.' % (board, destination))
cmd = 'cros flash %s %s' % (destination, output_str[board])
proc = subprocess.Popen(cmd.split(' '))
jobs.append(proc)
else:
errors += '%s\n' % output_str[board]
# Wait for all copies to finish.
for job in jobs:
job.wait()
if arguments.delete:
print('Deleting all files created for %s.' % board)
shutil.rmtree(dl_folder[board])
# Return to previous directory.
os.chdir(starting_dir)
else:
print('\nDownloaded File(s):')
for board in boards:
if not dl_error[board]:
print(output_str[board])
else:
errors += '%s\n' % output_str[board]
# Summarize errors, if any.
print('\nScript complete.')
print(errors)
if __name__ == '__main__':
main()