| #!/usr/bin/env vpython3 |
| # Copyright 2015 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Tests for git_drover.""" |
| |
| import os |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| import unittest |
| |
| if sys.version_info.major == 2: |
| import mock |
| else: |
| from unittest import mock |
| |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| |
| import gclient_utils |
| import git_drover |
| |
| |
| class GitDroverTest(unittest.TestCase): |
| |
| def setUp(self): |
| super(GitDroverTest, self).setUp() |
| self.maxDiff = None |
| self._temp_directory = tempfile.mkdtemp() |
| self._parent_repo = os.path.join(self._temp_directory, 'parent_repo') |
| self._target_repo = os.path.join(self._temp_directory, 'drover_branch_123') |
| os.makedirs(os.path.join(self._parent_repo, '.git')) |
| with open(os.path.join(self._parent_repo, '.git', 'config'), 'w') as f: |
| f.write('config') |
| with open(os.path.join(self._parent_repo, '.git', 'HEAD'), 'w') as f: |
| f.write('HEAD') |
| os.mkdir(os.path.join(self._parent_repo, '.git', 'info')) |
| with open( |
| os.path.join(self._parent_repo, '.git', 'info', 'refs'), 'w') as f: |
| f.write('refs') |
| mock.patch('tempfile.mkdtemp', self._mkdtemp).start() |
| mock.patch('gclient_utils.AskForData', self._get_input).start() |
| mock.patch('subprocess.check_call', self._check_call).start() |
| mock.patch('subprocess.check_output', self._check_call).start() |
| self.real_popen = subprocess.Popen |
| mock.patch('subprocess.Popen', self._Popen).start() |
| self.addCleanup(mock.patch.stopall) |
| |
| self._commands = [] |
| self._input = [] |
| self._fail_on_command = None |
| self._reviewers = '' |
| |
| self.REPO_CHECK_COMMANDS = [ |
| (['git', '--help'], self._parent_repo), |
| (['git', 'status'], self._parent_repo), |
| (['git', 'fetch', 'origin'], self._parent_repo), |
| (['git', 'rev-parse', 'refs/remotes/branch-heads/branch^{commit}'], |
| self._parent_repo), |
| (['git', 'rev-parse', 'cl^{commit}'], self._parent_repo), |
| (['git', 'show', '-s', 'cl'], self._parent_repo), |
| ] |
| self.LOCAL_REPO_COMMANDS = [ |
| (['git', 'rev-parse', '--git-dir'], self._parent_repo), |
| (['git', 'config', 'core.sparsecheckout', 'true'], self._target_repo), |
| (['git', 'checkout', '-b', 'drover_branch_123', |
| 'refs/remotes/branch-heads/branch'], self._target_repo), |
| (['git', 'cherry-pick', '-x', 'cl'], self._target_repo), |
| ] |
| self.UPLOAD_COMMANDS = [ |
| (['git', 'reset', '--hard'], self._target_repo), |
| (['git', 'log', '-1', '--format=%ae'], self._target_repo), |
| (['git', 'cl', 'upload', '--send-mail', '--tbrs', 'author@domain.org'], |
| self._target_repo), |
| ] |
| self.LAND_COMMAND = [ |
| (['git', 'cl', 'set-commit'], self._target_repo), |
| ] |
| if os.name == 'nt': |
| self.BRANCH_CLEANUP_COMMANDS = [ |
| (['rmdir', '/s', '/q', self._target_repo], None), |
| (['git', 'branch', '-D', 'drover_branch_123'], self._parent_repo), |
| ] |
| else: |
| self.BRANCH_CLEANUP_COMMANDS = [ |
| (['git', 'branch', '-D', 'drover_branch_123'], self._parent_repo), |
| ] |
| self.MANUAL_RESOLVE_PREPARATION_COMMANDS = [ |
| (['git', '-c', 'core.quotePath=false', 'status', '--porcelain'], |
| self._target_repo), |
| (['git', 'update-index', '--skip-worktree', '--stdin'], |
| self._target_repo), |
| ] |
| self.FINISH_MANUAL_RESOLVE_COMMANDS = [ |
| (['git', 'commit', '--no-edit'], self._target_repo), |
| ] |
| |
| def tearDown(self): |
| shutil.rmtree(self._temp_directory) |
| super(GitDroverTest, self).tearDown() |
| |
| def _mkdtemp(self, prefix='tmp'): |
| self.assertEqual('drover_branch_', prefix) |
| os.mkdir(self._target_repo) |
| return self._target_repo |
| |
| def _get_input(self, message): |
| result = self._input.pop(0) |
| if result == 'EOF': |
| raise EOFError |
| return result |
| |
| def _check_call(self, args, stderr=None, stdout=None, shell='', cwd=None): |
| if args[0] == 'rmdir': |
| subprocess.call(args, shell=shell) |
| else: |
| self.assertFalse(shell) |
| self._commands.append((args, cwd)) |
| if (self._fail_on_command is not None and |
| self._fail_on_command == len(self._commands)): |
| self._fail_on_command = None |
| raise subprocess.CalledProcessError(1, args[0]) |
| rv = '' |
| if args == ['git', 'rev-parse', '--git-dir']: |
| rv = os.path.join(self._parent_repo, '.git') |
| if args == ['git', '-c', 'core.quotePath=false', 'status', '--porcelain']: |
| rv = ' D foo\nUU baz\n D bar\n' |
| if args == ['git', 'log', '-1', '--format=%ae']: |
| rv = 'author@domain.org' |
| if sys.version_info.major == 3: |
| return bytes(rv, 'utf-8') |
| return rv |
| |
| def _Popen(self, args, shell=False, cwd=None, stdin=None, stdout=None, |
| stderr=None): |
| if args == ['git', 'update-index', '--skip-worktree', '--stdin']: |
| self._commands.append((args, cwd)) |
| self.assertFalse(shell) |
| self.assertEqual(stdin, subprocess.PIPE) |
| class MockPopen(object): |
| def __init__(self, *args, **kwargs): |
| self.returncode = -999 |
| def communicate(self, stdin): |
| if stdin == b'foo\nbar\n': |
| self.returncode = 0 |
| else: |
| self.returncode = 1 |
| return MockPopen() |
| else: |
| return self.real_popen(args, shell=shell, cwd=cwd, stdin=stdin, |
| stdout=stdout, stderr=stderr) |
| |
| def testSuccess(self): |
| self._input = ['y', 'y'] |
| git_drover.cherry_pick_change('branch', 'cl', self._parent_repo, False) |
| self.assertEqual( |
| self.REPO_CHECK_COMMANDS + self.LOCAL_REPO_COMMANDS + |
| self.UPLOAD_COMMANDS + self.LAND_COMMAND + self.BRANCH_CLEANUP_COMMANDS, |
| self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testDryRun(self): |
| self._input = ['y'] |
| git_drover.cherry_pick_change('branch', 'cl', self._parent_repo, True) |
| self.assertEqual(self.REPO_CHECK_COMMANDS + self.LOCAL_REPO_COMMANDS + |
| self.BRANCH_CLEANUP_COMMANDS, self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testCancelEarly(self): |
| self._input = ['n'] |
| git_drover.cherry_pick_change('branch', 'cl', self._parent_repo, False) |
| self.assertEqual(self.REPO_CHECK_COMMANDS, self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testEOFOnConfirm(self): |
| self._input = ['EOF'] |
| git_drover.cherry_pick_change('branch', 'cl', self._parent_repo, False) |
| self.assertEqual(self.REPO_CHECK_COMMANDS, self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testCancelLate(self): |
| self._input = ['y', 'n'] |
| git_drover.cherry_pick_change('branch', 'cl', self._parent_repo, False) |
| self.assertEqual(self.REPO_CHECK_COMMANDS + self.LOCAL_REPO_COMMANDS + |
| self.UPLOAD_COMMANDS + self.BRANCH_CLEANUP_COMMANDS, |
| self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testFailDuringCheck(self): |
| self._input = [] |
| self._fail_on_command = 1 |
| self.assertRaises(git_drover.Error, git_drover.cherry_pick_change, 'branch', |
| 'cl', self._parent_repo, False) |
| self.assertEqual(self.REPO_CHECK_COMMANDS[:1], self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testFailDuringBranchCreation(self): |
| self._input = ['y'] |
| self._fail_on_command = 8 |
| self.assertRaises(git_drover.Error, git_drover.cherry_pick_change, 'branch', |
| 'cl', self._parent_repo, False) |
| self.assertEqual(self.REPO_CHECK_COMMANDS + self.LOCAL_REPO_COMMANDS[:2] + |
| self.BRANCH_CLEANUP_COMMANDS[:-1], self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testFailDuringCherryPickAndAbort(self): |
| self._input = ['y'] |
| self._fail_on_command = 10 |
| self.assertRaises(git_drover.Error, git_drover.cherry_pick_change, 'branch', |
| 'cl', self._parent_repo, False) |
| self.assertEqual(self.REPO_CHECK_COMMANDS + self.LOCAL_REPO_COMMANDS[:4] + |
| self.MANUAL_RESOLVE_PREPARATION_COMMANDS, self._commands) |
| self.assertTrue(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| self._commands = [] |
| git_drover.abort_cherry_pick(self._target_repo) |
| self.assertEqual(self.BRANCH_CLEANUP_COMMANDS, self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| |
| def testFailDuringCherryPickAndContinue(self): |
| self._input = ['y'] |
| self._fail_on_command = 10 |
| self.assertRaises(git_drover.PatchError, git_drover.cherry_pick_change, |
| 'branch', 'cl', self._parent_repo, False) |
| self.assertEqual(self.REPO_CHECK_COMMANDS + self.LOCAL_REPO_COMMANDS[:4] + |
| self.MANUAL_RESOLVE_PREPARATION_COMMANDS, self._commands) |
| self.assertTrue(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| self._commands = [] |
| self._input = ['n'] |
| git_drover.continue_cherry_pick(self._target_repo) |
| self.assertEqual(self.UPLOAD_COMMANDS, self._commands) |
| self.assertTrue(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| self._commands = [] |
| self._input = ['y'] |
| git_drover.continue_cherry_pick(self._target_repo) |
| self.assertEqual( |
| self.UPLOAD_COMMANDS + self.LAND_COMMAND + self.BRANCH_CLEANUP_COMMANDS, |
| self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testFailDuringCherryPickAndContinueWithoutCommitting(self): |
| self._input = ['y'] |
| self._fail_on_command = 10 |
| self.assertRaises(git_drover.PatchError, git_drover.cherry_pick_change, |
| 'branch', 'cl', self._parent_repo, False) |
| self.assertEqual(self.REPO_CHECK_COMMANDS + self.LOCAL_REPO_COMMANDS[:4] + |
| self.MANUAL_RESOLVE_PREPARATION_COMMANDS, self._commands) |
| self.assertTrue(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| self._commands = [] |
| with open(os.path.join(self._target_repo, '.git', 'CHERRY_PICK_HEAD'), 'w'): |
| pass |
| |
| self._commands = [] |
| self._input = ['y'] |
| git_drover.continue_cherry_pick(self._target_repo) |
| self.assertEqual(self.FINISH_MANUAL_RESOLVE_COMMANDS + self.UPLOAD_COMMANDS |
| + self.LAND_COMMAND + self.BRANCH_CLEANUP_COMMANDS, |
| self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testFailDuringCherryPickAndContinueWithoutResolving(self): |
| self._input = ['y'] |
| self._fail_on_command = 10 |
| self.assertRaises(git_drover.PatchError, git_drover.cherry_pick_change, |
| 'branch', 'cl', self._parent_repo, False) |
| self.assertEqual(self.REPO_CHECK_COMMANDS + self.LOCAL_REPO_COMMANDS[:4] + |
| self.MANUAL_RESOLVE_PREPARATION_COMMANDS, self._commands) |
| self.assertTrue(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| self._commands = [] |
| self._fail_on_command = 1 |
| with open(os.path.join(self._target_repo, '.git', 'CHERRY_PICK_HEAD'), 'w'): |
| pass |
| self.assertRaisesRegexp(git_drover.Error, |
| 'All conflicts must be resolved before continuing', |
| git_drover.continue_cherry_pick, self._target_repo) |
| self.assertEqual(self.FINISH_MANUAL_RESOLVE_COMMANDS, self._commands) |
| self.assertTrue(os.path.exists(self._target_repo)) |
| |
| self._commands = [] |
| git_drover.abort_cherry_pick(self._target_repo) |
| self.assertEqual(self.BRANCH_CLEANUP_COMMANDS, self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| |
| def testFailAfterCherryPick(self): |
| self._input = ['y'] |
| self._fail_on_command = 11 |
| self.assertRaises(git_drover.Error, git_drover.cherry_pick_change, 'branch', |
| 'cl', self._parent_repo, False) |
| self.assertEqual(self.REPO_CHECK_COMMANDS + self.LOCAL_REPO_COMMANDS + |
| self.UPLOAD_COMMANDS[:1] + self.BRANCH_CLEANUP_COMMANDS, |
| self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testFailOnUpload(self): |
| self._input = ['y'] |
| self._fail_on_command = 13 |
| self.assertRaises(git_drover.Error, git_drover.cherry_pick_change, 'branch', |
| 'cl', self._parent_repo, False) |
| self.assertEqual(self.REPO_CHECK_COMMANDS + self.LOCAL_REPO_COMMANDS + |
| self.UPLOAD_COMMANDS + self.BRANCH_CLEANUP_COMMANDS, |
| self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testInvalidParentRepoDirectory(self): |
| self.assertRaises(git_drover.Error, git_drover.cherry_pick_change, 'branch', |
| 'cl', os.path.join(self._parent_repo, 'fake'), False) |
| self.assertFalse(self._commands) |
| self.assertFalse(os.path.exists(self._target_repo)) |
| self.assertFalse(self._input) |
| |
| def testContinueInvalidWorkdir(self): |
| self.assertRaises(git_drover.Error, git_drover.continue_cherry_pick, |
| self._parent_repo) |
| |
| def testAbortInvalidWorkdir(self): |
| self.assertRaises(git_drover.Error, git_drover.abort_cherry_pick, |
| self._parent_repo) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |