presubmits: add types to check-presubmit.py.
Since presubmits handle py3 gracefully now && we're starting to add
types in many places, it seems worth it to add these for better
understandability in the future.
No functional change is intended.
BUG=chromium:1100659
TEST=Ran the presubmits.
Change-Id: If4134b4d10b0f3692a52803e12c8e503b4574be5
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/toolchain-utils/+/2274727
Reviewed-by: Manoj Gupta <manojgupta@chromium.org>
Tested-by: George Burgess <gbiv@chromium.org>
diff --git a/toolchain_utils_githooks/check-presubmit.py b/toolchain_utils_githooks/check-presubmit.py
index 2637334..10eb6e3 100755
--- a/toolchain_utils_githooks/check-presubmit.py
+++ b/toolchain_utils_githooks/check-presubmit.py
@@ -7,11 +7,7 @@
"""Runs presubmit checks against a bundle of files."""
-# To keep `cros lint` happy
-from __future__ import division, print_function
-
import argparse
-import collections
import datetime
import multiprocessing
import multiprocessing.pool
@@ -26,7 +22,9 @@
import typing as t
-def run_command_unchecked(command, cwd, env=None):
+def run_command_unchecked(command: t.List[str],
+ cwd: str,
+ env: t.Dict[str, str] = None) -> t.Tuple[int, str]:
"""Runs a command in the given dir, returning its exit code and stdio."""
p = subprocess.Popen(
command,
@@ -42,12 +40,12 @@
return exit_code, stdout.decode('utf-8', 'replace')
-def has_executable_on_path(exe):
+def has_executable_on_path(exe: str) -> bool:
"""Returns whether we have `exe` somewhere on our $PATH"""
return shutil.which(exe) is not None
-def escape_command(command):
+def escape_command(command: t.Iterable[str]) -> str:
"""Returns a human-readable and copy-pastable shell command.
Only intended for use in output to users. shell=True is strongly discouraged.
@@ -55,18 +53,18 @@
return ' '.join(shlex.quote(x) for x in command)
-def remove_deleted_files(files):
+def remove_deleted_files(files: t.Iterable[str]) -> t.List[str]:
return [f for f in files if os.path.exists(f)]
-def is_file_executable(file_path):
+def is_file_executable(file_path: str) -> bool:
return os.access(file_path, os.X_OK)
# As noted in our docs, some of our Python code depends on modules that sit in
# toolchain-utils/. Add that to PYTHONPATH to ensure that things like `cros
# lint` are kept happy.
-def env_with_pythonpath(toolchain_utils_root):
+def env_with_pythonpath(toolchain_utils_root: str) -> t.Dict[str, str]:
env = dict(os.environ)
if 'PYTHONPATH' in env:
env['PYTHONPATH'] += ':' + toolchain_utils_root
@@ -86,12 +84,22 @@
# least ${number_of_concurrently_running_checkers}+1 threads are present
# in the pool. In order words, blocking on results from the provided
# threadpool is OK.
-CheckResult = collections.namedtuple('CheckResult',
- ('ok', 'output', 'autofix_commands'))
+CheckResult = t.NamedTuple(
+ 'CheckResult',
+ (
+ ('ok', bool),
+ ('output', str),
+ ('autofix_commands', t.List[t.List[str]]),
+ ),
+)
-def get_check_result_or_catch(task):
- """Returns the result of task(); if that raises, returns a CheckResult."""
+def get_check_result_or_catch(
+ task: multiprocessing.pool.ApplyResult) -> CheckResult:
+ """Returns the result of task(); if that raises, returns a CheckResult.
+
+ The task is expected to return a CheckResult on get().
+ """
try:
return task.get()
except Exception:
@@ -103,7 +111,8 @@
)
-def check_yapf(toolchain_utils_root, python_files):
+def check_yapf(toolchain_utils_root: str,
+ python_files: t.Iterable[str]) -> CheckResult:
"""Subchecker of check_py_format. Checks python file formats with yapf"""
command = ['yapf', '-d'] + python_files
exit_code, stdout_and_stderr = run_command_unchecked(
@@ -146,7 +155,7 @@
)
-def check_python_file_headers(python_files):
+def check_python_file_headers(python_files: t.Iterable[str]) -> CheckResult:
"""Subchecker of check_py_format. Checks python #!s"""
add_hashbang = []
remove_hashbang = []
@@ -188,7 +197,9 @@
)
-def check_py_format(toolchain_utils_root, thread_pool, files):
+def check_py_format(toolchain_utils_root: str,
+ thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.Iterable[str]) -> CheckResult:
"""Runs yapf on files to check for style bugs. Also checks for #!s."""
yapf = 'yapf'
if not has_executable_on_path(yapf):
@@ -217,7 +228,9 @@
return [(name, get_check_result_or_catch(task)) for name, task in tasks]
-def check_cros_lint(toolchain_utils_root, thread_pool, files):
+def check_cros_lint(
+ toolchain_utils_root: str, thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.Iterable[str]) -> t.Union[t.List[CheckResult], CheckResult]:
"""Runs `cros lint`"""
fixed_env = env_with_pythonpath(toolchain_utils_root)
@@ -225,7 +238,7 @@
# We have to support users who don't have a chroot. So we either run `cros
# lint` (if it's been made available to us), or we try a mix of
# pylint+golint.
- def try_run_cros_lint(cros_binary):
+ def try_run_cros_lint(cros_binary: str) -> t.Optional[CheckResult]:
exit_code, output = run_command_unchecked(
[cros_binary, 'lint', '--py3', '--'] + files,
toolchain_utils_root,
@@ -254,7 +267,7 @@
tasks = []
- def check_result_from_command(command):
+ def check_result_from_command(command: t.List[str]) -> CheckResult:
exit_code, output = run_command_unchecked(
command, toolchain_utils_root, env=fixed_env)
return CheckResult(
@@ -266,7 +279,7 @@
python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')]
if python_files:
- def run_pylint():
+ def run_pylint() -> CheckResult:
# pylint is required. Fail hard if it DNE.
return check_result_from_command(['pylint'] + python_files)
@@ -275,7 +288,7 @@
go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')]
if go_files:
- def run_golint():
+ def run_golint() -> CheckResult:
if has_executable_on_path('golint'):
return check_result_from_command(['golint', '-set_exit_status'] +
go_files)
@@ -362,7 +375,9 @@
)
-def check_tests(toolchain_utils_root, _thread_pool, files):
+def check_tests(toolchain_utils_root: str,
+ _thread_pool: multiprocessing.pool.ThreadPool,
+ files: t.List[str]) -> CheckResult:
"""Runs tests."""
exit_code, stdout_and_stderr = run_command_unchecked(
[os.path.join(toolchain_utils_root, 'run_tests_for.py'), '--'] + files,
@@ -374,15 +389,17 @@
)
-def detect_toolchain_utils_root():
+def detect_toolchain_utils_root() -> str:
return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-def process_check_result(check_name, check_results, start_time):
+def process_check_result(
+ check_name: str, check_results: t.Union[t.List[CheckResult], CheckResult],
+ start_time: datetime.datetime) -> t.Tuple[bool, t.List[t.List[str]]]:
"""Prints human-readable output for the given check_results."""
indent = ' '
- def indent_block(text):
+ def indent_block(text: str) -> str:
return indent + text.replace('\n', '\n' + indent)
if isinstance(check_results, CheckResult):
@@ -426,7 +443,8 @@
return ok, autofix_commands
-def try_autofix(all_autofix_commands, toolchain_utils_root):
+def try_autofix(all_autofix_commands: t.List[t.List[str]],
+ toolchain_utils_root: str) -> None:
"""Tries to run all given autofix commands, if appropriate."""
if not all_autofix_commands:
return
@@ -534,7 +552,7 @@
subprocess.check_call(['pip', 'install', '--user', 'scipy'])
-def main(argv):
+def main(argv: t.List[str]) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
'--no_autofix',