# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unittest for operation"""

from __future__ import print_function

import multiprocessing
import os
import sys

from chromite.lib import cros_logging as logging
from chromite.lib import cros_test_lib
from chromite.lib import operation
from chromite.lib import parallel
from chromite.lib import workspace_lib


class TestWrapperProgressBarOperation(operation.ProgressBarOperation):
  """Inherit from operation.ProgressBarOperation for testing."""
  def ParseOutput(self, output=None):
    print("Calling ParseOutput")
    print(self._stdout.read())


class FakeParallelEmergeOperation(operation.ParallelEmergeOperation):
  """Fake for operation.ParallelEmergeOperation."""
  def __init__(self, queue):
    super(FakeParallelEmergeOperation, self).__init__()
    self._queue = queue

  def ParseOutput(self, output=None):
    super(FakeParallelEmergeOperation, self).ParseOutput()
    self._queue.put('advance')


class FakeException(Exception):
  """Fake exception used for testing exception handling."""


class ProgressBarOperationTest(cros_test_lib.WorkspaceTestCase,
                               cros_test_lib.OutputTestCase,
                               cros_test_lib.LoggingTestCase):
  """Test the Progress Bar Operation class."""
  # pylint: disable=protected-access

  def setUp(self):
    terminal_width = 20
    self._terminal = self.PatchObject(
        operation.ProgressBarOperation, '_GetTerminalSize',
        return_value=operation._TerminalSize(100, terminal_width))
    self.PatchObject(os, 'isatty', return_value=True)

  def _VerifyProgressBar(self, width, percent, expected_shaded,
                         expected_unshaded):
    """Helper to test progress bar with different percentages and lengths."""
    terminal_width = width + (
        operation.ProgressBarOperation._PROGRESS_BAR_BORDER_SIZE)
    self._terminal.return_value = operation._TerminalSize(100, terminal_width)
    op = operation.ProgressBarOperation()
    with self.OutputCapturer() as output:
      op.ProgressBar(percent)
    stdout = output.GetStdout()

    #Check that the shaded and unshaded regions are the expected size.
    self.assertEqual(stdout.count('#'), expected_shaded)
    self.assertEqual(stdout.count('-'), expected_unshaded)

  def testProgressBar(self):
    """Test progress bar at different percentages."""
    self._VerifyProgressBar(10, 0.7, 7, 3)
    self._VerifyProgressBar(10, 0, 0, 10)
    self._VerifyProgressBar(10, 1, 10, 0)
    self._VerifyProgressBar(1, 0.9, 0, 1)
    # If width of progress bar is less than _PROGRESS_BAR_BORDER_SIZE, the width
    # defaults to 1.
    self._VerifyProgressBar(-5, 0, 0, 1)
    self._VerifyProgressBar(-5, 1, 1, 0)

  def testWaitUntilComplete(self):
    """Test WaitUntilComplete returns False if background task isn't complete.

    As the background task is not started in this test, we expect it not to
    complete.
    """
    op = operation.ProgressBarOperation()
    self.assertFalse(op.WaitUntilComplete(0))

  def testCaptureOutputInBackground(self):
    """Test CaptureOutputInBackground puts finished in reasonable time."""
    def func():
      print('hi')

    op = operation.ProgressBarOperation()
    op.CaptureOutputInBackground(func)

    # This function should really finish in < 1 sec. However, we wait for a
    # longer time so the test does not fail on highly loaded builders.
    self.assertTrue(op.WaitUntilComplete(10))

  def testRun(self):
    """Test that ParseOutput is called and foo is run in background."""
    expected_output = 'hi'
    def func():
      print(expected_output)

    op = TestWrapperProgressBarOperation()
    with self.OutputCapturer():
      op.Run(func, update_period=0.05)

    # Check that foo is executed and its output is captured.
    self.AssertOutputContainsLine(expected_output)
    # Check that ParseOutput is executed at least once. It can be called twice:
    #   Once in the while loop.
    #   Once after the while loop.
    #   However, it is possible for func to execute and finish before the while
    #   statement is executed even once in which case ParseOutput would only be
    #   called once.
    self.AssertOutputContainsLine('Calling ParseOutput')

  def testExceptionHandlingNotInWorkspace(self):
    """Test exception handling if not in a workspace."""
    def func():
      print('foo')
      print('bar', file=sys.stderr)
      raise FakeException()

    op = TestWrapperProgressBarOperation()
    with self.OutputCapturer():
      try:
        with cros_test_lib.LoggingCapturer() as logs:
          op.Run(func)
      except parallel.BackgroundFailure:
        pass

    # Check that the output was dumped correctly.
    self.AssertLogsContain(logs, 'Something went wrong.')
    self.AssertOutputContainsLine('Captured stdout was')
    self.AssertOutputContainsLine('Captured stderr was')
    self.AssertOutputContainsLine('foo')
    self.AssertOutputContainsLine('bar', check_stderr=True)

  def testExceptionHandlingInWorkspace(self):
    """Test that stdout/stderr files are moved correctly if in a workspace."""
    def func():
      print('foo')
      print('bar', file=sys.stderr)
      raise FakeException()

    self.CreateWorkspace()
    op = TestWrapperProgressBarOperation()
    try:
      with cros_test_lib.LoggingCapturer() as logs:
        op.Run(func)
    except parallel.BackgroundFailure as e:
      if not e.HasFailureType(FakeException):
        raise e

    # Check that the files have been moved to the right location.
    self.assertExists(os.path.join(self.workspace_path,
                                   workspace_lib.WORKSPACE_LOGS_DIR, 'stdout'))
    self.assertExists(os.path.join(self.workspace_path,
                                   workspace_lib.WORKSPACE_LOGS_DIR, 'stderr'))
    # Check that the log message contains the path.
    self.AssertLogsContain(logs, self.workspace_path)

  def testLogLevel(self):
    """Test that the log level of the function running is set correctly."""
    func_log_level = logging.DEBUG
    test_log_level = logging.NOTICE
    expected_output = 'hi'
    def func():
      if logging.getLogger().getEffectiveLevel() == func_log_level:
        print(expected_output)

    logging.getLogger().setLevel(test_log_level)
    op = TestWrapperProgressBarOperation()
    with self.OutputCapturer():
      op.Run(func, update_period=0.05, log_level=func_log_level)

    # Check that OutputCapturer contains the expected output. This means that
    # the log level was changed.
    self.AssertOutputContainsLine(expected_output)
    # Check that the log level was restored after the function executed.
    self.assertEqual(logging.getLogger().getEffectiveLevel(), test_log_level)

  def testParallelEmergeOperationParseOutputTotalNotFound(self):
    """Test that ParallelEmergeOperation.ParseOutput if total is not set."""
    def func():
      print('hi')

    op = operation.ParallelEmergeOperation()
    with self.OutputCapturer():
      op.Run(func)

    # Check that the output is empty.
    self.AssertOutputContainsLine('hi', check_stderr=True, invert=True)

  def testParallelEmergeOperationParseOutputTotalIsZero(self):
    """Test that ParallelEmergeOperation.ParseOutput if total is zero."""
    def func():
      print('Total: 0 packages.')

    op = operation.ParallelEmergeOperation()
    with self.OutputCapturer():
      with cros_test_lib.LoggingCapturer() as logs:
        op.Run(func)

    # Check that no progress bar is printed.
    self.AssertOutputContainsLine('%', check_stderr=True, invert=True)
    # Check logs contain message.
    self.AssertLogsContain(logs, 'No packages to build.')

  def testParallelEmergeOperationParseOutputTotalNonZero(self):
    """Test that ParallelEmergeOperation.ParseOutput's progress bar updates."""
    def func(queue):
      print('Total: 2 packages.')
      for _ in xrange(2):
        queue.get()
        print('Completed ')

    queue = multiprocessing.Queue()
    op = FakeParallelEmergeOperation(queue)
    with self.OutputCapturer():
      op.Run(func, queue, update_period=0.005)

    # Check that progress bar prints correctly at 0%, 50%, and 100%.
    self.AssertOutputContainsLine('0%')
    self.AssertOutputContainsLine('50%')
    self.AssertOutputContainsLine('100%')
