blob: 761162f953ccc663d0cfd3a86d18dbea0e125d28 [file] [log] [blame]
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import logging
import os
import re
import xml.etree.cElementTree as et
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import service_stopper
class graphics_dEQP(test.test):
    """
    Run the drawElements Quality Program test suite.

    Every test case whose name starts with the "filter" option is run in its
    own process of the matching dEQP module executable. The per-case result
    is read back from the log file written by that process, and the number
    of occurrences of each result is written out as perf keyvals.
    """
    version = 1
    _services = None
    # Class-level default only; it is never mutated. Each instance gets its
    # own list assigned in initialize() / _get_blacklist() / run_once() so
    # compiled blacklist patterns are not shared between instances.
    _blacklist = []
    DEQP_BASEDIR = "/usr/local/deqp"
    # Maps the first dotted component of a test filter to the module name
    # used for both the module directory and the "deqp-<module>" executable.
    DEQP_MODULES = {
        "dEQP-EGL": "egl",
        "dEQP-GLES2": "gles2",
        "dEQP-GLES3": "gles3",
        "dEQP-GLES31": "gles31",
    }

    def initialize(self):
        """Create the service stopper and start with an empty blacklist."""
        self._services = service_stopper.ServiceStopper(['ui', 'powerd'])
        self._blacklist = []

    def cleanup(self):
        """Restore the services stopped by run_once(), if any stopper exists."""
        if self._services:
            self._services.restore_services()

    def _match_criteria(self, bl_set):
        """
        Evaluate the "criteria" section of one blacklist set.

        @param bl_set: one blacklist set, as loaded from the blacklist JSON.
        @return dict mapping each criterion named in the set to the value
                that matched on this machine, or to "" if it did not match.
                Empty if the set names no (known) criteria.
        """
        matches = {}
        if "criteria" in bl_set:
            criteria = bl_set["criteria"]
            # Only criteria is CPU architecture right now.
            # More will likely be added.
            if "cpu_arch" in criteria:
                cpu_arch = utils.get_cpu_arch()
                if cpu_arch in criteria["cpu_arch"]:
                    matches["cpu_arch"] = cpu_arch
                else:
                    matches["cpu_arch"] = ""
        return matches

    def _get_blacklist(self):
        """
        Load the "blacklist" file from self.bindir into self._blacklist.

        The file is JSON, except that lines whose first non-blank character
        is "#" are treated as comments and dropped before parsing. Each
        top-level entry is a blacklist set with an optional "criteria"
        section and a "tests" section mapping a failure category to a list
        of regular expressions. Sets whose criteria do not match this
        machine are ignored. If there is no blacklist file, the blacklist is
        left empty.

        Any previously loaded blacklist on this instance is replaced.

        @raises error.TestFail: if the file does not contain valid JSON, or
                if an applicable set has no "tests" section (meaning that
                all tests are blacklisted for this configuration).
        """
        bl_file = "blacklist"
        bl_data = {}
        blacklist = []

        # Read blacklist json file
        bl_path = os.path.join(self.bindir, bl_file)
        if os.path.isfile(bl_path):
            with open(bl_path, "r") as bl_f:
                # Ignore comment lines; json itself has no comment syntax.
                json_lines = [line for line in bl_f
                              if not line.strip().startswith("#")]
            # Load json data
            try:
                bl_data = json.loads("".join(json_lines))
            except ValueError:
                # json reports malformed input as ValueError (or a subclass).
                raise error.TestFail("Bad JSON data in %s" % bl_file)

        # Parse json data
        for bl_set_name in bl_data:
            bl_set = bl_data[bl_set_name]
            # Criteria section defines what configuration this
            # blacklist set applies to.
            matches = self._match_criteria(bl_set)

            # If criteria weren't met, ignore this blacklist set
            if any(matches[criterion] == "" for criterion in matches):
                continue

            # If no tests are specified, all tests are blacklisted,
            # so create message and exit.
            if "tests" not in bl_set:
                msg = "All tests blacklisted"
                if matches:
                    msg += " for " + " and ".join(
                            "%s=%s" % (criterion, matches[criterion])
                            for criterion in matches)
                raise error.TestFail(msg)

            # Add regular expressions in this set to the blacklist
            bl_set_tests = bl_set["tests"]
            for failure_category in bl_set_tests:
                for bl_regex in bl_set_tests[failure_category]:
                    blacklist.append(re.compile(bl_regex))

        # Assign on the instance so the class-level list is never mutated.
        self._blacklist = blacklist

    def _generate_case_list_file(self, exe):
        """
        Ask a dEQP executable to dump its test case names to a file.

        @param exe: path to the dEQP module executable.
        @return path of the case list file, built from the directory of exe
                and the file name reported on the executable's stdout.
        @raises error.TestError: if the executable's output does not contain
                the expected "Dumping all test case names" message.
        """
        initial_dir = os.path.abspath(os.path.curdir)
        base_dir = os.path.dirname(exe)
        # Must be in the exe directory when running the exe for it to
        # find its test data files!
        os.chdir(base_dir)
        try:
            result = utils.run(
                    "%s --deqp-runmode=txt-caselist --deqp-surface-type=fbo"
                    % exe,
                    stderr_is_expected=False,
                    stdout_tee=utils.TEE_TO_LOGS,
                    stderr_tee=utils.TEE_TO_LOGS)
        finally:
            # Restore the working directory even if the command failed.
            os.chdir(initial_dir)

        pattern = "Dumping all test case names in '.*' to file " \
                  "'(?P<caselist_path>.*)'.."
        match = re.search(pattern, result.stdout)
        if match is None:
            raise error.TestError(
                    "Could not find case list file name in output of %s" % exe)
        return os.path.join(base_dir, match.group("caselist_path"))

    def _test_cases(self, exe, test_filter):
        """
        Generate the names of the test cases to run.

        Reads the case list file produced by _generate_case_list_file() and
        yields every "TEST: <name>" entry whose name starts with test_filter
        and does not match any pattern in self._blacklist. Blacklisted names
        are logged and skipped.

        @param exe: path to the dEQP module executable.
        @param test_filter: prefix that test case names must start with.
        @return generator of test case names, stripped of surrounding
                whitespace.
        """
        case_re = re.compile("^TEST: (?P<testcase>.*)")
        with open(self._generate_case_list_file(exe)) as caselist_file:
            for line in caselist_file:
                match = case_re.search(line)
                if match is None:
                    continue
                testcase = match.group("testcase").strip()
                if not testcase.startswith(test_filter):
                    continue
                if any(bl_regex.search(testcase)
                       for bl_regex in self._blacklist):
                    logging.info("Blacklisted: %s", testcase)
                    continue
                # Yield the same stripped name that was filtered above.
                yield testcase

    def _parse_test_result(self, result_filename):
        """
        Extract the result of a single test case from its dEQP log file.

        The XML between the "#beginTestCaseResult" and "#endTestCaseResult"
        lines is parsed and the "StatusCode" attribute of its "Result"
        element is returned. If a "#terminateTestCaseResult" line is seen
        first, the XML is incomplete and the second whitespace-separated
        token of that line is returned instead.

        This is deliberately best-effort: any failure to read or parse the
        log is logged and reported through the return value.

        @param result_filename: path of the log file to parse.
        @return the result string, or "ParseTestResultFail" if no result
                could be extracted.
        """
        xml_lines = []
        xml_start = False
        xml_complete = False
        result = "ParseTestResultFail"
        try:
            with open(result_filename) as result_file:
                for line in result_file:
                    # If the test terminates early, the XML will be
                    # incomplete and should not be parsed
                    if line.startswith("#terminateTestCaseResult"):
                        result = line.strip().split()[1]
                        break
                    # Will only see #endTestCaseResult if the test does not
                    # terminate early
                    if line.startswith("#endTestCaseResult"):
                        xml_complete = True
                        break
                    if xml_start:
                        xml_lines.append(line)
                    elif line.startswith("#beginTestCaseResult"):
                        xml_start = True
            if xml_complete:
                root = et.fromstring("".join(xml_lines))
                result = root.find("Result").get("StatusCode").strip()
        except Exception:
            # Default result of ParseTestResultFail is sufficient, but leave
            # a trace of why parsing failed.
            logging.warning("Could not parse test result from %s",
                            result_filename, exc_info=True)
        return result

    def _run_test_case(self, mod_exe, test_case, log_file, timeout):
        """
        Run one dEQP test case and return its result string.

        @param mod_exe: path to the dEQP module executable.
        @param test_case: name of the test case to run.
        @param log_file: path the executable should write its log to.
        @param timeout: timeout passed to utils.run().
        @return the result parsed from log_file, or "TestTimeout",
                "CommandFailed" or "UnexpectedError" if running the command
                raised.
        """
        command = ("%s --deqp-case=%s --deqp-log-filename=%s "
                   "--deqp-surface-type=fbo --deqp-log-images=disable "
                   "--deqp-surface-width=256 --deqp-surface-height=256 "
                   "--deqp-watchdog=enable "
                   % (mod_exe, test_case, log_file))
        try:
            utils.run(command, timeout=timeout,
                      stderr_is_expected=False,
                      ignore_status=True,
                      stdout_tee=utils.TEE_TO_LOGS,
                      stderr_tee=utils.TEE_TO_LOGS)
            return self._parse_test_result(log_file)
        except error.CmdTimeoutError:
            return "TestTimeout"
        except error.CmdError:
            return "CommandFailed"
        except Exception:
            # Exception (not a bare except) so that KeyboardInterrupt and
            # SystemExit can still stop the whole run.
            logging.exception("Unexpected error running %s", test_case)
            return "UnexpectedError"

    def run_once(self, opts=None):
        """
        Run all dEQP test cases selected by the "filter" option.

        @param opts: list of "key=value" style arguments, converted with
                utils.args_to_dict(). Recognised keys:
                filter - required prefix of the test case names to run; its
                        first dotted component must be a key of DEQP_MODULES.
                timeout - per test case timeout (default 600).
                ignore_blacklist - the string "True" to skip loading the
                        blacklist file (default "False").

        @raises error.TestError: if no filter is given, the filter does not
                name a known module, or no test case matches the filter.
        @raises error.TestFail: if any test case ends with a result other
                than "pass" or "notsupported" (compared case-insensitively),
                or if loading the blacklist fails.
        """
        options = dict(
            filter="",
            timeout=10 * 60,  # 10 minutes max per test
            ignore_blacklist="False"
        )
        options.update(utils.args_to_dict(opts or []))
        logging.info("Test Options: %s", options)

        timeout = int(options["timeout"])
        test_filter = options["filter"]
        ignore_blacklist = options["ignore_blacklist"] == "True"
        if not test_filter:
            raise error.TestError("No dEQP test filter specified")

        # Determine module from filter
        test_prefix = test_filter.split('.')[0]
        if test_prefix not in self.DEQP_MODULES:
            raise error.TestError("Invalid test filter: %s" % test_filter)
        module = self.DEQP_MODULES[test_prefix]

        # Read blacklist file, or make sure nothing is blacklisted
        if ignore_blacklist:
            self._blacklist = []
        else:
            self._get_blacklist()

        log_path = os.path.join(self.outputdir, "deqp-%s-logs" % module)
        mod_path = os.path.join(self.DEQP_BASEDIR, "modules", module)
        mod_exe = os.path.join(mod_path, "deqp-%s" % module)

        self._services.stop_services()
        utils.run('pkill frecon', ignore_status=True)

        # The log directory may already exist if run_once() is called more
        # than once with the same outputdir.
        if not os.path.isdir(log_path):
            os.mkdir(log_path)

        test_results = {}
        test_failures = 0
        test_count = 0

        initial_dir = os.path.abspath(os.path.curdir)
        # Must be in the mod_exe directory when running the mod_exe for it
        # to find its test data files!
        os.chdir(mod_path)
        try:
            for test_case in self._test_cases(mod_exe, test_filter):
                logging.info('TestCase: %s', test_case)
                test_count += 1
                log_file = "%s.log" % os.path.join(log_path, test_case)
                result = self._run_test_case(mod_exe, test_case, log_file,
                                             timeout)
                logging.info('Result: %s', result)
                if result.lower() not in ["pass", "notsupported"]:
                    test_failures += 1
                test_results[result] = test_results.get(result, 0) + 1
        finally:
            # Restore the working directory even if a test run aborted.
            os.chdir(initial_dir)

        self.write_perf_keyval(test_results)

        if test_failures:
            raise error.TestFail("%d/%d tests failed."
                                 % (test_failures, test_count))
        if test_count == 0:
            raise error.TestError("No test cases found for filter:%s!"
                                  % (test_filter))