blob: 5a958a461572e11fa1a809e0f563c27f0bb00ee2 [file] [log] [blame]
# -*- coding:utf-8 -*-
'''fetches.py
Performs the src_uri fetchlist and files checks
'''
import errno
from collections import deque
from stat import S_ISDIR

# import our initialized portage instance
from repoman._portage import portage
from repoman.modules.vcs.vcs import vcs_new_changed
from repoman.modules.scan.scanbase import ScanBase
from portage import os
class FetchChecks(ScanBase):
    '''Performs checks on the files needed for the ebuild'''

    # Size policy (in bytes) for regular files found under "files/":
    # a file larger than 20 KiB is reported as "file.size", and a file
    # larger than 60 KiB is reported as "file.size-fatal".
    _FILE_SIZE_WARN = 20 * 1024
    _FILE_SIZE_FATAL = 60 * 1024

    def __init__(self, **kwargs):
        '''
        @param portdb: portdb instance
        @param qatracker: QATracker instance
        @param repo_settings: repository settings instance
        @param vcs_settings: VCSSettings instance
        '''
        super(FetchChecks, self).__init__(**kwargs)
        self.portdb = kwargs.get('portdb')
        self.qatracker = kwargs.get('qatracker')
        self.repo_settings = kwargs.get('repo_settings')
        self.repoman_settings = self.repo_settings.repoman_settings
        self.vcs_settings = kwargs.get('vcs_settings')
        self._src_uri_error = False

        # Maps each mirror URL prefix (always ending in "/") to the
        # thirdpartymirrors alias it belongs to.
        # TODO: Build a regex instead here, for the SRC_URI.mirror check.
        self.thirdpartymirrors = {}
        profile_thirdpartymirrors = \
            self.repoman_settings.thirdpartymirrors().items()
        for mirror_alias, mirrors in profile_thirdpartymirrors:
            # Skip thirdpartymirrors that do not list more than one mirror
            # anymore. There is no point in using mirror:// there and this
            # means that the thirdpartymirrors entry will most likely
            # be removed anyway.
            if len(mirrors) <= 1:
                continue
            for mirror in mirrors:
                if not mirror.endswith("/"):
                    mirror += "/"
                self.thirdpartymirrors[mirror] = mirror_alias

    def check(self, **kwargs):
        '''Checks the ebuild sources and files for errors

        @param xpkg: the package being checked
        @param checkdir: string, directory path
        @param checkdir_relative: repolevel determined path
        @param changed: object exposing the VCS "changed" and "new" sets
        @returns: boolean, always False
        '''
        # Read every argument up front so that a missing one fails before
        # any check has reported anything.
        xpkg = kwargs.get('xpkg')
        checkdir = kwargs.get('checkdir')
        checkdir_relative = kwargs.get('checkdir_relative')
        changed = kwargs.get('changed').changed
        new = kwargs.get('changed').new

        self._check_digests(checkdir)
        self._check_files(xpkg, checkdir, checkdir_relative, changed, new)
        return False

    def _check_digests(self, checkdir):
        '''Cross-checks the Manifest DIST entries against the fetch lists

        Reports "SRC_URI.syntax" for fetch lists that fail to parse, and
        "digest.unused" / "digest.missing" for mismatches between the
        Manifest and the files named by the ebuilds.

        @param checkdir: string, directory path
        '''
        _digests = self.digests(checkdir)
        fetchlist_dict = portage.FetchlistDict(
            checkdir, self.repoman_settings, self.portdb)
        myfiles_all = []
        self._src_uri_error = False
        for mykey in fetchlist_dict:
            try:
                myfiles_all.extend(fetchlist_dict[mykey])
            except portage.exception.InvalidDependString as e:
                self._src_uri_error = True
                try:
                    self.portdb.aux_get(mykey, ["SRC_URI"])
                except KeyError:
                    # This will be reported as an "ebuild.syntax" error.
                    pass
                else:
                    self.qatracker.add_error(
                        "SRC_URI.syntax",
                        "%s.ebuild SRC_URI: %s" % (mykey, e))
        del fetchlist_dict

        if self._src_uri_error:
            # The comparison below can produce false positives if SRC_URI
            # could not be parsed for one or more ebuilds. There's no point
            # in producing a false error here since the root cause will
            # produce a valid error elsewhere, such as "SRC_URI.syntax"
            # or "ebuild.syntax".
            return

        myfiles_all = set(myfiles_all)
        for entry in _digests:
            if entry not in myfiles_all:
                self.qatracker.add_error(
                    "digest.unused", checkdir + "::" + entry)
        for entry in myfiles_all:
            if entry not in _digests:
                self.qatracker.add_error(
                    "digest.missing", checkdir + "::" + entry)

    def _check_files(self, xpkg, checkdir, checkdir_relative, changed, new):
        '''Checks sizes and names of everything below the files directory

        @param xpkg: the package being checked
        @param checkdir: string, directory path
        @param checkdir_relative: repolevel determined path
        @param changed: VCS changed set, passed to vcs_new_changed()
        @param new: VCS new set, passed to vcs_new_changed()
        '''
        filesdir = checkdir + "/files"
        if not os.path.exists(filesdir):
            return

        # Walk the files directory breadth-first, using a queue of paths
        # relative to it and appending directory contents as needed,
        # so people can't hide > 20k files in a subdirectory.
        pending = deque(os.listdir(filesdir))
        while pending:
            y = pending.popleft()
            relative_path = os.path.join(xpkg, "files", y)
            full_path = os.path.join(
                self.repo_settings.repodir, relative_path)
            try:
                mystat = os.stat(full_path)
            except OSError as oe:
                if oe.errno == errno.ENOENT:
                    # Don't worry about it; the entry has vanished since
                    # it was listed.
                    continue
                raise

            if S_ISDIR(mystat.st_mode):
                if self.vcs_settings.status.isVcsDir(y):
                    continue
                for z in os.listdir(filesdir + "/" + y):
                    if self.vcs_settings.status.isVcsDir(z):
                        continue
                    pending.append(y + "/" + z)
            elif mystat.st_size > self._FILE_SIZE_FATAL:
                self.qatracker.add_error(
                    "file.size-fatal", "(%d KiB) %s/files/%s" % (
                        mystat.st_size // 1024, xpkg, y))
            elif mystat.st_size > self._FILE_SIZE_WARN:
                self.qatracker.add_error(
                    "file.size", "(%d KiB) %s/files/%s" % (
                        mystat.st_size // 1024, xpkg, y))
            elif mystat.st_size == 0:
                self.qatracker.add_error(
                    "file.empty", "%s/files/%s" % (xpkg, y))

            index = self.repo_settings.repo_config.find_invalid_path_char(y)
            if index != -1:
                y_relative = os.path.join(checkdir_relative, "files", y)
                if self.vcs_settings.vcs is not None \
                        and not vcs_new_changed(y_relative, changed, new):
                    # If the file isn't in the VCS new or changed set, then
                    # assume that it's an irrelevant temporary file (Manifest
                    # entries are not generated for file names containing
                    # prohibited characters). See bug #406877.
                    index = -1
            if index != -1:
                self.qatracker.add_error(
                    "file.name",
                    "%s/files/%s: char '%s'" % (checkdir, y, y[index]))

    def digests(self, checkdir):
        '''Returns the freshly loaded digests

        @param checkdir: string, directory path
        @returns: the "DIST" type digests of the Manifest in checkdir
        '''
        # The repository is looked up by the directory two levels above
        # checkdir.
        repo = self.repoman_settings.repositories.get_repo_for_location(
            os.path.dirname(os.path.dirname(checkdir)))
        manifest = repo.load_manifest(
            checkdir, self.repoman_settings["DISTDIR"])
        return manifest.getTypeDigests("DIST")

    def check_mirrors(self, **kwargs):
        '''Check that URIs don't reference a server from thirdpartymirrors

        @param ebuild: Ebuild which we check (object).
        @returns: boolean, always False
        '''
        ebuild = kwargs.get('ebuild').get()

        for uri in portage.dep.use_reduce(
                ebuild.metadata["SRC_URI"], matchall=True, is_src_uri=True,
                eapi=ebuild.eapi, flat=True):
            for mirror, mirror_alias in self.thirdpartymirrors.items():
                if uri.startswith(mirror):
                    break
            else:
                # No known third-party mirror prefix matches this URI.
                continue

            new_uri = "mirror://%s/%s" % (mirror_alias, uri[len(mirror):])
            self.qatracker.add_error(
                "SRC_URI.mirror",
                "%s: '%s' found in thirdpartymirrors, use '%s'" % (
                    ebuild.relative_path, mirror, new_uri))
        return False

    @property
    def runInPkgs(self):
        '''Package level scans'''
        return (True, [self.check])

    @property
    def runInEbuilds(self):
        '''Ebuild level scans'''
        return (True, [self.check_mirrors])