| #!/usr/bin/env vpython3 |
| # coding=utf-8 |
| # Copyright (c) 2019 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import httplib2 |
| from io import StringIO |
| import json |
| import os |
| import socket |
| import sys |
| import unittest |
| from unittest import mock |
| |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| |
| import gerrit_util |
| import metrics |
| import subprocess2 |
| |
| |
| class CookiesAuthenticatorTest(unittest.TestCase): |
| _GITCOOKIES = '\n'.join([ |
| '\t'.join([ |
| 'chromium.googlesource.com', |
| 'FALSE', |
| '/', |
| 'TRUE', |
| '2147483647', |
| 'o', |
| 'git-user.chromium.org=1/chromium-secret', |
| ]), |
| '\t'.join([ |
| 'chromium-review.googlesource.com', |
| 'FALSE', |
| '/', |
| 'TRUE', |
| '2147483647', |
| 'o', |
| 'git-user.chromium.org=1/chromium-secret', |
| ]), |
| '\t'.join([ |
| '.example.com', |
| 'FALSE', |
| '/', |
| 'TRUE', |
| '2147483647', |
| 'o', |
| 'example-bearer-token', |
| ]), |
| '\t'.join([ |
| 'another-path.example.com', |
| 'FALSE', |
| '/foo', |
| 'TRUE', |
| '2147483647', |
| 'o', |
| 'git-example.com=1/another-path-secret', |
| ]), |
| '\t'.join([ |
| 'another-key.example.com', |
| 'FALSE', |
| '/', |
| 'TRUE', |
| '2147483647', |
| 'not-o', |
| 'git-example.com=1/another-key-secret', |
| ]), |
| '#' + '\t'.join([ |
| 'chromium-review.googlesource.com', |
| 'FALSE', |
| '/', |
| 'TRUE', |
| '2147483647', |
| 'o', |
| 'git-invalid-user.chromium.org=1/invalid-chromium-secret', |
| ]), |
| 'Some unrelated line\t that should not be here', |
| ]) |
| |
| def setUp(self): |
| mock.patch('gclient_utils.FileRead', |
| return_value=self._GITCOOKIES).start() |
| mock.patch('os.getenv', return_value={}).start() |
| mock.patch('os.environ', {'HOME': '$HOME'}).start() |
| mock.patch('os.path.exists', return_value=True).start() |
| mock.patch( |
| 'subprocess2.check_output', |
| side_effect=[ |
| subprocess2.CalledProcessError(1, ['cmd'], 'cwd', 'out', 'err') |
| ], |
| ).start() |
| self.addCleanup(mock.patch.stopall) |
| self.maxDiff = None |
| |
| def testGetNewPasswordUrl(self): |
| auth = gerrit_util.CookiesAuthenticator() |
| self.assertEqual('https://chromium.googlesource.com/new-password', |
| auth.get_new_password_url('chromium.googlesource.com')) |
| self.assertEqual( |
| 'https://chrome-internal.googlesource.com/new-password', |
| auth.get_new_password_url( |
| 'chrome-internal-review.googlesource.com')) |
| |
| def testGetNewPasswordMessage(self): |
| auth = gerrit_util.CookiesAuthenticator() |
| self.assertIn( |
| 'https://chromium.googlesource.com/new-password', |
| auth.get_new_password_message('chromium-review.googlesource.com')) |
| self.assertIn( |
| 'https://chrome-internal.googlesource.com/new-password', |
| auth.get_new_password_message('chrome-internal.googlesource.com')) |
| |
| def testGetGitcookiesPath(self): |
| self.assertEqual( |
| os.path.expanduser(os.path.join('~', '.gitcookies')), |
| gerrit_util.CookiesAuthenticator().get_gitcookies_path()) |
| |
| subprocess2.check_output.side_effect = [ |
| b'http.cookiefile\nhttp.cookiefile\x00' |
| ] |
| self.assertEqual( |
| 'http.cookiefile', |
| gerrit_util.CookiesAuthenticator().get_gitcookies_path()) |
| subprocess2.check_output.assert_called_with( |
| ['git', 'config', '--list', '-z'], |
| cwd=os.getcwd(), |
| env=mock.ANY, |
| stderr=mock.ANY) |
| |
| os.getenv.return_value = 'git-cookies-path' |
| self.assertEqual( |
| 'git-cookies-path', |
| gerrit_util.CookiesAuthenticator().get_gitcookies_path()) |
| os.getenv.assert_called_with('GIT_COOKIES_PATH') |
| |
| def testGitcookies(self): |
| auth = gerrit_util.CookiesAuthenticator() |
| self.assertEqual( |
| auth.gitcookies, { |
| 'chromium.googlesource.com': |
| ('git-user.chromium.org', '1/chromium-secret'), |
| 'chromium-review.googlesource.com': |
| ('git-user.chromium.org', '1/chromium-secret'), |
| '.example.com': ('', 'example-bearer-token'), |
| }) |
| |
| def testGetAuthHeader(self): |
| expected_chromium_header = ( |
| 'Basic Z2l0LXVzZXIuY2hyb21pdW0ub3JnOjEvY2hyb21pdW0tc2VjcmV0') |
| |
| auth = gerrit_util.CookiesAuthenticator() |
| self.assertEqual(expected_chromium_header, |
| auth.get_auth_header('chromium.googlesource.com')) |
| self.assertEqual( |
| expected_chromium_header, |
| auth.get_auth_header('chromium-review.googlesource.com')) |
| self.assertEqual('Bearer example-bearer-token', |
| auth.get_auth_header('some-review.example.com')) |
| |
| def testGetAuthEmail(self): |
| auth = gerrit_util.CookiesAuthenticator() |
| self.assertEqual('user@chromium.org', |
| auth.get_auth_email('chromium.googlesource.com')) |
| self.assertEqual( |
| 'user@chromium.org', |
| auth.get_auth_email('chromium-review.googlesource.com')) |
| self.assertIsNone(auth.get_auth_email('some-review.example.com')) |
| |
| |
| class GceAuthenticatorTest(unittest.TestCase): |
| def setUp(self): |
| super(GceAuthenticatorTest, self).setUp() |
| mock.patch('httplib2.Http').start() |
| mock.patch('os.getenv', return_value=None).start() |
| mock.patch('gerrit_util.time_sleep').start() |
| mock.patch('gerrit_util.time_time').start() |
| self.addCleanup(mock.patch.stopall) |
| |
| # GceAuthenticator has class variables that cache the results. Build a |
| # new class for every test to avoid inter-test dependencies. |
| class GceAuthenticator(gerrit_util.GceAuthenticator): |
| pass |
| |
| self.GceAuthenticator = GceAuthenticator |
| |
| def testIsGce_EnvVarSkip(self, *_mocks): |
| os.getenv.return_value = '1' |
| self.assertFalse(self.GceAuthenticator.is_gce()) |
| os.getenv.assert_called_once_with('SKIP_GCE_AUTH_FOR_GIT') |
| |
| def testIsGce_Error(self): |
| httplib2.Http().request.side_effect = httplib2.HttpLib2Error |
| self.assertFalse(self.GceAuthenticator.is_gce()) |
| |
| def testIsGce_500(self): |
| httplib2.Http().request.return_value = (mock.Mock(status=500), None) |
| self.assertFalse(self.GceAuthenticator.is_gce()) |
| last_call = gerrit_util.time_sleep.mock_calls[-1] |
| self.assertLessEqual(last_call, mock.call(43.0)) |
| |
| def testIsGce_FailsThenSucceeds(self): |
| response = mock.Mock(status=200) |
| response.get.return_value = 'Google' |
| httplib2.Http().request.side_effect = [ |
| (mock.Mock(status=500), None), |
| (response, 'who cares'), |
| ] |
| self.assertTrue(self.GceAuthenticator.is_gce()) |
| |
| def testIsGce_MetadataFlavorIsNotGoogle(self): |
| response = mock.Mock(status=200) |
| response.get.return_value = None |
| httplib2.Http().request.return_value = (response, 'who cares') |
| self.assertFalse(self.GceAuthenticator.is_gce()) |
| response.get.assert_called_once_with('metadata-flavor') |
| |
| def testIsGce_ResultIsCached(self): |
| response = mock.Mock(status=200) |
| response.get.return_value = 'Google' |
| httplib2.Http().request.side_effect = [(response, 'who cares')] |
| self.assertTrue(self.GceAuthenticator.is_gce()) |
| self.assertTrue(self.GceAuthenticator.is_gce()) |
| httplib2.Http().request.assert_called_once() |
| |
| def testGetAuthHeader_Error(self): |
| httplib2.Http().request.side_effect = httplib2.HttpLib2Error |
| self.assertIsNone(self.GceAuthenticator().get_auth_header('')) |
| |
| def testGetAuthHeader_500(self): |
| httplib2.Http().request.return_value = (mock.Mock(status=500), None) |
| self.assertIsNone(self.GceAuthenticator().get_auth_header('')) |
| |
| def testGetAuthHeader_Non200(self): |
| httplib2.Http().request.return_value = (mock.Mock(status=403), None) |
| self.assertIsNone(self.GceAuthenticator().get_auth_header('')) |
| |
| def testGetAuthHeader_OK(self): |
| httplib2.Http().request.return_value = ( |
| mock.Mock(status=200), |
| '{"expires_in": 125, "token_type": "TYPE", "access_token": "TOKEN"}' |
| ) |
| gerrit_util.time_time.return_value = 0 |
| self.assertEqual('TYPE TOKEN', |
| self.GceAuthenticator().get_auth_header('')) |
| |
| def testGetAuthHeader_Cache(self): |
| httplib2.Http().request.return_value = ( |
| mock.Mock(status=200), |
| '{"expires_in": 125, "token_type": "TYPE", "access_token": "TOKEN"}' |
| ) |
| gerrit_util.time_time.return_value = 0 |
| self.assertEqual('TYPE TOKEN', |
| self.GceAuthenticator().get_auth_header('')) |
| self.assertEqual('TYPE TOKEN', |
| self.GceAuthenticator().get_auth_header('')) |
| httplib2.Http().request.assert_called_once() |
| |
| def testGetAuthHeader_CacheOld(self): |
| httplib2.Http().request.return_value = ( |
| mock.Mock(status=200), |
| '{"expires_in": 125, "token_type": "TYPE", "access_token": "TOKEN"}' |
| ) |
| gerrit_util.time_time.side_effect = [0, 100, 200] |
| self.assertEqual('TYPE TOKEN', |
| self.GceAuthenticator().get_auth_header('')) |
| self.assertEqual('TYPE TOKEN', |
| self.GceAuthenticator().get_auth_header('')) |
| self.assertEqual(2, len(httplib2.Http().request.mock_calls)) |
| |
| |
| class GerritUtilTest(unittest.TestCase): |
| def setUp(self): |
| super(GerritUtilTest, self).setUp() |
| mock.patch('gerrit_util.LOGGER').start() |
| mock.patch('gerrit_util.time_sleep').start() |
| mock.patch('metrics.collector').start() |
| mock.patch('metrics_utils.extract_http_metrics', |
| return_value='http_metrics').start() |
| self.addCleanup(mock.patch.stopall) |
| |
| def testQueryString(self): |
| self.assertEqual('', gerrit_util._QueryString([])) |
| self.assertEqual('first%20param%2B', |
| gerrit_util._QueryString([], 'first param+')) |
| self.assertEqual( |
| 'key:val+foo:bar', |
| gerrit_util._QueryString([('key', 'val'), ('foo', 'bar')])) |
| self.assertEqual( |
| 'first%20param%2B+key:val+foo:bar', |
| gerrit_util._QueryString([('key', 'val'), ('foo', 'bar')], |
| 'first param+')) |
| |
| @mock.patch('gerrit_util.Authenticator') |
| def testCreateHttpConn_Basic(self, mockAuth): |
| mockAuth.get().get_auth_header.return_value = None |
| conn = gerrit_util.CreateHttpConn('host.example.com', 'foo/bar') |
| self.assertEqual('host.example.com', conn.req_host) |
| self.assertEqual( |
| { |
| 'uri': 'https://host.example.com/foo/bar', |
| 'method': 'GET', |
| 'headers': {}, |
| 'body': None, |
| }, conn.req_params) |
| |
| @mock.patch('gerrit_util.Authenticator') |
| def testCreateHttpConn_Authenticated(self, mockAuth): |
| mockAuth.get().get_auth_header.return_value = 'Bearer token' |
| conn = gerrit_util.CreateHttpConn('host.example.com', |
| 'foo/bar', |
| headers={'header': 'value'}) |
| self.assertEqual('host.example.com', conn.req_host) |
| self.assertEqual( |
| { |
| 'uri': 'https://host.example.com/a/foo/bar', |
| 'method': 'GET', |
| 'headers': { |
| 'Authorization': 'Bearer token', |
| 'header': 'value' |
| }, |
| 'body': None, |
| }, conn.req_params) |
| |
| @mock.patch('gerrit_util.Authenticator') |
| def testCreateHttpConn_Body(self, mockAuth): |
| mockAuth.get().get_auth_header.return_value = None |
| conn = gerrit_util.CreateHttpConn('host.example.com', |
| 'foo/bar', |
| body={ |
| 'l': [1, 2, 3], |
| 'd': { |
| 'k': 'v' |
| } |
| }) |
| self.assertEqual('host.example.com', conn.req_host) |
| self.assertEqual( |
| { |
| 'uri': 'https://host.example.com/foo/bar', |
| 'method': 'GET', |
| 'headers': { |
| 'Content-Type': 'application/json' |
| }, |
| 'body': '{"d": {"k": "v"}, "l": [1, 2, 3]}', |
| }, conn.req_params) |
| |
| def testReadHttpResponse_200(self): |
| conn = mock.Mock() |
| conn.req_params = {'uri': 'uri', 'method': 'method'} |
| conn.request.return_value = (mock.Mock(status=200), |
| b'content\xe2\x9c\x94') |
| |
| content = gerrit_util.ReadHttpResponse(conn) |
| self.assertEqual('content✔', content.getvalue()) |
| metrics.collector.add_repeated.assert_called_once_with( |
| 'http_requests', 'http_metrics') |
| |
| def testReadHttpResponse_AuthenticationIssue(self): |
| for status in (302, 401, 403): |
| response = mock.Mock(status=status) |
| response.get.return_value = None |
| conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'}) |
| conn.request.return_value = (response, b'') |
| |
| with mock.patch('sys.stdout', StringIO()): |
| with self.assertRaises(gerrit_util.GerritError) as cm: |
| gerrit_util.ReadHttpResponse(conn) |
| |
| self.assertEqual(status, cm.exception.http_status) |
| self.assertIn('Your Gerrit credentials might be misconfigured', |
| sys.stdout.getvalue()) |
| |
| def testReadHttpResponse_ClientError(self): |
| conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'}) |
| conn.request.return_value = (mock.Mock(status=404), b'') |
| |
| with self.assertRaises(gerrit_util.GerritError) as cm: |
| gerrit_util.ReadHttpResponse(conn) |
| |
| self.assertEqual(404, cm.exception.http_status) |
| |
| def readHttpResponse_ServerErrorHelper(self, status): |
| conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'}) |
| conn.request.return_value = (mock.Mock(status=status), b'') |
| |
| with self.assertRaises(gerrit_util.GerritError) as cm: |
| gerrit_util.ReadHttpResponse(conn) |
| |
| self.assertEqual(status, cm.exception.http_status) |
| self.assertEqual(gerrit_util.TRY_LIMIT, len(conn.request.mock_calls)) |
| last_call = gerrit_util.time_sleep.mock_calls[-1] |
| self.assertLessEqual(last_call, mock.call(422.0)) |
| |
| def testReadHttpResponse_ServerError(self): |
| self.readHttpResponse_ServerErrorHelper(status=404) |
| self.readHttpResponse_ServerErrorHelper(status=409) |
| self.readHttpResponse_ServerErrorHelper(status=429) |
| self.readHttpResponse_ServerErrorHelper(status=500) |
| |
| def testReadHttpResponse_ServerErrorAndSuccess(self): |
| conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'}) |
| conn.request.side_effect = [ |
| (mock.Mock(status=500), b''), |
| (mock.Mock(status=200), b'content\xe2\x9c\x94'), |
| ] |
| |
| self.assertEqual('content✔', |
| gerrit_util.ReadHttpResponse(conn).getvalue()) |
| self.assertEqual(2, len(conn.request.mock_calls)) |
| gerrit_util.time_sleep.assert_called_once_with(12.0) |
| |
| def testReadHttpResponse_TimeoutAndSuccess(self): |
| conn = mock.Mock(req_params={'uri': 'uri', 'method': 'method'}) |
| conn.request.side_effect = [ |
| socket.timeout('timeout'), |
| (mock.Mock(status=200), b'content\xe2\x9c\x94'), |
| ] |
| |
| self.assertEqual('content✔', |
| gerrit_util.ReadHttpResponse(conn).getvalue()) |
| self.assertEqual(2, len(conn.request.mock_calls)) |
| gerrit_util.time_sleep.assert_called_once_with(12.0) |
| |
| def testReadHttpResponse_Expected404(self): |
| conn = mock.Mock() |
| conn.req_params = {'uri': 'uri', 'method': 'method'} |
| conn.request.return_value = (mock.Mock(status=404), |
| b'content\xe2\x9c\x94') |
| |
| content = gerrit_util.ReadHttpResponse(conn, (404, )) |
| self.assertEqual('', content.getvalue()) |
| |
| @mock.patch('gerrit_util.ReadHttpResponse') |
| def testReadHttpJsonResponse_NotJSON(self, mockReadHttpResponse): |
| mockReadHttpResponse.return_value = StringIO('not json') |
| with self.assertRaises(gerrit_util.GerritError) as cm: |
| gerrit_util.ReadHttpJsonResponse(None) |
| self.assertEqual(cm.exception.http_status, 200) |
| self.assertEqual(cm.exception.message, |
| '(200) Unexpected json output: not json') |
| |
| @mock.patch('gerrit_util.ReadHttpResponse') |
| def testReadHttpJsonResponse_EmptyValue(self, mockReadHttpResponse): |
| mockReadHttpResponse.return_value = StringIO(')]}\'') |
| self.assertIsNone(gerrit_util.ReadHttpJsonResponse(None)) |
| |
| @mock.patch('gerrit_util.ReadHttpResponse') |
| def testReadHttpJsonResponse_JSON(self, mockReadHttpResponse): |
| expected_value = {'foo': 'bar', 'baz': [1, '2', 3]} |
| mockReadHttpResponse.return_value = StringIO(')]}\'\n' + |
| json.dumps(expected_value)) |
| self.assertEqual(expected_value, gerrit_util.ReadHttpJsonResponse(None)) |
| |
| @mock.patch('gerrit_util.CreateHttpConn') |
| @mock.patch('gerrit_util.ReadHttpJsonResponse') |
| def testQueryChanges(self, mockJsonResponse, mockCreateHttpConn): |
| gerrit_util.QueryChanges('host', [('key', 'val'), ('foo', 'bar baz')], |
| 'first param', |
| limit=500, |
| o_params=['PARAM_A', 'PARAM_B'], |
| start='start') |
| mockCreateHttpConn.assert_called_once_with( |
| 'host', ('changes/?q=first%20param+key:val+foo:bar+baz' |
| '&start=start' |
| '&n=500' |
| '&o=PARAM_A' |
| '&o=PARAM_B'), |
| timeout=30.0) |
| |
| def testQueryChanges_NoParams(self): |
| self.assertRaises(RuntimeError, gerrit_util.QueryChanges, 'host', []) |
| |
| @mock.patch('gerrit_util.QueryChanges') |
| def testGenerateAllChanges(self, mockQueryChanges): |
| mockQueryChanges.side_effect = [ |
| # First results page |
| [ |
| { |
| '_number': '4' |
| }, |
| { |
| '_number': '3' |
| }, |
| { |
| '_number': '2', |
| '_more_changes': True |
| }, |
| ], |
| # Second results page, there are new changes, so second page |
| # includes some results from the first page. |
| [ |
| { |
| '_number': '2' |
| }, |
| { |
| '_number': '1' |
| }, |
| ], |
| # GenerateAllChanges queries again from the start to get any new |
| # changes (5 in this case). |
| [ |
| { |
| '_number': '5' |
| }, |
| { |
| '_number': '4' |
| }, |
| { |
| '_number': '3', |
| '_more_changes': True |
| }, |
| ], |
| ] |
| |
| changes = list(gerrit_util.GenerateAllChanges('host', 'params')) |
| self.assertEqual([ |
| { |
| '_number': '4' |
| }, |
| { |
| '_number': '3' |
| }, |
| { |
| '_number': '2', |
| '_more_changes': True |
| }, |
| { |
| '_number': '1' |
| }, |
| { |
| '_number': '5' |
| }, |
| ], changes) |
| self.assertEqual([ |
| mock.call('host', 'params', None, 500, None, 0), |
| mock.call('host', 'params', None, 500, None, 3), |
| mock.call('host', 'params', None, 500, None, 0), |
| ], mockQueryChanges.mock_calls) |
| |
| @mock.patch('gerrit_util.CreateHttpConn') |
| @mock.patch('gerrit_util.ReadHttpJsonResponse') |
| def testIsCodeOwnersEnabledOnRepo_Disabled(self, mockJsonResponse, |
| mockCreateHttpConn): |
| mockJsonResponse.return_value = {'status': {'disabled': True}} |
| self.assertFalse(gerrit_util.IsCodeOwnersEnabledOnRepo('host', 'repo')) |
| |
| @mock.patch('gerrit_util.CreateHttpConn') |
| @mock.patch('gerrit_util.ReadHttpJsonResponse') |
| def testIsCodeOwnersEnabledOnRepo_Enabled(self, mockJsonResponse, |
| mockCreateHttpConn): |
| mockJsonResponse.return_value = {'status': {}} |
| self.assertTrue(gerrit_util.IsCodeOwnersEnabledOnRepo('host', 'repo')) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |