| # Lint as: python3 |
| # Copyright 2020 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import ast |
| import collections |
| import glob |
| import logging |
| import os |
| import pdb |
| from typing import Generator, List, Optional, Tuple |
| |
| |
| class ControlFileError(Exception): |
| """Generic error from this package.""" |
| |
| |
| Control = collections.namedtuple('Control', |
| 'path, category, name, suites, main_package') |
| |
| |
| def load_all() -> List[Control]: |
| controls = [] |
| for path in _enumerate_files(): |
| logging.debug('Processing %s', path) |
| control = _load_one(path) |
| logging.debug(' -> %s', control) |
| controls.append(control) |
| return controls |
| |
| |
| _ROOT_DIR = os.path.realpath( |
| os.path.join(os.path.realpath(__file__), "../../..")) |
| |
| |
| def _enumerate_files() -> Generator[str, None, None]: |
| for ttype in ['client', 'server']: |
| tpath = os.path.join(_ROOT_DIR, ttype) |
| for path in glob.iglob(tpath + '/site_tests/*/control*'): |
| # There are some text files with names like control_sequence.txt |
| _, ext = os.path.splitext(path) |
| if ext: |
| continue |
| yield path |
| |
| |
| def _load_one(path: str) -> Control: |
| with open(path) as r: |
| text = r.read() |
| module = ast.parse(text) |
| name = _extract_name(module) |
| category, name = _categorize_name(name) |
| return Control( |
| path=path, |
| category=category, |
| name=name, |
| suites=_extract_suites(module), |
| main_package=_extract_main_package(path, module) or '', |
| ) |
| |
| |
| def _extract_name(module: ast.Module) -> Optional[str]: |
| stmt = _find_last_global_assignment(module.body, 'NAME') |
| if stmt is None: |
| raise ControlFileError('No global NAME assignment') |
| name = _extract_str_value(stmt) |
| if not name: |
| raise ControlFileError('Empty value') |
| return name |
| |
| |
| def _find_last_global_assignment(stmts: List[ast.Assign], |
| name: str) -> Optional[ast.Assign]: |
| found = _find_global_assignments(stmts, name) |
| if len(found) > 0: |
| return found[-1] |
| return None |
| |
| |
| def _find_global_assignments(stmts: List[ast.Assign], |
| name: str) -> List[ast.Assign]: |
| found = [] |
| for stmt in stmts: |
| if isinstance(stmt, ast.Assign) and _contains_name(stmt.targets, name): |
| found.append(stmt) |
| return found |
| |
| |
| def _contains_name(targets: List[ast.Expr], want: str) -> bool: |
| for target in targets: |
| if not isinstance(target, ast.Name): |
| # We do not support complex lvalues. |
| # In particular, multi-valued assignments are not handled properly. |
| continue |
| name: ast.Name = target |
| if name.id == want: |
| return True |
| return False |
| |
| |
| def _extract_str_value(stmt: ast.Assign) -> str: |
| if not isinstance(stmt.value, ast.Constant): |
| raise ControlFileError( |
| 'Name assignment value is of type %s, want ast.Constant' % |
| type(stmt.value)) |
| v = str(stmt.value.value) |
| return v |
| |
| |
| def _categorize_name(name: str) -> Tuple[str, str]: |
| parts = name.split('_', 1) |
| if len(parts) == 2: |
| category, rest = parts[0], parts[1] |
| else: |
| category, rest = '', parts[0] |
| return category, rest |
| |
| |
| _SUITE_PREFIX_LEN = len('suite:') |
| |
| |
| def _extract_suites(module: ast.Module) -> List[str]: |
| stmt = _find_last_global_assignment(module.body, 'ATTRIBUTES') |
| if stmt is None: |
| return [] |
| v = _extract_str_value(stmt) |
| suites = [] |
| for attr in v.split(','): |
| attr = attr.strip() |
| if attr.startswith('suite:'): |
| suites.append(attr[_SUITE_PREFIX_LEN:]) |
| return suites |
| |
| |
| def _extract_main_package(path: str, module: ast.Module) -> Optional[str]: |
| fname = _extract_main_file(path, module) |
| if fname is None: |
| return None |
| relpath = os.path.relpath(os.path.dirname(path), _ROOT_DIR) |
| assert '.' not in relpath |
| return 'autotest_lib.%s.%s' % (relpath.replace('/', '.'), fname) |
| |
| |
| def _extract_main_file(path: str, module: ast.Module) -> Optional[str]: |
| calls = _find_run_test_calls(module) |
| if not calls: |
| logging.warning('Found no job.run_test() calls in %s', path) |
| return None |
| if len(calls) > 1: |
| logging.warning('Found %d job.run_test() calls in %s, want 1', len(calls), |
| path) |
| return None |
| return _extract_run_test_target(path, calls[0]) |
| |
| |
| def _find_run_test_calls(module: ast.Module) -> List[ast.Call]: |
| calls = [] |
| for stmt in module.body: |
| if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call): |
| call = stmt.value |
| func = call.func |
| if (isinstance(func, ast.Attribute) and func.attr == 'run_test' and |
| isinstance(func.value, ast.Name) and func.value.id == 'job'): |
| calls.append(call) |
| return calls |
| |
| |
| def _extract_run_test_target(path: str, call: ast.Call) -> Optional[str]: |
| if len(call.args) != 1: |
| logging.warning('job.run_test() has %d arguments in %s, want 1', |
| len(call.args), path) |
| return None |
| arg = call.args[0] |
| if not isinstance(arg, ast.Constant): |
| logging.warning('job.run_test() has a non constant argument in %s', path) |
| return None |
| return arg.value |