A script to download and create a stick with test image.
TEST=None
BUG=None
Change-Id: Id3cc3297f776b7247794f7f392c68cc7feb57fb0
Reviewed-on: https://gerrit.chromium.org/gerrit/56119
Tested-by: Deepak Gopal <deepakg@chromium.org>
Reviewed-by: Kris Rambish <krisr@chromium.org>
Commit-Queue: Deepak Gopal <deepakg@chromium.org>
diff --git a/provingground/download_test_build.py b/provingground/download_test_build.py
new file mode 100755
index 0000000..6e2d82e
--- /dev/null
+++ b/provingground/download_test_build.py
@@ -0,0 +1,397 @@
+#!/usr/bin/python
+# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+import subprocess as sub
+import sys
+import xml.parsers.expat
+
+
+"""
+This script can be used to download chromeos test images from the the cloud
+storage. We assume that the cloud storage path remains similar all the time.
+
+eg: gs://chromeos-releases/dev-channel/link/4162.0.0/
+ ChromeOS-R29-4162.0.0-link.zip
+
+So passing the above value to 'gstuil cp ' will download
+the relevant CrOS image.
+
+After downloading the file we uzip the file and wait for the user
+to give the path to which the image should be copied.
+
+eg: /dev/sdb
+
+This script will create a build_defaults.xml file with the defaults info in
+the scripts folder. So please run this from a folder with a write permissions.
+
+"""
+
+
+def run_command(command, shell=False):
+ """ Run a given command using subprocess.
+
+ @param command: The command to run. Should be a list.
+ @param shell: Flag to use bash to run the command.
+
+ @return out: Output of the process.
+ """
+ process = sub.Popen(command, stdout=sub.PIPE,
+ stderr=sub.PIPE, shell=shell)
+ (out, err) = process.communicate()
+
+ if err and 'Copying gs://chromeos-releases/' not in err:
+ raise Exception('We got an error %s' % err)
+ return out
+
+
+class defaults():
+ # TODO: Have a defaults file to read/write into it.
+ channel_list = ['dev', 'beta', 'stable']
+ device_list = ['x86-mario', 'x86-zgb', 'x86-alex', 'daisy', 'stout',
+ 'link', 'butterfly', 'stout', 'stumpy', 'parrot', 'lumpy',]
+ default_dict = {'channel': None, 'device': None,
+ 'branch': None, 'version': None}
+ defaults_file = 'build_defaults.xml'
+
+
+ def __init__(self):
+ if not os.path.exists(self.defaults_file):
+ self._create_defaults()
+
+
+ def set_defaults(self, channel, device, version, branch):
+ """Set the new defaults.
+
+ @param channel: default channel to be saved.
+ @param device: defalut device to be saved.
+ @param version: default version to be saved.
+ @param branch: default branch to be saved.
+ """
+ self.default_dict['channel'] = channel
+ self.default_dict['branch'] = branch
+ self.default_dict['version'] = version
+ self.default_dict['device'] = device
+ self._update_defaults()
+
+
+ def _create_defaults_element(self, initial=True):
+ """Create a defaults element to be added to the xml file.
+
+ @param initial: Flag to check if we are creating or updating the
+ defaults.
+
+ @return defaults_element: Element with defaults attributes to be added.
+ """
+ defaults_element = '<defaults '
+ for key in self.default_dict.keys():
+ if initial or self.default_dict[key]:
+ defaults_element += '%s="%s" ' % (key, self.default_dict[key])
+ defaults_element += ('></defaults>\n')
+ return defaults_element
+
+
+ def _update_defaults(self):
+ """Update the defaults list in database/xml."""
+ defaults_element = self._create_defaults_element(initial=False)
+ lines = open(self.defaults_file, 'r').readlines()
+ os.remove(self.defaults_file)
+ fin = open(self.defaults_file, 'w')
+ for line in lines:
+ if '<defaults ' in line:
+ fin.write(defaults_element)
+ break # this will always be the last line
+ else:
+ fin.write(line)
+ fin.close()
+
+
+ def _create_defaults(self):
+ """Create all the defaults."""
+ print 'Creating Defaults file.'
+ fout = open(self.defaults_file, 'wb')
+ root = '<feed xmlns=\'http://www.w3.org/2005/Atom\' xml:lang=\'en\'>\n'
+ fout.write(root)
+ channel_element = '<channel>'
+ for channel in self.channel_list:
+ channel_element += '%s, ' % channel
+ channel_element += ('</channel>\n')
+ fout.write(channel_element)
+ device_element = '<device>'
+ for device in self.device_list:
+ device_element += '%s, ' % device
+ device_element += ('</device>\n')
+ fout.write(device_element)
+ defaults_element = self._create_defaults_element()
+ fout.write(defaults_element)
+ fout.close()
+
+
+ def previous_defaults(self):
+ """Get the default values."""
+ # Parse and read the xml data
+ self.read_xml()
+ return self.default_dict
+
+
+ def start_element(self, name, attrs):
+ if name == 'defaults':
+ self.default_dict = attrs
+
+
+ def char_data(self, data):
+ if 'dev' in data:
+ self.add_default_channel(data)
+ if 'link' in data:
+ self.add_default_devices(data)
+
+
+ def read_xml(self):
+ """Read and parse the xml file."""
+ fin = open(self.defaults_file, 'r')
+ parse_data = fin.read()
+ parser = xml.parsers.expat.ParserCreate()
+ parser.StartElementHandler = self.start_element
+ parser.CharacterDataHandler = self.char_data
+ parser.Parse(parse_data)
+
+
+ def channel(self):
+ """Read the default channel from the logs"""
+ return self.channel_list
+
+
+ def add_default_channel(self, data):
+ """Add a channel if it does not exist in defaults."""
+ for channel in data.split(','):
+ if channel and (channel not in self.channel_list):
+ self.channel_list.append(channel)
+
+
+ def add_default_devices(self, data):
+ """Add a device if it does not exist in defaults."""
+ for device in data.split(','):
+ if device and device not in self.device_list:
+ self.device_list.append(device)
+
+
+ def element_in_list(self, element, element_list):
+ for old_element in element_list:
+ if element in old_element:
+ return False
+ return True
+
+
+class download_image():
+ # This script will work until the image path remains unchanged. If the
+ # cloud team decides to change the default path to the images, change this
+ # path constant accordingly. This is used build_source.
+ image_path = 'gs://chromeos-releases/%s-channel/%s/%s/ChromeOS-%s-%s-%s.zip'
+
+ def file_check(self, dest, folder=True, action=False):
+ """ Check if a file or folder exists.
+ This is used in two places, first to check if we have the test
+ folder and second to check if we already have the file to download.
+
+ @param dest: The path to look for. Should be a string.
+ @param folder: Flag to indicate if the path is a directory or not.
+ @param action: Flag to indicate if an action is required.
+ eg: Creation or deletion.
+ """
+ if folder and not os.path.exists(dest):
+ if action:
+ os.makedirs(dest)
+ print 'Created directory %s' % dest
+ else:
+ raise Exception('The path %s does not exist' % dest)
+ elif folder and not os.path.isdir(dest):
+ raise Exception('The path %s exists and is not a folder.' % dest)
+ elif not folder and os.path.exists(dest):
+ zip_file = os.path.basename(dest)
+ file_type = 'file'
+ if os.path.isdir(zip_file):
+ file_type = 'folder'
+ if action:
+ print ('The %s %s exists or is downloaded in %s' %
+ (os.path.basename(dest), file_type, os.path.dirname(dest)))
+ delete_flag = raw_input('Do you want to remove the existing?'
+ ' (Y/N): ')
+ if delete_flag.lower() == 'y':
+ print 'Removing the file %s' % dest
+ os.remove(dest)
+ else:
+ print 'Continuing with the download.'
+
+
+ def check_download(self, dest, source):
+ """Check if the download was successful. Verify the checksum and the
+ size of the downloaded file.
+
+ @param dest: The complete path to the downloaded file
+ @param source: The complete path to the file in cloudstorage.
+ """
+ out = run_command(['gsutil', 'ls', '-L', source])
+ checksum = size = ''
+ for line in out.split('\n'):
+ if 'ETag:' in line:
+ checksum = line.split(':')[1].strip('\t')
+ elif 'Content-Length:' in line:
+ size = line.split(':')[1].strip('\t')
+ sout = run_command(['ls -Sl %s' % dest], shell=True)
+ md5out = run_command(['md5sum', dest])
+ if not size in sout:
+ raise Exception ('File did not download completely. '
+ 'It should be %s and we got %s' % (size, sout))
+ elif not checksum in md5out:
+ raise Exception ('The downloaded file is corrupted. '
+ 'The checksum should %s and we got %s' %
+ (checksum, md5out))
+
+
+ def download(self, device=None, channel=None, version=None, branch=None):
+ """Download a zip file containing the test image. The user is
+ expected to provide a clean input. A blank input means that
+ we use defaults. Defaults will be the last build
+ that the script attempted to download.
+
+ @param device : The device under test. eg: x86-mario, link. This should
+ be the exact name of the device as shown in the
+ http://chromeos-images.
+ @param channel: The channel under which we can get the test build.
+ eg: dev, beta, stable.
+ @param version: The version of the build. eg:3701.0.0, 3701.71.0.
+ @param branch : The branch under which we can find the version.
+ eg: R26, R27.
+
+ @return final_dest: The complete downloaded file path.
+ """
+ # TODO: download stops at 4109631488, check why
+ build_info = self.build_source(device=device, channel=channel,
+ version=version, branch=branch)
+ source = build_info[0]
+ dest = '/tmp/%s/test' % build_info[1]
+ final_dest = os.path.join(dest, os.path.basename(source))
+ self.file_check(dest, folder=True, action=True)
+ self.file_check(final_dest, folder=False, action=True)
+ print 'Downloading %s, Please wait ...' % source
+ run_command(['gsutil', 'cp', source, dest])
+ self.file_check(final_dest, folder=False)
+ self.check_download(final_dest, source)
+ run_command(['nautilus', dest])
+ return final_dest
+
+
+ def build_source(self, device=None, channel=None,
+ version=None, branch=None):
+ """Multiple option parser. Allows user to give arguments or options
+ or call the function from their module to create the source path
+ of the CrOS test build zip file to be downloaded. The user is
+ expected to provide a clean input. A blank input means that we
+ use defaults. Defaults will be the last build
+ that the script attempted to download.
+
+ @param device : The device under test. eg: x86-mario, link. This should
+ be the exact name of the device as shown in the
+ http://chromeos-images.
+ @param channel: The channel under which we can get the test build.
+ eg: dev, beta, stable.
+ @param version: The version of the build. eg:3701.0.0, 3701.71.0.
+ @param branch : The branch under which we can find the version.
+ eg: R26, R27.
+ """
+ default = defaults()
+ # We save the previous configuration as default
+ prev_values = default.previous_defaults()
+ defa_channel = prev_values['channel']
+ defa_version = prev_values['version']
+ defa_branch = prev_values['branch']
+ defa_device = prev_values['device']
+ if not (channel and branch and version and device):
+ channel = raw_input('Enter the channel(default: %s): ' %
+ defa_channel)
+ device = raw_input('Enter the device(default: %s): ' %
+ defa_device)
+ branch = raw_input('Enter the branch(default: %s): ' %
+ defa_branch)
+ version = raw_input('Enter the version(default: %s): ' %
+ defa_version)
+
+ # Check the inputs again
+ if not (channel and branch and version and device):
+ if not (defa_channel and defa_branch and defa_version and
+ defa_device):
+ raise Exception ('Insufficient input to download the build. '
+ 'Please use \"build_down.py --help\" to get '
+ 'more information about passing arguments.')
+ sys.exit(0)
+ if not channel: # If no input, we use defaults
+ channel = defa_channel
+ if not branch:
+ branch = defa_branch
+ if not version:
+ version = defa_version
+ if not device:
+ device = defa_device
+ path = (self.image_path % (channel, device, version, branch,
+ version, device))
+ default.set_defaults(channel=channel, device=device,
+ version=version, branch=branch)
+ return [path, device]
+
+
+class unzip_burn():
+
+ def unzip_file(self, zip_file):
+ """ Unzip the given file into a folder with the same name.
+
+ @param zip_file: The file to be unzipped.
+
+ @return dest: The folder with unzipped files.
+ """
+ dest = os.path.splitext(zip_file)[0]
+ # We avoid all the unnecesssary files to reduce the unzip time
+ command = ['unzip', zip_file, '-d', dest, '-x',
+ 'autotest.tar.bz2', 'chromiumos_base_image.bin',
+ 'recovery_image.bin']
+ large_zips = ['stout', 'link']
+ for board in large_zips:
+ if board in zip_file:
+ command.append('chromiumos_qemu_image.bin')
+ break
+ if run_command(command):
+ return dest
+
+
+ def burn(self, image, drive):
+ """ Burn the image to the given drive.
+
+ @param image: The complete path to the image.
+ @param drive: The complete path to the drive.
+ """
+ print 'Burning the image %s on to drive %s ...' % (image, drive)
+ if run_command(['sudo dd if=%s of=%s' % (image, drive)],
+ shell=True):
+ print 'Image is now on %s drive.'
+
+
+def main():
+ download = download_image()
+ image = download.download()
+ burn = unzip_burn()
+ print 'Unzipping the folder, please wait ...'
+ dest = burn.unzip_file(image)
+ if dest:
+ download_path = os.path.join(dest, 'chromiumos_test_image.bin')
+ print 'The test image is in %s' % download_path
+ drive_path = raw_input('Which drive to use?(default: /dev/sdc): ')
+ if '/dev' not in drive_path:
+ print 'Using the default drive path /dev/sdc.'
+ drive_path = '/dev/sdc'
+ if os.path.exists(download_path) and os.path.exists(drive_path):
+ burn.burn(download_path, drive_path)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/provingground/test_build_download.py b/provingground/test_build_download.py
new file mode 100644
index 0000000..720facf
--- /dev/null
+++ b/provingground/test_build_download.py
@@ -0,0 +1,59 @@
+#!/usr/sbin/python
+# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import download_test_build
+import os
+import unittest
+
+"""
+This test can be used to download multiple test images using
+download_test_build.
+"""
+
+
+class TestBuilds(unittest.TestCase):
+ channel = 'dev'
+ branch = 'R28'
+ version = '4100.17.0'
+ image = []
+
+ def setUp(self):
+ self.build = download_test_build.download_image()
+
+ def tearDown(self):
+ """Remove all test paths."""
+# disabled by default
+# for path in self.image:
+# print 'Removing the download %s' % path
+# os.remove(path)
+
+ def test_daisy(self):
+ """Test downloading daisy."""
+ path = self.build.download(channel=self.channel,
+ device='daisy', branch=self.branch,
+ version=self.version)
+ self.image.append(path)
+ self.failUnless(os.path.exists(path))
+
+
+ def test_link(self):
+ """Test downloading link."""
+ path = self.build.download(channel=self.channel,
+ device='link', version=self.version,
+ branch=self.branch)
+ self.image.append(path)
+ self.failUnless(os.path.exists(path))
+
+
+ def test_parrot(self):
+ """Test downloading parrot."""
+ path = self.build.download(channel=self.channel,
+ device='parrot', branch=self.branch,
+ version=self.version)
+ self.image.append(path)
+ self.failUnless(os.path.exists(path))
+
+if __name__ == '__main__':
+ unittest.main()