# -*- coding: utf-8 -*-
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for chromite.lib.git and helpers for testing that module."""

from __future__ import print_function

import errno
import os
import shutil

import mock

from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
from chromite.lib import git
from chromite.lib import osutils
from chromite.lib import partial_mock


class ManifestMock(partial_mock.PartialMock):
  """Partial mock for git.Manifest."""
  TARGET = 'chromite.lib.git.Manifest'
  ATTRS = ('_RunParser',)

  def _RunParser(self, *_args):
    pass


class ManifestCheckoutMock(partial_mock.PartialMock):
  """Partial mock for git.ManifestCheckout."""
  TARGET = 'chromite.lib.git.ManifestCheckout'
  ATTRS = ('_GetManifestsBranch',)

  def _GetManifestsBranch(self, _root):
    return 'default'


class GitWrappersTest(cros_test_lib.RunCommandTempDirTestCase):
  """Tests for small git wrappers"""

  CHANGE_ID = 'I0da12ef6d2c670305f0281641bc53db22faf5c1a'
  COMMIT_LOG = """
foo: Change to foo.

Change-Id: %s
""" % CHANGE_ID

  PUSH_REMOTE = 'fake_remote'
  PUSH_BRANCH = 'fake_branch'
  PUSH_LOCAL = 'fake_local_branch'

  def setUp(self):
    self.fake_git_dir = os.path.join(self.tempdir, 'foo/bar')
    self.fake_file = 'baz'
    self.fake_path = os.path.join(self.fake_git_dir, self.fake_file)

  def testInit(self):
    git.Init(self.fake_path)

    # Should have created the git repo directory, if it didn't exist.
    self.assertExists(self.fake_git_dir)
    self.assertCommandContains(['init'])

  def testClone(self):
    url = 'http://happy/git/repo'

    git.Clone(self.fake_git_dir, url)

    # Should have created the git repo directory, if it didn't exist.
    self.assertExists(self.fake_git_dir)
    self.assertCommandContains(['git', 'clone', url, self.fake_git_dir])

  def testCloneComplex(self):
    url = 'http://happy/git/repo'
    ref = 'other/git/repo'

    git.Clone(self.fake_git_dir, url,
              reference=ref, branch='feature', single_branch=True)

    self.assertCommandContains(['git', 'clone', url, self.fake_git_dir,
                                '--reference', ref,
                                '--branch', 'feature', '--single-branch'])

  def testShallowFetch(self):
    url = 'http://happy/git/repo'

    sparse_checkout = os.path.join(self.fake_git_dir,
                                   '.git', 'info', 'sparse-checkout')
    osutils.SafeMakedirs(os.path.dirname(sparse_checkout))

    git.ShallowFetch(self.fake_git_dir, url,
                     sparse_checkout=['dir1/file1', 'dir2/file2'])

    # Should have created the git repo directory, if it didn't exist.
    self.assertExists(self.fake_git_dir)
    self.assertCommandContains(['init'])
    self.assertCommandContains(['config', 'core.sparsecheckout', 'true'])
    self.assertCommandContains(['remote', 'add', 'origin', url])
    self.assertCommandContains(['fetch', '--depth=1'])
    self.assertCommandContains(['pull', 'origin', 'master'])
    self.assertEqual(osutils.ReadFile(sparse_checkout),
                     'dir1/file1\ndir2/file2')

  def testFindGitTopLevel(self):
    git.FindGitTopLevel(self.fake_path)
    self.assertCommandContains(['--show-toplevel'])

  def testGetCurrentBranchOrId_NoBranch(self):
    test_hash = '5' * 40
    self.rc.AddCmdResult(partial_mock.In('symbolic-ref'), returncode=1)
    self.rc.AddCmdResult(
        partial_mock.In('rev-parse'), output='%s\n' % test_hash)
    self.assertEqual(git.GetCurrentBranchOrId(self.fake_path), test_hash)
    self.assertCommandContains(['rev-parse', 'HEAD'])

  def testGetCurrentBranchOrId_OnBranch(self):
    self.rc.AddCmdResult(
        partial_mock.In('symbolic-ref'), output='refs/heads/branch\n')
    self.assertEqual(git.GetCurrentBranchOrId(self.fake_path), 'branch')
    self.assertCommandContains(['symbolic-ref', '-q', 'HEAD'])

  def testAddPath(self):
    git.AddPath(self.fake_path)
    self.assertCommandContains(['add'])
    self.assertCommandContains([self.fake_file])

  def testRmPath(self):
    git.RmPath(self.fake_path)
    self.assertCommandContains(['rm'])
    self.assertCommandContains([self.fake_file])

  def testGetObjectAtRev(self):
    git.GetObjectAtRev(self.fake_git_dir, '.', '1234')
    self.assertCommandContains(['show'])

  def testRevertPath(self):
    git.RevertPath(self.fake_git_dir, self.fake_file, '1234')
    self.assertCommandContains(['checkout'])
    self.assertCommandContains([self.fake_file])

  def testCommit(self):
    self.rc.AddCmdResult(partial_mock.In('log'), output=self.COMMIT_LOG)
    git.Commit(self.fake_git_dir, 'bar')
    self.assertCommandContains(['--amend'], expected=False)
    cid = git.Commit(self.fake_git_dir, 'bar', amend=True)
    self.assertCommandContains(['--amend'])
    self.assertCommandContains(['--allow-empty'], expected=False)
    self.assertEqual(cid, self.CHANGE_ID)
    cid = git.Commit(self.fake_git_dir, 'new', allow_empty=True)
    self.assertCommandContains(['--allow-empty'])

  def testUploadCLNormal(self):
    git.UploadCL(self.fake_git_dir, self.PUSH_REMOTE, self.PUSH_BRANCH,
                 local_branch=self.PUSH_LOCAL)
    self.assertCommandContains(['%s:refs/for/%s' % (self.PUSH_LOCAL,
                                                    self.PUSH_BRANCH)],
                               capture_output=False)

  def testUploadCLDraft(self):
    git.UploadCL(self.fake_git_dir, self.PUSH_REMOTE, self.PUSH_BRANCH,
                 local_branch=self.PUSH_LOCAL, draft=True)
    self.assertCommandContains(['%s:refs/drafts/%s' % (self.PUSH_LOCAL,
                                                       self.PUSH_BRANCH)],
                               capture_output=False)

  def testUploadCLCaptured(self):
    git.UploadCL(self.fake_git_dir, self.PUSH_REMOTE, self.PUSH_BRANCH,
                 local_branch=self.PUSH_LOCAL, draft=True, capture_output=True)
    self.assertCommandContains(['%s:refs/drafts/%s' % (self.PUSH_LOCAL,
                                                       self.PUSH_BRANCH)],
                               capture_output=True)

  def testGetGitRepoRevision(self):
    git.GetGitRepoRevision(self.fake_git_dir)
    self.assertCommandContains(['rev-parse', 'HEAD'])
    git.GetGitRepoRevision(self.fake_git_dir, branch='branch')
    self.assertCommandContains(['rev-parse', 'branch'])
    git.GetGitRepoRevision(self.fake_git_dir, short=True)
    self.assertCommandContains(['rev-parse', '--short', 'HEAD'])
    git.GetGitRepoRevision(self.fake_git_dir, branch='branch', short=True)
    self.assertCommandContains(['rev-parse', '--short', 'branch'])

  def testGetGitGitdir(self):
    git.Init(self.fake_git_dir)
    os.makedirs(os.path.join(self.fake_git_dir, '.git', 'refs', 'heads'))
    os.makedirs(os.path.join(self.fake_git_dir, '.git', 'objects'))
    other_file = os.path.join(self.fake_git_dir, 'other_file')
    osutils.Touch(other_file)

    ret = git.GetGitGitdir(self.fake_git_dir)
    self.assertEqual(ret, os.path.join(self.fake_git_dir, '.git'))

  def testGetGitGitdir_bare(self):
    git.Init(self.fake_git_dir)
    os.makedirs(os.path.join(self.fake_git_dir, 'refs', 'heads'))
    os.makedirs(os.path.join(self.fake_git_dir, 'objects'))
    config_file = os.path.join(self.fake_git_dir, 'config')
    osutils.Touch(config_file)

    ret = git.GetGitGitdir(self.fake_git_dir)
    self.assertEqual(ret, self.fake_git_dir)

  def testGetGitGitdir_worktree(self):
    dotgit = os.path.join(self.tempdir, '.git')
    osutils.WriteFile(dotgit, 'gitdir: /foo')
    ret = git.GetGitGitdir(self.tempdir)
    self.assertEqual(ret, dotgit)

  def testGetGitGitdir_negative(self):
    ret = git.GetGitGitdir(self.tempdir)
    self.assertFalse(ret)

  def testDeleteStaleLocks(self):
    git.Init(self.fake_git_dir)
    refs_heads = os.path.join(self.fake_git_dir, '.git', 'refs', 'heads')
    os.makedirs(refs_heads)
    objects = os.path.join(self.fake_git_dir, '.git', 'objects')
    os.makedirs(objects)
    fake_lock = os.path.join(refs_heads, 'master.lock')
    osutils.Touch(fake_lock)
    os.makedirs(self.fake_path)
    dot_lock_not_in_dot_git = os.path.join(self.fake_git_dir, 'some.lock')
    osutils.Touch(dot_lock_not_in_dot_git)
    other_file = os.path.join(self.fake_path, 'other_file')
    osutils.Touch(other_file)

    git.DeleteStaleLocks(self.fake_git_dir)
    self.assertExists(os.path.join(self.fake_git_dir, '.git'))
    self.assertExists(refs_heads)
    self.assertExists(objects)
    self.assertExists(dot_lock_not_in_dot_git)
    self.assertExists(other_file)
    self.assertNotExists(fake_lock)

  def testDeleteStaleLocks_bare(self):
    git.Init(self.fake_git_dir)
    refs_heads = os.path.join(self.fake_git_dir, 'refs', 'heads')
    os.makedirs(refs_heads)
    objects = os.path.join(self.fake_git_dir, 'objects')
    os.makedirs(objects)
    fake_lock = os.path.join(refs_heads, 'master.lock')
    osutils.Touch(fake_lock)
    os.makedirs(self.fake_path)
    other_file = os.path.join(self.fake_path, 'other_file')
    osutils.Touch(other_file)

    git.DeleteStaleLocks(self.fake_git_dir)
    self.assertExists(refs_heads)
    self.assertExists(objects)
    self.assertExists(other_file)
    self.assertNotExists(fake_lock)


class LogTest(cros_test_lib.RunCommandTestCase):
  """Tests for git.Log"""

  def testNoArgs(self):
    git.Log('git/repo/path')
    self.assertCommandContains(['git', 'log'], cwd='git/repo/path')

  def testAllArgs(self):
    git.Log('git/repo/path', format='format:"%cd"',
            after='1996-01-01', until='1997-01-01', reverse=True,
            date='unix', max_count='1',
            grep='^Change-ID: I9f701664d849197cf183fc1fb46f7523095c359c$',
            rev='m/master', paths=['my/path'])
    self.assertCommandContains(
        [
            'git', 'log', '--format=format:"%cd"', '--after=1996-01-01',
            '--until=1997-01-01', '--reverse', '--date=unix', '--max-count=1',
            '--grep=^Change-ID: I9f701664d849197cf183fc1fb46f7523095c359c$',
            'm/master',
            '--', 'my/path',
        ], cwd='git/repo/path')


class ChangeIdTest(cros_test_lib.MockTestCase):
  """Tests for git.GetChangeId function."""

  def testHEAD(self):
    """Test the parsing of the git.GetChangeId function for HEAD."""

    log_output = """
lib/git: break out ChangeId into its own function

Code in Commit() will get the Change-Id after doing a git commit,
but in some use cases, we want to get the Change-Id of a commit
that already exists, without changing it. Move this code into its
own function that Commit() calls or an external user can call it
directly.

BUG=None
TEST=Start python3
>>> from chromite.lib import git
>>> print(git.GetChangeId('.'))
>>> exit(0)
$ git show
Compare the Change-Id printed by the python code with that shown

Change-Id: Ia7b712c42ff83c52c0fb5d88d1ef6c62f49da88d
"""
    result = cros_build_lib.CommandResult(output=log_output)
    self.PatchObject(git, 'RunGit', return_value=result)

    changeid = git.GetChangeId('git/repo/path')
    self.assertEqual(changeid, 'Ia7b712c42ff83c52c0fb5d88d1ef6c62f49da88d')

  def testSpecificSHA(self):
    """Test the parsing of the git.GetChangeId function for a specific SHA."""

    sha = '235511fbd7158c6d02c070944eb59cf47b37fcb5'
    log_output = """
Add user cros-disks to group android-everybody

This allows cros-disks to access 'Play Files'.

BUG=chromium:996549
TEST=Manually built and inspected group file

Cq-Depend: chromium:2032906
Change-Id: Id31c1211f95d7f5c3a94fbe8c028f65d3509f363
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/chromite/+/2040633
Reviewed-by: Mike Frysinger <vapier@chromium.org>
Commit-Queue: François Degros <fdegros@chromium.org>
Tested-by: François Degros <fdegros@chromium.org>
"""
    result = cros_build_lib.CommandResult(output=log_output)
    self.PatchObject(git, 'RunGit', return_value=result)

    changeid = git.GetChangeId('git/repo/path', sha)
    self.assertEqual(changeid, 'Id31c1211f95d7f5c3a94fbe8c028f65d3509f363')

  def testNoChangeId(self):
    """Test git.GetChangeId function if there is no Change-Id."""

    log_output = """
lib/git: break out ChangeId into its own function

Code in Commit() will get the Change-Id after doing a git commit,
but in some use cases, we want to get the Change-Id of a commit
that already exists, without changing it. Move this code into its
own function that Commit() calls or an external user can call it
directly.

BUG=None
TEST=Start python3
>>> from chromite.lib import git
>>> print(git.GetChangeId('.'))
>>> exit(0)
$ git show
Compare the Change-Id printed by the python code with that shown
"""
    result = cros_build_lib.CommandResult(output=log_output)
    self.PatchObject(git, 'RunGit', return_value=result)

    changeid = git.GetChangeId('git/repo/path')
    self.assertIsNone(changeid)

  def testChangeIdInTextCol1(self):
    """Test git.GetChangeId when 'Change-Id' is in the text, not as a key."""

    log_output = """
new_variant: track branch name and change-id

new_variant.py calls several scripts to create a new variant of a
reference board. Each of these scripts adds or modifies files and
creates a new commit. Track the branch name and change-id of each
commit in preparation for uploading to gerrit.
This CL builds on the changes in
Change-Id: I53af157625257ee1ecf39a4ced979138890b54f1
and while I would normally use a relation chain or cq-depend, I'm
putting the text directly in here so that the unit test will fail
until I fix the code to handle the pathological cases.

Cq-Depend: chromium:2041804
Change-Id: Ib71696f76dc80f1a76b8e7a73493c6c2668e2c6f
"""
    result = cros_build_lib.CommandResult(output=log_output)
    self.PatchObject(git, 'RunGit', return_value=result)

    self.assertRaises(ValueError, git.GetChangeId, 'git/repo/path')

  def testChangeIdInTextNotCol1(self):
    """Test git.GetChangeId when 'Change-Id' is in the text, not as a key."""

    log_output = """
new_variant: track branch name and change-id

new_variant.py calls several scripts to create a new variant of a
reference board. Each of these scripts adds or modifies files and
creates a new commit. Track the branch name and change-id of each
commit in preparation for uploading to gerrit. This CL builds on the
changes in Change-Id: I53af157625257ee1ecf39a4ced979138890b54f1
and while I would normally use a relation chain or cq-depend, I'm
putting the text directly in here so that the unit test will fail
until I fix the code to handle the pathological cases.

Cq-Depend: chromium:2041804
Change-Id: Ib71696f76dc80f1a76b8e7a73493c6c2668e2c6f
"""
    result = cros_build_lib.CommandResult(output=log_output)
    self.PatchObject(git, 'RunGit', return_value=result)

    changeid = git.GetChangeId('git/repo/path')
    self.assertEqual(changeid, 'Ib71696f76dc80f1a76b8e7a73493c6c2668e2c6f')


class ProjectCheckoutTest(cros_test_lib.TestCase):
  """Tests for git.ProjectCheckout"""

  def setUp(self):
    self.fake_unversioned_patchable = git.ProjectCheckout(
        dict(name='chromite',
             path='src/chromite',
             revision='remotes/for/master'))
    self.fake_unversioned_unpatchable = git.ProjectCheckout(
        dict(name='chromite',
             path='src/platform/somethingsomething/chromite',
             # Pinned to a SHA1.
             revision='1deadbeeaf1deadbeeaf1deadbeeaf1deadbeeaf'))
    self.fake_versioned_patchable = git.ProjectCheckout(
        dict(name='chromite',
             path='src/chromite',
             revision='1deadbeeaf1deadbeeaf1deadbeeaf1deadbeeaf',
             upstream='remotes/for/master'))
    self.fake_versioned_unpatchable = git.ProjectCheckout(
        dict(name='chromite',
             path='src/chromite',
             revision='1deadbeeaf1deadbeeaf1deadbeeaf1deadbeeaf',
             upstream='1deadbeeaf1deadbeeaf1deadbeeaf1deadbeeaf'))


class RawDiffTest(cros_test_lib.MockTestCase):
  """Tests for git.RawDiff function."""

  def testRawDiff(self):
    """Test the parsing of the git.RawDiff function."""

    diff_output = """
:100644 100644 ac234b2... 077d1f8... M\tchromeos-base/chromeos-chrome/Manifest
:100644 100644 9e5d11b... 806bf9b... R099\tchromeos-base/chromeos-chrome/chromeos-chrome-40.0.2197.0_rc-r1.ebuild\tchromeos-base/chromeos-chrome/chromeos-chrome-40.0.2197.2_rc-r1.ebuild
:100644 100644 70d6e94... 821c642... M\tchromeos-base/chromeos-chrome/chromeos-chrome-9999.ebuild
:100644 100644 be445f9... be445f9... R100\tchromeos-base/chromium-source/chromium-source-40.0.2197.0_rc-r1.ebuild\tchromeos-base/chromium-source/chromium-source-40.0.2197.2_rc-r1.ebuild
"""
    result = cros_build_lib.CommandResult(output=diff_output)
    self.PatchObject(git, 'RunGit', return_value=result)

    entries = git.RawDiff('foo', 'bar')
    self.assertEqual(entries, [
        ('100644', '100644', 'ac234b2', '077d1f8', 'M', None,
         'chromeos-base/chromeos-chrome/Manifest', None),
        ('100644', '100644', '9e5d11b', '806bf9b', 'R', '099',
         'chromeos-base/chromeos-chrome/'
         'chromeos-chrome-40.0.2197.0_rc-r1.ebuild',
         'chromeos-base/chromeos-chrome/'
         'chromeos-chrome-40.0.2197.2_rc-r1.ebuild'),
        ('100644', '100644', '70d6e94', '821c642', 'M', None,
         'chromeos-base/chromeos-chrome/chromeos-chrome-9999.ebuild', None),
        ('100644', '100644', 'be445f9', 'be445f9', 'R', '100',
         'chromeos-base/chromium-source/'
         'chromium-source-40.0.2197.0_rc-r1.ebuild',
         'chromeos-base/chromium-source/'
         'chromium-source-40.0.2197.2_rc-r1.ebuild')
    ])

  def testEmptyDiff(self):
    """Verify an empty diff doesn't crash."""
    result = cros_build_lib.CommandResult(output='\n')
    self.PatchObject(git, 'RunGit', return_value=result)
    entries = git.RawDiff('foo', 'bar')
    self.assertEqual([], entries)


class GitPushTest(cros_test_lib.RunCommandTestCase):
  """Tests for git.GitPush function."""

  # Non fast-forward push error message.
  NON_FF_PUSH_ERROR = (
      'To https://localhost/repo.git\n'
      '! [remote rejected] master -> master (non-fast-forward)\n'
      "error: failed to push some refs to 'https://localhost/repo.git'\n")

  # List of possible GoB transient errors.
  TRANSIENT_ERRORS = (
      # Hook error when creating a new branch from SHA1 ref.
      ('remote: Processing changes: (-)To https://localhost/repo.git\n'
       '! [remote rejected] 6c78ca083c3a9d64068c945fd9998eb1e0a3e739 -> '
       'stabilize-4636.B (error in hook)\n'
       "error: failed to push some refs to 'https://localhost/repo.git'\n"),

      # 'failed to lock' error when creating a new branch from SHA1 ref.
      ('remote: Processing changes: done\nTo https://localhost/repo.git\n'
       '! [remote rejected] 4ea09c129b5fedb261bae2431ce2511e35ac3923 -> '
       'stabilize-daisy-4319.96.B (failed to lock)\n'
       "error: failed to push some refs to 'https://localhost/repo.git'\n"),

      # Hook error when pushing branch.
      ('remote: Processing changes: (\\)To https://localhost/repo.git\n'
       '! [remote rejected] temp_auto_checkin_branch -> '
       'master (error in hook)\n'
       "error: failed to push some refs to 'https://localhost/repo.git'\n"),

      # Another kind of error when pushing a branch.
      'fatal: remote error: Internal Server Error',

      # crbug.com/298189
      ('error: gnutls_handshake() failed: A TLS packet with unexpected length '
       'was received. while accessing '
       'http://localhost/repo.git/info/refs?service=git-upload-pack\n'
       'fatal: HTTP request failed'),

      # crbug.com/298189
      ("fatal: unable to access 'https://localhost/repo.git': GnuTLS recv "
       'error (-9): A TLS packet with unexpected length was received.'),
  )

  def setUp(self):
    self.StartPatcher(mock.patch('time.sleep'))

  @staticmethod
  def _RunGitPush():
    """Runs git.GitPush with some default arguments."""
    git.GitPush('some_repo_path', 'local-ref',
                git.RemoteRef('some-remote', 'remote-ref'))

  def testGitPushSimple(self):
    """Test GitPush with minimal arguments."""
    git.GitPush('git_path', 'HEAD', git.RemoteRef('origin', 'master'))
    self.assertCommandCalled(['git', 'push', 'origin', 'HEAD:master'],
                             capture_output=True, print_cmd=False,
                             cwd='git_path', encoding='utf-8')

  def testGitPushComplix(self):
    """Test GitPush with some arguments."""
    git.GitPush('git_path', 'HEAD', git.RemoteRef('origin', 'master'),
                force=True, dry_run=True)
    self.assertCommandCalled(['git', 'push', 'origin', 'HEAD:master',
                              '--force', '--dry-run'],
                             capture_output=True, print_cmd=False,
                             cwd='git_path', encoding='utf-8')

  def testNonFFPush(self):
    """Non fast-forward push error propagates to the caller."""
    self.rc.AddCmdResult(partial_mock.In('push'), returncode=128,
                         error=self.NON_FF_PUSH_ERROR)
    self.assertRaises(cros_build_lib.RunCommandError, self._RunGitPush)

  def testPersistentTransientError(self):
    """GitPush fails if transient error occurs multiple times."""
    for error in self.TRANSIENT_ERRORS:
      self.rc.AddCmdResult(partial_mock.In('push'), returncode=128,
                           error=error)
      self.assertRaises(cros_build_lib.RunCommandError, self._RunGitPush)


class GitIntegrationTest(cros_test_lib.TempDirTestCase):
  """Tests that git library functions work with actual git repos."""

  def setUp(self):
    self.source = os.path.join(self.tempdir, 'src')
    git.Init(self.source)
    # Nerf any hooks the OS might have installed on us as they aren't going to
    # be useful to us, just slow things down.
    shutil.rmtree(os.path.join(self.source, '.git', 'hooks'))
    cros_build_lib.run(
        ['git', 'commit', '--allow-empty', '-m', 'initial commit'],
        cwd=self.source, print_cmd=False, capture_output=True)

  def _CommitFile(self, repo, filename, content):
    osutils.WriteFile(os.path.join(repo, filename), content)
    git.AddPath(os.path.join(repo, filename))
    git.Commit(repo, 'commit %s' % (cros_build_lib.GetRandomString(),))
    return git.GetGitRepoRevision(repo)

  def testIsReachable(self):
    sha1 = self._CommitFile(self.source, 'foo', 'foo')
    sha2 = self._CommitFile(self.source, 'bar', 'bar')
    self.assertTrue(git.IsReachable(self.source, sha1, sha2))
    self.assertFalse(git.IsReachable(self.source, sha2, sha1))

  def testDoesCommitExistInRepoWithAmbiguousBranchName(self):
    git.CreateBranch(self.source, 'peach', track=True)
    self._CommitFile(self.source, 'peach', 'Keep me.')
    self.assertTrue(git.DoesCommitExistInRepo(self.source, 'peach'))


class ManifestCheckoutTest(cros_test_lib.TempDirTestCase):
  """Tests for ManifestCheckout functionality."""

  def setUp(self):
    self.manifest_dir = os.path.join(self.tempdir, '.repo', 'manifests')

    # Initialize a repo instance here.
    local_repo = os.path.join(constants.SOURCE_ROOT, '.repo/repo/.git')

    # TODO(evanhernandez): This is a hack. Find a way to simplify this test.
    # We used to use the current checkout's manifests.git, but that caused
    # problems in production environemnts.
    remote_manifests = os.path.join(self.tempdir, 'remote', 'manifests.git')
    osutils.SafeMakedirs(remote_manifests)
    git.Init(remote_manifests)
    default_manifest = os.path.join(remote_manifests, 'default.xml')
    osutils.WriteFile(
        default_manifest,
        '<?xml version="1.0" encoding="UTF-8"?><manifest></manifest>')
    git.AddPath(default_manifest)
    git.Commit(remote_manifests, 'dummy commit', allow_empty=True)
    git.CreateBranch(remote_manifests, 'default')
    git.CreateBranch(remote_manifests, 'release-R23-2913.B')
    git.CreateBranch(remote_manifests, 'release-R23-2913.B-suffix')
    git.CreateBranch(remote_manifests, 'firmware-link-')
    # This must come last as it sets up HEAD for the default branch, and repo
    # uses that to figure out which branch to check out.
    git.CreateBranch(remote_manifests, 'master')

    # Create a copy of our existing manifests.git, but rewrite it so it
    # looks like a remote manifests.git.  This is to avoid hitting the
    # network, and speeds things up in general.
    local_manifests = 'file://%s' % remote_manifests
    temp_manifests = os.path.join(self.tempdir, 'manifests.git')
    git.RunGit(self.tempdir, ['clone', '-n', '--bare', local_manifests])
    git.RunGit(temp_manifests,
               ['fetch', '-f', '-u', local_manifests,
                'refs/remotes/origin/*:refs/heads/*'])
    git.RunGit(temp_manifests, ['branch', '-D', 'default'])
    cros_build_lib.run([
        'repo', 'init', '-u', temp_manifests,
        '--repo-branch', 'default', '--repo-url', 'file://%s' % local_repo,
    ], cwd=self.tempdir)

    self.active_manifest = os.path.realpath(
        os.path.join(self.tempdir, '.repo', 'manifest.xml'))

  def testManifestInheritance(self):
    osutils.WriteFile(self.active_manifest, """
        <manifest>
          <include name="include-target.xml" />
          <include name="empty.xml" />
          <project name="monkeys" path="baz" remote="foon" revision="master" />
        </manifest>""")
    # First, verify it properly explodes if the include can't be found.
    self.assertRaises(EnvironmentError,
                      git.ManifestCheckout, self.tempdir)

    # Next, verify it can read an empty manifest; this is to ensure
    # that we can point Manifest at the empty manifest without exploding,
    # same for ManifestCheckout; this sort of thing is primarily useful
    # to ensure no step of an include assumes everything is yet assembled.
    empty_path = os.path.join(self.manifest_dir, 'empty.xml')
    osutils.WriteFile(empty_path, '<manifest/>')
    git.Manifest(empty_path)
    git.ManifestCheckout(self.tempdir, manifest_path=empty_path)

    # Next, verify include works.
    osutils.WriteFile(
        os.path.join(self.manifest_dir, 'include-target.xml'),
        """
        <manifest>
          <remote name="foon" fetch="http://localhost" />
        </manifest>""")
    manifest = git.ManifestCheckout(self.tempdir)
    self.assertEqual(list(manifest.checkouts_by_name), ['monkeys'])
    self.assertEqual(list(manifest.remotes), ['foon'])

  def testGetManifestsBranch(self):
    # pylint: disable=protected-access
    func = git.ManifestCheckout._GetManifestsBranch
    manifest = self.manifest_dir
    repo_root = self.tempdir

    # pylint: disable=unused-argument
    def reconfig(merge='master', origin='origin'):
      if merge is not None:
        merge = 'refs/heads/%s' % merge
      for key in ('merge', 'origin'):
        val = locals()[key]
        key = 'branch.default.%s' % key
        if val is None:
          git.RunGit(manifest, ['config', '--unset', key], check=False)
        else:
          git.RunGit(manifest, ['config', key, val])

    # First, verify our assumptions about a fresh repo init are correct.
    self.assertEqual('default', git.GetCurrentBranch(manifest))
    self.assertEqual('master', func(repo_root))

    # Ensure we can handle a missing origin; this can occur jumping between
    # branches, and can be worked around.
    reconfig(origin=None)
    self.assertEqual('default', git.GetCurrentBranch(manifest))
    self.assertEqual('master', func(repo_root))

    def assertExcept(message, **kwargs):
      reconfig(**kwargs)
      self.assertRaises2(OSError, func, repo_root, ex_msg=message,
                         check_attrs={'errno': errno.ENOENT})

    # No merge target means the configuration isn't usable, period.
    assertExcept('git tracking configuration for that branch is broken',
                 merge=None)

    # Ensure we detect if we're on the wrong branch, even if it has
    # tracking setup.
    git.RunGit(manifest, ['checkout', '-t', 'origin/master', '-b', 'test'])
    assertExcept("It should be checked out to 'default'")

    # Ensure we handle detached HEAD w/ an appropriate exception.
    git.RunGit(manifest, ['checkout', '--detach', 'test'])
    assertExcept("It should be checked out to 'default'")

    # Finally, ensure that if the default branch is non-existant, we still throw
    # a usable exception.
    git.RunGit(manifest, ['branch', '-d', 'default'])
    assertExcept("It should be checked out to 'default'")

  def testGitMatchBranchName(self):
    git_repo = os.path.join(self.tempdir, '.repo', 'manifests')

    branches = git.MatchBranchName(git_repo, 'default', namespace='')
    self.assertEqual(branches, ['refs/heads/default'])

    branches = git.MatchBranchName(git_repo, 'default', namespace='refs/heads/')
    self.assertEqual(branches, ['default'])

    branches = git.MatchBranchName(git_repo, 'origin/f.*link',
                                   namespace='refs/remotes/')
    self.assertTrue('firmware-link-' in branches[0])

    branches = git.MatchBranchName(git_repo, 'r23')
    self.assertEqual(branches,
                     ['refs/remotes/origin/release-R23-2913.B',
                      'refs/remotes/origin/release-R23-2913.B-suffix'])

    branches = git.MatchBranchName(git_repo, 'release-R23-2913.B')
    self.assertEqual(branches, ['refs/remotes/origin/release-R23-2913.B'])

    branches = git.MatchBranchName(git_repo, 'release-R23-2913.B',
                                   namespace='refs/remotes/origin/')
    self.assertEqual(branches, ['release-R23-2913.B'])

    branches = git.MatchBranchName(git_repo, 'release-R23-2913.B',
                                   namespace='refs/remotes/')
    self.assertEqual(branches, ['origin/release-R23-2913.B'])
