| # Copyright 2013-2014 Gentoo Foundation |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| import errno |
| import logging |
| import sys |
| import time |
| |
| try: |
| import threading |
| except ImportError: |
| import dummy_threading as threading |
| |
| import portage |
| from portage import os |
| from portage.util._async.TaskScheduler import TaskScheduler |
| from _emerge.CompositeTask import CompositeTask |
| from .FetchIterator import FetchIterator |
| from .DeletionIterator import DeletionIterator |
| |
# Python 3 has no separate ``long`` type; alias the name to ``int`` so
# that code below calling ``long(...)`` works on both major versions.
if sys.version_info[0] >= 3:
    # pylint: disable=W0622
    long = int
| |
class MirrorDistTask(CompositeTask):
    """Composite task that drives one distfiles mirror run.

    The run consists of a fetch phase, an optional deletion phase
    (enabled by ``config.options.delete``), and a post-processing step
    that optionally updates the recycle db, optionally writes the
    scheduled-deletions log, and always logs a summary.

    ``terminate()`` only sets a flag; the flag is polled by an idle
    callback registered on the scheduler in ``_start()``, which then
    cancels the task.
    """

    __slots__ = ('_config', '_terminated', '_term_check_id')

    def __init__(self, config):
        """
        @param config: run configuration; this class reads its
            ``event_loop``, ``options``, ``start_time``, db attributes
            and counters.
        """
        CompositeTask.__init__(self, scheduler=config.event_loop)
        self._config = config
        self._terminated = threading.Event()
        # _wait() reads this slot, so make sure it always has a value
        # even if _start() has not run yet.
        self._term_check_id = None

    def _start(self):
        """Register the termination poll and start the fetch phase."""
        self._term_check_id = self.scheduler.idle_add(
            self._termination_check)
        self._start_scheduler(FetchIterator(self._config), self._fetch_exit)

    def _start_scheduler(self, tasks, exit_handler):
        """Run ``tasks`` in a TaskScheduler using the configured limits.

        @param tasks: iterable of tasks to schedule
        @param exit_handler: callback passed to ``_start_task`` for the
            scheduler task
        """
        scheduler = TaskScheduler(iter(tasks),
            max_jobs=self._config.options.jobs,
            max_load=self._config.options.load_average,
            event_loop=self._config.event_loop)
        self._start_task(scheduler, exit_handler)

    def _fetch_exit(self, fetch):
        """Exit handler for the fetch phase.

        Starts the deletion phase when ``options.delete`` is set,
        otherwise proceeds directly to post-processing.
        """
        self._assert_current(fetch)
        if self._was_cancelled():
            self.wait()
            return

        if self._config.options.delete:
            self._start_scheduler(
                DeletionIterator(self._config), self._deletion_exit)
            return

        self._post_deletion()

    def _deletion_exit(self, deletion):
        """Exit handler for the deletion phase."""
        self._assert_current(deletion)
        if self._was_cancelled():
            self.wait()
            return

        self._post_deletion()

    def _post_deletion(self):
        """Run the optional maintenance steps, log the summary, and
        finish the task with a successful return code."""
        options = self._config.options

        if options.recycle_db is not None:
            self._update_recycle_db()

        if options.scheduled_deletion_log is not None:
            self._scheduled_deletion_log()

        self._summary()

        self.returncode = os.EX_OK
        self._current_task = None
        self.wait()

    def _update_recycle_db(self):
        """Synchronize the recycle db with the recycle directory.

        For every file found in ``options.recycle_dir``:
          - unknown files are added with the current start time;
          - files whose size differs from the recorded size are
            re-recorded with the current start time;
          - files whose recorded time plus ``recycle_deletion_delay``
            is older than the start time are unlinked and dropped from
            the db (only logged when ``options.dry_run`` is set).

        Finally, db entries for which no file was found are dropped.
        """
        start_time = self._config.start_time
        recycle_dir = self._config.options.recycle_dir
        recycle_db = self._config.recycle_db
        r_deletion_delay = self._config.options.recycle_deletion_delay
        dry_run = self._config.options.dry_run

        # Copy the db into a plain dict to optimize access. Entries are
        # popped as their files are seen, so whatever remains after the
        # directory scan refers to files that were not found.
        recycle_db_cache = dict(recycle_db.items())

        for filename in os.listdir(recycle_dir):

            recycle_file = os.path.join(recycle_dir, filename)

            try:
                st = os.stat(recycle_file)
            except OSError as e:
                # A file that vanished since listdir() is not an error.
                if e.errno not in (errno.ENOENT, errno.ESTALE):
                    logging.error(
                        "stat failed for '%s' in recycle: %s", filename, e)
                continue

            value = recycle_db_cache.pop(filename, None)
            if value is None:
                logging.debug("add '%s' to recycle db", filename)
                recycle_db[filename] = (st.st_size, start_time)
                continue

            r_size, r_time = value
            if long(r_size) != st.st_size:
                # Size changed: record the new size and restart the
                # deletion delay from this run.
                recycle_db[filename] = (st.st_size, start_time)
            elif r_time + r_deletion_delay < start_time:
                if dry_run:
                    logging.info(
                        "dry-run: delete '%s' from recycle", filename)
                    logging.info("drop '%s' from recycle db", filename)
                    continue

                try:
                    os.unlink(recycle_file)
                except OSError as e:
                    if e.errno not in (errno.ENOENT, errno.ESTALE):
                        logging.error(
                            "delete '%s' from recycle failed: %s",
                            filename, e)
                else:
                    # Only drop the db entry once the unlink succeeded.
                    logging.debug("delete '%s' from recycle", filename)
                    try:
                        del recycle_db[filename]
                    except KeyError:
                        pass
                    else:
                        logging.debug(
                            "drop '%s' from recycle db", filename)

        # Existing files were popped from recycle_db_cache,
        # so any remaining entries are for files that no
        # longer exist.
        for filename in recycle_db_cache:
            try:
                del recycle_db[filename]
            except KeyError:
                pass
            else:
                logging.debug(
                    "drop non-existent '%s' from recycle db", filename)

    def _scheduled_deletion_log(self):
        """Write the scheduled-deletions log.

        Entries of ``config.deletion_db`` are grouped by the UTC date
        (``YYYY-MM-DD``) on which they become due, i.e. timestamp plus
        ``options.deletion_delay``, clamped so it is never earlier than
        the start time. Each date line is followed by one
        tab-indented ``filename<TAB>cpv`` line per file. In dry-run
        mode nothing is written; per-date counts are logged instead.
        """
        start_time = self._config.start_time
        dry_run = self._config.options.dry_run
        deletion_delay = self._config.options.deletion_delay
        distfiles_db = self._config.distfiles_db

        date_map = {}
        for filename, timestamp in self._config.deletion_db.items():
            date = timestamp + deletion_delay
            if date < start_time:
                date = start_time
            date = time.strftime("%Y-%m-%d", time.gmtime(date))
            date_map.setdefault(date, []).append(filename)

        if dry_run:
            # logging.warn is a deprecated alias that was removed in
            # Python 3.13; logging.warning works on all versions.
            logging.warning("dry-run: scheduled-deletions log "
                "will be summarized via logging.info")

        lines = []
        for date in sorted(date_map):
            date_files = date_map[date]
            if dry_run:
                logging.info(
                    "dry-run: scheduled deletions for %s: %s files",
                    date, len(date_files))
            lines.append("%s\n" % date)
            for filename in date_files:
                cpv = "unknown"
                if distfiles_db is not None:
                    cpv = distfiles_db.get(filename, cpv)
                lines.append("\t%s\t%s\n" % (filename, cpv))

        if not dry_run:
            portage.util.write_atomic(
                self._config.options.scheduled_deletion_log,
                "".join(lines))

    def _summary(self):
        """Log elapsed time and the counters collected in the config."""
        config = self._config
        elapsed_time = time.time() - config.start_time
        fail_count = len(config.file_failures)
        delete_count = config.delete_count
        # Files already deleted in this run are not counted as
        # still scheduled.
        scheduled_deletion_count = \
            config.scheduled_deletion_count - delete_count
        added_file_count = config.added_file_count
        added_byte_count = config.added_byte_count

        logging.info("finished in %i seconds", elapsed_time)
        logging.info("failed to fetch %i files", fail_count)
        logging.info("deleted %i files", delete_count)
        logging.info("deletion of %i files scheduled",
            scheduled_deletion_count)
        logging.info("added %i files", added_file_count)
        logging.info("added %i bytes total", added_byte_count)

    def terminate(self):
        """Request termination.

        Only sets a flag; the actual cancellation is performed by
        ``_termination_check`` when the scheduler next calls it.
        """
        self._terminated.set()

    def _termination_check(self):
        """Idle callback: cancel the task once termination was requested.

        Always returns True; the callback is unregistered explicitly
        in ``_wait()``.
        """
        if self._terminated.is_set():
            self.cancel()
            self.wait()
        return True

    def _wait(self):
        """Wait for completion, then unregister the termination poll."""
        CompositeTask._wait(self)
        if self._term_check_id is not None:
            self.scheduler.source_remove(self._term_check_id)
            self._term_check_id = None