blob: 4a1f2f09e11b3729ba37d2df95527d33e63a00c2 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""cros deploy: Deploy the packages onto the target device."""
from __future__ import print_function
import os
import logging
import urlparse
from chromite import cros
from chromite.lib import cros_build_lib
from chromite.lib import portage_util
from chromite.lib import remote_access
class DeployCommand(cros.CrosCommand):
"""Deploy the requested packages to the target device.
This command assumes the requested packages are already built in the
chroot. This command needs to run inside the chroot for inspecting
the installed packages.
Note: If the rootfs on your device is read-only, this command
remounts it as read-write. If the rootfs verification is enabled on
your device, this command disables it.
EPILOG = """
To deploy packages:
cros deploy device power_manager cherrypy
cros deploy device /path/to/package
To uninstall packages:
cros deploy --unmerge cherrypy
For more information of cros build usage:
cros build -h
DEVICE_BASE_DIR = '/usr/local/tmp/cros-deploy'
# This is defined in src/platform/dev/
STRIPPED_PACKAGES_DIR = 'stripped-packages'
# Override base class property to enable stats upload.
upload_stats = True
def __init__(self, options):
"""Initializes DeployCommand."""
cros.CrosCommand.__init__(self, options)
self.emerge = True
self.strip = True
self.clean_binpkg = True
self.ssh_hostname = None
self.ssh_port = None
self.ssh_username = None
self.ssh_private_key = None
# The installation root of packages.
self.root = None = True
self.board = None
self.sysroot = None
def AddParser(cls, parser):
"""Add a parser."""
super(cls, DeployCommand).AddParser(parser)
'device', help='IP[:port] address of the target device.')
'packages', help='Packages to install. You can specify '
'[category/]package[:slot] or the path to the binary package.',
'--board', default=None, help='The board to use. By default it is '
'automatically detected. You can override the detected board with '
'this option.')
'--no-strip', dest='strip', action='store_false', default=True,
help='Do not run strip_package to filter out preset paths in the '
'package. Stripping removes debug symbol files and reduces the size '
'of the package significantly. Defaults to always strip.')
'--unmerge', dest='emerge', action='store_false', default=True,
help='Unmerge requested packages.')
'--root', default='/',
help="Package installation root, e.g. '/' or '/usr/local'"
"(default: '/').")
'--no-clean-binpkg', dest='clean_binpkg', action='store_false',
default=True, help='Do not clean outdated binary packages. '
' Defaults to always clean.')
'--emerge-args', default=None,
help='Extra arguments to pass to emerge.')
'--private-key', type='path', default=None,
help='SSH identify file (private key).')
'--no-ping', dest='ping', action='store_false', default=True,
help='Do not ping the device before attempting to connect to it.')
def _FindPackageCPV(self, pkg):
"""Returns the CPV of a package matching |pkg|."""
matches = portage_util.FindPackageNameMatches(pkg, board=self.board)
if not matches:
raise ValueError('Package %s is not installed!' % pkg)
idx = 0
if len(matches) > 1:
# Ask user to pick among multiple matches.
idx = cros_build_lib.GetChoice(
'Multiple matches found for %s: ' % pkg,
[os.path.join(x.category, x.pv) for x in matches])
return matches[idx]
def _GetPackageByCPV(self, cpv):
"""Returns the path to a binary package corresponding to |cpv|."""
packages_dir = None
if self.strip:
['strip_package', '--board', self.board,
os.path.join(cpv.category, '%s' % (cpv.pv))])
packages_dir = self.STRIPPED_PACKAGES_DIR
except cros_build_lib.RunCommandError:
logging.error('Cannot strip package %s', cpv)
return portage_util.GetBinaryPackagePath(
cpv.category, cpv.package, cpv.version, sysroot=self.sysroot,
def _GetLatestPackage(self, pkg):
"""Returns the path to latest binary package matching |pkg|."""
return self._GetPackageByCPV(self._FindPackageCPV(pkg))
def _Emerge(self, device, pkg, extra_args=None):
"""Copies |pkg| to |device| and emerges it.
device: A ChromiumOSDevice object.
pkg: A package name.
extra_args: Extra arguments to pass to emerge.
if os.path.isfile(pkg):
latest_pkg = pkg
latest_pkg = self._GetLatestPackage(pkg)
if not latest_pkg:
cros_build_lib.Die('Missing package %s.' % pkg)
pkgroot = os.path.join(device.work_dir, 'packages')
pkg_name = os.path.basename(latest_pkg)
pkg_dirname = os.path.basename(os.path.dirname(latest_pkg))
pkg_dir = os.path.join(pkgroot, pkg_dirname)
device.RunCommand(['mkdir', '-p', pkg_dir], remote_sudo=True)'Copying %s to device...', latest_pkg)
device.CopyToDevice(latest_pkg, pkg_dir, remote_sudo=True)
portage_tmpdir = os.path.join(device.work_dir, 'portage-tmp')
device.RunCommand(['mkdir', '-p', portage_tmpdir], remote_sudo=True)'Use portage temp dir %s', portage_tmpdir)'Installing %s...', latest_pkg)
pkg_path = os.path.join(pkg_dir, pkg_name)
# We set PORTAGE_CONFIGROOT to '/usr/local' because by default all
# chromeos-base packages will be skipped due to the configuration
# in /etc/protage/make.profile/package.provided. However, there is
# a known bug that /usr/local/etc/portage is not setup properly
# ( This does not affect `cros deploy` because
# we do not use the preset PKGDIR.
extra_env = {
'FEATURES': '-sandbox',
'PKGDIR': pkgroot,
'PORTAGE_CONFIGROOT': '/usr/local',
'PORTAGE_TMPDIR': portage_tmpdir,
'PORTDIR': device.work_dir,
cmd = ['emerge', '--usepkg', pkg_path, '--root=%s' % self.root]
if extra_args:
# Always showing the emerge output for clarity.
device.RunCommand(cmd, extra_env=extra_env, remote_sudo=True,
capture_output=False, debug_level=logging.INFO)
except Exception:
logging.error('Failed to emerge package %s', pkg)
else:'%s has been installed.', pkg)
# Free up the space for other packages.
device.RunCommand(['rm', '-rf', portage_tmpdir, pkg_dir],
error_code_ok=True, remote_sudo=True)
def _Unmerge(self, device, pkg):
"""Unmerges |pkg| on |device|.
device: A RemoteDevice object.
pkg: A package name.
"""'Unmerging %s...', pkg)
cmd = ['qmerge', '--yes']
# Check if qmerge is available on the device. If not, use emerge.
if device.RunCommand(
['qmerge', '--version'], error_code_ok=True).returncode != 0:
cmd = ['emerge']
cmd.extend(['--unmerge', pkg, '--root=%s' % self.root])
# Always showing the qmerge/emerge output for clarity.
device.RunCommand(cmd, capture_output=False, remote_sudo=True,
except Exception:
logging.error('Failed to unmerge package %s', pkg)
else:'%s has been uninstalled.', pkg)
def _ReadOptions(self):
"""Processes options and set variables."""
self.emerge = self.options.emerge
self.strip = self.options.strip
self.clean_binpkg = self.options.clean_binpkg
self.root = self.options.root =
device = self.options.device
# pylint: disable=E1101
if urlparse.urlparse(device).scheme == '':
# For backward compatibility, prepend ssh:// ourselves.
device = 'ssh://%s' % device
parsed = urlparse.urlparse(device)
if parsed.scheme == 'ssh':
self.ssh_hostname = parsed.hostname
self.ssh_username = parsed.username
self.ssh_port = parsed.port
self.ssh_private_key = self.options.private_key
cros_build_lib.Die('Does not support device %s' % self.options.device)
def Run(self):
"""Run cros deploy."""
device_connected = False
with remote_access.ChromiumOSDeviceHandler(
self.ssh_hostname, port=self.ssh_port, username=self.ssh_username,
private_key=self.ssh_private_key, base_dir=self.DEVICE_BASE_DIR, as device:
device_connected = True
self.board = cros_build_lib.GetBoard(device_board=device.board,
override_board=self.options.board)'Board is %s', self.board)
self.sysroot = cros_build_lib.GetSysroot(board=self.board)
if self.clean_binpkg:'Cleaning outdated binary packages for %s', self.board)
if not device.IsPathWritable(self.root):
# Only remounts rootfs if the given root is not writable.
if not device.MountRootfsReadWrite():
cros_build_lib.Die('Cannot remount rootfs as read-write. Exiting.')
for pkg in self.options.packages:
if self.emerge:
self._Emerge(device, pkg, extra_args=self.options.emerge_args)
self._Unmerge(device, pkg)
except (Exception, KeyboardInterrupt) as e:
logging.error('Cros Deploy terminated before completing!')
if device_connected and device.lsb_release:
lsb_entries = sorted(device.lsb_release.items())'Following are the LSB version details of the device:\n%s',
'\n'.join('%s=%s' % (k, v) for k, v in lsb_entries))
if self.options.debug:
else:'Cros Deploy completed successfully.')