blob: 6bc30c90b690fb991b92a361a1e3aff02f69d7b0 [file] [log] [blame]
#!/usr/bin/python -b
# Copyright 2017 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
#
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Core implementation of doins ebuild helper command.
This script is designed to be executed by ebuild-helpers/doins.
"""
from __future__ import print_function
import argparse
import errno
import grp
import logging
import os
import pwd
import re
import shlex
import shutil
import stat
import subprocess
import sys
from portage.util import movefile
from portage.util.file_copy import copyfile
def _warn(helper, msg):
    """Write a warning, tagged with the helper name, to stderr.

    The text is formatted as '!!! <helper>: <msg>' and ends with a
    newline of its own, so the output is followed by an empty line.

    Args:
        helper: helper executable name.
        msg: Message to be output.
    """
    text = '!!! %s: %s\n' % (helper, msg)
    print(text, file=sys.stderr)
def _parse_group(group):
    """Convert a group given by name or by number into a gid.

    The group database is consulted first; only when no group of that
    name exists is the string interpreted as a decimal gid.

    Args:
        group: string representation of the group. Maybe name or gid.

    Returns:
        Parsed gid.

    Raises:
        ValueError: if |group| is neither a known name nor an integer.
    """
    try:
        entry = grp.getgrnam(group)
    except KeyError:
        entry = None
    if entry is None:
        return int(group)
    return entry.gr_gid
def _parse_user(user):
    """Convert a user given by name or by number into a uid.

    The password database is consulted first; only when no user of that
    name exists is the string interpreted as a decimal uid.

    Args:
        user: string representation of the user. Maybe name or uid.

    Returns:
        Parsed uid.

    Raises:
        ValueError: if |user| is neither a known name nor an integer.
    """
    try:
        entry = pwd.getpwnam(user)
    except KeyError:
        entry = None
    if entry is None:
        return int(user)
    return entry.pw_uid
def _parse_mode(mode):
    """Parse an octal permission string.

    `install` accepts a much richer --mode syntax (e.g. symbolic modes);
    only the plain octal form is understood here.

    Args:
        mode: string representation of the permission.

    Returns:
        The permission bits as an int, or None when |mode| is not an
        octal number. None lets the caller detect that an unsupported
        '-m' value was given.
    """
    try:
        parsed = int(mode, 8)
    except ValueError:
        parsed = None
    return parsed
def _parse_install_options(
        options, is_strict, helper, inprocess_runner_class,
        subprocess_runner_class):
    """Parses `install` options and picks the runner that honors them.

    Only -g/--group, -o/--owner, -m/--mode (octal) and
    -p/--preserve-timestamps are understood in process. Anything else
    either aborts (strict) or falls back to running `install`.

    Args:
        options: string representation of `install` options.
        is_strict: bool. If True, this exits the program in case of
            that an unknown option is found.
        helper: helper executable name.
        inprocess_runner_class: Constructor to run procedure which
            `install` command will do. Called with the parsed
            namespace.
        subprocess_runner_class: Constructor to run `install` command.
            Called with the shlex-split option list.

    Returns:
        The object built by |inprocess_runner_class| when every option
        is supported, otherwise the one built by
        |subprocess_runner_class|.
    """
    install_parser = argparse.ArgumentParser()
    install_parser.add_argument(
        '-g', '--group', default=-1, type=_parse_group)
    install_parser.add_argument(
        '-o', '--owner', default=-1, type=_parse_user)
    install_parser.add_argument(
        '-m', '--mode', default=0o755, type=_parse_mode)
    install_parser.add_argument(
        '-p', '--preserve-timestamps', action='store_true')

    argv = shlex.split(options)
    parsed, unknown = install_parser.parse_known_args(argv)

    if sys.version_info < (3, 3) and parsed.preserve_timestamps:
        # Without nanosecond-capable os.utime() the timestamps cannot
        # be preserved with full precision, so treat -p as unsupported.
        unknown.append('-p')

    # _parse_mode() yields None for a --mode value it cannot interpret,
    # which also rules out the in-process implementation.
    if not unknown and parsed.mode is not None:
        return inprocess_runner_class(parsed)

    _warn(helper, 'Unknown install options: %s, %r' % (
        options, unknown))
    if is_strict:
        sys.exit(1)
    _warn(helper, 'Continue with falling back to `install` '
          'command execution, which can be slower.')
    return subprocess_runner_class(argv)
def _set_attributes(options, path):
    """Applies ownership and permission bits to the entry at |path|.

    Args:
        options: object which has |owner|, |group| and |mode| fields.
            |owner| is an int uid and |group| an int gid; -1 leaves
            the respective id unchanged. |mode| holds the permission
            bits, or None to leave the permissions alone.
        path: File/directory path.
    """
    uid, gid = options.owner, options.group
    if not (uid == -1 and gid == -1):
        # lchown() so that a symlink itself, not its target, is changed.
        os.lchown(path, uid, gid)
    mode = options.mode
    if mode is not None:
        os.chmod(path, mode)
def _set_timestamps(source_stat, dest):
    """Copy the access and modification times of a stat result to dest.

    Args:
        source_stat: stat result for the source file.
        dest: path to the dest file.
    """
    times = (source_stat.st_atime, source_stat.st_mtime)
    os.utime(dest, times)


if sys.version_info >= (3, 3):
    # os.utime() accepts nanosecond timestamps from Python 3.3 on, so
    # use the *_ns fields instead of the float-based ones.
    def _set_timestamps_ns(source_stat, dest):
        times_ns = (source_stat.st_atime_ns, source_stat.st_mtime_ns)
        os.utime(dest, ns=times_ns)

    _set_timestamps_ns.__doc__ = _set_timestamps.__doc__
    _set_timestamps = _set_timestamps_ns
class _InsInProcessInstallRunner(object):
    """Installs a single file in process, imitating `install`."""

    def __init__(self, opts, parsed_options):
        """Initializes the instance.

        Args:
            opts: namespace object containing the parsed
                arguments for this program. |helper| and
                |enable_copy_xattr| are read; |xattr_exclude| is read
                only when xattr copying is enabled.
            parsed_options: namespace object containing the parsed
                options for `install`.
        """
        self._parsed_options = parsed_options
        self._helper = opts.helper
        self._copy_xattr = opts.enable_copy_xattr
        if self._copy_xattr:
            self._xattr_exclude = opts.xattr_exclude

    def run(self, source, dest_dir):
        """Installs a file at |source| into |dest_dir| in process.

        Args:
            source: Path to the file to be installed.
            dest_dir: Path to the directory which |source| will be
                installed into.

        Returns:
            True on success, otherwise False.

        Raises:
            OSError: if |source| cannot be stat()ed, or if an existing
                destination cannot be removed.
        """
        target = os.path.join(dest_dir, os.path.basename(source))

        # A failing stat(source) is deliberately left to propagate.
        source_stat = os.stat(source)
        if not self._is_install_allowed(source, source_stat, target):
            return False

        # `install` replaces the destination rather than writing into
        # it, so drop any existing entry first. A missing entry is the
        # normal case and is not an error.
        try:
            os.unlink(target)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

        try:
            install_options = self._parsed_options
            copyfile(source, target)
            _set_attributes(install_options, target)
            if self._copy_xattr:
                movefile._copyxattr(
                    source, target, exclude=self._xattr_exclude)
            if install_options.preserve_timestamps:
                _set_timestamps(source_stat, target)
        except Exception:
            logging.exception(
                'Failed to copy file: '
                '_parsed_options=%r, source=%r, dest_dir=%r',
                self._parsed_options, source, dest_dir)
            return False
        return True

    def _is_install_allowed(self, source, source_stat, dest):
        """Tells whether installing source over dest should go ahead.

        This is to keep compatibility with the `install` command.

        Args:
            source: path to the source file.
            source_stat: stat result for the source file, using stat()
                rather than lstat(), in order to match the `install`
                command
            dest: path to the dest file.

        Returns:
            True if it should succeed. False, after a warning, when
            source and dest turn out to be the same file.
        """
        # As `install` does: stat() for the source, lstat() for dest.
        try:
            dest_lstat = os.lstat(dest)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
            # Installing to a path that does not exist yet is the
            # common case.
            return True

        # Allowed when any of these holds, checked in this order:
        #  - dest is a symlink;
        #  - source and dest are different files (dest gets unlinked
        #    later);
        #  - they are hard links to one inode at different real paths,
        #    so the source survives unlinking dest.
        allowed = (
            stat.S_ISLNK(dest_lstat.st_mode) or
            not os.path.samestat(source_stat, dest_lstat) or
            (dest_lstat.st_nlink > 1 and
             os.path.realpath(source) != os.path.realpath(dest)))
        if not allowed:
            _warn(self._helper, '%s and %s are same file.' % (
                source, dest))
        return allowed
class _InsSubprocessInstallRunner(object):
    """Installs a file by spawning the `install` command."""

    def __init__(self, split_options):
        """Initializes the instance.

        Args:
            split_options: Command line options to be passed to
                `install` command. List of str.
        """
        self._split_options = split_options

    def run(self, source, dest_dir):
        """Installs a file at |source| into |dest_dir| by `install`.

        Args:
            source: Path to the file to be installed.
            dest_dir: Path to the directory which |source| will be
                installed into.

        Returns:
            True on success (exit status 0), otherwise False.
        """
        argv = ['install'] + self._split_options
        argv += [source, dest_dir]
        exit_status = subprocess.call(argv)
        return exit_status == 0
class _DirInProcessInstallRunner(object):
    """Creates a directory in process, imitating `install -d`."""

    def __init__(self, parsed_options):
        """Initializes the instance.

        Args:
            parsed_options: namespace object containing the parsed
                options for `install`.
        """
        self._parsed_options = parsed_options

    def run(self, dest):
        """Installs a dir into |dest| in process.

        Missing parent directories are created as well. An already
        existing directory is accepted, and its attributes are still
        updated.

        Args:
            dest: Path where a directory should be created.

        Raises:
            OSError: if the directory cannot be created, including when
                |dest| exists but is not a directory.
        """
        try:
            os.makedirs(dest)
        except OSError as e:
            already_a_dir = (
                e.errno == errno.EEXIST and os.path.isdir(dest))
            if not already_a_dir:
                raise
        _set_attributes(self._parsed_options, dest)
class _DirSubprocessInstallRunner(object):
    """Creates a directory by spawning `install -d`."""

    def __init__(self, split_options):
        """Initializes the instance.

        Args:
            split_options: Command line options to be passed to
                `install` command. List of str.
        """
        self._split_options = split_options

    def run(self, dest):
        """Installs a dir into |dest| by `install` command.

        Args:
            dest: Path where a directory should be created.

        Raises:
            subprocess.CalledProcessError: if `install` exits with a
                non-zero status.
        """
        argv = ['install', '-d'] + self._split_options
        argv += [dest]
        subprocess.check_call(argv)
class _InstallRunner(object):
    """Front end for the file and directory install operations.

    For each of insoptions and diroptions, an in-process runner is used
    when the options are fully understood; otherwise the `install`
    command is executed in a subprocess, which is slower.
    """

    def __init__(self, opts):
        """Initializes the instance.

        Args:
            opts: namespace object containing the parsed
                arguments for this program.
        """
        def make_ins_runner(parsed_options):
            # The file runner needs the program options as well as the
            # parsed `install` options.
            return _InsInProcessInstallRunner(opts, parsed_options)

        self._ins_runner = _parse_install_options(
            opts.insoptions, opts.strict_option, opts.helper,
            make_ins_runner, _InsSubprocessInstallRunner)
        self._dir_runner = _parse_install_options(
            opts.diroptions, opts.strict_option, opts.helper,
            _DirInProcessInstallRunner, _DirSubprocessInstallRunner)
        self._helpers_can_die = opts.helpers_can_die

    def install_file(self, source, dest_dir):
        """Installs a file at |source| into |dest_dir| directory.

        Args:
            source: Path to the file to be installed.
            dest_dir: Path to the directory which |source| will be
                installed into.

        Returns:
            True on success, otherwise False.
        """
        runner = self._ins_runner
        return runner.run(source, dest_dir)

    def install_dir(self, dest):
        """Creates a directory at |dest|.

        A failure is re-raised when helpers_can_die is set; otherwise
        it is only logged.

        Args:
            dest: Path where a directory should be created.
        """
        try:
            self._dir_runner.run(dest)
        except Exception:
            if not self._helpers_can_die:
                logging.exception('install_dir failed.')
                return
            raise
def _doins(opts, install_runner, relpath, source_root):
    """Installs a file as if `install` command runs.

    Installs a file at |source_root|/|relpath| into
    |opts.dest|/|relpath|.
    If |opts.preserve_symlinks| is set, creates symlink if the source is
    a symlink, unless the link points inside |opts.distdir|.

    Args:
        opts: parsed arguments. It should have following fields.
            - preserve_symlinks: bool representing whether symlinks
                needs to be preserved.
            - dest: Destination root directory.
            - distdir: location where Portage stores the downloaded
                source code archives. An empty value means no link is
                considered to point into distdir.
        install_runner: _InstallRunner instance for file install.
        relpath: Relative path of the file being installed.
        source_root: Source root directory.

    Returns:
        True on success. False when recreating a symlink failed;
        otherwise whatever install_runner.install_file() returns.
    """
    source = os.path.join(source_root, relpath)
    dest = os.path.join(opts.dest, relpath)

    if os.path.islink(source):
        # Our fake $DISTDIR contains symlinks that should not be
        # reproduced inside $D. In order to ensure that things like
        # dodoc "$DISTDIR"/foo.pdf work as expected, we dereference
        # symlinked files that refer to absolute paths inside
        # $PORTAGE_ACTUAL_DISTDIR/.
        try:
            if opts.preserve_symlinks:
                # Read the target once, so that the value tested and
                # the value linked are guaranteed to be the same.
                linkto = os.readlink(source)
                # An empty distdir must not match: startswith('') is
                # true for every target and would disable symlink
                # preservation altogether.
                in_distdir = (
                    bool(opts.distdir) and
                    linkto.startswith(opts.distdir))
                if not in_distdir:
                    try:
                        os.unlink(dest)
                    except OSError as e:
                        # Best effort: an existing directory is
                        # removed recursively. Any other failure (e.g.
                        # no entry at dest) is left for symlink()
                        # below to report.
                        if e.errno == errno.EISDIR:
                            shutil.rmtree(dest, ignore_errors=True)
                    os.symlink(linkto, dest)
                    return True
        except Exception:
            logging.exception(
                'Failed to create symlink: '
                'opts=%r, relpath=%r, source_root=%r',
                opts, relpath, source_root)
            return False

    return install_runner.install_file(source, os.path.dirname(dest))
def _create_arg_parser():
    """Returns the parser for the command line arguments.

    All options are optional; every positional argument is collected
    into |sources|.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        '--recursive', action='store_true',
        help='If set, installs files recursively. Otherwise, '
        'just skips directories.')
    parser.add_argument(
        '--preserve_symlinks', action='store_true',
        help='If set, a symlink will be installed as symlink.')
    parser.add_argument(
        '--helpers_can_die', action='store_true',
        help='If set, die in isolated-functions.sh is enabled. '
        'Specifically this is used to keep compatible dodir\'s '
        'behavior.')
    parser.add_argument(
        '--distdir', default='', help='Path to the actual distdir.')
    parser.add_argument(
        '--insoptions', default='',
        help='Options passed to `install` command for installing a '
        'file.')
    parser.add_argument(
        '--diroptions', default='',
        help='Options passed to `install` command for installing a '
        'dir.')
    parser.add_argument(
        '--strict_option', action='store_true',
        help='If set True, abort if insoptions/diroptions contains an '
        'option which cannot be interpreted by this script, instead of '
        'fallback to execute `install` command.')
    parser.add_argument(
        '--enable_copy_xattr', action='store_true',
        help='Copies xattrs, if set True')
    # The help text must name the real option (--enable_copy_xattr) and
    # keep a space between its two sentences.
    parser.add_argument(
        '--xattr_exclude', default='',
        help='White space delimited glob pattern to exclude xattr copy. '
        'Used only if --enable_copy_xattr is set.')

    # If helper is dodoc, it changes the behavior for the directory
    # install without --recursive.
    parser.add_argument('--helper', help='Name of helper.')
    parser.add_argument(
        '--dest',
        help='Destination where the files are installed.')
    parser.add_argument(
        'sources', nargs='*',
        help='Source file/directory paths to be installed.')
    return parser
def _parse_args(argv):
    """Parses the command line arguments.

    On Python 3 the path-valued fields (|distdir|, |dest|, |sources|)
    are converted back to bytes, and a trailing '/' is appended to
    |distdir|.

    Args:
        argv: command line arguments to be parsed.

    Returns:
        namespace instance containing the parsed argument data.
    """
    opts = _create_arg_parser().parse_args(argv)
    if sys.version_info.major < 3:
        return opts

    # Encode back to the original byte stream. Please see
    # http://bugs.python.org/issue8776.
    to_bytes = os.fsencode
    opts.distdir = to_bytes(opts.distdir) + b'/'
    opts.dest = to_bytes(opts.dest)
    opts.sources = [to_bytes(path) for path in opts.sources]
    return opts
def _install_dir(opts, install_runner, source):
    """Installs directory at |source|.

    Without --recursive the directory is skipped, which counts as a
    failure only for the dodoc helper. Otherwise the directory tree is
    recreated under |opts.dest| and every file (and every symlink to a
    directory) is handed to _doins().

    Args:
        opts: namespace instance containing parsed command line
            argument data.
        install_runner: _InstallRunner instance for dir install.
        source: Path to the source directory.

    Returns:
        True on success, False on failure, or None on skipped.
    """
    if not opts.recursive:
        if opts.helper != 'dodoc':
            # Neither success nor fail. Return None to indicate
            # skipped.
            return None
        _warn(opts.helper, '%s is a directory' % (source,))
        return False

    # Strip trailing '/'s.
    source = source.rstrip(b'/')
    source_root = os.path.dirname(source)
    top_dest = os.path.join(opts.dest, os.path.basename(source))
    install_runner.install_dir(top_dest)

    pending = []
    for dirpath, dirnames, filenames in os.walk(source):
        for dirname in dirnames:
            subdir = os.path.join(dirpath, dirname)
            rel = os.path.relpath(subdir, source_root)
            if not os.path.islink(subdir):
                install_runner.install_dir(
                    os.path.join(opts.dest, rel))
                continue
            # If this is a symlink, it will be processed
            # in _doins() called later.
            pending.append(rel)
        for filename in filenames:
            pending.append(os.path.relpath(
                os.path.join(dirpath, filename), source_root))

    # NOTE: Even if only an empty directory is installed here, it
    # still counts as success, since an empty directory given as
    # an argument to doins -r should not trigger failure.
    # A list (not a generator) is built on purpose: every entry must
    # be attempted even after an earlier one has failed.
    outcomes = [
        _doins(opts, install_runner, rel, source_root)
        for rel in pending]
    return all(outcomes)
def main(argv):
    """Installs every source named on the command line.

    Args:
        argv: command line arguments, without the program name.

    Returns:
        Exit status: 0 when at least one source was installed and none
        failed, otherwise 1. Skipped directories count as neither.
    """
    opts = _parse_args(argv)
    install_runner = _InstallRunner(opts)

    if not os.path.isdir(opts.dest):
        install_runner.install_dir(opts.dest)

    succeeded = False
    failed = False
    for source in opts.sources:
        # A symlink to a directory is handled like a file when
        # symlinks are to be preserved.
        as_directory = os.path.isdir(source) and not (
            opts.preserve_symlinks and os.path.islink(source))
        if as_directory:
            outcome = _install_dir(opts, install_runner, source)
            if outcome is None:
                # Skipped: neither a success nor a failure.
                continue
        else:
            outcome = _doins(
                opts, install_runner,
                os.path.basename(source),
                os.path.dirname(source))

        if outcome:
            succeeded = True
        else:
            failed = True

    return 0 if succeeded and not failed else 1


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))