#!/usr/bin/python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for autotest_quickmerge."""

import os
import sys
import types
import unittest
import mox


sys.path.insert(0, os.path.abspath('%s/../..' % os.path.dirname(__file__)))
from chromite.lib import cros_build_lib_unittest
from chromite.lib import cros_test_lib
from chromite.scripts import autotest_quickmerge


# Sample `rsync -i` (itemized changes) output used as parser input by the
# tests in this file. One entry per element: an 11-character change code, a
# space, then the path. The joined text has no trailing newline.
RSYNC_TEST_OUTPUT = '\n'.join([
  '.d..t...... ./',
  '>f..t...... touched file with spaces',
  '>f..t...... touched_file',
  '>f.st...... modified_contents_file',
  '.f...p..... modified_permissions_file',
  '.f....o.... modified_owner_file',
  '>f+++++++++ new_file',
  'cL+++++++++ new_symlink -> directory_a/new_file_in_directory',
  '.d..t...... directory_a/',
  '>f+++++++++ directory_a/new_file_in_directory',
  '>f..t...... directory_a/touched_file_in_directory',
  'cd+++++++++ new_empty_directory/',
  '.d..t...... touched_empty_directory/',
])
# The output format of rsync's itemized changes has a few unusual cases
# that are ambiguous. For instance, creating a symbolic link named "a -> b"
# that points to a file named "c" produces the rsync output:
#   cL+++++++++ a -> b -> c
# which is indistinguishable from the output for creating a symbolic link named
# "a" that points to a file named "b -> c".
# Since there is no easy resolution to this ambiguity, and it seems like a case
# that would rarely or never be encountered in the wild, the quickmerge rsync
# command excludes all files which contain the substring " -> " in their name
# (see the '--exclude=** -> *' argument checked in RsyncCommandTest).

# Sample `rsync -i` (itemized changes) output covering paths under client/,
# server/ and utils/; used as input for the stale-package-name test. The
# joined text has no trailing newline.
RSYNC_TEST_OUTPUT_FOR_PACKAGE_UPDATE = '\n'.join([
  '>f..t...... client/ardvark.py',
  '.d..t...... client/site_tests/',
  '>f+++++++++ client/site_tests/nothing.py',
  '.d..t...... client/site_tests/factory_Leds/',
  '>f+++++++++ client/site_tests/factory_Leds/factory_Leds2.py',
  '>f..tpog... client/site_tests/login_UserPolicyKeys/control',
  '>f..tpog... client/site_tests/login_UserPolicyKeys/login_UserPolicyKeys.py',
  '>f..t...... client/site_tests/platform_Cryptohome/platform_Cryptohome.py',
  '>f..tpog... server/site_tests/security_DbusFuzzServer/control',
  '>f..t.og... utils/coverage_suite.py',
  '.d..t...... client/site_tests/power_Thermal/',
  'cd+++++++++ client/site_tests/power_Thermal/a/',
  'cd+++++++++ client/site_tests/power_Thermal/a/b/',
  'cd+++++++++ client/site_tests/power_Thermal/a/b/c/',
  '>f+++++++++ client/site_tests/power_Thermal/a/b/c/d.py',
])

# Destination root handed to the rsync output parser together with
# RSYNC_TEST_OUTPUT; the expected paths in the tests all start with it.
RSYNC_TEST_DESTINATION_PATH = '/foo/bar/'

# Root path used for the mocked portage trees.
TEST_PORTAGE_ROOT = '/bib/bob/'

# One made-up package, spelled in each of the forms the tests pass around:
# the part before the slash (C), the part after it including the version
# (PV), and the slash-joined names without (CP) and with (CPV) the version.
TEST_PACKAGE_C = 'a_cute'
TEST_PACKAGE_PV = 'little_puppy-3.14159'
TEST_PACKAGE_CP = 'a_cute/little_puppy'
TEST_PACKAGE_CPV = 'a_cute/little_puppy-3.14159'

# Contents mapping that the mocked dblink's getcontents() returns: path ->
# tuple whose first element is the entry type. The remaining fields of the
# 'obj' entry are opaque to these tests.
TEST_PACKAGE_OLDCONTENTS = {
    u'/by/the/prickling/of/my/thumbs': (u'obj', '1234', '4321'),
    u'/something/wicked/this/way/comes': (u'dir',),
}

class ItemizeChangesFromRsyncOutput(unittest.TestCase):
  """Tests for autotest_quickmerge.ItemizeChangesFromRsyncOutput."""

  def testItemizeChangesFromRsyncOutput(self):
    """Parse RSYNC_TEST_OUTPUT and check each category of the report.

    Every expected entry is a (change code, absolute path) pair, where the
    path is the rsync-relative path prefixed with
    RSYNC_TEST_DESTINATION_PATH.
    """
    parsed = autotest_quickmerge.ItemizeChangesFromRsyncOutput(
        RSYNC_TEST_OUTPUT, RSYNC_TEST_DESTINATION_PATH)

    # Newly created files and symlinks. The symlink is expected under its own
    # path only, without the " -> target" suffix from the rsync line.
    created = {
        ('>f+++++++++', '/foo/bar/new_file'),
        ('>f+++++++++', '/foo/bar/directory_a/new_file_in_directory'),
        ('cL+++++++++', '/foo/bar/new_symlink'),
    }
    self.assertEqual(created, set(parsed.new_files))

    # Pre-existing files whose timestamp, contents, permissions or owner
    # changed.
    changed = {
        ('>f..t......', '/foo/bar/touched file with spaces'),
        ('>f..t......', '/foo/bar/touched_file'),
        ('>f.st......', '/foo/bar/modified_contents_file'),
        ('.f...p.....', '/foo/bar/modified_permissions_file'),
        ('.f....o....', '/foo/bar/modified_owner_file'),
        ('>f..t......', '/foo/bar/directory_a/touched_file_in_directory'),
    }
    self.assertEqual(changed, set(parsed.modified_files))

    # Only the newly created directory is expected here; directories that
    # were merely touched ('.d..t......') are not.
    created_dirs = {('cd+++++++++', '/foo/bar/new_empty_directory/')}
    self.assertEqual(created_dirs, set(parsed.new_directories))


class PackageNameParsingTest(unittest.TestCase):
  """Tests for autotest_quickmerge.GetStalePackageNames."""

  def testGetStalePackageNames(self):
    """Check which package names are reported stale for the sample changes.

    Of the paths in RSYNC_TEST_OUTPUT_FOR_PACKAGE_UPDATE, only the names of
    directories directly under client/site_tests/ that contain a new or
    modified file are expected. Files placed directly in client/site_tests/
    and paths under server/ or utils/ are not expected to contribute a name.
    """
    sysroot = '/an/arbitrary/path/'
    report = autotest_quickmerge.ItemizeChangesFromRsyncOutput(
        RSYNC_TEST_OUTPUT_FOR_PACKAGE_UPDATE, sysroot)
    changed_files = report.modified_files + report.new_files

    stale_packages = autotest_quickmerge.GetStalePackageNames(
        changed_files, sysroot)

    wanted = {
        'factory_Leds',
        'login_UserPolicyKeys',
        'platform_Cryptohome',
        'power_Thermal',
    }
    self.assertEqual(set(stale_packages), wanted)


class RsyncCommandTest(cros_build_lib_unittest.RunCommandTestCase):
  """Tests for autotest_quickmerge.RsyncQuickmerge."""

  def testRsyncQuickmergeCommand(self):
    """Test that RsyncQuickMerge makes correct call to SudoRunCommand"""
    src = 'a_source_path'
    dest = 'a_sysroot_path'
    includes = 'an_include_file_name'

    autotest_quickmerge.RsyncQuickmerge(src, dest, includes,
                                        pretend=True, overwrite=False)

    # NOTE(review): '-n' and '-u' presumably come from pretend=True and
    # overwrite=False respectively -- confirm against RsyncQuickmerge.
    switches = ['-a', '-n', '-u', '-i']
    # Filter arguments in the order they are expected on the command line;
    # the '** -> *' pattern excludes names containing " -> ".
    filters = [
        '--exclude=**.pyc',
        '--exclude=**.pyo',
        '--exclude=** -> *',
        '--include-from=%s' % includes,
        '--exclude=*',
    ]
    wanted_command = ['rsync'] + switches + filters + [src, dest]

    self.assertCommandContains(wanted_command)


class PortageManipulationsTest(mox.MoxTestBase):
  """Test usage of autotest_quickmerge.portage."""

  def _InstallMockPortage(self):
    """Swap autotest_quickmerge.portage for a mock for the current test only.

    A cleanup is registered that puts the original module attribute back (or
    removes the attribute again if it did not exist beforehand), so the mock
    cannot leak into tests that run later in the same process.

    Returns:
      The mock object now installed as autotest_quickmerge.portage.
    """
    missing = object()
    original = getattr(autotest_quickmerge, 'portage', missing)

    def _Restore():
      if original is missing:
        if hasattr(autotest_quickmerge, 'portage'):
          del autotest_quickmerge.portage
      else:
        autotest_quickmerge.portage = original

    # Register the cleanup before patching so a failure cannot skip it.
    self.addCleanup(_Restore)
    autotest_quickmerge.portage = self.mox.CreateMockAnything('portage')
    return autotest_quickmerge.portage

  def testUpdatePackageContents(self):
    """Test that UpdatePackageContents makes the correct calls to portage.

    The calls recorded on the mocks, in order:
      1. portage.create_trees() for TEST_PORTAGE_ROOT.
      2. vartree.dbapi.cp_list() for the package.
      3. portage.dblink() for the returned version, then getcontents().
      4. vartree.dbapi.writeContentsToContentsFile() with the old contents
         plus entries for everything new in the rsync change report.
    """
    portage = self._InstallMockPortage()

    portage.root = TEST_PORTAGE_ROOT

    mock_vartree = self.mox.CreateMockAnything('vartree')
    mock_vartree.settings = {'an arbitrary' : 'dictionary'}
    mock_tree = {TEST_PORTAGE_ROOT : {'vartree' : mock_vartree}}
    portage.create_trees(TEST_PORTAGE_ROOT,
                         TEST_PORTAGE_ROOT).AndReturn(mock_tree)

    mock_vartree.dbapi = self.mox.CreateMockAnything('dbapi')
    mock_vartree.dbapi.cp_list(TEST_PACKAGE_CP).AndReturn([TEST_PACKAGE_CPV])

    mock_package = self.mox.CreateMockAnything('dblink')
    portage.dblink(TEST_PACKAGE_C, TEST_PACKAGE_PV,  # pylint: disable=E1101
                   settings=mock_vartree.settings,
                   vartree=mock_vartree).AndReturn(mock_package)
    mock_package.getcontents().AndReturn(TEST_PACKAGE_OLDCONTENTS)

    # Entries expected to be added for RSYNC_TEST_OUTPUT. Note that the new
    # directory is keyed without its trailing slash and the new symlink is
    # expected as an 'obj' entry.
    new_entries = {
        '/foo/bar/new_empty_directory': (u'dir',),
        '/foo/bar/directory_a/new_file_in_directory': (u'obj', '0', '0'),
        '/foo/bar/new_file': (u'obj', '0', '0'),
        '/foo/bar/new_symlink': (u'obj', '0', '0')
    }
    # Copy first so the shared TEST_PACKAGE_OLDCONTENTS fixture is not mutated.
    expected_contents = TEST_PACKAGE_OLDCONTENTS.copy()
    expected_contents.update(new_entries)

    mock_vartree.dbapi.writeContentsToContentsFile(mock_package,
                                                   expected_contents)

    self.mox.ReplayAll()

    change_report = autotest_quickmerge.ItemizeChangesFromRsyncOutput(
        RSYNC_TEST_OUTPUT, RSYNC_TEST_DESTINATION_PATH)
    autotest_quickmerge.UpdatePackageContents(change_report, TEST_PACKAGE_CP,
                                              TEST_PORTAGE_ROOT)

    self.mox.VerifyAll()

class PortageAPITest(unittest.TestCase):
  """Ensures that required portage API exists."""

  def runTest(self):
    """Check that portage.vardbapi provides writeContentsToContentsFile.

    The test is skipped when portage cannot be imported, fails when the
    attribute is missing, and fails when the attribute is not a method or
    function.
    """
    try:
      # pylint: disable=F0401
      import portage
    except ImportError:
      self.skipTest('Portage not available in test environment. Re-run test '
                    'in chroot.')

    try:
      # pylint: disable=E1101
      f = portage.vardbapi.writeContentsToContentsFile
    except AttributeError:
      self.fail('Required writeContentsToContentsFile function does '
                'not exist.')

    # Looking a method up on its class yields an unbound method on Python 2
    # (the same type as types.MethodType) and a plain function on Python 3.
    # types.UnboundMethodType only exists on Python 2, so accept both kinds.
    self.assertIsInstance(f, (types.MethodType, types.FunctionType),
                          'Required writeContentsToContentsFile is not '
                          'a function.')

# Run the tests through chromite's test entry point when executed as a script.
if __name__ == '__main__':
  cros_test_lib.main()
