# Copyright 2013-2014 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

import errno
import logging
import sys
import time

try:
    import threading
except ImportError:
    import dummy_threading as threading

import portage
from portage import os
from portage.util._async.TaskScheduler import TaskScheduler
from _emerge.CompositeTask import CompositeTask
from .FetchIterator import FetchIterator
from .DeletionIterator import DeletionIterator

if sys.hexversion >= 0x3000000:
    # pylint: disable=W0622
    long = int
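
# MirrorDistTask drives one emirrordist run as a composite task: it
# fetches distfiles, optionally deletes expired ones, maintains the
# recycle db, writes the scheduled-deletion log, and logs a summary.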
class MirrorDistTask(CompositeTask):

    __slots__ = ('_config', '_terminated', '_term_check_id')

    def __init__(self, config):
        CompositeTask.__init__(self, scheduler=config.event_loop)
        self._config = config
        self._terminated = threading.Event()
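
    # Register the termination check as an idle callback so that
    # terminate() requests from other threads are honored, then start
    # the fetch phase on a TaskScheduler bound to the shared event loop.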
    def _start(self):
        self._term_check_id = self.scheduler.idle_add(self._termination_check)
        fetch = TaskScheduler(iter(FetchIterator(self._config)),
            max_jobs=self._config.options.jobs,
            max_load=self._config.options.load_average,
            event_loop=self._config.event_loop)
        self._start_task(fetch, self._fetch_exit)
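
    # When the fetch phase exits, either start the deletion phase
    # (if --delete was given) or skip straight to the post-deletion
    # bookkeeping.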
    def _fetch_exit(self, fetch):
        self._assert_current(fetch)
        if self._was_cancelled():
            self.wait()
            return

        if self._config.options.delete:
            deletion = TaskScheduler(iter(DeletionIterator(self._config)),
                max_jobs=self._config.options.jobs,
                max_load=self._config.options.load_average,
                event_loop=self._config.event_loop)
            self._start_task(deletion, self._deletion_exit)
            return

        self._post_deletion()
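
    # Deletion phase finished; proceed to post-deletion bookkeeping
    # unless the run was cancelled.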
    def _deletion_exit(self, deletion):
        self._assert_current(deletion)
        if self._was_cancelled():
            self.wait()
            return

        self._post_deletion()
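
    # Final bookkeeping after the fetch/deletion phases: update the
    # recycle db and the scheduled-deletion log when those features are
    # enabled, log the summary, and mark the composite task finished.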
    def _post_deletion(self):

        if self._config.options.recycle_db is not None:
            self._update_recycle_db()

        if self._config.options.scheduled_deletion_log is not None:
            self._scheduled_deletion_log()

        self._summary()

        self.returncode = os.EX_OK
        self._current_task = None
        self.wait()
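
    # Synchronize the recycle db with the contents of the recycle
    # directory: new files are added to the db, resized files get a
    # fresh timestamp, files older than recycle_deletion_delay are
    # unlinked, and db entries for vanished files are dropped.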
    def _update_recycle_db(self):

        start_time = self._config.start_time
        recycle_dir = self._config.options.recycle_dir
        recycle_db = self._config.recycle_db
        r_deletion_delay = self._config.options.recycle_deletion_delay

        # Use a dict to optimize access.
        recycle_db_cache = dict(recycle_db.items())

        for filename in os.listdir(recycle_dir):

            recycle_file = os.path.join(recycle_dir, filename)

            try:
                st = os.stat(recycle_file)
            except OSError as e:
                if e.errno not in (errno.ENOENT, errno.ESTALE):
                    logging.error(("stat failed for '%s' in "
                        "recycle: %s") % (filename, e))
                continue

            value = recycle_db_cache.pop(filename, None)
            if value is None:
                logging.debug(("add '%s' to "
                    "recycle db") % filename)
                recycle_db[filename] = (st.st_size, start_time)
            else:
                r_size, r_time = value
                if long(r_size) != st.st_size:
                    recycle_db[filename] = (st.st_size, start_time)
                elif r_time + r_deletion_delay < start_time:
                    if self._config.options.dry_run:
                        logging.info(("dry-run: delete '%s' from "
                            "recycle") % filename)
                        logging.info(("drop '%s' from "
                            "recycle db") % filename)
                    else:
                        try:
                            os.unlink(recycle_file)
                        except OSError as e:
                            if e.errno not in (errno.ENOENT, errno.ESTALE):
                                logging.error(("delete '%s' from "
                                    "recycle failed: %s") % (filename, e))
                        else:
                            logging.debug(("delete '%s' from "
                                "recycle") % filename)
                            try:
                                del recycle_db[filename]
                            except KeyError:
                                pass
                            else:
                                logging.debug(("drop '%s' from "
                                    "recycle db") % filename)

        # Existing files were popped from recycle_db_cache,
        # so any remaining entries are for files that no
        # longer exist.
        for filename in recycle_db_cache:
            try:
                del recycle_db[filename]
            except KeyError:
                pass
            else:
                logging.debug(("drop non-existent '%s' from "
                    "recycle db") % filename)
    def _scheduled_deletion_log(self):

        start_time = self._config.start_time
        dry_run = self._config.options.dry_run
        deletion_delay = self._config.options.deletion_delay
        distfiles_db = self._config.distfiles_db

        date_map = {}
        for filename, timestamp in self._config.deletion_db.items():
            date = timestamp + deletion_delay
            if date < start_time:
                date = start_time
            date = time.strftime("%Y-%m-%d", time.gmtime(date))
            date_files = date_map.get(date)
            if date_files is None:
                date_files = []
                date_map[date] = date_files
            date_files.append(filename)

        if dry_run:
            logging.warning("dry-run: scheduled-deletions log "
                "will be summarized via logging.info")

        lines = []
        for date in sorted(date_map):
            date_files = date_map[date]
            if dry_run:
                logging.info("dry-run: scheduled deletions for %s: %s files" %
                    (date, len(date_files)))
            lines.append("%s\n" % date)
            for filename in date_files:
                cpv = "unknown"
                if distfiles_db is not None:
                    cpv = distfiles_db.get(filename, cpv)
                lines.append("\t%s\t%s\n" % (filename, cpv))

        if not dry_run:
            portage.util.write_atomic(
                self._config.options.scheduled_deletion_log,
                "".join(lines))
    def _summary(self):
        elapsed_time = time.time() - self._config.start_time
        fail_count = len(self._config.file_failures)
        delete_count = self._config.delete_count
        scheduled_deletion_count = self._config.scheduled_deletion_count - delete_count
        added_file_count = self._config.added_file_count
        added_byte_count = self._config.added_byte_count

        logging.info("finished in %i seconds" % elapsed_time)
        logging.info("failed to fetch %i files" % fail_count)
        logging.info("deleted %i files" % delete_count)
        logging.info("deletion of %i files scheduled" %
            scheduled_deletion_count)
        logging.info("added %i files" % added_file_count)
        logging.info("added %i bytes total" % added_byte_count)
    def terminate(self):
        self._terminated.set()

    def _termination_check(self):
        if self._terminated.is_set():
            self.cancel()
            self.wait()
        return True

    def _wait(self):
        CompositeTask._wait(self)

        if self._term_check_id is not None:
            self.scheduler.source_remove(self._term_check_id)
            self._term_check_id = None
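
# A minimal usage sketch (hypothetical driver code, not part of this
# module; the real entry point lives in portage/_emirrordist/main.py,
# and "config" here stands for the _emirrordist Config object built
# there, which supplies options, the databases, and the event loop):
#
#     task = MirrorDistTask(config)
#     task.start()
#     returncode = task.wait()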