# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging, os, platform, re, signal, tempfile, time, uuid
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
class TimeoutError(error.TestError):
"""Error raised when we time out when waiting on a condition."""
class Crossystem(object):
"""A wrapper for the crossystem utility."""
def __init__(self, client):
self.cros_system_data = {}
self._client = client
def init(self):
self.cros_system_data = {}
(_, fname) = tempfile.mkstemp()
f = open(fname, 'w')'crossystem', stdout_tee=f)
text = utils.read_file(fname)
for line in text.splitlines():
assignment_string = line.split('#')[0]
if not assignment_string.count('='):
(name, value) = assignment_string.split('=', 1)
self.cros_system_data[name.strip()] = value.strip()
def __getattr__(self, name):
Retrieve a crosssystem attribute.
The call will return the crossystem reported
return lambda : self.cros_system_data[name]
def get_oldest_pid_by_name(name):
Return the oldest pid of a process whose name perfectly matches |name|.
name is an egrep expression, which will be matched against the entire name
of processes on the system. For example:
on a system running
8600 ? 00:00:04 chrome
8601 ? 00:00:00 chrome
8602 ? 00:00:00 chrome-sandbox
would return 8600, as that's the oldest process that matches.
chrome-sandbox would not be matched.
name: egrep expression to match. Will be anchored at the beginning and
end of the match string.
pid as an integer, or None if one cannot be found.
ValueError if pgrep returns something odd.
str_pid = utils.system_output(
'pgrep -o ^%s$' % name, ignore_status=True).rstrip()
if str_pid:
return int(str_pid)
def get_process_list(name, command_line=None):
Return the list of pid for matching process |name command_line|.
on a system running
31475 ? 0:06 /opt/google/chrome/chrome --allow-webui-compositing -
31478 ? 0:00 /opt/google/chrome/chrome-sandbox /opt/google/chrome/
31485 ? 0:00 /opt/google/chrome/chrome --type=zygote --log-level=1
31532 ? 1:05 /opt/google/chrome/chrome --type=renderer
would return ['31475', '31485', '31532']
get_process_list('chrome', '--type=renderer')
would return ['31532']
name: process name to search for. If command_line is provided, name is
matched against full command line. If command_line is not provided,
name is only matched against the process name.
command line: when command line is passed, the full process command line
is used for matching.
list of PIDs of the matching processes.
# TODO(rohitbm)
flag = '-x' if not command_line else '-f'
name = '\'%s.*%s\'' % (name, command_line) if command_line else name
str_pid = utils.system_output(
'pgrep %s %s' % (flag, name), ignore_status=True).rstrip()
return str_pid
def nuke_process_by_name(name, with_prejudice=False):
pid = get_oldest_pid_by_name(name)
except Exception as e:
if pid is None:
raise error.AutoservPidAlreadyDeadError(
'No process matching %s.' % name)
if with_prejudice:
utils.nuke_pid(pid, [signal.SIGKILL])
def poll_for_condition(
condition, exception=None, timeout=10, sleep_interval=0.1, desc=None):
"""Poll until a condition becomes true.
condition: function taking no args and returning bool
exception: exception to throw if condition doesn't become true
timeout: maximum number of seconds to wait
sleep_interval: time to sleep between polls
desc: description of default TimeoutError used if 'exception' is None
The true value that caused the poll loop to terminate.
'exception' arg if supplied; site_utils.TimeoutError otherwise
start_time = time.time()
while True:
value = condition()
if value:
return value
if time.time() + sleep_interval - start_time > timeout:
if exception:
raise exception
if desc:
desc = 'Timed out waiting for condition: %s' % desc
desc = 'Timed out waiting for unnamed condition'
raise TimeoutError, desc
def save_vm_state(checkpoint):
"""Saves the current state of the virtual machine.
This function is a NOOP if the test is not running under a virtual machine
with the USB serial port redirected.
checkpoint - Name used to identify this state
# The QEMU monitor has been redirected to the guest serial port located at
# /dev/ttyUSB0. To save the state of the VM, we just send the 'savevm'
# command to the serial port.
proc = platform.processor()
if 'QEMU' in proc and os.path.exists('/dev/ttyUSB0'):'Saving VM state "%s"' % checkpoint)
serial = open('/dev/ttyUSB0', 'w')
serial.write("savevm %s\r\n" % checkpoint)'Done saving VM state "%s"' % checkpoint)
def check_raw_dmesg(dmesg, message_level, whitelist):
"""Checks dmesg for unexpected warnings.
This function parses dmesg for message with message_level <= message_level
which do not appear in the whitelist.
dmesg - string containing raw dmesg buffer
message_level - minimum message priority to check
whitelist - messages to ignore
List of unexpected warnings
whitelist_re = re.compile(r'(%s)' % '|'.join(whitelist))
unexpected = []
for line in dmesg.splitlines():
if int(line[1]) <= message_level:
stripped_line = line.split('] ', 1)[1]
return unexpected
def verify_mesg_set(mesg, regex, whitelist):
"""Verifies that the exact set of messages are present in a text.
This function finds all strings in the text matching a certain regex, and
then verifies that all expected strings are present in the set, and no
unexpected strings are there.
mesg - the mutiline text to be scanned
regex - regular expression to match
whitelist - messages to find in the output, a list of strings
(potentially regexes) to look for in the filtered output. All these
strings must be there, and no other strings should be present in the
filtered output.
string of inconsistent findings (i.e. an empty string on success).
rv = []
missing_strings = []
present_strings = []
for line in mesg.splitlines():
if not'%s' % regex, line):
present_strings.append(line.split('] ', 1)[1])
for string in whitelist:
for present_string in list(present_strings):
if'^%s$' % string, present_string):
if present_strings:
rv.append('unexpected strings:')
if missing_strings:
rv.append('missing strings:')
return '\n'.join(rv)
def target_is_pie():
"""Returns whether the toolchain produces a PIE (position independent
executable) by default.
True if the target toolchain produces a PIE by default.
False otherwise.
command = 'echo | ${CC} -E -dD -P - | grep -i pie'
result = utils.system_output(command, retain_output=True,
if'#define __PIE__', result):
return True
return False
def target_is_x86():
"""Returns whether the toolchain produces an x86 object
True if the target toolchain produces an x86 object
False otherwise.
command = 'echo | ${CC} -E -dD -P - | grep -i 86'
result = utils.system_output(command, retain_output=True,
if'__i386__', result) or'__x86_64__', result):
return True
return False
def mounts():
ret = []
for line in file('/proc/mounts'):
m = re.match(r'(?P<src>\S+) (?P<dest>\S+) (?P<type>\S+) (?P<opts>\S+).*', line)
if m:
return ret
def is_mountpoint(path):
return path in [ m['dest'] for m in mounts() ]
def require_mountpoint(path):
Raises an exception if path is not a mountpoint.
if not is_mountpoint(path):
raise error.TestFail('Path not mounted: "%s"' % path)
def random_username():
return str(uuid.uuid4()) + ''
def parse_cmd_output(command,
"""Runs a command on a host object to retrieve host attributes.
The command should output to stdout in the format of:
<key> = <value> # <optional_comment>
@param command: Command to execute on the host.
@param run_method: Function to use to execute the command. Defaults to so that the command will be executed locally.
Can be replace with a call so that it will
execute on a DUT or external machine. Method must accept
a command argument, stdout_tee and stderr_tee args and
return a result object with a string attribute stdout
which will be parsed.
@returns a dictionary mapping host attributes to their values.
result = {}
# Suppresses stdout so that the files are not printed to the logs.
cmd_result = run_method(command, stdout_tee=None, stderr_tee=None)
for line in cmd_result.stdout.splitlines():
# Lines are of the format "<key> = <value> # <comment>"
key_value = re.match('^\s*(?P<key>[^ ]+)\s*=\s*(?P<value>[^ ]+)'
'(?:\s*#.*)?$', line)
if key_value:
result['key')] ='value')
return result
def set_from_keyval_output(out, delimiter=' '):
"""Parse delimiter-separated key-val output into a set of tuples.
Output is expected to be multiline text output from a command.
Stuffs the key-vals into tuples in a set to be later compared.
e.g. deactivated 0
disableForceClear 0
==> set(('deactivated', '0'), ('disableForceClear', '0'))
@param out: multiple lines of space-separated key-val pairs.
@param delimiter: character that separates key from val. Usually a
space but may be '=' or something else.
@return set of key-val tuples.
results = set()
kv_match_re = re.compile('([^ ]+)%s(.*)' % delimiter)
for linecr in out.splitlines():
match = kv_match_re.match(linecr.strip())
if match:
return results