# blob: 743160c42ae5ae50f4e258877d6b5782df388fc1 [file] [log] [blame]
# Copyright 2010-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
__all__ = (
'UseManager',
)
from _emerge.Package import Package
from portage import os
from portage.dep import Atom, dep_getrepo, dep_getslot, ExtendedAtomDict, remove_slot, _get_useflag_re, _repo_separator
from portage.eapi import eapi_has_use_aliases, eapi_supports_stable_use_forcing_and_masking
from portage.exception import InvalidAtom
from portage.localization import _
from portage.util import grabfile, grabdict, grabdict_package, read_corresponding_eapi_file, stack_lists, writemsg
from portage.versions import _pkg_str
from portage.package.ebuild._config.helper import ordered_by_atom_specificity
class UseManager(object):
def __init__(self, repositories, profiles, abs_user_config, is_stable,
        user_config=True):
    """
    Parse every USE-related file once and cache the results on the instance.

    @param repositories: repository collection; handed to the repository
        parsers and kept as self.repositories
    @param profiles: iterable of profile objects handed to the profile
        parsers
    @param abs_user_config: directory that is joined with "package.use"
        to locate the user's configuration
    @param is_stable: callable kept as self._is_stable; _isStable()
        invokes it with a package
    @param user_config: kept as self._user_config; when false the user's
        package.use file is not read
    """
    # file                          attribute
    # --------------------------------------------------------------
    # repositories
    # --------------------------------------------------------------
    # use.mask                      _repo_usemask_dict
    # use.stable.mask               _repo_usestablemask_dict
    # use.force                     _repo_useforce_dict
    # use.stable.force              _repo_usestableforce_dict
    # use.aliases                   _repo_usealiases_dict
    # package.use.mask              _repo_pusemask_dict
    # package.use.stable.mask       _repo_pusestablemask_dict
    # package.use.force             _repo_puseforce_dict
    # package.use.stable.force      _repo_pusestableforce_dict
    # package.use                   _repo_puse_dict
    # package.use.aliases           _repo_pusealiases_dict
    # --------------------------------------------------------------
    # profiles
    # --------------------------------------------------------------
    # use.mask                      _usemask_list
    # use.stable.mask               _usestablemask_list
    # use.force                     _useforce_list
    # use.stable.force              _usestableforce_list
    # package.use.mask              _pusemask_list
    # package.use.stable.mask       _pusestablemask_list
    # package.use                   _pkgprofileuse
    # package.use.force             _puseforce_list
    # package.use.stable.force      _pusestableforce_list
    # --------------------------------------------------------------
    # user config
    # --------------------------------------------------------------
    # package.use                   _pusedict
    #
    # Dynamic variables tracked by the config class:
    #   profiles:    usemask, useforce
    #   user config: puse

    self._user_config = user_config
    self._is_stable = is_stable

    # Short local aliases; the parse order below is kept exactly as it
    # was because each parser may emit diagnostics while it runs.
    stable_capable = eapi_supports_stable_use_forcing_and_masking
    repo_tuples = self._parse_repository_files_to_dict_of_tuples
    repo_dicts = self._parse_repository_files_to_dict_of_dicts
    profile_tuples = self._parse_profile_files_to_tuple_of_tuples
    profile_dicts = self._parse_profile_files_to_tuple_of_dicts

    # Per-repository flat flag lists.
    self._repo_usemask_dict = repo_tuples("use.mask", repositories)
    self._repo_usestablemask_dict = repo_tuples(
        "use.stable.mask", repositories, eapi_filter=stable_capable)
    self._repo_useforce_dict = repo_tuples("use.force", repositories)
    self._repo_usestableforce_dict = repo_tuples(
        "use.stable.force", repositories, eapi_filter=stable_capable)

    # Per-repository, per-package flag tables.
    self._repo_pusemask_dict = repo_dicts("package.use.mask", repositories)
    self._repo_pusestablemask_dict = repo_dicts(
        "package.use.stable.mask", repositories,
        eapi_filter=stable_capable)
    self._repo_puseforce_dict = repo_dicts("package.use.force", repositories)
    self._repo_pusestableforce_dict = repo_dicts(
        "package.use.stable.force", repositories,
        eapi_filter=stable_capable)
    self._repo_puse_dict = repo_dicts("package.use", repositories)

    # Per-profile flat flag lists.
    self._usemask_list = profile_tuples("use.mask", profiles)
    self._usestablemask_list = profile_tuples(
        "use.stable.mask", profiles, eapi_filter=stable_capable)
    self._useforce_list = profile_tuples("use.force", profiles)
    self._usestableforce_list = profile_tuples(
        "use.stable.force", profiles, eapi_filter=stable_capable)

    # Per-profile, per-package flag tables.
    self._pusemask_list = profile_dicts("package.use.mask", profiles)
    self._pusestablemask_list = profile_dicts(
        "package.use.stable.mask", profiles, eapi_filter=stable_capable)
    self._pkgprofileuse = profile_dicts(
        "package.use", profiles, juststrings=True)
    self._puseforce_list = profile_dicts("package.use.force", profiles)
    self._pusestableforce_list = profile_dicts(
        "package.use.stable.force", profiles, eapi_filter=stable_capable)

    # User configuration.
    self._pusedict = self._parse_user_files_to_extatomdict(
        "package.use", abs_user_config, user_config)

    # USE flag aliases declared by repositories.
    self._repo_usealiases_dict = self._parse_repository_usealiases(
        repositories)
    self._repo_pusealiases_dict = self._parse_repository_packageusealiases(
        repositories)

    self.repositories = repositories
def _parse_file_to_tuple(self, file_name, recursive=True, eapi_filter=None):
    """
    Read a flat list of (optionally "-"-prefixed) USE flags from a file.

    @param file_name: path handed to grabfile()
    @param recursive: forwarded to grabfile()
    @param eapi_filter: optional predicate called with the EAPI read for
        file_name; when it returns a false value the file is ignored
    @return: tuple of the entries whose flag name matches the USE flag
        pattern for the file's EAPI, prefix preserved; () when the file is
        rejected by eapi_filter
    """
    entries = grabfile(file_name, recursive=recursive)
    eapi = read_corresponding_eapi_file(file_name)

    if eapi_filter is not None and not eapi_filter(eapi):
        # Only complain when the unsupported file actually has content.
        if entries:
            writemsg(_("--- EAPI '%s' does not support '%s': '%s'\n") %
                (eapi, os.path.basename(file_name), file_name),
                noiselevel=-1)
        return ()

    flag_pattern = _get_useflag_re(eapi)
    accepted = []
    for token in entries:
        # Validate the name without the leading "-" but keep the original
        # token in the result.
        bare_flag = token[1:] if token[:1] == "-" else token
        if flag_pattern.match(bare_flag) is not None:
            accepted.append(token)
            continue
        writemsg(_("--- Invalid USE flag in '%s': '%s'\n") %
            (file_name, token), noiselevel=-1)
    return tuple(accepted)
def _parse_file_to_dict(self, file_name, juststrings=False, recursive=True,
        eapi_filter=None, user_config=False):
    """
    Read a per-package USE file into a {cp: {atom: flags}} mapping.

    @param file_name: path handed to grabdict_package()
    @param juststrings: when true each flags value is a space-joined
        string, otherwise a tuple
    @param recursive: forwarded to grabdict_package()
    @param eapi_filter: optional predicate called with the file's EAPI;
        when it returns a false value an empty mapping is returned
    @param user_config: when true and no EAPI is found for the file, the
        EAPI stays None, which enables wildcard and repo atoms and makes
        the result an ExtendedAtomDict; otherwise a missing EAPI is
        treated as "0"
    @return: plain dict, or ExtendedAtomDict(dict) when the EAPI is None
    """
    eapi = read_corresponding_eapi_file(file_name, default=None)
    if eapi is None and not user_config:
        eapi = "0"
    has_eapi = eapi is not None

    result = {} if has_eapi else ExtendedAtomDict(dict)

    parsed = grabdict_package(file_name, recursive=recursive,
        allow_wildcard=not has_eapi, allow_repo=not has_eapi,
        verify_eapi=has_eapi)

    if has_eapi and eapi_filter is not None and not eapi_filter(eapi):
        # Only complain when the unsupported file actually has content.
        if parsed:
            writemsg(_("--- EAPI '%s' does not support '%s': '%s'\n") %
                (eapi, os.path.basename(file_name), file_name),
                noiselevel=-1)
        return result

    flag_pattern = _get_useflag_re(eapi)
    flags_by_atom = {}
    for atom, tokens in parsed.items():
        kept = []
        for token in tokens:
            # Validate the bare name; store the token with its prefix.
            bare_flag = token[1:] if token[:1] == "-" else token
            if flag_pattern.match(bare_flag) is None:
                writemsg(_("--- Invalid USE flag for '%s' in '%s': '%s'\n") %
                    (atom, file_name, token), noiselevel=-1)
            else:
                kept.append(token)
        flags_by_atom.setdefault(atom, []).extend(kept)

    for atom, kept in flags_by_atom.items():
        value = " ".join(kept) if juststrings else tuple(kept)
        result.setdefault(atom.cp, {})[atom] = value
    return result
def _parse_user_files_to_extatomdict(self, file_name, location, user_config):
    """
    Load the user's per-package USE settings.

    @param file_name: file name joined onto location
    @param location: directory containing the user configuration
    @param user_config: when false nothing is read
    @return: ExtendedAtomDict(dict) mapping cp -> {atom: tuple of flags};
        empty when user_config is false
    """
    result = ExtendedAtomDict(dict)
    if not user_config:
        return result

    path = os.path.join(location, file_name)
    # Wildcard and repo atoms are accepted here and the EAPI is not verified.
    entries = grabdict_package(path, recursive=1, allow_wildcard=True,
        allow_repo=True, verify_eapi=False)
    for atom, flags in entries.items():
        result.setdefault(atom.cp, {})[atom] = tuple(flags)
    return result
def _parse_repository_files_to_dict_of_tuples(self, file_name, repositories, eapi_filter=None):
    """
    Parse <repo.location>/profiles/<file_name> for every repository
    returned by repositories.repos_with_profiles().

    @return: dict mapping repo.name to the result of
        _parse_file_to_tuple() for that repository's file
    """
    return dict(
        (repo.name, self._parse_file_to_tuple(
            os.path.join(repo.location, "profiles", file_name),
            eapi_filter=eapi_filter))
        for repo in repositories.repos_with_profiles())
def _parse_repository_files_to_dict_of_dicts(self, file_name, repositories, eapi_filter=None):
    """
    Parse <repo.location>/profiles/<file_name> for every repository
    returned by repositories.repos_with_profiles().

    @return: dict mapping repo.name to the result of
        _parse_file_to_dict() for that repository's file
    """
    return dict(
        (repo.name, self._parse_file_to_dict(
            os.path.join(repo.location, "profiles", file_name),
            eapi_filter=eapi_filter))
        for repo in repositories.repos_with_profiles())
def _parse_profile_files_to_tuple_of_tuples(self, file_name, locations,
        eapi_filter=None):
    """
    Parse <profile.location>/<file_name> for each profile in locations.

    Each profile's portage1_directories attribute is forwarded as the
    "recursive" argument of _parse_file_to_tuple().

    @return: tuple with one _parse_file_to_tuple() result per profile, in
        iteration order
    """
    parsed = []
    for profile in locations:
        path = os.path.join(profile.location, file_name)
        parsed.append(self._parse_file_to_tuple(path,
            recursive=profile.portage1_directories,
            eapi_filter=eapi_filter))
    return tuple(parsed)
def _parse_profile_files_to_tuple_of_dicts(self, file_name, locations,
        juststrings=False, eapi_filter=None):
    """
    Parse <profile.location>/<file_name> for each profile in locations.

    Each profile's portage1_directories and user_config attributes are
    forwarded as the "recursive" and "user_config" arguments of
    _parse_file_to_dict().

    @return: tuple with one _parse_file_to_dict() result per profile, in
        iteration order
    """
    parsed = []
    for profile in locations:
        path = os.path.join(profile.location, file_name)
        parsed.append(self._parse_file_to_dict(path, juststrings,
            recursive=profile.portage1_directories,
            eapi_filter=eapi_filter,
            user_config=profile.user_config))
    return tuple(parsed)
def _parse_repository_usealiases(self, repositories):
    """
    Read <repo.location>/profiles/use.aliases for every repository
    returned by repositories.repos_with_profiles().

    Entries with an invalid real flag, an invalid alias, or an alias that
    was already accepted for a different real flag in the same repository
    are reported with writemsg() and skipped.

    @return: dict mapping repo.name -> {real_flag: [alias, ...]}
    """
    result = {}
    for repo in repositories.repos_with_profiles():
        file_name = os.path.join(repo.location, "profiles", "use.aliases")
        eapi = read_corresponding_eapi_file(file_name)
        flag_pattern = _get_useflag_re(eapi)
        raw_entries = grabdict(file_name, recursive=True)

        accepted = {}
        for real_flag, aliases in raw_entries.items():
            if flag_pattern.match(real_flag) is None:
                writemsg(_("--- Invalid real USE flag in '%s': '%s'\n") % (file_name, real_flag), noiselevel=-1)
                continue
            for alias in aliases:
                if flag_pattern.match(alias) is None:
                    writemsg(_("--- Invalid USE flag alias for '%s' real USE flag in '%s': '%s'\n") %
                        (real_flag, file_name, alias), noiselevel=-1)
                    continue
                # An alias may belong to only one real flag per repository.
                claimed_elsewhere = any(alias in others
                    for flag, others in accepted.items()
                    if flag != real_flag)
                if claimed_elsewhere:
                    writemsg(_("--- Duplicated USE flag alias in '%s': '%s'\n") %
                        (file_name, alias), noiselevel=-1)
                    continue
                accepted.setdefault(real_flag, []).append(alias)
        result[repo.name] = accepted
    return result
def _parse_repository_packageusealiases(self, repositories):
    """
    Read <repo.location>/profiles/package.use.aliases for every
    repository returned by repositories.repos_with_profiles().

    Each line is split on whitespace into: atom, real USE flag, aliases.
    Lines with an invalid atom, a missing or invalid real flag, an
    invalid alias, or an alias already accepted for a different real flag
    of the same atom are reported with writemsg() and skipped.

    @return: dict mapping
        repo.name -> {cp: {atom: {real_flag: [alias, ...]}}}
    """
    ret = {}
    for repo in repositories.repos_with_profiles():
        file_name = os.path.join(repo.location, "profiles", "package.use.aliases")
        eapi = read_corresponding_eapi_file(file_name)
        useflag_re = _get_useflag_re(eapi)
        lines = grabfile(file_name, recursive=True)
        file_dict = {}
        for line in lines:
            elements = line.split()
            atom = elements[0]
            try:
                atom = Atom(atom, eapi=eapi)
            except InvalidAtom:
                # Emitted at noiselevel=-1 like every other diagnostic in
                # this class; previously this one message used the
                # default level and was filtered differently.
                writemsg(_("--- Invalid atom in '%s': '%s'\n") % (file_name, atom), noiselevel=-1)
                continue
            if len(elements) == 1:
                writemsg(_("--- Missing real USE flag for '%s' in '%s'\n") % (atom, file_name), noiselevel=-1)
                continue
            real_flag = elements[1]
            if useflag_re.match(real_flag) is None:
                writemsg(_("--- Invalid real USE flag for '%s' in '%s': '%s'\n") % (atom, file_name, real_flag), noiselevel=-1)
                continue
            for alias in elements[2:]:
                if useflag_re.match(alias) is None:
                    writemsg(_("--- Invalid USE flag alias for '%s' real USE flag for '%s' in '%s': '%s'\n") %
                        (real_flag, atom, file_name, alias), noiselevel=-1)
                    continue
                # Duplicated USE flag aliases in entries for different atoms
                # matching the same package version are detected in getUseAliases().
                # The lookup uses .get() so that atoms without any valid
                # alias do not leave empty entries behind in file_dict.
                known = file_dict.get(atom.cp, {}).get(atom, {})
                if any(alias in v for k, v in known.items() if k != real_flag):
                    writemsg(_("--- Duplicated USE flag alias for '%s' in '%s': '%s'\n") %
                        (atom, file_name, alias), noiselevel=-1)
                else:
                    file_dict.setdefault(atom.cp, {}).setdefault(atom, {}).setdefault(real_flag, []).append(alias)
        ret[repo.name] = file_dict
    return ret
def _isStable(self, pkg):
    """
    Tell whether the stable-only mask/force files should apply to pkg.

    With user config enabled the answer is pkg.stable, or False when pkg
    has no such attribute.  Otherwise the self._is_stable callable decides,
    but only for packages that expose _metadata; without it the result is
    False.
    """
    if self._user_config:
        # A missing attribute means KEYWORDS is unavailable (prior to
        # the "depend" phase).
        return getattr(pkg, "stable", False)

    absent = object()
    if getattr(pkg, "_metadata", absent) is absent:
        # KEYWORDS is unavailable (prior to "depend" phase)
        return False

    # Since repoman uses different config instances for
    # different profiles, we have to be careful to do the
    # stable check against the correct profile here.
    return self._is_stable(pkg)
def getUseMask(self, pkg=None):
    """
    Return the frozenset of masked USE flags.

    @param pkg: None for the profile-wide mask only.  Otherwise an object
        with a "cp" attribute, or a value from which slot and repo can be
        extracted by the dep helpers (presumably an atom-like string --
        confirm against callers).
    @return: frozenset produced by stacking, incrementally and in order:
        for each master repository and then the package's own repository
        its use.mask, use.stable.mask (stable packages only) and matching
        package.use.mask / package.use.stable.mask entries; then the same
        four sources for every profile.
    """
    if pkg is None:
        return frozenset(stack_lists(self._usemask_list, incremental=True))

    cp = getattr(pkg, "cp", None)
    if cp is None:
        slot_part = dep_getslot(pkg)
        repo_part = dep_getrepo(pkg)
        pkg = _pkg_str(remove_slot(pkg), slot=slot_part, repo=repo_part)
        cp = pkg.cp

    stable = self._isStable(pkg)
    layers = []

    def add_matching(per_atom):
        # Append whatever ordered_by_atom_specificity() selects for pkg
        # from an {atom: flags} mapping; empty or missing mappings are
        # ignored.
        if per_atom:
            matched = ordered_by_atom_specificity(per_atom, pkg)
            if matched:
                layers.extend(matched)

    if hasattr(pkg, "repo") and pkg.repo != Package.UNKNOWN_REPO:
        repo_names = []
        try:
            repo_names.extend(master.name for master in
                self.repositories[pkg.repo].masters)
        except KeyError:
            # No masters available for this repo name; use it alone.
            pass
        repo_names.append(pkg.repo)

        for repo_name in repo_names:
            layers.append(self._repo_usemask_dict.get(repo_name, {}))
            if stable:
                layers.append(
                    self._repo_usestablemask_dict.get(repo_name, {}))
            add_matching(
                self._repo_pusemask_dict.get(repo_name, {}).get(cp))
            if stable:
                add_matching(
                    self._repo_pusestablemask_dict.get(repo_name, {}).get(cp))

    # The profile lists are parallel: index i refers to the same profile
    # in each of them.
    for i, profile_pusemask in enumerate(self._pusemask_list):
        profile_mask = self._usemask_list[i]
        if profile_mask:
            layers.append(profile_mask)
        if stable:
            profile_stable_mask = self._usestablemask_list[i]
            if profile_stable_mask:
                layers.append(profile_stable_mask)
        add_matching(profile_pusemask.get(cp))
        if stable:
            add_matching(self._pusestablemask_list[i].get(cp))

    return frozenset(stack_lists(layers, incremental=True))
def getUseForce(self, pkg=None):
    """
    Return the frozenset of forced USE flags.

    @param pkg: None for the profile-wide forced flags only.  Otherwise an
        object with a "cp" attribute, or a value from which slot and repo
        can be extracted by the dep helpers (presumably an atom-like
        string -- confirm against callers).
    @return: frozenset produced by stacking, incrementally and in order:
        for each master repository and then the package's own repository
        its use.force, use.stable.force (stable packages only) and
        matching package.use.force / package.use.stable.force entries;
        then the same four sources for every profile.
    """
    if pkg is None:
        return frozenset(stack_lists(self._useforce_list, incremental=True))

    cp = getattr(pkg, "cp", None)
    if cp is None:
        slot_part = dep_getslot(pkg)
        repo_part = dep_getrepo(pkg)
        pkg = _pkg_str(remove_slot(pkg), slot=slot_part, repo=repo_part)
        cp = pkg.cp

    stable = self._isStable(pkg)
    layers = []

    def add_matching(per_atom):
        # Append whatever ordered_by_atom_specificity() selects for pkg
        # from an {atom: flags} mapping; empty or missing mappings are
        # ignored.
        if per_atom:
            matched = ordered_by_atom_specificity(per_atom, pkg)
            if matched:
                layers.extend(matched)

    if hasattr(pkg, "repo") and pkg.repo != Package.UNKNOWN_REPO:
        repo_names = []
        try:
            repo_names.extend(master.name for master in
                self.repositories[pkg.repo].masters)
        except KeyError:
            # No masters available for this repo name; use it alone.
            pass
        repo_names.append(pkg.repo)

        for repo_name in repo_names:
            layers.append(self._repo_useforce_dict.get(repo_name, {}))
            if stable:
                layers.append(
                    self._repo_usestableforce_dict.get(repo_name, {}))
            add_matching(
                self._repo_puseforce_dict.get(repo_name, {}).get(cp))
            if stable:
                add_matching(
                    self._repo_pusestableforce_dict.get(repo_name, {}).get(cp))

    # The profile lists are parallel: index i refers to the same profile
    # in each of them.
    for i, profile_puseforce in enumerate(self._puseforce_list):
        profile_force = self._useforce_list[i]
        if profile_force:
            layers.append(profile_force)
        if stable:
            profile_stable_force = self._usestableforce_list[i]
            if profile_stable_force:
                layers.append(profile_stable_force)
        add_matching(profile_puseforce.get(cp))
        if stable:
            add_matching(self._pusestableforce_list[i].get(cp))

    return frozenset(stack_lists(layers, incremental=True))
def getUseAliases(self, pkg):
    """
    Collect the USE flag aliases that apply to pkg.

    Aliases come from use.aliases and the matching package.use.aliases
    entries of each master repository followed by the package's own
    repository.  An alias already recorded for a different real flag is
    reported with writemsg() and not added again.

    @param pkg: package object; its "eapi" attribute is read first, so it
        must provide one
    @return: dict mapping real_flag -> [alias, ...]; {} when the
        package's EAPI has no USE alias support or its repo is unknown
    """
    if not eapi_has_use_aliases(pkg.eapi):
        return {}

    cp = getattr(pkg, "cp", None)
    if cp is None:
        slot_part = dep_getslot(pkg)
        repo_part = dep_getrepo(pkg)
        pkg = _pkg_str(remove_slot(pkg), slot=slot_part, repo=repo_part)
        cp = pkg.cp

    merged = {}

    def merge(alias_map):
        # Fold one {real_flag: aliases} mapping into merged, refusing
        # aliases that are already bound to another real flag.
        for real_flag, aliases in alias_map.items():
            for alias in aliases:
                taken = any(alias in others
                    for flag, others in merged.items()
                    if flag != real_flag)
                if taken:
                    writemsg(_("--- Duplicated USE flag alias for '%s%s%s': '%s'\n") %
                        (pkg.cpv, _repo_separator, pkg.repo, alias), noiselevel=-1)
                else:
                    merged.setdefault(real_flag, []).append(alias)

    if hasattr(pkg, "repo") and pkg.repo != Package.UNKNOWN_REPO:
        repo_names = []
        try:
            repo_names.extend(master.name for master in
                self.repositories[pkg.repo].masters)
        except KeyError:
            # No masters available for this repo name; use it alone.
            pass
        repo_names.append(pkg.repo)

        for repo_name in repo_names:
            merge(self._repo_usealiases_dict.get(repo_name, {}))
            per_atom = self._repo_pusealiases_dict.get(repo_name, {}).get(cp)
            if per_atom:
                for alias_map in ordered_by_atom_specificity(per_atom, pkg):
                    merge(alias_map)

    return merged
def getPUSE(self, pkg):
    """
    Return the user's package.use flags for pkg as one string.

    @param pkg: object with a "cp" attribute, or a value from which slot
        and repo can be extracted by the dep helpers (presumably an
        atom-like string -- confirm against callers)
    @return: space-joined flags from every matching self._pusedict entry,
        in the order given by ordered_by_atom_specificity(); "" when
        nothing matches
    """
    cp = getattr(pkg, "cp", None)
    if cp is None:
        slot_part = dep_getslot(pkg)
        repo_part = dep_getrepo(pkg)
        pkg = _pkg_str(remove_slot(pkg), slot=slot_part, repo=repo_part)
        cp = pkg.cp

    per_atom = self._pusedict.get(cp)
    if not per_atom:
        return ""

    matches = ordered_by_atom_specificity(per_atom, pkg)
    if not matches:
        return ""

    flags = []
    for group in matches:
        flags.extend(group)
    return " ".join(flags)
def extract_global_USE_changes(self, old=""):
    """
    Remove the "*/*" entry from the user's package.use data and return
    its flags.

    The flags stored under self._pusedict["*/*"]["*/*"] are popped,
    joined with spaces and appended to old (separated by a space when old
    is non-empty).  If that leaves the "*/*" bucket empty, the bucket
    itself is deleted from self._pusedict.

    @param old: existing flag string to extend
    @return: the combined string, or old unchanged when there is no
        "*/*" entry
    """
    wildcard = "*/*"
    bucket = self._pusedict.get(wildcard)
    if bucket is None:
        return old

    flags = bucket.pop(wildcard, None)
    if flags is None:
        return old

    combined = " ".join(flags)
    if old:
        combined = old + " " + combined
    if not bucket:
        # No atoms left under the wildcard key, remove it.
        del self._pusedict[wildcard]
    return combined