blob: 6b992d72f2b0f648f7cfaa7f7890a7ecdc74cf43 [file] [log] [blame]
#!/usr/bin/env vpython3
# Copyright (c) 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import re
import sys
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
GCLIENT_PATH = os.path.join(ROOT_DIR, 'gclient')
sys.path.insert(0, ROOT_DIR)
import subprocess2
from testing_support import fake_repos
class GClientSmokeBase(fake_repos.FakeReposTestBase):
def setUp(self):
super(GClientSmokeBase, self).setUp()
# Make sure it doesn't try to auto update when testing!
self.env = os.environ.copy()
self.env['DEPOT_TOOLS_UPDATE'] = '0'
self.env['DEPOT_TOOLS_METRICS'] = '0'
# Suppress Python 3 warnings and other test undesirables.
self.env['GCLIENT_TEST'] = '1'
self.env['GCLIENT_PY3'] = '0' if sys.version_info.major == 2 else '1'
self.maxDiff = None
def gclient(self, cmd, cwd=None, error_ok=False):
if not cwd:
cwd = self.root_dir
cmd = [GCLIENT_PATH] + cmd
process = subprocess2.Popen(
cmd, cwd=cwd, env=self.env, stdout=subprocess2.PIPE,
stderr=subprocess2.PIPE, universal_newlines=True)
(stdout, stderr) = process.communicate()
logging.debug("XXX: %s\n%s\nXXX" % (' '.join(cmd), stdout))
logging.debug("YYY: %s\n%s\nYYY" % (' '.join(cmd), stderr))
if not error_ok:
self.assertEqual(0, process.returncode, stderr)
return (stdout.replace('\r\n', '\n'), stderr.replace('\r\n', '\n'),
process.returncode)
def untangle(self, stdout):
"""Separates output based on thread IDs."""
tasks = {}
remaining = []
task_id = 0
for line in stdout.splitlines(False):
m = re.match(r'^(\d)+>(.*)$', line)
if not m:
if task_id:
# Lines broken with carriage breaks don't have a thread ID, but belong
# to the last seen thread ID.
tasks.setdefault(task_id, []).append(line)
else:
remaining.append(line)
else:
self.assertEqual([], remaining)
task_id = int(m.group(1))
tasks.setdefault(task_id, []).append(m.group(2))
out = []
for key in sorted(tasks.keys()):
out.extend(tasks[key])
out.extend(remaining)
return '\n'.join(out)
def parseGclient(self, cmd, items, expected_stderr='', untangle=False):
"""Parse gclient's output to make it easier to test.
If untangle is True, tries to sort out the output from parallel checkout."""
(stdout, stderr, _) = self.gclient(cmd)
if untangle:
stdout = self.untangle(stdout)
self.checkString(expected_stderr, stderr)
return self.checkBlock(stdout, items)
def splitBlock(self, stdout):
"""Split gclient's output into logical execution blocks.
___ running 'foo' at '/bar'
(...)
___ running 'baz' at '/bar'
(...)
will result in 2 items of len((...).splitlines()) each.
"""
results = []
for line in stdout.splitlines(False):
# Intentionally skips empty lines.
if not line:
continue
if not line.startswith('__'):
if results:
results[-1].append(line)
else:
# TODO(maruel): gclient's git stdout is inconsistent.
# This should fail the test instead!!
pass
continue
match = re.match(r'^________ ([a-z]+) \'(.*)\' in \'(.*)\'$', line)
if match:
results.append([[match.group(1), match.group(2), match.group(3)]])
continue
match = re.match(r'^_____ (.*) is missing, syncing instead$', line)
if match:
# Blah, it's when a dependency is deleted, we should probably not
# output this message.
results.append([line])
continue
# These two regexps are a bit too broad, they are necessary only for git
# checkouts.
if (re.match(r'_____ [^ ]+ at [^ ]+', line) or
re.match(r'_____ [^ ]+ : Attempting rebase onto [0-9a-f]+...', line)):
continue
# Fail for any unrecognized lines that start with '__'.
self.fail(line)
return results
def checkBlock(self, stdout, items):
results = self.splitBlock(stdout)
for i in range(min(len(results), len(items))):
if isinstance(items[i], (list, tuple)):
verb = items[i][0]
path = items[i][1]
else:
verb = items[i]
path = self.root_dir
self.checkString(results[i][0][0], verb, (i, results[i][0][0], verb))
if sys.platform == 'win32':
# Make path lower case since casing can change randomly.
self.checkString(
results[i][0][2].lower(),
path.lower(),
(i, results[i][0][2].lower(), path.lower()))
else:
self.checkString(results[i][0][2], path, (i, results[i][0][2], path))
self.assertEqual(len(results), len(items), (stdout, items, len(results)))
return results