blob: b17ade10557292ec778a84e46c5fe7b1c80fd4e2 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Contains on-disk caching functionality."""
from __future__ import print_function

import datetime
import errno
import functools
import os
import shutil
import tempfile

from six.moves import urllib

from chromite.lib import cros_build_lib
from chromite.lib import cros_logging as logging
from chromite.lib import locking
from chromite.lib import osutils
from chromite.lib import retry_util
# pylint: disable=protected-access
def EntryLock(f):
  """Decorator that provides monitor access control.

  The wrapped method runs inside the instance's |_entry_lock| context after
  explicitly taking its write lock (the same write_lock() call WriteLock uses).
  Calling a wrapped method while |read_locked| is set raises AssertionError
  up front rather than attempting to take the monitor.

  Args:
    f: The method to wrap.  Its instance must expose |read_locked| and
        |_entry_lock|.

  Returns:
    The wrapped method, carrying f's name and docstring.
  """

  @functools.wraps(f)
  def new_f(self, *args, **kwargs):
    # Ensure we don't have a read lock before potentially blocking while trying
    # to access the monitor.
    if self.read_locked:
      raise AssertionError(
          'Cannot call %s while holding a read lock.' % f.__name__)

    with self._entry_lock:
      # Take the monitor explicitly.  Entering the context is presumably not
      # sufficient on its own (WriteLock also calls write_lock()) -- confirm
      # against locking.FileLock if this changes.
      self._entry_lock.write_lock()
      return f(self, *args, **kwargs)
  return new_f
def WriteLock(f):
  """Decorator that takes a write lock.

  The wrapped method runs inside `self._lock.write_lock()` used as a context
  manager.

  Args:
    f: The method to wrap.  Its instance must expose |_lock|.

  Returns:
    The wrapped method, carrying f's name and docstring.
  """

  @functools.wraps(f)
  def new_f(self, *args, **kwargs):
    with self._lock.write_lock():
      return f(self, *args, **kwargs)
  return new_f
class CacheReference(object):
  """Encapsulates operations on a cache key reference.

  CacheReferences are returned by the DiskCache.Lookup() function.  They are
  used to read from and insert into the cache.

  A typical example of using a CacheReference:

    @contextlib.contextmanager
    def FetchFromCache():
      with cache.Lookup(key) as ref:
        # If entry doesn't exist in cache already, generate it ourselves, and
        # insert it into the cache, acquiring a read lock on it in the process.
        # If the entry does exist, we grab a read lock on it.
        if not ref.Exists(lock=True):
          path = PrepareItem()
          ref.SetDefault(path, lock=True)

        # yield the path to the cached entry to consuming code.
        yield ref.path
  """

  def __init__(self, cache, key):
    """Initialize a reference to |key| within |cache|.

    Args:
      cache: The DiskCache (or subclass) that owns the entry.
      key: The key tuple identifying the entry.
    """
    self._cache = cache
    self.key = key
    self.acquired = False
    self.read_locked = False
    # Lock on the entry itself; taken for reads (_ReadLock) and writes
    # (WriteLock-decorated helpers).
    self._lock = cache._LockForKey(key)
    # Separate lock used by EntryLock-decorated public methods.
    self._entry_lock = cache._LockForKey(key, suffix='.entry_lock')

  @property
  def path(self):
    """Returns on-disk path to the cached item."""
    return self._cache.GetKeyPath(self.key)

  def Acquire(self):
    """Prepare the cache reference for operation.

    This must be called (either explicitly or through entering a 'with'
    context) before calling any methods that acquire locks, or mutates
    reference.

    Raises:
      AssertionError: If the reference is already acquired.
    """
    if self.acquired:
      raise AssertionError(
          'Attempting to acquire an already acquired reference.')

    self.acquired = True
    # Paired with the self._lock.__exit__() call in Release().
    self._lock.__enter__()

  def Release(self):
    """Release the cache reference.  Causes any held locks to be released.

    Raises:
      AssertionError: If the reference is not currently acquired.
    """
    if not self.acquired:
      raise AssertionError(
          'Attempting to release an unacquired reference.')

    self.acquired = False
    self._lock.__exit__(None, None, None)
    self.read_locked = False

  def __enter__(self):
    self.Acquire()
    return self

  def __exit__(self, *args):
    self.Release()

  def _ReadLock(self):
    """Take a read lock on the entry and record that it is held."""
    self._lock.read_lock()
    self.read_locked = True

  @WriteLock
  def _Assign(self, path):
    self._cache._Insert(self.key, path)

  @WriteLock
  def _AssignText(self, text):
    self._cache._InsertText(self.key, text)

  @WriteLock
  def _Remove(self):
    self._cache._Remove(self.key)

  def _Exists(self):
    return self._cache._KeyExists(self.key)

  @EntryLock
  def Assign(self, path):
    """Insert a file or a directory into the cache at the referenced key."""
    self._Assign(path)

  @EntryLock
  def AssignText(self, text):
    """Create a file containing |text| and assign it to the key.

    Args:
      text: Can be a string or an iterable.
    """
    self._AssignText(text)

  @EntryLock
  def Remove(self):
    """Removes the entry from the cache."""
    self._Remove()

  @EntryLock
  def Exists(self, lock=False):
    """Tests for existence of entry.

    Args:
      lock: If the entry exists, acquire and maintain a read lock on it.

    Returns:
      True if the entry exists, False otherwise.
    """
    if self._Exists():
      if lock:
        self._ReadLock()
      return True
    return False

  @EntryLock
  def SetDefault(self, default_path, lock=False):
    """Assigns default_path if the entry doesn't exist.

    Args:
      default_path: The path to assign if the entry doesn't exist.
      lock: Acquire and maintain a read lock on the entry.
    """
    if not self._Exists():
      self._Assign(default_path)
    if lock:
      self._ReadLock()
class DiskCache(object):
  """Locked file system cache keyed by tuples.

  Key entries can be files or directories.  Access to the cache is provided
  through CacheReferences, which are retrieved by using the cache Lookup()
  method.

  A key tuple is stored at <cache_dir>/<'+'.join(key)>, and its lock file is
  that same path with the lock suffix appended.
  """
  _STAGING_DIR = 'staging'

  def __init__(self, cache_dir, cache_user=None, lock_suffix='.lock'):
    """Create the cache and staging directories if needed.

    Args:
      cache_dir: Root directory holding the cache entries.
      cache_user: Passed as |user| to osutils.SafeMakedirsNonRoot.
      lock_suffix: Suffix appended to an entry's path to name its lock file.
    """
    self._cache_dir = cache_dir
    self._cache_user = cache_user
    self._lock_suffix = lock_suffix
    # Scratch area (inside the cache dir) used for temp dirs during inserts
    # and removals.
    self.staging_dir = os.path.join(cache_dir, self._STAGING_DIR)

    osutils.SafeMakedirsNonRoot(self._cache_dir, user=self._cache_user)
    osutils.SafeMakedirsNonRoot(self.staging_dir, user=self._cache_user)

  def _KeyExists(self, key):
    """Returns True if anything (even a dangling symlink) exists at |key|."""
    return os.path.lexists(self.GetKeyPath(key))

  def GetKeyPath(self, key):
    """Get the on-disk path of a key."""
    return os.path.join(self._cache_dir, '+'.join(key))

  def _LockForKey(self, key, suffix=None):
    """Returns an unacquired lock associated with a key.

    Args:
      key: The key tuple.
      suffix: Lock file suffix; defaults to the cache's lock suffix.
    """
    suffix = suffix or self._lock_suffix
    key_path = self.GetKeyPath(key)
    # If a key component contains '/', the key's parent directory may not
    # exist yet, and the lock file cannot be created without it.
    osutils.SafeMakedirsNonRoot(os.path.dirname(key_path),
                                user=self._cache_user)
    # Derive the lock path straight from the key path.  Re-joining it onto
    # _cache_dir would duplicate the prefix when _cache_dir is relative.
    return locking.FileLock(key_path + suffix)

  def _TempDirContext(self):
    return osutils.TempDir(base_dir=self.staging_dir)

  def _Insert(self, key, path):
    """Insert a file or a directory into the cache at a given key.

    Any existing entry at |key| is removed first.  Otherwise shutil.move would
    nest |path| inside an existing directory entry instead of replacing it.
    """
    self._Remove(key)
    key_path = self.GetKeyPath(key)
    osutils.SafeMakedirsNonRoot(os.path.dirname(key_path),
                                user=self._cache_user)
    shutil.move(path, key_path)

  def _InsertText(self, key, text):
    """Inserts a file containing |text| into the cache."""
    with self._TempDirContext() as tempdir:
      file_path = os.path.join(tempdir, 'tempfile')
      osutils.WriteFile(file_path, text)
      self._Insert(key, file_path)

  def _Remove(self, key):
    """Remove a key from the cache.

    The entry is moved into a staging temp dir.  It is presumably deleted when
    the osutils.TempDir context exits.
    """
    if self._KeyExists(key):
      with self._TempDirContext() as tempdir:
        shutil.move(self.GetKeyPath(key), tempdir)

  def GetKey(self, path):
    """Returns the key for an item's path in the cache."""
    # NOTE(review): this is a substring test, not a prefix test.  Any path
    # containing _cache_dir anywhere is made relative to it.
    if self._cache_dir in path:
      path = os.path.relpath(path, self._cache_dir)
    return tuple(path.split('+'))

  def ListKeys(self):
    """Returns a list of keys for every item present in the cache."""
    keys = []
    for root, dirs, files in os.walk(self._cache_dir):
      for f in dirs + files:
        key_path = os.path.join(root, f)
        if os.path.exists(key_path + self._lock_suffix):
          # Test for the presence of the key's lock file to determine if this
          # is the root key path, or some file nested within a key's dir.
          keys.append(self.GetKey(key_path))
    return keys

  def Lookup(self, key):
    """Get a reference to a given key."""
    return CacheReference(self, key)

  def DeleteStale(self, max_age):
    """Removes every cache item that was not modified within |max_age|.

    Args:
      max_age: An instance of datetime.timedelta. Any item not modified within
          this amount of time will be removed.

    Returns:
      List of keys removed.

    Raises:
      TypeError: If |max_age| is not a datetime.timedelta.
    """
    if not isinstance(max_age, datetime.timedelta):
      raise TypeError('max_age must be an instance of datetime.timedelta.')
    keys_removed = []
    # A single reference time, so all entries are judged consistently.
    now = datetime.datetime.now()
    for key in self.ListKeys():
      path = self.GetKeyPath(key)
      # Treat the later of mtime and ctime as the last modification time.
      mtime = max(os.path.getmtime(path), os.path.getctime(path))
      time_since_last_modify = now - datetime.datetime.fromtimestamp(mtime)
      if time_since_last_modify > max_age:
        self.Lookup(key).Remove()
        keys_removed.append(key)
    return keys_removed
class RemoteCache(DiskCache):
  """Supports caching of remote objects via URI."""

  def _Fetch(self, url, local_path):
    """Fetch a remote file.

    Args:
      url: Source URI.  gs:// paths are copied with gs.GSContext; anything else
          is downloaded with curl.
      local_path: Destination file path.
    """
    # We have to nest the import because gs.GSContext uses us to cache its own
    # gsutil tarball. We know we won't get into a recursive loop though as it
    # only fetches files via non-gs URIs.
    from chromite.lib import gs

    if gs.PathIsGs(url):
      ctx = gs.GSContext()
      ctx.Copy(url, local_path)
    else:
      # Note: unittests assume local_path is at the end.
      retry_util.RunCurl(['--fail', url, '-o', local_path],
                         debug_level=logging.DEBUG, capture_output=True)

  def _Insert(self, key, url):  # pylint: disable=arguments-differ
    """Insert a remote file into the cache.

    Args:
      key: The key tuple.
      url: A local path, a file:// URI, or a remote URI to download.
    """
    o = urllib.parse.urlparse(url)
    if o.scheme in ('file', ''):
      DiskCache._Insert(self, key, o.path)
      return

    # Reserve a unique file name in the staging dir for the download.
    with tempfile.NamedTemporaryFile(dir=self.staging_dir,
                                     delete=False) as local_file:
      local_path = local_file.name
    try:
      self._Fetch(url, local_path)
      DiskCache._Insert(self, key, local_path)
    finally:
      # On success the file has been moved into the cache.  On failure, don't
      # leave a partial download behind in the staging dir.
      if os.path.lexists(local_path):
        os.unlink(local_path)
def Untar(path, cwd, sudo=False):
  """Untar a tarball.

  Args:
    path: Path to the tarball.  Its compression type is derived via
        cros_build_lib.CompressionExtToType.
    cwd: Directory tar runs in, i.e. where the contents are extracted.
        Presumably it must already exist.
    sudo: If True, run tar through cros_build_lib.sudo_run.
  """
  functor = cros_build_lib.sudo_run if sudo else cros_build_lib.run
  comp = cros_build_lib.CompressionExtToType(path)
  cmd = ['tar']
  if comp != cros_build_lib.COMP_NONE:
    extra_comp_args = [cros_build_lib.FindCompressor(comp)]
    if os.path.basename(extra_comp_args[0]) == 'pbzip2':
      # NOTE(review): lets pbzip2 tolerate trailing data after the bzip2
      # stream -- confirm still needed with current pbzip2.
      extra_comp_args.append('--ignore-trailing-garbage=1')
    cmd += ['-I', ' '.join(extra_comp_args)]
  functor(cmd + ['-xpf', path], cwd=cwd, debug_level=logging.DEBUG, quiet=True)
class TarballCache(RemoteCache):
  """Supports caching of extracted tarball contents."""

  def _Insert(self, key, tarball_path):  # pylint: disable=arguments-differ
    """Insert a tarball and its extracted contents into the cache.

    Download the tarball first if a URL is provided as tarball_path.

    Args:
      key: The key tuple.
      tarball_path: A local path, a file:// URI, or a remote URI.
    """
    with osutils.TempDir(prefix='tarball-cache',
                         base_dir=self.staging_dir) as tempdir:
      o = urllib.parse.urlsplit(tarball_path)
      if o.scheme == 'file':
        tarball_path = o.path
      elif o.scheme:
        url = tarball_path
        tarball_path = os.path.join(tempdir, os.path.basename(o.path))
        self._Fetch(url, tarball_path)

      extract_path = os.path.join(tempdir, 'extract')
      # Untar runs tar with cwd=extract_path, so the directory must exist.
      os.mkdir(extract_path)
      Untar(tarball_path, extract_path)
      DiskCache._Insert(self, key, extract_path)

  def _KeyExists(self, key):
    """Specialized DiskCache._KeyExists that ignores empty directories.

    The normal _KeyExists just checks to see if the key path exists in the cache
    directory.  Many tests mock out run then fetch a tarball.  The mock
    blocks untarring into it.  This leaves behind an empty dir which blocks
    future untarring in non-test scripts.
    """
    # Wipe out empty directories before testing for existence.
    key_path = self.GetKeyPath(key)
    try:
      os.rmdir(key_path)
    except OSError as ex:
      # Ignore only the errors rmdir raises when there is nothing to wipe:
      # non-empty dir (POSIX allows ENOTEMPTY or EEXIST), missing path, or a
      # path that is not a directory.
      if ex.errno not in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT,
                          errno.ENOTDIR):
        raise

    return os.path.exists(key_path)