# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Main module for parsing and interpreting XBuddy paths for the devserver."""
from __future__ import print_function
import cherrypy
import ConfigParser
import datetime
import operator
import os
import re
import shutil
import time
import threading
import build_util
import artifact_info
import common_util
import devserver_constants
import downloader
import gsutil_util
import log_util
# Module-local log function.
def _Log(message, *args):
  """Log |message| (formatted with |args|) under this module's XBUDDY tag."""
  tag = 'XBUDDY'
  return log_util.LogWithTag(tag, message, *args)
# xBuddy config constants: the base config file and the optional shadow config
# whose sections/options are merged over it.
CONFIG_FILE = 'xbuddy_config.ini'
SHADOW_CONFIG_FILE = 'shadow_xbuddy_config.ini'
# Path for shadow config in chroot.
CHROOT_SHADOW_DIR = '/mnt/host/source/src/platform/dev'
# XBuddy aliases: image/artifact type names that may appear as the last
# component of an xBuddy path.
TEST = 'test'
BASE = 'base'
DEV = 'dev'
FULL = 'full_payload'
RECOVERY = 'recovery'
STATEFUL = 'stateful'
AUTOTEST = 'autotest'
# Local build constants: LATEST is the version alias for the newest build;
# LOCAL / REMOTE are the optional leading path components that select where
# an image is looked up.
LATEST = "latest"
LOCAL = "local"
REMOTE = "remote"
# TODO(sosa): Fix a lot of assumptions about these aliases. There is too much
# implicit logic here that's unnecessary. What should be done:
# 1) Collapse Alias logic to one set of aliases for xbuddy (not local/remote).
# 2) Do not use zip when creating these dicts. Better to not rely on ordering.
# 3) Move alias/artifact mapping to a central module rather than having it here.
# 4) Be explicit when things are missing i.e. no dev images in
# NOTE(review): the next line is a dangling fragment. It looks like the last
# element of a list literal whose remainder is not visible here; as written it
# is a no-op expression statement. Confirm against the full file.
None, # For ANY.
# Google Storage constants
# Version alias for the newest official build of a board.
LATEST_OFFICIAL = "latest-official"
# Default board suffix used when looking up official builds.
RELEASE = "-release"
class XBuddyException(Exception):
  """Error type raised by the xBuddy code in this module."""
# no __init__ method
#pylint: disable=W0232
class Timestamp(object):
"""Class to translate build path strings and timestamp filenames.

A timestamp file name is a build path in which every '/' has been replaced by
_TIMESTAMP_DELIMITER.

NOTE(review): _TIMESTAMP_DELIMITER is referenced by this class's methods but
its definition is not visible in this view -- confirm it is defined.
"""
# Directory name for the timestamp files (presumably joined onto the
# devserver static dir -- confirm against XBuddy.__init__).
XBUDDY_TIMESTAMP_DIR = 'xbuddy_UpdateTimestamps'
def TimestampToBuild(timestamp_filename):
  """Map a timestamp file name back to the build path it stands for."""
  delimiter = Timestamp._TIMESTAMP_DELIMITER
  build_path = timestamp_filename.replace(delimiter, '/')
  return build_path
def BuildToTimestamp(build_path):
  """Map a build path to the flat file name used for its timestamp file."""
  delimiter = Timestamp._TIMESTAMP_DELIMITER
  timestamp_filename = build_path.replace('/', delimiter)
  return timestamp_filename
def UpdateTimestamp(timestamp_dir, build_id):
  """Update timestamp file of build with build_id.

  Creates the timestamp file if it does not exist yet and sets its access and
  modification times to the current time.

  NOTE(review): takes no self/cls and is called as
  Timestamp.UpdateTimestamp(...), so it presumably carries a @staticmethod
  decorator that is not visible in this view -- confirm.

  Args:
    timestamp_dir: directory that holds the timestamp files.
    build_id: build path (board/version) whose timestamp is refreshed.
  """
  _Log("Updating timestamp for %s", build_id)
  # Timestamp files are named after the build path with '/' escaped; this is
  # the inverse of the TimestampToBuild() mapping applied when listing them.
  time_file = os.path.join(timestamp_dir,
                           Timestamp.BuildToTimestamp(build_id))
  # open() instead of the Python 2-only file() builtin. Append mode creates
  # the file when missing without truncating an existing one.
  with open(time_file, 'a'):
    os.utime(time_file, None)
#pylint: enable=W0232
class XBuddy(build_util.BuildObject):
  """Class that manages image retrieval and caching by the devserver.

  Image retrieval by xBuddy path:
    XBuddy accesses images and artifacts that it stores using an xBuddy
    path of the form: board/version/alias
    The primary xbuddy.Get call retrieves the correct artifact or url to where
    the artifacts can be found.

  Image caching:
    Images and other artifacts are stored identically to how they would have
    been if devserver's stage rpc was called and the xBuddy cache replaces
    build versions on a LRU basis. Timestamps are maintained by last accessed
    times of representative files in a directory within the static serve
    directory.

  Private class members:
    _true_values: used for interpreting boolean values
    _staging_thread_count: track download requests
    _timestamp_folder: directory with empty files standing in as timestamps
                       for each image currently cached by xBuddy
  """
  _true_values = ['true', 't', 'yes', 'y']

  # Number of threads that are staging images.
  _staging_thread_count = 0
  # Lock used to lock increasing/decreasing count.
  _staging_thread_count_lock = threading.Lock()
def __init__(self, manage_builds=False, board=None, version=None,
             images_dir=None, log_screen=True, **kwargs):
  """Set up xBuddy's config, defaults and cache directories.

  Args:
    manage_builds: if True, xBuddy manages local builds regardless of the
      'manage_builds' config setting.
    board: default board used when an xBuddy path does not name one.
    version: default version used when an xBuddy path does not name one.
    images_dir: directory holding locally built images. Defaults to
      src/build/images under the source root.
    log_screen: if False, cherrypy's 'log.screen' setting is turned off.
    **kwargs: forwarded unchanged to build_util.BuildObject.
  """
  super(XBuddy, self).__init__(**kwargs)
  if not log_screen:
    cherrypy.config.update({'log.screen': False})

  self.config = self._ReadConfig()
  self._manage_builds = manage_builds or self._ManageBuilds()
  self._board = board
  self._version = version
  self._timestamp_folder = os.path.join(self.static_dir,
                                        Timestamp.XBUDDY_TIMESTAMP_DIR)
  # Only fall back to the default location when the caller gave none;
  # otherwise an explicit images_dir would be silently overwritten.
  if images_dir:
    self.images_dir = images_dir
  else:
    self.images_dir = os.path.join(self.GetSourceRoot(), 'src/build/images')

  # The timestamp folder is listed and written to later on, so make sure it
  # exists before the first request arrives.
  common_util.MkDirP(self._timestamp_folder)
def ParseBoolean(cls, boolean_string):
  """Interpret |boolean_string| as a boolean.

  Returns True only for a non-empty string whose lowercase form is listed in
  cls._true_values; empty or None input yields False.
  """
  if not boolean_string:
    return False
  normalized = boolean_string.lower()
  return normalized in cls._true_values
def _ReadConfig(self):
  """Read xbuddy config from ini files.

  Reads the base config from xbuddy_config.ini, and then merges in the
  shadow config from shadow_xbuddy_config.ini. Files that do not exist are
  skipped, so the result may be an empty configuration.

  Returns:
    The merged configuration (a ConfigParser instance).
  """
  xbuddy_config = ConfigParser.ConfigParser()
  config_file = os.path.join(self.devserver_dir, CONFIG_FILE)
  if os.path.exists(config_file):
    xbuddy_config.read(config_file)
  else:
    # Get the directory of file.
    file_dir = os.path.dirname(os.path.realpath(__file__))
    # Read the default xbuddy_config.ini from the directory.
    xbuddy_config.read(os.path.join(file_dir, CONFIG_FILE))

  # Read the shadow file if there is one. Inside the chroot it lives in a
  # fixed location; otherwise it sits next to the devserver.
  if os.path.isdir(CHROOT_SHADOW_DIR):
    shadow_config_file = os.path.join(CHROOT_SHADOW_DIR, SHADOW_CONFIG_FILE)
  else:
    shadow_config_file = os.path.join(self.devserver_dir, SHADOW_CONFIG_FILE)

  _Log('Using shadow config file stored at %s', shadow_config_file)
  if os.path.exists(shadow_config_file):
    shadow_xbuddy_config = ConfigParser.ConfigParser()
    shadow_xbuddy_config.read(shadow_config_file)

    # Merge shadow config in. Shadow values win over base values.
    for section in shadow_xbuddy_config.sections():
      # set() fails on an unknown section, so create it first.
      if not xbuddy_config.has_section(section):
        xbuddy_config.add_section(section)
      for option in shadow_xbuddy_config.options(section):
        val = shadow_xbuddy_config.get(section, option)
        xbuddy_config.set(section, option, val)

  return xbuddy_config
def _ManageBuilds(self):
  """Checks if xBuddy is managing local builds using the current config.

  Returns:
    The boolean value of the 'manage_builds' option in the GENERAL section,
    or False when the section/option is missing.
  """
  try:
    return self.ParseBoolean(self.config.get(GENERAL, 'manage_builds'))
  except ConfigParser.Error:
    return False
def _Capacity(self):
  """Gets the xbuddy capacity from the current config.

  Returns:
    The 'capacity' option of the GENERAL section as an int, or 5 when the
    section/option is missing.
  """
  try:
    return int(self.config.get(GENERAL, 'capacity'))
  except ConfigParser.Error:
    return 5
def _LookupAlias(self, alias, board, version):
  """Given the full xbuddy config, look up an alias for path rewrite.

  Args:
    alias: The xbuddy path that could be one of the aliases in the
      rewrite table.
    board: The board to fill in with when paths are rewritten. Can be from
      the update request xml or the default board from devserver.
    version: The version to fill in when rewriting paths. Could be a specific
      version number or a version alias like LATEST.

  Returns:
    If a rewrite is found, a string with the current board and version
    substituted in.
    If no rewrite is found, just return the original string.
  """
  try:
    val = self.config.get(PATH_REWRITES, alias)
  except ConfigParser.Error:
    # No alias lookup found. Return original path.
    return alias

  if not val.strip():
    # The found value was an empty string.
    return alias

  # Fill in the board and version: turn the BOARD / VERSION placeholders
  # into %-format keys, then substitute.
  rewrite = val.replace("BOARD", "%(board)s")
  rewrite = rewrite.replace("VERSION", "%(version)s")
  rewrite = rewrite % {'board': board, 'version': version}
  _Log("Path was rewritten to %s", rewrite)
  return rewrite
def _ResolveImageDir(image_dir):
"""Clean up and return the image dir to use.
image_dir: directory in Google Storage to use.
|image_dir| if |image_dir| is not None. Otherwise, returns
image_dir = image_dir or devserver_constants.GS_IMAGE_DIR
# Remove trailing slashes.
return image_dir.rstrip('/')
def _LookupOfficial(self, board, suffix=RELEASE, image_dir=None):
  """Check LATEST-master for the version number of interest.

  Args:
    board: board whose latest official build is wanted.
    suffix: board suffix of the builder to look at (default: RELEASE).
    image_dir: Google Storage image archive to look in. If None, the default
      bucket is used.

  Returns:
    The build_id: IMAGE_DIR formatted with the board, suffix and the version
    read from the LATEST file.
  """
  _Log("Checking gs for latest %s-%s image", board, suffix)
  image_dir = XBuddy._ResolveImageDir(image_dir)
  latest_addr = (devserver_constants.GS_LATEST_MASTER %
                 {'image_dir': image_dir,
                  'board': board,
                  'suffix': suffix})
  cmd = 'gsutil cat %s' % latest_addr
  msg = 'Failed to find build at %s' % latest_addr
  # Full release + version is in the LATEST file.
  version = gsutil_util.GSUtilRun(cmd, msg)

  return devserver_constants.IMAGE_DIR % {'board': board,
                                          'suffix': suffix,
                                          'version': version}
def _LookupChannel(self, board, channel='stable', image_dir=None):
  """Check the channel folder for the version number of interest.

  Args:
    board: board whose latest build on |channel| is wanted.
    channel: release channel to look in (default: 'stable').
    image_dir: Google Storage image archive to look in. If None, the default
      bucket is used.

  Returns:
    The build_id: IMAGE_DIR formatted with the board, the RELEASE suffix and
    the full version found for the channel's latest version.
  """
  # Get all names in channel dir. Get 10 highest directories by version.
  _Log("Checking channel '%s' for latest '%s' image", channel, board)
  # Due to historical reasons, gs://chromeos-releases uses
  # daisy-spring as opposed to the board name daisy_spring. Convert
  # the board name for the lookup.
  channel_dir = devserver_constants.GS_CHANNEL_DIR % {
      'channel': channel,
      'board': re.sub('_', '-', board)}
  latest_version = gsutil_util.GetLatestVersionFromGSDir(
      channel_dir, with_release=False)

  # Figure out release number from the version number: the channel listing
  # was requested without the release prefix, so glob for it.
  image_url = devserver_constants.IMAGE_DIR % {
      'board': board,
      'suffix': RELEASE,
      'version': 'R*' + latest_version}
  image_dir = XBuddy._ResolveImageDir(image_dir)
  gs_url = os.path.join(image_dir, image_url)

  # There should only be one match on cros-image-archive.
  full_version = gsutil_util.GetLatestVersionFromGSDir(gs_url)

  return devserver_constants.IMAGE_DIR % {'board': board,
                                          'suffix': RELEASE,
                                          'version': full_version}
def _LookupVersion(self, board, version):
  """Search GS image releases for the highest match to a version prefix.

  Args:
    board: board whose builds are searched.
    version: version prefix to match.

  Returns:
    The build_id: IMAGE_DIR formatted with the board, the RELEASE suffix and
    the newest full version that matches the prefix.
  """
  # Build the pattern for GS to match.
  _Log("Checking gs for latest '%s' image with prefix '%s'", board, version)
  image_url = devserver_constants.IMAGE_DIR % {'board': board,
                                               'suffix': RELEASE,
                                               'version': version + '*'}
  image_dir = os.path.join(devserver_constants.GS_IMAGE_DIR, image_url)

  # Grab the newest version of the ones matched.
  full_version = gsutil_util.GetLatestVersionFromGSDir(image_dir)
  return devserver_constants.IMAGE_DIR % {'board': board,
                                          'suffix': RELEASE,
                                          'version': full_version}
def _RemoteBuildId(self, board, version):
  """Returns the remote build_id for the given board and version.

  Args:
    board: board name as given by the caller (may already carry a suffix).
    version: fully qualified build version.

  Returns:
    The first candidate build_id that exists in Google Storage.

  Raises:
    XBuddyException: If we failed to resolve the version to a valid build_id.
  """
  build_id_as_is = devserver_constants.IMAGE_DIR % {'board': board,
                                                    'suffix': '',
                                                    'version': version}
  build_id_release = devserver_constants.IMAGE_DIR % {'board': board,
                                                      'suffix': RELEASE,
                                                      'version': version}
  # Return the first path that exists. We assume that what the user typed
  # is better than with a default suffix added i.e. x86-generic/blah is
  # more valuable than x86-generic-release/blah.
  for build_id in build_id_as_is, build_id_release:
    cmd = 'gsutil ls %s/%s' % (devserver_constants.GS_IMAGE_DIR, build_id)
    try:
      gsutil_util.GSUtilRun(cmd, None)
    except gsutil_util.GSUtilError:
      # This candidate does not exist; try the next one.
      continue
    return build_id

  raise XBuddyException('Could not find remote build_id for %s %s' % (
      board, version))
def _ResolveBuildVersion(self, board, base_version):
  """Read LATEST-<base_version> from GS and return the full build version."""
  _Log('Checking gs for full version for %s of %s', base_version, board)
  # TODO(garnold) We might want to accommodate version prefixes and pick the
  # most recent found, as done in _LookupVersion().
  address_fields = {
      'image_dir': devserver_constants.GS_IMAGE_DIR,
      'board': board,
      'suffix': RELEASE,
      'base_version': base_version,
  }
  latest_addr = devserver_constants.GS_LATEST_BASE_VERSION % address_fields
  failure_msg = 'Failed to find build at %s' % latest_addr
  # The LATEST file holds the full release + version string.
  return gsutil_util.GSUtilRun('gsutil cat %s' % latest_addr, failure_msg)
def _ResolveVersionToBuildId(self, board, version, image_dir=None):
  """Handle version aliases for remote payloads in GS.

  Args:
    board: as specified in the original call. (i.e. x86-generic, parrot)
    version: as entered in the original call. can be
      {TBD, 0. some custom alias as defined in a config file}
      1. fully qualified build version or base version.
      2. latest
      3. latest-{channel}
      4. latest-official-{board suffix}
      5. version prefix (i.e. RX-Y.X, RX-Y, RX)
    image_dir: image directory to check in Google Storage. If none,
      the default bucket is used.

  Returns:
    Location where the image dir is actually found on GS (build_id)

  Raises:
    XBuddyException: If we failed to resolve the version to a valid url.
  """
  # Only the last segment of the alias is variable relative to the rest.
  version_tuple = version.rsplit('-', 1)

  if re.match(devserver_constants.VERSION_RE, version):
    return self._RemoteBuildId(board, version)
  elif re.match(devserver_constants.VERSION, version):
    # Base version only: resolve it to a full build version first.
    return self._RemoteBuildId(board,
                               self._ResolveBuildVersion(board, version))
  elif version == LATEST_OFFICIAL:
    # latest-official --> LATEST build in board-release
    return self._LookupOfficial(board, image_dir=image_dir)
  elif version_tuple[0] == LATEST_OFFICIAL:
    # latest-official-{suffix} --> LATEST build in board-{suffix}
    return self._LookupOfficial(board, version_tuple[1], image_dir=image_dir)
  elif version == LATEST:
    # latest --> latest build on stable channel
    return self._LookupChannel(board, image_dir=image_dir)
  elif version_tuple[0] == LATEST:
    if re.match(devserver_constants.VERSION_RE, version_tuple[1]):
      # latest-R* --> most recent qualifying build
      return self._LookupVersion(board, version_tuple[1])
    else:
      # latest-{channel} --> latest build within that channel
      return self._LookupChannel(board, version_tuple[1],
                                 image_dir=image_dir)
  else:
    # The given version doesn't match any known patterns.
    raise XBuddyException("Version %s unknown. Can't find on GS." % version)
def _Symlink(link, target):
  """Symlinks link to target, and removes whatever link was there before.

  NOTE(review): takes no self/cls and is called as XBuddy._Symlink(...), so
  it presumably carries a @staticmethod decorator that is not visible in this
  view -- confirm.

  Args:
    link: path of the symlink to (re)create.
    target: path the symlink should point at.
  """
  _Log("Linking to %s from %s", link, target)
  # lexists() is also true for a dangling symlink, which still has to be
  # removed before os.symlink() can succeed.
  if os.path.lexists(link):
    os.unlink(link)
  os.symlink(target, link)
def _GetLatestLocalVersion(self, board):
  """Get the version of the latest image built for board by build_image.

  Args:
    board: board that image was built for.

  Returns:
    The discovered version of the image (the name of the latest image
    directory).

  Raises:
    XBuddyException: if no latest image directory exists for the board.
  """
  latest_local_dir = self.GetLatestImageDir(board)
  if not latest_local_dir or not os.path.exists(latest_local_dir):
    raise XBuddyException('No builds found for %s. Did you run build_image?' %
                          board)

  # Assume that the version number is the name of the directory.
  return os.path.basename(latest_local_dir.rstrip('/'))
def _FindAny(local_dir):
  """Pick the image type to serve for ANY from what exists in |local_dir|.

  Returns 'test' when a test image is present, else 'dev' when a dev image
  is present; raises XBuddyException when neither file exists.
  """
  # Ordered by priority: test images win over dev images.
  candidates = (
      ('test', devserver_constants.TEST_IMAGE_FILE),
      ('dev', devserver_constants.IMAGE_FILE),
  )
  image_paths = [(image_type, os.path.join(local_dir, file_name))
                 for image_type, file_name in candidates]
  for image_type, image_path in image_paths:
    if os.path.exists(image_path):
      return image_type

  raise XBuddyException('No images found in %s' % local_dir)
def _InterpretPath(path, default_board=None, default_version=None):
  """Split and return the pieces of an xBuddy path name.

  NOTE(review): takes no self/cls but is called as self._InterpretPath(...),
  so it presumably carries a @staticmethod decorator that is not visible in
  this view -- confirm.

  Args:
    path: the path xBuddy Get was called with.
    default_board: board to use in case board isn't in path.
    default_version: Version to use in case version isn't in path.

  Returns:
    tuple of (image_type, board, version, whether the path is local)

  Raises:
    XBuddyException: if the path can't be resolved into valid components
  """
  # A real list (not a lazy filter object) because it is indexed and popped.
  path_list = [part for part in path.split('/') if part]

  # Do the stuff that is well known first. We know that if paths have a
  # image_type, it must be one of the GS/LOCAL aliases and it must be at the
  # end. Similarly, local/remote are well-known and must start the path list.
  is_local = True
  if path_list and path_list[0] in (REMOTE, LOCAL):
    is_local = (path_list.pop(0) == LOCAL)

  # Default image type is determined by remote vs. local.
  if is_local:
    image_type = ANY
  else:
    image_type = TEST

  if path_list and path_list[-1] in GS_ALIASES + LOCAL_ALIASES:
    image_type = path_list.pop(-1)

  # Now for the tricky part. We don't actually know at this point if the rest
  # of the path is just a board | version (like R33-2341.0.0) or just a board
  # or just a version. So we do our best to do the right thing.
  board = default_board
  version = default_version or LATEST
  if len(path_list) == 1:
    path = path_list.pop(0)
    # Treat this as a version if it's one we know (contains default or
    # latest), or we were given an actual default board. The None check
    # avoids a TypeError from `None in path` when no default was given.
    looks_like_version = (
        (default_version is not None and default_version in path) or
        LATEST in path or
        default_board is not None)
    if looks_like_version:
      version = path
    else:
      board = path

  elif len(path_list) == 2:
    # Assumes board/version.
    board = path_list.pop(0)
    version = path_list.pop(0)

  if path_list:
    raise XBuddyException("Path isn't valid. Could not figure out how to "
                          "parse remaining components: %s." % path_list)

  _Log("Get artifact '%s' with board %s and version %s'. Locally? %s",
       image_type, board, version, is_local)

  return image_type, board, version, is_local
def _SyncRegistryWithBuildImages(self):
  """Crawl images_dir for build_ids of images generated from build_image.

  This will find images and symlink them in xBuddy's static dir so that
  xBuddy's cache can serve them.
  If xBuddy's _manage_builds option is on, then a timestamp will also be
  generated for each of them.
  """
  if not os.path.isdir(self.images_dir):
    # Skip syncing if images_dir does not exist.
    _Log('Cannot find %s; skip syncing image registry.', self.images_dir)
    return

  build_ids = []
  for b in os.listdir(self.images_dir):
    # Ensure we have directories to track all boards in build/images
    common_util.MkDirP(os.path.join(self.static_dir, b))
    board_dir = os.path.join(self.images_dir, b)
    # The LATEST entry is skipped; only concrete versions are registered.
    build_ids.extend(['/'.join([b, v]) for v
                      in os.listdir(board_dir) if v != LATEST])

  # Symlink undiscovered images, and update timestamps if manage_builds is on.
  for build_id in build_ids:
    link = os.path.join(self.static_dir, build_id)
    target = os.path.join(self.images_dir, build_id)
    XBuddy._Symlink(link, target)
    if self._manage_builds:
      Timestamp.UpdateTimestamp(self._timestamp_folder, build_id)
def _ListBuildTimes(self):
  """Returns the currently cached builds and their last access timestamp.

  Returns:
    list of (build_id, timedelta since last access) tuples, one per file in
    the timestamp folder, sorted from most to least recently accessed.
  """
  # Update currently cached builds.
  build_dict = {}

  for f in os.listdir(self._timestamp_folder):
    last_accessed = os.path.getmtime(os.path.join(self._timestamp_folder, f))
    build_id = Timestamp.TimestampToBuild(f)
    stale_time = datetime.timedelta(seconds=(time.time() - last_accessed))
    build_dict[build_id] = stale_time
  # items() instead of the Python 2-only iteritems(); sorted() copies anyway.
  return sorted(build_dict.items(), key=operator.itemgetter(1))
def _Download(self, gs_url, artifacts):
  """Download the artifacts from the given gs_url.

  The class-wide staging thread count is incremented for the duration of the
  download.

  Args:
    gs_url: Google Storage url of the build directory to download from.
    artifacts: list of artifact names to download.

  Raises:
    build_artifact.ArtifactDownloadError: If we failed to download the
                                          artifact.
  """
  with XBuddy._staging_thread_count_lock:
    XBuddy._staging_thread_count += 1
  try:
    _Log("Downloading %s from %s", artifacts, gs_url)
    downloader.Downloader(self.static_dir, gs_url).Download(artifacts, [])
  finally:
    # Always undo the increment, otherwise a failed download would leave the
    # counter permanently too high.
    with XBuddy._staging_thread_count_lock:
      XBuddy._staging_thread_count -= 1
def CleanCache(self):
  """Delete all builds besides the newest N builds.

  N is the configured capacity. Does nothing unless xBuddy is managing
  builds.

  Raises:
    XBuddyException: if a cached build could not be removed.
  """
  if not self._manage_builds:
    return
  cached_builds = [e[0] for e in self._ListBuildTimes()]
  _Log('In cache now: %s', cached_builds)

  # The list is ordered most recently used first, so everything past the
  # capacity is the least recently used tail.
  for b_path in cached_builds[self._Capacity():]:
    _Log("Clearing '%s' from cache", b_path)

    time_file = os.path.join(self._timestamp_folder,
                             Timestamp.BuildToTimestamp(b_path))
    clear_dir = os.path.join(self.static_dir, b_path)
    try:
      # Handle symlinks, in the case of links to local builds if enabled.
      if os.path.islink(clear_dir):
        target = os.readlink(clear_dir)
        _Log('Deleting locally built image at %s', target)

        os.unlink(clear_dir)
        if os.path.exists(target):
          shutil.rmtree(target)
      elif os.path.exists(clear_dir):
        _Log('Deleting downloaded image at %s', clear_dir)
        shutil.rmtree(clear_dir)

      # Drop the timestamp last so a build that failed to clear is still
      # tracked and gets retried on the next clean.
      os.unlink(time_file)
    except Exception as err:
      raise XBuddyException('Failed to clear %s: %s' % (clear_dir, err))
def _GetFromGS(self, build_id, image_type, image_dir=None):
  """Check if the artifact is available locally. Download from GS if not.

  Args:
    build_id: Path to the image or update directory on the devserver or
      in Google Storage. e.g. 'x86-generic/R26-4000.0.0'
    image_type: Image type to download. Look at aliases at top of file for
      options.
    image_dir: Google Storage image archive to search in if requesting a
      remote artifact. If none uses the default bucket.

  Raises:
    build_artifact.ArtifactDownloadError: If we failed to download the
                                          artifact.
  """
  image_dir = XBuddy._ResolveImageDir(image_dir)
  gs_url = os.path.join(image_dir, build_id)

  # Stage image if not found in cache.
  file_name = GS_ALIAS_TO_FILENAME[image_type]
  file_loc = os.path.join(self.static_dir, build_id, file_name)
  cached = os.path.exists(file_loc)

  if not cached:
    artifact = GS_ALIAS_TO_ARTIFACT[image_type]
    self._Download(gs_url, [artifact])
  else:
    _Log('Image already cached.')
def _GetArtifact(self, path_list, board=None, version=None,
                 lookup_only=False, image_dir=None):
  """Interpret an xBuddy path and return directory/file_name to resource.

  Note board can be passed in, but by default if self._board is set,
  that is used rather than board.

  Args:
    path_list: [board, version, alias] as split from the xbuddy call url.
    board: Board whose artifacts we are looking for. Only used if no board
      was given during XBuddy initialization.
    version: Version whose artifacts we are looking for. Used if no version
      was given during XBuddy initialization. If None, defers to LATEST.
    lookup_only: If true just look up the artifact, if False stage it on
      the devserver as well.
    image_dir: Google Storage image archive to search in if requesting a
      remote artifact. If none uses the default bucket.

  Returns:
    build_id: Path to the image or update directory on the devserver or
      in Google Storage. e.g. 'x86-generic/R26-4000.0.0'
    file_name: of the artifact in the build_id directory.

  Raises:
    XBuddyException: if the path could not be translated
    build_artifact.ArtifactDownloadError: if we failed to download the
                                          artifact.
  """
  path = '/'.join(path_list)
  default_board = self._board if self._board else board
  default_version = self._version or version or LATEST
  # Rewrite the path if there is an appropriate default.
  path = self._LookupAlias(path, default_board, default_version)
  # Parse the path.
  image_type, board, version, is_local = self._InterpretPath(
      path, default_board, default_version)
  if is_local:
    # Get a local image.
    if version == LATEST:
      # Get the latest local image for the given board.
      version = self._GetLatestLocalVersion(board)

    build_id = os.path.join(board, version)
    artifact_dir = os.path.join(self.static_dir, build_id)
    if image_type == ANY:
      image_type = self._FindAny(artifact_dir)

    file_name = LOCAL_ALIAS_TO_FILENAME[image_type]
    artifact_path = os.path.join(artifact_dir, file_name)
    if not os.path.exists(artifact_path):
      raise XBuddyException('Local %s artifact not in static_dir at %s' %
                            (image_type, artifact_path))

  else:
    # Get a remote image.
    if image_type not in GS_ALIASES:
      raise XBuddyException('Bad remote image type: %s. Use one of: %s' %
                            (image_type, GS_ALIASES))
    build_id = self._ResolveVersionToBuildId(board, version,
                                             image_dir=image_dir)
    _Log('Resolved version %s to %s.', version, build_id)
    file_name = GS_ALIAS_TO_FILENAME[image_type]
    if not lookup_only:
      self._GetFromGS(build_id, image_type, image_dir=image_dir)

  return build_id, file_name
############################ BEGIN PUBLIC METHODS
def List(self):
  """Render the cached builds and time since last access as an HTML string."""
  entries = [
      '<b>' + build + '</b> ' +
      '(time since last access: ' + str(timestamp) + ')<br>'
      for build, timestamp in self._ListBuildTimes()
  ]
  return ''.join(entries)
def Capacity(self):
  """Returns the configured xBuddy cache capacity, as a string."""
  capacity = self._Capacity()
  return str(capacity)
def Translate(self, path_list, board=None, version=None, image_dir=None):
  """Translates an xBuddy path to a real path to artifact if it exists.

  Equivalent to the Get call, minus downloading and updating timestamps.

  Args:
    path_list: [board, version, alias] as split from the xbuddy call url.
    board: Board whose artifacts we are looking for. If None, use the board
      XBuddy was initialized to use.
    version: Version whose artifacts we are looking for. If None, use the
      version XBuddy was initialized with, or LATEST.
    image_dir: image directory to check in Google Storage. If none,
      the default bucket is used.

  Returns:
    build_id: Path to the image or update directory on the devserver.
      e.g. 'x86-generic/R26-4000.0.0'
      The returned path is always the path to the directory within
      static_dir, so it is always the build_id of the image.
    file_name: The file name of the artifact. Can take any of the file
      values in devserver_constants.
      e.g. 'chromiumos_test_image.bin' or 'update.gz' if the path list
      specified 'test' or 'full_payload' artifacts, respectively.

  Raises:
    XBuddyException: if the path couldn't be translated
  """
  # lookup_only=True: resolve the artifact without staging it.
  build_id, file_name = self._GetArtifact(path_list, board=board,
                                          version=version,
                                          lookup_only=True,
                                          image_dir=image_dir)

  _Log('Returning path to payload: %s/%s', build_id, file_name)
  return build_id, file_name
def StageTestArtifactsForUpdate(self, path_list):
  """Stages test artifacts for update and returns build_id.

  When the path translates to a test image, the full payload and stateful
  artifacts of that build are downloaded from the default GS image dir.

  Args:
    path_list: [board, version, alias] as split from the xbuddy call url.

  Returns:
    The build_id the path translated to.

  Raises:
    XBuddyException: if the path could not be translated
    build_artifact.ArtifactDownloadError: if we failed to download the test
                                          artifacts.
  """
  build_id, file_name = self.Translate(path_list)
  if file_name == devserver_constants.TEST_IMAGE_FILE:
    gs_url = os.path.join(devserver_constants.GS_IMAGE_DIR,
                          build_id)
    artifacts = [FULL, STATEFUL]
    self._Download(gs_url, artifacts)
  return build_id
def Get(self, path_list, image_dir=None):
  """The full xBuddy call, returns resource specified by path_list.

  Resolves the path, stages the artifact if needed, refreshes the build's
  timestamp and then trims the cache.

  Args:
    path_list: [board, version, alias] as split from the xbuddy call url.
    image_dir: image directory to check in Google Storage. If none,
      the default bucket is used.

  Returns:
    build_id: Path to the image or update directory on the devserver.
      e.g. 'x86-generic/R26-4000.0.0'
      The returned path is always the path to the directory within
      static_dir, so it is always the build_id of the image.
    file_name: The file name of the artifact. Can take any of the file
      values in devserver_constants.
      e.g. 'chromiumos_test_image.bin' or 'update.gz' if the path list
      specified 'test' or 'full_payload' artifacts, respectively.

  Raises:
    XBuddyException: if the path could not be translated
    build_artifact.ArtifactDownloadError: if we failed to download the
                                          artifact.
  """
  build_id, file_name = self._GetArtifact(path_list, image_dir=image_dir)
  Timestamp.UpdateTimestamp(self._timestamp_folder, build_id)
  #TODO (joyc): run in sep thread
  # CleanCache() is a no-op unless xBuddy is managing builds.
  self.CleanCache()

  _Log('Returning path to payload: %s/%s', build_id, file_name)
  return build_id, file_name