| # Copyright (c) 2020 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import os |
| import sys |
| import unittest |
| |
| if sys.version_info.major == 2: |
| import mock |
| else: |
| from unittest import mock |
| |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| |
| import gerrit_util |
| import owners_client |
| |
| from testing_support import filesystem_mock |
| |
| |
# Owner e-mail addresses shared by the test cases below.
alice, bob, chris, dave, emily = (
    'alice@example.com',
    'bob@example.com',
    'chris@example.com',
    'dave@example.com',
    'emily@example.com',
)
| |
| |
class DepotToolsClientTest(unittest.TestCase):
  """Tests for owners_client.DepotToolsClient backed by a mock file system."""

  def setUp(self):
    # Contents of the top-level OWNERS file: two per-file rules plus one
    # plain owner entry.
    root_owners = '\n'.join([
        'per-file approved.cc=approver@example.com',
        'per-file reviewed.h=reviewer@example.com',
        'missing@example.com',
    ])
    mock_files = {
        '/OWNERS': root_owners,
        '/approved.cc': '',
        '/reviewed.h': '',
        '/bar/insufficient_reviewers.py': '',
        '/bar/everyone/OWNERS': '*',
        '/bar/everyone/foo.txt': '',
    }
    self.repo = filesystem_mock.MockFileSystem(files=mock_files)
    self.root = '/'
    self.fopen = self.repo.open_for_reading
    # Stop the patches installed by the mock.patch decorators/starts.
    self.addCleanup(mock.patch.stopall)
    self.client = owners_client.DepotToolsClient(
        '/', 'branch', self.fopen, self.repo)

  @mock.patch('scm.GIT.CaptureStatus')
  @mock.patch('scm.GIT.GetOldContents')
  def testListOwners(self, mockGetOldContents, mockCaptureStatus):
    def read_from_mock_repo(root, path, _branch):
      # Serve "old" contents straight from the mock file system.
      return self.repo.files[root + path]

    mockGetOldContents.side_effect = read_from_mock_repo
    mockCaptureStatus.return_value = [
        ('M', 'bar/everyone/foo.txt'),
        ('M', 'OWNERS'),
    ]

    listed = self.client.ListOwners('bar/everyone/foo.txt')

    self.assertEqual(['*', 'missing@example.com'], listed)
    mockCaptureStatus.assert_called_once_with('/', 'branch')
| |
| |
class GerritClientTest(unittest.TestCase):
  """Tests for owners_client.GerritClient.

  gerrit_util.GetOwnersForFile is replaced with a mock in every test, so each
  test controls the response the client receives from that call.
  """

  def setUp(self):
    self.client = owners_client.GerritClient('host', 'project', 'branch')
    # Patches are started by hand with .start() in the tests; stop them all
    # once the test finishes.
    self.addCleanup(mock.patch.stopall)

  def testListOwners(self):
    # The last code owner entry carries no e-mail and is expected to be left
    # out of the listed owners.
    mock.patch(
        'gerrit_util.GetOwnersForFile',
        return_value={
            "code_owners": [
                {
                    "account": {
                        "email": 'approver@example.com'
                    }
                },
                {
                    "account": {
                        "email": 'reviewer@example.com'
                    },
                },
                {
                    "account": {
                        "email": 'missing@example.com'
                    },
                },
                {
                    "account": {},
                }
            ]
        }).start()
    # assertEqual is used instead of the assertEquals alias, which was removed
    # from unittest in Python 3.12.
    self.assertEqual(
        ['approver@example.com', 'reviewer@example.com', 'missing@example.com'],
        self.client.ListOwners(os.path.join('bar', 'everyone', 'foo.txt')))

    # Result should be cached.
    self.assertEqual(
        ['approver@example.com', 'reviewer@example.com', 'missing@example.com'],
        self.client.ListOwners(os.path.join('bar', 'everyone', 'foo.txt')))
    # Always use slashes as separators.
    gerrit_util.GetOwnersForFile.assert_called_once_with(
        'host', 'project', 'branch', 'bar/everyone/foo.txt',
        resolve_all_users=False, seed=mock.ANY)

  def testListOwnersOwnedByAll(self):
    # side_effect yields one response per call: the first is marked as owned
    # by all users, the second is not.
    mock.patch(
        'gerrit_util.GetOwnersForFile',
        side_effect=[
            {
                "code_owners": [
                    {
                        "account": {
                            "email": 'foo@example.com'
                        },
                    },
                ],
                "owned_by_all_users": True,
            },
            {
                "code_owners": [
                    {
                        "account": {
                            "email": 'bar@example.com'
                        },
                    },
                ],
                "owned_by_all_users": False,
            },
        ]
    ).start()
    # When owned by all users, EVERYONE is expected after the listed owners.
    self.assertEqual(
        ['foo@example.com', self.client.EVERYONE],
        self.client.ListOwners('foo.txt'))
    self.assertEqual(
        ['bar@example.com'],
        self.client.ListOwners('bar.txt'))
| |
| |
class TestClient(owners_client.OwnersClient):
  """OwnersClient whose owners come from an in-memory path -> owners mapping."""

  def __init__(self, owners_by_path):
    # Explicit two-argument super() keeps this working on Python 2 as well.
    super(TestClient, self).__init__()
    # Tests reassign this attribute to change the owners per path.
    self.owners_by_path = owners_by_path

  def ListOwners(self, path):
    # Plain lookup: a path missing from the mapping raises KeyError.
    path_owners = self.owners_by_path[path]
    return path_owners
| |
| |
class OwnersClientTest(unittest.TestCase):
  """Tests the OwnersClient methods that build on ListOwners.

  The client under test is a TestClient, so every test sets the owners for
  each path directly through its owners_by_path attribute.
  """

  def setUp(self):
    self.owners = {}
    self.client = TestClient(self.owners)

  def testGetFilesApprovalStatus(self):
    self.client.owners_by_path = {
        'approved': ['approver@example.com'],
        'pending': ['reviewer@example.com'],
        'insufficient': ['insufficient@example.com'],
        'everyone': [owners_client.OwnersClient.EVERYONE],
    }
    # Arguments are (paths, approvers, reviewers).
    self.assertEqual(
        self.client.GetFilesApprovalStatus(
            ['approved', 'pending', 'insufficient'],
            ['approver@example.com'], ['reviewer@example.com']),
        {
            'approved': owners_client.OwnersClient.APPROVED,
            'pending': owners_client.OwnersClient.PENDING,
            'insufficient': owners_client.OwnersClient.INSUFFICIENT_REVIEWERS,
        })
    # A path owned by EVERYONE: any approver approves it, any reviewer makes
    # it pending, and nobody at all leaves it without sufficient reviewers.
    self.assertEqual(
        self.client.GetFilesApprovalStatus(
            ['everyone'], ['anyone@example.com'], []),
        {'everyone': owners_client.OwnersClient.APPROVED})
    self.assertEqual(
        self.client.GetFilesApprovalStatus(
            ['everyone'], [], ['anyone@example.com']),
        {'everyone': owners_client.OwnersClient.PENDING})
    self.assertEqual(
        self.client.GetFilesApprovalStatus(['everyone'], [], []),
        {'everyone': owners_client.OwnersClient.INSUFFICIENT_REVIEWERS})

  def testScoreOwners(self):
    self.client.owners_by_path = {
        'a': [alice, bob, chris]
    }
    self.assertEqual(
        self.client.ScoreOwners(self.client.owners_by_path.keys()),
        [alice, bob, chris]
    )

    self.client.owners_by_path = {
        'a': [alice, bob],
        'b': [bob],
        'c': [bob, chris]
    }
    self.assertEqual(
        self.client.ScoreOwners(self.client.owners_by_path.keys()),
        [alice, bob, chris]
    )

    # Same owners as above, but excluded owners must not be returned.
    self.client.owners_by_path = {
        'a': [alice, bob],
        'b': [bob],
        'c': [bob, chris]
    }
    self.assertEqual(
        self.client.ScoreOwners(
            self.client.owners_by_path.keys(), exclude=[chris]),
        [alice, bob],
    )

    self.client.owners_by_path = {
        'a': [alice, bob, chris, dave],
        'b': [chris, bob, dave],
        'c': [chris, dave],
        'd': [alice, chris, dave]
    }
    self.assertEqual(
        self.client.ScoreOwners(self.client.owners_by_path.keys()),
        [alice, chris, bob, dave]
    )

  def assertSuggestsOwners(self, owners_by_path, exclude=None):
    """Asserts that SuggestOwners returns a valid suggestion.

    A suggestion is valid when no owner is repeated, every path has at least
    one of its owners suggested, and no excluded owner is suggested.

    Args:
      owners_by_path: dict mapping each path to the list of its owners.
      exclude: optional list of owners that must not be suggested.
    """
    self.client.owners_by_path = owners_by_path
    suggested = self.client.SuggestOwners(
        owners_by_path.keys(), exclude=exclude)

    # Owners should appear only once
    self.assertEqual(len(suggested), len(set(suggested)))

    # All paths should be covered.
    suggested = set(suggested)
    for owners in owners_by_path.values():
      self.assertTrue(suggested & set(owners))

    # No excluded owners should be present.
    if exclude:
      for owner in suggested:
        self.assertNotIn(owner, exclude)

  def testSuggestOwners(self):
    self.assertSuggestsOwners({})
    self.assertSuggestsOwners({'a': [alice]})
    self.assertSuggestsOwners({'abcd': [alice, bob, chris, dave]})
    self.assertSuggestsOwners(
        {'abcd': [alice, bob, chris, dave]},
        exclude=[alice, bob])
    self.assertSuggestsOwners(
        {'ae': [alice, emily],
         'be': [bob, emily],
         'ce': [chris, emily],
         'de': [dave, emily]})
    self.assertSuggestsOwners(
        {'ad': [alice, dave],
         'cad': [chris, alice, dave],
         'ead': [emily, alice, dave],
         'bd': [bob, dave]})
    self.assertSuggestsOwners(
        {'a': [alice],
         'b': [bob],
         'c': [chris],
         'ad': [alice, dave]})
    self.assertSuggestsOwners(
        {'abc': [alice, bob, chris],
         'acb': [alice, chris, bob],
         'bac': [bob, alice, chris],
         'bca': [bob, chris, alice],
         'cab': [chris, alice, bob],
         'cba': [chris, bob, alice]})

    # Check that we can handle a large amount of files with unrelated owners.
    self.assertSuggestsOwners(
        {str(x): [str(x)] for x in range(100)})

  def testBatchListOwners(self):
    self.client.owners_by_path = {
        'bar/everyone/foo.txt': [alice, bob],
        'bar/everyone/bar.txt': [bob],
        'bar/foo/': [bob, chris]
    }

    # assertEqual is used instead of the assertEquals alias, which was removed
    # from unittest in Python 3.12.
    self.assertEqual(
        {
            'bar/everyone/foo.txt': [alice, bob],
            'bar/everyone/bar.txt': [bob],
            'bar/foo/': [bob, chris]
        },
        self.client.BatchListOwners(
            ['bar/everyone/foo.txt', 'bar/everyone/bar.txt', 'bar/foo/']))
| |
| |
class GetCodeOwnersClientTest(unittest.TestCase):
  """Checks which client class owners_client.GetCodeOwnersClient returns."""

  def setUp(self):
    # Register the cleanup, then replace IsCodeOwnersEnabled with a mock whose
    # return value each test sets for itself.
    self.addCleanup(mock.patch.stopall)
    mock.patch('gerrit_util.IsCodeOwnersEnabled').start()

  def testGetCodeOwnersClient_GerritClient(self):
    gerrit_util.IsCodeOwnersEnabled.return_value = True
    client = owners_client.GetCodeOwnersClient(
        '', '', 'host', 'project', 'branch')
    self.assertIsInstance(client, owners_client.GerritClient)

  def testGetCodeOwnersClient_DepotToolsClient(self):
    gerrit_util.IsCodeOwnersEnabled.return_value = False
    client = owners_client.GetCodeOwnersClient(
        'root', 'branch', '', '', '')
    self.assertIsInstance(client, owners_client.DepotToolsClient)
| |
| |
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()