| #!/usr/bin/env vpython3 |
| # Copyright (c) 2020 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import os |
| import re |
| import sys |
| |
# Resolve the directory two levels above this file. The gclient script is
# located inside it, and it is put on the module search path (presumably so the
# project-local imports that follow can be resolved from it).
ROOT_DIR = os.path.abspath(__file__)
ROOT_DIR = os.path.dirname(ROOT_DIR)  # Directory containing this file.
ROOT_DIR = os.path.dirname(ROOT_DIR)  # Parent of that directory.
GCLIENT_PATH = os.path.join(ROOT_DIR, 'gclient')
# Prepend rather than append so ROOT_DIR is searched before other entries.
sys.path[:0] = [ROOT_DIR]
| |
| import subprocess2 |
| from testing_support import fake_repos |
| |
| |
class GClientSmokeBase(fake_repos.FakeReposTestBase):
  """Base class for gclient smoke tests.

  Runs the gclient script found at GCLIENT_PATH as a subprocess with a
  controlled environment, and provides helpers that split its stdout into
  blocks and compare them against expectations.
  """

  # '<thread id>>rest of line' prefix found on lines of a parallel checkout.
  # The whole run of digits is captured: a repeated one-digit group such as
  # '(\d)+' would only keep the LAST digit of a multi-digit thread ID.
  _TASK_LINE_RE = re.compile(r'^(\d+)>(.*)$')
  # "________ <verb> '<command>' in '<path>'" block header.
  _BLOCK_HEADER_RE = re.compile(
      r'^________ ([a-z]+) \'(.*)\' in \'(.*)\'$')
  # Emitted when a dependency is missing and gclient syncs it instead.
  _MISSING_RE = re.compile(r'^_____ (.*) is missing, syncing instead$')
  # These two regexps are a bit too broad, they are necessary only for git
  # checkouts.
  _GIT_AT_RE = re.compile(r'_____ [^ ]+ at [^ ]+')
  _GIT_REBASE_RE = re.compile(
      r'_____ [^ ]+ : Attempting rebase onto [0-9a-f]+...')

  def setUp(self):
    """Builds the environment handed to every gclient subprocess."""
    super(GClientSmokeBase, self).setUp()
    # Make sure it doesn't try to auto update when testing!
    self.env = os.environ.copy()
    self.env['DEPOT_TOOLS_UPDATE'] = '0'
    self.env['DEPOT_TOOLS_METRICS'] = '0'
    # Suppress Python 3 warnings and other test undesirables.
    self.env['GCLIENT_TEST'] = '1'
    self.env['GCLIENT_PY3'] = '0' if sys.version_info.major == 2 else '1'
    # NOTE(review): assuming the base class is a unittest.TestCase, None
    # removes the length cap on diffs printed by failed assertions.
    self.maxDiff = None

  def gclient(self, cmd, cwd=None, error_ok=False):
    """Runs gclient with the given arguments and captures its output.

    Args:
      cmd: list of command-line arguments appended after GCLIENT_PATH.
      cwd: directory to run in; self.root_dir is used when this is falsy.
      error_ok: when False, a non-zero exit code fails the test, with the
          captured stderr as the assertion message.

    Returns:
      A (stdout, stderr, returncode) tuple. CRLF line endings in both output
      strings are converted to LF.
    """
    cwd = cwd or self.root_dir
    cmd = [GCLIENT_PATH] + cmd
    process = subprocess2.Popen(
        cmd, cwd=cwd, env=self.env, stdout=subprocess2.PIPE,
        stderr=subprocess2.PIPE, universal_newlines=True)
    (stdout, stderr) = process.communicate()
    # Pass the values as logging arguments so the (potentially large) message
    # is only formatted when debug logging is actually enabled.
    cmd_line = ' '.join(cmd)
    logging.debug('XXX: %s\n%s\nXXX', cmd_line, stdout)
    logging.debug('YYY: %s\n%s\nYYY', cmd_line, stderr)

    if not error_ok:
      self.assertEqual(0, process.returncode, stderr)

    return (stdout.replace('\r\n', '\n'), stderr.replace('\r\n', '\n'),
            process.returncode)

  def untangle(self, stdout):
    """Separates output based on thread IDs.

    Lines of the form '<id>>text' are grouped under the integer thread ID
    <id>, with the prefix removed. A line without a prefix is attributed to
    the most recently seen thread ID; if no thread ID has been seen yet it is
    kept aside. Seeing a prefixed line while lines have been kept aside fails
    the test.

    Args:
      stdout: interleaved gclient output as a single string.

    Returns:
      The lines of each thread, in ascending thread ID order, followed by the
      lines kept aside, joined with newlines.
    """
    tasks = {}
    remaining = []
    # None (rather than 0) marks "no thread ID seen yet" so that an ID of 0
    # cannot be mistaken for the initial state.
    task_id = None
    for line in stdout.splitlines(False):
      m = self._TASK_LINE_RE.match(line)
      if m:
        self.assertEqual([], remaining)
        task_id = int(m.group(1))
        tasks.setdefault(task_id, []).append(m.group(2))
      elif task_id is not None:
        # Lines broken with carriage breaks don't have a thread ID, but belong
        # to the last seen thread ID.
        tasks.setdefault(task_id, []).append(line)
      else:
        remaining.append(line)
    out = []
    for key in sorted(tasks):
      out.extend(tasks[key])
    out.extend(remaining)
    return '\n'.join(out)

  def parseGclient(self, cmd, items, expected_stderr='', untangle=False):
    """Parse gclient's output to make it easier to test.

    Runs gclient (a non-zero exit code fails the test), compares its stderr
    with expected_stderr and checks its stdout blocks against items.

    Args:
      cmd: list of gclient command-line arguments.
      items: expected blocks, in the format accepted by checkBlock().
      expected_stderr: stderr the run is expected to produce.
      untangle: if True, tries to sort out the output from parallel checkout
          before splitting it into blocks.

    Returns:
      The blocks returned by checkBlock().
    """
    (stdout, stderr, _) = self.gclient(cmd)
    if untangle:
      stdout = self.untangle(stdout)
    self.checkString(expected_stderr, stderr)
    return self.checkBlock(stdout, items)

  def splitBlock(self, stdout):
    """Split gclient's output into logical execution blocks.

    ________ running 'foo' in '/bar'
    (...)
    ________ running 'baz' in '/bar'
    (...)

    will result in 2 items. Each item is a list whose first element is
    [verb, command, path] taken from the header line, followed by the
    non-empty output lines that came after that header.

    A '_____ X is missing, syncing instead' line starts an item of its own
    whose first element is that raw line. Empty lines, output appearing before
    the first header, and the git-specific '_____' status lines are dropped.
    Any other line starting with '__' fails the test.
    """
    results = []
    for line in stdout.splitlines(False):
      # Intentionally skips empty lines.
      if not line:
        continue

      if not line.startswith('__'):
        # TODO(maruel): gclient's git stdout is inconsistent.
        # Output seen before any header is silently dropped; this should fail
        # the test instead!!
        if results:
          results[-1].append(line)
        continue

      match = self._BLOCK_HEADER_RE.match(line)
      if match:
        results.append([list(match.groups())])
        continue

      if self._MISSING_RE.match(line):
        # Blah, it's when a dependency is deleted, we should probably not
        # output this message.
        results.append([line])
        continue

      if self._GIT_AT_RE.match(line) or self._GIT_REBASE_RE.match(line):
        continue

      # Fail for any unrecognized lines that start with '__'.
      self.fail(line)
    return results

  def checkBlock(self, stdout, items):
    """Checks the blocks found in stdout against the expected items.

    Args:
      stdout: gclient output, split with splitBlock().
      items: one expectation per block. Each is either a verb string, in
          which case the expected path is self.root_dir, or a list/tuple
          whose first two elements are (verb, path).

    Returns:
      The blocks returned by splitBlock().

    The verb and path of each block header are compared with
    self.checkString(); paths are compared case-insensitively on Windows.
    The test fails if the number of blocks differs from len(items).
    """
    results = self.splitBlock(stdout)
    # Compare the common prefix first so that a per-block mismatch is reported
    # before the less specific length mismatch below.
    for i, (result, item) in enumerate(zip(results, items)):
      if isinstance(item, (list, tuple)):
        verb = item[0]
        path = item[1]
      else:
        verb = item
        path = self.root_dir
      header = result[0]
      self.checkString(header[0], verb, (i, header[0], verb))
      actual_path = header[2]
      if sys.platform == 'win32':
        # Make path lower case since casing can change randomly.
        actual_path = actual_path.lower()
        path = path.lower()
      self.checkString(actual_path, path, (i, actual_path, path))
    self.assertEqual(len(results), len(items), (stdout, items, len(results)))
    return results
| |
| |