| # Copyright 2022 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """This script emerges packages and retrieves their lints. |
| |
| Currently support is provided for both general and differential linting of C++ |
| with Clang Tidy and Rust with Cargo Clippy for all packages within platform2. |
| """ |
| |
| import collections |
| import json |
| import logging |
| import os |
| from pathlib import Path |
| import sys |
| from typing import DefaultDict, Dict, Iterable, List, Optional, Text, Tuple |
| |
| from chromite.lib import build_target_lib |
| from chromite.lib import commandline |
| from chromite.lib import constants |
| from chromite.lib import cros_build_lib |
| from chromite.lib import git |
| from chromite.lib import portage_util |
| from chromite.lib import terminal |
| from chromite.lib import workon_helper |
| from chromite.lib.parser import package_info |
| from chromite.service import toolchain |
| from chromite.utils import file_util |
| |
| |
# Absolute path of the platform2 checkout inside the chroot; all fix
# application and CL creation below operates on this tree.
PLATFORM2_PATH = constants.CHROOT_SOURCE_ROOT / "src/platform2"
| |
| |
def create_fixes_cl(formatted_fixes: Text, bug: Optional[Text]) -> None:
    """Commit every pending change in src/platform2 as a generated-fixes CL.

    Args:
        formatted_fixes: human-readable description of the applied lints,
            embedded in the commit message.
        bug: tracking bug for the BUG= line (may be the string "None").
    """
    commit_message = "".join(
        [
            "Apply generated linter fixes\n\n",
            "This CL was generated by the lint_package chromite script.\n",
            f"The following lints should be fixed:\n\n{formatted_fixes}\n\n",
            f"BUG={bug}\n",
            "TEST=CQ\n",
        ]
    )

    # Stage everything in platform2, then commit with the generated message.
    git.RunGit(PLATFORM2_PATH, ["add", "--all"])
    git.Commit(PLATFORM2_PATH, commit_message)
| |
| |
def check_plat2_diff() -> bool:
    """Return True if src/platform2 has changes in its diff."""
    return bool(git.RawDiff(PLATFORM2_PATH, "."))
| |
| |
def parse_packages(
    build_target: build_target_lib.BuildTarget, packages: List[str]
) -> List[package_info.PackageInfo]:
    """Parse packages and insert the category if none is given.

    Args:
        build_target: build_target to find ebuild for
        packages: user input package names to parse

    Returns:
        A list of parsed PackageInfo objects
    """

    def _resolve(name: str) -> package_info.PackageInfo:
        """Parse one package name, recovering its category when omitted."""
        info = package_info.parse(name)
        if info.category:
            return info
        # No category supplied: locate the ebuild and reparse its full atom.
        if build_target.is_host():
            ebuild_path = portage_util.FindEbuildForPackage(
                name, build_target.root
            )
        else:
            ebuild_path = portage_util.FindEbuildForBoardPackage(
                name, build_target.name, build_target.root
            )
        return package_info.parse(portage_util.EBuild(ebuild_path).package)

    return [_resolve(package) for package in packages]
| |
| |
def make_relative_to_cros(file_path: str) -> Path:
    """Strip the chroot source-root prefix from a path when present."""
    as_path = Path(file_path)
    try:
        return as_path.relative_to(constants.CHROOT_SOURCE_ROOT)
    except ValueError:
        # Path was not under the chroot source root; return it unchanged.
        return as_path
| |
| |
def process_fixes_by_file(
    lint: toolchain.LinterFinding,
    file_lengths: Dict[Path, int],
    allowed_subdirs: Optional[List[Text]],
) -> Optional[DefaultDict[Path, List[toolchain.SuggestedFix]]]:
    """Get fixes grouped by file if all the fixes apply to valid files.

    If any fixes modify invalid files this returns None.

    Args:
        lint: LinterFinding to get fixes from
        file_lengths: dictionary of previously determined file lengths which
            may be modified with additional entries
        allowed_subdirs: subdirectories in platform2 that we can modify or none
    """
    if not lint.suggested_fixes:
        return None

    grouped: DefaultDict[
        Path, List[toolchain.SuggestedFix]
    ] = collections.defaultdict(list)
    for fix in lint.suggested_fixes:
        rel_path = Path(fix.location.filepath)
        target = PLATFORM2_PATH / rel_path
        # A fix path is invalid when it is absolute (usually a generated
        # file), outside the allowed subdirectories, or missing from
        # platform2.  All three cases reject the entire lint.
        if (
            rel_path.is_absolute()
            or (allowed_subdirs and rel_path.parts[0] not in allowed_subdirs)
            or not target.exists()
        ):
            logging.warning(
                "Skipped applying fix due to invalid path: %s", rel_path
            )
            return None
        # Cache each file's length so we only read it once across lints.
        if target not in file_lengths:
            file_lengths[target] = len(target.read_text(encoding="utf-8"))
        if fix.location.end_offset > file_lengths[target]:
            logging.warning(
                "Skipped applying fix due to out of bounds change to: %s",
                rel_path,
            )
            return None
        grouped[target].append(fix)

    return grouped
| |
| |
def get_noconflict_fixes(
    lints: List[toolchain.LinterFinding],
    allowed_subdirs: Optional[List[Text]],
) -> Tuple[
    DefaultDict[Path, List[toolchain.SuggestedFix]],
    List[toolchain.LinterFinding],
]:
    """Get a conflict free set of replacements to apply for each file.

    Fixes will not be included in results if they:
        A) include a replacement to a path which does not exist
        B) include a replacement to a path outside of platform2
        C) include a replacement to file location that exceeds the file size
        D) overlap a previous replacement.

    Args:
        lints: List of lints to aggregate suggested fixes from.
        allowed_subdirs: subdirectories in platform2 that we can modify or none

    Returns:
        A tuple including:
            0) the mapping of paths to a list of their suggested fixes
            1) the list of lints which were fixed
    """
    fixes_by_file: DefaultDict[
        Path, List[toolchain.SuggestedFix]
    ] = collections.defaultdict(list)
    lints_fixed = []
    # Shared cache of file lengths, filled lazily by process_fixes_by_file.
    file_lengths: Dict[Path, int] = {}
    for lint in lints:
        new_fixes_by_file = process_fixes_by_file(
            lint, file_lengths, allowed_subdirs
        )
        if not new_fixes_by_file:
            continue
        files_with_overlap = set(
            filepath
            for filepath, new_fixes in new_fixes_by_file.items()
            if has_overlap(fixes_by_file[filepath], new_fixes)
        )
        if files_with_overlap:
            # files_with_overlap holds Path objects; str.join requires str
            # items, so convert explicitly (previously raised TypeError).
            logging.warning(
                "Skipped applying fix for %s due to conflicts in:\n\t%s.",
                lint.name,
                "\n\t".join(str(f) for f in files_with_overlap),
            )
        else:
            # Lint is conflict-free: record all of its fixes for every file.
            for filepath, new_fixes in new_fixes_by_file.items():
                fixes_by_file[filepath].extend(new_fixes)
            lints_fixed.append(lint)

    return fixes_by_file, lints_fixed
| |
| |
def has_overlap(
    prior_fixes: List["toolchain.SuggestedFix"],
    new_fixes: List["toolchain.SuggestedFix"],
) -> bool:
    """Check if new fixes have overlapping ranges with a prior replacement.

    Two fixes overlap when their [start_offset, end_offset] ranges
    intersect, including when one range entirely contains the other.

    Note: this implementation is n^2, but the amount of lints in a single file
    is experimentally pretty small, so optimizing this is probably not a large
    concern.
    """
    for new in new_fixes:
        for old in prior_fixes:
            # Closed intervals [a1, a2] and [b1, b2] intersect iff
            # a1 <= b2 and b1 <= a2.  The previous endpoint-containment
            # check missed the case where `new` fully contained `old`
            # (e.g. old=[5,6], new=[0,10]), letting conflicting edits
            # through.
            if (
                old.location.start_offset <= new.location.end_offset
                and new.location.start_offset <= old.location.end_offset
            ):
                return True
    return False
| |
| |
def apply_edits(content: Text, fixes: List[toolchain.SuggestedFix]) -> Text:
    """Apply a list of suggested fixes to a file's contents.

    Replacements are applied from the end of the text backwards so that
    earlier offsets stay valid as replacement lengths differ from the
    original spans.
    """
    segments = []
    tail_start = len(content)
    # Walk fixes from the highest start offset down to the lowest.
    for fix in sorted(
        fixes, key=lambda f: f.location.start_offset, reverse=True
    ):
        # Keep the untouched text after this fix, then its replacement.
        segments.append(content[fix.location.end_offset : tail_start])
        segments.append(fix.replacement)
        tail_start = fix.location.start_offset
    segments.append(content[:tail_start])

    # Segments were collected back-to-front; reverse to restore order.
    return "".join(reversed(segments))
| |
| |
def apply_fixes(
    lints: List[toolchain.LinterFinding],
    allowed_subdirs: Optional[List[Text]],
) -> Tuple[List[toolchain.LinterFinding], Iterable[Path]]:
    """Modify files in Platform2 to apply suggested fixes from linter findings.

    Some fixes which cannot be applied cleanly will be discarded (see the
    `get_noconflict_fixes` description for more details).

    Args:
        lints: LinterFindings to apply potential fixes from.
        allowed_subdirs: subdirectories in platform2 that we can modify or none

    Returns:
        A tuple including:
            0) The list of lints which were fixed
            1) The list of files which were modified
    """
    fixes_by_file, lints_fixed = get_noconflict_fixes(lints, allowed_subdirs)

    # Rewrite each file exactly once with all of its accepted fixes applied.
    for filepath, file_fixes in fixes_by_file.items():
        original_text = filepath.read_text(encoding="utf-8")
        filepath.write_text(
            apply_edits(original_text, file_fixes), encoding="utf-8"
        )

    return lints_fixed, fixes_by_file.keys()
| |
| |
def format_lint(lint: toolchain.LinterFinding) -> Text:
    """Formats a lint for human-readable printing.

    Example output:
        [ClangTidy] In 'path/to/file.c' line 36:
         and in 'path/to/file.c' line 40:
         and in 'path/to/file.c' lines 50-53:
         You did something bad, don't do it.

    Args:
        lint: A linter finding from the toolchain service.

    Returns:
        A correctly formatted string ready to be displayed to the user.
    """

    color = terminal.Color(True)
    lines = []
    # Colored "[<linter>]" tag shown before the first location only.
    linter_prefix = color.Color(
        terminal.Color.YELLOW,
        f"[{lint.linter}]",
        background_color=terminal.Color.BLACK,
    )
    for loc in lint.locations:
        filepath = make_relative_to_cros(loc.filepath)
        # First location introduces the lint; later ones are continuations.
        if not lines:
            location_prefix = f"\n{linter_prefix} In"
        else:
            location_prefix = " and in"
        # Use a line range when the finding spans multiple lines.
        if loc.line_start != loc.line_end:
            lines.append(
                f"{location_prefix} '{filepath}' "
                f"lines {loc.line_start}-{loc.line_end}:"
            )
        else:
            lines.append(
                f"{location_prefix} '{filepath}' line {loc.line_start}:"
            )
    # The lint message follows all locations, one indented line each.
    message_lines = lint.message.split("\n")
    for line in message_lines:
        lines.append(f" {line}")
    lines.append("")
    return "\n".join(lines)
| |
| |
def json_format_lint(lint: toolchain.LinterFinding) -> Text:
    """Formats a lint in json for machine parsing.

    Args:
        lint: A linter finding from the toolchain service.

    Returns:
        A correctly formatted json string ready to be displayed to the user.
    """

    def _to_jsonable(value):
        """Recursively converts namedtuples/collections to plain types."""
        # Namedtuples: tuple subclasses exposing _asdict.  Must be checked
        # before generic iterables so fields keep their names.
        if isinstance(value, tuple) and hasattr(value, "_asdict"):
            return _to_jsonable(value._asdict())
        if hasattr(value, "__iter__"):
            # Strings and bytes are iterable but must pass through intact.
            if isinstance(value, (str, bytes)):
                return value
            if isinstance(value, dict):
                return {key: _to_jsonable(val) for key, val in value.items()}
            # Remaining iterables (lists, sets, tuples, ...) become lists.
            return [_to_jsonable(item) for item in value]
        # PackageInfo objects serialize as their atom string.
        if isinstance(value, package_info.PackageInfo):
            return value.atom
        # Scalars and anything else are left for json.dumps to handle.
        return value

    return json.dumps(_to_jsonable(lint))
| |
| |
def get_all_sysroots() -> List[Text]:
    """Gets all available sysroots for both host and boards."""
    host_root = Path(build_target_lib.BuildTarget(None).root)
    sysroots = [str(host_root)]
    build_dir = host_root / "build"
    # Every directory under <host_root>/build except "bin" is a board root.
    for entry in build_dir.iterdir():
        if entry.name != "bin" and entry.is_dir():
            sysroots.append(str(entry))
    return sysroots
| |
| |
def get_arg_parser() -> commandline.ArgumentParser:
    """Creates an argument parser for this script."""
    default_board = cros_build_lib.GetDefaultBoard()
    parser = commandline.ArgumentParser(description=__doc__)

    # Target selection: exactly one of a board or the host.
    board_group = parser.add_mutually_exclusive_group()
    board_group.add_argument(
        "-b",
        "--board",
        "--build-target",
        dest="board",
        default=default_board,
        help="The board to emerge packages for",
    )
    board_group.add_argument(
        "--host", action="store_true", help="emerge for host instead of board."
    )
    parser.add_argument(
        "--fetch-only",
        action="store_true",
        help="Fetch lints from previous run without resetting or calling "
        "emerge.",
    )
    parser.add_argument(
        "--apply-fixes",
        action="store_true",
        help="Apply suggested fixes from linters.",
    )
    parser.add_argument(
        "--create-cl",
        action="store_true",
        help="Generate a CL for fixes.",
    )
    parser.add_argument(
        "--bug",
        default="None",
        # Fixed help text: the flag is spelled --create-cl, not --create_cl.
        help="Sets the tracking bug for the CL if --create-cl is used.",
    )
    parser.add_argument(
        "--filter-names",
        help="Only keep lints if the name contains one of the provided filters",
        action="append",
    )
    parser.add_argument(
        "--restrict-fix-subdirs",
        help="Only fix lints if all fixes are in the given directories",
        action="append",
    )
    parser.add_argument(
        "--differential",
        action="store_true",
        help="only lint lines touched by the last commit",
    )
    parser.add_argument(
        "-o",
        "--output",
        default=sys.stdout,
        help="File to use instead of stdout.",
    )
    parser.add_argument(
        "--json", action="store_true", help="Output lints in JSON format."
    )
    # Individual linters are on by default and can be disabled one by one.
    parser.add_argument(
        "--no-clippy",
        dest="clippy",
        action="store_false",
        help="Disable cargo clippy linter.",
    )
    parser.add_argument(
        "--no-tidy",
        dest="tidy",
        action="store_false",
        help="Disable clang tidy linter.",
    )
    parser.add_argument(
        "--no-golint",
        dest="golint",
        action="store_false",
        help="Disable golint linter.",
    )
    parser.add_argument(
        "packages",
        nargs="*",
        help="package(s) to emerge and retrieve lints for",
    )

    return parser
| |
| |
def parse_args(argv: List[str]):
    """Parses arguments in argv and returns the options."""
    parser = get_arg_parser()
    opts = parser.parse_args(argv)
    opts.Freeze()

    fetch_only = opts.fetch_only

    # Packages are required when emerging and forbidden when fetch-only.
    if not fetch_only and not opts.packages:
        parser.error("Emerge mode requires specified package(s).")
    if fetch_only and opts.packages:
        parser.error("Cannot specify packages for fetch-only mode.")

    # A target (board or host) is required unless we are in fetch-only mode.
    if not fetch_only and not opts.board and not opts.host:
        parser.error("Emerge mode requires either --board or --host.")

    # The following flags only make sense when fixes are being applied.
    if opts.restrict_fix_subdirs and not opts.apply_fixes:
        parser.error(
            "--restrict-fix-subdirs is meaningless if fixes aren't applied"
        )
    if opts.create_cl and not opts.apply_fixes:
        parser.error("--create-cl not allowed if fixes aren't applied")

    # Note: --bug defaults to the literal string "None", not the None object.
    if opts.bug != "None" and not opts.create_cl:
        parser.error("--bug not allowed if a CL is not being created")

    return opts
| |
| |
def filter_lints(
    lints: List[toolchain.LinterFinding], names_filters: List[Text]
) -> List[toolchain.LinterFinding]:
    """Keep only findings whose name contains at least one filter string."""
    return [
        finding
        for finding in lints
        if any(needle in finding.name for needle in names_filters)
    ]
| |
| |
def main(argv: List[str]) -> None:
    """Entry point: emerge packages, collect lints, then report/fix them."""
    cros_build_lib.AssertInsideChroot()
    opts = parse_args(argv)

    if opts.host:
        # BuildTarget interprets None as host target
        build_target = build_target_lib.BuildTarget(None)
    else:
        build_target = build_target_lib.BuildTarget(opts.board)
    packages = parse_packages(build_target, opts.packages)
    package_atoms = [x.atom for x in packages]

    # Workon the packages only while emerging / fetching findings.
    with workon_helper.WorkonScope(build_target, package_atoms):
        build_linter = toolchain.BuildLinter(
            packages, build_target.root, opts.differential
        )
        # Decide up front whether a CL should be created; prompt if the
        # tree is dirty so unrelated changes are not silently committed.
        create_cl = False
        if opts.apply_fixes and opts.create_cl:
            if not check_plat2_diff():
                create_cl = True
            else:
                create_cl = cros_build_lib.BooleanPrompt(
                    "Platform2 contains uncommitted changes which will be "
                    "added to the generated cl. Would you still like to "
                    "create a CL from fixes?"
                )

        if opts.fetch_only:
            if opts.apply_fixes:
                logging.warning(
                    "Apply fixes with fetch_only may lead to fixes being"
                    " applied incorrectly if source files have changed!"
                )
            # Without an explicit target, fetch findings from every sysroot.
            if opts.host or opts.board:
                roots = [build_target.root]
            else:
                roots = get_all_sysroots()
            lints = []
            for root in roots:
                build_linter.sysroot = root
                lints.extend(
                    build_linter.fetch_findings(
                        use_clippy=opts.clippy,
                        use_tidy=opts.tidy,
                        use_golint=opts.golint,
                    )
                )
        else:
            lints = build_linter.emerge_with_linting(
                use_clippy=opts.clippy,
                use_tidy=opts.tidy,
                use_golint=opts.golint,
            )

    if opts.filter_names:
        lints = filter_lints(lints, opts.filter_names)

    # Format the full lint report in the requested output format.
    if opts.json:
        formatted_output_inner = ",\n".join(json_format_lint(l) for l in lints)
        formatted_output = f"[{formatted_output_inner}]"
    else:
        formatted_output = "\n".join(format_lint(l) for l in lints)

    if opts.apply_fixes:
        fixed_lints, modified_files = apply_fixes(
            lints, opts.restrict_fix_subdirs
        )
        # Report only the lints that were actually fixed.  (The JSON branch
        # previously serialized every lint instead of fixed_lints, unlike
        # the text branch.)
        if opts.json:
            formatted_fixes_inner = ",\n".join(
                json_format_lint(l) for l in fixed_lints
            )
            formatted_fixes = f"[{formatted_fixes_inner}]"
        else:
            formatted_fixes = "\n".join(format_lint(l) for l in fixed_lints)

        if create_cl:
            if fixed_lints:
                create_fixes_cl(formatted_fixes, opts.bug)
            else:
                logging.warning(
                    "Skipped creating CL since no fixes were applied."
                )

    with file_util.Open(opts.output, "w") as output_file:
        output_file.write(formatted_output)
        if not opts.json:
            output_file.write(f"\nFound {len(lints)} lints.")
        if opts.apply_fixes:
            output_file.write("\n\n\n--------- Fixed Problems ---------\n\n")
            output_file.write(formatted_fixes)
            if not opts.json:
                output_file.write(
                    f"\nFixed {len(fixed_lints)}/{len(lints)} lints."
                )
            output_file.write("\n\n\n--------- Modified Files ---------\n\n")
            output_file.write("\n".join(str(f) for f in sorted(modified_files)))
        output_file.write("\n")