#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2019 The ChromiumOS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Runs presubmit checks against a bundle of files."""

import argparse
import datetime
import multiprocessing
import multiprocessing.pool
import os
from pathlib import Path
import re
import shlex
import shutil
import subprocess
import sys
import threading
import traceback
import typing as t


def run_command_unchecked(command: t.List[str],
                          cwd: str,
                          env: t.Dict[str, str] = None) -> t.Tuple[int, str]:
  """Runs a command in the given dir, returning its exit code and stdio."""
  p = subprocess.Popen(
      command,
      cwd=cwd,
      stdin=subprocess.DEVNULL,
      stdout=subprocess.PIPE,
      stderr=subprocess.STDOUT,
      env=env,
  )

  stdout, _ = p.communicate()
  exit_code = p.wait()
  return exit_code, stdout.decode('utf-8', 'replace')


def has_executable_on_path(exe: str) -> bool:
  """Returns whether we have `exe` somewhere on our $PATH"""
  return shutil.which(exe) is not None


def escape_command(command: t.Iterable[str]) -> str:
  """Returns a human-readable and copy-pastable shell command.

  Only intended for use in output to users. shell=True is strongly discouraged.
  """
  return ' '.join(shlex.quote(x) for x in command)


def remove_deleted_files(files: t.Iterable[str]) -> t.List[str]:
  return [f for f in files if os.path.exists(f)]


def is_file_executable(file_path: str) -> bool:
  return os.access(file_path, os.X_OK)


# As noted in our docs, some of our Python code depends on modules that sit in
# toolchain-utils/. Add that to PYTHONPATH to ensure that things like `cros
# lint` are kept happy.
def env_with_pythonpath(toolchain_utils_root: str) -> t.Dict[str, str]:
  env = dict(os.environ)
  if 'PYTHONPATH' in env:
    env['PYTHONPATH'] += ':' + toolchain_utils_root
  else:
    env['PYTHONPATH'] = toolchain_utils_root
  return env


# Each checker represents an independent check that's done on our sources.
#
# They should:
#  - never write to stdout/stderr or read from stdin directly
#  - return either a CheckResult, or a list of [(subcheck_name, CheckResult)]
#  - ideally use thread_pool to check things concurrently
#    - though it's important to note that these *also* live on the threadpool
#      we've provided. It's the caller's responsibility to guarantee that at
#      least ${number_of_concurrently_running_checkers}+1 threads are present
#      in the pool. In order words, blocking on results from the provided
#      threadpool is OK.
CheckResult = t.NamedTuple(
    'CheckResult',
    (
        ('ok', bool),
        ('output', str),
        ('autofix_commands', t.List[t.List[str]]),
    ),
)


def get_check_result_or_catch(
    task: multiprocessing.pool.ApplyResult) -> CheckResult:
  """Returns the result of task(); if that raises, returns a CheckResult.

  The task is expected to return a CheckResult on get().
  """
  try:
    return task.get()
  except Exception:
    return CheckResult(
        ok=False,
        output='Check exited with an unexpected exception:\n%s' %
        traceback.format_exc(),
        autofix_commands=[],
    )


def check_isort(toolchain_utils_root: str,
                python_files: t.Iterable[str]) -> CheckResult:
  """Subchecker of check_py_format. Checks python file formats with isort"""
  chromite = Path('/mnt/host/source/chromite')
  isort = chromite / 'scripts' / 'isort'
  config_file = chromite / '.isort.cfg'

  if not (isort.exists() and config_file.exists()):
    return CheckResult(
        ok=True,
        output='isort not found; skipping',
        autofix_commands=[],
    )

  config_file_flag = f'--settings-file={config_file}'
  command = [isort, '-c', config_file_flag] + python_files
  exit_code, stdout_and_stderr = run_command_unchecked(
      command, cwd=toolchain_utils_root)

  # isort fails when files have broken formatting.
  if not exit_code:
    return CheckResult(
        ok=True,
        output='',
        autofix_commands=[],
    )

  bad_files = []
  bad_file_re = re.compile(
      r'^ERROR: (.*) Imports are incorrectly sorted and/or formatted\.$')
  for line in stdout_and_stderr.splitlines():
    m = bad_file_re.match(line)
    if m:
      file_name, = m.groups()
      bad_files.append(file_name.strip())

  if not bad_files:
    return CheckResult(
        ok=False,
        output='`%s` failed; stdout/stderr:\n%s' %
        (escape_command(command), stdout_and_stderr),
        autofix_commands=[],
    )

  autofix = [str(isort), config_file_flag] + bad_files
  return CheckResult(
      ok=False,
      output='The following file(s) have formatting errors: %s' % bad_files,
      autofix_commands=[autofix],
  )


def check_yapf(toolchain_utils_root: str, yapf: Path,
               python_files: t.Iterable[str]) -> CheckResult:
  """Subchecker of check_py_format. Checks python file formats with yapf"""
  # Folks have been bitten by accidentally using multiple yapf versions in the
  # past. This is an issue, since newer versions of yapf sometimes format
  # things differently. Make the version obvious.
  command = [yapf, '--version']
  exit_code, stdout_and_stderr = run_command_unchecked(
      command, cwd=toolchain_utils_root)
  if exit_code:
    return CheckResult(
        ok=False,
        output=f'Failed getting yapf version; stdstreams: {stdout_and_stderr}',
        autofix_commands=[],
    )

  yapf_version = stdout_and_stderr.strip()
  # This is the depot_tools version. If folks have this, things will break for
  # them. Ask them to upgrade. Peephole this rather than making some
  # complicated version-parsing scheme, since it's likely that everyone with a
  # too-old version is using specifically the depot_tools one.
  if yapf_version == 'yapf 0.27.0':
    return CheckResult(
        ok=False,
        output='YAPF is too old; please upgrade it: `pip install --user yapf`',
        autofix_commands=[],
    )

  command = [yapf, '-d'] + python_files
  exit_code, stdout_and_stderr = run_command_unchecked(
      command, cwd=toolchain_utils_root)

  # yapf fails when files are poorly formatted.
  if exit_code == 0:
    return CheckResult(
        ok=True,
        output=f'Using {yapf_version}, no issues were found.',
        autofix_commands=[],
    )

  bad_files = []
  bad_file_re = re.compile(r'^--- (.*)\s+\(original\)\s*$')
  for line in stdout_and_stderr.splitlines():
    m = bad_file_re.match(line)
    if not m:
      continue

    file_name, = m.groups()
    bad_files.append(file_name.strip())

  # ... and doesn't really differentiate "your files have broken formatting"
  # errors from general ones. So if we see nothing diffed, assume that a
  # general error happened.
  if not bad_files:
    return CheckResult(
        ok=False,
        output='`%s` failed; stdout/stderr:\n%s' %
        (escape_command(command), stdout_and_stderr),
        autofix_commands=[],
    )

  autofix = [str(yapf), '-i'] + bad_files
  return CheckResult(
      ok=False,
      output=f'Using {yapf_version}, these file(s) have formatting errors: '
      f'{bad_files}',
      autofix_commands=[autofix],
  )


def check_python_file_headers(python_files: t.Iterable[str]) -> CheckResult:
  """Subchecker of check_py_format. Checks python #!s"""
  add_hashbang = []
  remove_hashbang = []

  for python_file in python_files:
    needs_hashbang = is_file_executable(python_file)
    with open(python_file, encoding='utf-8') as f:
      has_hashbang = f.read(2) == '#!'
      if needs_hashbang == has_hashbang:
        continue

      if needs_hashbang:
        add_hashbang.append(python_file)
      else:
        remove_hashbang.append(python_file)

  autofix = []
  output = []
  if add_hashbang:
    output.append('The following files have no #!, but need one: %s' %
                  add_hashbang)
    autofix.append(['sed', '-i', '1i#!/usr/bin/env python3'] + add_hashbang)

  if remove_hashbang:
    output.append("The following files have a #!, but shouldn't: %s" %
                  remove_hashbang)
    autofix.append(['sed', '-i', '1d'] + remove_hashbang)

  if not output:
    return CheckResult(
        ok=True,
        output='',
        autofix_commands=[],
    )
  return CheckResult(
      ok=False,
      output='\n'.join(output),
      autofix_commands=autofix,
  )


def check_py_format(toolchain_utils_root: str,
                    thread_pool: multiprocessing.pool.ThreadPool,
                    files: t.Iterable[str]) -> t.List[CheckResult]:
  """Runs yapf on files to check for style bugs. Also checks for #!s."""
  pip_yapf = Path('~/.local/bin/yapf').expanduser()
  if pip_yapf.exists():
    yapf = pip_yapf
  else:
    yapf = 'yapf'
    if not has_executable_on_path(yapf):
      return CheckResult(
          ok=False,
          output="yapf isn't available on your $PATH. Please either "
          'enter a chroot, or place depot_tools on your $PATH.',
          autofix_commands=[],
      )

  python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')]
  if not python_files:
    return CheckResult(
        ok=True,
        output='no python files to check',
        autofix_commands=[],
    )

  tasks = [
      ('check_yapf',
       thread_pool.apply_async(check_yapf,
                               (toolchain_utils_root, yapf, python_files))),
      ('check_isort',
       thread_pool.apply_async(check_isort,
                               (toolchain_utils_root, python_files))),
      ('check_file_headers',
       thread_pool.apply_async(check_python_file_headers, (python_files, ))),
  ]
  return [(name, get_check_result_or_catch(task)) for name, task in tasks]


def find_chromeos_root_directory() -> t.Optional[str]:
  return os.getenv('CHROMEOS_ROOT_DIRECTORY')


def check_cros_lint(
    toolchain_utils_root: str, thread_pool: multiprocessing.pool.ThreadPool,
    files: t.Iterable[str]) -> t.Union[t.List[CheckResult], CheckResult]:
  """Runs `cros lint`"""

  fixed_env = env_with_pythonpath(toolchain_utils_root)

  # We have to support users who don't have a chroot. So we either run `cros
  # lint` (if it's been made available to us), or we try a mix of
  # pylint+golint.
  def try_run_cros_lint(cros_binary: str) -> t.Optional[CheckResult]:
    exit_code, output = run_command_unchecked([cros_binary, 'lint', '--'] +
                                              files,
                                              toolchain_utils_root,
                                              env=fixed_env)

    # This is returned specifically if cros couldn't find the ChromeOS tree
    # root.
    if exit_code == 127:
      return None

    return CheckResult(
        ok=exit_code == 0,
        output=output,
        autofix_commands=[],
    )

  cros_lint = try_run_cros_lint('cros')
  if cros_lint is not None:
    return cros_lint

  cros_root = find_chromeos_root_directory()
  if cros_root:
    cros_lint = try_run_cros_lint(os.path.join(cros_root, 'chromite/bin/cros'))
    if cros_lint is not None:
      return cros_lint

  tasks = []

  def check_result_from_command(command: t.List[str]) -> CheckResult:
    exit_code, output = run_command_unchecked(command,
                                              toolchain_utils_root,
                                              env=fixed_env)
    return CheckResult(
        ok=exit_code == 0,
        output=output,
        autofix_commands=[],
    )

  python_files = [f for f in remove_deleted_files(files) if f.endswith('.py')]
  if python_files:

    def run_pylint() -> CheckResult:
      # pylint is required. Fail hard if it DNE.
      return check_result_from_command(['pylint'] + python_files)

    tasks.append(('pylint', thread_pool.apply_async(run_pylint)))

  go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')]
  if go_files:

    def run_golint() -> CheckResult:
      if has_executable_on_path('golint'):
        return check_result_from_command(['golint', '-set_exit_status'] +
                                         go_files)

      complaint = '\n'.join((
          'WARNING: go linting disabled. golint is not on your $PATH.',
          'Please either enter a chroot, or install go locally. Continuing.',
      ))
      return CheckResult(
          ok=True,
          output=complaint,
          autofix_commands=[],
      )

    tasks.append(('golint', thread_pool.apply_async(run_golint)))

  complaint = '\n'.join((
      'WARNING: No ChromeOS checkout detected, and no viable CrOS tree',
      'found; falling back to linting only python and go. If you have a',
      'ChromeOS checkout, please either develop from inside of the source',
      'tree, or set $CHROMEOS_ROOT_DIRECTORY to the root of it.',
  ))

  results = [(name, get_check_result_or_catch(task)) for name, task in tasks]
  if not results:
    return CheckResult(
        ok=True,
        output=complaint,
        autofix_commands=[],
    )

  # We need to complain _somewhere_.
  name, angry_result = results[0]
  angry_complaint = (complaint + '\n\n' + angry_result.output).strip()
  results[0] = (name, angry_result._replace(output=angry_complaint))
  return results


def check_go_format(toolchain_utils_root, _thread_pool, files):
  """Runs gofmt on files to check for style bugs."""
  gofmt = 'gofmt'
  if not has_executable_on_path(gofmt):
    return CheckResult(
        ok=False,
        output="gofmt isn't available on your $PATH. Please either "
        'enter a chroot, or place your go bin/ directory on your $PATH.',
        autofix_commands=[],
    )

  go_files = [f for f in remove_deleted_files(files) if f.endswith('.go')]
  if not go_files:
    return CheckResult(
        ok=True,
        output='no go files to check',
        autofix_commands=[],
    )

  command = [gofmt, '-l'] + go_files
  exit_code, output = run_command_unchecked(command, cwd=toolchain_utils_root)

  if exit_code:
    return CheckResult(
        ok=False,
        output='%s failed; stdout/stderr:\n%s' %
        (escape_command(command), output),
        autofix_commands=[],
    )

  output = output.strip()
  if not output:
    return CheckResult(
        ok=True,
        output='',
        autofix_commands=[],
    )

  broken_files = [x.strip() for x in output.splitlines()]
  autofix = [gofmt, '-w'] + broken_files
  return CheckResult(
      ok=False,
      output='The following Go files have incorrect '
      'formatting: %s' % broken_files,
      autofix_commands=[autofix],
  )


def check_tests(toolchain_utils_root: str,
                _thread_pool: multiprocessing.pool.ThreadPool,
                files: t.List[str]) -> CheckResult:
  """Runs tests."""
  exit_code, stdout_and_stderr = run_command_unchecked(
      [os.path.join(toolchain_utils_root, 'run_tests_for.py'), '--'] + files,
      toolchain_utils_root)
  return CheckResult(
      ok=exit_code == 0,
      output=stdout_and_stderr,
      autofix_commands=[],
  )


def detect_toolchain_utils_root() -> str:
  return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


def process_check_result(
    check_name: str, check_results: t.Union[t.List[CheckResult], CheckResult],
    start_time: datetime.datetime) -> t.Tuple[bool, t.List[t.List[str]]]:
  """Prints human-readable output for the given check_results."""
  indent = '  '

  def indent_block(text: str) -> str:
    return indent + text.replace('\n', '\n' + indent)

  if isinstance(check_results, CheckResult):
    ok, output, autofix_commands = check_results
    if not ok and autofix_commands:
      recommendation = ('Recommended command(s) to fix this: %s' %
                        [escape_command(x) for x in autofix_commands])
      if output:
        output += '\n' + recommendation
      else:
        output = recommendation
  else:
    output_pieces = []
    autofix_commands = []
    for subname, (ok, output, autofix) in check_results:
      status = 'succeeded' if ok else 'failed'
      message = ['*** %s.%s %s' % (check_name, subname, status)]
      if output:
        message.append(indent_block(output))
      if not ok and autofix:
        message.append(
            indent_block('Recommended command(s) to fix this: %s' %
                         [escape_command(x) for x in autofix]))

      output_pieces.append('\n'.join(message))
      autofix_commands += autofix

    ok = all(x.ok for _, x in check_results)
    output = '\n\n'.join(output_pieces)

  time_taken = datetime.datetime.now() - start_time
  if ok:
    print('*** %s succeeded after %s' % (check_name, time_taken))
  else:
    print('*** %s failed after %s' % (check_name, time_taken))

  if output:
    print(indent_block(output))

  print()
  return ok, autofix_commands


def try_autofix(all_autofix_commands: t.List[t.List[str]],
                toolchain_utils_root: str) -> None:
  """Tries to run all given autofix commands, if appropriate."""
  if not all_autofix_commands:
    return

  exit_code, output = run_command_unchecked(['git', 'status', '--porcelain'],
                                            cwd=toolchain_utils_root)
  if exit_code != 0:
    print("Autofix aborted: couldn't get toolchain-utils git status.")
    return

  if output.strip():
    # A clean repo makes checking/undoing autofix commands trivial. A dirty
    # one... less so. :)
    print('Git repo seems dirty; skipping autofix.')
    return

  anything_succeeded = False
  for command in all_autofix_commands:
    exit_code, output = run_command_unchecked(command,
                                              cwd=toolchain_utils_root)

    if exit_code:
      print('*** Autofix command `%s` exited with code %d; stdout/stderr:' %
            (escape_command(command), exit_code))
      print(output)
    else:
      print('*** Autofix `%s` succeeded' % escape_command(command))
      anything_succeeded = True

  if anything_succeeded:
    print('NOTE: Autofixes have been applied. Please check your tree, since '
          'some lints may now be fixed')


def find_repo_root(base_dir: str) -> t.Optional[str]:
  current = base_dir
  while current != '/':
    if os.path.isdir(os.path.join(current, '.repo')):
      return current
    current = os.path.dirname(current)
  return None


def is_in_chroot() -> bool:
  return os.path.exists('/etc/cros_chroot_version')


def maybe_reexec_inside_chroot(autofix: bool, files: t.List[str]) -> None:
  if is_in_chroot():
    return

  enter_chroot = True
  chdir_to = None
  toolchain_utils = detect_toolchain_utils_root()
  if find_repo_root(toolchain_utils) is None:
    chromeos_root_dir = find_chromeos_root_directory()
    if chromeos_root_dir is None:
      print('Standalone toolchain-utils checkout detected; cannot enter '
            'chroot.')
      enter_chroot = False
    else:
      chdir_to = chromeos_root_dir

  if not has_executable_on_path('cros_sdk'):
    print('No `cros_sdk` detected on $PATH; cannot enter chroot.')
    enter_chroot = False

  if not enter_chroot:
    print('Giving up on entering the chroot; be warned that some presubmits '
          'may be broken.')
    return

  # We'll be changing ${PWD}, so make everything relative to toolchain-utils,
  # which resides at a well-known place inside of the chroot.
  chroot_toolchain_utils = '/mnt/host/source/src/third_party/toolchain-utils'

  def rebase_path(path: str) -> str:
    return os.path.join(chroot_toolchain_utils,
                        os.path.relpath(path, toolchain_utils))

  args = [
      'cros_sdk',
      '--enter',
      '--',
      rebase_path(__file__),
  ]

  if not autofix:
    args.append('--no_autofix')
  args.extend(rebase_path(x) for x in files)

  if chdir_to is None:
    print('Attempting to enter the chroot...')
  else:
    print(f'Attempting to enter the chroot for tree at {chdir_to}...')
    os.chdir(chdir_to)
  os.execvp(args[0], args)


def ensure_pip_deps_installed() -> None:
  if not has_executable_on_path('pip'):
    print('Autoinstalling `pip`...')
    subprocess.check_call(['sudo', 'emerge', 'dev-python/pip'])

  for package in ('scipy', 'yapf'):
    exit_code = subprocess.call(
        ['python3', '-c', f'import {package}'],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    if exit_code != 0:
      print(f'Autoinstalling `{package}`...')
      subprocess.check_call(['pip', 'install', '--user', package])


def main(argv: t.List[str]) -> int:
  parser = argparse.ArgumentParser(description=__doc__)
  parser.add_argument('--no_autofix',
                      dest='autofix',
                      action='store_false',
                      help="Don't run any autofix commands.")
  parser.add_argument(
      '--no_enter_chroot',
      dest='enter_chroot',
      action='store_false',
      help="Prevent auto-entering the chroot if we're not already in it.")
  parser.add_argument('files', nargs='*')
  opts = parser.parse_args(argv)

  files = opts.files
  if not files:
    return 0

  if opts.enter_chroot:
    maybe_reexec_inside_chroot(opts.autofix, opts.files)

  # If you ask for --no_enter_chroot, you're on your own for installing these
  # things.
  if is_in_chroot():
    ensure_pip_deps_installed()

  files = [os.path.abspath(f) for f in files]

  # Note that we extract .__name__s from these, so please name them in a
  # user-friendly way.
  checks = [
      check_cros_lint,
      check_py_format,
      check_go_format,
      check_tests,
  ]

  toolchain_utils_root = detect_toolchain_utils_root()

  # NOTE: As mentioned above, checks can block on threads they spawn in this
  # pool, so we need at least len(checks)+1 threads to avoid deadlock. Use *2
  # so all checks can make progress at a decent rate.
  num_threads = max(multiprocessing.cpu_count(), len(checks) * 2)
  start_time = datetime.datetime.now()

  # For our single print statement...
  spawn_print_lock = threading.RLock()

  def run_check(check_fn):
    name = check_fn.__name__
    with spawn_print_lock:
      print('*** Spawning %s' % name)
    return name, check_fn(toolchain_utils_root, pool, files)

  # ThreadPool is a ContextManager in py3.
  # pylint: disable=not-context-manager
  with multiprocessing.pool.ThreadPool(num_threads) as pool:
    all_checks_ok = True
    all_autofix_commands = []
    for check_name, result in pool.imap_unordered(run_check, checks):
      ok, autofix_commands = process_check_result(check_name, result,
                                                  start_time)
      all_checks_ok = ok and all_checks_ok
      all_autofix_commands += autofix_commands

  # Run these after everything settles, so:
  # - we don't collide with checkers that are running concurrently
  # - we clearly print out everything that went wrong ahead of time, in case
  #   any of these fail
  if opts.autofix:
    try_autofix(all_autofix_commands, toolchain_utils_root)

  if not all_checks_ok:
    return 1
  return 0


if __name__ == '__main__':
  sys.exit(main(sys.argv[1:]))
