blob: 7c3343b14e383adc825419865d62337bbe59f014 [file] [log] [blame]
High-level KVM test utility functions.
This module is meant to reduce code size by performing common test procedures.
Generally, code here should look like test code.
More specifically:
- Functions in this module should raise exceptions if things go wrong
(unlike functions in and which report failure via
their returned values).
- Functions in this module may use, in addition to
logging.debug() and logging.error(), to log messages the user may be
interested in (unlike and which use
logging.debug() for anything that isn't an error).
- Functions in this module typically use functions and classes from
lower-level modules (e.g.,,
- Functions in this module should not be used by lower-level modules.
- Functions in this module should be used in the right context.
For example, a function should not be used where it may display
misleading or inaccurate info or debug messages.
@copyright: 2008-2009 Red Hat Inc.
import time, os, logging, re, signal
from autotest_lib.client.common_lib import error
from autotest_lib.client.bin import utils
from import scan_results
import aexpect, virt_utils, virt_vm
def get_living_vm(env, vm_name):
Get a VM object from the environment and make sure it's alive.
@param env: Dictionary with test environment.
@param vm_name: Name of the desired VM object.
@return: A VM object.
vm = env.get_vm(vm_name)
if not vm:
raise error.TestError("VM '%s' not found in environment" % vm_name)
if not vm.is_alive():
raise error.TestError("VM '%s' seems to be dead; test requires a "
"living VM" % vm_name)
return vm
def wait_for_login(vm, nic_index=0, timeout=240, start=0, step=2, serial=None):
Try logging into a VM repeatedly. Stop on success or when timeout expires.
@param vm: VM object.
@param nic_index: Index of NIC to access in the VM.
@param timeout: Time to wait before giving up.
@param serial: Whether to use a serial connection instead of a remote
(ssh, rss) one.
@return: A shell session object.
end_time = time.time() + timeout
session = None
if serial:
type = 'serial'"Trying to log into guest %s using serial connection,"
" timeout %ds",, timeout)
while time.time() < end_time:
session = vm.serial_login()
except virt_utils.LoginError, e:
type = 'remote'"Trying to log into guest %s using remote connection,"
" timeout %ds",, timeout)
while time.time() < end_time:
session = vm.login(nic_index=nic_index)
except (virt_utils.LoginError, virt_vm.VMError), e:
if not session:
raise error.TestFail("Could not log into guest %s using %s connection" %
(, type))"Logged into guest %s using %s connection",, type)
return session
def reboot(vm, session, method="shell", sleep_before_reset=10, nic_index=0,
Reboot the VM and wait for it to come back up by trying to log in until
timeout expires.
@param vm: VM object.
@param session: A shell session object.
@param method: Reboot method. Can be "shell" (send a shell reboot
command) or "system_reset" (send a system_reset monitor command).
@param nic_index: Index of NIC to access in the VM, when logging in after
@param timeout: Time to wait before giving up (after rebooting).
@return: A new shell session object.
if method == "shell":
# Send a reboot command to the guest's shell
session.sendline(vm.get_params().get("reboot_command"))"Reboot command sent. Waiting for guest to go down")
elif method == "system_reset":
# Sleep for a while before sending the command
# Clear the event list of all QMP monitors
monitors = [m for m in vm.monitors if m.protocol == "qmp"]
for m in monitors:
# Send a system_reset monitor command
vm.monitor.cmd("system_reset")"Monitor command system_reset sent. Waiting for guest to "
"go down")
# Look for RESET QMP events
for m in monitors:
if not m.get_event("RESET"):
raise error.TestFail("RESET QMP event not received after "
"system_reset (monitor '%s')" %
else:"RESET QMP event received")
logging.error("Unknown reboot method: %s", method)
# Wait for the session to become unresponsive and close it
if not virt_utils.wait_for(lambda: not session.is_responsive(timeout=30),
120, 0, 1):
raise error.TestFail("Guest refuses to go down")
# Try logging into the guest until timeout expires"Guest is down. Waiting for it to go up again, timeout %ds",
session = vm.wait_for_login(nic_index, timeout=timeout)"Guest is up again")
return session
def migrate(vm, env=None, mig_timeout=3600, mig_protocol="tcp",
mig_cancel=False, offline=False, stable_check=False,
clean=False, save_path=None, dest_host='localhost', mig_port=None):
Migrate a VM locally and re-register it in the environment.
@param vm: The VM to migrate.
@param env: The environment dictionary. If omitted, the migrated VM will
not be registered.
@param mig_timeout: timeout value for migration.
@param mig_protocol: migration protocol
@param mig_cancel: Test migrate_cancel or not when protocol is tcp.
@param dest_host: Destination host (defaults to 'localhost').
@param mig_port: Port that will be used for migration.
@return: The post-migration VM, in case of same host migration, True in
case of multi-host migration.
def mig_finished():
o ="migrate")
if isinstance(o, str):
return "status: active" not in o
return o.get("status") != "active"
def mig_succeeded():
o ="migrate")
if isinstance(o, str):
return "status: completed" in o
return o.get("status") == "completed"
def mig_failed():
o ="migrate")
if isinstance(o, str):
return "status: failed" in o
return o.get("status") == "failed"
def mig_cancelled():
o ="migrate")
if isinstance(o, str):
return ("Migration status: cancelled" in o or
"Migration status: canceled" in o)
return (o.get("status") == "cancelled" or
o.get("status") == "canceled")
def wait_for_migration():
if not virt_utils.wait_for(mig_finished, mig_timeout, 2, 2,
"Waiting for migration to finish"):
raise error.TestFail("Timeout expired while waiting for migration "
"to finish")
if dest_host == 'localhost':
dest_vm = vm.clone()
if (dest_host == 'localhost') and stable_check:
# Pause the dest vm after creation
dest_vm.params['extra_params'] = (dest_vm.params.get('extra_params','')
+ ' -S')
if dest_host == 'localhost':
dest_vm.create(migration_mode=mig_protocol, mac_source=vm)
if mig_protocol == "tcp":
if dest_host == 'localhost':
uri = "tcp:localhost:%d" % dest_vm.migration_port
uri = 'tcp:%s:%d' % (dest_host, mig_port)
elif mig_protocol == "unix":
uri = "unix:%s" % dest_vm.migration_file
elif mig_protocol == "exec":
uri = '"exec:nc localhost %s"' % dest_vm.migration_port
if offline:
if mig_cancel:
if not virt_utils.wait_for(mig_cancelled, 60, 2, 2,
"Waiting for migration "
raise error.TestFail("Failed to cancel migration")
if offline:
if dest_host == 'localhost':
return vm
if (dest_host == 'localhost') and stable_check:
save_path = None or "/tmp"
save1 = os.path.join(save_path, "src")
save2 = os.path.join(save_path, "dst")
# Fail if we see deltas
md5_save1 = utils.hash_file(save1)
md5_save2 = utils.hash_file(save2)
if md5_save1 != md5_save2:
raise error.TestFail("Mismatch of VM state before "
"and after migration")
if (dest_host == 'localhost') and offline:
if dest_host == 'localhost':
if (dest_host == 'localhost') and stable_check and clean:
logging.debug("Cleaning the state files")
if os.path.isfile(save1):
if os.path.isfile(save2):
# Report migration status
if mig_succeeded():"Migration finished successfully")
elif mig_failed():
raise error.TestFail("Migration failed")
raise error.TestFail("Migration ended with unknown status")
if dest_host == 'localhost':
if "paused" in"status"):
logging.debug("Destination VM is paused, resuming it")
# Kill the source VM
# Replace the source VM with the new cloned VM
if (dest_host == 'localhost') and (env is not None):
env.register_vm(, dest_vm)
# Return the new cloned VM
if dest_host == 'localhost':
return dest_vm
return vm
def stop_windows_service(session, service, timeout=120):
Stop a Windows service using sc.
If the service is already stopped or is not installed, do nothing.
@param service: The name of the service
@param timeout: Time duration to wait for service to stop
@raise error.TestError: Raised if the service can't be stopped
end_time = time.time() + timeout
while time.time() < end_time:
o = session.cmd_output("sc stop %s" % service, timeout=60)
# FAILED 1060 means the service isn't installed.
# FAILED 1062 means the service hasn't been started.
if"\bFAILED (1060|1062)\b", o, re.I):
raise error.TestError("Could not stop service '%s'" % service)
def start_windows_service(session, service, timeout=120):
Start a Windows service using sc.
If the service is already running, do nothing.
If the service isn't installed, fail.
@param service: The name of the service
@param timeout: Time duration to wait for service to start
@raise error.TestError: Raised if the service can't be started
end_time = time.time() + timeout
while time.time() < end_time:
o = session.cmd_output("sc start %s" % service, timeout=60)
# FAILED 1060 means the service isn't installed.
if"\bFAILED 1060\b", o, re.I):
raise error.TestError("Could not start service '%s' "
"(service not installed)" % service)
# FAILED 1056 means the service is already running.
if"\bFAILED 1056\b", o, re.I):
raise error.TestError("Could not start service '%s'" % service)
def get_time(session, time_command, time_filter_re, time_format):
Return the host time and guest time. If the guest time cannot be fetched
a TestError exception is raised.
Note that the shell session should be ready to receive commands
(i.e. should "display" a command prompt and should be done with all
previous commands).
@param session: A shell session.
@param time_command: Command to issue to get the current guest time.
@param time_filter_re: Regex filter to apply on the output of
time_command in order to get the current time.
@param time_format: Format string to pass to time.strptime() with the
result of the regex filter.
@return: A tuple containing the host time and guest time.
if len(re.findall("ntpdate|w32tm", time_command)) == 0:
host_time = time.time()
s = session.cmd_output(time_command)
s = re.findall(time_filter_re, s)[0]
except IndexError:
logging.debug("The time string from guest is:\n%s", s)
raise error.TestError("The time string from guest is unexpected.")
except Exception, e:
logging.debug("(time_filter_re, time_string): (%s, %s)",
time_filter_re, s)
raise e
guest_time = time.mktime(time.strptime(s, time_format))
o = session.cmd(time_command)
if re.match('ntpdate', time_command):
offset = re.findall('offset (.*) sec', o)[0]
host_main, host_mantissa = re.findall(time_filter_re, o)[0]
host_time = (time.mktime(time.strptime(host_main, time_format)) +
float("0.%s" % host_mantissa))
guest_time = host_time - float(offset)
guest_time = re.findall(time_filter_re, o)[0]
offset = re.findall("o:(.*)s", o)[0]
if re.match('PM', guest_time):
hour = re.findall('\d+ (\d+):', guest_time)[0]
hour = str(int(hour) + 12)
guest_time = re.sub('\d+\s\d+:', "\d+\s%s:" % hour,
guest_time = guest_time[:-3]
guest_time = time.mktime(time.strptime(guest_time, time_format))
host_time = guest_time + float(offset)
return (host_time, guest_time)
def get_memory_info(lvms):
Get memory information from host and guests in format:
Host: memfree = XXXM; Guests memsh = {XXX,XXX,...}
@params lvms: List of VM objects
@return: String with memory info report
if not isinstance(lvms, list):
raise error.TestError("Invalid list passed to get_stat: %s " % lvms)
meminfo = "Host: memfree = "
meminfo += str(int(utils.freememtotal()) / 1024) + "M; "
meminfo += "swapfree = "
mf = int(utils.read_from_meminfo("SwapFree")) / 1024
meminfo += str(mf) + "M; "
except Exception, e:
raise error.TestFail("Could not fetch host free memory info, "
"reason: %s" % e)
meminfo += "Guests memsh = {"
for vm in lvms:
shm = vm.get_shared_meminfo()
if shm is None:
raise error.TestError("Could not get shared meminfo from "
"VM %s" % vm)
meminfo += "%dM; " % shm
meminfo = meminfo[0:-2] + "}"
return meminfo
def run_autotest(vm, session, control_path, timeout, outputdir, params):
Run an autotest control file inside a guest (linux only utility).
@param vm: VM object.
@param session: A shell session on the VM provided.
@param control_path: A path to an autotest control file.
@param timeout: Timeout under which the autotest control file must complete.
@param outputdir: Path on host where we should copy the guest autotest
results to.
The following params is used by the migration
@param params: Test params used in the migration test
def copy_if_hash_differs(vm, local_path, remote_path):
Copy a file to a guest if it doesn't exist or if its MD5sum differs.
@param vm: VM object.
@param local_path: Local path.
@param remote_path: Remote path.
@return: Whether the hash differs (True) or not (False).
hash_differs = False
local_hash = utils.hash_file(local_path)
basename = os.path.basename(local_path)
output = session.cmd_output("md5sum %s" % remote_path)
if "such file" in output:
remote_hash = "0"
elif output:
remote_hash = output.split()[0]
logging.warning("MD5 check for remote path %s did not return.",
# Let's be a little more lenient here and see if it wasn't a
# temporary problem
remote_hash = "0"
if remote_hash != local_hash:
hash_differs = True
logging.debug("Copying %s to guest "
"(remote hash: %s, local hash:%s)",
basename, remote_hash, local_hash)
vm.copy_files_to(local_path, remote_path)
return hash_differs
def extract(vm, remote_path, dest_dir):
Extract the autotest .tar.bz2 file on the guest, ensuring the final
destination path will be dest_dir.
@param vm: VM object
@param remote_path: Remote file path
@param dest_dir: Destination dir for the contents
basename = os.path.basename(remote_path)
logging.debug("Extracting %s on VM %s", basename,
session.cmd("rm -rf %s" % dest_dir)
dirname = os.path.dirname(remote_path)
session.cmd("cd %s" % dirname)
session.cmd("mkdir -p %s" % os.path.dirname(dest_dir))
e_cmd = "tar xjvf %s -C %s" % (basename, os.path.dirname(dest_dir))
output = session.cmd(e_cmd, timeout=120)
autotest_dirname = ""
for line in output.splitlines():
autotest_dirname = line.split("/")[0]
if autotest_dirname != os.path.basename(dest_dir):
session.cmd("cd %s" % os.path.dirname(dest_dir))
session.cmd("mv %s %s" %
(autotest_dirname, os.path.basename(dest_dir)))
def get_results(guest_autotest_path):
Copy autotest results present on the guest back to the host.
logging.debug("Trying to copy autotest results from guest")
guest_results_dir = os.path.join(outputdir, "guest_autotest_results")
if not os.path.exists(guest_results_dir):
vm.copy_files_from("%s/results/default/*" % guest_autotest_path,
def get_results_summary(guest_autotest_path):
Get the status of the tests that were executed on the host and close
the session where autotest was being executed.
session.cmd("cd %s" % guest_autotest_path)
output = session.cmd_output("cat results/*/status")
results = scan_results.parse_results(output)
# Report test results"Results (test, status, duration, info):")
for result in results:
return results
except Exception, e:
logging.error("Error processing guest autotest results: %s", e)
return None
if not os.path.isfile(control_path):
raise error.TestError("Invalid path to autotest control file: %s" %
migrate_background = params.get("migrate_background") == "yes"
if migrate_background:
mig_timeout = float(params.get("mig_timeout", "3600"))
mig_protocol = params.get("migration_protocol", "tcp")
compressed_autotest_path = "/tmp/autotest.tar.bz2"
destination_autotest_path = "/usr/local/autotest"
# To avoid problems, let's make the test use the current AUTODIR
# (autotest client path) location
autotest_path = os.environ['AUTODIR']
autotest_basename = os.path.basename(autotest_path)
autotest_parentdir = os.path.dirname(autotest_path)
# tar the contents of bindir/autotest
cmd = ("cd %s; tar cvjf %s %s/*" %
(autotest_parentdir, compressed_autotest_path, autotest_basename))
# Until we have nested virtualization, we don't need the kvm test :)
cmd += " --exclude=%s/tests/kvm" % autotest_basename
cmd += " --exclude=%s/results" % autotest_basename
cmd += " --exclude=%s/tmp" % autotest_basename
cmd += " --exclude=%s/control*" % autotest_basename
cmd += " --exclude=*.pyc"
cmd += " --exclude=*.svn"
cmd += " --exclude=*.git"
# Copy autotest.tar.bz2
update = copy_if_hash_differs(vm, compressed_autotest_path,
# Extract autotest.tar.bz2
if update:
extract(vm, compressed_autotest_path, destination_autotest_path)
os.path.join(destination_autotest_path, 'control'))
# Run the test"Running autotest control file %s on guest, timeout %ss",
os.path.basename(control_path), timeout)
session.cmd("cd %s" % destination_autotest_path)
session.cmd("rm -f control.state")
session.cmd("rm -rf results/*")
except aexpect.ShellError:
bg = None
try:"---------------- Test output ----------------")
if migrate_background:
mig_timeout = float(params.get("mig_timeout", "3600"))
mig_protocol = params.get("migration_protocol", "tcp")
bg = virt_utils.Thread(session.cmd_output,
kwargs={'cmd': "bin/autotest control",
'timeout': timeout,
while bg.is_alive():"Autotest job did not end, start a round of "
vm.migrate(timeout=mig_timeout, protocol=mig_protocol)
session.cmd_output("bin/autotest control", timeout=timeout,
finally:"------------- End of test output ------------")
if migrate_background and bg:
except aexpect.ShellTimeoutError:
if vm.is_alive():
raise error.TestError("Timeout elapsed while waiting for job to "
raise error.TestError("Autotest job on guest failed "
"(VM terminated during job)")
except aexpect.ShellProcessTerminatedError:
raise error.TestError("Autotest job on guest failed "
"(Remote session terminated during job)")
results = get_results_summary(destination_autotest_path)
# Make a list of FAIL/ERROR/ABORT results (make sure FAIL results appear
# before ERROR results, and ERROR results appear before ABORT results)
bad_results = [r[0] for r in results if r[1] == "FAIL"]
bad_results += [r[0] for r in results if r[1] == "ERROR"]
bad_results += [r[0] for r in results if r[1] == "ABORT"]
# Fail the test if necessary
if not results:
raise error.TestFail("Autotest control file run did not produce any "
"recognizable results")
if bad_results:
if len(bad_results) == 1:
e_msg = ("Test %s failed during control file execution" %
e_msg = ("Tests %s failed during control file execution" %
" ".join(bad_results))
raise error.TestFail(e_msg)
def get_loss_ratio(output):
Get the packet loss ratio from the output of ping
@param output: Ping output.
return int(re.findall('(\d+)% packet loss', output)[0])
except IndexError:
return -1
def raw_ping(command, timeout, session, output_func):
Low-level ping command execution.
@param command: Ping command.
@param timeout: Timeout of the ping command.
@param session: Local executon hint or session to execute the ping command.
if session is None:
process = aexpect.run_bg(command, output_func=output_func,
# Send SIGINT signal to notify the timeout of running ping process,
# Because ping have the ability to catch the SIGINT signal so we can
# always get the packet loss ratio even if timeout.
if process.is_alive():
virt_utils.kill_process_tree(process.get_pid(), signal.SIGINT)
status = process.get_status()
output = process.get_output()
return status, output
output = ""
output = session.cmd_output(command, timeout=timeout,
except aexpect.ShellTimeoutError:
# Send ctrl+c (SIGINT) through ssh session
output2 = session.read_up_to_prompt(print_func=output_func)
output += output2
except aexpect.ExpectTimeoutError, e:
output += e.output
# We also need to use this session to query the return value
o2 = session.read_up_to_prompt()
except aexpect.ExpectError:
status = -1
status = int(re.findall("\d+", o2)[0])
status = -1
return status, output
def ping(dest=None, count=None, interval=None, interface=None,
packetsize=None, ttl=None, hint=None, adaptive=False,
broadcast=False, flood=False, timeout=0,
output_func=logging.debug, session=None):
Wrapper of ping.
@param dest: Destination address.
@param count: Count of icmp packet.
@param interval: Interval of two icmp echo request.
@param interface: Specified interface of the source address.
@param packetsize: Packet size of icmp.
@param ttl: IP time to live.
@param hint: Path mtu discovery hint.
@param adaptive: Adaptive ping flag.
@param broadcast: Broadcast ping flag.
@param flood: Flood ping flag.
@param timeout: Timeout for the ping command.
@param output_func: Function used to log the result of ping.
@param session: Local executon hint or session to execute the ping command.
if dest is not None:
command = "ping %s " % dest
command = "ping localhost "
if count is not None:
command += " -c %s" % count
if interval is not None:
command += " -i %s" % interval
if interface is not None:
command += " -I %s" % interface
if packetsize is not None:
command += " -s %s" % packetsize
if ttl is not None:
command += " -t %s" % ttl
if hint is not None:
command += " -M %s" % hint
if adaptive:
command += " -A"
if broadcast:
command += " -b"
if flood:
command += " -f -q"
output_func = None
return raw_ping(command, timeout, session, output_func)
def get_linux_ifname(session, mac_address):
Get the interface name through the mac address.
@param session: session to the virtual machine
@mac_address: the macaddress of nic
output = session.cmd_output("ifconfig -a")
ethname = re.findall("(\w+)\s+Link.*%s" % mac_address, output,
return ethname
return None