| #!/usr/bin/env vpython3 |
| # coding=utf-8 |
| # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Unit tests for fetch.py.""" |
| |
| import contextlib |
| import logging |
| import optparse |
| import os |
| import subprocess |
| import sys |
| import tempfile |
| import unittest |
| |
| import distutils |
| |
| if sys.version_info.major == 2: |
| from StringIO import StringIO |
| import mock |
| else: |
| from io import StringIO |
| from unittest import mock |
| |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| |
| import fetch |
| |
| |
| class SystemExitMock(Exception): |
| pass |
| |
| |
| class UsageMock(Exception): |
| pass |
| |
| |
| class TestUtilityFunctions(unittest.TestCase): |
| """This test case is against utility functions""" |
| |
| @mock.patch('sys.stdout', StringIO()) |
| @mock.patch('os.listdir', return_value=['foo.py', 'bar']) |
| @mock.patch('sys.exit', side_effect=SystemExitMock) |
| def test_usage_with_message(self, exit_mock, listdir): |
| with self.assertRaises(SystemExitMock): |
| fetch.usage('foo') |
| exit_mock.assert_called_once_with(1) |
| listdir.assert_called_once() |
| |
| stdout = sys.stdout.getvalue() |
| self.assertTrue(stdout.startswith('Error: foo')) |
| |
| self._usage_static_message(stdout) |
| |
| @mock.patch('sys.stdout', StringIO()) |
| @mock.patch('os.listdir', return_value=['foo.py', 'bar']) |
| @mock.patch('sys.exit', side_effect=SystemExitMock) |
| def test_usage_with_no_message(self, exit_mock, listdir): |
| with self.assertRaises(SystemExitMock): |
| fetch.usage() |
| exit_mock.assert_called_once_with(0) |
| listdir.assert_called_once() |
| |
| self._usage_static_message(sys.stdout.getvalue()) |
| |
| def _usage_static_message(self, stdout): |
| valid_fetch_config_text = 'Valid fetch configs:' |
| self.assertIn(valid_fetch_config_text, stdout) |
| |
| # split[0] contains static text, whereas split[1] contains list of configs |
| split = stdout.split(valid_fetch_config_text) |
| self.assertEqual(2, len(split)) |
| |
| # verify a few fetch_configs |
| self.assertIn('foo', split[1]) |
| self.assertNotIn('bar', split[1]) |
| |
| @mock.patch('fetch.usage', side_effect=UsageMock) |
| def test_handle_args_invalid_usage(self, usage): |
| with self.assertRaises(UsageMock): |
| fetch.handle_args(['filename', '-h']) |
| usage.assert_called_with() |
| |
| with self.assertRaises(UsageMock): |
| fetch.handle_args(['filename']) |
| usage.assert_called_with('Must specify a config.') |
| |
| with self.assertRaises(UsageMock): |
| fetch.handle_args(['filename', '--foo']) |
| usage.assert_called_with('Invalid option --foo.') |
| |
| bad_arguments = [ |
| '--not-valid-param', '-not-valid-param=1', '--not-valid-param=1=2' |
| ] |
| |
| for bad_argument in bad_arguments: |
| with self.assertRaises(UsageMock): |
| fetch.handle_args(['filename', 'foo', bad_argument]) |
| usage.assert_called_with('Got bad arguments [\'%s\']' % (bad_argument)) |
| |
| @mock.patch('optparse.Values', return_value=None) |
| def test_handle_args_valid_usage(self, values): |
| response = fetch.handle_args(['filename', 'foo']) |
| values.assert_called_with({ |
| 'dry_run': False, |
| 'nohooks': False, |
| 'no_history': False, |
| 'force': False |
| }) |
| self.assertEqual((None, 'foo', []), response) |
| |
| response = fetch.handle_args([ |
| 'filename', '-n', '--dry-run', '--nohooks', '--no-history', '--force', |
| 'foo', '--some-param=1', '--bar=2' |
| ]) |
| values.assert_called_with({ |
| 'dry_run': True, |
| 'nohooks': True, |
| 'no_history': True, |
| 'force': True |
| }) |
| self.assertEqual((None, 'foo', ['--some-param=1', '--bar=2']), response) |
| |
| @mock.patch('os.path.exists', return_value=False) |
| @mock.patch('sys.stdout', StringIO()) |
| @mock.patch('sys.exit', side_effect=SystemExitMock) |
| def test_run_config_fetch_not_found(self, exit_mock, exists): |
| with self.assertRaises(SystemExitMock): |
| fetch.run_config_fetch('foo', []) |
| exit_mock.assert_called_with(1) |
| exists.assert_called_once() |
| |
| self.assertEqual(1, len(exists.call_args[0])) |
| self.assertTrue(exists.call_args[0][0].endswith('foo.py')) |
| |
| stdout = sys.stdout.getvalue() |
| self.assertEqual('Could not find a config for foo\n', stdout) |
| |
| def test_run_config_fetch_integration(self): |
| config = fetch.run_config_fetch('depot_tools', []) |
| url = 'https://chromium.googlesource.com/chromium/tools/depot_tools.git' |
| spec = { |
| 'type': 'gclient_git', |
| 'gclient_git_spec': { |
| 'solutions': [{ |
| 'url': url, |
| 'managed': False, |
| 'name': 'depot_tools', |
| 'deps_file': 'DEPS', |
| }], |
| } |
| } |
| self.assertEqual((spec, 'depot_tools'), config) |
| |
| def test_checkout_factory(self): |
| with self.assertRaises(KeyError): |
| fetch.CheckoutFactory('invalid', {}, {}, "root") |
| |
| gclient = fetch.CheckoutFactory('gclient', {}, {}, "root") |
| self.assertTrue(isinstance(gclient, fetch.GclientCheckout)) |
| |
| |
| class TestCheckout(unittest.TestCase): |
| def setUp(self): |
| mock.patch('sys.stdout', StringIO()).start() |
| self.addCleanup(mock.patch.stopall) |
| |
| self.opts = optparse.Values({'dry_run': False}) |
| self.checkout = fetch.Checkout(self.opts, {}, '') |
| |
| @contextlib.contextmanager |
| def _temporary_file(self): |
| """Creates a temporary file and removes it once it's out of scope""" |
| name = tempfile.mktemp() |
| try: |
| with open(name, 'w+') as f: |
| yield f |
| finally: |
| os.remove(name) |
| |
| def test_run_dry(self): |
| self.opts.dry_run = True |
| self.checkout.run(['foo-not-found']) |
| self.assertEqual('Running: foo-not-found\n', sys.stdout.getvalue()) |
| |
| def test_run_non_existing_command(self): |
| with self.assertRaises(OSError): |
| self.checkout.run(['foo-not-found']) |
| self.assertEqual('Running: foo-not-found\n', sys.stdout.getvalue()) |
| |
| def test_run_non_existing_command_return_stdout(self): |
| with self.assertRaises(OSError): |
| self.checkout.run(['foo-not-found'], return_stdout=True) |
| self.assertEqual('Running: foo-not-found\n', sys.stdout.getvalue()) |
| |
| @mock.patch('sys.stderr', StringIO()) |
| @mock.patch('sys.exit', side_effect=SystemExitMock) |
| def test_run_wrong_param(self, exit_mock): |
| # mocked version of sys.std* is not passed to subprocess, use temp files |
| with self._temporary_file() as f: |
| with self.assertRaises(subprocess.CalledProcessError): |
| self.checkout.run([sys.executable, '-invalid-param'], |
| return_stdout=True, |
| stderr=f) |
| f.seek(0) |
| # Expect some message to stderr |
| self.assertNotEqual('', f.read()) |
| self.assertEqual('', sys.stderr.getvalue()) |
| |
| with self._temporary_file() as f: |
| with self.assertRaises(SystemExitMock): |
| self.checkout.run([sys.executable, '-invalid-param'], stderr=f) |
| f.seek(0) |
| # Expect some message to stderr |
| self.assertNotEqual('', f.read()) |
| self.assertIn('Subprocess failed with return code', sys.stdout.getvalue()) |
| exit_mock.assert_called_once() |
| |
| def test_run_return_as_value(self): |
| cmd = ['python', '-c', 'print("foo")'] |
| |
| response = self.checkout.run(cmd, return_stdout=True) |
| # we expect no response other than information about command |
| self.assertNotIn('foo', sys.stdout.getvalue().split('\n')) |
| # this file should be included in response |
| self.assertEqual('foo', response.strip()) |
| |
| def test_run_print_to_stdout(self): |
| cmd = ['python', '-c', 'print("foo")'] |
| |
| # mocked version of sys.std* is not passed to subprocess, use temp files |
| with self._temporary_file() as stdout: |
| with self._temporary_file() as stderr: |
| response = self.checkout.run(cmd, stdout=stdout, stderr=stderr) |
| stdout.seek(0) |
| stderr.seek(0) |
| self.assertEqual('foo\n', stdout.read()) |
| self.assertEqual('', stderr.read()) |
| |
| stdout = sys.stdout.getvalue() |
| self.assertEqual('', response) |
| |
| |
| class TestGClientCheckout(unittest.TestCase): |
| def setUp(self): |
| self.run = mock.patch('fetch.Checkout.run').start() |
| |
| self.opts = optparse.Values({'dry_run': False}) |
| self.checkout = fetch.GclientCheckout(self.opts, {}, '/root') |
| |
| self.addCleanup(mock.patch.stopall) |
| |
| @mock.patch('distutils.spawn.find_executable', return_value=True) |
| def test_run_gclient_executable_found(self, find_executable): |
| self.checkout.run_gclient('foo', 'bar', baz='qux') |
| find_executable.assert_called_once_with('gclient') |
| self.run.assert_called_once_with(('gclient', 'foo', 'bar'), baz='qux') |
| |
| @mock.patch('distutils.spawn.find_executable', return_value=False) |
| def test_run_gclient_executable_not_found(self, find_executable): |
| self.checkout.run_gclient('foo', 'bar', baz='qux') |
| find_executable.assert_called_once_with('gclient') |
| args = self.run.call_args[0][0] |
| kargs = self.run.call_args[1] |
| |
| self.assertEqual(4, len(args)) |
| self.assertEqual(sys.executable, args[0]) |
| self.assertTrue(args[1].endswith('gclient.py')) |
| self.assertEqual(('foo', 'bar'), args[2:]) |
| self.assertEqual({'baz': 'qux'}, kargs) |
| |
| |
| class TestGclientGitCheckout(unittest.TestCase): |
| def setUp(self): |
| self.run_gclient = mock.patch('fetch.GclientCheckout.run_gclient').start() |
| self.run_git = mock.patch('fetch.GitCheckout.run_git').start() |
| |
| self.opts = optparse.Values({ |
| 'dry_run': False, |
| 'nohooks': True, |
| 'no_history': False |
| }) |
| specs = { |
| 'solutions': [{ |
| 'foo': 'bar', |
| 'baz': 1 |
| }, { |
| 'foo': False |
| }], |
| 'with_branch_heads': True, |
| } |
| |
| self.checkout = fetch.GclientGitCheckout(self.opts, specs, '/root') |
| |
| self.addCleanup(mock.patch.stopall) |
| |
| def test_init(self): |
| self.checkout.init() |
| self.assertEqual(2, self.run_gclient.call_count) |
| self.assertEqual(3, self.run_git.call_count) |
| |
| # Verify only expected commands and ignore arguments to avoid copying |
| # commands from fetch.py |
| self.assertEqual(['config', 'sync'], |
| [a[0][0] for a in self.run_gclient.call_args_list]) |
| self.assertEqual(['submodule', 'config', 'config'], |
| [a[0][0] for a in self.run_git.call_args_list]) |
| |
| # First call to gclient, format spec is expected to be called so "foo" is |
| # expected to be present |
| args = self.run_gclient.call_args_list[0][0] |
| self.assertEqual('config', args[0]) |
| self.assertIn('foo', args[2]) |
| |
| |
| if __name__ == '__main__': |
| logging.basicConfig( |
| level=logging.DEBUG if '-v' in sys.argv else logging.ERROR) |
| unittest.main() |