# Copyright 2019 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Runs tests for the given input files.
All tests are run in parallel.
"""
# NOTE: An alternative mentioned on the initial CL for this
# https://chromium-review.googlesource.com/c/chromiumos/third_party/toolchain-utils/+/1516414
# is pytest. It looks like that brings some complexity (and makes use outside
# of the chroot a bit more obnoxious?), but might be worth exploring if this
# starts to grow quite complex on its own.
import argparse
import collections
import multiprocessing.pool
import os
import shlex
import signal
import subprocess
import sys
from typing import Optional, Tuple


TestSpec = collections.namedtuple("TestSpec", ["directory", "command"])


def _make_relative_to_toolchain_utils(toolchain_utils, path):
"""Cleans & makes a path relative to toolchain_utils.
Raises if that path isn't under toolchain_utils.
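
    Example (the paths here are purely illustrative):

    >>> _make_relative_to_toolchain_utils(
    ...     "/toolchain-utils", "/toolchain-utils/./foo.py"
    ... )
    'foo.py'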
"""
# abspath has the nice property that it removes any markers like './'.
as_abs = os.path.abspath(path)
result = os.path.relpath(as_abs, start=toolchain_utils)
if result.startswith("../"):
raise ValueError("Non toolchain-utils directory found: %s" % result)
return result


def _run_test(test_spec: TestSpec, timeout: int) -> Tuple[Optional[int], str]:
    """Runs a test.

    Returns a tuple of the process' exit code and the combined stdout+stderr
    of the process. If the exit code is None, the process timed out.
    """
# Each subprocess gets its own process group, since many of these tests
# spawn subprocesses for a variety of reasons. If these tests time out, we
# want to be able to clean up all of the children swiftly.
# pylint: disable=subprocess-popen-preexec-fn
with subprocess.Popen(
test_spec.command,
cwd=test_spec.directory,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="utf-8",
# TODO(b/296616854): This is unsafe, and we should use
# process_group=0 when we have upgraded to Python 3.11.
preexec_fn=lambda: os.setpgid(0, 0),
) as p:
child_pgid = p.pid
try:
out, _ = p.communicate(timeout=timeout)
return p.returncode, out
except BaseException as e:
# Try to shut the processes down gracefully.
os.killpg(child_pgid, signal.SIGINT)
try:
# 2 seconds is arbitrary, but given that these are unittests,
# should be plenty of time for them to shut down.
p.wait(timeout=2)
except subprocess.TimeoutExpired:
os.killpg(child_pgid, signal.SIGKILL)
except:
os.killpg(child_pgid, signal.SIGKILL)
raise
if isinstance(e, subprocess.TimeoutExpired):
# We just killed the entire process group. This should complete
# ~immediately. If it doesn't, something is very wrong.
out, _ = p.communicate(timeout=5)
return (None, out)
raise


def _run_test_scripts(pool, all_tests, timeout, show_successful_output=False):
    """Runs a list of TestSpecs. Returns whether all of them succeeded."""
results = [
pool.apply_async(_run_test, (test, timeout)) for test in all_tests
]
failures = []
for i, (test, future) in enumerate(zip(all_tests, results)):
# Add a bit more spacing between outputs.
if show_successful_output and i:
print("\n")
pretty_test = shlex.join(test.command)
pretty_directory = os.path.relpath(test.directory)
if pretty_directory == ".":
test_message = pretty_test
else:
test_message = "%s in %s/" % (pretty_test, pretty_directory)
print("## %s ... " % test_message, end="")
        # Be sure that the user sees which test is running.
sys.stdout.flush()
exit_code, stdout = future.get()
if exit_code == 0:
print("PASS")
is_failure = False
else:
print("TIMEOUT" if exit_code is None else "FAIL")
failures.append(test_message)
is_failure = True
if show_successful_output or is_failure:
if stdout:
print("-- Stdout:\n", stdout)
else:
print("-- No stdout was produced.")
if failures:
word = "tests" if len(failures) > 1 else "test"
print(f"{len(failures)} {word} failed:")
for failure in failures:
print(f"\t{failure}")
return not failures


def _compress_list(l):
    """Removes consecutive duplicate elements from |l|.

    >>> _compress_list([])
[]
>>> _compress_list([1, 1])
[1]
>>> _compress_list([1, 2, 1])
[1, 2, 1]
"""
result = []
for e in l:
if result and result[-1] == e:
continue
result.append(e)
return result


def _find_go_tests(test_paths):
    """Returns TestSpecs for the Go folders of the given files."""
assert all(os.path.isabs(path) for path in test_paths)
dirs_with_gofiles = set(
os.path.dirname(p) for p in test_paths if p.endswith(".go")
)
command = ["go", "test", "-vet=all"]
# Note: We sort the directories to be deterministic.
return [
TestSpec(directory=d, command=command)
for d in sorted(dirs_with_gofiles)
]


def main(argv):
default_toolchain_utils = os.path.abspath(os.path.dirname(__file__))
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--show_all_output",
action="store_true",
help="show stdout of successful tests",
)
parser.add_argument(
"--toolchain_utils",
default=default_toolchain_utils,
help="directory of toolchain-utils. Often auto-detected",
)
parser.add_argument(
"file", nargs="*", help="a file that we should run tests for"
)
parser.add_argument(
"--timeout",
default=120,
type=int,
help="Time to allow a test to execute before timing it out, in "
"seconds.",
)
args = parser.parse_args(argv)
modified_files = [os.path.abspath(f) for f in args.file]
show_all_output = args.show_all_output
toolchain_utils = args.toolchain_utils
if not modified_files:
print("No files given. Exit.")
return 0
tests_to_run = []
if any(x.endswith(".py") for x in modified_files):
tests_to_run.append(
TestSpec(
directory=toolchain_utils,
command=("./run_python_tests.sh",),
)
)
tests_to_run += _find_go_tests(modified_files)
    # TestSpec commands can be lists, so TestSpecs aren't hashable and we
    # can't deduplicate with a set. We'd likely want to keep them sorted for
    # determinism anyway.
    tests_to_run = _compress_list(sorted(tests_to_run))
with multiprocessing.pool.ThreadPool() as pool:
success = _run_test_scripts(
pool, tests_to_run, args.timeout, show_all_output
)
return 0 if success else 1
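

if __name__ == "__main__":
    # Assumed standard entry point: forward the CLI arguments (sans program
    # name) to main() and use its return value as the exit status.
    sys.exit(main(sys.argv[1:]))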