# blob: 8550b0487661e11e1d8ddd296d8a395ba15c1c68 [file] [log] [blame]
# Copyright 1999-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import difflib
import re
import portage
from portage import os
from portage.dbapi.porttree import _parse_uri_map
from portage.dbapi.IndexedPortdb import IndexedPortdb
from portage.dbapi.IndexedVardb import IndexedVardb
from portage.localization import localized_size
from portage.output import bold, darkgreen, green, red
from portage.util import writemsg_stdout
from portage.util.iterators.MultiIterGroupBy import MultiIterGroupBy
from _emerge.Package import Package
class search:
    """Search package databases by name or description and print the results.

    An instance queries, together, the databases taken from
    ``root_config.trees`` ("porttree", "bintree" and "vartree").  Typical
    use is ``execute(key)`` followed by ``output()``, or one or more
    ``addCP(cp)`` calls followed by ``output()``.
    """

    #
    # class constants
    #
    # Accepted values for the ``detail`` argument of getVersion().
    VERSION_SHORT = 1
    VERSION_RELEASE = 2

    #
    # public interface
    #
    def __init__(
        self,
        root_config,
        spinner,
        searchdesc,
        verbose,
        usepkg,
        usepkgonly,
        search_index=True,
        search_similarity=None,
        fuzzy=True,
        regex_auto=False,
    ):
        """Searches the available and installed packages for the supplied search key.
        The list of available and installed packages is created at object instantiation.
        This makes successive searches faster.

        :param root_config: provides ``settings``, ``setconfig`` and ``trees``.
        :param spinner: accepted for interface compatibility but not used;
            the spinner is always disabled (see below).
        :param searchdesc: when true, DESCRIPTION text is searched as well.
        :param verbose: when true, output() prints per-package details.
        :param usepkg: when true (or ``usepkgonly``), the binary package
            database is searched if its ``cp_all()`` is non-empty.
        :param usepkgonly: when true, the ebuild repository database is
            not searched.
        :param search_index: when true, the ebuild and installed databases
            are wrapped in IndexedPortdb / IndexedVardb.
        :param search_similarity: fuzzy-match cutoff as a percentage;
            ``None`` selects 80.
        :param fuzzy: enables fuzzy matching for non-regex searches.
        :param regex_auto: when true, a search key containing regex
            metacharacters is treated as a regular expression if it compiles.
        """
        self.settings = root_config.settings
        self.verbose = verbose
        self.searchdesc = searchdesc
        self.searchkey = None
        self._results_specified = False
        # Disable the spinner since search results are displayed
        # incrementally.
        self.spinner = None
        self.root_config = root_config
        self.setconfig = root_config.setconfig
        self.regex_auto = regex_auto
        self.fuzzy = fuzzy
        self.search_similarity = 80 if search_similarity is None else search_similarity
        self.matches = {"pkg": []}
        self.mlen = 0

        self._dbs = []

        portdb = root_config.trees["porttree"].dbapi
        bindb = root_config.trees["bintree"].dbapi
        vardb = root_config.trees["vartree"].dbapi

        if search_index:
            portdb = IndexedPortdb(portdb)
            vardb = IndexedVardb(vardb)

        if not usepkgonly and portdb._have_root_eclass_dir:
            self._dbs.append(portdb)

        if (usepkg or usepkgonly) and bindb.cp_all():
            self._dbs.append(bindb)

        # The installed-package database is always searched, and is
        # always the last entry of self._dbs.
        self._dbs.append(vardb)
        self._portdb = portdb
        self._vardb = vardb

    def _spinner_update(self):
        """Advance the spinner, if one is set."""
        if self.spinner:
            self.spinner.update()

    def _cp_all(self):
        """Yield the first element of every group that MultiIterGroupBy
        produces from the sorted ``cp_all()`` output of all databases."""
        iterators = []
        for db in self._dbs:
            # MultiIterGroupBy requires sorted input
            cp_iter = db.cp_all(sort=True)
            try:
                cp_iter = iter(cp_iter)
            except TypeError:
                # Not iterable via iter(); pass it through unchanged.
                pass
            iterators.append(cp_iter)
        for group in MultiIterGroupBy(iterators):
            yield group[0]

    def _aux_get(self, *args, **kwargs):
        """Return ``aux_get(*args, **kwargs)`` from the first database
        that does not raise KeyError.

        :raises KeyError: (with ``args[0]``) when every database fails.
        """
        for db in self._dbs:
            try:
                return db.aux_get(*args, **kwargs)
            except KeyError:
                pass
        raise KeyError(args[0])

    def _aux_get_error(self, cpv):
        """Report that metadata lookup for ``cpv`` failed."""
        portage.writemsg(
            "emerge: search: aux_get('%s') failed, skipping\n" % (cpv,),
            noiselevel=-1,
        )

    def _findname(self, *args, **kwargs):
        """Return the first truthy ``findname()`` result from the ebuild
        repository database, or None."""
        for db in self._dbs:
            if db is not self._portdb:
                # We don't want findname to return anything
                # unless it's an ebuild in a repository.
                # Otherwise, it's already built and we don't
                # care about it.
                continue
            func = getattr(db, "findname", None)
            if func:
                value = func(*args, **kwargs)
                if value:
                    return value
        return None

    def _getFetchMap(self, *args, **kwargs):
        """Return the first truthy ``getFetchMap()`` result from any
        database that has that method, or an empty dict."""
        for db in self._dbs:
            func = getattr(db, "getFetchMap", None)
            if func:
                value = func(*args, **kwargs)
                if value:
                    return value
        return {}

    def _visible(self, db, cpv, metadata):
        """Return the ``visible`` attribute of a Package built for ``cpv``.

        The package type is "installed" for the installed database,
        "ebuild" for the ebuild repository database and "binary" for
        any other database.
        """
        installed = db is self._vardb
        built = installed or db is not self._portdb
        if installed:
            pkg_type = "installed"
        elif built:
            pkg_type = "binary"
        else:
            pkg_type = "ebuild"
        return Package(
            type_name=pkg_type,
            root_config=self.root_config,
            cpv=cpv,
            built=built,
            installed=installed,
            metadata=metadata,
        ).visible

    def _first_cp(self, cp):
        """Return some cpv belonging to ``cp`` from the first database
        that has one, or None.  No visibility check is performed."""
        for db in self._dbs:
            if hasattr(db, "cp_list"):
                matches = db.cp_list(cp)
                if matches:
                    return matches[-1]
            else:
                for cpv in db.match(cp):
                    if cpv.cp == cp:
                        return cpv
        return None

    def _filter_sort_cp(self, matches, cp):
        """Return the members of ``matches`` whose key is ``cp``, sorted
        in place by the last database's ``_cpv_sort_ascending``."""
        result = [x for x in matches if portage.cpv_getkey(x) == cp]
        # Sort with the last database explicitly instead of relying on a
        # loop variable that leaked out of a ``for`` loop; this also
        # tolerates an empty database list.
        if self._dbs:
            self._dbs[-1]._cpv_sort_ascending(result)
        return result

    def _cpv_visible_in(self, db, cpv, db_keys):
        """Fetch ``db_keys`` metadata for ``cpv`` from ``db`` and return
        its visibility; a failed lookup is reported and counts as not
        visible."""
        try:
            metadata = zip(db_keys, db.aux_get(cpv, db_keys))
        except KeyError:
            self._aux_get_error(cpv)
            return False
        return self._visible(db, cpv, metadata)

    def _xmatch(self, level, atom):
        """
        This method does not expand old-style virtuals because it
        is restricted to returning matches for a single ${CATEGORY}/${PN}
        and old-style virtual matches are unreliable for that when querying
        multiple package databases. If necessary, old-style virtuals
        can be performed on atoms prior to calling this method.

        :param level: "match-all", "match-visible" or "bestmatch-visible".
        :param atom: an Atom, or a value from which an Atom can be built.
        :returns: a sorted list of cpvs for the two "match-*" levels; a
            single cpv or None for "bestmatch-visible".
        :raises NotImplementedError: for any other ``level``.
        """
        if not isinstance(atom, portage.dep.Atom):
            atom = portage.dep.Atom(atom)

        cp = atom.cp

        if level == "match-all":
            matches = set()
            for db in self._dbs:
                if hasattr(db, "xmatch"):
                    matches.update(db.xmatch(level, atom))
                else:
                    matches.update(db.match(atom))
            result = self._filter_sort_cp(matches, cp)

        elif level == "match-visible":
            matches = set()
            for db in self._dbs:
                if hasattr(db, "xmatch"):
                    matches.update(db.xmatch(level, atom))
                else:
                    db_keys = list(db._aux_cache_keys)
                    for cpv in db.match(atom):
                        if self._cpv_visible_in(db, cpv, db_keys):
                            matches.add(cpv)
            result = self._filter_sort_cp(matches, cp)

        elif level == "bestmatch-visible":
            result = None
            for db in self._dbs:
                if hasattr(db, "xmatch"):
                    cpv = db.xmatch("bestmatch-visible", atom)
                    if not cpv or portage.cpv_getkey(cpv) != cp:
                        continue
                    if not result or cpv == portage.best([cpv, result]):
                        result = cpv
                else:
                    db_keys = list(db._aux_cache_keys)
                    matches = db.match(atom)
                    # Sort only for databases exposing match_unordered.
                    if hasattr(db, "match_unordered"):
                        db._cpv_sort_ascending(matches)

                    # break out of this loop with highest visible
                    # match, checked in descending order
                    for cpv in reversed(matches):
                        if portage.cpv_getkey(cpv) != cp:
                            continue
                        if not self._cpv_visible_in(db, cpv, db_keys):
                            continue
                        if not result or cpv == portage.best([cpv, result]):
                            result = cpv
                        break
        else:
            raise NotImplementedError(level)

        return result

    def execute(self, searchkey):
        """Store the search key.

        The search itself is not run here; it runs when output() is
        called.
        """
        self.searchkey = searchkey

    def _iter_search(self):
        """Run the search for ``self.searchkey`` and yield
        ``(match_type, name)`` tuples, where ``match_type`` is "pkg",
        "desc" or "set".

        A leading "%" selects regular-expression matching and a leading
        "@" (or a "/" anywhere in the key) selects matching against the
        full category/package name.  Both prefixes are stripped from
        ``self.searchkey``.
        """
        match_category = 0
        self.packagematches = []
        if self.searchdesc:
            self.searchdesc = 1
            self.matches = {"pkg": [], "desc": [], "set": []}
        else:
            self.searchdesc = 0
            self.matches = {"pkg": [], "set": []}
        writemsg_stdout("Searching...\n\n", noiselevel=-1)

        regexsearch = False
        if self.searchkey.startswith("%"):
            regexsearch = True
            self.searchkey = self.searchkey[1:]
        if self.searchkey.startswith("@"):
            match_category = 1
            self.searchkey = self.searchkey[1:]
        # Auto-detect category match mode (@ symbol can be deprecated
        # after this is available in a stable version of portage).
        if "/" in self.searchkey:
            match_category = 1
        fuzzy = False

        if (
            self.regex_auto
            and not regexsearch
            and re.search(r"[\^\$\*\[\]\{\}\|\?]|\.\+", self.searchkey) is not None
        ):
            try:
                re.compile(self.searchkey, re.I)
            except Exception:
                # Best effort: a key that does not compile is simply
                # searched for literally.
                pass
            else:
                regexsearch = True

        if regexsearch:
            self.searchre = re.compile(self.searchkey, re.I)
        else:
            self.searchre = re.compile(re.escape(self.searchkey), re.I)

            # Fuzzy search does not support regular expressions, therefore
            # it is disabled for regular expression searches.
            if self.fuzzy:
                fuzzy = True
                cutoff = float(self.search_similarity) / 100

                if match_category:
                    # Weigh the similarity of category and package
                    # names independently, in order to avoid matching
                    # lots of irrelevant packages in the same category
                    # when the package name is much shorter than the
                    # category name.
                    part_split = portage.catsplit
                else:
                    part_split = lambda match_string: (match_string,)

                part_matchers = []
                for part in part_split(self.searchkey):
                    seq_match = difflib.SequenceMatcher()
                    seq_match.set_seq2(part.lower())
                    part_matchers.append(seq_match)

                def fuzzy_search_part(seq_match, match_string):
                    seq_match.set_seq1(match_string.lower())
                    # Cheapest upper bounds first; ratio() is only
                    # computed when both quick estimates pass.
                    return (
                        seq_match.real_quick_ratio() >= cutoff
                        and seq_match.quick_ratio() >= cutoff
                        and seq_match.ratio() >= cutoff
                    )

                def fuzzy_search(match_string):
                    return all(
                        fuzzy_search_part(seq_match, part)
                        for seq_match, part in zip(
                            part_matchers, part_split(match_string)
                        )
                    )

        for package in self._cp_all():
            self._spinner_update()

            if match_category:
                match_string = package[:]
            else:
                match_string = package.split("/")[-1]

            if self.searchre.search(match_string):
                yield ("pkg", package)
            elif fuzzy and fuzzy_search(match_string):
                yield ("pkg", package)
            elif self.searchdesc:  # DESCRIPTION searching
                # Use _first_cp to avoid an expensive visibility check,
                # since the visibility check can be avoided entirely
                # when the DESCRIPTION does not match.
                full_package = self._first_cp(package)
                if not full_package:
                    continue
                try:
                    full_desc = self._aux_get(full_package, ["DESCRIPTION"])[0]
                except KeyError:
                    self._aux_get_error(full_package)
                    continue
                if not self.searchre.search(full_desc):
                    continue

                yield ("desc", package)

        self.sdict = self.setconfig.getSets()
        for setname in self.sdict:
            self._spinner_update()
            if match_category:
                match_string = setname
            else:
                match_string = setname.split("/")[-1]

            if self.searchre.search(match_string):
                yield ("set", setname)
            elif self.searchdesc:
                if self.searchre.search(self.sdict[setname].getMetadata("DESCRIPTION")):
                    yield ("set", setname)

    def addCP(self, cp):
        """
        Add a specific cp to the search results. This modifies the
        behavior of the output method, so that it only displays specific
        packages added via this method.

        A cp with no "match-all" matches is not added.
        """
        self._results_specified = True
        if not self._xmatch("match-all", cp):
            return
        self.matches["pkg"].append(cp)
        self.mlen += 1

    def output(self):
        """Outputs the results of the search.

        Results come from addCP() when it has been called, otherwise
        from running the search for the stored key.  Each result is
        written to stdout as it is found; ``self.mlen`` is reset to 0 at
        the end.
        """

        def emit(text):
            writemsg_stdout(text, noiselevel=-1)

        emit("\b\b \n[ Results for search key : " + bold(self.searchkey) + " ]\n")
        vardb = self._vardb
        metadata_keys = set(Package.metadata_keys)
        metadata_keys.update(["DESCRIPTION", "HOMEPAGE", "LICENSE", "SRC_URI"])
        metadata_keys = tuple(metadata_keys)

        if self._results_specified:
            # Handle results added via addCP
            iterator = iter(
                [
                    (mytype, match)
                    for mytype, matches in self.matches.items()
                    for match in matches
                ]
            )
        else:
            # Do a normal search
            iterator = self._iter_search()

        for mtype, match in iterator:
            self.mlen += 1
            masked = False
            full_package = None
            if mtype in ("pkg", "desc"):
                full_package = self._xmatch("bestmatch-visible", match)
                if not full_package:
                    # Nothing visible: fall back to the last entry of
                    # the sorted "match-all" list and flag it as masked.
                    masked = True
                    full_package = self._xmatch("match-all", match)
                    if full_package:
                        full_package = full_package[-1]
            elif mtype == "set":
                emit(green("*") + " " + bold(match) + "\n")
                if self.verbose:
                    emit(
                        " "
                        + darkgreen("Description:")
                        + " "
                        + self.sdict[match].getMetadata("DESCRIPTION")
                        + "\n\n"
                    )

            if not full_package:
                continue

            try:
                metadata = dict(
                    zip(metadata_keys, self._aux_get(full_package, metadata_keys))
                )
            except KeyError:
                self._aux_get_error(full_package)
                continue

            desc = metadata["DESCRIPTION"]
            homepage = metadata["HOMEPAGE"]
            pkg_license = metadata["LICENSE"]

            if masked:
                emit(green("*") + " " + bold(match) + " " + red("[ Masked ]") + "\n")
            else:
                emit(green("*") + " " + bold(match) + "\n")
            myversion = self.getVersion(full_package, search.VERSION_RELEASE)

            size = 0
            file_size_str = None
            cp_parts = match.split("/")
            mycat = cp_parts[0]
            mypkg = cp_parts[1]
            mycpv = match + "-" + myversion
            myebuild = self._findname(mycpv)
            if myebuild:
                pkg = Package(
                    built=False,
                    cpv=mycpv,
                    installed=False,
                    metadata=metadata,
                    root_config=self.root_config,
                    type_name="ebuild",
                )
                pkgdir = os.path.dirname(myebuild)
                # The repository location is taken to be two directory
                # levels above the package directory.
                mf = self.settings.repositories.get_repo_for_location(
                    os.path.dirname(os.path.dirname(pkgdir))
                )
                mf = mf.load_manifest(pkgdir, self.settings["DISTDIR"])
                try:
                    uri_map = _parse_uri_map(mycpv, metadata, use=pkg.use.enabled)
                except portage.exception.InvalidDependString as e:
                    file_size_str = "Unknown (%s)" % (e,)
                else:
                    try:
                        size = mf.getDistfilesSize(uri_map)
                    except KeyError as e:
                        file_size_str = "Unknown (missing digest for %s)" % (e,)

            available = False
            for db in self._dbs:
                if db is not vardb and db.cpv_exists(mycpv):
                    available = True
                    if not myebuild and hasattr(db, "bintree"):
                        # No ebuild: report the size of the file named
                        # by the binary tree instead.
                        myebuild = db.bintree.getname(mycpv)
                        try:
                            size = os.stat(myebuild).st_size
                        except OSError:
                            myebuild = None
                    break

            if myebuild and file_size_str is None:
                file_size_str = localized_size(size)

            if self.verbose:
                if available:
                    emit(
                        " %s %s\n"
                        % (darkgreen("Latest version available:"), myversion)
                    )
                emit(" %s\n" % self.getInstallationStatus(mycat + "/" + mypkg))
                if myebuild:
                    emit(" %s %s\n" % (darkgreen("Size of files:"), file_size_str))
                emit(" " + darkgreen("Homepage:") + " " + homepage + "\n")
                emit(" " + darkgreen("Description:") + " " + desc + "\n")
                emit(" " + darkgreen("License:") + " " + pkg_license + "\n\n")

        emit("[ Applications found : " + bold(str(self.mlen)) + " ]\n\n")

        # This method can be called multiple times, so
        # reset the match count for the next call. Don't
        # reset it at the beginning of this method, since
        # that would lose modifications from the addCP
        # method.
        self.mlen = 0

    #
    # private interface
    #
    def getInstallationStatus(self, package):
        """Return a "Latest version installed:" line for ``package``.

        :param package: an Atom, or a value from which an Atom can be built.
        :returns: the label followed by the installed version, or by
            "[ Not Installed ]" when no version string is obtained.
        """
        if not isinstance(package, portage.dep.Atom):
            package = portage.dep.Atom(package)

        installed_package = self._vardb.match(package)
        if not installed_package:
            installed_package = ""
        elif hasattr(self._vardb, "match_unordered"):
            installed_package = portage.best(installed_package)
        else:
            installed_package = installed_package[-1]

        version = self.getVersion(installed_package, search.VERSION_RELEASE)
        label = darkgreen("Latest version installed:")
        if len(version) > 0:
            return label + " " + version
        return label + " [ Not Installed ]"

    def getVersion(self, full_package, detail):
        """Return the version part of ``full_package`` as a string.

        :param full_package: a cpv string; anything of length 0 or 1
            yields "".
        :param detail: with ``search.VERSION_RELEASE`` the fourth
            ``catpkgsplit`` element is appended after a "-" unless it is
            "r0"; otherwise only the third element is returned.
        """
        if len(full_package) <= 1:
            return ""
        package_parts = portage.catpkgsplit(full_package)
        if detail == search.VERSION_RELEASE and package_parts[3] != "r0":
            return package_parts[2] + "-" + package_parts[3]
        return package_parts[2]