| #!/usr/bin/env vpython3 |
| # coding=utf-8 |
| # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import print_function |
| from __future__ import unicode_literals |
| |
| import io |
| import os |
| import sys |
| import tempfile |
| import time |
| import unittest |
| |
| if sys.version_info.major == 2: |
| from StringIO import StringIO |
| import mock |
| else: |
| from io import StringIO |
| from unittest import mock |
| |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| |
| from testing_support import trial_dir |
| |
| import gclient_utils |
| import subprocess2 |
| |
| |
class CheckCallAndFilterTestCase(unittest.TestCase):
  """Tests gclient_utils.CheckCallAndFilter with subprocess2.Popen mocked out.

  setUp redirects sys.stdout and the print builtin into in-memory buffers so
  each test can assert on what was emitted through either channel.
  """

  class ProcessIdMock(object):
    """Stand-in for the object returned by the mocked subprocess2.Popen.

    Provides a binary `stdout` stream, a fixed `pid` and a `wait()` method.
    """

    def __init__(self, test_string, return_code=0):
      # The fake child's output is exposed as UTF-8 encoded bytes.
      self.stdout = io.BytesIO(test_string.encode('utf-8'))
      self.pid = 9284
      self.return_code = return_code

    def wait(self):
      # Hands back the exit code chosen at construction time.
      return self.return_code

  def setUp(self):
    super(CheckCallAndFilterTestCase, self).setUp()
    # Register the cleanup before starting any patch: cleanups still run when
    # setUp raises, so a failing start() below cannot leave sys.stdout or
    # print patched for the tests that follow.
    self.addCleanup(mock.patch.stopall)
    # Text passed to print() ends up here (print is replaced by the buffer's
    # write method, so no trailing newline is recorded).
    self.printfn = io.StringIO()
    # Bytes written to standard output end up here.
    self.stdout = io.BytesIO()
    if sys.version_info.major == 2:
      mock.patch('sys.stdout', self.stdout).start()
      mock.patch('__builtin__.print', self.printfn.write).start()
    else:
      # On Python 3 the byte stream lives at sys.stdout.buffer, so sys.stdout
      # itself becomes a Mock whose `buffer` attribute is the BytesIO.
      mock.patch('sys.stdout', mock.Mock()).start()
      mock.patch('sys.stdout.buffer', self.stdout).start()
      mock.patch('builtins.print', self.printfn.write).start()
    mock.patch('sys.stdout.flush', lambda: None).start()

  @mock.patch('subprocess2.Popen')
  def testCheckCallAndFilter(self, mockPopen):
    # With a filter_fn, the header and every output line go to the filter and
    # nothing is written to stdout.
    cwd = 'bleh'
    args = ['boo', 'foo', 'bar']
    test_string = 'ahah\naccb\nallo\naddb\n✔'

    mockPopen.return_value = self.ProcessIdMock(test_string)

    line_list = []
    result = gclient_utils.CheckCallAndFilter(
        args, cwd=cwd, show_header=True, always_show_header=True,
        filter_fn=line_list.append)

    self.assertEqual(result, test_string.encode('utf-8'))
    self.assertEqual(line_list, [
        '________ running \'boo foo bar\' in \'bleh\'\n',
        'ahah',
        'accb',
        'allo',
        'addb',
        '✔'])
    self.assertEqual(self.stdout.getvalue(), b'')

    mockPopen.assert_called_with(
        args, cwd=cwd, stdout=subprocess2.PIPE, stderr=subprocess2.STDOUT,
        bufsize=0)

  @mock.patch('time.sleep')
  @mock.patch('subprocess2.Popen')
  def testCheckCallAndFilter_RetryOnce(self, mockPopen, mockTime):
    # The first fake process exits with 1 and the second with 0; with
    # retry=True the command is expected to run twice.
    cwd = 'bleh'
    args = ['boo', 'foo', 'bar']
    test_string = 'ahah\naccb\nallo\naddb\n✔'

    mockPopen.side_effect = [
        self.ProcessIdMock(test_string, 1),
        self.ProcessIdMock(test_string, 0),
    ]

    line_list = []
    result = gclient_utils.CheckCallAndFilter(
        args, cwd=cwd, show_header=True, always_show_header=True,
        filter_fn=line_list.append, retry=True)

    self.assertEqual(result, test_string.encode('utf-8'))

    # Both attempts feed the filter; the second header carries the attempt
    # counter.
    self.assertEqual(line_list, [
        '________ running \'boo foo bar\' in \'bleh\'\n',
        'ahah',
        'accb',
        'allo',
        'addb',
        '✔',
        '________ running \'boo foo bar\' in \'bleh\' attempt 2 / 4\n',
        'ahah',
        'accb',
        'allo',
        'addb',
        '✔',
    ])

    mockTime.assert_called_with(gclient_utils.RETRY_INITIAL_SLEEP)

    self.assertEqual(
        mockPopen.mock_calls,
        [
            mock.call(
                args, cwd=cwd, stdout=subprocess2.PIPE,
                stderr=subprocess2.STDOUT, bufsize=0),
            mock.call(
                args, cwd=cwd, stdout=subprocess2.PIPE,
                stderr=subprocess2.STDOUT, bufsize=0),
        ])

    self.assertEqual(self.stdout.getvalue(), b'')
    # The retry warning is emitted through print(), not through stdout.
    self.assertEqual(
        self.printfn.getvalue(),
        'WARNING: subprocess \'"boo" "foo" "bar"\' in bleh failed; will retry '
        'after a short nap...')

  @mock.patch('subprocess2.Popen')
  def testCheckCallAndFilter_PrintStdout(self, mockPopen):
    # With print_stdout=True and no filter_fn, the header and the output are
    # expected on the (patched) stdout byte stream.
    cwd = 'bleh'
    args = ['boo', 'foo', 'bar']
    test_string = 'ahah\naccb\nallo\naddb\n✔'

    mockPopen.return_value = self.ProcessIdMock(test_string)

    result = gclient_utils.CheckCallAndFilter(
        args, cwd=cwd, show_header=True, always_show_header=True,
        print_stdout=True)

    self.assertEqual(result, test_string.encode('utf-8'))
    self.assertEqual(self.stdout.getvalue().splitlines(), [
        b"________ running 'boo foo bar' in 'bleh'",
        b'ahah',
        b'accb',
        b'allo',
        b'addb',
        b'\xe2\x9c\x94',
    ])
| |
| |
class AnnotatedTestCase(unittest.TestCase):
  """Tests for the file wrapper built by gclient_utils.MakeFileAnnotated."""

  def setUp(self):
    # `out` wraps a buffer with default options; `annotated` wraps another
    # buffer with include_zero=True.
    plain_buffer = io.BytesIO()
    self.out = gclient_utils.MakeFileAnnotated(plain_buffer)
    prefixed_buffer = io.BytesIO()
    self.annotated = gclient_utils.MakeFileAnnotated(
        prefixed_buffer, include_zero=True)

  def testWrite(self):
    # Text and bytes inputs are both expected to come out as the same UTF-8
    # bytes, with no prefix added.
    expectations = (
        ('test string\n', b'test string\n'),
        (b'test string\n', b'test string\n'),
        ('✔\n', b'\xe2\x9c\x94\n'),
        (b'\xe2\x9c\x94\n', b'\xe2\x9c\x94\n'),
        ('first line\nsecondline\n', b'first line\nsecondline\n'),
        (b'first line\nsecondline\n', b'first line\nsecondline\n'),
    )

    for payload, wanted in expectations:
      # A fresh wrapper per case keeps the cases independent of each other.
      wrapper = gclient_utils.MakeFileAnnotated(io.BytesIO())
      wrapper.write(payload)
      self.assertEqual(wrapper.getvalue(), wanted)

  def testWrite_Annotated(self):
    # With include_zero=True every output line is expected to start with
    # b'0>'.
    expectations = (
        ('test string\n', b'0>test string\n'),
        (b'test string\n', b'0>test string\n'),
        ('✔\n', b'0>\xe2\x9c\x94\n'),
        (b'\xe2\x9c\x94\n', b'0>\xe2\x9c\x94\n'),
        ('first line\nsecondline\n', b'0>first line\n0>secondline\n'),
        (b'first line\nsecondline\n', b'0>first line\n0>secondline\n'),
    )

    for payload, wanted in expectations:
      wrapper = gclient_utils.MakeFileAnnotated(
          io.BytesIO(), include_zero=True)
      wrapper.write(payload)
      self.assertEqual(wrapper.getvalue(), wanted)

  def testByteByByteInput(self):
    # A three-byte UTF-8 sequence is fed one byte at a time, followed by a
    # line holding only its first byte.
    for single_byte in (b'\xe2', b'\x9c', b'\x94', b'\n', b'\xe2', b'\n'):
      self.out.write(single_byte)
    self.assertEqual(self.out.getvalue(), b'\xe2\x9c\x94\n\xe2\n')

  def testByteByByteInput_Annotated(self):
    # Same byte-at-a-time input as above, through the include_zero wrapper.
    for single_byte in (b'\xe2', b'\x9c', b'\x94', b'\n', b'\xe2', b'\n'):
      self.annotated.write(single_byte)
    self.assertEqual(self.annotated.getvalue(), b'0>\xe2\x9c\x94\n0>\xe2\n')

  def testFlush_Annotated(self):
    # Only the newline-terminated line is visible before flush(); the
    # trailing partial line shows up, newline-terminated, after it.
    self.annotated.write(b'first line\nsecond line')
    before_flush = self.annotated.getvalue()
    self.assertEqual(before_flush, b'0>first line\n')
    self.annotated.flush()
    after_flush = self.annotated.getvalue()
    self.assertEqual(after_flush, b'0>first line\n0>second line\n')
| |
| |
class SplitUrlRevisionTestCase(unittest.TestCase):
  """Tests gclient_utils.SplitUrlRevision on ssh, scp-style and svn URLs."""

  def _checkWithoutRevision(self, url):
    # A URL with no "@revision" suffix must come back unchanged, with None as
    # the revision.
    got_url, got_rev = gclient_utils.SplitUrlRevision(url)
    self.assertEqual(got_rev, None)
    self.assertEqual(got_url, url)

  def _checkWithRevision(self, url, rev):
    # "<url>@<rev>" must split back into exactly <url> and <rev>.
    got_url, got_rev = gclient_utils.SplitUrlRevision("%s@%s" % (url, rev))
    self.assertEqual(got_rev, rev)
    self.assertEqual(got_url, url)

  def testSSHUrl(self):
    hash_rev = "ac345e52dc"
    named_rev = "test-stable"

    # The first three URLs are paired with a hash-like revision.
    for url in ("ssh://test@example.com/test.git",
                "ssh://example.com/test.git",
                "ssh://example.com/git/test.git"):
      self._checkWithoutRevision(url)
      self._checkWithRevision(url, hash_rev)

    # The last of those URLs is also checked against a named revision.
    self._checkWithRevision("ssh://example.com/git/test.git", named_rev)

    # The remaining URLs (user names with '-', '~' paths, scp-style syntax)
    # use the named revision only.
    for url in ("ssh://user-name@example.com/~/test.git",
                "ssh://user-name@example.com/~username/test.git",
                "git@github.com:dart-lang/spark.git"):
      self._checkWithoutRevision(url)
      self._checkWithRevision(url, named_rev)

  def testSVNUrl(self):
    svn_url = "svn://example.com/test"
    self._checkWithoutRevision(svn_url)
    self._checkWithRevision(svn_url, "ac345e52dc")
| |
| |
class GClientUtilsTest(trial_dir.TestCase):
  """Miscellaneous gclient_utils tests that run inside a trial directory."""

  def testHardToDelete(self):
    # No assertions here: the test leaves behind a nested tree with every
    # permission bit cleared and relies on tearDown deleting the directory,
    # which this layout makes hard to do.
    level1 = os.path.join(self.root_dir, 'l1')
    level2 = os.path.join(level1, 'l2')
    level3 = os.path.join(level2, 'l3')
    leaf_file = os.path.join(level3, 'f3')
    for directory in (level1, level2, level3):
      os.mkdir(directory)
    gclient_utils.FileWrite(leaf_file, 'foo')
    # Permissions are removed innermost-first, so each path is still
    # reachable at the moment it is chmod'ed.
    for path in (leaf_file, level3, level2, level1):
      os.chmod(path, 0)

  def testUpgradeToHttps(self):
    # (input, expected result of UpgradeToHttps) pairs.
    cases = (
        ('', ''),
        (None, None),
        ('foo', 'https://foo'),
        ('http://foo', 'https://foo'),
        ('foo/', 'https://foo/'),
        ('ssh-svn://foo', 'ssh-svn://foo'),
        ('ssh-svn://foo/bar/', 'ssh-svn://foo/bar/'),
        ('codereview.chromium.org', 'https://codereview.chromium.org'),
        ('codereview.chromium.org/', 'https://codereview.chromium.org/'),
        ('http://foo:10000', 'http://foo:10000'),
        ('http://foo:10000/bar', 'http://foo:10000/bar'),
        ('foo:10000', 'http://foo:10000'),
        ('foo:', 'https://foo:'),
    )
    for raw, upgraded in cases:
      self.assertEqual(upgraded, gclient_utils.UpgradeToHttps(raw))

  def testParseCodereviewSettingsContent(self):
    # (file content, expected parsed dictionary) pairs.
    cases = (
        ('# bleh\n', {}),
        ('\t# foo : bar\n', {}),
        ('Foo:bar', {'Foo': 'bar'}),
        ('Foo:bar:baz\n', {'Foo': 'bar:baz'}),
        (' Foo : bar ', {'Foo': 'bar'}),
        (' Foo : bar \n', {'Foo': 'bar'}),
        ('a:b\n\rc:d\re:f', {'a': 'b', 'c': 'd', 'e': 'f'}),
        ('an_url:http://value/', {'an_url': 'http://value/'}),
        (
            'CODE_REVIEW_SERVER : http://r/s',
            {'CODE_REVIEW_SERVER': 'https://r/s'},
        ),
        ('VIEW_VC:http://r/s', {'VIEW_VC': 'https://r/s'}),
    )
    for text, parsed in cases:
      self.assertEqual(
          parsed, gclient_utils.ParseCodereviewSettingsContent(text))

  def testFileRead_Bytes(self):
    # The payload holds a truncated UTF-8 sequence; FileRead is expected to
    # return U+FFFD in its place.
    with gclient_utils.temporary_file() as tmp:
      gclient_utils.FileWrite(
          tmp, b'foo \xe2\x9c bar', mode='wb', encoding=None)
      contents = gclient_utils.FileRead(tmp)
      self.assertEqual('foo \ufffd bar', contents)

  def testFileRead_Unicode(self):
    # Non-ASCII text must round-trip through FileWrite and FileRead.
    with gclient_utils.temporary_file() as tmp:
      gclient_utils.FileWrite(tmp, 'foo ✔ bar')
      contents = gclient_utils.FileRead(tmp)
      self.assertEqual('foo ✔ bar', contents)

  def testTemporaryFile(self):
    # The path is usable inside the `with` block and is gone once it exits.
    with gclient_utils.temporary_file() as tmp:
      gclient_utils.FileWrite(tmp, 'test')
      contents = gclient_utils.FileRead(tmp)
      self.assertEqual('test', contents)
    still_there = os.path.exists(tmp)
    self.assertFalse(still_there)
| |
| |
# Run the test cases defined in this module when the file is executed
# directly.
if __name__ == '__main__':
  unittest.main()
| |
| # vim: ts=2:sw=2:tw=80:et: |