# blob: 40fa1a1d817cf4cf0dea8e2c3f81bc53bef57971 [file] [log] [blame]
# Copyright 2012 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Contains on-disk caching functionality."""
import datetime
import errno
import functools
import hashlib
import logging
import os
import shutil
import tempfile
from typing import Optional, Union
import urllib.parse

from chromite.lib import cros_build_lib
from chromite.lib import locking
from chromite.lib import osutils
from chromite.lib import retry_util
# pylint: disable=protected-access
class Error(Exception):
    """Signals a fatal, unrecoverable caching error."""
def EntryLock(f):
    """Decorator that provides monitor access control.

    The wrapped method runs inside the instance's `_entry_lock` context, so
    the decorated object must expose `read_locked` and `_entry_lock`.

    Args:
        f: The method to wrap; it is called as f(self, *args, **kwargs).

    Returns:
        The wrapping function, carrying the metadata of |f|.

    Raises:
        AssertionError: If the instance already holds a read lock when the
            wrapped method is called.
    """

    @functools.wraps(f)
    def new_f(self, *args, **kwargs):
        # Ensure we don't have a read lock before potentially blocking while
        # trying to access the monitor.
        if self.read_locked:
            raise AssertionError(
                "Cannot call %s while holding a read lock." % f.__name__
            )

        with self._entry_lock:
            # NOTE(review): entering the lock's context is presumed not to
            # acquire it (Release() pairs __exit__ with a bare __enter__), so
            # the write lock is taken explicitly -- confirm against
            # locking.FileLock.
            self._entry_lock.write_lock()
            return f(self, *args, **kwargs)

    return new_f
def WriteLock(f):
    """Decorator that takes a write lock.

    The wrapped method runs inside the context returned by
    `self._lock.write_lock()`, so the decorated object must expose `_lock`.

    Args:
        f: The method to wrap; it is called as f(self, *args, **kwargs).

    Returns:
        The wrapping function, carrying the metadata of |f|.
    """

    @functools.wraps(f)
    def new_f(self, *args, **kwargs):
        with self._lock.write_lock():
            return f(self, *args, **kwargs)

    return new_f
class CacheReference:
    """Encapsulates operations on a cache key reference.

    CacheReferences are returned by the DiskCache.Lookup() function. They are
    used to read from and insert into the cache.

    A typical example of using a CacheReference:

    @contextlib.contextmanager
    def FetchFromCache():
        with cache.Lookup(key) as ref:
            # If entry doesn't exist in cache already, generate it ourselves,
            # and insert it into the cache, acquiring a read lock on it in the
            # process. If the entry does exist, we grab a read lock on it.
            if not ref.Exists(lock=True):
                path = PrepareItem()
                ref.SetDefault(path, lock=True)

            # yield the path to the cached entry to consuming code.
            yield ref.path
    """

    def __init__(self, cache, key):
        """Creates an unacquired reference to |key| inside |cache|.

        Args:
            cache: The cache object that owns the key; it supplies the locks
                and performs the actual insert/remove/lookup operations.
            key: The cache key this reference operates on.
        """
        self._cache = cache
        self.key = key
        self.acquired = False
        self.read_locked = False
        # Guards the cached entry itself.
        self._lock = cache._LockForKey(key)
        # Serializes the public (EntryLock-decorated) operations.
        self._entry_lock = cache._LockForKey(key, suffix=".entry_lock")

    @property
    def path(self):
        """Returns on-disk path to the cached item."""
        return self._cache.GetKeyPath(self.key)

    def Acquire(self):
        """Prepare the cache reference for operation.

        This must be called (either explicitly or through entering a 'with'
        context) before calling any methods that acquire locks, or mutate the
        reference.

        Raises:
            AssertionError: If the reference is already acquired.
        """
        if self.acquired:
            raise AssertionError(
                "Attempting to acquire an already acquired reference."
            )

        self.acquired = True
        # Counterpart of the __exit__() call made in Release().
        self._lock.__enter__()

    def Release(self):
        """Release the cache reference. Causes any held locks to be released.

        Raises:
            AssertionError: If the reference is not currently acquired.
        """
        if not self.acquired:
            raise AssertionError(
                "Attempting to release an unacquired reference."
            )

        self.acquired = False
        self._lock.__exit__(None, None, None)
        self.read_locked = False

    def __enter__(self):
        self.Acquire()
        return self

    def __exit__(self, *args):
        self.Release()

    def _ReadLock(self):
        # NOTE(review): assumes the lock object exposes read_lock() alongside
        # the write_lock() used by WriteLock -- confirm against
        # locking.FileLock.
        self._lock.read_lock()
        self.read_locked = True

    @WriteLock
    def _Assign(self, path):
        self._cache._Insert(self.key, path)

    @WriteLock
    def _AssignText(self, text):
        self._cache._InsertText(self.key, text)

    @WriteLock
    def _Remove(self):
        self._cache._Remove(self.key)
        # DiskCache.ListKeys() identifies keys by their lock files, so those
        # have to go away together with the entry.
        # NOTE(review): assumes the lock objects expose their file as `.path`
        # and that osutils.SafeUnlink tolerates a missing file -- confirm.
        osutils.SafeUnlink(self._lock.path)
        osutils.SafeUnlink(self._entry_lock.path)

    def _Exists(self):
        return self._cache._KeyExists(self.key)

    @EntryLock
    def Assign(self, path):
        """Insert a file or a directory into the cache at the referenced key.

        Args:
            path: The file or directory to move into the cache.
        """
        self._Assign(path)

    @EntryLock
    def AssignText(self, text):
        """Create a file containing |text| and assign it to the key.

        Args:
            text: Can be a string or an iterable.
        """
        self._AssignText(text)

    @EntryLock
    def Remove(self):
        """Removes the entry from the cache."""
        self._Remove()

    @EntryLock
    def Exists(self, lock=False):
        """Tests for existence of entry.

        Args:
            lock: If the entry exists, acquire and maintain a read lock on it.

        Returns:
            True if the entry exists, False otherwise.
        """
        if self._Exists():
            if lock:
                self._ReadLock()
            return True
        return False

    @EntryLock
    def SetDefault(self, default_path, lock=False):
        """Assigns default_path if the entry doesn't exist.

        Args:
            default_path: The path to assign if the entry doesn't exist.
            lock: Acquire and maintain a read lock on the entry.
        """
        if not self._Exists():
            self._Assign(default_path)
        if lock:
            self._ReadLock()
class DiskCache:
    """Locked file system cache keyed by tuples.

    Key entries can be files or directories. Access to the cache is provided
    through CacheReferences, which are retrieved by using the cache Lookup()
    method.
    """

    _STAGING_DIR = "staging"

    def __init__(
        self,
        cache_dir: Union[str, os.PathLike],
        cache_user: Optional[str] = None,
        lock_suffix: str = ".lock",
    ):
        """Creates the cache directory layout if it does not exist yet.

        Args:
            cache_dir: Root directory of the cache.
            cache_user: Passed as `user` when creating cache directories.
            lock_suffix: Suffix appended to a key's path to name its lock file.
        """
        # TODO(vapier): Convert this to Path.
        self._cache_dir = str(cache_dir)
        self._cache_user = cache_user
        self._lock_suffix = lock_suffix
        self.staging_dir = os.path.join(cache_dir, self._STAGING_DIR)

        osutils.SafeMakedirsNonRoot(self._cache_dir, user=self._cache_user)
        osutils.SafeMakedirsNonRoot(self.staging_dir, user=self._cache_user)

    def _KeyExists(self, key):
        # lexists() so that a dangling symlink still counts as an entry.
        return os.path.lexists(self.GetKeyPath(key))

    def GetKeyPath(self, key):
        """Get the on-disk path of a key."""
        return os.path.join(self._cache_dir, "+".join(key))

    def _LockForKey(self, key, suffix=None):
        """Returns an unacquired lock associated with a key."""
        suffix = suffix or self._lock_suffix
        key_path = self.GetKeyPath(key)
        osutils.SafeMakedirsNonRoot(
            os.path.dirname(key_path), user=self._cache_user
        )
        # The lock lives next to the entry as "<key path><suffix>"; ListKeys()
        # relies on exactly this layout.
        lock_path = os.path.join(
            os.path.dirname(key_path),
            os.path.basename(key_path) + suffix,
        )
        return locking.FileLock(lock_path)

    def _TempDirContext(self):
        return osutils.TempDir(base_dir=self.staging_dir)

    def _Insert(self, key, path):
        """Insert a file or a directory into the cache at a given key."""
        # Clear any previous entry first: shutil.move() onto an existing
        # directory would nest |path| inside it instead of replacing it.
        self._Remove(key)
        key_path = self.GetKeyPath(key)
        osutils.SafeMakedirsNonRoot(
            os.path.dirname(key_path), user=self._cache_user
        )
        shutil.move(path, key_path)

    def _InsertText(self, key, text):
        """Inserts a file containing |text| into the cache."""
        with self._TempDirContext() as tempdir:
            file_path = os.path.join(tempdir, "tempfile")
            osutils.WriteFile(file_path, text)
            self._Insert(key, file_path)

    def _Remove(self, key):
        """Remove a key from the cache."""
        if self._KeyExists(key):
            # Move the entry into a staging temp dir; it is disposed of when
            # the temp dir context exits.
            with self._TempDirContext() as tempdir:
                shutil.move(self.GetKeyPath(key), tempdir)

    def GetKey(self, path: Union[str, os.PathLike]):
        """Returns the key for an item's path in the cache."""
        path = str(path)
        if path.startswith(self._cache_dir):
            path = os.path.relpath(path, self._cache_dir)
        return tuple(path.split("+"))

    def ListKeys(self):
        """Returns a list of keys for every item present in the cache."""
        keys = []
        for root, dirs, files in os.walk(self._cache_dir):
            for f in dirs + files:
                key_path = os.path.join(root, f)
                if os.path.exists(key_path + self._lock_suffix):
                    # Test for the presence of the key's lock file to determine
                    # if this is the root key path, or some file nested within a
                    # key's dir.
                    keys.append(self.GetKey(key_path))
        return keys

    def Lookup(self, key):
        """Get a reference to a given key."""
        return CacheReference(self, key)

    def DeleteStale(self, max_age):
        """Removes any item from the cache not modified within |max_age|.

        Args:
            max_age: An instance of datetime.timedelta. Any item not modified
                within this amount of time will be removed.

        Returns:
            List of keys removed.

        Raises:
            TypeError: If |max_age| is not a datetime.timedelta.
        """
        if not isinstance(max_age, datetime.timedelta):
            raise TypeError(
                "max_age must be an instance of datetime.timedelta."
            )

        keys_removed = []
        for key in self.ListKeys():
            path = self.GetKeyPath(key)
            # Use whichever of mtime/ctime is more recent.
            mtime = max(os.path.getmtime(path), os.path.getctime(path))
            # Both operands are naive local-time datetimes.
            time_since_last_modify = (
                datetime.datetime.now()
                - datetime.datetime.fromtimestamp(mtime)
            )
            if time_since_last_modify > max_age:
                self.Lookup(key).Remove()
                keys_removed.append(key)

        return keys_removed
class RemoteCache(DiskCache):
    """Supports caching of remote objects via URI."""

    def _Fetch(
        self,
        url: str,
        local_path: str,
        hash_sha1: Optional[str] = None,
        mode: Optional[int] = None,
    ):
        """Fetch a remote file.

        Args:
            url: URL of the remote object.
            local_path: Path to store the fetched object at.
            hash_sha1: If set, check for the SHA-1 sum.
            mode: If set, the file is chmod-ed to mode.

        Raises:
            Error: If |hash_sha1| is set and does not match the fetched file.
        """
        # We have to nest the import because gs.GSContext uses us to cache its
        # own gsutil tarball. We know we won't get into a recursive loop though
        # as it only fetches files via non-gs URIs.
        from chromite.lib import gs

        if gs.PathIsGs(url):
            ctx = gs.GSContext()
            ctx.Copy(url, local_path)
        else:
            # Note: unittests assume local_path is at the end.
            # NOTE(review): the callee and keyword arguments were restored
            # from context (retry_util and logging are otherwise unused in
            # this file) -- confirm against retry_util.RunCurl.
            retry_util.RunCurl(
                ["--fail", url, "-o", local_path],
                debug_level=logging.DEBUG,
                capture_output=True,
            )

        if hash_sha1 is not None:
            actual_sha1 = Sha1File(local_path)
            if actual_sha1 != hash_sha1:
                raise Error(f"sha1({url!r}) = {actual_sha1} != {hash_sha1}")

        if mode is not None:
            osutils.Chmod(local_path, mode)

    def _Insert(self, key, url):  # pylint: disable=arguments-renamed
        """Insert a remote file into the cache.

        Args:
            key: The cache key to insert at.
            url: Location of the object. A "file" scheme or a plain path is
                inserted directly; anything else is fetched first.
        """
        o = urllib.parse.urlparse(url)
        if o.scheme in ("file", ""):
            # Local file: nothing to download.
            DiskCache._Insert(self, key, o.path)
            return

        # delete=False because the downloaded file is moved into the cache
        # rather than removed when the context exits.
        with tempfile.NamedTemporaryFile(
            dir=self.staging_dir, delete=False
        ) as local_path:
            self._Fetch(url, local_path.name)
            DiskCache._Insert(self, key, local_path.name)
def Untar(path, cwd, sudo=False):
    """Untar a tarball.

    Args:
        path: Path of the tarball to extract.
        cwd: Directory to run tar in, i.e. where the contents end up.
        sudo: If True, run the extraction through cros_build_lib.sudo_run.
    """
    # NOTE(review): the non-sudo runner was restored as cros_build_lib.run --
    # confirm.
    functor = cros_build_lib.sudo_run if sudo else cros_build_lib.run

    comp = cros_build_lib.CompressionDetectType(path)
    cmd = ["tar"]
    if comp != cros_build_lib.CompressionType.NONE:
        extra_comp_args = [cros_build_lib.FindCompressor(comp)]
        # NOTE(review): the per-compressor flags below were restored from
        # memory of the upstream file, not from visible code -- confirm.
        if os.path.basename(extra_comp_args[0]) == "pbzip2":
            extra_comp_args.append("--ignore-trailing-garbage=1")
        elif os.path.basename(extra_comp_args[0]).startswith("zstd"):
            extra_comp_args.append("-f")
        # tar takes the compressor and its arguments as one -I string.
        cmd += ["-I", " ".join(extra_comp_args)]

    # NOTE(review): keyword arguments other than cwd are assumed -- confirm
    # against cros_build_lib.run.
    functor(
        cmd + ["-xpf", path],
        cwd=cwd,
        debug_level=logging.DEBUG,
        capture_output=True,
    )
class TarballCache(RemoteCache):
    """Supports caching of extracted tarball contents."""

    def _Insert(self, key, tarball_path):  # pylint: disable=arguments-renamed
        """Insert a tarball and its extracted contents into the cache.

        Download the tarball first if a URL is provided as tarball_path.

        Args:
            key: The cache key to insert at.
            tarball_path: Local path, file:// URL, or remote URL of a tarball.
        """
        with osutils.TempDir(
            prefix="tarball-cache", base_dir=self.staging_dir
        ) as tempdir:
            o = urllib.parse.urlsplit(tarball_path)
            if o.scheme == "file":
                tarball_path = o.path
            elif o.scheme:
                # Remote object: download it into the temp dir first.
                url = tarball_path
                tarball_path = os.path.join(tempdir, os.path.basename(o.path))
                self._Fetch(url, tarball_path)

            extract_path = os.path.join(tempdir, "extract")
            # Untar() runs tar with cwd=extract_path, so it must exist.
            os.mkdir(extract_path)
            Untar(tarball_path, extract_path)
            DiskCache._Insert(self, key, extract_path)

    def _KeyExists(self, key):
        """Specialized DiskCache._KeyExists that ignores empty directories.

        The normal _KeyExists just checks to see if the key path exists in the
        cache directory. Many tests mock out run then fetch a tarball. The mock
        blocks untarring into it. This leaves behind an empty dir which blocks
        future untarring in non-test scripts.

        Returns:
            True if the key path exists after empty directories were wiped.
        """
        # Wipe out empty directories before testing for existence.
        key_path = self.GetKeyPath(key)

        try:
            os.rmdir(key_path)
        except OSError as ex:
            # A populated or missing entry is expected; anything else is a
            # real error.
            if ex.errno not in (errno.ENOTEMPTY, errno.ENOENT):
                raise

        return os.path.exists(key_path)
def Sha1File(path: Union[str, os.PathLike]) -> str:
"""Computes the SHA-1 checksum of path as a hex string."""
# Reusable buffer to reduce allocations.
buf = bytearray(4096)
view = memoryview(buf)
sha1 = hashlib.sha1()
with open(path, "rb") as fileobj:
while True:
size = fileobj.readinto(buf)
if size == 0:
return sha1.hexdigest()