# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unittest for cros_test_lib (tests for tests? Who'd a thunk it)."""

from __future__ import print_function

import mock
import os
import sys
import time
import unittest

from chromite.lib import cros_test_lib
from chromite.lib import cros_build_lib
from chromite.lib import cros_build_lib_unittest
from chromite.lib import osutils
from chromite.lib import partial_mock
from chromite.lib import timeout_util


# pylint: disable=W0212,W0233

# Convenience alias
Dir = cros_test_lib.Directory


class CrosTestCaseTest(cros_test_lib.TestCase):
  """Test the cros_test_lib.TestCase."""

  def testAssertStartsWith(self):
    s = "abcdef"
    prefix = "abc"
    self.assertStartsWith(s, prefix)
    prefix = "def"
    self.assertRaises(AssertionError, self.assertStartsWith, s, prefix)

  def testAssertEndsWith(self):
    s = "abcdef"
    suffix = "abc"
    self.assertRaises(AssertionError, self.assertEndsWith, s, suffix)
    suffix = "def"
    self.assertEndsWith(s, suffix)


class TruthTableTest(cros_test_lib.TestCase):
  """Test TruthTable functionality."""

  def _TestTableSanity(self, tt, lines):
    """Run the given truth table through basic sanity checks.

    Args:
      tt: A TruthTable object.
      lines: The expect input lines, in order (list of tuples).
    """
    # Check that more than one iterable can be used at once.
    iter1 = iter(tt)
    iter2 = iter(tt)
    self.assertEquals(lines[0], iter1.next())
    self.assertEquals(lines[0], iter2.next())
    self.assertEquals(lines[1], iter2.next())

    # Check that iteration again works again.
    for ix, line in enumerate(tt):
      self.assertEquals(lines[ix], line)

    # Check direct access of input lines.
    for i in xrange(len(tt)):
      self.assertEquals(lines[i], tt.GetInputs(i))

    # Check assertions on bad input to GetInputs.
    self.assertRaises(ValueError, tt.GetInputs, -1)
    self.assertRaises(ValueError, tt.GetInputs, len(tt))

  def testTwoDimensions(self):
    """Test TruthTable behavior for two boolean inputs."""
    tt = cros_test_lib.TruthTable(inputs=[(True, True), (True, False)])
    self.assertEquals(len(tt), pow(2, 2))

    # Check truth table output.
    self.assertFalse(tt.GetOutput((False, False)))
    self.assertFalse(tt.GetOutput((False, True)))
    self.assertTrue(tt.GetOutput((True, False)))
    self.assertTrue(tt.GetOutput((True, True)))

    # Check assertions on bad input to GetOutput.
    self.assertRaises(TypeError, tt.GetOutput, True)
    self.assertRaises(ValueError, tt.GetOutput, (True, True, True))

    # Check iteration over input lines.
    lines = list(tt)
    self.assertEquals((False, False), lines[0])
    self.assertEquals((False, True), lines[1])
    self.assertEquals((True, False), lines[2])
    self.assertEquals((True, True), lines[3])

    self._TestTableSanity(tt, lines)

  def testFourDimensions(self):
    """Test TruthTable behavior for four boolean inputs."""
    false1 = (True, True, True, False)
    false2 = (True, False, True, False)
    true1 = (False, True, False, True)
    true2 = (True, True, False, False)
    tt = cros_test_lib.TruthTable(inputs=(false1, false2), input_result=False)
    self.assertEquals(len(tt), pow(2, 4))

    # Check truth table output.
    self.assertFalse(tt.GetOutput(false1))
    self.assertFalse(tt.GetOutput(false2))
    self.assertTrue(tt.GetOutput(true1))
    self.assertTrue(tt.GetOutput(true2))

    # Check assertions on bad input to GetOutput.
    self.assertRaises(TypeError, tt.GetOutput, True)
    self.assertRaises(ValueError, tt.GetOutput, (True, True, True))

    # Check iteration over input lines.
    lines = list(tt)
    self.assertEquals((False, False, False, False), lines[0])
    self.assertEquals((False, False, False, True), lines[1])
    self.assertEquals((False, True, True, True), lines[7])
    self.assertEquals((True, True, True, True), lines[15])

    self._TestTableSanity(tt, lines)


class VerifyTarballTest(cros_test_lib.MockTempDirTestCase):
  """Test tarball verification functionality."""

  TARBALL = 'fake_tarball'

  def setUp(self):
    self.rc_mock = self.StartPatcher(cros_build_lib_unittest.RunCommandMock())

  def _MockTarList(self, files):
    """Mock out tarball content list call.

    Args:
      files: A list of contents to return.
    """
    self.rc_mock.AddCmdResult(
        partial_mock.ListRegex('tar -tf'), output='\n'.join(files))

  def testNormPath(self):
    """Test path normalization."""
    tar_contents = ['./', './foo/', './foo/./a', './foo/./b']
    dir_struct = [Dir('.', []), Dir('foo', ['a', 'b'])]
    self._MockTarList(tar_contents)
    cros_test_lib.VerifyTarball(self.TARBALL, dir_struct)

  def testDuplicate(self):
    """Test duplicate detection."""
    tar_contents = ['a', 'b', 'a']
    dir_struct = ['a', 'b']
    self._MockTarList(tar_contents)
    self.assertRaises(AssertionError, cros_test_lib.VerifyTarball, self.TARBALL,
                      dir_struct)


class MockTestCaseTest(cros_test_lib.TestCase):
  """Tests MockTestCase functionality."""

  class MyMockTestCase(cros_test_lib.MockTestCase):
    """Helper class for testing MockTestCase."""
    def testIt(self):
      pass

  class Mockable(object):
    """Helper test class intended for having values mocked out."""
    TO_BE_MOCKED = 0
    TO_BE_MOCKED2 = 10
    TO_BE_MOCKED3 = 20

  def GetPatcher(self, attr, val):
    return mock.patch('%s.MockTestCaseTest.Mockable.%s' % (__name__, attr),
                      new=val)

  def testPatchRemovalError(self):
    """Verify that patch removal during tearDown is robust to Exceptions."""
    tc = self.MyMockTestCase('testIt')
    patcher = self.GetPatcher('TO_BE_MOCKED', -100)
    patcher2 = self.GetPatcher('TO_BE_MOCKED2', -200)
    patcher3 = self.GetPatcher('TO_BE_MOCKED3', -300)
    patcher3.start()
    tc.setUp()
    tc.StartPatcher(patcher)
    tc.StartPatcher(patcher2)
    patcher.stop()
    self.assertEquals(self.Mockable.TO_BE_MOCKED2, -200)
    self.assertEquals(self.Mockable.TO_BE_MOCKED3, -300)
    self.assertRaises(RuntimeError, tc.tearDown)
    # Make sure that even though exception is raised for stopping 'patcher', we
    # continue to stop 'patcher2', and run patcher.stopall().
    self.assertEquals(self.Mockable.TO_BE_MOCKED2, 10)
    self.assertEquals(self.Mockable.TO_BE_MOCKED3, 20)


class TestCaseTest(unittest.TestCase):
  """Tests TestCase functionality."""

  def testTimeout(self):
    """Test that test cases are interrupted when they are hanging."""

    class TimeoutTestCase(cros_test_lib.TestCase):
      """Test case that raises a TimeoutError because it takes too long."""

      TEST_CASE_TIMEOUT = 1

      def testSleeping(self):
        """Sleep for 2 minutes. This should raise a TimeoutError."""
        time.sleep(2 * 60)
        raise AssertionError('Test case should have timed out.')

    # Run the test case, verifying it raises a TimeoutError.
    test = TimeoutTestCase(methodName='testSleeping')
    self.assertRaises(timeout_util.TimeoutError, test.testSleeping)


class OutputTestCaseTest(cros_test_lib.OutputTestCase,
                         cros_test_lib.TempDirTestCase):
  """Tests OutputTestCase functionality."""

  def testStdoutAndStderr(self):
    """Check capturing stdout and stderr."""
    with self.OutputCapturer():
      print('foo')
      print('bar', file=sys.stderr)
    self.AssertOutputContainsLine('foo')
    self.AssertOutputContainsLine('bar', check_stdout=False, check_stderr=True)

  def testStdoutReadDuringCapture(self):
    """Check reading stdout mid-capture."""
    with self.OutputCapturer():
      print('foo')
      self.AssertOutputContainsLine('foo')
      print('bar')
      self.AssertOutputContainsLine('bar')
    self.AssertOutputContainsLine('foo')
    self.AssertOutputContainsLine('bar')

  def testClearCaptured(self):
    """Check writing data, clearing it, then writing more data."""
    with self.OutputCapturer() as cap:
      print('foo')
      self.AssertOutputContainsLine('foo')
      cap.ClearCaptured()
      self.AssertOutputContainsLine('foo', invert=True)
      print('bar')
    self.AssertOutputContainsLine('bar')

  def testRunCommandCapture(self):
    """Check capturing RunCommand() subprocess output."""
    with self.OutputCapturer():
      cros_build_lib.RunCommand(['sh', '-c', 'echo foo; echo bar >&2'],
                                mute_output=False)
    self.AssertOutputContainsLine('foo')
    self.AssertOutputContainsLine('bar', check_stdout=False, check_stderr=True)

  def testCapturingStdoutAndStderrToFile(self):
    """Check that OutputCapturer captures to a named file."""
    stdout_path = os.path.join(self.tempdir, 'stdout')
    stderr_path = os.path.join(self.tempdir, 'stderr')
    with self.OutputCapturer(stdout_path=stdout_path, stderr_path=stderr_path):
      print('foo')
      print('bar', file=sys.stderr)

    # Check that output can be read by OutputCapturer.
    self.AssertOutputContainsLine('foo')
    self.AssertOutputContainsLine('bar', check_stdout=False, check_stderr=True)
    # Verify that output is actually written to the correct files.
    self.assertEqual('foo\n', osutils.ReadFile(stdout_path))
    self.assertEqual('bar\n', osutils.ReadFile(stderr_path))
