#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2016 The ChromiumOS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unittest for the results reporter."""

from __future__ import division
from __future__ import print_function

import collections
import io
import os
import unittest
import unittest.mock as mock

import test_flag

from benchmark_run import MockBenchmarkRun
from cros_utils import logger
from experiment_factory import ExperimentFactory
from experiment_file import ExperimentFile
from machine_manager import MockCrosMachine
from machine_manager import MockMachineManager
from results_cache import MockResult
from results_report import BenchmarkResults
from results_report import HTMLResultsReport
from results_report import JSONResultsReport
from results_report import ParseChromeosImage
from results_report import ParseStandardPerfReport
from results_report import TextResultsReport


class FreeFunctionsTest(unittest.TestCase):
  """Exercises the module-level helper functions of results_report."""

  def testParseChromeosImage(self):
    """Pins the (version, image) pairs ParseChromeosImage returns."""
    local_image = ('/my/chroot/src/build/images/x86-generic/R01-1.0.date-time'
                   '/chromiumos_test_image.bin')
    local_dir = os.path.dirname(local_image)

    buildbot_image = ('/my/chroot/chroot/tmp/buildbot-build/R02-1.0.date-time'
                      '/chromiumos_test_image.bin')
    buildbot_dir = os.path.dirname(buildbot_image)
    # For buildbot paths the expected image is whatever follows '/chroot/tmp'.
    buildbot_suffix = buildbot_image.split('/chroot/tmp')[1]

    # N.B. the entries with a blank version aren't explicitly supported by
    # ParseChromeosImage. It is unclear whether they need to be, but the goal
    # here is to capture existing functionality as much as possible.
    #
    # The last two entries make sure something reasonable happens for paths
    # that don't quite match the expected pattern.
    cases = [
        (local_image, ('R01-1.0', local_image)),
        (local_dir, ('', local_dir)),
        (buildbot_image, ('R02-1.0', buildbot_suffix)),
        (buildbot_dir, ('', os.path.dirname(buildbot_suffix))),
        ('/chromiumos_test_image.bin', ('', '/chromiumos_test_image.bin')),
        ('chromiumos_test_image.bin', ('', 'chromiumos_test_image.bin')),
    ]
    for image_path, expected in cases:
      self.assertEqual(ParseChromeosImage(image_path), expected)


# A one-element list serves as a mutable module-level counter. There are many
# nicer ways to do this, but the linter complains about all of them (that I
# can think of, at least).
_fake_path_number = [0]


def FakePath(ext):
  """Returns a fresh path that is not expected to exist on the host.

  Every call bumps a module-level counter and embeds it in the returned path,
  so no two calls yield the same directory. If such a path ever shows up in an
  error message, the number may make it easier to trace back to its origin.

  Args:
    ext: the trailing path component(s) to join onto the unique directory.

  Returns:
    os.path.join of '/tmp/should/not/exist/<counter>/' and `ext`.
  """
  current = _fake_path_number[0] + 1
  _fake_path_number[0] = current
  return os.path.join('/tmp/should/not/exist/%d/' % current, ext)


def MakeMockExperiment(compiler='gcc'):
  """Creates an experiment from a canned experiment file.

  The experiment file declares a single PageCycler benchmark and two image
  labels whose image paths, working directory and log directory all come from
  FakePath.

  Args:
    compiler: value assigned to the `compiler` attribute of every label.

  Returns:
    The experiment produced by ExperimentFactory for that file.
  """
  image_paths = (FakePath('cros_image1.bin'), FakePath('cros_image2.bin'))
  experiment_text = """
      board: x86-alex
      remote: 127.0.0.1
      locks_dir: /tmp
      perf_args: record -a -e cycles
      benchmark: PageCycler {
        iterations: 3
      }

      image1 {
        chromeos_image: %s
      }

      image2 {
        remote: 127.0.0.2
        chromeos_image: %s
      }
      """ % image_paths
  efile = ExperimentFile(io.StringIO(experiment_text))

  factory = ExperimentFactory()
  working_directory = FakePath('working_directory')
  log_dir = FakePath('log_dir')
  experiment = factory.GetExperiment(efile, working_directory, log_dir)

  for label in experiment.labels:
    label.compiler = compiler
  return experiment


def _InjectSuccesses(experiment, how_many, keyvals, for_benchmark=0):
  """Adds mock successful runs, for every label, to the experiment.

  Args:
    experiment: experiment whose `benchmark_runs` list is appended to.
    how_many: number of runs to add per label.
    keyvals: mapping installed as the keyvals of every injected result. It is
      copied once; all injected results share that single copy.
    for_benchmark: index into `experiment.benchmarks` selecting the benchmark
      the injected runs are created for.

  Returns:
    The same `experiment` object that was passed in.
  """
  # Defensive copy of keyvals, so if it's modified, we'll know.
  shared_keyvals = dict(keyvals)
  config_count = len(experiment.benchmarks) * len(experiment.labels)
  runs_per_config = len(experiment.benchmark_runs) // config_count

  # TODO(gbiv): Centralize the mocking of these, maybe? (It's also done in
  # benchmark_run_unittest)
  benchmark = experiment.benchmarks[for_benchmark]
  cache_conditions = []
  log_level = 'average'
  log = logger.GetLogger()
  machine_manager = MockMachineManager(
      FakePath('chromeos_root'), 0, log_level, '')
  machine_manager.AddMachine('testing_machine')
  machine = next(
      candidate for candidate in machine_manager.GetMachines()
      if candidate.name == 'testing_machine')

  def BuildRun(index, label):
    """Creates one mock run for `label` carrying the shared keyvals."""
    result = MockResult(log, label, log_level, machine)
    result.keyvals = shared_keyvals
    run = MockBenchmarkRun('mock_success%d' % index, benchmark, label,
                           1 + index + runs_per_config, cache_conditions,
                           machine_manager, log, log_level, '', {})
    run.result = result
    return run

  for label in experiment.labels:
    for index in range(how_many):
      experiment.benchmark_runs.append(BuildRun(index, label))
  return experiment


class TextResultsReportTest(unittest.TestCase):
  """Checks that a text report mentions the data it was built from.

  These tests are deliberately loose about layout: they only look for the
  expected values somewhere in the report, not for a particular format.
  """

  def _checkReport(self, mock_getcooldown, email):
    """Builds a text report and asserts the expected fragments are in it.

    Args:
      mock_getcooldown: mock standing in for GetTotalWaitCooldownTime; its
        return value is set here.
      email: forwarded to TextResultsReport.FromExperiment.

    Returns:
      The generated report text.
    """
    keyvals = {'retval': 0, 'machine': 'some bot', 'a_float': 3.96}
    experiment = _InjectSuccesses(MakeMockExperiment(), 2, keyvals)

    # (remote, cooldown in minutes); the mock reports the wait in seconds.
    cooldown_minutes = (
        (experiment.remote[0], 12),
        (experiment.remote[1], 8),
    )
    mock_getcooldown.return_value = {
        remote: minutes * 60 for remote, minutes in cooldown_minutes
    }

    report = TextResultsReport.FromExperiment(
        experiment, email=email).GetReport()

    expected_fragments = [
        str(keyvals['a_float']),
        keyvals['machine'],
        MockCrosMachine.CPUINFO_STRING,
        '\nDuration\n',
        'Total experiment time:\n',
        'Cooldown wait time:\n',
    ]
    expected_fragments.extend(
        'DUT %s: %d min' % (remote, minutes)
        for remote, minutes in cooldown_minutes)
    for fragment in expected_fragments:
      self.assertIn(fragment, report)
    return report

  @mock.patch.object(TextResultsReport, 'GetTotalWaitCooldownTime')
  def testOutput(self, mock_getcooldown):
    """The email and non-email flavors must both be valid, yet differ."""
    with_email = self._checkReport(mock_getcooldown, email=True)
    without_email = self._checkReport(mock_getcooldown, email=False)

    # Ensure that the reports are somehow different. Otherwise, having the
    # distinction is useless.
    self.assertNotEqual(with_email, without_email)

  def test_get_totalwait_cooldowntime(self):
    """Cooldown time added to each machine is reported per remote."""
    expected_waits = (120, 240)
    experiment = MakeMockExperiment()
    machines = experiment.machine_manager.GetMachines()
    for position, seconds in enumerate(expected_waits):
      machines[position].AddCooldownWaitTime(seconds)

    report = TextResultsReport.FromExperiment(experiment, email=False)
    waits_by_remote = report.GetTotalWaitCooldownTime()
    for position, seconds in enumerate(expected_waits):
      self.assertEqual(waits_by_remote[experiment.remote[position]], seconds)


class HTMLResultsReportTest(unittest.TestCase):
  """Checks that an HTML report mentions the data it was built from.

  These tests are deliberately loose about layout: they only look for the
  expected values somewhere in the output, not for a particular format.
  """

  # Bundle of the rendered pieces handed to the HTML page template.
  _TestOutput = collections.namedtuple('TestOutput', (
      'summary_table',
      'perf_html',
      'chart_js',
      'charts',
      'full_table',
      'experiment_file',
  ))

  @staticmethod
  def _GetTestOutput(perf_table, chart_js, summary_table, print_table,
                     chart_divs, full_table, experiment_file):
    """Renders the tables as HTML and packs everything into a _TestOutput.

    The parameter names mirror the keyword arguments captured from the mocked
    GenerateHTMLPage call, because they are passed in via ** expansion.
    """

    def AsHtml(table):
      return print_table(table, 'HTML')

    rendered_summary = AsHtml(summary_table)
    rendered_perf = AsHtml(perf_table)
    rendered_full = AsHtml(full_table)
    # N.B. Currently chart_js isn't checked anywhere; it's just passed through
    # because cros lint complains otherwise.
    return HTMLResultsReportTest._TestOutput(
        rendered_summary, rendered_perf, chart_js, chart_divs, rendered_full,
        experiment_file)

  def _GetOutput(self, experiment=None, benchmark_results=None):
    """Generates an HTML report and returns what reached the page template.

    GenerateHTMLPage is mocked out; the keyword arguments of its single
    expected call are converted into a _TestOutput.

    Args:
      experiment: if not None, the report is built from this experiment.
      benchmark_results: used to build the report when experiment is None.
    """
    with mock.patch('results_report_templates.GenerateHTMLPage') as page_mock:
      if experiment is None:
        report = HTMLResultsReport(benchmark_results)
      else:
        report = HTMLResultsReport.FromExperiment(experiment)
      report.GetReport()

    self.assertEqual(page_mock.call_count, 1)
    # Index 0 of call_args holds positional args, index 1 the kwargs.
    self.assertEqual(page_mock.call_args[0], ())
    template_kwargs = page_mock.call_args[1]
    return self._GetTestOutput(**template_kwargs)

  def testNoSuccessOutput(self):
    """With no injected runs, the tables say 'no result'."""
    rendered = self._GetOutput(MakeMockExperiment())
    for table in (rendered.summary_table, rendered.full_table):
      self.assertIn('no result', table)
    self.assertEqual(rendered.charts, '')
    self.assertNotEqual(rendered.experiment_file, '')

  def testSuccessfulOutput(self):
    """Injected successful runs show up in the tables and charts."""
    keyvals = {'retval': 0, 'a_float': 3.96}
    experiment = _InjectSuccesses(MakeMockExperiment(), 2, keyvals)
    rendered = self._GetOutput(experiment)

    self.assertNotIn('no result', rendered.summary_table)
    # NOTE: keyvals has no 'machine' entry here, so its presence in the summary
    # table is not asserted.
    self.assertIn('a_float', rendered.summary_table)
    self.assertIn(str(keyvals['a_float']), rendered.summary_table)
    self.assertIn('a_float', rendered.full_table)
    # The _ in a_float is filtered out when we're generating HTML.
    self.assertIn('afloat', rendered.charts)
    # And make sure we have our experiment file...
    self.assertNotEqual(rendered.experiment_file, '')

  def testBenchmarkResultFailure(self):
    """A benchmark with no keyvals yields 'no result' and no charts."""
    bench_results = BenchmarkResults(['label1'], [('bench1', 1)],
                                     {'bench1': [[]]})
    rendered = self._GetOutput(benchmark_results=bench_results)
    self.assertIn('no result', rendered.summary_table)
    self.assertEqual(rendered.charts, '')
    self.assertEqual(rendered.experiment_file, '')

  def testBenchmarkResultSuccess(self):
    """A benchmark with keyvals is named in both tables and gets charts."""
    bench_results = BenchmarkResults(['label1'], [('bench1', 1)],
                                     {'bench1': [[{
                                         'retval': 1,
                                         'foo': 2.0
                                     }]]})
    rendered = self._GetOutput(benchmark_results=bench_results)
    self.assertNotIn('no result', rendered.summary_table)
    self.assertIn('bench1', rendered.summary_table)
    self.assertIn('bench1', rendered.full_table)
    self.assertNotEqual(rendered.charts, '')
    self.assertEqual(rendered.experiment_file, '')


class JSONResultsReportTest(unittest.TestCase):
  """Exercises JSONResultsReport with and without an experiment."""

  # Keys every report entry is expected to carry.
  REQUIRED_REPORT_KEYS = ('date', 'time', 'label', 'test_name', 'pass')
  # Additional keys expected when the report was built from an experiment.
  EXPERIMENT_REPORT_KEYS = ('board', 'chromeos_image', 'chromeos_version',
                            'chrome_version', 'compiler')

  @staticmethod
  def _GetRequiredKeys(is_experiment):
    """Returns the tuple of keys each report entry must contain."""
    cls = JSONResultsReportTest
    if is_experiment:
      return cls.REQUIRED_REPORT_KEYS + cls.EXPERIMENT_REPORT_KEYS
    return cls.REQUIRED_REPORT_KEYS

  def _CheckRequiredKeys(self, test_output, is_experiment):
    """Asserts that every entry of test_output has all required keys."""
    expected_keys = self._GetRequiredKeys(is_experiment)
    for entry in test_output:
      for expected_key in expected_keys:
        self.assertIn(expected_key, entry)

  def testAllFailedJSONReportOutput(self):
    """Without any successful run, entries hold only the required keys."""
    report = JSONResultsReport.FromExperiment(MakeMockExperiment())
    entries = report.GetReportObject()
    self._CheckRequiredKeys(entries, is_experiment=True)
    # Nothing succeeded; we don't send anything more than what's required.
    expected_keys = self._GetRequiredKeys(is_experiment=True)
    for entry in entries:
      self.assertCountEqual(entry.keys(), expected_keys)

  def testJSONReportOutputWithSuccesses(self):
    """Successful runs carry detailed results and the machine keyval."""
    keyvals = {
        'retval': 0,
        'a_float': '2.3',
        'many_floats': [['1.0', '2.0'], ['3.0']],
        'machine': "i'm a pirate"
    }

    # 2 is arbitrary.
    runs_per_label = 2
    experiment = _InjectSuccesses(MakeMockExperiment(), runs_per_label,
                                  keyvals)
    entries = JSONResultsReport.FromExperiment(experiment).GetReportObject()
    self._CheckRequiredKeys(entries, is_experiment=True)

    expected_pass_count = runs_per_label * len(experiment.labels)
    passing = [entry for entry in entries if entry['pass']]
    self.assertEqual(expected_pass_count, len(passing))

    # TODO(gbiv): ...Is the 3.0 *actually* meant to be dropped?
    expected_detailed = {'a_float': 2.3, 'many_floats': [1.0, 2.0]}
    for entry in passing:
      self.assertIn('detailed_results', entry)
      self.assertDictEqual(expected_detailed, entry['detailed_results'])
      self.assertIn('machine', entry)
      self.assertEqual(keyvals['machine'], entry['machine'])

  def testFailedJSONReportOutputWithoutExperiment(self):
    """Non-zero, missing or absent retvals are all reported as failures."""
    # yapf:disable
    benchmark_names_and_iterations = [('bench1', 1), ('bench2', 2),
                                      ('bench3', 1), ('bench4', 0)]
    # yapf:enable

    benchmark_keyvals = {
        'bench1': [[{'retval': 1, 'foo': 2.0}]],
        'bench2': [[{'retval': 1, 'foo': 4.0}, {'retval': -1, 'bar': 999}]],
        # lack of retval is considered a failure.
        'bench3': [[{}]],
        'bench4': [[]],
    }
    bench_results = BenchmarkResults(['label1'],
                                     benchmark_names_and_iterations,
                                     benchmark_keyvals)
    entries = JSONResultsReport(bench_results).GetReportObject()
    self._CheckRequiredKeys(entries, is_experiment=False)
    self.assertFalse(any(entry['pass'] for entry in entries))

  def testJSONGetReportObeysJSONSettings(self):
    """json_args passed to the reporter affect the serialized report."""
    # These can be anything, really. So long as they're distinctive.
    item_separator, key_separator = ',\t\n\t', ':\t\n\t'
    bench_results = BenchmarkResults(['label1'], [('bench1', 1)],
                                     {'bench1': [[{
                                         'retval': 0,
                                         'foo': 2.0
                                     }]]})
    reporter = JSONResultsReport(
        bench_results,
        json_args={'separators': (item_separator, key_separator)})
    serialized = reporter.GetReport()
    self.assertIn(item_separator, serialized)
    self.assertIn(key_separator, serialized)

  def testSuccessfulJSONReportOutputWithoutExperiment(self):
    """Zero retvals pass, and each entry keeps only its own keyvals."""
    benchmark_keyvals = {
        'bench1': [[{'retval': 0, 'foo': 2.0}]],
        'bench2': [[{'retval': 0, 'foo': 4.0}, {'retval': 0, 'bar': 999}]],
    }
    bench_results = BenchmarkResults(['label1'],
                                     [('bench1', 1), ('bench2', 2)],
                                     benchmark_keyvals)
    entries = JSONResultsReport(bench_results).GetReportObject()
    self._CheckRequiredKeys(entries, is_experiment=False)
    self.assertTrue(all(entry['pass'] for entry in entries))

    def SortKey(entry):
      # Entries lacking 'foo' get 5.0, which sorts after the 'foo' values used.
      return (entry['test_name'], entry['detailed_results'].get('foo', 5.0))

    # Enforce that the results have *some* deterministic order.
    ordered_details = [
        entry['detailed_results'] for entry in sorted(entries, key=SortKey)
    ]
    bench1, bench2_foo, bench2_bar = ordered_details
    self.assertEqual(bench1['foo'], 2.0)
    self.assertEqual(bench2_foo['foo'], 4.0)
    self.assertEqual(bench2_bar['bar'], 999)
    self.assertNotIn('bar', bench1)
    self.assertNotIn('bar', bench2_foo)
    self.assertNotIn('foo', bench2_bar)


class PerfReportParserTest(unittest.TestCase):
  """Exercises ParseStandardPerfReport against a checked-in perf report."""

  @staticmethod
  def _ReadRealPerfReport():
    """Returns the text of perf_files/perf.data.report.0 next to this file."""
    here = os.path.dirname(os.path.realpath(__file__))
    report_path = os.path.join(here, 'perf_files/perf.data.report.0')
    with open(report_path) as report_file:
      return report_file.read()

  def testParserParsesRealWorldPerfReport(self):
    """The parsed report has the expected events, sizes and percentages."""
    report = ParseStandardPerfReport(self._ReadRealPerfReport())
    self.assertCountEqual(['cycles', 'instructions'], list(report.keys()))

    # (event name, expected number of entries, arbitrarily selected known
    # percentages from the perf report)
    expectations = (
        ('cycles', 214, {
            '0xffffffffa4a1f1c9': 0.66,
            '0x0000115bb7ba9b54': 0.47,
            '0x0000000000082e08': 0.00,
            '0xffffffffa4a13e63': 0.00,
        }),
        ('instructions', 492, {
            '0x0000115bb6c35d7a': 1.65,
            '0x0000115bb7ba9b54': 0.67,
            '0x0000000000024f56': 0.00,
            '0xffffffffa4a0ee03': 0.00,
        }),
    )
    for event_name, expected_size, known_percentages in expectations:
      event_report = report[event_name]
      self.assertEqual(len(event_report), expected_size)
      for address, percentage in known_percentages.items():
        self.assertIn(address, event_report)
        self.assertEqual(percentage, event_report[address])


if __name__ == '__main__':
  # Turn test mode on before handing control to unittest. Note that this only
  # happens when the file is executed directly as a script.
  test_flag.SetTestMode(True)
  unittest.main()
