| #!/usr/bin/python |
| # |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import Queue |
| import os |
| import shutil |
| import tempfile |
| import threading |
| |
| import common_util |
| import log_util |
| |
| |
class Downloader(log_util.Loggable):
  """Download images to the devserver.

  Given a URL to a build on the archive server:

  - Determine if the build already exists.
  - Download and extract the build to a staging directory.
  - Package autotest tests.
  - Install components to static dir.
  """

  # This filename must be kept in sync with clean_staged_images.py
  _TIMESTAMP_FILENAME = 'staged.timestamp'

  def __init__(self, static_dir):
    """Initializes a downloader rooted at |static_dir|.

    @param static_dir: directory under which builds are staged.
    """
    self._static_dir = static_dir
    # Directory returned by common_util.AcquireLock for the current build.
    self._build_dir = None
    # Temporary download directory; removed (and reset to None) by _Cleanup.
    self._staging_dir = None
    # Holds at most one status: 'Success' or the exception that aborted the
    # download. GetStatusOfBackgroundDownloads reads it and re-queues it.
    self._status_queue = Queue.Queue(maxsize=1)
    # Lock tag for the current build; see GenerateLockTag.
    self._lock_tag = None

  @staticmethod
  def ParseUrl(archive_url):
    """Parse archive_url into rel_path and short_build.

    e.g. gs://chromeos-image-archive/{rel_path}/{short_build}

    @param archive_url: a URL at which build artifacts are archived.
    @return a tuple of (build relative path, short build name)
    """
    # The archive_url is of the form gs://server/[some_path/target]/...]/build
    # This function discards 'gs://server/' and extracts the [some_path/target]
    # as rel_path and the build as short_build. Trailing slashes are dropped
    # so 'gs://server/target/build/' parses like 'gs://server/target/build'
    # instead of yielding an empty short_build.
    sub_url = archive_url.partition('://')[2].rstrip('/')
    split_sub_url = sub_url.split('/')
    rel_path = '/'.join(split_sub_url[1:-1])
    short_build = split_sub_url[-1]
    return rel_path, short_build

  @staticmethod
  def GenerateLockTag(rel_path, short_build):
    """Generate a name for a lock scoped to this rel_path/build pair.

    @param rel_path: the relative path for the build.
    @param short_build: short build name
    @return a name to use with AcquireLock that will scope the lock.
    """
    return '/'.join([rel_path, short_build])

  @staticmethod
  def _TouchTimestampForStaged(directory_path):
    """Creates _TIMESTAMP_FILENAME in |directory_path| or bumps its mtime.

    @param directory_path: directory that should hold the timestamp file.
    """
    file_name = os.path.join(directory_path, Downloader._TIMESTAMP_FILENAME)
    # Easiest python version of |touch file_name|: append mode creates the
    # file without truncating it, and utime(..., None) sets mtime to now.
    with open(file_name, 'a'):
      os.utime(file_name, None)

  @staticmethod
  def BuildStaged(archive_url, static_dir):
    """Returns True if the build is already staged.

    When the build directory exists, its timestamp file is also touched.

    @param archive_url: URL of the build on the archive server.
    @param static_dir: directory under which builds are staged.
    @return True if the build's directory exists under |static_dir|.
    """
    rel_path, short_build = Downloader.ParseUrl(archive_url)
    sub_directory = Downloader.GenerateLockTag(rel_path, short_build)
    directory_path = os.path.join(static_dir, sub_directory)
    exists = os.path.isdir(directory_path)
    # If the build exists, then touch the timestamp to tell
    # clean_staged_images.py that we're using this build.
    if exists:
      Downloader._TouchTimestampForStaged(directory_path)
    return exists

  def Download(self, archive_url, background=False):
    """Downloads the given build artifacts defined by the |archive_url|.

    If background is set to True, will return back early before all artifacts
    have been downloaded. The artifacts that can be backgrounded are all those
    that are not set as synchronous.

    TODO: refactor this into a common Download method, once unit tests are
    fixed up to make iterating on the code easier.

    @param archive_url: URL of the build on the archive server.
    @param background: if True, non-synchronous artifacts are fetched on a
        separate thread; use GetStatusOfBackgroundDownloads for the outcome.
    @return 'Success' once the synchronous artifacts are staged.
    @raise Exception: any error raised while locking, gathering or staging
        the synchronous artifacts is queued as the status and re-raised,
        after the lock directory is destroyed and the staging dir removed.
    """
    # Parse archive_url into rel_path (contains the build target) and
    # short_build.
    # e.g. gs://chromeos-image-archive/{rel_path}/{short_build}
    rel_path, short_build = self.ParseUrl(archive_url)
    # This should never happen. The Devserver should only try to call this
    # method if no previous downloads have been staged for this archive_url.
    assert not Downloader.BuildStaged(archive_url, self._static_dir)
    # _build_dir and _staging_dir stay None until bound below, which lets the
    # error handler tell how much cleanup is needed.
    self._lock_tag = self.GenerateLockTag(rel_path, short_build)
    try:
      # Create Dev Server directory for this build and tell other Downloader
      # instances we have processed this build. Note that during normal
      # execution, this lock is only released in the actual downloading
      # procedure called below.
      self._build_dir = common_util.AcquireLock(
          static_dir=self._static_dir, tag=self._lock_tag)

      # Replace '/' with '_' in rel_path because it may contain multiple levels
      # which would not be qualified as part of the suffix.
      self._staging_dir = tempfile.mkdtemp(suffix='_'.join(
          [rel_path.replace('/', '_'), short_build]))
      Downloader._TouchTimestampForStaged(self._staging_dir)
      self._Log('Gathering download requirements %s' % archive_url)
      artifacts = self.GatherArtifactDownloads(
          self._staging_dir, archive_url, self._build_dir, short_build)
      common_util.PrepareBuildDirectory(self._build_dir)

      self._Log('Downloading foreground artifacts from %s' % archive_url)
      background_artifacts = []
      for artifact in artifacts:
        if artifact.Synchronous():
          artifact.Download()
          artifact.Stage()
        else:
          background_artifacts.append(artifact)

      # Both paths run _DownloadArtifactsSerially, which releases the lock,
      # queues the status and cleans up the staging dir.
      if background:
        self._DownloadArtifactsInBackground(background_artifacts)
      else:
        self._DownloadArtifactsSerially(background_artifacts)

    except Exception as e:
      # Release processing lock, which will remove build components directory
      # so future runs can retry.
      if self._build_dir:
        common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag,
                                destroy=True)

      self._status_queue.put(e)
      self._Cleanup()
      raise
    return 'Success'

  def _Cleanup(self):
    """Cleans up the staging dir for this downloader instance."""
    if self._staging_dir:
      self._Log('Cleaning up staging directory %s' % self._staging_dir)
      shutil.rmtree(self._staging_dir)

    self._staging_dir = None

  def _DownloadArtifactsSerially(self, artifacts):
    """Downloads and stages |artifacts| one by one, then reports the status.

    Errors are not raised; they are put on the status queue instead, and the
    lock directory is destroyed so future runs can retry. On success the lock
    is released with the directory kept and 'Success' is queued. The staging
    dir is always cleaned up.

    @param artifacts: artifacts exposing Download() and Stage().
    """
    self._Log('Downloading artifacts serially.')
    try:
      for artifact in artifacts:
        artifact.Download()
        artifact.Stage()
    except Exception as e:
      self._status_queue.put(e)

      # Release processing lock, which will remove build components directory
      # so future runs can retry.
      if self._build_dir:
        common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag,
                                destroy=True)
    else:
      # Release processing lock, keeping directory intact.
      if self._build_dir:
        common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag)
      self._status_queue.put('Success')
    finally:
      self._Cleanup()

  def _DownloadArtifactsInBackground(self, artifacts):
    """Downloads |artifacts| in the background and signals when complete.

    @param artifacts: artifacts handed to _DownloadArtifactsSerially on a new
        thread; the thread is started and not joined.
    """
    self._Log('Invoking background download of artifacts')
    thread = threading.Thread(target=self._DownloadArtifactsSerially,
                              args=(artifacts,))
    thread.start()

  def GatherArtifactDownloads(self, main_staging_dir, archive_url, build_dir,
                              short_build):
    """Wrapper around common_util.GatherArtifactDownloads().

    The wrapper allows mocking and overriding in derived classes.
    """
    return common_util.GatherArtifactDownloads(
        main_staging_dir, archive_url, build_dir, short_build)

  def GetStatusOfBackgroundDownloads(self):
    """Returns the status of the background downloads.

    This commands returns the status of the background downloads and blocks
    until a status is returned.

    @return 'Success' when the downloads completed.
    @raise Exception: the queued exception, if the download failed.
    """
    status = self._status_queue.get()
    # In case anyone else is calling.
    self._status_queue.put(status)
    # If someone is curious about the status of a build, then we should
    # probably keep it around for a bit longer.
    # NOTE(review): _Cleanup() deletes the staging dir and resets
    # self._staging_dir to None right after the status is queued, so this
    # touch rarely takes effect; confirm whether the build dir was intended.
    if self._staging_dir and os.path.exists(self._staging_dir):
      Downloader._TouchTimestampForStaged(self._staging_dir)
    # It's possible we received an exception, if so, re-raise it here.
    if isinstance(status, Exception):
      raise status

    return status
| |
| |
class SymbolDownloader(Downloader):
  """Download and stage debug symbols for a build on the devserver.

  Given a URL to a build on the archive server:

  - Determine if the build already exists.
  - Download and extract the debug symbols to a staging directory.
  - Install symbols to static dir.
  """

  # Name of the flag file written into the build dir once symbols are staged;
  # the same string is also written as the file's contents.
  _DONE_FLAG = 'done'

  @staticmethod
  def GenerateLockTag(rel_path, short_build):
    """Returns the lock tag '<rel_path>/<short_build>/symbols'.

    The extra 'symbols' component keeps this tag distinct from the one the
    base Downloader uses for the build itself.
    """
    return '/'.join((rel_path, short_build, 'symbols'))

  def Download(self, archive_url, _background=False):
    """Synchronously downloads and stages debug symbols for |archive_url|.

    @param archive_url: URL of the build on the archive server, e.g.
        gs://chromeos-image-archive/{rel_path}/{short_build}.
    @param _background: ignored; symbols are always fetched synchronously.
    @return 'Success' once the symbols are staged, or if they already were.
    @raise Exception: any error from locking, gathering, downloading or
        staging is re-raised after the lock directory is destroyed.
    """
    target_path, build_name = self.ParseUrl(archive_url)
    self._lock_tag = self.GenerateLockTag(target_path, build_name)

    if self.SymbolsStaged(archive_url, self._static_dir):
      self._Log('Symbols for build %s have already been staged.' %
                self._lock_tag)
      return 'Success'

    # '/' in the target path is flattened to '_' so that a multi-level path
    # can be used as a mkdtemp() suffix.
    dir_suffix = '_'.join([target_path.replace('/', '_'), build_name])

    try:
      # Claim the build directory so other downloaders see it is in progress.
      self._build_dir = common_util.AcquireLock(
          static_dir=self._static_dir, tag=self._lock_tag)
      self._staging_dir = tempfile.mkdtemp(suffix=dir_suffix)
      self._Log('Downloading debug symbols from %s' % archive_url)

      (symbol_artifact,) = self.GatherArtifactDownloads(
          self._staging_dir, archive_url, self._static_dir)
      symbol_artifact.Download()
      symbol_artifact.Stage()
      self.MarkSymbolsStaged()
    except Exception:
      # Destroying the lock directory signals to later runs that staging did
      # not succeed and should be retried.
      if self._build_dir:
        common_util.ReleaseLock(static_dir=self._static_dir,
                                tag=self._lock_tag, destroy=True)
      raise
    else:
      # Success: drop the lock but keep the staged directory.
      if self._build_dir:
        common_util.ReleaseLock(static_dir=self._static_dir,
                                tag=self._lock_tag)
    finally:
      self._Cleanup()

    return 'Success'

  def GatherArtifactDownloads(self, temp_download_dir, archive_url, static_dir,
                              short_build=None):
    """Call SymbolDownloader-appropriate artifact gathering method.

    @param temp_download_dir: temp dir that artifacts are downloaded into
        before being staged.
    @param archive_url: the google storage url of the bucket holding the
        build's debug symbols.
    @param static_dir: the dir into which to stage the symbols.
    @param short_build: (ignored)

    @return an iterable expected to hold exactly one artifact for the debug
        symbols (Download() unpacks it as a single element).
    """
    return common_util.GatherSymbolArtifactDownloads(
        temp_download_dir, archive_url, static_dir)

  def MarkSymbolsStaged(self):
    """Writes the done-flag file into the build dir."""
    flag_path = os.path.join(self._build_dir, self._DONE_FLAG)
    with open(flag_path, 'w') as flag_file:
      flag_file.write(self._DONE_FLAG)

  def SymbolsStaged(self, archive_url, static_dir):
    """Returns True if the symbols for |archive_url| are already staged.

    @param archive_url: URL of the build on the archive server.
    @param static_dir: directory under which builds are staged.
    @return True if the done-flag file exists in the symbols directory.
    """
    target_path, build_name = self.ParseUrl(archive_url)
    flag_path = os.path.join(static_dir,
                             self.GenerateLockTag(target_path, build_name),
                             self._DONE_FLAG)
    return os.path.isfile(flag_path)
| |
| |
class ImagesDownloader(Downloader):
  """Download and stage prebuilt images for a given build.

  Given a URL to a build on the archive server and a list of images:
  - Determine which images have not been staged yet.
  - Download the image archive.
  - Extract missing images to the staging directory.
  """
  # Flag file in the build dir listing staged image types, one per line.
  _DONE_FLAG = 'staged'

  # Image types to stage; Download() sets this per call. The original note
  # says empty (the default) means all -- that depends on
  # common_util.GatherImageArchiveArtifactDownloads, TODO confirm.
  _image_list = []

  # A mapping from test image types to their archived file names.
  _IMAGE_TO_FNAME = {
      'test': 'chromiumos_test_image.bin',
      'base': 'chromiumos_base_image.bin',
      'recovery': 'recovery_image.bin',
  }

  @staticmethod
  def GenerateLockTag(rel_path, short_build):
    """Returns the lock tag 'images/<rel_path>/<short_build>'."""
    return os.path.join('images', rel_path, short_build)

  def Download(self, archive_url, image_list, _background=False):
    """Downloads images in |image_list| from the build defined by |archive_url|.

    Download happens synchronously. |image_list| may include any of those in
    self._IMAGE_TO_FNAME.keys(); duplicates are ignored, and images already
    recorded in the flag file are not staged again.

    @param archive_url: URL of the build on the archive server.
    @param image_list: non-empty list of image types to stage.
    @param _background: ignored; images are always fetched synchronously.
    @return 'Success' when all requested images are staged.
    @raise ValueError: if |image_list| is empty or contains unknown types.
    @raise Exception: errors from locking, downloading or staging are
        re-raised after the lock directory is destroyed.
    """
    # Check correctness of image list, remove duplicates. ValueError is used
    # because no DevServerError is defined or imported in this module; raising
    # it produced a NameError instead of a descriptive error.
    if not image_list:
      raise ValueError('empty list of image types')
    invalid_images = list(set(image_list) - set(self._IMAGE_TO_FNAME))
    if invalid_images:
      raise ValueError('invalid images requested: %s' % invalid_images)
    image_list = list(set(image_list))

    # Parse archive_url into rel_path (contains the build target) and
    # short_build.
    # e.g. gs://chromeos-image-archive/{rel_path}/{short_build}
    rel_path, short_build = self.ParseUrl(archive_url)

    # _build_dir and _staging_dir stay None until bound below, which lets the
    # error handler tell whether there is a lock to release.
    self._lock_tag = self.GenerateLockTag(rel_path, short_build)
    staged_image_list = self._CheckStagedImages(archive_url, self._static_dir)
    unstaged_image_list = [image for image in image_list
                           if image not in staged_image_list]
    if not unstaged_image_list:
      # image_list is known to be non-empty here (checked above).
      self._Log(
          'All requested images (%s) for build %s have already been staged.' %
          (common_util.CommaSeparatedList(image_list, is_quoted=True),
           self._lock_tag))
      return 'Success'

    self._Log(
        'Image(s) %s for build %s will be staged' %
        (common_util.CommaSeparatedList(unstaged_image_list, is_quoted=True),
         self._lock_tag))
    # Read by GatherArtifactDownloads to select the files to extract.
    self._image_list = unstaged_image_list

    try:
      # Create a static target directory and lock it for processing. We permit
      # the directory to preexist, as different images might be downloaded and
      # extracted at different times.
      self._build_dir = common_util.AcquireLock(
          static_dir=self._static_dir, tag=self._lock_tag,
          create_once=False)

      # Replace '/' with '_' in rel_path because it may contain multiple levels
      # which would not be qualified as part of the suffix.
      self._staging_dir = tempfile.mkdtemp(suffix='_'.join(
          [rel_path.replace('/', '_'), short_build]))
      self._Log('Downloading image archive from %s' % archive_url)
      dest_static_dir = os.path.join(self._static_dir, self._lock_tag)
      [image_archive_artifact] = self.GatherArtifactDownloads(
          self._staging_dir, archive_url, dest_static_dir)
      image_archive_artifact.Download()
      self._Log('Staging images to %s' % dest_static_dir)
      image_archive_artifact.Stage()
      self._MarkStagedImages(unstaged_image_list)

    except Exception:
      # Release processing "lock", which will indicate to future runs that we
      # did not succeed, and so they should try again.
      if self._build_dir:
        common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag,
                                destroy=True)
      raise
    else:
      # Release processing "lock", keeping directory intact.
      if self._build_dir:
        common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag)
    finally:
      self._Cleanup()

    return 'Success'

  def GatherArtifactDownloads(self, temp_download_dir, archive_url, static_dir,
                              short_build=None):
    """Call appropriate artifact gathering method.

    Args:
      temp_download_dir: temporary directory for downloading artifacts to
      archive_url: URI to the bucket where image archive is stored
      static_dir: directory into which to stage extracted images
      short_build: (ignored)
    Returns:
      list of downloadable artifacts (of type ZipfileBuildArtifact), currently
      containing a single object, configured for extracting the archived file
      names of the image types in self._image_list
    """
    return common_util.GatherImageArchiveArtifactDownloads(
        temp_download_dir, archive_url, static_dir,
        [self._IMAGE_TO_FNAME[image] for image in self._image_list])

  def _MarkStagedImages(self, image_list):
    """Update the on-disk flag file with the list of newly staged images.

    This does not check for duplicates against already listed images, and will
    append any listed images regardless, one per line.

    Args:
      image_list: image type names to record as staged.
    """
    flag_fname = os.path.join(self._build_dir, self._DONE_FLAG)
    with open(flag_fname, 'a') as flag_file:
      flag_file.writelines([image + '\n' for image in image_list])

  def _CheckStagedImages(self, archive_url, static_dir):
    """Returns a list of images that were already staged.

    Reads the list of images from a flag file, if one is present, and returns
    it after removing duplicates and blank lines.

    Args:
      archive_url: URL of the build on the archive server.
      static_dir: directory under which builds are staged.
    Returns:
      list of staged image type names, in no particular order (empty if the
      flag file does not exist).
    """
    rel_path, short_build = self.ParseUrl(archive_url)
    sub_directory = self.GenerateLockTag(rel_path, short_build)
    flag_fname = os.path.join(static_dir, sub_directory, self._DONE_FLAG)
    staged_images = set()
    # TODO(garnold) make this code immune to race conditions, probably by
    # acquiring a lock around the file access code.
    if os.path.isfile(flag_fname):
      with open(flag_fname) as flag_file:
        staged_images = set(line.strip() for line in flag_file)
      staged_images.discard('')
    return list(staged_images)