blob: 22bbbcc1f7a8f3719147e63fa880bbf22b2111c6 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common file and os related utilities, including tempdir manipulation."""
import contextlib
import errno
import logging
import os
import shutil
import tempfile
from chromite.lib import cros_build_lib
# Env vars that tempdir can be gotten from; minimally, this
# needs to match python's tempfile module and match normal
# unix standards.
_TEMPDIR_ENV_VARS = ('TMPDIR', 'TEMP', 'TMP')
def ExpandPath(path):
"""Returns path after passing through realpath and expanduser."""
return os.path.realpath(os.path.expanduser(path))
def WriteFile(path, content, mode='w', atomic=False):
"""Write the given content to disk.
Args:
path: Pathway to write the content to.
content: Content to write. May be either an iterable, or a string.
mode: Optional; if binary mode is necessary, pass 'wb'. If appending is
desired, 'w+', etc.
atomic: If the updating of the file should be done atomically. Note this
option is incompatible w/ append mode.
"""
write_path = path
if atomic:
write_path = path + '.tmp'
with open(write_path, mode) as f:
f.writelines(cros_build_lib.iflatten_instance(content))
if not atomic:
return
try:
os.rename(write_path, path)
except EnvironmentError:
SafeUnlink(write_path)
raise
def Touch(path):
"""Simulate unix touch. Create if doesn't exist and update its timestamp."""
# Create the file if nonexistant.
open(path, 'a').close()
# Update timestamp to right now.
os.utime(path, None)
def ReadFile(path, mode='r'):
"""Read a given file on disk. Primarily useful for one off small files."""
with open(path, mode) as f:
return f.read()
def SafeUnlink(path, sudo=False):
"""Unlink a file from disk, ignoring if it doesn't exist.
Returns True if the file existed and was removed, False if it didn't exist.
"""
if sudo:
try:
cros_build_lib.SudoRunCommand(
['rm', '--', path], print_cmd=False, redirect_stderr=True)
return True
except cros_build_lib.RunCommandError:
if os.path.exists(path):
# Technically racey, but oh well; very hard to actually hit...
raise
return False
try:
os.unlink(path)
return True
except EnvironmentError, e:
if e.errno != errno.ENOENT:
raise
return False
def SafeMakedirs(path, mode=0775, sudo=False):
"""Make parent directories if needed. Ignore if existing.
Arguments:
path: The path to create. Intermediate directories will be created as
needed.
mode: The access permissions in the style of chmod.
sudo: If True, create it via sudo, thus root owned.
Raises:
EnvironmentError: if the makedir failed and it was non sudo.
RunCommandError: If sudo mode, and the command failed for any reason.
Returns:
True if the directory had to be created, False if otherwise.
"""
if sudo:
if os.path.isdir(path):
return False
cros_build_lib.SudoRunCommand(
['mkdir', '-p', '--mode', oct(mode), path], print_cmd=False,
redirect_stderr=True, redirect_stdout=True)
return True
try:
os.makedirs(path, mode)
return True
except EnvironmentError, e:
if e.errno != errno.EEXIST or not os.path.isdir(path):
raise
return False
def RmDir(path, ignore_missing=False, sudo=False):
"""Recursively remove a directory.
Arguments:
ignore_missing: Do not error when path does not exist.
sudo: Remove directories as root.
"""
if sudo:
try:
cros_build_lib.SudoRunCommand(
['rm', '-r%s' % ('f' if ignore_missing else '',), '--', path],
debug_level=logging.DEBUG,
redirect_stdout=True, redirect_stderr=True)
except cros_build_lib.RunCommandError, e:
if not ignore_missing or os.path.exists(path):
# If we're not ignoring the rm ENOENT equivalent, throw it;
# if the pathway still exists, something failed, thus throw it.
raise
else:
try:
shutil.rmtree(path)
except EnvironmentError, e:
if not ignore_missing or e.errno != errno.ENOENT:
raise
def Which(binary):
"""Return the absolute path to the specified binary.
Arguments:
binary: The binary to look for.
Returns the specified binary if found. Otherwise returns None.
"""
for p in os.environ.get('PATH', '').split(':'):
p = os.path.join(p, binary)
if os.access(p, os.X_OK):
return p
return None
def FindMissingBinaries(needed_tools):
"""Verifies that the required tools are present on the system.
This is especially important for scripts that are intended to run
outside the chroot.
Arguments:
needed_tools: an array of string specified binaries to look for.
Returns:
If all tools are found, returns the empty list. Otherwise, returns the
list of missing tools.
"""
return [binary for binary in needed_tools if Which(binary) is None]
def IteratePathParents(start_path):
"""Generator that iterates through a directory's parents.
Yields:
The passed-in path, along with its parents. i.e.,
IteratePathParents('/usr/local') would yield '/usr/local', '/usr/', and '/'.
Arguments:
start_path: The path to start from.
"""
path = os.path.abspath(start_path)
yield path
while path.strip('/'):
path = os.path.dirname(path)
yield path
def FindInPathParents(path_to_find, start_path, test_func=None):
"""Look for a relative path, ascending through parent directories.
Ascend through parent directories of current path looking for a relative
path. I.e., given a directory structure like:
-/
|
--usr
|
--bin
|
--local
|
--google
the call FindInPathParents('bin', '/usr/local') would return '/usr/bin', and
the call FindInPathParents('google', '/usr/local') would return
'/usr/local/google'.
Arguments:
rel_path: The relative path to look for.
start_path: The path to start the search from. If |start_path| is a
directory, it will be included in the directories that are searched.
test_func: The function to use to verify the relative path. Defaults to
os.path.exists. The function will be passed one argument - the target
path to test. A True return value will cause AscendingLookup to return
the target.
"""
if test_func is None:
test_func = os.path.exists
for path in IteratePathParents(start_path):
target = os.path.join(path, path_to_find)
if test_func(target):
return target
return None
# pylint: disable=W0212,R0904,W0702
def _TempDirSetup(self, prefix='tmp', update_env=True):
"""Generate a tempdir, modifying the object, and env to use it.
Specifically, if update_env is True, then from this invocation forward,
python and all subprocesses will use this location for their tempdir.
The matching _TempDirTearDown restores the env to what it was.
"""
# Stash the old tempdir that was used so we can
# switch it back on the way out.
self.tempdir = tempfile.mkdtemp(prefix=prefix)
os.chmod(self.tempdir, 0700)
if update_env:
with tempfile._once_lock:
self._tempdir_value = tempfile._get_default_tempdir()
self._tempdir_env = tuple((x, os.environ.get(x))
for x in _TEMPDIR_ENV_VARS)
# Now update TMPDIR/TEMP/TMP, and poke the python
# internal to ensure all subprocess/raw tempfile
# access goes into this location.
os.environ.update((x, self.tempdir) for x in _TEMPDIR_ENV_VARS)
# Finally, adjust python's cached value (we know it's cached by here
# since we invoked _get_default_tempdir from above). Note this
# is necessary since we want *all* output from that point
# forward to go to this location.
tempfile.tempdir = self.tempdir
# pylint: disable=W0212,R0904,W0702
def _TempDirTearDown(self, force_sudo):
# Note that _TempDirSetup may have failed, resulting in these attributes
# not being set; this is why we use getattr here (and must).
tempdir = getattr(self, 'tempdir', None)
try:
if tempdir is not None:
RmDir(tempdir, ignore_missing=True, sudo=force_sudo)
except EnvironmentError, e:
# Suppress ENOENT since we may be invoked
# in a context where parallel wipes of the tempdir
# may be occuring; primarily during hard shutdowns.
if e.errno != errno.ENOENT:
raise
# Restore environment modification if necessary.
tempdir_value = getattr(self, '_tempdir_value', None)
if tempdir_value is not None:
with tempfile._once_lock:
tempfile.tempdir = self._tempdir_value
for key, value in self._tempdir_env:
if value is None:
os.environ.pop(key, None)
else:
os.environ[key] = value
@contextlib.contextmanager
def TempDirContextManager(prefix='tmp', storage=None, sudo_rm=False):
"""ContextManager constraining all tempfile/TMPDIR activity to a tempdir
Arguments:
storage: The object that will have its 'tempdir' attribute set.
sudo_rm: Whether the temporary dir will need root privileges to remove.
"""
if storage is None:
# Fake up a mutable object.
# TOOD(build): rebase this to EasyAttr from cros_test_lib once it moves
# to an importable location.
class foon(object):
pass
storage = foon()
_TempDirSetup(storage, prefix=prefix)
try:
yield storage.tempdir
finally:
_TempDirTearDown(storage, sudo_rm)
# pylint: disable=W0212,R0904,W0702
def TempDirDecorator(func):
"""Populates self.tempdir with path to a temporary writeable directory."""
def f(self, *args, **kwargs):
with TempDirContextManager(storage=self):
return func(self, *args, **kwargs)
f.__name__ = func.__name__
f.__doc__ = func.__doc__
f.__module__ = func.__module__
return f
def TempFileDecorator(func):
"""Populates self.tempfile with path to a temporary writeable file"""
def f(self, *args, **kwargs):
with tempfile.NamedTemporaryFile(dir=self.tempdir, delete=False) as f:
self.tempfile = f.name
return func(self, *args, **kwargs)
f.__name__ = func.__name__
f.__doc__ = func.__doc__
f.__module__ = func.__module__
return TempDirDecorator(f)
def SetEnvironment(env):
"""Restore the environment variables to that of passed in dictionary."""
for var in set(os.environ).difference(env):
del os.environ[var]
# Just brute force overwrite what's there with the passed in copy.
os.environ.update(env)