#!/usr/bin/env python3

# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Wrapper to run the platform.BootPerf tast test.

This script runs the 'platform.BootPerf' boot timing test and store the results
for later analysis by the 'showbootdata' script.

NOTE: This script must be run from inside the chromeos build chroot
environment.
"""

import argparse
import datetime
from distutils import dir_util
from distutils import file_util
import glob
import os
import os.path
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import textwrap

# Tast tmp results (/tmp/bootperf.XXXXXX) structure:
# dut-info.txt
# full.txt
# results.json
# streamed_results.jsonl
# system_logs/
# tests/
#     platform.BootPerf/
#         log.txt
#         results-chart.json
#         raw.001/
#         raw.002/
#         (one dir of raw data for each boot iteration)
# timing.json

# Constants
_TEST = 'platform.BootPerf'
_RESULTS_DIR = f'tests/{_TEST}'
_RESULTS_KEYVAL = f'{_RESULTS_DIR}/results-chart.json'
_RESULTS_SUMMARY = 'results_json'
_RESULTS_SUMMARY_FILES_RAW_GLOB = f'{_RESULTS_DIR}/raw.*'

# Structure of a results directory:
#   $RESULTS_SUMMARY/       - file
#   $RUNDIR.$ITER/          - directory
#       $RUNDIR_LOG             - file
#       $RUNDIR_SUMMARY/        - directory
#       $RUNDIR_ALL_RESULTS/    - optional directory
# If you add any other content under the results directory, you'll
# probably need to change extra_files(), below.
_RUNDIR = 'run'
_RUNDIR_LOG = 'log.txt'
_RUNDIR_SUMMARY = 'summary'
_RUNDIR_ALL_RESULTS = 'logs'

_DESCRIPTION = """\
Summary:
  Run the {} tast test, and store results in the
  given destination directory. The test target is specified by
  <ip_address>.

  By default, the test is run once; if <count> is given, the test is
  run that many times. By default, each test run reboots the test target 10
  times, and this can be overridden using the [-r REBOOT_ITERATIONS] option.

  If the destination directory doesn't exist, it is created.  If the
  destination directory already holds test results, additional
  results are added in without overwriting earlier results.

  If no destination is specified, the current directory is used,
  provided that the directory is empty, or has been previously used
  as a destination directory for this command.

  By default, only a summary subset of the log files created by
  tast are preserved; with --keep_logs the (potentially large)
  logs are preserved with the test result.
""".format(_TEST)


def print_error(error):
  """A utility function for printing a color-highlighted error if possible."""
  cred = '\033[1;31;40m'
  cend = '\033[0m'
  if hasattr(sys.stdout, 'isatty') and sys.stdout.isatty():
    error = f'{cred}ERROR: {error}{cend}'
  print(error)


def _assert_in_chroot():
  """Asserts that we are inside the cros chroot."""
  if not os.path.exists('/etc/cros_chroot_version'):
    print_error(
        textwrap.dedent("""\
            This script must be run inside the chroot. Run this first:
                cros_sdk
            """))
    sys.exit(1)


class BootPerf:
  """Runs the boot timing tests.

  This class drives the execution running the boot timing test:
  * Parse and validate command line arguments.
  * Handle working and output directories.
  * Run the platform.BootPerf tast test.
  * Collect test results.
  """

  def __init__(self):
    self.parser = None
    self.args = None
    self.count = None
    self.current_iter = None
    self.output_dir = None
    self.tmp_dir = None

  def process_cmdline(self):
    """Process the command line arguments."""
    parser = argparse.ArgumentParser(
        epilog=_DESCRIPTION,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    # Positional arguments: ip-address and (optional) count.
    parser.add_argument('ip_address', help='Address of the test target')
    parser.add_argument(
        'count',
        nargs='?',
        help='The number of iterations (default: %(default)s)',
        type=int,
        default=1)
    parser.add_argument(
        '-o',
        '--output_dir',
        help='Specify output directory for results')
    parser.add_argument(
        '-k',
        '--keep_logs',
        help='Keep tast log files',
        action='store_true')
    parser.add_argument(
        '-r',
        '--reboot_iterations',
        help='Specify the number of reboots in each iteration ' \
             '(default: %(default)s)',
        type=int,
        default=10)
    self.parser = parser
    self.args = parser.parse_args()

  def validate_args(self):
    """Utility for validating command line arguments."""
    if self.args.count <= 0:
      self.print_usage('<count> argument must be a positive number')
    self.count = self.args.count

    if self.args.reboot_iterations <= 0:
      self.print_usage(
          '[-r REBOOT_ITERATIONS] argument must be a positive number')

  def print_usage(self, error=None):
    """Prints usage help message and terminates the script."""
    self.parser.print_help()
    if error:
      print_error(error)
    sys.exit(1)

  def _validate_output_dir(self):
    """Check for extra files in the output dir other than _RUNDIR ones.

       Also gets the current iteration number.
    """
    max_iter = 0
    for entry in os.listdir(self.output_dir):
      basename = os.path.basename(entry)
      if basename == _RESULTS_SUMMARY:
        continue
      matches = re.match(_RUNDIR + r'\.(\d+)', basename)
      if matches is None:
        print_error(
            textwrap.dedent("""\
                No results directory specified, and current directory
                contains contents other than run results.
                You can override this error by using the --output_dir option
                """))
        self.print_usage()
      # Update to find the current iteration.
      max_iter = max(max_iter, int(matches.group(1)))

    self.current_iter = max_iter + 1

  def _current_iter_str(self):
    """Utility for converting the current iteration numbder string.

    Returns:
      The current iteration number as a string.
    """
    return '{:03d}'.format(self.current_iter)

  def prepare_directories(self):
    """Prepares the working temp and output directories for the test."""
    self._process_output_dir()
    self._validate_output_dir()

  def _process_output_dir(self):
    """Creates the output dir or use the current working dir for test output."""
    if self.args.output_dir is not None:
      if not os.path.exists(self.args.output_dir):
        try:
          os.mkdir(self.args.output_dir)
        except OSError:
          self.print_usage(f'Unable to create {self.args.output_dir}')
      self.output_dir = self.args.output_dir
    else:
      self.output_dir = os.getcwd()

  def _make_tmp_dir(self):
    """Creates a temp directory as the test working dir."""
    self.tmp_dir = tempfile.mkdtemp(prefix='bootperf.')

  def _copy_results_summary(self, dst_dir):
    """Copies the summary of test artifacts."""
    # Copy regular files in the tmp directory (non-recursively).
    for filename in os.listdir(self.tmp_dir):
      src = os.path.join(self.tmp_dir, filename)
      if os.path.isfile(src):
        dst = os.path.join(dst_dir, filename)
        file_util.copy_file(src, dst)
    # Copy the results dir for the test.
    src = os.path.join(self.tmp_dir, _RESULTS_DIR)
    dst = os.path.join(dst_dir, _RESULTS_DIR)
    dir_util.copy_tree(src, dst)

    # The reboots dir contains archives of syslog messages for each reboot
    # and can be potentially large. Remove it from the summary directory.
    shutil.rmtree(os.path.join(dst_dir, _RESULTS_DIR, 'reboots'))

  def _run_boot_test_once(self):
    """Run the platform.BootPerf tast test once."""
    remote = self.args.ip_address
    # |iter_rundir| is the absolute path of the run.??? directory for the
    # current iteration.
    iter_rundir = os.path.join(
        self.output_dir,
        f'{_RUNDIR}.{self._current_iter_str()}')
    logfile = os.path.join(iter_rundir, _RUNDIR_LOG)
    summary_dir = os.path.join(iter_rundir, _RUNDIR_SUMMARY)
    all_results_dir = os.path.join(iter_rundir, _RUNDIR_ALL_RESULTS)

    self._make_tmp_dir()
    os.mkdir(iter_rundir)
    time_now = datetime.datetime.now().strftime('%H:%M:%S')
    print(f'Test started: {time_now} - {logfile}')

    # bootperf is typically run by devs in local tests, where rootfs
    # verification is often disabled. Disable the assertion of rootfs
    # verification in tast.platform.BootPerf.
    skiprootfs_check = '-var=platform.BootPerf.skipRootfsCheck=true'
    # Test option of the number of reboots in each test run. Default is 10.
    # Note that the test runs for <count> times so the total number of reboots
    # will be 10*<count>.
    iterations = '-var=platform.BootPerf.iterations={}'.format(
        self.args.reboot_iterations)

    tast_args = [
        'tast',
        'run',
        f'--resultsdir={self.tmp_dir}',
        skiprootfs_check,
        iterations,
        remote,
        _TEST,
    ]
    with open(logfile, 'w') as output:
      subprocess.call(
          tast_args, stdout=output, stderr=output, cwd=self.output_dir)

    if not os.path.exists(os.path.join(self.tmp_dir, _RESULTS_KEYVAL)):
      print_error(
          textwrap.dedent("""\
            No results file; terminating test runs.
            Check {} for output from the test run,
            and see {} for full test logs and output.
            """.format(logfile, self.tmp_dir)))
      sys.exit(1)

    os.mkdir(summary_dir)
    self._copy_results_summary(summary_dir)

    if self.args.keep_logs:
      shutil.move(self.tmp_dir, all_results_dir)
      os.chmod(
          all_results_dir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
          | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
    else:
      shutil.rmtree(self.tmp_dir)

    self.current_iter += 1

  def _copy_all_results_summary(self):
    """Utility to copy all results-chart.json into the summary file."""
    with open(os.path.join(self.output_dir, _RESULTS_SUMMARY), 'w') as outf:
      for path in glob.glob(
          os.path.join(f'{_RUNDIR}.???', _RUNDIR_SUMMARY, _RESULTS_KEYVAL)):
        with open(path) as inf:
          for line in inf:
            outf.write(line)

  def run_boot_test(self):
    """Main function to run the boot performance test.

    Run the boot performance test for the given count, putting output into the
    current directory.

    Arguments are <ip-address> and <count> arguments, as for the main command.

    We terminate test runs if the _RESULTS_SUMMARY file isn't produced;
    generally this is the result of a serious error (e.g. disk full) that
    won't go away if we just plow on.
    """
    for _ in range(self.count):
      self._run_boot_test_once()

    print('Test finished:', datetime.datetime.now().strftime('%H:%M:%S'))
    self._copy_all_results_summary()


def main():
  _assert_in_chroot()

  boot_perf = BootPerf()
  boot_perf.process_cmdline()
  boot_perf.validate_args()
  boot_perf.prepare_directories()
  boot_perf.run_boot_test()


if __name__ == '__main__':
  main()
