blob: fb594b74c6b224aeecbdb57e8ce746b5a8c42161 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unittests for chromite.lib.patch."""
import copy
import itertools
import os
import shutil
import sys
import time
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(
from chromite.buildbot import constants
from chromite.lib import cros_build_lib
from chromite.lib import cros_build_lib_unittest
from chromite.lib import cros_test_lib
from chromite.lib import gerrit
from chromite.lib import git
from chromite.lib import osutils
from chromite.lib import patch as cros_patch
import mock
_GetNumber = iter(itertools.count()).next
"project":"tacos/chromite", "branch":"master",
"currentPatchSet": {
"number":"2", "ref":gerrit.GetChangeRef(1112, 2),
"subject":"chromite commit",
"owner":{"name":"Chromite Master", "email":""},
"open": True,
# Change-ID of a known open change in public gerrit.
class TestGitRepoPatch(cros_test_lib.TempDirTestCase):
# No mock bits are to be used in this class's tests.
# This needs to actually validate git output, and git behaviour, rather
# than test our assumptions about git's behaviour/output.
patch_kls = cros_patch.GitRepoPatch
"""commit abcdefgh
Author: Fake person
Date: Tue Oct 99
I am the first commit.
# Boolean controlling whether the target class natively knows its
# ChangeId; only GerritPatches do.
has_native_change_id = False
DEFAULT_TRACKING = 'refs/remotes/%s/master' % constants.EXTERNAL_REMOTE
def _CreateSourceRepo(self, path):
"""Generate a new repo with a single commit."""
tmp_path = '%s-tmp' % path
self._run(['git', 'init', '--separate-git-dir', path], cwd=tmp_path)
# Add an initial commit then wipe the working tree.
self._run(['git', 'commit', '--allow-empty', '-m', 'initial commit'],
def setUp(self):
# Create an empty repo to work from.
self.source = os.path.join(self.tempdir, 'source.git')
self.default_cwd = os.path.join(self.tempdir, 'unwritable')
self.original_cwd = os.getcwd()
# Disallow write so as to smoke out any invalid writes to
# cwd.
os.chmod(self.default_cwd, 0o500)
def tearDown(self):
if hasattr(self, 'original_cwd'):
def _MkPatch(self, source, sha1, ref='refs/heads/master', **kwds):
return self.patch_kls(source, 'chromiumos/chromite', ref,
'%s/master' % constants.EXTERNAL_REMOTE,
kwds.pop('remote', constants.EXTERNAL_REMOTE),
sha1=sha1, **kwds)
def _run(self, cmd, cwd=None):
# Note that cwd is intentionally set to a location the user can't write
# to; this flushes out any bad usage in the tests that would work by
# fluke of being invoked from w/in a git repo.
if cwd is None:
cwd = self.default_cwd
return cros_build_lib.RunCommandCaptureOutput(
cmd, cwd=cwd, print_cmd=False).output.strip()
def _GetSha1(self, cwd, refspec):
return self._run(['git', 'rev-list', '-n1', refspec], cwd=cwd)
def _MakeRepo(self, name, clone, remote=None, alternates=True):
path = os.path.join(self.tempdir, name)
cmd = ['git', 'clone', clone, path]
if alternates:
cmd += ['--reference', clone]
if remote is None:
remote = constants.EXTERNAL_REMOTE
cmd += ['--origin', remote]
return path
def _MakeCommit(self, repo, commit=None):
if commit is None:
commit = "commit at %s" % (time.time(),)
self._run(['git', 'commit', '-a', '-m', commit], repo)
return self._GetSha1(repo, 'HEAD')
def CommitFile(self, repo, filename, content, commit=None, **kwds):
osutils.WriteFile(os.path.join(repo, filename), content)
self._run(['git', 'add', filename], repo)
sha1 = self._MakeCommit(repo, commit=commit)
if not self.has_native_change_id:
kwds.pop('ChangeId', None)
patch = self._MkPatch(repo, sha1, **kwds)
self.assertEqual(patch.sha1, sha1)
return patch
def _CommonGitSetup(self):
git1 = self._MakeRepo('git1', self.source)
git2 = self._MakeRepo('git2', self.source)
patch = self.CommitFile(git1, 'monkeys', 'foon')
return git1, git2, patch
def testGetDiffStatus(self):
git1, _, patch1 = self._CommonGitSetup()
# Ensure that it can work on the first commit, even if it
# doesn't report anything (no delta; it's the first files).
patch1 = self._MkPatch(git1, self._GetSha1(git1, self.DEFAULT_TRACKING))
self.assertEqual({}, patch1.GetDiffStatus(git1))
patch2 = self.CommitFile(git1, 'monkeys', 'blah')
self.assertEqual({'monkeys': 'M'}, patch2.GetDiffStatus(git1))
git.RunGit(git1, ['mv', 'monkeys', 'monkeys2'])
patch3 = self._MkPatch(git1, self._MakeCommit(git1, commit="mv"))
self.assertEqual({'monkeys': 'D', 'monkeys2': 'A'},
patch4 = self.CommitFile(git1, 'monkey2', 'blah')
self.assertEqual({'monkey2': 'A'}, patch4.GetDiffStatus(git1))
def testFetch(self):
_, git2, patch = self._CommonGitSetup()
self.assertEqual(patch.sha1, self._GetSha1(git2, 'FETCH_HEAD'))
# Verify reuse; specifically that Fetch doesn't actually run since
# the rev is already available locally via alternates.
patch.project_url = '/dev/null'
git3 = self._MakeRepo('git3', git2)
self.assertEqual(patch.sha1, self._GetSha1(git3, patch.sha1))
def testAlreadyApplied(self):
git1 = self._MakeRepo('git1', self.source)
patch1 = self._MkPatch(git1, self._GetSha1(git1, 'HEAD'))
self.assertRaises2(cros_patch.PatchAlreadyApplied, patch1.Apply, git1,
self.DEFAULT_TRACKING, check_attrs={'inflight':False})
patch2 = self.CommitFile(git1, 'monkeys', 'rule')
self.assertRaises2(cros_patch.PatchAlreadyApplied, patch2.Apply, git1,
self.DEFAULT_TRACKING, check_attrs={'inflight':True})
def testCleanlyApply(self):
_, git2, patch = self._CommonGitSetup()
# Clone git3 before we modify git2; else we'll just wind up
# cloning it's master.
git3 = self._MakeRepo('git3', git2)
patch.Apply(git2, self.DEFAULT_TRACKING)
self.assertEqual(patch.sha1, self._GetSha1(git2, 'HEAD'))
# Verify reuse; specifically that Fetch doesn't actually run since
# the object is available in alternates. testFetch partially
# validates this; the Apply usage here fully validates it via
# ensuring that the attempted Apply goes boom if it can't get the
# required sha1.
patch.project_url = '/dev/null'
patch.Apply(git3, self.DEFAULT_TRACKING)
self.assertEqual(patch.sha1, self._GetSha1(git3, 'HEAD'))
def testFailsApply(self):
_, git2, patch1 = self._CommonGitSetup()
patch2 = self.CommitFile(git2, 'monkeys', 'not foon')
# Note that Apply creates it's own branch, resetting to master
# thus we have to re-apply (even if it looks stupid, it's right).
patch2.Apply(git2, self.DEFAULT_TRACKING)
patch1.Apply, git2, self.DEFAULT_TRACKING,
exact_kls=True, check_attrs={'inflight':True})
def testTrivial(self):
_, git2, patch1 = self._CommonGitSetup()
# Throw in a bunch of newlines so that content-merging would work.
content = 'not foon%s' % ('\n' * 100)
patch1 = self._MkPatch(git2, self._GetSha1(git2, 'HEAD'))
patch1 = self.CommitFile(git2, 'monkeys', content)
git2, ['update-ref', self.DEFAULT_TRACKING, patch1.sha1])
patch2 = self.CommitFile(git2, 'monkeys', '%sblah' % content)
patch3 = self.CommitFile(git2, 'monkeys', '%sblahblah' % content)
# Get us a back to the basic, then derive from there; this is used to
# verify that even if content merging works, trivial is flagged.
self.CommitFile(git2, 'monkeys', 'foon')
patch4 = self.CommitFile(git2, 'monkeys', content)
patch5 = self.CommitFile(git2, 'monkeys', '%sfoon' % content)
# Reset so we derive the next changes from patch1.
git.RunGit(git2, ['reset', '--hard', patch1.sha1])
patch6 = self.CommitFile(git2, 'blah', 'some-other-file')
self.CommitFile(git2, 'monkeys',
'%sblah' % content.replace('not', 'bot'))
patch1.Apply, git2, self.DEFAULT_TRACKING, trivial=True,
check_attrs={'inflight':False, 'trivial':False})
# Now test conflicts since we're still at ToT; note that this is an actual
# conflict because the fuzz anchors have changed.
patch3.Apply, git2, self.DEFAULT_TRACKING, trivial=True,
check_attrs={'inflight':False, 'trivial':False},
# Now test trivial conflict; this would've merged fine were it not for
# trivial.
patch4.Apply, git2, self.DEFAULT_TRACKING, trivial=True,
check_attrs={'inflight':False, 'trivial':False},
# Move us into inflight testing.
patch2.Apply(git2, self.DEFAULT_TRACKING, trivial=True)
# Repeat the tests from above; should still be the same.
patch4.Apply, git2, self.DEFAULT_TRACKING, trivial=True,
check_attrs={'inflight':False, 'trivial':False})
# Actual conflict merge conflict due to inflight; non trivial induced.
patch5.Apply, git2, self.DEFAULT_TRACKING, trivial=True,
check_attrs={'inflight':True, 'trivial':False},
patch1.Apply, git2, self.DEFAULT_TRACKING, trivial=True,
patch5.Apply, git2, self.DEFAULT_TRACKING, trivial=True,
check_attrs={'inflight':True, 'trivial':False},
# And this should apply without issue, despite the differing history.
patch6.Apply(git2, self.DEFAULT_TRACKING, trivial=True)
def _assertLookupAliases(self, remote):
git1 = self._MakeRepo('git1', self.source)
patch = self.CommitChangeIdFile(git1, remote=remote)
prefix = '*' if patch.internal else ''
vals = [patch.change_id, patch.sha1, getattr(patch, 'gerrit_number', None),
getattr(patch, 'original_sha1', None)]
vals = [x for x in vals if x is not None]
self.assertEqual(set(prefix + x for x in vals),
def testExternalLookupAliases(self):
def testInternalLookupAliases(self):
def MakeChangeId(self, how_many=1):
l = [cros_patch.MakeChangeId() for _ in xrange(how_many)]
if how_many == 1:
return l[0]
return l
def CommitChangeIdFile(self, repo, changeid=None, extra=None,
filename='monkeys', content='flinging',
raw_changeid_text=None, **kwargs):
template = self.COMMIT_TEMPLATE
if changeid is None:
changeid = self.MakeChangeId()
if raw_changeid_text is None:
raw_changeid_text = 'Change-Id: %s' % (changeid,)
if extra is None:
extra = ''
commit = template % {'change-id':raw_changeid_text, 'extra':extra}
return self.CommitFile(repo, filename, content, commit=commit,
ChangeId=changeid, **kwargs)
def _CheckPaladin(self, repo, master_id, ids, extra):
patch = self.CommitChangeIdFile(
repo, master_id, extra=extra,
filename='paladincheck', content=str(_GetNumber()))
deps = patch.PaladinDependencies(repo)
# Assert that are parsing unique'ifies the results.
self.assertEqual(len(deps), len(set(deps)))
deps = set(deps)
ids = set(ids)
self.assertEqual(ids, deps)
set(cros_patch.FormatPatchDep(x) for x in deps),
set(cros_patch.FormatPatchDep(x) for x in ids))
return patch
def testPaladinDependencies(self):
git1 = self._MakeRepo('git1', self.source)
cid1, cid2, cid3, cid4 = self.MakeChangeId(4)
# Verify it handles nonexistant CQ-DEPEND.
self._CheckPaladin(git1, cid1, [], '')
# Single key, single value.
self._CheckPaladin(git1, cid1, [cid2],
'CQ-DEPEND=%s' % cid2)
# Single key, gerrit number.
self._CheckPaladin(git1, cid1, ['123'],
'CQ-DEPEND=%s' % 123)
# Single key, gerrit number.
self._CheckPaladin(git1, cid1, ['123456'],
'CQ-DEPEND=%s' % 123456)
# Single key, gerrit number; ensure it
# cuts off before a million changes (this
# is done to avoid collisions w/ sha1 when
# we're using shortened versions).
self._CheckPaladin, git1, cid1,
['1234567'], 'CQ-DEPEND=%s' % '1234567')
# Single key, gerrit number, internal.
self._CheckPaladin(git1, cid1, ['*123'],
'CQ-DEPEND=%s' % '*123')
# Ensure SHA1's aren't allowed.
sha1 = '0' * 40
self._CheckPaladin, git1, cid1,
[sha1], 'CQ-DEPEND=%s' % sha1)
# Single key, multiple values
self._CheckPaladin(git1, cid1, [cid2, '1223'],
'CQ-DEPEND=%s %s' % (cid2, '1223'))
# Dumb comma behaviour
self._CheckPaladin(git1, cid1, [cid2, cid3],
'CQ-DEPEND=%s, %s,' % (cid2, cid3))
# Multiple keys.
self._CheckPaladin(git1, cid1, [cid2, '*245', cid4],
'CQ-DEPEND=%s, %s\nCQ-DEPEND=%s' % (cid2, '*245', cid4))
# Ensure it goes boom on invalid data.
self.assertRaises(cros_patch.BrokenCQDepends, self._CheckPaladin,
git1, cid1, [], 'CQ-DEPEND=monkeys')
self.assertRaises(cros_patch.BrokenCQDepends, self._CheckPaladin,
git1, cid1, [], 'CQ-DEPEND=%s monkeys' % (cid2,))
# Validate numeric is allowed.
self._CheckPaladin(git1, cid1, [cid2, '1'], 'CQ-DEPEND=1 %s' % cid2)
# Validate that it unique'ifies the results.
self._CheckPaladin(git1, cid1, ['1'], 'CQ-DEPEND=1 1')
class TestLocalPatchGit(TestGitRepoPatch):
patch_kls = cros_patch.LocalPatch
def setUp(self):
self.sourceroot = os.path.join(self.tempdir, 'sourceroot')
def _MkPatch(self, source, sha1, ref='refs/heads/master', **kwds):
remote = kwds.pop('remote', constants.EXTERNAL_REMOTE)
return self.patch_kls(source, 'chromiumos/chromite', ref,
'%s/master' % remote, remote, sha1, **kwds)
def testUpload(self):
def ProjectDirMock(_sourceroot):
return git1
git1, git2, patch = self._CommonGitSetup()
git2_sha1 = self._GetSha1(git2, 'HEAD')
patch.ProjectDir = ProjectDirMock
# First suppress carbon copy behaviour so we verify pushing
# plain works.
# pylint: disable=E1101
sha1 = patch.sha1
patch._GetCarbonCopy = lambda: sha1
patch.Upload(git2, 'refs/testing/test1')
self.assertEqual(self._GetSha1(git2, 'refs/testing/test1'),
# Enable CarbonCopy behaviour; verify it lands a different
# sha1. Additionally verify it didn't corrupt the patch's sha1 locally.
del patch._GetCarbonCopy
patch.Upload(git2, 'refs/testing/test2')
self.assertNotEqual(self._GetSha1(git2, 'refs/testing/test2'),
self.assertEqual(patch.sha1, sha1)
# Ensure the carbon creation didn't damage the target repo.
self.assertEqual(self._GetSha1(git1, 'HEAD'), sha1)
# Ensure we didn't damage the target repo's state at all.
self.assertEqual(git2_sha1, self._GetSha1(git2, 'HEAD'))
# Ensure the content is the same.
base = ['git', 'show']
self._run(base + ['refs/testing/test1:monkeys'], git2),
self._run(base + ['refs/testing/test2:monkeys'], git2))
base = ['git', 'log', '--format=%B', '-n1']
self._run(base + ['refs/testing/test1'], git2),
self._run(base + ['refs/testing/test2'], git2))
class TestUploadedLocalPatch(TestGitRepoPatch):
PROJECT = 'chromiumos/chromite'
ORIGINAL_BRANCH = 'original_branch'
ORIGINAL_SHA1 = 'ffffffff'.ljust(40, '0')
patch_kls = cros_patch.UploadedLocalPatch
def _MkPatch(self, source, sha1, ref='refs/heads/master', **kwds):
return self.patch_kls(source, self.PROJECT, ref,
'%s/master' % constants.EXTERNAL_REMOTE,
kwds.pop('remote', constants.EXTERNAL_REMOTE),
carbon_copy_sha1=sha1, **kwds)
def testStringRepresentation(self):
_, _, patch = self._CommonGitSetup()
str_rep = str(patch).split(':')
for element in [self.PROJECT, self.ORIGINAL_BRANCH, self.ORIGINAL_SHA1[:8]]:
self.assertTrue(element in str_rep,
msg="Couldn't find %s in %s" % (element, str_rep))
class TestGerritPatch(TestGitRepoPatch):
has_native_change_id = True
class patch_kls(cros_patch.GerritPatch):
# Suppress the behaviour pointing the project url at actual gerrit,
# instead slaving it back to a local repo for tests.
def __init__(self, *args, **kwargs):
cros_patch.GerritPatch.__init__(self, *args, **kwargs)
assert hasattr(self, 'patch_dict')
self.project_url = self.patch_dict['_unittest_url_bypass']
def test_json(self):
return copy.deepcopy(FAKE_PATCH_JSON)
def _MkPatch(self, source, sha1, ref='refs/heads/master', **kwds):
json = self.test_json
remote = kwds.pop('remote', constants.EXTERNAL_REMOTE)
url_prefix = kwds.pop('url_prefix', constants.EXTERNAL_GERRIT_URL)
suppress_branch = kwds.pop('suppress_branch', False)
change_id = kwds.pop('ChangeId', None)
if change_id is None:
change_id = self.MakeChangeId()
change_num, patch_num = _GetNumber(), _GetNumber()
# Note we intentionally use a gerrit like refspec here; we want to
# ensure that none of our common code pathways puke on a non head/tag.
refspec = gerrit.GetChangeRef(change_num + 1000, patch_num)
dict(number=patch_num, ref=refspec, revision=sha1))
json['branch'] = os.path.basename(ref)
json['_unittest_url_bypass'] = source
json['id'] = change_id
obj = self.patch_kls(json.copy(), remote, url_prefix)
self.assertEqual(obj.patch_dict, json)
self.assertEqual(obj.remote, remote)
self.assertEqual(obj.url_prefix, url_prefix)
self.assertEqual(obj.project, json['project'])
self.assertEqual(obj.ref, refspec)
self.assertEqual(obj.change_id, change_id)
cros_patch.FormatChangeId(change_id, force_internal=obj.internal))
# Now make the fetching actually work, if desired.
if not suppress_branch:
# Note that a push is needed here, rather than a branch; branch
# will just make it under refs/heads, we want it literally in
# refs/changes/
self._run(['git', 'push', source, '%s:%s' % (sha1, refspec)], source)
return obj
def testApprovalTimestamp(self):
"""Test that the approval timestamp is correctly extracted from JSON."""
repo = self._MakeRepo('git', self.source)
for approvals, expected in [(None, 0), ([], 0), ([1], 1), ([1, 3, 2], 3)]:
currentPatchSet = copy.deepcopy(FAKE_PATCH_JSON['currentPatchSet'])
if approvals is not None:
currentPatchSet['approvals'] = [{'grantedOn': x} for x in approvals]
patch = self._MkPatch(repo, self._GetSha1(repo, self.DEFAULT_TRACKING),
msg = 'Expected %r, but got %r (approvals=%r)' % (
expected, patch.approval_timestamp, approvals)
self.assertEqual(patch.approval_timestamp, expected, msg)
def _assertGerritDependencies(self, remote=constants.EXTERNAL_REMOTE):
convert = str
if remote == constants.INTERNAL_REMOTE:
convert = lambda val: '*%s' % (val,)
git1 = self._MakeRepo('git1', self.source, remote=remote)
patch = self._MkPatch(git1, self._GetSha1(git1, 'HEAD'), remote=remote)
cid1, cid2 = '1', '2'
# Test cases with no dependencies, 1 dependency, and 2 dependencies.
self.assertEqual(patch.GerritDependencies(), [])
patch.patch_dict['dependsOn'] = [{'number': cid1}]
self.assertEqual(patch.GerritDependencies(), [convert(cid1)])
patch.patch_dict['dependsOn'].append({'number': cid2})
self.assertEqual(patch.GerritDependencies(), [convert(cid1), convert(cid2)])
def testExternalGerritDependencies(self):
def testInternalGerritDependencies(self):
class PrepareRemotePatchesTest(cros_test_lib.TestCase):
def MkRemote(self,
project='my/project', original_branch='my-local',
ref='refs/tryjobs/elmer/patches', tracking_branch='master',
l = [project, original_branch, ref, tracking_branch,
getattr(constants, '%s_PATCH_TAG' % (
'INTERNAL' if internal else 'EXTERNAL'))]
return ':'.join(l)
def assertRemote(self, patch, project='my/project',
ref='refs/tryjobs/elmer/patches', tracking_branch='master',
self.assertEqual(patch.project, project)
self.assertEqual(patch.original_branch, original_branch)
self.assertEqual(patch.ref, ref)
self.assertEqual(patch.tracking_branch, tracking_branch)
self.assertEqual(patch.internal, internal)
def test(self):
# Check handling of a single patch...
patches = cros_patch.PrepareRemotePatches([self.MkRemote()])
self.assertEqual(len(patches), 1)
# Check handling of a multiple...
patches = cros_patch.PrepareRemotePatches(
[self.MkRemote(), self.MkRemote(project='foon')])
self.assertEqual(len(patches), 2)
self.assertRemote(patches[1], project='foon')
# Ensure basic validation occurs:
chunks = self.MkRemote().split(':')
self.assertRaises(ValueError, cros_patch.PrepareRemotePatches,
self.assertRaises(ValueError, cros_patch.PrepareRemotePatches,
':'.join(chunks[:-1] + ['monkeys']))
self.assertRaises(ValueError, cros_patch.PrepareRemotePatches,
':'.join(chunks + [':']))
class PrepareLocalPatchesTests(cros_build_lib_unittest.RunCommandTestCase):
def setUp(self):
self.path, self.project, self.branch = 'mydir', 'my/project', 'mybranch'
self.tracking_branch = 'kernel'
self.patches = ['%s:%s' % (self.project, self.branch)]
self.manifest = mock.MagicMock()
attrs = dict(tracking_branch=self.tracking_branch,
checkout = git.ProjectCheckout(attrs)
self.manifest, 'FindCheckouts', return_value=[checkout]
def PrepareLocalPatches(self, output):
"""Check the returned GitRepoPatchInfo against golden values."""
output_obj = mock.MagicMock()
output_obj.output = output
self.PatchObject(cros_patch.LocalPatch, 'Fetch', return_value=output_obj)
self.PatchObject(git, 'RunGit', return_value=output_obj)
patch_info, = cros_patch.PrepareLocalPatches(self.manifest, self.patches)
self.assertEquals(patch_info.project, self.project)
self.assertEquals(patch_info.ref, self.branch)
self.assertEquals(patch_info.tracking_branch, self.tracking_branch)
def testBranchSpecifiedSuccessRun(self):
"""Test success with branch specified by user."""
self.PrepareLocalPatches('12345'.rjust(40, '0'))
def testBranchSpecifiedNoChanges(self):
"""Test when no changes on the branch specified by user."""
self.assertRaises(SystemExit, self.PrepareLocalPatches, '')
class TestFormatting(cros_test_lib.TestCase):
def _assertResult(self, functor, value, expected=None, raises=False,
fixup=str, **kwds):
if raises:
self.assertRaises2(ValueError, functor, fixup(value),
msg="%s(%r), original %r, did not throw a ValueError"
% (functor.__name__, fixup(value), value), **kwds)
self.assertEqual(functor(value, **kwds), expected,
msg="failed: %s(%r) != %r; originals: %r %r"
% (functor.__name__, fixup(value), fixup(expected),
value, expected))
def _assertBad(self, functor, values, fixup=str, allow_CL=False, **kwds):
values = map(fixup, values)
pass_allow_CL = kwds.pop('pass_allow', False)
for prefix in ([""] + (['CL:'] if allow_CL else [])):
if pass_allow_CL:
kwds['allow_CL'] = bool(prefix)
for value in values:
self._assertResult(functor, prefix + value, raises=True, **kwds)
for value in values:
self._assertResult(functor, prefix + '*' + value, raises=True, **kwds)
def _assertGood(self, functor, values, fixup=str, allow_CL=False, **kwds):
pass_allow_CL = kwds.pop('pass_allow', False)
values = [map(fixup, x) for x in values]
for prefix in ([""] + (['CL:'] if allow_CL else [])):
if pass_allow_CL:
kwds['allow_CL'] = bool(prefix)
for value, expected in values:
self._assertResult(functor, prefix + value, expected, **kwds)
for value, expected in values:
self._assertResult(functor, prefix + '*' + value, '*' + expected,
def _ChangeIdFixup(value):
s = value.lstrip('iI*')
l = len(value)
return '%s%s' % (value[0:l-len(s)], s.ljust(40, "0"))
def testFormatChangeId(self):
fixup = self._ChangeIdFixup
['is', '**i1325', 'iz12345', 'Iz12365', 'II', 'ii', 'I1234+'],
fixup=fixup, allow_CL=True)
# Note the lack of fixup; we're checking size handling here.
['I012365', 'Ia0123456'.ljust(42, '0'), 'Ia'.ljust(40, '0')],
allow_CL=True, strict=True)
[('I012345', 'I012345'),
('Iabcdf', 'Iabcdf'),
('IABCDF', 'Iabcdf')],
def testFormatGerritNumber(self):
['is', 'i1325', '01234567', '012345a', '**12345', '+123', '/0123'],
[('1',) * 2,
('123',) * 2,
('123456',) * 2,
('001', '1')])
def _Sha1Fixup(value):
return value.ljust(40, '0')
def testFormatSha1(self):
fixup = self._Sha1Fixup
['0abcg', 'Z', '**a', '+123', '1234ab' * 10],
fixup=fixup, allow_CL=True)
# Length checks.
['0' * 41, 'a' * 39],
[('1' * 40,) * 2,
('a' * 40,) * 2,
('0123456789abcdef',) * 2,
('0123456789ABCDEF', '0123456789abcdef')],
def _FullChangeIdFixup(value):
pieces = value.split('~')
pieces[-1] = TestFormatting._ChangeIdFixup(pieces[-1])
return '~'.join(pieces)
def testFormatFullChangeId(self):
fixup = self._FullChangeIdFixup
['foo', 'foo~bar', 'foo~bar~baz', 'foo~refs/bar~*I1234'],
fixup=fixup, allow_CL=True)
[('foo~bar~ifade', 'foo~bar~Ifade'),
('foo/bar/baz~refs/heads/_my-branch_~Iface',) * 2],
def testFormatPatchDeps(self):
sha1 = self._Sha1Fixup
changeId = self._ChangeIdFixup
# Validate control over formats allowed:
['12345', '1234567'],
gerrit_number=False, allow_CL=True, pass_allow=True)
map(changeId, ['I1234567']),
changeId=False, allow_CL=True, pass_allow=True)
map(sha1, ['1234567', 'asdf1']),
sha1=False, allow_CL=True, pass_allow=True)
[('12345', '12345'), ('1', '1'), ('98765', '98765'), ('001', '1'),
(changeId('Iabcd'),) *2,
(sha1('0123cde'),) *2], allow_CL=True, pass_allow=True)
if __name__ == '__main__':