blob: f67f66741ac527737b4b4ff5dbd6203a0b4da04c [file] [log] [blame] [edit]
#!/usr/bin/python
"""
Software package management library.
This is an abstraction layer on top of the existing distributions high level
package managers. It supports package operations useful for testing purposes,
and multiple high level package managers (here called backends). If you want
to make this lib to support your particular package manager/distro, please
implement the given backend class.
@author: Higor Vieira Alves (halves@br.ibm.com)
@author: Lucas Meneghel Rodrigues (lmr@redhat.com)
@author: Ramon de Carvalho Valle (rcvalle@br.ibm.com)
@copyright: IBM 2008-2009
@copyright: Red Hat 2009-2010
"""
import os, re, logging, ConfigParser, optparse, random, string
try:
import yum
except:
pass
import common
from autotest_lib.client.bin import os_dep, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import logging_config, logging_manager
def generate_random_string(length):
"""
Return a random string using alphanumeric characters.
@length: Length of the string that will be generated.
"""
r = random.SystemRandom()
str = ""
chars = string.letters + string.digits
while length > 0:
str += r.choice(chars)
length -= 1
return str
class SoftwareManagerLoggingConfig(logging_config.LoggingConfig):
"""
Used with the sole purpose of providing convenient logging setup
for the KVM test auxiliary programs.
"""
def configure_logging(self, results_dir=None, verbose=False):
super(SoftwareManagerLoggingConfig, self).configure_logging(
use_console=True,
verbose=verbose)
class SystemInspector(object):
"""
System inspector class.
This may grow up to include more complete reports of operating system and
machine properties.
"""
def __init__(self):
"""
Probe system, and save information for future reference.
"""
self.distro = utils.get_os_vendor()
self.high_level_pms = ['apt-get', 'yum', 'zypper']
def get_package_management(self):
"""
Determine the supported package management systems present on the
system. If more than one package management system installed, try
to find the best supported system.
"""
list_supported = []
for high_level_pm in self.high_level_pms:
try:
os_dep.command(high_level_pm)
list_supported.append(high_level_pm)
except:
pass
pm_supported = None
if len(list_supported) == 0:
pm_supported = None
if len(list_supported) == 1:
pm_supported = list_supported[0]
elif len(list_supported) > 1:
if 'apt-get' in list_supported and self.distro in ['Debian', 'Ubuntu']:
pm_supported = 'apt-get'
elif 'yum' in list_supported and self.distro == 'Fedora':
pm_supported = 'yum'
else:
pm_supported = list_supported[0]
logging.debug('Package Manager backend: %s' % pm_supported)
return pm_supported
class SoftwareManager(object):
"""
Package management abstraction layer.
It supports a set of common package operations for testing purposes, and it
uses the concept of a backend, a helper class that implements the set of
operations of a given package management tool.
"""
def __init__(self):
"""
Class constructor.
Determines the best supported package management system for the given
operating system running and initializes the appropriate backend.
"""
inspector = SystemInspector()
backend_type = inspector.get_package_management()
if backend_type == 'yum':
self.backend = YumBackend()
elif backend_type == 'zypper':
self.backend = ZypperBackend()
elif backend_type == 'apt-get':
self.backend = AptBackend()
else:
raise NotImplementedError('Unimplemented package management '
'system: %s.' % backend_type)
def check_installed(self, name, version=None, arch=None):
"""
Check whether a package is installed on this system.
@param name: Package name.
@param version: Package version.
@param arch: Package architecture.
"""
return self.backend.check_installed(name, version, arch)
def list_all(self):
"""
List all installed packages.
"""
return self.backend.list_all()
def list_files(self, name):
"""
Get a list of all files installed by package [name].
@param name: Package name.
"""
return self.backend.list_files(name)
def install(self, name):
"""
Install package [name].
@param name: Package name.
"""
return self.backend.install(name)
def remove(self, name):
"""
Remove package [name].
@param name: Package name.
"""
return self.backend.remove(name)
def add_repo(self, url):
"""
Add package repo described by [url].
@param name: URL of the package repo.
"""
return self.backend.add_repo(url)
def remove_repo(self, url):
"""
Remove package repo described by [url].
@param url: URL of the package repo.
"""
return self.backend.remove_repo(url)
def upgrade(self):
"""
Upgrade all packages available.
"""
return self.backend.upgrade()
def provides(self, file):
"""
Returns a list of packages that provides a given capability to the
system (be it a binary, a library).
@param file: Path to the file.
"""
return self.backend.provides(file)
def install_what_provides(self, file):
"""
Installs package that provides [file].
@param file: Path to file.
"""
provides = self.provides(file)
if provides is not None:
self.install(provides)
else:
logging.warning('No package seems to provide %s', file)
class RpmBackend(object):
"""
This class implements operations executed with the rpm package manager.
rpm is a lower level package manager, used by higher level managers such
as yum and zypper.
"""
def __init__(self):
self.lowlevel_base_cmd = os_dep.command('rpm')
def _check_installed_version(self, name, version):
"""
Helper for the check_installed public method.
@param name: Package name.
@param version: Package version.
"""
cmd = (self.lowlevel_base_cmd + ' -q --qf %{VERSION} ' + name +
' 2> /dev/null')
inst_version = utils.system_output(cmd)
if inst_version >= version:
return True
else:
return False
def check_installed(self, name, version=None, arch=None):
"""
Check if package [name] is installed.
@param name: Package name.
@param version: Package version.
@param arch: Package architecture.
"""
if arch:
cmd = (self.lowlevel_base_cmd + ' -q --qf %{ARCH} ' + name +
' 2> /dev/null')
inst_archs = utils.system_output(cmd)
inst_archs = inst_archs.split('\n')
for inst_arch in inst_archs:
if inst_arch == arch:
return self._check_installed_version(name, version)
return False
elif version:
return self._check_installed_version(name, version)
else:
cmd = 'rpm -q ' + name + ' 2> /dev/null'
return (os.system(cmd) == 0)
def list_all(self):
"""
List all installed packages.
"""
installed_packages = utils.system_output('rpm -qa').splitlines()
return installed_packages
def list_files(self, name):
"""
List files installed on the system by package [name].
@param name: Package name.
"""
path = os.path.abspath(name)
if os.path.isfile(path):
option = '-qlp'
name = path
else:
option = '-ql'
l_cmd = 'rpm' + ' ' + option + ' ' + name + ' 2> /dev/null'
try:
result = utils.system_output(l_cmd)
list_files = result.split('\n')
return list_files
except error.CmdError:
return []
class DpkgBackend(object):
"""
This class implements operations executed with the dpkg package manager.
dpkg is a lower level package manager, used by higher level managers such
as apt and aptitude.
"""
def __init__(self):
self.lowlevel_base_cmd = os_dep.command('dpkg')
def check_installed(self, name):
if os.path.isfile(name):
n_cmd = (self.lowlevel_base_cmd + ' -f ' + name +
' Package 2>/dev/null')
name = utils.system_output(n_cmd)
i_cmd = self.lowlevel_base_cmd + ' -s ' + name + ' 2>/dev/null'
# Checking if package is installed
package_status = utils.system_output(i_cmd, ignore_status=True)
not_inst_pattern = re.compile('not-installed', re.IGNORECASE)
dpkg_not_installed = re.search(not_inst_pattern, package_status)
if dpkg_not_installed:
return False
return True
def list_all(self):
"""
List all packages available in the system.
"""
installed_packages = []
raw_list = utils.system_output('dpkg -l').splitlines()[5:]
for line in raw_list:
parts = line.split()
if parts[0] == "ii": # only grab "installed" packages
installed_packages.append("%s-%s" % (parts[1], parts[2]))
def list_files(self, package):
"""
List files installed by package [package].
@param package: Package name.
@return: List of paths installed by package.
"""
if os.path.isfile(package):
l_cmd = self.lowlevel_base_cmd + ' -c ' + package
else:
l_cmd = self.lowlevel_base_cmd + ' -l ' + package
return utils.system_output(l_cmd).split('\n')
class YumBackend(RpmBackend):
"""
Implements the yum backend for software manager.
Set of operations for the yum package manager, commonly found on Yellow Dog
Linux and Red Hat based distributions, such as Fedora and Red Hat
Enterprise Linux.
"""
def __init__(self):
"""
Initializes the base command and the yum package repository.
"""
super(YumBackend, self).__init__()
executable = os_dep.command('yum')
base_arguments = '-y'
self.base_command = executable + ' ' + base_arguments
self.repo_file_path = '/etc/yum.repos.d/autotest.repo'
self.cfgparser = ConfigParser.ConfigParser()
self.cfgparser.read(self.repo_file_path)
y_cmd = executable + ' --version | head -1'
self.yum_version = utils.system_output(y_cmd, ignore_status=True)
logging.debug('Yum backend initialized')
logging.debug('Yum version: %s' % self.yum_version)
self.yum_base = yum.YumBase()
def _cleanup(self):
"""
Clean up the yum cache so new package information can be downloaded.
"""
utils.system("yum clean all")
def install(self, name):
"""
Installs package [name]. Handles local installs.
"""
if os.path.isfile(name):
name = os.path.abspath(name)
command = 'localinstall'
else:
command = 'install'
i_cmd = self.base_command + ' ' + command + ' ' + name
try:
utils.system(i_cmd)
return True
except:
return False
def remove(self, name):
"""
Removes package [name].
@param name: Package name (eg. 'ipython').
"""
r_cmd = self.base_command + ' ' + 'erase' + ' ' + name
try:
utils.system(r_cmd)
return True
except:
return False
def add_repo(self, url):
"""
Adds package repository located on [url].
@param url: Universal Resource Locator of the repository.
"""
# Check if we URL is already set
for section in self.cfgparser.sections():
for option, value in self.cfgparser.items(section):
if option == 'url' and value == url:
return True
# Didn't find it, let's set it up
while True:
section_name = 'software_manager' + '_' + generate_random_string(4)
if not self.cfgparser.has_section(section_name):
break
self.cfgparser.add_section(section_name)
self.cfgparser.set(section_name, 'name',
'Repository added by the autotest software manager.')
self.cfgparser.set(section_name, 'url', url)
self.cfgparser.set(section_name, 'enabled', 1)
self.cfgparser.set(section_name, 'gpgcheck', 0)
self.cfgparser.write(self.repo_file_path)
def remove_repo(self, url):
"""
Removes package repository located on [url].
@param url: Universal Resource Locator of the repository.
"""
for section in self.cfgparser.sections():
for option, value in self.cfgparser.items(section):
if option == 'url' and value == url:
self.cfgparser.remove_section(section)
self.cfgparser.write(self.repo_file_path)
def upgrade(self):
"""
Upgrade all available packages.
"""
r_cmd = self.base_command + ' ' + 'update'
try:
utils.system(r_cmd)
return True
except:
return False
def provides(self, name):
"""
Returns a list of packages that provides a given capability.
@param name: Capability name (eg, 'foo').
"""
d_provides = self.yum_base.searchPackageProvides(args=[name])
provides_list = [key for key in d_provides]
if provides_list:
logging.info("Package %s provides %s", provides_list[0], name)
return str(provides_list[0])
else:
return None
class ZypperBackend(RpmBackend):
"""
Implements the zypper backend for software manager.
Set of operations for the zypper package manager, found on SUSE Linux.
"""
def __init__(self):
"""
Initializes the base command and the yum package repository.
"""
super(ZypperBackend, self).__init__()
self.base_command = os_dep.command('zypper') + ' -n'
z_cmd = self.base_command + ' --version'
self.zypper_version = utils.system_output(z_cmd, ignore_status=True)
logging.debug('Zypper backend initialized')
logging.debug('Zypper version: %s' % self.zypper_version)
def install(self, name):
"""
Installs package [name]. Handles local installs.
@param name: Package Name.
"""
path = os.path.abspath(name)
i_cmd = self.base_command + ' install -l ' + name
try:
utils.system(i_cmd)
return True
except:
return False
def add_repo(self, url):
"""
Adds repository [url].
@param url: URL for the package repository.
"""
ar_cmd = self.base_command + ' addrepo ' + url
try:
utils.system(ar_cmd)
return True
except:
return False
def remove_repo(self, url):
"""
Removes repository [url].
@param url: URL for the package repository.
"""
rr_cmd = self.base_command + ' removerepo ' + url
try:
utils.system(rr_cmd)
return True
except:
return False
def remove(self, name):
"""
Removes package [name].
"""
r_cmd = self.base_command + ' ' + 'erase' + ' ' + name
try:
utils.system(r_cmd)
return True
except:
return False
def upgrade(self):
"""
Upgrades all packages of the system.
"""
u_cmd = self.base_command + ' update -l'
try:
utils.system(u_cmd)
return True
except:
return False
def provides(self, name):
"""
Searches for what provides a given file.
@param name: File path.
"""
p_cmd = self.base_command + ' what-provides ' + name
list_provides = []
try:
p_output = utils.system_output(p_cmd).split('\n')[4:]
for line in p_output:
line = [a.strip() for a in line.split('|')]
try:
state, pname, type, version, arch, repository = line
if pname not in list_provides:
list_provides.append(pname)
except IndexError:
pass
if len(list_provides) > 1:
logging.warning('More than one package found, '
'opting by the first queue result')
if list_provides:
logging.info("Package %s provides %s", list_provides[0], name)
return list_provides[0]
return None
except:
return None
class AptBackend(DpkgBackend):
"""
Implements the apt backend for software manager.
Set of operations for the apt package manager, commonly found on Debian and
Debian based distributions, such as Ubuntu Linux.
"""
def __init__(self):
"""
Initializes the base command and the debian package repository.
"""
super(AptBackend, self).__init__()
executable = os_dep.command('apt-get')
self.base_command = executable + ' -y'
self.repo_file_path = '/etc/apt/sources.list.d/autotest'
self.apt_version = utils.system_output('apt-get -v | head -1',
ignore_status=True)
logging.debug('Apt backend initialized')
logging.debug('apt version: %s' % self.apt_version)
def install(self, name):
"""
Installs package [name].
@param name: Package name.
"""
command = 'install'
i_cmd = self.base_command + ' ' + command + ' ' + name
try:
utils.system(i_cmd)
return True
except:
return False
def remove(self, name):
"""
Remove package [name].
@param name: Package name.
"""
command = 'remove'
flag = '--purge'
r_cmd = self.base_command + ' ' + command + ' ' + flag + ' ' + name
try:
utils.system(r_cmd)
return True
except:
return False
def add_repo(self, repo):
"""
Add an apt repository.
@param repo: Repository string. Example:
'deb http://archive.ubuntu.com/ubuntu/ maverick universe'
"""
repo_file = open(self.repo_file_path, 'a')
repo_file_contents = repo_file.read()
if repo not in repo_file_contents:
repo_file.write(repo)
def remove_repo(self, repo):
"""
Remove an apt repository.
@param repo: Repository string. Example:
'deb http://archive.ubuntu.com/ubuntu/ maverick universe'
"""
repo_file = open(self.repo_file_path, 'r')
new_file_contents = []
for line in repo_file.readlines:
if not line == repo:
new_file_contents.append(line)
repo_file.close()
new_file_contents = "\n".join(new_file_contents)
repo_file.open(self.repo_file_path, 'w')
repo_file.write(new_file_contents)
repo_file.close()
def upgrade(self):
"""
Upgrade all packages of the system with eventual new versions.
"""
ud_command = 'update'
ud_cmd = self.base_command + ' ' + ud_command
try:
utils.system(ud_cmd)
except:
logging.error("Apt package update failed")
up_command = 'upgrade'
up_cmd = self.base_command + ' ' + up_command
try:
utils.system(up_cmd)
return True
except:
return False
def provides(self, file):
"""
Return a list of packages that provide [file].
@param file: File path.
"""
if not self.check_installed('apt-file'):
self.install('apt-file')
command = os_dep.command('apt-file')
cache_update_cmd = command + ' update'
try:
utils.system(cache_update_cmd, ignore_status=True)
except:
logging.error("Apt file cache update failed")
fu_cmd = command + ' search ' + file
try:
provides = utils.system_output(fu_cmd).split('\n')
list_provides = []
for line in provides:
if line:
try:
line = line.split(':')
package = line[0].strip()
path = line[1].strip()
if path == file and package not in list_provides:
list_provides.append(package)
except IndexError:
pass
if len(list_provides) > 1:
logging.warning('More than one package found, '
'opting by the first queue result')
if list_provides:
logging.info("Package %s provides %s", list_provides[0], file)
return list_provides[0]
return None
except:
return None
if __name__ == '__main__':
parser = optparse.OptionParser(
"usage: %prog [install|remove|list-all|list-files|add-repo|remove-repo|"
"upgrade|what-provides|install-what-provides] arguments")
parser.add_option('--verbose', dest="debug", action='store_true',
help='include debug messages in console output')
options, args = parser.parse_args()
debug = options.debug
logging_manager.configure_logging(SoftwareManagerLoggingConfig(),
verbose=debug)
software_manager = SoftwareManager()
if args:
action = args[0]
args = " ".join(args[1:])
else:
action = 'show-help'
if action == 'install':
software_manager.install(args)
elif action == 'remove':
software_manager.remove(args)
if action == 'list-all':
software_manager.list_all()
elif action == 'list-files':
software_manager.list_files(args)
elif action == 'add-repo':
software_manager.add_repo(args)
elif action == 'remove-repo':
software_manager.remove_repo(args)
elif action == 'upgrade':
software_manager.upgrade()
elif action == 'what-provides':
software_manager.provides(args)
elif action == 'install-what-provides':
software_manager.install_what_provides(args)
elif action == 'show-help':
parser.print_help()