blob: 5f972a7651a5dd0b5eefdc879d2f9427d77c64fd [file] [log] [blame] [edit]
# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Main module for parsing and interpreting XBuddy paths for the devserver."""
import configparser
import datetime
import distutils.version # pylint: disable=import-error,no-name-in-module
import logging
import operator
import os
import re
import shutil
import sys
import threading
import time
from typing import List, NamedTuple, Optional, Tuple
import urllib
from chromite.lib import constants
from chromite.lib import gs
from chromite.lib import image_lib
from chromite.lib import osutils
from chromite.lib import path_util
from chromite.lib.xbuddy import artifact_info
from chromite.lib.xbuddy import build_artifact
from chromite.lib.xbuddy import common_util
from chromite.lib.xbuddy import devserver_constants
from chromite.lib.xbuddy import downloader
# xBuddy config constants
CONFIG_FILE = "xbuddy_config.ini"
SHADOW_CONFIG_FILE = "shadow_xbuddy_config.ini"
PATH_REWRITES = "PATH_REWRITES"
GENERAL = "GENERAL"
LOCATION_SUFFIXES = "LOCATION_SUFFIXES"
# Path for shadow config in chroot.
CHROOT_SHADOW_DIR = "/mnt/host/source/src/platform/dev"
# XBuddy aliases
TEST = "test"
BASE = "base"
DEV = "dev"
FULL = "full_payload"
DELTA = "delta_payload"
SIGNED = "signed"
RECOVERY = "recovery"
STATEFUL = "stateful"
AUTOTEST = "autotest"
FACTORY_SHIM = "factory_shim"
# Local build constants
ANY = "ANY"
LATEST = "latest"
LOCAL = "local"
REMOTE = "remote"
# TODO(sosa): Fix a lot of assumptions about these aliases. There is too much
# implicit logic here that's unnecessary. What should be done:
# 1) Collapse Alias logic to one set of aliases for xbuddy (not local/remote).
# 2) Do not use zip when creating these dicts. Better to not rely on ordering.
# 3) Move alias/artifact mapping to a central module rather than having it here.
# 4) Be explicit when things are missing i.e. no dev images in image.zip.
LOCAL_ALIASES = [
TEST,
DEV,
BASE,
RECOVERY,
FACTORY_SHIM,
FULL,
STATEFUL,
ANY,
]
LOCAL_FILE_NAMES = [
devserver_constants.TEST_IMAGE_FILE,
devserver_constants.IMAGE_FILE,
devserver_constants.BASE_IMAGE_FILE,
devserver_constants.RECOVERY_IMAGE_FILE,
devserver_constants.FACTORY_SHIM_IMAGE_FILE,
devserver_constants.UPDATE_FILE,
devserver_constants.STATEFUL_FILE,
None, # For ANY.
]
LOCAL_ALIAS_TO_FILENAME = dict(zip(LOCAL_ALIASES, LOCAL_FILE_NAMES))
# Google Storage constants
GS_ALIASES = [
TEST,
BASE,
RECOVERY,
SIGNED,
FACTORY_SHIM,
FULL,
DELTA,
STATEFUL,
AUTOTEST,
]
ARTIFACTS = [
artifact_info.TEST_IMAGE,
artifact_info.BASE_IMAGE,
artifact_info.RECOVERY_IMAGE,
artifact_info.SIGNED_IMAGE,
artifact_info.FACTORY_SHIM_IMAGE,
artifact_info.FULL_PAYLOAD,
artifact_info.DELTA_PAYLOAD,
artifact_info.STATEFUL_PAYLOAD,
artifact_info.AUTOTEST,
]
GS_ALIAS_TO_ARTIFACT = dict(zip(GS_ALIASES, ARTIFACTS))
LATEST_OFFICIAL = "latest-official"
RELEASE = "-release"
DEFAULT_STATIC_DIR = os.path.join(constants.SOURCE_ROOT, "devserver", "static")
_TIMESTAMP_DELIMITER = "SLASH"
_XBUDDY_TIMESTAMP_DIR = "xbuddy_UpdateTimestamps"
class XBuddyComponents(NamedTuple):
"""Container for XBuddy URI components."""
image_type: str
board: Optional[str]
version: str
is_local: bool
class XBuddyException(Exception):
"""Exception classes used by this module."""
class XBuddyInvalidSchemeException(XBuddyException):
"""Exception raised for invalid URI scheme in XBuddy URI."""
def update_timestamp(timestamp_dir: os.PathLike, build_id: os.PathLike) -> None:
"""Update timestamp file of build with build_id."""
common_util.MkDirP(timestamp_dir)
logging.debug("Updating timestamp for %s", build_id)
osutils.Touch(os.path.join(timestamp_dir, build_to_timestamp(build_id)))
def timestamp_to_build(timestamp_filename: os.PathLike) -> os.PathLike:
"""Perform delimiter conversion on a timestamp file to get an image path."""
return timestamp_filename.replace(_TIMESTAMP_DELIMITER, "/")
def build_to_timestamp(build_path: os.PathLike) -> os.PathLike:
"""Perform delimiter conversion on an image path to get a timestamp file."""
return build_path.replace("/", _TIMESTAMP_DELIMITER)
def find_any(local_dir: os.PathLike) -> Optional[str]:
"""Returns the image_type for ANY given the local_dir."""
test_image = os.path.join(local_dir, devserver_constants.TEST_IMAGE_FILE)
dev_image = os.path.join(local_dir, devserver_constants.IMAGE_FILE)
base_image = os.path.join(local_dir, devserver_constants.BASE_IMAGE_FILE)
# Prioritize test images over dev images.
if os.path.exists(test_image):
return "test"
if os.path.exists(dev_image):
return "dev"
if os.path.exists(base_image):
logging.warning(
"Using base image as test and dev images were not found. "
"This will likely cause cros flash to fail leading to "
"needing to repair the device with a USB stick."
)
return "base"
return None
class XBuddy:
"""Class that manages image retrieval and caching by the devserver.
Image retrieval by xBuddy path:
XBuddy accesses images and artifacts that it stores using an xBuddy
path of the form: board/version/alias
The primary xbuddy.Get call retrieves the correct artifact or url to where
the artifacts can be found.
Image caching:
Images and other artifacts are stored identically to how they would have
been if devserver's stage rpc was called and the xBuddy cache replaces
build versions on a LRU basis. Timestamps are maintained by last accessed
times of representative files in the a directory in the static serve
directory (XBUDDY_TIMESTAMP_DIR).
Private class members:
_true_values: used for interpreting boolean values
_staging_thread_count: track download requests
_timestamp_folder: directory with empty files standing in as timestamps
for each image currently cached by xBuddy
"""
_true_values = ["true", "t", "yes", "y"]
# Number of threads that are staging images.
_staging_thread_count = 0
# Lock used to lock increasing/decreasing count.
_staging_thread_count_lock = threading.Lock()
def __init__(
self,
manage_builds: bool = False,
board: str = "",
version: str = "",
images_dir: os.PathLike = "",
static_dir: os.PathLike = DEFAULT_STATIC_DIR,
gsutil_bin: Optional[os.PathLike] = None,
):
"""Initialize an XBuddy image file manager.
Args:
manage_builds: Indicates whether XBuddy is managing local builds.
board: The name of the board of interest.
version: The version to fill in when rewriting paths; usually either
a specific version number or an alias like LATEST.
images_dir: The (local) directory where images will be cached.
static_dir: A utility folder used for caching operations; of
particular interest is the timestamp folder located in a subdir,
which is the tracking mechanism for LRU caching and retrieval.
gsutil_bin: The absolute path to the `gsutil` binary.
"""
self.config = self._ReadConfig()
self._manage_builds = manage_builds or self._ManageBuilds()
self._board = board
self._version = version
self.static_dir = static_dir
self._timestamp_folder = os.path.join(
self.static_dir, _XBUDDY_TIMESTAMP_DIR
)
self.images_dir = images_dir or os.path.join(
constants.SOURCE_ROOT, "src/build/images"
)
cache_user = "chronos" if common_util.IsRunningOnMoblab() else None
self._ctx = gs.GSContext(cache_user=cache_user, gsutil_bin=gsutil_bin)
common_util.MkDirP(self._timestamp_folder)
@classmethod
def ParseBoolean(cls, boolean_string: Optional[str]) -> bool:
"""Evaluate a string to a boolean value"""
if boolean_string:
return boolean_string.lower() in cls._true_values
else:
return False
def _ReadConfig(self) -> configparser.ConfigParser:
"""Read xbuddy config from ini files.
Reads the base config from xbuddy_config.ini, and then merges in the
shadow config from shadow_xbuddy_config.ini
Returns:
The merged configuration.
Raises:
XBuddyException if the config file is missing.
"""
devserver_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
config_file = os.path.join(devserver_dir, CONFIG_FILE)
xbuddy_config = configparser.ConfigParser()
if os.path.exists(config_file):
xbuddy_config.read(config_file)
else:
# Get the directory of xbuddy.py file.
file_dir = os.path.dirname(os.path.realpath(__file__))
# Read the default xbuddy_config.ini from the directory.
xbuddy_config.read(os.path.join(file_dir, CONFIG_FILE))
# Read the shadow file if there is one.
if os.path.isdir(CHROOT_SHADOW_DIR):
shadow_config_file = os.path.join(
CHROOT_SHADOW_DIR, SHADOW_CONFIG_FILE
)
else:
shadow_config_file = os.path.join(devserver_dir, SHADOW_CONFIG_FILE)
logging.debug(
"Using shadow config file stored at %s", shadow_config_file
)
if os.path.exists(shadow_config_file):
shadow_xbuddy_config = configparser.ConfigParser()
shadow_xbuddy_config.read(shadow_config_file)
# Merge shadow config in.
sections = shadow_xbuddy_config.sections()
for s in sections:
if not xbuddy_config.has_section(s):
xbuddy_config.add_section(s)
options = shadow_xbuddy_config.options(s)
for o in options:
val = shadow_xbuddy_config.get(s, o)
xbuddy_config.set(s, o, val)
return xbuddy_config
def _ManageBuilds(self) -> bool:
"""Check if xBuddy is managing local builds using the current config."""
try:
return self.ParseBoolean(self.config.get(GENERAL, "manage_builds"))
except configparser.Error:
return False
def _Capacity(self) -> int:
"""Gets the xbuddy capacity from the current config."""
try:
return int(self.config.get(GENERAL, "capacity"))
except configparser.Error:
return 5
def LookupAlias(
self,
alias: str,
board: Optional[str] = None,
version: Optional[str] = None,
) -> Tuple[os.PathLike, str]:
"""Given the full xbuddy config, look up an alias for path rewrite.
Args:
alias: The xbuddy path that could be one of the aliases in the
rewrite table.
board: The board to fill in with when paths are rewritten. Can be
from the update request xml or the default board from devserver.
If None, defers to the value given during XBuddy initialization.
version: The version to fill in when rewriting paths. Could be a
specific version number or a version alias like LATEST. If None,
defers to the value given during XBuddy initialization, or
LATEST.
Returns:
A pair (val, suffix) where val is the rewritten path, or the
original string if no rewrite was found; and suffix is the assigned
location suffix, or the default suffix if none was found.
"""
try:
suffix = self.config.get(LOCATION_SUFFIXES, alias)
except configparser.Error:
suffix = RELEASE
try:
val = self.config.get(PATH_REWRITES, alias)
except configparser.Error:
# No alias lookup found. Return original path.
val = None
if not (val and val.strip()):
val = alias
else:
# The found value is not an empty string.
# Fill in the board and version.
val = val.replace("BOARD", "%(board)s")
val = val.replace("VERSION", "%(version)s")
val = val % {
"board": board or self._board,
"version": version or self._version or LATEST,
}
return val, suffix
def _LookupOfficial(
self, board: str, suffix: str, image_dir: Optional[os.PathLike] = None
) -> os.PathLike:
"""Check LATEST-master for the version number of interest."""
logging.debug("Checking gs for latest %s-%s image", board, suffix)
image_dir = (image_dir or devserver_constants.GS_IMAGE_DIR).rstrip("/")
version = None
for f in (
devserver_constants.GS_LATEST_MAIN,
devserver_constants.GS_LATEST_MASTER,
):
latest_addr = f % {
"image_dir": image_dir,
"board": board,
"suffix": suffix,
}
try:
# Full release + version is in the LATEST file.
version = self._ctx.Cat(latest_addr, encoding="utf-8")
return devserver_constants.IMAGE_DIR % {
"board": board,
"suffix": suffix,
"version": version,
}
except gs.GSContextException as e:
logging.warning(
"Failed to look up file %s with error %s. ignoring", f, e
)
raise build_artifact.ArtifactDownloadError(
"Cannot look up the official image version."
)
def _LS(
self, path: os.PathLike, list_subdirectory: bool = False
) -> List[os.PathLike]:
"""Does a directory listing of the given gs path.
Args:
path: directory location on google storage to check.
list_subdirectory: whether to only list subdirectory for |path|.
Returns:
A list of paths that matched |path|.
"""
if list_subdirectory:
return self._ctx.DoCommand(
["ls", "-d", "--", path], stdout=True
).stdout.splitlines()
else:
return self._ctx.LS(path)
def _GetLatestVersionFromGsDir(
self,
path: os.PathLike,
list_subdirectory: bool = False,
with_release: bool = True,
) -> str:
"""Get most recent version number found in a google storage directory.
This lists out the contents of the given GS bucket or regex to GS
buckets, and tries to grab the newest version found in the directory
names.
Args:
path: directory location on google storage to check.
list_subdirectory: whether to only list subdirectory for |path|.
with_release: whether versions include a release milestone (e.g.
R12).
Returns:
The most recent version number found.
"""
list_result = self._LS(path, list_subdirectory=list_subdirectory)
dir_names = [os.path.basename(p.rstrip("/")) for p in list_result]
try:
versions_re = re.compile(
devserver_constants.VERSION_RE
if with_release
else devserver_constants.VERSION
)
versions = [d for d in dir_names if versions_re.match(d)]
latest_version = max(versions, key=distutils.version.LooseVersion)
except ValueError:
raise gs.GSContextException(
"Failed to find most recent builds at %s" % path
)
return latest_version
def _LookupChannel(
self,
board: str,
suffix: str,
channel: str = "stable",
image_dir: Optional[os.PathLike] = None,
) -> str:
"""Check the channel folder for the version number of interest."""
# Get all names in channel dir. Get 10 highest directories by version.
logging.debug(
"Checking channel '%s' for latest '%s' image", channel, board
)
# Due to historical reasons, gs://chromeos-releases uses
# daisy-spring as opposed to the board name daisy_spring. Convert
# he board name for the lookup.
channel_dir = devserver_constants.GS_CHANNEL_DIR % {
"channel": channel,
"board": re.sub("_", "-", board),
}
latest_version = self._GetLatestVersionFromGsDir(
channel_dir, with_release=False
)
# Figure out release number from the version number.
image_url = devserver_constants.IMAGE_DIR % {
"board": board,
"suffix": suffix,
"version": "R*" + latest_version,
}
image_dir = (image_dir or devserver_constants.GS_IMAGE_DIR).rstrip("/")
gs_url = os.path.join(image_dir, image_url)
# There should only be one match on cros-image-archive.
full_version = self._GetLatestVersionFromGsDir(
gs_url, list_subdirectory=True
)
return devserver_constants.IMAGE_DIR % {
"board": board,
"suffix": suffix,
"version": full_version,
}
def _LookupVersion(self, board: str, suffix: str, version: str) -> str:
"""Search GS image releases for the highest match to a version."""
# Build the pattern for GS to match.
logging.debug(
"Checking gs for latest '%s' image with prefix '%s'", board, version
)
image_url = devserver_constants.IMAGE_DIR % {
"board": board,
"suffix": suffix,
"version": version + "*",
}
image_dir = os.path.join(devserver_constants.GS_IMAGE_DIR, image_url)
# Grab the newest version of the ones matched.
full_version = self._GetLatestVersionFromGsDir(
image_dir, list_subdirectory=True
)
return devserver_constants.IMAGE_DIR % {
"board": board,
"suffix": suffix,
"version": full_version,
}
def _RemoteBuildId(
self, board: str, suffix: str, version: str
) -> os.PathLike:
"""Returns the remote build_id for the given board and version.
Raises:
XBuddyException: If we failed to resolve the version to a valid
build_id.
"""
build_id_as_is = devserver_constants.IMAGE_DIR % {
"board": board,
"suffix": "",
"version": version,
}
build_id_suffix = devserver_constants.IMAGE_DIR % {
"board": board,
"suffix": suffix,
"version": version,
}
# Return the first path that exists. We assume that what the user typed
# is better than with a default suffix added i.e. x86-generic/blah is
# more valuable than x86-generic-release/blah.
for build_id in build_id_as_is, build_id_suffix:
try:
version = self._ctx.LS(
"%s/%s" % (devserver_constants.GS_IMAGE_DIR, build_id)
)
return build_id
except gs.GSContextException as e:
if common_util.IsAnonymousCaller(e):
logging.error(
"Anonymous caller cannot list chromeos image archive"
)
return build_id_as_is
continue
raise XBuddyException(
"Could not find remote build_id for %s %s" % (board, version)
)
def _ResolveBuildVersion(
self, board: str, suffix: str, base_version: str
) -> str:
"""Check LATEST-<base_version> and returns a full build version."""
logging.debug(
"Checking gs for full version for %s of %s", base_version, board
)
# TODO(garnold) We might want to accommodate version prefixes and pick
# the most recent found, as done in _LookupVersion().
latest_addr = devserver_constants.GS_LATEST_BASE_VERSION % {
"image_dir": devserver_constants.GS_IMAGE_DIR,
"board": board,
"suffix": suffix,
"base_version": base_version,
}
# Full release + version is in the LATEST file.
return self._ctx.Cat(latest_addr)
def _ResolveVersionToBuildIdAndChannel(
self,
board: str,
suffix: str,
version: str,
image_dir: Optional[os.PathLike] = None,
) -> Tuple[os.PathLike, str]:
"""Handle version aliases for remote payloads in GS.
Args:
board: as specified in the original call. (i.e. x86-generic, parrot)
suffix: The location suffix, to be added to board name.
version: as entered in the original call. can be
{TBD, 0. some custom alias as defined in a config file}
1. fully qualified build version.
2. latest
3. latest-{channel}
4. latest-official-{board suffix}
5. version prefix (i.e. RX-Y.X, RX-Y, RX)
image_dir: image directory to check in Google Storage. If none,
the default bucket is used.
Returns:
Tuple of (Location where the image dir is actually found on GS
(build_id), best guess for the channel).
Raises:
XBuddyException: If we failed to resolve the version to a valid url.
"""
# Only the last segment of the alias is variable relative to the rest.
version_tuple = version.rsplit("-", 1)
if re.match(devserver_constants.VERSION_RE, version):
return self._RemoteBuildId(board, suffix, version), None
elif re.match(devserver_constants.VERSION, version):
raise XBuddyException(
'"%s" is not valid. Should provide the fully '
'qualified version with a version prefix "RX-" '
"due to crbug.com/585914" % version
)
elif version == LATEST_OFFICIAL:
# latest-official --> LATEST build in board-release
return (
self._LookupOfficial(board, suffix, image_dir=image_dir),
None,
)
elif version_tuple[0] == LATEST_OFFICIAL:
# latest-official-{suffix} --> LATEST build in board-{suffix}
return (
self._LookupOfficial(
board, version_tuple[1], image_dir=image_dir
),
None,
)
elif version == LATEST:
# latest --> latest build on stable channel
return (
self._LookupChannel(board, suffix, image_dir=image_dir),
"stable",
)
elif version_tuple[0] == LATEST:
if re.match(devserver_constants.VERSION_RE, version_tuple[1]):
# latest-R* --> most recent qualifying build
return (
self._LookupVersion(board, suffix, version_tuple[1]),
None,
)
else:
# latest-{channel} --> latest build within that channel
return (
self._LookupChannel(
board,
suffix,
channel=version_tuple[1],
image_dir=image_dir,
),
version_tuple[1],
)
else:
# The given version doesn't match any known patterns.
raise XBuddyException(
"Version %s unknown. Can't find on GS." % version
)
def _GetLatestLocalVersion(self, board: str) -> str:
"""Get the version of the latest image built by `cros build-image`.
Updates the symlink reference within the xBuddy static dir to point to
the real image dir in the local /build/images directory.
Args:
board: board that image was built for.
Returns:
The discovered version of the image.
Raises:
XBuddyException: if neither test nor dev image was found in latest
built directory.
"""
latest_local_dir = image_lib.GetLatestImageLink(board)
if not latest_local_dir or not os.path.exists(latest_local_dir):
raise XBuddyException(
f"No builds found for {board}. Did you run "
f"`cros build-image --board {board}`?"
)
# Assume that the version number is the name of the directory.
return os.path.basename(os.path.realpath(latest_local_dir))
def _SyncRegistryWithBuildImages(self) -> None:
"""Crawl images_dir for build_ids of images from `cros build-image`.
This will find images and symlink them in xBuddy's static dir so that
xBuddy's cache can serve them.
If xBuddy's _manage_builds option is on, then a timestamp will also be
generated, and xBuddy will clear them from the directory they are in, as
necessary.
"""
if not os.path.isdir(self.images_dir):
# Skip syncing if images_dir does not exist.
logging.error(
"Cannot find %s; skip syncing image registry.", self.images_dir
)
return
build_ids = []
for b in os.listdir(self.images_dir):
# Ignore random files in the build dir.
board_dir = os.path.join(self.images_dir, b)
if not os.path.isdir(board_dir):
continue
# Ensure we have directories to track all boards in build/images
common_util.MkDirP(os.path.join(self.static_dir, b))
build_ids.extend(
[
"/".join([b, v])
for v in os.listdir(board_dir)
if not v == LATEST
]
)
# Symlink undiscovered images, and update timestamps if manage_builds is
# on.
for build_id in build_ids:
link = os.path.join(self.static_dir, build_id)
target = os.path.join(self.images_dir, build_id)
osutils.SafeSymlink(target, link)
if self._manage_builds:
update_timestamp(self._timestamp_folder, build_id)
def _ListBuildTimes(self) -> List[Tuple[os.PathLike, datetime.timedelta]]:
"""Returns the currently cached builds and their last access timestamp.
Returns:
list of tuples that matches xBuddy build/version to timestamps in
long
"""
# Update currently cached builds.
build_dict = {}
for f in os.listdir(self._timestamp_folder):
last_accessed = os.path.getmtime(
os.path.join(self._timestamp_folder, f)
)
build_id = timestamp_to_build(f)
stale_time = datetime.timedelta(
seconds=(time.time() - last_accessed)
)
build_dict[build_id] = stale_time
return_tup = sorted(build_dict.items(), key=operator.itemgetter(1))
return return_tup
def _Download(
self, gs_url: str, artifacts: List[str], build_id: os.PathLike
) -> List[List[str]]:
"""Download the artifacts from the given gs_url.
Returns:
A list containing lists of downloaded files for each artifact.
e.g.: artifacts = ['a', 'b'] -> return [['f1', 'f2'], ['f3']]
Raises:
build_artifact.ArtifactDownloadError: If we failed to download the
artifact.
"""
with XBuddy._staging_thread_count_lock:
XBuddy._staging_thread_count += 1
try:
logging.debug("Downloading %s from %s", artifacts, gs_url)
dl = downloader.GoogleStorageDownloader(
self.static_dir, gs_url, build_id
)
factory = build_artifact.ChromeOSArtifactFactory(
dl.GetBuildDir(), artifacts, [], dl.GetBuild()
)
dl.Download(factory)
downloaded_artifacts = factory.RequiredArtifacts()
return [x.StagedFiles() for x in downloaded_artifacts]
finally:
with XBuddy._staging_thread_count_lock:
XBuddy._staging_thread_count -= 1
def CleanCache(self) -> None:
"""Delete all builds besides the newest N builds"""
if not self._manage_builds:
return
cached_builds = [e[0] for e in self._ListBuildTimes()]
logging.debug("In cache now: %s", cached_builds)
for b in range(self._Capacity(), len(cached_builds)):
b_path = cached_builds[b]
logging.debug("Clearing '%s' from cache", b_path)
time_file = os.path.join(
self._timestamp_folder, build_to_timestamp(b_path)
)
os.unlink(time_file)
clear_dir = os.path.join(self.static_dir, b_path)
try:
# Handle symlinks, in the case of links to local builds if
# enabled.
if os.path.islink(clear_dir):
target = os.readlink(clear_dir)
logging.debug("Deleting locally built image at %s", target)
os.unlink(clear_dir)
if os.path.exists(target):
shutil.rmtree(target)
elif os.path.exists(clear_dir):
logging.debug("Deleting downloaded image at %s", clear_dir)
shutil.rmtree(clear_dir)
except OSError as e:
raise XBuddyException(f"Failed to clear {clear_dir}: {e}")
def _TranslateSignedGSUrl(
self, build_id: os.PathLike, channel: str = None
) -> os.PathLike:
"""Translate the GS URL to be able to find signed images.
Args:
build_id: Path to the image or update directory on the devserver or
in Google Storage. e.g. 'x86-generic/R26-4000.0.0'
channel: The channel for the image. If none, it tries to guess it in
order of stability.
Returns:
The GS URL for the directory where the signed image can be found.
Raises:
build_artifact.ArtifactDownloadError: If we failed to download the
artifact.
"""
match = re.match(r"^([^/]+?)(?:-release)?/R\d+-(.*)$", build_id)
channels = []
if channel:
channels.append(channel)
else:
# Attempt to enumerate all channels, in order of stability.
channels.extend(devserver_constants.CHANNELS[::-1])
for c in channels:
image_dir = devserver_constants.GS_CHANNEL_DIR % {
"channel": c,
"board": match.group(1),
}
gs_url = os.path.join(image_dir, match.group(2))
try:
self._LS(gs_url)
return gs_url
except gs.GSNoSuchKey:
continue
raise build_artifact.ArtifactDownloadError(
"Could not find signed image URL for %s in Google Storage"
% build_id
)
def _GetFromGS(
self,
build_id: os.PathLike,
image_type: str,
image_dir: Optional[os.PathLike] = None,
channel: Optional[str] = None,
) -> List[os.PathLike]:
"""Check if the artifact is available locally. Download from GS if not.
Args:
build_id: Path to the image or update directory on the devserver or
in Google Storage. e.g. 'x86-generic/R26-4000.0.0'
image_type: Image type to download. Look at aliases at top of file
for options.
image_dir: Google Storage image archive to search in if requesting a
remote artifact. If none uses the default bucket.
channel: The channel for the image. If none, it tries to guess it in
order of stability.
Returns:
The list of files downloaded for the given image_type.
Raises:
build_artifact.ArtifactDownloadError: If we failed to download the
artifact.
"""
artifact = GS_ALIAS_TO_ARTIFACT[image_type]
if image_type == SIGNED:
gs_url = self._TranslateSignedGSUrl(build_id, channel=channel)
else:
image_dir = (image_dir or devserver_constants.GS_IMAGE_DIR).rstrip(
"/"
)
gs_url = os.path.join(image_dir, build_id)
return self._Download(gs_url, [artifact], build_id)[0]
def _GetArtifact(
self,
path_list: List[str],
board: Optional[str] = None,
version: Optional[str] = None,
lookup_only: bool = False,
image_dir: Optional[os.PathLike] = None,
) -> Tuple[os.PathLike, os.PathLike]:
"""Interpret an xBuddy path and return directory/file_name to resource.
Note board can be passed that in but by default if self._board is set,
that is used rather than board.
Args:
path_list: [board, version, alias] as split from the xbuddy call
url.
board: Board whose artifacts we are looking for. Only used if no
board was given during XBuddy initialization.
version: Version whose artifacts we are looking for. Used if no
version was given during XBuddy initialization. If None, defers
to LATEST.
lookup_only: If true just look up the artifact, if False stage it.
TODO(ahassani): If true, it should also return the file name.
image_dir: Google Storage image archive to search in if requesting a
remote artifact. If none uses the default bucket.
Returns:
build_id: Path to the image or update directory on the devserver or
in Google Storage. e.g. 'x86-generic/R26-4000.0.0'
file_name: of the artifact in the build_id directory.
Raises:
XBuddyException: if the path could not be translated.
build_artifact.ArtifactDownloadError: if we failed to download the
artifact.
"""
path = "/".join(path_list)
default_board = self._board or board
default_version = self._version or version or LATEST
# Rewrite the path if there is an appropriate default.
path, suffix = self.LookupAlias(
path, board=default_board, version=default_version
)
# Parse the path.
xbuddy_components = InterpretPath(path, default_board, default_version)
file_name = None
if xbuddy_components.is_local:
# Get a local image.
version = (
self._GetLatestLocalVersion(xbuddy_components.board)
if xbuddy_components.version == LATEST
else xbuddy_components.version
)
build_id = os.path.join(xbuddy_components.board, version)
artifact_dir = os.path.join(self.static_dir, build_id)
image_type = (
find_any(artifact_dir)
if xbuddy_components.image_type == ANY
else xbuddy_components.image_type
)
# If there was an image type discovered, get the file name
# otherwise, just return with no file name.
if image_type:
file_name = os.path.join(
artifact_dir, LOCAL_ALIAS_TO_FILENAME[image_type]
)
if not os.path.exists(file_name):
raise XBuddyException(
"Local %s artifact not in static_dir at %s"
% (image_type, file_name)
)
else:
# Get a remote image.
if xbuddy_components.image_type not in GS_ALIASES:
raise XBuddyException(
"Bad remote image type: %s. Use one of: %s"
% (xbuddy_components.image_type, GS_ALIASES)
)
build_id, channel = self._ResolveVersionToBuildIdAndChannel(
xbuddy_components.board,
suffix,
xbuddy_components.version,
image_dir=image_dir,
)
logging.debug("Resolved version %s to %s.", version, build_id)
if not lookup_only:
file_name = self._GetFromGS(
build_id,
xbuddy_components.image_type,
image_dir=image_dir,
channel=channel,
)[0]
return build_id, file_name
############################ BEGIN PUBLIC METHODS
def Translate(
self,
path_list: List[str],
board: Optional[str] = None,
version: Optional[str] = None,
image_dir: Optional[os.PathLike] = None,
) -> Tuple[os.PathLike, os.PathLike]:
"""Translates an xBuddy path to a real path to artifact if it exists.
Equivalent to the Get call, minus downloading and updating timestamps,
Args:
path_list: [board, version, alias] as split from the xbuddy call
url.
board: Board whose artifacts we are looking for. If None, use the
board XBuddy was initialized to use.
version: Version whose artifacts we are looking for. If None, use
the version XBuddy was initialized with, or LATEST.
image_dir: image directory to check in Google Storage. If none, the
default bucket is used.
Returns:
build_id: Path to the image or update directory on the devserver.
e.g. 'x86-generic/R26-4000.0.0'
The returned path is always the path to the directory within
static_dir, so it is always the build_id of the image.
file_name: The file name of the artifact. Can take any of the file
values in devserver_constants.
e.g. 'chromiumos_test_image.bin' or 'update.gz' if the path list
specified 'test' or 'full_payload' artifacts, respectively.
Raises:
XBuddyException: if the path couldn't be translated
"""
self._SyncRegistryWithBuildImages()
return self._GetArtifact(
path_list,
board=board,
version=version,
lookup_only=True,
image_dir=image_dir,
)
def Get(
self, path_list: List[str], image_dir: Optional[os.PathLike] = None
) -> Tuple[os.PathLike, os.PathLike]:
"""The full xBuddy call, returns resource specified by path_list.
Please see devserver.py:xbuddy for full documentation.
Args:
path_list: [board, version, alias] as split from the xbuddy call
url.
image_dir: image directory to check in Google Storage. If none, the
default bucket is used.
Returns:
build_id: Path to the image or update directory on the devserver.
e.g. 'x86-generic/R26-4000.0.0'
The returned path is always the path to the directory within
static_dir, so it is always the build_id of the image.
file_name: The file name of the artifact. Can take any of the file
values in devserver_constants.
e.g. 'chromiumos_test_image.bin' or 'update.gz' if the path list
specified 'test' or 'full_payload' artifacts, respectively.
Raises:
XBuddyException: if the path could not be translated
build_artifact.ArtifactDownloadError: if we failed to download the
artifact.
"""
self._SyncRegistryWithBuildImages()
build_id, file_name = self._GetArtifact(path_list, image_dir=image_dir)
update_timestamp(self._timestamp_folder, build_id)
# TODO(joyc): Run in separate thread.
self.CleanCache()
logging.debug(
"Returning build id: %s and path to payload: %s",
build_id,
file_name,
)
return build_id, file_name
def InterpretPath(
path: str, default_board: str = None, default_version: str = None
) -> XBuddyComponents:
"""Split and return the pieces of an xBuddy path name
Args:
path: the path xBuddy Get was called with.
default_board: board to use in case board isn't in path.
default_version: Version to use in case version isn't in path.
Returns:
tuple of (image_type, board, version, whether the path is local)
Raises:
XBuddyException: if the path can't be resolved into valid components
"""
if path.lower().startswith("xbuddy://"):
path = path[9:]
path_list = [p for p in path.split("/") if p]
# Do the stuff that is well known first. We know that if paths have a
# image_type, it must be one of the GS/LOCAL aliases and it must be at the
# end. Similarly, local/remote are well-known and must start the path list.
# Default to remote for chrome checkout.
is_local = (
path_util.DetermineCheckout().type != path_util.CHECKOUT_TYPE_GCLIENT
)
if path_list and path_list[0] in (REMOTE, LOCAL):
is_local = path_list.pop(0) == LOCAL
# Default image type is determined by remote vs. local.
image_type = ANY if is_local else TEST
if path_list and path_list[-1] in GS_ALIASES + LOCAL_ALIASES:
image_type = path_list.pop(-1)
# Now for the tricky part. We don't actually know at this point if the rest
# of the path is just a board | version (like R33-2341.0.0) or just a board
# or just a version. So we do our best to do the right thing.
board = default_board
version = default_version or LATEST
if len(path_list) == 1:
path = path_list.pop(0)
if path in (board, version):
pass
# Treat this as a version if it's one we know (contains default or
# latest), or we were given an actual default board.
elif default_version in path or LATEST in path or board is not None:
version = path
else:
board = path
elif len(path_list) == 2:
# Assumes board/version.
board = path_list.pop(0)
version = path_list.pop(0)
if path_list:
raise XBuddyException(
"Path is not valid. Could not figure out how to "
"parse remaining components: %s." % path_list
)
logging.debug(
"Get artifact '%s' with board %s and version %s'. Locally? %s",
image_type,
board,
version,
is_local,
)
return XBuddyComponents(image_type, board, version, is_local)
def parse(path: str, strict: bool = False) -> XBuddyComponents:
"""Given a fully formed URI, parse and return the XBuddy URI components.
Based on observations, here's how the URI may be arranged, and what defaults
are used if URI components aren't provided:
Full URI: xbuddy://${LOCATION}/${BOARD}/${VERSION}/${IMAGE_TYPE}
- ${LOCATION} specifies whether to check image folder locally or to
retrieve an image from a remote host (i.e. a Google Storage bucket.)
- DEFAULT: local
- ${BOARD} specifies a particular build target; useful for testing
hardware and/or overlay-specific software.
- DEFAULT: None
- ${VERSION} specifies a milestone-version identifier to target a
specific ChromeOS release build.
- DEFAULT: latest (an alias which gets resolved to the most recent
milestone-version pair available).
- ${IMAGE_TYPE} specifies the image type used to build an image in
question. Different image types include useful tools for developers
(e.g. extra packages used for testing software and hardware
functionality). The most common targeted types are dev, test, and
base.
- DEFAULT:
- If ${LOCATION} == 'local': ANY (an alias which resolves to any
usable image type.
- If ${LOCATION} == 'remote': 'test'
Any or all of these URI components may be omitted from the URI, but when
any components are present, this hierarchical ordering should be respected.
If reordered, the URI will not be parsed correctly.
Args:
path: The XBuddy URI. This method will perform URI validation and
parsing, and the client is not expected perform any validation.
strict: A client-specified option that enforces a requirement that the
XBuddy URI begin with a well-formed scheme (e.g. 'xbuddy://').
Returns:
A Components tuple containing the parsed elements of the URI.
Raises:
XBuddyInvalidSchemeException if the scheme is not xbuddy, or is missing
when strict checking is enforced.
"""
parsed = urllib.parse.urlparse(path)
if parsed.scheme == "":
if strict:
# Client has requested the scheme checking to be strict and require
# the URI to start with 'xbuddy://'. Enforce this and raise an
# exception.
raise XBuddyInvalidSchemeException(
"A URI scheme starting with 'xbuddy://' was expected but none "
"was found."
)
else:
logging.debug(
'Assuming "%s" is an xbuddy path; scheme enforcement was not '
"requested",
path,
)
elif parsed.scheme != "xbuddy":
raise XBuddyInvalidSchemeException(
"Scheme of XBuddy URI was invalid. Provided path: %s" % path
)
return InterpretPath(parsed.netloc + parsed.path)