blob: 8bb8f5fdadcd4413c249a0c85765dd604198a4d4 [file] [log] [blame]
# Copyright 1999-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import sys
import portage
from portage import os
from portage import digraph
from portage._sets.base import InternalPackageSet
from _emerge.BlockerCache import BlockerCache
from _emerge.Package import Package
from _emerge.show_invalid_depstring_notice import show_invalid_depstring_notice
if sys.hexversion >= 0x3000000:
long = int
class BlockerDB(object):
def __init__(self, fake_vartree):
root_config = fake_vartree._root_config
self._root_config = root_config
self._vartree = root_config.trees["vartree"]
self._portdb = root_config.trees["porttree"].dbapi
self._dep_check_trees = None
self._fake_vartree = fake_vartree
self._dep_check_trees = {
self._vartree.settings["EROOT"] : {
"porttree" : fake_vartree,
"vartree" : fake_vartree,
}}
def findInstalledBlockers(self, new_pkg):
"""
Search for installed run-time blockers in the root where
new_pkg is planned to be installed. This ignores build-time
blockers, since new_pkg is assumed to be built already.
"""
blocker_cache = BlockerCache(None,
self._vartree.dbapi)
dep_keys = Package._runtime_keys
settings = self._vartree.settings
stale_cache = set(blocker_cache)
fake_vartree = self._fake_vartree
dep_check_trees = self._dep_check_trees
vardb = fake_vartree.dbapi
installed_pkgs = list(vardb)
for inst_pkg in installed_pkgs:
stale_cache.discard(inst_pkg.cpv)
cached_blockers = blocker_cache.get(inst_pkg.cpv)
if cached_blockers is not None and \
cached_blockers.counter != inst_pkg.counter:
cached_blockers = None
if cached_blockers is not None:
blocker_atoms = cached_blockers.atoms
else:
# Use aux_get() to trigger FakeVartree global
# updates on *DEPEND when appropriate.
depstr = " ".join(vardb.aux_get(inst_pkg.cpv, dep_keys))
success, atoms = portage.dep_check(depstr,
vardb, settings, myuse=inst_pkg.use.enabled,
trees=dep_check_trees, myroot=inst_pkg.root)
if not success:
pkg_location = os.path.join(inst_pkg.root,
portage.VDB_PATH, inst_pkg.category, inst_pkg.pf)
portage.writemsg("!!! %s/*DEPEND: %s\n" % \
(pkg_location, atoms), noiselevel=-1)
continue
blocker_atoms = [atom for atom in atoms \
if atom.startswith("!")]
blocker_atoms.sort()
blocker_cache[inst_pkg.cpv] = \
blocker_cache.BlockerData(inst_pkg.counter, blocker_atoms)
for cpv in stale_cache:
del blocker_cache[cpv]
blocker_cache.flush()
blocker_parents = digraph()
blocker_atoms = []
for pkg in installed_pkgs:
for blocker_atom in blocker_cache[pkg.cpv].atoms:
blocker_atom = blocker_atom.lstrip("!")
blocker_atoms.append(blocker_atom)
blocker_parents.add(blocker_atom, pkg)
blocker_atoms = InternalPackageSet(initial_atoms=blocker_atoms)
blocking_pkgs = set()
for atom in blocker_atoms.iterAtomsForPackage(new_pkg):
blocking_pkgs.update(blocker_parents.parent_nodes(atom))
# Check for blockers in the other direction.
depstr = " ".join(new_pkg._metadata[k] for k in dep_keys)
success, atoms = portage.dep_check(depstr,
vardb, settings, myuse=new_pkg.use.enabled,
trees=dep_check_trees, myroot=new_pkg.root)
if not success:
# We should never get this far with invalid deps.
show_invalid_depstring_notice(new_pkg, depstr, atoms)
assert False
blocker_atoms = [atom.lstrip("!") for atom in atoms \
if atom[:1] == "!"]
if blocker_atoms:
blocker_atoms = InternalPackageSet(initial_atoms=blocker_atoms)
for inst_pkg in installed_pkgs:
try:
next(blocker_atoms.iterAtomsForPackage(inst_pkg))
except (portage.exception.InvalidDependString, StopIteration):
continue
blocking_pkgs.add(inst_pkg)
return blocking_pkgs
def discardBlocker(self, pkg):
"""Discard a package from the list of potential blockers.
This will match any package(s) with identical cpv or cp:slot."""
for cpv_match in self._fake_vartree.dbapi.match_pkgs("=%s" % (pkg.cpv,)):
if cpv_match.cp == pkg.cp:
self._fake_vartree.cpv_discard(cpv_match)
for slot_match in self._fake_vartree.dbapi.match_pkgs(pkg.slot_atom):
if slot_match.cp == pkg.cp:
self._fake_vartree.cpv_discard(slot_match)