| #!/usr/bin/env python |
| # Copyright 1999-2021 Gentoo Authors |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| import argparse |
| import re |
| import sys |
| import codecs |
| from functools import reduce |
| import operator |
| |
| from os import path as osp |
| |
# If a ".portage_not_installed" marker file exists two directory levels above
# this script, put the sibling "lib" directory at the front of sys.path so
# that the imports below resolve against it first.
# NOTE(review): the marker presumably identifies an uninstalled source tree.
_script_root = osp.dirname(osp.dirname(osp.realpath(__file__)))
if osp.isfile(osp.join(_script_root, ".portage_not_installed")):
    sys.path.insert(0, osp.join(_script_root, "lib"))
| import portage |
| |
| portage._internal_caller = True |
| from portage import os |
| from portage.glsa import match |
| from portage.output import green, red, nocolor, white |
| |
__program__ = "glsa-check"
__author__ = "Marius Mauch <genone@gentoo.org>"
__version__ = "1.0.1"

# option parsing
epilog = (
    "glsa-list can contain an arbitrary number of GLSA ids, "
    "filenames containing GLSAs or the special identifiers "
    "'all' and 'affected'"
)
parser = argparse.ArgumentParser(
    usage="%s <option> [glsa-id | all | new | affected]" % __program__,
    epilog=epilog,
)

# Every mode flag stores its constant into options.mode; when several are
# given, the last one on the command line wins.
# Entries: (option strings, stored constant, help text)
_MODE_SWITCHES = (
    (
        ("-l", "--list"),
        "list",
        "List a summary for the given GLSA(s) or set and whether they affect the system",
    ),
    (
        ("-d", "--dump"),
        "dump",
        "Show all information about the GLSA(s) or set",
    ),
    (
        ("--print",),
        "dump",
        "Alias for --dump",
    ),
    (
        ("-t", "--test"),
        "test",
        "Test if this system is affected by the GLSA(s) or set and output the GLSA ID(s)",
    ),
    (
        ("-p", "--pretend"),
        "pretend",
        "Show the necessary steps to remediate the system",
    ),
    (
        ("-f", "--fix"),
        "fix",
        "(experimental) Attempt to remediate the system based on the instructions given in the GLSA(s) or set. This will only upgrade (when an upgrade path exists) or remove packages",
    ),
    (
        ("-i", "--inject"),
        "inject",
        "Inject the given GLSA(s) into the glsa_injected file",
    ),
    (
        ("-m", "--mail"),
        "mail",
        "Send a mail with the given GLSAs to the administrator",
    ),
)

modes = parser.add_argument_group("Modes")
for _names, _const, _help in _MODE_SWITCHES:
    modes.add_argument(
        *_names, action="store_const", const=_const, dest="mode", help=_help
    )

# Plain boolean switches.
# Entries: (option strings, argparse action, destination attribute, help text)
_BOOLEAN_SWITCHES = (
    (("-V", "--version"), "store_true", "version", "Show information about glsa-check"),
    (
        ("-q", "--quiet"),
        "store_true",
        "quiet",
        "Be less verbose and do not send empty mail",
    ),
    (("-v", "--verbose"), "store_true", "verbose", "Print more messages"),
    (("-n", "--nocolor"), "store_true", "nocolor", "Removes color from output"),
    (
        # store_false: least_change stays True unless -e is given
        ("-e", "--emergelike"),
        "store_false",
        "least_change",
        "Upgrade to latest version (not least-change)",
    ),
    (("-c", "--cve"), "store_true", "list_cve", "Show CVE IDs in listing mode"),
    (("-r", "--reverse"), "store_true", "reverse", "List GLSAs in reverse order"),
)

for _names, _action, _dest, _help in _BOOLEAN_SWITCHES:
    parser.add_argument(*_names, action=_action, dest=_dest, help=_help)

# Anything argparse does not recognise is kept in params as the glsa-list.
options, params = parser.parse_known_args()
| |
if options.nocolor:
    nocolor()

if options.version:
    # version banner goes to stderr, then we are done
    for _line in (
        "\n%s, version %s\n" % (__program__, __version__),
        "Author: %s\n" % __author__,
        "This program is licensed under the GPL, version 2\n\n",
    ):
        sys.stderr.write(_line)
    sys.exit(0)

# shorthand names for the parsed options used throughout the script
mode = options.mode
least_change = options.least_change
list_cve = options.list_cve
quiet = options.quiet
verbose = options.verbose

# Sanity checking
if mode is None:
    sys.stderr.write("No mode given: what should I do?\n")
    parser.print_help()
    sys.exit(1)

if not params:
    if mode != "list":
        sys.stderr.write("\nno GLSA given, so we'll do nothing for now. \n")
        sys.stderr.write("If you want to run on all GLSA please tell me so \n")
        sys.stderr.write('(specify "all" as parameter)\n\n')
        parser.print_help()
        sys.exit(1)
    # list mode without an explicit glsa-list defaults to "affected"
    params.append("affected")

if mode in ("fix", "inject") and os.geteuid() != 0:
    # we need root privileges for write access
    sys.stderr.write(
        "\nThis tool needs root access to " + options.mode + " this GLSA\n\n"
    )
    sys.exit(2)
| |
# delay this for speed increase
from portage.glsa import (
    Glsa,
    GlsaTypeException,
    GlsaFormatException,
    get_applied_glsas,
    get_glsa_list,
)

eroot = portage.settings["EROOT"]
vardb = portage.db[eroot]["vartree"].dbapi
portdb = portage.db[eroot]["porttree"].dbapi

# build glsa lists
#   completelist: every GLSA id returned by get_glsa_list()
#   checklist:    ids returned by get_applied_glsas()
#   todolist:     ids from completelist that are not in checklist
completelist = get_glsa_list(portage.settings)

checklist = get_applied_glsas(portage.settings)
todolist = [e for e in completelist if e not in checklist]

# Resolve the special identifiers ('new', 'all', 'affected') into glsalist.
# Every occurrence of an identifier is dropped from params, so a repeated
# keyword is not reported as an invalid GLSA specification further down.
glsalist = []
if "new" in params:
    params[:] = [arg for arg in params if arg != "new"]
    sys.stderr.write(
        "Warning: The 'new' glsa-list target has been removed, using 'affected'.\n"
    )
    params.append("affected")

if "all" in params:
    # work on a copy so that completelist itself is never modified below
    glsalist = list(completelist)
    params[:] = [arg for arg in params if arg != "all"]

if "affected" in params:
    for x in todolist:
        try:
            myglsa = Glsa(x, portage.settings, vardb, portdb)
        except (GlsaTypeException, GlsaFormatException) as e:
            if verbose:
                sys.stderr.write(
                    ("invalid GLSA: %s (error message was: %s)\n" % (x, e))
                )
            continue
        # skip ids that 'all' already selected
        if myglsa.isVulnerable() and x not in glsalist:
            glsalist.append(x)
    params[:] = [arg for arg in params if arg != "affected"]

# remove invalid parameters (neither a known GLSA id nor an existing file)
for p in params[:]:
    if not (p in completelist or os.path.exists(p)):
        sys.stderr.write(
            (
                "(removing %s from parameter list as it isn't a valid GLSA specification)\n"
                % p
            )
        )
        params.remove(p)

# add the explicitly requested GLSAs, each one only once
for p in params:
    if p not in glsalist:
        glsalist.append(p)
| |
| |
def summarylist(myglsalist, fd1=sys.stdout, fd2=sys.stderr, encoding="utf-8"):
    """Write a one-line summary for each GLSA in myglsalist.

    Every line consists of the GLSA number, a status marker ([A] when the
    GLSA is injected, [N] when the system is vulnerable, [U] otherwise), the
    title and the affected packages.  In verbose mode the access value is
    included and, per package, the installed versions that match a
    vulnerable atom but no unaffected atom are shown instead of the plain
    package name.  With the --cve option the CAN/CVE ids found in the GLSA
    references are appended.

    GLSAs that cannot be loaded are skipped; in verbose mode a message is
    written to fd2 for each of them.

    myglsalist -- iterable of GLSA ids, processed in sorted order
                  (reversed when the --reverse option is set)
    fd1        -- stream receiving the summary lines
    fd2        -- stream receiving the legend and error messages
    encoding   -- encoding used for everything written to fd1 and fd2

    Returns 0.
    """
    # Get to the raw streams in py3k before wrapping them with an encoded writer
    # to avoid writing bytes to a text stream (stdout/stderr are text streams
    # by default in py3k)
    if hasattr(fd1, "buffer"):
        fd1 = fd1.buffer
    if hasattr(fd2, "buffer"):
        fd2 = fd2.buffer
    fd1 = codecs.getwriter(encoding)(fd1)
    fd2 = codecs.getwriter(encoding)(fd2)
    if not quiet:
        fd2.write(white("[A]") + " means this GLSA was marked as applied (injected),\n")
        fd2.write(green("[U]") + " means the system is not affected and\n")
        fd2.write(red("[N]") + " indicates that the system might be affected.\n\n")

    # compiled once instead of once per reference
    cve_pattern = re.compile(r"(CAN|CVE)-[\d-]+")

    for myid in sorted(myglsalist, reverse=options.reverse):
        try:
            myglsa = Glsa(myid, portage.settings, vardb, portdb)
        except (GlsaTypeException, GlsaFormatException) as e:
            if verbose:
                fd2.write(("invalid GLSA: %s (error message was: %s)\n" % (myid, e)))
            continue
        if myglsa.isInjected():
            status = "[A]"
            color = white
        elif myglsa.isVulnerable():
            status = "[N]"
            color = red
        else:
            status = "[U]"
            color = green

        if verbose:
            access = "[%-8s] " % myglsa.access
        else:
            access = ""

        fd1.write(
            color(myglsa.nr)
            + " "
            + color(status)
            + " "
            + color(access)
            + myglsa.title
            + " ("
        )
        if not verbose:
            # short form: at most three package names, then an ellipsis
            for pkg in list(myglsa.packages)[:3]:
                fd1.write(" " + pkg + " ")
            if len(myglsa.packages) > 3:
                fd1.write("... ")
        else:
            for cpv, paths in myglsa.packages.items():
                # Collect the installed vulnerable versions over ALL path
                # entries of the package, so that no entry is ignored and a
                # package without entries cannot reuse stale results.
                installed = set()
                for path in paths:
                    vulnerable = set()
                    for atom in path["vul_atoms"]:
                        vulnerable.update(match(atom, vardb))
                    unaffected = set()
                    for atom in path["unaff_atoms"]:
                        unaffected.update(match(atom, vardb))
                    installed.update(vulnerable - unaffected)
                if installed:
                    label = color(" ".join(sorted(installed)))
                else:
                    # nothing vulnerable installed: show the package name
                    label = cpv
                fd1.write(" " + label + " ")

        fd1.write(")")
        if list_cve:
            cve_ids = []
            for r in myglsa.references:
                m = cve_pattern.search(r)
                if m is not None:
                    cve_ids.append(m.group(0))
            if cve_ids:
                fd1.write(" " + (",".join(cve_ids)))
        fd1.write("\n")
    return 0
| |
| |
if mode == "list":
    sys.exit(summarylist(glsalist))

# dump, fix, inject and pretend are nearly the same code, only the glsa method
# call differs
if mode in ["dump", "fix", "inject", "pretend"]:
    # mode-specific import, like the imports of the mail mode
    import shlex

    for myid in glsalist:
        try:
            myglsa = Glsa(myid, portage.settings, vardb, portdb)
        except (GlsaTypeException, GlsaFormatException) as e:
            if verbose:
                sys.stderr.write(
                    ("invalid GLSA: %s (error message was: %s)\n" % (myid, e))
                )
            continue
        if mode == "dump":
            myglsa.dump()
        elif mode == "fix":
            if not quiet:
                sys.stdout.write("Fixing GLSA " + myid + "\n")
            if not myglsa.isVulnerable():
                if not quiet:
                    sys.stdout.write(">>> no vulnerable packages installed\n")
            else:
                # in quiet mode the header is only shown when there is work to do
                if quiet:
                    sys.stdout.write("Fixing GLSA " + myid + "\n")
                mergelist = myglsa.getMergeList(least_change=least_change)
                if mergelist == []:
                    sys.stdout.write(
                        ">>> cannot fix GLSA, no unaffected packages available\n"
                    )
                    sys.exit(2)
                for pkg in mergelist:
                    sys.stdout.write(">>> merging " + pkg + "\n")
                    # using emerge for the actual merging as it contains the dependency
                    # code and we want to be consistent in behaviour. Also this functionality
                    # will be integrated in emerge later, so it shouldn't hurt much.
                    # The command line is interpreted by a shell, so the atom
                    # is quoted; shlex.quote() leaves strings made only of
                    # shell-safe characters unchanged.
                    emergecmd = (
                        "emerge --oneshot"
                        + (" --quiet" if quiet else "")
                        + " "
                        + shlex.quote("=" + pkg)
                    )
                    if verbose:
                        sys.stderr.write(emergecmd + "\n")
                    exitcode = os.system(emergecmd)
                    # system() returns the exitcode in the high byte of a 16bit integer
                    if exitcode >= 1 << 8:
                        exitcode >>= 8
                    if exitcode:
                        sys.exit(exitcode)
                if mergelist:
                    sys.stdout.write("\n")
        elif mode == "pretend":
            if not quiet:
                sys.stdout.write("Checking GLSA " + myid + "\n")
            if not myglsa.isVulnerable():
                if not quiet:
                    sys.stdout.write(">>> no vulnerable packages installed\n")
            else:
                # in quiet mode the header is only shown when there is work to do
                if quiet:
                    sys.stdout.write("Checking GLSA " + myid + "\n")
                # group the vulnerable entries by the update that replaces them
                mergedict = {}
                for (vuln, update) in myglsa.getAffectionTable(
                    least_change=least_change
                ):
                    mergedict.setdefault(update, []).append(vuln)

                # first, extract the atoms that cannot be upgraded (where key == "")
                sys.stdout.write(
                    ">>> The following updates will be performed for this GLSA:\n"
                )
                no_upgrades = mergedict.pop("", [])

                # see if anything is left that can be upgraded
                if mergedict:
                    sys.stdout.write(">>> Updates that will be performed:\n")
                    for (upd, vuln) in mergedict.items():
                        sys.stdout.write(
                            " "
                            + green(upd)
                            + " (vulnerable: "
                            + red(", ".join(vuln))
                            + ")\n"
                        )

                if no_upgrades:
                    sys.stdout.write(">>> No upgrade path exists for these packages:\n")
                    sys.stdout.write(" " + red(", ".join(no_upgrades)) + "\n")
        elif mode == "inject":
            sys.stdout.write("injecting " + myid + "\n")
            myglsa.inject()
        # blank separator line after every processed GLSA
        if not quiet:
            sys.stdout.write("\n")
    sys.exit(0)
| |
# test mode: Glsa.test() produces no output, so the ids of all GLSAs the
# system is vulnerable to are collected here and reported in one go
if mode == "test":
    affected_ids = []
    for glsa_id in glsalist:
        try:
            advisory = Glsa(glsa_id, portage.settings, vardb, portdb)
        except (GlsaTypeException, GlsaFormatException) as err:
            if verbose:
                sys.stderr.write(
                    "invalid GLSA: %s (error message was: %s)\n" % (glsa_id, err)
                )
            continue
        if advisory.isVulnerable():
            affected_ids.append(str(advisory.nr))

    if not affected_ids:
        sys.stderr.write("This system is not affected by any of the listed GLSAs\n")
        sys.exit(0)

    sys.stderr.write("This system is affected by the following GLSAs:\n")
    if verbose:
        summarylist(affected_ids)
    else:
        sys.stdout.write("\n".join(affected_ids) + "\n")
    # exit status 6 signals that at least one GLSA affects the system
    sys.exit(6)
| |
# mail mode as requested by solar
if mode == "mail":
    import socket
    from io import BytesIO
    from email.mime.text import MIMEText
    import portage.mail

    # color doesn't make any sense for mail
    nocolor()

    # Recipient: first entry of PORTAGE_ELOG_MAILURI; fall back to the
    # default when the variable is unset or contains no entry at all.
    myrecipient = "root@localhost"
    if "PORTAGE_ELOG_MAILURI" in portage.settings:
        mailuri_fields = portage.settings["PORTAGE_ELOG_MAILURI"].split()
        if mailuri_fields:
            myrecipient = mailuri_fields[0]

    if "PORTAGE_ELOG_MAILFROM" in portage.settings:
        myfrom = portage.settings["PORTAGE_ELOG_MAILFROM"]
    else:
        myfrom = "glsa-check"

    # resolve the host name once and use it for both subject and body
    hostname = socket.getfqdn()
    mysubject = "[glsa-check] Summary for %s" % hostname

    # need a file object for summarylist()
    with BytesIO() as myfd:
        line = "GLSA Summary report for host %s\n" % hostname
        myfd.write(line.encode("utf-8"))
        line = "(Command was: %s)\n\n" % " ".join(sys.argv)
        myfd.write(line.encode("utf-8"))
        summarylist(glsalist, fd1=myfd, fd2=myfd)
        summary = myfd.getvalue().decode("utf-8")

    # one text attachment per GLSA that could be loaded
    myattachments = []
    for myid in glsalist:
        try:
            myglsa = Glsa(myid, portage.settings, vardb, portdb)
        except (GlsaTypeException, GlsaFormatException) as e:
            if verbose:
                sys.stderr.write(
                    ("invalid GLSA: %s (error message was: %s)\n" % (myid, e))
                )
            continue
        with BytesIO() as myfd:
            myglsa.dump(outstream=myfd)
            attachment = myfd.getvalue().decode("utf-8")
        myattachments.append(MIMEText(attachment, _charset="utf8"))

    # in quiet mode no mail is sent when there is nothing to report
    if glsalist or not quiet:
        mymessage = portage.mail.create_message(
            myfrom, myrecipient, mysubject, summary, myattachments
        )
        portage.mail.send_mail(portage.settings, mymessage)

    sys.exit(0)

# something wrong here, all valid paths are covered with sys.exit()
sys.stderr.write("nothing more to do\n")
sys.exit(2)