blob: f564a6bc8b210a3864851c58df52e0eb2acda381 [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This implements calling of scripts and other utilities/tools.
We support running inside and outside the chroot. To do this, we permit
a ## prefix which resolves to the chroot. Within the chroot this will be
/ but outside it will be the full path to the chroot.
So we can use filenames like this:
##/usr/share/vboot/bitmaps/make_bmp_images.sh
"""
import doctest
import optparse
import os
import re
import shutil
import struct
import sys
import tempfile
import unittest
from chromite.lib import cros_build_lib
from chromite.lib import git
import cros_output
# Attributes defined outside __init__
#pylint: disable=W0201
class CmdError(Exception):
"""An error in the execution of a command."""
pass
class Tools:
"""A class to encapsulate the external tools we want to run.
This provides convenient functions for running tools inside/outside the
chroot.
Public properties:
outdir: The output directory to write output files to.
search_paths: The list of directories to search for files we are asked
to read.
The tools class also provides common paths:
chroot_path: chroot directory
src_path: source directory
script_path: scripts directory (src/scripts)
overlay_path: overlays directory (src/overlays)
priv_overlay_path: private overlays directory (src/private-overlays)
board_path: build directory (/build in chroot)
third_party_path: third_parth directory (src/third_party)
cros_overlay_path: Chromium OS overlay (src/chromiumos-overlay)
"""
def __init__(self, output):
"""Set up the tools system.
Args:
output: cros_output object to use for output.
Raises:
IOError: Unable to find .repo directory
"""
# Detect whether we're inside a chroot or not
self.in_chroot = cros_build_lib.IsInsideChroot()
self._out = output
self._root = None
self.chroot_path = None
root_dir = None
if self.in_chroot:
root_dir = os.getenv('CROS_WORKON_SRCROOT')
else:
repo = git.FindRepoDir('.')
if repo:
root_dir = os.path.dirname(repo)
if root_dir:
self._SetRoot(root_dir)
self._out.Info("Chroot is at '%s'" % self.chroot_path)
else:
self._out.Info('Running outside chroot')
self._tools = {
'make_bmp_image': '##/usr/share/vboot/bitmaps/make_bmp_images.sh',
'bct_dump': '##/usr/bin/bct_dump',
'tegrarcm': '##/usr/bin/tegrarcm',
'gbb_utility': '##/usr/bin/gbb_utility',
'cbfstool': '##/usr/bin/cbfstool',
'fdisk': '##/sbin/fdisk',
}
self.outdir = None # We have no output directory yet
self.search_paths = []
def __enter__(self):
return self
def __exit__(self, the_type, value, traceback):
self.FinalizeOutputDir()
return False
def _SetRoot(self, root_dir):
"""Sets the root directory for the build envionrment.
The root directory is the one containing .repo, chroot and src.
This should be called once the root is known. All other parts are
calculated from this.
Args:
root_dir: The path to the root directory.
"""
self._root = os.path.normpath(root_dir)
# Set up the path to prepend to get to the chroot
if self.in_chroot:
self.chroot_path = '/'
else:
self.chroot_path = os.path.join(self._root, 'chroot')
self.src_path = os.path.join(self._root, 'src')
self.script_path = os.path.join(self.src_path, 'scripts')
self.overlay_path = os.path.join(self.src_path, 'overlays')
self.priv_overlay_path = os.path.join(self.src_path,
'private-overlays')
self.board_path = os.path.join(self.chroot_path, 'build')
self.third_party_path = os.path.join(self.src_path, 'third_party')
self.cros_overlay_path = os.path.join(self.third_party_path,
'chromiumos-overlay')
def Filename(self, fname):
"""Resolve a file path to an absolute path.
If fname starts with ##/ and chroot is available, ##/ gets replaced with
the chroot path. If chroot is not available, this file name can not be
resolved, `None' is returned.
If fname is not prepended with the above prefix, and is not an existing
file, the actual file name is retrieved from the passed in string and the
search_paths directories (if any) are searched to for the file. If found -
the path to the found file is returned, `None' is returned otherwise.
Args:
fname: a string, the path to resolve.
Returns:
Absolute path to the file or None if not found.
"""
if fname.startswith('##/'):
if self.chroot_path:
fname = os.path.join(self.chroot_path, fname[3:])
else:
return None
# Search for a pathname that exists, and return it if found
if fname and not os.path.exists(fname):
for path in self.search_paths:
pathname = os.path.join(path, os.path.basename(fname))
if os.path.exists(pathname):
return pathname
# If not found, just return the standard, unchanged path
return fname
def Run(self, tool, args, cwd=None, sudo=False):
"""Run a tool with given arguments.
The tool name may be used unchanged or substituted with a full path if
required.
The tool and arguments can use ##/ to signify the chroot (at the beginning
of the tool/argument).
Args:
tool: Name of tool to run.
args: List of arguments to pass to tool.
cwd: Directory to change into before running tool (None if none).
sudo: True to run the tool with sudo
Returns:
Output of tool (stdout).
Raises:
CmdError: If running the tool, or the tool itself creates an error.
"""
if tool in self._tools:
tool = self._tools[tool]
tool = self.Filename(tool)
args = [self.Filename(arg) for arg in args]
cmd = [tool] + args
if sudo and os.getuid():
cmd.insert(0, 'sudo')
try:
self._out.Debug('will run: "%s"' % ' '.join(cmd))
result = cros_build_lib.RunCommand(
cmd, cwd=cwd, capture_output=True,
combine_stdout_stderr=True, error_code_ok=True)
except cros_build_lib.RunCommandError as ex:
raise CmdError(str(ex))
stdout = result.output
if result.returncode:
raise CmdError('Command failed: %s\n%s' % (' '.join(cmd), stdout))
self._out.Debug(stdout)
return stdout
def ReadFile(self, fname):
"""Read and return the contents of a file.
Args:
fname: path to filename to read, where ## signifiies the chroot.
Returns:
data read from file, as a string.
"""
fd = open(self.Filename(fname), 'rb')
data = fd.read()
fd.close()
self._out.Info("Read file '%s' size %d (%#0x)" %
(fname, len(data), len(data)))
return data
def WriteFile(self, fname, data):
"""Write data into a file.
Args:
fname: path to filename to write, where ## signifiies the chroot.
data: data to write to file, as a string.
"""
self._out.Info("Write file '%s' size %d (%#0x)" %
(fname, len(data), len(data)))
fd = open(self.Filename(fname), 'wb')
fd.write(data)
fd.close()
def ReadFileAndConcat(self, filenames, compress=None, with_index=False):
"""Read several files and concat them.
Args:
filenames: a list containing name of the files to read.
compress: a string, the name of the compression method, if any, to apply
to the files. The only supported method is 'lzo'
with_index: If true, an index structure is prepended to the data.
Returns:
A tuple of a string and two list. The string is the concated data read
from file, in the same order as in filenames, aligned to 4-byte. The
first list contains the offset of each file in the data string and
the second one contains the actual (non-padded) length of each file,
both in the same order.
The optional index structure is a 32 bit integer set to the number of
entries in the index, followed by that many pairs of integers which
describe the offset and length of each chunk.
"""
data = ''
offsets = []
lengths = []
for fname in filenames:
offsets.append(len(data))
content = self.ReadFile(fname)
pad_len = ((len(content) + 3) & ~3) - len(content)
data += content + chr(0xff) * pad_len
lengths.append(len(content))
if with_index:
index_size = 4 + len(filenames) * 8
index = struct.pack("<I", len(filenames))
offsets = tuple(offset + index_size for offset in offsets)
for _, offset, length in zip(filenames, offsets, lengths):
index += struct.pack("<II", offset, length)
data = index + data
if compress:
if compress == 'lzo':
# Would be nice to just pipe here. but we don't have RunPipe().
fname = self.GetOutputFilename('data.tmp')
outname = self.GetOutputFilename('data.tmp.lzo')
if os.path.exists(outname):
os.remove(outname)
self.WriteFile(fname, data)
args = ['-9', fname]
self.Run('lzop', args)
data = self.ReadFile(outname)
else:
raise ValueError("Unknown compression method '%s'" % compress)
return data, offsets, lengths
def GetChromeosVersion(self):
"""Returns the ChromeOS version string.
This works by finding and executing the version script:
src/third_party/chromiumos-overlay/chromeos/config/chromeos_version.sh
Returns:
Version string in the form '0.14.726.2011_07_07_1635'
Raises:
CmdError: If the version script cannot be found, or is found but cannot
be executed.
"""
version_script = os.path.join(self.cros_overlay_path, 'chromeos', 'config',
'chromeos_version.sh')
if os.path.exists(version_script):
result = self.Run('sh', ['-c', '. %s >/dev/null; '
'echo ${CHROMEOS_VERSION_STRING}'
% version_script])
return result.strip()
raise CmdError("Cannot find version script 'chromeos_version.sh'")
def CheckTool(self, name, ebuild=None):
"""Check that the specified tool exists.
If it does not exist in the PATH, then generate a useful error message
indicating how to install the ebuild that contains the required tool.
Args:
name: filename of tool to look for on path.
ebuild: name of ebuild which should be emerged to install this tool,
or None if it is the same as the filename.
Raises:
CmdError(msg) if the tool is not found.
"""
try:
filename = name
if filename in self._tools:
filename = self._tools[filename]
filename = self.Filename(filename)
self.Run('which', [filename])
except CmdError:
raise CmdError("The '%s' utility was not found in your path. "
"Run the following command in \nyour chroot to install "
"it: sudo -E emerge %s" % (filename, ebuild or name))
def OutputSize(self, label, filename, level=cros_output.NOTICE):
"""Display the filename and size of an object.
Args:
label: Label for this file.
filename: Filename to output.
level: Verbosity level to attach to this message
"""
filename = self.Filename(filename)
size = os.stat(filename).st_size
self._out.DoOutput(level, '%s: %s; size: %d / %#x' %
(label, filename, size, size))
def PrepareOutputDir(self, outdir, preserve=False):
"""Select an output directory, ensuring it exists.
This either creates a temporary directory or checks that the one supplied
by the user is valid. For a temporary directory, it makes a note to
remove it later if required.
Args:
outdir: a string, name of the output directory to use to store
intermediate and output files. If is None - create a temporary
directory.
preserve: a Boolean. If outdir above is None and preserve is False, the
created temporary directory will be destroyed on exit.
Raises:
OSError: If it cannot create the output directory.
"""
self.preserve_outdir = outdir or preserve
if outdir:
self.outdir = outdir
if not os.path.isdir(self.outdir):
try:
os.makedirs(self.outdir)
except OSError as err:
raise CmdError("Cannot make output directory '%s': '%s'" %
(self.outdir, err.strerror))
else:
self.outdir = tempfile.mkdtemp(prefix='cros-dev.')
self._out.Debug("Using temporary directory '%s'" % self.outdir)
def FinalizeOutputDir(self):
"""Tidy up: delete output directory if temporary and not preserved."""
if self.outdir and not self.preserve_outdir:
shutil.rmtree(self.outdir)
self._out.Debug("Deleted temporary directory '%s'" %
self.outdir)
self.outdir = None
def GetOutputFilename(self, fname):
"""Return a filename within the output directory.
Args:
fname: Filename to use for new file
Returns:
The full path of the filename, within the output directory
"""
return os.path.join(self.outdir, fname)
# pylint: disable=W0212
class ToolsTests(unittest.TestCase):
"""Unit tests for this module."""
def setUp(self):
self.out = cros_output.Output(False)
self.tools = Tools(self.out)
def MakeOutsideChroot(self, base):
tools = Tools(self.out)
tools.in_chroot = False
tools._SetRoot(base)
return tools
def testPaths(self):
tools = self.tools
self.assertTrue(os.path.isdir(os.path.join(tools._root, '.repo')))
def _testToolsPaths(self, base, tools):
"""Common paths tests to run inside and outside chroot.
These tests are the same inside and outside the choot, so we put them in a
separate function.
Args:
base: Base directory to use for testing (contains the 'src' directory).
tools: Tools object to use.
"""
self.assertEqual(tools._root, base[:-1])
self.assertEqual(tools.src_path, base + 'src')
self.assertEqual(tools.script_path, base + 'src/scripts')
self.assertEqual(tools.overlay_path, base + 'src/overlays')
self.assertEqual(tools.priv_overlay_path, base + 'src/private-overlays')
self.assertEqual(tools.third_party_path, base + 'src/third_party')
self.assertEqual(tools.cros_overlay_path, base +
'src/third_party/chromiumos-overlay')
def testSetRootInsideChroot(self):
"""Inside the chroot, paths are slightly different from outside."""
tools = Tools(self.out)
tools.in_chroot = True
# Force our own path.
base = '/air/bridge/'
tools._SetRoot(base)
# We should get a full path from that without the trailing '/'.
self.assertEqual(tools.chroot_path, '/')
self.assertEqual(tools.board_path, '/build')
self._testToolsPaths(base, tools)
def testSetRootOutsideChroot(self):
"""Pretend to be outside the chroot, and check that paths are correct."""
# Force our own path, outside the chroot.
base = '/spotty/light/'
tools = self.MakeOutsideChroot(base)
# We should get a full path from that without the trailing '/'.
self.assertEqual(tools.chroot_path, base + 'chroot')
self.assertEqual(tools.board_path, tools.chroot_path + '/build')
self._testToolsPaths(base, tools)
def _testToolsFilenames(self, tools):
"""Common filename tests to run inside and outside chroot.
These tests are the same inside and outside the choot, so we put them in a
separate function.
Args:
tools: Tools object to use.
"""
self.assertEqual(tools.Filename('/root/based/'),
'/root/based/')
# Try search paths in /bin and /ls.
tools.search_paths = ['/bin', '/lib']
file_in_bin = os.listdir('/bin')[0]
self.assertEqual(tools.Filename(file_in_bin), '/bin/%s' % file_in_bin)
file_in_lib = os.listdir('/lib')[0]
self.assertEqual(tools.Filename(file_in_lib), '/lib/%s' % file_in_lib)
self.assertEqual(tools.Filename('i-am-not-here'), 'i-am-not-here')
# Don't search for an empty file.
self.assertEqual(tools.Filename(''), '')
def testFilenameInsideChroot(self):
"""Test that we can specify search paths and they work correctly.
Test search patches inside the chroot.
"""
tools = Tools(self.out)
tools.in_chroot = True
# Force our own path.
base = '/air/bridge/'
tools._SetRoot(base)
self.assertEqual(tools.Filename('##/fred'), '/fred')
self.assertEqual(tools.Filename('##/just/a/short/dir/'),
'/just/a/short/dir/')
self._testToolsFilenames(tools)
def testFilenameOutsideChroot(self):
"""Test that we can specify search paths and they work correctly.
Test search patches outside the chroot.
"""
base = '/home/'
tools = self.MakeOutsideChroot(base)
self.assertEqual(tools.Filename('##/fred'), base + 'chroot/fred')
self.assertEqual(tools.Filename('##/just/a/short/dir/'),
base + 'chroot/just/a/short/dir/')
self._testToolsFilenames(tools)
def testReadWriteFile(self):
"""Test our read/write utility functions."""
tools = Tools(self.out)
tools.PrepareOutputDir(None)
data = 'some context here' * 2
fname = tools.GetOutputFilename('bang')
tools.WriteFile(fname, data)
# Check that the file looks correct.
compare = tools.ReadFile(fname)
self.assertEqual(data, compare)
def testReadFileAndConcat(self):
"""Test 'cat' of several files."""
tools = Tools(self.out)
tools.PrepareOutputDir(None)
file_list = ['one', 'empty', 'two', 'three', 'four']
out_list = [tools.GetOutputFilename(fname) for fname in file_list]
file_list[1] = '' # Empty the 'empty' file.
for upto in range(len(file_list)):
tools.WriteFile(out_list[upto], file_list[upto])
data, offset, length = tools.ReadFileAndConcat(out_list)
self.assertEqual(len(data), 20)
self.assertEqual(offset, [0, 4, 4, 8, 16])
self.assertEqual(length, [3, 0, 3, 5, 4])
def testGetChromeosVersion(self):
"""Test for GetChromeosVersion() inside and outside chroot.
This function returns a string like '2893.0.2012_09_16_2219'.
"""
tools = self.tools
re_version_pattern = '^\d{4}.\d+.(?:\d{4}_\d{2}_\d{2}_\d+|\d+)$'
re_version = re.compile(re_version_pattern)
reported_version = tools.GetChromeosVersion()
self.assertTrue(re_version.match(reported_version),
msg='%s !~= %s' % (reported_version, re_version_pattern))
tools = Tools(self.out)
# Force our own path, outside the chroot. This should fail.
base = 'invalid-dir'
tools = self.MakeOutsideChroot(base)
tools.in_chroot = False
self.assertRaises(CmdError, tools.GetChromeosVersion)
def testCheckTool(self):
"""Test for the CheckTool() method."""
tools = self.tools
tools.CheckTool('fdisk')
tools.CheckTool('gbb_utility')
self.assertRaises(CmdError, tools.CheckTool, 'non-existent-tool')
tools.CheckTool('fdisk')
self.assertRaises(CmdError, tools.CheckTool, '/usr/bin/fdisk')
def testRun(self):
"""Test for the Run() method."""
tools = self.tools
# Ask fdisk for its version - this utility must be in the chroot.
re_fdisk = re.compile('fdisk .*util-linux .*')
self.assertTrue(re_fdisk.match(tools.Run('fdisk', ['-v'])))
# We need sudo for looking at disks.
out = tools.Run('fdisk', ['-l', '/dev/sda'], sudo=True)
# Don't look at the specific output, but it will have > 5 lines.
self.assertTrue(len(out.splitlines()) > 5)
self.assertEqual(tools.Run('pwd', [], cwd='/tmp'), '/tmp\n')
def testOutputDir(self):
"""Test output directory creation and deletion."""
tools = self.tools
# First check basic operation, creating and deleting a tmpdir.
tools.PrepareOutputDir(None)
fname = tools.GetOutputFilename('fred')
tools.WriteFile(fname, 'You are old, Father William, the young man said')
dirname = tools.outdir
tools.FinalizeOutputDir()
self.assertFalse(os.path.exists(fname))
self.assertFalse(os.path.exists(dirname))
# Try preserving it.
tools.PrepareOutputDir(None, True)
fname = tools.GetOutputFilename('fred')
tools.WriteFile(fname, 'and your hair has become very white')
dirname = tools.outdir
tools.FinalizeOutputDir()
self.assertTrue(os.path.exists(fname))
self.assertTrue(os.path.exists(dirname))
shutil.rmtree(dirname)
# Use our own directory, which is always preserved.
testdir = '/tmp/tools-test.test'
tools.PrepareOutputDir(testdir)
fname = tools.GetOutputFilename('fred')
tools.WriteFile(fname, 'and yet you incessantly stand on your head')
dirname = tools.outdir
tools.FinalizeOutputDir()
self.assertTrue(os.path.exists(fname))
self.assertTrue(os.path.exists(dirname))
shutil.rmtree(dirname)
# Try creating an invalid directory.
testdir = '/sys/cannot/do/this/here'
self.assertRaises(CmdError, tools.PrepareOutputDir, testdir)
fname = tools.GetOutputFilename('fred')
self.assertRaises(IOError, tools.WriteFile, fname,
'do you think at your age it is right?')
dirname = tools.outdir
tools.FinalizeOutputDir()
def _OutputMock(self, level, msg, color=None):
self._level = level
self._msg = msg
self._color = color
def testOutputSize(self):
"""Test for OutputSize() function."""
tools = self.tools
# Rather than mocks, use a special Output object.
out = tools._out
out._Output = self._OutputMock
tools.PrepareOutputDir(None)
fname = tools.GetOutputFilename('fred')
text_string = 'test of output size'
tools.WriteFile(fname, text_string)
re_fname = re.compile('fred')
re_size = re.compile('.*size: (\d*)')
tools.OutputSize('first', fname, level=cros_output.ERROR)
self.assertEqual(self._level, cros_output.ERROR)
self.assertTrue(re_fname.search(self._msg))
self.assertEqual(self._color, None)
# Check the default level, and that the filename length is given.
tools.OutputSize('second', fname)
self.assertEqual(self._level, cros_output.NOTICE)
self.assertTrue(re_fname.search(self._msg))
self.assertEqual(self._color, None)
m = re_size.match(self._msg)
self.assertEqual(m.group(1), str(len(text_string)))
tools.FinalizeOutputDir()
def _Test(argv):
"""Run any built-in tests."""
unittest.main(argv=argv)
assert doctest.testmod().failed == 0
def main():
"""Main function for tools.
We provide a way to call a few of our useful functions.
TODO(sjg) Move into the Chromite libraries when these are ready.
"""
parser = optparse.OptionParser()
parser.add_option('-v', '--verbosity', dest='verbosity', default=1,
type='int',
help='Control verbosity: 0=silent, 1=progress, 3=full, '
'4=debug')
help_str = '%s [options] cmd [args]\n\nAvailable commands:\n' % sys.argv[0]
help_str += '\tchromeos-version\tDisplay Chrome OS version'
parser.usage = help_str
(options, args) = parser.parse_args(sys.argv)
args = args[1:]
out = cros_output.Output(options.verbosity)
tools = Tools(out)
if not args:
parser.error('No command provided')
elif args[0] == 'chromeos-version':
print tools.GetChromeosVersion()
else:
parser.error("Unknown command '%s'" % args[0])
if __name__ == '__main__':
if sys.argv[1:2] == ['--test']:
_Test([sys.argv[0]] + sys.argv[2:])
else:
main()