# Copyright 2013-2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
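
"""
FetchIterator yields a task group for each ebuild in the configured
repositories; each group executes FetchTask instances for the files
which that ebuild references, constructed asynchronously from ebuild
metadata and Manifest digests.
"""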

import threading

from portage import os
from portage.checksum import (_apply_hash_filter,
	_filter_unaccelarated_hashes, _hash_filter)
from portage.dep import use_reduce
from portage.exception import PortageException, PortageKeyError
from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
from portage.util._async.TaskScheduler import TaskScheduler
from portage.util.futures.iter_completed import iter_gather
from .FetchTask import FetchTask
from _emerge.CompositeTask import CompositeTask


class FetchIterator(object):
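	"""
	Yields an _EbuildFetchTasks instance for each cpv of each
	configured repository tree.

	Illustrative usage sketch (the real driver is emirrordist's
	MirrorDistTask; max_jobs and event_loop are placeholders here):

		fetch_iterator = FetchIterator(config)
		fetch_scheduler = TaskScheduler(iter(fetch_iterator),
			max_jobs=max_jobs, event_loop=event_loop)
		fetch_scheduler.start()
		fetch_scheduler.wait()
	"""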

	def __init__(self, config):
		self._config = config
		self._terminated = threading.Event()

	def terminate(self):
		"""
		Schedules early termination of the __iter__ method, which is
		useful because under some conditions it's possible for
		__iter__ to loop for a long time without yielding to the
		caller, for example when there are many ebuilds with stale
		cache and RESTRICT=mirror.

		This method is thread-safe (and safe for signal handlers).
		"""
		self._terminated.set()

	def _iter_every_cp(self):
		# List categories individually, in order to start yielding quicker,
		# and in order to reduce latency in case of a signal interrupt.
		cp_all = self._config.portdb.cp_all
		for category in sorted(self._config.portdb.categories):
			for cp in cp_all(categories=(category,)):
				yield cp

	def __iter__(self):

		portdb = self._config.portdb
		get_repo_for_location = portdb.repositories.get_repo_for_location

		hash_filter = _hash_filter(
			portdb.settings.get("PORTAGE_CHECKSUM_FILTER", ""))
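		# A transparent filter accepts any hash, so there is nothing
		# to apply.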
		if hash_filter.transparent:
			hash_filter = None

		for cp in self._iter_every_cp():

			if self._terminated.is_set():
				return

			for tree in portdb.porttrees:

				# Reset state so the Manifest is pulled once
				# for this cp / tree combination.
				repo_config = get_repo_for_location(tree)
				digests_future = portdb._event_loop.create_future()

				for cpv in portdb.cp_list(cp, mytree=tree):

					if self._terminated.is_set():
						return

					yield _EbuildFetchTasks(
						fetch_tasks_future=_async_fetch_tasks(
							self._config,
							hash_filter,
							repo_config,
							digests_future,
							cpv,
							portdb._event_loop)
					)


class _EbuildFetchTasks(CompositeTask):
	"""
	Executes FetchTask instances (which are asynchronously constructed)
	for each of the files referenced by an ebuild.
	"""
	__slots__ = ('fetch_tasks_future',)

	def _start(self):
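		# Wait for the asynchronously constructed FetchTask list
		# before scheduling the tasks themselves.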
		self._start_task(AsyncTaskFuture(future=self.fetch_tasks_future),
			self._start_fetch_tasks)

	def _start_fetch_tasks(self, task):
		if self._default_exit(task) != os.EX_OK:
			self._async_wait()
			return

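		# Execute the FetchTask instances for this ebuild one at a
		# time (max_jobs=1), in the event loop of this composite task.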
		self._start_task(
			TaskScheduler(
				iter(self.fetch_tasks_future.result()),
				max_jobs=1,
				event_loop=self.scheduler),
			self._default_final_exit)


def _async_fetch_tasks(config, hash_filter, repo_config, digests_future, cpv,
	loop):
	"""
	Asynchronously construct FetchTask instances for each of the files
	referenced by an ebuild.

	@param config: emirrordist config
	@type config: portage._emirrordist.Config.Config
	@param hash_filter: PORTAGE_CHECKSUM_FILTER settings
	@type hash_filter: portage.checksum._hash_filter
	@param repo_config: repository configuration
	@type repo_config: RepoConfig
	@param digests_future: future that contains cached distfiles digests
		for the current cp if available
	@type digests_future: asyncio.Future
	@param cpv: current ebuild cpv
	@type cpv: portage.versions._pkg_str
	@param loop: event loop
	@type loop: EventLoop
	@return: A future that results in a list containing FetchTask
		instances for each of the files referenced by an ebuild.
	@rtype: asyncio.Future (or compatible)
	"""
	result = loop.create_future()
	fetch_tasks = []

	def aux_get_done(gather_result):
		# All exceptions must be consumed from gather_result before this
		# function returns, in order to avoid triggering the event loop's
		# exception handler.
		if not gather_result.cancelled():
			list(future.exception() for future in gather_result.result()
				if not future.cancelled())
		else:
			result.cancel()

		if result.cancelled():
			return

		aux_get_result, fetch_map_result = gather_result.result()
		if aux_get_result.cancelled() or fetch_map_result.cancelled():
			# Cancel result after consuming any exceptions which
			# are now irrelevant due to cancellation.
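			# (future.cancelled() or future.exception() retrieves the
			# exception, if any, without raising it)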
			aux_get_result.cancelled() or aux_get_result.exception()
			fetch_map_result.cancelled() or fetch_map_result.exception()
			result.cancel()
			return

		try:
			restrict, = aux_get_result.result()
		except (PortageKeyError, PortageException) as e:
			config.log_failure("%s\t\taux_get exception %s" %
				(cpv, e))
			result.set_result(fetch_tasks)
			return

		# Here we use matchnone=True to ignore conditional parts
		# of RESTRICT since they don't apply unconditionally.
		# Assume such conditionals only apply on the client side.
		try:
			restrict = frozenset(use_reduce(restrict,
				flat=True, matchnone=True))
		except PortageException as e:
			config.log_failure("%s\t\tuse_reduce exception %s" %
				(cpv, e))
			result.set_result(fetch_tasks)
			return

		if "fetch" in restrict:
			result.set_result(fetch_tasks)
			return

		try:
			uri_map = fetch_map_result.result()
		except PortageException as e:
			config.log_failure("%s\t\tgetFetchMap exception %s" %
				(cpv, e))
			result.set_result(fetch_tasks)
			return

		if not uri_map:
			result.set_result(fetch_tasks)
			return

		if "mirror" in restrict:
			skip = False
			if config.restrict_mirror_exemptions is not None:
				new_uri_map = {}
				for filename, uri_tuple in uri_map.items():
					for uri in uri_tuple:
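						# Keep this file if any of its URIs refers to
						# a mirror (mirror://<name>/...) which is
						# exempt from RESTRICT=mirror.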
						if uri[:9] == "mirror://":
							i = uri.find("/", 9)
							if i != -1 and uri[9:i].strip("/") in \
								config.restrict_mirror_exemptions:
								new_uri_map[filename] = uri_tuple
								break
				if new_uri_map:
					uri_map = new_uri_map
				else:
					skip = True
			else:
				skip = True

			if skip:
				result.set_result(fetch_tasks)
				return

		# Parse Manifest for this cp if we haven't yet.
		try:
			if digests_future.done():
				# If there's an exception then raise it.
				digests = digests_future.result()
			else:
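				# First cpv of this cp / tree to reach this point:
				# load the Manifest and share the digests through
				# digests_future.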
				digests = repo_config.load_manifest(
					os.path.join(repo_config.location, cpv.cp)).\
					getTypeDigests("DIST")
		except (EnvironmentError, PortageException) as e:
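			# Propagate the Manifest failure to subsequent cpvs for
			# this cp / tree (unless the future is already done).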
			digests_future.done() or digests_future.set_exception(e)
			for filename in uri_map:
				config.log_failure(
					"%s\t%s\tManifest exception %s" %
					(cpv, filename, e))
				config.file_failures[filename] = cpv
			result.set_result(fetch_tasks)
			return
		else:
			digests_future.done() or digests_future.set_result(digests)

		if not digests:
			for filename in uri_map:
				config.log_failure("%s\t%s\tdigest entry missing" %
					(cpv, filename))
				config.file_failures[filename] = cpv
			result.set_result(fetch_tasks)
			return

		for filename, uri_tuple in uri_map.items():
			file_digests = digests.get(filename)
			if file_digests is None:
				config.log_failure("%s\t%s\tdigest entry missing" %
					(cpv, filename))
				config.file_failures[filename] = cpv
				continue
			if filename in config.file_owners:
				continue
			config.file_owners[filename] = cpv

			file_digests = \
				_filter_unaccelarated_hashes(file_digests)
			if hash_filter is not None:
				file_digests = _apply_hash_filter(
					file_digests, hash_filter)

			fetch_tasks.append(FetchTask(
				cpv=cpv,
				background=True,
				digests=file_digests,
				distfile=filename,
				restrict=restrict,
				uri_tuple=uri_tuple,
				config=config))

		result.set_result(fetch_tasks)

	def future_generator():
		yield config.portdb.async_aux_get(cpv, ("RESTRICT",),
			myrepo=repo_config.name, loop=loop)
		yield config.portdb.async_fetch_map(cpv,
			mytree=repo_config.location, loop=loop)

	# Use iter_gather(max_jobs=1) to limit the number of processes per
	# _EbuildFetchTasks instance, and also to avoid spawning two bash
	# processes for the same cpv simultaneously (the second one can
	# use metadata cached by the first one).
	gather_result = iter_gather(
		future_generator(),
		max_jobs=1,
		loop=loop,
	)
	gather_result.add_done_callback(aux_get_done)
	result.add_done_callback(lambda result:
		gather_result.cancel() if result.cancelled() and
		not gather_result.done() else None)

	return result