blob: 738acbd3cf3a853fee5b94d2b57e5a4fbf6a3f0e [file] [log] [blame]
# Copyright 2012 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Contains on-disk caching functionality."""
import datetime
import errno
import functools
import hashlib
import logging
import os
import shutil
import tempfile
from typing import Optional, Tuple, Union
import urllib.parse

from chromite.lib import cros_build_lib
from chromite.lib import locking
from chromite.lib import osutils
from chromite.lib import retry_util
from chromite.utils import gs_urls_util
# pylint: disable=protected-access
class Error(Exception):
    """Base exception raised for fatal caching errors."""
def EntryLock(f):
    """Decorator that provides monitor access control.

    Asserts the caller does not already hold a read lock (taking the entry
    write lock while read-locked would deadlock), then serializes the call
    by holding the entry lock for writing while |f| runs.

    Args:
        f: The method to wrap; must be a method of an object exposing
            `read_locked` and `_entry_lock` attributes.

    Returns:
        The wrapped method.

    Raises:
        AssertionError: If called while the reference holds a read lock.
    """

    # functools.wraps preserves f's name/docstring so the AssertionError
    # message and introspection stay meaningful on the wrapper.
    @functools.wraps(f)
    def new_f(self, *args, **kwargs):
        # Ensure we don't have a read lock before potentially blocking while
        # trying to access the monitor.
        if self.read_locked:
            raise AssertionError(
                "Cannot call %s while holding a read lock." % f.__name__
            )

        with self._entry_lock:
            self._entry_lock.write_lock()
            return f(self, *args, **kwargs)

    return new_f
def WriteLock(f):
    """Decorator that takes a write lock on `self._lock` while |f| runs.

    Args:
        f: The method to wrap; must be a method of an object whose `_lock`
            attribute provides a `write_lock()` context manager.

    Returns:
        The wrapped method.
    """

    # functools.wraps keeps f's name/docstring visible on the wrapper.
    @functools.wraps(f)
    def new_f(self, *args, **kwargs):
        with self._lock.write_lock():
            return f(self, *args, **kwargs)

    return new_f
class CacheReference:
    """Encapsulates operations on a cache key reference.

    CacheReferences are returned by the DiskCache.Lookup() function. They are
    used to read from and insert into the cache.

    A typical example of using a CacheReference:

    @contextlib.contextmanager
    def FetchFromCache()
      with cache.Lookup(key) as ref:
        # If entry doesn't exist in cache already, generate it ourselves,
        # and insert it into the cache, acquiring a read lock on it in the
        # process. If the entry does exist, we grab a read lock on it.
        if not ref.Exists(lock=True):
          path = PrepareItem()
          ref.SetDefault(path, lock=True)

        # yield the path to the cached entry to consuming code.
        yield ref.path
    """

    def __init__(self, cache, key) -> None:
        # The owning DiskCache instance and the tuple key this refers to.
        self._cache = cache
        self.key = key
        # True once Acquire() has been called and self._lock is entered.
        self.acquired = False
        # True while a read lock on the item is held; EntryLock methods
        # refuse to run in that state to avoid deadlocking on the entry
        # lock below.
        self.read_locked = False
        # Lock guarding the cached item itself.
        self._lock = cache._LockForKey(key)
        # Separate monitor lock serializing the public mutating operations.
        self._entry_lock = cache._LockForKey(key, suffix=".entry_lock")

    @property
    def path(self) -> "os.PathLike[str]":
        """Returns on-disk path to the cached item."""
        return self._cache.GetKeyPath(self.key)

    def Acquire(self) -> None:
        """Prepare the cache reference for operation.

        This must be called (either explicitly or through entering a 'with'
        context) before calling any methods that acquire locks, or mutates
        reference.

        Raises:
            AssertionError: If the reference is already acquired.
        """
        if self.acquired:
            raise AssertionError(
                "Attempting to acquire an already acquired reference."
            )

        self.acquired = True
        self._lock.__enter__()

    def Release(self) -> None:
        """Release the cache reference. Causes any held locks to be released.

        Raises:
            AssertionError: If the reference was never acquired.
        """
        if not self.acquired:
            raise AssertionError(
                "Attempting to release an unacquired reference."
            )

        self.acquired = False
        self._lock.__exit__(None, None, None)
        self.read_locked = False

    def __enter__(self):
        self.Acquire()
        return self

    def __exit__(self, *args) -> None:
        self.Release()

    def _ReadLock(self) -> None:
        # Take a read lock on the item and remember that fact so EntryLock
        # methods can detect it (locking.FileLock semantics — see that lib).
        self._lock.read_lock()
        self.read_locked = True

    @WriteLock
    def _Assign(self, path) -> None:
        # Move |path| into the cache under our key (write lock held).
        self._cache._Insert(self.key, path)

    @WriteLock
    def _AssignText(self, text) -> None:
        # Write |text| into a file stored under our key (write lock held).
        self._cache._InsertText(self.key, text)

    @WriteLock
    def _Remove(self) -> None:
        # Drop the cached item, then clean up its backing lock files.
        self._cache._Remove(self.key)
        osutils.SafeUnlink(self._lock.path)
        osutils.SafeUnlink(self._entry_lock.path)

    def _Exists(self):
        # Existence check without any locking; callers hold the entry lock.
        return self._cache._KeyExists(self.key)

    @EntryLock
    def Assign(self, path) -> None:
        """Insert a file or a directory into the cache at the referenced key."""
        self._Assign(path)

    @EntryLock
    def AssignText(self, text) -> None:
        """Create a file containing |text| and assign it to the key.

        Args:
            text: Can be a string or an iterable.
        """
        self._AssignText(text)

    @EntryLock
    def Remove(self) -> None:
        """Removes the entry from the cache."""
        self._Remove()

    @EntryLock
    def Exists(self, lock=False):
        """Tests for existence of entry.

        Args:
            lock: If the entry exists, acquire and maintain a read lock on it.

        Returns:
            True if the entry exists, else False.
        """
        if self._Exists():
            if lock:
                self._ReadLock()
            return True
        return False

    @EntryLock
    def SetDefault(self, default_path, lock=False) -> None:
        """Assigns default_path if the entry doesn't exist.

        Args:
            default_path: The path to assign if the entry doesn't exist.
            lock: Acquire and maintain a read lock on the entry.
        """
        if not self._Exists():
            self._Assign(default_path)

        if lock:
            self._ReadLock()
class DiskCache:
    """Locked file system cache keyed by tuples.

    Key entries can be files or directories. Access to the cache is provided
    through CacheReferences, which are retrieved by using the cache Lookup()
    method.
    """

    # Subdirectory (under the cache dir) holding in-flight temp data, so
    # partially built items never appear at a real key path.
    _STAGING_DIR = "staging"

    def __init__(
        self,
        cache_dir: Union[str, os.PathLike],
        cache_user: Optional[str] = None,
        lock_suffix: str = ".lock",
    ) -> None:
        """Initialize the cache rooted at |cache_dir|.

        Args:
            cache_dir: Directory holding all cached items.
            cache_user: If set, the user the cache dirs are created as.
            lock_suffix: Suffix appended to a key path to form its lock file.
        """
        # TODO(vapier): Convert this to Path.
        self._cache_dir = str(cache_dir)
        self._cache_user = cache_user
        self._lock_suffix = lock_suffix
        self.staging_dir = os.path.join(cache_dir, self._STAGING_DIR)

        osutils.SafeMakedirsNonRoot(self._cache_dir, user=self._cache_user)
        osutils.SafeMakedirsNonRoot(self.staging_dir, user=self._cache_user)

    def _KeyExists(self, key):
        # lexists(): a dangling symlink at the key path still counts.
        return os.path.lexists(self.GetKeyPath(key))

    def GetKeyPath(self, key: Tuple[str, ...]) -> "os.PathLike[str]":
        """Get the on-disk path of a key."""
        # Key tuple elements are joined with '+' (see GetKey for the inverse).
        return os.path.join(self._cache_dir, "+".join(key))

    def _LockForKey(self, key, suffix=None):
        """Returns an unacquired lock associated with a key.

        Args:
            key: The key tuple the lock guards.
            suffix: Lock-file suffix; defaults to self._lock_suffix.
        """
        suffix = suffix or self._lock_suffix
        key_path = self.GetKeyPath(key)
        osutils.SafeMakedirsNonRoot(
            os.path.dirname(key_path), user=self._cache_user
        )
        # The lock file lives next to the key path with |suffix| appended.
        lock_path = os.path.join(
            self._cache_dir,
            os.path.dirname(key_path),
            os.path.basename(key_path) + suffix,
        )
        return locking.FileLock(lock_path)

    def _TempDirContext(self):
        # Temp dirs live in the staging dir, which is inside the cache dir,
        # so moves into the cache stay on the same filesystem.
        return osutils.TempDir(base_dir=self.staging_dir)

    def _Insert(self, key, path) -> None:
        """Insert a file or a directory into the cache at a given key."""
        # Any existing entry is removed first.
        self._Remove(key)
        key_path = self.GetKeyPath(key)
        osutils.SafeMakedirsNonRoot(
            os.path.dirname(key_path), user=self._cache_user
        )
        shutil.move(path, key_path)

    def _InsertText(self, key, text) -> None:
        """Inserts a file containing |text| into the cache."""
        with self._TempDirContext() as tempdir:
            file_path = os.path.join(tempdir, "tempfile")
            osutils.WriteFile(file_path, text)
            self._Insert(key, file_path)

    def _Remove(self, key) -> None:
        """Remove a key from the cache."""
        if self._KeyExists(key):
            with self._TempDirContext() as tempdir:
                # Move the item into a temp dir; the TempDir context deletes
                # it on exit, so the key path disappears in one rename.
                shutil.move(self.GetKeyPath(key), tempdir)

    def GetKey(self, path: Union[str, os.PathLike]):
        """Returns the key for an item's path in the cache."""
        path = str(path)
        if path.startswith(self._cache_dir):
            path = os.path.relpath(path, self._cache_dir)
        # Inverse of GetKeyPath: split the '+'-joined components back out.
        return tuple(path.split("+"))

    def ListKeys(self):
        """Returns a list of keys for every item present in the cache."""
        keys = []
        for root, dirs, files in os.walk(self._cache_dir):
            for f in dirs + files:
                key_path = os.path.join(root, f)
                if os.path.exists(key_path + self._lock_suffix):
                    # Test for the presence of the key's lock file to determine
                    # if this is the root key path, or some file nested within a
                    # key's dir.
                    keys.append(self.GetKey(key_path))
        return keys

    def Lookup(self, key: Tuple[str, ...]) -> CacheReference:
        """Get a reference to a given key."""
        return CacheReference(self, key)

    def DeleteStale(self, max_age):
        """Removes any item from the cache not modified within |max_age|.

        Args:
            max_age: An instance of datetime.timedelta. Any item not modified
                within this amount of time will be removed.

        Returns:
            List of keys removed.

        Raises:
            TypeError: If |max_age| is not a datetime.timedelta.
        """
        if not isinstance(max_age, datetime.timedelta):
            raise TypeError(
                "max_age must be an instance of datetime.timedelta."
            )
        keys_removed = []
        for key in self.ListKeys():
            path = self.GetKeyPath(key)
            # Use the newer of mtime/ctime so metadata-only updates also
            # count as recent activity.
            mtime = max(os.path.getmtime(path), os.path.getctime(path))
            time_since_last_modify = (
                datetime.datetime.now() - datetime.datetime.fromtimestamp(mtime)
            )
            if time_since_last_modify > max_age:
                self.Lookup(key).Remove()
                keys_removed.append(key)
        return keys_removed
class RemoteCache(DiskCache):
    """Supports caching of remote objects via URI."""

    def _Fetch(
        self,
        url: str,
        local_path: str,
        *,
        hash_sha1: Optional[str] = None,
        mode: Optional[int] = None,
    ) -> None:
        """Fetch a remote file.

        Args:
            url: URL of the remote object.
            local_path: Path to store the fetched file at.
            hash_sha1: If set, check for the SHA-1 sum.
            mode: If set, the file is chmod-ed to mode.

        Raises:
            Error: If |hash_sha1| is set and the downloaded file's digest
                does not match it.
        """
        # We have to nest the import because gs.GSContext uses us to cache its
        # own gsutil tarball. We know we won't get into a recursive loop though
        # as it only fetches files via non-gs URIs.
        from chromite.lib import gs

        if gs_urls_util.PathIsGs(url):
            # gs:// URIs go through GSContext rather than curl.
            ctx = gs.GSContext()
            ctx.Copy(url, local_path)
        else:
            # Note: unittests assume local_path is at the end.
            retry_util.RunCurl(
                ["--fail", url, "-o", local_path],
                debug_level=logging.DEBUG,
                capture_output=True,
            )

        if hash_sha1 is not None:
            actual_sha1 = Sha1File(local_path)
            if actual_sha1 != hash_sha1:
                raise Error(f"sha1({url!r}) = {actual_sha1} != {hash_sha1}")

        if mode is not None:
            osutils.Chmod(local_path, mode)

    def _Insert(self, key, url) -> None:  # pylint: disable=arguments-renamed
        """Insert a remote file into the cache.

        Args:
            key: Key tuple to store the file under.
            url: URL to fetch; file:// URLs and bare paths are inserted
                directly without fetching.
        """
        o = urllib.parse.urlparse(url)
        if o.scheme in ("file", ""):
            # Already local: hand the path straight to the base class.
            DiskCache._Insert(self, key, o.path)
            return

        # delete=False: _Insert moves the file away, so the NamedTemporaryFile
        # cleanup must not try to remove it afterwards.
        with tempfile.NamedTemporaryFile(
            dir=self.staging_dir, delete=False
        ) as local_path:
            self._Fetch(url, local_path.name)
            DiskCache._Insert(self, key, local_path.name)
def Untar(path, cwd, sudo=False) -> None:
    """Extract the tarball at |path| into the directory |cwd|.

    Args:
        path: The tarball to extract.
        cwd: The directory to extract into.
        sudo: Whether to perform the extraction with elevated privileges.
    """
    runner = cros_build_lib.sudo_run if sudo else cros_build_lib.run

    cmd = ["tar"]
    compression = cros_build_lib.CompressionDetectType(path)
    if compression != cros_build_lib.CompressionType.NONE:
        decompressor = cros_build_lib.FindCompressor(compression)
        extra_args = [decompressor]
        tool = os.path.basename(decompressor)
        if tool == "pbzip2":
            extra_args.append("--ignore-trailing-garbage=1")
        elif tool.startswith("zstd"):
            extra_args.append("-f")
        cmd += ["-I", " ".join(extra_args)]

    runner(
        cmd + ["-xpf", path],
        cwd=cwd,
        debug_level=logging.DEBUG,
        capture_output=True,
    )
class TarballCache(RemoteCache):
    """Supports caching of extracted tarball contents."""

    # pylint: disable-next=arguments-renamed
    def _Insert(self, key, tarball_path) -> None:
        """Insert a tarball and its extracted contents into the cache.

        Download the tarball first if a URL is provided as tarball_path.
        """
        with osutils.TempDir(
            prefix="tarball-cache", base_dir=self.staging_dir
        ) as workdir:
            parts = urllib.parse.urlsplit(tarball_path)
            if parts.scheme == "file":
                # file:// URL: strip the scheme and use the local path.
                tarball_path = parts.path
            elif parts.scheme:
                # Any other scheme: download into the work dir first.
                url = tarball_path
                tarball_path = os.path.join(
                    workdir, os.path.basename(parts.path)
                )
                self._Fetch(url, tarball_path)

            extract_path = os.path.join(workdir, "extract")
            os.mkdir(extract_path)
            Untar(tarball_path, extract_path)
            DiskCache._Insert(self, key, extract_path)

    def _KeyExists(self, key):
        """Specialized DiskCache._KeyExists that ignores empty directories.

        The normal _KeyExists just checks to see if the key path exists in
        the cache directory. Many tests mock out run then fetch a tarball.
        The mock blocks untarring into it. This leaves behind an empty dir
        which blocks future untarring in non-test scripts.

        See crbug.com/468838
        """
        # Wipe out empty directories before testing for existence.
        item_path = self.GetKeyPath(key)
        try:
            os.rmdir(item_path)
        except OSError as err:
            # Non-empty or already-missing dirs are expected; re-raise the
            # rest.
            if err.errno not in (errno.ENOTEMPTY, errno.ENOENT):
                raise
        return os.path.exists(item_path)
def Sha1File(path: Union[str, os.PathLike]) -> str:
    """Computes the SHA-1 checksum of path as a hex string."""
    digest = hashlib.sha1()
    # A single reusable buffer avoids allocating a new bytes object per read.
    chunk = bytearray(4096)
    window = memoryview(chunk)
    with open(path, "rb") as fileobj:
        # readinto() returns 0 only at EOF, ending the loop.
        while count := fileobj.readinto(chunk):
            digest.update(window[:count])
    return digest.hexdigest()